Use minimal replacement class for MessageModel

This commit is contained in:
Scott Nonnenberg
2025-01-10 08:18:32 +10:00
committed by GitHub
parent 6b00cf756e
commit f846678b90
95 changed files with 3919 additions and 4457 deletions

View File

@@ -1,63 +1,124 @@
// Copyright 2019 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import cloneDeep from 'lodash/cloneDeep';
import { throttle } from 'lodash';
import { LRUCache } from 'lru-cache';
import type {
MessageAttributesType,
ReadonlyMessageAttributesType,
} from '../model-types.d';
import type { MessageModel } from '../models/messages';
import { DataReader, DataWriter } from '../sql/Client';
import * as Errors from '../types/errors';
import * as log from '../logging/log';
import { getEnvironment, Environment } from '../environment';
import { MessageModel } from '../models/messages';
import { DataReader, DataWriter } from '../sql/Client';
import { getMessageConversation } from '../util/getMessageConversation';
import { getMessageModelLogger } from '../util/MessageModelLogger';
import { getSenderIdentifier } from '../util/getSenderIdentifier';
import { isNotNil } from '../util/isNotNil';
import { softAssert, strictAssert } from '../util/assert';
import { isStory } from '../messages/helpers';
import type { SendStateByConversationId } from '../messages/MessageSendState';
import { getStoryDataFromMessageAttributes } from './storyLoader';
import { postSaveUpdates } from '../util/cleanup';
import type { MessageAttributesType } from '../model-types.d';
import type { SendStateByConversationId } from '../messages/MessageSendState';
import type { StoredJob } from '../jobs/types';
const MAX_THROTTLED_REDUX_UPDATERS = 200;
export class MessageCache {
static install(): MessageCache {
const instance = new MessageCache();
window.MessageCache = instance;
return instance;
}
private state = {
messages: new Map<string, MessageAttributesType>(),
messages: new Map<string, MessageModel>(),
messageIdsBySender: new Map<string, string>(),
messageIdsBySentAt: new Map<number, Array<string>>(),
lastAccessedAt: new Map<string, number>(),
};
// Stores the models so that __DEPRECATED$register always returns the existing
// copy instead of a new model.
private modelCache = new Map<string, MessageModel>();
public saveMessage(
message: MessageAttributesType | MessageModel,
options?: {
forceSave?: boolean;
jobToInsert?: Readonly<StoredJob>;
}
): Promise<string> {
const attributes =
message instanceof MessageModel ? message.attributes : message;
// Synchronously access a message's attributes from internal cache. Will
// return undefined if the message does not exist in memory.
public accessAttributes(
messageId: string
): Readonly<MessageAttributesType> | undefined {
const messageAttributes = this.state.messages.get(messageId);
return messageAttributes
? this.freezeAttributes(messageAttributes)
: undefined;
return DataWriter.saveMessage(attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
...options,
});
}
// Synchronously access a message's attributes from internal cache. Throws
// if the message does not exist in memory.
public accessAttributesOrThrow(
source: string,
messageId: string
): Readonly<MessageAttributesType> {
const messageAttributes = this.accessAttributes(messageId);
strictAssert(
messageAttributes,
`MessageCache.accessAttributesOrThrow/${source}: no message for id ${messageId}`
);
return messageAttributes;
public register(message: MessageModel): MessageModel {
if (!message || !message.id) {
throw new Error('MessageCache.register: Got falsey id or message');
}
const existing = this.getById(message.id);
if (existing) {
return existing;
}
this.addMessageToCache(message);
return message;
}
// Finds a message in the cache by sender identifier
public findBySender(senderIdentifier: string): MessageModel | undefined {
const id = this.state.messageIdsBySender.get(senderIdentifier);
if (!id) {
return undefined;
}
return this.getById(id);
}
// Finds a message in the cache by Id
public getById(id: string): MessageModel | undefined {
const message = this.state.messages.get(id);
if (!message) {
return undefined;
}
this.state.lastAccessedAt.set(id, Date.now());
return message;
}
// Finds a message in the cache by sentAt/timestamp
public async findBySentAt(
sentAt: number,
predicate: (model: MessageModel) => boolean
): Promise<MessageModel | undefined> {
const items = this.state.messageIdsBySentAt.get(sentAt) ?? [];
const inMemory = items
.map(id => this.getById(id))
.filter(isNotNil)
.find(predicate);
if (inMemory != null) {
return inMemory;
}
log.info(`findBySentAt(${sentAt}): db lookup needed`);
const allOnDisk = await DataReader.getMessagesBySentAt(sentAt);
const onDisk = allOnDisk
.map(message => this.register(new MessageModel(message)))
.find(predicate);
return onDisk;
}
// Deletes the message from our cache
public unregister(id: string): void {
const message = this.state.messages.get(id);
if (!message) {
return;
}
this.removeMessage(id);
}
// Evicts messages from the message cache if they have not been accessed past
@@ -65,9 +126,9 @@ export class MessageCache {
public deleteExpiredMessages(expiryTime: number): void {
const now = Date.now();
for (const [messageId, messageAttributes] of this.state.messages) {
for (const [messageId, message] of this.state.messages) {
const timeLastAccessed = this.state.lastAccessedAt.get(messageId) ?? 0;
const conversation = getMessageConversation(messageAttributes);
const conversation = getMessageConversation(message.attributes);
const state = window.reduxStore.getState();
const selectedId = state?.conversations?.selectedConversationId;
@@ -75,21 +136,25 @@ export class MessageCache {
conversation && selectedId && conversation.id === selectedId;
if (now - timeLastAccessed > expiryTime && !inActiveConversation) {
this.__DEPRECATED$unregister(messageId);
this.unregister(messageId);
}
}
}
// Finds a message in the cache by sender identifier
public findBySender(
senderIdentifier: string
): Readonly<MessageAttributesType> | undefined {
const id = this.state.messageIdsBySender.get(senderIdentifier);
if (!id) {
return undefined;
public async upgradeSchema(
message: MessageModel,
minSchemaVersion: number
): Promise<void> {
const { schemaVersion } = message.attributes;
if (!schemaVersion || schemaVersion >= minSchemaVersion) {
return;
}
const startingAttributes = message.attributes;
const upgradedAttributes =
await window.Signal.Migrations.upgradeMessageSchema(startingAttributes);
if (startingAttributes !== upgradedAttributes) {
message.set(upgradedAttributes);
}
return this.accessAttributes(id);
}
public replaceAllObsoleteConversationIds({
@@ -112,12 +177,12 @@ export class MessageCache {
};
};
for (const [messageId, messageAttributes] of this.state.messages) {
if (messageAttributes.conversationId !== obsoleteId) {
for (const [, message] of this.state.messages) {
if (message.get('conversationId') !== obsoleteId) {
continue;
}
const editHistory = messageAttributes.editHistory?.map(history => {
const editHistory = message.get('editHistory')?.map(history => {
return {
...history,
sendStateByConversationId: updateSendState(
@@ -126,117 +191,33 @@ export class MessageCache {
};
});
this.setAttributes({
messageId,
messageAttributes: {
conversationId,
sendStateByConversationId: updateSendState(
messageAttributes.sendStateByConversationId
),
editHistory,
},
skipSaveToDatabase: true,
message.set({
conversationId,
sendStateByConversationId: updateSendState(
message.get('sendStateByConversationId')
),
editHistory,
});
}
}
// Find the message's attributes whether in memory or in the database.
// Refresh the attributes in the cache if they exist. Throw if we cannot find
// a matching message.
public async resolveAttributes(
source: string,
messageId: string
): Promise<Readonly<MessageAttributesType>> {
const inMemoryMessageAttributes = this.accessAttributes(messageId);
// Semi-public API
if (inMemoryMessageAttributes) {
return inMemoryMessageAttributes;
}
// Should only be called by MessageModel's set() function
public _updateCaches(message: MessageModel): undefined {
const existing = this.getById(message.id);
let messageAttributesFromDatabase: MessageAttributesType | undefined;
try {
messageAttributesFromDatabase =
await DataReader.getMessageById(messageId);
} catch (err: unknown) {
log.error(
`MessageCache.resolveAttributes(${messageId}): db error ${Errors.toLogFormat(
err
)}`
);
}
strictAssert(
messageAttributesFromDatabase,
`MessageCache.resolveAttributes/${source}: no message for id ${messageId}`
);
return this.freezeAttributes(messageAttributesFromDatabase);
}
// Updates a message's attributes and saves the message to cache and to the
// database. Option to skip the save to the database.
// Overload #1: if skipSaveToDatabase = true, returns void
public setAttributes({
messageId,
messageAttributes,
skipSaveToDatabase,
}: {
messageId: string;
messageAttributes: Partial<MessageAttributesType>;
skipSaveToDatabase: true;
}): void;
// Overload #2: if skipSaveToDatabase = false, returns DB save promise
public setAttributes({
messageId,
messageAttributes,
skipSaveToDatabase,
}: {
messageId: string;
messageAttributes: Partial<MessageAttributesType>;
skipSaveToDatabase: false;
}): Promise<string>;
// Implementation
public setAttributes({
messageId,
messageAttributes: partialMessageAttributes,
skipSaveToDatabase,
}: {
messageId: string;
messageAttributes: Partial<MessageAttributesType>;
skipSaveToDatabase: boolean;
}): Promise<string> | undefined {
let messageAttributes = this.accessAttributes(messageId);
softAssert(messageAttributes, 'could not find message attributes');
if (!messageAttributes) {
// We expect message attributes to be defined in cache if one is trying to
// set new attributes. In the case that the attributes are missing in cache
// we'll add whatever we currently have to cache as a defensive measure so
// that the code continues to work properly downstream. The softAssert above
// that logs/debugger should be addressed upstream immediately by ensuring
// that message is in cache.
const partiallyCachedMessage = {
id: messageId,
...partialMessageAttributes,
} as MessageAttributesType;
this.addMessageToCache(partiallyCachedMessage);
messageAttributes = partiallyCachedMessage;
// If this model hasn't been registered yet, we can't add to cache because we don't
// want to force `message` to be the primary MessageModel for this message.
if (!existing) {
return;
}
this.state.messageIdsBySender.delete(
getSenderIdentifier(messageAttributes)
getSenderIdentifier(message.attributes)
);
const nextMessageAttributes = {
...messageAttributes,
...partialMessageAttributes,
};
const { id, sent_at: sentAt } = nextMessageAttributes;
const { id, sent_at: sentAt } = message.attributes;
const previousIdsBySentAt = this.state.messageIdsBySentAt.get(sentAt);
let nextIdsBySentAtSet: Set<string>;
@@ -247,44 +228,70 @@ export class MessageCache {
nextIdsBySentAtSet = new Set([id]);
}
this.state.messages.set(id, nextMessageAttributes);
this.state.lastAccessedAt.set(id, Date.now());
this.state.messageIdsBySender.set(
getSenderIdentifier(messageAttributes),
getSenderIdentifier(message.attributes),
id
);
this.markModelStale(nextMessageAttributes);
this.throttledUpdateRedux(message.attributes);
}
this.throttledUpdateRedux(nextMessageAttributes);
// Helpers
if (skipSaveToDatabase) {
private addMessageToCache(message: MessageModel): void {
if (!message.id) {
return;
}
return DataWriter.saveMessage(nextMessageAttributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
});
}
private throttledReduxUpdaters = new LRUCache<
string,
typeof this.updateRedux
>({
max: MAX_THROTTLED_REDUX_UPDATERS,
});
private throttledUpdateRedux(attributes: MessageAttributesType) {
let updater = this.throttledReduxUpdaters.get(attributes.id);
if (!updater) {
updater = throttle(this.updateRedux.bind(this), 200, {
leading: true,
trailing: true,
});
this.throttledReduxUpdaters.set(attributes.id, updater);
if (this.state.messages.has(message.id)) {
this.state.lastAccessedAt.set(message.id, Date.now());
return;
}
updater(attributes);
const { id, sent_at: sentAt } = message.attributes;
const previousIdsBySentAt = this.state.messageIdsBySentAt.get(sentAt);
let nextIdsBySentAtSet: Set<string>;
if (previousIdsBySentAt) {
nextIdsBySentAtSet = new Set(previousIdsBySentAt);
nextIdsBySentAtSet.add(id);
} else {
nextIdsBySentAtSet = new Set([id]);
}
this.state.messages.set(message.id, message);
this.state.lastAccessedAt.set(message.id, Date.now());
this.state.messageIdsBySentAt.set(sentAt, Array.from(nextIdsBySentAtSet));
this.state.messageIdsBySender.set(
getSenderIdentifier(message.attributes),
id
);
}
private removeMessage(messageId: string): void {
const message = this.state.messages.get(messageId);
if (!message) {
return;
}
const { id, sent_at: sentAt } = message.attributes;
const nextIdsBySentAtSet =
new Set(this.state.messageIdsBySentAt.get(sentAt)) || new Set();
nextIdsBySentAtSet.delete(id);
if (nextIdsBySentAtSet.size) {
this.state.messageIdsBySentAt.set(sentAt, Array.from(nextIdsBySentAtSet));
} else {
this.state.messageIdsBySentAt.delete(sentAt);
}
this.state.messages.delete(messageId);
this.state.lastAccessedAt.delete(messageId);
this.state.messageIdsBySender.delete(
getSenderIdentifier(message.attributes)
);
}
private updateRedux(attributes: MessageAttributesType) {
@@ -313,238 +320,23 @@ export class MessageCache {
);
}
// When you already have the message attributes from the db and want to
// ensure that they're added to the cache. The latest attributes from cache
// are returned if they exist, if not the attributes passed in are returned.
public toMessageAttributes(
messageAttributes: MessageAttributesType
): Readonly<MessageAttributesType> {
this.addMessageToCache(messageAttributes);
private throttledReduxUpdaters = new LRUCache<
string,
typeof this.updateRedux
>({
max: MAX_THROTTLED_REDUX_UPDATERS,
});
const nextMessageAttributes = this.state.messages.get(messageAttributes.id);
strictAssert(
nextMessageAttributes,
`MessageCache.toMessageAttributes: no message for id ${messageAttributes.id}`
);
if (getEnvironment() === Environment.Development) {
return Object.freeze(cloneDeep(nextMessageAttributes));
}
return nextMessageAttributes;
}
static install(): MessageCache {
const instance = new MessageCache();
window.MessageCache = instance;
return instance;
}
private addMessageToCache(messageAttributes: MessageAttributesType): void {
if (!messageAttributes.id) {
return;
}
if (this.state.messages.has(messageAttributes.id)) {
this.state.lastAccessedAt.set(messageAttributes.id, Date.now());
return;
}
const { id, sent_at: sentAt } = messageAttributes;
const previousIdsBySentAt = this.state.messageIdsBySentAt.get(sentAt);
let nextIdsBySentAtSet: Set<string>;
if (previousIdsBySentAt) {
nextIdsBySentAtSet = new Set(previousIdsBySentAt);
nextIdsBySentAtSet.add(id);
} else {
nextIdsBySentAtSet = new Set([id]);
}
this.state.messages.set(messageAttributes.id, { ...messageAttributes });
this.state.lastAccessedAt.set(messageAttributes.id, Date.now());
this.state.messageIdsBySentAt.set(sentAt, Array.from(nextIdsBySentAtSet));
this.state.messageIdsBySender.set(
getSenderIdentifier(messageAttributes),
id
);
}
private freezeAttributes(
messageAttributes: MessageAttributesType
): Readonly<MessageAttributesType> {
this.addMessageToCache(messageAttributes);
if (getEnvironment() === Environment.Development) {
return Object.freeze(cloneDeep(messageAttributes));
}
return messageAttributes;
}
private removeMessage(messageId: string): void {
const messageAttributes = this.state.messages.get(messageId);
if (!messageAttributes) {
return;
}
const { id, sent_at: sentAt } = messageAttributes;
const nextIdsBySentAtSet =
new Set(this.state.messageIdsBySentAt.get(sentAt)) || new Set();
nextIdsBySentAtSet.delete(id);
if (nextIdsBySentAtSet.size) {
this.state.messageIdsBySentAt.set(sentAt, Array.from(nextIdsBySentAtSet));
} else {
this.state.messageIdsBySentAt.delete(sentAt);
}
this.state.messages.delete(messageId);
this.state.lastAccessedAt.delete(messageId);
this.state.messageIdsBySender.delete(
getSenderIdentifier(messageAttributes)
);
}
// Deprecated methods below
// Adds the message into the cache and eturns a Proxy that resembles
// a MessageModel
public __DEPRECATED$register(
id: string,
data: MessageModel | MessageAttributesType,
location: string
): MessageModel {
if (!id || !data) {
throw new Error(
'MessageCache.__DEPRECATED$register: Got falsey id or message'
);
}
const existing = this.__DEPRECATED$getById(id, location);
if (existing) {
this.addMessageToCache(existing.attributes);
return existing;
}
const modelProxy = this.toModel(data);
const messageAttributes = 'attributes' in data ? data.attributes : data;
this.addMessageToCache(messageAttributes);
modelProxy.registerLocations.add(location);
return modelProxy;
}
// Deletes the message from our cache
public __DEPRECATED$unregister(id: string): void {
const model = this.modelCache.get(id);
if (!model) {
return;
}
this.removeMessage(id);
this.modelCache.delete(id);
}
// Finds a message in the cache by Id
public __DEPRECATED$getById(
id: string,
location: string
): MessageModel | undefined {
const data = this.state.messages.get(id);
if (!data) {
return undefined;
}
const model = this.toModel(data);
model.registerLocations.add(location);
return model;
}
public async upgradeSchema(
attributes: MessageAttributesType,
minSchemaVersion: number
): Promise<MessageAttributesType> {
const { schemaVersion } = attributes;
if (!schemaVersion || schemaVersion >= minSchemaVersion) {
return attributes;
}
const upgradedAttributes =
await window.Signal.Migrations.upgradeMessageSchema(attributes);
await this.setAttributes({
messageId: upgradedAttributes.id,
messageAttributes: upgradedAttributes,
skipSaveToDatabase: false,
});
return upgradedAttributes;
}
// Finds a message in the cache by sentAt/timestamp
public async findBySentAt(
sentAt: number,
predicate: (attributes: ReadonlyMessageAttributesType) => boolean
): Promise<MessageAttributesType | undefined> {
const items = this.state.messageIdsBySentAt.get(sentAt) ?? [];
const inMemory = items
.map(id => this.accessAttributes(id))
.filter(isNotNil)
.find(predicate);
if (inMemory != null) {
return inMemory;
}
log.info(`findBySentAt(${sentAt}): db lookup needed`);
const allOnDisk = await DataReader.getMessagesBySentAt(sentAt);
const onDisk = allOnDisk.find(predicate);
if (onDisk != null) {
this.addMessageToCache(onDisk);
}
return onDisk;
}
// Marks cached model as "should be stale" to discourage continued use.
// The model's attributes are directly updated so that the model is in sync
// with the in-memory attributes.
private markModelStale(messageAttributes: MessageAttributesType): void {
const { id } = messageAttributes;
const model = this.modelCache.get(id);
if (!model) {
return;
}
model.attributes = { ...messageAttributes };
if (getEnvironment() === Environment.Development) {
log.warn('MessageCache: updating cached backbone model', {
cid: model.cid,
locations: Array.from(model.registerLocations).join(', '),
private throttledUpdateRedux(attributes: MessageAttributesType) {
let updater = this.throttledReduxUpdaters.get(attributes.id);
if (!updater) {
updater = throttle(this.updateRedux.bind(this), 200, {
leading: true,
trailing: true,
});
}
}
// Creates a proxy object for MessageModel which logs usage in development
// so that we're able to migrate off of models
private toModel(
messageAttributes: MessageAttributesType | MessageModel
): MessageModel {
const existingModel = this.modelCache.get(messageAttributes.id);
if (existingModel) {
return existingModel;
this.throttledReduxUpdaters.set(attributes.id, updater);
}
const model =
'attributes' in messageAttributes
? messageAttributes
: new window.Whisper.Message(messageAttributes);
const proxy = getMessageModelLogger(model);
this.modelCache.set(messageAttributes.id, proxy);
return proxy;
updater(attributes);
}
}