From c63cbc304fad8451318207eb58a5bbd07bd0e98d Mon Sep 17 00:00:00 2001 From: Jon Chambers <63609320+jon-signal@users.noreply.github.com> Date: Thu, 26 Feb 2026 17:31:13 -0500 Subject: [PATCH] Use `simple-grpc` in `DevicesGrpcService` --- .../grpc/DevicesGrpcService.java | 139 +++++++++--------- .../main/proto/org/signal/chat/device.proto | 10 +- .../grpc/DevicesGrpcServiceTest.java | 14 +- 3 files changed, 76 insertions(+), 87 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/DevicesGrpcService.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/DevicesGrpcService.java index 9967133f8..a16c259c9 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/DevicesGrpcService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/DevicesGrpcService.java @@ -9,7 +9,6 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import java.util.Objects; import java.util.Set; -import java.util.UUID; import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; @@ -17,7 +16,6 @@ import org.signal.chat.device.ClearPushTokenRequest; import org.signal.chat.device.ClearPushTokenResponse; import org.signal.chat.device.GetDevicesRequest; import org.signal.chat.device.GetDevicesResponse; -import org.signal.chat.device.ReactorDevicesGrpc; import org.signal.chat.device.RemoveDeviceRequest; import org.signal.chat.device.RemoveDeviceResponse; import org.signal.chat.device.SetCapabilitiesRequest; @@ -26,6 +24,7 @@ import org.signal.chat.device.SetDeviceNameRequest; import org.signal.chat.device.SetDeviceNameResponse; import org.signal.chat.device.SetPushTokenRequest; import org.signal.chat.device.SetPushTokenResponse; +import org.signal.chat.device.SimpleDevicesGrpc; import org.signal.chat.errors.NotFound; import org.whispersystems.textsecuregcm.auth.grpc.AuthenticatedDevice; import org.whispersystems.textsecuregcm.auth.grpc.AuthenticationUtil; @@ -34,10 +33,8 @@ import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.DeviceCapability; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -public class DevicesGrpcService extends ReactorDevicesGrpc.DevicesImplBase { +public class DevicesGrpcService extends SimpleDevicesGrpc.DevicesImplBase { private final AccountsManager accountsManager; @@ -46,31 +43,33 @@ public class DevicesGrpcService extends ReactorDevicesGrpc.DevicesImplBase { } @Override - public Mono getDevices(final GetDevicesRequest request) { - final AuthenticatedDevice authenticatedDevice = AuthenticationUtil.requireAuthenticatedDevice(); + public GetDevicesResponse getDevices(final GetDevicesRequest request) { + final Account account = getAuthenticatedAccount(); - return getAccount(authenticatedDevice.accountIdentifier()) - .flatMapMany(account -> Flux.fromIterable(account.getDevices())) - .reduce(GetDevicesResponse.newBuilder(), (builder, device) -> { - final GetDevicesResponse.LinkedDevice.Builder linkedDeviceBuilder = GetDevicesResponse.LinkedDevice.newBuilder(); + final GetDevicesResponse.Builder responseBuilder = GetDevicesResponse.newBuilder(); + + account.getDevices().stream() + .map(device -> { + final GetDevicesResponse.LinkedDevice.Builder linkedDeviceBuilder = + GetDevicesResponse.LinkedDevice.newBuilder() + .setId(device.getId()) + .setLastSeen(device.getLastSeen()) + .setRegistrationId(device.getRegistrationId(IdentityType.ACI)) + .setCreatedAtCiphertext(ByteString.copyFrom(device.getCreatedAtCiphertext())); if (device.getName() != null) { linkedDeviceBuilder.setName(ByteString.copyFrom(device.getName())); } - return builder.addDevices(linkedDeviceBuilder - .setId(device.getId()) - .setCreated(device.getCreated()) - .setLastSeen(device.getLastSeen()) - .setRegistrationId(device.getRegistrationId(IdentityType.ACI)) - .setCreatedAtCiphertext(ByteString.copyFrom(device.getCreatedAtCiphertext())) - .build()); + return linkedDeviceBuilder.build(); }) - .map(GetDevicesResponse.Builder::build); + .forEach(responseBuilder::addDevices); + + return responseBuilder.build(); } @Override - public Mono removeDevice(final RemoveDeviceRequest request) { + public RemoveDeviceResponse removeDevice(final RemoveDeviceRequest request) { if (request.getId() == Device.PRIMARY_ID) { throw GrpcExceptions.invalidArguments("cannot remove primary device"); } @@ -83,13 +82,13 @@ public class DevicesGrpcService extends ReactorDevicesGrpc.DevicesImplBase { final byte deviceId = DeviceIdUtil.validate(request.getId()); - return getAccount(authenticatedDevice.accountIdentifier()) - .flatMap(account -> Mono.fromFuture(accountsManager.removeDevice(account, deviceId))) - .thenReturn(RemoveDeviceResponse.newBuilder().build()); + accountsManager.removeDevice(getAuthenticatedAccount(), deviceId).join(); + + return RemoveDeviceResponse.getDefaultInstance(); } @Override - public Mono setDeviceName(final SetDeviceNameRequest request) { + public SetDeviceNameResponse setDeviceName(final SetDeviceNameRequest request) { final AuthenticatedDevice authenticatedDevice = AuthenticationUtil.requireAuthenticatedDevice(); final byte deviceId = DeviceIdUtil.validate(request.getId()); @@ -101,19 +100,19 @@ public class DevicesGrpcService extends ReactorDevicesGrpc.DevicesImplBase { throw GrpcExceptions.badAuthentication("linked device is not authorized to change target device name"); } - return getAccount(authenticatedDevice.accountIdentifier()) - .flatMap(account -> { - if (account.getDevice(deviceId).isEmpty()) { - return Mono.just(SetDeviceNameResponse.newBuilder().setTargetDeviceNotFound(NotFound.getDefaultInstance()).build()); - } - return Mono.fromFuture(() -> accountsManager.updateDeviceAsync(account, deviceId, device -> - device.setName(request.getName().toByteArray()))) - .thenReturn(SetDeviceNameResponse.newBuilder().setSuccess(Empty.getDefaultInstance()).build()); - }); + final Account account = getAuthenticatedAccount(); + + if (account.getDevice(deviceId).isEmpty()) { + return SetDeviceNameResponse.newBuilder().setTargetDeviceNotFound(NotFound.getDefaultInstance()).build(); + } + + accountsManager.updateDevice(account, deviceId, device -> device.setName(request.getName().toByteArray())); + + return SetDeviceNameResponse.newBuilder().setSuccess(Empty.getDefaultInstance()).build(); } @Override - public Mono setPushToken(final SetPushTokenRequest request) { + public SetPushTokenResponse setPushToken(final SetPushTokenRequest request) { final AuthenticatedDevice authenticatedDevice = AuthenticationUtil.requireAuthenticatedDevice(); @Nullable final String apnsToken; @@ -136,62 +135,58 @@ public class DevicesGrpcService extends ReactorDevicesGrpc.DevicesImplBase { default -> throw GrpcExceptions.fieldViolation("token_request", "No tokens specified"); } - return getAccount(authenticatedDevice.accountIdentifier()) - .flatMap(account -> { - final Device device = account.getDevice(authenticatedDevice.deviceId()) - .orElseThrow(() -> GrpcExceptions.invalidCredentials("invalid credentials")); + final Account account = getAuthenticatedAccount(); - final boolean tokenUnchanged = - Objects.equals(device.getApnId(), apnsToken) && - Objects.equals(device.getGcmId(), fcmToken); + final Device device = account.getDevice(authenticatedDevice.deviceId()) + .orElseThrow(() -> GrpcExceptions.invalidCredentials("invalid credentials")); - return tokenUnchanged - ? Mono.empty() - : Mono.fromFuture(() -> accountsManager.updateDeviceAsync(account, authenticatedDevice.deviceId(), d -> { - d.setApnId(apnsToken); - d.setGcmId(fcmToken); - d.setFetchesMessages(false); - })); - }) - .thenReturn(SetPushTokenResponse.newBuilder().build()); + if (!Objects.equals(device.getApnId(), apnsToken) || !Objects.equals(device.getGcmId(), fcmToken)) { + accountsManager.updateDevice(account, authenticatedDevice.deviceId(), d -> { + d.setApnId(apnsToken); + d.setGcmId(fcmToken); + d.setFetchesMessages(false); + }); + } + + return SetPushTokenResponse.getDefaultInstance(); } @Override - public Mono clearPushToken(final ClearPushTokenRequest request) { + public ClearPushTokenResponse clearPushToken(final ClearPushTokenRequest request) { final AuthenticatedDevice authenticatedDevice = AuthenticationUtil.requireAuthenticatedDevice(); + final Account account = getAuthenticatedAccount(); - return getAccount(authenticatedDevice.accountIdentifier()) - .flatMap(account -> Mono.fromFuture(() -> accountsManager.updateDeviceAsync(account, authenticatedDevice.deviceId(), device -> { - if (StringUtils.isNotBlank(device.getApnId())) { - device.setUserAgent(device.isPrimary() ? "OWI" : "OWP"); - } else if (StringUtils.isNotBlank(device.getGcmId())) { - device.setUserAgent("OWA"); - } + accountsManager.updateDevice(account, authenticatedDevice.deviceId(), device -> { + if (StringUtils.isNotBlank(device.getApnId())) { + device.setUserAgent(device.isPrimary() ? "OWI" : "OWP"); + } else if (StringUtils.isNotBlank(device.getGcmId())) { + device.setUserAgent("OWA"); + } - device.setApnId(null); - device.setGcmId(null); - device.setFetchesMessages(true); - }))) - .thenReturn(ClearPushTokenResponse.newBuilder().build()); + device.setApnId(null); + device.setGcmId(null); + device.setFetchesMessages(true); + }); + + return ClearPushTokenResponse.getDefaultInstance(); } @Override - public Mono setCapabilities(final SetCapabilitiesRequest request) { + public SetCapabilitiesResponse setCapabilities(final SetCapabilitiesRequest request) { final AuthenticatedDevice authenticatedDevice = AuthenticationUtil.requireAuthenticatedDevice(); final Set capabilities = request.getCapabilitiesList().stream() .map(DeviceCapabilityUtil::fromGrpcDeviceCapability) .collect(Collectors.toSet()); - return getAccount(authenticatedDevice.accountIdentifier()) - .flatMap(account -> - Mono.fromFuture(() -> accountsManager.updateDeviceAsync(account, authenticatedDevice.deviceId(), - d -> d.setCapabilities(capabilities)))) - .thenReturn(SetCapabilitiesResponse.newBuilder().build()); + accountsManager.updateDevice(getAuthenticatedAccount(), authenticatedDevice.deviceId(), + device -> device.setCapabilities(capabilities)); + + return SetCapabilitiesResponse.getDefaultInstance(); } - private Mono getAccount(final UUID accountIdentifier) { - return Mono.fromFuture(() -> accountsManager.getByAccountIdentifierAsync(accountIdentifier)) - .map(maybeAccount -> maybeAccount.orElseThrow(() -> GrpcExceptions.invalidCredentials("invalid credentials"))); + private Account getAuthenticatedAccount() { + return accountsManager.getByAccountIdentifier(AuthenticationUtil.requireAuthenticatedDevice().accountIdentifier()) + .orElseThrow(() -> GrpcExceptions.invalidCredentials("invalid credentials")); } } diff --git a/service/src/main/proto/org/signal/chat/device.proto b/service/src/main/proto/org/signal/chat/device.proto index 15c8e02a6..977231dde 100644 --- a/service/src/main/proto/org/signal/chat/device.proto +++ b/service/src/main/proto/org/signal/chat/device.proto @@ -57,21 +57,17 @@ message GetDevicesResponse { // this device. bytes name = 2; - // The time, in milliseconds since the epoch, at which this device was - // attached to its parent account. - uint64 created = 3; - // The approximate time, in milliseconds since the epoch, at which this // device last connected to the server. - uint64 last_seen = 4; + uint64 last_seen = 3; // The registration ID of the given device. - uint32 registration_id = 5 [(require.range).max = 0x3fff]; + uint32 registration_id = 4 [(require.range).max = 0x3fff]; // A sequence of bytes that encodes the time, // in milliseconds since the epoch, at which this device was // attached to its parent account. - bytes created_at_ciphertext = 6; + bytes created_at_ciphertext = 5; } // A list of devices linked to the authenticated account. diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/DevicesGrpcServiceTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/DevicesGrpcServiceTest.java index 26fcc1693..3f18f5320 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/DevicesGrpcServiceTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/DevicesGrpcServiceTest.java @@ -68,23 +68,23 @@ class DevicesGrpcServiceTest extends SimpleBaseGrpcTest { final Account account = invocation.getArgument(0); final Consumer updater = invocation.getArgument(1); updater.accept(account); - return CompletableFuture.completedFuture(account); + return account; }); - when(accountsManager.updateDeviceAsync(any(), anyByte(), any())) + when(accountsManager.updateDevice(any(), anyByte(), any())) .thenAnswer(invocation -> { final Account account = invocation.getArgument(0); final Device device = account.getDevice(invocation.getArgument(1)).orElseThrow(); @@ -92,7 +92,7 @@ class DevicesGrpcServiceTest extends SimpleBaseGrpcTest