aboutsummaryrefslogtreecommitdiff
path: root/lib/compilation-queue.ts
diff options
context:
space:
mode:
authorMatt Godbolt <matt@godbolt.org>2023-08-07 22:17:12 -0500
committerGitHub <noreply@github.com>2023-08-07 22:17:12 -0500
commit6c476bbccc58c4eb01bcf08838a093dcadbf2a87 (patch)
treef19bb949c7cd02b2d595e5b2f4b2ea6f243d63ed /lib/compilation-queue.ts
parentc5300fad6386bcfdc94e0ed4b2d862ce21439737 (diff)
downloadcompiler-explorer-6c476bbccc58c4eb01bcf08838a093dcadbf2a87.tar.gz
compiler-explorer-6c476bbccc58c4eb01bcf08838a093dcadbf2a87.zip
Support abandoned queued compilations (#5278)gh-8398
After some time we know either the client or CloudFront will give up on pending compilations. As such, if we continue to process compilations after the client's timed out we're just clogging up the compilation queue with pointless work. As such, this change now supports the notion of "stale" work which will be abandoned once it's made it to the front of the queue. Only compiles coming from the user will be abandoned, so discovery and health checks are unaffected. Hopefully this will mitigate the number of nodes marked unhealthy due to being overloaded: work they were doing was "pointless" anyway, and them being killed by going unhealthy is equivalent to abandoning all the work in flight anyway, but this means there's a fighting chance the node will recover quickly enough to return a "healthy" status.
Diffstat (limited to 'lib/compilation-queue.ts')
-rw-r--r--lib/compilation-queue.ts62
1 files changed, 47 insertions, 15 deletions
diff --git a/lib/compilation-queue.ts b/lib/compilation-queue.ts
index 0356dec37..d9a903557 100644
--- a/lib/compilation-queue.ts
+++ b/lib/compilation-queue.ts
@@ -41,43 +41,75 @@ const queueCompleted = new PromClient.Counter({
name: 'ce_compilation_queue_completed_total',
help: 'Total number of jobs completed',
});
+const queueStale = new PromClient.Counter({
+ name: 'ce_compilation_queue_stale_total',
+ help: 'Total number of jobs abandoned before starting as they were stale',
+});
export type Job<TaskResultType> = () => PromiseLike<TaskResultType>;
+export type EnqueueOptions = {
+ abandonIfStale?: boolean;
+ highPriority?: boolean;
+};
+
export class CompilationQueue {
private readonly _running: Set<number> = new Set();
private readonly _queue: Queue;
+ private readonly _staleAfterMs: number;
- constructor(concurrency: number, timeout: number) {
+ constructor(concurrency: number, timeout: number, staleAfterMs: number) {
this._queue = new Queue({
concurrency,
timeout,
throwOnTimeout: true,
});
+ this._staleAfterMs = staleAfterMs;
}
static fromProps(ceProps) {
- return new CompilationQueue(ceProps('maxConcurrentCompiles', 1), ceProps('compilationEnvTimeoutMs'));
+ return new CompilationQueue(
+ ceProps('maxConcurrentCompiles', 1),
+ ceProps('compilationEnvTimeoutMs'),
+ ceProps('compilationStaleAfterMs', 60_000),
+ );
}
- enqueue<Result>(job: Job<Result>): PromiseLike<Result> {
+ enqueue<Result>(job: Job<Result>, options?: EnqueueOptions): PromiseLike<Result> {
const enqueueAsyncId = executionAsyncId();
+ const enqueuedAt = Date.now();
+
// If we're asked to enqueue a job when we're already in a async queued job context, just run it.
// This prevents a deadlock.
if (this._running.has(enqueueAsyncId)) return job();
queueEnqueued.inc();
- return this._queue.add(() => {
- queueDequeued.inc();
- const jobAsyncId = executionAsyncId();
- if (this._running.has(jobAsyncId)) throw new Error('somehow we entered the context twice');
- try {
- this._running.add(jobAsyncId);
- return job();
- } finally {
- this._running.delete(jobAsyncId);
- queueCompleted.inc();
- }
- }) as PromiseLike<Result>; // TODO(supergrecko): investigate why this assert is needed
+ return this._queue.add(
+ () => {
+ const dequeuedAt = Date.now();
+ queueDequeued.inc();
+ if (options && options.abandonIfStale && dequeuedAt > enqueuedAt + this._staleAfterMs) {
+ queueCompleted.inc();
+ queueStale.inc();
+ const queueTimeSecs = (dequeuedAt - enqueuedAt) / 1000;
+ const limitSecs = this._staleAfterMs / 1000;
+ throw new Error(
+ `Compilation was in the queue too long (${queueTimeSecs.toFixed(1)}s > ${limitSecs.toFixed(
+ 1,
+ )}s)`,
+ );
+ }
+ const jobAsyncId = executionAsyncId();
+ if (this._running.has(jobAsyncId)) throw new Error('somehow we entered the context twice');
+ try {
+ this._running.add(jobAsyncId);
+ return job();
+ } finally {
+ this._running.delete(jobAsyncId);
+ queueCompleted.inc();
+ }
+ },
+ {priority: options?.highPriority ? 100 : 0},
+ ) as PromiseLike<Result>; // TODO(supergrecko): investigate why this assert is needed
}
status(): {busy: boolean; pending: number; size: number} {