mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-19 13:18:02 +01:00
Retire creation timestamp from device endpoints
This commit is contained in:
@@ -115,9 +115,6 @@ public class DeviceController {
|
||||
private static final String WAIT_FOR_TRANSFER_ARCHIVE_TIMER_NAME =
|
||||
MetricsUtil.name(DeviceController.class, "waitForTransferArchiveDuration");
|
||||
|
||||
private static final String RECORD_TRANSFER_ARCHIVE_UPLOADED_COUNTER_NAME = MetricsUtil.name(DeviceController.class, "recordTransferArchiveUploaded");
|
||||
private static final String HAS_REGISTRATION_ID_TAG_NAME = "hasRegistrationId";
|
||||
|
||||
@VisibleForTesting
|
||||
static final int MIN_TOKEN_IDENTIFIER_LENGTH = 32;
|
||||
|
||||
@@ -537,14 +534,7 @@ public class DeviceController {
|
||||
@ApiResponse(responseCode = "422", description = "The request object could not be parsed or was otherwise invalid")
|
||||
@ApiResponse(responseCode = "429", description = "Rate-limited; try again after the prescribed delay")
|
||||
public CompletionStage<Void> recordTransferArchiveUploaded(@Auth final AuthenticatedDevice authenticatedDevice,
|
||||
@NotNull @Valid final TransferArchiveUploadedRequest transferArchiveUploadedRequest,
|
||||
@HeaderParam(HttpHeaders.USER_AGENT) @Nullable String userAgent) {
|
||||
Metrics.counter(RECORD_TRANSFER_ARCHIVE_UPLOADED_COUNTER_NAME, Tags.of(
|
||||
UserAgentTagUtil.getPlatformTag(userAgent),
|
||||
io.micrometer.core.instrument.Tag.of(
|
||||
HAS_REGISTRATION_ID_TAG_NAME,
|
||||
String.valueOf(transferArchiveUploadedRequest.destinationDeviceRegistrationId().isPresent()))
|
||||
)).increment();
|
||||
@NotNull @Valid final TransferArchiveUploadedRequest transferArchiveUploadedRequest) {
|
||||
return rateLimiters.getUploadTransferArchiveLimiter()
|
||||
.validateAsync(authenticatedDevice.accountIdentifier())
|
||||
.thenCompose(ignored -> accounts.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier()))
|
||||
@@ -554,7 +544,6 @@ public class DeviceController {
|
||||
|
||||
return accounts.recordTransferArchiveUpload(account,
|
||||
transferArchiveUploadedRequest.destinationDeviceId(),
|
||||
transferArchiveUploadedRequest.destinationDeviceCreated().map(Instant::ofEpochMilli),
|
||||
transferArchiveUploadedRequest.destinationDeviceRegistrationId(),
|
||||
transferArchiveUploadedRequest.transferArchive());
|
||||
});
|
||||
|
||||
@@ -21,13 +21,6 @@ public record DeviceInfo(long id,
|
||||
|
||||
long lastSeen,
|
||||
|
||||
@Deprecated
|
||||
@Schema(description = """
|
||||
The time in milliseconds since epoch when the device was linked.
|
||||
Deprecated in favor of `createdAtCiphertext`.
|
||||
""", deprecated = true)
|
||||
long created,
|
||||
|
||||
@Schema(description = "The registration ID of the given device.")
|
||||
int registrationId,
|
||||
|
||||
@@ -40,7 +33,7 @@ public record DeviceInfo(long id,
|
||||
byte[] createdAtCiphertext) {
|
||||
|
||||
public static DeviceInfo forDevice(final Device device) {
|
||||
return new DeviceInfo(device.getId(), device.getName(), device.getLastSeen(), device.getCreated(), device.getRegistrationId(
|
||||
return new DeviceInfo(device.getId(), device.getName(), device.getLastSeen(), device.getRegistrationId(
|
||||
IdentityType.ACI), device.getCreatedAtCiphertext());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,15 +22,8 @@ public record TransferArchiveUploadedRequest(
|
||||
@Schema(description = "The ID of the device for which the transfer archive has been prepared")
|
||||
byte destinationDeviceId,
|
||||
|
||||
@Schema(description = """
|
||||
The timestamp, in milliseconds since the epoch, at which the destination device was created.
|
||||
Deprecated in favor of `destinationDeviceRegistrationId`.
|
||||
""", deprecated = true)
|
||||
@Deprecated
|
||||
Optional<@Positive Long> destinationDeviceCreated,
|
||||
|
||||
@Schema(description = "The registration ID of the destination device")
|
||||
Optional<@Min(0) @Max(Device.MAX_REGISTRATION_ID) Integer> destinationDeviceRegistrationId,
|
||||
@Min(0) @Max(Device.MAX_REGISTRATION_ID) int destinationDeviceRegistrationId,
|
||||
|
||||
@NotNull
|
||||
@Valid
|
||||
@@ -39,9 +32,4 @@ public record TransferArchiveUploadedRequest(
|
||||
the upload has failed and the destination device should stop waiting
|
||||
""", oneOf = {RemoteAttachment.class, RemoteAttachmentError.class})
|
||||
TransferArchiveResult transferArchive) {
|
||||
@AssertTrue
|
||||
@Schema(hidden = true)
|
||||
public boolean isExactlyOneDisambiguatorProvided() {
|
||||
return destinationDeviceCreated.isPresent() ^ destinationDeviceRegistrationId.isPresent();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,7 +37,6 @@ import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Base64;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -51,7 +50,6 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
@@ -115,7 +113,6 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||
private static final String DELETE_COUNTER_NAME = name(AccountsManager.class, "deleteCounter");
|
||||
private static final String COUNTRY_CODE_TAG_NAME = "country";
|
||||
private static final String DELETION_REASON_TAG_NAME = "reason";
|
||||
private static final String TIMESTAMP_BASED_TRANSFER_ARCHIVE_KEY_COUNTER_NAME = name(AccountsManager.class, "timestampRedisKeyCounter");
|
||||
private static final String REGISTRATION_ID_BASED_TRANSFER_ARCHIVE_KEY_COUNTER_NAME = name(AccountsManager.class,"registrationIdRedisKeyCounter");
|
||||
|
||||
private static final String RETRY_NAME = ResilienceUtil.name(AccountsManager.class);
|
||||
@@ -205,14 +202,8 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||
}
|
||||
}
|
||||
|
||||
private interface DeviceIdentifier {}
|
||||
|
||||
private record TimestampDeviceIdentifier(UUID accountIdentifier, byte deviceId, Instant deviceCreationTimestamp)
|
||||
implements DeviceIdentifier {
|
||||
}
|
||||
|
||||
private record RegistrationIdDeviceIdentifier(UUID accountIdentifier, byte deviceId,
|
||||
int registrationId) implements DeviceIdentifier {
|
||||
private record DeviceIdentifier(UUID accountIdentifier, byte deviceId,
|
||||
int registrationId) {
|
||||
}
|
||||
|
||||
public AccountsManager(final Accounts accounts,
|
||||
@@ -1469,21 +1460,14 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||
linkDeviceTokenIdentifier, getLinkedDeviceKey(linkDeviceTokenIdentifier), timeout, this::handleDeviceAdded);
|
||||
|
||||
return deviceAdded.thenCompose(maybeDeviceInfo -> maybeDeviceInfo.map(deviceInfo -> {
|
||||
// The device finished linking, we now want to make sure the client has fetched messages that could
|
||||
// have come in before the device's mailbox was set up.
|
||||
// The device finished linking, we now want to make sure the primary client has fetched messages that could
|
||||
// have come in before the linked device's mailbox was set up. This avoids a race where the linked device
|
||||
// misses out on messages that were sent before its mailbox was set up but received by the primary *after*
|
||||
// creating its backup for the linked device.
|
||||
|
||||
// A worst case estimate of the wall clock time at which the linked device was added to the account record
|
||||
Instant deviceLinked = Instant.ofEpochMilli(deviceInfo.created()).plus(MAX_SERVER_CLOCK_DRIFT);
|
||||
|
||||
Instant now = clock.instant();
|
||||
|
||||
// We know at `now` the device finished linking, so if we waited for all the messages before now it would be
|
||||
// sufficient. However, now might be much later that the device was linked, so we don't want to force
|
||||
// the client to wait for messages that are past our worst case estimate of when the device was linked
|
||||
Instant messageEpoch = Collections.min(List.of(deviceLinked, now));
|
||||
|
||||
// We assume that any message with a timestamp after the messageEpoch made it into the linked device's queues
|
||||
return waitForPreLinkMessagesToBeFetched(accountIdentifier, linkingDevice, deviceInfo, messageEpoch, deadline);
|
||||
// We know the device finished linking at the current time, so waiting for all messages
|
||||
// before now is sufficient.
|
||||
return waitForPreLinkMessagesToBeFetched(accountIdentifier, linkingDevice, deviceInfo, clock.instant(), deadline);
|
||||
})
|
||||
.orElseGet(() -> CompletableFuture.completedFuture(maybeDeviceInfo)));
|
||||
}
|
||||
@@ -1547,59 +1531,24 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||
}
|
||||
|
||||
public CompletableFuture<Optional<TransferArchiveResult>> waitForTransferArchive(final Account account, final Device device, final Duration timeout) {
|
||||
final DeviceIdentifier timestampDeviceIdentifier = new TimestampDeviceIdentifier(account.getIdentifier(IdentityType.ACI), device.getId(), Instant.ofEpochMilli(device.getCreated()));
|
||||
final String timestampTransferArchiveKey = getTimestampTransferArchiveKey(account.getIdentifier(IdentityType.ACI), device.getId(), Instant.ofEpochMilli(device.getCreated()));
|
||||
|
||||
final DeviceIdentifier registrationIdDeviceIdentifier = new RegistrationIdDeviceIdentifier(account.getIdentifier(IdentityType.ACI), device.getId(), device.getRegistrationId(IdentityType.ACI));
|
||||
final DeviceIdentifier deviceIdentifier = new DeviceIdentifier(account.getIdentifier(IdentityType.ACI), device.getId(), device.getRegistrationId(IdentityType.ACI));
|
||||
final String registrationIdTransferArchiveKey = getRegistrationIdTransferArchiveKey(account.getIdentifier(IdentityType.ACI), device.getId(), device.getRegistrationId(IdentityType.ACI));
|
||||
|
||||
final CompletableFuture<Optional<TransferArchiveResult>> timestampFuture = waitForPubSubKey(waitForTransferArchiveFuturesByDeviceIdentifier,
|
||||
timestampDeviceIdentifier,
|
||||
timestampTransferArchiveKey,
|
||||
timeout,
|
||||
this::handleTransferArchiveAdded);
|
||||
|
||||
final CompletableFuture<Optional<TransferArchiveResult>> registrationIdFuture = waitForPubSubKey(waitForTransferArchiveFuturesByDeviceIdentifier,
|
||||
registrationIdDeviceIdentifier,
|
||||
return waitForPubSubKey(waitForTransferArchiveFuturesByDeviceIdentifier,
|
||||
deviceIdentifier,
|
||||
registrationIdTransferArchiveKey,
|
||||
timeout,
|
||||
this::handleTransferArchiveAdded);
|
||||
return firstSuccessfulTransferArchiveFuture(List.of(timestampFuture, registrationIdFuture));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static CompletableFuture<Optional<TransferArchiveResult>> firstSuccessfulTransferArchiveFuture(
|
||||
final List<CompletableFuture<Optional<TransferArchiveResult>>> futures) {
|
||||
final CompletableFuture<Optional<TransferArchiveResult>> result = new CompletableFuture<>();
|
||||
final AtomicInteger remaining = new AtomicInteger(futures.size());
|
||||
|
||||
for (CompletableFuture<Optional<TransferArchiveResult>> future : futures) {
|
||||
future.whenComplete((value, _) -> {
|
||||
if (value.isPresent()) {
|
||||
result.complete(value);
|
||||
} else if (remaining.decrementAndGet() == 0) {
|
||||
result.complete(Optional.empty());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> recordTransferArchiveUpload(final Account account,
|
||||
final byte destinationDeviceId,
|
||||
@SuppressWarnings("OptionalUsedAsFieldOrParameterType") final Optional<Instant> destinationDeviceCreationTimestamp,
|
||||
@SuppressWarnings("OptionalUsedAsFieldOrParameterType") final Optional<Integer> maybeRegistrationId,
|
||||
final int registrationId,
|
||||
final TransferArchiveResult transferArchiveResult) {
|
||||
try {
|
||||
final String transferArchiveJson = SystemMapper.jsonMapper().writeValueAsString(transferArchiveResult);
|
||||
|
||||
final String key = destinationDeviceCreationTimestamp
|
||||
.map(timestamp -> getTimestampTransferArchiveKey(account.getIdentifier(IdentityType.ACI), destinationDeviceId, timestamp))
|
||||
.orElseGet(() -> maybeRegistrationId
|
||||
.map(registrationId -> getRegistrationIdTransferArchiveKey(account.getIdentifier(IdentityType.ACI), destinationDeviceId, registrationId))
|
||||
// We validate the request object so this should never happen
|
||||
.orElseThrow(() -> new AssertionError("No creation timestamp or registration ID provided")));
|
||||
final String key = getRegistrationIdTransferArchiveKey(account.getIdentifier(IdentityType.ACI), destinationDeviceId, registrationId);
|
||||
|
||||
return ResilienceUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
.executeCompletionStage(retryExecutor, () -> pubSubRedisClient.withConnection(connection -> connection.async()
|
||||
@@ -1622,16 +1571,6 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||
}
|
||||
}
|
||||
|
||||
private static String getTimestampTransferArchiveKey(final UUID accountIdentifier,
|
||||
final byte destinationDeviceId,
|
||||
final Instant destinationDeviceCreationTimestamp) {
|
||||
Metrics.counter(TIMESTAMP_BASED_TRANSFER_ARCHIVE_KEY_COUNTER_NAME).increment();
|
||||
|
||||
return TRANSFER_ARCHIVE_PREFIX + accountIdentifier.toString() +
|
||||
":" + destinationDeviceId +
|
||||
":" + destinationDeviceCreationTimestamp.toEpochMilli();
|
||||
}
|
||||
|
||||
private static String getRegistrationIdTransferArchiveKey(final UUID accountIdentifier,
|
||||
final byte destinationDeviceId,
|
||||
final int registrationId) {
|
||||
@@ -1734,8 +1673,9 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||
final String[] deviceIdentifierComponents =
|
||||
channel.substring(TRANSFER_ARCHIVE_KEYSPACE_PATTERN.length() - 1).split(":", 4);
|
||||
|
||||
if (deviceIdentifierComponents.length != 3 && deviceIdentifierComponents.length != 4) {
|
||||
logger.error("Could not parse device identifier; unexpected component count");
|
||||
if (deviceIdentifierComponents.length != 4) {
|
||||
logger.error("Could not parse device identifier; unexpected component count: {}",
|
||||
deviceIdentifierComponents.length);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1745,24 +1685,16 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||
final UUID accountIdentifier = UUID.fromString(deviceIdentifierComponents[0]);
|
||||
final byte deviceId = Byte.parseByte(deviceIdentifierComponents[1]);
|
||||
|
||||
if (deviceIdentifierComponents.length == 3) {
|
||||
// Parse the old transfer archive Redis key format
|
||||
final Instant deviceCreationTimestamp = Instant.ofEpochMilli(Long.parseLong(deviceIdentifierComponents[2]));
|
||||
|
||||
deviceIdentifier = new TimestampDeviceIdentifier(accountIdentifier, deviceId, deviceCreationTimestamp);
|
||||
transferArchiveKey = getTimestampTransferArchiveKey(accountIdentifier, deviceId, deviceCreationTimestamp);
|
||||
} else {
|
||||
final String maybeRegistrationIdPattern = deviceIdentifierComponents[2];
|
||||
if (!maybeRegistrationIdPattern.equals(TRANSFER_ARCHIVE_REGISTRATION_ID_PATTERN)) {
|
||||
throw new IllegalArgumentException("Could not parse Redis key with pattern " + maybeRegistrationIdPattern);
|
||||
}
|
||||
final int registrationId = Integer.parseInt(deviceIdentifierComponents[3]);
|
||||
if (!RegistrationIdValidator.validRegistrationId(registrationId)) {
|
||||
throw new IllegalArgumentException("Invalid registration ID: " + registrationId);
|
||||
}
|
||||
deviceIdentifier = new RegistrationIdDeviceIdentifier(accountIdentifier, deviceId, registrationId);
|
||||
transferArchiveKey = getRegistrationIdTransferArchiveKey(accountIdentifier, deviceId, registrationId);
|
||||
final String registrationIdPattern = deviceIdentifierComponents[2];
|
||||
if (!registrationIdPattern.equals(TRANSFER_ARCHIVE_REGISTRATION_ID_PATTERN)) {
|
||||
throw new IllegalArgumentException("Could not parse Redis key with pattern " + registrationIdPattern);
|
||||
}
|
||||
final int registrationId = Integer.parseInt(deviceIdentifierComponents[3]);
|
||||
if (!RegistrationIdValidator.validRegistrationId(registrationId)) {
|
||||
throw new IllegalArgumentException("Invalid registration ID: " + registrationId);
|
||||
}
|
||||
deviceIdentifier = new DeviceIdentifier(accountIdentifier, deviceId, registrationId);
|
||||
transferArchiveKey = getRegistrationIdTransferArchiveKey(accountIdentifier, deviceId, registrationId);
|
||||
|
||||
Optional.ofNullable(waitForTransferArchiveFuturesByDeviceIdentifier.remove(deviceIdentifier))
|
||||
.ifPresent(future -> pubSubRedisClient.withConnection(connection -> connection.async().get(transferArchiveKey))
|
||||
|
||||
Reference in New Issue
Block a user