Break out FaultTolerantPubSubConnection as its own thing so different use cases can have their own subscription space.

This commit is contained in:
Jon Chambers
2020-08-14 11:14:23 -04:00
committed by Jon Chambers
parent 20bbdf22c7
commit ae0f8df11b
7 changed files with 146 additions and 62 deletions

View File

@@ -15,6 +15,7 @@ import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.util.Constants;
@@ -42,8 +43,10 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
private final String managerId = UUID.randomUUID().toString();
private final String connectedClientSetKey = getConnectedClientSetKey(managerId);
private final FaultTolerantRedisCluster presenceCluster;
private final ClusterLuaScript clearPresenceScript;
private final FaultTolerantRedisCluster presenceCluster;
private final FaultTolerantPubSubConnection<String, String> pubSubConnection;
private final ClusterLuaScript clearPresenceScript;
private final ScheduledExecutorService scheduledExecutorService;
private ScheduledFuture<?> pruneMissingPeersFuture;
@@ -65,8 +68,9 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
public ClientPresenceManager(final FaultTolerantRedisCluster presenceCluster, final ScheduledExecutorService scheduledExecutorService) throws IOException {
this.presenceCluster = presenceCluster;
this.scheduledExecutorService = scheduledExecutorService;
this.pubSubConnection = this.presenceCluster.createPubSubConnection();
this.clearPresenceScript = ClusterLuaScript.fromResource(presenceCluster, "lua/clear_presence.lua", ScriptOutputType.INTEGER);
this.scheduledExecutorService = scheduledExecutorService;
final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
metricRegistry.gauge(name(getClass(), "localClientCount"), () -> displacementListenersByPresenceKey::size);
@@ -79,9 +83,14 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
this.remoteDisplacementMeter = metricRegistry.meter(name(getClass(), "remoteDisplacement"));
}
@VisibleForTesting
FaultTolerantPubSubConnection<String, String> getPubSubConnection() {
return pubSubConnection;
}
@Override
public void start() {
presenceCluster.usePubSubConnection(connection -> {
pubSubConnection.usePubSubConnection(connection -> {
connection.addListener(this);
connection.getResources().eventBus().get()
.filter(event -> event instanceof ClusterTopologyChangedEvent)
@@ -103,7 +112,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
@Override
public void stop() {
presenceCluster.usePubSubConnection(connection -> connection.removeListener(this));
pubSubConnection.usePubSubConnection(connection -> connection.removeListener(this));
if (pruneMissingPeersFuture != null) {
pruneMissingPeersFuture.cancel(false);
@@ -118,7 +127,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
connection.sync().del(getConnectedClientSetKey(managerId));
});
presenceCluster.usePubSubConnection(connection -> connection.sync().masters().commands().unsubscribe(getManagerPresenceChannel(managerId)));
pubSubConnection.usePubSubConnection(connection -> connection.sync().masters().commands().unsubscribe(getManagerPresenceChannel(managerId)));
}
public void setPresent(final UUID accountUuid, final long deviceId, final DisplacedPresenceListener displacementListener) {
@@ -175,7 +184,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
private void subscribeForRemotePresenceChanges(final String presenceKey) {
final int slot = SlotHash.getSlot(presenceKey);
presenceCluster.usePubSubConnection(connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot))
pubSubConnection.usePubSubConnection(connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot))
.commands()
.subscribe(getKeyspaceNotificationChannel(presenceKey)));
}
@@ -187,7 +196,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
}
private void unsubscribeFromRemotePresenceChanges(final String presenceKey) {
presenceCluster.usePubSubConnection(connection -> connection.async().masters().commands().unsubscribe(getKeyspaceNotificationChannel(presenceKey)));
pubSubConnection.usePubSubConnection(connection -> connection.async().masters().commands().unsubscribe(getKeyspaceNotificationChannel(presenceKey)));
}
void pruneMissingPeers() {