Skip to content

Commit 033a358

Browse files
committed
feat(sharded): add an option for dynamic private channels
1 parent dc1407f commit 033a358

File tree

2 files changed

+49
-13
lines changed

2 files changed

+49
-13
lines changed

lib/sharded-adapter.ts

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,17 @@ export interface ShardedRedisAdapterOptions {
3434
* The default value, useful when some rooms have a low number of clients (so only a few Socket.IO servers are notified).
3535
*
3636
* Only public rooms (i.e. not related to a particular Socket ID) are taken in account, because:
37-
*
3837
* - a lot of connected clients would mean a lot of subscription/unsubscription
3938
* - the Socket ID attribute is ephemeral
4039
*
40+
* - "dynamic-private"
41+
*
42+
* Like "dynamic" but creates separate channels for private rooms as well. Useful when there is lots of 1:1 communication
43+
* via socket.emit() calls.
44+
*
4145
* @default "dynamic"
4246
*/
43-
subscriptionMode?: "static" | "dynamic";
47+
subscriptionMode?: "static" | "dynamic" | "dynamic-private";
4448
}
4549

4650
/**
@@ -91,15 +95,13 @@ class ShardedRedisAdapter extends ClusterAdapter {
9195

9296
if (this.opts.subscriptionMode === "dynamic") {
9397
this.on("create-room", (room) => {
94-
const isPublicRoom = !this.sids.has(room);
95-
if (isPublicRoom) {
98+
if (this.shouldUseASeparateNamespace(room)) {
9699
SSUBSCRIBE(this.subClient, this.dynamicChannel(room), handler);
97100
}
98101
});
99102

100103
this.on("delete-room", (room) => {
101-
const isPublicRoom = !this.sids.has(room);
102-
if (isPublicRoom) {
104+
if (this.shouldUseASeparateNamespace(room)) {
103105
SUNSUBSCRIBE(this.subClient, this.dynamicChannel(room));
104106
}
105107
});
@@ -111,8 +113,7 @@ class ShardedRedisAdapter extends ClusterAdapter {
111113

112114
if (this.opts.subscriptionMode === "dynamic") {
113115
this.rooms.forEach((_sids, room) => {
114-
const isPublicRoom = !this.sids.has(room);
115-
if (isPublicRoom) {
116+
if (this.shouldUseASeparateNamespace(room)) {
116117
channels.push(this.dynamicChannel(room));
117118
}
118119
});
@@ -133,15 +134,19 @@ class ShardedRedisAdapter extends ClusterAdapter {
133134
}
134135

135136
private computeChannel(message) {
137+
const isDynamicChannel =
138+
(this.opts.subscriptionMode === "dynamic" &&
139+
!looksLikeASocketId(message.data.opts.rooms[0])) ||
140+
this.opts.subscriptionMode === "dynamic-private";
141+
136142
// broadcast with ack can not use a dynamic channel, because the serverCount() method return the number of all
137143
// servers, not only the ones where the given room exists
138-
const useDynamicChannel =
139-
this.opts.subscriptionMode === "dynamic" &&
144+
const isSupportedMessageType =
140145
message.type === MessageType.BROADCAST &&
141146
message.data.requestId === undefined &&
142-
message.data.opts.rooms.length === 1 &&
143-
!looksLikeASocketId(message.data.opts.rooms[0]);
144-
if (useDynamicChannel) {
147+
message.data.opts.rooms.length === 1;
148+
149+
if (isDynamicChannel && isSupportedMessageType) {
145150
return this.dynamicChannel(message.data.opts.rooms[0]);
146151
} else {
147152
return this.channel;
@@ -204,4 +209,13 @@ class ShardedRedisAdapter extends ClusterAdapter {
204209
override serverCount(): Promise<number> {
205210
return PUBSUB(this.pubClient, "SHARDNUMSUB", this.channel);
206211
}
212+
213+
shouldUseASeparateNamespace(room: string): boolean {
214+
const isPublicRoom = !this.sids.has(room);
215+
216+
return (
217+
(this.opts.subscriptionMode === "dynamic" && isPublicRoom) ||
218+
this.opts.subscriptionMode === "dynamic-private"
219+
);
220+
}
207221
}

test/test-runner.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,28 @@ describe("@socket.io/redis-adapter", () => {
175175
true
176176
));
177177

178+
describe("[sharded] redis@4 standalone (dynamic subscription mode & dynamic private channels)", () =>
179+
testSuite(
180+
async () => {
181+
const pubClient = createClient();
182+
const subClient = pubClient.duplicate();
183+
184+
await Promise.all([pubClient.connect(), subClient.connect()]);
185+
186+
return [
187+
createShardedAdapter(pubClient, subClient, {
188+
subscriptionMode: "dynamic-private",
189+
}),
190+
() => {
191+
pubClient.disconnect();
192+
subClient.disconnect();
193+
},
194+
];
195+
},
196+
"redis@4",
197+
true
198+
));
199+
178200
describe("[sharded] redis@4 standalone (static subscription mode)", () =>
179201
testSuite(
180202
async () => {

0 commit comments

Comments
 (0)