mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-19 14:38:04 +01:00
Add gRPC backup services
This commit is contained in:
committed by
ravi-signal
parent
3ca9a66323
commit
a88560e557
@@ -154,6 +154,7 @@ import org.whispersystems.textsecuregcm.grpc.PaymentsGrpcService;
|
||||
import org.whispersystems.textsecuregcm.grpc.ProfileAnonymousGrpcService;
|
||||
import org.whispersystems.textsecuregcm.grpc.ProfileGrpcService;
|
||||
import org.whispersystems.textsecuregcm.grpc.RequestAttributesInterceptor;
|
||||
import org.whispersystems.textsecuregcm.grpc.ValidatingInterceptor;
|
||||
import org.whispersystems.textsecuregcm.grpc.net.GrpcClientConnectionManager;
|
||||
import org.whispersystems.textsecuregcm.grpc.net.ManagedDefaultEventLoopGroup;
|
||||
import org.whispersystems.textsecuregcm.grpc.net.ManagedLocalGrpcServer;
|
||||
@@ -862,6 +863,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
final RequestAttributesInterceptor requestAttributesInterceptor =
|
||||
new RequestAttributesInterceptor(grpcClientConnectionManager);
|
||||
|
||||
final ValidatingInterceptor validatingInterceptor = new ValidatingInterceptor();
|
||||
|
||||
final LocalAddress anonymousGrpcServerAddress = new LocalAddress("grpc-anonymous");
|
||||
final LocalAddress authenticatedGrpcServerAddress = new LocalAddress("grpc-authenticated");
|
||||
|
||||
@@ -875,6 +878,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
.intercept(
|
||||
new ExternalRequestFilter(config.getExternalRequestFilterConfiguration().permittedInternalRanges(),
|
||||
config.getExternalRequestFilterConfiguration().grpcMethods()))
|
||||
.intercept(validatingInterceptor)
|
||||
// TODO: specialize metrics with user-agent platform
|
||||
.intercept(metricCollectingServerInterceptor)
|
||||
.intercept(errorMappingInterceptor)
|
||||
@@ -897,6 +901,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
// http://grpc.github.io/grpc-java/javadoc/io/grpc/ServerBuilder.html#intercept-io.grpc.ServerInterceptor-
|
||||
serverBuilder
|
||||
// TODO: specialize metrics with user-agent platform
|
||||
.intercept(validatingInterceptor)
|
||||
.intercept(metricCollectingServerInterceptor)
|
||||
.intercept(errorMappingInterceptor)
|
||||
.intercept(remoteDeprecationFilter)
|
||||
|
||||
@@ -0,0 +1,226 @@
|
||||
/*
|
||||
* Copyright 2024 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
package org.whispersystems.textsecuregcm.grpc;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import io.grpc.Status;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import org.signal.chat.backup.CopyMediaRequest;
|
||||
import org.signal.chat.backup.CopyMediaResponse;
|
||||
import org.signal.chat.backup.DeleteAllRequest;
|
||||
import org.signal.chat.backup.DeleteAllResponse;
|
||||
import org.signal.chat.backup.DeleteMediaRequest;
|
||||
import org.signal.chat.backup.DeleteMediaResponse;
|
||||
import org.signal.chat.backup.GetBackupInfoRequest;
|
||||
import org.signal.chat.backup.GetBackupInfoResponse;
|
||||
import org.signal.chat.backup.GetCdnCredentialsRequest;
|
||||
import org.signal.chat.backup.GetCdnCredentialsResponse;
|
||||
import org.signal.chat.backup.GetUploadFormRequest;
|
||||
import org.signal.chat.backup.GetUploadFormResponse;
|
||||
import org.signal.chat.backup.ListMediaRequest;
|
||||
import org.signal.chat.backup.ListMediaResponse;
|
||||
import org.signal.chat.backup.ReactorBackupsAnonymousGrpc;
|
||||
import org.signal.chat.backup.RefreshRequest;
|
||||
import org.signal.chat.backup.RefreshResponse;
|
||||
import org.signal.chat.backup.SetPublicKeyRequest;
|
||||
import org.signal.chat.backup.SetPublicKeyResponse;
|
||||
import org.signal.chat.backup.SignedPresentation;
|
||||
import org.signal.libsignal.protocol.InvalidKeyException;
|
||||
import org.signal.libsignal.protocol.ecc.ECPublicKey;
|
||||
import org.signal.libsignal.zkgroup.InvalidInputException;
|
||||
import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialPresentation;
|
||||
import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser;
|
||||
import org.whispersystems.textsecuregcm.backup.BackupManager;
|
||||
import org.whispersystems.textsecuregcm.backup.CopyParameters;
|
||||
import org.whispersystems.textsecuregcm.backup.MediaEncryptionParameters;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class BackupsAnonymousGrpcService extends ReactorBackupsAnonymousGrpc.BackupsAnonymousImplBase {
|
||||
|
||||
private final BackupManager backupManager;
|
||||
|
||||
public BackupsAnonymousGrpcService(final BackupManager backupManager) {
|
||||
this.backupManager = backupManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<GetCdnCredentialsResponse> getCdnCredentials(final GetCdnCredentialsRequest request) {
|
||||
return authenticateBackupUserMono(request.getSignedPresentation())
|
||||
.map(user -> backupManager.generateReadAuth(user, request.getCdn()))
|
||||
.map(credentials -> GetCdnCredentialsResponse.newBuilder().putAllHeaders(credentials).build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<GetBackupInfoResponse> getBackupInfo(final GetBackupInfoRequest request) {
|
||||
return Mono.fromFuture(() ->
|
||||
authenticateBackupUser(request.getSignedPresentation()).thenCompose(backupManager::backupInfo))
|
||||
.map(info -> GetBackupInfoResponse.newBuilder()
|
||||
.setBackupName(info.messageBackupKey())
|
||||
.setCdn(info.cdn())
|
||||
.setBackupDir(info.backupSubdir())
|
||||
.setMediaDir(info.mediaSubdir())
|
||||
.setUsedSpace(info.mediaUsedSpace().orElse(0L))
|
||||
.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<RefreshResponse> refresh(final RefreshRequest request) {
|
||||
return Mono.fromFuture(() -> authenticateBackupUser(request.getSignedPresentation())
|
||||
.thenCompose(backupManager::ttlRefresh))
|
||||
.thenReturn(RefreshResponse.getDefaultInstance());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<SetPublicKeyResponse> setPublicKey(final SetPublicKeyRequest request) {
|
||||
final ECPublicKey publicKey = deserialize(ECPublicKey::new, request.getPublicKey().toByteArray());
|
||||
final BackupAuthCredentialPresentation presentation = deserialize(
|
||||
BackupAuthCredentialPresentation::new,
|
||||
request.getSignedPresentation().getPresentation().toByteArray());
|
||||
final byte[] signature = request.getSignedPresentation().getPresentationSignature().toByteArray();
|
||||
|
||||
return Mono.fromFuture(() -> backupManager.setPublicKey(presentation, signature, publicKey))
|
||||
.thenReturn(SetPublicKeyResponse.getDefaultInstance());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Mono<GetUploadFormResponse> getUploadForm(final GetUploadFormRequest request) {
|
||||
return authenticateBackupUserMono(request.getSignedPresentation())
|
||||
.flatMap(backupUser -> switch (request.getUploadTypeCase()) {
|
||||
case MESSAGES -> Mono.fromFuture(backupManager.createMessageBackupUploadDescriptor(backupUser));
|
||||
case MEDIA -> Mono.fromCompletionStage(backupManager.createTemporaryAttachmentUploadDescriptor(backupUser));
|
||||
case UPLOADTYPE_NOT_SET -> Mono.error(Status.INVALID_ARGUMENT
|
||||
.withDescription("Must set upload_type")
|
||||
.asRuntimeException());
|
||||
})
|
||||
.map(uploadDescriptor -> GetUploadFormResponse.newBuilder()
|
||||
.setCdn(uploadDescriptor.cdn())
|
||||
.setKey(uploadDescriptor.key())
|
||||
.setSignedUploadLocation(uploadDescriptor.signedUploadLocation())
|
||||
.putAllHeaders(uploadDescriptor.headers())
|
||||
.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<CopyMediaResponse> copyMedia(final CopyMediaRequest request) {
|
||||
return authenticateBackupUserMono(request.getSignedPresentation())
|
||||
.flatMapMany(backupUser -> backupManager.copyToBackup(backupUser,
|
||||
request.getItemsList().stream().map(item -> new CopyParameters(
|
||||
item.getSourceAttachmentCdn(), item.getSourceKey(),
|
||||
// uint32 in proto, make sure it fits in a signed int
|
||||
fromUnsignedExact(item.getObjectLength()),
|
||||
new MediaEncryptionParameters(item.getEncryptionKey().toByteArray(), item.getHmacKey().toByteArray()),
|
||||
item.getMediaId().toByteArray())).toList()))
|
||||
.map(copyResult -> {
|
||||
CopyMediaResponse.Builder builder = CopyMediaResponse
|
||||
.newBuilder()
|
||||
.setMediaId(ByteString.copyFrom(copyResult.mediaId()));
|
||||
builder = switch (copyResult.outcome()) {
|
||||
case SUCCESS -> builder
|
||||
.setSuccess(CopyMediaResponse.CopySuccess.newBuilder().setCdn(copyResult.cdn()).build());
|
||||
case OUT_OF_QUOTA -> builder
|
||||
.setOutOfSpace(CopyMediaResponse.OutOfSpace.getDefaultInstance());
|
||||
case SOURCE_WRONG_LENGTH -> builder
|
||||
.setWrongSourceLength(CopyMediaResponse.WrongSourceLength.getDefaultInstance());
|
||||
case SOURCE_NOT_FOUND -> builder
|
||||
.setSourceNotFound(CopyMediaResponse.SourceNotFound.getDefaultInstance());
|
||||
};
|
||||
return builder.build();
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<ListMediaResponse> listMedia(final ListMediaRequest request) {
|
||||
return authenticateBackupUserMono(request.getSignedPresentation()).zipWhen(
|
||||
backupUser -> Mono.fromFuture(backupManager.list(
|
||||
backupUser,
|
||||
request.hasCursor() ? Optional.of(request.getCursor()) : Optional.empty(),
|
||||
request.getLimit()).toCompletableFuture()),
|
||||
|
||||
(backupUser, listResult) -> {
|
||||
final ListMediaResponse.Builder builder = ListMediaResponse.newBuilder();
|
||||
for (BackupManager.StorageDescriptorWithLength sd : listResult.media()) {
|
||||
builder.addPage(ListMediaResponse.ListEntry.newBuilder()
|
||||
.setMediaId(ByteString.copyFrom(sd.key()))
|
||||
.setCdn(sd.cdn())
|
||||
.setLength(sd.length())
|
||||
.build());
|
||||
}
|
||||
builder
|
||||
.setBackupDir(backupUser.backupDir())
|
||||
.setMediaDir(backupUser.mediaDir());
|
||||
listResult.cursor().ifPresent(builder::setCursor);
|
||||
return builder.build();
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<DeleteAllResponse> deleteAll(final DeleteAllRequest request) {
|
||||
return Mono.fromFuture(() -> authenticateBackupUser(request.getSignedPresentation())
|
||||
.thenCompose(backupManager::deleteEntireBackup))
|
||||
.thenReturn(DeleteAllResponse.getDefaultInstance());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<DeleteMediaResponse> deleteMedia(final DeleteMediaRequest request) {
|
||||
return Mono
|
||||
.fromFuture(() -> authenticateBackupUser(request.getSignedPresentation()))
|
||||
.flatMapMany(backupUser -> backupManager.deleteMedia(backupUser, request
|
||||
.getItemsList()
|
||||
.stream()
|
||||
.map(item -> new BackupManager.StorageDescriptor(item.getCdn(), item.getMediaId().toByteArray()))
|
||||
.toList()))
|
||||
.map(storageDescriptor -> DeleteMediaResponse.newBuilder()
|
||||
.setMediaId(ByteString.copyFrom(storageDescriptor.key()))
|
||||
.setCdn(storageDescriptor.cdn()).build());
|
||||
}
|
||||
|
||||
private Mono<AuthenticatedBackupUser> authenticateBackupUserMono(final SignedPresentation signedPresentation) {
|
||||
return Mono.fromFuture(() -> authenticateBackupUser(signedPresentation));
|
||||
}
|
||||
|
||||
private CompletableFuture<AuthenticatedBackupUser> authenticateBackupUser(
|
||||
final SignedPresentation signedPresentation) {
|
||||
if (signedPresentation == null) {
|
||||
throw Status.UNAUTHENTICATED.asRuntimeException();
|
||||
}
|
||||
try {
|
||||
return backupManager.authenticateBackupUser(
|
||||
new BackupAuthCredentialPresentation(signedPresentation.getPresentation().toByteArray()),
|
||||
signedPresentation.getPresentationSignature().toByteArray());
|
||||
} catch (InvalidInputException e) {
|
||||
throw Status.UNAUTHENTICATED.withDescription("Could not deserialize presentation").asRuntimeException();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert an int from a proto uint32 to a signed positive integer, throwing if the value exceeds
|
||||
* {@link Integer#MAX_VALUE}. To convert to a long, see {@link Integer#toUnsignedLong(int)}
|
||||
*/
|
||||
private static int fromUnsignedExact(final int i) {
|
||||
if (i < 0) {
|
||||
throw Status.INVALID_ARGUMENT.withDescription("Invalid size").asRuntimeException();
|
||||
}
|
||||
return i;
|
||||
}
|
||||
|
||||
private interface Deserializer<T> {
|
||||
|
||||
T deserialize(byte[] bytes) throws InvalidInputException, InvalidKeyException;
|
||||
}
|
||||
|
||||
private static <T> T deserialize(Deserializer<T> deserializer, byte[] bytes) {
|
||||
try {
|
||||
return deserializer.deserialize(bytes);
|
||||
} catch (InvalidInputException | InvalidKeyException e) {
|
||||
throw Status.INVALID_ARGUMENT.withDescription("Invalid serialization").asRuntimeException();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,122 @@
|
||||
/*
|
||||
* Copyright 2024 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
package org.whispersystems.textsecuregcm.grpc;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import io.grpc.Status;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import org.signal.chat.backup.GetBackupAuthCredentialsRequest;
|
||||
import org.signal.chat.backup.GetBackupAuthCredentialsResponse;
|
||||
import org.signal.chat.backup.ReactorBackupsGrpc;
|
||||
import org.signal.chat.backup.RedeemReceiptRequest;
|
||||
import org.signal.chat.backup.RedeemReceiptResponse;
|
||||
import org.signal.chat.backup.SetBackupIdRequest;
|
||||
import org.signal.chat.backup.SetBackupIdResponse;
|
||||
import org.signal.chat.common.ZkCredential;
|
||||
import org.signal.libsignal.zkgroup.InvalidInputException;
|
||||
import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialRequest;
|
||||
import org.signal.libsignal.zkgroup.backups.BackupCredentialType;
|
||||
import org.signal.libsignal.zkgroup.receipts.ReceiptCredentialPresentation;
|
||||
import org.whispersystems.textsecuregcm.auth.grpc.AuthenticatedDevice;
|
||||
import org.whispersystems.textsecuregcm.auth.grpc.AuthenticationUtil;
|
||||
import org.whispersystems.textsecuregcm.backup.BackupAuthManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class BackupsGrpcService extends ReactorBackupsGrpc.BackupsImplBase {
|
||||
|
||||
private final AccountsManager accountManager;
|
||||
private final BackupAuthManager backupAuthManager;
|
||||
|
||||
public BackupsGrpcService(final AccountsManager accountManager, final BackupAuthManager backupAuthManager) {
|
||||
this.accountManager = accountManager;
|
||||
this.backupAuthManager = backupAuthManager;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Mono<SetBackupIdResponse> setBackupId(SetBackupIdRequest request) {
|
||||
|
||||
final BackupAuthCredentialRequest messagesCredentialRequest = deserialize(
|
||||
BackupAuthCredentialRequest::new,
|
||||
request.getMessagesBackupAuthCredentialRequest().toByteArray());
|
||||
|
||||
final BackupAuthCredentialRequest mediaCredentialRequest = deserialize(
|
||||
BackupAuthCredentialRequest::new,
|
||||
request.getMediaBackupAuthCredentialRequest().toByteArray());
|
||||
|
||||
return authenticatedAccount()
|
||||
.flatMap(account -> Mono.fromFuture(
|
||||
backupAuthManager.commitBackupId(account, messagesCredentialRequest, mediaCredentialRequest)))
|
||||
.thenReturn(SetBackupIdResponse.getDefaultInstance());
|
||||
}
|
||||
|
||||
public Mono<RedeemReceiptResponse> redeemReceipt(RedeemReceiptRequest request) {
|
||||
final ReceiptCredentialPresentation receiptCredentialPresentation = deserialize(
|
||||
ReceiptCredentialPresentation::new,
|
||||
request.getPresentation().toByteArray());
|
||||
return authenticatedAccount()
|
||||
.flatMap(account -> Mono.fromFuture(backupAuthManager.redeemReceipt(account, receiptCredentialPresentation)))
|
||||
.thenReturn(RedeemReceiptResponse.getDefaultInstance());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<GetBackupAuthCredentialsResponse> getBackupAuthCredentials(GetBackupAuthCredentialsRequest request) {
|
||||
return authenticatedAccount().flatMap(account -> {
|
||||
final Mono<List<BackupAuthManager.Credential>> messageCredentials = Mono.fromCompletionStage(() ->
|
||||
backupAuthManager.getBackupAuthCredentials(
|
||||
account,
|
||||
BackupCredentialType.MESSAGES,
|
||||
Instant.ofEpochSecond(request.getRedemptionStart()),
|
||||
Instant.ofEpochSecond(request.getRedemptionStop())));
|
||||
final Mono<List<BackupAuthManager.Credential>> mediaCredentials = Mono.fromCompletionStage(() ->
|
||||
backupAuthManager.getBackupAuthCredentials(
|
||||
account,
|
||||
BackupCredentialType.MEDIA,
|
||||
Instant.ofEpochSecond(request.getRedemptionStart()),
|
||||
Instant.ofEpochSecond(request.getRedemptionStop())));
|
||||
|
||||
return messageCredentials.zipWith(mediaCredentials, (messageCreds, mediaCreds) ->
|
||||
GetBackupAuthCredentialsResponse.newBuilder()
|
||||
.putAllMessageCredentials(messageCreds.stream().collect(Collectors.toMap(
|
||||
c -> c.redemptionTime().getEpochSecond(),
|
||||
c -> ZkCredential.newBuilder()
|
||||
.setCredential(ByteString.copyFrom(c.credential().serialize()))
|
||||
.setRedemptionTime(c.redemptionTime().getEpochSecond())
|
||||
.build())))
|
||||
.putAllMediaCredentials(mediaCreds.stream().collect(Collectors.toMap(
|
||||
c -> c.redemptionTime().getEpochSecond(),
|
||||
c -> ZkCredential.newBuilder()
|
||||
.setCredential(ByteString.copyFrom(c.credential().serialize()))
|
||||
.setRedemptionTime(c.redemptionTime().getEpochSecond())
|
||||
.build())))
|
||||
.build());
|
||||
});
|
||||
}
|
||||
|
||||
private Mono<Account> authenticatedAccount() {
|
||||
final AuthenticatedDevice authenticatedDevice = AuthenticationUtil.requireAuthenticatedDevice();
|
||||
return Mono
|
||||
.fromFuture(() -> accountManager.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier()))
|
||||
.map(maybeAccount -> maybeAccount.orElseThrow(Status.UNAUTHENTICATED::asRuntimeException));
|
||||
}
|
||||
|
||||
private interface Deserializer<T> {
|
||||
|
||||
T deserialize(byte[] bytes) throws InvalidInputException;
|
||||
}
|
||||
|
||||
private <T> T deserialize(Deserializer<T> deserializer, byte[] bytes) {
|
||||
try {
|
||||
return deserializer.deserialize(bytes);
|
||||
} catch (InvalidInputException e) {
|
||||
throw Status.INVALID_ARGUMENT.withDescription("Invalid serialization").asRuntimeException();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user