diff options
Diffstat (limited to 'lib/compilation-queue.ts')
-rw-r--r-- | lib/compilation-queue.ts | 62 |
1 files changed, 47 insertions, 15 deletions
diff --git a/lib/compilation-queue.ts b/lib/compilation-queue.ts index 0356dec37..d9a903557 100644 --- a/lib/compilation-queue.ts +++ b/lib/compilation-queue.ts @@ -41,43 +41,75 @@ const queueCompleted = new PromClient.Counter({ name: 'ce_compilation_queue_completed_total', help: 'Total number of jobs completed', }); +const queueStale = new PromClient.Counter({ + name: 'ce_compilation_queue_stale_total', + help: 'Total number of jobs abandoned before starting as they were stale', +}); export type Job<TaskResultType> = () => PromiseLike<TaskResultType>; +export type EnqueueOptions = { + abandonIfStale?: boolean; + highPriority?: boolean; +}; + export class CompilationQueue { private readonly _running: Set<number> = new Set(); private readonly _queue: Queue; + private readonly _staleAfterMs: number; - constructor(concurrency: number, timeout: number) { + constructor(concurrency: number, timeout: number, staleAfterMs: number) { this._queue = new Queue({ concurrency, timeout, throwOnTimeout: true, }); + this._staleAfterMs = staleAfterMs; } static fromProps(ceProps) { - return new CompilationQueue(ceProps('maxConcurrentCompiles', 1), ceProps('compilationEnvTimeoutMs')); + return new CompilationQueue( + ceProps('maxConcurrentCompiles', 1), + ceProps('compilationEnvTimeoutMs'), + ceProps('compilationStaleAfterMs', 60_000), + ); } - enqueue<Result>(job: Job<Result>): PromiseLike<Result> { + enqueue<Result>(job: Job<Result>, options?: EnqueueOptions): PromiseLike<Result> { const enqueueAsyncId = executionAsyncId(); + const enqueuedAt = Date.now(); + // If we're asked to enqueue a job when we're already in a async queued job context, just run it. // This prevents a deadlock. if (this._running.has(enqueueAsyncId)) return job(); queueEnqueued.inc(); - return this._queue.add(() => { - queueDequeued.inc(); - const jobAsyncId = executionAsyncId(); - if (this._running.has(jobAsyncId)) throw new Error('somehow we entered the context twice'); - try { - this._running.add(jobAsyncId); - return job(); - } finally { - this._running.delete(jobAsyncId); - queueCompleted.inc(); - } - }) as PromiseLike<Result>; // TODO(supergrecko): investigate why this assert is needed + return this._queue.add( + () => { + const dequeuedAt = Date.now(); + queueDequeued.inc(); + if (options && options.abandonIfStale && dequeuedAt > enqueuedAt + this._staleAfterMs) { + queueCompleted.inc(); + queueStale.inc(); + const queueTimeSecs = (dequeuedAt - enqueuedAt) / 1000; + const limitSecs = this._staleAfterMs / 1000; + throw new Error( + `Compilation was in the queue too long (${queueTimeSecs.toFixed(1)}s > ${limitSecs.toFixed( + 1, + )}s)`, + ); + } + const jobAsyncId = executionAsyncId(); + if (this._running.has(jobAsyncId)) throw new Error('somehow we entered the context twice'); + try { + this._running.add(jobAsyncId); + return job(); + } finally { + this._running.delete(jobAsyncId); + queueCompleted.inc(); + } + }, + {priority: options?.highPriority ? 100 : 0}, + ) as PromiseLike<Result>; // TODO(supergrecko): investigate why this assert is needed } status(): {busy: boolean; pending: number; size: number} { |