refactor(server): change variable names and use singleton for local lock

This commit is contained in:
mytharcher 2024-08-14 15:46:10 +00:00
parent 47ced56eb6
commit 387e16f3bf
2 changed files with 48 additions and 55 deletions

View File

@ -10,7 +10,6 @@
import { Mutex, withTimeout } from 'async-mutex'; import { Mutex, withTimeout } from 'async-mutex';
import { Application } from '../application'; import { Application } from '../application';
import { LocalLock } from '../lock-manager';
function sleep(ms = 1000) { function sleep(ms = 1000) {
return new Promise((resolve) => { return new Promise((resolve) => {

View File

@ -8,7 +8,7 @@
*/ */
import { Registry } from '@nocobase/utils'; import { Registry } from '@nocobase/utils';
import { Mutex, withTimeout, MutexInterface, E_CANCELED } from 'async-mutex'; import { Mutex, tryAcquire, MutexInterface, E_CANCELED } from 'async-mutex';
export type Releaser = () => void | Promise<void>; export type Releaser = () => void | Promise<void>;
@ -17,6 +17,7 @@ export abstract class AbstractLockAdapter {
async close() {} async close() {}
abstract acquire(key: string, ttl: number): Releaser | Promise<Releaser>; abstract acquire(key: string, ttl: number): Releaser | Promise<Releaser>;
abstract runExclusive<T>(key: string, fn: () => Promise<T>, ttl: number): Promise<T>; abstract runExclusive<T>(key: string, fn: () => Promise<T>, ttl: number): Promise<T>;
// abstract tryAcquire(key: string, ttl: number): Releaser | Promise<Releaser>;
} }
export class LockAbortError extends Error { export class LockAbortError extends Error {
@ -25,39 +26,42 @@ export class LockAbortError extends Error {
} }
} }
export class LocalLock { class LocalLockAdapter extends AbstractLockAdapter {
private lock: MutexInterface; static locks = new Map<string, MutexInterface>();
constructor(private ttl: number) { private getLock(key: string): MutexInterface {
this.lock = new Mutex(); let lock = (<typeof LocalLockAdapter>this.constructor).locks.get(key);
if (!lock) {
lock = new Mutex();
(<typeof LocalLockAdapter>this.constructor).locks.set(key, lock);
}
return lock;
} }
setTTL(ttl: number) { async acquire(key: string, ttl: number) {
this.ttl = ttl; const lock = this.getLock(key);
} const release = (await lock.acquire()) as Releaser;
async acquire() {
const release = (await this.lock.acquire()) as Releaser;
const timer = setTimeout(() => { const timer = setTimeout(() => {
if (this.lock.isLocked()) { if (lock.isLocked()) {
release(); release();
} }
}, this.ttl); }, ttl);
return () => { return () => {
release(); release();
clearTimeout(timer); clearTimeout(timer);
}; };
} }
async runExclusive<T>(fn: () => Promise<T>): Promise<T> { async runExclusive<T>(key: string, fn: () => Promise<T>, ttl: number): Promise<T> {
const lock = this.getLock(key);
let timer; let timer;
try { try {
timer = setTimeout(() => { timer = setTimeout(() => {
if (this.lock.isLocked()) { if (lock.isLocked()) {
this.lock.release(); lock.release();
} }
}); }, ttl);
return this.lock.runExclusive(fn); return lock.runExclusive(fn);
} catch (e) { } catch (e) {
if (e === E_CANCELED) { if (e === E_CANCELED) {
throw new LockAbortError('Lock aborted', { cause: E_CANCELED }); throw new LockAbortError('Lock aborted', { cause: E_CANCELED });
@ -68,36 +72,15 @@ export class LocalLock {
clearTimeout(timer); clearTimeout(timer);
} }
} }
} // async tryAcquire(key: string, ttl: number) {
// const lock = this.getLock(key);
class LocalLockAdapter extends AbstractLockAdapter { // return lock.tryAcquire(ttl);
private locks = new Map<string, LocalLock>(); // }
private getLock(key: string, ttl: number): LocalLock {
let lock = this.locks.get(key);
if (!lock) {
lock = new LocalLock(ttl);
this.locks.set(key, lock);
} else {
lock.setTTL(ttl);
}
return lock;
}
async acquire(key: string, ttl: number) {
const lock = this.getLock(key, ttl);
return lock.acquire();
}
async runExclusive<T>(key: string, fn: () => Promise<T>, ttl: number): Promise<T> {
const lock = this.getLock(key, ttl);
return lock.runExclusive(fn);
}
} }
export interface LockAdapterConfig<C extends AbstractLockAdapter = AbstractLockAdapter> { export interface LockAdapterConfig<C extends AbstractLockAdapter = AbstractLockAdapter> {
Client: new (...args: any[]) => C; Adapter: new (...args: any[]) => C;
[key: string]: any; options?: Record<string, any>;
} }
export interface LockManagerOptions { export interface LockManagerOptions {
@ -106,11 +89,11 @@ export interface LockManagerOptions {
export class LockManager { export class LockManager {
private registry = new Registry<LockAdapterConfig>(); private registry = new Registry<LockAdapterConfig>();
private clients = new Map<string, AbstractLockAdapter>(); private adapters = new Map<string, AbstractLockAdapter>();
constructor(private options: LockManagerOptions = {}) { constructor(private options: LockManagerOptions = {}) {
this.registry.register('local', { this.registry.register('local', {
Client: LocalLockAdapter, Adapter: LocalLockAdapter,
}); });
} }
@ -118,31 +101,42 @@ export class LockManager {
this.registry.register(name, adapterConfig); this.registry.register(name, adapterConfig);
} }
private async getClient(): Promise<AbstractLockAdapter> { private async getAdapter(): Promise<AbstractLockAdapter> {
const type = this.options.defaultAdapter || 'local'; const type = this.options.defaultAdapter || 'local';
let client = this.clients.get(type); let client = this.adapters.get(type);
if (!client) { if (!client) {
const adapter = this.registry.get(type); const adapter = this.registry.get(type);
if (!adapter) { if (!adapter) {
throw new Error(`Lock adapter "${type}" not registered`); throw new Error(`Lock adapter "${type}" not registered`);
} }
const { Client, ...config } = adapter; const { Adapter, options } = adapter;
client = new Client(config); client = new Adapter(options);
await client.connect(); await client.connect();
this.clients.set(type, client); this.adapters.set(type, client);
} }
return client; return client;
} }
public async close() {
for (const client of this.adapters.values()) {
await client.close();
}
}
public async acquire(key: string, ttl = 500) { public async acquire(key: string, ttl = 500) {
const client = await this.getClient(); const client = await this.getAdapter();
return client.acquire(key, ttl); return client.acquire(key, ttl);
} }
public async runExclusive<T>(key: string, fn: () => Promise<T>, ttl = 500): Promise<T> { public async runExclusive<T>(key: string, fn: () => Promise<T>, ttl = 500): Promise<T> {
const client = await this.getClient(); const client = await this.getAdapter();
return client.runExclusive(key, fn, ttl); return client.runExclusive(key, fn, ttl);
} }
// public async tryAcquire(key: string, ttl = 500) {
// const client = await this.getAdapter();
// return client.tryAcquire(key, ttl);
// }
} }