feat(server): add env based serving check for different workers

This commit is contained in:
mytharcher 2025-06-27 00:02:32 +08:00
parent 5a49e9d4e8
commit 5478723c8d
3 changed files with 81 additions and 23 deletions

View File

@ -35,6 +35,14 @@ LOGGER_FORMAT=
# The cluster mode will not work in development mode either.
CLUSTER_MODE=
# Set server running mode, which determines how the server processes requests and background jobs.
# - `<EMPTY>`: run as a standalone server, process all requests and background jobs.
# - `!`: run as a master server, only process user requests from UI (http), but not background jobs.
# - `*`: run as a worker server, only process background jobs.
# - `<topic1>,<topic2>`: run as a worker server, only process background jobs for the specified topics.
# - `!,<topic2>`: run as mixed mode, process user requests from UI (http) and background jobs for the specified topics.
WORKER_MODE=
################# DATABASE #################
# postgres | mysql | mariadb | sqlite

View File

@ -454,6 +454,32 @@ export class Application<StateT = DefaultState, ContextT = DefaultContext> exten
return this._aesEncryptor;
}
/**
* Check if the application is serving as a specific worker.
* @experimental
*/
public serving(key?: string): boolean {
const { WORKER_MODE = '' } = process.env;
if (!WORKER_MODE) {
return true;
}
const topics = WORKER_MODE.trim().split(',');
if (key) {
if (WORKER_MODE === '*') {
return true;
}
if (topics.includes(key)) {
return true;
}
return false;
} else {
if (topics.includes('!')) {
return true;
}
return false;
}
}
/**
* @internal
*/

View File

@ -56,6 +56,8 @@ export type EventOptions = {
type CachedEvent = [WorkflowModel, any, EventOptions];
const WORKER_MODE_WORKFLOW_PROCESS = 'workflow:process';
export default class PluginWorkflowServer extends Plugin {
instructions: Registry<InstructionInterface> = new Registry();
triggers: Registry<Trigger> = new Registry();
@ -64,7 +66,7 @@ export default class PluginWorkflowServer extends Plugin {
snowflake: Snowflake;
private ready = false;
private executing: Promise<void> | null = null;
private executing: Promise<any> | null = null;
private pending: Pending[] = [];
private events: CachedEvent[] = [];
private eventsCount = 0;
@ -79,13 +81,15 @@ export default class PluginWorkflowServer extends Plugin {
filterByTk: event.executionId,
});
if (!execution || execution.status !== EXECUTION_STATUS.QUEUEING) {
this.getLogger('dispatcher').info(
`execution (${event.executionId}) from queue not found or not in queueing status, skip`,
);
return;
}
this.getLogger(execution.workflowId).info(
`execution (${execution.id}) received from queue, adding to pending list`,
);
this.pending.push([execution]);
this.dispatch();
this.run(execution);
};
private onBeforeSave = async (instance: WorkflowModel, { transaction, cycling }) => {
@ -345,10 +349,12 @@ export default class PluginWorkflowServer extends Plugin {
custom_epoch: pluginRecord?.createdAt.getTime(),
});
this.app.backgroundJobManager.subscribe(`${this.name}.pendingExecution`, {
idle: () => !this.executing && !this.pending.length && !this.events.length,
process: this.onQueueExecution,
});
if (this.app.serving(WORKER_MODE_WORKFLOW_PROCESS)) {
this.app.backgroundJobManager.subscribe(`${this.name}.pendingExecution`, {
idle: () => !this.executing && !this.pending.length && !this.events.length,
process: this.onQueueExecution,
});
}
}
/**
@ -522,20 +528,32 @@ export default class PluginWorkflowServer extends Plugin {
return null;
}
public async resume(job) {
if (!job.execution) {
job.execution = await job.getExecution();
}
this.getLogger(job.execution.workflowId).info(
`execution (${job.execution.id}) resuming from job (${job.id}) added to pending list`,
);
this.pending.push([job.execution, job]);
if (this.executing) {
async run(execution: ExecutionModel, job?: JobModel): Promise<void> {
while (this.executing) {
await this.executing;
}
this.executing = this.process(execution, job);
await this.executing;
this.executing = null;
this.dispatch();
}
public async resume(job) {
let { execution } = job;
if (!execution) {
execution = await job.getExecution();
}
this.getLogger(execution.workflowId).info(
`execution (${execution.id}) resuming from job (${job.id}) added to pending list`,
);
this.run(execution, job);
}
/**
* Start a deferred execution
* @experimental
@ -545,11 +563,8 @@ export default class PluginWorkflowServer extends Plugin {
return;
}
this.getLogger(execution.workflowId).info(`starting deferred execution (${execution.id})`);
this.pending.push([execution]);
if (this.executing) {
await this.executing;
}
this.dispatch();
this.run(execution);
}
private async validateEvent(workflow: WorkflowModel, context: any, options: EventOptions) {
@ -692,6 +707,13 @@ export default class PluginWorkflowServer extends Plugin {
return;
}
if (!this.app.serving(WORKER_MODE_WORKFLOW_PROCESS)) {
this.getLogger('dispatcher').warn(
`${WORKER_MODE_WORKFLOW_PROCESS} is not serving, new dispatching will be ignored`,
);
return;
}
if (this.executing) {
this.getLogger('dispatcher').warn(`workflow executing is not finished, new dispatching will be ignored`);
return;
@ -717,7 +739,9 @@ export default class PluginWorkflowServer extends Plugin {
async (transaction) => {
const execution = (await this.db.getRepository('executions').findOne({
filter: {
status: EXECUTION_STATUS.QUEUEING,
status: {
[Op.is]: EXECUTION_STATUS.QUEUEING,
},
'workflow.enabled': true,
},
sort: 'id',
@ -763,7 +787,7 @@ export default class PluginWorkflowServer extends Plugin {
if (execution.status === EXECUTION_STATUS.QUEUEING) {
const transaction = await this.useDataSourceTransaction('main', options.transaction);
await execution.update({ status: EXECUTION_STATUS.STARTED }, { transaction });
logger.info(`queueing execution (${execution.id}) from pending list updated to started`);
logger.info(`execution (${execution.id}) from pending list updated to started`);
}
const processor = this.createProcessor(execution, options);