Use a custom redis pubsub implementation rather than Jedis.

// FREEBIE
This commit is contained in:
Moxie Marlinspike
2015-03-16 15:05:33 -07:00
parent e79861c30a
commit c7e0cc1158
26 changed files with 1254 additions and 292 deletions

View File

@@ -24,6 +24,8 @@ import com.sun.jersey.api.client.Client;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.eclipse.jetty.servlets.CrossOriginFilter;
import org.skife.jdbi.v2.DBI;
import org.whispersystems.dispatch.DispatchChannel;
import org.whispersystems.dispatch.DispatchManager;
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
import org.whispersystems.textsecuregcm.auth.FederatedPeerAuthenticator;
import org.whispersystems.textsecuregcm.auth.MultiBasicAuthProvider;
@@ -147,10 +149,11 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
Keys keys = database.onDemand(Keys.class);
Messages messages = messagedb.onDemand(Messages.class);
JedisPool cacheClient = new RedisClientFactory(config.getCacheConfiguration().getUrl()).getRedisClientPool();
JedisPool directoryClient = new RedisClientFactory(config.getDirectoryConfiguration().getUrl()).getRedisClientPool();
Client httpClient = new JerseyClientBuilder(environment).using(config.getJerseyClientConfiguration())
.build(getName());
RedisClientFactory cacheClientFactory = new RedisClientFactory(config.getCacheConfiguration().getUrl());
JedisPool cacheClient = cacheClientFactory.getRedisClientPool();
JedisPool directoryClient = new RedisClientFactory(config.getDirectoryConfiguration().getUrl()).getRedisClientPool();
Client httpClient = new JerseyClientBuilder(environment).using(config.getJerseyClientConfiguration())
.build(getName());
DirectoryManager directory = new DirectoryManager(directoryClient);
PendingAccountsManager pendingAccountsManager = new PendingAccountsManager(pendingAccounts, cacheClient);
@@ -159,7 +162,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
FederatedClientManager federatedClientManager = new FederatedClientManager(config.getFederationConfiguration());
MessagesManager messagesManager = new MessagesManager(messages);
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(messagesManager);
PubSubManager pubSubManager = new PubSubManager(cacheClient, deadLetterHandler);
DispatchManager dispatchManager = new DispatchManager(cacheClientFactory, Optional.<DispatchChannel>of(deadLetterHandler));
PubSubManager pubSubManager = new PubSubManager(cacheClient, dispatchManager);
PushServiceClient pushServiceClient = new PushServiceClient(httpClient, config.getPushConfiguration());
WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager);
AccountAuthenticator deviceAuthenticator = new AccountAuthenticator(accountsManager);
@@ -173,6 +177,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
FeedbackHandler feedbackHandler = new FeedbackHandler(pushServiceClient, accountsManager);
Optional<byte[]> authorizationKey = config.getRedphoneConfiguration().getAuthorizationKey();
environment.lifecycle().manage(pubSubManager);
environment.lifecycle().manage(feedbackHandler);
AttachmentController attachmentController = new AttachmentController(rateLimiters, federatedClientManager, urlSigner);
@@ -263,5 +268,4 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
public static void main(String[] args) throws Exception {
new WhisperServerService().run(args);
}
}

View File

@@ -16,8 +16,14 @@
*/
package org.whispersystems.textsecuregcm.providers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.dispatch.io.RedisPubSubConnectionFactory;
import org.whispersystems.dispatch.redis.PubSubConnection;
import org.whispersystems.textsecuregcm.util.Util;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
@@ -25,29 +31,40 @@ import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Protocol;
public class RedisClientFactory {
public class RedisClientFactory implements RedisPubSubConnectionFactory {
private final Logger logger = LoggerFactory.getLogger(RedisClientFactory.class);
private final String host;
private final int port;
private final JedisPool jedisPool;
public RedisClientFactory(String url) throws URISyntaxException {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setTestOnBorrow(true);
URI redisURI = new URI(url);
String redisHost = redisURI.getHost();
int redisPort = redisURI.getPort();
String redisPassword = null;
URI redisURI = new URI(url);
if (!Util.isEmpty(redisURI.getUserInfo())) {
redisPassword = redisURI.getUserInfo().split(":",2)[1];
}
this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort,
Protocol.DEFAULT_TIMEOUT, redisPassword);
this.host = redisURI.getHost();
this.port = redisURI.getPort();
this.jedisPool = new JedisPool(poolConfig, host, port,
Protocol.DEFAULT_TIMEOUT, null);
}
public JedisPool getRedisClientPool() {
return jedisPool;
}
@Override
public PubSubConnection connect() {
while (true) {
try {
Socket socket = new Socket(host, port);
return new PubSubConnection(socket);
} catch (IOException e) {
logger.warn("Error connecting", e);
Util.sleep(200);
}
}
}
}

View File

@@ -1,9 +0,0 @@
package org.whispersystems.textsecuregcm.storage;
import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
public interface PubSubListener {
public void onPubSubMessage(PubSubMessage outgoingMessage);
}

View File

@@ -1,177 +1,110 @@
package org.whispersystems.textsecuregcm.storage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.websocket.DeadLetterHandler;
import org.whispersystems.dispatch.DispatchChannel;
import org.whispersystems.dispatch.DispatchManager;
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import io.dropwizard.lifecycle.Managed;
import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
import redis.clients.jedis.BinaryJedisPubSub;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class PubSubManager {
public class PubSubManager implements Managed {
private static final byte[] KEEPALIVE_CHANNEL = "KEEPALIVE".getBytes();
private static final String KEEPALIVE_CHANNEL = "KEEPALIVE";
private final Logger logger = LoggerFactory.getLogger(PubSubManager.class);
private final SubscriptionListener baseListener = new SubscriptionListener();
private final Map<String, PubSubListener> listeners = new HashMap<>();
private final Executor threaded = Executors.newCachedThreadPool();
private final Logger logger = LoggerFactory.getLogger(PubSubManager.class);
private final DispatchManager dispatchManager;
private final JedisPool jedisPool;
private final DeadLetterHandler deadLetterHandler;
private boolean subscribed = false;
public PubSubManager(JedisPool jedisPool, DeadLetterHandler deadLetterHandler) {
public PubSubManager(JedisPool jedisPool, DispatchManager dispatchManager) {
this.dispatchManager = dispatchManager;
this.jedisPool = jedisPool;
this.deadLetterHandler = deadLetterHandler;
initializePubSubWorker();
waitForSubscription();
}
public synchronized void subscribe(WebsocketAddress address, PubSubListener listener) {
String serializedAddress = address.serialize();
@Override
public void start() throws Exception {
this.dispatchManager.start();
listeners.put(serializedAddress, listener);
baseListener.subscribe(serializedAddress.getBytes());
}
KeepaliveDispatchChannel keepaliveDispatchChannel = new KeepaliveDispatchChannel();
this.dispatchManager.subscribe(KEEPALIVE_CHANNEL, keepaliveDispatchChannel);
public synchronized void unsubscribe(WebsocketAddress address, PubSubListener listener) {
String serializedAddress = address.serialize();
if (listeners.get(serializedAddress) == listener) {
listeners.remove(serializedAddress);
baseListener.unsubscribe(serializedAddress.getBytes());
synchronized (this) {
while (!subscribed) wait(0);
}
new KeepaliveSender().start();
}
public synchronized boolean publish(WebsocketAddress address, PubSubMessage message) {
@Override
public void stop() throws Exception {
dispatchManager.shutdown();
}
public void subscribe(WebsocketAddress address, DispatchChannel channel) {
String serializedAddress = address.serialize();
dispatchManager.subscribe(serializedAddress, channel);
}
public void unsubscribe(WebsocketAddress address, DispatchChannel dispatchChannel) {
String serializedAddress = address.serialize();
dispatchManager.unsubscribe(serializedAddress, dispatchChannel);
}
public boolean publish(WebsocketAddress address, PubSubMessage message) {
return publish(address.serialize().getBytes(), message);
}
private synchronized boolean publish(byte[] channel, PubSubMessage message) {
private boolean publish(byte[] channel, PubSubMessage message) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.publish(channel, message.toByteArray()) != 0;
}
}
private synchronized void resubscribeAll() {
for (String serializedAddress : listeners.keySet()) {
baseListener.subscribe(serializedAddress.getBytes());
}
}
private synchronized void waitForSubscription() {
try {
while (!subscribed) {
wait();
}
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
private void initializePubSubWorker() {
new Thread("PubSubListener") {
@Override
public void run() {
for (;;) {
logger.info("Starting Redis PubSub Subscriber...");
try (Jedis jedis = jedisPool.getResource()) {
jedis.subscribe(baseListener, KEEPALIVE_CHANNEL);
logger.warn("**** Unsubscribed from holding channel!!! ******");
} catch (Throwable t) {
logger.warn("*** SUBSCRIBER CONNECTION CLOSED", t);
}
}
}
}.start();
new Thread("PubSubKeepAlive") {
@Override
public void run() {
for (;;) {
try {
Thread.sleep(20000);
publish(KEEPALIVE_CHANNEL, PubSubMessage.newBuilder()
.setType(PubSubMessage.Type.KEEPALIVE)
.build());
} catch (Throwable e) {
logger.warn("KEEPALIVE PUBLISH EXCEPTION: ", e);
}
}
}
}.start();
}
private class SubscriptionListener extends BinaryJedisPubSub {
private class KeepaliveDispatchChannel implements DispatchChannel {
@Override
public void onMessage(final byte[] channel, final byte[] message) {
if (Arrays.equals(KEEPALIVE_CHANNEL, channel)) {
return;
}
final PubSubListener listener;
synchronized (PubSubManager.this) {
listener = listeners.get(new String(channel));
}
threaded.execute(new Runnable() {
@Override
public void run() {
try {
PubSubMessage receivedMessage = PubSubMessage.parseFrom(message);
if (listener != null) listener.onPubSubMessage(receivedMessage);
else deadLetterHandler.handle(channel, receivedMessage);
} catch (InvalidProtocolBufferException e) {
logger.warn("Error parsing PubSub protobuf", e);
}
}
});
public void onDispatchMessage(String channel, byte[] message) {
// Good
}
@Override
public void onPMessage(byte[] s, byte[] s2, byte[] s3) {
logger.warn("Received PMessage!");
}
@Override
public void onSubscribe(byte[] channel, int count) {
if (Arrays.equals(KEEPALIVE_CHANNEL, channel)) {
public void onDispatchSubscribed(String channel) {
if (KEEPALIVE_CHANNEL.equals(channel)) {
synchronized (PubSubManager.this) {
subscribed = true;
PubSubManager.this.notifyAll();
}
threaded.execute(new Runnable() {
@Override
public void run() {
resubscribeAll();
}
});
}
}
@Override
public void onUnsubscribe(byte[] s, int i) {}
public void onDispatchUnsubscribed(String channel) {
logger.warn("***** KEEPALIVE CHANNEL UNSUBSCRIBED *****");
}
}
private class KeepaliveSender extends Thread {
@Override
public void onPUnsubscribe(byte[] s, int i) {}
@Override
public void onPSubscribe(byte[] s, int i) {}
public void run() {
while (true) {
try {
Thread.sleep(20000);
publish(KEEPALIVE_CHANNEL.getBytes(), PubSubMessage.newBuilder()
.setType(PubSubMessage.Type.KEEPALIVE)
.build());
} catch (Throwable e) {
logger.warn("***** KEEPALIVE EXCEPTION ******", e);
}
}
}
}
}

View File

@@ -1,5 +1,8 @@
package org.whispersystems.textsecuregcm.websocket;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.push.PushSender;
@@ -8,14 +11,19 @@ import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.storage.PubSubProtos;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Util;
import org.whispersystems.websocket.session.WebSocketSessionContext;
import org.whispersystems.websocket.setup.WebSocketConnectListener;
import static com.codahale.metrics.MetricRegistry.name;
public class AuthenticatedConnectListener implements WebSocketConnectListener {
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private static final Histogram durationHistogram = metricRegistry.histogram(name(WebSocketConnection.class, "connected_duration"));
private final AccountsManager accountsManager;
private final PushSender pushSender;
@@ -33,23 +41,22 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
@Override
public void onWebSocketConnect(WebSocketSessionContext context) {
Account account = context.getAuthenticated(Account.class).get();
Device device = account.getAuthenticatedDevice().get();
final Account account = context.getAuthenticated(Account.class).get();
final Device device = account.getAuthenticatedDevice().get();
final long connectTime = System.currentTimeMillis();
final WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId());
final WebSocketConnection connection = new WebSocketConnection(accountsManager, pushSender,
messagesManager, account, device,
context.getClient());
updateLastSeen(account, device);
closeExistingDeviceConnection(account, device);
final WebSocketConnection connection = new WebSocketConnection(accountsManager, pushSender,
messagesManager, pubSubManager,
account, device,
context.getClient());
connection.onConnected();
pubSubManager.subscribe(address, connection);
context.addListener(new WebSocketSessionContext.WebSocketEventListener() {
@Override
public void onWebSocketClose(WebSocketSessionContext context, int statusCode, String reason) {
connection.onConnectionLost();
pubSubManager.unsubscribe(address, connection);
durationHistogram.update(System.currentTimeMillis() - connectTime);
}
});
}
@@ -60,12 +67,5 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
accountsManager.update(account);
}
}
private void closeExistingDeviceConnection(Account account, Device device) {
pubSubManager.publish(new WebsocketAddress(account.getNumber(), device.getId()),
PubSubProtos.PubSubMessage.newBuilder()
.setType(PubSubProtos.PubSubMessage.Type.CLOSE)
.build());
}
}

View File

@@ -3,11 +3,13 @@ package org.whispersystems.textsecuregcm.websocket;
import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.dispatch.DispatchChannel;
import org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.PubSubProtos;
import org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
public class DeadLetterHandler {
public class DeadLetterHandler implements DispatchChannel {
private final Logger logger = LoggerFactory.getLogger(DeadLetterHandler.class);
@@ -17,14 +19,16 @@ public class DeadLetterHandler {
this.messagesManager = messagesManager;
}
public void handle(byte[] channel, PubSubProtos.PubSubMessage pubSubMessage) {
@Override
public void onDispatchMessage(String channel, byte[] data) {
try {
WebsocketAddress address = new WebsocketAddress(new String(channel));
logger.warn("Handling dead letter to: " + channel);
logger.warn("Handling dead letter to: " + address);
WebsocketAddress address = new WebsocketAddress(channel);
PubSubMessage pubSubMessage = PubSubMessage.parseFrom(data);
switch (pubSubMessage.getType().getNumber()) {
case PubSubProtos.PubSubMessage.Type.DELIVER_VALUE:
case PubSubMessage.Type.DELIVER_VALUE:
OutgoingMessageSignal message = OutgoingMessageSignal.parseFrom(pubSubMessage.getContent());
messagesManager.insert(address.getNumber(), address.getDeviceId(), message);
break;
@@ -36,4 +40,13 @@ public class DeadLetterHandler {
}
}
@Override
public void onDispatchSubscribed(String channel) {
logger.warn("DeadLetterHandler subscription notice! " + channel);
}
@Override
public void onDispatchUnsubscribed(String channel) {
logger.warn("DeadLetterHandler unsubscribe notice! " + channel);
}
}

View File

@@ -14,13 +14,15 @@ public class ProvisioningConnectListener implements WebSocketConnectListener {
@Override
public void onWebSocketConnect(WebSocketSessionContext context) {
final ProvisioningConnection connection = new ProvisioningConnection(pubSubManager, context.getClient());
connection.onConnected();
final ProvisioningConnection connection = new ProvisioningConnection(context.getClient());
final ProvisioningAddress provisioningAddress = ProvisioningAddress.generate();
pubSubManager.subscribe(provisioningAddress, connection);
context.addListener(new WebSocketSessionContext.WebSocketEventListener() {
@Override
public void onWebSocketClose(WebSocketSessionContext context, int statusCode, String reason) {
connection.onConnectionLost();
pubSubManager.unsubscribe(provisioningAddress, connection);
}
});
}

View File

@@ -4,58 +4,62 @@ import com.google.common.base.Optional;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.dispatch.DispatchChannel;
import org.whispersystems.textsecuregcm.entities.MessageProtos.ProvisioningUuid;
import org.whispersystems.textsecuregcm.storage.PubSubListener;
import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
import org.whispersystems.websocket.WebSocketClient;
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
public class ProvisioningConnection implements PubSubListener {
public class ProvisioningConnection implements DispatchChannel {
private final PubSubManager pubSubManager;
private final ProvisioningAddress provisioningAddress;
private final WebSocketClient client;
private final Logger logger = LoggerFactory.getLogger(ProvisioningConnection.class);
public ProvisioningConnection(PubSubManager pubSubManager, WebSocketClient client) {
this.pubSubManager = pubSubManager;
this.client = client;
this.provisioningAddress = ProvisioningAddress.generate();
private final WebSocketClient client;
public ProvisioningConnection(WebSocketClient client) {
this.client = client;
}
@Override
public void onPubSubMessage(PubSubMessage outgoingMessage) {
if (outgoingMessage.getType() == PubSubMessage.Type.DELIVER) {
Optional<byte[]> body = Optional.of(outgoingMessage.getContent().toByteArray());
public void onDispatchMessage(String channel, byte[] message) {
try {
PubSubMessage outgoingMessage = PubSubMessage.parseFrom(message);
ListenableFuture<WebSocketResponseMessage> response = client.sendRequest("PUT", "/v1/message", body);
if (outgoingMessage.getType() == PubSubMessage.Type.DELIVER) {
Optional<byte[]> body = Optional.of(outgoingMessage.getContent().toByteArray());
Futures.addCallback(response, new FutureCallback<WebSocketResponseMessage>() {
@Override
public void onSuccess(WebSocketResponseMessage webSocketResponseMessage) {
pubSubManager.unsubscribe(provisioningAddress, ProvisioningConnection.this);
client.close(1001, "All you get.");
}
ListenableFuture<WebSocketResponseMessage> response = client.sendRequest("PUT", "/v1/message", body);
@Override
public void onFailure(Throwable throwable) {
pubSubManager.unsubscribe(provisioningAddress, ProvisioningConnection.this);
client.close(1001, "That's all!");
}
});
Futures.addCallback(response, new FutureCallback<WebSocketResponseMessage>() {
@Override
public void onSuccess(WebSocketResponseMessage webSocketResponseMessage) {
client.close(1001, "All you get.");
}
@Override
public void onFailure(Throwable throwable) {
client.close(1001, "That's all!");
}
});
}
} catch (InvalidProtocolBufferException e) {
logger.warn("Protobuf Error: ", e);
}
}
public void onConnected() {
this.pubSubManager.subscribe(provisioningAddress, this);
@Override
public void onDispatchSubscribed(String channel) {
this.client.sendRequest("PUT", "/v1/address", Optional.of(ProvisioningUuid.newBuilder()
.setUuid(provisioningAddress.getAddress())
.setUuid(channel)
.build()
.toByteArray()));
}
public void onConnectionLost() {
this.pubSubManager.unsubscribe(provisioningAddress, this);
this.client.close(1001, "Done");
@Override
public void onDispatchUnsubscribed(String channel) {
this.client.close(1001, "Closed");
}
}

View File

@@ -10,6 +10,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.dispatch.DispatchChannel;
import org.whispersystems.textsecuregcm.entities.CryptoEncodingException;
import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage;
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
@@ -19,8 +20,6 @@ import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.PubSubListener;
import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.websocket.WebSocketClient;
@@ -34,21 +33,16 @@ import static com.codahale.metrics.MetricRegistry.name;
import static org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal;
import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
public class WebSocketConnection implements PubSubListener {
public class WebSocketConnection implements DispatchChannel {
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private static final Histogram durationHistogram = metricRegistry.histogram(name(WebSocketConnection.class, "connected_duration"));
private final AccountsManager accountsManager;
private final PushSender pushSender;
private final MessagesManager messagesManager;
private final PubSubManager pubSubManager;
private final Account account;
private final Device device;
private final WebsocketAddress address;
private final WebSocketClient client;
private long connectionStartTime;
@@ -56,7 +50,6 @@ public class WebSocketConnection implements PubSubListener {
public WebSocketConnection(AccountsManager accountsManager,
PushSender pushSender,
MessagesManager messagesManager,
PubSubManager pubSubManager,
Account account,
Device device,
WebSocketClient client)
@@ -64,27 +57,16 @@ public class WebSocketConnection implements PubSubListener {
this.accountsManager = accountsManager;
this.pushSender = pushSender;
this.messagesManager = messagesManager;
this.pubSubManager = pubSubManager;
this.account = account;
this.device = device;
this.client = client;
this.address = new WebsocketAddress(account.getNumber(), device.getId());
}
public void onConnected() {
connectionStartTime = System.currentTimeMillis();
pubSubManager.subscribe(address, this);
processStoredMessages();
}
public void onConnectionLost() {
durationHistogram.update(System.currentTimeMillis() - connectionStartTime);
pubSubManager.unsubscribe(address, this);
}
@Override
public void onPubSubMessage(PubSubMessage pubSubMessage) {
public void onDispatchMessage(String channel, byte[] message) {
try {
PubSubMessage pubSubMessage = PubSubMessage.parseFrom(message);
switch (pubSubMessage.getType().getNumber()) {
case PubSubMessage.Type.QUERY_DB_VALUE:
processStoredMessages();
@@ -92,10 +74,6 @@ public class WebSocketConnection implements PubSubListener {
case PubSubMessage.Type.DELIVER_VALUE:
sendMessage(OutgoingMessageSignal.parseFrom(pubSubMessage.getContent()), Optional.<Long>absent());
break;
case PubSubMessage.Type.CLOSE_VALUE:
client.close(1000, "OK");
pubSubManager.unsubscribe(address, this);
break;
default:
logger.warn("Unknown pubsub message: " + pubSubMessage.getType().getNumber());
}
@@ -104,6 +82,15 @@ public class WebSocketConnection implements PubSubListener {
}
}
@Override
public void onDispatchUnsubscribed(String channel) {
client.close(1000, "OK");
}
public void onDispatchSubscribed(String channel) {
processStoredMessages();
}
private void sendMessage(final OutgoingMessageSignal message,
final Optional<Long> storedMessageId)
{
@@ -180,4 +167,6 @@ public class WebSocketConnection implements PubSubListener {
sendMessage(message.second(), Optional.of(message.first()));
}
}
}