Simplify TaskWithTimeout

Co-authored-by: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com>
Co-authored-by: trevor-signal <131492920+trevor-signal@users.noreply.github.com>
This commit is contained in:
automated-signal
2026-03-24 11:31:03 -05:00
committed by GitHub
parent 09126429c0
commit 1a7112ec74
12 changed files with 164 additions and 212 deletions

View File

@@ -2,127 +2,116 @@
// SPDX-License-Identifier: AGPL-3.0-only
import { MINUTE } from '../util/durations/index.std.js';
import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary.std.js';
import { explodePromise } from '../util/explodePromise.std.js';
import { toLogFormat } from '../types/errors.std.js';
import { missingCaseError } from '../util/missingCaseError.std.js';
import { createLogger } from '../logging/log.std.js';
const TICK_INTERVAL = MINUTE / 2;
const log = createLogger('TaskWithTimeout');
type TaskType = {
id: string;
startedAt: number | undefined;
suspend(): void;
resume(): void;
ticks: number;
maxTicks: number;
reject: (error: Error) => void;
};
const tasks = new Set<TaskType>();
let shouldStartTimers = true;
let shouldStartTicking = true;
let tickInterval: NodeJS.Timeout | undefined;
function maybeStartTicking(): void {
if (!shouldStartTicking) {
return;
}
// Already ticking
if (tickInterval != null) {
return;
}
log.info('starting tick');
tickInterval = setInterval(() => {
for (const task of tasks) {
task.ticks += 1;
if (task.ticks < task.maxTicks) {
continue;
}
tasks.delete(task);
task.reject(new Error(`TaskWithTimeout(${task.id}) timed out`));
}
}, TICK_INTERVAL);
}
export function suspendTasksWithTimeout(): void {
if (!shouldStartTicking) {
return;
}
log.info(`suspending ${tasks.size} tasks`);
shouldStartTimers = false;
for (const task of tasks) {
task.suspend();
shouldStartTicking = false;
if (tickInterval != null) {
log.info('stopping tick');
clearInterval(tickInterval);
tickInterval = undefined;
}
}
export function resumeTasksWithTimeout(): void {
log.info(`resuming ${tasks.size} tasks`);
shouldStartTimers = true;
for (const task of tasks) {
task.resume();
if (shouldStartTicking) {
return;
}
log.info(`resuming ${tasks.size} tasks`);
shouldStartTicking = true;
maybeStartTicking();
}
export function reportLongRunningTasks(): void {
const now = Date.now();
for (const task of tasks) {
if (task.startedAt === undefined) {
continue;
}
const duration = Math.max(0, now - task.startedAt);
const duration = task.ticks * TICK_INTERVAL;
if (duration > MINUTE) {
log.warn(`${task.id} has been running for ${duration}ms`);
log.warn(`${task.id} has been running for ~${duration}ms`);
}
}
}
export default function createTaskWithTimeout<T, Args extends Array<unknown>>(
task: (...args: Args) => Promise<T>,
export async function runTaskWithTimeout<T>(
task: () => Promise<T>,
id: string,
options: { timeout?: number } = {}
): (...args: Args) => Promise<T> {
const timeout = options.timeout || 30 * MINUTE;
taskType: 'long-running' | 'short-lived' = 'long-running'
): Promise<T> {
let maxTicks: number;
const timeoutError = new Error(`${id || ''} task did not complete in time.`);
if (taskType === 'long-running') {
maxTicks = (30 * MINUTE) / TICK_INTERVAL;
} else if (taskType === 'short-lived') {
maxTicks = (2 * MINUTE) / TICK_INTERVAL;
} else {
throw missingCaseError(taskType);
}
return async (...args: Args) => {
let complete = false;
const { promise: timerPromise, reject } = explodePromise<never>();
let timer: NodeJS.Timeout | undefined;
const { promise: timerPromise, reject } = explodePromise<never>();
const startTimer = () => {
stopTimer();
if (complete) {
return;
}
entry.startedAt = Date.now();
timer = setTimeout(() => {
if (complete) {
log.warn(`${id} task timed out, but was already complete`);
return;
}
complete = true;
tasks.delete(entry);
log.error(toLogFormat(timeoutError));
reject(timeoutError);
}, timeout);
};
const stopTimer = () => {
clearTimeoutIfNecessary(timer);
timer = undefined;
};
const entry: TaskType = {
id,
startedAt: undefined,
suspend: () => {
log.warn(`${id} task suspended`);
stopTimer();
},
resume: () => {
log.warn(`${id} task resumed`);
startTimer();
},
};
tasks.add(entry);
if (shouldStartTimers) {
startTimer();
}
let result: unknown;
const run = async (): Promise<void> => {
result = await task(...args);
};
try {
await Promise.race([run(), timerPromise]);
return result as T;
} finally {
complete = true;
tasks.delete(entry);
stopTimer();
}
const entry: TaskType = {
id,
ticks: 0,
maxTicks,
reject,
};
tasks.add(entry);
maybeStartTicking();
try {
return await Promise.race([task(), timerPromise]);
} finally {
tasks.delete(entry);
if (tasks.size === 0 && tickInterval != null) {
log.info('stopping tick');
clearInterval(tickInterval);
tickInterval = undefined;
}
}
}