Move "active epoch" configuration from dynamic to static configuration

This commit is contained in:
Jon Chambers
2026-06-16 17:27:57 -04:00
committed by Jon Chambers
parent 7c9e3d029b
commit 024fa9ce5f
6 changed files with 57 additions and 66 deletions
@@ -17,7 +17,8 @@ import java.util.Map;
import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore;
public record FoundationDbMessagesConfiguration(@NotEmpty Map<String, @Valid FoundationDbClusterConfiguration> clusters,
@NotEmpty Map<@PositiveOrZero @Max(FoundationDbMessageStore.MAX_EPOCHS - 1) Integer, @Size(min = 1, max = FoundationDbMessageStore.MAX_SHARDS - 1) List<String>> epochs) {
@NotEmpty Map<@PositiveOrZero @Max(FoundationDbMessageStore.MAX_EPOCHS - 1) Integer, @Size(min = 1, max = FoundationDbMessageStore.MAX_SHARDS - 1) List<String>> epochs,
@PositiveOrZero @Max(FoundationDbMessageStore.MAX_EPOCHS - 1) int activeEpoch) {
@AssertTrue
boolean isEveryEpochClusterConfigured() {
@@ -42,4 +43,9 @@ public record FoundationDbMessagesConfiguration(@NotEmpty Map<String, @Valid Fou
return true;
}
@AssertTrue
boolean isActiveEpochConfigured() {
return epochs().containsKey(activeEpoch());
}
}
@@ -84,11 +84,6 @@ public class DynamicConfiguration {
@Valid
private DynamicTurnConfiguration turn = new DynamicTurnConfiguration();
@JsonProperty
@Valid
private DynamicFoundationDbMessagesConfiguration foundationDbMessages =
new DynamicFoundationDbMessagesConfiguration(0);
public Optional<DynamicExperimentEnrollmentConfiguration> getExperimentEnrollmentConfiguration(
final String experimentName) {
return Optional.ofNullable(experiments.get(experimentName));
@@ -158,8 +153,4 @@ public class DynamicConfiguration {
public DynamicTurnConfiguration getTurnConfiguration() {
return turn;
}
public DynamicFoundationDbMessagesConfiguration getFoundationDbMessagesConfiguration() {
return foundationDbMessages;
}
}
@@ -1,13 +0,0 @@
/*
* Copyright 2026 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.configuration.dynamic;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.PositiveOrZero;
import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore;
public record DynamicFoundationDbMessagesConfiguration(@PositiveOrZero @Max(FoundationDbMessageStore.MAX_EPOCHS - 1) int activeEpoch) {
}
@@ -19,11 +19,9 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.MessageStream;
import org.whispersystems.textsecuregcm.util.Conversions;
import org.whispersystems.textsecuregcm.util.Util;
@@ -42,8 +40,8 @@ import org.whispersystems.textsecuregcm.util.Util;
public class FoundationDbMessageStore {
private final Database[][] databasesByEpoch;
private final int activeEpoch;
private final VersionstampUUIDCipher versionstampUUIDCipher;
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
private final Clock clock;
private static final Subspace MESSAGES_SUBSPACE = new Subspace(Tuple.from("M"));
@@ -69,8 +67,8 @@ public class FoundationDbMessageStore {
}
public FoundationDbMessageStore(final Map<Integer, List<Database>> databasesByEpoch,
final int activeEpoch,
final VersionstampUUIDCipher versionstampUUIDCipher,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final Clock clock) {
final Database[][] databasesByEpochArray = new Database[MAX_EPOCHS][];
@@ -79,8 +77,8 @@ public class FoundationDbMessageStore {
databasesByEpochArray[epoch] = databases.toArray(Database[]::new));
this.databasesByEpoch = databasesByEpochArray;
this.activeEpoch = activeEpoch;
this.versionstampUUIDCipher = versionstampUUIDCipher;
this.dynamicConfigurationManager = dynamicConfigurationManager;
this.clock = clock;
}
@@ -93,7 +91,15 @@ public class FoundationDbMessageStore {
public CompletableFuture<Map<Byte, InsertResult>> insert(final AciServiceIdentifier aciServiceIdentifier,
final Map<Byte, MessageProtos.Envelope> messagesByDeviceId) {
return insert(Map.of(aciServiceIdentifier, messagesByDeviceId))
return insert(aciServiceIdentifier, messagesByDeviceId, activeEpoch);
}
@VisibleForTesting
CompletableFuture<Map<Byte, InsertResult>> insert(final AciServiceIdentifier aciServiceIdentifier,
final Map<Byte, MessageProtos.Envelope> messagesByDeviceId,
final int epoch) {
return insert(Map.of(aciServiceIdentifier, messagesByDeviceId), epoch)
.thenApply(resultsByServiceIdentifier -> {
assert resultsByServiceIdentifier.size() == 1;
@@ -115,6 +121,14 @@ public class FoundationDbMessageStore {
public CompletableFuture<Map<AciServiceIdentifier, Map<Byte, InsertResult>>> insert(
final Map<AciServiceIdentifier, Map<Byte, MessageProtos.Envelope>> messagesByServiceIdentifier) {
return insert(messagesByServiceIdentifier, activeEpoch);
}
@VisibleForTesting
CompletableFuture<Map<AciServiceIdentifier, Map<Byte, InsertResult>>> insert(
final Map<AciServiceIdentifier, Map<Byte, MessageProtos.Envelope>> messagesByServiceIdentifier,
final int epoch) {
if (messagesByServiceIdentifier.entrySet()
.stream()
.anyMatch(entry -> entry.getValue().isEmpty())) {
@@ -129,12 +143,9 @@ public class FoundationDbMessageStore {
throw new IllegalArgumentException("Messages must not have pre-set server GUIDs");
}
final int activeEpoch =
dynamicConfigurationManager.getConfiguration().getFoundationDbMessagesConfiguration().activeEpoch();
final Map<Integer, List<Map.Entry<AciServiceIdentifier, Map<Byte, MessageProtos.Envelope>>>> messagesByShardId =
messagesByServiceIdentifier.entrySet().stream()
.collect(Collectors.groupingBy(entry -> hashAciToShardNumber(entry.getKey(), activeEpoch)));
.collect(Collectors.groupingBy(entry -> hashAciToShardNumber(entry.getKey(), epoch)));
final List<CompletableFuture<Map<AciServiceIdentifier, Map<Byte, InsertResult>>>> chunkFutures =
new ArrayList<>();
@@ -150,7 +161,7 @@ public class FoundationDbMessageStore {
.sum();
if (estimatedTransactionSize > MAX_MESSAGE_CHUNK_SIZE) {
chunkFutures.add(insertChunk(shardId, activeEpoch, messagesForShard.subList(start, current)));
chunkFutures.add(insertChunk(shardId, epoch, messagesForShard.subList(start, current)));
start = current;
estimatedTransactionSize = 0;
@@ -160,7 +171,7 @@ public class FoundationDbMessageStore {
}
assert start < messagesForShard.size();
chunkFutures.add(insertChunk(shardId, activeEpoch, messagesForShard.subList(start, messagesForShard.size())));
chunkFutures.add(insertChunk(shardId, epoch, messagesForShard.subList(start, messagesForShard.size())));
});
return CompletableFuture.allOf(chunkFutures.toArray(CompletableFuture[]::new))
@@ -18,12 +18,14 @@ class FoundationDbMessagesConfigurationTest {
void isEveryEpochClusterConfigured() {
assertTrue(new FoundationDbMessagesConfiguration(
Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)),
Map.of(0, List.of("messages-0"))
Map.of(0, List.of("messages-0")),
0
).isEveryEpochClusterConfigured());
assertFalse(new FoundationDbMessagesConfiguration(
Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)),
Map.of(0, List.of("messages-0", "unconfigured-cluster"))
Map.of(0, List.of("messages-0", "unconfigured-cluster")),
0
).isEveryEpochClusterConfigured());
}
@@ -31,12 +33,29 @@ class FoundationDbMessagesConfigurationTest {
void isEveryEpochFreeOfDuplicates() {
assertTrue(new FoundationDbMessagesConfiguration(
Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)),
Map.of(0, List.of("messages-0"))
Map.of(0, List.of("messages-0")),
0
).isEveryEpochFreeOfDuplicates());
assertFalse(new FoundationDbMessagesConfiguration(
Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)),
Map.of(0, List.of("messages-0", "messages-0"))
Map.of(0, List.of("messages-0", "messages-0")),
0
).isEveryEpochFreeOfDuplicates());
}
@Test
void isActiveEpochConfigured() {
assertTrue(new FoundationDbMessagesConfiguration(
Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)),
Map.of(0, List.of("messages-0")),
0
).isActiveEpochConfigured());
assertFalse(new FoundationDbMessagesConfiguration(
Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)),
Map.of(0, List.of("messages-0")),
1
).isActiveEpochConfigured());
}
}
@@ -8,8 +8,6 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.apple.foundationdb.Database;
import com.apple.foundationdb.KeyValue;
@@ -57,12 +55,9 @@ import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.junitpioneer.jupiter.cartesian.CartesianTest;
import org.junitpioneer.jupiter.params.IntRangeSource;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicFoundationDbMessagesConfiguration;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.FoundationDbClusterExtension;
import org.whispersystems.textsecuregcm.storage.MessageStream;
import org.whispersystems.textsecuregcm.storage.MessageStreamEntry;
@@ -80,7 +75,6 @@ class FoundationDbMessageStoreTest {
static FoundationDbClusterExtension FOUNDATION_DB_EXTENSION = new FoundationDbClusterExtension(2);
private VersionstampUUIDCipher versionstampUUIDCipher;
private DynamicFoundationDbMessagesConfiguration foundationDbMessagesConfiguration;
private FoundationDbMessageStore foundationDbMessageStore;
private static final Clock CLOCK = Clock.fixed(Instant.ofEpochSecond(500), ZoneId.of("UTC"));
@@ -95,17 +89,6 @@ class FoundationDbMessageStoreTest {
versionstampUUIDCipher = new VersionstampUUIDCipher(0, versionstampCipherKey);
foundationDbMessagesConfiguration = mock(DynamicFoundationDbMessagesConfiguration.class);
when(foundationDbMessagesConfiguration.activeEpoch()).thenReturn(DEFAULT_EPOCH);
final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class);
when(dynamicConfiguration.getFoundationDbMessagesConfiguration()).thenReturn(foundationDbMessagesConfiguration);
@SuppressWarnings("unchecked") final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager =
mock(DynamicConfigurationManager.class);
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
final List<Database> databases = Arrays.asList(FOUNDATION_DB_EXTENSION.getDatabases());
foundationDbMessageStore = new FoundationDbMessageStore(
@@ -116,8 +99,8 @@ class FoundationDbMessageStoreTest {
DEFAULT_EPOCH, databases,
FUTURE_EPOCH, databases.reversed()
),
DEFAULT_EPOCH,
versionstampUUIDCipher,
dynamicConfigurationManager,
CLOCK);
}
@@ -200,15 +183,11 @@ class FoundationDbMessageStoreTest {
final MessageProtos.Envelope defaultEpochMessage = generateRandomMessage(false);
final MessageProtos.Envelope futureEpochMessage = generateRandomMessage(false);
when(foundationDbMessagesConfiguration.activeEpoch()).thenReturn(DEFAULT_EPOCH);
final Map<Byte, FoundationDbMessageStore.InsertResult> defaultEpochInsertResult =
foundationDbMessageStore.insert(aci, Map.of(deviceId, defaultEpochMessage)).join();
when(foundationDbMessagesConfiguration.activeEpoch()).thenReturn(FUTURE_EPOCH);
foundationDbMessageStore.insert(aci, Map.of(deviceId, defaultEpochMessage), DEFAULT_EPOCH).join();
final Map<Byte, FoundationDbMessageStore.InsertResult> futureEpochInsertResult =
foundationDbMessageStore.insert(aci, Map.of(deviceId, futureEpochMessage)).join();
foundationDbMessageStore.insert(aci, Map.of(deviceId, futureEpochMessage), FUTURE_EPOCH).join();
for (int epoch : new int[] { DEFAULT_EPOCH, FUTURE_EPOCH }) {
final List<KeyValue> itemsInDeviceQueue = getItemsInDeviceQueue(aci, deviceId, epoch);
@@ -941,15 +920,13 @@ class FoundationDbMessageStoreTest {
final MessageGuidCodec messageGuidCodec =
new MessageGuidCodec(aci.uuid(), deviceId, versionstampUUIDCipher);
when(foundationDbMessagesConfiguration.activeEpoch()).thenReturn(epoch);
return IntStream.range(0, messageCount)
.mapToObj(_ -> {
final MessageProtos.Envelope message =
generateRandomMessage(false, 16, timestampSupplier.get());
final FoundationDbMessageStore.InsertResult insertResult =
foundationDbMessageStore.insert(aci, Map.of(deviceId, message)).join().get(deviceId);
foundationDbMessageStore.insert(aci, Map.of(deviceId, message), epoch).join().get(deviceId);
final Versionstamp versionstamp = insertResult.versionstamp().orElseThrow();
final UUID messageGuid = messageGuidCodec.encodeMessageGuid(versionstamp);