Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
Pleroma
pleroma
Commits
00a71831
Commit
00a71831
authored
May 05, 2017
by
lain
Browse files
Basic queue.
parent
2d9fdbcc
Changes
2
Hide whitespace changes
Inline
Side-by-side
lib/pleroma/application.ex
View file @
00a71831
...
...
@@ -18,7 +18,8 @@ def start(_type, _args) do
default_ttl:
25000
,
ttl_interval:
1000
,
limit:
2500
]])
]]),
worker
(
Pleroma
.
Web
.
Federator
,
[])
]
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
...
...
lib/pleroma/web/federator/federator.ex
View file @
00a71831
defmodule
Pleroma
.
Web
.
Federator
do
use
GenServer
alias
Pleroma
.
User
alias
Pleroma
.
Web
.
WebFinger
require
Logger
@websub
Application
.
get_env
(
:pleroma
,
:websub
)
@max_jobs
10
def
start_link
do
GenServer
.
start_link
(
__MODULE__
,
{
:sets
.
new
(),
:queue
.
new
()},
name:
__MODULE__
)
end
def
handle
(
:publish
,
activity
)
do
Logger
.
debug
(
fn
->
"Running publish for
#{
activity
.
data
[
"id"
]
}
"
end
)
...
...
@@ -28,11 +34,38 @@ def handle(type, payload) do
end
def
enqueue
(
type
,
payload
)
do
# for now, just run immediately in a new process.
if
Mix
.
env
==
:test
do
handle
(
type
,
payload
)
else
spawn
(
fn
->
handle
(
type
,
payload
)
end
)
GenServer
.
cast
(
__MODULE__
,
{
:enqueue
,
type
,
payload
})
end
end
def
maybe_start_job
(
running_jobs
,
queue
)
do
if
(
:sets
.
size
(
running_jobs
)
<
@max_jobs
)
&&
!
:queue
.
is_empty
(
queue
)
do
{{
:value
,
{
type
,
payload
}},
queue
}
=
:queue
.
out
(
queue
)
{
:ok
,
pid
}
=
Task
.
start
(
fn
->
handle
(
type
,
payload
)
end
)
mref
=
Process
.
monitor
(
pid
)
{
:sets
.
add_element
(
mref
,
running_jobs
),
queue
}
else
{
running_jobs
,
queue
}
end
end
def
handle_cast
({
:enqueue
,
type
,
payload
},
{
running_jobs
,
queue
})
do
queue
=
:queue
.
in
({
type
,
payload
},
queue
)
{
running_jobs
,
queue
}
=
maybe_start_job
(
running_jobs
,
queue
)
{
:noreply
,
{
running_jobs
,
queue
}}
end
def
handle_info
({
:DOWN
,
ref
,
:process
,
_pid
,
_reason
},
{
running_jobs
,
queue
})
do
running_jobs
=
:sets
.
del_element
(
ref
,
running_jobs
)
{
running_jobs
,
queue
}
=
maybe_start_job
(
running_jobs
,
queue
)
{
:noreply
,
{
running_jobs
,
queue
}}
end
def
handle_cast
(
m
,
state
)
do
IO
.
inspect
(
"Unknown:
#{
inspect
(
m
)
}
,
#{
inspect
(
state
)
}
"
)
{
:noreply
,
state
}
end
end
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment