# Pleroma: A lightweight social networking server
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Object.Fetcher do
kaniini's avatar
kaniini committed
6
  alias Pleroma.HTTP
7
  alias Pleroma.Instances
Haelwenn's avatar
Haelwenn committed
8
  alias Pleroma.Maps
9
  alias Pleroma.Object
10
  alias Pleroma.Object.Containment
rinpatch's avatar
rinpatch committed
11
  alias Pleroma.Repo
12
13
  alias Pleroma.Signature
  alias Pleroma.Web.ActivityPub.InternalFetchActor
14
  alias Pleroma.Web.ActivityPub.ObjectValidator
15
  alias Pleroma.Web.ActivityPub.Transmogrifier
16
  alias Pleroma.Web.Federator
17
18

  require Logger
19
  require Pleroma.Constants
20

21
22
23
24
25
26
27
28
  # Stamps the changeset's `:updated_at` with the current UTC time, truncated
  # to whole seconds.
  defp touch_changeset(changeset) do
    now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)

    Ecto.Changeset.put_change(changeset, :updated_at, now)
  end

29
  # Merges locally-kept state from an already stored object into freshly
  # fetched `new_data`.
  #
  # * The fields listed in `Pleroma.Constants.object_internal_fields/0` are
  #   copied from the old data and win over whatever `new_data` contains.
  # * Edit history (`"formerRepresentations"`) is taken from the remote when it
  #   ships a well-formed one; otherwise the local history is kept and
  #   `Object.Updater.maybe_update_history/3` is asked to maintain it.
  #
  # When the first argument has no map under `:data` there is nothing to carry
  # over and `new_data` is returned untouched (second clause).
  defp maybe_reinject_internal_fields(%{data: %{} = old_data}, new_data) do
    # A history only counts when "orderedItems" is actually a list.
    has_history? = fn
      %{"formerRepresentations" => %{"orderedItems" => list}} when is_list(list) -> true
      _ -> false
    end

    internal_fields = Map.take(old_data, Pleroma.Constants.object_internal_fields())

    remote_history_exists? = has_history?.(new_data)

    # If the remote history exists, we treat that as the only source of truth.
    new_data =
      if has_history?.(old_data) and not remote_history_exists? do
        Map.put(new_data, "formerRepresentations", old_data["formerRepresentations"])
      else
        new_data
      end

    # If the remote does not have history information, we need to manage it ourselves
    new_data =
      if not remote_history_exists? do
        # Any difference in an updatable field counts as an edit.
        changed? =
          Pleroma.Constants.status_updatable_fields()
          |> Enum.any?(fn field -> Map.get(old_data, field) != Map.get(new_data, field) end)

        %{updated_object: updated_object} =
          new_data
          |> Object.Updater.maybe_update_history(old_data,
            updated: changed?,
            use_history_in_new_object?: false
          )

        updated_object
      else
        new_data
      end

    # Internal fields are merged last so they override the fetched values.
    Map.merge(new_data, internal_fields)
  end

  defp maybe_reinject_internal_fields(_, new_data), do: new_data
70

71
  # Persists freshly fetched `new_data` onto `object` (inserting or updating),
  # bumps `:updated_at` and refreshes the object cache.
  #
  # "Question" objects are run through `ObjectValidator.validate/2`; every
  # other type goes through `Transmogrifier.fix_object/1` instead. In both
  # cases internal fields of the existing object are reinjected first.
  #
  # Any step that does not match is logged and returned wrapped as
  # `{:error, e}`.
  @spec reinject_object(struct(), map()) :: {:ok, Object.t()} | {:error, any()}
  defp reinject_object(%Object{data: %{"type" => "Question"}} = object, new_data) do
    Logger.debug("Reinjecting object #{new_data["id"]}")

    with data <- maybe_reinject_internal_fields(object, new_data),
         {:ok, data, _} <- ObjectValidator.validate(data, %{}),
         changeset <- Object.change(object, %{data: data}),
         changeset <- touch_changeset(changeset),
         {:ok, object} <- Repo.insert_or_update(changeset),
         {:ok, object} <- Object.set_cache(object) do
      {:ok, object}
    else
      e ->
        Logger.error("Error while processing object: #{inspect(e)}")
        {:error, e}
    end
  end

  defp reinject_object(%Object{} = object, new_data) do
    Logger.debug("Reinjecting object #{new_data["id"]}")

    with new_data <- Transmogrifier.fix_object(new_data),
         data <- maybe_reinject_internal_fields(object, new_data),
         changeset <- Object.change(object, %{data: data}),
         changeset <- touch_changeset(changeset),
         {:ok, object} <- Repo.insert_or_update(changeset),
         {:ok, object} <- Object.set_cache(object) do
      {:ok, object}
    else
      e ->
        Logger.error("Error while processing object: #{inspect(e)}")
        {:error, e}
    end
  end

rinpatch's avatar
rinpatch committed
106
  @doc """
  Fetches the object's `"id"` again and stores the fresh data on `object`.

  Objects for which `Object.local?/1` is true are returned as-is without any
  fetch. On success returns `{:ok, object}` with the updated object. Any other
  outcome of the fetch or the reinjection is wrapped as `{:error, e}`, where
  `e` is the complete non-matching value (so an upstream `{:error, reason}`
  ends up nested).
  """
  def refetch_object(%Object{data: %{"id" => id}} = object) do
    with {:local, false} <- {:local, Object.local?(object)},
         {:ok, new_data} <- fetch_and_contain_remote_object_from_id(id),
         {:ok, object} <- reinject_object(object, new_data) do
      {:ok, object}
    else
      {:local, true} -> {:ok, object}
      e -> {:error, e}
    end
  end

117
  # Note: will create a Create activity, which we need internally at the moment.
  @doc """
  Returns the object identified by `id`, fetching it remotely when it is not
  cached.

  Steps, in order:

    1. a cached object (`Object.get_cached_by_ap_id/1`) is returned directly;
    2. `options[:depth]` is checked with `Federator.allowed_thread_distance?/1`;
    3. the object is fetched via `fetch_and_contain_remote_object_from_id/1`;
    4. if `Object.normalize/2` already resolves the fetched data, that object
       is returned;
    5. otherwise the data is wrapped in a `Create` activity, checked with
       `Containment.contain_origin/2` and passed to
       `Transmogrifier.handle_incoming/2`;
    6. the resulting activity is normalized to its object; if that yields
       `nil`, the fetched data is reinjected into a blank `%Object{}`.

  Returns `{:ok, object}`, `{:error, reason}` or `{:reject, reason}`. A step
  result that matches none of the handled shapes is returned unchanged.
  """
  def fetch_object_from_id(id, options \\ []) do
    with {_, nil} <- {:fetch_object, Object.get_cached_by_ap_id(id)},
         {_, true} <- {:allowed_depth, Federator.allowed_thread_distance?(options[:depth])},
         {_, {:ok, data}} <- {:fetch, fetch_and_contain_remote_object_from_id(id)},
         {_, nil} <- {:normalize, Object.normalize(data, fetch: false)},
         params <- prepare_activity_params(data),
         {_, :ok} <- {:containment, Containment.contain_origin(id, params)},
         {_, {:ok, activity}} <-
           {:transmogrifier, Transmogrifier.handle_incoming(params, options)},
         {_, _data, %Object{} = object} <-
           {:object, data, Object.normalize(activity, fetch: false)} do
      {:ok, object}
    else
      {:allowed_depth, false} ->
        {:error, "Max thread distance exceeded."}

      {:containment, _} ->
        {:error, "Object containment failed."}

      {:transmogrifier, {:error, {:reject, e}}} ->
        {:reject, e}

      {:transmogrifier, {:reject, e}} ->
        {:reject, e}

      {:transmogrifier, _} = e ->
        {:error, e}

      # The activity was handled but no object could be resolved from it.
      {:object, data, nil} ->
        reinject_object(%Object{}, data)

      # The fetched data already resolves to a known object.
      {:normalize, object = %Object{}} ->
        {:ok, object}

      # Cache hit: nothing to fetch.
      {:fetch_object, %Object{} = object} ->
        {:ok, object}

      {:fetch, {:error, error}} ->
        {:error, error}

      e ->
        e
    end
  end

  # Wraps fetched object `data` in the params of a synthetic "Create" activity.
  # The actor falls back to the object's "attributedTo", and the addressing
  # fields ("to", "cc", "bto", "bcc") are passed through
  # `Maps.put_if_present/3`.
  defp prepare_activity_params(data) do
    %{
      "type" => "Create",
      # Should we seriously keep this attributedTo thing?
      "actor" => data["actor"] || data["attributedTo"],
      "object" => data
    }
    |> Maps.put_if_present("to", data["to"])
    |> Maps.put_if_present("cc", data["cc"])
    |> Maps.put_if_present("bto", data["bto"])
    |> Maps.put_if_present("bcc", data["bcc"])
  end

176
177
  @doc """
  Same as `fetch_object_from_id/2`, but returns the bare object on success and
  `nil` on every failure.

  Rejections are logged at info level and unexpected failures at error level.
  A `%Tesla.Mock.Error{}` and the `"Object has been deleted"` error yield `nil`
  without logging.
  """
  def fetch_object_from_id!(id, options \\ []) do
    with {:ok, object} <- fetch_object_from_id(id, options) do
      object
    else
      {:error, %Tesla.Mock.Error{}} ->
        nil

      {:error, "Object has been deleted"} ->
        nil

      {:reject, reason} ->
        Logger.info("Rejected #{id} while fetching: #{inspect(reason)}")
        nil

      e ->
        Logger.error("Error while fetching #{id}: #{inspect(e)}")
        nil
    end
  end

196
197
198
199
200
201
202
203
204
205
206
  # Builds the `{"signature", value}` request header for fetching `id`.
  # The internal fetch actor signs the request target (a `get` on the URI
  # path), the URI host and the given `date`.
  defp make_signature(id, date) do
    uri = URI.parse(id)

    signature =
      InternalFetchActor.get_actor()
      |> Signature.sign(%{
        "(request-target)": "get #{uri.path}",
        host: uri.host,
        date: date
      })

    {"signature", signature}
  end

  # Prepends a signature header to `headers` when the
  # `[:activitypub, :sign_object_fetches]` setting is truthy; otherwise returns
  # `headers` unchanged.
  defp sign_fetch(headers, id, date) do
    if Pleroma.Config.get([:activitypub, :sign_object_fetches]) do
      [make_signature(id, date) | headers]
    else
      headers
    end
  end

  # Prepends a `{"date", date}` header to `headers` when the
  # `[:activitypub, :sign_object_fetches]` setting is truthy; otherwise returns
  # `headers` unchanged.
  defp maybe_date_fetch(headers, date) do
    if Pleroma.Config.get([:activitypub, :sign_object_fetches]) do
      [{"date", date} | headers]
    else
      headers
    end
  end

rinpatch's avatar
rinpatch committed
226
  @doc """
  Fetches the remote object at `id` and checks that it passes origin
  containment.

  `id` may be a binary or a map carrying the id under `"id"`. The id must start
  with `"http"`. The response body is JSON-decoded and verified with
  `Containment.contain_origin_from_id/2`. After a successful fetch the
  instance is marked reachable if it was not already.

  Returns `{:ok, data}` on success. Failures are returned as `{:error, reason}`;
  a non-matching step result that is not already an error tuple is wrapped as
  `{:error, e}`. Any other kind of `id` yields `{:error, "id must be a string"}`.
  """
  def fetch_and_contain_remote_object_from_id(id)

  def fetch_and_contain_remote_object_from_id(%{"id" => id}),
    do: fetch_and_contain_remote_object_from_id(id)

  def fetch_and_contain_remote_object_from_id(id) when is_binary(id) do
    Logger.debug("Fetching object #{id} via AP")

    with {:scheme, true} <- {:scheme, String.starts_with?(id, "http")},
         {:ok, body} <- get_object(id),
         {:ok, data} <- safe_json_decode(body),
         :ok <- Containment.contain_origin_from_id(id, data) do
      # A successful fetch proves the instance is up again.
      if not Instances.reachable?(id) do
        Instances.set_reachable(id)
      end

      {:ok, data}
    else
      {:scheme, _} ->
        {:error, "Unsupported URI scheme"}

      {:error, e} ->
        {:error, e}

      e ->
        {:error, e}
    end
  end

  def fetch_and_contain_remote_object_from_id(_id),
    do: {:error, "id must be a string"}

rinpatch's avatar
rinpatch committed
258
  # Performs the HTTP GET for `id`, asking for `application/activity+json`.
  # Date and signature headers are added by `maybe_date_fetch/2` and
  # `sign_fetch/3`.
  #
  # Returns:
  #   * `{:ok, body}` for a 2xx response whose content type is
  #     `application/activity+json`, or `application/ld+json` with the
  #     ActivityStreams profile;
  #   * `{:error, {:content_type, value_or_nil}}` for a 2xx response with any
  #     other (or no) content type;
  #   * `{:error, "Object has been deleted"}` for 404 and 410;
  #   * `{:error, e}` for transport errors, and for any other response (in
  #     which case `e` is the whole `{:ok, response}` tuple).
  defp get_object(id) do
    date = Pleroma.Signature.signed_date()

    headers =
      [{"accept", "application/activity+json"}]
      |> maybe_date_fetch(date)
      |> sign_fetch(id, date)

    case HTTP.get(id, headers) do
      {:ok, %{body: body, status: code, headers: headers}} when code in 200..299 ->
        # Only accept bodies the remote explicitly labels as ActivityPub.
        case List.keyfind(headers, "content-type", 0) do
          {_, content_type} ->
            case Plug.Conn.Utils.media_type(content_type) do
              {:ok, "application", "activity+json", _} ->
                {:ok, body}

              {:ok, "application", "ld+json",
               %{"profile" => "https://www.w3.org/ns/activitystreams"}} ->
                {:ok, body}

              _ ->
                {:error, {:content_type, content_type}}
            end

          _ ->
            {:error, {:content_type, nil}}
        end

      {:ok, %{status: code}} when code in [404, 410] ->
        {:error, "Object has been deleted"}

      {:error, e} ->
        {:error, e}

      e ->
        {:error, e}
    end
  end
296

297
298
  # Decodes a JSON body with Jason. A `nil` body is not decoded and yields
  # `{:ok, nil}`.
  defp safe_json_decode(json) do
    case json do
      nil -> {:ok, nil}
      body -> Jason.decode(body)
    end
  end
299
end