diff --git a/src/vs/workbench/api/browser/mainThreadTerminalService.ts b/src/vs/workbench/api/browser/mainThreadTerminalService.ts index cf16787d23b..f9dda5e5474 100644 --- a/src/vs/workbench/api/browser/mainThreadTerminalService.ts +++ b/src/vs/workbench/api/browser/mainThreadTerminalService.ts @@ -4,7 +4,7 @@ *--------------------------------------------------------------------------------------------*/ import { DisposableStore, Disposable, IDisposable } from 'vs/base/common/lifecycle'; -import { IShellLaunchConfig, ITerminalProcessExtHostProxy, ISpawnExtHostProcessRequest, ITerminalDimensions, IAvailableShellsRequest, IDefaultShellAndArgsRequest, IStartExtensionTerminalRequest } from 'vs/workbench/contrib/terminal/common/terminal'; +import { IShellLaunchConfig, ITerminalProcessExtHostProxy, ISpawnExtHostProcessRequest, ITerminalDimensions, IAvailableShellsRequest, IDefaultShellAndArgsRequest, IStartExtensionTerminalRequest, IProcessDataWithAckEvent } from 'vs/workbench/contrib/terminal/common/terminal'; import { ExtHostContext, ExtHostTerminalServiceShape, MainThreadTerminalServiceShape, MainContext, IExtHostContext, IShellLaunchConfigDto, TerminalLaunchConfig, ITerminalDimensionsDto, TerminalIdentifier } from 'vs/workbench/api/common/extHost.protocol'; import { extHostNamedCustomer } from 'vs/workbench/api/common/extHostCustomers'; import { URI } from 'vs/base/common/uri'; @@ -168,7 +168,8 @@ export class MainThreadTerminalService implements MainThreadTerminalServiceShape public $startSendingDataEvents(): void { if (!this._dataEventTracker) { this._dataEventTracker = this._instantiationService.createInstance(TerminalDataEventTracker, (id, data) => { - this._onTerminalData(id, data); + const d = typeof data === 'string' ? data : data.data; + this._onTerminalData(id, d); }); // Send initial events if they exist this._terminalService.terminalInstances.forEach(t => { @@ -272,6 +273,7 @@ export class MainThreadTerminalService implements MainThreadTerminalServiceShape request.isWorkspaceShellAllowed ).then(request.callback, request.callback); + proxy.onAcknowledgeDataEvent(ackId => this._proxy.$acceptProcessAckDataEvent(proxy.terminalId, ackId)); proxy.onInput(data => this._proxy.$acceptProcessInput(proxy.terminalId, data)); proxy.onResize(dimensions => this._proxy.$acceptProcessResize(proxy.terminalId, dimensions.cols, dimensions.rows)); proxy.onShutdown(immediate => this._proxy.$acceptProcessShutdown(proxy.terminalId, immediate)); @@ -310,7 +312,7 @@ export class MainThreadTerminalService implements MainThreadTerminalServiceShape } } - public $sendProcessData(terminalId: number, data: string): void { + public $sendProcessData(terminalId: number, data: string | IProcessDataWithAckEvent): void { const terminalProcess = this._terminalProcessProxies.get(terminalId); if (terminalProcess) { terminalProcess.emitData(data); @@ -423,7 +425,7 @@ class TerminalDataEventTracker extends Disposable { private readonly _bufferer: TerminalDataBufferer; constructor( - private readonly _callback: (id: number, data: string) => void, + private readonly _callback: (id: number, data: string | IProcessDataWithAckEvent) => void, @ITerminalService private readonly _terminalService: ITerminalService ) { super(); diff --git a/src/vs/workbench/api/common/extHost.protocol.ts b/src/vs/workbench/api/common/extHost.protocol.ts index 1da90521b7b..819a27528fb 100644 --- a/src/vs/workbench/api/common/extHost.protocol.ts +++ b/src/vs/workbench/api/common/extHost.protocol.ts @@ -41,7 +41,7 @@ import * as tasks from 'vs/workbench/api/common/shared/tasks'; import { IRevealOptions, ITreeItem } from 'vs/workbench/common/views'; import { IAdapterDescriptor, IConfig, IDebugSessionReplMode } from 'vs/workbench/contrib/debug/common/debug'; import { ITextQueryBuilderOptions } from 'vs/workbench/contrib/search/common/queryBuilder'; -import { ITerminalDimensions, IShellLaunchConfig, ITerminalLaunchError } from 'vs/workbench/contrib/terminal/common/terminal'; +import { ITerminalDimensions, IShellLaunchConfig, ITerminalLaunchError, IProcessDataWithAckEvent } from 'vs/workbench/contrib/terminal/common/terminal'; import { ActivationKind, ExtensionActivationError, ExtensionHostKind } from 'vs/workbench/services/extensions/common/extensions'; import { createExtHostContextProxyIdentifier as createExtId, createMainContextProxyIdentifier as createMainId, IRPCProtocol } from 'vs/workbench/services/extensions/common/proxyIdentifier'; import * as search from 'vs/workbench/services/search/common/search'; @@ -480,7 +480,7 @@ export interface MainThreadTerminalServiceShape extends IDisposable { // Process $sendProcessTitle(terminalId: number, title: string): void; - $sendProcessData(terminalId: number, data: string): void; + $sendProcessData(terminalId: number, data: string | IProcessDataWithAckEvent): void; $sendProcessReady(terminalId: number, pid: number, cwd: string): void; $sendProcessExit(terminalId: number, exitCode: number | undefined): void; $sendProcessInitialCwd(terminalId: number, cwd: string): void; @@ -1545,6 +1545,7 @@ export interface ExtHostTerminalServiceShape { $acceptTerminalMaximumDimensions(id: number, cols: number, rows: number): void; $spawnExtHostProcess(id: number, shellLaunchConfig: IShellLaunchConfigDto, activeWorkspaceRootUri: UriComponents | undefined, cols: number, rows: number, isWorkspaceShellAllowed: boolean): Promise; $startExtensionTerminal(id: number, initialDimensions: ITerminalDimensionsDto | undefined): Promise; + $acceptProcessAckDataEvent(id: number, ackId: number): void; $acceptProcessInput(id: number, data: string): void; $acceptProcessResize(id: number, cols: number, rows: number): void; $acceptProcessShutdown(id: number, immediate: boolean): void; diff --git a/src/vs/workbench/api/common/extHostTerminalService.ts b/src/vs/workbench/api/common/extHostTerminalService.ts index 00e09861cfb..8d96e657b59 100644 --- a/src/vs/workbench/api/common/extHostTerminalService.ts +++ b/src/vs/workbench/api/common/extHostTerminalService.ts @@ -220,6 +220,13 @@ export class ExtHostPseudoterminal implements ITerminalChildProcess { } } + acknowledgeDataEvent(ackId: number): void { + // TODO: Determine whether ExtHostPseudoterminal terminals should support flow control, this + // would need resume/pause APIs + + // No-op + } + getInitialCwd(): Promise { return Promise.resolve(''); } @@ -488,6 +495,10 @@ export abstract class BaseExtHostTerminalService extends Disposable implements I return disposables; } + public $acceptProcessAckDataEvent(id: number, ackId: number): void { + this._terminalProcesses.get(id)?.acknowledgeDataEvent(ackId); + } + public $acceptProcessInput(id: number, data: string): void { this._terminalProcesses.get(id)?.input(data); } diff --git a/src/vs/workbench/contrib/terminal/browser/remoteTerminalService.ts b/src/vs/workbench/contrib/terminal/browser/remoteTerminalService.ts index b2b5ec36d29..c7be47e5214 100644 --- a/src/vs/workbench/contrib/terminal/browser/remoteTerminalService.ts +++ b/src/vs/workbench/contrib/terminal/browser/remoteTerminalService.ts @@ -252,6 +252,10 @@ export class RemoteTerminalProcess extends Disposable implements ITerminalChildP }); } + public acknowledgeDataEvent(ackId: number): void { + // TODO: Support flow control for server spawned processes + } + public async getInitialCwd(): Promise { await this._startBarrier.wait(); return this._remoteTerminalChannel.getTerminalInitialCwd(this._remoteTerminalId); diff --git a/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts b/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts index 8ef61e3bbe0..ffca11edf46 100644 --- a/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts +++ b/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts @@ -1019,7 +1019,12 @@ export class TerminalInstance extends Disposable implements ITerminalInstance { this._xtermCore?.writeSync(ev.data); } else { const messageId = ++this._latestXtermWriteData; - this._xterm?.write(ev.data, () => this._latestXtermParseData = messageId); + this._xterm?.write(ev.data, () => { + this._latestXtermParseData = messageId; + if (ev.dataAckId !== undefined) { + this._processManager.acknowledgeDataEvent(ev.dataAckId); + } + }); } } diff --git a/src/vs/workbench/contrib/terminal/browser/terminalProcessExtHostProxy.ts b/src/vs/workbench/contrib/terminal/browser/terminalProcessExtHostProxy.ts index c7069f89ff0..9a713b7f16a 100644 --- a/src/vs/workbench/contrib/terminal/browser/terminalProcessExtHostProxy.ts +++ b/src/vs/workbench/contrib/terminal/browser/terminalProcessExtHostProxy.ts @@ -4,7 +4,7 @@ *--------------------------------------------------------------------------------------------*/ import { Event, Emitter } from 'vs/base/common/event'; -import { ITerminalProcessExtHostProxy, IShellLaunchConfig, ITerminalChildProcess, ITerminalConfigHelper, ITerminalDimensions, ITerminalLaunchError, ITerminalDimensionsOverride } from 'vs/workbench/contrib/terminal/common/terminal'; +import { ITerminalProcessExtHostProxy, IShellLaunchConfig, ITerminalChildProcess, ITerminalConfigHelper, ITerminalDimensions, ITerminalLaunchError, ITerminalDimensionsOverride, IProcessDataWithAckEvent } from 'vs/workbench/contrib/terminal/common/terminal'; import { Disposable } from 'vs/base/common/lifecycle'; import { URI } from 'vs/base/common/uri'; import { IRemoteAgentService } from 'vs/workbench/services/remote/common/remoteAgentService'; @@ -15,8 +15,8 @@ let hasReceivedResponseFromRemoteExtHost: boolean = false; export class TerminalProcessExtHostProxy extends Disposable implements ITerminalChildProcess, ITerminalProcessExtHostProxy { - private readonly _onProcessData = this._register(new Emitter()); - public readonly onProcessData: Event = this._onProcessData.event; + private readonly _onProcessData = this._register(new Emitter()); + public readonly onProcessData: Event = this._onProcessData.event; private readonly _onProcessExit = this._register(new Emitter()); public readonly onProcessExit: Event = this._onProcessExit.event; private readonly _onProcessReady = this._register(new Emitter<{ pid: number, cwd: string }>()); @@ -34,6 +34,8 @@ export class TerminalProcessExtHostProxy extends Disposable implements ITerminal public readonly onInput: Event = this._onInput.event; private readonly _onResize: Emitter<{ cols: number, rows: number }> = this._register(new Emitter<{ cols: number, rows: number }>()); public readonly onResize: Event<{ cols: number, rows: number }> = this._onResize.event; + private readonly _onAcknowledgeDataEvent = this._register(new Emitter()); + public readonly onAcknowledgeDataEvent: Event = this._onAcknowledgeDataEvent.event; private readonly _onShutdown = this._register(new Emitter()); public readonly onShutdown: Event = this._onShutdown.event; private readonly _onRequestInitialCwd = this._register(new Emitter()); @@ -60,7 +62,7 @@ export class TerminalProcessExtHostProxy extends Disposable implements ITerminal super(); } - public emitData(data: string): void { + public emitData(data: string | IProcessDataWithAckEvent): void { this._onProcessData.fire(data); } @@ -139,6 +141,10 @@ export class TerminalProcessExtHostProxy extends Disposable implements ITerminal this._onResize.fire({ cols, rows }); } + public acknowledgeDataEvent(ackId: number): void { + this._onAcknowledgeDataEvent.fire(ackId); + } + public getInitialCwd(): Promise { return new Promise(resolve => { this._onRequestInitialCwd.fire(); diff --git a/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts b/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts index 043a6e04af7..d4548a8ad2d 100644 --- a/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts +++ b/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts @@ -175,11 +175,12 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce this._process.onProcessData(ev => { const data = (typeof ev === 'string' ? ev : ev.data); - const sync = (typeof ev === 'string' ? false : ev.sync); + const sync = (typeof ev === 'string' || 'ackId' in ev ? false : ev.sync); + const dataAckId = (typeof ev !== 'string' && 'ackId' in ev ? ev.ackId : undefined); const beforeProcessDataEvent: IBeforeProcessDataEvent = { data }; this._onBeforeProcessData.fire(beforeProcessDataEvent); if (beforeProcessDataEvent.data && beforeProcessDataEvent.data.length > 0) { - this._onProcessData.fire({ data: beforeProcessDataEvent.data, sync }); + this._onProcessData.fire({ data: beforeProcessDataEvent.data, sync, dataAckId }); } }); @@ -331,6 +332,10 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce return Promise.resolve(this._latency); } + public acknowledgeDataEvent(ackId: number): void { + this._process?.acknowledgeDataEvent(ackId); + } + private _onExit(exitCode: number | undefined): void { this._process = null; diff --git a/src/vs/workbench/contrib/terminal/common/terminal.ts b/src/vs/workbench/contrib/terminal/common/terminal.ts index a3b3f9eff25..c45ce933b8b 100644 --- a/src/vs/workbench/contrib/terminal/common/terminal.ts +++ b/src/vs/workbench/contrib/terminal/common/terminal.ts @@ -354,6 +354,12 @@ export interface IBeforeProcessDataEvent { export interface IProcessDataEvent { data: string; sync: boolean; + dataAckId?: number; +} + +export interface IProcessDataWithAckEvent { + data: string; + ackId: number; } export interface ITerminalProcessManager extends IDisposable { @@ -379,6 +385,7 @@ export interface ITerminalProcessManager extends IDisposable { createProcess(shellLaunchConfig: IShellLaunchConfig, cols: number, rows: number, isScreenReaderModeEnabled: boolean): Promise; write(data: string): void; setDimensions(cols: number, rows: number): void; + acknowledgeDataEvent(ackId: number): void; getInitialCwd(): Promise; getCwd(): Promise; @@ -407,7 +414,7 @@ export const enum ProcessState { export interface ITerminalProcessExtHostProxy extends IDisposable { readonly terminalId: number; - emitData(data: string): void; + emitData(data: string | IProcessDataWithAckEvent): void; emitTitle(title: string): void; emitReady(pid: number, cwd: string): void; emitExit(exitCode: number | undefined): void; @@ -419,6 +426,7 @@ export interface ITerminalProcessExtHostProxy extends IDisposable { onInput: Event; onResize: Event<{ cols: number, rows: number }>; + onAcknowledgeDataEvent: Event; onShutdown: Event; onRequestInitialCwd: Event; onRequestCwd: Event; @@ -482,7 +490,7 @@ export interface ITerminalLaunchError { * child_process.ChildProcess node.js interface. */ export interface ITerminalChildProcess { - onProcessData: Event; + onProcessData: Event; onProcessExit: Event; onProcessReady: Event<{ pid: number, cwd: string }>; onProcessTitleChanged: Event; @@ -507,6 +515,13 @@ export interface ITerminalChildProcess { input(data: string): void; resize(cols: number, rows: number): void; + /** + * Acknowledge a data event has been parsed by the terminal, this is used to implement flow + * control to ensure remote processes to not get too far ahead of the client and flood the + * connection. + */ + acknowledgeDataEvent(ackId: number): void; + getInitialCwd(): Promise; getCwd(): Promise; getLatency(): Promise; diff --git a/src/vs/workbench/contrib/terminal/common/terminalDataBuffering.ts b/src/vs/workbench/contrib/terminal/common/terminalDataBuffering.ts index db8c7f57d9b..92a3edfa52b 100644 --- a/src/vs/workbench/contrib/terminal/common/terminalDataBuffering.ts +++ b/src/vs/workbench/contrib/terminal/common/terminalDataBuffering.ts @@ -5,17 +5,17 @@ import { Event } from 'vs/base/common/event'; import { IDisposable } from 'vs/base/common/lifecycle'; -import { IProcessDataEvent } from 'vs/workbench/contrib/terminal/common/terminal'; +import { IProcessDataEvent, IProcessDataWithAckEvent } from 'vs/workbench/contrib/terminal/common/terminal'; interface TerminalDataBuffer extends IDisposable { - data: string[]; + data: (string | IProcessDataWithAckEvent)[]; timeoutId: any; } export class TerminalDataBufferer implements IDisposable { private readonly _terminalBufferMap = new Map(); - constructor(private readonly _callback: (id: number, data: string) => void) { + constructor(private readonly _callback: (id: number, data: string | IProcessDataWithAckEvent) => void) { } dispose() { @@ -24,10 +24,10 @@ export class TerminalDataBufferer implements IDisposable { } } - startBuffering(id: number, event: Event, throttleBy: number = 5): IDisposable { + startBuffering(id: number, event: Event, throttleBy: number = 5): IDisposable { let disposable: IDisposable; - disposable = event((e: string | IProcessDataEvent) => { - const data = (typeof e === 'string' ? e : e.data); + disposable = event((e: string | IProcessDataEvent | IProcessDataWithAckEvent) => { + const data = (typeof e === 'string' || 'ackId' in e ? e : e.data); let buffer = this._terminalBufferMap.get(id); if (buffer) { buffer.data.push(data); @@ -61,7 +61,11 @@ export class TerminalDataBufferer implements IDisposable { const buffer = this._terminalBufferMap.get(id); if (buffer) { this._terminalBufferMap.delete(id); - this._callback(id, buffer.data.join('')); + // TODO: This should batch string events together still + // this._callback(id, buffer.data.join('')); + for (let b of buffer.data) { + this._callback(id, b); + } } } } diff --git a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts index e0abbb31873..a88b2bf6198 100644 --- a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts +++ b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts @@ -11,7 +11,7 @@ import * as os from 'os'; import { Event, Emitter } from 'vs/base/common/event'; import { getWindowsBuildNumber } from 'vs/workbench/contrib/terminal/node/terminal'; import { Disposable } from 'vs/base/common/lifecycle'; -import { IShellLaunchConfig, ITerminalChildProcess, ITerminalLaunchError } from 'vs/workbench/contrib/terminal/common/terminal'; +import { IProcessDataWithAckEvent, IShellLaunchConfig, ITerminalChildProcess, ITerminalDimensionsOverride, ITerminalLaunchError } from 'vs/workbench/contrib/terminal/common/terminal'; import { exec } from 'child_process'; import { ILogService } from 'vs/platform/log/common/log'; import { stat } from 'vs/base/node/pfs'; @@ -38,13 +38,15 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess private _writeQueue: string[] = []; private _writeTimeout: NodeJS.Timeout | undefined; private _delayedResizer: DelayedResizer | undefined; + private _currentAckRequestId: number = 0; + private _ackedRequestId: number = 0; private readonly _initialCwd: string; private readonly _ptyOptions: pty.IPtyForkOptions | pty.IWindowsPtyForkOptions; public get exitMessage(): string | undefined { return this._exitMessage; } - private readonly _onProcessData = this._register(new Emitter()); - public get onProcessData(): Event { return this._onProcessData.event; } + private readonly _onProcessData = this._register(new Emitter()); + public get onProcessData(): Event { return this._onProcessData.event; } private readonly _onProcessExit = this._register(new Emitter()); public get onProcessExit(): Event { return this._onProcessExit.event; } private readonly _onProcessReady = this._register(new Emitter<{ pid: number, cwd: string }>()); @@ -98,6 +100,8 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess })); } } + onProcessOverrideDimensions?: Event | undefined; + onProcessResolvedShellLaunchConfig?: Event | undefined; public async start(): Promise { const results = await Promise.all([this._validateCwd(), this._validateExecutable()]); @@ -162,7 +166,15 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess this.onProcessReady(() => c()); }); ptyProcess.onData(data => { - this._onProcessData.fire(data); + // TODO: Periodically request ACK between low and high watermark + this._onProcessData.fire({ + data, + ackId: ++this._currentAckRequestId + }); + if (this._currentAckRequestId > this._ackedRequestId) { + // TODO: Expose as public API in node-pty + (ptyProcess as any).pause(); + } if (this._closeTimeout) { clearTimeout(this._closeTimeout); this._queueProcessExit(); @@ -324,6 +336,14 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess } } + public acknowledgeDataEvent(ackId: number): void { + this._ackedRequestId = ackId; + if (this._currentAckRequestId === this._ackedRequestId) { + // TODO: Expose as public API in node-pty + (this._ptyProcess as any).resume(); + } + } + public getInitialCwd(): Promise { return Promise.resolve(this._initialCwd); } diff --git a/src/vs/workbench/contrib/terminal/test/common/terminalDataBuffering.test.ts b/src/vs/workbench/contrib/terminal/test/common/terminalDataBuffering.test.ts index 6a2e3b93b21..9da073d3507 100644 --- a/src/vs/workbench/contrib/terminal/test/common/terminalDataBuffering.test.ts +++ b/src/vs/workbench/contrib/terminal/test/common/terminalDataBuffering.test.ts @@ -5,6 +5,7 @@ import * as assert from 'assert'; import { Emitter } from 'vs/base/common/event'; +import { IProcessDataWithAckEvent } from 'vs/workbench/contrib/terminal/common/terminal'; import { TerminalDataBufferer } from 'vs/workbench/contrib/terminal/common/terminalDataBuffering'; const wait = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); @@ -12,7 +13,8 @@ const wait = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); suite('Workbench - TerminalDataBufferer', () => { let bufferer: TerminalDataBufferer; let counter: { [id: number]: number }; - let data: { [id: number]: string }; + // TODO: Fix these tests + let data: { [id: number]: string | IProcessDataWithAckEvent }; setup(async () => { counter = {};