diff --git a/packages/core/server/src/plugin.ts b/packages/core/server/src/plugin.ts index 73db78c2ea..aced18f95a 100644 --- a/packages/core/server/src/plugin.ts +++ b/packages/core/server/src/plugin.ts @@ -138,7 +138,15 @@ export abstract class Plugin implements PluginInterface { * Fired when a sync message is received. * @experimental */ - onSync(message: SyncMessageData): Promise | void {} + onSync(message: SyncMessageData = {}): Promise | void {} + + /** + * Publish a sync message. + * @experimental + */ + sync(message?: SyncMessageData) { + this.app.syncManager.publish(this.name, message); + } /** * @deprecated diff --git a/packages/core/server/src/sync-manager.ts b/packages/core/server/src/sync-manager.ts index b01fc3c3c1..faace9d7a6 100644 --- a/packages/core/server/src/sync-manager.ts +++ b/packages/core/server/src/sync-manager.ts @@ -32,7 +32,6 @@ export type SyncMessage = { */ export class SyncManager { private nodeId: string; - private app: Application; private eventEmitter = new EventEmitter(); private adapter: SyncAdapter = null; private incomingBuffer: SyncMessageData[] = []; @@ -82,8 +81,7 @@ export class SyncManager { } }; - constructor(app: Application) { - this.app = app; + constructor(private app: Application) { this.nodeId = `${process.env.NODE_ID || randomUUID()}-${process.pid}`; } @@ -112,7 +110,7 @@ export class SyncManager { /** * Publish a message to the sync manager */ - public publish(namespace: string, data: SyncMessageData) { + public publish(namespace: string, data: SyncMessageData = {}) { if (!this.adapter) { return; } diff --git a/packages/core/test/src/server/index.ts b/packages/core/test/src/server/index.ts index 71bf32dac7..8b0cb5a86c 100644 --- a/packages/core/test/src/server/index.ts +++ b/packages/core/test/src/server/index.ts @@ -12,7 +12,8 @@ import ws from 'ws'; export { mockDatabase, MockDatabase } from '@nocobase/database'; export { default as supertest } from 'supertest'; -export * from './mockServer'; +export * from './mock-server'; +export * from './mock-cluster'; export const pgOnly: () => any = () => (process.env.DB_DIALECT == 'postgres' ? describe : describe.skip); export const isPg = () => process.env.DB_DIALECT == 'postgres'; @@ -22,9 +23,13 @@ export function randomStr() { return Math.random().toString(36).substring(2); } -export const waitSecond = async (timeout = 1000) => { - await new Promise((resolve) => setTimeout(resolve, timeout)); -}; +export function sleep(ms = 1000) { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} + +export const waitSecond = sleep; export const startServerWithRandomPort = async (startServer) => { return await new Promise((resolve) => { diff --git a/packages/core/test/src/server/mock-cluster.ts b/packages/core/test/src/server/mock-cluster.ts new file mode 100644 index 0000000000..c7d5773350 --- /dev/null +++ b/packages/core/test/src/server/mock-cluster.ts @@ -0,0 +1,91 @@ +/** + * This file is part of the NocoBase (R) project. + * Copyright (c) 2020-2024 NocoBase Co., Ltd. + * Authors: NocoBase Team. + * + * This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License. + * For more information, please refer to: https://www.nocobase.com/agreement. + */ + +import Path from 'node:path'; +import { spawn, ChildProcess } from 'node:child_process'; + +import { getPortPromise } from 'portfinder'; +import { uid } from '@nocobase/utils'; +import { createMockServer } from './mock-server'; + +type ProcessConfig = { + env: Record; +}; + +type ClusterOptions = { + script: string; + config: ProcessConfig; + instances: number; + plugins?: string[]; +}; + +export class MockCluster { + private processes = []; + private mockApp; + + constructor(private options: ClusterOptions) {} + + async start(): Promise { + // NOTE: use this for install app first + this.mockApp = await createMockServer({ + plugins: this.options.plugins, + }); + + this.processes = []; + const ports = []; + + for (let i = 0; i < this.options.instances; i++) { + const port = await getPortPromise(); + const childProcess = spawn('node', ['./node_modules/tsx/dist/cli.mjs', this.options.script, 'start'], { + env: { + ...process.env, + ...this.options.config.env, + APP_PORT: `${port}`, + APPEND_PRESET_BUILT_IN_PLUGINS: (this.options.plugins ?? []).join(','), + SOCKET_PATH: `storage/tests/gateway-cluster-${uid()}.sock`, + PM2_HOME: Path.resolve(process.cwd(), `storage/tests/.pm2-${uid()}`), + }, + }); + + await new Promise((resolve, reject) => { + const startTimer = setTimeout(() => reject(new Error('app not started in 10s')), 10000); + childProcess.stdout.on('data', (data) => { + console.log(data.toString()); + if (data.toString().includes('app has been started')) { + clearTimeout(startTimer); + resolve(childProcess); + } + }); + }); + + this.processes.push({ + childProcess, + port, + }); + + ports.push(port); + } + + return ports; + } + + async stop() { + await this.mockApp.destroy(); + + return Promise.all( + this.processes.map(({ childProcess }) => { + const promise = new Promise((resolve) => { + childProcess.on('exit', resolve); + }); + childProcess.kill(); + return promise; + }), + ); + } +} diff --git a/packages/core/test/src/server/mockServer.ts b/packages/core/test/src/server/mock-server.ts similarity index 100% rename from packages/core/test/src/server/mockServer.ts rename to packages/core/test/src/server/mock-server.ts diff --git a/packages/plugins/@nocobase/plugin-file-manager/src/server/server.ts b/packages/plugins/@nocobase/plugin-file-manager/src/server/server.ts index 297a9655b1..293d9bb33b 100644 --- a/packages/plugins/@nocobase/plugin-file-manager/src/server/server.ts +++ b/packages/plugins/@nocobase/plugin-file-manager/src/server/server.ts @@ -106,14 +106,14 @@ export default class PluginFileManagerServer extends Plugin { const Storage = this.db.getModel('storages'); Storage.afterSave((m) => { this.storagesCache.set(m.id, m.toJSON()); - this.app.syncManager.publish(this.name, { + this.sync({ type: 'storageChange', storageId: `${m.id}`, }); }); Storage.afterDestroy((m) => { this.storagesCache.delete(m.id); - this.app.syncManager.publish(this.name, { + this.sync({ type: 'storageRemove', storageId: `${m.id}`, }); diff --git a/packages/plugins/@nocobase/plugin-workflow-test/src/server/index.ts b/packages/plugins/@nocobase/plugin-workflow-test/src/server/index.ts index e292352e98..71fcd04fba 100644 --- a/packages/plugins/@nocobase/plugin-workflow-test/src/server/index.ts +++ b/packages/plugins/@nocobase/plugin-workflow-test/src/server/index.ts @@ -15,7 +15,6 @@ import { MockServer, createMockServer, mockDatabase } from '@nocobase/test'; import functions from './functions'; import triggers from './triggers'; import instructions from './instructions'; -import { Resourcer } from '@nocobase/resourcer'; import { SequelizeDataSource } from '@nocobase/data-source-manager'; import { uid } from '@nocobase/utils'; diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts index 0ea6276e8b..4de893582a 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts @@ -342,7 +342,7 @@ export default class PluginWorkflowServer extends Plugin { this.enabledCache.delete(workflow.id); } if (!silent) { - this.app.syncManager.publish(this.name, { + this.sync({ type: 'statusChange', workflowId: `${workflow.id}`, enabled: `${Number(next)}`,