Simplify message deduplication

This commit is contained in:
trevor-signal
2026-03-03 16:04:04 -05:00
committed by GitHub
parent 864b4f2bfb
commit f1c285f58e
7 changed files with 212 additions and 135 deletions

View File

@@ -16,7 +16,6 @@ import {
deliveryReceiptQueue,
deliveryReceiptBatcher,
} from '../util/deliveryReceipt.preload.js';
import { getSenderIdentifier } from '../util/getSenderIdentifier.dom.js';
import { upgradeMessageSchema } from '../util/migrations.preload.js';
import { getOwn } from '../util/getOwn.std.js';
import {
@@ -68,8 +67,8 @@ import {
modifyTargetMessage,
ModifyTargetMessageResult,
} from '../util/modifyTargetMessage.preload.js';
import { saveAndNotify } from './saveAndNotify.preload.js';
import { MessageModel } from '../models/messages.preload.js';
import type { saveAndNotify } from './saveAndNotify.preload.js';
import type { MessageModel } from '../models/messages.preload.js';
import { safeParsePartial } from '../util/schemas.std.js';
import { PollCreateSchema } from '../types/Polls.dom.js';
@@ -94,7 +93,10 @@ export async function handleDataMessage(
message: MessageModel,
initialMessage: ProcessedDataMessage,
confirm: () => void,
options: { data?: SentEventData } = {}
options: { data?: SentEventData } = {},
dependencies: {
saveAndNotify: typeof saveAndNotify;
}
): Promise<void> {
const { data } = options;
@@ -119,21 +121,12 @@ export async function handleDataMessage(
await conversation.queueJob(idLog, async () => {
log.info(`${idLog}: starting processing in queue`);
// First, check for duplicates. If we find one, stop processing here.
const senderIdentifier = getSenderIdentifier(message.attributes);
const inMemoryMessage = window.MessageCache.findBySender(senderIdentifier);
if (inMemoryMessage) {
log.info(`${idLog}: cache hit`, senderIdentifier);
} else {
log.info(`${idLog}: duplicate check db lookup needed`, senderIdentifier);
}
let existingMessage = inMemoryMessage;
if (!existingMessage) {
const fromDb = await DataReader.getMessageBySender(message.attributes);
existingMessage = fromDb
? window.MessageCache.register(new MessageModel(fromDb))
: undefined;
}
// First, check for duplicate messages. We dedupe by senderAci + timestamp, in the
// same way we address messages in the Signal ecosystem.
const existingMessage = await window.MessageCache.findBySentAt(
message.attributes.sent_at,
msg => msg.attributes.sourceServiceId === sourceServiceId
);
const isUpdate = Boolean(data && data.isRecipientUpdate);
@@ -152,6 +145,7 @@ export async function handleDataMessage(
confirm();
return;
}
if (type === 'outgoing') {
if (isUpdate && existingMessage) {
log.info(
@@ -238,7 +232,6 @@ export async function handleDataMessage(
}
// GroupV2
if (initialMessage.groupV2) {
if (isGroupV1(conversation.attributes)) {
// If we received a GroupV2 message in a GroupV1 group, we migrate!
@@ -805,7 +798,7 @@ export async function handleDataMessage(
}
log.info(`${idLog}: Batching save`);
drop(saveAndNotify(message, conversation, confirm));
drop(dependencies.saveAndNotify(message, conversation, confirm));
} catch (error) {
const errorForLog = Errors.toLogFormat(error);
log.error(`${idLog}: error:`, errorForLog);