mirror of
https://gitee.com/nocobase/nocobase.git
synced 2025-05-05 05:29:26 +08:00
refactor(plugin-workflow-delay): support to use variable for duration (#6621)
This commit is contained in:
parent
6f4e27a286
commit
86380b2ba5
@ -8,11 +8,10 @@
|
||||
*/
|
||||
|
||||
import React from 'react';
|
||||
import { InputNumber, Select } from 'antd';
|
||||
import { Space } from 'antd';
|
||||
import { HourglassOutlined } from '@ant-design/icons';
|
||||
|
||||
import { css, useCompile } from '@nocobase/client';
|
||||
import { Instruction, JOB_STATUS } from '@nocobase/plugin-workflow/client';
|
||||
import { Instruction, JOB_STATUS, WorkflowVariableInput } from '@nocobase/plugin-workflow/client';
|
||||
|
||||
import { NAMESPACE } from '../locale';
|
||||
|
||||
@ -24,47 +23,6 @@ const UnitOptions = [
|
||||
{ value: 604800_000, label: `{{t('Weeks', { ns: "workflow" })}}` },
|
||||
];
|
||||
|
||||
function getNumberOption(v) {
|
||||
return UnitOptions.slice()
|
||||
.reverse()
|
||||
.find((item) => !(v % item.value));
|
||||
}
|
||||
|
||||
function Duration({ value = 60000, onChange }) {
|
||||
const compile = useCompile();
|
||||
const option = getNumberOption(value);
|
||||
const quantity = Math.round(value / option.value);
|
||||
|
||||
return (
|
||||
<fieldset
|
||||
className={css`
|
||||
display: flex;
|
||||
gap: 0.5em;
|
||||
`}
|
||||
>
|
||||
<InputNumber
|
||||
min={1}
|
||||
value={quantity}
|
||||
onChange={(v) => onChange(Math.round(v * option.value))}
|
||||
className="auto-width"
|
||||
/>
|
||||
<Select
|
||||
// @ts-ignore
|
||||
role="button"
|
||||
data-testid="select-time-unit"
|
||||
popupMatchSelectWidth={false}
|
||||
value={option.value}
|
||||
onChange={(unit) => onChange(Math.round(quantity * unit))}
|
||||
className="auto-width"
|
||||
options={UnitOptions.map((item) => ({
|
||||
value: item.value,
|
||||
label: compile(item.label),
|
||||
}))}
|
||||
/>
|
||||
</fieldset>
|
||||
);
|
||||
}
|
||||
|
||||
export default class extends Instruction {
|
||||
title = `{{t("Delay", { ns: "${NAMESPACE}" })}}`;
|
||||
type = 'delay';
|
||||
@ -73,12 +31,37 @@ export default class extends Instruction {
|
||||
icon = (<HourglassOutlined style={{}} />);
|
||||
fieldset = {
|
||||
duration: {
|
||||
type: 'number',
|
||||
type: 'void',
|
||||
title: `{{t("Duration", { ns: "${NAMESPACE}" })}}`,
|
||||
'x-decorator': 'FormItem',
|
||||
'x-component': 'Duration',
|
||||
default: 60000,
|
||||
'x-component': 'Space.Compact',
|
||||
required: true,
|
||||
properties: {
|
||||
unit: {
|
||||
type: 'number',
|
||||
'x-decorator': 'FormItem',
|
||||
'x-component': 'Select',
|
||||
'x-component-props': {
|
||||
placeholder: `{{t("Unit", { ns: "${NAMESPACE}" })}}`,
|
||||
className: 'auto-width',
|
||||
allowClear: false,
|
||||
},
|
||||
enum: UnitOptions,
|
||||
default: 60_000,
|
||||
},
|
||||
duration: {
|
||||
type: 'number',
|
||||
'x-decorator': 'FormItem',
|
||||
'x-component': 'WorkflowVariableInput',
|
||||
'x-component-props': {
|
||||
placeholder: `{{t("Duration", { ns: "${NAMESPACE}" })}}`,
|
||||
useTypedConstant: [['number', { min: 1 }]],
|
||||
nullable: false,
|
||||
},
|
||||
default: 1,
|
||||
required: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
endStatus: {
|
||||
type: 'number',
|
||||
@ -94,7 +77,8 @@ export default class extends Instruction {
|
||||
},
|
||||
};
|
||||
components = {
|
||||
Duration,
|
||||
WorkflowVariableInput,
|
||||
Space,
|
||||
};
|
||||
isAvailable({ engine, workflow, upstream, branchIndex }) {
|
||||
return !engine.isWorkflowSync(workflow);
|
||||
|
@ -2,6 +2,7 @@
|
||||
"Delay": "延时",
|
||||
"Delay a period of time and then continue or exit the process. Can be used to set wait or timeout times in parallel branches.": "延时一段时间,然后继续或退出流程。可以用于并行分支中等待其他分支或设置超时时间。",
|
||||
"Duration": "时长",
|
||||
"Unit": "单位",
|
||||
"End status": "到时状态",
|
||||
"Select status": "选择状态",
|
||||
"Succeed and continue": "通过并继续",
|
||||
|
@ -74,7 +74,7 @@ export default class extends Instruction {
|
||||
schedule(job) {
|
||||
const now = new Date();
|
||||
const createdAt = Date.parse(job.createdAt);
|
||||
const delay = createdAt + job.node.config.duration - now.getTime();
|
||||
const delay = createdAt + job.result - now.getTime();
|
||||
if (delay > 0) {
|
||||
const trigger = this.trigger.bind(this, job);
|
||||
this.timers.set(job.id, setTimeout(trigger, delay));
|
||||
@ -96,9 +96,10 @@ export default class extends Instruction {
|
||||
}
|
||||
|
||||
async run(node, prevJob, processor: Processor) {
|
||||
const duration = processor.getParsedValue(node.config.duration || 1, node.id) * (node.config.unit || 1_000);
|
||||
const job = await processor.saveJob({
|
||||
status: JOB_STATUS.PENDING,
|
||||
result: null,
|
||||
result: duration,
|
||||
nodeId: node.id,
|
||||
nodeKey: node.key,
|
||||
upstreamId: prevJob?.id ?? null,
|
||||
|
@ -47,7 +47,8 @@ describe('workflow > instructions > delay', () => {
|
||||
const n1 = await workflow.createNode({
|
||||
type: 'delay',
|
||||
config: {
|
||||
duration: 2000,
|
||||
duration: 2,
|
||||
unit: 1000,
|
||||
endStatus: JOB_STATUS.RESOLVED,
|
||||
},
|
||||
});
|
||||
@ -73,7 +74,8 @@ describe('workflow > instructions > delay', () => {
|
||||
const n1 = await workflow.createNode({
|
||||
type: 'delay',
|
||||
config: {
|
||||
duration: 2000,
|
||||
duration: 2,
|
||||
unit: 1000,
|
||||
endStatus: JOB_STATUS.FAILED,
|
||||
},
|
||||
});
|
||||
@ -95,11 +97,49 @@ describe('workflow > instructions > delay', () => {
|
||||
expect(j2.status).toBe(JOB_STATUS.FAILED);
|
||||
});
|
||||
|
||||
it('duration by variable', async () => {
|
||||
const n1 = await workflow.createNode({
|
||||
type: 'echoVariable',
|
||||
config: {
|
||||
variable: 2,
|
||||
},
|
||||
});
|
||||
|
||||
const n2 = await workflow.createNode({
|
||||
type: 'delay',
|
||||
config: {
|
||||
duration: `{{$jobsMapByNodeKey.${n1.key}}}`,
|
||||
unit: 1000,
|
||||
endStatus: JOB_STATUS.RESOLVED,
|
||||
},
|
||||
upstreamId: n1.id,
|
||||
});
|
||||
|
||||
await n1.setDownstream(n2);
|
||||
|
||||
const post = await PostRepo.create({ values: { title: 't1' } });
|
||||
|
||||
await sleep(500);
|
||||
|
||||
const [e1] = await workflow.getExecutions();
|
||||
expect(e1.status).toEqual(EXECUTION_STATUS.STARTED);
|
||||
const [, j1] = await e1.getJobs({ order: [['id', 'ASC']] });
|
||||
expect(j1.status).toBe(JOB_STATUS.PENDING);
|
||||
|
||||
await sleep(2000);
|
||||
|
||||
const [e2] = await workflow.getExecutions();
|
||||
expect(e2.status).toEqual(EXECUTION_STATUS.RESOLVED);
|
||||
const [, j2] = await e2.getJobs({ order: [['id', 'ASC']] });
|
||||
expect(j2.status).toBe(JOB_STATUS.RESOLVED);
|
||||
});
|
||||
|
||||
it('delay to resolve and downstream node error', async () => {
|
||||
const n1 = await workflow.createNode({
|
||||
type: 'delay',
|
||||
config: {
|
||||
duration: 2000,
|
||||
duration: 2,
|
||||
unit: 1000,
|
||||
endStatus: JOB_STATUS.RESOLVED,
|
||||
},
|
||||
});
|
||||
@ -139,7 +179,8 @@ describe('workflow > instructions > delay', () => {
|
||||
await workflow.createNode({
|
||||
type: 'delay',
|
||||
config: {
|
||||
duration: 2000,
|
||||
duration: 2,
|
||||
unit: 1000,
|
||||
endStatus: JOB_STATUS.RESOLVED,
|
||||
},
|
||||
});
|
||||
|
@ -0,0 +1,86 @@
|
||||
/**
|
||||
* This file is part of the NocoBase (R) project.
|
||||
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
|
||||
* Authors: NocoBase Team.
|
||||
*
|
||||
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
|
||||
* For more information, please refer to: https://www.nocobase.com/agreement.
|
||||
*/
|
||||
|
||||
import { createMockServer } from '@nocobase/test';
|
||||
import Migration from '../../migrations/20250403223032-add-duration-unit';
|
||||
|
||||
describe('20250403223032-add-duration-unit', () => {
|
||||
let app;
|
||||
let migration;
|
||||
let NodeRepo;
|
||||
|
||||
beforeEach(async () => {
|
||||
app = await createMockServer({
|
||||
plugins: ['nocobase'],
|
||||
});
|
||||
await app.version.update('1.6.0');
|
||||
|
||||
migration = new Migration({ db: app.db, app } as any);
|
||||
|
||||
NodeRepo = app.db.getRepository('flow_nodes');
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await app.destroy();
|
||||
});
|
||||
|
||||
it(`duration as null`, async () => {
|
||||
const n1 = await NodeRepo.create({
|
||||
values: {
|
||||
type: 'delay',
|
||||
config: {},
|
||||
},
|
||||
});
|
||||
|
||||
await migration.up();
|
||||
|
||||
const n2 = await NodeRepo.findOne({
|
||||
filterByTk: n1.id,
|
||||
});
|
||||
expect(n2.config.duration).toBeFalsy();
|
||||
});
|
||||
|
||||
it(`duration as number (second)`, async () => {
|
||||
const n1 = await NodeRepo.create({
|
||||
values: {
|
||||
type: 'delay',
|
||||
config: {
|
||||
duration: 1000,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
await migration.up();
|
||||
|
||||
const n2 = await NodeRepo.findOne({
|
||||
filterByTk: n1.id,
|
||||
});
|
||||
expect(n2.config.duration).toBe(1);
|
||||
expect(n2.config.unit).toBe(1000);
|
||||
});
|
||||
|
||||
it(`duration as number (day)`, async () => {
|
||||
const n1 = await NodeRepo.create({
|
||||
values: {
|
||||
type: 'delay',
|
||||
config: {
|
||||
duration: 1000 * 60 * 60 * 24 * 2,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
await migration.up();
|
||||
|
||||
const n2 = await NodeRepo.findOne({
|
||||
filterByTk: n1.id,
|
||||
});
|
||||
expect(n2.config.duration).toBe(2);
|
||||
expect(n2.config.unit).toBe(1000 * 60 * 60 * 24);
|
||||
});
|
||||
});
|
@ -0,0 +1,54 @@
|
||||
/**
|
||||
* This file is part of the NocoBase (R) project.
|
||||
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
|
||||
* Authors: NocoBase Team.
|
||||
*
|
||||
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
|
||||
* For more information, please refer to: https://www.nocobase.com/agreement.
|
||||
*/
|
||||
|
||||
import { Migration } from '@nocobase/server';
|
||||
|
||||
const UnitOptions = [604800_000, 86400_000, 3600_000, 60_000, 1_000];
|
||||
|
||||
function getNumberOption(v) {
|
||||
return UnitOptions.find((item) => !(v % item));
|
||||
}
|
||||
|
||||
export default class extends Migration {
|
||||
appVersion = '<1.7.0';
|
||||
async up() {
|
||||
const { db } = this.context;
|
||||
|
||||
const NodeRepo = db.getRepository('flow_nodes');
|
||||
await db.sequelize.transaction(async (transaction) => {
|
||||
const nodes = await NodeRepo.find({
|
||||
filter: {
|
||||
type: 'delay',
|
||||
},
|
||||
transaction,
|
||||
});
|
||||
|
||||
await nodes.reduce(
|
||||
(promise, node) =>
|
||||
promise.then(async () => {
|
||||
if (node.config.unit) {
|
||||
return;
|
||||
}
|
||||
if (!node.config.duration) {
|
||||
return;
|
||||
}
|
||||
const unit = getNumberOption(node.config.duration);
|
||||
const duration = node.config.duration / unit;
|
||||
node.set('config', { ...node.config, duration, unit });
|
||||
node.changed('config', true);
|
||||
await node.save({
|
||||
silent: true,
|
||||
transaction,
|
||||
});
|
||||
}),
|
||||
Promise.resolve(),
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user