mirror of
https://gitee.com/nocobase/nocobase.git
synced 2025-05-05 05:29:26 +08:00
Compare commits
2 Commits
7e80f44195
...
478d836404
Author | SHA1 | Date | |
---|---|---|---|
|
478d836404 | ||
|
284ccd5bc1 |
@ -12,7 +12,7 @@
|
|||||||
"@nocobase/server": "1.x",
|
"@nocobase/server": "1.x",
|
||||||
"@nocobase/test": "1.x"
|
"@nocobase/test": "1.x"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"dependencies": {
|
||||||
"p-queue": "^8.1.0"
|
"p-queue": "^6.6.2"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -66,14 +66,17 @@ export class BaseTaskManager extends EventEmitter implements AsyncTasksManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private enqueueTask(task: ITask): void {
|
private enqueueTask(task: ITask): void {
|
||||||
this.queue.add(async () => {
|
const taskHandler = async () => {
|
||||||
try {
|
if (task.status.type === 'pending') {
|
||||||
this.logger.debug(`Starting execution of task ${task.taskId} from queue`);
|
try {
|
||||||
await task.run();
|
this.logger.debug(`Starting execution of task ${task.taskId} from queue`);
|
||||||
} catch (error) {
|
await task.run();
|
||||||
this.logger.error(`Error executing task ${task.taskId} from queue: ${error.message}`);
|
} catch (error) {
|
||||||
|
this.logger.error(`Error executing task ${task.taskId} from queue: ${error.message}`);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
};
|
||||||
|
this.queue.add(taskHandler);
|
||||||
}
|
}
|
||||||
|
|
||||||
pauseQueue(): void {
|
pauseQueue(): void {
|
||||||
@ -95,8 +98,11 @@ export class BaseTaskManager extends EventEmitter implements AsyncTasksManager {
|
|||||||
this.logger.warn(`Attempted to cancel non-existent task ${taskId}`);
|
this.logger.warn(`Attempted to cancel non-existent task ${taskId}`);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.info(`Cancelling task ${taskId}, type: ${task.constructor.name}, tags: ${JSON.stringify(task.tags)}`);
|
this.logger.info(`Cancelling task ${taskId}, type: ${task.constructor.name}, tags: ${JSON.stringify(task.tags)}`);
|
||||||
|
if (task.status.type === 'pending') {
|
||||||
|
await task.statusChange({ type: 'cancelled' });
|
||||||
|
return true;
|
||||||
|
}
|
||||||
return task.cancel();
|
return task.cancel();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,3 +1,12 @@
|
|||||||
|
/**
|
||||||
|
* This file is part of the NocoBase (R) project.
|
||||||
|
* Copyright (c) 2020-2024 NocoBase Co., Ltd.
|
||||||
|
* Authors: NocoBase Team.
|
||||||
|
*
|
||||||
|
* This project is dual-licensed under AGPL-3.0 and NocoBase Commercial License.
|
||||||
|
* For more information, please refer to: https://www.nocobase.com/agreement.
|
||||||
|
*/
|
||||||
|
|
||||||
import { Logger } from '@nocobase/logger';
|
import { Logger } from '@nocobase/logger';
|
||||||
import { TaskStatus } from './async-task-manager';
|
import { TaskStatus } from './async-task-manager';
|
||||||
import { EventEmitter } from 'events';
|
import { EventEmitter } from 'events';
|
||||||
@ -22,6 +31,7 @@ export interface ITask extends EventEmitter {
|
|||||||
setContext(context: any): void;
|
setContext(context: any): void;
|
||||||
|
|
||||||
cancel(): Promise<boolean>;
|
cancel(): Promise<boolean>;
|
||||||
|
statusChange: (status: TaskStatus) => Promise<void>;
|
||||||
execute(): Promise<any>;
|
execute(): Promise<any>;
|
||||||
reportProgress(progress: { total: number; current: number }): void;
|
reportProgress(progress: { total: number; current: number }): void;
|
||||||
run(): Promise<void>;
|
run(): Promise<void>;
|
||||||
|
@ -85,6 +85,11 @@ export abstract class TaskType extends EventEmitter implements ITask {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async statusChange(status: TaskStatus) {
|
||||||
|
this.status = status;
|
||||||
|
this.emit('statusChange', this.status);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Execute the task implementation
|
* Execute the task implementation
|
||||||
* @returns Promise that resolves with the task result
|
* @returns Promise that resolves with the task result
|
||||||
@ -143,9 +148,7 @@ export abstract class TaskType extends EventEmitter implements ITask {
|
|||||||
this.emit('statusChange', this.status);
|
this.emit('statusChange', this.status);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
if (error instanceof CancelError) {
|
if (error instanceof CancelError) {
|
||||||
this.status = {
|
this.statusChange({ type: 'cancelled' });
|
||||||
type: 'cancelled',
|
|
||||||
};
|
|
||||||
this.logger?.info(`Task ${this.taskId} was cancelled during execution`);
|
this.logger?.info(`Task ${this.taskId} was cancelled during execution`);
|
||||||
} else {
|
} else {
|
||||||
this.status = {
|
this.status = {
|
||||||
|
13
yarn.lock
13
yarn.lock
@ -29388,14 +29388,6 @@ p-queue@^6.6.2:
|
|||||||
eventemitter3 "^4.0.4"
|
eventemitter3 "^4.0.4"
|
||||||
p-timeout "^3.2.0"
|
p-timeout "^3.2.0"
|
||||||
|
|
||||||
p-queue@^8.1.0:
|
|
||||||
version "8.1.0"
|
|
||||||
resolved "https://registry.npmmirror.com/p-queue/-/p-queue-8.1.0.tgz#d71929249868b10b16f885d8a82beeaf35d32279"
|
|
||||||
integrity sha512-mxLDbbGIBEXTJL0zEx8JIylaj3xQ7Z/7eEVjcF9fJX4DBiH9oqe+oahYnlKKxm0Ci9TlWTyhSHgygxMxjIB2jw==
|
|
||||||
dependencies:
|
|
||||||
eventemitter3 "^5.0.1"
|
|
||||||
p-timeout "^6.1.2"
|
|
||||||
|
|
||||||
p-reduce@^2.0.0, p-reduce@^2.1.0:
|
p-reduce@^2.0.0, p-reduce@^2.1.0:
|
||||||
version "2.1.0"
|
version "2.1.0"
|
||||||
resolved "https://registry.npmmirror.com/p-reduce/-/p-reduce-2.1.0.tgz#09408da49507c6c274faa31f28df334bc712b64a"
|
resolved "https://registry.npmmirror.com/p-reduce/-/p-reduce-2.1.0.tgz#09408da49507c6c274faa31f28df334bc712b64a"
|
||||||
@ -29416,11 +29408,6 @@ p-timeout@^3.2.0:
|
|||||||
dependencies:
|
dependencies:
|
||||||
p-finally "^1.0.0"
|
p-finally "^1.0.0"
|
||||||
|
|
||||||
p-timeout@^6.1.2:
|
|
||||||
version "6.1.4"
|
|
||||||
resolved "https://registry.npmmirror.com/p-timeout/-/p-timeout-6.1.4.tgz#418e1f4dd833fa96a2e3f532547dd2abdb08dbc2"
|
|
||||||
integrity sha512-MyIV3ZA/PmyBN/ud8vV9XzwTrNtR4jFrObymZYnZqMmW0zA8Z17vnT0rBgFE/TlohB+YCHqXMgZzb3Csp49vqg==
|
|
||||||
|
|
||||||
p-try@^1.0.0:
|
p-try@^1.0.0:
|
||||||
version "1.0.0"
|
version "1.0.0"
|
||||||
resolved "https://registry.npmmirror.com/p-try/-/p-try-1.0.0.tgz#cbc79cdbaf8fd4228e13f621f2b1a237c1b207b3"
|
resolved "https://registry.npmmirror.com/p-try/-/p-try-1.0.0.tgz#cbc79cdbaf8fd4228e13f621f2b1a237c1b207b3"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user