diff --git a/packages/plugins/@nocobase/plugin-workflow-delay/src/server/DelayInstruction.ts b/packages/plugins/@nocobase/plugin-workflow-delay/src/server/DelayInstruction.ts index a4d476a6a2..46e3d2a3ea 100644 --- a/packages/plugins/@nocobase/plugin-workflow-delay/src/server/DelayInstruction.ts +++ b/packages/plugins/@nocobase/plugin-workflow-delay/src/server/DelayInstruction.ts @@ -108,7 +108,7 @@ export default class extends Instruction { // add to schedule this.schedule(job); - return processor.exit(); + return null; } async resume(node, prevJob, processor: Processor) { diff --git a/packages/plugins/@nocobase/plugin-workflow/package.json b/packages/plugins/@nocobase/plugin-workflow/package.json index 1ab336fbbe..07ab37bbfb 100644 --- a/packages/plugins/@nocobase/plugin-workflow/package.json +++ b/packages/plugins/@nocobase/plugin-workflow/package.json @@ -24,6 +24,7 @@ "dayjs": "^1.11.8", "lodash": "4.17.21", "lru-cache": "8.0.5", + "nodejs-snowflake": "2.0.1", "react": "18.x", "react-i18next": "^11.15.1", "react-js-cron": "^3.1.0", diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/Dispatcher.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/Dispatcher.ts new file mode 100644 index 0000000000..bd93f359c9 --- /dev/null +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/Dispatcher.ts @@ -0,0 +1,12 @@ +/** + * This file is part of the NocoBase (R) project. + * Copyright (c) 2020-2024 NocoBase Co., Ltd. + * Authors: NocoBase Team. + * + * This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License. + * For more information, please refer to: https://www.nocobase.com/agreement. + */ + +export default class Dispatcher { + constructor() {} +} diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts index bf68d346ad..f15008b422 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts @@ -10,6 +10,7 @@ import path from 'path'; import { randomUUID } from 'crypto'; +import { Snowflake } from 'nodejs-snowflake'; import { Transaction, Transactionable } from 'sequelize'; import LRUCache from 'lru-cache'; @@ -61,6 +62,7 @@ export default class PluginWorkflowServer extends Plugin { triggers: Registry = new Registry(); functions: Registry = new Registry(); enabledCache: Map = new Map(); + snowflake: Snowflake; private ready = false; private executing: Promise | null = null; @@ -219,6 +221,14 @@ export default class PluginWorkflowServer extends Plugin { WorkflowRepository, WorkflowTasksRepository, }); + + const PluginRepo = this.db.getRepository('applicationPlugins'); + const pluginRecord = await PluginRepo.findOne({ + filter: { name: this.name }, + }); + this.snowflake = new Snowflake({ + custom_epoch: pluginRecord?.createdAt.getTime(), + }); } /** diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/Processor.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/Processor.ts index 1a6d20dd79..ebb510e863 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/Processor.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/Processor.ts @@ -56,15 +56,9 @@ export default class Processor { */ nodesMap = new Map(); - /** - * @experimental - */ - jobsMap = new Map(); - - /** - * @experimental - */ - jobsMapByNodeKey: { [key: string]: any } = {}; + private jobsMapByNodeKey: { [key: string]: JobModel } = {}; + private jobResultsMapByNodeKey: { [key: string]: any } = {}; + private jobsToSave: Map = new Map(); /** * @experimental @@ -100,10 +94,9 @@ export default class Processor { private makeJobs(jobs: Array) { jobs.forEach((job) => { - this.jobsMap.set(job.id, job); - const node = this.nodesMap.get(job.nodeId); - this.jobsMapByNodeKey[node.key] = job.result; + this.jobsMapByNodeKey[node.key] = job; + this.jobResultsMapByNodeKey[node.key] = job.result; }); } @@ -192,11 +185,11 @@ export default class Processor { } if (!(job instanceof Model)) { - job.upstreamId = prevJob instanceof Model ? prevJob.get('id') : null; + // job.upstreamId = prevJob instanceof Model ? prevJob.get('id') : null; job.nodeId = node.id; job.nodeKey = node.key; } - const savedJob = await this.saveJob(job); + const savedJob = this.saveJob(job); this.logger.info( `execution (${this.execution.id}) run instruction [${node.type}] for node (${node.id}) finished as status: ${savedJob.status}`, @@ -258,6 +251,27 @@ export default class Processor { } public async exit(s?: number) { + if (this.jobsToSave.size) { + const newJobs = []; + for (const job of this.jobsToSave.values()) { + if (job.isNewRecord) { + newJobs.push(job); + } else { + await job.save({ transaction: this.mainTransaction }); + } + } + if (newJobs.length) { + const JobsModel = this.options.plugin.db.getModel('jobs'); + await JobsModel.bulkCreate( + newJobs.map((job) => job.toJSON()), + { transaction: this.mainTransaction }, + ); + for (const job of newJobs) { + job.isNewRecord = false; + } + } + this.jobsToSave.clear(); + } if (typeof s === 'number') { const status = (this.constructor).StatusMap[s] ?? Math.sign(s); await this.execution.update({ status }, { transaction: this.mainTransaction }); @@ -269,33 +283,30 @@ export default class Processor { return null; } - // TODO(optimize) /** * @experimental */ - async saveJob(payload: JobModel | Record): Promise { + saveJob(payload: JobModel | Record): JobModel { const { database } = this.execution.constructor; - const { mainTransaction: transaction } = this; const { model } = database.getCollection('jobs'); let job; if (payload instanceof model) { - job = await payload.save({ transaction }); - } else if (payload.id) { - job = await model.findByPk(payload.id, { transaction }); - await job.update(payload, { transaction }); + job = payload; + job.set('updatedAt', new Date()); } else { - job = await model.create( - { - ...payload, - executionId: this.execution.id, - }, - { transaction }, - ); + job = model.build({ + ...payload, + id: this.options.plugin.snowflake.getUniqueID().toString(), + createdAt: new Date(), + updatedAt: new Date(), + executionId: this.execution.id, + }); } - this.jobsMap.set(job.id, job); + this.jobsToSave.set(job.id, job); this.lastSavedJob = job; - this.jobsMapByNodeKey[job.nodeKey] = job.result; + this.jobsMapByNodeKey[job.nodeKey] = job; + this.jobResultsMapByNodeKey[job.nodeKey] = job.result; return job; } @@ -357,32 +368,20 @@ export default class Processor { * @experimental */ findBranchParentJob(job: JobModel, node: FlowNodeModel): JobModel | null { - for (let j: JobModel | undefined = job; j; j = this.jobsMap.get(j.upstreamId)) { - if (j.nodeId === node.id) { - return j; - } - } - return null; + return this.jobsMapByNodeKey[node.key]; } /** * @experimental */ findBranchLastJob(node: FlowNodeModel, job: JobModel): JobModel | null { - const allJobs = Array.from(this.jobsMap.values()); + const allJobs = Object.values(this.jobsMapByNodeKey); const branchJobs = []; for (let n = this.findBranchEndNode(node); n && n !== node.upstream; n = n.upstream) { branchJobs.push(...allJobs.filter((item) => item.nodeId === n.id)); } - branchJobs.sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime()); - for (let i = branchJobs.length - 1; i >= 0; i -= 1) { - for (let j = branchJobs[i]; j && j.id !== job.id; j = this.jobsMap.get(j.upstreamId)) { - if (j.upstreamId === job.id) { - return branchJobs[i]; - } - } - } - return null; + branchJobs.sort((a, b) => a.updatedAt.getTime() - b.updatedAt.getTime()); + return branchJobs[branchJobs.length - 1] || null; } /** @@ -403,13 +402,13 @@ export default class Processor { for (let n = includeSelfScope ? node : this.findBranchParentNode(node); n; n = this.findBranchParentNode(n)) { const instruction = this.options.plugin.instructions.get(n.type); if (typeof instruction?.getScope === 'function') { - $scopes[n.id] = $scopes[n.key] = instruction.getScope(n, this.jobsMapByNodeKey[n.key], this); + $scopes[n.id] = $scopes[n.key] = instruction.getScope(n, this.jobResultsMapByNodeKey[n.key], this); } } return { $context: this.execution.context, - $jobsMapByNodeKey: this.jobsMapByNodeKey, + $jobsMapByNodeKey: this.jobResultsMapByNodeKey, $system: systemFns, $scopes, $env: this.options.plugin.app.environment.getVariables(),