From 86a56ef7cc1b4dcbfb7cc38fa547d5f0974ff23b Mon Sep 17 00:00:00 2001 From: Junyi Date: Fri, 28 Jun 2024 11:59:22 +0800 Subject: [PATCH] refactor(plugin-workflow): add deferred option for async workflow triggering (#4772) * feat(plugin-workflow): add deferred option for async workflow triggering * fix(plugin-workflow): revert type back * fix(plugin-workflow): fix status checking --- .../plugin-workflow/src/server/Plugin.ts | 21 +++++++-- .../src/server/__tests__/Plugin.test.ts | 46 +++++++++++++++++-- 2 files changed, 59 insertions(+), 8 deletions(-) diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts index 4adc4eaeaf..98678c718b 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts @@ -42,6 +42,7 @@ type Pending = [ExecutionModel, JobModel?]; type EventOptions = { eventKey?: string; context?: any; + deferred?: boolean; [key: string]: any; } & Transactionable; @@ -355,7 +356,7 @@ export default class PluginWorkflowServer extends Plugin { private async triggerSync( workflow: WorkflowModel, context: object, - options: EventOptions = {}, + { deferred, ...options }: EventOptions = {}, ): Promise { let execution; try { @@ -384,6 +385,18 @@ export default class PluginWorkflowServer extends Plugin { this.dispatch(); } + /** + * Start a deferred execution + * @experimental + */ + public start(execution: ExecutionModel) { + if (execution.status !== EXECUTION_STATUS.STARTED) { + return; + } + this.pending.push([execution]); + this.dispatch(); + } + public createProcessor(execution: ExecutionModel, options = {}): Processor { return new Processor(execution, { ...options, plugin: this }); } @@ -393,7 +406,7 @@ export default class PluginWorkflowServer extends Plugin { context, options: EventOptions, ): Promise { - const { transaction = await this.db.sequelize.transaction() } = options; + const { transaction = await this.db.sequelize.transaction(), deferred } = options; const trigger = this.triggers.get(workflow.type); const valid = await trigger.validateEvent(workflow, context, { ...options, transaction }); if (!valid) { @@ -410,7 +423,7 @@ export default class PluginWorkflowServer extends Plugin { context, key: workflow.key, eventKey: options.eventKey ?? randomUUID(), - status: EXECUTION_STATUS.QUEUEING, + status: deferred ? EXECUTION_STATUS.STARTED : EXECUTION_STATUS.QUEUEING, }, { transaction }, ); @@ -468,7 +481,7 @@ export default class PluginWorkflowServer extends Plugin { try { const execution = await this.createExecution(...event); // NOTE: cache first execution for most cases - if (execution && !this.executing && !this.pending.length) { + if (execution?.status === EXECUTION_STATUS.QUEUEING && !this.executing && !this.pending.length) { this.pending.push([execution]); } } catch (err) { diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Plugin.test.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Plugin.test.ts index 3322baa647..c1140bdea9 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Plugin.test.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Plugin.test.ts @@ -11,7 +11,7 @@ import { MockServer } from '@nocobase/test'; import Database from '@nocobase/database'; import { getApp, sleep } from '@nocobase/plugin-workflow-test'; -import Plugin from '..'; +import Plugin, { Processor } from '..'; import { EXECUTION_STATUS } from '../constants'; describe('workflow > Plugin', () => { @@ -19,7 +19,7 @@ describe('workflow > Plugin', () => { let db: Database; let PostRepo; let WorkflowModel; - let plugin; + let plugin: Plugin; beforeEach(async () => { app = await getApp(); @@ -472,6 +472,44 @@ describe('workflow > Plugin', () => { }); }); + describe('deffered', () => { + it('deffered will not be process immediately, and can be start', async () => { + const w1 = await WorkflowModel.create({ + enabled: true, + type: 'asyncTrigger', + }); + + plugin.trigger(w1, {}, { deferred: true }); + + await sleep(500); + + const e1s = await w1.getExecutions(); + expect(e1s.length).toBe(1); + expect(e1s[0].status).toBe(EXECUTION_STATUS.STARTED); + + plugin.start(e1s[0]); + + await sleep(500); + + const e2s = await w1.getExecutions(); + expect(e2s.length).toBe(1); + expect(e2s[0].status).toBe(EXECUTION_STATUS.RESOLVED); + }); + + it('sync workflow will ignore the deferred option, and start it immediately', async () => { + const w1 = await WorkflowModel.create({ + enabled: true, + type: 'syncTrigger', + }); + + const processor = await plugin.trigger(w1, {}, { deferred: true }); + + const e1s = await w1.getExecutions(); + expect(e1s.length).toBe(1); + expect(e1s[0].status).toBe(EXECUTION_STATUS.RESOLVED); + }); + }); + describe('sync', () => { it('sync on trigger class', async () => { const w1 = await WorkflowModel.create({ @@ -479,7 +517,7 @@ describe('workflow > Plugin', () => { type: 'syncTrigger', }); - const processor = await plugin.trigger(w1, {}); + const processor = (await plugin.trigger(w1, {})) as Processor; const executions = await w1.getExecutions(); expect(executions.length).toBe(1); @@ -495,7 +533,7 @@ describe('workflow > Plugin', () => { sync: true, }); - const processor = await plugin.trigger(w1, {}); + const processor = (await plugin.trigger(w1, {})) as Processor; const executions = await w1.getExecutions(); expect(executions.length).toBe(1);