From e65ff559c290dcec37452db0c0ec48391b6af4e2 Mon Sep 17 00:00:00 2001 From: Junyi Date: Fri, 11 Apr 2025 08:46:14 +0800 Subject: [PATCH] fix(plugin-workflow): fix loading enabled workflows list with stats (#6642) * fix(plugin-workflow): fix loading enabled workflows list with stats * fix(plugin-workflow): add fallback logic to make sure workflow has stats --- .../plugin-workflow/src/server/Plugin.ts | 173 ++++++++++-------- .../src/server/__tests__/Plugin.test.ts | 9 + 2 files changed, 105 insertions(+), 77 deletions(-) diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts index f15008b422..698e2e7389 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts @@ -113,6 +113,97 @@ export default class PluginWorkflowServer extends Plugin { } }; + private onAfterCreate = async (model: WorkflowModel, { transaction }) => { + const WorkflowStatsModel = this.db.getModel('workflowStats'); + const [stats, created] = await WorkflowStatsModel.findOrCreate({ + where: { key: model.key }, + defaults: { key: model.key }, + transaction, + }); + model.stats = stats; + model.versionStats = await model.createVersionStats({ id: model.id }, { transaction }); + if (model.enabled) { + this.toggle(model, true, { transaction }); + } + }; + + private onAfterUpdate = async (model: WorkflowModel, { transaction }) => { + model.stats = await model.getStats({ transaction }); + model.versionStats = await model.getVersionStats({ transaction }); + this.toggle(model, model.enabled, { transaction }); + }; + + private onAfterDestroy = async (model: WorkflowModel, { transaction }) => { + this.toggle(model, false, { transaction }); + + const TaskRepo = this.db.getRepository('workflowTasks'); + await TaskRepo.destroy({ + filter: { + workflowId: model.id, + }, + transaction, + }); + }; + + // [Life Cycle]: + // * load all workflows in db + // * add all hooks for enabled workflows + // * add hooks for create/update[enabled]/delete workflow to add/remove specific hooks + private onAfterStart = async () => { + this.ready = true; + + const collection = this.db.getCollection('workflows'); + const workflows = await collection.repository.find({ + filter: { enabled: true }, + appends: ['stats', 'versionStats'], + }); + + for (const workflow of workflows) { + // NOTE: workflow stats may not be created in migration (for compatibility) + if (!workflow.stats) { + workflow.stats = await workflow.createStats({ executed: 0 }); + } + // NOTE: workflow stats may not be created in migration (for compatibility) + if (!workflow.versionStats) { + workflow.versionStats = await workflow.createVersionStats({ executed: 0 }); + } + + this.toggle(workflow, true, { silent: true }); + } + + this.checker = setInterval(() => { + this.getLogger('dispatcher').info(`(cycling) check for queueing executions`); + this.dispatch(); + }, 300_000); + + this.app.on('workflow:dispatch', () => { + this.app.logger.info('workflow:dispatch'); + this.dispatch(); + }); + + // check for queueing executions + this.getLogger('dispatcher').info('(starting) check for queueing executions'); + this.dispatch(); + }; + + private onBeforeStop = async () => { + for (const workflow of this.enabledCache.values()) { + this.toggle(workflow, false, { silent: true }); + } + + this.ready = false; + if (this.events.length) { + await this.prepare(); + } + if (this.executing) { + await this.executing; + } + + if (this.checker) { + clearInterval(this.checker); + } + }; + async handleSyncMessage(message) { if (message.type === 'statusChange') { if (message.enabled) { @@ -289,84 +380,12 @@ export default class PluginWorkflowServer extends Plugin { }); db.on('workflows.beforeSave', this.onBeforeSave); - db.on('workflows.afterCreate', async (model: WorkflowModel, { transaction }) => { - const WorkflowStatsModel = this.db.getModel('workflowStats'); - const [stats, created] = await WorkflowStatsModel.findOrCreate({ - where: { key: model.key }, - defaults: { key: model.key }, - transaction, - }); - model.stats = stats; - model.versionStats = await model.createVersionStats({ id: model.id }, { transaction }); - if (model.enabled) { - this.toggle(model, true, { transaction }); - } - }); - db.on('workflows.afterUpdate', async (model: WorkflowModel, { transaction }) => { - model.stats = await model.getStats({ transaction }); - model.versionStats = await model.getVersionStats({ transaction }); - this.toggle(model, model.enabled, { transaction }); - }); - db.on('workflows.afterDestroy', async (model: WorkflowModel, { transaction }) => { - this.toggle(model, false, { transaction }); + db.on('workflows.afterCreate', this.onAfterCreate); + db.on('workflows.afterUpdate', this.onAfterUpdate); + db.on('workflows.afterDestroy', this.onAfterDestroy); - const TaskRepo = this.db.getRepository('workflowTasks'); - await TaskRepo.destroy({ - filter: { - workflowId: model.id, - }, - transaction, - }); - }); - - // [Life Cycle]: - // * load all workflows in db - // * add all hooks for enabled workflows - // * add hooks for create/update[enabled]/delete workflow to add/remove specific hooks - this.app.on('afterStart', async () => { - this.ready = true; - - const collection = db.getCollection('workflows'); - const workflows = await collection.repository.find({ - filter: { enabled: true }, - }); - - workflows.forEach((workflow: WorkflowModel) => { - this.toggle(workflow, true, { silent: true }); - }); - - this.checker = setInterval(() => { - this.getLogger('dispatcher').info(`(cycling) check for queueing executions`); - this.dispatch(); - }, 300_000); - - this.app.on('workflow:dispatch', () => { - this.app.logger.info('workflow:dispatch'); - this.dispatch(); - }); - - // check for queueing executions - this.getLogger('dispatcher').info('(starting) check for queueing executions'); - this.dispatch(); - }); - - this.app.on('beforeStop', async () => { - for (const workflow of this.enabledCache.values()) { - this.toggle(workflow, false, { silent: true }); - } - - this.ready = false; - if (this.events.length) { - await this.prepare(); - } - if (this.executing) { - await this.executing; - } - - if (this.checker) { - clearInterval(this.checker); - } - }); + this.app.on('afterStart', this.onAfterStart); + this.app.on('beforeStop', this.onBeforeStop); } private toggle( diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Plugin.test.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Plugin.test.ts index acc0344163..44ffe8c985 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Plugin.test.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Plugin.test.ts @@ -43,6 +43,11 @@ describe('workflow > Plugin', () => { }); expect(workflow.current).toBe(true); + + expect(workflow.stats).toBeDefined(); + expect(workflow.stats.executed).toBe(0); + expect(workflow.versionStats).toBeDefined(); + expect(workflow.versionStats.executed).toBe(0); }); it('create with disabled', async () => { @@ -358,6 +363,10 @@ describe('workflow > Plugin', () => { await sleep(500); + const w1_1 = plugin.enabledCache.get(w1.id); + expect(w1_1.stats).toBeDefined(); + expect(w1_1.stats.executed).toBe(0); + await e1.reload(); expect(e1.status).toBe(EXECUTION_STATUS.RESOLVED);