mirror of
https://github.com/signalapp/Signal-Android.git
synced 2026-04-28 04:34:21 +01:00
Add Flow.throttleLatest extension.
This commit is contained in:
@@ -0,0 +1,24 @@
|
||||
/*
|
||||
* Copyright 2024 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.signal.core.util
|
||||
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.conflate
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlin.time.Duration
|
||||
|
||||
/**
|
||||
* Throttles the flow so that at most one value is emitted every [timeout]. The latest value is always emitted.
|
||||
*
|
||||
* You can think of this like debouncing, but with "checkpoints" so that even if you have a constant stream of values,
|
||||
* you'll still get an emission every [timeout] (unlike debouncing, which will only emit once the stream settles down).
|
||||
*/
|
||||
fun <T> Flow<T>.throttleLatest(timeout: Duration): Flow<T> {
|
||||
return this
|
||||
.conflate()
|
||||
.onEach { delay(timeout) }
|
||||
}
|
||||
@@ -0,0 +1,63 @@
|
||||
/*
|
||||
* Copyright 2024 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.signal.core.util
|
||||
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.flow
|
||||
import kotlinx.coroutines.flow.toList
|
||||
import kotlinx.coroutines.test.runTest
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Test
|
||||
import kotlin.time.Duration.Companion.milliseconds
|
||||
|
||||
class FlowExtensionsTests {
|
||||
|
||||
@Test
|
||||
fun `throttleLatest - always emits first value`() = runTest {
|
||||
val testFlow = flow {
|
||||
delay(10)
|
||||
emit(1)
|
||||
}
|
||||
|
||||
val output = testFlow
|
||||
.throttleLatest(100.milliseconds)
|
||||
.toList()
|
||||
|
||||
assertEquals(listOf(1), output)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `throttleLatest - always emits last value`() = runTest {
|
||||
val testFlow = flow {
|
||||
delay(10)
|
||||
emit(1)
|
||||
delay(30)
|
||||
emit(2)
|
||||
}
|
||||
|
||||
val output = testFlow
|
||||
.throttleLatest(20.milliseconds)
|
||||
.toList()
|
||||
|
||||
assertEquals(listOf(1, 2), output)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `throttleLatest - skips intermediate values`() = runTest {
|
||||
val testFlow = flow {
|
||||
for (i in 1..30) {
|
||||
emit(i)
|
||||
delay(10)
|
||||
}
|
||||
}
|
||||
|
||||
val output = testFlow
|
||||
.throttleLatest(50.milliseconds)
|
||||
.toList()
|
||||
|
||||
assertEquals(listOf(1, 5, 10, 15, 20, 25, 30), output)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user