import { JobStatus } from '@/shared/constants'; import EventEmitter from 'node:events'; import z, { ZodTypeAny } from 'zod'; export class TaskQueue { private activeQueue: { context: JobContext, promise?: Promise; }[] = []; private queue?: { context: JobContext, promise?: Promise; }[] = []; private events?: EventEmitter = new EventEmitter(); public enqueue> (id: string, job: T) { this.disposeSafeguard(); if (!this.queue || !this.events) throw new Error("Queue disposed"); const context = new JobContext(id, this.events, job); this.queue.push({ context }); this.events?.emit('queued', { id: context.id, job: context }); return this.processQueue(); } private processQueue () { if (!this.queue) return Promise.resolve(); const next = this.queue.filter(j => !j.context.job.group || !this.activeQueue.some(a => a.context.job.group === j.context.job.group)).map((job, i) => ({ i, job })); next.reverse().forEach(({ i }) => this.queue!.splice(i, 1)); next.forEach(job => { const promise = job.job.context.start(); job.job.promise = promise; this.activeQueue.push(job.job); promise.finally(() => { const index = this.activeQueue.indexOf(job.job); this.activeQueue.splice(index, 1); setTimeout(() => this.processQueue(), 0); }); }); } private disposeSafeguard () { if (!this.queue) throw new Error("Queue disposed"); } public hasActive () { return this.activeQueue.length > 0; } public hasActiveOfType (type: any) { for (const entry of this.activeQueue) { if (entry.context.job instanceof type) { return true; } } return false; } public waitForJob (id: string): Promise { const job = this.queue?.find(j => j.context.id === id) ?? this.activeQueue?.find(j => j.context.id === id); return job?.promise ?? Promise.resolve(); } public findJob> (id: string, type: new (...args: any[]) => T): IPublicJob | undefined { const job = this.queue?.find(j => j.context.id === id) ?? this.activeQueue?.find(j => j.context.id === id); if (job?.context.job instanceof type) { return job?.context; } return undefined; } public on (event: E, listener: E extends keyof EventsList ? EventsList[E] extends unknown[] ? (...args: EventsList[E]) => void : never : never): () => void { this.events?.on(event, listener); return () => this.events?.removeListener(event, listener); } public once (event: E, listener: E extends keyof EventsList ? EventsList[E] extends unknown[] ? (...args: EventsList[E]) => void : never : never) { this.events?.once(event, listener); } public async close () { this.queue = []; this.activeQueue.forEach(c => c.context.abort()); return Promise.all(this.activeQueue.map(c => c.promise)); } } export interface EventsList { started: [e: BaseEvent]; progress: [e: ProgressEvent]; abort: [e: AbortEvent]; /** Called when the job successfully completes */ completed: [e: CompletedEvent]; error: [e: ErrorEvent]; ended: [e: BaseEvent]; queued: [e: BaseEvent]; } interface BaseEvent { id: string; job: IPublicJob; } interface ErrorEvent extends BaseEvent { error: unknown; } interface AbortEvent extends BaseEvent { reason?: any; } interface ProgressEvent extends BaseEvent { progress: number; state?: string; } interface CompletedEvent extends BaseEvent { } export interface IJob { group?: string; start (context: JobContext, TData, TState>): Promise; exposeData?(): TData; } export interface IPublicJob> { progress: number; state?: string; status: JobStatus; job: T; abort: (reason?: any) => void; } type JobClass = new (...args: any[]) => IJob; type JobClassWithStatics = JobClass & { id: string; dataSchema?: any; }; export type JobContextFromClass = JobContext< InstanceType, C extends { dataSchema: ZodTypeAny; } ? z.infer : never, C['id'] >; export class JobContext, TData, TState extends string> implements IPublicJob { private m_id: string; private m_progress: number = 0; private m_state?: TState; private running: boolean = false; private aborted: boolean = false; private completed: boolean = false; private error?: any; private events: EventEmitter; private abortController: AbortController; private readonly m_job: T; constructor(id: string, events: EventEmitter, job: T) { this.m_id = id; this.m_job = job; this.abortController = new AbortController(); this.abortController.signal.addEventListener('abort', () => { this.aborted = true; this.events.emit('abort', { id: this.m_id, reason: this.abortController.signal.reason, job: this } satisfies AbortEvent); }); this.events = events; } public async start (): Promise { try { this.events.emit('started', { id: this.m_id, job: this }); await this.m_job.start(this); this.completed = true; this.events.emit('completed', { id: this.m_id, job: this }); } catch (error) { if (error !== 'cancel') { console.error(error); } this.events.emit('error', { id: this.m_id, job: this, error }); this.error = error; } finally { this.running = false; this.events.emit('ended', { id: this.m_id, job: this }); } } public get status (): JobStatus { if (this.completed) return 'completed'; if (this.error) return 'error'; if (this.aborted) return 'aborted'; if (this.running) return 'running'; return 'queued'; } public get id () { return this.m_id; } public get job () { return this.m_job; } public get abortSignal () { return this.abortController.signal; } public get progress () { return this.m_progress; } public get state () { return this.m_state; } /** * @param progress The 0 to 100 progress * @param state what type of progress is this. Is it really progress. I humanity even advancing. */ public setProgress (progress: number, state?: TState) { this.m_progress = progress; if (state) this.m_state = state; this.events.emit('progress', { id: this.m_id, progress, state: state ?? this.m_state, job: this }); } public abort (reason?: any) { this.error = reason; this.abortController.abort(reason); } }