diff --git a/ts/jobs/helpers/sendDeleteForEveryone.ts b/ts/jobs/helpers/sendDeleteForEveryone.ts index b3ca6c3603..be0ea4302d 100644 --- a/ts/jobs/helpers/sendDeleteForEveryone.ts +++ b/ts/jobs/helpers/sendDeleteForEveryone.ts @@ -92,7 +92,7 @@ export async function sendDeleteForEveryone( await conversation.queueJob( 'conversationQueue/sendDeleteForEveryone', - async () => { + async abortSignal => { log.info( `Sending deleteForEveryone to conversation ${logId}`, `with timestamp ${timestamp}`, @@ -209,6 +209,7 @@ export async function sendDeleteForEveryone( messageIds, send: async () => window.Signal.Util.sendToGroup({ + abortSignal, contentHint, groupSendOptions: { groupV1: conversation.getGroupV1Info(recipients), diff --git a/ts/jobs/helpers/sendGroupUpdate.ts b/ts/jobs/helpers/sendGroupUpdate.ts index 982324e670..cbe6744b22 100644 --- a/ts/jobs/helpers/sendGroupUpdate.ts +++ b/ts/jobs/helpers/sendGroupUpdate.ts @@ -90,27 +90,30 @@ export async function sendGroupUpdate( }; try { - await conversation.queueJob('conversationQueue/sendGroupUpdate', async () => - wrapWithSyncMessageSend({ - conversation, - logId, - messageIds: [], - send: async () => - window.Signal.Util.sendToGroup({ - groupSendOptions: { - groupV2, - timestamp, - profileKey, - }, - contentHint, - messageId: undefined, - sendOptions, - sendTarget: conversation.toSenderKeyTarget(), - sendType, - }), - sendType, - timestamp, - }) + await conversation.queueJob( + 'conversationQueue/sendGroupUpdate', + async abortSignal => + wrapWithSyncMessageSend({ + conversation, + logId, + messageIds: [], + send: async () => + window.Signal.Util.sendToGroup({ + abortSignal, + groupSendOptions: { + groupV2, + timestamp, + profileKey, + }, + contentHint, + messageId: undefined, + sendOptions, + sendTarget: conversation.toSenderKeyTarget(), + sendType, + }), + sendType, + timestamp, + }) ); } catch (error: unknown) { await handleMultipleSendErrors({ diff --git a/ts/jobs/helpers/sendNormalMessage.ts b/ts/jobs/helpers/sendNormalMessage.ts index 6e2ed0beab..a74990849c 100644 --- a/ts/jobs/helpers/sendNormalMessage.ts +++ b/ts/jobs/helpers/sendNormalMessage.ts @@ -199,8 +199,9 @@ export async function sendNormalMessage( log.info('sending group message'); innerPromise = conversation.queueJob( 'conversationQueue/sendNormalMessage', - () => + abortSignal => window.Signal.Util.sendToGroup({ + abortSignal, contentHint: ContentHint.RESENDABLE, groupSendOptions: { attachments, diff --git a/ts/jobs/helpers/sendReaction.ts b/ts/jobs/helpers/sendReaction.ts index 70337b939a..fea23561f1 100644 --- a/ts/jobs/helpers/sendReaction.ts +++ b/ts/jobs/helpers/sendReaction.ts @@ -242,7 +242,7 @@ export async function sendReaction( log.info('sending group reaction message'); promise = conversation.queueJob( 'conversationQueue/sendReaction', - () => { + abortSignal => { // Note: this will happen for all old jobs queued before 5.32.x if (isGroupV2(conversation.attributes) && !isNumber(revision)) { log.error('No revision provided, but conversation is GroupV2'); @@ -256,6 +256,7 @@ export async function sendReaction( } return window.Signal.Util.sendToGroup({ + abortSignal, contentHint: ContentHint.RESENDABLE, groupSendOptions: { groupV1: conversation.getGroupV1Info( diff --git a/ts/models/conversations.ts b/ts/models/conversations.ts index 75c0378fb6..ab8a3e540d 100644 --- a/ts/models/conversations.ts +++ b/ts/models/conversations.ts @@ -3440,7 +3440,10 @@ export class ConversationModel extends window.Backbone return null; } - queueJob(name: string, callback: () => Promise): Promise { + queueJob( + name: string, + callback: (abortSignal: AbortSignal) => Promise + ): Promise { this.jobQueue = this.jobQueue || new window.PQueue({ concurrency: 1 }); const taskWithTimeout = createTaskWithTimeout( @@ -3448,6 +3451,9 @@ export class ConversationModel extends window.Backbone `conversation ${this.idForLogging()}` ); + const abortController = new AbortController(); + const { signal: abortSignal } = abortController; + const queuedAt = Date.now(); return this.jobQueue.add(async () => { const startedAt = Date.now(); @@ -3458,7 +3464,10 @@ export class ConversationModel extends window.Backbone } try { - return await taskWithTimeout(); + return await taskWithTimeout(abortSignal); + } catch (error) { + abortController.abort(); + throw error; } finally { const duration = Date.now() - startedAt; diff --git a/ts/util/sendToGroup.ts b/ts/util/sendToGroup.ts index e0ae4f9723..bd66540e32 100644 --- a/ts/util/sendToGroup.ts +++ b/ts/util/sendToGroup.ts @@ -89,6 +89,7 @@ export type SenderKeyTargetType = { }; export async function sendToGroup({ + abortSignal, contentHint, groupSendOptions, isPartialSend, @@ -97,6 +98,7 @@ export async function sendToGroup({ sendTarget, sendType, }: { + abortSignal?: AbortSignal; contentHint: number; groupSendOptions: GroupSendOptionsType; isPartialSend?: boolean; @@ -120,6 +122,12 @@ export async function sendToGroup({ protoAttributes ); + // Attachment upload might take too long to succeed - we don't want to proceed + // with the send if the caller aborted this call. + if (abortSignal?.aborted) { + throw new Error('sendToGroup was aborted'); + } + return sendContentMessageToGroup({ contentHint, contentMessage,