mirror of
https://github.com/signalapp/Signal-Desktop.git
synced 2026-04-02 00:07:56 +01:00
Serialize sql args/results
Co-authored-by: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com>
This commit is contained in:
@@ -10,8 +10,8 @@ import { remove as removeEphemeralConfig } from './ephemeral_config.main.js';
|
||||
let sql:
|
||||
| Pick<
|
||||
MainSQL,
|
||||
| 'sqlRead'
|
||||
| 'sqlWrite'
|
||||
| 'sqlReadSerialized'
|
||||
| 'sqlWriteSerialized'
|
||||
| 'pauseWriteAccess'
|
||||
| 'resumeWriteAccess'
|
||||
| 'removeDB'
|
||||
@@ -57,21 +57,21 @@ export function initialize(mainSQL: typeof sql): void {
|
||||
|
||||
ipcMain.handle(
|
||||
SQL_READ_KEY,
|
||||
wrapResult(function ipcSqlReadHandler(_event, callName, ...args) {
|
||||
wrapResult(function ipcSqlReadHandler(_event, callName, serialized) {
|
||||
if (!sql) {
|
||||
throw new Error(`${SQL_READ_KEY}: Not yet initialized!`);
|
||||
}
|
||||
return sql.sqlRead(callName, ...args);
|
||||
return sql.sqlReadSerialized(callName, serialized);
|
||||
})
|
||||
);
|
||||
|
||||
ipcMain.handle(
|
||||
SQL_WRITE_KEY,
|
||||
wrapResult(function ipcSqlWriteHandler(_event, callName, ...args) {
|
||||
wrapResult(function ipcSqlWriteHandler(_event, callName, serialized) {
|
||||
if (!sql) {
|
||||
throw new Error(`${SQL_WRITE_KEY}: Not yet initialized!`);
|
||||
}
|
||||
return sql.sqlWrite(callName, ...args);
|
||||
return sql.sqlWriteSerialized(callName, serialized);
|
||||
})
|
||||
);
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import { ipcRenderer } from 'electron';
|
||||
import { serialize, deserialize } from 'node:v8';
|
||||
import { createLogger } from '../logging/log.std.js';
|
||||
import { runTaskWithTimeout } from '../textsecure/TaskWithTimeout.std.js';
|
||||
import { explodePromise } from '../util/explodePromise.std.js';
|
||||
@@ -47,11 +48,11 @@ export async function ipcInvoke<T>(
|
||||
activeJobCount += 1;
|
||||
return runTaskWithTimeout(async () => {
|
||||
try {
|
||||
const result = await ipcRenderer.invoke(channel, name, ...args);
|
||||
const result = await ipcRenderer.invoke(channel, name, serialize(args));
|
||||
if (!result.ok) {
|
||||
throw result.error;
|
||||
}
|
||||
return result.value;
|
||||
return deserialize(result.value);
|
||||
} finally {
|
||||
activeJobCount -= 1;
|
||||
if (activeJobCount === 0) {
|
||||
|
||||
@@ -45,14 +45,28 @@ export type WorkerRequest = Readonly<
|
||||
}
|
||||
| {
|
||||
type: 'sqlCall:read';
|
||||
encoding: 'js';
|
||||
method: keyof ServerReadableDirectInterface;
|
||||
args: ReadonlyArray<unknown>;
|
||||
}
|
||||
| {
|
||||
type: 'sqlCall:read';
|
||||
encoding: 'serialized';
|
||||
method: keyof ServerReadableDirectInterface;
|
||||
data: Uint8Array<ArrayBuffer>;
|
||||
}
|
||||
| {
|
||||
type: 'sqlCall:write';
|
||||
encoding: 'js';
|
||||
method: keyof ServerWritableDirectInterface;
|
||||
args: ReadonlyArray<unknown>;
|
||||
}
|
||||
| {
|
||||
type: 'sqlCall:write';
|
||||
encoding: 'serialized';
|
||||
method: keyof ServerWritableDirectInterface;
|
||||
data: Uint8Array<ArrayBuffer>;
|
||||
}
|
||||
>;
|
||||
|
||||
export type WrappedWorkerRequest = Readonly<{
|
||||
@@ -272,12 +286,37 @@ export class MainSQL {
|
||||
method: Method,
|
||||
...args: Parameters<ServerReadableDirectInterface[Method]>
|
||||
): Promise<ReturnType<ServerReadableDirectInterface[Method]>> {
|
||||
return this.#sqlRead({
|
||||
type: 'sqlCall:read',
|
||||
method,
|
||||
encoding: 'js',
|
||||
args,
|
||||
}) as Promise<ReturnType<ServerReadableDirectInterface[Method]>>;
|
||||
}
|
||||
|
||||
public async sqlReadSerialized<
|
||||
Method extends keyof ServerReadableDirectInterface,
|
||||
>(
|
||||
method: Method,
|
||||
data: Uint8Array<ArrayBuffer>
|
||||
): Promise<Uint8Array<ArrayBuffer>> {
|
||||
return this.#sqlRead({
|
||||
type: 'sqlCall:read',
|
||||
method,
|
||||
encoding: 'serialized',
|
||||
data,
|
||||
}) as Promise<Uint8Array<ArrayBuffer>>;
|
||||
}
|
||||
|
||||
async #sqlRead(
|
||||
request: Extract<WorkerRequest, { type: 'sqlCall:read' }>
|
||||
): Promise<unknown> {
|
||||
type SqlCallResult = Readonly<{
|
||||
result: ReturnType<ServerReadableDirectInterface[Method]>;
|
||||
result: unknown;
|
||||
duration: number;
|
||||
}>;
|
||||
|
||||
if (method === 'pageBackupMessages' && this.#pauseWaiters == null) {
|
||||
if (request.method === 'pageBackupMessages' && this.#pauseWaiters == null) {
|
||||
throw new Error(
|
||||
'pageBackupMessages can only run after pauseWriteAccess()'
|
||||
);
|
||||
@@ -286,18 +325,17 @@ export class MainSQL {
|
||||
// pageMessages runs over several queries and needs to have access to
|
||||
// the same temporary table, it also creates temporary insert/update
|
||||
// triggers so it has to run on the same connection that updates the tables
|
||||
const isPaging = PAGING_QUERIES.has(method);
|
||||
const isPaging = PAGING_QUERIES.has(request.method);
|
||||
|
||||
const entry = isPaging ? this.#pool[0] : this.#getWorker();
|
||||
strictAssert(entry != null, 'Must have a pool entry');
|
||||
|
||||
const { result, duration } = await this.#send<SqlCallResult>(entry, {
|
||||
type: 'sqlCall:read',
|
||||
method,
|
||||
args,
|
||||
});
|
||||
const { result, duration } = await this.#send<SqlCallResult>(
|
||||
entry,
|
||||
request
|
||||
);
|
||||
|
||||
this.#traceDuration(method, duration);
|
||||
this.#traceDuration(request.method, duration);
|
||||
|
||||
return result;
|
||||
}
|
||||
@@ -306,9 +344,33 @@ export class MainSQL {
|
||||
method: Method,
|
||||
...args: Parameters<ServerWritableDirectInterface[Method]>
|
||||
): Promise<ReturnType<ServerWritableDirectInterface[Method]>> {
|
||||
type Result = ReturnType<ServerWritableDirectInterface[Method]>;
|
||||
return this.#sqlWrite({
|
||||
type: 'sqlCall:write',
|
||||
method,
|
||||
encoding: 'js',
|
||||
args,
|
||||
}) as Promise<ReturnType<ServerWritableDirectInterface[Method]>>;
|
||||
}
|
||||
|
||||
public async sqlWriteSerialized<
|
||||
Method extends keyof ServerWritableDirectInterface,
|
||||
>(
|
||||
method: Method,
|
||||
data: Uint8Array<ArrayBuffer>
|
||||
): Promise<Uint8Array<ArrayBuffer>> {
|
||||
return this.#sqlWrite({
|
||||
type: 'sqlCall:write',
|
||||
method,
|
||||
encoding: 'serialized',
|
||||
data,
|
||||
}) as Promise<Uint8Array<ArrayBuffer>>;
|
||||
}
|
||||
|
||||
async #sqlWrite(
|
||||
request: Extract<WorkerRequest, { type: 'sqlCall:write' }>
|
||||
): Promise<unknown> {
|
||||
type SqlCallResult = Readonly<{
|
||||
result: Result;
|
||||
result: unknown;
|
||||
duration: number;
|
||||
}>;
|
||||
|
||||
@@ -322,13 +384,12 @@ export class MainSQL {
|
||||
const primary = this.#pool[0];
|
||||
strictAssert(primary, 'Missing primary');
|
||||
|
||||
const { result, duration } = await this.#send<SqlCallResult>(primary, {
|
||||
type: 'sqlCall:write',
|
||||
method,
|
||||
args,
|
||||
});
|
||||
const { result, duration } = await this.#send<SqlCallResult>(
|
||||
primary,
|
||||
request
|
||||
);
|
||||
|
||||
this.#traceDuration(method, duration);
|
||||
this.#traceDuration(request.method, duration);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import { parentPort } from 'node:worker_threads';
|
||||
import { serialize, deserialize } from 'node:v8';
|
||||
|
||||
import type {
|
||||
WrappedWorkerRequest,
|
||||
@@ -109,11 +110,17 @@ const onMessage = (
|
||||
throw new Error(`Invalid sql method: ${request.method} ${method}`);
|
||||
}
|
||||
|
||||
const start = performance.now();
|
||||
const result = method(db, ...request.args);
|
||||
const end = performance.now();
|
||||
const args =
|
||||
request.encoding === 'js' ? request.args : deserialize(request.data);
|
||||
|
||||
respond(seq, { result, duration: end - start });
|
||||
const start = performance.now();
|
||||
const result = method(db, ...args);
|
||||
const duration = performance.now() - start;
|
||||
|
||||
respond(seq, {
|
||||
result: request.encoding === 'js' ? result : serialize(result),
|
||||
duration,
|
||||
});
|
||||
} else {
|
||||
throw new Error('Unexpected request type');
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user