diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/KeysController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/KeysController.java index efbf6d0b4..8549cdd70 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/KeysController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/KeysController.java @@ -74,6 +74,8 @@ import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.KeysManager; import org.whispersystems.textsecuregcm.util.HeaderUtils; import org.whispersystems.textsecuregcm.util.Util; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; @SuppressWarnings("OptionalUsedAsFieldOrParameterType") @Path("/v2/keys") @@ -388,43 +390,36 @@ public class KeysController { } final List devices = parseDeviceId(deviceId, target); - final List responseItems = new ArrayList<>(devices.size()); - final List> tasks = devices.stream().map(device -> { - final CompletableFuture> unsignedEcPreKeyFuture = - keysManager.takeEC(targetIdentifier.uuid(), device.getId()); + final List responseItems = Flux.fromIterable(devices) + .flatMap(device -> Mono.zip( + Mono.just(device), + Mono.fromFuture(keysManager.takeEC(targetIdentifier.uuid(), device.getId())), + Mono.fromFuture(keysManager.getEcSignedPreKey(targetIdentifier.uuid(), device.getId())), + Mono.fromFuture(keysManager.takePQ(targetIdentifier.uuid(), device.getId())))) + .flatMap(deviceAndPreKeys -> { + final Device device = deviceAndPreKeys.getT1(); + final KEMSignedPreKey pqPreKey = deviceAndPreKeys.getT4().orElse(null); + final ECPreKey unsignedEcPreKey = deviceAndPreKeys.getT2().orElse(null); + final ECSignedPreKey signedEcPreKey = deviceAndPreKeys.getT3().orElse(null); - final CompletableFuture> signedEcPreKeyFuture = - keysManager.getEcSignedPreKey(targetIdentifier.uuid(), device.getId()); + Metrics.counter(GET_KEYS_COUNTER_NAME, Tags.of( + UserAgentTagUtil.getPlatformTag(userAgent), + Tag.of(IDENTITY_TYPE_TAG_NAME, targetIdentifier.identityType().name()), + Tag.of("oneTimeEcKeyAvailable", String.valueOf(unsignedEcPreKey != null)), + Tag.of("pqKeyAvailable", String.valueOf(pqPreKey != null)))) + .increment(); - final CompletableFuture> pqPreKeyFuture = - keysManager.takePQ(targetIdentifier.uuid(), device.getId()); - - return CompletableFuture.allOf(unsignedEcPreKeyFuture, signedEcPreKeyFuture, pqPreKeyFuture) - .thenAccept(ignored -> { - final KEMSignedPreKey pqPreKey = pqPreKeyFuture.join().orElse(null); - final ECPreKey unsignedEcPreKey = unsignedEcPreKeyFuture.join().orElse(null); - final ECSignedPreKey signedEcPreKey = signedEcPreKeyFuture.join().orElse(null); - - Metrics.counter(GET_KEYS_COUNTER_NAME, Tags.of( - UserAgentTagUtil.getPlatformTag(userAgent), - Tag.of(IDENTITY_TYPE_TAG_NAME, targetIdentifier.identityType().name()), - Tag.of("oneTimeEcKeyAvailable", String.valueOf(unsignedEcPreKey != null)), - Tag.of("pqKeyAvailable", String.valueOf(pqPreKey != null)))) - .increment(); - - if (signedEcPreKey != null || unsignedEcPreKey != null || pqPreKey != null) { - final int registrationId = device.getRegistrationId(targetIdentifier.identityType()); - - responseItems.add( - new PreKeyResponseItem(device.getId(), registrationId, signedEcPreKey, unsignedEcPreKey, - pqPreKey)); - } - }); + if (signedEcPreKey != null || unsignedEcPreKey != null || pqPreKey != null) { + final int registrationId = device.getRegistrationId(targetIdentifier.identityType()); + return Mono.just(new PreKeyResponseItem( + device.getId(), registrationId, signedEcPreKey, unsignedEcPreKey, pqPreKey)); + } else { + return Mono.empty(); + } }) - .toList(); - - CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join(); + .collectList() + .block(); final IdentityKey identityKey = target.getIdentityKey(targetIdentifier.identityType());