diff --git a/packages/core/server/src/__tests__/pub-sub-manager.test.ts b/packages/core/server/src/__tests__/pub-sub-manager.test.ts index f1c0d6122e..7794a7ac31 100644 --- a/packages/core/server/src/__tests__/pub-sub-manager.test.ts +++ b/packages/core/server/src/__tests__/pub-sub-manager.test.ts @@ -19,7 +19,7 @@ export class RedisPubSubAdapter implements IPubSubAdapter { constructor() { this.publisher = createClient({ - url: process.env.REDIS_URL || 'redis://redis:6379', + url: process.env.PUB_SUB_REDIS_URL || 'redis://redis:6379', }); this.subscriber = this.publisher.duplicate(); } @@ -87,10 +87,10 @@ describe('pub-sub-manager', () => { }); await node1.pubSubManager.publish('chan1nel', `channel1_message_1`); await sleep(1000); - expect(count).toBe(2); + expect(count).toBe(1); await node1.pm.get(Plugin1).sendMessage('plugin send message'); await sleep(1000); - expect(count).toBe(4); + expect(count).toBe(2); await node1.destroy(); await node2.destroy(); }); diff --git a/packages/core/server/src/pub-sub-manager.ts b/packages/core/server/src/pub-sub-manager.ts index 98e2e79346..a1646fd3cc 100644 --- a/packages/core/server/src/pub-sub-manager.ts +++ b/packages/core/server/src/pub-sub-manager.ts @@ -7,16 +7,19 @@ * For more information, please refer to: https://www.nocobase.com/agreement. */ +import { uid } from '@nocobase/utils'; import Application from './application'; export class PubSubManager { adapter: IPubSubAdapter; subscribes = new Map(); + publisherId: string; constructor( protected app: Application, protected options: any = {}, ) { + this.publisherId = uid(); app.on('afterStart', async () => { await this.connect(); }); @@ -47,8 +50,8 @@ export class PubSubManager { await this.adapter.connect(); // subscribe 要在 connect 之后 for (const [channel, callbacks] of this.subscribes) { - for (const callback of callbacks) { - await this.adapter.subscribe(`${this.prefix}.${channel}`, callback); + for (const [, fn] of callbacks) { + await this.adapter.subscribe(`${this.prefix}.${channel}`, fn); } } } @@ -60,41 +63,60 @@ export class PubSubManager { return await this.adapter.close(); } - async subscribe(channel, callback) { + async subscribe(channel, callback, skipSelf = true) { + const fn = (wrappedMessage) => { + const { publisherId, message } = JSON.parse(wrappedMessage); + if (skipSelf && publisherId === this.publisherId) { + return; + } + callback(message); + }; if (!this.subscribes.has(channel)) { - const set = new Set(); - this.subscribes.set(channel, set); + const map = new Map(); + this.subscribes.set(channel, map); } - const set = this.subscribes.get(channel); - set.add(callback); + const map = this.subscribes.get(channel); + map.set(callback, fn); } async unsubscribe(channel, callback) { - const set = this.subscribes.get(channel); - if (set) { - set.delete(callback); + const map: Map = this.subscribes.get(channel); + let fn = null; + if (map) { + fn = map.get(callback); } - if (!this.adapter) { + if (!this.adapter || !fn) { return; } - return this.adapter.unsubscribe(`${this.prefix}.${channel}`, callback); + return this.adapter.unsubscribe(`${this.prefix}.${channel}`, fn); } async publish(channel, message) { if (!this.adapter) { return; } - return this.adapter.publish(`${this.prefix}.${channel}`, message); + + const wrappedMessage = JSON.stringify({ + publisherId: this.publisherId, + message: message, + }); + + return this.adapter.publish(`${this.prefix}.${channel}`, wrappedMessage); } - onMessage(callback) { + onMessage(callback, skipSelf = true) { if (!this.adapter) { return; } - return this.adapter.onMessage((channel, message) => { - if (channel.startsWith(`${this.prefix}.`)) { - callback(channel, message); + return this.adapter.onMessage((channel: string, wrappedMessage) => { + if (!channel.startsWith(`${this.prefix}.`)) { + return; } + const { publisherId, message } = JSON.parse(wrappedMessage); + if (skipSelf && publisherId === this.publisherId) { + return; + } + callback(channel, message); }); } }