federator.ex 5.61 KB
Newer Older
1
# Pleroma: A lightweight social networking server
kaniini's avatar
kaniini committed
2
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3
4
# SPDX-License-Identifier: AGPL-3.0-only

5
defmodule Pleroma.Web.Federator do
lain's avatar
lain committed
6
  alias Pleroma.Activity
rinpatch's avatar
rinpatch committed
7
  alias Pleroma.Object.Containment
Haelwenn's avatar
Haelwenn committed
8
  alias Pleroma.User
9
  alias Pleroma.Web.ActivityPub.ActivityPub
10
  alias Pleroma.Web.ActivityPub.Relay
lain's avatar
lain committed
11
  alias Pleroma.Web.ActivityPub.Transmogrifier
12
  alias Pleroma.Web.ActivityPub.Utils
13
  alias Pleroma.Web.ActivityPub.Visibility
eal's avatar
eal committed
14
  alias Pleroma.Web.Federator.RetryQueue
15
  alias Pleroma.Web.OStatus
16
17
18
  alias Pleroma.Web.Salmon
  alias Pleroma.Web.WebFinger
  alias Pleroma.Web.Websub
19

20
21
  require Logger

lain's avatar
lain committed
22
  @websub Application.get_env(:pleroma, :websub)
23
  @ostatus Application.get_env(:pleroma, :ostatus)
lain's avatar
lain committed
24

25
  def init do
minibikini's avatar
minibikini committed
26
    # 1 minute
Haelwenn's avatar
Haelwenn committed
27
    Process.sleep(1000 * 60)
minibikini's avatar
minibikini committed
28
    refresh_subscriptions()
lain's avatar
lain committed
29
30
  end

minibikini's avatar
minibikini committed
31
32
33
  # Client API

  def incoming_doc(doc) do
34
    PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
minibikini's avatar
minibikini committed
35
36
37
  end

  def incoming_ap_doc(params) do
38
    PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
minibikini's avatar
minibikini committed
39
40
41
  end

  def publish(activity, priority \\ 1) do
42
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
minibikini's avatar
minibikini committed
43
44
45
  end

  def publish_single_ap(params) do
46
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_ap, params])
minibikini's avatar
minibikini committed
47
  end
lain's avatar
lain committed
48

minibikini's avatar
minibikini committed
49
  def publish_single_websub(websub) do
50
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_websub, websub])
lain's avatar
lain committed
51
  end
52

minibikini's avatar
minibikini committed
53
  def verify_websub(websub) do
54
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
minibikini's avatar
minibikini committed
55
56
57
  end

  def request_subscription(sub) do
58
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
minibikini's avatar
minibikini committed
59
60
  end

61
  def refresh_subscriptions do
62
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
minibikini's avatar
minibikini committed
63
  end
lain's avatar
lain committed
64

65
  def publish_single_salmon(params) do
66
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_salmon, params])
lain's avatar
lain committed
67
  end
68

minibikini's avatar
minibikini committed
69
70
71
  # Job Worker Callbacks

  def perform(:refresh_subscriptions) do
lain's avatar
lain committed
72
73
    Logger.debug("Federator running refresh subscriptions")
    Websub.refresh_subscriptions()
lain's avatar
lain committed
74

lain's avatar
lain committed
75
    spawn(fn ->
lain's avatar
lain committed
76
77
      # 6 hours
      Process.sleep(1000 * 60 * 60 * 6)
minibikini's avatar
minibikini committed
78
      refresh_subscriptions()
lain's avatar
lain committed
79
80
81
    end)
  end

minibikini's avatar
minibikini committed
82
  def perform(:request_subscription, websub) do
lain's avatar
lain committed
83
    Logger.debug("Refreshing #{websub.topic}")
lain's avatar
lain committed
84
85

    with {:ok, websub} <- Websub.request_subscription(websub) do
lain's avatar
lain committed
86
87
88
89
90
91
      Logger.debug("Successfully refreshed #{websub.topic}")
    else
      _e -> Logger.debug("Couldn't refresh #{websub.topic}")
    end
  end

minibikini's avatar
minibikini committed
92
  def perform(:publish, activity) do
lain's avatar
lain committed
93
    Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
lain's avatar
lain committed
94

95
    with actor when not is_nil(actor) <- User.get_cached_by_ap_id(activity.data["actor"]) do
lain's avatar
lain committed
96
      {:ok, actor} = WebFinger.ensure_keys_present(actor)
lain's avatar
lain committed
97

lain's avatar
lain committed
98
      if Visibility.is_public?(activity) do
99
100
101
        if OStatus.is_representable?(activity) do
          Logger.info(fn -> "Sending #{activity.data["id"]} out via WebSub" end)
          Websub.publish(Pleroma.Web.OStatus.feed_path(actor), actor, activity)
102

103
104
105
          Logger.info(fn -> "Sending #{activity.data["id"]} out via Salmon" end)
          Pleroma.Web.Salmon.publish(actor, activity)
        end
106

107
        if Keyword.get(Application.get_env(:pleroma, :instance), :allow_relay) do
kaniini's avatar
kaniini committed
108
          Logger.info(fn -> "Relaying #{activity.data["id"]} out" end)
109
          Relay.publish(activity)
kaniini's avatar
kaniini committed
110
        end
111
      end
lain's avatar
lain committed
112
113

      Logger.info(fn -> "Sending #{activity.data["id"]} out via AP" end)
114
      Pleroma.Web.ActivityPub.ActivityPub.publish(actor, activity)
115
116
117
    end
  end

minibikini's avatar
minibikini committed
118
  def perform(:verify_websub, websub) do
lain's avatar
lain committed
119
120
121
122
    Logger.debug(fn ->
      "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
    end)

lain's avatar
lain committed
123
    @websub.verify(websub)
124
125
  end

minibikini's avatar
minibikini committed
126
  def perform(:incoming_doc, doc) do
127
    Logger.info("Got document, trying to parse")
128
129
130
    @ostatus.handle_incoming(doc)
  end

minibikini's avatar
minibikini committed
131
  def perform(:incoming_ap_doc, params) do
feld's avatar
feld committed
132
    Logger.info("Handling incoming AP activity")
lain's avatar
lain committed
133

134
135
    params = Utils.normalize_params(params)

136
137
    # NOTE: we use the actor ID to do the containment, this is fine because an
    # actor shouldn't be acting on objects outside their own AP server.
lain's avatar
lain committed
138
    with {:ok, _user} <- ap_enabled_actor(params["actor"]),
139
         nil <- Activity.normalize(params["id"]),
140
         :ok <- Containment.contain_origin_from_id(params["actor"], params),
141
142
         {:ok, activity} <- Transmogrifier.handle_incoming(params) do
      {:ok, activity}
lain's avatar
lain committed
143
144
145
    else
      %Activity{} ->
        Logger.info("Already had #{params["id"]}")
146
        :error
lain's avatar
lain committed
147

feld's avatar
feld committed
148
      _e ->
lain's avatar
lain committed
149
150
        # Just drop those for now
        Logger.info("Unhandled activity")
lain's avatar
lain committed
151
        Logger.info(Poison.encode!(params, pretty: 2))
152
        :error
lain's avatar
lain committed
153
154
155
    end
  end

156
  def perform(:publish_single_salmon, params) do
157
    Salmon.send_to_user(params)
158
159
  end

minibikini's avatar
minibikini committed
160
  def perform(:publish_single_ap, params) do
eal's avatar
eal committed
161
162
163
164
165
    case ActivityPub.publish_one(params) do
      {:ok, _} ->
        :ok

      {:error, _} ->
eal's avatar
eal committed
166
        RetryQueue.enqueue(params, ActivityPub)
eal's avatar
eal committed
167
    end
lain's avatar
lain committed
168
169
  end

minibikini's avatar
minibikini committed
170
  def perform(
eal's avatar
eal committed
171
        :publish_single_websub,
Maksim's avatar
Maksim committed
172
        %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
eal's avatar
eal committed
173
174
175
176
177
178
      ) do
    case Websub.publish_one(params) do
      {:ok, _} ->
        :ok

      {:error, _} ->
eal's avatar
eal committed
179
        RetryQueue.enqueue(params, Websub)
lain's avatar
lain committed
180
181
182
    end
  end

minibikini's avatar
minibikini committed
183
  def perform(type, _) do
lain's avatar
lain committed
184
    Logger.debug(fn -> "Unknown task: #{type}" end)
feld's avatar
feld committed
185
    {:error, "Don't know what to do with this"}
186
187
  end

lain's avatar
lain committed
188
  def ap_enabled_actor(id) do
minibikini's avatar
minibikini committed
189
    user = User.get_cached_by_ap_id(id)
lain's avatar
lain committed
190

lain's avatar
lain committed
191
192
193
194
195
196
    if User.ap_enabled?(user) do
      {:ok, user}
    else
      ActivityPub.make_user_from_ap_id(id)
    end
  end
197
end