Incrementally insert MSL entries for legacy group sends.

This commit is contained in:
Greyson Parrelli
2021-07-02 17:25:31 -04:00
committed by Alex Hart
parent acc825971b
commit 7f0a0bef5a
14 changed files with 237 additions and 368 deletions

View File

@@ -108,6 +108,7 @@ import org.whispersystems.signalservice.internal.push.exceptions.MismatchedDevic
import org.whispersystems.signalservice.internal.push.exceptions.StaleDevicesException;
import org.whispersystems.signalservice.internal.push.http.AttachmentCipherOutputStreamFactory;
import org.whispersystems.signalservice.internal.push.http.CancelationSignal;
import org.whispersystems.signalservice.internal.push.http.PartialSendCompleteListener;
import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec;
import org.whispersystems.signalservice.internal.util.StaticCredentialsProvider;
import org.whispersystems.signalservice.internal.util.Util;
@@ -273,7 +274,7 @@ public class SignalServiceMessageSender {
Content content = createTypingContent(message);
EnvelopeContent envelopeContent = EnvelopeContent.encrypted(content, ContentHint.IMPLICIT, Optional.absent());
sendMessage(recipients, getTargetUnidentifiedAccess(unidentifiedAccess), message.getTimestamp(), envelopeContent, true, cancelationSignal);
sendMessage(recipients, getTargetUnidentifiedAccess(unidentifiedAccess), message.getTimestamp(), envelopeContent, true, null, cancelationSignal);
}
/**
@@ -388,7 +389,7 @@ public class SignalServiceMessageSender {
EnvelopeContent envelopeContent = EnvelopeContent.encrypted(content, ContentHint.IMPLICIT, Optional.of(groupId));
long timestamp = System.currentTimeMillis();
return sendMessage(recipients, getTargetUnidentifiedAccess(unidentifiedAccess), timestamp, envelopeContent, false, null);
return sendMessage(recipients, getTargetUnidentifiedAccess(unidentifiedAccess), timestamp, envelopeContent, false, null, null);
}
/**
@@ -443,21 +444,22 @@ public class SignalServiceMessageSender {
/**
* Sends a message to a group using client-side fanout.
*
* @param recipients The group members.
* @param message The group message.
* @throws IOException
* @param partialListener A listener that will be called when an individual send is completed. Will be invoked on an arbitrary background thread, *not*
* the calling thread.
*/
public List<SendMessageResult> sendDataMessage(List<SignalServiceAddress> recipients,
List<Optional<UnidentifiedAccessPair>> unidentifiedAccess,
boolean isRecipientUpdate,
ContentHint contentHint,
SignalServiceDataMessage message)
SignalServiceDataMessage message,
PartialSendCompleteListener partialListener,
CancelationSignal cancelationSignal)
throws IOException, UntrustedIdentityException
{
Content content = createMessageContent(message);
EnvelopeContent envelopeContent = EnvelopeContent.encrypted(content, contentHint, message.getGroupId());
long timestamp = message.getTimestamp();
List<SendMessageResult> results = sendMessage(recipients, getTargetUnidentifiedAccess(unidentifiedAccess), timestamp, envelopeContent, false, null);
List<SendMessageResult> results = sendMessage(recipients, getTargetUnidentifiedAccess(unidentifiedAccess), timestamp, envelopeContent, false, partialListener, cancelationSignal);
boolean needsSyncInResults = false;
for (SendMessageResult result : results) {
@@ -1531,6 +1533,7 @@ public class SignalServiceMessageSender {
long timestamp,
EnvelopeContent content,
boolean online,
PartialSendCompleteListener partialListener,
CancelationSignal cancelationSignal)
throws IOException
{
@@ -1544,7 +1547,13 @@ public class SignalServiceMessageSender {
while (recipientIterator.hasNext()) {
SignalServiceAddress recipient = recipientIterator.next();
Optional<UnidentifiedAccess> access = unidentifiedAccessIterator.next();
futureResults.add(executor.submit(() -> sendMessage(recipient, access, timestamp, content, online, cancelationSignal)));
futureResults.add(executor.submit(() -> {
SendMessageResult result = sendMessage(recipient, access, timestamp, content, online, cancelationSignal);
if (partialListener != null) {
partialListener.onPartialSendComplete(result);
}
return result;
}));
}
List<SendMessageResult> results = new ArrayList<>(futureResults.size());

View File

@@ -0,0 +1,10 @@
package org.whispersystems.signalservice.internal.push.http;
import org.whispersystems.signalservice.api.messages.SendMessageResult;
/**
* Used to let a listener know when each individual send in a collection of sends has been completed.
*/
public interface PartialSendCompleteListener {
void onPartialSendComplete(SendMessageResult result);
}