diff --git a/packages/core/server/src/__tests__/lock-manager.test.ts b/packages/core/server/src/__tests__/lock-manager.test.ts index a07cea07dd..4b6781d7ec 100644 --- a/packages/core/server/src/__tests__/lock-manager.test.ts +++ b/packages/core/server/src/__tests__/lock-manager.test.ts @@ -10,7 +10,6 @@ import { Mutex, withTimeout } from 'async-mutex'; import { Application } from '../application'; -import { LocalLock } from '../lock-manager'; function sleep(ms = 1000) { return new Promise((resolve) => { diff --git a/packages/core/server/src/lock-manager.ts b/packages/core/server/src/lock-manager.ts index 9cdf69cc46..af64533863 100644 --- a/packages/core/server/src/lock-manager.ts +++ b/packages/core/server/src/lock-manager.ts @@ -8,7 +8,7 @@ */ import { Registry } from '@nocobase/utils'; -import { Mutex, withTimeout, MutexInterface, E_CANCELED } from 'async-mutex'; +import { Mutex, tryAcquire, MutexInterface, E_CANCELED } from 'async-mutex'; export type Releaser = () => void | Promise; @@ -17,6 +17,7 @@ export abstract class AbstractLockAdapter { async close() {} abstract acquire(key: string, ttl: number): Releaser | Promise; abstract runExclusive(key: string, fn: () => Promise, ttl: number): Promise; + // abstract tryAcquire(key: string, ttl: number): Releaser | Promise; } export class LockAbortError extends Error { @@ -25,39 +26,42 @@ export class LockAbortError extends Error { } } -export class LocalLock { - private lock: MutexInterface; +class LocalLockAdapter extends AbstractLockAdapter { + static locks = new Map(); - constructor(private ttl: number) { - this.lock = new Mutex(); + private getLock(key: string): MutexInterface { + let lock = (this.constructor).locks.get(key); + if (!lock) { + lock = new Mutex(); + (this.constructor).locks.set(key, lock); + } + return lock; } - setTTL(ttl: number) { - this.ttl = ttl; - } - - async acquire() { - const release = (await this.lock.acquire()) as Releaser; + async acquire(key: string, ttl: number) { + const lock = this.getLock(key); + const release = (await lock.acquire()) as Releaser; const timer = setTimeout(() => { - if (this.lock.isLocked()) { + if (lock.isLocked()) { release(); } - }, this.ttl); + }, ttl); return () => { release(); clearTimeout(timer); }; } - async runExclusive(fn: () => Promise): Promise { + async runExclusive(key: string, fn: () => Promise, ttl: number): Promise { + const lock = this.getLock(key); let timer; try { timer = setTimeout(() => { - if (this.lock.isLocked()) { - this.lock.release(); + if (lock.isLocked()) { + lock.release(); } - }); - return this.lock.runExclusive(fn); + }, ttl); + return lock.runExclusive(fn); } catch (e) { if (e === E_CANCELED) { throw new LockAbortError('Lock aborted', { cause: E_CANCELED }); @@ -68,36 +72,15 @@ export class LocalLock { clearTimeout(timer); } } -} - -class LocalLockAdapter extends AbstractLockAdapter { - private locks = new Map(); - - private getLock(key: string, ttl: number): LocalLock { - let lock = this.locks.get(key); - if (!lock) { - lock = new LocalLock(ttl); - this.locks.set(key, lock); - } else { - lock.setTTL(ttl); - } - return lock; - } - - async acquire(key: string, ttl: number) { - const lock = this.getLock(key, ttl); - return lock.acquire(); - } - - async runExclusive(key: string, fn: () => Promise, ttl: number): Promise { - const lock = this.getLock(key, ttl); - return lock.runExclusive(fn); - } + // async tryAcquire(key: string, ttl: number) { + // const lock = this.getLock(key); + // return lock.tryAcquire(ttl); + // } } export interface LockAdapterConfig { - Client: new (...args: any[]) => C; - [key: string]: any; + Adapter: new (...args: any[]) => C; + options?: Record; } export interface LockManagerOptions { @@ -106,11 +89,11 @@ export interface LockManagerOptions { export class LockManager { private registry = new Registry(); - private clients = new Map(); + private adapters = new Map(); constructor(private options: LockManagerOptions = {}) { this.registry.register('local', { - Client: LocalLockAdapter, + Adapter: LocalLockAdapter, }); } @@ -118,31 +101,42 @@ export class LockManager { this.registry.register(name, adapterConfig); } - private async getClient(): Promise { + private async getAdapter(): Promise { const type = this.options.defaultAdapter || 'local'; - let client = this.clients.get(type); + let client = this.adapters.get(type); if (!client) { const adapter = this.registry.get(type); if (!adapter) { throw new Error(`Lock adapter "${type}" not registered`); } - const { Client, ...config } = adapter; - client = new Client(config); + const { Adapter, options } = adapter; + client = new Adapter(options); await client.connect(); - this.clients.set(type, client); + this.adapters.set(type, client); } return client; } + public async close() { + for (const client of this.adapters.values()) { + await client.close(); + } + } + public async acquire(key: string, ttl = 500) { - const client = await this.getClient(); + const client = await this.getAdapter(); return client.acquire(key, ttl); } public async runExclusive(key: string, fn: () => Promise, ttl = 500): Promise { - const client = await this.getClient(); + const client = await this.getAdapter(); return client.runExclusive(key, fn, ttl); } + + // public async tryAcquire(key: string, ttl = 500) { + // const client = await this.getAdapter(); + // return client.tryAcquire(key, ttl); + // } }