From 011e71429befebb27b4c010ddb84fbf116d66a5c Mon Sep 17 00:00:00 2001 From: chenos Date: Tue, 23 Jul 2024 10:42:10 +0800 Subject: [PATCH] feat: pub/sub manager --- .../src/__tests__/pub-sub-manager.test.ts | 83 ++++++++++++++++++ packages/core/server/src/application.ts | 9 ++ packages/core/server/src/pub-sub-manager.ts | 87 +++++++++++++++++++ 3 files changed, 179 insertions(+) create mode 100644 packages/core/server/src/__tests__/pub-sub-manager.test.ts create mode 100644 packages/core/server/src/pub-sub-manager.ts diff --git a/packages/core/server/src/__tests__/pub-sub-manager.test.ts b/packages/core/server/src/__tests__/pub-sub-manager.test.ts new file mode 100644 index 0000000000..c6f21f3362 --- /dev/null +++ b/packages/core/server/src/__tests__/pub-sub-manager.test.ts @@ -0,0 +1,83 @@ +/** + * This file is part of the NocoBase (R) project. + * Copyright (c) 2020-2024 NocoBase Co., Ltd. + * Authors: NocoBase Team. + * + * This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License. + * For more information, please refer to: https://www.nocobase.com/agreement. + */ + +import { createMockServer, MockServer } from '@nocobase/test'; +import { createClient } from 'redis'; +import Plugin from '../plugin'; +import { IPubSubAdapter } from '../pub-sub-manager'; + +export class RedisPubSubAdapter implements IPubSubAdapter { + publisher; + subscriber; + + constructor() { + this.publisher = createClient(); + this.subscriber = this.publisher.duplicate(); + } + + async connect() { + await this.publisher.connect(); + await this.subscriber.connect(); + } + + async close() { + await this.publisher.disconnect(); + await this.subscriber.disconnect(); + } + + async subscribe(channel, callback) { + return this.subscriber.subscribe(channel, callback, true); + } + + async unsubscribe(channel, callback) { + return this.subscriber.unsubscribe(channel, callback, true); + } + + async publish(channel, message) { + return this.publisher.publish(channel, message); + } + + onMessage(callback) {} +} + +const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); + +describe('pub-sub-manager', () => { + test('case1', async () => { + let count = 0; + class Plugin1 extends Plugin { + async beforeLoad() { + this.app.pubSubManager.setAdapter(new RedisPubSubAdapter()); + await this.app.pubSubManager.subscribe('chan1nel', (message) => { + ++count; + console.log(`Channel1 subscriber collected message: ${message}`); + }); + } + } + const appOpts = { + pubSubManager: { + name: 'app1', + }, + plugins: [Plugin1, 'nocobase'], + }; + const node1: MockServer = await createMockServer({ + ...appOpts, + name: 'node1', + }); + const node2: MockServer = await createMockServer({ + ...appOpts, + name: 'node2', + }); + await node1.pubSubManager.publish('chan1nel', `channel1_message_1`); + await sleep(1000); + expect(count).toBe(2); + await node1.destroy(); + await node2.destroy(); + }); +}); diff --git a/packages/core/server/src/application.ts b/packages/core/server/src/application.ts index 065a440794..e2bb8416c3 100644 --- a/packages/core/server/src/application.ts +++ b/packages/core/server/src/application.ts @@ -36,6 +36,7 @@ import lodash from 'lodash'; import { RecordableHistogram } from 'node:perf_hooks'; import path, { basename, resolve } from 'path'; import semver from 'semver'; +import packageJson from '../package.json'; import { createACL } from './acl'; import { AppCommand } from './app-command'; import { AppSupervisor } from './app-supervisor'; @@ -59,6 +60,7 @@ import { dataTemplate } from './middlewares/data-template'; import validateFilterParams from './middlewares/validate-filter-params'; import { Plugin } from './plugin'; import { InstallOptions, PluginManager } from './plugin-manager'; +import { PubSubManager } from './pub-sub-manager'; import { SyncManager } from './sync-manager'; import packageJson from '../package.json'; @@ -97,6 +99,7 @@ export interface ApplicationOptions { */ resourcer?: ResourceManagerOptions; resourceManager?: ResourceManagerOptions; + pubSubManager?: any; bodyParser?: any; cors?: any; dataWrapping?: boolean; @@ -226,6 +229,7 @@ export class Application exten * @internal */ public syncManager: SyncManager; + public pubSubManager: PubSubManager; public requestLogger: Logger; private sqlLogger: Logger; protected _logger: SystemLogger; @@ -518,6 +522,10 @@ export class Application exten await this.cacheManager.close(); } + if (this.pubSubManager) { + await this.pubSubManager.close(); + } + if (this.telemetry.started) { await this.telemetry.shutdown(); } @@ -1121,6 +1129,7 @@ export class Application exten this._cli = this.createCLI(); this._i18n = createI18n(options); this.syncManager = new SyncManager(this); + this.pubSubManager = new PubSubManager(this, options.pubSubManager); this.context.db = this.db; /** diff --git a/packages/core/server/src/pub-sub-manager.ts b/packages/core/server/src/pub-sub-manager.ts new file mode 100644 index 0000000000..919af3d736 --- /dev/null +++ b/packages/core/server/src/pub-sub-manager.ts @@ -0,0 +1,87 @@ +/** + * This file is part of the NocoBase (R) project. + * Copyright (c) 2020-2024 NocoBase Co., Ltd. + * Authors: NocoBase Team. + * + * This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License. + * For more information, please refer to: https://www.nocobase.com/agreement. + */ + +import Application from './application'; + +export class PubSubManager { + adapter: IPubSubAdapter; + subscribes = new Map(); + + constructor( + protected app: Application, + protected options: any = {}, + ) { + app.on('afterStart', async () => { + await this.connect(); + }); + app.on('afterStop', async () => { + await this.close(); + }); + } + + get prefix() { + return this.options.name || this.app.name; + } + + setAdapter(adapter: IPubSubAdapter) { + this.adapter = adapter; + } + + async connect() { + await this.adapter.connect(); + // subscribe 要在 connect 之后 + for (const [channel, callbacks] of this.subscribes) { + for (const callback of callbacks) { + await this.adapter.subscribe(`${this.prefix}.${channel}`, callback); + } + } + } + + async close() { + return await this.adapter.close(); + } + + async subscribe(channel, callback) { + if (!this.subscribes.has(channel)) { + const set = new Set(); + this.subscribes.set(channel, set); + } + const set = this.subscribes.get(channel); + set.add(callback); + } + + async unsubscribe(channel, callback) { + const set = this.subscribes.get(channel); + if (set) { + set.delete(callback); + } + return this.adapter.unsubscribe(`${this.prefix}.${channel}`, callback); + } + + async publish(channel, message) { + return this.adapter.publish(`${this.prefix}.${channel}`, message); + } + + onMessage(callback) { + return this.adapter.onMessage((channel, message) => { + if (channel.startsWith(`${this.prefix}.`)) { + callback(channel, message); + } + }); + } +} + +export interface IPubSubAdapter { + connect(): Promise; + close(): Promise; + subscribe(channel: string, callback): Promise; + unsubscribe(channel: string, callback): Promise; + publish(channel: string, message): Promise; + onMessage(callback): void; +}