From a85b8c49d9765b4e5b72c21828e52dbe1e841a00 Mon Sep 17 00:00:00 2001 From: Alex Hart Date: Wed, 4 Jun 2025 09:17:53 -0300 Subject: [PATCH] Rework billing client integration. --- .../java/org/signal/billing/BillingApiImpl.kt | 111 ++++++++++-------- 1 file changed, 64 insertions(+), 47 deletions(-) diff --git a/billing/src/main/java/org/signal/billing/BillingApiImpl.kt b/billing/src/main/java/org/signal/billing/BillingApiImpl.kt index ac855af77a..20cb65248f 100644 --- a/billing/src/main/java/org/signal/billing/BillingApiImpl.kt +++ b/billing/src/main/java/org/signal/billing/BillingApiImpl.kt @@ -22,20 +22,14 @@ import com.android.billingclient.api.QueryProductDetailsParams import com.android.billingclient.api.QueryPurchasesParams import com.android.billingclient.api.queryProductDetails import com.android.billingclient.api.queryPurchasesAsync -import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.asCoroutineDispatcher -import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.awaitClose -import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first -import kotlinx.coroutines.flow.retry import kotlinx.coroutines.flow.update import kotlinx.coroutines.launch import kotlinx.coroutines.withContext @@ -75,6 +69,7 @@ internal class BillingApiImpl( private val connectionState = MutableStateFlow(State.Init) private val coroutineScope = CoroutineScope(Dispatchers.Default) + private val connectionStateDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() private val internalResults = MutableSharedFlow() @@ -102,49 +97,61 @@ internal class BillingApiImpl( } } } + BillingResponseCode.BILLING_UNAVAILABLE -> { Log.d(TAG, "purchasesUpdatedListener: Billing unavailable.") BillingPurchaseResult.BillingUnavailable } + BillingResponseCode.USER_CANCELED -> { Log.d(TAG, "purchasesUpdatedListener: User cancelled.") BillingPurchaseResult.UserCancelled } + BillingResponseCode.ERROR -> { Log.d(TAG, "purchasesUpdatedListener: error.") BillingPurchaseResult.GenericError } + BillingResponseCode.NETWORK_ERROR -> { Log.d(TAG, "purchasesUpdatedListener: Network error.") BillingPurchaseResult.NetworkError } + BillingResponseCode.DEVELOPER_ERROR -> { Log.d(TAG, "purchasesUpdatedListener: Developer error.") BillingPurchaseResult.GenericError } + BillingResponseCode.FEATURE_NOT_SUPPORTED -> { Log.d(TAG, "purchasesUpdatedListener: Feature not supported.") BillingPurchaseResult.FeatureNotSupported } + BillingResponseCode.ITEM_ALREADY_OWNED -> { Log.d(TAG, "purchasesUpdatedListener: Already owned.") BillingPurchaseResult.AlreadySubscribed } + BillingResponseCode.ITEM_NOT_OWNED -> { error("This shouldn't happen during the purchase process") } + BillingResponseCode.ITEM_UNAVAILABLE -> { Log.d(TAG, "purchasesUpdatedListener: Item is unavailable") BillingPurchaseResult.TryAgainLater } + BillingResponseCode.SERVICE_UNAVAILABLE -> { Log.d(TAG, "purchasesUpdatedListener: Service is unavailable.") BillingPurchaseResult.TryAgainLater } + BillingResponseCode.SERVICE_DISCONNECTED -> { Log.d(TAG, "purchasesUpdatedListener: Service is disconnected.") BillingPurchaseResult.TryAgainLater } + else -> { Log.d(TAG, "purchasesUpdatedListener: No purchases.") BillingPurchaseResult.None @@ -163,19 +170,6 @@ internal class BillingApiImpl( ) .build() - init { - coroutineScope.launch { - createConnectionFlow() - .retry { it is RetryException } - .collect { newState -> - Log.d(TAG, "Updating Google Play Billing connection state: $newState", true) - connectionState.update { - newState - } - } - } - } - override fun getBillingPurchaseResults(): Flow { return internalResults } @@ -318,6 +312,7 @@ internal class BillingApiImpl( private suspend fun doOnConnectionReady(caller: String, block: suspend () -> T): T { Log.d(TAG, "Awaiting connection from $caller... (current state: ${connectionState.value})", true) + startBillingClientConnectionIfNecessary() val state = connectionState .filter { it == State.Connected || it is State.Failure } @@ -331,37 +326,59 @@ internal class BillingApiImpl( } } - private fun createConnectionFlow(): Flow { - return callbackFlow { - Log.d(TAG, "Starting Google Play Billing connection...", true) - send(State.Connecting) - - billingClient.startConnection(object : BillingClientStateListener { - override fun onBillingServiceDisconnected() { - Log.d(TAG, "Google Play Billing became disconnected.", true) - trySendBlocking(State.Disconnected) - cancel(CancellationException("Google Play Billing became disconnected.", RetryException())) - } - - override fun onBillingSetupFinished(billingResult: BillingResult) { - Log.d(TAG, "onBillingSetupFinished: ${billingResult.responseCode}", true) - if (billingResult.responseCode == BillingResponseCode.OK) { - Log.d(TAG, "Google Play Billing is ready.", true) - trySendBlocking(State.Connected) - } else { - Log.d(TAG, "Google Play Billing failed to connect.", true) - val billingError = BillingError( - billingResponseCode = billingResult.responseCode + private suspend fun startBillingClientConnectionIfNecessary() { + withContext(connectionStateDispatcher) { + val billingConnectionState = billingClient.connectionState + when (billingConnectionState) { + BillingClient.ConnectionState.DISCONNECTED -> { + Log.d(TAG, "BillingClient is disconnected. Starting connection attempt.", true) + connectionState.update { State.Connecting } + billingClient.startConnection( + BillingListener( + onStateUpdate = { new -> + connectionState.update { old -> + Log.d(TAG, "Moving from state $old -> $new", true) + new + } + } ) - trySendBlocking(State.Failure(billingError)) - channel.close() - } + ) } - }) - awaitClose { - Log.d(TAG, "Ending Google Play Billing connection.", true) - billingClient.endConnection() + BillingClient.ConnectionState.CONNECTING -> { + Log.d(TAG, "BillingClient is already connecting. Nothing to do.", true) + } + + BillingClient.ConnectionState.CONNECTED -> { + Log.d(TAG, "BillingClient is already connected. Nothing to do.", true) + } + + BillingClient.ConnectionState.CLOSED -> { + Log.w(TAG, "BillingClient was permanently closed. Cannot proceed.", true) + } + } + } + } + + private class BillingListener( + private val onStateUpdate: (State) -> Unit + ) : BillingClientStateListener { + override fun onBillingServiceDisconnected() { + Log.d(TAG, "BillingListener#onBillingServiceDisconnected", true) + onStateUpdate(State.Disconnected) + } + + override fun onBillingSetupFinished(billingResult: BillingResult) { + Log.d(TAG, "BillingListener#onBillingSetupFinished: ${billingResult.responseCode}", true) + if (billingResult.responseCode == BillingResponseCode.OK) { + Log.d(TAG, "BillingListener#onBillingSetupFinished: ready", true) + onStateUpdate(State.Connected) + } else { + Log.d(TAG, "BillingListener#onBillingSetupFinished: failure", true) + val billingError = BillingError( + billingResponseCode = billingResult.responseCode + ) + onStateUpdate(State.Failure(billingError)) } } }