Implement the message send log for sender key retries.

This commit is contained in:
Greyson Parrelli
2021-06-24 11:22:20 -04:00
committed by Cody Henthorne
parent 6502ef64ce
commit f19033a7a2
31 changed files with 1077 additions and 179 deletions

View File

@@ -219,18 +219,16 @@ public class SignalServiceMessageSender {
*
* @param recipient The sender of the received message you're acknowledging.
* @param message The read receipt to deliver.
* @throws IOException
* @throws UntrustedIdentityException
*/
public void sendReceipt(SignalServiceAddress recipient,
Optional<UnidentifiedAccessPair> unidentifiedAccess,
SignalServiceReceiptMessage message)
public SendMessageResult sendReceipt(SignalServiceAddress recipient,
Optional<UnidentifiedAccessPair> unidentifiedAccess,
SignalServiceReceiptMessage message)
throws IOException, UntrustedIdentityException
{
Content content = createReceiptContent(message);
EnvelopeContent envelopeContent = EnvelopeContent.encrypted(content, ContentHint.IMPLICIT, Optional.absent());
sendMessage(recipient, getTargetUnidentifiedAccess(unidentifiedAccess), message.getWhen(), envelopeContent, false, null);
return sendMessage(recipient, getTargetUnidentifiedAccess(unidentifiedAccess), message.getWhen(), envelopeContent, false, null);
}
/**
@@ -254,8 +252,6 @@ public class SignalServiceMessageSender {
*
* @param recipient The destination
* @param message The typing indicator to deliver
* @throws IOException
* @throws UntrustedIdentityException
*/
public void sendTyping(SignalServiceAddress recipient,
Optional<UnidentifiedAccessPair> unidentifiedAccess,
@@ -402,6 +398,23 @@ public class SignalServiceMessageSender {
new SignalGroupSessionBuilder(sessionLock, new GroupSessionBuilder(store)).process(sender, senderKeyDistributionMessage);
}
/**
* Resend a previously-sent message.
*/
public SendMessageResult resendContent(SignalServiceAddress address,
Optional<UnidentifiedAccessPair> unidentifiedAccess,
long timestamp,
Content content,
ContentHint contentHint,
Optional<byte[]> groupId)
throws UntrustedIdentityException, IOException
{
EnvelopeContent envelopeContent = EnvelopeContent.encrypted(content, contentHint, groupId);
Optional<UnidentifiedAccess> access = unidentifiedAccess.isPresent() ? unidentifiedAccess.get().getTargetUnidentifiedAccess() : Optional.absent();
return sendMessage(address, access, timestamp, envelopeContent, false, null);
}
/**
* Sends a {@link SignalServiceDataMessage} to a group using sender keys.
*/
@@ -469,7 +482,7 @@ public class SignalServiceMessageSender {
return results;
}
public void sendSyncMessage(SignalServiceSyncMessage message, Optional<UnidentifiedAccessPair> unidentifiedAccess)
public SendMessageResult sendSyncMessage(SignalServiceSyncMessage message, Optional<UnidentifiedAccessPair> unidentifiedAccess)
throws IOException, UntrustedIdentityException
{
Content content;
@@ -502,8 +515,7 @@ public class SignalServiceMessageSender {
} else if (message.getKeys().isPresent()) {
content = createMultiDeviceSyncKeysContent(message.getKeys().get());
} else if (message.getVerified().isPresent()) {
sendVerifiedMessage(message.getVerified().get(), unidentifiedAccess);
return;
return sendVerifiedMessage(message.getVerified().get(), unidentifiedAccess);
} else {
throw new IOException("Unsupported sync message!");
}
@@ -513,7 +525,7 @@ public class SignalServiceMessageSender {
EnvelopeContent envelopeContent = EnvelopeContent.encrypted(content, ContentHint.IMPLICIT, Optional.absent());
sendMessage(localAddress, Optional.absent(), timestamp, envelopeContent, false, null);
return sendMessage(localAddress, Optional.absent(), timestamp, envelopeContent, false, null);
}
public void setSoTimeoutMillis(long soTimeoutMillis) {
@@ -631,7 +643,7 @@ public class SignalServiceMessageSender {
attachment.getUploadTimestamp());
}
private void sendVerifiedMessage(VerifiedMessage message, Optional<UnidentifiedAccessPair> unidentifiedAccess)
private SendMessageResult sendVerifiedMessage(VerifiedMessage message, Optional<UnidentifiedAccessPair> unidentifiedAccess)
throws IOException, UntrustedIdentityException
{
byte[] nullMessageBody = DataMessage.newBuilder()
@@ -657,6 +669,8 @@ public class SignalServiceMessageSender {
sendMessage(localAddress, Optional.absent(), message.getTimestamp(), syncMessageContent, false, null);
}
return result;
}
public SendMessageResult sendNullMessage(SignalServiceAddress address, Optional<UnidentifiedAccessPair> unidentifiedAccess)
@@ -1016,9 +1030,10 @@ public class SignalServiceMessageSender {
private Content createMultiDeviceSentTranscriptContent(SentTranscriptMessage transcript, Optional<UnidentifiedAccessPair> unidentifiedAccess) throws IOException {
SignalServiceAddress address = transcript.getDestination().get();
SendMessageResult result = SendMessageResult.success(address, unidentifiedAccess.isPresent(), true, -1);
Content content = createMessageContent(transcript.getMessage());
SendMessageResult result = SendMessageResult.success(address, Collections.emptyList(), unidentifiedAccess.isPresent(), true, -1, Optional.of(content));
return createMultiDeviceSentTranscriptContent(createMessageContent(transcript.getMessage()),
return createMultiDeviceSentTranscriptContent(content,
Optional.of(address),
transcript.getTimestamp(),
Collections.singletonList(result),
@@ -1613,7 +1628,7 @@ public class SignalServiceMessageSender {
if (pipe.isPresent() && !unidentifiedAccess.isPresent()) {
try {
SendMessageResponse response = pipe.get().send(messages, Optional.absent()).get(10, TimeUnit.SECONDS);
return SendMessageResult.success(recipient, false, response.getNeedsSync() || isMultiDevice.get(), System.currentTimeMillis() - startTime);
return SendMessageResult.success(recipient, messages.getDevices(), false, response.getNeedsSync() || isMultiDevice.get(), System.currentTimeMillis() - startTime, content.getContent());
} catch (IOException | ExecutionException | InterruptedException | TimeoutException e) {
Log.w(TAG, e);
Log.w(TAG, "[sendMessage] Pipe failed, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")");
@@ -1621,7 +1636,7 @@ public class SignalServiceMessageSender {
} else if (unidentifiedPipe.isPresent() && unidentifiedAccess.isPresent()) {
try {
SendMessageResponse response = unidentifiedPipe.get().send(messages, unidentifiedAccess).get(10, TimeUnit.SECONDS);
return SendMessageResult.success(recipient, true, response.getNeedsSync() || isMultiDevice.get(), System.currentTimeMillis() - startTime);
return SendMessageResult.success(recipient, messages.getDevices(), true, response.getNeedsSync() || isMultiDevice.get(), System.currentTimeMillis() - startTime, content.getContent());
} catch (IOException | ExecutionException | InterruptedException | TimeoutException e) {
Log.w(TAG, e);
Log.w(TAG, "[sendMessage] Unidentified pipe failed, falling back...");
@@ -1634,7 +1649,7 @@ public class SignalServiceMessageSender {
SendMessageResponse response = socket.sendMessage(messages, unidentifiedAccess);
return SendMessageResult.success(recipient, unidentifiedAccess.isPresent(), response.getNeedsSync() || isMultiDevice.get(), System.currentTimeMillis() - startTime);
return SendMessageResult.success(recipient, messages.getDevices(), unidentifiedAccess.isPresent(), response.getNeedsSync() || isMultiDevice.get(), System.currentTimeMillis() - startTime, content.getContent());
} catch (InvalidKeyException ike) {
Log.w(TAG, ike);
@@ -1693,15 +1708,21 @@ public class SignalServiceMessageSender {
accessByUuid.put(addressIterator.next().getUuid().get(), accessIterator.next());
}
Map<SignalServiceAddress, List<Integer>> recipientDevices = recipients.stream().collect(Collectors.toMap(a -> a, a -> new LinkedList<>()));
for (int i = 0; i < RETRY_COUNT; i++) {
List<SignalProtocolAddress> destinations = new LinkedList<>();
for (SignalServiceAddress recipient : recipients) {
List<Integer> devices = recipientDevices.get(recipient);
destinations.add(new SignalProtocolAddress(recipient.getUuid().get().toString(), SignalServiceAddress.DEFAULT_DEVICE_ID));
devices.add(SignalServiceAddress.DEFAULT_DEVICE_ID);
for (int deviceId : store.getSubDeviceSessions(recipient.getIdentifier())) {
if (store.containsSession(new SignalProtocolAddress(recipient.getIdentifier(), deviceId))) {
destinations.add(new SignalProtocolAddress(recipient.getUuid().get().toString(), deviceId));
devices.add(deviceId);
}
}
}
@@ -1783,7 +1804,7 @@ public class SignalServiceMessageSender {
if (pipe.isPresent()) {
try {
SendGroupMessageResponse response = pipe.get().sendToGroup(ciphertext, joinedUnidentifiedAccess, timestamp, online).get(10, TimeUnit.SECONDS);
return transformGroupResponseToMessageResults(recipients, response);
return transformGroupResponseToMessageResults(recipientDevices, response, content);
} catch (IOException | ExecutionException | InterruptedException | TimeoutException e) {
Log.w(TAG, "[sendGroupMessage] Pipe failed, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")");
}
@@ -1793,7 +1814,7 @@ public class SignalServiceMessageSender {
try {
SendGroupMessageResponse response = socket.sendGroupMessage(ciphertext, joinedUnidentifiedAccess, timestamp, online);
return transformGroupResponseToMessageResults(recipients, response);
return transformGroupResponseToMessageResults(recipientDevices, response, content);
} catch (GroupMismatchedDevicesException e) {
Log.w(TAG, "[sendGroupMessage] Handling mismatched devices.", e);
for (GroupMismatchedDevices mismatched : e.getMismatchedDevices()) {
@@ -1822,7 +1843,7 @@ public class SignalServiceMessageSender {
throw new IOException("Failed to resolve conflicts after " + RETRY_COUNT + " attempts!");
}
private List<SendMessageResult> transformGroupResponseToMessageResults(List<SignalServiceAddress> recipients, SendGroupMessageResponse response) {
private List<SendMessageResult> transformGroupResponseToMessageResults(Map<SignalServiceAddress, List<Integer>> recipients, SendGroupMessageResponse response, Content content) {
Set<UUID> unregistered = response.getUnsentTargets();
List<SendMessageResult> failures = unregistered.stream()
@@ -1830,12 +1851,14 @@ public class SignalServiceMessageSender {
.map(SendMessageResult::unregisteredFailure)
.collect(Collectors.toList());
List<SendMessageResult> success = recipients.stream()
List<SendMessageResult> success = recipients.keySet()
.stream()
.filter(r -> !unregistered.contains(r.getUuid().get()))
.map(a -> SendMessageResult.success(a, true, isMultiDevice.get(), -1))
.map(a -> SendMessageResult.success(a, recipients.get(a), true, isMultiDevice.get(), -1, Optional.of(content)))
.collect(Collectors.toList());
List<SendMessageResult> results = new LinkedList<>(success);
List<SendMessageResult> results = new ArrayList<>(success.size() + failures.size());
results.addAll(success);
results.addAll(failures);
return results;

View File

@@ -6,7 +6,7 @@ import java.util.HashMap;
import java.util.Map;
public enum ContentHint {
/** This message has content, but you shouldnt expect it to be re-sent to you. */
/** This message has content, but you shouldn't expect it to be re-sent to you. */
DEFAULT(UnidentifiedSenderMessageContent.CONTENT_HINT_DEFAULT),
/** You should expect to be able to have this content be re-sent to you. */

View File

@@ -39,12 +39,17 @@ public interface EnvelopeContent {
*/
int size();
/**
* A content proto, if applicable.
*/
Optional<Content> getContent();
/**
* Wrap {@link Content} you plan on sending as an encrypted message.
* This is the default. Consider anything else exceptional.
*/
static EnvelopeContent encrypted(Content content, ContentHint contentHint, Optional<byte[]> groupId) {
return new Encrypted(content.toByteArray(), contentHint, groupId);
return new Encrypted(content, contentHint, groupId);
}
/**
@@ -56,14 +61,14 @@ public interface EnvelopeContent {
class Encrypted implements EnvelopeContent {
private final byte[] unpaddedMessage;
private final Content content;
private final ContentHint contentHint;
private final Optional<byte[]> groupId;
public Encrypted(byte[] unpaddedMessage, ContentHint contentHint, Optional<byte[]> groupId) {
this.unpaddedMessage = unpaddedMessage;
this.contentHint = contentHint;
this.groupId = groupId;
public Encrypted(Content content, ContentHint contentHint, Optional<byte[]> groupId) {
this.content = content;
this.contentHint = contentHint;
this.groupId = groupId;
}
@Override
@@ -74,7 +79,7 @@ public interface EnvelopeContent {
throws UntrustedIdentityException, InvalidKeyException
{
PushTransportDetails transportDetails = new PushTransportDetails();
CiphertextMessage message = sessionCipher.encrypt(transportDetails.getPaddedMessageBody(unpaddedMessage));
CiphertextMessage message = sessionCipher.encrypt(transportDetails.getPaddedMessageBody(content.toByteArray()));
UnidentifiedSenderMessageContent messageContent = new UnidentifiedSenderMessageContent(message,
senderCertificate,
contentHint.getType(),
@@ -90,7 +95,7 @@ public interface EnvelopeContent {
@Override
public OutgoingPushMessage processUnsealedSender(SignalSessionCipher sessionCipher, SignalProtocolAddress destination) throws UntrustedIdentityException {
PushTransportDetails transportDetails = new PushTransportDetails();
CiphertextMessage message = sessionCipher.encrypt(transportDetails.getPaddedMessageBody(unpaddedMessage));
CiphertextMessage message = sessionCipher.encrypt(transportDetails.getPaddedMessageBody(content.toByteArray()));
int remoteRegistrationId = sessionCipher.getRemoteRegistrationId();
String body = Base64.encodeBytes(message.serialize());
@@ -107,7 +112,12 @@ public interface EnvelopeContent {
@Override
public int size() {
return unpaddedMessage.length;
return content.getSerializedSize();
}
@Override
public Optional<Content> getContent() {
return Optional.of(content);
}
}
@@ -152,5 +162,10 @@ public interface EnvelopeContent {
public int size() {
return plaintextContent.getBody().length;
}
@Override
public Optional<Content> getContent() {
return Optional.absent();
}
}
}

View File

@@ -2,8 +2,13 @@ package org.whispersystems.signalservice.api.messages;
import org.whispersystems.libsignal.IdentityKey;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.push.SignalServiceAddress;
import org.whispersystems.signalservice.api.push.exceptions.ProofRequiredException;
import org.whispersystems.signalservice.internal.push.SignalServiceProtos;
import org.whispersystems.signalservice.internal.push.SignalServiceProtos.Content;
import java.util.List;
public class SendMessageResult {
@@ -14,8 +19,8 @@ public class SendMessageResult {
private final IdentityFailure identityFailure;
private final ProofRequiredException proofRequiredFailure;
public static SendMessageResult success(SignalServiceAddress address, boolean unidentified, boolean needsSync, long duration) {
return new SendMessageResult(address, new Success(unidentified, needsSync, duration), false, false, null, null);
public static SendMessageResult success(SignalServiceAddress address, List<Integer> devices, boolean unidentified, boolean needsSync, long duration, Optional<Content> content) {
return new SendMessageResult(address, new Success(unidentified, needsSync, duration, content, devices), false, false, null, null);
}
public static SendMessageResult networkFailure(SignalServiceAddress address) {
@@ -78,14 +83,18 @@ public class SendMessageResult {
}
public static class Success {
private final boolean unidentified;
private final boolean needsSync;
private final long duration;
private final boolean unidentified;
private final boolean needsSync;
private final long duration;
private final Optional<Content> content;
private final List<Integer> devices;
private Success(boolean unidentified, boolean needsSync, long duration) {
private Success(boolean unidentified, boolean needsSync, long duration, Optional<Content> content, List<Integer> devices) {
this.unidentified = unidentified;
this.needsSync = needsSync;
this.duration = duration;
this.content = content;
this.devices = devices;
}
public boolean isUnidentified() {
@@ -99,6 +108,14 @@ public class SendMessageResult {
public long getDuration() {
return duration;
}
public Optional<Content> getContent() {
return content;
}
public List<Integer> getDevices() {
return devices;
}
}
public static class IdentityFailure {

View File

@@ -30,4 +30,8 @@ public class OutgoingPushMessage {
this.destinationRegistrationId = destinationRegistrationId;
this.content = content;
}
public int getDestinationDeviceId() {
return destinationDeviceId;
}
}

View File

@@ -6,9 +6,11 @@
package org.whispersystems.signalservice.internal.push;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.stream.Collectors;
public class OutgoingPushMessageList {
@@ -50,4 +52,9 @@ public class OutgoingPushMessageList {
public boolean isOnline() {
return online;
}
@JsonIgnore
public List<Integer> getDevices() {
return messages.stream().map(OutgoingPushMessage::getDestinationDeviceId).collect(Collectors.toList());
}
}