diff --git a/packages/plugins/@nocobase/plugin-workflow-request/src/server/RequestInstruction.ts b/packages/plugins/@nocobase/plugin-workflow-request/src/server/RequestInstruction.ts index c2b2b6ccde..ed793d0cad 100644 --- a/packages/plugins/@nocobase/plugin-workflow-request/src/server/RequestInstruction.ts +++ b/packages/plugins/@nocobase/plugin-workflow-request/src/server/RequestInstruction.ts @@ -160,14 +160,15 @@ export default class extends Instruction { }) .finally(() => { processor.logger.debug(`request (#${node.id}) ended, resume workflow...`); - setImmediate(() => { + setTimeout(() => { + job.execution = processor.execution; this.workflow.resume(job); }); }); processor.logger.info(`request (#${node.id}) sent to "${config.url}", waiting for response...`); - return processor.exit(); + return job; } async resume(node: FlowNodeModel, job, processor: Processor) { diff --git a/packages/plugins/@nocobase/plugin-workflow-test/src/server/instructions.ts b/packages/plugins/@nocobase/plugin-workflow-test/src/server/instructions.ts index cf6c9fa950..c8a5d77b48 100644 --- a/packages/plugins/@nocobase/plugin-workflow-test/src/server/instructions.ts +++ b/packages/plugins/@nocobase/plugin-workflow-test/src/server/instructions.ts @@ -78,6 +78,30 @@ export default { }, }, + asyncResume: { + async run(node, input, processor) { + const job = await processor.saveJob({ + status: 0, + nodeId: node.id, + nodeKey: node.key, + upstreamId: input?.id ?? null, + }); + + setTimeout(() => { + job.set({ + status: 1, + }); + + processor.options.plugin.resume(job); + }, 100); + + return null; + }, + resume(node, job, processor) { + return job; + }, + }, + customizedSuccess: { run(node, input, processor) { return { diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts index b273e034c2..39ad6c6abf 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts @@ -10,9 +10,10 @@ import path from 'path'; import { randomUUID } from 'crypto'; +import { Transaction, Transactionable } from 'sequelize'; import LRUCache from 'lru-cache'; -import { Op, Transactionable } from '@nocobase/database'; +import { Op } from '@nocobase/database'; import { Plugin } from '@nocobase/server'; import { Registry } from '@nocobase/utils'; @@ -284,7 +285,6 @@ export default class PluginWorkflowServer extends Plugin { // * add all hooks for enabled workflows // * add hooks for create/update[enabled]/delete workflow to add/remove specific hooks this.app.on('afterStart', async () => { - this.app.setMaintainingMessage('check for not started executions'); this.ready = true; const collection = db.getCollection('workflows'); @@ -297,10 +297,12 @@ export default class PluginWorkflowServer extends Plugin { }); this.checker = setInterval(() => { + this.getLogger('dispatcher').info(`(cycling) check for queueing executions`); this.dispatch(); }, 300_000); - // check for not started executions + // check for queueing executions + this.getLogger('dispatcher').info('(starting) check for queueing executions'); this.dispatch(); }); @@ -363,6 +365,15 @@ export default class PluginWorkflowServer extends Plugin { logger.debug(`ignored event data:`, context); return; } + const duplicated = this.events.find(([w, c, { eventKey }]) => { + if (eventKey && options.eventKey) { + return eventKey === options.eventKey; + } + }); + if (duplicated) { + logger.warn(`event of workflow ${workflow.id} is duplicated, event will be ignored`); + return; + } // `null` means not to trigger if (context == null) { logger.warn(`workflow ${workflow.id} event data context is null, event will be ignored`); @@ -381,6 +392,7 @@ export default class PluginWorkflowServer extends Plugin { logger.debug(`event data:`, { context }); if (this.events.length > 1) { + logger.info(`new event is pending to be prepared after previous preparation is finished`); return; } @@ -417,6 +429,9 @@ export default class PluginWorkflowServer extends Plugin { `execution (${job.execution.id}) resuming from job (${job.id}) added to pending list`, ); this.pending.push([job.execution, job]); + if (this.executing) { + await this.executing; + } this.dispatch(); } @@ -424,18 +439,17 @@ export default class PluginWorkflowServer extends Plugin { * Start a deferred execution * @experimental */ - public start(execution: ExecutionModel) { + public async start(execution: ExecutionModel) { if (execution.status !== EXECUTION_STATUS.STARTED) { return; } this.pending.push([execution]); + if (this.executing) { + await this.executing; + } this.dispatch(); } - public createProcessor(execution: ExecutionModel, options = {}): Processor { - return new Processor(execution, { ...options, plugin: this }); - } - private async createExecution( workflow: WorkflowModel, context, @@ -508,7 +522,7 @@ export default class PluginWorkflowServer extends Plugin { const event = this.events.shift(); this.eventsCount = this.events.length; if (!event) { - this.getLogger('dispatcher').warn(`events queue is empty, no need to prepare`); + this.getLogger('dispatcher').info(`events queue is empty, no need to prepare`); return; } @@ -550,41 +564,61 @@ export default class PluginWorkflowServer extends Plugin { this.executing = (async () => { let next: Pending | null = null; - try { - // resuming has high priority - if (this.pending.length) { - next = this.pending.shift() as Pending; - this.getLogger(next[0].workflowId).info(`pending execution (${next[0].id}) ready to process`); - } else { - const execution = (await this.db.getRepository('executions').findOne({ - filter: { - status: EXECUTION_STATUS.QUEUEING, - 'workflow.enabled': true, - 'workflow.id': { - [Op.not]: null, - }, + // resuming has high priority + if (this.pending.length) { + next = this.pending.shift() as Pending; + this.getLogger(next[0].workflowId).info(`pending execution (${next[0].id}) ready to process`); + } else { + try { + await this.db.sequelize.transaction( + { + isolationLevel: + this.db.options.dialect === 'sqlite' ? [][0] : Transaction.ISOLATION_LEVELS.REPEATABLE_READ, }, - appends: ['workflow'], - sort: 'id', - })) as ExecutionModel; - if (execution) { - this.getLogger(execution.workflowId).info(`execution (${execution.id}) fetched from db`); - next = [execution]; - } - } - if (next) { - await this.process(...next); - } - } finally { - this.executing = null; - - if (next) { - this.dispatch(); + async (transaction) => { + const execution = (await this.db.getRepository('executions').findOne({ + filter: { + status: EXECUTION_STATUS.QUEUEING, + 'workflow.enabled': true, + }, + sort: 'id', + transaction, + })) as ExecutionModel; + if (execution) { + this.getLogger(execution.workflowId).info(`execution (${execution.id}) fetched from db`); + await execution.update( + { + status: EXECUTION_STATUS.STARTED, + }, + { transaction }, + ); + execution.workflow = this.enabledCache.get(execution.workflowId); + next = [execution]; + } else { + this.getLogger('dispatcher').info(`no execution in db queued to process`); + } + }, + ); + } catch (error) { + this.getLogger('dispatcher').error(`fetching execution from db failed: ${error.message}`, { error }); } } + if (next) { + await this.process(...next); + } + this.executing = null; + + if (next) { + this.getLogger('dispatcher').info(`last process finished, will do another dispatch`); + this.dispatch(); + } })(); } + public createProcessor(execution: ExecutionModel, options = {}): Processor { + return new Processor(execution, { ...options, plugin: this }); + } + private async process(execution: ExecutionModel, job?: JobModel, options: Transactionable = {}): Promise { if (execution.status === EXECUTION_STATUS.QUEUEING) { const transaction = await this.useDataSourceTransaction('main', options.transaction); diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Plugin.test.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Plugin.test.ts index c1140bdea9..ecde370187 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Plugin.test.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Plugin.test.ts @@ -273,7 +273,7 @@ describe('workflow > Plugin', () => { const p1 = await PostRepo.create({ values: { title: 't1' } }); - await sleep(1000); + await sleep(500); const [e1] = await w1.getExecutions(); expect(e1.status).toBe(EXECUTION_STATUS.RESOLVED); @@ -299,13 +299,34 @@ describe('workflow > Plugin', () => { const p2 = await PostRepo.create({ values: { title: 't2' } }); const p3 = await PostRepo.create({ values: { title: 't3' } }); - await sleep(1000); + await sleep(500); const executions = await w1.getExecutions(); expect(executions.length).toBe(3); expect(executions.map((item) => item.status)).toEqual(Array(3).fill(EXECUTION_STATUS.RESOLVED)); }); + it('duplicated event trigger', async () => { + const w1 = await WorkflowModel.create({ + enabled: true, + type: 'asyncTrigger', + }); + + const n1 = await w1.createNode({ + type: 'asyncResume', + }); + + plugin.trigger(w1, {}, { eventKey: 'a' }); + plugin.trigger(w1, {}, { eventKey: 'a' }); + + await sleep(1000); + + const executions = await w1.getExecutions(); + expect(executions.length).toBe(1); + const jobs = await executions[0].getJobs(); + expect(jobs.length).toBe(1); + }); + it('when server starts, process all created executions', async () => { const w1 = await WorkflowModel.create({ enabled: true, @@ -330,6 +351,9 @@ describe('workflow > Plugin', () => { }, }); + const e1s = await w1.getExecutions(); + expect(e1s.length).toBe(1); + await app.start(); await sleep(500); diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/triggers/schedule/mode-static.test.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/triggers/schedule/mode-static.test.ts index 224c853b1d..711dec0ee7 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/triggers/schedule/mode-static.test.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/triggers/schedule/mode-static.test.ts @@ -32,6 +32,7 @@ function consumeTime(n: number) { describe('workflow > triggers > schedule > static mode', () => { let app: MockServer; let db: Database; + let plugin; let PostRepo; let CategoryRepo; let WorkflowRepo; @@ -40,6 +41,7 @@ describe('workflow > triggers > schedule > static mode', () => { app = await getApp(); db = app.db; + plugin = app.pm.get('workflow') as Plugin; const workflow = db.getCollection('workflows'); WorkflowRepo = workflow.repository; PostRepo = db.getCollection('posts').repository; @@ -393,11 +395,7 @@ describe('workflow > triggers > schedule > static mode', () => { type: 'echo', }); - (app.pm.get('workflow') as Plugin).trigger( - workflow, - { date: start }, - { eventKey: `${workflow.id}@${start.getTime()}` }, - ); + plugin.trigger(workflow, { date: start }, { eventKey: `${workflow.id}@${start.getTime()}` }); await sleep(3000); @@ -429,7 +427,7 @@ describe('workflow > triggers > schedule > static mode', () => { type: 'echo', }); - const trigger = (app.pm.get('workflow') as Plugin).triggers.get(workflow.type); + const trigger = plugin.triggers.get(workflow.type); trigger.on(workflow); await sleep(3000); @@ -439,5 +437,58 @@ describe('workflow > triggers > schedule > static mode', () => { const j1s = await e1s[0].getJobs(); expect(j1s.length).toBe(1); }); + + it('different workflows could trigger in same time but not duplicated for each', async () => { + await sleepToEvenSecond(); + + const start = new Date(); + start.setMilliseconds(0); + start.setSeconds(start.getSeconds() + 2); + + const w1 = await WorkflowRepo.create({ + values: { + enabled: true, + type: 'schedule', + config: { + mode: 0, + startsOn: start.toISOString(), + }, + }, + }); + + const n1 = w1.createNode({ + type: 'echo', + }); + + const w2 = await WorkflowRepo.create({ + values: { + enabled: true, + type: 'schedule', + config: { + mode: 0, + startsOn: start.toISOString(), + }, + }, + }); + + const n2 = w2.createNode({ + type: 'echo', + }); + + plugin.trigger(w1, { date: start }, { eventKey: `${w1.id}@${start.getTime()}` }); + plugin.trigger(w2, { date: start }, { eventKey: `${w2.id}@${start.getTime()}` }); + + await sleep(3000); + + const e1s = await w1.getExecutions(); + expect(e1s.length).toBe(1); + const j1s = await e1s[0].getJobs(); + expect(j1s.length).toBe(1); + + const e2s = await w2.getExecutions(); + expect(e2s.length).toBe(1); + const j2s = await e2s[0].getJobs(); + expect(j2s.length).toBe(1); + }); }); });