Upgrade to dropwizard 0.7.

This commit is contained in:
Moxie Marlinspike
2014-05-07 13:50:40 -07:00
parent 5d169c523f
commit b433b9c879
47 changed files with 742 additions and 789 deletions

View File

@@ -1,9 +1,14 @@
package org.whispersystems.textsecuregcm.controllers;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.jetty.websocket.WebSocket;
import com.google.common.base.Optional;
import org.eclipse.jetty.websocket.api.CloseStatus;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
import org.whispersystems.textsecuregcm.entities.AcknowledgeWebsocketMessage;
import org.whispersystems.textsecuregcm.entities.IncomingWebsocketMessage;
import org.whispersystems.textsecuregcm.storage.Account;
@@ -22,111 +27,94 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
public class WebsocketController implements WebSocket.OnTextMessage, PubSubListener {
import io.dropwizard.auth.AuthenticationException;
import io.dropwizard.auth.basic.BasicCredentials;
public class WebsocketController implements WebSocketListener, PubSubListener {
private static final Logger logger = LoggerFactory.getLogger(WebsocketController.class);
private static final ObjectMapper mapper = new ObjectMapper();
private static final Map<Long, String> pendingMessages = new HashMap<>();
private final AccountAuthenticator accountAuthenticator;
private final StoredMessageManager storedMessageManager;
private final PubSubManager pubSubManager;
private final Account account;
private final Device device;
private Account account;
private Device device;
private Session session;
private Connection connection;
private long pendingMessageSequence;
private long pendingMessageSequence;
public WebsocketController(StoredMessageManager storedMessageManager,
PubSubManager pubSubManager,
Account account)
public WebsocketController(AccountAuthenticator accountAuthenticator,
StoredMessageManager storedMessageManager,
PubSubManager pubSubManager)
{
this.accountAuthenticator = accountAuthenticator;
this.storedMessageManager = storedMessageManager;
this.pubSubManager = pubSubManager;
this.account = account;
this.device = account.getAuthenticatedDevice().get();
}
@Override
public void onOpen(Connection connection) {
this.connection = connection;
pubSubManager.subscribe(new WebsocketAddress(this.account.getId(), this.device.getId()), this);
handleQueryDatabase();
}
@Override
public void onClose(int i, String s) {
handleClose();
public void onWebSocketConnect(Session session) {
try {
UpgradeRequest request = session.getUpgradeRequest();
Map<String, String[]> parameters = request.getParameterMap();
String[] usernames = parameters.get("login" );
String[] passwords = parameters.get("password");
if (usernames == null || usernames.length == 0 ||
passwords == null || passwords.length == 0)
{
session.close(new CloseStatus(401, "Unauthorized"));
return;
}
BasicCredentials credentials = new BasicCredentials(usernames[0], passwords[0]);
Optional<Account> account = accountAuthenticator.authenticate(credentials);
if (!account.isPresent()) {
session.close(new CloseStatus(401, "Unauthorized"));
return;
}
this.account = account.get();
this.device = account.get().getAuthenticatedDevice().get();
this.session = session;
this.session.setIdleTimeout(10 * 60 * 1000);
this.pubSubManager.subscribe(new WebsocketAddress(this.account.getId(),
this.device.getId()), this);
handleQueryDatabase();
} catch (AuthenticationException e) {
try { session.close(500, "Server Error");} catch (IOException e1) {}
} catch (IOException ioe) {
logger.info("Abrupt session close.");
}
}
@Override
public void onMessage(String body) {
public void onWebSocketText(String body) {
try {
IncomingWebsocketMessage incomingMessage = mapper.readValue(body, IncomingWebsocketMessage.class);
switch (incomingMessage.getType()) {
case IncomingWebsocketMessage.TYPE_ACKNOWLEDGE_MESSAGE: handleMessageAck(body); break;
case IncomingWebsocketMessage.TYPE_PING_MESSAGE: handlePing(); break;
default: handleClose(); break;
case IncomingWebsocketMessage.TYPE_ACKNOWLEDGE_MESSAGE:
handleMessageAck(body);
break;
default:
close(new CloseStatus(410, "Unknown Type"));
}
} catch (IOException e) {
logger.debug("Parse", e);
handleClose();
close(new CloseStatus(410, "Badly Formatted"));
}
}
@Override
public void onPubSubMessage(PubSubMessage outgoingMessage) {
switch (outgoingMessage.getType()) {
case PubSubMessage.TYPE_DELIVER: handleDeliverOutgoingMessage(outgoingMessage.getContents()); break;
case PubSubMessage.TYPE_QUERY_DB: handleQueryDatabase(); break;
default:
logger.warn("Unknown pubsub message: " + outgoingMessage.getType());
}
}
private void handleDeliverOutgoingMessage(String message) {
try {
long messageSequence;
synchronized (pendingMessages) {
messageSequence = pendingMessageSequence++;
pendingMessages.put(messageSequence, message);
}
connection.sendMessage(mapper.writeValueAsString(new WebsocketMessage(messageSequence, message)));
} catch (IOException e) {
logger.debug("Response failed", e);
handleClose();
}
}
private void handleMessageAck(String message) {
try {
AcknowledgeWebsocketMessage ack = mapper.readValue(message, AcknowledgeWebsocketMessage.class);
synchronized (pendingMessages) {
pendingMessages.remove(ack.getId());
}
} catch (IOException e) {
logger.warn("Mapping", e);
}
}
private void handlePing() {
try {
IncomingWebsocketMessage pongMessage = new IncomingWebsocketMessage(IncomingWebsocketMessage.TYPE_PONG_MESSAGE);
connection.sendMessage(mapper.writeValueAsString(pongMessage));
} catch (IOException e) {
logger.warn("Pong failed", e);
handleClose();
}
}
private void handleClose() {
public void onWebSocketClose(int i, String s) {
pubSubManager.unsubscribe(new WebsocketAddress(account.getId(), device.getId()), this);
connection.close();
List<String> remainingMessages = new LinkedList<>();
@@ -144,6 +132,50 @@ public class WebsocketController implements WebSocket.OnTextMessage, PubSubListe
storedMessageManager.storeMessages(account, device, remainingMessages);
}
@Override
public void onPubSubMessage(PubSubMessage outgoingMessage) {
switch (outgoingMessage.getType()) {
case PubSubMessage.TYPE_DELIVER:
handleDeliverOutgoingMessage(outgoingMessage.getContents());
break;
case PubSubMessage.TYPE_QUERY_DB:
handleQueryDatabase();
break;
default:
logger.warn("Unknown pubsub message: " + outgoingMessage.getType());
}
}
private void handleDeliverOutgoingMessage(String message) {
try {
long messageSequence;
synchronized (pendingMessages) {
messageSequence = pendingMessageSequence++;
pendingMessages.put(messageSequence, message);
}
WebsocketMessage websocketMessage = new WebsocketMessage(messageSequence, message);
session.getRemote().sendStringByFuture(mapper.writeValueAsString(websocketMessage));
} catch (IOException e) {
logger.debug("Response failed", e);
close(null);
}
}
private void handleMessageAck(String message) {
try {
AcknowledgeWebsocketMessage ack = mapper.readValue(message, AcknowledgeWebsocketMessage.class);
synchronized (pendingMessages) {
pendingMessages.remove(ack.getId());
}
} catch (IOException e) {
logger.warn("Mapping", e);
}
}
private void handleQueryDatabase() {
List<String> messages = storedMessageManager.getOutgoingMessages(account, device);
@@ -152,4 +184,25 @@ public class WebsocketController implements WebSocket.OnTextMessage, PubSubListe
}
}
@Override
public void onWebSocketBinary(byte[] bytes, int i, int i2) {
logger.info("Received binary message!");
}
@Override
public void onWebSocketError(Throwable throwable) {
logger.info("onWebSocketError", throwable);
}
private void close(CloseStatus closeStatus) {
try {
if (this.session != null) {
if (closeStatus != null) this.session.close(closeStatus);
else this.session.close();
}
} catch (IOException e) {
logger.info("close()", e);
}
}
}