Commit e2eb2408 authored by Roman Chvanikov's avatar Roman Chvanikov

Add scheduling

parent 1445c55f
......@@ -97,6 +97,13 @@ defmodule PleromaJobQueue do
end
end
def schedule(schedule, queue, mod, args \\ [], priority \\ 1) do
with {:ok, %Crontab.CronExpression{} = cron_expr} <-
Crontab.CronExpression.Parser.parse(schedule) do
send(PleromaJobQueue.Worker, {:schedule, cron_expr, queue, mod, args, priority})
end
end
@doc """
Schedules a job to be enqueued at specific time in the future.
......
......@@ -25,6 +25,17 @@ defmodule PleromaJobQueue.Worker do
|> Enum.into(%{})
|> Map.merge(queues)
:pleroma_job_queue
|> Application.get_env(:schedule, [])
|> Enum.each(fn {queue, %{cron_expr: cron_expr_unparsed, module: mod} = params} ->
with %Crontab.CronExpression{} = cron_expr <-
Crontab.CronExpression.Parser.parse(cron_expr_unparsed) do
args = Map.get(params, :args, [])
priority = Map.get(params, :priority, 1)
send(self(), {:schedule, cron_expr, queue, mod, args, priority})
end
end)
{:ok, %State{state | queues: queues}}
end
......@@ -42,6 +53,34 @@ defmodule PleromaJobQueue.Worker do
{:noreply, state}
end
def handle_info(
{:schedule, %Crontab.CronExpression{} = cron_expr, queue, mod, args, priority},
state
) do
next_run_date = Crontab.Scheduler.get_next_run_date(cron_expr)
interval = NaiveDateTime.diff(next_run_date, NaiveDateTime.utc_now())
Process.send_after(
self(),
{:enqueue_scheduled, queue, mod, args, priority, interval},
interval
)
{:noreply, state}
end
def handle_info({:enqueue_scheduled, queue, mod, args, priority, interval}, state) do
GenServer.cast(self(), {:enqueue, queue, mod, args, priority})
Process.send_after(
self(),
{:enqueue_scheduled, queue, mod, args, priority, interval},
interval
)
{:noreply, state}
end
@impl true
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
queue_name = state.refs[ref]
......
......@@ -35,6 +35,7 @@ defmodule PleromaJobQueue.MixProject do
# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:crontab, "~> 1.1"},
{:credo, "~> 1.0.0", only: [:dev, :test], runtime: false},
{:ex_doc, "~> 0.19", only: :dev, runtime: false},
{:dialyxir, "~> 1.0.0-rc.5", only: [:dev], runtime: false}
......
%{
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm"},
"credo": {:hex, :credo, "1.0.4", "d2214d4cc88c07f54004ffd5a2a27408208841be5eca9f5a72ce9e8e835f7ede", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"},
"crontab": {:hex, :crontab, "1.1.7", "b9219f0bdc8678b94143655a8f229716c5810c0636a4489f98c0956137e53985", [:mix], [{:ecto, "~> 1.0 or ~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}], "hexpm"},
"dialyxir": {:hex, :dialyxir, "1.0.0-rc.5", "c9c2379c59cf2dfc74690f48866e33ffb55ff660e5e02405c14614d204efdc4f", [:mix], [{:erlex, "~> 0.2.1", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm"},
"earmark": {:hex, :earmark, "1.3.2", "b840562ea3d67795ffbb5bd88940b1bed0ed9fa32834915125ea7d02e35888a5", [:mix], [], "hexpm"},
"erlex": {:hex, :erlex, "0.2.1", "cee02918660807cbba9a7229cae9b42d1c6143b768c781fa6cee1eaf03ad860b", [:mix], [], "hexpm"},
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment