From 3beccbfd31cc10754200624da762e7d256e5e7a3 Mon Sep 17 00:00:00 2001 From: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com> Date: Tue, 25 Oct 2022 17:03:05 -0700 Subject: [PATCH] Reduce timeout of some long running tasks --- ts/background.ts | 5 + ts/textsecure/MessageReceiver.ts | 171 +++++++++++++++++++------------ ts/textsecure/TaskWithTimeout.ts | 25 ++++- 3 files changed, 134 insertions(+), 67 deletions(-) diff --git a/ts/background.ts b/ts/background.ts index fada558e6f..f0ca6a730b 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -17,6 +17,7 @@ import { HTTPError } from './textsecure/Errors'; import createTaskWithTimeout, { suspendTasksWithTimeout, resumeTasksWithTimeout, + reportLongRunningTasks, } from './textsecure/TaskWithTimeout'; import type { MessageAttributesType, @@ -995,6 +996,10 @@ export async function startApp(): Promise { } }, FIVE_MINUTES); + setInterval(() => { + reportLongRunningTasks(); + }, FIVE_MINUTES); + let mainWindowStats = { isMaximized: false, isFullScreen: false, diff --git a/ts/textsecure/MessageReceiver.ts b/ts/textsecure/MessageReceiver.ts index 1dc23b9fbd..198e21c476 100644 --- a/ts/textsecure/MessageReceiver.ts +++ b/ts/textsecure/MessageReceiver.ts @@ -174,6 +174,10 @@ export type MessageReceiverOptions = { serverTrustRoot: string; }; +const TASK_WITH_TIMEOUT_OPTIONS = { + timeout: 2 * durations.MINUTE, +}; + const LOG_UNEXPECTED_URGENT_VALUES = false; const MUST_BE_URGENT_TYPES: Array = [ 'message', @@ -331,9 +335,13 @@ export default class MessageReceiver if (request.verb === 'PUT' && request.path === '/api/v1/queue/empty') { this.incomingQueue.add( - createTaskWithTimeout(async () => { - this.onEmpty(); - }, 'incomingQueue/onEmpty') + createTaskWithTimeout( + async () => { + this.onEmpty(); + }, + 'incomingQueue/onEmpty', + TASK_WITH_TIMEOUT_OPTIONS + ) ); } return; @@ -407,12 +415,16 @@ export default class MessageReceiver } catch (e) { request.respond(500, 'Bad encrypted websocket message'); log.error('Error handling incoming message:', Errors.toLogFormat(e)); - await this.dispatchAndWait(new ErrorEvent(e)); + await this.dispatchAndWait('websocket request', new ErrorEvent(e)); } }; this.incomingQueue.add( - createTaskWithTimeout(job, 'incomingQueue/websocket') + createTaskWithTimeout( + job, + 'incomingQueue/websocket', + TASK_WITH_TIMEOUT_OPTIONS + ) ); } @@ -421,7 +433,8 @@ export default class MessageReceiver this.incomingQueue.add( createTaskWithTimeout( async () => this.queueAllCached(), - 'incomingQueue/queueAllCached' + 'incomingQueue/queueAllCached', + TASK_WITH_TIMEOUT_OPTIONS ) ); @@ -457,7 +470,11 @@ export default class MessageReceiver ); return this.incomingQueue.add( - createTaskWithTimeout(waitForIncomingQueue, 'drain/waitForIncoming') + createTaskWithTimeout( + waitForIncomingQueue, + 'drain/waitForIncoming', + TASK_WITH_TIMEOUT_OPTIONS + ) ); } @@ -605,11 +622,12 @@ export default class MessageReceiver // Private // - private async dispatchAndWait(event: Event): Promise { + private async dispatchAndWait(id: string, event: Event): Promise { this.appQueue.add( createTaskWithTimeout( async () => Promise.all(this.dispatchEvent(event)), - 'dispatchEvent' + `dispatchEvent(${event.type}, ${id})`, + TASK_WITH_TIMEOUT_OPTIONS ) ); } @@ -658,7 +676,9 @@ export default class MessageReceiver : this.decryptedQueue; try { - return await queue.add(createTaskWithTimeout(task, id)); + return await queue.add( + createTaskWithTimeout(task, id, TASK_WITH_TIMEOUT_OPTIONS) + ); } finally { this.updateProgress(this.count); } @@ -684,7 +704,9 @@ export default class MessageReceiver ); // We don't await here because we don't want this to gate future message processing - this.appQueue.add(createTaskWithTimeout(emitEmpty, 'emitEmpty')); + this.appQueue.add( + createTaskWithTimeout(emitEmpty, 'emitEmpty', TASK_WITH_TIMEOUT_OPTIONS) + ); }; const waitForEncryptedQueue = async () => { @@ -710,7 +732,11 @@ export default class MessageReceiver const waitForCacheAddBatcher = async () => { await this.decryptAndCacheBatcher.onIdle(); this.incomingQueue.add( - createTaskWithTimeout(waitForIncomingQueue, 'onEmpty/waitForIncoming') + createTaskWithTimeout( + waitForIncomingQueue, + 'onEmpty/waitForIncoming', + TASK_WITH_TIMEOUT_OPTIONS + ) ); }; @@ -810,7 +836,7 @@ export default class MessageReceiver async () => { this.queueDecryptedEnvelope(decryptedEnvelope, payloadPlaintext); }, - 'queueDecryptedEnvelope', + `queueDecryptedEnvelope(${getEnvelopeId(decryptedEnvelope)})`, TaskType.Encrypted ); } else { @@ -850,7 +876,8 @@ export default class MessageReceiver this.incomingQueue.add( createTaskWithTimeout( async () => this.queueAllCached(), - 'queueAllCached' + 'queueAllCached', + TASK_WITH_TIMEOUT_OPTIONS ) ); }, RETRY_TIMEOUT); @@ -1074,13 +1101,14 @@ export default class MessageReceiver const task = this.handleDecryptedEnvelope.bind(this, envelope, plaintext); const taskWithTimeout = createTaskWithTimeout( task, - `queueDecryptedEnvelope ${id}` + `queueDecryptedEnvelope ${id}`, + TASK_WITH_TIMEOUT_OPTIONS ); try { await this.addToQueue( taskWithTimeout, - 'dispatchEvent', + `handleDecryptedEnvelope(${id})`, TaskType.Decrypted ); } catch (error) { @@ -1125,7 +1153,7 @@ export default class MessageReceiver this.addToQueue( async () => this.dispatchEvent(new EnvelopeEvent(unsealedEnvelope)), - 'dispatchEvent', + `dispatchEvent(EnvelopeEvent(${logId}))`, TaskType.Decrypted ); @@ -1398,12 +1426,14 @@ export default class MessageReceiver if (syncMessage?.pniIdentity) { inProgressMessageType = 'pni identity'; await this.handlePNIIdentity(envelope, syncMessage.pniIdentity); + this.removeFromCache(envelope); return { plaintext: undefined, envelope }; } if (syncMessage?.pniChangeNumber) { inProgressMessageType = 'pni change number'; await this.handlePNIChangeNumber(envelope, syncMessage.pniChangeNumber); + this.removeFromCache(envelope); return { plaintext: undefined, envelope }; } @@ -1422,6 +1452,7 @@ export default class MessageReceiver (envelope.sourceUuid && this.isUuidBlocked(envelope.sourceUuid))) ) { log.info(`${logId}: Dropping non-GV2 message from blocked sender`); + this.removeFromCache(envelope); return { plaintext: undefined, envelope }; } @@ -1492,6 +1523,7 @@ export default class MessageReceiver logUnexpectedUrgentValue(envelope, 'deliveryReceipt'); await this.dispatchAndWait( + getEnvelopeId(envelope), new DeliveryEvent( { timestamp: envelope.timestamp, @@ -1822,6 +1854,8 @@ export default class MessageReceiver throw error; } + const envelopeId = getEnvelopeId(envelope); + if (uuid && deviceId) { const { cipherTextBytes, cipherTextType } = envelope; const event = new DecryptionErrorEvent( @@ -1842,11 +1876,10 @@ export default class MessageReceiver // Avoid deadlocks by scheduling processing on decrypted queue this.addToQueue( async () => this.dispatchEvent(event), - 'decrypted/dispatchEvent', + `decrypted/dispatchEvent/DecryptionErrorEvent(${envelopeId})`, TaskType.Decrypted ); } else { - const envelopeId = getEnvelopeId(envelope); this.removeFromCache(envelope); log.error( `MessageReceiver.decrypt: Envelope ${envelopeId} missing uuid or deviceId` @@ -1939,7 +1972,7 @@ export default class MessageReceiver }, this.removeFromCache.bind(this, envelope) ); - return this.dispatchAndWait(ev); + return this.dispatchAndWait(getEnvelopeId(envelope), ev); } private async handleStoryMessage( @@ -2036,7 +2069,7 @@ export default class MessageReceiver }, this.removeFromCache.bind(this, envelope) ); - this.dispatchAndWait(ev); + this.dispatchAndWait(logId, ev); return; } @@ -2094,7 +2127,7 @@ export default class MessageReceiver }, this.removeFromCache.bind(this, envelope) ); - this.dispatchAndWait(ev); + this.dispatchAndWait(logId, ev); }); return; } @@ -2117,7 +2150,7 @@ export default class MessageReceiver }, this.removeFromCache.bind(this, envelope) ); - return this.dispatchAndWait(ev); + return this.dispatchAndWait(logId, ev); } private async handleDataMessage( @@ -2148,7 +2181,7 @@ export default class MessageReceiver return undefined; } - await this.checkGroupV1Data(msg); + this.checkGroupV1Data(msg); if (msg.flags && msg.flags & Proto.DataMessage.Flags.END_SESSION) { p = this.handleEndSession(envelope, new UUID(destination)); @@ -2170,7 +2203,7 @@ export default class MessageReceiver }, this.removeFromCache.bind(this, envelope) ); - return this.dispatchAndWait(ev); + return this.dispatchAndWait(logId, ev); } await p; @@ -2236,7 +2269,7 @@ export default class MessageReceiver }, this.removeFromCache.bind(this, envelope) ); - return this.dispatchAndWait(ev); + return this.dispatchAndWait(logId, ev); } private async maybeUpdateTimestamp( @@ -2305,10 +2338,7 @@ export default class MessageReceiver content.decryptionErrorMessage && Bytes.isNotEmpty(content.decryptionErrorMessage) ) { - await this.handleDecryptionError( - envelope, - content.decryptionErrorMessage - ); + this.handleDecryptionError(envelope, content.decryptionErrorMessage); return; } if (content.syncMessage) { @@ -2323,7 +2353,7 @@ export default class MessageReceiver return; } if (content.nullMessage) { - await this.handleNullMessage(envelope); + this.handleNullMessage(envelope); return; } if (content.callingMessage) { @@ -2335,7 +2365,7 @@ export default class MessageReceiver return; } if (content.typingMessage) { - await this.handleTypingMessage(envelope, content.typingMessage); + this.handleTypingMessage(envelope, content.typingMessage); return; } @@ -2351,10 +2381,10 @@ export default class MessageReceiver } } - private async handleDecryptionError( + private handleDecryptionError( envelope: UnsealedEnvelope, decryptionError: Uint8Array - ) { + ): void { const logId = getEnvelopeId(envelope); log.info(`handleDecryptionError: ${logId}`); @@ -2381,7 +2411,7 @@ export default class MessageReceiver }, () => this.removeFromCache(envelope) ); - await this.dispatchEvent(event); + this.dispatchEvent(event); } private async handleSenderKeyDistributionMessage( @@ -2506,6 +2536,8 @@ export default class MessageReceiver logUnexpectedUrgentValue(envelope, type); + const logId = getEnvelopeId(envelope); + await Promise.all( receiptMessage.timestamp.map(async rawTimestamp => { const ev = new EventClass( @@ -2519,15 +2551,15 @@ export default class MessageReceiver }, this.removeFromCache.bind(this, envelope) ); - await this.dispatchAndWait(ev); + await this.dispatchAndWait(logId, ev); }) ); } - private async handleTypingMessage( + private handleTypingMessage( envelope: UnsealedEnvelope, typingMessage: Proto.ITypingMessage - ): Promise { + ): void { this.removeFromCache(envelope); logUnexpectedUrgentValue(envelope, 'typing'); @@ -2564,7 +2596,7 @@ export default class MessageReceiver } } - await this.dispatchEvent( + this.dispatchEvent( new TypingEvent({ sender: envelope.source, senderUuid: envelope.sourceUuid, @@ -2640,9 +2672,7 @@ export default class MessageReceiver return Bytes.toBase64(data.id); } - private async checkGroupV1Data( - message: Readonly - ): Promise { + private checkGroupV1Data(message: Readonly): void { const { group } = message; if (!group) { @@ -2744,7 +2774,8 @@ export default class MessageReceiver }, this.removeFromCache.bind(this, envelope) ); - return this.dispatchAndWait(ev); + const logId = getEnvelopeId(envelope); + return this.dispatchAndWait(logId, ev); } if (sentMessage.storyMessage) { @@ -2767,7 +2798,7 @@ export default class MessageReceiver return; } - await this.checkGroupV1Data(sentMessage.message); + this.checkGroupV1Data(sentMessage.message); strictAssert(sentMessage.timestamp, 'sent message without timestamp'); @@ -2847,7 +2878,8 @@ export default class MessageReceiver envelope: ProcessedEnvelope, configuration: Proto.SyncMessage.IConfiguration ): Promise { - log.info('got configuration sync message'); + const logId = getEnvelopeId(envelope); + log.info('got configuration sync message', logId); logUnexpectedUrgentValue(envelope, 'configurationSync'); @@ -2855,14 +2887,15 @@ export default class MessageReceiver configuration, this.removeFromCache.bind(this, envelope) ); - return this.dispatchAndWait(ev); + return this.dispatchAndWait(logId, ev); } private async handleViewOnceOpen( envelope: ProcessedEnvelope, sync: Proto.SyncMessage.IViewOnceOpen ): Promise { - log.info('got view once open sync message'); + const logId = getEnvelopeId(envelope); + log.info('got view once open sync message', logId); logUnexpectedUrgentValue(envelope, 'viewOnceSync'); @@ -2877,14 +2910,15 @@ export default class MessageReceiver this.removeFromCache.bind(this, envelope) ); - return this.dispatchAndWait(ev); + return this.dispatchAndWait(logId, ev); } private async handleMessageRequestResponse( envelope: ProcessedEnvelope, sync: Proto.SyncMessage.IMessageRequestResponse ): Promise { - log.info('got message request response sync message'); + const logId = getEnvelopeId(envelope); + log.info('got message request response sync message', logId); logUnexpectedUrgentValue(envelope, 'messageRequestSync'); @@ -2921,14 +2955,15 @@ export default class MessageReceiver this.removeFromCache.bind(this, envelope) ); - return this.dispatchAndWait(ev); + return this.dispatchAndWait(logId, ev); } private async handleFetchLatest( envelope: ProcessedEnvelope, sync: Proto.SyncMessage.IFetchLatest ): Promise { - log.info('got fetch latest sync message'); + const logId = getEnvelopeId(envelope); + log.info('got fetch latest sync message', logId); logUnexpectedUrgentValue(envelope, 'fetchLatestManifestSync'); @@ -2937,14 +2972,15 @@ export default class MessageReceiver this.removeFromCache.bind(this, envelope) ); - return this.dispatchAndWait(ev); + return this.dispatchAndWait(logId, ev); } private async handleKeys( envelope: ProcessedEnvelope, sync: Proto.SyncMessage.IKeys ): Promise { - log.info('got keys sync message'); + const logId = getEnvelopeId(envelope); + log.info('got keys sync message', logId); logUnexpectedUrgentValue(envelope, 'keySync'); @@ -2957,7 +2993,7 @@ export default class MessageReceiver this.removeFromCache.bind(this, envelope) ); - return this.dispatchAndWait(ev); + return this.dispatchAndWait(logId, ev); } // Runs on TaskType.Encrypted queue @@ -3019,7 +3055,8 @@ export default class MessageReceiver operations: Array ): Promise { const ENUM = Proto.SyncMessage.StickerPackOperation.Type; - log.info('got sticker pack operation sync message'); + const logId = getEnvelopeId(envelope); + log.info('got sticker pack operation sync message', logId); logUnexpectedUrgentValue(envelope, 'stickerPackSync'); const stickerPacks = operations.map(operation => ({ @@ -3034,14 +3071,15 @@ export default class MessageReceiver this.removeFromCache.bind(this, envelope) ); - return this.dispatchAndWait(ev); + return this.dispatchAndWait(logId, ev); } private async handleRead( envelope: ProcessedEnvelope, read: Array ): Promise { - log.info('MessageReceiver.handleRead', getEnvelopeId(envelope)); + const logId = getEnvelopeId(envelope); + log.info('MessageReceiver.handleRead', logId); logUnexpectedUrgentValue(envelope, 'readSync'); @@ -3058,7 +3096,7 @@ export default class MessageReceiver }, this.removeFromCache.bind(this, envelope) ); - results.push(this.dispatchAndWait(ev)); + results.push(this.dispatchAndWait(logId, ev)); } await Promise.all(results); } @@ -3067,7 +3105,8 @@ export default class MessageReceiver envelope: ProcessedEnvelope, viewed: ReadonlyArray ): Promise { - log.info('MessageReceiver.handleViewed', getEnvelopeId(envelope)); + const logId = getEnvelopeId(envelope); + log.info('MessageReceiver.handleViewed', logId); logUnexpectedUrgentValue(envelope, 'viewSync'); @@ -3084,7 +3123,7 @@ export default class MessageReceiver }, this.removeFromCache.bind(this, envelope) ); - await this.dispatchAndWait(ev); + await this.dispatchAndWait(logId, ev); }) ); } @@ -3093,7 +3132,8 @@ export default class MessageReceiver envelope: ProcessedEnvelope, contacts: Proto.SyncMessage.IContacts ): Promise { - log.info(`MessageReceiver: handleContacts ${getEnvelopeId(envelope)}`); + const logId = getEnvelopeId(envelope); + log.info(`MessageReceiver: handleContacts ${logId}`); const { blob } = contacts; if (!blob) { throw new Error('MessageReceiver.handleContacts: blob field was missing'); @@ -3112,7 +3152,7 @@ export default class MessageReceiver envelope.receivedAtCounter, envelope.timestamp ); - await this.dispatchAndWait(contactSync); + await this.dispatchAndWait(logId, contactSync); log.info('handleContacts: finished'); } @@ -3121,8 +3161,9 @@ export default class MessageReceiver envelope: ProcessedEnvelope, groups: Proto.SyncMessage.IGroups ): Promise { + const logId = getEnvelopeId(envelope); log.info('group sync'); - log.info(`MessageReceiver: handleGroups ${getEnvelopeId(envelope)}`); + log.info(`MessageReceiver: handleGroups ${logId}`); const { blob } = groups; this.removeFromCache(envelope); @@ -3157,7 +3198,7 @@ export default class MessageReceiver }, envelope.receivedAtCounter ); - const promise = this.dispatchAndWait(ev).catch(e => { + const promise = this.dispatchAndWait(logId, ev).catch(e => { log.error('error processing group', e); }); groupDetails = groupBuffer.next(); @@ -3167,7 +3208,7 @@ export default class MessageReceiver await Promise.all(promises); const ev = new GroupSyncEvent(); - return this.dispatchAndWait(ev); + return this.dispatchAndWait(logId, ev); } private async handleBlocked( diff --git a/ts/textsecure/TaskWithTimeout.ts b/ts/textsecure/TaskWithTimeout.ts index 3f163a3798..83c2e0d06f 100644 --- a/ts/textsecure/TaskWithTimeout.ts +++ b/ts/textsecure/TaskWithTimeout.ts @@ -1,13 +1,15 @@ // Copyright 2020-2022 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only -import * as durations from '../util/durations'; +import { MINUTE } from '../util/durations'; import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary'; import { explodePromise } from '../util/explodePromise'; import { toLogFormat } from '../types/errors'; import * as log from '../logging/log'; type TaskType = { + id: string; + startedAt: number | undefined; suspend(): void; resume(): void; }; @@ -31,12 +33,28 @@ export function resumeTasksWithTimeout(): void { } } +export function reportLongRunningTasks(): void { + const now = Date.now(); + for (const task of tasks) { + if (task.startedAt === undefined) { + continue; + } + + const duration = Math.max(0, now - task.startedAt); + if (duration > MINUTE) { + log.warn( + `TaskWithTimeout: ${task.id} has been running for ${duration}ms` + ); + } + } +} + export default function createTaskWithTimeout>( task: (...args: Args) => Promise, id: string, options: { timeout?: number } = {} ): (...args: Args) => Promise { - const timeout = options.timeout || 30 * durations.MINUTE; + const timeout = options.timeout || 30 * MINUTE; const timeoutError = new Error(`${id || ''} task did not complete in time.`); @@ -54,6 +72,7 @@ export default function createTaskWithTimeout>( return; } + entry.startedAt = Date.now(); timer = setTimeout(() => { if (complete) { return; @@ -72,6 +91,8 @@ export default function createTaskWithTimeout>( }; const entry: TaskType = { + id, + startedAt: undefined, suspend: stopTimer, resume: startTimer, };