diff --git a/package.json b/package.json index 427bbd68a2..eb9babb73c 100644 --- a/package.json +++ b/package.json @@ -122,7 +122,7 @@ "@signalapp/libsignal-client": "0.94.1", "@signalapp/mute-state-change": "workspace:1.0.0", "@signalapp/ringrtc": "2.69.0", - "@signalapp/sqlcipher": "3.2.1", + "@signalapp/sqlcipher": "3.3.5", "@signalapp/windows-ucv": "1.0.1", "google-libphonenumber": "3.2.44" }, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 367ed89fcd..b0a4bae448 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -79,8 +79,8 @@ importers: specifier: 2.69.0 version: 2.69.0 '@signalapp/sqlcipher': - specifier: 3.2.1 - version: 3.2.1 + specifier: 3.3.5 + version: 3.3.5 '@signalapp/windows-ucv': specifier: 1.0.1 version: 1.0.1 @@ -4042,8 +4042,8 @@ packages: resolution: {integrity: sha512-yugpBTyFvmeV9//DdCIj1c1pc23GdLsRR0jHsfaDKVyDKvaa1JQ7aa2xjUZjSZNwqQK+X2y6EcUSErr0Hba4Ww==} hasBin: true - '@signalapp/sqlcipher@3.2.1': - resolution: {integrity: sha512-QStHnLZiYp7YvdR6CnDO7TxdoSJQCeF02UxkyJE7+l/qES3wvwkc1IuAxSYCZEI8gS4PUBTYXSX70yod7PS9Mg==} + '@signalapp/sqlcipher@3.3.5': + resolution: {integrity: sha512-0kkHQixiaFOFYCXP6J8zsvXeq7REf5nucX+BMY8Gy5E6F2BEM2Ap9NsDNaB+wHZ6QqqsItQRvSE4PaYhTtWGIg==} '@signalapp/windows-ucv@1.0.1': resolution: {integrity: sha512-tArRaDzAFXQ6BcYseUtd9bp52/sb5C/zbCIoNkDH+FUoxnZRvX25Fv2HHbH7Xe2+bcdb4+DQmyoUeHMhTUxAmA==} @@ -13935,7 +13935,7 @@ snapshots: transitivePeerDependencies: - supports-color - '@signalapp/sqlcipher@3.2.1': + '@signalapp/sqlcipher@3.3.5': dependencies: node-addon-api: 8.5.0 node-gyp-build: 4.8.4 diff --git a/ts/sql/Server.node.ts b/ts/sql/Server.node.ts index 668a7a613f..63eb7b408e 100644 --- a/ts/sql/Server.node.ts +++ b/ts/sql/Server.node.ts @@ -306,6 +306,7 @@ import { getFilePathsReferencedByMessage } from '../util/messageFilePaths.std.ts import { createMessagesOnInsertTrigger } from './migrations/1500-search-polls.std.ts'; import { isValidPlaintextHash } from '../types/Crypto.std.ts'; import { Emoji } from 
'../axo/emoji.std.ts'; +import { WalCheckpoints } from './WalCheckpoints.std.ts'; const { forEach, @@ -865,6 +866,7 @@ function switchToWAL(db: WritableDB): void { // https://sqlite.org/wal.html db.pragma('journal_mode = WAL'); db.pragma('synchronous = FULL'); + WalCheckpoints.setupCommitHook(db, logger); } function migrateSchemaVersion(db: WritableDB): void { @@ -1025,6 +1027,7 @@ export function initialize({ // Only the first worker gets to upgrade the schema. The rest just folow. if (isPrimary) { updateSchema(db, logger); + WalCheckpoints.setupDeleteTriggers(db, logger); } // test database @@ -1057,6 +1060,10 @@ function closeReadable(db: ReadableDB): void { } function closeWritable(db: WritableDB): void { + // Flush any pending WAL checkpoints before database close + // TODO: Do we need the retry behavior here? + WalCheckpoints.runImmediately(db, logger, 'close'); + // SQLLite documentation suggests that we run `PRAGMA optimize` right // before closing the database connection. db.pragma('optimize'); diff --git a/ts/sql/WalCheckpoints.std.ts b/ts/sql/WalCheckpoints.std.ts new file mode 100644 index 0000000000..93c7e7f06e --- /dev/null +++ b/ts/sql/WalCheckpoints.std.ts @@ -0,0 +1,202 @@ +// Copyright 2026 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only +import type { LoggerType } from '../types/Logging.std.ts'; +import type { ReadableDB, WritableDB } from './Interface.std.ts'; +import * as Errors from '../types/errors.std.ts'; +import { sql } from './util.std.ts'; + +/** + * The default automatic checkpointing behavior of sqlite only checkpoints every 1000 pages + * and on database close, which means that changes can sit in the log for potentially a long time. + * + * To make sure that the WAL is flushed soon after every commit, we call `sqlite3_wal_hook()` + * (which replaces the automatic checkpointing behavior) with our own callback. + * + * We will still run a checkpoint every 1000 pages using TRUNCATE instead of PASSIVE. 
+ * + * But we will also run a checkpoint after every commit, throttled to every 30 seconds. + * + * We also set up TEMP triggers AFTER DELETE on every table, which reschedule + * the next checkpoint to every 5 seconds. + */ +export namespace WalCheckpoints { + const PAGE_THRESHOLD = 1000; + const THROTTLE_MS_AFTER_COMMIT = 30_000; // 30s + const THROTTLE_MS_AFTER_DELETE = 5_000; // 5s + + let onCheckpointNeeded: ((reason: string) => void) | null = null; + + let lastRunAt = 0; + let pendingRunWhenIdle = false; + let hasDeletesSinceLastRun = false; + let scheduledTimer: ReturnType<typeof setTimeout> | null = null; + + export function setOnCheckpointNeeded( + callback: (reason: string) => void + ): void { + onCheckpointNeeded = callback; + } + + /** @testexport */ + export function _reset(): void { + onCheckpointNeeded = null; + if (scheduledTimer != null) { + clearTimeout(scheduledTimer); + scheduledTimer = null; + } + lastRunAt = 0; + pendingRunWhenIdle = false; + hasDeletesSinceLastRun = false; + } + + function run( + db: WritableDB, + logger: LoggerType, + attempts: number, + reason: string, + callback: () => void + ) { + try { + db.pragma('wal_checkpoint(TRUNCATE)'); + callback(); + } catch (error) { + if (error.code !== 'SQLITE_LOCKED') { + logger.error( + `WalCheckpoints.run: Unexpected error (attempts: ${attempts}, reason: ${reason})`, + Errors.toLogFormat(error) + ); + return; + } + + // TODO: Are there any errors that we shouldn't retry? 
+ logger.warn( + `WalCheckpoints.run: Database is locked, retrying (attempts: ${attempts}, reason: ${reason})`, + Errors.toLogFormat(error) + ); + + // TODO: This should probably try again faster with backoff or something + setTimeout(() => { + run(db, logger, attempts + 1, reason, callback); + }, 1000); + } + } + + export function runImmediately( + db: WritableDB, + logger: LoggerType, + reason: string + ): void { + if (scheduledTimer != null) { + clearTimeout(scheduledTimer); + scheduledTimer = null; + } + + run(db, logger, 0, reason, () => { + lastRunAt = Date.now(); + pendingRunWhenIdle = false; + hasDeletesSinceLastRun = false; + }); + } + + function runWhenIdle(logger: LoggerType, reason: string): void { + if (onCheckpointNeeded == null) { + logger.error( + 'WalCheckpoints.runWhenIdle: setOnCheckpointNeeded has not been called' + ); + return; + } + + if (pendingRunWhenIdle) { + return; + } + pendingRunWhenIdle = true; + onCheckpointNeeded(reason); + } + + /** @testexport */ + export function _scheduleRun( + event: 'commit' | 'delete', + logger: LoggerType + ): void { + if (pendingRunWhenIdle) { + return; + } + + const prevScheduledForDelete = hasDeletesSinceLastRun; + const needScheduledForDelete = event === 'delete'; + + if (event === 'delete') { + hasDeletesSinceLastRun = true; + } + + const elapsedMs = Date.now() - lastRunAt; + const throttleMs = hasDeletesSinceLastRun + ? 
THROTTLE_MS_AFTER_DELETE + : THROTTLE_MS_AFTER_COMMIT; + + if (elapsedMs >= throttleMs) { + if (scheduledTimer != null) { + clearTimeout(scheduledTimer); + scheduledTimer = null; + } + runWhenIdle(logger, event); + return; + } + + if (scheduledTimer != null) { + if (prevScheduledForDelete || !needScheduledForDelete) { + return; + } + clearTimeout(scheduledTimer); + } + + scheduledTimer = setTimeout(() => { + scheduledTimer = null; + runWhenIdle(logger, event); + }, throttleMs - elapsedMs); + } + + export function setupCommitHook(db: WritableDB, logger: LoggerType): void { + db.setWalHook((_dbName, pageCount) => { + if (pageCount >= PAGE_THRESHOLD) { + // TODO: Should we run `PRAGMA wal_checkpoint(PASSIVE)` here like automatic checkpoints do? + // We could still call runWhenIdle() to get a TRUNCATE? + runWhenIdle(logger, 'page-threshold'); + } else { + _scheduleRun('commit', logger); + } + }); + } + + function getAllTableNames(db: ReadableDB): ReadonlyArray<string> { + const [query, params] = sql` + SELECT name FROM sqlite_master + WHERE type = 'table' + AND name NOT LIKE 'sqlite_%' + AND name NOT LIKE 'messages_fts_%' + AND sql NOT LIKE 'CREATE VIRTUAL TABLE%' + `; + return db.prepare(query, { pluck: true }).all(params); + } + + export function setupDeleteTriggers( + db: WritableDB, + logger: LoggerType + ): void { + db.createFunction('_wal_checkpoint_on_delete', () => { + _scheduleRun('delete', logger); + }); + + const tableNames = getAllTableNames(db); + + for (const tableName of tableNames) { + db.exec(` + CREATE TEMP TRIGGER IF NOT EXISTS _wal_checkpoint_${tableName}_after_delete + AFTER DELETE ON "${tableName}" + BEGIN + SELECT _wal_checkpoint_on_delete(); + END; + `); + } + } +} diff --git a/ts/sql/main.main.ts b/ts/sql/main.main.ts index 511d7323c2..2e76bedd70 100644 --- a/ts/sql/main.main.ts +++ b/ts/sql/main.main.ts @@ -44,6 +44,10 @@ export type WorkerRequest = Readonly< | { type: 'close' | 'removeDB'; } + | { + type: 'walCheckpoint'; + reason: string; + } | { 
type: 'sqlCall:read'; encoding: 'js'; @@ -96,6 +100,10 @@ export type WrappedWorkerResponse = // oxlint-disable-next-line typescript/no-explicit-any response: any; }> + | Readonly<{ + type: 'walCheckpointNeeded'; + reason: string; + }> | WrappedWorkerLogEntry; type ResponseEntry = { @@ -148,6 +156,8 @@ export class MainSQL { // oxlint-disable-next-line typescript/no-explicit-any readonly #onResponse = new Map>(); + #checkpointPendingReason: string | null = null; + readonly #shouldLogQueryTime: (queryName: string) => boolean; #shouldTrackQueryStats = false; @@ -452,9 +462,28 @@ export class MainSQL { } finally { // oxlint-disable-next-line no-param-reassign entry.load -= 1; + this.#maybeRunCheckpoint(); } } + #maybeRunCheckpoint(): void { + if (!this.#isReady || this.#checkpointPendingReason == null) { + return; + } + + for (const entry of this.#pool) { + if (entry.load !== 0) { + return; + } + } + + const reason = this.#checkpointPendingReason; + this.#checkpointPendingReason = null; + const primary = this.#pool[0]; + strictAssert(primary, 'Missing primary'); + void this.#send(primary, { type: 'walCheckpoint', reason }); + } + async #terminate(request: WorkerRequest): Promise { const primary = this.#pool[0]; strictAssert(primary, 'Missing primary'); @@ -564,6 +593,12 @@ export class MainSQL { return; } + if (wrappedResponse.type === 'walCheckpointNeeded') { + this.#checkpointPendingReason = wrappedResponse.reason; + this.#maybeRunCheckpoint(); + return; + } + const { seq, error, errorKind, response } = wrappedResponse; const entry = this.#onResponse.get(seq); diff --git a/ts/sql/mainWorker.node.ts b/ts/sql/mainWorker.node.ts index 414631c141..ae6c487817 100644 --- a/ts/sql/mainWorker.node.ts +++ b/ts/sql/mainWorker.node.ts @@ -12,6 +12,7 @@ import type { WritableDB } from './Interface.std.ts'; import { initialize, DataReader, DataWriter, removeDB } from './Server.node.ts'; import { SqliteErrorKind, parseSqliteError } from './errors.std.ts'; import { sqlLogger as 
logger } from './sqlLogger.node.ts'; +import { WalCheckpoints } from './WalCheckpoints.std.ts'; if (!parentPort) { throw new Error('Must run as a worker thread'); @@ -43,6 +44,17 @@ const onMessage = ( if (request.type === 'init') { isPrimary = request.isPrimary; isRemoved = false; + + if (isPrimary) { + WalCheckpoints.setOnCheckpointNeeded(reason => { + const message: WrappedWorkerResponse = { + type: 'walCheckpointNeeded', + reason, + }; + port.postMessage(message); + }); + } + db = initialize({ ...request.options, isPrimary, @@ -101,6 +113,14 @@ const onMessage = ( return; } + if (request.type === 'walCheckpoint') { + if (db != null) { + WalCheckpoints.runImmediately(db, logger, request.reason); + } + respond(seq, undefined); + return; + } + if (request.type === 'sqlCall:read' || request.type === 'sqlCall:write') { const DataInterface = request.type === 'sqlCall:read' ? DataReader : DataWriter; diff --git a/ts/test-node/sql/WalCheckpoints_test.std.ts b/ts/test-node/sql/WalCheckpoints_test.std.ts new file mode 100644 index 0000000000..884aeba1c2 --- /dev/null +++ b/ts/test-node/sql/WalCheckpoints_test.std.ts @@ -0,0 +1,173 @@ +// Copyright 2026 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import * as sinon from 'sinon'; +import type { LoggerType } from '../../types/Logging.std.ts'; +import { WalCheckpoints } from '../../sql/WalCheckpoints.std.ts'; + +const logger: LoggerType = { + warn: () => null, + error: () => null, + fatal: () => null, + info: () => null, + debug: () => null, + trace: () => null, + child: () => logger, +}; + +describe('WalCheckpoints', () => { + let sandbox: sinon.SinonSandbox; + + beforeEach(() => { + sandbox = sinon.createSandbox({ useFakeTimers: true }); + WalCheckpoints._reset(); + }); + + afterEach(() => { + WalCheckpoints._reset(); + sandbox.restore(); + }); + + describe('_scheduleRun (commit)', () => { + it('schedules a timer for 30s on first commit', () => { + const pragma = sinon.stub(); + 
WalCheckpoints.setOnCheckpointNeeded(pragma); + + WalCheckpoints._scheduleRun('commit', logger); + sinon.assert.notCalled(pragma); + + sandbox.clock.tick(29_999); + sinon.assert.notCalled(pragma); + + sandbox.clock.tick(1); + sinon.assert.calledOnce(pragma); + }); + + it('runs immediately when elapsed >= 30s', async () => { + const pragma = sinon.stub(); + WalCheckpoints.setOnCheckpointNeeded(pragma); + + await sandbox.clock.tickAsync(30_000); + + WalCheckpoints._scheduleRun('commit', logger); + sinon.assert.calledOnce(pragma); + }); + + it('does not reschedule when a commit timer is already pending', () => { + const pragma = sinon.stub(); + WalCheckpoints.setOnCheckpointNeeded(pragma); + + WalCheckpoints._scheduleRun('commit', logger); + + sandbox.clock.tick(29_999); + WalCheckpoints._scheduleRun('commit', logger); + sinon.assert.notCalled(pragma); + + sandbox.clock.tick(1); + // Timer fires exactly once, not twice + sinon.assert.calledOnce(pragma); + }); + }); + + describe('_scheduleRun (delete)', () => { + it('schedules a timer for 5s on first delete', () => { + const pragma = sinon.stub(); + WalCheckpoints.setOnCheckpointNeeded(pragma); + + WalCheckpoints._scheduleRun('delete', logger); + sinon.assert.notCalled(pragma); + + sandbox.clock.tick(4_999); + sinon.assert.notCalled(pragma); + + sandbox.clock.tick(1); + sinon.assert.calledOnce(pragma); + }); + + it('runs immediately when elapsed >= 5s', async () => { + const pragma = sinon.stub(); + WalCheckpoints.setOnCheckpointNeeded(pragma); + + await sandbox.clock.tickAsync(5_000); + + WalCheckpoints._scheduleRun('delete', logger); + sinon.assert.calledOnce(pragma); + }); + + it('does not reschedule when a delete timer is already pending', () => { + const pragma = sinon.stub(); + WalCheckpoints.setOnCheckpointNeeded(pragma); + + WalCheckpoints._scheduleRun('delete', logger); + + sandbox.clock.tick(4_999); + WalCheckpoints._scheduleRun('delete', logger); + sinon.assert.notCalled(pragma); + + 
sandbox.clock.tick(5_000); + sinon.assert.calledOnce(pragma); + }); + + it('does not reschedule when a commit timer was upgraded to delete', () => { + const pragma = sinon.stub(); + WalCheckpoints.setOnCheckpointNeeded(pragma); + + WalCheckpoints._scheduleRun('delete', logger); + + sandbox.clock.tick(4_999); + WalCheckpoints._scheduleRun('commit', logger); + sinon.assert.notCalled(pragma); + + sandbox.clock.tick(1); + sinon.assert.calledOnce(pragma); + }); + }); + + describe('delete upgrading a pending commit timer', () => { + it('fires immediately if already past 5s', () => { + const pragma = sinon.stub(); + WalCheckpoints.setOnCheckpointNeeded(pragma); + + WalCheckpoints._scheduleRun('commit', logger); + + sandbox.clock.tick(5_000); + sinon.assert.notCalled(pragma); + + WalCheckpoints._scheduleRun('delete', logger); + sinon.assert.calledOnce(pragma); + }); + + it('replaces a 30s commit timer with a shorter 5s timer', () => { + const pragma = sinon.stub(); + WalCheckpoints.setOnCheckpointNeeded(pragma); + + WalCheckpoints._scheduleRun('commit', logger); + + sandbox.clock.tick(4_999); + sinon.assert.notCalled(pragma); + + WalCheckpoints._scheduleRun('delete', logger); + sinon.assert.notCalled(pragma); + + sandbox.clock.tick(1); + sinon.assert.calledOnce(pragma); + }); + + it('does not fire the original 30s commit timer after replacement', () => { + const pragma = sinon.stub(); + WalCheckpoints.setOnCheckpointNeeded(pragma); + + WalCheckpoints._scheduleRun('commit', logger); + + sandbox.clock.tick(29_999); + WalCheckpoints._scheduleRun('delete', logger); + sinon.assert.calledOnce(pragma); + + sandbox.clock.tick(1); + sinon.assert.calledOnce(pragma); + + sandbox.clock.tick(1); + sinon.assert.calledOnce(pragma); + }); + }); +});