feat: test cases

This commit is contained in:
chenos 2024-07-25 15:34:51 +08:00 committed by mytharcher
parent 58ef2213b9
commit ad39ff21ee
8 changed files with 263 additions and 58 deletions

View File

@ -7,7 +7,7 @@
* For more information, please refer to: https://www.nocobase.com/agreement. * For more information, please refer to: https://www.nocobase.com/agreement.
*/ */
import { MemoryPubSubAdapter, sleep } from '@nocobase/test'; import { MemoryPubSubAdapter, MockServer, createMockServer, sleep } from '@nocobase/test';
import { PubSubManager } from '../pub-sub-manager'; import { PubSubManager } from '../pub-sub-manager';
describe('connect', () => { describe('connect', () => {
@ -40,7 +40,7 @@ describe('connect', () => {
test('subscribe before connect', async () => { test('subscribe before connect', async () => {
const mockListener = vi.fn(); const mockListener = vi.fn();
await pubSubManager.subscribe('test1', mockListener, { skipSelf: false }); await pubSubManager.subscribe('test1', mockListener);
await pubSubManager.connect(); await pubSubManager.connect();
await pubSubManager.publish('test1', 'message1'); await pubSubManager.publish('test1', 'message1');
expect(mockListener).toHaveBeenCalled(); expect(mockListener).toHaveBeenCalled();
@ -51,7 +51,7 @@ describe('connect', () => {
test('subscribe after connect', async () => { test('subscribe after connect', async () => {
await pubSubManager.connect(); await pubSubManager.connect();
const mockListener = vi.fn(); const mockListener = vi.fn();
await pubSubManager.subscribe('test1', mockListener, { skipSelf: false }); await pubSubManager.subscribe('test1', mockListener);
await pubSubManager.publish('test1', 'message1'); await pubSubManager.publish('test1', 'message1');
expect(mockListener).toHaveBeenCalled(); expect(mockListener).toHaveBeenCalled();
expect(mockListener).toBeCalledTimes(1); expect(mockListener).toBeCalledTimes(1);
@ -72,25 +72,25 @@ describe('skipSelf, unsubscribe, debounce', () => {
await pubSubManager.close(); await pubSubManager.close();
}); });
test('skipSelf: true', async () => {
const mockListener = vi.fn();
await pubSubManager.subscribe('test1', mockListener);
await pubSubManager.publish('test1', 'message1');
expect(mockListener).not.toHaveBeenCalled();
});
test('skipSelf: false', async () => { test('skipSelf: false', async () => {
const mockListener = vi.fn(); const mockListener = vi.fn();
await pubSubManager.subscribe('test1', mockListener, { skipSelf: false }); await pubSubManager.subscribe('test1', mockListener);
await pubSubManager.publish('test1', 'message1'); await pubSubManager.publish('test1', 'message1');
expect(mockListener).toHaveBeenCalled(); expect(mockListener).toHaveBeenCalled();
expect(mockListener).toBeCalledTimes(1); expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith('message1'); expect(mockListener).toHaveBeenCalledWith('message1');
}); });
test('skipSelf: true', async () => {
const mockListener = vi.fn();
await pubSubManager.subscribe('test1', mockListener);
await pubSubManager.publish('test1', 'message1', { skipSelf: true });
expect(mockListener).not.toHaveBeenCalled();
});
test('debounce', async () => { test('debounce', async () => {
const mockListener = vi.fn(); const mockListener = vi.fn();
await pubSubManager.subscribe('test1', mockListener, { skipSelf: false, debounce: 1000 }); await pubSubManager.subscribe('test1', mockListener, { debounce: 1000 });
pubSubManager.publish('test1', 'message1'); pubSubManager.publish('test1', 'message1');
pubSubManager.publish('test1', 'message1'); pubSubManager.publish('test1', 'message1');
pubSubManager.publish('test1', 'message1'); pubSubManager.publish('test1', 'message1');
@ -109,7 +109,7 @@ describe('skipSelf, unsubscribe, debounce', () => {
test('debounce', async () => { test('debounce', async () => {
const mockListener = vi.fn(); const mockListener = vi.fn();
await pubSubManager.subscribeAll(mockListener, { skipSelf: false, debounce: 1000 }); await pubSubManager.subscribeAll(mockListener, { debounce: 1000 });
pubSubManager.publish('test1', 'message1'); pubSubManager.publish('test1', 'message1');
pubSubManager.publish('test1', 'message1'); pubSubManager.publish('test1', 'message1');
pubSubManager.publish('test1', 'message2'); pubSubManager.publish('test1', 'message2');
@ -120,9 +120,34 @@ describe('skipSelf, unsubscribe, debounce', () => {
expect(mockListener).toBeCalledTimes(3); expect(mockListener).toBeCalledTimes(3);
}); });
test('message format', async () => {
const mockListener = vi.fn();
await pubSubManager.subscribe('test1', mockListener);
await pubSubManager.publish('test1', 1);
expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith(1);
const msg2 = ['message1'];
await pubSubManager.publish('test1', msg2);
expect(mockListener).toBeCalledTimes(2);
expect(mockListener).toHaveBeenCalledWith(msg2);
const msg3 = { type: 'test' };
await pubSubManager.publish('test1', msg3);
expect(mockListener).toBeCalledTimes(3);
expect(mockListener).toHaveBeenCalledWith(msg3);
await pubSubManager.publish('test1', true);
expect(mockListener).toBeCalledTimes(4);
expect(mockListener).toHaveBeenCalledWith(true);
await pubSubManager.publish('test1', false);
expect(mockListener).toBeCalledTimes(5);
expect(mockListener).toHaveBeenCalledWith(false);
await pubSubManager.publish('test1', null);
expect(mockListener).toBeCalledTimes(6);
expect(mockListener).toHaveBeenCalledWith(null);
});
test('unsubscribe', async () => { test('unsubscribe', async () => {
const mockListener = vi.fn(); const mockListener = vi.fn();
await pubSubManager.subscribe('test1', mockListener, { skipSelf: false }); await pubSubManager.subscribe('test1', mockListener);
await pubSubManager.publish('test1', 'message1'); await pubSubManager.publish('test1', 'message1');
expect(mockListener).toHaveBeenCalled(); expect(mockListener).toHaveBeenCalled();
expect(mockListener).toBeCalledTimes(1); expect(mockListener).toBeCalledTimes(1);
@ -136,17 +161,17 @@ describe('skipSelf, unsubscribe, debounce', () => {
const mockListener = vi.fn(); const mockListener = vi.fn();
await pubSubManager.subscribeAll(mockListener); await pubSubManager.subscribeAll(mockListener);
await pubSubManager.publish('test1', 'message1'); await pubSubManager.publish('test1', 'message1');
expect(mockListener).not.toHaveBeenCalled();
});
test('subscribeAll + skipSelf: false', async () => {
const mockListener = vi.fn();
await pubSubManager.subscribeAll(mockListener, { skipSelf: false });
await pubSubManager.publish('test1', 'message1');
expect(mockListener).toHaveBeenCalled(); expect(mockListener).toHaveBeenCalled();
expect(mockListener).toBeCalledTimes(1); expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith('test1', 'message1'); expect(mockListener).toHaveBeenCalledWith('test1', 'message1');
}); });
test('publish + skipSelf: false', async () => {
const mockListener = vi.fn();
await pubSubManager.subscribeAll(mockListener);
await pubSubManager.publish('test1', 'message1', { skipSelf: true });
expect(mockListener).not.toHaveBeenCalled();
});
}); });
describe('Pub/Sub', () => { describe('Pub/Sub', () => {
@ -186,4 +211,46 @@ describe('Pub/Sub', () => {
expect(mockListener).toBeCalledTimes(1); expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith('message1'); expect(mockListener).toHaveBeenCalledWith('message1');
}); });
test('publish only self', async () => {
const mockListener = vi.fn();
await subscriber.subscribe('test1', mockListener);
await publisher.subscribe('test1', mockListener);
await publisher.publish('test1', 'message1', { onlySelf: true });
expect(mockListener).toHaveBeenCalled();
expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith('message1');
});
});
describe('app.pubSubManager', () => {
let app: MockServer;
let pubSubManager: PubSubManager;
beforeEach(async () => {
app = await createMockServer({
pubSubManager: {
basename: 'app1',
},
});
pubSubManager = app.pubSubManager;
});
afterEach(async () => {
await app.destroy();
});
test('adapter', async () => {
expect(pubSubManager.connected).toBe(true);
expect(pubSubManager.adapter).toBeInstanceOf(MemoryPubSubAdapter);
});
test('subscribe + publish', async () => {
const mockListener = vi.fn();
await pubSubManager.subscribe('test1', mockListener);
await pubSubManager.publish('test1', 'message1');
expect(mockListener).toHaveBeenCalled();
expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith('message1');
});
}); });

View File

@ -0,0 +1,45 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { Plugin } from '@nocobase/server';
import { createMultiMockServer } from '@nocobase/test';
describe('sync-message-manager', () => {
test('subscribe + publish', async () => {
const [node1, node2] = await createMultiMockServer({ basename: 'base1' });
const mockListener = vi.fn();
await node1.syncMessageManager.subscribe('test1', mockListener);
await node2.syncMessageManager.subscribe('test1', mockListener);
await node2.syncMessageManager.publish('test1', 'message1');
expect(mockListener).toHaveBeenCalled();
expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith('message1');
await node1.destroy();
await node2.destroy();
});
test('plugin.handleSyncMessage', async () => {
const mockListener = vi.fn();
class MyPlugin extends Plugin {
get name() {
return 'test1';
}
async handleSyncMessage(message) {
mockListener(message);
}
}
const [node1, node2] = await createMultiMockServer({ basename: 'base1', plugins: [MyPlugin] });
await node1.pm.get(MyPlugin).sendSyncMessage('message1');
expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith('message1');
await node2.pm.get(MyPlugin).sendSyncMessage('message2');
expect(mockListener).toBeCalledTimes(2);
expect(mockListener).toHaveBeenCalledWith('message2');
});
});

View File

@ -60,8 +60,9 @@ import { dataTemplate } from './middlewares/data-template';
import validateFilterParams from './middlewares/validate-filter-params'; import validateFilterParams from './middlewares/validate-filter-params';
import { Plugin } from './plugin'; import { Plugin } from './plugin';
import { InstallOptions, PluginManager } from './plugin-manager'; import { InstallOptions, PluginManager } from './plugin-manager';
import { PubSubManager } from './pub-sub-manager'; import { PubSubManager, PubSubManagerOptions } from './pub-sub-manager';
import { SyncManager } from './sync-manager'; import { SyncManager } from './sync-manager';
import { SyncMessageManager } from './sync-message-manager';
import packageJson from '../package.json'; import packageJson from '../package.json';
@ -99,7 +100,8 @@ export interface ApplicationOptions {
*/ */
resourcer?: ResourceManagerOptions; resourcer?: ResourceManagerOptions;
resourceManager?: ResourceManagerOptions; resourceManager?: ResourceManagerOptions;
pubSubManager?: any; pubSubManager?: PubSubManagerOptions;
syncMessageManager?: any;
bodyParser?: any; bodyParser?: any;
cors?: any; cors?: any;
dataWrapping?: boolean; dataWrapping?: boolean;
@ -230,6 +232,7 @@ export class Application<StateT = DefaultState, ContextT = DefaultContext> exten
*/ */
public syncManager: SyncManager; public syncManager: SyncManager;
public pubSubManager: PubSubManager; public pubSubManager: PubSubManager;
public syncMessageManager: SyncMessageManager;
public requestLogger: Logger; public requestLogger: Logger;
private sqlLogger: Logger; private sqlLogger: Logger;
protected _logger: SystemLogger; protected _logger: SystemLogger;
@ -1129,7 +1132,11 @@ export class Application<StateT = DefaultState, ContextT = DefaultContext> exten
this._cli = this.createCLI(); this._cli = this.createCLI();
this._i18n = createI18n(options); this._i18n = createI18n(options);
this.syncManager = new SyncManager(this); this.syncManager = new SyncManager(this);
this.pubSubManager = PubSubManager.create(this, options.pubSubManager); this.pubSubManager = PubSubManager.create(this, {
basename: this.name,
...options.pubSubManager,
});
this.syncMessageManager = new SyncMessageManager(this, options.syncMessageManager);
this.context.db = this.db; this.context.db = this.db;
/** /**

View File

@ -151,9 +151,9 @@ export abstract class Plugin<O = any> implements PluginInterface {
async handleSyncMessage(message) {} async handleSyncMessage(message) {}
async sendSyncMessage(message) { async sendSyncMessage(message) {
if (!this.name) { if (!this.name) {
return; throw new Error(`plugin name invalid`);
} }
await this.app.pubSubManager.publish(this.name, message); await this.app.syncMessageManager.publish(this.name, message);
} }
/** /**

View File

@ -16,8 +16,12 @@ export interface PubSubManagerOptions {
basename?: string; basename?: string;
} }
export interface PubSubManagerCallbackOptions { export interface PubSubManagerPublishOptions {
skipSelf?: boolean; skipSelf?: boolean;
onlySelf?: boolean;
}
export interface PubSubManagerSubscribeOptions {
debounce?: number; debounce?: number;
} }
@ -36,14 +40,6 @@ export class PubSubManager {
app.on('afterStop', async () => { app.on('afterStop', async () => {
await pubSubManager.close(); await pubSubManager.close();
}); });
app.on('beforeLoadPlugin', async (plugin) => {
if (!plugin.name) {
return;
}
await pubSubManager.subscribe(plugin.name, plugin.handleSyncMessage.bind(plugin), {
debounce: Number(process.env.PUB_SUB_DEFAULT_DEBOUNCE || 1000),
});
});
return pubSubManager; return pubSubManager;
} }
@ -85,11 +81,13 @@ export class PubSubManager {
return crypto.createHash('sha256').update(JSON.stringify(message)).digest('hex'); return crypto.createHash('sha256').update(JSON.stringify(message)).digest('hex');
} }
async subscribe(channel: string, callback, options: PubSubManagerCallbackOptions = {}) { async subscribe(channel: string, callback, options: PubSubManagerSubscribeOptions = {}) {
const { skipSelf = true, debounce = 0 } = options; const { debounce = 0 } = options;
const wrappedCallback = async (wrappedMessage) => { const wrappedCallback = async (wrappedMessage) => {
const { publisherId, message } = JSON.parse(wrappedMessage); const { onlySelf, skipSelf, publisherId, message } = JSON.parse(wrappedMessage);
if (skipSelf && publisherId === this.publisherId) { if (onlySelf && publisherId !== this.publisherId) {
return;
} else if (!onlySelf && skipSelf && publisherId === this.publisherId) {
return; return;
} }
if (!debounce) { if (!debounce) {
@ -131,37 +129,33 @@ export class PubSubManager {
return this.adapter.unsubscribe(`${this.basename}${channel}`, fn); return this.adapter.unsubscribe(`${this.basename}${channel}`, fn);
} }
async publish(channel, message) { async publish(channel, message, options?: PubSubManagerPublishOptions) {
if (!this.adapter) { if (!this.adapter) {
return; return;
} }
const wrappedMessage = JSON.stringify({ const wrappedMessage = JSON.stringify({
publisherId: this.publisherId, publisherId: this.publisherId,
...options,
message: message, message: message,
}); });
return this.adapter.publish(`${this.basename}${channel}`, wrappedMessage); return this.adapter.publish(`${this.basename}${channel}`, wrappedMessage);
} }
protected debounce(func, wait: number) { async subscribeAll(callback, options: PubSubManagerSubscribeOptions = {}) {
if (wait) {
return _.debounce(func, wait);
}
return func;
}
async subscribeAll(callback, options: PubSubManagerCallbackOptions = {}) {
if (!this.adapter) { if (!this.adapter) {
return; return;
} }
const { skipSelf = true, debounce = 0 } = options; const { debounce = 0 } = options;
return this.adapter.subscribeAll(async (channel: string, wrappedMessage) => { return this.adapter.subscribeAll(async (channel: string, wrappedMessage) => {
if (!channel.startsWith(this.basename)) { if (!channel.startsWith(this.basename)) {
return; return;
} }
const { publisherId, message } = JSON.parse(wrappedMessage); const { onlySelf, skipSelf, publisherId, message } = JSON.parse(wrappedMessage);
if (skipSelf && publisherId === this.publisherId) { if (onlySelf && publisherId !== this.publisherId) {
return;
} else if (!onlySelf && skipSelf && publisherId === this.publisherId) {
return; return;
} }
const realChannel = channel.substring(this.basename.length); const realChannel = channel.substring(this.basename.length);
@ -178,6 +172,13 @@ export class PubSubManager {
this.messageHanders.delete(messageHash); this.messageHanders.delete(messageHash);
}); });
} }
protected debounce(func, wait: number) {
if (wait) {
return _.debounce(func, wait);
}
return func;
}
} }
export interface IPubSubAdapter { export interface IPubSubAdapter {

View File

@ -0,0 +1,49 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import Application from './application';
import { PubSubManager, PubSubManagerPublishOptions } from './pub-sub-manager';
export class SyncMessageManager {
protected versionManager: SyncMessageVersionManager;
protected pubSubManager: PubSubManager;
constructor(
protected app: Application,
protected options: any = {},
) {
this.versionManager = new SyncMessageVersionManager();
app.on('beforeLoadPlugin', async (plugin) => {
if (!plugin.name) {
return;
}
await this.subscribe(plugin.name, plugin.handleSyncMessage.bind(plugin));
});
}
get debounce() {
return this.options.debounce || 1000;
}
async publish(channel: string, message, options?: PubSubManagerPublishOptions) {
await this.app.pubSubManager.publish(`${this.app.name}.sync.${channel}`, message, { skipSelf: true, ...options });
}
async subscribe(channel: string, callback) {
await this.app.pubSubManager.subscribe(`${this.app.name}.sync.${channel}`, callback, { debounce: this.debounce });
}
async sync() {
// TODO
}
}
export class SyncMessageVersionManager {
// TODO
}

View File

@ -25,17 +25,17 @@ export class MemoryPubSubAdapter implements IPubSubAdapter {
static instances = new Map<string, MemoryPubSubAdapter>(); static instances = new Map<string, MemoryPubSubAdapter>();
static create(name?: string) { static create(name?: string, options?: any) {
if (!name) { if (!name) {
name = uid(); name = uid();
} }
if (!this.instances.has(name)) { if (!this.instances.has(name)) {
this.instances.set(name, new MemoryPubSubAdapter()); this.instances.set(name, new MemoryPubSubAdapter(options));
} }
return this.instances.get(name); return this.instances.get(name);
} }
constructor() { constructor(protected options: any = {}) {
this.emitter = new TestEventEmitter(); this.emitter = new TestEventEmitter();
} }
@ -56,12 +56,16 @@ export class MemoryPubSubAdapter implements IPubSubAdapter {
} }
async publish(channel, message) { async publish(channel, message) {
console.log(this.connected, { channel, message });
if (!this.connected) { if (!this.connected) {
return; return;
} }
await this.emitter.emitAsync(channel, message); await this.emitter.emitAsync(channel, message);
await this.emitter.emitAsync('__publish__', channel, message); await this.emitter.emitAsync('__publish__', channel, message);
await sleep(Number(process.env.PUB_SUB_DEFAULT_DEBOUNCE || 1000)); // 用于处理延迟问题
if (this.options.debounce) {
await sleep(Number(this.options.debounce));
}
} }
async subscribeAll(callback) { async subscribeAll(callback) {

View File

@ -10,6 +10,7 @@
import { mockDatabase } from '@nocobase/database'; import { mockDatabase } from '@nocobase/database';
import { Application, ApplicationOptions, AppSupervisor, Gateway, PluginManager } from '@nocobase/server'; import { Application, ApplicationOptions, AppSupervisor, Gateway, PluginManager } from '@nocobase/server';
import jwt from 'jsonwebtoken'; import jwt from 'jsonwebtoken';
import _ from 'lodash';
import qs from 'qs'; import qs from 'qs';
import supertest, { SuperAgentTest } from 'supertest'; import supertest, { SuperAgentTest } from 'supertest';
import { MemoryPubSubAdapter } from './memory-pub-sub-adapter'; import { MemoryPubSubAdapter } from './memory-pub-sub-adapter';
@ -231,12 +232,19 @@ export function mockServer(options: ApplicationOptions = {}) {
const app = new MockServer({ const app = new MockServer({
acl: false, acl: false,
syncMessageManager: {
debounce: 1000,
},
...options, ...options,
}); });
if (options.pubSubManager) { const basename = app.options.pubSubManager?.basename || app.name;
app.pubSubManager.setAdapter(MemoryPubSubAdapter.create(options.pubSubManager?.basename));
} app.pubSubManager.setAdapter(
MemoryPubSubAdapter.create(basename, {
debounce: 1000,
}),
);
return app; return app;
} }
@ -249,6 +257,30 @@ export async function startMockServer(options: ApplicationOptions = {}) {
type BeforeInstallFn = (app) => Promise<void>; type BeforeInstallFn = (app) => Promise<void>;
export async function createMultiMockServer(
options: ApplicationOptions & {
number?: number;
version?: string;
basename?: string;
beforeInstall?: BeforeInstallFn;
skipInstall?: boolean;
skipStart?: boolean;
} = {},
) {
const instances: MockServer[] = [];
for (const i of _.range(0, options.number || 2)) {
const app: MockServer = await createMockServer({
...options,
skipSupervisor: true,
pubSubManager: {
basename: options.basename,
},
});
instances.push(app);
}
return instances;
}
export async function createMockServer( export async function createMockServer(
options: ApplicationOptions & { options: ApplicationOptions & {
version?: string; version?: string;