diff --git a/app/src/main/AndroidManifest.xml b/app/src/main/AndroidManifest.xml
index 563df5c59d..31b45f2783 100644
--- a/app/src/main/AndroidManifest.xml
+++ b/app/src/main/AndroidManifest.xml
@@ -699,6 +699,7 @@
+
diff --git a/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmFetchManager.kt b/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmFetchManager.kt
index 7a8fc9d4ce..7139d70033 100644
--- a/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmFetchManager.kt
+++ b/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmFetchManager.kt
@@ -8,7 +8,7 @@ import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
import org.thoughtcrime.securesms.jobs.ForegroundServiceUtil
import org.thoughtcrime.securesms.jobs.PushNotificationReceiveJob
-import org.thoughtcrime.securesms.messages.RestStrategy
+import org.thoughtcrime.securesms.messages.WebSocketStrategy
import org.thoughtcrime.securesms.util.concurrent.SerialMonoLifoExecutor
/**
@@ -97,7 +97,7 @@ object FcmFetchManager {
@JvmStatic
fun retrieveMessages(context: Context) {
- val success = ApplicationDependencies.getBackgroundMessageRetriever().retrieveMessages(context, RestStrategy(), RestStrategy())
+ val success = ApplicationDependencies.getBackgroundMessageRetriever().retrieveMessages(context, WebSocketStrategy())
if (success) {
Log.i(TAG, "Successfully retrieved messages.")
diff --git a/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmJobService.java b/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmJobService.java
index 2434be184d..fead61abdc 100644
--- a/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmJobService.java
+++ b/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmJobService.java
@@ -13,7 +13,7 @@ import org.signal.core.util.concurrent.SignalExecutors;
import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever;
-import org.thoughtcrime.securesms.messages.RestStrategy;
+import org.thoughtcrime.securesms.messages.WebSocketStrategy;
import org.thoughtcrime.securesms.util.ServiceUtil;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
@@ -49,7 +49,7 @@ public class FcmJobService extends JobService {
SignalExecutors.UNBOUNDED.execute(() -> {
Context context = getApplicationContext();
BackgroundMessageRetriever retriever = ApplicationDependencies.getBackgroundMessageRetriever();
- boolean success = retriever.retrieveMessages(context, new RestStrategy(), new RestStrategy());
+ boolean success = retriever.retrieveMessages(context, new WebSocketStrategy());
if (success) {
Log.i(TAG, "Successfully retrieved messages.");
diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushNotificationReceiveJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushNotificationReceiveJob.java
index 20a4a2a0ac..cd7b9a3315 100644
--- a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushNotificationReceiveJob.java
+++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushNotificationReceiveJob.java
@@ -8,7 +8,7 @@ import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever;
-import org.thoughtcrime.securesms.messages.RestStrategy;
+import org.thoughtcrime.securesms.messages.WebSocketStrategy;
import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException;
import java.io.IOException;
@@ -60,7 +60,7 @@ public final class PushNotificationReceiveJob extends BaseJob {
@Override
public void onRun() throws IOException {
BackgroundMessageRetriever retriever = ApplicationDependencies.getBackgroundMessageRetriever();
- boolean result = retriever.retrieveMessages(context, foregroundServiceDelayMs, new RestStrategy());
+ boolean result = retriever.retrieveMessages(context, foregroundServiceDelayMs, new WebSocketStrategy());
if (result) {
Log.i(TAG, "Successfully pulled messages.");
diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/BackgroundMessageRetriever.java b/app/src/main/java/org/thoughtcrime/securesms/messages/BackgroundMessageRetriever.java
index e85b975be8..e4b1dc3ae5 100644
--- a/app/src/main/java/org/thoughtcrime/securesms/messages/BackgroundMessageRetriever.java
+++ b/app/src/main/java/org/thoughtcrime/securesms/messages/BackgroundMessageRetriever.java
@@ -34,8 +34,6 @@ public class BackgroundMessageRetriever {
private static final Semaphore ACTIVE_LOCK = new Semaphore(2);
- private static final long NORMAL_TIMEOUT = TimeUnit.SECONDS.toMillis(10);
-
public static final long DO_NOT_SHOW_IN_FOREGROUND = DelayedNotificationController.DO_NOT_SHOW;
/**
@@ -109,7 +107,7 @@ public class BackgroundMessageRetriever {
Log.i(TAG, "Attempting strategy: " + strategy.toString() + logSuffix(startTime));
- if (strategy.execute(NORMAL_TIMEOUT)) {
+ if (strategy.execute()) {
Log.i(TAG, "Strategy succeeded: " + strategy.toString() + logSuffix(startTime));
success = true;
break;
diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java
index afd8e9bc73..74e33704fc 100644
--- a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java
+++ b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java
@@ -26,7 +26,6 @@ import org.thoughtcrime.securesms.jobs.UnableToStartException;
import org.thoughtcrime.securesms.keyvalue.SignalStore;
import org.thoughtcrime.securesms.messages.IncomingMessageProcessor.Processor;
import org.thoughtcrime.securesms.notifications.NotificationChannels;
-import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess;
import org.thoughtcrime.securesms.util.AppForegroundObserver;
import org.thoughtcrime.securesms.util.Util;
import org.whispersystems.signalservice.api.SignalWebSocket;
@@ -36,6 +35,7 @@ import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableExcept
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -52,33 +52,35 @@ public class IncomingMessageObserver {
private static final String TAG = Log.tag(IncomingMessageObserver.class);
- public static final int FOREGROUND_ID = 313399;
+ public static final int FOREGROUND_ID = 313399;
private static final long REQUEST_TIMEOUT_MINUTES = 1;
private static final long OLD_REQUEST_WINDOW_MS = TimeUnit.MINUTES.toMillis(5);
+ private static final long MAX_BACKGROUND_TIME = TimeUnit.MINUTES.toMillis(5);
private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger(0);
private final Application context;
- private final SignalServiceNetworkAccess networkAccess;
private final List decryptionDrainedListeners;
private final BroadcastReceiver connectionReceiver;
private final Map keepAliveTokens;
private boolean appVisible;
+ private long lastInteractionTime;
private volatile boolean networkDrained;
private volatile boolean decryptionDrained;
private volatile boolean terminated;
+
public IncomingMessageObserver(@NonNull Application context) {
if (INSTANCE_COUNT.incrementAndGet() != 1) {
throw new AssertionError("Multiple observers!");
}
this.context = context;
- this.networkAccess = ApplicationDependencies.getSignalServiceNetworkAccess();
this.decryptionDrainedListeners = new CopyOnWriteArrayList<>();
this.keepAliveTokens = new HashMap<>();
+ this.lastInteractionTime = System.currentTimeMillis();
new MessageRetrievalThread().start();
@@ -139,6 +141,13 @@ public class IncomingMessageObserver {
return decryptionDrained;
}
+ /**
+ * @return True if the websocket is active, otherwise false.
+ */
+ public boolean isActive() {
+ return isConnectionNecessary();
+ }
+
public void notifyDecryptionsDrained() {
List listenersToTrigger = new ArrayList<>(decryptionDrainedListeners.size());
@@ -157,33 +166,42 @@ public class IncomingMessageObserver {
private synchronized void onAppForegrounded() {
appVisible = true;
+ context.startService(new Intent(context, BackgroundService.class));
notifyAll();
}
private synchronized void onAppBackgrounded() {
- appVisible = false;
+ appVisible = false;
+ lastInteractionTime = System.currentTimeMillis();
notifyAll();
}
private synchronized boolean isConnectionNecessary() {
- boolean registered = SignalStore.account().isRegistered();
- boolean fcmEnabled = SignalStore.account().isFcmEnabled();
- boolean hasNetwork = NetworkConstraint.isMet(context);
- boolean hasProxy = SignalStore.proxy().isProxyEnabled();
- boolean forceWebsocket = SignalStore.internalValues().isWebsocketModeForced();
- long oldRequest = System.currentTimeMillis() - OLD_REQUEST_WINDOW_MS;
+ boolean registered = SignalStore.account().isRegistered();
+ boolean fcmEnabled = SignalStore.account().isFcmEnabled();
+ boolean hasNetwork = NetworkConstraint.isMet(context);
+ boolean hasProxy = SignalStore.proxy().isProxyEnabled();
+ boolean forceWebsocket = SignalStore.internalValues().isWebsocketModeForced();
+ long oldRequest = System.currentTimeMillis() - OLD_REQUEST_WINDOW_MS;
+ long timeIdle = appVisible ? 0 : System.currentTimeMillis() - lastInteractionTime;
boolean removedRequests = keepAliveTokens.entrySet().removeIf(e -> e.getValue() < oldRequest);
if (removedRequests) {
Log.d(TAG, "Removed old keep web socket open requests.");
}
- Log.d(TAG, String.format("Network: %s, Foreground: %s, FCM: %s, Stay open requests: [%s], Censored: %s, Registered: %s, Proxy: %s, Force websocket: %s",
- hasNetwork, appVisible, fcmEnabled, Util.join(keepAliveTokens.entrySet(), ","), networkAccess.isCensored(), registered, hasProxy, forceWebsocket));
+ String lastInteractionString = appVisible ? "N/A" : timeIdle + " ms (" + (timeIdle < MAX_BACKGROUND_TIME ? "within limit" : "over limit") + ")";
- return registered &&
- (appVisible || !fcmEnabled || forceWebsocket || Util.hasItems(keepAliveTokens)) &&
- hasNetwork;
+ boolean conclusion = registered &&
+ (appVisible || timeIdle < MAX_BACKGROUND_TIME || !fcmEnabled || Util.hasItems(keepAliveTokens)) &&
+ hasNetwork;
+
+ String needsConnectionString = conclusion ? "Needs Connection" : "Does Not Need Connection";
+
+ Log.d(TAG, String.format(Locale.US, "[" + needsConnectionString + "] Network: %s, Foreground: %s, Time Since Last Interaction: %s, FCM: %s, Stay open requests: [%s], Registered: %s, Proxy: %s, Force websocket: %s",
+ hasNetwork, appVisible, lastInteractionString, fcmEnabled, Util.join(keepAliveTokens.entrySet(), ","), registered, hasProxy, forceWebsocket));
+
+ return conclusion;
}
private synchronized void waitForConnectionNecessary() {
@@ -212,11 +230,13 @@ public class IncomingMessageObserver {
public synchronized void registerKeepAliveToken(String key) {
keepAliveTokens.put(key, System.currentTimeMillis());
+ lastInteractionTime = System.currentTimeMillis();
notifyAll();
}
public synchronized void removeKeepAliveToken(String key) {
keepAliveTokens.remove(key);
+ lastInteractionTime = System.currentTimeMillis();
notifyAll();
}
@@ -270,6 +290,10 @@ public class IncomingMessageObserver {
attempts = 0;
}
}
+
+ if (!appVisible) {
+ BackgroundService.stop(context);
+ }
} catch (Throwable e) {
attempts++;
Log.w(TAG, e);
@@ -313,4 +337,38 @@ public class IncomingMessageObserver {
return Service.START_STICKY;
}
}
+
+ /**
+ * A service that exists just to encourage the system to keep our process alive a little longer.
+ */
+ public static class BackgroundService extends Service {
+
+ public static void start(Context context) {
+ try {
+ context.startService(new Intent(context, BackgroundService.class));
+ } catch (Exception e) {
+ Log.w(TAG, "Failed to start background service.", e);
+ }
+ }
+
+ public static void stop(Context context) {
+ context.stopService(new Intent(context, BackgroundService.class));
+ }
+
+ @Override
+ public @Nullable IBinder onBind(Intent intent) {
+ return null;
+ }
+
+ @Override
+ public int onStartCommand(Intent intent, int flags, int startId) {
+ Log.d(TAG, "Background service started.");
+ return START_STICKY;
+ }
+
+ @Override
+ public void onDestroy() {
+ Log.d(TAG, "Background service destroyed.");
+ }
+ }
}
diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/MessageRetrievalStrategy.java b/app/src/main/java/org/thoughtcrime/securesms/messages/MessageRetrievalStrategy.java
index 2e93ad8538..45085dc169 100644
--- a/app/src/main/java/org/thoughtcrime/securesms/messages/MessageRetrievalStrategy.java
+++ b/app/src/main/java/org/thoughtcrime/securesms/messages/MessageRetrievalStrategy.java
@@ -20,53 +20,14 @@ import java.util.Set;
*/
public abstract class MessageRetrievalStrategy {
- private volatile boolean canceled;
-
/**
* Fetches and processes any pending messages. This method should block until the messages are
* actually stored and processed -- not just retrieved.
*
- * @param timeout Hint for how long this will run. The strategy will also be canceled after the
- * timeout ends, but having the timeout available may be useful for setting things
- * like socket timeouts.
- *
* @return True if everything was successful up until cancelation, false otherwise.
*/
@WorkerThread
- abstract boolean execute(long timeout);
-
- /**
- * Marks the strategy as canceled. It is the responsibility of the implementation of
- * {@link #execute(long)} to check {@link #isCanceled()} to know if execution should stop.
- */
- void cancel() {
- this.canceled = true;
- }
-
- protected boolean isCanceled() {
- return canceled;
- }
-
- protected static void blockUntilQueueDrained(@NonNull String tag, @NonNull String queue, long timeoutMs) {
- long startTime = System.currentTimeMillis();
- final JobManager jobManager = ApplicationDependencies.getJobManager();
- final MarkerJob markerJob = new MarkerJob(queue);
-
- Optional jobState = jobManager.runSynchronously(markerJob, timeoutMs);
-
- if (!jobState.isPresent()) {
- Log.w(tag, "Timed out waiting for " + queue + " job(s) to finish!");
- }
-
- long endTime = System.currentTimeMillis();
- long duration = endTime - startTime;
-
- Log.d(tag, "Waited " + duration + " ms for the " + queue + " job(s) to finish.");
- }
-
- protected static String timeSuffix(long startTime) {
- return " (" + (System.currentTimeMillis() - startTime) + " ms elapsed)";
- }
+ abstract boolean execute();
protected static class QueueFindingJobListener implements JobTracker.JobListener {
private final Set queues = new HashSet<>();
diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/RestStrategy.java b/app/src/main/java/org/thoughtcrime/securesms/messages/RestStrategy.java
deleted file mode 100644
index 772dc7ac78..0000000000
--- a/app/src/main/java/org/thoughtcrime/securesms/messages/RestStrategy.java
+++ /dev/null
@@ -1,127 +0,0 @@
-package org.thoughtcrime.securesms.messages;
-
-import androidx.annotation.NonNull;
-import androidx.annotation.WorkerThread;
-
-import org.signal.core.util.logging.Log;
-import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
-import org.thoughtcrime.securesms.jobmanager.JobManager;
-import org.thoughtcrime.securesms.jobmanager.JobTracker;
-import org.thoughtcrime.securesms.jobs.MarkerJob;
-import org.thoughtcrime.securesms.jobs.PushDecryptMessageJob;
-import org.thoughtcrime.securesms.jobs.PushProcessMessageJob;
-import org.thoughtcrime.securesms.keyvalue.SignalStore;
-import org.thoughtcrime.securesms.stories.Stories;
-import org.thoughtcrime.securesms.util.TextSecurePreferences;
-import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
-import org.whispersystems.signalservice.api.push.exceptions.AuthorizationFailedException;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Retrieves messages over the REST endpoint.
- */
-public class RestStrategy extends MessageRetrievalStrategy {
-
- private static final String TAG = Log.tag(RestStrategy.class);
-
- @WorkerThread
- @Override
- public boolean execute(long timeout) {
- long startTime = System.currentTimeMillis();
- JobManager jobManager = ApplicationDependencies.getJobManager();
- QueueFindingJobListener queueListener = new QueueFindingJobListener();
-
- try (IncomingMessageProcessor.Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) {
- jobManager.addListener(job -> job.getParameters().getQueue() != null && job.getParameters().getQueue().startsWith(PushProcessMessageJob.QUEUE_PREFIX), queueListener);
-
- int jobCount = enqueuePushDecryptJobs(processor, startTime, timeout);
-
- if (jobCount == 0) {
- Log.d(TAG, "No PushDecryptMessageJobs were enqueued.");
- return true;
- } else {
- Log.d(TAG, jobCount + " PushDecryptMessageJob(s) were enqueued.");
- }
-
- long timeRemainingMs = blockUntilQueueDrained(PushDecryptMessageJob.QUEUE, TimeUnit.SECONDS.toMillis(10));
- Set processQueues = queueListener.getQueues();
-
- Log.d(TAG, "Discovered " + processQueues.size() + " queue(s): " + processQueues);
-
- if (timeRemainingMs > 0) {
- Iterator iter = processQueues.iterator();
-
- while (iter.hasNext() && timeRemainingMs > 0) {
- timeRemainingMs = blockUntilQueueDrained(iter.next(), timeRemainingMs);
- }
-
- if (timeRemainingMs <= 0) {
- Log.w(TAG, "Ran out of time while waiting for queues to drain.");
- }
- } else {
- Log.w(TAG, "Ran out of time before we could even wait on individual queues!");
- }
-
- return true;
- } catch (IOException e) {
- Log.w(TAG, "Failed to retrieve messages. Resetting the SignalServiceMessageReceiver.", e);
- ApplicationDependencies.resetSignalServiceMessageReceiver();
- if (e instanceof AuthorizationFailedException && SignalStore.account().isRegistered() && SignalStore.account().getAci() != null) {
- TextSecurePreferences.setUnauthorizedReceived(ApplicationDependencies.getApplication(), true);
- }
- return false;
- } finally {
- jobManager.removeListener(queueListener);
- }
- }
-
- private static int enqueuePushDecryptJobs(IncomingMessageProcessor.Processor processor, long startTime, long timeout)
- throws IOException
- {
- SignalServiceMessageReceiver receiver = ApplicationDependencies.getSignalServiceMessageReceiver();
- AtomicInteger jobCount = new AtomicInteger(0);
-
- receiver.setSoTimeoutMillis(timeout);
-
- receiver.retrieveMessages(Stories.isFeatureEnabled(), envelope -> {
- Log.i(TAG, "Retrieved an envelope." + timeSuffix(startTime));
- String jobId = processor.processEnvelope(envelope);
-
- if (jobId != null) {
- jobCount.incrementAndGet();
- }
- Log.i(TAG, "Successfully processed an envelope." + timeSuffix(startTime));
- });
-
- return jobCount.get();
- }
-
- private static long blockUntilQueueDrained(@NonNull String queue, long timeoutMs) {
- long startTime = System.currentTimeMillis();
- final JobManager jobManager = ApplicationDependencies.getJobManager();
- final MarkerJob markerJob = new MarkerJob(queue);
-
- Optional jobState = jobManager.runSynchronously(markerJob, timeoutMs);
-
- if (!jobState.isPresent()) {
- Log.w(TAG, "Timed out waiting for " + queue + " job(s) to finish!");
- }
-
- long endTime = System.currentTimeMillis();
- long duration = endTime - startTime;
-
- Log.d(TAG, "Waited " + duration + " ms for the " + queue + " job(s) to finish.");
- return timeoutMs - duration;
- }
-
- @Override
- public @NonNull String toString() {
- return Log.tag(RestStrategy.class);
- }
-}
diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/WebSocketStrategy.java b/app/src/main/java/org/thoughtcrime/securesms/messages/WebSocketStrategy.java
new file mode 100644
index 0000000000..cb4b67e318
--- /dev/null
+++ b/app/src/main/java/org/thoughtcrime/securesms/messages/WebSocketStrategy.java
@@ -0,0 +1,102 @@
+package org.thoughtcrime.securesms.messages;
+
+import androidx.annotation.NonNull;
+import androidx.annotation.WorkerThread;
+
+import org.signal.core.util.Stopwatch;
+import org.signal.core.util.logging.Log;
+import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
+import org.thoughtcrime.securesms.jobmanager.JobManager;
+import org.thoughtcrime.securesms.jobmanager.JobTracker;
+import org.thoughtcrime.securesms.jobs.MarkerJob;
+import org.thoughtcrime.securesms.jobs.PushDecryptMessageJob;
+import org.thoughtcrime.securesms.jobs.PushProcessMessageJob;
+
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Retrieves messages over the websocket.
+ */
+public class WebSocketStrategy extends MessageRetrievalStrategy {
+
+ private static final String TAG = Log.tag(WebSocketStrategy.class);
+
+ private static final String KEEP_ALIVE_TOKEN = "WebsocketStrategy";
+ private static final long QUEUE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
+
+ @WorkerThread
+ @Override
+ public boolean execute() {
+ Stopwatch stopwatch = new Stopwatch("websocket-strategy");
+ IncomingMessageObserver observer = ApplicationDependencies.getIncomingMessageObserver();
+
+ observer.registerKeepAliveToken(KEEP_ALIVE_TOKEN);
+ try {
+ JobManager jobManager = ApplicationDependencies.getJobManager();
+ QueueFindingJobListener queueListener = new QueueFindingJobListener();
+
+ jobManager.addListener(job -> job.getParameters().getQueue() != null && job.getParameters().getQueue().startsWith(PushProcessMessageJob.QUEUE_PREFIX), queueListener);
+
+ blockUntilWebsocketDrained(observer);
+ stopwatch.split("decryptions-drained");
+
+ Set processQueues = queueListener.getQueues();
+ Log.d(TAG, "Discovered " + processQueues.size() + " queue(s): " + processQueues);
+
+ for (String queue : processQueues) {
+ blockUntilJobQueueDrained(queue, QUEUE_TIMEOUT);
+ }
+
+ stopwatch.split("process-drained");
+ stopwatch.stop(TAG);
+
+ return true;
+ } finally {
+ ApplicationDependencies.getIncomingMessageObserver().removeKeepAliveToken(KEEP_ALIVE_TOKEN);
+ }
+ }
+
+ private static void blockUntilWebsocketDrained(IncomingMessageObserver observer) {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ observer.addDecryptionDrainedListener(new Runnable() {
+ @Override public void run() {
+ latch.countDown();
+ observer.removeDecryptionDrainedListener(this);
+ }
+ });
+
+ try {
+ if (!latch.await(1, TimeUnit.MINUTES)) {
+ Log.w(TAG, "Hit timeout while waiting for decryptions to drain!");
+ }
+ } catch (InterruptedException e) {
+ Log.w(TAG, "Interrupted!", e);
+ }
+ }
+
+ private static void blockUntilJobQueueDrained(@NonNull String queue, long timeoutMs) {
+ long startTime = System.currentTimeMillis();
+ final JobManager jobManager = ApplicationDependencies.getJobManager();
+ final MarkerJob markerJob = new MarkerJob(queue);
+
+ Optional jobState = jobManager.runSynchronously(markerJob, timeoutMs);
+
+ if (!jobState.isPresent()) {
+ Log.w(TAG, "Timed out waiting for " + queue + " job(s) to finish!");
+ }
+
+ long endTime = System.currentTimeMillis();
+ long duration = endTime - startTime;
+
+ Log.d(TAG, "Waited " + duration + " ms for the " + queue + " job(s) to finish.");
+ }
+
+ @Override
+ public @NonNull String toString() {
+ return Log.tag(WebSocketStrategy.class);
+ }
+}
diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/WebsocketStrategy.java b/app/src/main/java/org/thoughtcrime/securesms/messages/WebsocketStrategy.java
deleted file mode 100644
index 88c94af22c..0000000000
--- a/app/src/main/java/org/thoughtcrime/securesms/messages/WebsocketStrategy.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package org.thoughtcrime.securesms.messages;
-
-import androidx.annotation.NonNull;
-
-import org.signal.core.util.logging.Log;
-import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
-import org.thoughtcrime.securesms.jobmanager.JobManager;
-import org.thoughtcrime.securesms.jobs.PushProcessMessageJob;
-import org.whispersystems.signalservice.api.SignalWebSocket;
-import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeoutException;
-
-class WebsocketStrategy extends MessageRetrievalStrategy {
-
- private static final String TAG = Log.tag(WebsocketStrategy.class);
-
- private final SignalWebSocket signalWebSocket;
- private final JobManager jobManager;
-
- public WebsocketStrategy() {
- this.signalWebSocket = ApplicationDependencies.getSignalWebSocket();
- this.jobManager = ApplicationDependencies.getJobManager();
- }
-
- @Override
- public boolean execute(long timeout) {
- long startTime = System.currentTimeMillis();
-
- try {
- Set processJobQueues = drainWebsocket(timeout, startTime);
- Iterator queueIterator = processJobQueues.iterator();
- long timeRemaining = Math.max(0, timeout - (System.currentTimeMillis() - startTime));
-
- while (!isCanceled() && queueIterator.hasNext() && timeRemaining > 0) {
- String queue = queueIterator.next();
-
- blockUntilQueueDrained(TAG, queue, timeRemaining);
-
- timeRemaining = Math.max(0, timeout - (System.currentTimeMillis() - startTime));
- }
-
- return true;
- } catch (IOException e) {
- Log.w(TAG, "Encountered an exception while draining the websocket.", e);
- return false;
- }
- }
-
- private @NonNull Set drainWebsocket(long timeout, long startTime) throws IOException {
- QueueFindingJobListener queueListener = new QueueFindingJobListener();
-
- jobManager.addListener(job -> job.getParameters().getQueue() != null && job.getParameters().getQueue().startsWith(PushProcessMessageJob.QUEUE_PREFIX), queueListener);
-
- try {
- signalWebSocket.connect();
- while (shouldContinue()) {
- try {
- Optional result = signalWebSocket.readOrEmpty(timeout, envelope -> {
- Log.i(TAG, "Retrieved envelope! " + envelope.getTimestamp() + timeSuffix(startTime));
- try (IncomingMessageProcessor.Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) {
- processor.processEnvelope(envelope);
- }
- });
-
- if (!result.isPresent()) {
- Log.i(TAG, "Hit an empty response. Finished." + timeSuffix(startTime));
- break;
- }
- } catch (TimeoutException e) {
- Log.w(TAG, "Websocket timeout." + timeSuffix(startTime));
- }
- }
- } finally {
- signalWebSocket.disconnect();
- jobManager.removeListener(queueListener);
- }
-
- return queueListener.getQueues();
- }
-
-
- private boolean shouldContinue() {
- return !isCanceled();
- }
-}