mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-21 01:28:03 +01:00
Remove GCMSender
This commit is contained in:
committed by
Jon Chambers
parent
8956e1e0cf
commit
0a6d724f2c
@@ -1,191 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2020 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.push;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.gcm.server.Message;
|
||||
import org.whispersystems.gcm.server.Result;
|
||||
import org.whispersystems.gcm.server.Sender;
|
||||
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
import org.whispersystems.textsecuregcm.util.SystemMapper;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
|
||||
public class GCMSender {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(GCMSender.class);
|
||||
|
||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private final Meter success = metricRegistry.meter(name(getClass(), "sent", "success"));
|
||||
private final Meter failure = metricRegistry.meter(name(getClass(), "sent", "failure"));
|
||||
private final Meter unregistered = metricRegistry.meter(name(getClass(), "sent", "unregistered"));
|
||||
private final Meter canonical = metricRegistry.meter(name(getClass(), "sent", "canonical"));
|
||||
|
||||
private final String DOWNSTREAM_ERROR_COUNTER_NAME = name(GCMSender.class, "downstreamError");
|
||||
|
||||
private final Map<String, Meter> outboundMeters = new HashMap<>() {{
|
||||
put("receipt", metricRegistry.meter(name(getClass(), "outbound", "receipt")));
|
||||
put("notification", metricRegistry.meter(name(getClass(), "outbound", "notification")));
|
||||
put("challenge", metricRegistry.meter(name(getClass(), "outbound", "challenge")));
|
||||
put("rateLimitChallenge", metricRegistry.meter(name(getClass(), "outbound", "rateLimitChallenge")));
|
||||
}};
|
||||
|
||||
private final AccountsManager accountsManager;
|
||||
private final Sender signalSender;
|
||||
private final ExecutorService executor;
|
||||
|
||||
private final ExperimentEnrollmentManager experimentEnrollmentManager;
|
||||
private final FcmSender fcmSender;
|
||||
|
||||
public GCMSender(ExecutorService executor, AccountsManager accountsManager, String signalKey, ExperimentEnrollmentManager experimentEnrollmentManager, FcmSender fcmSender) {
|
||||
this(executor, accountsManager, new Sender(signalKey, SystemMapper.getMapper(), 6), experimentEnrollmentManager, fcmSender);
|
||||
|
||||
CircuitBreakerUtil.registerMetrics(metricRegistry, signalSender.getRetry(), Sender.class);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public GCMSender(ExecutorService executor, AccountsManager accountsManager, Sender sender, ExperimentEnrollmentManager experimentEnrollmentManager, FcmSender fcmSender) {
|
||||
this.accountsManager = accountsManager;
|
||||
this.signalSender = sender;
|
||||
this.executor = executor;
|
||||
this.experimentEnrollmentManager = experimentEnrollmentManager;
|
||||
this.fcmSender = fcmSender;
|
||||
}
|
||||
|
||||
public void sendMessage(GcmMessage message) {
|
||||
final boolean useFcmSender = message.getUuid()
|
||||
.map(uuid -> experimentEnrollmentManager.isEnrolled(uuid, "fcmSender"))
|
||||
.orElse(false);
|
||||
|
||||
if (useFcmSender) {
|
||||
fcmSender.sendMessage(message);
|
||||
} else {
|
||||
Message.Builder builder = Message.newBuilder()
|
||||
.withDestination(message.getGcmId())
|
||||
.withPriority("high");
|
||||
|
||||
String key;
|
||||
|
||||
switch (message.getType()) {
|
||||
case NOTIFICATION:
|
||||
key = "notification";
|
||||
break;
|
||||
case CHALLENGE:
|
||||
key = "challenge";
|
||||
break;
|
||||
case RATE_LIMIT_CHALLENGE:
|
||||
key = "rateLimitChallenge";
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError();
|
||||
}
|
||||
|
||||
Message request = builder.withDataPart(key, message.getData().orElse("")).build();
|
||||
|
||||
CompletableFuture<Result> future = signalSender.send(request);
|
||||
markOutboundMeter(key);
|
||||
|
||||
future.handle((result, throwable) -> {
|
||||
if (result != null && message.getType() != GcmMessage.Type.CHALLENGE) {
|
||||
if (result.isUnregistered() || result.isInvalidRegistrationId()) {
|
||||
executor.submit(() -> handleBadRegistration(message));
|
||||
} else if (result.hasCanonicalRegistrationId()) {
|
||||
executor.submit(() -> handleCanonicalRegistrationId(message, result));
|
||||
} else if (!result.isSuccess()) {
|
||||
executor.submit(() -> handleGenericError(message, result));
|
||||
} else {
|
||||
success.mark();
|
||||
}
|
||||
} else {
|
||||
logger.warn("FCM Failed: " + throwable + ", " + throwable.getCause());
|
||||
}
|
||||
|
||||
return null;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void handleBadRegistration(GcmMessage message) {
|
||||
Optional<Account> account = getAccountForEvent(message);
|
||||
|
||||
if (account.isPresent()) {
|
||||
//noinspection OptionalGetWithoutIsPresent
|
||||
Device device = account.get().getDevice(message.getDeviceId()).get();
|
||||
|
||||
if (device.getUninstalledFeedbackTimestamp() == 0) {
|
||||
accountsManager.updateDevice(account.get(), message.getDeviceId(), d ->
|
||||
d.setUninstalledFeedbackTimestamp(Util.todayInMillis()));
|
||||
}
|
||||
}
|
||||
|
||||
unregistered.mark();
|
||||
}
|
||||
|
||||
private void handleCanonicalRegistrationId(GcmMessage message, Result result) {
|
||||
logger.warn("Actually received 'CanonicalRegistrationId' ::: (canonical={}}), (original={}})",
|
||||
result.getCanonicalRegistrationId(), message.getGcmId());
|
||||
|
||||
getAccountForEvent(message).ifPresent(account ->
|
||||
accountsManager.updateDevice(
|
||||
account,
|
||||
message.getDeviceId(),
|
||||
d -> d.setGcmId(result.getCanonicalRegistrationId())));
|
||||
|
||||
canonical.mark();
|
||||
}
|
||||
|
||||
private void handleGenericError(GcmMessage message, Result result) {
|
||||
logger.debug("Unrecoverable Error ::: (error={}}), (gcm_id={}}), (destination={}}), (device_id={}})",
|
||||
result.getError(), message.getGcmId(), message.getUuid(), message.getDeviceId());
|
||||
|
||||
Metrics.counter(DOWNSTREAM_ERROR_COUNTER_NAME, "code", result.getError()).increment();
|
||||
failure.mark();
|
||||
}
|
||||
|
||||
private Optional<Account> getAccountForEvent(GcmMessage message) {
|
||||
Optional<Account> account = message.getUuid().flatMap(accountsManager::getByAccountIdentifier);
|
||||
|
||||
if (account.isPresent()) {
|
||||
Optional<Device> device = account.get().getDevice(message.getDeviceId());
|
||||
|
||||
if (device.isPresent()) {
|
||||
if (message.getGcmId().equals(device.get().getGcmId())) {
|
||||
|
||||
if (device.get().getPushTimestamp() == 0 || System.currentTimeMillis() > (device.get().getPushTimestamp() + TimeUnit.SECONDS.toMillis(10))) {
|
||||
return account;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
private void markOutboundMeter(String key) {
|
||||
Meter meter = outboundMeters.get(key);
|
||||
|
||||
if (meter != null) meter.mark();
|
||||
else logger.warn("Unknown outbound key: " + key);
|
||||
}
|
||||
}
|
||||
@@ -38,7 +38,7 @@ public class MessageSender implements Managed {
|
||||
private final ApnFallbackManager apnFallbackManager;
|
||||
private final ClientPresenceManager clientPresenceManager;
|
||||
private final MessagesManager messagesManager;
|
||||
private final GCMSender gcmSender;
|
||||
private final FcmSender fcmSender;
|
||||
private final APNSender apnSender;
|
||||
private final PushLatencyManager pushLatencyManager;
|
||||
|
||||
@@ -50,14 +50,14 @@ public class MessageSender implements Managed {
|
||||
public MessageSender(ApnFallbackManager apnFallbackManager,
|
||||
ClientPresenceManager clientPresenceManager,
|
||||
MessagesManager messagesManager,
|
||||
GCMSender gcmSender,
|
||||
FcmSender fcmSender,
|
||||
APNSender apnSender,
|
||||
PushLatencyManager pushLatencyManager)
|
||||
{
|
||||
this.apnFallbackManager = apnFallbackManager;
|
||||
this.clientPresenceManager = clientPresenceManager;
|
||||
this.messagesManager = messagesManager;
|
||||
this.gcmSender = gcmSender;
|
||||
this.fcmSender = fcmSender;
|
||||
this.apnSender = apnSender;
|
||||
this.pushLatencyManager = pushLatencyManager;
|
||||
}
|
||||
@@ -122,7 +122,7 @@ public class MessageSender implements Managed {
|
||||
GcmMessage gcmMessage = new GcmMessage(device.getGcmId(), account.getUuid(),
|
||||
(int)device.getId(), GcmMessage.Type.NOTIFICATION, Optional.empty());
|
||||
|
||||
gcmSender.sendMessage(gcmMessage);
|
||||
fcmSender.sendMessage(gcmMessage);
|
||||
|
||||
RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getUuid(), device.getId(), false));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user