mirror of
https://github.com/signalapp/Signal-Android.git
synced 2026-05-22 16:20:17 +01:00
Add SignalRestClient.
This commit is contained in:
committed by
jeffrey-signal
parent
ec47b83f76
commit
2aa27df95b
+1
-1
@@ -7,9 +7,9 @@ package org.thoughtcrime.securesms.database
|
||||
|
||||
import org.signal.core.util.Base64
|
||||
import org.signal.core.util.Util
|
||||
import org.signal.network.api.AttachmentUploadResult
|
||||
import org.thoughtcrime.securesms.attachments.AttachmentId
|
||||
import org.thoughtcrime.securesms.attachments.Cdn
|
||||
import org.whispersystems.signalservice.api.attachment.AttachmentUploadResult
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentRemoteId
|
||||
import kotlin.random.Random
|
||||
|
||||
|
||||
+3
-4
@@ -10,11 +10,11 @@ import org.thoughtcrime.securesms.recipients.LiveRecipientCache
|
||||
import org.whispersystems.signalservice.api.SignalServiceDataStore
|
||||
import org.whispersystems.signalservice.api.SignalServiceMessageSender
|
||||
import org.whispersystems.signalservice.api.account.AccountApi
|
||||
import org.whispersystems.signalservice.api.attachment.AttachmentApi
|
||||
import org.whispersystems.signalservice.api.donations.DonationsApi
|
||||
import org.whispersystems.signalservice.api.keys.KeysApi
|
||||
import org.whispersystems.signalservice.api.message.MessageApi
|
||||
import org.whispersystems.signalservice.api.websocket.SignalWebSocket
|
||||
import org.whispersystems.signalservice.internal.configuration.SignalServiceConfiguration
|
||||
import org.whispersystems.signalservice.internal.push.PushServiceSocket
|
||||
|
||||
/**
|
||||
@@ -41,7 +41,7 @@ class InstrumentationApplicationDependencyProvider(val application: Application,
|
||||
return recipientCache
|
||||
}
|
||||
|
||||
override fun provideArchiveApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket, pushServiceSocket: PushServiceSocket): ArchiveApi {
|
||||
override fun provideArchiveApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket, pushServiceSocket: PushServiceSocket, signalServiceConfiguration: SignalServiceConfiguration): ArchiveApi {
|
||||
return mockk()
|
||||
}
|
||||
|
||||
@@ -52,12 +52,11 @@ class InstrumentationApplicationDependencyProvider(val application: Application,
|
||||
override fun provideSignalServiceMessageSender(
|
||||
protocolStore: SignalServiceDataStore,
|
||||
pushServiceSocket: PushServiceSocket,
|
||||
attachmentApi: AttachmentApi,
|
||||
messageApi: MessageApi,
|
||||
keysApi: KeysApi
|
||||
): SignalServiceMessageSender {
|
||||
if (signalServiceMessageSender == null) {
|
||||
signalServiceMessageSender = spyk(objToCopy = default.provideSignalServiceMessageSender(protocolStore, pushServiceSocket, attachmentApi, messageApi, keysApi))
|
||||
signalServiceMessageSender = spyk(objToCopy = default.provideSignalServiceMessageSender(protocolStore, pushServiceSocket, messageApi, keysApi))
|
||||
}
|
||||
return signalServiceMessageSender!!
|
||||
}
|
||||
|
||||
+1
-1
@@ -25,6 +25,7 @@ import org.signal.core.util.Util
|
||||
import org.signal.core.util.logging.Log
|
||||
import org.signal.core.util.update
|
||||
import org.signal.core.util.withinTransaction
|
||||
import org.signal.network.api.AttachmentUploadResult
|
||||
import org.thoughtcrime.securesms.attachments.Attachment
|
||||
import org.thoughtcrime.securesms.attachments.DatabaseAttachment
|
||||
import org.thoughtcrime.securesms.database.AttachmentTable
|
||||
@@ -37,7 +38,6 @@ import org.thoughtcrime.securesms.recipients.RecipientId
|
||||
import org.thoughtcrime.securesms.testing.MessageContentFuzzer.DeleteForMeSync
|
||||
import org.thoughtcrime.securesms.testing.SignalActivityRule
|
||||
import org.thoughtcrime.securesms.util.IdentityUtil
|
||||
import org.whispersystems.signalservice.api.attachment.AttachmentUploadResult
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentRemoteId
|
||||
import java.util.UUID
|
||||
|
||||
|
||||
@@ -71,6 +71,7 @@ import org.signal.network.NetworkResult
|
||||
import org.signal.network.StatusCodeErrorAction
|
||||
import org.signal.network.api.SvrBApi
|
||||
import org.signal.network.exceptions.NonSuccessfulResponseCodeException
|
||||
import org.signal.network.rest.toNetworkResult
|
||||
import org.thoughtcrime.securesms.R
|
||||
import org.thoughtcrime.securesms.attachments.AttachmentId
|
||||
import org.thoughtcrime.securesms.attachments.Cdn
|
||||
@@ -1628,19 +1629,6 @@ object BackupRepository {
|
||||
}
|
||||
}
|
||||
|
||||
fun getResumableMessagesBackupUploadSpec(backupFileSize: Long): NetworkResult<ResumableMessagesBackupUploadSpec> {
|
||||
return initBackupAndFetchAuth()
|
||||
.then { credential ->
|
||||
SignalNetwork.archive.getMessageBackupUploadForm(SignalStore.account.requireAci(), credential.messageBackupAccess, backupFileSize)
|
||||
.also { Log.i(TAG, "UploadFormResult: ${it::class.simpleName}") }
|
||||
}
|
||||
.then { form ->
|
||||
SignalNetwork.archive.getBackupResumableUploadUrl(form)
|
||||
.also { Log.i(TAG, "ResumableUploadUrlResult: ${it::class.simpleName}") }
|
||||
.map { ResumableMessagesBackupUploadSpec(attachmentUploadForm = form, resumableUri = it) }
|
||||
}
|
||||
}
|
||||
|
||||
fun getMessageBackupUploadForm(backupFileSize: Long): NetworkResult<AttachmentUploadForm> {
|
||||
return initBackupAndFetchAuth()
|
||||
.then { credential ->
|
||||
|
||||
@@ -66,6 +66,7 @@ import org.signal.core.util.toInt
|
||||
import org.signal.core.util.update
|
||||
import org.signal.core.util.withinTransaction
|
||||
import org.signal.glide.decryptableuri.DecryptableUri
|
||||
import org.signal.network.api.AttachmentUploadResult
|
||||
import org.thoughtcrime.securesms.attachments.ArchivedAttachment
|
||||
import org.thoughtcrime.securesms.attachments.Attachment
|
||||
import org.thoughtcrime.securesms.attachments.AttachmentId
|
||||
@@ -107,7 +108,6 @@ import org.thoughtcrime.securesms.util.ImageCompressionUtil
|
||||
import org.thoughtcrime.securesms.util.MediaUtil
|
||||
import org.thoughtcrime.securesms.util.RemoteConfig
|
||||
import org.thoughtcrime.securesms.video.EncryptedMediaDataSource
|
||||
import org.whispersystems.signalservice.api.attachment.AttachmentUploadResult
|
||||
import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil
|
||||
import org.whispersystems.signalservice.internal.crypto.PaddingInputStream
|
||||
import java.io.ByteArrayInputStream
|
||||
|
||||
@@ -17,6 +17,7 @@ import org.signal.libsignal.zkgroup.profiles.ClientZkProfileOperations
|
||||
import org.signal.libsignal.zkgroup.receipts.ClientZkReceiptOperations
|
||||
import org.signal.mediasend.MediaSendDependencies
|
||||
import org.signal.network.api.ArchiveApi
|
||||
import org.signal.network.api.AttachmentApi
|
||||
import org.signal.network.api.CallingApi
|
||||
import org.signal.network.api.CdsApi
|
||||
import org.signal.network.api.CertificateApi
|
||||
@@ -27,6 +28,7 @@ import org.signal.network.api.RateLimitChallengeApi
|
||||
import org.signal.network.api.RemoteConfigApi
|
||||
import org.signal.network.api.SvrBApi
|
||||
import org.signal.network.api.UsernameApi
|
||||
import org.signal.network.rest.SignalRestClient
|
||||
import org.thoughtcrime.securesms.BuildConfig
|
||||
import org.thoughtcrime.securesms.components.TypingStatusRepository
|
||||
import org.thoughtcrime.securesms.components.TypingStatusSender
|
||||
@@ -63,7 +65,6 @@ import org.whispersystems.signalservice.api.SignalServiceDataStore
|
||||
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver
|
||||
import org.whispersystems.signalservice.api.SignalServiceMessageSender
|
||||
import org.whispersystems.signalservice.api.account.AccountApi
|
||||
import org.whispersystems.signalservice.api.attachment.AttachmentApi
|
||||
import org.whispersystems.signalservice.api.donations.DonationsApi
|
||||
import org.whispersystems.signalservice.api.groupsv2.GroupsV2Operations
|
||||
import org.whispersystems.signalservice.api.keys.KeysApi
|
||||
@@ -348,6 +349,10 @@ object AppDependencies {
|
||||
val pushServiceSocket: PushServiceSocket
|
||||
get() = networkModule.pushServiceSocket
|
||||
|
||||
@JvmStatic
|
||||
val signalRestClient: SignalRestClient
|
||||
get() = networkModule.signalRestClient
|
||||
|
||||
@JvmStatic
|
||||
val registrationApi: RegistrationApi
|
||||
get() = networkModule.registrationApi
|
||||
@@ -433,9 +438,10 @@ object AppDependencies {
|
||||
|
||||
interface Provider {
|
||||
fun providePushServiceSocket(signalServiceConfiguration: SignalServiceConfiguration, groupsV2Operations: GroupsV2Operations): PushServiceSocket
|
||||
fun provideSignalRestClient(signalServiceConfiguration: SignalServiceConfiguration): SignalRestClient
|
||||
fun provideGroupsV2Operations(signalServiceConfiguration: SignalServiceConfiguration): GroupsV2Operations
|
||||
fun provideSignalServiceAccountManager(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, accountApi: AccountApi, pushServiceSocket: PushServiceSocket, groupsV2Operations: GroupsV2Operations): SignalServiceAccountManager
|
||||
fun provideSignalServiceMessageSender(protocolStore: SignalServiceDataStore, pushServiceSocket: PushServiceSocket, attachmentApi: AttachmentApi, messageApi: MessageApi, keysApi: KeysApi): SignalServiceMessageSender
|
||||
fun provideSignalServiceMessageSender(protocolStore: SignalServiceDataStore, pushServiceSocket: PushServiceSocket, messageApi: MessageApi, keysApi: KeysApi): SignalServiceMessageSender
|
||||
fun provideSignalServiceMessageReceiver(pushServiceSocket: PushServiceSocket): SignalServiceMessageReceiver
|
||||
fun provideSignalServiceNetworkAccess(): SignalServiceNetworkAccess
|
||||
fun provideRecipientCache(): LiveRecipientCache
|
||||
@@ -471,7 +477,7 @@ object AppDependencies {
|
||||
fun providePinnedMessageManager(): PinnedMessageManager
|
||||
fun provideLibsignalNetwork(config: SignalServiceConfiguration): Network
|
||||
fun provideBillingApi(): BillingApi
|
||||
fun provideArchiveApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket, pushServiceSocket: PushServiceSocket): ArchiveApi
|
||||
fun provideArchiveApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket, pushServiceSocket: PushServiceSocket, signalServiceConfiguration: SignalServiceConfiguration): ArchiveApi
|
||||
fun provideKeysApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket): KeysApi
|
||||
fun provideAttachmentApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, pushServiceSocket: PushServiceSocket): AttachmentApi
|
||||
fun provideLinkDeviceApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket): LinkDeviceApi
|
||||
|
||||
+18
-5
@@ -18,9 +18,12 @@ import org.signal.core.util.concurrent.DeadlockDetector;
|
||||
import org.signal.core.util.concurrent.SignalExecutors;
|
||||
import org.signal.libsignal.net.Network;
|
||||
import org.signal.libsignal.protocol.SignalProtocolAddress;
|
||||
import org.signal.libsignal.zkgroup.GenericServerPublicParams;
|
||||
import org.signal.libsignal.zkgroup.InvalidInputException;
|
||||
import org.signal.libsignal.zkgroup.profiles.ClientZkProfileOperations;
|
||||
import org.signal.libsignal.zkgroup.receipts.ClientZkReceiptOperations;
|
||||
import org.signal.network.api.ArchiveApi;
|
||||
import org.signal.network.rest.SignalRestClient;
|
||||
import org.signal.network.api.CallingApi;
|
||||
import org.signal.network.api.CdsApi;
|
||||
import org.signal.network.api.CertificateApi;
|
||||
@@ -104,7 +107,7 @@ import org.whispersystems.signalservice.api.SignalServiceDataStore;
|
||||
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
|
||||
import org.whispersystems.signalservice.api.SignalServiceMessageSender;
|
||||
import org.whispersystems.signalservice.api.account.AccountApi;
|
||||
import org.whispersystems.signalservice.api.attachment.AttachmentApi;
|
||||
import org.signal.network.api.AttachmentApi;
|
||||
import org.whispersystems.signalservice.api.donations.DonationsApi;
|
||||
import org.whispersystems.signalservice.api.groupsv2.ClientZkOperations;
|
||||
import org.whispersystems.signalservice.api.groupsv2.GroupsV2Operations;
|
||||
@@ -154,6 +157,14 @@ public class ApplicationDependencyProvider implements AppDependencies.Provider {
|
||||
RemoteConfig.okHttpAutomaticRetry());
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NonNull SignalRestClient provideSignalRestClient(@NonNull SignalServiceConfiguration signalServiceConfiguration) {
|
||||
return new SignalRestClient(signalServiceConfiguration,
|
||||
BuildConfig.SIGNAL_AGENT,
|
||||
new DynamicCredentialsProvider(),
|
||||
RemoteConfig.okHttpAutomaticRetry());
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NonNull GroupsV2Operations provideGroupsV2Operations(@NonNull SignalServiceConfiguration signalServiceConfiguration) {
|
||||
return new GroupsV2Operations(provideClientZkOperations(signalServiceConfiguration), RemoteConfig.groupLimits().getHardLimit());
|
||||
@@ -167,13 +178,11 @@ public class ApplicationDependencyProvider implements AppDependencies.Provider {
|
||||
@Override
|
||||
public @NonNull SignalServiceMessageSender provideSignalServiceMessageSender(@NonNull SignalServiceDataStore protocolStore,
|
||||
@NonNull PushServiceSocket pushServiceSocket,
|
||||
@NonNull AttachmentApi attachmentApi,
|
||||
@NonNull MessageApi messageApi,
|
||||
@NonNull KeysApi keysApi) {
|
||||
return new SignalServiceMessageSender(pushServiceSocket,
|
||||
protocolStore,
|
||||
ReentrantSessionLock.INSTANCE,
|
||||
attachmentApi,
|
||||
messageApi,
|
||||
keysApi,
|
||||
Optional.of(new SecurityEventListener(context)),
|
||||
@@ -502,8 +511,12 @@ public class ApplicationDependencyProvider implements AppDependencies.Provider {
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NonNull ArchiveApi provideArchiveApi(@NonNull SignalWebSocket.AuthenticatedWebSocket authWebSocket, @NonNull SignalWebSocket.UnauthenticatedWebSocket unauthWebSocket, @NonNull PushServiceSocket pushServiceSocket) {
|
||||
return new ArchiveApi(authWebSocket, unauthWebSocket, pushServiceSocket);
|
||||
public @NonNull ArchiveApi provideArchiveApi(@NonNull SignalWebSocket.AuthenticatedWebSocket authWebSocket, @NonNull SignalWebSocket.UnauthenticatedWebSocket unauthWebSocket, @NonNull PushServiceSocket pushServiceSocket, @NonNull SignalServiceConfiguration signalServiceConfiguration) {
|
||||
try {
|
||||
return new ArchiveApi(authWebSocket, unauthWebSocket, pushServiceSocket, new GenericServerPublicParams(signalServiceConfiguration.getBackupServerPublicParams()));
|
||||
} catch (InvalidInputException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
+8
-3
@@ -17,6 +17,7 @@ import org.signal.core.util.resettableLazy
|
||||
import org.signal.libsignal.net.Network
|
||||
import org.signal.libsignal.zkgroup.receipts.ClientZkReceiptOperations
|
||||
import org.signal.network.api.ArchiveApi
|
||||
import org.signal.network.api.AttachmentApi
|
||||
import org.signal.network.api.CallingApi
|
||||
import org.signal.network.api.CdsApi
|
||||
import org.signal.network.api.CertificateApi
|
||||
@@ -27,6 +28,7 @@ import org.signal.network.api.RateLimitChallengeApi
|
||||
import org.signal.network.api.RemoteConfigApi
|
||||
import org.signal.network.api.SvrBApi
|
||||
import org.signal.network.api.UsernameApi
|
||||
import org.signal.network.rest.SignalRestClient
|
||||
import org.thoughtcrime.securesms.crypto.storage.SignalServiceDataStoreImpl
|
||||
import org.thoughtcrime.securesms.groups.GroupsV2Authorization
|
||||
import org.thoughtcrime.securesms.groups.GroupsV2AuthorizationMemoryValueCache
|
||||
@@ -40,7 +42,6 @@ import org.whispersystems.signalservice.api.SignalServiceAccountManager
|
||||
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver
|
||||
import org.whispersystems.signalservice.api.SignalServiceMessageSender
|
||||
import org.whispersystems.signalservice.api.account.AccountApi
|
||||
import org.whispersystems.signalservice.api.attachment.AttachmentApi
|
||||
import org.whispersystems.signalservice.api.donations.DonationsApi
|
||||
import org.whispersystems.signalservice.api.groupsv2.GroupsV2Operations
|
||||
import org.whispersystems.signalservice.api.keys.KeysApi
|
||||
@@ -90,7 +91,7 @@ class NetworkDependenciesModule(
|
||||
val protocolStore: SignalServiceDataStoreImpl by _protocolStore
|
||||
|
||||
private val _signalServiceMessageSender = resettableLazy {
|
||||
provider.provideSignalServiceMessageSender(protocolStore, pushServiceSocket, attachmentApi, messageApi, keysApi)
|
||||
provider.provideSignalServiceMessageSender(protocolStore, pushServiceSocket, messageApi, keysApi)
|
||||
}
|
||||
val signalServiceMessageSender: SignalServiceMessageSender by _signalServiceMessageSender
|
||||
|
||||
@@ -102,6 +103,10 @@ class NetworkDependenciesModule(
|
||||
provider.providePushServiceSocket(signalServiceNetworkAccess.getConfiguration(), groupsV2Operations)
|
||||
}
|
||||
|
||||
val signalRestClient: SignalRestClient by lazy {
|
||||
provider.provideSignalRestClient(signalServiceNetworkAccess.getConfiguration())
|
||||
}
|
||||
|
||||
val signalServiceAccountManager: SignalServiceAccountManager by lazy {
|
||||
provider.provideSignalServiceAccountManager(authWebSocket, accountApi, pushServiceSocket, groupsV2Operations)
|
||||
}
|
||||
@@ -150,7 +155,7 @@ class NetworkDependenciesModule(
|
||||
}
|
||||
|
||||
val archiveApi: ArchiveApi by lazy {
|
||||
provider.provideArchiveApi(authWebSocket, unauthWebSocket, pushServiceSocket)
|
||||
provider.provideArchiveApi(authWebSocket, unauthWebSocket, pushServiceSocket, signalServiceNetworkAccess.getConfiguration())
|
||||
}
|
||||
|
||||
val keysApi: KeysApi by lazy {
|
||||
|
||||
@@ -9,6 +9,7 @@ import org.signal.core.util.Util
|
||||
import org.signal.core.util.logging.Log
|
||||
import org.signal.glide.decryptableuri.DecryptableUri
|
||||
import org.signal.network.NetworkResult
|
||||
import org.signal.network.api.AttachmentUploadResult
|
||||
import org.thoughtcrime.securesms.attachments.AttachmentId
|
||||
import org.thoughtcrime.securesms.attachments.AttachmentUploadUtil
|
||||
import org.thoughtcrime.securesms.attachments.DatabaseAttachment
|
||||
@@ -30,7 +31,6 @@ import org.thoughtcrime.securesms.net.SignalNetwork
|
||||
import org.thoughtcrime.securesms.util.ImageCompressionUtil
|
||||
import org.thoughtcrime.securesms.util.MediaUtil
|
||||
import org.thoughtcrime.securesms.util.RemoteConfig
|
||||
import org.whispersystems.signalservice.api.attachment.AttachmentUploadResult
|
||||
import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentStream
|
||||
|
||||
@@ -16,6 +16,7 @@ import org.signal.core.util.readLength
|
||||
import org.signal.libsignal.net.RequestResult
|
||||
import org.signal.libsignal.net.RetryLaterException
|
||||
import org.signal.libsignal.net.UploadTooLargeException
|
||||
import org.signal.network.api.AttachmentUploadResult
|
||||
import org.signal.protos.resumableuploads.ResumableUpload
|
||||
import org.thoughtcrime.securesms.R
|
||||
import org.thoughtcrime.securesms.attachments.Attachment
|
||||
@@ -41,7 +42,6 @@ import org.thoughtcrime.securesms.transport.UndeliverableMessageException
|
||||
import org.thoughtcrime.securesms.util.MediaUtil
|
||||
import org.thoughtcrime.securesms.util.MessageUtil
|
||||
import org.thoughtcrime.securesms.util.RemoteConfig
|
||||
import org.whispersystems.signalservice.api.attachment.AttachmentUploadResult
|
||||
import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil
|
||||
import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment
|
||||
|
||||
@@ -19,6 +19,7 @@ import org.thoughtcrime.securesms.crypto.ProfileKeyUtil;
|
||||
import org.thoughtcrime.securesms.database.RecipientTable;
|
||||
import org.thoughtcrime.securesms.database.SignalDatabase;
|
||||
import org.thoughtcrime.securesms.database.model.IdentityRecord;
|
||||
import org.signal.network.service.CdnService;
|
||||
import org.thoughtcrime.securesms.dependencies.AppDependencies;
|
||||
import org.thoughtcrime.securesms.jobmanager.Job;
|
||||
import org.thoughtcrime.securesms.jobmanager.JsonJobData;
|
||||
@@ -286,11 +287,12 @@ public class MultiDeviceContactUpdateJob extends BaseJob {
|
||||
{
|
||||
if (length > 0) {
|
||||
try {
|
||||
CdnService cdnService = new CdnService(AppDependencies.getSignalRestClient(), AppDependencies.getAttachmentApi());
|
||||
SignalServiceAttachmentStream.Builder attachmentStream = SignalServiceAttachment.newStreamBuilder()
|
||||
.withStream(stream)
|
||||
.withContentType("application/octet-stream")
|
||||
.withLength(length)
|
||||
.withResumableUploadSpec(messageSender.getResumableUploadSpec(AttachmentCipherStreamUtil.getCiphertextLength(PaddingInputStream.getPaddedSize(length))));
|
||||
.withResumableUploadSpec(cdnService.getResumableUploadSpecBlocking(AttachmentCipherStreamUtil.getCiphertextLength(PaddingInputStream.getPaddedSize(length))));
|
||||
|
||||
messageSender.sendSyncMessage(SignalServiceSyncMessage.forContacts(new ContactsMessage(attachmentStream.build(), complete))
|
||||
);
|
||||
|
||||
+3
-1
@@ -6,6 +6,7 @@ import androidx.annotation.Nullable;
|
||||
|
||||
import org.signal.core.util.logging.Log;
|
||||
import org.signal.libsignal.zkgroup.profiles.ProfileKey;
|
||||
import org.signal.network.service.CdnService;
|
||||
import org.thoughtcrime.securesms.BuildConfig;
|
||||
import org.thoughtcrime.securesms.crypto.ProfileKeyUtil;
|
||||
import org.thoughtcrime.securesms.dependencies.AppDependencies;
|
||||
@@ -93,7 +94,8 @@ public class MultiDeviceProfileKeyUpdateJob extends BaseJob {
|
||||
SignalServiceMessageSender messageSender = AppDependencies.getSignalServiceMessageSender();
|
||||
long dataLength = baos.toByteArray().length;
|
||||
long ciphertextLength = AttachmentCipherStreamUtil.getCiphertextLength(PaddingInputStream.getPaddedSize(dataLength));
|
||||
ResumableUploadSpec uploadSpec = messageSender.getResumableUploadSpec(ciphertextLength);
|
||||
CdnService cdnService = new CdnService(AppDependencies.getSignalRestClient(), AppDependencies.getAttachmentApi());
|
||||
ResumableUploadSpec uploadSpec = cdnService.getResumableUploadSpecBlocking(ciphertextLength);
|
||||
SignalServiceAttachmentStream attachmentStream = SignalServiceAttachment.newStreamBuilder()
|
||||
.withStream(new ByteArrayInputStream(baos.toByteArray()))
|
||||
.withContentType("application/octet-stream")
|
||||
|
||||
@@ -16,6 +16,7 @@ import org.signal.core.util.Util
|
||||
import org.signal.core.util.logging.Log
|
||||
import org.signal.libsignal.zkgroup.InvalidInputException
|
||||
import org.signal.libsignal.zkgroup.receipts.ReceiptCredentialPresentation
|
||||
import org.signal.network.service.CdnService
|
||||
import org.thoughtcrime.securesms.BuildConfig
|
||||
import org.thoughtcrime.securesms.TextSecureExpiredException
|
||||
import org.thoughtcrime.securesms.attachments.Attachment
|
||||
@@ -233,7 +234,7 @@ abstract class PushSendJob protected constructor(parameters: Parameters) : BaseJ
|
||||
|
||||
val inputStream = PartAuthority.getAttachmentStream(context, attachment.uri!!)
|
||||
val ciphertextLength = getCiphertextLength(PaddingInputStream.getPaddedSize(attachment.size))
|
||||
val uploadSpec = AppDependencies.signalServiceMessageSender.getResumableUploadSpec(ciphertextLength)
|
||||
val uploadSpec = CdnService(AppDependencies.signalRestClient, AppDependencies.attachmentApi).getResumableUploadSpecBlocking(ciphertextLength)
|
||||
|
||||
return SignalServiceAttachment.newStreamBuilder()
|
||||
.withStream(inputStream)
|
||||
|
||||
@@ -14,6 +14,7 @@ import org.signal.core.util.isNotNullOrBlank
|
||||
import org.signal.core.util.logging.Log
|
||||
import org.signal.core.util.readLength
|
||||
import org.signal.network.NetworkResult
|
||||
import org.signal.network.api.AttachmentUploadResult
|
||||
import org.signal.protos.resumableuploads.ResumableUpload
|
||||
import org.thoughtcrime.securesms.R
|
||||
import org.thoughtcrime.securesms.attachments.AttachmentId
|
||||
@@ -35,7 +36,6 @@ import org.thoughtcrime.securesms.service.AttachmentProgressService
|
||||
import org.thoughtcrime.securesms.util.MediaUtil
|
||||
import org.thoughtcrime.securesms.util.RemoteConfig
|
||||
import org.whispersystems.signalservice.api.archive.ArchiveMediaUploadFormStatusCodes
|
||||
import org.whispersystems.signalservice.api.attachment.AttachmentUploadResult
|
||||
import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil
|
||||
import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
package org.thoughtcrime.securesms.net
|
||||
|
||||
import org.signal.network.api.ArchiveApi
|
||||
import org.signal.network.api.AttachmentApi
|
||||
import org.signal.network.api.CallingApi
|
||||
import org.signal.network.api.CdsApi
|
||||
import org.signal.network.api.CertificateApi
|
||||
@@ -19,7 +20,6 @@ import org.signal.network.api.UsernameApi
|
||||
import org.thoughtcrime.securesms.dependencies.AppDependencies
|
||||
import org.thoughtcrime.securesms.dependencies.KeyTransparencyApi
|
||||
import org.whispersystems.signalservice.api.account.AccountApi
|
||||
import org.whispersystems.signalservice.api.attachment.AttachmentApi
|
||||
import org.whispersystems.signalservice.api.keys.KeysApi
|
||||
import org.whispersystems.signalservice.api.message.MessageApi
|
||||
import org.whispersystems.signalservice.api.profiles.ProfileApi
|
||||
|
||||
+7
-3
@@ -7,6 +7,7 @@ import org.signal.libsignal.net.Network
|
||||
import org.signal.libsignal.zkgroup.profiles.ClientZkProfileOperations
|
||||
import org.signal.libsignal.zkgroup.receipts.ClientZkReceiptOperations
|
||||
import org.signal.network.api.ArchiveApi
|
||||
import org.signal.network.api.AttachmentApi
|
||||
import org.signal.network.api.CallingApi
|
||||
import org.signal.network.api.CdsApi
|
||||
import org.signal.network.api.CertificateApi
|
||||
@@ -17,6 +18,7 @@ import org.signal.network.api.RateLimitChallengeApi
|
||||
import org.signal.network.api.RemoteConfigApi
|
||||
import org.signal.network.api.SvrBApi
|
||||
import org.signal.network.api.UsernameApi
|
||||
import org.signal.network.rest.SignalRestClient
|
||||
import org.thoughtcrime.securesms.components.TypingStatusRepository
|
||||
import org.thoughtcrime.securesms.components.TypingStatusSender
|
||||
import org.thoughtcrime.securesms.crypto.storage.SignalServiceDataStoreImpl
|
||||
@@ -50,7 +52,6 @@ import org.whispersystems.signalservice.api.SignalServiceDataStore
|
||||
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver
|
||||
import org.whispersystems.signalservice.api.SignalServiceMessageSender
|
||||
import org.whispersystems.signalservice.api.account.AccountApi
|
||||
import org.whispersystems.signalservice.api.attachment.AttachmentApi
|
||||
import org.whispersystems.signalservice.api.donations.DonationsApi
|
||||
import org.whispersystems.signalservice.api.groupsv2.GroupsV2Operations
|
||||
import org.whispersystems.signalservice.api.keys.KeysApi
|
||||
@@ -70,6 +71,10 @@ class MockApplicationDependencyProvider : AppDependencies.Provider {
|
||||
return mockk(relaxed = true)
|
||||
}
|
||||
|
||||
override fun provideSignalRestClient(signalServiceConfiguration: SignalServiceConfiguration): SignalRestClient {
|
||||
return mockk(relaxed = true)
|
||||
}
|
||||
|
||||
override fun provideGroupsV2Operations(signalServiceConfiguration: SignalServiceConfiguration): GroupsV2Operations {
|
||||
return mockk(relaxed = true)
|
||||
}
|
||||
@@ -81,7 +86,6 @@ class MockApplicationDependencyProvider : AppDependencies.Provider {
|
||||
override fun provideSignalServiceMessageSender(
|
||||
protocolStore: SignalServiceDataStore,
|
||||
pushServiceSocket: PushServiceSocket,
|
||||
attachmentApi: AttachmentApi,
|
||||
messageApi: MessageApi,
|
||||
keysApi: KeysApi
|
||||
): SignalServiceMessageSender {
|
||||
@@ -232,7 +236,7 @@ class MockApplicationDependencyProvider : AppDependencies.Provider {
|
||||
return mockk(relaxed = true)
|
||||
}
|
||||
|
||||
override fun provideArchiveApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket, pushServiceSocket: PushServiceSocket): ArchiveApi {
|
||||
override fun provideArchiveApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket, pushServiceSocket: PushServiceSocket, signalServiceConfiguration: SignalServiceConfiguration): ArchiveApi {
|
||||
return mockk(relaxed = true)
|
||||
}
|
||||
|
||||
|
||||
-24
@@ -19,7 +19,6 @@ import org.signal.libsignal.net.MultiRecipientSendAuthorization;
|
||||
import org.signal.libsignal.net.MultiRecipientSendFailure;
|
||||
import org.signal.libsignal.net.RequestResult;
|
||||
import org.signal.libsignal.net.RequestUnauthorizedException;
|
||||
import org.signal.libsignal.net.UploadTooLargeException;
|
||||
import org.signal.libsignal.net.RetryLaterException;
|
||||
import org.signal.libsignal.protocol.IdentityKey;
|
||||
import org.signal.libsignal.protocol.IdentityKeyPair;
|
||||
@@ -36,7 +35,6 @@ import org.signal.libsignal.protocol.message.SenderKeyDistributionMessage;
|
||||
import org.signal.libsignal.protocol.state.PreKeyBundle;
|
||||
import org.signal.libsignal.protocol.state.SessionRecord;
|
||||
import org.signal.libsignal.zkgroup.groupsend.GroupSendFullToken;
|
||||
import org.whispersystems.signalservice.api.attachment.AttachmentApi;
|
||||
import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil;
|
||||
import org.whispersystems.signalservice.api.crypto.ContentHint;
|
||||
import org.whispersystems.signalservice.api.crypto.EnvelopeContent;
|
||||
@@ -103,7 +101,6 @@ import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableExcept
|
||||
import org.whispersystems.signalservice.internal.crypto.AttachmentDigest;
|
||||
import org.whispersystems.signalservice.internal.crypto.PaddingInputStream;
|
||||
import org.whispersystems.signalservice.internal.push.AttachmentPointer;
|
||||
import org.whispersystems.signalservice.internal.push.AttachmentUploadForm;
|
||||
import org.whispersystems.signalservice.internal.push.BodyRange;
|
||||
import org.whispersystems.signalservice.internal.push.CallMessage;
|
||||
import org.whispersystems.signalservice.internal.push.Content;
|
||||
@@ -186,7 +183,6 @@ public class SignalServiceMessageSender {
|
||||
private final Optional<EventListener> eventListener;
|
||||
private final IdentityKeyPair localPniIdentity;
|
||||
|
||||
private final AttachmentApi attachmentApi;
|
||||
private final MessageApi messageApi;
|
||||
private final KeysApi keysApi;
|
||||
private final PreKeyRepository preKeyRepository;
|
||||
@@ -201,7 +197,6 @@ public class SignalServiceMessageSender {
|
||||
public SignalServiceMessageSender(PushServiceSocket pushServiceSocket,
|
||||
SignalServiceDataStore store,
|
||||
SignalSessionLock sessionLock,
|
||||
AttachmentApi attachmentApi,
|
||||
MessageApi messageApi,
|
||||
KeysApi keysApi,
|
||||
Optional<EventListener> eventListener,
|
||||
@@ -222,7 +217,6 @@ public class SignalServiceMessageSender {
|
||||
this.localDeviceId = credentialsProvider.getDeviceId();
|
||||
this.localProtocolAddress = new SignalProtocolAddress(localAddress.getIdentifier(), localDeviceId);
|
||||
this.localPni = credentialsProvider.getPni();
|
||||
this.attachmentApi = attachmentApi;
|
||||
this.messageApi = messageApi;
|
||||
this.eventListener = eventListener;
|
||||
this.maxEnvelopeSize = maxEnvelopeSize;
|
||||
@@ -841,24 +835,6 @@ public class SignalServiceMessageSender {
|
||||
return uploadAttachmentV4(attachment, attachmentKey, attachmentData);
|
||||
}
|
||||
|
||||
public ResumableUploadSpec getResumableUploadSpec(long uploadSizeBytes) throws IOException {
|
||||
Log.d(TAG, "Using pipe to retrieve attachment upload attributes...");
|
||||
RequestResult<AttachmentUploadForm, UploadTooLargeException> result = attachmentApi.getAttachmentV4UploadForm(uploadSizeBytes);
|
||||
|
||||
if (result instanceof RequestResult.Success) {
|
||||
AttachmentUploadForm v4UploadAttributes = ((RequestResult.Success<AttachmentUploadForm>) result).getResult();
|
||||
return socket.getResumableUploadSpec(v4UploadAttributes);
|
||||
} else if (result instanceof RequestResult.NonSuccess) {
|
||||
throw ((RequestResult.NonSuccess<UploadTooLargeException>) result).getError();
|
||||
} else if (result instanceof RequestResult.RetryableNetworkError) {
|
||||
throw new PushNetworkException(((RequestResult.RetryableNetworkError) result).getNetworkError());
|
||||
} else if (result instanceof RequestResult.ApplicationError) {
|
||||
throw new RuntimeException(((RequestResult.ApplicationError) result).getCause());
|
||||
} else {
|
||||
throw new IOException("Unexpected RequestResult type: " + result.getClass().getSimpleName());
|
||||
}
|
||||
}
|
||||
|
||||
private SignalServiceAttachmentPointer uploadAttachmentV4(SignalServiceAttachmentStream attachment, byte[] attachmentKey, PushAttachmentData attachmentData) throws IOException {
|
||||
AttachmentDigest digest = socket.uploadAttachment(attachmentData);
|
||||
return new SignalServiceAttachmentPointer(attachmentData.getResumableUploadSpec().getCdnNumber(),
|
||||
|
||||
@@ -59,4 +59,5 @@ dependencies {
|
||||
testImplementation(testLibs.junit.junit)
|
||||
testImplementation(testLibs.assertk)
|
||||
testImplementation(testLibs.mockk)
|
||||
testImplementation(testLibs.kotlinx.coroutines.test)
|
||||
}
|
||||
|
||||
@@ -57,15 +57,14 @@ import kotlin.time.Duration.Companion.seconds
|
||||
class ArchiveApi(
|
||||
private val authWebSocket: SignalWebSocket.AuthenticatedWebSocket,
|
||||
private val unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket,
|
||||
private val pushServiceSocket: PushServiceSocket
|
||||
private val pushServiceSocket: PushServiceSocket,
|
||||
private val backupServerPublicParams: GenericServerPublicParams
|
||||
) {
|
||||
|
||||
companion object {
|
||||
private val TAG = Log.tag(ArchiveApi::class)
|
||||
}
|
||||
|
||||
private val backupServerPublicParams: GenericServerPublicParams = GenericServerPublicParams(pushServiceSocket.configuration.backupServerPublicParams)
|
||||
|
||||
/**
|
||||
* Retrieves a set of credentials one can use to authorize other requests.
|
||||
*
|
||||
@@ -246,15 +245,6 @@ class ArchiveApi(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves a resumable upload URL you can use to upload your main message backup file or an arbitrary media file to cloud storage.
|
||||
*/
|
||||
fun getBackupResumableUploadUrl(uploadForm: AttachmentUploadForm): NetworkResult<String> {
|
||||
return NetworkResult.fromFetch {
|
||||
pushServiceSocket.getResumableUploadUrl(uploadForm)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Uploads a pre-encrypted backup file, automatically choosing the best upload strategy based on CDN version.
|
||||
* For CDN3, uses TUS "Creation With Upload" (single POST). For other CDNs, falls back to the legacy
|
||||
@@ -294,7 +284,7 @@ class ArchiveApi(
|
||||
/**
|
||||
* Retrieves an [AttachmentUploadForm] that can be used to upload pre-existing media to the archive.
|
||||
*
|
||||
* This is basically the same as [org.whispersystems.signalservice.api.attachment.AttachmentApi.getAttachmentV4UploadForm], but with a relaxed rate limit
|
||||
* This is basically the same as [org.signal.network.api.AttachmentApi.getAttachmentV4UploadForm], but with a relaxed rate limit
|
||||
* so we can request them more often (which is required for backfilling).
|
||||
*
|
||||
* After uploading, the media still needs to be copied via [copyAttachmentToArchive].
|
||||
|
||||
+1
-1
@@ -3,7 +3,7 @@
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.signalservice.api.attachment
|
||||
package org.signal.network.api
|
||||
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.signal.core.util.logging.Log
|
||||
+1
-1
@@ -3,7 +3,7 @@
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.signalservice.api.attachment
|
||||
package org.signal.network.api
|
||||
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentRemoteId
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
/*
|
||||
* Copyright 2026 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.signal.network.rest
|
||||
|
||||
/**
|
||||
* Success outcome of [SignalRestClient.download]. The actual payload was streamed into the
|
||||
* caller-supplied destination; this carries metadata about the response.
|
||||
*/
|
||||
data class DownloadResult(
|
||||
val statusCode: Int,
|
||||
val headers: Map<String, String>,
|
||||
val totalBytes: Long
|
||||
)
|
||||
@@ -0,0 +1,16 @@
|
||||
/*
|
||||
* Copyright 2026 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.signal.network.rest
|
||||
|
||||
import org.signal.libsignal.net.BadRequestError
|
||||
|
||||
/**
|
||||
* Maps a non-2xx HTTP response into a typed [BadRequestError]. Pass an instance to
|
||||
* [SignalRestClient.request] when a request has known business-logic errors.
|
||||
*/
|
||||
fun interface ErrorMapper<out E : BadRequestError> {
|
||||
fun map(statusCode: Int, headers: Map<String, String>, body: ByteArray?): E
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
/*
|
||||
* Copyright 2026 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.signal.network.rest
|
||||
|
||||
import org.signal.libsignal.net.RequestResult
|
||||
import org.signal.network.NetworkResult
|
||||
import org.signal.network.exceptions.NonSuccessfulResponseCodeException
|
||||
import java.io.IOException
|
||||
|
||||
/**
|
||||
* Bridge a [RequestResult] (returned by [SignalRestClient]) into the older [NetworkResult] surface
|
||||
* still used by most API classes. Use this during the migration off `PushServiceSocket`.
|
||||
*/
|
||||
fun <T> RequestResult<T, RestStatusCodeError>.toNetworkResult(): NetworkResult<T> {
|
||||
return when (this) {
|
||||
is RequestResult.Success -> NetworkResult.Success(result)
|
||||
is RequestResult.NonSuccess -> NetworkResult.StatusCodeError(error.toNonSuccessfulResponseCodeException())
|
||||
is RequestResult.RetryableNetworkError -> NetworkResult.NetworkError(networkError)
|
||||
is RequestResult.ApplicationError -> NetworkResult.ApplicationError(cause)
|
||||
}
|
||||
}
|
||||
|
||||
/** Build a [NonSuccessfulResponseCodeException] from a [RestStatusCodeError]. */
|
||||
fun RestStatusCodeError.toNonSuccessfulResponseCodeException(): NonSuccessfulResponseCodeException {
|
||||
return NonSuccessfulResponseCodeException(
|
||||
statusCode,
|
||||
"Bad response: $statusCode",
|
||||
body,
|
||||
headers
|
||||
)
|
||||
}
|
||||
|
||||
/** Convert a [RequestResult] failure into an [IOException]-style throw, matching `NetworkResult.successOrThrow()` shape. */
|
||||
@Throws(IOException::class)
|
||||
fun <T> RequestResult<T, RestStatusCodeError>.successOrThrow(): T {
|
||||
return when (this) {
|
||||
is RequestResult.Success -> result
|
||||
is RequestResult.NonSuccess -> throw error.toNonSuccessfulResponseCodeException()
|
||||
is RequestResult.RetryableNetworkError -> throw networkError
|
||||
is RequestResult.ApplicationError -> when (val t = cause) {
|
||||
is IOException -> throw t
|
||||
is RuntimeException -> throw t
|
||||
else -> throw RuntimeException(t)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Copyright 2026 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.signal.network.rest
|
||||
|
||||
import okhttp3.RequestBody
|
||||
|
||||
/**
|
||||
* Description of a single REST request to be executed by [SignalRestClient].
|
||||
*/
|
||||
data class RequestSpec(
|
||||
val method: Method,
|
||||
val host: Host,
|
||||
val path: String,
|
||||
val body: RequestBody? = null,
|
||||
val headers: Map<String, String> = emptyMap(),
|
||||
val auth: Auth = Auth.None
|
||||
) {
|
||||
enum class Method(val value: String) {
|
||||
GET("GET"),
|
||||
PUT("PUT"),
|
||||
POST("POST"),
|
||||
PATCH("PATCH"),
|
||||
DELETE("DELETE"),
|
||||
HEAD("HEAD")
|
||||
}
|
||||
|
||||
/** Which set of Signal URLs the request should be routed through. */
|
||||
sealed interface Host {
|
||||
/** The standard Signal service URLs. */
|
||||
data object Service : Host
|
||||
|
||||
/** A specific Signal CDN. [number] selects the entry in [SignalServiceConfiguration.signalCdnUrlMap]. */
|
||||
data class Cdn(val number: Int) : Host
|
||||
|
||||
/** The Signal storage service URLs. */
|
||||
data object Storage : Host
|
||||
}
|
||||
|
||||
/** How (or whether) to attach authentication to the outgoing request. */
|
||||
sealed interface Auth {
|
||||
/** No auth header. */
|
||||
data object None : Auth
|
||||
|
||||
/** Basic auth derived from the CredentialsProvider passed to the [SignalRestClient] constructor. */
|
||||
data object Standard : Auth
|
||||
|
||||
/** A caller-supplied header. */
|
||||
data class Header(val name: String, val value: String) : Auth
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
/*
|
||||
* Copyright 2026 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.signal.network.rest
|
||||
|
||||
/**
|
||||
* The result of a successful (2xx) [SignalRestClient] request when no response class was provided.
|
||||
* Holds the raw status, headers, and body bytes for the caller to consume.
|
||||
*/
|
||||
data class RestResponse(
|
||||
val statusCode: Int,
|
||||
val headers: Map<String, String>,
|
||||
val body: ByteArray
|
||||
) {
|
||||
override fun equals(other: Any?): Boolean {
|
||||
if (this === other) return true
|
||||
if (other !is RestResponse) return false
|
||||
return statusCode == other.statusCode && headers == other.headers && body.contentEquals(other.body)
|
||||
}
|
||||
|
||||
override fun hashCode(): Int {
|
||||
var result = statusCode
|
||||
result = 31 * result + headers.hashCode()
|
||||
result = 31 * result + body.contentHashCode()
|
||||
return result
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,33 @@
|
||||
/*
|
||||
* Copyright 2026 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.signal.network.rest
|
||||
|
||||
import org.signal.libsignal.net.BadRequestError
|
||||
|
||||
/**
|
||||
* Default error returned when a request produces a non-2xx response and the caller didn't supply
|
||||
* a custom [ErrorMapper]. Carries the raw response info so callers can inspect after the fact.
|
||||
*/
|
||||
data class RestStatusCodeError(
|
||||
val statusCode: Int,
|
||||
val headers: Map<String, String>,
|
||||
val body: ByteArray?
|
||||
) : BadRequestError {
|
||||
override fun equals(other: Any?): Boolean {
|
||||
if (this === other) return true
|
||||
if (other !is RestStatusCodeError) return false
|
||||
return statusCode == other.statusCode &&
|
||||
headers == other.headers &&
|
||||
(body?.contentEquals(other.body) ?: (other.body == null))
|
||||
}
|
||||
|
||||
override fun hashCode(): Int {
|
||||
var result = statusCode
|
||||
result = 31 * result + headers.hashCode()
|
||||
result = 31 * result + (body?.contentHashCode() ?: 0)
|
||||
return result
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,549 @@
|
||||
/*
|
||||
* Copyright 2026 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.signal.network.rest
|
||||
|
||||
import kotlinx.coroutines.suspendCancellableCoroutine
|
||||
import okhttp3.Call
|
||||
import okhttp3.Callback
|
||||
import okhttp3.ConnectionPool
|
||||
import okhttp3.ConnectionSpec
|
||||
import okhttp3.Credentials
|
||||
import okhttp3.Dns
|
||||
import okhttp3.HttpUrl
|
||||
import okhttp3.HttpUrl.Companion.toHttpUrl
|
||||
import okhttp3.Interceptor
|
||||
import okhttp3.OkHttpClient
|
||||
import okhttp3.Request
|
||||
import okhttp3.RequestBody
|
||||
import okhttp3.RequestBody.Companion.toRequestBody
|
||||
import okhttp3.Response
|
||||
import okio.Buffer
|
||||
import okio.BufferedSink
|
||||
import okio.ByteString
|
||||
import okio.ForwardingSink
|
||||
import okio.buffer
|
||||
import org.signal.core.util.logging.Log
|
||||
import org.signal.libsignal.net.BadRequestError
|
||||
import org.signal.libsignal.net.RequestResult
|
||||
import org.signal.network.rest.RequestSpec.Host
|
||||
import org.signal.network.util.JsonUtil
|
||||
import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment.ProgressListener
|
||||
import org.whispersystems.signalservice.api.push.SignalServiceAddress
|
||||
import org.whispersystems.signalservice.api.util.CredentialsProvider
|
||||
import org.whispersystems.signalservice.api.util.Tls12SocketFactory
|
||||
import org.whispersystems.signalservice.api.util.TlsProxySocketFactory
|
||||
import org.whispersystems.signalservice.internal.configuration.SignalProxy
|
||||
import org.whispersystems.signalservice.internal.configuration.SignalServiceConfiguration
|
||||
import org.whispersystems.signalservice.internal.configuration.SignalUrl
|
||||
import org.whispersystems.signalservice.internal.util.BlacklistingTrustManager
|
||||
import java.io.IOException
|
||||
import java.io.OutputStream
|
||||
import java.security.KeyManagementException
|
||||
import java.security.NoSuchAlgorithmException
|
||||
import java.security.SecureRandom
|
||||
import java.util.Optional
|
||||
import java.util.Random
|
||||
import java.util.concurrent.TimeUnit
|
||||
import javax.net.ssl.SSLContext
|
||||
import javax.net.ssl.X509TrustManager
|
||||
import kotlin.coroutines.resume
|
||||
import kotlin.jvm.Throws
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
/**
|
||||
* A suspending REST client that handles common infrastructure like CC, cert pinning, DNS, and proxies.
|
||||
* It also standardizes responses to be [RequestResult]s.
|
||||
*
|
||||
* Only use this for requests that cannot be done over the websocket (generally CDN, storage service, etc).
|
||||
*/
|
||||
class SignalRestClient @JvmOverloads constructor(
|
||||
private val configuration: SignalServiceConfiguration,
|
||||
private val signalAgent: String?,
|
||||
private val credentialsProvider: CredentialsProvider? = null,
|
||||
private val automaticNetworkRetry: Boolean = true,
|
||||
private val socketTimeoutMillis: Long = DEFAULT_TIMEOUT_MILLIS,
|
||||
private val random: Random = SecureRandom(),
|
||||
private val clientOverride: OkHttpClient? = null
|
||||
) {
|
||||
|
||||
companion object {
|
||||
private val TAG = Log.tag(SignalRestClient::class)
|
||||
|
||||
private const val AUTHORIZATION_HEADER = "Authorization"
|
||||
private const val RANGE_HEADER = "Range"
|
||||
private const val DEFAULT_TIMEOUT_MILLIS = 30_000L
|
||||
private const val DOWNLOAD_BUFFER_SIZE = 32 * 1024
|
||||
|
||||
private val EMPTY_BODY: RequestBody = ByteArray(0).toRequestBody()
|
||||
|
||||
private val DEFAULT_ERROR_MAPPER = ErrorMapper { statusCode, headers, body ->
|
||||
RestStatusCodeError(statusCode, headers, body)
|
||||
}
|
||||
|
||||
private fun createConnectionHolders(
|
||||
urls: Array<out SignalUrl>,
|
||||
interceptors: List<Interceptor>,
|
||||
dns: Optional<Dns>,
|
||||
proxy: Optional<SignalProxy>,
|
||||
clientOverride: OkHttpClient?
|
||||
): Array<ConnectionHolder> {
|
||||
return urls.map { url ->
|
||||
ConnectionHolder(
|
||||
client = clientOverride ?: buildClient(url, interceptors, dns, proxy),
|
||||
url = url.url,
|
||||
hostHeader = url.hostHeader
|
||||
)
|
||||
}.toTypedArray()
|
||||
}
|
||||
|
||||
private fun buildClient(
|
||||
url: SignalUrl,
|
||||
interceptors: List<Interceptor>,
|
||||
dns: Optional<Dns>,
|
||||
proxy: Optional<SignalProxy>
|
||||
): OkHttpClient {
|
||||
try {
|
||||
val trustManagers = BlacklistingTrustManager.createFor(url.trustStore)
|
||||
val sslContext = SSLContext.getInstance("TLS").apply { init(null, trustManagers, null) }
|
||||
|
||||
val builder = OkHttpClient.Builder()
|
||||
.sslSocketFactory(Tls12SocketFactory(sslContext.socketFactory), trustManagers[0] as X509TrustManager)
|
||||
.connectionSpecs(url.connectionSpecs.orElse(listOf(ConnectionSpec.RESTRICTED_TLS))!!)
|
||||
.dns(dns.orElse(Dns.SYSTEM)!!)
|
||||
.connectionPool(ConnectionPool(5, 45, TimeUnit.SECONDS))
|
||||
|
||||
if (proxy.isPresent) {
|
||||
builder.socketFactory(TlsProxySocketFactory(proxy.get().host, proxy.get().port, dns))
|
||||
}
|
||||
|
||||
for (interceptor in interceptors) {
|
||||
builder.addInterceptor(interceptor)
|
||||
}
|
||||
|
||||
return builder.build()
|
||||
} catch (e: NoSuchAlgorithmException) {
|
||||
throw AssertionError(e)
|
||||
} catch (e: KeyManagementException) {
|
||||
throw AssertionError(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val serviceClients: Array<ConnectionHolder> = createConnectionHolders(
|
||||
configuration.signalServiceUrls,
|
||||
configuration.networkInterceptors,
|
||||
configuration.dns,
|
||||
configuration.signalProxy,
|
||||
clientOverride
|
||||
)
|
||||
|
||||
private val cdnClientsMap: Map<Int, Array<ConnectionHolder>> = configuration.signalCdnUrlMap
|
||||
.mapValues { (_, urls) ->
|
||||
createConnectionHolders(
|
||||
urls = urls,
|
||||
interceptors = configuration.networkInterceptors,
|
||||
dns = configuration.dns,
|
||||
proxy = configuration.signalProxy,
|
||||
clientOverride = clientOverride
|
||||
)
|
||||
}
|
||||
|
||||
private val storageClients: Array<ConnectionHolder> = createConnectionHolders(
|
||||
configuration.signalStorageUrls,
|
||||
configuration.networkInterceptors,
|
||||
configuration.dns,
|
||||
configuration.signalProxy,
|
||||
clientOverride
|
||||
)
|
||||
|
||||
/**
|
||||
* Make a request, returning the raw [RestResponse] on success. Non-2xx responses are mapped to
|
||||
* a [RestStatusCodeError]. If a [progressListener] is supplied and [RequestSpec.body] is non-null,
|
||||
* upload progress will be reported as bytes are written to the wire.
|
||||
*/
|
||||
suspend fun request(
|
||||
spec: RequestSpec,
|
||||
progressListener: ProgressListener? = null
|
||||
): RequestResult<RestResponse, RestStatusCodeError> {
|
||||
return execute(spec, responseClass = null, errorMapper = DEFAULT_ERROR_MAPPER, progressListener = progressListener)
|
||||
}
|
||||
|
||||
/**
|
||||
* Make a request and decode the 2xx body via [JsonUtil] (or pass through directly for [Unit],
|
||||
* [String], or [ByteArray]). Non-2xx responses are mapped to [RestStatusCodeError]. If a
|
||||
* [progressListener] is supplied and [RequestSpec.body] is non-null, upload progress will be
|
||||
* reported as bytes are written to the wire.
|
||||
*/
|
||||
suspend fun <T : Any> request(
|
||||
spec: RequestSpec,
|
||||
responseClass: KClass<T>,
|
||||
progressListener: ProgressListener? = null
|
||||
): RequestResult<T, RestStatusCodeError> {
|
||||
return execute(spec, responseClass = responseClass, errorMapper = DEFAULT_ERROR_MAPPER, progressListener = progressListener)
|
||||
}
|
||||
|
||||
/**
|
||||
* Make a request returning the raw [RestResponse] on success, using a custom [ErrorMapper] for
|
||||
* non-2xx responses.
|
||||
*/
|
||||
suspend fun <E : BadRequestError> request(
|
||||
spec: RequestSpec,
|
||||
errorMapper: ErrorMapper<E>,
|
||||
progressListener: ProgressListener? = null
|
||||
): RequestResult<RestResponse, E> {
|
||||
return execute(spec, responseClass = null, errorMapper = errorMapper, progressListener = progressListener)
|
||||
}
|
||||
|
||||
/**
|
||||
* Make a request, decoding the 2xx body to [T] and mapping non-2xx via the supplied
|
||||
* [ErrorMapper].
|
||||
*/
|
||||
suspend fun <T : Any, E : BadRequestError> request(
|
||||
spec: RequestSpec,
|
||||
responseClass: KClass<T>,
|
||||
errorMapper: ErrorMapper<E>,
|
||||
progressListener: ProgressListener? = null
|
||||
): RequestResult<T, E> {
|
||||
return execute(spec, responseClass = responseClass, errorMapper = errorMapper, progressListener = progressListener)
|
||||
}
|
||||
|
||||
/**
|
||||
* Stream the response body for [spec] into [destination], reporting progress through
|
||||
* [progressListener] as bytes flow.
|
||||
*
|
||||
* - [offset] adds a `Range: bytes=<offset>-` header (use for resuming partial downloads). It is
|
||||
* also used as the starting count for progress reporting.
|
||||
* - [maxSize] caps the total bytes streamed; exceeding it produces a [RequestResult.RetryableNetworkError].
|
||||
* - Cancellation propagates from coroutine cancellation and from [ProgressListener.shouldCancel].
|
||||
*
|
||||
* Non-2xx responses produce a [RequestResult.NonSuccess] using [RestStatusCodeError].
|
||||
*/
|
||||
suspend fun download(
|
||||
spec: RequestSpec,
|
||||
destination: OutputStream,
|
||||
offset: Long = 0,
|
||||
maxSize: Long = Long.MAX_VALUE,
|
||||
progressListener: ProgressListener? = null
|
||||
): RequestResult<DownloadResult, RestStatusCodeError> {
|
||||
return executeDownload(spec, destination, offset, maxSize, progressListener, DEFAULT_ERROR_MAPPER)
|
||||
}
|
||||
|
||||
/**
|
||||
* Same as [download] but with a caller-supplied [ErrorMapper].
|
||||
*/
|
||||
suspend fun <E : BadRequestError> download(
|
||||
spec: RequestSpec,
|
||||
destination: OutputStream,
|
||||
errorMapper: ErrorMapper<E>,
|
||||
offset: Long = 0,
|
||||
maxSize: Long = Long.MAX_VALUE,
|
||||
progressListener: ProgressListener? = null
|
||||
): RequestResult<DownloadResult, E> {
|
||||
return executeDownload(spec, destination, offset, maxSize, progressListener, errorMapper)
|
||||
}
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
private suspend fun <T, E : BadRequestError> execute(
|
||||
spec: RequestSpec,
|
||||
responseClass: KClass<*>?,
|
||||
errorMapper: ErrorMapper<E>,
|
||||
progressListener: ProgressListener?
|
||||
): RequestResult<T, E> {
|
||||
return try {
|
||||
val holder = pickHolder(spec.host)
|
||||
val effectiveBody = spec.body?.let { body ->
|
||||
if (progressListener != null) ProgressRequestBody(body, progressListener) else body
|
||||
}
|
||||
val httpRequest = buildHttpRequest(spec, holder, effectiveBody, extraHeaders = emptyMap())
|
||||
val client = newCallClient(holder)
|
||||
val response = client.newCall(httpRequest).await()
|
||||
|
||||
response.use { resp ->
|
||||
val body = resp.body.bytes()
|
||||
val headers = resp.headers.toLowercaseMap()
|
||||
val code = resp.code
|
||||
if (code in 200..299) {
|
||||
val parsed = parseSuccessBody(code, headers, body, responseClass)
|
||||
RequestResult.Success(parsed as T)
|
||||
} else {
|
||||
RequestResult.NonSuccess(errorMapper.map(code, headers, body))
|
||||
}
|
||||
}
|
||||
} catch (e: IOException) {
|
||||
RequestResult.RetryableNetworkError(e)
|
||||
} catch (e: Throwable) {
|
||||
RequestResult.ApplicationError(e)
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun <E : BadRequestError> executeDownload(
|
||||
spec: RequestSpec,
|
||||
destination: OutputStream,
|
||||
offset: Long,
|
||||
maxSize: Long,
|
||||
progressListener: ProgressListener?,
|
||||
errorMapper: ErrorMapper<E>
|
||||
): RequestResult<DownloadResult, E> {
|
||||
require(offset >= 0) { "offset must be non-negative (was $offset)" }
|
||||
require(maxSize >= 0) { "maxSize must be non-negative (was $maxSize)" }
|
||||
|
||||
return try {
|
||||
val holder = pickHolder(spec.host)
|
||||
val rangeHeader = if (offset > 0) mapOf(RANGE_HEADER to "bytes=$offset-") else emptyMap()
|
||||
val httpRequest = buildHttpRequest(spec, holder, body = spec.body, extraHeaders = rangeHeader)
|
||||
val client = newCallClient(holder)
|
||||
val call = client.newCall(httpRequest)
|
||||
val response = call.await()
|
||||
|
||||
response.use { resp ->
|
||||
val code = resp.code
|
||||
val headers = resp.headers.toLowercaseMap()
|
||||
if (code in 200..299) {
|
||||
val totalBytes = streamToDestination(resp, destination, offset, maxSize, progressListener, call)
|
||||
RequestResult.Success(DownloadResult(code, headers, totalBytes))
|
||||
} else {
|
||||
val errorBody = runCatching { resp.body.bytes() }.getOrNull()
|
||||
RequestResult.NonSuccess(errorMapper.map(code, headers, errorBody))
|
||||
}
|
||||
}
|
||||
} catch (e: IOException) {
|
||||
RequestResult.RetryableNetworkError(e)
|
||||
} catch (e: Throwable) {
|
||||
RequestResult.ApplicationError(e)
|
||||
}
|
||||
}
|
||||
|
||||
@Throws(IOException::class)
|
||||
private fun streamToDestination(
|
||||
response: Response,
|
||||
destination: OutputStream,
|
||||
startingOffset: Long,
|
||||
maxSize: Long,
|
||||
progressListener: ProgressListener?,
|
||||
call: Call
|
||||
): Long {
|
||||
val body = response.body
|
||||
val contentLength = body.contentLength()
|
||||
if (contentLength > 0 && contentLength + startingOffset > maxSize) {
|
||||
throw IOException("Response exceeds max size!")
|
||||
}
|
||||
|
||||
val totalExpected = if (contentLength > 0) contentLength + startingOffset else -1L
|
||||
val input = body.byteStream()
|
||||
val buffer = ByteArray(DOWNLOAD_BUFFER_SIZE)
|
||||
var totalBytes = startingOffset
|
||||
|
||||
while (true) {
|
||||
val read = input.read(buffer)
|
||||
if (read == -1) break
|
||||
|
||||
if (progressListener?.shouldCancel() == true) {
|
||||
runCatching { call.cancel() }
|
||||
throw IOException("Canceled by listener check.")
|
||||
}
|
||||
|
||||
destination.write(buffer, 0, read)
|
||||
totalBytes += read
|
||||
|
||||
if (totalBytes > maxSize) {
|
||||
throw IOException("Response exceeded max size!")
|
||||
}
|
||||
|
||||
if (progressListener != null && totalExpected > 0) {
|
||||
progressListener.onAttachmentProgress(AttachmentTransferProgress(totalExpected, totalBytes))
|
||||
}
|
||||
}
|
||||
|
||||
return totalBytes
|
||||
}
|
||||
|
||||
private fun pickHolder(host: Host): ConnectionHolder {
|
||||
val pool: Array<ConnectionHolder> = when (host) {
|
||||
is Host.Service -> serviceClients
|
||||
is Host.Storage -> storageClients
|
||||
is Host.Cdn -> cdnClientsMap[host.number]
|
||||
?: throw IllegalArgumentException("No CDN configuration for number ${host.number}")
|
||||
}
|
||||
return pool[random.nextInt(pool.size)]
|
||||
}
|
||||
|
||||
private fun newCallClient(holder: ConnectionHolder): OkHttpClient {
|
||||
return holder.client.newBuilder()
|
||||
.connectTimeout(socketTimeoutMillis, TimeUnit.MILLISECONDS)
|
||||
.readTimeout(socketTimeoutMillis, TimeUnit.MILLISECONDS)
|
||||
.retryOnConnectionFailure(automaticNetworkRetry)
|
||||
.build()
|
||||
}
|
||||
|
||||
private fun buildHttpRequest(
|
||||
spec: RequestSpec,
|
||||
holder: ConnectionHolder,
|
||||
body: RequestBody?,
|
||||
extraHeaders: Map<String, String>
|
||||
): Request {
|
||||
val requestBody = body ?: when (spec.method) {
|
||||
RequestSpec.Method.POST, RequestSpec.Method.PUT, RequestSpec.Method.PATCH -> EMPTY_BODY
|
||||
else -> null
|
||||
}
|
||||
|
||||
val builder = Request.Builder()
|
||||
.url(computeRequestUrl(spec.path, holder))
|
||||
.method(spec.method.value, requestBody)
|
||||
|
||||
for ((key, value) in spec.headers) {
|
||||
builder.addHeader(key, value)
|
||||
}
|
||||
|
||||
for ((key, value) in extraHeaders) {
|
||||
if (!spec.headers.containsKey(key)) {
|
||||
builder.addHeader(key, value)
|
||||
}
|
||||
}
|
||||
|
||||
when (val auth = spec.auth) {
|
||||
is RequestSpec.Auth.None -> Unit
|
||||
is RequestSpec.Auth.Standard -> {
|
||||
val provider = checkNotNull(credentialsProvider) { "RequestSpec.Auth.Basic requires a CredentialsProvider on SignalRestClient" }
|
||||
if (!spec.headers.containsKey(AUTHORIZATION_HEADER)) {
|
||||
builder.addHeader(AUTHORIZATION_HEADER, basicAuthHeader(provider))
|
||||
} else {
|
||||
Log.w(TAG, "Requested Basic auth, but there was already an auth header. Keeping existing auth header.")
|
||||
}
|
||||
}
|
||||
is RequestSpec.Auth.Header -> {
|
||||
if (!spec.headers.containsKey(auth.name)) {
|
||||
builder.addHeader(auth.name, auth.value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (signalAgent != null) {
|
||||
builder.addHeader("X-Signal-Agent", signalAgent)
|
||||
}
|
||||
|
||||
holder.hostHeader.ifPresent { builder.addHeader("Host", it) }
|
||||
|
||||
return builder.build()
|
||||
}
|
||||
|
||||
/**
|
||||
* Turn a [path] into a concrete [HttpUrl]. For relative paths we just append. For
|
||||
* absolute URLs (e.g. resumable upload locations the server hands us) we keep the scheme, host,
|
||||
* port, and base path from the pinned [holder] and graft the supplied path/query/fragment on top
|
||||
* — the same trick `PushServiceSocket.buildConfiguredUrl` plays so traffic stays on our
|
||||
* cert-pinned endpoints regardless of what URL the server returned.
|
||||
*/
|
||||
private fun computeRequestUrl(path: String, holder: ConnectionHolder): HttpUrl {
|
||||
if (!path.startsWith("http://") && !path.startsWith("https://")) {
|
||||
return (holder.url + path).toHttpUrl()
|
||||
}
|
||||
|
||||
val absolute = path.toHttpUrl()
|
||||
val base = holder.url.toHttpUrl()
|
||||
return HttpUrl.Builder()
|
||||
.scheme(base.scheme)
|
||||
.host(base.host)
|
||||
.port(base.port)
|
||||
.encodedPath(base.encodedPath)
|
||||
.addEncodedPathSegments(absolute.encodedPath.removePrefix("/"))
|
||||
.apply {
|
||||
absolute.encodedQuery?.let { encodedQuery(it) }
|
||||
absolute.encodedFragment?.let { encodedFragment(it) }
|
||||
}
|
||||
.build()
|
||||
}
|
||||
|
||||
private fun parseSuccessBody(
|
||||
statusCode: Int,
|
||||
headers: Map<String, String>,
|
||||
body: ByteArray,
|
||||
responseClass: KClass<*>?
|
||||
): Any {
|
||||
if (responseClass == null) {
|
||||
return RestResponse(statusCode, headers, body)
|
||||
}
|
||||
return when (responseClass) {
|
||||
Unit::class -> Unit
|
||||
String::class -> body.toString(Charsets.UTF_8)
|
||||
ByteArray::class -> body
|
||||
ByteString::class -> ByteString.of(*body)
|
||||
else -> JsonUtil.fromJson(body, responseClass.java)
|
||||
}
|
||||
}
|
||||
|
||||
private fun basicAuthHeader(provider: CredentialsProvider): String {
|
||||
val baseIdentifier = provider.aci?.toString() ?: provider.e164
|
||||
val identifier = if (provider.deviceId != SignalServiceAddress.DEFAULT_DEVICE_ID) {
|
||||
"$baseIdentifier.${provider.deviceId}"
|
||||
} else {
|
||||
baseIdentifier
|
||||
}
|
||||
|
||||
return Credentials.basic(identifier, provider.password)
|
||||
}
|
||||
|
||||
private fun okhttp3.Headers.toLowercaseMap(): Map<String, String> {
|
||||
val map = LinkedHashMap<String, String>(size)
|
||||
for (i in 0 until size) {
|
||||
map[name(i).lowercase()] = value(i)
|
||||
}
|
||||
return map
|
||||
}
|
||||
|
||||
private suspend fun Call.await(): Response = suspendCancellableCoroutine { cont ->
|
||||
cont.invokeOnCancellation { runCatching { cancel() } }
|
||||
enqueue(object : Callback {
|
||||
override fun onFailure(call: Call, e: IOException) {
|
||||
if (!cont.isCancelled) cont.resumeWith(Result.failure(e))
|
||||
}
|
||||
|
||||
override fun onResponse(call: Call, response: Response) {
|
||||
cont.resume(response)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
private data class ConnectionHolder(
|
||||
val client: OkHttpClient,
|
||||
val url: String,
|
||||
val hostHeader: Optional<String>
|
||||
)
|
||||
|
||||
/**
|
||||
* Wraps an outgoing [RequestBody] to report progress as bytes flow to the underlying sink. Used
|
||||
* internally when callers pass a [ProgressListener] to one of the upload-capable [request]
|
||||
* overloads.
|
||||
*/
|
||||
private class ProgressRequestBody(
|
||||
private val delegate: RequestBody,
|
||||
private val listener: ProgressListener
|
||||
) : RequestBody() {
|
||||
override fun contentType() = delegate.contentType()
|
||||
|
||||
override fun contentLength(): Long = delegate.contentLength()
|
||||
|
||||
override fun isOneShot(): Boolean = delegate.isOneShot()
|
||||
|
||||
override fun writeTo(sink: BufferedSink) {
|
||||
val total = contentLength()
|
||||
val countingSink = object : ForwardingSink(sink) {
|
||||
var written: Long = 0
|
||||
|
||||
override fun write(source: Buffer, byteCount: Long) {
|
||||
super.write(source, byteCount)
|
||||
written += byteCount
|
||||
if (total > 0) {
|
||||
listener.onAttachmentProgress(AttachmentTransferProgress(total, written))
|
||||
}
|
||||
}
|
||||
}
|
||||
val buffered = countingSink.buffer()
|
||||
delegate.writeTo(buffered)
|
||||
buffered.flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,46 @@
|
||||
/*
|
||||
* Copyright 2026 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.signal.network.rest
|
||||
|
||||
import okhttp3.MediaType
|
||||
import okhttp3.MediaType.Companion.toMediaTypeOrNull
|
||||
import okhttp3.RequestBody
|
||||
import okio.BufferedSink
|
||||
import okio.source
|
||||
import java.io.InputStream
|
||||
|
||||
/**
|
||||
* A [RequestBody] that streams its content from an [InputStream] rather than buffering it all in
|
||||
* memory. Suitable for uploading large files / blobs through [SignalRestClient.request].
|
||||
*
|
||||
* Pair with a [org.whispersystems.signalservice.api.messages.SignalServiceAttachment.ProgressListener]
|
||||
* passed to `request()` to receive upload progress as bytes flow.
|
||||
*
|
||||
* @param source The data to upload. Will be read once and not closed by this body; callers should
|
||||
* close it themselves (or wrap it in a stream that closes the underlying resource).
|
||||
* @param contentLength The total number of bytes to upload, or `-1` if unknown (causes chunked
|
||||
* transfer encoding). Progress reporting requires a known length.
|
||||
* @param contentType Optional `Content-Type` for the body, e.g. `"application/octet-stream"`.
|
||||
*/
|
||||
class StreamingRequestBody(
|
||||
private val source: InputStream,
|
||||
private val contentLength: Long,
|
||||
private val contentType: String? = null
|
||||
) : RequestBody() {
|
||||
|
||||
override fun contentType(): MediaType? = contentType?.toMediaTypeOrNull()
|
||||
|
||||
override fun contentLength(): Long = contentLength
|
||||
|
||||
/** [InputStream] cannot be rewound, so retries that require replaying the body are not safe. */
|
||||
override fun isOneShot(): Boolean = true
|
||||
|
||||
override fun writeTo(sink: BufferedSink) {
|
||||
source.source().use { okSource ->
|
||||
sink.writeAll(okSource)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,157 @@
|
||||
/*
|
||||
* Copyright 2026 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.signal.network.service
|
||||
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.signal.libsignal.net.RequestResult
|
||||
import org.signal.libsignal.net.UploadTooLargeException
|
||||
import org.signal.network.api.AttachmentApi
|
||||
import org.signal.network.exceptions.PushNetworkException
|
||||
import org.signal.network.rest.RequestSpec
|
||||
import org.signal.network.rest.RequestSpec.Method
|
||||
import org.signal.network.rest.RestStatusCodeError
|
||||
import org.signal.network.rest.SignalRestClient
|
||||
import org.signal.network.rest.toNonSuccessfulResponseCodeException
|
||||
import org.whispersystems.signalservice.internal.push.AttachmentUploadForm
|
||||
import org.whispersystems.signalservice.internal.push.PushServiceSocket
|
||||
import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec
|
||||
import org.whispersystems.signalservice.internal.util.Util
|
||||
import java.io.IOException
|
||||
|
||||
/**
|
||||
* Assists in CDN operations.
|
||||
*/
|
||||
class CdnService(
|
||||
private val signalRestClient: SignalRestClient,
|
||||
private val attachmentApi: AttachmentApi
|
||||
) {
|
||||
|
||||
/**
|
||||
* POST to the signed upload location from [uploadForm] to obtain a resumable upload URL. The
|
||||
* URL is returned in the response's `Location` header. The exact headers we send depend on the
|
||||
* CDN version the upload form targets:
|
||||
*
|
||||
* - CDN 2: legacy resumable upload — `Content-Type: application/octet-stream`.
|
||||
* - CDN 3: TUS protocol — `Upload-Defer-Length: 1`, `Tus-Resumable: 1.0.0`, plus an optional
|
||||
* `x-signal-checksum-sha256` if [checksumSha256] is provided.
|
||||
*/
|
||||
suspend fun getResumableUploadUrl(uploadForm: AttachmentUploadForm, checksumSha256: String? = null): RequestResult<String, RestStatusCodeError> {
|
||||
val headers = mutableMapOf<String, String>()
|
||||
for ((key, value) in uploadForm.headers) {
|
||||
if (!key.equals("host", ignoreCase = true)) {
|
||||
headers[key] = value
|
||||
}
|
||||
}
|
||||
headers["Content-Length"] = "0"
|
||||
|
||||
when (uploadForm.cdn) {
|
||||
2 -> headers["Content-Type"] = "application/octet-stream"
|
||||
3 -> {
|
||||
headers["Upload-Defer-Length"] = "1"
|
||||
headers["Tus-Resumable"] = "1.0.0"
|
||||
if (checksumSha256 != null) {
|
||||
headers["x-signal-checksum-sha256"] = checksumSha256
|
||||
}
|
||||
}
|
||||
else -> return RequestResult.ApplicationError(AssertionError("Unknown CDN version: ${uploadForm.cdn}"))
|
||||
}
|
||||
|
||||
val spec = RequestSpec(
|
||||
method = Method.POST,
|
||||
host = RequestSpec.Host.Cdn(uploadForm.cdn),
|
||||
path = uploadForm.signedUploadLocation,
|
||||
headers = headers
|
||||
)
|
||||
|
||||
return when (val result = signalRestClient.request(spec)) {
|
||||
is RequestResult.Success -> {
|
||||
val location = result.result.headers["location"]
|
||||
if (location != null) {
|
||||
RequestResult.Success(location)
|
||||
} else {
|
||||
RequestResult.ApplicationError(IOException("Missing Location header in resumable-upload response"))
|
||||
}
|
||||
}
|
||||
is RequestResult.NonSuccess -> result
|
||||
is RequestResult.RetryableNetworkError -> result
|
||||
is RequestResult.ApplicationError -> result
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches a v4 attachment upload form (sized for [uploadSizeBytes]) and turns it into a
|
||||
* ready-to-use [ResumableUploadSpec].
|
||||
*
|
||||
* This is a composite of two requests (fetch form + fetch resumable URL), so it has more possible
|
||||
* outcomes than a single request — see [ResumableUploadSpecResult].
|
||||
*/
|
||||
suspend fun getResumableUploadSpec(uploadSizeBytes: Long): ResumableUploadSpecResult {
|
||||
val form: AttachmentUploadForm = when (val formResult = attachmentApi.getAttachmentV4UploadForm(uploadSizeBytes)) {
|
||||
is RequestResult.Success -> formResult.result
|
||||
is RequestResult.NonSuccess -> return ResumableUploadSpecResult.UploadTooLarge(formResult.error)
|
||||
is RequestResult.RetryableNetworkError -> return ResumableUploadSpecResult.NetworkError(formResult.networkError)
|
||||
is RequestResult.ApplicationError -> return ResumableUploadSpecResult.ApplicationError(formResult.cause)
|
||||
}
|
||||
|
||||
return when (val urlResult = getResumableUploadUrl(form)) {
|
||||
is RequestResult.Success -> ResumableUploadSpecResult.Success(
|
||||
ResumableUploadSpec(
|
||||
attachmentKey = Util.getSecretBytes(64),
|
||||
attachmentIv = Util.getSecretBytes(16),
|
||||
cdnKey = form.key,
|
||||
cdnNumber = form.cdn,
|
||||
resumeLocation = urlResult.result,
|
||||
expirationTimestamp = System.currentTimeMillis() + PushServiceSocket.CDN2_RESUMABLE_LINK_LIFETIME_MILLIS,
|
||||
headers = form.headers
|
||||
)
|
||||
)
|
||||
is RequestResult.NonSuccess -> ResumableUploadSpecResult.UploadUrlStatusError(urlResult.error)
|
||||
is RequestResult.RetryableNetworkError -> ResumableUploadSpecResult.NetworkError(urlResult.networkError)
|
||||
is RequestResult.ApplicationError -> ResumableUploadSpecResult.ApplicationError(urlResult.cause)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Legacy adapter over [getResumableUploadSpec] that throws on failure. Should only be used
|
||||
* by java code.
|
||||
*/
|
||||
@Throws(IOException::class)
|
||||
fun getResumableUploadSpecBlocking(uploadSizeBytes: Long): ResumableUploadSpec {
|
||||
return when (val result = runBlocking { getResumableUploadSpec(uploadSizeBytes) }) {
|
||||
is ResumableUploadSpecResult.Success -> result.spec
|
||||
is ResumableUploadSpecResult.UploadTooLarge -> throw result.exception
|
||||
is ResumableUploadSpecResult.UploadUrlStatusError -> throw result.error.toNonSuccessfulResponseCodeException()
|
||||
is ResumableUploadSpecResult.NetworkError -> throw PushNetworkException(result.exception)
|
||||
is ResumableUploadSpecResult.ApplicationError -> when (val cause = result.throwable) {
|
||||
is IOException -> throw cause
|
||||
is RuntimeException -> throw cause
|
||||
else -> throw RuntimeException(cause)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The possible outcomes of [getResumableUploadSpec]. Because that call composes multiple requests,
|
||||
* it can't honestly be represented as a single [RequestResult] — each failure mode here means
|
||||
* something different to a caller.
|
||||
*/
|
||||
sealed interface ResumableUploadSpecResult {
|
||||
/** Got a usable spec. */
|
||||
data class Success(val spec: ResumableUploadSpec) : ResumableUploadSpecResult
|
||||
|
||||
/** The server rejected the upload because it's larger than the maximum supported size. */
|
||||
data class UploadTooLarge(val exception: UploadTooLargeException) : ResumableUploadSpecResult
|
||||
|
||||
/** The request for a resumable upload URL produced a non-2xx HTTP response. */
|
||||
data class UploadUrlStatusError(val error: RestStatusCodeError) : ResumableUploadSpecResult
|
||||
|
||||
/** A retryable network failure occurred during one of the underlying requests. */
|
||||
data class NetworkError(val exception: IOException) : ResumableUploadSpecResult
|
||||
|
||||
/** An unexpected client-side failure (likely a bug). */
|
||||
data class ApplicationError(val throwable: Throwable) : ResumableUploadSpecResult
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,217 @@
|
||||
/*
|
||||
* Copyright 2026 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.signal.network.rest
|
||||
|
||||
import assertk.assertThat
|
||||
import assertk.assertions.isEqualTo
|
||||
import assertk.assertions.isInstanceOf
|
||||
import assertk.assertions.isSameInstanceAs
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import okhttp3.Interceptor
|
||||
import okhttp3.MediaType.Companion.toMediaType
|
||||
import okhttp3.OkHttpClient
|
||||
import okhttp3.Protocol
|
||||
import okhttp3.Request
|
||||
import okhttp3.Response
|
||||
import okhttp3.ResponseBody.Companion.toResponseBody
|
||||
import org.junit.Test
|
||||
import org.signal.libsignal.net.BadRequestError
|
||||
import org.signal.libsignal.net.RequestResult
|
||||
import org.signal.network.rest.RequestSpec.Host
|
||||
import org.signal.network.rest.RequestSpec.Method
|
||||
import org.whispersystems.signalservice.api.push.TrustStore
|
||||
import org.whispersystems.signalservice.internal.configuration.SignalCdnUrl
|
||||
import org.whispersystems.signalservice.internal.configuration.SignalCdsiUrl
|
||||
import org.whispersystems.signalservice.internal.configuration.SignalServiceConfiguration
|
||||
import org.whispersystems.signalservice.internal.configuration.SignalServiceUrl
|
||||
import org.whispersystems.signalservice.internal.configuration.SignalStorageUrl
|
||||
import org.whispersystems.signalservice.internal.configuration.SignalSvr2Url
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.io.IOException
|
||||
import java.util.Optional
|
||||
import java.util.Random
|
||||
|
||||
class SignalRestClientTest {
|
||||
|
||||
private val recordedRequests = mutableListOf<Request>()
|
||||
|
||||
/** Default: 200 with an empty JSON body. Override per-test before issuing a request. */
|
||||
private var responder: (Request) -> Response = { req -> response(req, 200, "{}") }
|
||||
|
||||
private val recordingClient: OkHttpClient = OkHttpClient.Builder()
|
||||
.addInterceptor(
|
||||
Interceptor { chain ->
|
||||
val request = chain.request()
|
||||
recordedRequests += request
|
||||
responder(request)
|
||||
}
|
||||
)
|
||||
.build()
|
||||
|
||||
@Test
|
||||
fun `cycles through service urls based on injected random`() = runBlockingTest {
|
||||
val client = client(random = ScriptedRandom(0, 2, 1, 0))
|
||||
|
||||
repeat(4) {
|
||||
client.request(RequestSpec(Method.GET, Host.Service, "/v1/ping"))
|
||||
}
|
||||
|
||||
assertThat(recordedRequests.map { it.url.host }).isEqualTo(
|
||||
listOf("service-a.test", "service-c.test", "service-b.test", "service-a.test")
|
||||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `routes to the cdn pool for the requested cdn number`() = runBlockingTest {
|
||||
val client = client(random = ScriptedRandom(0, 0))
|
||||
|
||||
client.request(RequestSpec(Method.GET, Host.Cdn(2), "/file"))
|
||||
client.request(RequestSpec(Method.GET, Host.Cdn(3), "/file"))
|
||||
|
||||
assertThat(recordedRequests.map { it.url.host }).isEqualTo(listOf("cdn2.test", "cdn3.test"))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `routes to the storage pool`() = runBlockingTest {
|
||||
val client = client(random = ScriptedRandom(0))
|
||||
|
||||
client.request(RequestSpec(Method.GET, Host.Storage, "/v1/storage"))
|
||||
|
||||
assertThat(recordedRequests.single().url.host).isEqualTo("storage.test")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `2xx maps to Success`() = runBlockingTest {
|
||||
responder = { req -> response(req, 200, "hello", extraHeader = "X-Foo" to "Bar") }
|
||||
val client = client()
|
||||
|
||||
val result = client.request(RequestSpec(Method.GET, Host.Service, "/v1/ping"))
|
||||
|
||||
assertThat(result).isInstanceOf(RequestResult.Success::class)
|
||||
val success = result as RequestResult.Success
|
||||
assertThat(success.result.statusCode).isEqualTo(200)
|
||||
assertThat(String(success.result.body)).isEqualTo("hello")
|
||||
assertThat(success.result.headers["x-foo"]).isEqualTo("Bar")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `non-2xx maps to NonSuccess with default RestStatusCodeError`() = runBlockingTest {
|
||||
responder = { req -> response(req, 404, "nope") }
|
||||
val client = client()
|
||||
|
||||
val result = client.request(RequestSpec(Method.GET, Host.Service, "/v1/ping"))
|
||||
|
||||
assertThat(result).isInstanceOf(RequestResult.NonSuccess::class)
|
||||
val error = (result as RequestResult.NonSuccess).error
|
||||
assertThat(error.statusCode).isEqualTo(404)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `non-2xx uses the supplied error mapper`() = runBlockingTest {
|
||||
responder = { req -> response(req, 500, "boom") }
|
||||
val client = client()
|
||||
val mapped = TestError(599)
|
||||
|
||||
val result = client.request(
|
||||
RequestSpec(Method.GET, Host.Service, "/v1/ping"),
|
||||
ErrorMapper { _, _, _ -> mapped }
|
||||
)
|
||||
|
||||
assertThat(result).isInstanceOf(RequestResult.NonSuccess::class)
|
||||
assertThat((result as RequestResult.NonSuccess).error).isSameInstanceAs(mapped)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `transport IOException maps to RetryableNetworkError`() = runBlockingTest {
|
||||
responder = { throw IOException("connection reset") }
|
||||
val client = client()
|
||||
|
||||
val result = client.request(RequestSpec(Method.GET, Host.Service, "/v1/ping"))
|
||||
|
||||
assertThat(result).isInstanceOf(RequestResult.RetryableNetworkError::class)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `unknown cdn number maps to ApplicationError`() = runBlockingTest {
|
||||
val client = client()
|
||||
|
||||
val result = client.request(RequestSpec(Method.GET, Host.Cdn(99), "/file"))
|
||||
|
||||
assertThat(result).isInstanceOf(RequestResult.ApplicationError::class)
|
||||
}
|
||||
|
||||
private fun runBlockingTest(block: suspend () -> Unit) {
|
||||
runBlocking { block() }
|
||||
}
|
||||
|
||||
private fun client(random: Random = ScriptedRandom(0)): SignalRestClient {
|
||||
return SignalRestClient(
|
||||
configuration = testConfiguration(),
|
||||
signalAgent = "test-agent",
|
||||
credentialsProvider = null,
|
||||
automaticNetworkRetry = false,
|
||||
socketTimeoutMillis = 1_000,
|
||||
random = random,
|
||||
clientOverride = recordingClient
|
||||
)
|
||||
}
|
||||
|
||||
private fun testConfiguration(): SignalServiceConfiguration {
|
||||
return SignalServiceConfiguration(
|
||||
signalServiceUrls = arrayOf(
|
||||
SignalServiceUrl("https://service-a.test", DUMMY_TRUST_STORE),
|
||||
SignalServiceUrl("https://service-b.test", DUMMY_TRUST_STORE),
|
||||
SignalServiceUrl("https://service-c.test", DUMMY_TRUST_STORE)
|
||||
),
|
||||
signalCdnUrlMap = mapOf(
|
||||
2 to arrayOf(SignalCdnUrl("https://cdn2.test", DUMMY_TRUST_STORE)),
|
||||
3 to arrayOf(SignalCdnUrl("https://cdn3.test", DUMMY_TRUST_STORE))
|
||||
),
|
||||
signalStorageUrls = arrayOf(SignalStorageUrl("https://storage.test", DUMMY_TRUST_STORE)),
|
||||
signalCdsiUrls = emptyArray<SignalCdsiUrl>(),
|
||||
signalSvr2Urls = emptyArray<SignalSvr2Url>(),
|
||||
networkInterceptors = emptyList(),
|
||||
dns = Optional.empty(),
|
||||
signalProxy = Optional.empty(),
|
||||
systemHttpProxy = Optional.empty(),
|
||||
zkGroupServerPublicParams = ByteArray(0),
|
||||
genericServerPublicParams = ByteArray(0),
|
||||
backupServerPublicParams = ByteArray(0),
|
||||
censored = false
|
||||
)
|
||||
}
|
||||
|
||||
private fun response(request: Request, code: Int, body: String, extraHeader: Pair<String, String>? = null): Response {
|
||||
val builder = Response.Builder()
|
||||
.request(request)
|
||||
.protocol(Protocol.HTTP_1_1)
|
||||
.code(code)
|
||||
.message(if (code in 200..299) "OK" else "Error")
|
||||
.body(body.toResponseBody("application/octet-stream".toMediaType()))
|
||||
|
||||
if (extraHeader != null) {
|
||||
builder.header(extraHeader.first, extraHeader.second)
|
||||
}
|
||||
|
||||
return builder.build()
|
||||
}
|
||||
|
||||
private class TestError(val code: Int) : BadRequestError
|
||||
|
||||
/** A [Random] whose `nextInt(bound)` returns a scripted sequence of values. */
|
||||
private class ScriptedRandom(vararg values: Int) : Random() {
|
||||
private val queue = ArrayDeque(values.toList())
|
||||
|
||||
override fun nextInt(bound: Int): Int = queue.removeFirst()
|
||||
}
|
||||
|
||||
companion object {
|
||||
private val DUMMY_TRUST_STORE = object : TrustStore {
|
||||
override fun getKeyStoreInputStream() = ByteArrayInputStream(ByteArray(0))
|
||||
override fun getKeyStorePassword() = ""
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user