federator.ex 5.46 KB
Newer Older
1
# Pleroma: A lightweight social networking server
kaniini's avatar
kaniini committed
2
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3
4
# SPDX-License-Identifier: AGPL-3.0-only

5
defmodule Pleroma.Web.Federator do
lain's avatar
lain committed
6
  alias Pleroma.Activity
Haelwenn's avatar
Haelwenn committed
7
8
9
10
  alias Pleroma.User
  alias Pleroma.Web.WebFinger
  alias Pleroma.Web.Websub
  alias Pleroma.Web.Salmon
11
  alias Pleroma.Web.ActivityPub.ActivityPub
12
  alias Pleroma.Web.ActivityPub.Relay
lain's avatar
lain committed
13
  alias Pleroma.Web.ActivityPub.Transmogrifier
14
  alias Pleroma.Web.ActivityPub.Utils
eal's avatar
eal committed
15
  alias Pleroma.Web.Federator.RetryQueue
16
  alias Pleroma.Web.OStatus
17
  alias Pleroma.Jobs
18

19
20
  require Logger

lain's avatar
lain committed
21
  # Module that performs WebSub verification (invoked as `@websub.verify/1` in
  # perform(:verify_websub, ...)). Being a module attribute, the value is read
  # from the :pleroma application environment when this module is compiled.
  @websub Application.get_env(:pleroma, :websub)
22
  # Module whose `handle_incoming/1` is called from perform(:incoming_doc, ...).
  # Being a module attribute, the value is read from the :pleroma application
  # environment when this module is compiled.
  @ostatus Application.get_env(:pleroma, :ostatus)
lain's avatar
lain committed
23

minibikini's avatar
minibikini committed
24
25
26
27
  # Blocks the calling process for one minute, then enqueues the
  # subscription refresh job via refresh_subscriptions/0.
  def init do
    startup_delay_ms = :timer.minutes(1)
    Process.sleep(startup_delay_ms)

    refresh_subscriptions()
  end

minibikini's avatar
minibikini committed
30
31
32
33
34
35
36
37
38
39
40
  # Client API

  # Enqueues `doc` on the :federator_incoming queue. The job arguments mirror
  # the perform(:incoming_doc, doc) clause of this module.
  def incoming_doc(doc),
    do: Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])

  # Enqueues `params` on the :federator_incoming queue. The job arguments
  # mirror the perform(:incoming_ap_doc, params) clause of this module.
  def incoming_ap_doc(params) do
    job_args = [:incoming_ap_doc, params]
    Jobs.enqueue(:federator_incoming, __MODULE__, job_args)
  end

  # Enqueues `activity` for publication on the :federator_outgoing queue.
  # `priority` is handed to Jobs.enqueue/4 unchanged and defaults to 1.
  def publish(activity, priority \\ 1) do
    job_args = [:publish, activity]
    Jobs.enqueue(:federator_outgoing, __MODULE__, job_args, priority)
  end

  # Enqueues a single ActivityPub delivery described by `params` on the
  # :federator_outgoing queue.
  def publish_single_ap(params),
    do: Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_ap, params])
lain's avatar
lain committed
47

minibikini's avatar
minibikini committed
48
  # Enqueues a single WebSub delivery described by `websub` on the
  # :federator_outgoing queue.
  def publish_single_websub(websub) do
    job_args = [:publish_single_websub, websub]
    Jobs.enqueue(:federator_outgoing, __MODULE__, job_args)
  end
51

minibikini's avatar
minibikini committed
52
  # Enqueues verification of `websub` on the :federator_outgoing queue.
  def verify_websub(websub),
    do: Jobs.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])

  # Enqueues a subscription request for `sub` on the :federator_outgoing queue.
  def request_subscription(sub) do
    job_args = [:request_subscription, sub]
    Jobs.enqueue(:federator_outgoing, __MODULE__, job_args)
  end

  # Enqueues the subscription refresh job on the :federator_outgoing queue.
  # The job takes no payload besides its type.
  def refresh_subscriptions,
    do: Jobs.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])

64
  # Enqueues a single Salmon delivery described by `params` on the
  # :federator_outgoing queue.
  def publish_single_salmon(params) do
    job_args = [:publish_single_salmon, params]
    Jobs.enqueue(:federator_outgoing, __MODULE__, job_args)
  end

minibikini's avatar
minibikini committed
68
69
70
  # Job Worker Callbacks

  # Calls Websub.refresh_subscriptions/0 and then arranges for the refresh job
  # to be enqueued again six hours later. Returns the pid of the spawned
  # process.
  def perform(:refresh_subscriptions) do
    Logger.debug("Federator running refresh subscriptions")
    Websub.refresh_subscriptions()

    next_run_in_ms = :timer.hours(6)

    # The wait happens in a separate process so this clause returns right away.
    spawn(fn ->
      Process.sleep(next_run_in_ms)
      refresh_subscriptions()
    end)
  end

minibikini's avatar
minibikini committed
81
  # Requests a subscription for `websub` through Websub.request_subscription/1
  # and logs the outcome at debug level. Any result other than {:ok, _} is
  # reported as a failure; its value is not logged.
  def perform(:request_subscription, websub) do
    Logger.debug("Refreshing #{websub.topic}")

    case Websub.request_subscription(websub) do
      {:ok, refreshed} ->
        Logger.debug("Successfully refreshed #{refreshed.topic}")

      _failure ->
        Logger.debug("Couldn't refresh #{websub.topic}")
    end
  end

minibikini's avatar
minibikini committed
91
  # Publishes `activity` over the federation protocols.
  #
  # The actor is looked up from the activity's "actor" field. When the lookup
  # returns nil the `with` falls through and this clause returns nil without
  # sending anything. Otherwise:
  #
  #   * public activities that OStatus can represent are sent via WebSub and
  #     Salmon;
  #   * public activities are handed to the relay when the :allow_relay
  #     instance setting is truthy;
  #   * every activity is sent via ActivityPub, whose result is returned.
  #
  # Raises MatchError if WebFinger.ensure_keys_present/1 does not return
  # {:ok, actor}.
  def perform(:publish, activity) do
    Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)

    with actor when not is_nil(actor) <- User.get_cached_by_ap_id(activity.data["actor"]) do
      {:ok, actor} = WebFinger.ensure_keys_present(actor)

      if ActivityPub.is_public?(activity) do
        if OStatus.is_representable?(activity) do
          Logger.info(fn -> "Sending #{activity.data["id"]} out via WebSub" end)
          Websub.publish(OStatus.feed_path(actor), actor, activity)

          Logger.info(fn -> "Sending #{activity.data["id"]} out via Salmon" end)
          Salmon.publish(actor, activity)
        end

        # Default to an empty keyword list so a missing :instance config means
        # "relay disabled" instead of crashing in Keyword.get/2 on nil.
        instance_config = Application.get_env(:pleroma, :instance, [])

        if Keyword.get(instance_config, :allow_relay) do
          Logger.info(fn -> "Relaying #{activity.data["id"]} out" end)
          Relay.publish(activity)
        end
      end

      Logger.info(fn -> "Sending #{activity.data["id"]} out via AP" end)
      ActivityPub.publish(actor, activity)
    end
  end

minibikini's avatar
minibikini committed
117
  # Logs the subscription being verified (lazily, at debug level) and then
  # delegates to the configured `@websub` module's verify/1.
  def perform(:verify_websub, websub) do
    describe = fn ->
      "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
    end

    Logger.debug(describe)

    verifier = @websub
    verifier.verify(websub)
  end

minibikini's avatar
minibikini committed
125
  # Hands an incoming document to the configured `@ostatus` module's
  # handle_incoming/1 and returns its result.
  def perform(:incoming_doc, doc) do
    Logger.info("Got document, trying to parse")

    handler = @ostatus
    handler.handle_incoming(doc)
  end

minibikini's avatar
minibikini committed
130
  # Processes an incoming ActivityPub document.
  #
  # Returns {:ok, activity} when the actor resolves, no activity with the same
  # id is already known, the origin containment check passes and the
  # Transmogrifier accepts the document. Returns :error in every other case.
  def perform(:incoming_ap_doc, params) do
    Logger.info("Handling incoming AP activity")

    normalized = Utils.normalize_params(params)
    actor_id = normalized["actor"]

    # NOTE: containment is checked against the actor ID. That is acceptable
    # because an actor shouldn't be acting on objects outside their own AP
    # server.
    with {:ok, _actor} <- ap_enabled_actor(actor_id),
         nil <- Activity.normalize(normalized["id"]),
         :ok <- Transmogrifier.contain_origin_from_id(actor_id, normalized),
         {:ok, activity} <- Transmogrifier.handle_incoming(normalized) do
      {:ok, activity}
    else
      # Activity.normalize/1 found an existing activity for this id.
      %Activity{} ->
        Logger.info("Already had #{normalized["id"]}")
        :error

      # Any other failure: log the document and drop it for now.
      _other ->
        Logger.info("Unhandled activity")
        Logger.info(Poison.encode!(normalized, pretty: 2))
        :error
    end
  end

155
  # Delivers one Salmon message by delegating to Salmon.send_to_user/1.
  def perform(:publish_single_salmon, params), do: Salmon.send_to_user(params)

minibikini's avatar
minibikini committed
159
  # Attempts one ActivityPub delivery. Returns :ok on success; on
  # {:error, _} the same `params` are handed to the retry queue.
  def perform(:publish_single_ap, params) do
    delivery = ActivityPub.publish_one(params)

    case delivery do
      {:error, _reason} ->
        RetryQueue.enqueue(params, ActivityPub)

      {:ok, _response} ->
        :ok
    end
  end

minibikini's avatar
minibikini committed
169
  # Attempts one WebSub delivery. Only matches when `params` carries the
  # :xml, :topic, :callback and :secret keys. Returns :ok on success; on
  # {:error, _} the same `params` are handed to the retry queue.
  def perform(
        :publish_single_websub,
        %{xml: _, topic: _, callback: _, secret: _} = params
      ) do
    delivery = Websub.publish_one(params)

    case delivery do
      {:error, _reason} ->
        RetryQueue.enqueue(params, Websub)

      {:ok, _response} ->
        :ok
    end
  end

minibikini's avatar
minibikini committed
182
  # Fallback for two-argument jobs whose type no other clause handles.
  # Logs the type at debug level and returns an error tuple.
  #
  # `type` is rendered with inspect/1 rather than string interpolation:
  # being a catch-all, this clause can receive any term, and interpolating
  # one without a String.Chars implementation (a tuple or map, for example)
  # would raise instead of returning the error tuple.
  def perform(type, _) do
    Logger.debug(fn -> "Unknown task: #{inspect(type)}" end)
    {:error, "Don't know what to do with this"}
  end

lain's avatar
lain committed
187
188
  # Resolves the actor with AP id `id`.
  #
  # Returns {:ok, user} when the user found by User.get_by_ap_id/1 passes
  # User.ap_enabled?/1; otherwise returns whatever
  # ActivityPub.make_user_from_ap_id/1 produces for `id`.
  def ap_enabled_actor(id) do
    known_user = User.get_by_ap_id(id)

    cond do
      User.ap_enabled?(known_user) -> {:ok, known_user}
      true -> ActivityPub.make_user_from_ap_id(id)
    end
  end
196
end