optimization for group reports

parent 5178c8db
Pipeline #22947 passed with stages
in 7 minutes and 59 seconds
...@@ -98,9 +98,11 @@ def query_timelines(user) do ...@@ -98,9 +98,11 @@ def query_timelines(user) do
end, end,
"Rendering favorites timeline" => fn -> "Rendering favorites timeline" => fn ->
conn = Phoenix.ConnTest.build_conn(:get, "http://localhost:4001/api/v1/favourites", nil) conn = Phoenix.ConnTest.build_conn(:get, "http://localhost:4001/api/v1/favourites", nil)
Pleroma.Web.MastodonAPI.StatusController.favourites( Pleroma.Web.MastodonAPI.StatusController.favourites(
%Plug.Conn{conn | %Plug.Conn{
assigns: %{user: user}, conn
| assigns: %{user: user},
query_params: %{"limit" => "0"}, query_params: %{"limit" => "0"},
body_params: %{}, body_params: %{},
cookies: %{}, cookies: %{},
...@@ -115,7 +117,6 @@ def query_timelines(user) do ...@@ -115,7 +117,6 @@ def query_timelines(user) do
phoenix_format: "json", phoenix_format: "json",
phoenix_layout: {Pleroma.Web.LayoutView, "app.html"}, phoenix_layout: {Pleroma.Web.LayoutView, "app.html"},
phoenix_recycled: true, phoenix_recycled: true,
phoenix_view: Pleroma.Web.MastodonAPI.StatusView, phoenix_view: Pleroma.Web.MastodonAPI.StatusView,
plug_session: %{"user_id" => user.id}, plug_session: %{"user_id" => user.id},
plug_session_fetch: :done, plug_session_fetch: :done,
...@@ -123,8 +124,9 @@ def query_timelines(user) do ...@@ -123,8 +124,9 @@ def query_timelines(user) do
plug_skip_csrf_protection: true plug_skip_csrf_protection: true
} }
}, },
%{}) %{}
end, )
end
}) })
end end
...@@ -257,4 +259,22 @@ def query_long_thread(user, activity) do ...@@ -257,4 +259,22 @@ def query_long_thread(user, activity) do
end end
}) })
end end
def query_flags_group_reports do
statuses = Pleroma.Web.ActivityPub.Utils.get_reported_activities()
Benchee.run(%{
"Old method" => fn ->
Pleroma.Web.AdminAPI.ReportView.render(
"index_grouped.json",
Pleroma.Web.ActivityPub.Utils.get_reports_grouped_by_status(statuses)
)
end,
"New method" => fn ->
Pleroma.Web.AdminAPI.ReportView.render("index_grouped_new.json", %{
groups: Pleroma.Web.ActivityPub.Utils.get_grouped_reports()
})
end
})
end
end end
defmodule Pleroma.LoadTesting.Generator do defmodule Pleroma.LoadTesting.Generator do
use Pleroma.LoadTesting.Helper use Pleroma.LoadTesting.Helper
import Ecto.Query
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
def generate_like_activities(user, posts) do def generate_like_activities(user, posts) do
...@@ -406,4 +409,49 @@ defp do_generate_non_visible_post(not_friend, users) do ...@@ -406,4 +409,49 @@ defp do_generate_non_visible_post(not_friend, users) do
CommonAPI.post(user, post) CommonAPI.post(user, post)
end) end)
end end
def generate_flags(remote_users, users) do
IO.puts("Starting generating 100 flag activities...")
{time, _} =
:timer.tc(fn ->
do_generate_flags(remote_users, users)
end)
IO.puts("Inserting flag activities take #{to_sec(time)} sec.\n")
end
defp do_generate_flags(remote_users, users) do
Task.async_stream(
1..100,
fn _ ->
do_generate_flag(Enum.random(remote_users), Enum.random(users))
end,
max_concurrency: 30,
timeout: 30_000
)
|> Stream.run()
end
defp do_generate_flag(actor, user) do
limit = Enum.random(1..3)
activities =
from(a in Pleroma.Activity,
where: a.local == true,
where: a.actor == ^user.ap_id,
order_by: fragment("RANDOM()"),
limit: ^limit
)
|> Repo.all()
Pleroma.Web.ActivityPub.ActivityPub.flag(%{
context: Pleroma.Web.ActivityPub.Utils.generate_context_id(),
actor: actor,
account: user,
statuses: activities,
content: "Some content",
forward: false
})
end
end end
...@@ -101,7 +101,8 @@ def run(args) do ...@@ -101,7 +101,8 @@ def run(args) do
generate_remote_activities(user, remote_users) generate_remote_activities(user, remote_users)
generate_like_activities( generate_like_activities(
user, Pleroma.Repo.all(Pleroma.Activity.Queries.by_type("Create")) user,
Pleroma.Repo.all(Pleroma.Activity.Queries.by_type("Create"))
) )
generate_dms(user, users, opts) generate_dms(user, users, opts)
...@@ -110,6 +111,8 @@ def run(args) do ...@@ -110,6 +111,8 @@ def run(args) do
generate_non_visible_message(user, users) generate_non_visible_message(user, users)
generate_flags(remote_users, users)
IO.puts("Users in DB: #{Repo.aggregate(from(u in User), :count, :id)}") IO.puts("Users in DB: #{Repo.aggregate(from(u in User), :count, :id)}")
IO.puts("Activities in DB: #{Repo.aggregate(from(a in Pleroma.Activity), :count, :id)}") IO.puts("Activities in DB: #{Repo.aggregate(from(a in Pleroma.Activity), :count, :id)}")
...@@ -127,6 +130,7 @@ def run(args) do ...@@ -127,6 +130,7 @@ def run(args) do
query_long_thread(user, activity) query_long_thread(user, activity)
Pleroma.Config.put([:instance, :skip_thread_containment], false) Pleroma.Config.put([:instance, :skip_thread_containment], false)
query_timelines(user) query_timelines(user)
query_flags_group_reports()
end end
defp clean_tables do defp clean_tables do
...@@ -134,5 +138,6 @@ defp clean_tables do ...@@ -134,5 +138,6 @@ defp clean_tables do
Ecto.Adapters.SQL.query!(Repo, "TRUNCATE users CASCADE;") Ecto.Adapters.SQL.query!(Repo, "TRUNCATE users CASCADE;")
Ecto.Adapters.SQL.query!(Repo, "TRUNCATE activities CASCADE;") Ecto.Adapters.SQL.query!(Repo, "TRUNCATE activities CASCADE;")
Ecto.Adapters.SQL.query!(Repo, "TRUNCATE objects CASCADE;") Ecto.Adapters.SQL.query!(Repo, "TRUNCATE objects CASCADE;")
Ecto.Adapters.SQL.query!(Repo, "TRUNCATE oban_jobs CASCADE;")
end end
end end
...@@ -190,9 +190,33 @@ def get_by_id_with_object(id) do ...@@ -190,9 +190,33 @@ def get_by_id_with_object(id) do
|> Repo.one() |> Repo.one()
end end
def all_by_ids(ids) do
ids
|> all_by_ids_query()
|> Repo.all()
end
def all_by_ids_query(query \\ Activity, ids) do
from(a in query, where: a.id in ^ids)
end
def all_by_ids_with_object(ids) do def all_by_ids_with_object(ids) do
Activity ids
|> where([a], a.id in ^ids) |> all_by_ids_query()
|> with_preloaded_object()
|> Repo.all()
end
def all_by_ap_ids_query(query \\ Activity, ap_ids) do
from(
a in query,
where: fragment("(?)->>'id' = ANY(?)", a.data, ^ap_ids)
)
end
def all_by_ap_ids_with_object(ap_ids) do
ap_ids
|> all_by_ap_ids_query()
|> with_preloaded_object() |> with_preloaded_object()
|> Repo.all() |> Repo.all()
end end
...@@ -330,4 +354,6 @@ def direct_conversation_id(activity, for_user) do ...@@ -330,4 +354,6 @@ def direct_conversation_id(activity, for_user) do
_ -> nil _ -> nil
end end
end end
def flags_activities_query(query \\ Activity), do: Queries.by_type(query, "Flag")
end end
...@@ -713,6 +713,11 @@ def get_all_by_ids(ids) do ...@@ -713,6 +713,11 @@ def get_all_by_ids(ids) do
|> Repo.all() |> Repo.all()
end end
def get_all_by_ap_ids(ap_ids) do
from(u in __MODULE__, where: u.ap_id in ^ap_ids)
|> Repo.all()
end
# This is mostly an SPC migration fix. This guesses the user nickname by taking the last part # This is mostly an SPC migration fix. This guesses the user nickname by taking the last part
# of the ap_id and the domain and tries to get that user # of the ap_id and the domain and tries to get that user
def get_by_guessed_nickname(ap_id) do def get_by_guessed_nickname(ap_id) do
......
...@@ -236,7 +236,7 @@ def create(%{to: to, actor: actor, context: context, object: object} = params, f ...@@ -236,7 +236,7 @@ def create(%{to: to, actor: actor, context: context, object: object} = params, f
# only accept false as false value # only accept false as false value
local = !(params[:local] == false) local = !(params[:local] == false)
published = params[:published] published = params[:published]
quick_insert? = Pleroma.Config.get([:env]) == :benchmark quick_insert? = Config.get([:env]) == :benchmark
with create_data <- with create_data <-
make_create_data( make_create_data(
...@@ -525,6 +525,8 @@ def flag( ...@@ -525,6 +525,8 @@ def flag(
additional = params[:additional] || %{} additional = params[:additional] || %{}
quick_insert? = Config.get(:env) == :benchmark
additional = additional =
if forward do if forward do
Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]}) Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
...@@ -534,6 +536,7 @@ def flag( ...@@ -534,6 +536,7 @@ def flag(
with flag_data <- make_flag_data(params, additional), with flag_data <- make_flag_data(params, additional),
{:ok, activity} <- insert(flag_data, local), {:ok, activity} <- insert(flag_data, local),
{:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
{:ok, stripped_activity} <- strip_report_status_data(activity), {:ok, stripped_activity} <- strip_report_status_data(activity),
:ok <- maybe_federate(stripped_activity) do :ok <- maybe_federate(stripped_activity) do
Enum.each(User.all_superusers(), fn superuser -> Enum.each(User.all_superusers(), fn superuser ->
...@@ -543,6 +546,9 @@ def flag( ...@@ -543,6 +546,9 @@ def flag(
end) end)
{:ok, activity} {:ok, activity}
else
{:quick_insert, true, activity} -> {:ok, activity}
e -> e
end end
end end
...@@ -1321,7 +1327,7 @@ defp normalize_counter(_), do: 0 ...@@ -1321,7 +1327,7 @@ defp normalize_counter(_), do: 0
defp maybe_update_follow_information(data) do defp maybe_update_follow_information(data) do
with {:enabled, true} <- with {:enabled, true} <-
{:enabled, Pleroma.Config.get([:instance, :external_user_synchronization])}, {:enabled, Config.get([:instance, :external_user_synchronization])},
{:ok, info} <- fetch_follow_information_for_user(data) do {:ok, info} <- fetch_follow_information_for_user(data) do
info = Map.merge(data[:info] || %{}, info) info = Map.merge(data[:info] || %{}, info)
Map.put(data, :info, info) Map.put(data, :info, info)
......
...@@ -907,12 +907,124 @@ def get_reports_grouped_by_status(activity_ids) do ...@@ -907,12 +907,124 @@ def get_reports_grouped_by_status(activity_ids) do
} }
end end
@spec get_reported_activities() :: [ @spec get_grouped_reports() :: [
%{ %{
required(:activity) => String.t(), status: Activity.t(),
required(:date) => String.t() account: User.t(),
actors: [User.t()],
reports: [Activity.t()]
} }
] ]
def get_grouped_reports do
reports = Activity.flags_activities_query() |> Repo.all()
flags =
Enum.map(reports, fn %{
id: id,
actor: actor,
data: %{"object" => [account | statuses], "published" => date}
} ->
flag = %{
report_id: id,
actor: actor,
account: account,
date: date
}
Enum.map(statuses, fn
status when is_map(status) ->
Map.put(flag, :id, status["id"])
activity_id when is_binary(activity_id) ->
Map.put(flag, :id, activity_id)
end)
end)
ids = %{accounts: [], actors: [], reports: []}
{ids, groups} =
flags
|> List.flatten()
|> Enum.reduce({ids, %{}}, fn status, {ids, acc} ->
acc =
Map.update(
acc,
status.id,
%{
account: status.account,
actors: [status.actor],
reports: [status.report_id],
date: status.date
},
&update_reported_group(&1, status)
)
ids =
ids
|> Map.put(:accounts, [status.account | ids.accounts])
|> Map.put(:actors, [status.actor | ids.actors])
|> Map.put(:reports, [status.report_id | ids.reports])
{ids, acc}
end)
loaded_activities =
groups
|> Map.keys()
|> Activity.all_by_ap_ids_with_object()
|> Enum.reduce(%{}, fn activity, acc ->
Map.put(acc, activity.data["id"], activity)
end)
loaded_users =
(ids.accounts ++ ids.actors)
|> Enum.uniq()
|> User.get_all_by_ap_ids()
|> Enum.reduce(%{}, fn user, acc -> Map.put(acc, user.ap_id, user) end)
loaded_reports =
reports
|> Enum.reduce(%{}, fn report, acc -> Map.put(acc, report.id, report) end)
Enum.map(groups, fn {activity_id, group} ->
updated_actors =
group.actors
|> Enum.uniq()
|> Enum.map(&Map.get(loaded_users, &1))
updated_reports =
group.reports
|> Enum.uniq()
|> Enum.map(&Map.get(loaded_reports, &1))
group
|> Map.put(
:status,
Map.get(loaded_activities, activity_id, %{"id" => activity_id, "deleted" => true})
)
|> Map.put(
:account,
Map.get(loaded_users, group.account, %{"id" => group.account, "deleted" => true})
)
|> Map.put(:actors, updated_actors)
|> Map.put(:reports, updated_reports)
end)
end
defp update_reported_group(group, status) do
if NaiveDateTime.compare(
NaiveDateTime.from_iso8601!(status.date),
NaiveDateTime.from_iso8601!(group.date)
) == :gt do
Map.put(group, :date, status.date)
else
group
end
|> Map.put(:actors, [status.actor | group.actors])
|> Map.put(:reports, [status.report_id | group.reports])
end
@spec get_reported_activities() :: [String.t()]
def get_reported_activities do def get_reported_activities do
reported_activities_query = reported_activities_query =
from(a in Activity, from(a in Activity,
......
...@@ -61,6 +61,10 @@ def render("service.json", %{user: user}) do ...@@ -61,6 +61,10 @@ def render("service.json", %{user: user}) do
|> Map.merge(Utils.make_json_ld_header()) |> Map.merge(Utils.make_json_ld_header())
end end
def render("users.json", %{users: users}) do
render_many(users, Pleroma.Web.ActivityPub.UserView, "user.json")
end
# the instance itself is not a Person, but instead an Application # the instance itself is not a Person, but instead an Application
def render("user.json", %{user: %User{nickname: nil} = user}), def render("user.json", %{user: %User{nickname: nil} = user}),
do: render("service.json", %{user: user}) do: render("service.json", %{user: user})
......
...@@ -670,6 +670,12 @@ def list_grouped_reports(conn, _params) do ...@@ -670,6 +670,12 @@ def list_grouped_reports(conn, _params) do
|> render("index_grouped.json", Utils.get_reports_grouped_by_status(statuses)) |> render("index_grouped.json", Utils.get_reports_grouped_by_status(statuses))
end end
def list_grouped_reports_new(conn, _params) do
conn
|> put_view(ReportView)
|> render("index_grouped_new.json", %{groups: Utils.get_grouped_reports()})
end
def report_show(conn, %{"id" => id}) do def report_show(conn, %{"id" => id}) do
with %Activity{} = report <- Activity.get_by_id(id) do with %Activity{} = report <- Activity.get_by_id(id) do
conn conn
......
...@@ -44,6 +44,42 @@ def render("show.json", %{report: report, user: user, account: account, statuses ...@@ -44,6 +44,42 @@ def render("show.json", %{report: report, user: user, account: account, statuses
} }
end end
def render("index_grouped_new.json", %{groups: groups}) do
updated =
Enum.map(groups, fn report ->
status =
case report.status do
%Activity{} = activity -> StatusView.render("show.json", %{activity: activity})
_ -> report.status
end
|> Map.put_new("deleted", false)
report
|> Map.replace!(
:actors,
Enum.map(report[:actors], &merge_account_views/1)
)
|> Map.replace!(
:account,
merge_account_views(report[:account])
)
|> Map.replace!(
:status,
status
)
|> Map.replace!(
:reports,
report[:reports]
|> Enum.map(&Report.extract_report_info(&1))
|> Enum.map(&render(__MODULE__, "show.json", &1))
)
end)
%{
reports: updated
}
end
def render("index_grouped.json", %{groups: groups}) do def render("index_grouped.json", %{groups: groups}) do
reports = reports =
Enum.map(groups, fn group -> Enum.map(groups, fn group ->
......
...@@ -184,7 +184,8 @@ defmodule Pleroma.Web.Router do ...@@ -184,7 +184,8 @@ defmodule Pleroma.Web.Router do
patch("/users/resend_confirmation_email", AdminAPIController, :resend_confirmation_email) patch("/users/resend_confirmation_email", AdminAPIController, :resend_confirmation_email)
get("/reports", AdminAPIController, :list_reports) get("/reports", AdminAPIController, :list_reports)
get("/grouped_reports", AdminAPIController, :list_grouped_reports) get("/grouped_reports", AdminAPIController, :list_grouped_reports_new)
get("/grouped_reports_new", AdminAPIController, :list_grouped_reports_new)
get("/reports/:id", AdminAPIController, :report_show) get("/reports/:id", AdminAPIController, :report_show)
patch("/reports", AdminAPIController, :reports_update) patch("/reports", AdminAPIController, :reports_update)
post("/reports/:id/notes", AdminAPIController, :report_notes_create) post("/reports/:id/notes", AdminAPIController, :report_notes_create)
......
...@@ -1640,6 +1640,95 @@ test "returns 403 when requested by anonymous" do ...@@ -1640,6 +1640,95 @@ test "returns 403 when requested by anonymous" do
} }
end end
test "returns reports grouped by status new", %{
conn: conn,
first_status: first_status,
second_status: second_status,
third_status: third_status,
first_status_reports: first_status_reports,
second_status_reports: second_status_reports,
third_status_reports: third_status_reports,
target_user: target_user,
reporter: reporter
} do
response =
conn
|> get("/api/pleroma/admin/grouped_reports_new")
|> json_response(:ok)
assert length(response["reports"]) == 3
first_group = Enum.find(response["reports"], &(&1["status"]["id"] == first_status.id))
second_group = Enum.find(response["reports"], &(&1["status"]["id"] == second_status.id))
third_group = Enum.find(response["reports"], &(&1["status"]["id"] == third_status.id))
assert length(first_group["reports"]) == 3
assert length(second_group["reports"]) == 2
assert length(third_group["reports"]) == 1
assert first_group["date"] ==
Enum.max_by(first_status_reports, fn act ->
NaiveDateTime.from_iso8601!(act.data["published"])
end).data["published"]
assert first_group["status"] ==
Map.put(
stringify_keys(StatusView.render("show.json", %{activity: first_status})),
"deleted",
false
)
assert(first_group["account"]["id"] == target_user.id)
assert length(first_group["actors"]) == 1
assert hd(first_group["actors"])["id"] == reporter.id
assert Enum.map(first_group["reports"], & &1["id"]) --
Enum.map(first_status_reports, & &1.id) == []
assert second_group["date"] ==
Enum.max_by(second_status_reports, fn act ->
NaiveDateTime.from_iso8601!(act.data["published"])
end).data["published"]
assert second_group["status"] ==
Map.put(
stringify_keys(StatusView