websocket_handler.ex 3.33 KB
Newer Older
1
# Pleroma: A lightweight social networking server
kaniini's avatar
kaniini committed
2
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3
4
# SPDX-License-Identifier: AGPL-3.0-only

href's avatar
href committed
5
6
7
defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
  require Logger

Haelwenn's avatar
Haelwenn committed
8
9
  alias Pleroma.Repo
  alias Pleroma.User
10
  alias Pleroma.Web.OAuth.Token
href's avatar
href committed
11

12
  @behaviour :cowboy_websocket
href's avatar
href committed
13
14
15
16
17
18
19
20
21
22
23
24
25

  @streams [
    "public",
    "public:local",
    "public:media",
    "public:local:media",
    "user",
    "direct",
    "list",
    "hashtag"
  ]
  @anonymous_streams ["public", "public:local", "hashtag"]

26
27
28
29
  # Handled by periodic keepalive in Pleroma.Web.Streamer.
  @timeout :infinity

  def init(%{qs: qs} = req, state) do
30
    with params <- :cow_qs.parse_qs(qs),
href's avatar
href committed
31
32
33
34
         access_token <- List.keyfind(params, "access_token", 0),
         {_, stream} <- List.keyfind(params, "stream", 0),
         {:ok, user} <- allow_request(stream, access_token),
         topic when is_binary(topic) <- expand_topic(stream, params) do
35
      {:cowboy_websocket, req, %{user: user, topic: topic}, %{idle_timeout: @timeout}}
href's avatar
href committed
36
37
38
39
    else
      {:error, code} ->
        Logger.debug("#{__MODULE__} denied connection: #{inspect(code)} - #{inspect(req)}")
        {:ok, req} = :cowboy_req.reply(code, req)
40
        {:ok, req, state}
href's avatar
href committed
41
42
43

      error ->
        Logger.debug("#{__MODULE__} denied connection: #{inspect(error)} - #{inspect(req)}")
44
        {:ok, req} = :cowboy_req.reply(400, req)
45
        {:ok, req, state}
href's avatar
href committed
46
47
48
    end
  end

49
50
51
52
53
  def websocket_init(state) do
    send(self(), :subscribe)
    {:ok, state}
  end

href's avatar
href committed
54
  # We never receive messages.
55
56
  def websocket_handle(_frame, state) do
    {:ok, state}
href's avatar
href committed
57
58
  end

59
  def websocket_info(:subscribe, state) do
href's avatar
href committed
60
61
62
63
64
65
66
    Logger.debug(
      "#{__MODULE__} accepted websocket connection for user #{
        (state.user || %{id: "anonymous"}).id
      }, topic #{state.topic}"
    )

    Pleroma.Web.Streamer.add_socket(state.topic, streamer_socket(state))
67
    {:ok, state}
href's avatar
href committed
68
69
  end

70
71
  def websocket_info({:text, message}, state) do
    {:reply, {:text, message}, state}
href's avatar
href committed
72
73
  end

74
  def terminate(reason, _req, state) do
href's avatar
href committed
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
    Logger.debug(
      "#{__MODULE__} terminating websocket connection for user #{
        (state.user || %{id: "anonymous"}).id
      }, topic #{state.topic || "?"}: #{inspect(reason)}"
    )

    Pleroma.Web.Streamer.remove_socket(state.topic, streamer_socket(state))
    :ok
  end

  # Public streams without authentication.
  defp allow_request(stream, nil) when stream in @anonymous_streams do
    {:ok, nil}
  end

  # Authenticated streams.
  defp allow_request(stream, {"access_token", access_token}) when stream in @streams do
    with %Token{user_id: user_id} <- Repo.get_by(Token, token: access_token),
minibikini's avatar
minibikini committed
93
         user = %User{} <- User.get_cached_by_id(user_id) do
href's avatar
href committed
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
      {:ok, user}
    else
      _ -> {:error, 403}
    end
  end

  # Not authenticated.
  defp allow_request(stream, _) when stream in @streams, do: {:error, 403}

  # No matching stream.
  defp allow_request(_, _), do: {:error, 404}

  defp expand_topic("hashtag", params) do
    case List.keyfind(params, "tag", 0) do
      {_, tag} -> "hashtag:#{tag}"
      _ -> nil
    end
  end

  defp expand_topic("list", params) do
    case List.keyfind(params, "list", 0) do
      {_, list} -> "list:#{list}"
      _ -> nil
    end
  end

  defp expand_topic(topic, _), do: topic

  defp streamer_socket(state) do
    %{transport_pid: self(), assigns: state}
  end
end