Refactor direct connect delivery pipeline and message store.

1) Make message store contents more memory efficient.

2) Make notification pipeline simpler and more memory efficient.

3) Don't b64 encode websocket message bodies.

// FREEBIE
This commit is contained in:
Moxie Marlinspike
2014-12-06 15:30:26 -08:00
parent aa2a5ff929
commit 41d15b738b
26 changed files with 1581 additions and 326 deletions

View File

@@ -4,6 +4,7 @@ import com.google.common.base.Optional;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.websocket.auth.AuthenticationException;
import org.whispersystems.websocket.auth.WebSocketAuthenticator;

View File

@@ -1,13 +1,14 @@
package org.whispersystems.textsecuregcm.websocket;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.PendingMessage;
import org.whispersystems.textsecuregcm.entities.CryptoEncodingException;
import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage;
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.TransientPushFailureException;
@@ -16,23 +17,20 @@ import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.PubSubListener;
import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.storage.PubSubMessage;
import org.whispersystems.textsecuregcm.storage.StoredMessages;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.websocket.WebSocketClient;
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import static org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal;
import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
public class WebSocketConnection implements PubSubListener {
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
private static final ObjectMapper objectMapper = SystemMapper.getMapper();
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
private final AccountsManager accountsManager;
private final PushSender pushSender;
@@ -72,52 +70,56 @@ public class WebSocketConnection implements PubSubListener {
}
@Override
public void onPubSubMessage(PubSubMessage message) {
public void onPubSubMessage(PubSubMessage pubSubMessage) {
try {
switch (message.getType()) {
case PubSubMessage.TYPE_QUERY_DB:
switch (pubSubMessage.getType().getNumber()) {
case PubSubMessage.Type.QUERY_DB_VALUE:
processStoredMessages();
break;
case PubSubMessage.TYPE_DELIVER:
PendingMessage pendingMessage = objectMapper.readValue(message.getContents(),
PendingMessage.class);
sendMessage(pendingMessage);
case PubSubMessage.Type.DELIVER_VALUE:
sendMessage(OutgoingMessageSignal.parseFrom(pubSubMessage.getContent()));
break;
default:
logger.warn("Unknown pubsub message: " + message.getType());
logger.warn("Unknown pubsub message: " + pubSubMessage.getType().getNumber());
}
} catch (IOException e) {
logger.warn("Error deserializing PendingMessage", e);
} catch (InvalidProtocolBufferException e) {
logger.warn("Protobuf parse error", e);
}
}
private void sendMessage(final PendingMessage message) {
String content = message.getEncryptedOutgoingMessage();
Optional<byte[]> body = Optional.fromNullable(content.getBytes());
ListenableFuture<WebSocketResponseMessage> response = client.sendRequest("PUT", "/api/v1/message", body);
private void sendMessage(final OutgoingMessageSignal message) {
try {
EncryptedOutgoingMessage encryptedMessage = new EncryptedOutgoingMessage(message, device.getSignalingKey());
Optional<byte[]> body = Optional.fromNullable(encryptedMessage.toByteArray());
ListenableFuture<WebSocketResponseMessage> response = client.sendRequest("PUT", "/api/v1/message", body);
Futures.addCallback(response, new FutureCallback<WebSocketResponseMessage>() {
@Override
public void onSuccess(@Nullable WebSocketResponseMessage response) {
if (isSuccessResponse(response) && !message.isReceipt()) {
sendDeliveryReceiptFor(message);
} else if (!isSuccessResponse(response)) {
Futures.addCallback(response, new FutureCallback<WebSocketResponseMessage>() {
@Override
public void onSuccess(@Nullable WebSocketResponseMessage response) {
boolean isReceipt = message.getType() == OutgoingMessageSignal.Type.RECEIPT_VALUE;
if (isSuccessResponse(response) && !isReceipt) {
sendDeliveryReceiptFor(message);
} else if (!isSuccessResponse(response)) {
requeueMessage(message);
}
}
@Override
public void onFailure(@Nonnull Throwable throwable) {
requeueMessage(message);
}
}
@Override
public void onFailure(@Nonnull Throwable throwable) {
requeueMessage(message);
}
private boolean isSuccessResponse(WebSocketResponseMessage response) {
return response != null && response.getStatus() >= 200 && response.getStatus() < 300;
}
});
private boolean isSuccessResponse(WebSocketResponseMessage response) {
return response != null && response.getStatus() >= 200 && response.getStatus() < 300;
}
});
} catch (CryptoEncodingException e) {
logger.warn("Bad signaling key", e);
}
}
private void requeueMessage(PendingMessage message) {
private void requeueMessage(OutgoingMessageSignal message) {
try {
pushSender.sendMessage(account, device, message);
} catch (NotPushRegisteredException | TransientPushFailureException e) {
@@ -126,12 +128,12 @@ public class WebSocketConnection implements PubSubListener {
}
}
private void sendDeliveryReceiptFor(PendingMessage message) {
private void sendDeliveryReceiptFor(OutgoingMessageSignal message) {
try {
Optional<Account> source = accountsManager.get(message.getSender());
Optional<Account> source = accountsManager.get(message.getSource());
if (!source.isPresent()) {
logger.warn("Source account disappeared? (%s)", message.getSender());
logger.warn("Source account disappeared? (%s)", message.getSource());
return;
}
@@ -139,7 +141,7 @@ public class WebSocketConnection implements PubSubListener {
OutgoingMessageSignal.newBuilder()
.setSource(account.getNumber())
.setSourceDevice((int) device.getId())
.setTimestamp(message.getMessageId())
.setTimestamp(message.getTimestamp())
.setType(OutgoingMessageSignal.Type.RECEIPT_VALUE);
for (Device device : source.get().getDevices()) {
@@ -151,9 +153,9 @@ public class WebSocketConnection implements PubSubListener {
}
private void processStoredMessages() {
List<PendingMessage> messages = storedMessages.getMessagesForDevice(address);
List<OutgoingMessageSignal> messages = storedMessages.getMessagesForDevice(address);
for (PendingMessage message : messages) {
for (OutgoingMessageSignal message : messages) {
sendMessage(message);
}
}