Retrieve profiles in parallel.

This commit is contained in:
Greyson Parrelli
2020-06-08 19:04:55 -04:00
parent 2822042eeb
commit 2751fd7efc
24 changed files with 639 additions and 270 deletions

View File

@@ -123,16 +123,17 @@ public class IdentityUtil {
long time = System.currentTimeMillis();
SmsDatabase smsDatabase = DatabaseFactory.getSmsDatabase(context);
GroupDatabase groupDatabase = DatabaseFactory.getGroupDatabase(context);
GroupDatabase.Reader reader = groupDatabase.getGroups();
GroupDatabase.GroupRecord groupRecord;
try (GroupDatabase.Reader reader = groupDatabase.getGroups()) {
GroupDatabase.GroupRecord groupRecord;
while ((groupRecord = reader.getNext()) != null) {
if (groupRecord.getMembers().contains(recipientId) && groupRecord.isActive()) {
IncomingTextMessage incoming = new IncomingTextMessage(recipientId, 1, time, time, null, Optional.of(groupRecord.getId()), 0, false);
IncomingIdentityUpdateMessage groupUpdate = new IncomingIdentityUpdateMessage(incoming);
while ((groupRecord = reader.getNext()) != null) {
if (groupRecord.getMembers().contains(recipientId) && groupRecord.isActive()) {
IncomingTextMessage incoming = new IncomingTextMessage(recipientId, 1, time, time, null, Optional.of(groupRecord.getId()), 0, false);
IncomingIdentityUpdateMessage groupUpdate = new IncomingIdentityUpdateMessage(incoming);
smsDatabase.insertMessageInbox(groupUpdate);
smsDatabase.insertMessageInbox(groupUpdate);
}
}
}

View File

@@ -26,8 +26,16 @@ import org.whispersystems.signalservice.api.profiles.ProfileAndCredential;
import org.whispersystems.signalservice.api.profiles.SignalServiceProfile;
import org.whispersystems.signalservice.api.push.SignalServiceAddress;
import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException;
import org.whispersystems.signalservice.api.push.exceptions.NotFoundException;
import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException;
import org.whispersystems.signalservice.internal.util.concurrent.CascadingFuture;
import org.whispersystems.signalservice.internal.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Aids in the retrieval and decryption of profiles.
@@ -37,31 +45,44 @@ public final class ProfileUtil {
private ProfileUtil() {
}
private static final String TAG = Log.tag(ProfileUtil.class);
@WorkerThread
public static @NonNull ProfileAndCredential retrieveProfile(@NonNull Context context,
@NonNull Recipient recipient,
@NonNull SignalServiceProfile.RequestType requestType)
throws IOException
public static @NonNull ProfileAndCredential retrieveProfileSync(@NonNull Context context,
@NonNull Recipient recipient,
@NonNull SignalServiceProfile.RequestType requestType)
throws IOException
{
try {
return retrieveProfile(context, recipient, requestType).get(10, TimeUnit.SECONDS);
} catch (ExecutionException e) {
if (e.getCause() instanceof PushNetworkException) {
throw (PushNetworkException) e.getCause();
} else {
throw new IOException(e);
}
} catch (InterruptedException | TimeoutException e) {
throw new PushNetworkException(e);
}
}
public static @NonNull ListenableFuture<ProfileAndCredential> retrieveProfile(@NonNull Context context,
@NonNull Recipient recipient,
@NonNull SignalServiceProfile.RequestType requestType)
{
SignalServiceAddress address = RecipientUtil.toSignalServiceAddress(context, recipient);
Optional<UnidentifiedAccess> unidentifiedAccess = getUnidentifiedAccess(context, recipient);
Optional<ProfileKey> profileKey = ProfileKeyUtil.profileKeyOptional(recipient.getProfileKey());
ProfileAndCredential profile;
try {
profile = retrieveProfileInternal(address, profileKey, unidentifiedAccess, requestType);
} catch (NonSuccessfulResponseCodeException e) {
if (unidentifiedAccess.isPresent()) {
profile = retrieveProfileInternal(address, profileKey, Optional.absent(), requestType);
} else {
throw e;
}
if (unidentifiedAccess.isPresent()) {
return new CascadingFuture<>(Arrays.asList(() -> getPipeRetrievalFuture(address, profileKey, unidentifiedAccess, requestType),
() -> getSocketRetrievalFuture(address, profileKey, unidentifiedAccess, requestType),
() -> getPipeRetrievalFuture(address, profileKey, Optional.absent(), requestType),
() -> getSocketRetrievalFuture(address, profileKey, Optional.absent(), requestType)),
e -> !(e instanceof NotFoundException));
} else {
return new CascadingFuture<>(Arrays.asList(() -> getPipeRetrievalFuture(address, profileKey, Optional.absent(), requestType),
() -> getSocketRetrievalFuture(address, profileKey, Optional.absent(), requestType)),
e -> !(e instanceof NotFoundException));
}
return profile;
}
public static @Nullable String decryptName(@NonNull ProfileKey profileKey, @Nullable String encryptedName)
@@ -75,32 +96,30 @@ public final class ProfileUtil {
return new String(profileCipher.decryptName(Base64.decode(encryptedName)));
}
@WorkerThread
private static @NonNull ProfileAndCredential retrieveProfileInternal(@NonNull SignalServiceAddress address,
@NonNull Optional<ProfileKey> profileKey,
@NonNull Optional<UnidentifiedAccess> unidentifiedAccess,
@NonNull SignalServiceProfile.RequestType requestType)
throws IOException
private static @NonNull ListenableFuture<ProfileAndCredential> getPipeRetrievalFuture(@NonNull SignalServiceAddress address,
@NonNull Optional<ProfileKey> profileKey,
@NonNull Optional<UnidentifiedAccess> unidentifiedAccess,
@NonNull SignalServiceProfile.RequestType requestType)
throws IOException
{
SignalServiceMessagePipe authPipe = IncomingMessageObserver.getPipe();
SignalServiceMessagePipe unidentifiedPipe = IncomingMessageObserver.getUnidentifiedPipe();
SignalServiceMessagePipe pipe = unidentifiedPipe != null && unidentifiedAccess.isPresent() ? unidentifiedPipe
: authPipe;
if (pipe != null) {
try {
return pipe.getProfile(address, profileKey, unidentifiedAccess, requestType);
} catch (IOException e) {
Log.w(TAG, "Websocket request failed. Falling back to REST.", e);
}
return pipe.getProfile(address, profileKey, unidentifiedAccess, requestType);
}
throw new IOException("No pipe available!");
}
private static @NonNull ListenableFuture<ProfileAndCredential> getSocketRetrievalFuture(@NonNull SignalServiceAddress address,
@NonNull Optional<ProfileKey> profileKey,
@NonNull Optional<UnidentifiedAccess> unidentifiedAccess,
@NonNull SignalServiceProfile.RequestType requestType)
{
SignalServiceMessageReceiver receiver = ApplicationDependencies.getSignalServiceMessageReceiver();
try {
return receiver.retrieveProfile(address, profileKey, unidentifiedAccess, requestType);
} catch (VerificationFailedException e) {
throw new IOException("Verification Problem", e);
}
return receiver.retrieveProfile(address, profileKey, unidentifiedAccess, requestType);
}
private static Optional<UnidentifiedAccess> getUnidentifiedAccess(@NonNull Context context, @NonNull Recipient recipient) {