diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/WebSocketStrategy.java b/app/src/main/java/org/thoughtcrime/securesms/messages/WebSocketStrategy.java index 58b1e3f942..b2e3b1e28c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/WebSocketStrategy.java +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/WebSocketStrategy.java @@ -39,14 +39,19 @@ public class WebSocketStrategy extends MessageRetrievalStrategy { jobManager.addListener(job -> job.getParameters().getQueue() != null && job.getParameters().getQueue().startsWith(PushProcessMessageJob.QUEUE_PREFIX), queueListener); - blockUntilWebsocketDrained(observer); + if (!blockUntilWebsocketDrained(observer)) { + return false; + } + stopwatch.split("decryptions-drained"); Set processQueues = queueListener.getQueues(); Log.d(TAG, "Discovered " + processQueues.size() + " queue(s): " + processQueues); for (String queue : processQueues) { - blockUntilJobQueueDrained(queue, QUEUE_TIMEOUT); + if (!blockUntilJobQueueDrained(queue, QUEUE_TIMEOUT)) { + return false; + } } stopwatch.split("process-drained"); @@ -58,7 +63,7 @@ public class WebSocketStrategy extends MessageRetrievalStrategy { } } - private static void blockUntilWebsocketDrained(IncomingMessageObserver observer) { + private static boolean blockUntilWebsocketDrained(IncomingMessageObserver observer) { CountDownLatch latch = new CountDownLatch(1); observer.addDecryptionDrainedListener(new Runnable() { @@ -69,15 +74,19 @@ public class WebSocketStrategy extends MessageRetrievalStrategy { }); try { - if (!latch.await(1, TimeUnit.MINUTES)) { + if (latch.await(1, TimeUnit.MINUTES)) { + return true; + } else { Log.w(TAG, "Hit timeout while waiting for decryptions to drain!"); + return false; } } catch (InterruptedException e) { Log.w(TAG, "Interrupted!", e); + return false; } } - private static void blockUntilJobQueueDrained(@NonNull String queue, long timeoutMs) { + private static boolean blockUntilJobQueueDrained(@NonNull String queue, long timeoutMs) { long startTime = System.currentTimeMillis(); final JobManager jobManager = ApplicationDependencies.getJobManager(); final MarkerJob markerJob = new MarkerJob(queue); @@ -86,12 +95,14 @@ public class WebSocketStrategy extends MessageRetrievalStrategy { if (!jobState.isPresent()) { Log.w(TAG, "Timed out waiting for " + queue + " job(s) to finish!"); + return false; } long endTime = System.currentTimeMillis(); long duration = endTime - startTime; Log.d(TAG, "Waited " + duration + " ms for the " + queue + " job(s) to finish."); + return true; } @Override