Setup scheduled WAL checkpoints

Co-authored-by: Jamie <113370520+jamiebuilds-signal@users.noreply.github.com>
This commit is contained in:
automated-signal
2026-05-27 15:51:29 -05:00
committed by GitHub
parent 0268716856
commit b4d6423b3f
7 changed files with 443 additions and 6 deletions
+1 -1
View File
@@ -122,7 +122,7 @@
"@signalapp/libsignal-client": "0.94.1",
"@signalapp/mute-state-change": "workspace:1.0.0",
"@signalapp/ringrtc": "2.69.0",
"@signalapp/sqlcipher": "3.2.1",
"@signalapp/sqlcipher": "3.3.5",
"@signalapp/windows-ucv": "1.0.1",
"google-libphonenumber": "3.2.44"
},
+5 -5
View File
@@ -79,8 +79,8 @@ importers:
specifier: 2.69.0
version: 2.69.0
'@signalapp/sqlcipher':
specifier: 3.2.1
version: 3.2.1
specifier: 3.3.5
version: 3.3.5
'@signalapp/windows-ucv':
specifier: 1.0.1
version: 1.0.1
@@ -4042,8 +4042,8 @@ packages:
resolution: {integrity: sha512-yugpBTyFvmeV9//DdCIj1c1pc23GdLsRR0jHsfaDKVyDKvaa1JQ7aa2xjUZjSZNwqQK+X2y6EcUSErr0Hba4Ww==}
hasBin: true
'@signalapp/sqlcipher@3.2.1':
resolution: {integrity: sha512-QStHnLZiYp7YvdR6CnDO7TxdoSJQCeF02UxkyJE7+l/qES3wvwkc1IuAxSYCZEI8gS4PUBTYXSX70yod7PS9Mg==}
'@signalapp/sqlcipher@3.3.5':
resolution: {integrity: sha512-0kkHQixiaFOFYCXP6J8zsvXeq7REf5nucX+BMY8Gy5E6F2BEM2Ap9NsDNaB+wHZ6QqqsItQRvSE4PaYhTtWGIg==}
'@signalapp/windows-ucv@1.0.1':
resolution: {integrity: sha512-tArRaDzAFXQ6BcYseUtd9bp52/sb5C/zbCIoNkDH+FUoxnZRvX25Fv2HHbH7Xe2+bcdb4+DQmyoUeHMhTUxAmA==}
@@ -13935,7 +13935,7 @@ snapshots:
transitivePeerDependencies:
- supports-color
'@signalapp/sqlcipher@3.2.1':
'@signalapp/sqlcipher@3.3.5':
dependencies:
node-addon-api: 8.5.0
node-gyp-build: 4.8.4
+7
View File
@@ -306,6 +306,7 @@ import { getFilePathsReferencedByMessage } from '../util/messageFilePaths.std.ts
import { createMessagesOnInsertTrigger } from './migrations/1500-search-polls.std.ts';
import { isValidPlaintextHash } from '../types/Crypto.std.ts';
import { Emoji } from '../axo/emoji.std.ts';
import { WalCheckpoints } from './WalCheckpoints.std.ts';
const {
forEach,
@@ -865,6 +866,7 @@ function switchToWAL(db: WritableDB): void {
// https://sqlite.org/wal.html
db.pragma('journal_mode = WAL');
db.pragma('synchronous = FULL');
WalCheckpoints.setupCommitHook(db, logger);
}
function migrateSchemaVersion(db: WritableDB): void {
@@ -1025,6 +1027,7 @@ export function initialize({
// Only the first worker gets to upgrade the schema. The rest just folow.
if (isPrimary) {
updateSchema(db, logger);
WalCheckpoints.setupDeleteTriggers(db, logger);
}
// test database
@@ -1057,6 +1060,10 @@ function closeReadable(db: ReadableDB): void {
}
function closeWritable(db: WritableDB): void {
// Flush any pending WAL checkpoints before database close
// TODO: Do we need the retry behavior here?
WalCheckpoints.runImmediately(db, logger, 'close');
// SQLLite documentation suggests that we run `PRAGMA optimize` right
// before closing the database connection.
db.pragma('optimize');
+202
View File
@@ -0,0 +1,202 @@
// Copyright 2026 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import type { LoggerType } from '../types/Logging.std.ts';
import type { ReadableDB, WritableDB } from './Interface.std.ts';
import * as Errors from '../types/errors.std.ts';
import { sql } from './util.std.ts';
/**
* The default automatic checkpointing behavior of sqlite only checkpoints every 1000 pages
* and on database close. Which means that changes can sit in the log for potentially a long time.
*
* To make sure that the WAL is flushed soon after every commit, we call `sqlite3_wal_hook()`
* (which replaces the automatic checkpointing behavior) with our own callback.
*
* We will still run a checkpoint every 1000 pages using TRUNCATE instead of PASSIVE.
*
* But we will also run a checkpoint after every commit, throttled to every 30 seconds.
*
* We also setup TEMP trigger's AFTER DELETE on every table, which reschedules
* the next checkpoint to every 5 seconds.
*/
export namespace WalCheckpoints {
const PAGE_THRESHOLD = 1000;
const THROTTLE_MS_AFTER_COMMIT = 30_000; // 30s
const THROTTLE_MS_AFTER_DELETE = 5_000; // 5s
let onCheckpointNeeded: ((reason: string) => void) | null = null;
let lastRunAt = 0;
let pendingRunWhenIdle = false;
let hasDeletesSinceLastRun = false;
let scheduledTimer: ReturnType<typeof setTimeout> | null = null;
export function setOnCheckpointNeeded(
callback: (reason: string) => void
): void {
onCheckpointNeeded = callback;
}
/** @testexport */
export function _reset(): void {
onCheckpointNeeded = null;
if (scheduledTimer != null) {
clearTimeout(scheduledTimer);
scheduledTimer = null;
}
lastRunAt = 0;
pendingRunWhenIdle = false;
hasDeletesSinceLastRun = false;
}
function run(
db: WritableDB,
logger: LoggerType,
attempts: number,
reason: string,
callback: () => void
) {
try {
db.pragma('wal_checkpoint(TRUNCATE)');
callback();
} catch (error) {
if (error.code !== 'SQLITE_LOCKED') {
logger.error(
`WalCheckpoints.run: Unexpected error (attempts: ${attempts}, reason: ${reason})`,
Errors.toLogFormat(error)
);
return;
}
// TODO: Are there any errors that we shouldn't retry?
logger.warn(
`WalCheckpoints.run: Database is locked, retrying (attempts: ${attempts}, reason: ${reason})`,
Errors.toLogFormat(error)
);
// TODO: This should probably try again faster with backoff or something
setTimeout(() => {
run(db, logger, attempts + 1, reason, callback);
}, 1000);
}
}
export function runImmediately(
db: WritableDB,
logger: LoggerType,
reason: string
): void {
if (scheduledTimer != null) {
clearTimeout(scheduledTimer);
scheduledTimer = null;
}
run(db, logger, 0, reason, () => {
lastRunAt = Date.now();
pendingRunWhenIdle = false;
hasDeletesSinceLastRun = false;
});
}
function runWhenIdle(logger: LoggerType, reason: string): void {
if (onCheckpointNeeded == null) {
logger.error(
'WalCheckpoints.runWhenIdle: setOnCheckpointNeeded has not been called'
);
return;
}
if (pendingRunWhenIdle) {
return;
}
pendingRunWhenIdle = true;
onCheckpointNeeded(reason);
}
/** @testexport */
export function _scheduleRun(
event: 'commit' | 'delete',
logger: LoggerType
): void {
if (pendingRunWhenIdle) {
return;
}
const prevScheduledForDelete = hasDeletesSinceLastRun;
const needScheduledForDelete = event === 'delete';
if (event === 'delete') {
hasDeletesSinceLastRun = true;
}
const elapsedMs = Date.now() - lastRunAt;
const throttleMs = hasDeletesSinceLastRun
? THROTTLE_MS_AFTER_DELETE
: THROTTLE_MS_AFTER_COMMIT;
if (elapsedMs >= throttleMs) {
if (scheduledTimer != null) {
clearTimeout(scheduledTimer);
scheduledTimer = null;
}
runWhenIdle(logger, event);
return;
}
if (scheduledTimer != null) {
if (prevScheduledForDelete || !needScheduledForDelete) {
return;
}
clearTimeout(scheduledTimer);
}
scheduledTimer = setTimeout(() => {
scheduledTimer = null;
runWhenIdle(logger, event);
}, throttleMs - elapsedMs);
}
export function setupCommitHook(db: WritableDB, logger: LoggerType): void {
db.setWalHook((_dbName, pageCount) => {
if (pageCount >= PAGE_THRESHOLD) {
// TODO: Should we run `PRAGMA wal_checkpoint(PASSIVE)` here like automatic checkpoints do?
// We could still call runWhenIdle() to get a TRUNCATE?
runWhenIdle(logger, 'page-threshold');
} else {
_scheduleRun('commit', logger);
}
});
}
function getAllTableNames(db: ReadableDB): ReadonlyArray<string> {
const [query, params] = sql`
SELECT name FROM sqlite_master
WHERE type = 'table'
AND name NOT LIKE 'sqlite_%'
AND name NOT LIKE 'messages_fts_%'
AND sql NOT LIKE 'CREATE VIRTUAL TABLE%'
`;
return db.prepare(query, { pluck: true }).all(params);
}
export function setupDeleteTriggers(
db: WritableDB,
logger: LoggerType
): void {
db.createFunction('_wal_checkpoint_on_delete', () => {
_scheduleRun('delete', logger);
});
const tableNames = getAllTableNames(db);
for (const tableName of tableNames) {
db.exec(`
CREATE TEMP TRIGGER IF NOT EXISTS _wal_checkpoint_${tableName}_after_delete
AFTER DELETE ON "${tableName}"
BEGIN
SELECT _wal_checkpoint_on_delete();
END;
`);
}
}
}
+35
View File
@@ -44,6 +44,10 @@ export type WorkerRequest = Readonly<
| {
type: 'close' | 'removeDB';
}
| {
type: 'walCheckpoint';
reason: string;
}
| {
type: 'sqlCall:read';
encoding: 'js';
@@ -96,6 +100,10 @@ export type WrappedWorkerResponse =
// oxlint-disable-next-line typescript/no-explicit-any
response: any;
}>
| Readonly<{
type: 'walCheckpointNeeded';
reason: string;
}>
| WrappedWorkerLogEntry;
type ResponseEntry<T> = {
@@ -148,6 +156,8 @@ export class MainSQL {
// oxlint-disable-next-line typescript/no-explicit-any
readonly #onResponse = new Map<number, ResponseEntry<any>>();
#checkpointPendingReason: string | null = null;
readonly #shouldLogQueryTime: (queryName: string) => boolean;
#shouldTrackQueryStats = false;
@@ -452,9 +462,28 @@ export class MainSQL {
} finally {
// oxlint-disable-next-line no-param-reassign
entry.load -= 1;
this.#maybeRunCheckpoint();
}
}
#maybeRunCheckpoint(): void {
if (!this.#isReady || this.#checkpointPendingReason == null) {
return;
}
for (const entry of this.#pool) {
if (entry.load !== 0) {
return;
}
}
const reason = this.#checkpointPendingReason;
this.#checkpointPendingReason = null;
const primary = this.#pool[0];
strictAssert(primary, 'Missing primary');
void this.#send(primary, { type: 'walCheckpoint', reason });
}
async #terminate(request: WorkerRequest): Promise<void> {
const primary = this.#pool[0];
strictAssert(primary, 'Missing primary');
@@ -564,6 +593,12 @@ export class MainSQL {
return;
}
if (wrappedResponse.type === 'walCheckpointNeeded') {
this.#checkpointPendingReason = wrappedResponse.reason;
this.#maybeRunCheckpoint();
return;
}
const { seq, error, errorKind, response } = wrappedResponse;
const entry = this.#onResponse.get(seq);
+20
View File
@@ -12,6 +12,7 @@ import type { WritableDB } from './Interface.std.ts';
import { initialize, DataReader, DataWriter, removeDB } from './Server.node.ts';
import { SqliteErrorKind, parseSqliteError } from './errors.std.ts';
import { sqlLogger as logger } from './sqlLogger.node.ts';
import { WalCheckpoints } from './WalCheckpoints.std.ts';
if (!parentPort) {
throw new Error('Must run as a worker thread');
@@ -43,6 +44,17 @@ const onMessage = (
if (request.type === 'init') {
isPrimary = request.isPrimary;
isRemoved = false;
if (isPrimary) {
WalCheckpoints.setOnCheckpointNeeded(reason => {
const message: WrappedWorkerResponse = {
type: 'walCheckpointNeeded',
reason,
};
port.postMessage(message);
});
}
db = initialize({
...request.options,
isPrimary,
@@ -101,6 +113,14 @@ const onMessage = (
return;
}
if (request.type === 'walCheckpoint') {
if (db != null) {
WalCheckpoints.runImmediately(db, logger, request.reason);
}
respond(seq, undefined);
return;
}
if (request.type === 'sqlCall:read' || request.type === 'sqlCall:write') {
const DataInterface =
request.type === 'sqlCall:read' ? DataReader : DataWriter;
+173
View File
@@ -0,0 +1,173 @@
// Copyright 2026 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import * as sinon from 'sinon';
import type { LoggerType } from '../../types/Logging.std.ts';
import { WalCheckpoints } from '../../sql/WalCheckpoints.std.ts';
const logger: LoggerType = {
warn: () => null,
error: () => null,
fatal: () => null,
info: () => null,
debug: () => null,
trace: () => null,
child: () => logger,
};
describe('WalCheckpoints', () => {
let sandbox: sinon.SinonSandbox;
beforeEach(() => {
sandbox = sinon.createSandbox({ useFakeTimers: true });
WalCheckpoints._reset();
});
afterEach(() => {
WalCheckpoints._reset();
sandbox.restore();
});
describe('_scheduleRun (commit)', () => {
it('schedules a timer for 30s on first commit', () => {
const pragma = sinon.stub();
WalCheckpoints.setOnCheckpointNeeded(pragma);
WalCheckpoints._scheduleRun('commit', logger);
sinon.assert.notCalled(pragma);
sandbox.clock.tick(29_999);
sinon.assert.notCalled(pragma);
sandbox.clock.tick(1);
sinon.assert.calledOnce(pragma);
});
it('runs immediately when elapsed >= 30s', async () => {
const pragma = sinon.stub();
WalCheckpoints.setOnCheckpointNeeded(pragma);
await sandbox.clock.tickAsync(30_000);
WalCheckpoints._scheduleRun('commit', logger);
sinon.assert.calledOnce(pragma);
});
it('does not reschedule when a commit timer is already pending', () => {
const pragma = sinon.stub();
WalCheckpoints.setOnCheckpointNeeded(pragma);
WalCheckpoints._scheduleRun('commit', logger);
sandbox.clock.tick(29_999);
WalCheckpoints._scheduleRun('commit', logger);
sinon.assert.notCalled(pragma);
sandbox.clock.tick(1);
// Timer fires exactly once, not twice
sinon.assert.calledOnce(pragma);
});
});
describe('_scheduleRun (delete)', () => {
it('schedules a timer for 5s on first delete', () => {
const pragma = sinon.stub();
WalCheckpoints.setOnCheckpointNeeded(pragma);
WalCheckpoints._scheduleRun('delete', logger);
sinon.assert.notCalled(pragma);
sandbox.clock.tick(4_999);
sinon.assert.notCalled(pragma);
sandbox.clock.tick(1);
sinon.assert.calledOnce(pragma);
});
it('runs immediately when elapsed >= 5s', async () => {
const pragma = sinon.stub();
WalCheckpoints.setOnCheckpointNeeded(pragma);
await sandbox.clock.tickAsync(5_000);
WalCheckpoints._scheduleRun('delete', logger);
sinon.assert.calledOnce(pragma);
});
it('does not reschedule when a delete timer is already pending', () => {
const pragma = sinon.stub();
WalCheckpoints.setOnCheckpointNeeded(pragma);
WalCheckpoints._scheduleRun('delete', logger);
sandbox.clock.tick(4_999);
WalCheckpoints._scheduleRun('delete', logger);
sinon.assert.notCalled(pragma);
sandbox.clock.tick(5_000);
sinon.assert.calledOnce(pragma);
});
it('does not reschedule when a commit timer was upgraded to delete', () => {
const pragma = sinon.stub();
WalCheckpoints.setOnCheckpointNeeded(pragma);
WalCheckpoints._scheduleRun('delete', logger);
sandbox.clock.tick(4_999);
WalCheckpoints._scheduleRun('commit', logger);
sinon.assert.notCalled(pragma);
sandbox.clock.tick(1);
sinon.assert.calledOnce(pragma);
});
});
describe('delete upgrading a pending commit timer', () => {
it('fires immediately if already past 5s', () => {
const pragma = sinon.stub();
WalCheckpoints.setOnCheckpointNeeded(pragma);
WalCheckpoints._scheduleRun('commit', logger);
sandbox.clock.tick(5_000);
sinon.assert.notCalled(pragma);
WalCheckpoints._scheduleRun('delete', logger);
sinon.assert.calledOnce(pragma);
});
it('replaces a 30s commit timer with a shorter 5s timer', () => {
const pragma = sinon.stub();
WalCheckpoints.setOnCheckpointNeeded(pragma);
WalCheckpoints._scheduleRun('commit', logger);
sandbox.clock.tick(4_999);
sinon.assert.notCalled(pragma);
WalCheckpoints._scheduleRun('delete', logger);
sinon.assert.notCalled(pragma);
sandbox.clock.tick(1);
sinon.assert.calledOnce(pragma);
});
it('does not fire the original 30s commit timer after replacement', () => {
const pragma = sinon.stub();
WalCheckpoints.setOnCheckpointNeeded(pragma);
WalCheckpoints._scheduleRun('commit', logger);
sandbox.clock.tick(29_999);
WalCheckpoints._scheduleRun('delete', logger);
sinon.assert.calledOnce(pragma);
sandbox.clock.tick(1);
sinon.assert.calledOnce(pragma);
sandbox.clock.tick(1);
sinon.assert.calledOnce(pragma);
});
});
});