Add plumbing to roll out binary service IDs/UUIDs on envelopes to internal users

This commit is contained in:
Jon Chambers
2025-08-21 17:53:41 -04:00
committed by GitHub
parent 78a7112675
commit 7f5ea6608c
13 changed files with 120 additions and 49 deletions

View File

@@ -27,7 +27,6 @@ import io.lettuce.core.metrics.MicrometerCommandLatencyRecorder;
import io.lettuce.core.metrics.MicrometerOptions;
import io.lettuce.core.resource.ClientResources;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.binder.grpc.MetricCollectingServerInterceptor;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.socket.nio.NioDatagramChannel;
@@ -40,7 +39,6 @@ import jakarta.servlet.Filter;
import jakarta.servlet.ServletRegistration;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.nio.charset.StandardCharsets;
import java.security.KeyStore;
@@ -460,7 +458,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(dynamoDbClient, dynamoDbAsyncClient,
config.getDynamoDbTables().getMessages().getTableName(),
config.getDynamoDbTables().getMessages().getExpiration(),
messageDeletionAsyncExecutor);
messageDeletionAsyncExecutor, experimentEnrollmentManager);
RemoteConfigs remoteConfigs = new RemoteConfigs(dynamoDbClient,
config.getDynamoDbTables().getRemoteConfig().getTableName());
PushChallengeDynamoDb pushChallengeDynamoDb = new PushChallengeDynamoDb(dynamoDbClient,
@@ -641,7 +639,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
DisconnectionRequestManager disconnectionRequestManager = new DisconnectionRequestManager(pubsubClient, grpcClientConnectionManager, disconnectionRequestListenerExecutor);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster, asyncCdnS3Client, config.getCdnConfiguration().bucket());
MessagesCache messagesCache = new MessagesCache(messagesCluster, messageDeliveryScheduler,
messageDeletionAsyncExecutor, clock);
messageDeletionAsyncExecutor, clock, experimentEnrollmentManager);
ClientReleaseManager clientReleaseManager = new ClientReleaseManager(clientReleases,
recurringJobExecutor,
config.getClientReleaseConfiguration().refreshInterval(),

View File

@@ -6,7 +6,9 @@
package org.whispersystems.textsecuregcm.storage;
import java.util.UUID;
import com.google.common.annotations.VisibleForTesting;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.grpc.ServiceIdentifierUtil;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
@@ -19,6 +21,9 @@ import org.whispersystems.textsecuregcm.util.UUIDUtil;
*/
public class EnvelopeUtil {
@VisibleForTesting
static final String INCLUDE_BINARY_SERVICE_ID_EXPERIMENT_NAME = "envelopeIncludeBinaryServiceIdentifier";
/**
* Converts all "compressible" UUID-like fields in the given envelope to more compact binary representations.
*
@@ -68,7 +73,22 @@ public class EnvelopeUtil {
*
* @return an envelope with binary representations of UUID-like fields expanded to string representations
*/
public static MessageProtos.Envelope expand(final MessageProtos.Envelope envelope) {
public static MessageProtos.Envelope expand(final MessageProtos.Envelope envelope,
final ExperimentEnrollmentManager experimentEnrollmentManager) {
final boolean includeBinaryServiceIdentifiers;
if (envelope.hasDestinationServiceIdBinary() || envelope.hasDestinationServiceId()) {
final ServiceIdentifier destinationIdentifier = envelope.hasDestinationServiceIdBinary()
? ServiceIdentifier.fromBytes(envelope.getDestinationServiceIdBinary().toByteArray())
: ServiceIdentifier.valueOf(envelope.getDestinationServiceId());
includeBinaryServiceIdentifiers =
experimentEnrollmentManager.isEnrolled(destinationIdentifier.uuid(), INCLUDE_BINARY_SERVICE_ID_EXPERIMENT_NAME);
} else {
includeBinaryServiceIdentifiers = false;
}
final MessageProtos.Envelope.Builder builder = envelope.toBuilder();
if (builder.hasSourceServiceIdBinary()) {
@@ -76,7 +96,10 @@ public class EnvelopeUtil {
ServiceIdentifierUtil.fromByteString(builder.getSourceServiceIdBinary());
builder.setSourceServiceId(sourceServiceId.toServiceIdentifierString());
builder.clearSourceServiceIdBinary();
if (!includeBinaryServiceIdentifiers) {
builder.clearSourceServiceIdBinary();
}
}
if (builder.hasDestinationServiceIdBinary()) {
@@ -84,14 +107,20 @@ public class EnvelopeUtil {
ServiceIdentifierUtil.fromByteString(builder.getDestinationServiceIdBinary());
builder.setDestinationServiceId(destinationServiceId.toServiceIdentifierString());
builder.clearDestinationServiceIdBinary();
if (!includeBinaryServiceIdentifiers) {
builder.clearDestinationServiceIdBinary();
}
}
if (builder.hasServerGuidBinary()) {
final UUID serverGuid = UUIDUtil.fromByteString(builder.getServerGuidBinary());
builder.setServerGuid(serverGuid.toString());
builder.clearServerGuidBinary();
if (!includeBinaryServiceIdentifiers) {
builder.clearServerGuidBinary();
}
}
if (builder.hasUpdatedPniBinary()) {

View File

@@ -42,6 +42,7 @@ import org.signal.libsignal.protocol.ServiceId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
@@ -118,6 +119,7 @@ public class MessagesCache {
private final ExecutorService messageDeletionExecutorService;
// messageDeletionExecutorService wrapped into a reactor Scheduler
private final Scheduler messageDeletionScheduler;
private final ExperimentEnrollmentManager experimentEnrollmentManager;
private final MessagesCacheInsertScript insertScript;
private final MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript insertMrmScript;
@@ -167,7 +169,8 @@ public class MessagesCache {
public MessagesCache(final FaultTolerantRedisClusterClient redisCluster,
final Scheduler messageDeliveryScheduler,
final ExecutorService messageDeletionExecutorService,
final Clock clock)
final Clock clock,
final ExperimentEnrollmentManager experimentEnrollmentManager)
throws IOException {
this(
@@ -175,6 +178,7 @@ public class MessagesCache {
messageDeliveryScheduler,
messageDeletionExecutorService,
clock,
experimentEnrollmentManager,
new MessagesCacheInsertScript(redisCluster),
new MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript(redisCluster),
new MessagesCacheGetItemsScript(redisCluster),
@@ -190,6 +194,7 @@ public class MessagesCache {
MessagesCache(final FaultTolerantRedisClusterClient redisCluster,
final Scheduler messageDeliveryScheduler,
final ExecutorService messageDeletionExecutorService, final Clock clock,
final ExperimentEnrollmentManager experimentEnrollmentManager,
final MessagesCacheInsertScript insertScript,
final MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript insertMrmScript,
final MessagesCacheGetItemsScript getItemsScript, final MessagesCacheRemoveByGuidScript removeByGuidScript,
@@ -204,6 +209,7 @@ public class MessagesCache {
this.messageDeliveryScheduler = messageDeliveryScheduler;
this.messageDeletionExecutorService = messageDeletionExecutorService;
this.messageDeletionScheduler = Schedulers.fromExecutorService(messageDeletionExecutorService, "messageDeletion");
this.experimentEnrollmentManager = experimentEnrollmentManager;
this.insertScript = insertScript;
this.insertMrmScript = insertMrmScript;
@@ -750,9 +756,9 @@ public class MessagesCache {
return Byte.parseByte(queueName.substring(queueName.lastIndexOf("::") + 2, queueName.lastIndexOf('}')));
}
private static MessageProtos.Envelope parseEnvelope(final byte[] envelopeBytes)
private MessageProtos.Envelope parseEnvelope(final byte[] envelopeBytes)
throws InvalidProtocolBufferException {
return EnvelopeUtil.expand(MessageProtos.Envelope.parseFrom(envelopeBytes));
return EnvelopeUtil.expand(MessageProtos.Envelope.parseFrom(envelopeBytes), experimentEnrollmentManager);
}
}

View File

@@ -27,6 +27,7 @@ import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -69,11 +70,13 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
private final Duration timeToLive;
private final ExecutorService messageDeletionExecutor;
private final Scheduler messageDeletionScheduler;
private final ExperimentEnrollmentManager experimentEnrollmentManager;
private static final Logger logger = LoggerFactory.getLogger(MessagesDynamoDb.class);
public MessagesDynamoDb(DynamoDbClient dynamoDb, DynamoDbAsyncClient dynamoDbAsyncClient, String tableName,
Duration timeToLive, ExecutorService messageDeletionExecutor) {
Duration timeToLive, ExecutorService messageDeletionExecutor,
final ExperimentEnrollmentManager experimentEnrollmentManager) {
super(dynamoDb);
this.dbAsyncClient = dynamoDbAsyncClient;
@@ -82,6 +85,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
this.messageDeletionExecutor = messageDeletionExecutor;
this.messageDeletionScheduler = Schedulers.fromExecutor(messageDeletionExecutor);
this.experimentEnrollmentManager = experimentEnrollmentManager;
}
public void store(final List<MessageProtos.Envelope> messages, final UUID destinationAccountUuid,
@@ -151,7 +155,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
return dbAsyncClient.queryPaginator(queryRequest).items()
.map(message -> {
try {
return convertItemToEnvelope(message);
return convertItemToEnvelope(message, experimentEnrollmentManager);
} catch (final InvalidProtocolBufferException e) {
logger.error("Failed to parse envelope", e);
return null;
@@ -189,7 +193,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
.mapNotNull(deleteItemResponse -> {
try {
if (deleteItemResponse.attributes() != null && deleteItemResponse.attributes().containsKey(KEY_PARTITION)) {
return convertItemToEnvelope(deleteItemResponse.attributes());
return convertItemToEnvelope(deleteItemResponse.attributes(), experimentEnrollmentManager);
}
} catch (final InvalidProtocolBufferException e) {
logger.error("Failed to parse envelope", e);
@@ -213,7 +217,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
.thenApplyAsync(deleteItemResponse -> {
if (deleteItemResponse.attributes() != null && deleteItemResponse.attributes().containsKey(KEY_PARTITION)) {
try {
return Optional.of(convertItemToEnvelope(deleteItemResponse.attributes()));
return Optional.of(convertItemToEnvelope(deleteItemResponse.attributes(), experimentEnrollmentManager));
} catch (final InvalidProtocolBufferException e) {
logger.error("Failed to parse envelope", e);
}
@@ -224,10 +228,11 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
}
@VisibleForTesting
static MessageProtos.Envelope convertItemToEnvelope(final Map<String, AttributeValue> item)
throws InvalidProtocolBufferException {
static MessageProtos.Envelope convertItemToEnvelope(final Map<String, AttributeValue> item,
final ExperimentEnrollmentManager experimentEnrollmentManager) throws InvalidProtocolBufferException {
return EnvelopeUtil.expand(MessageProtos.Envelope.parseFrom(item.get(KEY_ENVELOPE_BYTES).b().asByteArray()));
return EnvelopeUtil.expand(MessageProtos.Envelope.parseFrom(item.get(KEY_ENVELOPE_BYTES).b().asByteArray()),
experimentEnrollmentManager);
}
private long getTtlForMessage(MessageProtos.Envelope message) {

View File

@@ -235,7 +235,7 @@ record CommandDependencies(
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(dynamoDbClient, dynamoDbAsyncClient,
configuration.getDynamoDbTables().getMessages().getTableName(),
configuration.getDynamoDbTables().getMessages().getExpiration(),
messageDeletionExecutor);
messageDeletionExecutor, experimentEnrollmentManager);
FaultTolerantRedisClusterClient messagesCluster = configuration.getMessageCacheConfiguration()
.getRedisClusterConfiguration().build("messages", redisClientResourcesBuilder);
FaultTolerantRedisClusterClient rateLimitersCluster = configuration.getRateLimitersCluster().build("rate_limiters",
@@ -257,7 +257,7 @@ record CommandDependencies(
GrpcClientConnectionManager grpcClientConnectionManager = new GrpcClientConnectionManager();
DisconnectionRequestManager disconnectionRequestManager = new DisconnectionRequestManager(pubsubClient, grpcClientConnectionManager, disconnectionRequestListenerExecutor);
MessagesCache messagesCache = new MessagesCache(messagesCluster,
messageDeliveryScheduler, messageDeletionExecutor, Clock.systemUTC());
messageDeliveryScheduler, messageDeletionExecutor, Clock.systemUTC(), experimentEnrollmentManager);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster, asyncCdnS3Client,
configuration.getCdnConfiguration().bucket());
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(dynamoDbClient, dynamoDbAsyncClient,