Commit 875b1d4b authored by minibikini's avatar minibikini

Merge remote-tracking branch 'origin/master'

parents efcfaa36 f9026dba
......@@ -25,3 +25,6 @@ pleroma_job_queue-*.tar
# Dialyzer
/priv/plts/*.plt
/priv/plts/*.plt.hash
# Elixir language server
/.elixir_ls
......@@ -3,4 +3,4 @@
## [unreleased]
### Added
- Scheduler: ability to enqueue jobs in the future
- Scheduler: add cron-like scheduling for repeating jobs
......@@ -97,6 +97,40 @@ defmodule PleromaJobQueue do
end
end
@doc """
Schedule a repeating task that will be enqueued with given params according to
the given cron expression.
In case of invalid cron expression given, an error will be returned.
## Examples
Enqueue `MyWorker.perform/0` to be repeated every minute:
iex> PleromaJobQueue.schedule("* * * * *", :queue_name, MyWorker)
:ok
Enqueue `MyWorker.perform(:arg1, :arg2)` with priority 5 to be repeated every Sunday midnight:
iex> PleromaJobQueue.schedule("0 0 * * 7", :queue_name, MyWorker, [:arg1, :arg2], 5)
:ok
Invalid cron expression:
iex> PleromaJobQueue.schedule("9 9 9 9 9", :queue_name, MyWorker, [:arg1, :arg2], 5)
{:error, "Can't parse 9 as day of week"}
"""
@spec schedule(String.t(), atom(), module(), [any()], non_neg_integer()) ::
:ok | {:error, String.t()}
def schedule(schedule, queue, mod, args \\ [], priority \\ 1) do
with {:ok, %Crontab.CronExpression{} = cron_expr} <-
Crontab.CronExpression.Parser.parse(schedule) do
send(PleromaJobQueue.Worker, {:schedule, cron_expr, queue, mod, args, priority})
:ok
end
end
@doc """
Schedules a job to be enqueued at specific time in the future.
......
......@@ -25,6 +25,17 @@ defmodule PleromaJobQueue.Worker do
|> Enum.into(%{})
|> Map.merge(queues)
:pleroma_job_queue
|> Application.get_env(:schedule, [])
|> Enum.each(fn {queue, %{cron_expr: cron_expr_unparsed, module: mod} = params} ->
with {:ok, %Crontab.CronExpression{} = cron_expr} <-
Crontab.CronExpression.Parser.parse(cron_expr_unparsed) do
args = Map.get(params, :args, [])
priority = Map.get(params, :priority, 1)
send(self(), {:schedule, cron_expr, queue, mod, args, priority})
end
end)
{:ok, %State{state | queues: queues}}
end
......@@ -42,6 +53,25 @@ defmodule PleromaJobQueue.Worker do
{:noreply, state}
end
def handle_info(
{:schedule, %Crontab.CronExpression{} = cron_expr, queue, mod, args, priority},
state
) do
with {:ok, next_run_date} <- Crontab.Scheduler.get_next_run_date(cron_expr) do
interval = NaiveDateTime.diff(next_run_date, NaiveDateTime.utc_now())
enqueue_scheduled(queue, mod, args, priority, interval)
end
{:noreply, state}
end
def handle_info({:enqueue_scheduled, queue, mod, args, priority, interval}, state) do
GenServer.cast(self(), {:enqueue, queue, mod, args, priority})
enqueue_scheduled(queue, mod, args, priority, interval)
{:noreply, state}
end
@impl true
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
queue_name = state.refs[ref]
......@@ -102,4 +132,12 @@ defmodule PleromaJobQueue.Worker do
defp update_queue(%State{queues: queues} = state, queue_name, data) do
%State{state | queues: Map.put(queues, queue_name, data)}
end
defp enqueue_scheduled(queue, mod, args, priority, interval) do
Process.send_after(
__MODULE__,
{:enqueue_scheduled, queue, mod, args, priority, interval},
interval
)
end
end
......@@ -35,9 +35,11 @@ defmodule PleromaJobQueue.MixProject do
# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:crontab, "~> 1.1"},
{:credo, "~> 1.0.0", only: [:dev, :test], runtime: false},
{:ex_doc, "~> 0.19", only: :dev, runtime: false},
{:dialyxir, "~> 1.0.0-rc.5", only: [:dev], runtime: false}
{:dialyxir, "~> 1.0.0-rc.5", only: [:dev], runtime: false},
{:mock, "~> 0.3.3", only: [:test], runtime: false}
]
end
......
%{
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm"},
"credo": {:hex, :credo, "1.0.4", "d2214d4cc88c07f54004ffd5a2a27408208841be5eca9f5a72ce9e8e835f7ede", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"},
"crontab": {:hex, :crontab, "1.1.7", "b9219f0bdc8678b94143655a8f229716c5810c0636a4489f98c0956137e53985", [:mix], [{:ecto, "~> 1.0 or ~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}], "hexpm"},
"dialyxir": {:hex, :dialyxir, "1.0.0-rc.5", "c9c2379c59cf2dfc74690f48866e33ffb55ff660e5e02405c14614d204efdc4f", [:mix], [{:erlex, "~> 0.2.1", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm"},
"earmark": {:hex, :earmark, "1.3.2", "b840562ea3d67795ffbb5bd88940b1bed0ed9fa32834915125ea7d02e35888a5", [:mix], [], "hexpm"},
"erlex": {:hex, :erlex, "0.2.1", "cee02918660807cbba9a7229cae9b42d1c6143b768c781fa6cee1eaf03ad860b", [:mix], [], "hexpm"},
......@@ -8,5 +9,7 @@
"jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"},
"makeup": {:hex, :makeup, "0.8.0", "9cf32aea71c7fe0a4b2e9246c2c4978f9070257e5c9ce6d4a28ec450a839b55f", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"},
"makeup_elixir": {:hex, :makeup_elixir, "0.13.0", "be7a477997dcac2e48a9d695ec730b2d22418292675c75aa2d34ba0909dcdeda", [:mix], [{:makeup, "~> 0.8", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"},
"meck": {:hex, :meck, "0.8.13", "ffedb39f99b0b99703b8601c6f17c7f76313ee12de6b646e671e3188401f7866", [:rebar3], [], "hexpm"},
"mock": {:hex, :mock, "0.3.3", "42a433794b1291a9cf1525c6d26b38e039e0d3a360732b5e467bfc77ef26c914", [:mix], [{:meck, "~> 0.8.13", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"},
}
......@@ -5,6 +5,8 @@
defmodule PleromaJobQueue.WorkerTest do
use ExUnit.Case
import Mock
defmodule(TestWorker, do: def(perform(:test_job, _, _), do: :ok))
alias PleromaJobQueue.State
......@@ -95,4 +97,42 @@ defmodule PleromaJobQueue.WorkerTest do
assert :sets.size(running_jobs) == 0
end
defmodule ScheduledWorker do
def perform(pid) do
send(pid, :executing)
end
end
describe "Scheduling" do
setup do
Application.stop(:pleroma_job_queue)
schedule = [
test_queue: %{
cron_expr: "* * * * *",
module: __MODULE__.ScheduledWorker,
args: [self()]
}
]
Application.put_env(:pleroma_job_queue, :schedule, schedule)
end
test "Scheduled tasks are executed repeatedly" do
get_next_run_date_mock = [
get_next_run_date: fn _cron_expr ->
{:ok, NaiveDateTime.add(NaiveDateTime.utc_now(), 1, :second)}
end
]
with_mock Crontab.Scheduler, get_next_run_date_mock do
Application.start(:pleroma_job_queue)
Enum.each(1..3, fn _i ->
assert_receive :executing, 1_500
end)
end
end
end
end
......@@ -6,6 +6,8 @@ defmodule PleromaJobQueueTest do
use ExUnit.Case
alias PleromaJobQueue.Scheduler
import Mock
defmodule Worker do
defp pid, do: Application.get_env(:pleroma_job_queue, :test_pid)
......@@ -92,5 +94,20 @@ defmodule PleromaJobQueueTest do
assert priority == 1
end
test "schedule" do
set_pid()
get_next_run_date_mock = [
get_next_run_date: fn _cron_expr ->
{:ok, NaiveDateTime.add(NaiveDateTime.utc_now(), 1, :second)}
end
]
with_mock Crontab.Scheduler, get_next_run_date_mock do
PleromaJobQueue.schedule("* * * * *", @queue_name, Worker)
assert_receive {:test, :no_args}, 1_500
end
end
defp set_pid, do: Application.put_env(:pleroma_job_queue, :test_pid, self())
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment