From d574c8c7ced09dbb97f978c7e6a5774872ac0f95 Mon Sep 17 00:00:00 2001 From: Junyi Date: Sun, 26 Nov 2023 15:07:50 +0800 Subject: [PATCH] fix(plugin-workflow): fix schedule trigger (#3096) * fix(plugin-workflow): fix schedule trigger * fix(plugin-workflow): fix increment bug based on dialect --- .../plugin-workflow/src/locale/zh-CN.ts | 2 +- .../plugin-workflow/src/server/Plugin.ts | 21 +++-- .../plugin-workflow/src/server/Processor.ts | 4 +- .../src/server/__tests__/Processor.test.ts | 76 +++++++++++++++++++ .../__tests__/instructions/request.test.ts | 61 +++++++++++++-- .../src/server/triggers/schedule.ts | 74 +++++++++--------- 6 files changed, 186 insertions(+), 52 deletions(-) diff --git a/packages/plugins/@nocobase/plugin-workflow/src/locale/zh-CN.ts b/packages/plugins/@nocobase/plugin-workflow/src/locale/zh-CN.ts index 45c6584a1b..5cc730dab0 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/locale/zh-CN.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/locale/zh-CN.ts @@ -60,7 +60,7 @@ export default { 'Please select the associated fields that need to be accessed in subsequent nodes. With more than two levels of to-many associations may cause performance issue, please use with caution.': '请选中需要在后续节点中被访问的关系字段。超过两层的对多关联可能会导致性能问题,请谨慎使用。', 'Schedule event': '定时任务', - 'Scheduled job base on time conditions.': '基于时间条件的计划任务', + 'Event will be scheduled and triggered base on time conditions.': '基于时间条件进行定时触发的事件。', 'Trigger mode': '触发模式', 'Based on certain date': '自定义时间', 'Based on date field of collection': '根据数据表时间字段', diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts index ab29eaa4cf..39811b262f 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts @@ -280,14 +280,23 @@ export default class WorkflowPlugin extends Plugin { { transaction }, ); - await workflow.increment('executed', { transaction }); + await workflow.increment(['executed', 'allExecuted'], { transaction }); + // NOTE: https://sequelize.org/api/v6/class/src/model.js~model#instance-method-increment + if (this.db.options.dialect !== 'postgres') { + await workflow.reload({ transaction }); + } - await (workflow.constructor).increment('allExecuted', { - where: { - key: workflow.key, + await (workflow.constructor).update( + { + allExecuted: workflow.allExecuted, }, - transaction, - }); + { + where: { + key: workflow.key, + }, + transaction, + }, + ); execution.workflow = workflow; diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/Processor.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/Processor.ts index 73b918c199..fd8a366326 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/Processor.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/Processor.ts @@ -213,7 +213,9 @@ export default class Processor { const { instructions } = this.options.plugin; const instruction = instructions.get(node.type); if (typeof instruction.resume !== 'function') { - return Promise.reject(new Error('`resume` should be implemented')); + return Promise.reject( + new Error(`"resume" method should be implemented for [${node.type}] instruction of node (#${node.id})`), + ); } return this.exec(instruction.resume.bind(instruction), node, job); diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Processor.test.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Processor.test.ts index ba9f918bad..8608ebde81 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Processor.test.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Processor.test.ts @@ -413,6 +413,82 @@ describe('workflow > Processor', () => { expect(jobs.length).toEqual(5); }); + it('condition contains loop (target as 0)', async () => { + const n1 = await workflow.createNode({ + type: 'condition', + }); + + const n2 = await workflow.createNode({ + type: 'loop', + branchIndex: BRANCH_INDEX.ON_TRUE, + upstreamId: n1.id, + config: { + target: 0, + }, + }); + + const n3 = await workflow.createNode({ + type: 'echo', + branchIndex: 0, + upstreamId: n2.id, + }); + + const n4 = await workflow.createNode({ + type: 'echo', + upstreamId: n1.id, + }); + + await n1.setDownstream(n4); + + const post = await PostRepo.create({ values: { title: 't1' } }); + + await sleep(500); + + const [e1] = await workflow.getExecutions(); + expect(e1.status).toEqual(EXECUTION_STATUS.RESOLVED); + const jobs = await e1.getJobs({ order: [['id', 'ASC']] }); + expect(jobs.length).toBe(3); + expect(jobs[0].status).toBe(JOB_STATUS.RESOLVED); + }); + + it('condition contains loop (target as 2)', async () => { + const n1 = await workflow.createNode({ + type: 'condition', + }); + + const n2 = await workflow.createNode({ + type: 'loop', + branchIndex: BRANCH_INDEX.ON_TRUE, + upstreamId: n1.id, + config: { + target: 2, + }, + }); + + const n3 = await workflow.createNode({ + type: 'echo', + branchIndex: 0, + upstreamId: n2.id, + }); + + const n4 = await workflow.createNode({ + type: 'echo', + upstreamId: n1.id, + }); + + await n1.setDownstream(n4); + + const post = await PostRepo.create({ values: { title: 't1' } }); + + await sleep(500); + + const [e1] = await workflow.getExecutions(); + expect(e1.status).toEqual(EXECUTION_STATUS.RESOLVED); + const jobs = await e1.getJobs({ order: [['id', 'ASC']] }); + expect(jobs.length).toBe(5); + expect(jobs[0].status).toBe(JOB_STATUS.RESOLVED); + }); + it('parallel branches contains condition', async () => { const n1 = await workflow.createNode({ type: 'parallel', diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/instructions/request.test.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/instructions/request.test.ts index 116daa926f..14c830bed7 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/instructions/request.test.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/instructions/request.test.ts @@ -1,5 +1,9 @@ -import { Application, Gateway } from '@nocobase/server'; +import jwt from 'jsonwebtoken'; + +import { Gateway } from '@nocobase/server'; import Database from '@nocobase/database'; +import { MockServer } from '@nocobase/test'; + import { getApp, sleep } from '..'; import { RequestConfig } from '../../instructions/request'; import { EXECUTION_STATUS, JOB_STATUS } from '../../constants'; @@ -11,14 +15,20 @@ const URL_400 = `http://localhost:${PORT}/api/400`; const URL_TIMEOUT = `http://localhost:${PORT}/api/timeout`; describe('workflow > instructions > request', () => { - let app: Application; + let app: MockServer; let db: Database; let PostRepo; let WorkflowModel; let workflow; beforeEach(async () => { - app = await getApp({ manual: true }); + app = await getApp({ + resourcer: { + prefix: '/api', + }, + plugins: ['users', 'auth'], + manual: true, + }); app.use(async (ctx, next) => { if (ctx.path === '/api/400') { @@ -29,13 +39,14 @@ describe('workflow > instructions > request', () => { return ctx.throw(new Error('timeout')); } if (ctx.path === '/api/data') { + await sleep(100); ctx.withoutDataWrapping = true; ctx.body = { meta: { title: ctx.query.title }, data: { title: ctx.request.body['title'] }, }; } - next(); + await next(); }); Gateway.getInstance().start({ @@ -62,8 +73,8 @@ describe('workflow > instructions > request', () => { afterEach(() => app.destroy()); - describe('request', () => { - it('request', async () => { + describe('request static app routes', () => { + it('get data', async () => { await workflow.createNode({ type: 'request', config: { @@ -80,6 +91,7 @@ describe('workflow > instructions > request', () => { expect(execution.status).toEqual(EXECUTION_STATUS.RESOLVED); const [job] = await execution.getJobs(); expect(job.status).toEqual(JOB_STATUS.RESOLVED); + expect(job.result).toEqual({ meta: {}, data: {} }); }); it('request - timeout', async () => { @@ -249,4 +261,41 @@ describe('workflow > instructions > request', () => { expect(jobs[0].result).toBe(2); }); }); + + describe('request db resource', () => { + it('request db resource', async () => { + const user = await db.getRepository('users').create({}); + + const token = jwt.sign( + { + userId: typeof user.id, + }, + process.env.APP_KEY, + { + expiresIn: '1d', + }, + ); + + const n1 = await workflow.createNode({ + type: 'request', + config: { + url: `http://localhost:${PORT}/api/categories`, + method: 'POST', + headers: [{ name: 'Authorization', value: `Bearer ${token}` }], + } as RequestConfig, + }); + + await PostRepo.create({ values: { title: 't1' } }); + + await sleep(500); + + const category = await db.getRepository('categories').findOne({}); + + const [execution] = await workflow.getExecutions(); + expect(execution.status).toBe(EXECUTION_STATUS.RESOLVED); + const [job] = await execution.getJobs(); + expect(job.status).toBe(JOB_STATUS.RESOLVED); + expect(job.result.data).toMatchObject({}); + }); + }); }); diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/triggers/schedule.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/triggers/schedule.ts index 2b54ca0f08..c3834d7494 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/triggers/schedule.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/triggers/schedule.ts @@ -32,7 +32,7 @@ interface ScheduleMode { on?(this: ScheduleTrigger, workflow: WorkflowModel): void; off?(this: ScheduleTrigger, workflow: WorkflowModel): void; shouldCache(this: ScheduleTrigger, workflow: WorkflowModel, now: Date): Promise | boolean; - trigger(this: ScheduleTrigger, workflow: WorkflowModel, now: Date): any; + trigger(this: ScheduleTrigger, workflow: WorkflowModel, now: Date): Promise | number; } const ScheduleModes = new Map(); @@ -80,29 +80,31 @@ ScheduleModes.set(SCHEDULE_MODE.CONSTANT, { // NOTE: align to second start const startTime = parseDateWithoutMs(startsOn); if (!startTime || startTime > timestamp) { - return; + return 0; } if (repeat) { if (typeof repeat === 'number') { if (Math.round(timestamp - startTime) % repeat) { - return; + return 0; } } if (endsOn) { const endTime = parseDateWithoutMs(endsOn); if (!endTime || endTime < timestamp) { - return; + return 0; } } } else { if (startTime !== timestamp) { - return; + return 0; } } - return this.plugin.trigger(workflow, { date: now }); + this.plugin.trigger(workflow, { date: now }); + + return 1; }, }); @@ -233,14 +235,14 @@ ScheduleModes.set(SCHEDULE_MODE.COLLECTION_FIELD, { // when repeat is number, means repeat after startsOn // (now - startsOn) % repeat <= cacheCycle if (repeat) { - const tsFn = DialectTimestampFnMap[db.options.dialect!]; + const tsFn = DialectTimestampFnMap[db.options.dialect]; if (typeof repeat === 'number' && repeat > this.cacheCycle && tsFn) { - conditions.push( - where( - fn('MOD', literal(`${Math.round(timestamp / 1000)} - ${tsFn(startsOn.field)}`), Math.round(repeat / 1000)), - { [Op.lt]: Math.round(this.cacheCycle / 1000) }, - ), + const modExp = fn( + 'MOD', + literal(`${Math.round(timestamp / 1000)} - ${tsFn(startsOn.field)}`), + Math.round(repeat / 1000), ); + conditions.push(where(modExp, { [Op.lt]: Math.round(this.cacheCycle / 1000) })); // conditions.push(literal(`mod(${timestamp} - ${tsFn(startsOn.field)} * 1000, ${repeat}) < ${this.cacheCycle}`)); } @@ -283,7 +285,7 @@ ScheduleModes.set(SCHEDULE_MODE.COLLECTION_FIELD, { const startTimestamp = getOnTimestampWithOffset(startsOn, now); if (!startTimestamp) { - return false; + return 0; } const conditions: any[] = [ @@ -302,26 +304,26 @@ ScheduleModes.set(SCHEDULE_MODE.COLLECTION_FIELD, { }, }); - const tsFn = DialectTimestampFnMap[this.plugin.app.db.options.dialect!]; + const tsFn = DialectTimestampFnMap[this.plugin.app.db.options.dialect]; if (typeof repeat === 'number' && tsFn) { - conditions.push( - where( - fn('MOD', literal(`${Math.round(timestamp / 1000)} - ${tsFn(startsOn.field)}`), Math.round(repeat / 1000)), - { [Op.eq]: 0 }, - ), + const modExp = fn( + 'MOD', + literal(`${Math.round(timestamp / 1000)} - ${tsFn(startsOn.field)}`), + Math.round(repeat / 1000), ); + conditions.push(where(modExp, { [Op.eq]: 0 })); // conditions.push(literal(`MOD(CAST(${timestamp} AS BIGINT) - CAST((FLOOR(${tsFn(startsOn.field)}) AS BIGINT) * 1000), ${repeat}) = 0`)); } if (endsOn) { const endTimestamp = getOnTimestampWithOffset(endsOn, now); if (!endTimestamp) { - return false; + return 0; } if (typeof endsOn === 'string') { if (endTimestamp <= timestamp) { - return false; + return 0; } } else { conditions.push({ @@ -342,10 +344,15 @@ ScheduleModes.set(SCHEDULE_MODE.COLLECTION_FIELD, { const repo = this.plugin.app.db.getRepository(collection); const instances = await repo.find({ - filter: { - $and: conditions, + where: { + [Op.and]: conditions, }, appends, + ...(workflow.config.limit + ? { + limit: Math.max(workflow.config.limit - workflow.allExecuted, 0), + } + : {}), }); instances.forEach((item) => { @@ -354,6 +361,8 @@ ScheduleModes.set(SCHEDULE_MODE.COLLECTION_FIELD, { data: item.toJSON(), }); }); + + return instances.length; }, }); @@ -424,7 +433,7 @@ export default class ScheduleTrigger extends Trigger { } init() { - if (this.plugin.app.getPlugin('multi-app-share-collection').enabled && this.plugin.app.name !== 'main') { + if (this.plugin.app.getPlugin('multi-app-share-collection')?.enabled && this.plugin.app.name !== 'main') { return; } @@ -465,7 +474,6 @@ export default class ScheduleTrigger extends Trigger { async onTick(now) { // NOTE: trigger workflows in sequence when sqlite due to only one transaction const isSqlite = this.plugin.app.db.options.dialect === 'sqlite'; - return Array.from(this.cache.values()).reduce( (prev, workflow) => { if (!this.shouldTrigger(workflow, now)) { @@ -482,19 +490,9 @@ export default class ScheduleTrigger extends Trigger { } async reload() { - const WorkflowModel = this.plugin.app.db.getCollection('workflows').model; - const workflows = await WorkflowModel.findAll({ - where: { enabled: true, type: 'schedule' }, - include: [ - { - association: 'executions', - attributes: ['id', 'createdAt'], - separate: true, - limit: 1, - order: [['createdAt', 'DESC']], - }, - ], - group: ['id'], + const WorkflowRepo = this.plugin.app.db.getRepository('workflows'); + const workflows = await WorkflowRepo.find({ + filter: { enabled: true, type: 'schedule' }, }); // NOTE: clear cached jobs in last cycle