mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-20 09:28:03 +01:00
Retire Postgres-backed pending account/device tables.
This commit is contained in:
committed by
Jon Chambers
parent
530b2a310f
commit
d128bc782a
@@ -166,10 +166,6 @@ import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||
import org.whispersystems.textsecuregcm.storage.MigrationDeletedAccounts;
|
||||
import org.whispersystems.textsecuregcm.storage.MigrationRetryAccounts;
|
||||
import org.whispersystems.textsecuregcm.storage.MigrationRetryAccountsTableCrawler;
|
||||
import org.whispersystems.textsecuregcm.storage.PendingAccounts;
|
||||
import org.whispersystems.textsecuregcm.storage.PendingAccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.PendingDevices;
|
||||
import org.whispersystems.textsecuregcm.storage.PendingDevicesManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Profiles;
|
||||
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
|
||||
import org.whispersystems.textsecuregcm.storage.PubSubManager;
|
||||
@@ -181,9 +177,10 @@ import org.whispersystems.textsecuregcm.storage.RemoteConfigsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.ReportMessageDynamoDb;
|
||||
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
|
||||
import org.whispersystems.textsecuregcm.storage.ReservedUsernames;
|
||||
import org.whispersystems.textsecuregcm.storage.StoredVerificationCodeManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Usernames;
|
||||
import org.whispersystems.textsecuregcm.storage.UsernamesManager;
|
||||
import org.whispersystems.textsecuregcm.storage.VerificationCodeStoreDynamoDb;
|
||||
import org.whispersystems.textsecuregcm.storage.VerificationCodeStore;
|
||||
import org.whispersystems.textsecuregcm.util.AsnManager;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig;
|
||||
@@ -356,8 +353,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
|
||||
Accounts accounts = new Accounts(accountDatabase);
|
||||
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, accountsDynamoDbAsyncClient, accountsDynamoDbMigrationThreadPool, config.getAccountsDynamoDbConfiguration().getTableName(), config.getAccountsDynamoDbConfiguration().getPhoneNumberTableName(), migrationDeletedAccounts, migrationRetryAccounts);
|
||||
PendingAccounts pendingAccounts = new PendingAccounts(accountDatabase);
|
||||
PendingDevices pendingDevices = new PendingDevices (accountDatabase);
|
||||
Usernames usernames = new Usernames(accountDatabase);
|
||||
ReservedUsernames reservedUsernames = new ReservedUsernames(accountDatabase);
|
||||
Profiles profiles = new Profiles(accountDatabase);
|
||||
@@ -367,8 +362,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
RemoteConfigs remoteConfigs = new RemoteConfigs(accountDatabase);
|
||||
PushChallengeDynamoDb pushChallengeDynamoDb = new PushChallengeDynamoDb(pushChallengeDynamoDbClient, config.getPushChallengeDynamoDbConfiguration().getTableName());
|
||||
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(reportMessageDynamoDbClient, config.getReportMessageDynamoDbConfiguration().getTableName());
|
||||
VerificationCodeStoreDynamoDb pendingAccountsDynamoDb = new VerificationCodeStoreDynamoDb(pendingAccountsDynamoDbClient, config.getPendingAccountsDynamoDbConfiguration().getTableName());
|
||||
VerificationCodeStoreDynamoDb pendingDevicesDynamoDb = new VerificationCodeStoreDynamoDb(pendingDevicesDynamoDbClient, config.getPendingDevicesDynamoDbConfiguration().getTableName());
|
||||
VerificationCodeStore pendingAccounts = new VerificationCodeStore(pendingAccountsDynamoDbClient, config.getPendingAccountsDynamoDbConfiguration().getTableName());
|
||||
VerificationCodeStore pendingDevices = new VerificationCodeStore(pendingDevicesDynamoDbClient, config.getPendingDevicesDynamoDbConfiguration().getTableName());
|
||||
|
||||
RedisClientFactory pubSubClientFactory = new RedisClientFactory("pubsub_cache", config.getPubsubCacheConfiguration().getUrl(), config.getPubsubCacheConfiguration().getReplicaUrls(), config.getPubsubCacheConfiguration().getCircuitBreakerConfiguration());
|
||||
ReplicatedJedisPool pubsubClient = pubSubClientFactory.getRedisClientPool();
|
||||
@@ -425,8 +420,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
SecureStorageClient secureStorageClient = new SecureStorageClient(storageCredentialsGenerator, storageServiceExecutor, config.getSecureStorageServiceConfiguration());
|
||||
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster, recurringJobExecutor, keyspaceNotificationDispatchExecutor);
|
||||
DirectoryQueue directoryQueue = new DirectoryQueue(config.getDirectoryConfiguration().getSqsConfiguration());
|
||||
PendingAccountsManager pendingAccountsManager = new PendingAccountsManager(pendingAccounts, pendingAccountsDynamoDb, dynamicConfigurationManager);
|
||||
PendingDevicesManager pendingDevicesManager = new PendingDevicesManager(pendingDevices, pendingDevicesDynamoDb, dynamicConfigurationManager);
|
||||
StoredVerificationCodeManager pendingAccountsManager = new StoredVerificationCodeManager(pendingAccounts);
|
||||
StoredVerificationCodeManager pendingDevicesManager = new StoredVerificationCodeManager(pendingDevices);
|
||||
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
|
||||
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
|
||||
MessagesCache messagesCache = new MessagesCache(messagesCluster, messagesCluster, keyspaceNotificationDispatchExecutor);
|
||||
|
||||
@@ -51,14 +51,6 @@ public class DynamicConfiguration {
|
||||
@Valid
|
||||
private DynamicRateLimitChallengeConfiguration rateLimitChallenge = new DynamicRateLimitChallengeConfiguration();
|
||||
|
||||
@JsonProperty
|
||||
@Valid
|
||||
private DynamicVerificationCodeStoreMigrationConfiguration pendingAccountsMigration = new DynamicVerificationCodeStoreMigrationConfiguration();
|
||||
|
||||
@JsonProperty
|
||||
@Valid
|
||||
private DynamicVerificationCodeStoreMigrationConfiguration pendingDevicesMigration = new DynamicVerificationCodeStoreMigrationConfiguration();
|
||||
|
||||
public Optional<DynamicExperimentEnrollmentConfiguration> getExperimentEnrollmentConfiguration(
|
||||
final String experimentName) {
|
||||
return Optional.ofNullable(experiments.get(experimentName));
|
||||
@@ -109,12 +101,4 @@ public class DynamicConfiguration {
|
||||
public DynamicRateLimitChallengeConfiguration getRateLimitChallengeConfiguration() {
|
||||
return rateLimitChallenge;
|
||||
}
|
||||
|
||||
public DynamicVerificationCodeStoreMigrationConfiguration getPendingAccountsMigrationConfiguration() {
|
||||
return pendingAccountsMigration;
|
||||
}
|
||||
|
||||
public DynamicVerificationCodeStoreMigrationConfiguration getPendingDevicesMigrationConfiguration() {
|
||||
return pendingDevicesMigration;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,51 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2021 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.configuration.dynamic;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import javax.validation.constraints.NotNull;
|
||||
|
||||
public class DynamicVerificationCodeStoreMigrationConfiguration {
|
||||
|
||||
public enum WriteDestination {
|
||||
POSTGRES,
|
||||
DYNAMODB
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@NotNull
|
||||
private WriteDestination writeDestination = WriteDestination.POSTGRES;
|
||||
|
||||
@JsonProperty
|
||||
private boolean readPostgres = true;
|
||||
|
||||
@JsonProperty
|
||||
private boolean readDynamoDb = false;
|
||||
|
||||
public WriteDestination getWriteDestination() {
|
||||
return writeDestination;
|
||||
}
|
||||
|
||||
public void setWriteDestination(final WriteDestination writeDestination) {
|
||||
this.writeDestination = writeDestination;
|
||||
}
|
||||
|
||||
public boolean isReadPostgres() {
|
||||
return readPostgres;
|
||||
}
|
||||
|
||||
public void setReadPostgres(final boolean readPostgres) {
|
||||
this.readPostgres = readPostgres;
|
||||
}
|
||||
|
||||
public boolean isReadDynamoDb() {
|
||||
return readDynamoDb;
|
||||
}
|
||||
|
||||
public void setReadDynamoDb(final boolean readDynamoDb) {
|
||||
this.readDynamoDb = readDynamoDb;
|
||||
}
|
||||
}
|
||||
@@ -76,7 +76,7 @@ import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||
import org.whispersystems.textsecuregcm.storage.PendingAccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.StoredVerificationCodeManager;
|
||||
import org.whispersystems.textsecuregcm.storage.UsernamesManager;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
import org.whispersystems.textsecuregcm.util.ForwardedIpUtil;
|
||||
@@ -112,7 +112,7 @@ public class AccountController {
|
||||
|
||||
private static final String VERIFY_EXPERIMENT_TAG_NAME = "twilioVerify";
|
||||
|
||||
private final PendingAccountsManager pendingAccounts;
|
||||
private final StoredVerificationCodeManager pendingAccounts;
|
||||
private final AccountsManager accounts;
|
||||
private final UsernamesManager usernames;
|
||||
private final AbusiveHostRules abusiveHostRules;
|
||||
@@ -130,7 +130,7 @@ public class AccountController {
|
||||
|
||||
private final TwilioVerifyExperimentEnrollmentManager verifyExperimentEnrollmentManager;
|
||||
|
||||
public AccountController(PendingAccountsManager pendingAccounts,
|
||||
public AccountController(StoredVerificationCodeManager pendingAccounts,
|
||||
AccountsManager accounts,
|
||||
UsernamesManager usernames,
|
||||
AbusiveHostRules abusiveHostRules,
|
||||
|
||||
@@ -24,7 +24,7 @@ import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.storage.Device.DeviceCapabilities;
|
||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||
import org.whispersystems.textsecuregcm.storage.PendingDevicesManager;
|
||||
import org.whispersystems.textsecuregcm.storage.StoredVerificationCodeManager;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
import org.whispersystems.textsecuregcm.util.VerificationCode;
|
||||
import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException;
|
||||
@@ -55,14 +55,14 @@ public class DeviceController {
|
||||
|
||||
private static final int MAX_DEVICES = 6;
|
||||
|
||||
private final PendingDevicesManager pendingDevices;
|
||||
private final StoredVerificationCodeManager pendingDevices;
|
||||
private final AccountsManager accounts;
|
||||
private final MessagesManager messages;
|
||||
private final RateLimiters rateLimiters;
|
||||
private final Map<String, Integer> maxDeviceConfiguration;
|
||||
private final DirectoryQueue directoryQueue;
|
||||
|
||||
public DeviceController(PendingDevicesManager pendingDevices,
|
||||
public DeviceController(StoredVerificationCodeManager pendingDevices,
|
||||
AccountsManager accounts,
|
||||
MessagesManager messages,
|
||||
DirectoryQueue directoryQueue,
|
||||
|
||||
@@ -1,84 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2020 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import java.util.Optional;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
|
||||
import org.whispersystems.textsecuregcm.storage.mappers.StoredVerificationCodeRowMapper;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
||||
public class PendingAccounts implements VerificationCodeStore {
|
||||
|
||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private final Timer insertTimer = metricRegistry.timer(name(PendingAccounts.class, "insert" ));
|
||||
private final Timer getCodeForNumberTimer = metricRegistry.timer(name(PendingAccounts.class, "getCodeForNumber"));
|
||||
private final Timer removeTimer = metricRegistry.timer(name(PendingAccounts.class, "remove" ));
|
||||
private final Timer vacuumTimer = metricRegistry.timer(name(PendingAccounts.class, "vacuum" ));
|
||||
|
||||
private final FaultTolerantDatabase database;
|
||||
|
||||
public PendingAccounts(FaultTolerantDatabase database) {
|
||||
this.database = database;
|
||||
this.database.getDatabase().registerRowMapper(new StoredVerificationCodeRowMapper());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void insert(final String number, final StoredVerificationCode storedVerificationCode) {
|
||||
database.use(jdbi -> jdbi.useHandle(handle -> {
|
||||
try (Timer.Context ignored = insertTimer.time()) {
|
||||
handle.createUpdate("INSERT INTO pending_accounts (number, verification_code, timestamp, push_code, twilio_verification_sid) " +
|
||||
"VALUES (:number, :verification_code, :timestamp, :push_code, :twilio_verification_sid) " +
|
||||
"ON CONFLICT(number) DO UPDATE " +
|
||||
"SET verification_code = EXCLUDED.verification_code, timestamp = EXCLUDED.timestamp, push_code = EXCLUDED.push_code, twilio_verification_sid = EXCLUDED.twilio_verification_sid")
|
||||
.bind("verification_code", storedVerificationCode.getCode())
|
||||
.bind("timestamp", storedVerificationCode.getTimestamp())
|
||||
.bind("number", number)
|
||||
.bind("push_code", storedVerificationCode.getPushCode())
|
||||
.bind("twilio_verification_sid", storedVerificationCode.getTwilioVerificationSid().orElse(null))
|
||||
.execute();
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<StoredVerificationCode> findForNumber(String number) {
|
||||
return database.with(jdbi ->jdbi.withHandle(handle -> {
|
||||
try (Timer.Context ignored = getCodeForNumberTimer.time()) {
|
||||
return handle.createQuery("SELECT verification_code, timestamp, push_code, twilio_verification_sid FROM pending_accounts WHERE number = :number")
|
||||
.bind("number", number)
|
||||
.mapTo(StoredVerificationCode.class)
|
||||
.findFirst();
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(String number) {
|
||||
database.use(jdbi-> jdbi.useHandle(handle -> {
|
||||
try (Timer.Context ignored = removeTimer.time()) {
|
||||
handle.createUpdate("DELETE FROM pending_accounts WHERE number = :number")
|
||||
.bind("number", number)
|
||||
.execute();
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
public void vacuum() {
|
||||
database.use(jdbi -> jdbi.useHandle(handle -> {
|
||||
try (Timer.Context ignored = vacuumTimer.time()) {
|
||||
handle.execute("VACUUM pending_accounts");
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
@@ -1,55 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2020 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import java.util.Optional;
|
||||
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
|
||||
|
||||
public class PendingAccountsManager {
|
||||
|
||||
private final PendingAccounts pendingAccounts;
|
||||
private final VerificationCodeStoreDynamoDb pendingAccountsDynamoDb;
|
||||
private final DynamicConfigurationManager dynamicConfigurationManager;
|
||||
|
||||
public PendingAccountsManager(
|
||||
final PendingAccounts pendingAccounts,
|
||||
final VerificationCodeStoreDynamoDb pendingAccountsDynamoDb,
|
||||
final DynamicConfigurationManager dynamicConfigurationManager) {
|
||||
|
||||
this.pendingAccounts = pendingAccounts;
|
||||
this.pendingAccountsDynamoDb = pendingAccountsDynamoDb;
|
||||
this.dynamicConfigurationManager = dynamicConfigurationManager;
|
||||
}
|
||||
|
||||
public void store(String number, StoredVerificationCode code) {
|
||||
switch (dynamicConfigurationManager.getConfiguration().getPendingAccountsMigrationConfiguration().getWriteDestination()) {
|
||||
|
||||
case POSTGRES:
|
||||
pendingAccounts.insert(number, code);
|
||||
break;
|
||||
|
||||
case DYNAMODB:
|
||||
pendingAccountsDynamoDb.insert(number, code);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
public void remove(String number) {
|
||||
pendingAccounts.remove(number);
|
||||
pendingAccountsDynamoDb.remove(number);
|
||||
}
|
||||
|
||||
public Optional<StoredVerificationCode> getCodeForNumber(String number) {
|
||||
final Optional<StoredVerificationCode> maybeCodeFromPostgres =
|
||||
dynamicConfigurationManager.getConfiguration().getPendingAccountsMigrationConfiguration().isReadPostgres()
|
||||
? pendingAccounts.findForNumber(number)
|
||||
: Optional.empty();
|
||||
|
||||
return maybeCodeFromPostgres.or(
|
||||
() -> dynamicConfigurationManager.getConfiguration().getPendingAccountsMigrationConfiguration().isReadDynamoDb()
|
||||
? pendingAccountsDynamoDb.findForNumber(number)
|
||||
: Optional.empty());
|
||||
}
|
||||
}
|
||||
@@ -1,68 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2020 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import java.util.Optional;
|
||||
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
|
||||
import org.whispersystems.textsecuregcm.storage.mappers.StoredVerificationCodeRowMapper;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
||||
public class PendingDevices implements VerificationCodeStore {
|
||||
|
||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private final Timer insertTimer = metricRegistry.timer(name(PendingDevices.class, "insert" ));
|
||||
private final Timer getCodeForNumberTimer = metricRegistry.timer(name(PendingDevices.class, "getcodeForNumber"));
|
||||
private final Timer removeTimer = metricRegistry.timer(name(PendingDevices.class, "remove" ));
|
||||
|
||||
private final FaultTolerantDatabase database;
|
||||
|
||||
public PendingDevices(FaultTolerantDatabase database) {
|
||||
this.database = database;
|
||||
this.database.getDatabase().registerRowMapper(new StoredVerificationCodeRowMapper());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void insert(final String number, final StoredVerificationCode storedVerificationCode) {
|
||||
database.use(jdbi ->jdbi.useHandle(handle -> {
|
||||
try (Timer.Context timer = insertTimer.time()) {
|
||||
handle.createUpdate("WITH upsert AS (UPDATE pending_devices SET verification_code = :verification_code, timestamp = :timestamp WHERE number = :number RETURNING *) " +
|
||||
"INSERT INTO pending_devices (number, verification_code, timestamp) SELECT :number, :verification_code, :timestamp WHERE NOT EXISTS (SELECT * FROM upsert)")
|
||||
.bind("number", number)
|
||||
.bind("verification_code", storedVerificationCode.getCode())
|
||||
.bind("timestamp", storedVerificationCode.getTimestamp())
|
||||
.execute();
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<StoredVerificationCode> findForNumber(String number) {
|
||||
return database.with(jdbi -> jdbi.withHandle(handle -> {
|
||||
try (Timer.Context timer = getCodeForNumberTimer.time()) {
|
||||
return handle.createQuery("SELECT verification_code, timestamp, NULL as push_code, NULL as twilio_verification_sid FROM pending_devices WHERE number = :number")
|
||||
.bind("number", number)
|
||||
.mapTo(StoredVerificationCode.class)
|
||||
.findFirst();
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(String number) {
|
||||
database.use(jdbi -> jdbi.useHandle(handle -> {
|
||||
try (Timer.Context timer = removeTimer.time()) {
|
||||
handle.createUpdate("DELETE FROM pending_devices WHERE number = :number")
|
||||
.bind("number", number)
|
||||
.execute();
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,54 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2020 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import java.util.Optional;
|
||||
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
|
||||
|
||||
public class PendingDevicesManager {
|
||||
|
||||
private final PendingDevices pendingDevices;
|
||||
private final VerificationCodeStoreDynamoDb pendingDevicesDynamoDb;
|
||||
private final DynamicConfigurationManager dynamicConfigurationManager;
|
||||
|
||||
public PendingDevicesManager(
|
||||
final PendingDevices pendingDevices,
|
||||
final VerificationCodeStoreDynamoDb pendingDevicesDynamoDb,
|
||||
final DynamicConfigurationManager dynamicConfigurationManager) {
|
||||
|
||||
this.pendingDevices = pendingDevices;
|
||||
this.pendingDevicesDynamoDb = pendingDevicesDynamoDb;
|
||||
this.dynamicConfigurationManager = dynamicConfigurationManager;
|
||||
}
|
||||
|
||||
public void store(String number, StoredVerificationCode code) {
|
||||
switch (dynamicConfigurationManager.getConfiguration().getPendingDevicesMigrationConfiguration().getWriteDestination()) {
|
||||
case POSTGRES:
|
||||
pendingDevices.insert(number, code);
|
||||
break;
|
||||
|
||||
case DYNAMODB:
|
||||
pendingDevicesDynamoDb.insert(number, code);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
public void remove(String number) {
|
||||
pendingDevices.remove(number);
|
||||
pendingDevicesDynamoDb.remove(number);
|
||||
}
|
||||
|
||||
public Optional<StoredVerificationCode> getCodeForNumber(String number) {
|
||||
final Optional<StoredVerificationCode> maybeCodeFromPostgres =
|
||||
dynamicConfigurationManager.getConfiguration().getPendingDevicesMigrationConfiguration().isReadPostgres()
|
||||
? pendingDevices.findForNumber(number)
|
||||
: Optional.empty();
|
||||
|
||||
return maybeCodeFromPostgres.or(
|
||||
() -> dynamicConfigurationManager.getConfiguration().getPendingDevicesMigrationConfiguration().isReadDynamoDb()
|
||||
? pendingDevicesDynamoDb.findForNumber(number)
|
||||
: Optional.empty());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
/*
|
||||
* Copyright 2013-2020 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import java.util.Optional;
|
||||
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
|
||||
|
||||
public class StoredVerificationCodeManager {
|
||||
|
||||
private final VerificationCodeStore verificationCodeStore;
|
||||
|
||||
public StoredVerificationCodeManager(final VerificationCodeStore verificationCodeStore) {
|
||||
this.verificationCodeStore = verificationCodeStore;
|
||||
}
|
||||
|
||||
public void store(String number, StoredVerificationCode code) {
|
||||
verificationCodeStore.insert(number, code);
|
||||
}
|
||||
|
||||
public void remove(String number) {
|
||||
verificationCodeStore.remove(number);
|
||||
}
|
||||
|
||||
public Optional<StoredVerificationCode> getCodeForNumber(String number) {
|
||||
return verificationCodeStore.findForNumber(number);
|
||||
}
|
||||
}
|
||||
@@ -5,14 +5,99 @@
|
||||
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
|
||||
import org.whispersystems.textsecuregcm.util.AttributeValues;
|
||||
import org.whispersystems.textsecuregcm.util.SystemMapper;
|
||||
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
|
||||
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
|
||||
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
public interface VerificationCodeStore {
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
void insert(String number, StoredVerificationCode verificationCode);
|
||||
public class VerificationCodeStore {
|
||||
|
||||
Optional<StoredVerificationCode> findForNumber(String number);
|
||||
private final DynamoDbClient dynamoDbClient;
|
||||
private final String tableName;
|
||||
|
||||
void remove(String number);
|
||||
private final Timer insertTimer;
|
||||
private final Timer getTimer;
|
||||
private final Timer removeTimer;
|
||||
|
||||
@VisibleForTesting
|
||||
static final String KEY_E164 = "P";
|
||||
|
||||
private static final String ATTR_STORED_CODE = "C";
|
||||
private static final String ATTR_TTL = "E";
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(VerificationCodeStore.class);
|
||||
|
||||
public VerificationCodeStore(final DynamoDbClient dynamoDbClient, final String tableName) {
|
||||
this.dynamoDbClient = dynamoDbClient;
|
||||
this.tableName = tableName;
|
||||
|
||||
this.insertTimer = Metrics.timer(name(getClass(), "insert"), "table", tableName);
|
||||
this.getTimer = Metrics.timer(name(getClass(), "get"), "table", tableName);
|
||||
this.removeTimer = Metrics.timer(name(getClass(), "remove"), "table", tableName);
|
||||
}
|
||||
|
||||
public void insert(final String number, final StoredVerificationCode verificationCode) {
|
||||
insertTimer.record(() -> {
|
||||
try {
|
||||
dynamoDbClient.putItem(PutItemRequest.builder()
|
||||
.tableName(tableName)
|
||||
.item(Map.of(
|
||||
KEY_E164, AttributeValues.fromString(number),
|
||||
ATTR_STORED_CODE, AttributeValues.fromString(SystemMapper.getMapper().writeValueAsString(verificationCode)),
|
||||
ATTR_TTL, AttributeValues.fromLong(getExpirationTimestamp(verificationCode))))
|
||||
.build());
|
||||
} catch (final JsonProcessingException e) {
|
||||
// This should never happen when writing directly to a string except in cases of serious misconfiguration, which
|
||||
// would be caught by tests.
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private long getExpirationTimestamp(final StoredVerificationCode storedVerificationCode) {
|
||||
return Instant.ofEpochMilli(storedVerificationCode.getTimestamp()).plus(StoredVerificationCode.EXPIRATION).getEpochSecond();
|
||||
}
|
||||
|
||||
public Optional<StoredVerificationCode> findForNumber(final String number) {
|
||||
return getTimer.record(() -> {
|
||||
final GetItemResponse response = dynamoDbClient.getItem(GetItemRequest.builder()
|
||||
.tableName(tableName)
|
||||
.consistentRead(true)
|
||||
.key(Map.of(KEY_E164, AttributeValues.fromString(number)))
|
||||
.build());
|
||||
|
||||
try {
|
||||
return response.hasItem()
|
||||
? Optional.of(SystemMapper.getMapper().readValue(response.item().get(ATTR_STORED_CODE).s(), StoredVerificationCode.class))
|
||||
: Optional.empty();
|
||||
} catch (final JsonProcessingException e) {
|
||||
log.error("Failed to parse stored verification code", e);
|
||||
return Optional.empty();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void remove(final String number) {
|
||||
removeTimer.record(() -> {
|
||||
dynamoDbClient.deleteItem(DeleteItemRequest.builder()
|
||||
.tableName(tableName)
|
||||
.key(Map.of(KEY_E164, AttributeValues.fromString(number)))
|
||||
.build());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,106 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2021 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
|
||||
import org.whispersystems.textsecuregcm.util.AttributeValues;
|
||||
import org.whispersystems.textsecuregcm.util.SystemMapper;
|
||||
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
|
||||
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
|
||||
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
public class VerificationCodeStoreDynamoDb implements VerificationCodeStore {
|
||||
|
||||
private final DynamoDbClient dynamoDbClient;
|
||||
private final String tableName;
|
||||
|
||||
private final Timer insertTimer;
|
||||
private final Timer getTimer;
|
||||
private final Timer removeTimer;
|
||||
|
||||
@VisibleForTesting
|
||||
static final String KEY_E164 = "P";
|
||||
|
||||
private static final String ATTR_STORED_CODE = "C";
|
||||
private static final String ATTR_TTL = "E";
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(VerificationCodeStoreDynamoDb.class);
|
||||
|
||||
public VerificationCodeStoreDynamoDb(final DynamoDbClient dynamoDbClient, final String tableName) {
|
||||
this.dynamoDbClient = dynamoDbClient;
|
||||
this.tableName = tableName;
|
||||
|
||||
this.insertTimer = Metrics.timer(name(getClass(), "insert"), "table", tableName);
|
||||
this.getTimer = Metrics.timer(name(getClass(), "get"), "table", tableName);
|
||||
this.removeTimer = Metrics.timer(name(getClass(), "remove"), "table", tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void insert(final String number, final StoredVerificationCode verificationCode) {
|
||||
insertTimer.record(() -> {
|
||||
try {
|
||||
dynamoDbClient.putItem(PutItemRequest.builder()
|
||||
.tableName(tableName)
|
||||
.item(Map.of(
|
||||
KEY_E164, AttributeValues.fromString(number),
|
||||
ATTR_STORED_CODE, AttributeValues.fromString(SystemMapper.getMapper().writeValueAsString(verificationCode)),
|
||||
ATTR_TTL, AttributeValues.fromLong(getExpirationTimestamp(verificationCode))))
|
||||
.build());
|
||||
} catch (final JsonProcessingException e) {
|
||||
// This should never happen when writing directly to a string except in cases of serious misconfiguration, which
|
||||
// would be caught by tests.
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private long getExpirationTimestamp(final StoredVerificationCode storedVerificationCode) {
|
||||
return Instant.ofEpochMilli(storedVerificationCode.getTimestamp()).plus(StoredVerificationCode.EXPIRATION).getEpochSecond();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<StoredVerificationCode> findForNumber(final String number) {
|
||||
return getTimer.record(() -> {
|
||||
final GetItemResponse response = dynamoDbClient.getItem(GetItemRequest.builder()
|
||||
.tableName(tableName)
|
||||
.consistentRead(true)
|
||||
.key(Map.of(KEY_E164, AttributeValues.fromString(number)))
|
||||
.build());
|
||||
|
||||
try {
|
||||
return response.hasItem()
|
||||
? Optional.of(SystemMapper.getMapper().readValue(response.item().get(ATTR_STORED_CODE).s(), StoredVerificationCode.class))
|
||||
: Optional.empty();
|
||||
} catch (final JsonProcessingException e) {
|
||||
log.error("Failed to parse stored verification code", e);
|
||||
return Optional.empty();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(final String number) {
|
||||
removeTimer.record(() -> {
|
||||
dynamoDbClient.deleteItem(DeleteItemRequest.builder()
|
||||
.tableName(tableName)
|
||||
.key(Map.of(KEY_E164, AttributeValues.fromString(number)))
|
||||
.build());
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -1,24 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2020 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.storage.mappers;
|
||||
|
||||
import org.jdbi.v3.core.mapper.RowMapper;
|
||||
import org.jdbi.v3.core.statement.StatementContext;
|
||||
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
|
||||
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
|
||||
public class StoredVerificationCodeRowMapper implements RowMapper<StoredVerificationCode> {
|
||||
|
||||
@Override
|
||||
public StoredVerificationCode map(ResultSet resultSet, StatementContext ctx) throws SQLException {
|
||||
return new StoredVerificationCode(resultSet.getString("verification_code"),
|
||||
resultSet.getLong("timestamp"),
|
||||
resultSet.getString("push_code"),
|
||||
resultSet.getString("twilio_verification_sid"));
|
||||
}
|
||||
}
|
||||
@@ -13,7 +13,6 @@ import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.DatabaseConfiguration;
|
||||
import org.whispersystems.textsecuregcm.storage.Accounts;
|
||||
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
|
||||
import org.whispersystems.textsecuregcm.storage.PendingAccounts;
|
||||
|
||||
import io.dropwizard.cli.ConfiguredCommand;
|
||||
import io.dropwizard.setup.Bootstrap;
|
||||
@@ -38,14 +37,10 @@ public class VacuumCommand extends ConfiguredCommand<WhisperServerConfiguration>
|
||||
FaultTolerantDatabase accountDatabase = new FaultTolerantDatabase("account_database_vacuum", accountJdbi, accountDbConfig.getCircuitBreakerConfiguration());
|
||||
|
||||
Accounts accounts = new Accounts(accountDatabase);
|
||||
PendingAccounts pendingAccounts = new PendingAccounts(accountDatabase);
|
||||
|
||||
logger.info("Vacuuming accounts...");
|
||||
accounts.vacuum();
|
||||
|
||||
logger.info("Vacuuming pending_accounts...");
|
||||
pendingAccounts.vacuum();
|
||||
|
||||
Thread.sleep(3000);
|
||||
System.exit(0);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user