Add transaction support to kyber triples

This commit is contained in:
Fedor Indutny
2025-09-30 15:53:46 -07:00
committed by GitHub
parent b2f1639146
commit c25ba3f2a4
3 changed files with 88 additions and 18 deletions

View File

@@ -20,7 +20,7 @@ import {
} from '@signalapp/libsignal-client';
import { DataReader, DataWriter } from './sql/Client.js';
import type { ItemType } from './sql/Interface.js';
import type { ItemType, KyberPreKeyTripleType } from './sql/Interface.js';
import * as Bytes from './Bytes.js';
import { constantTimeEqual, sha256 } from './Crypto.js';
import { assertDev, strictAssert } from './util/assert.js';
@@ -213,6 +213,19 @@ export function hydrateSignedPreKey(
);
}
// Format: keyId:signedPreKeyId:baseKey
type KyberTripleCacheKeyPrefixType = `${KyberPreKeyTripleType['id']}:`;
type KyberTripleCacheKeyType =
`${KyberTripleCacheKeyPrefixType}${KyberPreKeyTripleType['signedPreKeyId']}:${string}`;
function getKyberTripleCacheKey({
id,
signedPreKeyId,
baseKey,
}: KyberPreKeyTripleType): KyberTripleCacheKeyType {
return `${id}:${signedPreKeyId}:${Bytes.toHex(baseKey)}`;
}
type SessionCacheEntry = CacheEntryType<SessionType, SessionRecord>;
type SenderKeyCacheEntry = CacheEntryType<SenderKeyType, SenderKeyRecord>;
@@ -254,6 +267,8 @@ export class SignalProtocolStore extends EventEmitter {
CacheEntryType<SignedPreKeyType, SignedPreKeyRecord>
>;
readonly #kyberTriples = new Set<KyberTripleCacheKeyType>();
senderKeyQueues = new Map<QualifiedAddressStringType, PQueue>();
sessionQueues = new Map<SessionIdType, PQueue>();
@@ -267,6 +282,10 @@ export class SignalProtocolStore extends EventEmitter {
#pendingSessions = new Map<SessionIdType, SessionCacheEntry>();
#pendingSenderKeys = new Map<SenderKeyIdType, SenderKeyCacheEntry>();
#pendingUnprocessed = new Map<string, UnprocessedType>();
#pendingKyberTriples = new Map<
KyberTripleCacheKeyType,
KyberPreKeyTripleType
>();
async hydrateCaches(): Promise<void> {
await Promise.all([
@@ -310,6 +329,15 @@ export class SignalProtocolStore extends EventEmitter {
this.#ourRegistrationIds.set(serviceId, map.value[serviceId]);
}
})(),
(async () => {
this.#kyberTriples.clear();
const triples = await DataReader.getAllKyberTriples();
for (const t of triples) {
this.#kyberTriples.add(getKyberTripleCacheKey(t));
}
})(),
_fillCaches<string, IdentityKeyType, PublicKey>(
this,
'identityKeys',
@@ -525,14 +553,30 @@ export class SignalProtocolStore extends EventEmitter {
`maybeRemoveKyberPreKey: Not removing kyber prekey ${id}; it's a last resort key`
);
const result = await DataWriter.markKyberTripleSeenOrFail({
id: `${ourServiceId}:${keyId}`,
signedPreKeyId,
baseKey: baseKey.serialize(),
await this.withZone(zone, 'maybeRemoveKyberPreKey', async () => {
const triple: KyberPreKeyTripleType = {
id: `${ourServiceId}:${keyId}`,
signedPreKeyId,
baseKey: baseKey.serialize(),
};
const cacheKey = getKyberTripleCacheKey(triple);
// Note: we don't have to check for `#pendingKyberPreKeysToRemove` since
// it makes the key in question inaccessible to begin with.
if (
this.#kyberTriples.has(cacheKey) ||
this.#pendingKyberTriples.has(cacheKey)
) {
throw new Error(`Duplicate kyber triple ${keyId}:${signedPreKeyId}`);
}
this.#pendingKyberTriples.set(cacheKey, triple);
if (!zone.supportsPendingKyberPreKeysToRemove()) {
await this.#commitZoneChanges('removeKyberPreKeys');
}
});
if (result === 'fail') {
throw new Error(`Duplicate kyber triple ${keyId}:${signedPreKeyId}`);
}
}
async removeKyberPreKeys(
@@ -1169,13 +1213,15 @@ export class SignalProtocolStore extends EventEmitter {
const pendingSenderKeys = this.#pendingSenderKeys;
const pendingSessions = this.#pendingSessions;
const pendingUnprocessed = this.#pendingUnprocessed;
const pendingKyberTriples = this.#pendingKyberTriples;
if (
pendingKyberPreKeysToRemove.size === 0 &&
pendingPreKeysToRemove.size === 0 &&
pendingSenderKeys.size === 0 &&
pendingSessions.size === 0 &&
pendingUnprocessed.size === 0
pendingUnprocessed.size === 0 &&
pendingKyberTriples.size === 0
) {
return;
}
@@ -1186,7 +1232,8 @@ export class SignalProtocolStore extends EventEmitter {
`pending preKeysToRemove ${pendingKyberPreKeysToRemove.size}, ` +
`pending senderKeys ${pendingSenderKeys.size}, ` +
`pending sessions ${pendingSessions.size}, ` +
`pending unprocessed ${pendingUnprocessed.size}`
`pending unprocessed ${pendingUnprocessed.size}, ` +
`pending kyberTriples ${pendingKyberTriples.size}`
);
this.#pendingKyberPreKeysToRemove = new Set();
@@ -1194,6 +1241,7 @@ export class SignalProtocolStore extends EventEmitter {
this.#pendingSenderKeys = new Map();
this.#pendingSessions = new Map();
this.#pendingUnprocessed = new Map();
this.#pendingKyberTriples = new Map();
// Commit both sender keys, sessions and unprocessed in the same database transaction
// to unroll both on error.
@@ -1207,17 +1255,30 @@ export class SignalProtocolStore extends EventEmitter {
({ fromDB }) => fromDB
),
unprocessed: Array.from(pendingUnprocessed.values()),
kyberTriples: Array.from(pendingKyberTriples.values()),
});
// Apply changes to in-memory storage after successful DB write.
for (const cacheKey of pendingKyberTriples.keys()) {
this.#kyberTriples.add(cacheKey);
}
const { kyberPreKeys } = this;
assertDev(
kyberPreKeys !== undefined,
"Can't commit unhydrated kyberPreKeys storage"
);
pendingKyberPreKeysToRemove.forEach(value => {
pendingKyberPreKeysToRemove.forEach((value: PreKeyIdType) => {
kyberPreKeys.delete(value);
// Remove all cached kyber triples for this key.
const prefix: KyberTripleCacheKeyPrefixType = `${value}:`;
for (const key of this.#kyberTriples.keys()) {
if (key.startsWith(prefix)) {
this.#kyberTriples.delete(key);
}
}
});
if (kyberPreKeys.size < LOW_KEYS_THRESHOLD) {
this.#emitLowKeys(`removeKyberPreKeys@${kyberPreKeys.size}`);