refactor: sync runner handle schema

This commit is contained in:
aaaaaajie 2025-04-14 14:43:07 +08:00
parent 89d50029bf
commit d77c332c57
6 changed files with 80 additions and 156 deletions

View File

@ -156,4 +156,6 @@ export default class MysqlQueryInterface extends QueryInterface {
nullSafe(): string {
return 'IFNULL';
}
async ensureSchema(schemaName: string): Promise<void> {}
}

View File

@ -16,7 +16,7 @@ import QueryInterface, {
RemoveColumnOptions,
TableInfo,
} from './query-interface';
import { ModelStatic, Transaction } from 'sequelize';
import { ModelStatic, Transaction, Transactionable } from 'sequelize';
export default class PostgresQueryInterface extends QueryInterface {
constructor(db) {
@ -259,4 +259,14 @@ $BODY$
nullSafe(): string {
return 'COALESCE';
}
async ensureSchema(schemaName: string, options: Transactionable): Promise<void> {
if (schemaName === 'public') {
return;
}
await this.db.sequelize.query(`CREATE SCHEMA IF NOT EXISTS "${schemaName}";`, {
raw: true,
transaction: options?.transaction,
});
}
}

View File

@ -114,6 +114,8 @@ export default abstract class QueryInterface {
// Usage: mysql => ifnull, postgres => coalesce, mssql => isnull
abstract nullSafe(): string;
abstract ensureSchema(schemaName: string, options: Transactionable): Promise<void>;
get defaultSchemaName() {
return this.config.defaultSchemaName;
}

View File

@ -161,4 +161,6 @@ export default class SqliteQueryInterface extends QueryInterface {
nullSafe(): string {
return 'IFNULL';
}
async ensureSchema(schemaName: string): Promise<void> {}
}

View File

@ -8,7 +8,7 @@
*/
import { isPlainObject } from '@nocobase/utils';
import { Op, Model as SequelizeModel } from 'sequelize';
import { Model as SequelizeModel } from 'sequelize';
import { Collection } from './collection';
import Database from './database';
import { ZeroColumnTableError } from './errors/zero-column-table-error';
@ -134,18 +134,6 @@ export class SyncRunner {
// remove primary key
if (this.database.inDialect('mariadb', 'mysql')) {
await this.sequelize.query(`ALTER TABLE ${this.collection.quotedTableName()} DROP PRIMARY KEY;`, options);
} else if (this.database.inDialect('mssql')) {
const constraintName = await this.sequelize.query(
`SELECT name FROM sys.key_constraints
WHERE type = 'PK' AND parent_object_id = OBJECT_ID('${this.collection.quotedTableName()}')`,
{ ...options, type: 'SELECT' },
);
if (constraintName?.[0] && constraintName[0]['name']) {
await this.sequelize.query(
`ALTER TABLE ${this.collection.quotedTableName()} DROP CONSTRAINT ${constraintName[0]['name']};`,
options,
);
}
}
}
}
@ -232,7 +220,6 @@ export class SyncRunner {
defaultValue: attributeDefaultValue,
};
// TODO: use dialect QueryInterface to change column default value
if (this.database.inDialect('postgres')) {
// @ts-ignore
const query = this.queryInterface.queryGenerator.attributesToSQL(
@ -279,94 +266,25 @@ export class SyncRunner {
return this.rawAttributes[key].unique == true;
});
const rebuildIndexes = [];
// remove unique index that not in model
for (const existUniqueIndex of existsUniqueIndexes) {
const isSingleField = existUniqueIndex.fields.length == 1;
if (!isSingleField) continue;
const columnName = existUniqueIndex.fields[0].attribute;
const currentAttribute = this.findAttributeByColumnName(columnName);
const needRemove = !currentAttribute || (!currentAttribute.unique && !currentAttribute.primaryKey);
if (this.database.inDialect('mssql')) {
try {
// 设置更短的查询超时时间,避免长时间阻塞
const queryOptions = {
...options,
type: 'SELECT',
timeout: 5000, // 设置5秒超时可根据实际情况调整
};
// 优化查询,只查询必要的信息
const constraintCheck = await this.sequelize.query(
`SELECT OBJECT_NAME(object_id) as name, is_unique_constraint
FROM sys.indexes WITH (NOLOCK)
WHERE object_id = OBJECT_ID('${this.collection.quotedTableName()}')
AND name = '${existUniqueIndex.name}'`,
queryOptions,
);
if (constraintCheck.length > 0) {
const isConstraint = (<any>constraintCheck)[0]['is_unique_constraint'];
// 同样使用优化的查询选项
const isPrimaryKey = await this.sequelize.query(
`SELECT 1 FROM sys.indexes WITH (NOLOCK)
WHERE object_id = OBJECT_ID('${this.collection.quotedTableName()}')
AND name = '${existUniqueIndex.name}'
AND is_primary_key = 1`,
queryOptions,
);
// 跳过主键索引
if (isPrimaryKey.length > 0) {
continue;
}
if (!needRemove) {
rebuildIndexes.push(columnName);
}
if (isConstraint) {
await this.sequelize.query(
`ALTER TABLE ${this.collection.quotedTableName()} DROP CONSTRAINT [${existUniqueIndex.name}]`,
options,
);
} else {
await this.sequelize.query(
`DROP INDEX [${existUniqueIndex.name}] ON ${this.collection.quotedTableName()};`,
options,
);
}
// 修复索引名称中的语法错误(多了一个右花括号)
await this.sequelize.query(
`
CREATE UNIQUE INDEX [${this.collection.tableName()}_${columnName}_uk]
ON ${this.tableName} ([${columnName}])
WHERE [${columnName}] IS NOT NULL;
`,
options,
);
}
} catch (error) {
// 添加错误处理,记录错误但不中断流程
console.warn(`Error handling unique index for ${existUniqueIndex.name}: ${error.message}`);
// 如果是超时错误,可以继续处理下一个索引
if (error.message.includes('Timeout')) {
continue;
}
throw error;
}
}
if (needRemove) {
if (!currentAttribute || (!currentAttribute.unique && !currentAttribute.primaryKey)) {
if (this.database.inDialect('postgres')) {
// @ts-ignore
const constraints = await this.queryInterface.showConstraint(this.tableName, existUniqueIndex.name, options);
if (constraints.some((c) => c.constraintName === existUniqueIndex.name)) {
await this.queryInterface.removeConstraint(this.tableName, existUniqueIndex.name, options);
}
} else if (this.database.inDialect('sqlite')) {
}
if (this.database.inDialect('sqlite')) {
const changeAttribute = {
...currentAttribute,
unique: false,
@ -378,34 +296,19 @@ export class SyncRunner {
}
}
}
// add unique index that not in database
for (const uniqueAttribute of uniqueAttributes) {
const field = this.rawAttributes[uniqueAttribute].field;
// 检查是否存在包含该字段的索引(包括组合索引)
const hasFieldIndex = existsUniqueIndexes.some((index) => {
return index.fields[0].attribute === field;
// check index exists or not
const indexExists = existsUniqueIndexes.find((index) => {
return index.fields.length == 1 && index.fields[0].attribute == this.rawAttributes[uniqueAttribute].field;
});
if (!hasFieldIndex) {
// 检查是否有重复数据
const duplicateCheck = await this.sequelize.query(
`SELECT COUNT(*) as count, ${field}
FROM ${this.collection.quotedTableName()}
GROUP BY ${field}
HAVING COUNT(*) > 1`,
{ ...options, type: 'SELECT' },
);
if (duplicateCheck.length > 0) {
console.warn(`Cannot create unique index on ${field} due to duplicate values`);
continue;
}
await this.queryInterface.addIndex(this.tableName, [field], {
if (!indexExists) {
await this.queryInterface.addIndex(this.tableName, [this.rawAttributes[uniqueAttribute].field], {
unique: true,
transaction: options?.transaction,
name: `${this.collection.tableName()}_${field}_uk`,
name: `${this.collection.tableName()}_${this.rawAttributes[uniqueAttribute].field}_uk`,
});
}
}
@ -469,7 +372,7 @@ export class SyncRunner {
async handleZeroColumnModel(options) {
// @ts-ignore
if (Object.keys(this.model.tableAttributes).length === 0) {
if (this.database.inDialect('sqlite', 'mysql', 'mariadb', 'postgres', 'mssql')) {
if (this.database.inDialect('sqlite', 'mysql', 'mariadb', 'postgres')) {
throw new ZeroColumnTableError(
`Zero-column tables aren't supported in ${this.database.sequelize.getDialect()}`,
);
@ -499,25 +402,6 @@ export class SyncRunner {
async handleSchema(options) {
// @ts-ignore
const _schema = this.model._schema;
if (_schema && _schema != 'public') {
if (this.database.inDialect('mssql')) {
await this.sequelize.query(
`IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '${_schema}')
BEGIN
EXEC('CREATE SCHEMA [${_schema}]')
END`,
{
raw: true,
transaction: options?.transaction,
},
);
} else {
await this.sequelize.query(`CREATE SCHEMA IF NOT EXISTS "${_schema}";`, {
raw: true,
transaction: options?.transaction,
});
}
}
await this.database.queryInterface.ensureSchema(_schema, { transaction: options?.transaction });
}
}

View File

@ -322,7 +322,7 @@ export class UiSchemaRepository extends Repository {
if (!newSchema['properties']) {
const s = await this.model.findByPk(rootUid, { transaction });
s.set('schema', { ...s.toJSON(), ...newSchema });
await s.save({ transaction, hooks: false });
await s.update({ schema: s.schema }, { transaction, hooks: false });
await this.emitAfterSaveEvent(s, options);
if (newSchema['x-server-hooks']) {
await this.database.emitAsync(`${this.collection.name}.afterSave`, s, options);
@ -578,19 +578,22 @@ export class UiSchemaRepository extends Repository {
];
}, []);
// insert tree path
await this.database.sequelize.query(
this.sqlAdapter(
`INSERT INTO ${
this.uiSchemaTreePathTableName
} (ancestor, descendant, depth, type, async, sort) VALUES ${treePathData.map((item) => '(?)').join(',')}`,
),
{
replacements: treePathData,
type: 'insert',
transaction,
},
);
// insert tree path in chunks of 1000 rows
const chunkedData = lodash.chunk(treePathData, 1000);
for (const chunk of chunkedData) {
await this.database.sequelize.query(
this.sqlAdapter(
`INSERT INTO ${this.uiSchemaTreePathTableName} (ancestor, descendant, depth, type, async, sort) VALUES ${chunk
.map(() => '(?)')
.join(',')}`,
),
{
replacements: chunk,
type: 'insert',
transaction,
},
);
}
const rootNode = nodes[0];
if (rootNode['x-server-hooks']) {
@ -729,18 +732,29 @@ export class UiSchemaRepository extends Repository {
if (nodePosition === 'first') {
sort = 1;
let updateSql = `UPDATE ${treeTable} as TreeTable
let updateSql: string;
if (this.database.inDialect('postgres', 'sqlite')) {
`UPDATE ${treeTable} as TreeTable
SET sort = TreeTable.sort + 1
FROM ${treeTable} as NodeInfo
WHERE NodeInfo.descendant = TreeTable.descendant and NodeInfo.depth = 0
AND TreeTable.depth = 1 AND TreeTable.ancestor = :ancestor and NodeInfo.type = :type`;
}
// Compatible with mysql
if (this.database.isMySQLCompatibleDialect()) {
else if (this.database.isMySQLCompatibleDialect()) {
updateSql = `UPDATE ${treeTable} as TreeTable
JOIN ${treeTable} as NodeInfo ON (NodeInfo.descendant = TreeTable.descendant and NodeInfo.depth = 0)
SET TreeTable.sort = TreeTable.sort + 1
WHERE TreeTable.depth = 1 AND TreeTable.ancestor = :ancestor and NodeInfo.type = :type`;
} else {
updateSql = `UPDATE TreeTable
SET TreeTable.sort = TreeTable.sort + 1
FROM ${treeTable} AS TreeTable
JOIN ${treeTable} AS NodeInfo ON NodeInfo.descendant = TreeTable.descendant AND NodeInfo.depth = 0
WHERE TreeTable.depth = 1
AND TreeTable.ancestor = :ancestor
AND NodeInfo.type = :type`;
}
// move all child last index
@ -755,9 +769,7 @@ export class UiSchemaRepository extends Repository {
if (nodePosition === 'last') {
const maxSort = await db.sequelize.query(
`SELECT ${
this.database.sequelize.getDialect() === 'postgres' ? 'coalesce' : 'ifnull'
}(MAX(TreeTable.sort), 0) as maxsort FROM ${treeTable} as TreeTable
`SELECT ${this.database.queryInterface.nullSafe()}(MAX(TreeTable.sort), 0) as maxsort FROM ${treeTable} as TreeTable
LEFT JOIN ${treeTable} as NodeInfo
ON NodeInfo.descendant = TreeTable.descendant and NodeInfo.depth = 0
WHERE TreeTable.depth = 1 AND TreeTable.ancestor = :ancestor and NodeInfo.type = :type`,
@ -799,7 +811,9 @@ export class UiSchemaRepository extends Repository {
sort += 1;
}
let updateSql = `UPDATE ${treeTable} as TreeTable
let updateSql: string;
if (this.database.inDialect('postgres', 'sqlite')) {
updateSql = `UPDATE ${treeTable} as TreeTable
SET sort = TreeTable.sort + 1
FROM ${treeTable} as NodeInfo
WHERE NodeInfo.descendant = TreeTable.descendant
@ -808,12 +822,20 @@ export class UiSchemaRepository extends Repository {
AND TreeTable.ancestor = :ancestor
and TreeTable.sort >= :sort
and NodeInfo.type = :type`;
if (this.database.isMySQLCompatibleDialect()) {
} else if (this.database.isMySQLCompatibleDialect()) {
updateSql = `UPDATE ${treeTable} as TreeTable
JOIN ${treeTable} as NodeInfo ON (NodeInfo.descendant = TreeTable.descendant and NodeInfo.depth = 0)
SET TreeTable.sort = TreeTable.sort + 1
WHERE TreeTable.depth = 1 AND TreeTable.ancestor = :ancestor and TreeTable.sort >= :sort and NodeInfo.type = :type`;
} else {
updateSql = `UPDATE TreeTable
SET TreeTable.sort = TreeTable.sort + 1
FROM ${treeTable} AS TreeTable
JOIN ${treeTable} AS NodeInfo ON NodeInfo.descendant = TreeTable.descendant AND NodeInfo.depth = 0
WHERE TreeTable.depth = 1
AND TreeTable.ancestor = :ancestor
AND TreeTable.sort >= :sort
AND NodeInfo.type = :type`;
}
await db.sequelize.query(updateSql, {
@ -1089,7 +1111,7 @@ WHERE TreeTable.depth = 1 AND TreeTable.ancestor = :ancestor and TreeTable.sort
LEFT JOIN ${this.uiSchemasTableName} as "SchemaTable" ON "SchemaTable"."x-uid" = TreePath.descendant
LEFT JOIN ${this.uiSchemaTreePathTableName} as NodeInfo ON NodeInfo.descendant = "SchemaTable"."x-uid" and NodeInfo.descendant = NodeInfo.ancestor and NodeInfo.depth = 0
LEFT JOIN ${this.uiSchemaTreePathTableName} as ParentPath ON (ParentPath.descendant = "SchemaTable"."x-uid" AND ParentPath.depth = 1)
WHERE TreePath.ancestor = :ancestor AND (NodeInfo.async = false or TreePath.depth <= 1)`;
WHERE TreePath.ancestor = :ancestor AND (NodeInfo.async = ${this.database.queryInterface.booleanValues.false} or TreePath.depth <= 1)`;
const nodes = await db.sequelize.query(this.sqlAdapter(rawSql), {
replacements: {
@ -1120,7 +1142,9 @@ WHERE TreeTable.depth = 1 AND TreeTable.ancestor = :ancestor and TreeTable.sort
LEFT JOIN ${treeTable} as NodeInfo ON NodeInfo.descendant = "SchemaTable"."x-uid" and NodeInfo.descendant = NodeInfo.ancestor and NodeInfo.depth = 0
LEFT JOIN ${treeTable} as ParentPath ON (ParentPath.descendant = "SchemaTable"."x-uid" AND ParentPath.depth = 1)
WHERE TreePath.ancestor = :ancestor ${
options?.includeAsyncNode ? '' : 'AND (NodeInfo.async != true or TreePath.depth = 0)'
options?.includeAsyncNode
? ''
: `AND (NodeInfo.async != ${this.database.queryInterface.booleanValues.true} or TreePath.depth = 0)`
}
`;