Add API endpoints for waiting for transfer archives

This commit is contained in:
Jon Chambers
2024-10-11 13:04:51 -04:00
committed by Jon Chambers
parent 7ff48155d6
commit 73fb1fc2ed
4 changed files with 271 additions and 8 deletions

View File

@@ -17,11 +17,13 @@ import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import java.time.Duration;
import java.time.Instant;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.validation.Valid;
@@ -58,7 +60,9 @@ import org.whispersystems.textsecuregcm.entities.LinkDeviceResponse;
import org.whispersystems.textsecuregcm.entities.LinkDeviceRequest;
import org.whispersystems.textsecuregcm.entities.PreKeySignatureValidator;
import org.whispersystems.textsecuregcm.entities.ProvisioningMessage;
import org.whispersystems.textsecuregcm.entities.RemoteAttachment;
import org.whispersystems.textsecuregcm.entities.SetPublicKeyRequest;
import org.whispersystems.textsecuregcm.entities.TransferArchiveUploadedRequest;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
@@ -318,9 +322,9 @@ public class DeviceController {
Waits for a new device to be linked to an account and returns basic information about the new device when
available.
""")
@ApiResponse(responseCode = "200", description = "The specified was linked to an account",
@ApiResponse(responseCode = "200", description = "A device was linked to an account using the token associated with the given token identifier",
content = @Content(schema = @Schema(implementation = DeviceInfo.class)))
@ApiResponse(responseCode = "204", description = "No device was linked to the account before the call completed")
@ApiResponse(responseCode = "204", description = "No device was linked to the account before the call completed; clients may repeat the call to continue waiting")
@ApiResponse(responseCode = "400", description = "The given token identifier or timeout was invalid")
@ApiResponse(responseCode = "429", description = "Rate-limited; try again after the prescribed delay")
public CompletableFuture<Response> waitForLinkedDevice(
@@ -432,4 +436,66 @@ public class DeviceController {
isDowngrade |= account.isVersionedExpirationTimerSupported() && !capabilities.versionedExpirationTimer();
return isDowngrade;
}
@PUT
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("/transfer_archive")
@Operation(
summary = "Signals that a transfer archive has been uploaded for a specific linked device",
description = """
Signals that a transfer archive has been uploaded for a specific linked device. Devices waiting via the "wait
for transfer archive" endpoint will be notified that the new archive is available.
""")
@ApiResponse(responseCode = "204", description = "Success")
@ApiResponse(responseCode = "422", description = "The request object could not be parsed or was otherwise invalid")
@ApiResponse(responseCode = "429", description = "Rate-limited; try again after the prescribed delay")
public CompletionStage<Void> recordTransferArchiveUploaded(@ReadOnly @Auth final AuthenticatedDevice authenticatedDevice,
@NotNull @Valid final TransferArchiveUploadedRequest transferArchiveUploadedRequest) {
return rateLimiters.getUploadTransferArchiveLimiter().validateAsync(authenticatedDevice.getAccount().getIdentifier(IdentityType.ACI))
.thenCompose(ignored -> accounts.recordTransferArchiveUpload(authenticatedDevice.getAccount(),
transferArchiveUploadedRequest.destinationDeviceId(),
Instant.ofEpochMilli(transferArchiveUploadedRequest.destinationDeviceCreated()),
transferArchiveUploadedRequest.transferArchive()));
}
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/transfer_archive")
@Operation(summary = "Wait for a new transfer archive to be uploaded",
description = """
Waits for a new transfer archive to be uploaded for the authenticated device and returns the location of the
archive when available.
""")
@ApiResponse(responseCode = "200", description = "A new transfer archive was uploaded for the authenticated device",
content = @Content(schema = @Schema(implementation = RemoteAttachment.class)))
@ApiResponse(responseCode = "204", description = "No transfer archive was uploaded before the call completed; clients may repeat the call to continue waiting")
@ApiResponse(responseCode = "400", description = "The given timeout was invalid")
@ApiResponse(responseCode = "429", description = "Rate-limited; try again after the prescribed delay")
public CompletionStage<Response> waitForTransferArchive(@ReadOnly @Auth final AuthenticatedDevice authenticatedDevice,
@QueryParam("timeout")
@DefaultValue("30")
@Min(1)
@Max(3600)
@Schema(requiredMode = Schema.RequiredMode.NOT_REQUIRED,
minimum = "1",
maximum = "3600",
description = """
The amount of time (in seconds) to wait for a response. If a transfer archive for the authenticated
device is not available within the given amount of time, this endpoint will return a status of HTTP/204.
""") final int timeoutSeconds) {
final String rateLimiterKey = authenticatedDevice.getAccount().getIdentifier(IdentityType.ACI) +
":" + authenticatedDevice.getAuthenticatedDevice().getId();
return rateLimiters.getWaitForTransferArchiveLimiter().validateAsync(rateLimiterKey)
.thenCompose(ignored -> accounts.waitForTransferArchive(authenticatedDevice.getAccount(),
authenticatedDevice.getAuthenticatedDevice(),
Duration.ofSeconds(timeoutSeconds)))
.thenApply(maybeTransferArchive -> maybeTransferArchive
.map(transferArchive -> Response.status(Response.Status.OK).entity(transferArchive).build())
.orElseGet(() -> Response.status(Response.Status.NO_CONTENT).build()));
}
}

View File

@@ -0,0 +1,27 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.entities;
import io.swagger.v3.oas.annotations.media.Schema;
import org.whispersystems.textsecuregcm.storage.Device;
import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.Positive;
public record TransferArchiveUploadedRequest(@Min(1)
@Max(Device.MAXIMUM_DEVICE_ID)
@Schema(description = "The ID of the device for which the transfer archive has been prepared")
byte destinationDeviceId,
@Positive
@Schema(description = "The timestamp, in milliseconds since the epoch, at which the destination device was created")
long destinationDeviceCreated,
@Schema(description = "The location of the transfer archive")
@Valid
RemoteAttachment transferArchive) {
}

View File

@@ -51,6 +51,8 @@ public class RateLimiters extends BaseRateLimiters<RateLimiters.For> {
KEY_TRANSPARENCY_SEARCH_PER_IP("keyTransparencySearch", true, new RateLimiterConfig(100, Duration.ofSeconds(15))),
KEY_TRANSPARENCY_MONITOR_PER_IP("keyTransparencyMonitor", true, new RateLimiterConfig(100, Duration.ofSeconds(15))),
WAIT_FOR_LINKED_DEVICE("waitForLinkedDevice", true, new RateLimiterConfig(10, Duration.ofSeconds(30))),
UPLOAD_TRANSFER_ARCHIVE("uploadTransferArchive", true, new RateLimiterConfig(10, Duration.ofMinutes(1))),
WAIT_FOR_TRANSFER_ARCHIVE("waitForTransferArchive", true, new RateLimiterConfig(10, Duration.ofSeconds(30))),
;
private final String id;
@@ -210,4 +212,12 @@ public class RateLimiters extends BaseRateLimiters<RateLimiters.For> {
public RateLimiter getWaitForLinkedDeviceLimiter() {
return forDescriptor(For.WAIT_FOR_LINKED_DEVICE);
}
public RateLimiter getUploadTransferArchiveLimiter() {
return forDescriptor(For.UPLOAD_TRANSFER_ARCHIVE);
}
public RateLimiter getWaitForTransferArchiveLimiter() {
return forDescriptor(For.WAIT_FOR_TRANSFER_ARCHIVE);
}
}