Commit 0bc3b77c authored by minibikini's avatar minibikini

support queues without config

parent ecd8b1fa
......@@ -17,20 +17,20 @@ defmodule PleromaJobQueue.Worker do
@impl true
@spec init(State.t()) :: {:ok, State.t()}
def init(state) do
def init(%State{queues: queues} = state) do
queues =
:pleroma_job_queue
|> Application.get_all_env()
|> Enum.map(fn {name, _} -> create_queue(name) end)
|> Enum.map(fn {name, _} -> {name, create_queue(name)} end)
|> Enum.into(%{})
|> Map.merge(state.queues)
|> Map.merge(queues)
{:ok, Map.put(state, :queues, queues)}
{:ok, %State{state | queues: queues}}
end
@impl true
def handle_cast({:enqueue, queue_name, mod, args, priority}, state) do
{running_jobs, queue} = state.queues[queue_name]
def handle_cast({:enqueue, queue_name, mod, args, priority}, %State{queues: queues} = state) do
{running_jobs, queue} = Map.get(queues, queue_name, create_queue(queue_name))
queue = enqueue_sorted(queue, {mod, args}, priority)
......@@ -60,9 +60,9 @@ defmodule PleromaJobQueue.Worker do
end
@spec maybe_start_job(State.t(), atom(), State.running_jobs(), State.queue()) :: State.t()
def maybe_start_job(state, _queue_name, _running_jobs, []), do: state
def maybe_start_job(%State{} = state, _queue_name, _running_jobs, []), do: state
def maybe_start_job(state, queue_name, running_jobs, queue) do
def maybe_start_job(%State{} = state, queue_name, running_jobs, queue) do
if :sets.size(running_jobs) < max_jobs(queue_name) do
{{mod, args}, queue} = queue_pop(queue)
{:ok, pid} = Task.start(fn -> apply(mod, :perform, args) end)
......@@ -76,9 +76,10 @@ defmodule PleromaJobQueue.Worker do
end
end
@spec create_queue(atom()) :: {atom(), {State.running_jobs(), State.queue()}}
@spec create_queue(atom()) :: {State.running_jobs(), State.queue()}
def create_queue(queue_name) do
{queue_name, {:sets.new(), []}}
unless max_jobs(queue_name), do: Application.put_env(:pleroma_job_queue, queue_name, 1)
{:sets.new(), []}
end
@spec enqueue_sorted(State.queue(), State.job(), non_neg_integer()) :: State.queue()
......@@ -91,18 +92,15 @@ defmodule PleromaJobQueue.Worker do
{element, queue}
end
defp add_ref(state, queue_name, ref) do
refs = Map.put(state.refs, ref, queue_name)
%State{state | refs: refs}
defp add_ref(%State{refs: refs} = state, queue_name, ref) do
%State{state | refs: Map.put(refs, ref, queue_name)}
end
defp remove_ref(state, ref) do
refs = Map.delete(state.refs, ref)
%State{state | refs: refs}
defp remove_ref(%State{refs: refs} = state, ref) do
%State{state | refs: Map.delete(refs, ref)}
end
defp update_queue(state, queue_name, data) do
queues = Map.put(state.queues, queue_name, data)
%State{state | queues: queues}
defp update_queue(%State{queues: queues} = state, queue_name, data) do
%State{state | queues: Map.put(queues, queue_name, data)}
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment