From ad39ff21eee403bc8aa3fe4a0ad59b42c86d4a94 Mon Sep 17 00:00:00 2001 From: chenos Date: Thu, 25 Jul 2024 15:34:51 +0800 Subject: [PATCH] feat: test cases --- .../src/__tests__/pub-sub-manager.test.ts | 109 ++++++++++++++---- .../__tests__/sync-message-manager.test.ts | 45 ++++++++ packages/core/server/src/application.ts | 13 ++- packages/core/server/src/plugin.ts | 4 +- packages/core/server/src/pub-sub-manager.ts | 51 ++++---- .../core/server/src/sync-message-manager.ts | 49 ++++++++ .../test/src/server/memory-pub-sub-adapter.ts | 12 +- packages/core/test/src/server/mock-server.ts | 38 +++++- 8 files changed, 263 insertions(+), 58 deletions(-) create mode 100644 packages/core/server/src/__tests__/sync-message-manager.test.ts create mode 100644 packages/core/server/src/sync-message-manager.ts diff --git a/packages/core/server/src/__tests__/pub-sub-manager.test.ts b/packages/core/server/src/__tests__/pub-sub-manager.test.ts index 7132ea4a15..565ea2725c 100644 --- a/packages/core/server/src/__tests__/pub-sub-manager.test.ts +++ b/packages/core/server/src/__tests__/pub-sub-manager.test.ts @@ -7,7 +7,7 @@ * For more information, please refer to: https://www.nocobase.com/agreement. */ -import { MemoryPubSubAdapter, sleep } from '@nocobase/test'; +import { MemoryPubSubAdapter, MockServer, createMockServer, sleep } from '@nocobase/test'; import { PubSubManager } from '../pub-sub-manager'; describe('connect', () => { @@ -40,7 +40,7 @@ describe('connect', () => { test('subscribe before connect', async () => { const mockListener = vi.fn(); - await pubSubManager.subscribe('test1', mockListener, { skipSelf: false }); + await pubSubManager.subscribe('test1', mockListener); await pubSubManager.connect(); await pubSubManager.publish('test1', 'message1'); expect(mockListener).toHaveBeenCalled(); @@ -51,7 +51,7 @@ describe('connect', () => { test('subscribe after connect', async () => { await pubSubManager.connect(); const mockListener = vi.fn(); - await pubSubManager.subscribe('test1', mockListener, { skipSelf: false }); + await pubSubManager.subscribe('test1', mockListener); await pubSubManager.publish('test1', 'message1'); expect(mockListener).toHaveBeenCalled(); expect(mockListener).toBeCalledTimes(1); @@ -72,25 +72,25 @@ describe('skipSelf, unsubscribe, debounce', () => { await pubSubManager.close(); }); - test('skipSelf: true', async () => { - const mockListener = vi.fn(); - await pubSubManager.subscribe('test1', mockListener); - await pubSubManager.publish('test1', 'message1'); - expect(mockListener).not.toHaveBeenCalled(); - }); - test('skipSelf: false', async () => { const mockListener = vi.fn(); - await pubSubManager.subscribe('test1', mockListener, { skipSelf: false }); + await pubSubManager.subscribe('test1', mockListener); await pubSubManager.publish('test1', 'message1'); expect(mockListener).toHaveBeenCalled(); expect(mockListener).toBeCalledTimes(1); expect(mockListener).toHaveBeenCalledWith('message1'); }); + test('skipSelf: true', async () => { + const mockListener = vi.fn(); + await pubSubManager.subscribe('test1', mockListener); + await pubSubManager.publish('test1', 'message1', { skipSelf: true }); + expect(mockListener).not.toHaveBeenCalled(); + }); + test('debounce', async () => { const mockListener = vi.fn(); - await pubSubManager.subscribe('test1', mockListener, { skipSelf: false, debounce: 1000 }); + await pubSubManager.subscribe('test1', mockListener, { debounce: 1000 }); pubSubManager.publish('test1', 'message1'); pubSubManager.publish('test1', 'message1'); pubSubManager.publish('test1', 'message1'); @@ -109,7 +109,7 @@ describe('skipSelf, unsubscribe, debounce', () => { test('debounce', async () => { const mockListener = vi.fn(); - await pubSubManager.subscribeAll(mockListener, { skipSelf: false, debounce: 1000 }); + await pubSubManager.subscribeAll(mockListener, { debounce: 1000 }); pubSubManager.publish('test1', 'message1'); pubSubManager.publish('test1', 'message1'); pubSubManager.publish('test1', 'message2'); @@ -120,9 +120,34 @@ describe('skipSelf, unsubscribe, debounce', () => { expect(mockListener).toBeCalledTimes(3); }); + test('message format', async () => { + const mockListener = vi.fn(); + await pubSubManager.subscribe('test1', mockListener); + await pubSubManager.publish('test1', 1); + expect(mockListener).toBeCalledTimes(1); + expect(mockListener).toHaveBeenCalledWith(1); + const msg2 = ['message1']; + await pubSubManager.publish('test1', msg2); + expect(mockListener).toBeCalledTimes(2); + expect(mockListener).toHaveBeenCalledWith(msg2); + const msg3 = { type: 'test' }; + await pubSubManager.publish('test1', msg3); + expect(mockListener).toBeCalledTimes(3); + expect(mockListener).toHaveBeenCalledWith(msg3); + await pubSubManager.publish('test1', true); + expect(mockListener).toBeCalledTimes(4); + expect(mockListener).toHaveBeenCalledWith(true); + await pubSubManager.publish('test1', false); + expect(mockListener).toBeCalledTimes(5); + expect(mockListener).toHaveBeenCalledWith(false); + await pubSubManager.publish('test1', null); + expect(mockListener).toBeCalledTimes(6); + expect(mockListener).toHaveBeenCalledWith(null); + }); + test('unsubscribe', async () => { const mockListener = vi.fn(); - await pubSubManager.subscribe('test1', mockListener, { skipSelf: false }); + await pubSubManager.subscribe('test1', mockListener); await pubSubManager.publish('test1', 'message1'); expect(mockListener).toHaveBeenCalled(); expect(mockListener).toBeCalledTimes(1); @@ -136,17 +161,17 @@ describe('skipSelf, unsubscribe, debounce', () => { const mockListener = vi.fn(); await pubSubManager.subscribeAll(mockListener); await pubSubManager.publish('test1', 'message1'); - expect(mockListener).not.toHaveBeenCalled(); - }); - - test('subscribeAll + skipSelf: false', async () => { - const mockListener = vi.fn(); - await pubSubManager.subscribeAll(mockListener, { skipSelf: false }); - await pubSubManager.publish('test1', 'message1'); expect(mockListener).toHaveBeenCalled(); expect(mockListener).toBeCalledTimes(1); expect(mockListener).toHaveBeenCalledWith('test1', 'message1'); }); + + test('publish + skipSelf: false', async () => { + const mockListener = vi.fn(); + await pubSubManager.subscribeAll(mockListener); + await pubSubManager.publish('test1', 'message1', { skipSelf: true }); + expect(mockListener).not.toHaveBeenCalled(); + }); }); describe('Pub/Sub', () => { @@ -186,4 +211,46 @@ describe('Pub/Sub', () => { expect(mockListener).toBeCalledTimes(1); expect(mockListener).toHaveBeenCalledWith('message1'); }); + + test('publish only self', async () => { + const mockListener = vi.fn(); + await subscriber.subscribe('test1', mockListener); + await publisher.subscribe('test1', mockListener); + await publisher.publish('test1', 'message1', { onlySelf: true }); + expect(mockListener).toHaveBeenCalled(); + expect(mockListener).toBeCalledTimes(1); + expect(mockListener).toHaveBeenCalledWith('message1'); + }); +}); + +describe('app.pubSubManager', () => { + let app: MockServer; + let pubSubManager: PubSubManager; + + beforeEach(async () => { + app = await createMockServer({ + pubSubManager: { + basename: 'app1', + }, + }); + pubSubManager = app.pubSubManager; + }); + + afterEach(async () => { + await app.destroy(); + }); + + test('adapter', async () => { + expect(pubSubManager.connected).toBe(true); + expect(pubSubManager.adapter).toBeInstanceOf(MemoryPubSubAdapter); + }); + + test('subscribe + publish', async () => { + const mockListener = vi.fn(); + await pubSubManager.subscribe('test1', mockListener); + await pubSubManager.publish('test1', 'message1'); + expect(mockListener).toHaveBeenCalled(); + expect(mockListener).toBeCalledTimes(1); + expect(mockListener).toHaveBeenCalledWith('message1'); + }); }); diff --git a/packages/core/server/src/__tests__/sync-message-manager.test.ts b/packages/core/server/src/__tests__/sync-message-manager.test.ts new file mode 100644 index 0000000000..b26db4c324 --- /dev/null +++ b/packages/core/server/src/__tests__/sync-message-manager.test.ts @@ -0,0 +1,45 @@ +/** + * This file is part of the NocoBase (R) project. + * Copyright (c) 2020-2024 NocoBase Co., Ltd. + * Authors: NocoBase Team. + * + * This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License. + * For more information, please refer to: https://www.nocobase.com/agreement. + */ + +import { Plugin } from '@nocobase/server'; +import { createMultiMockServer } from '@nocobase/test'; + +describe('sync-message-manager', () => { + test('subscribe + publish', async () => { + const [node1, node2] = await createMultiMockServer({ basename: 'base1' }); + const mockListener = vi.fn(); + await node1.syncMessageManager.subscribe('test1', mockListener); + await node2.syncMessageManager.subscribe('test1', mockListener); + await node2.syncMessageManager.publish('test1', 'message1'); + expect(mockListener).toHaveBeenCalled(); + expect(mockListener).toBeCalledTimes(1); + expect(mockListener).toHaveBeenCalledWith('message1'); + await node1.destroy(); + await node2.destroy(); + }); + + test('plugin.handleSyncMessage', async () => { + const mockListener = vi.fn(); + class MyPlugin extends Plugin { + get name() { + return 'test1'; + } + async handleSyncMessage(message) { + mockListener(message); + } + } + const [node1, node2] = await createMultiMockServer({ basename: 'base1', plugins: [MyPlugin] }); + await node1.pm.get(MyPlugin).sendSyncMessage('message1'); + expect(mockListener).toBeCalledTimes(1); + expect(mockListener).toHaveBeenCalledWith('message1'); + await node2.pm.get(MyPlugin).sendSyncMessage('message2'); + expect(mockListener).toBeCalledTimes(2); + expect(mockListener).toHaveBeenCalledWith('message2'); + }); +}); diff --git a/packages/core/server/src/application.ts b/packages/core/server/src/application.ts index 88e62f5a07..9e97e1b1a0 100644 --- a/packages/core/server/src/application.ts +++ b/packages/core/server/src/application.ts @@ -60,8 +60,9 @@ import { dataTemplate } from './middlewares/data-template'; import validateFilterParams from './middlewares/validate-filter-params'; import { Plugin } from './plugin'; import { InstallOptions, PluginManager } from './plugin-manager'; -import { PubSubManager } from './pub-sub-manager'; +import { PubSubManager, PubSubManagerOptions } from './pub-sub-manager'; import { SyncManager } from './sync-manager'; +import { SyncMessageManager } from './sync-message-manager'; import packageJson from '../package.json'; @@ -99,7 +100,8 @@ export interface ApplicationOptions { */ resourcer?: ResourceManagerOptions; resourceManager?: ResourceManagerOptions; - pubSubManager?: any; + pubSubManager?: PubSubManagerOptions; + syncMessageManager?: any; bodyParser?: any; cors?: any; dataWrapping?: boolean; @@ -230,6 +232,7 @@ export class Application exten */ public syncManager: SyncManager; public pubSubManager: PubSubManager; + public syncMessageManager: SyncMessageManager; public requestLogger: Logger; private sqlLogger: Logger; protected _logger: SystemLogger; @@ -1129,7 +1132,11 @@ export class Application exten this._cli = this.createCLI(); this._i18n = createI18n(options); this.syncManager = new SyncManager(this); - this.pubSubManager = PubSubManager.create(this, options.pubSubManager); + this.pubSubManager = PubSubManager.create(this, { + basename: this.name, + ...options.pubSubManager, + }); + this.syncMessageManager = new SyncMessageManager(this, options.syncMessageManager); this.context.db = this.db; /** diff --git a/packages/core/server/src/plugin.ts b/packages/core/server/src/plugin.ts index a0814ae9af..917e5585b7 100644 --- a/packages/core/server/src/plugin.ts +++ b/packages/core/server/src/plugin.ts @@ -151,9 +151,9 @@ export abstract class Plugin implements PluginInterface { async handleSyncMessage(message) {} async sendSyncMessage(message) { if (!this.name) { - return; + throw new Error(`plugin name invalid`); } - await this.app.pubSubManager.publish(this.name, message); + await this.app.syncMessageManager.publish(this.name, message); } /** diff --git a/packages/core/server/src/pub-sub-manager.ts b/packages/core/server/src/pub-sub-manager.ts index 398c9b14f5..f95f8d6eab 100644 --- a/packages/core/server/src/pub-sub-manager.ts +++ b/packages/core/server/src/pub-sub-manager.ts @@ -16,8 +16,12 @@ export interface PubSubManagerOptions { basename?: string; } -export interface PubSubManagerCallbackOptions { +export interface PubSubManagerPublishOptions { skipSelf?: boolean; + onlySelf?: boolean; +} + +export interface PubSubManagerSubscribeOptions { debounce?: number; } @@ -36,14 +40,6 @@ export class PubSubManager { app.on('afterStop', async () => { await pubSubManager.close(); }); - app.on('beforeLoadPlugin', async (plugin) => { - if (!plugin.name) { - return; - } - await pubSubManager.subscribe(plugin.name, plugin.handleSyncMessage.bind(plugin), { - debounce: Number(process.env.PUB_SUB_DEFAULT_DEBOUNCE || 1000), - }); - }); return pubSubManager; } @@ -85,11 +81,13 @@ export class PubSubManager { return crypto.createHash('sha256').update(JSON.stringify(message)).digest('hex'); } - async subscribe(channel: string, callback, options: PubSubManagerCallbackOptions = {}) { - const { skipSelf = true, debounce = 0 } = options; + async subscribe(channel: string, callback, options: PubSubManagerSubscribeOptions = {}) { + const { debounce = 0 } = options; const wrappedCallback = async (wrappedMessage) => { - const { publisherId, message } = JSON.parse(wrappedMessage); - if (skipSelf && publisherId === this.publisherId) { + const { onlySelf, skipSelf, publisherId, message } = JSON.parse(wrappedMessage); + if (onlySelf && publisherId !== this.publisherId) { + return; + } else if (!onlySelf && skipSelf && publisherId === this.publisherId) { return; } if (!debounce) { @@ -131,37 +129,33 @@ export class PubSubManager { return this.adapter.unsubscribe(`${this.basename}${channel}`, fn); } - async publish(channel, message) { + async publish(channel, message, options?: PubSubManagerPublishOptions) { if (!this.adapter) { return; } const wrappedMessage = JSON.stringify({ publisherId: this.publisherId, + ...options, message: message, }); return this.adapter.publish(`${this.basename}${channel}`, wrappedMessage); } - protected debounce(func, wait: number) { - if (wait) { - return _.debounce(func, wait); - } - return func; - } - - async subscribeAll(callback, options: PubSubManagerCallbackOptions = {}) { + async subscribeAll(callback, options: PubSubManagerSubscribeOptions = {}) { if (!this.adapter) { return; } - const { skipSelf = true, debounce = 0 } = options; + const { debounce = 0 } = options; return this.adapter.subscribeAll(async (channel: string, wrappedMessage) => { if (!channel.startsWith(this.basename)) { return; } - const { publisherId, message } = JSON.parse(wrappedMessage); - if (skipSelf && publisherId === this.publisherId) { + const { onlySelf, skipSelf, publisherId, message } = JSON.parse(wrappedMessage); + if (onlySelf && publisherId !== this.publisherId) { + return; + } else if (!onlySelf && skipSelf && publisherId === this.publisherId) { return; } const realChannel = channel.substring(this.basename.length); @@ -178,6 +172,13 @@ export class PubSubManager { this.messageHanders.delete(messageHash); }); } + + protected debounce(func, wait: number) { + if (wait) { + return _.debounce(func, wait); + } + return func; + } } export interface IPubSubAdapter { diff --git a/packages/core/server/src/sync-message-manager.ts b/packages/core/server/src/sync-message-manager.ts new file mode 100644 index 0000000000..c0f4ebc524 --- /dev/null +++ b/packages/core/server/src/sync-message-manager.ts @@ -0,0 +1,49 @@ +/** + * This file is part of the NocoBase (R) project. + * Copyright (c) 2020-2024 NocoBase Co., Ltd. + * Authors: NocoBase Team. + * + * This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License. + * For more information, please refer to: https://www.nocobase.com/agreement. + */ + +import Application from './application'; +import { PubSubManager, PubSubManagerPublishOptions } from './pub-sub-manager'; + +export class SyncMessageManager { + protected versionManager: SyncMessageVersionManager; + protected pubSubManager: PubSubManager; + + constructor( + protected app: Application, + protected options: any = {}, + ) { + this.versionManager = new SyncMessageVersionManager(); + app.on('beforeLoadPlugin', async (plugin) => { + if (!plugin.name) { + return; + } + await this.subscribe(plugin.name, plugin.handleSyncMessage.bind(plugin)); + }); + } + + get debounce() { + return this.options.debounce || 1000; + } + + async publish(channel: string, message, options?: PubSubManagerPublishOptions) { + await this.app.pubSubManager.publish(`${this.app.name}.sync.${channel}`, message, { skipSelf: true, ...options }); + } + + async subscribe(channel: string, callback) { + await this.app.pubSubManager.subscribe(`${this.app.name}.sync.${channel}`, callback, { debounce: this.debounce }); + } + + async sync() { + // TODO + } +} + +export class SyncMessageVersionManager { + // TODO +} diff --git a/packages/core/test/src/server/memory-pub-sub-adapter.ts b/packages/core/test/src/server/memory-pub-sub-adapter.ts index 2ccdcba557..1d2151196b 100644 --- a/packages/core/test/src/server/memory-pub-sub-adapter.ts +++ b/packages/core/test/src/server/memory-pub-sub-adapter.ts @@ -25,17 +25,17 @@ export class MemoryPubSubAdapter implements IPubSubAdapter { static instances = new Map(); - static create(name?: string) { + static create(name?: string, options?: any) { if (!name) { name = uid(); } if (!this.instances.has(name)) { - this.instances.set(name, new MemoryPubSubAdapter()); + this.instances.set(name, new MemoryPubSubAdapter(options)); } return this.instances.get(name); } - constructor() { + constructor(protected options: any = {}) { this.emitter = new TestEventEmitter(); } @@ -56,12 +56,16 @@ export class MemoryPubSubAdapter implements IPubSubAdapter { } async publish(channel, message) { + console.log(this.connected, { channel, message }); if (!this.connected) { return; } await this.emitter.emitAsync(channel, message); await this.emitter.emitAsync('__publish__', channel, message); - await sleep(Number(process.env.PUB_SUB_DEFAULT_DEBOUNCE || 1000)); + // 用于处理延迟问题 + if (this.options.debounce) { + await sleep(Number(this.options.debounce)); + } } async subscribeAll(callback) { diff --git a/packages/core/test/src/server/mock-server.ts b/packages/core/test/src/server/mock-server.ts index 86c3dbcb58..e0e51315ba 100644 --- a/packages/core/test/src/server/mock-server.ts +++ b/packages/core/test/src/server/mock-server.ts @@ -10,6 +10,7 @@ import { mockDatabase } from '@nocobase/database'; import { Application, ApplicationOptions, AppSupervisor, Gateway, PluginManager } from '@nocobase/server'; import jwt from 'jsonwebtoken'; +import _ from 'lodash'; import qs from 'qs'; import supertest, { SuperAgentTest } from 'supertest'; import { MemoryPubSubAdapter } from './memory-pub-sub-adapter'; @@ -231,12 +232,19 @@ export function mockServer(options: ApplicationOptions = {}) { const app = new MockServer({ acl: false, + syncMessageManager: { + debounce: 1000, + }, ...options, }); - if (options.pubSubManager) { - app.pubSubManager.setAdapter(MemoryPubSubAdapter.create(options.pubSubManager?.basename)); - } + const basename = app.options.pubSubManager?.basename || app.name; + + app.pubSubManager.setAdapter( + MemoryPubSubAdapter.create(basename, { + debounce: 1000, + }), + ); return app; } @@ -249,6 +257,30 @@ export async function startMockServer(options: ApplicationOptions = {}) { type BeforeInstallFn = (app) => Promise; +export async function createMultiMockServer( + options: ApplicationOptions & { + number?: number; + version?: string; + basename?: string; + beforeInstall?: BeforeInstallFn; + skipInstall?: boolean; + skipStart?: boolean; + } = {}, +) { + const instances: MockServer[] = []; + for (const i of _.range(0, options.number || 2)) { + const app: MockServer = await createMockServer({ + ...options, + skipSupervisor: true, + pubSubManager: { + basename: options.basename, + }, + }); + instances.push(app); + } + return instances; +} + export async function createMockServer( options: ApplicationOptions & { version?: string;