Break out into a multi-module project

This commit is contained in:
Moxie Marlinspike
2019-04-20 21:56:20 -07:00
parent b41dde777e
commit d0d375aeb7
318 changed files with 255 additions and 215 deletions

View File

@@ -0,0 +1,187 @@
/*
* Copyright (C) 2013 Open WhisperSystems
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.whispersystems.textsecuregcm.push;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.ApnConfiguration;
import org.whispersystems.textsecuregcm.push.RetryingApnsClient.ApnResult;
import org.whispersystems.textsecuregcm.redis.RedisOperation;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.util.Constants;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static com.codahale.metrics.MetricRegistry.name;
import io.dropwizard.lifecycle.Managed;
public class APNSender implements Managed {
private final Logger logger = LoggerFactory.getLogger(APNSender.class);
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private static final Meter unregisteredEventStale = metricRegistry.meter(name(APNSender.class, "unregistered_event_stale"));
private static final Meter unregisteredEventFresh = metricRegistry.meter(name(APNSender.class, "unregistered_event_fresh"));
private ExecutorService executor;
private ApnFallbackManager fallbackManager;
private final AccountsManager accountsManager;
private final String bundleId;
private final boolean sandbox;
private final RetryingApnsClient apnsClient;
public APNSender(AccountsManager accountsManager, ApnConfiguration configuration)
throws IOException
{
this.accountsManager = accountsManager;
this.bundleId = configuration.getBundleId();
this.sandbox = configuration.isSandboxEnabled();
this.apnsClient = new RetryingApnsClient(configuration.getPushCertificate(),
configuration.getPushKey(),
sandbox);
}
@VisibleForTesting
public APNSender(ExecutorService executor, AccountsManager accountsManager, RetryingApnsClient apnsClient, String bundleId, boolean sandbox) {
this.executor = executor;
this.accountsManager = accountsManager;
this.apnsClient = apnsClient;
this.sandbox = sandbox;
this.bundleId = bundleId;
}
public ListenableFuture<ApnResult> sendMessage(final ApnMessage message) {
String topic = bundleId;
if (message.isVoip()) {
topic = topic + ".voip";
}
ListenableFuture<ApnResult> future = apnsClient.send(message.getApnId(), topic,
message.getMessage(),
new Date(message.getExpirationTime()));
Futures.addCallback(future, new FutureCallback<ApnResult>() {
@Override
public void onSuccess(@Nullable ApnResult result) {
if (result == null) {
logger.warn("*** RECEIVED NULL APN RESULT ***");
} else if (result.getStatus() == ApnResult.Status.NO_SUCH_USER) {
handleUnregisteredUser(message.getApnId(), message.getNumber(), message.getDeviceId());
} else if (result.getStatus() == ApnResult.Status.GENERIC_FAILURE) {
logger.warn("*** Got APN generic failure: " + result.getReason() + ", " + message.getNumber());
}
}
@Override
public void onFailure(@Nullable Throwable t) {
logger.warn("Got fatal APNS exception", t);
}
}, executor);
return future;
}
@Override
public void start() {
this.executor = Executors.newSingleThreadExecutor();
}
@Override
public void stop() {
this.executor.shutdown();
this.apnsClient.disconnect();
}
public void setApnFallbackManager(ApnFallbackManager fallbackManager) {
this.fallbackManager = fallbackManager;
}
private void handleUnregisteredUser(String registrationId, String number, long deviceId) {
// logger.info("Got APN Unregistered: " + number + "," + deviceId);
Optional<Account> account = accountsManager.get(number);
if (!account.isPresent()) {
logger.info("No account found: " + number);
unregisteredEventStale.mark();
return;
}
Optional<Device> device = account.get().getDevice(deviceId);
if (!device.isPresent()) {
logger.info("No device found: " + number);
unregisteredEventStale.mark();
return;
}
if (!registrationId.equals(device.get().getApnId()) &&
!registrationId.equals(device.get().getVoipApnId()))
{
logger.info("Registration ID does not match: " + registrationId + ", " + device.get().getApnId() + ", " + device.get().getVoipApnId());
unregisteredEventStale.mark();
return;
}
// if (registrationId.equals(device.get().getApnId())) {
// logger.info("APN Unregister APN ID matches! " + number + ", " + deviceId);
// } else if (registrationId.equals(device.get().getVoipApnId())) {
// logger.info("APN Unregister VoIP ID matches! " + number + ", " + deviceId);
// }
long tokenTimestamp = device.get().getPushTimestamp();
if (tokenTimestamp != 0 && System.currentTimeMillis() < tokenTimestamp + TimeUnit.SECONDS.toMillis(10))
{
logger.info("APN Unregister push timestamp is more recent: " + tokenTimestamp + ", " + number);
unregisteredEventStale.mark();
return;
}
// logger.info("APN Unregister timestamp matches: " + device.get().getApnId() + ", " + device.get().getVoipApnId());
// device.get().setApnId(null);
// device.get().setVoipApnId(null);
// device.get().setFetchesMessages(false);
// accountsManager.update(account.get());
// if (fallbackManager != null) {
// fallbackManager.cancel(new WebsocketAddress(number, deviceId));
// }
if (fallbackManager != null) {
RedisOperation.unchecked(() -> fallbackManager.cancel(account.get(), device.get()));
unregisteredEventFresh.mark();
}
}
}

View File

@@ -0,0 +1,273 @@
package org.whispersystems.textsecuregcm.push;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.SharedMetricRegistries;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.LuaScript;
import org.whispersystems.textsecuregcm.redis.RedisException;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.Util;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.codahale.metrics.MetricRegistry.name;
import io.dropwizard.lifecycle.Managed;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisException;
public class ApnFallbackManager implements Managed, Runnable {
private static final Logger logger = LoggerFactory.getLogger(ApnFallbackManager.class);
private static final String PENDING_NOTIFICATIONS_KEY = "PENDING_APN";
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private static final Meter delivered = metricRegistry.meter(name(ApnFallbackManager.class, "voip_delivered"));
private static final Meter sent = metricRegistry.meter(name(ApnFallbackManager.class, "voip_sent" ));
private static final Meter retry = metricRegistry.meter(name(ApnFallbackManager.class, "voip_retry"));
private static final Meter evicted = metricRegistry.meter(name(ApnFallbackManager.class, "voip_evicted"));
static {
metricRegistry.register(name(ApnFallbackManager.class, "voip_ratio"), new VoipRatioGauge(delivered, sent));
}
private final APNSender apnSender;
private final AccountsManager accountsManager;
private final ReplicatedJedisPool jedisPool;
private final InsertOperation insertOperation;
private final GetOperation getOperation;
private final RemoveOperation removeOperation;
private AtomicBoolean running = new AtomicBoolean(false);
private boolean finished;
public ApnFallbackManager(ReplicatedJedisPool jedisPool,
APNSender apnSender,
AccountsManager accountsManager)
throws IOException
{
this.apnSender = apnSender;
this.accountsManager = accountsManager;
this.jedisPool = jedisPool;
this.insertOperation = new InsertOperation(jedisPool);
this.getOperation = new GetOperation(jedisPool);
this.removeOperation = new RemoveOperation(jedisPool);
}
public void schedule(Account account, Device device) throws RedisException {
try {
sent.mark();
insertOperation.insert(account, device, System.currentTimeMillis() + (15 * 1000), (15 * 1000));
} catch (JedisException e) {
throw new RedisException(e);
}
}
public boolean isScheduled(Account account, Device device) throws RedisException {
try {
String endpoint = "apn_device::" + account.getNumber() + "::" + device.getId();
try (Jedis jedis = jedisPool.getReadResource()) {
return jedis.zscore(PENDING_NOTIFICATIONS_KEY, endpoint) != null;
}
} catch (JedisException e) {
throw new RedisException(e);
}
}
public void cancel(Account account, Device device) throws RedisException {
try {
if (removeOperation.remove(account, device)) {
delivered.mark();
}
} catch (JedisException e) {
throw new RedisException(e);
}
}
@Override
public synchronized void start() {
running.set(true);
new Thread(this).start();
}
@Override
public synchronized void stop() {
running.set(false);
while (!finished) Util.wait(this);
}
@Override
public void run() {
while (running.get()) {
try {
List<byte[]> pendingNotifications = getOperation.getPending(100);
for (byte[] pendingNotification : pendingNotifications) {
String numberAndDevice = new String(pendingNotification);
Optional<Pair<String, Long>> separated = getSeparated(numberAndDevice);
if (!separated.isPresent()) {
removeOperation.remove(numberAndDevice);
continue;
}
Optional<Account> account = accountsManager.get(separated.get().first());
if (!account.isPresent()) {
removeOperation.remove(numberAndDevice);
continue;
}
Optional<Device> device = account.get().getDevice(separated.get().second());
if (!device.isPresent()) {
removeOperation.remove(numberAndDevice);
continue;
}
String apnId = device.get().getVoipApnId();
if (apnId == null) {
removeOperation.remove(account.get(), device.get());
continue;
}
long deviceLastSeen = device.get().getLastSeen();
if (deviceLastSeen < System.currentTimeMillis() - TimeUnit.DAYS.toMillis(90)) {
evicted.mark();
removeOperation.remove(account.get(), device.get());
continue;
}
apnSender.sendMessage(new ApnMessage(apnId, separated.get().first(), separated.get().second(), true));
retry.mark();
}
} catch (Exception e) {
logger.warn("Exception while operating", e);
}
Util.sleep(1000);
}
synchronized (ApnFallbackManager.this) {
finished = true;
notifyAll();
}
}
private Optional<Pair<String, Long>> getSeparated(String encoded) {
try {
if (encoded == null) return Optional.empty();
String[] parts = encoded.split(":");
if (parts.length != 2) {
logger.warn("Got strange encoded number: " + encoded);
return Optional.empty();
}
return Optional.of(new Pair<>(parts[0], Long.parseLong(parts[1])));
} catch (NumberFormatException e) {
logger.warn("Badly formatted: " + encoded, e);
return Optional.empty();
}
}
private static class RemoveOperation {
private final LuaScript luaScript;
RemoveOperation(ReplicatedJedisPool jedisPool) throws IOException {
this.luaScript = LuaScript.fromResource(jedisPool, "lua/apn/remove.lua");
}
boolean remove(Account account, Device device) {
String endpoint = "apn_device::" + account.getNumber() + "::" + device.getId();
return remove(endpoint);
}
boolean remove(String endpoint) {
if (!PENDING_NOTIFICATIONS_KEY.equals(endpoint)) {
List<byte[]> keys = Arrays.asList(PENDING_NOTIFICATIONS_KEY.getBytes(), endpoint.getBytes());
List<byte[]> args = Collections.emptyList();
return ((long)luaScript.execute(keys, args)) > 0;
}
return false;
}
}
private static class GetOperation {
private final LuaScript luaScript;
GetOperation(ReplicatedJedisPool jedisPool) throws IOException {
this.luaScript = LuaScript.fromResource(jedisPool, "lua/apn/get.lua");
}
@SuppressWarnings("SameParameterValue")
List<byte[]> getPending(int limit) {
List<byte[]> keys = Arrays.asList(PENDING_NOTIFICATIONS_KEY.getBytes());
List<byte[]> args = Arrays.asList(String.valueOf(System.currentTimeMillis()).getBytes(), String.valueOf(limit).getBytes());
return (List<byte[]>) luaScript.execute(keys, args);
}
}
private static class InsertOperation {
private final LuaScript luaScript;
InsertOperation(ReplicatedJedisPool jedisPool) throws IOException {
this.luaScript = LuaScript.fromResource(jedisPool, "lua/apn/insert.lua");
}
public void insert(Account account, Device device, long timestamp, long interval) {
String endpoint = "apn_device::" + account.getNumber() + "::" + device.getId();
List<byte[]> keys = Arrays.asList(PENDING_NOTIFICATIONS_KEY.getBytes(), endpoint.getBytes());
List<byte[]> args = Arrays.asList(String.valueOf(timestamp).getBytes(), String.valueOf(interval).getBytes(),
account.getNumber().getBytes(), String.valueOf(device.getId()).getBytes());
luaScript.execute(keys, args);
}
}
private static class VoipRatioGauge extends RatioGauge {
private final Meter success;
private final Meter attempts;
private VoipRatioGauge(Meter success, Meter attempts) {
this.success = success;
this.attempts = attempts;
}
@Override
protected Ratio getRatio() {
return RatioGauge.Ratio.of(success.getFiveMinuteRate(), attempts.getFiveMinuteRate());
}
}
}

View File

@@ -0,0 +1,43 @@
package org.whispersystems.textsecuregcm.push;
public class ApnMessage {
public static final String APN_PAYLOAD = "{\"aps\":{\"sound\":\"default\",\"alert\":{\"loc-key\":\"APN_Message\"}}}";
public static final long MAX_EXPIRATION = Integer.MAX_VALUE * 1000L;
private final String apnId;
private final String number;
private final long deviceId;
private final boolean isVoip;
public ApnMessage(String apnId, String number, long deviceId, boolean isVoip) {
this.apnId = apnId;
this.number = number;
this.deviceId = deviceId;
this.isVoip = isVoip;
}
public boolean isVoip() {
return isVoip;
}
public String getApnId() {
return apnId;
}
public String getMessage() {
return APN_PAYLOAD;
}
public long getExpirationTime() {
return MAX_EXPIRATION;
}
public String getNumber() {
return number;
}
public long getDeviceId() {
return deviceId;
}
}

View File

@@ -0,0 +1,186 @@
package org.whispersystems.textsecuregcm.push;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.gcm.server.Message;
import org.whispersystems.gcm.server.Result;
import org.whispersystems.gcm.server.Sender;
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.util.Constants;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static com.codahale.metrics.MetricRegistry.name;
import io.dropwizard.lifecycle.Managed;
public class GCMSender implements Managed {
private final Logger logger = LoggerFactory.getLogger(GCMSender.class);
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Meter success = metricRegistry.meter(name(getClass(), "sent", "success"));
private final Meter failure = metricRegistry.meter(name(getClass(), "sent", "failure"));
private final Meter unregistered = metricRegistry.meter(name(getClass(), "sent", "unregistered"));
private final Meter canonical = metricRegistry.meter(name(getClass(), "sent", "canonical"));
private final Map<String, Meter> outboundMeters = new HashMap<String, Meter>() {{
put("receipt", metricRegistry.meter(name(getClass(), "outbound", "receipt")));
put("notification", metricRegistry.meter(name(getClass(), "outbound", "notification")));
}};
private final AccountsManager accountsManager;
private final Sender signalSender;
private final DirectoryQueue directoryQueue;
private ExecutorService executor;
public GCMSender(AccountsManager accountsManager, String signalKey, DirectoryQueue directoryQueue) {
this.accountsManager = accountsManager;
this.signalSender = new Sender(signalKey, 50);
this.directoryQueue = directoryQueue;
}
@VisibleForTesting
public GCMSender(AccountsManager accountsManager, Sender sender, DirectoryQueue directoryQueue, ExecutorService executor) {
this.accountsManager = accountsManager;
this.signalSender = sender;
this.directoryQueue = directoryQueue;
this.executor = executor;
}
public void sendMessage(GcmMessage message) {
Message.Builder builder = Message.newBuilder()
.withDestination(message.getGcmId())
.withPriority("high");
String key = message.isReceipt() ? "receipt" : "notification";
Message request = builder.withDataPart(key, "").build();
ListenableFuture<Result> future = signalSender.send(request, message);
markOutboundMeter(key);
Futures.addCallback(future, new FutureCallback<Result>() {
@Override
public void onSuccess(Result result) {
if (result.isUnregistered() || result.isInvalidRegistrationId()) {
handleBadRegistration(result);
} else if (result.hasCanonicalRegistrationId()) {
handleCanonicalRegistrationId(result);
} else if (!result.isSuccess()) {
handleGenericError(result);
} else {
success.mark();
}
}
@Override
public void onFailure(Throwable throwable) {
logger.warn("GCM Failed: " + throwable);
}
}, executor);
}
@Override
public void start() {
executor = Executors.newSingleThreadExecutor();
}
@Override
public void stop() throws IOException {
this.signalSender.stop();
this.executor.shutdown();
}
private void handleBadRegistration(Result result) {
GcmMessage message = (GcmMessage)result.getContext();
logger.warn("Got GCM unregistered notice! " + message.getGcmId());
Optional<Account> account = getAccountForEvent(message);
if (account.isPresent()) {
Device device = account.get().getDevice(message.getDeviceId()).get();
device.setGcmId(null);
device.setFetchesMessages(false);
accountsManager.update(account.get());
if (!account.get().isActive()) {
directoryQueue.deleteRegisteredUser(account.get().getNumber());
}
}
unregistered.mark();
}
private void handleCanonicalRegistrationId(Result result) {
GcmMessage message = (GcmMessage)result.getContext();
logger.warn(String.format("Actually received 'CanonicalRegistrationId' ::: (canonical=%s), (original=%s)",
result.getCanonicalRegistrationId(), message.getGcmId()));
Optional<Account> account = getAccountForEvent(message);
if (account.isPresent()) {
Device device = account.get().getDevice(message.getDeviceId()).get();
device.setGcmId(result.getCanonicalRegistrationId());
accountsManager.update(account.get());
}
canonical.mark();
}
private void handleGenericError(Result result) {
GcmMessage message = (GcmMessage)result.getContext();
logger.warn(String.format("Unrecoverable Error ::: (error=%s), (gcm_id=%s), " +
"(destination=%s), (device_id=%d)",
result.getError(), message.getGcmId(), message.getNumber(),
message.getDeviceId()));
failure.mark();
}
private Optional<Account> getAccountForEvent(GcmMessage message) {
Optional<Account> account = accountsManager.get(message.getNumber());
if (account.isPresent()) {
Optional<Device> device = account.get().getDevice(message.getDeviceId());
if (device.isPresent()) {
if (message.getGcmId().equals(device.get().getGcmId())) {
logger.info("GCM Unregister GCM ID matches!");
if (device.get().getPushTimestamp() == 0 || System.currentTimeMillis() > (device.get().getPushTimestamp() + TimeUnit.SECONDS.toMillis(10)))
{
logger.info("GCM Unregister Timestamp matches!");
return account;
}
}
}
}
return Optional.empty();
}
private void markOutboundMeter(String key) {
Meter meter = outboundMeters.get(key);
if (meter != null) meter.mark();
else logger.warn("Unknown outbound key: " + key);
}
}

View File

@@ -0,0 +1,32 @@
package org.whispersystems.textsecuregcm.push;
public class GcmMessage {
private final String gcmId;
private final String number;
private final int deviceId;
private final boolean receipt;
public GcmMessage(String gcmId, String number, int deviceId, boolean receipt) {
this.gcmId = gcmId;
this.number = number;
this.deviceId = deviceId;
this.receipt = receipt;
}
public String getGcmId() {
return gcmId;
}
public String getNumber() {
return number;
}
public boolean isReceipt() {
return receipt;
}
public int getDeviceId() {
return deviceId;
}
}

View File

@@ -0,0 +1,11 @@
package org.whispersystems.textsecuregcm.push;
public class NotPushRegisteredException extends Exception {
public NotPushRegisteredException(String s) {
super(s);
}
public NotPushRegisteredException(Exception e) {
super(e);
}
}

View File

@@ -0,0 +1,158 @@
/*
* Copyright (C) 2013 Open WhisperSystems
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.whispersystems.textsecuregcm.push;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.SharedMetricRegistries;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.push.WebsocketSender.DeliveryStatus;
import org.whispersystems.textsecuregcm.redis.RedisOperation;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.util.BlockingThreadPoolExecutor;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Util;
import java.util.concurrent.TimeUnit;
import static com.codahale.metrics.MetricRegistry.name;
import io.dropwizard.lifecycle.Managed;
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
public class PushSender implements Managed {
@SuppressWarnings("unused")
private final Logger logger = LoggerFactory.getLogger(PushSender.class);
private final ApnFallbackManager apnFallbackManager;
private final GCMSender gcmSender;
private final APNSender apnSender;
private final WebsocketSender webSocketSender;
private final BlockingThreadPoolExecutor executor;
private final int queueSize;
public PushSender(ApnFallbackManager apnFallbackManager,
GCMSender gcmSender, APNSender apnSender,
WebsocketSender websocketSender, int queueSize)
{
this.apnFallbackManager = apnFallbackManager;
this.gcmSender = gcmSender;
this.apnSender = apnSender;
this.webSocketSender = websocketSender;
this.queueSize = queueSize;
this.executor = new BlockingThreadPoolExecutor(50, queueSize);
SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME)
.register(name(PushSender.class, "send_queue_depth"),
(Gauge<Integer>) executor::getSize);
}
public void sendMessage(final Account account, final Device device, final Envelope message, boolean online)
throws NotPushRegisteredException
{
if (device.getGcmId() == null && device.getApnId() == null && !device.getFetchesMessages()) {
throw new NotPushRegisteredException("No delivery possible!");
}
if (queueSize > 0) {
executor.execute(() -> sendSynchronousMessage(account, device, message, online));
} else {
sendSynchronousMessage(account, device, message, online);
}
}
public void sendQueuedNotification(Account account, Device device)
throws NotPushRegisteredException
{
if (device.getGcmId() != null) sendGcmNotification(account, device);
else if (device.getApnId() != null) sendApnNotification(account, device, true);
else if (!device.getFetchesMessages()) throw new NotPushRegisteredException("No notification possible!");
}
public WebsocketSender getWebSocketSender() {
return webSocketSender;
}
private void sendSynchronousMessage(Account account, Device device, Envelope message, boolean online) {
if (device.getGcmId() != null) sendGcmMessage(account, device, message, online);
else if (device.getApnId() != null) sendApnMessage(account, device, message, online);
else if (device.getFetchesMessages()) sendWebSocketMessage(account, device, message, online);
else throw new AssertionError();
}
private void sendGcmMessage(Account account, Device device, Envelope message, boolean online) {
DeliveryStatus deliveryStatus = webSocketSender.sendMessage(account, device, message, WebsocketSender.Type.GCM, online);
if (!deliveryStatus.isDelivered() && !online) {
sendGcmNotification(account, device);
}
}
private void sendGcmNotification(Account account, Device device) {
GcmMessage gcmMessage = new GcmMessage(device.getGcmId(), account.getNumber(),
(int)device.getId(), false);
gcmSender.sendMessage(gcmMessage);
}
private void sendApnMessage(Account account, Device device, Envelope outgoingMessage, boolean online) {
DeliveryStatus deliveryStatus = webSocketSender.sendMessage(account, device, outgoingMessage, WebsocketSender.Type.APN, online);
if (!deliveryStatus.isDelivered() && outgoingMessage.getType() != Envelope.Type.RECEIPT && !online) {
sendApnNotification(account, device, false);
}
}
private void sendApnNotification(Account account, Device device, boolean newOnly) {
ApnMessage apnMessage;
if (newOnly && RedisOperation.unchecked(() -> apnFallbackManager.isScheduled(account, device))) {
return;
}
if (!Util.isEmpty(device.getVoipApnId())) {
apnMessage = new ApnMessage(device.getVoipApnId(), account.getNumber(), device.getId(), true);
RedisOperation.unchecked(() -> apnFallbackManager.schedule(account, device));
} else {
apnMessage = new ApnMessage(device.getApnId(), account.getNumber(), device.getId(), false);
}
apnSender.sendMessage(apnMessage);
}
private void sendWebSocketMessage(Account account, Device device, Envelope outgoingMessage, boolean online)
{
webSocketSender.sendMessage(account, device, outgoingMessage, WebsocketSender.Type.WEB, online);
}
@Override
public void start() throws Exception {
apnSender.start();
gcmSender.start();
}
@Override
public void stop() throws Exception {
executor.shutdown();
executor.awaitTermination(5, TimeUnit.MINUTES);
apnSender.stop();
gcmSender.stop();
}
}

View File

@@ -0,0 +1,60 @@
package org.whispersystems.textsecuregcm.push;
import org.whispersystems.textsecuregcm.controllers.NoSuchUserException;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import java.util.Optional;
import java.util.Set;
public class ReceiptSender {
private final PushSender pushSender;
private final AccountsManager accountManager;
public ReceiptSender(AccountsManager accountManager,
PushSender pushSender)
{
this.accountManager = accountManager;
this.pushSender = pushSender;
}
public void sendReceipt(Account source, String destination, long messageId)
throws NoSuchUserException, NotPushRegisteredException
{
if (source.getNumber().equals(destination)) {
return;
}
Account destinationAccount = getDestinationAccount(destination);
Set<Device> destinationDevices = destinationAccount.getDevices();
Envelope.Builder message = Envelope.newBuilder()
.setSource(source.getNumber())
.setSourceDevice((int) source.getAuthenticatedDevice().get().getId())
.setTimestamp(messageId)
.setType(Envelope.Type.RECEIPT);
if (source.getRelay().isPresent()) {
message.setRelay(source.getRelay().get());
}
for (Device destinationDevice : destinationDevices) {
pushSender.sendMessage(destinationAccount, destinationDevice, message.build(), false);
}
}
private Account getDestinationAccount(String destination)
throws NoSuchUserException
{
Optional<Account> account = accountManager.get(destination);
if (!account.isPresent()) {
throw new NoSuchUserException(destination);
}
return account.get();
}
}

View File

@@ -0,0 +1,140 @@
package org.whispersystems.textsecuregcm.push;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.turo.pushy.apns.ApnsClient;
import com.turo.pushy.apns.ApnsClientBuilder;
import com.turo.pushy.apns.DeliveryPriority;
import com.turo.pushy.apns.PushNotificationResponse;
import com.turo.pushy.apns.metrics.dropwizard.DropwizardApnsClientMetricsListener;
import com.turo.pushy.apns.util.SimpleApnsPushNotification;
import org.bouncycastle.openssl.PEMReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.util.Constants;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.security.KeyPair;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static com.codahale.metrics.MetricRegistry.name;
import io.netty.util.concurrent.GenericFutureListener;
public class RetryingApnsClient {
private static final Logger logger = LoggerFactory.getLogger(RetryingApnsClient.class);
private final ApnsClient apnsClient;
RetryingApnsClient(String apnCertificate, String apnKey, boolean sandbox)
throws IOException
{
MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
DropwizardApnsClientMetricsListener metricsListener = new DropwizardApnsClientMetricsListener();
for (Map.Entry<String, Metric> entry : metricsListener.getMetrics().entrySet()) {
metricRegistry.register(name(getClass(), entry.getKey()), entry.getValue());
}
this.apnsClient = new ApnsClientBuilder().setClientCredentials(initializeCertificate(apnCertificate),
initializePrivateKey(apnKey), null)
.setMetricsListener(metricsListener)
.setApnsServer(sandbox ? ApnsClientBuilder.DEVELOPMENT_APNS_HOST : ApnsClientBuilder.PRODUCTION_APNS_HOST)
.build();
}
@VisibleForTesting
public RetryingApnsClient(ApnsClient apnsClient) {
this.apnsClient = apnsClient;
}
ListenableFuture<ApnResult> send(final String apnId, final String topic, final String payload, final Date expiration) {
SettableFuture<ApnResult> result = SettableFuture.create();
SimpleApnsPushNotification notification = new SimpleApnsPushNotification(apnId, topic, payload, expiration, DeliveryPriority.IMMEDIATE);
apnsClient.sendNotification(notification).addListener(new ResponseHandler(result));
return result;
}
void disconnect() {
apnsClient.close();
}
private static X509Certificate initializeCertificate(String pemCertificate) throws IOException {
PEMReader reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(pemCertificate.getBytes())));
return (X509Certificate) reader.readObject();
}
private static PrivateKey initializePrivateKey(String pemKey) throws IOException {
PEMReader reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(pemKey.getBytes())));
return ((KeyPair) reader.readObject()).getPrivate();
}
private static final class ResponseHandler implements GenericFutureListener<io.netty.util.concurrent.Future<PushNotificationResponse<SimpleApnsPushNotification>>> {
private final SettableFuture<ApnResult> future;
private ResponseHandler(SettableFuture<ApnResult> future) {
this.future = future;
}
@Override
public void operationComplete(io.netty.util.concurrent.Future<PushNotificationResponse<SimpleApnsPushNotification>> result) {
try {
PushNotificationResponse<SimpleApnsPushNotification> response = result.get();
if (response.isAccepted()) {
future.set(new ApnResult(ApnResult.Status.SUCCESS, null));
} else if ("Unregistered".equals(response.getRejectionReason()) ||
"BadDeviceToken".equals(response.getRejectionReason()))
{
future.set(new ApnResult(ApnResult.Status.NO_SUCH_USER, response.getRejectionReason()));
} else {
logger.warn("Got APN failure: " + response.getRejectionReason());
future.set(new ApnResult(ApnResult.Status.GENERIC_FAILURE, response.getRejectionReason()));
}
} catch (InterruptedException e) {
logger.warn("Interrupted exception", e);
future.setException(e);
} catch (ExecutionException e) {
logger.warn("Execution exception", e);
future.setException(e.getCause());
}
}
}
public static class ApnResult {
public enum Status {
SUCCESS, NO_SUCH_USER, GENERIC_FAILURE
}
private final Status status;
private final String reason;
ApnResult(Status status, String reason) {
this.status = status;
this.reason = reason;
}
public Status getStatus() {
return status;
}
public String getReason() {
return reason;
}
}
}

View File

@@ -0,0 +1,11 @@
package org.whispersystems.textsecuregcm.push;
public class TransientPushFailureException extends Exception {
public TransientPushFailureException(String s) {
super(s);
}
public TransientPushFailureException(Exception e) {
super(e);
}
}

View File

@@ -0,0 +1,133 @@
/*
* Copyright (C) 2014 Open WhisperSystems
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.whispersystems.textsecuregcm.push;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.websocket.ProvisioningAddress;
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
import static com.codahale.metrics.MetricRegistry.name;
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
public class WebsocketSender {
public enum Type {
APN,
GCM,
WEB
}
@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(WebsocketSender.class);
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Meter websocketRequeueMeter = metricRegistry.meter(name(getClass(), "ws_requeue"));
private final Meter websocketOnlineMeter = metricRegistry.meter(name(getClass(), "ws_online" ));
private final Meter websocketOfflineMeter = metricRegistry.meter(name(getClass(), "ws_offline" ));
private final Meter apnOnlineMeter = metricRegistry.meter(name(getClass(), "apn_online" ));
private final Meter apnOfflineMeter = metricRegistry.meter(name(getClass(), "apn_offline"));
private final Meter gcmOnlineMeter = metricRegistry.meter(name(getClass(), "gcm_online" ));
private final Meter gcmOfflineMeter = metricRegistry.meter(name(getClass(), "gcm_offline"));
private final Meter provisioningOnlineMeter = metricRegistry.meter(name(getClass(), "provisioning_online" ));
private final Meter provisioningOfflineMeter = metricRegistry.meter(name(getClass(), "provisioning_offline"));
private final MessagesManager messagesManager;
private final PubSubManager pubSubManager;
public WebsocketSender(MessagesManager messagesManager, PubSubManager pubSubManager) {
this.messagesManager = messagesManager;
this.pubSubManager = pubSubManager;
}
public DeliveryStatus sendMessage(Account account, Device device, Envelope message, Type channel, boolean online) {
WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId());
PubSubMessage pubSubMessage = PubSubMessage.newBuilder()
.setType(PubSubMessage.Type.DELIVER)
.setContent(message.toByteString())
.build();
if (pubSubManager.publish(address, pubSubMessage)) {
if (channel == Type.APN) apnOnlineMeter.mark();
else if (channel == Type.GCM) gcmOnlineMeter.mark();
else websocketOnlineMeter.mark();
return new DeliveryStatus(true);
} else {
if (channel == Type.APN) apnOfflineMeter.mark();
else if (channel == Type.GCM) gcmOfflineMeter.mark();
else websocketOfflineMeter.mark();
if (!online) queueMessage(account, device, message);
return new DeliveryStatus(false);
}
}
public void queueMessage(Account account, Device device, Envelope message) {
websocketRequeueMeter.mark();
WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId());
messagesManager.insert(account.getNumber(), device.getId(), message);
pubSubManager.publish(address, PubSubMessage.newBuilder()
.setType(PubSubMessage.Type.QUERY_DB)
.build());
}
public boolean sendProvisioningMessage(ProvisioningAddress address, byte[] body) {
PubSubMessage pubSubMessage = PubSubMessage.newBuilder()
.setType(PubSubMessage.Type.DELIVER)
.setContent(ByteString.copyFrom(body))
.build();
if (pubSubManager.publish(address, pubSubMessage)) {
provisioningOnlineMeter.mark();
return true;
} else {
provisioningOfflineMeter.mark();
return false;
}
}
static class DeliveryStatus {
private final boolean delivered;
DeliveryStatus(boolean delivered) {
this.delivered = delivered;
}
boolean isDelivered() {
return delivered;
}
}
}