Optimize uploads during media composition.

By uploading in advance (when on unmetered connections), media messages
can send almost instantly.
This commit is contained in:
Greyson Parrelli
2020-01-08 15:56:51 -05:00
parent 92e97e61c1
commit fadcc606f8
37 changed files with 1413 additions and 452 deletions

View File

@@ -17,8 +17,11 @@
package org.thoughtcrime.securesms.sms;
import android.content.Context;
import android.os.Parcel;
import android.os.Parcelable;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import com.annimon.stream.Stream;
@@ -59,14 +62,16 @@ import org.thoughtcrime.securesms.mms.OutgoingSecureMediaMessage;
import org.thoughtcrime.securesms.recipients.Recipient;
import org.thoughtcrime.securesms.recipients.RecipientId;
import org.thoughtcrime.securesms.service.ExpiringMessageManager;
import org.thoughtcrime.securesms.util.ParcelUtil;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.whispersystems.libsignal.util.guava.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
public class MessageSender {
@@ -118,7 +123,7 @@ public class MessageSender {
Recipient recipient = message.getRecipient();
long messageId = database.insertMessageOutbox(message, allocatedThreadId, forceSms, insertListener);
sendMediaMessage(context, recipient, forceSms, messageId, message.getExpiresIn());
sendMediaMessage(context, recipient, forceSms, messageId, Collections.emptyList());
return allocatedThreadId;
} catch (MmsException e) {
@@ -127,72 +132,97 @@ public class MessageSender {
}
}
public static void sendMediaBroadcast(@NonNull Context context, @NonNull List<OutgoingSecureMediaMessage> messages) {
if (messages.isEmpty()) {
Log.w(TAG, "sendMediaBroadcast() - No messages!");
return;
}
if (!isValidBroadcastList(messages)) {
Log.w(TAG, "sendMediaBroadcast() - Invalid message list!");
return;
}
ThreadDatabase threadDatabase = DatabaseFactory.getThreadDatabase(context);
MmsDatabase mmsDatabase = DatabaseFactory.getMmsDatabase(context);
AttachmentDatabase attachmentDatabase = DatabaseFactory.getAttachmentDatabase(context);
List<List<DatabaseAttachment>> databaseAttachments = new ArrayList<>(messages.get(0).getAttachments().size());
List<Long> messageIds = new ArrayList<>(messages.size());
for (int i = 0; i < messages.get(0).getAttachments().size(); i++) {
databaseAttachments.add(new ArrayList<>(messages.size()));
}
public static long sendPushWithPreUploadedMedia(final Context context,
final OutgoingMediaMessage message,
final Collection<PreUploadResult> preUploadResults,
final long threadId,
final SmsDatabase.InsertListener insertListener)
{
Preconditions.checkArgument(message.getAttachments().isEmpty(), "If the media is pre-uploaded, there should be no attachments on the message.");
try {
try {
mmsDatabase.beginTransaction();
ThreadDatabase threadDatabase = DatabaseFactory.getThreadDatabase(context);
MmsDatabase mmsDatabase = DatabaseFactory.getMmsDatabase(context);
AttachmentDatabase attachmentDatabase = DatabaseFactory.getAttachmentDatabase(context);
for (OutgoingSecureMediaMessage message : messages) {
long allocatedThreadId = threadDatabase.getThreadIdFor(message.getRecipient(), message.getDistributionType());
long messageId = mmsDatabase.insertMessageOutbox(message, allocatedThreadId, false, null);
List<DatabaseAttachment> attachments = attachmentDatabase.getAttachmentsForMessage(messageId);
long allocatedThreadId;
if (attachments.size() != databaseAttachments.size()) {
Log.w(TAG, "Got back an attachment list that was a different size than expected. Expected: " + databaseAttachments.size() + " Actual: "+ attachments.size());
return;
}
for (int i = 0; i < attachments.size(); i++) {
databaseAttachments.get(i).add(attachments.get(i));
if (threadId == -1) {
allocatedThreadId = threadDatabase.getThreadIdFor(message.getRecipient(), message.getDistributionType());
} else {
allocatedThreadId = threadId;
}
Recipient recipient = message.getRecipient();
long messageId = mmsDatabase.insertMessageOutbox(message, allocatedThreadId, false, insertListener);
List<AttachmentId> attachmentIds = Stream.of(preUploadResults).map(PreUploadResult::getAttachmentId).toList();
List<String> jobIds = Stream.of(preUploadResults).map(PreUploadResult::getJobIds).flatMap(Stream::of).toList();
attachmentDatabase.updateMessageId(attachmentIds, messageId);
sendMediaMessage(context, recipient, false, messageId, jobIds);
return allocatedThreadId;
} catch (MmsException e) {
Log.w(TAG, e);
return threadId;
}
}
public static void sendMediaBroadcast(@NonNull Context context, @NonNull List<OutgoingSecureMediaMessage> messages, @NonNull Collection<PreUploadResult> preUploadResults) {
Preconditions.checkArgument(messages.size() > 0, "No messages!");
Preconditions.checkArgument(Stream.of(messages).allMatch(m -> m.getAttachments().isEmpty()), "Messages can't have attachments! They should be pre-uploaded.");
JobManager jobManager = ApplicationDependencies.getJobManager();
AttachmentDatabase attachmentDatabase = DatabaseFactory.getAttachmentDatabase(context);
MmsDatabase mmsDatabase = DatabaseFactory.getMmsDatabase(context);
ThreadDatabase threadDatabase = DatabaseFactory.getThreadDatabase(context);
List<AttachmentId> preUploadAttachmentIds = Stream.of(preUploadResults).map(PreUploadResult::getAttachmentId).toList();
List<String> preUploadJobIds = Stream.of(preUploadResults).map(PreUploadResult::getJobIds).flatMap(Stream::of).toList();
List<Long> messageIds = new ArrayList<>(messages.size());
List<String> messageDependsOnIds = new ArrayList<>(preUploadJobIds);
mmsDatabase.beginTransaction();
try {
OutgoingSecureMediaMessage primaryMessage = messages.get(0);
long primaryThreadId = threadDatabase.getThreadIdFor(primaryMessage.getRecipient(), primaryMessage.getDistributionType());
long primaryMessageId = mmsDatabase.insertMessageOutbox(primaryMessage, primaryThreadId, false, null);
attachmentDatabase.updateMessageId(preUploadAttachmentIds, primaryMessageId);
messageIds.add(primaryMessageId);
if (messages.size() > 0) {
List<OutgoingSecureMediaMessage> secondaryMessages = messages.subList(1, messages.size());
List<List<AttachmentId>> attachmentCopies = new ArrayList<>();
List<DatabaseAttachment> preUploadAttachments = Stream.of(preUploadAttachmentIds)
.map(attachmentDatabase::getAttachment)
.toList();
for (int i = 0; i < preUploadAttachmentIds.size(); i++) {
attachmentCopies.add(new ArrayList<>(messages.size()));
}
for (OutgoingSecureMediaMessage secondaryMessage : secondaryMessages) {
long allocatedThreadId = threadDatabase.getThreadIdFor(secondaryMessage.getRecipient(), secondaryMessage.getDistributionType());
long messageId = mmsDatabase.insertMessageOutbox(secondaryMessage, allocatedThreadId, false, null);
List<AttachmentId> attachmentIds = new ArrayList<>(preUploadAttachmentIds.size());
for (int i = 0; i < preUploadAttachments.size(); i++) {
AttachmentId attachmentId = attachmentDatabase.insertAttachmentForPreUpload(preUploadAttachments.get(i)).getAttachmentId();
attachmentCopies.get(i).add(attachmentId);
attachmentIds.add(attachmentId);
}
attachmentDatabase.updateMessageId(attachmentIds, messageId);
messageIds.add(messageId);
}
mmsDatabase.setTransactionSuccessful();
} finally {
mmsDatabase.endTransaction();
}
List<Job> compressionJobs = new ArrayList<>(databaseAttachments.size());
List<Job> uploadJobs = new ArrayList<>(databaseAttachments.size());
List<Job> copyJobs = new ArrayList<>(databaseAttachments.size());
List<Job> messageJobs = new ArrayList<>(databaseAttachments.get(0).size());
for (List<DatabaseAttachment> attachmentList : databaseAttachments) {
DatabaseAttachment source = attachmentList.get(0);
compressionJobs.add(AttachmentCompressionJob.fromAttachment(source, false, -1));
uploadJobs.add(new AttachmentUploadJob(source.getAttachmentId()));
if (attachmentList.size() > 1) {
AttachmentId sourceId = source.getAttachmentId();
List<AttachmentId> destinationIds = Stream.of(attachmentList.subList(1, attachmentList.size()))
.map(DatabaseAttachment::getAttachmentId)
.toList();
copyJobs.add(new AttachmentCopyJob(sourceId, destinationIds));
for (int i = 0; i < attachmentCopies.size(); i++) {
Job copyJob = new AttachmentCopyJob(preUploadAttachmentIds.get(i), attachmentCopies.get(i));
jobManager.add(copyJob, preUploadJobIds);
messageDependsOnIds.add(copyJob.getId());
}
}
@@ -204,32 +234,47 @@ public class MessageSender {
if (isLocalSelfSend(context, recipient, false)) {
sendLocalMediaSelf(context, messageId);
} else if (isGroupPushSend(recipient)) {
messageJobs.add(new PushGroupSendJob(messageId, recipient.getId(), null));
jobManager.add(new PushGroupSendJob(messageId, recipient.getId(), null), messageDependsOnIds);
} else {
messageJobs.add(new PushMediaSendJob(messageId, recipient));
jobManager.add(new PushMediaSendJob(messageId, recipient), messageDependsOnIds);
}
}
Log.i(TAG, String.format(Locale.ENGLISH, "sendMediaBroadcast() - Uploading %d attachment(s), copying %d of them, then sending %d messages.",
uploadJobs.size(),
copyJobs.size(),
messageJobs.size()));
JobManager.Chain chain = ApplicationDependencies.getJobManager()
.startChain(compressionJobs)
.then(uploadJobs);
if (copyJobs.size() > 0) {
chain = chain.then(copyJobs);
}
chain = chain.then(messageJobs);
chain.enqueue();
mmsDatabase.setTransactionSuccessful();
} catch (MmsException e) {
Log.w(TAG, "sendMediaBroadcast() - Failed to send messages!", e);
Log.w(TAG, "Failed to send messages.", e);
} finally {
mmsDatabase.endTransaction();
}
}
/**
* @return A result if the attachment was enqueued, or null if it failed to enqueue or shouldn't
* be enqueued (like in the case of a local self-send).
*/
public static @Nullable PreUploadResult preUploadPushAttachment(@NonNull Context context, @NonNull Attachment attachment, @Nullable Recipient recipient) {
if (recipient != null && isLocalSelfSend(context, recipient, false)) {
return null;
}
try {
AttachmentDatabase attachmentDatabase = DatabaseFactory.getAttachmentDatabase(context);
DatabaseAttachment databaseAttachment = attachmentDatabase.insertAttachmentForPreUpload(attachment);
Job compressionJob = AttachmentCompressionJob.fromAttachment(databaseAttachment, false, -1);
Job uploadJob = new AttachmentUploadJob(databaseAttachment.getAttachmentId());
ApplicationDependencies.getJobManager()
.startChain(compressionJob)
.then(uploadJob)
.enqueue();
return new PreUploadResult(databaseAttachment.getAttachmentId(), Arrays.asList(compressionJob.getId(), uploadJob.getId()));
} catch (MmsException e) {
Log.w(TAG, "preUploadPushAttachment() - Failed to upload!", e);
return null;
}
}
public static void sendNewReaction(@NonNull Context context, long messageId, boolean isMms, @NonNull String emoji) {
MessagingDatabase db = isMms ? DatabaseFactory.getMmsDatabase(context) : DatabaseFactory.getSmsDatabase(context);
@@ -258,31 +303,30 @@ public class MessageSender {
public static void resendGroupMessage(Context context, MessageRecord messageRecord, RecipientId filterRecipientId) {
if (!messageRecord.isMms()) throw new AssertionError("Not Group");
sendGroupPush(context, messageRecord.getRecipient(), messageRecord.getId(), filterRecipientId);
sendGroupPush(context, messageRecord.getRecipient(), messageRecord.getId(), filterRecipientId, Collections.emptyList());
}
public static void resend(Context context, MessageRecord messageRecord) {
long messageId = messageRecord.getId();
boolean forceSms = messageRecord.isForcedSms();
boolean keyExchange = messageRecord.isKeyExchange();
long expiresIn = messageRecord.getExpiresIn();
Recipient recipient = messageRecord.getRecipient();
if (messageRecord.isMms()) {
sendMediaMessage(context, recipient, forceSms, messageId, expiresIn);
sendMediaMessage(context, recipient, forceSms, messageId, Collections.emptyList());
} else {
sendTextMessage(context, recipient, forceSms, keyExchange, messageId);
}
}
private static void sendMediaMessage(Context context, Recipient recipient, boolean forceSms, long messageId, long expiresIn)
private static void sendMediaMessage(Context context, Recipient recipient, boolean forceSms, long messageId, @NonNull Collection<String> uploadJobIds)
{
if (isLocalSelfSend(context, recipient, forceSms)) {
sendLocalMediaSelf(context, messageId);
} else if (isGroupPushSend(recipient)) {
sendGroupPush(context, recipient, messageId, null);
sendGroupPush(context, recipient, messageId, null, uploadJobIds);
} else if (!forceSms && isPushMediaSend(context, recipient)) {
sendMediaPush(context, recipient, messageId);
sendMediaPush(context, recipient, messageId, uploadJobIds);
} else {
sendMms(context, messageId);
}
@@ -295,25 +339,37 @@ public class MessageSender {
if (isLocalSelfSend(context, recipient, forceSms)) {
sendLocalTextSelf(context, messageId);
} else if (!forceSms && isPushTextSend(context, recipient, keyExchange)) {
sendTextPush(context, recipient, messageId);
sendTextPush(recipient, messageId);
} else {
sendSms(context, recipient, messageId);
}
}
private static void sendTextPush(Context context, Recipient recipient, long messageId) {
private static void sendTextPush(Recipient recipient, long messageId) {
JobManager jobManager = ApplicationDependencies.getJobManager();
jobManager.add(new PushTextSendJob(messageId, recipient));
}
private static void sendMediaPush(Context context, Recipient recipient, long messageId) {
private static void sendMediaPush(Context context, Recipient recipient, long messageId, @NonNull Collection<String> uploadJobIds) {
JobManager jobManager = ApplicationDependencies.getJobManager();
PushMediaSendJob.enqueue(context, jobManager, messageId, recipient);
if (uploadJobIds.size() > 0) {
Job mediaSend = new PushMediaSendJob(messageId, recipient);
jobManager.add(mediaSend, uploadJobIds);
} else {
PushMediaSendJob.enqueue(context, jobManager, messageId, recipient);
}
}
private static void sendGroupPush(Context context, Recipient recipient, long messageId, RecipientId filterRecipientId) {
private static void sendGroupPush(Context context, Recipient recipient, long messageId, RecipientId filterRecipientId, @NonNull Collection<String> uploadJobIds) {
JobManager jobManager = ApplicationDependencies.getJobManager();
PushGroupSendJob.enqueue(context, jobManager, messageId, recipient.getId(), filterRecipientId);
if (uploadJobIds.size() > 0) {
Job groupSend = new PushGroupSendJob(messageId, recipient.getId(), filterRecipientId);
jobManager.add(groupSend, uploadJobIds);
} else {
PushGroupSendJob.enqueue(context, jobManager, messageId, recipient.getId(), filterRecipientId);
}
}
private static void sendSms(Context context, Recipient recipient, long messageId) {
@@ -370,8 +426,9 @@ public class MessageSender {
}
}
private static boolean isLocalSelfSend(@NonNull Context context, @NonNull Recipient recipient, boolean forceSms) {
return recipient.isLocalNumber() &&
public static boolean isLocalSelfSend(@NonNull Context context, @Nullable Recipient recipient, boolean forceSms) {
return recipient != null &&
recipient.isLocalNumber() &&
!forceSms &&
TextSecurePreferences.isPushRegistered(context) &&
!TextSecurePreferences.isMultiDevice(context);
@@ -428,19 +485,49 @@ public class MessageSender {
}
}
private static boolean isValidBroadcastList(@NonNull List<OutgoingSecureMediaMessage> messages) {
if (messages.isEmpty()) {
return false;
public static class PreUploadResult implements Parcelable {
private final AttachmentId attachmentId;
private final Collection<String> jobIds;
PreUploadResult(@NonNull AttachmentId attachmentId, @NonNull Collection<String> jobIds) {
this.attachmentId = attachmentId;
this.jobIds = jobIds;
}
int attachmentSize = messages.get(0).getAttachments().size();
private PreUploadResult(Parcel in) {
this.attachmentId = in.readParcelable(AttachmentId.class.getClassLoader());
this.jobIds = ParcelUtil.readStringCollection(in);
}
for (OutgoingSecureMediaMessage message : messages) {
if (message.getAttachments().size() != attachmentSize) {
return false;
public @NonNull AttachmentId getAttachmentId() {
return attachmentId;
}
public @NonNull Collection<String> getJobIds() {
return jobIds;
}
public static final Creator<PreUploadResult> CREATOR = new Creator<PreUploadResult>() {
@Override
public PreUploadResult createFromParcel(Parcel in) {
return new PreUploadResult(in);
}
@Override
public PreUploadResult[] newArray(int size) {
return new PreUploadResult[size];
}
};
@Override
public int describeContents() {
return 0;
}
return true;
@Override
public void writeToParcel(Parcel dest, int flags) {
dest.writeParcelable(attachmentId, flags);
ParcelUtil.writeStringCollection(dest, jobIds);
}
}
}