mirror of
https://github.com/microsoft/vscode.git
synced 2026-04-21 00:59:03 +01:00
558 lines
14 KiB
TypeScript
558 lines
14 KiB
TypeScript
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

import { CancellationError, CancellationToken, Disposable, Event, EventEmitter } from 'vscode';
|
|
|
|
/**
 * Can be passed into the Delayer to defer using a microtask
 */
export const MicrotaskDelay = Symbol('MicrotaskDelay');
|
|
|
|
/**
 * Serializes asynchronous tasks per key: tasks queued under the same key are
 * chained one after another, while each key has its own independent chain.
 */
export class SequencerByKey<TKey> {

	private promiseMap = new Map<TKey, Promise<unknown>>();

	/**
	 * Schedules `promiseTask` to start once the task previously queued for `key`
	 * (if any) has settled.
	 *
	 * @param key Identifies the chain the task is appended to.
	 * @param promiseTask Factory invoked when it is this task's turn.
	 * @returns A promise that settles with the outcome of `promiseTask`.
	 */
	queue<T>(key: TKey, promiseTask: () => Promise<T>): Promise<T> {
		const predecessor = this.promiseMap.get(key) ?? Promise.resolve();

		// A failed predecessor must not prevent this task from running.
		const ignoreFailure = (): void => { };

		// Drop the map entry only while this task is still the tail of its chain.
		const forgetIfLast = (): void => {
			if (this.promiseMap.get(key) === tail) {
				this.promiseMap.delete(key);
			}
		};

		const tail: Promise<T> = predecessor
			.catch(ignoreFailure)
			.then(promiseTask)
			.finally(forgetIfLast);

		this.promiseMap.set(key, tail);
		return tail;
	}
}
|
|
|
|
/**
 * Holds at most one repeating timer. The `cancel` method is handed to the base
 * `Disposable` as its dispose callback.
 */
export class IntervalTimer extends Disposable {

	// Either the handle returned by `setInterval`, or `-1` when nothing is scheduled.
	private _token: any;

	constructor() {
		super(() => this.cancel());
		this._token = -1;
	}

	/**
	 * Stops the current interval, if there is one.
	 */
	cancel(): void {
		if (this._token === -1) {
			return;
		}
		clearInterval(this._token);
		this._token = -1;
	}

	/**
	 * Replaces any current interval with a new one.
	 *
	 * @param runner Callback invoked on every tick.
	 * @param interval Delay between ticks, in milliseconds.
	 */
	cancelAndSet(runner: () => void, interval: number): void {
		this.cancel();
		const tick = (): void => {
			runner();
		};
		this._token = setInterval(tick, interval);
	}
}
|
|
|
|
/**
 * Races `promise` against cancellation of `token`.
 *
 * @param promise The work to wait for.
 * @param token The cancellation token to observe.
 * @returns A promise that settles like `promise`, or rejects with a
 * {@link CancellationError} as soon as `token` signals cancellation, whichever
 * happens first.
 */
export function raceCancellationError<T>(promise: Promise<T>, token: CancellationToken): Promise<T> {
	return new Promise<T>((resolve, reject) => {
		const onCancelled = (): void => {
			subscription.dispose();
			reject(new CancellationError());
		};
		const subscription = token.onCancellationRequested(onCancelled);

		// Stop listening to the token once the underlying promise has settled.
		const unsubscribe = () => subscription.dispose();
		promise.then(resolve, reject).finally(unsubscribe);
	});
}
|
|
|
|
/**
 * Error used to reject a promise whose time budget ran out.
 */
export class TimeoutError extends Error {
	constructor() {
		super('Timed out');
	}
}

/**
 * Races `promise` against a timer.
 *
 * @param promise The work to wait for.
 * @param timeout Time budget in milliseconds.
 * @returns A promise that settles like `promise` when it settles before the
 * timer fires, and otherwise rejects with a {@link TimeoutError}.
 */
export function raceTimeoutError<T>(promise: Promise<T>, timeout: number): Promise<T> {
	return new Promise<T>((resolve, reject) => {
		// Reject with TimeoutError rather than a cancellation error so that callers
		// can tell an elapsed deadline apart from an explicit cancellation.
		const timer = setTimeout(() => {
			reject(new TimeoutError());
		}, timeout);

		// The timer is no longer needed once the underlying promise has settled.
		promise.then(resolve, reject).finally(() => clearTimeout(timer));
	});
}
|
|
|
|
/**
 * Races `promise` against both a timer and cancellation of `token`.
 *
 * @param promise The work to wait for.
 * @param token The cancellation token to observe.
 * @param timeout Time budget in milliseconds.
 * @returns A promise that settles like `promise`, or rejects when the timer
 * fires or the token is cancelled, whichever happens first.
 */
export function raceCancellationAndTimeoutError<T>(promise: Promise<T>, token: CancellationToken, timeout: number): Promise<T> {
	const timeLimited = raceTimeoutError(promise, timeout);
	return raceCancellationError(timeLimited, token);
}
|
|
|
|
/**
 * A queued unit of work, bundled with the callbacks that settle the promise
 * handed out for it.
 */
interface ILimitedTaskFactory<T> {
	/** Starts the work and returns the promise tracking it. */
	factory: () => Promise<T>;

	/** Completion callback: fulfils the promise handed out for this task. */
	c: (value: T | Promise<T>) => void;

	/** Error callback: rejects the promise handed out for this task. */
	e: (error?: unknown) => void;
}
|
|
|
|
/**
 * Contract of a task queue that accepts promise factories.
 */
export interface ILimiter<T> {

	/** Number of tasks the limiter currently accounts for. */
	readonly size: number;

	/**
	 * Adds a task.
	 *
	 * @param factory Function that starts the work when invoked.
	 * @returns A promise for the result of `factory`.
	 */
	queue(factory: () => Promise<T>): Promise<T>;

	/** Clears the limiter's queue. */
	clear(): void;
}
|
|
|
|
/**
 * A helper to queue N promises and run them all with a max degree of parallelism. The helper
 * ensures that at any time no more than M promises are running at the same time.
 */
export class Limiter<T> implements ILimiter<T> {

	// Tasks that are queued or currently running.
	private _size = 0;
	private _isDisposed = false;
	// Tasks whose factory has been invoked and whose promise has not settled yet.
	private runningPromises: number;
	private readonly maxDegreeOfParalellism: number;
	// Tasks waiting for a free slot, in submission order.
	private readonly outstandingPromises: ILimitedTaskFactory<T>[];
	private readonly _onDrained: EventEmitter<void>;

	/**
	 * @param maxDegreeOfParalellism Maximum number of tasks allowed to run at the same time.
	 */
	constructor(maxDegreeOfParalellism: number) {
		this.maxDegreeOfParalellism = maxDegreeOfParalellism;
		this.outstandingPromises = [];
		this.runningPromises = 0;
		this._onDrained = new EventEmitter<void>();
	}

	/**
	 *
	 * @returns A promise that resolved when all work is done (onDrained) or when
	 * there is nothing to do
	 */
	whenIdle(): Promise<void> {
		return this.size > 0
			? toPromise(this.onDrained)
			: Promise.resolve();
	}

	/**
	 * Event fired each time the number of tracked tasks drops to zero.
	 */
	get onDrained(): Event<void> {
		return this._onDrained.event;
	}

	/**
	 * Number of tasks that are queued or running.
	 */
	get size(): number {
		return this._size;
	}

	/**
	 * Adds a task. Its factory is invoked as soon as a slot is free.
	 *
	 * @param factory Function that starts the work.
	 * @returns A promise that settles with the outcome of `factory`.
	 * @throws Error if the limiter has been disposed.
	 */
	queue(factory: () => Promise<T>): Promise<T> {
		if (this._isDisposed) {
			throw new Error('Object has been disposed');
		}
		this._size++;

		return new Promise<T>((c, e) => {
			this.outstandingPromises.push({ factory, c, e });
			this.consume();
		});
	}

	/**
	 * Starts queued tasks while slots are available.
	 */
	private consume(): void {
		while (this.outstandingPromises.length && this.runningPromises < this.maxDegreeOfParalellism) {
			const iLimitedTask = this.outstandingPromises.shift()!;
			this.runningPromises++;

			let promise: Promise<T>;
			try {
				promise = iLimitedTask.factory();
			} catch (err) {
				// Treat a factory that throws synchronously like one that returned a
				// rejected promise. Otherwise the slot taken above would never be
				// released and the limiter would stall.
				promise = Promise.reject(err);
			}
			promise.then(iLimitedTask.c, iLimitedTask.e);
			promise.then(() => this.consumed(), () => this.consumed());
		}
	}

	/**
	 * Bookkeeping after a task settled: frees its slot, fires `onDrained` when
	 * nothing is left, and starts further queued work.
	 */
	private consumed(): void {
		if (this._isDisposed) {
			return;
		}
		this.runningPromises--;
		if (--this._size === 0) {
			this._onDrained.fire();
		}

		if (this.outstandingPromises.length > 0) {
			this.consume();
		}
	}

	/**
	 * Drops all tasks that have not been started yet. Running tasks are not
	 * affected. The promises handed out for dropped tasks are never settled.
	 *
	 * @throws Error if the limiter has been disposed.
	 */
	clear(): void {
		if (this._isDisposed) {
			throw new Error('Object has been disposed');
		}
		this.outstandingPromises.length = 0;
		this._size = this.runningPromises;
	}

	/**
	 * Marks the limiter as disposed, drops queued tasks and disposes the
	 * `onDrained` emitter.
	 */
	dispose(): void {
		this._isDisposed = true;
		this.outstandingPromises.length = 0; // stop further processing
		this._size = 0;
		this._onDrained.dispose();
	}
}
|
|
|
|
|
|
/**
 * Handle for a callback that has been scheduled to run later. Disposing the
 * handle is how the scheduled callback gets cancelled.
 */
interface IScheduledLater extends Disposable {
	/**
	 * In the implementations in this file: `true` while the callback is still
	 * pending, i.e. it has neither run nor been disposed.
	 */
	isTriggered(): boolean;
}
|
|
|
|
/**
 * Schedules `fn` to run once after `timeout` milliseconds.
 *
 * @param timeout Delay in milliseconds.
 * @param fn Callback to run when the delay has elapsed.
 * @returns A handle that reports whether the callback is still pending and
 * cancels it when disposed.
 */
const timeoutDeferred = (timeout: number, fn: () => void): IScheduledLater => {
	let pending = true;

	const run = (): void => {
		pending = false;
		fn();
	};
	const timer = setTimeout(run, timeout);

	return {
		isTriggered() {
			return pending;
		},
		dispose() {
			clearTimeout(timer);
			pending = false;
		},
	};
};
|
|
|
|
/**
 * Schedules `fn` to run in a microtask.
 *
 * @param fn Callback to run.
 * @returns A handle that reports whether the callback is still pending and
 * turns it into a no-op when disposed.
 */
const microtaskDeferred = (fn: () => void): IScheduledLater => {
	let pending = true;

	queueMicrotask(() => {
		// A microtask cannot be unscheduled, so disposal is honoured by checking the flag here.
		if (!pending) {
			return;
		}
		pending = false;
		fn();
	});

	return {
		isTriggered() {
			return pending;
		},
		dispose() {
			pending = false;
		},
	};
};
|
|
|
|
/**
 * A helper to delay (debounce) execution of a task that is being requested often.
 *
 * Picture a mail man who wants to save trips. Instead of leaving as soon as a
 * letter is handed in, he waits a while in case more letters arrive. Every new
 * letter restarts the wait. Once the waiting period passes without a new letter,
 * he makes a single trip. So N+1 submissions in quick succession lead to only
 * 1 delivery.
 *
 * The delayer offers this behavior via the trigger() method, which takes the task
 * to run and, optionally, the waiting period (it defaults to `defaultDelay`):
 *
 * 		const delayer = new Delayer(WAITING_PERIOD);
 * 		const letters = [];
 *
 * 		function letterReceived(l) {
 * 			letters.push(l);
 * 			delayer.trigger(() => { return makeTheTrip(); });
 * 		}
 */
export class Delayer<T> implements Disposable {

	// Handle of the currently scheduled wait, if any.
	private deferred: IScheduledLater | null = null;
	// Promise shared by all trigger() calls until the task has been started.
	private completionPromise: Promise<any> | null = null;
	private doResolve: ((value?: any | Promise<any>) => void) | null = null;
	private doReject: ((err: any) => void) | null = null;
	// The most recently supplied task; this is the one that runs.
	private task: (() => T | Promise<T>) | null = null;

	/**
	 * @param defaultDelay Waiting period in milliseconds, or `MicrotaskDelay`,
	 * used when trigger() is called without an explicit delay.
	 */
	constructor(public defaultDelay: number | typeof MicrotaskDelay) { }

	/**
	 * Requests execution of `task` after `delay`, replacing any task and wait
	 * that were requested before.
	 *
	 * @param task The work to run once the wait is over.
	 * @param delay Waiting period for this request.
	 * @returns A promise for the result of the task that eventually runs.
	 */
	trigger(task: () => T | Promise<T>, delay = this.defaultDelay): Promise<T> {
		this.task = task;
		this.cancelTimeout();

		if (this.completionPromise === null) {
			const gate = new Promise((resolve, reject) => {
				this.doResolve = resolve;
				this.doReject = reject;
			});
			this.completionPromise = gate.then(() => this.runPendingTask());
		}

		const onElapsed = (): void => {
			this.deferred = null;
			this.doResolve?.(null);
		};

		if (delay === MicrotaskDelay) {
			this.deferred = microtaskDeferred(onElapsed);
		} else {
			this.deferred = timeoutDeferred(delay, onElapsed);
		}

		return this.completionPromise;
	}

	/**
	 * Runs once the wait is over: resets the per-round state and invokes the
	 * latest task, if there is one.
	 */
	private runPendingTask(): T | Promise<T> | undefined {
		this.completionPromise = null;
		this.doResolve = null;

		const pending = this.task;
		if (!pending) {
			return undefined;
		}
		this.task = null;
		return pending();
	}

	/**
	 * @returns Whether a wait is currently scheduled.
	 */
	isTriggered(): boolean {
		return Boolean(this.deferred?.isTriggered());
	}

	/**
	 * Cancels the scheduled wait and rejects the pending promise, if any, with a
	 * `CancellationError`.
	 */
	cancel(): void {
		this.cancelTimeout();

		if (!this.completionPromise) {
			return;
		}
		this.doReject?.(new CancellationError());
		this.completionPromise = null;
	}

	/**
	 * Disposes the scheduled wait without touching the pending promise.
	 */
	private cancelTimeout(): void {
		this.deferred?.dispose();
		this.deferred = null;
	}

	dispose(): void {
		this.cancel();
	}
}
|
|
|
|
/**
 * A helper to prevent accumulation of sequential async tasks.
 *
 * Imagine a mail man with the sole task of delivering letters. As soon as
 * a letter is submitted for delivery, he drives to the destination, delivers it
 * and returns to his base. Imagine that during the trip, N more letters were submitted.
 * When the mail man returns, he picks those N letters and delivers them all in a
 * single trip. Even though N+1 submissions occurred, only 2 deliveries were made.
 *
 * The throttler implements this via the queue() method, by providing it a task
 * factory. Following the example:
 *
 * 		const throttler = new Throttler();
 * 		let letters = [];
 *
 * 		function deliver() {
 * 			const lettersToDeliver = letters;
 * 			letters = [];
 * 			return makeTheTrip(lettersToDeliver);
 * 		}
 *
 * 		function onLetterReceived(l) {
 * 			letters.push(l);
 * 			throttler.queue(deliver);
 * 		}
 */
export class Throttler implements Disposable {

	// The task that is currently running, if any.
	private activePromise: Promise<any> | null;
	// Settles with the outcome of the single task that runs after the active one.
	private queuedPromise: Promise<any> | null;
	// Factory of that follow-up task; later queue() calls overwrite it.
	private queuedPromiseFactory: (() => Promise<any>) | null;

	private isDisposed = false;

	constructor() {
		this.activePromise = null;
		this.queuedPromise = null;
		this.queuedPromiseFactory = null;
	}

	/**
	 * Runs `promiseFactory` right away when nothing is running. Otherwise it is
	 * remembered as the follow-up task, replacing any factory remembered before,
	 * and runs once the active task has settled.
	 *
	 * @param promiseFactory Function that starts the work.
	 * @returns A promise that settles with the outcome of the task that ends up
	 * running for this request. It rejects with an `Error` when the throttler is
	 * already disposed. It resolves with `undefined` when the throttler gets
	 * disposed before the follow-up task was started.
	 */
	queue<T>(promiseFactory: () => Promise<T>): Promise<T> {
		if (this.isDisposed) {
			return Promise.reject(new Error('Throttler is disposed'));
		}

		if (this.activePromise) {
			this.queuedPromiseFactory = promiseFactory;

			if (!this.queuedPromise) {
				const onComplete = () => {
					this.queuedPromise = null;

					if (this.isDisposed) {
						return;
					}

					const result = this.queue(this.queuedPromiseFactory!);
					this.queuedPromiseFactory = null;

					return result;
				};

				this.queuedPromise = new Promise((resolve, reject) => {
					// Forward rejections too. With only `resolve` wired up, a failing
					// follow-up task left every waiting caller pending forever and
					// surfaced as an unhandled rejection.
					this.activePromise!.then(onComplete, onComplete).then(resolve, reject);
				});
			}

			return new Promise((resolve, reject) => {
				this.queuedPromise!.then(resolve, reject);
			});
		}

		this.activePromise = promiseFactory();

		return new Promise((resolve, reject) => {
			this.activePromise!.then((result: T) => {
				this.activePromise = null;
				resolve(result);
			}, (err: unknown) => {
				this.activePromise = null;
				reject(err);
			});
		});
	}

	/**
	 * Marks the throttler as disposed. A running task is not interrupted, but no
	 * further task is started.
	 */
	dispose(): void {
		this.isDisposed = true;
	}
}
|
|
|
|
/**
 * A helper to delay execution of a task that is being requested often, while
 * preventing accumulation of consecutive executions, while the task runs.
 *
 * The mail man waits a certain amount of time before he leaves to deliver
 * letters. Letters that arrive while he is out have to wait until he is back,
 * and he then makes one more trip for everything that accumulated in the
 * meantime.
 */
export class ThrottledDelayer<T> {

	// Debounces incoming requests.
	private readonly delayer: Delayer<T>;
	// Keeps executions from piling up while one is running.
	private readonly throttler = new Throttler();

	/**
	 * @param defaultDelay Waiting period in milliseconds used when trigger() is
	 * called without an explicit delay.
	 */
	constructor(defaultDelay: number) {
		this.delayer = new Delayer<T>(defaultDelay);
	}

	/**
	 * Requests execution of `promiseFactory`: the request is first delayed and
	 * then handed to the throttler.
	 *
	 * @param promiseFactory Function that starts the work.
	 * @param delay Optional waiting period for this request.
	 * @returns The promise returned by the delayer for this request.
	 */
	trigger(promiseFactory: () => Promise<T>, delay?: number): Promise<T> {
		const throttled = (): Promise<T> => this.throttler.queue(promiseFactory);
		return this.delayer.trigger(throttled, delay);
	}

	/**
	 * @returns Whether the delayer currently has a wait scheduled.
	 */
	isTriggered(): boolean {
		return this.delayer.isTriggered();
	}

	/**
	 * Cancels the delayer.
	 */
	cancel(): void {
		this.delayer.cancel();
	}

	/**
	 * Disposes the delayer first, then the throttler.
	 */
	dispose(): void {
		this.delayer.dispose();
		this.throttler.dispose();
	}
}
|
|
|
|
/**
 * A queue handles one promise at a time: it is a {@link Limiter} whose maximum
 * degree of parallelism is 1.
 */
export class Queue<T> extends Limiter<T> {

	constructor() {
		// A single slot means at most one task is running at any time.
		super(1);
	}
}
|
|
|
|
/**
 * Given an event, returns another event which only fires once.
 *
 * @param event The event source for the new event.
 * @returns An event whose listeners are invoked for the first occurrence of
 * `event` only; the underlying subscription is disposed at that point.
 */
export function once<T>(event: Event<T>): Event<T> {
	return (listener, thisArgs = null, disposables?) => {
		// Set when the event fires while we are still subscribing, i.e. before the
		// subscription handle is available to dispose.
		let firedWhileSubscribing = false;
		let subscription: Disposable | undefined = undefined;

		subscription = event(e => {
			if (firedWhileSubscribing) {
				return;
			}

			if (subscription) {
				subscription.dispose();
			} else {
				firedWhileSubscribing = true;
			}

			return listener.call(thisArgs, e);
		}, null, disposables);

		// The event already fired during subscription, so clean up now.
		if (firedWhileSubscribing) {
			subscription.dispose();
		}

		return subscription;
	};
}
|
|
|
|
/**
 * Creates a promise out of an event, using the {@link once} helper.
 *
 * @param event The event to wait for.
 * @returns A promise that resolves with the payload of the first occurrence of `event`.
 */
export function toPromise<T>(event: Event<T>): Promise<T> {
	return new Promise<T>(resolve => {
		const subscribeOnce = once(event);
		return subscribeOnce(resolve);
	});
}
|
|
|
|
/**
 * Signature of a callback that fulfils a promise with a value or another promise.
 */
export type ValueCallback<T = unknown> = (value: T | Promise<T>) => void;

const enum DeferredOutcome {
	Resolved,
	Rejected
}

/**
 * Creates a promise whose resolution or rejection can be controlled imperatively.
 *
 * The promise settles at most once: the first call to `complete`, `error` or
 * `cancel` decides the outcome and later calls are ignored.
 */
export class DeferredPromise<T> {

	private completeCallback!: ValueCallback<T>;
	private errorCallback!: (err: unknown) => void;
	private outcome?: { outcome: DeferredOutcome.Rejected; value: any } | { outcome: DeferredOutcome.Resolved; value: T };

	/** Whether the promise was settled through `error` or `cancel`. */
	public get isRejected() {
		return this.outcome?.outcome === DeferredOutcome.Rejected;
	}

	/** Whether the promise was settled through `complete`. */
	public get isResolved() {
		return this.outcome?.outcome === DeferredOutcome.Resolved;
	}

	/** Whether `complete`, `error` or `cancel` has been called. */
	public get isSettled() {
		return !!this.outcome;
	}

	/** The value passed to `complete`, or `undefined` when not resolved. */
	public get value() {
		return this.outcome?.outcome === DeferredOutcome.Resolved ? this.outcome?.value : undefined;
	}

	/** The promise controlled by this object. */
	public readonly p: Promise<T>;

	constructor() {
		this.p = new Promise<T>((c, e) => {
			this.completeCallback = c;
			this.errorCallback = e;
		});
	}

	/**
	 * Resolves the promise with `value`. Does nothing when already settled.
	 *
	 * @returns A promise that resolves once the outcome has been recorded.
	 */
	public complete(value: T): Promise<void> {
		// Settling twice has no effect on `p`, so the recorded outcome must not
		// change either. Otherwise the getters would contradict the promise.
		if (this.isSettled) {
			return Promise.resolve();
		}

		return new Promise<void>(resolve => {
			this.completeCallback(value);
			this.outcome = { outcome: DeferredOutcome.Resolved, value };
			resolve();
		});
	}

	/**
	 * Rejects the promise with `err`. Does nothing when already settled.
	 *
	 * @returns A promise that resolves once the outcome has been recorded.
	 */
	public error(err: unknown): Promise<void> {
		if (this.isSettled) {
			return Promise.resolve();
		}

		return new Promise<void>(resolve => {
			this.errorCallback(err);
			this.outcome = { outcome: DeferredOutcome.Rejected, value: err };
			resolve();
		});
	}

	/**
	 * Rejects the promise with a `CancellationError`. Does nothing when already settled.
	 */
	public cancel() {
		return this.error(new CancellationError());
	}
}
|