mirror of
https://github.com/signalapp/Signal-Android.git
synced 2025-12-23 20:48:43 +00:00
Convert RetrieveProfileJob to kotlin.
This commit is contained in:
committed by
Clark Chen
parent
ca9a629804
commit
f10418face
@@ -236,7 +236,7 @@ public class ApplicationContext extends MultiDexApplication implements AppForegr
|
||||
|
||||
SignalExecutors.BOUNDED.execute(() -> {
|
||||
FeatureFlags.refreshIfNecessary();
|
||||
RetrieveProfileJob.enqueueRoutineFetchIfNecessary(this);
|
||||
RetrieveProfileJob.enqueueRoutineFetchIfNecessary();
|
||||
executePendingContactSync();
|
||||
KeyCachingService.onAppForegrounded(this);
|
||||
ApplicationDependencies.getShakeToReport().enable();
|
||||
|
||||
@@ -3473,19 +3473,12 @@ open class RecipientTable(context: Context, databaseHelper: SignalDatabase) : Da
|
||||
}
|
||||
|
||||
fun markProfilesFetched(ids: Collection<RecipientId>, time: Long) {
|
||||
val db = writableDatabase
|
||||
db.beginTransaction()
|
||||
try {
|
||||
val values = ContentValues(1).apply {
|
||||
put(LAST_PROFILE_FETCH, time)
|
||||
}
|
||||
writableDatabase.withinTransaction { db ->
|
||||
val values = contentValuesOf(LAST_PROFILE_FETCH to time)
|
||||
|
||||
for (id in ids) {
|
||||
db.update(TABLE_NAME, values, ID_WHERE, arrayOf(id.serialize()))
|
||||
SqlUtil.buildCollectionQuery(ID, ids).forEach { query ->
|
||||
db.update(TABLE_NAME, values, query.where, query.whereArgs)
|
||||
}
|
||||
db.setTransactionSuccessful()
|
||||
} finally {
|
||||
db.endTransaction()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,666 +0,0 @@
|
||||
package org.thoughtcrime.securesms.jobs;
|
||||
|
||||
import android.app.Application;
|
||||
import android.content.Context;
|
||||
import android.text.TextUtils;
|
||||
|
||||
import androidx.annotation.NonNull;
|
||||
import androidx.annotation.Nullable;
|
||||
import androidx.annotation.WorkerThread;
|
||||
|
||||
import com.annimon.stream.Collectors;
|
||||
import com.annimon.stream.Stream;
|
||||
import com.annimon.stream.function.Predicate;
|
||||
|
||||
import org.signal.core.util.Base64;
|
||||
import org.signal.core.util.ListUtil;
|
||||
import org.signal.core.util.SetUtil;
|
||||
import org.signal.core.util.Stopwatch;
|
||||
import org.signal.core.util.concurrent.SignalExecutors;
|
||||
import org.signal.core.util.logging.Log;
|
||||
import org.signal.libsignal.protocol.IdentityKey;
|
||||
import org.signal.libsignal.protocol.InvalidKeyException;
|
||||
import org.signal.libsignal.protocol.util.Pair;
|
||||
import org.signal.libsignal.zkgroup.InvalidInputException;
|
||||
import org.signal.libsignal.zkgroup.profiles.ExpiringProfileKeyCredential;
|
||||
import org.signal.libsignal.zkgroup.profiles.ProfileKey;
|
||||
import org.thoughtcrime.securesms.badges.Badges;
|
||||
import org.thoughtcrime.securesms.badges.models.Badge;
|
||||
import org.thoughtcrime.securesms.crypto.ProfileKeyUtil;
|
||||
import org.thoughtcrime.securesms.database.GroupTable;
|
||||
import org.thoughtcrime.securesms.database.RecipientTable;
|
||||
import org.thoughtcrime.securesms.database.RecipientTable.PhoneNumberSharingState;
|
||||
import org.thoughtcrime.securesms.database.RecipientTable.UnidentifiedAccessMode;
|
||||
import org.thoughtcrime.securesms.database.SignalDatabase;
|
||||
import org.thoughtcrime.securesms.database.model.RecipientRecord;
|
||||
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
|
||||
import org.thoughtcrime.securesms.jobmanager.Job;
|
||||
import org.thoughtcrime.securesms.jobmanager.JobManager;
|
||||
import org.thoughtcrime.securesms.jobmanager.JsonJobData;
|
||||
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
|
||||
import org.thoughtcrime.securesms.keyvalue.SignalStore;
|
||||
import org.thoughtcrime.securesms.notifications.v2.ConversationId;
|
||||
import org.thoughtcrime.securesms.profiles.ProfileName;
|
||||
import org.thoughtcrime.securesms.recipients.Recipient;
|
||||
import org.thoughtcrime.securesms.recipients.RecipientId;
|
||||
import org.thoughtcrime.securesms.recipients.RecipientUtil;
|
||||
import org.thoughtcrime.securesms.transport.RetryLaterException;
|
||||
import org.thoughtcrime.securesms.util.IdentityUtil;
|
||||
import org.thoughtcrime.securesms.util.ProfileUtil;
|
||||
import org.thoughtcrime.securesms.util.Util;
|
||||
import org.whispersystems.signalservice.api.crypto.InvalidCiphertextException;
|
||||
import org.whispersystems.signalservice.api.crypto.ProfileCipher;
|
||||
import org.whispersystems.signalservice.api.profiles.ProfileAndCredential;
|
||||
import org.whispersystems.signalservice.api.profiles.SignalServiceProfile;
|
||||
import org.whispersystems.signalservice.api.services.ProfileService;
|
||||
import org.whispersystems.signalservice.api.util.ExpiringProfileCredentialUtil;
|
||||
import org.whispersystems.signalservice.internal.ServiceResponse;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.reactivex.rxjava3.core.Observable;
|
||||
import io.reactivex.rxjava3.schedulers.Schedulers;
|
||||
|
||||
/**
|
||||
* Retrieves a users profile and sets the appropriate local fields.
|
||||
*/
|
||||
public class RetrieveProfileJob extends BaseJob {
|
||||
|
||||
public static final String KEY = "RetrieveProfileJob";
|
||||
|
||||
private static final String TAG = Log.tag(RetrieveProfileJob.class);
|
||||
|
||||
private static final String KEY_RECIPIENTS = "recipients";
|
||||
private static final String DEDUPE_KEY_RETRIEVE_AVATAR = KEY + "_RETRIEVE_PROFILE_AVATAR";
|
||||
|
||||
private final Set<RecipientId> recipientIds;
|
||||
|
||||
/**
|
||||
* Identical to {@link #enqueue(Set)})}, but run on a background thread for convenience.
|
||||
*/
|
||||
public static void enqueueAsync(@NonNull RecipientId recipientId) {
|
||||
SignalExecutors.BOUNDED_IO.execute(() -> ApplicationDependencies.getJobManager().add(forRecipient(recipientId)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Submits the necessary job to refresh the profile of the requested recipient. Works for any
|
||||
* RecipientId, including individuals, groups, or yourself.
|
||||
* <p>
|
||||
* Identical to {@link #enqueue(Set)})}
|
||||
*/
|
||||
@WorkerThread
|
||||
public static void enqueue(@NonNull RecipientId recipientId) {
|
||||
ApplicationDependencies.getJobManager().add(forRecipient(recipientId));
|
||||
}
|
||||
|
||||
/**
|
||||
* Submits the necessary jobs to refresh the profiles of the requested recipients. Works for any
|
||||
* RecipientIds, including individuals, groups, or yourself.
|
||||
*/
|
||||
@WorkerThread
|
||||
public static void enqueue(@NonNull Set<RecipientId> recipientIds) {
|
||||
JobManager jobManager = ApplicationDependencies.getJobManager();
|
||||
|
||||
for (Job job : forRecipients(recipientIds)) {
|
||||
jobManager.add(job);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Works for any RecipientId, whether it's an individual, group, or yourself.
|
||||
*/
|
||||
@WorkerThread
|
||||
public static @NonNull Job forRecipient(@NonNull RecipientId recipientId) {
|
||||
Recipient recipient = Recipient.resolved(recipientId);
|
||||
|
||||
if (recipient.isSelf()) {
|
||||
return new RefreshOwnProfileJob();
|
||||
} else if (recipient.isGroup()) {
|
||||
List<RecipientId> recipients = SignalDatabase.groups().getGroupMemberIds(recipient.requireGroupId(), GroupTable.MemberSet.FULL_MEMBERS_EXCLUDING_SELF);
|
||||
|
||||
return new RetrieveProfileJob(new HashSet<>(recipients));
|
||||
} else {
|
||||
return new RetrieveProfileJob(Collections.singleton(recipientId));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Works for any RecipientId, whether it's an individual, group, or yourself.
|
||||
*
|
||||
* @return A list of length 2 or less. Two iff you are in the recipients.
|
||||
*/
|
||||
@WorkerThread
|
||||
public static @NonNull List<Job> forRecipients(@NonNull Set<RecipientId> recipientIds) {
|
||||
Context context = ApplicationDependencies.getApplication();
|
||||
Set<RecipientId> combined = new HashSet<>(recipientIds.size());
|
||||
boolean includeSelf = false;
|
||||
|
||||
for (RecipientId recipientId : recipientIds) {
|
||||
Recipient recipient = Recipient.resolved(recipientId);
|
||||
|
||||
if (recipient.isSelf()) {
|
||||
includeSelf = true;
|
||||
} else if (recipient.isGroup()) {
|
||||
List<Recipient> recipients = SignalDatabase.groups().getGroupMembers(recipient.requireGroupId(), GroupTable.MemberSet.FULL_MEMBERS_EXCLUDING_SELF);
|
||||
combined.addAll(Stream.of(recipients).map(Recipient::getId).toList());
|
||||
} else {
|
||||
combined.add(recipientId);
|
||||
}
|
||||
}
|
||||
|
||||
List<Job> jobs = new ArrayList<>(2);
|
||||
|
||||
if (includeSelf) {
|
||||
jobs.add(new RefreshOwnProfileJob());
|
||||
}
|
||||
|
||||
if (combined.size() > 0) {
|
||||
jobs.add(new RetrieveProfileJob(combined));
|
||||
}
|
||||
|
||||
return jobs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Will fetch some profiles to ensure we're decently up-to-date if we haven't done so within a
|
||||
* certain time period.
|
||||
*/
|
||||
public static void enqueueRoutineFetchIfNecessary(Application application) {
|
||||
if (!SignalStore.registrationValues().isRegistrationComplete() ||
|
||||
!SignalStore.account().isRegistered() ||
|
||||
SignalStore.account().getAci() == null)
|
||||
{
|
||||
Log.i(TAG, "Registration not complete. Skipping.");
|
||||
return;
|
||||
}
|
||||
|
||||
long timeSinceRefresh = System.currentTimeMillis() - SignalStore.misc().getLastProfileRefreshTime();
|
||||
if (timeSinceRefresh < TimeUnit.HOURS.toMillis(12)) {
|
||||
Log.i(TAG, "Too soon to refresh. Did the last refresh " + timeSinceRefresh + " ms ago.");
|
||||
return;
|
||||
}
|
||||
|
||||
SignalExecutors.BOUNDED.execute(() -> {
|
||||
RecipientTable db = SignalDatabase.recipients();
|
||||
long current = System.currentTimeMillis();
|
||||
|
||||
List<RecipientId> ids = db.getRecipientsForRoutineProfileFetch(current - TimeUnit.DAYS.toMillis(30),
|
||||
current - TimeUnit.DAYS.toMillis(1),
|
||||
50);
|
||||
|
||||
ids.add(Recipient.self().getId());
|
||||
|
||||
if (ids.size() > 0) {
|
||||
Log.i(TAG, "Optimistically refreshing " + ids.size() + " eligible recipient(s).");
|
||||
enqueue(new HashSet<>(ids));
|
||||
} else {
|
||||
Log.i(TAG, "No recipients to refresh.");
|
||||
}
|
||||
|
||||
SignalStore.misc().setLastProfileRefreshTime(System.currentTimeMillis());
|
||||
});
|
||||
}
|
||||
|
||||
public RetrieveProfileJob(@NonNull Set<RecipientId> recipientIds) {
|
||||
this(new Job.Parameters.Builder().addConstraint(NetworkConstraint.KEY)
|
||||
.setMaxAttempts(3)
|
||||
.build(),
|
||||
recipientIds);
|
||||
}
|
||||
|
||||
private RetrieveProfileJob(@NonNull Job.Parameters parameters, @NonNull Set<RecipientId> recipientIds) {
|
||||
super(parameters);
|
||||
this.recipientIds = recipientIds;
|
||||
}
|
||||
|
||||
@Override
|
||||
public @Nullable byte[] serialize() {
|
||||
return new JsonJobData.Builder().putStringListAsArray(KEY_RECIPIENTS, Stream.of(recipientIds)
|
||||
.map(RecipientId::serialize)
|
||||
.toList())
|
||||
.serialize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NonNull String getFactoryKey() {
|
||||
return KEY;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean shouldTrace() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRun() throws IOException, RetryLaterException {
|
||||
if (!SignalStore.account().isRegistered()) {
|
||||
Log.w(TAG, "Unregistered. Skipping.");
|
||||
return;
|
||||
}
|
||||
|
||||
Stopwatch stopwatch = new Stopwatch("RetrieveProfile");
|
||||
RecipientTable recipientTable = SignalDatabase.recipients();
|
||||
|
||||
RecipientUtil.ensureUuidsAreAvailable(context, Stream.of(Recipient.resolvedList(recipientIds))
|
||||
.filter(r -> r.getRegistered() != RecipientTable.RegisteredState.NOT_REGISTERED)
|
||||
.toList());
|
||||
|
||||
List<Recipient> recipients = Recipient.resolvedList(recipientIds);
|
||||
stopwatch.split("resolve-ensure");
|
||||
|
||||
List<Observable<Pair<Recipient, ServiceResponse<ProfileAndCredential>>>> requests = Stream.of(recipients)
|
||||
.filter(Recipient::hasServiceId)
|
||||
.map(r -> ProfileUtil.retrieveProfile(context, r, getRequestType(r)).toObservable())
|
||||
.toList();
|
||||
stopwatch.split("requests");
|
||||
|
||||
OperationState operationState = Observable.mergeDelayError(requests, 16, 1)
|
||||
.observeOn(Schedulers.io(), true)
|
||||
.scan(new OperationState(), (state, pair) -> {
|
||||
Recipient recipient = pair.first();
|
||||
ProfileService.ProfileResponseProcessor processor = new ProfileService.ProfileResponseProcessor(pair.second());
|
||||
if (processor.hasResult()) {
|
||||
state.profiles.add(processor.getResult(recipient));
|
||||
} else if (processor.notFound()) {
|
||||
Log.w(TAG, "Failed to find a profile for " + recipient.getId());
|
||||
if (recipient.isRegistered()) {
|
||||
state.unregistered.add(recipient.getId());
|
||||
}
|
||||
} else if (processor.genericIoError()) {
|
||||
state.retries.add(recipient.getId());
|
||||
} else {
|
||||
Log.w(TAG, "Failed to retrieve profile for " + recipient.getId(), processor.getError());
|
||||
}
|
||||
return state;
|
||||
})
|
||||
.lastOrError()
|
||||
.blockingGet();
|
||||
|
||||
stopwatch.split("responses");
|
||||
|
||||
final Map<RecipientId, RecipientRecord> localRecords = SignalDatabase.recipients().getRecords(recipientIds);
|
||||
|
||||
Log.d(TAG, "Fetched " + localRecords.size() + " existing records.");
|
||||
|
||||
stopwatch.split("disk-fetch");
|
||||
|
||||
Set<RecipientId> success = SetUtil.difference(recipientIds, operationState.retries);
|
||||
|
||||
Set<RecipientId> newlyRegistered = Stream.of(operationState.profiles)
|
||||
.map(Pair::first)
|
||||
.filterNot(Recipient::isRegistered)
|
||||
.map(Recipient::getId)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
List<Pair<Recipient, ProfileAndCredential>> updatedProfiles = Stream.of(operationState.profiles)
|
||||
.filter(recipientProfileAndCredentialPair -> {
|
||||
final Recipient recipientToUpdate = recipientProfileAndCredentialPair.first();
|
||||
final RecipientId recipientToUpdateId = recipientToUpdate.getId();
|
||||
|
||||
final RecipientRecord localRecipientRecord = localRecords.get(recipientToUpdateId);
|
||||
if (localRecipientRecord == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
final SignalServiceProfile remoteProfile = recipientProfileAndCredentialPair.second().getProfile();
|
||||
final Optional<ExpiringProfileKeyCredential> remoteCredential = recipientProfileAndCredentialPair.second().getExpiringProfileKeyCredential();
|
||||
|
||||
try {
|
||||
return isUpdated(localRecipientRecord, remoteProfile, remoteCredential);
|
||||
} catch (InvalidCiphertextException | IOException e) {
|
||||
Log.w(TAG, "Could not compare new and old profiles.", e);
|
||||
return true;
|
||||
}
|
||||
})
|
||||
.toList();
|
||||
|
||||
Log.d(TAG, "Committing updates to " + updatedProfiles.size() + " of " + operationState.profiles.size() + " retrieved profiles.");
|
||||
|
||||
if (!updatedProfiles.isEmpty()) {
|
||||
//noinspection SimplifyStreamApiCallChains
|
||||
ListUtil.chunk(updatedProfiles, 150).stream().forEach(list -> {
|
||||
SignalDatabase.runInTransaction((db) -> {
|
||||
for (Pair<Recipient, ProfileAndCredential> profile : list) {
|
||||
process(profile.first(), profile.second());
|
||||
}
|
||||
return null;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
recipientTable.markProfilesFetched(success, System.currentTimeMillis());
|
||||
|
||||
if (operationState.unregistered.size() > 0 || newlyRegistered.size() > 0) {
|
||||
Log.i(TAG, "Marking " + newlyRegistered.size() + " users as registered and " + operationState.unregistered.size() + " users as unregistered.");
|
||||
recipientTable.bulkUpdatedRegisteredStatus(newlyRegistered, operationState.unregistered);
|
||||
}
|
||||
|
||||
stopwatch.split("process");
|
||||
|
||||
for (Pair<Recipient, ProfileAndCredential> profile : operationState.profiles) {
|
||||
setIdentityKey(profile.first(), profile.second().getProfile().getIdentityKey());
|
||||
}
|
||||
|
||||
stopwatch.split("identityKeys");
|
||||
|
||||
long keyCount = Stream.of(operationState.profiles).map(Pair::first).map(Recipient::getProfileKey).withoutNulls().count();
|
||||
Log.d(TAG, String.format(Locale.US, "Started with %d recipient(s). Found %d profile(s), and had keys for %d of them. Will retry %d.", recipients.size(), operationState.profiles.size(), keyCount, operationState.retries.size()));
|
||||
|
||||
stopwatch.stop(TAG);
|
||||
|
||||
recipientIds.clear();
|
||||
recipientIds.addAll(operationState.retries);
|
||||
|
||||
if (recipientIds.size() > 0) {
|
||||
throw new RetryLaterException();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onShouldRetry(@NonNull Exception e) {
|
||||
return e instanceof RetryLaterException;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure() {}
|
||||
|
||||
private boolean isUpdated(RecipientRecord localRecipientRecord, SignalServiceProfile remoteProfile, Optional<ExpiringProfileKeyCredential> remoteExpiringProfileKeyCredential)
|
||||
throws InvalidCiphertextException, IOException {
|
||||
|
||||
if (!Util.equals(localRecipientRecord.getProfileAvatar(), remoteProfile.getAvatar())) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!Util.equals(localRecipientRecord.getBadges(), Stream.of(remoteProfile.getBadges()).map(Badges::fromServiceBadge).collect(Collectors.toList()))) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!Util.equals(localRecipientRecord.getCapabilities().getRawBits(), RecipientTable.maskCapabilitiesToLong(remoteProfile.getCapabilities()))) {
|
||||
return true;
|
||||
}
|
||||
|
||||
ProfileKey profileKey = ProfileKeyUtil.profileKeyOrNull(localRecipientRecord.getProfileKey());
|
||||
final UnidentifiedAccessMode accessMode = deriveUnidentifiedAccessMode(profileKey,
|
||||
remoteProfile.getUnidentifiedAccess(),
|
||||
remoteProfile.isUnrestrictedUnidentifiedAccess());
|
||||
if (!Util.equals(localRecipientRecord.getUnidentifiedAccessMode(), accessMode)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (profileKey == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final ProfileName newProfileName = ProfileName.fromSerialized(ProfileUtil.decryptString(profileKey, remoteProfile.getName()));
|
||||
if (!Util.equals(localRecipientRecord.getProfileName(), newProfileName)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!Util.equals(localRecipientRecord.getAbout(), ProfileUtil.decryptString(profileKey, remoteProfile.getAbout()))) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (remoteExpiringProfileKeyCredential.isPresent() && !Util.equals(localRecipientRecord.getExpiringProfileKeyCredential(), remoteExpiringProfileKeyCredential.get())) {
|
||||
return true;
|
||||
}
|
||||
|
||||
PhoneNumberSharingState remotePhoneNumberSharing = ProfileUtil.decryptBoolean(profileKey, remoteProfile.getPhoneNumberSharing())
|
||||
.map(value -> value ? PhoneNumberSharingState.ENABLED : PhoneNumberSharingState.DISABLED)
|
||||
.orElse(PhoneNumberSharingState.UNKNOWN);
|
||||
|
||||
if (localRecipientRecord.getPhoneNumberSharing() != remotePhoneNumberSharing) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private void process(Recipient recipient, ProfileAndCredential profileAndCredential) {
|
||||
SignalServiceProfile profile = profileAndCredential.getProfile();
|
||||
ProfileKey recipientProfileKey = ProfileKeyUtil.profileKeyOrNull(recipient.getProfileKey());
|
||||
|
||||
boolean wroteNewProfileName = setProfileName(recipient, profile.getName());
|
||||
|
||||
setProfileAbout(recipient, profile.getAbout(), profile.getAboutEmoji());
|
||||
setProfileAvatar(recipient, profile.getAvatar());
|
||||
setProfileBadges(recipient, profile.getBadges());
|
||||
setProfileCapabilities(recipient, profile.getCapabilities());
|
||||
setUnidentifiedAccessMode(recipient, profile.getUnidentifiedAccess(), profile.isUnrestrictedUnidentifiedAccess());
|
||||
setPhoneNumberSharingMode(recipient, profile.getPhoneNumberSharing());
|
||||
|
||||
if (recipientProfileKey != null) {
|
||||
profileAndCredential.getExpiringProfileKeyCredential()
|
||||
.ifPresent(profileKeyCredential -> setExpiringProfileKeyCredential(recipient, recipientProfileKey, profileKeyCredential));
|
||||
}
|
||||
|
||||
if (recipient.hasNonUsernameDisplayName(context) || wroteNewProfileName) {
|
||||
clearUsername(recipient);
|
||||
}
|
||||
}
|
||||
|
||||
private void setProfileBadges(@NonNull Recipient recipient, @Nullable List<SignalServiceProfile.Badge> serviceBadges) {
|
||||
if (serviceBadges == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<Badge> badges = serviceBadges.stream().map(Badges::fromServiceBadge).collect(java.util.stream.Collectors.toList());
|
||||
|
||||
if (badges.size() != recipient.getBadges().size()) {
|
||||
Log.i(TAG, "Likely change in badges for " + recipient.getId() + ". Going from " + recipient.getBadges().size() + " badge(s) to " + badges.size() + ".");
|
||||
}
|
||||
|
||||
SignalDatabase.recipients().setBadges(recipient.getId(), badges);
|
||||
}
|
||||
|
||||
private void setExpiringProfileKeyCredential(@NonNull Recipient recipient,
|
||||
@NonNull ProfileKey recipientProfileKey,
|
||||
@NonNull ExpiringProfileKeyCredential credential)
|
||||
{
|
||||
RecipientTable recipientTable = SignalDatabase.recipients();
|
||||
recipientTable.setProfileKeyCredential(recipient.getId(), recipientProfileKey, credential);
|
||||
}
|
||||
|
||||
private static SignalServiceProfile.RequestType getRequestType(@NonNull Recipient recipient) {
|
||||
return ExpiringProfileCredentialUtil.isValid(recipient.getExpiringProfileKeyCredential()) ? SignalServiceProfile.RequestType.PROFILE
|
||||
: SignalServiceProfile.RequestType.PROFILE_AND_CREDENTIAL;
|
||||
}
|
||||
|
||||
private void setIdentityKey(Recipient recipient, String identityKeyValue) {
|
||||
try {
|
||||
if (TextUtils.isEmpty(identityKeyValue)) {
|
||||
Log.w(TAG, "Identity key is missing on profile!");
|
||||
return;
|
||||
}
|
||||
|
||||
IdentityKey identityKey = new IdentityKey(Base64.decode(identityKeyValue), 0);
|
||||
|
||||
if (!ApplicationDependencies.getProtocolStore().aci().identities().getIdentityRecord(recipient.getId()).isPresent()) {
|
||||
Log.w(TAG, "Still first use for " + recipient.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
IdentityUtil.saveIdentity(recipient.requireServiceId().toString(), identityKey);
|
||||
} catch (InvalidKeyException | IOException e) {
|
||||
Log.w(TAG, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void setUnidentifiedAccessMode(Recipient recipient, String unidentifiedAccessVerifier, boolean unrestrictedUnidentifiedAccess) {
|
||||
|
||||
final ProfileKey profileKey = ProfileKeyUtil.profileKeyOrNull(recipient.getProfileKey());
|
||||
final UnidentifiedAccessMode newMode = deriveUnidentifiedAccessMode(profileKey, unidentifiedAccessVerifier, unrestrictedUnidentifiedAccess);
|
||||
if (recipient.getUnidentifiedAccessMode() != newMode) {
|
||||
if (newMode == UnidentifiedAccessMode.UNRESTRICTED) {
|
||||
Log.i(TAG, "Marking recipient UD status as unrestricted.");
|
||||
} else if (profileKey == null || unidentifiedAccessVerifier == null) {
|
||||
Log.i(TAG, "Marking recipient UD status as disabled.");
|
||||
} else {
|
||||
Log.i(TAG, "Marking recipient UD status as " + newMode.name() + " after verification.");
|
||||
}
|
||||
SignalDatabase.recipients().setUnidentifiedAccessMode(recipient.getId(), newMode);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private UnidentifiedAccessMode deriveUnidentifiedAccessMode(ProfileKey profileKey, String unidentifiedAccessVerifier, boolean unrestrictedUnidentifiedAccess) {
|
||||
|
||||
if (unrestrictedUnidentifiedAccess && unidentifiedAccessVerifier != null) {
|
||||
return UnidentifiedAccessMode.UNRESTRICTED;
|
||||
} else if (profileKey == null || unidentifiedAccessVerifier == null) {
|
||||
return UnidentifiedAccessMode.DISABLED;
|
||||
} else {
|
||||
ProfileCipher profileCipher = new ProfileCipher(profileKey);
|
||||
boolean verifiedUnidentifiedAccess;
|
||||
|
||||
try {
|
||||
verifiedUnidentifiedAccess = profileCipher.verifyUnidentifiedAccess(Base64.decode(unidentifiedAccessVerifier));
|
||||
} catch (IOException e) {
|
||||
Log.w(TAG, e);
|
||||
verifiedUnidentifiedAccess = false;
|
||||
}
|
||||
|
||||
return verifiedUnidentifiedAccess ? UnidentifiedAccessMode.ENABLED : UnidentifiedAccessMode.DISABLED;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean setProfileName(Recipient recipient, String profileName) {
|
||||
try {
|
||||
ProfileKey profileKey = ProfileKeyUtil.profileKeyOrNull(recipient.getProfileKey());
|
||||
if (profileKey == null) return false;
|
||||
|
||||
String plaintextProfileName = Util.emptyIfNull(ProfileUtil.decryptString(profileKey, profileName));
|
||||
|
||||
if (TextUtils.isEmpty(plaintextProfileName)) {
|
||||
Log.w(TAG, "No name set on the profile for " + recipient.getId() + " -- Leaving it alone");
|
||||
return false;
|
||||
}
|
||||
|
||||
ProfileName remoteProfileName = ProfileName.fromSerialized(plaintextProfileName);
|
||||
ProfileName localProfileName = recipient.getProfileName();
|
||||
|
||||
if (!remoteProfileName.equals(localProfileName)) {
|
||||
Log.i(TAG, "Profile name updated. Writing new value.");
|
||||
SignalDatabase.recipients().setProfileName(recipient.getId(), remoteProfileName);
|
||||
|
||||
String remoteDisplayName = remoteProfileName.toString();
|
||||
String localDisplayName = localProfileName.toString();
|
||||
|
||||
boolean writeChangeEvent = !recipient.isBlocked() &&
|
||||
!recipient.isGroup() &&
|
||||
!recipient.isSelf() &&
|
||||
!localDisplayName.isEmpty() &&
|
||||
!remoteDisplayName.equals(localDisplayName);
|
||||
if (writeChangeEvent) {
|
||||
Log.i(TAG, "Writing a profile name change event for " + recipient.getId());
|
||||
SignalDatabase.messages().insertProfileNameChangeMessages(recipient, remoteDisplayName, localDisplayName);
|
||||
} else {
|
||||
Log.i(TAG, String.format(Locale.US, "Name changed, but wasn't relevant to write an event. blocked: %s, group: %s, self: %s, firstSet: %s, displayChange: %s",
|
||||
recipient.isBlocked(), recipient.isGroup(), recipient.isSelf(), localDisplayName.isEmpty(), !remoteDisplayName.equals(localDisplayName)));
|
||||
}
|
||||
|
||||
if (writeChangeEvent || localDisplayName.isEmpty()) {
|
||||
Long threadId = SignalDatabase.threads().getThreadIdFor(recipient.getId());
|
||||
if (threadId != null) {
|
||||
ApplicationDependencies.getMessageNotifier().updateNotification(context, ConversationId.forConversation(threadId));
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
} catch (InvalidCiphertextException e) {
|
||||
Log.w(TAG, "Bad profile key for " + recipient.getId());
|
||||
} catch (IOException e) {
|
||||
Log.w(TAG, e);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private void setProfileAbout(@NonNull Recipient recipient, @Nullable String encryptedAbout, @Nullable String encryptedEmoji) {
|
||||
try {
|
||||
ProfileKey profileKey = ProfileKeyUtil.profileKeyOrNull(recipient.getProfileKey());
|
||||
if (profileKey == null) return;
|
||||
|
||||
String plaintextAbout = ProfileUtil.decryptString(profileKey, encryptedAbout);
|
||||
String plaintextEmoji = ProfileUtil.decryptString(profileKey, encryptedEmoji);
|
||||
|
||||
SignalDatabase.recipients().setAbout(recipient.getId(), plaintextAbout, plaintextEmoji);
|
||||
} catch (InvalidCiphertextException | IOException e) {
|
||||
Log.w(TAG, e);
|
||||
}
|
||||
}
|
||||
|
||||
private static void setProfileAvatar(Recipient recipient, String profileAvatar) {
|
||||
if (recipient.getProfileKey() == null) return;
|
||||
if (!Util.equals(profileAvatar, recipient.getProfileAvatar())) {
|
||||
SignalDatabase.runPostSuccessfulTransaction(DEDUPE_KEY_RETRIEVE_AVATAR + recipient.getId(), () -> {
|
||||
SignalExecutors.BOUNDED.execute(() -> {
|
||||
ApplicationDependencies.getJobManager().add(new RetrieveProfileAvatarJob(recipient, profileAvatar));
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void clearUsername(Recipient recipient) {
|
||||
SignalDatabase.recipients().setUsername(recipient.getId(), null);
|
||||
}
|
||||
|
||||
private void setProfileCapabilities(@NonNull Recipient recipient, @Nullable SignalServiceProfile.Capabilities capabilities) {
|
||||
if (capabilities == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
SignalDatabase.recipients().setCapabilities(recipient.getId(), capabilities);
|
||||
}
|
||||
|
||||
private void setPhoneNumberSharingMode(Recipient recipient, String phoneNumberSharing) {
|
||||
ProfileKey profileKey = ProfileKeyUtil.profileKeyOrNull(recipient.getProfileKey());
|
||||
if (profileKey == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
PhoneNumberSharingState remotePhoneNumberSharing = ProfileUtil.decryptBoolean(profileKey, phoneNumberSharing)
|
||||
.map(value -> value ? PhoneNumberSharingState.ENABLED : PhoneNumberSharingState.DISABLED)
|
||||
.orElse(PhoneNumberSharingState.UNKNOWN);
|
||||
|
||||
if (recipient.getPhoneNumberSharing() != remotePhoneNumberSharing) {
|
||||
Log.i(TAG, "Updating phone number sharing state for " + recipient.getId() + " to " + remotePhoneNumberSharing);
|
||||
SignalDatabase.recipients().setPhoneNumberSharing(recipient.getId(), remotePhoneNumberSharing);
|
||||
}
|
||||
} catch (InvalidCiphertextException | IOException e) {
|
||||
Log.w(TAG, "Failed to set the phone number sharing setting!", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Collective state as responses are processed as they come in.
|
||||
*/
|
||||
private static class OperationState {
|
||||
final Set<RecipientId> retries = new HashSet<>();
|
||||
final Set<RecipientId> unregistered = new HashSet<>();
|
||||
final List<Pair<Recipient, ProfileAndCredential>> profiles = new ArrayList<>();
|
||||
}
|
||||
|
||||
public static final class Factory implements Job.Factory<RetrieveProfileJob> {
|
||||
|
||||
@Override
|
||||
public @NonNull RetrieveProfileJob create(@NonNull Parameters parameters, @Nullable byte[] serializedData) {
|
||||
JsonJobData data = JsonJobData.deserialize(serializedData);
|
||||
|
||||
String[] ids = data.getStringArray(KEY_RECIPIENTS);
|
||||
Set<RecipientId> recipientIds = Stream.of(ids).map(RecipientId::from).collect(Collectors.toSet());
|
||||
|
||||
return new RetrieveProfileJob(parameters, recipientIds);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,586 @@
|
||||
package org.thoughtcrime.securesms.jobs
|
||||
|
||||
import androidx.annotation.WorkerThread
|
||||
import io.reactivex.rxjava3.core.Observable
|
||||
import io.reactivex.rxjava3.schedulers.Schedulers
|
||||
import org.signal.core.util.Base64.decode
|
||||
import org.signal.core.util.Stopwatch
|
||||
import org.signal.core.util.concurrent.SignalExecutors
|
||||
import org.signal.core.util.concurrent.safeBlockingGet
|
||||
import org.signal.core.util.logging.Log
|
||||
import org.signal.libsignal.protocol.IdentityKey
|
||||
import org.signal.libsignal.protocol.InvalidKeyException
|
||||
import org.signal.libsignal.protocol.util.Pair
|
||||
import org.signal.libsignal.zkgroup.profiles.ExpiringProfileKeyCredential
|
||||
import org.signal.libsignal.zkgroup.profiles.ProfileKey
|
||||
import org.thoughtcrime.securesms.badges.Badges
|
||||
import org.thoughtcrime.securesms.crypto.ProfileKeyUtil
|
||||
import org.thoughtcrime.securesms.database.GroupTable
|
||||
import org.thoughtcrime.securesms.database.RecipientTable
|
||||
import org.thoughtcrime.securesms.database.RecipientTable.Companion.maskCapabilitiesToLong
|
||||
import org.thoughtcrime.securesms.database.RecipientTable.PhoneNumberSharingState
|
||||
import org.thoughtcrime.securesms.database.RecipientTable.UnidentifiedAccessMode
|
||||
import org.thoughtcrime.securesms.database.SignalDatabase
|
||||
import org.thoughtcrime.securesms.database.model.RecipientRecord
|
||||
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
|
||||
import org.thoughtcrime.securesms.jobmanager.Job
|
||||
import org.thoughtcrime.securesms.jobmanager.JsonJobData
|
||||
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
|
||||
import org.thoughtcrime.securesms.keyvalue.SignalStore
|
||||
import org.thoughtcrime.securesms.notifications.v2.ConversationId.Companion.forConversation
|
||||
import org.thoughtcrime.securesms.profiles.ProfileName
|
||||
import org.thoughtcrime.securesms.recipients.Recipient
|
||||
import org.thoughtcrime.securesms.recipients.RecipientId
|
||||
import org.thoughtcrime.securesms.recipients.RecipientUtil
|
||||
import org.thoughtcrime.securesms.transport.RetryLaterException
|
||||
import org.thoughtcrime.securesms.util.IdentityUtil
|
||||
import org.thoughtcrime.securesms.util.ProfileUtil
|
||||
import org.thoughtcrime.securesms.util.Util
|
||||
import org.whispersystems.signalservice.api.crypto.InvalidCiphertextException
|
||||
import org.whispersystems.signalservice.api.crypto.ProfileCipher
|
||||
import org.whispersystems.signalservice.api.profiles.ProfileAndCredential
|
||||
import org.whispersystems.signalservice.api.profiles.SignalServiceProfile
|
||||
import org.whispersystems.signalservice.api.services.ProfileService.ProfileResponseProcessor
|
||||
import org.whispersystems.signalservice.api.util.ExpiringProfileCredentialUtil
|
||||
import org.whispersystems.signalservice.internal.ServiceResponse
|
||||
import java.io.IOException
|
||||
import java.util.Optional
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
/**
|
||||
* Retrieves a users profile and sets the appropriate local fields.
|
||||
*/
|
||||
class RetrieveProfileJob private constructor(parameters: Parameters, private val recipientIds: MutableSet<RecipientId>) : BaseJob(parameters) {
|
||||
constructor(recipientIds: Set<RecipientId>) : this(
|
||||
Parameters.Builder()
|
||||
.addConstraint(NetworkConstraint.KEY)
|
||||
.setMaxAttempts(3)
|
||||
.build(),
|
||||
recipientIds.toMutableSet()
|
||||
)
|
||||
|
||||
override fun serialize(): ByteArray? {
|
||||
return JsonJobData.Builder()
|
||||
.putStringListAsArray(KEY_RECIPIENTS, recipientIds.map { it.serialize() })
|
||||
.serialize()
|
||||
}
|
||||
|
||||
override fun getFactoryKey(): String = KEY
|
||||
|
||||
override fun shouldTrace(): Boolean = true
|
||||
|
||||
@Throws(IOException::class, RetryLaterException::class)
|
||||
public override fun onRun() {
|
||||
if (!SignalStore.account().isRegistered) {
|
||||
Log.w(TAG, "Unregistered. Skipping.")
|
||||
return
|
||||
}
|
||||
|
||||
val stopwatch = Stopwatch("RetrieveProfile")
|
||||
|
||||
RecipientUtil.ensureUuidsAreAvailable(
|
||||
context,
|
||||
Recipient.resolvedList(recipientIds).filter { it.registered != RecipientTable.RegisteredState.NOT_REGISTERED }
|
||||
)
|
||||
|
||||
val recipients = Recipient.resolvedList(recipientIds)
|
||||
stopwatch.split("resolve-ensure")
|
||||
|
||||
val requests: List<Observable<Pair<Recipient, ServiceResponse<ProfileAndCredential>>>> = recipients
|
||||
.filter { it.hasServiceId() }
|
||||
.map { ProfileUtil.retrieveProfile(context, it, getRequestType(it)).toObservable() }
|
||||
stopwatch.split("requests")
|
||||
|
||||
val operationState = Observable.mergeDelayError(requests, 16, 1)
|
||||
.observeOn(Schedulers.io(), true)
|
||||
.scan(OperationState()) { state: OperationState, pair: Pair<Recipient, ServiceResponse<ProfileAndCredential>> ->
|
||||
val recipient = pair.first()
|
||||
val processor = ProfileResponseProcessor(pair.second())
|
||||
|
||||
if (processor.hasResult()) {
|
||||
state.profiles.add(processor.getResult(recipient))
|
||||
} else if (processor.notFound()) {
|
||||
Log.w(TAG, "Failed to find a profile for ${recipient.id}")
|
||||
if (recipient.isRegistered) {
|
||||
state.unregistered.add(recipient.id)
|
||||
}
|
||||
} else if (processor.genericIoError()) {
|
||||
state.retries.add(recipient.id)
|
||||
} else {
|
||||
Log.w(TAG, "Failed to retrieve profile for ${recipient.id}", processor.error)
|
||||
}
|
||||
state
|
||||
}
|
||||
.lastOrError()
|
||||
.safeBlockingGet()
|
||||
stopwatch.split("responses")
|
||||
|
||||
val localRecords = SignalDatabase.recipients.getRecords(recipientIds)
|
||||
Log.d(TAG, "Fetched ${localRecords.size} existing records.")
|
||||
stopwatch.split("disk-fetch")
|
||||
|
||||
val successIds: Set<RecipientId> = recipientIds - operationState.retries
|
||||
val newlyRegisteredIds: Set<RecipientId> = operationState.profiles
|
||||
.map { it.first() }
|
||||
.filterNot { it.isRegistered }
|
||||
.map { it.id }
|
||||
.toSet()
|
||||
|
||||
val updatedProfiles = operationState.profiles
|
||||
.filter { recipientProfileAndCredentialPair: Pair<Recipient, ProfileAndCredential> ->
|
||||
val recipientToUpdate = recipientProfileAndCredentialPair.first()
|
||||
val localRecipientRecord = localRecords[recipientToUpdate.id] ?: return@filter true
|
||||
val remoteProfile = recipientProfileAndCredentialPair.second().profile
|
||||
val remoteCredential = recipientProfileAndCredentialPair.second().expiringProfileKeyCredential
|
||||
|
||||
return@filter try {
|
||||
isUpdated(localRecipientRecord, remoteProfile, remoteCredential)
|
||||
} catch (e: InvalidCiphertextException) {
|
||||
Log.w(TAG, "Could not compare new and old profiles.", e)
|
||||
true
|
||||
} catch (e: IOException) {
|
||||
Log.w(TAG, "Could not compare new and old profiles.", e)
|
||||
true
|
||||
}
|
||||
}
|
||||
.toList()
|
||||
stopwatch.split("filter")
|
||||
|
||||
Log.d(TAG, "Committing updates to " + updatedProfiles.size + " of " + operationState.profiles.size + " retrieved profiles.")
|
||||
updatedProfiles.chunked(150).forEach { list: List<Pair<Recipient, ProfileAndCredential>> ->
|
||||
SignalDatabase.runInTransaction {
|
||||
for (profile in list) {
|
||||
process(profile.first(), profile.second())
|
||||
}
|
||||
}
|
||||
}
|
||||
stopwatch.split("process")
|
||||
|
||||
SignalDatabase.recipients.markProfilesFetched(successIds, System.currentTimeMillis())
|
||||
stopwatch.split("mark-fetched")
|
||||
|
||||
if (operationState.unregistered.isNotEmpty() || newlyRegisteredIds.isNotEmpty()) {
|
||||
Log.i(TAG, "Marking " + newlyRegisteredIds.size + " users as registered and " + operationState.unregistered.size + " users as unregistered.")
|
||||
SignalDatabase.recipients.bulkUpdatedRegisteredStatus(newlyRegisteredIds, operationState.unregistered)
|
||||
}
|
||||
stopwatch.split("registered-update")
|
||||
|
||||
for (profile in operationState.profiles) {
|
||||
setIdentityKey(profile.first(), profile.second().profile.identityKey)
|
||||
}
|
||||
stopwatch.split("identityKeys")
|
||||
|
||||
val keyCount = operationState.profiles.map { it.first() }.mapNotNull { it.profileKey }.count()
|
||||
|
||||
Log.d(TAG, "Started with ${recipients.size} recipient(s). Found ${operationState.profiles.size} profile(s), and had keys for $keyCount of them. Will retry ${operationState.retries.size}.")
|
||||
stopwatch.stop(TAG)
|
||||
|
||||
recipientIds.clear()
|
||||
recipientIds.addAll(operationState.retries)
|
||||
|
||||
if (recipientIds.isNotEmpty()) {
|
||||
throw RetryLaterException()
|
||||
}
|
||||
}
|
||||
|
||||
public override fun onShouldRetry(e: Exception): Boolean {
|
||||
return e is RetryLaterException
|
||||
}
|
||||
|
||||
override fun onFailure() {}
|
||||
|
||||
@Throws(InvalidCiphertextException::class, IOException::class)
|
||||
private fun isUpdated(localRecipientRecord: RecipientRecord, remoteProfile: SignalServiceProfile, remoteExpiringProfileKeyCredential: Optional<ExpiringProfileKeyCredential>): Boolean {
|
||||
if (localRecipientRecord.signalProfileAvatar != remoteProfile.avatar) {
|
||||
return true
|
||||
}
|
||||
|
||||
if (localRecipientRecord.badges != remoteProfile.badges.map { Badges.fromServiceBadge(it) }) {
|
||||
return true
|
||||
}
|
||||
|
||||
if (localRecipientRecord.capabilities.rawBits != maskCapabilitiesToLong(remoteProfile.capabilities)) {
|
||||
return true
|
||||
}
|
||||
|
||||
val profileKey = ProfileKeyUtil.profileKeyOrNull(localRecipientRecord.profileKey)
|
||||
val accessMode = deriveUnidentifiedAccessMode(
|
||||
profileKey = profileKey,
|
||||
unidentifiedAccessVerifier = remoteProfile.unidentifiedAccess,
|
||||
unrestrictedUnidentifiedAccess = remoteProfile.isUnrestrictedUnidentifiedAccess
|
||||
)
|
||||
|
||||
if (localRecipientRecord.unidentifiedAccessMode != accessMode) {
|
||||
return true
|
||||
}
|
||||
|
||||
if (profileKey == null) {
|
||||
return false
|
||||
}
|
||||
|
||||
val newProfileName = ProfileName.fromSerialized(ProfileUtil.decryptString(profileKey, remoteProfile.name))
|
||||
if (localRecipientRecord.signalProfileName != newProfileName) {
|
||||
return true
|
||||
}
|
||||
|
||||
if (localRecipientRecord.about != ProfileUtil.decryptString(profileKey, remoteProfile.about)) {
|
||||
return true
|
||||
}
|
||||
|
||||
if (remoteExpiringProfileKeyCredential.isPresent && localRecipientRecord.expiringProfileKeyCredential != remoteExpiringProfileKeyCredential.get()) {
|
||||
return true
|
||||
}
|
||||
|
||||
val remotePhoneNumberSharing = ProfileUtil.decryptBoolean(profileKey, remoteProfile.phoneNumberSharing)
|
||||
.map { value: Boolean -> if (value) PhoneNumberSharingState.ENABLED else PhoneNumberSharingState.DISABLED }
|
||||
.orElse(PhoneNumberSharingState.UNKNOWN)
|
||||
|
||||
if (localRecipientRecord.phoneNumberSharing != remotePhoneNumberSharing) {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
private fun process(recipient: Recipient, profileAndCredential: ProfileAndCredential) {
|
||||
val profile = profileAndCredential.profile
|
||||
val recipientProfileKey = ProfileKeyUtil.profileKeyOrNull(recipient.profileKey)
|
||||
val wroteNewProfileName = setProfileName(recipient, profile.name)
|
||||
|
||||
setProfileAbout(recipient, profile.about, profile.aboutEmoji)
|
||||
setProfileAvatar(recipient, profile.avatar)
|
||||
setProfileBadges(recipient, profile.badges)
|
||||
setProfileCapabilities(recipient, profile.capabilities)
|
||||
setUnidentifiedAccessMode(recipient, profile.unidentifiedAccess, profile.isUnrestrictedUnidentifiedAccess)
|
||||
setPhoneNumberSharingMode(recipient, profile.phoneNumberSharing)
|
||||
|
||||
if (recipientProfileKey != null) {
|
||||
profileAndCredential.expiringProfileKeyCredential
|
||||
.ifPresent { profileKeyCredential: ExpiringProfileKeyCredential -> setExpiringProfileKeyCredential(recipient, recipientProfileKey, profileKeyCredential) }
|
||||
}
|
||||
|
||||
if (recipient.hasNonUsernameDisplayName(context) || wroteNewProfileName) {
|
||||
clearUsername(recipient)
|
||||
}
|
||||
}
|
||||
|
||||
private fun setProfileBadges(recipient: Recipient, serviceBadges: List<SignalServiceProfile.Badge>?) {
|
||||
if (serviceBadges == null) {
|
||||
return
|
||||
}
|
||||
|
||||
val badges = serviceBadges.map { Badges.fromServiceBadge(it) }
|
||||
if (badges.size != recipient.badges.size) {
|
||||
Log.i(TAG, "Likely change in badges for ${recipient.id}. Going from ${recipient.badges.size} badge(s) to ${badges.size}.")
|
||||
}
|
||||
|
||||
SignalDatabase.recipients.setBadges(recipient.id, badges)
|
||||
}
|
||||
|
||||
private fun setExpiringProfileKeyCredential(
|
||||
recipient: Recipient,
|
||||
recipientProfileKey: ProfileKey,
|
||||
credential: ExpiringProfileKeyCredential
|
||||
) {
|
||||
SignalDatabase.recipients.setProfileKeyCredential(recipient.id, recipientProfileKey, credential)
|
||||
}
|
||||
|
||||
private fun setIdentityKey(recipient: Recipient, identityKeyValue: String?) {
|
||||
try {
|
||||
if (identityKeyValue.isNullOrBlank()) {
|
||||
Log.w(TAG, "Identity key is missing on profile!")
|
||||
return
|
||||
}
|
||||
|
||||
val identityKey = IdentityKey(decode(identityKeyValue), 0)
|
||||
if (!ApplicationDependencies.getProtocolStore().aci().identities().getIdentityRecord(recipient.id).isPresent) {
|
||||
Log.w(TAG, "Still first use for ${recipient.id}")
|
||||
return
|
||||
}
|
||||
|
||||
IdentityUtil.saveIdentity(recipient.requireServiceId().toString(), identityKey)
|
||||
} catch (e: InvalidKeyException) {
|
||||
Log.w(TAG, e)
|
||||
} catch (e: IOException) {
|
||||
Log.w(TAG, e)
|
||||
}
|
||||
}
|
||||
|
||||
private fun setUnidentifiedAccessMode(recipient: Recipient, unidentifiedAccessVerifier: String?, unrestrictedUnidentifiedAccess: Boolean) {
|
||||
val profileKey = ProfileKeyUtil.profileKeyOrNull(recipient.profileKey)
|
||||
val newMode = deriveUnidentifiedAccessMode(profileKey, unidentifiedAccessVerifier, unrestrictedUnidentifiedAccess)
|
||||
|
||||
if (recipient.unidentifiedAccessMode !== newMode) {
|
||||
if (newMode === UnidentifiedAccessMode.UNRESTRICTED) {
|
||||
Log.i(TAG, "Marking recipient UD status as unrestricted.")
|
||||
} else if (profileKey == null || unidentifiedAccessVerifier == null) {
|
||||
Log.i(TAG, "Marking recipient UD status as disabled.")
|
||||
} else {
|
||||
Log.i(TAG, "Marking recipient UD status as " + newMode.name + " after verification.")
|
||||
}
|
||||
|
||||
SignalDatabase.recipients.setUnidentifiedAccessMode(recipient.id, newMode)
|
||||
}
|
||||
}
|
||||
|
||||
private fun deriveUnidentifiedAccessMode(profileKey: ProfileKey?, unidentifiedAccessVerifier: String?, unrestrictedUnidentifiedAccess: Boolean): UnidentifiedAccessMode {
|
||||
return if (unrestrictedUnidentifiedAccess && unidentifiedAccessVerifier != null) {
|
||||
UnidentifiedAccessMode.UNRESTRICTED
|
||||
} else if (profileKey == null || unidentifiedAccessVerifier == null) {
|
||||
UnidentifiedAccessMode.DISABLED
|
||||
} else {
|
||||
val profileCipher = ProfileCipher(profileKey)
|
||||
val verifiedUnidentifiedAccess: Boolean = try {
|
||||
profileCipher.verifyUnidentifiedAccess(decode(unidentifiedAccessVerifier))
|
||||
} catch (e: IOException) {
|
||||
Log.w(TAG, e)
|
||||
false
|
||||
}
|
||||
|
||||
if (verifiedUnidentifiedAccess) {
|
||||
UnidentifiedAccessMode.ENABLED
|
||||
} else {
|
||||
UnidentifiedAccessMode.DISABLED
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun setProfileName(recipient: Recipient, profileName: String?): Boolean {
|
||||
try {
|
||||
val profileKey = ProfileKeyUtil.profileKeyOrNull(recipient.profileKey) ?: return false
|
||||
val plaintextProfileName = Util.emptyIfNull(ProfileUtil.decryptString(profileKey, profileName))
|
||||
|
||||
if (plaintextProfileName.isNullOrBlank()) {
|
||||
Log.w(TAG, "No name set on the profile for ${recipient.id} -- Leaving it alone")
|
||||
return false
|
||||
}
|
||||
|
||||
val remoteProfileName = ProfileName.fromSerialized(plaintextProfileName)
|
||||
val localProfileName = recipient.profileName
|
||||
|
||||
if (remoteProfileName != localProfileName) {
|
||||
Log.i(TAG, "Profile name updated. Writing new value.")
|
||||
SignalDatabase.recipients.setProfileName(recipient.id, remoteProfileName)
|
||||
|
||||
val remoteDisplayName = remoteProfileName.toString()
|
||||
val localDisplayName = localProfileName.toString()
|
||||
val writeChangeEvent = !recipient.isBlocked &&
|
||||
!recipient.isGroup &&
|
||||
!recipient.isSelf &&
|
||||
localDisplayName.isNotEmpty() &&
|
||||
remoteDisplayName != localDisplayName
|
||||
|
||||
if (writeChangeEvent) {
|
||||
Log.i(TAG, "Writing a profile name change event for ${recipient.id}")
|
||||
SignalDatabase.messages.insertProfileNameChangeMessages(recipient, remoteDisplayName, localDisplayName)
|
||||
} else {
|
||||
Log.i(TAG, "Name changed, but wasn't relevant to write an event. blocked: ${recipient.isBlocked}, group: ${recipient.isGroup}, self: ${recipient.isSelf}, firstSet: ${localDisplayName.isEmpty()}, displayChange: ${remoteDisplayName != localDisplayName}")
|
||||
}
|
||||
|
||||
if (writeChangeEvent || localDisplayName.isEmpty()) {
|
||||
val threadId = SignalDatabase.threads.getThreadIdFor(recipient.id)
|
||||
if (threadId != null) {
|
||||
ApplicationDependencies.getMessageNotifier().updateNotification(context, forConversation(threadId))
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
} catch (e: InvalidCiphertextException) {
|
||||
Log.w(TAG, "Bad profile key for ${recipient.id}")
|
||||
} catch (e: IOException) {
|
||||
Log.w(TAG, e)
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
private fun setProfileAbout(recipient: Recipient, encryptedAbout: String?, encryptedEmoji: String?) {
|
||||
try {
|
||||
val profileKey = ProfileKeyUtil.profileKeyOrNull(recipient.profileKey) ?: return
|
||||
val plaintextAbout = ProfileUtil.decryptString(profileKey, encryptedAbout)
|
||||
val plaintextEmoji = ProfileUtil.decryptString(profileKey, encryptedEmoji)
|
||||
|
||||
SignalDatabase.recipients.setAbout(recipient.id, plaintextAbout, plaintextEmoji)
|
||||
} catch (e: InvalidCiphertextException) {
|
||||
Log.w(TAG, e)
|
||||
} catch (e: IOException) {
|
||||
Log.w(TAG, e)
|
||||
}
|
||||
}
|
||||
|
||||
private fun clearUsername(recipient: Recipient) {
|
||||
SignalDatabase.recipients.setUsername(recipient.id, null)
|
||||
}
|
||||
|
||||
private fun setProfileCapabilities(recipient: Recipient, capabilities: SignalServiceProfile.Capabilities?) {
|
||||
if (capabilities == null) {
|
||||
return
|
||||
}
|
||||
|
||||
SignalDatabase.recipients.setCapabilities(recipient.id, capabilities)
|
||||
}
|
||||
|
||||
private fun setPhoneNumberSharingMode(recipient: Recipient, phoneNumberSharing: String?) {
|
||||
val profileKey = ProfileKeyUtil.profileKeyOrNull(recipient.profileKey) ?: return
|
||||
|
||||
try {
|
||||
val remotePhoneNumberSharing = ProfileUtil.decryptBoolean(profileKey, phoneNumberSharing)
|
||||
.map { value: Boolean -> if (value) PhoneNumberSharingState.ENABLED else PhoneNumberSharingState.DISABLED }
|
||||
.orElse(PhoneNumberSharingState.UNKNOWN)
|
||||
|
||||
if (recipient.phoneNumberSharing !== remotePhoneNumberSharing) {
|
||||
Log.i(TAG, "Updating phone number sharing state for " + recipient.id + " to " + remotePhoneNumberSharing)
|
||||
SignalDatabase.recipients.setPhoneNumberSharing(recipient.id, remotePhoneNumberSharing)
|
||||
}
|
||||
} catch (e: InvalidCiphertextException) {
|
||||
Log.w(TAG, "Failed to set the phone number sharing setting!", e)
|
||||
} catch (e: IOException) {
|
||||
Log.w(TAG, "Failed to set the phone number sharing setting!", e)
|
||||
}
|
||||
}
|
||||
|
||||
private fun setProfileAvatar(recipient: Recipient, profileAvatar: String?) {
|
||||
if (recipient.profileKey == null) {
|
||||
return
|
||||
}
|
||||
|
||||
if (profileAvatar != recipient.profileAvatar) {
|
||||
SignalDatabase.runPostSuccessfulTransaction(DEDUPE_KEY_RETRIEVE_AVATAR + recipient.id) {
|
||||
SignalExecutors.BOUNDED.execute {
|
||||
ApplicationDependencies.getJobManager().add(RetrieveProfileAvatarJob(recipient, profileAvatar))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun getRequestType(recipient: Recipient): SignalServiceProfile.RequestType {
|
||||
return if (ExpiringProfileCredentialUtil.isValid(recipient.expiringProfileKeyCredential)) SignalServiceProfile.RequestType.PROFILE else SignalServiceProfile.RequestType.PROFILE_AND_CREDENTIAL
|
||||
}
|
||||
|
||||
/**
|
||||
* Collective state as responses are processed as they come in.
|
||||
*/
|
||||
private class OperationState {
|
||||
val retries: MutableSet<RecipientId> = HashSet()
|
||||
val unregistered: MutableSet<RecipientId> = HashSet()
|
||||
val profiles: MutableList<Pair<Recipient, ProfileAndCredential>> = ArrayList()
|
||||
}
|
||||
|
||||
class Factory : Job.Factory<RetrieveProfileJob?> {
|
||||
override fun create(parameters: Parameters, serializedData: ByteArray?): RetrieveProfileJob {
|
||||
val data = JsonJobData.deserialize(serializedData)
|
||||
val recipientIds: MutableSet<RecipientId> = data.getStringArray(KEY_RECIPIENTS).map { RecipientId.from(it) }.toMutableSet()
|
||||
|
||||
return RetrieveProfileJob(parameters, recipientIds)
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
const val KEY = "RetrieveProfileJob"
|
||||
private val TAG = Log.tag(RetrieveProfileJob::class.java)
|
||||
private const val KEY_RECIPIENTS = "recipients"
|
||||
private const val DEDUPE_KEY_RETRIEVE_AVATAR = KEY + "_RETRIEVE_PROFILE_AVATAR"
|
||||
|
||||
/**
|
||||
* Submits the necessary job to refresh the profile of the requested recipient. Works for any
|
||||
* RecipientId, including individuals, groups, or yourself.
|
||||
*
|
||||
*
|
||||
* Identical to [.enqueue])}
|
||||
*/
|
||||
@JvmStatic
|
||||
@WorkerThread
|
||||
fun enqueue(recipientId: RecipientId) {
|
||||
ApplicationDependencies.getJobManager().add(forRecipient(recipientId))
|
||||
}
|
||||
|
||||
/**
|
||||
* Submits the necessary jobs to refresh the profiles of the requested recipients. Works for any
|
||||
* RecipientIds, including individuals, groups, or yourself.
|
||||
*/
|
||||
@JvmStatic
|
||||
@WorkerThread
|
||||
fun enqueue(recipientIds: Set<RecipientId>) {
|
||||
val jobManager = ApplicationDependencies.getJobManager()
|
||||
for (job in forRecipients(recipientIds)) {
|
||||
jobManager.add(job)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Works for any RecipientId, whether it's an individual, group, or yourself.
|
||||
*/
|
||||
@JvmStatic
|
||||
@WorkerThread
|
||||
fun forRecipient(recipientId: RecipientId): Job {
|
||||
return forRecipients(setOf(recipientId)).first()
|
||||
}
|
||||
|
||||
/**
|
||||
* Works for any RecipientId, whether it's an individual, group, or yourself.
|
||||
*
|
||||
* @return A list of length 2 or less. Two iff you are in the recipients.
|
||||
*/
|
||||
@JvmStatic
|
||||
@WorkerThread
|
||||
fun forRecipients(recipientIds: Set<RecipientId>): List<Job> {
|
||||
val combined: MutableSet<RecipientId> = HashSet(recipientIds.size)
|
||||
var includeSelf = false
|
||||
|
||||
for (recipientId in recipientIds) {
|
||||
val recipient = Recipient.resolved(recipientId)
|
||||
when {
|
||||
recipient.isSelf -> includeSelf = true
|
||||
recipient.isGroup -> combined += SignalDatabase.groups.getGroupMemberIds(recipient.requireGroupId(), GroupTable.MemberSet.FULL_MEMBERS_EXCLUDING_SELF)
|
||||
else -> combined.add(recipientId)
|
||||
}
|
||||
}
|
||||
|
||||
return ArrayList<Job>(2).apply {
|
||||
if (includeSelf) {
|
||||
add(RefreshOwnProfileJob())
|
||||
}
|
||||
if (combined.size > 0) {
|
||||
add(RetrieveProfileJob(combined))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Will fetch some profiles to ensure we're decently up-to-date if we haven't done so within a
|
||||
* certain time period.
|
||||
*/
|
||||
@JvmStatic
|
||||
fun enqueueRoutineFetchIfNecessary() {
|
||||
if (!SignalStore.registrationValues().isRegistrationComplete || !SignalStore.account().isRegistered || SignalStore.account().aci == null) {
|
||||
Log.i(TAG, "Registration not complete. Skipping.")
|
||||
return
|
||||
}
|
||||
|
||||
val timeSinceRefresh = System.currentTimeMillis() - SignalStore.misc().lastProfileRefreshTime
|
||||
if (timeSinceRefresh < TimeUnit.HOURS.toMillis(12)) {
|
||||
Log.i(TAG, "Too soon to refresh. Did the last refresh $timeSinceRefresh ms ago.")
|
||||
return
|
||||
}
|
||||
|
||||
SignalExecutors.BOUNDED.execute {
|
||||
val current = System.currentTimeMillis()
|
||||
val ids: List<RecipientId> = SignalDatabase.recipients.getRecipientsForRoutineProfileFetch(
|
||||
lastInteractionThreshold = current - TimeUnit.DAYS.toMillis(30),
|
||||
lastProfileFetchThreshold = current - TimeUnit.DAYS.toMillis(1),
|
||||
limit = 50
|
||||
) + Recipient.self().id
|
||||
|
||||
if (ids.isNotEmpty()) {
|
||||
Log.i(TAG, "Optimistically refreshing ${ids.size} eligible recipient(s).")
|
||||
enqueue(ids.toSet())
|
||||
} else {
|
||||
Log.i(TAG, "No recipients to refresh.")
|
||||
}
|
||||
|
||||
SignalStore.misc().lastProfileRefreshTime = System.currentTimeMillis()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user