fix: test case

This commit is contained in:
chenos 2024-07-23 12:32:06 +08:00 committed by mytharcher
parent 011e71429b
commit 6ac9117e9d
4 changed files with 47 additions and 5 deletions

View File

@ -52,6 +52,14 @@ describe('pub-sub-manager', () => {
test('case1', async () => { test('case1', async () => {
let count = 0; let count = 0;
class Plugin1 extends Plugin { class Plugin1 extends Plugin {
get name() {
return 'Plugin1';
}
onMessage() {
++count;
}
async beforeLoad() { async beforeLoad() {
this.app.pubSubManager.setAdapter(new RedisPubSubAdapter()); this.app.pubSubManager.setAdapter(new RedisPubSubAdapter());
await this.app.pubSubManager.subscribe('chan1nel', (message) => { await this.app.pubSubManager.subscribe('chan1nel', (message) => {
@ -77,6 +85,9 @@ describe('pub-sub-manager', () => {
await node1.pubSubManager.publish('chan1nel', `channel1_message_1`); await node1.pubSubManager.publish('chan1nel', `channel1_message_1`);
await sleep(1000); await sleep(1000);
expect(count).toBe(2); expect(count).toBe(2);
await node1.pm.get(Plugin1).sendMessage('plugin send message');
await sleep(1000);
expect(count).toBe(4);
await node1.destroy(); await node1.destroy();
await node2.destroy(); await node2.destroy();
}); });

View File

@ -148,6 +148,15 @@ export abstract class Plugin<O = any> implements PluginInterface {
this.app.syncManager.publish(this.name, message); this.app.syncManager.publish(this.name, message);
} }
onMessage(message) {}
async sendMessage(message) {
if (!this.name) {
return;
}
console.log('sendMessage', this.name);
await this.app.pubSubManager.publish(this.name, message);
}
/** /**
* @deprecated * @deprecated
*/ */

View File

@ -23,6 +23,13 @@ export class PubSubManager {
app.on('afterStop', async () => { app.on('afterStop', async () => {
await this.close(); await this.close();
}); });
app.on('beforeLoadPlugin', async (plugin) => {
if (!plugin.name) {
return;
}
console.log('beforeLoadPlugin', plugin.name);
await this.subscribe(plugin.name, plugin.onMessage.bind(plugin));
});
} }
get prefix() { get prefix() {
@ -34,6 +41,9 @@ export class PubSubManager {
} }
async connect() { async connect() {
if (!this.adapter) {
return;
}
await this.adapter.connect(); await this.adapter.connect();
// subscribe 要在 connect 之后 // subscribe 要在 connect 之后
for (const [channel, callbacks] of this.subscribes) { for (const [channel, callbacks] of this.subscribes) {
@ -44,6 +54,9 @@ export class PubSubManager {
} }
async close() { async close() {
if (!this.adapter) {
return;
}
return await this.adapter.close(); return await this.adapter.close();
} }
@ -61,14 +74,23 @@ export class PubSubManager {
if (set) { if (set) {
set.delete(callback); set.delete(callback);
} }
if (!this.adapter) {
return;
}
return this.adapter.unsubscribe(`${this.prefix}.${channel}`, callback); return this.adapter.unsubscribe(`${this.prefix}.${channel}`, callback);
} }
async publish(channel, message) { async publish(channel, message) {
if (!this.adapter) {
return;
}
return this.adapter.publish(`${this.prefix}.${channel}`, message); return this.adapter.publish(`${this.prefix}.${channel}`, message);
} }
onMessage(callback) { onMessage(callback) {
if (!this.adapter) {
return;
}
return this.adapter.onMessage((channel, message) => { return this.adapter.onMessage((channel, message) => {
if (channel.startsWith(`${this.prefix}.`)) { if (channel.startsWith(`${this.prefix}.`)) {
callback(channel, message); callback(channel, message);

View File

@ -7,10 +7,10 @@
* For more information, please refer to: https://www.nocobase.com/agreement. * For more information, please refer to: https://www.nocobase.com/agreement.
*/ */
import { isEqual, uniqWith } from 'lodash';
import { randomUUID } from 'node:crypto'; import { randomUUID } from 'node:crypto';
import EventEmitter from 'node:events'; import EventEmitter from 'node:events';
import Application from './application'; import Application from './application';
import { isEqual, uniqWith } from 'lodash';
export abstract class SyncAdapter extends EventEmitter { export abstract class SyncAdapter extends EventEmitter {
abstract get ready(): boolean; abstract get ready(): boolean;
@ -42,6 +42,10 @@ export class SyncManager {
return this.adapter ? this.adapter.ready : false; return this.adapter ? this.adapter.ready : false;
} }
constructor(private app: Application) {
this.nodeId = `${process.env.NODE_ID || randomUUID()}-${process.pid}`;
}
private onMessage(namespace, message) { private onMessage(namespace, message) {
this.app.logger.info(`emit sync event in namespace ${namespace}`); this.app.logger.info(`emit sync event in namespace ${namespace}`);
this.eventEmitter.emit(namespace, message); this.eventEmitter.emit(namespace, message);
@ -81,10 +85,6 @@ export class SyncManager {
} }
}; };
constructor(private app: Application) {
this.nodeId = `${process.env.NODE_ID || randomUUID()}-${process.pid}`;
}
public init(adapter: SyncAdapter) { public init(adapter: SyncAdapter) {
if (this.adapter) { if (this.adapter) {
throw new Error('sync adapter is already exists'); throw new Error('sync adapter is already exists');