Workaround for sdk issue iterating large DynamoDB pages

This commit is contained in:
Ravi Khadiwala
2025-09-09 17:47:19 -05:00
committed by ravi-signal
parent c544628dfe
commit 7e3540bda0
6 changed files with 10 additions and 13 deletions

View File

@@ -630,11 +630,10 @@ public class BackupManager {
* List all backups stored in the backups table
*
* @param segments Number of segments to read in parallel from the underlying backup database
* @param scheduler Scheduler for running downstream operations
* @return Flux of {@link StoredBackupAttributes} for each backup record in the backups table
*/
public Flux<StoredBackupAttributes> listBackupAttributes(final int segments, final Scheduler scheduler) {
return this.backupsDb.listBackupAttributes(segments, scheduler);
public Flux<StoredBackupAttributes> listBackupAttributes(final int segments) {
return this.backupsDb.listBackupAttributes(segments);
}
/**

View File

@@ -497,14 +497,12 @@ public class BackupsDb {
}
}
Flux<StoredBackupAttributes> listBackupAttributes(final int segments, final Scheduler scheduler) {
Flux<StoredBackupAttributes> listBackupAttributes(final int segments) {
if (segments < 1) {
throw new IllegalArgumentException("Total number of segments must be positive");
}
return Flux.range(0, segments)
.parallel()
.runOn(scheduler)
.flatMap(segment -> dynamoClient.scanPaginator(ScanRequest.builder()
.tableName(backupTableName)
.consistentRead(true)
@@ -519,9 +517,9 @@ public class BackupsDb {
"#backupDir", ATTR_BACKUP_DIR,
"#mediaDir", ATTR_MEDIA_DIR))
.projectionExpression("#backupIdHash, #refresh, #mediaRefresh, #bytesUsed, #numObjects, #backupDir, #mediaDir")
.build())
.items())
.sequential()
.build()))
// Don't use the SDK's item publisher, works around https://github.com/aws/aws-sdk-java-v2/issues/6411
.concatMap(page -> Flux.fromIterable(page.items()))
.filter(item -> item.containsKey(KEY_BACKUP_ID_HASH))
.map(item -> new StoredBackupAttributes(
AttributeValues.getByteArray(item, KEY_BACKUP_ID_HASH, null),

View File

@@ -70,7 +70,7 @@ public class BackupMetricsCommand extends AbstractCommandWithDependencies {
final BackupManager backupManager = commandDependencies.backupManager();
final Long backupsExpired = backupManager
.listBackupAttributes(segments, Schedulers.parallel())
.listBackupAttributes(segments)
.doOnNext(backupMetadata -> {
timeSinceLastRefresh.record(timeSince(backupMetadata.lastRefresh()).getSeconds());
timeSinceLastMediaRefresh.record(timeSince(backupMetadata.lastMediaRefresh()).getSeconds());

View File

@@ -74,7 +74,7 @@ public class BackupUsageRecalculationCommand extends AbstractCommandWithDependen
final BackupManager backupManager = commandDependencies.backupManager();
final Long backupsConsidered = backupManager
.listBackupAttributes(segments, Schedulers.parallel())
.listBackupAttributes(segments)
.flatMap(attrs -> Mono.fromCompletionStage(() -> backupManager.recalculateQuota(attrs)).doOnNext(maybeRecalculationResult -> maybeRecalculationResult.ifPresent(recalculationResult -> {
if (!recalculationResult.newUsage().equals(recalculationResult.oldUsage())) {
logger.info("Recalculated usage. oldUsage={}, newUsage={}, lastRefresh={}, lastMediaRefresh={}",