mirror of
https://github.com/signalapp/Signal-Desktop.git
synced 2026-05-08 08:58:38 +01:00
Move message.getPropsForBubble and friends to selectors
This commit is contained in:
@@ -1,461 +0,0 @@
|
||||
// Copyright 2019-2020 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
/* global
|
||||
Whisper,
|
||||
Signal,
|
||||
setTimeout,
|
||||
clearTimeout,
|
||||
MessageController
|
||||
*/
|
||||
|
||||
const { isFunction, isNumber, omit } = require('lodash');
|
||||
const getGuid = require('uuid/v4');
|
||||
const {
|
||||
getMessageById,
|
||||
getNextAttachmentDownloadJobs,
|
||||
removeAttachmentDownloadJob,
|
||||
resetAttachmentDownloadPending,
|
||||
saveAttachmentDownloadJob,
|
||||
saveMessage,
|
||||
setAttachmentDownloadJobPending,
|
||||
} = require('../../ts/sql/Client').default;
|
||||
const { downloadAttachment } = require('../../ts/util/downloadAttachment');
|
||||
const { stringFromBytes } = require('../../ts/Crypto');
|
||||
|
||||
module.exports = {
|
||||
start,
|
||||
stop,
|
||||
addJob,
|
||||
};
|
||||
|
||||
const MAX_ATTACHMENT_JOB_PARALLELISM = 3;
|
||||
|
||||
const SECOND = 1000;
|
||||
const MINUTE = 60 * SECOND;
|
||||
const HOUR = 60 * MINUTE;
|
||||
const TICK_INTERVAL = MINUTE;
|
||||
|
||||
const RETRY_BACKOFF = {
|
||||
1: 30 * SECOND,
|
||||
2: 30 * MINUTE,
|
||||
3: 6 * HOUR,
|
||||
};
|
||||
|
||||
let enabled = false;
|
||||
let timeout;
|
||||
let getMessageReceiver;
|
||||
let logger;
|
||||
const _activeAttachmentDownloadJobs = {};
|
||||
|
||||
async function start(options = {}) {
|
||||
({ getMessageReceiver, logger } = options);
|
||||
if (!isFunction(getMessageReceiver)) {
|
||||
throw new Error(
|
||||
'attachment_downloads/start: getMessageReceiver must be a function'
|
||||
);
|
||||
}
|
||||
if (!logger) {
|
||||
throw new Error('attachment_downloads/start: logger must be provided!');
|
||||
}
|
||||
|
||||
logger.info('attachment_downloads/start: enabling');
|
||||
enabled = true;
|
||||
await resetAttachmentDownloadPending();
|
||||
|
||||
_tick();
|
||||
}
|
||||
|
||||
async function stop() {
|
||||
// If `.start()` wasn't called - the `logger` is `undefined`
|
||||
if (logger) {
|
||||
logger.info('attachment_downloads/stop: disabling');
|
||||
}
|
||||
enabled = false;
|
||||
if (timeout) {
|
||||
clearTimeout(timeout);
|
||||
timeout = null;
|
||||
}
|
||||
}
|
||||
|
||||
async function addJob(attachment, job = {}) {
|
||||
if (!attachment) {
|
||||
throw new Error('attachments_download/addJob: attachment is required');
|
||||
}
|
||||
|
||||
const { messageId, type, index } = job;
|
||||
if (!messageId) {
|
||||
throw new Error('attachments_download/addJob: job.messageId is required');
|
||||
}
|
||||
if (!type) {
|
||||
throw new Error('attachments_download/addJob: job.type is required');
|
||||
}
|
||||
if (!isNumber(index)) {
|
||||
throw new Error('attachments_download/addJob: index must be a number');
|
||||
}
|
||||
|
||||
const id = getGuid();
|
||||
const timestamp = Date.now();
|
||||
const toSave = {
|
||||
...job,
|
||||
id,
|
||||
attachment,
|
||||
timestamp,
|
||||
pending: 0,
|
||||
attempts: 0,
|
||||
};
|
||||
|
||||
await saveAttachmentDownloadJob(toSave);
|
||||
|
||||
_maybeStartJob();
|
||||
|
||||
return {
|
||||
...attachment,
|
||||
pending: true,
|
||||
downloadJobId: id,
|
||||
};
|
||||
}
|
||||
|
||||
async function _tick() {
|
||||
if (timeout) {
|
||||
clearTimeout(timeout);
|
||||
timeout = null;
|
||||
}
|
||||
|
||||
_maybeStartJob();
|
||||
timeout = setTimeout(_tick, TICK_INTERVAL);
|
||||
}
|
||||
|
||||
async function _maybeStartJob() {
|
||||
if (!enabled) {
|
||||
logger.info('attachment_downloads/_maybeStartJob: not enabled, returning');
|
||||
return;
|
||||
}
|
||||
|
||||
const jobCount = getActiveJobCount();
|
||||
const limit = MAX_ATTACHMENT_JOB_PARALLELISM - jobCount;
|
||||
if (limit <= 0) {
|
||||
logger.info(
|
||||
'attachment_downloads/_maybeStartJob: reached active job limit, waiting'
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const nextJobs = await getNextAttachmentDownloadJobs(limit);
|
||||
if (nextJobs.length <= 0) {
|
||||
logger.info(
|
||||
'attachment_downloads/_maybeStartJob: no attachment jobs to run'
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// To prevent the race condition caused by two parallel database calls, eached kicked
|
||||
// off because the jobCount wasn't at the max.
|
||||
const secondJobCount = getActiveJobCount();
|
||||
const needed = MAX_ATTACHMENT_JOB_PARALLELISM - secondJobCount;
|
||||
if (needed <= 0) {
|
||||
logger.info(
|
||||
'attachment_downloads/_maybeStartJob: reached active job limit after ' +
|
||||
'db query, waiting'
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const jobs = nextJobs.slice(0, Math.min(needed, nextJobs.length));
|
||||
|
||||
logger.info(
|
||||
`attachment_downloads/_maybeStartJob: starting ${jobs.length} jobs`
|
||||
);
|
||||
|
||||
for (let i = 0, max = jobs.length; i < max; i += 1) {
|
||||
const job = jobs[i];
|
||||
const existing = _activeAttachmentDownloadJobs[job.id];
|
||||
if (existing) {
|
||||
logger.warn(`_maybeStartJob: Job ${job.id} is already running`);
|
||||
} else {
|
||||
_activeAttachmentDownloadJobs[job.id] = _runJob(job);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function _runJob(job) {
|
||||
const { id, messageId, attachment, type, index, attempts } = job || {};
|
||||
let message;
|
||||
|
||||
try {
|
||||
if (!job || !attachment || !messageId) {
|
||||
throw new Error(
|
||||
`_runJob: Key information required for job was missing. Job id: ${id}`
|
||||
);
|
||||
}
|
||||
|
||||
logger.info(`attachment_downloads/_runJob for job id ${id}`);
|
||||
|
||||
const found =
|
||||
MessageController.getById(messageId) ||
|
||||
(await getMessageById(messageId, {
|
||||
Message: Whisper.Message,
|
||||
}));
|
||||
if (!found) {
|
||||
logger.error('_runJob: Source message not found, deleting job');
|
||||
await _finishJob(null, id);
|
||||
return;
|
||||
}
|
||||
message = MessageController.register(found.id, found);
|
||||
|
||||
const pending = true;
|
||||
await setAttachmentDownloadJobPending(id, pending);
|
||||
|
||||
const messageReceiver = getMessageReceiver();
|
||||
if (!messageReceiver) {
|
||||
throw new Error('_runJob: messageReceiver not found');
|
||||
}
|
||||
|
||||
const downloaded = await downloadAttachment(attachment);
|
||||
|
||||
if (!downloaded) {
|
||||
logger.warn(
|
||||
`_runJob: Got 404 from server for CDN ${
|
||||
attachment.cdnNumber
|
||||
}, marking attachment ${
|
||||
attachment.cdnId || attachment.cdnKey
|
||||
} from message ${message.idForLogging()} as permanent error`
|
||||
);
|
||||
|
||||
await _finishJob(message, id);
|
||||
await _addAttachmentToMessage(
|
||||
message,
|
||||
_markAttachmentAsError(attachment),
|
||||
{ type, index }
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const upgradedAttachment = await Signal.Migrations.processNewAttachment(
|
||||
downloaded
|
||||
);
|
||||
|
||||
await _addAttachmentToMessage(message, upgradedAttachment, { type, index });
|
||||
|
||||
await _finishJob(message, id);
|
||||
} catch (error) {
|
||||
const currentAttempt = (attempts || 0) + 1;
|
||||
|
||||
if (currentAttempt >= 3) {
|
||||
logger.error(
|
||||
`_runJob: ${currentAttempt} failed attempts, marking attachment ${id} from message ${message.idForLogging()} as permament error:`,
|
||||
error && error.stack ? error.stack : error
|
||||
);
|
||||
|
||||
await _finishJob(message, id);
|
||||
await _addAttachmentToMessage(
|
||||
message,
|
||||
_markAttachmentAsError(attachment),
|
||||
{ type, index }
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
logger.error(
|
||||
`_runJob: Failed to download attachment type ${type} for message ${message.idForLogging()}, attempt ${currentAttempt}:`,
|
||||
error && error.stack ? error.stack : error
|
||||
);
|
||||
|
||||
const failedJob = {
|
||||
...job,
|
||||
pending: 0,
|
||||
attempts: currentAttempt,
|
||||
timestamp: Date.now() + RETRY_BACKOFF[currentAttempt],
|
||||
};
|
||||
|
||||
await saveAttachmentDownloadJob(failedJob);
|
||||
delete _activeAttachmentDownloadJobs[id];
|
||||
_maybeStartJob();
|
||||
}
|
||||
}
|
||||
|
||||
async function _finishJob(message, id) {
|
||||
if (message) {
|
||||
logger.info(`attachment_downloads/_finishJob for job id: ${id}`);
|
||||
await saveMessage(message.attributes, {
|
||||
Message: Whisper.Message,
|
||||
});
|
||||
}
|
||||
|
||||
await removeAttachmentDownloadJob(id);
|
||||
delete _activeAttachmentDownloadJobs[id];
|
||||
_maybeStartJob();
|
||||
}
|
||||
|
||||
function getActiveJobCount() {
|
||||
return Object.keys(_activeAttachmentDownloadJobs).length;
|
||||
}
|
||||
|
||||
function _markAttachmentAsError(attachment) {
|
||||
return {
|
||||
...omit(attachment, ['key', 'digest', 'id']),
|
||||
error: true,
|
||||
};
|
||||
}
|
||||
|
||||
async function _addAttachmentToMessage(message, attachment, { type, index }) {
|
||||
if (!message) {
|
||||
return;
|
||||
}
|
||||
|
||||
const logPrefix = `${message.idForLogging()} (type: ${type}, index: ${index})`;
|
||||
|
||||
if (type === 'long-message') {
|
||||
try {
|
||||
const { data } = await Signal.Migrations.loadAttachmentData(attachment);
|
||||
message.set({
|
||||
body: attachment.isError ? message.get('body') : stringFromBytes(data),
|
||||
bodyPending: false,
|
||||
});
|
||||
} finally {
|
||||
Signal.Migrations.deleteAttachmentData(attachment.path);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (type === 'attachment') {
|
||||
const attachments = message.get('attachments');
|
||||
if (!attachments || attachments.length <= index) {
|
||||
throw new Error(
|
||||
`_addAttachmentToMessage: attachments didn't exist or ${index} was too large`
|
||||
);
|
||||
}
|
||||
_checkOldAttachment(attachments, index, attachment, logPrefix);
|
||||
|
||||
const newAttachments = [...attachments];
|
||||
newAttachments[index] = attachment;
|
||||
|
||||
message.set({ attachments: newAttachments });
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (type === 'preview') {
|
||||
const preview = message.get('preview');
|
||||
if (!preview || preview.length <= index) {
|
||||
throw new Error(
|
||||
`_addAttachmentToMessage: preview didn't exist or ${index} was too large`
|
||||
);
|
||||
}
|
||||
const item = preview[index];
|
||||
if (!item) {
|
||||
throw new Error(`_addAttachmentToMessage: preview ${index} was falsey`);
|
||||
}
|
||||
|
||||
_checkOldAttachment(item, 'image', attachment, logPrefix);
|
||||
|
||||
const newPreview = [...preview];
|
||||
newPreview[index] = {
|
||||
...preview[index],
|
||||
image: attachment,
|
||||
};
|
||||
|
||||
message.set({ preview: newPreview });
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (type === 'contact') {
|
||||
const contact = message.get('contact');
|
||||
if (!contact || contact.length <= index) {
|
||||
throw new Error(
|
||||
`_addAttachmentToMessage: contact didn't exist or ${index} was too large`
|
||||
);
|
||||
}
|
||||
const item = contact[index];
|
||||
if (item && item.avatar && item.avatar.avatar) {
|
||||
_checkOldAttachment(item.avatar, 'avatar', attachment, logPrefix);
|
||||
|
||||
const newContact = [...contact];
|
||||
newContact[index] = {
|
||||
...contact[index],
|
||||
avatar: {
|
||||
...contact[index].avatar,
|
||||
avatar: attachment,
|
||||
},
|
||||
};
|
||||
|
||||
message.set({ contact: newContact });
|
||||
} else {
|
||||
logger.warn(
|
||||
`_addAttachmentToMessage: Couldn't update contact with avatar attachment for message ${message.idForLogging()}`
|
||||
);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (type === 'quote') {
|
||||
const quote = message.get('quote');
|
||||
if (!quote) {
|
||||
throw new Error("_addAttachmentToMessage: quote didn't exist");
|
||||
}
|
||||
const { attachments } = quote;
|
||||
if (!attachments || attachments.length <= index) {
|
||||
throw new Error(
|
||||
`_addAttachmentToMessage: quote attachments didn't exist or ${index} was too large`
|
||||
);
|
||||
}
|
||||
|
||||
const item = attachments[index];
|
||||
if (!item) {
|
||||
throw new Error(
|
||||
`_addAttachmentToMessage: quote attachment ${index} was falsey`
|
||||
);
|
||||
}
|
||||
|
||||
_checkOldAttachment(item, 'thumbnail', attachment, logPrefix);
|
||||
|
||||
const newAttachments = [...attachments];
|
||||
newAttachments[index] = {
|
||||
...attachments[index],
|
||||
thumbnail: attachment,
|
||||
};
|
||||
|
||||
const newQuote = {
|
||||
...quote,
|
||||
attachments: newAttachments,
|
||||
};
|
||||
|
||||
message.set({ quote: newQuote });
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (type === 'sticker') {
|
||||
const sticker = message.get('sticker');
|
||||
if (!sticker) {
|
||||
throw new Error("_addAttachmentToMessage: sticker didn't exist");
|
||||
}
|
||||
|
||||
message.set({
|
||||
sticker: {
|
||||
...sticker,
|
||||
data: attachment,
|
||||
},
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
throw new Error(
|
||||
`_addAttachmentToMessage: Unknown job type ${type} for message ${message.idForLogging()}`
|
||||
);
|
||||
}
|
||||
|
||||
function _checkOldAttachment(object, key, newAttachment, logPrefix) {
|
||||
const oldAttachment = object[key];
|
||||
if (oldAttachment && oldAttachment.path) {
|
||||
logger.error(
|
||||
`_checkOldAttachment: ${logPrefix} - old attachment already had path, not replacing`
|
||||
);
|
||||
throw new Error(
|
||||
'_checkOldAttachment: old attachment already had path, not replacing'
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -22,7 +22,6 @@ const Settings = require('./settings');
|
||||
const RemoteConfig = require('../../ts/RemoteConfig');
|
||||
const Util = require('../../ts/util');
|
||||
const LinkPreviews = require('./link_previews');
|
||||
const AttachmentDownloads = require('./attachment_downloads');
|
||||
|
||||
// Components
|
||||
const {
|
||||
@@ -443,7 +442,6 @@ exports.setup = (options = {}) => {
|
||||
};
|
||||
|
||||
return {
|
||||
AttachmentDownloads,
|
||||
Backbone,
|
||||
Components,
|
||||
Crypto,
|
||||
|
||||
Reference in New Issue
Block a user