mirror of
https://gitee.com/nocobase/nocobase.git
synced 2025-07-02 03:02:19 +08:00
413 lines
12 KiB
TypeScript
413 lines
12 KiB
TypeScript
/**
|
|
* This file is part of the NocoBase (R) project.
|
|
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
|
|
* Authors: NocoBase Team.
|
|
*
|
|
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
|
|
* For more information, please refer to: https://www.nocobase.com/agreement.
|
|
*/
|
|
|
|
import { Model, Transaction, Transactionable } from '@nocobase/database';
|
|
import { appendArrayColumn } from '@nocobase/evaluators';
|
|
import { Logger } from '@nocobase/logger';
|
|
import { parse } from '@nocobase/utils';
|
|
import set from 'lodash/set';
|
|
import type Plugin from './Plugin';
|
|
import { EXECUTION_STATUS, JOB_STATUS } from './constants';
|
|
import { Runner } from './instructions';
|
|
import type { ExecutionModel, FlowNodeModel, JobModel } from './types';
|
|
|
|
export interface ProcessorOptions extends Transactionable {
|
|
plugin: Plugin;
|
|
[key: string]: any;
|
|
}
|
|
|
|
export default class Processor {
|
|
static StatusMap = {
|
|
[JOB_STATUS.PENDING]: EXECUTION_STATUS.STARTED,
|
|
[JOB_STATUS.RESOLVED]: EXECUTION_STATUS.RESOLVED,
|
|
[JOB_STATUS.FAILED]: EXECUTION_STATUS.FAILED,
|
|
[JOB_STATUS.ERROR]: EXECUTION_STATUS.ERROR,
|
|
[JOB_STATUS.ABORTED]: EXECUTION_STATUS.ABORTED,
|
|
[JOB_STATUS.CANCELED]: EXECUTION_STATUS.CANCELED,
|
|
[JOB_STATUS.REJECTED]: EXECUTION_STATUS.REJECTED,
|
|
[JOB_STATUS.RETRY_NEEDED]: EXECUTION_STATUS.RETRY_NEEDED,
|
|
};
|
|
|
|
logger: Logger;
|
|
|
|
/**
|
|
* @experimental
|
|
*/
|
|
transaction: Transaction;
|
|
|
|
/**
|
|
* @experimental
|
|
*/
|
|
nodes: FlowNodeModel[] = [];
|
|
|
|
/**
|
|
* @experimental
|
|
*/
|
|
nodesMap = new Map<number, FlowNodeModel>();
|
|
|
|
/**
|
|
* @experimental
|
|
*/
|
|
jobsMap = new Map<number, JobModel>();
|
|
|
|
/**
|
|
* @experimental
|
|
*/
|
|
jobsMapByNodeKey: { [key: string]: any } = {};
|
|
|
|
/**
|
|
* @experimental
|
|
*/
|
|
lastSavedJob: JobModel | null = null;
|
|
|
|
constructor(
|
|
public execution: ExecutionModel,
|
|
public options: ProcessorOptions,
|
|
) {
|
|
this.logger = options.plugin.getLogger(execution.workflowId);
|
|
this.transaction = options.transaction;
|
|
}
|
|
|
|
// make dual linked nodes list then cache
|
|
private makeNodes(nodes: FlowNodeModel[] = []) {
|
|
this.nodes = nodes;
|
|
|
|
nodes.forEach((node) => {
|
|
this.nodesMap.set(node.id, node);
|
|
});
|
|
|
|
nodes.forEach((node) => {
|
|
if (node.upstreamId) {
|
|
node.upstream = this.nodesMap.get(node.upstreamId) as FlowNodeModel;
|
|
}
|
|
|
|
if (node.downstreamId) {
|
|
node.downstream = this.nodesMap.get(node.downstreamId) as FlowNodeModel;
|
|
}
|
|
});
|
|
}
|
|
|
|
private makeJobs(jobs: Array<JobModel>) {
|
|
jobs.forEach((job) => {
|
|
this.jobsMap.set(job.id, job);
|
|
|
|
const node = this.nodesMap.get(job.nodeId);
|
|
this.jobsMapByNodeKey[node.key] = job.result;
|
|
});
|
|
}
|
|
|
|
public async prepare() {
|
|
const {
|
|
execution,
|
|
transaction,
|
|
options: { plugin },
|
|
} = this;
|
|
if (!execution.workflow) {
|
|
execution.workflow = plugin.enabledCache.get(execution.workflowId);
|
|
}
|
|
|
|
const nodes = await execution.workflow.getNodes({ transaction });
|
|
|
|
this.makeNodes(nodes);
|
|
|
|
const jobs = await execution.getJobs({
|
|
order: [['id', 'ASC']],
|
|
transaction,
|
|
});
|
|
|
|
this.makeJobs(jobs);
|
|
}
|
|
|
|
public async start() {
|
|
const { execution } = this;
|
|
if (execution.status !== EXECUTION_STATUS.STARTED) {
|
|
throw new Error(`execution was ended with status ${execution.status} before, could not be started again`);
|
|
}
|
|
await this.prepare();
|
|
if (this.nodes.length) {
|
|
const head = this.nodes.find((item) => !item.upstream);
|
|
await this.run(head, { result: execution.context });
|
|
} else {
|
|
await this.exit(JOB_STATUS.RESOLVED);
|
|
}
|
|
}
|
|
|
|
public async resume(job: JobModel) {
|
|
const { execution } = this;
|
|
if (execution.status !== EXECUTION_STATUS.STARTED) {
|
|
throw new Error(`execution was ended with status ${execution.status} before, could not be resumed`);
|
|
}
|
|
await this.prepare();
|
|
const node = this.nodesMap.get(job.nodeId);
|
|
await this.recall(node, job);
|
|
}
|
|
|
|
private async exec(instruction: Runner, node: FlowNodeModel, prevJob) {
|
|
let job;
|
|
try {
|
|
// call instruction to get result and status
|
|
this.logger.info(`execution (${this.execution.id}) run instruction [${node.type}] for node (${node.id})`);
|
|
this.logger.debug(`config of node`, { data: node.config });
|
|
job = await instruction(node, prevJob, this);
|
|
if (!job) {
|
|
return null;
|
|
}
|
|
} catch (err) {
|
|
// for uncaught error, set to error
|
|
this.logger.error(
|
|
`execution (${this.execution.id}) run instruction [${node.type}] for node (${node.id}) failed: `,
|
|
err,
|
|
);
|
|
job = {
|
|
result:
|
|
err instanceof Error
|
|
? {
|
|
message: err.message,
|
|
...err,
|
|
}
|
|
: err,
|
|
status: JOB_STATUS.ERROR,
|
|
};
|
|
// if previous job is from resuming
|
|
if (prevJob && prevJob.nodeId === node.id) {
|
|
prevJob.set(job);
|
|
job = prevJob;
|
|
}
|
|
}
|
|
|
|
if (!(job instanceof Model)) {
|
|
job.upstreamId = prevJob instanceof Model ? prevJob.get('id') : null;
|
|
job.nodeId = node.id;
|
|
job.nodeKey = node.key;
|
|
}
|
|
const savedJob = await this.saveJob(job);
|
|
|
|
this.logger.info(
|
|
`execution (${this.execution.id}) run instruction [${node.type}] for node (${node.id}) finished as status: ${savedJob.status}`,
|
|
);
|
|
this.logger.debug(`result of node`, { data: savedJob.result });
|
|
|
|
if (savedJob.status === JOB_STATUS.RESOLVED && node.downstream) {
|
|
// run next node
|
|
this.logger.debug(`run next node (${node.downstreamId})`);
|
|
return this.run(node.downstream, savedJob);
|
|
}
|
|
|
|
// all nodes in scope have been executed
|
|
return this.end(node, savedJob);
|
|
}
|
|
|
|
public async run(node, input?) {
|
|
const { instructions } = this.options.plugin;
|
|
const instruction = instructions.get(node.type);
|
|
if (typeof instruction.run !== 'function') {
|
|
return Promise.reject(new Error('`run` should be implemented for customized execution of the node'));
|
|
}
|
|
|
|
return this.exec(instruction.run.bind(instruction), node, input);
|
|
}
|
|
|
|
// parent node should take over the control
|
|
public async end(node, job: JobModel) {
|
|
this.logger.debug(`branch ended at node (${node.id})`);
|
|
const parentNode = this.findBranchParentNode(node);
|
|
// no parent, means on main flow
|
|
if (parentNode) {
|
|
this.logger.debug(`not on main, recall to parent entry node (${node.id})})`);
|
|
await this.recall(parentNode, job);
|
|
return job;
|
|
}
|
|
|
|
// really done for all nodes
|
|
// * should mark execution as done with last job status
|
|
return this.exit(job.status);
|
|
}
|
|
|
|
private async recall(node, job) {
|
|
const { instructions } = this.options.plugin;
|
|
const instruction = instructions.get(node.type);
|
|
if (typeof instruction.resume !== 'function') {
|
|
return Promise.reject(
|
|
new Error(`"resume" method should be implemented for [${node.type}] instruction of node (#${node.id})`),
|
|
);
|
|
}
|
|
|
|
return this.exec(instruction.resume.bind(instruction), node, job);
|
|
}
|
|
|
|
public async exit(s?: number) {
|
|
if (typeof s === 'number') {
|
|
const status = (<typeof Processor>this.constructor).StatusMap[s] ?? Math.sign(s);
|
|
await this.execution.update({ status }, { transaction: this.transaction });
|
|
}
|
|
this.logger.info(`execution (${this.execution.id}) exiting with status ${this.execution.status}`);
|
|
return null;
|
|
}
|
|
|
|
// TODO(optimize)
|
|
/**
|
|
* @experimental
|
|
* @param {JobModel | Record<string, any>} payload
|
|
* @returns {JobModel}
|
|
*/
|
|
async saveJob(payload) {
|
|
const { database } = <typeof ExecutionModel>this.execution.constructor;
|
|
const { transaction } = this;
|
|
const { model } = database.getCollection('jobs');
|
|
let job;
|
|
if (payload instanceof model) {
|
|
job = await payload.save({ transaction });
|
|
} else if (payload.id) {
|
|
job = await model.findByPk(payload.id, { transaction });
|
|
await job.update(payload, { transaction });
|
|
} else {
|
|
job = await model.create(
|
|
{
|
|
...payload,
|
|
executionId: this.execution.id,
|
|
},
|
|
{ transaction },
|
|
);
|
|
}
|
|
this.jobsMap.set(job.id, job);
|
|
|
|
this.lastSavedJob = job;
|
|
this.jobsMapByNodeKey[job.nodeKey] = job.result;
|
|
|
|
return job;
|
|
}
|
|
|
|
/**
|
|
* @experimental
|
|
*/
|
|
getBranches(node: FlowNodeModel): FlowNodeModel[] {
|
|
return this.nodes
|
|
.filter((item) => item.upstream === node && item.branchIndex !== null)
|
|
.sort((a, b) => Number(a.branchIndex) - Number(b.branchIndex));
|
|
}
|
|
|
|
/**
|
|
* @experimental
|
|
* find the first node in current branch
|
|
*/
|
|
findBranchStartNode(node: FlowNodeModel, parent?: FlowNodeModel): FlowNodeModel | null {
|
|
for (let n = node; n; n = n.upstream) {
|
|
if (!parent) {
|
|
if (n.branchIndex !== null) {
|
|
return n;
|
|
}
|
|
} else {
|
|
if (n.upstream === parent) {
|
|
return n;
|
|
}
|
|
}
|
|
}
|
|
return null;
|
|
}
|
|
|
|
/**
|
|
* @experimental
|
|
* find the node start current branch
|
|
*/
|
|
findBranchParentNode(node: FlowNodeModel): FlowNodeModel | null {
|
|
for (let n = node; n; n = n.upstream) {
|
|
if (n.branchIndex !== null) {
|
|
return n.upstream;
|
|
}
|
|
}
|
|
return null;
|
|
}
|
|
|
|
/**
|
|
* @experimental
|
|
*/
|
|
findBranchEndNode(node: FlowNodeModel): FlowNodeModel | null {
|
|
for (let n = node; n; n = n.downstream) {
|
|
if (!n.downstream) {
|
|
return n;
|
|
}
|
|
}
|
|
return null;
|
|
}
|
|
|
|
/**
|
|
* @experimental
|
|
*/
|
|
findBranchParentJob(job: JobModel, node: FlowNodeModel): JobModel | null {
|
|
for (let j: JobModel | undefined = job; j; j = this.jobsMap.get(j.upstreamId)) {
|
|
if (j.nodeId === node.id) {
|
|
return j;
|
|
}
|
|
}
|
|
return null;
|
|
}
|
|
|
|
/**
|
|
* @experimental
|
|
*/
|
|
findBranchLastJob(node: FlowNodeModel, job: JobModel): JobModel | null {
|
|
const allJobs = Array.from(this.jobsMap.values());
|
|
const branchJobs = [];
|
|
for (let n = this.findBranchEndNode(node); n && n !== node.upstream; n = n.upstream) {
|
|
branchJobs.push(...allJobs.filter((item) => item.nodeId === n.id));
|
|
}
|
|
branchJobs.sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime());
|
|
for (let i = branchJobs.length - 1; i >= 0; i -= 1) {
|
|
for (let j = branchJobs[i]; j && j.id !== job.id; j = this.jobsMap.get(j.upstreamId)) {
|
|
if (j.upstreamId === job.id) {
|
|
return branchJobs[i];
|
|
}
|
|
}
|
|
}
|
|
return null;
|
|
}
|
|
|
|
/**
|
|
* @experimental
|
|
*/
|
|
public getScope(sourceNodeId: number) {
|
|
const node = this.nodesMap.get(sourceNodeId);
|
|
const systemFns = {};
|
|
const scope = {
|
|
execution: this.execution,
|
|
node,
|
|
};
|
|
for (const [name, fn] of this.options.plugin.functions.getEntities()) {
|
|
set(systemFns, name, fn.bind(scope));
|
|
}
|
|
|
|
const $scopes = {};
|
|
for (let n = this.findBranchParentNode(node); n; n = this.findBranchParentNode(n)) {
|
|
const instruction = this.options.plugin.instructions.get(n.type);
|
|
if (typeof instruction.getScope === 'function') {
|
|
$scopes[n.id] = $scopes[n.key] = instruction.getScope(n, this.jobsMapByNodeKey[n.key], this);
|
|
}
|
|
}
|
|
|
|
return {
|
|
$context: this.execution.context,
|
|
$jobsMapByNodeKey: this.jobsMapByNodeKey,
|
|
$system: systemFns,
|
|
$scopes,
|
|
};
|
|
}
|
|
|
|
/**
|
|
* @experimental
|
|
*/
|
|
public getParsedValue(value, sourceNodeId: number, additionalScope?: object) {
|
|
const template = parse(value);
|
|
const scope = Object.assign(this.getScope(sourceNodeId), additionalScope);
|
|
template.parameters.forEach(({ key }) => {
|
|
appendArrayColumn(scope, key);
|
|
});
|
|
return template(scope);
|
|
}
|
|
}
|