diff options
author | Matt Godbolt <matt@godbolt.org> | 2023-08-07 22:17:12 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-08-07 22:17:12 -0500 |
commit | 6c476bbccc58c4eb01bcf08838a093dcadbf2a87 (patch) | |
tree | f19bb949c7cd02b2d595e5b2f4b2ea6f243d63ed /lib/compilation-queue.ts | |
parent | c5300fad6386bcfdc94e0ed4b2d862ce21439737 (diff) | |
download | compiler-explorer-6c476bbccc58c4eb01bcf08838a093dcadbf2a87.tar.gz compiler-explorer-6c476bbccc58c4eb01bcf08838a093dcadbf2a87.zip |
Support abandoned queued compilations (#5278)gh-8398
After some time we know either the client or CloudFront will give up on
pending compilations. As such, if we continue to process compilations
after the client's timed out we're just clogging up the compilation
queue with pointless work.
As such, this change now supports the notion of "stale" work which will
be abandoned once it's made it to the front of the queue. Only compiles
coming from the user will be abandoned, so discovery and health checks
are unaffected.
Hopefully this will mitigate the number of nodes marked unhealthy due to
being overloaded: work they were doing was "pointless" anyway, and them
being killed by going unhealthy is equivalent to abandoning all the work
in flight anyway, but this means there's a fighting chance the node will
recover quickly enough to return a "healthy" status.
Diffstat (limited to 'lib/compilation-queue.ts')
-rw-r--r-- | lib/compilation-queue.ts | 62 |
1 files changed, 47 insertions, 15 deletions
diff --git a/lib/compilation-queue.ts b/lib/compilation-queue.ts index 0356dec37..d9a903557 100644 --- a/lib/compilation-queue.ts +++ b/lib/compilation-queue.ts @@ -41,43 +41,75 @@ const queueCompleted = new PromClient.Counter({ name: 'ce_compilation_queue_completed_total', help: 'Total number of jobs completed', }); +const queueStale = new PromClient.Counter({ + name: 'ce_compilation_queue_stale_total', + help: 'Total number of jobs abandoned before starting as they were stale', +}); export type Job<TaskResultType> = () => PromiseLike<TaskResultType>; +export type EnqueueOptions = { + abandonIfStale?: boolean; + highPriority?: boolean; +}; + export class CompilationQueue { private readonly _running: Set<number> = new Set(); private readonly _queue: Queue; + private readonly _staleAfterMs: number; - constructor(concurrency: number, timeout: number) { + constructor(concurrency: number, timeout: number, staleAfterMs: number) { this._queue = new Queue({ concurrency, timeout, throwOnTimeout: true, }); + this._staleAfterMs = staleAfterMs; } static fromProps(ceProps) { - return new CompilationQueue(ceProps('maxConcurrentCompiles', 1), ceProps('compilationEnvTimeoutMs')); + return new CompilationQueue( + ceProps('maxConcurrentCompiles', 1), + ceProps('compilationEnvTimeoutMs'), + ceProps('compilationStaleAfterMs', 60_000), + ); } - enqueue<Result>(job: Job<Result>): PromiseLike<Result> { + enqueue<Result>(job: Job<Result>, options?: EnqueueOptions): PromiseLike<Result> { const enqueueAsyncId = executionAsyncId(); + const enqueuedAt = Date.now(); + // If we're asked to enqueue a job when we're already in a async queued job context, just run it. // This prevents a deadlock. if (this._running.has(enqueueAsyncId)) return job(); queueEnqueued.inc(); - return this._queue.add(() => { - queueDequeued.inc(); - const jobAsyncId = executionAsyncId(); - if (this._running.has(jobAsyncId)) throw new Error('somehow we entered the context twice'); - try { - this._running.add(jobAsyncId); - return job(); - } finally { - this._running.delete(jobAsyncId); - queueCompleted.inc(); - } - }) as PromiseLike<Result>; // TODO(supergrecko): investigate why this assert is needed + return this._queue.add( + () => { + const dequeuedAt = Date.now(); + queueDequeued.inc(); + if (options && options.abandonIfStale && dequeuedAt > enqueuedAt + this._staleAfterMs) { + queueCompleted.inc(); + queueStale.inc(); + const queueTimeSecs = (dequeuedAt - enqueuedAt) / 1000; + const limitSecs = this._staleAfterMs / 1000; + throw new Error( + `Compilation was in the queue too long (${queueTimeSecs.toFixed(1)}s > ${limitSecs.toFixed( + 1, + )}s)`, + ); + } + const jobAsyncId = executionAsyncId(); + if (this._running.has(jobAsyncId)) throw new Error('somehow we entered the context twice'); + try { + this._running.add(jobAsyncId); + return job(); + } finally { + this._running.delete(jobAsyncId); + queueCompleted.inc(); + } + }, + {priority: options?.highPriority ? 100 : 0}, + ) as PromiseLike<Result>; // TODO(supergrecko): investigate why this assert is needed } status(): {busy: boolean; pending: number; size: number} { |