# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Object.Fetcher do
kaniini's avatar
kaniini committed
6
  alias Pleroma.HTTP
7
  alias Pleroma.Object
8
  alias Pleroma.Object.Containment
rinpatch's avatar
rinpatch committed
9
  alias Pleroma.Repo
10
11
  alias Pleroma.Signature
  alias Pleroma.Web.ActivityPub.InternalFetchActor
12
  alias Pleroma.Web.ActivityPub.ObjectValidator
13
  alias Pleroma.Web.ActivityPub.Transmogrifier
14
  alias Pleroma.Web.Federator
15
16

  require Logger
17
  require Pleroma.Constants
18

19
20
21
22
23
24
25
26
  # Puts the current UTC time, truncated to whole seconds, into the changeset
  # as the `:updated_at` change and returns the resulting changeset.
  defp touch_changeset(changeset) do
    now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)

    Ecto.Changeset.put_change(changeset, :updated_at, now)
  end

27
  # Carries the internal fields (the keys listed by
  # `Pleroma.Constants.object_internal_fields/0`) from the previously stored
  # data over into `new_data`. On a key conflict the previously stored value
  # wins, because it is merged last.
  defp maybe_reinject_internal_fields(%{data: %{} = previous_data}, new_data) do
    Map.merge(
      new_data,
      Map.take(previous_data, Pleroma.Constants.object_internal_fields())
    )
  end

  # Anything that does not carry a map under `:data` has nothing to preserve,
  # so the new data is returned untouched.
  defp maybe_reinject_internal_fields(_, new_data), do: new_data
34

35
  # Replaces the data of `object` with `new_data` (keeping internal fields),
  # persists the result and refreshes the object cache.
  #
  # Objects whose stored type is "Question" are run through
  # `ObjectValidator.validate/2` after the internal fields are merged back in;
  # every other object is passed through `Transmogrifier.fix_object/1` first
  # and is not validated here.
  #
  # Any step that does not produce the expected value is logged and returned
  # wrapped as `{:error, value}`.
  @spec reinject_object(struct(), map()) :: {:ok, Object.t()} | {:error, any()}
  defp reinject_object(%Object{data: %{"type" => "Question"}} = object, new_data) do
    Logger.debug("Reinjecting object #{new_data["id"]}")

    merged = maybe_reinject_internal_fields(object, new_data)

    with {:ok, validated, _} <- ObjectValidator.validate(merged, %{}),
         {:ok, stored} <- persist_reinjected(object, validated) do
      {:ok, stored}
    else
      failure ->
        Logger.error("Error while processing object: #{inspect(failure)}")
        {:error, failure}
    end
  end

  defp reinject_object(%Object{} = object, new_data) do
    Logger.debug("Reinjecting object #{new_data["id"]}")

    fixed = Transmogrifier.fix_object(new_data)
    merged = maybe_reinject_internal_fields(object, fixed)

    case persist_reinjected(object, merged) do
      {:ok, stored} ->
        {:ok, stored}

      failure ->
        Logger.error("Error while processing object: #{inspect(failure)}")
        {:error, failure}
    end
  end

  # Builds a changeset for `object` with the given data, bumps `:updated_at`,
  # inserts or updates the record and then refreshes the cache. Returns the
  # result of `Object.set_cache/1`, or whatever `Repo.insert_or_update/1`
  # returned if that step did not yield `{:ok, _}`.
  defp persist_reinjected(object, data) do
    changeset =
      object
      |> Object.change(%{data: data})
      |> touch_changeset()

    with {:ok, saved} <- Repo.insert_or_update(changeset) do
      Object.set_cache(saved)
    end
  end

rinpatch's avatar
rinpatch committed
70
  @doc """
  Fetches a remote object again and stores the refreshed data.

  Local objects (as reported by `Object.local?/1`) are returned unchanged as
  `{:ok, object}`. For remote objects the document is fetched again by its
  `"id"` and reinjected into the existing record.

  Returns `{:ok, object}` on success. Any other intermediate result is
  returned wrapped as `{:error, result}`.
  """
  def refetch_object(%Object{data: %{"id" => ap_id}} = object) do
    with {:local, false} <- {:local, Object.local?(object)},
         {:ok, fresh_data} <- fetch_and_contain_remote_object_from_id(ap_id),
         {:ok, refreshed} <- reinject_object(object, fresh_data) do
      {:ok, refreshed}
    else
      {:local, true} ->
        {:ok, object}

      failure ->
        {:error, failure}
    end
  end

81
  @doc """
  Returns the object identified by `id`, fetching it remotely when it is not cached.

  The steps, in order:

    * an object found by `Object.get_cached_by_ap_id/1` is returned as-is;
    * `options[:depth]` is checked with `Federator.allowed_thread_distance?/1`;
    * the document is fetched with `fetch_and_contain_remote_object_from_id/1`;
    * if `Object.normalize/2` already resolves the fetched data, that object
      is returned;
    * otherwise the data is wrapped in a `Create` activity, checked with
      `Containment.contain_origin/2` and passed to
      `Transmogrifier.handle_incoming/2`.

  Note: this will create a Create activity, which we need internally at the moment.

  Returns `{:ok, object}` on success, `{:reject, reason}` when the
  transmogrifier rejects the activity and `{:error, reason}` for the failures
  handled explicitly below. Any other unexpected intermediate value is
  returned unchanged.
  """
  def fetch_object_from_id(id, options \\ []) do
    with {:fetch_object, nil} <- {:fetch_object, Object.get_cached_by_ap_id(id)},
         {:allowed_depth, true} <-
           {:allowed_depth, Federator.allowed_thread_distance?(options[:depth])},
         {:fetch, {:ok, data}} <- {:fetch, fetch_and_contain_remote_object_from_id(id)},
         {:normalize, nil} <- {:normalize, Object.normalize(data, false)},
         params = prepare_activity_params(data),
         {:containment, :ok} <- {:containment, Containment.contain_origin(id, params)},
         {:transmogrifier, {:ok, activity}} <-
           {:transmogrifier, Transmogrifier.handle_incoming(params, options)},
         {:object, _data, %Object{} = object} <-
           {:object, data, Object.normalize(activity, false)} do
      {:ok, object}
    else
      # Already cached: nothing to fetch.
      {:fetch_object, %Object{} = cached} ->
        {:ok, cached}

      {:allowed_depth, false} ->
        {:error, "Max thread distance exceeded."}

      {:fetch, {:error, reason}} ->
        {:error, reason}

      # The fetched data already resolves to a known object.
      {:normalize, %Object{} = known} ->
        {:ok, known}

      {:containment, _} ->
        {:error, "Object containment failed."}

      # Must stay ahead of the generic :transmogrifier clause below.
      {:transmogrifier, {:error, {:reject, reason}}} ->
        {:reject, reason}

      {:transmogrifier, _} = failure ->
        {:error, failure}

      # The activity was handled but no object could be normalized from it:
      # store the fetched data directly in a fresh object.
      {:object, data, nil} ->
        reinject_object(%Object{}, data)

      unexpected ->
        unexpected
    end
  end

  # Wraps fetched object data in the parameters of a "Create" activity.
  # "to" and "cc" default to empty lists; the actor is taken from "actor" and
  # falls back to "attributedTo".
  defp prepare_activity_params(data) do
    recipients = data["to"] || []
    carbon_copies = data["cc"] || []
    # Should we seriously keep this attributedTo thing?
    actor = data["actor"] || data["attributedTo"]

    %{
      "type" => "Create",
      "to" => recipients,
      "cc" => carbon_copies,
      "actor" => actor,
      "object" => data
    }
  end

135
136
  @doc """
  Same as `fetch_object_from_id/2`, but returns the bare object or `nil`.

  Despite the trailing `!` this function does not raise on a failed fetch:
  every non-`{:ok, object}` result yields `nil`. Rejections are logged at
  info level, mock HTTP errors and deleted objects are silent, and any other
  failure is logged as an error.
  """
  def fetch_object_from_id!(id, options \\ []) do
    case fetch_object_from_id(id, options) do
      {:ok, object} ->
        object

      {:error, %Tesla.Mock.Error{}} ->
        nil

      {:error, "Object has been deleted"} ->
        nil

      {:reject, reason} ->
        Logger.info("Rejected #{id} while fetching: #{inspect(reason)}")
        nil

      failure ->
        Logger.error("Error while fetching #{id}: #{inspect(failure)}")
        nil
    end
  end

155
156
157
158
159
160
161
162
163
164
165
  # Builds the `{"signature", value}` header for a GET of `id`, signed by the
  # internal fetch actor over the request target, host and date.
  # Only the path of the URI goes into the request target; a query string, if
  # any, is not included.
  defp make_signature(id, date) do
    %URI{path: path, host: host} = URI.parse(id)

    signed_fields = %{
      "(request-target)": "get #{path}",
      host: host,
      date: date
    }

    {"signature", Signature.sign(InternalFetchActor.get_actor(), signed_fields)}
  end

  # Prepends a signature header when `[:activitypub, :sign_object_fetches]`
  # is set to a truthy value; otherwise returns the headers unchanged.
  defp sign_fetch(headers, id, date) do
    signing_enabled = Pleroma.Config.get([:activitypub, :sign_object_fetches])

    if signing_enabled, do: [make_signature(id, date) | headers], else: headers
  end

  # Prepends a `{"date", date}` header when `[:activitypub, :sign_object_fetches]`
  # is set to a truthy value; otherwise returns the headers unchanged.
  defp maybe_date_fetch(headers, date) do
    signing_enabled = Pleroma.Config.get([:activitypub, :sign_object_fetches])

    if signing_enabled, do: [{"date", date} | headers], else: headers
  end

rinpatch's avatar
rinpatch committed
185
  @doc """
  Fetches the document at `id` over HTTP, decodes it as JSON and checks it
  with `Containment.contain_origin_from_id/2`.

  `id` may be a binary or a map carrying the id under the `"id"` key.

  Returns `{:ok, data}` with the decoded document, or `{:error, reason}`.
  Ids that do not start with `"http"` are refused without a request being made.
  """
  def fetch_and_contain_remote_object_from_id(id)

  def fetch_and_contain_remote_object_from_id(%{"id" => id}),
    do: fetch_and_contain_remote_object_from_id(id)

  def fetch_and_contain_remote_object_from_id(id) when is_binary(id) do
    Logger.debug("Fetching object #{id} via AP")

    if String.starts_with?(id, "http") do
      fetch_and_contain(id)
    else
      {:error, "Unsupported URI scheme"}
    end
  end

  def fetch_and_contain_remote_object_from_id(_id),
    do: {:error, "id must be a string"}

  # Runs the fetch -> decode -> containment chain for an id that already
  # passed the scheme check. Error tuples pass through as they are; any other
  # unexpected value is wrapped in an error tuple.
  defp fetch_and_contain(id) do
    with {:ok, body} <- get_object(id),
         {:ok, data} <- safe_json_decode(body),
         :ok <- Containment.contain_origin_from_id(id, data) do
      {:ok, data}
    else
      {:error, _} = error ->
        error

      unexpected ->
        {:error, unexpected}
    end
  end

rinpatch's avatar
rinpatch committed
213
  # Performs the HTTP GET for `id`, asking for `application/activity+json`.
  # The header list is passed through `maybe_date_fetch/2` and `sign_fetch/3`,
  # which add date and signature headers when fetch signing is enabled.
  #
  # Returns:
  #   * `{:ok, body}` for a 2xx response with an accepted content type,
  #   * `{:error, {:content_type, value_or_nil}}` for a 2xx response whose
  #     content type is missing or not accepted,
  #   * `{:error, "Object has been deleted"}` for 404 and 410,
  #   * `{:error, reason}` for everything else.
  defp get_object(id) do
    date = Pleroma.Signature.signed_date()

    request_headers =
      [{"accept", "application/activity+json"}]
      |> maybe_date_fetch(date)
      |> sign_fetch(id, date)

    case HTTP.get(id, request_headers) do
      {:ok, %{body: body, status: status, headers: response_headers}}
      when status in 200..299 ->
        check_content_type(response_headers, body)

      {:ok, %{status: status}} when status in [404, 410] ->
        {:error, "Object has been deleted"}

      {:error, _} = error ->
        error

      unexpected ->
        {:error, unexpected}
    end
  end

  # Accepts the body only if the response carries a "content-type" header
  # naming one of the ActivityPub media types.
  defp check_content_type(response_headers, body) do
    case List.keyfind(response_headers, "content-type", 0) do
      {_, content_type} ->
        if activity_media_type?(content_type) do
          {:ok, body}
        else
          {:error, {:content_type, content_type}}
        end

      _ ->
        {:error, {:content_type, nil}}
    end
  end

  # True for `application/activity+json` and for `application/ld+json` with
  # the ActivityStreams profile parameter.
  defp activity_media_type?(content_type) do
    case Plug.Conn.Utils.media_type(content_type) do
      {:ok, "application", "activity+json", _} ->
        true

      {:ok, "application", "ld+json", %{"profile" => "https://www.w3.org/ns/activitystreams"}} ->
        true

      _ ->
        false
    end
  end
251

252
253
  # Decodes a JSON document with `Jason.decode/1`. A `nil` body is not handed
  # to the decoder and yields `{:ok, nil}` instead.
  defp safe_json_decode(json) do
    case json do
      nil -> {:ok, nil}
      _ -> Jason.decode(json)
    end
  end
254
end