diff --git a/ts/background.ts b/ts/background.ts index 5ebf8dc811..e3cce32187 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -2,7 +2,7 @@ // SPDX-License-Identifier: AGPL-3.0-only import { webFrame } from 'electron'; -import { isNumber, debounce } from 'lodash'; +import { isNumber, debounce, groupBy } from 'lodash'; import { bindActionCreators } from 'redux'; import { render } from 'react-dom'; import { batch as batchDispatch } from 'react-redux'; @@ -29,6 +29,7 @@ import * as Timers from './Timers'; import * as indexedDb from './indexeddb'; import type { MenuOptionsType } from './types/menu'; import type { Receipt } from './types/Receipt'; +import { ReceiptType } from './types/Receipt'; import { SocketStatus } from './types/SocketStatus'; import { DEFAULT_CONVERSATION_COLOR } from './types/Colors'; import { ThemeType } from './types/Util'; @@ -145,11 +146,13 @@ import { ToastCaptchaSolved } from './components/ToastCaptchaSolved'; import { showToast } from './util/showToast'; import { startInteractionMode } from './windows/startInteractionMode'; import type { MainWindowStatsType } from './windows/context'; -import { deliveryReceiptsJobQueue } from './jobs/deliveryReceiptsJobQueue'; import { ReactionSource } from './reactions/ReactionSource'; import { singleProtoJobQueue } from './jobs/singleProtoJobQueue'; import { getInitialState } from './state/getInitialState'; -import { conversationJobQueue } from './jobs/conversationJobQueue'; +import { + conversationJobQueue, + conversationQueueJobEnum, +} from './jobs/conversationJobQueue'; import { SeenStatus } from './MessageSeenStatus'; import MessageSender from './textsecure/SendMessage'; import type AccountManager from './textsecure/AccountManager'; @@ -482,7 +485,17 @@ export async function startApp(): Promise { wait: 500, maxSize: 100, processBatch: async deliveryReceipts => { - await deliveryReceiptsJobQueue.add({ deliveryReceipts }); + const groups = groupBy(deliveryReceipts, 'conversationId'); + await Promise.all( + Object.keys(groups).map(async conversationId => { + await conversationJobQueue.add({ + type: conversationQueueJobEnum.enum.Receipts, + conversationId, + receiptsType: ReceiptType.Delivery, + receipts: groups[conversationId], + }); + }) + ); }, }); diff --git a/ts/jobs/conversationJobQueue.ts b/ts/jobs/conversationJobQueue.ts index 2744b12f9a..864162a255 100644 --- a/ts/jobs/conversationJobQueue.ts +++ b/ts/jobs/conversationJobQueue.ts @@ -20,6 +20,7 @@ import { sendDeleteStoryForEveryone } from './helpers/sendDeleteStoryForEveryone import { sendProfileKey } from './helpers/sendProfileKey'; import { sendReaction } from './helpers/sendReaction'; import { sendStory } from './helpers/sendStory'; +import { sendReceipts } from './helpers/sendReceipts'; import type { LoggerType } from '../types/Logging'; import { ConversationVerificationState } from '../state/ducks/conversationsEnums'; @@ -37,6 +38,7 @@ import type { Job } from './Job'; import type { ParsedJob } from './types'; import type SendMessage from '../textsecure/SendMessage'; import type { UUIDStringType } from '../types/UUID'; +import { receiptSchema, ReceiptType } from '../types/Receipt'; // Note: generally, we only want to add to this list. If you do need to change one of // these values, you'll likely need to write a database migration. @@ -49,6 +51,7 @@ export const conversationQueueJobEnum = z.enum([ 'ProfileKey', 'Reaction', 'Story', + 'Receipts', ]); const deleteForEveryoneJobDataSchema = z.object({ @@ -139,6 +142,14 @@ const storyJobDataSchema = z.object({ }); export type StoryJobData = z.infer; +const receiptsJobDataSchema = z.object({ + type: z.literal(conversationQueueJobEnum.enum.Receipts), + conversationId: z.string(), + receiptsType: z.nativeEnum(ReceiptType), + receipts: receiptSchema.array(), +}); +export type ReceiptsJobData = z.infer; + export const conversationQueueJobDataSchema = z.union([ deleteForEveryoneJobDataSchema, deleteStoryForEveryoneJobDataSchema, @@ -148,6 +159,7 @@ export const conversationQueueJobDataSchema = z.union([ profileKeyJobDataSchema, reactionJobDataSchema, storyJobDataSchema, + receiptsJobDataSchema, ]); export type ConversationQueueJobData = z.infer< typeof conversationQueueJobDataSchema @@ -384,6 +396,9 @@ export class ConversationJobQueue extends JobQueue { case jobSet.Story: await sendStory(conversation, jobBundle, data); break; + case jobSet.Receipts: + await sendReceipts(conversation, jobBundle, data); + break; default: { // Note: This should never happen, because the zod call in parseData wouldn't // accept data that doesn't look like our type specification. diff --git a/ts/jobs/deliveryReceiptsJobQueue.ts b/ts/jobs/deliveryReceiptsJobQueue.ts deleted file mode 100644 index 7f32160f80..0000000000 --- a/ts/jobs/deliveryReceiptsJobQueue.ts +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2021 Signal Messenger, LLC -// SPDX-License-Identifier: AGPL-3.0-only - -import { z } from 'zod'; -import type { LoggerType } from '../types/Logging'; -import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff'; -import { receiptSchema, ReceiptType } from '../types/Receipt'; -import { MAX_RETRY_TIME, runReceiptJob } from './helpers/receiptHelpers'; - -import { JobQueue } from './JobQueue'; -import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; - -const deliveryReceiptsJobDataSchema = z.object({ - deliveryReceipts: receiptSchema.array(), -}); - -type DeliveryReceiptsJobData = z.infer; - -export class DeliveryReceiptsJobQueue extends JobQueue { - protected parseData(data: unknown): DeliveryReceiptsJobData { - return deliveryReceiptsJobDataSchema.parse(data); - } - - protected async run( - { - data, - timestamp, - }: Readonly<{ data: DeliveryReceiptsJobData; timestamp: number }>, - { attempt, log }: Readonly<{ attempt: number; log: LoggerType }> - ): Promise { - await runReceiptJob({ - attempt, - log, - timestamp, - receipts: data.deliveryReceipts, - type: ReceiptType.Delivery, - }); - } -} - -export const deliveryReceiptsJobQueue = new DeliveryReceiptsJobQueue({ - store: jobQueueDatabaseStore, - queueType: 'delivery receipts', - maxAttempts: exponentialBackoffMaxAttempts(MAX_RETRY_TIME), -}); diff --git a/ts/jobs/helpers/receiptHelpers.ts b/ts/jobs/helpers/receiptHelpers.ts deleted file mode 100644 index 76e55d951c..0000000000 --- a/ts/jobs/helpers/receiptHelpers.ts +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2021 Signal Messenger, LLC -// SPDX-License-Identifier: AGPL-3.0-only - -import * as durations from '../../util/durations'; -import type { LoggerType } from '../../types/Logging'; -import type { Receipt, ReceiptType } from '../../types/Receipt'; -import { sendReceipts } from '../../util/sendReceipts'; -import { commonShouldJobContinue } from './commonShouldJobContinue'; -import { handleCommonJobRequestError } from './handleCommonJobRequestError'; - -export const MAX_RETRY_TIME = durations.DAY; - -export async function runReceiptJob({ - attempt, - log, - timestamp, - receipts, - type, -}: Readonly<{ - attempt: number; - log: LoggerType; - receipts: ReadonlyArray; - timestamp: number; - type: ReceiptType; -}>): Promise { - const timeRemaining = timestamp + MAX_RETRY_TIME - Date.now(); - - const shouldContinue = await commonShouldJobContinue({ - attempt, - log, - timeRemaining, - skipWait: false, - }); - if (!shouldContinue) { - return; - } - - try { - await sendReceipts({ log, receipts, type }); - } catch (err: unknown) { - await handleCommonJobRequestError({ err, log, timeRemaining }); - } -} diff --git a/ts/jobs/helpers/sendReceipts.ts b/ts/jobs/helpers/sendReceipts.ts new file mode 100644 index 0000000000..7562d0e60e --- /dev/null +++ b/ts/jobs/helpers/sendReceipts.ts @@ -0,0 +1,21 @@ +// Copyright 2023 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import type { ConversationModel } from '../../models/conversations'; +import { sendReceipts as sendReceiptsTask } from '../../util/sendReceipts'; +import type { + ConversationQueueJobBundle, + ReceiptsJobData, +} from '../conversationJobQueue'; + +export async function sendReceipts( + _conversation: ConversationModel, + { log }: ConversationQueueJobBundle, + data: ReceiptsJobData +): Promise { + await sendReceiptsTask({ + log, + receipts: data.receipts, + type: data.receiptsType, + }); +} diff --git a/ts/jobs/initializeAllJobQueues.ts b/ts/jobs/initializeAllJobQueues.ts index 451aab3ad8..b3b81ab8b0 100644 --- a/ts/jobs/initializeAllJobQueues.ts +++ b/ts/jobs/initializeAllJobQueues.ts @@ -5,15 +5,12 @@ import type { WebAPIType } from '../textsecure/WebAPI'; import { drop } from '../util/drop'; import { conversationJobQueue } from './conversationJobQueue'; -import { deliveryReceiptsJobQueue } from './deliveryReceiptsJobQueue'; -import { readReceiptsJobQueue } from './readReceiptsJobQueue'; import { readSyncJobQueue } from './readSyncJobQueue'; import { removeStorageKeyJobQueue } from './removeStorageKeyJobQueue'; import { reportSpamJobQueue } from './reportSpamJobQueue'; import { singleProtoJobQueue } from './singleProtoJobQueue'; import { viewOnceOpenJobQueue } from './viewOnceOpenJobQueue'; import { viewSyncJobQueue } from './viewSyncJobQueue'; -import { viewedReceiptsJobQueue } from './viewedReceiptsJobQueue'; /** * Start all of the job queues. Should be called when the database is ready. @@ -31,11 +28,6 @@ export function initializeAllJobQueues({ // Single proto send queue, used for a variety of one-off simple messages drop(singleProtoJobQueue.streamJobs()); - // Syncs to others - drop(deliveryReceiptsJobQueue.streamJobs()); - drop(readReceiptsJobQueue.streamJobs()); - drop(viewedReceiptsJobQueue.streamJobs()); - // Syncs to ourselves drop(readSyncJobQueue.streamJobs()); drop(viewSyncJobQueue.streamJobs()); diff --git a/ts/jobs/readReceiptsJobQueue.ts b/ts/jobs/readReceiptsJobQueue.ts deleted file mode 100644 index ebda879018..0000000000 --- a/ts/jobs/readReceiptsJobQueue.ts +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2021 Signal Messenger, LLC -// SPDX-License-Identifier: AGPL-3.0-only - -import { z } from 'zod'; -import type { LoggerType } from '../types/Logging'; -import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff'; -import type { StorageInterface } from '../types/Storage.d'; -import type { Receipt } from '../types/Receipt'; -import { receiptSchema, ReceiptType } from '../types/Receipt'; -import { MAX_RETRY_TIME, runReceiptJob } from './helpers/receiptHelpers'; - -import { JobQueue } from './JobQueue'; -import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; - -const readReceiptsJobDataSchema = z.object({ - readReceipts: receiptSchema.array(), -}); - -type ReadReceiptsJobData = z.infer; - -export class ReadReceiptsJobQueue extends JobQueue { - public async addIfAllowedByUser( - storage: Pick, - readReceipts: Array - ): Promise { - if (storage.get('read-receipt-setting')) { - await this.add({ readReceipts }); - } - } - - protected parseData(data: unknown): ReadReceiptsJobData { - return readReceiptsJobDataSchema.parse(data); - } - - protected async run( - { - data, - timestamp, - }: Readonly<{ data: ReadReceiptsJobData; timestamp: number }>, - { attempt, log }: Readonly<{ attempt: number; log: LoggerType }> - ): Promise { - await runReceiptJob({ - attempt, - log, - timestamp, - receipts: data.readReceipts, - type: ReceiptType.Read, - }); - } -} - -export const readReceiptsJobQueue = new ReadReceiptsJobQueue({ - store: jobQueueDatabaseStore, - queueType: 'read receipts', - maxAttempts: exponentialBackoffMaxAttempts(MAX_RETRY_TIME), -}); diff --git a/ts/jobs/viewedReceiptsJobQueue.ts b/ts/jobs/viewedReceiptsJobQueue.ts deleted file mode 100644 index 8b0ccbc9d9..0000000000 --- a/ts/jobs/viewedReceiptsJobQueue.ts +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2021 Signal Messenger, LLC -// SPDX-License-Identifier: AGPL-3.0-only - -import { z } from 'zod'; -import type { LoggerType } from '../types/Logging'; -import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff'; -import { receiptSchema, ReceiptType } from '../types/Receipt'; -import { MAX_RETRY_TIME, runReceiptJob } from './helpers/receiptHelpers'; - -import { JobQueue } from './JobQueue'; -import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; - -const viewedReceiptsJobDataSchema = z.object({ viewedReceipt: receiptSchema }); - -type ViewedReceiptsJobData = z.infer; - -export class ViewedReceiptsJobQueue extends JobQueue { - protected parseData(data: unknown): ViewedReceiptsJobData { - return viewedReceiptsJobDataSchema.parse(data); - } - - protected async run( - { - data, - timestamp, - }: Readonly<{ data: ViewedReceiptsJobData; timestamp: number }>, - { attempt, log }: Readonly<{ attempt: number; log: LoggerType }> - ): Promise { - await runReceiptJob({ - attempt, - log, - timestamp, - receipts: [data.viewedReceipt], - type: ReceiptType.Viewed, - }); - } -} - -export const viewedReceiptsJobQueue = new ViewedReceiptsJobQueue({ - store: jobQueueDatabaseStore, - queueType: 'viewed receipts', - maxAttempts: exponentialBackoffMaxAttempts(MAX_RETRY_TIME), -}); diff --git a/ts/models/conversations.ts b/ts/models/conversations.ts index 7aadcde715..410915343b 100644 --- a/ts/models/conversations.ts +++ b/ts/models/conversations.ts @@ -137,7 +137,6 @@ import { conversationJobQueue, conversationQueueJobEnum, } from '../jobs/conversationJobQueue'; -import { readReceiptsJobQueue } from '../jobs/readReceiptsJobQueue'; import type { ReactionModel } from '../messageModifiers/Reactions'; import { isAnnouncementGroupReady } from '../util/isAnnouncementGroupReady'; import { getProfile } from '../util/getProfile'; @@ -160,6 +159,7 @@ import { isMemberRequestingToJoin } from '../util/isMemberRequestingToJoin'; import { removePendingMember } from '../util/removePendingMember'; import { isMemberPending } from '../util/isMemberPending'; import { imageToBlurHash } from '../util/imageToBlurHash'; +import { ReceiptType } from '../types/Receipt'; const EMPTY_ARRAY: Readonly<[]> = []; const EMPTY_GROUP_COLLISIONS: GroupNameCollisionsWithIdsByTitle = {}; @@ -2182,17 +2182,22 @@ export class ConversationModel extends window.Backbone const readMessages = messages.filter(m => !hasErrors(m) && isIncoming(m)); if (isLocalAction) { + const conversationId = this.get('id'); + // eslint-disable-next-line no-await-in-loop - await readReceiptsJobQueue.addIfAllowedByUser( - window.storage, - readMessages.map(m => ({ + await conversationJobQueue.add({ + type: conversationQueueJobEnum.enum.Receipts, + conversationId: this.get('id'), + receiptsType: ReceiptType.Read, + receipts: readMessages.map(m => ({ messageId: m.id, + conversationId, senderE164: m.source, senderUuid: m.sourceUuid, timestamp: m.sent_at, isDirectConversation: isDirectConversation(this.attributes), - })) - ); + })), + }); } // eslint-disable-next-line no-await-in-loop diff --git a/ts/models/messages.ts b/ts/models/messages.ts index 98000633cc..d9afec3ff0 100644 --- a/ts/models/messages.ts +++ b/ts/models/messages.ts @@ -2488,6 +2488,7 @@ export class MessageModel extends window.Backbone.Model { window.Whisper.deliveryReceiptQueue.add(() => { window.Whisper.deliveryReceiptBatcher.add({ messageId, + conversationId, senderE164: source, senderUuid: sourceUuid, timestamp: this.get('sent_at'), diff --git a/ts/sql/migrations/78-merge-receipt-jobs.ts b/ts/sql/migrations/78-merge-receipt-jobs.ts new file mode 100644 index 0000000000..5210f9f054 --- /dev/null +++ b/ts/sql/migrations/78-merge-receipt-jobs.ts @@ -0,0 +1,132 @@ +// Copyright 2023 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import type { Database } from '@signalapp/better-sqlite3'; + +import type { LoggerType } from '../../types/Logging'; +import { isRecord } from '../../util/isRecord'; +import { + getJobsInQueueSync, + getMessageByIdSync, + insertJobSync, +} from '../Server'; + +export default function updateToSchemaVersion78( + currentVersion: number, + db: Database, + logger: LoggerType +): void { + if (currentVersion >= 78) { + return; + } + + db.transaction(() => { + const deleteJobsInQueue = db.prepare( + 'DELETE FROM jobs WHERE queueType = $queueType' + ); + + const queues = [ + { + queueType: 'delivery receipts', + jobDataKey: 'deliveryReceipts', + jobDataIsArray: true, + newReceiptsType: 'deliveryReceipt', + }, + { + queueType: 'read receipts', + jobDataKey: 'readReceipts', + jobDataIsArray: true, + newReceiptsType: 'readReceipt', + }, + { + queueType: 'viewed receipts', + jobDataKey: 'viewedReceipt', + jobDataIsArray: false, + newReceiptsType: 'viewedReceipt', + }, + ]; + + for (const queue of queues) { + const prevJobs = getJobsInQueueSync(db, queue.queueType); + deleteJobsInQueue.run({ queueType: queue.queueType }); + + prevJobs.forEach(job => { + const { data, id } = job; + if (!isRecord(data)) { + logger.warn( + `updateToSchemaVersion78: ${queue.queueType} queue job ${id} was missing valid data` + ); + return; + } + + const { messageId } = data; + if (typeof messageId !== 'string') { + logger.warn( + `updateToSchemaVersion78: ${queue.queueType} queue job ${id} had a non-string messageId` + ); + return; + } + + const message = getMessageByIdSync(db, messageId); + if (!message) { + logger.warn( + `updateToSchemaVersion78: Unable to find message for ${queue.queueType} job ${id}` + ); + return; + } + + const { conversationId } = message; + if (typeof conversationId !== 'string') { + logger.warn( + `updateToSchemaVersion78: ${queue.queueType} queue job ${id} had a non-string conversationId` + ); + return; + } + + const oldReceipts = queue.jobDataIsArray + ? data[queue.jobDataKey] + : [data[queue.jobDataKey]]; + + if (!Array.isArray(oldReceipts)) { + logger.warn( + `updateToSchemaVersion78: ${queue.queueType} queue job ${id} had a non-array ${queue.jobDataKey}` + ); + return; + } + + const newReceipts = []; + + for (const receipt of oldReceipts) { + if (!isRecord(receipt)) { + logger.warn( + `updateToSchemaVersion78: ${queue.queueType} queue job ${id} had a non-record receipt` + ); + continue; + } + + newReceipts.push({ + ...receipt, + conversationId, + }); + } + + const newJob = { + ...job, + queueType: 'conversation', + data: { + type: 'Receipts', + conversationId, + receiptsType: queue.newReceiptsType, + receipts: newReceipts, + }, + }; + + insertJobSync(db, newJob); + }); + } + + db.pragma('user_version = 78'); + })(); + + logger.info('updateToSchemaVersion78: success!'); +} diff --git a/ts/sql/migrations/index.ts b/ts/sql/migrations/index.ts index a45c7eb9ff..bc9e0989b1 100644 --- a/ts/sql/migrations/index.ts +++ b/ts/sql/migrations/index.ts @@ -53,6 +53,7 @@ import updateToSchemaVersion74 from './74-optimize-convo-open'; import updateToSchemaVersion75 from './75-noop'; import updateToSchemaVersion76 from './76-optimize-convo-open-2'; import updateToSchemaVersion77 from './77-signal-tokenizer'; +import updateToSchemaVersion78 from './78-merge-receipt-jobs'; function updateToSchemaVersion1( currentVersion: number, @@ -1975,6 +1976,7 @@ export const SCHEMA_VERSIONS = [ updateToSchemaVersion75, updateToSchemaVersion76, updateToSchemaVersion77, + updateToSchemaVersion78, ]; export function updateSchema(db: Database, logger: LoggerType): void { diff --git a/ts/state/ducks/conversations.ts b/ts/state/ducks/conversations.ts index 3f85019642..66b865b057 100644 --- a/ts/state/ducks/conversations.ts +++ b/ts/state/ducks/conversations.ts @@ -103,7 +103,6 @@ import { isGroupV2, } from '../../util/whatTypeOfConversation'; import { missingCaseError } from '../../util/missingCaseError'; -import { viewedReceiptsJobQueue } from '../../jobs/viewedReceiptsJobQueue'; import { viewSyncJobQueue } from '../../jobs/viewSyncJobQueue'; import { ReadStatus } from '../../messages/MessageReadStatus'; import { isIncoming, isOutgoing } from '../selectors/message'; @@ -147,6 +146,7 @@ import { setQuoteByMessageId, resetComposer, } from './composer'; +import { ReceiptType } from '../../types/Receipt'; // State @@ -1675,17 +1675,24 @@ export const markViewed = (messageId: string): void => { if (isIncoming(message.attributes)) { const convoAttributes = message.getConversation()?.attributes; + const conversationId = message.get('conversationId'); drop( - viewedReceiptsJobQueue.add({ - viewedReceipt: { - messageId, - senderE164, - senderUuid, - timestamp, - isDirectConversation: convoAttributes - ? isDirectConversation(convoAttributes) - : true, - }, + conversationJobQueue.add({ + type: conversationQueueJobEnum.enum.Receipts, + conversationId, + receiptsType: ReceiptType.Viewed, + receipts: [ + { + messageId, + conversationId, + senderE164, + senderUuid, + timestamp, + isDirectConversation: convoAttributes + ? isDirectConversation(convoAttributes) + : true, + }, + ], }) ); } diff --git a/ts/state/ducks/stories.ts b/ts/state/ducks/stories.ts index db759eec75..c00ecfd6b1 100644 --- a/ts/state/ducks/stories.ts +++ b/ts/state/ducks/stories.ts @@ -55,11 +55,15 @@ import type { BoundActionCreatorsMapObject } from '../../hooks/useBoundActions'; import { useBoundActions } from '../../hooks/useBoundActions'; import { verifyStoryListMembers as doVerifyStoryListMembers } from '../../util/verifyStoryListMembers'; import { viewSyncJobQueue } from '../../jobs/viewSyncJobQueue'; -import { viewedReceiptsJobQueue } from '../../jobs/viewedReceiptsJobQueue'; import { getOwn } from '../../util/getOwn'; import { SHOW_TOAST } from './toast'; import { ToastType } from '../../types/Toast'; import type { ShowToastActionType } from './toast'; +import { + conversationJobQueue, + conversationQueueJobEnum, +} from '../../jobs/conversationJobQueue'; +import { ReceiptType } from '../../types/Receipt'; export type StoryDataType = ReadonlyDeep< { @@ -399,8 +403,11 @@ function markStoryRead( ourUuid: window.textsecure.storage.user.getCheckedUuid().toString(), }); + const conversationId = message.get('conversationId'); + const viewedReceipt = { messageId, + conversationId, senderE164: message.attributes.source, senderUuid: message.attributes.sourceUuid, timestamp: message.attributes.sent_at, @@ -413,7 +420,14 @@ function markStoryRead( } if (window.Events.getStoryViewReceiptsEnabled()) { - drop(viewedReceiptsJobQueue.add({ viewedReceipt })); + drop( + conversationJobQueue.add({ + type: conversationQueueJobEnum.enum.Receipts, + conversationId, + receiptsType: ReceiptType.Viewed, + receipts: [viewedReceipt], + }) + ); } await dataInterface.addNewStoryRead({ diff --git a/ts/test-mock/playwright.ts b/ts/test-mock/playwright.ts index c76b149da2..cfd999d435 100644 --- a/ts/test-mock/playwright.ts +++ b/ts/test-mock/playwright.ts @@ -8,6 +8,7 @@ import type { IPCRequest as ChallengeRequestType, IPCResponse as ChallengeResponseType, } from '../challenge'; +import type { ReceiptType } from '../types/Receipt'; export type AppLoadedInfoType = Readonly<{ loadTime: number; @@ -23,6 +24,11 @@ export type ConversationOpenInfoType = Readonly<{ delta: number; }>; +export type ReceiptsInfoType = Readonly<{ + type: ReceiptType; + timestamps: Array; +}>; + export type StorageServiceInfoType = Readonly<{ manifestVersion: number; }>; @@ -70,6 +76,10 @@ export class App { return this.waitForEvent('challenge'); } + public async waitForReceipts(): Promise { + return this.waitForEvent('receipts'); + } + public async waitForStorageService(): Promise { return this.waitForEvent('storageServiceComplete'); } diff --git a/ts/test-mock/rate-limit/viewed_test.ts b/ts/test-mock/rate-limit/viewed_test.ts new file mode 100644 index 0000000000..b74753a90c --- /dev/null +++ b/ts/test-mock/rate-limit/viewed_test.ts @@ -0,0 +1,128 @@ +// Copyright 2023 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; +import type { PrimaryDevice } from '@signalapp/mock-server'; +import { StorageState, UUIDKind } from '@signalapp/mock-server'; +import createDebug from 'debug'; +import * as durations from '../../util/durations'; +import { Bootstrap } from '../bootstrap'; +import type { App } from '../bootstrap'; +import { ReceiptType } from '../../types/Receipt'; + +export const debug = createDebug('mock:test:challenge:receipts'); + +describe('challenge/receipts', function challengeReceiptsTest() { + this.timeout(durations.MINUTE * 100); + + let bootstrap: Bootstrap; + let app: App; + let contact: PrimaryDevice; + + beforeEach(async () => { + bootstrap = new Bootstrap({ + contactCount: 0, + contactsWithoutProfileKey: 40, + }); + await bootstrap.init(); + app = await bootstrap.link(); + + const { server, desktop, phone } = bootstrap; + + contact = await server.createPrimaryDevice({ + profileName: 'Jamie', + }); + + let state = StorageState.getEmpty(); + + state = state.updateAccount({ + profileKey: phone.profileKey.serialize(), + e164: phone.device.number, + givenName: phone.profileName, + readReceipts: true, + }); + + state = state.addContact( + contact, + { + whitelisted: true, + serviceE164: contact.device.number, + identityKey: contact.getPublicKey(UUIDKind.PNI).serialize(), + pni: contact.device.getUUIDByKind(UUIDKind.PNI), + givenName: 'Jamie', + }, + UUIDKind.PNI + ); + + // Just to make PNI Contact visible in the left pane + state = state.pin(contact, UUIDKind.PNI); + + const ourKey = await desktop.popSingleUseKey(); + await contact.addSingleUseKey(desktop, ourKey); + + await phone.setStorageState(state); + }); + + afterEach(async function after() { + if (this.currentTest?.state !== 'passed') { + await bootstrap.saveLogs(app); + } + + await app.close(); + await bootstrap.teardown(); + }); + + it('should wait for the challenge to be handled', async () => { + const { server, desktop } = bootstrap; + + debug( + `Rate limiting (desktop: ${desktop.uuid}) -> (contact: ${contact.device.uuid})` + ); + server.rateLimit({ source: desktop.uuid, target: contact.device.uuid }); + + const timestamp = bootstrap.getTimestamp(); + + debug('Sending a message from contact'); + await contact.sendText(desktop, 'Hello there!', { + timestamp, + }); + + const window = await app.getWindow(); + const leftPane = window.locator('.left-pane-wrapper'); + const conversationStack = window.locator('.conversation-stack'); + + debug(`Opening conversation with contact (${contact.toContact().uuid})`); + await leftPane + .locator(`[data-testid="${contact.toContact().uuid}"]`) + .click(); + + debug('Accept conversation from contact'); + await conversationStack + .locator('.module-message-request-actions button >> "Accept"') + .click(); + + debug('Waiting for challenge'); + const request = await app.waitForChallenge(); + + debug('Solving challenge'); + await app.solveChallenge({ + seq: request.seq, + data: { captcha: 'anything' }, + }); + + const requests = server.stopRateLimiting({ + source: desktop.uuid, + target: contact.device.uuid, + }); + + debug(`rate limited requests: ${requests}`); + assert.strictEqual(requests, 1); + + debug('Waiting for receipts'); + const receipts = await app.waitForReceipts(); + + assert.strictEqual(receipts.type, ReceiptType.Read); + assert.strictEqual(receipts.timestamps.length, 1); + assert.strictEqual(receipts.timestamps[0], timestamp); + }); +}); diff --git a/ts/test-node/sql_migrations_test.ts b/ts/test-node/sql_migrations_test.ts index 1a283d5ffc..c3bb931bdf 100644 --- a/ts/test-node/sql_migrations_test.ts +++ b/ts/test-node/sql_migrations_test.ts @@ -2494,4 +2494,596 @@ describe('SQL migrations test', () => { ); }); }); + + describe('updateToSchemaVersion78', () => { + it('moves receipt jobs over to conversation queue', () => { + updateToVersion(77); + + const MESSAGE_ID_1 = generateGuid(); + const CONVERSATION_ID_1 = generateGuid(); + + db.exec( + ` + INSERT INTO messages + (id, json) + VALUES ('${MESSAGE_ID_1}', '${JSON.stringify({ + conversationId: CONVERSATION_ID_1, + })}') + ` + ); + + insertJobSync(db, { + id: 'id-1', + timestamp: 1, + queueType: 'random job', + data: {}, + }); + insertJobSync(db, { + id: 'id-2', + timestamp: 2, + queueType: 'delivery receipts', + data: { + messageId: MESSAGE_ID_1, + deliveryReceipts: [], + }, + }); + insertJobSync(db, { + id: 'id-3', + timestamp: 3, + queueType: 'read receipts', + data: { + messageId: MESSAGE_ID_1, + readReceipts: [], + }, + }); + insertJobSync(db, { + id: 'id-4', + timestamp: 4, + queueType: 'viewed receipts', + data: { + messageId: MESSAGE_ID_1, + viewedReceipt: {}, + }, + }); + insertJobSync(db, { + id: 'id-5', + timestamp: 5, + queueType: 'conversation', + data: {}, + }); + + const totalJobs = db.prepare('SELECT COUNT(*) FROM jobs;').pluck(); + const conversationJobs = db + .prepare("SELECT COUNT(*) FROM jobs WHERE queueType = 'conversation';") + .pluck(); + const deliveryJobs = db + .prepare( + "SELECT COUNT(*) FROM jobs WHERE queueType = 'delivery receipts';" + ) + .pluck(); + const readJobs = db + .prepare("SELECT COUNT(*) FROM jobs WHERE queueType = 'read receipts';") + .pluck(); + const viewedJobs = db + .prepare( + "SELECT COUNT(*) FROM jobs WHERE queueType = 'viewed receipts';" + ) + .pluck(); + + assert.strictEqual(totalJobs.get(), 5, 'before total'); + assert.strictEqual(conversationJobs.get(), 1, 'before conversation'); + assert.strictEqual(deliveryJobs.get(), 1, 'before delivery'); + assert.strictEqual(readJobs.get(), 1, 'before read'); + assert.strictEqual(viewedJobs.get(), 1, 'before viewed'); + + updateToVersion(78); + + assert.strictEqual(totalJobs.get(), 5, 'after total'); + assert.strictEqual(conversationJobs.get(), 4, 'after conversation'); + assert.strictEqual(deliveryJobs.get(), 0, 'after delivery'); + assert.strictEqual(readJobs.get(), 0, 'after read'); + assert.strictEqual(viewedJobs.get(), 0, 'after viewed'); + }); + + it('updates delivery jobs with their conversationId', () => { + updateToVersion(77); + + const MESSAGE_ID_1 = generateGuid(); + const MESSAGE_ID_2 = generateGuid(); + const MESSAGE_ID_3 = generateGuid(); + + const CONVERSATION_ID_1 = generateGuid(); + const CONVERSATION_ID_2 = generateGuid(); + + insertJobSync(db, { + id: 'id-1', + timestamp: 1, + queueType: 'delivery receipts', + data: { + messageId: MESSAGE_ID_1, + deliveryReceipts: [ + { + messageId: MESSAGE_ID_1, + timestamp: 1, + }, + ], + }, + }); + insertJobSync(db, { + id: 'id-2', + timestamp: 2, + queueType: 'delivery receipts', + data: { + messageId: MESSAGE_ID_2, + deliveryReceipts: [ + { + messageId: MESSAGE_ID_1, + timestamp: 2, + }, + ], + }, + }); + insertJobSync(db, { + id: 'id-3-missing-data', + timestamp: 3, + queueType: 'delivery receipts', + }); + insertJobSync(db, { + id: 'id-4-non-string-messageId', + timestamp: 4, + queueType: 'delivery receipts', + data: { + messageId: 4, + deliveryReceipts: [ + { + messageId: MESSAGE_ID_1, + timestamp: 4, + }, + ], + }, + }); + insertJobSync(db, { + id: 'id-5-missing-message', + timestamp: 5, + queueType: 'delivery receipts', + data: { + messageId: 'missing', + deliveryReceipts: [ + { + messageId: MESSAGE_ID_1, + timestamp: 5, + }, + ], + }, + }); + insertJobSync(db, { + id: 'id-6-missing-conversation', + timestamp: 6, + queueType: 'delivery receipts', + data: { + messageId: MESSAGE_ID_3, + deliveryReceipts: [ + { + messageId: MESSAGE_ID_1, + timestamp: 6, + }, + ], + }, + }); + insertJobSync(db, { + id: 'id-7-missing-delivery-receipts', + timestamp: 7, + queueType: 'delivery receipts', + data: { + messageId: MESSAGE_ID_3, + }, + }); + + const messageJson1 = JSON.stringify({ + conversationId: CONVERSATION_ID_1, + }); + const messageJson2 = JSON.stringify({ + conversationId: CONVERSATION_ID_2, + }); + db.exec( + ` + INSERT INTO messages + (id, conversationId, json) + VALUES + ('${MESSAGE_ID_1}', '${CONVERSATION_ID_1}', '${messageJson1}'), + ('${MESSAGE_ID_2}', '${CONVERSATION_ID_2}', '${messageJson2}'), + ('${MESSAGE_ID_3}', null, '{}'); + ` + ); + + const totalJobs = db.prepare('SELECT COUNT(*) FROM jobs;').pluck(); + const conversationJobs = db + .prepare("SELECT COUNT(*) FROM jobs WHERE queueType = 'conversation';") + .pluck(); + const deliveryJobs = db + .prepare( + "SELECT COUNT(*) FROM jobs WHERE queueType = 'delivery receipts';" + ) + .pluck(); + + assert.strictEqual(totalJobs.get(), 7, 'total jobs before'); + assert.strictEqual(conversationJobs.get(), 0, 'conversation jobs before'); + assert.strictEqual(deliveryJobs.get(), 7, 'delivery jobs before'); + + updateToVersion(78); + + assert.strictEqual(totalJobs.get(), 2, 'total jobs after'); + assert.strictEqual(conversationJobs.get(), 2, 'conversation jobs after'); + assert.strictEqual(deliveryJobs.get(), 0, 'delivery jobs after'); + + const jobs = getJobsInQueueSync(db, 'conversation'); + + assert.deepEqual(jobs, [ + { + id: 'id-1', + timestamp: 1, + queueType: 'conversation', + data: { + type: 'Receipts', + conversationId: CONVERSATION_ID_1, + receiptsType: 'deliveryReceipt', + receipts: [ + { + messageId: MESSAGE_ID_1, + conversationId: CONVERSATION_ID_1, + timestamp: 1, + }, + ], + }, + }, + { + id: 'id-2', + timestamp: 2, + queueType: 'conversation', + data: { + type: 'Receipts', + conversationId: CONVERSATION_ID_2, + receiptsType: 'deliveryReceipt', + receipts: [ + { + messageId: MESSAGE_ID_1, + conversationId: CONVERSATION_ID_2, + timestamp: 2, + }, + ], + }, + }, + ]); + }); + + it('updates read jobs with their conversationId', () => { + updateToVersion(77); + + const MESSAGE_ID_1 = generateGuid(); + const MESSAGE_ID_2 = generateGuid(); + const MESSAGE_ID_3 = generateGuid(); + + const CONVERSATION_ID_1 = generateGuid(); + const CONVERSATION_ID_2 = generateGuid(); + + insertJobSync(db, { + id: 'id-1', + timestamp: 1, + queueType: 'read receipts', + data: { + messageId: MESSAGE_ID_1, + readReceipts: [ + { + messageId: MESSAGE_ID_1, + timestamp: 1, + }, + ], + }, + }); + insertJobSync(db, { + id: 'id-2', + timestamp: 2, + queueType: 'read receipts', + data: { + messageId: MESSAGE_ID_2, + readReceipts: [ + { + messageId: MESSAGE_ID_1, + timestamp: 2, + }, + ], + }, + }); + insertJobSync(db, { + id: 'id-3-missing-data', + timestamp: 3, + queueType: 'read receipts', + }); + insertJobSync(db, { + id: 'id-4-non-string-messageId', + timestamp: 4, + queueType: 'read receipts', + data: { + messageId: 4, + readReceipts: [ + { + messageId: MESSAGE_ID_1, + timestamp: 4, + }, + ], + }, + }); + insertJobSync(db, { + id: 'id-5-missing-message', + timestamp: 5, + queueType: 'read receipts', + data: { + messageId: 'missing', + readReceipts: [ + { + messageId: MESSAGE_ID_1, + timestamp: 5, + }, + ], + }, + }); + insertJobSync(db, { + id: 'id-6-missing-conversation', + timestamp: 6, + queueType: 'read receipts', + data: { + messageId: MESSAGE_ID_3, + readReceipts: [ + { + messageId: MESSAGE_ID_1, + timestamp: 6, + }, + ], + }, + }); + insertJobSync(db, { + id: 'id-7-missing-read-receipts', + timestamp: 7, + queueType: 'read receipts', + data: { + messageId: MESSAGE_ID_3, + }, + }); + + const messageJson1 = JSON.stringify({ + conversationId: CONVERSATION_ID_1, + }); + const messageJson2 = JSON.stringify({ + conversationId: CONVERSATION_ID_2, + }); + db.exec( + ` + INSERT INTO messages + (id, conversationId, json) + VALUES + ('${MESSAGE_ID_1}', '${CONVERSATION_ID_1}', '${messageJson1}'), + ('${MESSAGE_ID_2}', '${CONVERSATION_ID_2}', '${messageJson2}'), + ('${MESSAGE_ID_3}', null, '{}'); + ` + ); + + const totalJobs = db.prepare('SELECT COUNT(*) FROM jobs;').pluck(); + const conversationJobs = db + .prepare("SELECT COUNT(*) FROM jobs WHERE queueType = 'conversation';") + .pluck(); + const readJobs = db + .prepare("SELECT COUNT(*) FROM jobs WHERE queueType = 'read receipts';") + .pluck(); + + assert.strictEqual(totalJobs.get(), 7, 'total jobs before'); + assert.strictEqual(conversationJobs.get(), 0, 'conversation jobs before'); + assert.strictEqual(readJobs.get(), 7, 'delivery jobs before'); + + updateToVersion(78); + + assert.strictEqual(totalJobs.get(), 2, 'total jobs after'); + assert.strictEqual(conversationJobs.get(), 2, 'conversation jobs after'); + assert.strictEqual(readJobs.get(), 0, 'read jobs after'); + + const jobs = getJobsInQueueSync(db, 'conversation'); + + assert.deepEqual(jobs, [ + { + id: 'id-1', + timestamp: 1, + queueType: 'conversation', + data: { + type: 'Receipts', + conversationId: CONVERSATION_ID_1, + receiptsType: 'readReceipt', + receipts: [ + { + messageId: MESSAGE_ID_1, + conversationId: CONVERSATION_ID_1, + timestamp: 1, + }, + ], + }, + }, + { + id: 'id-2', + timestamp: 2, + queueType: 'conversation', + data: { + type: 'Receipts', + conversationId: CONVERSATION_ID_2, + receiptsType: 'readReceipt', + receipts: [ + { + messageId: MESSAGE_ID_1, + conversationId: CONVERSATION_ID_2, + timestamp: 2, + }, + ], + }, + }, + ]); + }); + + it('updates viewed jobs with their conversationId', () => { + updateToVersion(77); + + const MESSAGE_ID_1 = generateGuid(); + const MESSAGE_ID_2 = generateGuid(); + const MESSAGE_ID_3 = generateGuid(); + + const CONVERSATION_ID_1 = generateGuid(); + const CONVERSATION_ID_2 = generateGuid(); + + insertJobSync(db, { + id: 'id-1', + timestamp: 1, + queueType: 'viewed receipts', + data: { + messageId: MESSAGE_ID_1, + viewedReceipt: { + messageId: MESSAGE_ID_1, + timestamp: 1, + }, + }, + }); + insertJobSync(db, { + id: 'id-2', + timestamp: 2, + queueType: 'viewed receipts', + data: { + messageId: MESSAGE_ID_2, + viewedReceipt: { + messageId: MESSAGE_ID_1, + timestamp: 2, + }, + }, + }); + insertJobSync(db, { + id: 'id-3-missing-data', + timestamp: 3, + queueType: 'viewed receipts', + }); + insertJobSync(db, { + id: 'id-4-non-string-messageId', + timestamp: 4, + queueType: 'viewed receipts', + data: { + messageId: 4, + viewedReceipt: { + messageId: MESSAGE_ID_1, + timestamp: 4, + }, + }, + }); + insertJobSync(db, { + id: 'id-5-missing-message', + timestamp: 5, + queueType: 'viewed receipts', + data: { + messageId: 'missing', + viewedReceipt: { + messageId: MESSAGE_ID_1, + timestamp: 5, + }, + }, + }); + insertJobSync(db, { + id: 'id-6-missing-conversation', + timestamp: 6, + queueType: 'viewed receipts', + data: { + messageId: MESSAGE_ID_3, + viewedReceipt: { + messageId: MESSAGE_ID_1, + timestamp: 6, + }, + }, + }); + insertJobSync(db, { + id: 'id-7-missing-viewed-receipt', + timestamp: 7, + queueType: 'viewed receipts', + data: { + messageId: MESSAGE_ID_3, + }, + }); + + const messageJson1 = JSON.stringify({ + conversationId: CONVERSATION_ID_1, + }); + const messageJson2 = JSON.stringify({ + conversationId: CONVERSATION_ID_2, + }); + db.exec( + ` + INSERT INTO messages + (id, conversationId, json) + VALUES + ('${MESSAGE_ID_1}', '${CONVERSATION_ID_1}', '${messageJson1}'), + ('${MESSAGE_ID_2}', '${CONVERSATION_ID_2}', '${messageJson2}'), + ('${MESSAGE_ID_3}', null, '{}'); + ` + ); + + const totalJobs = db.prepare('SELECT COUNT(*) FROM jobs;').pluck(); + const conversationJobs = db + .prepare("SELECT COUNT(*) FROM jobs WHERE queueType = 'conversation';") + .pluck(); + const viewedJobs = db + .prepare( + "SELECT COUNT(*) FROM jobs WHERE queueType = 'viewed receipts';" + ) + .pluck(); + + assert.strictEqual(totalJobs.get(), 7, 'total jobs before'); + assert.strictEqual(conversationJobs.get(), 0, 'conversation jobs before'); + assert.strictEqual(viewedJobs.get(), 7, 'delivery jobs before'); + + updateToVersion(78); + + assert.strictEqual(totalJobs.get(), 2, 'total jobs after'); + assert.strictEqual(conversationJobs.get(), 2, 'conversation jobs after'); + assert.strictEqual(viewedJobs.get(), 0, 'viewed jobs after'); + + const jobs = getJobsInQueueSync(db, 'conversation'); + + assert.deepEqual(jobs, [ + { + id: 'id-1', + timestamp: 1, + queueType: 'conversation', + data: { + type: 'Receipts', + conversationId: CONVERSATION_ID_1, + receiptsType: 'viewedReceipt', + receipts: [ + { + messageId: MESSAGE_ID_1, + conversationId: CONVERSATION_ID_1, + timestamp: 1, + }, + ], + }, + }, + { + id: 'id-2', + timestamp: 2, + queueType: 'conversation', + data: { + type: 'Receipts', + conversationId: CONVERSATION_ID_2, + receiptsType: 'viewedReceipt', + receipts: [ + { + messageId: MESSAGE_ID_1, + conversationId: CONVERSATION_ID_2, + timestamp: 2, + }, + ], + }, + }, + ]); + }); + }); }); diff --git a/ts/types/Receipt.ts b/ts/types/Receipt.ts index efd97a2acf..f08a19f713 100644 --- a/ts/types/Receipt.ts +++ b/ts/types/Receipt.ts @@ -5,6 +5,7 @@ import { z } from 'zod'; export const receiptSchema = z.object({ messageId: z.string(), + conversationId: z.string(), senderE164: z.string().optional(), senderUuid: z.string().optional(), timestamp: z.number(), diff --git a/ts/util/markConversationRead.ts b/ts/util/markConversationRead.ts index 27d1c243a5..54a3bea9a2 100644 --- a/ts/util/markConversationRead.ts +++ b/ts/util/markConversationRead.ts @@ -5,7 +5,6 @@ import { omit } from 'lodash'; import type { ConversationAttributesType } from '../model-types.d'; import { hasErrors } from '../state/selectors/message'; -import { readReceiptsJobQueue } from '../jobs/readReceiptsJobQueue'; import { readSyncJobQueue } from '../jobs/readSyncJobQueue'; import { notificationService } from '../services/notifications'; import { expiringMessagesDeletionService } from '../services/expiringMessagesDeletion'; @@ -16,6 +15,11 @@ import { getConversationIdForLogging } from './idForLogging'; import { drop } from './drop'; import { isConversationAccepted } from './isConversationAccepted'; import { ReadStatus } from '../messages/MessageReadStatus'; +import { + conversationJobQueue, + conversationQueueJobEnum, +} from '../jobs/conversationJobQueue'; +import { ReceiptType } from '../types/Receipt'; export async function markConversationRead( conversationAttrs: ConversationAttributesType, @@ -88,6 +92,7 @@ export async function markConversationRead( return { messageId: messageSyncData.id, + conversationId: conversationAttrs.id, originalReadStatus: messageSyncData.originalReadStatus, senderE164: messageSyncData.source, senderUuid: messageSyncData.sourceUuid, @@ -138,10 +143,12 @@ export async function markConversationRead( } if (isConversationAccepted(conversationAttrs)) { - await readReceiptsJobQueue.addIfAllowedByUser( - window.storage, - allReadMessagesSync - ); + await conversationJobQueue.add({ + type: conversationQueueJobEnum.enum.Receipts, + conversationId, + receiptsType: ReceiptType.Read, + receipts: allReadMessagesSync, + }); } } diff --git a/ts/util/sendReceipts.ts b/ts/util/sendReceipts.ts index 4c51d4946b..9b6a444cff 100644 --- a/ts/util/sendReceipts.ts +++ b/ts/util/sendReceipts.ts @@ -137,6 +137,11 @@ export async function sendReceipts({ }), { messageIds, sendType: type } ); + + window.SignalCI?.handleEvent('receipts', { + type, + timestamps, + }); }) ); })