From 6430ee1efce26e2386fef52fd399cd2d2be90832 Mon Sep 17 00:00:00 2001 From: Daniel Imms Date: Tue, 12 Jan 2021 09:39:45 -0800 Subject: [PATCH 1/9] Basic flow control for ext host processes Part of #113827 --- .../api/browser/mainThreadTerminalService.ts | 10 ++++--- .../workbench/api/common/extHost.protocol.ts | 5 ++-- .../api/common/extHostTerminalService.ts | 11 ++++++++ .../terminal/browser/remoteTerminalService.ts | 4 +++ .../terminal/browser/terminalInstance.ts | 7 ++++- .../browser/terminalProcessExtHostProxy.ts | 14 +++++++--- .../browser/terminalProcessManager.ts | 9 ++++-- .../contrib/terminal/common/terminal.ts | 19 +++++++++++-- .../terminal/common/terminalDataBuffering.ts | 18 +++++++----- .../contrib/terminal/node/terminalProcess.ts | 28 ++++++++++++++++--- .../test/common/terminalDataBuffering.test.ts | 4 ++- 11 files changed, 102 insertions(+), 27 deletions(-) diff --git a/src/vs/workbench/api/browser/mainThreadTerminalService.ts b/src/vs/workbench/api/browser/mainThreadTerminalService.ts index cf16787d23b..f9dda5e5474 100644 --- a/src/vs/workbench/api/browser/mainThreadTerminalService.ts +++ b/src/vs/workbench/api/browser/mainThreadTerminalService.ts @@ -4,7 +4,7 @@ *--------------------------------------------------------------------------------------------*/ import { DisposableStore, Disposable, IDisposable } from 'vs/base/common/lifecycle'; -import { IShellLaunchConfig, ITerminalProcessExtHostProxy, ISpawnExtHostProcessRequest, ITerminalDimensions, IAvailableShellsRequest, IDefaultShellAndArgsRequest, IStartExtensionTerminalRequest } from 'vs/workbench/contrib/terminal/common/terminal'; +import { IShellLaunchConfig, ITerminalProcessExtHostProxy, ISpawnExtHostProcessRequest, ITerminalDimensions, IAvailableShellsRequest, IDefaultShellAndArgsRequest, IStartExtensionTerminalRequest, IProcessDataWithAckEvent } from 'vs/workbench/contrib/terminal/common/terminal'; import { ExtHostContext, ExtHostTerminalServiceShape, MainThreadTerminalServiceShape, MainContext, IExtHostContext, IShellLaunchConfigDto, TerminalLaunchConfig, ITerminalDimensionsDto, TerminalIdentifier } from 'vs/workbench/api/common/extHost.protocol'; import { extHostNamedCustomer } from 'vs/workbench/api/common/extHostCustomers'; import { URI } from 'vs/base/common/uri'; @@ -168,7 +168,8 @@ export class MainThreadTerminalService implements MainThreadTerminalServiceShape public $startSendingDataEvents(): void { if (!this._dataEventTracker) { this._dataEventTracker = this._instantiationService.createInstance(TerminalDataEventTracker, (id, data) => { - this._onTerminalData(id, data); + const d = typeof data === 'string' ? data : data.data; + this._onTerminalData(id, d); }); // Send initial events if they exist this._terminalService.terminalInstances.forEach(t => { @@ -272,6 +273,7 @@ export class MainThreadTerminalService implements MainThreadTerminalServiceShape request.isWorkspaceShellAllowed ).then(request.callback, request.callback); + proxy.onAcknowledgeDataEvent(ackId => this._proxy.$acceptProcessAckDataEvent(proxy.terminalId, ackId)); proxy.onInput(data => this._proxy.$acceptProcessInput(proxy.terminalId, data)); proxy.onResize(dimensions => this._proxy.$acceptProcessResize(proxy.terminalId, dimensions.cols, dimensions.rows)); proxy.onShutdown(immediate => this._proxy.$acceptProcessShutdown(proxy.terminalId, immediate)); @@ -310,7 +312,7 @@ export class MainThreadTerminalService implements MainThreadTerminalServiceShape } } - public $sendProcessData(terminalId: number, data: string): void { + public $sendProcessData(terminalId: number, data: string | IProcessDataWithAckEvent): void { const terminalProcess = this._terminalProcessProxies.get(terminalId); if (terminalProcess) { terminalProcess.emitData(data); @@ -423,7 +425,7 @@ class TerminalDataEventTracker extends Disposable { private readonly _bufferer: TerminalDataBufferer; constructor( - private readonly _callback: (id: number, data: string) => void, + private readonly _callback: (id: number, data: string | IProcessDataWithAckEvent) => void, @ITerminalService private readonly _terminalService: ITerminalService ) { super(); diff --git a/src/vs/workbench/api/common/extHost.protocol.ts b/src/vs/workbench/api/common/extHost.protocol.ts index 1da90521b7b..819a27528fb 100644 --- a/src/vs/workbench/api/common/extHost.protocol.ts +++ b/src/vs/workbench/api/common/extHost.protocol.ts @@ -41,7 +41,7 @@ import * as tasks from 'vs/workbench/api/common/shared/tasks'; import { IRevealOptions, ITreeItem } from 'vs/workbench/common/views'; import { IAdapterDescriptor, IConfig, IDebugSessionReplMode } from 'vs/workbench/contrib/debug/common/debug'; import { ITextQueryBuilderOptions } from 'vs/workbench/contrib/search/common/queryBuilder'; -import { ITerminalDimensions, IShellLaunchConfig, ITerminalLaunchError } from 'vs/workbench/contrib/terminal/common/terminal'; +import { ITerminalDimensions, IShellLaunchConfig, ITerminalLaunchError, IProcessDataWithAckEvent } from 'vs/workbench/contrib/terminal/common/terminal'; import { ActivationKind, ExtensionActivationError, ExtensionHostKind } from 'vs/workbench/services/extensions/common/extensions'; import { createExtHostContextProxyIdentifier as createExtId, createMainContextProxyIdentifier as createMainId, IRPCProtocol } from 'vs/workbench/services/extensions/common/proxyIdentifier'; import * as search from 'vs/workbench/services/search/common/search'; @@ -480,7 +480,7 @@ export interface MainThreadTerminalServiceShape extends IDisposable { // Process $sendProcessTitle(terminalId: number, title: string): void; - $sendProcessData(terminalId: number, data: string): void; + $sendProcessData(terminalId: number, data: string | IProcessDataWithAckEvent): void; $sendProcessReady(terminalId: number, pid: number, cwd: string): void; $sendProcessExit(terminalId: number, exitCode: number | undefined): void; $sendProcessInitialCwd(terminalId: number, cwd: string): void; @@ -1545,6 +1545,7 @@ export interface ExtHostTerminalServiceShape { $acceptTerminalMaximumDimensions(id: number, cols: number, rows: number): void; $spawnExtHostProcess(id: number, shellLaunchConfig: IShellLaunchConfigDto, activeWorkspaceRootUri: UriComponents | undefined, cols: number, rows: number, isWorkspaceShellAllowed: boolean): Promise; $startExtensionTerminal(id: number, initialDimensions: ITerminalDimensionsDto | undefined): Promise; + $acceptProcessAckDataEvent(id: number, ackId: number): void; $acceptProcessInput(id: number, data: string): void; $acceptProcessResize(id: number, cols: number, rows: number): void; $acceptProcessShutdown(id: number, immediate: boolean): void; diff --git a/src/vs/workbench/api/common/extHostTerminalService.ts b/src/vs/workbench/api/common/extHostTerminalService.ts index 00e09861cfb..8d96e657b59 100644 --- a/src/vs/workbench/api/common/extHostTerminalService.ts +++ b/src/vs/workbench/api/common/extHostTerminalService.ts @@ -220,6 +220,13 @@ export class ExtHostPseudoterminal implements ITerminalChildProcess { } } + acknowledgeDataEvent(ackId: number): void { + // TODO: Determine whether ExtHostPseudoterminal terminals should support flow control, this + // would need resume/pause APIs + + // No-op + } + getInitialCwd(): Promise { return Promise.resolve(''); } @@ -488,6 +495,10 @@ export abstract class BaseExtHostTerminalService extends Disposable implements I return disposables; } + public $acceptProcessAckDataEvent(id: number, ackId: number): void { + this._terminalProcesses.get(id)?.acknowledgeDataEvent(ackId); + } + public $acceptProcessInput(id: number, data: string): void { this._terminalProcesses.get(id)?.input(data); } diff --git a/src/vs/workbench/contrib/terminal/browser/remoteTerminalService.ts b/src/vs/workbench/contrib/terminal/browser/remoteTerminalService.ts index b2b5ec36d29..c7be47e5214 100644 --- a/src/vs/workbench/contrib/terminal/browser/remoteTerminalService.ts +++ b/src/vs/workbench/contrib/terminal/browser/remoteTerminalService.ts @@ -252,6 +252,10 @@ export class RemoteTerminalProcess extends Disposable implements ITerminalChildP }); } + public acknowledgeDataEvent(ackId: number): void { + // TODO: Support flow control for server spawned processes + } + public async getInitialCwd(): Promise { await this._startBarrier.wait(); return this._remoteTerminalChannel.getTerminalInitialCwd(this._remoteTerminalId); diff --git a/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts b/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts index 8ef61e3bbe0..ffca11edf46 100644 --- a/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts +++ b/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts @@ -1019,7 +1019,12 @@ export class TerminalInstance extends Disposable implements ITerminalInstance { this._xtermCore?.writeSync(ev.data); } else { const messageId = ++this._latestXtermWriteData; - this._xterm?.write(ev.data, () => this._latestXtermParseData = messageId); + this._xterm?.write(ev.data, () => { + this._latestXtermParseData = messageId; + if (ev.dataAckId !== undefined) { + this._processManager.acknowledgeDataEvent(ev.dataAckId); + } + }); } } diff --git a/src/vs/workbench/contrib/terminal/browser/terminalProcessExtHostProxy.ts b/src/vs/workbench/contrib/terminal/browser/terminalProcessExtHostProxy.ts index c7069f89ff0..9a713b7f16a 100644 --- a/src/vs/workbench/contrib/terminal/browser/terminalProcessExtHostProxy.ts +++ b/src/vs/workbench/contrib/terminal/browser/terminalProcessExtHostProxy.ts @@ -4,7 +4,7 @@ *--------------------------------------------------------------------------------------------*/ import { Event, Emitter } from 'vs/base/common/event'; -import { ITerminalProcessExtHostProxy, IShellLaunchConfig, ITerminalChildProcess, ITerminalConfigHelper, ITerminalDimensions, ITerminalLaunchError, ITerminalDimensionsOverride } from 'vs/workbench/contrib/terminal/common/terminal'; +import { ITerminalProcessExtHostProxy, IShellLaunchConfig, ITerminalChildProcess, ITerminalConfigHelper, ITerminalDimensions, ITerminalLaunchError, ITerminalDimensionsOverride, IProcessDataWithAckEvent } from 'vs/workbench/contrib/terminal/common/terminal'; import { Disposable } from 'vs/base/common/lifecycle'; import { URI } from 'vs/base/common/uri'; import { IRemoteAgentService } from 'vs/workbench/services/remote/common/remoteAgentService'; @@ -15,8 +15,8 @@ let hasReceivedResponseFromRemoteExtHost: boolean = false; export class TerminalProcessExtHostProxy extends Disposable implements ITerminalChildProcess, ITerminalProcessExtHostProxy { - private readonly _onProcessData = this._register(new Emitter()); - public readonly onProcessData: Event = this._onProcessData.event; + private readonly _onProcessData = this._register(new Emitter()); + public readonly onProcessData: Event = this._onProcessData.event; private readonly _onProcessExit = this._register(new Emitter()); public readonly onProcessExit: Event = this._onProcessExit.event; private readonly _onProcessReady = this._register(new Emitter<{ pid: number, cwd: string }>()); @@ -34,6 +34,8 @@ export class TerminalProcessExtHostProxy extends Disposable implements ITerminal public readonly onInput: Event = this._onInput.event; private readonly _onResize: Emitter<{ cols: number, rows: number }> = this._register(new Emitter<{ cols: number, rows: number }>()); public readonly onResize: Event<{ cols: number, rows: number }> = this._onResize.event; + private readonly _onAcknowledgeDataEvent = this._register(new Emitter()); + public readonly onAcknowledgeDataEvent: Event = this._onAcknowledgeDataEvent.event; private readonly _onShutdown = this._register(new Emitter()); public readonly onShutdown: Event = this._onShutdown.event; private readonly _onRequestInitialCwd = this._register(new Emitter()); @@ -60,7 +62,7 @@ export class TerminalProcessExtHostProxy extends Disposable implements ITerminal super(); } - public emitData(data: string): void { + public emitData(data: string | IProcessDataWithAckEvent): void { this._onProcessData.fire(data); } @@ -139,6 +141,10 @@ export class TerminalProcessExtHostProxy extends Disposable implements ITerminal this._onResize.fire({ cols, rows }); } + public acknowledgeDataEvent(ackId: number): void { + this._onAcknowledgeDataEvent.fire(ackId); + } + public getInitialCwd(): Promise { return new Promise(resolve => { this._onRequestInitialCwd.fire(); diff --git a/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts b/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts index 043a6e04af7..d4548a8ad2d 100644 --- a/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts +++ b/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts @@ -175,11 +175,12 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce this._process.onProcessData(ev => { const data = (typeof ev === 'string' ? ev : ev.data); - const sync = (typeof ev === 'string' ? false : ev.sync); + const sync = (typeof ev === 'string' || 'ackId' in ev ? false : ev.sync); + const dataAckId = (typeof ev !== 'string' && 'ackId' in ev ? ev.ackId : undefined); const beforeProcessDataEvent: IBeforeProcessDataEvent = { data }; this._onBeforeProcessData.fire(beforeProcessDataEvent); if (beforeProcessDataEvent.data && beforeProcessDataEvent.data.length > 0) { - this._onProcessData.fire({ data: beforeProcessDataEvent.data, sync }); + this._onProcessData.fire({ data: beforeProcessDataEvent.data, sync, dataAckId }); } }); @@ -331,6 +332,10 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce return Promise.resolve(this._latency); } + public acknowledgeDataEvent(ackId: number): void { + this._process?.acknowledgeDataEvent(ackId); + } + private _onExit(exitCode: number | undefined): void { this._process = null; diff --git a/src/vs/workbench/contrib/terminal/common/terminal.ts b/src/vs/workbench/contrib/terminal/common/terminal.ts index a3b3f9eff25..c45ce933b8b 100644 --- a/src/vs/workbench/contrib/terminal/common/terminal.ts +++ b/src/vs/workbench/contrib/terminal/common/terminal.ts @@ -354,6 +354,12 @@ export interface IBeforeProcessDataEvent { export interface IProcessDataEvent { data: string; sync: boolean; + dataAckId?: number; +} + +export interface IProcessDataWithAckEvent { + data: string; + ackId: number; } export interface ITerminalProcessManager extends IDisposable { @@ -379,6 +385,7 @@ export interface ITerminalProcessManager extends IDisposable { createProcess(shellLaunchConfig: IShellLaunchConfig, cols: number, rows: number, isScreenReaderModeEnabled: boolean): Promise; write(data: string): void; setDimensions(cols: number, rows: number): void; + acknowledgeDataEvent(ackId: number): void; getInitialCwd(): Promise; getCwd(): Promise; @@ -407,7 +414,7 @@ export const enum ProcessState { export interface ITerminalProcessExtHostProxy extends IDisposable { readonly terminalId: number; - emitData(data: string): void; + emitData(data: string | IProcessDataWithAckEvent): void; emitTitle(title: string): void; emitReady(pid: number, cwd: string): void; emitExit(exitCode: number | undefined): void; @@ -419,6 +426,7 @@ export interface ITerminalProcessExtHostProxy extends IDisposable { onInput: Event; onResize: Event<{ cols: number, rows: number }>; + onAcknowledgeDataEvent: Event; onShutdown: Event; onRequestInitialCwd: Event; onRequestCwd: Event; @@ -482,7 +490,7 @@ export interface ITerminalLaunchError { * child_process.ChildProcess node.js interface. */ export interface ITerminalChildProcess { - onProcessData: Event; + onProcessData: Event; onProcessExit: Event; onProcessReady: Event<{ pid: number, cwd: string }>; onProcessTitleChanged: Event; @@ -507,6 +515,13 @@ export interface ITerminalChildProcess { input(data: string): void; resize(cols: number, rows: number): void; + /** + * Acknowledge a data event has been parsed by the terminal, this is used to implement flow + * control to ensure remote processes to not get too far ahead of the client and flood the + * connection. + */ + acknowledgeDataEvent(ackId: number): void; + getInitialCwd(): Promise; getCwd(): Promise; getLatency(): Promise; diff --git a/src/vs/workbench/contrib/terminal/common/terminalDataBuffering.ts b/src/vs/workbench/contrib/terminal/common/terminalDataBuffering.ts index db8c7f57d9b..92a3edfa52b 100644 --- a/src/vs/workbench/contrib/terminal/common/terminalDataBuffering.ts +++ b/src/vs/workbench/contrib/terminal/common/terminalDataBuffering.ts @@ -5,17 +5,17 @@ import { Event } from 'vs/base/common/event'; import { IDisposable } from 'vs/base/common/lifecycle'; -import { IProcessDataEvent } from 'vs/workbench/contrib/terminal/common/terminal'; +import { IProcessDataEvent, IProcessDataWithAckEvent } from 'vs/workbench/contrib/terminal/common/terminal'; interface TerminalDataBuffer extends IDisposable { - data: string[]; + data: (string | IProcessDataWithAckEvent)[]; timeoutId: any; } export class TerminalDataBufferer implements IDisposable { private readonly _terminalBufferMap = new Map(); - constructor(private readonly _callback: (id: number, data: string) => void) { + constructor(private readonly _callback: (id: number, data: string | IProcessDataWithAckEvent) => void) { } dispose() { @@ -24,10 +24,10 @@ export class TerminalDataBufferer implements IDisposable { } } - startBuffering(id: number, event: Event, throttleBy: number = 5): IDisposable { + startBuffering(id: number, event: Event, throttleBy: number = 5): IDisposable { let disposable: IDisposable; - disposable = event((e: string | IProcessDataEvent) => { - const data = (typeof e === 'string' ? e : e.data); + disposable = event((e: string | IProcessDataEvent | IProcessDataWithAckEvent) => { + const data = (typeof e === 'string' || 'ackId' in e ? e : e.data); let buffer = this._terminalBufferMap.get(id); if (buffer) { buffer.data.push(data); @@ -61,7 +61,11 @@ export class TerminalDataBufferer implements IDisposable { const buffer = this._terminalBufferMap.get(id); if (buffer) { this._terminalBufferMap.delete(id); - this._callback(id, buffer.data.join('')); + // TODO: This should batch string events together still + // this._callback(id, buffer.data.join('')); + for (let b of buffer.data) { + this._callback(id, b); + } } } } diff --git a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts index e0abbb31873..a88b2bf6198 100644 --- a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts +++ b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts @@ -11,7 +11,7 @@ import * as os from 'os'; import { Event, Emitter } from 'vs/base/common/event'; import { getWindowsBuildNumber } from 'vs/workbench/contrib/terminal/node/terminal'; import { Disposable } from 'vs/base/common/lifecycle'; -import { IShellLaunchConfig, ITerminalChildProcess, ITerminalLaunchError } from 'vs/workbench/contrib/terminal/common/terminal'; +import { IProcessDataWithAckEvent, IShellLaunchConfig, ITerminalChildProcess, ITerminalDimensionsOverride, ITerminalLaunchError } from 'vs/workbench/contrib/terminal/common/terminal'; import { exec } from 'child_process'; import { ILogService } from 'vs/platform/log/common/log'; import { stat } from 'vs/base/node/pfs'; @@ -38,13 +38,15 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess private _writeQueue: string[] = []; private _writeTimeout: NodeJS.Timeout | undefined; private _delayedResizer: DelayedResizer | undefined; + private _currentAckRequestId: number = 0; + private _ackedRequestId: number = 0; private readonly _initialCwd: string; private readonly _ptyOptions: pty.IPtyForkOptions | pty.IWindowsPtyForkOptions; public get exitMessage(): string | undefined { return this._exitMessage; } - private readonly _onProcessData = this._register(new Emitter()); - public get onProcessData(): Event { return this._onProcessData.event; } + private readonly _onProcessData = this._register(new Emitter()); + public get onProcessData(): Event { return this._onProcessData.event; } private readonly _onProcessExit = this._register(new Emitter()); public get onProcessExit(): Event { return this._onProcessExit.event; } private readonly _onProcessReady = this._register(new Emitter<{ pid: number, cwd: string }>()); @@ -98,6 +100,8 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess })); } } + onProcessOverrideDimensions?: Event | undefined; + onProcessResolvedShellLaunchConfig?: Event | undefined; public async start(): Promise { const results = await Promise.all([this._validateCwd(), this._validateExecutable()]); @@ -162,7 +166,15 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess this.onProcessReady(() => c()); }); ptyProcess.onData(data => { - this._onProcessData.fire(data); + // TODO: Periodically request ACK between low and high watermark + this._onProcessData.fire({ + data, + ackId: ++this._currentAckRequestId + }); + if (this._currentAckRequestId > this._ackedRequestId) { + // TODO: Expose as public API in node-pty + (ptyProcess as any).pause(); + } if (this._closeTimeout) { clearTimeout(this._closeTimeout); this._queueProcessExit(); @@ -324,6 +336,14 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess } } + public acknowledgeDataEvent(ackId: number): void { + this._ackedRequestId = ackId; + if (this._currentAckRequestId === this._ackedRequestId) { + // TODO: Expose as public API in node-pty + (this._ptyProcess as any).resume(); + } + } + public getInitialCwd(): Promise { return Promise.resolve(this._initialCwd); } diff --git a/src/vs/workbench/contrib/terminal/test/common/terminalDataBuffering.test.ts b/src/vs/workbench/contrib/terminal/test/common/terminalDataBuffering.test.ts index 6a2e3b93b21..9da073d3507 100644 --- a/src/vs/workbench/contrib/terminal/test/common/terminalDataBuffering.test.ts +++ b/src/vs/workbench/contrib/terminal/test/common/terminalDataBuffering.test.ts @@ -5,6 +5,7 @@ import * as assert from 'assert'; import { Emitter } from 'vs/base/common/event'; +import { IProcessDataWithAckEvent } from 'vs/workbench/contrib/terminal/common/terminal'; import { TerminalDataBufferer } from 'vs/workbench/contrib/terminal/common/terminalDataBuffering'; const wait = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); @@ -12,7 +13,8 @@ const wait = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); suite('Workbench - TerminalDataBufferer', () => { let bufferer: TerminalDataBufferer; let counter: { [id: number]: number }; - let data: { [id: number]: string }; + // TODO: Fix these tests + let data: { [id: number]: string | IProcessDataWithAckEvent }; setup(async () => { counter = {}; From 7e5c01208ddda07381bc507408b804065d81bf39 Mon Sep 17 00:00:00 2001 From: Daniel Imms Date: Tue, 12 Jan 2021 10:42:26 -0800 Subject: [PATCH 2/9] Start of low-high watermark flow control --- .../contrib/terminal/node/terminalProcess.ts | 37 +++++++++++++++---- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts index a88b2bf6198..ecc2d0a7e49 100644 --- a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts +++ b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts @@ -26,6 +26,24 @@ import { localize } from 'vs/nls'; const WRITE_MAX_CHUNK_SIZE = 50; const WRITE_INTERVAL_MS = 5; +const enum FlowControl { + /** + * The number of _unacknowledged_ bytes to have been sent before the pty is paused in order for + * the client to catch up. + */ + HighWatermarkBytes = 100000, + /** + * After flow control pauses the pty for the client the catch up, this is the number of + * _unacknowledged_ bytes to have been caught up to on the client before resuming the pty again. + * This is used to attempt to prevent pauses in the flowing data; ideally while the pty is + * paused the number of unacknowledged bytes would always be greater than 0 or the client will + * appear to stutter. In reality this balance is hard to accomplish though so heavy commands + * will likely pause as latency grows, not flooding the connection is the important thing as + * it's shared with other core functionality. + */ + LowWatermarkBytes = 10000 +} + export class TerminalProcess extends Disposable implements ITerminalChildProcess { private _exitCode: number | undefined; private _exitMessage: string | undefined; @@ -167,12 +185,17 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess }); ptyProcess.onData(data => { // TODO: Periodically request ACK between low and high watermark - this._onProcessData.fire({ - data, - ackId: ++this._currentAckRequestId - }); - if (this._currentAckRequestId > this._ackedRequestId) { + const fakeLatency = 1000; + this._currentAckRequestId += data.length; + const ackId = data.length; + setTimeout(() => { + this._onProcessData.fire({ data, ackId }); + }, fakeLatency); + // TODO: Use bytes, not messages + // console.log('check', this._currentAckRequestId - this._ackedRequestId, FlowControl.HighWatermarkBytes); + if (this._currentAckRequestId - this._ackedRequestId > FlowControl.HighWatermarkBytes) { // TODO: Expose as public API in node-pty + // console.log('pause', this._currentAckRequestId - this._ackedRequestId, '>', FlowControl.HighWatermarkBytes); (ptyProcess as any).pause(); } if (this._closeTimeout) { @@ -337,8 +360,8 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess } public acknowledgeDataEvent(ackId: number): void { - this._ackedRequestId = ackId; - if (this._currentAckRequestId === this._ackedRequestId) { + this._ackedRequestId += ackId; + if (this._currentAckRequestId - this._ackedRequestId < FlowControl.LowWatermarkBytes) { // TODO: Expose as public API in node-pty (this._ptyProcess as any).resume(); } From 7aee462b8a30802fb0f7083c0264dfa35da28311 Mon Sep 17 00:00:00 2001 From: Daniel Imms Date: Tue, 12 Jan 2021 11:23:10 -0800 Subject: [PATCH 3/9] Use char count instead of ack ids --- .../terminal/browser/terminalInstance.ts | 3 ++- .../contrib/terminal/node/terminalProcess.ts | 26 ++++++++++--------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts b/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts index 448e4d7cba3..0eac513627b 100644 --- a/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts +++ b/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts @@ -1023,7 +1023,8 @@ export class TerminalInstance extends Disposable implements ITerminalInstance { this._xterm?.write(ev.data, () => { this._latestXtermParseData = messageId; if (ev.dataAckId !== undefined) { - this._processManager.acknowledgeDataEvent(ev.dataAckId); + // TODO: Send back the length of data instead + this._processManager.acknowledgeDataEvent(ev.data.length); } }); } diff --git a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts index ecc2d0a7e49..5d52c2d506b 100644 --- a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts +++ b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts @@ -28,20 +28,20 @@ const WRITE_INTERVAL_MS = 5; const enum FlowControl { /** - * The number of _unacknowledged_ bytes to have been sent before the pty is paused in order for + * The number of _unacknowledged_ chars to have been sent before the pty is paused in order for * the client to catch up. */ - HighWatermarkBytes = 100000, + HighWatermarkChars = 100000, /** * After flow control pauses the pty for the client the catch up, this is the number of - * _unacknowledged_ bytes to have been caught up to on the client before resuming the pty again. + * _unacknowledged_ chars to have been caught up to on the client before resuming the pty again. * This is used to attempt to prevent pauses in the flowing data; ideally while the pty is - * paused the number of unacknowledged bytes would always be greater than 0 or the client will + * paused the number of unacknowledged chars would always be greater than 0 or the client will * appear to stutter. In reality this balance is hard to accomplish though so heavy commands * will likely pause as latency grows, not flooding the connection is the important thing as * it's shared with other core functionality. */ - LowWatermarkBytes = 10000 + LowWatermarkChars = 5000 } export class TerminalProcess extends Disposable implements ITerminalChildProcess { @@ -56,8 +56,8 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess private _writeQueue: string[] = []; private _writeTimeout: NodeJS.Timeout | undefined; private _delayedResizer: DelayedResizer | undefined; - private _currentAckRequestId: number = 0; - private _ackedRequestId: number = 0; + private _totalDataCharCount: number = 0; + private _acknowledgedDataCharCount: number = 0; private readonly _initialCwd: string; private readonly _ptyOptions: pty.IPtyForkOptions | pty.IWindowsPtyForkOptions; @@ -186,16 +186,18 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess ptyProcess.onData(data => { // TODO: Periodically request ACK between low and high watermark const fakeLatency = 1000; - this._currentAckRequestId += data.length; + // TODO: if we're just sending the data length over, ackId doesn't need to exist at all + // TODO: We don't need to ack everything, just count on the other side and ack every 1000/10000 bytes + this._totalDataCharCount += data.length; const ackId = data.length; setTimeout(() => { this._onProcessData.fire({ data, ackId }); }, fakeLatency); // TODO: Use bytes, not messages // console.log('check', this._currentAckRequestId - this._ackedRequestId, FlowControl.HighWatermarkBytes); - if (this._currentAckRequestId - this._ackedRequestId > FlowControl.HighWatermarkBytes) { + if (this._totalDataCharCount - this._acknowledgedDataCharCount > FlowControl.HighWatermarkChars) { // TODO: Expose as public API in node-pty - // console.log('pause', this._currentAckRequestId - this._ackedRequestId, '>', FlowControl.HighWatermarkBytes); + console.log('pause', this._totalDataCharCount - this._acknowledgedDataCharCount, '>', FlowControl.HighWatermarkChars); (ptyProcess as any).pause(); } if (this._closeTimeout) { @@ -360,8 +362,8 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess } public acknowledgeDataEvent(ackId: number): void { - this._ackedRequestId += ackId; - if (this._currentAckRequestId - this._ackedRequestId < FlowControl.LowWatermarkBytes) { + this._acknowledgedDataCharCount += ackId; + if (this._totalDataCharCount - this._acknowledgedDataCharCount < FlowControl.LowWatermarkChars) { // TODO: Expose as public API in node-pty (this._ptyProcess as any).resume(); } From bf52d50a0a356a6c3200c566da9132874db46683 Mon Sep 17 00:00:00 2001 From: Daniel Imms Date: Tue, 12 Jan 2021 11:27:41 -0800 Subject: [PATCH 4/9] Remove ackId from data events going to client --- .../api/browser/mainThreadTerminalService.ts | 9 ++++----- .../workbench/api/common/extHost.protocol.ts | 4 ++-- .../terminal/browser/terminalInstance.ts | 6 ++---- .../browser/terminalProcessExtHostProxy.ts | 8 ++++---- .../terminal/browser/terminalProcessManager.ts | 5 ++--- .../contrib/terminal/common/terminal.ts | 11 +++-------- .../terminal/common/terminalDataBuffering.ts | 18 +++++++----------- .../contrib/terminal/node/terminalProcess.ts | 9 ++++----- .../test/common/terminalDataBuffering.test.ts | 4 +--- 9 files changed, 29 insertions(+), 45 deletions(-) diff --git a/src/vs/workbench/api/browser/mainThreadTerminalService.ts b/src/vs/workbench/api/browser/mainThreadTerminalService.ts index f9dda5e5474..4c9ab2d4e06 100644 --- a/src/vs/workbench/api/browser/mainThreadTerminalService.ts +++ b/src/vs/workbench/api/browser/mainThreadTerminalService.ts @@ -4,7 +4,7 @@ *--------------------------------------------------------------------------------------------*/ import { DisposableStore, Disposable, IDisposable } from 'vs/base/common/lifecycle'; -import { IShellLaunchConfig, ITerminalProcessExtHostProxy, ISpawnExtHostProcessRequest, ITerminalDimensions, IAvailableShellsRequest, IDefaultShellAndArgsRequest, IStartExtensionTerminalRequest, IProcessDataWithAckEvent } from 'vs/workbench/contrib/terminal/common/terminal'; +import { IShellLaunchConfig, ITerminalProcessExtHostProxy, ISpawnExtHostProcessRequest, ITerminalDimensions, IAvailableShellsRequest, IDefaultShellAndArgsRequest, IStartExtensionTerminalRequest } from 'vs/workbench/contrib/terminal/common/terminal'; import { ExtHostContext, ExtHostTerminalServiceShape, MainThreadTerminalServiceShape, MainContext, IExtHostContext, IShellLaunchConfigDto, TerminalLaunchConfig, ITerminalDimensionsDto, TerminalIdentifier } from 'vs/workbench/api/common/extHost.protocol'; import { extHostNamedCustomer } from 'vs/workbench/api/common/extHostCustomers'; import { URI } from 'vs/base/common/uri'; @@ -168,8 +168,7 @@ export class MainThreadTerminalService implements MainThreadTerminalServiceShape public $startSendingDataEvents(): void { if (!this._dataEventTracker) { this._dataEventTracker = this._instantiationService.createInstance(TerminalDataEventTracker, (id, data) => { - const d = typeof data === 'string' ? data : data.data; - this._onTerminalData(id, d); + this._onTerminalData(id, data); }); // Send initial events if they exist this._terminalService.terminalInstances.forEach(t => { @@ -312,7 +311,7 @@ export class MainThreadTerminalService implements MainThreadTerminalServiceShape } } - public $sendProcessData(terminalId: number, data: string | IProcessDataWithAckEvent): void { + public $sendProcessData(terminalId: number, data: string): void { const terminalProcess = this._terminalProcessProxies.get(terminalId); if (terminalProcess) { terminalProcess.emitData(data); @@ -425,7 +424,7 @@ class TerminalDataEventTracker extends Disposable { private readonly _bufferer: TerminalDataBufferer; constructor( - private readonly _callback: (id: number, data: string | IProcessDataWithAckEvent) => void, + private readonly _callback: (id: number, data: string) => void, @ITerminalService private readonly _terminalService: ITerminalService ) { super(); diff --git a/src/vs/workbench/api/common/extHost.protocol.ts b/src/vs/workbench/api/common/extHost.protocol.ts index 819a27528fb..e151a129653 100644 --- a/src/vs/workbench/api/common/extHost.protocol.ts +++ b/src/vs/workbench/api/common/extHost.protocol.ts @@ -41,7 +41,7 @@ import * as tasks from 'vs/workbench/api/common/shared/tasks'; import { IRevealOptions, ITreeItem } from 'vs/workbench/common/views'; import { IAdapterDescriptor, IConfig, IDebugSessionReplMode } from 'vs/workbench/contrib/debug/common/debug'; import { ITextQueryBuilderOptions } from 'vs/workbench/contrib/search/common/queryBuilder'; -import { ITerminalDimensions, IShellLaunchConfig, ITerminalLaunchError, IProcessDataWithAckEvent } from 'vs/workbench/contrib/terminal/common/terminal'; +import { ITerminalDimensions, IShellLaunchConfig, ITerminalLaunchError } from 'vs/workbench/contrib/terminal/common/terminal'; import { ActivationKind, ExtensionActivationError, ExtensionHostKind } from 'vs/workbench/services/extensions/common/extensions'; import { createExtHostContextProxyIdentifier as createExtId, createMainContextProxyIdentifier as createMainId, IRPCProtocol } from 'vs/workbench/services/extensions/common/proxyIdentifier'; import * as search from 'vs/workbench/services/search/common/search'; @@ -480,7 +480,7 @@ export interface MainThreadTerminalServiceShape extends IDisposable { // Process $sendProcessTitle(terminalId: number, title: string): void; - $sendProcessData(terminalId: number, data: string | IProcessDataWithAckEvent): void; + $sendProcessData(terminalId: number, data: string): void; $sendProcessReady(terminalId: number, pid: number, cwd: string): void; $sendProcessExit(terminalId: number, exitCode: number | undefined): void; $sendProcessInitialCwd(terminalId: number, cwd: string): void; diff --git a/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts b/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts index 0eac513627b..780da6473b8 100644 --- a/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts +++ b/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts @@ -1022,10 +1022,8 @@ export class TerminalInstance extends Disposable implements ITerminalInstance { const messageId = ++this._latestXtermWriteData; this._xterm?.write(ev.data, () => { this._latestXtermParseData = messageId; - if (ev.dataAckId !== undefined) { - // TODO: Send back the length of data instead - this._processManager.acknowledgeDataEvent(ev.data.length); - } + // TODO: Disable for local processes? + this._processManager.acknowledgeDataEvent(ev.data.length); }); } } diff --git a/src/vs/workbench/contrib/terminal/browser/terminalProcessExtHostProxy.ts b/src/vs/workbench/contrib/terminal/browser/terminalProcessExtHostProxy.ts index 9a713b7f16a..40895f7e432 100644 --- a/src/vs/workbench/contrib/terminal/browser/terminalProcessExtHostProxy.ts +++ b/src/vs/workbench/contrib/terminal/browser/terminalProcessExtHostProxy.ts @@ -4,7 +4,7 @@ *--------------------------------------------------------------------------------------------*/ import { Event, Emitter } from 'vs/base/common/event'; -import { ITerminalProcessExtHostProxy, IShellLaunchConfig, ITerminalChildProcess, ITerminalConfigHelper, ITerminalDimensions, ITerminalLaunchError, ITerminalDimensionsOverride, IProcessDataWithAckEvent } from 'vs/workbench/contrib/terminal/common/terminal'; +import { ITerminalProcessExtHostProxy, IShellLaunchConfig, ITerminalChildProcess, ITerminalConfigHelper, ITerminalDimensions, ITerminalLaunchError, ITerminalDimensionsOverride } from 'vs/workbench/contrib/terminal/common/terminal'; import { Disposable } from 'vs/base/common/lifecycle'; import { URI } from 'vs/base/common/uri'; import { IRemoteAgentService } from 'vs/workbench/services/remote/common/remoteAgentService'; @@ -15,8 +15,8 @@ let hasReceivedResponseFromRemoteExtHost: boolean = false; export class TerminalProcessExtHostProxy extends Disposable implements ITerminalChildProcess, ITerminalProcessExtHostProxy { - private readonly _onProcessData = this._register(new Emitter()); - public readonly onProcessData: Event = this._onProcessData.event; + private readonly _onProcessData = this._register(new Emitter()); + public readonly onProcessData: Event = this._onProcessData.event; private readonly _onProcessExit = this._register(new Emitter()); public readonly onProcessExit: Event = this._onProcessExit.event; private readonly _onProcessReady = this._register(new Emitter<{ pid: number, cwd: string }>()); @@ -62,7 +62,7 @@ export class TerminalProcessExtHostProxy extends Disposable implements ITerminal super(); } - public emitData(data: string | IProcessDataWithAckEvent): void { + public emitData(data: string): void { this._onProcessData.fire(data); } diff --git a/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts b/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts index d4548a8ad2d..0368eef6b69 100644 --- a/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts +++ b/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts @@ -175,12 +175,11 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce this._process.onProcessData(ev => { const data = (typeof ev === 'string' ? ev : ev.data); - const sync = (typeof ev === 'string' || 'ackId' in ev ? false : ev.sync); - const dataAckId = (typeof ev !== 'string' && 'ackId' in ev ? ev.ackId : undefined); + const sync = (typeof ev === 'string' ? false : ev.sync); const beforeProcessDataEvent: IBeforeProcessDataEvent = { data }; this._onBeforeProcessData.fire(beforeProcessDataEvent); if (beforeProcessDataEvent.data && beforeProcessDataEvent.data.length > 0) { - this._onProcessData.fire({ data: beforeProcessDataEvent.data, sync, dataAckId }); + this._onProcessData.fire({ data: beforeProcessDataEvent.data, sync }); } }); diff --git a/src/vs/workbench/contrib/terminal/common/terminal.ts b/src/vs/workbench/contrib/terminal/common/terminal.ts index c45ce933b8b..b46c5367028 100644 --- a/src/vs/workbench/contrib/terminal/common/terminal.ts +++ b/src/vs/workbench/contrib/terminal/common/terminal.ts @@ -354,12 +354,6 @@ export interface IBeforeProcessDataEvent { export interface IProcessDataEvent { data: string; sync: boolean; - dataAckId?: number; -} - -export interface IProcessDataWithAckEvent { - data: string; - ackId: number; } export interface ITerminalProcessManager extends IDisposable { @@ -385,6 +379,7 @@ export interface ITerminalProcessManager extends IDisposable { createProcess(shellLaunchConfig: IShellLaunchConfig, cols: number, rows: number, isScreenReaderModeEnabled: boolean): Promise; write(data: string): void; setDimensions(cols: number, rows: number): void; + // TODO: Rename to charCount acknowledgeDataEvent(ackId: number): void; getInitialCwd(): Promise; @@ -414,7 +409,7 @@ export const enum ProcessState { export interface ITerminalProcessExtHostProxy extends IDisposable { readonly terminalId: number; - emitData(data: string | IProcessDataWithAckEvent): void; + emitData(data: string): void; emitTitle(title: string): void; emitReady(pid: number, cwd: string): void; emitExit(exitCode: number | undefined): void; @@ -490,7 +485,7 @@ export interface ITerminalLaunchError { * child_process.ChildProcess node.js interface. */ export interface ITerminalChildProcess { - onProcessData: Event; + onProcessData: Event; onProcessExit: Event; onProcessReady: Event<{ pid: number, cwd: string }>; onProcessTitleChanged: Event; diff --git a/src/vs/workbench/contrib/terminal/common/terminalDataBuffering.ts b/src/vs/workbench/contrib/terminal/common/terminalDataBuffering.ts index 92a3edfa52b..db8c7f57d9b 100644 --- a/src/vs/workbench/contrib/terminal/common/terminalDataBuffering.ts +++ b/src/vs/workbench/contrib/terminal/common/terminalDataBuffering.ts @@ -5,17 +5,17 @@ import { Event } from 'vs/base/common/event'; import { IDisposable } from 'vs/base/common/lifecycle'; -import { IProcessDataEvent, IProcessDataWithAckEvent } from 'vs/workbench/contrib/terminal/common/terminal'; +import { IProcessDataEvent } from 'vs/workbench/contrib/terminal/common/terminal'; interface TerminalDataBuffer extends IDisposable { - data: (string | IProcessDataWithAckEvent)[]; + data: string[]; timeoutId: any; } export class TerminalDataBufferer implements IDisposable { private readonly _terminalBufferMap = new Map(); - constructor(private readonly _callback: (id: number, data: string | IProcessDataWithAckEvent) => void) { + constructor(private readonly _callback: (id: number, data: string) => void) { } dispose() { @@ -24,10 +24,10 @@ export class TerminalDataBufferer implements IDisposable { } } - startBuffering(id: number, event: Event, throttleBy: number = 5): IDisposable { + startBuffering(id: number, event: Event, throttleBy: number = 5): IDisposable { let disposable: IDisposable; - disposable = event((e: string | IProcessDataEvent | IProcessDataWithAckEvent) => { - const data = (typeof e === 'string' || 'ackId' in e ? e : e.data); + disposable = event((e: string | IProcessDataEvent) => { + const data = (typeof e === 'string' ? e : e.data); let buffer = this._terminalBufferMap.get(id); if (buffer) { buffer.data.push(data); @@ -61,11 +61,7 @@ export class TerminalDataBufferer implements IDisposable { const buffer = this._terminalBufferMap.get(id); if (buffer) { this._terminalBufferMap.delete(id); - // TODO: This should batch string events together still - // this._callback(id, buffer.data.join('')); - for (let b of buffer.data) { - this._callback(id, b); - } + this._callback(id, buffer.data.join('')); } } } diff --git a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts index 5d52c2d506b..4eec770e6c8 100644 --- a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts +++ b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts @@ -11,7 +11,7 @@ import * as os from 'os'; import { Event, Emitter } from 'vs/base/common/event'; import { getWindowsBuildNumber } from 'vs/workbench/contrib/terminal/node/terminal'; import { Disposable } from 'vs/base/common/lifecycle'; -import { IProcessDataWithAckEvent, IShellLaunchConfig, ITerminalChildProcess, ITerminalDimensionsOverride, ITerminalLaunchError } from 'vs/workbench/contrib/terminal/common/terminal'; +import { IShellLaunchConfig, ITerminalChildProcess, ITerminalDimensionsOverride, ITerminalLaunchError } from 'vs/workbench/contrib/terminal/common/terminal'; import { exec } from 'child_process'; import { ILogService } from 'vs/platform/log/common/log'; import { stat } from 'vs/base/node/pfs'; @@ -63,8 +63,8 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess public get exitMessage(): string | undefined { return this._exitMessage; } - private readonly _onProcessData = this._register(new Emitter()); - public get onProcessData(): Event { return this._onProcessData.event; } + private readonly _onProcessData = this._register(new Emitter()); + public get onProcessData(): Event { return this._onProcessData.event; } private readonly _onProcessExit = this._register(new Emitter()); public get onProcessExit(): Event { return this._onProcessExit.event; } private readonly _onProcessReady = this._register(new Emitter<{ pid: number, cwd: string }>()); @@ -189,9 +189,8 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess // TODO: if we're just sending the data length over, ackId doesn't need to exist at all // TODO: We don't need to ack everything, just count on the other side and ack every 1000/10000 bytes this._totalDataCharCount += data.length; - const ackId = data.length; setTimeout(() => { - this._onProcessData.fire({ data, ackId }); + this._onProcessData.fire(data); }, fakeLatency); // TODO: Use bytes, not messages // console.log('check', this._currentAckRequestId - this._ackedRequestId, FlowControl.HighWatermarkBytes); diff --git a/src/vs/workbench/contrib/terminal/test/common/terminalDataBuffering.test.ts b/src/vs/workbench/contrib/terminal/test/common/terminalDataBuffering.test.ts index 9da073d3507..6a2e3b93b21 100644 --- a/src/vs/workbench/contrib/terminal/test/common/terminalDataBuffering.test.ts +++ b/src/vs/workbench/contrib/terminal/test/common/terminalDataBuffering.test.ts @@ -5,7 +5,6 @@ import * as assert from 'assert'; import { Emitter } from 'vs/base/common/event'; -import { IProcessDataWithAckEvent } from 'vs/workbench/contrib/terminal/common/terminal'; import { TerminalDataBufferer } from 'vs/workbench/contrib/terminal/common/terminalDataBuffering'; const wait = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); @@ -13,8 +12,7 @@ const wait = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); suite('Workbench - TerminalDataBufferer', () => { let bufferer: TerminalDataBufferer; let counter: { [id: number]: number }; - // TODO: Fix these tests - let data: { [id: number]: string | IProcessDataWithAckEvent }; + let data: { [id: number]: string }; setup(async () => { counter = {}; From 0a19f7702a9c9205cdab214860fa97e97dc7c4ea Mon Sep 17 00:00:00 2001 From: Daniel Imms Date: Tue, 12 Jan 2021 11:36:21 -0800 Subject: [PATCH 5/9] Rename ackId to charCount --- src/vs/workbench/api/browser/mainThreadTerminalService.ts | 2 +- src/vs/workbench/api/common/extHost.protocol.ts | 2 +- src/vs/workbench/api/common/extHostTerminalService.ts | 6 +++--- .../contrib/terminal/browser/remoteTerminalService.ts | 2 +- .../terminal/browser/terminalProcessExtHostProxy.ts | 4 ++-- .../contrib/terminal/browser/terminalProcessManager.ts | 5 +++-- src/vs/workbench/contrib/terminal/common/terminal.ts | 6 +++--- src/vs/workbench/contrib/terminal/node/terminalProcess.ts | 8 +++----- 8 files changed, 17 insertions(+), 18 deletions(-) diff --git a/src/vs/workbench/api/browser/mainThreadTerminalService.ts b/src/vs/workbench/api/browser/mainThreadTerminalService.ts index 4c9ab2d4e06..1888982a193 100644 --- a/src/vs/workbench/api/browser/mainThreadTerminalService.ts +++ b/src/vs/workbench/api/browser/mainThreadTerminalService.ts @@ -272,7 +272,7 @@ export class MainThreadTerminalService implements MainThreadTerminalServiceShape request.isWorkspaceShellAllowed ).then(request.callback, request.callback); - proxy.onAcknowledgeDataEvent(ackId => this._proxy.$acceptProcessAckDataEvent(proxy.terminalId, ackId)); + proxy.onAcknowledgeDataEvent(charCount => this._proxy.$acceptProcessAckDataEvent(proxy.terminalId, charCount)); proxy.onInput(data => this._proxy.$acceptProcessInput(proxy.terminalId, data)); proxy.onResize(dimensions => this._proxy.$acceptProcessResize(proxy.terminalId, dimensions.cols, dimensions.rows)); proxy.onShutdown(immediate => this._proxy.$acceptProcessShutdown(proxy.terminalId, immediate)); diff --git a/src/vs/workbench/api/common/extHost.protocol.ts b/src/vs/workbench/api/common/extHost.protocol.ts index e151a129653..d3a5b973021 100644 --- a/src/vs/workbench/api/common/extHost.protocol.ts +++ b/src/vs/workbench/api/common/extHost.protocol.ts @@ -1545,7 +1545,7 @@ export interface ExtHostTerminalServiceShape { $acceptTerminalMaximumDimensions(id: number, cols: number, rows: number): void; $spawnExtHostProcess(id: number, shellLaunchConfig: IShellLaunchConfigDto, activeWorkspaceRootUri: UriComponents | undefined, cols: number, rows: number, isWorkspaceShellAllowed: boolean): Promise; $startExtensionTerminal(id: number, initialDimensions: ITerminalDimensionsDto | undefined): Promise; - $acceptProcessAckDataEvent(id: number, ackId: number): void; + $acceptProcessAckDataEvent(id: number, charCount: number): void; $acceptProcessInput(id: number, data: string): void; $acceptProcessResize(id: number, cols: number, rows: number): void; $acceptProcessShutdown(id: number, immediate: boolean): void; diff --git a/src/vs/workbench/api/common/extHostTerminalService.ts b/src/vs/workbench/api/common/extHostTerminalService.ts index 8d96e657b59..597b304f4fc 100644 --- a/src/vs/workbench/api/common/extHostTerminalService.ts +++ b/src/vs/workbench/api/common/extHostTerminalService.ts @@ -220,7 +220,7 @@ export class ExtHostPseudoterminal implements ITerminalChildProcess { } } - acknowledgeDataEvent(ackId: number): void { + acknowledgeDataEvent(charCount: number): void { // TODO: Determine whether ExtHostPseudoterminal terminals should support flow control, this // would need resume/pause APIs @@ -495,8 +495,8 @@ export abstract class BaseExtHostTerminalService extends Disposable implements I return disposables; } - public $acceptProcessAckDataEvent(id: number, ackId: number): void { - this._terminalProcesses.get(id)?.acknowledgeDataEvent(ackId); + public $acceptProcessAckDataEvent(id: number, charCount: number): void { + this._terminalProcesses.get(id)?.acknowledgeDataEvent(charCount); } public $acceptProcessInput(id: number, data: string): void { diff --git a/src/vs/workbench/contrib/terminal/browser/remoteTerminalService.ts b/src/vs/workbench/contrib/terminal/browser/remoteTerminalService.ts index c7be47e5214..872e9a36fbf 100644 --- a/src/vs/workbench/contrib/terminal/browser/remoteTerminalService.ts +++ b/src/vs/workbench/contrib/terminal/browser/remoteTerminalService.ts @@ -252,7 +252,7 @@ export class RemoteTerminalProcess extends Disposable implements ITerminalChildP }); } - public acknowledgeDataEvent(ackId: number): void { + public acknowledgeDataEvent(charCount: number): void { // TODO: Support flow control for server spawned processes } diff --git a/src/vs/workbench/contrib/terminal/browser/terminalProcessExtHostProxy.ts b/src/vs/workbench/contrib/terminal/browser/terminalProcessExtHostProxy.ts index 40895f7e432..0be5cf88be5 100644 --- a/src/vs/workbench/contrib/terminal/browser/terminalProcessExtHostProxy.ts +++ b/src/vs/workbench/contrib/terminal/browser/terminalProcessExtHostProxy.ts @@ -141,8 +141,8 @@ export class TerminalProcessExtHostProxy extends Disposable implements ITerminal this._onResize.fire({ cols, rows }); } - public acknowledgeDataEvent(ackId: number): void { - this._onAcknowledgeDataEvent.fire(ackId); + public acknowledgeDataEvent(charCount: number): void { + this._onAcknowledgeDataEvent.fire(charCount); } public getInitialCwd(): Promise { diff --git a/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts b/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts index 0368eef6b69..bec1293ce6e 100644 --- a/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts +++ b/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts @@ -331,8 +331,9 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce return Promise.resolve(this._latency); } - public acknowledgeDataEvent(ackId: number): void { - this._process?.acknowledgeDataEvent(ackId); + public acknowledgeDataEvent(charCount: number): void { + // TODO: Batch these acknowledge calls (in proxy/remote connection?) + this._process?.acknowledgeDataEvent(charCount); } private _onExit(exitCode: number | undefined): void { diff --git a/src/vs/workbench/contrib/terminal/common/terminal.ts b/src/vs/workbench/contrib/terminal/common/terminal.ts index b46c5367028..076030c6b4d 100644 --- a/src/vs/workbench/contrib/terminal/common/terminal.ts +++ b/src/vs/workbench/contrib/terminal/common/terminal.ts @@ -379,8 +379,7 @@ export interface ITerminalProcessManager extends IDisposable { createProcess(shellLaunchConfig: IShellLaunchConfig, cols: number, rows: number, isScreenReaderModeEnabled: boolean): Promise; write(data: string): void; setDimensions(cols: number, rows: number): void; - // TODO: Rename to charCount - acknowledgeDataEvent(ackId: number): void; + acknowledgeDataEvent(charCount: number): void; getInitialCwd(): Promise; getCwd(): Promise; @@ -514,8 +513,9 @@ export interface ITerminalChildProcess { * Acknowledge a data event has been parsed by the terminal, this is used to implement flow * control to ensure remote processes to not get too far ahead of the client and flood the * connection. + * @param charCount The number of characters being acknowledged. */ - acknowledgeDataEvent(ackId: number): void; + acknowledgeDataEvent(charCount: number): void; getInitialCwd(): Promise; getCwd(): Promise; diff --git a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts index 4eec770e6c8..fa2ba90fb6d 100644 --- a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts +++ b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts @@ -186,14 +186,11 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess ptyProcess.onData(data => { // TODO: Periodically request ACK between low and high watermark const fakeLatency = 1000; - // TODO: if we're just sending the data length over, ackId doesn't need to exist at all // TODO: We don't need to ack everything, just count on the other side and ack every 1000/10000 bytes this._totalDataCharCount += data.length; setTimeout(() => { this._onProcessData.fire(data); }, fakeLatency); - // TODO: Use bytes, not messages - // console.log('check', this._currentAckRequestId - this._ackedRequestId, FlowControl.HighWatermarkBytes); if (this._totalDataCharCount - this._acknowledgedDataCharCount > FlowControl.HighWatermarkChars) { // TODO: Expose as public API in node-pty console.log('pause', this._totalDataCharCount - this._acknowledgedDataCharCount, '>', FlowControl.HighWatermarkChars); @@ -360,9 +357,10 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess } } - public acknowledgeDataEvent(ackId: number): void { - this._acknowledgedDataCharCount += ackId; + public acknowledgeDataEvent(charCount: number): void { + this._acknowledgedDataCharCount += charCount; if (this._totalDataCharCount - this._acknowledgedDataCharCount < FlowControl.LowWatermarkChars) { + // TODO: Check whether it is paused before resuming // TODO: Expose as public API in node-pty (this._ptyProcess as any).resume(); } From 3232112f9baf143a38e129aa23502957e594c0d5 Mon Sep 17 00:00:00 2001 From: Daniel Imms Date: Tue, 12 Jan 2021 12:08:58 -0800 Subject: [PATCH 6/9] Only resume if it's paused --- .../contrib/terminal/browser/terminalInstance.ts | 1 + .../contrib/terminal/node/terminalProcess.ts | 15 ++++++++------- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts b/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts index 780da6473b8..83f70c934b7 100644 --- a/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts +++ b/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts @@ -1023,6 +1023,7 @@ export class TerminalInstance extends Disposable implements ITerminalInstance { this._xterm?.write(ev.data, () => { this._latestXtermParseData = messageId; // TODO: Disable for local processes? + // TODO: We don't need to ack everything, just count on the other side and ack every 1000/10000 bytes this._processManager.acknowledgeDataEvent(ev.data.length); }); } diff --git a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts index fa2ba90fb6d..55e2e0563d2 100644 --- a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts +++ b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts @@ -56,11 +56,13 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess private _writeQueue: string[] = []; private _writeTimeout: NodeJS.Timeout | undefined; private _delayedResizer: DelayedResizer | undefined; - private _totalDataCharCount: number = 0; - private _acknowledgedDataCharCount: number = 0; private readonly _initialCwd: string; private readonly _ptyOptions: pty.IPtyForkOptions | pty.IWindowsPtyForkOptions; + private _isPtyPaused: boolean = false; + private _totalDataCharCount: number = 0; + private _acknowledgedDataCharCount: number = 0; + public get exitMessage(): string | undefined { return this._exitMessage; } private readonly _onProcessData = this._register(new Emitter()); @@ -184,16 +186,15 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess this.onProcessReady(() => c()); }); ptyProcess.onData(data => { - // TODO: Periodically request ACK between low and high watermark const fakeLatency = 1000; - // TODO: We don't need to ack everything, just count on the other side and ack every 1000/10000 bytes this._totalDataCharCount += data.length; setTimeout(() => { this._onProcessData.fire(data); }, fakeLatency); - if (this._totalDataCharCount - this._acknowledgedDataCharCount > FlowControl.HighWatermarkChars) { + if (!this._isPtyPaused && this._totalDataCharCount - this._acknowledgedDataCharCount > FlowControl.HighWatermarkChars) { // TODO: Expose as public API in node-pty console.log('pause', this._totalDataCharCount - this._acknowledgedDataCharCount, '>', FlowControl.HighWatermarkChars); + this._isPtyPaused = true; (ptyProcess as any).pause(); } if (this._closeTimeout) { @@ -359,10 +360,10 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess public acknowledgeDataEvent(charCount: number): void { this._acknowledgedDataCharCount += charCount; - if (this._totalDataCharCount - this._acknowledgedDataCharCount < FlowControl.LowWatermarkChars) { - // TODO: Check whether it is paused before resuming + if (this._isPtyPaused && this._totalDataCharCount - this._acknowledgedDataCharCount < FlowControl.LowWatermarkChars) { // TODO: Expose as public API in node-pty (this._ptyProcess as any).resume(); + this._isPtyPaused = false; } } From 22c88cfaaebcc796589840d3a1f21b0d3a9c42d9 Mon Sep 17 00:00:00 2001 From: Daniel Imms Date: Tue, 12 Jan 2021 12:29:12 -0800 Subject: [PATCH 7/9] Batch ack events coming from client --- .../api/common/extHostTerminalService.ts | 6 +-- .../browser/terminalProcessManager.ts | 24 ++++++++++-- .../contrib/terminal/common/terminal.ts | 23 +++++++++++ .../contrib/terminal/node/terminalProcess.ts | 38 ++++++------------- 4 files changed, 57 insertions(+), 34 deletions(-) diff --git a/src/vs/workbench/api/common/extHostTerminalService.ts b/src/vs/workbench/api/common/extHostTerminalService.ts index 597b304f4fc..5f6034363c2 100644 --- a/src/vs/workbench/api/common/extHostTerminalService.ts +++ b/src/vs/workbench/api/common/extHostTerminalService.ts @@ -221,10 +221,8 @@ export class ExtHostPseudoterminal implements ITerminalChildProcess { } acknowledgeDataEvent(charCount: number): void { - // TODO: Determine whether ExtHostPseudoterminal terminals should support flow control, this - // would need resume/pause APIs - - // No-op + // No-op, flow control is not supported in extension owned terminals. If this is ever + // implemented it will need new pause and resume VS Code APIs. } getInitialCwd(): Promise { diff --git a/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts b/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts index bec1293ce6e..b783fec1988 100644 --- a/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts +++ b/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts @@ -6,7 +6,7 @@ import * as platform from 'vs/base/common/platform'; import * as terminalEnvironment from 'vs/workbench/contrib/terminal/common/terminalEnvironment'; import { env as processEnv } from 'vs/base/common/process'; -import { ProcessState, ITerminalProcessManager, IShellLaunchConfig, ITerminalConfigHelper, ITerminalChildProcess, IBeforeProcessDataEvent, ITerminalEnvironment, ITerminalLaunchError, IProcessDataEvent, ITerminalDimensionsOverride } from 'vs/workbench/contrib/terminal/common/terminal'; +import { ProcessState, ITerminalProcessManager, IShellLaunchConfig, ITerminalConfigHelper, ITerminalChildProcess, IBeforeProcessDataEvent, ITerminalEnvironment, ITerminalLaunchError, IProcessDataEvent, ITerminalDimensionsOverride, FlowControlConstants } from 'vs/workbench/contrib/terminal/common/terminal'; import { ILogService } from 'vs/platform/log/common/log'; import { Emitter, Event } from 'vs/base/common/event'; import { IHistoryService } from 'vs/workbench/services/history/common/history'; @@ -64,6 +64,7 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce private _initialCwd: string | undefined; private _extEnvironmentVariableCollection: IMergedEnvironmentVariableCollection | undefined; private _environmentVariableInfo: IEnvironmentVariableInfo | undefined; + private _ackDataBufferer: AckDataBufferer; private readonly _onProcessReady = this._register(new Emitter()); public get onProcessReady(): Event { return this._onProcessReady.event; } @@ -111,6 +112,7 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce }); }); this.ptyProcessReady.then(async () => await this.getLatency()); + this._ackDataBufferer = new AckDataBufferer(e => this._process?.acknowledgeDataEvent(e)); } public dispose(immediate: boolean = false): void { @@ -332,8 +334,7 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce } public acknowledgeDataEvent(charCount: number): void { - // TODO: Batch these acknowledge calls (in proxy/remote connection?) - this._process?.acknowledgeDataEvent(charCount); + this._ackDataBufferer.ack(charCount); } private _onExit(exitCode: number | undefined): void { @@ -364,3 +365,20 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce this._onEnvironmentVariableInfoChange.fire(this._environmentVariableInfo); } } + +class AckDataBufferer { + private _unsentCharCount: number = 0; + + constructor( + private readonly _callback: (charCount: number) => void + ) { + } + + ack(charCount: number) { + this._unsentCharCount += charCount; + while (this._unsentCharCount > FlowControlConstants.CharCountAckSize) { + this._unsentCharCount -= FlowControlConstants.CharCountAckSize; + this._callback(FlowControlConstants.CharCountAckSize); + } + } +} diff --git a/src/vs/workbench/contrib/terminal/common/terminal.ts b/src/vs/workbench/contrib/terminal/common/terminal.ts index 076030c6b4d..78f4ef2a552 100644 --- a/src/vs/workbench/contrib/terminal/common/terminal.ts +++ b/src/vs/workbench/contrib/terminal/common/terminal.ts @@ -522,6 +522,29 @@ export interface ITerminalChildProcess { getLatency(): Promise; } +export const enum FlowControlConstants { + /** + * The number of _unacknowledged_ chars to have been sent before the pty is paused in order for + * the client to catch up. + */ + HighWatermarkChars = 100000, + /** + * After flow control pauses the pty for the client the catch up, this is the number of + * _unacknowledged_ chars to have been caught up to on the client before resuming the pty again. + * This is used to attempt to prevent pauses in the flowing data; ideally while the pty is + * paused the number of unacknowledged chars would always be greater than 0 or the client will + * appear to stutter. In reality this balance is hard to accomplish though so heavy commands + * will likely pause as latency grows, not flooding the connection is the important thing as + * it's shared with other core functionality. + */ + LowWatermarkChars = 5000, + /** + * The number characters that are accumulated on the client side before sending an ack event. + * This must be less than or equal to LowWatermarkChars or the terminal max never unpause. + */ + CharCountAckSize = 5000 +} + export const enum TERMINAL_COMMAND_ID { FIND_NEXT = 'workbench.action.terminal.findNext', FIND_PREVIOUS = 'workbench.action.terminal.findPrevious', diff --git a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts index 55e2e0563d2..26c3c1f429e 100644 --- a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts +++ b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts @@ -11,7 +11,7 @@ import * as os from 'os'; import { Event, Emitter } from 'vs/base/common/event'; import { getWindowsBuildNumber } from 'vs/workbench/contrib/terminal/node/terminal'; import { Disposable } from 'vs/base/common/lifecycle'; -import { IShellLaunchConfig, ITerminalChildProcess, ITerminalDimensionsOverride, ITerminalLaunchError } from 'vs/workbench/contrib/terminal/common/terminal'; +import { IShellLaunchConfig, ITerminalChildProcess, ITerminalDimensionsOverride, ITerminalLaunchError, FlowControlConstants } from 'vs/workbench/contrib/terminal/common/terminal'; import { exec } from 'child_process'; import { ILogService } from 'vs/platform/log/common/log'; import { stat } from 'vs/base/node/pfs'; @@ -26,24 +26,6 @@ import { localize } from 'vs/nls'; const WRITE_MAX_CHUNK_SIZE = 50; const WRITE_INTERVAL_MS = 5; -const enum FlowControl { - /** - * The number of _unacknowledged_ chars to have been sent before the pty is paused in order for - * the client to catch up. - */ - HighWatermarkChars = 100000, - /** - * After flow control pauses the pty for the client the catch up, this is the number of - * _unacknowledged_ chars to have been caught up to on the client before resuming the pty again. - * This is used to attempt to prevent pauses in the flowing data; ideally while the pty is - * paused the number of unacknowledged chars would always be greater than 0 or the client will - * appear to stutter. In reality this balance is hard to accomplish though so heavy commands - * will likely pause as latency grows, not flooding the connection is the important thing as - * it's shared with other core functionality. - */ - LowWatermarkChars = 5000 -} - export class TerminalProcess extends Disposable implements ITerminalChildProcess { private _exitCode: number | undefined; private _exitMessage: string | undefined; @@ -60,8 +42,7 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess private readonly _ptyOptions: pty.IPtyForkOptions | pty.IWindowsPtyForkOptions; private _isPtyPaused: boolean = false; - private _totalDataCharCount: number = 0; - private _acknowledgedDataCharCount: number = 0; + private _unacknowledgedCharCount: number = 0; public get exitMessage(): string | undefined { return this._exitMessage; } @@ -187,14 +168,14 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess }); ptyProcess.onData(data => { const fakeLatency = 1000; - this._totalDataCharCount += data.length; + this._unacknowledgedCharCount += data.length; setTimeout(() => { this._onProcessData.fire(data); }, fakeLatency); - if (!this._isPtyPaused && this._totalDataCharCount - this._acknowledgedDataCharCount > FlowControl.HighWatermarkChars) { - // TODO: Expose as public API in node-pty - console.log('pause', this._totalDataCharCount - this._acknowledgedDataCharCount, '>', FlowControl.HighWatermarkChars); + if (!this._isPtyPaused && this._unacknowledgedCharCount > FlowControlConstants.HighWatermarkChars) { + console.log(`pause (${this._unacknowledgedCharCount} > ${FlowControlConstants.HighWatermarkChars}`); this._isPtyPaused = true; + // TODO: Expose as public API in node-pty (ptyProcess as any).pause(); } if (this._closeTimeout) { @@ -359,8 +340,11 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess } public acknowledgeDataEvent(charCount: number): void { - this._acknowledgedDataCharCount += charCount; - if (this._isPtyPaused && this._totalDataCharCount - this._acknowledgedDataCharCount < FlowControl.LowWatermarkChars) { + // Prevent lower than 0 to heal from errors + this._unacknowledgedCharCount = Math.max(this._unacknowledgedCharCount - charCount, 0); + console.log(`Ack ${charCount} chars (unacknowledged: ${this._unacknowledgedCharCount})`); + if (this._isPtyPaused && this._unacknowledgedCharCount < FlowControlConstants.LowWatermarkChars) { + console.log(`Resume (${this._unacknowledgedCharCount} < ${FlowControlConstants.LowWatermarkChars})`); // TODO: Expose as public API in node-pty (this._ptyProcess as any).resume(); this._isPtyPaused = false; From f8ec60aa06f4197e42df99a63046a42fb5e499a3 Mon Sep 17 00:00:00 2001 From: Daniel Imms Date: Thu, 14 Jan 2021 06:04:03 -0800 Subject: [PATCH 8/9] Add flow control setting, remove fake latency --- .../api/browser/mainThreadTerminalService.ts | 7 ++++-- .../workbench/api/common/extHost.protocol.ts | 1 + .../api/node/extHostTerminalService.ts | 3 ++- .../terminal/browser/terminalInstance.ts | 6 ++--- .../browser/terminalProcessManager.ts | 10 +++++++-- .../contrib/terminal/common/terminal.ts | 6 +++++ .../terminal/common/terminalConfiguration.ts | 5 +++++ .../contrib/terminal/node/terminalProcess.ts | 22 ++++++++++--------- 8 files changed, 42 insertions(+), 18 deletions(-) diff --git a/src/vs/workbench/api/browser/mainThreadTerminalService.ts b/src/vs/workbench/api/browser/mainThreadTerminalService.ts index 1888982a193..167eb17fc8e 100644 --- a/src/vs/workbench/api/browser/mainThreadTerminalService.ts +++ b/src/vs/workbench/api/browser/mainThreadTerminalService.ts @@ -4,7 +4,7 @@ *--------------------------------------------------------------------------------------------*/ import { DisposableStore, Disposable, IDisposable } from 'vs/base/common/lifecycle'; -import { IShellLaunchConfig, ITerminalProcessExtHostProxy, ISpawnExtHostProcessRequest, ITerminalDimensions, IAvailableShellsRequest, IDefaultShellAndArgsRequest, IStartExtensionTerminalRequest } from 'vs/workbench/contrib/terminal/common/terminal'; +import { IShellLaunchConfig, ITerminalProcessExtHostProxy, ISpawnExtHostProcessRequest, ITerminalDimensions, IAvailableShellsRequest, IDefaultShellAndArgsRequest, IStartExtensionTerminalRequest, ITerminalConfiguration, TERMINAL_CONFIG_SECTION } from 'vs/workbench/contrib/terminal/common/terminal'; import { ExtHostContext, ExtHostTerminalServiceShape, MainThreadTerminalServiceShape, MainContext, IExtHostContext, IShellLaunchConfigDto, TerminalLaunchConfig, ITerminalDimensionsDto, TerminalIdentifier } from 'vs/workbench/api/common/extHost.protocol'; import { extHostNamedCustomer } from 'vs/workbench/api/common/extHostCustomers'; import { URI } from 'vs/base/common/uri'; @@ -16,6 +16,7 @@ import { TerminalDataBufferer } from 'vs/workbench/contrib/terminal/common/termi import { IEnvironmentVariableService, ISerializableEnvironmentVariableCollection } from 'vs/workbench/contrib/terminal/common/environmentVariable'; import { deserializeEnvironmentVariableCollection, serializeEnvironmentVariableCollection } from 'vs/workbench/contrib/terminal/common/environmentVariableShared'; import { ILogService } from 'vs/platform/log/common/log'; +import { IConfigurationService } from 'vs/platform/configuration/common/configuration'; @extHostNamedCustomer(MainContext.MainThreadTerminalService) export class MainThreadTerminalService implements MainThreadTerminalServiceShape { @@ -46,6 +47,7 @@ export class MainThreadTerminalService implements MainThreadTerminalServiceShape @IRemoteAgentService private readonly _remoteAgentService: IRemoteAgentService, @IInstantiationService private readonly _instantiationService: IInstantiationService, @IEnvironmentVariableService private readonly _environmentVariableService: IEnvironmentVariableService, + @IConfigurationService private readonly _configurationService: IConfigurationService, @ILogService private readonly _logService: ILogService, ) { this._proxy = extHostContext.getProxy(ExtHostContext.ExtHostTerminalService); @@ -259,7 +261,8 @@ export class MainThreadTerminalService implements MainThreadTerminalServiceShape executable: request.shellLaunchConfig.executable, args: request.shellLaunchConfig.args, cwd: request.shellLaunchConfig.cwd, - env: request.shellLaunchConfig.env + env: request.shellLaunchConfig.env, + flowControl: this._configurationService.getValue(TERMINAL_CONFIG_SECTION).flowControl }; this._logService.trace('Spawning ext host process', { terminalId: proxy.terminalId, shellLaunchConfigDto, request }); diff --git a/src/vs/workbench/api/common/extHost.protocol.ts b/src/vs/workbench/api/common/extHost.protocol.ts index d3a5b973021..6777ffa6cea 100644 --- a/src/vs/workbench/api/common/extHost.protocol.ts +++ b/src/vs/workbench/api/common/extHost.protocol.ts @@ -1506,6 +1506,7 @@ export interface IShellLaunchConfigDto { cwd?: string | UriComponents; env?: { [key: string]: string | null; }; hideFromUser?: boolean; + flowControl?: boolean; } export interface IShellDefinitionDto { diff --git a/src/vs/workbench/api/node/extHostTerminalService.ts b/src/vs/workbench/api/node/extHostTerminalService.ts index 127814dbb0a..adad271b38f 100644 --- a/src/vs/workbench/api/node/extHostTerminalService.ts +++ b/src/vs/workbench/api/node/extHostTerminalService.ts @@ -149,7 +149,8 @@ export class ExtHostTerminalService extends BaseExtHostTerminalService { executable: shellLaunchConfigDto.executable, args: shellLaunchConfigDto.args, cwd: typeof shellLaunchConfigDto.cwd === 'string' ? shellLaunchConfigDto.cwd : URI.revive(shellLaunchConfigDto.cwd), - env: shellLaunchConfigDto.env + env: shellLaunchConfigDto.env, + flowControl: shellLaunchConfigDto.flowControl }; // Merge in shell and args from settings diff --git a/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts b/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts index 83f70c934b7..59eee00f4d9 100644 --- a/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts +++ b/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts @@ -1022,9 +1022,9 @@ export class TerminalInstance extends Disposable implements ITerminalInstance { const messageId = ++this._latestXtermWriteData; this._xterm?.write(ev.data, () => { this._latestXtermParseData = messageId; - // TODO: Disable for local processes? - // TODO: We don't need to ack everything, just count on the other side and ack every 1000/10000 bytes - this._processManager.acknowledgeDataEvent(ev.data.length); + if (this._shellLaunchConfig.flowControl) { + this._processManager.acknowledgeDataEvent(ev.data.length); + } }); } } diff --git a/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts b/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts index b783fec1988..1fe02f4fed4 100644 --- a/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts +++ b/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts @@ -133,7 +133,10 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce rows: number, isScreenReaderModeEnabled: boolean ): Promise { + shellLaunchConfig.flowControl = this._configHelper.config.flowControl; if (shellLaunchConfig.isExtensionTerminal) { + // Flow control is not supported for extension terminals + shellLaunchConfig.flowControl = false; this._processType = ProcessType.ExtensionTerminal; this._process = this._instantiationService.createInstance(TerminalProcessExtHostProxy, this._terminalId, shellLaunchConfig, undefined, cols, rows, this._configHelper); } else { @@ -169,7 +172,10 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce this._process = this._instantiationService.createInstance(TerminalProcessExtHostProxy, this._terminalId, shellLaunchConfig, activeWorkspaceRootUri, cols, rows, this._configHelper); } } else { - this._process = await this._launchProcess(shellLaunchConfig, cols, rows, this.userHome, isScreenReaderModeEnabled); + // Flow control is not needed for ptys hosted in the same process (ie. the electron + // renderer). + shellLaunchConfig.flowControl = false; + this._process = await this._launchLocalProcess(shellLaunchConfig, cols, rows, this.userHome, isScreenReaderModeEnabled); } } @@ -223,7 +229,7 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce return undefined; } - private async _launchProcess( + private async _launchLocalProcess( shellLaunchConfig: IShellLaunchConfig, cols: number, rows: number, diff --git a/src/vs/workbench/contrib/terminal/common/terminal.ts b/src/vs/workbench/contrib/terminal/common/terminal.ts index 78f4ef2a552..40e4f9f8963 100644 --- a/src/vs/workbench/contrib/terminal/common/terminal.ts +++ b/src/vs/workbench/contrib/terminal/common/terminal.ts @@ -138,6 +138,7 @@ export interface ITerminalConfiguration { localEchoStyle: 'bold' | 'dim' | 'italic' | 'underlined' | 'inverted' | string; serverSpawn: boolean; enablePersistentSessions: boolean; + flowControl: boolean; } export const DEFAULT_LOCAL_ECHO_EXCLUDE: ReadonlyArray = ['vim', 'vi', 'nano', 'tmux']; @@ -287,6 +288,11 @@ export interface IShellLaunchConfig { * a terminal used to drive some VS Code feature. */ isFeatureTerminal?: boolean; + + /** + * Whether flow control is enabled for this terminal. + */ + flowControl?: boolean; } /** diff --git a/src/vs/workbench/contrib/terminal/common/terminalConfiguration.ts b/src/vs/workbench/contrib/terminal/common/terminalConfiguration.ts index d2fe5d11fec..23ad043b2b2 100644 --- a/src/vs/workbench/contrib/terminal/common/terminalConfiguration.ts +++ b/src/vs/workbench/contrib/terminal/common/terminalConfiguration.ts @@ -397,6 +397,11 @@ export const terminalConfiguration: IConfigurationNode = { description: localize('terminal.integrated.enablePersistentSessions', "Experimental: persist terminal sessions for the workspace across window reloads. Currently only supported in VS Code Remote workspaces."), type: 'boolean', default: true + }, + 'terminal.integrated.flowControl': { + description: localize('terminal.integrated.flowControl', "Experimental: whether to enable flow control which will slow the program on the remote side to avoid flooding remote connections with terminal output. This setting has no effect for local terminals and terminals where the output/input is controlled by an extension. Changing this will only affect new terminals."), + type: 'boolean', + default: false } } }; diff --git a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts index 26c3c1f429e..04e1cbc797c 100644 --- a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts +++ b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts @@ -167,17 +167,16 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess this.onProcessReady(() => c()); }); ptyProcess.onData(data => { - const fakeLatency = 1000; - this._unacknowledgedCharCount += data.length; - setTimeout(() => { - this._onProcessData.fire(data); - }, fakeLatency); - if (!this._isPtyPaused && this._unacknowledgedCharCount > FlowControlConstants.HighWatermarkChars) { - console.log(`pause (${this._unacknowledgedCharCount} > ${FlowControlConstants.HighWatermarkChars}`); - this._isPtyPaused = true; - // TODO: Expose as public API in node-pty - (ptyProcess as any).pause(); + if (this._shellLaunchConfig.flowControl) { + this._unacknowledgedCharCount += data.length; + if (!this._isPtyPaused && this._unacknowledgedCharCount > FlowControlConstants.HighWatermarkChars) { + console.log(`pause (${this._unacknowledgedCharCount} > ${FlowControlConstants.HighWatermarkChars}`); + this._isPtyPaused = true; + // TODO: Expose as public API in node-pty + (ptyProcess as any).pause(); + } } + this._onProcessData.fire(data); if (this._closeTimeout) { clearTimeout(this._closeTimeout); this._queueProcessExit(); @@ -340,6 +339,9 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess } public acknowledgeDataEvent(charCount: number): void { + if (!this._shellLaunchConfig.flowControl) { + return; + } // Prevent lower than 0 to heal from errors this._unacknowledgedCharCount = Math.max(this._unacknowledgedCharCount - charCount, 0); console.log(`Ack ${charCount} chars (unacknowledged: ${this._unacknowledgedCharCount})`); From a79276dc649e921c043d28594262cea938b5f180 Mon Sep 17 00:00:00 2001 From: Daniel Imms Date: Thu, 14 Jan 2021 06:17:29 -0800 Subject: [PATCH 9/9] Move to log service --- src/vs/workbench/contrib/terminal/node/terminalProcess.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts index 04e1cbc797c..4d7816c1ea9 100644 --- a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts +++ b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts @@ -170,7 +170,7 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess if (this._shellLaunchConfig.flowControl) { this._unacknowledgedCharCount += data.length; if (!this._isPtyPaused && this._unacknowledgedCharCount > FlowControlConstants.HighWatermarkChars) { - console.log(`pause (${this._unacknowledgedCharCount} > ${FlowControlConstants.HighWatermarkChars}`); + this._logService.trace(`Flow control: Pause (${this._unacknowledgedCharCount} > ${FlowControlConstants.HighWatermarkChars})`); this._isPtyPaused = true; // TODO: Expose as public API in node-pty (ptyProcess as any).pause(); @@ -344,9 +344,9 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess } // Prevent lower than 0 to heal from errors this._unacknowledgedCharCount = Math.max(this._unacknowledgedCharCount - charCount, 0); - console.log(`Ack ${charCount} chars (unacknowledged: ${this._unacknowledgedCharCount})`); + this._logService.trace(`Flow control: Ack ${charCount} chars (unacknowledged: ${this._unacknowledgedCharCount})`); if (this._isPtyPaused && this._unacknowledgedCharCount < FlowControlConstants.LowWatermarkChars) { - console.log(`Resume (${this._unacknowledgedCharCount} < ${FlowControlConstants.LowWatermarkChars})`); + this._logService.trace(`Flow control: Resume (${this._unacknowledgedCharCount} < ${FlowControlConstants.LowWatermarkChars})`); // TODO: Expose as public API in node-pty (this._ptyProcess as any).resume(); this._isPtyPaused = false;