diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountLockManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountLockManager.java index 50b899b87..3f0e1d5f4 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountLockManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountLockManager.java @@ -10,12 +10,11 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; +import org.whispersystems.textsecuregcm.util.ThrowingSupplier; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; public class AccountLockManager { @@ -54,11 +53,11 @@ public class AccountLockManager { * * @return the value returned by the given {@code task} * - * @throws Exception if an exception is thrown by the given {@code task} + * @throws E if an exception is thrown by the given {@code task} */ - public V withLock(final Set phoneNumberIdentifiers, - final Callable task, - final Executor lockAcquisitionExecutor) throws Exception { + public V withLock(final Set phoneNumberIdentifiers, + final ThrowingSupplier task, + final Executor lockAcquisitionExecutor) throws E { if (phoneNumberIdentifiers.isEmpty()) { throw new IllegalArgumentException("List of PNIs to lock must not be empty"); @@ -82,7 +81,7 @@ public class AccountLockManager { } }, lockAcquisitionExecutor).join(); - return task.call(); + return task.get(); } finally { CompletableFuture.runAsync(() -> { for (final LockItem lockItem : lockItems) { @@ -93,43 +92,4 @@ public class AccountLockManager { }, lockAcquisitionExecutor).join(); } } - - /** - * Acquires a distributed, pessimistic lock for the accounts identified by the given phone number identifiers. By - * design, the accounts need not actually exist in order to acquire a lock; this allows lock acquisition for - * operations that span account lifecycle changes (like deleting an account or changing a phone number). The given - * task runs once locks for all given identifiers have been acquired, and the locks are released as soon as the task - * completes by any means. - * - * @param phoneNumberIdentifiers the phone number identifiers for which to acquire a distributed, pessimistic lock - * @param taskSupplier a supplier for the task to execute once locks have been acquired - * @param executor the executor on which to acquire and release locks - * @return a future that completes normally when the given task has executed successfully and all locks have been - * released; the returned future may fail with an {@link InterruptedException} if interrupted while acquiring a lock - */ - public CompletableFuture withLockAsync(final Set phoneNumberIdentifiers, - final Supplier> taskSupplier, final Executor executor) { - - if (phoneNumberIdentifiers.isEmpty()) { - throw new IllegalArgumentException("List of PNIs to lock must not be empty"); - } - - final List lockItems = new ArrayList<>(phoneNumberIdentifiers.size()); - - return CompletableFuture.runAsync(() -> { - for (final UUID pni : phoneNumberIdentifiers) { - try { - lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(pni.toString()) - .withAcquireReleasedLocksConsistently(true) - .build())); - } catch (final InterruptedException e) { - throw new CompletionException(e); - } - } - }, executor) - .thenCompose(ignored -> taskSupplier.get()) - .whenCompleteAsync((ignored, throwable) -> lockItems.forEach(lockItem -> lockClient.releaseLock(ReleaseLockOptions.builder(lockItem) - .withBestEffort(true) - .build())), executor); - } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index b47ad35a8..4bfeaef81 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -298,13 +298,9 @@ public class AccountsManager extends RedisPubSubAdapter implemen try { return accountLockManager.withLock(Set.of(pni), () -> create(number, pni, accountAttributes, accountBadges, aciIdentityKey, pniIdentityKey, primaryDeviceSpec, userAgent), accountLockExecutor); - } catch (final Exception e) { - if (e instanceof RuntimeException runtimeException) { - throw runtimeException; - } - + } catch (final RuntimeException e) { logger.error("Unexpected exception while creating account", e); - throw new RuntimeException(e); + throw e; } }); } @@ -442,15 +438,9 @@ public class AccountsManager extends RedisPubSubAdapter implemen public Pair addDevice(final Account account, final DeviceSpec deviceSpec, final String linkDeviceToken) throws LinkDeviceTokenAlreadyUsedException { - try { - return accountLockManager.withLock(Set.of(account.getPhoneNumberIdentifier()), - () -> addDevice(account.getIdentifier(IdentityType.ACI), deviceSpec, linkDeviceToken, MAX_UPDATE_ATTEMPTS), - accountLockExecutor); - } catch (final LinkDeviceTokenAlreadyUsedException | RuntimeException e) { - throw e; - } catch (final Exception e) { - throw new RuntimeException(e); - } + return accountLockManager.withLock(Set.of(account.getPhoneNumberIdentifier()), + () -> addDevice(account.getIdentifier(IdentityType.ACI), deviceSpec, linkDeviceToken, MAX_UPDATE_ATTEMPTS), + accountLockExecutor); } private Pair addDevice(final UUID accountIdentifier, final DeviceSpec deviceSpec, final String linkDeviceToken, final int retries) @@ -639,13 +629,9 @@ public class AccountsManager extends RedisPubSubAdapter implemen throw new IllegalArgumentException("Cannot remove primary device"); } - try { - return accountLockManager.withLock(Set.of(account.getPhoneNumberIdentifier()), - () -> removeDevice(account.getIdentifier(IdentityType.ACI), deviceId, MAX_UPDATE_ATTEMPTS), - accountLockExecutor); - } catch (final Exception e) { - throw new RuntimeException(e); - } + return accountLockManager.withLock(Set.of(account.getPhoneNumberIdentifier()), + () -> removeDevice(account.getIdentifier(IdentityType.ACI), deviceId, MAX_UPDATE_ATTEMPTS), + accountLockExecutor); } private Account removeDevice(final UUID accountIdentifier, final byte deviceId, final int retries) { @@ -700,15 +686,9 @@ public class AccountsManager extends RedisPubSubAdapter implemen try { return accountLockManager.withLock(new HashSet<>(List.of(account.getPhoneNumberIdentifier(), targetPhoneNumberIdentifier)), () -> changeNumber(account, targetNumber, targetPhoneNumberIdentifier, pniIdentityKey, pniSignedPreKeys, pniPqLastResortPreKeys, pniRegistrationIds), accountLockExecutor); - } catch (final Exception e) { - if (e instanceof MismatchedDevicesException mismatchedDevicesException) { - throw mismatchedDevicesException; - } if (e instanceof RuntimeException runtimeException) { - throw runtimeException; - } - + } catch (final RuntimeException e) { logger.error("Unexpected exception when changing phone number", e); - throw new RuntimeException(e); + throw e; } } @@ -1146,14 +1126,9 @@ public class AccountsManager extends RedisPubSubAdapter implemen COUNTRY_CODE_TAG_NAME, Util.getCountryCode(account.getNumber()), DELETION_REASON_TAG_NAME, deletionReason.tagValue) .increment(); - } catch (final Exception e) { + } catch (final RuntimeException e) { logger.warn("Failed to delete account", e); - - if (e instanceof RuntimeException runtimeException) { - throw runtimeException; - } - - throw new RuntimeException(e); + throw e; } finally { sample.stop(deleteTimer); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/util/ThrowingSupplier.java b/service/src/main/java/org/whispersystems/textsecuregcm/util/ThrowingSupplier.java new file mode 100644 index 000000000..37937fa66 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/util/ThrowingSupplier.java @@ -0,0 +1,20 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.util; + +/// Represents a supplier of results that may throw an exception. +/// +/// There is no requirement that a new or distinct result be returned each time the supplier is invoked. +@FunctionalInterface +public interface ThrowingSupplier { + + /// Gets a result, potentially throwing an exception. + /// + /// @return a result + /// + /// @throws E at the discretion of the implementation + T get() throws E; +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountLockManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountLockManagerTest.java index 947ee2706..f2ca703ea 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountLockManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountLockManagerTest.java @@ -12,7 +12,6 @@ import com.amazonaws.services.dynamodbv2.ReleaseLockOptions; import java.util.Collections; import java.util.Set; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -72,35 +71,4 @@ class AccountLockManagerTest { executor)); verify(task, never()).run(); } - - @Test - void withLockAsync() throws InterruptedException { - accountLockManager.withLockAsync( - Set.of(FIRST_PNI, SECOND_PNI), () -> CompletableFuture.completedFuture(null), executor).join(); - - verify(lockClient, times(2)).acquireLock(any()); - verify(lockClient, times(2)).releaseLock(any(ReleaseLockOptions.class)); - } - - @Test - void withLockAsyncTaskThrowsException() throws InterruptedException { - assertThrows(RuntimeException.class, - () -> accountLockManager.withLockAsync( - Set.of(FIRST_PNI, SECOND_PNI), () -> CompletableFuture.failedFuture(new RuntimeException()), executor) - .join()); - - verify(lockClient, times(2)).acquireLock(any()); - verify(lockClient, times(2)).releaseLock(any(ReleaseLockOptions.class)); - } - - @Test - void withLockAsyncEmptyList() { - final Runnable task = mock(Runnable.class); - - assertThrows(IllegalArgumentException.class, - () -> accountLockManager.withLockAsync(Collections.emptySet(), () -> CompletableFuture.completedFuture(null), - executor)); - - verify(task, never()).run(); - } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java index fd6730fe1..70b0c7328 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java @@ -28,7 +28,6 @@ import java.util.ArrayList; import java.util.Optional; import java.util.Set; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingDeque; @@ -36,7 +35,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import java.util.function.Supplier; import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -59,6 +57,7 @@ import org.whispersystems.textsecuregcm.tests.util.JsonHelpers; import org.whispersystems.textsecuregcm.tests.util.KeysHelper; import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper; import org.whispersystems.textsecuregcm.util.Pair; +import org.whispersystems.textsecuregcm.util.ThrowingSupplier; class AccountsManagerConcurrentModificationIntegrationTest { @@ -103,17 +102,10 @@ class AccountsManagerConcurrentModificationIntegrationTest { final AccountLockManager accountLockManager = mock(AccountLockManager.class); doAnswer(invocation -> { - final Callable task = invocation.getArgument(1); - return task.call(); + final ThrowingSupplier task = invocation.getArgument(1); + return task.get(); }).when(accountLockManager).withLock(anySet(), any(), any()); - when(accountLockManager.withLockAsync(anySet(), any(), any())).thenAnswer(invocation -> { - final Supplier> taskSupplier = invocation.getArgument(1); - taskSupplier.get().join(); - - return CompletableFuture.completedFuture(null); - }); - final PhoneNumberIdentifiers phoneNumberIdentifiers = mock(PhoneNumberIdentifiers.class); when(phoneNumberIdentifiers.getPhoneNumberIdentifier(anyString())) .thenAnswer((Answer>) _ -> CompletableFuture.completedFuture(UUID.randomUUID())); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java index c28536b6e..ffb417dd4 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java @@ -53,12 +53,10 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.function.Consumer; -import java.util.function.Supplier; import java.util.stream.Stream; import javax.annotation.Nullable; import javax.crypto.spec.SecretKeySpec; @@ -99,6 +97,7 @@ import org.whispersystems.textsecuregcm.tests.util.RedisServerHelper; import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.TestClock; import org.whispersystems.textsecuregcm.util.TestRandomUtil; +import org.whispersystems.textsecuregcm.util.ThrowingSupplier; @Timeout(value = 10, threadMode = Timeout.ThreadMode.SEPARATE_THREAD) class AccountsManagerTest { @@ -187,15 +186,10 @@ class AccountsManagerTest { final AccountLockManager accountLockManager = mock(AccountLockManager.class); doAnswer(invocation -> { - final Callable task = invocation.getArgument(1); - return task.call(); + final ThrowingSupplier task = invocation.getArgument(1); + return task.get(); }).when(accountLockManager).withLock(anySet(), any(), any()); - when(accountLockManager.withLockAsync(anySet(), any(), any())).thenAnswer(invocation -> { - final Supplier> taskSupplier = invocation.getArgument(1); - return taskSupplier.get(); - }); - final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager = mock(RegistrationRecoveryPasswordsManager.class); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java index 17beea9d1..199de8bbf 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java @@ -30,10 +30,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; -import java.util.function.Supplier; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -47,6 +45,7 @@ import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables; import org.whispersystems.textsecuregcm.tests.util.AccountsHelper; import org.whispersystems.textsecuregcm.util.AttributeValues; import org.whispersystems.textsecuregcm.util.TestRandomUtil; +import org.whispersystems.textsecuregcm.util.ThrowingSupplier; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; @@ -119,17 +118,10 @@ class AccountsManagerUsernameIntegrationTest { final AccountLockManager accountLockManager = mock(AccountLockManager.class); doAnswer(invocation -> { - final Callable task = invocation.getArgument(1); - return task.call(); + final ThrowingSupplier task = invocation.getArgument(1); + return task.get(); }).when(accountLockManager).withLock(anySet(), any(), any()); - when(accountLockManager.withLockAsync(anySet(), any(), any())).thenAnswer(invocation -> { - final Supplier> taskSupplier = invocation.getArgument(1); - taskSupplier.get().join(); - - return CompletableFuture.completedFuture(null); - }); - final PhoneNumberIdentifiers phoneNumberIdentifiers = new PhoneNumberIdentifiers(DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), Tables.PNI.tableName());