Add create-and-upload to important attachment upload flows.

Co-authored-by: Greyson Parrelli <greyson@signal.org>
This commit is contained in:
Cody Henthorne
2026-03-26 16:35:02 -04:00
committed by Alex Hart
parent 2ad14800d1
commit ce87b50a07
12 changed files with 497 additions and 265 deletions

View File

@@ -10,6 +10,7 @@ import org.signal.core.models.backup.BackupKey
import org.signal.core.models.backup.MediaRootBackupKey
import org.signal.core.models.backup.MessageBackupKey
import org.signal.core.util.isNotNullOrBlank
import org.signal.core.util.logging.Log
import org.signal.libsignal.protocol.ecc.ECPrivateKey
import org.signal.libsignal.protocol.ecc.ECPublicKey
import org.signal.libsignal.zkgroup.GenericServerPublicParams
@@ -44,6 +45,10 @@ class ArchiveApi(
private val pushServiceSocket: PushServiceSocket
) {
companion object {
private val TAG = Log.tag(ArchiveApi::class)
}
private val backupServerPublicParams: GenericServerPublicParams = GenericServerPublicParams(pushServiceSocket.configuration.backupServerPublicParams)
/**
@@ -236,11 +241,38 @@ class ArchiveApi(
}
/**
* Uploads your main backup file to cloud storage.
* Uploads a pre-encrypted backup file, automatically choosing the best upload strategy based on CDN version.
* For CDN3, uses TUS "Creation With Upload" (single POST). For other CDNs, falls back to the legacy
* resumable upload flow.
*
* If [existingResumeUrl] is provided, the upload resumes using the existing URL (HEAD+PATCH).
* Otherwise, a new upload is initiated and [onResumeUrlCreated] is called with the resumable URL
* before the upload begins, allowing callers to persist it for crash recovery.
*/
fun uploadBackupFile(uploadForm: AttachmentUploadForm, resumableUploadUrl: String, data: InputStream, dataLength: Long, progressListener: SignalServiceAttachment.ProgressListener? = null): NetworkResult<Unit> {
fun uploadBackupFile(
uploadForm: AttachmentUploadForm,
data: InputStream,
dataLength: Long,
checksumSha256: String? = null,
progressListener: SignalServiceAttachment.ProgressListener? = null,
existingResumeUrl: String? = null,
onResumeUrlCreated: ((String) -> Unit)? = null
): NetworkResult<Unit> {
return NetworkResult.fromFetch {
pushServiceSocket.uploadBackupFile(uploadForm, resumableUploadUrl, data, dataLength, progressListener)
if (existingResumeUrl != null) {
Log.i(TAG, "Resuming backup upload via HEAD+PATCH")
pushServiceSocket.uploadBackupFile(uploadForm, existingResumeUrl, data, dataLength, progressListener)
} else if (uploadForm.cdn == 3) {
Log.i(TAG, "Fresh backup upload via creation-with-upload (CDN3)")
val resumeUrl = uploadForm.signedUploadLocation + "/" + uploadForm.key
onResumeUrlCreated?.invoke(resumeUrl)
pushServiceSocket.uploadBackupFile(uploadForm, checksumSha256, data, dataLength, progressListener, null)
} else {
Log.i(TAG, "Fresh backup upload via legacy flow (CDN${uploadForm.cdn})")
val resumeUrl = pushServiceSocket.getResumableUploadUrl(uploadForm, checksumSha256)
onResumeUrlCreated?.invoke(resumeUrl)
pushServiceSocket.uploadBackupFile(uploadForm, resumeUrl, data, dataLength, progressListener)
}
}
}

View File

@@ -5,6 +5,7 @@
package org.whispersystems.signalservice.api.attachment
import org.signal.core.util.logging.Log
import org.whispersystems.signalservice.api.NetworkResult
import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentRemoteId
@@ -28,6 +29,11 @@ class AttachmentApi(
private val authWebSocket: SignalWebSocket.AuthenticatedWebSocket,
private val pushServiceSocket: PushServiceSocket
) {
companion object {
private val TAG: String = Log.tag(AttachmentApi::class)
}
/**
* Gets a v4 attachment upload form, which provides the necessary information to upload an attachment.
*
@@ -41,24 +47,6 @@ class AttachmentApi(
return NetworkResult.fromWebSocketRequest(authWebSocket, request, AttachmentUploadForm::class)
}
/**
* Gets a resumable upload spec, which can be saved and re-used across upload attempts to resume upload progress.
*/
fun getResumableUploadSpec(key: ByteArray, iv: ByteArray, uploadForm: AttachmentUploadForm): NetworkResult<ResumableUploadSpec> {
return getResumableUploadUrl(uploadForm)
.map { url ->
ResumableUploadSpec(
attachmentKey = key,
attachmentIv = iv,
cdnKey = uploadForm.key,
cdnNumber = uploadForm.cdn,
resumeLocation = url,
expirationTimestamp = System.currentTimeMillis() + PushServiceSocket.CDN2_RESUMABLE_LINK_LIFETIME_MILLIS,
headers = uploadForm.headers
)
}
}
/**
* Uploads an attachment using the v4 upload scheme.
*/
@@ -88,8 +76,8 @@ class AttachmentApi(
val digestInfo = pushServiceSocket.uploadAttachment(attachmentData)
AttachmentUploadResult(
remoteId = SignalServiceAttachmentRemoteId.V4(attachmentData.resumableUploadSpec.cdnKey),
cdnNumber = attachmentData.resumableUploadSpec.cdnNumber,
remoteId = SignalServiceAttachmentRemoteId.V4(resumableUploadSpec.cdnKey),
cdnNumber = resumableUploadSpec.cdnNumber,
key = resumableUploadSpec.attachmentKey,
digest = digestInfo.digest,
incrementalDigest = digestInfo.incrementalDigest,
@@ -102,18 +90,94 @@ class AttachmentApi(
}
/**
* Uploads a raw file using the v4 upload scheme. No additional encryption is supplied! Always prefer [uploadAttachmentV4], unless you are using a separate
* encryption scheme (i.e. like backup files).
* Uploads an encrypted attachment, automatically choosing the best upload strategy based on CDN version.
* For CDN3, uses TUS "Creation With Upload" (single POST). For other CDNs, falls back to the legacy
* resumable upload flow (POST create + HEAD + PATCH).
*
* If [existingSpec] is provided, the upload resumes using the existing resumable upload URL (HEAD+PATCH)
* and [form] is not required.
* Otherwise, [form] is required, a new upload is initiated, and [onSpecCreated] is called with the
* [ResumableUploadSpec] before the upload begins, allowing callers to persist it for crash recovery.
*/
fun uploadPreEncryptedFileToAttachmentV4(uploadForm: AttachmentUploadForm, resumableUploadUrl: String, inputStream: InputStream, inputStreamLength: Long): NetworkResult<Unit> {
fun uploadAttachmentV4(
form: AttachmentUploadForm? = null,
key: ByteArray,
iv: ByteArray,
checksumSha256: String?,
attachmentStream: SignalServiceAttachmentStream,
existingSpec: ResumableUploadSpec? = null,
onSpecCreated: ((ResumableUploadSpec) -> Unit)? = null
): NetworkResult<AttachmentUploadResult> {
return NetworkResult.fromFetch {
pushServiceSocket.uploadBackupFile(uploadForm, resumableUploadUrl, inputStream, inputStreamLength)
}
}
require(existingSpec != null || form != null) { "Either existingSpec or form must be provided" }
fun getResumableUploadUrl(uploadForm: AttachmentUploadForm): NetworkResult<String> {
return NetworkResult.fromFetch {
pushServiceSocket.getResumableUploadUrl(uploadForm)
val paddedLength = PaddingInputStream.getPaddedSize(attachmentStream.length)
val dataStream: InputStream = PaddingInputStream(attachmentStream.inputStream, attachmentStream.length)
val ciphertextLength = AttachmentCipherStreamUtil.getCiphertextLength(paddedLength)
val effectiveKey = existingSpec?.attachmentKey ?: key
val effectiveIv = existingSpec?.attachmentIv ?: iv
val attachmentData = PushAttachmentData(
contentType = attachmentStream.contentType,
data = dataStream,
dataSize = ciphertextLength,
incremental = attachmentStream.isFaststart,
outputStreamFactory = AttachmentCipherOutputStreamFactory(effectiveKey, effectiveIv),
listener = attachmentStream.listener,
cancelationSignal = attachmentStream.cancelationSignal,
resumableUploadSpec = existingSpec
)
val digestInfo = if (existingSpec != null) {
Log.i(TAG, "Resuming upload via HEAD+PATCH")
pushServiceSocket.uploadAttachment(attachmentData)
} else if (form!!.cdn == 3) {
Log.i(TAG, "Fresh upload via creation-with-upload (CDN3)")
val spec = ResumableUploadSpec(
attachmentKey = key,
attachmentIv = iv,
cdnKey = form.key,
cdnNumber = form.cdn,
resumeLocation = form.signedUploadLocation + "/" + form.key,
expirationTimestamp = System.currentTimeMillis() + PushServiceSocket.CDN2_RESUMABLE_LINK_LIFETIME_MILLIS,
headers = form.headers
)
onSpecCreated?.invoke(spec)
pushServiceSocket.createAndUploadToCdn3(form, checksumSha256, attachmentData)
} else {
Log.i(TAG, "Fresh upload via legacy flow (CDN${form.cdn})")
val resumeUrl = pushServiceSocket.getResumableUploadUrl(form, checksumSha256)
val spec = ResumableUploadSpec(
attachmentKey = key,
attachmentIv = iv,
cdnKey = form.key,
cdnNumber = form.cdn,
resumeLocation = resumeUrl,
expirationTimestamp = System.currentTimeMillis() + PushServiceSocket.CDN2_RESUMABLE_LINK_LIFETIME_MILLIS,
headers = form.headers
)
onSpecCreated?.invoke(spec)
pushServiceSocket.uploadAttachment(attachmentData.copy(resumableUploadSpec = spec))
}
val cdnKey = existingSpec?.cdnKey ?: form!!.key
val cdnNumber = existingSpec?.cdnNumber ?: form!!.cdn
AttachmentUploadResult(
remoteId = SignalServiceAttachmentRemoteId.V4(cdnKey),
cdnNumber = cdnNumber,
key = key,
digest = digestInfo.digest,
incrementalDigest = digestInfo.incrementalDigest,
incrementalDigestChunkSize = digestInfo.incrementalMacChunkSize,
uploadTimestamp = attachmentStream.uploadTimestamp,
dataSize = attachmentStream.length,
blurHash = attachmentStream.blurHash.getOrNull()
)
}
}
}

View File

@@ -5,6 +5,9 @@
package org.whispersystems.signalservice.api.crypto
import org.signal.core.util.stream.NullOutputStream
import java.io.InputStream
object AttachmentCipherStreamUtil {
/**
@@ -23,4 +26,21 @@ object AttachmentCipherStreamUtil {
fun getPlaintextLength(ciphertextLength: Long): Long {
return ((ciphertextLength - 16 - 32) / 16 - 1) * 16
}
/**
* Computes the SHA-256 digest of the ciphertext that would result from encrypting [plaintextStream] with the given [key] and [iv].
* This includes the IV prefix and HMAC suffix that are part of the encrypted attachment format.
* The stream is encrypted to /dev/null -- only the digest is retained.
*/
@JvmStatic
fun computeCiphertextSha256(key: ByteArray, iv: ByteArray, plaintextStream: InputStream): ByteArray {
val cipherOutputStream = AttachmentCipherOutputStream(key, iv, NullOutputStream)
val buffer = ByteArray(16 * 1024)
var read: Int
while (plaintextStream.read(buffer).also { read = it } != -1) {
cipherOutputStream.write(buffer, 0, read)
}
cipherOutputStream.close()
return cipherOutputStream.transmittedDigest
}
}

View File

@@ -22,5 +22,5 @@ data class PushAttachmentData(
val outputStreamFactory: OutputStreamFactory,
val listener: SignalServiceAttachment.ProgressListener?,
val cancelationSignal: CancelationSignal?,
val resumableUploadSpec: ResumableUploadSpec
val resumableUploadSpec: ResumableUploadSpec? = null
)

View File

@@ -930,6 +930,10 @@ public class PushServiceSocket {
}
public String getResumableUploadUrl(AttachmentUploadForm uploadForm) throws IOException {
return getResumableUploadUrl(uploadForm, null);
}
public String getResumableUploadUrl(AttachmentUploadForm uploadForm, @Nullable String checksumSha256) throws IOException {
ConnectionHolder connectionHolder = getRandom(cdnClientsMap.get(uploadForm.cdn), random);
OkHttpClient okHttpClient = connectionHolder.getClient()
.newBuilder()
@@ -957,6 +961,9 @@ public class PushServiceSocket {
} else if (uploadForm.cdn == 3) {
request.addHeader("Upload-Defer-Length", "1")
.addHeader("Tus-Resumable", "1.0.0");
if (checksumSha256 != null) {
request.addHeader("x-signal-checksum-sha256", checksumSha256);
}
} else {
throw new AssertionError("Unknown CDN version: " + uploadForm.cdn);
}
@@ -984,6 +991,75 @@ public class PushServiceSocket {
}
}
public AttachmentDigest createAndUploadToCdn3(AttachmentUploadForm uploadForm,
@Nullable String checksumSha256,
PushAttachmentData attachmentData)
throws IOException
{
ConnectionHolder connectionHolder = getRandom(cdnClientsMap.get(3), random);
OkHttpClient okHttpClient = connectionHolder.getClient()
.newBuilder()
.connectTimeout(soTimeoutMillis, TimeUnit.MILLISECONDS)
.readTimeout(soTimeoutMillis, TimeUnit.MILLISECONDS)
.build();
DigestingRequestBody file = new DigestingRequestBody(attachmentData.getData(), attachmentData.getOutputStreamFactory(), "application/offset+octet-stream", attachmentData.getDataSize(), attachmentData.getIncremental(), attachmentData.getListener(), attachmentData.getCancelationSignal(), 0);
Request.Builder request = new Request.Builder().url(buildConfiguredUrl(connectionHolder, uploadForm.signedUploadLocation))
.post(file)
.addHeader("Upload-Length", String.valueOf(attachmentData.getDataSize()))
.addHeader("Tus-Resumable", "1.0.0");
for (Map.Entry<String, String> header : uploadForm.headers.entrySet()) {
if (!header.getKey().equalsIgnoreCase("host")) {
request.header(header.getKey(), header.getValue());
}
}
if (checksumSha256 != null) {
request.addHeader("x-signal-checksum-sha256", checksumSha256);
}
if (connectionHolder.getHostHeader().isPresent()) {
request.header("host", connectionHolder.getHostHeader().get());
}
Call call = okHttpClient.newCall(request.build());
synchronized (connections) {
connections.add(call);
}
try (Response response = call.execute()) {
if (response.isSuccessful()) {
return file.getAttachmentDigest();
} else {
throw new NonSuccessfulResponseCodeException(response.code(), "Response: " + response, response.body().string());
}
} catch (PushNetworkException | NonSuccessfulResponseCodeException e) {
throw e;
} catch (IOException e) {
if (e instanceof StreamResetException) {
throw e;
}
throw new PushNetworkException(e);
} finally {
synchronized (connections) {
connections.remove(call);
}
}
}
public void uploadBackupFile(AttachmentUploadForm uploadForm,
@Nullable String checksumSha256,
InputStream data,
long length,
ProgressListener progressListener,
CancelationSignal cancelationSignal)
throws IOException
{
createAndUploadToCdn3(uploadForm, checksumSha256, new PushAttachmentData(null, data, length, false, new NoCipherOutputStreamFactory(), progressListener, cancelationSignal, null));
}
private AttachmentDigest uploadToCdn2(String resumableUrl, InputStream data, String contentType, long length, boolean incremental, OutputStreamFactory outputStreamFactory, ProgressListener progressListener, CancelationSignal cancelationSignal) throws IOException {
ConnectionHolder connectionHolder = getRandom(cdnClientsMap.get(2), random);
OkHttpClient okHttpClient = connectionHolder.getClient()
@@ -1042,7 +1118,7 @@ public class PushServiceSocket {
if (uploadForm.cdn == 2) {
uploadToCdn2(resumableUploadUrl, data, "application/octet-stream", dataLength, false, new NoCipherOutputStreamFactory(), progressListener, null);
} else {
uploadToCdn3(resumableUploadUrl, data, "application/octet-stream", dataLength, false, new NoCipherOutputStreamFactory(), progressListener, null, uploadForm.headers);
uploadToCdn3(resumableUploadUrl, data, "application/offset+octet-stream", dataLength, false, new NoCipherOutputStreamFactory(), progressListener, null, uploadForm.headers);
}
}
@@ -1211,6 +1287,9 @@ public class PushServiceSocket {
} catch (PushNetworkException | NonSuccessfulResponseCodeException e) {
throw e;
} catch (IOException e) {
if (e instanceof StreamResetException || e instanceof ResumeLocationInvalidException) {
throw e;
}
throw new PushNetworkException(e);
} finally {
synchronized (connections) {