diff --git a/ts/background.preload.ts b/ts/background.preload.ts index 69109b074b..938e2fa794 100644 --- a/ts/background.preload.ts +++ b/ts/background.preload.ts @@ -285,6 +285,7 @@ import { itemStorage } from './textsecure/Storage.preload.js'; import { isPinnedMessagesReceiveEnabled } from './util/isPinnedMessagesEnabled.dom.js'; import { initMessageCleanup } from './services/messageStateCleanup.dom.js'; import { MessageCache } from './services/MessageCache.preload.js'; +import { saveAndNotify } from './messages/saveAndNotify.preload.js'; const { isNumber, throttle } = lodash; @@ -2706,7 +2707,15 @@ export async function startApp(): Promise { } // Don't wait for handleDataMessage, as it has its own per-conversation queueing - drop(handleDataMessage(message, data.message, event.confirm)); + drop( + handleDataMessage( + message, + data.message, + event.confirm, + {}, + { saveAndNotify } + ) + ); } async function onProfileKey({ @@ -3261,9 +3270,15 @@ export async function startApp(): Promise { // Don't wait for handleDataMessage, as it has its own per-conversation queueing drop( - handleDataMessage(message, data.message, event.confirm, { - data, - }) + handleDataMessage( + message, + data.message, + event.confirm, + { + data, + }, + { saveAndNotify } + ) ); } diff --git a/ts/messages/handleDataMessage.preload.ts b/ts/messages/handleDataMessage.preload.ts index 8cb2a09f76..74e10d3063 100644 --- a/ts/messages/handleDataMessage.preload.ts +++ b/ts/messages/handleDataMessage.preload.ts @@ -16,7 +16,6 @@ import { deliveryReceiptQueue, deliveryReceiptBatcher, } from '../util/deliveryReceipt.preload.js'; -import { getSenderIdentifier } from '../util/getSenderIdentifier.dom.js'; import { upgradeMessageSchema } from '../util/migrations.preload.js'; import { getOwn } from '../util/getOwn.std.js'; import { @@ -68,8 +67,8 @@ import { modifyTargetMessage, ModifyTargetMessageResult, } from '../util/modifyTargetMessage.preload.js'; -import { saveAndNotify } from './saveAndNotify.preload.js'; -import { MessageModel } from '../models/messages.preload.js'; +import type { saveAndNotify } from './saveAndNotify.preload.js'; +import type { MessageModel } from '../models/messages.preload.js'; import { safeParsePartial } from '../util/schemas.std.js'; import { PollCreateSchema } from '../types/Polls.dom.js'; @@ -94,7 +93,10 @@ export async function handleDataMessage( message: MessageModel, initialMessage: ProcessedDataMessage, confirm: () => void, - options: { data?: SentEventData } = {} + options: { data?: SentEventData } = {}, + dependencies: { + saveAndNotify: typeof saveAndNotify; + } ): Promise { const { data } = options; @@ -119,21 +121,12 @@ export async function handleDataMessage( await conversation.queueJob(idLog, async () => { log.info(`${idLog}: starting processing in queue`); - // First, check for duplicates. If we find one, stop processing here. - const senderIdentifier = getSenderIdentifier(message.attributes); - const inMemoryMessage = window.MessageCache.findBySender(senderIdentifier); - if (inMemoryMessage) { - log.info(`${idLog}: cache hit`, senderIdentifier); - } else { - log.info(`${idLog}: duplicate check db lookup needed`, senderIdentifier); - } - let existingMessage = inMemoryMessage; - if (!existingMessage) { - const fromDb = await DataReader.getMessageBySender(message.attributes); - existingMessage = fromDb - ? window.MessageCache.register(new MessageModel(fromDb)) - : undefined; - } + // First, check for duplicate messages. We dedupe by senderAci + timestamp, in the + // same way we address messages in the Signal ecosystem. + const existingMessage = await window.MessageCache.findBySentAt( + message.attributes.sent_at, + msg => msg.attributes.sourceServiceId === sourceServiceId + ); const isUpdate = Boolean(data && data.isRecipientUpdate); @@ -152,6 +145,7 @@ export async function handleDataMessage( confirm(); return; } + if (type === 'outgoing') { if (isUpdate && existingMessage) { log.info( @@ -238,7 +232,6 @@ export async function handleDataMessage( } // GroupV2 - if (initialMessage.groupV2) { if (isGroupV1(conversation.attributes)) { // If we received a GroupV2 message in a GroupV1 group, we migrate! @@ -805,7 +798,7 @@ export async function handleDataMessage( } log.info(`${idLog}: Batching save`); - drop(saveAndNotify(message, conversation, confirm)); + drop(dependencies.saveAndNotify(message, conversation, confirm)); } catch (error) { const errorForLog = Errors.toLogFormat(error); log.error(`${idLog}: error:`, errorForLog); diff --git a/ts/services/MessageCache.preload.ts b/ts/services/MessageCache.preload.ts index 4c1695ea9c..a3e2c9913c 100644 --- a/ts/services/MessageCache.preload.ts +++ b/ts/services/MessageCache.preload.ts @@ -4,11 +4,9 @@ import lodash from 'lodash'; import { LRUCache } from 'lru-cache'; -import { createLogger } from '../logging/log.std.js'; import { MessageModel } from '../models/messages.preload.js'; import { DataReader, DataWriter } from '../sql/Client.preload.js'; import { getMessageConversation } from '../util/getMessageConversation.dom.js'; -import { getSenderIdentifier } from '../util/getSenderIdentifier.dom.js'; import { upgradeMessageSchema } from '../util/migrations.preload.js'; import { isNotNil } from '../util/isNotNil.std.js'; import { isStory } from '../messages/helpers.std.js'; @@ -23,8 +21,6 @@ import { getSelectedConversationId } from '../state/selectors/nav.std.js'; const { throttle } = lodash; -const log = createLogger('MessageCache'); - const MAX_THROTTLED_REDUX_UPDATERS = 200; export class MessageCache { static install(): MessageCache { @@ -35,7 +31,6 @@ export class MessageCache { #state = { messages: new Map(), - messageIdsBySender: new Map(), messageIdsBySentAt: new Map>(), lastAccessedAt: new Map(), }; @@ -72,16 +67,6 @@ export class MessageCache { return message; } - // Finds a message in the cache by sender identifier - public findBySender(senderIdentifier: string): MessageModel | undefined { - const id = this.#state.messageIdsBySender.get(senderIdentifier); - if (!id) { - return undefined; - } - - return this.getById(id); - } - // Finds a message in the cache by Id public getById(id: string): MessageModel | undefined { const message = this.#state.messages.get(id); @@ -109,7 +94,6 @@ export class MessageCache { return inMemory; } - log.info(`findBySentAt(${sentAt}): db lookup needed`); const allOnDisk = await DataReader.getMessagesBySentAt(sentAt); const onDisk = allOnDisk .map(message => this.register(new MessageModel(message))) @@ -220,10 +204,6 @@ export class MessageCache { return; } - this.#state.messageIdsBySender.delete( - getSenderIdentifier(message.attributes) - ); - const { id, sent_at: sentAt } = message.attributes; const previousIdsBySentAt = this.#state.messageIdsBySentAt.get(sentAt); @@ -236,10 +216,6 @@ export class MessageCache { } this.#state.lastAccessedAt.set(id, Date.now()); - this.#state.messageIdsBySender.set( - getSenderIdentifier(message.attributes), - id - ); this.#throttledUpdateRedux(message.attributes); } @@ -270,10 +246,6 @@ export class MessageCache { this.#state.messages.set(message.id, message); this.#state.lastAccessedAt.set(message.id, Date.now()); this.#state.messageIdsBySentAt.set(sentAt, Array.from(nextIdsBySentAtSet)); - this.#state.messageIdsBySender.set( - getSenderIdentifier(message.attributes), - id - ); } #removeMessage(messageId: string): void { @@ -299,9 +271,6 @@ export class MessageCache { this.#state.messages.delete(messageId); this.#state.lastAccessedAt.delete(messageId); - this.#state.messageIdsBySender.delete( - getSenderIdentifier(message.attributes) - ); } #updateRedux(attributes: MessageAttributesType) { diff --git a/ts/sql/Interface.std.ts b/ts/sql/Interface.std.ts index de6b54da0a..ceba84cb4f 100644 --- a/ts/sql/Interface.std.ts +++ b/ts/sql/Interface.std.ts @@ -910,12 +910,6 @@ type ReadableInterface = { sentAtTimestamp: number, options: { includeEdits: boolean } ) => MessageType | null; - getMessageBySender: (options: { - source?: string; - sourceServiceId?: ServiceIdString; - sourceDevice?: number; - sent_at: number; - }) => MessageType | undefined; getMessageById: (id: string) => MessageType | undefined; getMessagesById: (messageIds: ReadonlyArray) => Array; _getAllMessages: () => Array; diff --git a/ts/sql/Server.node.ts b/ts/sql/Server.node.ts index 82c04bf153..9e025bf0db 100644 --- a/ts/sql/Server.node.ts +++ b/ts/sql/Server.node.ts @@ -458,7 +458,6 @@ export const DataReader: ServerReadableInterface = { getReactionByTimestamp, _getAllReactions, getMessageByAuthorAciAndSentAt, - getMessageBySender, getMessageById, getMessagesById, _getAllMessages, @@ -3542,55 +3541,6 @@ function getMessageByAuthorAciAndSentAt( })(); } -function getMessageBySender( - db: ReadableDB, - { - source, - sourceServiceId, - sourceDevice, - sent_at, - }: { - source?: string; - sourceServiceId?: ServiceIdString; - sourceDevice?: number; - sent_at: number; - } -): MessageType | undefined { - return db.transaction(() => { - const rows: Array = db - .prepare( - ` - SELECT ${MESSAGE_COLUMNS.join(', ')} FROM messages WHERE - (source = $source OR sourceServiceId = $sourceServiceId) AND - sourceDevice = $sourceDevice AND - sent_at = $sent_at - LIMIT 2; - ` - ) - .all({ - source: source || null, - sourceServiceId: sourceServiceId || null, - sourceDevice: sourceDevice || null, - sent_at, - }); - - if (rows.length > 1) { - logger.warn('getMessageBySender: More than one message found for', { - sent_at, - source, - sourceServiceId, - sourceDevice, - }); - } - - if (rows.length < 1) { - return undefined; - } - - return hydrateMessage(db, rows[0]); - })(); -} - export function _storyIdPredicate( storyId: string | undefined, includeStoryReplies: boolean diff --git a/ts/test-electron/messages/handleDataMessage_test.preload.ts b/ts/test-electron/messages/handleDataMessage_test.preload.ts new file mode 100644 index 0000000000..ee885726f9 --- /dev/null +++ b/ts/test-electron/messages/handleDataMessage_test.preload.ts @@ -0,0 +1,179 @@ +// Copyright 2026 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; +import * as sinon from 'sinon'; +import { v4 as uuid } from 'uuid'; + +import { handleDataMessage } from '../../messages/handleDataMessage.preload.js'; +import type { MessageAttributesType } from '../../model-types.d.ts'; +import { MessageModel } from '../../models/messages.preload.js'; +import { MessageCache } from '../../services/MessageCache.preload.js'; +import { DataWriter } from '../../sql/Client.preload.js'; +import { itemStorage } from '../../textsecure/Storage.preload.js'; +import type { ProcessedDataMessage } from '../../textsecure/Types.d.ts'; +import { + type AciString, + generateAci, + generatePni, +} from '../../types/ServiceId.std.js'; +import { DurationInSeconds } from '../../util/durations/duration-in-seconds.std.js'; +import { SignalService } from '../../protobuf/index.std.js'; + +describe('handleDataMessage', () => { + let ourAci: AciString; + + beforeEach(async () => { + ourAci = generateAci(); + MessageCache.install(); + await itemStorage.user.setAciAndDeviceId(ourAci, 1); + await itemStorage.user.setPni(generatePni()); + + window.ConversationController.reset(); + MessageCache.install(); + await window.ConversationController.load(); + }); + + afterEach(async () => { + await DataWriter.removeAll(); + await itemStorage.fetch(); + window.ConversationController.reset(); + }); + + it('deduplicates incoming messages with same sender/timestamp, if existing message saved to DB', async () => { + const senderAci = generateAci(); + const conversation = await window.ConversationController.getOrCreateAndWait( + senderAci, + 'private' + ); + const sentAt = Date.now(); + + const existingAttributes: MessageAttributesType = { + id: uuid(), + conversationId: conversation.id, + type: 'incoming', + sourceServiceId: senderAci, + sourceDevice: 1, + sent_at: sentAt, + timestamp: sentAt, + received_at: sentAt, + }; + + await DataWriter.saveMessage(existingAttributes, { + ourAci, + forceSave: true, + postSaveUpdates: () => Promise.resolve(), + }); + + const dataMessage: ProcessedDataMessage = { + attachments: [], + flags: 0, + body: 'body', + expireTimer: DurationInSeconds.fromDays(0), + expireTimerVersion: 1, + isViewOnce: false, + timestamp: sentAt, + requiredProtocolVersion: + SignalService.DataMessage.ProtocolVersion.CURRENT, + }; + + const duplicateMessage = new MessageModel({ + id: uuid(), + conversationId: conversation.id, + type: 'incoming', + sourceServiceId: senderAci, + sourceDevice: 2, + sent_at: sentAt, + timestamp: sentAt, + received_at: sentAt + 1, + }); + + const saveAndNotify = sinon.stub(); + const confirm = sinon.stub(); + + await handleDataMessage( + duplicateMessage, + dataMessage, + confirm, + {}, + { saveAndNotify } + ); + + assert.strictEqual(saveAndNotify.callCount, 0, 'not saved'); + assert.strictEqual(confirm.callCount, 1, 'confirmed immediately'); + }); + + it('deduplicates incoming messages with same sender/timestamp, if existing message only in memory', async () => { + const senderAci = generateAci(); + const conversation = await window.ConversationController.getOrCreateAndWait( + senderAci, + 'private' + ); + const sentAt = Date.now(); + const dataMessage: ProcessedDataMessage = { + attachments: [], + flags: 0, + body: 'body', + expireTimer: DurationInSeconds.fromDays(0), + expireTimerVersion: 1, + isViewOnce: false, + timestamp: sentAt, + requiredProtocolVersion: + SignalService.DataMessage.ProtocolVersion.CURRENT, + }; + const saveAndNotify = sinon.stub(); + const confirm = sinon.stub(); + + const attributes: MessageAttributesType = { + id: uuid(), + conversationId: conversation.id, + type: 'incoming', + sourceServiceId: senderAci, + sourceDevice: 1, + sent_at: sentAt, + timestamp: sentAt, + received_at: sentAt, + }; + + await handleDataMessage( + new MessageModel(attributes), + dataMessage, + confirm, + {}, + { saveAndNotify } + ); + + assert.strictEqual(saveAndNotify.callCount, 1, 'initial message saved'); + assert.strictEqual(confirm.callCount, 0, 'not confirmed until saved'); + + // Calling it again with same message does not call saveAndNotify, but does confirm() + await handleDataMessage( + new MessageModel(attributes), + dataMessage, + confirm, + {}, + { saveAndNotify } + ); + + assert.strictEqual(saveAndNotify.callCount, 1, 'not saved again'); + assert.strictEqual(confirm.callCount, 1, 'duplicate confirmed immediately'); + + // Calling it again with different message but same aci/timestamp does not call + // saveAndNotify, but does confirm() + await handleDataMessage( + new MessageModel({ + ...attributes, + // we intentionally (if suboptimally) do not consider deviceId when deduplicating + sourceDevice: 2, + id: uuid(), + }), + dataMessage, + confirm, + {}, + { saveAndNotify } + ); + + assert.strictEqual(saveAndNotify.callCount, 1, 'not saved again'); + assert.strictEqual(confirm.callCount, 2, 'duplicate confirmed immediately'); + }); +}); diff --git a/ts/util/getSenderIdentifier.dom.ts b/ts/util/getSenderIdentifier.dom.ts deleted file mode 100644 index b32a2c4d96..0000000000 --- a/ts/util/getSenderIdentifier.dom.ts +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright 2023 Signal Messenger, LLC -// SPDX-License-Identifier: AGPL-3.0-only - -import type { ReadonlyMessageAttributesType } from '../model-types.d.ts'; - -export function getSenderIdentifier({ - sent_at: sentAt, - source, - sourceServiceId, - sourceDevice, -}: Pick< - ReadonlyMessageAttributesType, - 'sent_at' | 'source' | 'sourceServiceId' | 'sourceDevice' ->): string { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const conversation = window.ConversationController.lookupOrCreate({ - e164: source, - serviceId: sourceServiceId, - reason: `MessageModel.getSenderIdentifier(${sentAt})`, - })!; - - return `${conversation?.id}.${sourceDevice}-${sentAt}`; -}