diff --git a/packages/core/server/src/__tests__/pub-sub-manager.test.ts b/packages/core/server/src/__tests__/pub-sub-manager.test.ts index c6f21f3362..63707ddd95 100644 --- a/packages/core/server/src/__tests__/pub-sub-manager.test.ts +++ b/packages/core/server/src/__tests__/pub-sub-manager.test.ts @@ -52,6 +52,14 @@ describe('pub-sub-manager', () => { test('case1', async () => { let count = 0; class Plugin1 extends Plugin { + get name() { + return 'Plugin1'; + } + + onMessage() { + ++count; + } + async beforeLoad() { this.app.pubSubManager.setAdapter(new RedisPubSubAdapter()); await this.app.pubSubManager.subscribe('chan1nel', (message) => { @@ -77,6 +85,9 @@ describe('pub-sub-manager', () => { await node1.pubSubManager.publish('chan1nel', `channel1_message_1`); await sleep(1000); expect(count).toBe(2); + await node1.pm.get(Plugin1).sendMessage('plugin send message'); + await sleep(1000); + expect(count).toBe(4); await node1.destroy(); await node2.destroy(); }); diff --git a/packages/core/server/src/plugin.ts b/packages/core/server/src/plugin.ts index aced18f95a..dca582a795 100644 --- a/packages/core/server/src/plugin.ts +++ b/packages/core/server/src/plugin.ts @@ -148,6 +148,15 @@ export abstract class Plugin implements PluginInterface { this.app.syncManager.publish(this.name, message); } + onMessage(message) {} + async sendMessage(message) { + if (!this.name) { + return; + } + console.log('sendMessage', this.name); + await this.app.pubSubManager.publish(this.name, message); + } + /** * @deprecated */ diff --git a/packages/core/server/src/pub-sub-manager.ts b/packages/core/server/src/pub-sub-manager.ts index 919af3d736..98e2e79346 100644 --- a/packages/core/server/src/pub-sub-manager.ts +++ b/packages/core/server/src/pub-sub-manager.ts @@ -23,6 +23,13 @@ export class PubSubManager { app.on('afterStop', async () => { await this.close(); }); + app.on('beforeLoadPlugin', async (plugin) => { + if (!plugin.name) { + return; + } + console.log('beforeLoadPlugin', plugin.name); + await this.subscribe(plugin.name, plugin.onMessage.bind(plugin)); + }); } get prefix() { @@ -34,6 +41,9 @@ export class PubSubManager { } async connect() { + if (!this.adapter) { + return; + } await this.adapter.connect(); // subscribe 要在 connect 之后 for (const [channel, callbacks] of this.subscribes) { @@ -44,6 +54,9 @@ export class PubSubManager { } async close() { + if (!this.adapter) { + return; + } return await this.adapter.close(); } @@ -61,14 +74,23 @@ export class PubSubManager { if (set) { set.delete(callback); } + if (!this.adapter) { + return; + } return this.adapter.unsubscribe(`${this.prefix}.${channel}`, callback); } async publish(channel, message) { + if (!this.adapter) { + return; + } return this.adapter.publish(`${this.prefix}.${channel}`, message); } onMessage(callback) { + if (!this.adapter) { + return; + } return this.adapter.onMessage((channel, message) => { if (channel.startsWith(`${this.prefix}.`)) { callback(channel, message); diff --git a/packages/core/server/src/sync-manager.ts b/packages/core/server/src/sync-manager.ts index faace9d7a6..e8fc42df5b 100644 --- a/packages/core/server/src/sync-manager.ts +++ b/packages/core/server/src/sync-manager.ts @@ -7,10 +7,10 @@ * For more information, please refer to: https://www.nocobase.com/agreement. */ +import { isEqual, uniqWith } from 'lodash'; import { randomUUID } from 'node:crypto'; import EventEmitter from 'node:events'; import Application from './application'; -import { isEqual, uniqWith } from 'lodash'; export abstract class SyncAdapter extends EventEmitter { abstract get ready(): boolean; @@ -42,6 +42,10 @@ export class SyncManager { return this.adapter ? this.adapter.ready : false; } + constructor(private app: Application) { + this.nodeId = `${process.env.NODE_ID || randomUUID()}-${process.pid}`; + } + private onMessage(namespace, message) { this.app.logger.info(`emit sync event in namespace ${namespace}`); this.eventEmitter.emit(namespace, message); @@ -81,10 +85,6 @@ export class SyncManager { } }; - constructor(private app: Application) { - this.nodeId = `${process.env.NODE_ID || randomUUID()}-${process.pid}`; - } - public init(adapter: SyncAdapter) { if (this.adapter) { throw new Error('sync adapter is already exists');