Remove message database from the codebase (#395)

* Remove message database from the codebase

* Remove unused ExperimentEnrollmentManager in test

* Be more stylish
This commit is contained in:
Ehren Kret
2021-02-11 10:50:03 -06:00
committed by GitHub
parent 477615fc66
commit be8a1acca9
21 changed files with 258 additions and 1049 deletions

View File

@@ -134,11 +134,6 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private DynamoDbConfiguration keysDynamoDb;
@Valid
@NotNull
@JsonProperty
private DatabaseConfiguration messageStore;
@Valid
@NotNull
@JsonProperty
@@ -316,10 +311,6 @@ public class WhisperServerConfiguration extends Configuration {
return keysDynamoDb;
}
public DatabaseConfiguration getMessageStoreConfiguration() {
return messageStore;
}
public DatabaseConfiguration getAbuseDatabaseConfiguration() {
return abuseDatabase;
}

View File

@@ -4,6 +4,8 @@
*/
package org.whispersystems.textsecuregcm;
import static com.codahale.metrics.MetricRegistry.name;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
@@ -38,6 +40,21 @@ import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.wavefront.WavefrontConfig;
import io.micrometer.wavefront.WavefrontMeterRegistry;
import java.security.Security;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.servlet.DispatcherType;
import javax.servlet.FilterRegistration;
import javax.servlet.ServletRegistration;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.eclipse.jetty.servlets.CrossOriginFilter;
import org.jdbi.v3.core.Jdbi;
@@ -126,7 +143,6 @@ import org.whispersystems.textsecuregcm.storage.FeatureFlags;
import org.whispersystems.textsecuregcm.storage.FeatureFlagsManager;
import org.whispersystems.textsecuregcm.storage.KeysDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagePersister;
import org.whispersystems.textsecuregcm.storage.Messages;
import org.whispersystems.textsecuregcm.storage.MessagesCache;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
@@ -152,35 +168,17 @@ import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator;
import org.whispersystems.textsecuregcm.workers.CertificateCommand;
import org.whispersystems.textsecuregcm.workers.DeleteFeatureFlagTask;
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
import org.whispersystems.textsecuregcm.workers.SetRequestLoggingEnabledTask;
import org.whispersystems.textsecuregcm.workers.GetRedisCommandStatsCommand;
import org.whispersystems.textsecuregcm.workers.GetRedisSlowlogCommand;
import org.whispersystems.textsecuregcm.workers.ListFeatureFlagsTask;
import org.whispersystems.textsecuregcm.workers.SetCrawlerAccelerationTask;
import org.whispersystems.textsecuregcm.workers.SetFeatureFlagTask;
import org.whispersystems.textsecuregcm.workers.SetRequestLoggingEnabledTask;
import org.whispersystems.textsecuregcm.workers.VacuumCommand;
import org.whispersystems.textsecuregcm.workers.ZkParamsCommand;
import org.whispersystems.websocket.WebSocketResourceProviderFactory;
import org.whispersystems.websocket.setup.WebSocketEnvironment;
import javax.servlet.DispatcherType;
import javax.servlet.FilterRegistration;
import javax.servlet.ServletRegistration;
import java.security.Security;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static com.codahale.metrics.MetricRegistry.name;
public class WhisperServerService extends Application<WhisperServerConfiguration> {
static {
@@ -204,13 +202,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
});
bootstrap.addBundle(new NameableMigrationsBundle<WhisperServerConfiguration>("messagedb", "messagedb.xml") {
@Override
public DataSourceFactory getDataSourceFactory(WhisperServerConfiguration configuration) {
return configuration.getMessageStoreConfiguration();
}
});
bootstrap.addBundle(new NameableMigrationsBundle<WhisperServerConfiguration>("abusedb", "abusedb.xml") {
@Override
public PooledDataSourceFactory getDataSourceFactory(WhisperServerConfiguration configuration) {
@@ -261,11 +252,9 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
JdbiFactory jdbiFactory = new JdbiFactory(DefaultNameStrategy.CHECK_EMPTY);
Jdbi accountJdbi = jdbiFactory.build(environment, config.getAccountsDatabaseConfiguration(), "accountdb");
Jdbi messageJdbi = jdbiFactory.build(environment, config.getMessageStoreConfiguration(), "messagedb" );
Jdbi abuseJdbi = jdbiFactory.build(environment, config.getAbuseDatabaseConfiguration(), "abusedb" );
FaultTolerantDatabase accountDatabase = new FaultTolerantDatabase("accounts_database", accountJdbi, config.getAccountsDatabaseConfiguration().getCircuitBreakerConfiguration());
FaultTolerantDatabase messageDatabase = new FaultTolerantDatabase("message_database", messageJdbi, config.getMessageStoreConfiguration().getCircuitBreakerConfiguration());
FaultTolerantDatabase abuseDatabase = new FaultTolerantDatabase("abuse_database", abuseJdbi, config.getAbuseDatabaseConfiguration().getCircuitBreakerConfiguration());
AmazonDynamoDBClientBuilder messageDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
@@ -292,7 +281,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ReservedUsernames reservedUsernames = new ReservedUsernames(accountDatabase);
Profiles profiles = new Profiles(accountDatabase);
KeysDynamoDb keysDynamoDb = new KeysDynamoDb(preKeyDynamoDb, config.getKeysDynamoDbConfiguration().getTableName());
Messages messages = new Messages(messageDatabase);
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messageDynamoDb, config.getMessageDynamoDbConfiguration().getTableName(), config.getMessageDynamoDbConfiguration().getTimeToLive());
AbusiveHostRules abusiveHostRules = new AbusiveHostRules(abuseDatabase);
RemoteConfigs remoteConfigs = new RemoteConfigs(accountDatabase);
@@ -342,7 +330,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
MessagesCache messagesCache = new MessagesCache(messagesCluster, messagesCluster, keyspaceNotificationDispatchExecutor);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
MessagesManager messagesManager = new MessagesManager(messages, messagesDynamoDb, messagesCache, pushLatencyManager, experimentEnrollmentManager);
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager);
AccountsManager accountsManager = new AccountsManager(accounts, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager);
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
FeatureFlagsManager featureFlagsManager = new FeatureFlagsManager(featureFlags, recurringJobExecutor);

View File

@@ -4,12 +4,33 @@
*/
package org.whispersystems.textsecuregcm.controllers;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.annotation.Timed;
import com.google.common.annotations.VisibleForTesting;
import io.dropwizard.auth.Auth;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import javax.validation.Valid;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AuthenticationCredentials;
@@ -52,28 +73,6 @@ import org.whispersystems.textsecuregcm.util.Hex;
import org.whispersystems.textsecuregcm.util.Util;
import org.whispersystems.textsecuregcm.util.VerificationCode;
import javax.validation.Valid;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import static com.codahale.metrics.MetricRegistry.name;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
@Path("/v1/accounts")
public class AccountController {
@@ -644,7 +643,7 @@ public class AccountController {
}
directoryQueue.refreshRegisteredUser(account);
messagesManager.clear(number, maybeExistingAccount.map(Account::getUuid).orElse(null));
maybeExistingAccount.ifPresent(definitelyExistingAccount -> messagesManager.clear(definitelyExistingAccount.getUuid()));
pendingAccounts.remove(number);
return account;

View File

@@ -102,7 +102,7 @@ public class DeviceController {
account.removeDevice(deviceId);
accounts.update(account);
directoryQueue.refreshRegisteredUser(account);
messages.clear(account.getNumber(), account.getUuid(), deviceId);
messages.clear(account.getUuid(), deviceId);
}
@Timed
@@ -196,7 +196,7 @@ public class DeviceController {
device.setCapabilities(accountAttributes.getCapabilities());
account.get().addDevice(device);
messages.clear(account.get().getNumber(), account.get().getUuid(), device.getId());
messages.clear(account.get().getUuid(), device.getId());
accounts.update(account.get());
pendingDevices.remove(number);

View File

@@ -4,6 +4,8 @@
*/
package org.whispersystems.textsecuregcm.controllers;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
@@ -15,6 +17,25 @@ import io.dropwizard.auth.Auth;
import io.dropwizard.util.DataSize;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import javax.validation.Valid;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier;
@@ -46,28 +67,6 @@ import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException;
import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil;
import org.whispersystems.textsecuregcm.websocket.WebSocketConnection;
import javax.validation.Valid;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import static com.codahale.metrics.MetricRegistry.name;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
@Path("/v1/messages")
public class MessageController {
@@ -226,11 +225,11 @@ public class MessageController {
RedisOperation.unchecked(() -> apnFallbackManager.cancel(account, account.getAuthenticatedDevice().get()));
}
final OutgoingMessageEntityList outgoingMessages = messagesManager.getMessagesForDevice(account.getNumber(),
account.getUuid(),
account.getAuthenticatedDevice().get().getId(),
userAgent,
false);
final OutgoingMessageEntityList outgoingMessages = messagesManager.getMessagesForDevice(
account.getUuid(),
account.getAuthenticatedDevice().get().getId(),
userAgent,
false);
outgoingMessageListSizeHistogram.update(outgoingMessages.getMessages().size());
@@ -271,8 +270,8 @@ public class MessageController {
{
try {
WebSocketConnection.recordMessageDeliveryDuration(timestamp, account.getAuthenticatedDevice().get());
Optional<OutgoingMessageEntity> message = messagesManager.delete(account.getNumber(),
account.getUuid(),
Optional<OutgoingMessageEntity> message = messagesManager.delete(
account.getUuid(),
account.getAuthenticatedDevice().get().getId(),
source, timestamp);
@@ -291,8 +290,8 @@ public class MessageController {
@Path("/uuid/{uuid}")
public void removePendingMessage(@Auth Account account, @PathParam("uuid") UUID uuid) {
try {
Optional<OutgoingMessageEntity> message = messagesManager.delete(account.getNumber(),
account.getUuid(),
Optional<OutgoingMessageEntity> message = messagesManager.delete(
account.getUuid(),
account.getAuthenticatedDevice().get().getId(),
uuid);

View File

@@ -5,6 +5,8 @@
package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
@@ -13,6 +15,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.micrometer.core.instrument.Metrics;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier;
@@ -22,13 +28,6 @@ import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.Util;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import static com.codahale.metrics.MetricRegistry.name;
public class AccountsManager {
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
@@ -145,7 +144,7 @@ public class AccountsManager {
directoryQueue.deleteAccount(account);
profilesManager.deleteAll(account.getUuid());
keysDynamoDb.delete(account);
messagesManager.clear(account.getNumber(), account.getUuid());
messagesManager.clear(account.getUuid());
redisDelete(account);
databaseDelete(account);
}

View File

@@ -164,7 +164,7 @@ public class MessagePersister implements Managed {
do {
messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT);
messagesManager.persistMessages(accountNumber, accountUuid, deviceId, messages);
messagesManager.persistMessages(accountUuid, deviceId, messages);
messageCount += messages.size();
persistMessageMeter.mark(messages.size());

View File

@@ -1,183 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import org.jdbi.v3.core.argument.SetObjectArgumentFactory;
import org.jdbi.v3.core.statement.PreparedBatch;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.storage.mappers.OutgoingMessageEntityRowMapper;
import org.whispersystems.textsecuregcm.util.Constants;
import java.sql.Types;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import static com.codahale.metrics.MetricRegistry.name;
public class Messages {
static final int RESULT_SET_CHUNK_SIZE = 100;
public static final String ID = "id";
public static final String GUID = "guid";
public static final String TYPE = "type";
public static final String RELAY = "relay";
public static final String TIMESTAMP = "timestamp";
public static final String SERVER_TIMESTAMP = "server_timestamp";
public static final String SOURCE = "source";
public static final String SOURCE_UUID = "source_uuid";
public static final String SOURCE_DEVICE = "source_device";
public static final String DESTINATION = "destination";
public static final String DESTINATION_DEVICE = "destination_device";
public static final String MESSAGE = "message";
public static final String CONTENT = "content";
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Timer storeTimer = metricRegistry.timer(name(Messages.class, "store" ));
private final Timer loadTimer = metricRegistry.timer(name(Messages.class, "load" ));
private final Timer removeBySourceTimer = metricRegistry.timer(name(Messages.class, "removeBySource"));
private final Timer removeByGuidTimer = metricRegistry.timer(name(Messages.class, "removeByGuid" ));
private final Timer removeByIdTimer = metricRegistry.timer(name(Messages.class, "removeById" ));
private final Timer clearDeviceTimer = metricRegistry.timer(name(Messages.class, "clearDevice" ));
private final Timer clearTimer = metricRegistry.timer(name(Messages.class, "clear" ));
private final Timer vacuumTimer = metricRegistry.timer(name(Messages.class, "vacuum"));
private final Meter insertNullGuidMeter = metricRegistry.meter(name(Messages.class, "insertNullGuid"));
private final Histogram storeSizeHistogram = metricRegistry.histogram(name(Messages.class, "storeBatchSize"));
private final FaultTolerantDatabase database;
private static class UUIDArgumentFactory extends SetObjectArgumentFactory {
public UUIDArgumentFactory() {
super(Map.of(UUID.class, Types.OTHER));
}
}
public Messages(FaultTolerantDatabase database) {
this.database = database;
this.database.getDatabase().registerRowMapper(new OutgoingMessageEntityRowMapper());
this.database.getDatabase().registerArgument(new UUIDArgumentFactory());
}
public void store(final List<Envelope> messages, final String destination, final long destinationDevice) {
database.use(jdbi -> jdbi.useTransaction(handle -> {
try (final Timer.Context ignored = storeTimer.time()) {
final PreparedBatch batch = handle.prepareBatch("INSERT INTO messages (" + GUID + ", " + TYPE + ", " + RELAY + ", " + TIMESTAMP + ", " + SERVER_TIMESTAMP + ", " + SOURCE + ", " + SOURCE_UUID + ", " + SOURCE_DEVICE + ", " + DESTINATION + ", " + DESTINATION_DEVICE + ", " + MESSAGE + ", " + CONTENT + ") " +
"VALUES (:guid, :type, :relay, :timestamp, :server_timestamp, :source, :source_uuid, :source_device, :destination, :destination_device, :message, :content)");
for (final Envelope message : messages) {
if (message.getServerGuid() == null) {
insertNullGuidMeter.mark();
}
batch.bind("guid", UUID.fromString(message.getServerGuid()))
.bind("destination", destination)
.bind("destination_device", destinationDevice)
.bind("type", message.getType().getNumber())
.bind("relay", message.getRelay())
.bind("timestamp", message.getTimestamp())
.bind("server_timestamp", message.getServerTimestamp())
.bind("source", message.hasSource() ? message.getSource() : null)
.bind("source_uuid", message.hasSourceUuid() ? UUID.fromString(message.getSourceUuid()) : null)
.bind("source_device", message.hasSourceDevice() ? message.getSourceDevice() : null)
.bind("message", message.hasLegacyMessage() ? message.getLegacyMessage().toByteArray() : null)
.bind("content", message.hasContent() ? message.getContent().toByteArray() : null)
.add();
}
batch.execute();
storeSizeHistogram.update(messages.size());
}
}));
}
public List<OutgoingMessageEntity> load(String destination, long destinationDevice) {
return database.with(jdbi-> jdbi.withHandle(handle -> {
try (Timer.Context ignored = loadTimer.time()) {
return handle.createQuery("SELECT * FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device ORDER BY " + TIMESTAMP + " ASC LIMIT " + RESULT_SET_CHUNK_SIZE)
.bind("destination", destination)
.bind("destination_device", destinationDevice)
.mapTo(OutgoingMessageEntity.class)
.list();
}
}));
}
public Optional<OutgoingMessageEntity> remove(String destination, long destinationDevice, String source, long timestamp) {
return database.with(jdbi -> jdbi.withHandle(handle -> {
try (Timer.Context ignored = removeBySourceTimer.time()) {
return handle.createQuery("DELETE FROM messages WHERE " + ID + " IN (SELECT " + ID + " FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device AND " + SOURCE + " = :source AND " + TIMESTAMP + " = :timestamp ORDER BY " + ID + " LIMIT 1) RETURNING *")
.bind("destination", destination)
.bind("destination_device", destinationDevice)
.bind("source", source)
.bind("timestamp", timestamp)
.mapTo(OutgoingMessageEntity.class)
.findFirst();
}
}));
}
public Optional<OutgoingMessageEntity> remove(String destination, UUID guid) {
return database.with(jdbi -> jdbi.withHandle(handle -> {
try (Timer.Context ignored = removeByGuidTimer.time()) {
return handle.createQuery("DELETE FROM messages WHERE " + ID + " IN (SELECT " + ID + " FROM MESSAGES WHERE " + GUID + " = :guid AND " + DESTINATION + " = :destination ORDER BY " + ID + " LIMIT 1) RETURNING *")
.bind("destination", destination)
.bind("guid", guid)
.mapTo(OutgoingMessageEntity.class)
.findFirst();
}
}));
}
public void remove(String destination, long id) {
database.use(jdbi -> jdbi.useHandle(handle -> {
try (Timer.Context ignored = removeByIdTimer.time()) {
handle.createUpdate("DELETE FROM messages WHERE " + ID + " = :id AND " + DESTINATION + " = :destination")
.bind("destination", destination)
.bind("id", id)
.execute();
}
}));
}
public void clear(String destination) {
database.use(jdbi ->jdbi.useHandle(handle -> {
try (Timer.Context ignored = clearTimer.time()) {
handle.createUpdate("DELETE FROM messages WHERE " + DESTINATION + " = :destination")
.bind("destination", destination)
.execute();
}
}));
}
public void clear(String destination, long destinationDevice) {
database.use(jdbi -> jdbi.useHandle(handle -> {
try (Timer.Context ignored = clearDeviceTimer.time()) {
handle.createUpdate("DELETE FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device")
.bind("destination", destination)
.bind("destination_device", destinationDevice)
.execute();
}
}));
}
public void vacuum() {
database.use(jdbi -> jdbi.useHandle(handle -> {
try (Timer.Context ignored = vacuumTimer.time()) {
handle.execute("VACUUM messages");
}
}));
}
}

View File

@@ -2,10 +2,8 @@
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.Meter;
@@ -19,14 +17,13 @@ import java.util.stream.Collectors;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
import org.whispersystems.textsecuregcm.redis.RedisOperation;
import org.whispersystems.textsecuregcm.util.Constants;
public class MessagesManager {
private static final String DISABLE_RDS_EXPERIMENT = "messages_disable_rds";
private static final int RESULT_SET_CHUNK_SIZE = 100;
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private static final Meter cacheHitByNameMeter = metricRegistry.meter(name(MessagesManager.class, "cacheHitByName" ));
@@ -34,18 +31,17 @@ public class MessagesManager {
private static final Meter cacheHitByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheHitByGuid" ));
private static final Meter cacheMissByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheMissByGuid"));
private final Messages messages;
private final MessagesDynamoDb messagesDynamoDb;
private final MessagesCache messagesCache;
private final PushLatencyManager pushLatencyManager;
private final ExperimentEnrollmentManager experimentEnrollmentManager;
public MessagesManager(Messages messages, MessagesDynamoDb messagesDynamoDb, MessagesCache messagesCache, PushLatencyManager pushLatencyManager, ExperimentEnrollmentManager experimentEnrollmentManager) {
this.messages = messages;
public MessagesManager(
MessagesDynamoDb messagesDynamoDb,
MessagesCache messagesCache,
PushLatencyManager pushLatencyManager) {
this.messagesDynamoDb = messagesDynamoDb;
this.messagesCache = messagesCache;
this.pushLatencyManager = pushLatencyManager;
this.experimentEnrollmentManager = experimentEnrollmentManager;
}
public void insert(UUID destinationUuid, long destinationDevice, Envelope message) {
@@ -64,55 +60,38 @@ public class MessagesManager {
return messagesCache.hasMessages(destinationUuid, destinationDevice);
}
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent, final boolean cachedMessagesOnly) {
public OutgoingMessageEntityList getMessagesForDevice(UUID destinationUuid, long destinationDevice, final String userAgent, final boolean cachedMessagesOnly) {
RedisOperation.unchecked(() -> pushLatencyManager.recordQueueRead(destinationUuid, destinationDevice, userAgent));
List<OutgoingMessageEntity> messageList = new ArrayList<>();
if (!cachedMessagesOnly && !experimentEnrollmentManager.isEnrolled(destinationUuid, DISABLE_RDS_EXPERIMENT)) {
messageList.addAll(messages.load(destination, destinationDevice));
if (!cachedMessagesOnly) {
messageList.addAll(messagesDynamoDb.load(destinationUuid, destinationDevice, RESULT_SET_CHUNK_SIZE));
}
if (messageList.size() < Messages.RESULT_SET_CHUNK_SIZE && !cachedMessagesOnly) {
messageList.addAll(messagesDynamoDb.load(destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messageList.size()));
if (messageList.size() < RESULT_SET_CHUNK_SIZE) {
messageList.addAll(messagesCache.get(destinationUuid, destinationDevice, RESULT_SET_CHUNK_SIZE - messageList.size()));
}
if (messageList.size() < Messages.RESULT_SET_CHUNK_SIZE) {
messageList.addAll(messagesCache.get(destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messageList.size()));
}
return new OutgoingMessageEntityList(messageList, messageList.size() >= Messages.RESULT_SET_CHUNK_SIZE);
return new OutgoingMessageEntityList(messageList, messageList.size() >= RESULT_SET_CHUNK_SIZE);
}
public void clear(String destination, UUID destinationUuid) {
// TODO Remove this null check in a fully-UUID-ified world
if (destinationUuid != null) {
messagesCache.clear(destinationUuid);
messagesDynamoDb.deleteAllMessagesForAccount(destinationUuid);
if (!experimentEnrollmentManager.isEnrolled(destinationUuid, DISABLE_RDS_EXPERIMENT)) {
messages.clear(destination);
}
} else {
messages.clear(destination);
}
public void clear(UUID destinationUuid) {
messagesCache.clear(destinationUuid);
messagesDynamoDb.deleteAllMessagesForAccount(destinationUuid);
}
public void clear(String destination, UUID destinationUuid, long deviceId) {
public void clear(UUID destinationUuid, long deviceId) {
messagesCache.clear(destinationUuid, deviceId);
messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, deviceId);
if (!experimentEnrollmentManager.isEnrolled(destinationUuid, DISABLE_RDS_EXPERIMENT)) {
messages.clear(destination, deviceId);
}
}
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long destinationDevice, String source, long timestamp) {
Optional<OutgoingMessageEntity> removed = messagesCache.remove(destinationUuid, destinationDevice, source, timestamp);
public Optional<OutgoingMessageEntity> delete(
UUID destinationUuid, long destinationDeviceId, String source, long timestamp) {
Optional<OutgoingMessageEntity> removed = messagesCache.remove(destinationUuid, destinationDeviceId, source, timestamp);
if (removed.isEmpty()) {
removed = messagesDynamoDb.deleteMessageByDestinationAndSourceAndTimestamp(destinationUuid, destinationDevice, source, timestamp);
if (removed.isEmpty() && !experimentEnrollmentManager.isEnrolled(destinationUuid, DISABLE_RDS_EXPERIMENT)) {
removed = messages.remove(destination, destinationDevice, source, timestamp);
}
removed = messagesDynamoDb.deleteMessageByDestinationAndSourceAndTimestamp(destinationUuid, destinationDeviceId, source, timestamp);
cacheMissByNameMeter.mark();
} else {
cacheHitByNameMeter.mark();
@@ -121,14 +100,11 @@ public class MessagesManager {
return removed;
}
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long deviceId, UUID guid) {
Optional<OutgoingMessageEntity> removed = messagesCache.remove(destinationUuid, deviceId, guid);
public Optional<OutgoingMessageEntity> delete(UUID destinationUuid, long destinationDeviceId, UUID guid) {
Optional<OutgoingMessageEntity> removed = messagesCache.remove(destinationUuid, destinationDeviceId, guid);
if (removed.isEmpty()) {
removed = messagesDynamoDb.deleteMessageByDestinationAndGuid(destinationUuid, deviceId, guid);
if (removed.isEmpty() && !experimentEnrollmentManager.isEnrolled(destinationUuid, DISABLE_RDS_EXPERIMENT)) {
removed = messages.remove(destination, guid);
}
removed = messagesDynamoDb.deleteMessageByDestinationAndGuid(destinationUuid, destinationDeviceId, guid);
cacheMissByGuidMeter.mark();
} else {
cacheHitByGuidMeter.mark();
@@ -137,18 +113,19 @@ public class MessagesManager {
return removed;
}
@Deprecated
public void delete(String destination, long id) {
messages.remove(destination, id);
}
public void persistMessages(final String destination, final UUID destinationUuid, final long destinationDeviceId, final List<Envelope> messages) {
public void persistMessages(
final UUID destinationUuid,
final long destinationDeviceId,
final List<Envelope> messages) {
messagesDynamoDb.store(messages, destinationUuid, destinationDeviceId);
messagesCache.remove(destinationUuid, destinationDeviceId, messages.stream().map(message -> UUID.fromString(message.getServerGuid())).collect(Collectors.toList()));
}
public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) {
messagesCache.addMessageAvailabilityListener(destinationUuid, deviceId, listener);
public void addMessageAvailabilityListener(
final UUID destinationUuid,
final long destinationDeviceId,
final MessageAvailabilityListener listener) {
messagesCache.addMessageAvailabilityListener(destinationUuid, destinationDeviceId, listener);
}
public void removeMessageAvailabilityListener(final MessageAvailabilityListener listener) {

View File

@@ -1,44 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage.mappers;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.storage.Messages;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.UUID;
public class OutgoingMessageEntityRowMapper implements RowMapper<OutgoingMessageEntity> {
@Override
public OutgoingMessageEntity map(ResultSet resultSet, StatementContext ctx) throws SQLException {
int type = resultSet.getInt(Messages.TYPE);
byte[] legacyMessage = resultSet.getBytes(Messages.MESSAGE);
String guid = resultSet.getString(Messages.GUID);
String sourceUuid = resultSet.getString(Messages.SOURCE_UUID);
if (type == Envelope.Type.RECEIPT_VALUE && legacyMessage == null) {
/// XXX - REMOVE AFTER 10/01/15
legacyMessage = new byte[0];
}
return new OutgoingMessageEntity(resultSet.getLong(Messages.ID),
false,
guid == null ? null : UUID.fromString(guid),
type,
resultSet.getString(Messages.RELAY),
resultSet.getLong(Messages.TIMESTAMP),
resultSet.getString(Messages.SOURCE),
sourceUuid == null ? null : UUID.fromString(sourceUuid),
resultSet.getInt(Messages.SOURCE_DEVICE),
legacyMessage,
resultSet.getBytes(Messages.CONTENT),
resultSet.getLong(Messages.SERVER_TIMESTAMP));
}
}

View File

@@ -5,6 +5,9 @@
package org.whispersystems.textsecuregcm.websocket;
import static com.codahale.metrics.MetricRegistry.name;
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
@@ -13,6 +16,19 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import javax.ws.rs.WebApplicationException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,23 +52,6 @@ import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil;
import org.whispersystems.websocket.WebSocketClient;
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
import javax.ws.rs.WebApplicationException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import static com.codahale.metrics.MetricRegistry.name;
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class WebSocketConnection implements MessageAvailabilityListener, DisplacedPresenceListener {
@@ -144,7 +143,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
if (throwable == null) {
if (isSuccessResponse(response)) {
if (storedMessageInfo.isPresent()) {
messagesManager.delete(account.getNumber(), account.getUuid(), device.getId(), storedMessageInfo.get().getGuid());
messagesManager.delete(account.getUuid(), device.getId(), storedMessageInfo.get().getGuid());
}
if (message.getType() != Envelope.Type.RECEIPT) {
@@ -218,7 +217,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
}
private void sendNextMessagePage(final boolean cachedMessagesOnly, final CompletableFuture<Void> queueClearedFuture) {
final OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), client.getUserAgent(), cachedMessagesOnly);
final OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getUuid(), device.getId(), client.getUserAgent(), cachedMessagesOnly);
final CompletableFuture<?>[] sendFutures = new CompletableFuture[messages.getMessages().size()];
for (int i = 0; i < messages.getMessages().size(); i++) {
@@ -250,12 +249,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
final Envelope envelope = builder.build();
if (message.getGuid() == null || (envelope.getSerializedSize() > MAX_DESKTOP_MESSAGE_SIZE && isDesktopClient)) {
if (message.getGuid() == null) {
messagesManager.delete(account.getNumber(), message.getId()); // TODO(ehren): Remove once the message DB is gone.
} else {
messagesManager.delete(account.getNumber(), account.getUuid(), device.getId(), message.getGuid());
}
if (envelope.getSerializedSize() > MAX_DESKTOP_MESSAGE_SIZE && isDesktopClient) {
messagesManager.delete(account.getUuid(), device.getId(), message.getGuid());
discardedMessagesMeter.mark();
sendFutures[i] = CompletableFuture.completedFuture(null);

View File

@@ -5,6 +5,8 @@
package org.whispersystems.textsecuregcm.workers;
import static com.codahale.metrics.MetricRegistry.name;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
@@ -15,13 +17,14 @@ import io.dropwizard.cli.EnvironmentCommand;
import io.dropwizard.jdbi3.JdbiFactory;
import io.dropwizard.setup.Environment;
import io.lettuce.core.resource.ClientResources;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.jdbi.v3.core.Jdbi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
@@ -31,7 +34,6 @@ import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
import org.whispersystems.textsecuregcm.storage.KeysDynamoDb;
import org.whispersystems.textsecuregcm.storage.Messages;
import org.whispersystems.textsecuregcm.storage.MessagesCache;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
@@ -41,11 +43,6 @@ import org.whispersystems.textsecuregcm.storage.ReservedUsernames;
import org.whispersystems.textsecuregcm.storage.Usernames;
import org.whispersystems.textsecuregcm.storage.UsernamesManager;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import static com.codahale.metrics.MetricRegistry.name;
public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfiguration> {
private final Logger logger = LoggerFactory.getLogger(DeleteUserCommand.class);
@@ -83,9 +80,7 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
JdbiFactory jdbiFactory = new JdbiFactory();
Jdbi accountJdbi = jdbiFactory.build(environment, configuration.getAccountsDatabaseConfiguration(), "accountdb");
Jdbi messageJdbi = jdbiFactory.build(environment, configuration.getMessageStoreConfiguration(), "messagedb" );
FaultTolerantDatabase accountDatabase = new FaultTolerantDatabase("account_database_delete_user", accountJdbi, configuration.getAccountsDatabaseConfiguration().getCircuitBreakerConfiguration());
FaultTolerantDatabase messageDatabase = new FaultTolerantDatabase("message_database", messageJdbi, configuration.getMessageStoreConfiguration().getCircuitBreakerConfiguration());
ClientResources redisClusterClientResources = ClientResources.builder().build();
AmazonDynamoDBClientBuilder clientBuilder = AmazonDynamoDBClientBuilder
@@ -116,7 +111,6 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
Profiles profiles = new Profiles(accountDatabase);
ReservedUsernames reservedUsernames = new ReservedUsernames(accountDatabase);
KeysDynamoDb keysDynamoDb = new KeysDynamoDb(preKeysDynamoDb, configuration.getKeysDynamoDbConfiguration().getTableName());
Messages messages = new Messages(messageDatabase);
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messageDynamoDb, configuration.getMessageDynamoDbConfiguration().getTableName(), configuration.getMessageDynamoDbConfiguration().getTimeToLive());
FaultTolerantRedisCluster messageInsertCacheCluster = new FaultTolerantRedisCluster("message_insert_cluster", configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster messageReadDeleteCluster = new FaultTolerantRedisCluster("message_read_delete_cluster", configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
@@ -126,7 +120,7 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
DirectoryQueue directoryQueue = new DirectoryQueue (configuration.getDirectoryConfiguration().getSqsConfiguration());
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
MessagesManager messagesManager = new MessagesManager(messages, messagesDynamoDb, messagesCache, pushLatencyManager, new ExperimentEnrollmentManager(dynamicConfigurationManager));
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager);
AccountsManager accountsManager = new AccountsManager(accounts, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager);
for (String user: users) {

View File

@@ -14,7 +14,6 @@ import org.whispersystems.textsecuregcm.configuration.DatabaseConfiguration;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
import org.whispersystems.textsecuregcm.storage.FeatureFlags;
import org.whispersystems.textsecuregcm.storage.Messages;
import org.whispersystems.textsecuregcm.storage.PendingAccounts;
import io.dropwizard.cli.ConfiguredCommand;
@@ -36,17 +35,11 @@ public class VacuumCommand extends ConfiguredCommand<WhisperServerConfiguration>
throws Exception
{
DatabaseConfiguration accountDbConfig = config.getAbuseDatabaseConfiguration();
DatabaseConfiguration messageDbConfig = config.getMessageStoreConfiguration();
Jdbi accountJdbi = Jdbi.create(accountDbConfig.getUrl(), accountDbConfig.getUser(), accountDbConfig.getPassword());
Jdbi messageJdbi = Jdbi.create(messageDbConfig.getUrl(), messageDbConfig.getUser(), messageDbConfig.getPassword());
FaultTolerantDatabase accountDatabase = new FaultTolerantDatabase("account_database_vacuum", accountJdbi, accountDbConfig.getCircuitBreakerConfiguration());
FaultTolerantDatabase messageDatabase = new FaultTolerantDatabase("message_database_vacuum", messageJdbi, messageDbConfig.getCircuitBreakerConfiguration());
Accounts accounts = new Accounts(accountDatabase);
PendingAccounts pendingAccounts = new PendingAccounts(accountDatabase);
Messages messages = new Messages(messageDatabase);
FeatureFlags featureFlags = new FeatureFlags(accountDatabase);
logger.info("Vacuuming accounts...");
@@ -55,9 +48,6 @@ public class VacuumCommand extends ConfiguredCommand<WhisperServerConfiguration>
logger.info("Vacuuming pending_accounts...");
pendingAccounts.vacuum();
logger.info("Vacuuming messages...");
messages.vacuum();
logger.info("Vacuuming feature flags...");
featureFlags.vacuum();