From 2f584b40bd224df0624bc0fd4a3cede5d0dac6a3 Mon Sep 17 00:00:00 2001 From: mytharcher Date: Tue, 1 Feb 2022 12:04:08 +0800 Subject: [PATCH] feat(plugin-workflow): add parallel branch and mixed test cases --- .../src/__tests__/execution.test.ts | 219 +++++++++++++++++- .../plugin-workflow/src/__tests__/index.ts | 6 + .../__tests__/instructions/condition.test.ts | 12 +- .../src/instructions/condition.ts | 15 +- .../plugin-workflow/src/instructions/index.ts | 4 +- .../src/instructions/parallel.ts | 92 ++++++++ .../plugin-workflow/src/models/Execution.ts | 97 +++++--- 7 files changed, 388 insertions(+), 57 deletions(-) create mode 100644 packages/plugin-workflow/src/instructions/parallel.ts diff --git a/packages/plugin-workflow/src/__tests__/execution.test.ts b/packages/plugin-workflow/src/__tests__/execution.test.ts index 2cb7de3596..00ca0d710b 100644 --- a/packages/plugin-workflow/src/__tests__/execution.test.ts +++ b/packages/plugin-workflow/src/__tests__/execution.test.ts @@ -250,7 +250,7 @@ describe('execution', () => { const [execution] = await workflow.getExecutions(); expect(execution.status).toEqual(EXECUTION_STATUS.STARTED); - const [pending] = await execution.getJobs({ nodeId: n2.id }); + const [pending] = await execution.getJobs({ where: { nodeId: n2.id } }); pending.set('result', 123); await execution.resume(pending); @@ -285,7 +285,7 @@ describe('execution', () => { const [execution] = await workflow.getExecutions(); expect(execution.status).toEqual(EXECUTION_STATUS.STARTED); - const [pending] = await execution.getJobs({ nodeId: n2.id }); + const [pending] = await execution.getJobs({ where: { nodeId: n2.id } }); pending.set('result', 123); await execution.resume(pending); expect(execution.status).toEqual(EXECUTION_STATUS.REJECTED); @@ -294,4 +294,219 @@ describe('execution', () => { expect(jobs.length).toEqual(2); }); }); + + describe('branch: parallel node', () => { + it('link to single branch', async () => { + const n1 = await workflow.createNode({ + title: 'parallel', + type: 'parallel' + }); + + const n2 = await workflow.createNode({ + title: 'echo1', + type: 'echo', + upstreamId: n1.id, + branchIndex: 0 + }); + + const n3 = await workflow.createNode({ + title: 'echo2', + type: 'echo', + upstreamId: n1.id + }); + + await n1.setDownstream(n3); + + const post = await PostModel.create({ title: 't1' }); + + const [execution] = await workflow.getExecutions(); + expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED); + const jobs = await execution.getJobs({ order: [['id', 'ASC']] }); + expect(jobs.length).toEqual(3); + }); + + it('link to multipe branches', async () => { + const n1 = await workflow.createNode({ + title: 'parallel', + type: 'parallel' + }); + + const n2 = await workflow.createNode({ + title: 'echo1', + type: 'echo', + upstreamId: n1.id, + branchIndex: 0 + }); + + const n3 = await workflow.createNode({ + title: 'echo2', + type: 'echo', + upstreamId: n1.id, + branchIndex: 1 + }); + + const n4 = await workflow.createNode({ + title: 'echo on end', + type: 'echo', + upstreamId: n1.id + }); + + await n1.setDownstream(n4); + + const post = await PostModel.create({ title: 't1' }); + + const [execution] = await workflow.getExecutions(); + expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED); + const jobs = await execution.getJobs({ order: [['id', 'ASC']] }); + expect(jobs.length).toEqual(4); + }); + + it('downstream has manual node', async () => { + const n1 = await workflow.createNode({ + title: 'parallel', + type: 'parallel' + }); + + const n2 = await workflow.createNode({ + title: 'prompt', + type: 'prompt', + upstreamId: n1.id, + branchIndex: 0 + }); + + const n3 = await workflow.createNode({ + title: 'echo', + type: 'echo', + upstreamId: n1.id, + branchIndex: 1 + }); + + const n4 = await workflow.createNode({ + title: 'echo on end', + type: 'echo', + upstreamId: n1.id + }); + + await n1.setDownstream(n4); + + const post = await PostModel.create({ title: 't1' }); + + const [execution] = await workflow.getExecutions(); + expect(execution.status).toEqual(EXECUTION_STATUS.STARTED); + + const [pending] = await execution.getJobs({ nodeId: n2.id }); + pending.set('result', 123); + await execution.resume(pending); + + expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED); + const jobs = await execution.getJobs({ order: [['id', 'ASC']] }); + expect(jobs.length).toEqual(4); + }); + }); + + describe('branch: mixed', () => { + it('condition branches contains parallel', async () => { + const n1 = await workflow.createNode({ + title: 'condition', + type: 'condition' + }); + + const n2 = await workflow.createNode({ + title: 'parallel', + type: 'parallel', + branchIndex: BRANCH_INDEX.ON_TRUE, + upstreamId: n1.id + }); + + const n3 = await workflow.createNode({ + title: 'prompt', + type: 'prompt', + upstreamId: n2.id, + branchIndex: 0 + }); + + const n4 = await workflow.createNode({ + title: 'parallel echo', + type: 'echo', + upstreamId: n2.id, + branchIndex: 1 + }); + + const n5 = await workflow.createNode({ + title: 'last echo', + type: 'echo', + upstreamId: n1.id + }); + + await n1.setDownstream(n5); + + const post = await PostModel.create({ title: 't1' }); + + const [execution] = await workflow.getExecutions(); + expect(execution.status).toEqual(EXECUTION_STATUS.STARTED); + + const pendingJobs = await execution.getJobs(); + expect(pendingJobs.length).toBe(4); + + const pending = pendingJobs.find(item => item.nodeId === n3.id ); + pending.set('result', 123); + await execution.resume(pending); + + expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED); + const jobs = await execution.getJobs({ order: [['id', 'ASC']] }); + expect(jobs.length).toEqual(5); + }); + + it('parallel branches contains condition', async () => { + const n1 = await workflow.createNode({ + title: 'parallel', + type: 'parallel' + }); + + const n2 = await workflow.createNode({ + title: 'prompt', + type: 'prompt', + upstreamId: n1.id, + branchIndex: 0 + }); + + const n3 = await workflow.createNode({ + title: 'condition', + type: 'condition', + upstreamId: n1.id, + branchIndex: 1 + }); + + const n4 = await workflow.createNode({ + title: 'condition echo', + type: 'echo', + upstreamId: n3.id, + branchIndex: BRANCH_INDEX.ON_TRUE + }); + + const n5 = await workflow.createNode({ + title: 'last echo', + type: 'echo', + upstreamId: n1.id + }); + + await n1.setDownstream(n5); + + const post = await PostModel.create({ title: 't1' }); + + const [execution] = await workflow.getExecutions(); + expect(execution.status).toEqual(EXECUTION_STATUS.STARTED); + + const pendingJobs = await execution.getJobs(); + expect(pendingJobs.length).toBe(4); + + const pending = pendingJobs.find(item => item.nodeId === n2.id ); + pending.set('result', 123); + await execution.resume(pending); + + expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED); + const jobs = await execution.getJobs({ order: [['id', 'ASC']] }); + expect(jobs.length).toEqual(5); + }); + }); }); diff --git a/packages/plugin-workflow/src/__tests__/index.ts b/packages/plugin-workflow/src/__tests__/index.ts index b097af4a54..65b0265370 100644 --- a/packages/plugin-workflow/src/__tests__/index.ts +++ b/packages/plugin-workflow/src/__tests__/index.ts @@ -5,6 +5,12 @@ import plugin from '../server'; import { registerInstruction } from '../instructions'; import { JOB_STATUS } from '../constants'; +export function sleep(ms: number) { + return new Promise(resolve => { + setTimeout(resolve, ms); + }); +} + export async function getApp(options = {}): Promise { const app = mockServer(options); diff --git a/packages/plugin-workflow/src/__tests__/instructions/condition.test.ts b/packages/plugin-workflow/src/__tests__/instructions/condition.test.ts index 9fe7f9bf7e..5fbdcbb17f 100644 --- a/packages/plugin-workflow/src/__tests__/instructions/condition.test.ts +++ b/packages/plugin-workflow/src/__tests__/instructions/condition.test.ts @@ -22,8 +22,9 @@ describe('workflow > instructions > condition', () => { workflow = await WorkflowModel.create({ title: 'test workflow', enabled: true, - type: 'afterCreate', + type: 'model', config: { + mode: 1, collection: 'posts' } }); @@ -75,15 +76,6 @@ describe('workflow > instructions > condition', () => { }); it('calculation to false downstream', async () => { - const workflow = await WorkflowModel.create({ - title: 'condition workflow', - enabled: true, - type: 'afterCreate', - config: { - collection: 'posts' - } - }); - const n1 = await workflow.createNode({ title: 'condition', type: 'condition', diff --git a/packages/plugin-workflow/src/instructions/condition.ts b/packages/plugin-workflow/src/instructions/condition.ts index db15e5342b..b38df688c0 100644 --- a/packages/plugin-workflow/src/instructions/condition.ts +++ b/packages/plugin-workflow/src/instructions/condition.ts @@ -95,22 +95,13 @@ export default { const savedJob = await execution.saveJob(job); - // return execution.exec(branchNode, savedJob); - const tailJob = await execution.exec(branchNode, savedJob); - - if (tailJob.status === JOB_STATUS.PENDING) { - savedJob.set('status', JOB_STATUS.PENDING); - return savedJob; - } - - return tailJob; + return execution.exec(branchNode, savedJob); }, async resume(this, branchJob, execution) { if (branchJob.status === JOB_STATUS.RESOLVED) { - const job = execution.findBranchParentJob(branchJob, this); - job.set('status', JOB_STATUS.RESOLVED); - return job; + // return to continue this.downstream + return branchJob; } // pass control to upper scope by ending current scope diff --git a/packages/plugin-workflow/src/instructions/index.ts b/packages/plugin-workflow/src/instructions/index.ts index 6f4026d20d..65a97e4839 100644 --- a/packages/plugin-workflow/src/instructions/index.ts +++ b/packages/plugin-workflow/src/instructions/index.ts @@ -3,7 +3,7 @@ import FlowNodeModel from "../models/FlowNode"; import prompt from './prompt'; import condition from './condition'; -// import parallel from './parallel'; +import parallel from './parallel'; export interface Job { status: number; @@ -41,4 +41,4 @@ export function registerInstruction(key: string, instruction: any) { registerInstruction('prompt', prompt); registerInstruction('condition', condition); -// registerInstruction('parallel', parallel); +registerInstruction('parallel', parallel); diff --git a/packages/plugin-workflow/src/instructions/parallel.ts b/packages/plugin-workflow/src/instructions/parallel.ts new file mode 100644 index 0000000000..9c3a775f42 --- /dev/null +++ b/packages/plugin-workflow/src/instructions/parallel.ts @@ -0,0 +1,92 @@ +import { JOB_STATUS } from "../constants"; +import ExecutionModel from "../models/Execution"; +import FlowNodeModel from "../models/FlowNode"; +import JobModel from "../models/Job"; + +export const PARALLEL_MODE = { + ALL: 'all', + ANY: 'any', + RACE: 'race' +} as const; + +const StatusGetters = { + [PARALLEL_MODE.ALL](result) { + if (result.some(j => j && j.status === JOB_STATUS.REJECTED)) { + return JOB_STATUS.REJECTED; + } + if (result.every(j => j && j.status === JOB_STATUS.RESOLVED)) { + return JOB_STATUS.RESOLVED; + } + return JOB_STATUS.PENDING; + }, + [PARALLEL_MODE.ANY](result) { + return result.some(j => j && j.status === JOB_STATUS.RESOLVED) + ? JOB_STATUS.RESOLVED + : ( + result.some(j => j && j.status === JOB_STATUS.PENDING) + ? JOB_STATUS.PENDING + : JOB_STATUS.REJECTED + ) + }, + [PARALLEL_MODE.RACE](result) { + return result.some(j => j && j.status === JOB_STATUS.RESOLVED) + ? JOB_STATUS.RESOLVED + : ( + result.some(j => j && j.status === JOB_STATUS.REJECTED) + ? JOB_STATUS.REJECTED + : JOB_STATUS.PENDING + ) + } +}; + +export default { + async run(this: FlowNodeModel, prevJob: JobModel, execution: ExecutionModel) { + const branches = execution.nodes + .filter(item => item.upstream === this && item.branchIndex !== null) + .sort((a, b) => a.branchIndex - b.branchIndex); + + const job = await execution.saveJob({ + status: JOB_STATUS.PENDING, + result: Array(branches.length).fill(null), + nodeId: this.id, + upstreamId: prevJob?.id ?? null + }); + + // NOTE: + // use `reduce` but not `Promise.all` here to avoid racing manupulating db. + // for users, this is almost equivalent to `Promise.all`, + // because of the delay is not significant sensible. + // another better aspect of this is, it could handle sequenced branches in future. + await branches.reduce((promise: Promise, branch) => promise.then(() => execution.exec(branch, job)), Promise.resolve()); + + return execution.end(this, job); + }, + + async resume(this, branchJob, execution: ExecutionModel) { + const job = execution.findBranchParentJob(branchJob, this); + + const { result, status } = job; + // if parallel has been done (resolved / rejected), do not care newly executed branch jobs. + if (status !== JOB_STATUS.PENDING) { + return null; + } + + // find the index of the node which start the branch + const jobNode = execution.nodesMap.get(branchJob.nodeId); + const { branchIndex } = execution.findBranchStartNode(jobNode); + const { mode = PARALLEL_MODE.ALL } = this.config || {}; + + const newResult = [...result.slice(0, branchIndex), branchJob.get(), ...result.slice(branchIndex + 1)]; + job.set({ + result: newResult, + status: StatusGetters[mode](newResult) + }); + + if (job.status === JOB_STATUS.PENDING) { + await job.save({ transaction: execution.transaction }); + return execution.end(this, job); + } + + return job; + } +} diff --git a/packages/plugin-workflow/src/models/Execution.ts b/packages/plugin-workflow/src/models/Execution.ts index 62d3dc8395..26dc50dc1d 100644 --- a/packages/plugin-workflow/src/models/Execution.ts +++ b/packages/plugin-workflow/src/models/Execution.ts @@ -2,7 +2,8 @@ import { Model, BelongsToGetAssociationMixin, Optional, - HasManyGetAssociationsMixin + HasManyGetAssociationsMixin, + Transaction } from 'sequelize'; import Database from '@nocobase/database'; @@ -22,11 +23,15 @@ interface ExecutionAttributes { interface ExecutionCreationAttributes extends Optional {} +export interface ExecutionOptions { + transaction?: Transaction; +} + export default class ExecutionModel extends Model implements ExecutionAttributes { - declare readonly database: Database; + declare static readonly database: Database; declare id: number; declare title: string; @@ -42,10 +47,20 @@ export default class ExecutionModel declare jobs?: JobModel[]; declare getJobs: HasManyGetAssociationsMixin; + options: ExecutionOptions; + transaction: Transaction; + nodes: Array = []; nodesMap = new Map(); jobsMap = new Map(); + static StatusMap = { + [JOB_STATUS.PENDING]: EXECUTION_STATUS.STARTED, + [JOB_STATUS.RESOLVED]: EXECUTION_STATUS.RESOLVED, + [JOB_STATUS.REJECTED]: EXECUTION_STATUS.REJECTED, + [JOB_STATUS.CANCELLED]: EXECUTION_STATUS.CANCELLED, + }; + // make dual linked nodes list then cache makeNodes(nodes = []) { this.nodes = nodes; @@ -71,44 +86,60 @@ export default class ExecutionModel }); } - async prepare() { + async prepare(options) { if (this.status !== EXECUTION_STATUS.STARTED) { throw new Error(`execution was ended with status ${this.status}`); } + this.options = options || {}; + const { transaction = await (this.constructor).database.sequelize.transaction() } = this.options; + this.transaction = transaction; + if (!this.workflow) { - this.workflow = await this.getWorkflow(); + this.workflow = await this.getWorkflow({ transaction }); } - const nodes = await this.workflow.getNodes(); + const nodes = await this.workflow.getNodes({ transaction }); this.makeNodes(nodes); - const jobs = await this.getJobs(); + const jobs = await this.getJobs({ transaction }); this.makeJobs(jobs); } - async start(options) { - await this.prepare(); - if (!this.nodes.length) { - return this.exit(); + async start(options: ExecutionOptions) { + await this.prepare(options); + if (this.nodes.length) { + const head = this.nodes.find(item => !item.upstream); + await this.exec(head, { result: this.context }); + } else { + await this.exit(null); } - const head = this.nodes.find(item => !item.upstream); - return this.exec(head, { result: this.context }); + await this.commit(); } - async resume(job, options) { - await this.prepare(); + async resume(job: JobModel, options: ExecutionOptions) { + await this.prepare(options); const node = this.nodesMap.get(job.nodeId); - return this.recall(node, job); + await this.recall(node, job); + await this.commit(); } - private async run(instruction, node, prevJob) { + private async commit() { + if (!this.options || !this.options.transaction) { + await this.transaction.commit(); + } + } + + private async run(instruction, node: FlowNodeModel, prevJob) { let job; try { // call instruction to get result and status job = await instruction.call(node, prevJob, this); + if (!job) { + return null; + } } catch (err) { // for uncaught error, set to rejected job = { @@ -122,11 +153,11 @@ export default class ExecutionModel } } - let savedJob; + let savedJob: JobModel; // TODO(optimize): many checking of resuming or new could be improved // could be implemented separately in exec() / resume() if (job instanceof Model) { - savedJob = await job.save(); + savedJob = await job.save({ transaction: this.transaction }) as JobModel; } else { const upstreamId = prevJob instanceof Model ? prevJob.get('id') : null; savedJob = await this.saveJob({ @@ -173,32 +204,36 @@ export default class ExecutionModel return this.run(resume, node, job); } - async exit(job?: JobModel) { - const executionStatusMap = { - [JOB_STATUS.PENDING]: EXECUTION_STATUS.STARTED, - [JOB_STATUS.RESOLVED]: EXECUTION_STATUS.RESOLVED, - [JOB_STATUS.REJECTED]: EXECUTION_STATUS.REJECTED, - [JOB_STATUS.CANCELLED]: EXECUTION_STATUS.CANCELLED, - }; - const status = job ? executionStatusMap[job.status] : EXECUTION_STATUS.RESOLVED; - await this.update({ status }); - return job; + async exit(job: JobModel | null) { + const status = job ? ExecutionModel.StatusMap[job.status] : EXECUTION_STATUS.RESOLVED; + await this.update({ status }, { transaction: this.transaction }); + return null; } // TODO(optimize) async saveJob(payload) { - // @ts-ignore - const { database } = this.constructor; + const { database } = this.constructor; const { model } = database.getCollection('jobs'); const [result] = await model.upsert({ ...payload, executionId: this.id - }) as [JobModel, boolean | null]; + }, { transaction: this.transaction }) as [JobModel, boolean | null]; this.jobsMap.set(result.id, result); return result; } + // find the first node in current branch + findBranchStartNode(node: FlowNodeModel): FlowNodeModel | null { + for (let n = node; n; n = n.upstream) { + if (n.branchIndex !== null) { + return n; + } + } + return null; + } + + // find the node start current branch findBranchParentNode(node: FlowNodeModel): FlowNodeModel | null { for (let n = node; n; n = n.upstream) { if (n.branchIndex !== null) {