Add storage service optimization to avoid manifest reads.

This commit is contained in:
Greyson Parrelli
2025-04-14 12:01:37 -04:00
committed by Cody Henthorne
parent fe70637140
commit f68bb2dc88
15 changed files with 57 additions and 32 deletions

View File

@@ -160,7 +160,7 @@ fun ToolScreen(
modifier = Modifier.fillMaxWidth()
) {
ActionRow("Enqueue StorageSyncJob", "Just a normal syncing operation.") {
AppDependencies.jobManager.add(StorageSyncJob())
AppDependencies.jobManager.add(StorageSyncJob.forLocalChange())
}
ActionRow("Enqueue StorageForcePushJob", "Forces your local state over the remote.") {

View File

@@ -931,7 +931,6 @@ public class ConversationListFragment extends MainFragment implements ActionMode
private void handleMarkAsUnread(@NonNull Collection<Long> ids) {
SimpleTask.run(getViewLifecycleOwner().getLifecycle(), () -> {
SignalDatabase.threads().setForcedUnread(ids);
StorageSyncHelper.scheduleSyncForDataChange();
return null;
}, none -> endActionModeIfActive());
}

View File

@@ -293,7 +293,7 @@ public class RefreshOwnProfileJob extends BaseJob {
private void syncWithStorageServiceThenUploadProfile() {
AppDependencies.getJobManager()
.startChain(new StorageSyncJob())
.startChain(StorageSyncJob.forRemoteChange())
.then(new ProfileUploadJob())
.enqueue();
}

View File

@@ -98,7 +98,7 @@ class StorageRotateManifestJob private constructor(parameters: Parameters) : Job
}
StorageServiceRepository.WriteStorageRecordsResult.ConflictError -> {
Log.w(TAG, "Hit a conflict! Enqueuing a sync followed by another rotation.")
AppDependencies.jobManager.add(StorageSyncJob())
AppDependencies.jobManager.add(StorageSyncJob.forRemoteChange())
AppDependencies.jobManager.add(StorageRotateManifestJob())
Result.failure()
}

View File

@@ -14,6 +14,7 @@ import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.jobs.protos.StorageSyncJobData
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.net.SignalNetwork
import org.thoughtcrime.securesms.recipients.Recipient
@@ -127,25 +128,38 @@ import java.util.stream.Collectors
* to enqueue a [StorageServiceMigrationJob] as an app migration to make sure it gets
* synced.
*/
class StorageSyncJob private constructor(parameters: Parameters) : BaseJob(parameters) {
class StorageSyncJob private constructor(parameters: Parameters, private var localManifestOutOfDate: Boolean) : BaseJob(parameters) {
companion object {
const val KEY: String = "StorageSyncJobV2"
const val QUEUE_KEY: String = "StorageSyncingJobs"
private val TAG = Log.tag(StorageSyncJob::class.java)
@JvmStatic
fun forLocalChange(): StorageSyncJob {
return StorageSyncJob(localManifestOutOfDate = false)
}
@JvmStatic
fun forRemoteChange(): StorageSyncJob {
return StorageSyncJob(localManifestOutOfDate = true)
}
}
constructor() : this(
constructor(localManifestOutOfDate: Boolean) : this(
Parameters.Builder().addConstraint(NetworkConstraint.KEY)
.setQueue(QUEUE_KEY)
.setMaxInstancesForFactory(2)
.setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(3)
.build()
.build(),
localManifestOutOfDate
)
override fun serialize(): ByteArray? = null
override fun serialize(): ByteArray {
return StorageSyncJobData(localManifestOutOfDate = localManifestOutOfDate).encode()
}
override fun getFactoryKey(): String = KEY
@@ -222,12 +236,18 @@ class StorageSyncJob private constructor(parameters: Parameters) : BaseJob(param
val repository = StorageServiceRepository(SignalNetwork.storageService)
val localManifest = SignalStore.storageService.manifest
val remoteManifest = when (val result = repository.getStorageManifestIfDifferentVersion(storageServiceKey, localManifest.version)) {
is ManifestIfDifferentVersionResult.DifferentVersion -> result.manifest
ManifestIfDifferentVersionResult.SameVersion -> localManifest
is ManifestIfDifferentVersionResult.DecryptionError -> throw result.exception
is ManifestIfDifferentVersionResult.NetworkError -> throw result.exception
is ManifestIfDifferentVersionResult.StatusCodeError -> throw result.exception
val remoteManifest = if (localManifestOutOfDate || localManifest.version < 1 || runAttempt >= 3) {
Log.i(TAG, "Local manifest is invalid. Fetching remote manifest. (localManifestOutOfDate: $localManifestOutOfDate, localManifest.version: ${localManifest.version}, runAttempt: $runAttempt)")
when (val result = repository.getStorageManifestIfDifferentVersion(storageServiceKey, localManifest.version)) {
is ManifestIfDifferentVersionResult.DifferentVersion -> result.manifest
ManifestIfDifferentVersionResult.SameVersion -> localManifest
is ManifestIfDifferentVersionResult.DecryptionError -> throw result.exception
is ManifestIfDifferentVersionResult.NetworkError -> throw result.exception
is ManifestIfDifferentVersionResult.StatusCodeError -> throw result.exception
}
} else {
Log.i(TAG, "Local manifest is potentially valid. Using it in place of fetching the remote manifest.")
localManifest
}
stopwatch.split("remote-manifest")
@@ -373,6 +393,7 @@ class StorageSyncJob private constructor(parameters: Parameters) : BaseJob(param
is StorageServiceRepository.WriteStorageRecordsResult.NetworkError -> throw result.exception
StorageServiceRepository.WriteStorageRecordsResult.ConflictError -> {
Log.w(TAG, "Hit a conflict when trying to resolve the conflict! Retrying.")
localManifestOutOfDate = true
throw RetryLaterException()
}
}
@@ -410,7 +431,7 @@ class StorageSyncJob private constructor(parameters: Parameters) : BaseJob(param
}
Log.i(TAG, "Enqueueing a storage sync job to handle any possible merges after applying unknown records.")
AppDependencies.jobManager.add(StorageSyncJob())
AppDependencies.jobManager.add(StorageSyncJob.forLocalChange())
}
stopwatch.split("known-unknowns")
@@ -582,7 +603,8 @@ class StorageSyncJob private constructor(parameters: Parameters) : BaseJob(param
class Factory : Job.Factory<StorageSyncJob?> {
override fun create(parameters: Parameters, serializedData: ByteArray?): StorageSyncJob {
return StorageSyncJob(parameters)
val data = serializedData?.let { StorageSyncJobData.ADAPTER.decode(it) } ?: StorageSyncJobData()
return StorageSyncJob(parameters, data.localManifestOutOfDate)
}
}
}

View File

@@ -57,6 +57,7 @@ import org.thoughtcrime.securesms.jobs.PushProcessMessageJob
import org.thoughtcrime.securesms.jobs.RefreshAttributesJob
import org.thoughtcrime.securesms.jobs.RetrieveProfileJob
import org.thoughtcrime.securesms.jobs.SendDeliveryReceiptJob
import org.thoughtcrime.securesms.jobs.StorageSyncJob
import org.thoughtcrime.securesms.jobs.TrimThreadJob
import org.thoughtcrime.securesms.jobs.protos.GroupCallPeekJobData
import org.thoughtcrime.securesms.keyvalue.SignalStore
@@ -88,7 +89,6 @@ import org.thoughtcrime.securesms.recipients.Recipient.HiddenState
import org.thoughtcrime.securesms.recipients.RecipientId
import org.thoughtcrime.securesms.recipients.RecipientUtil
import org.thoughtcrime.securesms.stickers.StickerLocator
import org.thoughtcrime.securesms.storage.StorageSyncHelper
import org.thoughtcrime.securesms.util.EarlyMessageCacheEntry
import org.thoughtcrime.securesms.util.LinkUtil
import org.thoughtcrime.securesms.util.MediaUtil
@@ -244,7 +244,7 @@ object DataMessageProcessor {
if (senderRecipient.isSelf) {
if (ProfileKeyUtil.getSelfProfileKey() != messageProfileKey) {
warn(timestamp, "Saw a sync message whose profile key doesn't match our records. Scheduling a storage sync to check.")
StorageSyncHelper.scheduleSyncForDataChange()
AppDependencies.jobManager.add(StorageSyncJob.forRemoteChange())
}
} else if (messageProfileKey != null) {
if (messageProfileKeyBytes.contentEquals(senderRecipient.profileKey)) {

View File

@@ -60,6 +60,7 @@ import org.thoughtcrime.securesms.jobs.PushProcessEarlyMessagesJob
import org.thoughtcrime.securesms.jobs.RefreshCallLinkDetailsJob
import org.thoughtcrime.securesms.jobs.RefreshOwnProfileJob
import org.thoughtcrime.securesms.jobs.StickerPackDownloadJob
import org.thoughtcrime.securesms.jobs.StorageSyncJob
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.linkpreview.LinkPreview
import org.thoughtcrime.securesms.messages.MessageContentProcessor.Companion.log
@@ -92,7 +93,6 @@ import org.thoughtcrime.securesms.recipients.RecipientId
import org.thoughtcrime.securesms.service.webrtc.links.CallLinkCredentials
import org.thoughtcrime.securesms.service.webrtc.links.CallLinkRoomId
import org.thoughtcrime.securesms.service.webrtc.links.SignalCallLinkState
import org.thoughtcrime.securesms.storage.StorageSyncHelper
import org.thoughtcrime.securesms.stories.Stories
import org.thoughtcrime.securesms.util.EarlyMessageCacheEntry
import org.thoughtcrime.securesms.util.IdentityUtil
@@ -1116,7 +1116,7 @@ object SyncMessageProcessor {
log(envelopeTimestamp, "Received fetch request with type: $fetchType")
when (fetchType) {
FetchLatest.Type.LOCAL_PROFILE -> AppDependencies.jobManager.add(RefreshOwnProfileJob())
FetchLatest.Type.STORAGE_MANIFEST -> StorageSyncHelper.scheduleSyncForDataChange()
FetchLatest.Type.STORAGE_MANIFEST -> AppDependencies.jobManager.add(StorageSyncJob.forRemoteChange())
FetchLatest.Type.SUBSCRIPTION_STATUS -> warn(envelopeTimestamp, "Dropping subscription status fetch message.")
else -> warn(envelopeTimestamp, "Received a fetch message for an unknown type.")
}
@@ -1382,7 +1382,7 @@ object SyncMessageProcessor {
)
)
StorageSyncHelper.scheduleSyncForDataChange()
AppDependencies.jobManager.add(StorageSyncJob.forRemoteChange())
}
AppDependencies.jobManager.add(RefreshCallLinkDetailsJob(callLinkUpdate))

View File

@@ -47,7 +47,7 @@ public class AccountRecordMigrationJob extends MigrationJob {
}
SignalDatabase.recipients().markNeedsSync(Recipient.self().getId());
AppDependencies.getJobManager().add(new StorageSyncJob());
AppDependencies.getJobManager().add(StorageSyncJob.forLocalChange());
}
@Override

View File

@@ -45,12 +45,12 @@ internal class StorageFixLocalUnknownMigrationJob(
if (SignalStore.account.hasLinkedDevices) {
Log.i(TAG, "Multi-device.")
jobManager.startChain(StorageSyncJob())
jobManager.startChain(StorageSyncJob.forLocalChange())
.then(MultiDeviceKeysUpdateJob())
.enqueue()
} else {
Log.i(TAG, "Single-device.")
jobManager.add(StorageSyncJob())
jobManager.add(StorageSyncJob.forRemoteChange())
}
}

View File

@@ -54,12 +54,12 @@ public class StorageServiceMigrationJob extends MigrationJob {
if (SignalStore.account().hasLinkedDevices()) {
Log.i(TAG, "Multi-device.");
jobManager.startChain(new StorageSyncJob())
jobManager.startChain(StorageSyncJob.forLocalChange())
.then(new MultiDeviceKeysUpdateJob())
.enqueue();
} else {
Log.i(TAG, "Single-device.");
jobManager.add(new StorageSyncJob());
jobManager.add(StorageSyncJob.forLocalChange());
}
}

View File

@@ -874,7 +874,7 @@ class RegistrationViewModel : ViewModel() {
stopwatch.split("account-restore")
AppDependencies.jobManager
.startChain(StorageSyncJob())
.startChain(StorageSyncJob.forRemoteChange())
.then(ReclaimUsernameAndLinkJob())
.enqueueAndBlockUntilCompletion(TimeUnit.SECONDS.toMillis(10))
stopwatch.split("storage-sync")

View File

@@ -46,7 +46,7 @@ public final class RegistrationUtil {
}
AppDependencies.getJobManager().startChain(new RefreshAttributesJob())
.then(new StorageSyncJob())
.then(StorageSyncJob.forRemoteChange())
.then(new DirectoryRefreshJob(false))
.enqueue();

View File

@@ -41,7 +41,7 @@ object StorageServiceRestore {
AppDependencies
.jobManager
.startChain(StorageSyncJob())
.startChain(StorageSyncJob.forRemoteChange())
.then(ReclaimUsernameAndLinkJob())
.enqueueBlocking(10.seconds)
stopwatch.split("storage-sync-restore")

View File

@@ -260,16 +260,16 @@ object StorageSyncHelper {
Log.d(TAG, "Registration still ongoing. Ignore sync request.")
return
}
AppDependencies.jobManager.add(StorageSyncJob())
AppDependencies.jobManager.add(StorageSyncJob.forLocalChange())
}
@JvmStatic
fun scheduleRoutineSync() {
val timeSinceLastSync = System.currentTimeMillis() - SignalStore.storageService.lastSyncTime
if (timeSinceLastSync > REFRESH_INTERVAL) {
if (timeSinceLastSync > REFRESH_INTERVAL && SignalStore.registration.isRegistrationComplete) {
Log.d(TAG, "Scheduling a sync. Last sync was $timeSinceLastSync ms ago.")
scheduleSyncForDataChange()
AppDependencies.jobManager.add(StorageSyncJob.forRemoteChange())
} else {
Log.d(TAG, "No need for sync. Last sync was $timeSinceLastSync ms ago.")
}

View File

@@ -213,4 +213,8 @@ message InAppPaymentSourceData {
message InAppPaymentSetupJobData {
uint64 inAppPaymentId = 1;
InAppPaymentSourceData inAppPaymentSource = 2;
}
message StorageSyncJobData {
bool localManifestOutOfDate = 1;
}