Add IV to the attachment table.

This commit is contained in:
Greyson Parrelli
2024-08-30 12:11:22 -04:00
committed by Cody Henthorne
parent 07289b417b
commit 4b47d38d78
26 changed files with 534 additions and 309 deletions

View File

@@ -6,7 +6,11 @@
package org.whispersystems.signalservice.api
import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException
import org.whispersystems.signalservice.internal.util.JsonUtil
import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessage
import org.whispersystems.signalservice.internal.websocket.WebsocketResponse
import java.io.IOException
import kotlin.reflect.KClass
typealias StatusCodeErrorAction = (NetworkResult.StatusCodeError<*>) -> Unit
@@ -43,6 +47,50 @@ sealed class NetworkResult<T>(
} catch (e: Throwable) {
ApplicationError(e)
}
/**
* A convenience method to convert a websocket request into a network result with simple conversion of the response body to the desired class.
* Common exceptions will be caught and translated to errors.
*/
@JvmStatic
fun <T : Any> fromWebSocketRequest(
signalWebSocket: SignalWebSocket,
request: WebSocketRequestMessage,
clazz: KClass<T>
): NetworkResult<T> = try {
val result = signalWebSocket.request(request)
.map { response: WebsocketResponse -> JsonUtil.fromJson(response.body, clazz.java) }
.blockingGet()
Success(result)
} catch (e: NonSuccessfulResponseCodeException) {
StatusCodeError(e)
} catch (e: IOException) {
NetworkError(e)
} catch (e: Throwable) {
ApplicationError(e)
}
/**
* A convenience method to convert a websocket request into a network result with the ability to convert the response to your target class.
* Common exceptions will be caught and translated to errors.
*/
@JvmStatic
fun <T : Any> fromWebSocketRequest(
signalWebSocket: SignalWebSocket,
request: WebSocketRequestMessage,
webSocketResponseConverter: WebSocketResponseConverter<T>
): NetworkResult<T> = try {
val result = signalWebSocket.request(request)
.map { response: WebsocketResponse -> webSocketResponseConverter.convert(response) }
.blockingGet()
Success(result)
} catch (e: NonSuccessfulResponseCodeException) {
StatusCodeError(e)
} catch (e: IOException) {
NetworkError(e)
} catch (e: Throwable) {
ApplicationError(e)
}
}
/** Indicates the request was successful */
@@ -105,6 +153,34 @@ sealed class NetworkResult<T>(
}
}
/**
* Provides the ability to fallback to [fromFetch] if the current [NetworkResult] is non-successful.
*
* The [fallback] will only be triggered on non-[Success] results. You can provide a [unless] to limit what kinds of errors you fallback on
* (the default is to fallback on every error).
*
* This primary usecase of this is to make a websocket request (see [fromWebSocketRequest]) and fallback to rest upon failure.
*
* ```kotlin
* val user: NetworkResult<LocalUserModel> = NetworkResult
* .fromWebSocketRequest(websocket, request, LocalUserMode.class.java)
* .fallbackTo { result -> NetworkResult.fromFetch { http.getUser() } }
* ```
*
* @param unless If this lamba returns true, the fallback will not be triggered.
*/
fun fallbackToFetch(unless: (NetworkResult<T>) -> Boolean = { false }, fallback: Fetcher<T>): NetworkResult<T> {
if (this is Success) {
return this
}
return if (unless(this)) {
fromFetch(fallback)
} else {
this
}
}
/**
* Takes the output of one [NetworkResult] and passes it as the input to another if the operation is successful.
* If it's non-successful, the [result] lambda is not run, and instead the original failure will be propagated.
@@ -183,4 +259,9 @@ sealed class NetworkResult<T>(
@Throws(Exception::class)
fun fetch(): T
}
fun interface WebSocketResponseConverter<T> {
@Throws(Exception::class)
fun convert(response: WebsocketResponse): T
}
}

View File

@@ -23,6 +23,7 @@ import org.signal.libsignal.protocol.state.SessionRecord;
import org.signal.libsignal.protocol.util.Pair;
import org.signal.libsignal.zkgroup.groupsend.GroupSendFullToken;
import org.signal.libsignal.zkgroup.profiles.ClientZkProfileOperations;
import org.whispersystems.signalservice.api.attachment.AttachmentApi;
import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil;
import org.whispersystems.signalservice.api.crypto.ContentHint;
import org.whispersystems.signalservice.api.crypto.EnvelopeContent;
@@ -170,6 +171,7 @@ public class SignalServiceMessageSender {
private static final int RETRY_COUNT = 4;
private final PushServiceSocket socket;
private final SignalWebSocket webSocket;
private final SignalServiceAccountDataStore aciStore;
private final SignalSessionLock sessionLock;
private final SignalServiceAddress localAddress;
@@ -198,6 +200,7 @@ public class SignalServiceMessageSender {
boolean automaticNetworkRetry)
{
this.socket = new PushServiceSocket(urls, credentialsProvider, signalAgent, clientZkProfileOperations, automaticNetworkRetry);
this.webSocket = signalWebSocket;
this.aciStore = store.aci();
this.sessionLock = sessionLock;
this.localAddress = new SignalServiceAddress(credentialsProvider.getAci(), credentialsProvider.getE164());
@@ -212,6 +215,10 @@ public class SignalServiceMessageSender {
this.scheduler = Schedulers.from(executor, false, false);
}
public AttachmentApi getAttachmentApi() {
return AttachmentApi.create(webSocket, socket);
}
/**
* Send a read receipt for a received message.
*
@@ -799,8 +806,8 @@ public class SignalServiceMessageSender {
}
public SignalServiceAttachmentPointer uploadAttachment(SignalServiceAttachmentStream attachment) throws IOException {
byte[] attachmentKey = attachment.getResumableUploadSpec().map(ResumableUploadSpec::getSecretKey).orElseGet(() -> Util.getSecretBytes(64));
byte[] attachmentIV = attachment.getResumableUploadSpec().map(ResumableUploadSpec::getIV).orElseGet(() -> Util.getSecretBytes(16));
byte[] attachmentKey = attachment.getResumableUploadSpec().map(ResumableUploadSpec::getAttachmentKey).orElseGet(() -> Util.getSecretBytes(64));
byte[] attachmentIV = attachment.getResumableUploadSpec().map(ResumableUploadSpec::getAttachmentIv).orElseGet(() -> Util.getSecretBytes(16));
long paddedLength = PaddingInputStream.getPaddedSize(attachment.getLength());
InputStream dataStream = new PaddingInputStream(attachment.getInputStream(), attachment.getLength());
long ciphertextLength = AttachmentCipherStreamUtil.getCiphertextLength(paddedLength);
@@ -811,7 +818,7 @@ public class SignalServiceMessageSender {
new AttachmentCipherOutputStreamFactory(attachmentKey, attachmentIV),
attachment.getListener(),
attachment.getCancelationSignal(),
attachment.getResumableUploadSpec().orElse(null));
attachment.getResumableUploadSpec().get());
if (attachment.getResumableUploadSpec().isEmpty()) {
throw new IllegalStateException("Attachment must have a resumable upload spec.");

View File

@@ -0,0 +1,120 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.signalservice.api.attachment
import org.whispersystems.signalservice.api.NetworkResult
import org.whispersystems.signalservice.api.SignalWebSocket
import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentRemoteId
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentStream
import org.whispersystems.signalservice.internal.crypto.PaddingInputStream
import org.whispersystems.signalservice.internal.push.AttachmentUploadForm
import org.whispersystems.signalservice.internal.push.PushAttachmentData
import org.whispersystems.signalservice.internal.push.PushServiceSocket
import org.whispersystems.signalservice.internal.push.http.AttachmentCipherOutputStreamFactory
import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec
import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessage
import java.io.InputStream
import java.security.SecureRandom
/**
* Class to interact with various attachment-related endpoints.
*/
class AttachmentApi(
private val signalWebSocket: SignalWebSocket,
private val pushServiceSocket: PushServiceSocket
) {
companion object {
@JvmStatic
fun create(signalWebSocket: SignalWebSocket, pushServiceSocket: PushServiceSocket): AttachmentApi {
return AttachmentApi(signalWebSocket, pushServiceSocket)
}
}
/**
* Gets a v4 attachment upload form, which provides the necessary information to upload an attachment.
*/
fun getAttachmentV4UploadForm(): NetworkResult<AttachmentUploadForm> {
val request = WebSocketRequestMessage(
id = SecureRandom().nextLong(),
verb = "GET",
path = "/v4/attachments/form/upload"
)
return NetworkResult
.fromWebSocketRequest(signalWebSocket, request, AttachmentUploadForm::class)
.fallbackToFetch(
unless = { it is NetworkResult.StatusCodeError && it.code == 209 },
fallback = { pushServiceSocket.attachmentV4UploadAttributes }
)
}
/**
* Gets a resumable upload spec, which can be saved and re-used across upload attempts to resume upload progress.
*/
fun getResumableUploadSpec(key: ByteArray, iv: ByteArray, uploadForm: AttachmentUploadForm): NetworkResult<ResumableUploadSpec> {
return getResumableUploadUrl(uploadForm)
.map { url ->
ResumableUploadSpec(
attachmentKey = key,
attachmentIv = iv,
cdnKey = uploadForm.key,
cdnNumber = uploadForm.cdn,
resumeLocation = url,
expirationTimestamp = System.currentTimeMillis() + PushServiceSocket.CDN2_RESUMABLE_LINK_LIFETIME_MILLIS,
headers = uploadForm.headers
)
}
}
/**
* Uploads an attachment using the v4 upload scheme.
*/
fun uploadAttachmentV4(attachmentStream: SignalServiceAttachmentStream): NetworkResult<AttachmentUploadResult> {
if (attachmentStream.resumableUploadSpec.isEmpty) {
throw IllegalStateException("Attachment must have a resumable upload spec!")
}
return NetworkResult.fromFetch {
val resumableUploadSpec = attachmentStream.resumableUploadSpec.get()
val paddedLength = PaddingInputStream.getPaddedSize(attachmentStream.length)
val dataStream: InputStream = PaddingInputStream(attachmentStream.inputStream, attachmentStream.length)
val ciphertextLength = AttachmentCipherStreamUtil.getCiphertextLength(paddedLength)
val attachmentData = PushAttachmentData(
contentType = attachmentStream.contentType,
data = dataStream,
dataSize = ciphertextLength,
incremental = attachmentStream.isFaststart,
outputStreamFactory = AttachmentCipherOutputStreamFactory(resumableUploadSpec.attachmentKey, resumableUploadSpec.attachmentIv),
listener = attachmentStream.listener,
cancelationSignal = attachmentStream.cancelationSignal,
resumableUploadSpec = attachmentStream.resumableUploadSpec.get()
)
val digestInfo = pushServiceSocket.uploadAttachment(attachmentData)
AttachmentUploadResult(
remoteId = SignalServiceAttachmentRemoteId.V4(attachmentData.resumableUploadSpec.cdnKey),
cdnNumber = attachmentData.resumableUploadSpec.cdnNumber,
key = resumableUploadSpec.attachmentKey,
iv = resumableUploadSpec.attachmentIv,
digest = digestInfo.digest,
incrementalDigest = digestInfo.incrementalDigest,
incrementalDigestChunkSize = digestInfo.incrementalMacChunkSize,
uploadTimestamp = attachmentStream.uploadTimestamp,
dataSize = attachmentData.dataSize
)
}
}
private fun getResumableUploadUrl(uploadForm: AttachmentUploadForm): NetworkResult<String> {
return NetworkResult.fromFetch {
pushServiceSocket.getResumableUploadUrl(uploadForm)
}
}
}

View File

@@ -0,0 +1,23 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.signalservice.api.attachment
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentRemoteId
/**
* The result of uploading an attachment. Just the additional metadata related to the upload itself.
*/
class AttachmentUploadResult(
val remoteId: SignalServiceAttachmentRemoteId,
val cdnNumber: Int,
val key: ByteArray,
val iv: ByteArray,
val digest: ByteArray,
val incrementalDigest: ByteArray?,
val incrementalDigestChunkSize: Int,
val dataSize: Long,
val uploadTimestamp: Long
)

View File

@@ -5,4 +5,8 @@
package org.whispersystems.signalservice.internal.crypto
data class AttachmentDigest(val digest: ByteArray, val incrementalDigest: ByteArray?, val incrementalMacChunkSize: Int)
data class AttachmentDigest(
val digest: ByteArray,
val incrementalDigest: ByteArray?,
val incrementalMacChunkSize: Int
)

View File

@@ -1,74 +0,0 @@
/**
* Copyright (C) 2014-2016 Open Whisper Systems
*
* Licensed according to the LICENSE file in this repository.
*/
package org.whispersystems.signalservice.internal.push;
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment.ProgressListener;
import org.whispersystems.signalservice.internal.push.http.CancelationSignal;
import org.whispersystems.signalservice.internal.push.http.OutputStreamFactory;
import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec;
import java.io.InputStream;
public class PushAttachmentData {
private final String contentType;
private final InputStream data;
private final long dataSize;
private final boolean incremental;
private final OutputStreamFactory outputStreamFactory;
private final ProgressListener listener;
private final CancelationSignal cancelationSignal;
private final ResumableUploadSpec resumableUploadSpec;
public PushAttachmentData(String contentType, InputStream data, long dataSize,
boolean incremental, OutputStreamFactory outputStreamFactory,
ProgressListener listener, CancelationSignal cancelationSignal,
ResumableUploadSpec resumableUploadSpec)
{
this.contentType = contentType;
this.data = data;
this.dataSize = dataSize;
this.incremental = incremental;
this.outputStreamFactory = outputStreamFactory;
this.resumableUploadSpec = resumableUploadSpec;
this.listener = listener;
this.cancelationSignal = cancelationSignal;
}
public String getContentType() {
return contentType;
}
public InputStream getData() {
return data;
}
public long getDataSize() {
return dataSize;
}
public boolean getIncremental() {
return incremental;
}
public OutputStreamFactory getOutputStreamFactory() {
return outputStreamFactory;
}
public ProgressListener getListener() {
return listener;
}
public CancelationSignal getCancelationSignal() {
return cancelationSignal;
}
public ResumableUploadSpec getResumableUploadSpec() {
return resumableUploadSpec;
}
}

View File

@@ -0,0 +1,26 @@
/**
* Copyright (C) 2014-2016 Open Whisper Systems
*
* Licensed according to the LICENSE file in this repository.
*/
package org.whispersystems.signalservice.internal.push
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment
import org.whispersystems.signalservice.internal.push.http.CancelationSignal
import org.whispersystems.signalservice.internal.push.http.OutputStreamFactory
import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec
import java.io.InputStream
/**
* A bundle of data needed to start an attachment upload.
*/
data class PushAttachmentData(
val contentType: String?,
val data: InputStream,
val dataSize: Long,
val incremental: Boolean,
val outputStreamFactory: OutputStreamFactory,
val listener: SignalServiceAttachment.ProgressListener?,
val cancelationSignal: CancelationSignal?,
val resumableUploadSpec: ResumableUploadSpec
)

View File

@@ -343,7 +343,7 @@ public class PushServiceSocket {
private static final ResponseCodeHandler NO_HANDLER = new EmptyResponseCodeHandler();
private static final ResponseCodeHandler UNOPINIONATED_HANDLER = new UnopinionatedResponseCodeHandler();
private static final long CDN2_RESUMABLE_LINK_LIFETIME_MILLIS = TimeUnit.DAYS.toMillis(7);
public static final long CDN2_RESUMABLE_LINK_LIFETIME_MILLIS = TimeUnit.DAYS.toMillis(7);
private static final int MAX_FOLLOW_UPS = 20;

View File

@@ -1,121 +0,0 @@
package org.whispersystems.signalservice.internal.push.http;
import org.signal.protos.resumableuploads.ResumableUpload;
import org.whispersystems.signalservice.api.push.exceptions.ResumeLocationInvalidException;
import org.signal.core.util.Base64;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import okio.ByteString;
public final class ResumableUploadSpec {
private final byte[] secretKey;
private final byte[] iv;
private final String cdnKey;
private final Integer cdnNumber;
private final String resumeLocation;
private final Long expirationTimestamp;
private final Map<String, String> headers;
public ResumableUploadSpec(byte[] secretKey,
byte[] iv,
String cdnKey,
int cdnNumber,
String resumeLocation,
long expirationTimestamp,
Map<String, String> headers)
{
this.secretKey = secretKey;
this.iv = iv;
this.cdnKey = cdnKey;
this.cdnNumber = cdnNumber;
this.resumeLocation = resumeLocation;
this.expirationTimestamp = expirationTimestamp;
this.headers = headers;
}
public byte[] getSecretKey() {
return secretKey;
}
public byte[] getIV() {
return iv;
}
public String getCdnKey() {
return cdnKey;
}
public Integer getCdnNumber() {
return cdnNumber;
}
public String getResumeLocation() {
return resumeLocation;
}
public Long getExpirationTimestamp() {
return expirationTimestamp;
}
public Map<String, String> getHeaders() {
return headers;
}
public ResumableUpload toProto() {
ResumableUpload.Builder builder = new ResumableUpload.Builder()
.secretKey(ByteString.of(getSecretKey()))
.iv(ByteString.of(getIV()))
.timeout(getExpirationTimestamp())
.cdnNumber(getCdnNumber())
.cdnKey(getCdnKey())
.location(getResumeLocation())
.timeout(getExpirationTimestamp());
builder.headers(
headers.entrySet()
.stream()
.map(e -> new ResumableUpload.Header.Builder().key(e.getKey()).value_(e.getValue()).build())
.collect(Collectors.toList())
);
return builder.build();
}
public String serialize() {
return Base64.encodeWithPadding(toProto().encode());
}
public static ResumableUploadSpec deserialize(String serializedSpec) throws ResumeLocationInvalidException {
try {
ResumableUpload resumableUpload = ResumableUpload.ADAPTER.decode(Base64.decode(serializedSpec));
return from(resumableUpload);
} catch (IOException e) {
throw new ResumeLocationInvalidException();
}
}
public static ResumableUploadSpec from(ResumableUpload resumableUpload) throws ResumeLocationInvalidException {
if (resumableUpload == null) return null;
Map<String, String> headers = new HashMap<>();
for (ResumableUpload.Header header : resumableUpload.headers) {
headers.put(header.key, header.value_);
}
return new ResumableUploadSpec(
resumableUpload.secretKey.toByteArray(),
resumableUpload.iv.toByteArray(),
resumableUpload.cdnKey,
resumableUpload.cdnNumber,
resumableUpload.location,
resumableUpload.timeout,
headers
);
}
}

View File

@@ -0,0 +1,71 @@
package org.whispersystems.signalservice.internal.push.http
import okio.ByteString.Companion.toByteString
import org.signal.core.util.Base64
import org.signal.protos.resumableuploads.ResumableUpload
import org.whispersystems.signalservice.api.push.exceptions.ResumeLocationInvalidException
import java.io.IOException
/**
* Contains data around how to begin or resume an upload.
* For given attachment, this data be saved and reused within the [expirationTimestamp] window.
*/
class ResumableUploadSpec(
val attachmentKey: ByteArray,
val attachmentIv: ByteArray,
val cdnKey: String,
val cdnNumber: Int,
val resumeLocation: String,
val expirationTimestamp: Long,
val headers: Map<String, String>
) {
fun toProto(): ResumableUpload {
return ResumableUpload(
secretKey = attachmentKey.toByteString(),
iv = attachmentIv.toByteString(),
timeout = expirationTimestamp,
cdnNumber = cdnNumber,
cdnKey = cdnKey,
location = resumeLocation,
headers = headers.entries.map { ResumableUpload.Header(key = it.key, value_ = it.value) }
)
}
fun serialize(): String {
return Base64.encodeWithPadding(toProto().encode())
}
companion object {
@Throws(ResumeLocationInvalidException::class)
fun deserialize(serializedSpec: String?): ResumableUploadSpec? {
try {
val resumableUpload = ResumableUpload.ADAPTER.decode(Base64.decode(serializedSpec!!))
return from(resumableUpload)
} catch (e: IOException) {
throw ResumeLocationInvalidException()
}
}
@Throws(ResumeLocationInvalidException::class)
fun from(resumableUpload: ResumableUpload?): ResumableUploadSpec? {
if (resumableUpload == null) {
return null
}
val headers: MutableMap<String, String> = HashMap()
for (header in resumableUpload.headers) {
headers[header.key] = header.value_
}
return ResumableUploadSpec(
attachmentKey = resumableUpload.secretKey.toByteArray(),
attachmentIv = resumableUpload.iv.toByteArray(),
cdnKey = resumableUpload.cdnKey,
cdnNumber = resumableUpload.cdnNumber,
resumeLocation = resumableUpload.location,
expirationTimestamp = resumableUpload.timeout,
headers = headers
)
}
}
}