mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-20 10:28:12 +01:00
Remove UAK normalization code
All accounts now have UAKs in top-level attributes
This commit is contained in:
committed by
ravi-signal
parent
953cd2ae0c
commit
a7f1cd25b9
@@ -48,10 +48,6 @@ public class DynamicConfiguration {
|
||||
@Valid
|
||||
private DynamicPushLatencyConfiguration pushLatency = new DynamicPushLatencyConfiguration(Collections.emptyMap());
|
||||
|
||||
@JsonProperty
|
||||
@Valid
|
||||
private DynamicUakMigrationConfiguration uakMigrationConfiguration = new DynamicUakMigrationConfiguration();
|
||||
|
||||
@JsonProperty
|
||||
@Valid
|
||||
private DynamicTurnConfiguration turn = new DynamicTurnConfiguration();
|
||||
@@ -115,10 +111,6 @@ public class DynamicConfiguration {
|
||||
return pushLatency;
|
||||
}
|
||||
|
||||
public DynamicUakMigrationConfiguration getUakMigrationConfiguration() {
|
||||
return uakMigrationConfiguration;
|
||||
}
|
||||
|
||||
public DynamicTurnConfiguration getTurnConfiguration() {
|
||||
return turn;
|
||||
}
|
||||
|
||||
@@ -1,19 +0,0 @@
|
||||
package org.whispersystems.textsecuregcm.configuration.dynamic;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
public class DynamicUakMigrationConfiguration {
|
||||
@JsonProperty
|
||||
private boolean enabled = true;
|
||||
|
||||
@JsonProperty
|
||||
private int maxOutstandingNormalizes = 25;
|
||||
|
||||
public boolean isEnabled() {
|
||||
return enabled;
|
||||
}
|
||||
|
||||
public int getMaxOutstandingNormalizes() {
|
||||
return maxOutstandingNormalizes;
|
||||
}
|
||||
}
|
||||
@@ -9,24 +9,19 @@ import static com.codahale.metrics.MetricRegistry.name;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.micrometer.core.instrument.Counter;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import io.micrometer.core.instrument.Tags;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@@ -35,7 +30,6 @@ import java.util.stream.Collectors;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicUakMigrationConfiguration;
|
||||
import org.whispersystems.textsecuregcm.util.AttributeValues;
|
||||
import org.whispersystems.textsecuregcm.util.SystemMapper;
|
||||
import org.whispersystems.textsecuregcm.util.UUIDUtil;
|
||||
@@ -47,7 +41,6 @@ import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedExce
|
||||
import software.amazon.awssdk.services.dynamodb.model.Delete;
|
||||
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
|
||||
import software.amazon.awssdk.services.dynamodb.model.Put;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ReturnValuesOnConditionCheckFailure;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
|
||||
@@ -78,7 +71,6 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
// unidentified access key; byte[] or null
|
||||
static final String ATTR_UAK = "UAK";
|
||||
|
||||
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
|
||||
private final DynamoDbClient client;
|
||||
private final DynamoDbAsyncClient asyncClient;
|
||||
|
||||
@@ -101,11 +93,6 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
private static final Timer GET_ALL_FROM_START_TIMER = Metrics.timer(name(Accounts.class, "getAllFrom"));
|
||||
private static final Timer GET_ALL_FROM_OFFSET_TIMER = Metrics.timer(name(Accounts.class, "getAllFromOffset"));
|
||||
private static final Timer DELETE_TIMER = Metrics.timer(name(Accounts.class, "delete"));
|
||||
private static final Timer NORMALIZE_ITEM_TIMER = Metrics.timer(name(Accounts.class, "normalizeItem"));
|
||||
|
||||
private static final Counter UAK_NORMALIZE_SUCCESS_COUNT = Metrics.counter(name(Accounts.class, "normalizeUakSuccess"));
|
||||
private static final String UAK_NORMALIZE_ERROR_NAME = name(Accounts.class, "normalizeUakError");
|
||||
private static final String UAK_NORMALIZE_FAILURE_REASON_TAG_NAME = "reason";
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(Accounts.class);
|
||||
|
||||
@@ -116,7 +103,6 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
final int scanPageSize) {
|
||||
|
||||
super(client);
|
||||
this.dynamicConfigurationManager = dynamicConfigurationManager;
|
||||
this.client = client;
|
||||
this.asyncClient = asyncClient;
|
||||
this.phoneNumberConstraintTableName = phoneNumberConstraintTableName;
|
||||
@@ -697,82 +683,10 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
return toRecord.get().whenComplete((ignoreT, ignoreE) -> timer.record(Duration.between(start, Instant.now())));
|
||||
}
|
||||
|
||||
private List<Account> normalizeIfRequired(final List<Map<String, AttributeValue>> items) {
|
||||
|
||||
// The UAK top-level attribute may not exist on older records,
|
||||
// if it is absent and there is a UAK in the account blob we'll
|
||||
// add the UAK as a top-level attribute
|
||||
// TODO: Can eliminate this once all uaks exist as top-level attributes
|
||||
final List<Account> allAccounts = new ArrayList<>();
|
||||
final List<Account> accountsToNormalize = new ArrayList<>();
|
||||
for (Map<String, AttributeValue> item : items) {
|
||||
final Account account = fromItem(item);
|
||||
allAccounts.add(account);
|
||||
|
||||
boolean hasAttrUak = item.containsKey(ATTR_UAK);
|
||||
if (!hasAttrUak && account.getUnidentifiedAccessKey().isPresent()) {
|
||||
// the top level uak attribute doesn't exist, but there's a uak in the account
|
||||
accountsToNormalize.add(account);
|
||||
} else if (hasAttrUak && account.getUnidentifiedAccessKey().isPresent()) {
|
||||
final AttributeValue attr = item.get(ATTR_UAK);
|
||||
final byte[] nestedUak = account.getUnidentifiedAccessKey().get();
|
||||
if (!Arrays.equals(attr.b().asByteArray(), nestedUak)) {
|
||||
log.warn("Discovered mismatch between attribute UAK data UAK, normalizing");
|
||||
accountsToNormalize.add(account);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final DynamicUakMigrationConfiguration currentConfig = this.dynamicConfigurationManager.getConfiguration().getUakMigrationConfiguration();
|
||||
if (!currentConfig.isEnabled()) {
|
||||
log.debug("Account normalization is disabled, skipping normalization for {} accounts", accountsToNormalize.size());
|
||||
return allAccounts;
|
||||
}
|
||||
|
||||
for (List<Account> accounts : Lists.partition(accountsToNormalize, currentConfig.getMaxOutstandingNormalizes())) {
|
||||
try {
|
||||
final CompletableFuture<?>[] accountFutures = accounts.stream()
|
||||
.map(account -> record(NORMALIZE_ITEM_TIMER,
|
||||
() -> this.updateAsync(account).whenComplete((result, throwable) -> {
|
||||
if (throwable == null) {
|
||||
UAK_NORMALIZE_SUCCESS_COUNT.increment();
|
||||
return;
|
||||
}
|
||||
|
||||
throwable = unwrap(throwable);
|
||||
if (throwable instanceof ContestedOptimisticLockException) {
|
||||
// Could succeed on retry, but just backoff since this is a housekeeping operation
|
||||
Metrics.counter(UAK_NORMALIZE_ERROR_NAME,
|
||||
Tags.of(UAK_NORMALIZE_FAILURE_REASON_TAG_NAME, "ContestedOptimisticLock")).increment();
|
||||
} else if (throwable instanceof ProvisionedThroughputExceededException) {
|
||||
Metrics.counter(UAK_NORMALIZE_ERROR_NAME,
|
||||
Tags.of(UAK_NORMALIZE_FAILURE_REASON_TAG_NAME, "ProvisionedThroughPutExceeded"))
|
||||
.increment();
|
||||
} else {
|
||||
log.warn("Failed to normalize account, skipping", throwable);
|
||||
Metrics.counter(UAK_NORMALIZE_ERROR_NAME,
|
||||
Tags.of(UAK_NORMALIZE_FAILURE_REASON_TAG_NAME, "unknown"))
|
||||
.increment();
|
||||
}
|
||||
})).toCompletableFuture()).toArray(CompletableFuture[]::new);
|
||||
|
||||
// wait for a futures in batch to complete
|
||||
CompletableFuture
|
||||
.allOf(accountFutures)
|
||||
// exceptions handled in individual futures
|
||||
.exceptionally(e -> null)
|
||||
.join();
|
||||
} catch (Exception e) {
|
||||
log.warn("Failed to update batch of {} accounts, skipping", accounts.size(), e);
|
||||
}
|
||||
}
|
||||
return allAccounts;
|
||||
}
|
||||
|
||||
private AccountCrawlChunk scanForChunk(final ScanRequest.Builder scanRequestBuilder, final int maxCount, final Timer timer) {
|
||||
scanRequestBuilder.tableName(accountsTableName);
|
||||
final List<Map<String, AttributeValue>> items = timer.record(() -> scan(scanRequestBuilder.build(), maxCount));
|
||||
final List<Account> accounts = normalizeIfRequired(items);
|
||||
final List<Account> accounts = items.stream().map(Accounts::fromItem).toList();
|
||||
return new AccountCrawlChunk(accounts, accounts.size() > 0 ? accounts.get(accounts.size() - 1).getUuid() : null);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user