Commit fe7fd331 authored by kaniini's avatar kaniini

Merge branch 'revert-4fabf83a' into 'develop'

Revert "Merge branch 'streamer-refactoring' into 'develop'"

See merge request !1674
parents 4fabf83a c623b432
Pipeline #17223 failed with stages
in 5 minutes and 30 seconds
......@@ -43,7 +43,3 @@ docs/generated_config.md
# Code test coverage
/cover
/Elixir.*.coverdata
.idea
pleroma.iml
......@@ -331,10 +331,6 @@
follow_handshake_timeout: 500,
sign_object_fetches: true
config :pleroma, :streamer,
workers: 3,
overflow_workers: 2
config :pleroma, :user, deny_follow_blocked: true
config :pleroma, :mrf_normalize_markup, scrub_policy: Pleroma.HTML.Scrubber.Default
......
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Activity.Ir.Topics do
alias Pleroma.Object
alias Pleroma.Web.ActivityPub.Visibility
def get_activity_topics(activity) do
activity
|> Object.normalize()
|> generate_topics(activity)
|> List.flatten()
end
defp generate_topics(%{data: %{"type" => "Answer"}}, _) do
[]
end
defp generate_topics(object, activity) do
["user", "list"] ++ visibility_tags(object, activity)
end
defp visibility_tags(object, activity) do
case Visibility.get_visibility(activity) do
"public" ->
if activity.local do
["public", "public:local"]
else
["public"]
end
|> item_creation_tags(object, activity)
"direct" ->
["direct"]
_ ->
[]
end
end
defp item_creation_tags(tags, %{data: %{"type" => "Create"}} = object, activity) do
tags ++ hashtags_to_topics(object) ++ attachment_topics(object, activity)
end
defp item_creation_tags(tags, _, _) do
tags
end
defp hashtags_to_topics(%{data: %{"tag" => tags}}) do
tags
|> Enum.filter(&is_bitstring(&1))
|> Enum.map(fn tag -> "hashtag:" <> tag end)
end
defp hashtags_to_topics(_), do: []
defp attachment_topics(%{data: %{"attachment" => []}}, _act), do: []
defp attachment_topics(_object, %{local: true}), do: ["public:media", "public:local:media"]
defp attachment_topics(_object, _act), do: ["public:media"]
end
......@@ -141,7 +141,7 @@ defp oauth_cleanup_enabled?,
defp streamer_child(:test), do: []
defp streamer_child(_) do
[Pleroma.Web.Streamer.supervisor()]
[Pleroma.Web.Streamer]
end
defp oauth_cleanup_child(true),
......
......@@ -210,10 +210,8 @@ def create_notification(%Activity{} = activity, %User{} = user) do
unless skip?(activity, user) do
notification = %Notification{user_id: user.id, activity: activity}
{:ok, notification} = Repo.insert(notification)
["user", "user:notification"]
|> Streamer.stream(notification)
Streamer.stream("user", notification)
Streamer.stream("user:notification", notification)
Push.send(notification)
notification
end
......
......@@ -4,7 +4,6 @@
defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.Activity
alias Pleroma.Activity.Ir.Topics
alias Pleroma.Config
alias Pleroma.Conversation
alias Pleroma.Notification
......@@ -17,7 +16,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.User
alias Pleroma.Web.ActivityPub.MRF
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.Streamer
alias Pleroma.Web.WebFinger
alias Pleroma.Workers.BackgroundWorker
......@@ -189,7 +187,9 @@ def stream_out_participations(participations) do
participations
|> Repo.preload(:user)
Streamer.stream("participation", participations)
Enum.each(participations, fn participation ->
Pleroma.Web.Streamer.stream("participation", participation)
end)
end
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
......@@ -208,15 +208,41 @@ def stream_out_participations(%Object{data: %{"context" => context}}, user) do
def stream_out_participations(_, _), do: :noop
def stream_out(%Activity{data: %{"type" => data_type}} = activity)
when data_type in ["Create", "Announce", "Delete"] do
activity
|> Topics.get_activity_topics()
|> Streamer.stream(activity)
end
def stream_out(_activity) do
:noop
def stream_out(activity) do
if activity.data["type"] in ["Create", "Announce", "Delete"] do
object = Object.normalize(activity)
# Do not stream out poll replies
unless object.data["type"] == "Answer" do
Pleroma.Web.Streamer.stream("user", activity)
Pleroma.Web.Streamer.stream("list", activity)
if get_visibility(activity) == "public" do
Pleroma.Web.Streamer.stream("public", activity)
if activity.local do
Pleroma.Web.Streamer.stream("public:local", activity)
end
if activity.data["type"] in ["Create"] do
object.data
|> Map.get("tag", [])
|> Enum.filter(fn tag -> is_bitstring(tag) end)
|> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
if object.data["attachment"] != [] do
Pleroma.Web.Streamer.stream("public:media", activity)
if activity.local do
Pleroma.Web.Streamer.stream("public:local:media", activity)
end
end
end
else
if get_visibility(activity) == "direct",
do: Pleroma.Web.Streamer.stream("direct", activity)
end
end
end
end
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
......
......@@ -8,7 +8,6 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.OAuth.Token
alias Pleroma.Web.Streamer
@behaviour :cowboy_websocket
......@@ -25,7 +24,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
]
@anonymous_streams ["public", "public:local", "hashtag"]
# Handled by periodic keepalive in Pleroma.Web.Streamer.Ping.
# Handled by periodic keepalive in Pleroma.Web.Streamer.
@timeout :infinity
def init(%{qs: qs} = req, state) do
......@@ -66,7 +65,7 @@ def websocket_info(:subscribe, state) do
}, topic #{state.topic}"
)
Streamer.add_socket(state.topic, streamer_socket(state))
Pleroma.Web.Streamer.add_socket(state.topic, streamer_socket(state))
{:ok, state}
end
......@@ -81,7 +80,7 @@ def terminate(reason, _req, state) do
}, topic #{state.topic || "?"}: #{inspect(reason)}"
)
Streamer.remove_socket(state.topic, streamer_socket(state))
Pleroma.Web.Streamer.remove_socket(state.topic, streamer_socket(state))
:ok
end
......
defmodule Pleroma.Web.Streamer.Worker do
use GenServer
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Streamer do
use GenServer
require Logger
alias Pleroma.Activity
alias Pleroma.Config
alias Pleroma.Conversation.Participation
......@@ -12,63 +14,69 @@ defmodule Pleroma.Web.Streamer.Worker do
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Streamer.State
alias Pleroma.Web.Streamer.StreamerSocket
alias Pleroma.Web.StreamerView
alias Pleroma.Web.MastodonAPI.NotificationView
@keepalive_interval :timer.seconds(30)
def start_link(_) do
GenServer.start_link(__MODULE__, %{}, [])
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def init(init_arg) do
{:ok, init_arg}
def add_socket(topic, socket) do
GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
end
def stream(pid, topics, items) do
GenServer.call(pid, {:stream, topics, items})
def remove_socket(topic, socket) do
GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
end
def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
Enum.each(topics, fn t ->
do_stream(%{topic: t, item: item})
end)
{:reply, state, state}
def stream(topic, item) do
GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
end
def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
Enum.each(items, fn i ->
do_stream(%{topic: topic, item: i})
end)
def init(args) do
Process.send_after(self(), %{action: :ping}, @keepalive_interval)
{:reply, state, state}
{:ok, args}
end
def handle_call({:stream, topic, item}, _from, state) do
do_stream(%{topic: topic, item: item})
def handle_info(%{action: :ping}, topics) do
topics
|> Map.values()
|> List.flatten()
|> Enum.each(fn socket ->
Logger.debug("Sending keepalive ping")
send(socket.transport_pid, {:text, ""})
end)
Process.send_after(self(), %{action: :ping}, @keepalive_interval)
{:reply, state, state}
{:noreply, topics}
end
defp do_stream(%{topic: "direct", item: item}) do
def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
recipient_topics =
User.get_recipients_from_activity(item)
|> Enum.map(fn %{id: id} -> "direct:#{id}" end)
Enum.each(recipient_topics, fn user_topic ->
Enum.each(recipient_topics || [], fn user_topic ->
Logger.debug("Trying to push direct message to #{user_topic}\n\n")
push_to_socket(State.get_sockets(), user_topic, item)
push_to_socket(topics, user_topic, item)
end)
{:noreply, topics}
end
defp do_stream(%{topic: "participation", item: participation}) do
def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do
user_topic = "direct:#{participation.user_id}"
Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
push_to_socket(State.get_sockets(), user_topic, participation)
push_to_socket(topics, user_topic, participation)
{:noreply, topics}
end
defp do_stream(%{topic: "list", item: item}) do
def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
# filter the recipient list if the activity is not public, see #270.
recipient_lists =
case Visibility.is_public?(item) do
......@@ -88,25 +96,35 @@ defp do_stream(%{topic: "list", item: item}) do
recipient_lists
|> Enum.map(fn %{id: id} -> "list:#{id}" end)
Enum.each(recipient_topics, fn list_topic ->
Enum.each(recipient_topics || [], fn list_topic ->
Logger.debug("Trying to push message to #{list_topic}\n\n")
push_to_socket(State.get_sockets(), list_topic, item)
push_to_socket(topics, list_topic, item)
end)
{:noreply, topics}
end
defp do_stream(%{topic: topic, item: %Notification{} = item})
when topic in ["user", "user:notification"] do
State.get_sockets()
def handle_cast(
%{action: :stream, topic: topic, item: %Notification{} = item},
topics
)
when topic in ["user", "user:notification"] do
topics
|> Map.get("#{topic}:#{item.user_id}", [])
|> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
|> Enum.each(fn socket ->
with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id),
true <- should_send?(user, item) do
send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
send(
socket.transport_pid,
{:text, represent_notification(socket.assigns[:user], item)}
)
end
end)
{:noreply, topics}
end
defp do_stream(%{topic: "user", item: item}) do
def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
Logger.debug("Trying to push to users")
recipient_topics =
......@@ -114,14 +132,94 @@ defp do_stream(%{topic: "user", item: item}) do
|> Enum.map(fn %{id: id} -> "user:#{id}" end)
Enum.each(recipient_topics, fn topic ->
push_to_socket(State.get_sockets(), topic, item)
push_to_socket(topics, topic, item)
end)
{:noreply, topics}
end
defp do_stream(%{topic: topic, item: item}) do
def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
Logger.debug("Trying to push to #{topic}")
Logger.debug("Pushing item to #{topic}")
push_to_socket(State.get_sockets(), topic, item)
push_to_socket(topics, topic, item)
{:noreply, topics}
end
def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
topic = internal_topic(topic, socket)
sockets_for_topic = sockets[topic] || []
sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
sockets = Map.put(sockets, topic, sockets_for_topic)
Logger.debug("Got new conn for #{topic}")
{:noreply, sockets}
end
def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
topic = internal_topic(topic, socket)
sockets_for_topic = sockets[topic] || []
sockets_for_topic = List.delete(sockets_for_topic, socket)
sockets = Map.put(sockets, topic, sockets_for_topic)
Logger.debug("Removed conn for #{topic}")
{:noreply, sockets}
end
def handle_cast(m, state) do
Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
{:noreply, state}
end
defp represent_update(%Activity{} = activity, %User{} = user) do
%{
event: "update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
"status.json",
activity: activity,
for: user
)
|> Jason.encode!()
}
|> Jason.encode!()
end
defp represent_update(%Activity{} = activity) do
%{
event: "update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
"status.json",
activity: activity
)
|> Jason.encode!()
}
|> Jason.encode!()
end
def represent_conversation(%Participation{} = participation) do
%{
event: "conversation",
payload:
Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
participation: participation,
for: participation.user
})
|> Jason.encode!()
}
|> Jason.encode!()
end
@spec represent_notification(User.t(), Notification.t()) :: binary()
defp represent_notification(%User{} = user, %Notification{} = notify) do
%{
event: "notification",
payload:
NotificationView.render(
"show.json",
%{notification: notify, for: user}
)
|> Jason.encode!()
}
|> Jason.encode!()
end
defp should_send?(%User{} = user, %Activity{} = item) do
......@@ -150,35 +248,32 @@ defp should_send?(%User{} = user, %Notification{activity: activity}) do
end
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
Enum.each(topics[topic] || [], fn %StreamerSocket{
transport_pid: transport_pid,
user: socket_user
} ->
Enum.each(topics[topic] || [], fn socket ->
# Get the current user so we have up-to-date blocks etc.
if socket_user do
user = User.get_cached_by_ap_id(socket_user.ap_id)
if socket.assigns[:user] do
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
if should_send?(user, item) do
send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
send(socket.transport_pid, {:text, represent_update(item, user)})
end
else
send(transport_pid, {:text, StreamerView.render("update.json", item)})
send(socket.transport_pid, {:text, represent_update(item)})
end
end)
end
def push_to_socket(topics, topic, %Participation{} = participation) do
Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
Enum.each(topics[topic] || [], fn socket ->
send(socket.transport_pid, {:text, represent_conversation(participation)})
end)
end
def push_to_socket(topics, topic, %Activity{
data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
}) do
Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
Enum.each(topics[topic] || [], fn socket ->
send(
transport_pid,
socket.transport_pid,
{:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
)
end)
......@@ -187,26 +282,29 @@ def push_to_socket(topics, topic, %Activity{
def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
def push_to_socket(topics, topic, item) do
Enum.each(topics[topic] || [], fn %StreamerSocket{
transport_pid: transport_pid,
user: socket_user
} ->
Enum.each(topics[topic] || [], fn socket ->
# Get the current user so we have up-to-date blocks etc.
if socket_user do
user = User.get_cached_by_ap_id(socket_user.ap_id)
if socket.assigns[:user] do
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
blocks = user.info.blocks || []
mutes = user.info.mutes || []
with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
true <- thread_containment(item, user) do
send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
send(socket.transport_pid, {:text, represent_update(item, user)})
end
else
send(transport_pid, {:text, StreamerView.render("update.json", item)})
send(socket.transport_pid, {:text, represent_update(item)})
end
end)
end
defp internal_topic(topic, socket) when topic in ~w[user user:notification direct] do
"#{topic}:#{socket.assigns[:user].id}"
end
defp internal_topic(topic, _), do: topic
@spec thread_containment(Activity.t(), User.t()) :: boolean()
defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
......
defmodule Pleroma.Web.Streamer.Ping do
use GenServer
require Logger
alias Pleroma.Web.Streamer.State
alias Pleroma.Web.Streamer.StreamerSocket
@keepalive_interval :timer.seconds(30)
def start_link(opts) do
ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval)
GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__)
end
def init(%{ping_interval: ping_interval} = args) do
Process.send_after(self(), :ping, ping_interval)
{:ok, args}
end
def handle_info(:ping, %{ping_interval: ping_interval} = state) do
State.get_sockets()
|> Map.values()
|> List.flatten()
|> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} ->
Logger.debug("Sending keepalive ping")
send(transport_pid, {:text, ""})
end)
Process.send_after(self(), :ping, ping_interval)
{:noreply, state}
end
end
defmodule Pleroma.Web.Streamer.State do
use GenServer
require Logger
alias Pleroma.Web.Streamer.StreamerSocket
def start_link(_) do
GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__)
end
def add_socket(topic, socket) do
GenServer.call(__MODULE__, {:add, socket, topic})
end
def remove_socket(topic, socket) do
GenServer.call(__MODULE__, {:remove, socket, topic})
end
def get_sockets do
%{sockets: stream_sockets}