From d8ac5a390a98740e6a8a4b04ec850375638875ce Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Tue, 4 Apr 2023 10:40:33 -0400 Subject: [PATCH] Write to MSL before sending a sync message. --- .../securesms/messages/GroupSendUtil.java | 65 ++++++++++++++----- .../api/SignalServiceMessageSender.java | 43 ++++++++---- .../PartialSendBatchCompleteListener.java | 12 ++++ 3 files changed, 91 insertions(+), 29 deletions(-) create mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/http/PartialSendBatchCompleteListener.java diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/GroupSendUtil.java b/app/src/main/java/org/thoughtcrime/securesms/messages/GroupSendUtil.java index d3123e66d2..459a76a5ea 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/GroupSendUtil.java +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/GroupSendUtil.java @@ -47,6 +47,7 @@ import org.whispersystems.signalservice.api.push.exceptions.NotFoundException; import org.whispersystems.signalservice.api.util.Preconditions; import org.whispersystems.signalservice.internal.push.exceptions.InvalidUnidentifiedAccessHeaderException; import org.whispersystems.signalservice.internal.push.http.CancelationSignal; +import org.whispersystems.signalservice.internal.push.http.PartialSendBatchCompleteListener; import org.whispersystems.signalservice.internal.push.http.PartialSendCompleteListener; import java.io.IOException; @@ -315,17 +316,32 @@ public final class GroupSendUtil { try { List targets = senderKeyTargets.stream().map(r -> recipients.getAddress(r.getId())).collect(Collectors.toList()); List access = senderKeyTargets.stream().map(r -> recipients.requireAccess(r.getId())).collect(Collectors.toList()); - List results = sendOperation.sendWithSenderKey(messageSender, distributionId, targets, access, isRecipientUpdate); + + final MessageSendLogTables messageLogDatabase = SignalDatabase.messageLog(); + final AtomicLong entryId = new AtomicLong(-1); + final boolean includeInMessageLog = sendOperation.shouldIncludeInMessageLog(); + + List results = sendOperation.sendWithSenderKey(messageSender, distributionId, targets, access, isRecipientUpdate, partialResults -> { + if (!includeInMessageLog) { + return; + } + + synchronized (entryId) { + if (entryId.get() == -1) { + entryId.set(messageLogDatabase.insertIfPossible(sendOperation.getSentTimestamp(), senderKeyTargets, partialResults, sendOperation.getContentHint(), sendOperation.getRelatedMessageId(), sendOperation.isUrgent())); + } else { + for (SendMessageResult result : partialResults) { + entryId.set(messageLogDatabase.addRecipientToExistingEntryIfPossible(entryId.get(), recipients.requireRecipientId(result.getAddress()), sendOperation.getSentTimestamp(), result, sendOperation.getContentHint(), sendOperation.getRelatedMessageId(), sendOperation.isUrgent())); + } + } + } + }); allResults.addAll(results); int successCount = (int) results.stream().filter(SendMessageResult::isSuccess).count(); Log.d(TAG, "Successfully sent using sender key to " + successCount + "/" + targets.size() + " sender key targets."); - if (sendOperation.shouldIncludeInMessageLog()) { - SignalDatabase.messageLog().insertIfPossible(sendOperation.getSentTimestamp(), senderKeyTargets, results, sendOperation.getContentHint(), sendOperation.getRelatedMessageId(), sendOperation.isUrgent()); - } - if (relatedMessageId != null) { SignalLocalMetrics.GroupMessageSend.onSenderKeyMslInserted(relatedMessageId.getId()); } @@ -370,9 +386,9 @@ public final class GroupSendUtil { List> access = legacyTargets.stream().map(r -> recipients.getAccessPair(r.getId())).collect(Collectors.toList()); boolean recipientUpdate = isRecipientUpdate || allResults.size() > 0; - final MessageSendLogTables messageLogDatabase = SignalDatabase.messageLog(); - final AtomicLong entryId = new AtomicLong(-1); - final boolean includeInMessageLog = sendOperation.shouldIncludeInMessageLog(); + final MessageSendLogTables messageLogDatabase = SignalDatabase.messageLog(); + final AtomicLong entryId = new AtomicLong(-1); + final boolean includeInMessageLog = sendOperation.shouldIncludeInMessageLog(); List results = sendOperation.sendLegacy(messageSender, targets, legacyTargets, access, recipientUpdate, result -> { if (!includeInMessageLog) { @@ -437,7 +453,8 @@ public final class GroupSendUtil { @NonNull DistributionId distributionId, @NonNull List targets, @NonNull List access, - boolean isRecipientUpdate) + boolean isRecipientUpdate, + @Nullable PartialSendBatchCompleteListener partialListener) throws NoSessionException, UntrustedIdentityException, InvalidKeyException, IOException, InvalidRegistrationIdException; @NonNull List sendLegacy(@NonNull SignalServiceMessageSender messageSender, @@ -490,11 +507,12 @@ public final class GroupSendUtil { @NonNull DistributionId distributionId, @NonNull List targets, @NonNull List access, - boolean isRecipientUpdate) + boolean isRecipientUpdate, + @Nullable PartialSendBatchCompleteListener partialListener) throws NoSessionException, UntrustedIdentityException, InvalidKeyException, IOException, InvalidRegistrationIdException { SenderKeyGroupEvents listener = relatedMessageId != null ? new SenderKeyMetricEventListener(relatedMessageId.getId()) : SenderKeyGroupEvents.EMPTY; - return messageSender.sendGroupDataMessage(distributionId, targets, access, isRecipientUpdate, contentHint, message, listener, urgent, isForStory); + return messageSender.sendGroupDataMessage(distributionId, targets, access, isRecipientUpdate, contentHint, message, listener, urgent, isForStory, partialListener); } @Override @@ -566,11 +584,18 @@ public final class GroupSendUtil { @NonNull DistributionId distributionId, @NonNull List targets, @NonNull List access, - boolean isRecipientUpdate) + boolean isRecipientUpdate, + @Nullable PartialSendBatchCompleteListener partialListener) throws NoSessionException, UntrustedIdentityException, InvalidKeyException, IOException, InvalidRegistrationIdException { messageSender.sendGroupTyping(distributionId, targets, access, message); - return targets.stream().map(a -> SendMessageResult.success(a, Collections.emptyList(), true, false, -1, Optional.empty())).collect(Collectors.toList()); + List results = targets.stream().map(a -> SendMessageResult.success(a, Collections.emptyList(), true, false, -1, Optional.empty())).collect(Collectors.toList()); + + if (partialListener != null) { + partialListener.onPartialSendComplete(results); + } + + return results; } @Override @@ -626,10 +651,11 @@ public final class GroupSendUtil { @NonNull DistributionId distributionId, @NonNull List targets, @NonNull List access, - boolean isRecipientUpdate) + boolean isRecipientUpdate, + @Nullable PartialSendBatchCompleteListener partialSendListener) throws NoSessionException, UntrustedIdentityException, InvalidKeyException, IOException, InvalidRegistrationIdException { - return messageSender.sendCallMessage(distributionId, targets, access, message); + return messageSender.sendCallMessage(distributionId, targets, access, message, partialSendListener); } @Override @@ -697,10 +723,11 @@ public final class GroupSendUtil { @NonNull DistributionId distributionId, @NonNull List targets, @NonNull List access, - boolean isRecipientUpdate) + boolean isRecipientUpdate, + @Nullable PartialSendBatchCompleteListener partialListener) throws NoSessionException, UntrustedIdentityException, InvalidKeyException, IOException, InvalidRegistrationIdException { - return messageSender.sendGroupStory(distributionId, Optional.ofNullable(groupId).map(GroupId::getDecodedId), targets, access, isRecipientUpdate, message, getSentTimestamp(), manifest); + return messageSender.sendGroupStory(distributionId, Optional.ofNullable(groupId).map(GroupId::getDecodedId), targets, access, isRecipientUpdate, message, getSentTimestamp(), manifest, partialListener); } @Override @@ -828,6 +855,10 @@ public final class GroupSendUtil { return accessList.requireIdByAddress(address); } + @NonNull List requireRecipientIds(@NonNull List addresses) { + return addresses.stream().map(accessList::requireIdByAddress).collect(Collectors.toList()); + } + private static @NonNull Map mapAddresses(@NonNull Context context, @NonNull List recipients) throws IOException { List addresses = RecipientUtil.toSignalServiceAddressesFromResolved(context, recipients); diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java index 1497199b3d..5d71c576fb 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java @@ -122,6 +122,7 @@ import org.whispersystems.signalservice.internal.push.exceptions.MismatchedDevic import org.whispersystems.signalservice.internal.push.exceptions.StaleDevicesException; import org.whispersystems.signalservice.internal.push.http.AttachmentCipherOutputStreamFactory; import org.whispersystems.signalservice.internal.push.http.CancelationSignal; +import org.whispersystems.signalservice.internal.push.http.PartialSendBatchCompleteListener; import org.whispersystems.signalservice.internal.push.http.PartialSendCompleteListener; import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec; import org.whispersystems.signalservice.internal.util.Util; @@ -309,7 +310,8 @@ public class SignalServiceMessageSender { boolean isRecipientUpdate, SignalServiceStoryMessage message, long timestamp, - Set manifest) + Set manifest, + PartialSendBatchCompleteListener partialListener) throws IOException, UntrustedIdentityException, InvalidKeyException, NoSessionException, InvalidRegistrationIdException { Log.d(TAG, "[" + timestamp + "] Sending a story."); @@ -317,6 +319,10 @@ public class SignalServiceMessageSender { Content content = createStoryContent(message); List sendMessageResults = sendGroupMessage(distributionId, recipients, unidentifiedAccess, timestamp, content, ContentHint.IMPLICIT, groupId, false, SenderKeyGroupEvents.EMPTY, false, true); + if (partialListener != null) { + partialListener.onPartialSendComplete(sendMessageResults); + } + if (aciStore.isMultiDevice()) { sendStorySyncMessage(message, timestamp, isRecipientUpdate, manifest); } @@ -363,13 +369,21 @@ public class SignalServiceMessageSender { public List sendCallMessage(DistributionId distributionId, List recipients, List unidentifiedAccess, - SignalServiceCallMessage message) + SignalServiceCallMessage message, + PartialSendBatchCompleteListener partialListener) throws IOException, UntrustedIdentityException, InvalidKeyException, NoSessionException, InvalidRegistrationIdException { Log.d(TAG, "[" + message.getTimestamp().get() + "] Sending a call message (sender key)."); Content content = createCallContent(message); - return sendGroupMessage(distributionId, recipients, unidentifiedAccess, message.getTimestamp().get(), content, ContentHint.IMPLICIT, message.getGroupId(), false, SenderKeyGroupEvents.EMPTY, message.isUrgent(), false); + + List results = sendGroupMessage(distributionId, recipients, unidentifiedAccess, message.getTimestamp().get(), content, ContentHint.IMPLICIT, message.getGroupId(), false, SenderKeyGroupEvents.EMPTY, message.isUrgent(), false); + + if (partialListener != null) { + partialListener.onPartialSendComplete(results); + } + + return results; } /** @@ -492,15 +506,16 @@ public class SignalServiceMessageSender { /** * Sends a {@link SignalServiceDataMessage} to a group using sender keys. */ - public List sendGroupDataMessage(DistributionId distributionId, - List recipients, - List unidentifiedAccess, - boolean isRecipientUpdate, - ContentHint contentHint, - SignalServiceDataMessage message, - SenderKeyGroupEvents sendEvents, - boolean urgent, - boolean isForStory) + public List sendGroupDataMessage(DistributionId distributionId, + List recipients, + List unidentifiedAccess, + boolean isRecipientUpdate, + ContentHint contentHint, + SignalServiceDataMessage message, + SenderKeyGroupEvents sendEvents, + boolean urgent, + boolean isForStory, + PartialSendBatchCompleteListener partialListener) throws IOException, UntrustedIdentityException, NoSessionException, InvalidKeyException, InvalidRegistrationIdException { Log.d(TAG, "[" + message.getTimestamp() + "] Sending a group data message to " + recipients.size() + " recipients using DistributionId " + distributionId); @@ -509,6 +524,10 @@ public class SignalServiceMessageSender { Optional groupId = message.getGroupId(); List results = sendGroupMessage(distributionId, recipients, unidentifiedAccess, message.getTimestamp(), content, contentHint, groupId, false, sendEvents, urgent, isForStory); + if (partialListener != null) { + partialListener.onPartialSendComplete(results); + } + sendEvents.onMessageSent(); if (aciStore.isMultiDevice()) { diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/http/PartialSendBatchCompleteListener.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/http/PartialSendBatchCompleteListener.java new file mode 100644 index 0000000000..2bb5478a9c --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/http/PartialSendBatchCompleteListener.java @@ -0,0 +1,12 @@ +package org.whispersystems.signalservice.internal.push.http; + +import org.whispersystems.signalservice.api.messages.SendMessageResult; + +import java.util.List; + +/** + * Used to let a listener know when a batch of sends in a collection of sends has been completed. + */ +public interface PartialSendBatchCompleteListener { + void onPartialSendComplete(List results); +}