diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupManager.java index 9c7624531..2e0b80eeb 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupManager.java @@ -630,11 +630,10 @@ public class BackupManager { * List all backups stored in the backups table * * @param segments Number of segments to read in parallel from the underlying backup database - * @param scheduler Scheduler for running downstream operations * @return Flux of {@link StoredBackupAttributes} for each backup record in the backups table */ - public Flux listBackupAttributes(final int segments, final Scheduler scheduler) { - return this.backupsDb.listBackupAttributes(segments, scheduler); + public Flux listBackupAttributes(final int segments) { + return this.backupsDb.listBackupAttributes(segments); } /** diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupsDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupsDb.java index 47ad082c4..3974d8e1f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupsDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupsDb.java @@ -497,14 +497,12 @@ public class BackupsDb { } } - Flux listBackupAttributes(final int segments, final Scheduler scheduler) { + Flux listBackupAttributes(final int segments) { if (segments < 1) { throw new IllegalArgumentException("Total number of segments must be positive"); } return Flux.range(0, segments) - .parallel() - .runOn(scheduler) .flatMap(segment -> dynamoClient.scanPaginator(ScanRequest.builder() .tableName(backupTableName) .consistentRead(true) @@ -519,9 +517,9 @@ public class BackupsDb { "#backupDir", ATTR_BACKUP_DIR, "#mediaDir", ATTR_MEDIA_DIR)) .projectionExpression("#backupIdHash, #refresh, #mediaRefresh, #bytesUsed, #numObjects, #backupDir, #mediaDir") - .build()) - .items()) - .sequential() + .build())) + // Don't use the SDK's item publisher, works around https://github.com/aws/aws-sdk-java-v2/issues/6411 + .concatMap(page -> Flux.fromIterable(page.items())) .filter(item -> item.containsKey(KEY_BACKUP_ID_HASH)) .map(item -> new StoredBackupAttributes( AttributeValues.getByteArray(item, KEY_BACKUP_ID_HASH, null), diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/BackupMetricsCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/BackupMetricsCommand.java index a2d2d912e..b9e597263 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/BackupMetricsCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/BackupMetricsCommand.java @@ -70,7 +70,7 @@ public class BackupMetricsCommand extends AbstractCommandWithDependencies { final BackupManager backupManager = commandDependencies.backupManager(); final Long backupsExpired = backupManager - .listBackupAttributes(segments, Schedulers.parallel()) + .listBackupAttributes(segments) .doOnNext(backupMetadata -> { timeSinceLastRefresh.record(timeSince(backupMetadata.lastRefresh()).getSeconds()); timeSinceLastMediaRefresh.record(timeSince(backupMetadata.lastMediaRefresh()).getSeconds()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/BackupUsageRecalculationCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/BackupUsageRecalculationCommand.java index 5f121797b..eb2bc0ede 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/BackupUsageRecalculationCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/BackupUsageRecalculationCommand.java @@ -74,7 +74,7 @@ public class BackupUsageRecalculationCommand extends AbstractCommandWithDependen final BackupManager backupManager = commandDependencies.backupManager(); final Long backupsConsidered = backupManager - .listBackupAttributes(segments, Schedulers.parallel()) + .listBackupAttributes(segments) .flatMap(attrs -> Mono.fromCompletionStage(() -> backupManager.recalculateQuota(attrs)).doOnNext(maybeRecalculationResult -> maybeRecalculationResult.ifPresent(recalculationResult -> { if (!recalculationResult.newUsage().equals(recalculationResult.oldUsage())) { logger.info("Recalculated usage. oldUsage={}, newUsage={}, lastRefresh={}, lastMediaRefresh={}", diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupManagerTest.java index 68245bddc..e4fa07a6b 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupManagerTest.java @@ -682,7 +682,7 @@ public class BackupManagerTest { backupsDb.setMediaUsage(backupUser, oldUsage).join(); when(remoteStorageManager.calculateBytesUsed(eq(backupMediaPrefix))) .thenReturn(CompletableFuture.completedFuture(newUsage)); - final StoredBackupAttributes attrs = backupManager.listBackupAttributes(1, Schedulers.immediate()) + final StoredBackupAttributes attrs = backupManager.listBackupAttributes(1) .single() .blockOptional().orElseThrow(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupsDbTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupsDbTest.java index df14d4eca..d14ba96d1 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupsDbTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupsDbTest.java @@ -236,7 +236,7 @@ public class BackupsDbTest { backupsDb.trackMedia(users.get(1), 10, 100).join(); backupsDb.trackMedia(users.get(2), 1, 1000).join(); - final List sbms = backupsDb.listBackupAttributes(1, Schedulers.immediate()) + final List sbms = backupsDb.listBackupAttributes(1) .sort(Comparator.comparing(StoredBackupAttributes::lastRefresh)) .collectList() .block();