mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-20 07:28:06 +01:00
Use UUIDs instead of phone numbers as account identifiers in clustered message cache
This commit is contained in:
@@ -314,7 +314,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
MessagesCache messagesCache = new MessagesCache(messagesClient, messages, accountsManager, config.getMessageCacheConfiguration().getPersistDelayMinutes(), clusterMessagesCache);
|
||||
MessagesManager messagesManager = new MessagesManager(messages, messagesCache);
|
||||
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
|
||||
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(messagesManager);
|
||||
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
|
||||
DispatchManager dispatchManager = new DispatchManager(pubSubClientFactory, Optional.of(deadLetterHandler));
|
||||
PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager);
|
||||
APNSender apnSender = new APNSender(accountsManager, config.getApnConfiguration());
|
||||
|
||||
@@ -612,6 +612,8 @@ public class AccountController {
|
||||
}
|
||||
|
||||
private Account createAccount(String number, String password, String userAgent, AccountAttributes accountAttributes) {
|
||||
Optional<Account> maybeExistingAccount = accounts.get(number);
|
||||
|
||||
Device device = new Device();
|
||||
device.setId(Device.MASTER_ID);
|
||||
device.setAuthenticationCredentials(new AuthenticationCredentials(password));
|
||||
@@ -643,7 +645,7 @@ public class AccountController {
|
||||
directoryQueue.deleteRegisteredUser(account.getUuid(), number);
|
||||
}
|
||||
|
||||
messagesManager.clear(number);
|
||||
messagesManager.clear(number, maybeExistingAccount.map(Account::getUuid).orElse(null));
|
||||
pendingAccounts.remove(number);
|
||||
|
||||
return account;
|
||||
|
||||
@@ -117,7 +117,7 @@ public class DeviceController {
|
||||
directoryQueue.deleteRegisteredUser(account.getUuid(), account.getNumber());
|
||||
}
|
||||
|
||||
messages.clear(account.getNumber(), deviceId);
|
||||
messages.clear(account.getNumber(), account.getUuid(), deviceId);
|
||||
}
|
||||
|
||||
@Timed
|
||||
@@ -205,7 +205,7 @@ public class DeviceController {
|
||||
device.setCreated(System.currentTimeMillis());
|
||||
|
||||
account.get().addDevice(device);
|
||||
messages.clear(account.get().getNumber(), device.getId());
|
||||
messages.clear(account.get().getNumber(), account.get().getUuid(), device.getId());
|
||||
accounts.update(account.get());
|
||||
|
||||
pendingDevices.remove(number);
|
||||
|
||||
@@ -189,6 +189,7 @@ public class MessageController {
|
||||
}
|
||||
|
||||
return messagesManager.getMessagesForDevice(account.getNumber(),
|
||||
account.getUuid(),
|
||||
account.getAuthenticatedDevice().get().getId());
|
||||
}
|
||||
|
||||
@@ -203,6 +204,7 @@ public class MessageController {
|
||||
WebSocketConnection.messageTime.update(System.currentTimeMillis() - timestamp);
|
||||
|
||||
Optional<OutgoingMessageEntity> message = messagesManager.delete(account.getNumber(),
|
||||
account.getUuid(),
|
||||
account.getAuthenticatedDevice().get().getId(),
|
||||
source, timestamp);
|
||||
|
||||
@@ -222,6 +224,7 @@ public class MessageController {
|
||||
public void removePendingMessage(@Auth Account account, @PathParam("uuid") UUID uuid) {
|
||||
try {
|
||||
Optional<OutgoingMessageEntity> message = messagesManager.delete(account.getNumber(),
|
||||
account.getUuid(),
|
||||
account.getAuthenticatedDevice().get().getId(),
|
||||
uuid);
|
||||
|
||||
|
||||
@@ -96,7 +96,7 @@ public class WebsocketSender {
|
||||
|
||||
WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId());
|
||||
|
||||
messagesManager.insert(account.getNumber(), device.getId(), message);
|
||||
messagesManager.insert(account.getNumber(), account.getUuid(), device.getId(), message);
|
||||
pubSubManager.publish(address, PubSubMessage.newBuilder()
|
||||
.setType(PubSubMessage.Type.QUERY_DB)
|
||||
.build());
|
||||
|
||||
@@ -89,14 +89,14 @@ public class MessagesCache implements Managed, UserMessagesCache {
|
||||
}
|
||||
|
||||
@Override
|
||||
public long insert(UUID guid, String destination, long destinationDevice, Envelope message) {
|
||||
public long insert(UUID guid, String destination, final UUID destinationUuid, long destinationDevice, Envelope message) {
|
||||
final Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build();
|
||||
|
||||
Timer.Context timer = insertTimer.time();
|
||||
|
||||
try {
|
||||
final long messageId = insertOperation.insert(guid, destination, destinationDevice, System.currentTimeMillis(), messageWithGuid);
|
||||
insertExperiment.compareSupplierResultAsync(messageId, () -> clusterMessagesCache.insert(guid, destination, destinationDevice, message, messageId), experimentExecutor);
|
||||
insertExperiment.compareSupplierResultAsync(messageId, () -> clusterMessagesCache.insert(guid, destination, destinationUuid, destinationDevice, message, messageId), experimentExecutor);
|
||||
|
||||
return messageId;
|
||||
} finally {
|
||||
@@ -105,7 +105,7 @@ public class MessagesCache implements Managed, UserMessagesCache {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<OutgoingMessageEntity> remove(String destination, long destinationDevice, long id) {
|
||||
public Optional<OutgoingMessageEntity> remove(String destination, final UUID destinationUuid, long destinationDevice, long id) {
|
||||
OutgoingMessageEntity removedMessageEntity = null;
|
||||
|
||||
try (Jedis jedis = jedisPool.getWriteResource();
|
||||
@@ -122,13 +122,13 @@ public class MessagesCache implements Managed, UserMessagesCache {
|
||||
|
||||
final Optional<OutgoingMessageEntity> maybeRemovedMessage = Optional.ofNullable(removedMessageEntity);
|
||||
|
||||
removeByIdExperiment.compareSupplierResultAsync(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationDevice, id), experimentExecutor);
|
||||
removeByIdExperiment.compareSupplierResultAsync(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, destinationDevice, id), experimentExecutor);
|
||||
|
||||
return maybeRemovedMessage;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<OutgoingMessageEntity> remove(String destination, long destinationDevice, String sender, long timestamp) {
|
||||
public Optional<OutgoingMessageEntity> remove(String destination, final UUID destinationUuid, long destinationDevice, String sender, long timestamp) {
|
||||
OutgoingMessageEntity removedMessageEntity = null;
|
||||
Timer.Context timer = removeByNameTimer.time();
|
||||
|
||||
@@ -146,13 +146,13 @@ public class MessagesCache implements Managed, UserMessagesCache {
|
||||
|
||||
final Optional<OutgoingMessageEntity> maybeRemovedMessage = Optional.ofNullable(removedMessageEntity);
|
||||
|
||||
removeBySenderExperiment.compareSupplierResultAsync(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationDevice, sender, timestamp), experimentExecutor);
|
||||
removeBySenderExperiment.compareSupplierResultAsync(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, destinationDevice, sender, timestamp), experimentExecutor);
|
||||
|
||||
return maybeRemovedMessage;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<OutgoingMessageEntity> remove(String destination, long destinationDevice, UUID guid) {
|
||||
public Optional<OutgoingMessageEntity> remove(String destination, final UUID destinationUuid, long destinationDevice, UUID guid) {
|
||||
OutgoingMessageEntity removedMessageEntity = null;
|
||||
Timer.Context timer = removeByGuidTimer.time();
|
||||
|
||||
@@ -170,13 +170,13 @@ public class MessagesCache implements Managed, UserMessagesCache {
|
||||
|
||||
final Optional<OutgoingMessageEntity> maybeRemovedMessage = Optional.ofNullable(removedMessageEntity);
|
||||
|
||||
removeByUuidExperiment.compareSupplierResultAsync(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationDevice, guid), experimentExecutor);
|
||||
removeByUuidExperiment.compareSupplierResultAsync(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, destinationDevice, guid), experimentExecutor);
|
||||
|
||||
return maybeRemovedMessage;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OutgoingMessageEntity> get(String destination, long destinationDevice, int limit) {
|
||||
public List<OutgoingMessageEntity> get(String destination, final UUID destinationUuid, long destinationDevice, int limit) {
|
||||
Timer.Context timer = getTimer.time();
|
||||
|
||||
try {
|
||||
@@ -194,7 +194,7 @@ public class MessagesCache implements Managed, UserMessagesCache {
|
||||
}
|
||||
}
|
||||
|
||||
getMessagesExperiment.compareSupplierResultAsync(results, () -> clusterMessagesCache.get(destination, destinationDevice, limit), experimentExecutor);
|
||||
getMessagesExperiment.compareSupplierResultAsync(results, () -> clusterMessagesCache.get(destination, destinationUuid, destinationDevice, limit), experimentExecutor);
|
||||
|
||||
return results;
|
||||
} finally {
|
||||
@@ -203,12 +203,12 @@ public class MessagesCache implements Managed, UserMessagesCache {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear(String destination) {
|
||||
public void clear(String destination, final UUID destinationUuid) {
|
||||
Timer.Context timer = clearAccountTimer.time();
|
||||
|
||||
try {
|
||||
for (int i = 1; i < 255; i++) {
|
||||
clear(destination, i);
|
||||
clear(destination, destinationUuid, i);
|
||||
}
|
||||
} finally {
|
||||
timer.stop();
|
||||
@@ -216,7 +216,7 @@ public class MessagesCache implements Managed, UserMessagesCache {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear(String destination, long deviceId) {
|
||||
public void clear(String destination, final UUID destinationUuid, long deviceId) {
|
||||
Timer.Context timer = clearDeviceTimer.time();
|
||||
|
||||
try {
|
||||
|
||||
@@ -34,34 +34,34 @@ public class MessagesManager {
|
||||
this.messagesCache = messagesCache;
|
||||
}
|
||||
|
||||
public void insert(String destination, long destinationDevice, Envelope message) {
|
||||
public void insert(String destination, UUID destinationUuid, long destinationDevice, Envelope message) {
|
||||
UUID guid = UUID.randomUUID();
|
||||
messagesCache.insert(guid, destination, destinationDevice, message);
|
||||
messagesCache.insert(guid, destination, destinationUuid, destinationDevice, message);
|
||||
}
|
||||
|
||||
public OutgoingMessageEntityList getMessagesForDevice(String destination, long destinationDevice) {
|
||||
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice) {
|
||||
List<OutgoingMessageEntity> messages = this.messages.load(destination, destinationDevice);
|
||||
|
||||
if (messages.size() <= Messages.RESULT_SET_CHUNK_SIZE) {
|
||||
messages.addAll(this.messagesCache.get(destination, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size()));
|
||||
messages.addAll(this.messagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size()));
|
||||
}
|
||||
|
||||
return new OutgoingMessageEntityList(messages, messages.size() >= Messages.RESULT_SET_CHUNK_SIZE);
|
||||
}
|
||||
|
||||
public void clear(String destination) {
|
||||
this.messagesCache.clear(destination);
|
||||
public void clear(String destination, UUID destinationUuid) {
|
||||
this.messagesCache.clear(destination, destinationUuid);
|
||||
this.messages.clear(destination);
|
||||
}
|
||||
|
||||
public void clear(String destination, long deviceId) {
|
||||
this.messagesCache.clear(destination, deviceId);
|
||||
public void clear(String destination, UUID destinationUuid, long deviceId) {
|
||||
this.messagesCache.clear(destination, destinationUuid, deviceId);
|
||||
this.messages.clear(destination, deviceId);
|
||||
}
|
||||
|
||||
public Optional<OutgoingMessageEntity> delete(String destination, long destinationDevice, String source, long timestamp)
|
||||
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long destinationDevice, String source, long timestamp)
|
||||
{
|
||||
Optional<OutgoingMessageEntity> removed = this.messagesCache.remove(destination, destinationDevice, source, timestamp);
|
||||
Optional<OutgoingMessageEntity> removed = this.messagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp);
|
||||
|
||||
if (!removed.isPresent()) {
|
||||
removed = this.messages.remove(destination, destinationDevice, source, timestamp);
|
||||
@@ -73,8 +73,8 @@ public class MessagesManager {
|
||||
return removed;
|
||||
}
|
||||
|
||||
public Optional<OutgoingMessageEntity> delete(String destination, long deviceId, UUID guid) {
|
||||
Optional<OutgoingMessageEntity> removed = this.messagesCache.remove(destination, deviceId, guid);
|
||||
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long deviceId, UUID guid) {
|
||||
Optional<OutgoingMessageEntity> removed = this.messagesCache.remove(destination, destinationUuid, deviceId, guid);
|
||||
|
||||
if (!removed.isPresent()) {
|
||||
removed = this.messages.remove(destination, guid);
|
||||
@@ -86,9 +86,9 @@ public class MessagesManager {
|
||||
return removed;
|
||||
}
|
||||
|
||||
public void delete(String destination, long deviceId, long id, boolean cached) {
|
||||
public void delete(String destination, UUID destinationUuid, long deviceId, long id, boolean cached) {
|
||||
if (cached) {
|
||||
this.messagesCache.remove(destination, deviceId, id);
|
||||
this.messagesCache.remove(destination, destinationUuid, deviceId, id);
|
||||
cacheHitByIdMeter.mark();
|
||||
} else {
|
||||
this.messages.remove(destination, id);
|
||||
|
||||
@@ -53,28 +53,28 @@ public class RedisClusterMessagesCache implements UserMessagesCache {
|
||||
}
|
||||
|
||||
@Override
|
||||
public long insert(final UUID guid, final String destination, final long destinationDevice, final MessageProtos.Envelope message) {
|
||||
public long insert(final UUID guid, final String destination, final UUID destinationUuid, final long destinationDevice, final MessageProtos.Envelope message) {
|
||||
final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build();
|
||||
final String sender = message.hasSource() ? (message.getSource() + "::" + message.getTimestamp()) : "nil";
|
||||
|
||||
return (long)Metrics.timer(INSERT_TIMER_NAME).record(() ->
|
||||
insertScript.executeBinary(List.of(getMessageQueueKey(destination, destinationDevice),
|
||||
getMessageQueueMetadataKey(destination, destinationDevice),
|
||||
getQueueIndexKey(destination, destinationDevice)),
|
||||
insertScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
||||
getMessageQueueMetadataKey(destinationUuid, destinationDevice),
|
||||
getQueueIndexKey(destinationUuid, destinationDevice)),
|
||||
List.of(messageWithGuid.toByteArray(),
|
||||
String.valueOf(message.getTimestamp()).getBytes(StandardCharsets.UTF_8),
|
||||
sender.getBytes(StandardCharsets.UTF_8),
|
||||
guid.toString().getBytes(StandardCharsets.UTF_8))));
|
||||
}
|
||||
|
||||
public long insert(final UUID guid, final String destination, final long destinationDevice, final MessageProtos.Envelope message, final long messageId) {
|
||||
public long insert(final UUID guid, final String destination, final UUID destinationUuid, final long destinationDevice, final MessageProtos.Envelope message, final long messageId) {
|
||||
final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build();
|
||||
final String sender = message.hasSource() ? (message.getSource() + "::" + message.getTimestamp()) : "nil";
|
||||
|
||||
return (long)Metrics.timer(INSERT_TIMER_NAME).record(() ->
|
||||
insertScript.executeBinary(List.of(getMessageQueueKey(destination, destinationDevice),
|
||||
getMessageQueueMetadataKey(destination, destinationDevice),
|
||||
getQueueIndexKey(destination, destinationDevice)),
|
||||
insertScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
||||
getMessageQueueMetadataKey(destinationUuid, destinationDevice),
|
||||
getQueueIndexKey(destinationUuid, destinationDevice)),
|
||||
List.of(messageWithGuid.toByteArray(),
|
||||
String.valueOf(message.getTimestamp()).getBytes(StandardCharsets.UTF_8),
|
||||
sender.getBytes(StandardCharsets.UTF_8),
|
||||
@@ -83,12 +83,12 @@ public class RedisClusterMessagesCache implements UserMessagesCache {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<OutgoingMessageEntity> remove(final String destination, final long destinationDevice, final long id) {
|
||||
public Optional<OutgoingMessageEntity> remove(final String destination, final UUID destinationUuid, final long destinationDevice, final long id) {
|
||||
try {
|
||||
final byte[] serialized = (byte[])Metrics.timer(REMOVE_TIMER_NAME, REMOVE_METHOD_TAG, REMOVE_METHOD_ID).record(() ->
|
||||
removeByIdScript.executeBinary(List.of(getMessageQueueKey(destination, destinationDevice),
|
||||
getMessageQueueMetadataKey(destination, destinationDevice),
|
||||
getQueueIndexKey(destination, destinationDevice)),
|
||||
removeByIdScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
||||
getMessageQueueMetadataKey(destinationUuid, destinationDevice),
|
||||
getQueueIndexKey(destinationUuid, destinationDevice)),
|
||||
List.of(String.valueOf(id).getBytes(StandardCharsets.UTF_8))));
|
||||
|
||||
|
||||
@@ -103,12 +103,12 @@ public class RedisClusterMessagesCache implements UserMessagesCache {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<OutgoingMessageEntity> remove(final String destination, final long destinationDevice, final String sender, final long timestamp) {
|
||||
public Optional<OutgoingMessageEntity> remove(final String destination, final UUID destinationUuid, final long destinationDevice, final String sender, final long timestamp) {
|
||||
try {
|
||||
final byte[] serialized = (byte[])Metrics.timer(REMOVE_TIMER_NAME, REMOVE_METHOD_TAG, REMOVE_METHOD_SENDER).record(() ->
|
||||
removeBySenderScript.executeBinary(List.of(getMessageQueueKey(destination, destinationDevice),
|
||||
getMessageQueueMetadataKey(destination, destinationDevice),
|
||||
getQueueIndexKey(destination, destinationDevice)),
|
||||
removeBySenderScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
||||
getMessageQueueMetadataKey(destinationUuid, destinationDevice),
|
||||
getQueueIndexKey(destinationUuid, destinationDevice)),
|
||||
List.of((sender + "::" + timestamp).getBytes(StandardCharsets.UTF_8))));
|
||||
|
||||
if (serialized != null) {
|
||||
@@ -122,12 +122,12 @@ public class RedisClusterMessagesCache implements UserMessagesCache {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<OutgoingMessageEntity> remove(final String destination, final long destinationDevice, final UUID guid) {
|
||||
public Optional<OutgoingMessageEntity> remove(final String destination, final UUID destinationUuid, final long destinationDevice, final UUID guid) {
|
||||
try {
|
||||
final byte[] serialized = (byte[])Metrics.timer(REMOVE_TIMER_NAME, REMOVE_METHOD_TAG, REMOVE_METHOD_UUID).record(() ->
|
||||
removeByGuidScript.executeBinary(List.of(getMessageQueueKey(destination, destinationDevice),
|
||||
getMessageQueueMetadataKey(destination, destinationDevice),
|
||||
getQueueIndexKey(destination, destinationDevice)),
|
||||
removeByGuidScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
||||
getMessageQueueMetadataKey(destinationUuid, destinationDevice),
|
||||
getQueueIndexKey(destinationUuid, destinationDevice)),
|
||||
List.of(guid.toString().getBytes(StandardCharsets.UTF_8))));
|
||||
|
||||
if (serialized != null) {
|
||||
@@ -142,10 +142,10 @@ public class RedisClusterMessagesCache implements UserMessagesCache {
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public List<OutgoingMessageEntity> get(String destination, long destinationDevice, int limit) {
|
||||
public List<OutgoingMessageEntity> get(String destination, final UUID destinationUuid, long destinationDevice, int limit) {
|
||||
return Metrics.timer(GET_TIMER_NAME).record(() -> {
|
||||
final List<byte[]> queueItems = (List<byte[]>)getItemsScript.executeBinary(List.of(getMessageQueueKey(destination, destinationDevice),
|
||||
getPersistInProgressKey(destination, destinationDevice)),
|
||||
final List<byte[]> queueItems = (List<byte[]>)getItemsScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
||||
getPersistInProgressKey(destinationUuid, destinationDevice)),
|
||||
List.of(String.valueOf(limit).getBytes()));
|
||||
|
||||
final List<OutgoingMessageEntity> messageEntities;
|
||||
@@ -173,34 +173,37 @@ public class RedisClusterMessagesCache implements UserMessagesCache {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear(final String destination) {
|
||||
for (int i = 1; i < 256; i++) {
|
||||
clear(destination, i);
|
||||
public void clear(final String destination, final UUID destinationUuid) {
|
||||
// TODO Remove null check in a fully UUID-based world
|
||||
if (destinationUuid != null) {
|
||||
for (int i = 1; i < 256; i++) {
|
||||
clear(destination, destinationUuid, i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear(final String destination, final long deviceId) {
|
||||
public void clear(final String destination, final UUID destinationUuid, final long deviceId) {
|
||||
Metrics.timer(CLEAR_TIMER_NAME).record(() ->
|
||||
removeQueueScript.executeBinary(List.of(getMessageQueueKey(destination, deviceId),
|
||||
getMessageQueueMetadataKey(destination, deviceId),
|
||||
getQueueIndexKey(destination, deviceId)),
|
||||
removeQueueScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, deviceId),
|
||||
getMessageQueueMetadataKey(destinationUuid, deviceId),
|
||||
getQueueIndexKey(destinationUuid, deviceId)),
|
||||
Collections.emptyList()));
|
||||
}
|
||||
|
||||
private static byte[] getMessageQueueKey(final String address, final long deviceId) {
|
||||
return ("user_queue::{" + address + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
|
||||
private static byte[] getMessageQueueKey(final UUID accountUuid, final long deviceId) {
|
||||
return ("user_queue::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
private static byte[] getMessageQueueMetadataKey(final String address, final long deviceId) {
|
||||
return ("user_queue_metadata::{" + address + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
|
||||
private static byte[] getMessageQueueMetadataKey(final UUID accountUuid, final long deviceId) {
|
||||
return ("user_queue_metadata::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
private byte[] getQueueIndexKey(final String address, final long deviceId) {
|
||||
return ("user_queue_index::{" + RedisClusterUtil.getMinimalHashTag(address + "::" + deviceId) + "}").getBytes(StandardCharsets.UTF_8);
|
||||
private byte[] getQueueIndexKey(final UUID accountUuid, final long deviceId) {
|
||||
return ("user_queue_index::{" + RedisClusterUtil.getMinimalHashTag(accountUuid.toString() + "::" + deviceId) + "}").getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
private byte[] getPersistInProgressKey(final String address, final long deviceId) {
|
||||
return ("user_queue_persisting::{" + address + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
|
||||
private byte[] getPersistInProgressKey(final UUID accountUuid, final long deviceId) {
|
||||
return ("user_queue_persisting::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ package org.whispersystems.textsecuregcm.storage;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
||||
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
|
||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
@@ -25,17 +24,17 @@ public interface UserMessagesCache {
|
||||
envelope.hasServerTimestamp() ? envelope.getServerTimestamp() : 0);
|
||||
}
|
||||
|
||||
long insert(UUID guid, String destination, long destinationDevice, MessageProtos.Envelope message);
|
||||
long insert(UUID guid, String destination, UUID destinationUuid, long destinationDevice, MessageProtos.Envelope message);
|
||||
|
||||
Optional<OutgoingMessageEntity> remove(String destination, long destinationDevice, long id);
|
||||
Optional<OutgoingMessageEntity> remove(String destination, UUID destinationUuid, long destinationDevice, long id);
|
||||
|
||||
Optional<OutgoingMessageEntity> remove(String destination, long destinationDevice, String sender, long timestamp);
|
||||
Optional<OutgoingMessageEntity> remove(String destination, UUID destinationUuid, long destinationDevice, String sender, long timestamp);
|
||||
|
||||
Optional<OutgoingMessageEntity> remove(String destination, long destinationDevice, UUID guid);
|
||||
Optional<OutgoingMessageEntity> remove(String destination, UUID destinationUuid, long destinationDevice, UUID guid);
|
||||
|
||||
List<OutgoingMessageEntity> get(String destination, long destinationDevice, int limit);
|
||||
List<OutgoingMessageEntity> get(String destination, UUID destinationUuid, long destinationDevice, int limit);
|
||||
|
||||
void clear(String destination);
|
||||
void clear(String destination, UUID destinationUuid);
|
||||
|
||||
void clear(String destination, long deviceId);
|
||||
void clear(String destination, UUID destinationUuid, long deviceId);
|
||||
}
|
||||
|
||||
@@ -7,20 +7,26 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.dispatch.DispatchChannel;
|
||||
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||
import org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
public class DeadLetterHandler implements DispatchChannel {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(DeadLetterHandler.class);
|
||||
|
||||
private final AccountsManager accountsManager;
|
||||
private final MessagesManager messagesManager;
|
||||
|
||||
private final Counter deadLetterCounter = Metrics.counter(name(getClass(), "deadLetterCounter"));
|
||||
|
||||
public DeadLetterHandler(MessagesManager messagesManager) {
|
||||
public DeadLetterHandler(AccountsManager accountsManager, MessagesManager messagesManager) {
|
||||
this.accountsManager = accountsManager;
|
||||
this.messagesManager = messagesManager;
|
||||
}
|
||||
|
||||
@@ -35,8 +41,15 @@ public class DeadLetterHandler implements DispatchChannel {
|
||||
|
||||
switch (pubSubMessage.getType().getNumber()) {
|
||||
case PubSubMessage.Type.DELIVER_VALUE:
|
||||
Envelope message = Envelope.parseFrom(pubSubMessage.getContent());
|
||||
messagesManager.insert(address.getNumber(), address.getDeviceId(), message);
|
||||
Envelope message = Envelope.parseFrom(pubSubMessage.getContent());
|
||||
Optional<Account> maybeAccount = accountsManager.get(address.getNumber());
|
||||
|
||||
if (maybeAccount.isPresent()) {
|
||||
messagesManager.insert(address.getNumber(), maybeAccount.get().getUuid(), address.getDeviceId(), message);
|
||||
} else {
|
||||
logger.warn("Dead letter for account that no longer exists: {}", address);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
|
||||
@@ -129,7 +129,7 @@ public class WebSocketConnection implements DispatchChannel {
|
||||
}
|
||||
|
||||
if (isSuccessResponse(response)) {
|
||||
if (storedMessageInfo.isPresent()) messagesManager.delete(account.getNumber(), device.getId(), storedMessageInfo.get().id, storedMessageInfo.get().cached);
|
||||
if (storedMessageInfo.isPresent()) messagesManager.delete(account.getNumber(), account.getUuid(), device.getId(), storedMessageInfo.get().id, storedMessageInfo.get().cached);
|
||||
if (!isReceipt) sendDeliveryReceiptFor(message);
|
||||
if (requery) processStoredMessages();
|
||||
} else if (!isSuccessResponse(response) && !storedMessageInfo.isPresent()) {
|
||||
@@ -172,7 +172,7 @@ public class WebSocketConnection implements DispatchChannel {
|
||||
}
|
||||
|
||||
private void processStoredMessages() {
|
||||
OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getNumber(), device.getId());
|
||||
OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId());
|
||||
Iterator<OutgoingMessageEntity> iterator = messages.getMessages().iterator();
|
||||
|
||||
while (iterator.hasNext()) {
|
||||
|
||||
Reference in New Issue
Block a user