Commit 03fdd5fe authored by Egor's avatar Egor

Initial

parents
Pipeline #9570 failed with stage
in 1 minute and 22 seconds
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
# The directory Mix will write compiled artifacts to.
/_build/
# If you run "mix test --cover", coverage assets end up here.
/cover/
# The directory Mix downloads your dependencies sources to.
/deps/
# Where third-party dependencies like ExDoc output generated docs.
/doc/
# Ignore .fetch files in case you like to edit your project deps locally.
/.fetch
# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump
# Also ignore archive artifacts (built via "mix archive.build").
*.ez
# Ignore package tarball (built via "mix hex.build").
pleroma_job_queue-*.tar
# Dialyzer
/priv/plts/*.plt
/priv/plts/*.plt.hash
image: elixir:1.7
cache:
key: ${CI_COMMIT_REF_SLUG}
paths:
- deps
- _build
stages:
- test
- publish
before_script:
- mix local.hex --force
- mix local.rebar --force
- mix deps.get
- mix compile --force
lint:
stage: test
script:
- mix format --check-formatted
unit-testing:
stage: test
coverage: '/(\d+\.\d+\%) \| Total/'
script:
- mix test --trace --preload-modules --cover
analysis:
stage: test
script:
- mix credo --strict --only=warnings,todo,fixme,consistency,readability
This diff is collapsed.
# 🗳 Pleroma Job Queue
> A lightweight job queue.
## Installation
Add `pleroma_job_queue` to your list of dependencies in `mix.exs`:
```elixir
def deps do
[
{:pleroma_job_queue, "~> 0.1.0"}
]
end
```
## Configuration
You need list your queues with max concurrent jobs, like this:
```elixir
config :pleroma_job_queue,
my_queue: 100,
another_queue: 50
```
## Usage
[See documentation](http://hexdocs.pm/pleroma_job_queue)
## Contibuting
TBD
## License
TBD
use Mix.Config
# This configuration is loaded before any dependency and is restricted
# to this project. If another project depends on this project, this
# file won't be loaded nor affect the parent project. For this reason,
# if you want to provide default values for your application for
# third-party users, it should be done in your "mix.exs" file.
config :pleroma_job_queue, testing: 2
# PleromaJobQueue: A lightweight job queue
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule PleromaJobQueue do
@moduledoc """
A lightweight job queue
## Installation
Add `pleroma_job_queue` to your list of dependencies in `mix.exs`:
```elixir
def deps do
[
{:pleroma_job_queue, "~> 0.1.0"}
]
end
```
## Configuration
You need list your queues with max concurrent jobs, like this:
```elixir
config :pleroma_job_queue,
my_queue: 100,
another_queue: 50
```
"""
@doc """
Enqueues a job.
Returns `:ok`.
## Arguments
- `queue_name` - a queue name(must be specified in the config).
- `mod` - a worker module (must have `perform` function).
- `args` - a list of arguments for the `perform` function of the worker module.
- `priority` - a job priority (`1` by default). The higher number has a lower priority.
## Examples
Enqueue `MyWorker.perform/0` with `priority=1`:
iex> PleromaJobQueue.enqueue(:example_queue, MyWorker)
:ok
Enqueue `MyWorker.perform(:job_name)` with `priority=5`:
iex> PleromaJobQueue.enqueue(:example_queue, MyWorker, [:job_name], 5)
:ok
Enqueue `MyWorker.perform(:another_job, data)` with `priority=1`:
iex> data = "foobar"
iex> PleromaJobQueue.enqueue(:example_queue, MyWorker, [:another_job, data])
:ok
Enqueue `MyWorker.perform(:foobar_job, :foo, :bar, 42)` with `priority=1`:
iex> PleromaJobQueue.enqueue(:example_queue, MyWorker, [:foobar_job, :foo, :bar, 42])
:ok
"""
@spec enqueue(atom(), module(), [any()], non_neg_integer()) :: :ok
def enqueue(queue_name, mod, args \\ [], priority \\ 1) do
GenServer.cast(PleromaJobQueue.Worker, {:enqueue, queue_name, mod, args, priority})
end
@doc """
Returns a maximum concurrent jobs for a given queue name.
"""
@spec max_jobs(atom()) :: non_neg_integer()
def max_jobs(queue_name) do
Application.get_env(:pleroma_job_queue, queue_name, 0)
end
end
# PleromaJobQueue: A lightweight job queue
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule PleromaJobQueue.Application do
# See https://hexdocs.pm/elixir/Application.html
# for more information on OTP Applications
@moduledoc false
use Application
def start(_type, _args) do
# List all child processes to be supervised
children = [
# Starts a worker by calling: PleromaJobQueue.Worker.start_link(arg)
# {PleromaJobQueue.Worker, arg}
PleromaJobQueue.Worker
]
# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: PleromaJobQueue.Supervisor]
Supervisor.start_link(children, opts)
end
end
defmodule PleromaJobQueue.State do
defstruct queues: %{}, refs: %{}
@type t :: %__MODULE__{
queues: %{optional(atom()) => {running_jobs(), queue()}},
refs: %{optional(reference()) => atom()}
}
@type job :: {module(), [any()]}
@type running_jobs :: :sets.set(reference())
@type queue :: [
%{
item: job(),
priority: non_neg_integer()
}
]
end
# PleromaJobQueue: A lightweight job queue
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule PleromaJobQueue.Worker do
@moduledoc """
Queue Worker
"""
use GenServer
import PleromaJobQueue, only: [max_jobs: 1]
alias PleromaJobQueue.State
def start_link([]) do
GenServer.start_link(__MODULE__, %State{}, name: __MODULE__)
end
@impl true
@spec init(State.t()) :: {:ok, State.t()}
def init(state) do
queues =
:pleroma_job_queue
|> Application.get_all_env()
|> Enum.map(fn {name, _} -> create_queue(name) end)
|> Enum.into(%{})
|> Map.merge(state.queues)
{:ok, Map.put(state, :queues, queues)}
end
@impl true
def handle_cast({:enqueue, queue_name, mod, args, priority}, state) do
{running_jobs, queue} = state.queues[queue_name]
queue = enqueue_sorted(queue, {mod, args}, priority)
state =
state
|> update_queue(queue_name, {running_jobs, queue})
|> maybe_start_job(queue_name, running_jobs, queue)
{:noreply, state}
end
@impl true
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
queue_name = state.refs[ref]
{running_jobs, queue} = state.queues[queue_name]
running_jobs = :sets.del_element(ref, running_jobs)
state =
state
|> remove_ref(ref)
|> update_queue(queue_name, {running_jobs, queue})
|> maybe_start_job(queue_name, running_jobs, queue)
{:noreply, state}
end
@spec maybe_start_job(State.t(), atom(), State.running_jobs(), State.queue()) :: State.t()
def maybe_start_job(state, _queue_name, _running_jobs, []), do: state
def maybe_start_job(state, queue_name, running_jobs, queue) do
if :sets.size(running_jobs) < max_jobs(queue_name) do
{{mod, args}, queue} = queue_pop(queue)
{:ok, pid} = Task.start(fn -> apply(mod, :perform, args) end)
mref = Process.monitor(pid)
state
|> add_ref(queue_name, mref)
|> update_queue(queue_name, {:sets.add_element(mref, running_jobs), queue})
else
state
end
end
@spec create_queue(atom()) :: {atom(), {State.running_jobs(), State.queue()}}
def create_queue(queue_name) do
{queue_name, {:sets.new(), []}}
end
@spec enqueue_sorted(State.queue(), State.job(), non_neg_integer()) :: State.queue()
def enqueue_sorted(queue, element, priority) do
Enum.sort_by([%{item: element, priority: priority} | queue], & &1.priority)
end
@spec queue_pop(State.queue()) :: {State.job(), State.queue()}
def queue_pop([%{item: element} | queue]) do
{element, queue}
end
defp add_ref(state, queue_name, ref) do
refs = Map.put(state.refs, ref, queue_name)
%State{state | refs: refs}
end
defp remove_ref(state, ref) do
refs = Map.delete(state.refs, ref)
%State{state | refs: refs}
end
defp update_queue(state, queue_name, data) do
queues = Map.put(state.queues, queue_name, data)
%State{state | queues: queues}
end
end
defmodule PleromaJobQueue.MixProject do
use Mix.Project
def project do
[
app: :pleroma_job_queue,
version: "0.1.0",
elixir: "~> 1.7",
start_permanent: Mix.env() == :prod,
deps: deps(),
# Docs
name: "PleromaJobQueue",
source_url: "https://git.pleroma.social/pleroma/pleroma_job_queue",
source_url_pattern:
"https://git.pleroma.social/pleroma/pleroma_job_queue/blob/master/%{path}#L%{line}",
homepage_url: "https://git.pleroma.social/pleroma/pleroma_job_queue",
docs: [
main: "PleromaJobQueue"
]
]
end
# Run "mix help compile.app" to learn about applications.
def application do
[
extra_applications: [:logger],
mod: {PleromaJobQueue.Application, []}
]
end
# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:credo, "~> 1.0.0", only: [:dev, :test], runtime: false},
{:ex_doc, "~> 0.19", only: :dev, runtime: false},
{:dialyxir, "~> 1.0.0-rc.5", only: [:dev], runtime: false}
]
end
end
%{
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm"},
"credo": {:hex, :credo, "1.0.4", "d2214d4cc88c07f54004ffd5a2a27408208841be5eca9f5a72ce9e8e835f7ede", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"},
"dialyxir": {:hex, :dialyxir, "1.0.0-rc.5", "c9c2379c59cf2dfc74690f48866e33ffb55ff660e5e02405c14614d204efdc4f", [:mix], [{:erlex, "~> 0.2.1", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm"},
"earmark": {:hex, :earmark, "1.3.2", "b840562ea3d67795ffbb5bd88940b1bed0ed9fa32834915125ea7d02e35888a5", [:mix], [], "hexpm"},
"erlex": {:hex, :erlex, "0.2.1", "cee02918660807cbba9a7229cae9b42d1c6143b768c781fa6cee1eaf03ad860b", [:mix], [], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.19.3", "3c7b0f02851f5fc13b040e8e925051452e41248f685e40250d7e40b07b9f8c10", [:mix], [{:earmark, "~> 1.2", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.10", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"},
"jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"},
"makeup": {:hex, :makeup, "0.8.0", "9cf32aea71c7fe0a4b2e9246c2c4978f9070257e5c9ce6d4a28ec450a839b55f", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"},
"makeup_elixir": {:hex, :makeup_elixir, "0.13.0", "be7a477997dcac2e48a9d695ec730b2d22418292675c75aa2d34ba0909dcdeda", [:mix], [{:makeup, "~> 0.8", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"},
}
# PleromaJobQueue: A lightweight job queue
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule PleromaJobQueue.WorkerTest do
use ExUnit.Case
defmodule(TestWorker, do: def(perform(:test_job, _, _), do: :ok))
alias PleromaJobQueue.Worker
alias PleromaJobQueue.State
@queue_name :testing
setup do
state = %State{
queues: Enum.into([Worker.create_queue(@queue_name)], %{}),
refs: %{}
}
[state: state]
end
test "creates queue" do
queue = Worker.create_queue(:foobar)
assert {:foobar, set} = queue
assert :set == set |> elem(0) |> elem(0)
end
test "enqueues an element according to priority" do
queue = [%{item: 1, priority: 2}]
new_queue = Worker.enqueue_sorted(queue, 2, 1)
assert new_queue == [%{item: 2, priority: 1}, %{item: 1, priority: 2}]
new_queue = Worker.enqueue_sorted(queue, 2, 3)
assert new_queue == [%{item: 1, priority: 2}, %{item: 2, priority: 3}]
end
test "pop first item" do
queue = [%{item: 2, priority: 1}, %{item: 1, priority: 2}]
assert {2, [%{item: 1, priority: 2}]} = Worker.queue_pop(queue)
end
test "enqueue a job", %{state: state} do
assert {:noreply, new_state} =
Worker.handle_cast(
{:enqueue, @queue_name, TestWorker, [:test_job, :foo, :bar], 3},
state
)
assert %{queues: %{testing: {running_jobs, []}}, refs: _} = new_state
assert :sets.size(running_jobs) == 1
assert [ref] = :sets.to_list(running_jobs)
assert %{refs: %{^ref => @queue_name}} = new_state
end
test "max jobs setting", %{state: state} do
max_jobs = PleromaJobQueue.max_jobs(@queue_name)
{:noreply, state} =
Enum.reduce(1..(max_jobs + 1), {:noreply, state}, fn _, {:noreply, state} ->
Worker.handle_cast(
{:enqueue, @queue_name, TestWorker, [:test_job, :foo, :bar], 3},
state
)
end)
assert %{
queues: %{
testing:
{running_jobs, [%{item: {TestWorker, [:test_job, :foo, :bar]}, priority: 3}]}
}
} = state
assert :sets.size(running_jobs) == max_jobs
end
test "remove job after it finished", %{state: state} do
{:noreply, new_state} =
Worker.handle_cast(
{:enqueue, @queue_name, TestWorker, [:test_job, :foo, :bar], 3},
state
)
%{queues: %{testing: {running_jobs, []}}} = new_state
[ref] = :sets.to_list(running_jobs)
assert {:noreply, %{queues: %{testing: {running_jobs, []}}, refs: %{}}} =
Worker.handle_info({:DOWN, ref, :process, nil, nil}, new_state)
assert :sets.size(running_jobs) == 0
end
end
# PleromaJobQueue: A lightweight job queue
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule PleromaJobQueueTest do
use ExUnit.Case
defmodule Worker do
defp pid, do: Application.get_env(:test, :pid)
def perform, do: send(pid(), {:test, :no_args})
def perform(:skip), do: nil
def perform(:test_job), do: send(pid(), {:test, :test_job})
def perform(:test_job, a, b), do: send(pid(), {:test, {a, b}})
def perform(:priority, priority), do: send(pid(), {:priority, priority})
end
@queue_name :testing
test "enqueue/4" do
set_pid()
assert :ok == PleromaJobQueue.enqueue(@queue_name, Worker)
assert :no_args == receive_result(:test)
assert :ok == PleromaJobQueue.enqueue(@queue_name, Worker, [:test_job])
assert :test_job == receive_result(:test)
assert :ok == PleromaJobQueue.enqueue(@queue_name, Worker, [:test_job, :foo, :bar])
assert {:foo, :bar} == receive_result(:test)
end
test "max_jobs/1" do
assert Application.get_env(:pleroma_job_queue, @queue_name, 0) ==
PleromaJobQueue.max_jobs(@queue_name)
end
test "priority" do
set_pid()
PleromaJobQueue.enqueue(@queue_name, Worker, [:skip], 10)
PleromaJobQueue.enqueue(@queue_name, Worker, [:skip], 11)
PleromaJobQueue.enqueue(@queue_name, Worker, [:priority, 12], 12)
PleromaJobQueue.enqueue(@queue_name, Worker, [:priority, 13], 13)
PleromaJobQueue.enqueue(@queue_name, Worker, [:priority, 20], 20)
PleromaJobQueue.enqueue(@queue_name, Worker, [:priority, 14], 14)
PleromaJobQueue.enqueue(@queue_name, Worker, [:priority, 15], 15)
PleromaJobQueue.enqueue(@queue_name, Worker, [:priority, 16], 16)
PleromaJobQueue.enqueue(@queue_name, Worker, [:priority, 17], 17)
PleromaJobQueue.enqueue(@queue_name, Worker, [:priority, 18], 18)
PleromaJobQueue.enqueue(@queue_name, Worker, [:priority, 19], 19)
PleromaJobQueue.enqueue(@queue_name, Worker, [:priority, 19], 19)
PleromaJobQueue.enqueue(@queue_name, Worker, [:priority, 1], 1)
assert 1 == receive_result(:priority)
end
test "README install version check" do
app = Keyword.get(Mix.Project.config(), :app)
app_version = app |> Application.spec(:vsn) |> to_string()
readme = File.read!("README.md")
[_, readme_versions] = Regex.run(~r/{:#{app}, "(.+)"}/, readme)
assert Version.match?(
app_version,
readme_versions
),
"""
Install version constraint in README.md does not match to current app version.
Current App Version: #{app_version}
Readme Install Versions: #{readme_versions}
"""
end
test "PleromaJobQueue install version check" do
app = Keyword.get(Mix.Project.config(), :app)
app_version = app |> Application.spec(:vsn) |> to_string()
readme = File.read!("lib/pleroma_job_queue.ex")
[_, readme_versions] = Regex.run(~r/{:#{app}, "(.+)"}/, readme)
assert Version.match?(
app_version,
readme_versions
),
"""
Install version constraint in PleromaJobQueue.ex does not match to current app version.
Current App Version: #{app_version}
PleromaJobQueue Install Versions: #{readme_versions}
"""
end
defp receive_result(name) do
receive do
{^name, value} -> value
end
end
defp set_pid, do: Application.put_env(:test, :pid, self())
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment