Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
Pleroma
pleroma
Commits
bd5bdc4c
Commit
bd5bdc4c
authored
Nov 11, 2017
by
lain
Browse files
MastoAPI: Basic streaming.
parent
a1923d20
Pipeline
#150
passed with stage
in 1 minute and 56 seconds
Changes
6
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
lib/pleroma/application.ex
View file @
bd5bdc4c
...
...
@@ -19,7 +19,8 @@ def start(_type, _args) do
ttl_interval:
1000
,
limit:
2500
]]),
worker
(
Pleroma
.
Web
.
Federator
,
[])
worker
(
Pleroma
.
Web
.
Federator
,
[]),
worker
(
Pleroma
.
Web
.
Streamer
,
[])
]
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
...
...
lib/pleroma/web/activity_pub/activity_pub.ex
View file @
bd5bdc4c
...
...
@@ -22,6 +22,9 @@ def create(to, actor, context, object, additional \\ %{}, published \\ nil, loca
with
create_data
<-
make_create_data
(%{
to:
to
,
actor:
actor
,
published:
published
,
context:
context
,
object:
object
},
additional
),
{
:ok
,
activity
}
<-
insert
(
create_data
,
local
),
:ok
<-
maybe_federate
(
activity
)
do
if
activity
.
data
[
"type"
]
==
"Create"
and
Enum
.
member?
(
activity
.
data
[
"to"
],
"https://www.w3.org/ns/activitystreams#Public"
)
do
Pleroma
.
Web
.
Streamer
.
stream
(
"public"
,
activity
)
end
{
:ok
,
activity
}
end
end
...
...
lib/pleroma/web/endpoint.ex
View file @
bd5bdc4c
...
...
@@ -2,6 +2,7 @@ defmodule Pleroma.Web.Endpoint do
use
Phoenix
.
Endpoint
,
otp_app:
:pleroma
socket
"/socket"
,
Pleroma
.
Web
.
UserSocket
socket
"/api/v1"
,
Pleroma
.
Web
.
MastodonAPI
.
MastodonSocket
# Serve at "/" the static files from "priv/static" directory.
#
...
...
lib/pleroma/web/mastodon_api/mastodon_socket.ex
0 → 100644
View file @
bd5bdc4c
defmodule
Pleroma
.
Web
.
MastodonAPI
.
MastodonSocket
do
use
Phoenix
.
Socket
transport
:streaming
,
Phoenix
.
Transports
.
WebSocket
.
Raw
def
connect
(
params
,
socket
)
do
IO
.
inspect
(
params
)
Pleroma
.
Web
.
Streamer
.
add_socket
(
params
[
"stream"
],
socket
)
{
:ok
,
socket
}
end
def
id
(
socket
),
do
:
nil
def
handle
(
:text
,
message
,
state
)
do
IO
.
inspect
message
#| :ok
#| state
#| {:text, message}
#| {:text, message, state}
#| {:close, "Goodbye!"}
{
:text
,
message
}
end
def
handle
(
:closed
,
reason
,
_state
)
do
IO
.
inspect
reason
end
end
lib/pleroma/web/streamer.ex
0 → 100644
View file @
bd5bdc4c
defmodule
Pleroma
.
Web
.
Streamer
do
use
GenServer
require
Logger
import
Plug
.
Conn
def
start_link
do
GenServer
.
start_link
(
__MODULE__
,
%{},
name:
__MODULE__
)
end
def
add_socket
(
topic
,
socket
)
do
GenServer
.
cast
(
__MODULE__
,
%{
action:
:add
,
socket:
socket
,
topic:
topic
})
end
def
stream
(
topic
,
item
)
do
GenServer
.
cast
(
__MODULE__
,
%{
action:
:stream
,
topic:
topic
,
item:
item
})
end
def
handle_cast
(%{
action:
:stream
,
topic:
topic
,
item:
item
},
topics
)
do
Logger
.
debug
(
"Trying to push to
#{
topic
}
"
)
Logger
.
debug
(
"Pushing item to
#{
topic
}
"
)
Enum
.
each
(
topics
[
topic
]
||
[],
fn
(
socket
)
->
json
=
%{
event:
"update"
,
payload:
Pleroma
.
Web
.
MastodonAPI
.
StatusView
.
render
(
"status.json"
,
activity:
item
)
|>
Poison
.
encode!
}
|>
Poison
.
encode!
send
socket
.
transport_pid
,
{
:text
,
json
}
end
)
{
:noreply
,
topics
}
end
def
handle_cast
(%{
action:
:add
,
topic:
topic
,
socket:
socket
},
sockets
)
do
sockets_for_topic
=
sockets
[
topic
]
||
[]
sockets_for_topic
=
Enum
.
uniq
([
socket
|
sockets_for_topic
])
sockets
=
Map
.
put
(
sockets
,
topic
,
sockets_for_topic
)
Logger
.
debug
(
"Got new conn for
#{
topic
}
"
)
IO
.
inspect
(
sockets
)
{
:noreply
,
sockets
}
end
def
handle_cast
(
m
,
state
)
do
IO
.
inspect
(
"Unknown:
#{
inspect
(
m
)
}
,
#{
inspect
(
state
)
}
"
)
{
:noreply
,
state
}
end
end
lib/transports.ex
0 → 100644
View file @
bd5bdc4c
defmodule
Phoenix
.
Transports
.
WebSocket
.
Raw
do
import
Plug
.
Conn
,
only:
[
fetch_query_params:
1
,
send_resp:
3
]
alias
Phoenix
.
Socket
.
Transport
def
default_config
do
[
timeout:
60_000
,
transport_log:
false
,
cowboy:
Phoenix
.
Endpoint
.
CowboyWebSocket
]
end
def
init
(%
Plug
.
Conn
{
method:
"GET"
}
=
conn
,
{
endpoint
,
handler
,
transport
})
do
{
_
,
opts
}
=
handler
.
__transport__
(
transport
)
conn
=
conn
|>
fetch_query_params
|>
Transport
.
transport_log
(
opts
[
:transport_log
])
|>
Transport
.
force_ssl
(
handler
,
endpoint
,
opts
)
|>
Transport
.
check_origin
(
handler
,
endpoint
,
opts
)
case
conn
do
%{
halted:
false
}
=
conn
->
case
Transport
.
connect
(
endpoint
,
handler
,
transport
,
__MODULE__
,
nil
,
conn
.
params
)
do
{
:ok
,
socket
}
->
{
:ok
,
conn
,
{
__MODULE__
,
{
socket
,
opts
}}}
:error
->
send_resp
(
conn
,
:forbidden
,
""
)
{
:error
,
conn
}
end
_
->
{
:error
,
conn
}
end
end
def
init
(
conn
,
_
)
do
send_resp
(
conn
,
:bad_request
,
""
)
{
:error
,
conn
}
end
def
ws_init
({
socket
,
config
})
do
Process
.
flag
(
:trap_exit
,
true
)
{
:ok
,
%{
socket:
socket
},
config
[
:timeout
]}
end
def
ws_handle
(
op
,
data
,
state
)
do
state
.
socket
.
handler
|>
apply
(
:handle
,
[
op
,
data
,
state
])
|>
case
do
{
op
,
data
}
->
{
:reply
,
{
op
,
data
},
state
}
{
op
,
data
,
state
}
->
{
:reply
,
{
op
,
data
},
state
}
%{}
=
state
->
{
:ok
,
state
}
_
->
{
:ok
,
state
}
end
end
def
ws_info
({
op
,
data
}
=
tuple
,
state
)
do
{
:reply
,
tuple
,
state
}
end
def
ws_info
(
_tuple
,
state
),
do
:
{
:ok
,
state
}
def
ws_close
(
state
)
do
ws_handle
(
:closed
,
:normal
,
state
)
end
def
ws_terminate
(
reason
,
state
)
do
ws_handle
(
:closed
,
reason
,
state
)
end
end
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment