feat: pub/sub manager

This commit is contained in:
chenos 2024-07-23 10:42:10 +08:00 committed by mytharcher
parent 22ad2b4a91
commit 011e71429b
3 changed files with 179 additions and 0 deletions

View File

@ -0,0 +1,83 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { createMockServer, MockServer } from '@nocobase/test';
import { createClient } from 'redis';
import Plugin from '../plugin';
import { IPubSubAdapter } from '../pub-sub-manager';
export class RedisPubSubAdapter implements IPubSubAdapter {
publisher;
subscriber;
constructor() {
this.publisher = createClient();
this.subscriber = this.publisher.duplicate();
}
async connect() {
await this.publisher.connect();
await this.subscriber.connect();
}
async close() {
await this.publisher.disconnect();
await this.subscriber.disconnect();
}
async subscribe(channel, callback) {
return this.subscriber.subscribe(channel, callback, true);
}
async unsubscribe(channel, callback) {
return this.subscriber.unsubscribe(channel, callback, true);
}
async publish(channel, message) {
return this.publisher.publish(channel, message);
}
onMessage(callback) {}
}
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
describe('pub-sub-manager', () => {
test('case1', async () => {
let count = 0;
class Plugin1 extends Plugin {
async beforeLoad() {
this.app.pubSubManager.setAdapter(new RedisPubSubAdapter());
await this.app.pubSubManager.subscribe('chan1nel', (message) => {
++count;
console.log(`Channel1 subscriber collected message: ${message}`);
});
}
}
const appOpts = {
pubSubManager: {
name: 'app1',
},
plugins: [Plugin1, 'nocobase'],
};
const node1: MockServer = await createMockServer({
...appOpts,
name: 'node1',
});
const node2: MockServer = await createMockServer({
...appOpts,
name: 'node2',
});
await node1.pubSubManager.publish('chan1nel', `channel1_message_1`);
await sleep(1000);
expect(count).toBe(2);
await node1.destroy();
await node2.destroy();
});
});

View File

@ -36,6 +36,7 @@ import lodash from 'lodash';
import { RecordableHistogram } from 'node:perf_hooks';
import path, { basename, resolve } from 'path';
import semver from 'semver';
import packageJson from '../package.json';
import { createACL } from './acl';
import { AppCommand } from './app-command';
import { AppSupervisor } from './app-supervisor';
@ -59,6 +60,7 @@ import { dataTemplate } from './middlewares/data-template';
import validateFilterParams from './middlewares/validate-filter-params';
import { Plugin } from './plugin';
import { InstallOptions, PluginManager } from './plugin-manager';
import { PubSubManager } from './pub-sub-manager';
import { SyncManager } from './sync-manager';
import packageJson from '../package.json';
@ -97,6 +99,7 @@ export interface ApplicationOptions {
*/
resourcer?: ResourceManagerOptions;
resourceManager?: ResourceManagerOptions;
pubSubManager?: any;
bodyParser?: any;
cors?: any;
dataWrapping?: boolean;
@ -226,6 +229,7 @@ export class Application<StateT = DefaultState, ContextT = DefaultContext> exten
* @internal
*/
public syncManager: SyncManager;
public pubSubManager: PubSubManager;
public requestLogger: Logger;
private sqlLogger: Logger;
protected _logger: SystemLogger;
@ -518,6 +522,10 @@ export class Application<StateT = DefaultState, ContextT = DefaultContext> exten
await this.cacheManager.close();
}
if (this.pubSubManager) {
await this.pubSubManager.close();
}
if (this.telemetry.started) {
await this.telemetry.shutdown();
}
@ -1121,6 +1129,7 @@ export class Application<StateT = DefaultState, ContextT = DefaultContext> exten
this._cli = this.createCLI();
this._i18n = createI18n(options);
this.syncManager = new SyncManager(this);
this.pubSubManager = new PubSubManager(this, options.pubSubManager);
this.context.db = this.db;
/**

View File

@ -0,0 +1,87 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import Application from './application';
export class PubSubManager {
adapter: IPubSubAdapter;
subscribes = new Map();
constructor(
protected app: Application,
protected options: any = {},
) {
app.on('afterStart', async () => {
await this.connect();
});
app.on('afterStop', async () => {
await this.close();
});
}
get prefix() {
return this.options.name || this.app.name;
}
setAdapter(adapter: IPubSubAdapter) {
this.adapter = adapter;
}
async connect() {
await this.adapter.connect();
// subscribe 要在 connect 之后
for (const [channel, callbacks] of this.subscribes) {
for (const callback of callbacks) {
await this.adapter.subscribe(`${this.prefix}.${channel}`, callback);
}
}
}
async close() {
return await this.adapter.close();
}
async subscribe(channel, callback) {
if (!this.subscribes.has(channel)) {
const set = new Set();
this.subscribes.set(channel, set);
}
const set = this.subscribes.get(channel);
set.add(callback);
}
async unsubscribe(channel, callback) {
const set = this.subscribes.get(channel);
if (set) {
set.delete(callback);
}
return this.adapter.unsubscribe(`${this.prefix}.${channel}`, callback);
}
async publish(channel, message) {
return this.adapter.publish(`${this.prefix}.${channel}`, message);
}
onMessage(callback) {
return this.adapter.onMessage((channel, message) => {
if (channel.startsWith(`${this.prefix}.`)) {
callback(channel, message);
}
});
}
}
export interface IPubSubAdapter {
connect(): Promise<any>;
close(): Promise<any>;
subscribe(channel: string, callback): Promise<any>;
unsubscribe(channel: string, callback): Promise<any>;
publish(channel: string, message): Promise<any>;
onMessage(callback): void;
}