diff --git a/ts/challenge.ts b/ts/challenge.ts index 0f4d5daea7..9d8e45b59c 100644 --- a/ts/challenge.ts +++ b/ts/challenge.ts @@ -22,6 +22,7 @@ import * as Errors from './types/errors'; import { HTTPError } from './textsecure/Errors'; import type { SendMessageChallengeData } from './textsecure/Errors'; import * as log from './logging/log'; +import { drop } from './util/drop'; export type ChallengeResponse = Readonly<{ captcha: string; @@ -75,6 +76,7 @@ export type RegisteredChallengeType = Readonly<{ reason: string; retryAt?: number; token?: string; + silent: boolean; }>; type SolveOptionsType = Readonly<{ @@ -210,7 +212,7 @@ export class ChallengeHandler { } if (challenge.token) { - void this.solve({ reason, token: challenge.token }); + drop(this.solve({ reason, token: challenge.token })); } } @@ -247,7 +249,7 @@ export class ChallengeHandler { setTimeout(() => { this.startTimers.delete(conversationId); - void this.startQueue(conversationId); + drop(this.startQueue(conversationId)); }, waitTime) ); log.info( @@ -269,7 +271,9 @@ export class ChallengeHandler { return; } - void this.solve({ token: challenge.token, reason }); + if (!challenge.silent) { + drop(this.solve({ token: challenge.token, reason })); + } } public onResponse(response: IPCResponse): void { @@ -282,8 +286,13 @@ export class ChallengeHandler { handler.resolve(response.data); } - public async unregister(conversationId: string): Promise { - log.info(`challenge: unregistered conversation ${conversationId}`); + public async unregister( + conversationId: string, + source: string + ): Promise { + log.info( + `challenge: unregistered conversation ${conversationId} via ${source}` + ); this.registeredConversations.delete(conversationId); this.pendingStarts.delete(conversationId); @@ -343,7 +352,7 @@ export class ChallengeHandler { return; } - await this.unregister(conversationId); + await this.unregister(conversationId, 'startQueue'); if (this.registeredConversations.size === 0) { this.options.setChallengeStatus('idle'); diff --git a/ts/jobs/JobQueue.ts b/ts/jobs/JobQueue.ts index 8cd58fdd9b..09d1454564 100644 --- a/ts/jobs/JobQueue.ts +++ b/ts/jobs/JobQueue.ts @@ -13,6 +13,7 @@ import * as log from '../logging/log'; import { JobLogger } from './JobLogger'; import * as Errors from '../types/errors'; import type { LoggerType } from '../types/Logging'; +import { drop } from '../util/drop'; const noopOnCompleteCallbacks = { resolve: noop, @@ -43,6 +44,12 @@ type JobQueueOptions = { logger?: LoggerType; }; +export enum JOB_STATUS { + SUCCESS = 'SUCCESS', + NEEDS_RETRY = 'NEEDS_RETRY', + ERROR = 'ERROR', +} + export abstract class JobQueue { private readonly maxAttempts: number; @@ -119,7 +126,7 @@ export abstract class JobQueue { protected abstract run( job: Readonly>, extra?: Readonly<{ attempt?: number; log?: LoggerType }> - ): Promise; + ): Promise; protected getQueues(): ReadonlySet { return new Set([this.defaultInMemoryQueue]); @@ -144,7 +151,7 @@ export abstract class JobQueue { log.info(`${this.logPrefix} is shutting down. Can't accept more work.`); break; } - void this.enqueueStoredJob(storedJob); + drop(this.enqueueStoredJob(storedJob)); } } @@ -201,7 +208,9 @@ export abstract class JobQueue { return this.defaultInMemoryQueue; } - private async enqueueStoredJob(storedJob: Readonly) { + protected async enqueueStoredJob( + storedJob: Readonly + ): Promise { assertDev( storedJob.queueType === this.queueType, 'Received a mis-matched queue type' @@ -242,50 +251,78 @@ export abstract class JobQueue { const result: | undefined - | { success: true } - | { success: false; err: unknown } = await queue.add(async () => { - for (let attempt = 1; attempt <= this.maxAttempts; attempt += 1) { - const isFinalAttempt = attempt === this.maxAttempts; + | { status: JOB_STATUS.SUCCESS } + | { status: JOB_STATUS.NEEDS_RETRY } + | { status: JOB_STATUS.ERROR; err: unknown } = await queue.add( + async () => { + for (let attempt = 1; attempt <= this.maxAttempts; attempt += 1) { + const isFinalAttempt = attempt === this.maxAttempts; - logger.attempt = attempt; + logger.attempt = attempt; - log.info( - `${this.logPrefix} running job ${storedJob.id}, attempt ${attempt} of ${this.maxAttempts}` - ); - - if (this.isShuttingDown) { - log.warn( - `${this.logPrefix} returning early for job ${storedJob.id}; shutting down` - ); - return { success: false, err: new Error('Shutting down') }; - } - - try { - // We want an `await` in the loop, as we don't want a single job running more - // than once at a time. Ideally, the job will succeed on the first attempt. - // eslint-disable-next-line no-await-in-loop - await this.run(parsedJob, { attempt, log: logger }); log.info( - `${this.logPrefix} job ${storedJob.id} succeeded on attempt ${attempt}` + `${this.logPrefix} running job ${storedJob.id}, attempt ${attempt} of ${this.maxAttempts}` ); - return { success: true }; - } catch (err: unknown) { - log.error( - `${this.logPrefix} job ${ - storedJob.id - } failed on attempt ${attempt}. ${Errors.toLogFormat(err)}` - ); - if (isFinalAttempt) { - return { success: false, err }; + + if (this.isShuttingDown) { + log.warn( + `${this.logPrefix} returning early for job ${storedJob.id}; shutting down` + ); + return { + status: JOB_STATUS.ERROR, + err: new Error('Shutting down'), + }; + } + + try { + // We want an `await` in the loop, as we don't want a single job running more + // than once at a time. Ideally, the job will succeed on the first attempt. + // eslint-disable-next-line no-await-in-loop + const jobStatus = await this.run(parsedJob, { + attempt, + log: logger, + }); + if (!jobStatus) { + log.info( + `${this.logPrefix} job ${storedJob.id} succeeded on attempt ${attempt}` + ); + return { status: JOB_STATUS.SUCCESS }; + } + log.info( + `${this.logPrefix} job ${storedJob.id} returned status ${jobStatus} on attempt ${attempt}` + ); + return { status: jobStatus }; + } catch (err: unknown) { + log.error( + `${this.logPrefix} job ${ + storedJob.id + } failed on attempt ${attempt}. ${Errors.toLogFormat(err)}` + ); + if (isFinalAttempt) { + return { status: JOB_STATUS.ERROR, err }; + } } } + + // This should never happen. See the assertion below. + return undefined; } + ); - // This should never happen. See the assertion below. - return undefined; - }); - - if (result?.success || !this.isShuttingDown) { + if (result?.status === JOB_STATUS.NEEDS_RETRY) { + const addJobSuccess = await this.retryJobOnQueueIdle({ + storedJob, + job: parsedJob, + logger, + }); + if (!addJobSuccess) { + await this.store.delete(storedJob.id); + } + } + if ( + result?.status === JOB_STATUS.SUCCESS || + (result?.status === JOB_STATUS.ERROR && !this.isShuttingDown) + ) { await this.store.delete(storedJob.id); } @@ -293,13 +330,26 @@ export abstract class JobQueue { result, 'The job never ran. This indicates a developer error in the job queue' ); - if (result.success) { - resolve(); - } else { + if (result.status === JOB_STATUS.ERROR) { reject(result.err); + } else { + resolve(); } } + async retryJobOnQueueIdle({ + logger, + }: { + job: Readonly>; + storedJob: Readonly; + logger: LoggerType; + }): Promise { + logger.error( + `retryJobOnQueueIdle: not implemented for queue ${this.queueType}; dropping job` + ); + return false; + } + async shutdown(): Promise { const queues = this.getQueues(); log.info( diff --git a/ts/jobs/conversationJobQueue.ts b/ts/jobs/conversationJobQueue.ts index 314f16ffa4..1c95cabd68 100644 --- a/ts/jobs/conversationJobQueue.ts +++ b/ts/jobs/conversationJobQueue.ts @@ -9,7 +9,7 @@ import * as durations from '../util/durations'; import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff'; import { InMemoryQueues } from './helpers/InMemoryQueues'; import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; -import { JobQueue } from './JobQueue'; +import { JOB_STATUS, JobQueue } from './JobQueue'; import { sendNormalMessage } from './helpers/sendNormalMessage'; import { sendDirectExpirationTimerUpdate } from './helpers/sendDirectExpirationTimerUpdate'; @@ -33,7 +33,7 @@ import { strictAssert } from '../util/assert'; import { missingCaseError } from '../util/missingCaseError'; import { explodePromise } from '../util/explodePromise'; import type { Job } from './Job'; -import type { ParsedJob } from './types'; +import type { ParsedJob, StoredJob } from './types'; import type SendMessage from '../textsecure/SendMessage'; import type { ServiceIdString } from '../types/ServiceId'; import { commonShouldJobContinue } from './helpers/commonShouldJobContinue'; @@ -44,6 +44,9 @@ import { sendResendRequest } from './helpers/sendResendRequest'; import { sendNullMessage } from './helpers/sendNullMessage'; import { sendSenderKeyDistribution } from './helpers/sendSenderKeyDistribution'; import { sendSavedProto } from './helpers/sendSavedProto'; +import { drop } from '../util/drop'; +import { isInPast } from '../util/timestamp'; +import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary'; // Note: generally, we only want to add to this list. If you do need to change one of // these values, you'll likely need to write a database migration. @@ -62,6 +65,7 @@ export const conversationQueueJobEnum = z.enum([ 'Story', 'Receipts', ]); +type ConversationQueueJobEnum = z.infer; const deleteForEveryoneJobDataSchema = z.object({ type: z.literal(conversationQueueJobEnum.enum.DeleteForEveryone), @@ -234,7 +238,92 @@ export type ConversationQueueJobBundle = { const MAX_RETRY_TIME = durations.DAY; const MAX_ATTEMPTS = exponentialBackoffMaxAttempts(MAX_RETRY_TIME); +function shouldSendShowCaptcha(type: ConversationQueueJobEnum): boolean { + if (type === 'DeleteForEveryone') { + return true; + } + if (type === 'DeleteStoryForEveryone') { + return true; + } + if (type === 'DirectExpirationTimerUpdate') { + return true; + } + if (type === 'GroupUpdate') { + return false; + } + if (type === 'NormalMessage') { + return true; + } + if (type === 'NullMessage') { + return false; + } + if (type === 'ProfileKey') { + return false; + } + if (type === 'Reaction') { + return false; + } + if (type === 'ResendRequest') { + return false; + } + if (type === 'SavedProto') { + return false; + } + if (type === 'SenderKeyDistribution') { + return false; + } + if (type === 'Story') { + return true; + } + if (type === 'Receipts') { + return false; + } + + throw missingCaseError(type); +} + +enum RETRY_STATUS { + BLOCKED = 'BLOCKED', + BLOCKED_WITH_JOBS = 'BLOCKED_WITH_JOBS', + UNBLOCKED = 'UNBLOCKED', +} + +type ConversationData = Readonly< + | { + // When we get a retryAt from a 428 error, we immediately record it, but we don't + // yet have a job to retry. We should, very soon, when the job returns + // JOB_STATUS.NEEDS_RETRY. This should be a very short-lived state. + status: RETRY_STATUS.BLOCKED; + callback: undefined; + jobsNeedingRetry: undefined; + retryAt: number; + } + | { + // This is the next stage, when we've added at least one job needing retry, and we + // have a callback registered to run on queue idle (or be called directly). + status: RETRY_STATUS.BLOCKED_WITH_JOBS; + callback: () => void; + jobsNeedingRetry: Array>; + retryAt: number; + retryAtTimeout?: NodeJS.Timeout; + } + | { + // When we discover that we can now run these deferred jobs, we flip into this + // state, which should be short-lived. We very quickly re-enqueue all + // jobsNeedingRetry, and erase perConversationData for this conversation. + status: RETRY_STATUS.UNBLOCKED; + callback: () => void; + jobsNeedingRetry: Array>; + retryAt: undefined; + retryAtTimeout?: NodeJS.Timeout; + } +>; + export class ConversationJobQueue extends JobQueue { + private readonly perConversationData = new Map< + string, + ConversationData | undefined + >(); private readonly inMemoryQueues = new InMemoryQueues(); private readonly verificationWaitMap = new Map< string, @@ -244,6 +333,7 @@ export class ConversationJobQueue extends JobQueue { promise: Promise; } >(); + private callbackCount = 0; override getQueues(): ReadonlySet { return this.inMemoryQueues.allQueues; @@ -254,14 +344,17 @@ export class ConversationJobQueue extends JobQueue { insert?: (job: ParsedJob) => Promise ): Promise> { const { conversationId, type } = data; - strictAssert( - window.Signal.challengeHandler, - 'conversationJobQueue.add: Missing challengeHandler!' - ); - window.Signal.challengeHandler.maybeSolve({ - conversationId, - reason: `conversationJobQueue.add(${conversationId}, ${type})`, - }); + + if (shouldSendShowCaptcha(data.type)) { + strictAssert( + window.Signal.challengeHandler, + 'conversationJobQueue.add: Missing challengeHandler!' + ); + window.Signal.challengeHandler.maybeSolve({ + conversationId, + reason: `conversationJobQueue.add(${conversationId}, ${type})`, + }); + } return super.add(data, insert); } @@ -310,18 +403,239 @@ export class ConversationJobQueue extends JobQueue { globalLogger.warn( `resolveVerificationWaiter: Missing waiter for conversation ${conversationId}.` ); + this.unblockConversationRetries(conversationId); } } + private unblockConversationRetries(conversationId: string) { + const logId = `unblockConversationRetries/${conversationId}`; + + const perConversationData = this.perConversationData.get(conversationId); + if (!perConversationData) { + return; + } + + const { status, callback } = perConversationData; + if (status === RETRY_STATUS.BLOCKED) { + globalLogger.info( + `${logId}: Deleting previous BLOCKED state; had no jobs` + ); + this.perConversationData.delete(conversationId); + } else if (status === RETRY_STATUS.BLOCKED_WITH_JOBS) { + globalLogger.info( + `${logId}: Moving previous WAITING state to UNBLOCKED, calling callback directly` + ); + this.perConversationData.set(conversationId, { + ...perConversationData, + status: RETRY_STATUS.UNBLOCKED, + retryAt: undefined, + }); + callback(); + } else if (status === RETRY_STATUS.UNBLOCKED) { + globalLogger.warn( + `${logId}: We're still in UNBLOCKED state; calling callback directly` + ); + callback(); + } else { + throw missingCaseError(status); + } + } + + private captureRetryAt(conversationId: string, retryAt: number | undefined) { + const logId = `captureRetryAt/${conversationId}`; + + const newRetryAt = retryAt || Date.now() + MINUTE; + const perConversationData = this.perConversationData.get(conversationId); + if (!perConversationData) { + if (!retryAt) { + globalLogger.warn( + `${logId}: No existing data, using retryAt of ${newRetryAt}` + ); + } + this.perConversationData.set(conversationId, { + status: RETRY_STATUS.BLOCKED, + retryAt: newRetryAt, + callback: undefined, + jobsNeedingRetry: undefined, + }); + + return; + } + + const { status, retryAt: existingRetryAt } = perConversationData; + if (existingRetryAt && existingRetryAt >= newRetryAt) { + globalLogger.warn( + `${logId}: New newRetryAt ${newRetryAt} isn't after existing retryAt ${existingRetryAt}, dropping` + ); + return; + } + + if ( + status === RETRY_STATUS.BLOCKED || + status === RETRY_STATUS.BLOCKED_WITH_JOBS + ) { + globalLogger.info( + `${logId}: Updating to newRetryAt ${newRetryAt} from existing retryAt ${existingRetryAt}, status ${status}` + ); + this.perConversationData.set(conversationId, { + ...perConversationData, + retryAt: newRetryAt, + }); + } else if (status === RETRY_STATUS.UNBLOCKED) { + globalLogger.info( + `${logId}: Updating to newRetryAt ${newRetryAt} from previous UNBLOCKED status` + ); + this.perConversationData.set(conversationId, { + ...perConversationData, + status: RETRY_STATUS.BLOCKED_WITH_JOBS, + retryAt: newRetryAt, + }); + } else { + throw missingCaseError(status); + } + } + + override async retryJobOnQueueIdle({ + job, + storedJob, + logger, + }: { + job: Readonly>; + storedJob: Readonly; + logger: LoggerType; + }): Promise { + const { conversationId } = job.data; + const logId = `retryJobOnQueueIdle/${conversationId}/${job.id}`; + const perConversationData = this.perConversationData.get(conversationId); + + if (!perConversationData) { + logger.warn(`${logId}: no data for conversation; using default retryAt`); + } else { + logger.warn( + `${logId}: adding to existing data with status ${perConversationData.status}` + ); + } + + const { status, retryAt, jobsNeedingRetry, callback } = + perConversationData || { + status: RETRY_STATUS.BLOCKED, + retryAt: Date.now() + MINUTE, + }; + + const newJobsNeedingRetry = (jobsNeedingRetry || []).concat([storedJob]); + logger.info( + `${logId}: job added to retry queue with status ${status}; ${newJobsNeedingRetry.length} items now in queue` + ); + + const newCallback = + callback || this.createRetryCallback(conversationId, job.id); + + if ( + status === RETRY_STATUS.BLOCKED || + status === RETRY_STATUS.BLOCKED_WITH_JOBS + ) { + this.perConversationData.set(conversationId, { + status: RETRY_STATUS.BLOCKED_WITH_JOBS, + retryAt, + jobsNeedingRetry: newJobsNeedingRetry, + callback: newCallback, + }); + } else { + this.perConversationData.set(conversationId, { + status: RETRY_STATUS.UNBLOCKED, + retryAt, + jobsNeedingRetry: newJobsNeedingRetry, + callback: newCallback, + }); + } + + if (newCallback !== callback) { + const queue = this.getInMemoryQueue(job); + drop( + // eslint-disable-next-line more/no-then + queue.onIdle().then(() => { + globalLogger.info(`${logId}: Running callback due to queue.onIdle`); + newCallback(); + }) + ); + } + + return true; + } + + private createRetryCallback(conversationId: string, jobId: string) { + this.callbackCount += 1; + const id = this.callbackCount; + + globalLogger.info( + `createRetryCallback/${conversationId}/${id}: callback created for job ${jobId}` + ); + + return () => { + const logId = `retryCallback/${conversationId}/${id}`; + + const perConversationData = this.perConversationData.get(conversationId); + if (!perConversationData) { + globalLogger.warn(`${logId}: no perConversationData, returning early.`); + return; + } + + const { status, retryAt } = perConversationData; + if (status === RETRY_STATUS.BLOCKED) { + globalLogger.warn( + `${logId}: Still in blocked state, no jobs to retry. Clearing perConversationData.` + ); + this.perConversationData.delete(conversationId); + return; + } + + const { callback, jobsNeedingRetry, retryAtTimeout } = + perConversationData; + + if (retryAtTimeout) { + clearTimeoutIfNecessary(retryAtTimeout); + } + + if (!retryAt || isInPast(retryAt)) { + globalLogger.info( + `${logId}: retryAt is ${retryAt}; queueing ${jobsNeedingRetry?.length} jobs needing retry` + ); + + // We're starting to retry jobs; remove the challenge handler + drop(window.Signal.challengeHandler?.unregister(conversationId, logId)); + + jobsNeedingRetry?.forEach(job => { + drop(this.enqueueStoredJob(job)); + }); + this.perConversationData.delete(conversationId); + return; + } + + const timeLeft = retryAt - Date.now(); + globalLogger.info( + `${logId}: retryAt ${retryAt} is in the future, scheduling timeout for ${timeLeft}ms` + ); + + this.perConversationData.set(conversationId, { + ...perConversationData, + retryAtTimeout: setTimeout(() => { + globalLogger.info(`${logId}: Running callback due to timeout`); + callback(); + }, timeLeft), + }); + }; + } + protected async run( { data, timestamp, }: Readonly<{ data: ConversationQueueJobData; timestamp: number }>, { attempt, log }: Readonly<{ attempt: number; log: LoggerType }> - ): Promise { + ): Promise { const { type, conversationId } = data; const isFinalAttempt = attempt >= MAX_ATTEMPTS; + const perConversationData = this.perConversationData.get(conversationId); await window.ConversationController.load(); @@ -330,6 +644,11 @@ export class ConversationJobQueue extends JobQueue { throw new Error(`Failed to find conversation ${conversationId}`); } + if (perConversationData?.retryAt && !shouldSendShowCaptcha(type)) { + // If we return this value, JobQueue will call retryJobOnQueueIdle for this job + return JOB_STATUS.NEEDS_RETRY; + } + let timeRemaining: number; let shouldContinue: boolean; let count = 0; @@ -350,10 +669,24 @@ export class ConversationJobQueue extends JobQueue { break; } - if (window.Signal.challengeHandler?.isRegistered(conversationId)) { + const isChallengeRegistered = + window.Signal.challengeHandler?.isRegistered(conversationId); + if (!isChallengeRegistered) { + this.unblockConversationRetries(conversationId); + } + + if (isChallengeRegistered && shouldSendShowCaptcha(type)) { if (this.isShuttingDown) { throw new Error("Shutting down, can't wait for captcha challenge."); } + + window.Signal.challengeHandler?.maybeSolve({ + conversationId, + reason: + 'conversationJobQueue.run/addWaiter(' + + `${conversation.idForLogging()}, ${type}, ${timestamp})`, + }); + log.info( 'captcha challenge is pending for this conversation; waiting at most 5m...' ); @@ -386,7 +719,7 @@ export class ConversationJobQueue extends JobQueue { log.warn( "Cancelling profile share, we don't want to wait for pending verification." ); - return; + return undefined; } if (this.isShuttingDown) { @@ -498,10 +831,14 @@ export class ConversationJobQueue extends JobQueue { ); } } + + return undefined; } catch (error: unknown) { const untrustedServiceIds: Array = []; - const processError = (toProcess: unknown) => { + const processError = ( + toProcess: unknown + ): undefined | typeof JOB_STATUS.NEEDS_RETRY => { if (toProcess instanceof OutgoingIdentityKeyError) { const failedConversation = window.ConversationController.getOrCreate( toProcess.identifier, @@ -513,28 +850,47 @@ export class ConversationJobQueue extends JobQueue { log.error( `failedConversation: Conversation ${failedConversation.idForLogging()} missing serviceId!` ); - return; + return undefined; } untrustedServiceIds.push(serviceId); } else if (toProcess instanceof SendMessageChallengeError) { - void window.Signal.challengeHandler?.register( - { - conversationId, - createdAt: Date.now(), - retryAt: toProcess.retryAt, - token: toProcess.data?.token, - reason: - 'conversationJobQueue.run(' + - `${conversation.idForLogging()}, ${type}, ${timestamp})`, - }, - toProcess.data + const silent = !shouldSendShowCaptcha(type); + + drop( + window.Signal.challengeHandler?.register( + { + conversationId, + createdAt: Date.now(), + retryAt: toProcess.retryAt, + token: toProcess.data?.token, + reason: + 'conversationJobQueue.run(' + + `${conversation.idForLogging()}, ${type}, ${timestamp})`, + silent, + }, + toProcess.data + ) ); + + if (silent) { + this.captureRetryAt(conversationId, toProcess.retryAt); + return JOB_STATUS.NEEDS_RETRY; + } } + return undefined; }; - processError(error); + const value = processError(error); + if (value) { + return value; + } + if (error instanceof SendMessageProtoError) { - (error.errors || []).forEach(processError); + const values = (error.errors || []).map(processError); + const innerValue = values.find(item => Boolean(item)); + if (innerValue) { + return innerValue; + } } if (untrustedServiceIds.length) { @@ -542,14 +898,14 @@ export class ConversationJobQueue extends JobQueue { log.warn( `Cancelling profile share, since there were ${untrustedServiceIds.length} untrusted send targets.` ); - return; + return undefined; } if (type === jobSet.Receipts) { log.warn( `Cancelling receipt send, since there were ${untrustedServiceIds.length} untrusted send targets.` ); - return; + return undefined; } log.error( diff --git a/ts/jobs/helpers/sendStory.ts b/ts/jobs/helpers/sendStory.ts index 1db8c69d95..1d134582b1 100644 --- a/ts/jobs/helpers/sendStory.ts +++ b/ts/jobs/helpers/sendStory.ts @@ -444,6 +444,7 @@ export async function sendStory( reason: 'conversationJobQueue.run(' + `${conversation.idForLogging()}, story, ${timestamp}/${distributionId})`, + silent: false, }, error.data ); diff --git a/ts/jobs/readSyncJobQueue.ts b/ts/jobs/readSyncJobQueue.ts index 58bc34be13..ef6f347036 100644 --- a/ts/jobs/readSyncJobQueue.ts +++ b/ts/jobs/readSyncJobQueue.ts @@ -13,6 +13,7 @@ import { import { strictAssert } from '../util/assert'; import { isRecord } from '../util/isRecord'; +import type { JOB_STATUS } from './JobQueue'; import { JobQueue } from './JobQueue'; import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; @@ -31,7 +32,7 @@ export class ReadSyncJobQueue extends JobQueue { protected async run( { data, timestamp }: Readonly<{ data: ReadSyncJobData; timestamp: number }>, { attempt, log }: Readonly<{ attempt: number; log: LoggerType }> - ): Promise { + ): Promise { await runSyncJob({ attempt, log, @@ -40,6 +41,8 @@ export class ReadSyncJobQueue extends JobQueue { timestamp, type: SyncTypeList.Read, }); + + return undefined; } } diff --git a/ts/jobs/removeStorageKeyJobQueue.ts b/ts/jobs/removeStorageKeyJobQueue.ts index 8b3e86fc32..c911ef6a34 100644 --- a/ts/jobs/removeStorageKeyJobQueue.ts +++ b/ts/jobs/removeStorageKeyJobQueue.ts @@ -3,7 +3,9 @@ import { z } from 'zod'; +import type { JOB_STATUS } from './JobQueue'; import { JobQueue } from './JobQueue'; + import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; const removeStorageKeyJobDataSchema = z.object({ @@ -24,12 +26,16 @@ export class RemoveStorageKeyJobQueue extends JobQueue protected async run({ data, - }: Readonly<{ data: RemoveStorageKeyJobData }>): Promise { + }: Readonly<{ data: RemoveStorageKeyJobData }>): Promise< + typeof JOB_STATUS.NEEDS_RETRY | undefined + > { await new Promise(resolve => { window.storage.onready(resolve); }); await window.storage.remove(data.key); + + return undefined; } } diff --git a/ts/jobs/reportSpamJobQueue.ts b/ts/jobs/reportSpamJobQueue.ts index 24ba8811e3..6ab92b41a1 100644 --- a/ts/jobs/reportSpamJobQueue.ts +++ b/ts/jobs/reportSpamJobQueue.ts @@ -10,6 +10,7 @@ import type { LoggerType } from '../types/Logging'; import { aciSchema } from '../types/ServiceId'; import { map } from '../util/iterables'; +import type { JOB_STATUS } from './JobQueue'; import { JobQueue } from './JobQueue'; import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; import { parseIntWithFallback } from '../util/parseIntWithFallback'; @@ -49,7 +50,7 @@ export class ReportSpamJobQueue extends JobQueue { protected async run( { data }: Readonly<{ data: ReportSpamJobData }>, { log }: Readonly<{ log: LoggerType }> - ): Promise { + ): Promise { const { aci: senderAci, token, serverGuids } = data; await new Promise(resolve => { @@ -58,7 +59,7 @@ export class ReportSpamJobQueue extends JobQueue { if (!isDeviceLinked()) { log.info("reportSpamJobQueue: skipping this job because we're unlinked"); - return; + return undefined; } await waitForOnline(window.navigator, window); @@ -72,6 +73,8 @@ export class ReportSpamJobQueue extends JobQueue { server.reportMessage({ senderAci, serverGuid, token }) ) ); + + return undefined; } catch (err: unknown) { if (!(err instanceof HTTPError)) { throw err; @@ -88,7 +91,7 @@ export class ReportSpamJobQueue extends JobQueue { log.info( 'reportSpamJobQueue: server responded with 508. Giving up on this job' ); - return; + return undefined; } if (isRetriable4xxStatus(code) || is5xxStatus(code)) { @@ -106,7 +109,7 @@ export class ReportSpamJobQueue extends JobQueue { log.error( `reportSpamJobQueue: server responded with ${code} status code. Giving up on this job` ); - return; + return undefined; } throw err; diff --git a/ts/jobs/singleProtoJobQueue.ts b/ts/jobs/singleProtoJobQueue.ts index 6b5c540bfb..4208650e18 100644 --- a/ts/jobs/singleProtoJobQueue.ts +++ b/ts/jobs/singleProtoJobQueue.ts @@ -8,6 +8,7 @@ import * as Bytes from '../Bytes'; import type { LoggerType } from '../types/Logging'; import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff'; import type { ParsedJob } from './types'; +import type { JOB_STATUS } from './JobQueue'; import { JobQueue } from './JobQueue'; import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; import { DAY } from '../util/durations'; @@ -51,7 +52,7 @@ export class SingleProtoJobQueue extends JobQueue { timestamp, }: Readonly<{ data: SingleProtoJobData; timestamp: number }>, { attempt, log }: Readonly<{ attempt: number; log: LoggerType }> - ): Promise { + ): Promise { const timeRemaining = timestamp + MAX_RETRY_TIME - Date.now(); const isFinalAttempt = attempt >= MAX_ATTEMPTS; @@ -62,7 +63,7 @@ export class SingleProtoJobQueue extends JobQueue { skipWait: false, }); if (!shouldContinue) { - return; + return undefined; } const { @@ -87,19 +88,19 @@ export class SingleProtoJobQueue extends JobQueue { log.info( `conversation ${conversation.idForLogging()} is not accepted; refusing to send` ); - return; + return undefined; } if (isConversationUnregistered(conversation.attributes)) { log.info( `conversation ${conversation.idForLogging()} is unregistered; refusing to send` ); - return; + return undefined; } if (conversation.isBlocked()) { log.info( `conversation ${conversation.idForLogging()} is blocked; refusing to send` ); - return; + return undefined; } const proto = Proto.Content.decode(Bytes.fromBase64(protoBase64)); @@ -133,6 +134,8 @@ export class SingleProtoJobQueue extends JobQueue { toThrow: error, }); } + + return undefined; } } diff --git a/ts/jobs/viewOnceOpenJobQueue.ts b/ts/jobs/viewOnceOpenJobQueue.ts index d90a5862c2..7eb1653acc 100644 --- a/ts/jobs/viewOnceOpenJobQueue.ts +++ b/ts/jobs/viewOnceOpenJobQueue.ts @@ -13,6 +13,7 @@ import { import { strictAssert } from '../util/assert'; import { isRecord } from '../util/isRecord'; +import type { JOB_STATUS } from './JobQueue'; import { JobQueue } from './JobQueue'; import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; @@ -34,7 +35,7 @@ export class ViewOnceOpenJobQueue extends JobQueue { timestamp, }: Readonly<{ data: ViewOnceOpenJobData; timestamp: number }>, { attempt, log }: Readonly<{ attempt: number; log: LoggerType }> - ): Promise { + ): Promise { await runSyncJob({ attempt, log, @@ -43,6 +44,8 @@ export class ViewOnceOpenJobQueue extends JobQueue { timestamp, type: SyncTypeList.ViewOnceOpen, }); + + return undefined; } } diff --git a/ts/jobs/viewSyncJobQueue.ts b/ts/jobs/viewSyncJobQueue.ts index 26ec9cdcf0..840607d5f8 100644 --- a/ts/jobs/viewSyncJobQueue.ts +++ b/ts/jobs/viewSyncJobQueue.ts @@ -13,6 +13,7 @@ import { import { strictAssert } from '../util/assert'; import { isRecord } from '../util/isRecord'; +import type { JOB_STATUS } from './JobQueue'; import { JobQueue } from './JobQueue'; import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; @@ -31,7 +32,7 @@ export class ViewSyncJobQueue extends JobQueue { protected async run( { data, timestamp }: Readonly<{ data: ViewSyncJobData; timestamp: number }>, { attempt, log }: Readonly<{ attempt: number; log: LoggerType }> - ): Promise { + ): Promise { await runSyncJob({ attempt, log, @@ -40,6 +41,8 @@ export class ViewSyncJobQueue extends JobQueue { timestamp, type: SyncTypeList.View, }); + + return undefined; } } diff --git a/ts/test-both/challenge_test.ts b/ts/test-both/challenge_test.ts index 93464bcad2..f7502c76ce 100644 --- a/ts/test-both/challenge_test.ts +++ b/ts/test-both/challenge_test.ts @@ -57,6 +57,7 @@ describe('ChallengeHandler', () => { retryAt: NOW + DEFAULT_RETRY_AFTER, createdAt: NOW - SECOND, reason: 'test', + silent: false, ...options, }; }; @@ -185,7 +186,7 @@ describe('ChallengeHandler', () => { await createHandler(); for (const challenge of challenges) { - await handler.unregister(challenge.conversationId); + await handler.unregister(challenge.conversationId, 'test'); } for (const challenge of challenges) { @@ -223,7 +224,7 @@ describe('ChallengeHandler', () => { autoSolve: true, expireAfter: -1, }); - await handler.unregister(one.conversationId); + await handler.unregister(one.conversationId, 'test'); challengeStatus = 'idle'; await newHandler.load(); diff --git a/ts/test-mock/messaging/edit_test.ts b/ts/test-mock/messaging/edit_test.ts index 5acd9511e2..485f60ed51 100644 --- a/ts/test-mock/messaging/edit_test.ts +++ b/ts/test-mock/messaging/edit_test.ts @@ -147,7 +147,7 @@ describe('editing', function (this: Mocha.Suite) { .locator(`.module-message__text >> "${initialMessageBody}"`) .waitFor(); - debug('waiting for receipts for original message'); + debug('waiting for outgoing receipts for original message'); const receipts = await app.waitForReceipts(); assert.strictEqual(receipts.type, ReceiptType.Read); assert.strictEqual(receipts.timestamps.length, 1); diff --git a/ts/test-mock/rate-limit/viewed_test.ts b/ts/test-mock/rate-limit/viewed_test.ts index 4db8b0d562..6e5af5e0d0 100644 --- a/ts/test-mock/rate-limit/viewed_test.ts +++ b/ts/test-mock/rate-limit/viewed_test.ts @@ -20,6 +20,7 @@ describe('challenge/receipts', function (this: Mocha.Suite) { let bootstrap: Bootstrap; let app: App; let contact: PrimaryDevice; + let contactB: PrimaryDevice; beforeEach(async () => { bootstrap = new Bootstrap({ @@ -34,6 +35,9 @@ describe('challenge/receipts', function (this: Mocha.Suite) { contact = await server.createPrimaryDevice({ profileName: 'Jamie', }); + contactB = await server.createPrimaryDevice({ + profileName: 'Kim', + }); let state = StorageState.getEmpty(); @@ -55,13 +59,28 @@ describe('challenge/receipts', function (this: Mocha.Suite) { }, ServiceIdKind.PNI ); + state = state.addContact( + contactB, + { + whitelisted: true, + serviceE164: contactB.device.number, + identityKey: contactB.getPublicKey(ServiceIdKind.PNI).serialize(), + pni: toUntaggedPni(contactB.device.pni), + givenName: 'Kim', + }, + ServiceIdKind.PNI + ); // Just to make PNI Contact visible in the left pane state = state.pin(contact, ServiceIdKind.PNI); + state = state.pin(contactB, ServiceIdKind.PNI); const ourKey = await desktop.popSingleUseKey(); await contact.addSingleUseKey(desktop, ourKey); + const ourKeyB = await desktop.popSingleUseKey(); + await contactB.addSingleUseKey(desktop, ourKeyB); + await phone.setStorageState(state); }); @@ -95,11 +114,18 @@ describe('challenge/receipts', function (this: Mocha.Suite) { .locator(`[data-testid="${contact.toContact().aci}"]`) .click(); - debug('Accept conversation from contact'); + debug('Accept conversation from contact - does not trigger captcha!'); await conversationStack .locator('.module-message-request-actions button >> "Accept"') .click(); + debug('Sending a message back to user - will trigger captcha!'); + { + const input = await app.waitForEnabledComposer(); + await input.type('Hi, good to hear from you!'); + await input.press('Enter'); + } + debug('Waiting for challenge'); const request = await app.waitForChallenge(); @@ -114,14 +140,122 @@ describe('challenge/receipts', function (this: Mocha.Suite) { target: contact.device.aci, }); - debug(`rate limited requests: ${requests}`); - assert.strictEqual(requests, 1); + debug(`Rate-limited requests: ${requests}`); + assert.strictEqual(requests, 1, 'rate limit requests'); - debug('Waiting for receipts'); + debug('Waiting for outgoing read receipt'); const receipts = await app.waitForReceipts(); assert.strictEqual(receipts.type, ReceiptType.Read); - assert.strictEqual(receipts.timestamps.length, 1); + assert.strictEqual(receipts.timestamps.length, 1, 'receipts'); assert.strictEqual(receipts.timestamps[0], timestamp); }); + + it('should send non-bubble in ConvoA when ConvoB completes challenge', async () => { + const { server, desktop } = bootstrap; + + debug( + `Rate limiting (desktop: ${desktop.aci}) -> (ContactA: ${contact.device.aci})` + ); + server.rateLimit({ source: desktop.aci, target: contact.device.aci }); + debug( + `Rate limiting (desktop: ${desktop.aci}) -> (ContactB: ${contactB.device.aci})` + ); + server.rateLimit({ source: desktop.aci, target: contactB.device.aci }); + + const window = await app.getWindow(); + const leftPane = window.locator('#LeftPane'); + const conversationStack = window.locator('.Inbox__conversation-stack'); + + debug('Sending a message from ContactA'); + const timestampA = bootstrap.getTimestamp(); + await contact.sendText(desktop, 'Hello there!', { + timestamp: timestampA, + }); + + debug(`Opening conversation with ContactA (${contact.toContact().aci})`); + await leftPane + .locator(`[data-testid="${contact.toContact().aci}"]`) + .click(); + + debug('Accept conversation from ContactA - does not trigger captcha!'); + await conversationStack + .locator('.module-message-request-actions button >> "Accept"') + .click(); + + debug('Sending a message from ContactB'); + const timestampB = bootstrap.getTimestamp(); + await contactB.sendText(desktop, 'Hey there!', { + timestamp: timestampB, + }); + + debug(`Opening conversation with ContactB (${contact.toContact().aci})`); + await leftPane + .locator(`[data-testid="${contactB.toContact().aci}"]`) + .click(); + + debug('Accept conversation from ContactB - does not trigger captcha!'); + await conversationStack + .locator('.module-message-request-actions button >> "Accept"') + .click(); + + debug('Sending a message back to ContactB - will trigger captcha!'); + { + const input = await app.waitForEnabledComposer(); + await input.type('Hi, good to hear from you!'); + await input.press('Enter'); + } + + debug('Waiting for challenge'); + const request = await app.waitForChallenge(); + + debug('Solving challenge'); + await app.solveChallenge({ + seq: request.seq, + data: { captcha: 'anything' }, + }); + + const requestsA = server.stopRateLimiting({ + source: desktop.aci, + target: contact.device.aci, + }); + const requestsB = server.stopRateLimiting({ + source: desktop.aci, + target: contactB.device.aci, + }); + + debug(`Rate-limited requests to A: ${requestsA}`); + assert.strictEqual(requestsA, 1, 'rate limit requests'); + + debug(`Rate-limited requests to B: ${requestsA}`); + assert.strictEqual(requestsB, 1, 'rate limit requests'); + + debug('Waiting for outgoing read receipt #1'); + const receipts1 = await app.waitForReceipts(); + + assert.strictEqual(receipts1.type, ReceiptType.Read); + assert.strictEqual(receipts1.timestamps.length, 1, 'receipts'); + if ( + !receipts1.timestamps.includes(timestampA) && + !receipts1.timestamps.includes(timestampB) + ) { + throw new Error( + 'receipts1: Failed to find both timestampA and timestampB' + ); + } + + debug('Waiting for outgoing read receipt #2'); + const receipts2 = await app.waitForReceipts(); + + assert.strictEqual(receipts2.type, ReceiptType.Read); + assert.strictEqual(receipts2.timestamps.length, 1, 'receipts'); + if ( + !receipts2.timestamps.includes(timestampA) && + !receipts2.timestamps.includes(timestampB) + ) { + throw new Error( + 'receipts2: Failed to find both timestampA and timestampB' + ); + } + }); }); diff --git a/ts/test-node/jobs/JobQueue_test.ts b/ts/test-node/jobs/JobQueue_test.ts index b35113a01e..18461c9144 100644 --- a/ts/test-node/jobs/JobQueue_test.ts +++ b/ts/test-node/jobs/JobQueue_test.ts @@ -16,6 +16,7 @@ import { missingCaseError } from '../../util/missingCaseError'; import { drop } from '../../util/drop'; import type { LoggerType } from '../../types/Logging'; +import type { JOB_STATUS } from '../../jobs/JobQueue'; import { JobQueue } from '../../jobs/JobQueue'; import type { ParsedJob, StoredJob, JobQueueStore } from '../../jobs/types'; import { sleep } from '../../util/sleep'; @@ -38,8 +39,13 @@ describe('JobQueue', () => { return testJobSchema.parse(data); } - async run({ data }: ParsedJob): Promise { + async run({ + data, + }: ParsedJob): Promise< + typeof JOB_STATUS.NEEDS_RETRY | undefined + > { results.add(data.a + data.b); + return undefined; } } @@ -83,13 +89,15 @@ describe('JobQueue', () => { return z.number().parse(data); } - async run(): Promise { + async run(): Promise { try { updateActiveJobCount(1); await sleep(1); } finally { updateActiveJobCount(-1); } + + return undefined; } } @@ -142,8 +150,8 @@ describe('JobQueue', () => { return testQueue; } - run(): Promise { - return Promise.resolve(); + run(): Promise { + return Promise.resolve(undefined); } } @@ -175,8 +183,8 @@ describe('JobQueue', () => { return z.string().parse(data); } - async run(): Promise { - return Promise.resolve(); + async run(): Promise { + return Promise.resolve(undefined); } } @@ -243,8 +251,8 @@ describe('JobQueue', () => { return z.string().parse(data); } - async run(): Promise { - return Promise.resolve(); + async run(): Promise { + return Promise.resolve(undefined); } } @@ -291,7 +299,11 @@ describe('JobQueue', () => { return data; } - async run({ data }: ParsedJob): Promise { + async run({ + data, + }: ParsedJob): Promise< + typeof JOB_STATUS.NEEDS_RETRY | undefined + > { switch (data) { case 'foo': fooAttempts += 1; @@ -308,6 +320,7 @@ describe('JobQueue', () => { default: throw missingCaseError(data); } + return undefined; } } @@ -360,7 +373,7 @@ describe('JobQueue', () => { async run( _: unknown, { attempt }: Readonly<{ attempt: number }> - ): Promise { + ): Promise { attempts.push(attempt); throw new Error('this job always fails'); } @@ -405,10 +418,12 @@ describe('JobQueue', () => { async run( _: unknown, { log }: Readonly<{ log: LoggerType }> - ): Promise { + ): Promise { log.info(uniqueString); log.warn(uniqueString); log.error(uniqueString); + + return undefined; } } @@ -450,8 +465,8 @@ describe('JobQueue', () => { throw new Error('uh oh'); } - async run(): Promise { - return Promise.resolve(); + async run(): Promise { + return Promise.resolve(undefined); } } @@ -494,7 +509,9 @@ describe('JobQueue', () => { throw new Error('invalid data!'); } - run(job: { data: string }): Promise { + run(job: { + data: string; + }): Promise { return run(job); } } @@ -528,8 +545,8 @@ describe('JobQueue', () => { throw new Error('invalid data!'); } - async run(): Promise { - return Promise.resolve(); + async run(): Promise { + return Promise.resolve(undefined); } } @@ -562,8 +579,8 @@ describe('JobQueue', () => { return undefined; } - async run(): Promise { - return Promise.resolve(); + async run(): Promise { + return Promise.resolve(undefined); } } @@ -596,8 +613,9 @@ describe('JobQueue', () => { return data; } - async run(): Promise { + async run(): Promise { events.push('running'); + return undefined; } } @@ -629,8 +647,8 @@ describe('JobQueue', () => { return undefined; } - async run(): Promise { - return Promise.resolve(); + async run(): Promise { + return Promise.resolve(undefined); } } @@ -670,7 +688,7 @@ describe('JobQueue', () => { return undefined; } - async run(): Promise { + async run(): Promise { events.push('running'); throw new Error('uh oh'); } @@ -751,8 +769,13 @@ describe('JobQueue', () => { return z.number().parse(data); } - async run({ data }: Readonly<{ data: number }>): Promise { + async run({ + data, + }: Readonly<{ data: number }>): Promise< + typeof JOB_STATUS.NEEDS_RETRY | undefined + > { eventEmitter.emit('run', data); + return undefined; } } @@ -794,8 +817,8 @@ describe('JobQueue', () => { return data; } - async run(): Promise { - return Promise.resolve(); + async run(): Promise { + return Promise.resolve(undefined); } } @@ -827,8 +850,8 @@ describe('JobQueue', () => { return undefined; } - async run(): Promise { - return Promise.resolve(); + async run(): Promise { + return Promise.resolve(undefined); } }