mirror of
https://github.com/signalapp/Signal-Android.git
synced 2025-12-23 20:48:43 +00:00
Remove BackgroundMessageRetriever and clean up old code.
This commit is contained in:
committed by
Cody Henthorne
parent
9f75c37331
commit
b07d675bb4
@@ -186,7 +186,6 @@ public class ApplicationContext extends MultiDexApplication implements AppForegr
|
||||
.addNonBlocking(PreKeysSyncJob::enqueueIfNeeded)
|
||||
.addNonBlocking(this::initializePeriodicTasks)
|
||||
.addNonBlocking(this::initializeCircumvention)
|
||||
.addNonBlocking(this::initializePendingMessages)
|
||||
.addNonBlocking(this::initializeCleanup)
|
||||
.addNonBlocking(this::initializeGlideCodecs)
|
||||
.addNonBlocking(StorageSyncHelper::scheduleRoutineSync)
|
||||
@@ -449,18 +448,6 @@ public class ApplicationContext extends MultiDexApplication implements AppForegr
|
||||
}
|
||||
}
|
||||
|
||||
private void initializePendingMessages() {
|
||||
if (TextSecurePreferences.getNeedsMessagePull(this)) {
|
||||
Log.i(TAG, "Scheduling a message fetch.");
|
||||
if (Build.VERSION.SDK_INT >= 26) {
|
||||
FcmJobService.schedule(this);
|
||||
} else {
|
||||
ApplicationDependencies.getJobManager().add(new PushNotificationReceiveJob());
|
||||
}
|
||||
TextSecurePreferences.setNeedsMessagePull(this, false);
|
||||
}
|
||||
}
|
||||
|
||||
@WorkerThread
|
||||
private void initializeBlobProvider() {
|
||||
BlobProvider.getInstance().initialize(this);
|
||||
|
||||
@@ -80,15 +80,6 @@ public abstract class PassphraseRequiredActivity extends BaseActivity implements
|
||||
protected void onPreCreate() {}
|
||||
protected void onCreate(Bundle savedInstanceState, boolean ready) {}
|
||||
|
||||
@Override
|
||||
protected void onResume() {
|
||||
super.onResume();
|
||||
|
||||
if (networkAccess.isCensored()) {
|
||||
ApplicationDependencies.getJobManager().add(new PushNotificationReceiveJob());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onDestroy() {
|
||||
super.onDestroy();
|
||||
|
||||
@@ -22,7 +22,6 @@ import org.thoughtcrime.securesms.groups.v2.processing.GroupsV2StateProcessor;
|
||||
import org.thoughtcrime.securesms.jobmanager.JobManager;
|
||||
import org.thoughtcrime.securesms.keyvalue.SignalStore;
|
||||
import org.thoughtcrime.securesms.megaphone.MegaphoneRepository;
|
||||
import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever;
|
||||
import org.thoughtcrime.securesms.messages.IncomingMessageObserver;
|
||||
import org.thoughtcrime.securesms.net.StandardUserAgentInterceptor;
|
||||
import org.thoughtcrime.securesms.notifications.MessageNotifier;
|
||||
@@ -98,7 +97,6 @@ public class ApplicationDependencies {
|
||||
private static volatile SignalServiceMessageSender messageSender;
|
||||
private static volatile SignalServiceMessageReceiver messageReceiver;
|
||||
private static volatile IncomingMessageObserver incomingMessageObserver;
|
||||
private static volatile BackgroundMessageRetriever backgroundMessageRetriever;
|
||||
private static volatile LiveRecipientCache recipientCache;
|
||||
private static volatile JobManager jobManager;
|
||||
private static volatile FrameRateTracker frameRateTracker;
|
||||
@@ -276,18 +274,6 @@ public class ApplicationDependencies {
|
||||
return provider.provideSignalServiceNetworkAccess();
|
||||
}
|
||||
|
||||
public static @NonNull BackgroundMessageRetriever getBackgroundMessageRetriever() {
|
||||
if (backgroundMessageRetriever == null) {
|
||||
synchronized (LOCK) {
|
||||
if (backgroundMessageRetriever == null) {
|
||||
backgroundMessageRetriever = provider.provideBackgroundMessageRetriever();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return backgroundMessageRetriever;
|
||||
}
|
||||
|
||||
public static @NonNull LiveRecipientCache getRecipientCache() {
|
||||
if (recipientCache == null) {
|
||||
synchronized (LOCK) {
|
||||
@@ -706,7 +692,6 @@ public class ApplicationDependencies {
|
||||
@NonNull SignalServiceMessageSender provideSignalServiceMessageSender(@NonNull SignalWebSocket signalWebSocket, @NonNull SignalServiceDataStore protocolStore, @NonNull SignalServiceConfiguration signalServiceConfiguration);
|
||||
@NonNull SignalServiceMessageReceiver provideSignalServiceMessageReceiver(@NonNull SignalServiceConfiguration signalServiceConfiguration);
|
||||
@NonNull SignalServiceNetworkAccess provideSignalServiceNetworkAccess();
|
||||
@NonNull BackgroundMessageRetriever provideBackgroundMessageRetriever();
|
||||
@NonNull LiveRecipientCache provideRecipientCache();
|
||||
@NonNull JobManager provideJobManager();
|
||||
@NonNull FrameRateTracker provideFrameRateTracker();
|
||||
|
||||
@@ -47,7 +47,6 @@ import org.thoughtcrime.securesms.jobs.ReactionSendJob;
|
||||
import org.thoughtcrime.securesms.jobs.TypingSendJob;
|
||||
import org.thoughtcrime.securesms.keyvalue.SignalStore;
|
||||
import org.thoughtcrime.securesms.megaphone.MegaphoneRepository;
|
||||
import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever;
|
||||
import org.thoughtcrime.securesms.messages.IncomingMessageObserver;
|
||||
import org.thoughtcrime.securesms.net.SignalWebSocketHealthMonitor;
|
||||
import org.thoughtcrime.securesms.notifications.MessageNotifier;
|
||||
@@ -160,11 +159,6 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr
|
||||
return new SignalServiceNetworkAccess(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NonNull BackgroundMessageRetriever provideBackgroundMessageRetriever() {
|
||||
return new BackgroundMessageRetriever();
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NonNull LiveRecipientCache provideRecipientCache() {
|
||||
return new LiveRecipientCache(context);
|
||||
|
||||
@@ -140,7 +140,7 @@ object FcmFetchManager {
|
||||
|
||||
@JvmStatic
|
||||
fun retrieveMessages(context: Context): Boolean {
|
||||
val success = ApplicationDependencies.getBackgroundMessageRetriever().retrieveMessages(context, WebSocketStrategy(WEBSOCKET_DRAIN_TIMEOUT))
|
||||
val success = WebSocketStrategy.execute(WEBSOCKET_DRAIN_TIMEOUT)
|
||||
|
||||
if (success) {
|
||||
Log.i(TAG, "Successfully retrieved messages.")
|
||||
|
||||
@@ -12,7 +12,7 @@ import androidx.annotation.RequiresApi;
|
||||
import org.signal.core.util.concurrent.SignalExecutors;
|
||||
import org.signal.core.util.logging.Log;
|
||||
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
|
||||
import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever;
|
||||
import org.thoughtcrime.securesms.keyvalue.SignalStore;
|
||||
import org.thoughtcrime.securesms.messages.WebSocketStrategy;
|
||||
import org.thoughtcrime.securesms.util.ServiceUtil;
|
||||
import org.thoughtcrime.securesms.util.TextSecurePreferences;
|
||||
@@ -41,15 +41,13 @@ public class FcmJobService extends JobService {
|
||||
public boolean onStartJob(JobParameters params) {
|
||||
Log.d(TAG, "onStartJob()");
|
||||
|
||||
if (BackgroundMessageRetriever.shouldIgnoreFetch()) {
|
||||
if (ApplicationDependencies.getAppForegroundObserver().isForegrounded()) {
|
||||
Log.i(TAG, "App is foregrounded. No need to run.");
|
||||
return false;
|
||||
}
|
||||
|
||||
SignalExecutors.UNBOUNDED.execute(() -> {
|
||||
Context context = getApplicationContext();
|
||||
BackgroundMessageRetriever retriever = ApplicationDependencies.getBackgroundMessageRetriever();
|
||||
boolean success = retriever.retrieveMessages(context, new WebSocketStrategy());
|
||||
boolean success = WebSocketStrategy.execute();
|
||||
|
||||
if (success) {
|
||||
Log.i(TAG, "Successfully retrieved messages.");
|
||||
@@ -66,6 +64,6 @@ public class FcmJobService extends JobService {
|
||||
@Override
|
||||
public boolean onStopJob(JobParameters params) {
|
||||
Log.d(TAG, "onStopJob()");
|
||||
return TextSecurePreferences.getNeedsMessagePull(getApplicationContext());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import androidx.annotation.Nullable;
|
||||
import androidx.annotation.RequiresApi;
|
||||
|
||||
import org.thoughtcrime.securesms.jobmanager.Constraint;
|
||||
import org.thoughtcrime.securesms.util.NetworkUtil;
|
||||
|
||||
public class NetworkConstraint implements Constraint {
|
||||
|
||||
@@ -44,10 +45,7 @@ public class NetworkConstraint implements Constraint {
|
||||
}
|
||||
|
||||
public static boolean isMet(@NonNull Context context) {
|
||||
ConnectivityManager connectivityManager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
|
||||
NetworkInfo activeNetworkInfo = connectivityManager.getActiveNetworkInfo();
|
||||
|
||||
return activeNetworkInfo != null && activeNetworkInfo.isConnected();
|
||||
return NetworkUtil.isConnected(context);
|
||||
}
|
||||
|
||||
public static final class Factory implements Constraint.Factory<NetworkConstraint> {
|
||||
|
||||
@@ -4,12 +4,12 @@ import androidx.annotation.NonNull;
|
||||
import androidx.annotation.Nullable;
|
||||
|
||||
import org.signal.core.util.logging.Log;
|
||||
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
|
||||
import org.thoughtcrime.securesms.jobmanager.JsonJobData;
|
||||
import org.thoughtcrime.securesms.R;
|
||||
import org.thoughtcrime.securesms.jobmanager.Job;
|
||||
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
|
||||
import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever;
|
||||
import org.thoughtcrime.securesms.messages.WebSocketStrategy;
|
||||
import org.thoughtcrime.securesms.service.DelayedNotificationController;
|
||||
import org.thoughtcrime.securesms.service.GenericForegroundService;
|
||||
import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException;
|
||||
|
||||
import java.io.IOException;
|
||||
@@ -20,37 +20,22 @@ public final class PushNotificationReceiveJob extends BaseJob {
|
||||
|
||||
private static final String TAG = Log.tag(PushNotificationReceiveJob.class);
|
||||
|
||||
private static final String KEY_FOREGROUND_SERVICE_DELAY = "foreground_delay";
|
||||
|
||||
private final long foregroundServiceDelayMs;
|
||||
|
||||
public PushNotificationReceiveJob() {
|
||||
this(BackgroundMessageRetriever.DO_NOT_SHOW_IN_FOREGROUND);
|
||||
}
|
||||
|
||||
private PushNotificationReceiveJob(long foregroundServiceDelayMs) {
|
||||
this(new Job.Parameters.Builder()
|
||||
.addConstraint(NetworkConstraint.KEY)
|
||||
.setQueue("__notification_received")
|
||||
.setMaxAttempts(3)
|
||||
.setMaxInstancesForFactory(1)
|
||||
.build(),
|
||||
foregroundServiceDelayMs);
|
||||
.addConstraint(NetworkConstraint.KEY)
|
||||
.setQueue("__notification_received")
|
||||
.setMaxAttempts(3)
|
||||
.setMaxInstancesForFactory(1)
|
||||
.build());
|
||||
}
|
||||
|
||||
private PushNotificationReceiveJob(@NonNull Job.Parameters parameters, long foregroundServiceDelayMs) {
|
||||
private PushNotificationReceiveJob(Job.Parameters parameters) {
|
||||
super(parameters);
|
||||
this.foregroundServiceDelayMs = foregroundServiceDelayMs;
|
||||
}
|
||||
|
||||
public static Job withDelayedForegroundService(long foregroundServiceAfterMs) {
|
||||
return new PushNotificationReceiveJob(foregroundServiceAfterMs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public @Nullable byte[] serialize() {
|
||||
return new JsonJobData.Builder().putLong(KEY_FOREGROUND_SERVICE_DELAY, foregroundServiceDelayMs)
|
||||
.serialize();
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -60,10 +45,13 @@ public final class PushNotificationReceiveJob extends BaseJob {
|
||||
|
||||
@Override
|
||||
public void onRun() throws IOException {
|
||||
BackgroundMessageRetriever retriever = ApplicationDependencies.getBackgroundMessageRetriever();
|
||||
boolean result = retriever.retrieveMessages(context, foregroundServiceDelayMs, new WebSocketStrategy());
|
||||
boolean success;
|
||||
|
||||
if (result) {
|
||||
try (DelayedNotificationController unused = GenericForegroundService.startForegroundTaskDelayed(context, context.getString(R.string.BackgroundMessageRetriever_checking_for_messages), 300, R.drawable.ic_signal_refresh)) {
|
||||
success = WebSocketStrategy.execute();
|
||||
}
|
||||
|
||||
if (success) {
|
||||
Log.i(TAG, "Successfully pulled messages.");
|
||||
} else {
|
||||
throw new PushNetworkException("Failed to pull messages.");
|
||||
@@ -78,17 +66,12 @@ public final class PushNotificationReceiveJob extends BaseJob {
|
||||
|
||||
@Override
|
||||
public void onFailure() {
|
||||
Log.w(TAG, "***** Failed to download pending message!");
|
||||
// MessageNotifier.notifyMessagesPending(getContext());
|
||||
}
|
||||
|
||||
public static final class Factory implements Job.Factory<PushNotificationReceiveJob> {
|
||||
@Override
|
||||
public @NonNull PushNotificationReceiveJob create(@NonNull Parameters parameters, @Nullable byte[] serializedData) {
|
||||
JsonJobData data = JsonJobData.deserialize(serializedData);
|
||||
|
||||
return new PushNotificationReceiveJob(parameters,
|
||||
data.getLongOrDefault(KEY_FOREGROUND_SERVICE_DELAY, BackgroundMessageRetriever.DO_NOT_SHOW_IN_FOREGROUND));
|
||||
return new PushNotificationReceiveJob(parameters);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,7 +60,7 @@ public final class MessageProcessReceiver extends BroadcastReceiver {
|
||||
Log.i(TAG, "Running PushNotificationReceiveJob");
|
||||
|
||||
Optional<JobTracker.JobState> jobState = ApplicationDependencies.getJobManager()
|
||||
.runSynchronously(PushNotificationReceiveJob.withDelayedForegroundService(foregroundDelayMs), jobTimeout);
|
||||
.runSynchronously(new PushNotificationReceiveJob(), jobTimeout);
|
||||
|
||||
Log.i(TAG, "PushNotificationReceiveJob ended: " + (jobState.isPresent() ? jobState.get().toString() : "Job did not complete"));
|
||||
});
|
||||
|
||||
@@ -1,144 +0,0 @@
|
||||
package org.thoughtcrime.securesms.messages;
|
||||
|
||||
import android.content.Context;
|
||||
import android.os.Build;
|
||||
import android.os.PowerManager;
|
||||
|
||||
import androidx.annotation.NonNull;
|
||||
import androidx.annotation.WorkerThread;
|
||||
|
||||
import org.signal.core.util.logging.Log;
|
||||
import org.thoughtcrime.securesms.ApplicationContext;
|
||||
import org.thoughtcrime.securesms.R;
|
||||
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
|
||||
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
|
||||
import org.thoughtcrime.securesms.service.DelayedNotificationController;
|
||||
import org.thoughtcrime.securesms.service.GenericForegroundService;
|
||||
import org.thoughtcrime.securesms.util.PowerManagerCompat;
|
||||
import org.thoughtcrime.securesms.util.ServiceUtil;
|
||||
import org.thoughtcrime.securesms.util.TextSecurePreferences;
|
||||
import org.thoughtcrime.securesms.util.WakeLockUtil;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Retrieves messages while the app is in the background via provided {@link MessageRetrievalStrategy}'s.
|
||||
*/
|
||||
public class BackgroundMessageRetriever {
|
||||
|
||||
private static final String TAG = Log.tag(BackgroundMessageRetriever.class);
|
||||
|
||||
private static final String WAKE_LOCK_TAG = "MessageRetriever";
|
||||
|
||||
private static final Semaphore ACTIVE_LOCK = new Semaphore(2);
|
||||
|
||||
public static final long DO_NOT_SHOW_IN_FOREGROUND = DelayedNotificationController.DO_NOT_SHOW;
|
||||
|
||||
/**
|
||||
* @return False if the retrieval failed and should be rescheduled, otherwise true.
|
||||
*/
|
||||
@WorkerThread
|
||||
public boolean retrieveMessages(@NonNull Context context, MessageRetrievalStrategy... strategies) {
|
||||
return retrieveMessages(context, DO_NOT_SHOW_IN_FOREGROUND, strategies);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return False if the retrieval failed and should be rescheduled, otherwise true.
|
||||
*/
|
||||
@WorkerThread
|
||||
public boolean retrieveMessages(@NonNull Context context, long showNotificationAfterMs, MessageRetrievalStrategy... strategies) {
|
||||
if (shouldIgnoreFetch()) {
|
||||
Log.i(TAG, "Skipping retrieval -- app is in the foreground.");
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!ACTIVE_LOCK.tryAcquire()) {
|
||||
Log.i(TAG, "Skipping retrieval -- there's already one enqueued.");
|
||||
return true;
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
try (NoExceptionCloseable unused = startDelayedForegroundServiceIfPossible(context, showNotificationAfterMs)) {
|
||||
PowerManager.WakeLock wakeLock = null;
|
||||
|
||||
try {
|
||||
wakeLock = WakeLockUtil.acquire(context, PowerManager.PARTIAL_WAKE_LOCK, TimeUnit.SECONDS.toMillis(60), WAKE_LOCK_TAG);
|
||||
|
||||
TextSecurePreferences.setNeedsMessagePull(context, true);
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
PowerManager powerManager = ServiceUtil.getPowerManager(context);
|
||||
boolean doze = PowerManagerCompat.isDeviceIdleMode(powerManager);
|
||||
boolean network = new NetworkConstraint.Factory(ApplicationContext.getInstance(context)).create().isMet();
|
||||
|
||||
if (doze || !network) {
|
||||
Log.w(TAG, "We may be operating in a constrained environment. Doze: " + doze + " Network: " + network);
|
||||
}
|
||||
|
||||
Log.i(TAG, "Performing normal message fetch.");
|
||||
return executeBackgroundRetrieval(context, startTime, strategies);
|
||||
} finally {
|
||||
WakeLockUtil.release(wakeLock, WAKE_LOCK_TAG);
|
||||
ACTIVE_LOCK.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private NoExceptionCloseable startDelayedForegroundServiceIfPossible(@NonNull Context context, long showNotificationAfterMs) {
|
||||
if (Build.VERSION.SDK_INT < 31) {
|
||||
return GenericForegroundService.startForegroundTaskDelayed(context, context.getString(R.string.BackgroundMessageRetriever_checking_for_messages), showNotificationAfterMs, R.drawable.ic_signal_refresh)::close;
|
||||
} else {
|
||||
return () -> {};
|
||||
}
|
||||
}
|
||||
|
||||
private boolean executeBackgroundRetrieval(@NonNull Context context, long startTime, @NonNull MessageRetrievalStrategy[] strategies) {
|
||||
boolean success = false;
|
||||
|
||||
for (MessageRetrievalStrategy strategy : strategies) {
|
||||
if (shouldIgnoreFetch()) {
|
||||
Log.i(TAG, "Stopping further strategy attempts -- app is in the foreground." + logSuffix(startTime));
|
||||
success = true;
|
||||
break;
|
||||
}
|
||||
|
||||
Log.i(TAG, "Attempting strategy: " + strategy.toString() + logSuffix(startTime));
|
||||
|
||||
if (strategy.execute()) {
|
||||
Log.i(TAG, "Strategy succeeded: " + strategy.toString() + logSuffix(startTime));
|
||||
success = true;
|
||||
break;
|
||||
} else {
|
||||
Log.w(TAG, "Strategy failed: " + strategy.toString() + logSuffix(startTime));
|
||||
}
|
||||
}
|
||||
|
||||
if (success) {
|
||||
TextSecurePreferences.setNeedsMessagePull(context, false);
|
||||
} else {
|
||||
Log.w(TAG, "All strategies failed!" + logSuffix(startTime));
|
||||
}
|
||||
|
||||
return success;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return True if there is no need to execute a message fetch, because the websocket will take
|
||||
* care of it.
|
||||
*/
|
||||
public static boolean shouldIgnoreFetch() {
|
||||
return ApplicationDependencies.getAppForegroundObserver().isForegrounded();
|
||||
}
|
||||
|
||||
private static String logSuffix(long startTime) {
|
||||
return " (" + (System.currentTimeMillis() - startTime) + " ms elapsed)";
|
||||
}
|
||||
|
||||
private interface NoExceptionCloseable extends AutoCloseable {
|
||||
@Override
|
||||
void close();
|
||||
}
|
||||
}
|
||||
@@ -192,7 +192,7 @@ class IncomingMessageObserver(private val context: Application) {
|
||||
|
||||
private fun isConnectionNecessary(): Boolean {
|
||||
val timeIdle: Long
|
||||
val keepAliveEntries: Set<Map.Entry<String, Long>>
|
||||
val keepAliveEntries: Set<Pair<String, Long>>
|
||||
val appVisibleSnapshot: Boolean
|
||||
|
||||
lock.withLock {
|
||||
@@ -205,7 +205,7 @@ class IncomingMessageObserver(private val context: Application) {
|
||||
Log.d(TAG, "Removed old keep web socket open requests.")
|
||||
}
|
||||
|
||||
keepAliveEntries = keepAliveTokens.entries.toImmutableSet()
|
||||
keepAliveEntries = keepAliveTokens.entries.map { it.key to it.value }.toImmutableSet()
|
||||
}
|
||||
|
||||
val registered = SignalStore.account().isRegistered
|
||||
|
||||
@@ -1,49 +0,0 @@
|
||||
package org.thoughtcrime.securesms.messages;
|
||||
|
||||
import androidx.annotation.AnyThread;
|
||||
import androidx.annotation.NonNull;
|
||||
import androidx.annotation.WorkerThread;
|
||||
|
||||
import org.signal.core.util.logging.Log;
|
||||
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
|
||||
import org.thoughtcrime.securesms.jobmanager.Job;
|
||||
import org.thoughtcrime.securesms.jobmanager.JobManager;
|
||||
import org.thoughtcrime.securesms.jobmanager.JobTracker;
|
||||
import org.thoughtcrime.securesms.jobs.MarkerJob;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Implementations are responsible for fetching and processing a batch of messages.
|
||||
*/
|
||||
public abstract class MessageRetrievalStrategy {
|
||||
|
||||
/**
|
||||
* Fetches and processes any pending messages. This method should block until the messages are
|
||||
* actually stored and processed -- not just retrieved.
|
||||
*
|
||||
* @return True if everything was successful up until cancelation, false otherwise.
|
||||
*/
|
||||
@WorkerThread
|
||||
abstract boolean execute();
|
||||
|
||||
protected static class QueueFindingJobListener implements JobTracker.JobListener {
|
||||
private final Set<String> queues = new HashSet<>();
|
||||
|
||||
@Override
|
||||
@AnyThread
|
||||
public void onStateChanged(@NonNull Job job, @NonNull JobTracker.JobState jobState) {
|
||||
synchronized (queues) {
|
||||
queues.add(job.getParameters().getQueue());
|
||||
}
|
||||
}
|
||||
|
||||
@NonNull Set<String> getQueues() {
|
||||
synchronized (queues) {
|
||||
return new HashSet<>(queues);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,17 +1,32 @@
|
||||
package org.thoughtcrime.securesms.messages;
|
||||
|
||||
import android.app.Application;
|
||||
import android.content.Context;
|
||||
import android.os.PowerManager;
|
||||
import android.os.PowerManager.WakeLock;
|
||||
|
||||
import androidx.annotation.AnyThread;
|
||||
import androidx.annotation.NonNull;
|
||||
import androidx.annotation.WorkerThread;
|
||||
|
||||
import org.signal.core.util.Stopwatch;
|
||||
import org.signal.core.util.ThreadUtil;
|
||||
import org.signal.core.util.logging.Log;
|
||||
import org.thoughtcrime.securesms.ApplicationContext;
|
||||
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
|
||||
import org.thoughtcrime.securesms.jobmanager.Job;
|
||||
import org.thoughtcrime.securesms.jobmanager.JobManager;
|
||||
import org.thoughtcrime.securesms.jobmanager.JobTracker;
|
||||
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
|
||||
import org.thoughtcrime.securesms.jobs.MarkerJob;
|
||||
import org.thoughtcrime.securesms.jobs.PushProcessMessageJob;
|
||||
import org.thoughtcrime.securesms.keyvalue.SignalStore;
|
||||
import org.thoughtcrime.securesms.util.NetworkUtil;
|
||||
import org.thoughtcrime.securesms.util.PowerManagerCompat;
|
||||
import org.thoughtcrime.securesms.util.ServiceUtil;
|
||||
import org.thoughtcrime.securesms.util.WakeLockUtil;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
@@ -20,25 +35,46 @@ import java.util.concurrent.TimeUnit;
|
||||
/**
|
||||
* Retrieves messages over the websocket.
|
||||
*/
|
||||
public class WebSocketStrategy extends MessageRetrievalStrategy {
|
||||
public class WebSocketStrategy {
|
||||
|
||||
private static final String TAG = Log.tag(WebSocketStrategy.class);
|
||||
|
||||
private static final String KEEP_ALIVE_TOKEN = "WebsocketStrategy";
|
||||
private static final long QUEUE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
|
||||
private static final long QUEUE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
|
||||
private static final String WAKELOCK_PREFIX = "websocket-strategy-";
|
||||
|
||||
private final long websocketDrainTimeoutMs;
|
||||
public WebSocketStrategy() {
|
||||
this(TimeUnit.MINUTES.toMillis(1));
|
||||
}
|
||||
|
||||
public WebSocketStrategy(long websocketDrainTimeoutMs) {
|
||||
this.websocketDrainTimeoutMs = websocketDrainTimeoutMs;
|
||||
@WorkerThread
|
||||
public static boolean execute() {
|
||||
return execute(TimeUnit.MINUTES.toMillis(1));
|
||||
}
|
||||
|
||||
@WorkerThread
|
||||
@Override
|
||||
public boolean execute() {
|
||||
public static boolean execute(long websocketDrainTimeoutMs) {
|
||||
Application context = ApplicationDependencies.getApplication();
|
||||
IncomingMessageObserver observer = ApplicationDependencies.getIncomingMessageObserver();
|
||||
|
||||
PowerManager powerManager = ServiceUtil.getPowerManager(context);
|
||||
boolean doze = PowerManagerCompat.isDeviceIdleMode(powerManager);
|
||||
boolean network = NetworkUtil.isConnected(context);
|
||||
|
||||
if (doze || !network) {
|
||||
Log.w(TAG, "We may be operating in a constrained environment. Doze: " + doze + " Network: " + network);
|
||||
}
|
||||
|
||||
observer.registerKeepAliveToken(KEEP_ALIVE_TOKEN);
|
||||
|
||||
String wakeLockTag = WAKELOCK_PREFIX + System.currentTimeMillis();
|
||||
WakeLock wakeLock = WakeLockUtil.acquire(ApplicationDependencies.getApplication(), PowerManager.PARTIAL_WAKE_LOCK, websocketDrainTimeoutMs + QUEUE_TIMEOUT, wakeLockTag);
|
||||
|
||||
try {
|
||||
return drainAndProcess(websocketDrainTimeoutMs);
|
||||
} finally {
|
||||
WakeLockUtil.release(wakeLock, wakeLockTag);
|
||||
}
|
||||
}
|
||||
|
||||
@WorkerThread
|
||||
private static boolean drainAndProcess(long timeout) {
|
||||
Stopwatch stopwatch = new Stopwatch("websocket-strategy");
|
||||
IncomingMessageObserver observer = ApplicationDependencies.getIncomingMessageObserver();
|
||||
|
||||
@@ -49,7 +85,7 @@ public class WebSocketStrategy extends MessageRetrievalStrategy {
|
||||
|
||||
jobManager.addListener(job -> job.getParameters().getQueue() != null && job.getParameters().getQueue().startsWith(PushProcessMessageJob.QUEUE_PREFIX), queueListener);
|
||||
|
||||
if (!blockUntilWebsocketDrained(observer, websocketDrainTimeoutMs)) {
|
||||
if (!blockUntilWebsocketDrained(observer, timeout)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -115,8 +151,21 @@ public class WebSocketStrategy extends MessageRetrievalStrategy {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NonNull String toString() {
|
||||
return Log.tag(WebSocketStrategy.class);
|
||||
protected static class QueueFindingJobListener implements JobTracker.JobListener {
|
||||
private final Set<String> queues = new HashSet<>();
|
||||
|
||||
@Override
|
||||
@AnyThread
|
||||
public void onStateChanged(@NonNull Job job, @NonNull JobTracker.JobState jobState) {
|
||||
synchronized (queues) {
|
||||
queues.add(job.getParameters().getQueue());
|
||||
}
|
||||
}
|
||||
|
||||
@NonNull Set<String> getQueues() {
|
||||
synchronized (queues) {
|
||||
return new HashSet<>(queues);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,6 @@ import org.thoughtcrime.securesms.database.DatabaseObserver;
|
||||
import org.thoughtcrime.securesms.database.PendingRetryReceiptCache;
|
||||
import org.thoughtcrime.securesms.jobmanager.JobManager;
|
||||
import org.thoughtcrime.securesms.megaphone.MegaphoneRepository;
|
||||
import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever;
|
||||
import org.thoughtcrime.securesms.messages.IncomingMessageObserver;
|
||||
import org.thoughtcrime.securesms.notifications.MessageNotifier;
|
||||
import org.thoughtcrime.securesms.payments.Payments;
|
||||
@@ -78,11 +77,6 @@ public class MockApplicationDependencyProvider implements ApplicationDependencie
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NonNull BackgroundMessageRetriever provideBackgroundMessageRetriever() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NonNull LiveRecipientCache provideRecipientCache() {
|
||||
return null;
|
||||
|
||||
Reference in New Issue
Block a user