feat(plugin-workflow): add checker for intervally dispatching (#4119)

This commit is contained in:
Junyi 2024-04-21 14:23:13 +08:00 committed by GitHub
parent e25d15518e
commit 5bcaa9d11f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -45,6 +45,7 @@ export default class PluginWorkflowServer extends Plugin {
private loggerCache: LRUCache<string, Logger>;
private meter = null;
private checker: NodeJS.Timeout = null;
private onBeforeSave = async (instance: WorkflowModel, options) => {
const Model = <typeof WorkflowModel>instance.constructor;
@ -245,6 +246,10 @@ export default class PluginWorkflowServer extends Plugin {
workflows.forEach((workflow: WorkflowModel) => {
this.toggle(workflow);
});
this.checker = setInterval(() => {
this.dispatch();
}, 300_000);
});
this.app.on('afterStart', () => {
@ -271,6 +276,10 @@ export default class PluginWorkflowServer extends Plugin {
if (this.executing) {
await this.executing;
}
if (this.checker) {
clearInterval(this.checker);
}
});
}
@ -466,35 +475,37 @@ export default class PluginWorkflowServer extends Plugin {
this.executing = (async () => {
let next: Pending | null = null;
// resuming has high priority
if (this.pending.length) {
next = this.pending.shift() as Pending;
this.getLogger(next[0].workflowId).info(`pending execution (${next[0].id}) ready to process`);
} else {
const execution = (await this.db.getRepository('executions').findOne({
filter: {
status: EXECUTION_STATUS.QUEUEING,
'workflow.enabled': true,
'workflow.id': {
[Op.not]: null,
try {
// resuming has high priority
if (this.pending.length) {
next = this.pending.shift() as Pending;
this.getLogger(next[0].workflowId).info(`pending execution (${next[0].id}) ready to process`);
} else {
const execution = (await this.db.getRepository('executions').findOne({
filter: {
status: EXECUTION_STATUS.QUEUEING,
'workflow.enabled': true,
'workflow.id': {
[Op.not]: null,
},
},
},
appends: ['workflow'],
sort: 'createdAt',
})) as ExecutionModel;
if (execution) {
this.getLogger(execution.workflowId).info(`execution (${execution.id}) fetched from db`);
next = [execution];
appends: ['workflow'],
sort: 'id',
})) as ExecutionModel;
if (execution) {
this.getLogger(execution.workflowId).info(`execution (${execution.id}) fetched from db`);
next = [execution];
}
}
}
if (next) {
await this.process(...next);
}
if (next) {
await this.process(...next);
}
} finally {
this.executing = null;
this.executing = null;
if (next) {
this.dispatch();
if (next) {
this.dispatch();
}
}
})();
}