From 2056efa714460faaf25f6bc03ab643f5a2e8cd3d Mon Sep 17 00:00:00 2001
From: eugenijm <eugenijm@protonmail.com>
Date: Wed, 3 Apr 2019 18:55:04 +0300
Subject: [PATCH] Add scheduler for sending scheduled activities to the queue

---
 config/config.exs                             | 12 ++--
 config/test.exs                               |  5 ++
 docs/config.md                                |  8 ++-
 lib/pleroma/application.ex                    |  3 +-
 lib/pleroma/object.ex                         |  8 ---
 lib/pleroma/scheduled_activity.ex             | 34 ++++++++---
 lib/pleroma/scheduled_activity_worker.ex      | 58 +++++++++++++++++++
 .../mastodon_api/mastodon_api_controller.ex   | 26 ++++++---
 .../views/scheduled_activity_view.ex          | 12 ++--
 test/scheduled_activity_test.exs              | 31 +---------
 test/scheduled_activity_worker_test.exs       | 19 ++++++
 .../mastodon_api_controller_test.exs          | 46 +++++++++++++++
 .../scheduled_activity_view_test.exs          |  2 +-
 13 files changed, 196 insertions(+), 68 deletions(-)
 create mode 100644 lib/pleroma/scheduled_activity_worker.ex
 create mode 100644 test/scheduled_activity_worker_test.exs

diff --git a/config/config.exs b/config/config.exs
index 79cef87e6b..8a977ece5d 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -361,16 +361,13 @@
 config :pleroma_job_queue, :queues,
   federator_incoming: 50,
   federator_outgoing: 50,
-  mailer: 10
+  mailer: 10,
+  scheduled_activities: 10
 
 config :pleroma, :fetch_initial_posts,
   enabled: false,
   pages: 5
 
-config :pleroma, Pleroma.ScheduledActivity,
-  daily_user_limit: 25,
-  total_user_limit: 100
-
 config :auto_linker,
   opts: [
     scheme: true,
@@ -396,6 +393,11 @@
 
 config :prometheus, Pleroma.Web.Endpoint.MetricsExporter, path: "/api/pleroma/app_metrics"
 
+config :pleroma, Pleroma.ScheduledActivity,
+  daily_user_limit: 25,
+  total_user_limit: 300,
+  enabled: true
+
 # Import environment specific config. This must remain at the bottom
 # of this file so it overrides the configuration defined above.
 import_config "#{Mix.env()}.exs"
diff --git a/config/test.exs b/config/test.exs
index 6a7b9067ee..894fa8d3d3 100644
--- a/config/test.exs
+++ b/config/test.exs
@@ -50,6 +50,11 @@
 
 config :pleroma_job_queue, disabled: true
 
+config :pleroma, Pleroma.ScheduledActivity,
+  daily_user_limit: 2,
+  total_user_limit: 3,
+  enabled: false
+
 try do
   import_config "test.secret.exs"
 rescue
diff --git a/docs/config.md b/docs/config.md
index df21beff36..ba0759e87c 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -317,6 +317,7 @@ Pleroma has the following queues:
 * `federator_outgoing` - Outgoing federation
 * `federator_incoming` - Incoming federation
 * `mailer` - Email sender, see [`Pleroma.Mailer`](#pleroma-mailer)
+* `scheduled_activities` - Scheduled activities, see [`Pleroma.ScheduledActivities`](#pleromascheduledactivity)
 
 Example:
 
@@ -413,7 +414,8 @@ Pleroma account will be created with the same name as the LDAP user name.
 * `Pleroma.Web.Auth.PleromaAuthenticator`: default database authenticator
 * `Pleroma.Web.Auth.LDAPAuthenticator`: LDAP authentication
 
-##  Pleroma.ScheduledActivity
+## Pleroma.ScheduledActivity
 
-* `daily_user_limit`: the number of scheduled activities a user is allowed to create in a single day
-* `total_user_limit`: the number of scheduled activities a user is allowed to create in total
+* `daily_user_limit`: the number of scheduled activities a user is allowed to create in a single day (Default: `25`)
+* `total_user_limit`: the number of scheduled activities a user is allowed to create in total (Default: `300`)
+* `enabled`: whether scheduled activities are sent to the job queue to be executed
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index 1fc3fb7283..f0cb7d9a8a 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -104,7 +104,8 @@ def start(_type, _args) do
           ],
           id: :cachex_idem
         ),
-        worker(Pleroma.FlakeId, [])
+        worker(Pleroma.FlakeId, []),
+        worker(Pleroma.ScheduledActivityWorker, [])
       ] ++
         hackney_pool_children() ++
         [
diff --git a/lib/pleroma/object.ex b/lib/pleroma/object.ex
index 786d6296cf..013d621571 100644
--- a/lib/pleroma/object.ex
+++ b/lib/pleroma/object.ex
@@ -184,12 +184,4 @@ def decrease_replies_count(ap_id) do
       _ -> {:error, "Not found"}
     end
   end
-
-  def enforce_user_objects(user, object_ids) do
-    Object
-    |> where([o], fragment("?->>'actor' = ?", o.data, ^user.ap_id))
-    |> where([o], o.id in ^object_ids)
-    |> select([o], o.id)
-    |> Repo.all()
-  end
 end
diff --git a/lib/pleroma/scheduled_activity.ex b/lib/pleroma/scheduled_activity.ex
index 723eb6dc3f..de0e546992 100644
--- a/lib/pleroma/scheduled_activity.ex
+++ b/lib/pleroma/scheduled_activity.ex
@@ -6,7 +6,6 @@ defmodule Pleroma.ScheduledActivity do
   use Ecto.Schema
 
   alias Pleroma.Config
-  alias Pleroma.Object
   alias Pleroma.Repo
   alias Pleroma.ScheduledActivity
   alias Pleroma.User
@@ -37,8 +36,6 @@ defp with_media_attachments(
          %{changes: %{params: %{"media_ids" => media_ids} = params}} = changeset
        )
        when is_list(media_ids) do
-    user = User.get_cached_by_id(changeset.data.user_id)
-    media_ids = Object.enforce_user_objects(user, media_ids) |> Enum.map(&to_string(&1))
     media_attachments = Utils.attachments_from_ids(%{"media_ids" => media_ids})
 
     params =
@@ -79,8 +76,8 @@ def validate_scheduled_at(changeset) do
   def exceeds_daily_user_limit?(user_id, scheduled_at) do
     ScheduledActivity
     |> where(user_id: ^user_id)
-    |> where([s], type(s.scheduled_at, :date) == type(^scheduled_at, :date))
-    |> select([u], count(u.id))
+    |> where([sa], type(sa.scheduled_at, :date) == type(^scheduled_at, :date))
+    |> select([sa], count(sa.id))
     |> Repo.one()
     |> Kernel.>=(Config.get([ScheduledActivity, :daily_user_limit]))
   end
@@ -88,7 +85,7 @@ def exceeds_daily_user_limit?(user_id, scheduled_at) do
   def exceeds_total_user_limit?(user_id) do
     ScheduledActivity
     |> where(user_id: ^user_id)
-    |> select([u], count(u.id))
+    |> select([sa], count(sa.id))
     |> Repo.one()
     |> Kernel.>=(Config.get([ScheduledActivity, :total_user_limit]))
   end
@@ -125,19 +122,40 @@ def get(%User{} = user, scheduled_activity_id) do
     |> Repo.one()
   end
 
-  def update(scheduled_activity, attrs) do
+  def update(%ScheduledActivity{} = scheduled_activity, attrs) do
     scheduled_activity
     |> update_changeset(attrs)
     |> Repo.update()
   end
 
-  def delete(scheduled_activity) do
+  def delete(%ScheduledActivity{} = scheduled_activity) do
     scheduled_activity
     |> Repo.delete()
   end
 
+  def delete(id) when is_binary(id) or is_integer(id) do
+    ScheduledActivity
+    |> where(id: ^id)
+    |> select([sa], sa)
+    |> Repo.delete_all()
+    |> case do
+      {1, [scheduled_activity]} -> {:ok, scheduled_activity}
+      _ -> :error
+    end
+  end
+
   def for_user_query(%User{} = user) do
     ScheduledActivity
     |> where(user_id: ^user.id)
   end
+
+  def due_activities(offset \\ 0) do
+    naive_datetime =
+      NaiveDateTime.utc_now()
+      |> NaiveDateTime.add(offset, :millisecond)
+
+    ScheduledActivity
+    |> where([sa], sa.scheduled_at < ^naive_datetime)
+    |> Repo.all()
+  end
 end
diff --git a/lib/pleroma/scheduled_activity_worker.ex b/lib/pleroma/scheduled_activity_worker.ex
new file mode 100644
index 0000000000..65b38622f2
--- /dev/null
+++ b/lib/pleroma/scheduled_activity_worker.ex
@@ -0,0 +1,58 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.ScheduledActivityWorker do
+  @moduledoc """
+  Sends scheduled activities to the job queue.
+  """
+
+  alias Pleroma.Config
+  alias Pleroma.ScheduledActivity
+  alias Pleroma.User
+  alias Pleroma.Web.CommonAPI
+  use GenServer
+  require Logger
+
+  @schedule_interval :timer.minutes(1)
+
+  def start_link do
+    GenServer.start_link(__MODULE__, nil)
+  end
+
+  def init(_) do
+    if Config.get([ScheduledActivity, :enabled]) do
+      schedule_next()
+      {:ok, nil}
+    else
+      :ignore
+    end
+  end
+
+  def perform(:execute, scheduled_activity_id) do
+    try do
+      {:ok, scheduled_activity} = ScheduledActivity.delete(scheduled_activity_id)
+      %User{} = user = User.get_cached_by_id(scheduled_activity.user_id)
+      {:ok, _result} = CommonAPI.post(user, scheduled_activity.params)
+    rescue
+      error ->
+        Logger.error(
+          "#{__MODULE__} Couldn't create a status from the scheduled activity: #{inspect(error)}"
+        )
+    end
+  end
+
+  def handle_info(:perform, state) do
+    ScheduledActivity.due_activities(@schedule_interval)
+    |> Enum.each(fn scheduled_activity ->
+      PleromaJobQueue.enqueue(:scheduled_activities, __MODULE__, [:execute, scheduled_activity.id])
+    end)
+
+    schedule_next()
+    {:noreply, state}
+  end
+
+  defp schedule_next do
+    Process.send_after(self(), :perform, @schedule_interval)
+  end
+end
diff --git a/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex b/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex
index 6cb5df378b..fc8a2458c3 100644
--- a/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex
+++ b/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex
@@ -5,6 +5,7 @@
 defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
   use Pleroma.Web, :controller
 
+  alias Ecto.Changeset
   alias Pleroma.Activity
   alias Pleroma.Config
   alias Pleroma.Filter
@@ -438,14 +439,12 @@ def post_status(%{assigns: %{user: user}} = conn, %{"status" => _} = params) do
     scheduled_at = params["scheduled_at"]
 
     if scheduled_at && ScheduledActivity.far_enough?(scheduled_at) do
-      {:ok, scheduled_activity} =
-        Cachex.fetch!(:idempotency_cache, idempotency_key, fn _ ->
-          ScheduledActivity.create(user, %{"params" => params, "scheduled_at" => scheduled_at})
-        end)
-
-      conn
-      |> put_view(ScheduledActivityView)
-      |> render("show.json", %{scheduled_activity: scheduled_activity})
+      with {:ok, scheduled_activity} <-
+             ScheduledActivity.create(user, %{"params" => params, "scheduled_at" => scheduled_at}) do
+        conn
+        |> put_view(ScheduledActivityView)
+        |> render("show.json", %{scheduled_activity: scheduled_activity})
+      end
     else
       params = Map.drop(params, ["scheduled_at"])
 
@@ -1474,6 +1473,17 @@ def delete_filter(%{assigns: %{user: user}} = conn, %{"id" => filter_id}) do
 
   # fallback action
   #
+  def errors(conn, {:error, %Changeset{} = changeset}) do
+    error_message =
+      changeset
+      |> Changeset.traverse_errors(fn {message, _opt} -> message end)
+      |> Enum.map_join(", ", fn {_k, v} -> v end)
+
+    conn
+    |> put_status(422)
+    |> json(%{error: error_message})
+  end
+
   def errors(conn, {:error, :not_found}) do
     conn
     |> put_status(404)
diff --git a/lib/pleroma/web/mastodon_api/views/scheduled_activity_view.ex b/lib/pleroma/web/mastodon_api/views/scheduled_activity_view.ex
index 1ebff7aba5..0aae15ab9d 100644
--- a/lib/pleroma/web/mastodon_api/views/scheduled_activity_view.ex
+++ b/lib/pleroma/web/mastodon_api/views/scheduled_activity_view.ex
@@ -16,16 +16,20 @@ def render("index.json", %{scheduled_activities: scheduled_activities}) do
 
   def render("show.json", %{scheduled_activity: %ScheduledActivity{} = scheduled_activity}) do
     %{
-      id: scheduled_activity.id |> to_string,
-      scheduled_at: scheduled_activity.scheduled_at |> CommonAPI.Utils.to_masto_date(),
+      id: to_string(scheduled_activity.id),
+      scheduled_at: CommonAPI.Utils.to_masto_date(scheduled_activity.scheduled_at),
       params: status_params(scheduled_activity.params)
     }
     |> with_media_attachments(scheduled_activity)
   end
 
   defp with_media_attachments(data, %{params: %{"media_attachments" => media_attachments}}) do
-    attachments = render_many(media_attachments, StatusView, "attachment.json", as: :attachment)
-    Map.put(data, :media_attachments, attachments)
+    try do
+      attachments = render_many(media_attachments, StatusView, "attachment.json", as: :attachment)
+      Map.put(data, :media_attachments, attachments)
+    rescue
+      _ -> data
+    end
   end
 
   defp with_media_attachments(data, _), do: data
diff --git a/test/scheduled_activity_test.exs b/test/scheduled_activity_test.exs
index c49c65c0ac..edc7cc3f92 100644
--- a/test/scheduled_activity_test.exs
+++ b/test/scheduled_activity_test.exs
@@ -4,15 +4,11 @@
 
 defmodule Pleroma.ScheduledActivityTest do
   use Pleroma.DataCase
-  alias Pleroma.Config
   alias Pleroma.DataCase
   alias Pleroma.ScheduledActivity
-  alias Pleroma.Web.ActivityPub.ActivityPub
   import Pleroma.Factory
 
   setup context do
-    Config.put([ScheduledActivity, :daily_user_limit], 2)
-    Config.put([ScheduledActivity, :total_user_limit], 3)
     DataCase.ensure_local_uploader(context)
   end
 
@@ -42,7 +38,7 @@ test "when total user limit is exceeded" do
 
       tomorrow =
         NaiveDateTime.utc_now()
-        |> NaiveDateTime.add(:timer.hours(24), :millisecond)
+        |> NaiveDateTime.add(:timer.hours(36), :millisecond)
         |> NaiveDateTime.to_iso8601()
 
       {:ok, _} = ScheduledActivity.create(user, %{params: %{}, scheduled_at: today})
@@ -64,30 +60,5 @@ test "when scheduled_at is earlier than 5 minute from now" do
       {:error, changeset} = ScheduledActivity.create(user, attrs)
       assert changeset.errors == [scheduled_at: {"must be at least 5 minutes from now", []}]
     end
-
-    test "excludes attachments belonging to another user" do
-      user = insert(:user)
-      another_user = insert(:user)
-
-      scheduled_at =
-        NaiveDateTime.utc_now()
-        |> NaiveDateTime.add(:timer.minutes(10), :millisecond)
-        |> NaiveDateTime.to_iso8601()
-
-      file = %Plug.Upload{
-        content_type: "image/jpg",
-        path: Path.absname("test/fixtures/image.jpg"),
-        filename: "an_image.jpg"
-      }
-
-      {:ok, user_upload} = ActivityPub.upload(file, actor: user.ap_id)
-      {:ok, another_user_upload} = ActivityPub.upload(file, actor: another_user.ap_id)
-
-      media_ids = [user_upload.id, another_user_upload.id]
-      attrs = %{params: %{"media_ids" => media_ids}, scheduled_at: scheduled_at}
-      {:ok, scheduled_activity} = ScheduledActivity.create(user, attrs)
-      assert to_string(user_upload.id) in scheduled_activity.params["media_ids"]
-      refute to_string(another_user_upload.id) in scheduled_activity.params["media_ids"]
-    end
   end
 end
diff --git a/test/scheduled_activity_worker_test.exs b/test/scheduled_activity_worker_test.exs
new file mode 100644
index 0000000000..b9c91dda69
--- /dev/null
+++ b/test/scheduled_activity_worker_test.exs
@@ -0,0 +1,19 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.ScheduledActivityWorkerTest do
+  use Pleroma.DataCase
+  alias Pleroma.ScheduledActivity
+  import Pleroma.Factory
+
+  test "creates a status from the scheduled activity" do
+    user = insert(:user)
+    scheduled_activity = insert(:scheduled_activity, user: user, params: %{status: "hi"})
+    Pleroma.ScheduledActivityWorker.perform(:execute, scheduled_activity.id)
+
+    refute Repo.get(ScheduledActivity, scheduled_activity.id)
+    activity = Repo.all(Pleroma.Activity) |> Enum.find(&(&1.actor == user.ap_id))
+    assert activity.data["object"]["content"] == "hi"
+  end
+end
diff --git a/test/web/mastodon_api/mastodon_api_controller_test.exs b/test/web/mastodon_api/mastodon_api_controller_test.exs
index ae23756968..cd01116e29 100644
--- a/test/web/mastodon_api/mastodon_api_controller_test.exs
+++ b/test/web/mastodon_api/mastodon_api_controller_test.exs
@@ -2471,6 +2471,52 @@ test "skips the scheduling and creates the activity if scheduled_at is earlier t
       assert [] == Repo.all(ScheduledActivity)
     end
 
+    test "returns error when daily user limit is exceeded", %{conn: conn} do
+      user = insert(:user)
+
+      today =
+        NaiveDateTime.utc_now()
+        |> NaiveDateTime.add(:timer.minutes(6), :millisecond)
+        |> NaiveDateTime.to_iso8601()
+
+      attrs = %{params: %{}, scheduled_at: today}
+      {:ok, _} = ScheduledActivity.create(user, attrs)
+      {:ok, _} = ScheduledActivity.create(user, attrs)
+
+      conn =
+        conn
+        |> assign(:user, user)
+        |> post("/api/v1/statuses", %{"status" => "scheduled", "scheduled_at" => today})
+
+      assert %{"error" => "daily limit exceeded"} == json_response(conn, 422)
+    end
+
+    test "returns error when total user limit is exceeded", %{conn: conn} do
+      user = insert(:user)
+
+      today =
+        NaiveDateTime.utc_now()
+        |> NaiveDateTime.add(:timer.minutes(6), :millisecond)
+        |> NaiveDateTime.to_iso8601()
+
+      tomorrow =
+        NaiveDateTime.utc_now()
+        |> NaiveDateTime.add(:timer.hours(36), :millisecond)
+        |> NaiveDateTime.to_iso8601()
+
+      attrs = %{params: %{}, scheduled_at: today}
+      {:ok, _} = ScheduledActivity.create(user, attrs)
+      {:ok, _} = ScheduledActivity.create(user, attrs)
+      {:ok, _} = ScheduledActivity.create(user, %{params: %{}, scheduled_at: tomorrow})
+
+      conn =
+        conn
+        |> assign(:user, user)
+        |> post("/api/v1/statuses", %{"status" => "scheduled", "scheduled_at" => tomorrow})
+
+      assert %{"error" => "total limit exceeded"} == json_response(conn, 422)
+    end
+
     test "shows scheduled activities", %{conn: conn} do
       user = insert(:user)
       scheduled_activity_id1 = insert(:scheduled_activity, user: user).id |> to_string()
diff --git a/test/web/mastodon_api/scheduled_activity_view_test.exs b/test/web/mastodon_api/scheduled_activity_view_test.exs
index 26747a0c0f..ecbb855d4e 100644
--- a/test/web/mastodon_api/scheduled_activity_view_test.exs
+++ b/test/web/mastodon_api/scheduled_activity_view_test.exs
@@ -52,7 +52,7 @@ test "A scheduled activity with a media attachment" do
         |> Enum.map(&StatusView.render("attachment.json", %{attachment: &1})),
       params: %{
         in_reply_to_id: to_string(activity.id),
-        media_ids: [to_string(upload.id)],
+        media_ids: [upload.id],
         poll: nil,
         scheduled_at: nil,
         sensitive: true,
-- 
GitLab