Commit 195c8838 authored by Eugenij's avatar Eugenij

Add an ability to schedule jobs to be executed in the future.

parent eca00bfa
# Changelog
## [unreleased]
### Added
- Scheduler: ability to enqueue jobs in the future
...@@ -28,6 +28,22 @@ config :pleroma_job_queue, :queues, ...@@ -28,6 +28,22 @@ config :pleroma_job_queue, :queues,
another_queue: 50 another_queue: 50
``` ```
Configure the scheduler like this:
```elixir
config :pleroma_job_queue, :scheduler,
enabled: true,
poll_interval: :timer.seconds(10),
store: PleromaJobQueue.Scheduler.Store.ETS
```
* `enabled` - whether the scheduler is enabled (Default: `false`)
* `poll_interval` - how often to check for scheduled jobs in milliseconds (Default: `10_000`)
* `store` - a module that stores scheduled jobs. It should implement the `PleromaJobQueue.Scheduler.Store` behavior. The default is an in-memory store based on ETS tables: `PleromaJobQueue.Scheduler.Store.ETS`.
The scheduler allows you to execute jobs at specific time in the future.
By default it uses an in-memory ETS table which means the jobs won't be available after restart.
## Usage ## Usage
[See documentation](http://hexdocs.pm/pleroma_job_queue) [See documentation](http://hexdocs.pm/pleroma_job_queue)
......
...@@ -5,3 +5,7 @@ use Mix.Config ...@@ -5,3 +5,7 @@ use Mix.Config
# file won't be loaded nor affect the parent project. For this reason, # file won't be loaded nor affect the parent project. For this reason,
# if you want to provide default values for your application for # if you want to provide default values for your application for
# third-party users, it should be done in your "mix.exs" file. # third-party users, it should be done in your "mix.exs" file.
if File.exists?("./config/#{Mix.env()}.exs") do
import_config("#{Mix.env()}.exs")
end
use Mix.Config
config :pleroma_job_queue, :scheduler, enabled: true
...@@ -33,6 +33,22 @@ defmodule PleromaJobQueue do ...@@ -33,6 +33,22 @@ defmodule PleromaJobQueue do
```elixir ```elixir
config :pleroma_job_queue, disabled: true config :pleroma_job_queue, disabled: true
``` ```
Configure the scheduler like this:
```elixir
config :pleroma_job_queue, :scheduler,
enabled: true,
poll_interval: :timer.seconds(10),
store: PleromaJobQueue.Scheduler.Store.ETS
```
* `enabled` - whether the scheduler is enabled (Default: `true`)
* `poll_interval` - how often to check for scheduled jobs in milliseconds (Default: `10_000`)
* `store` - a module that stores scheduled jobs. It should implement the `PleromaJobQueue.Scheduler.Store` behavior. The default is an in-memory store based on ETS tables: `PleromaJobQueue.Scheduler.Store.ETS`.
The scheduler allows you to execute jobs at specific time in the future.
By default it uses an in-memory ETS table which means the jobs won't be available after restart.
""" """
@doc """ @doc """
...@@ -42,10 +58,10 @@ defmodule PleromaJobQueue do ...@@ -42,10 +58,10 @@ defmodule PleromaJobQueue do
## Arguments ## Arguments
- `queue_name` - a queue name(must be specified in the config). * `queue_name` - a queue name(must be specified in the config).
- `mod` - a worker module (must have `perform` function). * `mod` - a worker module (must have `perform` function).
- `args` - a list of arguments for the `perform` function of the worker module. * `args` - a list of arguments for the `perform` function of the worker module.
- `priority` - a job priority (`1` by default). The higher number has a lower priority. * `priority` - a job priority (`1` by default). The higher number has a lower priority.
## Examples ## Examples
...@@ -81,6 +97,59 @@ defmodule PleromaJobQueue do ...@@ -81,6 +97,59 @@ defmodule PleromaJobQueue do
end end
end end
@doc """
Schedules a job to be enqueued at specific time in the future.
## Arguments
* `timestamp` - a unix timestamp in milliseconds
* `queue_name` - a queue name (must be specified in the config).
* `mod` - a worker module (must have `perform` function).
* `args` - a list of arguments for the `perform` function of the worker module.
* `priority` - a job priority (`1` by default). The higher number has a lower priority.
## Examples
Enqueue `MyWorker.perform/0` at specific time:
iex> time = DateTime.to_unix(DateTime.utc_now(), :millisecond)
iex> enqueue_at(time, :example_queue, MyWorker)
:ok
"""
@spec enqueue_at(pos_integer, atom(), module(), [any()], non_neg_integer()) :: any()
def enqueue_at(timestamp, queue_name, mod, args \\ [], priority \\ 1) do
if enabled?() do
GenServer.cast(
PleromaJobQueue.Scheduler,
{:enqueue_at, timestamp, queue_name, mod, args, priority}
)
end
end
@doc """
Schedules a job to be enqueued after the given offset in milliseconds.
## Arguments
* `offset` - an offset from now in milliseconds
* `queue_name` - a queue name (must be specified in the config).
* `mod` - a worker module (must have `perform` function).
* `args` - a list of arguments for the `perform` function of the worker module.
* `priority` - a job priority (`1` by default). The higher number has a lower priority.
## Examples
Enqueue `MyWorker.perform/0` after 10 seconds:
iex> enqueue_in(:timer.seconds(10), :example_queue, MyWorker)
:ok
"""
@spec enqueue_in(non_neg_integer(), atom(), module(), [any()], non_neg_integer()) :: any()
def enqueue_in(offset, queue_name, mod, args \\ [], priority \\ 1) do
now = :os.system_time(:millisecond)
enqueue_at(now + offset, queue_name, mod, args, priority)
end
@doc """ @doc """
Returns a maximum concurrent jobs for a given queue name. Returns a maximum concurrent jobs for a given queue name.
""" """
...@@ -92,5 +161,5 @@ defmodule PleromaJobQueue do ...@@ -92,5 +161,5 @@ defmodule PleromaJobQueue do
|> Keyword.get(queue_name, 1) |> Keyword.get(queue_name, 1)
end end
defp enabled?(), do: not Application.get_env(:pleroma_job_queue, :disabled, false) def enabled?(), do: not Application.get_env(:pleroma_job_queue, :disabled, false)
end end
...@@ -14,7 +14,8 @@ defmodule PleromaJobQueue.Application do ...@@ -14,7 +14,8 @@ defmodule PleromaJobQueue.Application do
children = [ children = [
# Starts a worker by calling: PleromaJobQueue.Worker.start_link(arg) # Starts a worker by calling: PleromaJobQueue.Worker.start_link(arg)
# {PleromaJobQueue.Worker, arg} # {PleromaJobQueue.Worker, arg}
PleromaJobQueue.Worker PleromaJobQueue.Worker,
PleromaJobQueue.Scheduler
] ]
# See https://hexdocs.pm/elixir/Supervisor.html # See https://hexdocs.pm/elixir/Supervisor.html
......
# PleromaJobQueue: A lightweight job queue
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule PleromaJobQueue.Scheduler do
@moduledoc """
Scheduler.
The workflow is as follows:
* Add jobs to the store.
* Every N ms, check for expired jobs and send them to the execution queue.
"""
use GenServer
@poll_interval :pleroma_job_queue
|> Application.get_env(:scheduler, [])
|> Keyword.get(:poll_interval, :timer.seconds(10))
@store :pleroma_job_queue
|> Application.get_env(:scheduler, [])
|> Keyword.get(:store, PleromaJobQueue.Scheduler.Store.ETS)
def start_link([]) do
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
end
@spec init(any()) :: {:ok, :ets.tid()} | :ignore
@impl true
def init(_) do
with true <- PleromaJobQueue.enabled?() and enabled?(),
{:ok, state} = @store.init([]) do
schedule_next(:poll)
{:ok, state}
else
_ -> :ignore
end
end
@impl true
def handle_cast({:enqueue_at, timestamp, queue_name, mod, args, priority}, state) do
job = %{
queue_name: queue_name,
mod: mod,
args: args,
priority: priority
}
@store.insert(state, timestamp, job)
{:noreply, state}
end
@impl true
def handle_call({:get_all}, _, state) do
result = @store.all(state)
{:reply, result, state}
end
def get_all do
GenServer.call(__MODULE__, {:get_all})
end
@impl true
def handle_info(:poll, state) do
now = :os.system_time(:millisecond)
@store.drain(state, now)
schedule_next(:poll)
{:noreply, state}
end
defp schedule_next(:poll) do
Process.send_after(self(), :poll, @poll_interval)
end
def enabled? do
:pleroma_job_queue
|> Application.get_env(:scheduler, [])
|> Keyword.get(:enabled, false)
end
end
# PleromaJobQueue: A lightweight job queue
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule PleromaJobQueue.Scheduler.Store do
@moduledoc """
Defines an interface for the store
"""
@doc """
Initialize the store.
"""
@callback init(options :: list) :: {:ok, store :: any()} | {:error, error :: any()}
@doc """
Insert a scheduled job.
* `store` - a representation of the store
* `timestamp` - a unix timestamp when the job should be enqueued to the execution queue.
* `job` - a map that represents the job
"""
@callback insert(store :: any(), timestamp :: pos_integer, job :: map) :: :ok | {:error, any()}
@doc """
Delete the jobs that are expired and send them to the execution queue.
* `store` - a representation of the store.
* `timestamp` - a unix timestamp past which the jobs should be considered expired.
"""
@callback drain(store :: any(), timestamp :: pos_integer) :: any()
@doc """
Retrieve all the jobs currently present in the store.
"""
@callback all(store :: any()) :: list
end
# PleromaJobQueue: A lightweight job queue
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule PleromaJobQueue.Scheduler.Store.ETS do
@moduledoc """
Implements a store using ETS.
"""
alias PleromaJobQueue.Scheduler.Store
@behaviour Store
@impl true
def init(_) do
index_table = :ets.new(Module.concat([__MODULE__, IndexTable]), [:ordered_set])
entry_table = :ets.new(Module.concat([__MODULE__, EntryTable]), [:duplicate_bag])
{:ok, {index_table, entry_table}}
end
@impl true
def insert({index_table, entry_table}, timestamp, job) do
:ets.insert(index_table, {timestamp, :const})
:ets.insert(entry_table, {timestamp, job})
:ok
end
@impl true
def drain({index_table, entry_table} = state, now) do
first = :ets.first(index_table)
cond do
first == :"$end_of_table" ->
:noop
first <= now ->
:ets.delete(index_table, first)
entry_table
|> :ets.take(first)
|> Enum.each(fn {_, job} ->
PleromaJobQueue.enqueue(job.queue_name, job.mod, job.args, job.priority)
end)
drain(state, now)
true ->
:noop
end
end
@impl true
def all({_index_table, entry_table}) do
:ets.tab2list(entry_table)
end
end
# PleromaJobQueue: A lightweight job queue
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule PleromaJobQueue.SchedulerTest do
use ExUnit.Case
defmodule(TestWorker, do: def(perform(:test_job, _, _), do: :ok))
alias PleromaJobQueue.Scheduler
@queue_name :testing
test "enqueue_at" do
time1 = :os.system_time(:millisecond) - :timer.seconds(4)
time2 = :os.system_time(:millisecond) + :timer.seconds(4)
time3 = :os.system_time(:millisecond) + :timer.seconds(14)
PleromaJobQueue.enqueue_at(time1, @queue_name, Scheduler, [:test_job, :foo, :bar], 1)
PleromaJobQueue.enqueue_at(time2, @queue_name, Scheduler, [:test_job, :foo, :bar], 2)
PleromaJobQueue.enqueue_at(time3, @queue_name, Scheduler, [:test_job, :foo, :bar], 3)
PleromaJobQueue.enqueue_at(time3, @queue_name, Scheduler, [:test_job, :bar, :foo], 3)
pid = Process.whereis(Scheduler)
Process.send(pid, :poll, [])
expected1 =
{time1,
%{
args: [:test_job, :foo, :bar],
mod: Scheduler,
priority: 1,
queue_name: :testing
}}
expected2 =
{time2,
%{
args: [:test_job, :foo, :bar],
mod: Scheduler,
priority: 2,
queue_name: :testing
}}
expected3 =
{time3,
%{
args: [:test_job, :foo, :bar],
mod: Scheduler,
priority: 3,
queue_name: :testing
}}
expected4 =
{time3,
%{
args: [:test_job, :bar, :foo],
mod: Scheduler,
priority: 3,
queue_name: :testing
}}
result = Scheduler.get_all()
refute expected1 in result
assert expected2 in result
assert expected3 in result
assert expected4 in result
end
end
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
defmodule PleromaJobQueueTest do defmodule PleromaJobQueueTest do
use ExUnit.Case use ExUnit.Case
alias PleromaJobQueue.Scheduler
defmodule Worker do defmodule Worker do
defp pid, do: Application.get_env(:pleroma_job_queue, :test_pid) defp pid, do: Application.get_env(:pleroma_job_queue, :test_pid)
...@@ -31,6 +32,35 @@ defmodule PleromaJobQueueTest do ...@@ -31,6 +32,35 @@ defmodule PleromaJobQueueTest do
assert_receive {:test_job, {:foo, :bar}} assert_receive {:test_job, {:foo, :bar}}
end end
test "enqueue_at/5" do
set_pid()
future = :os.system_time(:millisecond) + :timer.seconds(4)
past = :os.system_time(:millisecond) - :timer.seconds(4)
:ok = PleromaJobQueue.enqueue_at(future, @queue_name, Worker, [:test_job, :future, :bar])
:ok = PleromaJobQueue.enqueue_at(past, @queue_name, Worker, [:test_job, :past, :bar])
pid = Process.whereis(Scheduler)
Process.send(pid, :poll, [])
refute_receive {:test_job, {:future, :bar}}
assert_receive {:test_job, {:past, :bar}}
end
test "enqueue_in/5" do
set_pid()
offset1 = :timer.seconds(4)
offset2 = :timer.seconds(14)
:ok = PleromaJobQueue.enqueue_in(offset1, @queue_name, Worker, [:test_job, :foo, :bar1])
:ok = PleromaJobQueue.enqueue_in(offset2, @queue_name, Worker, [:test_job, :foo, :bar2])
pid = Process.whereis(Scheduler)
Process.send(pid, :poll, [])
result = Scheduler.get_all() |> Enum.map(&elem(&1, 1).args)
assert [:test_job, :foo, :bar1] in result
assert [:test_job, :foo, :bar2] in result
end
test "max_jobs/1" do test "max_jobs/1" do
assert Application.get_env(:pleroma_job_queue, @queue_name, 1) == assert Application.get_env(:pleroma_job_queue, @queue_name, 1) ==
PleromaJobQueue.max_jobs(@queue_name) PleromaJobQueue.max_jobs(@queue_name)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment