chore: ws api (#6080)

* chore: ws api

* fix: ws error

* fix: send ws message response
This commit is contained in:
ChengLei Shao 2025-01-16 19:45:43 +08:00 committed by GitHub
parent 87513c0c9c
commit 27b45e8dae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 81 additions and 86 deletions

View File

@ -433,62 +433,6 @@ export class Gateway extends EventEmitter {
this.wsServer = new WSServer(); this.wsServer = new WSServer();
this.wsServer.on('message', async ({ client, message }) => {
const app = await AppSupervisor.getInstance().getApp(client.app);
if (!app) {
return;
}
const parsedMessage = JSON.parse(message.toString());
if (!parsedMessage.type) {
return;
}
if (!app.listenerCount(`ws:setTag`)) {
app.on('ws:setTag', ({ clientId, tagKey, tagValue }) => {
this.wsServer.setClientTag(clientId, tagKey, tagValue);
});
app.on('ws:removeTag', ({ clientId, tagKey }) => {
this.wsServer.removeClientTag(clientId, tagKey);
});
app.on('ws:sendToTag', ({ tagKey, tagValue, message }) => {
this.wsServer.sendToConnectionsByTags(
[
{ tagName: tagKey, tagValue },
{ tagName: 'app', tagValue: app.name },
],
message,
);
});
app.on('ws:sendToTags', ({ tags, message }) => {
this.wsServer.sendToConnectionsByTags(tags, message);
});
app.on('ws:authorized', ({ clientId, userId }) => {
this.wsServer.sendToConnectionsByTags(
[
{ tagName: 'userId', tagValue: userId },
{ tagName: 'app', tagValue: app.name },
],
{ type: 'authorized' },
);
});
}
const eventName = `ws:message:${parsedMessage.type}`;
app.emit(eventName, {
clientId: client.id,
tags: [...client.tags],
payload: parsedMessage.payload,
});
});
this.server.on('upgrade', (request, socket, head) => { this.server.on('upgrade', (request, socket, head) => {
const { pathname } = parse(request.url); const { pathname } = parse(request.url);

View File

@ -16,6 +16,7 @@ import { applyErrorWithArgs, getErrorWithCode } from './errors';
import lodash from 'lodash'; import lodash from 'lodash';
import { Logger } from '@nocobase/logger'; import { Logger } from '@nocobase/logger';
import EventEmitter from 'events'; import EventEmitter from 'events';
import { parse } from 'url';
declare class WebSocketWithId extends WebSocket { declare class WebSocketWithId extends WebSocket {
id: string; id: string;
@ -135,6 +136,72 @@ export class WSServer extends EventEmitter {
}, },
}); });
}); });
AppSupervisor.getInstance().on('afterAppAdded', (app) => {
this.bindAppWSEvents(app);
});
this.on('message', async ({ client, message }) => {
const app = await AppSupervisor.getInstance().getApp(client.app);
if (!app) {
return;
}
const parsedMessage = JSON.parse(message.toString());
if (!parsedMessage.type) {
return;
}
const eventName = `ws:message:${parsedMessage.type}`;
app.emit(eventName, {
clientId: client.id,
tags: [...client.tags],
payload: parsedMessage.payload,
});
});
}
bindAppWSEvents(app) {
if (app.listenerCount('ws:setTag') > 0) {
return;
}
app.on('ws:setTag', ({ clientId, tagKey, tagValue }) => {
this.setClientTag(clientId, tagKey, tagValue);
});
app.on('ws:removeTag', ({ clientId, tagKey }) => {
this.removeClientTag(clientId, tagKey);
});
app.on('ws:sendToTag', ({ tagKey, tagValue, message }) => {
this.sendToConnectionsByTags(
[
{ tagName: tagKey, tagValue },
{ tagName: 'app', tagValue: app.name },
],
message,
);
});
app.on('ws:sendToClient', ({ clientId, message }) => {
this.sendToClient(clientId, message);
});
app.on('ws:sendToCurrentApp', ({ message }) => {
this.sendToConnectionsByTag('app', app.name, message);
});
app.on('ws:sendToTags', ({ tags, message }) => {
this.sendToConnectionsByTags(tags, message);
});
app.on('ws:authorized', ({ clientId, userId }) => {
this.sendToClient(clientId, { type: 'authorized' });
});
} }
addNewConnection(ws: WebSocketWithId, request: IncomingMessage) { addNewConnection(ws: WebSocketWithId, request: IncomingMessage) {
@ -155,6 +222,9 @@ export class WSServer extends EventEmitter {
setClientTag(clientId: string, tagKey: string, tagValue: string) { setClientTag(clientId: string, tagKey: string, tagValue: string) {
const client = this.webSocketClients.get(clientId); const client = this.webSocketClients.get(clientId);
if (!client) {
return;
}
client.tags.add(`${tagKey}#${tagValue}`); client.tags.add(`${tagKey}#${tagValue}`);
console.log(`client tags: ${Array.from(client.tags)}`); console.log(`client tags: ${Array.from(client.tags)}`);
} }
@ -186,32 +256,6 @@ export class WSServer extends EventEmitter {
if (!hasApp) { if (!hasApp) {
AppSupervisor.getInstance().bootStrapApp(handleAppName); AppSupervisor.getInstance().bootStrapApp(handleAppName);
} }
// const appStatus = AppSupervisor.getInstance().getAppStatus(handleAppName, 'initializing');
// if (appStatus === 'not_found') {
// this.sendMessageToConnection(client, {
// type: 'maintaining',
// payload: getPayloadByErrorCode('APP_NOT_FOUND', { appName: handleAppName }),
// });
// return;
// }
// if (appStatus === 'initializing') {
// this.sendMessageToConnection(client, {
// type: 'maintaining',
// payload: getPayloadByErrorCode('APP_INITIALIZING', { appName: handleAppName }),
// });
// return;
// }
// const app = await AppSupervisor.getInstance().getApp(handleAppName);
//
// this.sendMessageToConnection(client, {
// type: 'maintaining',
// payload: getPayloadByErrorCode(appStatus, { app }),
// });
} }
removeConnection(id: string) { removeConnection(id: string) {
@ -242,6 +286,13 @@ export class WSServer extends EventEmitter {
}); });
} }
sendToClient(clientId: string, sendMessage: object) {
const client = this.webSocketClients.get(clientId);
if (client) {
this.sendMessageToConnection(client, sendMessage);
}
}
loopThroughConnections(callback: (client: WebSocketClient) => void) { loopThroughConnections(callback: (client: WebSocketClient) => void) {
this.webSocketClients.forEach((client) => { this.webSocketClients.forEach((client) => {
callback(client); callback(client);

View File

@ -54,7 +54,7 @@ export class PluginAsyncExportServer extends Plugin {
const asyncTaskManager = this.app.container.get<AsyncTasksManager>('AsyncTaskManager'); const asyncTaskManager = this.app.container.get<AsyncTasksManager>('AsyncTaskManager');
this.app.on(`ws:message:request:async-tasks:list`, async (message) => { this.app.on(`ws:message:request:async-tasks:list`, async (message) => {
const { tags } = message; const { tags, clientId } = message;
this.app.logger.info(`Received request for async tasks with tags: ${JSON.stringify(tags)}`); this.app.logger.info(`Received request for async tasks with tags: ${JSON.stringify(tags)}`);
@ -68,9 +68,8 @@ export class PluginAsyncExportServer extends Plugin {
this.app.logger.info(`Found ${tasks.length} tasks for userId: ${userId}`); this.app.logger.info(`Found ${tasks.length} tasks for userId: ${userId}`);
this.app.emit('ws:sendToTag', { this.app.emit('ws:sendToClient', {
tagKey: 'userId', clientId,
tagValue: userId,
message: { message: {
type: 'async-tasks', type: 'async-tasks',
payload: tasks.map((task) => task.toJSON()), payload: tasks.map((task) => task.toJSON()),

View File

@ -151,6 +151,7 @@ export class PluginAuthServer extends Plugin {
userId: user.id, userId: user.id,
}); });
}); });
this.app.auditManager.registerActions([ this.app.auditManager.registerActions([
{ {
name: 'auth:signIn', name: 'auth:signIn',