From a96783a4b0749134b450d8a40358f9c7f3225782 Mon Sep 17 00:00:00 2001 From: Junyi Date: Tue, 18 Feb 2025 00:16:35 +0800 Subject: [PATCH] fix(plugin-workflow-loop): fix pending nodes in loop but resolved (#6236) * fix(plugin-workflow-loop): fix pending nodes in loop but resolved * fix(plugin-workflow-test): fix test instruction --- .../src/server/LoopInstruction.ts | 7 ++- .../src/server/__tests__/instruction.test.ts | 54 +++++++++++++++++++ .../src/server/instructions.ts | 6 +++ .../plugin-workflow/src/server/Processor.ts | 2 +- .../src/server/__tests__/Processor.test.ts | 23 ++++++++ 5 files changed, 90 insertions(+), 2 deletions(-) diff --git a/packages/plugins/@nocobase/plugin-workflow-loop/src/server/LoopInstruction.ts b/packages/plugins/@nocobase/plugin-workflow-loop/src/server/LoopInstruction.ts index 4a3fd4b7df..baf6715424 100644 --- a/packages/plugins/@nocobase/plugin-workflow-loop/src/server/LoopInstruction.ts +++ b/packages/plugins/@nocobase/plugin-workflow-loop/src/server/LoopInstruction.ts @@ -97,7 +97,12 @@ export default class extends Instruction { const { result, status } = job; // NOTE: if loop has been done (resolved / rejected), do not care newly executed branch jobs. if (status !== JOB_STATUS.PENDING) { - return processor.exit(); + processor.logger.warn(`loop (${job.nodeId}) has been done, ignore newly resumed event`); + return null; + } + + if (branchJob.id !== job.id && branchJob.status === JOB_STATUS.PENDING) { + return null; } const nextIndex = result.looped + 1; diff --git a/packages/plugins/@nocobase/plugin-workflow-loop/src/server/__tests__/instruction.test.ts b/packages/plugins/@nocobase/plugin-workflow-loop/src/server/__tests__/instruction.test.ts index dd139f0938..660ec7e187 100644 --- a/packages/plugins/@nocobase/plugin-workflow-loop/src/server/__tests__/instruction.test.ts +++ b/packages/plugins/@nocobase/plugin-workflow-loop/src/server/__tests__/instruction.test.ts @@ -780,6 +780,60 @@ describe('workflow > instructions > loop', () => { expect(jobs[0].status).toBe(JOB_STATUS.RESOLVED); expect(jobs[0].result).toEqual({ looped: 2 }); }); + + it('exit as continue with async node inside', async () => { + const n1 = await workflow.createNode({ + type: 'loop', + config: { + target: 2, + exit: EXIT.CONTINUE, + }, + }); + + const n2 = await workflow.createNode({ + type: 'pending', + upstreamId: n1.id, + branchIndex: 0, + }); + + const post = await PostRepo.create({ values: { title: 't1' } }); + + await sleep(500); + + const [e1] = await workflow.getExecutions(); + expect(e1.status).toBe(EXECUTION_STATUS.STARTED); + const j1s = await e1.getJobs({ order: [['id', 'ASC']] }); + + expect(j1s.length).toBe(2); + + j1s[1].set('status', JOB_STATUS.RESOLVED); + plugin.resume(j1s[1]); + + await sleep(500); + + const [e2] = await workflow.getExecutions(); + expect(e2.status).toBe(EXECUTION_STATUS.STARTED); + const j2s = await e2.getJobs({ order: [['id', 'ASC']] }); + + expect(j2s.length).toBe(3); + expect(j2s[0].result).toEqual({ looped: 1 }); + expect(j2s[1].status).toBe(JOB_STATUS.RESOLVED); + expect(j2s[2].status).toBe(JOB_STATUS.PENDING); + + j2s[2].set('status', JOB_STATUS.RESOLVED); + plugin.resume(j2s[2]); + + await sleep(500); + + const [e3] = await workflow.getExecutions(); + expect(e3.status).toBe(EXECUTION_STATUS.RESOLVED); + const j3s = await e3.getJobs({ order: [['id', 'ASC']] }); + + expect(j3s.length).toBe(3); + expect(j3s[0].result).toEqual({ looped: 2 }); + expect(j3s[1].status).toBe(JOB_STATUS.RESOLVED); + expect(j3s[2].status).toBe(JOB_STATUS.RESOLVED); + }); }); }); diff --git a/packages/plugins/@nocobase/plugin-workflow-test/src/server/instructions.ts b/packages/plugins/@nocobase/plugin-workflow-test/src/server/instructions.ts index c8a5d77b48..97e50c4402 100644 --- a/packages/plugins/@nocobase/plugin-workflow-test/src/server/instructions.ts +++ b/packages/plugins/@nocobase/plugin-workflow-test/src/server/instructions.ts @@ -46,6 +46,12 @@ export default { status: 0, }; }, + resume(node, job) { + if (node.config.status != null) { + job.set('status', node.config.status); + } + return job; + }, test() { return { status: 0, diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/Processor.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/Processor.ts index 40a2403133..672c135c21 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/Processor.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/Processor.ts @@ -234,7 +234,7 @@ export default class Processor { if (parentNode) { this.logger.debug(`not on main, recall to parent entry node (${node.id})})`); await this.recall(parentNode, job); - return job; + return null; } // really done for all nodes diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Processor.test.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Processor.test.ts index 3e6b20e94e..1acc1a0188 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Processor.test.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Processor.test.ts @@ -41,6 +41,29 @@ describe('workflow > Processor', () => { afterEach(() => app.destroy()); describe('base', () => { + it.skip('saveJob', async () => { + const execution = await workflow.createExecution({ + key: workflow.key, + context: {}, + status: EXECUTION_STATUS.STARTED, + eventKey: '123', + }); + + const processor = plugin.createProcessor(execution); + + const job1 = await processor.saveJob({ + status: JOB_STATUS.RESOLVED, + result: null, + }); + + const job2 = await processor.saveJob({ + status: JOB_STATUS.RESOLVED, + result: 'abc', + }); + + expect(job2).toBeDefined(); + }); + it('empty workflow without any nodes', async () => { const post = await PostRepo.create({ values: { title: 't1' } });