diff --git a/packages/plugins/@nocobase/plugin-async-task-manager/src/server/base-task-manager.ts b/packages/plugins/@nocobase/plugin-async-task-manager/src/server/base-task-manager.ts index 81e592d206..3aa9050113 100644 --- a/packages/plugins/@nocobase/plugin-async-task-manager/src/server/base-task-manager.ts +++ b/packages/plugins/@nocobase/plugin-async-task-manager/src/server/base-task-manager.ts @@ -66,14 +66,17 @@ export class BaseTaskManager extends EventEmitter implements AsyncTasksManager { } private enqueueTask(task: ITask): void { - this.queue.add(async () => { - try { - this.logger.debug(`Starting execution of task ${task.taskId} from queue`); - await task.run(); - } catch (error) { - this.logger.error(`Error executing task ${task.taskId} from queue: ${error.message}`); + const taskHandler = async () => { + if (task.status.type === 'pending') { + try { + this.logger.debug(`Starting execution of task ${task.taskId} from queue`); + await task.run(); + } catch (error) { + this.logger.error(`Error executing task ${task.taskId} from queue: ${error.message}`); + } } - }); + }; + this.queue.add(taskHandler); } pauseQueue(): void { @@ -95,8 +98,11 @@ export class BaseTaskManager extends EventEmitter implements AsyncTasksManager { this.logger.warn(`Attempted to cancel non-existent task ${taskId}`); return false; } - this.logger.info(`Cancelling task ${taskId}, type: ${task.constructor.name}, tags: ${JSON.stringify(task.tags)}`); + if (task.status.type === 'pending') { + await task.statusChange({ type: 'cancelled' }); + return true; + } return task.cancel(); } diff --git a/packages/plugins/@nocobase/plugin-async-task-manager/src/server/interfaces/task.ts b/packages/plugins/@nocobase/plugin-async-task-manager/src/server/interfaces/task.ts index 81c1438944..a029e68b48 100644 --- a/packages/plugins/@nocobase/plugin-async-task-manager/src/server/interfaces/task.ts +++ b/packages/plugins/@nocobase/plugin-async-task-manager/src/server/interfaces/task.ts @@ -1,3 +1,12 @@ +/** + * This file is part of the NocoBase (R) project. + * Copyright (c) 2020-2024 NocoBase Co., Ltd. + * Authors: NocoBase Team. + * + * This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License. + * For more information, please refer to: https://www.nocobase.com/agreement. + */ + import { Logger } from '@nocobase/logger'; import { TaskStatus } from './async-task-manager'; import { EventEmitter } from 'events'; @@ -22,6 +31,7 @@ export interface ITask extends EventEmitter { setContext(context: any): void; cancel(): Promise; + statusChange: (status: TaskStatus) => Promise; execute(): Promise; reportProgress(progress: { total: number; current: number }): void; run(): Promise; diff --git a/packages/plugins/@nocobase/plugin-async-task-manager/src/server/task-type.ts b/packages/plugins/@nocobase/plugin-async-task-manager/src/server/task-type.ts index a79e8626e5..20ae3c0f43 100644 --- a/packages/plugins/@nocobase/plugin-async-task-manager/src/server/task-type.ts +++ b/packages/plugins/@nocobase/plugin-async-task-manager/src/server/task-type.ts @@ -85,6 +85,11 @@ export abstract class TaskType extends EventEmitter implements ITask { return true; } + async statusChange(status: TaskStatus) { + this.status = status; + this.emit('statusChange', this.status); + } + /** * Execute the task implementation * @returns Promise that resolves with the task result @@ -143,9 +148,7 @@ export abstract class TaskType extends EventEmitter implements ITask { this.emit('statusChange', this.status); } catch (error) { if (error instanceof CancelError) { - this.status = { - type: 'cancelled', - }; + this.statusChange({ type: 'cancelled' }); this.logger?.info(`Task ${this.taskId} was cancelled during execution`); } else { this.status = {