mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-20 19:18:03 +01:00
Fix for jedis pool deadlock
1) Remove nested pool checkouts 2) Add a max wait so it won't block forever on deadlock
This commit is contained in:
@@ -48,6 +48,7 @@ public class RedisClientFactory implements RedisPubSubConnectionFactory {
|
||||
{
|
||||
JedisPoolConfig poolConfig = new JedisPoolConfig();
|
||||
poolConfig.setTestOnBorrow(true);
|
||||
poolConfig.setMaxWaitMillis(10000);
|
||||
|
||||
URI redisURI = new URI(url);
|
||||
|
||||
|
||||
@@ -39,19 +39,27 @@ public class LuaScript {
|
||||
|
||||
public Object execute(List<byte[]> keys, List<byte[]> args) {
|
||||
try (Jedis jedis = jedisPool.getWriteResource()) {
|
||||
try {
|
||||
return jedis.evalsha(sha, keys, args);
|
||||
} catch (JedisDataException e) {
|
||||
storeScript(jedisPool, script);
|
||||
return jedis.evalsha(sha, keys, args);
|
||||
}
|
||||
return execute(jedis, keys, args);
|
||||
}
|
||||
}
|
||||
|
||||
public Object execute(Jedis jedis, List<byte[]> keys, List<byte[]> args) {
|
||||
try {
|
||||
return jedis.evalsha(sha, keys, args);
|
||||
} catch (JedisDataException e) {
|
||||
storeScript(jedis, script);
|
||||
return jedis.evalsha(sha, keys, args);
|
||||
}
|
||||
}
|
||||
|
||||
private String storeScript(ReplicatedJedisPool jedisPool, String script) {
|
||||
try (Jedis jedis = jedisPool.getWriteResource()) {
|
||||
return jedis.scriptLoad(script);
|
||||
return storeScript(jedis, script);
|
||||
}
|
||||
}
|
||||
|
||||
private String storeScript(Jedis jedis, String script) {
|
||||
return jedis.scriptLoad(script);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -67,7 +67,7 @@ public class AccountsManager {
|
||||
}
|
||||
|
||||
public boolean create(Account account) {
|
||||
try (Timer.Context context = createTimer.time()) {
|
||||
try (Timer.Context ignored = createTimer.time()) {
|
||||
boolean freshUser = databaseCreate(account);
|
||||
redisSet(account);
|
||||
updateDirectory(account);
|
||||
@@ -77,7 +77,7 @@ public class AccountsManager {
|
||||
}
|
||||
|
||||
public void update(Account account) {
|
||||
try (Timer.Context context = updateTimer.time()) {
|
||||
try (Timer.Context ignored = updateTimer.time()) {
|
||||
redisSet(account);
|
||||
databaseUpdate(account);
|
||||
updateDirectory(account);
|
||||
@@ -91,7 +91,7 @@ public class AccountsManager {
|
||||
}
|
||||
|
||||
public Optional<Account> get(String number) {
|
||||
try (Timer.Context context = getByNumberTimer.time()) {
|
||||
try (Timer.Context ignored = getByNumberTimer.time()) {
|
||||
Optional<Account> account = redisGet(number);
|
||||
|
||||
if (!account.isPresent()) {
|
||||
@@ -104,7 +104,7 @@ public class AccountsManager {
|
||||
}
|
||||
|
||||
public Optional<Account> get(UUID uuid) {
|
||||
try (Timer.Context context = getByUuidTimer.time()) {
|
||||
try (Timer.Context ignored = getByUuidTimer.time()) {
|
||||
Optional<Account> account = redisGet(uuid);
|
||||
|
||||
if (!account.isPresent()) {
|
||||
@@ -140,12 +140,12 @@ public class AccountsManager {
|
||||
}
|
||||
|
||||
private String getAccountEntityKey(UUID uuid) {
|
||||
return "Account::" + uuid.toString();
|
||||
return "Account2::" + uuid.toString();
|
||||
}
|
||||
|
||||
private void redisSet(Account account) {
|
||||
try (Jedis jedis = cacheClient.getWriteResource();
|
||||
Timer.Context timer = redisSetTimer.time())
|
||||
try (Jedis jedis = cacheClient.getWriteResource();
|
||||
Timer.Context ignored = redisSetTimer.time())
|
||||
{
|
||||
jedis.set(getAccountMapKey(account.getNumber()), account.getUuid().toString());
|
||||
jedis.set(getAccountEntityKey(account.getUuid()), mapper.writeValueAsString(account));
|
||||
@@ -155,12 +155,12 @@ public class AccountsManager {
|
||||
}
|
||||
|
||||
private Optional<Account> redisGet(String number) {
|
||||
try (Jedis jedis = cacheClient.getReadResource();
|
||||
Timer.Context timer = redisNumberGetTimer.time())
|
||||
try (Jedis jedis = cacheClient.getReadResource();
|
||||
Timer.Context ignored = redisNumberGetTimer.time())
|
||||
{
|
||||
String uuid = jedis.get(getAccountMapKey(number));
|
||||
|
||||
if (uuid != null) return redisGet(UUID.fromString(uuid));
|
||||
if (uuid != null) return redisGet(jedis, UUID.fromString(uuid));
|
||||
else return Optional.empty();
|
||||
} catch (IllegalArgumentException e) {
|
||||
logger.warn("Deserialization error", e);
|
||||
@@ -172,9 +172,13 @@ public class AccountsManager {
|
||||
}
|
||||
|
||||
private Optional<Account> redisGet(UUID uuid) {
|
||||
try (Jedis jedis = cacheClient.getReadResource();
|
||||
Timer.Context timer = redisUuidGetTimer.time())
|
||||
{
|
||||
try (Jedis jedis = cacheClient.getReadResource()) {
|
||||
return redisGet(jedis, uuid);
|
||||
}
|
||||
}
|
||||
|
||||
private Optional<Account> redisGet(Jedis jedis, UUID uuid) {
|
||||
try (Timer.Context ignored = redisUuidGetTimer.time()) {
|
||||
String json = jedis.get(getAccountEntityKey(uuid));
|
||||
|
||||
if (json != null) {
|
||||
@@ -192,7 +196,6 @@ public class AccountsManager {
|
||||
logger.warn("Redis failure", e);
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private Optional<Account> databaseGet(String number) {
|
||||
|
||||
@@ -82,12 +82,10 @@ public class MessagesCache implements Managed {
|
||||
}
|
||||
|
||||
public void remove(String destination, long destinationDevice, long id) {
|
||||
Timer.Context timer = removeByIdTimer.time();
|
||||
|
||||
try {
|
||||
removeOperation.remove(destination, destinationDevice, id);
|
||||
} finally {
|
||||
timer.stop();
|
||||
try (Jedis jedis = jedisPool.getWriteResource();
|
||||
Timer.Context ignored = removeByIdTimer.time())
|
||||
{
|
||||
removeOperation.remove(jedis, destination, destinationDevice, id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -298,13 +296,13 @@ public class MessagesCache implements Managed {
|
||||
this.removeQueue = LuaScript.fromResource(jedisPool, "lua/remove_queue.lua" );
|
||||
}
|
||||
|
||||
public void remove(String destination, long destinationDevice, long id) {
|
||||
public void remove(Jedis jedis, String destination, long destinationDevice, long id) {
|
||||
Key key = new Key(destination, destinationDevice);
|
||||
|
||||
List<byte[]> keys = Arrays.asList(key.getUserMessageQueue(), key.getUserMessageQueueMetadata(), Key.getUserMessageQueueIndex());
|
||||
List<byte[]> args = Collections.singletonList(String.valueOf(id).getBytes());
|
||||
|
||||
this.removeById.execute(keys, args);
|
||||
this.removeById.execute(jedis, keys, args);
|
||||
}
|
||||
|
||||
public byte[] remove(String destination, long destinationDevice, String sender, long timestamp) {
|
||||
@@ -464,7 +462,7 @@ public class MessagesCache implements Managed {
|
||||
Set<Tuple> messages = jedis.zrangeWithScores(key.getUserMessageQueue(), 0, CHUNK_SIZE);
|
||||
|
||||
for (Tuple message : messages) {
|
||||
persistMessage(key, (long)message.getScore(), message.getBinaryElement());
|
||||
persistMessage(jedis, key, (long)message.getScore(), message.getBinaryElement());
|
||||
messagesPersistedCount++;
|
||||
}
|
||||
|
||||
@@ -479,7 +477,7 @@ public class MessagesCache implements Managed {
|
||||
}
|
||||
}
|
||||
|
||||
private void persistMessage(Key key, long score, byte[] message) {
|
||||
private void persistMessage(Jedis jedis, Key key, long score, byte[] message) {
|
||||
try {
|
||||
Envelope envelope = Envelope.parseFrom(message);
|
||||
UUID guid = envelope.hasServerGuid() ? UUID.fromString(envelope.getServerGuid()) : null;
|
||||
@@ -491,7 +489,7 @@ public class MessagesCache implements Managed {
|
||||
logger.error("Error parsing envelope", e);
|
||||
}
|
||||
|
||||
removeOperation.remove(key.getAddress(), key.getDeviceId(), score);
|
||||
removeOperation.remove(jedis, key.getAddress(), key.getDeviceId(), score);
|
||||
}
|
||||
|
||||
private List<byte[]> getQueuesToPersist(GetOperation getOperation) {
|
||||
|
||||
Reference in New Issue
Block a user