fix(plugin-workflow): fix loading enabled workflows list with stats (#6642)

* fix(plugin-workflow): fix loading enabled workflows list with stats

* fix(plugin-workflow): add fallback logic to make sure workflow has stats
This commit is contained in:
Junyi 2025-04-11 08:46:14 +08:00 committed by GitHub
parent 6d435dbcf8
commit e65ff559c2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 105 additions and 77 deletions

View File

@ -113,6 +113,97 @@ export default class PluginWorkflowServer extends Plugin {
} }
}; };
private onAfterCreate = async (model: WorkflowModel, { transaction }) => {
const WorkflowStatsModel = this.db.getModel('workflowStats');
const [stats, created] = await WorkflowStatsModel.findOrCreate({
where: { key: model.key },
defaults: { key: model.key },
transaction,
});
model.stats = stats;
model.versionStats = await model.createVersionStats({ id: model.id }, { transaction });
if (model.enabled) {
this.toggle(model, true, { transaction });
}
};
private onAfterUpdate = async (model: WorkflowModel, { transaction }) => {
model.stats = await model.getStats({ transaction });
model.versionStats = await model.getVersionStats({ transaction });
this.toggle(model, model.enabled, { transaction });
};
private onAfterDestroy = async (model: WorkflowModel, { transaction }) => {
this.toggle(model, false, { transaction });
const TaskRepo = this.db.getRepository('workflowTasks');
await TaskRepo.destroy({
filter: {
workflowId: model.id,
},
transaction,
});
};
// [Life Cycle]:
// * load all workflows in db
// * add all hooks for enabled workflows
// * add hooks for create/update[enabled]/delete workflow to add/remove specific hooks
private onAfterStart = async () => {
this.ready = true;
const collection = this.db.getCollection('workflows');
const workflows = await collection.repository.find({
filter: { enabled: true },
appends: ['stats', 'versionStats'],
});
for (const workflow of workflows) {
// NOTE: workflow stats may not be created in migration (for compatibility)
if (!workflow.stats) {
workflow.stats = await workflow.createStats({ executed: 0 });
}
// NOTE: workflow stats may not be created in migration (for compatibility)
if (!workflow.versionStats) {
workflow.versionStats = await workflow.createVersionStats({ executed: 0 });
}
this.toggle(workflow, true, { silent: true });
}
this.checker = setInterval(() => {
this.getLogger('dispatcher').info(`(cycling) check for queueing executions`);
this.dispatch();
}, 300_000);
this.app.on('workflow:dispatch', () => {
this.app.logger.info('workflow:dispatch');
this.dispatch();
});
// check for queueing executions
this.getLogger('dispatcher').info('(starting) check for queueing executions');
this.dispatch();
};
private onBeforeStop = async () => {
for (const workflow of this.enabledCache.values()) {
this.toggle(workflow, false, { silent: true });
}
this.ready = false;
if (this.events.length) {
await this.prepare();
}
if (this.executing) {
await this.executing;
}
if (this.checker) {
clearInterval(this.checker);
}
};
async handleSyncMessage(message) { async handleSyncMessage(message) {
if (message.type === 'statusChange') { if (message.type === 'statusChange') {
if (message.enabled) { if (message.enabled) {
@ -289,84 +380,12 @@ export default class PluginWorkflowServer extends Plugin {
}); });
db.on('workflows.beforeSave', this.onBeforeSave); db.on('workflows.beforeSave', this.onBeforeSave);
db.on('workflows.afterCreate', async (model: WorkflowModel, { transaction }) => { db.on('workflows.afterCreate', this.onAfterCreate);
const WorkflowStatsModel = this.db.getModel('workflowStats'); db.on('workflows.afterUpdate', this.onAfterUpdate);
const [stats, created] = await WorkflowStatsModel.findOrCreate({ db.on('workflows.afterDestroy', this.onAfterDestroy);
where: { key: model.key },
defaults: { key: model.key },
transaction,
});
model.stats = stats;
model.versionStats = await model.createVersionStats({ id: model.id }, { transaction });
if (model.enabled) {
this.toggle(model, true, { transaction });
}
});
db.on('workflows.afterUpdate', async (model: WorkflowModel, { transaction }) => {
model.stats = await model.getStats({ transaction });
model.versionStats = await model.getVersionStats({ transaction });
this.toggle(model, model.enabled, { transaction });
});
db.on('workflows.afterDestroy', async (model: WorkflowModel, { transaction }) => {
this.toggle(model, false, { transaction });
const TaskRepo = this.db.getRepository('workflowTasks'); this.app.on('afterStart', this.onAfterStart);
await TaskRepo.destroy({ this.app.on('beforeStop', this.onBeforeStop);
filter: {
workflowId: model.id,
},
transaction,
});
});
// [Life Cycle]:
// * load all workflows in db
// * add all hooks for enabled workflows
// * add hooks for create/update[enabled]/delete workflow to add/remove specific hooks
this.app.on('afterStart', async () => {
this.ready = true;
const collection = db.getCollection('workflows');
const workflows = await collection.repository.find({
filter: { enabled: true },
});
workflows.forEach((workflow: WorkflowModel) => {
this.toggle(workflow, true, { silent: true });
});
this.checker = setInterval(() => {
this.getLogger('dispatcher').info(`(cycling) check for queueing executions`);
this.dispatch();
}, 300_000);
this.app.on('workflow:dispatch', () => {
this.app.logger.info('workflow:dispatch');
this.dispatch();
});
// check for queueing executions
this.getLogger('dispatcher').info('(starting) check for queueing executions');
this.dispatch();
});
this.app.on('beforeStop', async () => {
for (const workflow of this.enabledCache.values()) {
this.toggle(workflow, false, { silent: true });
}
this.ready = false;
if (this.events.length) {
await this.prepare();
}
if (this.executing) {
await this.executing;
}
if (this.checker) {
clearInterval(this.checker);
}
});
} }
private toggle( private toggle(

View File

@ -43,6 +43,11 @@ describe('workflow > Plugin', () => {
}); });
expect(workflow.current).toBe(true); expect(workflow.current).toBe(true);
expect(workflow.stats).toBeDefined();
expect(workflow.stats.executed).toBe(0);
expect(workflow.versionStats).toBeDefined();
expect(workflow.versionStats.executed).toBe(0);
}); });
it('create with disabled', async () => { it('create with disabled', async () => {
@ -358,6 +363,10 @@ describe('workflow > Plugin', () => {
await sleep(500); await sleep(500);
const w1_1 = plugin.enabledCache.get(w1.id);
expect(w1_1.stats).toBeDefined();
expect(w1_1.stats.executed).toBe(0);
await e1.reload(); await e1.reload();
expect(e1.status).toBe(EXECUTION_STATUS.RESOLVED); expect(e1.status).toBe(EXECUTION_STATUS.RESOLVED);