Improve thread synchronization in RedisDynamoDbMessagePublisherTest

This commit is contained in:
Jon Chambers
2025-08-11 18:24:10 -04:00
committed by GitHub
parent 2e32ab3282
commit 085127326b

View File

@@ -193,11 +193,11 @@ class RedisDynamoDbMessagePublisherTest {
final RedisDynamoDbMessagePublisher messagePublisher =
new RedisDynamoDbMessagePublisher(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, DESTINATION_SERVICE_IDENTIFIER.uuid(), destinationDevice);
final CountDownLatch countDownLatch = new CountDownLatch(2);
final CountDownLatch queueEmptyCountDownLatch = new CountDownLatch(1);
Thread.ofVirtual().start(() -> {
try {
countDownLatch.await();
queueEmptyCountDownLatch.await();
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
@@ -210,7 +210,11 @@ class RedisDynamoDbMessagePublisherTest {
});
StepVerifier.create(JdkFlowAdapter.flowPublisherToFlux(messagePublisher)
.doOnNext(_ -> countDownLatch.countDown()))
.doOnNext(entry -> {
if (entry instanceof MessageStreamEntry.QueueEmpty) {
queueEmptyCountDownLatch.countDown();
}
}))
.expectNext(new MessageStreamEntry.Envelope(dynamoDbMessage))
.expectNext(new MessageStreamEntry.Envelope(redisMessage))
.expectNext(new MessageStreamEntry.QueueEmpty())
@@ -228,11 +232,11 @@ class RedisDynamoDbMessagePublisherTest {
final RedisDynamoDbMessagePublisher messagePublisher =
new RedisDynamoDbMessagePublisher(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, DESTINATION_SERVICE_IDENTIFIER.uuid(), destinationDevice);
final CountDownLatch countDownLatch = new CountDownLatch(2);
final CountDownLatch queueEmptyCountDownLatch = new CountDownLatch(1);
Thread.ofVirtual().start(() -> {
try {
countDownLatch.await();
queueEmptyCountDownLatch.await();
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
@@ -245,7 +249,11 @@ class RedisDynamoDbMessagePublisherTest {
});
StepVerifier.create(JdkFlowAdapter.flowPublisherToFlux(messagePublisher)
.doOnNext(_ -> countDownLatch.countDown()))
.doOnNext(entry -> {
if (entry instanceof MessageStreamEntry.QueueEmpty) {
queueEmptyCountDownLatch.countDown();
}
}))
.expectNext(new MessageStreamEntry.Envelope(dynamoDbMessage))
.expectNext(new MessageStreamEntry.Envelope(redisMessage))
.expectNext(new MessageStreamEntry.QueueEmpty())