From 27b45e8dae40d856d4804a86c70ff6d980d71d98 Mon Sep 17 00:00:00 2001 From: ChengLei Shao Date: Thu, 16 Jan 2025 19:45:43 +0800 Subject: [PATCH] chore: ws api (#6080) * chore: ws api * fix: ws error * fix: send ws message response --- packages/core/server/src/gateway/index.ts | 56 ---------- packages/core/server/src/gateway/ws-server.ts | 103 +++++++++++++----- .../src/server/plugin.ts | 7 +- .../plugin-auth/src/server/plugin.ts | 1 + 4 files changed, 81 insertions(+), 86 deletions(-) diff --git a/packages/core/server/src/gateway/index.ts b/packages/core/server/src/gateway/index.ts index 66524a2242..8d47ae37c5 100644 --- a/packages/core/server/src/gateway/index.ts +++ b/packages/core/server/src/gateway/index.ts @@ -433,62 +433,6 @@ export class Gateway extends EventEmitter { this.wsServer = new WSServer(); - this.wsServer.on('message', async ({ client, message }) => { - const app = await AppSupervisor.getInstance().getApp(client.app); - - if (!app) { - return; - } - - const parsedMessage = JSON.parse(message.toString()); - - if (!parsedMessage.type) { - return; - } - - if (!app.listenerCount(`ws:setTag`)) { - app.on('ws:setTag', ({ clientId, tagKey, tagValue }) => { - this.wsServer.setClientTag(clientId, tagKey, tagValue); - }); - - app.on('ws:removeTag', ({ clientId, tagKey }) => { - this.wsServer.removeClientTag(clientId, tagKey); - }); - - app.on('ws:sendToTag', ({ tagKey, tagValue, message }) => { - this.wsServer.sendToConnectionsByTags( - [ - { tagName: tagKey, tagValue }, - { tagName: 'app', tagValue: app.name }, - ], - message, - ); - }); - - app.on('ws:sendToTags', ({ tags, message }) => { - this.wsServer.sendToConnectionsByTags(tags, message); - }); - - app.on('ws:authorized', ({ clientId, userId }) => { - this.wsServer.sendToConnectionsByTags( - [ - { tagName: 'userId', tagValue: userId }, - { tagName: 'app', tagValue: app.name }, - ], - { type: 'authorized' }, - ); - }); - } - - const eventName = `ws:message:${parsedMessage.type}`; - - app.emit(eventName, { - clientId: client.id, - tags: [...client.tags], - payload: parsedMessage.payload, - }); - }); - this.server.on('upgrade', (request, socket, head) => { const { pathname } = parse(request.url); diff --git a/packages/core/server/src/gateway/ws-server.ts b/packages/core/server/src/gateway/ws-server.ts index c435a25d16..bddef1bc21 100644 --- a/packages/core/server/src/gateway/ws-server.ts +++ b/packages/core/server/src/gateway/ws-server.ts @@ -16,6 +16,7 @@ import { applyErrorWithArgs, getErrorWithCode } from './errors'; import lodash from 'lodash'; import { Logger } from '@nocobase/logger'; import EventEmitter from 'events'; +import { parse } from 'url'; declare class WebSocketWithId extends WebSocket { id: string; @@ -135,6 +136,72 @@ export class WSServer extends EventEmitter { }, }); }); + + AppSupervisor.getInstance().on('afterAppAdded', (app) => { + this.bindAppWSEvents(app); + }); + + this.on('message', async ({ client, message }) => { + const app = await AppSupervisor.getInstance().getApp(client.app); + + if (!app) { + return; + } + + const parsedMessage = JSON.parse(message.toString()); + + if (!parsedMessage.type) { + return; + } + + const eventName = `ws:message:${parsedMessage.type}`; + + app.emit(eventName, { + clientId: client.id, + tags: [...client.tags], + payload: parsedMessage.payload, + }); + }); + } + + bindAppWSEvents(app) { + if (app.listenerCount('ws:setTag') > 0) { + return; + } + + app.on('ws:setTag', ({ clientId, tagKey, tagValue }) => { + this.setClientTag(clientId, tagKey, tagValue); + }); + + app.on('ws:removeTag', ({ clientId, tagKey }) => { + this.removeClientTag(clientId, tagKey); + }); + + app.on('ws:sendToTag', ({ tagKey, tagValue, message }) => { + this.sendToConnectionsByTags( + [ + { tagName: tagKey, tagValue }, + { tagName: 'app', tagValue: app.name }, + ], + message, + ); + }); + + app.on('ws:sendToClient', ({ clientId, message }) => { + this.sendToClient(clientId, message); + }); + + app.on('ws:sendToCurrentApp', ({ message }) => { + this.sendToConnectionsByTag('app', app.name, message); + }); + + app.on('ws:sendToTags', ({ tags, message }) => { + this.sendToConnectionsByTags(tags, message); + }); + + app.on('ws:authorized', ({ clientId, userId }) => { + this.sendToClient(clientId, { type: 'authorized' }); + }); } addNewConnection(ws: WebSocketWithId, request: IncomingMessage) { @@ -155,6 +222,9 @@ export class WSServer extends EventEmitter { setClientTag(clientId: string, tagKey: string, tagValue: string) { const client = this.webSocketClients.get(clientId); + if (!client) { + return; + } client.tags.add(`${tagKey}#${tagValue}`); console.log(`client tags: ${Array.from(client.tags)}`); } @@ -186,32 +256,6 @@ export class WSServer extends EventEmitter { if (!hasApp) { AppSupervisor.getInstance().bootStrapApp(handleAppName); } - - // const appStatus = AppSupervisor.getInstance().getAppStatus(handleAppName, 'initializing'); - - // if (appStatus === 'not_found') { - // this.sendMessageToConnection(client, { - // type: 'maintaining', - // payload: getPayloadByErrorCode('APP_NOT_FOUND', { appName: handleAppName }), - // }); - // return; - // } - - // if (appStatus === 'initializing') { - // this.sendMessageToConnection(client, { - // type: 'maintaining', - // payload: getPayloadByErrorCode('APP_INITIALIZING', { appName: handleAppName }), - // }); - - // return; - // } - - // const app = await AppSupervisor.getInstance().getApp(handleAppName); - // - // this.sendMessageToConnection(client, { - // type: 'maintaining', - // payload: getPayloadByErrorCode(appStatus, { app }), - // }); } removeConnection(id: string) { @@ -242,6 +286,13 @@ export class WSServer extends EventEmitter { }); } + sendToClient(clientId: string, sendMessage: object) { + const client = this.webSocketClients.get(clientId); + if (client) { + this.sendMessageToConnection(client, sendMessage); + } + } + loopThroughConnections(callback: (client: WebSocketClient) => void) { this.webSocketClients.forEach((client) => { callback(client); diff --git a/packages/plugins/@nocobase/plugin-async-task-manager/src/server/plugin.ts b/packages/plugins/@nocobase/plugin-async-task-manager/src/server/plugin.ts index 812ac6c9bd..494d826848 100644 --- a/packages/plugins/@nocobase/plugin-async-task-manager/src/server/plugin.ts +++ b/packages/plugins/@nocobase/plugin-async-task-manager/src/server/plugin.ts @@ -54,7 +54,7 @@ export class PluginAsyncExportServer extends Plugin { const asyncTaskManager = this.app.container.get('AsyncTaskManager'); this.app.on(`ws:message:request:async-tasks:list`, async (message) => { - const { tags } = message; + const { tags, clientId } = message; this.app.logger.info(`Received request for async tasks with tags: ${JSON.stringify(tags)}`); @@ -68,9 +68,8 @@ export class PluginAsyncExportServer extends Plugin { this.app.logger.info(`Found ${tasks.length} tasks for userId: ${userId}`); - this.app.emit('ws:sendToTag', { - tagKey: 'userId', - tagValue: userId, + this.app.emit('ws:sendToClient', { + clientId, message: { type: 'async-tasks', payload: tasks.map((task) => task.toJSON()), diff --git a/packages/plugins/@nocobase/plugin-auth/src/server/plugin.ts b/packages/plugins/@nocobase/plugin-auth/src/server/plugin.ts index 458f1ae730..d1ee058b25 100644 --- a/packages/plugins/@nocobase/plugin-auth/src/server/plugin.ts +++ b/packages/plugins/@nocobase/plugin-auth/src/server/plugin.ts @@ -151,6 +151,7 @@ export class PluginAuthServer extends Plugin { userId: user.id, }); }); + this.app.auditManager.registerActions([ { name: 'auth:signIn',