Make stored messages REST accessible.

Add REST endpoints for retrieving and acknowledging pending
messges, beyond the WebSocket.

// FREEBIE
This commit is contained in:
Moxie Marlinspike
2015-04-15 16:19:07 -07:00
parent 53e7ffa311
commit db6ee8f687
14 changed files with 412 additions and 150 deletions

View File

@@ -6,6 +6,7 @@ import com.codahale.metrics.SharedMetricRegistries;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
@@ -27,14 +28,17 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
private final AccountsManager accountsManager;
private final PushSender pushSender;
private final ReceiptSender receiptSender;
private final MessagesManager messagesManager;
private final PubSubManager pubSubManager;
public AuthenticatedConnectListener(AccountsManager accountsManager, PushSender pushSender,
MessagesManager messagesManager, PubSubManager pubSubManager)
ReceiptSender receiptSender, MessagesManager messagesManager,
PubSubManager pubSubManager)
{
this.accountsManager = accountsManager;
this.pushSender = pushSender;
this.receiptSender = receiptSender;
this.messagesManager = messagesManager;
this.pubSubManager = pubSubManager;
}
@@ -45,7 +49,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
final Device device = account.getAuthenticatedDevice().get();
final long connectTime = System.currentTimeMillis();
final WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId());
final WebSocketConnection connection = new WebSocketConnection(accountsManager, pushSender,
final WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender,
messagesManager, account, device,
context.getClient());

View File

@@ -1,35 +1,33 @@
package org.whispersystems.textsecuregcm.websocket;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.dispatch.DispatchChannel;
import org.whispersystems.textsecuregcm.controllers.NoSuchUserException;
import org.whispersystems.textsecuregcm.entities.CryptoEncodingException;
import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.push.TransientPushFailureException;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.websocket.WebSocketClient;
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import static com.codahale.metrics.MetricRegistry.name;
import static org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal;
import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
@@ -37,7 +35,7 @@ public class WebSocketConnection implements DispatchChannel {
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
private final AccountsManager accountsManager;
private final ReceiptSender receiptSender;
private final PushSender pushSender;
private final MessagesManager messagesManager;
@@ -45,17 +43,15 @@ public class WebSocketConnection implements DispatchChannel {
private final Device device;
private final WebSocketClient client;
private long connectionStartTime;
public WebSocketConnection(AccountsManager accountsManager,
PushSender pushSender,
public WebSocketConnection(PushSender pushSender,
ReceiptSender receiptSender,
MessagesManager messagesManager,
Account account,
Device device,
WebSocketClient client)
{
this.accountsManager = accountsManager;
this.pushSender = pushSender;
this.receiptSender = receiptSender;
this.messagesManager = messagesManager;
this.account = account;
this.device = device;
@@ -107,7 +103,7 @@ public class WebSocketConnection implements DispatchChannel {
if (isSuccessResponse(response)) {
if (storedMessageId.isPresent()) messagesManager.delete(storedMessageId.get());
if (!isReceipt) sendDeliveryReceiptFor(message);
} else if (!isSuccessResponse(response) & !storedMessageId.isPresent()) {
} else if (!isSuccessResponse(response) && !storedMessageId.isPresent()) {
requeueMessage(message);
}
}
@@ -137,34 +133,30 @@ public class WebSocketConnection implements DispatchChannel {
private void sendDeliveryReceiptFor(OutgoingMessageSignal message) {
try {
Optional<Account> source = accountsManager.get(message.getSource());
if (!source.isPresent()) {
logger.warn(String.format("Source account disappeared? (%s)", message.getSource()));
return;
}
OutgoingMessageSignal.Builder receipt =
OutgoingMessageSignal.newBuilder()
.setSource(account.getNumber())
.setSourceDevice((int) device.getId())
.setTimestamp(message.getTimestamp())
.setType(OutgoingMessageSignal.Type.RECEIPT_VALUE);
for (Device device : source.get().getDevices()) {
pushSender.sendMessage(source.get(), device, receipt.build());
}
} catch (NotPushRegisteredException | TransientPushFailureException e) {
logger.warn("sendDeliveryReceiptFor", "Delivery receipet", e);
receiptSender.sendReceipt(account, message.getSource(), message.getTimestamp(),
message.hasRelay() ? Optional.of(message.getRelay()) :
Optional.<String>absent());
} catch (IOException | NoSuchUserException | TransientPushFailureException | NotPushRegisteredException e) {
logger.warn("sendDeliveryReceiptFor", e);
}
}
private void processStoredMessages() {
List<Pair<Long, OutgoingMessageSignal>> messages = messagesManager.getMessagesForDevice(account.getNumber(),
device.getId());
List<OutgoingMessageEntity> messages = messagesManager.getMessagesForDevice(account.getNumber(), device.getId());
for (Pair<Long, OutgoingMessageSignal> message : messages) {
sendMessage(message.second(), Optional.of(message.first()));
for (OutgoingMessageEntity message : messages) {
OutgoingMessageSignal.Builder builder = OutgoingMessageSignal.newBuilder()
.setType(message.getType())
.setMessage(ByteString.copyFrom(message.getMessage()))
.setSourceDevice(message.getSourceDevice())
.setSource(message.getSource())
.setTimestamp(message.getTimestamp());
if (message.getRelay() != null && !message.getRelay().isEmpty()) {
builder.setRelay(message.getRelay());
}
sendMessage(builder.build(), Optional.of(message.getId()));
}
}