fix(plugin-workflow): fix oom when create job with unsafe integer id (#6637)

* fix(plugin-workflow): fix oom when create job with unsafe integer id

* chore(plugin-workflow): remove useless code

* chore(plugin-workflow): remove only for test case

* fix(plugin-workflow): fix test cases

* fix(plugin-workflow): fix test case

* fix(plugin-workflow): fix pg test case
This commit is contained in:
Junyi 2025-04-10 11:01:25 +08:00 committed by GitHub
parent 29d7cf3dbf
commit 5717e91a0b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 124 additions and 21 deletions

View File

@ -14,7 +14,14 @@ export default {
migrationRules: ['schema-only'], migrationRules: ['schema-only'],
name: 'jobs', name: 'jobs',
shared: true, shared: true,
autoGenId: false,
fields: [ fields: [
{
type: 'bigInt',
name: 'id',
primaryKey: true,
autoIncrement: false,
},
{ {
type: 'belongsTo', type: 'belongsTo',
name: 'execution', name: 'execution',

View File

@ -264,7 +264,10 @@ export default class Processor {
const JobsModel = this.options.plugin.db.getModel('jobs'); const JobsModel = this.options.plugin.db.getModel('jobs');
await JobsModel.bulkCreate( await JobsModel.bulkCreate(
newJobs.map((job) => job.toJSON()), newJobs.map((job) => job.toJSON()),
{ transaction: this.mainTransaction }, {
transaction: this.mainTransaction,
returning: false,
},
); );
for (const job of newJobs) { for (const job of newJobs) {
job.isNewRecord = false; job.isNewRecord = false;

View File

@ -41,27 +41,19 @@ describe('workflow > Processor', () => {
afterEach(() => app.destroy()); afterEach(() => app.destroy());
describe('base', () => { describe('base', () => {
it.skip('saveJob', async () => { it.skipIf(process.env['DB_DIALECT'] === 'sqlite')('job id out of max safe integer', async () => {
const execution = await workflow.createExecution({ const JobModel = db.getModel('jobs');
key: workflow.key,
context: {},
status: EXECUTION_STATUS.STARTED,
eventKey: '123',
});
const processor = plugin.createProcessor(execution); const records = await JobModel.bulkCreate([
{
id: '10267424896650240',
},
{
id: '10267424930204672',
},
]);
const job1 = await processor.saveJob({ expect(records.length).toBe(2);
status: JOB_STATUS.RESOLVED,
result: null,
});
const job2 = await processor.saveJob({
status: JOB_STATUS.RESOLVED,
result: 'abc',
});
expect(job2).toBeDefined();
}); });
it('empty workflow without any nodes', async () => { it('empty workflow without any nodes', async () => {

View File

@ -26,7 +26,7 @@ describe('20250320223415-stats', () => {
describe('legacy stats should be migrated', () => { describe('legacy stats should be migrated', () => {
beforeEach(async () => { beforeEach(async () => {
app = await getApp(); app = await getApp();
app.version.update('1.6.0'); await app.version.update('1.6.0');
plugin = app.pm.get(PluginWorkflowServer) as PluginWorkflowServer; plugin = app.pm.get(PluginWorkflowServer) as PluginWorkflowServer;
WorkflowRepo = app.db.getRepository('workflows'); WorkflowRepo = app.db.getRepository('workflows');
WorkflowStatsRepo = app.db.getRepository('workflowStats'); WorkflowStatsRepo = app.db.getRepository('workflowStats');

View File

@ -0,0 +1,64 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { MockServer } from '@nocobase/test';
import { getApp } from '@nocobase/plugin-workflow-test';
import Migration from '../../migrations/20250409164913-remove-jobs-auto-increment';
describe.skipIf(process.env.DB_DIALECT === 'sqlite')('20250409164913-remove-jobs-auto-increment', () => {
let app: MockServer;
let OldCollection;
beforeEach(async () => {
app = await getApp();
await app.version.update('1.6.0');
OldCollection = app.db.collection({
name: 'jobs',
fields: [],
});
await app.db.sync({ force: true });
});
afterEach(() => app.destroy());
it.runIf(process.env.DB_DIALECT === 'postgres')('[PG] no auto increment any more', async () => {
const oldColumns = await app.db.sequelize.getQueryInterface().describeTable(OldCollection.getTableNameWithSchema());
expect(oldColumns.id.defaultValue.includes('jobs_id_seq::regclass')).toBe(true);
const JobRepo = app.db.getRepository('jobs');
const j1 = await JobRepo.create({});
expect(j1.id).toBe(1);
const migration = new Migration({ app, db: app.db } as any);
await migration.up();
const newColumns = await app.db.sequelize.getQueryInterface().describeTable(OldCollection.getTableNameWithSchema());
expect(newColumns.id.defaultValue).toBeFalsy();
await expect(async () => await JobRepo.create({})).rejects.toThrow();
});
it.runIf(['mysql', 'mariadb'].includes(process.env.DB_DIALECT))('[MySQL] no auto increment any more', async () => {
const oldColumns = await app.db.sequelize.getQueryInterface().describeTable(OldCollection.getTableNameWithSchema());
expect(oldColumns.id.autoIncrement).toBe(true);
const JobRepo = app.db.getRepository('jobs');
const j1 = await JobRepo.create({});
expect(j1.id).toBe(1);
const migration = new Migration({ app, db: app.db } as any);
await migration.up();
const newColumns = await app.db.sequelize.getQueryInterface().describeTable(OldCollection.getTableNameWithSchema());
expect(newColumns.id.autoIncrement).toBe(false);
await expect(async () => await JobRepo.create({})).rejects.toThrow();
});
});

View File

@ -0,0 +1,37 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { Migration } from '@nocobase/server';
export default class extends Migration {
appVersion = '<1.7.0';
on = 'beforeLoad';
async up() {
const { db } = this.context;
const jobCollection = db.collection({
name: 'jobs',
});
const tableNameWithQuotes = jobCollection.getRealTableName(true);
await db.sequelize.transaction(async (transaction) => {
if (this.db.isPostgresCompatibleDialect()) {
await db.sequelize.query(`ALTER TABLE ${tableNameWithQuotes} ALTER COLUMN id DROP DEFAULT`, {
transaction,
});
return;
}
if (this.db.isMySQLCompatibleDialect()) {
await db.sequelize.query(`ALTER TABLE ${tableNameWithQuotes} MODIFY COLUMN id BIGINT`, {
transaction,
});
return;
}
});
}
}