Retire the (unused!) binary message format

This commit is contained in:
Jon Chambers
2022-07-27 11:22:20 -04:00
committed by Jon Chambers
parent aa36dc95ef
commit e9119da040
9 changed files with 144 additions and 829 deletions

View File

@@ -138,7 +138,6 @@ import org.whispersystems.textsecuregcm.metrics.OperatingSystemMemoryGauge;
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
import org.whispersystems.textsecuregcm.metrics.ReportedMessageMetricsListener;
import org.whispersystems.textsecuregcm.metrics.TrafficSource;
import org.whispersystems.textsecuregcm.providers.MultiDeviceMessageListProvider;
import org.whispersystems.textsecuregcm.providers.MultiRecipientMessageProvider;
import org.whispersystems.textsecuregcm.providers.RedisClientFactory;
import org.whispersystems.textsecuregcm.providers.RedisClusterHealthCheck;
@@ -591,7 +590,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
.addMappingForUrlPatterns(EnumSet.of(DispatcherType.REQUEST), false, "/*");
environment.jersey().register(new ContentLengthFilter(TrafficSource.HTTP));
environment.jersey().register(MultiDeviceMessageListProvider.class);
environment.jersey().register(MultiRecipientMessageProvider.class);
environment.jersey().register(new MetricsApplicationEventListener(TrafficSource.HTTP));
environment.jersey()
@@ -613,7 +611,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
clientPresenceManager, websocketScheduledExecutor));
webSocketEnvironment.jersey().register(new WebsocketRefreshApplicationEventListener(accountsManager, clientPresenceManager));
webSocketEnvironment.jersey().register(new ContentLengthFilter(TrafficSource.WEBSOCKET));
webSocketEnvironment.jersey().register(MultiDeviceMessageListProvider.class);
webSocketEnvironment.jersey().register(MultiRecipientMessageProvider.class);
webSocketEnvironment.jersey().register(new MetricsApplicationEventListener(TrafficSource.WEBSOCKET));
webSocketEnvironment.jersey().register(new KeepAliveController(clientPresenceManager));

View File

@@ -61,7 +61,6 @@ import org.whispersystems.textsecuregcm.auth.CombinedUnidentifiedSenderAccessKey
import org.whispersystems.textsecuregcm.auth.OptionalAccess;
import org.whispersystems.textsecuregcm.entities.AccountMismatchedDevices;
import org.whispersystems.textsecuregcm.entities.AccountStaleDevices;
import org.whispersystems.textsecuregcm.entities.IncomingDeviceMessage;
import org.whispersystems.textsecuregcm.entities.IncomingMessage;
import org.whispersystems.textsecuregcm.entities.IncomingMessageList;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
@@ -77,7 +76,6 @@ import org.whispersystems.textsecuregcm.entities.StaleDevices;
import org.whispersystems.textsecuregcm.limits.RateLimitChallengeException;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.providers.MultiDeviceMessageListProvider;
import org.whispersystems.textsecuregcm.providers.MultiRecipientMessageProvider;
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
import org.whispersystems.textsecuregcm.push.MessageSender;
@@ -264,113 +262,6 @@ public class MessageController {
}
}
@Timed
@Path("/{destination}")
@PUT
@Consumes(MultiDeviceMessageListProvider.MEDIA_TYPE)
@Produces(MediaType.APPLICATION_JSON)
@FilterAbusiveMessages
public Response sendMultiDeviceMessage(@Auth Optional<AuthenticatedAccount> source,
@HeaderParam(OptionalAccess.UNIDENTIFIED) Optional<Anonymous> accessKey,
@HeaderParam("User-Agent") String userAgent,
@HeaderParam("X-Forwarded-For") String forwardedFor,
@PathParam("destination") UUID destinationUuid,
@QueryParam("online") boolean online,
@QueryParam("ts") long timestamp,
@NotNull @Valid IncomingDeviceMessage[] messages)
throws RateLimitExceededException {
if (source.isEmpty() && accessKey.isEmpty()) {
throw new WebApplicationException(Response.Status.UNAUTHORIZED);
}
final String senderType;
if (source.isPresent()) {
if (source.get().getAccount().isIdentifiedBy(destinationUuid)) {
senderType = SENDER_TYPE_SELF;
} else {
senderType = SENDER_TYPE_IDENTIFIED;
}
} else {
senderType = SENDER_TYPE_UNIDENTIFIED;
}
for (final IncomingDeviceMessage message : messages) {
validateContentLength(message.getContent().length, userAgent);
validateEnvelopeType(message.getType(), userAgent);
}
try {
boolean isSyncMessage = source.isPresent() && source.get().getAccount().isIdentifiedBy(destinationUuid);
Optional<Account> destination;
if (!isSyncMessage) {
destination = accountsManager.getByAccountIdentifier(destinationUuid)
.or(() -> accountsManager.getByPhoneNumberIdentifier(destinationUuid));
} else {
destination = source.map(AuthenticatedAccount::getAccount);
}
OptionalAccess.verify(source.map(AuthenticatedAccount::getAccount), accessKey, destination);
assert (destination.isPresent());
if (source.isPresent() && !isSyncMessage) {
checkRateLimit(source.get(), destination.get(), userAgent);
}
final Set<Long> excludedDeviceIds;
if (isSyncMessage) {
excludedDeviceIds = Set.of(source.get().getAuthenticatedDevice().getId());
} else {
excludedDeviceIds = Collections.emptySet();
}
DestinationDeviceValidator.validateCompleteDeviceList(
destination.get(),
Arrays.stream(messages).map(IncomingDeviceMessage::getDeviceId).collect(Collectors.toSet()),
excludedDeviceIds);
DestinationDeviceValidator.validateRegistrationIds(
destination.get(),
Arrays.stream(messages).toList(),
IncomingDeviceMessage::getDeviceId,
IncomingDeviceMessage::getRegistrationId,
destination.get().getPhoneNumberIdentifier().equals(destinationUuid));
final List<Tag> tags = List.of(UserAgentTagUtil.getPlatformTag(userAgent),
Tag.of(EPHEMERAL_TAG_NAME, String.valueOf(online)),
Tag.of(SENDER_TYPE_TAG_NAME, senderType));
for (final IncomingDeviceMessage message : messages) {
Optional<Device> destinationDevice = destination.get().getDevice(message.getDeviceId());
if (destinationDevice.isPresent()) {
Metrics.counter(SENT_MESSAGE_COUNTER_NAME, tags).increment();
sendMessage(source, destination.get(), destinationDevice.get(), destinationUuid, timestamp, online, message);
}
}
return Response.ok(new SendMessageResponse(
!isSyncMessage && source.isPresent() && source.get().getAccount().getEnabledDeviceCount() > 1)).build();
} catch (NoSuchUserException e) {
throw new WebApplicationException(Response.status(404).build());
} catch (MismatchedDevicesException e) {
throw new WebApplicationException(Response.status(409)
.type(MediaType.APPLICATION_JSON_TYPE)
.entity(new MismatchedDevices(e.getMissingDevices(),
e.getExtraDevices()))
.build());
} catch (StaleDevicesException e) {
throw new WebApplicationException(Response.status(410)
.type(MediaType.APPLICATION_JSON)
.entity(new StaleDevices(e.getStaleDevices()))
.build());
}
}
@Timed
@Path("/multi_recipient")
@PUT
@@ -658,34 +549,6 @@ public class MessageController {
}
}
private void sendMessage(Optional<AuthenticatedAccount> source, Account destinationAccount, Device destinationDevice,
UUID destinationUuid, long timestamp, boolean online, IncomingDeviceMessage message) throws NoSuchUserException {
try {
Envelope.Builder messageBuilder = Envelope.newBuilder();
long serverTimestamp = System.currentTimeMillis();
messageBuilder
.setType(Envelope.Type.forNumber(message.getType()))
.setTimestamp(timestamp == 0 ? serverTimestamp : timestamp)
.setServerTimestamp(serverTimestamp)
.setDestinationUuid(destinationUuid.toString())
.setContent(ByteString.copyFrom(message.getContent()));
source.ifPresent(authenticatedAccount ->
messageBuilder.setSource(authenticatedAccount.getAccount().getNumber())
.setSourceUuid(authenticatedAccount.getAccount().getUuid().toString())
.setSourceDevice((int) authenticatedAccount.getAuthenticatedDevice().getId()));
messageSender.sendMessage(destinationAccount, destinationDevice, messageBuilder.build(), online);
} catch (NotPushRegisteredException e) {
if (destinationDevice.isMaster()) {
throw new NoSuchUserException(e);
} else {
logger.debug("Not registered", e);
}
}
}
private void sendMessage(Account destinationAccount,
Device destinationDevice,
long timestamp,

View File

@@ -1,47 +0,0 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.entities;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
public class IncomingDeviceMessage {
private final int type;
@Min(1)
private final long deviceId;
@Min(0)
@Max(65536)
private final int registrationId;
@NotNull
private final byte[] content;
public IncomingDeviceMessage(int type, long deviceId, int registrationId, byte[] content) {
this.type = type;
this.deviceId = deviceId;
this.registrationId = registrationId;
this.content = content;
}
public int getType() {
return type;
}
public long getDeviceId() {
return deviceId;
}
public int getRegistrationId() {
return registrationId;
}
public byte[] getContent() {
return content;
}
}

View File

@@ -1,90 +0,0 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.providers;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InputStream;
import java.util.UUID;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.WebApplicationException;
public abstract class BinaryProviderBase {
/**
* Reads a UUID in network byte order and converts to a UUID object.
*/
UUID readUuid(InputStream stream) throws IOException {
byte[] buffer = new byte[8];
int read = stream.readNBytes(buffer, 0, 8);
if (read != 8) {
throw new IOException("Insufficient bytes for UUID");
}
long msb = convertNetworkByteOrderToLong(buffer);
read = stream.readNBytes(buffer, 0, 8);
if (read != 8) {
throw new IOException("Insufficient bytes for UUID");
}
long lsb = convertNetworkByteOrderToLong(buffer);
return new UUID(msb, lsb);
}
private long convertNetworkByteOrderToLong(byte[] buffer) {
long result = 0;
for (int i = 0; i < 8; i++) {
result = (result << 8) | (buffer[i] & 0xFFL);
}
return result;
}
/**
* Reads a varint. A varint larger than 64 bits is rejected with a {@code WebApplicationException}. An
* {@code IOException} is thrown if the stream ends before we finish reading the varint.
*
* @return the varint value
*/
static long readVarint(InputStream stream) throws IOException, WebApplicationException {
boolean hasMore = true;
int currentOffset = 0;
long result = 0;
while (hasMore) {
if (currentOffset >= 64) {
throw new BadRequestException("varint is too large");
}
int b = stream.read();
if (b == -1) {
throw new IOException("Missing byte " + (currentOffset / 7) + " of varint");
}
if (currentOffset == 63 && (b & 0xFE) != 0) {
throw new BadRequestException("varint is too large");
}
hasMore = (b & 0x80) != 0;
result |= ((long)(b & 0x7F)) << currentOffset;
currentOffset += 7;
}
return result;
}
/**
* Reads two bytes with most significant byte first. Treats the value as unsigned so the range returned is
* {@code [0, 65535]}.
*/
@VisibleForTesting
static int readU16(InputStream stream) throws IOException {
int b1 = stream.read();
if (b1 == -1) {
throw new IOException("Missing byte 1 of U16");
}
int b2 = stream.read();
if (b2 == -1) {
throw new IOException("Missing byte 2 of U16");
}
return (b1 << 8) | b2;
}
}

View File

@@ -1,81 +0,0 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.providers;
import io.dropwizard.util.DataSizeUnit;
import java.io.IOException;
import java.io.InputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.Consumes;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.NoContentException;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.ext.MessageBodyReader;
import javax.ws.rs.ext.Provider;
import org.whispersystems.textsecuregcm.entities.IncomingDeviceMessage;
@Provider
@Consumes(MultiDeviceMessageListProvider.MEDIA_TYPE)
public class MultiDeviceMessageListProvider extends BinaryProviderBase implements MessageBodyReader<IncomingDeviceMessage[]> {
public static final String MEDIA_TYPE = "application/vnd.signal-messenger.mdml";
public static final int MAX_MESSAGE_COUNT = 50;
public static final int MAX_MESSAGE_SIZE = Math.toIntExact(DataSizeUnit.KIBIBYTES.toBytes(256));
public static final byte VERSION = 0x01;
@Override
public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
return MEDIA_TYPE.equals(mediaType.toString()) && IncomingDeviceMessage[].class.isAssignableFrom(type);
}
@Override
public IncomingDeviceMessage[]
readFrom(Class<IncomingDeviceMessage[]> resultType, Type genericType,
Annotation[] annotations, MediaType mediaType, MultivaluedMap<String, String> httpHeaders,
InputStream entityStream)
throws IOException, WebApplicationException {
int versionByte = entityStream.read();
if (versionByte == -1) {
throw new NoContentException("Empty body not allowed");
}
if (versionByte != VERSION) {
throw new BadRequestException("Unsupported version");
}
int count = entityStream.read();
if (count == -1) {
throw new IOException("Missing count");
}
if (count > MAX_MESSAGE_COUNT) {
throw new BadRequestException("Maximum recipient count exceeded");
}
IncomingDeviceMessage[] messages = new IncomingDeviceMessage[count];
for (int i = 0; i < count; i++) {
long deviceId = readVarint(entityStream);
int registrationId = readU16(entityStream);
int type = entityStream.read();
if (type == -1) {
throw new IOException("Unexpected end of stream reading message type");
}
long messageLength = readVarint(entityStream);
if (messageLength > MAX_MESSAGE_SIZE) {
throw new WebApplicationException("Message body too large", Status.REQUEST_ENTITY_TOO_LARGE);
}
byte[] contents = entityStream.readNBytes(Math.toIntExact(messageLength));
if (contents.length != messageLength) {
throw new IOException("Unexpected end of stream in the middle of message contents");
}
messages[i] = new IncomingDeviceMessage(type, deviceId, registrationId, contents);
}
return messages;
}
}

View File

@@ -5,6 +5,7 @@
package org.whispersystems.textsecuregcm.providers;
import com.google.common.annotations.VisibleForTesting;
import io.dropwizard.util.DataSizeUnit;
import java.io.IOException;
import java.io.InputStream;
@@ -23,7 +24,7 @@ import org.whispersystems.textsecuregcm.entities.MultiRecipientMessage;
@Provider
@Consumes(MultiRecipientMessageProvider.MEDIA_TYPE)
public class MultiRecipientMessageProvider extends BinaryProviderBase implements MessageBodyReader<MultiRecipientMessage> {
public class MultiRecipientMessageProvider implements MessageBodyReader<MultiRecipientMessage> {
public static final String MEDIA_TYPE = "application/vnd.signal-messenger.mrm";
public static final int MAX_RECIPIENT_COUNT = 5000;
@@ -70,4 +71,78 @@ public class MultiRecipientMessageProvider extends BinaryProviderBase implements
}
return new MultiRecipientMessage(recipients, commonPayload);
}
/**
* Reads a UUID in network byte order and converts to a UUID object.
*/
private UUID readUuid(InputStream stream) throws IOException {
byte[] buffer = new byte[8];
int read = stream.readNBytes(buffer, 0, 8);
if (read != 8) {
throw new IOException("Insufficient bytes for UUID");
}
long msb = convertNetworkByteOrderToLong(buffer);
read = stream.readNBytes(buffer, 0, 8);
if (read != 8) {
throw new IOException("Insufficient bytes for UUID");
}
long lsb = convertNetworkByteOrderToLong(buffer);
return new UUID(msb, lsb);
}
private long convertNetworkByteOrderToLong(byte[] buffer) {
long result = 0;
for (int i = 0; i < 8; i++) {
result = (result << 8) | (buffer[i] & 0xFFL);
}
return result;
}
/**
* Reads a varint. A varint larger than 64 bits is rejected with a {@code WebApplicationException}. An
* {@code IOException} is thrown if the stream ends before we finish reading the varint.
*
* @return the varint value
*/
private long readVarint(InputStream stream) throws IOException, WebApplicationException {
boolean hasMore = true;
int currentOffset = 0;
int result = 0;
while (hasMore) {
if (currentOffset >= 64) {
throw new BadRequestException("varint is too large");
}
int b = stream.read();
if (b == -1) {
throw new IOException("Missing byte " + (currentOffset / 7) + " of varint");
}
if (currentOffset == 63 && (b & 0xFE) != 0) {
throw new BadRequestException("varint is too large");
}
hasMore = (b & 0x80) != 0;
result |= (b & 0x7F) << currentOffset;
currentOffset += 7;
}
return result;
}
/**
* Reads two bytes with most significant byte first. Treats the value as unsigned so the range returned is
* {@code [0, 65535]}.
*/
@VisibleForTesting
static int readU16(InputStream stream) throws IOException {
int b1 = stream.read();
if (b1 == -1) {
throw new IOException("Missing byte 1 of U16");
}
int b2 = stream.read();
if (b2 == -1) {
throw new IOException("Missing byte 2 of U16");
}
return (b1 << 8) | b2;
}
}