Skip to content

Commit fe89f7e

Browse files
authored
feat: implement adapter.close() function (#485)
When the close function is called it will (p)unsubscribe from the channels it (p)subscribed to in the constructor. Related: - #480 - socketio/socket.io@5d9220b
1 parent 9b940b8 commit fe89f7e

File tree

2 files changed

+146
-11
lines changed

2 files changed

+146
-11
lines changed

lib/index.ts

Lines changed: 75 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,10 @@ export class RedisAdapter extends Adapter {
102102
private readonly channel: string;
103103
private readonly requestChannel: string;
104104
private readonly responseChannel: string;
105+
private readonly specificResponseChannel: string;
105106
private requests: Map<string, Request> = new Map();
106107
private ackRequests: Map<string, AckRequest> = new Map();
108+
private redisListeners: Map<string, Function> = new Map();
107109

108110
/**
109111
* Adapter constructor.
@@ -133,34 +135,51 @@ export class RedisAdapter extends Adapter {
133135
this.channel = prefix + "#" + nsp.name + "#";
134136
this.requestChannel = prefix + "-request#" + this.nsp.name + "#";
135137
this.responseChannel = prefix + "-response#" + this.nsp.name + "#";
136-
const specificResponseChannel = this.responseChannel + this.uid + "#";
138+
this.specificResponseChannel = this.responseChannel + this.uid + "#";
137139

138140
const isRedisV4 = typeof this.pubClient.pSubscribe === "function";
139141
if (isRedisV4) {
142+
this.redisListeners.set("psub", (msg, channel) => {
143+
this.onmessage(null, channel, msg);
144+
});
145+
146+
this.redisListeners.set("sub", (msg, channel) => {
147+
this.onrequest(channel, msg);
148+
});
149+
140150
this.subClient.pSubscribe(
141151
this.channel + "*",
142-
(msg, channel) => {
143-
this.onmessage(null, channel, msg);
144-
},
152+
this.redisListeners.get("psub"),
145153
true
146154
);
147155
this.subClient.subscribe(
148-
[this.requestChannel, this.responseChannel, specificResponseChannel],
149-
(msg, channel) => {
150-
this.onrequest(channel, msg);
151-
},
156+
[
157+
this.requestChannel,
158+
this.responseChannel,
159+
this.specificResponseChannel,
160+
],
161+
this.redisListeners.get("sub"),
152162
true
153163
);
154164
} else {
165+
this.redisListeners.set("pmessageBuffer", this.onmessage.bind(this));
166+
this.redisListeners.set("messageBuffer", this.onrequest.bind(this));
167+
155168
this.subClient.psubscribe(this.channel + "*");
156-
this.subClient.on("pmessageBuffer", this.onmessage.bind(this));
169+
this.subClient.on(
170+
"pmessageBuffer",
171+
this.redisListeners.get("pmessageBuffer")
172+
);
157173

158174
this.subClient.subscribe([
159175
this.requestChannel,
160176
this.responseChannel,
161-
specificResponseChannel,
177+
this.specificResponseChannel,
162178
]);
163-
this.subClient.on("messageBuffer", this.onrequest.bind(this));
179+
this.subClient.on(
180+
"messageBuffer",
181+
this.redisListeners.get("messageBuffer")
182+
);
164183
}
165184

166185
const registerFriendlyErrorHandler = (redisClient) => {
@@ -917,4 +936,49 @@ export class RedisAdapter extends Adapter {
917936
serverCount(): Promise<number> {
918937
return this.getNumSub();
919938
}
939+
940+
close(): Promise<void> | void {
941+
const isRedisV4 = typeof this.pubClient.pSubscribe === "function";
942+
if (isRedisV4) {
943+
this.subClient.pUnsubscribe(
944+
this.channel + "*",
945+
this.redisListeners.get("psub"),
946+
true
947+
);
948+
949+
// There is a bug in redis v4 when unsubscribing multiple channels at once, so we'll unsub one at a time.
950+
// See https://github.com/redis/node-redis/issues/2052
951+
this.subClient.unsubscribe(
952+
this.requestChannel,
953+
this.redisListeners.get("sub"),
954+
true
955+
);
956+
this.subClient.unsubscribe(
957+
this.responseChannel,
958+
this.redisListeners.get("sub"),
959+
true
960+
);
961+
this.subClient.unsubscribe(
962+
this.specificResponseChannel,
963+
this.redisListeners.get("sub"),
964+
true
965+
);
966+
} else {
967+
this.subClient.punsubscribe(this.channel + "*");
968+
this.subClient.off(
969+
"pmessageBuffer",
970+
this.redisListeners.get("pmessageBuffer")
971+
);
972+
973+
this.subClient.unsubscribe([
974+
this.requestChannel,
975+
this.responseChannel,
976+
this.specificResponseChannel,
977+
]);
978+
this.subClient.off(
979+
"messageBuffer",
980+
this.redisListeners.get("messageBuffer")
981+
);
982+
}
983+
}
920984
}

test/index.ts

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,77 @@ describe(`socket.io-redis with ${
192192
});
193193
});
194194

195+
it("unsubscribes when close is called", async () => {
196+
const parseInfo = (rawInfo: string) => {
197+
const info = {};
198+
199+
rawInfo.split("\r\n").forEach((line) => {
200+
if (line.length > 0 && !line.startsWith("#")) {
201+
const fieldVal = line.split(":");
202+
info[fieldVal[0]] = fieldVal[1];
203+
}
204+
});
205+
206+
return info;
207+
};
208+
209+
const getInfo = async (): Promise<any> => {
210+
if (process.env.REDIS_CLIENT === undefined) {
211+
return parseInfo(
212+
await namespace3.adapter.pubClient.sendCommand(["info"])
213+
);
214+
} else if (process.env.REDIS_CLIENT === "ioredis") {
215+
return parseInfo(await namespace3.adapter.pubClient.call("info"));
216+
} else {
217+
return await new Promise((resolve, reject) => {
218+
namespace3.adapter.pubClient.sendCommand(
219+
"info",
220+
[],
221+
(err, result) => {
222+
if (err) {
223+
reject(err);
224+
}
225+
resolve(parseInfo(result));
226+
}
227+
);
228+
});
229+
}
230+
};
231+
232+
return new Promise(async (resolve, reject) => {
233+
// Give it a moment to subscribe to all the channels
234+
setTimeout(async () => {
235+
try {
236+
const info = await getInfo();
237+
238+
// Depending on the version of redis this may be 3 (redis < v5) or 1 (redis > v4)
239+
// Older versions subscribed multiple times on the same pattern. Newer versions only sub once.
240+
expect(info.pubsub_patterns).to.be.greaterThan(0);
241+
expect(info.pubsub_channels).to.eql(5); // 2 shared (request/response) + 3 unique for each namespace
242+
243+
namespace1.adapter.close();
244+
namespace2.adapter.close();
245+
namespace3.adapter.close();
246+
247+
// Give it a moment to unsubscribe
248+
setTimeout(async () => {
249+
try {
250+
const info = await getInfo();
251+
252+
expect(info.pubsub_patterns).to.eql(0); // All patterns subscriptions should be unsubscribed
253+
expect(info.pubsub_channels).to.eql(0); // All subscriptions should be unsubscribed
254+
resolve();
255+
} catch (error) {
256+
reject(error);
257+
}
258+
}, 100);
259+
} catch (error) {
260+
reject(error);
261+
}
262+
}, 100);
263+
});
264+
});
265+
195266
if (process.env.REDIS_CLIENT === undefined) {
196267
// redis@4
197268
it("ignores messages from unknown channels", (done) => {

0 commit comments

Comments
 (0)