|
|
|
|
@@ -13,6 +13,7 @@ import org.signal.core.util.SqlUtil
|
|
|
|
|
import org.signal.core.util.delete
|
|
|
|
|
import org.signal.core.util.readToList
|
|
|
|
|
import org.signal.core.util.readToSet
|
|
|
|
|
import org.signal.core.util.readToSingleLong
|
|
|
|
|
import org.signal.core.util.requireBoolean
|
|
|
|
|
import org.signal.core.util.requireInt
|
|
|
|
|
import org.signal.core.util.requireIntOrNull
|
|
|
|
|
@@ -21,7 +22,7 @@ import org.signal.core.util.requireNonNullString
|
|
|
|
|
import org.signal.core.util.select
|
|
|
|
|
import org.signal.core.util.toInt
|
|
|
|
|
import org.signal.core.util.update
|
|
|
|
|
import org.signal.core.util.updateAll
|
|
|
|
|
import org.signal.core.util.withinTransaction
|
|
|
|
|
import org.thoughtcrime.securesms.backup.v2.ArchivedMediaObject
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
@@ -29,6 +30,25 @@ import org.thoughtcrime.securesms.backup.v2.ArchivedMediaObject
|
|
|
|
|
* references that attachment -- at least until a new backup is made.
|
|
|
|
|
*
|
|
|
|
|
* So, this table maintains a snapshot of the media present in the last backup, so that we know what we can and can't delete from the archive CDN.
|
|
|
|
|
*
|
|
|
|
|
* The lifecycle is as follows:
|
|
|
|
|
* - Before we make a backup, we clear any pending entries that might be left over from an aborted backup.
|
|
|
|
|
* - While a backup is in progress, we write entries here for each media item, marking them with a pending flag.
|
|
|
|
|
* - After a backup is fully uploaded, we commit the pending entries and update their version to MAX([SNAPSHOT_VERSION]) + 1
|
|
|
|
|
*
|
|
|
|
|
* The end result is that we have all the media objects referenced in backups, tagged with the most recent snapshot version they were seen at.
|
|
|
|
|
*
|
|
|
|
|
* This lets us know a few things:
|
|
|
|
|
* 1. We know that any non-pending entries whose version < MAX([SNAPSHOT_VERSION]) must have been deleted.
|
|
|
|
|
* 2. We know that any entries with MAX([SNAPSHOT_VERSION]) that aren't fully backed up yet (according to the [AttachmentTable]) need to be backed up.
|
|
|
|
|
*
|
|
|
|
|
* Occasionally, we'll also run a more elaborate "reconciliation" process where we fetch all of the remote CDN entries. That data, combined with this table,
|
|
|
|
|
* will let us do the following:
|
|
|
|
|
* 1. Any entries on the remote CDN that are not present in the table with MAX([SNAPSHOT_VERSION]) can be deleted from the remote CDN.
|
|
|
|
|
* 2. Any entries present in this table with MAX([SNAPSHOT_VERSION]) that are not present on the remote CDN need to be re-uploaded. This is trickier, since the
|
|
|
|
|
* remote CDN data is too large to fit in memory. To address that, as we page through remote CDN entries, we can set the [LAST_SEEN_ON_REMOTE_SNAPSHOT_VERSION]
|
|
|
|
|
* equal to MAX([SNAPSHOT_VERSION]). After we're done, any entries whose [SNAPSHOT_VERSION] = MAX([SNAPSHOT_VERSION]), but whose
|
|
|
|
|
* [LAST_SEEN_ON_REMOTE_SNAPSHOT_VERSION] < MAX([SNAPSHOT_VERSION]) must be entries that were missing from the remote CDN.
|
|
|
|
|
*/
|
|
|
|
|
class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : DatabaseTable(context, database) {
|
|
|
|
|
companion object {
|
|
|
|
|
@@ -52,13 +72,13 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat
|
|
|
|
|
* where newer backups have a greater backup id value.
|
|
|
|
|
*/
|
|
|
|
|
@VisibleForTesting
|
|
|
|
|
const val LAST_SYNC_TIME = "last_sync_time"
|
|
|
|
|
const val SNAPSHOT_VERSION = "snapshot_version"
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Whether this entry is pending: set while a backup is in the process of being exported, and cleared once the pending entries are committed.
|
|
|
|
|
*/
|
|
|
|
|
@VisibleForTesting
|
|
|
|
|
const val PENDING_SYNC_TIME = "pending_sync_time"
|
|
|
|
|
const val IS_PENDING = "is_pending"
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Whether or not this entry is for a thumbnail.
|
|
|
|
|
@@ -68,58 +88,90 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat
|
|
|
|
|
/**
|
|
|
|
|
* The snapshot version at which the media was last seen on the archive CDN. Defaults to 0.
|
|
|
|
|
*/
|
|
|
|
|
const val LAST_SEEN_ON_REMOTE_TIMESTAMP = "last_seen_on_remote_timestamp"
|
|
|
|
|
const val LAST_SEEN_ON_REMOTE_SNAPSHOT_VERSION = "last_seen_on_remote_snapshot_version"
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* The remote digest for the media object. This is used to find matching attachments in the attachment table when necessary.
|
|
|
|
|
*/
|
|
|
|
|
const val REMOTE_DIGEST = "remote_digest"
|
|
|
|
|
|
|
|
|
|
/** Constant representing a [SNAPSHOT_VERSION] version that has not yet been set. */
|
|
|
|
|
const val UNKNOWN_VERSION = -1
|
|
|
|
|
|
|
|
|
|
private const val MAX_SNAPSHOT_VERSION_INDEX = "backup_snapshot_version_index"
|
|
|
|
|
|
|
|
|
|
/** A query that returns the max [SNAPSHOT_VERSION] presently in the table. An index exists to ensure that this is fast. */
|
|
|
|
|
const val MAX_VERSION = "(SELECT MAX($SNAPSHOT_VERSION) FROM $TABLE_NAME INDEXED BY $MAX_SNAPSHOT_VERSION_INDEX WHERE $SNAPSHOT_VERSION != $UNKNOWN_VERSION)"
|
|
|
|
|
|
|
|
|
|
val CREATE_TABLE = """
|
|
|
|
|
CREATE TABLE $TABLE_NAME (
|
|
|
|
|
$ID INTEGER PRIMARY KEY,
|
|
|
|
|
$MEDIA_ID TEXT UNIQUE,
|
|
|
|
|
$MEDIA_ID TEXT NOT NULL UNIQUE,
|
|
|
|
|
$CDN INTEGER,
|
|
|
|
|
$LAST_SYNC_TIME INTEGER DEFAULT 0,
|
|
|
|
|
$PENDING_SYNC_TIME INTEGER,
|
|
|
|
|
$IS_THUMBNAIL INTEGER DEFAULT 0,
|
|
|
|
|
$SNAPSHOT_VERSION INTEGER NOT NULL DEFAULT $UNKNOWN_VERSION,
|
|
|
|
|
$IS_PENDING INTEGER NOT NULL DEFAULT 0,
|
|
|
|
|
$IS_THUMBNAIL INTEGER NOT NULL DEFAULT 0,
|
|
|
|
|
$REMOTE_DIGEST BLOB NOT NULL,
|
|
|
|
|
$LAST_SEEN_ON_REMOTE_TIMESTAMP INTEGER DEFAULT 0
|
|
|
|
|
$LAST_SEEN_ON_REMOTE_SNAPSHOT_VERSION INTEGER NOT NULL DEFAULT 0
|
|
|
|
|
)
|
|
|
|
|
""".trimIndent()
|
|
|
|
|
|
|
|
|
|
val CREATE_INDEXES = arrayOf(
|
|
|
|
|
"CREATE INDEX IF NOT EXISTS $MAX_SNAPSHOT_VERSION_INDEX ON $TABLE_NAME ($SNAPSHOT_VERSION DESC) WHERE $SNAPSHOT_VERSION != $UNKNOWN_VERSION"
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Writes the set of media items that are slated to be referenced in the next backup, marking each row as pending (see [IS_PENDING]).
|
|
|
|
|
* Will insert multiple rows per object -- one for the main item, and one for the thumbnail.
|
|
|
|
|
*/
|
|
|
|
|
fun writePendingMediaObjects(mediaObjects: Sequence<ArchiveMediaItem>, pendingSyncTime: Long) {
|
|
|
|
|
fun writePendingMediaObjects(mediaObjects: Sequence<ArchiveMediaItem>) {
|
|
|
|
|
mediaObjects
|
|
|
|
|
.chunked(SqlUtil.MAX_QUERY_ARGS)
|
|
|
|
|
.forEach { chunk ->
|
|
|
|
|
writePendingMediaObjectsChunk(
|
|
|
|
|
chunk.map { MediaEntry(it.mediaId, it.cdn, it.digest, isThumbnail = false) },
|
|
|
|
|
pendingSyncTime
|
|
|
|
|
chunk.map { MediaEntry(it.mediaId, it.cdn, it.digest, isThumbnail = false) }
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
writePendingMediaObjectsChunk(
|
|
|
|
|
chunk.map { MediaEntry(it.thumbnailMediaId, it.cdn, it.digest, isThumbnail = true) },
|
|
|
|
|
pendingSyncTime
|
|
|
|
|
chunk.map { MediaEntry(it.thumbnailMediaId, it.cdn, it.digest, isThumbnail = true) }
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Commits the pending sync time to the last sync time. This is called once a backup has been successfully uploaded.
|
|
|
|
|
* Commits all pending entries (written via [writePendingMediaObjects]) to have a concrete [SNAPSHOT_VERSION]. The version will be 1 higher than the previous
|
|
|
|
|
* snapshot version.
|
|
|
|
|
*/
|
|
|
|
|
fun commitPendingRows() {
|
|
|
|
|
writableDatabase.execSQL("UPDATE $TABLE_NAME SET $LAST_SYNC_TIME = $PENDING_SYNC_TIME")
|
|
|
|
|
writableDatabase.withinTransaction {
|
|
|
|
|
val currentSnapshotVersion = getCurrentSnapshotVersion()
|
|
|
|
|
val nextSnapshotVersion = currentSnapshotVersion + 1
|
|
|
|
|
|
|
|
|
|
writableDatabase
|
|
|
|
|
.update(TABLE_NAME)
|
|
|
|
|
.values(
|
|
|
|
|
SNAPSHOT_VERSION to nextSnapshotVersion,
|
|
|
|
|
IS_PENDING to 0
|
|
|
|
|
)
|
|
|
|
|
.where("$IS_PENDING != 0")
|
|
|
|
|
.run()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fun getPageOfOldMediaObjects(currentSyncTime: Long, pageSize: Int): Set<ArchivedMediaObject> {
|
|
|
|
|
/**
* Returns the result of `MAX([SNAPSHOT_VERSION])` over the rows whose version is not [UNKNOWN_VERSION], i.e. the highest snapshot version assigned so far.
*
* The query is pinned to [MAX_SNAPSHOT_VERSION_INDEX] via `INDEXED BY`; that partial index is declared in [CREATE_INDEXES] with the same
* `!= UNKNOWN_VERSION` filter, so the WHERE clause here must keep matching it.
*
* `readToSingleLong(0)` supplies 0 as the fallback value -- presumably what callers get when no snapshot has been committed yet (TODO confirm against the helper).
*/
fun getCurrentSnapshotVersion(): Long {
|
|
|
|
|
return readableDatabase
|
|
|
|
|
.select("MAX($SNAPSHOT_VERSION)")
|
|
|
|
|
.from("$TABLE_NAME INDEXED BY $MAX_SNAPSHOT_VERSION_INDEX")
|
|
|
|
|
.where("$SNAPSHOT_VERSION != $UNKNOWN_VERSION")
|
|
|
|
|
.run()
|
|
|
|
|
.readToSingleLong(0)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fun getPageOfOldMediaObjects(pageSize: Int): Set<ArchivedMediaObject> {
|
|
|
|
|
return readableDatabase.select(MEDIA_ID, CDN)
|
|
|
|
|
.from(TABLE_NAME)
|
|
|
|
|
.where("$LAST_SYNC_TIME < ? AND $LAST_SYNC_TIME = $PENDING_SYNC_TIME", currentSyncTime)
|
|
|
|
|
.where("$SNAPSHOT_VERSION < $MAX_VERSION AND $IS_PENDING = 0")
|
|
|
|
|
.limit(pageSize)
|
|
|
|
|
.run()
|
|
|
|
|
.readToSet {
|
|
|
|
|
@@ -127,16 +179,21 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fun deleteMediaObjects(mediaObjects: Collection<ArchivedMediaObject>) {
|
|
|
|
|
/**
|
|
|
|
|
* This will remove any old snapshot entries with matching mediaIds. No pending entries or entries in the latest snapshot will be affected.
|
|
|
|
|
*/
|
|
|
|
|
fun deleteOldMediaObjects(mediaObjects: Collection<ArchivedMediaObject>) {
|
|
|
|
|
val query = SqlUtil.buildFastCollectionQuery(MEDIA_ID, mediaObjects.map { it.mediaId })
|
|
|
|
|
|
|
|
|
|
writableDatabase.delete(TABLE_NAME)
|
|
|
|
|
.where(query.where, query.whereArgs)
|
|
|
|
|
.where("$SNAPSHOT_VERSION < $MAX_VERSION AND $IS_PENDING = 0 AND " + query.where, query.whereArgs)
|
|
|
|
|
.run()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Given a list of media objects, find the ones that we have no knowledge of in our local store.
|
|
|
|
|
* Given a list of media objects, find the ones that are not in the most recent backup snapshot.
|
|
|
|
|
*
|
|
|
|
|
* We purposely allow pending items here -- so long as they were in the most recent complete snapshot, we want to keep them.
|
|
|
|
|
*/
|
|
|
|
|
fun getMediaObjectsThatCantBeFound(objects: List<ArchivedMediaObject>): List<ArchivedMediaObject> {
|
|
|
|
|
if (objects.isEmpty()) {
|
|
|
|
|
@@ -146,7 +203,8 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat
|
|
|
|
|
val queries: List<SqlUtil.Query> = SqlUtil.buildCollectionQuery(
|
|
|
|
|
column = MEDIA_ID,
|
|
|
|
|
values = objects.map { it.mediaId },
|
|
|
|
|
collectionOperator = SqlUtil.CollectionOperator.IN
|
|
|
|
|
collectionOperator = SqlUtil.CollectionOperator.IN,
|
|
|
|
|
prefix = "$SNAPSHOT_VERSION = $MAX_VERSION AND "
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
val foundObjects: MutableSet<String> = mutableSetOf()
|
|
|
|
|
@@ -166,7 +224,10 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Given a list of media objects, find the ones that we have no knowledge of in our local store.
|
|
|
|
|
* Given a list of media objects, find the ones that are present in the most recent snapshot, but have a different CDN than the one passed in.
|
|
|
|
|
* This will ignore thumbnails, as the results are intended to be used to update CDNs, which we do not track for thumbnails.
|
|
|
|
|
*
|
|
|
|
|
* We purposely allow pending items here -- either way they're in the latest snapshot, and should have their CDN info updated.
|
|
|
|
|
*/
|
|
|
|
|
fun getMediaObjectsWithNonMatchingCdn(objects: List<ArchivedMediaObject>): List<CdnMismatchResult> {
|
|
|
|
|
if (objects.isEmpty()) {
|
|
|
|
|
@@ -180,7 +241,7 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat
|
|
|
|
|
SELECT a.$REMOTE_DIGEST, b.$CDN
|
|
|
|
|
FROM $TABLE_NAME a
|
|
|
|
|
JOIN input_pairs b ON a.$MEDIA_ID = b.$MEDIA_ID
|
|
|
|
|
WHERE a.$CDN != b.$CDN AND a.$IS_THUMBNAIL = 0
|
|
|
|
|
WHERE a.$CDN != b.$CDN AND a.$IS_THUMBNAIL = 0 AND $SNAPSHOT_VERSION = $MAX_VERSION
|
|
|
|
|
"""
|
|
|
|
|
).readToList { cursor ->
|
|
|
|
|
CdnMismatchResult(
|
|
|
|
|
@@ -193,7 +254,7 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat
|
|
|
|
|
/**
|
|
|
|
|
* Indicate the snapshot version at which the set of media objects was seen on the archive CDN. Can be used to reconcile our local state with the server state.
|
|
|
|
|
*/
|
|
|
|
|
fun markSeenOnRemote(mediaIdBatch: Collection<String>, time: Long) {
|
|
|
|
|
fun markSeenOnRemote(mediaIdBatch: Collection<String>, snapshotVersion: Long) {
|
|
|
|
|
if (mediaIdBatch.isEmpty()) {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
@@ -201,7 +262,7 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat
|
|
|
|
|
val query = SqlUtil.buildFastCollectionQuery(MEDIA_ID, mediaIdBatch)
|
|
|
|
|
writableDatabase
|
|
|
|
|
.update(TABLE_NAME)
|
|
|
|
|
.values(LAST_SEEN_ON_REMOTE_TIMESTAMP to time)
|
|
|
|
|
.values(LAST_SEEN_ON_REMOTE_SNAPSHOT_VERSION to snapshotVersion)
|
|
|
|
|
.where(query.where, query.whereArgs)
|
|
|
|
|
.run()
|
|
|
|
|
}
|
|
|
|
|
@@ -211,46 +272,40 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat
|
|
|
|
|
* This is used to find media objects that have not been seen on the CDN, even though they should be.
|
|
|
|
|
*
|
|
|
|
|
* The cursor contains rows that can be parsed into [MediaEntry] objects.
|
|
|
|
|
*
|
|
|
|
|
* We purposely allow pending items here -- either way they *should* be uploaded.
|
|
|
|
|
*/
|
|
|
|
|
fun getMediaObjectsLastSeenOnCdnBeforeTime(time: Long): Cursor {
|
|
|
|
|
fun getMediaObjectsLastSeenOnCdnBeforeSnapshotVersion(snapshotVersion: Long): Cursor {
|
|
|
|
|
return readableDatabase
|
|
|
|
|
.select(MEDIA_ID, CDN, REMOTE_DIGEST, IS_THUMBNAIL)
|
|
|
|
|
.from(TABLE_NAME)
|
|
|
|
|
.where("$LAST_SEEN_ON_REMOTE_TIMESTAMP < $time")
|
|
|
|
|
.where("$LAST_SEEN_ON_REMOTE_SNAPSHOT_VERSION < $snapshotVersion AND $SNAPSHOT_VERSION = $MAX_VERSION")
|
|
|
|
|
.run()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Resets the [LAST_SEEN_ON_REMOTE_TIMESTAMP] column back to zero. It's a good idea to do this after you have run a sync and used the value, as it can
|
|
|
|
|
* mitigate various issues that can arise from having an incorrect local clock.
|
|
|
|
|
*/
|
|
|
|
|
fun clearLastSeenOnRemote() {
|
|
|
|
|
writableDatabase
|
|
|
|
|
.updateAll(TABLE_NAME)
|
|
|
|
|
.values(LAST_SEEN_ON_REMOTE_TIMESTAMP to 0)
|
|
|
|
|
.run()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private fun writePendingMediaObjectsChunk(chunk: List<MediaEntry>, pendingSyncTime: Long) {
|
|
|
|
|
private fun writePendingMediaObjectsChunk(chunk: List<MediaEntry>) {
|
|
|
|
|
val values = chunk.map {
|
|
|
|
|
contentValuesOf(
|
|
|
|
|
MEDIA_ID to it.mediaId,
|
|
|
|
|
CDN to it.cdn,
|
|
|
|
|
REMOTE_DIGEST to it.digest,
|
|
|
|
|
IS_THUMBNAIL to it.isThumbnail.toInt(),
|
|
|
|
|
PENDING_SYNC_TIME to pendingSyncTime
|
|
|
|
|
SNAPSHOT_VERSION to UNKNOWN_VERSION,
|
|
|
|
|
IS_PENDING to 1
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val query = SqlUtil.buildSingleBulkInsert(TABLE_NAME, arrayOf(MEDIA_ID, CDN, REMOTE_DIGEST, IS_THUMBNAIL, PENDING_SYNC_TIME), values)
|
|
|
|
|
val query = SqlUtil.buildSingleBulkInsert(TABLE_NAME, arrayOf(MEDIA_ID, CDN, REMOTE_DIGEST, IS_THUMBNAIL, SNAPSHOT_VERSION, IS_PENDING), values)
|
|
|
|
|
|
|
|
|
|
writableDatabase.execSQL(
|
|
|
|
|
"""
|
|
|
|
|
${query.where}
|
|
|
|
|
ON CONFLICT($MEDIA_ID) DO UPDATE SET
|
|
|
|
|
$PENDING_SYNC_TIME = EXCLUDED.$PENDING_SYNC_TIME,
|
|
|
|
|
$CDN = EXCLUDED.$CDN
|
|
|
|
|
""",
|
|
|
|
|
query.where +
|
|
|
|
|
"""
|
|
|
|
|
ON CONFLICT($MEDIA_ID) DO UPDATE SET
|
|
|
|
|
$CDN = excluded.$CDN,
|
|
|
|
|
$REMOTE_DIGEST = excluded.$REMOTE_DIGEST,
|
|
|
|
|
$IS_THUMBNAIL = excluded.$IS_THUMBNAIL,
|
|
|
|
|
$IS_PENDING = excluded.$IS_PENDING
|
|
|
|
|
""",
|
|
|
|
|
query.whereArgs
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|