Serialize sql args/results

This commit is contained in:
Fedor Indutny
2026-03-24 12:13:39 -07:00
committed by GitHub
parent c6e9f5668a
commit 97cf9a90fb
4 changed files with 98 additions and 29 deletions

View File

@@ -10,8 +10,8 @@ import { remove as removeEphemeralConfig } from './ephemeral_config.main.js';
let sql:
| Pick<
MainSQL,
| 'sqlRead'
| 'sqlWrite'
| 'sqlReadSerialized'
| 'sqlWriteSerialized'
| 'pauseWriteAccess'
| 'resumeWriteAccess'
| 'removeDB'
@@ -57,21 +57,21 @@ export function initialize(mainSQL: typeof sql): void {
ipcMain.handle(
SQL_READ_KEY,
wrapResult(function ipcSqlReadHandler(_event, callName, ...args) {
wrapResult(function ipcSqlReadHandler(_event, callName, serialized) {
if (!sql) {
throw new Error(`${SQL_READ_KEY}: Not yet initialized!`);
}
return sql.sqlRead(callName, ...args);
return sql.sqlReadSerialized(callName, serialized);
})
);
ipcMain.handle(
SQL_WRITE_KEY,
wrapResult(function ipcSqlWriteHandler(_event, callName, ...args) {
wrapResult(function ipcSqlWriteHandler(_event, callName, serialized) {
if (!sql) {
throw new Error(`${SQL_WRITE_KEY}: Not yet initialized!`);
}
return sql.sqlWrite(callName, ...args);
return sql.sqlWriteSerialized(callName, serialized);
})
);

View File

@@ -2,6 +2,7 @@
// SPDX-License-Identifier: AGPL-3.0-only
import { ipcRenderer } from 'electron';
import { serialize, deserialize } from 'node:v8';
import { createLogger } from '../logging/log.std.js';
import { runTaskWithTimeout } from '../textsecure/TaskWithTimeout.std.js';
import { explodePromise } from '../util/explodePromise.std.js';
@@ -47,11 +48,11 @@ export async function ipcInvoke<T>(
activeJobCount += 1;
return runTaskWithTimeout(async () => {
try {
const result = await ipcRenderer.invoke(channel, name, ...args);
const result = await ipcRenderer.invoke(channel, name, serialize(args));
if (!result.ok) {
throw result.error;
}
return result.value;
return deserialize(result.value);
} finally {
activeJobCount -= 1;
if (activeJobCount === 0) {

View File

@@ -45,14 +45,28 @@ export type WorkerRequest = Readonly<
}
| {
type: 'sqlCall:read';
encoding: 'js';
method: keyof ServerReadableDirectInterface;
args: ReadonlyArray<unknown>;
}
| {
type: 'sqlCall:read';
encoding: 'serialized';
method: keyof ServerReadableDirectInterface;
data: Uint8Array<ArrayBuffer>;
}
| {
type: 'sqlCall:write';
encoding: 'js';
method: keyof ServerWritableDirectInterface;
args: ReadonlyArray<unknown>;
}
| {
type: 'sqlCall:write';
encoding: 'serialized';
method: keyof ServerWritableDirectInterface;
data: Uint8Array<ArrayBuffer>;
}
>;
export type WrappedWorkerRequest = Readonly<{
@@ -272,12 +286,37 @@ export class MainSQL {
method: Method,
...args: Parameters<ServerReadableDirectInterface[Method]>
): Promise<ReturnType<ServerReadableDirectInterface[Method]>> {
return this.#sqlRead({
type: 'sqlCall:read',
method,
encoding: 'js',
args,
}) as Promise<ReturnType<ServerReadableDirectInterface[Method]>>;
}
public async sqlReadSerialized<
Method extends keyof ServerReadableDirectInterface,
>(
method: Method,
data: Uint8Array<ArrayBuffer>
): Promise<Uint8Array<ArrayBuffer>> {
return this.#sqlRead({
type: 'sqlCall:read',
method,
encoding: 'serialized',
data,
}) as Promise<Uint8Array<ArrayBuffer>>;
}
async #sqlRead(
request: Extract<WorkerRequest, { type: 'sqlCall:read' }>
): Promise<unknown> {
type SqlCallResult = Readonly<{
result: ReturnType<ServerReadableDirectInterface[Method]>;
result: unknown;
duration: number;
}>;
if (method === 'pageBackupMessages' && this.#pauseWaiters == null) {
if (request.method === 'pageBackupMessages' && this.#pauseWaiters == null) {
throw new Error(
'pageBackupMessages can only run after pauseWriteAccess()'
);
@@ -286,18 +325,17 @@ export class MainSQL {
// pageMessages runs over several queries and needs to have access to
// the same temporary table, it also creates temporary insert/update
// triggers so it has to run on the same connection that updates the tables
const isPaging = PAGING_QUERIES.has(method);
const isPaging = PAGING_QUERIES.has(request.method);
const entry = isPaging ? this.#pool[0] : this.#getWorker();
strictAssert(entry != null, 'Must have a pool entry');
const { result, duration } = await this.#send<SqlCallResult>(entry, {
type: 'sqlCall:read',
method,
args,
});
const { result, duration } = await this.#send<SqlCallResult>(
entry,
request
);
this.#traceDuration(method, duration);
this.#traceDuration(request.method, duration);
return result;
}
@@ -306,9 +344,33 @@ export class MainSQL {
method: Method,
...args: Parameters<ServerWritableDirectInterface[Method]>
): Promise<ReturnType<ServerWritableDirectInterface[Method]>> {
type Result = ReturnType<ServerWritableDirectInterface[Method]>;
return this.#sqlWrite({
type: 'sqlCall:write',
method,
encoding: 'js',
args,
}) as Promise<ReturnType<ServerWritableDirectInterface[Method]>>;
}
public async sqlWriteSerialized<
Method extends keyof ServerWritableDirectInterface,
>(
method: Method,
data: Uint8Array<ArrayBuffer>
): Promise<Uint8Array<ArrayBuffer>> {
return this.#sqlWrite({
type: 'sqlCall:write',
method,
encoding: 'serialized',
data,
}) as Promise<Uint8Array<ArrayBuffer>>;
}
async #sqlWrite(
request: Extract<WorkerRequest, { type: 'sqlCall:write' }>
): Promise<unknown> {
type SqlCallResult = Readonly<{
result: Result;
result: unknown;
duration: number;
}>;
@@ -322,13 +384,12 @@ export class MainSQL {
const primary = this.#pool[0];
strictAssert(primary, 'Missing primary');
const { result, duration } = await this.#send<SqlCallResult>(primary, {
type: 'sqlCall:write',
method,
args,
});
const { result, duration } = await this.#send<SqlCallResult>(
primary,
request
);
this.#traceDuration(method, duration);
this.#traceDuration(request.method, duration);
return result;
}

View File

@@ -2,6 +2,7 @@
// SPDX-License-Identifier: AGPL-3.0-only
import { parentPort } from 'node:worker_threads';
import { serialize, deserialize } from 'node:v8';
import type {
WrappedWorkerRequest,
@@ -109,11 +110,17 @@ const onMessage = (
throw new Error(`Invalid sql method: ${request.method} ${method}`);
}
const start = performance.now();
const result = method(db, ...request.args);
const end = performance.now();
const args =
request.encoding === 'js' ? request.args : deserialize(request.data);
respond(seq, { result, duration: end - start });
const start = performance.now();
const result = method(db, ...args);
const duration = performance.now() - start;
respond(seq, {
result: request.encoding === 'js' ? result : serialize(result),
duration,
});
} else {
throw new Error('Unexpected request type');
}