Remove Subscriptions.PCI attribute

This commit is contained in:
Chris Eager
2023-01-03 15:26:19 -06:00
committed by Chris Eager
parent 010f88a2ad
commit b02b00818b
4 changed files with 38 additions and 156 deletions

View File

@@ -310,7 +310,7 @@ public class SubscriptionController {
.map(ignored -> CompletableFuture.completedFuture(record))
.orElseGet(() -> subscriptionProcessorManager.createCustomer(requestData.subscriberUser)
.thenApply(ProcessorCustomer::customerId)
.thenCompose(customerId -> subscriptionManager.updateProcessorAndCustomerId(record,
.thenCompose(customerId -> subscriptionManager.setProcessorAndCustomerId(record,
new ProcessorCustomer(customerId, subscriptionProcessorManager.getProcessor()),
Instant.now())));

View File

@@ -15,8 +15,6 @@ import com.google.common.base.Throwables;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -24,6 +22,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.core.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.subscriptions.ProcessorCustomer;
@@ -48,6 +48,7 @@ public class SubscriptionManager {
public static final String KEY_PASSWORD = "P"; // B
public static final String KEY_PROCESSOR_ID_CUSTOMER_ID = "PC"; // B (GSI Hash Key of `pc_to_u` index)
public static final String KEY_CREATED_AT = "R"; // N
@Deprecated
public static final String KEY_PROCESSOR_CUSTOMER_IDS_MAP = "PCI"; // M
public static final String KEY_SUBSCRIPTION_ID = "S"; // S
public static final String KEY_SUBSCRIPTION_CREATED_AT = "T"; // N
@@ -57,7 +58,7 @@ public class SubscriptionManager {
public static final String KEY_CANCELED_AT = "B"; // N
public static final String KEY_CURRENT_PERIOD_ENDS_AT = "D"; // N
public static final String INDEX_NAME = "pc_to_u"; // Hash Key "C"
public static final String INDEX_NAME = "pc_to_u"; // Hash Key "PC"
public static class Record {
@@ -67,7 +68,6 @@ public class SubscriptionManager {
@VisibleForTesting
@Nullable
ProcessorCustomer processorCustomer;
public Map<SubscriptionProcessor, String> processorsToCustomerIds;
public String subscriptionId;
public Instant subscriptionCreatedAt;
public Long subscriptionLevel;
@@ -92,7 +92,6 @@ public class SubscriptionManager {
if (processorCustomerId != null) {
record.processorCustomer = new ProcessorCustomer(processorCustomerId.second(), processorCustomerId.first());
}
record.processorsToCustomerIds = getProcessorsToCustomerIds(item);
record.subscriptionId = getString(item, KEY_SUBSCRIPTION_ID);
record.subscriptionCreatedAt = getInstant(item, KEY_SUBSCRIPTION_CREATED_AT);
record.subscriptionLevel = getLong(item, KEY_SUBSCRIPTION_LEVEL);
@@ -107,18 +106,6 @@ public class SubscriptionManager {
return Optional.ofNullable(processorCustomer);
}
private static Map<SubscriptionProcessor, String> getProcessorsToCustomerIds(Map<String, AttributeValue> item) {
final AttributeValue attributeValue = item.get(KEY_PROCESSOR_CUSTOMER_IDS_MAP);
final Map<String, AttributeValue> attribute =
attributeValue == null ? Collections.emptyMap() : attributeValue.m();
final Map<SubscriptionProcessor, String> processorsToCustomerIds = new HashMap<>();
attribute.forEach((processorName, customerId) ->
processorsToCustomerIds.put(SubscriptionProcessor.valueOf(processorName), customerId.s()));
return processorsToCustomerIds;
}
/**
* Extracts the active processor and customer from a single attribute value in the given item.
* <p>
@@ -308,37 +295,28 @@ public class SubscriptionManager {
}
/**
* Updates the active processor and customer ID for the given user record.
* Sets the processor and customer ID for the given user record.
*
* @return the updated user record.
* @return the user record.
*/
public CompletableFuture<Record> updateProcessorAndCustomerId(Record userRecord,
public CompletableFuture<Record> setProcessorAndCustomerId(Record userRecord,
ProcessorCustomer activeProcessorCustomer, Instant updatedAt) {
UpdateItemRequest request = UpdateItemRequest.builder()
.tableName(table)
.key(Map.of(KEY_USER, b(userRecord.user)))
.returnValues(ReturnValue.ALL_NEW)
.conditionExpression(
// there is no active processor+customer attribute
"attribute_not_exists(#processor_customer_id) " +
// or an attribute in the map with an inactive processor+customer
"AND attribute_not_exists(#processors_to_customer_ids.#processor_name)"
)
.conditionExpression("attribute_not_exists(#processor_customer_id)")
.updateExpression("SET "
+ "#processor_customer_id = :processor_customer_id, "
+ "#processors_to_customer_ids.#processor_name = :customer_id, "
+ "#accessed_at = :accessed_at"
)
.expressionAttributeNames(Map.of(
"#accessed_at", KEY_ACCESSED_AT,
"#processor_customer_id", KEY_PROCESSOR_ID_CUSTOMER_ID,
"#processor_name", activeProcessorCustomer.processor().name(),
"#processors_to_customer_ids", KEY_PROCESSOR_CUSTOMER_IDS_MAP
"#processor_customer_id", KEY_PROCESSOR_ID_CUSTOMER_ID
))
.expressionAttributeValues(Map.of(
":accessed_at", n(updatedAt.getEpochSecond()),
":customer_id", s(activeProcessorCustomer.customerId()),
":processor_customer_id", b(activeProcessorCustomer.toDynamoBytes())
)).build();
@@ -346,8 +324,7 @@ public class SubscriptionManager {
.thenApply(updateItemResponse -> Record.from(userRecord.user, updateItemResponse.attributes()))
.exceptionallyCompose(throwable -> {
if (Throwables.getRootCause(throwable) instanceof ConditionalCheckFailedException) {
return getUser(userRecord.user).thenApply(getItemResponse ->
Record.from(userRecord.user, getItemResponse.item()));
throw new ClientErrorException(Response.Status.CONFLICT);
}
Throwables.throwIfUnchecked(throwable);
throw new CompletionException(throwable);