mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-26 09:28:03 +01:00
Support for APN fallback retries.
// FREEBIE
This commit is contained in:
@@ -55,6 +55,7 @@ import org.whispersystems.textsecuregcm.metrics.NetworkSentGauge;
|
||||
import org.whispersystems.textsecuregcm.providers.RedisClientFactory;
|
||||
import org.whispersystems.textsecuregcm.providers.RedisHealthCheck;
|
||||
import org.whispersystems.textsecuregcm.providers.TimeProvider;
|
||||
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
|
||||
import org.whispersystems.textsecuregcm.push.FeedbackHandler;
|
||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
||||
import org.whispersystems.textsecuregcm.push.PushServiceClient;
|
||||
@@ -170,15 +171,17 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
AccountAuthenticator deviceAuthenticator = new AccountAuthenticator(accountsManager);
|
||||
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheClient);
|
||||
|
||||
ApnFallbackManager apnFallbackManager = new ApnFallbackManager(pushServiceClient);
|
||||
TwilioSmsSender twilioSmsSender = new TwilioSmsSender(config.getTwilioConfiguration());
|
||||
Optional<NexmoSmsSender> nexmoSmsSender = initializeNexmoSmsSender(config.getNexmoConfiguration());
|
||||
SmsSender smsSender = new SmsSender(twilioSmsSender, nexmoSmsSender, config.getTwilioConfiguration().isInternational());
|
||||
UrlSigner urlSigner = new UrlSigner(config.getS3Configuration());
|
||||
PushSender pushSender = new PushSender(pushServiceClient, websocketSender);
|
||||
PushSender pushSender = new PushSender(apnFallbackManager, pushServiceClient, websocketSender);
|
||||
ReceiptSender receiptSender = new ReceiptSender(accountsManager, pushSender, federatedClientManager);
|
||||
FeedbackHandler feedbackHandler = new FeedbackHandler(pushServiceClient, accountsManager);
|
||||
Optional<byte[]> authorizationKey = config.getRedphoneConfiguration().getAuthorizationKey();
|
||||
|
||||
environment.lifecycle().manage(apnFallbackManager);
|
||||
environment.lifecycle().manage(pubSubManager);
|
||||
environment.lifecycle().manage(feedbackHandler);
|
||||
|
||||
@@ -207,7 +210,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
if (config.getWebsocketConfiguration().isEnabled()) {
|
||||
WebSocketEnvironment webSocketEnvironment = new WebSocketEnvironment(environment, config, 90000);
|
||||
webSocketEnvironment.setAuthenticator(new WebSocketAccountAuthenticator(deviceAuthenticator));
|
||||
webSocketEnvironment.setConnectListener(new AuthenticatedConnectListener(accountsManager, pushSender, receiptSender, messagesManager, pubSubManager));
|
||||
webSocketEnvironment.setConnectListener(new AuthenticatedConnectListener(accountsManager, pushSender, receiptSender, messagesManager, pubSubManager, apnFallbackManager));
|
||||
webSocketEnvironment.jersey().register(new KeepAliveController(pubSubManager));
|
||||
|
||||
WebSocketEnvironment provisioningEnvironment = new WebSocketEnvironment(environment, config);
|
||||
|
||||
@@ -37,4 +37,22 @@ public class ApnMessage {
|
||||
this.message = message;
|
||||
this.voip = voip;
|
||||
}
|
||||
|
||||
public ApnMessage(ApnMessage copy, String apnId, boolean voip) {
|
||||
this.apnId = apnId;
|
||||
this.number = copy.number;
|
||||
this.deviceId = copy.deviceId;
|
||||
this.message = copy.message;
|
||||
this.voip = voip;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public String getApnId() {
|
||||
return apnId;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public boolean isVoip() {
|
||||
return voip;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,150 @@
|
||||
package org.whispersystems.textsecuregcm.push;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.entities.ApnMessage;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.dropwizard.lifecycle.Managed;
|
||||
|
||||
public class ApnFallbackManager implements Managed, Runnable {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(ApnFallbackManager.class);
|
||||
|
||||
private final ApnFallbackTaskQueue taskQueue = new ApnFallbackTaskQueue();
|
||||
private final PushServiceClient pushServiceClient;
|
||||
|
||||
public ApnFallbackManager(PushServiceClient pushServiceClient) {
|
||||
this.pushServiceClient = pushServiceClient;
|
||||
}
|
||||
|
||||
public void schedule(final WebsocketAddress address, ApnFallbackTask task) {
|
||||
taskQueue.put(address, task);
|
||||
}
|
||||
|
||||
public void cancel(WebsocketAddress address) {
|
||||
taskQueue.remove(address);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
new Thread(this).start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
Entry<WebsocketAddress, ApnFallbackTask> taskEntry = taskQueue.get();
|
||||
ApnFallbackTask task = taskEntry.getValue();
|
||||
int retryCount = task.getRetryCount();
|
||||
|
||||
if (retryCount == 0) {
|
||||
pushServiceClient.send(task.getMessage());
|
||||
schedule(taskEntry.getKey(), new ApnFallbackTask(task.getApnId(), task.getMessage(),
|
||||
retryCount + 1, task.getDelay()));
|
||||
} else if (retryCount == 1) {
|
||||
pushServiceClient.send(new ApnMessage(task.getMessage(), task.getApnId(), false));
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
logger.warn("ApnFallbackThread", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class ApnFallbackTask {
|
||||
|
||||
private final long delay;
|
||||
private final long executionTime;
|
||||
private final String apnId;
|
||||
private final ApnMessage message;
|
||||
private final int retryCount;
|
||||
|
||||
public ApnFallbackTask(String apnId, ApnMessage message, int retryCount) {
|
||||
this(apnId, message, retryCount, TimeUnit.SECONDS.toMillis(15));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public ApnFallbackTask(String apnId, ApnMessage message, int retryCount, long delay) {
|
||||
this.executionTime = System.currentTimeMillis() + delay;
|
||||
this.delay = delay;
|
||||
this.apnId = apnId;
|
||||
this.message = message;
|
||||
this.retryCount = retryCount;
|
||||
}
|
||||
|
||||
public String getApnId() {
|
||||
return apnId;
|
||||
}
|
||||
|
||||
public ApnMessage getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
||||
public int getRetryCount() {
|
||||
return retryCount;
|
||||
}
|
||||
|
||||
public long getExecutionTime() {
|
||||
return executionTime;
|
||||
}
|
||||
|
||||
public long getDelay() {
|
||||
return delay;
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static class ApnFallbackTaskQueue {
|
||||
|
||||
private final LinkedHashMap<WebsocketAddress, ApnFallbackTask> tasks = new LinkedHashMap<>();
|
||||
|
||||
public Entry<WebsocketAddress, ApnFallbackTask> get() {
|
||||
while (true) {
|
||||
long timeDelta;
|
||||
|
||||
synchronized (tasks) {
|
||||
while (tasks.isEmpty()) Util.wait(tasks);
|
||||
|
||||
Iterator<Entry<WebsocketAddress, ApnFallbackTask>> iterator = tasks.entrySet().iterator();
|
||||
Entry<WebsocketAddress, ApnFallbackTask> nextTask = iterator.next();
|
||||
|
||||
timeDelta = nextTask.getValue().getExecutionTime() - System.currentTimeMillis();
|
||||
|
||||
if (timeDelta <= 0) {
|
||||
iterator.remove();
|
||||
return nextTask;
|
||||
}
|
||||
}
|
||||
|
||||
Util.sleep(timeDelta);
|
||||
}
|
||||
}
|
||||
|
||||
public void put(WebsocketAddress address, ApnFallbackTask task) {
|
||||
synchronized (tasks) {
|
||||
tasks.put(address, task);
|
||||
tasks.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
public void remove(WebsocketAddress address) {
|
||||
synchronized (tasks) {
|
||||
tasks.remove(address);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -22,11 +22,16 @@ import org.whispersystems.textsecuregcm.entities.ApnMessage;
|
||||
import org.whispersystems.textsecuregcm.entities.CryptoEncodingException;
|
||||
import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage;
|
||||
import org.whispersystems.textsecuregcm.entities.GcmMessage;
|
||||
import org.whispersystems.textsecuregcm.push.ApnFallbackManager.ApnFallbackTask;
|
||||
import org.whispersystems.textsecuregcm.push.WebsocketSender.DeliveryStatus;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
|
||||
import io.dropwizard.lifecycle.Managed;
|
||||
import static org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal;
|
||||
|
||||
public class PushSender {
|
||||
@@ -35,12 +40,14 @@ public class PushSender {
|
||||
|
||||
private static final String APN_PAYLOAD = "{\"aps\":{\"sound\":\"default\",\"badge\":%d,\"alert\":{\"loc-key\":\"APN_Message\"}}}";
|
||||
|
||||
private final PushServiceClient pushServiceClient;
|
||||
private final WebsocketSender webSocketSender;
|
||||
private final ApnFallbackManager apnFallbackManager;
|
||||
private final PushServiceClient pushServiceClient;
|
||||
private final WebsocketSender webSocketSender;
|
||||
|
||||
public PushSender(PushServiceClient pushServiceClient, WebsocketSender websocketSender) {
|
||||
this.pushServiceClient = pushServiceClient;
|
||||
this.webSocketSender = websocketSender;
|
||||
public PushSender(ApnFallbackManager apnFallbackManager, PushServiceClient pushServiceClient, WebsocketSender websocketSender) {
|
||||
this.apnFallbackManager = apnFallbackManager;
|
||||
this.pushServiceClient = pushServiceClient;
|
||||
this.webSocketSender = websocketSender;
|
||||
}
|
||||
|
||||
public void sendMessage(Account account, Device device, OutgoingMessageSignal message)
|
||||
@@ -106,6 +113,12 @@ public class PushSender {
|
||||
ApnMessage apnMessage = new ApnMessage(apnId, account.getNumber(), (int)device.getId(),
|
||||
String.format(APN_PAYLOAD, deliveryStatus.getMessageQueueDepth()),
|
||||
isVoip);
|
||||
|
||||
if (isVoip) {
|
||||
apnFallbackManager.schedule(new WebsocketAddress(account.getNumber(), device.getId()),
|
||||
new ApnFallbackTask(device.getApnId(), apnMessage, 0));
|
||||
}
|
||||
|
||||
pushServiceClient.send(apnMessage);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,12 +115,20 @@ public class Util {
|
||||
return parts;
|
||||
}
|
||||
|
||||
public static void sleep(int i) {
|
||||
public static void sleep(long i) {
|
||||
try {
|
||||
Thread.sleep(i);
|
||||
} catch (InterruptedException ie) {}
|
||||
}
|
||||
|
||||
public static void wait(Object object) {
|
||||
try {
|
||||
object.wait();
|
||||
} catch (InterruptedException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static long todayInMillis() {
|
||||
return TimeUnit.DAYS.toMillis(TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis()));
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
|
||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
||||
import org.whispersystems.textsecuregcm.push.ReceiptSender;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
@@ -25,22 +26,23 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
|
||||
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private static final Histogram durationHistogram = metricRegistry.histogram(name(WebSocketConnection.class, "connected_duration"));
|
||||
|
||||
|
||||
private final AccountsManager accountsManager;
|
||||
private final PushSender pushSender;
|
||||
private final ReceiptSender receiptSender;
|
||||
private final MessagesManager messagesManager;
|
||||
private final PubSubManager pubSubManager;
|
||||
private final ApnFallbackManager apnFallbackManager;
|
||||
private final AccountsManager accountsManager;
|
||||
private final PushSender pushSender;
|
||||
private final ReceiptSender receiptSender;
|
||||
private final MessagesManager messagesManager;
|
||||
private final PubSubManager pubSubManager;
|
||||
|
||||
public AuthenticatedConnectListener(AccountsManager accountsManager, PushSender pushSender,
|
||||
ReceiptSender receiptSender, MessagesManager messagesManager,
|
||||
PubSubManager pubSubManager)
|
||||
PubSubManager pubSubManager, ApnFallbackManager apnFallbackManager)
|
||||
{
|
||||
this.accountsManager = accountsManager;
|
||||
this.pushSender = pushSender;
|
||||
this.receiptSender = receiptSender;
|
||||
this.messagesManager = messagesManager;
|
||||
this.pubSubManager = pubSubManager;
|
||||
this.accountsManager = accountsManager;
|
||||
this.pushSender = pushSender;
|
||||
this.receiptSender = receiptSender;
|
||||
this.messagesManager = messagesManager;
|
||||
this.pubSubManager = pubSubManager;
|
||||
this.apnFallbackManager = apnFallbackManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -53,6 +55,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
|
||||
messagesManager, account, device,
|
||||
context.getClient());
|
||||
|
||||
apnFallbackManager.cancel(address);
|
||||
updateLastSeen(account, device);
|
||||
pubSubManager.subscribe(address, connection);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user