Switch to postgresql-backed message DB.

// FREEBIE
This commit is contained in:
Moxie Marlinspike
2015-01-29 13:25:33 -08:00
parent 45a0b74b89
commit 1f5ee36a6b
22 changed files with 585 additions and 803 deletions

View File

@@ -15,9 +15,10 @@ import org.whispersystems.textsecuregcm.push.TransientPushFailureException;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.PubSubListener;
import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.storage.StoredMessages;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.websocket.WebSocketClient;
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
@@ -34,7 +35,7 @@ public class WebSocketConnection implements PubSubListener {
private final AccountsManager accountsManager;
private final PushSender pushSender;
private final StoredMessages storedMessages;
private final MessagesManager messagesManager;
private final PubSubManager pubSubManager;
private final Account account;
@@ -44,7 +45,7 @@ public class WebSocketConnection implements PubSubListener {
public WebSocketConnection(AccountsManager accountsManager,
PushSender pushSender,
StoredMessages storedMessages,
MessagesManager messagesManager,
PubSubManager pubSubManager,
Account account,
Device device,
@@ -52,7 +53,7 @@ public class WebSocketConnection implements PubSubListener {
{
this.accountsManager = accountsManager;
this.pushSender = pushSender;
this.storedMessages = storedMessages;
this.messagesManager = messagesManager;
this.pubSubManager = pubSubManager;
this.account = account;
this.device = device;
@@ -77,7 +78,7 @@ public class WebSocketConnection implements PubSubListener {
processStoredMessages();
break;
case PubSubMessage.Type.DELIVER_VALUE:
sendMessage(OutgoingMessageSignal.parseFrom(pubSubMessage.getContent()));
sendMessage(OutgoingMessageSignal.parseFrom(pubSubMessage.getContent()), Optional.<Long>absent());
break;
default:
logger.warn("Unknown pubsub message: " + pubSubMessage.getType().getNumber());
@@ -87,7 +88,9 @@ public class WebSocketConnection implements PubSubListener {
}
}
private void sendMessage(final OutgoingMessageSignal message) {
private void sendMessage(final OutgoingMessageSignal message,
final Optional<Long> storedMessageId)
{
try {
EncryptedOutgoingMessage encryptedMessage = new EncryptedOutgoingMessage(message, device.getSignalingKey());
Optional<byte[]> body = Optional.fromNullable(encryptedMessage.toByteArray());
@@ -98,16 +101,17 @@ public class WebSocketConnection implements PubSubListener {
public void onSuccess(@Nullable WebSocketResponseMessage response) {
boolean isReceipt = message.getType() == OutgoingMessageSignal.Type.RECEIPT_VALUE;
if (isSuccessResponse(response) && !isReceipt) {
sendDeliveryReceiptFor(message);
} else if (!isSuccessResponse(response)) {
if (isSuccessResponse(response)) {
if (storedMessageId.isPresent()) messagesManager.delete(storedMessageId.get());
if (!isReceipt) sendDeliveryReceiptFor(message);
} else if (!isSuccessResponse(response) & !storedMessageId.isPresent()) {
requeueMessage(message);
}
}
@Override
public void onFailure(@Nonnull Throwable throwable) {
requeueMessage(message);
if (!storedMessageId.isPresent()) requeueMessage(message);
}
private boolean isSuccessResponse(WebSocketResponseMessage response) {
@@ -124,7 +128,7 @@ public class WebSocketConnection implements PubSubListener {
pushSender.sendMessage(account, device, message);
} catch (NotPushRegisteredException | TransientPushFailureException e) {
logger.warn("requeueMessage", e);
storedMessages.insert(address, message);
messagesManager.insert(account.getNumber(), device.getId(), message);
}
}
@@ -153,10 +157,11 @@ public class WebSocketConnection implements PubSubListener {
}
private void processStoredMessages() {
List<OutgoingMessageSignal> messages = storedMessages.getMessagesForDevice(address);
List<Pair<Long, OutgoingMessageSignal>> messages = messagesManager.getMessagesForDevice(account.getNumber(),
device.getId());
for (OutgoingMessageSignal message : messages) {
sendMessage(message);
for (Pair<Long, OutgoingMessageSignal> message : messages) {
sendMessage(message.second(), Optional.of(message.first()));
}
}
}