fix(plugin-workflow): fix duplicate triggering (#6022)

* fix(plugin-workflow): fix duplicate triggering

* fix(plugin-workflow): await executing before resume

* fix(plugin-workflow): fix failed test case
This commit is contained in:
Junyi 2025-01-10 15:03:08 +08:00 committed by GitHub
parent da24f729f2
commit a3ae8032f4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 182 additions and 48 deletions

View File

@ -160,14 +160,15 @@ export default class extends Instruction {
}) })
.finally(() => { .finally(() => {
processor.logger.debug(`request (#${node.id}) ended, resume workflow...`); processor.logger.debug(`request (#${node.id}) ended, resume workflow...`);
setImmediate(() => { setTimeout(() => {
job.execution = processor.execution;
this.workflow.resume(job); this.workflow.resume(job);
}); });
}); });
processor.logger.info(`request (#${node.id}) sent to "${config.url}", waiting for response...`); processor.logger.info(`request (#${node.id}) sent to "${config.url}", waiting for response...`);
return processor.exit(); return job;
} }
async resume(node: FlowNodeModel, job, processor: Processor) { async resume(node: FlowNodeModel, job, processor: Processor) {

View File

@ -78,6 +78,30 @@ export default {
}, },
}, },
asyncResume: {
async run(node, input, processor) {
const job = await processor.saveJob({
status: 0,
nodeId: node.id,
nodeKey: node.key,
upstreamId: input?.id ?? null,
});
setTimeout(() => {
job.set({
status: 1,
});
processor.options.plugin.resume(job);
}, 100);
return null;
},
resume(node, job, processor) {
return job;
},
},
customizedSuccess: { customizedSuccess: {
run(node, input, processor) { run(node, input, processor) {
return { return {

View File

@ -10,9 +10,10 @@
import path from 'path'; import path from 'path';
import { randomUUID } from 'crypto'; import { randomUUID } from 'crypto';
import { Transaction, Transactionable } from 'sequelize';
import LRUCache from 'lru-cache'; import LRUCache from 'lru-cache';
import { Op, Transactionable } from '@nocobase/database'; import { Op } from '@nocobase/database';
import { Plugin } from '@nocobase/server'; import { Plugin } from '@nocobase/server';
import { Registry } from '@nocobase/utils'; import { Registry } from '@nocobase/utils';
@ -284,7 +285,6 @@ export default class PluginWorkflowServer extends Plugin {
// * add all hooks for enabled workflows // * add all hooks for enabled workflows
// * add hooks for create/update[enabled]/delete workflow to add/remove specific hooks // * add hooks for create/update[enabled]/delete workflow to add/remove specific hooks
this.app.on('afterStart', async () => { this.app.on('afterStart', async () => {
this.app.setMaintainingMessage('check for not started executions');
this.ready = true; this.ready = true;
const collection = db.getCollection('workflows'); const collection = db.getCollection('workflows');
@ -297,10 +297,12 @@ export default class PluginWorkflowServer extends Plugin {
}); });
this.checker = setInterval(() => { this.checker = setInterval(() => {
this.getLogger('dispatcher').info(`(cycling) check for queueing executions`);
this.dispatch(); this.dispatch();
}, 300_000); }, 300_000);
// check for not started executions // check for queueing executions
this.getLogger('dispatcher').info('(starting) check for queueing executions');
this.dispatch(); this.dispatch();
}); });
@ -363,6 +365,15 @@ export default class PluginWorkflowServer extends Plugin {
logger.debug(`ignored event data:`, context); logger.debug(`ignored event data:`, context);
return; return;
} }
const duplicated = this.events.find(([w, c, { eventKey }]) => {
if (eventKey && options.eventKey) {
return eventKey === options.eventKey;
}
});
if (duplicated) {
logger.warn(`event of workflow ${workflow.id} is duplicated, event will be ignored`);
return;
}
// `null` means not to trigger // `null` means not to trigger
if (context == null) { if (context == null) {
logger.warn(`workflow ${workflow.id} event data context is null, event will be ignored`); logger.warn(`workflow ${workflow.id} event data context is null, event will be ignored`);
@ -381,6 +392,7 @@ export default class PluginWorkflowServer extends Plugin {
logger.debug(`event data:`, { context }); logger.debug(`event data:`, { context });
if (this.events.length > 1) { if (this.events.length > 1) {
logger.info(`new event is pending to be prepared after previous preparation is finished`);
return; return;
} }
@ -417,6 +429,9 @@ export default class PluginWorkflowServer extends Plugin {
`execution (${job.execution.id}) resuming from job (${job.id}) added to pending list`, `execution (${job.execution.id}) resuming from job (${job.id}) added to pending list`,
); );
this.pending.push([job.execution, job]); this.pending.push([job.execution, job]);
if (this.executing) {
await this.executing;
}
this.dispatch(); this.dispatch();
} }
@ -424,18 +439,17 @@ export default class PluginWorkflowServer extends Plugin {
* Start a deferred execution * Start a deferred execution
* @experimental * @experimental
*/ */
public start(execution: ExecutionModel) { public async start(execution: ExecutionModel) {
if (execution.status !== EXECUTION_STATUS.STARTED) { if (execution.status !== EXECUTION_STATUS.STARTED) {
return; return;
} }
this.pending.push([execution]); this.pending.push([execution]);
if (this.executing) {
await this.executing;
}
this.dispatch(); this.dispatch();
} }
public createProcessor(execution: ExecutionModel, options = {}): Processor {
return new Processor(execution, { ...options, plugin: this });
}
private async createExecution( private async createExecution(
workflow: WorkflowModel, workflow: WorkflowModel,
context, context,
@ -508,7 +522,7 @@ export default class PluginWorkflowServer extends Plugin {
const event = this.events.shift(); const event = this.events.shift();
this.eventsCount = this.events.length; this.eventsCount = this.events.length;
if (!event) { if (!event) {
this.getLogger('dispatcher').warn(`events queue is empty, no need to prepare`); this.getLogger('dispatcher').info(`events queue is empty, no need to prepare`);
return; return;
} }
@ -550,41 +564,61 @@ export default class PluginWorkflowServer extends Plugin {
this.executing = (async () => { this.executing = (async () => {
let next: Pending | null = null; let next: Pending | null = null;
try { // resuming has high priority
// resuming has high priority if (this.pending.length) {
if (this.pending.length) { next = this.pending.shift() as Pending;
next = this.pending.shift() as Pending; this.getLogger(next[0].workflowId).info(`pending execution (${next[0].id}) ready to process`);
this.getLogger(next[0].workflowId).info(`pending execution (${next[0].id}) ready to process`); } else {
} else { try {
const execution = (await this.db.getRepository('executions').findOne({ await this.db.sequelize.transaction(
filter: { {
status: EXECUTION_STATUS.QUEUEING, isolationLevel:
'workflow.enabled': true, this.db.options.dialect === 'sqlite' ? [][0] : Transaction.ISOLATION_LEVELS.REPEATABLE_READ,
'workflow.id': {
[Op.not]: null,
},
}, },
appends: ['workflow'], async (transaction) => {
sort: 'id', const execution = (await this.db.getRepository('executions').findOne({
})) as ExecutionModel; filter: {
if (execution) { status: EXECUTION_STATUS.QUEUEING,
this.getLogger(execution.workflowId).info(`execution (${execution.id}) fetched from db`); 'workflow.enabled': true,
next = [execution]; },
} sort: 'id',
} transaction,
if (next) { })) as ExecutionModel;
await this.process(...next); if (execution) {
} this.getLogger(execution.workflowId).info(`execution (${execution.id}) fetched from db`);
} finally { await execution.update(
this.executing = null; {
status: EXECUTION_STATUS.STARTED,
if (next) { },
this.dispatch(); { transaction },
);
execution.workflow = this.enabledCache.get(execution.workflowId);
next = [execution];
} else {
this.getLogger('dispatcher').info(`no execution in db queued to process`);
}
},
);
} catch (error) {
this.getLogger('dispatcher').error(`fetching execution from db failed: ${error.message}`, { error });
} }
} }
if (next) {
await this.process(...next);
}
this.executing = null;
if (next) {
this.getLogger('dispatcher').info(`last process finished, will do another dispatch`);
this.dispatch();
}
})(); })();
} }
public createProcessor(execution: ExecutionModel, options = {}): Processor {
return new Processor(execution, { ...options, plugin: this });
}
private async process(execution: ExecutionModel, job?: JobModel, options: Transactionable = {}): Promise<Processor> { private async process(execution: ExecutionModel, job?: JobModel, options: Transactionable = {}): Promise<Processor> {
if (execution.status === EXECUTION_STATUS.QUEUEING) { if (execution.status === EXECUTION_STATUS.QUEUEING) {
const transaction = await this.useDataSourceTransaction('main', options.transaction); const transaction = await this.useDataSourceTransaction('main', options.transaction);

View File

@ -273,7 +273,7 @@ describe('workflow > Plugin', () => {
const p1 = await PostRepo.create({ values: { title: 't1' } }); const p1 = await PostRepo.create({ values: { title: 't1' } });
await sleep(1000); await sleep(500);
const [e1] = await w1.getExecutions(); const [e1] = await w1.getExecutions();
expect(e1.status).toBe(EXECUTION_STATUS.RESOLVED); expect(e1.status).toBe(EXECUTION_STATUS.RESOLVED);
@ -299,13 +299,34 @@ describe('workflow > Plugin', () => {
const p2 = await PostRepo.create({ values: { title: 't2' } }); const p2 = await PostRepo.create({ values: { title: 't2' } });
const p3 = await PostRepo.create({ values: { title: 't3' } }); const p3 = await PostRepo.create({ values: { title: 't3' } });
await sleep(1000); await sleep(500);
const executions = await w1.getExecutions(); const executions = await w1.getExecutions();
expect(executions.length).toBe(3); expect(executions.length).toBe(3);
expect(executions.map((item) => item.status)).toEqual(Array(3).fill(EXECUTION_STATUS.RESOLVED)); expect(executions.map((item) => item.status)).toEqual(Array(3).fill(EXECUTION_STATUS.RESOLVED));
}); });
it('duplicated event trigger', async () => {
const w1 = await WorkflowModel.create({
enabled: true,
type: 'asyncTrigger',
});
const n1 = await w1.createNode({
type: 'asyncResume',
});
plugin.trigger(w1, {}, { eventKey: 'a' });
plugin.trigger(w1, {}, { eventKey: 'a' });
await sleep(1000);
const executions = await w1.getExecutions();
expect(executions.length).toBe(1);
const jobs = await executions[0].getJobs();
expect(jobs.length).toBe(1);
});
it('when server starts, process all created executions', async () => { it('when server starts, process all created executions', async () => {
const w1 = await WorkflowModel.create({ const w1 = await WorkflowModel.create({
enabled: true, enabled: true,
@ -330,6 +351,9 @@ describe('workflow > Plugin', () => {
}, },
}); });
const e1s = await w1.getExecutions();
expect(e1s.length).toBe(1);
await app.start(); await app.start();
await sleep(500); await sleep(500);

View File

@ -32,6 +32,7 @@ function consumeTime(n: number) {
describe('workflow > triggers > schedule > static mode', () => { describe('workflow > triggers > schedule > static mode', () => {
let app: MockServer; let app: MockServer;
let db: Database; let db: Database;
let plugin;
let PostRepo; let PostRepo;
let CategoryRepo; let CategoryRepo;
let WorkflowRepo; let WorkflowRepo;
@ -40,6 +41,7 @@ describe('workflow > triggers > schedule > static mode', () => {
app = await getApp(); app = await getApp();
db = app.db; db = app.db;
plugin = app.pm.get('workflow') as Plugin;
const workflow = db.getCollection('workflows'); const workflow = db.getCollection('workflows');
WorkflowRepo = workflow.repository; WorkflowRepo = workflow.repository;
PostRepo = db.getCollection('posts').repository; PostRepo = db.getCollection('posts').repository;
@ -393,11 +395,7 @@ describe('workflow > triggers > schedule > static mode', () => {
type: 'echo', type: 'echo',
}); });
(app.pm.get('workflow') as Plugin).trigger( plugin.trigger(workflow, { date: start }, { eventKey: `${workflow.id}@${start.getTime()}` });
workflow,
{ date: start },
{ eventKey: `${workflow.id}@${start.getTime()}` },
);
await sleep(3000); await sleep(3000);
@ -429,7 +427,7 @@ describe('workflow > triggers > schedule > static mode', () => {
type: 'echo', type: 'echo',
}); });
const trigger = (app.pm.get('workflow') as Plugin).triggers.get(workflow.type); const trigger = plugin.triggers.get(workflow.type);
trigger.on(workflow); trigger.on(workflow);
await sleep(3000); await sleep(3000);
@ -439,5 +437,58 @@ describe('workflow > triggers > schedule > static mode', () => {
const j1s = await e1s[0].getJobs(); const j1s = await e1s[0].getJobs();
expect(j1s.length).toBe(1); expect(j1s.length).toBe(1);
}); });
it('different workflows could trigger in same time but not duplicated for each', async () => {
await sleepToEvenSecond();
const start = new Date();
start.setMilliseconds(0);
start.setSeconds(start.getSeconds() + 2);
const w1 = await WorkflowRepo.create({
values: {
enabled: true,
type: 'schedule',
config: {
mode: 0,
startsOn: start.toISOString(),
},
},
});
const n1 = w1.createNode({
type: 'echo',
});
const w2 = await WorkflowRepo.create({
values: {
enabled: true,
type: 'schedule',
config: {
mode: 0,
startsOn: start.toISOString(),
},
},
});
const n2 = w2.createNode({
type: 'echo',
});
plugin.trigger(w1, { date: start }, { eventKey: `${w1.id}@${start.getTime()}` });
plugin.trigger(w2, { date: start }, { eventKey: `${w2.id}@${start.getTime()}` });
await sleep(3000);
const e1s = await w1.getExecutions();
expect(e1s.length).toBe(1);
const j1s = await e1s[0].getJobs();
expect(j1s.length).toBe(1);
const e2s = await w2.getExecutions();
expect(e2s.length).toBe(1);
const j2s = await e2s[0].getJobs();
expect(j2s.length).toBe(1);
});
}); });
}); });