mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-26 00:18:03 +01:00
Split Account into Device and Account definitions.
This commit is contained in:
@@ -41,6 +41,8 @@ import org.whispersystems.textsecuregcm.federation.FederatedClientManager;
|
||||
import org.whispersystems.textsecuregcm.federation.NoSuchPeerException;
|
||||
import org.whispersystems.textsecuregcm.limits.RateLimiters;
|
||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.util.Base64;
|
||||
import org.whispersystems.textsecuregcm.util.IterablePair;
|
||||
@@ -52,10 +54,12 @@ import javax.servlet.AsyncContext;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import javax.ws.rs.Path;
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -79,20 +83,34 @@ public class MessageController extends HttpServlet {
|
||||
private final FederatedClientManager federatedClientManager;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final ExecutorService executor;
|
||||
private final AccountsManager accountsManager;
|
||||
|
||||
public MessageController(RateLimiters rateLimiters,
|
||||
DeviceAuthenticator deviceAuthenticator,
|
||||
PushSender pushSender,
|
||||
AccountsManager accountsManager,
|
||||
FederatedClientManager federatedClientManager)
|
||||
{
|
||||
this.rateLimiters = rateLimiters;
|
||||
this.deviceAuthenticator = deviceAuthenticator;
|
||||
this.pushSender = pushSender;
|
||||
this.accountsManager = accountsManager;
|
||||
this.federatedClientManager = federatedClientManager;
|
||||
this.objectMapper = new ObjectMapper();
|
||||
this.executor = Executors.newFixedThreadPool(10);
|
||||
}
|
||||
|
||||
class LocalOrRemoteDevice {
|
||||
Device device;
|
||||
String relay, number; long deviceId;
|
||||
LocalOrRemoteDevice(Device device) {
|
||||
this.device = device; this.number = device.getNumber(); this.deviceId = device.getDeviceId();
|
||||
}
|
||||
LocalOrRemoteDevice(String relay, String number, long deviceId) {
|
||||
this.relay = relay; this.number = number; this.deviceId = deviceId;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doPost(HttpServletRequest req, HttpServletResponse resp) {
|
||||
TimerContext timerContext = timer.time();
|
||||
@@ -103,20 +121,12 @@ public class MessageController extends HttpServlet {
|
||||
|
||||
rateLimiters.getMessagesLimiter().validate(sender.getNumber());
|
||||
|
||||
|
||||
Map<Pair<String, Long>, Device> deviceCache = new HashMap<>();
|
||||
List<String> numbersMissingDevices = new LinkedList<>();
|
||||
|
||||
List<IncomingMessage> incomingMessages = messages.getMessages();
|
||||
List<OutgoingMessageSignal> outgoingMessages = getOutgoingMessageSignals(sender.getNumber(),
|
||||
incomingMessages,
|
||||
deviceCache,
|
||||
numbersMissingDevices);
|
||||
List<Pair<LocalOrRemoteDevice, OutgoingMessageSignal>> outgoingMessages =
|
||||
getOutgoingMessageSignals(sender.getNumber(), messages.getMessages(), numbersMissingDevices);
|
||||
|
||||
IterablePair<IncomingMessage, OutgoingMessageSignal> listPair = new IterablePair<>(incomingMessages,
|
||||
outgoingMessages);
|
||||
|
||||
handleAsyncDelivery(timerContext, req.startAsync(), listPair, deviceCache, numbersMissingDevices);
|
||||
handleAsyncDelivery(timerContext, req.startAsync(), outgoingMessages, numbersMissingDevices);
|
||||
} catch (AuthenticationException e) {
|
||||
failureMeter.mark();
|
||||
timerContext.stop();
|
||||
@@ -139,8 +149,7 @@ public class MessageController extends HttpServlet {
|
||||
|
||||
private void handleAsyncDelivery(final TimerContext timerContext,
|
||||
final AsyncContext context,
|
||||
final IterablePair<IncomingMessage, OutgoingMessageSignal> listPair,
|
||||
final Map<Pair<String, Long>, Device> deviceCache,
|
||||
final List<Pair<LocalOrRemoteDevice, OutgoingMessageSignal>> listPair,
|
||||
final List<String> numbersMissingDevices)
|
||||
{
|
||||
executor.submit(new Runnable() {
|
||||
@@ -151,38 +160,37 @@ public class MessageController extends HttpServlet {
|
||||
HttpServletResponse response = (HttpServletResponse) context.getResponse();
|
||||
|
||||
try {
|
||||
Map<String, Set<Pair<IncomingMessage, OutgoingMessageSignal>>> relayMessages = new HashMap<>();
|
||||
for (Pair<IncomingMessage, OutgoingMessageSignal> messagePair : listPair) {
|
||||
String destination = messagePair.first().getDestination();
|
||||
long destinationDeviceId = messagePair.first().getDestinationDeviceId();
|
||||
String relay = messagePair.first().getRelay();
|
||||
Map<String, Set<Pair<LocalOrRemoteDevice, OutgoingMessageSignal>>> relayMessages = new HashMap<>();
|
||||
for (Pair<LocalOrRemoteDevice, OutgoingMessageSignal> messagePair : listPair) {
|
||||
String relay = messagePair.first().relay;
|
||||
|
||||
if (Util.isEmpty(relay)) {
|
||||
String encodedId = messagePair.first().device.getBackwardsCompatibleNumberEncoding();
|
||||
try {
|
||||
pushSender.sendMessage(deviceCache.get(new Pair<>(destination, destinationDeviceId)), messagePair.second());
|
||||
pushSender.sendMessage(messagePair.first().device, messagePair.second());
|
||||
success.add(encodedId);
|
||||
} catch (NoSuchUserException e) {
|
||||
logger.debug("No such user", e);
|
||||
failure.add(destination);
|
||||
failure.add(encodedId);
|
||||
}
|
||||
} else {
|
||||
Set<Pair<IncomingMessage, OutgoingMessageSignal>> messageSet = relayMessages.get(relay);
|
||||
Set<Pair<LocalOrRemoteDevice, OutgoingMessageSignal>> messageSet = relayMessages.get(relay);
|
||||
if (messageSet == null) {
|
||||
messageSet = new HashSet<>();
|
||||
relayMessages.put(relay, messageSet);
|
||||
}
|
||||
messageSet.add(messagePair);
|
||||
}
|
||||
success.add(destination);
|
||||
}
|
||||
|
||||
for (Map.Entry<String, Set<Pair<IncomingMessage, OutgoingMessageSignal>>> messagesForRelay : relayMessages.entrySet()) {
|
||||
for (Map.Entry<String, Set<Pair<LocalOrRemoteDevice, OutgoingMessageSignal>>> messagesForRelay : relayMessages.entrySet()) {
|
||||
try {
|
||||
FederatedClient client = federatedClientManager.getClient(messagesForRelay.getKey());
|
||||
|
||||
List<RelayMessage> messages = new LinkedList<>();
|
||||
for (Pair<IncomingMessage, OutgoingMessageSignal> message : messagesForRelay.getValue()) {
|
||||
messages.add(new RelayMessage(message.first().getDestination(),
|
||||
message.first().getDestinationDeviceId(),
|
||||
for (Pair<LocalOrRemoteDevice, OutgoingMessageSignal> message : messagesForRelay.getValue()) {
|
||||
messages.add(new RelayMessage(message.first().number,
|
||||
message.first().deviceId,
|
||||
message.second().toByteArray()));
|
||||
}
|
||||
|
||||
@@ -195,8 +203,8 @@ public class MessageController extends HttpServlet {
|
||||
numbersMissingDevices.add(string);
|
||||
} catch (NoSuchPeerException e) {
|
||||
logger.info("No such peer", e);
|
||||
for (Pair<IncomingMessage, OutgoingMessageSignal> messagePair : messagesForRelay.getValue())
|
||||
failure.add(messagePair.first().getDestination());
|
||||
for (Pair<LocalOrRemoteDevice, OutgoingMessageSignal> messagePair : messagesForRelay.getValue())
|
||||
failure.add(messagePair.first().number);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -210,6 +218,11 @@ public class MessageController extends HttpServlet {
|
||||
failureMeter.mark();
|
||||
response.setStatus(501);
|
||||
context.complete();
|
||||
} catch (Exception e) {
|
||||
logger.error("Unknown error sending message", e);
|
||||
failureMeter.mark();
|
||||
response.setStatus(500);
|
||||
context.complete();
|
||||
}
|
||||
|
||||
timerContext.stop();
|
||||
@@ -217,28 +230,32 @@ public class MessageController extends HttpServlet {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @param deviceCache is a map from Pair<number, deviceId> to the account
|
||||
*/
|
||||
@Nullable
|
||||
private List<OutgoingMessageSignal> getOutgoingMessageSignals(String sourceNumber,
|
||||
List<IncomingMessage> incomingMessages,
|
||||
Map<Pair<String, Long>, Device> deviceCache,
|
||||
List<String> numbersMissingDevices)
|
||||
private List<Pair<LocalOrRemoteDevice, OutgoingMessageSignal>> getOutgoingMessageSignals(String sourceNumber,
|
||||
List<IncomingMessage> incomingMessages,
|
||||
List<String> numbersMissingDevices)
|
||||
{
|
||||
List<OutgoingMessageSignal> outgoingMessages = new LinkedList<>();
|
||||
// # local deviceIds
|
||||
Map<String, Pair<Boolean, Set<Long>>> destinations = new HashMap<>();
|
||||
List<Pair<LocalOrRemoteDevice, OutgoingMessageSignal>> outgoingMessages = new LinkedList<>();
|
||||
Map<String, Set<Long>> localDestinations = new HashMap<>();
|
||||
Set<String> destinationNumbers = new HashSet<>();
|
||||
for (IncomingMessage incoming : incomingMessages) {
|
||||
Pair<Boolean, Set<Long>> deviceIds = destinations.get(incoming.getDestination());
|
||||
destinationNumbers.add(incoming.getDestination());
|
||||
if (!Util.isEmpty(incoming.getRelay()))
|
||||
continue;
|
||||
|
||||
Set<Long> deviceIds = localDestinations.get(incoming.getDestination());
|
||||
if (deviceIds == null) {
|
||||
deviceIds = new Pair<Boolean, Set<Long>>(Util.isEmpty(incoming.getRelay()), new HashSet<Long>());
|
||||
destinations.put(incoming.getDestination(), deviceIds);
|
||||
deviceIds = new HashSet<>();
|
||||
localDestinations.put(incoming.getDestination(), deviceIds);
|
||||
}
|
||||
deviceIds.second().add(incoming.getDestinationDeviceId());
|
||||
deviceIds.add(incoming.getDestinationDeviceId());
|
||||
}
|
||||
|
||||
pushSender.fillLocalAccountsCache(destinations, deviceCache, numbersMissingDevices);
|
||||
Pair<Map<String, Account>, List<String>> accountsForDevices = accountsManager.getAccountsForDevices(localDestinations);
|
||||
|
||||
Map<String, Account> localAccounts = accountsForDevices.first();
|
||||
for (String number : accountsForDevices.second())
|
||||
numbersMissingDevices.add(number);
|
||||
|
||||
for (IncomingMessage incoming : incomingMessages) {
|
||||
OutgoingMessageSignal.Builder outgoingMessage = OutgoingMessageSignal.newBuilder();
|
||||
@@ -255,13 +272,22 @@ public class MessageController extends HttpServlet {
|
||||
|
||||
int index = 0;
|
||||
|
||||
for (String destination : destinations.keySet()) {
|
||||
if (!destination.equals(incoming.getDestination())) {
|
||||
for (String destination : destinationNumbers) {
|
||||
if (!destination.equals(incoming.getDestination()))
|
||||
outgoingMessage.setDestinations(index++, destination);
|
||||
}
|
||||
}
|
||||
|
||||
outgoingMessages.add(outgoingMessage.build());
|
||||
LocalOrRemoteDevice device = null;
|
||||
if (!Util.isEmpty(incoming.getRelay()))
|
||||
device = new LocalOrRemoteDevice(incoming.getRelay(), incoming.getDestination(), incoming.getDestinationDeviceId());
|
||||
else {
|
||||
Account destination = localAccounts.get(incoming.getDestination());
|
||||
if (destination != null)
|
||||
device = new LocalOrRemoteDevice(destination.getDevice(incoming.getDestinationDeviceId()));
|
||||
}
|
||||
|
||||
if (device != null)
|
||||
outgoingMessages.add(new Pair<>(device, outgoingMessage.build()));
|
||||
}
|
||||
|
||||
return outgoingMessages;
|
||||
|
||||
Reference in New Issue
Block a user