diff --git a/ts/jobs/JobQueue.ts b/ts/jobs/JobQueue.ts index f9d2df67f8..c18d1e0852 100644 --- a/ts/jobs/JobQueue.ts +++ b/ts/jobs/JobQueue.ts @@ -1,6 +1,7 @@ // Copyright 2021 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only +import PQueue from 'p-queue'; import { v4 as uuid } from 'uuid'; import { noop } from 'lodash'; @@ -60,6 +61,8 @@ export abstract class JobQueue { } >(); + private readonly defaultInMemoryQueue = new PQueue(); + private started = false; constructor(options: Readonly) { @@ -172,6 +175,10 @@ export abstract class JobQueue { return new Job(id, timestamp, this.queueType, data, completion); } + protected getInMemoryQueue(_parsedJob: ParsedJob): PQueue { + return this.defaultInMemoryQueue; + } + private async enqueueStoredJob(storedJob: Readonly) { assert( storedJob.queueType === this.queueType, @@ -205,38 +212,46 @@ export abstract class JobQueue { data: parsedData, }; + const queue: PQueue = this.getInMemoryQueue(parsedJob); + const logger = new JobLogger(parsedJob, this.logger); - let result: + const result: | undefined | { success: true } - | { success: false; err: unknown }; + | { success: false; err: unknown } = await queue.add(async () => { + for (let attempt = 1; attempt <= this.maxAttempts; attempt += 1) { + const isFinalAttempt = attempt === this.maxAttempts; - for (let attempt = 1; attempt <= this.maxAttempts; attempt += 1) { - logger.attempt = attempt; + logger.attempt = attempt; - log.info( - `${this.logPrefix} running job ${storedJob.id}, attempt ${attempt} of ${this.maxAttempts}` - ); - try { - // We want an `await` in the loop, as we don't want a single job running more - // than once at a time. Ideally, the job will succeed on the first attempt. - // eslint-disable-next-line no-await-in-loop - await this.run(parsedJob, { attempt, log: logger }); - result = { success: true }; log.info( - `${this.logPrefix} job ${storedJob.id} succeeded on attempt ${attempt}` - ); - break; - } catch (err: unknown) { - result = { success: false, err }; - log.error( - `${this.logPrefix} job ${ - storedJob.id - } failed on attempt ${attempt}. ${Errors.toLogFormat(err)}` + `${this.logPrefix} running job ${storedJob.id}, attempt ${attempt} of ${this.maxAttempts}` ); + try { + // We want an `await` in the loop, as we don't want a single job running more + // than once at a time. Ideally, the job will succeed on the first attempt. + // eslint-disable-next-line no-await-in-loop + await this.run(parsedJob, { attempt, log: logger }); + log.info( + `${this.logPrefix} job ${storedJob.id} succeeded on attempt ${attempt}` + ); + return { success: true }; + } catch (err: unknown) { + log.error( + `${this.logPrefix} job ${ + storedJob.id + } failed on attempt ${attempt}. ${Errors.toLogFormat(err)}` + ); + if (isFinalAttempt) { + return { success: false, err }; + } + } } - } + + // This should never happen. See the assertion below. + return undefined; + }); await this.store.delete(storedJob.id); diff --git a/ts/jobs/normalMessageSendJobQueue.ts b/ts/jobs/normalMessageSendJobQueue.ts index 372a0da63d..1e5931e7f1 100644 --- a/ts/jobs/normalMessageSendJobQueue.ts +++ b/ts/jobs/normalMessageSendJobQueue.ts @@ -95,25 +95,25 @@ export class NormalMessageSendJobQueue extends JobQueue): PQueue { + const { conversationId } = data; + + const existingQueue = this.queues.get(conversationId); if (existingQueue) { return existingQueue; } const newQueue = new PQueue({ concurrency: 1 }); newQueue.once('idle', () => { - this.queues.delete(queueKey); + this.queues.delete(conversationId); }); - this.queues.set(queueKey, newQueue); + this.queues.set(conversationId, newQueue); return newQueue; } - private enqueue(queueKey: string, fn: () => Promise): Promise { - return this.getQueue(queueKey).add(fn); - } - protected async run( { data, @@ -121,248 +121,244 @@ export class NormalMessageSendJobQueue extends JobQueue, { attempt, log }: Readonly<{ attempt: number; log: LoggerType }> ): Promise { - const { messageId, conversationId } = data; + const { messageId } = data; - await this.enqueue(conversationId, async () => { - const timeRemaining = timestamp + MAX_RETRY_TIME - Date.now(); - const isFinalAttempt = attempt >= MAX_ATTEMPTS; + const timeRemaining = timestamp + MAX_RETRY_TIME - Date.now(); + const isFinalAttempt = attempt >= MAX_ATTEMPTS; - // We don't immediately use this value because we may want to mark the message - // failed before doing so. - const shouldContinue = await commonShouldJobContinue({ - attempt, - log, - timeRemaining, + // We don't immediately use this value because we may want to mark the message + // failed before doing so. + const shouldContinue = await commonShouldJobContinue({ + attempt, + log, + timeRemaining, + }); + + await window.ConversationController.loadPromise(); + + const message = await getMessageById(messageId); + if (!message) { + log.info( + `message ${messageId} was not found, maybe because it was deleted. Giving up on sending it` + ); + return; + } + + if (!isOutgoing(message.attributes)) { + log.error( + `message ${messageId} was not an outgoing message to begin with. This is probably a bogus job. Giving up on sending it` + ); + return; + } + + if (message.isErased() || message.get('deletedForEveryone')) { + log.info(`message ${messageId} was erased. Giving up on sending it`); + return; + } + + let messageSendErrors: Array = []; + + // We don't want to save errors on messages unless we're giving up. If it's our + // final attempt, we know upfront that we want to give up. However, we might also + // want to give up if (1) we get a 508 from the server, asking us to please stop + // (2) we get a 428 from the server, flagging the message for spam (3) some other + // reason not known at the time of this writing. + // + // This awkward callback lets us hold onto errors we might want to save, so we can + // decide whether to save them later on. + const saveErrors = isFinalAttempt + ? undefined + : (errors: Array) => { + messageSendErrors = errors; + }; + + if (!shouldContinue) { + log.info(`message ${messageId} ran out of time. Giving up on sending it`); + await markMessageFailed(message, messageSendErrors); + return; + } + + try { + const conversation = message.getConversation(); + if (!conversation) { + throw new Error( + `could not find conversation for message with ID ${messageId}` + ); + } + + const { + allRecipientIdentifiers, + recipientIdentifiersWithoutMe, + untrustedConversationIds, + } = getMessageRecipients({ + message, + conversation, }); - await window.ConversationController.loadPromise(); - - const message = await getMessageById(messageId); - if (!message) { + if (untrustedConversationIds.length) { log.info( - `message ${messageId} was not found, maybe because it was deleted. Giving up on sending it` + `message ${messageId} sending blocked because ${untrustedConversationIds.length} conversation(s) were untrusted. Giving up on the job, but it may be reborn later` + ); + window.reduxActions.conversations.messageStoppedByMissingVerification( + messageId, + untrustedConversationIds ); return; } - if (!isOutgoing(message.attributes)) { - log.error( - `message ${messageId} was not an outgoing message to begin with. This is probably a bogus job. Giving up on sending it` + if (!allRecipientIdentifiers.length) { + log.warn( + `trying to send message ${messageId} but it looks like it was already sent to everyone. This is unexpected, but we're giving up` ); return; } - if (message.isErased() || message.get('deletedForEveryone')) { - log.info(`message ${messageId} was erased. Giving up on sending it`); - return; - } + const { + attachments, + body, + deletedForEveryoneTimestamp, + expireTimer, + mentions, + messageTimestamp, + preview, + profileKey, + quote, + sticker, + } = await getMessageSendData({ conversation, message }); - let messageSendErrors: Array = []; + let messageSendPromise: Promise; - // We don't want to save errors on messages unless we're giving up. If it's our - // final attempt, we know upfront that we want to give up. However, we might also - // want to give up if (1) we get a 508 from the server, asking us to please stop - // (2) we get a 428 from the server, flagging the message for spam (3) some other - // reason not known at the time of this writing. - // - // This awkward callback lets us hold onto errors we might want to save, so we can - // decide whether to save them later on. - const saveErrors = isFinalAttempt - ? undefined - : (errors: Array) => { - messageSendErrors = errors; - }; - - if (!shouldContinue) { - log.info( - `message ${messageId} ran out of time. Giving up on sending it` - ); - await markMessageFailed(message, messageSendErrors); - return; - } - - try { - const conversation = message.getConversation(); - if (!conversation) { - throw new Error( - `could not find conversation for message with ID ${messageId}` - ); - } - - const { - allRecipientIdentifiers, - recipientIdentifiersWithoutMe, - untrustedConversationIds, - } = getMessageRecipients({ - message, - conversation, - }); - - if (untrustedConversationIds.length) { - log.info( - `message ${messageId} sending blocked because ${untrustedConversationIds.length} conversation(s) were untrusted. Giving up on the job, but it may be reborn later` - ); - window.reduxActions.conversations.messageStoppedByMissingVerification( - messageId, - untrustedConversationIds - ); - return; - } - - if (!allRecipientIdentifiers.length) { - log.warn( - `trying to send message ${messageId} but it looks like it was already sent to everyone. This is unexpected, but we're giving up` - ); - return; - } - - const { + if (recipientIdentifiersWithoutMe.length === 0) { + log.info('sending sync message only'); + const dataMessage = await window.textsecure.messaging.getDataMessage({ attachments, body, deletedForEveryoneTimestamp, expireTimer, - mentions, - messageTimestamp, preview, profileKey, quote, + recipients: allRecipientIdentifiers, sticker, - } = await getMessageSendData({ conversation, message }); - - let messageSendPromise: Promise; - - if (recipientIdentifiersWithoutMe.length === 0) { - log.info('sending sync message only'); - const dataMessage = await window.textsecure.messaging.getDataMessage({ - attachments, - body, - deletedForEveryoneTimestamp, - expireTimer, - preview, - profileKey, - quote, - recipients: allRecipientIdentifiers, - sticker, - timestamp: messageTimestamp, - }); - messageSendPromise = message.sendSyncMessageOnly( - dataMessage, - saveErrors - ); - } else { - const conversationType = conversation.get('type'); - const sendOptions = await getSendOptions(conversation.attributes); - const { ContentHint } = Proto.UnidentifiedSenderMessage.Message; - - let innerPromise: Promise; - if (conversationType === Message.GROUP) { - log.info('sending group message'); - innerPromise = window.Signal.Util.sendToGroup({ - groupSendOptions: { - attachments, - deletedForEveryoneTimestamp, - expireTimer, - groupV1: updateRecipients( - conversation.getGroupV1Info(), - recipientIdentifiersWithoutMe - ), - groupV2: updateRecipients( - conversation.getGroupV2Info(), - recipientIdentifiersWithoutMe - ), - messageText: body, - preview, - profileKey, - quote, - sticker, - timestamp: messageTimestamp, - mentions, - }, - conversation, - contentHint: ContentHint.RESENDABLE, - messageId, - sendOptions, - sendType: 'message', - }); - } else { - log.info('sending direct message'); - innerPromise = window.textsecure.messaging.sendMessageToIdentifier({ - identifier: recipientIdentifiersWithoutMe[0], - messageText: body, - attachments, - quote, - preview, - sticker, - reaction: null, - deletedForEveryoneTimestamp, - timestamp: messageTimestamp, - expireTimer, - contentHint: ContentHint.RESENDABLE, - groupId: undefined, - profileKey, - options: sendOptions, - }); - } - - messageSendPromise = message.send( - handleMessageSend(innerPromise, { - messageIds: [messageId], - sendType: 'message', - }), - saveErrors - ); - } - - await messageSendPromise; - - if ( - getLastChallengeError({ - errors: messageSendErrors, - }) - ) { - log.info( - `message ${messageId} hit a spam challenge. Not retrying any more` - ); - await message.saveErrors(messageSendErrors); - return; - } - - const didFullySend = - !messageSendErrors.length || didSendToEveryone(message); - if (!didFullySend) { - throw new Error('message did not fully send'); - } - } catch (err: unknown) { - const serverAskedUsToStop: boolean = messageSendErrors.some( - (messageSendError: unknown) => - messageSendError instanceof Error && - parseIntWithFallback(messageSendError.code, -1) === 508 + timestamp: messageTimestamp, + }); + messageSendPromise = message.sendSyncMessageOnly( + dataMessage, + saveErrors ); + } else { + const conversationType = conversation.get('type'); + const sendOptions = await getSendOptions(conversation.attributes); + const { ContentHint } = Proto.UnidentifiedSenderMessage.Message; - if (isFinalAttempt || serverAskedUsToStop) { - await markMessageFailed(message, messageSendErrors); - } - - if (serverAskedUsToStop) { - log.info('server responded with 508. Giving up on this job'); - return; - } - - if (!isFinalAttempt) { - const maybe413Error: undefined | Error = messageSendErrors.find( - (messageSendError: unknown) => - messageSendError instanceof Error && messageSendError.code === 413 - ); - await sleepFor413RetryAfterTimeIfApplicable({ - err: maybe413Error, - log, - timeRemaining, + let innerPromise: Promise; + if (conversationType === Message.GROUP) { + log.info('sending group message'); + innerPromise = window.Signal.Util.sendToGroup({ + groupSendOptions: { + attachments, + deletedForEveryoneTimestamp, + expireTimer, + groupV1: updateRecipients( + conversation.getGroupV1Info(), + recipientIdentifiersWithoutMe + ), + groupV2: updateRecipients( + conversation.getGroupV2Info(), + recipientIdentifiersWithoutMe + ), + messageText: body, + preview, + profileKey, + quote, + sticker, + timestamp: messageTimestamp, + mentions, + }, + conversation, + contentHint: ContentHint.RESENDABLE, + messageId, + sendOptions, + sendType: 'message', + }); + } else { + log.info('sending direct message'); + innerPromise = window.textsecure.messaging.sendMessageToIdentifier({ + identifier: recipientIdentifiersWithoutMe[0], + messageText: body, + attachments, + quote, + preview, + sticker, + reaction: null, + deletedForEveryoneTimestamp, + timestamp: messageTimestamp, + expireTimer, + contentHint: ContentHint.RESENDABLE, + groupId: undefined, + profileKey, + options: sendOptions, }); } - throw err; + messageSendPromise = message.send( + handleMessageSend(innerPromise, { + messageIds: [messageId], + sendType: 'message', + }), + saveErrors + ); } - }); + + await messageSendPromise; + + if ( + getLastChallengeError({ + errors: messageSendErrors, + }) + ) { + log.info( + `message ${messageId} hit a spam challenge. Not retrying any more` + ); + await message.saveErrors(messageSendErrors); + return; + } + + const didFullySend = + !messageSendErrors.length || didSendToEveryone(message); + if (!didFullySend) { + throw new Error('message did not fully send'); + } + } catch (err: unknown) { + const serverAskedUsToStop: boolean = messageSendErrors.some( + (messageSendError: unknown) => + messageSendError instanceof Error && + parseIntWithFallback(messageSendError.code, -1) === 508 + ); + + if (isFinalAttempt || serverAskedUsToStop) { + await markMessageFailed(message, messageSendErrors); + } + + if (serverAskedUsToStop) { + log.info('server responded with 508. Giving up on this job'); + return; + } + + if (!isFinalAttempt) { + const maybe413Error: undefined | Error = messageSendErrors.find( + (messageSendError: unknown) => + messageSendError instanceof Error && messageSendError.code === 413 + ); + await sleepFor413RetryAfterTimeIfApplicable({ + err: maybe413Error, + log, + timeRemaining, + }); + } + + throw err; + } } } diff --git a/ts/test-node/jobs/JobQueue_test.ts b/ts/test-node/jobs/JobQueue_test.ts index 8b6569fed1..fc8081e077 100644 --- a/ts/test-node/jobs/JobQueue_test.ts +++ b/ts/test-node/jobs/JobQueue_test.ts @@ -8,6 +8,7 @@ import EventEmitter, { once } from 'events'; import { z } from 'zod'; import { noop, groupBy } from 'lodash'; import { v4 as uuid } from 'uuid'; +import PQueue from 'p-queue'; import { JobError } from '../../jobs/JobError'; import { TestJobQueueStore } from './TestJobQueueStore'; import { missingCaseError } from '../../util/missingCaseError'; @@ -67,6 +68,98 @@ describe('JobQueue', () => { assert.isEmpty(store.storedJobs); }); + it('by default, kicks off multiple jobs in parallel', async () => { + let activeJobCount = 0; + const eventBus = new EventEmitter(); + const updateActiveJobCount = (incrementBy: number): void => { + activeJobCount += incrementBy; + eventBus.emit('updated'); + }; + + class Queue extends JobQueue { + parseData(data: unknown): number { + return z.number().parse(data); + } + + async run(): Promise { + try { + updateActiveJobCount(1); + await new Promise(resolve => { + eventBus.on('updated', () => { + if (activeJobCount === 4) { + eventBus.emit('got to 4'); + resolve(); + } + }); + }); + } finally { + updateActiveJobCount(-1); + } + } + } + + const store = new TestJobQueueStore(); + + const queue = new Queue({ + store, + queueType: 'test queue', + maxAttempts: 100, + }); + queue.streamJobs(); + + queue.add(1); + queue.add(2); + queue.add(3); + queue.add(4); + + await once(eventBus, 'got to 4'); + }); + + it('can override the in-memory queue', async () => { + let jobsAdded = 0; + const testQueue = new PQueue(); + testQueue.on('add', () => { + jobsAdded += 1; + }); + + class Queue extends JobQueue { + parseData(data: unknown): number { + return z.number().parse(data); + } + + protected getInMemoryQueue(parsedJob: ParsedJob): PQueue { + assert( + new Set([1, 2, 3, 4]).has(parsedJob.data), + 'Bad data passed to `getInMemoryQueue`' + ); + return testQueue; + } + + run(): Promise { + return Promise.resolve(); + } + } + + const store = new TestJobQueueStore(); + + const queue = new Queue({ + store, + queueType: 'test queue', + maxAttempts: 100, + }); + queue.streamJobs(); + + const jobs = await Promise.all([ + queue.add(1), + queue.add(2), + queue.add(3), + queue.add(4), + ]); + await Promise.all(jobs.map(job => job.completion)); + + assert.strictEqual(jobsAdded, 4); + }); + it('writes jobs to the database correctly', async () => { const store = new TestJobQueueStore();