diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index 37259d462..2363cec00 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -1310,7 +1310,11 @@ public class AccountsManager extends RedisPubSubAdapter implemen Optional account = resolveFromRedis.get(); if (account.isEmpty()) { account = resolveFromAccounts.get(); - account.ifPresent(this::redisSet); + try { + account.ifPresent(this::redisSet); + } catch (RedisException e) { + logger.warn("Failed to cache retrieved account", e); + } } return account; }); @@ -1328,7 +1332,12 @@ public class AccountsManager extends RedisPubSubAdapter implemen .map(accountFromRedis -> CompletableFuture.completedFuture(maybeAccountFromRedis)) .orElseGet(() -> resolveFromAccounts.get() .thenCompose(maybeAccountFromAccounts -> maybeAccountFromAccounts - .map(account -> redisSetAsync(account).thenApply(ignored -> maybeAccountFromAccounts)) + .map(account -> redisSetAsync(account) + .exceptionally(ExceptionUtils.exceptionallyHandler(RedisException.class, e -> { + logger.warn("Failed to cache retrieved account", e); + return null; + })) + .thenApply(ignored -> maybeAccountFromAccounts)) .orElseGet(() -> CompletableFuture.completedFuture(maybeAccountFromAccounts))))) .whenComplete((ignored, throwable) -> sample.stop(overallTimer)); } @@ -1343,7 +1352,7 @@ public class AccountsManager extends RedisPubSubAdapter implemen logger.warn("Deserialization error", e); return Optional.empty(); } catch (RedisException e) { - logger.warn("Redis failure", e); + logger.warn("Failed fetching account from cache by secondary key", e); return Optional.empty(); } }); @@ -1375,7 +1384,7 @@ public class AccountsManager extends RedisPubSubAdapter implemen return parseAccountJson(json, uuid); } catch (final RedisException e) { - logger.warn("Redis failure", e); + logger.warn("Failed to retrieve account from cache", e); return Optional.empty(); } }); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java index e35d868a0..028701d99 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java @@ -22,6 +22,7 @@ import io.micrometer.core.instrument.Metrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; +import org.whispersystems.textsecuregcm.util.ExceptionUtils; import org.whispersystems.textsecuregcm.util.ResilienceUtil; import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.Util; @@ -114,7 +115,11 @@ public class ProfilesManager { if (profile.isEmpty()) { profile = profiles.get(uuid, version); - profile.ifPresent(versionedProfile -> redisSet(uuid, versionedProfile)); + try { + profile.ifPresent(versionedProfile -> redisSet(uuid, versionedProfile)); + } catch (RedisException e) { + logger.warn("Failed to cache retrieved profile", e); + } } return profile; @@ -126,7 +131,12 @@ public class ProfilesManager { .map(versionedProfile -> CompletableFuture.completedFuture(maybeVersionedProfile)) .orElseGet(() -> profiles.getAsync(uuid, version) .thenCompose(maybeVersionedProfileFromDynamo -> maybeVersionedProfileFromDynamo - .map(profile -> redisSetAsync(uuid, profile).thenApply(ignored -> maybeVersionedProfileFromDynamo)) + .map(profile -> redisSetAsync(uuid, profile) + .exceptionally(ExceptionUtils.exceptionallyHandler(RedisException.class, e -> { + logger.warn("Failed to cache retrieved profile", e); + return null; + })) + .thenApply(ignored -> maybeVersionedProfileFromDynamo)) .orElseGet(() -> CompletableFuture.completedFuture(maybeVersionedProfileFromDynamo))))); } @@ -161,7 +171,7 @@ public class ProfilesManager { return parseProfileJson(json); } catch (RedisException e) { - logger.warn("Redis exception", e); + logger.warn("Failed to retrieve profile from cache", e); return Optional.empty(); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java index 87468b61d..d047b901f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java @@ -70,6 +70,7 @@ import org.junit.jupiter.api.function.Executable; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.stubbing.Answer; @@ -508,13 +509,25 @@ class AccountsManagerTest { verifyNoMoreInteractions(accounts); } - @Test - void testGetAccountByUuidBrokenCache() { + enum FailureStep { + GET, + SET_ACI, + SET_PNI + } + + @ParameterizedTest + @EnumSource(FailureStep.class) + void testGetAccountByUuidBrokenCache(final FailureStep step) { UUID uuid = UUID.randomUUID(); UUID pni = UUID.randomUUID(); Account account = AccountsHelper.generateTestAccount("+14152222222", uuid, pni, new ArrayList<>(), new byte[UnidentifiedAccessUtil.UNIDENTIFIED_ACCESS_KEY_LENGTH]); - when(clusterCommands.get(eq("Account3::" + uuid))).thenThrow(new RedisException("Connection lost!")); + (switch (step) { + case GET -> when(clusterCommands.get(eq("Account3::" + uuid))); + case SET_ACI -> when(clusterCommands.setex(eq("Account3::" + uuid), anyLong(), anyString())); + case SET_PNI -> when(clusterCommands.setex(eq("AccountMap::" + pni), anyLong(), eq(uuid.toString()))); + }).thenThrow(new RedisException("Connection lost!")); + when(accounts.getByAccountIdentifier(eq(uuid))).thenReturn(Optional.of(account)); Optional retrieved = accountsManager.getByAccountIdentifier(uuid); @@ -524,27 +537,35 @@ class AccountsManagerTest { verify(clusterCommands, times(1)).get(eq("Account3::" + uuid)); verify(clusterCommands, times(1)).setex(eq("AccountMap::" + pni), anyLong(), eq(uuid.toString())); - verify(clusterCommands, times(1)).setex(eq("Account3::" + uuid), anyLong(), anyString()); + // we only try setting the ACI if we successfully set the PNI + verify(clusterCommands, times(step == FailureStep.SET_PNI ? 0 : 1)) + .setex(eq("Account3::" + uuid), anyLong(), anyString()); verifyNoMoreInteractions(clusterCommands); verify(accounts, times(1)).getByAccountIdentifier(eq(uuid)); verifyNoMoreInteractions(accounts); } - @Test - void testGetAccountByUuidBrokenCacheAsync() { + @ParameterizedTest + @EnumSource(FailureStep.class) + void testGetAccountByUuidBrokenCacheAsync(final FailureStep step) { UUID uuid = UUID.randomUUID(); UUID pni = UUID.randomUUID(); Account account = AccountsHelper.generateTestAccount("+14152222222", uuid, pni, new ArrayList<>(), new byte[UnidentifiedAccessUtil.UNIDENTIFIED_ACCESS_KEY_LENGTH]); + when(asyncClusterCommands.get(eq("Account3::" + uuid))) - .thenReturn(MockRedisFuture.failedFuture(new RedisException("Connection lost!"))); - + .thenReturn(MockRedisFuture.completedFuture(null)); when(asyncClusterCommands.setex(any(), anyLong(), any())).thenReturn(MockRedisFuture.completedFuture("OK")); - when(accounts.getByAccountIdentifierAsync(eq(uuid))) .thenReturn(CompletableFuture.completedFuture(Optional.of(account))); + (switch (step) { + case GET -> when(asyncClusterCommands.get(eq("Account3::" + uuid))); + case SET_ACI -> when(asyncClusterCommands.setex(eq("Account3::" + uuid), anyLong(), anyString())); + case SET_PNI -> when(asyncClusterCommands.setex(eq("AccountMap::" + pni), anyLong(), eq(uuid.toString()))); + }).thenReturn(MockRedisFuture.failedFuture(new RedisException("Connection lost!"))); + Optional retrieved = accountsManager.getByAccountIdentifierAsync(uuid).join(); assertTrue(retrieved.isPresent()); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesManagerTest.java index ee27d9486..dd07bf0e6 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesManagerTest.java @@ -31,6 +31,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.ValueSource; import org.signal.libsignal.protocol.ServiceId; import org.signal.libsignal.zkgroup.InvalidInputException; @@ -163,14 +164,19 @@ public class ProfilesManagerTest { verifyNoMoreInteractions(profiles); } - @Test - public void testGetProfileBrokenCache() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testGetProfileBrokenCache(final boolean failUpdateCache) { final UUID uuid = UUID.randomUUID(); final byte[] name = TestRandomUtil.nextBytes(81); final VersionedProfile profile = new VersionedProfile("someversion", name, "someavatar", null, null, null, null, "somecommitment".getBytes()); when(commands.hget(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"))).thenThrow(new RedisException("Connection lost")); + if (failUpdateCache) { + when(commands.hset(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"), anyString())) + .thenThrow(new RedisException("Connection lost")); + } when(profiles.get(eq(uuid), eq("someversion"))).thenReturn(Optional.of(profile)); Optional retrieved = profilesManager.get(uuid, "someversion"); @@ -186,15 +192,19 @@ public class ProfilesManagerTest { verifyNoMoreInteractions(profiles); } - @Test - public void testGetProfileAsyncBrokenCache() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testGetProfileAsyncBrokenCache(final boolean failUpdateCache) { final UUID uuid = UUID.randomUUID(); final byte[] name = TestRandomUtil.nextBytes(81); final VersionedProfile profile = new VersionedProfile("someversion", name, "someavatar", null, null, null, null, "somecommitment".getBytes()); when(asyncCommands.hget(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"))).thenReturn(MockRedisFuture.failedFuture(new RedisException("Connection lost"))); - when(asyncCommands.hset(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"), anyString())).thenReturn(MockRedisFuture.completedFuture(null)); + when(asyncCommands.hset(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"), anyString())) + .thenReturn(failUpdateCache + ? MockRedisFuture.failedFuture(new RedisException("Connection lost")) + : MockRedisFuture.completedFuture(null)); when(profiles.getAsync(eq(uuid), eq("someversion"))).thenReturn(CompletableFuture.completedFuture(Optional.of(profile))); Optional retrieved = profilesManager.getAsync(uuid, "someversion").join();