Commit 7bc2bfb5 authored by Alexander Strizhakov's avatar Alexander Strizhakov Committed by Alexander Strizhakov

manual migration for update exisitng activities

parent abfe16f1
Pipeline #21779 passed with stages
in 14 minutes and 49 seconds
......@@ -69,3 +69,18 @@ mix pleroma.database update_users_following_followers_counts
```sh tab="From Source"
mix pleroma.database fix_likes_collections
```
## Fix thread_recipients for pre-existing activities
Updates `thread_recipients` field in `activities` table. Fixes thread_containment for pre-existing activities. By default task updates today activities.
### Options
- `--period` - run with special period. Possible values: `w` - last 7 days, `m` - last 30 days, `all` - for all the time.
```sh tab="OTP"
./bin/pleroma_ctl database fix_thread_recipients [<options>]
```
```sh tab="From Source"
mix pleroma.database fix_thread_recipients [<options>]
```
......@@ -3,14 +3,19 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Mix.Tasks.Pleroma.Database do
use Mix.Task
import Mix.Pleroma
alias Pleroma.Activity
alias Pleroma.Conversation
alias Pleroma.Object
alias Pleroma.Repo
alias Pleroma.RepoStreamer
alias Pleroma.User
require Logger
require Pleroma.Constants
import Mix.Pleroma
use Mix.Task
@shortdoc "A collection of database related tasks"
@moduledoc File.read!("docs/administration/CLI_tasks/database.md")
......@@ -113,7 +118,7 @@ def run(["fix_likes_collections"]) do
where: fragment("(?)->>'likes' is not null", object.data),
select: %{id: object.id, likes: fragment("(?)->>'likes'", object.data)}
)
|> Pleroma.RepoStreamer.chunk_stream(100)
|> RepoStreamer.chunk_stream(100)
|> Stream.each(fn objects ->
ids =
objects
......@@ -135,4 +140,117 @@ def run(["fix_likes_collections"]) do
end)
|> Stream.run()
end
def run(["fix_thread_recipients" | args]) do
import Ecto.Query
start_pleroma()
{options, [], []} =
OptionParser.parse(
args,
strict: [
period: :string
]
)
start =
if options[:period] do
date =
case options[:period] do
"w" -> Date.utc_today() |> Date.add(-7)
"m" -> Date.utc_today() |> Date.add(-30)
"all" -> nil
_ -> Date.utc_today()
end
if date do
{:ok, start} = NaiveDateTime.new(date, ~T[00:00:00])
start
end
end
# update root activities
query = from(a in Activity)
query =
if start do
where(query, [a], a.inserted_at >= ^start)
else
query
end
query
|> join(:inner, [activity], o in Object,
on:
fragment(
"(?->>'id') = COALESCE((?)->'object'->> 'id', (?)->>'object')",
o.data,
activity.data,
activity.data
)
)
|> preload([activity, object], object: object)
|> where(
[a, o],
fragment(
"?->>'inReplyTo' IS NULL",
o.data
)
)
|> RepoStreamer.chunk_stream(512)
|> Stream.each(fn chunk ->
Enum.each(chunk, fn %Activity{} = activity ->
thread_recipients =
Pleroma.Web.ActivityPub.ActivityPub.get_thread_recipients(activity.recipients)
Repo.update!(Ecto.Changeset.change(activity, thread_recipients: thread_recipients))
end)
end)
|> Stream.run()
# update activities with in_reply_to
query = from(a in Activity)
query =
if start do
where(query, [a], a.inserted_at >= ^start)
else
query
end
query
|> join(:inner, [activity], o in Object,
on:
fragment(
"(?->>'id') = COALESCE((?)->'object'->> 'id', (?)->>'object')",
o.data,
activity.data,
activity.data
)
)
|> preload([activity, object], object: object)
|> where(
[a, o],
fragment(
"?->>'inReplyTo' IS NOT NULL",
o.data
)
)
|> RepoStreamer.chunk_stream(512)
|> Stream.each(fn chunk ->
Enum.each(chunk, fn %Activity{} = activity ->
in_reply_to = Activity.get_in_reply_to_activity(activity)
thread_recipients =
Pleroma.Web.ActivityPub.ActivityPub.get_thread_recipients(
activity.recipients,
in_reply_to
)
Repo.update!(Ecto.Changeset.change(activity, thread_recipients: thread_recipients))
end)
end)
|> Stream.run()
end
end
......@@ -7,5 +7,10 @@ def change do
end
create_if_not_exists(index(:activities, [:thread_recipients], using: :gin))
Mix.Pleroma.shell_error(
"\nMigration to update pre-existing activities has been removed from automatic run due to long execution. If necessary, you can perform it manually with the choice of the period for updating.\n" <>
"Details: https://docs-develop.pleroma.social/backend/administration/CLI_tasks/database/#fix-thread_recipients-for-pre-existing-activities"
)
end
end
defmodule Pleroma.Repo.Migrations.InsertThreadRecipientsInActivities do
use Ecto.Migration
import Ecto.Query
def change do
# update root activities
from(a in Pleroma.Activity)
|> join(:inner, [activity], o in Pleroma.Object,
on:
fragment(
"(?->>'id') = COALESCE((?)->'object'->> 'id', (?)->>'object')",
o.data,
activity.data,
activity.data
)
)
|> preload([activity, object], object: object)
|> where(
[a, o],
fragment(
"?->>'inReplyTo' IS NULL",
o.data
)
)
|> Pleroma.RepoStreamer.chunk_stream(512)
|> Stream.each(fn chunk ->
Enum.each(chunk, fn %Pleroma.Activity{} = activity ->
thread_recipients =
Pleroma.Web.ActivityPub.ActivityPub.get_thread_recipients(activity.recipients)
Pleroma.Repo.update!(
Ecto.Changeset.change(activity, thread_recipients: thread_recipients)
)
end)
end)
|> Stream.run()
# update activities with in_reply_to
from(a in Pleroma.Activity)
|> join(:inner, [activity], o in Pleroma.Object,
on:
fragment(
"(?->>'id') = COALESCE((?)->'object'->> 'id', (?)->>'object')",
o.data,
activity.data,
activity.data
)
)
|> preload([activity, object], object: object)
|> where(
[a, o],
fragment(
"?->>'inReplyTo' IS NOT NULL",
o.data
)
)
|> Pleroma.RepoStreamer.chunk_stream(512)
|> Stream.each(fn chunk ->
Enum.each(chunk, fn %Pleroma.Activity{} = activity ->
in_reply_to = Pleroma.Activity.get_in_reply_to_activity(activity)
thread_recipients =
Pleroma.Web.ActivityPub.ActivityPub.get_thread_recipients(
activity.recipients,
in_reply_to
)
Pleroma.Repo.update!(
Ecto.Changeset.change(activity, thread_recipients: thread_recipients)
)
end)
end)
|> Stream.run()
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment