refactor(plugin-workflow): add deferred option for async workflow triggering (#4772)

* feat(plugin-workflow): add deferred option for async workflow triggering

* fix(plugin-workflow): revert type back

* fix(plugin-workflow): fix status checking
This commit is contained in:
Junyi 2024-06-28 11:59:22 +08:00 committed by GitHub
parent dc8c0b03a8
commit 86a56ef7cc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 59 additions and 8 deletions

View File

@ -42,6 +42,7 @@ type Pending = [ExecutionModel, JobModel?];
type EventOptions = { type EventOptions = {
eventKey?: string; eventKey?: string;
context?: any; context?: any;
deferred?: boolean;
[key: string]: any; [key: string]: any;
} & Transactionable; } & Transactionable;
@ -355,7 +356,7 @@ export default class PluginWorkflowServer extends Plugin {
private async triggerSync( private async triggerSync(
workflow: WorkflowModel, workflow: WorkflowModel,
context: object, context: object,
options: EventOptions = {}, { deferred, ...options }: EventOptions = {},
): Promise<Processor | null> { ): Promise<Processor | null> {
let execution; let execution;
try { try {
@ -384,6 +385,18 @@ export default class PluginWorkflowServer extends Plugin {
this.dispatch(); this.dispatch();
} }
/**
* Start a deferred execution
* @experimental
*/
public start(execution: ExecutionModel) {
if (execution.status !== EXECUTION_STATUS.STARTED) {
return;
}
this.pending.push([execution]);
this.dispatch();
}
public createProcessor(execution: ExecutionModel, options = {}): Processor { public createProcessor(execution: ExecutionModel, options = {}): Processor {
return new Processor(execution, { ...options, plugin: this }); return new Processor(execution, { ...options, plugin: this });
} }
@ -393,7 +406,7 @@ export default class PluginWorkflowServer extends Plugin {
context, context,
options: EventOptions, options: EventOptions,
): Promise<ExecutionModel | null> { ): Promise<ExecutionModel | null> {
const { transaction = await this.db.sequelize.transaction() } = options; const { transaction = await this.db.sequelize.transaction(), deferred } = options;
const trigger = this.triggers.get(workflow.type); const trigger = this.triggers.get(workflow.type);
const valid = await trigger.validateEvent(workflow, context, { ...options, transaction }); const valid = await trigger.validateEvent(workflow, context, { ...options, transaction });
if (!valid) { if (!valid) {
@ -410,7 +423,7 @@ export default class PluginWorkflowServer extends Plugin {
context, context,
key: workflow.key, key: workflow.key,
eventKey: options.eventKey ?? randomUUID(), eventKey: options.eventKey ?? randomUUID(),
status: EXECUTION_STATUS.QUEUEING, status: deferred ? EXECUTION_STATUS.STARTED : EXECUTION_STATUS.QUEUEING,
}, },
{ transaction }, { transaction },
); );
@ -468,7 +481,7 @@ export default class PluginWorkflowServer extends Plugin {
try { try {
const execution = await this.createExecution(...event); const execution = await this.createExecution(...event);
// NOTE: cache first execution for most cases // NOTE: cache first execution for most cases
if (execution && !this.executing && !this.pending.length) { if (execution?.status === EXECUTION_STATUS.QUEUEING && !this.executing && !this.pending.length) {
this.pending.push([execution]); this.pending.push([execution]);
} }
} catch (err) { } catch (err) {

View File

@ -11,7 +11,7 @@ import { MockServer } from '@nocobase/test';
import Database from '@nocobase/database'; import Database from '@nocobase/database';
import { getApp, sleep } from '@nocobase/plugin-workflow-test'; import { getApp, sleep } from '@nocobase/plugin-workflow-test';
import Plugin from '..'; import Plugin, { Processor } from '..';
import { EXECUTION_STATUS } from '../constants'; import { EXECUTION_STATUS } from '../constants';
describe('workflow > Plugin', () => { describe('workflow > Plugin', () => {
@ -19,7 +19,7 @@ describe('workflow > Plugin', () => {
let db: Database; let db: Database;
let PostRepo; let PostRepo;
let WorkflowModel; let WorkflowModel;
let plugin; let plugin: Plugin;
beforeEach(async () => { beforeEach(async () => {
app = await getApp(); app = await getApp();
@ -472,6 +472,44 @@ describe('workflow > Plugin', () => {
}); });
}); });
describe('deffered', () => {
it('deffered will not be process immediately, and can be start', async () => {
const w1 = await WorkflowModel.create({
enabled: true,
type: 'asyncTrigger',
});
plugin.trigger(w1, {}, { deferred: true });
await sleep(500);
const e1s = await w1.getExecutions();
expect(e1s.length).toBe(1);
expect(e1s[0].status).toBe(EXECUTION_STATUS.STARTED);
plugin.start(e1s[0]);
await sleep(500);
const e2s = await w1.getExecutions();
expect(e2s.length).toBe(1);
expect(e2s[0].status).toBe(EXECUTION_STATUS.RESOLVED);
});
it('sync workflow will ignore the deferred option, and start it immediately', async () => {
const w1 = await WorkflowModel.create({
enabled: true,
type: 'syncTrigger',
});
const processor = await plugin.trigger(w1, {}, { deferred: true });
const e1s = await w1.getExecutions();
expect(e1s.length).toBe(1);
expect(e1s[0].status).toBe(EXECUTION_STATUS.RESOLVED);
});
});
describe('sync', () => { describe('sync', () => {
it('sync on trigger class', async () => { it('sync on trigger class', async () => {
const w1 = await WorkflowModel.create({ const w1 = await WorkflowModel.create({
@ -479,7 +517,7 @@ describe('workflow > Plugin', () => {
type: 'syncTrigger', type: 'syncTrigger',
}); });
const processor = await plugin.trigger(w1, {}); const processor = (await plugin.trigger(w1, {})) as Processor;
const executions = await w1.getExecutions(); const executions = await w1.getExecutions();
expect(executions.length).toBe(1); expect(executions.length).toBe(1);
@ -495,7 +533,7 @@ describe('workflow > Plugin', () => {
sync: true, sync: true,
}); });
const processor = await plugin.trigger(w1, {}); const processor = (await plugin.trigger(w1, {})) as Processor;
const executions = await w1.getExecutions(); const executions = await w1.getExecutions();
expect(executions.length).toBe(1); expect(executions.length).toBe(1);