Simplify TaskWithTimeout

Co-authored-by: trevor-signal <131492920+trevor-signal@users.noreply.github.com>
This commit is contained in:
Fedor Indutny
2026-03-24 09:08:36 -07:00
committed by GitHub
parent c050a0c8c2
commit b8cca2c49c
12 changed files with 164 additions and 212 deletions

View File

@@ -15,7 +15,8 @@ import type {
ProcessedDataMessage, ProcessedDataMessage,
} from './textsecure/Types.d.ts'; } from './textsecure/Types.d.ts';
import { HTTPError } from './types/HTTPError.std.js'; import { HTTPError } from './types/HTTPError.std.js';
import createTaskWithTimeout, { import {
runTaskWithTimeout,
suspendTasksWithTimeout, suspendTasksWithTimeout,
resumeTasksWithTimeout, resumeTasksWithTimeout,
reportLongRunningTasks, reportLongRunningTasks,
@@ -603,8 +604,8 @@ export async function startApp(): Promise<void> {
): (event: E) => void { ): (event: E) => void {
return (event: E): void => { return (event: E): void => {
drop( drop(
eventHandlerQueue.add( eventHandlerQueue.add(() =>
createTaskWithTimeout( runTaskWithTimeout(
async () => handler(event), async () => handler(event),
`queuedEventListener(${event.type}, ${event.timeStamp})` `queuedEventListener(${event.type}, ${event.timeStamp})`
) )

View File

@@ -66,7 +66,7 @@ import {
cdsLookup, cdsLookup,
checkAccountExistence, checkAccountExistence,
} from '../textsecure/WebAPI.preload.js'; } from '../textsecure/WebAPI.preload.js';
import createTaskWithTimeout from '../textsecure/TaskWithTimeout.std.js'; import { runTaskWithTimeout } from '../textsecure/TaskWithTimeout.std.js';
import { MessageSender } from '../textsecure/SendMessage.preload.js'; import { MessageSender } from '../textsecure/SendMessage.preload.js';
import type { import type {
CallbackResultType, CallbackResultType,
@@ -3849,8 +3849,6 @@ export class ConversationModel {
this.jobQueue = this.jobQueue || new PQueue({ concurrency: 1 }); this.jobQueue = this.jobQueue || new PQueue({ concurrency: 1 });
const taskWithTimeout = createTaskWithTimeout(callback, logId);
const abortController = new AbortController(); const abortController = new AbortController();
const { signal: abortSignal } = abortController; const { signal: abortSignal } = abortController;
@@ -3864,7 +3862,10 @@ export class ConversationModel {
} }
try { try {
return await taskWithTimeout(abortSignal); return await runTaskWithTimeout(
async () => callback(abortSignal),
logId
);
} catch (error) { } catch (error) {
abortController.abort(); abortController.abort();
throw error; throw error;

View File

@@ -1,8 +1,7 @@
// Copyright 2019 Signal Messenger, LLC // Copyright 2019 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only // SPDX-License-Identifier: AGPL-3.0-only
import createTaskWithTimeout from '../textsecure/TaskWithTimeout.std.js'; import { runTaskWithTimeout } from '../textsecure/TaskWithTimeout.std.js';
import { MINUTE } from '../util/durations/index.std.js';
import { explodePromise } from '../util/explodePromise.std.js'; import { explodePromise } from '../util/explodePromise.std.js';
// Matching Whisper.events.trigger API // Matching Whisper.events.trigger API
@@ -11,16 +10,13 @@ export function trigger(name: string, ...rest: Array<any>): void {
window.Whisper.events.emit(name, ...rest); window.Whisper.events.emit(name, ...rest);
} }
export const waitForEvent = ( export const waitForEvent = (eventName: string): Promise<void> =>
eventName: string, runTaskWithTimeout(
timeout: number = 2 * MINUTE () => {
): Promise<void> =>
createTaskWithTimeout(
(event: string): Promise<void> => {
const { promise, resolve } = explodePromise<void>(); const { promise, resolve } = explodePromise<void>();
window.Whisper.events.once(event, () => resolve()); window.Whisper.events.once(eventName, () => resolve());
return promise; return promise;
}, },
`waitForEvent:${eventName}`, `waitForEvent:${eventName}`,
{ timeout } 'short-lived'
)(eventName); );

View File

@@ -19,7 +19,7 @@ import { assertDev, softAssert } from '../util/assert.std.js';
import { mapObjectWithSpec } from '../util/mapObjectWithSpec.std.js'; import { mapObjectWithSpec } from '../util/mapObjectWithSpec.std.js';
import { maybeDeleteAttachmentFile } from '../util/migrations.preload.js'; import { maybeDeleteAttachmentFile } from '../util/migrations.preload.js';
import { cleanDataForIpc } from './cleanDataForIpc.std.js'; import { cleanDataForIpc } from './cleanDataForIpc.std.js';
import createTaskWithTimeout from '../textsecure/TaskWithTimeout.std.js'; import { runTaskWithTimeout } from '../textsecure/TaskWithTimeout.std.js';
import { isValidUuid, isValidUuidV7 } from '../util/isValidUuid.std.js'; import { isValidUuid, isValidUuidV7 } from '../util/isValidUuid.std.js';
import { formatJobForInsert } from '../jobs/formatJobForInsert.std.js'; import { formatJobForInsert } from '../jobs/formatJobForInsert.std.js';
import { import {
@@ -904,10 +904,10 @@ async function invokeWithTimeout(
name: string, name: string,
...args: Array<unknown> ...args: Array<unknown>
): Promise<void> { ): Promise<void> {
return createTaskWithTimeout( return runTaskWithTimeout(
() => ipc.invoke(name, ...args), () => ipc.invoke(name, ...args),
`callChannel call to ${name}` `callChannel call to ${name}`
)(); );
} }
export function pauseWriteAccess(): Promise<void> { export function pauseWriteAccess(): Promise<void> {

View File

@@ -3,7 +3,7 @@
import { ipcRenderer } from 'electron'; import { ipcRenderer } from 'electron';
import { createLogger } from '../logging/log.std.js'; import { createLogger } from '../logging/log.std.js';
import createTaskWithTimeout from '../textsecure/TaskWithTimeout.std.js'; import { runTaskWithTimeout } from '../textsecure/TaskWithTimeout.std.js';
import { explodePromise } from '../util/explodePromise.std.js'; import { explodePromise } from '../util/explodePromise.std.js';
import { missingCaseError } from '../util/missingCaseError.std.js'; import { missingCaseError } from '../util/missingCaseError.std.js';
@@ -45,7 +45,7 @@ export async function ipcInvoke<T>(
} }
activeJobCount += 1; activeJobCount += 1;
return createTaskWithTimeout(async () => { return runTaskWithTimeout(async () => {
try { try {
const result = await ipcRenderer.invoke(channel, name, ...args); const result = await ipcRenderer.invoke(channel, name, ...args);
if (!result.ok) { if (!result.ok) {
@@ -58,7 +58,7 @@ export async function ipcInvoke<T>(
resolveShutdown?.(); resolveShutdown?.();
} }
} }
}, `SQL channel call (${access}, ${fnName})`)(); }, `SQL channel call (${access}, ${fnName})`);
} }
export async function doShutdown(): Promise<void> { export async function doShutdown(): Promise<void> {

View File

@@ -60,7 +60,7 @@ import { writeProfile } from '../../services/writeProfile.preload.js';
import { keyTransparency } from '../../services/keyTransparency.preload.js'; import { keyTransparency } from '../../services/keyTransparency.preload.js';
import { getConversation } from '../../util/getConversation.preload.js'; import { getConversation } from '../../util/getConversation.preload.js';
import { waitForEvent } from '../../shims/events.dom.js'; import { waitForEvent } from '../../shims/events.dom.js';
import { DAY, MINUTE } from '../../util/durations/index.std.js'; import { DAY } from '../../util/durations/index.std.js';
import { sendSyncRequests } from '../../textsecure/syncRequests.preload.js'; import { sendSyncRequests } from '../../textsecure/syncRequests.preload.js';
import { SmartUpdateDialog } from './UpdateDialog.preload.js'; import { SmartUpdateDialog } from './UpdateDialog.preload.js';
import { Preferences } from '../../components/Preferences.dom.js'; import { Preferences } from '../../components/Preferences.dom.js';
@@ -258,10 +258,7 @@ export function SmartPreferences(): React.JSX.Element | null {
// The weird ones // The weird ones
const makeSyncRequest = async () => { const makeSyncRequest = async () => {
const contactSyncComplete = waitForEvent( const contactSyncComplete = waitForEvent('contactSync:complete');
'contactSync:complete',
5 * MINUTE
);
return Promise.all([sendSyncRequests(), contactSyncComplete]); return Promise.all([sendSyncRequests(), contactSyncComplete]);
}; };

View File

@@ -6,12 +6,14 @@ import * as sinon from 'sinon';
import { sleep } from '../util/sleep.std.js'; import { sleep } from '../util/sleep.std.js';
import { explodePromise } from '../util/explodePromise.std.js'; import { explodePromise } from '../util/explodePromise.std.js';
import createTaskWithTimeout, { import { MINUTE } from '../util/durations/index.std.js';
import {
runTaskWithTimeout,
suspendTasksWithTimeout, suspendTasksWithTimeout,
resumeTasksWithTimeout, resumeTasksWithTimeout,
} from '../textsecure/TaskWithTimeout.std.js'; } from '../textsecure/TaskWithTimeout.std.js';
describe('createTaskWithTimeout', () => { describe('runTaskWithTimeout', () => {
let sandbox: sinon.SinonSandbox; let sandbox: sinon.SinonSandbox;
beforeEach(() => { beforeEach(() => {
@@ -24,18 +26,16 @@ describe('createTaskWithTimeout', () => {
it('resolves when promise resolves', async () => { it('resolves when promise resolves', async () => {
const task = () => Promise.resolve('hi!'); const task = () => Promise.resolve('hi!');
const taskWithTimeout = createTaskWithTimeout(task, 'resolving-task'); const result = await runTaskWithTimeout(task, 'resolving-task');
const result = await taskWithTimeout();
assert.strictEqual(result, 'hi!'); assert.strictEqual(result, 'hi!');
}); });
it('flows error from promise back', async () => { it('flows error from promise back', async () => {
const error = new Error('original'); const error = new Error('original');
const task = () => Promise.reject(error); const task = () => Promise.reject(error);
const taskWithTimeout = createTaskWithTimeout(task, 'rejecting-task'); const taskWithTimeout = runTaskWithTimeout(task, 'rejecting-task');
await assert.isRejected(taskWithTimeout(), 'original'); await assert.isRejected(taskWithTimeout, 'original');
}); });
it('rejects if promise takes too long (this one logs error to console)', async () => { it('rejects if promise takes too long (this one logs error to console)', async () => {
@@ -45,11 +45,11 @@ describe('createTaskWithTimeout', () => {
// Never resolves // Never resolves
const task = () => pause; const task = () => pause;
const taskWithTimeout = createTaskWithTimeout(task, 'slow-task'); const taskWithTimeout = runTaskWithTimeout(task, 'slow-task');
const promise = assert.isRejected(taskWithTimeout()); const promise = assert.isRejected(taskWithTimeout);
await clock.runToLastAsync(); await clock.runAllAsync();
await promise; await promise;
}); });
@@ -61,17 +61,9 @@ describe('createTaskWithTimeout', () => {
const task = () => { const task = () => {
throw error; throw error;
}; };
const taskWithTimeout = createTaskWithTimeout(task, 'throwing-task'); const taskWithTimeout = runTaskWithTimeout(task, 'throwing-task');
await clock.runToLastAsync(); await clock.runToLastAsync();
await assert.isRejected(taskWithTimeout(), 'Task is throwing!'); await assert.isRejected(taskWithTimeout, 'Task is throwing!');
});
it('passes arguments to the underlying function', async () => {
const task = (arg: string) => Promise.resolve(arg);
const taskWithTimeout = createTaskWithTimeout(task, 'arguments-task');
const result = await taskWithTimeout('hi!');
assert.strictEqual(result, 'hi!');
}); });
it('suspends and resumes tasks', async () => { it('suspends and resumes tasks', async () => {
@@ -81,25 +73,21 @@ describe('createTaskWithTimeout', () => {
const task = async () => { const task = async () => {
state = 1; state = 1;
await sleep(900); await sleep(2 * MINUTE - 100);
state = 2; state = 2;
await sleep(900); await sleep(2 * MINUTE - 100);
state = 3; state = 3;
}; };
const taskWithTimeout = createTaskWithTimeout(task, 'suspend-task', { const promise = runTaskWithTimeout(task, 'suspend-task', 'short-lived');
timeout: 1000,
});
const promise = taskWithTimeout();
assert.strictEqual(state, 1); assert.strictEqual(state, 1);
suspendTasksWithTimeout(); suspendTasksWithTimeout();
await clock.tickAsync(900); await clock.tickAsync(2 * MINUTE - 100);
assert.strictEqual(state, 2); assert.strictEqual(state, 2);
resumeTasksWithTimeout(); resumeTasksWithTimeout();
await clock.tickAsync(900); await clock.tickAsync(2 * MINUTE - 100);
assert.strictEqual(state, 3); assert.strictEqual(state, 3);
await promise; await promise;
@@ -112,17 +100,17 @@ describe('createTaskWithTimeout', () => {
// Never resolves // Never resolves
const task = () => pause; const task = () => pause;
const taskWithTimeout = createTaskWithTimeout(task, 'suspend-slow-task'); const taskWithTimeout = runTaskWithTimeout(task, 'suspend-slow-task');
const promise = assert.isRejected(taskWithTimeout()); const promise = assert.isRejected(taskWithTimeout);
suspendTasksWithTimeout(); suspendTasksWithTimeout();
await clock.runToLastAsync(); await clock.runAllAsync();
resumeTasksWithTimeout(); resumeTasksWithTimeout();
await clock.runToLastAsync(); await clock.runAllAsync();
await promise; await promise;
}); });

View File

@@ -31,7 +31,7 @@ import type {
KyberPreKeyType, KyberPreKeyType,
PniKeyMaterialType, PniKeyMaterialType,
} from './Types.d.ts'; } from './Types.d.ts';
import createTaskWithTimeout from './TaskWithTimeout.std.js'; import { runTaskWithTimeout } from './TaskWithTimeout.std.js';
import * as Bytes from '../Bytes.std.js'; import * as Bytes from '../Bytes.std.js';
import * as Errors from '../types/errors.std.js'; import * as Errors from '../types/errors.std.js';
import { import {
@@ -268,9 +268,10 @@ export default class AccountManager extends EventTarget {
async #queueTask<T>(task: () => Promise<T>): Promise<T> { async #queueTask<T>(task: () => Promise<T>): Promise<T> {
this.pendingQueue = this.pendingQueue || new PQueue({ concurrency: 1 }); this.pendingQueue = this.pendingQueue || new PQueue({ concurrency: 1 });
const taskWithTimeout = createTaskWithTimeout(task, 'AccountManager task');
return this.pendingQueue.add(taskWithTimeout); return this.pendingQueue.add(() =>
runTaskWithTimeout(task, 'AccountManager task')
);
} }
encryptDeviceName( encryptDeviceName(

View File

@@ -76,7 +76,7 @@ import { signalProtocolStore } from '../SignalProtocolStore.preload.js';
import { SignalService as Proto } from '../protobuf/index.std.js'; import { SignalService as Proto } from '../protobuf/index.std.js';
import { deriveGroupFields, MASTER_KEY_LENGTH } from '../groups.preload.js'; import { deriveGroupFields, MASTER_KEY_LENGTH } from '../groups.preload.js';
import createTaskWithTimeout from './TaskWithTimeout.std.js'; import { runTaskWithTimeout } from './TaskWithTimeout.std.js';
import { import {
processAttachment, processAttachment,
processDataMessage, processDataMessage,
@@ -245,10 +245,6 @@ export type MessageReceiverOptions = {
serverTrustRoots: Array<string>; serverTrustRoots: Array<string>;
}; };
const TASK_WITH_TIMEOUT_OPTIONS = {
timeout: 2 * durations.MINUTE,
};
const LOG_UNEXPECTED_URGENT_VALUES = false; const LOG_UNEXPECTED_URGENT_VALUES = false;
const MUST_BE_URGENT_TYPES: Array<SendTypesType> = [ const MUST_BE_URGENT_TYPES: Array<SendTypesType> = [
'message', 'message',
@@ -391,13 +387,13 @@ export default class MessageReceiver
if (request.requestType === ServerRequestType.ApiEmptyQueue) { if (request.requestType === ServerRequestType.ApiEmptyQueue) {
drop( drop(
this.#incomingQueue.add( this.#incomingQueue.add(() =>
createTaskWithTimeout( runTaskWithTimeout(
async () => { async () => {
this.#onEmpty(); this.#onEmpty();
}, },
'incomingQueue/onEmpty', 'incomingQueue/onEmpty',
TASK_WITH_TIMEOUT_OPTIONS 'short-lived'
) )
) )
); );
@@ -487,12 +483,8 @@ export default class MessageReceiver
}; };
drop( drop(
this.#incomingQueue.add( this.#incomingQueue.add(() =>
createTaskWithTimeout( runTaskWithTimeout(job, 'incomingQueue/websocket', 'short-lived')
job,
'incomingQueue/websocket',
TASK_WITH_TIMEOUT_OPTIONS
)
) )
); );
} }
@@ -510,13 +502,11 @@ export default class MessageReceiver
#addCachedMessagesToQueue(): Promise<void> { #addCachedMessagesToQueue(): Promise<void> {
log.info('addCachedMessagesToQueue'); log.info('addCachedMessagesToQueue');
return this.#incomingQueue.add( return this.#incomingQueue.add(() =>
createTaskWithTimeout( runTaskWithTimeout(
async () => this.#queueAllCached(), async () => this.#queueAllCached(),
'incomingQueue/queueAllCached', 'incomingQueue/queueAllCached',
{ 'long-running'
timeout: 10 * durations.MINUTE,
}
) )
); );
} }
@@ -547,11 +537,11 @@ export default class MessageReceiver
TaskType.Encrypted TaskType.Encrypted
); );
return this.#incomingQueue.add( return this.#incomingQueue.add(() =>
createTaskWithTimeout( runTaskWithTimeout(
waitForIncomingQueue, waitForIncomingQueue,
'drain/waitForIncoming', 'drain/waitForIncoming',
TASK_WITH_TIMEOUT_OPTIONS 'short-lived'
) )
); );
} }
@@ -732,11 +722,11 @@ export default class MessageReceiver
async #dispatchAndWait(id: string, event: Event): Promise<void> { async #dispatchAndWait(id: string, event: Event): Promise<void> {
drop( drop(
this.#appQueue.add( this.#appQueue.add(() =>
createTaskWithTimeout( runTaskWithTimeout(
async () => Promise.all(this.dispatchEvent(event)), async () => Promise.all(this.dispatchEvent(event)),
`dispatchEvent(${event.type}, ${id})`, `dispatchEvent(${event.type}, ${id})`,
TASK_WITH_TIMEOUT_OPTIONS 'short-lived'
) )
) )
); );
@@ -764,9 +754,7 @@ export default class MessageReceiver
? this.#encryptedQueue ? this.#encryptedQueue
: this.#decryptedQueue; : this.#decryptedQueue;
return queue.add( return queue.add(() => runTaskWithTimeout(task, id, 'short-lived'));
createTaskWithTimeout(task, id, TASK_WITH_TIMEOUT_OPTIONS)
);
} }
#onEmpty(): void { #onEmpty(): void {
@@ -796,12 +784,8 @@ export default class MessageReceiver
// We don't await here because we don't want this to gate future message processing // We don't await here because we don't want this to gate future message processing
drop( drop(
this.#appQueue.add( this.#appQueue.add(() =>
createTaskWithTimeout( runTaskWithTimeout(emitEmpty, 'emitEmpty', 'short-lived')
emitEmpty,
'emitEmpty',
TASK_WITH_TIMEOUT_OPTIONS
)
) )
); );
}; };
@@ -829,11 +813,11 @@ export default class MessageReceiver
const waitForCacheAddBatcher = async () => { const waitForCacheAddBatcher = async () => {
await this.#decryptAndCacheBatcher.onIdle(); await this.#decryptAndCacheBatcher.onIdle();
drop( drop(
this.#incomingQueue.add( this.#incomingQueue.add(() =>
createTaskWithTimeout( runTaskWithTimeout(
waitForIncomingQueue, waitForIncomingQueue,
'onEmpty/waitForIncoming', 'onEmpty/waitForIncoming',
TASK_WITH_TIMEOUT_OPTIONS 'short-lived'
) )
) )
); );
@@ -969,11 +953,11 @@ export default class MessageReceiver
this.#clearRetryTimeout(); this.#clearRetryTimeout();
this.#retryCachedTimeout = setTimeout(() => { this.#retryCachedTimeout = setTimeout(() => {
drop( drop(
this.#incomingQueue.add( this.#incomingQueue.add(() =>
createTaskWithTimeout( runTaskWithTimeout(
async () => this.#queueAllCached(), async () => this.#queueAllCached(),
'queueAllCached', 'queueAllCached',
TASK_WITH_TIMEOUT_OPTIONS 'short-lived'
) )
) )
); );
@@ -1214,15 +1198,15 @@ export default class MessageReceiver
log.info('queueing decrypted envelope', id); log.info('queueing decrypted envelope', id);
const task = this.#handleDecryptedEnvelope.bind(this, envelope, plaintext); const task = this.#handleDecryptedEnvelope.bind(this, envelope, plaintext);
const taskWithTimeout = createTaskWithTimeout(
task,
`queueDecryptedEnvelope ${id}`,
TASK_WITH_TIMEOUT_OPTIONS
);
try { try {
await this.#addToQueue( await this.#addToQueue(
taskWithTimeout, () =>
runTaskWithTimeout(
task,
`queueDecryptedEnvelope ${id}`,
'short-lived'
),
`handleDecryptedEnvelope(${id})`, `handleDecryptedEnvelope(${id})`,
TaskType.Decrypted TaskType.Decrypted
); );

View File

@@ -43,7 +43,7 @@ import {
toPniObject, toPniObject,
toServiceIdObject, toServiceIdObject,
} from '../util/ServiceId.node.js'; } from '../util/ServiceId.node.js';
import createTaskWithTimeout from './TaskWithTimeout.std.js'; import { runTaskWithTimeout } from './TaskWithTimeout.std.js';
import type { CallbackResultType } from './Types.d.ts'; import type { CallbackResultType } from './Types.d.ts';
import type { import type {
SerializedCertificateType, SerializedCertificateType,
@@ -765,12 +765,9 @@ export class MessageSender {
const queue = this.pendingMessages[id]; const queue = this.pendingMessages[id];
const taskWithTimeout = createTaskWithTimeout( return queue.add(() =>
runJob, runTaskWithTimeout(runJob, `queueJobForServiceId ${serviceId} ${id}`)
`queueJobForServiceId ${serviceId} ${id}`
); );
return queue.add(taskWithTimeout);
} }
// Attachment upload functions // Attachment upload functions

View File

@@ -2,127 +2,116 @@
// SPDX-License-Identifier: AGPL-3.0-only // SPDX-License-Identifier: AGPL-3.0-only
import { MINUTE } from '../util/durations/index.std.js'; import { MINUTE } from '../util/durations/index.std.js';
import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary.std.js';
import { explodePromise } from '../util/explodePromise.std.js'; import { explodePromise } from '../util/explodePromise.std.js';
import { toLogFormat } from '../types/errors.std.js'; import { missingCaseError } from '../util/missingCaseError.std.js';
import { createLogger } from '../logging/log.std.js'; import { createLogger } from '../logging/log.std.js';
const TICK_INTERVAL = MINUTE / 2;
const log = createLogger('TaskWithTimeout'); const log = createLogger('TaskWithTimeout');
type TaskType = { type TaskType = {
id: string; id: string;
startedAt: number | undefined; ticks: number;
suspend(): void; maxTicks: number;
resume(): void; reject: (error: Error) => void;
}; };
const tasks = new Set<TaskType>(); const tasks = new Set<TaskType>();
let shouldStartTimers = true; let shouldStartTicking = true;
let tickInterval: NodeJS.Timeout | undefined;
function maybeStartTicking(): void {
if (!shouldStartTicking) {
return;
}
// Already ticking
if (tickInterval != null) {
return;
}
log.info('starting tick');
tickInterval = setInterval(() => {
for (const task of tasks) {
task.ticks += 1;
if (task.ticks < task.maxTicks) {
continue;
}
tasks.delete(task);
task.reject(new Error(`TaskWithTimeout(${task.id}) timed out`));
}
}, TICK_INTERVAL);
}
export function suspendTasksWithTimeout(): void { export function suspendTasksWithTimeout(): void {
if (!shouldStartTicking) {
return;
}
log.info(`suspending ${tasks.size} tasks`); log.info(`suspending ${tasks.size} tasks`);
shouldStartTimers = false; shouldStartTicking = false;
for (const task of tasks) { if (tickInterval != null) {
task.suspend(); log.info('stopping tick');
clearInterval(tickInterval);
tickInterval = undefined;
} }
} }
export function resumeTasksWithTimeout(): void { export function resumeTasksWithTimeout(): void {
log.info(`resuming ${tasks.size} tasks`); if (shouldStartTicking) {
shouldStartTimers = true; return;
for (const task of tasks) {
task.resume();
} }
log.info(`resuming ${tasks.size} tasks`);
shouldStartTicking = true;
maybeStartTicking();
} }
export function reportLongRunningTasks(): void { export function reportLongRunningTasks(): void {
const now = Date.now();
for (const task of tasks) { for (const task of tasks) {
if (task.startedAt === undefined) { const duration = task.ticks * TICK_INTERVAL;
continue;
}
const duration = Math.max(0, now - task.startedAt);
if (duration > MINUTE) { if (duration > MINUTE) {
log.warn(`${task.id} has been running for ${duration}ms`); log.warn(`${task.id} has been running for ~${duration}ms`);
} }
} }
} }
export default function createTaskWithTimeout<T, Args extends Array<unknown>>( export async function runTaskWithTimeout<T>(
task: (...args: Args) => Promise<T>, task: () => Promise<T>,
id: string, id: string,
options: { timeout?: number } = {} taskType: 'long-running' | 'short-lived' = 'long-running'
): (...args: Args) => Promise<T> { ): Promise<T> {
const timeout = options.timeout || 30 * MINUTE; let maxTicks: number;
const timeoutError = new Error(`${id || ''} task did not complete in time.`); if (taskType === 'long-running') {
maxTicks = (30 * MINUTE) / TICK_INTERVAL;
} else if (taskType === 'short-lived') {
maxTicks = (2 * MINUTE) / TICK_INTERVAL;
} else {
throw missingCaseError(taskType);
}
return async (...args: Args) => { const { promise: timerPromise, reject } = explodePromise<never>();
let complete = false;
let timer: NodeJS.Timeout | undefined; const entry: TaskType = {
id,
const { promise: timerPromise, reject } = explodePromise<never>(); ticks: 0,
maxTicks,
const startTimer = () => { reject,
stopTimer();
if (complete) {
return;
}
entry.startedAt = Date.now();
timer = setTimeout(() => {
if (complete) {
log.warn(`${id} task timed out, but was already complete`);
return;
}
complete = true;
tasks.delete(entry);
log.error(toLogFormat(timeoutError));
reject(timeoutError);
}, timeout);
};
const stopTimer = () => {
clearTimeoutIfNecessary(timer);
timer = undefined;
};
const entry: TaskType = {
id,
startedAt: undefined,
suspend: () => {
log.warn(`${id} task suspended`);
stopTimer();
},
resume: () => {
log.warn(`${id} task resumed`);
startTimer();
},
};
tasks.add(entry);
if (shouldStartTimers) {
startTimer();
}
let result: unknown;
const run = async (): Promise<void> => {
result = await task(...args);
};
try {
await Promise.race([run(), timerPromise]);
return result as T;
} finally {
complete = true;
tasks.delete(entry);
stopTimer();
}
}; };
tasks.add(entry);
maybeStartTicking();
try {
return await Promise.race([task(), timerPromise]);
} finally {
tasks.delete(entry);
if (tasks.size === 0 && tickInterval != null) {
log.info('stopping tick');
clearInterval(tickInterval);
tickInterval = undefined;
}
}
} }

View File

@@ -2,15 +2,13 @@
// SPDX-License-Identifier: AGPL-3.0-only // SPDX-License-Identifier: AGPL-3.0-only
import PQueue from 'p-queue'; import PQueue from 'p-queue';
import createTaskWithTimeout from '../textsecure/TaskWithTimeout.std.js'; import { runTaskWithTimeout } from '../textsecure/TaskWithTimeout.std.js';
function createJobQueue(label: string) { function createJobQueue(label: string) {
const jobQueue = new PQueue({ concurrency: 1 }); const jobQueue = new PQueue({ concurrency: 1 });
return (job: () => Promise<void>, id = '') => { return (job: () => Promise<void>, id = '') => {
const taskWithTimeout = createTaskWithTimeout(job, `${label} ${id}`); return jobQueue.add(() => runTaskWithTimeout(job, `${label} ${id}`));
return jobQueue.add(taskWithTimeout);
}; };
} }