scheduler.ex 1.87 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
# PleromaJobQueue: A lightweight job queue
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule PleromaJobQueue.Scheduler do
  @moduledoc """
  Scheduler.

  The workflow is as follows:

  * Add jobs to the store.
  * Every N ms, check for expired jobs and send them to the execution queue.

  Settings are read at runtime from the `:scheduler` key of the
  `:pleroma_job_queue` application environment:

  * `:enabled` - whether the scheduler process starts (default: `false`).
  * `:poll_interval` - delay between two polls, in milliseconds
    (default: 10 seconds).
  * `:store` - module that holds the scheduled jobs
    (default: `PleromaJobQueue.Scheduler.Store.ETS`).
  """
  use GenServer

  @default_poll_interval :timer.seconds(10)
  @default_store PleromaJobQueue.Scheduler.Store.ETS

  @doc """
  Starts the scheduler as a process registered under the module name.
  """
  @spec start_link([]) :: GenServer.on_start()
  def start_link([]) do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  # The state is whatever the configured store returns from `init/1`.
  # Returns `:ignore` when the queue or the scheduler is disabled, or when the
  # store does not answer with `{:ok, state}`.
  @spec init(any()) :: {:ok, term()} | :ignore
  @impl true
  def init(_) do
    with true <- PleromaJobQueue.enabled?() and enabled?(),
         {:ok, state} <- store().init([]) do
      schedule_next(:poll)
      {:ok, state}
    else
      _ -> :ignore
    end
  end

  # Stores a job under `timestamp`; the timestamp is later compared against
  # the current time by the poll handler through the store's `drain/2`.
  @impl true
  def handle_cast({:enqueue_at, timestamp, queue_name, mod, args, priority}, state) do
    job = %{
      queue_name: queue_name,
      mod: mod,
      args: args,
      priority: priority
    }

    store().insert(state, timestamp, job)
    {:noreply, state}
  end

  @impl true
  def handle_call({:get_all}, _, state) do
    result = store().all(state)
    {:reply, result, state}
  end

  @doc """
  Returns the result of the configured store's `all/1` for the scheduler state.

  This is a synchronous call to the registered scheduler process, so it exits
  when the scheduler is not running.
  """
  @spec get_all() :: term()
  def get_all do
    GenServer.call(__MODULE__, {:get_all})
  end

  # Periodic tick: hands the current OS time (ms) to the store's `drain/2`
  # and then arms the next tick.
  @impl true
  def handle_info(:poll, state) do
    now = :os.system_time(:millisecond)
    store().drain(state, now)
    schedule_next(:poll)
    {:noreply, state}
  end

  defp schedule_next(:poll) do
    Process.send_after(self(), :poll, poll_interval())
  end

  @doc """
  Returns the `:enabled` setting of the scheduler (`false` when unset).
  """
  @spec enabled?() :: boolean()
  def enabled? do
    Keyword.get(config(), :enabled, false)
  end

  # Configuration is looked up on every use rather than captured in module
  # attributes, so values set after compilation (runtime config, tests) are
  # honoured, the same way `enabled?/0` has always behaved.
  defp poll_interval, do: Keyword.get(config(), :poll_interval, @default_poll_interval)

  defp store, do: Keyword.get(config(), :store, @default_store)

  defp config, do: Application.get_env(:pleroma_job_queue, :scheduler, [])
end