feat: cancel task

This commit is contained in:
aaaaaajie 2025-05-03 17:27:00 +08:00
parent 284ccd5bc1
commit 478d836404
3 changed files with 30 additions and 11 deletions

View File

@ -66,14 +66,17 @@ export class BaseTaskManager extends EventEmitter implements AsyncTasksManager {
} }
private enqueueTask(task: ITask): void { private enqueueTask(task: ITask): void {
this.queue.add(async () => { const taskHandler = async () => {
try { if (task.status.type === 'pending') {
this.logger.debug(`Starting execution of task ${task.taskId} from queue`); try {
await task.run(); this.logger.debug(`Starting execution of task ${task.taskId} from queue`);
} catch (error) { await task.run();
this.logger.error(`Error executing task ${task.taskId} from queue: ${error.message}`); } catch (error) {
this.logger.error(`Error executing task ${task.taskId} from queue: ${error.message}`);
}
} }
}); };
this.queue.add(taskHandler);
} }
pauseQueue(): void { pauseQueue(): void {
@ -95,8 +98,11 @@ export class BaseTaskManager extends EventEmitter implements AsyncTasksManager {
this.logger.warn(`Attempted to cancel non-existent task ${taskId}`); this.logger.warn(`Attempted to cancel non-existent task ${taskId}`);
return false; return false;
} }
this.logger.info(`Cancelling task ${taskId}, type: ${task.constructor.name}, tags: ${JSON.stringify(task.tags)}`); this.logger.info(`Cancelling task ${taskId}, type: ${task.constructor.name}, tags: ${JSON.stringify(task.tags)}`);
if (task.status.type === 'pending') {
await task.statusChange({ type: 'cancelled' });
return true;
}
return task.cancel(); return task.cancel();
} }

View File

@ -1,3 +1,12 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { Logger } from '@nocobase/logger'; import { Logger } from '@nocobase/logger';
import { TaskStatus } from './async-task-manager'; import { TaskStatus } from './async-task-manager';
import { EventEmitter } from 'events'; import { EventEmitter } from 'events';
@ -22,6 +31,7 @@ export interface ITask extends EventEmitter {
setContext(context: any): void; setContext(context: any): void;
cancel(): Promise<boolean>; cancel(): Promise<boolean>;
statusChange: (status: TaskStatus) => Promise<void>;
execute(): Promise<any>; execute(): Promise<any>;
reportProgress(progress: { total: number; current: number }): void; reportProgress(progress: { total: number; current: number }): void;
run(): Promise<void>; run(): Promise<void>;

View File

@ -85,6 +85,11 @@ export abstract class TaskType extends EventEmitter implements ITask {
return true; return true;
} }
async statusChange(status: TaskStatus) {
this.status = status;
this.emit('statusChange', this.status);
}
/** /**
* Execute the task implementation * Execute the task implementation
* @returns Promise that resolves with the task result * @returns Promise that resolves with the task result
@ -143,9 +148,7 @@ export abstract class TaskType extends EventEmitter implements ITask {
this.emit('statusChange', this.status); this.emit('statusChange', this.status);
} catch (error) { } catch (error) {
if (error instanceof CancelError) { if (error instanceof CancelError) {
this.status = { this.statusChange({ type: 'cancelled' });
type: 'cancelled',
};
this.logger?.info(`Task ${this.taskId} was cancelled during execution`); this.logger?.info(`Task ${this.taskId} was cancelled during execution`);
} else { } else {
this.status = { this.status = {