mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-20 12:08:05 +01:00
Convert Device.id from long to byte
This commit is contained in:
@@ -47,7 +47,7 @@ public class AuthEnablementRefreshRequirementProvider implements WebsocketRefres
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static Map<Long, Boolean> buildDevicesEnabledMap(final Account account) {
|
||||
static Map<Byte, Boolean> buildDevicesEnabledMap(final Account account) {
|
||||
return account.getDevices().stream().collect(Collectors.toMap(Device::getId, Device::isEnabled));
|
||||
}
|
||||
|
||||
@@ -68,17 +68,17 @@ public class AuthEnablementRefreshRequirementProvider implements WebsocketRefres
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Pair<UUID, Long>> handleRequestFinished(final RequestEvent requestEvent) {
|
||||
public List<Pair<UUID, Byte>> handleRequestFinished(final RequestEvent requestEvent) {
|
||||
// Now that the request is finished, check whether `isEnabled` changed for any of the devices. If the value did
|
||||
// change or if a devices was added or removed, all devices must disconnect and reauthenticate.
|
||||
if (requestEvent.getContainerRequest().getProperty(DEVICES_ENABLED) != null) {
|
||||
|
||||
@SuppressWarnings("unchecked") final Map<Long, Boolean> initialDevicesEnabled =
|
||||
(Map<Long, Boolean>) requestEvent.getContainerRequest().getProperty(DEVICES_ENABLED);
|
||||
@SuppressWarnings("unchecked") final Map<Byte, Boolean> initialDevicesEnabled =
|
||||
(Map<Byte, Boolean>) requestEvent.getContainerRequest().getProperty(DEVICES_ENABLED);
|
||||
|
||||
return accountsManager.getByAccountIdentifier((UUID) requestEvent.getContainerRequest().getProperty(ACCOUNT_UUID)).map(account -> {
|
||||
final Set<Long> deviceIdsToDisplace;
|
||||
final Map<Long, Boolean> currentDevicesEnabled = buildDevicesEnabledMap(account);
|
||||
final Set<Byte> deviceIdsToDisplace;
|
||||
final Map<Byte, Boolean> currentDevicesEnabled = buildDevicesEnabledMap(account);
|
||||
|
||||
if (!initialDevicesEnabled.equals(currentDevicesEnabled)) {
|
||||
deviceIdsToDisplace = new HashSet<>(initialDevicesEnabled.keySet());
|
||||
|
||||
@@ -52,9 +52,9 @@ public class BaseAccountAuthenticator {
|
||||
this.clock = clock;
|
||||
}
|
||||
|
||||
static Pair<String, Long> getIdentifierAndDeviceId(final String basicUsername) {
|
||||
static Pair<String, Byte> getIdentifierAndDeviceId(final String basicUsername) {
|
||||
final String identifier;
|
||||
final long deviceId;
|
||||
final byte deviceId;
|
||||
|
||||
final int deviceIdSeparatorIndex = basicUsername.indexOf(DEVICE_ID_SEPARATOR);
|
||||
|
||||
@@ -63,7 +63,7 @@ public class BaseAccountAuthenticator {
|
||||
deviceId = Device.PRIMARY_ID;
|
||||
} else {
|
||||
identifier = basicUsername.substring(0, deviceIdSeparatorIndex);
|
||||
deviceId = Long.parseLong(basicUsername.substring(deviceIdSeparatorIndex + 1));
|
||||
deviceId = Byte.parseByte(basicUsername.substring(deviceIdSeparatorIndex + 1));
|
||||
}
|
||||
|
||||
return new Pair<>(identifier, deviceId);
|
||||
@@ -75,9 +75,9 @@ public class BaseAccountAuthenticator {
|
||||
|
||||
try {
|
||||
final UUID accountUuid;
|
||||
final long deviceId;
|
||||
final byte deviceId;
|
||||
{
|
||||
final Pair<String, Long> identifierAndDeviceId = getIdentifierAndDeviceId(basicCredentials.getUsername());
|
||||
final Pair<String, Byte> identifierAndDeviceId = getIdentifierAndDeviceId(basicCredentials.getUsername());
|
||||
|
||||
accountUuid = UUID.fromString(identifierAndDeviceId.first());
|
||||
deviceId = identifierAndDeviceId.second();
|
||||
|
||||
@@ -11,10 +11,10 @@ import org.whispersystems.textsecuregcm.util.Pair;
|
||||
public class BasicAuthorizationHeader {
|
||||
|
||||
private final String username;
|
||||
private final long deviceId;
|
||||
private final byte deviceId;
|
||||
private final String password;
|
||||
|
||||
private BasicAuthorizationHeader(final String username, final long deviceId, final String password) {
|
||||
private BasicAuthorizationHeader(final String username, final byte deviceId, final String password) {
|
||||
this.username = username;
|
||||
this.deviceId = deviceId;
|
||||
this.password = password;
|
||||
@@ -59,9 +59,9 @@ public class BasicAuthorizationHeader {
|
||||
final String usernameComponent = credentials.substring(0, credentialSeparatorIndex);
|
||||
|
||||
final String username;
|
||||
final long deviceId;
|
||||
final byte deviceId;
|
||||
{
|
||||
final Pair<String, Long> identifierAndDeviceId =
|
||||
final Pair<String, Byte> identifierAndDeviceId =
|
||||
BaseAccountAuthenticator.getIdentifierAndDeviceId(usernameComponent);
|
||||
|
||||
username = identifierAndDeviceId.first();
|
||||
|
||||
@@ -29,7 +29,7 @@ public class OptionalAccess {
|
||||
verify(requestAccount, accessKey, targetAccount);
|
||||
|
||||
if (!deviceSelector.equals("*")) {
|
||||
long deviceId = Long.parseLong(deviceSelector);
|
||||
byte deviceId = Byte.parseByte(deviceSelector);
|
||||
|
||||
Optional<Device> targetDevice = targetAccount.get().getDevice(deviceId);
|
||||
|
||||
|
||||
@@ -26,7 +26,7 @@ public class PhoneNumberChangeRefreshRequirementProvider implements WebsocketRef
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Pair<UUID, Long>> handleRequestFinished(final RequestEvent requestEvent) {
|
||||
public List<Pair<UUID, Byte>> handleRequestFinished(final RequestEvent requestEvent) {
|
||||
final String initialNumber = (String) requestEvent.getContainerRequest().getProperty(INITIAL_NUMBER_KEY);
|
||||
|
||||
if (initialNumber != null) {
|
||||
|
||||
@@ -157,7 +157,7 @@ public class RegistrationLockVerificationManager {
|
||||
registrationRecoveryPasswordsManager.removeForNumber(updatedAccount.getNumber());
|
||||
}
|
||||
|
||||
final List<Long> deviceIds = updatedAccount.getDevices().stream().map(Device::getId).toList();
|
||||
final List<Byte> deviceIds = updatedAccount.getDevices().stream().map(Device::getId).toList();
|
||||
clientPresenceManager.disconnectAllPresences(updatedAccount.getUuid(), deviceIds);
|
||||
|
||||
try {
|
||||
|
||||
@@ -30,5 +30,5 @@ public interface WebsocketRefreshRequirementProvider {
|
||||
* @return a list of pairs of account UUID/device ID pairs identifying websockets that need to be refreshed as a
|
||||
* result of the observed request
|
||||
*/
|
||||
List<Pair<UUID, Long>> handleRequestFinished(RequestEvent requestEvent);
|
||||
List<Pair<UUID, Byte>> handleRequestFinished(RequestEvent requestEvent);
|
||||
}
|
||||
|
||||
@@ -7,5 +7,5 @@ package org.whispersystems.textsecuregcm.auth.grpc;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
public record AuthenticatedDevice(UUID accountIdentifier, long deviceId) {
|
||||
public record AuthenticatedDevice(UUID accountIdentifier, byte deviceId) {
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ import org.whispersystems.textsecuregcm.storage.Device;
|
||||
public class AuthenticationUtil {
|
||||
|
||||
static final Context.Key<UUID> CONTEXT_AUTHENTICATED_ACCOUNT_IDENTIFIER_KEY = Context.key("authenticated-aci");
|
||||
static final Context.Key<Long> CONTEXT_AUTHENTICATED_DEVICE_IDENTIFIER_KEY = Context.key("authenticated-device-id");
|
||||
static final Context.Key<Byte> CONTEXT_AUTHENTICATED_DEVICE_IDENTIFIER_KEY = Context.key("authenticated-device-id");
|
||||
|
||||
/**
|
||||
* Returns the account/device authenticated in the current gRPC context or throws an "unauthenticated" exception if
|
||||
@@ -30,7 +30,7 @@ public class AuthenticationUtil {
|
||||
*/
|
||||
public static AuthenticatedDevice requireAuthenticatedDevice() {
|
||||
@Nullable final UUID accountIdentifier = CONTEXT_AUTHENTICATED_ACCOUNT_IDENTIFIER_KEY.get();
|
||||
@Nullable final Long deviceId = CONTEXT_AUTHENTICATED_DEVICE_IDENTIFIER_KEY.get();
|
||||
@Nullable final Byte deviceId = CONTEXT_AUTHENTICATED_DEVICE_IDENTIFIER_KEY.get();
|
||||
|
||||
if (accountIdentifier != null && deviceId != null) {
|
||||
return new AuthenticatedDevice(accountIdentifier, deviceId);
|
||||
|
||||
@@ -217,7 +217,7 @@ public class AccountController {
|
||||
@HeaderParam(HeaderUtils.X_SIGNAL_AGENT) String userAgent,
|
||||
@NotNull @Valid AccountAttributes attributes) {
|
||||
final Account account = disabledPermittedAuth.getAccount();
|
||||
final long deviceId = disabledPermittedAuth.getAuthenticatedDevice().getId();
|
||||
final byte deviceId = disabledPermittedAuth.getAuthenticatedDevice().getId();
|
||||
|
||||
final Account updatedAccount = accounts.update(account, a -> {
|
||||
a.getDevice(deviceId).ifPresent(d -> {
|
||||
|
||||
@@ -135,7 +135,7 @@ public class DeviceController {
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
@Path("/{device_id}")
|
||||
@ChangesDeviceEnabledState
|
||||
public void removeDevice(@Auth AuthenticatedAccount auth, @PathParam("device_id") long deviceId) {
|
||||
public void removeDevice(@Auth AuthenticatedAccount auth, @PathParam("device_id") byte deviceId) {
|
||||
Account account = auth.getAccount();
|
||||
if (auth.getAuthenticatedDevice().getId() != Device.PRIMARY_ID) {
|
||||
throw new WebApplicationException(Response.Status.UNAUTHORIZED);
|
||||
@@ -256,7 +256,7 @@ public class DeviceController {
|
||||
@Path("/capabilities")
|
||||
public void setCapabilities(@Auth AuthenticatedAccount auth, @NotNull @Valid DeviceCapabilities capabilities) {
|
||||
assert (auth.getAuthenticatedDevice() != null);
|
||||
final long deviceId = auth.getAuthenticatedDevice().getId();
|
||||
final byte deviceId = auth.getAuthenticatedDevice().getId();
|
||||
accounts.updateDevice(auth.getAccount(), deviceId, d -> d.setCapabilities(capabilities));
|
||||
}
|
||||
|
||||
|
||||
@@ -332,7 +332,7 @@ public class KeysController {
|
||||
return account.getDevices().stream().filter(Device::isEnabled).toList();
|
||||
}
|
||||
try {
|
||||
long id = Long.parseLong(deviceId);
|
||||
byte id = Byte.parseByte(deviceId);
|
||||
return account.getDevice(id).filter(Device::isEnabled).map(List::of).orElse(List.of());
|
||||
} catch (NumberFormatException e) {
|
||||
throw new WebApplicationException(Response.status(422).build());
|
||||
|
||||
@@ -283,7 +283,7 @@ public class MessageController {
|
||||
checkStoryRateLimit(destination.get(), userAgent);
|
||||
}
|
||||
|
||||
final Set<Long> excludedDeviceIds;
|
||||
final Set<Byte> excludedDeviceIds;
|
||||
|
||||
if (isSyncMessage) {
|
||||
excludedDeviceIds = Set.of(source.get().getAuthenticatedDevice().getId());
|
||||
@@ -346,7 +346,7 @@ public class MessageController {
|
||||
/**
|
||||
* Build mapping of accounts to devices/registration IDs.
|
||||
*/
|
||||
private Map<Account, Set<Pair<Long, Integer>>> buildDeviceIdAndRegistrationIdMap(
|
||||
private Map<Account, Set<Pair<Byte, Integer>>> buildDeviceIdAndRegistrationIdMap(
|
||||
MultiRecipientMessage multiRecipientMessage,
|
||||
Map<ServiceIdentifier, Account> accountsByServiceIdentifier) {
|
||||
|
||||
@@ -403,7 +403,7 @@ public class MessageController {
|
||||
checkAccessKeys(accessKeys, accountsByServiceIdentifier.values());
|
||||
}
|
||||
|
||||
final Map<Account, Set<Pair<Long, Integer>>> accountToDeviceIdAndRegistrationIdMap =
|
||||
final Map<Account, Set<Pair<Byte, Integer>>> accountToDeviceIdAndRegistrationIdMap =
|
||||
buildDeviceIdAndRegistrationIdMap(multiRecipientMessage, accountsByServiceIdentifier);
|
||||
|
||||
// We might filter out all the recipients of a story (if none have enabled stories).
|
||||
@@ -420,7 +420,7 @@ public class MessageController {
|
||||
checkStoryRateLimit(account, userAgent);
|
||||
}
|
||||
|
||||
Set<Long> deviceIds = accountToDeviceIdAndRegistrationIdMap
|
||||
Set<Byte> deviceIds = accountToDeviceIdAndRegistrationIdMap
|
||||
.getOrDefault(account, Collections.emptySet())
|
||||
.stream()
|
||||
.map(Pair::first)
|
||||
@@ -678,7 +678,7 @@ public class MessageController {
|
||||
|
||||
try {
|
||||
Account sourceAccount = source.map(AuthenticatedAccount::getAccount).orElse(null);
|
||||
Long sourceDeviceId = source.map(account -> account.getAuthenticatedDevice().getId()).orElse(null);
|
||||
Byte sourceDeviceId = source.map(account -> account.getAuthenticatedDevice().getId()).orElse(null);
|
||||
envelope = incomingMessage.toEnvelope(
|
||||
destinationIdentifier,
|
||||
sourceAccount,
|
||||
|
||||
@@ -9,19 +9,19 @@ import java.util.List;
|
||||
|
||||
public class MismatchedDevicesException extends Exception {
|
||||
|
||||
private final List<Long> missingDevices;
|
||||
private final List<Long> extraDevices;
|
||||
private final List<Byte> missingDevices;
|
||||
private final List<Byte> extraDevices;
|
||||
|
||||
public MismatchedDevicesException(List<Long> missingDevices, List<Long> extraDevices) {
|
||||
public MismatchedDevicesException(List<Byte> missingDevices, List<Byte> extraDevices) {
|
||||
this.missingDevices = missingDevices;
|
||||
this.extraDevices = extraDevices;
|
||||
}
|
||||
|
||||
public List<Long> getMissingDevices() {
|
||||
public List<Byte> getMissingDevices() {
|
||||
return missingDevices;
|
||||
}
|
||||
|
||||
public List<Long> getExtraDevices() {
|
||||
public List<Byte> getExtraDevices() {
|
||||
return extraDevices;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ public class ProvisioningController {
|
||||
|
||||
rateLimiters.getMessagesLimiter().validate(auth.getAccount().getUuid());
|
||||
|
||||
if (!provisioningManager.sendProvisioningMessage(new ProvisioningAddress(destinationName, 0),
|
||||
if (!provisioningManager.sendProvisioningMessage(new ProvisioningAddress(destinationName, (byte) 0),
|
||||
Base64.getMimeDecoder().decode(message.body()))) {
|
||||
throw new WebApplicationException(Response.Status.NOT_FOUND);
|
||||
}
|
||||
|
||||
@@ -9,13 +9,14 @@ import java.util.List;
|
||||
|
||||
|
||||
public class StaleDevicesException extends Exception {
|
||||
private final List<Long> staleDevices;
|
||||
|
||||
public StaleDevicesException(List<Long> staleDevices) {
|
||||
private final List<Byte> staleDevices;
|
||||
|
||||
public StaleDevicesException(List<Byte> staleDevices) {
|
||||
this.staleDevices = staleDevices;
|
||||
}
|
||||
|
||||
public List<Long> getStaleDevices() {
|
||||
public List<Byte> getStaleDevices() {
|
||||
return staleDevices;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,7 +98,7 @@ public record AccountDataReportResponse(UUID reportId,
|
||||
|
||||
}
|
||||
|
||||
public record DeviceDataReport(long id,
|
||||
public record DeviceDataReport(byte id,
|
||||
@JsonFormat(pattern = DATE_FORMAT, timezone = UTC)
|
||||
Instant lastSeen,
|
||||
@JsonFormat(pattern = DATE_FORMAT, timezone = UTC)
|
||||
|
||||
@@ -54,7 +54,7 @@ public record ChangeNumberRequest(
|
||||
@Schema(description="""
|
||||
A new signed elliptic-curve prekey for each enabled device on the account, including this one.
|
||||
Each must be accompanied by a valid signature from the new identity key in this request.""")
|
||||
@NotNull @Valid Map<Long, @NotNull @Valid ECSignedPreKey> devicePniSignedPrekeys,
|
||||
@NotNull @Valid Map<Byte, @NotNull @Valid ECSignedPreKey> devicePniSignedPrekeys,
|
||||
|
||||
@Schema(description="""
|
||||
A new signed post-quantum last-resort prekey for each enabled device on the account, including this one.
|
||||
@@ -62,10 +62,10 @@ public record ChangeNumberRequest(
|
||||
If present, must contain one prekey per enabled device including this one.
|
||||
Prekeys for devices that did not previously have any post-quantum prekeys stored will be silently dropped.
|
||||
Each must be accompanied by a valid signature from the new identity key in this request.""")
|
||||
@Valid Map<Long, @NotNull @Valid KEMSignedPreKey> devicePniPqLastResortPrekeys,
|
||||
@Valid Map<Byte, @NotNull @Valid KEMSignedPreKey> devicePniPqLastResortPrekeys,
|
||||
|
||||
@Schema(description="the new phone-number-identity registration ID for each enabled device on the account, including this one")
|
||||
@NotNull Map<Long, Integer> pniRegistrationIds) implements PhoneVerificationRequest {
|
||||
@NotNull Map<Byte, Integer> pniRegistrationIds) implements PhoneVerificationRequest {
|
||||
|
||||
@AssertTrue
|
||||
public boolean isSignatureValidOnEachSignedPreKey() {
|
||||
|
||||
@@ -18,12 +18,12 @@ public class DeviceResponse {
|
||||
private UUID pni;
|
||||
|
||||
@JsonProperty
|
||||
private long deviceId;
|
||||
private byte deviceId;
|
||||
|
||||
@VisibleForTesting
|
||||
public DeviceResponse() {}
|
||||
|
||||
public DeviceResponse(UUID uuid, UUID pni, long deviceId) {
|
||||
public DeviceResponse(UUID uuid, UUID pni, byte deviceId) {
|
||||
this.uuid = uuid;
|
||||
this.pni = pni;
|
||||
this.deviceId = deviceId;
|
||||
@@ -37,7 +37,7 @@ public class DeviceResponse {
|
||||
return pni;
|
||||
}
|
||||
|
||||
public long getDeviceId() {
|
||||
public byte getDeviceId() {
|
||||
return deviceId;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,11 +12,11 @@ import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier;
|
||||
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
|
||||
public record IncomingMessage(int type, long destinationDeviceId, int destinationRegistrationId, String content) {
|
||||
public record IncomingMessage(int type, byte destinationDeviceId, int destinationRegistrationId, String content) {
|
||||
|
||||
public MessageProtos.Envelope toEnvelope(final ServiceIdentifier destinationIdentifier,
|
||||
@Nullable Account sourceAccount,
|
||||
@Nullable Long sourceDeviceId,
|
||||
@Nullable Byte sourceDeviceId,
|
||||
final long timestamp,
|
||||
final boolean story,
|
||||
final boolean urgent,
|
||||
|
||||
@@ -12,9 +12,9 @@ import java.util.List;
|
||||
|
||||
public record MismatchedDevices(@JsonProperty
|
||||
@Schema(description = "Devices present on the account but absent in the request")
|
||||
List<Long> missingDevices,
|
||||
List<Byte> missingDevices,
|
||||
|
||||
@JsonProperty
|
||||
@Schema(description = "Devices absent on the request but present in the account")
|
||||
List<Long> extraDevices) {
|
||||
List<Byte> extraDevices) {
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ public record MultiRecipientMessage(
|
||||
@JsonSerialize(using = ServiceIdentifierAdapter.ServiceIdentifierSerializer.class)
|
||||
@JsonDeserialize(using = ServiceIdentifierAdapter.ServiceIdentifierDeserializer.class)
|
||||
ServiceIdentifier uuid,
|
||||
@Min(1) long deviceId,
|
||||
@Min(1) byte deviceId,
|
||||
@Min(0) @Max(65535) int registrationId,
|
||||
@Size(min = 48, max = 48) @NotNull byte[] perRecipientKeyMaterial) {
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ public record PhoneNumberIdentityKeyDistributionRequest(
|
||||
@JsonDeserialize(using = IdentityKeyAdapter.Deserializer.class)
|
||||
@Schema(description="the new identity key for this account's phone-number identity")
|
||||
IdentityKey pniIdentityKey,
|
||||
|
||||
|
||||
@NotNull
|
||||
@Valid
|
||||
@ArraySchema(
|
||||
@@ -32,26 +32,26 @@ public record PhoneNumberIdentityKeyDistributionRequest(
|
||||
Exactly one message must be supplied for each enabled device other than the sending (primary) device.
|
||||
"""))
|
||||
List<@NotNull @Valid IncomingMessage> deviceMessages,
|
||||
|
||||
|
||||
@NotNull
|
||||
@Valid
|
||||
@Schema(description="""
|
||||
A new signed elliptic-curve prekey for each enabled device on the account, including this one.
|
||||
Each must be accompanied by a valid signature from the new identity key in this request.""")
|
||||
Map<Long, @NotNull @Valid ECSignedPreKey> devicePniSignedPrekeys,
|
||||
|
||||
Map<Byte, @NotNull @Valid ECSignedPreKey> devicePniSignedPrekeys,
|
||||
|
||||
@Schema(description="""
|
||||
A new signed post-quantum last-resort prekey for each enabled device on the account, including this one.
|
||||
May be absent, in which case the last resort PQ prekeys for each device will be deleted if any had been stored.
|
||||
If present, must contain one prekey per enabled device including this one.
|
||||
Prekeys for devices that did not previously have any post-quantum prekeys stored will be silently dropped.
|
||||
Each must be accompanied by a valid signature from the new identity key in this request.""")
|
||||
@Valid Map<Long, @NotNull @Valid KEMSignedPreKey> devicePniPqLastResortPrekeys,
|
||||
@Valid Map<Byte, @NotNull @Valid KEMSignedPreKey> devicePniPqLastResortPrekeys,
|
||||
|
||||
@NotNull
|
||||
@Valid
|
||||
@Schema(description="The new registration ID to use for the phone-number identity of each device, including this one.")
|
||||
Map<Long, Integer> pniRegistrationIds) {
|
||||
Map<Byte, Integer> pniRegistrationIds) {
|
||||
|
||||
@AssertTrue
|
||||
public boolean isSignatureValidOnEachSignedPreKey() {
|
||||
|
||||
@@ -40,7 +40,7 @@ public class PreKeyResponse {
|
||||
|
||||
@VisibleForTesting
|
||||
@JsonIgnore
|
||||
public PreKeyResponseItem getDevice(int deviceId) {
|
||||
public PreKeyResponseItem getDevice(byte deviceId) {
|
||||
for (PreKeyResponseItem device : devices) {
|
||||
if (device.getDeviceId() == deviceId) return device;
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ public class PreKeyResponseItem {
|
||||
|
||||
@JsonProperty
|
||||
@Schema(description="the device ID of the device to which this item pertains")
|
||||
private long deviceId;
|
||||
private byte deviceId;
|
||||
|
||||
@JsonProperty
|
||||
@Schema(description="the registration ID for the device")
|
||||
@@ -33,7 +33,8 @@ public class PreKeyResponseItem {
|
||||
|
||||
public PreKeyResponseItem() {}
|
||||
|
||||
public PreKeyResponseItem(long deviceId, int registrationId, ECSignedPreKey signedPreKey, ECPreKey preKey, KEMSignedPreKey pqPreKey) {
|
||||
public PreKeyResponseItem(byte deviceId, int registrationId, ECSignedPreKey signedPreKey, ECPreKey preKey,
|
||||
KEMSignedPreKey pqPreKey) {
|
||||
this.deviceId = deviceId;
|
||||
this.registrationId = registrationId;
|
||||
this.signedPreKey = signedPreKey;
|
||||
@@ -62,7 +63,7 @@ public class PreKeyResponseItem {
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getDeviceId() {
|
||||
public byte getDeviceId() {
|
||||
return deviceId;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,5 +12,5 @@ import java.util.List;
|
||||
|
||||
public record StaleDevices(@JsonProperty
|
||||
@Schema(description = "Devices that are no longer active")
|
||||
List<Long> staleDevices) {
|
||||
List<Byte> staleDevices) {
|
||||
}
|
||||
|
||||
@@ -1,51 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2020 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.entities;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import javax.validation.constraints.Min;
|
||||
import javax.validation.constraints.NotEmpty;
|
||||
|
||||
public class UnregisteredEvent {
|
||||
|
||||
@JsonProperty
|
||||
@NotEmpty
|
||||
private String registrationId;
|
||||
|
||||
@JsonProperty
|
||||
private String canonicalId;
|
||||
|
||||
@JsonProperty
|
||||
@NotEmpty
|
||||
private String number;
|
||||
|
||||
@JsonProperty
|
||||
@Min(1)
|
||||
private int deviceId;
|
||||
|
||||
@JsonProperty
|
||||
private long timestamp;
|
||||
|
||||
public String getRegistrationId() {
|
||||
return registrationId;
|
||||
}
|
||||
|
||||
public String getCanonicalId() {
|
||||
return canonicalId;
|
||||
}
|
||||
|
||||
public String getNumber() {
|
||||
return number;
|
||||
}
|
||||
|
||||
public int getDeviceId() {
|
||||
return deviceId;
|
||||
}
|
||||
|
||||
public long getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
}
|
||||
@@ -1,22 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2020 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.entities;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
public class UnregisteredEventList {
|
||||
|
||||
@JsonProperty
|
||||
private List<UnregisteredEvent> devices;
|
||||
|
||||
public List<UnregisteredEvent> getDevices() {
|
||||
if (devices == null) return new LinkedList<>();
|
||||
else return devices;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
/*
|
||||
* Copyright 2023 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.grpc;
|
||||
|
||||
import io.grpc.Status;
|
||||
|
||||
public class DeviceIdUtil {
|
||||
|
||||
static byte validate(int deviceId) {
|
||||
if (deviceId > Byte.MAX_VALUE) {
|
||||
throw Status.INVALID_ARGUMENT.withDescription("Device ID is out of range").asRuntimeException();
|
||||
}
|
||||
return (byte) deviceId;
|
||||
}
|
||||
}
|
||||
@@ -78,18 +78,19 @@ public class DevicesGrpcService extends ReactorDevicesGrpc.DevicesImplBase {
|
||||
if (request.getId() == Device.PRIMARY_ID) {
|
||||
throw Status.INVALID_ARGUMENT.withDescription("Cannot remove primary device").asRuntimeException();
|
||||
}
|
||||
final byte deviceId = DeviceIdUtil.validate(request.getId());
|
||||
|
||||
final AuthenticatedDevice authenticatedDevice = AuthenticationUtil.requireAuthenticatedPrimaryDevice();
|
||||
|
||||
return Mono.fromFuture(() -> accountsManager.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier()))
|
||||
.map(maybeAccount -> maybeAccount.orElseThrow(Status.UNAUTHENTICATED::asRuntimeException))
|
||||
.flatMap(account -> Flux.merge(
|
||||
Mono.fromFuture(() -> messagesManager.clear(account.getUuid(), request.getId())),
|
||||
Mono.fromFuture(() -> keysManager.delete(account.getUuid(), request.getId())))
|
||||
.then(Mono.fromFuture(() -> accountsManager.updateAsync(account, a -> a.removeDevice(request.getId()))))
|
||||
Mono.fromFuture(() -> messagesManager.clear(account.getUuid(), deviceId)),
|
||||
Mono.fromFuture(() -> keysManager.delete(account.getUuid(), deviceId)))
|
||||
.then(Mono.fromFuture(() -> accountsManager.updateAsync(account, a -> a.removeDevice(deviceId))))
|
||||
// Some messages may have arrived while we were performing the other updates; make a best effort to clear
|
||||
// those out, too
|
||||
.then(Mono.fromFuture(() -> messagesManager.clear(account.getUuid(), request.getId()))))
|
||||
.then(Mono.fromFuture(() -> messagesManager.clear(account.getUuid(), deviceId))))
|
||||
.thenReturn(RemoveDeviceResponse.newBuilder().build());
|
||||
}
|
||||
|
||||
|
||||
@@ -39,12 +39,14 @@ public class KeysAnonymousGrpcService extends ReactorKeysAnonymousGrpc.KeysAnony
|
||||
final ServiceIdentifier serviceIdentifier =
|
||||
ServiceIdentifierUtil.fromGrpcServiceIdentifier(request.getRequest().getTargetIdentifier());
|
||||
|
||||
final byte deviceId = DeviceIdUtil.validate(request.getRequest().getDeviceId());
|
||||
|
||||
return Mono.fromFuture(() -> accountsManager.getByServiceIdentifierAsync(serviceIdentifier))
|
||||
.flatMap(Mono::justOrEmpty)
|
||||
.switchIfEmpty(Mono.error(Status.UNAUTHENTICATED.asException()))
|
||||
.flatMap(targetAccount ->
|
||||
UnidentifiedAccessUtil.checkUnidentifiedAccess(targetAccount, request.getUnidentifiedAccessKey().toByteArray())
|
||||
? KeysGrpcHelper.getPreKeys(targetAccount, serviceIdentifier.identityType(), request.getRequest().getDeviceId(), keysManager)
|
||||
? KeysGrpcHelper.getPreKeys(targetAccount, serviceIdentifier.identityType(), deviceId, keysManager)
|
||||
: Mono.error(Status.UNAUTHENTICATED.asException()));
|
||||
}
|
||||
|
||||
|
||||
@@ -27,11 +27,11 @@ import reactor.util.function.Tuples;
|
||||
class KeysGrpcHelper {
|
||||
|
||||
@VisibleForTesting
|
||||
static final long ALL_DEVICES = 0;
|
||||
static final byte ALL_DEVICES = 0;
|
||||
|
||||
static Mono<GetPreKeysResponse> getPreKeys(final Account targetAccount,
|
||||
final IdentityType identityType,
|
||||
final long targetDeviceId,
|
||||
final byte targetDeviceId,
|
||||
final KeysManager keysManager) {
|
||||
|
||||
final Flux<Device> devices = targetDeviceId == ALL_DEVICES
|
||||
@@ -73,7 +73,8 @@ class KeysGrpcHelper {
|
||||
|
||||
return builder;
|
||||
})
|
||||
.map(builder -> Tuples.of(device.getId(), builder.build()));
|
||||
// Cast device IDs to `int` to match data types in the response object’s protobuf definition
|
||||
.map(builder -> Tuples.of((int) device.getId(), builder.build()));
|
||||
})
|
||||
.collectMap(Tuple2::getT1, Tuple2::getT2)
|
||||
.map(preKeyBundles -> GetPreKeysResponse.newBuilder()
|
||||
|
||||
@@ -124,17 +124,19 @@ public class KeysGrpcService extends ReactorKeysGrpc.KeysImplBase {
|
||||
final ServiceIdentifier targetIdentifier =
|
||||
ServiceIdentifierUtil.fromGrpcServiceIdentifier(request.getTargetIdentifier());
|
||||
|
||||
final byte deviceId = DeviceIdUtil.validate(request.getDeviceId());
|
||||
|
||||
final String rateLimitKey = authenticatedDevice.accountIdentifier() + "." +
|
||||
authenticatedDevice.deviceId() + "__" +
|
||||
targetIdentifier.uuid() + "." +
|
||||
request.getDeviceId();
|
||||
deviceId;
|
||||
|
||||
return rateLimiters.getPreKeysLimiter().validateReactive(rateLimitKey)
|
||||
.then(Mono.fromFuture(() -> accountsManager.getByServiceIdentifierAsync(targetIdentifier))
|
||||
.flatMap(Mono::justOrEmpty))
|
||||
.switchIfEmpty(Mono.error(Status.NOT_FOUND.asException()))
|
||||
.flatMap(targetAccount ->
|
||||
KeysGrpcHelper.getPreKeys(targetAccount, targetIdentifier.identityType(), request.getDeviceId(), keysManager));
|
||||
KeysGrpcHelper.getPreKeys(targetAccount, targetIdentifier.identityType(), deviceId, keysManager));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -83,7 +83,14 @@ public class MultiRecipientMessageProvider implements MessageBodyReader<MultiRec
|
||||
MultiRecipientMessage.Recipient[] recipients = new MultiRecipientMessage.Recipient[Math.toIntExact(count)];
|
||||
for (int i = 0; i < Math.toIntExact(count); i++) {
|
||||
ServiceIdentifier identifier = readIdentifier(entityStream, version);
|
||||
long deviceId = readVarint(entityStream);
|
||||
final byte deviceId;
|
||||
{
|
||||
long deviceIdLong = readVarint(entityStream);
|
||||
if (deviceIdLong > Byte.MAX_VALUE) {
|
||||
throw new BadRequestException("Invalid device ID");
|
||||
}
|
||||
deviceId = (byte) deviceIdLong;
|
||||
}
|
||||
int registrationId = readU16(entityStream);
|
||||
byte[] perRecipientKeyMaterial = entityStream.readNBytes(48);
|
||||
if (perRecipientKeyMaterial.length != 48) {
|
||||
|
||||
@@ -300,7 +300,7 @@ public class ApnPushNotificationScheduler implements Managed {
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static Optional<Pair<String, Long>> getSeparated(String encoded) {
|
||||
static Optional<Pair<String, Byte>> getSeparated(String encoded) {
|
||||
try {
|
||||
if (encoded == null) return Optional.empty();
|
||||
|
||||
@@ -311,7 +311,7 @@ public class ApnPushNotificationScheduler implements Managed {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
return Optional.of(new Pair<>(parts[0], Long.parseLong(parts[1])));
|
||||
return Optional.of(new Pair<>(parts[0], Byte.parseByte(parts[1])));
|
||||
} catch (NumberFormatException e) {
|
||||
logger.warn("Badly formatted: " + encoded, e);
|
||||
return Optional.empty();
|
||||
@@ -338,7 +338,7 @@ public class ApnPushNotificationScheduler implements Managed {
|
||||
|
||||
final Optional<Account> maybeAccount = accountsManager.getByAccountIdentifier(UUID.fromString(parts[0]));
|
||||
|
||||
return maybeAccount.flatMap(account -> account.getDevice(Long.parseLong(parts[1])))
|
||||
return maybeAccount.flatMap(account -> account.getDevice(Byte.parseByte(parts[1])))
|
||||
.map(device -> new Pair<>(maybeAccount.get(), device));
|
||||
|
||||
} catch (final NumberFormatException e) {
|
||||
|
||||
@@ -21,6 +21,7 @@ import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
|
||||
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
|
||||
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
|
||||
import io.micrometer.core.instrument.Counter;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
@@ -34,7 +35,6 @@ import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
|
||||
@@ -162,7 +162,8 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
|
||||
connection -> connection.sync().upstream().commands().unsubscribe(getManagerPresenceChannel(managerId)));
|
||||
}
|
||||
|
||||
public void setPresent(final UUID accountUuid, final long deviceId, final DisplacedPresenceListener displacementListener) {
|
||||
public void setPresent(final UUID accountUuid, final byte deviceId,
|
||||
final DisplacedPresenceListener displacementListener) {
|
||||
|
||||
try (final Timer.Context ignored = setPresenceTimer.time()) {
|
||||
final String presenceKey = getPresenceKey(accountUuid, deviceId);
|
||||
@@ -182,12 +183,12 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
|
||||
}
|
||||
}
|
||||
|
||||
public void renewPresence(final UUID accountUuid, final long deviceId) {
|
||||
public void renewPresence(final UUID accountUuid, final byte deviceId) {
|
||||
renewPresenceScript.execute(List.of(getPresenceKey(accountUuid, deviceId)),
|
||||
List.of(managerId, String.valueOf(PRESENCE_EXPIRATION_SECONDS)));
|
||||
}
|
||||
|
||||
public void disconnectAllPresences(final UUID accountUuid, final List<Long> deviceIds) {
|
||||
public void disconnectAllPresences(final UUID accountUuid, final List<Byte> deviceIds) {
|
||||
|
||||
List<String> presenceKeys = new ArrayList<>();
|
||||
deviceIds.forEach(deviceId -> {
|
||||
@@ -208,7 +209,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
|
||||
disconnectAllPresences(accountUuid, Device.ALL_POSSIBLE_DEVICE_IDS);
|
||||
}
|
||||
|
||||
public void disconnectPresence(final UUID accountUuid, final long deviceId) {
|
||||
public void disconnectPresence(final UUID accountUuid, final byte deviceId) {
|
||||
disconnectAllPresences(accountUuid, List.of(deviceId));
|
||||
}
|
||||
|
||||
@@ -222,18 +223,18 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
|
||||
clearPresence(presenceKey);
|
||||
}
|
||||
|
||||
public boolean isPresent(final UUID accountUuid, final long deviceId) {
|
||||
public boolean isPresent(final UUID accountUuid, final byte deviceId) {
|
||||
try (final Timer.Context ignored = checkPresenceTimer.time()) {
|
||||
return presenceCluster.withCluster(connection ->
|
||||
connection.sync().exists(getPresenceKey(accountUuid, deviceId))) == 1;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isLocallyPresent(final UUID accountUuid, final long deviceId) {
|
||||
public boolean isLocallyPresent(final UUID accountUuid, final byte deviceId) {
|
||||
return displacementListenersByPresenceKey.containsKey(getPresenceKey(accountUuid, deviceId));
|
||||
}
|
||||
|
||||
public boolean clearPresence(final UUID accountUuid, final long deviceId, final DisplacedPresenceListener listener) {
|
||||
public boolean clearPresence(final UUID accountUuid, final byte deviceId, final DisplacedPresenceListener listener) {
|
||||
final String presenceKey = getPresenceKey(accountUuid, deviceId);
|
||||
if (displacementListenersByPresenceKey.remove(presenceKey, listener)) {
|
||||
return clearPresence(presenceKey);
|
||||
@@ -337,7 +338,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static String getPresenceKey(final UUID accountUuid, final long deviceId) {
|
||||
static String getPresenceKey(final UUID accountUuid, final byte deviceId) {
|
||||
return "presence::{" + accountUuid.toString() + "::" + deviceId + "}";
|
||||
}
|
||||
|
||||
|
||||
@@ -74,7 +74,7 @@ public class PushLatencyManager {
|
||||
this.clock = clock;
|
||||
}
|
||||
|
||||
void recordPushSent(final UUID accountUuid, final long deviceId, final boolean isVoip, final boolean isUrgent) {
|
||||
void recordPushSent(final UUID accountUuid, final byte deviceId, final boolean isVoip, final boolean isUrgent) {
|
||||
try {
|
||||
final String recordJson = SystemMapper.jsonMapper().writeValueAsString(
|
||||
new PushRecord(Instant.now(clock), isVoip ? PushType.VOIP : PushType.STANDARD, Optional.of(isUrgent)));
|
||||
@@ -89,7 +89,7 @@ public class PushLatencyManager {
|
||||
}
|
||||
}
|
||||
|
||||
void recordQueueRead(final UUID accountUuid, final long deviceId, final String userAgentString) {
|
||||
void recordQueueRead(final UUID accountUuid, final byte deviceId, final String userAgentString) {
|
||||
takePushRecord(accountUuid, deviceId).thenAccept(pushRecord -> {
|
||||
if (pushRecord != null) {
|
||||
final Duration latency = Duration.between(pushRecord.timestamp(), Instant.now());
|
||||
@@ -114,7 +114,7 @@ public class PushLatencyManager {
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
CompletableFuture<PushRecord> takePushRecord(final UUID accountUuid, final long deviceId) {
|
||||
CompletableFuture<PushRecord> takePushRecord(final UUID accountUuid, final byte deviceId) {
|
||||
final String key = getFirstUnacknowledgedPushKey(accountUuid, deviceId);
|
||||
|
||||
return redisCluster.withCluster(connection -> {
|
||||
@@ -141,7 +141,7 @@ public class PushLatencyManager {
|
||||
});
|
||||
}
|
||||
|
||||
private static String getFirstUnacknowledgedPushKey(final UUID accountUuid, final long deviceId) {
|
||||
private static String getFirstUnacknowledgedPushKey(final UUID accountUuid, final byte deviceId) {
|
||||
return "push_latency::v2::" + accountUuid.toString() + "::" + deviceId;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ public class PushNotificationManager {
|
||||
this.pushLatencyManager = pushLatencyManager;
|
||||
}
|
||||
|
||||
public void sendNewMessageNotification(final Account destination, final long destinationDeviceId, final boolean urgent) throws NotPushRegisteredException {
|
||||
public void sendNewMessageNotification(final Account destination, final byte destinationDeviceId, final boolean urgent) throws NotPushRegisteredException {
|
||||
final Device device = destination.getDevice(destinationDeviceId).orElseThrow(NotPushRegisteredException::new);
|
||||
final Pair<String, PushNotification.TokenType> tokenAndType = getToken(device);
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ public class ReceiptSender {
|
||||
;
|
||||
}
|
||||
|
||||
public void sendReceipt(ServiceIdentifier sourceIdentifier, long sourceDeviceId, AciServiceIdentifier destinationIdentifier, long messageId) {
|
||||
public void sendReceipt(ServiceIdentifier sourceIdentifier, byte sourceDeviceId, AciServiceIdentifier destinationIdentifier, long messageId) {
|
||||
if (sourceIdentifier.equals(destinationIdentifier)) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -223,7 +223,7 @@ public class Account {
|
||||
this.devices.add(device);
|
||||
}
|
||||
|
||||
public void removeDevice(final long deviceId) {
|
||||
public void removeDevice(final byte deviceId) {
|
||||
requireNotStale();
|
||||
|
||||
this.devices.removeIf(device -> device.getId() == deviceId);
|
||||
@@ -241,7 +241,7 @@ public class Account {
|
||||
return getDevice(Device.PRIMARY_ID);
|
||||
}
|
||||
|
||||
public Optional<Device> getDevice(final long deviceId) {
|
||||
public Optional<Device> getDevice(final byte deviceId) {
|
||||
requireNotStale();
|
||||
|
||||
return devices.stream().filter(device -> device.getId() == deviceId).findFirst();
|
||||
@@ -281,15 +281,19 @@ public class Account {
|
||||
return getPrimaryDevice().map(Device::isEnabled).orElse(false);
|
||||
}
|
||||
|
||||
public long getNextDeviceId() {
|
||||
public byte getNextDeviceId() {
|
||||
requireNotStale();
|
||||
|
||||
long candidateId = Device.PRIMARY_ID + 1;
|
||||
byte candidateId = Device.PRIMARY_ID + 1;
|
||||
|
||||
while (getDevice(candidateId).isPresent()) {
|
||||
candidateId++;
|
||||
}
|
||||
|
||||
if (candidateId <= Device.PRIMARY_ID) {
|
||||
throw new RuntimeException("device ID overflow");
|
||||
}
|
||||
|
||||
return candidateId;
|
||||
}
|
||||
|
||||
|
||||
@@ -268,9 +268,9 @@ public class AccountsManager {
|
||||
public Account changeNumber(final Account account,
|
||||
final String targetNumber,
|
||||
@Nullable final IdentityKey pniIdentityKey,
|
||||
@Nullable final Map<Long, ECSignedPreKey> pniSignedPreKeys,
|
||||
@Nullable final Map<Long, KEMSignedPreKey> pniPqLastResortPreKeys,
|
||||
@Nullable final Map<Long, Integer> pniRegistrationIds) throws InterruptedException, MismatchedDevicesException {
|
||||
@Nullable final Map<Byte, ECSignedPreKey> pniSignedPreKeys,
|
||||
@Nullable final Map<Byte, KEMSignedPreKey> pniPqLastResortPreKeys,
|
||||
@Nullable final Map<Byte, Integer> pniRegistrationIds) throws InterruptedException, MismatchedDevicesException {
|
||||
|
||||
final String originalNumber = account.getNumber();
|
||||
final UUID originalPhoneNumberIdentifier = account.getPhoneNumberIdentifier();
|
||||
@@ -369,9 +369,9 @@ public class AccountsManager {
|
||||
|
||||
public Account updatePniKeys(final Account account,
|
||||
final IdentityKey pniIdentityKey,
|
||||
final Map<Long, ECSignedPreKey> pniSignedPreKeys,
|
||||
@Nullable final Map<Long, KEMSignedPreKey> pniPqLastResortPreKeys,
|
||||
final Map<Long, Integer> pniRegistrationIds) throws MismatchedDevicesException {
|
||||
final Map<Byte, ECSignedPreKey> pniSignedPreKeys,
|
||||
@Nullable final Map<Byte, KEMSignedPreKey> pniPqLastResortPreKeys,
|
||||
final Map<Byte, Integer> pniRegistrationIds) throws MismatchedDevicesException {
|
||||
validateDevices(account, pniSignedPreKeys, pniPqLastResortPreKeys, pniRegistrationIds);
|
||||
|
||||
final UUID pni = account.getPhoneNumberIdentifier();
|
||||
@@ -395,8 +395,8 @@ public class AccountsManager {
|
||||
|
||||
private boolean setPniKeys(final Account account,
|
||||
@Nullable final IdentityKey pniIdentityKey,
|
||||
@Nullable final Map<Long, ECSignedPreKey> pniSignedPreKeys,
|
||||
@Nullable final Map<Long, Integer> pniRegistrationIds) {
|
||||
@Nullable final Map<Byte, ECSignedPreKey> pniSignedPreKeys,
|
||||
@Nullable final Map<Byte, Integer> pniRegistrationIds) {
|
||||
if (ObjectUtils.allNull(pniIdentityKey, pniSignedPreKeys, pniRegistrationIds)) {
|
||||
return false;
|
||||
} else if (!ObjectUtils.allNotNull(pniIdentityKey, pniSignedPreKeys, pniRegistrationIds)) {
|
||||
@@ -424,9 +424,9 @@ public class AccountsManager {
|
||||
}
|
||||
|
||||
private void validateDevices(final Account account,
|
||||
@Nullable final Map<Long, ECSignedPreKey> pniSignedPreKeys,
|
||||
@Nullable final Map<Long, KEMSignedPreKey> pniPqLastResortPreKeys,
|
||||
@Nullable final Map<Long, Integer> pniRegistrationIds) throws MismatchedDevicesException {
|
||||
@Nullable final Map<Byte, ECSignedPreKey> pniSignedPreKeys,
|
||||
@Nullable final Map<Byte, KEMSignedPreKey> pniPqLastResortPreKeys,
|
||||
@Nullable final Map<Byte, Integer> pniRegistrationIds) throws MismatchedDevicesException {
|
||||
if (pniSignedPreKeys == null && pniRegistrationIds == null) {
|
||||
return;
|
||||
} else if (pniSignedPreKeys == null || pniRegistrationIds == null) {
|
||||
@@ -580,7 +580,7 @@ public class AccountsManager {
|
||||
}
|
||||
|
||||
/**
|
||||
* Specialized version of {@link #updateDevice(Account, long, Consumer)} that minimizes potentially contentious and
|
||||
* Specialized version of {@link #updateDevice(Account, byte, Consumer)} that minimizes potentially contentious and
|
||||
* redundant updates of {@code device.lastSeen}
|
||||
*/
|
||||
public Account updateDeviceLastSeen(Account account, Device device, final long lastSeen) {
|
||||
@@ -741,7 +741,7 @@ public class AccountsManager {
|
||||
return CompletableFuture.failedFuture(new OptimisticLockRetryLimitExceededException());
|
||||
}
|
||||
|
||||
public Account updateDevice(Account account, long deviceId, Consumer<Device> deviceUpdater) {
|
||||
public Account updateDevice(Account account, byte deviceId, Consumer<Device> deviceUpdater) {
|
||||
return update(account, a -> {
|
||||
a.getDevice(deviceId).ifPresent(deviceUpdater);
|
||||
// assume that all updaters passed to the public method actually modify the device
|
||||
@@ -749,7 +749,8 @@ public class AccountsManager {
|
||||
});
|
||||
}
|
||||
|
||||
public CompletableFuture<Account> updateDeviceAsync(final Account account, final long deviceId, final Consumer<Device> deviceUpdater) {
|
||||
public CompletableFuture<Account> updateDeviceAsync(final Account account, final byte deviceId,
|
||||
final Consumer<Device> deviceUpdater) {
|
||||
return updateAsync(account, a -> {
|
||||
a.getDevice(deviceId).ifPresent(deviceUpdater);
|
||||
// assume that all updaters passed to the public method actually modify the device
|
||||
|
||||
@@ -43,10 +43,10 @@ public class ChangeNumberManager {
|
||||
|
||||
public Account changeNumber(final Account account, final String number,
|
||||
@Nullable final IdentityKey pniIdentityKey,
|
||||
@Nullable final Map<Long, ECSignedPreKey> deviceSignedPreKeys,
|
||||
@Nullable final Map<Long, KEMSignedPreKey> devicePqLastResortPreKeys,
|
||||
@Nullable final Map<Byte, ECSignedPreKey> deviceSignedPreKeys,
|
||||
@Nullable final Map<Byte, KEMSignedPreKey> devicePqLastResortPreKeys,
|
||||
@Nullable final List<IncomingMessage> deviceMessages,
|
||||
@Nullable final Map<Long, Integer> pniRegistrationIds)
|
||||
@Nullable final Map<Byte, Integer> pniRegistrationIds)
|
||||
throws InterruptedException, MismatchedDevicesException, StaleDevicesException {
|
||||
|
||||
if (ObjectUtils.allNotNull(pniIdentityKey, deviceSignedPreKeys, deviceMessages, pniRegistrationIds)) {
|
||||
@@ -83,10 +83,10 @@ public class ChangeNumberManager {
|
||||
|
||||
public Account updatePniKeys(final Account account,
|
||||
final IdentityKey pniIdentityKey,
|
||||
final Map<Long, ECSignedPreKey> deviceSignedPreKeys,
|
||||
@Nullable final Map<Long, KEMSignedPreKey> devicePqLastResortPreKeys,
|
||||
final Map<Byte, ECSignedPreKey> deviceSignedPreKeys,
|
||||
@Nullable final Map<Byte, KEMSignedPreKey> devicePqLastResortPreKeys,
|
||||
final List<IncomingMessage> deviceMessages,
|
||||
final Map<Long, Integer> pniRegistrationIds) throws MismatchedDevicesException, StaleDevicesException {
|
||||
final Map<Byte, Integer> pniRegistrationIds) throws MismatchedDevicesException, StaleDevicesException {
|
||||
validateDeviceMessages(account, deviceMessages);
|
||||
|
||||
// Don't try to be smart about ignoring unnecessary retries. If we make literally no change we will skip the ddb
|
||||
|
||||
@@ -6,11 +6,12 @@ package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
|
||||
import java.util.List;
|
||||
import java.util.OptionalInt;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.LongStream;
|
||||
import java.util.stream.IntStream;
|
||||
import javax.annotation.Nullable;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.whispersystems.textsecuregcm.auth.SaltedTokenHash;
|
||||
@@ -19,13 +20,15 @@ import org.whispersystems.textsecuregcm.identity.IdentityType;
|
||||
|
||||
public class Device {
|
||||
|
||||
public static final long PRIMARY_ID = 1;
|
||||
public static final int MAXIMUM_DEVICE_ID = 256;
|
||||
public static final byte PRIMARY_ID = 1;
|
||||
public static final byte MAXIMUM_DEVICE_ID = Byte.MAX_VALUE;
|
||||
public static final int MAX_REGISTRATION_ID = 0x3FFF;
|
||||
public static final List<Long> ALL_POSSIBLE_DEVICE_IDS = LongStream.range(1, MAXIMUM_DEVICE_ID).boxed().collect(Collectors.toList());
|
||||
public static final List<Byte> ALL_POSSIBLE_DEVICE_IDS = IntStream.range(Device.PRIMARY_ID, MAXIMUM_DEVICE_ID).boxed()
|
||||
.map(Integer::byteValue).collect(Collectors.toList());
|
||||
|
||||
@JsonDeserialize(using = DeviceIdDeserializer.class)
|
||||
@JsonProperty
|
||||
private long id;
|
||||
private byte id;
|
||||
|
||||
@JsonProperty
|
||||
private String name;
|
||||
@@ -135,11 +138,11 @@ public class Device {
|
||||
}
|
||||
}
|
||||
|
||||
public long getId() {
|
||||
public byte getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(long id) {
|
||||
public void setId(byte id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
/*
|
||||
* Copyright 2023 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.databind.DeserializationContext;
|
||||
import com.fasterxml.jackson.databind.JsonDeserializer;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* The built-in {@link com.fasterxml.jackson.databind.deser.std.NumberDeserializers.ByteDeserializer} will return
|
||||
* negative values—both verbatim and by coercing 128…255. We prefer this invalid data to fail fast, so this
|
||||
* is a simpler and stricter deserializer.
|
||||
*/
|
||||
public class DeviceIdDeserializer extends JsonDeserializer<Byte> {
|
||||
|
||||
@Override
|
||||
public Byte deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
|
||||
|
||||
byte value = p.getByteValue();
|
||||
|
||||
if (value < Device.PRIMARY_ID) {
|
||||
throw new DeviceIdDeserializationException();
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
static class DeviceIdDeserializationException extends IOException {
|
||||
|
||||
DeviceIdDeserializationException() {
|
||||
super("Invalid Device ID");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -42,12 +42,12 @@ public class KeysManager {
|
||||
this.dynamicConfigurationManager = dynamicConfigurationManager;
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> store(final UUID identifier, final long deviceId, final List<ECPreKey> keys) {
|
||||
public CompletableFuture<Void> store(final UUID identifier, final byte deviceId, final List<ECPreKey> keys) {
|
||||
return store(identifier, deviceId, keys, null, null, null);
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> store(
|
||||
final UUID identifier, final long deviceId,
|
||||
final UUID identifier, final byte deviceId,
|
||||
@Nullable final List<ECPreKey> ecKeys,
|
||||
@Nullable final List<KEMSignedPreKey> pqKeys,
|
||||
@Nullable final ECSignedPreKey ecSignedPreKey,
|
||||
@@ -63,7 +63,8 @@ public class KeysManager {
|
||||
storeFutures.add(pqPreKeys.store(identifier, deviceId, pqKeys));
|
||||
}
|
||||
|
||||
if (ecSignedPreKey != null && dynamicConfigurationManager.getConfiguration().getEcPreKeyMigrationConfiguration().storeEcSignedPreKeys()) {
|
||||
if (ecSignedPreKey != null && dynamicConfigurationManager.getConfiguration().getEcPreKeyMigrationConfiguration()
|
||||
.storeEcSignedPreKeys()) {
|
||||
storeFutures.add(ecSignedPreKeys.store(identifier, deviceId, ecSignedPreKey));
|
||||
}
|
||||
|
||||
@@ -74,7 +75,7 @@ public class KeysManager {
|
||||
return CompletableFuture.allOf(storeFutures.toArray(new CompletableFuture[0]));
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> storeEcSignedPreKeys(final UUID identifier, final Map<Long, ECSignedPreKey> keys) {
|
||||
public CompletableFuture<Void> storeEcSignedPreKeys(final UUID identifier, final Map<Byte, ECSignedPreKey> keys) {
|
||||
if (dynamicConfigurationManager.getConfiguration().getEcPreKeyMigrationConfiguration().storeEcSignedPreKeys()) {
|
||||
return ecSignedPreKeys.store(identifier, keys);
|
||||
} else {
|
||||
@@ -82,27 +83,30 @@ public class KeysManager {
|
||||
}
|
||||
}
|
||||
|
||||
public CompletableFuture<Boolean> storeEcSignedPreKeyIfAbsent(final UUID identifier, final long deviceId, final ECSignedPreKey signedPreKey) {
|
||||
public CompletableFuture<Boolean> storeEcSignedPreKeyIfAbsent(final UUID identifier, final byte deviceId,
|
||||
final ECSignedPreKey signedPreKey) {
|
||||
return ecSignedPreKeys.storeIfAbsent(identifier, deviceId, signedPreKey);
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> storePqLastResort(final UUID identifier, final Map<Long, KEMSignedPreKey> keys) {
|
||||
public CompletableFuture<Void> storePqLastResort(final UUID identifier, final Map<Byte, KEMSignedPreKey> keys) {
|
||||
return pqLastResortKeys.store(identifier, keys);
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> storeEcOneTimePreKeys(final UUID identifier, final long deviceId, final List<ECPreKey> preKeys) {
|
||||
public CompletableFuture<Void> storeEcOneTimePreKeys(final UUID identifier, final byte deviceId,
|
||||
final List<ECPreKey> preKeys) {
|
||||
return ecPreKeys.store(identifier, deviceId, preKeys);
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> storeKemOneTimePreKeys(final UUID identifier, final long deviceId, final List<KEMSignedPreKey> preKeys) {
|
||||
public CompletableFuture<Void> storeKemOneTimePreKeys(final UUID identifier, final byte deviceId,
|
||||
final List<KEMSignedPreKey> preKeys) {
|
||||
return pqPreKeys.store(identifier, deviceId, preKeys);
|
||||
}
|
||||
|
||||
public CompletableFuture<Optional<ECPreKey>> takeEC(final UUID identifier, final long deviceId) {
|
||||
public CompletableFuture<Optional<ECPreKey>> takeEC(final UUID identifier, final byte deviceId) {
|
||||
return ecPreKeys.take(identifier, deviceId);
|
||||
}
|
||||
|
||||
public CompletableFuture<Optional<KEMSignedPreKey>> takePQ(final UUID identifier, final long deviceId) {
|
||||
public CompletableFuture<Optional<KEMSignedPreKey>> takePQ(final UUID identifier, final byte deviceId) {
|
||||
return pqPreKeys.take(identifier, deviceId)
|
||||
.thenCompose(maybeSingleUsePreKey -> maybeSingleUsePreKey
|
||||
.map(singleUsePreKey -> CompletableFuture.completedFuture(maybeSingleUsePreKey))
|
||||
@@ -110,26 +114,26 @@ public class KeysManager {
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
CompletableFuture<Optional<KEMSignedPreKey>> getLastResort(final UUID identifier, final long deviceId) {
|
||||
CompletableFuture<Optional<KEMSignedPreKey>> getLastResort(final UUID identifier, final byte deviceId) {
|
||||
return pqLastResortKeys.find(identifier, deviceId);
|
||||
}
|
||||
|
||||
public CompletableFuture<Optional<ECSignedPreKey>> getEcSignedPreKey(final UUID identifier, final long deviceId) {
|
||||
public CompletableFuture<Optional<ECSignedPreKey>> getEcSignedPreKey(final UUID identifier, final byte deviceId) {
|
||||
return ecSignedPreKeys.find(identifier, deviceId);
|
||||
}
|
||||
|
||||
public CompletableFuture<List<Long>> getPqEnabledDevices(final UUID identifier) {
|
||||
public CompletableFuture<List<Byte>> getPqEnabledDevices(final UUID identifier) {
|
||||
return pqLastResortKeys.getDeviceIdsWithKeys(identifier).collectList().toFuture();
|
||||
}
|
||||
|
||||
public CompletableFuture<Integer> getEcCount(final UUID identifier, final long deviceId) {
|
||||
public CompletableFuture<Integer> getEcCount(final UUID identifier, final byte deviceId) {
|
||||
return ecPreKeys.getCount(identifier, deviceId);
|
||||
}
|
||||
|
||||
public CompletableFuture<Integer> getPqCount(final UUID identifier, final long deviceId) {
|
||||
public CompletableFuture<Integer> getPqCount(final UUID identifier, final byte deviceId) {
|
||||
return pqPreKeys.getCount(identifier, deviceId);
|
||||
}
|
||||
|
||||
|
||||
public CompletableFuture<Void> delete(final UUID accountUuid) {
|
||||
return CompletableFuture.allOf(
|
||||
ecPreKeys.delete(accountUuid),
|
||||
@@ -140,7 +144,7 @@ public class KeysManager {
|
||||
pqLastResortKeys.delete(accountUuid));
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> delete(final UUID accountUuid, final long deviceId) {
|
||||
public CompletableFuture<Void> delete(final UUID accountUuid, final byte deviceId) {
|
||||
return CompletableFuture.allOf(
|
||||
ecPreKeys.delete(accountUuid, deviceId),
|
||||
pqPreKeys.delete(accountUuid, deviceId),
|
||||
|
||||
@@ -137,7 +137,7 @@ public class MessagePersister implements Managed {
|
||||
|
||||
for (final String queue : queuesToPersist) {
|
||||
final UUID accountUuid = MessagesCache.getAccountUuidFromQueueName(queue);
|
||||
final long deviceId = MessagesCache.getDeviceIdFromQueueName(queue);
|
||||
final byte deviceId = MessagesCache.getDeviceIdFromQueueName(queue);
|
||||
|
||||
try {
|
||||
persistQueue(accountUuid, deviceId);
|
||||
@@ -161,7 +161,7 @@ public class MessagePersister implements Managed {
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void persistQueue(final UUID accountUuid, final long deviceId) throws MessagePersistenceException {
|
||||
void persistQueue(final UUID accountUuid, final byte deviceId) throws MessagePersistenceException {
|
||||
final Optional<Account> maybeAccount = accountsManager.getByAccountIdentifier(accountUuid);
|
||||
|
||||
if (maybeAccount.isEmpty()) {
|
||||
|
||||
@@ -155,7 +155,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||
}
|
||||
}
|
||||
|
||||
public long insert(final UUID guid, final UUID destinationUuid, final long destinationDevice,
|
||||
public long insert(final UUID guid, final UUID destinationUuid, final byte destinationDevice,
|
||||
final MessageProtos.Envelope message) {
|
||||
final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build();
|
||||
return (long) insertTimer.record(() ->
|
||||
@@ -168,7 +168,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||
}
|
||||
|
||||
public CompletableFuture<Optional<MessageProtos.Envelope>> remove(final UUID destinationUuid,
|
||||
final long destinationDevice,
|
||||
final byte destinationDevice,
|
||||
final UUID messageGuid) {
|
||||
|
||||
return remove(destinationUuid, destinationDevice, List.of(messageGuid))
|
||||
@@ -177,7 +177,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public CompletableFuture<List<MessageProtos.Envelope>> remove(final UUID destinationUuid,
|
||||
final long destinationDevice,
|
||||
final byte destinationDevice,
|
||||
final List<UUID> messageGuids) {
|
||||
|
||||
return removeByGuidScript.executeBinaryAsync(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
||||
@@ -202,12 +202,12 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||
}, messageDeletionExecutorService);
|
||||
}
|
||||
|
||||
public boolean hasMessages(final UUID destinationUuid, final long destinationDevice) {
|
||||
public boolean hasMessages(final UUID destinationUuid, final byte destinationDevice) {
|
||||
return readDeleteCluster.withBinaryCluster(
|
||||
connection -> connection.sync().zcard(getMessageQueueKey(destinationUuid, destinationDevice)) > 0);
|
||||
}
|
||||
|
||||
public Publisher<MessageProtos.Envelope> get(final UUID destinationUuid, final long destinationDevice) {
|
||||
public Publisher<MessageProtos.Envelope> get(final UUID destinationUuid, final byte destinationDevice) {
|
||||
|
||||
final long earliestAllowableEphemeralTimestamp =
|
||||
clock.millis() - MAX_EPHEMERAL_MESSAGE_DELAY.toMillis();
|
||||
@@ -238,7 +238,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||
return message.hasEphemeral() && message.getEphemeral() && message.getTimestamp() < earliestAllowableTimestamp;
|
||||
}
|
||||
|
||||
private void discardStaleEphemeralMessages(final UUID destinationUuid, final long destinationDevice,
|
||||
private void discardStaleEphemeralMessages(final UUID destinationUuid, final byte destinationDevice,
|
||||
Flux<MessageProtos.Envelope> staleEphemeralMessages) {
|
||||
staleEphemeralMessages
|
||||
.map(e -> UUID.fromString(e.getServerGuid()))
|
||||
@@ -251,7 +251,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Flux<MessageProtos.Envelope> getAllMessages(final UUID destinationUuid, final long destinationDevice) {
|
||||
Flux<MessageProtos.Envelope> getAllMessages(final UUID destinationUuid, final byte destinationDevice) {
|
||||
|
||||
// fetch messages by page
|
||||
return getNextMessagePage(destinationUuid, destinationDevice, -1)
|
||||
@@ -284,7 +284,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||
});
|
||||
}
|
||||
|
||||
private Flux<Pair<List<byte[]>, Long>> getNextMessagePage(final UUID destinationUuid, final long destinationDevice,
|
||||
private Flux<Pair<List<byte[]>, Long>> getNextMessagePage(final UUID destinationUuid, final byte destinationDevice,
|
||||
long messageId) {
|
||||
|
||||
return getItemsScript.executeBinaryReactive(
|
||||
@@ -315,7 +315,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
List<MessageProtos.Envelope> getMessagesToPersist(final UUID accountUuid, final long destinationDevice,
|
||||
List<MessageProtos.Envelope> getMessagesToPersist(final UUID accountUuid, final byte destinationDevice,
|
||||
final int limit) {
|
||||
return getMessagesTimer.record(() -> {
|
||||
final List<ScoredValue<byte[]>> scoredMessages = readDeleteCluster.withBinaryCluster(
|
||||
@@ -336,16 +336,14 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> clear(final UUID destinationUuid) {
|
||||
final CompletableFuture<?>[] clearFutures = new CompletableFuture[Device.MAXIMUM_DEVICE_ID];
|
||||
|
||||
for (int deviceId = 0; deviceId < Device.MAXIMUM_DEVICE_ID; deviceId++) {
|
||||
clearFutures[deviceId] = clear(destinationUuid, deviceId);
|
||||
}
|
||||
|
||||
return CompletableFuture.allOf(clearFutures);
|
||||
return CompletableFuture.allOf(
|
||||
Device.ALL_POSSIBLE_DEVICE_IDS.stream()
|
||||
.map(deviceId -> clear(destinationUuid, deviceId))
|
||||
.toList()
|
||||
.toArray(CompletableFuture[]::new));
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> clear(final UUID destinationUuid, final long deviceId) {
|
||||
public CompletableFuture<Void> clear(final UUID destinationUuid, final byte deviceId) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
return removeQueueScript.executeBinaryAsync(List.of(getMessageQueueKey(destinationUuid, deviceId),
|
||||
@@ -368,23 +366,23 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||
String.valueOf(limit))));
|
||||
}
|
||||
|
||||
void addQueueToPersist(final UUID accountUuid, final long deviceId) {
|
||||
void addQueueToPersist(final UUID accountUuid, final byte deviceId) {
|
||||
readDeleteCluster.useBinaryCluster(connection -> connection.sync()
|
||||
.zadd(getQueueIndexKey(accountUuid, deviceId), ZAddArgs.Builder.nx(), System.currentTimeMillis(),
|
||||
getMessageQueueKey(accountUuid, deviceId)));
|
||||
}
|
||||
|
||||
void lockQueueForPersistence(final UUID accountUuid, final long deviceId) {
|
||||
void lockQueueForPersistence(final UUID accountUuid, final byte deviceId) {
|
||||
readDeleteCluster.useBinaryCluster(
|
||||
connection -> connection.sync().setex(getPersistInProgressKey(accountUuid, deviceId), 30, LOCK_VALUE));
|
||||
}
|
||||
|
||||
void unlockQueueForPersistence(final UUID accountUuid, final long deviceId) {
|
||||
void unlockQueueForPersistence(final UUID accountUuid, final byte deviceId) {
|
||||
readDeleteCluster.useBinaryCluster(
|
||||
connection -> connection.sync().del(getPersistInProgressKey(accountUuid, deviceId)));
|
||||
}
|
||||
|
||||
public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId,
|
||||
public void addMessageAvailabilityListener(final UUID destinationUuid, final byte deviceId,
|
||||
final MessageAvailabilityListener listener) {
|
||||
final String queueName = getQueueName(destinationUuid, deviceId);
|
||||
|
||||
@@ -500,7 +498,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static String getQueueName(final UUID accountUuid, final long deviceId) {
|
||||
static String getQueueName(final UUID accountUuid, final byte deviceId) {
|
||||
return accountUuid + "::" + deviceId;
|
||||
}
|
||||
|
||||
@@ -513,15 +511,15 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static byte[] getMessageQueueKey(final UUID accountUuid, final long deviceId) {
|
||||
static byte[] getMessageQueueKey(final UUID accountUuid, final byte deviceId) {
|
||||
return ("user_queue::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
private static byte[] getMessageQueueMetadataKey(final UUID accountUuid, final long deviceId) {
|
||||
private static byte[] getMessageQueueMetadataKey(final UUID accountUuid, final byte deviceId) {
|
||||
return ("user_queue_metadata::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
private static byte[] getQueueIndexKey(final UUID accountUuid, final long deviceId) {
|
||||
private static byte[] getQueueIndexKey(final UUID accountUuid, final byte deviceId) {
|
||||
return getQueueIndexKey(SlotHash.getSlot(accountUuid.toString() + "::" + deviceId));
|
||||
}
|
||||
|
||||
@@ -529,7 +527,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||
return ("user_queue_index::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}").getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
private static byte[] getPersistInProgressKey(final UUID accountUuid, final long deviceId) {
|
||||
private static byte[] getPersistInProgressKey(final UUID accountUuid, final byte deviceId) {
|
||||
return ("user_queue_persisting::{" + accountUuid + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
@@ -539,7 +537,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||
return UUID.fromString(queueName.substring(startOfHashTag + 1, queueName.indexOf("::", startOfHashTag)));
|
||||
}
|
||||
|
||||
static long getDeviceIdFromQueueName(final String queueName) {
|
||||
return Long.parseLong(queueName.substring(queueName.lastIndexOf("::") + 2, queueName.lastIndexOf('}')));
|
||||
static byte getDeviceIdFromQueueName(final String queueName) {
|
||||
return Byte.parseByte(queueName.substring(queueName.lastIndexOf("::") + 2, queueName.lastIndexOf('}')));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -83,11 +83,13 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||
this.messageDeletionScheduler = Schedulers.fromExecutor(messageDeletionExecutor);
|
||||
}
|
||||
|
||||
public void store(final List<MessageProtos.Envelope> messages, final UUID destinationAccountUuid, final long destinationDeviceId) {
|
||||
public void store(final List<MessageProtos.Envelope> messages, final UUID destinationAccountUuid,
|
||||
final byte destinationDeviceId) {
|
||||
storeTimer.record(() -> writeInBatches(messages, (messageBatch) -> storeBatch(messageBatch, destinationAccountUuid, destinationDeviceId)));
|
||||
}
|
||||
|
||||
private void storeBatch(final List<MessageProtos.Envelope> messages, final UUID destinationAccountUuid, final long destinationDeviceId) {
|
||||
private void storeBatch(final List<MessageProtos.Envelope> messages, final UUID destinationAccountUuid,
|
||||
final byte destinationDeviceId) {
|
||||
if (messages.size() > DYNAMO_DB_MAX_BATCH_SIZE) {
|
||||
throw new IllegalArgumentException("Maximum batch size of " + DYNAMO_DB_MAX_BATCH_SIZE + " exceeded with " + messages.size() + " messages");
|
||||
}
|
||||
@@ -112,7 +114,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||
executeTableWriteItemsUntilComplete(Map.of(tableName, writeItems));
|
||||
}
|
||||
|
||||
public Publisher<MessageProtos.Envelope> load(final UUID destinationAccountUuid, final long destinationDeviceId,
|
||||
public Publisher<MessageProtos.Envelope> load(final UUID destinationAccountUuid, final byte destinationDeviceId,
|
||||
final Integer limit) {
|
||||
|
||||
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
|
||||
@@ -191,7 +193,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||
}
|
||||
|
||||
public CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessage(final UUID destinationAccountUuid,
|
||||
final long destinationDeviceId, final UUID messageUuid, final long serverTimestamp) {
|
||||
final byte destinationDeviceId, final UUID messageUuid, final long serverTimestamp) {
|
||||
|
||||
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
|
||||
final AttributeValue sortKey = convertSortKey(destinationDeviceId, serverTimestamp, messageUuid);
|
||||
@@ -240,7 +242,8 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||
.toFuture();
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> deleteAllMessagesForDevice(final UUID destinationAccountUuid, final long destinationDeviceId) {
|
||||
public CompletableFuture<Void> deleteAllMessagesForDevice(final UUID destinationAccountUuid,
|
||||
final byte destinationDeviceId) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
|
||||
|
||||
@@ -284,8 +287,10 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||
return AttributeValues.fromUUID(destinationAccountUuid);
|
||||
}
|
||||
|
||||
private static AttributeValue convertSortKey(final long destinationDeviceId, final long serverTimestamp, final UUID messageUuid) {
|
||||
private static AttributeValue convertSortKey(final byte destinationDeviceId, final long serverTimestamp,
|
||||
final UUID messageUuid) {
|
||||
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[32]);
|
||||
// for compatibility - destinationDeviceId was previously `long`
|
||||
byteBuffer.putLong(destinationDeviceId);
|
||||
byteBuffer.putLong(serverTimestamp);
|
||||
byteBuffer.putLong(messageUuid.getMostSignificantBits());
|
||||
@@ -293,8 +298,9 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||
return AttributeValues.fromByteBuffer(byteBuffer.flip());
|
||||
}
|
||||
|
||||
private static AttributeValue convertDestinationDeviceIdToSortKeyPrefix(final long destinationDeviceId) {
|
||||
private static AttributeValue convertDestinationDeviceIdToSortKeyPrefix(final byte destinationDeviceId) {
|
||||
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[8]);
|
||||
// for compatibility - destinationDeviceId was previously `long`
|
||||
byteBuffer.putLong(destinationDeviceId);
|
||||
return AttributeValues.fromByteBuffer(byteBuffer.flip());
|
||||
}
|
||||
|
||||
@@ -60,7 +60,7 @@ public class MessagesManager {
|
||||
this.messageDeletionExecutor = messageDeletionExecutor;
|
||||
}
|
||||
|
||||
public void insert(UUID destinationUuid, long destinationDevice, Envelope message) {
|
||||
public void insert(UUID destinationUuid, byte destinationDevice, Envelope message) {
|
||||
final UUID messageGuid = UUID.randomUUID();
|
||||
|
||||
messagesCache.insert(messageGuid, destinationUuid, destinationDevice, message);
|
||||
@@ -70,11 +70,11 @@ public class MessagesManager {
|
||||
}
|
||||
}
|
||||
|
||||
public boolean hasCachedMessages(final UUID destinationUuid, final long destinationDevice) {
|
||||
public boolean hasCachedMessages(final UUID destinationUuid, final byte destinationDevice) {
|
||||
return messagesCache.hasMessages(destinationUuid, destinationDevice);
|
||||
}
|
||||
|
||||
public Mono<Pair<List<Envelope>, Boolean>> getMessagesForDevice(UUID destinationUuid, long destinationDevice,
|
||||
public Mono<Pair<List<Envelope>, Boolean>> getMessagesForDevice(UUID destinationUuid, byte destinationDevice,
|
||||
boolean cachedMessagesOnly) {
|
||||
|
||||
return Flux.from(
|
||||
@@ -84,13 +84,13 @@ public class MessagesManager {
|
||||
.map(envelopes -> new Pair<>(envelopes, envelopes.size() >= RESULT_SET_CHUNK_SIZE));
|
||||
}
|
||||
|
||||
public Publisher<Envelope> getMessagesForDeviceReactive(UUID destinationUuid, long destinationDevice,
|
||||
public Publisher<Envelope> getMessagesForDeviceReactive(UUID destinationUuid, byte destinationDevice,
|
||||
final boolean cachedMessagesOnly) {
|
||||
|
||||
return getMessagesForDevice(destinationUuid, destinationDevice, null, cachedMessagesOnly);
|
||||
}
|
||||
|
||||
private Publisher<Envelope> getMessagesForDevice(UUID destinationUuid, long destinationDevice,
|
||||
private Publisher<Envelope> getMessagesForDevice(UUID destinationUuid, byte destinationDevice,
|
||||
@Nullable Integer limit, final boolean cachedMessagesOnly) {
|
||||
|
||||
final Publisher<Envelope> dynamoPublisher =
|
||||
@@ -108,13 +108,13 @@ public class MessagesManager {
|
||||
messagesDynamoDb.deleteAllMessagesForAccount(destinationUuid));
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> clear(UUID destinationUuid, long deviceId) {
|
||||
public CompletableFuture<Void> clear(UUID destinationUuid, byte deviceId) {
|
||||
return CompletableFuture.allOf(
|
||||
messagesCache.clear(destinationUuid, deviceId),
|
||||
messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, deviceId));
|
||||
}
|
||||
|
||||
public CompletableFuture<Optional<Envelope>> delete(UUID destinationUuid, long destinationDeviceId, UUID guid,
|
||||
public CompletableFuture<Optional<Envelope>> delete(UUID destinationUuid, byte destinationDeviceId, UUID guid,
|
||||
@Nullable Long serverTimestamp) {
|
||||
return messagesCache.remove(destinationUuid, destinationDeviceId, guid)
|
||||
.thenComposeAsync(removed -> {
|
||||
@@ -140,7 +140,7 @@ public class MessagesManager {
|
||||
*/
|
||||
public int persistMessages(
|
||||
final UUID destinationUuid,
|
||||
final long destinationDeviceId,
|
||||
final byte destinationDeviceId,
|
||||
final List<Envelope> messages) {
|
||||
|
||||
final List<Envelope> nonEphemeralMessages = messages.stream()
|
||||
@@ -165,7 +165,7 @@ public class MessagesManager {
|
||||
|
||||
public void addMessageAvailabilityListener(
|
||||
final UUID destinationUuid,
|
||||
final long destinationDeviceId,
|
||||
final byte destinationDeviceId,
|
||||
final MessageAvailabilityListener listener) {
|
||||
messagesCache.addMessageAvailabilityListener(destinationUuid, destinationDeviceId, listener);
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ public class RefreshingAccountAndDeviceSupplier implements Supplier<Pair<Account
|
||||
private Device device;
|
||||
private final AccountsManager accountsManager;
|
||||
|
||||
public RefreshingAccountAndDeviceSupplier(Account account, long deviceId, AccountsManager accountsManager) {
|
||||
public RefreshingAccountAndDeviceSupplier(Account account, byte deviceId, AccountsManager accountsManager) {
|
||||
this.account = account;
|
||||
this.device = account.getDevice(deviceId)
|
||||
.orElseThrow(() -> new RefreshingAccountAndDeviceNotFoundException("Could not find device"));
|
||||
|
||||
@@ -31,7 +31,7 @@ public class RepeatedUseECSignedPreKeyStore extends RepeatedUseSignedPreKeyStore
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, AttributeValue> getItemFromPreKey(final UUID accountUuid, final long deviceId, final ECSignedPreKey signedPreKey) {
|
||||
protected Map<String, AttributeValue> getItemFromPreKey(final UUID accountUuid, final byte deviceId, final ECSignedPreKey signedPreKey) {
|
||||
|
||||
return Map.of(
|
||||
KEY_ACCOUNT_UUID, getPartitionKey(accountUuid),
|
||||
@@ -54,7 +54,7 @@ public class RepeatedUseECSignedPreKeyStore extends RepeatedUseSignedPreKeyStore
|
||||
}
|
||||
}
|
||||
|
||||
public CompletableFuture<Boolean> storeIfAbsent(final UUID identifier, final long deviceId, final ECSignedPreKey signedPreKey) {
|
||||
public CompletableFuture<Boolean> storeIfAbsent(final UUID identifier, final byte deviceId, final ECSignedPreKey signedPreKey) {
|
||||
return dynamoDbAsyncClient.putItem(PutItemRequest.builder()
|
||||
.tableName(tableName)
|
||||
.item(getItemFromPreKey(identifier, deviceId, signedPreKey))
|
||||
|
||||
@@ -21,7 +21,7 @@ public class RepeatedUseKEMSignedPreKeyStore extends RepeatedUseSignedPreKeyStor
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, AttributeValue> getItemFromPreKey(final UUID accountUuid, final long deviceId, final KEMSignedPreKey signedPreKey) {
|
||||
protected Map<String, AttributeValue> getItemFromPreKey(final UUID accountUuid, final byte deviceId, final KEMSignedPreKey signedPreKey) {
|
||||
|
||||
return Map.of(
|
||||
KEY_ACCOUNT_UUID, getPartitionKey(accountUuid),
|
||||
|
||||
@@ -67,7 +67,7 @@ public abstract class RepeatedUseSignedPreKeyStore<K extends SignedPreKey<?>> {
|
||||
*
|
||||
* @return a future that completes once the key has been stored
|
||||
*/
|
||||
public CompletableFuture<Void> store(final UUID identifier, final long deviceId, final K signedPreKey) {
|
||||
public CompletableFuture<Void> store(final UUID identifier, final byte deviceId, final K signedPreKey) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
return dynamoDbAsyncClient.putItem(PutItemRequest.builder()
|
||||
@@ -87,13 +87,13 @@ public abstract class RepeatedUseSignedPreKeyStore<K extends SignedPreKey<?>> {
|
||||
*
|
||||
* @return a future that completes once all keys have been stored
|
||||
*/
|
||||
public CompletableFuture<Void> store(final UUID identifier, final Map<Long, K> signedPreKeysByDeviceId) {
|
||||
public CompletableFuture<Void> store(final UUID identifier, final Map<Byte, K> signedPreKeysByDeviceId) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
return dynamoDbAsyncClient.transactWriteItems(TransactWriteItemsRequest.builder()
|
||||
.transactItems(signedPreKeysByDeviceId.entrySet().stream()
|
||||
.map(entry -> {
|
||||
final long deviceId = entry.getKey();
|
||||
final byte deviceId = entry.getKey();
|
||||
final K signedPreKey = entry.getValue();
|
||||
|
||||
return TransactWriteItem.builder()
|
||||
@@ -117,7 +117,7 @@ public abstract class RepeatedUseSignedPreKeyStore<K extends SignedPreKey<?>> {
|
||||
* @return a future that yields an optional signed pre-key if one is available for the target device or empty if no
|
||||
* key could be found for the target device
|
||||
*/
|
||||
public CompletableFuture<Optional<K>> find(final UUID identifier, final long deviceId) {
|
||||
public CompletableFuture<Optional<K>> find(final UUID identifier, final byte deviceId) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
final CompletableFuture<Optional<K>> findFuture = dynamoDbAsyncClient.getItem(GetItemRequest.builder()
|
||||
@@ -165,7 +165,7 @@ public abstract class RepeatedUseSignedPreKeyStore<K extends SignedPreKey<?>> {
|
||||
*
|
||||
* @return a future that completes once the repeated-use pre-key has been removed from the target device
|
||||
*/
|
||||
public CompletableFuture<Void> delete(final UUID identifier, final long deviceId) {
|
||||
public CompletableFuture<Void> delete(final UUID identifier, final byte deviceId) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
return dynamoDbAsyncClient.deleteItem(DeleteItemRequest.builder()
|
||||
@@ -175,7 +175,7 @@ public abstract class RepeatedUseSignedPreKeyStore<K extends SignedPreKey<?>> {
|
||||
.thenRun(() -> sample.stop(deleteForDeviceTimer));
|
||||
}
|
||||
|
||||
public Flux<Long> getDeviceIdsWithKeys(final UUID identifier) {
|
||||
public Flux<Byte> getDeviceIdsWithKeys(final UUID identifier) {
|
||||
return Flux.from(dynamoDbAsyncClient.queryPaginator(QueryRequest.builder()
|
||||
.tableName(tableName)
|
||||
.keyConditionExpression("#uuid = :uuid")
|
||||
@@ -186,10 +186,10 @@ public abstract class RepeatedUseSignedPreKeyStore<K extends SignedPreKey<?>> {
|
||||
.consistentRead(true)
|
||||
.build())
|
||||
.items())
|
||||
.map(item -> Long.parseLong(item.get(KEY_DEVICE_ID).n()));
|
||||
.map(item -> Byte.parseByte(item.get(KEY_DEVICE_ID).n()));
|
||||
}
|
||||
|
||||
protected static Map<String, AttributeValue> getPrimaryKey(final UUID identifier, final long deviceId) {
|
||||
protected static Map<String, AttributeValue> getPrimaryKey(final UUID identifier, final byte deviceId) {
|
||||
return Map.of(
|
||||
KEY_ACCOUNT_UUID, getPartitionKey(identifier),
|
||||
KEY_DEVICE_ID, getSortKey(deviceId));
|
||||
@@ -199,11 +199,12 @@ public abstract class RepeatedUseSignedPreKeyStore<K extends SignedPreKey<?>> {
|
||||
return AttributeValues.fromUUID(accountUuid);
|
||||
}
|
||||
|
||||
protected static AttributeValue getSortKey(final long deviceId) {
|
||||
return AttributeValues.fromLong(deviceId);
|
||||
protected static AttributeValue getSortKey(final byte deviceId) {
|
||||
return AttributeValues.fromInt(deviceId);
|
||||
}
|
||||
|
||||
protected abstract Map<String, AttributeValue> getItemFromPreKey(final UUID accountUuid, final long deviceId, final K signedPreKey);
|
||||
protected abstract Map<String, AttributeValue> getItemFromPreKey(final UUID accountUuid, final byte deviceId,
|
||||
final K signedPreKey);
|
||||
|
||||
protected abstract K getPreKeyFromItem(final Map<String, AttributeValue> item);
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ public class SingleUseECPreKeyStore extends SingleUsePreKeyStore<ECPreKey> {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, AttributeValue> getItemFromPreKey(final UUID identifier, final long deviceId, final ECPreKey preKey) {
|
||||
protected Map<String, AttributeValue> getItemFromPreKey(final UUID identifier, final byte deviceId, final ECPreKey preKey) {
|
||||
return Map.of(
|
||||
KEY_ACCOUNT_UUID, getPartitionKey(identifier),
|
||||
KEY_DEVICE_ID_KEY_ID, getSortKey(deviceId, preKey.keyId()),
|
||||
|
||||
@@ -21,7 +21,7 @@ public class SingleUseKEMPreKeyStore extends SingleUsePreKeyStore<KEMSignedPreKe
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, AttributeValue> getItemFromPreKey(final UUID identifier, final long deviceId, final KEMSignedPreKey signedPreKey) {
|
||||
protected Map<String, AttributeValue> getItemFromPreKey(final UUID identifier, final byte deviceId, final KEMSignedPreKey signedPreKey) {
|
||||
return Map.of(
|
||||
KEY_ACCOUNT_UUID, getPartitionKey(identifier),
|
||||
KEY_DEVICE_ID_KEY_ID, getSortKey(deviceId, signedPreKey.keyId()),
|
||||
|
||||
@@ -36,11 +36,11 @@ import software.amazon.awssdk.services.dynamodb.model.Select;
|
||||
|
||||
/**
|
||||
* A single-use pre-key store stores single-use pre-keys of a specific type. Keys returned by a single-use pre-key
|
||||
* store's {@link #take(UUID, long)} method are guaranteed to be returned exactly once, and repeated calls will never
|
||||
* store's {@link #take(UUID, byte)} method are guaranteed to be returned exactly once, and repeated calls will never
|
||||
* yield the same key.
|
||||
* <p/>
|
||||
* Each {@link Account} may have one or more {@link Device devices}. Clients <em>should</em> regularly check their
|
||||
* supply of single-use pre-keys (see {@link #getCount(UUID, long)}) and upload new keys when their supply runs low. In
|
||||
* supply of single-use pre-keys (see {@link #getCount(UUID, byte)}) and upload new keys when their supply runs low. In
|
||||
* the event that a party wants to begin a session with a device that has no single-use pre-keys remaining, that party
|
||||
* may fall back to using the device's repeated-use ("last-resort") signed pre-key instead.
|
||||
*/
|
||||
@@ -91,7 +91,7 @@ public abstract class SingleUsePreKeyStore<K extends PreKey<?>> {
|
||||
* @return a future that completes when all previously-stored keys have been removed and the given collection of
|
||||
* pre-keys has been stored in its place
|
||||
*/
|
||||
public CompletableFuture<Void> store(final UUID identifier, final long deviceId, final List<K> preKeys) {
|
||||
public CompletableFuture<Void> store(final UUID identifier, final byte deviceId, final List<K> preKeys) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
return Mono.fromFuture(() -> delete(identifier, deviceId))
|
||||
@@ -103,7 +103,7 @@ public abstract class SingleUsePreKeyStore<K extends PreKey<?>> {
|
||||
.thenRun(() -> sample.stop(storeKeyBatchTimer));
|
||||
}
|
||||
|
||||
private CompletableFuture<Void> store(final UUID identifier, final long deviceId, final K preKey) {
|
||||
private CompletableFuture<Void> store(final UUID identifier, final byte deviceId, final K preKey) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
return dynamoDbAsyncClient.putItem(PutItemRequest.builder()
|
||||
@@ -124,7 +124,7 @@ public abstract class SingleUsePreKeyStore<K extends PreKey<?>> {
|
||||
* @return a future that yields a single-use pre-key if one is available or empty if no single-use pre-keys are
|
||||
* available for the target device
|
||||
*/
|
||||
public CompletableFuture<Optional<K>> take(final UUID identifier, final long deviceId) {
|
||||
public CompletableFuture<Optional<K>> take(final UUID identifier, final byte deviceId) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
final AttributeValue partitionKey = getPartitionKey(identifier);
|
||||
final AtomicInteger keysConsidered = new AtomicInteger(0);
|
||||
@@ -169,7 +169,7 @@ public abstract class SingleUsePreKeyStore<K extends PreKey<?>> {
|
||||
* @return a future that yields the approximate number of single-use pre-keys currently available for the target
|
||||
* device
|
||||
*/
|
||||
public CompletableFuture<Integer> getCount(final UUID identifier, final long deviceId) {
|
||||
public CompletableFuture<Integer> getCount(final UUID identifier, final byte deviceId) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
// Getting an accurate count from DynamoDB can be very confusing. See:
|
||||
@@ -230,7 +230,7 @@ public abstract class SingleUsePreKeyStore<K extends PreKey<?>> {
|
||||
|
||||
* @return a future that completes when all single-use pre-keys have been removed for the target device
|
||||
*/
|
||||
public CompletableFuture<Void> delete(final UUID identifier, final long deviceId) {
|
||||
public CompletableFuture<Void> delete(final UUID identifier, final byte deviceId) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
return deleteItems(getPartitionKey(identifier), Flux.from(dynamoDbAsyncClient.queryPaginator(QueryRequest.builder()
|
||||
@@ -267,20 +267,20 @@ public abstract class SingleUsePreKeyStore<K extends PreKey<?>> {
|
||||
return AttributeValues.fromUUID(accountUuid);
|
||||
}
|
||||
|
||||
protected static AttributeValue getSortKey(final long deviceId, final long keyId) {
|
||||
protected static AttributeValue getSortKey(final byte deviceId, final long keyId) {
|
||||
final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
|
||||
byteBuffer.putLong(deviceId);
|
||||
byteBuffer.putLong(keyId);
|
||||
return AttributeValues.fromByteBuffer(byteBuffer.flip());
|
||||
}
|
||||
|
||||
private static AttributeValue getSortKeyPrefix(final long deviceId) {
|
||||
private static AttributeValue getSortKeyPrefix(final byte deviceId) {
|
||||
final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[8]);
|
||||
byteBuffer.putLong(deviceId);
|
||||
return AttributeValues.fromByteBuffer(byteBuffer.flip());
|
||||
}
|
||||
|
||||
protected abstract Map<String, AttributeValue> getItemFromPreKey(final UUID identifier, final long deviceId,
|
||||
protected abstract Map<String, AttributeValue> getItemFromPreKey(final UUID identifier, final byte deviceId,
|
||||
final K preKey);
|
||||
|
||||
protected abstract K getPreKeyFromItem(final Map<String, AttributeValue> item);
|
||||
|
||||
@@ -8,7 +8,6 @@ import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
@@ -24,7 +23,7 @@ public class DestinationDeviceValidator {
|
||||
* @see #validateRegistrationIds(Account, Stream, boolean)
|
||||
*/
|
||||
public static <T> void validateRegistrationIds(final Account account, final Collection<T> messages,
|
||||
Function<T, Long> getDeviceId, Function<T, Integer> getRegistrationId, boolean usePhoneNumberIdentity)
|
||||
Function<T, Byte> getDeviceId, Function<T, Integer> getRegistrationId, boolean usePhoneNumberIdentity)
|
||||
throws StaleDevicesException {
|
||||
validateRegistrationIds(account,
|
||||
messages.stream().map(m -> new Pair<>(getDeviceId.apply(m), getRegistrationId.apply(m))),
|
||||
@@ -47,13 +46,13 @@ public class DestinationDeviceValidator {
|
||||
* account does not have a corresponding device or if the registration IDs do not match
|
||||
*/
|
||||
public static void validateRegistrationIds(final Account account,
|
||||
final Stream<Pair<Long, Integer>> deviceIdAndRegistrationIdStream,
|
||||
final Stream<Pair<Byte, Integer>> deviceIdAndRegistrationIdStream,
|
||||
final boolean usePhoneNumberIdentity) throws StaleDevicesException {
|
||||
|
||||
final List<Long> staleDevices = deviceIdAndRegistrationIdStream
|
||||
final List<Byte> staleDevices = deviceIdAndRegistrationIdStream
|
||||
.filter(deviceIdAndRegistrationId -> deviceIdAndRegistrationId.second() > 0)
|
||||
.filter(deviceIdAndRegistrationId -> {
|
||||
final long deviceId = deviceIdAndRegistrationId.first();
|
||||
final byte deviceId = deviceIdAndRegistrationId.first();
|
||||
final int registrationId = deviceIdAndRegistrationId.second();
|
||||
boolean registrationIdMatches = account.getDevice(deviceId)
|
||||
.map(device -> registrationId == (usePhoneNumberIdentity
|
||||
@@ -86,19 +85,19 @@ public class DestinationDeviceValidator {
|
||||
* account
|
||||
*/
|
||||
public static void validateCompleteDeviceList(final Account account,
|
||||
final Set<Long> messageDeviceIds,
|
||||
final Set<Long> excludedDeviceIds) throws MismatchedDevicesException {
|
||||
final Set<Byte> messageDeviceIds,
|
||||
final Set<Byte> excludedDeviceIds) throws MismatchedDevicesException {
|
||||
|
||||
final Set<Long> accountDeviceIds = account.getDevices().stream()
|
||||
final Set<Byte> accountDeviceIds = account.getDevices().stream()
|
||||
.filter(Device::isEnabled)
|
||||
.map(Device::getId)
|
||||
.filter(deviceId -> !excludedDeviceIds.contains(deviceId))
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
final Set<Long> missingDeviceIds = new HashSet<>(accountDeviceIds);
|
||||
final Set<Byte> missingDeviceIds = new HashSet<>(accountDeviceIds);
|
||||
missingDeviceIds.removeAll(messageDeviceIds);
|
||||
|
||||
final Set<Long> extraDeviceIds = new HashSet<>(messageDeviceIds);
|
||||
final Set<Byte> extraDeviceIds = new HashSet<>(messageDeviceIds);
|
||||
extraDeviceIds.removeAll(accountDeviceIds);
|
||||
|
||||
if (!missingDeviceIds.isEmpty() || !extraDeviceIds.isEmpty()) {
|
||||
|
||||
@@ -10,7 +10,7 @@ import java.util.Base64;
|
||||
|
||||
public class ProvisioningAddress extends WebsocketAddress {
|
||||
|
||||
public ProvisioningAddress(String address, int id) {
|
||||
public ProvisioningAddress(String address, byte id) {
|
||||
super(address, id);
|
||||
}
|
||||
|
||||
@@ -26,6 +26,6 @@ public class ProvisioningAddress extends WebsocketAddress {
|
||||
byte[] random = new byte[16];
|
||||
new SecureRandom().nextBytes(random);
|
||||
|
||||
return new ProvisioningAddress(Base64.getUrlEncoder().withoutPadding().encodeToString(random), 0);
|
||||
return new ProvisioningAddress(Base64.getUrlEncoder().withoutPadding().encodeToString(random), (byte) 0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,9 +10,9 @@ import org.whispersystems.textsecuregcm.storage.PubSubAddress;
|
||||
public class WebsocketAddress implements PubSubAddress {
|
||||
|
||||
private final String number;
|
||||
private final long deviceId;
|
||||
private final byte deviceId;
|
||||
|
||||
public WebsocketAddress(String number, long deviceId) {
|
||||
public WebsocketAddress(String number, byte deviceId) {
|
||||
this.number = number;
|
||||
this.deviceId = deviceId;
|
||||
}
|
||||
@@ -26,7 +26,7 @@ public class WebsocketAddress implements PubSubAddress {
|
||||
}
|
||||
|
||||
this.number = parts[0];
|
||||
this.deviceId = Long.parseLong(parts[1]);
|
||||
this.deviceId = Byte.parseByte(parts[1]);
|
||||
} catch (NumberFormatException e) {
|
||||
throw new InvalidWebsocketAddressException(e);
|
||||
}
|
||||
@@ -36,7 +36,7 @@ public class WebsocketAddress implements PubSubAddress {
|
||||
return number;
|
||||
}
|
||||
|
||||
public long getDeviceId() {
|
||||
public byte getDeviceId() {
|
||||
return deviceId;
|
||||
}
|
||||
|
||||
|
||||
@@ -41,7 +41,7 @@ public class MigrateSignedECPreKeysCommand extends AbstractSinglePassCrawlAccoun
|
||||
|
||||
accounts.flatMap(account -> Flux.fromIterable(account.getDevices())
|
||||
.flatMap(device -> {
|
||||
final List<Tuple3<UUID, Long, ECSignedPreKey>> keys = new ArrayList<>(2);
|
||||
final List<Tuple3<UUID, Byte, ECSignedPreKey>> keys = new ArrayList<>(2);
|
||||
|
||||
if (device.getSignedPreKey(IdentityType.ACI) != null) {
|
||||
keys.add(Tuples.of(account.getUuid(), device.getId(), device.getSignedPreKey(IdentityType.ACI)));
|
||||
|
||||
@@ -36,7 +36,7 @@ public class UnlinkDeviceCommand extends EnvironmentCommand<WhisperServerConfigu
|
||||
|
||||
subparser.addArgument("-d", "--deviceId")
|
||||
.dest("deviceIds")
|
||||
.type(Long.class)
|
||||
.type(Byte.class)
|
||||
.action(Arguments.append())
|
||||
.required(true);
|
||||
|
||||
@@ -57,7 +57,7 @@ public class UnlinkDeviceCommand extends EnvironmentCommand<WhisperServerConfigu
|
||||
commandStopListener.start();
|
||||
|
||||
final UUID aci = UUID.fromString(namespace.getString("uuid").trim());
|
||||
final List<Long> deviceIds = namespace.getList("deviceIds");
|
||||
final List<Byte> deviceIds = namespace.getList("deviceIds");
|
||||
|
||||
final CommandDependencies deps = CommandDependencies.build("unlink-device", environment, configuration);
|
||||
|
||||
@@ -68,7 +68,7 @@ public class UnlinkDeviceCommand extends EnvironmentCommand<WhisperServerConfigu
|
||||
throw new IllegalArgumentException("cannot delete primary device");
|
||||
}
|
||||
|
||||
for (long deviceId : deviceIds) {
|
||||
for (byte deviceId : deviceIds) {
|
||||
/** see {@link org.whispersystems.textsecuregcm.controllers.DeviceController#removeDevice} */
|
||||
System.out.format("Removing device %s::%d\n", aci, deviceId);
|
||||
account = deps.accountsManager().update(account, a -> a.removeDevice(deviceId));
|
||||
|
||||
@@ -55,7 +55,7 @@ message GetDevicesResponse {
|
||||
/**
|
||||
* The identifier for the device within an account.
|
||||
*/
|
||||
uint64 id = 1;
|
||||
uint32 id = 1;
|
||||
|
||||
/**
|
||||
* A sequence of bytes that encodes an encrypted human-readable name for
|
||||
@@ -86,7 +86,7 @@ message RemoveDeviceRequest {
|
||||
/**
|
||||
* The identifier for the device to remove from the authenticated account.
|
||||
*/
|
||||
uint64 id = 1;
|
||||
uint32 id = 1;
|
||||
}
|
||||
|
||||
message SetDeviceNameRequest {
|
||||
|
||||
@@ -154,7 +154,7 @@ message GetPreKeysRequest {
|
||||
* retrieve pre-keys. If not set, pre-keys are returned for all devices
|
||||
* associated with the targeted account.
|
||||
*/
|
||||
uint64 device_id = 2;
|
||||
uint32 device_id = 2;
|
||||
}
|
||||
|
||||
message GetPreKeysAnonymousRequest {
|
||||
@@ -199,7 +199,7 @@ message GetPreKeysResponse {
|
||||
/**
|
||||
* A map of device IDs to pre-key "bundles" for the targeted account.
|
||||
*/
|
||||
map<uint64, PreKeyBundle> pre_keys = 2;
|
||||
map<uint32, PreKeyBundle> pre_keys = 2;
|
||||
}
|
||||
|
||||
message SetOneTimeEcPreKeysRequest {
|
||||
@@ -276,4 +276,3 @@ message CheckIdentityKeyResponse {
|
||||
*/
|
||||
bytes identity_key = 2;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user