# streamer.ex
defmodule Pleroma.Web.Streamer do
  @moduledoc """
  Keeps track of the sockets subscribed to each streaming topic and pushes
  rendered items to them.

  The server state is a map of `topic => [socket]`. A socket subscribing to the
  `"user"` topic is stored under `"user:<id>"`, where the id is taken from the
  user in the socket's assigns.

  Every #{30} seconds an empty text frame is sent to the `transport_pid` of
  every registered socket as a keepalive.
  """

  use GenServer
  require Logger

  alias Pleroma.{User, Notification}

  # Interval between keepalive frames, in milliseconds.
  @keepalive_interval 30_000

  # NOTE(review): sockets are only required to expose `transport_pid` and
  # `assigns`; the concrete struct is not visible from this module.
  @type socket :: map()
  @type topic :: String.t()

  ## Client API

  @doc """
  Starts the streamer and registers it under the module name.
  """
  def start_link do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @doc """
  Subscribes `socket` to `topic`. Subscribing the same socket twice is a no-op.
  """
  @spec add_socket(topic, socket) :: :ok
  def add_socket(topic, socket) do
    GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
  end

  @doc """
  Unsubscribes `socket` from `topic`.
  """
  @spec remove_socket(topic, socket) :: :ok
  def remove_socket(topic, socket) do
    GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
  end

  @doc """
  Asynchronously pushes `item` to the sockets subscribed to `topic`.

  For the `"user"` topic the recipients are derived from the item itself: a
  `Pleroma.Notification` goes to the user it belongs to, anything else goes to
  the users returned by `Pleroma.User.get_recipients_from_activity/1`.
  """
  @spec stream(topic, term) :: :ok
  def stream(topic, item) do
    GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
  end

  @doc """
  Renders `item` as an `"update"` event for every socket registered under
  `topic` in the `topics` map and sends it to each socket's transport process.
  """
  def push_to_socket(topics, topic, item) do
    Enum.each(topics[topic] || [], fn socket ->
      status =
        Pleroma.Web.MastodonAPI.StatusView.render(
          "status.json",
          activity: item,
          for: socket.assigns[:user]
        )

      send_event(socket, "update", status)
    end)
  end

  ## Server callbacks

  @impl true
  def init(topics) do
    # The timer is owned by the server process itself, so it goes away with the
    # server and a restart never ends up with more than one keepalive loop.
    schedule_ping()
    {:ok, topics}
  end

  @impl true
  def handle_info(:ping, topics) do
    send_keepalive(topics)
    schedule_ping()
    {:noreply, topics}
  end

  def handle_info(message, topics) do
    Logger.debug("Unexpected message: #{inspect(message)}")
    {:noreply, topics}
  end

  @impl true
  # Kept so that existing `%{action: :ping}` casts still trigger a keepalive.
  # It deliberately does not reschedule: the periodic loop lives in
  # `handle_info(:ping, _)`.
  def handle_cast(%{action: :ping}, topics) do
    send_keepalive(topics)
    {:noreply, topics}
  end

  def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
    topic = "user:#{item.user_id}"

    Enum.each(topics[topic] || [], fn socket ->
      # The user is stored under the atom key `:user`, the same key used by
      # `internal_topic/2` and `push_to_socket/3`.
      notification =
        Pleroma.Web.MastodonAPI.MastodonAPIController.render_notification(
          socket.assigns[:user],
          item
        )

      send_event(socket, "notification", notification)
    end)

    {:noreply, topics}
  end

  def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
    Logger.debug("Trying to push to users")

    item
    |> User.get_recipients_from_activity()
    |> Enum.map(fn %{id: id} -> "user:#{id}" end)
    |> Enum.each(fn topic -> push_to_socket(topics, topic, item) end)

    {:noreply, topics}
  end

  def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
    Logger.debug("Trying to push to #{topic}")
    Logger.debug("Pushing item to #{topic}")
    push_to_socket(topics, topic, item)
    {:noreply, topics}
  end

  def handle_cast(%{action: :add, topic: topic, socket: socket}, topics) do
    topic = internal_topic(topic, socket)
    subscribers = Enum.uniq([socket | Map.get(topics, topic, [])])
    Logger.debug("Got new conn for #{topic}")
    {:noreply, Map.put(topics, topic, subscribers)}
  end

  def handle_cast(%{action: :remove, topic: topic, socket: socket}, topics) do
    topic = internal_topic(topic, socket)

    # Drop the key once its last socket is gone; otherwise the state keeps one
    # empty entry for every topic that was ever subscribed to.
    topics =
      case List.delete(Map.get(topics, topic, []), socket) do
        [] -> Map.delete(topics, topic)
        remaining -> Map.put(topics, topic, remaining)
      end

    Logger.debug("Removed conn for #{topic}")
    {:noreply, topics}
  end

  def handle_cast(message, topics) do
    # Only the message is logged: the state holds every socket together with
    # its assigns and does not belong in an info-level log line.
    Logger.info("Unknown: #{inspect(message)}")
    {:noreply, topics}
  end

  ## Helpers

  # Arms the timer for the next keepalive round.
  defp schedule_ping do
    Process.send_after(self(), :ping, @keepalive_interval)
  end

  # Sends an empty text frame to every socket registered under any topic.
  defp send_keepalive(topics) do
    topics
    |> Map.values()
    |> Enum.concat()
    |> Enum.each(fn socket ->
      Logger.debug("Sending keepalive ping")
      send(socket.transport_pid, {:text, ""})
    end)
  end

  # Wraps `payload` in the `%{event, payload}` envelope and sends it to the
  # socket's transport process. The payload is JSON-encoded on its own first,
  # so it travels as a string inside the outer JSON document.
  defp send_event(socket, event, payload) do
    json = Poison.encode!(%{event: event, payload: Poison.encode!(payload)})
    send(socket.transport_pid, {:text, json})
  end

  # Maps the public "user" topic to the per-user key used in the state map.
  defp internal_topic("user", socket) do
    "user:#{socket.assigns[:user].id}"
  end

  defp internal_topic(topic, _socket), do: topic
end