mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-20 15:28:05 +01:00
Use refreshing AuthenticatedAccount for @Auth
This commit is contained in:
@@ -1,32 +1,31 @@
|
||||
/*
|
||||
* Copyright 2013-2020 Signal Messenger, LLC
|
||||
* Copyright 2013-2021 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.websocket;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
|
||||
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
|
||||
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
|
||||
import org.whispersystems.textsecuregcm.push.MessageSender;
|
||||
import org.whispersystems.textsecuregcm.push.ReceiptSender;
|
||||
import org.whispersystems.textsecuregcm.redis.RedisOperation;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
import org.whispersystems.websocket.session.WebSocketSessionContext;
|
||||
import org.whispersystems.websocket.setup.WebSocketConnectListener;
|
||||
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
public class AuthenticatedConnectListener implements WebSocketConnectListener {
|
||||
|
||||
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
@@ -60,16 +59,16 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
|
||||
@Override
|
||||
public void onWebSocketConnect(WebSocketSessionContext context) {
|
||||
if (context.getAuthenticated() != null) {
|
||||
final Account account = context.getAuthenticated(Account.class);
|
||||
final Device device = account.getAuthenticatedDevice().get();
|
||||
final Timer.Context timer = durationTimer.time();
|
||||
final WebSocketConnection connection = new WebSocketConnection(receiptSender,
|
||||
messagesManager, account, device,
|
||||
context.getClient(),
|
||||
retrySchedulingExecutor);
|
||||
final AuthenticatedAccount auth = context.getAuthenticated(AuthenticatedAccount.class);
|
||||
final Device device = auth.getAuthenticatedDevice();
|
||||
final Timer.Context timer = durationTimer.time();
|
||||
final WebSocketConnection connection = new WebSocketConnection(receiptSender,
|
||||
messagesManager, auth, device,
|
||||
context.getClient(),
|
||||
retrySchedulingExecutor);
|
||||
|
||||
openWebsocketCounter.inc();
|
||||
RedisOperation.unchecked(() -> apnFallbackManager.cancel(account, device));
|
||||
RedisOperation.unchecked(() -> apnFallbackManager.cancel(auth.getAccount(), device));
|
||||
|
||||
context.addListener(new WebSocketSessionContext.WebSocketEventListener() {
|
||||
@Override
|
||||
@@ -79,20 +78,21 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
|
||||
|
||||
connection.stop();
|
||||
|
||||
RedisOperation.unchecked(() -> clientPresenceManager.clearPresence(account.getUuid(), device.getId()));
|
||||
RedisOperation.unchecked(
|
||||
() -> clientPresenceManager.clearPresence(auth.getAccount().getUuid(), device.getId()));
|
||||
RedisOperation.unchecked(() -> {
|
||||
messagesManager.removeMessageAvailabilityListener(connection);
|
||||
|
||||
if (messagesManager.hasCachedMessages(account.getUuid(), device.getId())) {
|
||||
messageSender.sendNewMessageNotification(account, device);
|
||||
if (messagesManager.hasCachedMessages(auth.getAccount().getUuid(), device.getId())) {
|
||||
messageSender.sendNewMessageNotification(auth.getAccount(), device);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
clientPresenceManager.setPresent(account.getUuid(), device.getId(), connection);
|
||||
messagesManager.addMessageAvailabilityListener(account.getUuid(), device.getId(), connection);
|
||||
clientPresenceManager.setPresent(auth.getAccount().getUuid(), device.getId(), connection);
|
||||
messagesManager.addMessageAvailabilityListener(auth.getAccount().getUuid(), device.getId(), connection);
|
||||
connection.start();
|
||||
} catch (final Exception e) {
|
||||
log.warn("Failed to initialize websocket", e);
|
||||
|
||||
@@ -1,23 +1,21 @@
|
||||
/*
|
||||
* Copyright 2013-2020 Signal Messenger, LLC
|
||||
* Copyright 2013-2021 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.websocket;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.UpgradeRequest;
|
||||
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.websocket.auth.WebSocketAuthenticator;
|
||||
|
||||
import io.dropwizard.auth.basic.BasicCredentials;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import io.dropwizard.auth.basic.BasicCredentials;
|
||||
import org.eclipse.jetty.websocket.api.UpgradeRequest;
|
||||
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
|
||||
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
|
||||
import org.whispersystems.websocket.auth.WebSocketAuthenticator;
|
||||
|
||||
|
||||
public class WebSocketAccountAuthenticator implements WebSocketAuthenticator<Account> {
|
||||
public class WebSocketAccountAuthenticator implements WebSocketAuthenticator<AuthenticatedAccount> {
|
||||
|
||||
private final AccountAuthenticator accountAuthenticator;
|
||||
|
||||
@@ -26,19 +24,18 @@ public class WebSocketAccountAuthenticator implements WebSocketAuthenticator<Acc
|
||||
}
|
||||
|
||||
@Override
|
||||
public AuthenticationResult<Account> authenticate(UpgradeRequest request) {
|
||||
public AuthenticationResult<AuthenticatedAccount> authenticate(UpgradeRequest request) {
|
||||
Map<String, List<String>> parameters = request.getParameterMap();
|
||||
List<String> usernames = parameters.get("login");
|
||||
List<String> passwords = parameters.get("password");
|
||||
List<String> usernames = parameters.get("login");
|
||||
List<String> passwords = parameters.get("password");
|
||||
|
||||
if (usernames == null || usernames.size() == 0 ||
|
||||
passwords == null || passwords.size() == 0)
|
||||
{
|
||||
passwords == null || passwords.size() == 0) {
|
||||
return new AuthenticationResult<>(Optional.empty(), false);
|
||||
}
|
||||
|
||||
BasicCredentials credentials = new BasicCredentials(usernames.get(0).replace(" ", "+"),
|
||||
passwords.get(0).replace(" ", "+"));
|
||||
passwords.get(0).replace(" ", "+"));
|
||||
|
||||
return new AuthenticationResult<>(accountAuthenticator.authenticate(credentials), true);
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2013-2020 Signal Messenger, LLC
|
||||
* Copyright 2013-2021 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
@@ -36,6 +36,7 @@ import javax.ws.rs.WebApplicationException;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
|
||||
import org.whispersystems.textsecuregcm.controllers.MessageController;
|
||||
import org.whispersystems.textsecuregcm.controllers.NoSuchUserException;
|
||||
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
|
||||
@@ -43,7 +44,6 @@ import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList;
|
||||
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
|
||||
import org.whispersystems.textsecuregcm.push.DisplacedPresenceListener;
|
||||
import org.whispersystems.textsecuregcm.push.ReceiptSender;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.storage.MessageAvailabilityListener;
|
||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||
@@ -90,21 +90,22 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
|
||||
|
||||
private final ReceiptSender receiptSender;
|
||||
private final MessagesManager messagesManager;
|
||||
private final ReceiptSender receiptSender;
|
||||
private final MessagesManager messagesManager;
|
||||
|
||||
private final Account account;
|
||||
private final Device device;
|
||||
private final WebSocketClient client;
|
||||
private final AuthenticatedAccount auth;
|
||||
private final Device device;
|
||||
private final WebSocketClient client;
|
||||
private final ScheduledExecutorService retrySchedulingExecutor;
|
||||
|
||||
private final boolean isDesktopClient;
|
||||
private final boolean isDesktopClient;
|
||||
|
||||
private final Semaphore processStoredMessagesSemaphore = new Semaphore(1);
|
||||
private final AtomicReference<StoredMessageState> storedMessageState = new AtomicReference<>(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);
|
||||
private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean(false);
|
||||
private final LongAdder sentMessageCounter = new LongAdder();
|
||||
private final AtomicLong queueDrainStartTime = new AtomicLong();
|
||||
private final Semaphore processStoredMessagesSemaphore = new Semaphore(1);
|
||||
private final AtomicReference<StoredMessageState> storedMessageState = new AtomicReference<>(
|
||||
StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);
|
||||
private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean(false);
|
||||
private final LongAdder sentMessageCounter = new LongAdder();
|
||||
private final AtomicLong queueDrainStartTime = new AtomicLong();
|
||||
private final AtomicInteger consecutiveRetries = new AtomicInteger();
|
||||
private final AtomicReference<ScheduledFuture<?>> retryFuture = new AtomicReference<>();
|
||||
|
||||
@@ -118,16 +119,15 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
|
||||
public WebSocketConnection(ReceiptSender receiptSender,
|
||||
MessagesManager messagesManager,
|
||||
Account account,
|
||||
AuthenticatedAccount auth,
|
||||
Device device,
|
||||
WebSocketClient client,
|
||||
ScheduledExecutorService retrySchedulingExecutor)
|
||||
{
|
||||
this.receiptSender = receiptSender;
|
||||
ScheduledExecutorService retrySchedulingExecutor) {
|
||||
this.receiptSender = receiptSender;
|
||||
this.messagesManager = messagesManager;
|
||||
this.account = account;
|
||||
this.device = device;
|
||||
this.client = client;
|
||||
this.auth = auth;
|
||||
this.device = device;
|
||||
this.client = client;
|
||||
this.retrySchedulingExecutor = retrySchedulingExecutor;
|
||||
|
||||
Optional<ClientPlatform> maybePlatform;
|
||||
@@ -168,7 +168,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
if (throwable == null) {
|
||||
if (isSuccessResponse(response)) {
|
||||
if (storedMessageInfo.isPresent()) {
|
||||
messagesManager.delete(account.getUuid(), device.getId(), storedMessageInfo.get().getGuid());
|
||||
messagesManager.delete(auth.getAccount().getUuid(), device.getId(), storedMessageInfo.get().getGuid());
|
||||
}
|
||||
|
||||
if (message.getType() != Envelope.Type.SERVER_DELIVERY_RECEIPT) {
|
||||
@@ -204,7 +204,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
if (!message.hasSource()) return;
|
||||
|
||||
try {
|
||||
receiptSender.sendReceipt(account, message.getSource(), message.getTimestamp());
|
||||
receiptSender.sendReceipt(auth, message.getSource(), message.getTimestamp());
|
||||
} catch (NoSuchUserException e) {
|
||||
logger.info("No longer registered " + e.getMessage());
|
||||
} catch (WebApplicationException e) {
|
||||
@@ -267,7 +267,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
private void sendNextMessagePage(final boolean cachedMessagesOnly, final CompletableFuture<Void> queueClearedFuture) {
|
||||
try {
|
||||
final OutgoingMessageEntityList messages = messagesManager
|
||||
.getMessagesForDevice(account.getUuid(), device.getId(), client.getUserAgent(), cachedMessagesOnly);
|
||||
.getMessagesForDevice(auth.getAccount().getUuid(), device.getId(), client.getUserAgent(), cachedMessagesOnly);
|
||||
|
||||
final CompletableFuture<?>[] sendFutures = new CompletableFuture[messages.getMessages().size()];
|
||||
|
||||
@@ -303,7 +303,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
final Envelope envelope = builder.build();
|
||||
|
||||
if (envelope.getSerializedSize() > MAX_DESKTOP_MESSAGE_SIZE && isDesktopClient) {
|
||||
messagesManager.delete(account.getUuid(), device.getId(), message.getGuid());
|
||||
messagesManager.delete(auth.getAccount().getUuid(), device.getId(), message.getGuid());
|
||||
discardedMessagesMeter.mark();
|
||||
|
||||
sendFutures[i] = CompletableFuture.completedFuture(null);
|
||||
@@ -340,7 +340,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
public void handleNewEphemeralMessageAvailable() {
|
||||
ephemeralMessageAvailableMeter.mark();
|
||||
|
||||
messagesManager.takeEphemeralMessage(account.getUuid(), device.getId())
|
||||
messagesManager.takeEphemeralMessage(auth.getAccount().getUuid(), device.getId())
|
||||
.ifPresent(message -> sendMessage(message, Optional.empty()));
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user