aboutsummaryrefslogtreecommitdiff
path: root/lib/compilation-queue.ts
diff options
context:
space:
mode:
Diffstat (limited to 'lib/compilation-queue.ts')
-rw-r--r--lib/compilation-queue.ts62
1 files changed, 47 insertions, 15 deletions
diff --git a/lib/compilation-queue.ts b/lib/compilation-queue.ts
index 0356dec37..d9a903557 100644
--- a/lib/compilation-queue.ts
+++ b/lib/compilation-queue.ts
@@ -41,43 +41,75 @@ const queueCompleted = new PromClient.Counter({
name: 'ce_compilation_queue_completed_total',
help: 'Total number of jobs completed',
});
+const queueStale = new PromClient.Counter({
+ name: 'ce_compilation_queue_stale_total',
+ help: 'Total number of jobs abandoned before starting as they were stale',
+});
export type Job<TaskResultType> = () => PromiseLike<TaskResultType>;
+export type EnqueueOptions = {
+ abandonIfStale?: boolean;
+ highPriority?: boolean;
+};
+
export class CompilationQueue {
private readonly _running: Set<number> = new Set();
private readonly _queue: Queue;
+ private readonly _staleAfterMs: number;
- constructor(concurrency: number, timeout: number) {
+ constructor(concurrency: number, timeout: number, staleAfterMs: number) {
this._queue = new Queue({
concurrency,
timeout,
throwOnTimeout: true,
});
+ this._staleAfterMs = staleAfterMs;
}
static fromProps(ceProps) {
- return new CompilationQueue(ceProps('maxConcurrentCompiles', 1), ceProps('compilationEnvTimeoutMs'));
+ return new CompilationQueue(
+ ceProps('maxConcurrentCompiles', 1),
+ ceProps('compilationEnvTimeoutMs'),
+ ceProps('compilationStaleAfterMs', 60_000),
+ );
}
- enqueue<Result>(job: Job<Result>): PromiseLike<Result> {
+ enqueue<Result>(job: Job<Result>, options?: EnqueueOptions): PromiseLike<Result> {
const enqueueAsyncId = executionAsyncId();
+ const enqueuedAt = Date.now();
+
// If we're asked to enqueue a job when we're already in a async queued job context, just run it.
// This prevents a deadlock.
if (this._running.has(enqueueAsyncId)) return job();
queueEnqueued.inc();
- return this._queue.add(() => {
- queueDequeued.inc();
- const jobAsyncId = executionAsyncId();
- if (this._running.has(jobAsyncId)) throw new Error('somehow we entered the context twice');
- try {
- this._running.add(jobAsyncId);
- return job();
- } finally {
- this._running.delete(jobAsyncId);
- queueCompleted.inc();
- }
- }) as PromiseLike<Result>; // TODO(supergrecko): investigate why this assert is needed
+ return this._queue.add(
+ () => {
+ const dequeuedAt = Date.now();
+ queueDequeued.inc();
+ if (options && options.abandonIfStale && dequeuedAt > enqueuedAt + this._staleAfterMs) {
+ queueCompleted.inc();
+ queueStale.inc();
+ const queueTimeSecs = (dequeuedAt - enqueuedAt) / 1000;
+ const limitSecs = this._staleAfterMs / 1000;
+ throw new Error(
+ `Compilation was in the queue too long (${queueTimeSecs.toFixed(1)}s > ${limitSecs.toFixed(
+ 1,
+ )}s)`,
+ );
+ }
+ const jobAsyncId = executionAsyncId();
+ if (this._running.has(jobAsyncId)) throw new Error('somehow we entered the context twice');
+ try {
+ this._running.add(jobAsyncId);
+ return job();
+ } finally {
+ this._running.delete(jobAsyncId);
+ queueCompleted.inc();
+ }
+ },
+ {priority: options?.highPriority ? 100 : 0},
+ ) as PromiseLike<Result>; // TODO(supergrecko): investigate why this assert is needed
}
status(): {busy: boolean; pending: number; size: number} {