feat(server): add api

This commit is contained in:
mytharcher 2024-08-15 11:58:43 +00:00
parent c17dfd36b8
commit 483ea698b1
2 changed files with 58 additions and 39 deletions

View File

@ -7,9 +7,10 @@
* For more information, please refer to: https://www.nocobase.com/agreement. * For more information, please refer to: https://www.nocobase.com/agreement.
*/ */
import { Mutex, withTimeout } from 'async-mutex'; import { Mutex, tryAcquire } from 'async-mutex';
import { Application } from '../application'; import { Application } from '../application';
import { LockAcquireError } from '../lock-manager';
function sleep(ms = 1000) { function sleep(ms = 1000) {
return new Promise((resolve) => { return new Promise((resolve) => {
@ -19,7 +20,7 @@ function sleep(ms = 1000) {
describe('lock manager', () => { describe('lock manager', () => {
describe.skip('mutex example', () => { describe.skip('mutex example', () => {
it('mutex', async () => { it('acquire and release', async () => {
const order = []; const order = [];
const lock = new Mutex(); const lock = new Mutex();
const release1 = await lock.acquire(); const release1 = await lock.acquire();
@ -42,21 +43,18 @@ describe('lock manager', () => {
expect(order).toEqual([1, 4, 2, 3, 5, 6]); expect(order).toEqual([1, 4, 2, 3, 5, 6]);
}); });
it.skip('with timeout', async () => { it('tryAcquire', async () => {
const lock = withTimeout(new Mutex(), 200); const order = [];
const r1 = await lock.acquire(); const lock = new Mutex();
const l1 = tryAcquire(lock);
expect(l1.isLocked()).toBe(false);
const release1 = await lock.acquire();
expect(lock.isLocked()).toBe(true); expect(lock.isLocked()).toBe(true);
const l2 = lock.acquire(); const l2 = tryAcquire(lock);
await sleep(100); await expect(async () => {
expect(lock.isLocked()).toBe(true); const r2 = await l2.acquire();
setTimeout(async () => { }).rejects.toThrow();
expect(lock.isLocked()).toBe(false); await release1();
const r2 = await l2;
expect(lock.isLocked()).toBe(true);
await r2();
expect(lock.isLocked()).toBe(false);
}, 150);
await sleep(300);
}); });
}); });
@ -168,5 +166,28 @@ describe('lock manager', () => {
await sleep(200); await sleep(200);
expect(order).toEqual([3, 4, 5, 1, 6, 2]); expect(order).toEqual([3, 4, 5, 1, 6, 2]);
}); });
it('tryAcquire', async () => {
const release = await app.lockManager.acquire('test');
await expect(app.lockManager.tryAcquire('test')).rejects.toThrowError(LockAcquireError);
await release();
const lock = await app.lockManager.tryAcquire('test');
expect(lock.acquire).toBeTypeOf('function');
expect(lock.runExclusive).toBeTypeOf('function');
const order = [];
const r1 = await lock.acquire(200);
order.push(1);
setTimeout(async () => {
order.push(2);
await r1();
order.push(3);
}, 100);
const r2 = await lock.acquire(200);
order.push(4);
await sleep(300);
await r2();
expect(order).toEqual([1, 2, 3, 4]);
});
}); });
}); });

View File

@ -8,7 +8,7 @@
*/ */
import { Registry } from '@nocobase/utils'; import { Registry } from '@nocobase/utils';
import { Mutex, tryAcquire, MutexInterface, E_CANCELED } from 'async-mutex'; import { Mutex, MutexInterface, E_CANCELED } from 'async-mutex';
export type Releaser = () => void | Promise<void>; export type Releaser = () => void | Promise<void>;
@ -22,7 +22,7 @@ export interface ILockAdapter {
close(): Promise<void>; close(): Promise<void>;
acquire(key: string, ttl: number): Releaser | Promise<Releaser>; acquire(key: string, ttl: number): Releaser | Promise<Releaser>;
runExclusive<T>(key: string, fn: () => Promise<T>, ttl: number): Promise<T>; runExclusive<T>(key: string, fn: () => Promise<T>, ttl: number): Promise<T>;
// tryAcquire(key: string, timeout?: number): Promise<ILock>; tryAcquire(key: string, timeout?: number): Promise<ILock>;
} }
export class LockAbortError extends Error { export class LockAbortError extends Error {
@ -32,7 +32,7 @@ export class LockAbortError extends Error {
} }
export class LockAcquireError extends Error { export class LockAcquireError extends Error {
constructor(message, options) { constructor(message, options?) {
super(message, options); super(message, options);
} }
} }
@ -87,22 +87,20 @@ class LocalLockAdapter implements ILockAdapter {
} }
} }
// async tryAcquire(key: string) { async tryAcquire(key: string) {
// try { const lock = this.getLock(key);
// const lock = this.getLock(key); if (lock.isLocked()) {
// await tryAcquire(lock); throw new LockAcquireError('lock is locked');
// return { }
// async acquire(ttl) { return {
// return this.acquire(key, ttl); acquire: async (ttl) => {
// }, return this.acquire(key, ttl);
// async runExclusive(fn: () => Promise<any>, ttl) { },
// return this.runExclusive(key, fn, ttl); runExclusive: async (fn: () => Promise<any>, ttl) => {
// }, return this.runExclusive(key, fn, ttl);
// }; },
// } catch (e) { };
// throw new LockAcquireError('Lock acquire error', { cause: e }); }
// }
// }
} }
export interface LockAdapterConfig<C extends ILockAdapter = ILockAdapter> { export interface LockAdapterConfig<C extends ILockAdapter = ILockAdapter> {
@ -162,8 +160,8 @@ export class LockManager {
return client.runExclusive(key, fn, ttl); return client.runExclusive(key, fn, ttl);
} }
// public async tryAcquire(key: string, ttl = 500) { public async tryAcquire(key: string) {
// const client = await this.getAdapter(); const client = await this.getAdapter();
// return client.tryAcquire(key, ttl); return client.tryAcquire(key);
// } }
} }