From d77c332c575b64351841b7358e73cf2a86b57d38 Mon Sep 17 00:00:00 2001 From: aaaaaajie Date: Mon, 14 Apr 2025 14:43:07 +0800 Subject: [PATCH] refactor: sync runner handle schema --- .../query-interface/mysql-query-interface.ts | 2 + .../postgres-query-interface.ts | 12 +- .../src/query-interface/query-interface.ts | 2 + .../query-interface/sqlite-query-interface.ts | 2 + packages/core/database/src/sync-runner.ts | 146 ++---------------- .../src/server/repository.ts | 72 ++++++--- 6 files changed, 80 insertions(+), 156 deletions(-) diff --git a/packages/core/database/src/query-interface/mysql-query-interface.ts b/packages/core/database/src/query-interface/mysql-query-interface.ts index 4e45b35f74..551ed4876d 100644 --- a/packages/core/database/src/query-interface/mysql-query-interface.ts +++ b/packages/core/database/src/query-interface/mysql-query-interface.ts @@ -156,4 +156,6 @@ export default class MysqlQueryInterface extends QueryInterface { nullSafe(): string { return 'IFNULL'; } + + async ensureSchema(schemaName: string): Promise {} } diff --git a/packages/core/database/src/query-interface/postgres-query-interface.ts b/packages/core/database/src/query-interface/postgres-query-interface.ts index d992702927..8620026c36 100644 --- a/packages/core/database/src/query-interface/postgres-query-interface.ts +++ b/packages/core/database/src/query-interface/postgres-query-interface.ts @@ -16,7 +16,7 @@ import QueryInterface, { RemoveColumnOptions, TableInfo, } from './query-interface'; -import { ModelStatic, Transaction } from 'sequelize'; +import { ModelStatic, Transaction, Transactionable } from 'sequelize'; export default class PostgresQueryInterface extends QueryInterface { constructor(db) { @@ -259,4 +259,14 @@ $BODY$ nullSafe(): string { return 'COALESCE'; } + + async ensureSchema(schemaName: string, options: Transactionable): Promise { + if (schemaName === 'public') { + return; + } + await this.db.sequelize.query(`CREATE SCHEMA IF NOT EXISTS "${schemaName}";`, { + raw: true, + transaction: options?.transaction, + }); + } } diff --git a/packages/core/database/src/query-interface/query-interface.ts b/packages/core/database/src/query-interface/query-interface.ts index 6d8c0e64ea..94a84e4c77 100644 --- a/packages/core/database/src/query-interface/query-interface.ts +++ b/packages/core/database/src/query-interface/query-interface.ts @@ -114,6 +114,8 @@ export default abstract class QueryInterface { // Usage: mysql => ifnull, postgres => coalesce, mssql => isnull abstract nullSafe(): string; + abstract ensureSchema(schemaName: string, options: Transactionable): Promise; + get defaultSchemaName() { return this.config.defaultSchemaName; } diff --git a/packages/core/database/src/query-interface/sqlite-query-interface.ts b/packages/core/database/src/query-interface/sqlite-query-interface.ts index 953cb1a0ba..725a398f0c 100644 --- a/packages/core/database/src/query-interface/sqlite-query-interface.ts +++ b/packages/core/database/src/query-interface/sqlite-query-interface.ts @@ -161,4 +161,6 @@ export default class SqliteQueryInterface extends QueryInterface { nullSafe(): string { return 'IFNULL'; } + + async ensureSchema(schemaName: string): Promise {} } diff --git a/packages/core/database/src/sync-runner.ts b/packages/core/database/src/sync-runner.ts index 9fdea89ef9..5250b88007 100644 --- a/packages/core/database/src/sync-runner.ts +++ b/packages/core/database/src/sync-runner.ts @@ -8,7 +8,7 @@ */ import { isPlainObject } from '@nocobase/utils'; -import { Op, Model as SequelizeModel } from 'sequelize'; +import { Model as SequelizeModel } from 'sequelize'; import { Collection } from './collection'; import Database from './database'; import { ZeroColumnTableError } from './errors/zero-column-table-error'; @@ -134,18 +134,6 @@ export class SyncRunner { // remove primary key if (this.database.inDialect('mariadb', 'mysql')) { await this.sequelize.query(`ALTER TABLE ${this.collection.quotedTableName()} DROP PRIMARY KEY;`, options); - } else if (this.database.inDialect('mssql')) { - const constraintName = await this.sequelize.query( - `SELECT name FROM sys.key_constraints - WHERE type = 'PK' AND parent_object_id = OBJECT_ID('${this.collection.quotedTableName()}')`, - { ...options, type: 'SELECT' }, - ); - if (constraintName?.[0] && constraintName[0]['name']) { - await this.sequelize.query( - `ALTER TABLE ${this.collection.quotedTableName()} DROP CONSTRAINT ${constraintName[0]['name']};`, - options, - ); - } } } } @@ -232,7 +220,6 @@ export class SyncRunner { defaultValue: attributeDefaultValue, }; - // TODO: use dialect QueryInterface to change column default value if (this.database.inDialect('postgres')) { // @ts-ignore const query = this.queryInterface.queryGenerator.attributesToSQL( @@ -279,94 +266,25 @@ export class SyncRunner { return this.rawAttributes[key].unique == true; }); - const rebuildIndexes = []; // remove unique index that not in model for (const existUniqueIndex of existsUniqueIndexes) { const isSingleField = existUniqueIndex.fields.length == 1; if (!isSingleField) continue; const columnName = existUniqueIndex.fields[0].attribute; + const currentAttribute = this.findAttributeByColumnName(columnName); - const needRemove = !currentAttribute || (!currentAttribute.unique && !currentAttribute.primaryKey); - if (this.database.inDialect('mssql')) { - try { - // 设置更短的查询超时时间,避免长时间阻塞 - const queryOptions = { - ...options, - type: 'SELECT', - timeout: 5000, // 设置5秒超时,可根据实际情况调整 - }; - // 优化查询,只查询必要的信息 - const constraintCheck = await this.sequelize.query( - `SELECT OBJECT_NAME(object_id) as name, is_unique_constraint - FROM sys.indexes WITH (NOLOCK) - WHERE object_id = OBJECT_ID('${this.collection.quotedTableName()}') - AND name = '${existUniqueIndex.name}'`, - queryOptions, - ); - - if (constraintCheck.length > 0) { - const isConstraint = (constraintCheck)[0]['is_unique_constraint']; - - // 同样使用优化的查询选项 - const isPrimaryKey = await this.sequelize.query( - `SELECT 1 FROM sys.indexes WITH (NOLOCK) - WHERE object_id = OBJECT_ID('${this.collection.quotedTableName()}') - AND name = '${existUniqueIndex.name}' - AND is_primary_key = 1`, - queryOptions, - ); - - // 跳过主键索引 - if (isPrimaryKey.length > 0) { - continue; - } - - if (!needRemove) { - rebuildIndexes.push(columnName); - } - - if (isConstraint) { - await this.sequelize.query( - `ALTER TABLE ${this.collection.quotedTableName()} DROP CONSTRAINT [${existUniqueIndex.name}]`, - options, - ); - } else { - await this.sequelize.query( - `DROP INDEX [${existUniqueIndex.name}] ON ${this.collection.quotedTableName()};`, - options, - ); - } - - // 修复索引名称中的语法错误(多了一个右花括号) - await this.sequelize.query( - ` - CREATE UNIQUE INDEX [${this.collection.tableName()}_${columnName}_uk] - ON ${this.tableName} ([${columnName}]) - WHERE [${columnName}] IS NOT NULL; - `, - options, - ); - } - } catch (error) { - // 添加错误处理,记录错误但不中断流程 - console.warn(`Error handling unique index for ${existUniqueIndex.name}: ${error.message}`); - // 如果是超时错误,可以继续处理下一个索引 - if (error.message.includes('Timeout')) { - continue; - } - throw error; - } - } - if (needRemove) { + if (!currentAttribute || (!currentAttribute.unique && !currentAttribute.primaryKey)) { if (this.database.inDialect('postgres')) { // @ts-ignore const constraints = await this.queryInterface.showConstraint(this.tableName, existUniqueIndex.name, options); if (constraints.some((c) => c.constraintName === existUniqueIndex.name)) { await this.queryInterface.removeConstraint(this.tableName, existUniqueIndex.name, options); } - } else if (this.database.inDialect('sqlite')) { + } + + if (this.database.inDialect('sqlite')) { const changeAttribute = { ...currentAttribute, unique: false, @@ -378,34 +296,19 @@ export class SyncRunner { } } } + // add unique index that not in database for (const uniqueAttribute of uniqueAttributes) { - const field = this.rawAttributes[uniqueAttribute].field; - - // 检查是否存在包含该字段的索引(包括组合索引) - const hasFieldIndex = existsUniqueIndexes.some((index) => { - return index.fields[0].attribute === field; + // check index exists or not + const indexExists = existsUniqueIndexes.find((index) => { + return index.fields.length == 1 && index.fields[0].attribute == this.rawAttributes[uniqueAttribute].field; }); - if (!hasFieldIndex) { - // 检查是否有重复数据 - const duplicateCheck = await this.sequelize.query( - `SELECT COUNT(*) as count, ${field} - FROM ${this.collection.quotedTableName()} - GROUP BY ${field} - HAVING COUNT(*) > 1`, - { ...options, type: 'SELECT' }, - ); - - if (duplicateCheck.length > 0) { - console.warn(`Cannot create unique index on ${field} due to duplicate values`); - continue; - } - - await this.queryInterface.addIndex(this.tableName, [field], { + if (!indexExists) { + await this.queryInterface.addIndex(this.tableName, [this.rawAttributes[uniqueAttribute].field], { unique: true, transaction: options?.transaction, - name: `${this.collection.tableName()}_${field}_uk`, + name: `${this.collection.tableName()}_${this.rawAttributes[uniqueAttribute].field}_uk`, }); } } @@ -469,7 +372,7 @@ export class SyncRunner { async handleZeroColumnModel(options) { // @ts-ignore if (Object.keys(this.model.tableAttributes).length === 0) { - if (this.database.inDialect('sqlite', 'mysql', 'mariadb', 'postgres', 'mssql')) { + if (this.database.inDialect('sqlite', 'mysql', 'mariadb', 'postgres')) { throw new ZeroColumnTableError( `Zero-column tables aren't supported in ${this.database.sequelize.getDialect()}`, ); @@ -499,25 +402,6 @@ export class SyncRunner { async handleSchema(options) { // @ts-ignore const _schema = this.model._schema; - - if (_schema && _schema != 'public') { - if (this.database.inDialect('mssql')) { - await this.sequelize.query( - `IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '${_schema}') - BEGIN - EXEC('CREATE SCHEMA [${_schema}]') - END`, - { - raw: true, - transaction: options?.transaction, - }, - ); - } else { - await this.sequelize.query(`CREATE SCHEMA IF NOT EXISTS "${_schema}";`, { - raw: true, - transaction: options?.transaction, - }); - } - } + await this.database.queryInterface.ensureSchema(_schema, { transaction: options?.transaction }); } } diff --git a/packages/plugins/@nocobase/plugin-ui-schema-storage/src/server/repository.ts b/packages/plugins/@nocobase/plugin-ui-schema-storage/src/server/repository.ts index 17a3821d72..92862dee53 100644 --- a/packages/plugins/@nocobase/plugin-ui-schema-storage/src/server/repository.ts +++ b/packages/plugins/@nocobase/plugin-ui-schema-storage/src/server/repository.ts @@ -322,7 +322,7 @@ export class UiSchemaRepository extends Repository { if (!newSchema['properties']) { const s = await this.model.findByPk(rootUid, { transaction }); s.set('schema', { ...s.toJSON(), ...newSchema }); - await s.save({ transaction, hooks: false }); + await s.update({ schema: s.schema }, { transaction, hooks: false }); await this.emitAfterSaveEvent(s, options); if (newSchema['x-server-hooks']) { await this.database.emitAsync(`${this.collection.name}.afterSave`, s, options); @@ -578,19 +578,22 @@ export class UiSchemaRepository extends Repository { ]; }, []); - // insert tree path - await this.database.sequelize.query( - this.sqlAdapter( - `INSERT INTO ${ - this.uiSchemaTreePathTableName - } (ancestor, descendant, depth, type, async, sort) VALUES ${treePathData.map((item) => '(?)').join(',')}`, - ), - { - replacements: treePathData, - type: 'insert', - transaction, - }, - ); + // insert tree path in chunks of 1000 rows + const chunkedData = lodash.chunk(treePathData, 1000); + for (const chunk of chunkedData) { + await this.database.sequelize.query( + this.sqlAdapter( + `INSERT INTO ${this.uiSchemaTreePathTableName} (ancestor, descendant, depth, type, async, sort) VALUES ${chunk + .map(() => '(?)') + .join(',')}`, + ), + { + replacements: chunk, + type: 'insert', + transaction, + }, + ); + } const rootNode = nodes[0]; if (rootNode['x-server-hooks']) { @@ -729,18 +732,29 @@ export class UiSchemaRepository extends Repository { if (nodePosition === 'first') { sort = 1; - let updateSql = `UPDATE ${treeTable} as TreeTable + let updateSql: string; + if (this.database.inDialect('postgres', 'sqlite')) { + `UPDATE ${treeTable} as TreeTable SET sort = TreeTable.sort + 1 FROM ${treeTable} as NodeInfo WHERE NodeInfo.descendant = TreeTable.descendant and NodeInfo.depth = 0 AND TreeTable.depth = 1 AND TreeTable.ancestor = :ancestor and NodeInfo.type = :type`; + } // Compatible with mysql - if (this.database.isMySQLCompatibleDialect()) { + else if (this.database.isMySQLCompatibleDialect()) { updateSql = `UPDATE ${treeTable} as TreeTable JOIN ${treeTable} as NodeInfo ON (NodeInfo.descendant = TreeTable.descendant and NodeInfo.depth = 0) SET TreeTable.sort = TreeTable.sort + 1 WHERE TreeTable.depth = 1 AND TreeTable.ancestor = :ancestor and NodeInfo.type = :type`; + } else { + updateSql = `UPDATE TreeTable + SET TreeTable.sort = TreeTable.sort + 1 + FROM ${treeTable} AS TreeTable + JOIN ${treeTable} AS NodeInfo ON NodeInfo.descendant = TreeTable.descendant AND NodeInfo.depth = 0 + WHERE TreeTable.depth = 1 + AND TreeTable.ancestor = :ancestor + AND NodeInfo.type = :type`; } // move all child last index @@ -755,9 +769,7 @@ export class UiSchemaRepository extends Repository { if (nodePosition === 'last') { const maxSort = await db.sequelize.query( - `SELECT ${ - this.database.sequelize.getDialect() === 'postgres' ? 'coalesce' : 'ifnull' - }(MAX(TreeTable.sort), 0) as maxsort FROM ${treeTable} as TreeTable + `SELECT ${this.database.queryInterface.nullSafe()}(MAX(TreeTable.sort), 0) as maxsort FROM ${treeTable} as TreeTable LEFT JOIN ${treeTable} as NodeInfo ON NodeInfo.descendant = TreeTable.descendant and NodeInfo.depth = 0 WHERE TreeTable.depth = 1 AND TreeTable.ancestor = :ancestor and NodeInfo.type = :type`, @@ -799,7 +811,9 @@ export class UiSchemaRepository extends Repository { sort += 1; } - let updateSql = `UPDATE ${treeTable} as TreeTable + let updateSql: string; + if (this.database.inDialect('postgres', 'sqlite')) { + updateSql = `UPDATE ${treeTable} as TreeTable SET sort = TreeTable.sort + 1 FROM ${treeTable} as NodeInfo WHERE NodeInfo.descendant = TreeTable.descendant @@ -808,12 +822,20 @@ export class UiSchemaRepository extends Repository { AND TreeTable.ancestor = :ancestor and TreeTable.sort >= :sort and NodeInfo.type = :type`; - - if (this.database.isMySQLCompatibleDialect()) { + } else if (this.database.isMySQLCompatibleDialect()) { updateSql = `UPDATE ${treeTable} as TreeTable JOIN ${treeTable} as NodeInfo ON (NodeInfo.descendant = TreeTable.descendant and NodeInfo.depth = 0) SET TreeTable.sort = TreeTable.sort + 1 WHERE TreeTable.depth = 1 AND TreeTable.ancestor = :ancestor and TreeTable.sort >= :sort and NodeInfo.type = :type`; + } else { + updateSql = `UPDATE TreeTable + SET TreeTable.sort = TreeTable.sort + 1 + FROM ${treeTable} AS TreeTable + JOIN ${treeTable} AS NodeInfo ON NodeInfo.descendant = TreeTable.descendant AND NodeInfo.depth = 0 + WHERE TreeTable.depth = 1 + AND TreeTable.ancestor = :ancestor + AND TreeTable.sort >= :sort + AND NodeInfo.type = :type`; } await db.sequelize.query(updateSql, { @@ -1089,7 +1111,7 @@ WHERE TreeTable.depth = 1 AND TreeTable.ancestor = :ancestor and TreeTable.sort LEFT JOIN ${this.uiSchemasTableName} as "SchemaTable" ON "SchemaTable"."x-uid" = TreePath.descendant LEFT JOIN ${this.uiSchemaTreePathTableName} as NodeInfo ON NodeInfo.descendant = "SchemaTable"."x-uid" and NodeInfo.descendant = NodeInfo.ancestor and NodeInfo.depth = 0 LEFT JOIN ${this.uiSchemaTreePathTableName} as ParentPath ON (ParentPath.descendant = "SchemaTable"."x-uid" AND ParentPath.depth = 1) - WHERE TreePath.ancestor = :ancestor AND (NodeInfo.async = false or TreePath.depth <= 1)`; + WHERE TreePath.ancestor = :ancestor AND (NodeInfo.async = ${this.database.queryInterface.booleanValues.false} or TreePath.depth <= 1)`; const nodes = await db.sequelize.query(this.sqlAdapter(rawSql), { replacements: { @@ -1120,7 +1142,9 @@ WHERE TreeTable.depth = 1 AND TreeTable.ancestor = :ancestor and TreeTable.sort LEFT JOIN ${treeTable} as NodeInfo ON NodeInfo.descendant = "SchemaTable"."x-uid" and NodeInfo.descendant = NodeInfo.ancestor and NodeInfo.depth = 0 LEFT JOIN ${treeTable} as ParentPath ON (ParentPath.descendant = "SchemaTable"."x-uid" AND ParentPath.depth = 1) WHERE TreePath.ancestor = :ancestor ${ - options?.includeAsyncNode ? '' : 'AND (NodeInfo.async != true or TreePath.depth = 0)' + options?.includeAsyncNode + ? '' + : `AND (NodeInfo.async != ${this.database.queryInterface.booleanValues.true} or TreePath.depth = 0)` } `;