From 1ac180ca05609948650c954b6da37f2d9be09ffe Mon Sep 17 00:00:00 2001 From: stwf Date: Tue, 10 Sep 2019 11:53:49 -0400 Subject: [PATCH 1/7] lessen casting --- .gitignore | 4 + lib/pleroma/activity.ex | 2 + lib/pleroma/application.ex | 2 +- lib/pleroma/notification.ex | 2 +- lib/pleroma/web/activity_pub/activity_pub.ex | 19 +-- .../web/mastodon_api/websocket_handler.ex | 7 +- lib/pleroma_web/streamer/ping.ex | 33 +++++ lib/pleroma_web/streamer/state.ex | 94 +++++++++++++ .../web => pleroma_web/streamer}/streamer.ex | 133 +++++++----------- lib/pleroma_web/streamer/streamer_socket.ex | 31 ++++ lib/pleroma_web/streamer/supervisor.ex | 18 +++ test/integration/mastodon_websocket_test.exs | 11 +- test/notification_test.exs | 10 +- test/notification_test.exs.rej | 10 ++ test/pleroma_web/streamer/ping_test.exs | 38 +++++ test/pleroma_web/streamer/state_test.exs | 57 ++++++++ .../streamer}/streamer_test.exs | 116 ++++++--------- test/web/activity_pub/activity_pub_test.exs | 4 +- 18 files changed, 401 insertions(+), 190 deletions(-) create mode 100644 lib/pleroma_web/streamer/ping.ex create mode 100644 lib/pleroma_web/streamer/state.ex rename lib/{pleroma/web => pleroma_web/streamer}/streamer.ex (68%) create mode 100644 lib/pleroma_web/streamer/streamer_socket.ex create mode 100644 lib/pleroma_web/streamer/supervisor.ex create mode 100644 test/notification_test.exs.rej create mode 100644 test/pleroma_web/streamer/ping_test.exs create mode 100644 test/pleroma_web/streamer/state_test.exs rename test/{web => pleroma_web/streamer}/streamer_test.exs (86%) diff --git a/.gitignore b/.gitignore index 9591f99763..e9700923ca 100644 --- a/.gitignore +++ b/.gitignore @@ -42,3 +42,7 @@ erl_crash.dump # Code test coverage /cover /Elixir.*.coverdata + +.idea +pleroma.iml + diff --git a/lib/pleroma/activity.ex b/lib/pleroma/activity.ex index 6a51d4cf36..3e7362bd69 100644 --- a/lib/pleroma/activity.ex +++ b/lib/pleroma/activity.ex @@ -157,6 +157,8 @@ def get_by_id(id) do |> Repo.one() end + def get_by_id_with_object(nil), do: nil + def get_by_id_with_object(id) do from(activity in Activity, where: activity.id == ^id, diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 1d46925f80..4999911e71 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -142,7 +142,7 @@ defp oauth_cleanup_enabled?, defp streamer_child(:test), do: [] defp streamer_child(_) do - [Pleroma.Web.Streamer] + [PleromaWeb.Streamer] end defp oauth_cleanup_child(true), diff --git a/lib/pleroma/notification.ex b/lib/pleroma/notification.ex index b7c880c515..4f16962bde 100644 --- a/lib/pleroma/notification.ex +++ b/lib/pleroma/notification.ex @@ -13,7 +13,7 @@ defmodule Pleroma.Notification do alias Pleroma.User alias Pleroma.Web.CommonAPI.Utils alias Pleroma.Web.Push - alias Pleroma.Web.Streamer + alias PleromaWeb.Streamer import Ecto.Query import Ecto.Changeset diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index d23ec66ac4..61d0145b11 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -17,6 +17,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.Web.ActivityPub.MRF alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.WebFinger + alias PleromaWeb.Streamer import Ecto.Query import Pleroma.Web.ActivityPub.Utils @@ -187,7 +188,7 @@ def stream_out_participations(participations) do |> Repo.preload(:user) Enum.each(participations, fn participation -> - Pleroma.Web.Streamer.stream("participation", participation) + Streamer.stream("participation", participation) end) end @@ -212,33 +213,33 @@ def stream_out(activity) do object = Object.normalize(activity) # Do not stream out poll replies unless object.data["type"] == "Answer" do - Pleroma.Web.Streamer.stream("user", activity) - Pleroma.Web.Streamer.stream("list", activity) + Streamer.stream("user", activity) + Streamer.stream("list", activity) if get_visibility(activity) == "public" do - Pleroma.Web.Streamer.stream("public", activity) + Streamer.stream("public", activity) if activity.local do - Pleroma.Web.Streamer.stream("public:local", activity) + Streamer.stream("public:local", activity) end if activity.data["type"] in ["Create"] do object.data |> Map.get("tag", []) |> Enum.filter(fn tag -> is_bitstring(tag) end) - |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end) + |> Enum.each(fn tag -> Streamer.stream("hashtag:" <> tag, activity) end) if object.data["attachment"] != [] do - Pleroma.Web.Streamer.stream("public:media", activity) + Streamer.stream("public:media", activity) if activity.local do - Pleroma.Web.Streamer.stream("public:local:media", activity) + Streamer.stream("public:local:media", activity) end end end else if get_visibility(activity) == "direct", - do: Pleroma.Web.Streamer.stream("direct", activity) + do: Streamer.stream("direct", activity) end end end diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index dbd3542ead..c32246497e 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -8,6 +8,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do alias Pleroma.Repo alias Pleroma.User alias Pleroma.Web.OAuth.Token + alias PleromaWeb.Streamer @behaviour :cowboy_websocket @@ -24,7 +25,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do ] @anonymous_streams ["public", "public:local", "hashtag"] - # Handled by periodic keepalive in Pleroma.Web.Streamer. + # Handled by periodic keepalive in PleromaWeb.Streamer. @timeout :infinity def init(%{qs: qs} = req, state) do @@ -65,7 +66,7 @@ def websocket_info(:subscribe, state) do }, topic #{state.topic}" ) - Pleroma.Web.Streamer.add_socket(state.topic, streamer_socket(state)) + Streamer.add_socket(state.topic, streamer_socket(state)) {:ok, state} end @@ -80,7 +81,7 @@ def terminate(reason, _req, state) do }, topic #{state.topic || "?"}: #{inspect(reason)}" ) - Pleroma.Web.Streamer.remove_socket(state.topic, streamer_socket(state)) + Streamer.remove_socket(state.topic, streamer_socket(state)) :ok end diff --git a/lib/pleroma_web/streamer/ping.ex b/lib/pleroma_web/streamer/ping.ex new file mode 100644 index 0000000000..f3f9c7a0b9 --- /dev/null +++ b/lib/pleroma_web/streamer/ping.ex @@ -0,0 +1,33 @@ +defmodule PleromaWeb.Streamer.Ping do + use GenServer + require Logger + + alias PleromaWeb.Streamer.State + alias PleromaWeb.Streamer.StreamerSocket + + @keepalive_interval :timer.seconds(30) + + def start_link(opts) do + ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval) + GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__) + end + + def init(%{ping_interval: ping_interval} = args) do + Process.send_after(self(), :ping, ping_interval) + {:ok, args} + end + + def handle_info(:ping, %{ping_interval: ping_interval} = state) do + State.get_sockets() + |> Map.values() + |> List.flatten() + |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} -> + Logger.debug("Sending keepalive ping") + send(transport_pid, {:text, ""}) + end) + + Process.send_after(self(), :ping, ping_interval) + + {:noreply, state} + end +end diff --git a/lib/pleroma_web/streamer/state.ex b/lib/pleroma_web/streamer/state.ex new file mode 100644 index 0000000000..04f66074a1 --- /dev/null +++ b/lib/pleroma_web/streamer/state.ex @@ -0,0 +1,94 @@ +defmodule PleromaWeb.Streamer.State do + use GenServer + require Logger + + alias PleromaWeb.Streamer.StreamerSocket + + def start_link(_) do + GenServer.start_link(__MODULE__, %{sockets: %{}, streams: []}, name: __MODULE__) + end + + def add_socket(topic, socket) do + GenServer.call(__MODULE__, %{action: :add, socket: socket, topic: topic}) + end + + def remove_socket(topic, socket) do + GenServer.call(__MODULE__, %{action: :remove, socket: socket, topic: topic}) + end + + def get_sockets() do + %{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state) + stream_sockets + end + + def add_stream_item(topic, item) do + GenServer.call(__MODULE__, %{action: :add, item: item, topic: topic}) + end + + + + def init(init_arg) do + {:ok, init_arg} + end + + def handle_call(:get_state, _from, state) do + {:reply, state, state} + end + + def handle_call( + %{action: :add, topic: topic, socket: socket}, + _from, + %{sockets: sockets} = state + ) do + internal_topic = internal_topic(topic, socket) + stream_socket = StreamerSocket.from_socket(socket) + + sockets_for_topic = + sockets + |> Map.get(internal_topic, []) + |> List.insert_at(0, stream_socket) + |> Enum.uniq() + + state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic) + Logger.debug("Got new conn for #{topic}") + {:reply, state, state} + end + + def handle_call( + %{action: :add, topic: topic, item: _item}, + _from, + %{streams: _streams} = state + ) do + + + state = Map.put(state, :streams, []) + Logger.debug("Got new conn for #{topic}") + {:reply, state, state} + end + + def handle_call( + %{action: :remove, topic: topic, socket: socket}, + _from, + %{sockets: sockets} = state + ) do + internal_topic = internal_topic(topic, socket) + stream_socket = PleromaWeb.Streamer.StreamerSocket.from_socket(socket) + + sockets_for_topic = + sockets + |> Map.get(internal_topic, []) + |> List.delete(stream_socket) + + state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic) + {:reply, state, state} + end + + defp internal_topic(topic, socket) + when topic in ~w[user user:notification direct] do + "#{topic}:#{socket.assigns[:user].id}" + end + + defp internal_topic(topic, _) do + topic + end +end diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma_web/streamer/streamer.ex similarity index 68% rename from lib/pleroma/web/streamer.ex rename to lib/pleroma_web/streamer/streamer.ex index 587c43f401..71f5b67fd5 100644 --- a/lib/pleroma/web/streamer.ex +++ b/lib/pleroma_web/streamer/streamer.ex @@ -2,7 +2,7 @@ # Copyright © 2017-2019 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only -defmodule Pleroma.Web.Streamer do +defmodule PleromaWeb.Streamer do use GenServer require Logger alias Pleroma.Activity @@ -15,68 +15,60 @@ defmodule Pleroma.Web.Streamer do alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.CommonAPI alias Pleroma.Web.MastodonAPI.NotificationView - - @keepalive_interval :timer.seconds(30) + alias PleromaWeb.Streamer.State + alias PleromaWeb.Streamer.StreamerSocket def start_link(_) do + IO.puts("inside start_link StreamerStreamerStreamerStreamer") GenServer.start_link(__MODULE__, %{}, name: __MODULE__) end def add_socket(topic, socket) do - GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic}) + State.add_socket(topic, socket) end def remove_socket(topic, socket) do - GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic}) + State.remove_socket(topic, socket) + end + + def get_sockets() do + State.get_sockets() end def stream(topic, item) do GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item}) end - def init(args) do - Process.send_after(self(), %{action: :ping}, @keepalive_interval) + def supervisor, do: PleromaWeb.Streamer.Supervisor + def init(args) do + IO.puts("inside init StreamerStreamerStreamerStreamer") {:ok, args} end - def handle_info(%{action: :ping}, topics) do - topics - |> Map.values() - |> List.flatten() - |> Enum.each(fn socket -> - Logger.debug("Sending keepalive ping") - send(socket.transport_pid, {:text, ""}) - end) - - Process.send_after(self(), %{action: :ping}, @keepalive_interval) - - {:noreply, topics} - end - - def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do + def handle_cast(%{action: :stream, topic: "direct", item: item}, state) do recipient_topics = User.get_recipients_from_activity(item) |> Enum.map(fn %{id: id} -> "direct:#{id}" end) Enum.each(recipient_topics || [], fn user_topic -> Logger.debug("Trying to push direct message to #{user_topic}\n\n") - push_to_socket(topics, user_topic, item) + push_to_socket(State.get_sockets(), user_topic, item) end) - {:noreply, topics} + {:noreply, state} end - def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do + def handle_cast(%{action: :stream, topic: "participation", item: participation}, state) do user_topic = "direct:#{participation.user_id}" Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n") - push_to_socket(topics, user_topic, participation) + push_to_socket(State.get_sockets(), user_topic, participation) - {:noreply, topics} + {:noreply, state} end - def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do + def handle_cast(%{action: :stream, topic: "list", item: item}, state) do # filter the recipient list if the activity is not public, see #270. recipient_lists = case Visibility.is_public?(item) do @@ -98,33 +90,30 @@ def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do Enum.each(recipient_topics || [], fn list_topic -> Logger.debug("Trying to push message to #{list_topic}\n\n") - push_to_socket(topics, list_topic, item) + push_to_socket(State.get_sockets(), list_topic, item) end) - {:noreply, topics} + {:noreply, state} end def handle_cast( %{action: :stream, topic: topic, item: %Notification{} = item}, - topics + state ) when topic in ["user", "user:notification"] do - topics + State.get_sockets() |> Map.get("#{topic}:#{item.user_id}", []) - |> Enum.each(fn socket -> - with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id), + |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} -> + with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id), true <- should_send?(user, item) do - send( - socket.transport_pid, - {:text, represent_notification(socket.assigns[:user], item)} - ) + send(transport_pid, {:text, represent_notification(socket_user, item)}) end end) - {:noreply, topics} + {:noreply, state} end - def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do + def handle_cast(%{action: :stream, topic: "user", item: item}, state) do Logger.debug("Trying to push to users") recipient_topics = @@ -132,35 +121,17 @@ def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do |> Enum.map(fn %{id: id} -> "user:#{id}" end) Enum.each(recipient_topics, fn topic -> - push_to_socket(topics, topic, item) + push_to_socket(State.get_sockets(), topic, item) end) - {:noreply, topics} + {:noreply, state} end - def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do + def handle_cast(%{action: :stream, topic: topic, item: item}, state) do Logger.debug("Trying to push to #{topic}") Logger.debug("Pushing item to #{topic}") - push_to_socket(topics, topic, item) - {:noreply, topics} - end - - def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do - topic = internal_topic(topic, socket) - sockets_for_topic = sockets[topic] || [] - sockets_for_topic = Enum.uniq([socket | sockets_for_topic]) - sockets = Map.put(sockets, topic, sockets_for_topic) - Logger.debug("Got new conn for #{topic}") - {:noreply, sockets} - end - - def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do - topic = internal_topic(topic, socket) - sockets_for_topic = sockets[topic] || [] - sockets_for_topic = List.delete(sockets_for_topic, socket) - sockets = Map.put(sockets, topic, sockets_for_topic) - Logger.debug("Removed conn for #{topic}") - {:noreply, sockets} + push_to_socket(State.get_sockets(), topic, item) + {:noreply, state} end def handle_cast(m, state) do @@ -236,7 +207,7 @@ defp should_send?(%User{} = user, %Activity{} = item) do false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), true <- thread_containment(item, user), - false <- CommonAPI.thread_muted?(user, item) do + false <- CommonAPI.thread_muted?(user, item) do true else _ -> false @@ -248,32 +219,32 @@ defp should_send?(%User{} = user, %Notification{activity: activity}) do end def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do - Enum.each(topics[topic] || [], fn socket -> + Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} -> # Get the current user so we have up-to-date blocks etc. - if socket.assigns[:user] do - user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id) + if socket_user do + user = User.get_cached_by_ap_id(socket_user.ap_id) if should_send?(user, item) do - send(socket.transport_pid, {:text, represent_update(item, user)}) + send(transport_pid, {:text, represent_update(item, user)}) end else - send(socket.transport_pid, {:text, represent_update(item)}) + send(transport_pid, {:text, represent_update(item)}) end end) end def push_to_socket(topics, topic, %Participation{} = participation) do - Enum.each(topics[topic] || [], fn socket -> - send(socket.transport_pid, {:text, represent_conversation(participation)}) + Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> + send(transport_pid, {:text, represent_conversation(participation)}) end) end def push_to_socket(topics, topic, %Activity{ data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} }) do - Enum.each(topics[topic] || [], fn socket -> + Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> send( - socket.transport_pid, + transport_pid, {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()} ) end) @@ -282,29 +253,23 @@ def push_to_socket(topics, topic, %Activity{ def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop def push_to_socket(topics, topic, item) do - Enum.each(topics[topic] || [], fn socket -> + Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} -> # Get the current user so we have up-to-date blocks etc. - if socket.assigns[:user] do - user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id) + if socket_user do + user = User.get_cached_by_ap_id(socket_user.ap_id) blocks = user.info.blocks || [] mutes = user.info.mutes || [] with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)), true <- thread_containment(item, user) do - send(socket.transport_pid, {:text, represent_update(item, user)}) + send(transport_pid, {:text, represent_update(item, user)}) end else - send(socket.transport_pid, {:text, represent_update(item)}) + send(transport_pid, {:text, represent_update(item)}) end end) end - defp internal_topic(topic, socket) when topic in ~w[user user:notification direct] do - "#{topic}:#{socket.assigns[:user].id}" - end - - defp internal_topic(topic, _), do: topic - @spec thread_containment(Activity.t(), User.t()) :: boolean() defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true diff --git a/lib/pleroma_web/streamer/streamer_socket.ex b/lib/pleroma_web/streamer/streamer_socket.ex new file mode 100644 index 0000000000..56bd44cfa5 --- /dev/null +++ b/lib/pleroma_web/streamer/streamer_socket.ex @@ -0,0 +1,31 @@ +defmodule PleromaWeb.Streamer.StreamerSocket do + defstruct transport_pid: nil, user: nil + + alias Pleroma.User + alias PleromaWeb.Streamer.StreamerSocket + + def from_socket(%{ + transport_pid: transport_pid, + assigns: %{user: nil} + }) do + %StreamerSocket{ + transport_pid: transport_pid + } + end + + def from_socket(%{ + transport_pid: transport_pid, + assigns: %{user: %User{} = user} + }) do + %StreamerSocket{ + transport_pid: transport_pid, + user: user + } + end + + def from_socket(%{transport_pid: transport_pid}) do + %StreamerSocket{ + transport_pid: transport_pid + } + end +end diff --git a/lib/pleroma_web/streamer/supervisor.ex b/lib/pleroma_web/streamer/supervisor.ex new file mode 100644 index 0000000000..d2439021eb --- /dev/null +++ b/lib/pleroma_web/streamer/supervisor.ex @@ -0,0 +1,18 @@ +defmodule PleromaWeb.Streamer.Supervisor do + use Supervisor + + def start_link(opts) do + Supervisor.start_link(__MODULE__, opts, name: __MODULE__) + end + + def init(args) do + children = [ + {PleromaWeb.Streamer.State, args}, + {PleromaWeb.Streamer.Ping, args}, + {PleromaWeb.Streamer, args} + ] + + opts = [strategy: :one_for_one, name: PleromaWeb.Streamer.Supervisor] + Supervisor.init(children, opts) + end +end diff --git a/test/integration/mastodon_websocket_test.exs b/test/integration/mastodon_websocket_test.exs index 3975cdcd69..237b01c0c5 100644 --- a/test/integration/mastodon_websocket_test.exs +++ b/test/integration/mastodon_websocket_test.exs @@ -10,7 +10,7 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do alias Pleroma.Integration.WebsocketClient alias Pleroma.Web.CommonAPI alias Pleroma.Web.OAuth - alias Pleroma.Web.Streamer + alias PleromaWeb.Streamer @path Pleroma.Web.Endpoint.url() |> URI.parse() @@ -19,13 +19,8 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do |> URI.to_string() setup do - GenServer.start(Streamer, %{}, name: Streamer) - - on_exit(fn -> - if pid = Process.whereis(Streamer) do - Process.exit(pid, :kill) - end - end) + start_supervised(Streamer.supervisor()) + :ok end def start_socket(qs \\ nil, headers \\ []) do diff --git a/test/notification_test.exs b/test/notification_test.exs index 2a52dad8d8..d2c24f8f5f 100644 --- a/test/notification_test.exs +++ b/test/notification_test.exs @@ -11,7 +11,7 @@ defmodule Pleroma.NotificationTest do alias Pleroma.User alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.CommonAPI - alias Pleroma.Web.Streamer + alias PleromaWeb.Streamer describe "create_notifications" do test "notifies someone when they are directly addressed" do @@ -69,13 +69,9 @@ test "does not create a notification for subscribed users if status is a reply" describe "create_notification" do setup do - GenServer.start(Streamer, %{}, name: Streamer) + start_supervised(Streamer.supervisor()) - on_exit(fn -> - if pid = Process.whereis(Streamer) do - Process.exit(pid, :kill) - end - end) + :ok end test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do diff --git a/test/notification_test.exs.rej b/test/notification_test.exs.rej new file mode 100644 index 0000000000..793faa4573 --- /dev/null +++ b/test/notification_test.exs.rej @@ -0,0 +1,10 @@ +diff a/test/notification_test.exs b/test/notification_test.exs (rejected hunks) +@@ -11,7 +11,7 @@ defmodule Pleroma.NotificationTest do + alias Pleroma.User + alias Pleroma.Web.ActivityPub.Transmogrifier + alias Pleroma.Web.CommonAPI +- alias Pleroma.Web.Streamer ++ alias PleromaWeb.Streamer + alias Pleroma.Web.TwitterAPI.TwitterAPI + + describe "create_notifications" do diff --git a/test/pleroma_web/streamer/ping_test.exs b/test/pleroma_web/streamer/ping_test.exs new file mode 100644 index 0000000000..5a43b83a9b --- /dev/null +++ b/test/pleroma_web/streamer/ping_test.exs @@ -0,0 +1,38 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule PleromaWeb.PingTest do + use Pleroma.DataCase + + import Pleroma.Factory + # alias Pleroma.User + alias PleromaWeb.Streamer + # alias PleromaWeb.Streamer.StreamerSocket + + setup do + start_supervised({Streamer.supervisor(), [ping_interval: 30]}) + + :ok + end + + describe "sockets" do + setup do + user = insert(:user) + {:ok, %{user: user}} + end + + test "it sends pings", %{user: user} do + task = + Task.async(fn -> + assert_receive {:text, received_event}, 40 + assert_receive {:text, received_event}, 40 + assert_receive {:text, received_event}, 40 + end) + + Streamer.add_socket("public", %{transport_pid: task.pid, assigns: %{user: user}}) + + Task.await(task) + end + end +end \ No newline at end of file diff --git a/test/pleroma_web/streamer/state_test.exs b/test/pleroma_web/streamer/state_test.exs new file mode 100644 index 0000000000..2fe047a86c --- /dev/null +++ b/test/pleroma_web/streamer/state_test.exs @@ -0,0 +1,57 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule PleromaWeb.StateTest do + use Pleroma.DataCase + + import Pleroma.Factory + #alias Pleroma.User + alias PleromaWeb.Streamer + alias PleromaWeb.Streamer.StreamerSocket + + setup do + start_supervised(Streamer.supervisor()) + + :ok + end + + describe "sockets" do + setup do + user = insert(:user) + user2 = insert(:user) + {:ok, %{user: user, user2: user2}} + end + + test "it can add a socket", %{user: user} do + Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}}) + + assert( + %{"public" => [%StreamerSocket{transport_pid: 1}]} = Streamer.get_sockets() + ) + end + + test "it can add multiple sockets per user", %{user: user} do + Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}}) + Streamer.add_socket("public", %{transport_pid: 2, assigns: %{user: user}}) + + assert( + %{"public" => [ + %StreamerSocket{transport_pid: 2}, + %StreamerSocket{transport_pid: 1} + ]} = Streamer.get_sockets() + ) + end + + test "it will not add a duplicate socket", %{user: user} do + Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}}) + Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}}) + + assert( + %{"activity" => [ + %StreamerSocket{transport_pid: 1} + ]} = Streamer.get_sockets() + ) + end + end +end \ No newline at end of file diff --git a/test/web/streamer_test.exs b/test/pleroma_web/streamer/streamer_test.exs similarity index 86% rename from test/web/streamer_test.exs rename to test/pleroma_web/streamer/streamer_test.exs index 96fa7645f9..22d15aab95 100644 --- a/test/web/streamer_test.exs +++ b/test/pleroma_web/streamer/streamer_test.exs @@ -2,27 +2,30 @@ # Copyright © 2017-2018 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only -defmodule Pleroma.Web.StreamerTest do +defmodule PleromaWeb.StreamerTest do use Pleroma.DataCase alias Pleroma.List alias Pleroma.User alias Pleroma.Web.CommonAPI - alias Pleroma.Web.Streamer + alias PleromaWeb.Streamer import Pleroma.Factory + alias PleromaWeb.Streamer.StreamerSocket - clear_config_all([:instance, :skip_thread_containment]) + setup do + skip_thread_containment = Pleroma.Config.get([:instance, :skip_thread_containment]) - describe "user streams" do - setup do - GenServer.start(Streamer, %{}, name: Streamer) + on_exit(fn -> + Pleroma.Config.put([:instance, :skip_thread_containment], skip_thread_containment) + end) - on_exit(fn -> - if pid = Process.whereis(Streamer) do - Process.exit(pid, :kill) - end - end) + start_supervised(Streamer.supervisor()) + + :ok + end + describe "user streams" do + setup do user = insert(:user) notify = insert(:notification, user: user, activity: build(:note_activity)) {:ok, %{user: user, notify: notify}} @@ -125,11 +128,9 @@ test "it sends to public" do assert_receive {:text, _}, 4_000 end) - fake_socket = %{ + fake_socket = %StreamerSocket{ transport_pid: task.pid, - assigns: %{ - user: user - } + user: user } {:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"}) @@ -155,11 +156,9 @@ test "it sends to public" do assert received_event == expected_event end) - fake_socket = %{ + fake_socket = %StreamerSocket{ transport_pid: task.pid, - assigns: %{ - user: user - } + user: user } {:ok, activity} = CommonAPI.delete(activity.id, other_user) @@ -189,7 +188,7 @@ test "it doesn't send to user if recipients invalid and thread containment is en ) task = Task.async(fn -> refute_receive {:text, _}, 1_000 end) - fake_socket = %{transport_pid: task.pid, assigns: %{user: user}} + fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} topics = %{"public" => [fake_socket]} Streamer.push_to_socket(topics, "public", activity) @@ -211,7 +210,7 @@ test "it sends message if recipients invalid and thread containment is disabled" ) task = Task.async(fn -> assert_receive {:text, _}, 1_000 end) - fake_socket = %{transport_pid: task.pid, assigns: %{user: user}} + fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} topics = %{"public" => [fake_socket]} Streamer.push_to_socket(topics, "public", activity) @@ -233,7 +232,7 @@ test "it sends message if recipients invalid and thread containment is enabled b ) task = Task.async(fn -> assert_receive {:text, _}, 1_000 end) - fake_socket = %{transport_pid: task.pid, assigns: %{user: user}} + fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} topics = %{"public" => [fake_socket]} Streamer.push_to_socket(topics, "public", activity) @@ -251,11 +250,9 @@ test "it doesn't send to blocked users" do refute_receive {:text, _}, 1_000 end) - fake_socket = %{ + fake_socket = %StreamerSocket{ transport_pid: task.pid, - assigns: %{ - user: user - } + user: user } {:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"}) @@ -284,11 +281,9 @@ test "it doesn't send unwanted DMs to list" do refute_receive {:text, _}, 1_000 end) - fake_socket = %{ + fake_socket = %StreamerSocket{ transport_pid: task.pid, - assigns: %{ - user: user_a - } + user: user_a } {:ok, activity} = @@ -318,11 +313,9 @@ test "it doesn't send unwanted private posts to list" do refute_receive {:text, _}, 1_000 end) - fake_socket = %{ + fake_socket = %StreamerSocket{ transport_pid: task.pid, - assigns: %{ - user: user_a - } + user: user_a } {:ok, activity} = @@ -340,7 +333,7 @@ test "it doesn't send unwanted private posts to list" do Task.await(task) end - test "it send wanted private posts to list" do + test "it sends wanted private posts to list" do user_a = insert(:user) user_b = insert(:user) @@ -354,11 +347,9 @@ test "it send wanted private posts to list" do assert_receive {:text, _}, 1_000 end) - fake_socket = %{ + fake_socket = %StreamerSocket{ transport_pid: task.pid, - assigns: %{ - user: user_a - } + user: user_a } {:ok, activity} = @@ -367,11 +358,12 @@ test "it send wanted private posts to list" do "visibility" => "private" }) - topics = %{ - "list:#{list.id}" => [fake_socket] - } + Streamer.add_socket( + "list:#{list.id}", + fake_socket + ) - Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics) + Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, %{}) Task.await(task) end @@ -387,11 +379,9 @@ test "it doesn't send muted reblogs" do refute_receive {:text, _}, 1_000 end) - fake_socket = %{ + fake_socket = %StreamerSocket{ transport_pid: task.pid, - assigns: %{ - user: user1 - } + user: user1 } {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"}) @@ -406,36 +396,8 @@ test "it doesn't send muted reblogs" do Task.await(task) end - test "it doesn't send posts from muted threads" do - user = insert(:user) - user2 = insert(:user) - {:ok, user2, user, _activity} = CommonAPI.follow(user2, user) - - {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"}) - - {:ok, activity} = CommonAPI.add_mute(user2, activity) - - task = Task.async(fn -> refute_receive {:text, _}, 4_000 end) - - Streamer.add_socket( - "user", - %{transport_pid: task.pid, assigns: %{user: user2}} - ) - - Streamer.stream("user", activity) - Task.await(task) - end - describe "direct streams" do setup do - GenServer.start(Streamer, %{}, name: Streamer) - - on_exit(fn -> - if pid = Process.whereis(Streamer) do - Process.exit(pid, :kill) - end - end) - :ok end @@ -474,6 +436,8 @@ test "it doesn't send conversation update to the 'direct' streamj when the last task = Task.async(fn -> + assert_receive {:text, received_event}, 4_000 + assert_receive {:text, received_event}, 4_000 assert_receive {:text, received_event}, 4_000 assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event) @@ -509,6 +473,8 @@ test "it sends conversation update to the 'direct' stream when a message is dele task = Task.async(fn -> + assert_receive {:text, received_event}, 4_000 + assert_receive {:text, received_event}, 4_000 assert_receive {:text, received_event}, 4_000 assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event) diff --git a/test/web/activity_pub/activity_pub_test.exs b/test/web/activity_pub/activity_pub_test.exs index f72b44aed4..3686648be5 100644 --- a/test/web/activity_pub/activity_pub_test.exs +++ b/test/web/activity_pub/activity_pub_test.exs @@ -34,12 +34,12 @@ test "it streams them out" do conversation.participations |> Repo.preload(:user) - with_mock Pleroma.Web.Streamer, + with_mock PleromaWeb.Streamer, stream: fn _, _ -> nil end do ActivityPub.stream_out_participations(conversation.participations) Enum.each(participations, fn participation -> - assert called(Pleroma.Web.Streamer.stream("participation", participation)) + assert called(PleromaWeb.Streamer.stream("participation", participation)) end) end end -- GitLab From e3af5e6ba45de0f82068e6ca3795935af8b9da76 Mon Sep 17 00:00:00 2001 From: stwf Date: Tue, 10 Sep 2019 11:54:17 -0400 Subject: [PATCH 2/7] refactor ActivityPub --- lib/pleroma/web/activity_pub/activity_pub.ex | 50 ++------ lib/pleroma/web/activity_pub/topics.ex | 70 +++++++++++ lib/pleroma_web/streamer/state.ex | 4 - lib/pleroma_web/streamer/streamer.ex | 26 +++- lib/pleroma_web/streamer/streamer_socket.ex | 12 +- test/pleroma_web/streamer/ping_test.exs | 4 +- test/pleroma_web/streamer/state_test.exs | 19 +-- test/pleroma_web/streamer/streamer_test.exs | 2 +- test/web/activity_pub/activity_pub_test.exs | 4 +- test/web/activity_pub/topics_test.exs | 125 +++++++++++++++++++ 10 files changed, 247 insertions(+), 69 deletions(-) create mode 100644 lib/pleroma/web/activity_pub/topics.ex create mode 100644 test/web/activity_pub/topics_test.exs diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 61d0145b11..9e3ccfd384 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -15,6 +15,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.Upload alias Pleroma.User alias Pleroma.Web.ActivityPub.MRF + alias Pleroma.Web.ActivityPub.Topics alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.WebFinger alias PleromaWeb.Streamer @@ -187,9 +188,7 @@ def stream_out_participations(participations) do participations |> Repo.preload(:user) - Enum.each(participations, fn participation -> - Streamer.stream("participation", participation) - end) + Streamer.stream("participation", participations) end def stream_out_participations(%Object{data: %{"context" => context}}, user) do @@ -208,41 +207,16 @@ def stream_out_participations(%Object{data: %{"context" => context}}, user) do def stream_out_participations(_, _), do: :noop - def stream_out(activity) do - if activity.data["type"] in ["Create", "Announce", "Delete"] do - object = Object.normalize(activity) - # Do not stream out poll replies - unless object.data["type"] == "Answer" do - Streamer.stream("user", activity) - Streamer.stream("list", activity) - - if get_visibility(activity) == "public" do - Streamer.stream("public", activity) - - if activity.local do - Streamer.stream("public:local", activity) - end - - if activity.data["type"] in ["Create"] do - object.data - |> Map.get("tag", []) - |> Enum.filter(fn tag -> is_bitstring(tag) end) - |> Enum.each(fn tag -> Streamer.stream("hashtag:" <> tag, activity) end) - - if object.data["attachment"] != [] do - Streamer.stream("public:media", activity) - - if activity.local do - Streamer.stream("public:local:media", activity) - end - end - end - else - if get_visibility(activity) == "direct", - do: Streamer.stream("direct", activity) - end - end - end + def stream_out(%Activity{data: %{"type" => data_type}} = activity) + when data_type in ["Create", "Announce", "Delete"] do + + activity + |> Topics.get_activity_topics() + |> Streamer.stream(activity) + end + + def stream_out(_activity) do + :noop end def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do diff --git a/lib/pleroma/web/activity_pub/topics.ex b/lib/pleroma/web/activity_pub/topics.ex new file mode 100644 index 0000000000..a016014b11 --- /dev/null +++ b/lib/pleroma/web/activity_pub/topics.ex @@ -0,0 +1,70 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Web.ActivityPub.Topics do + + alias Pleroma.Object + alias Pleroma.Web.ActivityPub.Visibility + + def get_activity_topics(activity) do + activity + |> Object.normalize + |> generate_topics(activity) + |> List.flatten + end + + defp generate_topics(%{data: %{type: "Answer"}}, _) do + [] + end + + defp generate_topics(object, activity) do + ["user", "list"] ++ visibility_tags(object, activity) + end + + defp visibility_tags(object, activity) do + case Visibility.get_visibility(activity) do + "public" -> + if activity.local do + ["public", "public:local"] + else + ["public"] + end + |> item_creation_tags(object, activity) + + "direct" -> + ["direct"] + + _ -> + [] + end + end + + defp item_creation_tags(tags, %{data: %{type: "Create"}} = object, activity) do + tags ++ hashtags_to_topics(object) ++ attachment_topics(object, activity) + end + + defp item_creation_tags(tags, _, _) do + tags + end + + defp hashtags_to_topics(obj) do + obj.data + |> Map.get("tag", []) + |> Enum.filter(fn tag -> is_bitstring(tag) end) + |> Enum.map(fn tag -> "hashtag:" <> tag end) + end + + defp attachment_topics(%{data: %{"attachment" => []}}, _activity) do + [] + end + + defp attachment_topics(_object, activity) do + if activity.local do + ["public:media", "public:local:media"] + else + ["public:media"] + end + end + +end diff --git a/lib/pleroma_web/streamer/state.ex b/lib/pleroma_web/streamer/state.ex index 04f66074a1..03479c8774 100644 --- a/lib/pleroma_web/streamer/state.ex +++ b/lib/pleroma_web/streamer/state.ex @@ -25,8 +25,6 @@ def add_stream_item(topic, item) do GenServer.call(__MODULE__, %{action: :add, item: item, topic: topic}) end - - def init(init_arg) do {:ok, init_arg} end @@ -59,8 +57,6 @@ def handle_call( _from, %{streams: _streams} = state ) do - - state = Map.put(state, :streams, []) Logger.debug("Got new conn for #{topic}") {:reply, state, state} diff --git a/lib/pleroma_web/streamer/streamer.ex b/lib/pleroma_web/streamer/streamer.ex index 71f5b67fd5..6a65f80a09 100644 --- a/lib/pleroma_web/streamer/streamer.ex +++ b/lib/pleroma_web/streamer/streamer.ex @@ -19,7 +19,6 @@ defmodule PleromaWeb.Streamer do alias PleromaWeb.Streamer.StreamerSocket def start_link(_) do - IO.puts("inside start_link StreamerStreamerStreamerStreamer") GenServer.start_link(__MODULE__, %{}, name: __MODULE__) end @@ -35,6 +34,18 @@ def get_sockets() do State.get_sockets() end + def stream(topics, item) when is_list(topics) do + Enum.each(topics, fn t -> + GenServer.cast(__MODULE__, %{action: :stream, topic: t, item: item}) + end) + end + + def stream(topic, items) when is_list(items) do + Enum.each(items, fn i -> + GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: i}) + end) + end + def stream(topic, item) do GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item}) end @@ -42,7 +53,6 @@ def stream(topic, item) do def supervisor, do: PleromaWeb.Streamer.Supervisor def init(args) do - IO.puts("inside init StreamerStreamerStreamerStreamer") {:ok, args} end @@ -207,7 +217,7 @@ defp should_send?(%User{} = user, %Activity{} = item) do false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), true <- thread_containment(item, user), - false <- CommonAPI.thread_muted?(user, item) do + false <- CommonAPI.thread_muted?(user, item) do true else _ -> false @@ -219,7 +229,10 @@ defp should_send?(%User{} = user, %Notification{activity: activity}) do end def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do - Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} -> + Enum.each(topics[topic] || [], fn %StreamerSocket{ + transport_pid: transport_pid, + user: socket_user + } -> # Get the current user so we have up-to-date blocks etc. if socket_user do user = User.get_cached_by_ap_id(socket_user.ap_id) @@ -253,7 +266,10 @@ def push_to_socket(topics, topic, %Activity{ def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop def push_to_socket(topics, topic, item) do - Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} -> + Enum.each(topics[topic] || [], fn %StreamerSocket{ + transport_pid: transport_pid, + user: socket_user + } -> # Get the current user so we have up-to-date blocks etc. if socket_user do user = User.get_cached_by_ap_id(socket_user.ap_id) diff --git a/lib/pleroma_web/streamer/streamer_socket.ex b/lib/pleroma_web/streamer/streamer_socket.ex index 56bd44cfa5..3bb55f5f7d 100644 --- a/lib/pleroma_web/streamer/streamer_socket.ex +++ b/lib/pleroma_web/streamer/streamer_socket.ex @@ -5,18 +5,18 @@ defmodule PleromaWeb.Streamer.StreamerSocket do alias PleromaWeb.Streamer.StreamerSocket def from_socket(%{ - transport_pid: transport_pid, - assigns: %{user: nil} - }) do + transport_pid: transport_pid, + assigns: %{user: nil} + }) do %StreamerSocket{ transport_pid: transport_pid } end def from_socket(%{ - transport_pid: transport_pid, - assigns: %{user: %User{} = user} - }) do + transport_pid: transport_pid, + assigns: %{user: %User{} = user} + }) do %StreamerSocket{ transport_pid: transport_pid, user: user diff --git a/test/pleroma_web/streamer/ping_test.exs b/test/pleroma_web/streamer/ping_test.exs index 5a43b83a9b..856e2b7062 100644 --- a/test/pleroma_web/streamer/ping_test.exs +++ b/test/pleroma_web/streamer/ping_test.exs @@ -6,9 +6,7 @@ defmodule PleromaWeb.PingTest do use Pleroma.DataCase import Pleroma.Factory - # alias Pleroma.User alias PleromaWeb.Streamer - # alias PleromaWeb.Streamer.StreamerSocket setup do start_supervised({Streamer.supervisor(), [ping_interval: 30]}) @@ -35,4 +33,4 @@ test "it sends pings", %{user: user} do Task.await(task) end end -end \ No newline at end of file +end diff --git a/test/pleroma_web/streamer/state_test.exs b/test/pleroma_web/streamer/state_test.exs index 2fe047a86c..38621044e7 100644 --- a/test/pleroma_web/streamer/state_test.exs +++ b/test/pleroma_web/streamer/state_test.exs @@ -6,7 +6,6 @@ defmodule PleromaWeb.StateTest do use Pleroma.DataCase import Pleroma.Factory - #alias Pleroma.User alias PleromaWeb.Streamer alias PleromaWeb.Streamer.StreamerSocket @@ -26,9 +25,7 @@ defmodule PleromaWeb.StateTest do test "it can add a socket", %{user: user} do Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}}) - assert( - %{"public" => [%StreamerSocket{transport_pid: 1}]} = Streamer.get_sockets() - ) + assert(%{"public" => [%StreamerSocket{transport_pid: 1}]} = Streamer.get_sockets()) end test "it can add multiple sockets per user", %{user: user} do @@ -36,10 +33,12 @@ test "it can add multiple sockets per user", %{user: user} do Streamer.add_socket("public", %{transport_pid: 2, assigns: %{user: user}}) assert( - %{"public" => [ + %{ + "public" => [ %StreamerSocket{transport_pid: 2}, %StreamerSocket{transport_pid: 1} - ]} = Streamer.get_sockets() + ] + } = Streamer.get_sockets() ) end @@ -48,10 +47,12 @@ test "it will not add a duplicate socket", %{user: user} do Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}}) assert( - %{"activity" => [ + %{ + "activity" => [ %StreamerSocket{transport_pid: 1} - ]} = Streamer.get_sockets() + ] + } = Streamer.get_sockets() ) end end -end \ No newline at end of file +end diff --git a/test/pleroma_web/streamer/streamer_test.exs b/test/pleroma_web/streamer/streamer_test.exs index 22d15aab95..183bfba84f 100644 --- a/test/pleroma_web/streamer/streamer_test.exs +++ b/test/pleroma_web/streamer/streamer_test.exs @@ -156,7 +156,7 @@ test "it sends to public" do assert received_event == expected_event end) - fake_socket = %StreamerSocket{ + fake_socket = %StreamerSocket{ transport_pid: task.pid, user: user } diff --git a/test/web/activity_pub/activity_pub_test.exs b/test/web/activity_pub/activity_pub_test.exs index 3686648be5..359e972177 100644 --- a/test/web/activity_pub/activity_pub_test.exs +++ b/test/web/activity_pub/activity_pub_test.exs @@ -38,9 +38,7 @@ test "it streams them out" do stream: fn _, _ -> nil end do ActivityPub.stream_out_participations(conversation.participations) - Enum.each(participations, fn participation -> - assert called(PleromaWeb.Streamer.stream("participation", participation)) - end) + assert called(PleromaWeb.Streamer.stream("participation", participations)) end end end diff --git a/test/web/activity_pub/topics_test.exs b/test/web/activity_pub/topics_test.exs new file mode 100644 index 0000000000..2fb5d279a9 --- /dev/null +++ b/test/web/activity_pub/topics_test.exs @@ -0,0 +1,125 @@ +defmodule Pleroma.Web.ActivityPub.TopicsTest do + use Pleroma.DataCase + + alias Pleroma.Activity + alias Pleroma.Object + alias Pleroma.Web.ActivityPub.Topics + + require Pleroma.Constants + + describe "poll answer" do + test "produce no topics" do + activity = %Activity{object: %Object{data: %{type: "Answer"}}} + + assert [] == Topics.get_activity_topics(activity) + end + end + + describe "non poll answer" do + test "always add user and list topics" do + activity = %Activity{object: %Object{data: %{type: "FooBar"}}} + topics = Topics.get_activity_topics(activity) + + assert Enum.member?(topics, "user") + assert Enum.member?(topics, "list") + end + end + + describe "public visibility" do + setup do + activity = %Activity{object: %Object{data: %{type: "Note"}}, data: %{"to" => [Pleroma.Constants.as_public()]}} + {:ok, activity: activity} + end + + test "produces public topic", %{activity: activity} do + topics = Topics.get_activity_topics(activity) + + assert Enum.member?(topics, "public") + end + + test "local action produces public:local topic", %{activity: activity} do + activity = %{activity | local: true} + topics = Topics.get_activity_topics(activity) + + assert Enum.member?(topics, "public:local") + end + + test "non-local action does not produce public:local topic", %{activity: activity} do + activity = %{activity | local: false} + topics = Topics.get_activity_topics(activity) + + refute Enum.member?(topics, "public:local") + end + end + + describe "public visibility create events" do + setup do + activity = %Activity{object: %Object{data: %{:type => "Create", "attachment" => []}}, data: %{"to" => [Pleroma.Constants.as_public()]}} + {:ok, activity: activity} + end + + test "with no attachments doesn't produce public:media topics", %{activity: activity} do + topics = Topics.get_activity_topics(activity) + + refute Enum.member?(topics, "public:media") + refute Enum.member?(topics, "public:local:media") + end + + test "converts tags to hash tags", %{activity: %{object: %{data: data} = object} = activity} do + tagged_data = Map.put(data, "tag", ["foo", "bar"]) + activity = %{activity | object: %{object | data: tagged_data}} + + topics = Topics.get_activity_topics(activity) + + assert Enum.member?(topics, "hashtag:foo") + assert Enum.member?(topics, "hashtag:bar") + end + end + + describe "public visibility create events with attachments" do + setup do + activity = %Activity{object: %Object{data: %{:type => "Create", "attachment" => ["foo"]}}, data: %{"to" => [Pleroma.Constants.as_public()]}} + {:ok, activity: activity} + end + + test "produce public:media topics", %{activity: activity} do + topics = Topics.get_activity_topics(activity) + + assert Enum.member?(topics, "public:media") + end + + test "local produces public:local:media topics", %{activity: activity} do + topics = Topics.get_activity_topics(activity) + + assert Enum.member?(topics, "public:local:media") + end + + test "non-local doesn't produce public:local:media topics", %{activity: activity} do + activity = %{activity | local: false} + + topics = Topics.get_activity_topics(activity) + + refute Enum.member?(topics, "public:local:media") + end + + end + + describe "non-public visibility" do + test "produces direct topic" do + activity = %Activity{object: %Object{data: %{type: "Note"}}, data: %{"to" => []}} + topics = Topics.get_activity_topics(activity) + + assert Enum.member?(topics, "direct") + refute Enum.member?(topics, "public") + refute Enum.member?(topics, "public:local") + refute Enum.member?(topics, "public:media") + refute Enum.member?(topics, "public:local:media") + end + end +end + + + + + + -- GitLab From ce2891cfa1544757ad729303bb400bfa471c8fcf Mon Sep 17 00:00:00 2001 From: stwf Date: Tue, 10 Sep 2019 11:58:17 -0400 Subject: [PATCH 3/7] create Worker module --- lib/pleroma/application.ex | 2 +- lib/pleroma/notification.ex | 6 +- lib/pleroma/web/activity_pub/activity_pub.ex | 1 - lib/pleroma/web/activity_pub/topics.ex | 6 +- lib/pleroma_web/streamer/state.ex | 2 +- lib/pleroma_web/streamer/streamer.ex | 273 +------------------ lib/pleroma_web/streamer/supervisor.ex | 14 +- lib/pleroma_web/streamer/worker.ex | 220 +++++++++++++++ lib/pleroma_web/views/streamer_view.ex | 67 +++++ mix.lock | 1 + test/notification_test.exs | 2 +- test/pleroma_web/streamer/streamer_test.exs | 43 ++- 12 files changed, 350 insertions(+), 287 deletions(-) create mode 100644 lib/pleroma_web/streamer/worker.ex create mode 100644 lib/pleroma_web/views/streamer_view.ex diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 4999911e71..88a4bc65d5 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -142,7 +142,7 @@ defp oauth_cleanup_enabled?, defp streamer_child(:test), do: [] defp streamer_child(_) do - [PleromaWeb.Streamer] + [PleromaWeb.Streamer.supervisor()] end defp oauth_cleanup_child(true), diff --git a/lib/pleroma/notification.ex b/lib/pleroma/notification.ex index 4f16962bde..046214a51c 100644 --- a/lib/pleroma/notification.ex +++ b/lib/pleroma/notification.ex @@ -210,8 +210,10 @@ def create_notification(%Activity{} = activity, %User{} = user) do unless skip?(activity, user) do notification = %Notification{user_id: user.id, activity: activity} {:ok, notification} = Repo.insert(notification) - Streamer.stream("user", notification) - Streamer.stream("user:notification", notification) + + ["user", "user:notification"] + |> Streamer.stream(notification) + Push.send(notification) notification end diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 9e3ccfd384..edd91e46e9 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -209,7 +209,6 @@ def stream_out_participations(_, _), do: :noop def stream_out(%Activity{data: %{"type" => data_type}} = activity) when data_type in ["Create", "Announce", "Delete"] do - activity |> Topics.get_activity_topics() |> Streamer.stream(activity) diff --git a/lib/pleroma/web/activity_pub/topics.ex b/lib/pleroma/web/activity_pub/topics.ex index a016014b11..ed7e77c1a9 100644 --- a/lib/pleroma/web/activity_pub/topics.ex +++ b/lib/pleroma/web/activity_pub/topics.ex @@ -3,15 +3,14 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.ActivityPub.Topics do - alias Pleroma.Object alias Pleroma.Web.ActivityPub.Visibility def get_activity_topics(activity) do activity - |> Object.normalize + |> Object.normalize() |> generate_topics(activity) - |> List.flatten + |> List.flatten() end defp generate_topics(%{data: %{type: "Answer"}}, _) do @@ -66,5 +65,4 @@ defp attachment_topics(_object, activity) do ["public:media"] end end - end diff --git a/lib/pleroma_web/streamer/state.ex b/lib/pleroma_web/streamer/state.ex index 03479c8774..2c9284245a 100644 --- a/lib/pleroma_web/streamer/state.ex +++ b/lib/pleroma_web/streamer/state.ex @@ -16,7 +16,7 @@ def remove_socket(topic, socket) do GenServer.call(__MODULE__, %{action: :remove, socket: socket, topic: topic}) end - def get_sockets() do + def get_sockets do %{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state) stream_sockets end diff --git a/lib/pleroma_web/streamer/streamer.ex b/lib/pleroma_web/streamer/streamer.ex index 6a65f80a09..0b0446a760 100644 --- a/lib/pleroma_web/streamer/streamer.ex +++ b/lib/pleroma_web/streamer/streamer.ex @@ -4,19 +4,9 @@ defmodule PleromaWeb.Streamer do use GenServer - require Logger - alias Pleroma.Activity - alias Pleroma.Config - alias Pleroma.Conversation.Participation - alias Pleroma.Notification - alias Pleroma.Object - alias Pleroma.User - alias Pleroma.Web.ActivityPub.ActivityPub - alias Pleroma.Web.ActivityPub.Visibility - alias Pleroma.Web.CommonAPI - alias Pleroma.Web.MastodonAPI.NotificationView + alias PleromaWeb.Streamer.State - alias PleromaWeb.Streamer.StreamerSocket + alias PleromaWeb.Streamer.Worker def start_link(_) do GenServer.start_link(__MODULE__, %{}, name: __MODULE__) @@ -30,24 +20,16 @@ def remove_socket(topic, socket) do State.remove_socket(topic, socket) end - def get_sockets() do + def get_sockets do State.get_sockets() end - def stream(topics, item) when is_list(topics) do - Enum.each(topics, fn t -> - GenServer.cast(__MODULE__, %{action: :stream, topic: t, item: item}) - end) - end - - def stream(topic, items) when is_list(items) do - Enum.each(items, fn i -> - GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: i}) - end) + def direct_push(topics, topic, activity) do + Worker.push_to_socket(topics, topic, activity) end - def stream(topic, item) do - GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item}) + def stream(topics, items) do + GenServer.cast(Worker, %{action: :stream, topic: topics, item: items}) end def supervisor, do: PleromaWeb.Streamer.Supervisor @@ -55,245 +37,4 @@ def supervisor, do: PleromaWeb.Streamer.Supervisor def init(args) do {:ok, args} end - - def handle_cast(%{action: :stream, topic: "direct", item: item}, state) do - recipient_topics = - User.get_recipients_from_activity(item) - |> Enum.map(fn %{id: id} -> "direct:#{id}" end) - - Enum.each(recipient_topics || [], fn user_topic -> - Logger.debug("Trying to push direct message to #{user_topic}\n\n") - push_to_socket(State.get_sockets(), user_topic, item) - end) - - {:noreply, state} - end - - def handle_cast(%{action: :stream, topic: "participation", item: participation}, state) do - user_topic = "direct:#{participation.user_id}" - Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n") - - push_to_socket(State.get_sockets(), user_topic, participation) - - {:noreply, state} - end - - def handle_cast(%{action: :stream, topic: "list", item: item}, state) do - # filter the recipient list if the activity is not public, see #270. - recipient_lists = - case Visibility.is_public?(item) do - true -> - Pleroma.List.get_lists_from_activity(item) - - _ -> - Pleroma.List.get_lists_from_activity(item) - |> Enum.filter(fn list -> - owner = User.get_cached_by_id(list.user_id) - - Visibility.visible_for_user?(item, owner) - end) - end - - recipient_topics = - recipient_lists - |> Enum.map(fn %{id: id} -> "list:#{id}" end) - - Enum.each(recipient_topics || [], fn list_topic -> - Logger.debug("Trying to push message to #{list_topic}\n\n") - push_to_socket(State.get_sockets(), list_topic, item) - end) - - {:noreply, state} - end - - def handle_cast( - %{action: :stream, topic: topic, item: %Notification{} = item}, - state - ) - when topic in ["user", "user:notification"] do - State.get_sockets() - |> Map.get("#{topic}:#{item.user_id}", []) - |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} -> - with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id), - true <- should_send?(user, item) do - send(transport_pid, {:text, represent_notification(socket_user, item)}) - end - end) - - {:noreply, state} - end - - def handle_cast(%{action: :stream, topic: "user", item: item}, state) do - Logger.debug("Trying to push to users") - - recipient_topics = - User.get_recipients_from_activity(item) - |> Enum.map(fn %{id: id} -> "user:#{id}" end) - - Enum.each(recipient_topics, fn topic -> - push_to_socket(State.get_sockets(), topic, item) - end) - - {:noreply, state} - end - - def handle_cast(%{action: :stream, topic: topic, item: item}, state) do - Logger.debug("Trying to push to #{topic}") - Logger.debug("Pushing item to #{topic}") - push_to_socket(State.get_sockets(), topic, item) - {:noreply, state} - end - - def handle_cast(m, state) do - Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}") - {:noreply, state} - end - - defp represent_update(%Activity{} = activity, %User{} = user) do - %{ - event: "update", - payload: - Pleroma.Web.MastodonAPI.StatusView.render( - "status.json", - activity: activity, - for: user - ) - |> Jason.encode!() - } - |> Jason.encode!() - end - - defp represent_update(%Activity{} = activity) do - %{ - event: "update", - payload: - Pleroma.Web.MastodonAPI.StatusView.render( - "status.json", - activity: activity - ) - |> Jason.encode!() - } - |> Jason.encode!() - end - - def represent_conversation(%Participation{} = participation) do - %{ - event: "conversation", - payload: - Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{ - participation: participation, - for: participation.user - }) - |> Jason.encode!() - } - |> Jason.encode!() - end - - @spec represent_notification(User.t(), Notification.t()) :: binary() - defp represent_notification(%User{} = user, %Notification{} = notify) do - %{ - event: "notification", - payload: - NotificationView.render( - "show.json", - %{notification: notify, for: user} - ) - |> Jason.encode!() - } - |> Jason.encode!() - end - - defp should_send?(%User{} = user, %Activity{} = item) do - blocks = user.info.blocks || [] - mutes = user.info.mutes || [] - reblog_mutes = user.info.muted_reblogs || [] - domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks) - - with parent when not is_nil(parent) <- Object.normalize(item), - true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)), - true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)), - %{host: item_host} <- URI.parse(item.actor), - %{host: parent_host} <- URI.parse(parent.data["actor"]), - false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), - false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), - true <- thread_containment(item, user), - false <- CommonAPI.thread_muted?(user, item) do - true - else - _ -> false - end - end - - defp should_send?(%User{} = user, %Notification{activity: activity}) do - should_send?(user, activity) - end - - def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do - Enum.each(topics[topic] || [], fn %StreamerSocket{ - transport_pid: transport_pid, - user: socket_user - } -> - # Get the current user so we have up-to-date blocks etc. - if socket_user do - user = User.get_cached_by_ap_id(socket_user.ap_id) - - if should_send?(user, item) do - send(transport_pid, {:text, represent_update(item, user)}) - end - else - send(transport_pid, {:text, represent_update(item)}) - end - end) - end - - def push_to_socket(topics, topic, %Participation{} = participation) do - Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> - send(transport_pid, {:text, represent_conversation(participation)}) - end) - end - - def push_to_socket(topics, topic, %Activity{ - data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} - }) do - Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> - send( - transport_pid, - {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()} - ) - end) - end - - def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop - - def push_to_socket(topics, topic, item) do - Enum.each(topics[topic] || [], fn %StreamerSocket{ - transport_pid: transport_pid, - user: socket_user - } -> - # Get the current user so we have up-to-date blocks etc. - if socket_user do - user = User.get_cached_by_ap_id(socket_user.ap_id) - blocks = user.info.blocks || [] - mutes = user.info.mutes || [] - - with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)), - true <- thread_containment(item, user) do - send(transport_pid, {:text, represent_update(item, user)}) - end - else - send(transport_pid, {:text, represent_update(item)}) - end - end) - end - - @spec thread_containment(Activity.t(), User.t()) :: boolean() - defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true - - defp thread_containment(activity, user) do - if Config.get([:instance, :skip_thread_containment]) do - true - else - ActivityPub.contain_activity(activity, user) - end - end end diff --git a/lib/pleroma_web/streamer/supervisor.ex b/lib/pleroma_web/streamer/supervisor.ex index d2439021eb..368cccac16 100644 --- a/lib/pleroma_web/streamer/supervisor.ex +++ b/lib/pleroma_web/streamer/supervisor.ex @@ -9,10 +9,22 @@ def init(args) do children = [ {PleromaWeb.Streamer.State, args}, {PleromaWeb.Streamer.Ping, args}, - {PleromaWeb.Streamer, args} + {PleromaWeb.Streamer.Worker, args}, + {PleromaWeb.Streamer, args}, + # :poolboy.child_spec(:streamer_worker, poolboy_config()) ] opts = [strategy: :one_for_one, name: PleromaWeb.Streamer.Supervisor] Supervisor.init(children, opts) end + + +# defp poolboy_config do +# [ +# {:name, {:local, :streamer_worker}}, +# {:worker_module, PleromaWeb.Streamer.Worker}, +# {:size, 10}, +# {:max_overflow, 2} +# ] +# end end diff --git a/lib/pleroma_web/streamer/worker.ex b/lib/pleroma_web/streamer/worker.ex new file mode 100644 index 0000000000..fd3fc1ca97 --- /dev/null +++ b/lib/pleroma_web/streamer/worker.ex @@ -0,0 +1,220 @@ +defmodule PleromaWeb.Streamer.Worker do + use GenServer + + require Logger + + alias Pleroma.Activity + alias Pleroma.Config + alias Pleroma.Conversation.Participation + alias Pleroma.Notification + alias Pleroma.Object + alias Pleroma.User + alias Pleroma.Web.ActivityPub.ActivityPub + alias Pleroma.Web.ActivityPub.Visibility + alias Pleroma.Web.CommonAPI + alias PleromaWeb.Streamer.State + alias PleromaWeb.Streamer.StreamerSocket + alias PleromaWeb.StreamerView + + def start_link(_) do + GenServer.start_link(__MODULE__, %{}, name: __MODULE__) + end + + def init(init_arg) do + {:ok, init_arg} + end + + def handle_cast(%{action: :stream, topic: topics, item: item}, state) when is_list(topics) do + Enum.each(topics, fn t -> + do_stream(%{topic: t, item: item}) + end) + + {:noreply, state} + end + + def handle_cast(%{action: :stream, topic: topic, item: items}, state) when is_list(items) do + Enum.each(items, fn i -> + do_stream(%{topic: topic, item: i}) + end) + + {:noreply, state} + end + + def handle_cast(%{action: :stream, topic: topic, item: item}, state) do + do_stream(%{topic: topic, item: item}) + + {:noreply, state} + end + + defp do_stream(%{topic: "direct", item: item}) do + recipient_topics = + User.get_recipients_from_activity(item) + |> Enum.map(fn %{id: id} -> "direct:#{id}" end) + + Enum.each(recipient_topics || [], fn user_topic -> + Logger.debug("Trying to push direct message to #{user_topic}\n\n") + push_to_socket(State.get_sockets(), user_topic, item) + end) + end + + defp do_stream(%{topic: "participation", item: participation}) do + user_topic = "direct:#{participation.user_id}" + Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n") + + push_to_socket(State.get_sockets(), user_topic, participation) + end + + defp do_stream(%{topic: "list", item: item}) do + # filter the recipient list if the activity is not public, see #270. + recipient_lists = + case Visibility.is_public?(item) do + true -> + Pleroma.List.get_lists_from_activity(item) + + _ -> + Pleroma.List.get_lists_from_activity(item) + |> Enum.filter(fn list -> + owner = User.get_cached_by_id(list.user_id) + + Visibility.visible_for_user?(item, owner) + end) + end + + recipient_topics = + recipient_lists + |> Enum.map(fn %{id: id} -> "list:#{id}" end) + + Enum.each(recipient_topics || [], fn list_topic -> + Logger.debug("Trying to push message to #{list_topic}\n\n") + push_to_socket(State.get_sockets(), list_topic, item) + end) + end + + defp do_stream(%{topic: topic, item: %Notification{} = item}) + when topic in ["user", "user:notification"] do + State.get_sockets() + |> Map.get("#{topic}:#{item.user_id}", []) + |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} -> + with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id), + true <- should_send?(user, item) do + send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)}) + end + end) + end + + defp do_stream(%{topic: "user", item: item}) do + Logger.debug("Trying to push to users") + + recipient_topics = + User.get_recipients_from_activity(item) + |> Enum.map(fn %{id: id} -> "user:#{id}" end) + + Enum.each(recipient_topics, fn topic -> + push_to_socket(State.get_sockets(), topic, item) + end) + end + + defp do_stream(%{topic: topic, item: item}) do + Logger.debug("Trying to push to #{topic}") + Logger.debug("Pushing item to #{topic}") + push_to_socket(State.get_sockets(), topic, item) + end + + defp do_stream(m) do + Logger.info("Unknown: #{inspect(m)}") + end + + defp should_send?(%User{} = user, %Activity{} = item) do + blocks = user.info.blocks || [] + mutes = user.info.mutes || [] + reblog_mutes = user.info.muted_reblogs || [] + domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks) + + with parent when not is_nil(parent) <- Object.normalize(item), + true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)), + true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)), + %{host: item_host} <- URI.parse(item.actor), + %{host: parent_host} <- URI.parse(parent.data["actor"]), + false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), + false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), + true <- thread_containment(item, user), + false <- CommonAPI.thread_muted?(user, item) do + true + else + _ -> false + end + end + + defp should_send?(%User{} = user, %Notification{activity: activity}) do + should_send?(user, activity) + end + + def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do + Enum.each(topics[topic] || [], fn %StreamerSocket{ + transport_pid: transport_pid, + user: socket_user + } -> + # Get the current user so we have up-to-date blocks etc. + if socket_user do + user = User.get_cached_by_ap_id(socket_user.ap_id) + + if should_send?(user, item) do + send(transport_pid, {:text, StreamerView.render("update.json", item, user)}) + end + else + send(transport_pid, {:text, StreamerView.render("update.json", item)}) + end + end) + end + + def push_to_socket(topics, topic, %Participation{} = participation) do + Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> + send(transport_pid, {:text, StreamerView.render("conversation.json", participation)}) + end) + end + + def push_to_socket(topics, topic, %Activity{ + data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} + }) do + Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> + send( + transport_pid, + {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()} + ) + end) + end + + def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop + + def push_to_socket(topics, topic, item) do + Enum.each(topics[topic] || [], fn %StreamerSocket{ + transport_pid: transport_pid, + user: socket_user + } -> + # Get the current user so we have up-to-date blocks etc. + if socket_user do + user = User.get_cached_by_ap_id(socket_user.ap_id) + blocks = user.info.blocks || [] + mutes = user.info.mutes || [] + + with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)), + true <- thread_containment(item, user) do + send(transport_pid, {:text, StreamerView.render("update.json", item, user)}) + end + else + send(transport_pid, {:text, StreamerView.render("update.json", item)}) + end + end) + end + + @spec thread_containment(Activity.t(), User.t()) :: boolean() + defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true + + defp thread_containment(activity, user) do + if Config.get([:instance, :skip_thread_containment]) do + true + else + ActivityPub.contain_activity(activity, user) + end + end +end diff --git a/lib/pleroma_web/views/streamer_view.ex b/lib/pleroma_web/views/streamer_view.ex new file mode 100644 index 0000000000..c3feacf5c7 --- /dev/null +++ b/lib/pleroma_web/views/streamer_view.ex @@ -0,0 +1,67 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule PleromaWeb.StreamerView do + use Pleroma.Web, :view + + alias Pleroma.Activity + alias Pleroma.Conversation.Participation + alias Pleroma.Notification + alias Pleroma.Web.MastodonAPI.NotificationView + alias Pleroma.User + + def render("update.json", %Activity{} = activity, %User{} = user) do + %{ + event: "update", + payload: + Pleroma.Web.MastodonAPI.StatusView.render( + "status.json", + activity: activity, + for: user + ) + |> Jason.encode!() + } + |> Jason.encode!() + end + + def render("notification.json", %User{} = user, %Notification{} = notify) do + %{ + event: "notification", + payload: + NotificationView.render( + "show.json", + %{notification: notify, for: user} + ) + |> Jason.encode!() + } + |> Jason.encode!() + end + + def render("update.json", %Activity{} = activity) do + %{ + event: "update", + payload: + Pleroma.Web.MastodonAPI.StatusView.render( + "status.json", + activity: activity + ) + |> Jason.encode!() + } + |> Jason.encode!() + end + + def render("conversation.json", %Participation{} = participation) do + %{ + event: "conversation", + payload: + Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{ + participation: participation, + for: participation.user + }) + |> Jason.encode!() + } + |> Jason.encode!() + end + +end diff --git a/mix.lock b/mix.lock index 2639e96e95..8ca34144b5 100644 --- a/mix.lock +++ b/mix.lock @@ -70,6 +70,7 @@ "plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm"}, "plug_static_index_html": {:hex, :plug_static_index_html, "1.0.0", "840123d4d3975585133485ea86af73cb2600afd7f2a976f9f5fd8b3808e636a0", [:mix], [{:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"}, "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"}, + "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm"}, "postgrex": {:hex, :postgrex, "0.14.3", "5754dee2fdf6e9e508cbf49ab138df964278700b764177e8f3871e658b345a1e", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"}, "prometheus": {:hex, :prometheus, "4.4.1", "1e96073b3ed7788053768fea779cbc896ddc3bdd9ba60687f2ad50b252ac87d6", [:mix, :rebar3], [], "hexpm"}, "prometheus_ecto": {:hex, :prometheus_ecto, "1.4.1", "6c768ea9654de871e5b32fab2eac348467b3021604ebebbcbd8bcbe806a65ed5", [:mix], [{:ecto, "~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 1.1 or ~> 2.0 or ~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}], "hexpm"}, diff --git a/test/notification_test.exs b/test/notification_test.exs index d2c24f8f5f..6a1c609294 100644 --- a/test/notification_test.exs +++ b/test/notification_test.exs @@ -9,9 +9,9 @@ defmodule Pleroma.NotificationTest do alias Pleroma.Notification alias Pleroma.User + alias PleromaWeb.Streamer alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.CommonAPI - alias PleromaWeb.Streamer describe "create_notifications" do test "notifies someone when they are directly addressed" do diff --git a/test/pleroma_web/streamer/streamer_test.exs b/test/pleroma_web/streamer/streamer_test.exs index 183bfba84f..1b559f3394 100644 --- a/test/pleroma_web/streamer/streamer_test.exs +++ b/test/pleroma_web/streamer/streamer_test.exs @@ -11,6 +11,7 @@ defmodule PleromaWeb.StreamerTest do alias PleromaWeb.Streamer import Pleroma.Factory alias PleromaWeb.Streamer.StreamerSocket + alias PleromaWeb.Streamer.Worker setup do skip_thread_containment = Pleroma.Config.get([:instance, :skip_thread_containment]) @@ -139,7 +140,7 @@ test "it sends to public" do "public" => [fake_socket] } - Streamer.push_to_socket(topics, "public", activity) + Streamer.direct_push(topics, "public", activity) Task.await(task) @@ -167,7 +168,7 @@ test "it sends to public" do "public" => [fake_socket] } - Streamer.push_to_socket(topics, "public", activity) + Streamer.direct_push(topics, "public", activity) Task.await(task) end @@ -190,7 +191,7 @@ test "it doesn't send to user if recipients invalid and thread containment is en task = Task.async(fn -> refute_receive {:text, _}, 1_000 end) fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} topics = %{"public" => [fake_socket]} - Streamer.push_to_socket(topics, "public", activity) + Streamer.direct_push(topics, "public", activity) Task.await(task) end @@ -212,7 +213,7 @@ test "it sends message if recipients invalid and thread containment is disabled" task = Task.async(fn -> assert_receive {:text, _}, 1_000 end) fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} topics = %{"public" => [fake_socket]} - Streamer.push_to_socket(topics, "public", activity) + Streamer.direct_push(topics, "public", activity) Task.await(task) end @@ -234,7 +235,7 @@ test "it sends message if recipients invalid and thread containment is enabled b task = Task.async(fn -> assert_receive {:text, _}, 1_000 end) fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} topics = %{"public" => [fake_socket]} - Streamer.push_to_socket(topics, "public", activity) + Streamer.direct_push(topics, "public", activity) Task.await(task) end @@ -261,7 +262,7 @@ test "it doesn't send to blocked users" do "public" => [fake_socket] } - Streamer.push_to_socket(topics, "public", activity) + Streamer.direct_push(topics, "public", activity) Task.await(task) end @@ -296,7 +297,7 @@ test "it doesn't send unwanted DMs to list" do "list:#{list.id}" => [fake_socket] } - Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics) + Worker.handle_cast(%{action: :stream, topic: "list", item: activity}, topics) Task.await(task) end @@ -328,7 +329,7 @@ test "it doesn't send unwanted private posts to list" do "list:#{list.id}" => [fake_socket] } - Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics) + Worker.handle_cast(%{action: :stream, topic: "list", item: activity}, topics) Task.await(task) end @@ -363,7 +364,7 @@ test "it sends wanted private posts to list" do fake_socket ) - Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, %{}) + Worker.handle_cast(%{action: :stream, topic: "list", item: activity}, %{}) Task.await(task) end @@ -391,11 +392,33 @@ test "it doesn't send muted reblogs" do "public" => [fake_socket] } - Streamer.push_to_socket(topics, "public", announce_activity) + Streamer.direct_push(topics, "public", announce_activity) Task.await(task) end + test "it doesn't send posts from muted threads" do + user = insert(:user) + user2 = insert(:user) + {:ok, user2, user, _activity} = CommonAPI.follow(user2, user) + + {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"}) + + {:ok, activity} = CommonAPI.add_mute(user2, activity) + + task = Task.async(fn -> refute_receive {:text, _}, 4_000 end) + + Process.sleep(4000) + + Streamer.add_socket( + "user", + %{transport_pid: task.pid, assigns: %{user: user2}} + ) + + Streamer.stream("user", activity) + Task.await(task) + end + describe "direct streams" do setup do :ok -- GitLab From 1cae2167d36d687c5e98109e835f6c4e1b84b5fa Mon Sep 17 00:00:00 2001 From: stwf Date: Tue, 10 Sep 2019 12:14:48 -0400 Subject: [PATCH 4/7] Implement Poolboy --- config/config.exs | 4 ++ lib/pleroma/activity.ex | 2 - lib/pleroma/web/activity_pub/topics.ex | 25 +++++------- lib/pleroma_web/streamer/state.ex | 32 +++------------ lib/pleroma_web/streamer/streamer.ex | 40 ++++++++++++------- lib/pleroma_web/streamer/supervisor.ex | 25 ++++++------ lib/pleroma_web/streamer/worker.ex | 22 +++++------ lib/pleroma_web/views/streamer_view.ex | 3 +- mix.exs | 1 + test/integration/mastodon_websocket_test.exs | 11 +++--- test/notification_test.exs | 7 +--- test/notification_test.exs.rej | 10 ----- test/pleroma_web/streamer/state_test.exs | 6 +-- test/pleroma_web/streamer/streamer_test.exs | 41 ++++++++------------ test/support/conn_case.ex | 4 ++ test/support/data_case.ex | 4 ++ test/web/activity_pub/topics_test.exs | 31 ++++++++------- 17 files changed, 120 insertions(+), 148 deletions(-) delete mode 100644 test/notification_test.exs.rej diff --git a/config/config.exs b/config/config.exs index 5206fe3759..003d37aab2 100644 --- a/config/config.exs +++ b/config/config.exs @@ -313,6 +313,10 @@ follow_handshake_timeout: 500, sign_object_fetches: true +config :pleroma, :streamer, + workers: 3, + overflow_workers: 2 + config :pleroma, :user, deny_follow_blocked: true config :pleroma, :mrf_normalize_markup, scrub_policy: Pleroma.HTML.Scrubber.Default diff --git a/lib/pleroma/activity.ex b/lib/pleroma/activity.ex index 3e7362bd69..6a51d4cf36 100644 --- a/lib/pleroma/activity.ex +++ b/lib/pleroma/activity.ex @@ -157,8 +157,6 @@ def get_by_id(id) do |> Repo.one() end - def get_by_id_with_object(nil), do: nil - def get_by_id_with_object(id) do from(activity in Activity, where: activity.id == ^id, diff --git a/lib/pleroma/web/activity_pub/topics.ex b/lib/pleroma/web/activity_pub/topics.ex index ed7e77c1a9..e1fa33f240 100644 --- a/lib/pleroma/web/activity_pub/topics.ex +++ b/lib/pleroma/web/activity_pub/topics.ex @@ -13,7 +13,7 @@ def get_activity_topics(activity) do |> List.flatten() end - defp generate_topics(%{data: %{type: "Answer"}}, _) do + defp generate_topics(%{data: %{"type" => "Answer"}}, _) do [] end @@ -39,7 +39,7 @@ defp visibility_tags(object, activity) do end end - defp item_creation_tags(tags, %{data: %{type: "Create"}} = object, activity) do + defp item_creation_tags(tags, %{data: %{"type" => "Create"}} = object, activity) do tags ++ hashtags_to_topics(object) ++ attachment_topics(object, activity) end @@ -47,22 +47,17 @@ defp item_creation_tags(tags, _, _) do tags end - defp hashtags_to_topics(obj) do - obj.data - |> Map.get("tag", []) + defp hashtags_to_topics(%{data: %{"tag" => tags}}) do + tags |> Enum.filter(fn tag -> is_bitstring(tag) end) |> Enum.map(fn tag -> "hashtag:" <> tag end) end - defp attachment_topics(%{data: %{"attachment" => []}}, _activity) do - [] - end + defp hashtags_to_topics(_), do: [] - defp attachment_topics(_object, activity) do - if activity.local do - ["public:media", "public:local:media"] - else - ["public:media"] - end - end + defp attachment_topics(%{data: %{"attachment" => []}}, _act), do: [] + + defp attachment_topics(_object, %{local: true}), do: ["public:media", "public:local:media"] + + defp attachment_topics(_object, _act), do: ["public:media"] end diff --git a/lib/pleroma_web/streamer/state.ex b/lib/pleroma_web/streamer/state.ex index 2c9284245a..3fef9f48a8 100644 --- a/lib/pleroma_web/streamer/state.ex +++ b/lib/pleroma_web/streamer/state.ex @@ -5,15 +5,15 @@ defmodule PleromaWeb.Streamer.State do alias PleromaWeb.Streamer.StreamerSocket def start_link(_) do - GenServer.start_link(__MODULE__, %{sockets: %{}, streams: []}, name: __MODULE__) + GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__) end def add_socket(topic, socket) do - GenServer.call(__MODULE__, %{action: :add, socket: socket, topic: topic}) + GenServer.call(__MODULE__, {:add, socket, topic}) end def remove_socket(topic, socket) do - GenServer.call(__MODULE__, %{action: :remove, socket: socket, topic: topic}) + GenServer.call(__MODULE__, {:remove, socket, topic}) end def get_sockets do @@ -21,10 +21,6 @@ def get_sockets do stream_sockets end - def add_stream_item(topic, item) do - GenServer.call(__MODULE__, %{action: :add, item: item, topic: topic}) - end - def init(init_arg) do {:ok, init_arg} end @@ -33,11 +29,7 @@ def handle_call(:get_state, _from, state) do {:reply, state, state} end - def handle_call( - %{action: :add, topic: topic, socket: socket}, - _from, - %{sockets: sockets} = state - ) do + def handle_call({:add, socket, topic}, _from, %{sockets: sockets} = state) do internal_topic = internal_topic(topic, socket) stream_socket = StreamerSocket.from_socket(socket) @@ -52,21 +44,7 @@ def handle_call( {:reply, state, state} end - def handle_call( - %{action: :add, topic: topic, item: _item}, - _from, - %{streams: _streams} = state - ) do - state = Map.put(state, :streams, []) - Logger.debug("Got new conn for #{topic}") - {:reply, state, state} - end - - def handle_call( - %{action: :remove, topic: topic, socket: socket}, - _from, - %{sockets: sockets} = state - ) do + def handle_call({:remove, socket, topic}, _from, %{sockets: sockets} = state) do internal_topic = internal_topic(topic, socket) stream_socket = PleromaWeb.Streamer.StreamerSocket.from_socket(socket) diff --git a/lib/pleroma_web/streamer/streamer.ex b/lib/pleroma_web/streamer/streamer.ex index 0b0446a760..cf11edc515 100644 --- a/lib/pleroma_web/streamer/streamer.ex +++ b/lib/pleroma_web/streamer/streamer.ex @@ -3,14 +3,10 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule PleromaWeb.Streamer do - use GenServer - alias PleromaWeb.Streamer.State - alias PleromaWeb.Streamer.Worker - def start_link(_) do - GenServer.start_link(__MODULE__, %{}, name: __MODULE__) - end + @timeout 60_000 + @mix_env Mix.env() def add_socket(topic, socket) do State.add_socket(topic, socket) @@ -24,17 +20,35 @@ def get_sockets do State.get_sockets() end - def direct_push(topics, topic, activity) do - Worker.push_to_socket(topics, topic, activity) - end - def stream(topics, items) do - GenServer.cast(Worker, %{action: :stream, topic: topics, item: items}) + if should_send?() do + Task.async(fn -> + :poolboy.transaction( + :streamer_worker, + fn pid -> GenServer.call(pid, {:stream, topics, items}) end, + @timeout + ) + end) + end end def supervisor, do: PleromaWeb.Streamer.Supervisor - def init(args) do - {:ok, args} + defp should_send? do + handle_should_send(@mix_env) + end + + defp handle_should_send(:test) do + case Process.whereis(:streamer_worker) do + nil -> + false + + pid -> + Process.alive?(pid) + end + end + + defp handle_should_send(_) do + true end end diff --git a/lib/pleroma_web/streamer/supervisor.ex b/lib/pleroma_web/streamer/supervisor.ex index 368cccac16..a162ffca61 100644 --- a/lib/pleroma_web/streamer/supervisor.ex +++ b/lib/pleroma_web/streamer/supervisor.ex @@ -9,22 +9,25 @@ def init(args) do children = [ {PleromaWeb.Streamer.State, args}, {PleromaWeb.Streamer.Ping, args}, - {PleromaWeb.Streamer.Worker, args}, - {PleromaWeb.Streamer, args}, - # :poolboy.child_spec(:streamer_worker, poolboy_config()) + :poolboy.child_spec(:streamer_worker, poolboy_config()) ] opts = [strategy: :one_for_one, name: PleromaWeb.Streamer.Supervisor] Supervisor.init(children, opts) end + defp poolboy_config do + opts = + Application.get_env(:pleroma, :streamer, + workers: 3, + overflow_workers: 2 + ) -# defp poolboy_config do -# [ -# {:name, {:local, :streamer_worker}}, -# {:worker_module, PleromaWeb.Streamer.Worker}, -# {:size, 10}, -# {:max_overflow, 2} -# ] -# end + [ + {:name, {:local, :streamer_worker}}, + {:worker_module, PleromaWeb.Streamer.Worker}, + {:size, opts[:workers]}, + {:max_overflow, opts[:overflow_workers]} + ] + end end diff --git a/lib/pleroma_web/streamer/worker.ex b/lib/pleroma_web/streamer/worker.ex index fd3fc1ca97..88cf7de603 100644 --- a/lib/pleroma_web/streamer/worker.ex +++ b/lib/pleroma_web/streamer/worker.ex @@ -17,33 +17,33 @@ defmodule PleromaWeb.Streamer.Worker do alias PleromaWeb.StreamerView def start_link(_) do - GenServer.start_link(__MODULE__, %{}, name: __MODULE__) + GenServer.start_link(__MODULE__, %{}, []) end def init(init_arg) do {:ok, init_arg} end - def handle_cast(%{action: :stream, topic: topics, item: item}, state) when is_list(topics) do + def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do Enum.each(topics, fn t -> do_stream(%{topic: t, item: item}) end) - {:noreply, state} + {:reply, state, state} end - def handle_cast(%{action: :stream, topic: topic, item: items}, state) when is_list(items) do + def handle_call({:stream, topic, items}, _from, state) when is_list(items) do Enum.each(items, fn i -> do_stream(%{topic: topic, item: i}) end) - {:noreply, state} + {:reply, state, state} end - def handle_cast(%{action: :stream, topic: topic, item: item}, state) do + def handle_call({:stream, topic, item}, _from, state) do do_stream(%{topic: topic, item: item}) - {:noreply, state} + {:reply, state, state} end defp do_stream(%{topic: "direct", item: item}) do @@ -51,7 +51,7 @@ defp do_stream(%{topic: "direct", item: item}) do User.get_recipients_from_activity(item) |> Enum.map(fn %{id: id} -> "direct:#{id}" end) - Enum.each(recipient_topics || [], fn user_topic -> + Enum.each(recipient_topics, fn user_topic -> Logger.debug("Trying to push direct message to #{user_topic}\n\n") push_to_socket(State.get_sockets(), user_topic, item) end) @@ -84,7 +84,7 @@ defp do_stream(%{topic: "list", item: item}) do recipient_lists |> Enum.map(fn %{id: id} -> "list:#{id}" end) - Enum.each(recipient_topics || [], fn list_topic -> + Enum.each(recipient_topics, fn list_topic -> Logger.debug("Trying to push message to #{list_topic}\n\n") push_to_socket(State.get_sockets(), list_topic, item) end) @@ -120,10 +120,6 @@ defp do_stream(%{topic: topic, item: item}) do push_to_socket(State.get_sockets(), topic, item) end - defp do_stream(m) do - Logger.info("Unknown: #{inspect(m)}") - end - defp should_send?(%User{} = user, %Activity{} = item) do blocks = user.info.blocks || [] mutes = user.info.mutes || [] diff --git a/lib/pleroma_web/views/streamer_view.ex b/lib/pleroma_web/views/streamer_view.ex index c3feacf5c7..67e8556fc6 100644 --- a/lib/pleroma_web/views/streamer_view.ex +++ b/lib/pleroma_web/views/streamer_view.ex @@ -8,8 +8,8 @@ defmodule PleromaWeb.StreamerView do alias Pleroma.Activity alias Pleroma.Conversation.Participation alias Pleroma.Notification - alias Pleroma.Web.MastodonAPI.NotificationView alias Pleroma.User + alias Pleroma.Web.MastodonAPI.NotificationView def render("update.json", %Activity{} = activity, %User{} = user) do %{ @@ -63,5 +63,4 @@ def render("conversation.json", %Participation{} = participation) do } |> Jason.encode!() end - end diff --git a/mix.exs b/mix.exs index 3170d6f2d7..7e4868fc48 100644 --- a/mix.exs +++ b/mix.exs @@ -143,6 +143,7 @@ defp deps do ref: "293d77bb6f4a67ac8bde1428735c3b42f22cbb30"}, {:pleroma_job_queue, "~> 0.3"}, {:telemetry, "~> 0.3"}, + {:poolboy, "~> 1.5"}, {:prometheus_ex, "~> 3.0"}, {:prometheus_plugs, "~> 1.1"}, {:prometheus_phoenix, "~> 1.3"}, diff --git a/test/integration/mastodon_websocket_test.exs b/test/integration/mastodon_websocket_test.exs index 237b01c0c5..40eff169c5 100644 --- a/test/integration/mastodon_websocket_test.exs +++ b/test/integration/mastodon_websocket_test.exs @@ -10,7 +10,6 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do alias Pleroma.Integration.WebsocketClient alias Pleroma.Web.CommonAPI alias Pleroma.Web.OAuth - alias PleromaWeb.Streamer @path Pleroma.Web.Endpoint.url() |> URI.parse() @@ -18,11 +17,6 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do |> Map.put(:path, "/api/v1/streaming") |> URI.to_string() - setup do - start_supervised(Streamer.supervisor()) - :ok - end - def start_socket(qs \\ nil, headers \\ []) do path = case qs do @@ -43,12 +37,14 @@ test "requires authentication and a valid token for protected streams" do assert {:error, {403, _}} = start_socket("?stream=user") end + @tag needs_streamer: true test "allows public streams without authentication" do assert {:ok, _} = start_socket("?stream=public") assert {:ok, _} = start_socket("?stream=public:local") assert {:ok, _} = start_socket("?stream=hashtag&tag=lain") end + @tag needs_streamer: true test "receives well formatted events" do user = insert(:user) {:ok, _} = start_socket("?stream=public") @@ -93,16 +89,19 @@ test "accepts valid tokens", state do assert {:ok, _} = start_socket("?stream=user&access_token=#{state.token.token}") end + @tag needs_streamer: true test "accepts the 'user' stream", %{token: token} = _state do assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}") assert {:error, {403, "Forbidden"}} = start_socket("?stream=user") end + @tag needs_streamer: true test "accepts the 'user:notification' stream", %{token: token} = _state do assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}") assert {:error, {403, "Forbidden"}} = start_socket("?stream=user:notification") end + @tag needs_streamer: true test "accepts valid token on Sec-WebSocket-Protocol header", %{token: token} do assert {:ok, _} = start_socket("?stream=user", [{"Sec-WebSocket-Protocol", token.token}]) diff --git a/test/notification_test.exs b/test/notification_test.exs index 6a1c609294..4493af1f55 100644 --- a/test/notification_test.exs +++ b/test/notification_test.exs @@ -68,12 +68,7 @@ test "does not create a notification for subscribed users if status is a reply" end describe "create_notification" do - setup do - start_supervised(Streamer.supervisor()) - - :ok - end - + @tag needs_streamer: true test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do user = insert(:user) task = Task.async(fn -> assert_receive {:text, _}, 4_000 end) diff --git a/test/notification_test.exs.rej b/test/notification_test.exs.rej deleted file mode 100644 index 793faa4573..0000000000 --- a/test/notification_test.exs.rej +++ /dev/null @@ -1,10 +0,0 @@ -diff a/test/notification_test.exs b/test/notification_test.exs (rejected hunks) -@@ -11,7 +11,7 @@ defmodule Pleroma.NotificationTest do - alias Pleroma.User - alias Pleroma.Web.ActivityPub.Transmogrifier - alias Pleroma.Web.CommonAPI -- alias Pleroma.Web.Streamer -+ alias PleromaWeb.Streamer - alias Pleroma.Web.TwitterAPI.TwitterAPI - - describe "create_notifications" do diff --git a/test/pleroma_web/streamer/state_test.exs b/test/pleroma_web/streamer/state_test.exs index 38621044e7..6c39547c39 100644 --- a/test/pleroma_web/streamer/state_test.exs +++ b/test/pleroma_web/streamer/state_test.exs @@ -9,11 +9,7 @@ defmodule PleromaWeb.StateTest do alias PleromaWeb.Streamer alias PleromaWeb.Streamer.StreamerSocket - setup do - start_supervised(Streamer.supervisor()) - - :ok - end + @moduletag needs_streamer: true describe "sockets" do setup do diff --git a/test/pleroma_web/streamer/streamer_test.exs b/test/pleroma_web/streamer/streamer_test.exs index 1b559f3394..06fc33aa6f 100644 --- a/test/pleroma_web/streamer/streamer_test.exs +++ b/test/pleroma_web/streamer/streamer_test.exs @@ -13,17 +13,8 @@ defmodule PleromaWeb.StreamerTest do alias PleromaWeb.Streamer.StreamerSocket alias PleromaWeb.Streamer.Worker - setup do - skip_thread_containment = Pleroma.Config.get([:instance, :skip_thread_containment]) - - on_exit(fn -> - Pleroma.Config.put([:instance, :skip_thread_containment], skip_thread_containment) - end) - - start_supervised(Streamer.supervisor()) - - :ok - end + @moduletag needs_streamer: true + clear_config_all([:instance, :skip_thread_containment]) describe "user streams" do setup do @@ -140,7 +131,7 @@ test "it sends to public" do "public" => [fake_socket] } - Streamer.direct_push(topics, "public", activity) + Worker.push_to_socket(topics, "public", activity) Task.await(task) @@ -168,7 +159,7 @@ test "it sends to public" do "public" => [fake_socket] } - Streamer.direct_push(topics, "public", activity) + Worker.push_to_socket(topics, "public", activity) Task.await(task) end @@ -191,7 +182,7 @@ test "it doesn't send to user if recipients invalid and thread containment is en task = Task.async(fn -> refute_receive {:text, _}, 1_000 end) fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} topics = %{"public" => [fake_socket]} - Streamer.direct_push(topics, "public", activity) + Worker.push_to_socket(topics, "public", activity) Task.await(task) end @@ -213,7 +204,7 @@ test "it sends message if recipients invalid and thread containment is disabled" task = Task.async(fn -> assert_receive {:text, _}, 1_000 end) fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} topics = %{"public" => [fake_socket]} - Streamer.direct_push(topics, "public", activity) + Worker.push_to_socket(topics, "public", activity) Task.await(task) end @@ -235,7 +226,7 @@ test "it sends message if recipients invalid and thread containment is enabled b task = Task.async(fn -> assert_receive {:text, _}, 1_000 end) fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} topics = %{"public" => [fake_socket]} - Streamer.direct_push(topics, "public", activity) + Worker.push_to_socket(topics, "public", activity) Task.await(task) end @@ -262,7 +253,7 @@ test "it doesn't send to blocked users" do "public" => [fake_socket] } - Streamer.direct_push(topics, "public", activity) + Worker.push_to_socket(topics, "public", activity) Task.await(task) end @@ -297,7 +288,7 @@ test "it doesn't send unwanted DMs to list" do "list:#{list.id}" => [fake_socket] } - Worker.handle_cast(%{action: :stream, topic: "list", item: activity}, topics) + Worker.handle_call({:stream, "list", activity}, self(), topics) Task.await(task) end @@ -329,7 +320,7 @@ test "it doesn't send unwanted private posts to list" do "list:#{list.id}" => [fake_socket] } - Worker.handle_cast(%{action: :stream, topic: "list", item: activity}, topics) + Worker.handle_call({:stream, "list", activity}, self(), topics) Task.await(task) end @@ -364,7 +355,7 @@ test "it sends wanted private posts to list" do fake_socket ) - Worker.handle_cast(%{action: :stream, topic: "list", item: activity}, %{}) + Worker.handle_call({:stream, "list", activity}, self(), %{}) Task.await(task) end @@ -392,7 +383,7 @@ test "it doesn't send muted reblogs" do "public" => [fake_socket] } - Streamer.direct_push(topics, "public", announce_activity) + Worker.push_to_socket(topics, "public", announce_activity) Task.await(task) end @@ -459,14 +450,14 @@ test "it doesn't send conversation update to the 'direct' streamj when the last task = Task.async(fn -> - assert_receive {:text, received_event}, 4_000 - assert_receive {:text, received_event}, 4_000 assert_receive {:text, received_event}, 4_000 assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event) refute_receive {:text, _}, 4_000 end) + Process.sleep(1000) + Streamer.add_socket( "direct", %{transport_pid: task.pid, assigns: %{user: user}} @@ -496,8 +487,6 @@ test "it sends conversation update to the 'direct' stream when a message is dele task = Task.async(fn -> - assert_receive {:text, received_event}, 4_000 - assert_receive {:text, received_event}, 4_000 assert_receive {:text, received_event}, 4_000 assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event) @@ -510,6 +499,8 @@ test "it sends conversation update to the 'direct' stream when a message is dele assert last_status["id"] == to_string(create_activity.id) end) + Process.sleep(1000) + Streamer.add_socket( "direct", %{transport_pid: task.pid, assigns: %{user: user}} diff --git a/test/support/conn_case.ex b/test/support/conn_case.ex index ec5892ff53..9aedcfce62 100644 --- a/test/support/conn_case.ex +++ b/test/support/conn_case.ex @@ -40,6 +40,10 @@ defmodule Pleroma.Web.ConnCase do Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()}) end + if tags[:needs_streamer] do + start_supervised(PleromaWeb.Streamer.supervisor()) + end + {:ok, conn: Phoenix.ConnTest.build_conn()} end end diff --git a/test/support/data_case.ex b/test/support/data_case.ex index f3d98e7e31..88782bfbbc 100644 --- a/test/support/data_case.ex +++ b/test/support/data_case.ex @@ -39,6 +39,10 @@ defmodule Pleroma.DataCase do Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()}) end + if tags[:needs_streamer] do + start_supervised(PleromaWeb.Streamer.supervisor()) + end + :ok end diff --git a/test/web/activity_pub/topics_test.exs b/test/web/activity_pub/topics_test.exs index 2fb5d279a9..9e550e0b52 100644 --- a/test/web/activity_pub/topics_test.exs +++ b/test/web/activity_pub/topics_test.exs @@ -9,7 +9,7 @@ defmodule Pleroma.Web.ActivityPub.TopicsTest do describe "poll answer" do test "produce no topics" do - activity = %Activity{object: %Object{data: %{type: "Answer"}}} + activity = %Activity{object: %Object{data: %{"type" => "Answer"}}} assert [] == Topics.get_activity_topics(activity) end @@ -17,7 +17,7 @@ test "produce no topics" do describe "non poll answer" do test "always add user and list topics" do - activity = %Activity{object: %Object{data: %{type: "FooBar"}}} + activity = %Activity{object: %Object{data: %{"type" => "FooBar"}}} topics = Topics.get_activity_topics(activity) assert Enum.member?(topics, "user") @@ -27,7 +27,11 @@ test "always add user and list topics" do describe "public visibility" do setup do - activity = %Activity{object: %Object{data: %{type: "Note"}}, data: %{"to" => [Pleroma.Constants.as_public()]}} + activity = %Activity{ + object: %Object{data: %{"type" => "Note"}}, + data: %{"to" => [Pleroma.Constants.as_public()]} + } + {:ok, activity: activity} end @@ -54,7 +58,11 @@ test "non-local action does not produce public:local topic", %{activity: activit describe "public visibility create events" do setup do - activity = %Activity{object: %Object{data: %{:type => "Create", "attachment" => []}}, data: %{"to" => [Pleroma.Constants.as_public()]}} + activity = %Activity{ + object: %Object{data: %{"type" => "Create", "attachment" => []}}, + data: %{"to" => [Pleroma.Constants.as_public()]} + } + {:ok, activity: activity} end @@ -78,7 +86,11 @@ test "converts tags to hash tags", %{activity: %{object: %{data: data} = object} describe "public visibility create events with attachments" do setup do - activity = %Activity{object: %Object{data: %{:type => "Create", "attachment" => ["foo"]}}, data: %{"to" => [Pleroma.Constants.as_public()]}} + activity = %Activity{ + object: %Object{data: %{"type" => "Create", "attachment" => ["foo"]}}, + data: %{"to" => [Pleroma.Constants.as_public()]} + } + {:ok, activity: activity} end @@ -101,12 +113,11 @@ test "non-local doesn't produce public:local:media topics", %{activity: activity refute Enum.member?(topics, "public:local:media") end - end describe "non-public visibility" do test "produces direct topic" do - activity = %Activity{object: %Object{data: %{type: "Note"}}, data: %{"to" => []}} + activity = %Activity{object: %Object{data: %{"type" => "Note"}}, data: %{"to" => []}} topics = Topics.get_activity_topics(activity) assert Enum.member?(topics, "direct") @@ -117,9 +128,3 @@ test "produces direct topic" do end end end - - - - - - -- GitLab From f40cb39f3130af9fccc61405de2bd6f3741fd6d1 Mon Sep 17 00:00:00 2001 From: stwf Date: Tue, 10 Sep 2019 13:39:59 -0400 Subject: [PATCH 5/7] Clean up --- lib/pleroma_web/streamer/streamer.ex | 3 ++- lib/pleroma_web/streamer/worker.ex | 4 ++++ test/notification_test.exs | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/lib/pleroma_web/streamer/streamer.ex b/lib/pleroma_web/streamer/streamer.ex index cf11edc515..396444db8b 100644 --- a/lib/pleroma_web/streamer/streamer.ex +++ b/lib/pleroma_web/streamer/streamer.ex @@ -4,6 +4,7 @@ defmodule PleromaWeb.Streamer do alias PleromaWeb.Streamer.State + alias PleromaWeb.Streamer.Worker @timeout 60_000 @mix_env Mix.env() @@ -25,7 +26,7 @@ def stream(topics, items) do Task.async(fn -> :poolboy.transaction( :streamer_worker, - fn pid -> GenServer.call(pid, {:stream, topics, items}) end, + &Worker.stream(&1, topics, items), @timeout ) end) diff --git a/lib/pleroma_web/streamer/worker.ex b/lib/pleroma_web/streamer/worker.ex index 88cf7de603..b76184d9a8 100644 --- a/lib/pleroma_web/streamer/worker.ex +++ b/lib/pleroma_web/streamer/worker.ex @@ -24,6 +24,10 @@ def init(init_arg) do {:ok, init_arg} end + def stream(pid, topics, items) do + GenServer.call(pid, {:stream, topics, items}) + end + def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do Enum.each(topics, fn t -> do_stream(%{topic: t, item: item}) diff --git a/test/notification_test.exs b/test/notification_test.exs index 4493af1f55..9eea77fb64 100644 --- a/test/notification_test.exs +++ b/test/notification_test.exs @@ -9,9 +9,9 @@ defmodule Pleroma.NotificationTest do alias Pleroma.Notification alias Pleroma.User - alias PleromaWeb.Streamer alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.CommonAPI + alias PleromaWeb.Streamer describe "create_notifications" do test "notifies someone when they are directly addressed" do -- GitLab From ed6e9765eecdc3df082b5ff19144966e2bc461e6 Mon Sep 17 00:00:00 2001 From: stwf Date: Thu, 12 Sep 2019 11:55:28 -0400 Subject: [PATCH 6/7] Incorporate suggestions --- .../{web/activity_pub => activity/ir}/topics.ex | 4 ++-- lib/pleroma/application.ex | 2 +- lib/pleroma/notification.ex | 2 +- lib/pleroma/web/activity_pub/activity_pub.ex | 4 ++-- lib/pleroma/web/mastodon_api/websocket_handler.ex | 4 ++-- lib/{pleroma_web => pleroma/web}/streamer/ping.ex | 6 +++--- .../web}/streamer/state.ex | 8 ++++---- .../web}/streamer/streamer.ex | 8 ++++---- .../web}/streamer/streamer_socket.ex | 4 ++-- .../web}/streamer/supervisor.ex | 12 ++++++------ .../web}/streamer/worker.ex | 8 ++++---- .../web}/views/streamer_view.ex | 2 +- .../activity_pub => activity/ir}/topics_test.exs | 15 +++++++++++++-- test/notification_test.exs | 2 +- test/support/conn_case.ex | 2 +- test/support/data_case.ex | 2 +- test/web/activity_pub/activity_pub_test.exs | 4 ++-- test/{pleroma_web => web}/streamer/ping_test.exs | 4 ++-- test/{pleroma_web => web}/streamer/state_test.exs | 6 +++--- .../streamer/streamer_test.exs | 11 ++++++----- 20 files changed, 61 insertions(+), 49 deletions(-) rename lib/pleroma/{web/activity_pub => activity/ir}/topics.ex (94%) rename lib/{pleroma_web => pleroma/web}/streamer/ping.ex (86%) rename lib/{pleroma_web => pleroma/web}/streamer/state.ex (86%) rename lib/{pleroma_web => pleroma/web}/streamer/streamer.ex (85%) rename lib/{pleroma_web => pleroma/web}/streamer/streamer_socket.ex (85%) rename lib/{pleroma_web => pleroma/web}/streamer/supervisor.ex (62%) rename lib/{pleroma_web => pleroma/web}/streamer/worker.ex (97%) rename lib/{pleroma_web => pleroma/web}/views/streamer_view.ex (97%) rename test/{web/activity_pub => activity/ir}/topics_test.exs (89%) rename test/{pleroma_web => web}/streamer/ping_test.exs (92%) rename test/{pleroma_web => web}/streamer/state_test.exs (93%) rename test/{pleroma_web => web}/streamer/streamer_test.exs (98%) diff --git a/lib/pleroma/web/activity_pub/topics.ex b/lib/pleroma/activity/ir/topics.ex similarity index 94% rename from lib/pleroma/web/activity_pub/topics.ex rename to lib/pleroma/activity/ir/topics.ex index e1fa33f240..010897abcc 100644 --- a/lib/pleroma/web/activity_pub/topics.ex +++ b/lib/pleroma/activity/ir/topics.ex @@ -2,7 +2,7 @@ # Copyright © 2017-2019 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only -defmodule Pleroma.Web.ActivityPub.Topics do +defmodule Pleroma.Activity.Ir.Topics do alias Pleroma.Object alias Pleroma.Web.ActivityPub.Visibility @@ -49,7 +49,7 @@ defp item_creation_tags(tags, _, _) do defp hashtags_to_topics(%{data: %{"tag" => tags}}) do tags - |> Enum.filter(fn tag -> is_bitstring(tag) end) + |> Enum.filter(&is_bitstring(&1)) |> Enum.map(fn tag -> "hashtag:" <> tag end) end diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 88a4bc65d5..45ea83c389 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -142,7 +142,7 @@ defp oauth_cleanup_enabled?, defp streamer_child(:test), do: [] defp streamer_child(_) do - [PleromaWeb.Streamer.supervisor()] + [Pleroma.Web.Streamer.supervisor()] end defp oauth_cleanup_child(true), diff --git a/lib/pleroma/notification.ex b/lib/pleroma/notification.ex index 046214a51c..8012389ac3 100644 --- a/lib/pleroma/notification.ex +++ b/lib/pleroma/notification.ex @@ -13,7 +13,7 @@ defmodule Pleroma.Notification do alias Pleroma.User alias Pleroma.Web.CommonAPI.Utils alias Pleroma.Web.Push - alias PleromaWeb.Streamer + alias Pleroma.Web.Streamer import Ecto.Query import Ecto.Changeset diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index edd91e46e9..e11298a1e8 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -4,6 +4,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.Activity + alias Pleroma.Activity.Ir.Topics alias Pleroma.Config alias Pleroma.Conversation alias Pleroma.Notification @@ -15,10 +16,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.Upload alias Pleroma.User alias Pleroma.Web.ActivityPub.MRF - alias Pleroma.Web.ActivityPub.Topics alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.WebFinger - alias PleromaWeb.Streamer + alias Pleroma.Web.Streamer import Ecto.Query import Pleroma.Web.ActivityPub.Utils diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index c32246497e..3c26eb4069 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -8,7 +8,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do alias Pleroma.Repo alias Pleroma.User alias Pleroma.Web.OAuth.Token - alias PleromaWeb.Streamer + alias Pleroma.Web.Streamer @behaviour :cowboy_websocket @@ -25,7 +25,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do ] @anonymous_streams ["public", "public:local", "hashtag"] - # Handled by periodic keepalive in PleromaWeb.Streamer. + # Handled by periodic keepalive in Pleroma.Web.Streamer.Ping. @timeout :infinity def init(%{qs: qs} = req, state) do diff --git a/lib/pleroma_web/streamer/ping.ex b/lib/pleroma/web/streamer/ping.ex similarity index 86% rename from lib/pleroma_web/streamer/ping.ex rename to lib/pleroma/web/streamer/ping.ex index f3f9c7a0b9..f77cbb95c6 100644 --- a/lib/pleroma_web/streamer/ping.ex +++ b/lib/pleroma/web/streamer/ping.ex @@ -1,9 +1,9 @@ -defmodule PleromaWeb.Streamer.Ping do +defmodule Pleroma.Web.Streamer.Ping do use GenServer require Logger - alias PleromaWeb.Streamer.State - alias PleromaWeb.Streamer.StreamerSocket + alias Pleroma.Web.Streamer.State + alias Pleroma.Web.Streamer.StreamerSocket @keepalive_interval :timer.seconds(30) diff --git a/lib/pleroma_web/streamer/state.ex b/lib/pleroma/web/streamer/state.ex similarity index 86% rename from lib/pleroma_web/streamer/state.ex rename to lib/pleroma/web/streamer/state.ex index 3fef9f48a8..7b5199068f 100644 --- a/lib/pleroma_web/streamer/state.ex +++ b/lib/pleroma/web/streamer/state.ex @@ -1,8 +1,8 @@ -defmodule PleromaWeb.Streamer.State do +defmodule Pleroma.Web.Streamer.State do use GenServer require Logger - alias PleromaWeb.Streamer.StreamerSocket + alias Pleroma.Web.Streamer.StreamerSocket def start_link(_) do GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__) @@ -39,14 +39,14 @@ def handle_call({:add, socket, topic}, _from, %{sockets: sockets} = state) do |> List.insert_at(0, stream_socket) |> Enum.uniq() - state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic) + state = put_in(state, [:sockets, internal_topic], sockets_for_topic) Logger.debug("Got new conn for #{topic}") {:reply, state, state} end def handle_call({:remove, socket, topic}, _from, %{sockets: sockets} = state) do internal_topic = internal_topic(topic, socket) - stream_socket = PleromaWeb.Streamer.StreamerSocket.from_socket(socket) + stream_socket = StreamerSocket.from_socket(socket) sockets_for_topic = sockets diff --git a/lib/pleroma_web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex similarity index 85% rename from lib/pleroma_web/streamer/streamer.ex rename to lib/pleroma/web/streamer/streamer.ex index 396444db8b..8cf719277f 100644 --- a/lib/pleroma_web/streamer/streamer.ex +++ b/lib/pleroma/web/streamer/streamer.ex @@ -2,9 +2,9 @@ # Copyright © 2017-2019 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only -defmodule PleromaWeb.Streamer do - alias PleromaWeb.Streamer.State - alias PleromaWeb.Streamer.Worker +defmodule Pleroma.Web.Streamer do + alias Pleroma.Web.Streamer.State + alias Pleroma.Web.Streamer.Worker @timeout 60_000 @mix_env Mix.env() @@ -33,7 +33,7 @@ def stream(topics, items) do end end - def supervisor, do: PleromaWeb.Streamer.Supervisor + def supervisor, do: Pleroma.Web.Streamer.Supervisor defp should_send? do handle_should_send(@mix_env) diff --git a/lib/pleroma_web/streamer/streamer_socket.ex b/lib/pleroma/web/streamer/streamer_socket.ex similarity index 85% rename from lib/pleroma_web/streamer/streamer_socket.ex rename to lib/pleroma/web/streamer/streamer_socket.ex index 3bb55f5f7d..f006c03061 100644 --- a/lib/pleroma_web/streamer/streamer_socket.ex +++ b/lib/pleroma/web/streamer/streamer_socket.ex @@ -1,8 +1,8 @@ -defmodule PleromaWeb.Streamer.StreamerSocket do +defmodule Pleroma.Web.Streamer.StreamerSocket do defstruct transport_pid: nil, user: nil alias Pleroma.User - alias PleromaWeb.Streamer.StreamerSocket + alias Pleroma.Web.Streamer.StreamerSocket def from_socket(%{ transport_pid: transport_pid, diff --git a/lib/pleroma_web/streamer/supervisor.ex b/lib/pleroma/web/streamer/supervisor.ex similarity index 62% rename from lib/pleroma_web/streamer/supervisor.ex rename to lib/pleroma/web/streamer/supervisor.ex index a162ffca61..6afe19323a 100644 --- a/lib/pleroma_web/streamer/supervisor.ex +++ b/lib/pleroma/web/streamer/supervisor.ex @@ -1,4 +1,4 @@ -defmodule PleromaWeb.Streamer.Supervisor do +defmodule Pleroma.Web.Streamer.Supervisor do use Supervisor def start_link(opts) do @@ -7,25 +7,25 @@ def start_link(opts) do def init(args) do children = [ - {PleromaWeb.Streamer.State, args}, - {PleromaWeb.Streamer.Ping, args}, + {Pleroma.Web.Streamer.State, args}, + {Pleroma.Web.Streamer.Ping, args}, :poolboy.child_spec(:streamer_worker, poolboy_config()) ] - opts = [strategy: :one_for_one, name: PleromaWeb.Streamer.Supervisor] + opts = [strategy: :one_for_one, name: Pleroma.Web.Streamer.Supervisor] Supervisor.init(children, opts) end defp poolboy_config do opts = - Application.get_env(:pleroma, :streamer, + Pleroma.Config.get(:streamer, workers: 3, overflow_workers: 2 ) [ {:name, {:local, :streamer_worker}}, - {:worker_module, PleromaWeb.Streamer.Worker}, + {:worker_module, Pleroma.Web.Streamer.Worker}, {:size, opts[:workers]}, {:max_overflow, opts[:overflow_workers]} ] diff --git a/lib/pleroma_web/streamer/worker.ex b/lib/pleroma/web/streamer/worker.ex similarity index 97% rename from lib/pleroma_web/streamer/worker.ex rename to lib/pleroma/web/streamer/worker.ex index b76184d9a8..5804508eb1 100644 --- a/lib/pleroma_web/streamer/worker.ex +++ b/lib/pleroma/web/streamer/worker.ex @@ -1,4 +1,4 @@ -defmodule PleromaWeb.Streamer.Worker do +defmodule Pleroma.Web.Streamer.Worker do use GenServer require Logger @@ -12,9 +12,9 @@ defmodule PleromaWeb.Streamer.Worker do alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.CommonAPI - alias PleromaWeb.Streamer.State - alias PleromaWeb.Streamer.StreamerSocket - alias PleromaWeb.StreamerView + alias Pleroma.Web.Streamer.State + alias Pleroma.Web.Streamer.StreamerSocket + alias Pleroma.Web.StreamerView def start_link(_) do GenServer.start_link(__MODULE__, %{}, []) diff --git a/lib/pleroma_web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex similarity index 97% rename from lib/pleroma_web/views/streamer_view.ex rename to lib/pleroma/web/views/streamer_view.ex index 67e8556fc6..b13030fa0a 100644 --- a/lib/pleroma_web/views/streamer_view.ex +++ b/lib/pleroma/web/views/streamer_view.ex @@ -2,7 +2,7 @@ # Copyright © 2017-2019 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only -defmodule PleromaWeb.StreamerView do +defmodule Pleroma.Web.StreamerView do use Pleroma.Web, :view alias Pleroma.Activity diff --git a/test/web/activity_pub/topics_test.exs b/test/activity/ir/topics_test.exs similarity index 89% rename from test/web/activity_pub/topics_test.exs rename to test/activity/ir/topics_test.exs index 9e550e0b52..70b037b210 100644 --- a/test/web/activity_pub/topics_test.exs +++ b/test/activity/ir/topics_test.exs @@ -1,9 +1,9 @@ -defmodule Pleroma.Web.ActivityPub.TopicsTest do +defmodule Pleroma.Activity.Ir.TopicsTest do use Pleroma.DataCase alias Pleroma.Activity alias Pleroma.Object - alias Pleroma.Web.ActivityPub.Topics + alias Pleroma.Activity.Ir.Topics require Pleroma.Constants @@ -82,6 +82,17 @@ test "converts tags to hash tags", %{activity: %{object: %{data: data} = object} assert Enum.member?(topics, "hashtag:foo") assert Enum.member?(topics, "hashtag:bar") end + + test "only converts strinngs to hash tags", %{ + activity: %{object: %{data: data} = object} = activity + } do + tagged_data = Map.put(data, "tag", [2]) + activity = %{activity | object: %{object | data: tagged_data}} + + topics = Topics.get_activity_topics(activity) + + refute Enum.member?(topics, "hashtag:2") + end end describe "public visibility create events with attachments" do diff --git a/test/notification_test.exs b/test/notification_test.exs index 9eea77fb64..d68d4485fa 100644 --- a/test/notification_test.exs +++ b/test/notification_test.exs @@ -11,7 +11,7 @@ defmodule Pleroma.NotificationTest do alias Pleroma.User alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.CommonAPI - alias PleromaWeb.Streamer + alias Pleroma.Web.Streamer describe "create_notifications" do test "notifies someone when they are directly addressed" do diff --git a/test/support/conn_case.ex b/test/support/conn_case.ex index 9aedcfce62..b39c706774 100644 --- a/test/support/conn_case.ex +++ b/test/support/conn_case.ex @@ -41,7 +41,7 @@ defmodule Pleroma.Web.ConnCase do end if tags[:needs_streamer] do - start_supervised(PleromaWeb.Streamer.supervisor()) + start_supervised(Pleroma.Web.Streamer.supervisor()) end {:ok, conn: Phoenix.ConnTest.build_conn()} diff --git a/test/support/data_case.ex b/test/support/data_case.ex index 88782bfbbc..17fa152140 100644 --- a/test/support/data_case.ex +++ b/test/support/data_case.ex @@ -40,7 +40,7 @@ defmodule Pleroma.DataCase do end if tags[:needs_streamer] do - start_supervised(PleromaWeb.Streamer.supervisor()) + start_supervised(Pleroma.Web.Streamer.supervisor()) end :ok diff --git a/test/web/activity_pub/activity_pub_test.exs b/test/web/activity_pub/activity_pub_test.exs index 359e972177..ed6f201050 100644 --- a/test/web/activity_pub/activity_pub_test.exs +++ b/test/web/activity_pub/activity_pub_test.exs @@ -34,11 +34,11 @@ test "it streams them out" do conversation.participations |> Repo.preload(:user) - with_mock PleromaWeb.Streamer, + with_mock Pleroma.Web.Streamer, stream: fn _, _ -> nil end do ActivityPub.stream_out_participations(conversation.participations) - assert called(PleromaWeb.Streamer.stream("participation", participations)) + assert called(Pleroma.Web.Streamer.stream("participation", participations)) end end end diff --git a/test/pleroma_web/streamer/ping_test.exs b/test/web/streamer/ping_test.exs similarity index 92% rename from test/pleroma_web/streamer/ping_test.exs rename to test/web/streamer/ping_test.exs index 856e2b7062..3d52c00e41 100644 --- a/test/pleroma_web/streamer/ping_test.exs +++ b/test/web/streamer/ping_test.exs @@ -2,11 +2,11 @@ # Copyright © 2017-2019 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only -defmodule PleromaWeb.PingTest do +defmodule Pleroma.Web.PingTest do use Pleroma.DataCase import Pleroma.Factory - alias PleromaWeb.Streamer + alias Pleroma.Web.Streamer setup do start_supervised({Streamer.supervisor(), [ping_interval: 30]}) diff --git a/test/pleroma_web/streamer/state_test.exs b/test/web/streamer/state_test.exs similarity index 93% rename from test/pleroma_web/streamer/state_test.exs rename to test/web/streamer/state_test.exs index 6c39547c39..d1aeac541a 100644 --- a/test/pleroma_web/streamer/state_test.exs +++ b/test/web/streamer/state_test.exs @@ -2,12 +2,12 @@ # Copyright © 2017-2019 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only -defmodule PleromaWeb.StateTest do +defmodule Pleroma.Web.StateTest do use Pleroma.DataCase import Pleroma.Factory - alias PleromaWeb.Streamer - alias PleromaWeb.Streamer.StreamerSocket + alias Pleroma.Web.Streamer + alias Pleroma.Web.Streamer.StreamerSocket @moduletag needs_streamer: true diff --git a/test/pleroma_web/streamer/streamer_test.exs b/test/web/streamer/streamer_test.exs similarity index 98% rename from test/pleroma_web/streamer/streamer_test.exs rename to test/web/streamer/streamer_test.exs index 06fc33aa6f..88847e20f5 100644 --- a/test/pleroma_web/streamer/streamer_test.exs +++ b/test/web/streamer/streamer_test.exs @@ -2,16 +2,17 @@ # Copyright © 2017-2018 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only -defmodule PleromaWeb.StreamerTest do +defmodule Pleroma.Web.StreamerTest do use Pleroma.DataCase + import Pleroma.Factory + alias Pleroma.List alias Pleroma.User alias Pleroma.Web.CommonAPI - alias PleromaWeb.Streamer - import Pleroma.Factory - alias PleromaWeb.Streamer.StreamerSocket - alias PleromaWeb.Streamer.Worker + alias Pleroma.Web.Streamer + alias Pleroma.Web.Streamer.StreamerSocket + alias Pleroma.Web.Streamer.Worker @moduletag needs_streamer: true clear_config_all([:instance, :skip_thread_containment]) -- GitLab From 571275ec14c1816d386d7a066b5d0116291389f7 Mon Sep 17 00:00:00 2001 From: stwf Date: Thu, 12 Sep 2019 12:25:58 -0400 Subject: [PATCH 7/7] cleanup --- lib/pleroma/web/activity_pub/activity_pub.ex | 2 +- test/activity/ir/topics_test.exs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index e11298a1e8..d629f23953 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -17,8 +17,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.User alias Pleroma.Web.ActivityPub.MRF alias Pleroma.Web.ActivityPub.Transmogrifier - alias Pleroma.Web.WebFinger alias Pleroma.Web.Streamer + alias Pleroma.Web.WebFinger import Ecto.Query import Pleroma.Web.ActivityPub.Utils diff --git a/test/activity/ir/topics_test.exs b/test/activity/ir/topics_test.exs index 70b037b210..e75f83586a 100644 --- a/test/activity/ir/topics_test.exs +++ b/test/activity/ir/topics_test.exs @@ -2,8 +2,8 @@ defmodule Pleroma.Activity.Ir.TopicsTest do use Pleroma.DataCase alias Pleroma.Activity - alias Pleroma.Object alias Pleroma.Activity.Ir.Topics + alias Pleroma.Object require Pleroma.Constants -- GitLab