feat: pub/sub manager (#4933)

* feat: pub/sub manager

* fix: test case

* fix: test error

* fix: test error

* feat: skip self

* feat: debounce

* feat: improve code

* fix: test error

* feat: test cases

* feat: test cases

* fix: improve code

* fix: improve code

* feat: improve code

* fix: improve code

* fix: test case

* fix: typo

* fix: createPubSubManager

* fix: delete messageHandlers

* fix: test case

* feat: improve code

* fix: test error

* fix: test error

* refactor(server): adapt to new api and fix test

* fix(plugin-data-source-main): fix changed api

* fix: test error

* fix: remove sync-manager test case

* chore(server): remove legacy code

* fix(plugin-workflow): fix send sync message with transaction

* chore(server): remove legacy code

* chore(server): remove legacy code

* fix(plugin-workflow): fix test case

* fix(plugin-workflow): fix test case

* test(server): test skip-install parameter in cluster

* test(server): avoid multiple installation in cluster

* test(server): installation in cluster

* feat: sync collection using sync manager (#4920)

* chore: sync collection message

* chore: sync acl

* fix: typo

* chore: sync data source

* chore: remove collection

* fix: typo

* fix: test

* chore: sync sub app event

* chore: sync collection test

* chore: collection test

* chore: test

* chore: data source sync message

* chore: sync multi app

* chore: test

* chore: test

* chore: test

* chore: test

* chore: test

* chore: error message

* fix(server): add type and remove log

* fix(server): not to publish when adpater is not connected

* refactor(server): refine types

* chore: timeout

* fix(server): fix pubSubManager options

* test(ci): test ci checkout

---------

Co-authored-by: mytharcher <mytharcher@gmail.com>
Co-authored-by: ChengLei Shao <chareice@live.com>
This commit is contained in:
chenos 2024-08-14 12:09:36 +08:00 committed by GitHub
parent 835836e924
commit 88e47afe39
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
38 changed files with 1764 additions and 353 deletions

View File

@ -26,7 +26,7 @@ jobs:
- 4873:4873
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
ref: ${{ github.event.inputs.base_branch }}
ssh-key: ${{ secrets.SUBMODULE_SSH_KEY }}
@ -38,7 +38,7 @@ jobs:
- name: Echo PR branch
run: echo "${{ steps.set_pro_pr_branch.outputs.pr_branch }}"
- name: Checkout pro-plugins
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
repository: nocobase/pro-plugins
path: packages/pro-plugins

View File

@ -110,7 +110,7 @@ export interface SchemaInitializerOptions<P1 = ButtonProps, P2 = {}> {
insertPosition?: 'beforeBegin' | 'afterBegin' | 'beforeEnd' | 'afterEnd';
designable?: boolean;
wrap?: (s: ISchema, options?: any) => ISchema;
useWrap?: () => ((s: ISchema, options?: any) => ISchema);
useWrap?: () => (s: ISchema, options?: any) => ISchema;
onSuccess?: (data: any) => void;
insert?: InsertType;
useInsert?: () => InsertType;

View File

@ -47,8 +47,11 @@ export const SettingsCenterDropdown = () => {
return {
key: setting.name,
icon: setting.icon,
label: setting.link ? <div onClick={() => window.open(setting.link)}>{compile(setting.title)}</div> :
label: setting.link ? (
<div onClick={() => window.open(setting.link)}>{compile(setting.title)}</div>
) : (
<Link to={setting.path}>{compile(setting.title)}</Link>
),
};
});
}, [app, t]);

View File

@ -0,0 +1,229 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { MemoryPubSubAdapter, MockServer, createMockServer, sleep } from '@nocobase/test';
import { PubSubManager } from '../pub-sub-manager';
describe('connect', () => {
let pubSubManager: PubSubManager;
beforeEach(async () => {
pubSubManager = new PubSubManager({ channelPrefix: 'pubsub1' });
pubSubManager.setAdapter(new MemoryPubSubAdapter());
});
afterEach(async () => {
await pubSubManager.close();
});
test('not connected', async () => {
const mockListener = vi.fn();
await pubSubManager.subscribe('test1', mockListener);
await pubSubManager.publish('test1', 'message1');
expect(mockListener).not.toHaveBeenCalled();
});
test('closed', async () => {
const mockListener = vi.fn();
await pubSubManager.connect();
await pubSubManager.subscribe('test1', mockListener);
await pubSubManager.close();
await pubSubManager.publish('test1', 'message1');
expect(mockListener).not.toHaveBeenCalled();
});
test('subscribe before connect', async () => {
const mockListener = vi.fn();
await pubSubManager.subscribe('test1', mockListener);
await pubSubManager.connect();
await pubSubManager.publish('test1', 'message1');
expect(mockListener).toHaveBeenCalled();
expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith('message1');
});
test('subscribe after connect', async () => {
await pubSubManager.connect();
const mockListener = vi.fn();
await pubSubManager.subscribe('test1', mockListener);
await pubSubManager.publish('test1', 'message1');
expect(mockListener).toHaveBeenCalled();
expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith('message1');
});
});
describe('skipSelf, unsubscribe, debounce', () => {
let pubSubManager: PubSubManager;
beforeEach(async () => {
pubSubManager = new PubSubManager({ channelPrefix: 'pubsub1' });
pubSubManager.setAdapter(new MemoryPubSubAdapter());
await pubSubManager.connect();
});
afterEach(async () => {
await pubSubManager.close();
});
test('skipSelf: false', async () => {
const mockListener = vi.fn();
await pubSubManager.subscribe('test1', mockListener);
await pubSubManager.publish('test1', 'message1');
expect(mockListener).toHaveBeenCalled();
expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith('message1');
});
test('skipSelf: true', async () => {
const mockListener = vi.fn();
await pubSubManager.subscribe('test1', mockListener);
await pubSubManager.publish('test1', 'message1', { skipSelf: true });
expect(mockListener).not.toHaveBeenCalled();
});
test('debounce', async () => {
const mockListener = vi.fn();
await pubSubManager.subscribe('test1', mockListener, { debounce: 1000 });
pubSubManager.publish('test1', 'message1');
pubSubManager.publish('test1', 'message1');
pubSubManager.publish('test1', 'message1');
pubSubManager.publish('test1', 'message1');
pubSubManager.publish('test1', 'message1');
pubSubManager.publish('test1', 'message1');
pubSubManager.publish('test1', 'message2');
pubSubManager.publish('test1', 'message2');
pubSubManager.publish('test1', 'message2');
pubSubManager.publish('test1', 'message2');
pubSubManager.publish('test1', 'message2');
pubSubManager.publish('test1', 'message2');
await sleep(500);
expect(pubSubManager['handlerManager']['uniqueMessageHandlers'].size).toBe(2);
await sleep(2000);
expect(pubSubManager['handlerManager']['uniqueMessageHandlers'].size).toBe(0);
expect(mockListener).toBeCalledTimes(2);
});
test('message format', async () => {
const mockListener = vi.fn();
await pubSubManager.subscribe('test1', mockListener);
await pubSubManager.publish('test1', 1);
expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith(1);
const msg2 = ['message1'];
await pubSubManager.publish('test1', msg2);
expect(mockListener).toBeCalledTimes(2);
expect(mockListener).toHaveBeenCalledWith(msg2);
const msg3 = { type: 'test' };
await pubSubManager.publish('test1', msg3);
expect(mockListener).toBeCalledTimes(3);
expect(mockListener).toHaveBeenCalledWith(msg3);
await pubSubManager.publish('test1', true);
expect(mockListener).toBeCalledTimes(4);
expect(mockListener).toHaveBeenCalledWith(true);
await pubSubManager.publish('test1', false);
expect(mockListener).toBeCalledTimes(5);
expect(mockListener).toHaveBeenCalledWith(false);
await pubSubManager.publish('test1', null);
expect(mockListener).toBeCalledTimes(6);
expect(mockListener).toHaveBeenCalledWith(null);
});
test('unsubscribe', async () => {
const mockListener = vi.fn();
await pubSubManager.subscribe('test1', mockListener);
await pubSubManager.publish('test1', 'message1');
expect(mockListener).toHaveBeenCalled();
expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith('message1');
await pubSubManager.unsubscribe('test1', mockListener);
await pubSubManager.publish('test1', 'message1');
expect(mockListener).toBeCalledTimes(1);
});
});
describe('Pub/Sub', () => {
let publisher: PubSubManager;
let subscriber: PubSubManager;
beforeEach(async () => {
const pubsub = new MemoryPubSubAdapter();
publisher = new PubSubManager({ channelPrefix: 'pubsub1' });
publisher.setAdapter(pubsub);
await publisher.connect();
subscriber = new PubSubManager({ channelPrefix: 'pubsub1' });
subscriber.setAdapter(pubsub);
await subscriber.connect();
});
afterEach(async () => {
await publisher.close();
await subscriber.close();
});
test('subscribe publish', async () => {
const mockListener = vi.fn();
await subscriber.subscribe('test1', mockListener);
await publisher.publish('test1', 'message1');
expect(mockListener).toHaveBeenCalled();
expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith('message1');
});
test('subscribe twice', async () => {
const mockListener = vi.fn();
await subscriber.subscribe('test1', mockListener);
await subscriber.subscribe('test1', mockListener);
await publisher.publish('test1', 'message1');
expect(mockListener).toHaveBeenCalled();
expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith('message1');
});
test('publish only self', async () => {
const mockListener = vi.fn();
await subscriber.subscribe('test1', mockListener);
await publisher.subscribe('test1', mockListener);
await publisher.publish('test1', 'message1', { onlySelf: true });
expect(mockListener).toHaveBeenCalled();
expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith('message1');
});
});
describe('app.pubSubManager', () => {
let app: MockServer;
let pubSubManager: PubSubManager;
beforeEach(async () => {
app = await createMockServer({
pubSubManager: {
channelPrefix: 'app1',
},
});
pubSubManager = app.pubSubManager;
});
afterEach(async () => {
await app.destroy();
});
test('adapter', async () => {
expect(await pubSubManager.isConnected()).toBe(true);
});
test('subscribe + publish', async () => {
const mockListener = vi.fn();
await pubSubManager.subscribe('test1', mockListener);
await pubSubManager.publish('test1', 'message1');
expect(mockListener).toHaveBeenCalled();
expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith('message1');
});
});

View File

@ -1,61 +0,0 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { Application } from '../application';
import { SyncAdapter } from '../sync-manager';
class MockAdapter extends SyncAdapter {
private _ready: boolean;
constructor({ ready = false, ...options } = {}) {
super(options);
this._ready = ready;
}
get ready() {
return this._ready;
}
publish(data: Record<string, any>): void {
return;
}
}
describe('sync manager', () => {
let app: Application;
beforeEach(() => {
app = new Application({
database: {
dialect: 'sqlite',
storage: ':memory:',
},
resourcer: {
prefix: '/api',
},
acl: false,
dataWrapping: false,
registerActions: false,
});
});
afterEach(async () => {
return app.destroy();
});
it('sync manager should be initialized with application', async () => {
expect(app.syncManager).toBeDefined();
});
it('init adapter', async () => {
const mockAdapter1 = new MockAdapter();
app.syncManager.init(mockAdapter1);
expect(() => app.syncManager.init(mockAdapter1)).toThrowError();
});
});

View File

@ -0,0 +1,94 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { Plugin } from '@nocobase/server';
import { createMockCluster, sleep } from '@nocobase/test';
describe('sync-message-manager', () => {
test('subscribe + publish', async () => {
const cluster = await createMockCluster();
const [node1, node2] = cluster.nodes;
const mockListener = vi.fn();
await node1.syncMessageManager.subscribe('test1', mockListener);
await node2.syncMessageManager.subscribe('test1', mockListener);
await node2.syncMessageManager.publish('test1', 'message1');
expect(mockListener).toHaveBeenCalled();
expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith('message1');
await cluster.destroy();
});
test('transaction', async () => {
const cluster = await createMockCluster();
const [node1, node2] = cluster.nodes;
const mockListener = vi.fn();
await node1.syncMessageManager.subscribe('test1', mockListener);
const transaction = await node2.db.sequelize.transaction();
node2.syncMessageManager.publish('test1', 'message1', { transaction });
await sleep(1000);
expect(mockListener).not.toHaveBeenCalled();
await transaction.commit();
await sleep(1100);
expect(mockListener).toHaveBeenCalled();
expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith('message1');
await cluster.destroy();
});
test('plugin.handleSyncMessage', async () => {
const mockListener = vi.fn();
class MyPlugin extends Plugin {
get name() {
return 'test1';
}
async handleSyncMessage(message) {
mockListener(message);
}
}
const cluster = await createMockCluster({
plugins: [MyPlugin],
});
const [app1, app2] = cluster.nodes;
await app1.pm.get(MyPlugin).sendSyncMessage('message1');
expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith('message1');
await app2.pm.get(MyPlugin).sendSyncMessage('message2');
expect(mockListener).toBeCalledTimes(2);
expect(mockListener).toHaveBeenCalledWith('message2');
await cluster.destroy();
});
test('plugin.handleSyncMessage + transaction', async () => {
const mockListener = vi.fn();
class MyPlugin extends Plugin {
get name() {
return 'test1';
}
async handleSyncMessage(message) {
mockListener(message);
}
}
const cluster = await createMockCluster({
plugins: [MyPlugin],
});
const [app1, app2] = cluster.nodes;
const transaction = await app1.db.sequelize.transaction();
app1.pm.get(MyPlugin).sendSyncMessage('message1', { transaction });
await sleep(1000);
expect(mockListener).not.toHaveBeenCalled();
await transaction.commit();
await sleep(1100);
expect(mockListener).toBeCalledTimes(1);
expect(mockListener).toHaveBeenCalledWith('message1');
await app2.pm.get(MyPlugin).sendSyncMessage('message2');
expect(mockListener).toBeCalledTimes(2);
expect(mockListener).toHaveBeenCalledWith('message2');
await cluster.destroy();
});
});

View File

@ -59,7 +59,8 @@ import { dataTemplate } from './middlewares/data-template';
import validateFilterParams from './middlewares/validate-filter-params';
import { Plugin } from './plugin';
import { InstallOptions, PluginManager } from './plugin-manager';
import { SyncManager } from './sync-manager';
import { createPubSubManager, PubSubManager, PubSubManagerOptions } from './pub-sub-manager';
import { SyncMessageManager } from './sync-message-manager';
import packageJson from '../package.json';
@ -97,6 +98,8 @@ export interface ApplicationOptions {
*/
resourcer?: ResourceManagerOptions;
resourceManager?: ResourceManagerOptions;
pubSubManager?: PubSubManagerOptions;
syncMessageManager?: any;
bodyParser?: any;
cors?: any;
dataWrapping?: boolean;
@ -225,7 +228,8 @@ export class Application<StateT = DefaultState, ContextT = DefaultContext> exten
/**
* @internal
*/
public syncManager: SyncManager;
public pubSubManager: PubSubManager;
public syncMessageManager: SyncMessageManager;
public requestLogger: Logger;
private sqlLogger: Logger;
protected _logger: SystemLogger;
@ -518,6 +522,10 @@ export class Application<StateT = DefaultState, ContextT = DefaultContext> exten
await this.cacheManager.close();
}
if (this.pubSubManager) {
await this.pubSubManager.close();
}
if (this.telemetry.started) {
await this.telemetry.shutdown();
}
@ -1120,7 +1128,8 @@ export class Application<StateT = DefaultState, ContextT = DefaultContext> exten
this._cli = this.createCLI();
this._i18n = createI18n(options);
this.syncManager = new SyncManager(this);
this.pubSubManager = createPubSubManager(this, options.pubSubManager);
this.syncMessageManager = new SyncMessageManager(this, options.syncMessageManager);
this.context.db = this.db;
/**

View File

@ -7,13 +7,13 @@
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
export * from './app-supervisor';
export * from './application';
export { Application as default } from './application';
export * from './gateway';
export * as middlewares from './middlewares';
export * from './migration';
export * from './plugin';
export * from './plugin-manager';
export * from './gateway';
export * from './app-supervisor';
export * from './sync-manager';
export * from './pub-sub-manager';
export const OFFICIAL_PLUGIN_PREFIX = '@nocobase/plugin-';

View File

@ -9,16 +9,15 @@
/* istanbul ignore file -- @preserve */
import { Model } from '@nocobase/database';
import { Model, Transactionable } from '@nocobase/database';
import { LoggerOptions } from '@nocobase/logger';
import { fsExists } from '@nocobase/utils';
import fs from 'fs';
import type { TFuncKey, TOptions } from 'i18next';
import { resolve } from 'path';
import { Application } from './application';
import { InstallOptions, getExposeChangelogUrl, getExposeReadmeUrl } from './plugin-manager';
import { getExposeChangelogUrl, getExposeReadmeUrl, InstallOptions } from './plugin-manager';
import { checkAndGetCompatible, getPluginBasePath } from './plugin-manager/utils';
import { SyncMessageData } from './sync-manager';
export interface PluginInterface {
beforeLoad?: () => void;
@ -134,18 +133,13 @@ export abstract class Plugin<O = any> implements PluginInterface {
async afterRemove() {}
/**
* Fired when a sync message is received.
* @experimental
*/
onSync(message: SyncMessageData = {}): Promise<void> | void {}
async handleSyncMessage(message: any) {}
async sendSyncMessage(message: any, options?: Transactionable) {
if (!this.name) {
throw new Error(`plugin name invalid`);
}
/**
* Publish a sync message.
* @experimental
*/
sync(message?: SyncMessageData) {
this.app.syncManager.publish(this.name, message);
await this.app.syncMessageManager.publish(this.name, message, options);
}
/**
@ -179,13 +173,6 @@ export abstract class Plugin<O = any> implements PluginInterface {
});
}
private async getPluginBasePath() {
if (!this.options.packageName) {
return;
}
return getPluginBasePath(this.options.packageName);
}
/**
* @internal
*/
@ -252,6 +239,13 @@ export abstract class Plugin<O = any> implements PluginInterface {
return results;
}
private async getPluginBasePath() {
if (!this.options.packageName) {
return;
}
return getPluginBasePath(this.options.packageName);
}
}
export default Plugin;

View File

@ -0,0 +1,121 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import _ from 'lodash';
import { type PubSubManagerSubscribeOptions } from './types';
export class HandlerManager {
handlers: Map<any, any>;
uniqueMessageHandlers: Map<any, any>;
constructor(protected publisherId: string) {
this.reset();
}
protected async getMessageHash(message) {
const encoder = new TextEncoder();
const data = encoder.encode(JSON.stringify(message));
const hashBuffer = await crypto.subtle.digest('SHA-256', data);
const hashArray = Array.from(new Uint8Array(hashBuffer));
const hashHex = hashArray.map((b) => b.toString(16).padStart(2, '0')).join('');
return hashHex;
}
protected verifyMessage({ onlySelf, skipSelf, publisherId }) {
if (onlySelf && publisherId !== this.publisherId) {
return;
} else if (!onlySelf && skipSelf && publisherId === this.publisherId) {
return;
}
return true;
}
protected debounce(func, wait: number) {
if (wait) {
return _.debounce(func, wait);
}
return func;
}
async handleMessage({ channel, message, callback, debounce }) {
if (!debounce) {
await callback(message);
return;
}
const messageHash = channel + (await this.getMessageHash(message));
if (!this.uniqueMessageHandlers.has(messageHash)) {
this.uniqueMessageHandlers.set(messageHash, this.debounce(callback, debounce));
}
const handler = this.uniqueMessageHandlers.get(messageHash);
try {
await handler(message);
setTimeout(() => {
this.uniqueMessageHandlers.delete(messageHash);
}, debounce);
} catch (error) {
this.uniqueMessageHandlers.delete(messageHash);
throw error;
}
}
wrapper(channel, callback, options) {
const { debounce = 0 } = options;
return async (wrappedMessage) => {
const json = JSON.parse(wrappedMessage);
if (!this.verifyMessage(json)) {
return;
}
await this.handleMessage({ channel, message: json.message, debounce, callback });
};
}
set(channel: string, callback, options: PubSubManagerSubscribeOptions) {
if (!this.handlers.has(channel)) {
this.handlers.set(channel, new Map());
}
const headlerMap = this.handlers.get(channel);
const headler = this.wrapper(channel, callback, options);
headlerMap.set(callback, headler);
return headler;
}
get(channel: string, callback) {
const headlerMap = this.handlers.get(channel);
if (!headlerMap) {
return;
}
return headlerMap.get(callback);
}
delete(channel: string, callback) {
if (!callback) {
return;
}
const headlerMap = this.handlers.get(channel);
if (!headlerMap) {
return;
}
const headler = headlerMap.get(callback);
headlerMap.delete(callback);
return headler;
}
reset() {
this.handlers = new Map();
this.uniqueMessageHandlers = new Map();
}
async each(callback) {
for (const [channel, headlerMap] of this.handlers) {
for (const headler of headlerMap.values()) {
await callback(channel, headler);
}
}
}
}

View File

@ -0,0 +1,13 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
export * from './handler-manager';
export * from './pub-sub-manager';
export * from './types';

View File

@ -0,0 +1,109 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { uid } from '@nocobase/utils';
import Application from '../application';
import { HandlerManager } from './handler-manager';
import {
PubSubCallback,
type IPubSubAdapter,
type PubSubManagerOptions,
type PubSubManagerPublishOptions,
type PubSubManagerSubscribeOptions,
} from './types';
export const createPubSubManager = (app: Application, options: PubSubManagerOptions) => {
const pubSubManager = new PubSubManager(options);
app.on('afterStart', async () => {
await pubSubManager.connect();
});
app.on('afterStop', async () => {
await pubSubManager.close();
});
return pubSubManager;
};
export class PubSubManager {
protected publisherId: string;
protected adapter: IPubSubAdapter;
protected handlerManager: HandlerManager;
constructor(protected options: PubSubManagerOptions = {}) {
this.publisherId = uid();
this.handlerManager = new HandlerManager(this.publisherId);
}
get channelPrefix() {
return this.options?.channelPrefix ? `${this.options.channelPrefix}.` : '';
}
setAdapter(adapter: IPubSubAdapter) {
this.adapter = adapter;
}
async isConnected() {
if (this.adapter) {
return this.adapter.isConnected();
}
return false;
}
async connect() {
if (!this.adapter) {
return;
}
await this.adapter.connect();
// 如果没连接前添加的订阅,连接后需要把订阅添加上
await this.handlerManager.each(async (channel, headler) => {
await this.adapter.subscribe(`${this.channelPrefix}${channel}`, headler);
});
}
async close() {
if (!this.adapter) {
return;
}
return await this.adapter.close();
}
async subscribe(channel: string, callback: PubSubCallback, options: PubSubManagerSubscribeOptions = {}) {
// 先退订,防止重复订阅
await this.unsubscribe(channel, callback);
const handler = this.handlerManager.set(channel, callback, options);
// 连接之后才能订阅
if (await this.isConnected()) {
await this.adapter.subscribe(`${this.channelPrefix}${channel}`, handler);
}
}
async unsubscribe(channel: string, callback: PubSubCallback) {
const handler = this.handlerManager.delete(channel, callback);
if (!this.adapter || !handler) {
return;
}
return this.adapter.unsubscribe(`${this.channelPrefix}${channel}`, handler);
}
async publish(channel: string, message: any, options?: PubSubManagerPublishOptions) {
if (!this.adapter?.isConnected()) {
return;
}
const wrappedMessage = JSON.stringify({
publisherId: this.publisherId,
...options,
message: message,
});
return this.adapter.publish(`${this.channelPrefix}${channel}`, wrappedMessage);
}
}

View File

@ -0,0 +1,32 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
export interface PubSubManagerOptions {
channelPrefix?: string;
}
export interface PubSubManagerPublishOptions {
skipSelf?: boolean;
onlySelf?: boolean;
}
export interface PubSubManagerSubscribeOptions {
debounce?: number;
}
export type PubSubCallback = (message: any) => Promise<void>;
export interface IPubSubAdapter {
isConnected(): Promise<boolean> | boolean;
connect(): Promise<any>;
close(): Promise<any>;
subscribe(channel: string, callback: PubSubCallback): Promise<any>;
unsubscribe(channel: string, callback: PubSubCallback): Promise<any>;
publish(channel: string, message: string): Promise<any>;
}

View File

@ -1,127 +0,0 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { randomUUID } from 'node:crypto';
import EventEmitter from 'node:events';
import Application from './application';
import { isEqual, uniqWith } from 'lodash';
export abstract class SyncAdapter extends EventEmitter {
abstract get ready(): boolean;
public abstract publish(data: SyncMessage): void | Promise<void>;
}
export type SyncMessageData = Record<string, string>;
export type SyncEventCallback = (message: SyncMessageData) => void;
export type SyncMessage = {
namespace: string;
nodeId: string;
appName: string;
} & SyncMessageData;
/**
* @experimental
*/
export class SyncManager {
private nodeId: string;
private eventEmitter = new EventEmitter();
private adapter: SyncAdapter = null;
private incomingBuffer: SyncMessageData[] = [];
private outgoingBuffer: [string, SyncMessageData][] = [];
private flushTimer: NodeJS.Timeout = null;
public get available() {
return this.adapter ? this.adapter.ready : false;
}
private onMessage(namespace, message) {
this.app.logger.info(`emit sync event in namespace ${namespace}`);
this.eventEmitter.emit(namespace, message);
const pluginInstance = this.app.pm.get(namespace);
pluginInstance.onSync(message);
}
private onSync = (messages: SyncMessage[]) => {
this.app.logger.info('sync messages received, save into buffer:', messages);
if (this.flushTimer) {
clearTimeout(this.flushTimer);
this.flushTimer = null;
}
this.incomingBuffer = uniqWith(
this.incomingBuffer.concat(
messages
.filter((item) => item.nodeId !== this.nodeId && item.appName === this.app.name)
.map(({ nodeId, appName, ...message }) => message),
),
isEqual,
);
this.flushTimer = setTimeout(() => {
this.incomingBuffer.forEach(({ namespace, ...message }) => {
this.onMessage(namespace, message);
});
this.incomingBuffer = [];
}, 1000);
};
private onReady = () => {
while (this.outgoingBuffer.length) {
const [namespace, data] = this.outgoingBuffer.shift();
this.publish(namespace, data);
}
};
constructor(private app: Application) {
this.nodeId = `${process.env.NODE_ID || randomUUID()}-${process.pid}`;
}
public init(adapter: SyncAdapter) {
if (this.adapter) {
throw new Error('sync adapter is already exists');
}
if (!adapter) {
return;
}
this.adapter = adapter;
this.adapter.on('message', this.onSync);
this.adapter.on('ready', this.onReady);
}
public subscribe(namespace: string, callback: SyncEventCallback) {
this.eventEmitter.on(namespace, callback);
}
public unsubscribe(namespace: string, callback: SyncEventCallback) {
this.eventEmitter.off(namespace, callback);
}
/**
* Publish a message to the sync manager
*/
public publish(namespace: string, data: SyncMessageData = {}) {
if (!this.adapter) {
return;
}
if (!this.adapter.ready) {
this.outgoingBuffer.push([namespace, data]);
this.app.logger.warn(`sync adapter is not ready for now, message will be send when it is ready`);
return;
}
this.app.logger.info(`publishing sync message from #${this.nodeId} (${this.app.name}) in namespace ${namespace}:`, {
data,
});
return this.adapter.publish({ ...data, nodeId: this.nodeId, appName: this.app.name, namespace });
}
}

View File

@ -0,0 +1,82 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { Transactionable } from '@nocobase/database';
import Application from './application';
import { PubSubCallback, PubSubManager, PubSubManagerPublishOptions } from './pub-sub-manager';
export class SyncMessageManager {
protected versionManager: SyncMessageVersionManager;
protected pubSubManager: PubSubManager;
constructor(
protected app: Application,
protected options: any = {},
) {
this.versionManager = new SyncMessageVersionManager();
app.on('beforeLoadPlugin', async (plugin) => {
if (!plugin.name) {
return;
}
await this.subscribe(plugin.name, plugin.handleSyncMessage.bind(plugin));
});
}
get debounce() {
return this.options.debounce || 1000;
}
async publish(channel: string, message, options?: PubSubManagerPublishOptions & Transactionable) {
const { transaction, ...others } = options || {};
if (transaction) {
return await new Promise((resolve, reject) => {
const timer = setTimeout(() => {
reject(new Error(`Publish message to ${channel} timeout, message: ${JSON.stringify(message)}`));
}, 50000);
transaction.afterCommit(async () => {
try {
const r = await this.app.pubSubManager.publish(`${this.app.name}.sync.${channel}`, message, {
skipSelf: true,
...others,
});
resolve(r);
} catch (error) {
reject(error);
} finally {
clearTimeout(timer);
}
});
});
} else {
return await this.app.pubSubManager.publish(`${this.app.name}.sync.${channel}`, message, {
skipSelf: true,
...options,
});
}
}
async subscribe(channel: string, callback: PubSubCallback) {
return await this.app.pubSubManager.subscribe(`${this.app.name}.sync.${channel}`, callback, {
debounce: this.debounce,
});
}
async unsubscribe(channel: string, callback: PubSubCallback) {
return this.app.pubSubManager.unsubscribe(`${this.app.name}.sync.${channel}`, callback);
}
async sync() {
// TODO
}
}
export class SyncMessageVersionManager {
// TODO
}

View File

@ -10,10 +10,11 @@
import { describe } from 'vitest';
import ws from 'ws';
export { mockDatabase, MockDatabase } from '@nocobase/database';
export { MockDatabase, mockDatabase } from '@nocobase/database';
export { default as supertest } from 'supertest';
export * from './memory-pub-sub-adapter';
export * from './mock-isolated-cluster';
export * from './mock-server';
export * from './mock-cluster';
export const pgOnly: () => any = () => (process.env.DB_DIALECT == 'postgres' ? describe : describe.skip);
export const isPg = () => process.env.DB_DIALECT == 'postgres';

View File

@ -0,0 +1,78 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { IPubSubAdapter } from '@nocobase/server';
import { AsyncEmitter, applyMixins, uid } from '@nocobase/utils';
import { EventEmitter } from 'events';
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
class TestEventEmitter extends EventEmitter {
declare emitAsync: (event: string | symbol, ...args: any[]) => Promise<boolean>;
}
applyMixins(TestEventEmitter, [AsyncEmitter]);
export class MemoryPubSubAdapter implements IPubSubAdapter {
protected emitter: TestEventEmitter;
connected = false;
static instances = new Map<string, MemoryPubSubAdapter>();
static create(name?: string, options?: any) {
if (!name) {
name = uid();
}
if (!this.instances.has(name)) {
this.instances.set(name, new MemoryPubSubAdapter(options));
}
return this.instances.get(name);
}
constructor(protected options: any = {}) {
this.emitter = new TestEventEmitter();
}
async connect() {
this.connected = true;
}
async close() {
this.connected = false;
}
async isConnected() {
return this.connected;
}
async subscribe(channel, callback) {
this.emitter.on(channel, callback);
}
async unsubscribe(channel, callback) {
this.emitter.off(channel, callback);
}
async publish(channel, message) {
console.log(this.connected, { channel, message });
if (!this.connected) {
return;
}
await this.emitter.emitAsync(channel, message);
await this.emitter.emitAsync('__publish__', channel, message);
// 用于处理延迟问题
if (this.options.debounce) {
await sleep(Number(this.options.debounce));
}
}
async subscribeAll(callback) {
this.emitter.on('__publish__', callback);
}
}

View File

@ -0,0 +1,25 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { CollectionManager, DataSource } from '@nocobase/data-source-manager';
import { waitSecond } from '@nocobase/test';
export class MockDataSource extends DataSource {
static testConnection(options?: any): Promise<boolean> {
return Promise.resolve(true);
}
async load(): Promise<void> {
await waitSecond(1000);
}
createCollectionManager(options?: any): any {
return new CollectionManager(options);
}
}

View File

@ -14,19 +14,19 @@ import { getPortPromise } from 'portfinder';
import { uid } from '@nocobase/utils';
import { createMockServer } from './mock-server';
type ClusterOptions = {
type IsolatedClusterOptions = {
script?: string;
env?: Record<string, any>;
plugins?: string[];
instances?: number;
};
export class MockCluster {
export class MockIsolatedCluster {
private script = `${process.env.APP_PACKAGE_ROOT}/src/index.ts`;
private processes = [];
private mockApp;
constructor(private options: ClusterOptions = {}) {
constructor(private options: IsolatedClusterOptions = {}) {
if (options.script) {
this.script = options.script;
}

View File

@ -9,9 +9,12 @@
import { mockDatabase } from '@nocobase/database';
import { Application, ApplicationOptions, AppSupervisor, Gateway, PluginManager } from '@nocobase/server';
import { uid } from '@nocobase/utils';
import jwt from 'jsonwebtoken';
import qs from 'qs';
import supertest, { SuperAgentTest } from 'supertest';
import { MemoryPubSubAdapter } from './memory-pub-sub-adapter';
import { MockDataSource } from './mock-data-source';
interface ActionParams {
filterByTk?: any;
@ -74,6 +77,10 @@ interface ExtendedAgent extends SuperAgentTest {
}
export class MockServer extends Application {
registerMockDataSource() {
this.dataSourceManager.factory.register('mock', MockDataSource);
}
async loadAndInstall(options: any = {}) {
await this.load({ method: 'install' });
@ -228,10 +235,25 @@ export function mockServer(options: ApplicationOptions = {}) {
PluginManager.findPackagePatched = true;
}
const app = new MockServer({
const mockServerOptions = {
acl: false,
syncMessageManager: {
debounce: 500,
},
...options,
});
};
const app = new MockServer(mockServerOptions);
const basename = app.options.pubSubManager?.channelPrefix;
if (basename) {
app.pubSubManager.setAdapter(
MemoryPubSubAdapter.create(basename, {
debounce: 500,
}),
);
}
return app;
}
@ -244,16 +266,68 @@ export async function startMockServer(options: ApplicationOptions = {}) {
type BeforeInstallFn = (app) => Promise<void>;
export async function createMockServer(
options: ApplicationOptions & {
export type MockServerOptions = ApplicationOptions & {
version?: string;
beforeInstall?: BeforeInstallFn;
skipInstall?: boolean;
skipStart?: boolean;
} = {},
) {
};
export type MockClusterOptions = MockServerOptions & {
number?: number;
clusterName?: string;
appName?: string;
};
export type MockCluster = {
nodes: MockServer[];
destroy: () => Promise<void>;
};
export async function createMockCluster({
number = 2,
clusterName = `cluster_${uid()}`,
appName = `app_${uid()}`,
...options
}: MockClusterOptions = {}): Promise<MockCluster> {
const nodes: MockServer[] = [];
let dbOptions;
for (let i = 0; i < number; i++) {
if (dbOptions) {
options['database'] = {
...dbOptions,
};
}
const app: MockServer = await createMockServer({
...options,
skipSupervisor: true,
name: clusterName + '_' + appName,
pubSubManager: {
channelPrefix: clusterName,
},
});
if (!dbOptions) {
dbOptions = app.db.options;
}
nodes.push(app);
}
return {
nodes,
async destroy() {
for (const node of nodes) {
await node.destroy();
}
},
};
}
export async function createMockServer(options: MockServerOptions = {}) {
const { version, beforeInstall, skipInstall, skipStart, ...others } = options;
const app: any = mockServer(others);
const app: MockServer = mockServer(others);
if (!skipInstall) {
if (beforeInstall) {
await beforeInstall(app);

View File

@ -17,7 +17,7 @@ export class RoleResourceModel extends Model {
role.revokeResource(resourceName);
}
async writeToACL(options: { acl: ACL; transaction: any }) {
async writeToACL(options: { acl: ACL; transaction?: any }) {
const { acl } = options;
const resourceName = this.get('name') as string;
const roleName = this.get('roleName') as string;

View File

@ -0,0 +1,235 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { createMockCluster, sleep } from '@nocobase/test';
describe('cluster', () => {
let cluster;
beforeEach(async () => {
cluster = await createMockCluster({
plugins: ['error-handler', 'data-source-main', 'ui-schema-storage'],
acl: false,
});
});
afterEach(async () => {
await cluster.destroy();
});
describe('sync collection', () => {
test('create collection', async () => {
const [app1, app2] = cluster.nodes;
await app1.db.getRepository('collections').create({
values: {
name: 'tests',
fields: [
{
name: 'name',
type: 'string',
},
],
},
context: {},
});
await sleep(2000);
const testsCollection = app2.db.getCollection('tests');
expect(testsCollection).toBeTruthy();
});
test('update collection', async () => {
const [app1, app2] = cluster.nodes;
await app1.db.getRepository('collections').create({
values: {
name: 'tests',
fields: [
{
name: 'name',
type: 'string',
},
],
description: 'test collection',
},
context: {},
});
await sleep(2000);
const testsCollection = app2.db.getCollection('tests');
expect(testsCollection).toBeTruthy();
await app1.db.getRepository('collections').update({
filterByTk: 'tests',
values: {
description: 'new test collection',
},
context: {},
});
await sleep(2000);
expect(testsCollection.options.description).toBe('new test collection');
});
test('destroy collection', async () => {
const [app1, app2] = cluster.nodes;
await app1.db.getRepository('collections').create({
values: {
name: 'tests',
fields: [
{
name: 'name',
type: 'string',
},
],
},
context: {},
});
await sleep(2000);
const testsCollection = app2.db.getCollection('tests');
expect(testsCollection).toBeTruthy();
await app1.db.getRepository('collections').destroy({
filterByTk: 'tests',
context: {},
});
await sleep(2000);
expect(app2.db.getCollection('tests')).toBeFalsy();
});
});
describe('sync fields', () => {
test('create field', async () => {
const [app1, app2] = cluster.nodes;
await app1.db.getRepository('collections').create({
values: {
name: 'tests',
fields: [
{
name: 'name',
type: 'string',
},
],
},
context: {},
});
await sleep(2000);
const testsCollection = app2.db.getCollection('tests');
expect(testsCollection).toBeTruthy();
await app1.db.getRepository('fields').create({
values: {
name: 'age',
type: 'integer',
collectionName: 'tests',
},
context: {},
});
await sleep(2000);
const ageField = testsCollection.getField('age');
expect(ageField).toBeTruthy();
});
test('update field', async () => {
const [app1, app2] = cluster.nodes;
await app1.db.getRepository('collections').create({
values: {
name: 'tests',
fields: [
{
name: 'name',
type: 'string',
},
],
},
context: {},
});
await sleep(2000);
const testsCollection = app2.db.getCollection('tests');
expect(testsCollection).toBeTruthy();
await app1.db.getRepository('fields').create({
values: {
name: 'age',
type: 'integer',
collectionName: 'tests',
},
context: {},
});
await sleep(2000);
await app1.db.getRepository('collections.fields', 'tests').update({
filterByTk: 'age',
values: {
description: 'age field',
},
context: {},
});
await sleep(2000);
const ageField = testsCollection.getField('age');
expect(ageField).toBeTruthy();
expect(ageField.options.description).toBe('age field');
});
test('destroy field', async () => {
const [app1, app2] = cluster.nodes;
await app1.db.getRepository('collections').create({
values: {
name: 'tests',
fields: [
{
name: 'name',
type: 'string',
},
{
name: 'age',
type: 'integer',
},
],
},
context: {},
});
await sleep(2000);
const testsCollection = app2.db.getCollection('tests');
expect(testsCollection).toBeTruthy();
await app1.db.getRepository('collections.fields', 'tests').destroy({
filterByTk: 'age',
context: {},
});
await sleep(2000);
expect(testsCollection.getField('age')).toBeFalsy();
});
});
});

View File

@ -41,9 +41,10 @@ export class PluginDataSourceMainServer extends Plugin {
this.loadFilter = filter;
}
async onSync(message) {
async handleSyncMessage(message) {
const { type, collectionName } = message;
if (type === 'newCollection') {
if (type === 'syncCollection') {
const collectionModel: CollectionModel = await this.app.db.getCollection('collections').repository.findOne({
filter: {
name: collectionName,
@ -52,6 +53,26 @@ export class PluginDataSourceMainServer extends Plugin {
await collectionModel.load();
}
if (type === 'removeField') {
const { collectionName, fieldName } = message;
const collection = this.app.db.getCollection(collectionName);
if (!collection) {
return;
}
return collection.removeFieldFromDb(fieldName);
}
if (type === 'removeCollection') {
const { collectionName } = message;
const collection = this.app.db.getCollection(collectionName);
if (!collection) {
return;
}
collection.remove();
}
}
async beforeLoad() {
@ -92,10 +113,15 @@ export class PluginDataSourceMainServer extends Plugin {
transaction,
});
this.app.syncManager.publish(this.name, {
type: 'newCollection',
this.sendSyncMessage(
{
type: 'syncCollection',
collectionName: model.get('name'),
});
},
{
transaction,
},
);
}
},
);
@ -113,6 +139,16 @@ export class PluginDataSourceMainServer extends Plugin {
}
await model.remove(removeOptions);
this.sendSyncMessage(
{
type: 'removeCollection',
collectionName: model.get('name'),
},
{
transaction: options.transaction,
},
);
});
// 要在 beforeInitOptions 之前处理
@ -262,6 +298,16 @@ export class PluginDataSourceMainServer extends Plugin {
};
await collection.sync(syncOptions);
this.sendSyncMessage(
{
type: 'syncCollection',
collectionName: model.get('collectionName'),
},
{
transaction,
},
);
}
});
@ -273,6 +319,17 @@ export class PluginDataSourceMainServer extends Plugin {
this.app.db.on('fields.beforeDestroy', async (model: FieldModel, options) => {
await mutex.runExclusive(async () => {
await model.remove(options);
this.sendSyncMessage(
{
type: 'removeField',
collectionName: model.get('collectionName'),
fieldName: model.get('name'),
},
{
transaction: options.transaction,
},
);
});
});

View File

@ -0,0 +1,49 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { createMockCluster, waitSecond } from '@nocobase/test';
describe('cluster', () => {
let cluster;
beforeEach(async () => {
cluster = await createMockCluster({
plugins: ['nocobase'],
acl: false,
});
for (const node of cluster.nodes) {
node.registerMockDataSource();
}
});
afterEach(async () => {
await cluster.destroy();
});
test('create data source', async () => {
const app1 = cluster.nodes[0];
await app1.db.getRepository('dataSources').create({
values: {
key: 'mockInstance1',
type: 'mock',
displayName: 'Mock',
options: {},
},
});
await waitSecond(2000);
const dataSource = app1.dataSourceManager.get('mockInstance1');
expect(dataSource).toBeDefined();
const dataSourceInApp2 = cluster.nodes[1].dataSourceManager.get('mockInstance1');
expect(dataSourceInApp2).toBeDefined();
});
});

View File

@ -36,6 +36,96 @@ export class PluginDataSourceManagerServer extends Plugin {
[dataSourceKey: string]: DataSourceState;
} = {};
async handleSyncMessage(message) {
const { type } = message;
if (type === 'syncRole') {
const { roleName, dataSourceKey } = message;
const dataSource = this.app.dataSourceManager.dataSources.get(dataSourceKey);
const dataSourceRole: DataSourcesRolesModel = await this.app.db.getRepository('dataSourcesRoles').findOne({
filter: {
dataSourceKey,
roleName,
},
});
await dataSourceRole.writeToAcl({
acl: dataSource.acl,
});
}
if (type === 'syncRoleResource') {
const { roleName, dataSourceKey, resourceName } = message;
const dataSource = this.app.dataSourceManager.dataSources.get(dataSourceKey);
const dataSourceRoleResource: DataSourcesRolesResourcesModel = await this.app.db
.getRepository('dataSourcesRolesResources')
.findOne({
filter: {
dataSourceKey,
roleName,
name: resourceName,
},
});
await dataSourceRoleResource.writeToACL({
acl: dataSource.acl,
});
}
if (type === 'loadDataSource') {
const { dataSourceKey } = message;
const dataSourceModel = await this.app.db.getRepository('dataSources').findOne({
filter: {
key: dataSourceKey,
},
});
if (!dataSourceModel) {
return;
}
await dataSourceModel.loadIntoApplication({
app: this.app,
});
}
if (type === 'loadDataSourceField') {
const { key } = message;
const fieldModel = await this.app.db.getRepository('dataSourcesFields').findOne({
filter: {
key,
},
});
fieldModel.load({
app: this.app,
});
}
if (type === 'removeDataSourceCollection') {
const { dataSourceKey, collectionName } = message;
const dataSource = this.app.dataSourceManager.dataSources.get(dataSourceKey);
dataSource.collectionManager.removeCollection(collectionName);
}
if (type === 'removeDataSourceField') {
const { key } = message;
const fieldModel = await this.app.db.getRepository('dataSourcesFields').findOne({
filter: {
key,
},
});
fieldModel.unload({
app: this.app,
});
}
if (type === 'removeDataSource') {
const { dataSourceKey } = message;
this.app.dataSourceManager.dataSources.delete(dataSourceKey);
}
}
async beforeLoad() {
this.app.db.registerModels({
DataSourcesCollectionModel,
@ -100,6 +190,16 @@ export class PluginDataSourceManagerServer extends Plugin {
model.loadIntoApplication({
app: this.app,
});
this.sendSyncMessage(
{
type: 'loadDataSource',
dataSourceKey: model.get('key'),
},
{
transaction: options.transaction,
},
);
}
});
@ -264,6 +364,7 @@ export class PluginDataSourceManagerServer extends Plugin {
}
});
const self = this;
this.app.actions({
async ['dataSources:listEnabled'](ctx, next) {
const dataSources = await ctx.db.getRepository('dataSources').find({
@ -302,6 +403,7 @@ export class PluginDataSourceManagerServer extends Plugin {
async ['dataSources:refresh'](ctx, next) {
const { filterByTk, clientStatus } = ctx.action.params;
const dataSourceModel: DataSourceModel = await ctx.db.getRepository('dataSources').findOne({
filter: {
key: filterByTk,
@ -317,6 +419,11 @@ export class PluginDataSourceManagerServer extends Plugin {
dataSourceModel.loadIntoApplication({
app: ctx.app,
});
ctx.app.syncMessageManager.publish(self.name, {
type: 'loadDataSource',
dataSourceKey: dataSourceModel.get('key'),
});
}
ctx.body = {
@ -352,21 +459,52 @@ export class PluginDataSourceManagerServer extends Plugin {
}
});
this.app.db.on('dataSourcesCollections.afterDestroy', async (model: DataSourcesCollectionModel) => {
this.app.db.on('dataSourcesCollections.afterDestroy', async (model: DataSourcesCollectionModel, options) => {
const dataSource = this.app.dataSourceManager.dataSources.get(model.get('dataSourceKey'));
dataSource.collectionManager.removeCollection(model.get('name'));
this.sendSyncMessage(
{
type: 'removeDataSourceCollection',
dataSourceKey: model.get('dataSourceKey'),
collectionName: model.get('name'),
},
{
transaction: options.transaction,
},
);
});
this.app.db.on('dataSourcesFields.afterSaveWithAssociations', async (model: DataSourcesFieldModel) => {
this.app.db.on('dataSourcesFields.afterSaveWithAssociations', async (model: DataSourcesFieldModel, options) => {
model.load({
app: this.app,
});
this.sendSyncMessage(
{
type: 'loadDataSourceField',
key: model.get('key'),
},
{
transaction: options.transaction,
},
);
});
this.app.db.on('dataSourcesFields.afterDestroy', async (model: DataSourcesFieldModel) => {
this.app.db.on('dataSourcesFields.afterDestroy', async (model: DataSourcesFieldModel, options) => {
model.unload({
app: this.app,
});
this.sendSyncMessage(
{
type: 'removeDataSourceField',
key: model.get('key'),
},
{
transaction: options.transaction,
},
);
});
this.app.db.on(
@ -379,8 +517,18 @@ export class PluginDataSourceManagerServer extends Plugin {
},
);
this.app.db.on('dataSources.afterDestroy', async (model: DataSourceModel) => {
this.app.db.on('dataSources.afterDestroy', async (model: DataSourceModel, options) => {
this.app.dataSourceManager.dataSources.delete(model.get('key'));
this.sendSyncMessage(
{
type: 'removeDataSource',
dataSourceKey: model.get('key'),
},
{
transaction: options.transaction,
},
);
});
this.app.on('afterStart', async (app: Application) => {
@ -412,6 +560,19 @@ export class PluginDataSourceManagerServer extends Plugin {
acl: dataSource.acl,
transaction: transaction,
});
// sync roles resources between nodes
this.sendSyncMessage(
{
type: 'syncRoleResource',
roleName: model.get('roleName'),
dataSourceKey: model.get('dataSourceKey'),
resourceName: model.get('name'),
},
{
transaction,
},
);
},
);
@ -429,6 +590,18 @@ export class PluginDataSourceManagerServer extends Plugin {
acl: dataSource.acl,
transaction: transaction,
});
this.sendSyncMessage(
{
type: 'syncRoleResource',
roleName: resource.get('roleName'),
dataSourceKey: resource.get('dataSourceKey'),
resourceName: resource.get('name'),
},
{
transaction,
},
);
},
);
@ -440,6 +613,18 @@ export class PluginDataSourceManagerServer extends Plugin {
if (role) {
role.revokeResource(model.get('name'));
}
this.sendSyncMessage(
{
type: 'syncRoleResource',
roleName,
dataSourceKey: model.get('dataSourceKey'),
resourceName: model.get('name'),
},
{
transaction: options.transaction,
},
);
});
this.app.db.on('dataSourcesRoles.afterSave', async (model: DataSourcesRolesModel, options) => {
@ -462,6 +647,18 @@ export class PluginDataSourceManagerServer extends Plugin {
hooks: false,
transaction,
});
// sync role between nodes
this.sendSyncMessage(
{
type: 'syncRole',
roleName: model.get('roleName'),
dataSourceKey: model.get('dataSourceKey'),
},
{
transaction,
},
);
});
this.app.on('acl:writeResources', async ({ roleName, transaction }) => {

View File

@ -0,0 +1,51 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { createMockCluster, sleep } from '@nocobase/test';
import Plugin from '..';
describe('file-manager > cluster', () => {
let cluster;
beforeEach(async () => {
cluster = await createMockCluster({
plugins: ['file-manager'],
});
});
afterEach(() => cluster.destroy());
describe('sync message', () => {
it('storage cache should be sync between every nodes', async () => {
const [app1, app2] = cluster.nodes;
const StorageRepo = app1.db.getRepository('storages');
const s1 = await StorageRepo.findOne({
filter: {
default: true,
},
});
const p1 = app1.pm.get(Plugin) as Plugin;
const p2 = app2.pm.get(Plugin) as Plugin;
expect(p1.storagesCache.get(s1.id)).toEqual(s1.toJSON());
expect(p2.storagesCache.get(s1.id)).toEqual(s1.toJSON());
await s1.update({
path: 'a',
});
expect(p1.storagesCache.get(s1.id).path).toEqual('a');
await sleep(550);
expect(p2.storagesCache.get(s1.id).path).toEqual('a');
});
});
});

View File

@ -12,15 +12,15 @@ import { resolve } from 'path';
import { Plugin } from '@nocobase/server';
import { Registry } from '@nocobase/utils';
import { STORAGE_TYPE_ALI_OSS, STORAGE_TYPE_LOCAL, STORAGE_TYPE_S3, STORAGE_TYPE_TX_COS } from '../constants';
import { FileModel } from './FileModel';
import initActions from './actions';
import { AttachmentInterface } from './interfaces/attachment-interface';
import { IStorage, StorageModel } from './storages';
import { STORAGE_TYPE_ALI_OSS, STORAGE_TYPE_LOCAL, STORAGE_TYPE_S3, STORAGE_TYPE_TX_COS } from '../constants';
import StorageTypeLocal from './storages/local';
import StorageTypeAliOss from './storages/ali-oss';
import StorageTypeLocal from './storages/local';
import StorageTypeS3 from './storages/s3';
import StorageTypeTxCos from './storages/tx-cos';
import { AttachmentInterface } from './interfaces/attachment-interface';
export type * from './storages';
@ -66,7 +66,7 @@ export default class PluginFileManagerServer extends Plugin {
}
}
async onSync(message) {
async handleSyncMessage(message) {
if (message.type === 'storageChange') {
const storage = await this.db.getRepository('storages').findOne({
filterByTk: message.storageId,
@ -76,7 +76,7 @@ export default class PluginFileManagerServer extends Plugin {
}
}
if (message.type === 'storageRemove') {
const id = Number.parseInt(message.storageId, 10);
const id = message.storageId;
this.storagesCache.delete(id);
}
}
@ -104,19 +104,25 @@ export default class PluginFileManagerServer extends Plugin {
});
const Storage = this.db.getModel('storages');
Storage.afterSave((m) => {
Storage.afterSave((m, { transaction }) => {
this.storagesCache.set(m.id, m.toJSON());
this.sync({
this.sendSyncMessage(
{
type: 'storageChange',
storageId: `${m.id}`,
storageId: m.id,
},
{ transaction },
);
});
});
Storage.afterDestroy((m) => {
Storage.afterDestroy((m, { transaction }) => {
this.storagesCache.delete(m.id);
this.sync({
this.sendSyncMessage(
{
type: 'storageRemove',
storageId: `${m.id}`,
});
storageId: m.id,
},
{ transaction },
);
});
this.app.acl.registerSnippet({

View File

@ -46,36 +46,34 @@ const app = mockApp({
'mobileRoutes:list': {
data: [
{
"id": 10,
"createdAt": "2024-07-08T13:22:33.763Z",
"updatedAt": "2024-07-08T13:22:33.763Z",
"parentId": null,
"title": "Test1",
"icon": "AppstoreOutlined",
"schemaUid": "test",
"type": "page",
"options": null,
"sort": 1,
"createdById": 1,
"updatedById": 1,
id: 10,
createdAt: '2024-07-08T13:22:33.763Z',
updatedAt: '2024-07-08T13:22:33.763Z',
parentId: null,
title: 'Test1',
icon: 'AppstoreOutlined',
schemaUid: 'test',
type: 'page',
options: null,
sort: 1,
createdById: 1,
updatedById: 1,
},
{
"id": 13,
"createdAt": "2024-07-08T13:23:01.929Z",
"updatedAt": "2024-07-08T13:23:12.433Z",
"parentId": null,
"title": "Test2",
"icon": "aliwangwangoutlined",
"schemaUid": null,
"type": "link",
"options": {
"schemaUid": null,
"url": "https://github.com",
"params": [
{}
]
}
}
id: 13,
createdAt: '2024-07-08T13:23:01.929Z',
updatedAt: '2024-07-08T13:23:12.433Z',
parentId: null,
title: 'Test2',
icon: 'aliwangwangoutlined',
schemaUid: null,
type: 'link',
options: {
schemaUid: null,
url: 'https://github.com',
params: [{}],
},
},
],
},
},

View File

@ -15,8 +15,8 @@ const Demo = () => {
title: 'Link',
icon: 'AppstoreOutlined',
options: {
url: 'https://github.com'
}
url: 'https://github.com',
},
}),
)}
/>

View File

@ -16,8 +16,8 @@ const schema = getMobileTabBarItemSchema({
title: 'Link',
icon: 'AppstoreOutlined',
options: {
url: 'https://github.com'
}
url: 'https://github.com',
},
});
const Demo = () => {

View File

@ -14,13 +14,7 @@ const schema = getMobileTabBarItemSchema({
});
const Demo = () => {
return (
<SchemaComponent
schema={schemaViewer(
schema,
)}
/>
);
return <SchemaComponent schema={schemaViewer(schema)} />;
};
class MyPlugin extends Plugin {

View File

@ -18,5 +18,5 @@ export function getMobileTabBarItemSchema(routeItem: MobileRouteItem) {
schemaUid: routeItem.schemaUid,
...(routeItem.options || {}),
},
}
};
}

View File

@ -143,6 +143,34 @@ export class PluginMultiAppManagerServer extends Plugin {
return lodash.cloneDeep(lodash.omit(oldConfig, ['migrator']));
}
async handleSyncMessage(message) {
const { type } = message;
if (type === 'startApp') {
const { appName } = message;
const model = await this.app.db.getRepository('applications').findOne({
filter: {
name: appName,
},
});
if (!model) {
return;
}
const subApp = model.registerToSupervisor(this.app, {
appOptionsFactory: this.appOptionsFactory,
});
subApp.runCommand('start', '--quickstart');
}
if (type === 'removeApp') {
const { appName } = message;
await AppSupervisor.getInstance().removeApp(appName);
}
}
setSubAppUpgradeHandler(handler: SubAppUpgradeHandler) {
this.subAppUpgradeHandler = handler;
}
@ -186,6 +214,16 @@ export class PluginMultiAppManagerServer extends Plugin {
context: options.context,
});
this.sendSyncMessage(
{
type: 'startApp',
appName: name,
},
{
transaction,
},
);
const startPromise = subApp.runCommand('start', '--quickstart');
if (options?.context?.waitSubAppInstall) {
@ -194,8 +232,18 @@ export class PluginMultiAppManagerServer extends Plugin {
},
);
this.db.on('applications.afterDestroy', async (model: ApplicationModel) => {
this.db.on('applications.afterDestroy', async (model: ApplicationModel, options) => {
await AppSupervisor.getInstance().removeApp(model.get('name') as string);
this.sendSyncMessage(
{
type: 'removeApp',
appName: model.get('name'),
},
{
transaction: options.transaction,
},
);
});
const self = this;

View File

@ -10,42 +10,38 @@
import path from 'path';
import { ApplicationOptions, Plugin } from '@nocobase/server';
import { MockServer, createMockServer, mockDatabase } from '@nocobase/test';
import { MockClusterOptions, MockServer, createMockCluster, createMockServer, mockDatabase } from '@nocobase/test';
import functions from './functions';
import triggers from './triggers';
import instructions from './instructions';
import { SequelizeDataSource } from '@nocobase/data-source-manager';
import { uid } from '@nocobase/utils';
export { sleep } from '@nocobase/test';
export interface MockServerOptions extends ApplicationOptions {
interface WorkflowMockServerOptions extends ApplicationOptions {
collectionsPath?: string;
}
// async function createMockServer(options: MockServerOptions) {
// const app = mockServer(options);
// await app.cleanDb();
// await app.runCommand('start', '--quickstart');
// return app;
// }
export function sleep(ms: number) {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
interface WorkflowMockClusterOptions extends MockClusterOptions {
collectionsPath?: string;
}
export async function getApp(options: MockServerOptions = {}): Promise<MockServer> {
const { plugins = [], collectionsPath, ...others } = options;
class TestCollectionPlugin extends Plugin {
class TestCollectionPlugin extends Plugin {
async load() {
if (collectionsPath) {
await this.db.import({ directory: collectionsPath });
}
if (this.options.collectionsPath) {
await this.db.import({ directory: this.options.collectionsPath });
}
}
}
export async function getApp({
plugins = [],
collectionsPath,
...options
}: WorkflowMockServerOptions = {}): Promise<MockServer> {
const app = await createMockServer({
...others,
...options,
plugins: [
[
'workflow',
@ -56,7 +52,7 @@ export async function getApp(options: MockServerOptions = {}): Promise<MockServe
},
],
'workflow-test',
TestCollectionPlugin,
[TestCollectionPlugin, { collectionsPath }],
...plugins,
],
});
@ -86,6 +82,25 @@ export async function getApp(options: MockServerOptions = {}): Promise<MockServe
return app;
}
export async function getCluster({ plugins = [], collectionsPath, ...options }: WorkflowMockClusterOptions) {
return createMockCluster({
...options,
plugins: [
[
'workflow',
{
triggers,
instructions,
functions,
},
],
'workflow-test',
[TestCollectionPlugin, { collectionsPath }],
...plugins,
],
});
}
export default class WorkflowTestPlugin extends Plugin {
async load() {
await this.importCollections(path.resolve(__dirname, 'collections'));

View File

@ -12,7 +12,7 @@ import { randomUUID } from 'crypto';
import LRUCache from 'lru-cache';
import { Op, Transactionable } from '@nocobase/database';
import { Op, Transaction, Transactionable } from '@nocobase/database';
import { Plugin } from '@nocobase/server';
import { Registry } from '@nocobase/utils';
@ -64,7 +64,7 @@ export default class PluginWorkflowServer extends Plugin {
private meter = null;
private checker: NodeJS.Timeout = null;
private onBeforeSave = async (instance: WorkflowModel, options) => {
private onBeforeSave = async (instance: WorkflowModel, { transaction }) => {
const Model = <typeof WorkflowModel>instance.constructor;
if (instance.enabled) {
@ -74,7 +74,7 @@ export default class PluginWorkflowServer extends Plugin {
where: {
key: instance.key,
},
transaction: options.transaction,
transaction,
});
if (!count) {
instance.set('current', true);
@ -93,7 +93,7 @@ export default class PluginWorkflowServer extends Plugin {
[Op.ne]: instance.id,
},
},
transaction: options.transaction,
transaction,
});
if (previous) {
@ -101,35 +101,33 @@ export default class PluginWorkflowServer extends Plugin {
await previous.update(
{ enabled: false, current: null },
{
transaction: options.transaction,
transaction,
hooks: false,
},
);
this.toggle(previous, false);
this.toggle(previous, false, { transaction });
}
};
async onSync(message) {
async handleSyncMessage(message) {
if (message.type === 'statusChange') {
const workflowId = Number.parseInt(message.workflowId, 10);
const enabled = Number.parseInt(message.enabled, 10);
if (enabled) {
let workflow = this.enabledCache.get(workflowId);
if (message.enabled) {
let workflow = this.enabledCache.get(message.workflowId);
if (workflow) {
await workflow.reload();
} else {
workflow = await this.db.getRepository('workflows').findOne({
filterByTk: workflowId,
filterByTk: message.workflowId,
});
}
if (workflow) {
this.toggle(workflow, true, true);
this.toggle(workflow, true, { silent: true });
}
} else {
const workflow = this.enabledCache.get(workflowId);
const workflow = this.enabledCache.get(message.workflowId);
if (workflow) {
this.toggle(workflow, false, true);
this.toggle(workflow, false, { silent: true });
}
}
}
@ -270,13 +268,17 @@ export default class PluginWorkflowServer extends Plugin {
});
db.on('workflows.beforeSave', this.onBeforeSave);
db.on('workflows.afterCreate', (model: WorkflowModel) => {
db.on('workflows.afterCreate', (model: WorkflowModel, { transaction }) => {
if (model.enabled) {
this.toggle(model);
this.toggle(model, true, { transaction });
}
});
db.on('workflows.afterUpdate', (model: WorkflowModel) => this.toggle(model));
db.on('workflows.beforeDestroy', (model: WorkflowModel) => this.toggle(model, false));
db.on('workflows.afterUpdate', (model: WorkflowModel, { transaction }) =>
this.toggle(model, model.enabled, { transaction }),
);
db.on('workflows.afterDestroy', (model: WorkflowModel, { transaction }) =>
this.toggle(model, false, { transaction }),
);
// [Life Cycle]:
// * load all workflows in db
@ -292,7 +294,7 @@ export default class PluginWorkflowServer extends Plugin {
});
workflows.forEach((workflow: WorkflowModel) => {
this.toggle(workflow);
this.toggle(workflow, true, { silent: true });
});
this.checker = setInterval(() => {
@ -305,7 +307,7 @@ export default class PluginWorkflowServer extends Plugin {
this.app.on('beforeStop', async () => {
for (const workflow of this.enabledCache.values()) {
this.toggle(workflow, false);
this.toggle(workflow, false, { silent: true });
}
this.ready = false;
@ -322,7 +324,11 @@ export default class PluginWorkflowServer extends Plugin {
});
}
private toggle(workflow: WorkflowModel, enable?: boolean, silent = false) {
private toggle(
workflow: WorkflowModel,
enable?: boolean,
{ silent, transaction }: { silent?: boolean } & Transactionable = {},
) {
const type = workflow.get('type');
const trigger = this.triggers.get(type);
if (!trigger) {
@ -343,11 +349,14 @@ export default class PluginWorkflowServer extends Plugin {
this.enabledCache.delete(workflow.id);
}
if (!silent) {
this.sync({
this.sendSyncMessage(
{
type: 'statusChange',
workflowId: `${workflow.id}`,
enabled: `${Number(next)}`,
});
workflowId: workflow.id,
enabled: next,
},
{ transaction },
);
}
}
@ -357,6 +366,10 @@ export default class PluginWorkflowServer extends Plugin {
options: EventOptions = {},
): void | Promise<Processor | null> {
const logger = this.getLogger(workflow.id);
if (!workflow.enabled) {
logger.warn(`workflow ${workflow.id} is not enabled, event will be ignored`);
return;
}
if (!this.ready) {
logger.warn(`app is not ready, event of workflow ${workflow.id} will be ignored`);
logger.debug(`ignored event data:`, context);

View File

@ -510,7 +510,7 @@ describe('workflow > Plugin', () => {
});
});
describe('sync', () => {
describe('sync trigger', () => {
it('sync on trigger class', async () => {
const w1 = await WorkflowModel.create({
enabled: true,

View File

@ -0,0 +1,72 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { sleep } from '@nocobase/test';
import { getCluster } from '@nocobase/plugin-workflow-test';
import Plugin, { Processor } from '..';
import { EXECUTION_STATUS } from '../constants';
describe('workflow > cluster', () => {
let cluster;
beforeEach(async () => {
cluster = await getCluster({
number: 3,
});
});
afterEach(() => cluster.destroy());
describe('sync message', () => {
it('enabled status of workflow should be sync in every nodes', async () => {
const [app1, app2, app3] = cluster.nodes;
const WorkflowRepo = app1.db.getRepository('workflows');
const w1 = await WorkflowRepo.create({
values: {
type: 'syncTrigger',
enabled: true,
},
});
const p1 = app1.pm.get(Plugin) as Plugin;
const pro1 = (await p1.trigger(w1, {})) as Processor;
expect(pro1.execution.status).toBe(EXECUTION_STATUS.RESOLVED);
await sleep(550);
const p2 = app2.pm.get(Plugin) as Plugin;
const w2 = p2.enabledCache.get(w1.id);
expect(w2).toBeDefined();
const pro2 = (await p2.trigger(w2, {})) as Processor;
expect(pro2.execution.status).toBe(EXECUTION_STATUS.RESOLVED);
const p3 = app3.pm.get(Plugin) as Plugin;
const w3 = p3.enabledCache.get(w1.id);
expect(w3).toBeDefined();
const pro3 = (await p3.trigger(w3, {})) as Processor;
expect(pro3.execution.status).toBe(EXECUTION_STATUS.RESOLVED);
const executions = await w1.getExecutions();
expect(executions.length).toBe(3);
await w1.update({
enabled: false,
});
await sleep(550);
expect(p2.enabledCache.get(w1.id)).toBeUndefined();
expect(p3.enabledCache.get(w1.id)).toBeUndefined();
});
});
});