Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
pleroma
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
407
Issues
407
List
Boards
Labels
Service Desk
Milestones
Merge Requests
59
Merge Requests
59
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Operations
Operations
Incidents
Environments
Packages & Registries
Packages & Registries
Container Registry
Analytics
Analytics
CI / CD
Repository
Value Stream
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Pleroma
pleroma
Commits
4b10d15e
Commit
4b10d15e
authored
Jan 23, 2021
by
Ivan Tashkinov
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
[#3259] Background replication of objects' attachments into `media` table.
parent
4ade12bf
Pipeline
#34365
failed with stages
in 11 minutes and 58 seconds
Changes
9
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
434 additions
and
4 deletions
+434
-4
lib/pleroma/application.ex
lib/pleroma/application.ex
+2
-1
lib/pleroma/data_migration.ex
lib/pleroma/data_migration.ex
+46
-0
lib/pleroma/ecto_enums.ex
lib/pleroma/ecto_enums.ex
+8
-0
lib/pleroma/migrators/media_table_migrator.ex
lib/pleroma/migrators/media_table_migrator.ex
+299
-0
lib/pleroma/migrators/media_table_migrator/state.ex
lib/pleroma/migrators/media_table_migrator/state.ex
+31
-0
lib/pleroma/repo.ex
lib/pleroma/repo.ex
+3
-3
priv/repo/migrations/20210105195018_create_data_migrations.exs
...repo/migrations/20210105195018_create_data_migrations.exs
+17
-0
priv/repo/migrations/20210111172254_create_data_migration_failed_ids.exs
...tions/20210111172254_create_data_migration_failed_ids.exs
+14
-0
priv/repo/migrations/20210122100954_data_migration_create_populate_media_table.exs
...0122100954_data_migration_create_populate_media_table.exs
+14
-0
No files found.
lib/pleroma/application.ex
View file @
4b10d15e
...
...
@@ -104,7 +104,8 @@ def start(_type, _args) do
chat_child
(
chat_enabled?
())
++
[
Pleroma
.
Web
.
Endpoint
,
Pleroma
.
Gopher
.
Server
Pleroma
.
Gopher
.
Server
,
Pleroma
.
Migrators
.
MediaTableMigrator
]
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
...
...
lib/pleroma/data_migration.ex
0 → 100644
View file @
4b10d15e
# Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.DataMigration do
  @moduledoc """
  Ecto schema and persistence helpers for tracking long-running background
  data migrations in the `data_migrations` table (name, lifecycle state,
  feature lock, arbitrary params and runner progress data).
  """

  use Ecto.Schema

  alias Pleroma.DataMigration
  alias Pleroma.DataMigration.State
  alias Pleroma.Repo

  import Ecto.Changeset

  schema "data_migrations" do
    field(:name, :string)
    # Integer-backed enum (see Pleroma.DataMigration.State); starts :pending.
    field(:state, State, default: :pending)
    field(:feature_lock, :boolean, default: false)
    field(:params, :map, default: %{})
    # Progress data persisted by the migration runner (e.g. "max_processed_id").
    field(:data, :map, default: %{})

    timestamps()
  end

  @doc "Casts and validates `params`; `:name` is required and must be unique."
  def changeset(data_migration, params \\ %{}) do
    data_migration
    |> cast(params, [:name, :state, :feature_lock, :params, :data])
    |> validate_required([:name])
    |> unique_constraint(:name)
  end

  @doc "Applies `changeset/2` and persists the result."
  def update(data_migration, params \\ %{}) do
    data_migration
    |> changeset(params)
    |> Repo.update()
  end

  @doc "Persists only a lifecycle state transition."
  def update_state(data_migration, new_state) do
    update(data_migration, %{state: new_state})
  end

  def get_by_name(name) do
    Repo.get_by(DataMigration, name: name)
  end

  # Convenience lookup for the objects' attachments → `media` table migration.
  def populate_media_table, do: get_by_name("populate_media_table")
end
lib/pleroma/ecto_enums.ex
View file @
4b10d15e
...
...
@@ -17,3 +17,11 @@
follow_accept:
2
,
follow_reject:
3
)
# Lifecycle states of a data migration, stored as integers in the database.
defenum(Pleroma.DataMigration.State,
  pending: 1,
  running: 2,
  complete: 3,
  failed: 4,
  manual: 5
)
lib/pleroma/migrators/media_table_migrator.ex
0 → 100644
View file @
4b10d15e
# Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Migrators.MediaTableMigrator do
  @moduledoc """
  Globally-registered GenServer that incrementally creates `media` records for
  objects' attachments. Progress is persisted via the "populate_media_table"
  `Pleroma.DataMigration` record; per-record failures are tracked in the
  `data_migration_failed_ids` table and can be retried with `retry_failed/0`.
  """

  use GenServer

  require Logger

  import Ecto.Query

  alias __MODULE__.State
  alias Pleroma.Config
  alias Pleroma.DataMigration
  alias Pleroma.Media
  alias Pleroma.Object
  alias Pleroma.Repo

  # In-memory runtime stats (status, counters, timings) live in the State agent.
  defdelegate state(), to: State, as: :get
  defdelegate put_stat(key, value), to: State, as: :put
  defdelegate increment_stat(key, increment), to: State, as: :increment

  # Persistent DB record backing this runner.
  defdelegate data_migration(), to: DataMigration, as: :populate_media_table

  @reg_name {:global, __MODULE__}

  def whereis, do: GenServer.whereis(@reg_name)

  # Idempotent start: reuse the globally-registered instance when present.
  def start_link(_) do
    case whereis() do
      nil ->
        GenServer.start_link(__MODULE__, nil, name: @reg_name)

      pid ->
        {:ok, pid}
    end
  end

  @impl true
  def init(_) do
    # Defer setup to handle_continue so start_link returns immediately.
    {:ok, nil, {:continue, :init_state}}
  end

  @impl true
  def handle_continue(:init_state, _state) do
    {:ok, _} = State.start_link(nil)

    update_status(:init)

    data_migration = data_migration()
    manual_migrations = Config.get([:instance, :manual_data_migrations], [])

    cond do
      # Never run automatically under the test environment.
      Config.get(:env) == :test ->
        update_status(:noop)

      is_nil(data_migration) ->
        update_status(:halt, "Data migration does not exist.")

      data_migration.state == :manual or data_migration.name in manual_migrations ->
        update_status(:noop, "Data migration is in manual execution state.")

      data_migration.state == :complete ->
        handle_success(data_migration)

      true ->
        send(self(), :process_attachments)
    end

    {:noreply, nil}
  end

  @impl true
  def handle_info(:process_attachments, state) do
    data_migration = data_migration()

    # Only "max_processed_id" survives a (re)start; other stats are rebuilt.
    persistent_data = Map.take(data_migration.data, ["max_processed_id"])

    {:ok, data_migration} =
      DataMigration.update(data_migration, %{state: :running, data: persistent_data})

    update_status(:running)
    put_stat(:started_at, NaiveDateTime.utc_now())

    Logger.info("Starting creating `media` records for objects' attachments...")

    max_processed_id = data_migration.data["max_processed_id"] || 0

    query()
    |> where([object], object.id > ^max_processed_id)
    |> Repo.chunk_stream(100, :batches, timeout: :infinity)
    |> Stream.each(fn objects ->
      object_ids = Enum.map(objects, & &1.id)

      # Transaction rollbacks surface as {:error, object_id}.
      failed_ids =
        objects
        |> Enum.map(&process_object_attachments(&1))
        |> Enum.filter(&(elem(&1, 0) == :error))
        |> Enum.map(&elem(&1, 1))

      for failed_id <- failed_ids do
        _ =
          Repo.query(
            "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <>
              "VALUES ($1, $2) ON CONFLICT DO NOTHING;",
            [data_migration.id, failed_id]
          )
      end

      # Successfully reprocessed ids are cleared from the failures table.
      _ =
        Repo.query(
          "DELETE FROM data_migration_failed_ids " <>
            "WHERE data_migration_id = $1 AND record_id = ANY($2)",
          [data_migration.id, object_ids -- failed_ids]
        )

      max_object_id = Enum.at(object_ids, -1)

      put_stat(:max_processed_id, max_object_id)
      increment_stat(:processed_count, length(object_ids))
      increment_stat(:failed_count, length(failed_ids))

      put_stat(
        :records_per_second,
        state()[:processed_count] /
          Enum.max([NaiveDateTime.diff(NaiveDateTime.utc_now(), state()[:started_at]), 1])
      )

      persist_stats(data_migration)

      # A quick and dirty approach to controlling the load this background migration imposes
      sleep_interval = Config.get([:populate_media_table, :sleep_interval_ms], 0)
      Process.sleep(sleep_interval)
    end)
    |> Stream.run()

    with 0 <- failures_count(data_migration.id) do
      {:ok, data_migration} = DataMigration.update_state(data_migration, :complete)

      handle_success(data_migration)
    else
      _ ->
        _ = DataMigration.update_state(data_migration, :failed)
        update_status(:failed, "Please check data_migration_failed_ids records.")
    end

    {:noreply, state}
  end

  @doc "Base query: objects having a non-empty `attachment` key in `data`."
  def query do
    from(
      object in Object,
      where:
        fragment(
          "(?)->'attachment' IS NOT NULL AND (?)->'attachment' != ANY(ARRAY['null'::jsonb, '[]'::jsonb])",
          object.data,
          object.data
        ),
      select: %{
        id: object.id,
        attachment: fragment("(?)->'attachment'", object.data),
        actor: fragment("(?)->'actor'", object.data)
      }
    )
  end

  # Creates Media records for any attachment lacking an "id", then writes the
  # updated attachments list back to the object — all in one transaction.
  # Returns {:ok, object_id} or, on rollback, {:error, object_id}.
  defp process_object_attachments(object) do
    # Accepts both the map shape produced by query/0 (atom keys) and a full
    # Object struct (data under "attachment"/"actor" string keys).
    attachments =
      if Map.has_key?(object, :attachment),
        do: object.attachment,
        else: object.data["attachment"]

    actor =
      if Map.has_key?(object, :actor),
        do: object.actor,
        else: object.data["actor"]

    Repo.transaction(fn ->
      with {_, true} <- {:any, Enum.any?(attachments || [], &is_nil(&1["id"]))},
           updated_attachments =
             Enum.map(attachments, fn attachment ->
               if is_nil(attachment["id"]) do
                 with {:ok, media} <-
                        Media.create_from_object_data(attachment, %{
                          actor: actor,
                          object_id: object.id
                        }) do
                   Map.put(attachment, "id", media.id)
                 else
                   {:error, e} ->
                     error =
                       "ERROR: could not process attachment of object #{object.id}: " <>
                         "#{attachment["href"]}: #{inspect(e)}"

                     Logger.error(error)
                     Repo.rollback(object.id)
                 end
               else
                 attachment
               end
             end),
           {:ok, _} <-
             Object.update_data(%Object{id: object.id}, %{"attachment" => updated_attachments}) do
        object.id
      else
        # No attachment needed an id — nothing to do, count as processed.
        {:any, false} ->
          object.id

        {:error, e} ->
          error = "ERROR: could not update attachments of object #{object.id}: #{inspect(e)}"
          Logger.error(error)
          Repo.rollback(object.id)
      end
    end)
  end

  @doc "Approximate count for current iteration (including processed records count)"
  def count(force \\ false, timeout \\ :infinity) do
    stored_count = state()[:count]

    if stored_count && !force do
      stored_count
    else
      processed_count = state()[:processed_count] || 0
      max_processed_id = data_migration().data["max_processed_id"] || 0
      query = where(query(), [object], object.id > ^max_processed_id)

      count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count
      put_stat(:count, count)
      count
    end
  end

  # Persists runtime stats into the data migration record (minus :status,
  # which is transient).
  defp persist_stats(data_migration) do
    runner_state = Map.drop(state(), [:status])
    _ = DataMigration.update(data_migration, %{data: runner_state})
  end

  defp handle_success(_data_migration) do
    update_status(:complete)
  end

  @doc "Objects previously recorded as failed for this data migration, id-ascending."
  def failed_objects_query do
    from(o in Object)
    |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"),
      on: dmf.record_id == o.id
    )
    |> where([_o, dmf], dmf.data_migration_id == ^data_migration().id)
    |> order_by([o], asc: o.id)
  end

  @doc "Number of records currently marked failed for `data_migration_id`."
  def failures_count(data_migration_id \\ nil) do
    data_migration_id = data_migration_id || data_migration().id

    with {:ok, %{rows: [[count]]}} <-
           Repo.query(
             "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;",
             [data_migration_id]
           ) do
      count
    end
  end

  @doc "Re-processes previously failed objects, clearing cleared failures."
  def retry_failed do
    data_migration = data_migration()

    failed_objects_query()
    |> Repo.chunk_stream(100, :one)
    |> Stream.each(fn object ->
      with {:ok, _} <- process_object_attachments(object) do
        _ =
          Repo.query(
            "DELETE FROM data_migration_failed_ids " <>
              "WHERE data_migration_id = $1 AND record_id = $2",
            [data_migration.id, object.id]
          )
      end
    end)
    |> Stream.run()
  end

  @doc "Manually kicks the processing loop on the running instance."
  def force_continue do
    send(whereis(), :process_attachments)
  end

  @doc "Resets persisted progress to scratch and restarts processing."
  def force_restart do
    {:ok, _} = DataMigration.update(data_migration(), %{state: :pending, data: %{}})
    force_continue()
  end

  @doc "Marks the migration complete without processing."
  def force_complete do
    {:ok, data_migration} = DataMigration.update_state(data_migration(), :complete)

    handle_success(data_migration)
  end

  defp update_status(status, message \\ nil) do
    put_stat(:status, status)
    put_stat(:message, message)
  end
end
lib/pleroma/migrators/media_table_migrator/state.ex
0 → 100644
View file @
4b10d15e
# Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Migrators.MediaTableMigrator.State do
  @moduledoc """
  Globally-registered Agent holding the migrator's in-memory runtime stats
  (status, counters, timings) as a plain map.
  """

  use Agent

  @init_state %{}
  @reg_name {:global, __MODULE__}

  def start_link(_) do
    Agent.start_link(fn -> @init_state end, name: @reg_name)
  end

  @doc "Returns the entire state map."
  def get do
    Agent.get(@reg_name, & &1)
  end

  @doc "Stores `value` under `key`."
  def put(key, value) do
    Agent.update(@reg_name, &Map.put(&1, key, value))
  end

  @doc "Adds `increment` to the value under `key`; a missing or nil value counts as 0."
  def increment(key, increment \\ 1) do
    Agent.update(@reg_name, fn state ->
      Map.put(state, key, (state[key] || 0) + increment)
    end)
  end
end
lib/pleroma/repo.ex
View file @
4b10d15e
...
...
@@ -63,8 +63,8 @@ def get_assoc(resource, association) do
iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500, :batches)
"""
@spec
chunk_stream
(
Ecto
.
Query
.
t
(),
integer
(),
atom
())
::
Enumerable
.
t
()
def
chunk_stream
(
query
,
chunk_size
,
returns_as
\\
:one
)
do
# We don't actually need start and end func
it
ons of resource streaming,
def
chunk_stream
(
query
,
chunk_size
,
returns_as
\\
:one
,
query_options
\\
[]
)
do
# We don't actually need start and end func
ti
ons of resource streaming,
# but it seems to be the only way to not fetch records one-by-one and
# have individual records be the elements of the stream, instead of
# lists of records
...
...
@@ -76,7 +76,7 @@ def chunk_stream(query, chunk_size, returns_as \\ :one) do
|>
order_by
(
asc:
:id
)
|>
where
([
r
],
r
.
id
>
^
last_id
)
|>
limit
(
^
chunk_size
)
|>
all
()
|>
all
(
query_options
)
|>
case
do
[]
->
{
:halt
,
last_id
}
...
...
priv/repo/migrations/20210105195018_create_data_migrations.exs
0 → 100644
View file @
4b10d15e
defmodule Pleroma.Repo.Migrations.CreateDataMigrations do
  use Ecto.Migration

  # Creates the `data_migrations` bookkeeping table. Uses create_if_not_exists
  # so re-running against a pre-provisioned database is safe.
  def change do
    create_if_not_exists table(:data_migrations) do
      add(:name, :string, null: false)
      # Integer-backed Pleroma.DataMigration.State enum; 1 == :pending.
      add(:state, :integer, default: 1)
      add(:feature_lock, :boolean, default: false)
      add(:params, :map, default: %{})
      add(:data, :map, default: %{})

      timestamps()
    end

    create_if_not_exists(unique_index(:data_migrations, [:name]))
  end
end
priv/repo/migrations/20210111172254_create_data_migration_failed_ids.exs
0 → 100644
View file @
4b10d15e
defmodule Pleroma.Repo.Migrations.CreateDataMigrationFailedIds do
  use Ecto.Migration

  # Join table recording record ids a given data migration failed to process;
  # no primary key, uniqueness enforced on the (migration, record) pair.
  def change do
    create_if_not_exists table(:data_migration_failed_ids, primary_key: false) do
      add(:data_migration_id, references(:data_migrations), null: false)
      add(:record_id, :bigint, null: false)
    end

    create_if_not_exists(
      unique_index(:data_migration_failed_ids, [:data_migration_id, :record_id])
    )
  end
end
priv/repo/migrations/20210122100954_data_migration_create_populate_media_table.exs
0 → 100644
View file @
4b10d15e
defmodule Pleroma.Repo.Migrations.DataMigrationCreatePopulateMediaTable do
  use Ecto.Migration

  # Seeds the "populate_media_table" data migration record; ON CONFLICT makes
  # the insert a no-op if the record already exists.
  def up do
    dt = NaiveDateTime.utc_now()

    execute(
      "INSERT INTO data_migrations(name, inserted_at, updated_at) " <>
        "VALUES ('populate_media_table', '#{dt}', '#{dt}') ON CONFLICT DO NOTHING;"
    )
  end

  # Intentionally keeps the seeded record on rollback.
  def down, do: :ok
end
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment