diff --git a/.env.example b/.env.example index 3701158c1d..113eff9814 100644 --- a/.env.example +++ b/.env.example @@ -35,6 +35,14 @@ LOGGER_FORMAT= # The cluster mode will not work in development mode either. CLUSTER_MODE= +# Set server running mode, which determines how the server processes requests and background jobs. +# - ``: run as a standalone server, process all requests and background jobs. +# - `!`: run as a master server, only process user requests from UI (http), but not background jobs. +# - `*`: run as a worker server, only process background jobs. +# - `,`: run as a worker server, only process background jobs for the specified topics. +# - `!,`: run as mixed mode, process user requests from UI (http) and background jobs for the specified topics. +WORKER_MODE= + ################# DATABASE ################# # postgres | mysql | mariadb | sqlite diff --git a/packages/core/server/src/application.ts b/packages/core/server/src/application.ts index 734ad2e4eb..211d14a616 100644 --- a/packages/core/server/src/application.ts +++ b/packages/core/server/src/application.ts @@ -454,6 +454,32 @@ export class Application exten return this._aesEncryptor; } + /** + * Check if the application is serving as a specific worker. + * @experimental + */ + public serving(key?: string): boolean { + const { WORKER_MODE = '' } = process.env; + if (!WORKER_MODE) { + return true; + } + const topics = WORKER_MODE.trim().split(','); + if (key) { + if (WORKER_MODE === '*') { + return true; + } + if (topics.includes(key)) { + return true; + } + return false; + } else { + if (topics.includes('!')) { + return true; + } + return false; + } + } + /** * @internal */ diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts index 2f270e2515..dcada2ac8a 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts @@ -56,6 +56,8 @@ export type EventOptions = { type CachedEvent = [WorkflowModel, any, EventOptions]; +const WORKER_MODE_WORKFLOW_PROCESS = 'workflow:process'; + export default class PluginWorkflowServer extends Plugin { instructions: Registry = new Registry(); triggers: Registry = new Registry(); @@ -64,7 +66,7 @@ export default class PluginWorkflowServer extends Plugin { snowflake: Snowflake; private ready = false; - private executing: Promise | null = null; + private executing: Promise | null = null; private pending: Pending[] = []; private events: CachedEvent[] = []; private eventsCount = 0; @@ -79,13 +81,15 @@ export default class PluginWorkflowServer extends Plugin { filterByTk: event.executionId, }); if (!execution || execution.status !== EXECUTION_STATUS.QUEUEING) { + this.getLogger('dispatcher').info( + `execution (${event.executionId}) from queue not found or not in queueing status, skip`, + ); return; } this.getLogger(execution.workflowId).info( `execution (${execution.id}) received from queue, adding to pending list`, ); - this.pending.push([execution]); - this.dispatch(); + this.run(execution); }; private onBeforeSave = async (instance: WorkflowModel, { transaction, cycling }) => { @@ -345,10 +349,12 @@ export default class PluginWorkflowServer extends Plugin { custom_epoch: pluginRecord?.createdAt.getTime(), }); - this.app.backgroundJobManager.subscribe(`${this.name}.pendingExecution`, { - idle: () => !this.executing && !this.pending.length && !this.events.length, - process: this.onQueueExecution, - }); + if (this.app.serving(WORKER_MODE_WORKFLOW_PROCESS)) { + this.app.backgroundJobManager.subscribe(`${this.name}.pendingExecution`, { + idle: () => !this.executing && !this.pending.length && !this.events.length, + process: this.onQueueExecution, + }); + } } /** @@ -522,20 +528,32 @@ export default class PluginWorkflowServer extends Plugin { return null; } - public async resume(job) { - if (!job.execution) { - job.execution = await job.getExecution(); - } - this.getLogger(job.execution.workflowId).info( - `execution (${job.execution.id}) resuming from job (${job.id}) added to pending list`, - ); - this.pending.push([job.execution, job]); - if (this.executing) { + async run(execution: ExecutionModel, job?: JobModel): Promise { + while (this.executing) { await this.executing; } + + this.executing = this.process(execution, job); + + await this.executing; + + this.executing = null; + this.dispatch(); } + public async resume(job) { + let { execution } = job; + if (!execution) { + execution = await job.getExecution(); + } + this.getLogger(execution.workflowId).info( + `execution (${execution.id}) resuming from job (${job.id}) added to pending list`, + ); + + this.run(execution, job); + } + /** * Start a deferred execution * @experimental @@ -545,11 +563,8 @@ export default class PluginWorkflowServer extends Plugin { return; } this.getLogger(execution.workflowId).info(`starting deferred execution (${execution.id})`); - this.pending.push([execution]); - if (this.executing) { - await this.executing; - } - this.dispatch(); + + this.run(execution); } private async validateEvent(workflow: WorkflowModel, context: any, options: EventOptions) { @@ -692,6 +707,13 @@ export default class PluginWorkflowServer extends Plugin { return; } + if (!this.app.serving(WORKER_MODE_WORKFLOW_PROCESS)) { + this.getLogger('dispatcher').warn( + `${WORKER_MODE_WORKFLOW_PROCESS} is not serving, new dispatching will be ignored`, + ); + return; + } + if (this.executing) { this.getLogger('dispatcher').warn(`workflow executing is not finished, new dispatching will be ignored`); return; @@ -717,7 +739,9 @@ export default class PluginWorkflowServer extends Plugin { async (transaction) => { const execution = (await this.db.getRepository('executions').findOne({ filter: { - status: EXECUTION_STATUS.QUEUEING, + status: { + [Op.is]: EXECUTION_STATUS.QUEUEING, + }, 'workflow.enabled': true, }, sort: 'id', @@ -763,7 +787,7 @@ export default class PluginWorkflowServer extends Plugin { if (execution.status === EXECUTION_STATUS.QUEUEING) { const transaction = await this.useDataSourceTransaction('main', options.transaction); await execution.update({ status: EXECUTION_STATUS.STARTED }, { transaction }); - logger.info(`queueing execution (${execution.id}) from pending list updated to started`); + logger.info(`execution (${execution.id}) from pending list updated to started`); } const processor = this.createProcessor(execution, options);