Compare commits

...

2 Commits

Author SHA1 Message Date
aaaaaajie
478d836404 feat: cancel task 2025-05-03 17:27:00 +08:00
aaaaaajie
284ccd5bc1 fix: build 2025-05-03 13:34:06 +08:00
5 changed files with 32 additions and 26 deletions

View File

@ -12,7 +12,7 @@
"@nocobase/server": "1.x", "@nocobase/server": "1.x",
"@nocobase/test": "1.x" "@nocobase/test": "1.x"
}, },
"devDependencies": { "dependencies": {
"p-queue": "^8.1.0" "p-queue": "^6.6.2"
} }
} }

View File

@ -66,14 +66,17 @@ export class BaseTaskManager extends EventEmitter implements AsyncTasksManager {
} }
private enqueueTask(task: ITask): void { private enqueueTask(task: ITask): void {
this.queue.add(async () => { const taskHandler = async () => {
try { if (task.status.type === 'pending') {
this.logger.debug(`Starting execution of task ${task.taskId} from queue`); try {
await task.run(); this.logger.debug(`Starting execution of task ${task.taskId} from queue`);
} catch (error) { await task.run();
this.logger.error(`Error executing task ${task.taskId} from queue: ${error.message}`); } catch (error) {
this.logger.error(`Error executing task ${task.taskId} from queue: ${error.message}`);
}
} }
}); };
this.queue.add(taskHandler);
} }
pauseQueue(): void { pauseQueue(): void {
@ -95,8 +98,11 @@ export class BaseTaskManager extends EventEmitter implements AsyncTasksManager {
this.logger.warn(`Attempted to cancel non-existent task ${taskId}`); this.logger.warn(`Attempted to cancel non-existent task ${taskId}`);
return false; return false;
} }
this.logger.info(`Cancelling task ${taskId}, type: ${task.constructor.name}, tags: ${JSON.stringify(task.tags)}`); this.logger.info(`Cancelling task ${taskId}, type: ${task.constructor.name}, tags: ${JSON.stringify(task.tags)}`);
if (task.status.type === 'pending') {
await task.statusChange({ type: 'cancelled' });
return true;
}
return task.cancel(); return task.cancel();
} }

View File

@ -1,3 +1,12 @@
/**
* This file is part of the NocoBase (R) project.
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
* Authors: NocoBase Team.
*
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
* For more information, please refer to: https://www.nocobase.com/agreement.
*/
import { Logger } from '@nocobase/logger'; import { Logger } from '@nocobase/logger';
import { TaskStatus } from './async-task-manager'; import { TaskStatus } from './async-task-manager';
import { EventEmitter } from 'events'; import { EventEmitter } from 'events';
@ -22,6 +31,7 @@ export interface ITask extends EventEmitter {
setContext(context: any): void; setContext(context: any): void;
cancel(): Promise<boolean>; cancel(): Promise<boolean>;
statusChange: (status: TaskStatus) => Promise<void>;
execute(): Promise<any>; execute(): Promise<any>;
reportProgress(progress: { total: number; current: number }): void; reportProgress(progress: { total: number; current: number }): void;
run(): Promise<void>; run(): Promise<void>;

View File

@ -85,6 +85,11 @@ export abstract class TaskType extends EventEmitter implements ITask {
return true; return true;
} }
async statusChange(status: TaskStatus) {
this.status = status;
this.emit('statusChange', this.status);
}
/** /**
* Execute the task implementation * Execute the task implementation
* @returns Promise that resolves with the task result * @returns Promise that resolves with the task result
@ -143,9 +148,7 @@ export abstract class TaskType extends EventEmitter implements ITask {
this.emit('statusChange', this.status); this.emit('statusChange', this.status);
} catch (error) { } catch (error) {
if (error instanceof CancelError) { if (error instanceof CancelError) {
this.status = { this.statusChange({ type: 'cancelled' });
type: 'cancelled',
};
this.logger?.info(`Task ${this.taskId} was cancelled during execution`); this.logger?.info(`Task ${this.taskId} was cancelled during execution`);
} else { } else {
this.status = { this.status = {

View File

@ -29388,14 +29388,6 @@ p-queue@^6.6.2:
eventemitter3 "^4.0.4" eventemitter3 "^4.0.4"
p-timeout "^3.2.0" p-timeout "^3.2.0"
p-queue@^8.1.0:
version "8.1.0"
resolved "https://registry.npmmirror.com/p-queue/-/p-queue-8.1.0.tgz#d71929249868b10b16f885d8a82beeaf35d32279"
integrity sha512-mxLDbbGIBEXTJL0zEx8JIylaj3xQ7Z/7eEVjcF9fJX4DBiH9oqe+oahYnlKKxm0Ci9TlWTyhSHgygxMxjIB2jw==
dependencies:
eventemitter3 "^5.0.1"
p-timeout "^6.1.2"
p-reduce@^2.0.0, p-reduce@^2.1.0: p-reduce@^2.0.0, p-reduce@^2.1.0:
version "2.1.0" version "2.1.0"
resolved "https://registry.npmmirror.com/p-reduce/-/p-reduce-2.1.0.tgz#09408da49507c6c274faa31f28df334bc712b64a" resolved "https://registry.npmmirror.com/p-reduce/-/p-reduce-2.1.0.tgz#09408da49507c6c274faa31f28df334bc712b64a"
@ -29416,11 +29408,6 @@ p-timeout@^3.2.0:
dependencies: dependencies:
p-finally "^1.0.0" p-finally "^1.0.0"
p-timeout@^6.1.2:
version "6.1.4"
resolved "https://registry.npmmirror.com/p-timeout/-/p-timeout-6.1.4.tgz#418e1f4dd833fa96a2e3f532547dd2abdb08dbc2"
integrity sha512-MyIV3ZA/PmyBN/ud8vV9XzwTrNtR4jFrObymZYnZqMmW0zA8Z17vnT0rBgFE/TlohB+YCHqXMgZzb3Csp49vqg==
p-try@^1.0.0: p-try@^1.0.0:
version "1.0.0" version "1.0.0"
resolved "https://registry.npmmirror.com/p-try/-/p-try-1.0.0.tgz#cbc79cdbaf8fd4228e13f621f2b1a237c1b207b3" resolved "https://registry.npmmirror.com/p-try/-/p-try-1.0.0.tgz#cbc79cdbaf8fd4228e13f621f2b1a237c1b207b3"