Track backup metrics on refreshes

This commit is contained in:
Ravi Khadiwala
2025-05-27 18:10:24 -05:00
committed by ravi-signal
parent 030d8e8dd4
commit 4dc3b19d2a
10 changed files with 198 additions and 131 deletions

View File

@@ -7,10 +7,14 @@ package org.whispersystems.textsecuregcm.auth;
import org.signal.libsignal.zkgroup.backups.BackupCredentialType;
import org.signal.libsignal.zkgroup.backups.BackupLevel;
import org.whispersystems.textsecuregcm.util.ua.UserAgent;
import javax.annotation.Nullable;
public record AuthenticatedBackupUser(byte[] backupId,
BackupCredentialType credentialType,
BackupLevel backupLevel,
String backupDir,
String mediaDir) {
public record AuthenticatedBackupUser(
byte[] backupId,
BackupCredentialType credentialType,
BackupLevel backupLevel,
String backupDir,
String mediaDir,
@Nullable UserAgent userAgent) {
}

View File

@@ -10,6 +10,8 @@ import io.dropwizard.util.DataSize;
import io.grpc.Status;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import java.security.SecureRandom;
import java.time.Clock;
@@ -36,9 +38,13 @@ import org.whispersystems.textsecuregcm.attachments.TusAttachmentGenerator;
import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.util.AsyncTimerUtil;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException;
import org.whispersystems.textsecuregcm.util.ua.UserAgent;
import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
@@ -316,7 +322,9 @@ public class BackupManager {
.thenApply(ignored -> usage))
.whenComplete((newUsage, throwable) -> {
boolean usageChanged = throwable == null && !newUsage.equals(info.usageInfo());
Metrics.counter(USAGE_RECALCULATION_COUNTER_NAME, "usageChanged", String.valueOf(usageChanged))
Metrics.counter(USAGE_RECALCULATION_COUNTER_NAME, Tags.of(
UserAgentTagUtil.getPlatformTag(backupUser.userAgent()),
Tag.of("usageChanged", String.valueOf(usageChanged))))
.increment();
})
.thenApply(newUsage -> MAX_TOTAL_BACKUP_MEDIA_BYTES - newUsage.bytesUsed());
@@ -520,7 +528,8 @@ public class BackupManager {
*/
public CompletableFuture<AuthenticatedBackupUser> authenticateBackupUser(
final BackupAuthCredentialPresentation presentation,
final byte[] signature) {
final byte[] signature,
final String userAgentString) {
final PresentationSignatureVerifier signatureVerifier = verifyPresentation(presentation);
return backupsDb
.retrieveAuthenticationData(presentation.getBackupId())
@@ -538,12 +547,20 @@ public class BackupManager {
final Pair<BackupCredentialType, BackupLevel> credentialTypeAndBackupLevel =
signatureVerifier.verifySignature(signature, authenticationData.publicKey());
UserAgent userAgent;
try {
userAgent = UserAgentUtil.parseUserAgentString(userAgentString);
} catch (UnrecognizedUserAgentException e) {
userAgent = null;
}
return new AuthenticatedBackupUser(
presentation.getBackupId(),
credentialTypeAndBackupLevel.first(),
credentialTypeAndBackupLevel.second(),
authenticationData.backupDir(),
authenticationData.mediaDir());
authenticationData.mediaDir(),
userAgent);
})
.thenApply(result -> {
Metrics.counter(ZK_AUTHN_COUNTER_NAME, SUCCESS_TAG_NAME, String.valueOf(true)).increment();
@@ -673,8 +690,9 @@ public class BackupManager {
@VisibleForTesting
static void checkBackupLevel(final AuthenticatedBackupUser backupUser, final BackupLevel backupLevel) {
if (backupUser.backupLevel().compareTo(backupLevel) < 0) {
Metrics.counter(ZK_AUTHZ_FAILURE_COUNTER_NAME,
FAILURE_REASON_TAG_NAME, "level")
Metrics.counter(ZK_AUTHZ_FAILURE_COUNTER_NAME, Tags.of(
UserAgentTagUtil.getPlatformTag(backupUser.userAgent()),
Tag.of(FAILURE_REASON_TAG_NAME, "level")))
.increment();
throw Status.PERMISSION_DENIED

View File

@@ -11,6 +11,7 @@ import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.time.Clock;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
@@ -21,12 +22,16 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import org.signal.libsignal.protocol.InvalidKeyException;
import org.signal.libsignal.protocol.ecc.ECPublicKey;
import org.signal.libsignal.zkgroup.backups.BackupLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.Util;
@@ -38,6 +43,7 @@ import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.Update;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
@@ -79,6 +85,10 @@ public class BackupsDb {
private final SecureRandom secureRandom;
private static final String NUM_OBJECTS_SUMMARY_NAME = "numObjects";
private static final String BYTES_USED_SUMMARY_NAME = "bytesUsed";
private static final String BACKUPS_COUNTER_NAME = "backups";
// The backups table
// B: 16 bytes that identifies the backup
@@ -217,12 +227,10 @@ public class BackupsDb {
*/
CompletableFuture<Void> trackMedia(final AuthenticatedBackupUser backupUser, final long mediaCountDelta,
final long mediaBytesDelta) {
final Instant now = clock.instant();
return dynamoClient
.updateItem(
// Update the media quota and TTL
UpdateBuilder.forUser(backupTableName, backupUser)
.setRefreshTimes(now)
.incrementMediaBytes(mediaBytesDelta)
.incrementMediaCount(mediaCountDelta)
.updateItemBuilder()
@@ -237,12 +245,15 @@ public class BackupsDb {
* @param backupUser an already authorized backup user
*/
CompletableFuture<Void> ttlRefresh(final AuthenticatedBackupUser backupUser) {
final Instant today = clock.instant().truncatedTo(ChronoUnit.DAYS);
// update message backup TTL
return dynamoClient.updateItem(UpdateBuilder.forUser(backupTableName, backupUser)
.setRefreshTimes(clock)
.setRefreshTimes(today)
.updateItemBuilder()
.returnValues(ReturnValue.ALL_OLD)
.build())
.thenRun(Util.NOOP);
.thenAccept(updateItemResponse ->
updateMetricsAfterRefresh(backupUser, today, updateItemResponse.attributes()));
}
/**
@@ -251,14 +262,40 @@ public class BackupsDb {
* @param backupUser an already authorized backup user
*/
CompletableFuture<Void> addMessageBackup(final AuthenticatedBackupUser backupUser) {
final Instant today = clock.instant().truncatedTo(ChronoUnit.DAYS);
// this could race with concurrent updates, but the only effect would be last-writer-wins on the timestamp
return dynamoClient.updateItem(
UpdateBuilder.forUser(backupTableName, backupUser)
.setRefreshTimes(clock)
.setRefreshTimes(today)
.setCdn(BACKUP_CDN)
.updateItemBuilder()
.returnValues(ReturnValue.ALL_OLD)
.build())
.thenRun(Util.NOOP);
.thenAccept(updateItemResponse ->
updateMetricsAfterRefresh(backupUser, today, updateItemResponse.attributes()));
}
private void updateMetricsAfterRefresh(final AuthenticatedBackupUser backupUser, final Instant today, final Map<String, AttributeValue> item) {
final Instant previousRefreshTime = Instant.ofEpochSecond(
AttributeValues.getLong(item, ATTR_LAST_REFRESH, 0L));
// Only publish a metric update once per day
if (previousRefreshTime.isBefore(today)) {
final long mediaCount = AttributeValues.getLong(item, ATTR_MEDIA_COUNT, 0L);
final long bytesUsed = AttributeValues.getLong(item, ATTR_MEDIA_BYTES_USED, 0L);
final Tags tags = Tags.of(
UserAgentTagUtil.getPlatformTag(backupUser.userAgent()),
Tag.of("tier", backupUser.backupLevel().name()));
Metrics.summary(NUM_OBJECTS_SUMMARY_NAME, tags).record(mediaCount);
Metrics.summary(BYTES_USED_SUMMARY_NAME, tags).record(bytesUsed);
// Report that the backup is out of quota if it cannot store a max size media object
final boolean quotaExhausted = bytesUsed >=
(BackupManager.MAX_TOTAL_BACKUP_MEDIA_BYTES - BackupManager.MAX_MEDIA_OBJECT_SIZE);
Metrics.counter(BACKUPS_COUNTER_NAME,
tags.and("quotaExhausted", String.valueOf(quotaExhausted)))
.increment();
}
}
/**
@@ -707,17 +744,20 @@ public class BackupsDb {
};
}
UpdateBuilder setRefreshTimes(final Clock clock) {
return setRefreshTimes(clock.instant().truncatedTo(ChronoUnit.DAYS));
}
/**
* Set the lastRefresh time as part of the update
* <p>
* This always updates lastRefreshTime, and updates lastMediaRefreshTime if the backup user has the appropriate
* level.
*/
UpdateBuilder setRefreshTimes(final Clock clock) {
return this.setRefreshTimes(clock.instant());
}
UpdateBuilder setRefreshTimes(final Instant refreshTime) {
if (!refreshTime.truncatedTo(ChronoUnit.DAYS).equals(refreshTime)) {
throw new IllegalArgumentException("Refresh time must be day aligned");
}
addSetExpression("#lastRefreshTime = :lastRefreshTime",
Map.entry("#lastRefreshTime", ATTR_LAST_REFRESH),
Map.entry(":lastRefreshTime", AttributeValues.n(refreshTime.getEpochSecond())));

View File

@@ -5,8 +5,6 @@
package org.whispersystems.textsecuregcm.controllers;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.core.JsonParser;
@@ -17,9 +15,6 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.net.HttpHeaders;
import io.dropwizard.auth.Auth;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
@@ -348,6 +343,7 @@ public class ArchiveController {
@ApiResponseZkAuth
public CompletionStage<ReadAuthResponse> readAuth(
@ReadOnly @Auth final Optional<AuthenticatedDevice> account,
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
@Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class))
@NotNull
@@ -361,7 +357,7 @@ public class ArchiveController {
if (account.isPresent()) {
throw new BadRequestException("must not use authenticated connection for anonymous operations");
}
return backupManager.authenticateBackupUser(presentation.presentation, signature.signature)
return backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent)
.thenApply(user -> backupManager.generateReadAuth(user, cdn))
.thenApply(ReadAuthResponse::new);
}
@@ -399,6 +395,7 @@ public class ArchiveController {
@ApiResponseZkAuth
public CompletionStage<BackupInfoResponse> backupInfo(
@ReadOnly @Auth final Optional<AuthenticatedDevice> account,
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
@Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class))
@NotNull
@@ -411,7 +408,7 @@ public class ArchiveController {
throw new BadRequestException("must not use authenticated connection for anonymous operations");
}
return backupManager.authenticateBackupUser(presentation.presentation, signature.signature)
return backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent)
.thenCompose(backupManager::backupInfo)
.thenApply(backupInfo -> new BackupInfoResponse(
backupInfo.cdn(),
@@ -454,6 +451,9 @@ public class ArchiveController {
@HeaderParam(X_SIGNAL_ZK_AUTH_SIGNATURE) final BackupAuthCredentialPresentationSignature signature,
@Valid @NotNull SetPublicKeyRequest setPublicKeyRequest) {
if (account.isPresent()) {
throw new BadRequestException("must not use authenticated connection for anonymous operations");
}
return backupManager
.setPublicKey(presentation.presentation, signature.signature, setPublicKeyRequest.backupIdPublicKey)
.thenApply(Util.ASYNC_EMPTY_RESPONSE);
@@ -481,6 +481,7 @@ public class ArchiveController {
@ApiResponseZkAuth
public CompletionStage<UploadDescriptorResponse> backup(
@ReadOnly @Auth final Optional<AuthenticatedDevice> account,
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
@Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class))
@NotNull
@@ -492,7 +493,7 @@ public class ArchiveController {
if (account.isPresent()) {
throw new BadRequestException("must not use authenticated connection for anonymous operations");
}
return backupManager.authenticateBackupUser(presentation.presentation, signature.signature)
return backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent)
.thenCompose(backupManager::createMessageBackupUploadDescriptor)
.thenApply(result -> new UploadDescriptorResponse(
result.cdn(),
@@ -517,6 +518,8 @@ public class ArchiveController {
@ApiResponseZkAuth
public CompletionStage<UploadDescriptorResponse> uploadTemporaryAttachment(
@ReadOnly @Auth final Optional<AuthenticatedDevice> account,
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
@Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class))
@NotNull
@@ -528,7 +531,7 @@ public class ArchiveController {
if (account.isPresent()) {
throw new BadRequestException("must not use authenticated connection for anonymous operations");
}
return backupManager.authenticateBackupUser(presentation.presentation, signature.signature)
return backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent)
.thenCompose(backupManager::createTemporaryAttachmentUploadDescriptor)
.thenApply(result -> new UploadDescriptorResponse(
result.cdn(),
@@ -620,7 +623,7 @@ public class ArchiveController {
}
return Mono
.fromFuture(backupManager.authenticateBackupUser(presentation.presentation, signature.signature))
.fromFuture(backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent))
.flatMap(backupUser -> backupManager.copyToBackup(backupUser, List.of(copyMediaRequest.toCopyParameters()))
.next()
.doOnNext(result -> backupMetrics.updateCopyCounter(result, UserAgentTagUtil.getPlatformTag(userAgent)))
@@ -719,7 +722,7 @@ public class ArchiveController {
throw new BadRequestException("must not use authenticated connection for anonymous operations");
}
final Stream<CopyParameters> copyParams = copyMediaRequest.items().stream().map(CopyMediaRequest::toCopyParameters);
return Mono.fromFuture(backupManager.authenticateBackupUser(presentation.presentation, signature.signature))
return Mono.fromFuture(backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent))
.flatMapMany(backupUser -> backupManager.copyToBackup(backupUser, copyParams.toList()))
.doOnNext(result -> backupMetrics.updateCopyCounter(result, UserAgentTagUtil.getPlatformTag(userAgent)))
.map(CopyMediaBatchResponse.Entry::fromCopyResult)
@@ -741,6 +744,7 @@ public class ArchiveController {
@ApiResponseZkAuth
public CompletionStage<Response> refresh(
@ReadOnly @Auth final Optional<AuthenticatedDevice> account,
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
@Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class))
@NotNull
@@ -753,7 +757,7 @@ public class ArchiveController {
throw new BadRequestException("must not use authenticated connection for anonymous operations");
}
return backupManager
.authenticateBackupUser(presentation.presentation, signature.signature)
.authenticateBackupUser(presentation.presentation, signature.signature, userAgent)
.thenCompose(backupManager::ttlRefresh)
.thenApply(Util.ASYNC_EMPTY_RESPONSE);
}
@@ -807,6 +811,7 @@ public class ArchiveController {
@ApiResponseZkAuth
public CompletionStage<ListResponse> listMedia(
@ReadOnly @Auth final Optional<AuthenticatedDevice> account,
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
@Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class))
@NotNull
@@ -825,7 +830,7 @@ public class ArchiveController {
throw new BadRequestException("must not use authenticated connection for anonymous operations");
}
return backupManager
.authenticateBackupUser(presentation.presentation, signature.signature)
.authenticateBackupUser(presentation.presentation, signature.signature, userAgent)
.thenCompose(backupUser -> backupManager.list(backupUser, cursor, limit.orElse(1000))
.thenApply(result -> new ListResponse(
result.media()
@@ -862,6 +867,7 @@ public class ArchiveController {
@ApiResponseZkAuth
public CompletionStage<Response> deleteMedia(
@ReadOnly @Auth final Optional<AuthenticatedDevice> account,
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
@Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class))
@NotNull
@@ -881,7 +887,7 @@ public class ArchiveController {
.toList();
return backupManager
.authenticateBackupUser(presentation.presentation, signature.signature)
.authenticateBackupUser(presentation.presentation, signature.signature, userAgent)
.thenCompose(authenticatedBackupUser -> backupManager
.deleteMedia(authenticatedBackupUser, toDelete)
.then().toFuture())
@@ -898,6 +904,7 @@ public class ArchiveController {
@ApiResponseZkAuth
public CompletionStage<Response> deleteBackup(
@ReadOnly @Auth final Optional<AuthenticatedDevice> account,
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
@Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class))
@NotNull
@@ -910,7 +917,7 @@ public class ArchiveController {
throw new BadRequestException("must not use authenticated connection for anonymous operations");
}
return backupManager
.authenticateBackupUser(presentation.presentation, signature.signature)
.authenticateBackupUser(presentation.presentation, signature.signature, userAgent)
.thenCompose(backupManager::deleteEntireBackup)
.thenApply(Util.ASYNC_EMPTY_RESPONSE);
}

View File

@@ -206,7 +206,8 @@ public class BackupsAnonymousGrpcService extends ReactorBackupsAnonymousGrpc.Bac
try {
return backupManager.authenticateBackupUser(
new BackupAuthCredentialPresentation(signedPresentation.getPresentation().toByteArray()),
signedPresentation.getPresentationSignature().toByteArray());
signedPresentation.getPresentationSignature().toByteArray(),
RequestAttributesUtil.getUserAgent().orElse(null));
} catch (InvalidInputException e) {
throw Status.UNAUTHENTICATED.withDescription("Could not deserialize presentation").asRuntimeException();
}

View File

@@ -11,7 +11,6 @@ import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.signal.libsignal.zkgroup.backups.BackupLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
@@ -64,43 +63,17 @@ public class BackupMetricsCommand extends AbstractCommandWithDependencies {
segments,
Runtime.getRuntime().availableProcessors());
final DistributionSummary numObjectsMediaTier = Metrics.summary(name(getClass(), "numObjects"),
"tier", BackupLevel.PAID.name());
final DistributionSummary bytesUsedMediaTier = Metrics.summary(name(getClass(), "bytesUsed"),
"tier", BackupLevel.PAID.name());
final DistributionSummary numObjectsMessagesTier = Metrics.summary(name(getClass(), "numObjects"),
"tier", BackupLevel.FREE.name());
final DistributionSummary bytesUsedMessagesTier = Metrics.summary(name(getClass(), "bytesUsed"),
"tier", BackupLevel.FREE.name());
final DistributionSummary timeSinceLastRefresh = Metrics.summary(name(getClass(),
"timeSinceLastRefresh"));
final DistributionSummary timeSinceLastMediaRefresh = Metrics.summary(name(getClass(),
"timeSinceLastMediaRefresh"));
final String backupsCounterName = name(getClass(), "backups");
final BackupManager backupManager = commandDependencies.backupManager();
final Long backupsExpired = backupManager
.listBackupAttributes(segments, Schedulers.parallel())
.doOnNext(backupMetadata -> {
final boolean subscribed = backupMetadata.lastMediaRefresh().equals(backupMetadata.lastRefresh());
if (subscribed) {
numObjectsMediaTier.record(backupMetadata.numObjects());
bytesUsedMediaTier.record(backupMetadata.bytesUsed());
} else {
numObjectsMessagesTier.record(backupMetadata.numObjects());
bytesUsedMessagesTier.record(backupMetadata.bytesUsed());
}
timeSinceLastRefresh.record(timeSince(backupMetadata.lastRefresh()).getSeconds());
timeSinceLastMediaRefresh.record(timeSince(backupMetadata.lastMediaRefresh()).getSeconds());
// Report that the backup is out of quota if it cannot store a max size media object
final boolean quotaExhausted = backupMetadata.bytesUsed() >=
(BackupManager.MAX_TOTAL_BACKUP_MEDIA_BYTES - BackupManager.MAX_MEDIA_OBJECT_SIZE);
Metrics.counter(backupsCounterName,
"subscribed", String.valueOf(subscribed),
"quotaExhausted", String.valueOf(quotaExhausted)).increment();
})
.count()
.block();