Skip to content
Snippets Groups Projects
Commit 2d2c8176 authored by Eugen Rochko's avatar Eugen Rochko
Browse files

Adding embedded PuSH server

parent 26287b6e
No related branches found
No related tags found
No related merge requests found
Showing
with 257 additions and 8 deletions
# frozen_string_literal: true
class Api::PushController < ApiController
def update
mode = params['hub.mode']
topic = params['hub.topic']
callback = params['hub.callback']
lease_seconds = params['hub.lease_seconds']
secret = params['hub.secret']
case mode
when 'subscribe'
response, status = Pubsubhubbub::SubscribeService.new.call(topic_to_account(topic), callback, secret, lease_seconds)
when 'unsubscribe'
response, status = Pubsubhubbub::UnsubscribeService.new.call(topic_to_account(topic), callback)
else
response = "Unknown mode: #{mode}"
status = 422
end
render plain: response, status: status
end
private
def topic_to_account(topic_url)
return if topic_url.blank?
uri = Addressable::URI.parse(topic_url)
params = Rails.application.routes.recognize_path(uri.path)
domain = uri.host + (uri.port ? ":#{uri.port}" : '')
return unless TagManager.instance.local_domain?(domain) && params[:controller] == 'accounts' && params[:action] == 'show' && params[:format] == 'atom'
Account.find_local(params[:username])
end
end
......@@ -44,8 +44,12 @@ class Account < ApplicationRecord
has_many :block_relationships, class_name: 'Block', foreign_key: 'account_id', dependent: :destroy
has_many :blocking, -> { order('blocks.id desc') }, through: :block_relationships, source: :target_account
# Media
has_many :media_attachments, dependent: :destroy
# PuSH subscriptions
has_many :subscriptions, dependent: :destroy
pg_search_scope :search_for, against: { username: 'A', domain: 'B' }, using: { tsearch: { prefix: true } }
scope :remote, -> { where.not(domain: nil) }
......
# frozen_string_literal: true
class Subscription < ApplicationRecord
MIN_EXPIRATION = 3600 * 24
MAX_EXPIRATION = 3600 * 24 * 30
belongs_to :account
validates :callback_url, presence: true
validates :callback_url, uniqueness: { scope: :account_id }
scope :active, -> { where(confirmed: true).where('expires_at > ?', Time.now.utc) }
def lease_seconds=(str)
self.expires_at = Time.now.utc + [[MIN_EXPIRATION, str.to_i].max, MAX_EXPIRATION].min.seconds
end
def lease_seconds
(expires_at - Time.now.utc).to_i
end
before_validation :set_min_expiration
private
def set_min_expiration
self.lease_seconds = 0 unless expires_at
end
end
......@@ -7,7 +7,9 @@ class FavouriteService < BaseService
# @return [Favourite]
def call(account, status)
favourite = Favourite.create!(account: account, status: status)
HubPingWorker.perform_async(account.id)
Pubsubhubbub::DistributionWorker.perform_async(favourite.stream_entry.id)
if status.local?
NotifyService.new.call(status.account, favourite)
......
......@@ -19,7 +19,10 @@ class FollowService < BaseService
end
merge_into_timeline(target_account, source_account)
HubPingWorker.perform_async(source_account.id)
Pubsubhubbub::DistributionWorker.perform_async(follow.stream_entry.id)
follow
end
......
......@@ -14,8 +14,11 @@ class PostStatusService < BaseService
attach_media(status, options[:media_ids])
process_mentions_service.call(status)
process_hashtags_service.call(status)
DistributionWorker.perform_async(status.id)
HubPingWorker.perform_async(account.id)
Pubsubhubbub::DistributionWorker.perform_async(status.stream_entry.id)
status
end
......
# frozen_string_literal: true
class Pubsubhubbub::SubscribeService < BaseService
def call(account, callback, secret, lease_seconds)
return ['Invalid topic URL', 422] if account.nil?
return ['Invalid callback URL', 422] unless !callback.blank? && callback =~ /\A#{URI.regexp(%w(http https))}\z/
subscription = Subscription.where(account: account, callback_url: callback).first_or_create!(account: account, callback_url: callback)
Pubsubhubbub::ConfirmationWorker.perform_async(subscription.id, 'subscribe', secret, lease_seconds)
['', 202]
end
end
# frozen_string_literal: true
class Pubsubhubbub::SubscribeService < BaseService
def call(account, callback)
return ['Invalid topic URL', 422] if account.nil?
subscription = Subscription.where(account: account, callback_url: callback)
unless subscription.nil?
Pubsubhubbub::ConfirmationWorker.perform_async(subscription.id, 'unsubscribe')
end
['', 202]
end
end
......@@ -7,8 +7,10 @@ class ReblogService < BaseService
# @return [Status]
def call(account, reblogged_status)
reblog = account.statuses.create!(reblog: reblogged_status, text: '')
DistributionWorker.perform_async(reblog.id)
HubPingWorker.perform_async(account.id)
Pubsubhubbub::DistributionWorker.perform_async(reblog.stream_entry.id)
if reblogged_status.local?
NotifyService.new.call(reblogged_status.account, reblog)
......
......@@ -10,6 +10,9 @@ class RemoveStatusService < BaseService
remove_from_public(status)
status.destroy!
HubPingWorker.perform_async(status.account.id)
Pubsubhubbub::DistributionWorker.perform_async(status.stream_entry.id)
end
private
......
# frozen_string_literal: true
Nokogiri::XML::Builder.new do |xml|
feed(xml) do
simple_id xml, account_url(@account, format: 'atom')
......@@ -12,6 +14,7 @@ Nokogiri::XML::Builder.new do |xml|
link_alternate xml, TagManager.instance.url_for(@account)
link_self xml, account_url(@account, format: 'atom')
link_hub xml, api_push_url
link_hub xml, Rails.configuration.x.hub_url
link_salmon xml, api_salmon_url(@account.id)
......
# frozen_string_literal: true
class Pubsubhubbub::ConfirmationWorker
include Sidekiq::Worker
include RoutingHelper
def perform(subscription_id, mode, secret = nil, lease_seconds = nil)
subscription = Subscription.find(subscription_id)
challenge = SecureRandom.hex
subscription.secret = secret
subscription.lease_seconds = lease_seconds
response = HTTP.headers(user_agent: 'Mastodon/PubSubHubbub')
.timeout(:per_operation, write: 20, connect: 20, read: 50)
.get(subscription.callback_url, params: {
'hub.topic' => account_url(subscription.account, format: :atom),
'hub.mode' => mode,
'hub.challenge' => challenge,
'hub.lease_seconds' => subscription.lease_seconds,
})
if mode == 'subscribe' && response.body.to_s == challenge
subscription.save!
elsif (mode == 'unsubscribe' && response.body.to_s == challenge) || !subscription.confirmed?
subscription.destroy!
end
end
end
# frozen_string_literal: true
class Pubsubhubbub::DeliveryWorker
include Sidekiq::Worker
include RoutingHelper
def perform(subscription_id, payload)
subscription = Subscription.find(subscription_id)
headers = {}
headers['User-Agent'] = 'Mastodon/PubSubHubbub'
headers['Link'] = LinkHeader.new([[api_push_url, [%w(rel hub)]], [account_url(subscription.account, format: :atom), [%w(rel self)]]]).to_s
headers['X-Hub-Signature'] = signature(subscription.secret, payload) unless subscription.secret.blank?
response = HTTP.timeout(:per_operation, write: 50, connect: 20, read: 50)
.headers(headers)
.post(subscription.callback_url, body: payload)
raise "Delivery failed for #{subscription.callback_url}: HTTP #{response.code}" unless response.code > 199 && response.code < 300
end
private
def signature(secret, payload)
hmac = OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha1'), secret, payload)
"sha1=#{hmac}"
end
end
# frozen_string_literal: true
class Pubsubhubbub::DistributionWorker
include Sidekiq::Worker
def perform(stream_entry_id)
stream_entry = StreamEntry.find(stream_entry_id)
account = stream_entry.account
payload = AccountsController.render(:show, assigns: { account: account, entries: [stream_entry] }, formats: [:atom])
Subscription.where(account: account).active.select('id').find_each do |subscription|
Pubsubhubbub::DeliveryWorker.perform_async(subscription.id, payload)
end
end
end
......@@ -7,9 +7,9 @@ class ThreadResolveWorker
child_status = Status.find(child_status_id)
parent_status = FetchRemoteStatusService.new.call(parent_url)
unless parent_status.nil?
child_status.thread = parent_status
child_status.save!
end
return if parent_status.nil?
child_status.thread = parent_status
child_status.save!
end
end
# frozen_string_literal: true
require 'sidekiq/web'
Rails.application.routes.draw do
mount ActionCable.server => '/cable'
mount ActionCable.server, at: 'cable'
authenticate :user, lambda { |u| u.admin? } do
mount Sidekiq::Web, at: 'sidekiq'
......@@ -19,7 +21,7 @@ Rails.application.routes.draw do
sessions: 'auth/sessions',
registrations: 'auth/registrations',
passwords: 'auth/passwords',
confirmations: 'auth/confirmations'
confirmations: 'auth/confirmations',
}
resources :accounts, path: 'users', only: [:show], param: :username do
......@@ -43,10 +45,13 @@ Rails.application.routes.draw do
resources :tags, only: [:show]
namespace :api do
# PubSubHubbub
# PubSubHubbub outgoing subscriptions
resources :subscriptions, only: [:show]
post '/subscriptions/:id', to: 'subscriptions#update'
# PubSubHubbub incoming subscriptions
post '/push', to: 'push#update', as: :push
# Salmon
post '/salmon/:id', to: 'salmon#update', as: :salmon
......
class CreateSubscriptions < ActiveRecord::Migration[5.0]
def change
create_table :subscriptions do |t|
t.string :callback_url, null: false, default: ''
t.string :secret
t.datetime :expires_at, null: true, default: nil
t.boolean :confirmed, null: false, default: false
t.integer :account_id, null: false
t.timestamps
end
add_index :subscriptions, [:callback_url, :account_id], unique: true
end
end
......@@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 20161123093447) do
ActiveRecord::Schema.define(version: 20161128103007) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
......@@ -143,6 +143,19 @@ ActiveRecord::Schema.define(version: 20161123093447) do
t.index ["uid"], name: "index_oauth_applications_on_uid", unique: true, using: :btree
end
create_table "pubsubhubbub_subscriptions", force: :cascade do |t|
t.string "topic", default: "", null: false
t.string "callback", default: "", null: false
t.string "mode", default: "", null: false
t.string "challenge", default: "", null: false
t.string "secret"
t.boolean "confirmed", default: false, null: false
t.datetime "expires_at", null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["topic", "callback"], name: "index_pubsubhubbub_subscriptions_on_topic_and_callback", unique: true, using: :btree
end
create_table "settings", force: :cascade do |t|
t.string "var", null: false
t.text "value"
......@@ -185,6 +198,17 @@ ActiveRecord::Schema.define(version: 20161123093447) do
t.index ["activity_id", "activity_type"], name: "index_stream_entries_on_activity_id_and_activity_type", using: :btree
end
create_table "subscriptions", force: :cascade do |t|
t.string "callback_url", default: "", null: false
t.string "secret"
t.datetime "expires_at"
t.boolean "confirmed", default: false, null: false
t.integer "account_id", null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["callback_url", "account_id"], name: "index_subscriptions_on_callback_url_and_account_id", unique: true, using: :btree
end
create_table "tags", force: :cascade do |t|
t.string "name", default: "", null: false
t.datetime "created_at", null: false
......
require 'rails_helper'
RSpec.describe Api::PushController, type: :controller do
describe 'POST #update' do
context 'with hub.mode=subscribe' do
pending
end
context 'with hub.mode=unsubscribe' do
pending
end
end
end
Fabricator(:subscription) do
callback_url "http://example.com/callback"
secret "foobar"
expires_at "2016-11-28 11:30:07"
confirmed false
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment