Compare commits

...

2 Commits

Author SHA1 Message Date
aaaaaajie
478d836404 feat: cancel task 2025-05-03 17:27:00 +08:00
aaaaaajie
284ccd5bc1 fix: build 2025-05-03 13:34:06 +08:00
5 changed files with 32 additions and 26 deletions

View File

@ -12,7 +12,7 @@
"@nocobase/server": "1.x",
"@nocobase/test": "1.x"
},
"devDependencies": {
"p-queue": "^8.1.0"
"dependencies": {
"p-queue": "^6.6.2"
}
}

View File

@ -66,14 +66,17 @@ export class BaseTaskManager extends EventEmitter implements AsyncTasksManager {
}
private enqueueTask(task: ITask): void {
this.queue.add(async () => {
try {
this.logger.debug(`Starting execution of task ${task.taskId} from queue`);
await task.run();
} catch (error) {
this.logger.error(`Error executing task ${task.taskId} from queue: ${error.message}`);
const taskHandler = async () => {
if (task.status.type === 'pending') {
try {
this.logger.debug(`Starting execution of task ${task.taskId} from queue`);
await task.run();
} catch (error) {
this.logger.error(`Error executing task ${task.taskId} from queue: ${error.message}`);
}
}
});
};
this.queue.add(taskHandler);
}
pauseQueue(): void {
@ -95,8 +98,11 @@ export class BaseTaskManager extends EventEmitter implements AsyncTasksManager {
this.logger.warn(`Attempted to cancel non-existent task ${taskId}`);
return false;
}
this.logger.info(`Cancelling task ${taskId}, type: ${task.constructor.name}, tags: ${JSON.stringify(task.tags)}`);
if (task.status.type === 'pending') {
await task.statusChange({ type: 'cancelled' });
return true;
}
return task.cancel();
}

View File

@ -1,3 +1,12 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { Logger } from '@nocobase/logger';
import { TaskStatus } from './async-task-manager';
import { EventEmitter } from 'events';
@ -22,6 +31,7 @@ export interface ITask extends EventEmitter {
setContext(context: any): void;
cancel(): Promise<boolean>;
statusChange: (status: TaskStatus) => Promise<void>;
execute(): Promise<any>;
reportProgress(progress: { total: number; current: number }): void;
run(): Promise<void>;

View File

@ -85,6 +85,11 @@ export abstract class TaskType extends EventEmitter implements ITask {
return true;
}
async statusChange(status: TaskStatus) {
this.status = status;
this.emit('statusChange', this.status);
}
/**
* Execute the task implementation
* @returns Promise that resolves with the task result
@ -143,9 +148,7 @@ export abstract class TaskType extends EventEmitter implements ITask {
this.emit('statusChange', this.status);
} catch (error) {
if (error instanceof CancelError) {
this.status = {
type: 'cancelled',
};
this.statusChange({ type: 'cancelled' });
this.logger?.info(`Task ${this.taskId} was cancelled during execution`);
} else {
this.status = {

View File

@ -29388,14 +29388,6 @@ p-queue@^6.6.2:
eventemitter3 "^4.0.4"
p-timeout "^3.2.0"
p-queue@^8.1.0:
version "8.1.0"
resolved "https://registry.npmmirror.com/p-queue/-/p-queue-8.1.0.tgz#d71929249868b10b16f885d8a82beeaf35d32279"
integrity sha512-mxLDbbGIBEXTJL0zEx8JIylaj3xQ7Z/7eEVjcF9fJX4DBiH9oqe+oahYnlKKxm0Ci9TlWTyhSHgygxMxjIB2jw==
dependencies:
eventemitter3 "^5.0.1"
p-timeout "^6.1.2"
p-reduce@^2.0.0, p-reduce@^2.1.0:
version "2.1.0"
resolved "https://registry.npmmirror.com/p-reduce/-/p-reduce-2.1.0.tgz#09408da49507c6c274faa31f28df334bc712b64a"
@ -29416,11 +29408,6 @@ p-timeout@^3.2.0:
dependencies:
p-finally "^1.0.0"
p-timeout@^6.1.2:
version "6.1.4"
resolved "https://registry.npmmirror.com/p-timeout/-/p-timeout-6.1.4.tgz#418e1f4dd833fa96a2e3f532547dd2abdb08dbc2"
integrity sha512-MyIV3ZA/PmyBN/ud8vV9XzwTrNtR4jFrObymZYnZqMmW0zA8Z17vnT0rBgFE/TlohB+YCHqXMgZzb3Csp49vqg==
p-try@^1.0.0:
version "1.0.0"
resolved "https://registry.npmmirror.com/p-try/-/p-try-1.0.0.tgz#cbc79cdbaf8fd4228e13f621f2b1a237c1b207b3"