Retire ProvisioningAddress and WebsocketAddress

This commit is contained in:
Jon Chambers
2024-09-30 11:37:37 -04:00
committed by Jon Chambers
parent 1bb0eb0e70
commit b2211de8d8
8 changed files with 42 additions and 171 deletions

View File

@@ -28,8 +28,6 @@ import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguratio
import org.whispersystems.textsecuregcm.redis.RedisOperation;
import org.whispersystems.textsecuregcm.storage.PubSubProtos;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import org.whispersystems.textsecuregcm.websocket.InvalidWebsocketAddressException;
import org.whispersystems.textsecuregcm.websocket.ProvisioningAddress;
public class ProvisioningManager extends RedisPubSubAdapter<byte[], byte[]> implements Managed {
@@ -39,7 +37,7 @@ public class ProvisioningManager extends RedisPubSubAdapter<byte[], byte[]> impl
private final CircuitBreaker circuitBreaker;
private final Map<ProvisioningAddress, Consumer<PubSubProtos.PubSubMessage>> listenersByProvisioningAddress =
private final Map<String, Consumer<PubSubProtos.PubSubMessage>> listenersByProvisioningAddress =
new ConcurrentHashMap<>();
private static final String ACTIVE_LISTENERS_GAUGE_NAME = name(ProvisioningManager.class, "activeListeners");
@@ -82,21 +80,21 @@ public class ProvisioningManager extends RedisPubSubAdapter<byte[], byte[]> impl
redisClient.shutdown();
}
public void addListener(final ProvisioningAddress address, final Consumer<PubSubProtos.PubSubMessage> listener) {
public void addListener(final String address, final Consumer<PubSubProtos.PubSubMessage> listener) {
listenersByProvisioningAddress.put(address, listener);
circuitBreaker.executeRunnable(
() -> subscriptionConnection.sync().subscribe(address.serialize().getBytes(StandardCharsets.UTF_8)));
() -> subscriptionConnection.sync().subscribe(address.getBytes(StandardCharsets.UTF_8)));
}
public void removeListener(final ProvisioningAddress address) {
public void removeListener(final String address) {
RedisOperation.unchecked(() -> circuitBreaker.executeRunnable(
() -> subscriptionConnection.sync().unsubscribe(address.serialize().getBytes(StandardCharsets.UTF_8))));
() -> subscriptionConnection.sync().unsubscribe(address.getBytes(StandardCharsets.UTF_8))));
listenersByProvisioningAddress.remove(address);
}
public boolean sendProvisioningMessage(final ProvisioningAddress address, final byte[] body) {
public boolean sendProvisioningMessage(final String address, final byte[] body) {
final PubSubProtos.PubSubMessage pubSubMessage = PubSubProtos.PubSubMessage.newBuilder()
.setType(PubSubProtos.PubSubMessage.Type.DELIVER)
.setContent(ByteString.copyFrom(body))
@@ -104,7 +102,7 @@ public class ProvisioningManager extends RedisPubSubAdapter<byte[], byte[]> impl
final boolean receiverPresent = circuitBreaker.executeSupplier(
() -> publicationConnection.sync()
.publish(address.serialize().getBytes(StandardCharsets.UTF_8), pubSubMessage.toByteArray()) > 0);
.publish(address.getBytes(StandardCharsets.UTF_8), pubSubMessage.toByteArray()) > 0);
Metrics.counter(SEND_PROVISIONING_MESSAGE_COUNTER_NAME, "online", String.valueOf(receiverPresent)).increment();
@@ -114,7 +112,7 @@ public class ProvisioningManager extends RedisPubSubAdapter<byte[], byte[]> impl
@Override
public void message(final byte[] channel, final byte[] message) {
try {
final ProvisioningAddress address = new ProvisioningAddress(new String(channel, StandardCharsets.UTF_8));
final String address = new String(channel, StandardCharsets.UTF_8);
final PubSubProtos.PubSubMessage pubSubMessage = PubSubProtos.PubSubMessage.parseFrom(message);
if (pubSubMessage.getType() == PubSubProtos.PubSubMessage.Type.DELIVER) {
@@ -129,8 +127,6 @@ public class ProvisioningManager extends RedisPubSubAdapter<byte[], byte[]> impl
Metrics.counter(RECEIVE_PROVISIONING_MESSAGE_COUNTER_NAME, "listenerPresent", String.valueOf(listenerPresent)).increment();
}
} catch (final InvalidWebsocketAddressException e) {
logger.warn("Failed to parse provisioning address", e);
} catch (final InvalidProtocolBufferException e) {
logger.warn("Failed to parse pub/sub message", e);
}
@@ -138,10 +134,6 @@ public class ProvisioningManager extends RedisPubSubAdapter<byte[], byte[]> impl
@Override
public void unsubscribed(final byte[] channel, final long count) {
try {
listenersByProvisioningAddress.remove(new ProvisioningAddress(new String(channel)));
} catch (final InvalidWebsocketAddressException e) {
logger.warn("Failed to parse provisioning address for `unsubscribe` event", e);
}
listenersByProvisioningAddress.remove(new String(channel, StandardCharsets.UTF_8));
}
}