feat: async task manager (#5979)

This commit is contained in:
ChengLei Shao 2025-01-01 13:04:03 +08:00 committed by GitHub
parent ed6d6f9f9a
commit 23ac4eb229
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 2108 additions and 0 deletions

View File

@ -0,0 +1,2 @@
/node_modules
/src

View File

@ -0,0 +1 @@
# @nocobase/plugin-async-export

View File

@ -0,0 +1,2 @@
export * from './dist/client';
export { default } from './dist/client';

View File

@ -0,0 +1 @@
module.exports = require('./dist/client/index.js');

View File

@ -0,0 +1,15 @@
{
"name": "@nocobase/plugin-async-task-manager",
"displayName": "Async task manager",
"displayName.zh-CN": "异步任务管理器",
"description": "Manage and monitor asynchronous tasks such as data import/export. Support task progress tracking and notification.",
"description.zh-CN": "管理和监控数据导入导出等异步任务。支持任务进度跟踪和通知。",
"version": "1.5.0-beta.19",
"main": "dist/server/index.js",
"dependencies": {},
"peerDependencies": {
"@nocobase/client": "1.x",
"@nocobase/server": "1.x",
"@nocobase/test": "1.x"
}
}

View File

@ -0,0 +1,2 @@
export * from './dist/server';
export { default } from './dist/server';

View File

@ -0,0 +1 @@
module.exports = require('./dist/server/index.js');

View File

@ -0,0 +1,176 @@
import { PinnedPluginListProvider, SchemaComponentOptions, useApp } from '@nocobase/client';
import { AsyncTasks } from './components/AsyncTasks';
import React, { useEffect, useState, createContext, useContext, useCallback } from 'react';
import { message } from 'antd';
import { useT } from './locale';
export const AsyncTaskContext = createContext<any>(null);
export const useAsyncTask = () => {
const context = useContext(AsyncTaskContext);
if (!context) {
throw new Error('useAsyncTask must be used within AsyncTaskManagerProvider');
}
return context;
};
export const AsyncTaskManagerProvider = (props) => {
const app = useApp();
const t = useT();
const [tasks, setTasks] = useState<any[]>([]);
const [popoverVisible, setPopoverVisible] = useState(false);
const [hasProcessingTasks, setHasProcessingTasks] = useState(false);
const [cancellingTasks, setCancellingTasks] = useState<Set<string>>(new Set());
const [modalVisible, setModalVisible] = useState(false);
const [currentError, setCurrentError] = useState<any>(null);
const [resultModalVisible, setResultModalVisible] = useState(false);
const [currentTask, setCurrentTask] = useState(null);
const [wsAuthorized, setWsAuthorized] = useState(() => app.isWsAuthorized);
useEffect(() => {
setHasProcessingTasks(tasks.some((task) => task.status.type !== 'success' && task.status.type !== 'failed'));
}, [tasks]);
const handleTaskMessage = useCallback((event: CustomEvent) => {
const tasks = event.detail;
setTasks(tasks ? tasks.sort((a, b) => new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime()) : []);
}, []);
const handleTaskCreated = useCallback((event: CustomEvent) => {
const taskData = event.detail;
setTasks((prev) => {
const newTasks = [taskData, ...prev];
return newTasks.sort((a, b) => new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime());
});
setPopoverVisible(true);
}, []);
const handleTaskProgress = useCallback((event: CustomEvent) => {
const { taskId, progress } = event.detail;
setTasks((prev) => prev.map((task) => (task.taskId === taskId ? { ...task, progress } : task)));
}, []);
const handleTaskStatus = useCallback((event: CustomEvent) => {
const { taskId, status } = event.detail;
if (status.type === 'cancelled') {
setTasks((prev) => prev.filter((task) => task.taskId !== taskId));
} else {
setTasks((prev) => {
const newTasks = prev.map((task) => {
if (task.taskId === taskId) {
if (status.type === 'success' && task.status.type !== 'success') {
message.success(t('Task completed'));
}
if (status.type === 'failed' && task.status.type !== 'failed') {
message.error(t('Task failed'));
}
return { ...task, status };
}
return task;
});
return newTasks;
});
}
}, []);
const handleWsAuthorized = useCallback(() => {
setWsAuthorized(true);
}, []);
const handleTaskCancelled = useCallback((event: CustomEvent) => {
const { taskId } = event.detail;
setCancellingTasks((prev) => {
const newSet = new Set(prev);
newSet.delete(taskId);
return newSet;
});
message.success(t('Task cancelled'));
}, []);
useEffect(() => {
app.eventBus.addEventListener('ws:message:async-tasks', handleTaskMessage);
app.eventBus.addEventListener('ws:message:async-tasks:created', handleTaskCreated);
app.eventBus.addEventListener('ws:message:async-tasks:progress', handleTaskProgress);
app.eventBus.addEventListener('ws:message:async-tasks:status', handleTaskStatus);
app.eventBus.addEventListener('ws:message:authorized', handleWsAuthorized);
app.eventBus.addEventListener('ws:message:async-tasks:cancelled', handleTaskCancelled);
if (wsAuthorized) {
app.ws.send(
JSON.stringify({
type: 'request:async-tasks:list',
}),
);
}
return () => {
app.eventBus.removeEventListener('ws:message:async-tasks', handleTaskMessage);
app.eventBus.removeEventListener('ws:message:async-tasks:created', handleTaskCreated);
app.eventBus.removeEventListener('ws:message:async-tasks:progress', handleTaskProgress);
app.eventBus.removeEventListener('ws:message:async-tasks:status', handleTaskStatus);
app.eventBus.removeEventListener('ws:message:authorized', handleWsAuthorized);
app.eventBus.removeEventListener('ws:message:async-tasks:cancelled', handleTaskCancelled);
};
}, [
app,
handleTaskMessage,
handleTaskCreated,
handleTaskProgress,
handleTaskStatus,
handleWsAuthorized,
handleTaskCancelled,
wsAuthorized,
]);
const handleCancelTask = async (taskId: string) => {
setCancellingTasks((prev) => new Set(prev).add(taskId));
try {
app.ws.send(
JSON.stringify({
type: 'request:async-tasks:cancel',
payload: { taskId },
}),
);
} catch (error) {
console.error('Failed to cancel task:', error);
setCancellingTasks((prev) => {
const newSet = new Set(prev);
newSet.delete(taskId);
return newSet;
});
}
};
const contextValue = {
tasks,
popoverVisible,
setPopoverVisible,
hasProcessingTasks,
cancellingTasks,
modalVisible,
setModalVisible,
currentError,
setCurrentError,
resultModalVisible,
setResultModalVisible,
currentTask,
setCurrentTask,
handleCancelTask,
};
return (
<AsyncTaskContext.Provider value={contextValue}>
<PinnedPluginListProvider
items={
tasks.length > 0
? {
asyncTasks: { order: 300, component: 'AsyncTasks', pin: true, snippet: '*' },
}
: {}
}
>
<SchemaComponentOptions components={{ AsyncTasks }}>{props.children}</SchemaComponentOptions>
</PinnedPluginListProvider>
</AsyncTaskContext.Provider>
);
};

View File

@ -0,0 +1,13 @@
import React from 'react';
export class TaskResultRendererManager {
private renderers = new Map<string, React.ComponentType<any>>();
register(type: string, renderer: React.ComponentType<any>) {
this.renderers.set(type, renderer);
}
get(type: string) {
return this.renderers.get(type);
}
}

View File

@ -0,0 +1,249 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
// CSS modules
type CSSModuleClasses = { readonly [key: string]: string };
declare module '*.module.css' {
const classes: CSSModuleClasses;
export default classes;
}
declare module '*.module.scss' {
const classes: CSSModuleClasses;
export default classes;
}
declare module '*.module.sass' {
const classes: CSSModuleClasses;
export default classes;
}
declare module '*.module.less' {
const classes: CSSModuleClasses;
export default classes;
}
declare module '*.module.styl' {
const classes: CSSModuleClasses;
export default classes;
}
declare module '*.module.stylus' {
const classes: CSSModuleClasses;
export default classes;
}
declare module '*.module.pcss' {
const classes: CSSModuleClasses;
export default classes;
}
declare module '*.module.sss' {
const classes: CSSModuleClasses;
export default classes;
}
// CSS
declare module '*.css' { }
declare module '*.scss' { }
declare module '*.sass' { }
declare module '*.less' { }
declare module '*.styl' { }
declare module '*.stylus' { }
declare module '*.pcss' { }
declare module '*.sss' { }
// Built-in asset types
// see `src/node/constants.ts`
// images
declare module '*.apng' {
const src: string;
export default src;
}
declare module '*.png' {
const src: string;
export default src;
}
declare module '*.jpg' {
const src: string;
export default src;
}
declare module '*.jpeg' {
const src: string;
export default src;
}
declare module '*.jfif' {
const src: string;
export default src;
}
declare module '*.pjpeg' {
const src: string;
export default src;
}
declare module '*.pjp' {
const src: string;
export default src;
}
declare module '*.gif' {
const src: string;
export default src;
}
declare module '*.svg' {
const src: string;
export default src;
}
declare module '*.ico' {
const src: string;
export default src;
}
declare module '*.webp' {
const src: string;
export default src;
}
declare module '*.avif' {
const src: string;
export default src;
}
// media
declare module '*.mp4' {
const src: string;
export default src;
}
declare module '*.webm' {
const src: string;
export default src;
}
declare module '*.ogg' {
const src: string;
export default src;
}
declare module '*.mp3' {
const src: string;
export default src;
}
declare module '*.wav' {
const src: string;
export default src;
}
declare module '*.flac' {
const src: string;
export default src;
}
declare module '*.aac' {
const src: string;
export default src;
}
declare module '*.opus' {
const src: string;
export default src;
}
declare module '*.mov' {
const src: string;
export default src;
}
declare module '*.m4a' {
const src: string;
export default src;
}
declare module '*.vtt' {
const src: string;
export default src;
}
// fonts
declare module '*.woff' {
const src: string;
export default src;
}
declare module '*.woff2' {
const src: string;
export default src;
}
declare module '*.eot' {
const src: string;
export default src;
}
declare module '*.ttf' {
const src: string;
export default src;
}
declare module '*.otf' {
const src: string;
export default src;
}
// other
declare module '*.webmanifest' {
const src: string;
export default src;
}
declare module '*.pdf' {
const src: string;
export default src;
}
declare module '*.txt' {
const src: string;
export default src;
}
// wasm?init
declare module '*.wasm?init' {
const initWasm: (options?: WebAssembly.Imports) => Promise<WebAssembly.Instance>;
export default initWasm;
}
// web worker
declare module '*?worker' {
const workerConstructor: {
new(options?: { name?: string }): Worker;
};
export default workerConstructor;
}
declare module '*?worker&inline' {
const workerConstructor: {
new(options?: { name?: string }): Worker;
};
export default workerConstructor;
}
declare module '*?worker&url' {
const src: string;
export default src;
}
declare module '*?sharedworker' {
const sharedWorkerConstructor: {
new(options?: { name?: string }): SharedWorker;
};
export default sharedWorkerConstructor;
}
declare module '*?sharedworker&inline' {
const sharedWorkerConstructor: {
new(options?: { name?: string }): SharedWorker;
};
export default sharedWorkerConstructor;
}
declare module '*?sharedworker&url' {
const src: string;
export default src;
}
declare module '*?raw' {
const src: string;
export default src;
}
declare module '*?url' {
const src: string;
export default src;
}
declare module '*?inline' {
const src: string;
export default src;
}

View File

@ -0,0 +1,400 @@
import React, { useEffect } from 'react';
import { Button, Popover, Table, Tag, Progress, Space, Tooltip, Popconfirm, Modal, Empty } from 'antd';
import { Icon, useApp, usePlugin } from '@nocobase/client';
import { ExclamationCircleOutlined } from '@ant-design/icons';
import dayjs from 'dayjs';
import 'dayjs/locale/zh-cn';
import relativeTime from 'dayjs/plugin/relativeTime';
import { useT } from '../locale';
import { useAsyncTask } from '../AsyncTaskManagerProvider';
// Configure dayjs
dayjs.extend(relativeTime);
const renderTaskResult = (status, t) => {
if (status.type !== 'success' || !status.payload?.message?.messageId) {
return null;
}
const { messageId, messageValues } = status.payload.message;
return (
<div style={{ marginLeft: 8 }}>
<Tag color="success">{t(messageId, messageValues)}</Tag>
</div>
);
};
export const AsyncTasks = () => {
const {
tasks,
popoverVisible,
setPopoverVisible,
hasProcessingTasks,
cancellingTasks,
modalVisible,
setModalVisible,
currentError,
setCurrentError,
resultModalVisible,
setResultModalVisible,
currentTask,
setCurrentTask,
handleCancelTask,
} = useAsyncTask();
const plugin = usePlugin<any>('async-task-manager');
const app = useApp();
const t = useT();
useEffect(() => {
const handleClickOutside = (event: MouseEvent) => {
if (popoverVisible) {
const popoverElements = document.querySelectorAll('.ant-popover');
const buttonElement = document.querySelector('.sync-task-button');
let clickedInside = false;
popoverElements.forEach((element) => {
if (element.contains(event.target as Node)) {
clickedInside = true;
}
});
if (buttonElement?.contains(event.target as Node)) {
clickedInside = true;
}
if (!clickedInside) {
setPopoverVisible(false);
}
}
};
document.addEventListener('click', handleClickOutside);
return () => {
document.removeEventListener('click', handleClickOutside);
};
}, [popoverVisible, setPopoverVisible]);
const showTaskResult = (task) => {
setCurrentTask(task);
setResultModalVisible(true);
setPopoverVisible(false);
};
const renderTaskResultModal = () => {
if (!currentTask) {
return;
}
const { payload } = currentTask.status;
const renderer = plugin.taskResultRendererManager.get(currentTask.title.actionType);
return (
<Modal
title={t('Task result')}
open={resultModalVisible}
footer={[
<Button key="close" onClick={() => setResultModalVisible(false)}>
{t('Close')}
</Button>,
]}
onCancel={() => setResultModalVisible(false)}
>
{renderer ? (
React.createElement(renderer, { payload, task: currentTask })
) : (
<div>{t(`No renderer available for this task type, payload: ${payload}`)}</div>
)}
</Modal>
);
};
const columns = [
{
title: t('Created at'),
dataIndex: 'createdAt',
key: 'createdAt',
width: 180,
render: (createdAt: string) => (
<Tooltip title={dayjs(createdAt).format('YYYY-MM-DD HH:mm:ss')}>{dayjs(createdAt).fromNow()}</Tooltip>
),
},
{
title: t('Task'),
dataIndex: 'title',
key: 'title',
render: (_, record: any) => {
const title = record.title;
if (!title) {
return '-';
}
const actionTypeMap = {
export: t('Export'),
import: t('Import'),
'export-attachments': t('Export attachments'),
};
const actionText = actionTypeMap[title.actionType] || title.actionType;
const taskTypeMap = {
'export-attachments': t('Export {collection} attachments'),
export: t('Export {collection} data'),
import: t('Import {collection} data'),
};
const taskTemplate = taskTypeMap[title.actionType] || `${actionText} ${title.collection} ${t('Data')}`;
return taskTemplate.replace('{collection}', title.collection);
},
},
{
title: t('Status'),
dataIndex: 'status',
key: 'status',
width: 160,
render: (status: any, record: any) => {
const statusMap = {
pending: {
color: 'default',
text: t('Waiting'),
icon: 'ClockCircleOutlined',
},
running: {
color: 'processing',
text: t('Processing'),
icon: 'LoadingOutlined',
},
success: {
color: 'success',
text: t('Completed'),
icon: 'CheckCircleOutlined',
},
failed: {
color: 'error',
text: t('Failed'),
icon: 'CloseCircleOutlined',
},
cancelled: {
color: 'warning',
text: t('Cancelled'),
icon: 'StopOutlined',
},
};
const { color, text } = statusMap[status.type] || {};
const renderProgress = () => {
const commonStyle = {
width: 100,
margin: 0,
};
switch (status.indicator) {
case 'spinner':
return (
<Progress
type="line"
size="small"
strokeWidth={4}
percent={100}
status="active"
showInfo={false}
style={commonStyle}
/>
);
case 'progress':
return (
<Progress
type="line"
size="small"
strokeWidth={4}
percent={Number(((record.progress?.current / record.progress?.total) * 100).toFixed(2))}
status="active"
style={commonStyle}
format={(percent) => `${percent.toFixed(1)}%`}
/>
);
case 'success':
return (
<Progress
type="line"
size="small"
strokeWidth={4}
percent={100}
status="success"
style={commonStyle}
format={() => ''}
/>
);
case 'error':
return (
<Progress
type="line"
size="small"
strokeWidth={4}
percent={100}
status="exception"
style={commonStyle}
format={() => ''}
/>
);
default:
return null;
}
};
return (
<div style={{ display: 'flex', alignItems: 'center', gap: 8 }}>
<div style={{ flex: 1 }}>{renderProgress()}</div>
<Tag
color={color}
icon={statusMap[status.type]?.icon ? <Icon type={statusMap[status.type].icon} /> : null}
style={{ margin: 0, padding: '0 4px', height: 22, width: 22 }}
/>
{renderTaskResult(status, t)}
</div>
);
},
},
{
title: t('Actions'),
key: 'actions',
width: 180,
render: (_, record: any) => {
const actions = [];
const isTaskCancelling = cancellingTasks.has(record.taskId);
if (record.status.type === 'running' || record.status.type === 'pending') {
actions.push(
<Popconfirm
key="cancel"
title={t('Confirm cancel')}
description={t('Confirm cancel description')}
onConfirm={() => handleCancelTask(record.taskId)}
okText={t('Confirm')}
cancelText={t('Cancel')}
disabled={isTaskCancelling}
>
<Button
type="link"
size="small"
icon={<Icon type={isTaskCancelling ? 'LoadingOutlined' : 'StopOutlined'} />}
disabled={isTaskCancelling}
>
{isTaskCancelling ? t('Cancelling') : t('Cancel')}
</Button>
</Popconfirm>,
);
}
if (record.status.type === 'success') {
if (record.status.resultType === 'file') {
actions.push(
<Button
key="download"
type="link"
size="small"
icon={<Icon type="DownloadOutlined" />}
onClick={() => {
const token = app.apiClient.auth.token;
const url = app.getApiUrl(
`asyncTasks:fetchFile/${record.taskId}?token=${token}&__appName=${app.name}`,
);
window.open(url);
}}
>
{t('Download')}
</Button>,
);
} else if (record.status.payload) {
actions.push(
<Button
key="view"
type="link"
size="small"
icon={<Icon type="EyeOutlined" />}
onClick={() => showTaskResult(record)}
>
{t('View result')}
</Button>,
);
}
}
if (record.status.type === 'failed') {
actions.push(
<Button
key="error"
type="link"
size="small"
icon={<Icon type="ExclamationCircleOutlined" />}
onClick={() => {
setCurrentError(record.status.errors);
setModalVisible(true);
setPopoverVisible(false);
}}
>
{t('Error details')}
</Button>,
);
}
return <Space size="middle">{actions}</Space>;
},
},
];
const content = (
<div style={{ width: tasks.length > 0 ? 800 : 200 }}>
{tasks.length > 0 ? (
<Table columns={columns} dataSource={tasks} size="small" pagination={false} rowKey="taskId" />
) : (
<div style={{ padding: '24px 0', display: 'flex', justifyContent: 'center' }}>
<Empty description={t('No tasks')} image={Empty.PRESENTED_IMAGE_SIMPLE} />
</div>
)}
</div>
);
return (
<>
<Popover
content={content}
trigger="hover"
placement="bottom"
open={popoverVisible}
onOpenChange={setPopoverVisible}
>
<Button
className="sync-task-button"
icon={<Icon type={'SyncOutlined'} spin={hasProcessingTasks} />}
onClick={() => setPopoverVisible(!popoverVisible)}
/>
</Popover>
{renderTaskResultModal()}
<Modal
title={t('Error Details')}
open={modalVisible}
onCancel={() => setModalVisible(false)}
footer={[
<Button key="ok" type="primary" onClick={() => setModalVisible(false)}>
{t('OK')}
</Button>,
]}
width={400}
>
{currentError?.map((error, index) => (
<div key={index} style={{ marginBottom: 16 }}>
<div style={{ color: '#ff4d4f', marginBottom: 8 }}>{error.message}</div>
{error.code && (
<div style={{ color: '#999', fontSize: 12 }}>
{t('Error code')}: {error.code}
</div>
)}
</div>
))}
</Modal>
</>
);
};

View File

@ -0,0 +1,13 @@
import { Plugin } from '@nocobase/client';
import { AsyncTaskManagerProvider } from './AsyncTaskManagerProvider';
import { TaskResultRendererManager } from './TaskResultRendererManager';
export class PluginAsyncExportClient extends Plugin {
taskResultRendererManager: TaskResultRendererManager = new TaskResultRendererManager();
async load() {
this.app.use(AsyncTaskManagerProvider);
}
}
export default PluginAsyncExportClient;

View File

@ -0,0 +1,12 @@
// @ts-ignore
import pkg from '../../package.json';
import { useApp } from '@nocobase/client';
export function useT() {
const app = useApp();
return (str: string) => app.i18n.t(str, { ns: [pkg.name, 'client'] });
}
export function tStr(key: string) {
return `{{t(${JSON.stringify(key)}, { ns: ['${pkg.name}', 'client'], nsMode: 'fallback' })}}`;
}

View File

@ -0,0 +1,2 @@
export * from './server';
export { default } from './server';

View File

@ -0,0 +1,5 @@
{
"Export {collection} attachments": "{collection} attachments export",
"Export {collection} data": "{collection} data export",
"Import {collection} data": "{collection} data import"
}

View File

@ -0,0 +1,45 @@
{
"Export": "导出",
"Import": "导入",
"Data": "数据",
"Task": "任务",
"Status": "状态",
"Actions": "操作",
"Created at": "创建时间",
"Type": "类型",
"Waiting": "等待中",
"Processing": "进行中",
"Completed": "已完成",
"Failed": "执行失败",
"Cancelled": "已取消",
"Cancel": "取消",
"Cancelling": "取消中",
"Download": "下载",
"Error details": "错误详情",
"Confirm cancel": "确认取消",
"Confirm cancel description": "确定要取消这个任务吗?",
"Confirm": "确定",
"Task cancelled": "任务已取消",
"Task completed": "任务已完成",
"Task failed": "任务执行失败",
"Error Details": "错误详情",
"Close": "关闭",
"Error code": "错误代码",
"Unknown error": "未知错误",
"OK": "确定",
"Import result": "导入结果",
"Import completed": "导入完成:{{success}} 条记录已导入,{{updated}} 条记录已更新,{{skipped}} 条记录已跳过,共 {{total}} 条记录",
"Import summary": "已导入 {{success}}/{{total}} 条记录",
"Import details": "成功导入 {{success}} 条,更新 {{updated}} 条,跳过 {{skipped}} 条,共 {{total}} 条",
"Imported": "已导入 {{count}}/{{total}}",
"Successfully imported": "成功导入",
"Updated records": "已更新记录",
"Skipped records": "已跳过记录",
"Total records": "总记录数",
"View result": "查看结果",
"ImportResult": "已导入 {{success}} 条,更新 {{updated}} 条,跳过 {{skipped}} 条,共 {{total}} 条",
"Task result": "任务结果",
"Export {collection} attachments": "导出{collection}附件",
"Export {collection} data": "导出{collection}记录",
"Import {collection} data": "导入{collection}数据"
}

View File

@ -0,0 +1,462 @@
import { AsyncTasksManager, TaskStatus, CancelError } from '../interfaces/async-task-manager';
import { createMockServer } from '@nocobase/test';
import { TaskType } from '../task-type';
describe('task manager', () => {
let taskManager: AsyncTasksManager;
let app;
beforeEach(async () => {
app = await createMockServer({
plugins: ['nocobase', 'async-task-manager'],
});
taskManager = app.container.get('AsyncTaskManager');
});
afterEach(async () => {
await app.destroy();
});
it('should register task type', async () => {
class TestTaskType extends TaskType {
static type = 'test';
async execute() {
for (let i = 0; i < 10; i++) {
this.reportProgress({
total: 10,
current: i,
});
}
return {
a: 'b',
};
}
}
taskManager.registerTaskType(TestTaskType);
const task = await taskManager.createTask({
type: 'test',
params: {},
});
expect(task).toBeTruthy();
expect(task.status.type).toBe('pending');
// should get tasks status through task id
const getResp = await app.agent().resource('asyncTasks').get({
filterByTk: task.taskId,
});
expect(getResp.status).toBe(200);
const testFn = vi.fn();
task.on('progress', (progress) => {
testFn();
});
await task.run();
expect(testFn).toHaveBeenCalledTimes(10);
const getResp2 = await app.agent().resource('asyncTasks').get({
filterByTk: task.taskId,
});
expect(getResp2.status).toBe(200);
expect(getResp2.body.data.type).toBe('success');
expect(getResp2.body.data.payload).toEqual({
a: 'b',
});
});
it('should get tasks by tag', async () => {
class TestTaskType extends TaskType {
static type = 'test';
async execute() {
return { success: true };
}
}
taskManager.registerTaskType(TestTaskType);
const task1 = await taskManager.createTask({
type: 'test',
params: {},
tags: {
category: 'import',
source: 'excel',
},
});
const task2 = await taskManager.createTask({
type: 'test',
params: {},
tags: {
category: 'import',
source: 'csv',
},
});
const task3 = await taskManager.createTask({
type: 'test',
params: {},
tags: {
category: 'export',
source: 'excel',
},
});
const importTasks = await taskManager.getTasksByTag('category', 'import');
expect(importTasks.length).toBe(2);
const excelTasks = await taskManager.getTasksByTag('source', 'excel');
expect(excelTasks.length).toBe(2);
const csvTasks = await taskManager.getTasksByTag('source', 'csv');
expect(csvTasks.length).toBe(1);
});
it('should emit events when task status changes', async () => {
class TestTaskType extends TaskType {
static type = 'test';
async execute() {
this.reportProgress({ total: 10, current: 5 });
return { success: true };
}
}
taskManager.registerTaskType(TestTaskType);
const taskCreatedFn = vi.fn();
const taskProgressFn = vi.fn();
const taskStatusChangeFn = vi.fn();
taskManager.on('taskCreated', taskCreatedFn);
taskManager.on('taskProgress', taskProgressFn);
taskManager.on('taskStatusChange', (event) => {
console.log('taskStatusChange', event);
taskStatusChangeFn(event);
});
const task = await taskManager.createTask({
type: 'test',
params: {},
});
// 测试任务创建事件
expect(taskCreatedFn).toHaveBeenCalledTimes(1);
expect(taskCreatedFn).toHaveBeenCalledWith(
expect.objectContaining({
task: expect.any(TestTaskType),
}),
);
await task.run();
// 测试进度事件
expect(taskProgressFn).toHaveBeenCalledTimes(1);
expect(taskProgressFn).toHaveBeenCalledWith(
expect.objectContaining({
task,
progress: { total: 10, current: 5 },
}),
);
// 测试状态变更事件
expect(taskStatusChangeFn).toHaveBeenCalledTimes(2); // 初始 running 状态和最终 success 状态
expect(taskStatusChangeFn).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
task,
status: {
type: 'running',
indicator: 'progress',
},
}),
);
expect(taskStatusChangeFn).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
task,
status: {
type: 'success',
indicator: 'success',
payload: { success: true },
},
}),
);
});
it('should emit events when task fails', async () => {
class FailingTaskType extends TaskType {
static type = 'failing';
async execute() {
throw new Error('Task failed');
}
}
taskManager.registerTaskType(FailingTaskType);
const taskStatusChangeFn = vi.fn();
taskManager.on('taskStatusChange', taskStatusChangeFn);
const task = taskManager.createTask({
type: 'failing',
params: {},
});
// 使用 try/catch 来处理预期的错误
try {
await task.run();
} catch (error) {
console.log('error', error);
// 错误已经被 TaskType 的 run 方法处理了,这里不需要做任何事
}
expect(taskStatusChangeFn).toHaveBeenCalledTimes(2); // 一次是 running 状态,一次是 failed 状态
expect(taskStatusChangeFn).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
task,
status: {
type: 'running',
indicator: 'progress',
},
}),
);
expect(taskStatusChangeFn).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
task,
status: {
type: 'failed',
indicator: 'error',
errors: [{ message: 'Task failed' }],
},
}),
);
});
it('should handle task progress correctly', async () => {
class TestTaskType extends TaskType {
static type = 'test';
async execute() {
for (let i = 0; i < 10; i++) {
await new Promise((resolve) => setTimeout(resolve, 10));
this.reportProgress({
total: 10,
current: i + 1,
});
}
return {
completed: true,
processedItems: 10,
};
}
}
taskManager.registerTaskType(TestTaskType);
const progressUpdates = [];
const statusChanges = [];
const task = await taskManager.createTask({
type: 'test',
params: {},
});
// 监听进度更新
task.on('progress', (progress) => {
progressUpdates.push(progress);
});
// 监听状态变化
task.on('statusChange', (status) => {
statusChanges.push(status);
});
// 运行任务
await task.run();
// 验证进度更新
expect(progressUpdates.length).toBe(10);
expect(progressUpdates[0]).toEqual({
total: 10,
current: 1,
});
expect(progressUpdates[9]).toEqual({
total: 10,
current: 10,
});
// 验证状态变化
expect(statusChanges.length).toBe(2); // pending -> running -> success
expect(statusChanges[0].type).toBe('running');
expect(statusChanges[1].type).toBe('success');
expect(statusChanges[1].payload).toEqual({
completed: true,
processedItems: 10,
});
// 验证最终任务状态和进度
const finalTask = await app.agent().resource('asyncTasks').get({
filterByTk: task.taskId,
});
expect(finalTask.body.data.type).toBe('success');
});
it('should cancel task correctly', async () => {
let executionCompleted = false;
class LongRunningTaskType extends TaskType {
static type = 'long-running';
async execute() {
for (let i = 0; i < 10; i++) {
if (this.isCancelled) {
throw new CancelError();
}
await new Promise((resolve) => setTimeout(resolve, 1000));
this.reportProgress({
total: 10,
current: i + 1,
});
}
executionCompleted = true;
return { success: true };
}
}
taskManager.registerTaskType(LongRunningTaskType);
const statusChanges: TaskStatus[] = [];
const task = await taskManager.createTask({
type: 'long-running',
params: {},
});
task.on('statusChange', (status) => {
statusChanges.push(status);
});
// 启动任务
const runPromise = task.run();
// 等待一小段时间让任务开始执行
await new Promise((resolve) => setTimeout(resolve, 150));
// 取消任务
const cancelled = await taskManager.cancelTask(task.taskId);
expect(cancelled).toBe(true);
// 等待任务完成执行
await runPromise;
// 等待状态变更事件被处理
await new Promise((resolve) => setTimeout(resolve, 100));
// 验证状态变化
expect(statusChanges.length).toBe(2); // running -> cancelled
expect(statusChanges[0].type).toBe('running');
expect(statusChanges[1].type).toBe('cancelled');
// 验证任务状态
expect(task.status.type).toBe('cancelled');
// 验证任务是否正确中断
expect(executionCompleted).toBe(false);
// 验证无法取消不存在的任务
expect(await taskManager.cancelTask('non-existent-id')).toBe(false);
});
describe('task cancellation', () => {
it('should remove task from memory immediately after cancellation', async () => {
class CancellableTaskType extends TaskType {
static type = 'cancellable';
async execute() {
while (!this.isCancelled) {
await new Promise((resolve) => setTimeout(resolve, 100));
}
throw new CancelError();
}
}
taskManager.registerTaskType(CancellableTaskType);
const task = await taskManager.createTask({
type: 'cancellable',
params: {},
});
// 启动任务
const runPromise = task.run();
// 等待任务开始执行
await new Promise((resolve) => setTimeout(resolve, 150));
// 取消任务
const cancelled = await taskManager.cancelTask(task.taskId);
expect(cancelled).toBe(true);
// 等待任务结束和状态变更事件被处理
await runPromise;
await new Promise((resolve) => setTimeout(resolve, 100));
// 验证任务已从内存中移除
const tasks = Array.from(taskManager['tasks'].values()) as TaskType[];
expect(tasks.length).toBe(0);
// 验证无法获取已删除任务的状态
await expect(taskManager.getTaskStatus(task.taskId)).rejects.toThrow();
});
it('should handle multiple cancellation attempts', async () => {
class MultiCancelTaskType extends TaskType {
static type = 'multi-cancel';
async execute() {
while (!this.isCancelled) {
await new Promise((resolve) => setTimeout(resolve, 100));
}
throw new CancelError();
}
}
taskManager.registerTaskType(MultiCancelTaskType);
const task = await taskManager.createTask({
type: 'multi-cancel',
params: {},
});
// 启动任务
const runPromise = task.run();
// 第一次取消
const firstCancellation = await taskManager.cancelTask(task.taskId);
expect(firstCancellation).toBe(true);
// 等待任务结束和状态变更事件被处理
await runPromise;
await new Promise((resolve) => setTimeout(resolve, 100));
// 第二次取消应该返回 false因为任务已经从内存中移除
const secondCancellation = await taskManager.cancelTask(task.taskId);
expect(secondCancellation).toBe(false);
});
});
});

View File

@ -0,0 +1,134 @@
import { EventEmitter } from 'events';
import { AsyncTasksManager, CreateTaskOptions, TaskId, TaskStatus } from './interfaces/async-task-manager';
import { Logger } from '@nocobase/logger';
import { ITask, TaskConstructor } from './interfaces/task';
import { Application } from '@nocobase/server';
export class BaseTaskManager extends EventEmitter implements AsyncTasksManager {
private taskTypes: Map<string, TaskConstructor> = new Map();
private tasks: Map<TaskId, ITask> = new Map();
// Clean up completed tasks after 30 minutes by default
private readonly cleanupDelay = 30 * 60 * 1000;
private logger: Logger;
private app: Application;
setLogger(logger: Logger): void {
this.logger = logger;
}
setApp(app: Application): void {
this.app = app;
}
private scheduleCleanup(taskId: TaskId) {
setTimeout(() => {
this.tasks.delete(taskId);
this.logger.debug(`Task ${taskId} cleaned up after ${this.cleanupDelay}ms`);
}, this.cleanupDelay);
}
constructor() {
super();
}
async cancelTask(taskId: TaskId): Promise<boolean> {
const task = this.tasks.get(taskId);
if (!task) {
this.logger.warn(`Attempted to cancel non-existent task ${taskId}`);
return false;
}
this.logger.info(`Cancelling task ${taskId}, type: ${task.constructor.name}, tags: ${JSON.stringify(task.tags)}`);
return task.cancel();
}
createTask<T>(options: CreateTaskOptions): ITask {
const taskType = this.taskTypes.get(options.type);
if (!taskType) {
this.logger.error(`Task type not found: ${options.type}, params: ${JSON.stringify(options.params)}`);
throw new Error(`Task type ${options.type} not found`);
}
this.logger.info(
`Creating task of type: ${options.type}, params: ${JSON.stringify(options.params)}, tags: ${JSON.stringify(
options.tags,
)}`,
);
const task = new (taskType as unknown as new (
options: CreateTaskOptions['params'],
tags?: Record<string, string>,
) => ITask)(options.params, options.tags);
task.title = options.title;
task.setLogger(this.logger);
task.setApp(this.app);
task.setContext(options.context);
this.tasks.set(task.taskId, task);
this.logger.info(
`Created new task ${task.taskId} of type ${options.type}, params: ${JSON.stringify(
options.params,
)}, tags: ${JSON.stringify(options.tags)}, title: ${task.title}`,
);
this.emit('taskCreated', { task });
task.on('progress', (progress) => {
this.logger.debug(`Task ${task.taskId} progress: ${progress}`);
this.emit('taskProgress', { task, progress });
});
task.on('statusChange', (status) => {
if (['success', 'failed'].includes(status.type)) {
this.scheduleCleanup(task.taskId);
} else if (status.type === 'cancelled') {
// Remove task immediately when cancelled
this.tasks.delete(task.taskId);
}
this.emit('taskStatusChange', { task, status });
});
return task;
}
getTask(taskId: TaskId): ITask | undefined {
const task = this.tasks.get(taskId);
if (!task) {
this.logger.debug(`Task not found: ${taskId}`);
return undefined;
}
this.logger.debug(`Retrieved task ${taskId}, type: ${task.constructor.name}, status: ${task.status.type}`);
return task;
}
async getTaskStatus(taskId: TaskId): Promise<TaskStatus> {
const task = this.tasks.get(taskId);
if (!task) {
this.logger.warn(`Attempted to get status of non-existent task ${taskId}`);
throw new Error(`Task ${taskId} not found`);
}
this.logger.debug(`Getting status for task ${taskId}, current status: ${task.status.type}`);
return task.status;
}
registerTaskType(taskType: TaskConstructor): void {
this.logger.info(`Registering task type: ${taskType.type}`);
this.taskTypes.set(taskType.type, taskType);
}
async getTasksByTag(tagKey: string, tagValue: string): Promise<ITask[]> {
this.logger.debug(`Getting tasks by tag - key: ${tagKey}, value: ${tagValue}`);
const tasks = Array.from(this.tasks.values()).filter((task) => {
return task.tags[tagKey] == tagValue;
});
this.logger.debug(`Found ${tasks.length} tasks with tag ${tagKey}=${tagValue}`);
return tasks;
}
}

View File

@ -0,0 +1,85 @@
import { CancelError } from './interfaces/async-task-manager';
import process from 'node:process';
import { Worker } from 'worker_threads';
import path from 'path';
import { TaskType } from './task-type';
export class CommandTaskType extends TaskType {
static type = 'command';
workerThread: Worker;
async execute() {
const { argv } = this.options;
const isDev = (process.argv[1]?.endsWith('.ts') || process.argv[1].includes('tinypool')) ?? false;
const appRoot = process.env.APP_PACKAGE_ROOT || 'packages/core/app';
const workerPath = path.resolve(process.cwd(), appRoot, isDev ? 'src/index.ts' : 'lib/index.js');
const workerPromise = new Promise((resolve, reject) => {
try {
this.logger?.info(
`Creating worker for task ${this.taskId} - path: ${workerPath}, argv: ${JSON.stringify(
argv,
)}, isDev: ${isDev}`,
);
const worker = new Worker(workerPath, {
execArgv: isDev ? ['--require', 'tsx/cjs'] : [],
workerData: {
argv,
},
});
this.workerThread = worker;
this.logger?.debug(`Worker created successfully for task ${this.taskId}`);
let isCancelling = false;
// Listen for abort signal
this.abortController.signal.addEventListener('abort', () => {
isCancelling = true;
this.logger?.info(`Terminating worker for task ${this.taskId} due to cancellation`);
worker.terminate();
});
worker.on('message', (message) => {
this.logger?.debug(`Worker message received for task ${this.taskId} - type: ${message.type}`);
if (message.type === 'progress') {
this.reportProgress(message.payload);
}
if (message.type === 'success') {
this.logger?.info(
`Worker completed successfully for task ${this.taskId} with payload: ${JSON.stringify(message.payload)}`,
);
resolve(message.payload);
}
});
worker.on('error', (error) => {
this.logger?.error(`Worker error for task ${this.taskId}`, error);
reject(error);
});
worker.on('exit', (code) => {
this.logger?.info(`Worker exited for task ${this.taskId} with code ${code}`);
if (isCancelling) {
reject(new CancelError());
} else if (code !== 0) {
reject(new Error(`Worker stopped with exit code ${code}`));
} else {
resolve(code);
}
});
worker.on('messageerror', (error) => {
reject(error);
});
} catch (error) {
reject(error);
}
});
return workerPromise;
}
}

View File

@ -0,0 +1,4 @@
export * from './interfaces/async-task-manager';
export * from './static-import';
export { default } from './plugin';

View File

@ -0,0 +1,69 @@
import { Logger } from '@nocobase/logger';
import { ITask, TaskConstructor } from './task';
import { Application } from '@nocobase/server';
import { EventEmitter } from 'events';
export type TaskOptions = any;
export interface CreateTaskOptions {
type: string;
params: TaskOptions;
tags?: Record<string, string>;
title?: {
actionType: string;
collection: string;
dataSource: string;
};
context?: any;
}
export type TaskId = string;
export type TaskStatus = PendingStatus | SuccessStatus<any> | RunningStatus | FailedStatus | CancelledStatus;
export type ProgressIndicator = 'spinner' | 'progress' | 'success' | 'error';
export interface PendingStatus {
type: 'pending';
indicator?: 'spinner';
}
export interface SuccessStatus<T = any> {
type: 'success';
indicator?: 'success';
resultType?: 'file' | 'data';
payload?: T;
}
export interface RunningStatus {
type: 'running';
indicator: 'progress';
}
export interface FailedStatus {
type: 'failed';
indicator?: 'error';
errors: Array<{ message: string; code?: number }>;
}
export interface CancelledStatus {
type: 'cancelled';
}
export interface AsyncTasksManager extends EventEmitter {
setLogger(logger: Logger): void;
setApp(app: Application): void;
registerTaskType(taskType: TaskConstructor): void;
createTask<T>(options: CreateTaskOptions): ITask;
getTasksByTag(tagKey: string, tagValue: string): Promise<ITask[]>;
cancelTask(taskId: TaskId): Promise<boolean>;
getTaskStatus(taskId: TaskId): Promise<TaskStatus>;
getTask(taskId: TaskId): ITask | undefined;
}
export class CancelError extends Error {
constructor(message = 'Task cancelled') {
super(message);
this.name = 'CancelError';
}
}

View File

@ -0,0 +1,34 @@
import { Logger } from '@nocobase/logger';
import { TaskStatus } from './async-task-manager';
import { EventEmitter } from 'events';
import { Application } from '@nocobase/server';
export interface ITask extends EventEmitter {
taskId: string;
status: TaskStatus;
progress: {
total: number;
current: number;
};
startedAt: Date;
fulfilledAt: Date;
tags: Record<string, string>;
createdAt: Date;
title?: any;
isCancelled: boolean;
context?: any;
setLogger(logger: Logger): void;
setApp(app: Application): void;
setContext(context: any): void;
cancel(): Promise<boolean>;
execute(): Promise<any>;
reportProgress(progress: { total: number; current: number }): void;
run(): Promise<void>;
toJSON(options?: { raw?: boolean }): any;
}
export interface TaskConstructor {
type: string;
new (options: any, tags?: Record<string, string>): ITask;
}

View File

@ -0,0 +1,142 @@
import { Plugin } from '@nocobase/server';
import { BaseTaskManager } from './base-task-manager';
import { AsyncTasksManager } from './interfaces/async-task-manager';
import { CommandTaskType } from './command-task-type';
import asyncTasksResource from './resourcers/async-tasks';
export class PluginAsyncExportServer extends Plugin {
async afterAdd() {}
async beforeLoad() {
this.app.container.register('AsyncTaskManager', () => {
const manager = new BaseTaskManager();
// @ts-ignore
manager.setLogger(this.app.logger);
manager.setApp(this.app);
return manager;
});
this.app.container.get<AsyncTasksManager>('AsyncTaskManager').registerTaskType(CommandTaskType);
this.app.acl.allow('asyncTasks', ['get', 'fetchFile'], 'loggedIn');
}
async load() {
this.app.resourceManager.define(asyncTasksResource);
const asyncTaskManager = this.app.container.get<AsyncTasksManager>('AsyncTaskManager');
this.app.on(`ws:message:request:async-tasks:list`, async (message) => {
const { tags } = message;
this.app.logger.info(`Received request for async tasks with tags: ${JSON.stringify(tags)}`);
const userTag = tags?.find((tag) => tag.startsWith('userId#'));
const userId = userTag ? userTag.split('#')[1] : null;
if (userId) {
this.app.logger.info(`Fetching tasks for userId: ${userId}`);
const tasks = await asyncTaskManager.getTasksByTag('userId', userId);
this.app.logger.info(`Found ${tasks.length} tasks for userId: ${userId}`);
this.app.emit('ws:sendToTag', {
tagKey: 'userId',
tagValue: userId,
message: {
type: 'async-tasks',
payload: tasks.map((task) => task.toJSON()),
},
});
} else {
this.app.logger.warn(`No userId found in message tags: ${JSON.stringify(tags)}`);
}
});
asyncTaskManager.on('taskCreated', ({ task }) => {
const userId = task.tags['userId'];
if (userId) {
this.app.emit('ws:sendToTag', {
tagKey: 'userId',
tagValue: userId,
message: {
type: 'async-tasks:created',
payload: task.toJSON(),
},
});
}
});
asyncTaskManager.on('taskProgress', ({ task, progress }) => {
const userId = task.tags['userId'];
if (userId) {
this.app.emit('ws:sendToTag', {
tagKey: 'userId',
tagValue: userId,
message: {
type: 'async-tasks:progress',
payload: {
taskId: task.taskId,
progress,
},
},
});
}
});
asyncTaskManager.on('taskStatusChange', ({ task, status }) => {
const userId = task.tags['userId'];
if (userId) {
this.app.emit('ws:sendToTag', {
tagKey: 'userId',
tagValue: userId,
message: {
type: 'async-tasks:status',
payload: {
taskId: task.taskId,
status: task.toJSON().status,
},
},
});
}
});
asyncTaskManager.on('taskStatusChange', ({ status }) => {
if (status.type === 'success') {
this.app.emit('workflow:dispatch');
}
});
this.app.on('ws:message:request:async-tasks:cancel', async (message) => {
const { payload, tags } = message;
const { taskId } = payload;
const userTag = tags?.find((tag) => tag.startsWith('userId#'));
const userId = userTag ? userTag.split('#')[1] : null;
if (userId) {
const task = asyncTaskManager.getTask(taskId);
if (task.tags['userId'] != userId) {
return;
}
const cancelled = await asyncTaskManager.cancelTask(taskId);
if (cancelled) {
this.app.emit('ws:sendToTag', {
tagKey: 'userId',
tagValue: userId,
message: {
type: 'async-tasks:cancelled',
payload: { taskId },
},
});
}
}
});
}
}
export default PluginAsyncExportServer;

View File

@ -0,0 +1,40 @@
import fs from 'fs';
import { basename } from 'path';
export default {
name: 'asyncTasks',
actions: {
async get(ctx, next) {
const { filterByTk } = ctx.action.params;
const taskManager = ctx.app.container.get('AsyncTaskManager');
const taskStatus = await taskManager.getTaskStatus(filterByTk);
ctx.body = taskStatus;
await next();
},
async fetchFile(ctx, next) {
const { filterByTk } = ctx.action.params;
const taskManager = ctx.app.container.get('AsyncTaskManager');
const taskStatus = await taskManager.getTaskStatus(filterByTk);
// throw error if task is not success
if (taskStatus.type !== 'success') {
throw new Error('Task is not success status');
}
const { filePath } = taskStatus.payload;
if (!filePath) {
throw new Error('not a file task');
}
// send file to client
ctx.body = fs.createReadStream(filePath);
ctx.set({
'Content-Type': 'application/octet-stream',
'Content-Disposition': `attachment; filename=${basename(filePath)}`,
});
await next();
},
},
};

View File

@ -0,0 +1,5 @@
import { appendToBuiltInPlugins } from '@nocobase/server';
export async function staticImport() {
await appendToBuiltInPlugins('@nocobase/plugin-async-task-manager');
}

View File

@ -0,0 +1,194 @@
import { v4 as uuidv4 } from 'uuid';
import EventEmitter from 'events';
import { AbortController } from 'abort-controller';
import { Logger } from '@nocobase/logger';
import { TaskOptions, TaskStatus, CancelError } from './interfaces/async-task-manager';
import { ITask } from './interfaces/task';
import Application from '@nocobase/server';
import PluginErrorHandler, { ErrorHandler } from '@nocobase/plugin-error-handler';
export abstract class TaskType extends EventEmitter implements ITask {
static type: string;
public status: TaskStatus;
protected logger: Logger;
protected app: Application;
public progress: {
total: number;
current: number;
} = {
total: 0,
current: 0,
};
public startedAt: Date;
public fulfilledAt: Date;
public taskId: string;
public tags: Record<string, string>;
public createdAt: Date;
public context?: any;
public title;
protected abortController: AbortController = new AbortController();
private _isCancelled = false;
get isCancelled() {
return this._isCancelled;
}
constructor(
protected options: TaskOptions,
tags?: Record<string, string>,
) {
super();
this.status = {
type: 'pending',
indicator: 'spinner',
};
this.taskId = uuidv4();
this.tags = tags || {};
this.createdAt = new Date();
}
setLogger(logger: Logger) {
this.logger = logger;
}
setApp(app: Application) {
this.app = app;
}
setContext(context: any) {
this.context = context;
}
/**
* Cancel the task
*/
async cancel() {
this._isCancelled = true;
this.abortController.abort();
this.logger?.debug(`Task ${this.taskId} cancelled`);
return true;
}
/**
* Execute the task implementation
* @returns Promise that resolves with the task result
*/
abstract execute(): Promise<any>;
/**
* Report task progress
* @param progress Progress information containing total and current values
*/
reportProgress(progress: { total: number; current: number }) {
this.progress = progress;
this.logger?.debug(`Task ${this.taskId} progress update - current: ${progress.current}, total: ${progress.total}`);
this.emit('progress', progress);
}
/**
* Run the task
* This method handles task lifecycle, including:
* - Status management
* - Error handling
* - Progress tracking
* - Event emission
*/
async run() {
this.startedAt = new Date();
this.logger?.info(`Starting task ${this.taskId}, type: ${(this.constructor as typeof TaskType).type}`);
this.status = {
type: 'running',
indicator: 'progress',
};
this.emit('statusChange', this.status);
try {
if (this._isCancelled) {
this.logger?.info(`Task ${this.taskId} was cancelled before execution`);
this.status = {
type: 'cancelled',
};
this.emit('statusChange', this.status);
return;
}
const executePromise = this.execute();
const result = await executePromise;
this.status = {
type: 'success',
indicator: 'success',
payload: result,
};
this.logger?.info(`Task ${this.taskId} completed successfully with result: ${JSON.stringify(result)}`);
this.emit('statusChange', this.status);
} catch (error) {
if (error instanceof CancelError) {
this.status = {
type: 'cancelled',
};
this.logger?.info(`Task ${this.taskId} was cancelled during execution`);
} else {
this.status = {
type: 'failed',
indicator: 'error',
errors: [{ message: this.renderErrorMessage(error) }],
};
this.logger?.error(`Task ${this.taskId} failed with error: ${error.message}`);
}
this.emit('statusChange', this.status);
} finally {
this.fulfilledAt = new Date();
const duration = this.fulfilledAt.getTime() - this.startedAt.getTime();
this.logger?.info(`Task ${this.taskId} finished in ${duration}ms`);
}
}
private renderErrorMessage(error: Error) {
const errorHandlerPlugin = this.app.pm.get('error-handler') as PluginErrorHandler;
if (!errorHandlerPlugin || !this.context) {
return error.message;
}
const errorHandler: ErrorHandler = errorHandlerPlugin.errorHandler;
errorHandler.renderError(error, this.context);
return this.context.body.errors[0].message;
}
toJSON(options?: { raw?: boolean }) {
const json = {
taskId: this.taskId,
status: { ...this.status },
progress: this.progress,
tags: this.tags,
createdAt: this.createdAt,
startedAt: this.startedAt,
fulfilledAt: this.fulfilledAt,
title: this.title,
};
// If not in raw mode and the status is success with a file path, transform the status format
if (!options?.raw && json.status.type === 'success' && json.status.payload?.filePath) {
json.status = {
type: 'success',
indicator: 'success',
resultType: 'file',
payload: {},
};
}
return json;
}
}