Insert resent messages at the proper location.

This commit is contained in:
Greyson Parrelli
2021-06-29 15:13:31 -04:00
committed by Alex Hart
parent 90a27d2227
commit a1c8573fad
20 changed files with 306 additions and 168 deletions

View File

@@ -188,6 +188,10 @@ public final class GroupSendUtil {
int successCount = (int) results.stream().filter(SendMessageResult::isSuccess).count();
Log.d(TAG, "Successfully sent using sender key to " + successCount + "/" + targets.size() + " sender key targets.");
if (sendOperation.shouldIncludeInMessageLog()) {
DatabaseFactory.getMessageLogDatabase(context).insertIfPossible(sendOperation.getSentTimestamp(), senderKeyTargets, results, sendOperation.getContentHint(), sendOperation.getRelatedMessageId(), sendOperation.isRelatedMessageMms());
}
} catch (NoSessionException e) {
Log.w(TAG, "No session. Falling back to legacy sends.", e);
legacyTargets.addAll(senderKeyTargets);
@@ -198,9 +202,6 @@ public final class GroupSendUtil {
}
if (cancelationSignal != null && cancelationSignal.isCanceled()) {
if (sendOperation.shouldIncludeInMessageLog()) {
DatabaseFactory.getMessageLogDatabase(context).insertIfPossible(sendOperation.getSentTimestamp(), allTargets, allResults, sendOperation.getContentHint(), sendOperation.getRelatedMessageId(), sendOperation.isRelatedMessageMms());
}
throw new CancelationException();
}
@@ -217,10 +218,10 @@ public final class GroupSendUtil {
int successCount = (int) results.stream().filter(SendMessageResult::isSuccess).count();
Log.d(TAG, "Successfully using 1:1 to " + successCount + "/" + targets.size() + " legacy targets.");
}
if (sendOperation.shouldIncludeInMessageLog()) {
DatabaseFactory.getMessageLogDatabase(context).insertIfPossible(sendOperation.getSentTimestamp(), allTargets, allResults, sendOperation.getContentHint(), sendOperation.getRelatedMessageId(), sendOperation.isRelatedMessageMms());
if (sendOperation.shouldIncludeInMessageLog()) {
DatabaseFactory.getMessageLogDatabase(context).insertIfPossible(sendOperation.getSentTimestamp(), legacyTargets, results, sendOperation.getContentHint(), sendOperation.getRelatedMessageId(), sendOperation.isRelatedMessageMms());
}
}
return allResults;

View File

@@ -159,8 +159,8 @@ public class IncomingMessageProcessor {
}
private void processReceipt(@NonNull SignalServiceEnvelope envelope) {
Log.i(TAG, "Received server receipt for " + envelope.getTimestamp());
Recipient sender = Recipient.externalHighTrustPush(context, envelope.getSourceAddress());
Log.i(TAG, "Received server receipt. Sender: " + sender.getId() + ", Device: " + envelope.getSourceDevice() + ", Timestamp: " + envelope.getTimestamp());
mmsSmsDatabase.incrementDeliveryReceiptCount(new SyncMessageId(sender.getId(), envelope.getTimestamp()), System.currentTimeMillis());
DatabaseFactory.getMessageLogDatabase(context).deleteEntryForRecipient(envelope.getTimestamp(), sender.getId(), envelope.getSourceDevice());

View File

@@ -45,6 +45,7 @@ import org.thoughtcrime.securesms.database.model.Mention;
import org.thoughtcrime.securesms.database.model.MessageLogEntry;
import org.thoughtcrime.securesms.database.model.MessageRecord;
import org.thoughtcrime.securesms.database.model.MmsMessageRecord;
import org.thoughtcrime.securesms.database.model.PendingRetryReceiptModel;
import org.thoughtcrime.securesms.database.model.ReactionRecord;
import org.thoughtcrime.securesms.database.model.StickerRecord;
import org.thoughtcrime.securesms.database.model.ThreadRecord;
@@ -229,6 +230,10 @@ public final class MessageContentProcessor {
return;
}
RecipientId senderId = RecipientId.fromHighTrust(content.getSender());
PendingRetryReceiptModel pending = DatabaseFactory.getPendingRetryReceiptDatabase(context).get(senderId, content.getTimestamp());
long receivedTime = handlePendingRetry(pending, content);
log(String.valueOf(content.getTimestamp()), "Beginning message processing.");
if (content.getSenderKeyDistributionMessage().isPresent()) {
@@ -249,13 +254,13 @@ public final class MessageContentProcessor {
if (isInvalidMessage(message)) handleInvalidMessage(content.getSender(), content.getSenderDevice(), groupId, content.getTimestamp(), smsMessageId);
else if (message.isEndSession()) handleEndSessionMessage(content, smsMessageId);
else if (message.isGroupV1Update()) handleGroupV1Message(content, message, smsMessageId, groupId.get().requireV1());
else if (message.isExpirationUpdate()) handleExpirationUpdate(content, message, smsMessageId, groupId);
else if (message.isGroupV1Update()) handleGroupV1Message(content, message, smsMessageId, groupId.get().requireV1(), receivedTime);
else if (message.isExpirationUpdate()) handleExpirationUpdate(content, message, smsMessageId, groupId, receivedTime);
else if (message.getReaction().isPresent()) handleReaction(content, message);
else if (message.getRemoteDelete().isPresent()) handleRemoteDelete(content, message);
else if (message.getPayment().isPresent()) handlePayment(content, message);
else if (isMediaMessage) handleMediaMessage(content, message, smsMessageId);
else if (message.getBody().isPresent()) handleTextMessage(content, message, smsMessageId, groupId);
else if (isMediaMessage) handleMediaMessage(content, message, smsMessageId, receivedTime);
else if (message.getBody().isPresent()) handleTextMessage(content, message, smsMessageId, groupId, receivedTime);
else if (Build.VERSION.SDK_INT > 19 && message.getGroupCallUpdate().isPresent()) handleGroupCallUpdateMessage(content, message, groupId);
if (groupId.isPresent() && groupDatabase.isUnknownGroup(groupId.get())) {
@@ -332,11 +337,18 @@ public final class MessageContentProcessor {
handleTypingMessage(content, content.getTypingMessage().get());
} else if (content.getDecryptionErrorMessage().isPresent()) {
handleRetryReceipt(content, content.getDecryptionErrorMessage().get());
} else if (content.getSenderKeyDistributionMessage().isPresent()) {
// Already handled, here in order to prevent unrecognized message log
} else {
warn(String.valueOf(content.getTimestamp()), "Got unrecognized message!");
}
resetRecipientToPush(Recipient.externalPush(context, content.getSender()));
if (pending != null) {
warn(content.getTimestamp(), "Pending retry was processed. Deleting.");
DatabaseFactory.getPendingRetryReceiptDatabase(context).delete(pending.getId());
}
} catch (StorageFailedException e) {
warn(String.valueOf(content.getTimestamp()), e);
handleCorruptMessage(e.getSender(), e.getSenderDevice(), timestamp, smsMessageId);
@@ -345,6 +357,33 @@ public final class MessageContentProcessor {
}
}
private long handlePendingRetry(PendingRetryReceiptModel pending, SignalServiceContent content) throws BadGroupIdException {
long receivedTime = System.currentTimeMillis();
if (pending != null) {
warn(content.getTimestamp(), "Incoming message matches a pending retry we were expecting.");
Recipient destination = getMessageDestination(content);
Long threadId = DatabaseFactory.getThreadDatabase(context).getThreadIdFor(destination.getId());
if (threadId != null) {
ThreadDatabase.ConversationMetadata metadata = DatabaseFactory.getThreadDatabase(context).getConversationMetadata(threadId);
long visibleThread = ApplicationDependencies.getMessageNotifier().getVisibleThread();
if (threadId != visibleThread && metadata.getLastSeen() > 0 && metadata.getLastSeen() < pending.getReceivedTimestamp()) {
receivedTime = pending.getReceivedTimestamp();
warn(content.getTimestamp(), "Thread has not been opened yet. Using received timestamp of " + receivedTime);
} else {
warn(content.getTimestamp(), "Thread was opened after receiving the original message. Using the current time for received time. (Last seen: " + metadata.getLastSeen() + ", ThreadVisible: " + (threadId == visibleThread) + ")");
}
} else {
warn(content.getTimestamp(), "Could not find a thread for the pending message. Using current time for received time.");
}
}
return receivedTime;
}
private void handlePayment(@NonNull SignalServiceContent content, @NonNull SignalServiceDataMessage message) {
if (!message.getPayment().isPresent()) {
throw new AssertionError();
@@ -611,6 +650,7 @@ public final class MessageContentProcessor {
content.getSenderDevice(),
content.getTimestamp(),
content.getServerReceivedTimestamp(),
System.currentTimeMillis(),
"",
Optional.absent(),
0,
@@ -667,13 +707,14 @@ public final class MessageContentProcessor {
private void handleGroupV1Message(@NonNull SignalServiceContent content,
@NonNull SignalServiceDataMessage message,
@NonNull Optional<Long> smsMessageId,
@NonNull GroupId.V1 groupId)
@NonNull GroupId.V1 groupId,
long receivedTime)
throws StorageFailedException, BadGroupIdException
{
GroupV1MessageProcessor.process(context, content, message, false);
if (message.getExpiresInSeconds() != 0 && message.getExpiresInSeconds() != getMessageDestination(content, message).getExpireMessages()) {
handleExpirationUpdate(content, message, Optional.absent(), Optional.of(groupId));
handleExpirationUpdate(content, message, Optional.absent(), Optional.of(groupId), receivedTime);
}
if (smsMessageId.isPresent()) {
@@ -703,7 +744,8 @@ public final class MessageContentProcessor {
private void handleExpirationUpdate(@NonNull SignalServiceContent content,
@NonNull SignalServiceDataMessage message,
@NonNull Optional<Long> smsMessageId,
@NonNull Optional<GroupId> groupId)
@NonNull Optional<GroupId> groupId,
long receivedTime)
throws StorageFailedException, BadGroupIdException
{
if (groupId.isPresent() && groupId.get().isV2()) {
@@ -726,6 +768,7 @@ public final class MessageContentProcessor {
IncomingMediaMessage mediaMessage = new IncomingMediaMessage(sender.getId(),
content.getTimestamp(),
content.getServerReceivedTimestamp(),
receivedTime,
-1,
expiresInSeconds * 1000L,
true,
@@ -1150,7 +1193,8 @@ public final class MessageContentProcessor {
private void handleMediaMessage(@NonNull SignalServiceContent content,
@NonNull SignalServiceDataMessage message,
@NonNull Optional<Long> smsMessageId)
@NonNull Optional<Long> smsMessageId,
long receivedTime)
throws StorageFailedException, BadGroupIdException
{
notifyTypingStoppedFromIncomingMessage(getMessageDestination(content, message), content.getSender(), content.getSenderDevice());
@@ -1170,6 +1214,7 @@ public final class MessageContentProcessor {
IncomingMediaMessage mediaMessage = new IncomingMediaMessage(RecipientId.fromHighTrust(content.getSender()),
message.getTimestamp(),
content.getServerReceivedTimestamp(),
receivedTime,
-1,
message.getExpiresInSeconds() * 1000L,
false,
@@ -1373,7 +1418,8 @@ public final class MessageContentProcessor {
private void handleTextMessage(@NonNull SignalServiceContent content,
@NonNull SignalServiceDataMessage message,
@NonNull Optional<Long> smsMessageId,
@NonNull Optional<GroupId> groupId)
@NonNull Optional<GroupId> groupId,
long receivedTime)
throws StorageFailedException, BadGroupIdException
{
MessageDatabase database = DatabaseFactory.getSmsDatabase(context);
@@ -1381,7 +1427,7 @@ public final class MessageContentProcessor {
Recipient recipient = getMessageDestination(content, message);
if (message.getExpiresInSeconds() != recipient.getExpireMessages()) {
handleExpirationUpdate(content, message, Optional.absent(), groupId);
handleExpirationUpdate(content, message, Optional.absent(), groupId, receivedTime);
}
Long threadId;
@@ -1395,6 +1441,7 @@ public final class MessageContentProcessor {
content.getSenderDevice(),
message.getTimestamp(),
content.getServerReceivedTimestamp(),
receivedTime,
body,
groupId,
message.getExpiresInSeconds() * 1000L,
@@ -1603,12 +1650,13 @@ public final class MessageContentProcessor {
return;
}
log("Processing viewed reciepts for IDs: " + Util.join(message.getTimestamps(), ","));
Recipient sender = Recipient.externalHighTrustPush(context, content.getSender());
log(TAG, "Processing viewed receipts. Sender: " + sender.getId() + ", Device: " + content.getSenderDevice() + ", Timestamps: " + Util.join(message.getTimestamps(), ", "));
List<SyncMessageId> ids = Stream.of(message.getTimestamps())
.map(t -> new SyncMessageId(sender.getId(), t))
.toList();
Recipient sender = Recipient.externalHighTrustPush(context, content.getSender());
List<SyncMessageId> ids = Stream.of(message.getTimestamps())
.map(t -> new SyncMessageId(sender.getId(), t))
.toList();
Collection<SyncMessageId> unhandled = DatabaseFactory.getMmsSmsDatabase(context)
.incrementViewedReceiptCounts(ids, content.getTimestamp());
@@ -1622,12 +1670,12 @@ public final class MessageContentProcessor {
private void handleDeliveryReceipt(@NonNull SignalServiceContent content,
@NonNull SignalServiceReceiptMessage message)
{
log(TAG, "Processing delivery receipts for IDs: " + Util.join(message.getTimestamps(), ", "));
Recipient sender = Recipient.externalHighTrustPush(context, content.getSender());
log(TAG, "Processing delivery receipts. Sender: " + sender.getId() + ", Device: " + content.getSenderDevice() + ", Timestamps: " + Util.join(message.getTimestamps(), ", "));
Recipient sender = Recipient.externalHighTrustPush(context, content.getSender());
List<SyncMessageId> ids = Stream.of(message.getTimestamps())
.map(t -> new SyncMessageId(sender.getId(), t))
.toList();
List<SyncMessageId> ids = Stream.of(message.getTimestamps())
.map(t -> new SyncMessageId(sender.getId(), t))
.toList();
DatabaseFactory.getMmsSmsDatabase(context).incrementDeliveryReceiptCounts(ids, System.currentTimeMillis());
DatabaseFactory.getMessageLogDatabase(context).deleteEntriesForRecipient(message.getTimestamps(), sender.getId(), content.getSenderDevice());
@@ -1642,12 +1690,12 @@ public final class MessageContentProcessor {
return;
}
log("Processing read receipts for IDs: " + Util.join(message.getTimestamps(), ", "));
Recipient sender = Recipient.externalHighTrustPush(context, content.getSender());
log(TAG, "Processing read receipts. Sender: " + sender.getId() + ", Device: " + content.getSenderDevice() + ", Timestamps: " + Util.join(message.getTimestamps(), ", "));
Recipient sender = Recipient.externalHighTrustPush(context, content.getSender());
List<SyncMessageId> ids = Stream.of(message.getTimestamps())
.map(t -> new SyncMessageId(sender.getId(), t))
.toList();
List<SyncMessageId> ids = Stream.of(message.getTimestamps())
.map(t -> new SyncMessageId(sender.getId(), t))
.toList();
Collection<SyncMessageId> unhandled = DatabaseFactory.getMmsSmsDatabase(context).incrementReadReceiptCounts(ids, content.getTimestamp());
@@ -2016,7 +2064,7 @@ public final class MessageContentProcessor {
private Optional<InsertResult> insertPlaceholder(@NonNull String sender, int senderDevice, long timestamp, Optional<GroupId> groupId) {
MessageDatabase database = DatabaseFactory.getSmsDatabase(context);
IncomingTextMessage textMessage = new IncomingTextMessage(Recipient.external(context, sender).getId(),
senderDevice, timestamp, -1, "",
senderDevice, timestamp, -1, System.currentTimeMillis(), "",
groupId, 0, false, null);
textMessage = new IncomingEncryptedMessage(textMessage, "");
@@ -2029,6 +2077,11 @@ public final class MessageContentProcessor {
return getGroupRecipient(message.getMessage().getGroupContext()).or(() -> Recipient.externalPush(context, message.getDestination().get()));
}
private Recipient getMessageDestination(@NonNull SignalServiceContent content) throws BadGroupIdException {
SignalServiceDataMessage message = content.getDataMessage().orNull();
return getGroupRecipient(message != null ? message.getGroupContext() : Optional.absent()).or(() -> Recipient.externalHighTrustPush(context, content.getSender()));
}
private Recipient getMessageDestination(@NonNull SignalServiceContent content,
@NonNull SignalServiceDataMessage message)
throws BadGroupIdException
@@ -2056,7 +2109,7 @@ public final class MessageContentProcessor {
Recipient author = Recipient.externalPush(context, sender);
long threadId = DatabaseFactory.getThreadDatabase(context).getThreadIdFor(conversationRecipient);
if (threadId > 0) {
if (threadId > 0 && TextSecurePreferences.isTypingIndicatorsEnabled(context)) {
Log.d(TAG, "Typing stopped on thread " + threadId + " due to an incoming message.");
ApplicationDependencies.getTypingStatusRepository().onTypingStopped(context, threadId, author, device, true);
}