diff --git a/src/vs/workbench/api/browser/mainThreadTerminalService.ts b/src/vs/workbench/api/browser/mainThreadTerminalService.ts index cf16787d23b..167eb17fc8e 100644 --- a/src/vs/workbench/api/browser/mainThreadTerminalService.ts +++ b/src/vs/workbench/api/browser/mainThreadTerminalService.ts @@ -4,7 +4,7 @@ *--------------------------------------------------------------------------------------------*/ import { DisposableStore, Disposable, IDisposable } from 'vs/base/common/lifecycle'; -import { IShellLaunchConfig, ITerminalProcessExtHostProxy, ISpawnExtHostProcessRequest, ITerminalDimensions, IAvailableShellsRequest, IDefaultShellAndArgsRequest, IStartExtensionTerminalRequest } from 'vs/workbench/contrib/terminal/common/terminal'; +import { IShellLaunchConfig, ITerminalProcessExtHostProxy, ISpawnExtHostProcessRequest, ITerminalDimensions, IAvailableShellsRequest, IDefaultShellAndArgsRequest, IStartExtensionTerminalRequest, ITerminalConfiguration, TERMINAL_CONFIG_SECTION } from 'vs/workbench/contrib/terminal/common/terminal'; import { ExtHostContext, ExtHostTerminalServiceShape, MainThreadTerminalServiceShape, MainContext, IExtHostContext, IShellLaunchConfigDto, TerminalLaunchConfig, ITerminalDimensionsDto, TerminalIdentifier } from 'vs/workbench/api/common/extHost.protocol'; import { extHostNamedCustomer } from 'vs/workbench/api/common/extHostCustomers'; import { URI } from 'vs/base/common/uri'; @@ -16,6 +16,7 @@ import { TerminalDataBufferer } from 'vs/workbench/contrib/terminal/common/termi import { IEnvironmentVariableService, ISerializableEnvironmentVariableCollection } from 'vs/workbench/contrib/terminal/common/environmentVariable'; import { deserializeEnvironmentVariableCollection, serializeEnvironmentVariableCollection } from 'vs/workbench/contrib/terminal/common/environmentVariableShared'; import { ILogService } from 'vs/platform/log/common/log'; +import { IConfigurationService } from 'vs/platform/configuration/common/configuration'; @extHostNamedCustomer(MainContext.MainThreadTerminalService) export class MainThreadTerminalService implements MainThreadTerminalServiceShape { @@ -46,6 +47,7 @@ export class MainThreadTerminalService implements MainThreadTerminalServiceShape @IRemoteAgentService private readonly _remoteAgentService: IRemoteAgentService, @IInstantiationService private readonly _instantiationService: IInstantiationService, @IEnvironmentVariableService private readonly _environmentVariableService: IEnvironmentVariableService, + @IConfigurationService private readonly _configurationService: IConfigurationService, @ILogService private readonly _logService: ILogService, ) { this._proxy = extHostContext.getProxy(ExtHostContext.ExtHostTerminalService); @@ -259,7 +261,8 @@ export class MainThreadTerminalService implements MainThreadTerminalServiceShape executable: request.shellLaunchConfig.executable, args: request.shellLaunchConfig.args, cwd: request.shellLaunchConfig.cwd, - env: request.shellLaunchConfig.env + env: request.shellLaunchConfig.env, + flowControl: this._configurationService.getValue(TERMINAL_CONFIG_SECTION).flowControl }; this._logService.trace('Spawning ext host process', { terminalId: proxy.terminalId, shellLaunchConfigDto, request }); @@ -272,6 +275,7 @@ export class MainThreadTerminalService implements MainThreadTerminalServiceShape request.isWorkspaceShellAllowed ).then(request.callback, request.callback); + proxy.onAcknowledgeDataEvent(charCount => this._proxy.$acceptProcessAckDataEvent(proxy.terminalId, charCount)); proxy.onInput(data => this._proxy.$acceptProcessInput(proxy.terminalId, data)); proxy.onResize(dimensions => this._proxy.$acceptProcessResize(proxy.terminalId, dimensions.cols, dimensions.rows)); proxy.onShutdown(immediate => this._proxy.$acceptProcessShutdown(proxy.terminalId, immediate)); diff --git a/src/vs/workbench/api/common/extHost.protocol.ts b/src/vs/workbench/api/common/extHost.protocol.ts index d997cd5f951..c2a8614457f 100644 --- a/src/vs/workbench/api/common/extHost.protocol.ts +++ b/src/vs/workbench/api/common/extHost.protocol.ts @@ -1508,6 +1508,7 @@ export interface IShellLaunchConfigDto { cwd?: string | UriComponents; env?: { [key: string]: string | null; }; hideFromUser?: boolean; + flowControl?: boolean; } export interface IShellDefinitionDto { @@ -1547,6 +1548,7 @@ export interface ExtHostTerminalServiceShape { $acceptTerminalMaximumDimensions(id: number, cols: number, rows: number): void; $spawnExtHostProcess(id: number, shellLaunchConfig: IShellLaunchConfigDto, activeWorkspaceRootUri: UriComponents | undefined, cols: number, rows: number, isWorkspaceShellAllowed: boolean): Promise; $startExtensionTerminal(id: number, initialDimensions: ITerminalDimensionsDto | undefined): Promise; + $acceptProcessAckDataEvent(id: number, charCount: number): void; $acceptProcessInput(id: number, data: string): void; $acceptProcessResize(id: number, cols: number, rows: number): void; $acceptProcessShutdown(id: number, immediate: boolean): void; diff --git a/src/vs/workbench/api/common/extHostTerminalService.ts b/src/vs/workbench/api/common/extHostTerminalService.ts index 00e09861cfb..5f6034363c2 100644 --- a/src/vs/workbench/api/common/extHostTerminalService.ts +++ b/src/vs/workbench/api/common/extHostTerminalService.ts @@ -220,6 +220,11 @@ export class ExtHostPseudoterminal implements ITerminalChildProcess { } } + acknowledgeDataEvent(charCount: number): void { + // No-op, flow control is not supported in extension owned terminals. If this is ever + // implemented it will need new pause and resume VS Code APIs. + } + getInitialCwd(): Promise { return Promise.resolve(''); } @@ -488,6 +493,10 @@ export abstract class BaseExtHostTerminalService extends Disposable implements I return disposables; } + public $acceptProcessAckDataEvent(id: number, charCount: number): void { + this._terminalProcesses.get(id)?.acknowledgeDataEvent(charCount); + } + public $acceptProcessInput(id: number, data: string): void { this._terminalProcesses.get(id)?.input(data); } diff --git a/src/vs/workbench/api/node/extHostTerminalService.ts b/src/vs/workbench/api/node/extHostTerminalService.ts index 127814dbb0a..adad271b38f 100644 --- a/src/vs/workbench/api/node/extHostTerminalService.ts +++ b/src/vs/workbench/api/node/extHostTerminalService.ts @@ -149,7 +149,8 @@ export class ExtHostTerminalService extends BaseExtHostTerminalService { executable: shellLaunchConfigDto.executable, args: shellLaunchConfigDto.args, cwd: typeof shellLaunchConfigDto.cwd === 'string' ? shellLaunchConfigDto.cwd : URI.revive(shellLaunchConfigDto.cwd), - env: shellLaunchConfigDto.env + env: shellLaunchConfigDto.env, + flowControl: shellLaunchConfigDto.flowControl }; // Merge in shell and args from settings diff --git a/src/vs/workbench/contrib/terminal/browser/remoteTerminalService.ts b/src/vs/workbench/contrib/terminal/browser/remoteTerminalService.ts index b2b5ec36d29..872e9a36fbf 100644 --- a/src/vs/workbench/contrib/terminal/browser/remoteTerminalService.ts +++ b/src/vs/workbench/contrib/terminal/browser/remoteTerminalService.ts @@ -252,6 +252,10 @@ export class RemoteTerminalProcess extends Disposable implements ITerminalChildP }); } + public acknowledgeDataEvent(charCount: number): void { + // TODO: Support flow control for server spawned processes + } + public async getInitialCwd(): Promise { await this._startBarrier.wait(); return this._remoteTerminalChannel.getTerminalInitialCwd(this._remoteTerminalId); diff --git a/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts b/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts index ff6244ea384..59eee00f4d9 100644 --- a/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts +++ b/src/vs/workbench/contrib/terminal/browser/terminalInstance.ts @@ -1020,7 +1020,12 @@ export class TerminalInstance extends Disposable implements ITerminalInstance { this._xtermCore?.writeSync(ev.data); } else { const messageId = ++this._latestXtermWriteData; - this._xterm?.write(ev.data, () => this._latestXtermParseData = messageId); + this._xterm?.write(ev.data, () => { + this._latestXtermParseData = messageId; + if (this._shellLaunchConfig.flowControl) { + this._processManager.acknowledgeDataEvent(ev.data.length); + } + }); } } diff --git a/src/vs/workbench/contrib/terminal/browser/terminalProcessExtHostProxy.ts b/src/vs/workbench/contrib/terminal/browser/terminalProcessExtHostProxy.ts index c7069f89ff0..0be5cf88be5 100644 --- a/src/vs/workbench/contrib/terminal/browser/terminalProcessExtHostProxy.ts +++ b/src/vs/workbench/contrib/terminal/browser/terminalProcessExtHostProxy.ts @@ -34,6 +34,8 @@ export class TerminalProcessExtHostProxy extends Disposable implements ITerminal public readonly onInput: Event = this._onInput.event; private readonly _onResize: Emitter<{ cols: number, rows: number }> = this._register(new Emitter<{ cols: number, rows: number }>()); public readonly onResize: Event<{ cols: number, rows: number }> = this._onResize.event; + private readonly _onAcknowledgeDataEvent = this._register(new Emitter()); + public readonly onAcknowledgeDataEvent: Event = this._onAcknowledgeDataEvent.event; private readonly _onShutdown = this._register(new Emitter()); public readonly onShutdown: Event = this._onShutdown.event; private readonly _onRequestInitialCwd = this._register(new Emitter()); @@ -139,6 +141,10 @@ export class TerminalProcessExtHostProxy extends Disposable implements ITerminal this._onResize.fire({ cols, rows }); } + public acknowledgeDataEvent(charCount: number): void { + this._onAcknowledgeDataEvent.fire(charCount); + } + public getInitialCwd(): Promise { return new Promise(resolve => { this._onRequestInitialCwd.fire(); diff --git a/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts b/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts index 043a6e04af7..1fe02f4fed4 100644 --- a/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts +++ b/src/vs/workbench/contrib/terminal/browser/terminalProcessManager.ts @@ -6,7 +6,7 @@ import * as platform from 'vs/base/common/platform'; import * as terminalEnvironment from 'vs/workbench/contrib/terminal/common/terminalEnvironment'; import { env as processEnv } from 'vs/base/common/process'; -import { ProcessState, ITerminalProcessManager, IShellLaunchConfig, ITerminalConfigHelper, ITerminalChildProcess, IBeforeProcessDataEvent, ITerminalEnvironment, ITerminalLaunchError, IProcessDataEvent, ITerminalDimensionsOverride } from 'vs/workbench/contrib/terminal/common/terminal'; +import { ProcessState, ITerminalProcessManager, IShellLaunchConfig, ITerminalConfigHelper, ITerminalChildProcess, IBeforeProcessDataEvent, ITerminalEnvironment, ITerminalLaunchError, IProcessDataEvent, ITerminalDimensionsOverride, FlowControlConstants } from 'vs/workbench/contrib/terminal/common/terminal'; import { ILogService } from 'vs/platform/log/common/log'; import { Emitter, Event } from 'vs/base/common/event'; import { IHistoryService } from 'vs/workbench/services/history/common/history'; @@ -64,6 +64,7 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce private _initialCwd: string | undefined; private _extEnvironmentVariableCollection: IMergedEnvironmentVariableCollection | undefined; private _environmentVariableInfo: IEnvironmentVariableInfo | undefined; + private _ackDataBufferer: AckDataBufferer; private readonly _onProcessReady = this._register(new Emitter()); public get onProcessReady(): Event { return this._onProcessReady.event; } @@ -111,6 +112,7 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce }); }); this.ptyProcessReady.then(async () => await this.getLatency()); + this._ackDataBufferer = new AckDataBufferer(e => this._process?.acknowledgeDataEvent(e)); } public dispose(immediate: boolean = false): void { @@ -131,7 +133,10 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce rows: number, isScreenReaderModeEnabled: boolean ): Promise { + shellLaunchConfig.flowControl = this._configHelper.config.flowControl; if (shellLaunchConfig.isExtensionTerminal) { + // Flow control is not supported for extension terminals + shellLaunchConfig.flowControl = false; this._processType = ProcessType.ExtensionTerminal; this._process = this._instantiationService.createInstance(TerminalProcessExtHostProxy, this._terminalId, shellLaunchConfig, undefined, cols, rows, this._configHelper); } else { @@ -167,7 +172,10 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce this._process = this._instantiationService.createInstance(TerminalProcessExtHostProxy, this._terminalId, shellLaunchConfig, activeWorkspaceRootUri, cols, rows, this._configHelper); } } else { - this._process = await this._launchProcess(shellLaunchConfig, cols, rows, this.userHome, isScreenReaderModeEnabled); + // Flow control is not needed for ptys hosted in the same process (ie. the electron + // renderer). + shellLaunchConfig.flowControl = false; + this._process = await this._launchLocalProcess(shellLaunchConfig, cols, rows, this.userHome, isScreenReaderModeEnabled); } } @@ -221,7 +229,7 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce return undefined; } - private async _launchProcess( + private async _launchLocalProcess( shellLaunchConfig: IShellLaunchConfig, cols: number, rows: number, @@ -331,6 +339,10 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce return Promise.resolve(this._latency); } + public acknowledgeDataEvent(charCount: number): void { + this._ackDataBufferer.ack(charCount); + } + private _onExit(exitCode: number | undefined): void { this._process = null; @@ -359,3 +371,20 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce this._onEnvironmentVariableInfoChange.fire(this._environmentVariableInfo); } } + +class AckDataBufferer { + private _unsentCharCount: number = 0; + + constructor( + private readonly _callback: (charCount: number) => void + ) { + } + + ack(charCount: number) { + this._unsentCharCount += charCount; + while (this._unsentCharCount > FlowControlConstants.CharCountAckSize) { + this._unsentCharCount -= FlowControlConstants.CharCountAckSize; + this._callback(FlowControlConstants.CharCountAckSize); + } + } +} diff --git a/src/vs/workbench/contrib/terminal/common/terminal.ts b/src/vs/workbench/contrib/terminal/common/terminal.ts index a3b3f9eff25..40e4f9f8963 100644 --- a/src/vs/workbench/contrib/terminal/common/terminal.ts +++ b/src/vs/workbench/contrib/terminal/common/terminal.ts @@ -138,6 +138,7 @@ export interface ITerminalConfiguration { localEchoStyle: 'bold' | 'dim' | 'italic' | 'underlined' | 'inverted' | string; serverSpawn: boolean; enablePersistentSessions: boolean; + flowControl: boolean; } export const DEFAULT_LOCAL_ECHO_EXCLUDE: ReadonlyArray = ['vim', 'vi', 'nano', 'tmux']; @@ -287,6 +288,11 @@ export interface IShellLaunchConfig { * a terminal used to drive some VS Code feature. */ isFeatureTerminal?: boolean; + + /** + * Whether flow control is enabled for this terminal. + */ + flowControl?: boolean; } /** @@ -379,6 +385,7 @@ export interface ITerminalProcessManager extends IDisposable { createProcess(shellLaunchConfig: IShellLaunchConfig, cols: number, rows: number, isScreenReaderModeEnabled: boolean): Promise; write(data: string): void; setDimensions(cols: number, rows: number): void; + acknowledgeDataEvent(charCount: number): void; getInitialCwd(): Promise; getCwd(): Promise; @@ -419,6 +426,7 @@ export interface ITerminalProcessExtHostProxy extends IDisposable { onInput: Event; onResize: Event<{ cols: number, rows: number }>; + onAcknowledgeDataEvent: Event; onShutdown: Event; onRequestInitialCwd: Event; onRequestCwd: Event; @@ -507,11 +515,42 @@ export interface ITerminalChildProcess { input(data: string): void; resize(cols: number, rows: number): void; + /** + * Acknowledge a data event has been parsed by the terminal, this is used to implement flow + * control to ensure remote processes to not get too far ahead of the client and flood the + * connection. + * @param charCount The number of characters being acknowledged. + */ + acknowledgeDataEvent(charCount: number): void; + getInitialCwd(): Promise; getCwd(): Promise; getLatency(): Promise; } +export const enum FlowControlConstants { + /** + * The number of _unacknowledged_ chars to have been sent before the pty is paused in order for + * the client to catch up. + */ + HighWatermarkChars = 100000, + /** + * After flow control pauses the pty for the client the catch up, this is the number of + * _unacknowledged_ chars to have been caught up to on the client before resuming the pty again. + * This is used to attempt to prevent pauses in the flowing data; ideally while the pty is + * paused the number of unacknowledged chars would always be greater than 0 or the client will + * appear to stutter. In reality this balance is hard to accomplish though so heavy commands + * will likely pause as latency grows, not flooding the connection is the important thing as + * it's shared with other core functionality. + */ + LowWatermarkChars = 5000, + /** + * The number characters that are accumulated on the client side before sending an ack event. + * This must be less than or equal to LowWatermarkChars or the terminal max never unpause. + */ + CharCountAckSize = 5000 +} + export const enum TERMINAL_COMMAND_ID { FIND_NEXT = 'workbench.action.terminal.findNext', FIND_PREVIOUS = 'workbench.action.terminal.findPrevious', diff --git a/src/vs/workbench/contrib/terminal/common/terminalConfiguration.ts b/src/vs/workbench/contrib/terminal/common/terminalConfiguration.ts index d2fe5d11fec..23ad043b2b2 100644 --- a/src/vs/workbench/contrib/terminal/common/terminalConfiguration.ts +++ b/src/vs/workbench/contrib/terminal/common/terminalConfiguration.ts @@ -397,6 +397,11 @@ export const terminalConfiguration: IConfigurationNode = { description: localize('terminal.integrated.enablePersistentSessions', "Experimental: persist terminal sessions for the workspace across window reloads. Currently only supported in VS Code Remote workspaces."), type: 'boolean', default: true + }, + 'terminal.integrated.flowControl': { + description: localize('terminal.integrated.flowControl', "Experimental: whether to enable flow control which will slow the program on the remote side to avoid flooding remote connections with terminal output. This setting has no effect for local terminals and terminals where the output/input is controlled by an extension. Changing this will only affect new terminals."), + type: 'boolean', + default: false } } }; diff --git a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts index e0abbb31873..4d7816c1ea9 100644 --- a/src/vs/workbench/contrib/terminal/node/terminalProcess.ts +++ b/src/vs/workbench/contrib/terminal/node/terminalProcess.ts @@ -11,7 +11,7 @@ import * as os from 'os'; import { Event, Emitter } from 'vs/base/common/event'; import { getWindowsBuildNumber } from 'vs/workbench/contrib/terminal/node/terminal'; import { Disposable } from 'vs/base/common/lifecycle'; -import { IShellLaunchConfig, ITerminalChildProcess, ITerminalLaunchError } from 'vs/workbench/contrib/terminal/common/terminal'; +import { IShellLaunchConfig, ITerminalChildProcess, ITerminalDimensionsOverride, ITerminalLaunchError, FlowControlConstants } from 'vs/workbench/contrib/terminal/common/terminal'; import { exec } from 'child_process'; import { ILogService } from 'vs/platform/log/common/log'; import { stat } from 'vs/base/node/pfs'; @@ -41,6 +41,9 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess private readonly _initialCwd: string; private readonly _ptyOptions: pty.IPtyForkOptions | pty.IWindowsPtyForkOptions; + private _isPtyPaused: boolean = false; + private _unacknowledgedCharCount: number = 0; + public get exitMessage(): string | undefined { return this._exitMessage; } private readonly _onProcessData = this._register(new Emitter()); @@ -98,6 +101,8 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess })); } } + onProcessOverrideDimensions?: Event | undefined; + onProcessResolvedShellLaunchConfig?: Event | undefined; public async start(): Promise { const results = await Promise.all([this._validateCwd(), this._validateExecutable()]); @@ -162,6 +167,15 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess this.onProcessReady(() => c()); }); ptyProcess.onData(data => { + if (this._shellLaunchConfig.flowControl) { + this._unacknowledgedCharCount += data.length; + if (!this._isPtyPaused && this._unacknowledgedCharCount > FlowControlConstants.HighWatermarkChars) { + this._logService.trace(`Flow control: Pause (${this._unacknowledgedCharCount} > ${FlowControlConstants.HighWatermarkChars})`); + this._isPtyPaused = true; + // TODO: Expose as public API in node-pty + (ptyProcess as any).pause(); + } + } this._onProcessData.fire(data); if (this._closeTimeout) { clearTimeout(this._closeTimeout); @@ -324,6 +338,21 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess } } + public acknowledgeDataEvent(charCount: number): void { + if (!this._shellLaunchConfig.flowControl) { + return; + } + // Prevent lower than 0 to heal from errors + this._unacknowledgedCharCount = Math.max(this._unacknowledgedCharCount - charCount, 0); + this._logService.trace(`Flow control: Ack ${charCount} chars (unacknowledged: ${this._unacknowledgedCharCount})`); + if (this._isPtyPaused && this._unacknowledgedCharCount < FlowControlConstants.LowWatermarkChars) { + this._logService.trace(`Flow control: Resume (${this._unacknowledgedCharCount} < ${FlowControlConstants.LowWatermarkChars})`); + // TODO: Expose as public API in node-pty + (this._ptyProcess as any).resume(); + this._isPtyPaused = false; + } + } + public getInitialCwd(): Promise { return Promise.resolve(this._initialCwd); }