mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-20 07:08:05 +01:00
Use storage-manager's copy implementation
This commit is contained in:
committed by
ravi-signal
parent
843151859d
commit
fc097db2a0
@@ -9,8 +9,7 @@ import com.google.common.annotations.VisibleForTesting;
|
||||
import io.grpc.Status;
|
||||
import io.micrometer.core.instrument.DistributionSummary;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import java.security.SecureRandom;
|
||||
import java.time.Clock;
|
||||
import java.time.Duration;
|
||||
@@ -22,9 +21,6 @@ import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.stream.Collectors;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.signal.libsignal.protocol.ecc.Curve;
|
||||
import org.signal.libsignal.protocol.ecc.ECPublicKey;
|
||||
import org.signal.libsignal.zkgroup.GenericServerSecretParams;
|
||||
@@ -80,7 +76,6 @@ public class BackupManager {
|
||||
private final TusAttachmentGenerator tusAttachmentGenerator;
|
||||
private final Cdn3BackupCredentialGenerator cdn3BackupCredentialGenerator;
|
||||
private final RemoteStorageManager remoteStorageManager;
|
||||
private final Map<Integer, String> attachmentCdnBaseUris;
|
||||
private final SecureRandom secureRandom = new SecureRandom();
|
||||
private final Clock clock;
|
||||
|
||||
@@ -92,7 +87,6 @@ public class BackupManager {
|
||||
final TusAttachmentGenerator tusAttachmentGenerator,
|
||||
final Cdn3BackupCredentialGenerator cdn3BackupCredentialGenerator,
|
||||
final RemoteStorageManager remoteStorageManager,
|
||||
final Map<Integer, String> attachmentCdnBaseUris,
|
||||
final Clock clock) {
|
||||
this.backupsDb = backupsDb;
|
||||
this.serverSecretParams = serverSecretParams;
|
||||
@@ -101,11 +95,6 @@ public class BackupManager {
|
||||
this.cdn3BackupCredentialGenerator = cdn3BackupCredentialGenerator;
|
||||
this.remoteStorageManager = remoteStorageManager;
|
||||
this.clock = clock;
|
||||
// strip trailing "/" for easier URI construction
|
||||
this.attachmentCdnBaseUris = attachmentCdnBaseUris.entrySet().stream().collect(Collectors.toMap(
|
||||
Map.Entry::getKey,
|
||||
entry -> StringUtils.removeEnd(entry.getValue(), "/")
|
||||
));
|
||||
}
|
||||
|
||||
|
||||
@@ -271,23 +260,15 @@ public class BackupManager {
|
||||
.asRuntimeException();
|
||||
}
|
||||
|
||||
final URI sourceUri;
|
||||
try {
|
||||
sourceUri = attachmentReadUri(sourceCdn, sourceKey);
|
||||
} catch (IOException e) {
|
||||
return CompletableFuture.failedFuture(e);
|
||||
}
|
||||
|
||||
final BackupUploadDescriptor dst = cdn3BackupCredentialGenerator.generateUpload(
|
||||
cdnMediaPath(backupUser, destinationMediaId));
|
||||
|
||||
final String destination = cdnMediaPath(backupUser, destinationMediaId);
|
||||
final int destinationLength = encryptionParameters.outputSize(sourceLength);
|
||||
return this.backupsDb
|
||||
// Write the ddb updates before actually updating backing storage
|
||||
.trackMedia(backupUser, 1, destinationLength)
|
||||
|
||||
// Actually copy the objects. If the copy fails, our estimated quota usage may not be exact
|
||||
.thenComposeAsync(ignored -> remoteStorageManager.copy(sourceUri, sourceLength, encryptionParameters, dst))
|
||||
.thenComposeAsync(ignored ->
|
||||
remoteStorageManager.copy(sourceCdn, sourceKey, sourceLength, encryptionParameters, destination))
|
||||
.exceptionallyCompose(throwable -> {
|
||||
final Throwable unwrapped = ExceptionUtils.unwrap(throwable);
|
||||
if (!(unwrapped instanceof SourceObjectNotFoundException) && !(unwrapped instanceof InvalidLengthException)) {
|
||||
@@ -299,25 +280,10 @@ public class BackupManager {
|
||||
});
|
||||
})
|
||||
// indicates where the backup was stored
|
||||
.thenApply(ignore -> new StorageDescriptor(dst.cdn(), destinationMediaId));
|
||||
.thenApply(ignore -> new StorageDescriptor(remoteStorageManager.cdnNumber(), destinationMediaId));
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct the URI for an attachment with the specified key
|
||||
*
|
||||
* @param cdn where the attachment is located
|
||||
* @param key the attachment key
|
||||
* @return A {@link URI} where the attachment can be retrieved
|
||||
*/
|
||||
private URI attachmentReadUri(final int cdn, final String key) throws IOException {
|
||||
final String baseUri = attachmentCdnBaseUris.get(cdn);
|
||||
if (baseUri == null) {
|
||||
throw new SourceObjectNotFoundException("Unknown attachment cdn " + cdn);
|
||||
}
|
||||
return URI.create("%s/%s".formatted(baseUri, key));
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate credentials that can be used to read from the backup CDN
|
||||
*
|
||||
@@ -377,7 +343,7 @@ public class BackupManager {
|
||||
// Try to swap out the backupDir for the user
|
||||
.scheduleBackupDeletion(backupUser)
|
||||
// If there was already a pending swap, try to delete the cdn objects directly
|
||||
.exceptionallyCompose(ExceptionUtils.exceptionallyHandler(BackupsDb.PendingDeletionException.class,e ->
|
||||
.exceptionallyCompose(ExceptionUtils.exceptionallyHandler(BackupsDb.PendingDeletionException.class, e ->
|
||||
AsyncTimerUtil.record(SYNCHRONOUS_DELETE_TIMER, () ->
|
||||
deletePrefix(backupUser.backupDir(), DELETION_CONCURRENCY))));
|
||||
}
|
||||
@@ -590,7 +556,7 @@ public class BackupManager {
|
||||
/**
|
||||
* Check that the authenticated backup user is authorized to use the provided backupLevel
|
||||
*
|
||||
* @param backupUser The backup user to check
|
||||
* @param backupUser The backup user to check
|
||||
* @param backupLevel The authorization level to verify the backupUser has access to
|
||||
* @throws {@link Status#PERMISSION_DENIED} error if the backup user is not authorized to access {@code backupLevel}
|
||||
*/
|
||||
|
||||
@@ -1,102 +0,0 @@
|
||||
package org.whispersystems.textsecuregcm.backup;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.InvalidAlgorithmParameterException;
|
||||
import java.security.InvalidKeyException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Flow;
|
||||
import javax.crypto.BadPaddingException;
|
||||
import javax.crypto.Cipher;
|
||||
import javax.crypto.IllegalBlockSizeException;
|
||||
import javax.crypto.Mac;
|
||||
import javax.crypto.NoSuchPaddingException;
|
||||
import org.reactivestreams.FlowAdapters;
|
||||
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class BackupMediaEncrypter {
|
||||
|
||||
private final Cipher cipher;
|
||||
private final Mac mac;
|
||||
|
||||
public BackupMediaEncrypter(final MediaEncryptionParameters encryptionParameters) {
|
||||
cipher = initializeCipher(encryptionParameters);
|
||||
mac = initializeMac(encryptionParameters);
|
||||
}
|
||||
|
||||
public int outputSize(final int inputSize) {
|
||||
return cipher.getIV().length + cipher.getOutputSize(inputSize) + mac.getMacLength();
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform streaming encryption
|
||||
*
|
||||
* @param sourceBody A source of ByteBuffers, typically from an asynchronous HttpResponse
|
||||
* @return A publisher that returns IV + AES/CBC/PKCS5Padding encrypted source + HMAC(IV + encrypted source) suitable
|
||||
* to write with an asynchronous HttpRequest
|
||||
*/
|
||||
public Flow.Publisher<ByteBuffer> encryptBody(Flow.Publisher<List<ByteBuffer>> sourceBody) {
|
||||
|
||||
// Write IV, encrypted payload, mac
|
||||
final Flux<ByteBuffer> encryptedBody = Flux.concat(
|
||||
Mono.fromSupplier(() -> {
|
||||
mac.update(cipher.getIV());
|
||||
return ByteBuffer.wrap(cipher.getIV());
|
||||
}),
|
||||
Flux.from(FlowAdapters.toPublisher(sourceBody))
|
||||
.concatMap(Flux::fromIterable)
|
||||
.concatMap(byteBuffer -> {
|
||||
final byte[] copy = new byte[byteBuffer.remaining()];
|
||||
byteBuffer.get(copy);
|
||||
final byte[] res = cipher.update(copy);
|
||||
if (res == null) {
|
||||
return Mono.empty();
|
||||
} else {
|
||||
mac.update(res);
|
||||
return Mono.just(ByteBuffer.wrap(res));
|
||||
}
|
||||
}),
|
||||
Mono.fromSupplier(() -> {
|
||||
try {
|
||||
final byte[] finalBytes = cipher.doFinal();
|
||||
mac.update(finalBytes);
|
||||
return ByteBuffer.wrap(finalBytes);
|
||||
} catch (IllegalBlockSizeException | BadPaddingException e) {
|
||||
throw ExceptionUtils.wrap(e);
|
||||
}
|
||||
}),
|
||||
Mono.fromSupplier(() -> ByteBuffer.wrap(mac.doFinal())));
|
||||
return FlowAdapters.toFlowPublisher(encryptedBody);
|
||||
}
|
||||
|
||||
private static Mac initializeMac(final MediaEncryptionParameters encryptionParameters) {
|
||||
try {
|
||||
final Mac mac = Mac.getInstance("HmacSHA256");
|
||||
mac.init(encryptionParameters.hmacSHA256Key());
|
||||
return mac;
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
throw new AssertionError(e);
|
||||
} catch (InvalidKeyException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static Cipher initializeCipher(final MediaEncryptionParameters encryptionParameters) {
|
||||
try {
|
||||
final Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
|
||||
cipher.init(
|
||||
Cipher.ENCRYPT_MODE,
|
||||
encryptionParameters.aesEncryptionKey(),
|
||||
encryptionParameters.iv());
|
||||
return cipher;
|
||||
|
||||
} catch (NoSuchAlgorithmException | NoSuchPaddingException e) {
|
||||
throw new AssertionError(e);
|
||||
} catch (InvalidAlgorithmParameterException | InvalidKeyException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
package org.whispersystems.textsecuregcm.backup;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import java.io.IOException;
|
||||
@@ -8,30 +9,24 @@ import java.net.URI;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Base64;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Flow;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.stream.Stream;
|
||||
import javax.annotation.Nullable;
|
||||
import javax.validation.constraints.NotNull;
|
||||
import javax.ws.rs.core.HttpHeaders;
|
||||
import javax.ws.rs.core.Response;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.configuration.Cdn3StorageManagerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
|
||||
import org.whispersystems.textsecuregcm.http.FaultTolerantHttpClient;
|
||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
|
||||
@@ -42,20 +37,15 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(Cdn3RemoteStorageManager.class);
|
||||
|
||||
private final FaultTolerantHttpClient cdnHttpClient;
|
||||
private final FaultTolerantHttpClient storageManagerHttpClient;
|
||||
private final String storageManagerBaseUrl;
|
||||
private final String clientId;
|
||||
private final String clientSecret;
|
||||
private final Map<Integer, String> sourceSchemes;
|
||||
|
||||
static final String CLIENT_ID_HEADER = "CF-Access-Client-Id";
|
||||
static final String CLIENT_SECRET_HEADER = "CF-Access-Client-Secret";
|
||||
|
||||
private static String TUS_UPLOAD_LENGTH_HEADER = "Upload-Length";
|
||||
private static String TUS_UPLOAD_OFFSET_HEADER = "Upload-Offset";
|
||||
private static String TUS_VERSION_HEADER = "Tus-Resumable";
|
||||
private static String TUS_VERSION = "1.0.0";
|
||||
private static String TUS_CONTENT_TYPE = "application/offset+octet-stream";
|
||||
|
||||
private static final String STORAGE_MANAGER_STATUS_COUNTER_NAME = MetricsUtil.name(Cdn3RemoteStorageManager.class,
|
||||
"storageManagerStatus");
|
||||
|
||||
@@ -66,41 +56,25 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager {
|
||||
|
||||
public Cdn3RemoteStorageManager(
|
||||
final ScheduledExecutorService retryExecutor,
|
||||
final CircuitBreakerConfiguration circuitBreakerConfiguration,
|
||||
final RetryConfiguration retryConfiguration,
|
||||
final List<String> cdnCaCertificates,
|
||||
final Cdn3StorageManagerConfiguration configuration) throws CertificateException {
|
||||
final Cdn3StorageManagerConfiguration configuration) {
|
||||
|
||||
// strip trailing "/" for easier URI construction
|
||||
this.storageManagerBaseUrl = StringUtils.removeEnd(configuration.baseUri(), "/");
|
||||
this.clientId = configuration.clientId();
|
||||
this.clientSecret = configuration.clientSecret().value();
|
||||
|
||||
// Client used to read/write to cdn
|
||||
this.cdnHttpClient = FaultTolerantHttpClient.newBuilder()
|
||||
.withName("cdn-client")
|
||||
.withCircuitBreaker(circuitBreakerConfiguration)
|
||||
.withExecutor(Executors.newCachedThreadPool())
|
||||
.withRetryExecutor(retryExecutor)
|
||||
.withRetry(retryConfiguration)
|
||||
.withConnectTimeout(Duration.ofSeconds(10))
|
||||
.withVersion(HttpClient.Version.HTTP_2)
|
||||
.withTrustedServerCertificates(cdnCaCertificates.toArray(new String[0]))
|
||||
.withNumClients(configuration.numHttpClients())
|
||||
.build();
|
||||
|
||||
// Client used for calls to storage-manager
|
||||
// storage-manager has an external CA so uses a different client
|
||||
this.storageManagerHttpClient = FaultTolerantHttpClient.newBuilder()
|
||||
.withName("cdn3-storage-manager")
|
||||
.withCircuitBreaker(circuitBreakerConfiguration)
|
||||
.withCircuitBreaker(configuration.circuitBreaker())
|
||||
.withExecutor(Executors.newCachedThreadPool())
|
||||
.withRetryExecutor(retryExecutor)
|
||||
.withRetry(retryConfiguration)
|
||||
.withRetry(configuration.retry())
|
||||
.withConnectTimeout(Duration.ofSeconds(10))
|
||||
.withVersion(HttpClient.Version.HTTP_2)
|
||||
.withNumClients(configuration.numHttpClients())
|
||||
.build();
|
||||
this.sourceSchemes = configuration.sourceSchemes();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -110,85 +84,70 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager {
|
||||
|
||||
@Override
|
||||
public CompletionStage<Void> copy(
|
||||
final URI sourceUri,
|
||||
final int sourceCdn,
|
||||
final String sourceKey,
|
||||
final int expectedSourceLength,
|
||||
final MediaEncryptionParameters encryptionParameters,
|
||||
final BackupUploadDescriptor uploadDescriptor) {
|
||||
|
||||
if (uploadDescriptor.cdn() != cdnNumber()) {
|
||||
throw new IllegalArgumentException("Cdn3RemoteStorageManager can only copy to cdn3");
|
||||
final String destinationKey) {
|
||||
final String sourceScheme = this.sourceSchemes.get(sourceCdn);
|
||||
if (sourceScheme == null) {
|
||||
return CompletableFuture.failedFuture(
|
||||
new SourceObjectNotFoundException("Cdn3RemoteStorageManager cannot copy from " + sourceCdn));
|
||||
}
|
||||
final String requestBody = new Cdn3CopyRequest(
|
||||
encryptionParameters,
|
||||
new Cdn3CopyRequest.SourceDescriptor(sourceScheme, sourceKey),
|
||||
expectedSourceLength,
|
||||
destinationKey).json();
|
||||
|
||||
final Timer.Sample sample = Timer.start();
|
||||
final BackupMediaEncrypter encrypter = new BackupMediaEncrypter(encryptionParameters);
|
||||
final HttpRequest request = HttpRequest.newBuilder().GET().uri(sourceUri).build();
|
||||
return cdnHttpClient.sendAsync(request, HttpResponse.BodyHandlers.ofPublisher()).thenCompose(response -> {
|
||||
try {
|
||||
return cdnHttpClient.sendAsync(
|
||||
createCopyRequest(expectedSourceLength, uploadDescriptor, encrypter, response),
|
||||
HttpResponse.BodyHandlers.discarding());
|
||||
} catch (Exception e) {
|
||||
// Discard the response body so we don't hold the http2 stream open
|
||||
response.body().subscribe(CancelSubscriber.INSTANCE);
|
||||
throw ExceptionUtils.wrap(e);
|
||||
}
|
||||
})
|
||||
final HttpRequest request = HttpRequest.newBuilder()
|
||||
.PUT(HttpRequest.BodyPublishers.ofString(requestBody))
|
||||
.uri(URI.create(copyUrl()))
|
||||
.header("Content-Type", "application/json")
|
||||
.header(CLIENT_ID_HEADER, clientId)
|
||||
.header(CLIENT_SECRET_HEADER, clientSecret)
|
||||
.build();
|
||||
return this.storageManagerHttpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
|
||||
.thenAccept(response -> {
|
||||
if (response.statusCode() != Response.Status.CREATED.getStatusCode() &&
|
||||
response.statusCode() != Response.Status.OK.getStatusCode()) {
|
||||
throw new CompletionException(new IOException("Failed to copy object: " + response.statusCode()));
|
||||
}
|
||||
long uploadOffset = response.headers().firstValueAsLong(TUS_UPLOAD_OFFSET_HEADER)
|
||||
.orElseThrow(() -> new CompletionException(new IOException("Tus server did not return Upload-Offset")));
|
||||
final int expectedEncryptedLength = encrypter.outputSize(expectedSourceLength);
|
||||
if (uploadOffset != expectedEncryptedLength) {
|
||||
throw new CompletionException(new IOException(
|
||||
"Expected to upload %d bytes, uploaded %d".formatted(expectedEncryptedLength, uploadOffset)));
|
||||
if (response.statusCode() == Response.Status.NOT_FOUND.getStatusCode()) {
|
||||
throw ExceptionUtils.wrap(new SourceObjectNotFoundException());
|
||||
} else if (response.statusCode() == Response.Status.CONFLICT.getStatusCode()) {
|
||||
throw ExceptionUtils.wrap(new InvalidLengthException(response.body()));
|
||||
} else if (!HttpUtils.isSuccessfulResponse(response.statusCode())) {
|
||||
logger.info("Failed to copy via storage-manager {} {}", response.statusCode(), response.body());
|
||||
throw ExceptionUtils.wrap(new IOException("Failed to copy object: " + response.statusCode()));
|
||||
}
|
||||
})
|
||||
.whenComplete((ignored, ignoredException) ->
|
||||
sample.stop(Metrics.timer(STORAGE_MANAGER_TIMER_NAME, OPERATION_TAG_NAME, "copy")));
|
||||
}
|
||||
|
||||
private HttpRequest createCopyRequest(
|
||||
final int expectedSourceLength,
|
||||
final BackupUploadDescriptor uploadDescriptor,
|
||||
BackupMediaEncrypter encrypter,
|
||||
HttpResponse<Flow.Publisher<List<ByteBuffer>>> response) throws IOException {
|
||||
if (response.statusCode() == Response.Status.NOT_FOUND.getStatusCode()) {
|
||||
throw new SourceObjectNotFoundException();
|
||||
} else if (response.statusCode() != Response.Status.OK.getStatusCode()) {
|
||||
throw new IOException("error reading from source: " + response.statusCode());
|
||||
/**
|
||||
* Serialized copy request for cdn3 storage manager
|
||||
*/
|
||||
record Cdn3CopyRequest(
|
||||
String encryptionKey, String hmacKey, String iv,
|
||||
SourceDescriptor source, int expectedSourceLength,
|
||||
String dst) {
|
||||
|
||||
Cdn3CopyRequest(MediaEncryptionParameters parameters, SourceDescriptor source, int expectedSourceLength,
|
||||
String dst) {
|
||||
this(Base64.getEncoder().encodeToString(parameters.aesEncryptionKey().getEncoded()),
|
||||
Base64.getEncoder().encodeToString(parameters.hmacSHA256Key().getEncoded()),
|
||||
Base64.getEncoder().encodeToString(parameters.iv().getIV()),
|
||||
source, expectedSourceLength, dst);
|
||||
}
|
||||
|
||||
final int actualSourceLength = Math.toIntExact(response.headers().firstValueAsLong("Content-Length")
|
||||
.orElseThrow(() -> new IOException("upstream missing Content-Length")));
|
||||
record SourceDescriptor(String scheme, String key) {}
|
||||
|
||||
if (actualSourceLength != expectedSourceLength) {
|
||||
throw new InvalidLengthException(
|
||||
"Provided sourceLength " + expectedSourceLength + " was " + actualSourceLength);
|
||||
String json() {
|
||||
try {
|
||||
return SystemMapper.jsonMapper().writeValueAsString(this);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalStateException("Could not serialize copy request", e);
|
||||
}
|
||||
}
|
||||
|
||||
final int expectedEncryptedLength = encrypter.outputSize(expectedSourceLength);
|
||||
final HttpRequest.BodyPublisher encryptedBody = HttpRequest.BodyPublishers.fromPublisher(
|
||||
encrypter.encryptBody(response.body()), expectedEncryptedLength);
|
||||
|
||||
final String[] headers = Stream.concat(
|
||||
uploadDescriptor.headers().entrySet()
|
||||
.stream()
|
||||
.flatMap(e -> Stream.of(e.getKey(), e.getValue())),
|
||||
Stream.of(
|
||||
TUS_VERSION_HEADER, TUS_VERSION,
|
||||
TUS_UPLOAD_LENGTH_HEADER, Integer.toString(expectedEncryptedLength),
|
||||
HttpHeaders.CONTENT_TYPE, TUS_CONTENT_TYPE))
|
||||
.toArray(String[]::new);
|
||||
|
||||
return HttpRequest.newBuilder()
|
||||
.uri(URI.create(uploadDescriptor.signedUploadLocation()))
|
||||
.headers(headers)
|
||||
.POST(encryptedBody)
|
||||
.build();
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -338,25 +297,7 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager {
|
||||
return "%s/%s/".formatted(storageManagerBaseUrl, Cdn3BackupCredentialGenerator.CDN_PATH);
|
||||
}
|
||||
|
||||
private static class CancelSubscriber implements Flow.Subscriber<List<ByteBuffer>> {
|
||||
|
||||
private static CancelSubscriber INSTANCE = new CancelSubscriber();
|
||||
|
||||
@Override
|
||||
public void onSubscribe(final Flow.Subscription subscription) {
|
||||
subscription.cancel();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(final List<ByteBuffer> item) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(final Throwable throwable) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
}
|
||||
private String copyUrl() {
|
||||
return "%s/copy".formatted(storageManagerBaseUrl);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package org.whispersystems.textsecuregcm.backup;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
@@ -18,12 +17,13 @@ public interface RemoteStorageManager {
|
||||
/**
|
||||
* Copy and the object from a remote source into the backup, adding an additional layer of encryption
|
||||
*
|
||||
* @param sourceUri The location of the object to copy
|
||||
* @param sourceCdn The cdn number where the source attachment is stored
|
||||
* @param sourceKey The key of the source attachment within the attachment cdn
|
||||
* @param expectedSourceLength The length of the source object, should match the content-length of the object returned
|
||||
* from the sourceUri.
|
||||
* @param encryptionParameters The encryption keys that should be used to apply an additional layer of encryption to
|
||||
* the object
|
||||
* @param uploadDescriptor The destination, which must be in the cdn returned by {@link #cdnNumber()}
|
||||
* @param dstKey The key within the backup cdn where the copied object will be written
|
||||
* @return A stage that completes successfully when the source has been successfully re-encrypted and copied into
|
||||
* uploadDescriptor. The returned CompletionStage can be completed exceptionally with the following exceptions.
|
||||
* <ul>
|
||||
@@ -33,10 +33,11 @@ public interface RemoteStorageManager {
|
||||
* </ul>
|
||||
*/
|
||||
CompletionStage<Void> copy(
|
||||
URI sourceUri,
|
||||
int sourceCdn,
|
||||
String sourceKey,
|
||||
int expectedSourceLength,
|
||||
MediaEncryptionParameters encryptionParameters,
|
||||
BackupUploadDescriptor uploadDescriptor);
|
||||
String dstKey);
|
||||
|
||||
/**
|
||||
* Result of a {@link #list} operation
|
||||
|
||||
Reference in New Issue
Block a user