diff --git a/src/vs/base/common/async.ts b/src/vs/base/common/async.ts index 7dce2743917..ed98922a415 100644 --- a/src/vs/base/common/async.ts +++ b/src/vs/base/common/async.ts @@ -4,10 +4,9 @@ *--------------------------------------------------------------------------------------------*/ import { CancellationToken, CancellationTokenSource } from 'vs/base/common/cancellation'; -import { canceled, onUnexpectedError } from 'vs/base/common/errors'; -import { Emitter, Event, Listener } from 'vs/base/common/event'; +import { canceled, } from 'vs/base/common/errors'; +import { Emitter, Event, } from 'vs/base/common/event'; import { IDisposable, toDisposable, Disposable, MutableDisposable } from 'vs/base/common/lifecycle'; -import { LinkedList } from 'vs/base/common/linkedList'; import { extUri as defaultExtUri, IExtUri } from 'vs/base/common/resources'; import { URI } from 'vs/base/common/uri'; @@ -1195,68 +1194,6 @@ export class DeferredPromise { //#endregion -//#region - -export interface IWaitUntil { - waitUntil(thenable: Promise): void; -} - -export class AsyncEmitter extends Emitter { - - private _asyncDeliveryQueue?: LinkedList<[Listener, Omit]>; - - async fireAsync(data: Omit, token: CancellationToken, promiseJoin?: (p: Promise, listener: Function) => Promise): Promise { - if (!this._listeners) { - return; - } - - if (!this._asyncDeliveryQueue) { - this._asyncDeliveryQueue = new LinkedList(); - } - - for (const listener of this._listeners) { - this._asyncDeliveryQueue.push([listener, data]); - } - - while (this._asyncDeliveryQueue.size > 0 && !token.isCancellationRequested) { - - const [listener, data] = this._asyncDeliveryQueue.shift()!; - const thenables: Promise[] = []; - - const event = { - ...data, - waitUntil: (p: Promise): void => { - if (Object.isFrozen(thenables)) { - throw new Error('waitUntil can NOT be called asynchronous'); - } - if (promiseJoin) { - p = promiseJoin(p, typeof listener === 'function' ? listener : listener[0]); - } - thenables.push(p); - } - }; - - try { - if (typeof listener === 'function') { - listener.call(undefined, event); - } else { - listener[0].call(listener[1], event); - } - } catch (e) { - onUnexpectedError(e); - continue; - } - - // freeze thenables-collection to enforce sync-calls to - // wait until and then wait for all thenables to resolve - Object.freeze(thenables); - await Promises.settled(thenables).catch(e => onUnexpectedError(e)); - } - } -} - -//#endregion - //#region Promises export namespace Promises { diff --git a/src/vs/base/common/event.ts b/src/vs/base/common/event.ts index 60699292417..afd00b4fe8a 100644 --- a/src/vs/base/common/event.ts +++ b/src/vs/base/common/event.ts @@ -3,6 +3,7 @@ * Licensed under the MIT License. See License.txt in the project root for license information. *--------------------------------------------------------------------------------------------*/ +import { CancellationToken } from 'vs/base/common/cancellation'; import { onUnexpectedError } from 'vs/base/common/errors'; import { once as onceFn } from 'vs/base/common/functional'; import { Disposable, IDisposable, toDisposable, combinedDisposable, DisposableStore } from 'vs/base/common/lifecycle'; @@ -597,6 +598,73 @@ export class Emitter { } } + +export interface IWaitUntil { + waitUntil(thenable: Promise): void; +} + +export class AsyncEmitter extends Emitter { + + private _asyncDeliveryQueue?: LinkedList<[Listener, Omit]>; + + async fireAsync(data: Omit, token: CancellationToken, promiseJoin?: (p: Promise, listener: Function) => Promise): Promise { + if (!this._listeners) { + return; + } + + if (!this._asyncDeliveryQueue) { + this._asyncDeliveryQueue = new LinkedList(); + } + + for (const listener of this._listeners) { + this._asyncDeliveryQueue.push([listener, data]); + } + + while (this._asyncDeliveryQueue.size > 0 && !token.isCancellationRequested) { + + const [listener, data] = this._asyncDeliveryQueue.shift()!; + const thenables: Promise[] = []; + + const event = { + ...data, + waitUntil: (p: Promise): void => { + if (Object.isFrozen(thenables)) { + throw new Error('waitUntil can NOT be called asynchronous'); + } + if (promiseJoin) { + p = promiseJoin(p, typeof listener === 'function' ? listener : listener[0]); + } + thenables.push(p); + } + }; + + try { + if (typeof listener === 'function') { + listener.call(undefined, event); + } else { + listener[0].call(listener[1], event); + } + } catch (e) { + onUnexpectedError(e); + continue; + } + + // freeze thenables-collection to enforce sync-calls to + // wait until and then wait for all thenables to resolve + Object.freeze(thenables); + + await Promise.allSettled(thenables).then(values => { + for (const value of values) { + if (value.status === 'rejected') { + onUnexpectedError(value.reason); + } + } + }); + } + } +} + + export class PauseableEmitter extends Emitter { private _isPaused = 0; diff --git a/src/vs/base/test/common/event.test.ts b/src/vs/base/test/common/event.test.ts index 03600a91398..f1bd2d20d5a 100644 --- a/src/vs/base/test/common/event.test.ts +++ b/src/vs/base/test/common/event.test.ts @@ -3,10 +3,10 @@ * Licensed under the MIT License. See License.txt in the project root for license information. *--------------------------------------------------------------------------------------------*/ import * as assert from 'assert'; -import { Event, Emitter, EventBufferer, EventMultiplexer, PauseableEmitter, Relay, DebounceEmitter } from 'vs/base/common/event'; +import { Event, Emitter, AsyncEmitter, IWaitUntil, EventBufferer, EventMultiplexer, PauseableEmitter, Relay, DebounceEmitter } from 'vs/base/common/event'; import { IDisposable, DisposableStore } from 'vs/base/common/lifecycle'; import { errorHandler, setUnexpectedErrorHandler } from 'vs/base/common/errors'; -import { AsyncEmitter, IWaitUntil, timeout } from 'vs/base/common/async'; +import { timeout } from 'vs/base/common/async'; import { CancellationToken } from 'vs/base/common/cancellation'; namespace Samples { diff --git a/src/vs/workbench/api/common/extHostFileSystemEventService.ts b/src/vs/workbench/api/common/extHostFileSystemEventService.ts index 48ec2def290..78ffb23b51f 100644 --- a/src/vs/workbench/api/common/extHostFileSystemEventService.ts +++ b/src/vs/workbench/api/common/extHostFileSystemEventService.ts @@ -3,8 +3,7 @@ * Licensed under the MIT License. See License.txt in the project root for license information. *--------------------------------------------------------------------------------------------*/ -import { Emitter, Event } from 'vs/base/common/event'; -import { AsyncEmitter, IWaitUntil } from 'vs/base/common/async'; +import { Emitter, Event, AsyncEmitter, IWaitUntil } from 'vs/base/common/event'; import { IRelativePattern, parse } from 'vs/base/common/glob'; import { URI } from 'vs/base/common/uri'; import { ExtHostDocumentsAndEditors } from 'vs/workbench/api/common/extHostDocumentsAndEditors'; diff --git a/src/vs/workbench/services/workingCopy/common/workingCopyFileService.ts b/src/vs/workbench/services/workingCopy/common/workingCopyFileService.ts index a9cb9a7c9b3..b2f03285f43 100644 --- a/src/vs/workbench/services/workingCopy/common/workingCopyFileService.ts +++ b/src/vs/workbench/services/workingCopy/common/workingCopyFileService.ts @@ -5,8 +5,8 @@ import { createDecorator, IInstantiationService } from 'vs/platform/instantiation/common/instantiation'; import { registerSingleton } from 'vs/platform/instantiation/common/extensions'; -import { Event } from 'vs/base/common/event'; -import { AsyncEmitter, IWaitUntil, Promises } from 'vs/base/common/async'; +import { Event, AsyncEmitter, IWaitUntil } from 'vs/base/common/event'; +import { Promises } from 'vs/base/common/async'; import { insert } from 'vs/base/common/arrays'; import { URI } from 'vs/base/common/uri'; import { Disposable, IDisposable, toDisposable } from 'vs/base/common/lifecycle';