fed_sockets.ex 6.26 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Web.FedSockets do
  @moduledoc """
  This documents the FedSockets framework. A framework for federating
  ActivityPub objects between servers via persistant WebSocket connections.

  FedSockets allow servers to authenticate on first contact and maintain that
  connection, eliminating the need to authenticate every time data needs to be shared.

  ## Protocol
  FedSockets currently support 2 types of data transfer:
    * `publish` method which doesn't require a response
    * `fetch` method requires a response be sent

    ### Publish
    The publish operation sends a json encoded map of the shape:
      %{action: :publish, data: json}
    and accepts (but does not require) a reply of form:
      %{"action" => "publish_reply"}

    The outgoing params represent
      * data: ActivityPub object encoded into json


    ### Fetch
    The fetch operation sends a json encoded map of the shape:
      %{action: :fetch, data: id, uuid: fetch_uuid}
    and requires a reply of form:
      %{"action" => "fetch_reply", "uuid" => uuid, "data" => data}

    The outgoing params represent
      * id: an ActivityPub object URI
      * uuid: a unique uuid generated by the sender

    The reply params represent
      * data: an ActivityPub object encoded into json
      * uuid: the uuid sent along with the fetch request

  ## Examples
  Clients of FedSocket transfers shouldn't need to use any of the functions outside of this module.

  A typical publish operation can be performed through the following code, and a fetch operation in a similar manner.

    case FedSockets.get_or_create_fed_socket(inbox) do
      {:ok, fedsocket} ->
        FedSockets.publish(fedsocket, json)

      _ ->
        alternative_publish(inbox, actor, json, params)
    end

  ## Configuration
  FedSockets have the following config settings

  config :pleroma, :fed_sockets,
  enabled: true,
  ping_interval: :timer.seconds(15),
  connection_duration: :timer.hours(1),
  rejection_duration: :timer.hours(1),
  fed_socket_fetches: [
    default: 12_000,
    interval: 3_000,
    lazy: false
  ]
    * enabled - turn FedSockets on or off with this flag. Can be toggled at runtime.
    * connection_duration - How long a FedSocket can sit idle before it's culled.
    * rejection_duration - After failing to make a FedSocket connection a host will be excluded
    from further connections for this amount of time
    * fed_socket_fetches - Use these parameters to pass options to the Cachex queue backing the FetchRegistry
    * fed_socket_rejections - Use these parameters to pass options to the Cachex queue backing the FedRegistry

    Cachex options are
      * default: the minimum amount of time a fetch can wait before it times out.
      * interval: the interval between checks for timed out entries. This plus the default represent the maximum time allowed
      * lazy: leave at false for consistant and fast lookups, set to true for stricter timeout enforcement

  """
  require Logger

  alias Pleroma.Web.FedSockets.FedRegistry
  alias Pleroma.Web.FedSockets.FedSocket
  alias Pleroma.Web.FedSockets.SocketInfo

  @doc """
  returns a FedSocket for the given origin. Will reuse an existing one or create a new one.

  address is expected to be a fully formed URL such as:
  "http://www.example.com" or "http://www.example.com:8080"

  It can and usually does include additional path parameters,
  but these are ignored as the FedSockets are organized by host and port info alone.
  """
  def get_or_create_fed_socket(address) do
    with {:cache, {:error, :missing}} <- {:cache, get_fed_socket(address)},
         {:connect, {:ok, _pid}} <- {:connect, FedSocket.connect_to_host(address)},
         {:cache, {:ok, fed_socket}} <- {:cache, get_fed_socket(address)} do
      Logger.debug("fedsocket created for - #{inspect(address)}")
      {:ok, fed_socket}
    else
      {:cache, {:ok, socket}} ->
        Logger.debug("fedsocket found in cache - #{inspect(address)}")
        {:ok, socket}

107
108
109
      {:cache, {:error, :rejected} = e} ->
        e

110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
      {:connect, {:error, _host}} ->
        Logger.debug("set host rejected for - #{inspect(address)}")
        FedRegistry.set_host_rejected(address)
        {:error, :rejected}

      {_, {:error, :disabled}} ->
        {:error, :disabled}

      {_, {:error, reason}} ->
        Logger.warn("get_or_create_fed_socket error - #{inspect(reason)}")
        {:error, reason}
    end
  end

  @doc """
  returns a FedSocket for the given origin. Will not create a new FedSocket if one does not exist.

  address is expected to be a fully formed URL such as:
    "http://www.example.com" or "http://www.example.com:8080"
  """
  def get_fed_socket(address) do
    origin = SocketInfo.origin(address)

    with {:config, true} <- {:config, Pleroma.Config.get([:fed_sockets, :enabled], false)},
         {:ok, socket} <- FedRegistry.get_fed_socket(origin) do
      {:ok, socket}
    else
      {:config, _} ->
        {:error, :disabled}

      {:error, :rejected} ->
        Logger.debug("FedSocket previously rejected - #{inspect(origin)}")
        {:error, :rejected}

      {:error, reason} ->
        {:error, reason}
    end
  end

  @doc """
  Sends the supplied data via the publish protocol.
  It will not block waiting for a reply.
  Returns :ok but this is not an indication of a successful transfer.

  the data is expected to be JSON encoded binary data.
  """
  def publish(%SocketInfo{} = fed_socket, json) do
    FedSocket.publish(fed_socket, json)
  end

  @doc """
  Sends the supplied data via the fetch protocol.
  It will block waiting for a reply or timeout.

  Returns {:ok, object} where object is the requested object (or nil)
          {:error, :timeout} in the event the message was not responded to

  the id is expected to be the URI of an ActivityPub object.
  """
  def fetch(%SocketInfo{} = fed_socket, id) do
    FedSocket.fetch(fed_socket, id)
  end

  @doc """
  Disconnect all and restart FedSockets.
  This is mainly used in development and testing but could be useful in production.
  """
  def reset do
    FedRegistry
    |> Process.whereis()
    |> Process.exit(:testing)
  end

  def uri_for_origin(origin),
    do: "ws://#{origin}/api/fedsocket/v1"
end