Refactor backup media download progress tracking

This commit is contained in:
trevor-signal
2025-07-18 14:21:15 -04:00
committed by GitHub
parent 3775aa7ae4
commit a8a7dc8965
19 changed files with 668 additions and 102 deletions

View File

@@ -52,7 +52,11 @@ import * as Errors from '../types/errors';
import { assertDev, strictAssert } from '../util/assert';
import { combineNames } from '../util/combineNames';
import { consoleLogger } from '../util/consoleLogger';
import { dropNull, shallowConvertUndefinedToNull } from '../util/dropNull';
import {
dropNull,
shallowConvertUndefinedToNull,
type ShallowUndefinedToNull,
} from '../util/dropNull';
import { isNormalNumber } from '../util/isNormalNumber';
import { isNotNil } from '../util/isNotNil';
import { parseIntOrThrow } from '../util/parseIntOrThrow';
@@ -187,6 +191,7 @@ import type {
MessageTypeUnhydrated,
ServerMessageSearchResultType,
MessageCountBySchemaVersionType,
BackupAttachmentDownloadProgress,
} from './Interface';
import {
AttachmentDownloadSource,
@@ -298,22 +303,37 @@ type StickerPackRow = InstalledStickerPackRow &
stickers: string;
title: string;
}>;
type AttachmentDownloadJobRow = Readonly<{
messageId: string;
attachmentType: string;
attachmentSignature: string;
receivedAt: number;
sentAt: number;
contentType: string;
size: number;
active: number;
attempts: number;
retryAfter: number;
lastAttemptTimestamp: number;
const ATTACHMENT_DOWNLOADS_COLUMNS: ReadonlyArray<
| keyof Omit<AttachmentDownloadJobType, 'attachment' | 'isManualDownload'>
| 'attachmentJson'
> = [
'messageId',
'attachmentType',
'attachmentSignature',
'receivedAt',
'sentAt',
'contentType',
'size',
'active',
'attempts',
'retryAfter',
'lastAttemptTimestamp',
'attachmentJson',
'ciphertextSize',
'originalSource',
'source',
] as const;
type AttachmentDownloadJobRow = Omit<
ShallowUndefinedToNull<AttachmentDownloadJobType>,
// TODO: DESKTOP-8995
'attachment' | 'contentType' | 'active' | 'isManualDownload'
> & {
attachmentJson: string;
ciphertextSize: number;
source: string;
}>;
contentType: string;
active: 0 | 1;
};
// Because we can't force this module to conform to an interface, we narrow our exports
// to this one default export, which does conform to the interface.
@@ -445,7 +465,7 @@ export const DataReader: ServerReadableInterface = {
getStatisticsForLogging,
getBackupCdnObjectMetadata,
getSizeOfPendingBackupAttachmentDownloadJobs,
getBackupAttachmentDownloadProgress,
getAttachmentReferencesForMessages,
getMessageCountBySchemaVersion,
getMessageSampleForSchemaVersion,
@@ -581,6 +601,7 @@ export const DataWriter: ServerWritableInterface = {
removeAttachmentDownloadJob,
removeAttachmentDownloadJobsForMessage,
removeAllBackupAttachmentDownloadJobs,
resetBackupAttachmentDownloadStats,
getNextAttachmentBackupJobs,
saveAttachmentBackupJob,
@@ -5477,16 +5498,28 @@ function removeAllBackupAttachmentDownloadJobs(db: WritableDB): void {
db.prepare(query).run(params);
}
function getSizeOfPendingBackupAttachmentDownloadJobs(db: ReadableDB): number {
function resetBackupAttachmentDownloadStats(db: WritableDB): void {
const [query, params] = sql`
SELECT SUM(ciphertextSize) FROM attachment_downloads
WHERE source = ${AttachmentDownloadSource.BACKUP_IMPORT};`;
INSERT OR REPLACE INTO attachment_downloads_backup_stats
(id, totalBytes, completedBytes)
VALUES
(0,0,0);
`;
db.prepare(query).run(params);
}
function getBackupAttachmentDownloadProgress(
db: ReadableDB
): BackupAttachmentDownloadProgress {
const [query, params] = sql`
SELECT totalBytes, completedBytes FROM attachment_downloads_backup_stats
WHERE id = 0;
`;
return (
db
.prepare(query, {
pluck: true,
})
.get<number>(params) ?? 0
db.prepare(query).get<BackupAttachmentDownloadProgress>(params) ?? {
totalBytes: 0,
completedBytes: 0,
}
);
}
@@ -5642,41 +5675,42 @@ function saveAttachmentDownloadJob(
logger.warn('saveAttachmentDownloadJob: message does not exist, bailing');
return;
}
const jobToInsert: AttachmentDownloadJobRow = {
messageId: job.messageId,
attachmentType: job.attachmentType,
attachmentSignature: job.attachmentSignature,
receivedAt: job.receivedAt,
sentAt: job.sentAt,
contentType: job.contentType,
size: job.size,
active: job.active ? 1 : 0,
attempts: job.attempts,
retryAfter: job.retryAfter,
lastAttemptTimestamp: job.lastAttemptTimestamp,
attachmentJson: objectToJSON(job.attachment),
ciphertextSize: job.ciphertextSize,
originalSource: job.originalSource,
source: job.source,
} as const satisfies Record<
(typeof ATTACHMENT_DOWNLOADS_COLUMNS)[number],
unknown
>;
const [insertQuery, insertParams] = sql`
INSERT OR REPLACE INTO attachment_downloads (
messageId,
attachmentType,
attachmentSignature,
receivedAt,
sentAt,
contentType,
size,
active,
attempts,
retryAfter,
lastAttemptTimestamp,
attachmentJson,
ciphertextSize,
source
) VALUES (
${job.messageId},
${job.attachmentType},
${job.attachmentSignature},
${job.receivedAt},
${job.sentAt},
${job.contentType},
${job.size},
${job.active ? 1 : 0},
${job.attempts},
${job.retryAfter},
${job.lastAttemptTimestamp},
${objectToJSON(job.attachment)},
${job.ciphertextSize},
${job.source}
);
`;
db.prepare(insertQuery).run(insertParams);
db.prepare(
`
INSERT INTO attachment_downloads
(${ATTACHMENT_DOWNLOADS_COLUMNS.join(', ')})
VALUES
(${ATTACHMENT_DOWNLOADS_COLUMNS.map(name => `$${name}`).join(', ')})
ON CONFLICT DO UPDATE SET
-- preserve originalSource
${ATTACHMENT_DOWNLOADS_COLUMNS.filter(
name => name !== 'originalSource'
)
.map(name => `${name} = $${name}`)
.join(', ')}
`
).run(jobToInsert);
})();
}
@@ -7533,6 +7567,7 @@ function removeAll(db: WritableDB): void {
DELETE FROM attachment_downloads;
DELETE FROM attachment_backup_jobs;
DELETE FROM attachment_downloads_backup_stats;
DELETE FROM backup_cdn_object_metadata;
DELETE FROM badgeImageFiles;
DELETE FROM badges;
@@ -7549,11 +7584,13 @@ function removeAll(db: WritableDB): void {
DELETE FROM items;
DELETE FROM jobs;
DELETE FROM kyberPreKeys;
DELETE FROM message_attachments;
DELETE FROM messages_fts;
DELETE FROM messages;
DELETE FROM notificationProfiles;
DELETE FROM preKeys;
DELETE FROM reactions;
DELETE FROM recentGifs;
DELETE FROM senderKeys;
DELETE FROM sendLogMessageIds;
DELETE FROM sendLogPayloads;
@@ -7569,10 +7606,10 @@ function removeAll(db: WritableDB): void {
DELETE FROM syncTasks;
DELETE FROM unprocessed;
DELETE FROM uninstalled_sticker_packs;
DELETE FROM message_attachments;
INSERT INTO messages_fts(messages_fts) VALUES('optimize');
--- Re-create the messages delete trigger
--- See migration 45
CREATE TRIGGER messages_on_delete AFTER DELETE ON messages BEGIN
@@ -7588,6 +7625,8 @@ function removeAll(db: WritableDB): void {
DELETE FROM storyReads WHERE storyId = old.storyId;
END;
`);
resetBackupAttachmentDownloadStats(db);
})();
}