From 3e51e4ef5dc87f80927054f5c1902570e29d128b Mon Sep 17 00:00:00 2001 From: Scott Nonnenberg Date: Mon, 6 May 2024 17:33:50 -0700 Subject: [PATCH] conversationJobQueue: Introduce RUNNING status, attempts and backoff --- _locales/en/messages.json | 12 +-- ts/jobs/conversationJobQueue.ts | 146 +++++++++++++++++++++++++++----- ts/util/BackOff.ts | 2 + 3 files changed, 133 insertions(+), 27 deletions(-) diff --git a/_locales/en/messages.json b/_locales/en/messages.json index fa769ecf95..2ba666e931 100644 --- a/_locales/en/messages.json +++ b/_locales/en/messages.json @@ -4263,7 +4263,7 @@ "description": "Shown in timeline or conversation preview when v2 group changes" }, "icu:GroupV2--pending-remove--revoke-invite-from--one--unknown": { - "messageformat": "An admin revoked an invitation to the group for 1 person invited by {memberName}.", + "messageformat": "An admin revoked an invitation to the group for # person invited by {memberName}.", "description": "Shown in timeline or conversation preview when v2 group changes" }, "icu:GroupV2--pending-remove--revoke-invite-from-you--one--other": { @@ -4279,19 +4279,19 @@ "description": "Shown in timeline or conversation preview when v2 group changes" }, "icu:GroupV2--pending-remove--revoke-invite-from--many--other": { - "messageformat": "{adminName} revoked {count, plural, one {an invitation to the group for 1 person} other {invitations to the group for # people}} invited by {memberName}.", + "messageformat": "{adminName} revoked {count, plural, one {an invitation to the group for # person} other {invitations to the group for # people}} invited by {memberName}.", "description": "Shown in timeline or conversation preview when v2 group changes" }, "icu:GroupV2--pending-remove--revoke-invite-from--many--you": { - "messageformat": "You revoked {count, plural, one {an invitation to the group for 1 person} other {invitations to the group for # people}} invited by {memberName}.", + "messageformat": "You revoked {count, plural, one {an invitation to the group for # person} other {invitations to the group for # people}} invited by {memberName}.", "description": "Shown in timeline or conversation preview when v2 group changes" }, "icu:GroupV2--pending-remove--revoke-invite-from--many--unknown": { - "messageformat": "An admin revoked {count, plural, one {an invitation to the group for 1 person} other {invitations to the group for # people}} invited by {memberName}.", + "messageformat": "An admin revoked {count, plural, one {an invitation to the group for # person} other {invitations to the group for # people}} invited by {memberName}.", "description": "Shown in timeline or conversation preview when v2 group changes" }, "icu:GroupV2--pending-remove--revoke-invite-from-you--many--other": { - "messageformat": "{adminName} revoked the {count, plural, one {invitation to the group you sent to 1 person} other {invitations to the group you sent to # people}}.", + "messageformat": "{adminName} revoked the {count, plural, one {invitation to the group you sent to # person} other {invitations to the group you sent to # people}}.", "description": "Shown in timeline or conversation preview when v2 group changes" }, "icu:GroupV2--pending-remove--revoke-invite-from-you--many--you": { @@ -4299,7 +4299,7 @@ "description": "Shown in timeline or conversation preview when v2 group changes" }, "icu:GroupV2--pending-remove--revoke-invite-from-you--many--unknown": { - "messageformat": "An admin revoked the {count, plural, one {invitation to the group you sent to 1 person} other {invitations to the group you sent to # people}}.", + "messageformat": "An admin revoked the {count, plural, one {invitation to the group you sent to # person} other {invitations to the group you sent to # people}}.", "description": "Shown in timeline or conversation preview when v2 group changes" }, "icu:GroupV2--admin-approval-add-one--you": { diff --git a/ts/jobs/conversationJobQueue.ts b/ts/jobs/conversationJobQueue.ts index 47a5dab825..73b44bdacf 100644 --- a/ts/jobs/conversationJobQueue.ts +++ b/ts/jobs/conversationJobQueue.ts @@ -49,6 +49,7 @@ import { sendSavedProto } from './helpers/sendSavedProto'; import { drop } from '../util/drop'; import { isInPast } from '../util/timestamp'; import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary'; +import { FIBONACCI } from '../util/BackOff'; // Note: generally, we only want to add to this list. If you do need to change one of // these values, you'll likely need to write a database migration. @@ -321,6 +322,7 @@ enum RETRY_STATUS { BLOCKED = 'BLOCKED', BLOCKED_WITH_JOBS = 'BLOCKED_WITH_JOBS', UNBLOCKED = 'UNBLOCKED', + RUNNING = 'RUNNING', } type ConversationData = Readonly< @@ -329,6 +331,7 @@ type ConversationData = Readonly< // yet have a job to retry. We should, very soon, when the job returns // JOB_STATUS.NEEDS_RETRY. This should be a very short-lived state. status: RETRY_STATUS.BLOCKED; + attempts: number; callback: undefined; jobsNeedingRetry: undefined; retryAt: number; @@ -337,6 +340,7 @@ type ConversationData = Readonly< // This is the next stage, when we've added at least one job needing retry, and we // have a callback registered to run on queue idle (or be called directly). status: RETRY_STATUS.BLOCKED_WITH_JOBS; + attempts: number; callback: () => void; jobsNeedingRetry: Array>; retryAt: number; @@ -345,13 +349,27 @@ type ConversationData = Readonly< | { // When we discover that we can now run these deferred jobs, we flip into this // state, which should be short-lived. We very quickly re-enqueue all - // jobsNeedingRetry, and erase perConversationData for this conversation. + // jobsNeedingRetry, and move to RETRY_STATUS.RUNNING for this conversation. status: RETRY_STATUS.UNBLOCKED; + attempts: number; callback: () => void; jobsNeedingRetry: Array>; retryAt: undefined; retryAtTimeout?: NodeJS.Timeout; } + | { + // When we've queued all jobs needing retry, and we're waiting for the results + // of our next set of attempted sends, we are in this state. Its only real purpose + // is to keep track of our attempts, so we can exponentially back off. Once a send + // goes through successfully, we erase perConversationData for this conversation. + // Otherwise, we go back to RETRY_STATUS.BLOCKED. + status: RETRY_STATUS.RUNNING; + attempts: number; + callback: undefined; + jobsNeedingRetry: undefined; + retryAt: undefined; + retryAtTimeout: undefined; + } >; export class ConversationJobQueue extends JobQueue { @@ -450,15 +468,22 @@ export class ConversationJobQueue extends JobQueue { return; } - const { status, callback } = perConversationData; + const { attempts, status, callback } = perConversationData; if (status === RETRY_STATUS.BLOCKED) { globalLogger.info( - `${logId}: Deleting previous BLOCKED state; had no jobs` + `${logId}: Previously BLOCKED, moving to RUNNING state` ); - this.perConversationData.delete(conversationId); + this.perConversationData.set(conversationId, { + status: RETRY_STATUS.RUNNING, + attempts, + callback: undefined, + jobsNeedingRetry: undefined, + retryAt: undefined, + retryAtTimeout: undefined, + }); } else if (status === RETRY_STATUS.BLOCKED_WITH_JOBS) { globalLogger.info( - `${logId}: Moving previous WAITING state to UNBLOCKED, calling callback directly` + `${logId}: Moving previous BLOCKED state to UNBLOCKED, calling callback directly` ); this.perConversationData.set(conversationId, { ...perConversationData, @@ -471,17 +496,54 @@ export class ConversationJobQueue extends JobQueue { `${logId}: We're still in UNBLOCKED state; calling callback directly` ); callback(); + } else if (status === RETRY_STATUS.RUNNING) { + globalLogger.warn( + `${logId}: We're already in RUNNING state; doing nothing` + ); } else { throw missingCaseError(status); } } + private recordSuccessfulSend(conversationId: string) { + const logId = `recordSuccessfulSend/${conversationId}`; + + const perConversationData = this.perConversationData.get(conversationId); + if (!perConversationData) { + return; + } + + const { status } = perConversationData; + if (status === RETRY_STATUS.RUNNING || status === RETRY_STATUS.BLOCKED) { + globalLogger.info(`${logId}: Previously ${status}; clearing state`); + this.perConversationData.delete(conversationId); + } else if ( + status === RETRY_STATUS.BLOCKED_WITH_JOBS || + status === RETRY_STATUS.UNBLOCKED + ) { + globalLogger.warn( + `${logId}: We're still in ${status} state; calling unblockConversationRetries` + ); + // We have to do this because in these states there are jobs that need to be retried + this.unblockConversationRetries(conversationId); + } else { + throw missingCaseError(status); + } + } + + private getRetryWithBackoff(attempts: number) { + return ( + Date.now() + + MINUTE * (FIBONACCI[attempts] ?? FIBONACCI[FIBONACCI.length - 1]) + ); + } + private captureRetryAt(conversationId: string, retryAt: number | undefined) { const logId = `captureRetryAt/${conversationId}`; - const newRetryAt = retryAt || Date.now() + MINUTE; const perConversationData = this.perConversationData.get(conversationId); if (!perConversationData) { + const newRetryAt = retryAt || Date.now() + MINUTE; if (!retryAt) { globalLogger.warn( `${logId}: No existing data, using retryAt of ${newRetryAt}` @@ -489,6 +551,7 @@ export class ConversationJobQueue extends JobQueue { } this.perConversationData.set(conversationId, { status: RETRY_STATUS.BLOCKED, + attempts: 1, retryAt: newRetryAt, callback: undefined, jobsNeedingRetry: undefined, @@ -498,9 +561,12 @@ export class ConversationJobQueue extends JobQueue { } const { status, retryAt: existingRetryAt } = perConversationData; - if (existingRetryAt && existingRetryAt >= newRetryAt) { + const attempts = perConversationData.attempts + 1; + const retryWithBackoff = this.getRetryWithBackoff(attempts); + + if (existingRetryAt && existingRetryAt >= retryWithBackoff) { globalLogger.warn( - `${logId}: New newRetryAt ${newRetryAt} isn't after existing retryAt ${existingRetryAt}, dropping` + `${logId}: New newRetryAt ${retryWithBackoff} isn't after existing retryAt ${existingRetryAt}, dropping` ); return; } @@ -510,20 +576,31 @@ export class ConversationJobQueue extends JobQueue { status === RETRY_STATUS.BLOCKED_WITH_JOBS ) { globalLogger.info( - `${logId}: Updating to newRetryAt ${newRetryAt} from existing retryAt ${existingRetryAt}, status ${status}` + `${logId}: Updating to new retryAt ${retryWithBackoff} (attempts ${attempts}) from existing retryAt ${existingRetryAt}, status ${status}` ); this.perConversationData.set(conversationId, { ...perConversationData, - retryAt: newRetryAt, + retryAt: retryWithBackoff, }); } else if (status === RETRY_STATUS.UNBLOCKED) { globalLogger.info( - `${logId}: Updating to newRetryAt ${newRetryAt} from previous UNBLOCKED status` + `${logId}: Updating to new retryAt ${retryWithBackoff} (attempts ${attempts}) from previous UNBLOCKED status` ); this.perConversationData.set(conversationId, { ...perConversationData, status: RETRY_STATUS.BLOCKED_WITH_JOBS, - retryAt: newRetryAt, + retryAt: retryWithBackoff, + }); + } else if (status === RETRY_STATUS.RUNNING) { + globalLogger.info( + `${logId}: Updating to new retryAt ${retryWithBackoff} (attempts ${attempts}) from previous RUNNING status` + ); + this.perConversationData.set(conversationId, { + status: RETRY_STATUS.BLOCKED, + attempts, + retryAt: retryWithBackoff, + callback: undefined, + jobsNeedingRetry: undefined, }); } else { throw missingCaseError(status); @@ -551,10 +628,12 @@ export class ConversationJobQueue extends JobQueue { ); } - const { status, retryAt, jobsNeedingRetry, callback } = + const defaultRetryAt = Date.now() + MINUTE; + const { attempts, callback, jobsNeedingRetry, status, retryAt } = perConversationData || { + attempts: 1, status: RETRY_STATUS.BLOCKED, - retryAt: Date.now() + MINUTE, + retryAt: defaultRetryAt, }; const newJobsNeedingRetry = (jobsNeedingRetry || []).concat([storedJob]); @@ -571,13 +650,28 @@ export class ConversationJobQueue extends JobQueue { ) { this.perConversationData.set(conversationId, { status: RETRY_STATUS.BLOCKED_WITH_JOBS, + attempts, retryAt, jobsNeedingRetry: newJobsNeedingRetry, callback: newCallback, }); + } else if (status === RETRY_STATUS.RUNNING) { + const newAttempts = attempts + 1; + const newRetryAt = this.getRetryWithBackoff(newAttempts); + logger.warn( + `${logId}: Moving from state RUNNING to BLOCKED_WITH_JOBS, with retryAt ${newRetryAt}, (attempts ${newAttempts})` + ); + this.perConversationData.set(conversationId, { + status: RETRY_STATUS.BLOCKED_WITH_JOBS, + attempts: newAttempts, + retryAt: newRetryAt, + jobsNeedingRetry: newJobsNeedingRetry, + callback: newCallback, + }); } else { this.perConversationData.set(conversationId, { status: RETRY_STATUS.UNBLOCKED, + attempts, retryAt, jobsNeedingRetry: newJobsNeedingRetry, callback: newCallback, @@ -616,15 +710,12 @@ export class ConversationJobQueue extends JobQueue { } const { status, retryAt } = perConversationData; - if (status === RETRY_STATUS.BLOCKED) { - globalLogger.warn( - `${logId}: Still in blocked state, no jobs to retry. Clearing perConversationData.` - ); - this.perConversationData.delete(conversationId); + if (status === RETRY_STATUS.BLOCKED || status === RETRY_STATUS.RUNNING) { + globalLogger.warn(`${logId}: In ${status} state; no jobs to retry.`); return; } - const { callback, jobsNeedingRetry, retryAtTimeout } = + const { attempts, callback, jobsNeedingRetry, retryAtTimeout } = perConversationData; if (retryAtTimeout) { @@ -639,10 +730,18 @@ export class ConversationJobQueue extends JobQueue { // We're starting to retry jobs; remove the challenge handler drop(window.Signal.challengeHandler?.unregister(conversationId, logId)); + this.perConversationData.set(conversationId, { + status: RETRY_STATUS.RUNNING, + attempts, + callback: undefined, + jobsNeedingRetry: undefined, + retryAt: undefined, + retryAtTimeout: undefined, + }); jobsNeedingRetry?.forEach(job => { drop(this.enqueueStoredJob(job)); }); - this.perConversationData.delete(conversationId); + return; } @@ -701,6 +800,7 @@ export class ConversationJobQueue extends JobQueue { skipWait: count > 1, }); if (!shouldContinue) { + // We don't return here because each sub-task has its own cleanup sequence break; } @@ -873,6 +973,10 @@ export class ConversationJobQueue extends JobQueue { } } + if (shouldContinue && !this.isShuttingDown) { + this.recordSuccessfulSend(conversationId); + } + return undefined; } catch (error: unknown) { const untrustedServiceIds: Array = []; diff --git a/ts/util/BackOff.ts b/ts/util/BackOff.ts index a68c4fe5b6..e8bb5012a6 100644 --- a/ts/util/BackOff.ts +++ b/ts/util/BackOff.ts @@ -3,6 +3,8 @@ const SECOND = 1000; +export const FIBONACCI: ReadonlyArray = [1, 2, 3, 5, 8, 13, 21, 34, 55]; + export const FIBONACCI_TIMEOUTS: ReadonlyArray = [ 1 * SECOND, 2 * SECOND,