From 7043558657adc0626f6f7d5d8dc3209186e7861d Mon Sep 17 00:00:00 2001 From: Michelle Tang Date: Thu, 24 Apr 2025 15:48:51 -0400 Subject: [PATCH] Add fixes for streamable videos. --- .../securesms/video/VideoPlayer.java | 10 ++ .../securesms/video/exo/PartDataSource.java | 10 +- app/src/main/res/layout/video_player.xml | 9 ++ .../core/util/stream/TailerInputStream.kt | 87 ++++++++++++++ .../core/util/stream/TailerInputStreamTest.kt | 106 ++++++++++++++++++ 5 files changed, 218 insertions(+), 4 deletions(-) create mode 100644 core-util-jvm/src/main/java/org/signal/core/util/stream/TailerInputStream.kt create mode 100644 core-util-jvm/src/test/java/org/signal/core/util/stream/TailerInputStreamTest.kt diff --git a/app/src/main/java/org/thoughtcrime/securesms/video/VideoPlayer.java b/app/src/main/java/org/thoughtcrime/securesms/video/VideoPlayer.java index f2b44eada4..80c01820da 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/video/VideoPlayer.java +++ b/app/src/main/java/org/thoughtcrime/securesms/video/VideoPlayer.java @@ -19,6 +19,7 @@ package org.thoughtcrime.securesms.video; import android.content.Context; import android.content.res.TypedArray; import android.util.AttributeSet; +import android.view.View; import android.view.Window; import android.view.WindowManager; import android.widget.FrameLayout; @@ -55,6 +56,7 @@ public class VideoPlayer extends FrameLayout { private static final String TAG = Log.tag(VideoPlayer.class); private final PlayerView exoView; + private final View progressBar; private final DefaultMediaSourceFactory mediaSourceFactory; private ExoPlayer exoPlayer; @@ -89,6 +91,7 @@ public class VideoPlayer extends FrameLayout { this.mediaSourceFactory = new DefaultMediaSourceFactory(context); this.exoView = findViewById(R.id.video_view); + this.progressBar = findViewById(R.id.progress_bar); this.exoControls = createPlayerControls(getContext()); this.exoPlayerListener = new ExoPlayerListener(); @@ -113,6 +116,13 @@ public class VideoPlayer extends FrameLayout { } private void onPlaybackStateChanged(boolean playWhenReady, int playbackState) { + if (progressBar != null) { + if (playbackState == Player.STATE_BUFFERING) { + progressBar.setVisibility(View.VISIBLE); + } else { + progressBar.setVisibility(View.GONE); + } + } if (playerCallback != null) { switch (playbackState) { case Player.STATE_READY: diff --git a/app/src/main/java/org/thoughtcrime/securesms/video/exo/PartDataSource.java b/app/src/main/java/org/thoughtcrime/securesms/video/exo/PartDataSource.java index 2dd1ac5c95..2fd334a2e0 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/video/exo/PartDataSource.java +++ b/app/src/main/java/org/thoughtcrime/securesms/video/exo/PartDataSource.java @@ -14,22 +14,22 @@ import androidx.media3.datasource.TransferListener; import org.signal.core.util.logging.Log; import org.signal.libsignal.protocol.InvalidMessageException; import org.thoughtcrime.securesms.attachments.DatabaseAttachment; -import org.thoughtcrime.securesms.backup.v2.BackupRepository; import org.thoughtcrime.securesms.backup.v2.DatabaseAttachmentArchiveUtil; import org.thoughtcrime.securesms.database.AttachmentTable; import org.thoughtcrime.securesms.database.SignalDatabase; import org.thoughtcrime.securesms.keyvalue.SignalStore; import org.thoughtcrime.securesms.mms.PartUriParser; import org.signal.core.util.Base64; -import org.whispersystems.signalservice.api.backup.MediaId; import org.whispersystems.signalservice.api.backup.MediaName; import org.whispersystems.signalservice.api.backup.MediaRootBackupKey; import org.whispersystems.signalservice.api.crypto.AttachmentCipherInputStream; import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil; +import org.signal.core.util.stream.TailerInputStream; import org.whispersystems.signalservice.internal.crypto.PaddingInputStream; import java.io.EOFException; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Collections; @@ -88,14 +88,16 @@ class PartDataSource implements DataSource { } else { final File transferFile = attachmentDatabase.getOrCreateTransferFile(attachment.attachmentId); try { - this.inputStream = AttachmentCipherInputStream.createForAttachment(transferFile, attachment.size, decode, attachment.remoteDigest, attachment.getIncrementalDigest(), attachment.incrementalMacChunkSize); + long streamLength = AttachmentCipherStreamUtil.getCiphertextLength(PaddingInputStream.getPaddedSize(attachment.size)); + AttachmentCipherInputStream.StreamSupplier streamSupplier = () -> new TailerInputStream(() -> new FileInputStream(transferFile), streamLength); + this.inputStream = AttachmentCipherInputStream.createForAttachment(streamSupplier, streamLength, attachment.size, decode, attachment.remoteDigest, attachment.getIncrementalDigest(), attachment.incrementalMacChunkSize, false); } catch (InvalidMessageException e) { throw new IOException("Error decrypting attachment stream!", e); } } long skipped = 0; while (skipped < dataSpec.position) { - skipped += this.inputStream.read(); + skipped += this.inputStream.skip(dataSpec.position - skipped); } Log.d(TAG, "Successfully loaded partial attachment file."); diff --git a/app/src/main/res/layout/video_player.xml b/app/src/main/res/layout/video_player.xml index 9c70f3cb01..c71c5e1d19 100644 --- a/app/src/main/res/layout/video_player.xml +++ b/app/src/main/res/layout/video_player.xml @@ -17,4 +17,13 @@ app:surface_type="texture_view" app:player_layout_id="@layout/media_preview_exoplayer_layout"/> + + \ No newline at end of file diff --git a/core-util-jvm/src/main/java/org/signal/core/util/stream/TailerInputStream.kt b/core-util-jvm/src/main/java/org/signal/core/util/stream/TailerInputStream.kt new file mode 100644 index 0000000000..c0dca47f47 --- /dev/null +++ b/core-util-jvm/src/main/java/org/signal/core/util/stream/TailerInputStream.kt @@ -0,0 +1,87 @@ +package org.signal.core.util.stream + +import org.signal.core.util.logging.Log +import java.io.FilterInputStream +import java.io.IOException +import java.io.InputStream + +/** + * Input stream that reads a file that is actively being written to. + * Will read or wait to read (for the bytes to be available) until it reaches the end [bytesLength] + * A use case is streamable video where we want to play the video while the file is still downloading + */ +class TailerInputStream(private val streamFactory: StreamFactory, private val bytesLength: Long) : FilterInputStream(streamFactory.openStream()) { + + private val TAG = Log.tag(TailerInputStream::class) + + /** Tracks where we are in the file */ + private var position: Long = 0 + + private var currentStream: InputStream + get() = this.`in` + set(input) { + this.`in` = input + } + + override fun skip(requestedSkipCount: Long): Long { + val bytesSkipped = this.currentStream.skip(requestedSkipCount) + this.position += bytesSkipped + + return bytesSkipped + } + + override fun read(): Int { + val bytes = ByteArray(1) + var result = this.read(bytes) + while (result == 0) { + result = this.read(bytes) + } + + if (result == -1) { + return result + } + + return bytes[0].toInt() and 0xFF + } + + override fun read(destination: ByteArray): Int { + return this.read(destination = destination, offset = 0, length = destination.size) + } + + override fun read(destination: ByteArray, offset: Int, length: Int): Int { + // Checking if we reached the end of the file (bytesLength) + if (position >= bytesLength) { + return -1 + } + + var bytesRead = this.currentStream.read(destination, offset, length) + + // If we haven't read any bytes, but we aren't at the end of the file, + // we close the stream, wait, and then try again + while (bytesRead < 0 && position < bytesLength) { + this.currentStream.close() + try { + Thread.sleep(100) + } catch (e: InterruptedException) { + Log.w(TAG, "Ignoring interrupted exception while waiting for input stream", e) + } + this.currentStream = streamFactory.openStream() + // After reopening the file, we skip to the position we were at last time + this.currentStream.skip(this.position) + + bytesRead = this.currentStream.read(destination, offset, length) + } + + // Update current position with bytes read + if (bytesRead > 0) { + position += bytesRead + } + + return bytesRead + } +} + +fun interface StreamFactory { + @Throws(IOException::class) + fun openStream(): InputStream +} diff --git a/core-util-jvm/src/test/java/org/signal/core/util/stream/TailerInputStreamTest.kt b/core-util-jvm/src/test/java/org/signal/core/util/stream/TailerInputStreamTest.kt new file mode 100644 index 0000000000..e529fb5562 --- /dev/null +++ b/core-util-jvm/src/test/java/org/signal/core/util/stream/TailerInputStreamTest.kt @@ -0,0 +1,106 @@ +package org.signal.core.util.stream + +import org.junit.Assert.assertEquals +import org.junit.Test +import org.signal.core.util.readFully + +class TailerInputStreamTest { + + @Test + fun `when I provide an incomplete stream and a known bytesLength, I can read the stream until bytesLength is reached`() { + var currentBytesLength = 0 + val inputStream = TailerInputStream( + streamFactory = { + currentBytesLength += 10 + ByteArray(currentBytesLength).inputStream() + }, + bytesLength = 50 + ) + + val data = inputStream.readFully() + assertEquals(50, data.size) + } + + @Test + fun `when I provide an incomplete stream and a known bytesLength, I can read the stream one byte at a time until bytesLength is reached`() { + var currentBytesLength = 0 + val inputStream = TailerInputStream( + streamFactory = { + currentBytesLength += 10 + ByteArray(currentBytesLength).inputStream() + }, + bytesLength = 20 + ) + + var count = 0 + var lastRead = inputStream.read() + while (lastRead != -1) { + count++ + lastRead = inputStream.read() + } + + assertEquals(20, count) + } + + @Test + fun `when I provide a complete stream and a known bytesLength, I can read the stream until bytesLength is reached`() { + val inputStream = TailerInputStream( + streamFactory = { ByteArray(50).inputStream() }, + bytesLength = 50 + ) + + val data = inputStream.readFully() + assertEquals(50, data.size) + } + + @Test + fun `when I provide a complete stream and a known bytesLength, I can read the stream one byte at a time until bytesLength is reached`() { + val inputStream = TailerInputStream( + streamFactory = { ByteArray(20).inputStream() }, + bytesLength = 20 + ) + + var count = 0 + var lastRead = inputStream.read() + while (lastRead != -1) { + count++ + lastRead = inputStream.read() + } + + assertEquals(20, count) + } + + @Test + fun `when I skip bytes, I still read until the end of bytesLength`() { + var currentBytesLength = 0 + val inputStream = TailerInputStream( + streamFactory = { + currentBytesLength += 10 + ByteArray(currentBytesLength).inputStream() + }, + bytesLength = 50 + ) + + inputStream.skip(5) + + val data = inputStream.readFully() + assertEquals(45, data.size) + } + + @Test + fun `when I skip more bytes than available, I can still read until the end of bytesLength`() { + var currentBytesLength = 0 + val inputStream = TailerInputStream( + streamFactory = { + currentBytesLength += 10 + ByteArray(currentBytesLength).inputStream() + }, + bytesLength = 50 + ) + + inputStream.skip(15) + + val data = inputStream.readFully() + assertEquals(40, data.size) + } +}