Include shard ID as user data in message versionstamps

This commit is contained in:
Jon Chambers
2026-04-01 10:24:51 -04:00
committed by Jon Chambers
parent 2500e85c01
commit 844e103c86
2 changed files with 23 additions and 14 deletions

View File

@@ -111,8 +111,6 @@ public class FoundationDbMessageStore {
new ArrayList<>(); new ArrayList<>();
messagesByShardId.forEach((shardId, messagesForShard) -> { messagesByShardId.forEach((shardId, messagesForShard) -> {
final Database shard = databases[shardId];
int start = 0, current = 0; int start = 0, current = 0;
int estimatedTransactionSize = 0; int estimatedTransactionSize = 0;
@@ -123,7 +121,7 @@ public class FoundationDbMessageStore {
.sum(); .sum();
if (estimatedTransactionSize > MAX_MESSAGE_CHUNK_SIZE) { if (estimatedTransactionSize > MAX_MESSAGE_CHUNK_SIZE) {
chunkFutures.add(insertChunk(shard, messagesForShard.subList(start, current))); chunkFutures.add(insertChunk(shardId, messagesForShard.subList(start, current)));
start = current; start = current;
estimatedTransactionSize = 0; estimatedTransactionSize = 0;
@@ -133,7 +131,7 @@ public class FoundationDbMessageStore {
} }
assert start < messagesForShard.size(); assert start < messagesForShard.size();
chunkFutures.add(insertChunk(shard, messagesForShard.subList(start, messagesForShard.size()))); chunkFutures.add(insertChunk(shardId, messagesForShard.subList(start, messagesForShard.size())));
}); });
return CompletableFuture.allOf(chunkFutures.toArray(CompletableFuture[]::new)) return CompletableFuture.allOf(chunkFutures.toArray(CompletableFuture[]::new))
@@ -146,7 +144,7 @@ public class FoundationDbMessageStore {
} }
private CompletableFuture<Map<AciServiceIdentifier, Map<Byte, InsertResult>>> insertChunk( private CompletableFuture<Map<AciServiceIdentifier, Map<Byte, InsertResult>>> insertChunk(
final Database database, final int shardId,
final List<Map.Entry<AciServiceIdentifier, Map<Byte, MessageProtos.Envelope>>> messagesByAccountIdentifier) { final List<Map.Entry<AciServiceIdentifier, Map<Byte, MessageProtos.Envelope>>> messagesByAccountIdentifier) {
final Map<AciServiceIdentifier, CompletableFuture<Map<Byte, Boolean>>> insertFuturesByAci = new HashMap<>(); final Map<AciServiceIdentifier, CompletableFuture<Map<Byte, Boolean>>> insertFuturesByAci = new HashMap<>();
@@ -158,7 +156,7 @@ public class FoundationDbMessageStore {
.map(MessageProtos.Envelope::getEphemeral) .map(MessageProtos.Envelope::getEphemeral)
.orElseThrow(() -> new IllegalStateException("One or more bundles is empty")); .orElseThrow(() -> new IllegalStateException("One or more bundles is empty"));
return database.runAsync(transaction -> { return databases[shardId].runAsync(transaction -> {
messagesByAccountIdentifier.forEach(entry -> messagesByAccountIdentifier.forEach(entry ->
insertFuturesByAci.put(entry.getKey(), insert(entry.getKey(), entry.getValue(), transaction))); insertFuturesByAci.put(entry.getKey(), insert(entry.getKey(), entry.getValue(), transaction)));
@@ -171,7 +169,7 @@ public class FoundationDbMessageStore {
.anyMatch(isPresent -> isPresent); .anyMatch(isPresent -> isPresent);
if (anyClientPresent || !ephemeral) { if (anyClientPresent || !ephemeral) {
return transaction.getVersionstamp() return transaction.getVersionstamp()
.thenApply(versionstampBytes -> Optional.of(Versionstamp.complete(versionstampBytes))); .thenApply(versionstampBytes -> Optional.of(Versionstamp.complete(versionstampBytes, shardId)));
} }
return CompletableFuture.completedFuture(Optional.<Versionstamp>empty()); return CompletableFuture.completedFuture(Optional.<Versionstamp>empty());
}); });
@@ -222,7 +220,7 @@ public class FoundationDbMessageStore {
if (isPresent || !message.getEphemeral()) { if (isPresent || !message.getEphemeral()) {
transaction.mutate(MutationType.SET_VERSIONSTAMPED_KEY, transaction.mutate(MutationType.SET_VERSIONSTAMPED_KEY,
getDeviceQueueSubspace(aci, deviceId) getDeviceQueueSubspace(aci, deviceId)
.packWithVersionstamp(Tuple.from(Versionstamp.incomplete())), message.toByteArray()); .packWithVersionstamp(Tuple.from(Versionstamp.incomplete(hashAciToShardNumber(aci)))), message.toByteArray());
} }
return isPresent; return isPresent;
@@ -241,7 +239,7 @@ public class FoundationDbMessageStore {
if (anyClientPresent) { if (anyClientPresent) {
transaction.mutate(MutationType.SET_VERSIONSTAMPED_VALUE, getMessagesAvailableWatchKey(aci), transaction.mutate(MutationType.SET_VERSIONSTAMPED_VALUE, getMessagesAvailableWatchKey(aci),
Tuple.from(Versionstamp.incomplete()).packWithVersionstamp()); Tuple.from(Versionstamp.incomplete(hashAciToShardNumber(aci))).packWithVersionstamp());
} }
return presenceByDeviceId; return presenceByDeviceId;

View File

@@ -74,23 +74,34 @@ class FoundationDbMessageStoreTest {
@ParameterizedTest @ParameterizedTest
@MethodSource @MethodSource
void insert(final long presenceUpdatedBeforeSeconds, final boolean ephemeral, final boolean expectMessagesInserted, void insert(final long presenceUpdatedBeforeSeconds,
final boolean expectVersionstampUpdated, final boolean expectPresenceState) { final boolean ephemeral,
final boolean expectMessagesInserted,
final boolean expectVersionstampUpdated,
final boolean expectPresenceState) {
final AciServiceIdentifier aci = new AciServiceIdentifier(UUID.randomUUID()); final AciServiceIdentifier aci = new AciServiceIdentifier(UUID.randomUUID());
final List<Byte> deviceIds = IntStream.range(Device.PRIMARY_ID, Device.PRIMARY_ID + 6) final List<Byte> deviceIds = IntStream.range(Device.PRIMARY_ID, Device.PRIMARY_ID + 6)
.mapToObj(i -> (byte) i) .mapToObj(i -> (byte) i)
.toList(); .toList();
deviceIds.forEach(deviceId -> writePresenceKey(aci, deviceId, 1, presenceUpdatedBeforeSeconds)); deviceIds.forEach(deviceId -> writePresenceKey(aci, deviceId, 1, presenceUpdatedBeforeSeconds));
final Map<Byte, MessageProtos.Envelope> messagesByDeviceId = deviceIds.stream() final Map<Byte, MessageProtos.Envelope> messagesByDeviceId = deviceIds.stream()
.collect(Collectors.toMap(Function.identity(), _ -> generateRandomMessage(ephemeral))); .collect(Collectors.toMap(Function.identity(), _ -> generateRandomMessage(ephemeral)));
final Map<Byte, FoundationDbMessageStore.InsertResult> result = foundationDbMessageStore.insert(aci, messagesByDeviceId).join();
assertNotNull(result); final Map<Byte, FoundationDbMessageStore.InsertResult> result =
foundationDbMessageStore.insert(aci, messagesByDeviceId).join();
assertTrue(result.keySet().containsAll(deviceIds));
final Optional<Versionstamp> returnedVersionstamp = result.values().stream().findFirst() final Optional<Versionstamp> returnedVersionstamp = result.values().stream().findFirst()
.flatMap(FoundationDbMessageStore.InsertResult::versionstamp); .flatMap(FoundationDbMessageStore.InsertResult::versionstamp);
if (expectMessagesInserted) { if (expectMessagesInserted) {
assertTrue(returnedVersionstamp.isPresent()); assertTrue(returnedVersionstamp.isPresent());
assertTrue(result.values().stream().allMatch(insertResult -> returnedVersionstamp.equals(insertResult.versionstamp()))); assertTrue(result.values().stream().allMatch(insertResult -> returnedVersionstamp.equals(insertResult.versionstamp())));
final Map<Byte, MessageProtos.Envelope> storedMessagesByDeviceId = deviceIds.stream() final Map<Byte, MessageProtos.Envelope> storedMessagesByDeviceId = deviceIds.stream()
.collect(Collectors.toMap(Function.identity(), deviceId -> { .collect(Collectors.toMap(Function.identity(), deviceId -> {
try { try {
@@ -109,7 +120,7 @@ class FoundationDbMessageStoreTest {
if (expectVersionstampUpdated) { if (expectVersionstampUpdated) {
final Optional<Versionstamp> messagesAvailableWatchVersionstamp = getMessagesAvailableWatch(aci); final Optional<Versionstamp> messagesAvailableWatchVersionstamp = getMessagesAvailableWatch(aci);
assertTrue(messagesAvailableWatchVersionstamp.isPresent()); assertTrue(messagesAvailableWatchVersionstamp.isPresent());
assertEquals(returnedVersionstamp, messagesAvailableWatchVersionstamp, assertEquals(messagesAvailableWatchVersionstamp, returnedVersionstamp,
"messages available versionstamp should be the versionstamp of the last insert transaction"); "messages available versionstamp should be the versionstamp of the last insert transaction");
} else { } else {
assertTrue(getMessagesAvailableWatch(aci).isEmpty()); assertTrue(getMessagesAvailableWatch(aci).isEmpty());