diff --git a/.env.example b/.env.example index c6f7c0477b..2a199a7926 100644 --- a/.env.example +++ b/.env.example @@ -29,6 +29,12 @@ LOGGER_MAX_SIZE= # console | json | logfmt | delimiter LOGGER_FORMAT= +# Start application in cluster mode when the value is set (same as pm2 -i ). +# Cluster mode will only work properly when plugins related to distributed architecture are enabled. +# Otherwise, the application's functionality may encounter unexpected issues. +# The cluster mode will not work in development mode either. +CLUSTER_MODE= + ################# DATABASE ################# # postgres | msysql | mariadb | sqlite diff --git a/packages/core/cli/src/commands/start.js b/packages/core/cli/src/commands/start.js index 4d2ba005cb..2437ea3dfb 100644 --- a/packages/core/cli/src/commands/start.js +++ b/packages/core/cli/src/commands/start.js @@ -33,6 +33,7 @@ module.exports = (cli) => { .command('start') .option('-p, --port [port]') .option('-d, --daemon') + .option('-i, --instances [instances]') .option('--db-sync') .option('--quickstart') .allowUnknownOption() @@ -61,13 +62,16 @@ module.exports = (cli) => { } await postCheck(opts); deleteSockFiles(); + const instances = opts.instances || process.env.CLUSTER_MODE; + const instancesArgs = instances ? ['-i', instances] : []; if (opts.daemon) { - run('pm2', ['start', `${APP_PACKAGE_ROOT}/lib/index.js`, '--', ...process.argv.slice(2)]); + run('pm2', ['start', ...instancesArgs, `${APP_PACKAGE_ROOT}/lib/index.js`, '--', ...process.argv.slice(2)]); } else { run( 'pm2-runtime', [ 'start', + ...instancesArgs, `${APP_PACKAGE_ROOT}/lib/index.js`, NODE_ARGS ? `--node-args="${NODE_ARGS}"` : undefined, '--', diff --git a/packages/core/server/src/plugin.ts b/packages/core/server/src/plugin.ts index bca22be34a..73db78c2ea 100644 --- a/packages/core/server/src/plugin.ts +++ b/packages/core/server/src/plugin.ts @@ -18,6 +18,7 @@ import { resolve } from 'path'; import { Application } from './application'; import { InstallOptions, getExposeChangelogUrl, getExposeReadmeUrl } from './plugin-manager'; import { checkAndGetCompatible, getPluginBasePath } from './plugin-manager/utils'; +import { SyncMessageData } from './sync-manager'; export interface PluginInterface { beforeLoad?: () => void; @@ -133,6 +134,12 @@ export abstract class Plugin implements PluginInterface { async afterRemove() {} + /** + * Fired when a sync message is received. + * @experimental + */ + onSync(message: SyncMessageData): Promise | void {} + /** * @deprecated */ diff --git a/packages/core/server/src/sync-manager.ts b/packages/core/server/src/sync-manager.ts index af128fa786..b01fc3c3c1 100644 --- a/packages/core/server/src/sync-manager.ts +++ b/packages/core/server/src/sync-manager.ts @@ -34,7 +34,7 @@ export class SyncManager { private nodeId: string; private app: Application; private eventEmitter = new EventEmitter(); - private adapter = null; + private adapter: SyncAdapter = null; private incomingBuffer: SyncMessageData[] = []; private outgoingBuffer: [string, SyncMessageData][] = []; private flushTimer: NodeJS.Timeout = null; @@ -43,12 +43,21 @@ export class SyncManager { return this.adapter ? this.adapter.ready : false; } + private onMessage(namespace, message) { + this.app.logger.info(`emit sync event in namespace ${namespace}`); + this.eventEmitter.emit(namespace, message); + const pluginInstance = this.app.pm.get(namespace); + pluginInstance.onSync(message); + } + private onSync = (messages: SyncMessage[]) => { - this.app.logger.info('sync messages received into buffer:', messages); + this.app.logger.info('sync messages received, save into buffer:', messages); + if (this.flushTimer) { clearTimeout(this.flushTimer); this.flushTimer = null; } + this.incomingBuffer = uniqWith( this.incomingBuffer.concat( messages @@ -60,9 +69,9 @@ export class SyncManager { this.flushTimer = setTimeout(() => { this.incomingBuffer.forEach(({ namespace, ...message }) => { - this.app.logger.info(`emit sync event in namespace ${namespace}`); - this.eventEmitter.emit(namespace, message); + this.onMessage(namespace, message); }); + this.incomingBuffer = []; }, 1000); }; @@ -82,9 +91,11 @@ export class SyncManager { if (this.adapter) { throw new Error('sync adapter is already exists'); } + if (!adapter) { return; } + this.adapter = adapter; this.adapter.on('message', this.onSync); this.adapter.on('ready', this.onReady); diff --git a/packages/plugins/@nocobase/plugin-data-source-main/src/server/server.ts b/packages/plugins/@nocobase/plugin-data-source-main/src/server/server.ts index 35806d135f..ab3a0f6d17 100644 --- a/packages/plugins/@nocobase/plugin-data-source-main/src/server/server.ts +++ b/packages/plugins/@nocobase/plugin-data-source-main/src/server/server.ts @@ -40,6 +40,19 @@ export class PluginDataSourceMainServer extends Plugin { this.loadFilter = filter; } + async onSync(message) { + const { type, collectionName } = message; + if (type === 'newCollection') { + const collectionModel: CollectionModel = await this.app.db.getCollection('collections').repository.findOne({ + filter: { + name: collectionName, + }, + }); + + await collectionModel.load(); + } + } + async beforeLoad() { if (this.app.db.inDialect('postgres')) { this.schema = process.env.COLLECTION_MANAGER_SCHEMA || this.db.options.schema || 'public'; @@ -77,6 +90,11 @@ export class PluginDataSourceMainServer extends Plugin { await model.migrate({ transaction, }); + + this.app.syncManager.publish(this.name, { + type: 'newCollection', + collectionName: model.get('name'), + }); } }, ); diff --git a/packages/plugins/@nocobase/plugin-file-manager/src/server/server.ts b/packages/plugins/@nocobase/plugin-file-manager/src/server/server.ts index 9c8fe09404..297a9655b1 100644 --- a/packages/plugins/@nocobase/plugin-file-manager/src/server/server.ts +++ b/packages/plugins/@nocobase/plugin-file-manager/src/server/server.ts @@ -66,7 +66,7 @@ export default class PluginFileManagerServer extends Plugin { } } - private onSync = async (message) => { + async onSync(message) { if (message.type === 'storageChange') { const storage = await this.db.getRepository('storages').findOne({ filterByTk: message.storageId, @@ -79,7 +79,7 @@ export default class PluginFileManagerServer extends Plugin { const id = Number.parseInt(message.storageId, 10); this.storagesCache.delete(id); } - }; + } async beforeLoad() { this.db.registerModels({ FileModel }); @@ -90,8 +90,6 @@ export default class PluginFileManagerServer extends Plugin { }); this.app.on('afterStart', async () => { await this.loadStorages(); - - this.app.syncManager.subscribe(this.name, this.onSync); }); } diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts index fa96050412..0ea6276e8b 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts @@ -110,7 +110,7 @@ export default class PluginWorkflowServer extends Plugin { } }; - private onSync = async (message) => { + async onSync(message) { if (message.type === 'statusChange') { const workflowId = Number.parseInt(message.workflowId, 10); const enabled = Number.parseInt(message.enabled, 10); @@ -133,7 +133,7 @@ export default class PluginWorkflowServer extends Plugin { } } } - }; + } /** * @experimental @@ -284,7 +284,6 @@ export default class PluginWorkflowServer extends Plugin { this.app.on('afterStart', async () => { this.app.setMaintainingMessage('check for not started executions'); this.ready = true; - this.app.syncManager.subscribe(this.name, this.onSync); const collection = db.getCollection('workflows'); const workflows = await collection.repository.find({ @@ -308,8 +307,6 @@ export default class PluginWorkflowServer extends Plugin { this.toggle(workflow, false); } - this.app.syncManager.unsubscribe('workflow', this.onSync); - this.ready = false; if (this.events.length) { await this.prepare();