fix(plugin-workflow): fix transaction across data sources (#5818)

This commit is contained in:
Junyi 2024-12-07 14:41:59 +08:00 committed by GitHub
parent 223abab1b7
commit fb5c033524
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 17 additions and 5 deletions

View File

@ -585,7 +585,8 @@ export default class PluginWorkflowServer extends Plugin {
private async process(execution: ExecutionModel, job?: JobModel, options: Transactionable = {}): Promise<Processor> {
if (execution.status === EXECUTION_STATUS.QUEUEING) {
await execution.update({ status: EXECUTION_STATUS.STARTED }, { transaction: options.transaction });
const transaction = await this.useDataSourceTransaction('main', options.transaction);
await execution.update({ status: EXECUTION_STATUS.STARTED }, { transaction });
}
const logger = this.getLogger(execution.workflowId);
const processor = this.createProcessor(execution, options);

View File

@ -41,6 +41,11 @@ export default class Processor {
*/
transaction: Transaction;
/**
* @experimental
*/
mainTransaction: Transaction;
/**
* @experimental
*/
@ -105,13 +110,16 @@ export default class Processor {
public async prepare() {
const {
execution,
transaction,
options: { plugin },
} = this;
if (!execution.workflow) {
execution.workflow = plugin.enabledCache.get(execution.workflowId);
}
this.mainTransaction = plugin.useDataSourceTransaction('main', this.transaction);
const transaction = this.mainTransaction;
const nodes = await execution.workflow.getNodes({ transaction });
this.makeNodes(nodes);
@ -156,7 +164,7 @@ export default class Processor {
this.logger.debug(`config of node`, { data: node.config });
job = await instruction(node, prevJob, this);
if (!job) {
return null;
return this.exit();
}
} catch (err) {
// for uncaught error, set to error
@ -250,7 +258,10 @@ export default class Processor {
public async exit(s?: number) {
if (typeof s === 'number') {
const status = (<typeof Processor>this.constructor).StatusMap[s] ?? Math.sign(s);
await this.execution.update({ status }, { transaction: this.transaction });
await this.execution.update({ status }, { transaction: this.mainTransaction });
}
if (this.mainTransaction && this.mainTransaction !== this.transaction) {
await this.mainTransaction.commit();
}
this.logger.info(`execution (${this.execution.id}) exiting with status ${this.execution.status}`);
return null;
@ -262,7 +273,7 @@ export default class Processor {
*/
async saveJob(payload: JobModel | Record<string, any>): Promise<JobModel> {
const { database } = <typeof ExecutionModel>this.execution.constructor;
const { transaction } = this;
const { mainTransaction: transaction } = this;
const { model } = database.getCollection('jobs');
let job;
if (payload instanceof model) {