Ignore failures to update cache after a read

This commit is contained in:
ravi-signal
2025-12-10 16:21:18 -06:00
committed by GitHub
parent 1913dbf6f9
commit fecb032d8f
4 changed files with 71 additions and 21 deletions

View File

@@ -1310,7 +1310,11 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
Optional<Account> account = resolveFromRedis.get();
if (account.isEmpty()) {
account = resolveFromAccounts.get();
account.ifPresent(this::redisSet);
try {
account.ifPresent(this::redisSet);
} catch (RedisException e) {
logger.warn("Failed to cache retrieved account", e);
}
}
return account;
});
@@ -1328,7 +1332,12 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
.map(accountFromRedis -> CompletableFuture.completedFuture(maybeAccountFromRedis))
.orElseGet(() -> resolveFromAccounts.get()
.thenCompose(maybeAccountFromAccounts -> maybeAccountFromAccounts
.map(account -> redisSetAsync(account).thenApply(ignored -> maybeAccountFromAccounts))
.map(account -> redisSetAsync(account)
.exceptionally(ExceptionUtils.exceptionallyHandler(RedisException.class, e -> {
logger.warn("Failed to cache retrieved account", e);
return null;
}))
.thenApply(ignored -> maybeAccountFromAccounts))
.orElseGet(() -> CompletableFuture.completedFuture(maybeAccountFromAccounts)))))
.whenComplete((ignored, throwable) -> sample.stop(overallTimer));
}
@@ -1343,7 +1352,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
logger.warn("Deserialization error", e);
return Optional.empty();
} catch (RedisException e) {
logger.warn("Redis failure", e);
logger.warn("Failed fetching account from cache by secondary key", e);
return Optional.empty();
}
});
@@ -1375,7 +1384,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
return parseAccountJson(json, uuid);
} catch (final RedisException e) {
logger.warn("Redis failure", e);
logger.warn("Failed to retrieve account from cache", e);
return Optional.empty();
}
});

View File

@@ -22,6 +22,7 @@ import io.micrometer.core.instrument.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.ResilienceUtil;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.Util;
@@ -114,7 +115,11 @@ public class ProfilesManager {
if (profile.isEmpty()) {
profile = profiles.get(uuid, version);
profile.ifPresent(versionedProfile -> redisSet(uuid, versionedProfile));
try {
profile.ifPresent(versionedProfile -> redisSet(uuid, versionedProfile));
} catch (RedisException e) {
logger.warn("Failed to cache retrieved profile", e);
}
}
return profile;
@@ -126,7 +131,12 @@ public class ProfilesManager {
.map(versionedProfile -> CompletableFuture.completedFuture(maybeVersionedProfile))
.orElseGet(() -> profiles.getAsync(uuid, version)
.thenCompose(maybeVersionedProfileFromDynamo -> maybeVersionedProfileFromDynamo
.map(profile -> redisSetAsync(uuid, profile).thenApply(ignored -> maybeVersionedProfileFromDynamo))
.map(profile -> redisSetAsync(uuid, profile)
.exceptionally(ExceptionUtils.exceptionallyHandler(RedisException.class, e -> {
logger.warn("Failed to cache retrieved profile", e);
return null;
}))
.thenApply(ignored -> maybeVersionedProfileFromDynamo))
.orElseGet(() -> CompletableFuture.completedFuture(maybeVersionedProfileFromDynamo)))));
}
@@ -161,7 +171,7 @@ public class ProfilesManager {
return parseProfileJson(json);
} catch (RedisException e) {
logger.warn("Redis exception", e);
logger.warn("Failed to retrieve profile from cache", e);
return Optional.empty();
}
}

View File

@@ -70,6 +70,7 @@ import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.stubbing.Answer;
@@ -508,13 +509,25 @@ class AccountsManagerTest {
verifyNoMoreInteractions(accounts);
}
@Test
void testGetAccountByUuidBrokenCache() {
enum FailureStep {
GET,
SET_ACI,
SET_PNI
}
@ParameterizedTest
@EnumSource(FailureStep.class)
void testGetAccountByUuidBrokenCache(final FailureStep step) {
UUID uuid = UUID.randomUUID();
UUID pni = UUID.randomUUID();
Account account = AccountsHelper.generateTestAccount("+14152222222", uuid, pni, new ArrayList<>(), new byte[UnidentifiedAccessUtil.UNIDENTIFIED_ACCESS_KEY_LENGTH]);
when(clusterCommands.get(eq("Account3::" + uuid))).thenThrow(new RedisException("Connection lost!"));
(switch (step) {
case GET -> when(clusterCommands.get(eq("Account3::" + uuid)));
case SET_ACI -> when(clusterCommands.setex(eq("Account3::" + uuid), anyLong(), anyString()));
case SET_PNI -> when(clusterCommands.setex(eq("AccountMap::" + pni), anyLong(), eq(uuid.toString())));
}).thenThrow(new RedisException("Connection lost!"));
when(accounts.getByAccountIdentifier(eq(uuid))).thenReturn(Optional.of(account));
Optional<Account> retrieved = accountsManager.getByAccountIdentifier(uuid);
@@ -524,27 +537,35 @@ class AccountsManagerTest {
verify(clusterCommands, times(1)).get(eq("Account3::" + uuid));
verify(clusterCommands, times(1)).setex(eq("AccountMap::" + pni), anyLong(), eq(uuid.toString()));
verify(clusterCommands, times(1)).setex(eq("Account3::" + uuid), anyLong(), anyString());
// we only try setting the ACI if we successfully set the PNI
verify(clusterCommands, times(step == FailureStep.SET_PNI ? 0 : 1))
.setex(eq("Account3::" + uuid), anyLong(), anyString());
verifyNoMoreInteractions(clusterCommands);
verify(accounts, times(1)).getByAccountIdentifier(eq(uuid));
verifyNoMoreInteractions(accounts);
}
@Test
void testGetAccountByUuidBrokenCacheAsync() {
@ParameterizedTest
@EnumSource(FailureStep.class)
void testGetAccountByUuidBrokenCacheAsync(final FailureStep step) {
UUID uuid = UUID.randomUUID();
UUID pni = UUID.randomUUID();
Account account = AccountsHelper.generateTestAccount("+14152222222", uuid, pni, new ArrayList<>(), new byte[UnidentifiedAccessUtil.UNIDENTIFIED_ACCESS_KEY_LENGTH]);
when(asyncClusterCommands.get(eq("Account3::" + uuid)))
.thenReturn(MockRedisFuture.failedFuture(new RedisException("Connection lost!")));
.thenReturn(MockRedisFuture.completedFuture(null));
when(asyncClusterCommands.setex(any(), anyLong(), any())).thenReturn(MockRedisFuture.completedFuture("OK"));
when(accounts.getByAccountIdentifierAsync(eq(uuid)))
.thenReturn(CompletableFuture.completedFuture(Optional.of(account)));
(switch (step) {
case GET -> when(asyncClusterCommands.get(eq("Account3::" + uuid)));
case SET_ACI -> when(asyncClusterCommands.setex(eq("Account3::" + uuid), anyLong(), anyString()));
case SET_PNI -> when(asyncClusterCommands.setex(eq("AccountMap::" + pni), anyLong(), eq(uuid.toString())));
}).thenReturn(MockRedisFuture.failedFuture(new RedisException("Connection lost!")));
Optional<Account> retrieved = accountsManager.getByAccountIdentifierAsync(uuid).join();
assertTrue(retrieved.isPresent());

View File

@@ -31,6 +31,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.signal.libsignal.protocol.ServiceId;
import org.signal.libsignal.zkgroup.InvalidInputException;
@@ -163,14 +164,19 @@ public class ProfilesManagerTest {
verifyNoMoreInteractions(profiles);
}
@Test
public void testGetProfileBrokenCache() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testGetProfileBrokenCache(final boolean failUpdateCache) {
final UUID uuid = UUID.randomUUID();
final byte[] name = TestRandomUtil.nextBytes(81);
final VersionedProfile profile = new VersionedProfile("someversion", name, "someavatar", null, null,
null, null, "somecommitment".getBytes());
when(commands.hget(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"))).thenThrow(new RedisException("Connection lost"));
if (failUpdateCache) {
when(commands.hset(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"), anyString()))
.thenThrow(new RedisException("Connection lost"));
}
when(profiles.get(eq(uuid), eq("someversion"))).thenReturn(Optional.of(profile));
Optional<VersionedProfile> retrieved = profilesManager.get(uuid, "someversion");
@@ -186,15 +192,19 @@ public class ProfilesManagerTest {
verifyNoMoreInteractions(profiles);
}
@Test
public void testGetProfileAsyncBrokenCache() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testGetProfileAsyncBrokenCache(final boolean failUpdateCache) {
final UUID uuid = UUID.randomUUID();
final byte[] name = TestRandomUtil.nextBytes(81);
final VersionedProfile profile = new VersionedProfile("someversion", name, "someavatar", null, null,
null, null, "somecommitment".getBytes());
when(asyncCommands.hget(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"))).thenReturn(MockRedisFuture.failedFuture(new RedisException("Connection lost")));
when(asyncCommands.hset(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"), anyString())).thenReturn(MockRedisFuture.completedFuture(null));
when(asyncCommands.hset(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"), anyString()))
.thenReturn(failUpdateCache
? MockRedisFuture.failedFuture(new RedisException("Connection lost"))
: MockRedisFuture.completedFuture(null));
when(profiles.getAsync(eq(uuid), eq("someversion"))).thenReturn(CompletableFuture.completedFuture(Optional.of(profile)));
Optional<VersionedProfile> retrieved = profilesManager.getAsync(uuid, "someversion").join();