From 5717e91a0bc268ae7568cfb243c58eaf0b3365df Mon Sep 17 00:00:00 2001 From: Junyi Date: Thu, 10 Apr 2025 11:01:25 +0800 Subject: [PATCH] fix(plugin-workflow): fix oom when create job with unsafe integer id (#6637) * fix(plugin-workflow): fix oom when create job with unsafe integer id * chore(plugin-workflow): remove useless code * chore(plugin-workflow): remove only for test case * fix(plugin-workflow): fix test cases * fix(plugin-workflow): fix test case * fix(plugin-workflow): fix pg test case --- .../src/common/collections/jobs.ts | 7 ++ .../plugin-workflow/src/server/Processor.ts | 5 +- .../src/server/__tests__/Processor.test.ts | 30 ++++----- .../migrations/20250320223415-stats.test.ts | 2 +- ...9164913-remove-jobs-auto-increment.test.ts | 64 +++++++++++++++++++ ...250409164913-remove-jobs-auto-increment.ts | 37 +++++++++++ 6 files changed, 124 insertions(+), 21 deletions(-) create mode 100644 packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/migrations/20250409164913-remove-jobs-auto-increment.test.ts create mode 100644 packages/plugins/@nocobase/plugin-workflow/src/server/migrations/20250409164913-remove-jobs-auto-increment.ts diff --git a/packages/plugins/@nocobase/plugin-workflow/src/common/collections/jobs.ts b/packages/plugins/@nocobase/plugin-workflow/src/common/collections/jobs.ts index 4b961d6801..9d5eb9028d 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/common/collections/jobs.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/common/collections/jobs.ts @@ -14,7 +14,14 @@ export default { migrationRules: ['schema-only'], name: 'jobs', shared: true, + autoGenId: false, fields: [ + { + type: 'bigInt', + name: 'id', + primaryKey: true, + autoIncrement: false, + }, { type: 'belongsTo', name: 'execution', diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/Processor.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/Processor.ts index ebb510e863..f140767363 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/Processor.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/Processor.ts @@ -264,7 +264,10 @@ export default class Processor { const JobsModel = this.options.plugin.db.getModel('jobs'); await JobsModel.bulkCreate( newJobs.map((job) => job.toJSON()), - { transaction: this.mainTransaction }, + { + transaction: this.mainTransaction, + returning: false, + }, ); for (const job of newJobs) { job.isNewRecord = false; diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Processor.test.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Processor.test.ts index 1acc1a0188..5a09c92452 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Processor.test.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/Processor.test.ts @@ -41,27 +41,19 @@ describe('workflow > Processor', () => { afterEach(() => app.destroy()); describe('base', () => { - it.skip('saveJob', async () => { - const execution = await workflow.createExecution({ - key: workflow.key, - context: {}, - status: EXECUTION_STATUS.STARTED, - eventKey: '123', - }); + it.skipIf(process.env['DB_DIALECT'] === 'sqlite')('job id out of max safe integer', async () => { + const JobModel = db.getModel('jobs'); - const processor = plugin.createProcessor(execution); + const records = await JobModel.bulkCreate([ + { + id: '10267424896650240', + }, + { + id: '10267424930204672', + }, + ]); - const job1 = await processor.saveJob({ - status: JOB_STATUS.RESOLVED, - result: null, - }); - - const job2 = await processor.saveJob({ - status: JOB_STATUS.RESOLVED, - result: 'abc', - }); - - expect(job2).toBeDefined(); + expect(records.length).toBe(2); }); it('empty workflow without any nodes', async () => { diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/migrations/20250320223415-stats.test.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/migrations/20250320223415-stats.test.ts index 9e373fc164..cec660e4ff 100644 --- a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/migrations/20250320223415-stats.test.ts +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/migrations/20250320223415-stats.test.ts @@ -26,7 +26,7 @@ describe('20250320223415-stats', () => { describe('legacy stats should be migrated', () => { beforeEach(async () => { app = await getApp(); - app.version.update('1.6.0'); + await app.version.update('1.6.0'); plugin = app.pm.get(PluginWorkflowServer) as PluginWorkflowServer; WorkflowRepo = app.db.getRepository('workflows'); WorkflowStatsRepo = app.db.getRepository('workflowStats'); diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/migrations/20250409164913-remove-jobs-auto-increment.test.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/migrations/20250409164913-remove-jobs-auto-increment.test.ts new file mode 100644 index 0000000000..5710e3317a --- /dev/null +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/__tests__/migrations/20250409164913-remove-jobs-auto-increment.test.ts @@ -0,0 +1,64 @@ +/** + * This file is part of the NocoBase (R) project. + * Copyright (c) 2020-2024 NocoBase Co., Ltd. + * Authors: NocoBase Team. + * + * This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License. + * For more information, please refer to: https://www.nocobase.com/agreement. + */ + +import { MockServer } from '@nocobase/test'; +import { getApp } from '@nocobase/plugin-workflow-test'; + +import Migration from '../../migrations/20250409164913-remove-jobs-auto-increment'; + +describe.skipIf(process.env.DB_DIALECT === 'sqlite')('20250409164913-remove-jobs-auto-increment', () => { + let app: MockServer; + let OldCollection; + + beforeEach(async () => { + app = await getApp(); + await app.version.update('1.6.0'); + OldCollection = app.db.collection({ + name: 'jobs', + fields: [], + }); + await app.db.sync({ force: true }); + }); + + afterEach(() => app.destroy()); + + it.runIf(process.env.DB_DIALECT === 'postgres')('[PG] no auto increment any more', async () => { + const oldColumns = await app.db.sequelize.getQueryInterface().describeTable(OldCollection.getTableNameWithSchema()); + expect(oldColumns.id.defaultValue.includes('jobs_id_seq::regclass')).toBe(true); + + const JobRepo = app.db.getRepository('jobs'); + const j1 = await JobRepo.create({}); + expect(j1.id).toBe(1); + + const migration = new Migration({ app, db: app.db } as any); + await migration.up(); + + const newColumns = await app.db.sequelize.getQueryInterface().describeTable(OldCollection.getTableNameWithSchema()); + expect(newColumns.id.defaultValue).toBeFalsy(); + + await expect(async () => await JobRepo.create({})).rejects.toThrow(); + }); + + it.runIf(['mysql', 'mariadb'].includes(process.env.DB_DIALECT))('[MySQL] no auto increment any more', async () => { + const oldColumns = await app.db.sequelize.getQueryInterface().describeTable(OldCollection.getTableNameWithSchema()); + expect(oldColumns.id.autoIncrement).toBe(true); + + const JobRepo = app.db.getRepository('jobs'); + const j1 = await JobRepo.create({}); + expect(j1.id).toBe(1); + + const migration = new Migration({ app, db: app.db } as any); + await migration.up(); + + const newColumns = await app.db.sequelize.getQueryInterface().describeTable(OldCollection.getTableNameWithSchema()); + expect(newColumns.id.autoIncrement).toBe(false); + + await expect(async () => await JobRepo.create({})).rejects.toThrow(); + }); +}); diff --git a/packages/plugins/@nocobase/plugin-workflow/src/server/migrations/20250409164913-remove-jobs-auto-increment.ts b/packages/plugins/@nocobase/plugin-workflow/src/server/migrations/20250409164913-remove-jobs-auto-increment.ts new file mode 100644 index 0000000000..7795ba5bc5 --- /dev/null +++ b/packages/plugins/@nocobase/plugin-workflow/src/server/migrations/20250409164913-remove-jobs-auto-increment.ts @@ -0,0 +1,37 @@ +/** + * This file is part of the NocoBase (R) project. + * Copyright (c) 2020-2024 NocoBase Co., Ltd. + * Authors: NocoBase Team. + * + * This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License. + * For more information, please refer to: https://www.nocobase.com/agreement. + */ + +import { Migration } from '@nocobase/server'; + +export default class extends Migration { + appVersion = '<1.7.0'; + on = 'beforeLoad'; + async up() { + const { db } = this.context; + const jobCollection = db.collection({ + name: 'jobs', + }); + const tableNameWithQuotes = jobCollection.getRealTableName(true); + + await db.sequelize.transaction(async (transaction) => { + if (this.db.isPostgresCompatibleDialect()) { + await db.sequelize.query(`ALTER TABLE ${tableNameWithQuotes} ALTER COLUMN id DROP DEFAULT`, { + transaction, + }); + return; + } + if (this.db.isMySQLCompatibleDialect()) { + await db.sequelize.query(`ALTER TABLE ${tableNameWithQuotes} MODIFY COLUMN id BIGINT`, { + transaction, + }); + return; + } + }); + } +}