Commit 849f4047 authored by minibikini's avatar minibikini

add an option to disable async

parents 9d210fbc ce360f3f
Pipeline #9636 passed with stage
in 54 seconds
......@@ -4,7 +4,7 @@
[![coverage report](https://git.pleroma.social/pleroma/pleroma_job_queue/badges/master/coverage.svg)](https://git.pleroma.social/pleroma/pleroma_job_queue/commits/master)
[![Hex pm](https://img.shields.io/hexpm/v/pleroma_job_queue.svg?style=flat)](https://hex.pm/packages/pleroma_job_queue)
> A lightweight job queue.
> A lightweight job queue
## Installation
......@@ -13,17 +13,17 @@ Add `pleroma_job_queue` to your list of dependencies in `mix.exs`:
```elixir
def deps do
[
{:pleroma_job_queue, "~> 0.1.0"}
{:pleroma_job_queue, "~> 0.2.0"}
]
end
```
## Configuration
You need to list your queues with max concurrent jobs like this:
List your queues with max concurrent jobs like this:
```elixir
config :pleroma_job_queue,
config :pleroma_job_queue, :queues,
my_queue: 100,
another_queue: 50
```
......
......@@ -5,5 +5,3 @@ use Mix.Config
# file won't be loaded nor affect the parent project. For this reason,
# if you want to provide default values for your application for
# third-party users, it should be done in your "mix.exs" file.
config :pleroma_job_queue, testing: 2
......@@ -13,21 +13,26 @@ defmodule PleromaJobQueue do
```elixir
def deps do
[
{:pleroma_job_queue, "~> 0.1.0"}
{:pleroma_job_queue, "~> 0.2.0"}
]
end
```
## Configuration
You need to list your queues with max concurrent jobs like this:
List your queues with max concurrent jobs like this:
```elixir
config :pleroma_job_queue,
config :pleroma_job_queue, :queues,
my_queue: 100,
another_queue: 50
```
You can disable `pleroma_job_queue` if you need to your jobs synchronously:
```elixir
config :pleroma_job_queue, disabled: true
```
"""
@doc """
......@@ -67,9 +72,13 @@ defmodule PleromaJobQueue do
"""
@spec enqueue(atom(), module(), [any()], non_neg_integer()) :: :ok
@spec enqueue(atom(), module(), [any()], non_neg_integer()) :: any()
def enqueue(queue_name, mod, args \\ [], priority \\ 1) do
GenServer.cast(PleromaJobQueue.Worker, {:enqueue, queue_name, mod, args, priority})
if enabled?() do
GenServer.cast(PleromaJobQueue.Worker, {:enqueue, queue_name, mod, args, priority})
else
apply(mod, :perform, args)
end
end
@doc """
......@@ -78,6 +87,10 @@ defmodule PleromaJobQueue do
@spec max_jobs(atom()) :: non_neg_integer()
def max_jobs(queue_name) do
Application.get_env(:pleroma_job_queue, queue_name, 0)
:pleroma_job_queue
|> Application.get_env(:queues, [])
|> Keyword.get(queue_name, 1)
end
defp enabled?(), do: not Application.get_env(:pleroma_job_queue, :disabled, false)
end
......@@ -17,20 +17,20 @@ defmodule PleromaJobQueue.Worker do
@impl true
@spec init(State.t()) :: {:ok, State.t()}
def init(state) do
def init(%State{queues: queues} = state) do
queues =
:pleroma_job_queue
|> Application.get_all_env()
|> Enum.map(fn {name, _} -> create_queue(name) end)
|> Application.get_env(:queues, [])
|> Enum.map(fn {name, _} -> {name, create_queue()} end)
|> Enum.into(%{})
|> Map.merge(state.queues)
|> Map.merge(queues)
{:ok, Map.put(state, :queues, queues)}
{:ok, %State{state | queues: queues}}
end
@impl true
def handle_cast({:enqueue, queue_name, mod, args, priority}, state) do
{running_jobs, queue} = state.queues[queue_name]
def handle_cast({:enqueue, queue_name, mod, args, priority}, %State{queues: queues} = state) do
{running_jobs, queue} = Map.get(queues, queue_name, create_queue())
queue = enqueue_sorted(queue, {mod, args}, priority)
......@@ -60,9 +60,9 @@ defmodule PleromaJobQueue.Worker do
end
@spec maybe_start_job(State.t(), atom(), State.running_jobs(), State.queue()) :: State.t()
def maybe_start_job(state, _queue_name, _running_jobs, []), do: state
def maybe_start_job(%State{} = state, _queue_name, _running_jobs, []), do: state
def maybe_start_job(state, queue_name, running_jobs, queue) do
def maybe_start_job(%State{} = state, queue_name, running_jobs, queue) do
if :sets.size(running_jobs) < max_jobs(queue_name) do
{{mod, args}, queue} = queue_pop(queue)
{:ok, pid} = Task.start(fn -> apply(mod, :perform, args) end)
......@@ -76,9 +76,9 @@ defmodule PleromaJobQueue.Worker do
end
end
@spec create_queue(atom()) :: {atom(), {State.running_jobs(), State.queue()}}
def create_queue(queue_name) do
{queue_name, {:sets.new(), []}}
@spec create_queue() :: {State.running_jobs(), State.queue()}
def create_queue do
{:sets.new(), []}
end
@spec enqueue_sorted(State.queue(), State.job(), non_neg_integer()) :: State.queue()
......@@ -91,18 +91,15 @@ defmodule PleromaJobQueue.Worker do
{element, queue}
end
defp add_ref(state, queue_name, ref) do
refs = Map.put(state.refs, ref, queue_name)
%State{state | refs: refs}
defp add_ref(%State{refs: refs} = state, queue_name, ref) do
%State{state | refs: Map.put(refs, ref, queue_name)}
end
defp remove_ref(state, ref) do
refs = Map.delete(state.refs, ref)
%State{state | refs: refs}
defp remove_ref(%State{refs: refs} = state, ref) do
%State{state | refs: Map.delete(refs, ref)}
end
defp update_queue(state, queue_name, data) do
queues = Map.put(state.queues, queue_name, data)
%State{state | queues: queues}
defp update_queue(%State{queues: queues} = state, queue_name, data) do
%State{state | queues: Map.put(queues, queue_name, data)}
end
end
......@@ -6,7 +6,7 @@ defmodule PleromaJobQueue.MixProject do
app: :pleroma_job_queue,
name: "Pleroma Job Queue",
description: "A lightweight job queue",
version: "0.1.0",
version: "0.2.0",
elixir: "~> 1.7",
start_permanent: Mix.env() == :prod,
deps: deps(),
......
# PleromaJobQueue: A lightweight job queue
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule PleromaJobQueue.InstallVersionTest do
use ExUnit.Case, async: true
describe("install version check") do
test "README.md" do
assert_version("README.md")
end
test "lib/pleroma_job_queue.ex" do
assert_version("lib/pleroma_job_queue.ex")
end
end
defp assert_version(filename) do
app = Keyword.get(Mix.Project.config(), :app)
app_version = app |> Application.spec(:vsn) |> to_string()
file = File.read!(filename)
[_, file_versions] = Regex.run(~r/{:#{app}, "(.+)"}/, file)
assert Version.match?(
app_version,
file_versions
),
"""
Install version constraint in `#{filename}` does not match to current app version.
Current App Version: #{app_version}
`#{filename}` Install Versions: #{file_versions}
"""
end
end
......@@ -14,18 +14,20 @@ defmodule PleromaJobQueue.WorkerTest do
setup do
state = %State{
queues: Enum.into([Worker.create_queue(@queue_name)], %{}),
queues: Enum.into([Worker.create_queue()], %{}),
refs: %{}
}
[state: state]
end
test "creates queue" do
queue = Worker.create_queue(:foobar)
test "create_queue/1" do
{running_jobs, queue} = Worker.create_queue()
assert {:foobar, set} = queue
assert :set == set |> elem(0) |> elem(0)
assert queue == []
assert :sets.is_set(running_jobs)
assert :sets.is_empty(running_jobs)
assert PleromaJobQueue.max_jobs(:foobar) == 1
end
test "enqueues an element according to priority" do
......
......@@ -6,12 +6,13 @@ defmodule PleromaJobQueueTest do
use ExUnit.Case
defmodule Worker do
defp pid, do: Application.get_env(:test, :pid)
defp pid, do: Application.get_env(:pleroma_job_queue, :test_pid)
def perform, do: send(pid(), {:test, :no_args})
def perform(:skip), do: nil
def perform(:test_job), do: send(pid(), {:test, :test_job})
def perform(:test_job, a, b), do: send(pid(), {:test, {a, b}})
def perform(:sync), do: :sync
def perform(:test_job), do: send(pid(), :test_job)
def perform(:test_job, a, b), do: send(pid(), {:test_job, {a, b}})
def perform(:priority, priority), do: send(pid(), {:priority, priority})
end
......@@ -21,24 +22,29 @@ defmodule PleromaJobQueueTest do
set_pid()
assert :ok == PleromaJobQueue.enqueue(@queue_name, Worker)
assert :no_args == receive_result(:test)
assert_receive {:test, :no_args}
assert :ok == PleromaJobQueue.enqueue(@queue_name, Worker, [:test_job])
assert :test_job == receive_result(:test)
assert_receive :test_job
assert :ok == PleromaJobQueue.enqueue(@queue_name, Worker, [:test_job, :foo, :bar])
assert {:foo, :bar} == receive_result(:test)
assert_receive {:test_job, {:foo, :bar}}
end
test "max_jobs/1" do
assert Application.get_env(:pleroma_job_queue, @queue_name, 0) ==
assert Application.get_env(:pleroma_job_queue, @queue_name, 1) ==
PleromaJobQueue.max_jobs(@queue_name)
end
test "disable" do
Application.put_env(:pleroma_job_queue, :disabled, true)
assert :sync == PleromaJobQueue.enqueue(@queue_name, Worker, [:sync])
Application.put_env(:pleroma_job_queue, :disabled, false)
end
test "priority" do
set_pid()
PleromaJobQueue.enqueue(@queue_name, Worker, [:skip], 10)
PleromaJobQueue.enqueue(@queue_name, Worker, [:skip], 11)
PleromaJobQueue.enqueue(@queue_name, Worker, [:priority, 12], 12)
PleromaJobQueue.enqueue(@queue_name, Worker, [:priority, 13], 13)
......@@ -52,50 +58,9 @@ defmodule PleromaJobQueueTest do
PleromaJobQueue.enqueue(@queue_name, Worker, [:priority, 19], 19)
PleromaJobQueue.enqueue(@queue_name, Worker, [:priority, 1], 1)
assert 1 == receive_result(:priority)
end
test "README install version check" do
app = Keyword.get(Mix.Project.config(), :app)
app_version = app |> Application.spec(:vsn) |> to_string()
readme = File.read!("README.md")
[_, readme_versions] = Regex.run(~r/{:#{app}, "(.+)"}/, readme)
assert Version.match?(
app_version,
readme_versions
),
"""
Install version constraint in README.md does not match to current app version.
Current App Version: #{app_version}
Readme Install Versions: #{readme_versions}
"""
end
test "lib/pleroma_job_queue.ex install version check" do
app = Keyword.get(Mix.Project.config(), :app)
app_version = app |> Application.spec(:vsn) |> to_string()
readme = File.read!("lib/pleroma_job_queue.ex")
[_, readme_versions] = Regex.run(~r/{:#{app}, "(.+)"}/, readme)
assert Version.match?(
app_version,
readme_versions
),
"""
Install version constraint in `lib/pleroma_job_queue.ex` does not match to current app version.
Current App Version: #{app_version}
`lib/pleroma_job_queue.ex` Install Versions: #{readme_versions}
"""
end
defp receive_result(name) do
receive do
{^name, value} -> value
end
assert_receive {:priority, priority}
assert priority == 1
end
defp set_pid, do: Application.put_env(:test, :pid, self())
defp set_pid, do: Application.put_env(:pleroma_job_queue, :test_pid, self())
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment