Remove blocking get from donation jobs.

This commit is contained in:
Alex Hart
2022-07-28 15:54:49 -03:00
committed by Greyson Parrelli
parent 8f12b2041a
commit f50bf3e9c2
17 changed files with 214 additions and 183 deletions

View File

@@ -26,20 +26,29 @@ class DonorErrorConfigurationViewModel : ViewModel() {
val state: Flowable<DonorErrorConfigurationState> = store.stateFlowable
init {
val giftBadges: Single<List<Badge>> = ApplicationDependencies.getDonationsService()
.getGiftBadges(Locale.getDefault())
val giftBadges: Single<List<Badge>> = Single
.fromCallable {
ApplicationDependencies.getDonationsService()
.getGiftBadges(Locale.getDefault())
}
.flatMap { it.flattenResult() }
.map { results -> results.values.map { Badges.fromServiceBadge(it) } }
.subscribeOn(Schedulers.io())
val boostBadges: Single<List<Badge>> = ApplicationDependencies.getDonationsService()
.getBoostBadge(Locale.getDefault())
val boostBadges: Single<List<Badge>> = Single
.fromCallable {
ApplicationDependencies.getDonationsService()
.getBoostBadge(Locale.getDefault())
}
.flatMap { it.flattenResult() }
.map { listOf(Badges.fromServiceBadge(it)) }
.subscribeOn(Schedulers.io())
val subscriptionBadges: Single<List<Badge>> = ApplicationDependencies.getDonationsService()
.getSubscriptionLevels(Locale.getDefault())
val subscriptionBadges: Single<List<Badge>> = Single
.fromCallable {
ApplicationDependencies.getDonationsService()
.getSubscriptionLevels(Locale.getDefault())
}
.flatMap { it.flattenResult() }
.map { levels -> levels.levels.values.map { Badges.fromServiceBadge(it.badge) } }
.subscribeOn(Schedulers.io())

View File

@@ -170,8 +170,11 @@ class DonationPaymentRepository(activity: Activity) : StripeApi.PaymentIntentFet
fun cancelActiveSubscription(): Completable {
Log.d(TAG, "Canceling active subscription...", true)
val localSubscriber = SignalStore.donationsValues().requireSubscriber()
return ApplicationDependencies.getDonationsService()
.cancelSubscription(localSubscriber.subscriberId)
return Single
.fromCallable {
ApplicationDependencies.getDonationsService()
.cancelSubscription(localSubscriber.subscriberId)
}
.subscribeOn(Schedulers.io())
.flatMap(ServiceResponse<EmptyResponse>::flattenResult)
.ignoreElement()
@@ -181,9 +184,12 @@ class DonationPaymentRepository(activity: Activity) : StripeApi.PaymentIntentFet
fun ensureSubscriberId(): Completable {
Log.d(TAG, "Ensuring SubscriberId exists on Signal service...", true)
val subscriberId = SignalStore.donationsValues().getSubscriber()?.subscriberId ?: SubscriberId.generate()
return ApplicationDependencies
.getDonationsService()
.putSubscription(subscriberId)
return Single
.fromCallable {
ApplicationDependencies
.getDonationsService()
.putSubscription(subscriberId)
}
.subscribeOn(Schedulers.io())
.flatMap(ServiceResponse<EmptyResponse>::flattenResult).ignoreElement()
.doOnComplete {
@@ -268,67 +274,71 @@ class DonationPaymentRepository(activity: Activity) : StripeApi.PaymentIntentFet
val subscriber = SignalStore.donationsValues().requireSubscriber()
Log.d(TAG, "Attempting to set user subscription level to $subscriptionLevel", true)
ApplicationDependencies.getDonationsService().updateSubscriptionLevel(
subscriber.subscriberId,
subscriptionLevel,
subscriber.currencyCode,
levelUpdateOperation.idempotencyKey.serialize(),
SubscriptionReceiptRequestResponseJob.MUTEX
).flatMapCompletable {
if (it.status == 200 || it.status == 204) {
Log.d(TAG, "Successfully set user subscription to level $subscriptionLevel with response code ${it.status}", true)
SignalStore.donationsValues().updateLocalStateForLocalSubscribe()
scheduleSyncForAccountRecordChange()
LevelUpdate.updateProcessingState(false)
Completable.complete()
} else {
if (it.applicationError.isPresent) {
Log.w(TAG, "Failed to set user subscription to level $subscriptionLevel with response code ${it.status}", it.applicationError.get(), true)
SignalStore.donationsValues().clearLevelOperations()
Single
.fromCallable {
ApplicationDependencies.getDonationsService().updateSubscriptionLevel(
subscriber.subscriberId,
subscriptionLevel,
subscriber.currencyCode,
levelUpdateOperation.idempotencyKey.serialize(),
SubscriptionReceiptRequestResponseJob.MUTEX
)
}
.flatMapCompletable {
if (it.status == 200 || it.status == 204) {
Log.d(TAG, "Successfully set user subscription to level $subscriptionLevel with response code ${it.status}", true)
SignalStore.donationsValues().updateLocalStateForLocalSubscribe()
scheduleSyncForAccountRecordChange()
LevelUpdate.updateProcessingState(false)
Completable.complete()
} else {
Log.w(TAG, "Failed to set user subscription to level $subscriptionLevel", it.executionError.orElse(null), true)
}
LevelUpdate.updateProcessingState(false)
it.flattenResult().ignoreElement()
}
}.andThen {
Log.d(TAG, "Enqueuing request response job chain.", true)
val countDownLatch = CountDownLatch(1)
var finalJobState: JobTracker.JobState? = null
SubscriptionReceiptRequestResponseJob.createSubscriptionContinuationJobChain().enqueue { _, jobState ->
if (jobState.isComplete) {
finalJobState = jobState
countDownLatch.countDown()
}
}
try {
if (countDownLatch.await(10, TimeUnit.SECONDS)) {
when (finalJobState) {
JobTracker.JobState.SUCCESS -> {
Log.d(TAG, "Subscription request response job chain succeeded.", true)
it.onComplete()
}
JobTracker.JobState.FAILURE -> {
Log.d(TAG, "Subscription request response job chain failed permanently.", true)
it.onError(DonationError.genericBadgeRedemptionFailure(DonationErrorSource.SUBSCRIPTION))
}
else -> {
Log.d(TAG, "Subscription request response job chain ignored due to in-progress jobs.", true)
it.onError(DonationError.timeoutWaitingForToken(DonationErrorSource.SUBSCRIPTION))
}
if (it.applicationError.isPresent) {
Log.w(TAG, "Failed to set user subscription to level $subscriptionLevel with response code ${it.status}", it.applicationError.get(), true)
SignalStore.donationsValues().clearLevelOperations()
} else {
Log.w(TAG, "Failed to set user subscription to level $subscriptionLevel", it.executionError.orElse(null), true)
}
} else {
Log.d(TAG, "Subscription request response job timed out.", true)
LevelUpdate.updateProcessingState(false)
it.flattenResult().ignoreElement()
}
}.andThen {
Log.d(TAG, "Enqueuing request response job chain.", true)
val countDownLatch = CountDownLatch(1)
var finalJobState: JobTracker.JobState? = null
SubscriptionReceiptRequestResponseJob.createSubscriptionContinuationJobChain().enqueue { _, jobState ->
if (jobState.isComplete) {
finalJobState = jobState
countDownLatch.countDown()
}
}
try {
if (countDownLatch.await(10, TimeUnit.SECONDS)) {
when (finalJobState) {
JobTracker.JobState.SUCCESS -> {
Log.d(TAG, "Subscription request response job chain succeeded.", true)
it.onComplete()
}
JobTracker.JobState.FAILURE -> {
Log.d(TAG, "Subscription request response job chain failed permanently.", true)
it.onError(DonationError.genericBadgeRedemptionFailure(DonationErrorSource.SUBSCRIPTION))
}
else -> {
Log.d(TAG, "Subscription request response job chain ignored due to in-progress jobs.", true)
it.onError(DonationError.timeoutWaitingForToken(DonationErrorSource.SUBSCRIPTION))
}
}
} else {
Log.d(TAG, "Subscription request response job timed out.", true)
it.onError(DonationError.timeoutWaitingForToken(DonationErrorSource.SUBSCRIPTION))
}
} catch (e: InterruptedException) {
Log.w(TAG, "Subscription request response interrupted.", e, true)
it.onError(DonationError.timeoutWaitingForToken(DonationErrorSource.SUBSCRIPTION))
}
} catch (e: InterruptedException) {
Log.w(TAG, "Subscription request response interrupted.", e, true)
it.onError(DonationError.timeoutWaitingForToken(DonationErrorSource.SUBSCRIPTION))
}
}
}.doOnError {
LevelUpdate.updateProcessingState(false)
}.subscribeOn(Schedulers.io())
@@ -356,9 +366,12 @@ class DonationPaymentRepository(activity: Activity) : StripeApi.PaymentIntentFet
override fun fetchPaymentIntent(price: FiatMoney, level: Long): Single<StripeApi.PaymentIntent> {
Log.d(TAG, "Fetching payment intent from Signal service for $price... (Locale.US minimum precision: ${price.minimumUnitPrecisionString})")
return ApplicationDependencies
.getDonationsService()
.createDonationIntentWithAmount(price.minimumUnitPrecisionString, price.currency.currencyCode, level)
return Single
.fromCallable {
ApplicationDependencies
.getDonationsService()
.createDonationIntentWithAmount(price.minimumUnitPrecisionString, price.currency.currencyCode, level)
}
.flatMap(ServiceResponse<SubscriptionClientSecret>::flattenResult)
.map {
StripeApi.PaymentIntent(it.id, it.clientSecret)
@@ -370,7 +383,13 @@ class DonationPaymentRepository(activity: Activity) : StripeApi.PaymentIntentFet
override fun fetchSetupIntent(): Single<StripeApi.SetupIntent> {
Log.d(TAG, "Fetching setup intent from Signal service...")
return Single.fromCallable { SignalStore.donationsValues().requireSubscriber() }
.flatMap { ApplicationDependencies.getDonationsService().createSubscriptionPaymentMethod(it.subscriberId) }
.flatMap {
Single.fromCallable {
ApplicationDependencies
.getDonationsService()
.createSubscriptionPaymentMethod(it.subscriberId)
}
}
.flatMap(ServiceResponse<SubscriptionClientSecret>::flattenResult)
.map { StripeApi.SetupIntent(it.id, it.clientSecret) }
.doOnSuccess {
@@ -383,7 +402,11 @@ class DonationPaymentRepository(activity: Activity) : StripeApi.PaymentIntentFet
return Single.fromCallable {
SignalStore.donationsValues().requireSubscriber()
}.flatMap {
ApplicationDependencies.getDonationsService().setDefaultPaymentMethodId(it.subscriberId, paymentMethodId)
Single.fromCallable {
ApplicationDependencies
.getDonationsService()
.setDefaultPaymentMethodId(it.subscriberId, paymentMethodId)
}
}.flatMap(ServiceResponse<EmptyResponse>::flattenResult).ignoreElement().doOnComplete {
Log.d(TAG, "Set default payment method via Signal service!")
}

View File

@@ -23,7 +23,7 @@ class SubscriptionsRepository(private val donationsService: DonationsService) {
fun getActiveSubscription(): Single<ActiveSubscription> {
val localSubscription = SignalStore.donationsValues().getSubscriber()
return if (localSubscription != null) {
donationsService.getSubscription(localSubscription.subscriberId)
Single.fromCallable { donationsService.getSubscription(localSubscription.subscriberId) }
.subscribeOn(Schedulers.io())
.flatMap(ServiceResponse<ActiveSubscription>::flattenResult)
} else {
@@ -31,7 +31,8 @@ class SubscriptionsRepository(private val donationsService: DonationsService) {
}
}
fun getSubscriptions(): Single<List<Subscription>> = donationsService.getSubscriptionLevels(Locale.getDefault())
fun getSubscriptions(): Single<List<Subscription>> = Single
.fromCallable { donationsService.getSubscriptionLevels(Locale.getDefault()) }
.subscribeOn(Schedulers.io())
.flatMap(ServiceResponse<SubscriptionLevels>::flattenResult)
.map { subscriptionLevels ->

View File

@@ -5,6 +5,7 @@ import io.reactivex.rxjava3.schedulers.Schedulers
import org.signal.core.util.money.FiatMoney
import org.thoughtcrime.securesms.badges.Badges
import org.thoughtcrime.securesms.badges.models.Badge
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
import org.thoughtcrime.securesms.util.PlatformCurrencyUtil
import org.whispersystems.signalservice.api.profiles.SignalServiceProfile
import org.whispersystems.signalservice.api.services.DonationsService
@@ -16,7 +17,7 @@ import java.util.Locale
class BoostRepository(private val donationsService: DonationsService) {
fun getBoosts(): Single<Map<Currency, List<Boost>>> {
return donationsService.boostAmounts
return Single.fromCallable { donationsService.boostAmounts }
.subscribeOn(Schedulers.io())
.flatMap(ServiceResponse<Map<String, List<BigDecimal>>>::flattenResult)
.map { result ->
@@ -28,7 +29,11 @@ class BoostRepository(private val donationsService: DonationsService) {
}
fun getBoostBadge(): Single<Badge> {
return donationsService.getBoostBadge(Locale.getDefault())
return Single
.fromCallable {
ApplicationDependencies.getDonationsService()
.getBoostBadge(Locale.getDefault())
}
.subscribeOn(Schedulers.io())
.flatMap(ServiceResponse<SignalServiceProfile.Badge>::flattenResult)
.map(Badges::fromServiceBadge)

View File

@@ -9,9 +9,12 @@ import java.util.Locale
class DonationReceiptDetailRepository {
fun getSubscriptionLevelName(subscriptionLevel: Int): Single<String> {
return ApplicationDependencies
.getDonationsService()
.getSubscriptionLevels(Locale.getDefault())
return Single
.fromCallable {
ApplicationDependencies
.getDonationsService()
.getSubscriptionLevels(Locale.getDefault())
}
.flatMap { it.flattenResult() }
.map { it.levels[subscriptionLevel.toString()] ?: throw Exception("Subscription level $subscriptionLevel not found") }
.map { it.name }

View File

@@ -9,7 +9,11 @@ import java.util.Locale
class DonationReceiptListRepository {
fun getBadges(): Single<List<DonationReceiptBadge>> {
val boostBadges: Single<List<DonationReceiptBadge>> = ApplicationDependencies.getDonationsService().getBoostBadge(Locale.getDefault())
val boostBadges: Single<List<DonationReceiptBadge>> = Single
.fromCallable {
ApplicationDependencies.getDonationsService()
.getBoostBadge(Locale.getDefault())
}
.map { response ->
if (response.result.isPresent) {
listOf(DonationReceiptBadge(DonationReceiptRecord.Type.BOOST, -1, Badges.fromServiceBadge(response.result.get())))
@@ -18,7 +22,8 @@ class DonationReceiptListRepository {
}
}
val subBadges: Single<List<DonationReceiptBadge>> = ApplicationDependencies.getDonationsService().getSubscriptionLevels(Locale.getDefault())
val subBadges: Single<List<DonationReceiptBadge>> = Single
.fromCallable { ApplicationDependencies.getDonationsService().getSubscriptionLevels(Locale.getDefault()) }
.map { response ->
if (response.result.isPresent) {
response.result.get().levels.map {