feat: skip self

This commit is contained in:
chenos 2024-07-23 15:30:32 +08:00 committed by mytharcher
parent adc10b50df
commit 1c2de28a0d
2 changed files with 42 additions and 20 deletions

View File

@ -19,7 +19,7 @@ export class RedisPubSubAdapter implements IPubSubAdapter {
constructor() { constructor() {
this.publisher = createClient({ this.publisher = createClient({
url: process.env.REDIS_URL || 'redis://redis:6379', url: process.env.PUB_SUB_REDIS_URL || 'redis://redis:6379',
}); });
this.subscriber = this.publisher.duplicate(); this.subscriber = this.publisher.duplicate();
} }
@ -87,10 +87,10 @@ describe('pub-sub-manager', () => {
}); });
await node1.pubSubManager.publish('chan1nel', `channel1_message_1`); await node1.pubSubManager.publish('chan1nel', `channel1_message_1`);
await sleep(1000); await sleep(1000);
expect(count).toBe(2); expect(count).toBe(1);
await node1.pm.get(Plugin1).sendMessage('plugin send message'); await node1.pm.get(Plugin1).sendMessage('plugin send message');
await sleep(1000); await sleep(1000);
expect(count).toBe(4); expect(count).toBe(2);
await node1.destroy(); await node1.destroy();
await node2.destroy(); await node2.destroy();
}); });

View File

@ -7,16 +7,19 @@
* For more information, please refer to: https://www.nocobase.com/agreement. * For more information, please refer to: https://www.nocobase.com/agreement.
*/ */
import { uid } from '@nocobase/utils';
import Application from './application'; import Application from './application';
export class PubSubManager { export class PubSubManager {
adapter: IPubSubAdapter; adapter: IPubSubAdapter;
subscribes = new Map(); subscribes = new Map();
publisherId: string;
constructor( constructor(
protected app: Application, protected app: Application,
protected options: any = {}, protected options: any = {},
) { ) {
this.publisherId = uid();
app.on('afterStart', async () => { app.on('afterStart', async () => {
await this.connect(); await this.connect();
}); });
@ -47,8 +50,8 @@ export class PubSubManager {
await this.adapter.connect(); await this.adapter.connect();
// subscribe 要在 connect 之后 // subscribe 要在 connect 之后
for (const [channel, callbacks] of this.subscribes) { for (const [channel, callbacks] of this.subscribes) {
for (const callback of callbacks) { for (const [, fn] of callbacks) {
await this.adapter.subscribe(`${this.prefix}.${channel}`, callback); await this.adapter.subscribe(`${this.prefix}.${channel}`, fn);
} }
} }
} }
@ -60,41 +63,60 @@ export class PubSubManager {
return await this.adapter.close(); return await this.adapter.close();
} }
async subscribe(channel, callback) { async subscribe(channel, callback, skipSelf = true) {
const fn = (wrappedMessage) => {
const { publisherId, message } = JSON.parse(wrappedMessage);
if (skipSelf && publisherId === this.publisherId) {
return;
}
callback(message);
};
if (!this.subscribes.has(channel)) { if (!this.subscribes.has(channel)) {
const set = new Set(); const map = new Map();
this.subscribes.set(channel, set); this.subscribes.set(channel, map);
} }
const set = this.subscribes.get(channel); const map = this.subscribes.get(channel);
set.add(callback); map.set(callback, fn);
} }
async unsubscribe(channel, callback) { async unsubscribe(channel, callback) {
const set = this.subscribes.get(channel); const map: Map<any, any> = this.subscribes.get(channel);
if (set) { let fn = null;
set.delete(callback); if (map) {
fn = map.get(callback);
} }
if (!this.adapter) { if (!this.adapter || !fn) {
return; return;
} }
return this.adapter.unsubscribe(`${this.prefix}.${channel}`, callback); return this.adapter.unsubscribe(`${this.prefix}.${channel}`, fn);
} }
async publish(channel, message) { async publish(channel, message) {
if (!this.adapter) { if (!this.adapter) {
return; return;
} }
return this.adapter.publish(`${this.prefix}.${channel}`, message);
const wrappedMessage = JSON.stringify({
publisherId: this.publisherId,
message: message,
});
return this.adapter.publish(`${this.prefix}.${channel}`, wrappedMessage);
} }
onMessage(callback) { onMessage(callback, skipSelf = true) {
if (!this.adapter) { if (!this.adapter) {
return; return;
} }
return this.adapter.onMessage((channel, message) => { return this.adapter.onMessage((channel: string, wrappedMessage) => {
if (channel.startsWith(`${this.prefix}.`)) { if (!channel.startsWith(`${this.prefix}.`)) {
callback(channel, message); return;
} }
const { publisherId, message } = JSON.parse(wrappedMessage);
if (skipSelf && publisherId === this.publisherId) {
return;
}
callback(channel, message);
}); });
} }
} }