mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-21 01:28:03 +01:00
Refactor provisioning plumbing to use Lettuce
This commit is contained in:
committed by
Jon Chambers
parent
ae70d1113c
commit
11829d1f9f
@@ -5,37 +5,157 @@
|
||||
|
||||
package org.whispersystems.textsecuregcm.push;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import io.micrometer.core.instrument.Counter;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import org.whispersystems.textsecuregcm.storage.PubSubManager;
|
||||
import org.whispersystems.textsecuregcm.storage.PubSubProtos;
|
||||
import org.whispersystems.textsecuregcm.websocket.ProvisioningAddress;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
public class ProvisioningManager {
|
||||
private final PubSubManager pubSubManager;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import io.dropwizard.lifecycle.Managed;
|
||||
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
|
||||
import io.lettuce.core.RedisClient;
|
||||
import io.lettuce.core.api.StatefulRedisConnection;
|
||||
import io.lettuce.core.codec.ByteArrayCodec;
|
||||
import io.lettuce.core.pubsub.RedisPubSubAdapter;
|
||||
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
|
||||
import io.lettuce.core.resource.ClientResources;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Consumer;
|
||||
import io.micrometer.core.instrument.Tags;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.redis.RedisOperation;
|
||||
import org.whispersystems.textsecuregcm.storage.PubSubProtos;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.websocket.InvalidWebsocketAddressException;
|
||||
import org.whispersystems.textsecuregcm.websocket.ProvisioningAddress;
|
||||
|
||||
private final Counter provisioningMessageOnlineCounter = Metrics.counter(name(getClass(), "sendProvisioningMessage"), "online", "true");
|
||||
private final Counter provisioningMessageOfflineCounter = Metrics.counter(name(getClass(), "sendProvisioningMessage"), "online", "false");
|
||||
public class ProvisioningManager extends RedisPubSubAdapter<byte[], byte[]> implements Managed {
|
||||
|
||||
public ProvisioningManager(final PubSubManager pubSubManager) {
|
||||
this.pubSubManager = pubSubManager;
|
||||
}
|
||||
private final RedisClient redisClient;
|
||||
private final StatefulRedisPubSubConnection<byte[], byte[]> subscriptionConnection;
|
||||
private final StatefulRedisConnection<byte[], byte[]> publicationConnection;
|
||||
|
||||
public boolean sendProvisioningMessage(ProvisioningAddress address, byte[] body) {
|
||||
PubSubProtos.PubSubMessage pubSubMessage = PubSubProtos.PubSubMessage.newBuilder()
|
||||
.setType(PubSubProtos.PubSubMessage.Type.DELIVER)
|
||||
.setContent(ByteString.copyFrom(body))
|
||||
.build();
|
||||
private final CircuitBreaker circuitBreaker;
|
||||
|
||||
if (pubSubManager.publish(address, pubSubMessage)) {
|
||||
provisioningMessageOnlineCounter.increment();
|
||||
return true;
|
||||
} else {
|
||||
provisioningMessageOfflineCounter.increment();
|
||||
return false;
|
||||
private final Map<ProvisioningAddress, Consumer<PubSubProtos.PubSubMessage>> listenersByProvisioningAddress =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
private static final String ACTIVE_LISTENERS_GAUGE_NAME = name(ProvisioningManager.class, "activeListeners");
|
||||
|
||||
private static final String SEND_PROVISIONING_MESSAGE_COUNTER_NAME =
|
||||
name(ProvisioningManager.class, "sendProvisioningMessage");
|
||||
|
||||
private static final String RECEIVE_PROVISIONING_MESSAGE_COUNTER_NAME =
|
||||
name(ProvisioningManager.class, "receiveProvisioningMessage");
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(ProvisioningManager.class);
|
||||
|
||||
public ProvisioningManager(final String redisUri,
|
||||
final ClientResources clientResources,
|
||||
final Duration timeout,
|
||||
final CircuitBreakerConfiguration circuitBreakerConfiguration) {
|
||||
|
||||
this(RedisClient.create(clientResources, redisUri), timeout, circuitBreakerConfiguration);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
ProvisioningManager(final RedisClient redisClient,
|
||||
final Duration timeout,
|
||||
final CircuitBreakerConfiguration circuitBreakerConfiguration) {
|
||||
|
||||
this.redisClient = redisClient;
|
||||
this.redisClient.setDefaultTimeout(timeout);
|
||||
|
||||
this.subscriptionConnection = redisClient.connectPubSub(new ByteArrayCodec());
|
||||
this.publicationConnection = redisClient.connect(new ByteArrayCodec());
|
||||
|
||||
this.circuitBreaker = CircuitBreaker.of("pubsub-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig());
|
||||
|
||||
CircuitBreakerUtil.registerMetrics(circuitBreaker, ProvisioningManager.class);
|
||||
|
||||
Metrics.gaugeMapSize(ACTIVE_LISTENERS_GAUGE_NAME, Tags.empty(), listenersByProvisioningAddress);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
subscriptionConnection.addListener(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws Exception {
|
||||
subscriptionConnection.removeListener(this);
|
||||
|
||||
subscriptionConnection.close();
|
||||
publicationConnection.close();
|
||||
|
||||
redisClient.shutdown();
|
||||
}
|
||||
|
||||
public void addListener(final ProvisioningAddress address, final Consumer<PubSubProtos.PubSubMessage> listener) {
|
||||
listenersByProvisioningAddress.put(address, listener);
|
||||
|
||||
circuitBreaker.executeRunnable(
|
||||
() -> subscriptionConnection.sync().subscribe(address.serialize().getBytes(StandardCharsets.UTF_8)));
|
||||
}
|
||||
|
||||
public void removeListener(final ProvisioningAddress address) {
|
||||
RedisOperation.unchecked(() -> circuitBreaker.executeRunnable(
|
||||
() -> subscriptionConnection.sync().unsubscribe(address.serialize().getBytes(StandardCharsets.UTF_8))));
|
||||
|
||||
listenersByProvisioningAddress.remove(address);
|
||||
}
|
||||
|
||||
public boolean sendProvisioningMessage(final ProvisioningAddress address, final byte[] body) {
|
||||
final PubSubProtos.PubSubMessage pubSubMessage = PubSubProtos.PubSubMessage.newBuilder()
|
||||
.setType(PubSubProtos.PubSubMessage.Type.DELIVER)
|
||||
.setContent(ByteString.copyFrom(body))
|
||||
.build();
|
||||
|
||||
final boolean receiverPresent = circuitBreaker.executeSupplier(
|
||||
() -> publicationConnection.sync()
|
||||
.publish(address.serialize().getBytes(StandardCharsets.UTF_8), pubSubMessage.toByteArray()) > 0);
|
||||
|
||||
Metrics.counter(SEND_PROVISIONING_MESSAGE_COUNTER_NAME, "online", String.valueOf(receiverPresent)).increment();
|
||||
|
||||
return receiverPresent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void message(final byte[] channel, final byte[] message) {
|
||||
try {
|
||||
final ProvisioningAddress address = new ProvisioningAddress(new String(channel, StandardCharsets.UTF_8));
|
||||
final PubSubProtos.PubSubMessage pubSubMessage = PubSubProtos.PubSubMessage.parseFrom(message);
|
||||
|
||||
if (pubSubMessage.getType() == PubSubProtos.PubSubMessage.Type.DELIVER) {
|
||||
final Consumer<PubSubProtos.PubSubMessage> listener = listenersByProvisioningAddress.get(address);
|
||||
|
||||
boolean listenerPresent = false;
|
||||
|
||||
if (listener != null) {
|
||||
listenerPresent = true;
|
||||
listener.accept(pubSubMessage);
|
||||
}
|
||||
|
||||
Metrics.counter(RECEIVE_PROVISIONING_MESSAGE_COUNTER_NAME, "listenerPresent", String.valueOf(listenerPresent)).increment();
|
||||
}
|
||||
} catch (final InvalidWebsocketAddressException e) {
|
||||
logger.warn("Failed to parse provisioning address", e);
|
||||
} catch (final InvalidProtocolBufferException e) {
|
||||
logger.warn("Failed to parse pub/sub message", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unsubscribed(final byte[] channel, final long count) {
|
||||
try {
|
||||
listenersByProvisioningAddress.remove(new ProvisioningAddress(new String(channel)));
|
||||
} catch (final InvalidWebsocketAddressException e) {
|
||||
logger.warn("Failed to parse provisioning address for `unsubscribe` event", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user