refactor(plugin-workflow): improve code of schedule trigger (#6589)

This commit is contained in:
Junyi 2025-04-01 08:34:06 +08:00 committed by GitHub
parent 4ef6b9037d
commit ce28c68018
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 83 additions and 69 deletions

View File

@ -42,7 +42,7 @@ function getRepeatTypeValue(v) {
return 'none'; return 'none';
} }
function CommonRepeatField({ value, onChange }) { function CommonRepeatField({ value, onChange, disabled }) {
const { t } = useWorkflowTranslation(); const { t } = useWorkflowTranslation();
const option = getNumberOption(value); const option = getNumberOption(value);
@ -59,11 +59,12 @@ function CommonRepeatField({ value, onChange }) {
addonBefore={t('Every')} addonBefore={t('Every')}
addonAfter={t(option.unitText)} addonAfter={t(option.unitText)}
className="auto-width" className="auto-width"
disabled={disabled}
/> />
); );
} }
export function RepeatField({ value = null, onChange }) { export function RepeatField({ value = null, onChange, disabled }) {
const { t } = useWorkflowTranslation(); const { t } = useWorkflowTranslation();
const typeValue = getRepeatTypeValue(value); const typeValue = getRepeatTypeValue(value);
const onTypeChange = useCallback( const onTypeChange = useCallback(
@ -114,20 +115,23 @@ export function RepeatField({ value = null, onChange }) {
} }
`} `}
> >
<Select value={typeValue} onChange={onTypeChange} className="auto-width"> <Select value={typeValue} onChange={onTypeChange} className="auto-width" disabled={disabled}>
{RepeatOptions.map((item) => ( {RepeatOptions.map((item) => (
<Select.Option key={item.value} value={item.value}> <Select.Option key={item.value} value={item.value}>
{t(item.text)} {t(item.text)}
</Select.Option> </Select.Option>
))} ))}
</Select> </Select>
{typeof typeValue === 'number' ? <CommonRepeatField value={value} onChange={onChange} /> : null} {typeof typeValue === 'number' ? (
<CommonRepeatField value={value} onChange={onChange} disabled={disabled} />
) : null}
{typeValue === 'cron' ? ( {typeValue === 'cron' ? (
<Cron <Cron
value={value.trim().split(/\s+/).slice(1).join(' ')} value={value.trim().split(/\s+/).slice(1).join(' ')}
setValue={(v) => onChange(`0 ${v}`)} setValue={(v) => onChange(`0 ${v}`)}
clearButton={false} clearButton={false}
locale={window['cronLocale']} locale={window['cronLocale']}
disabled={disabled}
/> />
) : null} ) : null}
</fieldset> </fieldset>

View File

@ -366,11 +366,16 @@ export default class PluginWorkflowServer extends Plugin {
const prev = workflow.previous(); const prev = workflow.previous();
if (prev.config) { if (prev.config) {
trigger.off({ ...workflow.get(), ...prev }); trigger.off({ ...workflow.get(), ...prev });
this.getLogger(workflow.id).info(`toggle OFF workflow ${workflow.id} based on configuration before updated`);
} }
trigger.on(workflow); trigger.on(workflow);
this.getLogger(workflow.id).info(`toggle ON workflow ${workflow.id}`);
this.enabledCache.set(workflow.id, workflow); this.enabledCache.set(workflow.id, workflow);
} else { } else {
trigger.off(workflow); trigger.off(workflow);
this.getLogger(workflow.id).info(`toggle OFF workflow ${workflow.id}`);
this.enabledCache.delete(workflow.id); this.enabledCache.delete(workflow.id);
} }
if (!silent) { if (!silent) {

View File

@ -104,50 +104,54 @@ export default class DateFieldScheduleTrigger {
// caching workflows in range, default to 5min // caching workflows in range, default to 5min
cacheCycle = 300_000; cacheCycle = 300_000;
onAfterStart = () => {
if (this.timer) {
return;
}
this.timer = setInterval(() => this.reload(), this.cacheCycle);
this.reload();
};
onBeforeStop = () => {
if (this.timer) {
clearInterval(this.timer);
}
for (const [key, timer] of this.cache.entries()) {
clearTimeout(timer);
this.cache.delete(key);
}
};
constructor(public workflow: Plugin) { constructor(public workflow: Plugin) {
workflow.app.on('afterStart', async () => { workflow.app.on('afterStart', this.onAfterStart);
if (this.timer) { workflow.app.on('beforeStop', this.onBeforeStop);
return;
}
this.timer = setInterval(() => this.reload(), this.cacheCycle);
this.reload();
});
workflow.app.on('beforeStop', () => {
if (this.timer) {
clearInterval(this.timer);
}
for (const [key, timer] of this.cache.entries()) {
clearTimeout(timer);
this.cache.delete(key);
}
});
} }
async reload() { reload() {
for (const [key, timer] of this.cache.entries()) {
clearTimeout(timer);
this.cache.delete(key);
}
const workflows = Array.from(this.workflow.enabledCache.values()).filter( const workflows = Array.from(this.workflow.enabledCache.values()).filter(
(item) => item.type === 'schedule' && item.config.mode === SCHEDULE_MODE.DATE_FIELD, (item) => item.type === 'schedule' && item.config.mode === SCHEDULE_MODE.DATE_FIELD,
); );
// NOTE: clear cached jobs in last cycle workflows.forEach((workflow) => {
this.cache = new Map(); this.inspect(workflow);
});
this.inspect(workflows);
} }
inspect(workflows: WorkflowModel[]) { async inspect(workflow: WorkflowModel) {
const now = new Date(); const now = new Date();
const records = await this.loadRecordsToSchedule(workflow, now);
workflows.forEach(async (workflow) => { this.workflow.getLogger(workflow.id).info(`[Schedule on date field] ${records.length} records to schedule`);
const records = await this.loadRecordsToSchedule(workflow, now); records.forEach((record) => {
this.workflow.getLogger(workflow.id).info(`[Schedule on date field] ${records.length} records to schedule`); const nextTime = this.getRecordNextTime(workflow, record);
records.forEach((record) => { this.schedule(workflow, record, nextTime, Boolean(nextTime));
const nextTime = this.getRecordNextTime(workflow, record);
this.schedule(workflow, record, nextTime, Boolean(nextTime));
});
}); });
} }
@ -233,8 +237,6 @@ export default class DateFieldScheduleTrigger {
[Op.gte]: new Date(endTimestamp), [Op.gte]: new Date(endTimestamp),
}, },
}); });
} else {
this.workflow.getLogger(id).warn(`[Schedule on date field] "endsOn.field" is not configured`);
} }
} }
} }
@ -367,7 +369,7 @@ export default class DateFieldScheduleTrigger {
} }
on(workflow: WorkflowModel) { on(workflow: WorkflowModel) {
this.inspect([workflow]); this.inspect(workflow);
const { collection } = workflow.config; const { collection } = workflow.config;
const [dataSourceName, collectionName] = parseCollectionName(collection); const [dataSourceName, collectionName] = parseCollectionName(collection);

View File

@ -18,36 +18,39 @@ const MAX_SAFE_INTERVAL = 2147483647;
export default class StaticScheduleTrigger { export default class StaticScheduleTrigger {
private timers: Map<string, NodeJS.Timeout | null> = new Map(); private timers: Map<string, NodeJS.Timeout | null> = new Map();
constructor(public workflow: Plugin) { onAfterStart = () => {
workflow.app.on('afterStart', async () => { const workflows = Array.from(this.workflow.enabledCache.values()).filter(
const workflows = Array.from(this.workflow.enabledCache.values()).filter( (item) => item.type === 'schedule' && item.config.mode === SCHEDULE_MODE.STATIC,
(item) => item.type === 'schedule' && item.config.mode === SCHEDULE_MODE.STATIC, );
);
this.inspect(workflows);
});
workflow.app.on('beforeStop', () => {
for (const timer of this.timers.values()) {
clearInterval(timer);
}
});
}
inspect(workflows: WorkflowModel[]) {
const now = new Date();
workflows.forEach((workflow) => { workflows.forEach((workflow) => {
const nextTime = this.getNextTime(workflow, now); this.inspect(workflow);
if (nextTime) {
this.workflow
.getLogger(workflow.id)
.info(`caching scheduled workflow will run at: ${new Date(nextTime).toISOString()}`);
} else {
this.workflow.getLogger(workflow.id).info('workflow will not be scheduled');
}
this.schedule(workflow, nextTime, nextTime >= now.getTime());
}); });
};
onBeforeStop = () => {
for (const timer of this.timers.values()) {
clearInterval(timer);
}
};
constructor(public workflow: Plugin) {
workflow.app.on('afterStart', this.onAfterStart);
workflow.app.on('beforeStop', this.onBeforeStop);
}
inspect(workflow: WorkflowModel) {
const now = new Date();
const nextTime = this.getNextTime(workflow, now);
if (nextTime) {
this.workflow
.getLogger(workflow.id)
.info(`caching scheduled workflow will run at: ${new Date(nextTime).toISOString()}`);
} else {
this.workflow.getLogger(workflow.id).info('workflow will not be scheduled');
}
this.schedule(workflow, nextTime, nextTime >= now.getTime());
} }
getNextTime({ config, allExecuted }: WorkflowModel, currentDate: Date, nextSecond = false) { getNextTime({ config, allExecuted }: WorkflowModel, currentDate: Date, nextSecond = false) {
@ -130,7 +133,7 @@ export default class StaticScheduleTrigger {
} }
on(workflow) { on(workflow) {
this.inspect([workflow]); this.inspect(workflow);
} }
off(workflow) { off(workflow) {