mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-21 06:48:07 +01:00
Use pre-calculated pre-key counts when possible
This commit is contained in:
@@ -24,11 +24,16 @@ public class SingleUseECPreKeyStore extends SingleUsePreKeyStore<ECPreKey> {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, AttributeValue> getItemFromPreKey(final UUID identifier, final byte deviceId, final ECPreKey preKey) {
|
||||
protected Map<String, AttributeValue> getItemFromPreKey(final UUID identifier,
|
||||
final byte deviceId,
|
||||
final ECPreKey preKey,
|
||||
final int remainingKeys) {
|
||||
|
||||
return Map.of(
|
||||
KEY_ACCOUNT_UUID, getPartitionKey(identifier),
|
||||
KEY_DEVICE_ID_KEY_ID, getSortKey(deviceId, preKey.keyId()),
|
||||
ATTR_PUBLIC_KEY, AttributeValues.fromByteArray(preKey.serializedPublicKey()));
|
||||
ATTR_PUBLIC_KEY, AttributeValues.fromByteArray(preKey.serializedPublicKey()),
|
||||
ATTR_REMAINING_KEYS, AttributeValues.fromInt(remainingKeys));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -21,12 +21,17 @@ public class SingleUseKEMPreKeyStore extends SingleUsePreKeyStore<KEMSignedPreKe
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, AttributeValue> getItemFromPreKey(final UUID identifier, final byte deviceId, final KEMSignedPreKey signedPreKey) {
|
||||
protected Map<String, AttributeValue> getItemFromPreKey(final UUID identifier,
|
||||
final byte deviceId,
|
||||
final KEMSignedPreKey signedPreKey,
|
||||
final int remainingKeys) {
|
||||
|
||||
return Map.of(
|
||||
KEY_ACCOUNT_UUID, getPartitionKey(identifier),
|
||||
KEY_DEVICE_ID_KEY_ID, getSortKey(deviceId, signedPreKey.keyId()),
|
||||
ATTR_PUBLIC_KEY, AttributeValues.fromByteArray(signedPreKey.serializedPublicKey()),
|
||||
ATTR_SIGNATURE, AttributeValues.fromByteArray(signedPreKey.signature()));
|
||||
ATTR_SIGNATURE, AttributeValues.fromByteArray(signedPreKey.signature()),
|
||||
ATTR_REMAINING_KEYS, AttributeValues.fromInt(remainingKeys));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -8,16 +8,19 @@ package org.whispersystems.textsecuregcm.storage;
|
||||
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
|
||||
import static org.whispersystems.textsecuregcm.storage.AbstractDynamoDbStore.DYNAMO_DB_MAX_BATCH_SIZE;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.micrometer.core.instrument.DistributionSummary;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.time.Duration;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.whispersystems.textsecuregcm.entities.PreKey;
|
||||
import org.whispersystems.textsecuregcm.util.AttributeValues;
|
||||
@@ -49,9 +52,10 @@ public abstract class SingleUsePreKeyStore<K extends PreKey<?>> {
|
||||
private final DynamoDbAsyncClient dynamoDbAsyncClient;
|
||||
private final String tableName;
|
||||
|
||||
private final String getKeyCountTimerName = name(getClass(), "getCount");
|
||||
|
||||
private final Timer storeKeyTimer = Metrics.timer(name(getClass(), "storeKey"));
|
||||
private final Timer storeKeyBatchTimer = Metrics.timer(name(getClass(), "storeKeyBatch"));
|
||||
private final Timer getKeyCountTimer = Metrics.timer(name(getClass(), "getCount"));
|
||||
private final Timer deleteForDeviceTimer = Metrics.timer(name(getClass(), "deleteForDevice"));
|
||||
private final Timer deleteForAccountTimer = Metrics.timer(name(getClass(), "deleteForAccount"));
|
||||
|
||||
@@ -74,6 +78,7 @@ public abstract class SingleUsePreKeyStore<K extends PreKey<?>> {
|
||||
static final String KEY_DEVICE_ID_KEY_ID = "DK";
|
||||
static final String ATTR_PUBLIC_KEY = "P";
|
||||
static final String ATTR_SIGNATURE = "S";
|
||||
static final String ATTR_REMAINING_KEYS = "R";
|
||||
|
||||
protected SingleUsePreKeyStore(final DynamoDbAsyncClient dynamoDbAsyncClient, final String tableName) {
|
||||
this.dynamoDbAsyncClient = dynamoDbAsyncClient;
|
||||
@@ -97,18 +102,22 @@ public abstract class SingleUsePreKeyStore<K extends PreKey<?>> {
|
||||
return Mono.fromFuture(() -> delete(identifier, deviceId))
|
||||
.thenMany(
|
||||
Flux.fromIterable(preKeys)
|
||||
.flatMap(preKey -> Mono.fromFuture(() -> store(identifier, deviceId, preKey)), DYNAMO_DB_MAX_BATCH_SIZE))
|
||||
.sort(Comparator.comparing(preKey -> preKey.keyId()))
|
||||
.zipWith(Flux.range(0, preKeys.size()).map(i -> preKeys.size() - i))
|
||||
.flatMap(preKeyAndRemainingCount -> Mono.fromFuture(() ->
|
||||
store(identifier, deviceId, preKeyAndRemainingCount.getT1(), preKeyAndRemainingCount.getT2())),
|
||||
DYNAMO_DB_MAX_BATCH_SIZE))
|
||||
.then()
|
||||
.toFuture()
|
||||
.thenRun(() -> sample.stop(storeKeyBatchTimer));
|
||||
}
|
||||
|
||||
private CompletableFuture<Void> store(final UUID identifier, final byte deviceId, final K preKey) {
|
||||
private CompletableFuture<Void> store(final UUID identifier, final byte deviceId, final K preKey, final int remainingKeys) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
return dynamoDbAsyncClient.putItem(PutItemRequest.builder()
|
||||
.tableName(tableName)
|
||||
.item(getItemFromPreKey(identifier, deviceId, preKey))
|
||||
.item(getItemFromPreKey(identifier, deviceId, preKey, remainingKeys))
|
||||
.build())
|
||||
.thenRun(() -> sample.stop(storeKeyTimer));
|
||||
}
|
||||
@@ -172,6 +181,56 @@ public abstract class SingleUsePreKeyStore<K extends PreKey<?>> {
|
||||
public CompletableFuture<Integer> getCount(final UUID identifier, final byte deviceId) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
final AtomicBoolean countFromPeek = new AtomicBoolean(true);
|
||||
|
||||
return peekCount(identifier, deviceId)
|
||||
.thenCompose(maybeCount -> maybeCount
|
||||
.map(CompletableFuture::completedFuture)
|
||||
// Older key sets may not have a pre-calculated pre-key count; take the less efficient approach of counting
|
||||
// items instead
|
||||
.orElseGet(() -> {
|
||||
countFromPeek.set(false);
|
||||
return scanCount(identifier, deviceId);
|
||||
}))
|
||||
.whenComplete((keyCount, throwable) -> {
|
||||
sample.stop(Metrics.timer(getKeyCountTimerName, "method", countFromPeek.get() ? "peek" : "scan"));
|
||||
|
||||
if (throwable == null && keyCount != null) {
|
||||
availableKeyCountDistributionSummary.record(keyCount);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
CompletableFuture<Optional<Integer>> peekCount(final UUID identifier, final byte deviceId) {
|
||||
return dynamoDbAsyncClient.query(QueryRequest.builder()
|
||||
.tableName(tableName)
|
||||
.consistentRead(false)
|
||||
.keyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)")
|
||||
.expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID))
|
||||
.expressionAttributeValues(Map.of(
|
||||
":uuid", getPartitionKey(identifier),
|
||||
":sortprefix", getSortKeyPrefix(deviceId)))
|
||||
.projectionExpression(ATTR_REMAINING_KEYS)
|
||||
.limit(1)
|
||||
.build())
|
||||
.thenApply(response -> {
|
||||
if (response.count() > 0) {
|
||||
final Map<String, AttributeValue> item = response.items().getFirst();
|
||||
|
||||
if (item.containsKey(ATTR_REMAINING_KEYS)) {
|
||||
return Optional.of(Integer.parseInt(item.get(ATTR_REMAINING_KEYS).n()));
|
||||
} else {
|
||||
return Optional.empty();
|
||||
}
|
||||
} else {
|
||||
return Optional.of(0);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
CompletableFuture<Integer> scanCount(final UUID identifier, final byte deviceId) {
|
||||
// Getting an accurate count from DynamoDB can be very confusing. See:
|
||||
//
|
||||
// - https://github.com/aws/aws-sdk-java/issues/693
|
||||
@@ -189,14 +248,7 @@ public abstract class SingleUsePreKeyStore<K extends PreKey<?>> {
|
||||
.build()))
|
||||
.map(QueryResponse::count)
|
||||
.reduce(0, Integer::sum)
|
||||
.toFuture()
|
||||
.whenComplete((keyCount, throwable) -> {
|
||||
sample.stop(getKeyCountTimer);
|
||||
|
||||
if (throwable == null && keyCount != null) {
|
||||
availableKeyCountDistributionSummary.record(keyCount);
|
||||
}
|
||||
});
|
||||
.toFuture();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -280,8 +332,10 @@ public abstract class SingleUsePreKeyStore<K extends PreKey<?>> {
|
||||
return AttributeValues.fromByteBuffer(byteBuffer.flip());
|
||||
}
|
||||
|
||||
protected abstract Map<String, AttributeValue> getItemFromPreKey(final UUID identifier, final byte deviceId,
|
||||
final K preKey);
|
||||
protected abstract Map<String, AttributeValue> getItemFromPreKey(final UUID identifier,
|
||||
final byte deviceId,
|
||||
final K preKey,
|
||||
final int remainingKeys);
|
||||
|
||||
protected abstract K getPreKeyFromItem(final Map<String, AttributeValue> item);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user