diff --git a/ts/jobs/helpers/findRetryAfterTimeFromError.ts b/ts/jobs/helpers/findRetryAfterTimeFromError.ts new file mode 100644 index 0000000000..d6ffcfd071 --- /dev/null +++ b/ts/jobs/helpers/findRetryAfterTimeFromError.ts @@ -0,0 +1,20 @@ +// Copyright 2021-2022 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { isRecord } from '../../util/isRecord'; +import { HTTPError } from '../../textsecure/Errors'; +import { parseRetryAfter } from '../../util/parseRetryAfter'; + +export function findRetryAfterTimeFromError(err: unknown): number { + let rawValue: unknown; + + if (isRecord(err)) { + if (isRecord(err.responseHeaders)) { + rawValue = err.responseHeaders['retry-after']; + } else if (err.httpError instanceof HTTPError) { + rawValue = err.httpError.responseHeaders?.['retry-after']; + } + } + + return parseRetryAfter(rawValue); +} diff --git a/ts/jobs/helpers/handleMultipleSendErrors.ts b/ts/jobs/helpers/handleMultipleSendErrors.ts new file mode 100644 index 0000000000..5e9d8473f9 --- /dev/null +++ b/ts/jobs/helpers/handleMultipleSendErrors.ts @@ -0,0 +1,70 @@ +// Copyright 2022 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import type { LoggerType } from '../../types/Logging'; +import * as Errors from '../../types/errors'; +import { sleepFor413RetryAfterTime } from './sleepFor413RetryAfterTime'; +import { getHttpErrorCode } from './getHttpErrorCode'; +import { strictAssert } from '../../util/assert'; +import { findRetryAfterTimeFromError } from './findRetryAfterTimeFromError'; + +export async function handleMultipleSendErrors({ + errors, + isFinalAttempt, + log, + markFailed, + timeRemaining, +}: Readonly<{ + errors: ReadonlyArray; + isFinalAttempt: boolean; + log: Pick; + markFailed: (() => void) | (() => Promise); + timeRemaining: number; +}>): Promise { + strictAssert(errors.length, 'Expected at least one error'); + + const formattedErrors: Array = []; + + let retryAfterError: unknown; + let longestRetryAfterTime = -Infinity; + + let serverAskedUsToStop = false; + + errors.forEach(error => { + formattedErrors.push(Errors.toLogFormat(error)); + + const errorCode = getHttpErrorCode(error); + if (errorCode === 413) { + const retryAfterTime = findRetryAfterTimeFromError(error); + if (retryAfterTime > longestRetryAfterTime) { + retryAfterError = error; + longestRetryAfterTime = retryAfterTime; + } + } else if (errorCode === 508) { + serverAskedUsToStop = true; + } + }); + + log.info( + `${formattedErrors.length} send error(s): ${formattedErrors.join(',')}` + ); + + if (isFinalAttempt || serverAskedUsToStop) { + await markFailed(); + } + + if (serverAskedUsToStop) { + log.info('server responded with 508. Giving up on this job'); + return; + } + + if (retryAfterError && !isFinalAttempt) { + await sleepFor413RetryAfterTime({ + err: retryAfterError, + log, + timeRemaining, + }); + } + + throw errors[0]; +} diff --git a/ts/jobs/helpers/sleepFor413RetryAfterTime.ts b/ts/jobs/helpers/sleepFor413RetryAfterTime.ts index 75da9608d3..a9c1e7656b 100644 --- a/ts/jobs/helpers/sleepFor413RetryAfterTime.ts +++ b/ts/jobs/helpers/sleepFor413RetryAfterTime.ts @@ -1,11 +1,9 @@ -// Copyright 2021 Signal Messenger, LLC +// Copyright 2021-2022 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only import type { LoggerType } from '../../types/Logging'; import { sleep } from '../../util/sleep'; -import { parseRetryAfter } from '../../util/parseRetryAfter'; -import { isRecord } from '../../util/isRecord'; -import { HTTPError } from '../../textsecure/Errors'; +import { findRetryAfterTimeFromError } from './findRetryAfterTimeFromError'; export async function sleepFor413RetryAfterTime({ err, @@ -20,10 +18,7 @@ export async function sleepFor413RetryAfterTime({ return; } - const retryAfter = Math.min( - parseRetryAfter(findRetryAfterTime(err)), - timeRemaining - ); + const retryAfter = Math.min(findRetryAfterTimeFromError(err), timeRemaining); log.info( `Got a 413 response code. Sleeping for ${retryAfter} millisecond(s)` @@ -31,19 +26,3 @@ export async function sleepFor413RetryAfterTime({ await sleep(retryAfter); } - -function findRetryAfterTime(err: unknown): unknown { - if (!isRecord(err)) { - return undefined; - } - - if (isRecord(err.responseHeaders)) { - return err.responseHeaders['retry-after']; - } - - if (err.httpError instanceof HTTPError) { - return err.httpError.responseHeaders?.['retry-after']; - } - - return undefined; -} diff --git a/ts/jobs/normalMessageSendJobQueue.ts b/ts/jobs/normalMessageSendJobQueue.ts index 3c8b2a02f7..238e89c75a 100644 --- a/ts/jobs/normalMessageSendJobQueue.ts +++ b/ts/jobs/normalMessageSendJobQueue.ts @@ -5,7 +5,6 @@ import type PQueue from 'p-queue'; import type { LoggerType } from '../types/Logging'; import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff'; import { commonShouldJobContinue } from './helpers/commonShouldJobContinue'; -import { sleepFor413RetryAfterTime } from './helpers/sleepFor413RetryAfterTime'; import { InMemoryQueues } from './helpers/InMemoryQueues'; import type { MessageModel } from '../models/messages'; import { getMessageById } from '../messages/getMessageById'; @@ -21,7 +20,6 @@ import { handleMessageSend } from '../util/handleMessageSend'; import type { CallbackResultType } from '../textsecure/Types.d'; import { isSent } from '../messages/MessageSendState'; import { getLastChallengeError, isOutgoing } from '../state/selectors/message'; -import * as Errors from '../types/errors'; import type { AttachmentType } from '../textsecure/SendMessage'; import type { LinkPreviewType } from '../types/message/LinkPreviews'; import type { BodyRangesType } from '../types/Util'; @@ -29,7 +27,7 @@ import type { WhatIsThis } from '../window.d'; import { JobQueue } from './JobQueue'; import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; -import { getHttpErrorCode } from './helpers/getHttpErrorCode'; +import { handleMultipleSendErrors } from './helpers/handleMultipleSendErrors'; const { loadAttachmentData, loadPreviewData, loadQuoteData, loadStickerData } = window.Signal.Migrations; @@ -288,47 +286,14 @@ export class NormalMessageSendJobQueue extends JobQueue = []; - let serverAskedUsToStop = false; - let retryAfterError: unknown; - messageSendErrors.forEach((messageSendError: unknown) => { - formattedMessageSendErrors.push(Errors.toLogFormat(messageSendError)); - switch (getHttpErrorCode(messageSendError)) { - case 413: - retryAfterError ||= messageSendError; - break; - case 508: - serverAskedUsToStop = true; - break; - default: - break; - } + } catch (thrownError: unknown) { + await handleMultipleSendErrors({ + errors: [thrownError, ...messageSendErrors], + isFinalAttempt, + log, + markFailed: () => markMessageFailed(message, messageSendErrors), + timeRemaining, }); - log.info( - `${ - messageSendErrors.length - } message send error(s): ${formattedMessageSendErrors.join(',')}` - ); - - if (isFinalAttempt || serverAskedUsToStop) { - await markMessageFailed(message, messageSendErrors); - } - - if (serverAskedUsToStop) { - log.info('server responded with 508. Giving up on this job'); - return; - } - - if (!isFinalAttempt && retryAfterError) { - await sleepFor413RetryAfterTime({ - err: retryAfterError, - log, - timeRemaining, - }); - } - - throw err; } } } diff --git a/ts/jobs/reactionJobQueue.ts b/ts/jobs/reactionJobQueue.ts index 9e4e7d02e5..dea1de8fa2 100644 --- a/ts/jobs/reactionJobQueue.ts +++ b/ts/jobs/reactionJobQueue.ts @@ -1,4 +1,4 @@ -// Copyright 2021 Signal Messenger, LLC +// Copyright 2021-2022 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only import * as z from 'zod'; @@ -28,7 +28,7 @@ import { UUID } from '../types/UUID'; import { JobQueue } from './JobQueue'; import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; import { commonShouldJobContinue } from './helpers/commonShouldJobContinue'; -import { handleCommonJobRequestError } from './helpers/handleCommonJobRequestError'; +import { handleMultipleSendErrors } from './helpers/handleMultipleSendErrors'; import { InMemoryQueues } from './helpers/InMemoryQueues'; const MAX_RETRY_TIME = durations.DAY; @@ -114,6 +114,11 @@ export class ReactionJobQueue extends JobQueue { return; } + let sendErrors: Array = []; + const saveErrors = (errors: Array): void => { + sendErrors = errors; + }; + try { const conversation = message.getConversation(); if (!conversation) { @@ -157,6 +162,7 @@ export class ReactionJobQueue extends JobQueue { }); ephemeralMessageForReactionSend.doNotSave = true; + let didFullySend: boolean; const successfulConversationIds = new Set(); if (recipientIdentifiersWithoutMe.length === 0) { @@ -173,8 +179,12 @@ export class ReactionJobQueue extends JobQueue { recipients: allRecipientIdentifiers, timestamp: pendingReaction.timestamp, }); - await ephemeralMessageForReactionSend.sendSyncMessageOnly(dataMessage); + await ephemeralMessageForReactionSend.sendSyncMessageOnly( + dataMessage, + saveErrors + ); + didFullySend = true; successfulConversationIds.add(ourConversationId); } else { const sendOptions = await getSendOptions(conversation.attributes); @@ -226,9 +236,11 @@ export class ReactionJobQueue extends JobQueue { handleMessageSend(promise, { messageIds: [messageId], sendType: 'reaction', - }) + }), + saveErrors ); + didFullySend = true; const reactionSendStateByConversationId = ephemeralMessageForReactionSend.get('sendStateByConversationId') || {}; @@ -237,6 +249,8 @@ export class ReactionJobQueue extends JobQueue { )) { if (isSent(sendState.status)) { successfulConversationIds.add(conversationId); + } else { + didFullySend = false; } } } @@ -248,15 +262,17 @@ export class ReactionJobQueue extends JobQueue { ); setReactions(message, newReactions); - const didFullySend = true; if (!didFullySend) { throw new Error('reaction did not fully send'); } - } catch (err: unknown) { - if (isFinalAttempt) { - markReactionFailed(message, pendingReaction); - } - await handleCommonJobRequestError({ err, log, timeRemaining }); + } catch (thrownError: unknown) { + await handleMultipleSendErrors({ + errors: [thrownError, ...sendErrors], + isFinalAttempt, + log, + markFailed: () => markReactionFailed(message, pendingReaction), + timeRemaining, + }); } finally { await window.Signal.Data.saveMessage(message.attributes, { ourUuid }); } diff --git a/ts/test-node/jobs/helpers/findRetryAfterTimeFromError_test.ts b/ts/test-node/jobs/helpers/findRetryAfterTimeFromError_test.ts new file mode 100644 index 0000000000..a5f808eae4 --- /dev/null +++ b/ts/test-node/jobs/helpers/findRetryAfterTimeFromError_test.ts @@ -0,0 +1,75 @@ +// Copyright 2022 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; + +import { findRetryAfterTimeFromError } from '../../../jobs/helpers/findRetryAfterTimeFromError'; +import { HTTPError } from '../../../textsecure/Errors'; + +describe('findRetryAfterTimeFromError', () => { + it('returns 1 second if no Retry-After time is found', () => { + [ + undefined, + null, + {}, + { responseHeaders: {} }, + { responseHeaders: { 'retry-after': 'garbage' } }, + { + httpError: new HTTPError('Slow down', { + code: 413, + headers: {}, + response: {}, + }), + }, + { + httpError: new HTTPError('Slow down', { + code: 413, + headers: { 'retry-after': 'garbage' }, + response: {}, + }), + }, + ].forEach(input => { + assert.strictEqual(findRetryAfterTimeFromError(input), 1000); + }); + }); + + it("returns 1 second if a Retry-After time is found, but it's less than 1 second", () => { + ['0', '-99', '0.5'].forEach(headerValue => { + const input = { responseHeaders: { 'retry-after': headerValue } }; + assert.strictEqual(findRetryAfterTimeFromError(input), 1000); + }); + }); + + it('returns 1 second for extremely large numbers', () => { + const input = { responseHeaders: { 'retry-after': '999999999999999999' } }; + assert.strictEqual(findRetryAfterTimeFromError(input), 1000); + }); + + it('finds the retry-after time on top-level response headers', () => { + const input = { responseHeaders: { 'retry-after': '1234' } }; + assert.strictEqual(findRetryAfterTimeFromError(input), 1234 * 1000); + }); + + it("finds the retry-after time on an HTTP error's response headers", () => { + const input = { + httpError: new HTTPError('Slow down', { + code: 413, + headers: { 'retry-after': '1234' }, + response: {}, + }), + }; + assert.strictEqual(findRetryAfterTimeFromError(input), 1234 * 1000); + }); + + it('prefers the top-level response headers over an HTTP error', () => { + const input = { + responseHeaders: { 'retry-after': '1234' }, + httpError: new HTTPError('Slow down', { + code: 413, + headers: { 'retry-after': '999' }, + response: {}, + }), + }; + assert.strictEqual(findRetryAfterTimeFromError(input), 1234 * 1000); + }); +}); diff --git a/ts/test-node/jobs/helpers/handleMultipleSendErrors_test.ts b/ts/test-node/jobs/helpers/handleMultipleSendErrors_test.ts new file mode 100644 index 0000000000..24e316da11 --- /dev/null +++ b/ts/test-node/jobs/helpers/handleMultipleSendErrors_test.ts @@ -0,0 +1,150 @@ +// Copyright 2022 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; +import * as sinon from 'sinon'; +import { noop } from 'lodash'; +import { HTTPError } from '../../../textsecure/Errors'; +import { SECOND } from '../../../util/durations'; + +import { handleMultipleSendErrors } from '../../../jobs/helpers/handleMultipleSendErrors'; + +describe('handleMultipleSendErrors', () => { + const make413 = (retryAfter: number): HTTPError => + new HTTPError('Slow down', { + code: 413, + headers: { 'retry-after': retryAfter.toString() }, + response: {}, + }); + + const defaultOptions = { + isFinalAttempt: false, + log: { info: noop }, + markFailed: () => { + throw new Error('This should not be called'); + }, + timeRemaining: 1234, + }; + + let sandbox: sinon.SinonSandbox; + let clock: sinon.SinonFakeTimers; + + beforeEach(() => { + sandbox = sinon.createSandbox(); + clock = sandbox.useFakeTimers(); + }); + + afterEach(() => { + sandbox.restore(); + }); + + it('throws the first provided error', async () => { + await assert.isRejected( + handleMultipleSendErrors({ + ...defaultOptions, + errors: [new Error('first'), new Error('second')], + }), + 'first' + ); + }); + + it("marks the send failed if it's the final attempt", async () => { + const markFailed = sinon.stub(); + + await assert.isRejected( + handleMultipleSendErrors({ + ...defaultOptions, + errors: [new Error('uh oh')], + markFailed, + isFinalAttempt: true, + }) + ); + + sinon.assert.calledOnceWithExactly(markFailed); + }); + + describe('413 handling', () => { + it('sleeps for the longest 413 Retry-After time', async () => { + let done = false; + + (async () => { + try { + await handleMultipleSendErrors({ + ...defaultOptions, + errors: [ + new Error('Other'), + make413(10), + make413(999), + make413(20), + ], + timeRemaining: 99999999, + }); + } catch (err) { + // No-op + } finally { + done = true; + } + })(); + + await clock.tickAsync(900 * SECOND); + assert.isFalse(done, "Didn't sleep for long enough"); + await clock.tickAsync(100 * SECOND); + assert.isTrue(done, 'Slept for too long'); + }); + + it("doesn't sleep longer than the remaining time", async () => { + let done = false; + + (async () => { + try { + await handleMultipleSendErrors({ + ...defaultOptions, + errors: [make413(9999)], + timeRemaining: 99, + }); + } catch (err) { + // No-op + } finally { + done = true; + } + })(); + + await clock.tickAsync(100); + assert.isTrue(done); + }); + + it("doesn't sleep if it's the final attempt", async () => { + await assert.isRejected( + handleMultipleSendErrors({ + ...defaultOptions, + errors: [new Error('uh oh')], + isFinalAttempt: true, + }) + ); + }); + }); + + describe('508 handling', () => { + it('resolves with no error if any 508 is received', async () => { + await assert.isFulfilled( + handleMultipleSendErrors({ + ...defaultOptions, + errors: [new Error('uh oh'), { code: 508 }, make413(99999)], + markFailed: noop, + }) + ); + }); + + it('marks the send failed on a 508', async () => { + const markFailed = sinon.stub(); + + await handleMultipleSendErrors({ + ...defaultOptions, + errors: [{ code: 508 }], + markFailed, + }); + + sinon.assert.calledOnceWithExactly(markFailed); + }); + }); +});