Junyi 02c3a2be25
feat(server): add lock manager to server (for testing on dev) (#5144)
* feat: pub/sub manager

* fix: test case

* fix: test error

* fix: test error

* feat: skip self

* feat: debounce

* feat: improve code

* fix: test error

* feat: test cases

* feat: test cases

* fix: improve code

* fix: improve code

* feat: improve code

* fix: improve code

* fix: test case

* fix: typo

* fix: createPubSubManager

* fix: delete messageHandlers

* fix: test case

* feat: improve code

* fix: test error

* fix: test error

* refactor(server): adapt to new api and fix test

* fix(plugin-data-source-main): fix changed api

* fix: test error

* fix: remove sync-manager test case

* chore(server): remove legacy code

* fix(plugin-workflow): fix send sync message with transaction

* chore(server): remove legacy code

* chore(server): remove legacy code

* fix(plugin-workflow): fix test case

* fix(plugin-workflow): fix test case

* test(server): test skip-install parameter in cluster

* test(server): avoid multiple installation in cluster

* test(server): installation in cluster

* feat: sync collection using sync manager (#4920)

* chore: sync collection message

* chore: sync acl

* fix: typo

* chore: sync data source

* chore: remove collection

* fix: typo

* fix: test

* chore: sync sub app event

* chore: sync collection test

* chore: collection test

* chore: test

* chore: data source sync message

* chore: sync multi app

* chore: test

* chore: test

* chore: test

* chore: test

* chore: test

* chore: error message

* fix(server): add type and remove log

* fix(server): not to publish when adapter is not connected

* refactor(server): refine types

* chore: timeout

* fix(server): fix pubSubManager options

* test(ci): test ci checkout

* feat(server): add lock manager to server

* feat: update ci

* feat: pub/sub manager (#4933)

* feat: pub/sub manager

* fix: test case

* fix: test error

* fix: test error

* feat: skip self

* feat: debounce

* feat: improve code

* fix: test error

* feat: test cases

* feat: test cases

* fix: improve code

* fix: improve code

* feat: improve code

* fix: improve code

* fix: test case

* fix: typo

* fix: createPubSubManager

* fix: delete messageHandlers

* fix: test case

* feat: improve code

* fix: test error

* fix: test error

* refactor(server): adapt to new api and fix test

* fix(plugin-data-source-main): fix changed api

* fix: test error

* fix: remove sync-manager test case

* chore(server): remove legacy code

* fix(plugin-workflow): fix send sync message with transaction

* chore(server): remove legacy code

* chore(server): remove legacy code

* fix(plugin-workflow): fix test case

* fix(plugin-workflow): fix test case

* test(server): test skip-install parameter in cluster

* test(server): avoid multiple installation in cluster

* test(server): installation in cluster

* feat: sync collection using sync manager (#4920)

* chore: sync collection message

* chore: sync acl

* fix: typo

* chore: sync data source

* chore: remove collection

* fix: typo

* fix: test

* chore: sync sub app event

* chore: sync collection test

* chore: collection test

* chore: test

* chore: data source sync message

* chore: sync multi app

* chore: test

* chore: test

* chore: test

* chore: test

* chore: test

* chore: error message

* fix(server): add type and remove log

* fix(server): not to publish when adapter is not connected

* refactor(server): refine types

* chore: timeout

* fix(server): fix pubSubManager options

* test(ci): test ci checkout

---------

Co-authored-by: mytharcher <mytharcher@gmail.com>
Co-authored-by: ChengLei Shao <chareice@live.com>

* refactor(server): refactor api and local lock

* refactor(server): change variable names and use singleton for local lock

* fix: lockManager.close

* refactor(server): adjust types

* feat(server): add  api

* refactor(core): move lock-manager to independent package to be used in db

* refactor(plugins): change to new lock manager to use locks

* fix(auth): fix test case

* chore: ttl of sort field lock

* fix: ttl

* fix(plugins): revert lock usage back for some plugins

* refactor(plugin-field-sort): move sort field to plugin

* chore: update build ci

* fix(server): fix build errors

* fix(plugin-field-sort): fix test case

* fix(plugin-field-sort): fix register move action

* fix(plugin-field-sort): fix load logic

* fix(plugin-data-source-main): fix lock usage

* chore(plugin-data-source-main): remove unused import

* fix(server): fix import crypto in pub sub manager (#5111)

* fix(plugin-field-sort): fix build and test cases

* fix(plugin-user-data-sync): fix test with sort field

* fix(plugin-users): fix test with sort field

---------

Co-authored-by: chenos <chenlinxh@gmail.com>
Co-authored-by: ChengLei Shao <chareice@live.com>
2024-08-28 14:41:45 +08:00

170 lines
4.4 KiB
TypeScript

/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { Registry } from '@nocobase/utils';
import { Mutex, MutexInterface, E_CANCELED } from 'async-mutex';
/**
 * Function that gives up a previously acquired lock.
 * It may finish synchronously (`void`) or asynchronously (`Promise<void>`).
 */
export type Releaser = () => void | Promise<void>;
/**
 * Handle for one lock whose key is already bound, so its methods take no `key`.
 * In this file, objects of this shape are what `ILockAdapter.tryAcquire` resolves to.
 */
export interface ILock {
/**
 * Acquires the lock and yields the function that releases it.
 * In the local adapter `ttl` is used as a `setTimeout` delay (milliseconds)
 * after which the lock is released automatically.
 */
acquire(ttl: number): Releaser | Promise<Releaser>;
/**
 * Intended to run `fn` while holding the lock and resolve with its result
 * (the local adapter delegates this to its keyed `runExclusive`).
 */
runExclusive<T>(fn: () => Promise<T>, ttl: number): Promise<T>;
}
/**
 * Contract for a lock backend. `LockManager` constructs an adapter, calls
 * `connect()` once before first use and `close()` when the manager is closed.
 */
export interface ILockAdapter {
/** Called by `LockManager` right after construction, before any lock operation. */
connect(): Promise<void>;
/** Called by `LockManager.close()` for every adapter it has created. */
close(): Promise<void>;
/**
 * Acquires the lock identified by `key` and yields the function that releases it.
 * In the local adapter `ttl` is a `setTimeout` delay (milliseconds) after which
 * the lock is released automatically.
 */
acquire(key: string, ttl: number): Releaser | Promise<Releaser>;
/** Intended to run `fn` while holding the lock for `key` and resolve with its result. */
runExclusive<T>(key: string, fn: () => Promise<T>, ttl: number): Promise<T>;
/**
 * Resolves to a key-bound `ILock`; the local adapter throws `LockAcquireError`
 * instead when the key is currently locked.
 * NOTE(review): the meaning of `timeout` is not established in this file and the
 * local adapter's implementation does not accept it — confirm against other adapters.
 */
tryAcquire(key: string, timeout?: number): Promise<ILock>;
}
/**
 * Error signalling that a lock operation was aborted.
 *
 * In this file it is what the local adapter throws from `runExclusive` when the
 * underlying mutex reports `E_CANCELED`; the original value is attached as `cause`.
 *
 * @param message - Human-readable description of the abort.
 * @param options - Optional standard `Error` options; `cause` carries the underlying reason.
 */
export class LockAbortError extends Error {
  constructor(message?: string, options?: { cause?: unknown }) {
    super(message, options);
    // Without an explicit name the instance is reported as a plain "Error" in logs.
    this.name = 'LockAbortError';
  }
}
/**
 * Error signalling that a lock could not be obtained.
 *
 * In this file it is thrown by the local adapter's `tryAcquire` when the
 * requested key is already locked.
 *
 * @param message - Human-readable description of the failure.
 * @param options - Optional standard `Error` options; `cause` carries the underlying reason.
 */
export class LockAcquireError extends Error {
  constructor(message?: string, options?: { cause?: unknown }) {
    super(message, options);
    // Without an explicit name the instance is reported as a plain "Error" in logs.
    this.name = 'LockAcquireError';
  }
}
/**
 * In-process lock adapter built on `async-mutex`.
 *
 * Mutexes are kept in a static map, so every instance of this class shares the
 * same lock for a given key. Nothing is connected or closed: `connect()` and
 * `close()` are no-ops.
 */
class LocalLockAdapter implements ILockAdapter {
  /**
   * One mutex per key, shared by all instances.
   * NOTE: entries are never removed, so the map grows with the number of distinct keys.
   */
  static locks = new Map<string, MutexInterface>();

  /** No-op: the local adapter has nothing to connect to. */
  async connect() {}

  /** No-op: the local adapter holds no external resources. */
  async close() {}

  /** Returns the mutex for `key`, creating and caching it on first use. */
  private getLock(key: string): MutexInterface {
    const { locks } = this.constructor as typeof LocalLockAdapter;
    let lock = locks.get(key);
    if (!lock) {
      lock = new Mutex();
      locks.set(key, lock);
    }
    return lock;
  }

  /**
   * Waits for the mutex of `key`, then returns a function that releases it.
   *
   * @param key - Lock identifier.
   * @param ttl - Delay in milliseconds (passed to `setTimeout`) after which the
   *   lock is released automatically if the caller has not released it yet.
   * @returns A releaser that frees the lock and cancels the TTL timer.
   */
  async acquire(key: string, ttl: number) {
    const lock = this.getLock(key);
    const release = (await lock.acquire()) as Releaser;
    // The TTL starts only once the lock is actually held.
    const timer = setTimeout(() => {
      if (lock.isLocked()) {
        release();
      }
    }, ttl);
    return () => {
      release();
      clearTimeout(timer);
    };
  }

  /**
   * Runs `fn` while holding the lock for `key` and resolves with its result.
   *
   * The lock is taken through {@link acquire}, so the TTL is armed only after
   * the lock is held and expiry releases this call's own hold rather than
   * whatever currently owns the mutex. The result of `fn` is awaited inside the
   * `try`, which keeps the TTL timer alive for the duration of `fn` and lets
   * the `catch` below observe asynchronous failures.
   *
   * @param key - Lock identifier.
   * @param fn - Work to perform under the lock.
   * @param ttl - Delay in milliseconds after which the lock is released even if
   *   `fn` is still running.
   * @throws LockAbortError when the operation fails with `E_CANCELED`.
   */
  async runExclusive<T>(key: string, fn: () => Promise<T>, ttl: number): Promise<T> {
    try {
      const release = await this.acquire(key, ttl);
      try {
        return await fn();
      } finally {
        // Frees the lock (no effect if the TTL already did) and clears the TTL timer.
        release();
      }
    } catch (e) {
      if (e === E_CANCELED) {
        throw new LockAbortError('Lock aborted', { cause: E_CANCELED });
      }
      throw e;
    }
  }

  /**
   * Returns a key-bound lock handle if `key` is not locked right now.
   *
   * The lock itself is not taken here; it is taken when the returned handle's
   * `acquire` or `runExclusive` is called.
   *
   * @throws LockAcquireError when the mutex for `key` is currently locked.
   */
  async tryAcquire(key: string) {
    const lock = this.getLock(key);
    if (lock.isLocked()) {
      throw new LockAcquireError('lock is locked');
    }
    return {
      acquire: async (ttl: number) => {
        return this.acquire(key, ttl);
      },
      runExclusive: async (fn: () => Promise<any>, ttl: number) => {
        return this.runExclusive(key, fn, ttl);
      },
    };
  }
}
/**
 * Registration entry for a lock adapter, as stored in `LockManager`'s registry.
 */
export interface LockAdapterConfig<C extends ILockAdapter = ILockAdapter> {
/** Adapter constructor; `LockManager` instantiates it with `new Adapter(options)`. */
Adapter: new (...args: any[]) => C;
/** Value passed as the single constructor argument; may be omitted. */
options?: Record<string, any>;
}
/**
 * Options accepted by the `LockManager` constructor.
 */
export interface LockManagerOptions {
/** Name of the registered adapter to use; `LockManager` falls back to `'local'` when this is unset or empty. */
defaultAdapter?: string;
}
/**
 * Entry point for taking locks through a pluggable adapter.
 *
 * Adapters are registered by name; the one named by `options.defaultAdapter`
 * (or `'local'` when that is unset or empty) is created and connected lazily on
 * first use and then reused. The built-in `'local'` adapter is always registered.
 */
export class LockManager {
  private registry = new Registry<LockAdapterConfig>();
  /** Adapters that have finished connecting, keyed by adapter name. */
  private adapters = new Map<string, ILockAdapter>();
  /**
   * Adapter initializations still in flight, keyed by adapter name. Concurrent
   * callers share the same promise so only one adapter is constructed and connected.
   */
  private connecting = new Map<string, Promise<ILockAdapter>>();

  constructor(private options: LockManagerOptions = {}) {
    this.registry.register('local', {
      Adapter: LocalLockAdapter,
    });
  }

  /**
   * Registers (or replaces, depending on the registry) an adapter under `name`.
   *
   * @param name - Name to select the adapter with via `defaultAdapter`.
   * @param adapterConfig - Adapter constructor and its constructor options.
   */
  registerAdapter(name: string, adapterConfig: LockAdapterConfig) {
    this.registry.register(name, adapterConfig);
  }

  /** Constructs the adapter registered as `type`, connects it and caches it. */
  private async createAdapter(type: string): Promise<ILockAdapter> {
    const config = this.registry.get(type);
    if (!config) {
      throw new Error(`Lock adapter "${type}" not registered`);
    }
    const { Adapter, options } = config;
    const client = new Adapter(options);
    await client.connect();
    this.adapters.set(type, client);
    return client;
  }

  /**
   * Returns the connected adapter selected by the manager options.
   *
   * @throws Error when no adapter is registered under the selected name.
   */
  private async getAdapter(): Promise<ILockAdapter> {
    const type = this.options.defaultAdapter || 'local';
    const ready = this.adapters.get(type);
    if (ready) {
      return ready;
    }
    let pending = this.connecting.get(type);
    if (!pending) {
      // Drop the in-flight entry once settled so a failed connect can be retried.
      pending = this.createAdapter(type).finally(() => {
        this.connecting.delete(type);
      });
      this.connecting.set(type, pending);
    }
    return pending;
  }

  /**
   * Closes every adapter this manager has created.
   *
   * The cache is emptied before closing so that a later call creates and
   * connects a fresh adapter instead of reusing a closed one. Adapters are
   * closed one after another; a failure stops the loop and is rethrown.
   */
  public async close() {
    const clients = Array.from(this.adapters.values());
    this.adapters.clear();
    for (const client of clients) {
      await client.close();
    }
  }

  /**
   * Acquires the lock for `key` through the selected adapter.
   *
   * @param key - Lock identifier.
   * @param ttl - Time-to-live handed to the adapter; defaults to 500
   *   (milliseconds for the local adapter).
   * @returns The function that releases the lock.
   */
  public async acquire(key: string, ttl = 500): Promise<Releaser> {
    const client = await this.getAdapter();
    return client.acquire(key, ttl);
  }

  /**
   * Runs `fn` under the lock for `key` through the selected adapter.
   *
   * @param key - Lock identifier.
   * @param fn - Work to perform under the lock.
   * @param ttl - Time-to-live handed to the adapter; defaults to 500
   *   (milliseconds for the local adapter).
   * @returns Whatever `fn` resolves to.
   */
  public async runExclusive<T>(key: string, fn: () => Promise<T>, ttl = 500): Promise<T> {
    const client = await this.getAdapter();
    return client.runExclusive(key, fn, ttl);
  }

  /**
   * Asks the selected adapter for a key-bound lock handle.
   *
   * @param key - Lock identifier.
   * @param timeout - Optional value forwarded unchanged to the adapter's
   *   `tryAcquire`; its interpretation is adapter-specific.
   */
  public async tryAcquire(key: string, timeout?: number) {
    const client = await this.getAdapter();
    return client.tryAcquire(key, timeout);
  }
}
export default LockManager;