diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/DeviceController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/DeviceController.java index 7da5afdf3..c0db6bc0f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/DeviceController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/DeviceController.java @@ -43,7 +43,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -166,7 +165,7 @@ public class DeviceController { final Account account = accounts.getByAccountIdentifier(auth.accountIdentifier()) .orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED)); - accounts.removeDevice(account, deviceId).join(); + accounts.removeDevice(account, deviceId); } /** @@ -294,7 +293,7 @@ public class DeviceController { } try { - return accounts.addDevice(account, new DeviceSpec(accountAttributes.getName(), + final Pair accountAndDevice = accounts.addDevice(account, new DeviceSpec(accountAttributes.getName(), authorizationHeader.getPassword(), signalAgent, capabilities, @@ -307,18 +306,14 @@ public class DeviceController { deviceActivationRequest.pniSignedPreKey(), deviceActivationRequest.aciPqLastResortPreKey(), deviceActivationRequest.pniPqLastResortPreKey()), - linkDeviceRequest.verificationCode()) - .thenApply(accountAndDevice -> new LinkDeviceResponse( - accountAndDevice.first().getIdentifier(IdentityType.ACI), - accountAndDevice.first().getIdentifier(IdentityType.PNI), - accountAndDevice.second().getId())) - .join(); - } catch (final CompletionException e) { - if (e.getCause() instanceof LinkDeviceTokenAlreadyUsedException) { - throw new ForbiddenException(); - } + linkDeviceRequest.verificationCode()); - throw e; + return new LinkDeviceResponse( + accountAndDevice.first().getIdentifier(IdentityType.ACI), + accountAndDevice.first().getIdentifier(IdentityType.PNI), + accountAndDevice.second().getId()); + } catch (final LinkDeviceTokenAlreadyUsedException e) { + throw new ForbiddenException(); } } @@ -377,8 +372,8 @@ public class DeviceController { .map(deviceInfo -> Response.status(Response.Status.OK).entity(deviceInfo).build()) .orElseGet(() -> Response.status(Response.Status.NO_CONTENT).build())) .exceptionally(ExceptionUtils.exceptionallyHandler(IllegalArgumentException.class, - e -> Response.status(Response.Status.BAD_REQUEST).build())) - .whenComplete((response, throwable) -> { + _ -> Response.status(Response.Status.BAD_REQUEST).build())) + .whenComplete((response, _) -> { linkedDeviceListenerCounter.decrementAndGet(); if (response != null && response.getStatus() == Response.Status.OK.getStatusCode()) { @@ -568,7 +563,7 @@ public class DeviceController { .thenApply(maybeTransferArchive -> maybeTransferArchive .map(transferArchive -> Response.status(Response.Status.OK).entity(transferArchive).build()) .orElseGet(() -> Response.status(Response.Status.NO_CONTENT).build())) - .whenComplete((response, throwable) -> { + .whenComplete((response, _) -> { if (response != null && response.getStatus() == Response.Status.OK.getStatusCode()) { accountAndSample.second().stop(Timer.builder(WAIT_FOR_TRANSFER_ARCHIVE_TIMER_NAME) .tags(Tags.of( diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/DevicesGrpcService.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/DevicesGrpcService.java index a16c259c9..fefbf1c6f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/DevicesGrpcService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/DevicesGrpcService.java @@ -82,7 +82,7 @@ public class DevicesGrpcService extends SimpleDevicesGrpc.DevicesImplBase { final byte deviceId = DeviceIdUtil.validate(request.getId()); - accountsManager.removeDevice(getAuthenticatedAccount(), deviceId).join(); + accountsManager.removeDevice(getAuthenticatedAccount(), deviceId); return RemoveDeviceResponse.getDefaultInstance(); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java index acd415c08..1d8642b93 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java @@ -1013,40 +1013,37 @@ public class Accounts { } } - public CompletionStage updateTransactionallyAsync(final Account account, - final Collection additionalWriteItems) { + public void updateTransactionally(final Account account, final Collection additionalWriteItems) + throws ContestedOptimisticLockException { - return AsyncTimerUtil.record(UPDATE_TRANSACTIONALLY_TIMER, () -> { + final Timer.Sample sample = Timer.start(); + + try { final List writeItems = new ArrayList<>(additionalWriteItems.size() + 1); writeItems.add(UpdateAccountSpec.forAccount(accountsTableName, account).transactItem()); writeItems.addAll(additionalWriteItems); - return dynamoDbAsyncClient.transactWriteItems(TransactWriteItemsRequest.builder() - .transactItems(writeItems) - .build()) - .thenApply(_ -> { - account.setVersion(account.getVersion() + 1); - return (Void) null; - }) - .exceptionally(throwable -> { - final Throwable unwrapped = ExceptionUtils.unwrap(throwable); + dynamoDbClient.transactWriteItems(TransactWriteItemsRequest.builder() + .transactItems(writeItems) + .build()); - if (unwrapped instanceof TransactionCanceledException transactionCanceledException) { - if (CONDITIONAL_CHECK_FAILED.equals(transactionCanceledException.cancellationReasons().getFirst().code())) { - throw new ContestedOptimisticLockException(); - } + account.setVersion(account.getVersion() + 1); + } catch (final TransactionCanceledException transactionCanceledException) { + if (CONDITIONAL_CHECK_FAILED.equals(transactionCanceledException.cancellationReasons().getFirst().code())) { + throw new ContestedOptimisticLockException(); + } - if (transactionCanceledException.cancellationReasons() - .stream() - .anyMatch(reason -> TRANSACTION_CONFLICT.equals(reason.code()))) { + if (transactionCanceledException.cancellationReasons() + .stream() + .anyMatch(reason -> TRANSACTION_CONFLICT.equals(reason.code()))) { - throw new ContestedOptimisticLockException(); - } - } + throw new ContestedOptimisticLockException(); + } - throw CompletableFutureUtils.errorAsCompletionException(throwable); - }); - }); + throw transactionCanceledException; + } finally { + sample.stop(UPDATE_TRANSACTIONALLY_TIMER); + } } public TransactWriteItem buildTransactWriteItemForLinkDevice(final String linkDeviceToken, final Duration tokenTtl) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index 98f083ae8..c8ad6fbfe 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -439,88 +439,92 @@ public class AccountsManager extends RedisPubSubAdapter implemen return account; } - public CompletableFuture> addDevice(final Account account, final DeviceSpec deviceSpec, final String linkDeviceToken) { - return accountLockManager.withLockAsync(Set.of(account.getPhoneNumberIdentifier()), - () -> addDevice(account.getIdentifier(IdentityType.ACI), deviceSpec, linkDeviceToken, MAX_UPDATE_ATTEMPTS), - accountLockExecutor); + public Pair addDevice(final Account account, final DeviceSpec deviceSpec, final String linkDeviceToken) + throws LinkDeviceTokenAlreadyUsedException { + + try { + return accountLockManager.withLock(Set.of(account.getPhoneNumberIdentifier()), + () -> addDevice(account.getIdentifier(IdentityType.ACI), deviceSpec, linkDeviceToken, MAX_UPDATE_ATTEMPTS), + accountLockExecutor); + } catch (final LinkDeviceTokenAlreadyUsedException | RuntimeException e) { + throw e; + } catch (final Exception e) { + throw new RuntimeException(e); + } } - private CompletableFuture> addDevice(final UUID accountIdentifier, final DeviceSpec deviceSpec, final String linkDeviceToken, final int retries) { - return accounts.getByAccountIdentifierAsync(accountIdentifier) - .thenApply(maybeAccount -> maybeAccount.orElseThrow(ContestedOptimisticLockException::new)) - .thenCompose(account -> { - final byte nextDeviceId = account.getNextDeviceId(); + private Pair addDevice(final UUID accountIdentifier, final DeviceSpec deviceSpec, final String linkDeviceToken, final int retries) + throws LinkDeviceTokenAlreadyUsedException { + final Account account = accounts.getByAccountIdentifier(accountIdentifier) + .orElseThrow(ContestedOptimisticLockException::new); - return CompletableFuture.allOf( - keysManager.deleteSingleUsePreKeys(account.getUuid(), nextDeviceId), - keysManager.deleteSingleUsePreKeys(account.getPhoneNumberIdentifier(), nextDeviceId), - messagesManager.clear(account.getUuid(), nextDeviceId)) - .thenApply(ignored -> new Pair<>(account, nextDeviceId)); - }) - .thenCompose(accountAndNextDeviceId -> { - final Account account = accountAndNextDeviceId.first(); - final byte nextDeviceId = accountAndNextDeviceId.second(); + final byte nextDeviceId = account.getNextDeviceId(); - account.addDevice(deviceSpec.toDevice(nextDeviceId, clock, account.getIdentityKey(IdentityType.ACI))); + CompletableFuture.allOf( + keysManager.deleteSingleUsePreKeys(account.getUuid(), nextDeviceId), + keysManager.deleteSingleUsePreKeys(account.getPhoneNumberIdentifier(), nextDeviceId), + messagesManager.clear(account.getUuid(), nextDeviceId)) + .join(); - final List additionalWriteItems = new ArrayList<>(keysManager.buildWriteItemsForNewDevice( - account.getIdentifier(IdentityType.ACI), - account.getIdentifier(IdentityType.PNI), - nextDeviceId, - deviceSpec.aciSignedPreKey(), - deviceSpec.pniSignedPreKey(), - deviceSpec.aciPqLastResortPreKey(), - deviceSpec.pniPqLastResortPreKey())); + account.addDevice(deviceSpec.toDevice(nextDeviceId, clock, account.getIdentityKey(IdentityType.ACI))); - additionalWriteItems.add(accounts.buildTransactWriteItemForLinkDevice(linkDeviceToken, LINK_DEVICE_TOKEN_EXPIRATION_DURATION)); + final List additionalWriteItems = new ArrayList<>(keysManager.buildWriteItemsForNewDevice( + account.getIdentifier(IdentityType.ACI), + account.getIdentifier(IdentityType.PNI), + nextDeviceId, + deviceSpec.aciSignedPreKey(), + deviceSpec.pniSignedPreKey(), + deviceSpec.aciPqLastResortPreKey(), + deviceSpec.pniPqLastResortPreKey())); - return accounts.updateTransactionallyAsync(account, additionalWriteItems) - .thenApply(ignored -> new Pair<>(account, account.getDevice(nextDeviceId).orElseThrow())); - }) - .thenCompose(updatedAccountAndDevice -> redisDeleteAsync(updatedAccountAndDevice.first()) - .thenApply(ignored -> updatedAccountAndDevice)) - .exceptionallyCompose(throwable -> { - if (ExceptionUtils.unwrap(throwable) instanceof ContestedOptimisticLockException && retries > 0) { - return addDevice(accountIdentifier, deviceSpec, linkDeviceToken, retries - 1); - } else if (ExceptionUtils.unwrap(throwable) instanceof TransactionCanceledException transactionCanceledException) { - // We can be confident the transaction was canceled because the linked device token was already used if the - // "check token" transaction write item is the only one that failed. That SHOULD be the last one in the - // list. - final long cancelledTransactions = transactionCanceledException.cancellationReasons().stream() - .filter(cancellationReason -> !"None".equals(cancellationReason.code())) - .count(); + additionalWriteItems.add(accounts.buildTransactWriteItemForLinkDevice(linkDeviceToken, LINK_DEVICE_TOKEN_EXPIRATION_DURATION)); - final boolean tokenReuseConditionFailed = - "ConditionalCheckFailed".equals(transactionCanceledException.cancellationReasons().getLast().code()); + try { + accounts.updateTransactionally(account, additionalWriteItems); + redisDelete(account); - if (cancelledTransactions == 1 && tokenReuseConditionFailed) { - return CompletableFuture.failedFuture(new LinkDeviceTokenAlreadyUsedException()); + final String key = getLinkedDeviceKey(getLinkDeviceTokenIdentifier(linkDeviceToken)); + final String deviceInfoJson; + + try { + deviceInfoJson = SystemMapper.jsonMapper().writeValueAsString(DeviceInfo.forDevice(account.getDevice(nextDeviceId).orElseThrow())); + } catch (final JsonProcessingException e) { + throw new UncheckedIOException(e); + } + + ResilienceUtil.getGeneralRedisRetry(RETRY_NAME) + .executeCompletionStage(retryExecutor, () -> pubSubRedisClient.withConnection(connection -> + connection.async().set(key, deviceInfoJson, SetArgs.Builder.ex(RECENTLY_ADDED_DEVICE_TTL)))) + .whenComplete((_, pubSubThrowable) -> { + if (pubSubThrowable != null) { + logger.warn("Failed to record recently-created device", pubSubThrowable); } - } + }); - return CompletableFuture.failedFuture(throwable); - }) - .whenComplete((updatedAccountAndDevice, _) -> { - if (updatedAccountAndDevice != null) { - final String key = getLinkedDeviceKey(getLinkDeviceTokenIdentifier(linkDeviceToken)); - final String deviceInfoJson; + return new Pair<>(account, account.getDevice(nextDeviceId).orElseThrow()); + } catch (final ContestedOptimisticLockException e) { + if (retries > 0) { + return addDevice(accountIdentifier, deviceSpec, linkDeviceToken, retries - 1); + } - try { - deviceInfoJson = SystemMapper.jsonMapper().writeValueAsString(DeviceInfo.forDevice(updatedAccountAndDevice.second())); - } catch (final JsonProcessingException e) { - throw new UncheckedIOException(e); - } + throw e; + } catch (final TransactionCanceledException transactionCanceledException) { + // We can be confident the transaction was canceled because the linked device token was already used if the + // "check token" transaction write item is the only one that failed. That SHOULD be the last one in the + // list. + final long cancelledTransactions = transactionCanceledException.cancellationReasons().stream() + .filter(cancellationReason -> !"None".equals(cancellationReason.code())) + .count(); - ResilienceUtil.getGeneralRedisRetry(RETRY_NAME) - .executeCompletionStage(retryExecutor, () -> pubSubRedisClient.withConnection(connection -> - connection.async().set(key, deviceInfoJson, SetArgs.Builder.ex(RECENTLY_ADDED_DEVICE_TTL)))) - .whenComplete((_, pubSubThrowable) -> { - if (pubSubThrowable != null) { - logger.warn("Failed to record recently-created device", pubSubThrowable); - } - }); - } - }); + final boolean tokenReuseConditionFailed = + "ConditionalCheckFailed".equals(transactionCanceledException.cancellationReasons().getLast().code()); + + if (cancelledTransactions == 1 && tokenReuseConditionFailed) { + throw new LinkDeviceTokenAlreadyUsedException(); + } + + throw transactionCanceledException; + } } private Mac getInitializedMac() { @@ -630,53 +634,58 @@ public class AccountsManager extends RedisPubSubAdapter implemen * * @return the updated Account */ - public CompletableFuture removeDevice(final Account account, final byte deviceId) { + public Account removeDevice(final Account account, final byte deviceId) { if (deviceId == Device.PRIMARY_ID) { throw new IllegalArgumentException("Cannot remove primary device"); } - return accountLockManager.withLockAsync(Set.of(account.getPhoneNumberIdentifier()), - () -> removeDevice(account.getIdentifier(IdentityType.ACI), deviceId, MAX_UPDATE_ATTEMPTS), - accountLockExecutor); + try { + return accountLockManager.withLock(Set.of(account.getPhoneNumberIdentifier()), + () -> removeDevice(account.getIdentifier(IdentityType.ACI), deviceId, MAX_UPDATE_ATTEMPTS), + accountLockExecutor); + } catch (final Exception e) { + throw new RuntimeException(e); + } } - private CompletableFuture removeDevice(final UUID accountIdentifier, final byte deviceId, final int retries) { - return accounts.getByAccountIdentifierAsync(accountIdentifier) - .thenApply(maybeAccount -> maybeAccount.orElseThrow(ContestedOptimisticLockException::new)) - .thenCompose(account -> CompletableFuture.allOf( + private Account removeDevice(final UUID accountIdentifier, final byte deviceId, final int retries) { + final Account account = accounts.getByAccountIdentifier(accountIdentifier) + .orElseThrow(ContestedOptimisticLockException::new); + + CompletableFuture.allOf( keysManager.deleteSingleUsePreKeys(account.getUuid(), deviceId), messagesManager.clear(account.getUuid(), deviceId)) - .thenApply(ignored -> account)) - .thenCompose(account -> { - account.removeDevice(deviceId); + .join(); - final List additionalWriteItems = new ArrayList<>( - keysManager.buildWriteItemsForRemovedDevice( - account.getIdentifier(IdentityType.ACI), - account.getIdentifier(IdentityType.PNI), - deviceId)); + account.removeDevice(deviceId); - return accounts.updateTransactionallyAsync(account, additionalWriteItems) - .thenApply(ignored -> account); - }) - .thenCompose(updatedAccount -> redisDeleteAsync(updatedAccount).thenApply(ignored -> updatedAccount)) - // Ensure any messages/single-use pre-keys that came in while we were working are also removed - .thenCompose(account -> CompletableFuture.allOf( - keysManager.deleteSingleUsePreKeys(account.getUuid(), deviceId), - messagesManager.clear(account.getUuid(), deviceId)) - .thenApply(ignored -> account)) - .exceptionallyCompose(throwable -> { - if (ExceptionUtils.unwrap(throwable) instanceof ContestedOptimisticLockException && retries > 0) { - return removeDevice(accountIdentifier, deviceId, retries - 1); - } + final List additionalWriteItems = new ArrayList<>( + keysManager.buildWriteItemsForRemovedDevice( + account.getIdentifier(IdentityType.ACI), + account.getIdentifier(IdentityType.PNI), + deviceId)); - return CompletableFuture.failedFuture(throwable); - }) - .whenComplete((ignored, throwable) -> { - if (throwable == null) { - disconnectionRequestManager.requestDisconnection(accountIdentifier, List.of(deviceId)); - } - }); + try { + accounts.updateTransactionally(account, additionalWriteItems); + + redisDelete(account); + + // Ensure any messages/single-use pre-keys that came in while we were working are also removed + CompletableFuture.allOf( + keysManager.deleteSingleUsePreKeys(account.getUuid(), deviceId), + messagesManager.clear(account.getUuid(), deviceId)) + .join(); + + disconnectionRequestManager.requestDisconnection(accountIdentifier, List.of(deviceId)); + + return account; + } catch (final ContestedOptimisticLockException e) { + if (retries > 0) { + return removeDevice(accountIdentifier, deviceId, retries - 1); + } + + throw e; + } } public Account changeNumber(final Account account, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java index 6b402a5fe..c49de6889 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java @@ -22,7 +22,6 @@ import java.util.Locale; import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.IntStream; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -249,7 +248,7 @@ public class MessagePersister implements Managed { throw new MessagePersistenceException("Could not persist due to an overfull queue. Trimmed primary queue, a subsequent retry may succeed"); } else { logger.warn("Failed to persist queue {}::{} due to overfull queue; will unlink device", accountUuid, deviceId); - accountsManager.removeDevice(account, deviceId).join(); + accountsManager.removeDevice(account, deviceId); } } finally { messagesCache.unlockQueueForPersistence(accountUuid, deviceId); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/RemoveExpiredLinkedDevicesCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/RemoveExpiredLinkedDevicesCommand.java index bfe326019..b9e253f0b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/RemoveExpiredLinkedDevicesCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/RemoveExpiredLinkedDevicesCommand.java @@ -130,7 +130,7 @@ public class RemoveExpiredLinkedDevicesCommand extends AbstractSinglePassCrawlAc return Flux.fromIterable(expiredDevices) .flatMap(deviceId -> - Mono.fromFuture(() -> getCommandDependencies().accountsManager().removeDevice(account, deviceId)) + Mono.fromRunnable(() -> getCommandDependencies().accountsManager().removeDevice(account, deviceId)) .retryWhen(Retry.backoff(maxRetries, Duration.ofSeconds(1)) .doAfterRetry(ignored -> retryCounter.increment()) .onRetryExhaustedThrow((spec, rs) -> rs.failure())) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/UnlinkDeviceCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/UnlinkDeviceCommand.java index b73d6cde5..4adf6a375 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/UnlinkDeviceCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/UnlinkDeviceCommand.java @@ -5,11 +5,8 @@ package org.whispersystems.textsecuregcm.workers; -import com.fasterxml.jackson.databind.DeserializationFeature; import io.dropwizard.core.Application; -import io.dropwizard.core.cli.EnvironmentCommand; import io.dropwizard.core.setup.Environment; - import java.util.List; import java.util.UUID; import net.sourceforge.argparse4j.impl.Arguments; @@ -64,7 +61,7 @@ public class UnlinkDeviceCommand extends AbstractCommandWithDependencies { for (byte deviceId : deviceIds) { /** see {@link org.whispersystems.textsecuregcm.controllers.DeviceController#removeDevice} */ System.out.format("Removing device %s::%d\n", aci, deviceId); - deps.accountsManager().removeDevice(account, deviceId).join(); + deps.accountsManager().removeDevice(account, deviceId); } } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/UnlinkDevicesWithIdlePrimaryCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/UnlinkDevicesWithIdlePrimaryCommand.java index abb7e4b45..a83ebfb00 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/UnlinkDevicesWithIdlePrimaryCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/UnlinkDevicesWithIdlePrimaryCommand.java @@ -20,6 +20,7 @@ import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuples; import reactor.util.retry.Retry; @@ -96,7 +97,8 @@ public class UnlinkDevicesWithIdlePrimaryCommand extends AbstractSinglePassCrawl .flatMap(accountAndLinkedDeviceId -> { final Mono unlinkDeviceMono = isDryRun ? Mono.empty() - : Mono.fromFuture(() -> accountsManager.removeDevice(accountAndLinkedDeviceId.getT1(), accountAndLinkedDeviceId.getT2())); + : Mono.fromSupplier(() -> accountsManager.removeDevice(accountAndLinkedDeviceId.getT1(), accountAndLinkedDeviceId.getT2())) + .subscribeOn(Schedulers.boundedElastic()); return unlinkDeviceMono .doOnSuccess(ignored -> unlinkDeviceCounter.increment()) diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/DeviceControllerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/DeviceControllerTest.java index 94f1ff667..8d9314d13 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/DeviceControllerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/DeviceControllerTest.java @@ -220,7 +220,7 @@ class DeviceControllerTest { final Optional apnRegistrationId, final Optional gcmRegistrationId, final Optional expectedApnsToken, - final Optional expectedGcmToken) { + final Optional expectedGcmToken) throws LinkDeviceTokenAlreadyUsedException { final Device existingDevice = mock(Device.class); when(existingDevice.getId()).thenReturn(Device.PRIMARY_ID); @@ -249,7 +249,7 @@ class DeviceControllerTest { final Account a = invocation.getArgument(0); final DeviceSpec deviceSpec = invocation.getArgument(1); - return CompletableFuture.completedFuture(new Pair<>(a, deviceSpec.toDevice(NEXT_DEVICE_ID, testClock, aciIdentityKey))); + return new Pair<>(a, deviceSpec.toDevice(NEXT_DEVICE_ID, testClock, aciIdentityKey)); }); when(asyncCommands.set(any(), any(), any())).thenReturn(MockRedisFuture.completedFuture(null)); @@ -298,11 +298,12 @@ class DeviceControllerTest { @CartesianTest void deviceDowngrade(@CartesianTest.Enum final DeviceCapability capability, @CartesianTest.Values(booleans = {true, false}) final boolean accountHasCapability, - @CartesianTest.Values(booleans = {true, false}) final boolean requestHasCapability) { + @CartesianTest.Values(booleans = {true, false}) final boolean requestHasCapability) + throws LinkDeviceTokenAlreadyUsedException { when(accountsManager.getByAccountIdentifier(AuthHelper.VALID_UUID)).thenReturn(Optional.of(account)); when(accountsManager.addDevice(any(), any(), any())) - .thenReturn(CompletableFuture.completedFuture(new Pair<>(mock(Account.class), mock(Device.class)))); + .thenReturn(new Pair<>(mock(Account.class), mock(Device.class))); final Device primaryDevice = mock(Device.class); when(primaryDevice.getId()).thenReturn(Device.PRIMARY_ID); @@ -392,7 +393,7 @@ class DeviceControllerTest { } @Test - void linkDeviceAtomicReusedToken() { + void linkDeviceAtomicReusedToken() throws LinkDeviceTokenAlreadyUsedException { final Device existingDevice = mock(Device.class); when(existingDevice.getId()).thenReturn(Device.PRIMARY_ID); when(account.getDevices()).thenReturn(List.of(existingDevice)); @@ -416,7 +417,7 @@ class DeviceControllerTest { when(accountsManager.checkDeviceLinkingToken(anyString())).thenReturn(Optional.of(AuthHelper.VALID_UUID)); when(accountsManager.addDevice(any(), any(), any())) - .thenReturn(CompletableFuture.failedFuture(new LinkDeviceTokenAlreadyUsedException())); + .thenThrow(new LinkDeviceTokenAlreadyUsedException()); when(asyncCommands.set(any(), any(), any())).thenReturn(MockRedisFuture.completedFuture(null)); @@ -729,7 +730,8 @@ class DeviceControllerTest { @ParameterizedTest @MethodSource - void linkDeviceRegistrationId(final int registrationId, final int pniRegistrationId, final int expectedStatusCode) { + void linkDeviceRegistrationId(final int registrationId, final int pniRegistrationId, final int expectedStatusCode) + throws LinkDeviceTokenAlreadyUsedException { final Device existingDevice = mock(Device.class); when(existingDevice.getId()).thenReturn(Device.PRIMARY_ID); when(account.getDevices()).thenReturn(List.of(existingDevice)); @@ -750,7 +752,7 @@ class DeviceControllerTest { final Account a = invocation.getArgument(0); final DeviceSpec deviceSpec = invocation.getArgument(1); - return CompletableFuture.completedFuture(new Pair<>(a, deviceSpec.toDevice(NEXT_DEVICE_ID, testClock, aciIdentityKey))); + return new Pair<>(a, deviceSpec.toDevice(NEXT_DEVICE_ID, testClock, aciIdentityKey)); }); when(accountsManager.checkDeviceLinkingToken(anyString())).thenReturn(Optional.of(AuthHelper.VALID_UUID)); @@ -835,7 +837,7 @@ class DeviceControllerTest { } @Test - void maxDevicesTest() { + void maxDevicesTest() throws LinkDeviceTokenAlreadyUsedException { final List devices = IntStream.range(0, DeviceController.MAX_DEVICES + 1) .mapToObj(i -> mock(Device.class)) .toList(); @@ -891,7 +893,7 @@ class DeviceControllerTest { final byte deviceId = 2; when(accountsManager.removeDevice(account, deviceId)) - .thenReturn(CompletableFuture.completedFuture(account)); + .thenReturn(account); try (final Response response = resources .getJerseyTest() @@ -932,7 +934,7 @@ class DeviceControllerTest { final byte deviceId = 2; when(accountsManager.removeDevice(AuthHelper.VALID_ACCOUNT_3, deviceId)) - .thenReturn(CompletableFuture.completedFuture(account)); + .thenReturn(account); when(accountsManager.getByAccountIdentifier(AuthHelper.VALID_UUID_3)) .thenReturn(Optional.of(AuthHelper.VALID_ACCOUNT_3)); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/DevicesGrpcServiceTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/DevicesGrpcServiceTest.java index 3f18f5320..2e2c05d10 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/DevicesGrpcServiceTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/DevicesGrpcServiceTest.java @@ -25,7 +25,6 @@ import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.stream.Stream; import javax.annotation.Nullable; @@ -72,7 +71,7 @@ class DevicesGrpcServiceTest extends SimpleBaseGrpcTest { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java index 498fcf667..e96ad52fe 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java @@ -160,7 +160,6 @@ class AccountsManagerTest { when(asyncClusterCommands.set(any(), any(), any())).thenReturn(MockRedisFuture.completedFuture("OK")); when(asyncClusterCommands.setex(any(), anyLong(), any())).thenReturn(MockRedisFuture.completedFuture("OK")); - when(accounts.updateTransactionallyAsync(any(), any())).thenReturn(CompletableFuture.completedFuture(null)); when(accounts.delete(any(), any())).thenReturn(CompletableFuture.completedFuture(null)); doAnswer((Answer) invocation -> { @@ -692,15 +691,13 @@ class AccountsManagerTest { Account account = AccountsHelper.generateTestAccount("+14152222222", List.of(primaryDevice, linkedDevice)); - when(accounts.getByAccountIdentifierAsync(account.getUuid())) - .thenReturn(CompletableFuture.completedFuture(Optional.of(account))); - + when(accounts.getByAccountIdentifier(account.getUuid())).thenReturn(Optional.of(account)); when(keysManager.deleteSingleUsePreKeys(any(), anyByte())).thenReturn(CompletableFuture.completedFuture(null)); when(messagesManager.clear(any(), anyByte())).thenReturn(CompletableFuture.completedFuture(null)); assertTrue(account.getDevice(linkedDevice.getId()).isPresent()); - account = accountsManager.removeDevice(account, linkedDevice.getId()).join(); + account = accountsManager.removeDevice(account, linkedDevice.getId()); assertFalse(account.getDevice(linkedDevice.getId()).isPresent()); verify(messagesManager, times(2)).clear(account.getUuid(), linkedDevice.getId()); @@ -855,7 +852,7 @@ class AccountsManagerTest { } @Test - void testAddDevice() { + void testAddDevice() throws LinkDeviceTokenAlreadyUsedException { final String phoneNumber = PhoneNumberUtil.getInstance().format(PhoneNumberUtil.getInstance().getExampleNumber("US"), PhoneNumberUtil.PhoneNumberFormat.E164); @@ -883,8 +880,7 @@ class AccountsManagerTest { when(keysManager.deleteSingleUsePreKeys(any(), anyByte())).thenReturn(CompletableFuture.completedFuture(null)); when(messagesManager.clear(any(), anyByte())).thenReturn(CompletableFuture.completedFuture(null)); - when(accounts.getByAccountIdentifierAsync(aci)).thenReturn(CompletableFuture.completedFuture(Optional.of(account))); - when(accounts.updateTransactionallyAsync(any(), any())).thenReturn(CompletableFuture.completedFuture(null)); + when(accounts.getByAccountIdentifier(aci)).thenReturn(Optional.of(account)); CLOCK.pin(CLOCK.instant().plusSeconds(60)); @@ -902,8 +898,7 @@ class AccountsManagerTest { pniSignedPreKey, aciPqLastResortPreKey, pniPqLastResortPreKey), - accountsManager.generateLinkDeviceToken(aci)) - .join(); + accountsManager.generateLinkDeviceToken(aci)); verify(keysManager).deleteSingleUsePreKeys(aci, nextDeviceId); verify(keysManager).deleteSingleUsePreKeys(pni, nextDeviceId); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java index bb3419ab7..1d03efb29 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java @@ -40,7 +40,6 @@ import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; @@ -648,7 +647,7 @@ class AccountsTest { account.getPrimaryDevice().setName(deviceName); - accounts.updateTransactionallyAsync(account, List.of(TransactWriteItem.builder() + accounts.updateTransactionally(account, List.of(TransactWriteItem.builder() .put(Put.builder() .tableName(Tables.CLIENT_RELEASES.tableName()) .item(Map.of( @@ -656,7 +655,7 @@ class AccountsTest { ClientReleases.ATTR_VERSION, AttributeValues.fromString("test") )) .build()) - .build())).toCompletableFuture().join(); + .build())); assertArrayEquals(deviceName, accounts.getByAccountIdentifier(account.getUuid()).orElseThrow().getPrimaryDevice().getName()); @@ -678,8 +677,8 @@ class AccountsTest { account.setVersion(account.getVersion() - 1); - final CompletionException completionException = assertThrows(CompletionException.class, - () -> accounts.updateTransactionallyAsync(account, List.of(TransactWriteItem.builder() + assertThrows(ContestedOptimisticLockException.class, + () -> accounts.updateTransactionally(account, List.of(TransactWriteItem.builder() .put(Put.builder() .tableName(Tables.CLIENT_RELEASES.tableName()) .item(Map.of( @@ -687,19 +686,17 @@ class AccountsTest { ClientReleases.ATTR_VERSION, AttributeValues.fromString("test") )) .build()) - .build())).toCompletableFuture().join()); - - assertInstanceOf(ContestedOptimisticLockException.class, completionException.getCause()); + .build()))); } @Test void testUpdateTransactionallyWithMockTransactionConflictException() { - final DynamoDbAsyncClient dynamoDbAsyncClient = mock(DynamoDbAsyncClient.class); + final DynamoDbClient dynamoDbClient = mock(DynamoDbClient.class); accounts = new Accounts( clock, - mock(DynamoDbClient.class), - dynamoDbAsyncClient, + dynamoDbClient, + mock(DynamoDbAsyncClient.class), Tables.ACCOUNTS.tableName(), Tables.NUMBERS.tableName(), Tables.PNI_ASSIGNMENTS.tableName(), @@ -707,18 +704,17 @@ class AccountsTest { Tables.DELETED_ACCOUNTS.tableName(), Tables.USED_LINK_DEVICE_TOKENS.tableName()); - when(dynamoDbAsyncClient.transactWriteItems(any(TransactWriteItemsRequest.class))) - .thenReturn(CompletableFuture.failedFuture(TransactionCanceledException.builder() + when(dynamoDbClient.transactWriteItems(any(TransactWriteItemsRequest.class))) + .thenThrow(TransactionCanceledException.builder() .cancellationReasons(CancellationReason.builder() .code("TransactionConflict") .build()) - .build())); + .build()); - Account account = generateAccount("+14151112222", UUID.randomUUID(), UUID.randomUUID()); + final Account account = generateAccount("+14151112222", UUID.randomUUID(), UUID.randomUUID()); - assertThatThrownBy(() -> accounts.updateTransactionallyAsync(account, Collections.emptyList()).toCompletableFuture().join()) - .isInstanceOfAny(CompletionException.class) - .hasCauseInstanceOf(ContestedOptimisticLockException.class); + assertThatThrownBy(() -> accounts.updateTransactionally(account, Collections.emptyList())) + .isInstanceOfAny(ContestedOptimisticLockException.class); } @Test diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AddRemoveDeviceIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AddRemoveDeviceIntegrationTest.java index 8aa9bb787..e251a62d1 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AddRemoveDeviceIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AddRemoveDeviceIntegrationTest.java @@ -2,7 +2,6 @@ package org.whispersystems.textsecuregcm.storage; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -21,7 +20,6 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -174,7 +172,7 @@ public class AddRemoveDeviceIntegrationTest { } @Test - void addDevice() throws InterruptedException { + void addDevice() throws InterruptedException, LinkDeviceTokenAlreadyUsedException { final String number = PhoneNumberUtil.getInstance().format( PhoneNumberUtil.getInstance().getExampleNumber("US"), PhoneNumberUtil.PhoneNumberFormat.E164); @@ -200,8 +198,7 @@ public class AddRemoveDeviceIntegrationTest { KeysHelper.signedECPreKey(2, pniKeyPair), KeysHelper.signedKEMPreKey(3, aciKeyPair), KeysHelper.signedKEMPreKey(4, pniKeyPair)), - accountsManager.generateLinkDeviceToken(account.getIdentifier(IdentityType.ACI))) - .join(); + accountsManager.generateLinkDeviceToken(account.getIdentifier(IdentityType.ACI))); assertEquals(2, updatedAccountAndDevice.first().getDevices().size()); @@ -223,7 +220,7 @@ public class AddRemoveDeviceIntegrationTest { } @Test - void addDeviceReusedToken() throws InterruptedException { + void addDeviceReusedToken() throws InterruptedException, LinkDeviceTokenAlreadyUsedException { final String number = PhoneNumberUtil.getInstance().format( PhoneNumberUtil.getInstance().getExampleNumber("US"), PhoneNumberUtil.PhoneNumberFormat.E164); @@ -251,14 +248,13 @@ public class AddRemoveDeviceIntegrationTest { KeysHelper.signedECPreKey(2, pniKeyPair), KeysHelper.signedKEMPreKey(3, aciKeyPair), KeysHelper.signedKEMPreKey(4, pniKeyPair)), - linkDeviceToken) - .join(); + linkDeviceToken); assertEquals(2, accountsManager.getByAccountIdentifier(updatedAccountAndDevice.first().getUuid()).orElseThrow().getDevices() .size()); - final CompletionException completionException = assertThrows(CompletionException.class, + assertThrows(LinkDeviceTokenAlreadyUsedException.class, () -> accountsManager.addDevice(account, new DeviceSpec( "device-name".getBytes(StandardCharsets.UTF_8), "password", @@ -273,10 +269,7 @@ public class AddRemoveDeviceIntegrationTest { KeysHelper.signedECPreKey(2, pniKeyPair), KeysHelper.signedKEMPreKey(3, aciKeyPair), KeysHelper.signedKEMPreKey(4, pniKeyPair)), - linkDeviceToken) - .join()); - - assertInstanceOf(LinkDeviceTokenAlreadyUsedException.class, completionException.getCause()); + linkDeviceToken)); assertEquals(2, accountsManager.getByAccountIdentifier(updatedAccountAndDevice.first().getUuid()).orElseThrow().getDevices() @@ -284,7 +277,7 @@ public class AddRemoveDeviceIntegrationTest { } @Test - void removeDevice() throws InterruptedException { + void removeDevice() throws InterruptedException, LinkDeviceTokenAlreadyUsedException { final String number = PhoneNumberUtil.getInstance().format( PhoneNumberUtil.getInstance().getExampleNumber("US"), PhoneNumberUtil.PhoneNumberFormat.E164); @@ -310,12 +303,11 @@ public class AddRemoveDeviceIntegrationTest { KeysHelper.signedECPreKey(2, pniKeyPair), KeysHelper.signedKEMPreKey(3, aciKeyPair), KeysHelper.signedKEMPreKey(4, pniKeyPair)), - accountsManager.generateLinkDeviceToken(account.getIdentifier(IdentityType.ACI))) - .join(); + accountsManager.generateLinkDeviceToken(account.getIdentifier(IdentityType.ACI))); final byte addedDeviceId = updatedAccountAndDevice.second().getId(); - final Account updatedAccount = accountsManager.removeDevice(updatedAccountAndDevice.first(), addedDeviceId).join(); + final Account updatedAccount = accountsManager.removeDevice(updatedAccountAndDevice.first(), addedDeviceId); assertEquals(1, updatedAccount.getDevices().size()); @@ -334,7 +326,7 @@ public class AddRemoveDeviceIntegrationTest { } @Test - void removeDevicePartialFailure() throws InterruptedException { + void removeDevicePartialFailure() throws InterruptedException, LinkDeviceTokenAlreadyUsedException { final String number = PhoneNumberUtil.getInstance().format( PhoneNumberUtil.getInstance().getExampleNumber("US"), PhoneNumberUtil.PhoneNumberFormat.E164); @@ -362,16 +354,15 @@ public class AddRemoveDeviceIntegrationTest { KeysHelper.signedECPreKey(2, pniKeyPair), KeysHelper.signedKEMPreKey(3, aciKeyPair), KeysHelper.signedKEMPreKey(4, pniKeyPair)), - accountsManager.generateLinkDeviceToken(account.getIdentifier(IdentityType.ACI))) - .join(); + accountsManager.generateLinkDeviceToken(account.getIdentifier(IdentityType.ACI))); final byte addedDeviceId = updatedAccountAndDevice.second().getId(); when(messagesManager.clear(any(), anyByte())) .thenReturn(CompletableFuture.failedFuture(new RuntimeException("OH NO"))); - assertThrows(CompletionException.class, - () -> accountsManager.removeDevice(updatedAccountAndDevice.first(), addedDeviceId).join()); + assertThrows(RuntimeException.class, + () -> accountsManager.removeDevice(updatedAccountAndDevice.first(), addedDeviceId)); final Account retrievedAccount = accountsManager.getByAccountIdentifierAsync(aci).join().orElseThrow(); @@ -393,7 +384,7 @@ public class AddRemoveDeviceIntegrationTest { } @Test - void waitForNewLinkedDevice() throws InterruptedException { + void waitForNewLinkedDevice() throws InterruptedException, LinkDeviceTokenAlreadyUsedException { final String number = PhoneNumberUtil.getInstance().format( PhoneNumberUtil.getInstance().getExampleNumber("US"), PhoneNumberUtil.PhoneNumberFormat.E164); @@ -433,8 +424,7 @@ public class AddRemoveDeviceIntegrationTest { KeysHelper.signedECPreKey(2, pniKeyPair), KeysHelper.signedKEMPreKey(3, aciKeyPair), KeysHelper.signedKEMPreKey(4, pniKeyPair)), - linkDeviceToken) - .join(); + linkDeviceToken); final Optional maybeDeviceInfo = activeFuture.join(); @@ -447,7 +437,7 @@ public class AddRemoveDeviceIntegrationTest { } @Test - void waitForNewLinkedDeviceAlreadyAdded() throws InterruptedException { + void waitForNewLinkedDeviceAlreadyAdded() throws InterruptedException, LinkDeviceTokenAlreadyUsedException { final String number = PhoneNumberUtil.getInstance().format( PhoneNumberUtil.getInstance().getExampleNumber("US"), PhoneNumberUtil.PhoneNumberFormat.E164); @@ -475,8 +465,7 @@ public class AddRemoveDeviceIntegrationTest { KeysHelper.signedECPreKey(2, pniKeyPair), KeysHelper.signedKEMPreKey(3, aciKeyPair), KeysHelper.signedKEMPreKey(4, pniKeyPair)), - linkDeviceToken) - .join(); + linkDeviceToken); when(messagesManager.getEarliestUndeliveredTimestampForDevice(account.getUuid(), account.getPrimaryDevice())) .thenReturn(CompletableFuture.completedFuture(Optional.empty())); @@ -520,7 +509,7 @@ public class AddRemoveDeviceIntegrationTest { "10_000,10_001,false", // pending message after now }) void waitForMessageFetch(long currentTime, Long oldestMessage, boolean shouldWait) - throws InterruptedException { + throws InterruptedException, LinkDeviceTokenAlreadyUsedException { final String number = PhoneNumberUtil.getInstance().format( PhoneNumberUtil.getInstance().getExampleNumber("US"), PhoneNumberUtil.PhoneNumberFormat.E164); @@ -531,23 +520,21 @@ public class AddRemoveDeviceIntegrationTest { final String linkDeviceToken = accountsManager.generateLinkDeviceToken(UUID.randomUUID()); final String linkDeviceTokenIdentifier = AccountsManager.getLinkDeviceTokenIdentifier(linkDeviceToken); - final Pair updatedAccountAndDevice = - accountsManager.addDevice(account, new DeviceSpec( - "device-name".getBytes(StandardCharsets.UTF_8), - "password", - "OWT", - Set.of(), - 1, - 2, - true, - Optional.empty(), - Optional.empty(), - KeysHelper.signedECPreKey(1, aciKeyPair), - KeysHelper.signedECPreKey(2, pniKeyPair), - KeysHelper.signedKEMPreKey(3, aciKeyPair), - KeysHelper.signedKEMPreKey(4, pniKeyPair)), - linkDeviceToken) - .join(); + accountsManager.addDevice(account, new DeviceSpec( + "device-name".getBytes(StandardCharsets.UTF_8), + "password", + "OWT", + Set.of(), + 1, + 2, + true, + Optional.empty(), + Optional.empty(), + KeysHelper.signedECPreKey(1, aciKeyPair), + KeysHelper.signedECPreKey(2, pniKeyPair), + KeysHelper.signedKEMPreKey(3, aciKeyPair), + KeysHelper.signedKEMPreKey(4, pniKeyPair)), + linkDeviceToken); when(messagesManager.getEarliestUndeliveredTimestampForDevice(account.getUuid(), account.getPrimaryDevice())) .thenReturn(CompletableFuture.completedFuture(Optional.ofNullable(oldestMessage).map(Instant::ofEpochMilli))); @@ -564,7 +551,7 @@ public class AddRemoveDeviceIntegrationTest { @Timeout(value = 10, threadMode = Timeout.ThreadMode.SEPARATE_THREAD) @Test void waitForMessageFetchRetries() - throws InterruptedException { + throws InterruptedException, LinkDeviceTokenAlreadyUsedException { final String number = PhoneNumberUtil.getInstance().format( PhoneNumberUtil.getInstance().getExampleNumber("US"), PhoneNumberUtil.PhoneNumberFormat.E164); @@ -577,21 +564,20 @@ public class AddRemoveDeviceIntegrationTest { clock.pin(Instant.ofEpochMilli(0)); accountsManager.addDevice(account, new DeviceSpec( - "device-name".getBytes(StandardCharsets.UTF_8), - "password", - "OWT", - Set.of(), - 1, - 2, - true, - Optional.empty(), - Optional.empty(), - KeysHelper.signedECPreKey(1, aciKeyPair), - KeysHelper.signedECPreKey(2, pniKeyPair), - KeysHelper.signedKEMPreKey(3, aciKeyPair), - KeysHelper.signedKEMPreKey(4, pniKeyPair)), - linkDeviceToken) - .join(); + "device-name".getBytes(StandardCharsets.UTF_8), + "password", + "OWT", + Set.of(), + 1, + 2, + true, + Optional.empty(), + Optional.empty(), + KeysHelper.signedECPreKey(1, aciKeyPair), + KeysHelper.signedECPreKey(2, pniKeyPair), + KeysHelper.signedKEMPreKey(3, aciKeyPair), + KeysHelper.signedKEMPreKey(4, pniKeyPair)), + linkDeviceToken); when(messagesManager.getEarliestUndeliveredTimestampForDevice(account.getUuid(), account.getPrimaryDevice())) // Has a message older than the message epoch diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java index 3b5b39ba8..9b3b0a4c7 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java @@ -34,13 +34,11 @@ import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.stream.Stream; import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.AfterEach; @@ -100,7 +98,7 @@ class MessagePersisterTest { when(accountsManager.getByAccountIdentifier(DESTINATION_ACCOUNT_UUID)).thenReturn(Optional.of(destinationAccount)); when(accountsManager.removeDevice(any(), anyByte())) - .thenAnswer(invocation -> CompletableFuture.completedFuture(invocation.getArgument(0))); + .thenAnswer(invocation -> invocation.getArgument(0)); when(destinationAccount.getUuid()).thenReturn(DESTINATION_ACCOUNT_UUID); when(destinationAccount.getIdentifier(IdentityType.ACI)).thenReturn(DESTINATION_ACCOUNT_UUID); @@ -399,9 +397,9 @@ class MessagePersisterTest { when(destinationAccount.getDevices()).thenReturn(List.of(primary, activeA, inactiveB, inactiveC, activeD, destination)); when(messagesManager.persistMessages(any(UUID.class), any(), anyList())).thenThrow(ItemCollectionSizeLimitExceededException.builder().build()); - when(accountsManager.removeDevice(destinationAccount, DESTINATION_DEVICE_ID)).thenReturn(CompletableFuture.failedFuture(new TimeoutException())); + when(accountsManager.removeDevice(destinationAccount, DESTINATION_DEVICE_ID)).thenThrow(new RuntimeException()); - assertThrows(CompletionException.class, () -> messagePersister.persistQueue(destinationAccount, DESTINATION_DEVICE, "test")); + assertThrows(RuntimeException.class, () -> messagePersister.persistQueue(destinationAccount, DESTINATION_DEVICE, "test")); } @SuppressWarnings("SameParameterValue") diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/workers/UnlinkDevicesWithIdlePrimaryCommandTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/workers/UnlinkDevicesWithIdlePrimaryCommandTest.java index dfa736798..33ca98a16 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/workers/UnlinkDevicesWithIdlePrimaryCommandTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/workers/UnlinkDevicesWithIdlePrimaryCommandTest.java @@ -19,7 +19,6 @@ import java.time.ZoneId; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import net.sourceforge.argparse4j.inf.Namespace; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -70,7 +69,7 @@ class UnlinkDevicesWithIdlePrimaryCommandTest { void crawlAccounts(final boolean isDryRun) { final AccountsManager accountsManager = mock(AccountsManager.class); when(accountsManager.removeDevice(any(), anyByte())) - .thenReturn(CompletableFuture.completedFuture(null)); + .thenReturn(null); final Duration idleDeviceLastSeenDuration = Duration.ofDays(UnlinkDevicesWithIdlePrimaryCommand.DEFAULT_PRIMARY_IDLE_DAYS).plus(Duration.ofDays(1));