From d8ec83280637e53ded67d4938a198cbeb9e8db05 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=8C=AB=E5=90=B8=E8=A1=80=E9=AC=BC=E3=83=87=E3=82=A3?=
 =?UTF-8?q?=E3=83=95=E3=83=AA=E3=82=B9=20/=20=E7=8C=AB=E3=83=AD=E3=82=ADP?=
 <deflis@gmail.com>
Date: Wed, 21 Jun 2017 03:41:41 +0900
Subject: [PATCH] Fix streaming server. Redis connection subscribe for each
 channel. (#3828)

---
 streaming/index.js | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/streaming/index.js b/streaming/index.js
index 270ed6f703..5afdd59619 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -115,7 +115,7 @@ const startWorker = (workerId) => {
 
   const subs = {};
 
-  redisSubscribeClient.on('pmessage', (_, channel, message) => {
+  redisSubscribeClient.on('message', (channel, message) => {
     const callbacks = subs[channel];
 
     log.silly(`New message on channel ${channel}`);
@@ -127,8 +127,6 @@ const startWorker = (workerId) => {
     callbacks.forEach(callback => callback(message));
   });
 
-  redisSubscribeClient.psubscribe(`${redisPrefix}timeline:*`);
-
   const subscriptionHeartbeat = (channel) => {
     const interval = 6*60;
     const tellSubscribed = () => {
@@ -144,12 +142,20 @@ const startWorker = (workerId) => {
   const subscribe = (channel, callback) => {
     log.silly(`Adding listener for ${channel}`);
     subs[channel] = subs[channel] || [];
+    if (subs[channel].length === 0) {
+      log.verbose(`Subscribe ${channel}`);
+      redisSubscribeClient.subscribe(channel);
+    }
     subs[channel].push(callback);
   };
 
   const unsubscribe = (channel, callback) => {
     log.silly(`Removing listener for ${channel}`);
     subs[channel] = subs[channel].filter(item => item !== callback);
+    if (subs[channel].length === 0) {
+      log.verbose(`Unsubscribe ${channel}`);
+      redisSubscribeClient.unsubscribe(channel);
+    }
   };
 
   const allowCrossDomain = (req, res, next) => {
-- 
GitLab