Avoid potential concurrent modification in KeysController

This commit is contained in:
ravi-signal
2025-07-15 16:15:08 -05:00
committed by GitHub
parent 3f62677176
commit 656b08f3b6

View File

@@ -74,6 +74,8 @@ import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.KeysManager;
import org.whispersystems.textsecuregcm.util.HeaderUtils;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
@Path("/v2/keys")
@@ -388,43 +390,36 @@ public class KeysController {
}
final List<Device> devices = parseDeviceId(deviceId, target);
final List<PreKeyResponseItem> responseItems = new ArrayList<>(devices.size());
final List<CompletableFuture<Void>> tasks = devices.stream().map(device -> {
final CompletableFuture<Optional<ECPreKey>> unsignedEcPreKeyFuture =
keysManager.takeEC(targetIdentifier.uuid(), device.getId());
final List<PreKeyResponseItem> responseItems = Flux.fromIterable(devices)
.flatMap(device -> Mono.zip(
Mono.just(device),
Mono.fromFuture(keysManager.takeEC(targetIdentifier.uuid(), device.getId())),
Mono.fromFuture(keysManager.getEcSignedPreKey(targetIdentifier.uuid(), device.getId())),
Mono.fromFuture(keysManager.takePQ(targetIdentifier.uuid(), device.getId()))))
.flatMap(deviceAndPreKeys -> {
final Device device = deviceAndPreKeys.getT1();
final KEMSignedPreKey pqPreKey = deviceAndPreKeys.getT4().orElse(null);
final ECPreKey unsignedEcPreKey = deviceAndPreKeys.getT2().orElse(null);
final ECSignedPreKey signedEcPreKey = deviceAndPreKeys.getT3().orElse(null);
final CompletableFuture<Optional<ECSignedPreKey>> signedEcPreKeyFuture =
keysManager.getEcSignedPreKey(targetIdentifier.uuid(), device.getId());
Metrics.counter(GET_KEYS_COUNTER_NAME, Tags.of(
UserAgentTagUtil.getPlatformTag(userAgent),
Tag.of(IDENTITY_TYPE_TAG_NAME, targetIdentifier.identityType().name()),
Tag.of("oneTimeEcKeyAvailable", String.valueOf(unsignedEcPreKey != null)),
Tag.of("pqKeyAvailable", String.valueOf(pqPreKey != null))))
.increment();
final CompletableFuture<Optional<KEMSignedPreKey>> pqPreKeyFuture =
keysManager.takePQ(targetIdentifier.uuid(), device.getId());
return CompletableFuture.allOf(unsignedEcPreKeyFuture, signedEcPreKeyFuture, pqPreKeyFuture)
.thenAccept(ignored -> {
final KEMSignedPreKey pqPreKey = pqPreKeyFuture.join().orElse(null);
final ECPreKey unsignedEcPreKey = unsignedEcPreKeyFuture.join().orElse(null);
final ECSignedPreKey signedEcPreKey = signedEcPreKeyFuture.join().orElse(null);
Metrics.counter(GET_KEYS_COUNTER_NAME, Tags.of(
UserAgentTagUtil.getPlatformTag(userAgent),
Tag.of(IDENTITY_TYPE_TAG_NAME, targetIdentifier.identityType().name()),
Tag.of("oneTimeEcKeyAvailable", String.valueOf(unsignedEcPreKey != null)),
Tag.of("pqKeyAvailable", String.valueOf(pqPreKey != null))))
.increment();
if (signedEcPreKey != null || unsignedEcPreKey != null || pqPreKey != null) {
final int registrationId = device.getRegistrationId(targetIdentifier.identityType());
responseItems.add(
new PreKeyResponseItem(device.getId(), registrationId, signedEcPreKey, unsignedEcPreKey,
pqPreKey));
}
});
if (signedEcPreKey != null || unsignedEcPreKey != null || pqPreKey != null) {
final int registrationId = device.getRegistrationId(targetIdentifier.identityType());
return Mono.just(new PreKeyResponseItem(
device.getId(), registrationId, signedEcPreKey, unsignedEcPreKey, pqPreKey));
} else {
return Mono.empty();
}
})
.toList();
CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
.collectList()
.block();
final IdentityKey identityKey = target.getIdentityKey(targetIdentifier.identityType());