Create a new system for fetching the intial batch of messages.

This commit is contained in:
Greyson Parrelli
2020-05-20 09:06:08 -04:00
parent d2bf539504
commit a299bafe89
17 changed files with 522 additions and 117 deletions

View File

@@ -0,0 +1,131 @@
package org.thoughtcrime.securesms.messages;
import android.content.Context;
import android.os.PowerManager;
import androidx.annotation.NonNull;
import androidx.annotation.WorkerThread;
import org.thoughtcrime.securesms.ApplicationContext;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.util.PowerManagerCompat;
import org.thoughtcrime.securesms.util.ServiceUtil;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.thoughtcrime.securesms.util.WakeLockUtil;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* Retrieves messages while the app is in the background via provided {@link MessageRetrievalStrategy}'s.
*/
public class BackgroundMessageRetriever {
private static final String TAG = Log.tag(BackgroundMessageRetriever.class);
private static final String WAKE_LOCK_TAG = "MessageRetriever";
private static final Semaphore ACTIVE_LOCK = new Semaphore(2);
private static final long CATCHUP_TIMEOUT = TimeUnit.SECONDS.toMillis(60);
private static final long NORMAL_TIMEOUT = TimeUnit.SECONDS.toMillis(10);
/**
* @return False if the retrieval failed and should be rescheduled, otherwise true.
*/
@WorkerThread
public boolean retrieveMessages(@NonNull Context context, MessageRetrievalStrategy... strategies) {
if (shouldIgnoreFetch(context)) {
Log.i(TAG, "Skipping retrieval -- app is in the foreground.");
return true;
}
if (!ACTIVE_LOCK.tryAcquire()) {
Log.i(TAG, "Skipping retrieval -- there's already one enqueued.");
return true;
}
synchronized (this) {
PowerManager.WakeLock wakeLock = null;
try {
wakeLock = WakeLockUtil.acquire(context, PowerManager.PARTIAL_WAKE_LOCK, TimeUnit.SECONDS.toMillis(60), WAKE_LOCK_TAG);
TextSecurePreferences.setNeedsMessagePull(context, true);
long startTime = System.currentTimeMillis();
PowerManager powerManager = ServiceUtil.getPowerManager(context);
boolean doze = PowerManagerCompat.isDeviceIdleMode(powerManager);
boolean network = new NetworkConstraint.Factory(ApplicationContext.getInstance(context)).create().isMet();
if (doze || !network) {
Log.w(TAG, "We may be operating in a constrained environment. Doze: " + doze + " Network: " + network);
}
if (ApplicationDependencies.getInitialMessageRetriever().isCaughtUp()) {
Log.i(TAG, "Performing normal message fetch.");
return executeBackgroundRetrieval(context, startTime, strategies);
} else {
Log.i(TAG, "Performing initial message fetch.");
InitialMessageRetriever.Result result = ApplicationDependencies.getInitialMessageRetriever().begin(CATCHUP_TIMEOUT);
if (result == InitialMessageRetriever.Result.SUCCESS) {
Log.i(TAG, "Initial message request was completed successfully. " + logSuffix(startTime));
TextSecurePreferences.setNeedsMessagePull(context, false);
return true;
} else {
Log.w(TAG, "Initial message fetch returned result " + result + ", so doing a normal message fetch.");
return executeBackgroundRetrieval(context, System.currentTimeMillis(), strategies);
}
}
} finally {
WakeLockUtil.release(wakeLock, WAKE_LOCK_TAG);
ACTIVE_LOCK.release();
}
}
}
private boolean executeBackgroundRetrieval(@NonNull Context context, long startTime, @NonNull MessageRetrievalStrategy[] strategies) {
boolean success = false;
for (MessageRetrievalStrategy strategy : strategies) {
if (shouldIgnoreFetch(context)) {
Log.i(TAG, "Stopping further strategy attempts -- app is in the foreground." + logSuffix(startTime));
success = true;
break;
}
Log.i(TAG, "Attempting strategy: " + strategy.toString() + logSuffix(startTime));
if (strategy.execute(NORMAL_TIMEOUT)) {
Log.i(TAG, "Strategy succeeded: " + strategy.toString() + logSuffix(startTime));
success = true;
break;
} else {
Log.w(TAG, "Strategy failed: " + strategy.toString() + logSuffix(startTime));
}
}
if (success) {
TextSecurePreferences.setNeedsMessagePull(context, false);
} else {
Log.w(TAG, "All strategies failed!" + logSuffix(startTime));
}
return success;
}
/**
* @return True if there is no need to execute a message fetch, because the websocket will take
* care of it.
*/
public static boolean shouldIgnoreFetch(@NonNull Context context) {
return ApplicationContext.getInstance(context).isAppVisible() &&
!ApplicationDependencies.getSignalServiceNetworkAccess().isCensored(context);
}
private static String logSuffix(long startTime) {
return " (" + (System.currentTimeMillis() - startTime) + " ms elapsed)";
}
}

View File

@@ -0,0 +1,217 @@
package org.thoughtcrime.securesms.messages;
import android.app.Service;
import androidx.lifecycle.DefaultLifecycleObserver;
import androidx.lifecycle.LifecycleOwner;
import androidx.lifecycle.ProcessLifecycleOwner;
import android.content.Context;
import android.content.Intent;
import android.os.IBinder;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.core.app.NotificationCompat;
import androidx.core.content.ContextCompat;
import org.thoughtcrime.securesms.messages.IncomingMessageProcessor.Processor;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.ConstraintObserver;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraintObserver;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.ApplicationContext;
import org.thoughtcrime.securesms.R;
import org.thoughtcrime.securesms.notifications.NotificationChannels;
import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.whispersystems.libsignal.InvalidVersionException;
import org.whispersystems.signalservice.api.SignalServiceMessagePipe;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class IncomingMessageObserver implements ConstraintObserver.Notifier {
private static final String TAG = IncomingMessageObserver.class.getSimpleName();
public static final int FOREGROUND_ID = 313399;
private static final long REQUEST_TIMEOUT_MINUTES = 1;
private static SignalServiceMessagePipe pipe = null;
private static SignalServiceMessagePipe unidentifiedPipe = null;
private final Context context;
private final NetworkConstraint networkConstraint;
private final SignalServiceNetworkAccess networkAccess;
private boolean appVisible;
public IncomingMessageObserver(@NonNull Context context) {
this.context = context;
this.networkConstraint = new NetworkConstraint.Factory(ApplicationContext.getInstance(context)).create();
this.networkAccess = ApplicationDependencies.getSignalServiceNetworkAccess();
new NetworkConstraintObserver(ApplicationContext.getInstance(context)).register(this);
new MessageRetrievalThread().start();
if (TextSecurePreferences.isFcmDisabled(context)) {
ContextCompat.startForegroundService(context, new Intent(context, ForegroundService.class));
}
ProcessLifecycleOwner.get().getLifecycle().addObserver(new DefaultLifecycleObserver() {
@Override
public void onStart(@NonNull LifecycleOwner owner) {
onAppForegrounded();
}
@Override
public void onStop(@NonNull LifecycleOwner owner) {
onAppBackgrounded();
}
});
ApplicationDependencies.getInitialMessageRetriever().addListener(this::onInitialRetrievalComplete);
}
@Override
public void onConstraintMet(@NonNull String reason) {
synchronized (this) {
notifyAll();
}
}
private synchronized void onAppForegrounded() {
appVisible = true;
notifyAll();
}
private synchronized void onAppBackgrounded() {
appVisible = false;
notifyAll();
}
private synchronized void onInitialRetrievalComplete() {
notifyAll();
}
private synchronized boolean isConnectionNecessary() {
boolean isGcmDisabled = TextSecurePreferences.isFcmDisabled(context);
Log.d(TAG, String.format("Network requirement: %s, app visible: %s, gcm disabled: %b",
networkConstraint.isMet(), appVisible, isGcmDisabled));
return TextSecurePreferences.isPushRegistered(context) &&
TextSecurePreferences.isWebsocketRegistered(context) &&
(appVisible || isGcmDisabled) &&
networkConstraint.isMet() &&
ApplicationDependencies.getInitialMessageRetriever().isCaughtUp() &&
!networkAccess.isCensored(context);
}
private synchronized void waitForConnectionNecessary() {
try {
while (!isConnectionNecessary()) wait();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
private void shutdown(SignalServiceMessagePipe pipe, SignalServiceMessagePipe unidentifiedPipe) {
try {
pipe.shutdown();
unidentifiedPipe.shutdown();
} catch (Throwable t) {
Log.w(TAG, t);
}
}
public static @Nullable SignalServiceMessagePipe getPipe() {
return pipe;
}
public static @Nullable SignalServiceMessagePipe getUnidentifiedPipe() {
return unidentifiedPipe;
}
private class MessageRetrievalThread extends Thread implements Thread.UncaughtExceptionHandler {
MessageRetrievalThread() {
super("MessageRetrievalService");
setUncaughtExceptionHandler(this);
}
@Override
public void run() {
while (true) {
Log.i(TAG, "Waiting for websocket state change....");
waitForConnectionNecessary();
Log.i(TAG, "Making websocket connection....");
SignalServiceMessageReceiver receiver = ApplicationDependencies.getSignalServiceMessageReceiver();
pipe = receiver.createMessagePipe();
unidentifiedPipe = receiver.createUnidentifiedMessagePipe();
SignalServiceMessagePipe localPipe = pipe;
SignalServiceMessagePipe unidentifiedLocalPipe = unidentifiedPipe;
try {
while (isConnectionNecessary()) {
try {
Log.i(TAG, "Reading message...");
localPipe.read(REQUEST_TIMEOUT_MINUTES, TimeUnit.MINUTES,
envelope -> {
Log.i(TAG, "Retrieved envelope! " + envelope.getTimestamp());
try (Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) {
processor.processEnvelope(envelope);
}
});
} catch (TimeoutException e) {
Log.w(TAG, "Application level read timeout...");
} catch (InvalidVersionException e) {
Log.w(TAG, e);
}
}
} catch (Throwable e) {
Log.w(TAG, e);
} finally {
Log.w(TAG, "Shutting down pipe...");
shutdown(localPipe, unidentifiedLocalPipe);
}
Log.i(TAG, "Looping...");
}
}
@Override
public void uncaughtException(Thread t, Throwable e) {
Log.w(TAG, "*** Uncaught exception!");
Log.w(TAG, e);
}
}
public static class ForegroundService extends Service {
@Override
public @Nullable IBinder onBind(Intent intent) {
return null;
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
super.onStartCommand(intent, flags, startId);
NotificationCompat.Builder builder = new NotificationCompat.Builder(getApplicationContext(), NotificationChannels.OTHER);
builder.setContentTitle(getApplicationContext().getString(R.string.MessageRetrievalService_signal));
builder.setContentText(getApplicationContext().getString(R.string.MessageRetrievalService_background_connection_enabled));
builder.setPriority(NotificationCompat.PRIORITY_MIN);
builder.setWhen(0);
builder.setSmallIcon(R.drawable.ic_signal_background_connection);
startForeground(FOREGROUND_ID, builder.build());
return Service.START_STICKY;
}
}
}

View File

@@ -0,0 +1,115 @@
package org.thoughtcrime.securesms.messages;
import android.content.Context;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import org.thoughtcrime.securesms.database.DatabaseFactory;
import org.thoughtcrime.securesms.database.MessagingDatabase.SyncMessageId;
import org.thoughtcrime.securesms.database.MmsSmsDatabase;
import org.thoughtcrime.securesms.database.PushDatabase;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.jobs.PushDecryptMessageJob;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.recipients.Recipient;
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
import java.io.Closeable;
import java.util.Locale;
import java.util.concurrent.locks.ReentrantLock;
/**
* The central entry point for all envelopes that have been retrieved. Envelopes must be processed
* here to guarantee proper ordering.
*/
public class IncomingMessageProcessor {
private static final String TAG = Log.tag(IncomingMessageProcessor.class);
private final Context context;
private final ReentrantLock lock;
public IncomingMessageProcessor(@NonNull Context context) {
this.context = context;
this.lock = new ReentrantLock();
}
/**
* @return An instance of a Processor that will allow you to process messages in a thread safe
* way. Must be closed.
*/
public Processor acquire() {
lock.lock();
Thread current = Thread.currentThread();
Log.d(TAG, "Lock acquired by thread " + current.getId() + " (" + current.getName() + ")");
return new Processor(context);
}
private void release() {
Thread current = Thread.currentThread();
Log.d(TAG, "Lock about to be released by thread " + current.getId() + " (" + current.getName() + ")");
lock.unlock();
}
public class Processor implements Closeable {
private final Context context;
private final PushDatabase pushDatabase;
private final MmsSmsDatabase mmsSmsDatabase;
private final JobManager jobManager;
private Processor(@NonNull Context context) {
this.context = context;
this.pushDatabase = DatabaseFactory.getPushDatabase(context);
this.mmsSmsDatabase = DatabaseFactory.getMmsSmsDatabase(context);
this.jobManager = ApplicationDependencies.getJobManager();
}
/**
* @return The id of the {@link PushDecryptMessageJob} that was scheduled to process the message, if
* one was created. Otherwise null.
*/
public @Nullable String processEnvelope(@NonNull SignalServiceEnvelope envelope) {
if (envelope.hasSource()) {
Recipient.externalPush(context, envelope.getSourceAddress());
}
if (envelope.isReceipt()) {
processReceipt(envelope);
return null;
} else if (envelope.isPreKeySignalMessage() || envelope.isSignalMessage() || envelope.isUnidentifiedSender()) {
return processMessage(envelope);
} else {
Log.w(TAG, "Received envelope of unknown type: " + envelope.getType());
return null;
}
}
private @NonNull String processMessage(@NonNull SignalServiceEnvelope envelope) {
Log.i(TAG, "Received message. Inserting in PushDatabase.");
long id = pushDatabase.insert(envelope);
PushDecryptMessageJob job = new PushDecryptMessageJob(context, id);
jobManager.add(job);
return job.getId();
}
private void processReceipt(@NonNull SignalServiceEnvelope envelope) {
Log.i(TAG, String.format(Locale.ENGLISH, "Received receipt: (XXXXX, %d)", envelope.getTimestamp()));
mmsSmsDatabase.incrementDeliveryReceiptCount(new SyncMessageId(Recipient.externalPush(context, envelope.getSourceAddress()).getId(), envelope.getTimestamp()),
System.currentTimeMillis());
}
@Override
public void close() {
release();
}
}
}

View File

@@ -0,0 +1,151 @@
package org.thoughtcrime.securesms.messages;
import android.content.Context;
import androidx.annotation.AnyThread;
import androidx.annotation.NonNull;
import androidx.annotation.WorkerThread;
import org.thoughtcrime.securesms.ApplicationContext;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.util.concurrent.SignalExecutors;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Fetches the first batch of messages, before anything else does.
*
* We have a separate process for fetching "initial" messages in order to have special behavior when
* catching up on a lot of messages after being offline for a while. It also gives us an opportunity
* to flag when we are "up-to-date" with our message queue.
*/
public class InitialMessageRetriever {
private static final String TAG = Log.tag(InitialMessageRetriever.class);
private static final int MAX_ATTEMPTS = 3;
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private State state = State.NOT_CAUGHT_UP;
private final Object STATE_LOCK = new Object();
/**
* Only fires once. No need to remove. It will be called on an arbitrary worker thread.
*/
public void addListener(@NonNull Listener listener) {
synchronized (STATE_LOCK) {
if (state == State.CAUGHT_UP) {
listener.onCaughtUp();
} else {
listeners.add(listener);
}
}
}
/**
* Performs the initial fetch for messages (if necessary) with the requested timeout. The timeout
* is not just for the initial network request, but for the entire method call.
*
* @return A result describing how the operation completed.
*/
@WorkerThread
public @NonNull Result begin(long timeout) {
synchronized (STATE_LOCK) {
if (state == State.CAUGHT_UP) {
return Result.SKIPPED_ALREADY_CAUGHT_UP;
} else if (state == State.RUNNING) {
return Result.SKIPPED_ALREADY_RUNNING;
}
state = State.RUNNING;
}
long startTime = System.currentTimeMillis();
MessageRetrievalStrategy messageRetrievalStrategy = getRetriever();
CountDownLatch latch = new CountDownLatch(1);
SignalExecutors.UNBOUNDED.execute(() -> {
for (int i = 0; i < MAX_ATTEMPTS; i++) {
if (messageRetrievalStrategy.isCanceled()) {
Log.w(TAG, "Invalidated! Ending attempts.");
break;
}
boolean success = getRetriever().execute(timeout);
if (success) {
break;
} else {
Log.w(TAG, "Failed to catch up! Attempt " + (i + 1) + "/" + MAX_ATTEMPTS);
}
}
latch.countDown();
});
try {
boolean success = latch.await(timeout, TimeUnit.MILLISECONDS);
synchronized (STATE_LOCK) {
state = State.CAUGHT_UP;
for (Listener listener : listeners) {
listener.onCaughtUp();
}
listeners.clear();
}
if (success) {
Log.i(TAG, "Successfully caught up in " + (System.currentTimeMillis() - startTime) + " ms.");
return Result.SUCCESS;
} else {
Log.i(TAG, "Could not catch up completely. Hit the timeout of " + timeout + " ms.");
messageRetrievalStrategy.cancel();
return Result.FAILURE_TIMEOUT;
}
} catch (InterruptedException e) {
Log.w(TAG, "Interrupted!", e);
return Result.FAILURE_ERROR;
}
}
public boolean isCaughtUp() {
synchronized (STATE_LOCK) {
return state == State.CAUGHT_UP;
}
}
private @NonNull MessageRetrievalStrategy getRetriever() {
Context context = ApplicationDependencies.getApplication();
if (ApplicationContext.getInstance(context).isAppVisible() &&
!ApplicationDependencies.getSignalServiceNetworkAccess().isCensored(context))
{
return new WebsocketStrategy();
} else {
return new RestStrategy();
}
}
private enum State {
NOT_CAUGHT_UP, RUNNING, CAUGHT_UP
}
public enum Result {
SUCCESS, FAILURE_TIMEOUT, FAILURE_ERROR, SKIPPED_ALREADY_CAUGHT_UP, SKIPPED_ALREADY_RUNNING
}
public interface Listener {
@WorkerThread
void onCaughtUp();
}
}

View File

@@ -0,0 +1,91 @@
package org.thoughtcrime.securesms.messages;
import androidx.annotation.AnyThread;
import androidx.annotation.NonNull;
import androidx.annotation.WorkerThread;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.jobmanager.JobTracker;
import org.thoughtcrime.securesms.jobs.MarkerJob;
import org.thoughtcrime.securesms.logging.Log;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Implementations are responsible for fetching and processing a batch of messages.
*/
public abstract class MessageRetrievalStrategy {
private volatile boolean canceled;
/**
* Fetches and processes any pending messages. This method should block until the messages are
* actually stored and processed -- not just retrieved.
*
* @param timeout Hint for how long this will run. The strategy will also be canceled after the
* timeout ends, but having the timeout available may be useful for setting things
* like socket timeouts.
*
* @return True if everything was successful up until cancelation, false otherwise.
*/
@WorkerThread
abstract boolean execute(long timeout);
/**
* Marks the strategy as canceled. It is the responsibility of the implementation of
* {@link #execute(long)} to check {@link #isCanceled()} to know if execution should stop.
*/
void cancel() {
this.canceled = true;
}
protected boolean isCanceled() {
return canceled;
}
protected static void blockUntilQueueDrained(@NonNull String tag, @NonNull String queue, long timeoutMs) {
long startTime = System.currentTimeMillis();
final JobManager jobManager = ApplicationDependencies.getJobManager();
final MarkerJob markerJob = new MarkerJob(queue);
Optional<JobTracker.JobState> jobState = jobManager.runSynchronously(markerJob, timeoutMs);
if (!jobState.isPresent()) {
Log.w(tag, "Timed out waiting for " + queue + " job(s) to finish!");
}
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
Log.d(tag, "Waited " + duration + " ms for the " + queue + " job(s) to finish.");
}
protected static String timeSuffix(long startTime) {
return " (" + (System.currentTimeMillis() - startTime) + " ms elapsed)";
}
protected static class QueueFindingJobListener implements JobTracker.JobListener {
private final Set<String> queues = new HashSet<>();
@Override
@AnyThread
public void onStateChanged(@NonNull Job job, @NonNull JobTracker.JobState jobState) {
synchronized (queues) {
queues.add(job.getParameters().getQueue());
}
}
@NonNull Set<String> getQueues() {
synchronized (queues) {
return new HashSet<>(queues);
}
}
}
}

View File

@@ -0,0 +1,120 @@
package org.thoughtcrime.securesms.messages;
import androidx.annotation.NonNull;
import androidx.annotation.WorkerThread;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.jobmanager.JobTracker;
import org.thoughtcrime.securesms.jobs.MarkerJob;
import org.thoughtcrime.securesms.jobs.PushDecryptMessageJob;
import org.thoughtcrime.securesms.jobs.PushProcessMessageJob;
import org.thoughtcrime.securesms.logging.Log;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Retrieves messages over the REST endpoint.
*/
public class RestStrategy extends MessageRetrievalStrategy {
private static final String TAG = Log.tag(RestStrategy.class);
@WorkerThread
@Override
public boolean execute(long timeout) {
long startTime = System.currentTimeMillis();
JobManager jobManager = ApplicationDependencies.getJobManager();
QueueFindingJobListener queueListener = new QueueFindingJobListener();
try (IncomingMessageProcessor.Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) {
int jobCount = enqueuePushDecryptJobs(processor, startTime, timeout);
if (jobCount == 0) {
Log.d(TAG, "No PushDecryptMessageJobs were enqueued.");
return true;
} else {
Log.d(TAG, jobCount + " PushDecryptMessageJob(s) were enqueued.");
}
jobManager.addListener(job -> job.getParameters().getQueue() != null && job.getParameters().getQueue().startsWith(PushProcessMessageJob.QUEUE_PREFIX), queueListener);
long timeRemainingMs = blockUntilQueueDrained(PushDecryptMessageJob.QUEUE, TimeUnit.SECONDS.toMillis(10));
Set<String> processQueues = queueListener.getQueues();
Log.d(TAG, "Discovered " + processQueues.size() + " queue(s): " + processQueues);
if (timeRemainingMs > 0) {
Iterator<String> iter = processQueues.iterator();
while (iter.hasNext() && timeRemainingMs > 0) {
timeRemainingMs = blockUntilQueueDrained(iter.next(), timeRemainingMs);
}
if (timeRemainingMs <= 0) {
Log.w(TAG, "Ran out of time while waiting for queues to drain.");
}
} else {
Log.w(TAG, "Ran out of time before we could even wait on individual queues!");
}
return true;
} catch (IOException e) {
Log.w(TAG, "Failed to retrieve messages. Resetting the SignalServiceMessageReceiver.", e);
ApplicationDependencies.resetSignalServiceMessageReceiver();
return false;
} finally {
jobManager.removeListener(queueListener);
}
}
private static int enqueuePushDecryptJobs(IncomingMessageProcessor.Processor processor, long startTime, long timeout)
throws IOException
{
SignalServiceMessageReceiver receiver = ApplicationDependencies.getSignalServiceMessageReceiver();
AtomicInteger jobCount = new AtomicInteger(0);
receiver.setSoTimeoutMillis(timeout);
receiver.retrieveMessages(envelope -> {
Log.i(TAG, "Retrieved an envelope." + timeSuffix(startTime));
String jobId = processor.processEnvelope(envelope);
if (jobId != null) {
jobCount.incrementAndGet();
}
Log.i(TAG, "Successfully processed an envelope." + timeSuffix(startTime));
});
return jobCount.get();
}
private static long blockUntilQueueDrained(@NonNull String queue, long timeoutMs) {
long startTime = System.currentTimeMillis();
final JobManager jobManager = ApplicationDependencies.getJobManager();
final MarkerJob markerJob = new MarkerJob(queue);
Optional<JobTracker.JobState> jobState = jobManager.runSynchronously(markerJob, timeoutMs);
if (!jobState.isPresent()) {
Log.w(TAG, "Timed out waiting for " + queue + " job(s) to finish!");
}
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
Log.d(TAG, "Waited " + duration + " ms for the " + queue + " job(s) to finish.");
return timeoutMs - duration;
}
@Override
public @NonNull String toString() {
return RestStrategy.class.getSimpleName();
}
}

View File

@@ -0,0 +1,95 @@
package org.thoughtcrime.securesms.messages;
import androidx.annotation.NonNull;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.jobs.PushProcessMessageJob;
import org.thoughtcrime.securesms.logging.Log;
import org.whispersystems.libsignal.InvalidVersionException;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.SignalServiceMessagePipe;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
class WebsocketStrategy extends MessageRetrievalStrategy {
private static final String TAG = Log.tag(WebsocketStrategy.class);
private final SignalServiceMessageReceiver receiver;
private final JobManager jobManager;
public WebsocketStrategy() {
this.receiver = ApplicationDependencies.getSignalServiceMessageReceiver();
this.jobManager = ApplicationDependencies.getJobManager();
}
@Override
public boolean execute(long timeout) {
long startTime = System.currentTimeMillis();
try {
Set<String> processJobQueues = drainWebsocket(timeout, startTime);
Iterator<String> queueIterator = processJobQueues.iterator();
long timeRemaining = Math.max(0, timeout - (System.currentTimeMillis() - startTime));
while (!isCanceled() && queueIterator.hasNext() && timeRemaining > 0) {
String queue = queueIterator.next();
blockUntilQueueDrained(TAG, queue, timeRemaining);
timeRemaining = Math.max(0, timeout - (System.currentTimeMillis() - startTime));
}
return true;
} catch (IOException e) {
Log.w(TAG, "Encountered an exception while draining the websocket.", e);
return false;
}
}
private @NonNull Set<String> drainWebsocket(long timeout, long startTime) throws IOException {
SignalServiceMessagePipe pipe = receiver.createMessagePipe();
QueueFindingJobListener queueListener = new QueueFindingJobListener();
jobManager.addListener(job -> job.getParameters().getQueue() != null && job.getParameters().getQueue().startsWith(PushProcessMessageJob.QUEUE_PREFIX), queueListener);
try {
while (shouldContinue()) {
try {
Optional<SignalServiceEnvelope> result = pipe.readOrEmpty(timeout, TimeUnit.MILLISECONDS, envelope -> {
Log.i(TAG, "Retrieved envelope! " + envelope.getTimestamp() + timeSuffix(startTime));
try (IncomingMessageProcessor.Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) {
processor.processEnvelope(envelope);
}
});
if (!result.isPresent()) {
Log.i(TAG, "Hit an empty response. Finished." + timeSuffix(startTime));
break;
}
} catch (TimeoutException e) {
Log.w(TAG, "Websocket timeout." + timeSuffix(startTime));
} catch (InvalidVersionException e) {
Log.w(TAG, e);
}
}
} finally {
pipe.shutdown();
jobManager.removeListener(queueListener);
}
return queueListener.getQueues();
}
private boolean shouldContinue() {
return !isCanceled();
}
}