diff --git a/src/vs/base/parts/ipc/common/ipc.net.ts b/src/vs/base/parts/ipc/common/ipc.net.ts index 132654b320f..05af5d7ceb1 100644 --- a/src/vs/base/parts/ipc/common/ipc.net.ts +++ b/src/vs/base/parts/ipc/common/ipc.net.ts @@ -16,6 +16,7 @@ export interface ISocket extends IDisposable { onEnd(listener: () => void): IDisposable; write(buffer: VSBuffer): void; end(): void; + drain(): Promise; } let emptyBuffer: VSBuffer | null = null; @@ -277,6 +278,11 @@ class ProtocolWriter { this._isDisposed = true; } + public drain(): Promise { + this.flush(); + return this._socket.drain(); + } + public flush(): void { // flush this._writeNow(); @@ -372,6 +378,10 @@ export class Protocol extends Disposable implements IMessagePassingProtocol { this._register(this._socket.onClose(() => this._onClose.fire())); } + drain(): Promise { + return this._socketWriter.drain(); + } + getSocket(): ISocket { return this._socket; } @@ -619,6 +629,10 @@ export class PersistentProtocol implements IMessagePassingProtocol { this._socketDisposables = dispose(this._socketDisposables); } + drain(): Promise { + return this._socketWriter.drain(); + } + sendDisconnect(): void { const msg = new ProtocolMessage(ProtocolMessageType.Disconnect, 0, 0, getEmptyBuffer()); this._socketWriter.write(msg); diff --git a/src/vs/base/parts/ipc/common/ipc.ts b/src/vs/base/parts/ipc/common/ipc.ts index 411bfe46cca..1a583e185da 100644 --- a/src/vs/base/parts/ipc/common/ipc.ts +++ b/src/vs/base/parts/ipc/common/ipc.ts @@ -70,6 +70,10 @@ interface IHandler { export interface IMessagePassingProtocol { send(buffer: VSBuffer): void; onMessage: Event; + /** + * Wait for the write buffer (if applicable) to become empty. + */ + drain?(): Promise; } enum State { @@ -482,10 +486,7 @@ export class ChannelClient implements IChannelClient, IDisposable { return e(errors.canceled()); } - let uninitializedPromise: CancelablePromise | null = createCancelablePromise(_ => this.whenInitialized()); - uninitializedPromise.then(() => { - uninitializedPromise = null; - + const doRequest = () => { const handler: IHandler = response => { switch (response.type) { case ResponseType.PromiseSuccess: @@ -510,7 +511,18 @@ export class ChannelClient implements IChannelClient, IDisposable { this.handlers.set(id, handler); this.sendRequest(request); - }); + }; + + let uninitializedPromise: CancelablePromise | null = null; + if (this.state === State.Idle) { + doRequest(); + } else { + uninitializedPromise = createCancelablePromise(_ => this.whenInitialized()); + uninitializedPromise.then(() => { + uninitializedPromise = null; + doRequest(); + }); + } const cancel = () => { if (uninitializedPromise) { diff --git a/src/vs/base/parts/ipc/node/ipc.net.ts b/src/vs/base/parts/ipc/node/ipc.net.ts index afc72cf1658..2b6c70afa70 100644 --- a/src/vs/base/parts/ipc/node/ipc.net.ts +++ b/src/vs/base/parts/ipc/node/ipc.net.ts @@ -77,6 +77,28 @@ export class NodeSocket implements ISocket { public end(): void { this.socket.end(); } + + public drain(): Promise { + return new Promise((resolve, reject) => { + if (this.socket.bufferSize === 0) { + resolve(); + return; + } + const finished = () => { + this.socket.off('close', finished); + this.socket.off('end', finished); + this.socket.off('error', finished); + this.socket.off('timeout', finished); + this.socket.off('drain', finished); + resolve(); + }; + this.socket.on('close', finished); + this.socket.on('end', finished); + this.socket.on('error', finished); + this.socket.on('timeout', finished); + this.socket.on('drain', finished); + }); + } } const enum Constants { @@ -243,6 +265,10 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { } } } + + public drain(): Promise { + return this.socket.drain(); + } } function unmask(buffer: VSBuffer, mask: number): void { diff --git a/src/vs/platform/remote/browser/browserSocketFactory.ts b/src/vs/platform/remote/browser/browserSocketFactory.ts index d0f6e6b18a6..3715cbb8e6e 100644 --- a/src/vs/platform/remote/browser/browserSocketFactory.ts +++ b/src/vs/platform/remote/browser/browserSocketFactory.ts @@ -194,6 +194,9 @@ class BrowserSocket implements ISocket { this.socket.close(); } + public drain(): Promise { + return Promise.resolve(); + } } diff --git a/src/vs/workbench/api/browser/mainThreadExtensionService.ts b/src/vs/workbench/api/browser/mainThreadExtensionService.ts index ced7611d1e4..e0b3986200d 100644 --- a/src/vs/workbench/api/browser/mainThreadExtensionService.ts +++ b/src/vs/workbench/api/browser/mainThreadExtensionService.ts @@ -128,7 +128,7 @@ export class MainThreadExtensionService implements MainThreadExtensionServiceSha } } - $onExtensionHostExit(code: number): void { + async $onExtensionHostExit(code: number): Promise { this._extensionService._onExtensionHostExit(code); } } diff --git a/src/vs/workbench/api/common/extHost.protocol.ts b/src/vs/workbench/api/common/extHost.protocol.ts index 29237b0e491..826a598dee0 100644 --- a/src/vs/workbench/api/common/extHost.protocol.ts +++ b/src/vs/workbench/api/common/extHost.protocol.ts @@ -795,7 +795,7 @@ export interface MainThreadExtensionServiceShape extends IDisposable { $onDidActivateExtension(extensionId: ExtensionIdentifier, codeLoadingTime: number, activateCallTime: number, activateResolvedTime: number, activationReason: ExtensionActivationReason): void; $onExtensionActivationError(extensionId: ExtensionIdentifier, error: ExtensionActivationError): Promise; $onExtensionRuntimeError(extensionId: ExtensionIdentifier, error: SerializedError): void; - $onExtensionHostExit(code: number): void; + $onExtensionHostExit(code: number): Promise; } export interface SCMProviderFeatures { diff --git a/src/vs/workbench/api/common/extHostExtensionService.ts b/src/vs/workbench/api/common/extHostExtensionService.ts index b9b1d92ca20..eaf57da1a75 100644 --- a/src/vs/workbench/api/common/extHostExtensionService.ts +++ b/src/vs/workbench/api/common/extHostExtensionService.ts @@ -557,7 +557,7 @@ export abstract class AbstractExtHostExtensionService extends Disposable impleme } // after tests have run, we shutdown the host - this._gracefulExit(error || (typeof failures === 'number' && failures > 0) ? 1 /* ERROR */ : 0 /* OK */); + this._testRunnerExit(error || (typeof failures === 'number' && failures > 0) ? 1 /* ERROR */ : 0 /* OK */); }; const runResult = testRunner!.run(extensionTestsPath, oldTestRunnerCallback); @@ -567,11 +567,11 @@ export abstract class AbstractExtHostExtensionService extends Disposable impleme runResult .then(() => { c(); - this._gracefulExit(0); + this._testRunnerExit(0); }) .catch((err: Error) => { e(err.toString()); - this._gracefulExit(1); + this._testRunnerExit(1); }); } }); @@ -579,24 +579,20 @@ export abstract class AbstractExtHostExtensionService extends Disposable impleme // Otherwise make sure to shutdown anyway even in case of an error else { - this._gracefulExit(1 /* ERROR */); + this._testRunnerExit(1 /* ERROR */); } return Promise.reject(new Error(requireError ? requireError.toString() : nls.localize('extensionTestError', "Path {0} does not point to a valid extension test runner.", extensionTestsPath))); } - private _gracefulExit(code: number): void { - // to give the PH process a chance to flush any outstanding console - // messages to the main process, we delay the exit() by some time - setTimeout(() => { - // If extension tests are running, give the exit code to the renderer - if (this._initData.remote.isRemote && !!this._initData.environment.extensionTestsLocationURI) { - this._mainThreadExtensionsProxy.$onExtensionHostExit(code); - return; - } - + private _testRunnerExit(code: number): void { + // wait at most 5000ms for the renderer to confirm our exit request and for the renderer socket to drain + // (this is to ensure all outstanding messages reach the renderer) + const exitPromise = this._mainThreadExtensionsProxy.$onExtensionHostExit(code); + const drainPromise = this._extHostContext.drain(); + Promise.race([Promise.all([exitPromise, drainPromise]), timeout(5000)]).then(() => { this._hostUtils.exit(code); - }, 500); + }); } private _startExtensionHost(): Promise { diff --git a/src/vs/workbench/api/common/extHostRpcService.ts b/src/vs/workbench/api/common/extHostRpcService.ts index 58237cf24bc..6582ef5fb3f 100644 --- a/src/vs/workbench/api/common/extHostRpcService.ts +++ b/src/vs/workbench/api/common/extHostRpcService.ts @@ -18,12 +18,12 @@ export class ExtHostRpcService implements IExtHostRpcService { readonly getProxy: (identifier: ProxyIdentifier) => T; readonly set: (identifier: ProxyIdentifier, instance: R) => R; readonly assertRegistered: (identifiers: ProxyIdentifier[]) => void; + readonly drain: () => Promise; constructor(rpcProtocol: IRPCProtocol) { this.getProxy = rpcProtocol.getProxy.bind(rpcProtocol); this.set = rpcProtocol.set.bind(rpcProtocol); this.assertRegistered = rpcProtocol.assertRegistered.bind(rpcProtocol); - + this.drain = rpcProtocol.drain.bind(rpcProtocol); } - } diff --git a/src/vs/workbench/services/extensions/common/extensionHostManager.ts b/src/vs/workbench/services/extensions/common/extensionHostManager.ts index 9d05e123c8c..0e26dd0f668 100644 --- a/src/vs/workbench/services/extensions/common/extensionHostManager.ts +++ b/src/vs/workbench/services/extensions/common/extensionHostManager.ts @@ -184,6 +184,7 @@ export class ExtensionHostManager extends Disposable { getProxy: (identifier: ProxyIdentifier): T => this._rpcProtocol!.getProxy(identifier), set: (identifier: ProxyIdentifier, instance: R): R => this._rpcProtocol!.set(identifier, instance), assertRegistered: (identifiers: ProxyIdentifier[]): void => this._rpcProtocol!.assertRegistered(identifiers), + drain: (): Promise => this._rpcProtocol!.drain(), }; // Named customers diff --git a/src/vs/workbench/services/extensions/common/proxyIdentifier.ts b/src/vs/workbench/services/extensions/common/proxyIdentifier.ts index e0e9999a62f..edadcae9eb2 100644 --- a/src/vs/workbench/services/extensions/common/proxyIdentifier.ts +++ b/src/vs/workbench/services/extensions/common/proxyIdentifier.ts @@ -18,6 +18,11 @@ export interface IRPCProtocol { * Assert these identifiers are already registered via `.set`. */ assertRegistered(identifiers: ProxyIdentifier[]): void; + + /** + * Wait for the write buffer (if applicable) to become empty. + */ + drain(): Promise; } export class ProxyIdentifier { diff --git a/src/vs/workbench/services/extensions/common/rpcProtocol.ts b/src/vs/workbench/services/extensions/common/rpcProtocol.ts index 022fd1f4c41..199ea6e15ef 100644 --- a/src/vs/workbench/services/extensions/common/rpcProtocol.ts +++ b/src/vs/workbench/services/extensions/common/rpcProtocol.ts @@ -115,6 +115,13 @@ export class RPCProtocol extends Disposable implements IRPCProtocol { }); } + public drain(): Promise { + if (typeof this._protocol.drain === 'function') { + return this._protocol.drain(); + } + return Promise.resolve(); + } + private _onWillSendRequest(req: number): void { if (this._unacknowledgedCount === 0) { // Since this is the first request we are sending in a while, diff --git a/src/vs/workbench/services/extensions/node/extensionHostProcessSetup.ts b/src/vs/workbench/services/extensions/node/extensionHostProcessSetup.ts index 05b5fd012d2..49542eda74c 100644 --- a/src/vs/workbench/services/extensions/node/extensionHostProcessSetup.ts +++ b/src/vs/workbench/services/extensions/node/extensionHostProcessSetup.ts @@ -96,10 +96,10 @@ let onTerminate = function () { nativeExit(); }; -function _createExtHostProtocol(): Promise { +function _createExtHostProtocol(): Promise { if (process.env.VSCODE_EXTHOST_WILL_SEND_SOCKET) { - return new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { let protocol: PersistentProtocol | null = null; @@ -163,7 +163,7 @@ function _createExtHostProtocol(): Promise { const pipeName = process.env.VSCODE_IPC_HOOK_EXTHOST!; - return new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { const socket = net.createConnection(pipeName, () => { socket.removeListener('error', reject); @@ -203,6 +203,10 @@ async function createExtHostProtocol(): Promise { protocol.send(msg); } } + + drain(): Promise { + return protocol.drain(); + } }; } diff --git a/src/vs/workbench/test/browser/api/extHostWorkspace.test.ts b/src/vs/workbench/test/browser/api/extHostWorkspace.test.ts index 6cbc2c7a930..7e9732b500a 100644 --- a/src/vs/workbench/test/browser/api/extHostWorkspace.test.ts +++ b/src/vs/workbench/test/browser/api/extHostWorkspace.test.ts @@ -298,7 +298,8 @@ suite('ExtHostWorkspace', function () { const protocol: IMainContext = { getProxy: () => { return undefined!; }, set: () => { return undefined!; }, - assertRegistered: () => { } + assertRegistered: () => { }, + drain: () => { return undefined!; }, }; const ws = createExtHostWorkspace(protocol, { id: 'foo', name: 'Test', folders: [] }, new NullLogService()); diff --git a/src/vs/workbench/test/browser/api/testRPCProtocol.ts b/src/vs/workbench/test/browser/api/testRPCProtocol.ts index 16673942bd6..d2d2b1c504f 100644 --- a/src/vs/workbench/test/browser/api/testRPCProtocol.ts +++ b/src/vs/workbench/test/browser/api/testRPCProtocol.ts @@ -19,7 +19,8 @@ export function SingleProxyRPCProtocol(thing: any): IExtHostContext & IExtHostRp set(identifier: ProxyIdentifier, value: R): R { return value; }, - assertRegistered: undefined! + assertRegistered: undefined!, + drain: undefined! }; } @@ -40,6 +41,10 @@ export class TestRPCProtocol implements IExtHostContext, IExtHostRpcService { this._proxies = Object.create(null); } + drain(): Promise { + return Promise.resolve(); + } + private get _callCount(): number { return this._callCountValue; }