diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/FoundationDbMessagesConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/FoundationDbMessagesConfiguration.java index 1cf82f371..d9d020344 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/FoundationDbMessagesConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/FoundationDbMessagesConfiguration.java @@ -17,7 +17,8 @@ import java.util.Map; import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore; public record FoundationDbMessagesConfiguration(@NotEmpty Map clusters, - @NotEmpty Map<@PositiveOrZero @Max(FoundationDbMessageStore.MAX_EPOCHS - 1) Integer, @Size(min = 1, max = FoundationDbMessageStore.MAX_SHARDS - 1) List> epochs) { + @NotEmpty Map<@PositiveOrZero @Max(FoundationDbMessageStore.MAX_EPOCHS - 1) Integer, @Size(min = 1, max = FoundationDbMessageStore.MAX_SHARDS - 1) List> epochs, + @PositiveOrZero @Max(FoundationDbMessageStore.MAX_EPOCHS - 1) int activeEpoch) { @AssertTrue boolean isEveryEpochClusterConfigured() { @@ -42,4 +43,9 @@ public record FoundationDbMessagesConfiguration(@NotEmpty Map getExperimentEnrollmentConfiguration( final String experimentName) { return Optional.ofNullable(experiments.get(experimentName)); @@ -158,8 +153,4 @@ public class DynamicConfiguration { public DynamicTurnConfiguration getTurnConfiguration() { return turn; } - - public DynamicFoundationDbMessagesConfiguration getFoundationDbMessagesConfiguration() { - return foundationDbMessages; - } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicFoundationDbMessagesConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicFoundationDbMessagesConfiguration.java deleted file mode 100644 index 9d1f629ae..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicFoundationDbMessagesConfiguration.java +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright 2026 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.configuration.dynamic; - -import jakarta.validation.constraints.Max; -import jakarta.validation.constraints.PositiveOrZero; -import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore; - -public record DynamicFoundationDbMessagesConfiguration(@PositiveOrZero @Max(FoundationDbMessageStore.MAX_EPOCHS - 1) int activeEpoch) { -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStore.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStore.java index f2ceff7c9..3548a59a3 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStore.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStore.java @@ -19,11 +19,9 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.stream.Collectors; -import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier; import org.whispersystems.textsecuregcm.storage.Device; -import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.MessageStream; import org.whispersystems.textsecuregcm.util.Conversions; import org.whispersystems.textsecuregcm.util.Util; @@ -42,8 +40,8 @@ import org.whispersystems.textsecuregcm.util.Util; public class FoundationDbMessageStore { private final Database[][] databasesByEpoch; + private final int activeEpoch; private final VersionstampUUIDCipher versionstampUUIDCipher; - private final DynamicConfigurationManager dynamicConfigurationManager; private final Clock clock; private static final Subspace MESSAGES_SUBSPACE = new Subspace(Tuple.from("M")); @@ -69,8 +67,8 @@ public class FoundationDbMessageStore { } public FoundationDbMessageStore(final Map> databasesByEpoch, + final int activeEpoch, final VersionstampUUIDCipher versionstampUUIDCipher, - final DynamicConfigurationManager dynamicConfigurationManager, final Clock clock) { final Database[][] databasesByEpochArray = new Database[MAX_EPOCHS][]; @@ -79,8 +77,8 @@ public class FoundationDbMessageStore { databasesByEpochArray[epoch] = databases.toArray(Database[]::new)); this.databasesByEpoch = databasesByEpochArray; + this.activeEpoch = activeEpoch; this.versionstampUUIDCipher = versionstampUUIDCipher; - this.dynamicConfigurationManager = dynamicConfigurationManager; this.clock = clock; } @@ -93,7 +91,15 @@ public class FoundationDbMessageStore { public CompletableFuture> insert(final AciServiceIdentifier aciServiceIdentifier, final Map messagesByDeviceId) { - return insert(Map.of(aciServiceIdentifier, messagesByDeviceId)) + return insert(aciServiceIdentifier, messagesByDeviceId, activeEpoch); + } + + @VisibleForTesting + CompletableFuture> insert(final AciServiceIdentifier aciServiceIdentifier, + final Map messagesByDeviceId, + final int epoch) { + + return insert(Map.of(aciServiceIdentifier, messagesByDeviceId), epoch) .thenApply(resultsByServiceIdentifier -> { assert resultsByServiceIdentifier.size() == 1; @@ -115,6 +121,14 @@ public class FoundationDbMessageStore { public CompletableFuture>> insert( final Map> messagesByServiceIdentifier) { + return insert(messagesByServiceIdentifier, activeEpoch); + } + + @VisibleForTesting + CompletableFuture>> insert( + final Map> messagesByServiceIdentifier, + final int epoch) { + if (messagesByServiceIdentifier.entrySet() .stream() .anyMatch(entry -> entry.getValue().isEmpty())) { @@ -129,12 +143,9 @@ public class FoundationDbMessageStore { throw new IllegalArgumentException("Messages must not have pre-set server GUIDs"); } - final int activeEpoch = - dynamicConfigurationManager.getConfiguration().getFoundationDbMessagesConfiguration().activeEpoch(); - final Map>>> messagesByShardId = messagesByServiceIdentifier.entrySet().stream() - .collect(Collectors.groupingBy(entry -> hashAciToShardNumber(entry.getKey(), activeEpoch))); + .collect(Collectors.groupingBy(entry -> hashAciToShardNumber(entry.getKey(), epoch))); final List>>> chunkFutures = new ArrayList<>(); @@ -150,7 +161,7 @@ public class FoundationDbMessageStore { .sum(); if (estimatedTransactionSize > MAX_MESSAGE_CHUNK_SIZE) { - chunkFutures.add(insertChunk(shardId, activeEpoch, messagesForShard.subList(start, current))); + chunkFutures.add(insertChunk(shardId, epoch, messagesForShard.subList(start, current))); start = current; estimatedTransactionSize = 0; @@ -160,7 +171,7 @@ public class FoundationDbMessageStore { } assert start < messagesForShard.size(); - chunkFutures.add(insertChunk(shardId, activeEpoch, messagesForShard.subList(start, messagesForShard.size()))); + chunkFutures.add(insertChunk(shardId, epoch, messagesForShard.subList(start, messagesForShard.size()))); }); return CompletableFuture.allOf(chunkFutures.toArray(CompletableFuture[]::new)) diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/configuration/FoundationDbMessagesConfigurationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/configuration/FoundationDbMessagesConfigurationTest.java index dd7412591..5a3944a7c 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/configuration/FoundationDbMessagesConfigurationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/configuration/FoundationDbMessagesConfigurationTest.java @@ -18,12 +18,14 @@ class FoundationDbMessagesConfigurationTest { void isEveryEpochClusterConfigured() { assertTrue(new FoundationDbMessagesConfiguration( Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)), - Map.of(0, List.of("messages-0")) + Map.of(0, List.of("messages-0")), + 0 ).isEveryEpochClusterConfigured()); assertFalse(new FoundationDbMessagesConfiguration( Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)), - Map.of(0, List.of("messages-0", "unconfigured-cluster")) + Map.of(0, List.of("messages-0", "unconfigured-cluster")), + 0 ).isEveryEpochClusterConfigured()); } @@ -31,12 +33,29 @@ class FoundationDbMessagesConfigurationTest { void isEveryEpochFreeOfDuplicates() { assertTrue(new FoundationDbMessagesConfiguration( Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)), - Map.of(0, List.of("messages-0")) + Map.of(0, List.of("messages-0")), + 0 ).isEveryEpochFreeOfDuplicates()); assertFalse(new FoundationDbMessagesConfiguration( Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)), - Map.of(0, List.of("messages-0", "messages-0")) + Map.of(0, List.of("messages-0", "messages-0")), + 0 ).isEveryEpochFreeOfDuplicates()); } + + @Test + void isActiveEpochConfigured() { + assertTrue(new FoundationDbMessagesConfiguration( + Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)), + Map.of(0, List.of("messages-0")), + 0 + ).isActiveEpochConfigured()); + + assertFalse(new FoundationDbMessagesConfiguration( + Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)), + Map.of(0, List.of("messages-0")), + 1 + ).isActiveEpochConfigured()); + } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStoreTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStoreTest.java index 9c7002e1d..1d10c207e 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStoreTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStoreTest.java @@ -8,8 +8,6 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import com.apple.foundationdb.Database; import com.apple.foundationdb.KeyValue; @@ -57,12 +55,9 @@ import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.junitpioneer.jupiter.cartesian.CartesianTest; import org.junitpioneer.jupiter.params.IntRangeSource; -import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; -import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicFoundationDbMessagesConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier; import org.whispersystems.textsecuregcm.storage.Device; -import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.FoundationDbClusterExtension; import org.whispersystems.textsecuregcm.storage.MessageStream; import org.whispersystems.textsecuregcm.storage.MessageStreamEntry; @@ -80,7 +75,6 @@ class FoundationDbMessageStoreTest { static FoundationDbClusterExtension FOUNDATION_DB_EXTENSION = new FoundationDbClusterExtension(2); private VersionstampUUIDCipher versionstampUUIDCipher; - private DynamicFoundationDbMessagesConfiguration foundationDbMessagesConfiguration; private FoundationDbMessageStore foundationDbMessageStore; private static final Clock CLOCK = Clock.fixed(Instant.ofEpochSecond(500), ZoneId.of("UTC")); @@ -95,17 +89,6 @@ class FoundationDbMessageStoreTest { versionstampUUIDCipher = new VersionstampUUIDCipher(0, versionstampCipherKey); - foundationDbMessagesConfiguration = mock(DynamicFoundationDbMessagesConfiguration.class); - when(foundationDbMessagesConfiguration.activeEpoch()).thenReturn(DEFAULT_EPOCH); - - final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); - when(dynamicConfiguration.getFoundationDbMessagesConfiguration()).thenReturn(foundationDbMessagesConfiguration); - - @SuppressWarnings("unchecked") final DynamicConfigurationManager dynamicConfigurationManager = - mock(DynamicConfigurationManager.class); - - when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); - final List databases = Arrays.asList(FOUNDATION_DB_EXTENSION.getDatabases()); foundationDbMessageStore = new FoundationDbMessageStore( @@ -116,8 +99,8 @@ class FoundationDbMessageStoreTest { DEFAULT_EPOCH, databases, FUTURE_EPOCH, databases.reversed() ), + DEFAULT_EPOCH, versionstampUUIDCipher, - dynamicConfigurationManager, CLOCK); } @@ -200,15 +183,11 @@ class FoundationDbMessageStoreTest { final MessageProtos.Envelope defaultEpochMessage = generateRandomMessage(false); final MessageProtos.Envelope futureEpochMessage = generateRandomMessage(false); - when(foundationDbMessagesConfiguration.activeEpoch()).thenReturn(DEFAULT_EPOCH); - final Map defaultEpochInsertResult = - foundationDbMessageStore.insert(aci, Map.of(deviceId, defaultEpochMessage)).join(); - - when(foundationDbMessagesConfiguration.activeEpoch()).thenReturn(FUTURE_EPOCH); + foundationDbMessageStore.insert(aci, Map.of(deviceId, defaultEpochMessage), DEFAULT_EPOCH).join(); final Map futureEpochInsertResult = - foundationDbMessageStore.insert(aci, Map.of(deviceId, futureEpochMessage)).join(); + foundationDbMessageStore.insert(aci, Map.of(deviceId, futureEpochMessage), FUTURE_EPOCH).join(); for (int epoch : new int[] { DEFAULT_EPOCH, FUTURE_EPOCH }) { final List itemsInDeviceQueue = getItemsInDeviceQueue(aci, deviceId, epoch); @@ -941,15 +920,13 @@ class FoundationDbMessageStoreTest { final MessageGuidCodec messageGuidCodec = new MessageGuidCodec(aci.uuid(), deviceId, versionstampUUIDCipher); - when(foundationDbMessagesConfiguration.activeEpoch()).thenReturn(epoch); - return IntStream.range(0, messageCount) .mapToObj(_ -> { final MessageProtos.Envelope message = generateRandomMessage(false, 16, timestampSupplier.get()); final FoundationDbMessageStore.InsertResult insertResult = - foundationDbMessageStore.insert(aci, Map.of(deviceId, message)).join().get(deviceId); + foundationDbMessageStore.insert(aci, Map.of(deviceId, message), epoch).join().get(deviceId); final Versionstamp versionstamp = insertResult.versionstamp().orElseThrow(); final UUID messageGuid = messageGuidCodec.encodeMessageGuid(versionstamp);