From 33828439fbe95bcda3a1cf058ce4093058d41b4e Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Thu, 23 Feb 2023 17:00:04 -0500 Subject: [PATCH] Use the websocket for FCM fetches. --- app/src/main/AndroidManifest.xml | 1 + .../securesms/gcm/FcmFetchManager.kt | 4 +- .../securesms/gcm/FcmJobService.java | 4 +- .../jobs/PushNotificationReceiveJob.java | 4 +- .../messages/BackgroundMessageRetriever.java | 4 +- .../messages/IncomingMessageObserver.java | 90 ++++++++++--- .../messages/MessageRetrievalStrategy.java | 41 +----- .../securesms/messages/RestStrategy.java | 127 ------------------ .../securesms/messages/WebSocketStrategy.java | 102 ++++++++++++++ .../securesms/messages/WebsocketStrategy.java | 90 ------------- 10 files changed, 185 insertions(+), 282 deletions(-) delete mode 100644 app/src/main/java/org/thoughtcrime/securesms/messages/RestStrategy.java create mode 100644 app/src/main/java/org/thoughtcrime/securesms/messages/WebSocketStrategy.java delete mode 100644 app/src/main/java/org/thoughtcrime/securesms/messages/WebsocketStrategy.java diff --git a/app/src/main/AndroidManifest.xml b/app/src/main/AndroidManifest.xml index 563df5c59d..31b45f2783 100644 --- a/app/src/main/AndroidManifest.xml +++ b/app/src/main/AndroidManifest.xml @@ -699,6 +699,7 @@ + diff --git a/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmFetchManager.kt b/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmFetchManager.kt index 7a8fc9d4ce..7139d70033 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmFetchManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmFetchManager.kt @@ -8,7 +8,7 @@ import org.signal.core.util.logging.Log import org.thoughtcrime.securesms.dependencies.ApplicationDependencies import org.thoughtcrime.securesms.jobs.ForegroundServiceUtil import org.thoughtcrime.securesms.jobs.PushNotificationReceiveJob -import org.thoughtcrime.securesms.messages.RestStrategy +import org.thoughtcrime.securesms.messages.WebSocketStrategy import org.thoughtcrime.securesms.util.concurrent.SerialMonoLifoExecutor /** @@ -97,7 +97,7 @@ object FcmFetchManager { @JvmStatic fun retrieveMessages(context: Context) { - val success = ApplicationDependencies.getBackgroundMessageRetriever().retrieveMessages(context, RestStrategy(), RestStrategy()) + val success = ApplicationDependencies.getBackgroundMessageRetriever().retrieveMessages(context, WebSocketStrategy()) if (success) { Log.i(TAG, "Successfully retrieved messages.") diff --git a/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmJobService.java b/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmJobService.java index 2434be184d..fead61abdc 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmJobService.java +++ b/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmJobService.java @@ -13,7 +13,7 @@ import org.signal.core.util.concurrent.SignalExecutors; import org.signal.core.util.logging.Log; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever; -import org.thoughtcrime.securesms.messages.RestStrategy; +import org.thoughtcrime.securesms.messages.WebSocketStrategy; import org.thoughtcrime.securesms.util.ServiceUtil; import org.thoughtcrime.securesms.util.TextSecurePreferences; @@ -49,7 +49,7 @@ public class FcmJobService extends JobService { SignalExecutors.UNBOUNDED.execute(() -> { Context context = getApplicationContext(); BackgroundMessageRetriever retriever = ApplicationDependencies.getBackgroundMessageRetriever(); - boolean success = retriever.retrieveMessages(context, new RestStrategy(), new RestStrategy()); + boolean success = retriever.retrieveMessages(context, new WebSocketStrategy()); if (success) { Log.i(TAG, "Successfully retrieved messages."); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushNotificationReceiveJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushNotificationReceiveJob.java index 20a4a2a0ac..cd7b9a3315 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushNotificationReceiveJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushNotificationReceiveJob.java @@ -8,7 +8,7 @@ import org.thoughtcrime.securesms.jobmanager.Data; import org.thoughtcrime.securesms.jobmanager.Job; import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint; import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever; -import org.thoughtcrime.securesms.messages.RestStrategy; +import org.thoughtcrime.securesms.messages.WebSocketStrategy; import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException; import java.io.IOException; @@ -60,7 +60,7 @@ public final class PushNotificationReceiveJob extends BaseJob { @Override public void onRun() throws IOException { BackgroundMessageRetriever retriever = ApplicationDependencies.getBackgroundMessageRetriever(); - boolean result = retriever.retrieveMessages(context, foregroundServiceDelayMs, new RestStrategy()); + boolean result = retriever.retrieveMessages(context, foregroundServiceDelayMs, new WebSocketStrategy()); if (result) { Log.i(TAG, "Successfully pulled messages."); diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/BackgroundMessageRetriever.java b/app/src/main/java/org/thoughtcrime/securesms/messages/BackgroundMessageRetriever.java index e85b975be8..e4b1dc3ae5 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/BackgroundMessageRetriever.java +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/BackgroundMessageRetriever.java @@ -34,8 +34,6 @@ public class BackgroundMessageRetriever { private static final Semaphore ACTIVE_LOCK = new Semaphore(2); - private static final long NORMAL_TIMEOUT = TimeUnit.SECONDS.toMillis(10); - public static final long DO_NOT_SHOW_IN_FOREGROUND = DelayedNotificationController.DO_NOT_SHOW; /** @@ -109,7 +107,7 @@ public class BackgroundMessageRetriever { Log.i(TAG, "Attempting strategy: " + strategy.toString() + logSuffix(startTime)); - if (strategy.execute(NORMAL_TIMEOUT)) { + if (strategy.execute()) { Log.i(TAG, "Strategy succeeded: " + strategy.toString() + logSuffix(startTime)); success = true; break; diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java index afd8e9bc73..74e33704fc 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java @@ -26,7 +26,6 @@ import org.thoughtcrime.securesms.jobs.UnableToStartException; import org.thoughtcrime.securesms.keyvalue.SignalStore; import org.thoughtcrime.securesms.messages.IncomingMessageProcessor.Processor; import org.thoughtcrime.securesms.notifications.NotificationChannels; -import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess; import org.thoughtcrime.securesms.util.AppForegroundObserver; import org.thoughtcrime.securesms.util.Util; import org.whispersystems.signalservice.api.SignalWebSocket; @@ -36,6 +35,7 @@ import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableExcept import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; @@ -52,33 +52,35 @@ public class IncomingMessageObserver { private static final String TAG = Log.tag(IncomingMessageObserver.class); - public static final int FOREGROUND_ID = 313399; + public static final int FOREGROUND_ID = 313399; private static final long REQUEST_TIMEOUT_MINUTES = 1; private static final long OLD_REQUEST_WINDOW_MS = TimeUnit.MINUTES.toMillis(5); + private static final long MAX_BACKGROUND_TIME = TimeUnit.MINUTES.toMillis(5); private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger(0); private final Application context; - private final SignalServiceNetworkAccess networkAccess; private final List decryptionDrainedListeners; private final BroadcastReceiver connectionReceiver; private final Map keepAliveTokens; private boolean appVisible; + private long lastInteractionTime; private volatile boolean networkDrained; private volatile boolean decryptionDrained; private volatile boolean terminated; + public IncomingMessageObserver(@NonNull Application context) { if (INSTANCE_COUNT.incrementAndGet() != 1) { throw new AssertionError("Multiple observers!"); } this.context = context; - this.networkAccess = ApplicationDependencies.getSignalServiceNetworkAccess(); this.decryptionDrainedListeners = new CopyOnWriteArrayList<>(); this.keepAliveTokens = new HashMap<>(); + this.lastInteractionTime = System.currentTimeMillis(); new MessageRetrievalThread().start(); @@ -139,6 +141,13 @@ public class IncomingMessageObserver { return decryptionDrained; } + /** + * @return True if the websocket is active, otherwise false. + */ + public boolean isActive() { + return isConnectionNecessary(); + } + public void notifyDecryptionsDrained() { List listenersToTrigger = new ArrayList<>(decryptionDrainedListeners.size()); @@ -157,33 +166,42 @@ public class IncomingMessageObserver { private synchronized void onAppForegrounded() { appVisible = true; + context.startService(new Intent(context, BackgroundService.class)); notifyAll(); } private synchronized void onAppBackgrounded() { - appVisible = false; + appVisible = false; + lastInteractionTime = System.currentTimeMillis(); notifyAll(); } private synchronized boolean isConnectionNecessary() { - boolean registered = SignalStore.account().isRegistered(); - boolean fcmEnabled = SignalStore.account().isFcmEnabled(); - boolean hasNetwork = NetworkConstraint.isMet(context); - boolean hasProxy = SignalStore.proxy().isProxyEnabled(); - boolean forceWebsocket = SignalStore.internalValues().isWebsocketModeForced(); - long oldRequest = System.currentTimeMillis() - OLD_REQUEST_WINDOW_MS; + boolean registered = SignalStore.account().isRegistered(); + boolean fcmEnabled = SignalStore.account().isFcmEnabled(); + boolean hasNetwork = NetworkConstraint.isMet(context); + boolean hasProxy = SignalStore.proxy().isProxyEnabled(); + boolean forceWebsocket = SignalStore.internalValues().isWebsocketModeForced(); + long oldRequest = System.currentTimeMillis() - OLD_REQUEST_WINDOW_MS; + long timeIdle = appVisible ? 0 : System.currentTimeMillis() - lastInteractionTime; boolean removedRequests = keepAliveTokens.entrySet().removeIf(e -> e.getValue() < oldRequest); if (removedRequests) { Log.d(TAG, "Removed old keep web socket open requests."); } - Log.d(TAG, String.format("Network: %s, Foreground: %s, FCM: %s, Stay open requests: [%s], Censored: %s, Registered: %s, Proxy: %s, Force websocket: %s", - hasNetwork, appVisible, fcmEnabled, Util.join(keepAliveTokens.entrySet(), ","), networkAccess.isCensored(), registered, hasProxy, forceWebsocket)); + String lastInteractionString = appVisible ? "N/A" : timeIdle + " ms (" + (timeIdle < MAX_BACKGROUND_TIME ? "within limit" : "over limit") + ")"; - return registered && - (appVisible || !fcmEnabled || forceWebsocket || Util.hasItems(keepAliveTokens)) && - hasNetwork; + boolean conclusion = registered && + (appVisible || timeIdle < MAX_BACKGROUND_TIME || !fcmEnabled || Util.hasItems(keepAliveTokens)) && + hasNetwork; + + String needsConnectionString = conclusion ? "Needs Connection" : "Does Not Need Connection"; + + Log.d(TAG, String.format(Locale.US, "[" + needsConnectionString + "] Network: %s, Foreground: %s, Time Since Last Interaction: %s, FCM: %s, Stay open requests: [%s], Registered: %s, Proxy: %s, Force websocket: %s", + hasNetwork, appVisible, lastInteractionString, fcmEnabled, Util.join(keepAliveTokens.entrySet(), ","), registered, hasProxy, forceWebsocket)); + + return conclusion; } private synchronized void waitForConnectionNecessary() { @@ -212,11 +230,13 @@ public class IncomingMessageObserver { public synchronized void registerKeepAliveToken(String key) { keepAliveTokens.put(key, System.currentTimeMillis()); + lastInteractionTime = System.currentTimeMillis(); notifyAll(); } public synchronized void removeKeepAliveToken(String key) { keepAliveTokens.remove(key); + lastInteractionTime = System.currentTimeMillis(); notifyAll(); } @@ -270,6 +290,10 @@ public class IncomingMessageObserver { attempts = 0; } } + + if (!appVisible) { + BackgroundService.stop(context); + } } catch (Throwable e) { attempts++; Log.w(TAG, e); @@ -313,4 +337,38 @@ public class IncomingMessageObserver { return Service.START_STICKY; } } + + /** + * A service that exists just to encourage the system to keep our process alive a little longer. + */ + public static class BackgroundService extends Service { + + public static void start(Context context) { + try { + context.startService(new Intent(context, BackgroundService.class)); + } catch (Exception e) { + Log.w(TAG, "Failed to start background service.", e); + } + } + + public static void stop(Context context) { + context.stopService(new Intent(context, BackgroundService.class)); + } + + @Override + public @Nullable IBinder onBind(Intent intent) { + return null; + } + + @Override + public int onStartCommand(Intent intent, int flags, int startId) { + Log.d(TAG, "Background service started."); + return START_STICKY; + } + + @Override + public void onDestroy() { + Log.d(TAG, "Background service destroyed."); + } + } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/MessageRetrievalStrategy.java b/app/src/main/java/org/thoughtcrime/securesms/messages/MessageRetrievalStrategy.java index 2e93ad8538..45085dc169 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/MessageRetrievalStrategy.java +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/MessageRetrievalStrategy.java @@ -20,53 +20,14 @@ import java.util.Set; */ public abstract class MessageRetrievalStrategy { - private volatile boolean canceled; - /** * Fetches and processes any pending messages. This method should block until the messages are * actually stored and processed -- not just retrieved. * - * @param timeout Hint for how long this will run. The strategy will also be canceled after the - * timeout ends, but having the timeout available may be useful for setting things - * like socket timeouts. - * * @return True if everything was successful up until cancelation, false otherwise. */ @WorkerThread - abstract boolean execute(long timeout); - - /** - * Marks the strategy as canceled. It is the responsibility of the implementation of - * {@link #execute(long)} to check {@link #isCanceled()} to know if execution should stop. - */ - void cancel() { - this.canceled = true; - } - - protected boolean isCanceled() { - return canceled; - } - - protected static void blockUntilQueueDrained(@NonNull String tag, @NonNull String queue, long timeoutMs) { - long startTime = System.currentTimeMillis(); - final JobManager jobManager = ApplicationDependencies.getJobManager(); - final MarkerJob markerJob = new MarkerJob(queue); - - Optional jobState = jobManager.runSynchronously(markerJob, timeoutMs); - - if (!jobState.isPresent()) { - Log.w(tag, "Timed out waiting for " + queue + " job(s) to finish!"); - } - - long endTime = System.currentTimeMillis(); - long duration = endTime - startTime; - - Log.d(tag, "Waited " + duration + " ms for the " + queue + " job(s) to finish."); - } - - protected static String timeSuffix(long startTime) { - return " (" + (System.currentTimeMillis() - startTime) + " ms elapsed)"; - } + abstract boolean execute(); protected static class QueueFindingJobListener implements JobTracker.JobListener { private final Set queues = new HashSet<>(); diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/RestStrategy.java b/app/src/main/java/org/thoughtcrime/securesms/messages/RestStrategy.java deleted file mode 100644 index 772dc7ac78..0000000000 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/RestStrategy.java +++ /dev/null @@ -1,127 +0,0 @@ -package org.thoughtcrime.securesms.messages; - -import androidx.annotation.NonNull; -import androidx.annotation.WorkerThread; - -import org.signal.core.util.logging.Log; -import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; -import org.thoughtcrime.securesms.jobmanager.JobManager; -import org.thoughtcrime.securesms.jobmanager.JobTracker; -import org.thoughtcrime.securesms.jobs.MarkerJob; -import org.thoughtcrime.securesms.jobs.PushDecryptMessageJob; -import org.thoughtcrime.securesms.jobs.PushProcessMessageJob; -import org.thoughtcrime.securesms.keyvalue.SignalStore; -import org.thoughtcrime.securesms.stories.Stories; -import org.thoughtcrime.securesms.util.TextSecurePreferences; -import org.whispersystems.signalservice.api.SignalServiceMessageReceiver; -import org.whispersystems.signalservice.api.push.exceptions.AuthorizationFailedException; - -import java.io.IOException; -import java.util.Iterator; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Retrieves messages over the REST endpoint. - */ -public class RestStrategy extends MessageRetrievalStrategy { - - private static final String TAG = Log.tag(RestStrategy.class); - - @WorkerThread - @Override - public boolean execute(long timeout) { - long startTime = System.currentTimeMillis(); - JobManager jobManager = ApplicationDependencies.getJobManager(); - QueueFindingJobListener queueListener = new QueueFindingJobListener(); - - try (IncomingMessageProcessor.Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) { - jobManager.addListener(job -> job.getParameters().getQueue() != null && job.getParameters().getQueue().startsWith(PushProcessMessageJob.QUEUE_PREFIX), queueListener); - - int jobCount = enqueuePushDecryptJobs(processor, startTime, timeout); - - if (jobCount == 0) { - Log.d(TAG, "No PushDecryptMessageJobs were enqueued."); - return true; - } else { - Log.d(TAG, jobCount + " PushDecryptMessageJob(s) were enqueued."); - } - - long timeRemainingMs = blockUntilQueueDrained(PushDecryptMessageJob.QUEUE, TimeUnit.SECONDS.toMillis(10)); - Set processQueues = queueListener.getQueues(); - - Log.d(TAG, "Discovered " + processQueues.size() + " queue(s): " + processQueues); - - if (timeRemainingMs > 0) { - Iterator iter = processQueues.iterator(); - - while (iter.hasNext() && timeRemainingMs > 0) { - timeRemainingMs = blockUntilQueueDrained(iter.next(), timeRemainingMs); - } - - if (timeRemainingMs <= 0) { - Log.w(TAG, "Ran out of time while waiting for queues to drain."); - } - } else { - Log.w(TAG, "Ran out of time before we could even wait on individual queues!"); - } - - return true; - } catch (IOException e) { - Log.w(TAG, "Failed to retrieve messages. Resetting the SignalServiceMessageReceiver.", e); - ApplicationDependencies.resetSignalServiceMessageReceiver(); - if (e instanceof AuthorizationFailedException && SignalStore.account().isRegistered() && SignalStore.account().getAci() != null) { - TextSecurePreferences.setUnauthorizedReceived(ApplicationDependencies.getApplication(), true); - } - return false; - } finally { - jobManager.removeListener(queueListener); - } - } - - private static int enqueuePushDecryptJobs(IncomingMessageProcessor.Processor processor, long startTime, long timeout) - throws IOException - { - SignalServiceMessageReceiver receiver = ApplicationDependencies.getSignalServiceMessageReceiver(); - AtomicInteger jobCount = new AtomicInteger(0); - - receiver.setSoTimeoutMillis(timeout); - - receiver.retrieveMessages(Stories.isFeatureEnabled(), envelope -> { - Log.i(TAG, "Retrieved an envelope." + timeSuffix(startTime)); - String jobId = processor.processEnvelope(envelope); - - if (jobId != null) { - jobCount.incrementAndGet(); - } - Log.i(TAG, "Successfully processed an envelope." + timeSuffix(startTime)); - }); - - return jobCount.get(); - } - - private static long blockUntilQueueDrained(@NonNull String queue, long timeoutMs) { - long startTime = System.currentTimeMillis(); - final JobManager jobManager = ApplicationDependencies.getJobManager(); - final MarkerJob markerJob = new MarkerJob(queue); - - Optional jobState = jobManager.runSynchronously(markerJob, timeoutMs); - - if (!jobState.isPresent()) { - Log.w(TAG, "Timed out waiting for " + queue + " job(s) to finish!"); - } - - long endTime = System.currentTimeMillis(); - long duration = endTime - startTime; - - Log.d(TAG, "Waited " + duration + " ms for the " + queue + " job(s) to finish."); - return timeoutMs - duration; - } - - @Override - public @NonNull String toString() { - return Log.tag(RestStrategy.class); - } -} diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/WebSocketStrategy.java b/app/src/main/java/org/thoughtcrime/securesms/messages/WebSocketStrategy.java new file mode 100644 index 0000000000..cb4b67e318 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/WebSocketStrategy.java @@ -0,0 +1,102 @@ +package org.thoughtcrime.securesms.messages; + +import androidx.annotation.NonNull; +import androidx.annotation.WorkerThread; + +import org.signal.core.util.Stopwatch; +import org.signal.core.util.logging.Log; +import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; +import org.thoughtcrime.securesms.jobmanager.JobManager; +import org.thoughtcrime.securesms.jobmanager.JobTracker; +import org.thoughtcrime.securesms.jobs.MarkerJob; +import org.thoughtcrime.securesms.jobs.PushDecryptMessageJob; +import org.thoughtcrime.securesms.jobs.PushProcessMessageJob; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Retrieves messages over the websocket. + */ +public class WebSocketStrategy extends MessageRetrievalStrategy { + + private static final String TAG = Log.tag(WebSocketStrategy.class); + + private static final String KEEP_ALIVE_TOKEN = "WebsocketStrategy"; + private static final long QUEUE_TIMEOUT = TimeUnit.SECONDS.toMillis(30); + + @WorkerThread + @Override + public boolean execute() { + Stopwatch stopwatch = new Stopwatch("websocket-strategy"); + IncomingMessageObserver observer = ApplicationDependencies.getIncomingMessageObserver(); + + observer.registerKeepAliveToken(KEEP_ALIVE_TOKEN); + try { + JobManager jobManager = ApplicationDependencies.getJobManager(); + QueueFindingJobListener queueListener = new QueueFindingJobListener(); + + jobManager.addListener(job -> job.getParameters().getQueue() != null && job.getParameters().getQueue().startsWith(PushProcessMessageJob.QUEUE_PREFIX), queueListener); + + blockUntilWebsocketDrained(observer); + stopwatch.split("decryptions-drained"); + + Set processQueues = queueListener.getQueues(); + Log.d(TAG, "Discovered " + processQueues.size() + " queue(s): " + processQueues); + + for (String queue : processQueues) { + blockUntilJobQueueDrained(queue, QUEUE_TIMEOUT); + } + + stopwatch.split("process-drained"); + stopwatch.stop(TAG); + + return true; + } finally { + ApplicationDependencies.getIncomingMessageObserver().removeKeepAliveToken(KEEP_ALIVE_TOKEN); + } + } + + private static void blockUntilWebsocketDrained(IncomingMessageObserver observer) { + CountDownLatch latch = new CountDownLatch(1); + + observer.addDecryptionDrainedListener(new Runnable() { + @Override public void run() { + latch.countDown(); + observer.removeDecryptionDrainedListener(this); + } + }); + + try { + if (!latch.await(1, TimeUnit.MINUTES)) { + Log.w(TAG, "Hit timeout while waiting for decryptions to drain!"); + } + } catch (InterruptedException e) { + Log.w(TAG, "Interrupted!", e); + } + } + + private static void blockUntilJobQueueDrained(@NonNull String queue, long timeoutMs) { + long startTime = System.currentTimeMillis(); + final JobManager jobManager = ApplicationDependencies.getJobManager(); + final MarkerJob markerJob = new MarkerJob(queue); + + Optional jobState = jobManager.runSynchronously(markerJob, timeoutMs); + + if (!jobState.isPresent()) { + Log.w(TAG, "Timed out waiting for " + queue + " job(s) to finish!"); + } + + long endTime = System.currentTimeMillis(); + long duration = endTime - startTime; + + Log.d(TAG, "Waited " + duration + " ms for the " + queue + " job(s) to finish."); + } + + @Override + public @NonNull String toString() { + return Log.tag(WebSocketStrategy.class); + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/WebsocketStrategy.java b/app/src/main/java/org/thoughtcrime/securesms/messages/WebsocketStrategy.java deleted file mode 100644 index 88c94af22c..0000000000 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/WebsocketStrategy.java +++ /dev/null @@ -1,90 +0,0 @@ -package org.thoughtcrime.securesms.messages; - -import androidx.annotation.NonNull; - -import org.signal.core.util.logging.Log; -import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; -import org.thoughtcrime.securesms.jobmanager.JobManager; -import org.thoughtcrime.securesms.jobs.PushProcessMessageJob; -import org.whispersystems.signalservice.api.SignalWebSocket; -import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; - -import java.io.IOException; -import java.util.Iterator; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeoutException; - -class WebsocketStrategy extends MessageRetrievalStrategy { - - private static final String TAG = Log.tag(WebsocketStrategy.class); - - private final SignalWebSocket signalWebSocket; - private final JobManager jobManager; - - public WebsocketStrategy() { - this.signalWebSocket = ApplicationDependencies.getSignalWebSocket(); - this.jobManager = ApplicationDependencies.getJobManager(); - } - - @Override - public boolean execute(long timeout) { - long startTime = System.currentTimeMillis(); - - try { - Set processJobQueues = drainWebsocket(timeout, startTime); - Iterator queueIterator = processJobQueues.iterator(); - long timeRemaining = Math.max(0, timeout - (System.currentTimeMillis() - startTime)); - - while (!isCanceled() && queueIterator.hasNext() && timeRemaining > 0) { - String queue = queueIterator.next(); - - blockUntilQueueDrained(TAG, queue, timeRemaining); - - timeRemaining = Math.max(0, timeout - (System.currentTimeMillis() - startTime)); - } - - return true; - } catch (IOException e) { - Log.w(TAG, "Encountered an exception while draining the websocket.", e); - return false; - } - } - - private @NonNull Set drainWebsocket(long timeout, long startTime) throws IOException { - QueueFindingJobListener queueListener = new QueueFindingJobListener(); - - jobManager.addListener(job -> job.getParameters().getQueue() != null && job.getParameters().getQueue().startsWith(PushProcessMessageJob.QUEUE_PREFIX), queueListener); - - try { - signalWebSocket.connect(); - while (shouldContinue()) { - try { - Optional result = signalWebSocket.readOrEmpty(timeout, envelope -> { - Log.i(TAG, "Retrieved envelope! " + envelope.getTimestamp() + timeSuffix(startTime)); - try (IncomingMessageProcessor.Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) { - processor.processEnvelope(envelope); - } - }); - - if (!result.isPresent()) { - Log.i(TAG, "Hit an empty response. Finished." + timeSuffix(startTime)); - break; - } - } catch (TimeoutException e) { - Log.w(TAG, "Websocket timeout." + timeSuffix(startTime)); - } - } - } finally { - signalWebSocket.disconnect(); - jobManager.removeListener(queueListener); - } - - return queueListener.getQueues(); - } - - - private boolean shouldContinue() { - return !isCanceled(); - } -}