federator.ex 4.28 KB
Newer Older
1
# Pleroma: A lightweight social networking server
kaniini's avatar
kaniini committed
2
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
3
4
# SPDX-License-Identifier: AGPL-3.0-only

5
defmodule Pleroma.Web.Federator do
lain's avatar
lain committed
6
  alias Pleroma.Activity
rinpatch's avatar
rinpatch committed
7
  alias Pleroma.Object.Containment
Haelwenn's avatar
Haelwenn committed
8
  alias Pleroma.User
9
  alias Pleroma.Web.ActivityPub.ActivityPub
lain's avatar
lain committed
10
  alias Pleroma.Web.ActivityPub.Transmogrifier
11
  alias Pleroma.Web.ActivityPub.Utils
12
  alias Pleroma.Web.Federator.Publisher
eal's avatar
eal committed
13
  alias Pleroma.Web.Federator.RetryQueue
14
  alias Pleroma.Web.OStatus
15
  alias Pleroma.Web.Websub
16

17
18
  require Logger

19
  def init do
minibikini's avatar
minibikini committed
20
    # 1 minute
Haelwenn's avatar
Haelwenn committed
21
    Process.sleep(1000 * 60)
minibikini's avatar
minibikini committed
22
    refresh_subscriptions()
lain's avatar
lain committed
23
24
  end

25
26
27
28
29
30
  @max_replies_depth 100

  @doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)"
  # credo:disable-for-previous-line Credo.Check.Readability.MaxLineLength
  def max_replies_depth, do: @max_replies_depth

minibikini's avatar
minibikini committed
31
32
33
  # Client API

  def incoming_doc(doc) do
34
    PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
minibikini's avatar
minibikini committed
35
36
37
  end

  def incoming_ap_doc(params) do
38
    PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
minibikini's avatar
minibikini committed
39
40
41
  end

  def publish(activity, priority \\ 1) do
42
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
minibikini's avatar
minibikini committed
43
44
45
  end

  def verify_websub(websub) do
46
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
minibikini's avatar
minibikini committed
47
48
49
  end

  def request_subscription(sub) do
50
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
minibikini's avatar
minibikini committed
51
52
  end

53
  def refresh_subscriptions do
54
    PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
minibikini's avatar
minibikini committed
55
  end
lain's avatar
lain committed
56

minibikini's avatar
minibikini committed
57
58
59
  # Job Worker Callbacks

  def perform(:refresh_subscriptions) do
lain's avatar
lain committed
60
61
    Logger.debug("Federator running refresh subscriptions")
    Websub.refresh_subscriptions()
lain's avatar
lain committed
62

lain's avatar
lain committed
63
    spawn(fn ->
lain's avatar
lain committed
64
65
      # 6 hours
      Process.sleep(1000 * 60 * 60 * 6)
minibikini's avatar
minibikini committed
66
      refresh_subscriptions()
lain's avatar
lain committed
67
68
69
    end)
  end

minibikini's avatar
minibikini committed
70
  def perform(:request_subscription, websub) do
lain's avatar
lain committed
71
    Logger.debug("Refreshing #{websub.topic}")
lain's avatar
lain committed
72
73

    with {:ok, websub} <- Websub.request_subscription(websub) do
lain's avatar
lain committed
74
75
76
77
78
79
      Logger.debug("Successfully refreshed #{websub.topic}")
    else
      _e -> Logger.debug("Couldn't refresh #{websub.topic}")
    end
  end

minibikini's avatar
minibikini committed
80
  def perform(:publish, activity) do
lain's avatar
lain committed
81
    Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
lain's avatar
lain committed
82

83
84
    with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]),
         {:ok, actor} <- User.ensure_keys_present(actor) do
85
      Publisher.publish(actor, activity)
86
87
88
    end
  end

minibikini's avatar
minibikini committed
89
  def perform(:verify_websub, websub) do
lain's avatar
lain committed
90
91
92
93
    Logger.debug(fn ->
      "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
    end)

94
    Websub.verify(websub)
95
96
  end

minibikini's avatar
minibikini committed
97
  def perform(:incoming_doc, doc) do
98
    Logger.info("Got document, trying to parse")
99
    OStatus.handle_incoming(doc)
100
101
  end

minibikini's avatar
minibikini committed
102
  def perform(:incoming_ap_doc, params) do
feld's avatar
feld committed
103
    Logger.info("Handling incoming AP activity")
lain's avatar
lain committed
104

105
106
    params = Utils.normalize_params(params)

107
108
    # NOTE: we use the actor ID to do the containment, this is fine because an
    # actor shouldn't be acting on objects outside their own AP server.
lain's avatar
lain committed
109
    with {:ok, _user} <- ap_enabled_actor(params["actor"]),
110
         nil <- Activity.normalize(params["id"]),
111
         :ok <- Containment.contain_origin_from_id(params["actor"], params),
112
113
         {:ok, activity} <- Transmogrifier.handle_incoming(params) do
      {:ok, activity}
lain's avatar
lain committed
114
115
116
    else
      %Activity{} ->
        Logger.info("Already had #{params["id"]}")
117
        :error
lain's avatar
lain committed
118

feld's avatar
feld committed
119
      _e ->
lain's avatar
lain committed
120
121
        # Just drop those for now
        Logger.info("Unhandled activity")
feld's avatar
feld committed
122
        Logger.info(Jason.encode!(params, pretty: true))
123
        :error
lain's avatar
lain committed
124
125
126
    end
  end

minibikini's avatar
minibikini committed
127
  def perform(
eal's avatar
eal committed
128
        :publish_single_websub,
Maksim's avatar
Maksim committed
129
        %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
eal's avatar
eal committed
130
131
132
133
134
135
      ) do
    case Websub.publish_one(params) do
      {:ok, _} ->
        :ok

      {:error, _} ->
eal's avatar
eal committed
136
        RetryQueue.enqueue(params, Websub)
lain's avatar
lain committed
137
138
139
    end
  end

minibikini's avatar
minibikini committed
140
  def perform(type, _) do
lain's avatar
lain committed
141
    Logger.debug(fn -> "Unknown task: #{type}" end)
feld's avatar
feld committed
142
    {:error, "Don't know what to do with this"}
143
144
  end

lain's avatar
lain committed
145
  def ap_enabled_actor(id) do
minibikini's avatar
minibikini committed
146
    user = User.get_cached_by_ap_id(id)
lain's avatar
lain committed
147

lain's avatar
lain committed
148
149
150
151
152
153
    if User.ap_enabled?(user) do
      {:ok, user}
    else
      ActivityPub.make_user_from_ap_id(id)
    end
  end
154
end