diff --git a/extensions/vscode-test-resolver/src/extension.ts b/extensions/vscode-test-resolver/src/extension.ts index 46f95f14f1f..4178c2b823e 100644 --- a/extensions/vscode-test-resolver/src/extension.ts +++ b/extensions/vscode-test-resolver/src/extension.ts @@ -196,8 +196,8 @@ export function activate(context: vscode.ExtensionContext) { onDidReceiveMessage: dataEmitter.event, onDidClose: closeEmitter.event, onDidEnd: endEmitter.event, - dataHandler: d => remoteSocket.write(d), - endHandler: () => remoteSocket.end(), + send: d => remoteSocket.write(d), + end: () => remoteSocket.end(), }; }, connectionToken)); } diff --git a/src/vs/base/common/event.ts b/src/vs/base/common/event.ts index 81981c52906..24547947c56 100644 --- a/src/vs/base/common/event.ts +++ b/src/vs/base/common/event.ts @@ -1171,6 +1171,10 @@ export class PauseableEmitter extends Emitter { protected _eventQueue = new LinkedList(); private _mergeFn?: (input: T[]) => T; + public get isPaused(): boolean { + return this._isPaused !== 0; + } + constructor(options?: EmitterOptions & { merge?: (input: T[]) => T }) { super(options); this._mergeFn = options?.merge; diff --git a/src/vs/workbench/api/browser/mainThreadManagedSockets.ts b/src/vs/workbench/api/browser/mainThreadManagedSockets.ts index 4ad4c6d653f..dbd5c34ac3e 100644 --- a/src/vs/workbench/api/browser/mainThreadManagedSockets.ts +++ b/src/vs/workbench/api/browser/mainThreadManagedSockets.ts @@ -10,7 +10,7 @@ import { ManagedRemoteConnection, RemoteConnectionType } from 'vs/platform/remot import { VSBuffer } from 'vs/base/common/buffer'; import { IRemoteSocketFactoryService, ISocketFactory } from 'vs/platform/remote/common/remoteSocketFactoryService'; import { ISocket, SocketCloseEvent, SocketCloseEventType, SocketDiagnostics, SocketDiagnosticsEventType } from 'vs/base/parts/ipc/common/ipc.net'; -import { Emitter, Event } from 'vs/base/common/event'; +import { Emitter, Event, PauseableEmitter } from 'vs/base/common/event'; import { makeRawSocketHeaders, socketRawEndHeaderSequence } from 'vs/platform/remote/common/managedSocket'; @extHostNamedCustomer(MainContext.MainThreadManagedSockets) @@ -30,7 +30,7 @@ export class MainThreadManagedSockets extends Disposable implements MainThreadMa async $registerSocketFactory(socketFactoryId: number): Promise { const that = this; - const scoketFactory = new class implements ISocketFactory { + const socketFactory = new class implements ISocketFactory { supports(connectTo: ManagedRemoteConnection): boolean { return (connectTo.id === socketFactoryId); @@ -65,7 +65,7 @@ export class MainThreadManagedSockets extends Disposable implements MainThreadMa }); } }; - this._registrations.set(socketFactoryId, this._remoteSocketFactoryService.register(RemoteConnectionType.Managed, scoketFactory)); + this._registrations.set(socketFactoryId, this._remoteSocketFactoryService.register(RemoteConnectionType.Managed, socketFactory)); } @@ -91,7 +91,7 @@ export class MainThreadManagedSockets extends Disposable implements MainThreadMa } } -interface RemoteSocketHalf { +export interface RemoteSocketHalf { onData: Emitter; onClose: Emitter; onEnd: Emitter; @@ -103,11 +103,7 @@ export class ManagedSocket extends Disposable implements ISocket { proxy: ExtHostManagedSocketsShape, path: string, query: string, debugLabel: string, - half: { - onClose: Emitter; - onData: Emitter; - onEnd: Emitter; - } + half: RemoteSocketHalf ): Promise { const socket = new ManagedSocket(socketId, proxy, debugLabel, half.onClose, half.onData, half.onEnd); @@ -115,9 +111,28 @@ export class ManagedSocket extends Disposable implements ISocket { const d = new DisposableStore(); return new Promise((resolve, reject) => { + let dataSoFar: VSBuffer | undefined; d.add(socket.onData(d => { - if (d.indexOf(socketRawEndHeaderSequence) !== -1) { - resolve(socket); + if (!dataSoFar) { + dataSoFar = d; + } else { + dataSoFar = VSBuffer.concat([dataSoFar, d], dataSoFar.byteLength + d.byteLength); + } + + const index = dataSoFar.indexOf(socketRawEndHeaderSequence); + if (index === -1) { + return; + } + + resolve(socket); + // pause data events until the socket consumer is hooked up. We may + // immediately emit remaining data, but if not there may still be + // microtasks queued which would fire data into the abyss. + socket.pauseData(); + + const rest = dataSoFar.slice(index + socketRawEndHeaderSequence.byteLength); + if (rest.byteLength) { + half.onData.fire(rest); } })); @@ -126,7 +141,14 @@ export class ManagedSocket extends Disposable implements ISocket { }).finally(() => d.dispose()); } - public onData: Event; + private readonly pausableDataEmitter = this._register(new PauseableEmitter()); + + public onData: Event = (...args) => { + if (this.pausableDataEmitter.isPaused) { + queueMicrotask(() => this.pausableDataEmitter.resume()); + } + return this.pausableDataEmitter.event(...args); + }; public onClose: Event; public onEnd: Event; @@ -144,11 +166,19 @@ export class ManagedSocket extends Disposable implements ISocket { onEndEmitter: Emitter, ) { super(); + + this._register(onDataEmitter); + this._register(onDataEmitter.event(data => this.pausableDataEmitter.fire(data))); + this.onClose = this._register(onCloseEmitter).event; - this.onData = this._register(onDataEmitter).event; this.onEnd = this._register(onEndEmitter).event; } + /** Pauses data events until a new listener comes in onData() */ + pauseData() { + this.pausableDataEmitter.pause(); + } + write(buffer: VSBuffer): void { this.proxy.$remoteSocketWrite(this.socketId, buffer); } diff --git a/src/vs/workbench/api/common/extHostManagedSockets.ts b/src/vs/workbench/api/common/extHostManagedSockets.ts index ccf2c7be634..d2f028df19e 100644 --- a/src/vs/workbench/api/common/extHostManagedSockets.ts +++ b/src/vs/workbench/api/common/extHostManagedSockets.ts @@ -71,19 +71,19 @@ export class ExtHostManagedSockets implements IExtHostManagedSockets { } $remoteSocketWrite(socketId: number, buffer: VSBuffer): void { - this._managedRemoteSockets.get(socketId)?.actual.dataHandler(buffer.buffer); + this._managedRemoteSockets.get(socketId)?.actual.send(buffer.buffer); } $remoteSocketEnd(socketId: number): void { const socket = this._managedRemoteSockets.get(socketId); if (socket) { - socket.actual.endHandler(); + socket.actual.end(); socket.dispose(); } } - $remoteSocketDrain(socketId: number): Promise { - return this._managedRemoteSockets.get(socketId)?.actual.drainHandler?.() ?? Promise.resolve(); + async $remoteSocketDrain(socketId: number): Promise { + await this._managedRemoteSockets.get(socketId)?.actual.drain?.(); } } diff --git a/src/vs/workbench/api/test/browser/mainThreadManagedSockets.test.ts b/src/vs/workbench/api/test/browser/mainThreadManagedSockets.test.ts new file mode 100644 index 00000000000..611ec9727c0 --- /dev/null +++ b/src/vs/workbench/api/test/browser/mainThreadManagedSockets.test.ts @@ -0,0 +1,104 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +import * as assert from 'assert'; +import { disposableTimeout, timeout } from 'vs/base/common/async'; +import { VSBuffer } from 'vs/base/common/buffer'; +import { Emitter } from 'vs/base/common/event'; +import { DisposableStore } from 'vs/base/common/lifecycle'; +import { SocketCloseEvent } from 'vs/base/parts/ipc/common/ipc.net'; +import { mock } from 'vs/base/test/common/mock'; +import { ManagedSocket, RemoteSocketHalf } from 'vs/workbench/api/browser/mainThreadManagedSockets'; +import { ExtHostManagedSocketsShape } from 'vs/workbench/api/common/extHost.protocol'; + +suite('MainThreadManagedSockets', () => { + + suite('ManagedSocket', () => { + let extHost: ExtHostMock; + let half: RemoteSocketHalf; + + class ExtHostMock extends mock() { + private onDidFire = new Emitter(); + public readonly events: any[] = []; + + override $remoteSocketWrite(socketId: number, buffer: VSBuffer): void { + this.events.push({ socketId, data: buffer.toString() }); + this.onDidFire.fire(); + } + + override $remoteSocketDrain(socketId: number) { + this.events.push({ socketId, event: 'drain' }); + this.onDidFire.fire(); + return Promise.resolve(); + } + + override $remoteSocketEnd(socketId: number) { + this.events.push({ socketId, event: 'end' }); + this.onDidFire.fire(); + } + + expectEvent(test: (evt: any) => void, message: string) { + if (this.events.some(test)) { + return; + } + + const d = new DisposableStore(); + return new Promise(resolve => { + d.add(this.onDidFire.event(() => { + if (this.events.some(test)) { + return; + } + })); + d.add(disposableTimeout(() => { + throw new Error(`Expected ${message} but only had ${JSON.stringify(this.events, null, 2)}`); + }, 1000)); + }).finally(() => d.dispose()); + } + } + + setup(() => { + extHost = new ExtHostMock(); + half = { + onClose: new Emitter(), + onData: new Emitter(), + onEnd: new Emitter(), + }; + }); + + async function doConnect() { + const socket = ManagedSocket.connect(1, extHost, '/hello', 'world=true', '', half); + await extHost.expectEvent(evt => evt.data && evt.data.startsWith('GET ws://localhost/hello?world=true&skipWebSocketFrames=true HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: websocket\r\nSec-WebSocket-Key:'), 'websocket open event'); + half.onData.fire(VSBuffer.fromString('Opened successfully ;)\r\n\r\n')); + return await socket; + } + + test('connects', async () => { + await doConnect(); + }); + + test('includes trailing connection data', async () => { + const socketProm = ManagedSocket.connect(1, extHost, '/hello', 'world=true', '', half); + await extHost.expectEvent(evt => evt.data && evt.data.includes('GET ws://localhost'), 'websocket open event'); + half.onData.fire(VSBuffer.fromString('Opened successfully ;)\r\n\r\nSome trailing data')); + const socket = await socketProm; + + const data: string[] = []; + socket.onData(d => data.push(d.toString())); + await timeout(1); // allow microtasks to flush + assert.deepStrictEqual(data, ['Some trailing data']); + }); + + test('round trips data', async () => { + const socket = await doConnect(); + const data: string[] = []; + socket.onData(d => data.push(d.toString())); + + socket.write(VSBuffer.fromString('ping')); + await extHost.expectEvent(evt => evt.data === 'ping', 'expected ping'); + half.onData.fire(VSBuffer.fromString("pong")); + assert.deepStrictEqual(data, ['pong']); + }); + }); +}); diff --git a/src/vs/workbench/services/environment/electron-sandbox/environmentService.ts b/src/vs/workbench/services/environment/electron-sandbox/environmentService.ts index fec734ac73b..810a38b3866 100644 --- a/src/vs/workbench/services/environment/electron-sandbox/environmentService.ts +++ b/src/vs/workbench/services/environment/electron-sandbox/environmentService.ts @@ -63,7 +63,7 @@ export class NativeWorkbenchEnvironmentService extends AbstractNativeEnvironment get remoteAuthority() { return this.configuration.remoteAuthority; } @memoize - get expectsResolverExtension() { return !!this.configuration.remoteAuthority; } + get expectsResolverExtension() { return !!this.configuration.remoteAuthority?.includes('+'); } @memoize get execPath() { return this.configuration.execPath; } diff --git a/src/vs/workbench/services/remote/common/remoteExtensionsScanner.ts b/src/vs/workbench/services/remote/common/remoteExtensionsScanner.ts index a466cc1f3a1..418026d6a4f 100644 --- a/src/vs/workbench/services/remote/common/remoteExtensionsScanner.ts +++ b/src/vs/workbench/services/remote/common/remoteExtensionsScanner.ts @@ -7,7 +7,7 @@ import { IRemoteAgentService } from 'vs/workbench/services/remote/common/remoteA import { IRemoteExtensionsScannerService, RemoteExtensionsScannerChannelName } from 'vs/platform/remote/common/remoteExtensionsScanner'; import * as platform from 'vs/base/common/platform'; import { IChannel } from 'vs/base/parts/ipc/common/ipc'; -import { IExtensionDescription, IRelaxedExtensionDescription } from 'vs/platform/extensions/common/extensions'; +import { ExtensionIdentifier, IExtensionDescription, IRelaxedExtensionDescription } from 'vs/platform/extensions/common/extensions'; import { URI } from 'vs/base/common/uri'; import { IUserDataProfileService } from 'vs/workbench/services/userDataProfile/common/userDataProfile'; import { IRemoteUserDataProfilesService } from 'vs/workbench/services/userDataProfile/common/remoteUserDataProfiles'; @@ -45,6 +45,7 @@ class RemoteExtensionsScannerService implements IRemoteExtensionsScannerService const scannedExtensions = await channel.call('scanExtensions', [platform.language, profileLocation, this.environmentService.extensionDevelopmentLocationURI, languagePack]); scannedExtensions.forEach((extension) => { extension.extensionLocation = URI.revive(extension.extensionLocation); + extension.identifier = new ExtensionIdentifier(extension.identifier.value); }); return scannedExtensions; }, diff --git a/src/vscode-dts/vscode.proposed.resolvers.d.ts b/src/vscode-dts/vscode.proposed.resolvers.d.ts index 027e85a99da..d7529dd871f 100644 --- a/src/vscode-dts/vscode.proposed.resolvers.d.ts +++ b/src/vscode-dts/vscode.proposed.resolvers.d.ts @@ -31,9 +31,9 @@ declare module 'vscode' { onDidClose: Event; onDidEnd: Event; - dataHandler: (data: Uint8Array) => void; - endHandler: () => void; - drainHandler?: () => void; + send: (data: Uint8Array) => void; + end: () => void; + drain?: () => Thenable; } export class ManagedResolvedAuthority {