Skip to content
Snippets Groups Projects
Verified Commit 77b5154c authored by href's avatar href
Browse files

Cowboy handler for Mastodon WebSocket

parent 5dcb7aec
No related branches found
No related tags found
No related merge requests found
......@@ -50,6 +50,15 @@ config :pleroma, :uri_schemes,
# Configures the endpoint
config :pleroma, Pleroma.Web.Endpoint,
url: [host: "localhost"],
http: [
dispatch: [
{:_,
[
{"/api/v1/streaming", Elixir.Pleroma.Web.MastodonAPI.WebsocketHandler, []},
{:_, Plug.Adapters.Cowboy.Handler, {Pleroma.Web.Endpoint, []}}
]}
]
],
protocol: "https",
secret_key_base: "aK4Abxf29xU9TTDKre9coZPUgevcVCFQJe/5xP/7Lt4BEif6idBIbjupVbOrbKxl",
signing_salt: "CqaoopA2",
......
......@@ -3,8 +3,6 @@ defmodule Pleroma.Web.Endpoint do
socket("/socket", Pleroma.Web.UserSocket)
socket("/api/v1", Pleroma.Web.MastodonAPI.MastodonSocket, websocket: [path: "/streaming"])
# Serve at "/" the static files from "priv/static" directory.
#
# You should set gzip to true if you are running phoenix.digest
......
defmodule Pleroma.Web.MastodonAPI.MastodonSocket do
use Phoenix.Socket
alias Pleroma.Web.OAuth.Token
alias Pleroma.{User, Repo}
@spec connect(params :: map(), Phoenix.Socket.t()) :: {:ok, Phoenix.Socket.t()} | :error
def connect(%{"access_token" => token} = params, socket) do
with %Token{user_id: user_id} <- Repo.get_by(Token, token: token),
%User{} = user <- Repo.get(User, user_id),
stream
when stream in [
"public",
"public:local",
"public:media",
"public:local:media",
"user",
"direct",
"list",
"hashtag"
] <- params["stream"] do
topic =
case stream do
"hashtag" -> "hashtag:#{params["tag"]}"
"list" -> "list:#{params["list"]}"
_ -> stream
end
socket =
socket
|> assign(:topic, topic)
|> assign(:user, user)
Pleroma.Web.Streamer.add_socket(topic, socket)
{:ok, socket}
else
_e -> :error
end
end
def connect(%{"stream" => stream} = params, socket)
when stream in ["public", "public:local", "hashtag"] do
topic =
case stream do
"hashtag" -> "hashtag:#{params["tag"]}"
_ -> stream
end
socket =
socket
|> assign(:topic, topic)
Pleroma.Web.Streamer.add_socket(topic, socket)
{:ok, socket}
end
def connect(_params, _socket), do: :error
def id(_), do: nil
def handle(:text, message, _state) do
# | :ok
# | state
# | {:text, message}
# | {:text, message, state}
# | {:close, "Goodbye!"}
{:text, message}
end
def handle(:closed, _, %{socket: socket}) do
topic = socket.assigns[:topic]
Pleroma.Web.Streamer.remove_socket(topic, socket)
end
end
defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
require Logger
alias Pleroma.Web.OAuth.Token
alias Pleroma.{User, Repo}
@behaviour :cowboy_websocket_handler
@streams [
"public",
"public:local",
"public:media",
"public:local:media",
"user",
"direct",
"list",
"hashtag"
]
@anonymous_streams ["public", "public:local", "hashtag"]
# Handled by periodic keepalive in Pleroma.Web.Streamer.
@timeout :infinity
def init(_type, _req, _opts) do
{:upgrade, :protocol, :cowboy_websocket}
end
def websocket_init(_type, req, _opts) do
with {qs, req} <- :cowboy_req.qs(req),
params <- :cow_qs.parse_qs(qs),
access_token <- List.keyfind(params, "access_token", 0),
{_, stream} <- List.keyfind(params, "stream", 0),
{:ok, user} <- allow_request(stream, access_token),
topic when is_binary(topic) <- expand_topic(stream, params) do
send(self(), :subscribe)
{:ok, req, %{user: user, topic: topic}, @timeout}
else
{:error, code} ->
Logger.debug("#{__MODULE__} denied connection: #{inspect(code)} - #{inspect(req)}")
{:ok, req} = :cowboy_req.reply(code, req)
{:shutdown, req}
error ->
Logger.debug("#{__MODULE__} denied connection: #{inspect(error)} - #{inspect(req)}")
{:shutdown, req}
end
end
# We never receive messages.
def websocket_handle(_frame, req, state) do
{:ok, req, state}
end
def websocket_info(:subscribe, req, state) do
Logger.debug(
"#{__MODULE__} accepted websocket connection for user #{
(state.user || %{id: "anonymous"}).id
}, topic #{state.topic}"
)
Pleroma.Web.Streamer.add_socket(state.topic, streamer_socket(state))
{:ok, req, state}
end
def websocket_info({:text, message}, req, state) do
{:reply, {:text, message}, req, state}
end
def websocket_terminate(reason, _req, state) do
Logger.debug(
"#{__MODULE__} terminating websocket connection for user #{
(state.user || %{id: "anonymous"}).id
}, topic #{state.topic || "?"}: #{inspect(reason)}"
)
Pleroma.Web.Streamer.remove_socket(state.topic, streamer_socket(state))
:ok
end
# Public streams without authentication.
defp allow_request(stream, nil) when stream in @anonymous_streams do
{:ok, nil}
end
# Authenticated streams.
defp allow_request(stream, {"access_token", access_token}) when stream in @streams do
with %Token{user_id: user_id} <- Repo.get_by(Token, token: access_token),
user = %User{} <- Repo.get(User, user_id) do
{:ok, user}
else
_ -> {:error, 403}
end
end
# Not authenticated.
defp allow_request(stream, _) when stream in @streams, do: {:error, 403}
# No matching stream.
defp allow_request(_, _), do: {:error, 404}
defp expand_topic("hashtag", params) do
case List.keyfind(params, "tag", 0) do
{_, tag} -> "hashtag:#{tag}"
_ -> nil
end
end
defp expand_topic("list", params) do
case List.keyfind(params, "list", 0) do
{_, list} -> "list:#{list}"
_ -> nil
end
end
defp expand_topic(topic, _), do: topic
defp streamer_socket(state) do
%{transport_pid: self(), assigns: state}
end
end
......@@ -4,17 +4,9 @@ defmodule Pleroma.Web.Streamer do
alias Pleroma.{User, Notification, Activity, Object, Repo}
alias Pleroma.Web.ActivityPub.ActivityPub
def init(args) do
{:ok, args}
end
@keepalive_interval :timer.seconds(30)
def start_link do
spawn(fn ->
# 30 seconds
Process.sleep(1000 * 30)
GenServer.cast(__MODULE__, %{action: :ping})
end)
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
......@@ -30,6 +22,16 @@ defmodule Pleroma.Web.Streamer do
GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
end
def init(args) do
spawn(fn ->
# 30 seconds
Process.sleep(@keepalive_interval)
GenServer.cast(__MODULE__, %{action: :ping})
end)
{:ok, args}
end
def handle_cast(%{action: :ping}, topics) do
Map.values(topics)
|> List.flatten()
......@@ -40,7 +42,7 @@ defmodule Pleroma.Web.Streamer do
spawn(fn ->
# 30 seconds
Process.sleep(1000 * 30)
Process.sleep(@keepalive_interval)
GenServer.cast(__MODULE__, %{action: :ping})
end)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment