From 5bcaa9d11f4f76d42c0b02dee4121d29c5efd9bf Mon Sep 17 00:00:00 2001 From: Junyi Date: Sun, 21 Apr 2024 14:23:13 +0800 Subject: [PATCH] feat(plugin-workflow): add checker for intervally dispatching (#4119) --- .../plugin-workflow/src/server/Plugin.ts | 63 +++++++++++-------- 1 file changed, 37 insertions(+), 26 deletions(-) diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts index e88e90dfbe..9b4cde69c2 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts @@ -45,6 +45,7 @@ export default class PluginWorkflowServer extends Plugin { private loggerCache: LRUCache; private meter = null; + private checker: NodeJS.Timeout = null; private onBeforeSave = async (instance: WorkflowModel, options) => { const Model = instance.constructor; @@ -245,6 +246,10 @@ export default class PluginWorkflowServer extends Plugin { workflows.forEach((workflow: WorkflowModel) => { this.toggle(workflow); }); + + this.checker = setInterval(() => { + this.dispatch(); + }, 300_000); }); this.app.on('afterStart', () => { @@ -271,6 +276,10 @@ export default class PluginWorkflowServer extends Plugin { if (this.executing) { await this.executing; } + + if (this.checker) { + clearInterval(this.checker); + } }); } @@ -466,35 +475,37 @@ export default class PluginWorkflowServer extends Plugin { this.executing = (async () => { let next: Pending | null = null; - // resuming has high priority - if (this.pending.length) { - next = this.pending.shift() as Pending; - this.getLogger(next[0].workflowId).info(`pending execution (${next[0].id}) ready to process`); - } else { - const execution = (await this.db.getRepository('executions').findOne({ - filter: { - status: EXECUTION_STATUS.QUEUEING, - 'workflow.enabled': true, - 'workflow.id': { - [Op.not]: null, + try { + // resuming has high priority + if (this.pending.length) { + next = this.pending.shift() as Pending; + this.getLogger(next[0].workflowId).info(`pending execution (${next[0].id}) ready to process`); + } else { + const execution = (await this.db.getRepository('executions').findOne({ + filter: { + status: EXECUTION_STATUS.QUEUEING, + 'workflow.enabled': true, + 'workflow.id': { + [Op.not]: null, + }, }, - }, - appends: ['workflow'], - sort: 'createdAt', - })) as ExecutionModel; - if (execution) { - this.getLogger(execution.workflowId).info(`execution (${execution.id}) fetched from db`); - next = [execution]; + appends: ['workflow'], + sort: 'id', + })) as ExecutionModel; + if (execution) { + this.getLogger(execution.workflowId).info(`execution (${execution.id}) fetched from db`); + next = [execution]; + } } - } - if (next) { - await this.process(...next); - } + if (next) { + await this.process(...next); + } + } finally { + this.executing = null; - this.executing = null; - - if (next) { - this.dispatch(); + if (next) { + this.dispatch(); + } } })(); }