diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 6e7421478..2d755891f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -100,7 +100,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Disconn @VisibleForTesting static final int MESSAGE_SENDER_MAX_CONCURRENCY = 256; - private static final int DEFAULT_SEND_FUTURES_TIMEOUT_MILLIS = 5 * 60 * 1000; + static final Duration DEFAULT_SEND_FUTURES_TIMEOUT = Duration.ofMinutes(5); private static final Duration CLOSE_WITH_PENDING_MESSAGES_NOTIFICATION_DELAY = Duration.ofMinutes(1); @@ -118,7 +118,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Disconn private final Device authenticatedDevice; private final WebSocketClient client; - private final int sendFuturesTimeoutMillis; + private final Duration sendFuturesTimeout; private final Semaphore processStoredMessagesSemaphore = new Semaphore(1); private final AtomicReference storedMessageState = new AtomicReference<>( @@ -159,7 +159,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Disconn authenticatedAccount, authenticatedDevice, client, - DEFAULT_SEND_FUTURES_TIMEOUT_MILLIS, + DEFAULT_SEND_FUTURES_TIMEOUT, messageDeliveryScheduler, clientReleaseManager, messageDeliveryLoopMonitor, experimentEnrollmentManager); @@ -174,7 +174,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Disconn Account authenticatedAccount, Device authenticatedDevice, WebSocketClient client, - int sendFuturesTimeoutMillis, + Duration sendFuturesTimeout, Scheduler messageDeliveryScheduler, ClientReleaseManager clientReleaseManager, MessageDeliveryLoopMonitor messageDeliveryLoopMonitor, @@ -188,7 +188,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Disconn this.authenticatedAccount = authenticatedAccount; this.authenticatedDevice = authenticatedDevice; this.client = client; - this.sendFuturesTimeoutMillis = sendFuturesTimeoutMillis; + this.sendFuturesTimeout = sendFuturesTimeout; this.messageDeliveryScheduler = messageDeliveryScheduler; this.clientReleaseManager = clientReleaseManager; this.messageDeliveryLoopMonitor = messageDeliveryLoopMonitor; @@ -377,7 +377,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Disconn } }) .flatMapSequential(envelope -> - Mono.defer(() -> Mono.fromFuture(() -> sendMessage(envelope).orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS))) + Mono.defer(() -> Mono.fromFuture(() -> sendMessage(envelope)).timeout(sendFuturesTimeout)) .doOnError(this::measureSendMessageErrors) // Note that this will retry both for "send to client" timeouts and failures to delete messages on // acknowledgement diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java index d84fa5c99..a3f4e74fa 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java @@ -294,7 +294,7 @@ class WebSocketConnectionIntegrationTest { account, device, webSocketClient, - 1000, // use a short timeout, so that this test completes quickly + Duration.ofSeconds(1), // use a short timeout, so that this test completes quickly messageDeliveryScheduler, clientReleaseManager, mock(MessageDeliveryLoopMonitor.class),