fed_registry.ex 5.19 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Web.FedSockets.FedRegistry do
  @moduledoc """
  The FedRegistry stores the active FedSockets for quick retrieval.

  The storage and retrieval portion of the FedRegistry is done in process through
  elixir's `Registry` module for speed and its ability to monitor for terminated processes.

  Dropped connections will be caught by `Registry` and deleted. Since the next
  message will initiate a new connection there is no reason to try and reconnect at that point.

  Normally outside modules should have no need to call or use the FedRegistry themselves.
  """

  alias Pleroma.Web.FedSockets.FedSocket
  alias Pleroma.Web.FedSockets.SocketInfo

  require Logger

  @default_rejection_duration 15 * 60 * 1000
  @rejections :fed_socket_rejections

  @doc """
  Retrieves a FedSocket from the Registry given it's origin.

  The origin is expected to be a string identifying the endpoint "example.com" or "example2.com:8080"

  Will return:
    * {:ok, fed_socket} for working FedSockets
    * {:error, :rejected} for origins that have been tried and refused within the rejection duration interval
    * {:error, some_reason} usually :missing for unknown origins
  """
  def get_fed_socket(origin) do
    case get_registry_data(origin) do
      {:error, reason} ->
        {:error, reason}

      {:ok, %{state: :connected} = socket_info} ->
        {:ok, socket_info}
    end
  end

  @doc """
  Adds a connected FedSocket to the Registry.

  Always returns {:ok, fed_socket}
  """
  def add_fed_socket(origin, pid \\ nil) do
    origin
    |> SocketInfo.build(pid)
    |> SocketInfo.connect()
    |> add_socket_info
  end

  defp add_socket_info(%{origin: origin, state: :connected} = socket_info) do
    case Registry.register(FedSockets.Registry, origin, socket_info) do
      {:ok, _owner} ->
        clear_prior_rejection(origin)
        Logger.debug("fedsocket added: #{inspect(origin)}")

        {:ok, socket_info}

      {:error, {:already_registered, _pid}} ->
        FedSocket.close(socket_info)
        existing_socket_info = Registry.lookup(FedSockets.Registry, origin)

        {:ok, existing_socket_info}

      _ ->
        {:error, :error_adding_socket}
    end
  end

  @doc """
  Mark this origin as having rejected a connection attempt.
  This will keep it from getting additional connection attempts
  for a period of time specified in the config.

  Always returns {:ok, new_reg_data}
  """
  def set_host_rejected(uri) do
    new_reg_data =
      uri
      |> SocketInfo.origin()
      |> get_or_create_registry_data()
      |> set_to_rejected()
      |> save_registry_data()

    {:ok, new_reg_data}
  end

  @doc """
  Retrieves the FedRegistryData from the Registry given it's origin.

  The origin is expected to be a string identifying the endpoint "example.com" or "example2.com:8080"

  Will return:
    * {:ok, fed_registry_data} for known origins
    * {:error, :missing} for uniknown origins
    * {:error, :cache_error} indicating some low level runtime issues
  """
  def get_registry_data(origin) do
    case Registry.lookup(FedSockets.Registry, origin) do
      [] ->
        if is_rejected?(origin) do
          Logger.debug("previously rejected fedsocket requested")
          {:error, :rejected}
        else
          {:error, :missing}
        end

      [{_pid, %{state: :connected} = socket_info}] ->
        {:ok, socket_info}

      _ ->
        {:error, :cache_error}
    end
  end

  @doc """
  Retrieves a map of all sockets from the Registry. The keys are the origins and the values are the corresponding SocketInfo
  """
  def list_all do
    (list_all_connected() ++ list_all_rejected())
    |> Enum.into(%{})
  end

  defp list_all_connected do
    FedSockets.Registry
    |> Registry.select([{{:"$1", :_, :"$3"}, [], [{{:"$1", :"$3"}}]}])
  end

  defp list_all_rejected do
    {:ok, keys} = Cachex.keys(@rejections)

    {:ok, registry_data} =
      Cachex.execute(@rejections, fn worker ->
        Enum.map(keys, fn k -> {k, Cachex.get!(worker, k)} end)
      end)

    registry_data
  end

  defp clear_prior_rejection(origin),
    do: Cachex.del(@rejections, origin)

  defp is_rejected?(origin) do
    case Cachex.get(@rejections, origin) do
      {:ok, nil} ->
        false

      {:ok, _} ->
        true
    end
  end

  defp get_or_create_registry_data(origin) do
    case get_registry_data(origin) do
      {:error, :missing} ->
        %SocketInfo{origin: origin}

      {:ok, socket_info} ->
        socket_info
    end
  end

  defp save_registry_data(%SocketInfo{origin: origin, state: :connected} = socket_info) do
    {:ok, true} = Registry.update_value(FedSockets.Registry, origin, fn _ -> socket_info end)
    socket_info
  end

  defp save_registry_data(%SocketInfo{origin: origin, state: :rejected} = socket_info) do
    rejection_expiration =
      Pleroma.Config.get([:fed_sockets, :rejection_duration], @default_rejection_duration)

    {:ok, true} = Cachex.put(@rejections, origin, socket_info, ttl: rejection_expiration)
    socket_info
  end

  defp set_to_rejected(%SocketInfo{} = socket_info),
    do: %SocketInfo{socket_info | state: :rejected}
end