fix(plugin-workflow): fix schedule trigger conditions and logs (#6042)

* fix(plugin-workflow): fix schedule trigger conditions and logs

* fix(plugin-workflow): fix resume dispatch
This commit is contained in:
Junyi 2025-01-12 16:11:53 +08:00 committed by GitHub
parent ab04e65e74
commit 095ee3b23c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 51 additions and 14 deletions

View File

@ -371,7 +371,7 @@ export default class PluginWorkflowServer extends Plugin {
}
});
if (duplicated) {
logger.warn(`event of workflow ${workflow.id} is duplicated, event will be ignored`);
logger.warn(`event of workflow ${workflow.id} is duplicated (${options.eventKey}), event will be ignored`);
return;
}
// `null` means not to trigger

View File

@ -192,6 +192,39 @@ describe('workflow > triggers > schedule > date field mode', () => {
expect(d2 - 4000).toBe(startTime.getTime());
});
it('starts on post.createdAt and repeat by cron with endsOn by collection field but no field configured', async () => {
await sleepToEvenSecond();
const startTime = new Date();
startTime.setMilliseconds(0);
const workflow = await WorkflowModel.create({
enabled: true,
type: 'schedule',
config: {
mode: 1,
collection: 'posts',
startsOn: {
field: 'createdAt',
},
repeat: '*/2 * * * * *',
endsOn: {},
},
});
const post = await PostRepo.create({ values: { title: 't1' } });
await sleep(5000);
const executions = await workflow.getExecutions({ order: [['createdAt', 'ASC']] });
expect(executions.length).toBe(3);
const d0 = Date.parse(executions[0].context.date);
expect(d0).toBe(startTime.getTime());
const d1 = Date.parse(executions[1].context.date);
expect(d1 - 2000).toBe(startTime.getTime());
const d2 = Date.parse(executions[2].context.date);
expect(d2 - 4000).toBe(startTime.getTime());
});
it('starts on post.createdAt and repeat by cron with endsOn at certain time', async () => {
await sleepToEvenSecond();
const startTime = new Date();

View File

@ -142,6 +142,7 @@ export default class ScheduleTrigger {
workflows.forEach(async (workflow) => {
const records = await this.loadRecordsToSchedule(workflow, now);
this.workflow.getLogger(workflow.id).info(`[Schedule on date field] ${records.length} records to schedule`);
records.forEach((record) => {
const nextTime = this.getRecordNextTime(workflow, record);
this.schedule(workflow, record, nextTime, Boolean(nextTime));
@ -157,20 +158,23 @@ export default class ScheduleTrigger {
// i. endsOn after now -> yes
// ii. endsOn before now -> no
async loadRecordsToSchedule(
{ config: { collection, limit, startsOn, repeat, endsOn }, allExecuted }: WorkflowModel,
{ id, config: { collection, limit, startsOn, repeat, endsOn }, allExecuted }: WorkflowModel,
currentDate: Date,
) {
const { dataSourceManager } = this.workflow.app;
if (limit && allExecuted >= limit) {
this.workflow.getLogger(id).warn(`[Schedule on date field] limit reached (all executed ${allExecuted})`);
return [];
}
if (!startsOn) {
this.workflow.getLogger(id).warn(`[Schedule on date field] "startsOn" is not configured`);
return [];
}
const timestamp = currentDate.getTime();
const startTimestamp = getOnTimestampWithOffset(startsOn, currentDate);
if (!startTimestamp) {
this.workflow.getLogger(id).warn(`[Schedule on date field] "startsOn.field" is not configured`);
return [];
}
@ -216,21 +220,21 @@ export default class ScheduleTrigger {
}
if (endsOn) {
const now = new Date();
const endTimestamp = getOnTimestampWithOffset(endsOn, now);
if (!endTimestamp) {
return [];
}
if (typeof endsOn === 'string') {
if (endTimestamp <= timestamp) {
if (parseDateWithoutMs(endsOn) <= timestamp) {
return [];
}
} else {
conditions.push({
[endsOn.field]: {
[Op.gte]: new Date(endTimestamp),
},
});
const endTimestamp = getOnTimestampWithOffset(endsOn, currentDate);
if (endTimestamp) {
conditions.push({
[endsOn.field]: {
[Op.gte]: new Date(endTimestamp),
},
});
} else {
this.workflow.getLogger(id).warn(`[Schedule on date field] "endsOn.field" is not configured`);
}
}
}
} else {
@ -240,7 +244,7 @@ export default class ScheduleTrigger {
},
});
}
this.workflow.getLogger(id).debug(`[Schedule on date field] conditions: `, { conditions });
return model.findAll({
where: {
[Op.and]: conditions,