From aa8bc65be860386edad2d3778b710041f4cb84ee Mon Sep 17 00:00:00 2001 From: Junyi Date: Sat, 21 Dec 2024 09:50:00 +0800 Subject: [PATCH] fix(plugin-workflow): fix transactions between data sources (#5916) --- .../src/server/AggregateInstruction.ts | 2 +- .../src/server/__tests__/instruction.test.ts | 33 +++++++++++++++++++ .../plugin-workflow/src/server/Plugin.ts | 10 +++--- 3 files changed, 40 insertions(+), 5 deletions(-) diff --git a/packages/plugins/@nocobase/plugin-workflow-aggregate/src/server/AggregateInstruction.ts b/packages/plugins/@nocobase/plugin-workflow-aggregate/src/server/AggregateInstruction.ts index 792ac52e26..fb7d665c92 100644 --- a/packages/plugins/@nocobase/plugin-workflow-aggregate/src/server/AggregateInstruction.ts +++ b/packages/plugins/@nocobase/plugin-workflow-aggregate/src/server/AggregateInstruction.ts @@ -43,7 +43,7 @@ export default class extends Instruction { const result = await repo.aggregate({ ...options, method: aggregators[aggregator], - // transaction: processor.transaction, + transaction: this.workflow.useDataSourceTransaction(dataSourceName, processor.transaction), }); return { diff --git a/packages/plugins/@nocobase/plugin-workflow-aggregate/src/server/__tests__/instruction.test.ts b/packages/plugins/@nocobase/plugin-workflow-aggregate/src/server/__tests__/instruction.test.ts index 0efcc8bfcd..b2e76cf15d 100644 --- a/packages/plugins/@nocobase/plugin-workflow-aggregate/src/server/__tests__/instruction.test.ts +++ b/packages/plugins/@nocobase/plugin-workflow-aggregate/src/server/__tests__/instruction.test.ts @@ -333,5 +333,38 @@ describe('workflow > instructions > aggregate', () => { const [job] = await execution.getJobs(); expect(job.result).toBe(1); }); + + it('transaction in sync workflow', async () => { + PostRepo = app.dataSourceManager.dataSources.get('another').collectionManager.getRepository('posts'); + + const w1 = await WorkflowModel.create({ + enabled: true, + type: 'collection', + sync: true, + config: { + mode: 1, + collection: 'another:posts', + }, + }); + + const n1 = await w1.createNode({ + type: 'aggregate', + config: { + collection: 'another:posts', + aggregator: 'count', + params: { + field: 'id', + }, + }, + }); + + const p1 = await PostRepo.create({ values: { title: 't1' } }); + + const e1s = await w1.getExecutions(); + expect(e1s.length).toBe(1); + expect(e1s[0].status).toBe(EXECUTION_STATUS.RESOLVED); + const [job] = await e1s[0].getJobs(); + expect(job.result).toBe(1); + }); }); }); diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts index ddf5b3d69e..b273e034c2 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/Plugin.ts @@ -441,11 +441,13 @@ export default class PluginWorkflowServer extends Plugin { context, options: EventOptions, ): Promise { - const { transaction = await this.db.sequelize.transaction(), deferred } = options; + const { deferred } = options; + const transaction = await this.useDataSourceTransaction('main', options.transaction, true); + const sameTransaction = options.transaction === transaction; const trigger = this.triggers.get(workflow.type); const valid = await trigger.validateEvent(workflow, context, { ...options, transaction }); if (!valid) { - if (!options.transaction) { + if (!sameTransaction) { await transaction.commit(); } return null; @@ -463,7 +465,7 @@ export default class PluginWorkflowServer extends Plugin { { transaction }, ); } catch (err) { - if (!options.transaction) { + if (!sameTransaction) { await transaction.rollback(); } throw err; @@ -489,7 +491,7 @@ export default class PluginWorkflowServer extends Plugin { }, ); - if (!options.transaction) { + if (!sameTransaction) { await transaction.commit(); }