diff --git a/ts/jobs/JobQueue.ts b/ts/jobs/JobQueue.ts index 37dbee052d..2facb669f0 100644 --- a/ts/jobs/JobQueue.ts +++ b/ts/jobs/JobQueue.ts @@ -93,7 +93,10 @@ export abstract class JobQueue { * If it rejects, the job will be retried up to `maxAttempts - 1` times, after which it * will be deleted from the store. */ - protected abstract run(job: Readonly>): Promise; + protected abstract run( + job: Readonly>, + extra?: Readonly<{ attempt: number }> + ): Promise; /** * Start streaming jobs from the store. @@ -198,7 +201,7 @@ export abstract class JobQueue { // We want an `await` in the loop, as we don't want a single job running more // than once at a time. Ideally, the job will succeed on the first attempt. // eslint-disable-next-line no-await-in-loop - await this.run(parsedJob); + await this.run(parsedJob, { attempt }); result = { success: true }; log.info( `${this.logPrefix} job ${storedJob.id} succeeded on attempt ${attempt}` diff --git a/ts/jobs/initializeAllJobQueues.ts b/ts/jobs/initializeAllJobQueues.ts index 17cc338adb..0068202ff2 100644 --- a/ts/jobs/initializeAllJobQueues.ts +++ b/ts/jobs/initializeAllJobQueues.ts @@ -3,6 +3,7 @@ import type { WebAPIType } from '../textsecure/WebAPI'; +import { readSyncJobQueue } from './readSyncJobQueue'; import { removeStorageKeyJobQueue } from './removeStorageKeyJobQueue'; import { reportSpamJobQueue } from './reportSpamJobQueue'; @@ -16,6 +17,7 @@ export function initializeAllJobQueues({ }): void { reportSpamJobQueue.initialize({ server }); + readSyncJobQueue.streamJobs(); removeStorageKeyJobQueue.streamJobs(); reportSpamJobQueue.streamJobs(); } diff --git a/ts/jobs/readSyncJobQueue.ts b/ts/jobs/readSyncJobQueue.ts new file mode 100644 index 0000000000..7dc8c06766 --- /dev/null +++ b/ts/jobs/readSyncJobQueue.ts @@ -0,0 +1,118 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +/* eslint-disable class-methods-use-this */ + +import * as z from 'zod'; +import * as moment from 'moment'; +import { getSendOptions } from '../util/getSendOptions'; +import { handleMessageSend } from '../util/handleMessageSend'; +import { isNotNil } from '../util/isNotNil'; +import { sleep } from '../util/sleep'; +import { + exponentialBackoffSleepTime, + exponentialBackoffMaxAttempts, +} from '../util/exponentialBackoff'; +import * as log from '../logging/log'; +import { isDone as isDeviceLinked } from '../util/registration'; +import { waitForOnline } from '../util/waitForOnline'; +import { parseIntWithFallback } from '../util/parseIntWithFallback'; + +import { JobQueue } from './JobQueue'; +import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; + +const MAX_RETRY_TIME = moment.duration(1, 'day').asMilliseconds(); + +const readSyncJobDataSchema = z.object({ + readSyncs: z.array( + z.object({ + messageId: z.string().optional(), + senderE164: z.string().optional(), + senderUuid: z.string().optional(), + timestamp: z.number(), + }) + ), +}); + +export type ReadSyncJobData = z.infer; + +export class ReadSyncJobQueue extends JobQueue { + protected parseData(data: unknown): ReadSyncJobData { + return readSyncJobDataSchema.parse(data); + } + + protected async run( + { data, timestamp }: Readonly<{ data: ReadSyncJobData; timestamp: number }>, + { attempt }: Readonly<{ attempt: number }> + ): Promise { + const { readSyncs } = data; + if (!readSyncs.length) { + log.info( + "readSyncJobQueue: skipping this job because there's nothing to sync" + ); + return; + } + + const maxJobAge = timestamp + MAX_RETRY_TIME; + const timeRemaining = maxJobAge - Date.now(); + + if (timeRemaining <= 0) { + log.info("readSyncJobQueue: giving up because it's been too long"); + return; + } + + try { + await waitForOnline(window.navigator, window, { timeout: timeRemaining }); + } catch (err) { + log.info("readSyncJobQueue: didn't come online in time, giving up"); + return; + } + + await new Promise(resolve => { + window.storage.onready(resolve); + }); + + if (!isDeviceLinked()) { + log.info("readSyncJobQueue: skipping this job because we're unlinked"); + return; + } + + await sleep(exponentialBackoffSleepTime(attempt)); + + const messageIds = readSyncs.map(item => item.messageId).filter(isNotNil); + + const ourConversation = window.ConversationController.getOurConversationOrThrow(); + const sendOptions = await getSendOptions(ourConversation.attributes, { + syncMessage: true, + }); + + try { + await handleMessageSend( + window.textsecure.messaging.syncReadMessages(readSyncs, sendOptions), + { messageIds, sendType: 'readSync' } + ); + } catch (err: unknown) { + if (!(err instanceof Error)) { + throw err; + } + + const code = parseIntWithFallback(err.code, -1); + if (code === 508) { + log.info( + 'readSyncJobQueue: server responded with 508. Giving up on this job' + ); + return; + } + + throw err; + } + } +} + +export const readSyncJobQueue = new ReadSyncJobQueue({ + store: jobQueueDatabaseStore, + + queueType: 'read sync', + + maxAttempts: exponentialBackoffMaxAttempts(MAX_RETRY_TIME), +}); diff --git a/ts/test-both/util/exponentialBackoff_test.ts b/ts/test-both/util/exponentialBackoff_test.ts new file mode 100644 index 0000000000..ddd3c20113 --- /dev/null +++ b/ts/test-both/util/exponentialBackoff_test.ts @@ -0,0 +1,46 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; +import * as moment from 'moment'; + +import { + exponentialBackoffSleepTime, + exponentialBackoffMaxAttempts, +} from '../../util/exponentialBackoff'; + +describe('exponential backoff utilities', () => { + describe('exponentialBackoffSleepTime', () => { + it('returns slowly growing values', () => { + assert.strictEqual(exponentialBackoffSleepTime(1), 0); + assert.strictEqual(exponentialBackoffSleepTime(2), 190); + assert.strictEqual(exponentialBackoffSleepTime(3), 361); + assert.approximately(exponentialBackoffSleepTime(4), 686, 1); + assert.approximately(exponentialBackoffSleepTime(5), 1303, 1); + }); + + it('plateaus at a maximum after 15 attempts', () => { + const maximum = moment.duration(15, 'minutes').asMilliseconds(); + for (let attempt = 16; attempt < 100; attempt += 1) { + assert.strictEqual(exponentialBackoffSleepTime(attempt), maximum); + } + }); + }); + + describe('exponentialBackoffMaxAttempts', () => { + it('returns 2 attempts for a short period of time', () => { + assert.strictEqual(exponentialBackoffMaxAttempts(1), 2); + assert.strictEqual(exponentialBackoffMaxAttempts(99), 2); + }); + + it('returns 6 attempts for a 5 seconds', () => { + assert.strictEqual(exponentialBackoffMaxAttempts(5000), 6); + }); + + it('returns 110 attempts for 1 day', () => { + // This is a test case that is lifted from iOS's codebase. + const oneDay = moment.duration(24, 'hours').asMilliseconds(); + assert.strictEqual(exponentialBackoffMaxAttempts(oneDay), 110); + }); + }); +}); diff --git a/ts/test-electron/util/waitForOnline_test.ts b/ts/test-electron/util/waitForOnline_test.ts index c71ad3922c..be3dcf571f 100644 --- a/ts/test-electron/util/waitForOnline_test.ts +++ b/ts/test-electron/util/waitForOnline_test.ts @@ -7,6 +7,16 @@ import * as sinon from 'sinon'; import { waitForOnline } from '../../util/waitForOnline'; describe('waitForOnline', () => { + let sandbox: sinon.SinonSandbox; + + beforeEach(() => { + sandbox = sinon.createSandbox(); + }); + + afterEach(() => { + sandbox.restore(); + }); + function getFakeWindow(): EventTarget { const result = new EventTarget(); sinon.stub(result, 'addEventListener'); @@ -24,7 +34,7 @@ describe('waitForOnline', () => { sinon.assert.notCalled(fakeWindow.removeEventListener as sinon.SinonStub); }); - it("if you're offline, resolves as soon as you're online", async () => { + it("if you're offline, resolves as soon as you're online (and cleans up listeners)", async () => { const fakeNavigator = { onLine: false }; const fakeWindow = getFakeWindow(); @@ -48,4 +58,88 @@ describe('waitForOnline', () => { sinon.assert.calledOnce(fakeWindow.addEventListener as sinon.SinonStub); sinon.assert.calledOnce(fakeWindow.removeEventListener as sinon.SinonStub); }); + + it("resolves immediately if you're online when passed a timeout", async () => { + const fakeNavigator = { onLine: true }; + const fakeWindow = getFakeWindow(); + + await waitForOnline(fakeNavigator, fakeWindow, { timeout: 1234 }); + + sinon.assert.notCalled(fakeWindow.addEventListener as sinon.SinonStub); + sinon.assert.notCalled(fakeWindow.removeEventListener as sinon.SinonStub); + }); + + it("resolves immediately if you're online even if passed a timeout of 0", async () => { + const fakeNavigator = { onLine: true }; + const fakeWindow = getFakeWindow(); + + await waitForOnline(fakeNavigator, fakeWindow, { timeout: 0 }); + + sinon.assert.notCalled(fakeWindow.addEventListener as sinon.SinonStub); + sinon.assert.notCalled(fakeWindow.removeEventListener as sinon.SinonStub); + }); + + it("if you're offline, resolves as soon as you're online if it happens before the timeout", async () => { + const clock = sandbox.useFakeTimers(); + + const fakeNavigator = { onLine: false }; + const fakeWindow = getFakeWindow(); + + (fakeWindow.addEventListener as sinon.SinonStub) + .withArgs('online') + .callsFake((_eventName: string, callback: () => void) => { + setTimeout(callback, 1000); + }); + + let done = false; + (async () => { + await waitForOnline(fakeNavigator, fakeWindow, { timeout: 9999 }); + done = true; + })(); + + await clock.tickAsync(600); + assert.isFalse(done); + + await clock.tickAsync(500); + + assert.isTrue(done); + }); + + it('rejects if too much time has passed, and cleans up listeners', async () => { + const clock = sandbox.useFakeTimers(); + + const fakeNavigator = { onLine: false }; + const fakeWindow = getFakeWindow(); + + (fakeWindow.addEventListener as sinon.SinonStub) + .withArgs('online') + .callsFake((_eventName: string, callback: () => void) => { + setTimeout(callback, 9999); + }); + + const promise = waitForOnline(fakeNavigator, fakeWindow, { + timeout: 100, + }); + + await clock.tickAsync(500); + + await assert.isRejected(promise); + + sinon.assert.calledOnce(fakeWindow.removeEventListener as sinon.SinonStub); + }); + + it('rejects if offline and passed a timeout of 0', async () => { + const fakeNavigator = { onLine: false }; + const fakeWindow = getFakeWindow(); + + (fakeWindow.addEventListener as sinon.SinonStub) + .withArgs('online') + .callsFake((_eventName: string, callback: () => void) => { + setTimeout(callback, 9999); + }); + + const promise = waitForOnline(fakeNavigator, fakeWindow, { timeout: 0 }); + + await assert.isRejected(promise); + }); }); diff --git a/ts/test-node/jobs/JobQueue_test.ts b/ts/test-node/jobs/JobQueue_test.ts index 04aa76b69f..a7532f6437 100644 --- a/ts/test-node/jobs/JobQueue_test.ts +++ b/ts/test-node/jobs/JobQueue_test.ts @@ -204,6 +204,42 @@ describe('JobQueue', () => { assert.isEmpty(store.storedJobs); }); + it('passes the attempt number to the run function', async () => { + const attempts: Array = []; + + const store = new TestJobQueueStore(); + + class TestQueue extends JobQueue { + parseData(data: unknown): string { + return z.string().parse(data); + } + + async run( + _: unknown, + { attempt }: Readonly<{ attempt: number }> + ): Promise { + attempts.push(attempt); + throw new Error('this job always fails'); + } + } + + const queue = new TestQueue({ + store, + queueType: 'test', + maxAttempts: 6, + }); + + queue.streamJobs(); + + try { + await (await queue.add('foo')).completion; + } catch (err: unknown) { + // We expect this to fail. + } + + assert.deepStrictEqual(attempts, [1, 2, 3, 4, 5, 6]); + }); + it('makes job.completion reject if parseData throws', async () => { class TestQueue extends JobQueue { parseData(data: unknown): string { diff --git a/ts/util/exponentialBackoff.ts b/ts/util/exponentialBackoff.ts new file mode 100644 index 0000000000..3b781b6228 --- /dev/null +++ b/ts/util/exponentialBackoff.ts @@ -0,0 +1,45 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import * as moment from 'moment'; + +const BACKOFF_FACTOR = 1.9; +const MAX_BACKOFF = moment.duration(15, 'minutes').asMilliseconds(); + +/** + * For a given attempt, how long should we sleep (in milliseconds)? + * + * The attempt should be a positive integer, and it is 1-indexed. The first attempt is 1, + * the second is 2, and so on. + * + * This is modified from [iOS's codebase][0]. + * + * [0]: https://github.com/signalapp/Signal-iOS/blob/6069741602421744edfb59923d2fb3a66b1b23c1/SignalServiceKit/src/Util/OWSOperation.swift + */ +export function exponentialBackoffSleepTime(attempt: number): number { + const failureCount = attempt - 1; + if (failureCount === 0) { + return 0; + } + return Math.min(MAX_BACKOFF, 100 * BACKOFF_FACTOR ** failureCount); +} + +/** + * If I want to retry for X milliseconds, how many attempts is that, roughly? For example, + * 24 hours (86,400,000 milliseconds) is 111 attempts. + * + * `desiredDurationMs` should be at least 1. + */ +export function exponentialBackoffMaxAttempts( + desiredDurationMs: number +): number { + let attempts = 0; + let total = 0; + // There's probably some algebra we could do here instead of this loop, but this is + // fast even for giant numbers, and is typically called just once at startup. + do { + attempts += 1; + total += exponentialBackoffSleepTime(attempts); + } while (total < desiredDurationMs); + return attempts; +} diff --git a/ts/util/markConversationRead.ts b/ts/util/markConversationRead.ts index b5303c5315..04caf60db5 100644 --- a/ts/util/markConversationRead.ts +++ b/ts/util/markConversationRead.ts @@ -2,11 +2,9 @@ // SPDX-License-Identifier: AGPL-3.0-only import { ConversationAttributesType } from '../model-types.d'; -import { handleMessageSend } from './handleMessageSend'; -import { getSendOptions } from './getSendOptions'; import { sendReadReceiptsFor } from './sendReadReceiptsFor'; import { hasErrors } from '../state/selectors/message'; -import { isNotNil } from './isNotNil'; +import { readSyncJobQueue } from '../jobs/readSyncJobQueue'; export async function markConversationRead( conversationAttrs: ConversationAttributesType, @@ -105,26 +103,17 @@ export async function markConversationRead( ...unreadMessagesSyncData, ...Array.from(unreadReactionSyncData.values()), ]; - const messageIds = readSyncs.map(item => item.messageId).filter(isNotNil); if (readSyncs.length && options.sendReadReceipts) { window.log.info(`Sending ${readSyncs.length} read syncs`); // Because syncReadMessages sends to our other devices, and sendReadReceipts goes // to a contact, we need accessKeys for both. - const ourConversation = window.ConversationController.getOurConversationOrThrow(); - const sendOptions = await getSendOptions(ourConversation.attributes, { - syncMessage: true, - }); - if (window.ConversationController.areWePrimaryDevice()) { window.log.warn( 'markConversationRead: We are primary device; not sending read syncs' ); } else { - await handleMessageSend( - window.textsecure.messaging.syncReadMessages(readSyncs, sendOptions), - { messageIds, sendType: 'readSync' } - ); + readSyncJobQueue.add({ readSyncs }); } await sendReadReceiptsFor(conversationAttrs, unreadMessagesSyncData); diff --git a/ts/util/waitForOnline.ts b/ts/util/waitForOnline.ts index 3a32802f15..1e2ecfc636 100644 --- a/ts/util/waitForOnline.ts +++ b/ts/util/waitForOnline.ts @@ -3,19 +3,33 @@ export function waitForOnline( navigator: Readonly<{ onLine: boolean }>, - onlineEventTarget: EventTarget + onlineEventTarget: EventTarget, + options: Readonly<{ timeout?: number }> = {} ): Promise { - return new Promise(resolve => { + const { timeout } = options; + + return new Promise((resolve, reject) => { if (navigator.onLine) { resolve(); return; } const listener = () => { - onlineEventTarget.removeEventListener('online', listener); + cleanup(); resolve(); }; + const cleanup = () => { + onlineEventTarget.removeEventListener('online', listener); + }; + onlineEventTarget.addEventListener('online', listener); + + if (timeout !== undefined) { + setTimeout(() => { + cleanup(); + reject(new Error('waitForOnline: did not come online in time')); + }, timeout); + } }); }