remote - merge two latency measurements into one (#184566)

* remote - remove original latency measurement

* remote - log high latency events to logger and telemetry
This commit is contained in:
Benjamin Pasero
2023-06-08 08:56:35 +02:00
committed by GitHub
parent e686621e2e
commit 70daab796e
5 changed files with 44 additions and 234 deletions
+1 -190
View File
@@ -3,7 +3,6 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
import { IntervalTimer } from 'vs/base/common/async';
import { VSBuffer } from 'vs/base/common/buffer';
import { Emitter, Event } from 'vs/base/common/event';
import { Disposable, DisposableStore, IDisposable } from 'vs/base/common/lifecycle';
@@ -264,9 +263,7 @@ const enum ProtocolMessageType {
ReplayRequest = 6,
Pause = 7,
Resume = 8,
KeepAlive = 9,
LatencyMeasurementRequest = 10,
LatencyMeasurementResponse = 11,
KeepAlive = 9
}
function protocolMessageTypeToString(messageType: ProtocolMessageType) {
@@ -280,8 +277,6 @@ function protocolMessageTypeToString(messageType: ProtocolMessageType) {
case ProtocolMessageType.Pause: return 'PauseWriting';
case ProtocolMessageType.Resume: return 'ResumeWriting';
case ProtocolMessageType.KeepAlive: return 'KeepAlive';
case ProtocolMessageType.LatencyMeasurementRequest: return 'LatencyMeasurementRequest';
case ProtocolMessageType.LatencyMeasurementResponse: return 'LatencyMeasurementResponse';
}
}
@@ -309,22 +304,6 @@ export const enum ProtocolConstants {
* Send a message every 5 seconds to avoid that the connection is closed by the OS.
*/
KeepAliveSendTime = 5000, // 5 seconds
/**
* Measure the latency every 1 minute.
*/
LatencySampleTime = 1 * 60 * 1000, // 1 minute
/**
* Keep the last 5 samples for latency measurement.
*/
LatencySampleCount = 5,
/**
* A latency over 1s will be considered high.
*/
HighLatencyTimeThreshold = 1000,
/**
* Having 3 or more samples with high latency will trigger a high latency event.
*/
HighLatencySampleThreshold = 3,
}
class ProtocolMessage {
@@ -803,52 +782,6 @@ export interface ILoadEstimator {
hasHighLoad(): boolean;
}
export const enum ConnectionHealth {
/**
* The connection health is considered good when a certain number of recent round trip time measurements are below a certain threshold.
* @see ProtocolConstants.HighLatencyTimeThreshold @see ProtocolConstants.HighLatencySampleThreshold
*/
Good,
/**
* The connection health is considered poor when a certain number of recent round trip time measurements are above a certain threshold.
* @see ProtocolConstants.HighLatencyTimeThreshold @see ProtocolConstants.HighLatencySampleThreshold
*/
Poor
}
export function connectionHealthToString(connectionHealth: ConnectionHealth): 'good' | 'poor' {
switch (connectionHealth) {
case ConnectionHealth.Good: return 'good';
case ConnectionHealth.Poor: return 'poor';
}
}
/**
* An event describing that the connection health has changed.
*/
export class ConnectionHealthChangedEvent {
constructor(
public readonly connectionHealth: ConnectionHealth
) { }
}
/**
* An event describing that a round trip time measurement was above a certain threshold.
*/
export class HighRoundTripTimeEvent {
constructor(
/**
* The round trip time in milliseconds.
*/
public readonly roundTripTime: number,
/**
* The number of recent round trip time measurements that were above the threshold.
* @see ProtocolConstants.HighLatencyTimeThreshold @see ProtocolConstants.HighLatencySampleThreshold
*/
public readonly recentHighRoundTripCount: number
) { }
}
export interface PersistentProtocolOptions {
/**
* The socket to use.
@@ -862,10 +795,6 @@ export interface PersistentProtocolOptions {
* The CPU load estimator to use.
*/
loadEstimator?: ILoadEstimator;
/**
* Whether to measure round trip time. Defaults to false.
*/
measureRoundTripTime?: boolean;
/**
* Whether to send keep alive messages. Defaults to true.
*/
@@ -898,11 +827,9 @@ export class PersistentProtocol implements IMessagePassingProtocol {
private _socket: ISocket;
private _socketWriter: ProtocolWriter;
private _socketReader: ProtocolReader;
private _socketLatencyMonitor: LatencyMonitor;
private _socketDisposables: DisposableStore;
private readonly _loadEstimator: ILoadEstimator;
private readonly _measureRoundTripTime: boolean;
private readonly _shouldSendKeepAlive: boolean;
private readonly _onControlMessage = new BufferedEmitter<VSBuffer>();
@@ -920,19 +847,12 @@ export class PersistentProtocol implements IMessagePassingProtocol {
private readonly _onSocketTimeout = new BufferedEmitter<SocketTimeoutEvent>();
readonly onSocketTimeout: Event<SocketTimeoutEvent> = this._onSocketTimeout.event;
private readonly _onHighRoundTripTime = new BufferedEmitter<HighRoundTripTimeEvent>();
readonly onHighRoundTripTime = this._onHighRoundTripTime.event;
private readonly _onDidChangeConnectionHealth = new BufferedEmitter<ConnectionHealth>();
readonly onDidChangeConnectionHealth = this._onDidChangeConnectionHealth.event;
public get unacknowledgedCount(): number {
return this._outgoingMsgId - this._outgoingAckId;
}
constructor(opts: PersistentProtocolOptions) {
this._loadEstimator = opts.loadEstimator ?? LoadEstimator.getInstance();
this._measureRoundTripTime = opts.measureRoundTripTime ?? false;
this._shouldSendKeepAlive = opts.sendKeepAlive ?? true;
this._isReconnecting = false;
this._outgoingUnackMsg = new Queue<ProtocolMessage>();
@@ -954,13 +874,6 @@ export class PersistentProtocol implements IMessagePassingProtocol {
this._socketReader = this._socketDisposables.add(new ProtocolReader(this._socket));
this._socketDisposables.add(this._socketReader.onMessage(msg => this._receiveMessage(msg)));
this._socketDisposables.add(this._socket.onClose(e => this._onSocketClose.fire(e)));
this._socketLatencyMonitor = this._socketDisposables.add(new LatencyMonitor()); // is started immediately
this._socketDisposables.add(this._socketLatencyMonitor.onSendLatencyRequest(buffer => this._sendLatencyMeasurementRequest(buffer)));
this._socketDisposables.add(this._socketLatencyMonitor.onHighRoundTripTime(e => this._onHighRoundTripTime.fire(e)));
this._socketDisposables.add(this._socketLatencyMonitor.onDidChangeConnectionHealth(e => this._onDidChangeConnectionHealth.fire(e)));
if (this._measureRoundTripTime) {
this._socketLatencyMonitor.start();
}
if (opts.initialChunk) {
this._socketReader.acceptChunk(opts.initialChunk);
@@ -1041,19 +954,12 @@ export class PersistentProtocol implements IMessagePassingProtocol {
this._socketReader = this._socketDisposables.add(new ProtocolReader(this._socket));
this._socketDisposables.add(this._socketReader.onMessage(msg => this._receiveMessage(msg)));
this._socketDisposables.add(this._socket.onClose(e => this._onSocketClose.fire(e)));
this._socketLatencyMonitor = this._socketDisposables.add(new LatencyMonitor()); // will be started later
this._socketDisposables.add(this._socketLatencyMonitor.onSendLatencyRequest(buffer => this._sendLatencyMeasurementRequest(buffer)));
this._socketDisposables.add(this._socketLatencyMonitor.onHighRoundTripTime(e => this._onHighRoundTripTime.fire(e)));
this._socketDisposables.add(this._socketLatencyMonitor.onDidChangeConnectionHealth(e => this._onDidChangeConnectionHealth.fire(e)));
this._socketReader.acceptChunk(initialDataChunk);
}
public endAcceptReconnection(): void {
this._isReconnecting = false;
if (this._measureRoundTripTime) {
this._socketLatencyMonitor.start();
}
// After a reconnection, let the other party know (again) which messages have been received.
// (perhaps the other party didn't receive a previous ACK)
@@ -1144,15 +1050,6 @@ export class PersistentProtocol implements IMessagePassingProtocol {
// nothing to do
break;
}
case ProtocolMessageType.LatencyMeasurementRequest: {
// we just send the data back
this._sendLatencyMeasurementResponse(msg.data);
break;
}
case ProtocolMessageType.LatencyMeasurementResponse: {
this._socketLatencyMonitor.handleResponse(msg.data);
break;
}
}
}
@@ -1282,92 +1179,6 @@ export class PersistentProtocol implements IMessagePassingProtocol {
const msg = new ProtocolMessage(ProtocolMessageType.KeepAlive, 0, this._incomingAckId, getEmptyBuffer());
this._socketWriter.write(msg);
}
private _sendLatencyMeasurementRequest(buffer: VSBuffer): void {
this._incomingAckId = this._incomingMsgId;
const msg = new ProtocolMessage(ProtocolMessageType.LatencyMeasurementRequest, 0, this._incomingAckId, buffer);
this._socketWriter.write(msg);
}
private _sendLatencyMeasurementResponse(buffer: VSBuffer): void {
this._incomingAckId = this._incomingMsgId;
const msg = new ProtocolMessage(ProtocolMessageType.LatencyMeasurementResponse, 0, this._incomingAckId, buffer);
this._socketWriter.write(msg);
}
}
class LatencyMonitor extends Disposable {
private readonly _onSendLatencyRequest = this._register(new Emitter<VSBuffer>());
readonly onSendLatencyRequest: Event<VSBuffer> = this._onSendLatencyRequest.event;
private readonly _onHighRoundTripTime = this._register(new Emitter<HighRoundTripTimeEvent>());
public readonly onHighRoundTripTime = this._onHighRoundTripTime.event;
private readonly _onDidChangeConnectionHealth = this._register(new Emitter<ConnectionHealth>());
public readonly onDidChangeConnectionHealth = this._onDidChangeConnectionHealth.event;
private readonly _measureLatencyTimer = this._register(new IntervalTimer());
/**
* Timestamp of our last latency request message sent to the other host.
*/
private _lastLatencyMeasurementSent: number = -1;
/**
* ID separate from the regular message IDs. Used to match up latency
* requests with responses so we know we're timing the right message
* even if a reconnection occurs.
*/
private _lastLatencyMeasurementId: number = 0;
/**
* Circular buffer of latency measurements
*/
private _latencySamples: number[] = Array.from({ length: ProtocolConstants.LatencySampleCount }, (_) => 0);
private _latencySampleIndex: number = 0;
private _connectionHealth = ConnectionHealth.Good;
constructor() {
super();
}
public start(): void {
this._measureLatencyTimer.cancelAndSet(() => {
this._lastLatencyMeasurementSent = Date.now();
const measurementId = ++this._lastLatencyMeasurementId;
const buffer = VSBuffer.alloc(4);
buffer.writeUInt32BE(measurementId, 0);
this._onSendLatencyRequest.fire(buffer);
}, ProtocolConstants.LatencySampleTime);
}
public handleResponse(buffer: VSBuffer): void {
if (buffer.byteLength !== 4) {
// invalid measurementId
return;
}
const measurementId = buffer.readUInt32BE(0);
if (this._lastLatencyMeasurementSent <= 0 || measurementId !== this._lastLatencyMeasurementId) {
// invalid measurementId
return;
}
const roundtripTime = Date.now() - this._lastLatencyMeasurementSent;
const sampleIndex = this._latencySampleIndex++;
this._latencySamples[sampleIndex % this._latencySamples.length] = roundtripTime;
const previousConnectionHealth = this._connectionHealth;
const highLatencySampleCount = this._latencySamples.filter(s => s >= ProtocolConstants.HighLatencyTimeThreshold).length;
this._connectionHealth = (highLatencySampleCount >= ProtocolConstants.HighLatencySampleThreshold ? ConnectionHealth.Poor : ConnectionHealth.Good);
if (roundtripTime > ProtocolConstants.HighLatencyTimeThreshold) {
this._onHighRoundTripTime.fire(new HighRoundTripTimeEvent(roundtripTime, highLatencySampleCount));
}
if (previousConnectionHealth !== this._connectionHealth) {
this._onDidChangeConnectionHealth.fire(this._connectionHealth);
}
}
}
// (() => {
@@ -13,7 +13,7 @@ import * as performance from 'vs/base/common/performance';
import { StopWatch } from 'vs/base/common/stopwatch';
import { generateUuid } from 'vs/base/common/uuid';
import { IIPCLogger } from 'vs/base/parts/ipc/common/ipc';
import { Client, ConnectionHealth, ISocket, PersistentProtocol, ProtocolConstants, SocketCloseEventType } from 'vs/base/parts/ipc/common/ipc.net';
import { Client, ISocket, PersistentProtocol, SocketCloseEventType } from 'vs/base/parts/ipc/common/ipc.net';
import { ILogService } from 'vs/platform/log/common/log';
import { RemoteAgentConnectionContext } from 'vs/platform/remote/common/remoteAgentEnvironment';
import { RemoteAuthorityResolverError, RemoteConnection } from 'vs/platform/remote/common/remoteAuthorityResolver';
@@ -250,7 +250,7 @@ async function connectToRemoteExtensionHostAgent<T extends RemoteConnection>(opt
protocol = options.reconnectionProtocol;
ownsProtocol = false;
} else {
protocol = new PersistentProtocol({ socket, measureRoundTripTime: true });
protocol = new PersistentProtocol({ socket });
ownsProtocol = true;
}
@@ -482,8 +482,7 @@ export const enum PersistentConnectionEventType {
ReconnectionWait,
ReconnectionRunning,
ReconnectionPermanentFailure,
ConnectionGain,
ConnectionHealthChanged
ConnectionGain
}
export class ConnectionLostEvent {
public readonly type = PersistentConnectionEventType.ConnectionLost;
@@ -521,13 +520,6 @@ export class ConnectionGainEvent {
public readonly attempt: number
) { }
}
export class ConnectionHealthChangedEvent {
public readonly type = PersistentConnectionEventType.ConnectionHealthChanged;
constructor(
public readonly reconnectionToken: string,
public readonly connectionHealth: ConnectionHealth
) { }
}
export class ReconnectionPermanentFailureEvent {
public readonly type = PersistentConnectionEventType.ReconnectionPermanentFailure;
constructor(
@@ -537,7 +529,7 @@ export class ReconnectionPermanentFailureEvent {
public readonly handled: boolean
) { }
}
export type PersistentConnectionEvent = ConnectionGainEvent | ConnectionHealthChangedEvent | ConnectionLostEvent | ReconnectionWaitEvent | ReconnectionRunningEvent | ReconnectionPermanentFailureEvent;
export type PersistentConnectionEvent = ConnectionGainEvent | ConnectionLostEvent | ReconnectionWaitEvent | ReconnectionRunningEvent | ReconnectionPermanentFailureEvent;
export abstract class PersistentConnection extends Disposable {
@@ -607,13 +599,6 @@ export abstract class PersistentConnection extends Disposable {
this._options.logService.info(`${logPrefix} received socket timeout event (unacknowledgedMsgCount: ${e.unacknowledgedMsgCount}, timeSinceOldestUnacknowledgedMsg: ${e.timeSinceOldestUnacknowledgedMsg}, timeSinceLastReceivedSomeData: ${e.timeSinceLastReceivedSomeData}).`);
this._beginReconnecting();
}));
this._register(protocol.onHighRoundTripTime((e) => {
const logPrefix = _commonLogPrefix(this._connectionType, this.reconnectionToken);
this._options.logService.info(`${logPrefix} high roundtrip time: ${e.roundTripTime}ms (${e.recentHighRoundTripCount} of ${ProtocolConstants.LatencySampleCount} recent samples)`);
}));
this._register(protocol.onDidChangeConnectionHealth((connectionHealth) => {
this._onDidStateChange.fire(new ConnectionHealthChangedEvent(this.reconnectionToken, connectionHealth));
}));
PersistentConnection._instances.push(this);
this._register(toDisposable(() => {
@@ -53,7 +53,6 @@ import { ILogService } from 'vs/platform/log/common/log';
import { ITimerService } from 'vs/workbench/services/timer/browser/timerService';
import { getRemoteName } from 'vs/platform/remote/common/remoteHosts';
import { IActionViewItem } from 'vs/base/browser/ui/actionbar/actionbar';
import { connectionHealthToString } from 'vs/base/parts/ipc/common/ipc.net';
import { getVirtualWorkspaceLocation } from 'vs/platform/workspace/common/virtualWorkspace';
import { IJSONSchema } from 'vs/base/common/jsonSchema';
import { IWalkthroughsService } from 'vs/workbench/contrib/welcomeGettingStarted/browser/gettingStartedService';
@@ -1090,26 +1089,6 @@ export class RemoteAgentConnectionStatusListener extends Disposable implements I
hideProgress();
break;
case PersistentConnectionEventType.ConnectionHealthChanged:
type RemoteConnectionHealthClassification = {
owner: 'alexdima';
comment: 'The remote connection health has changed (round trip time)';
remoteName: { classification: 'SystemMetaData'; purpose: 'PerformanceAndHealth'; comment: 'The name of the resolver.' };
reconnectionToken: { classification: 'SystemMetaData'; purpose: 'PerformanceAndHealth'; comment: 'The identifier of the connection.' };
connectionHealth: { classification: 'SystemMetaData'; purpose: 'PerformanceAndHealth'; comment: 'The health of the connection: good or poor.' };
};
type RemoteConnectionHealthEvent = {
remoteName: string | undefined;
reconnectionToken: string;
connectionHealth: 'good' | 'poor';
};
telemetryService.publicLog2<RemoteConnectionHealthEvent, RemoteConnectionHealthClassification>('remoteConnectionHealth', {
remoteName: getRemoteName(environmentService.remoteAuthority),
reconnectionToken: e.reconnectionToken,
connectionHealth: connectionHealthToString(e.connectionHealth)
});
break;
}
});
}
@@ -86,6 +86,7 @@ export class RemoteStatusIndicator extends Disposable implements IWorkbenchContr
private virtualWorkspaceLocation: { scheme: string; authority: string } | undefined = undefined;
private connectionState: 'initializing' | 'connected' | 'reconnecting' | 'disconnected' | undefined = undefined;
private connectionToken: string | undefined = undefined;
private readonly connectionStateContextKey = new RawContextKey<'' | 'initializing' | 'disconnected' | 'connected'>('remoteConnectionState', '').bindTo(this.contextKeyService);
private networkState: 'online' | 'offline' | 'high-latency' | undefined = undefined;
@@ -267,7 +268,8 @@ export class RemoteStatusIndicator extends Disposable implements IWorkbenchContr
// Try to resolve the authority to figure out connection state
(async () => {
try {
await this.remoteAuthorityResolverService.resolveAuthority(remoteAuthority);
const { authority } = await this.remoteAuthorityResolverService.resolveAuthority(remoteAuthority);
this.connectionToken = authority.connectionToken;
this.setConnectionState('connected');
} catch (error) {
@@ -302,8 +304,8 @@ export class RemoteStatusIndicator extends Disposable implements IWorkbenchContr
private scheduleMeasureNetworkConnectionLatency(): void {
if (
!this.remoteAuthority || // only when having a remote connection
this.measureNetworkConnectionLatencyScheduler // already scheduled
!this.remoteAuthority || // only when having a remote connection
this.measureNetworkConnectionLatencyScheduler // already scheduled
) {
return;
}
@@ -334,13 +336,46 @@ export class RemoteStatusIndicator extends Disposable implements IWorkbenchContr
private setNetworkState(newState: 'online' | 'offline' | 'high-latency'): void {
if (this.networkState !== newState) {
const oldState = this.networkState;
this.networkState = newState;
if (newState === 'high-latency') {
this.logService.warn(`Remote network connection appears to have high latency (${remoteConnectionLatencyMeasurer.latency?.current?.toFixed(2)}ms last, ${remoteConnectionLatencyMeasurer.latency?.average?.toFixed(2)}ms average)`);
}
if (this.connectionToken) {
if (newState === 'online' && oldState === 'high-latency') {
this.logNetworkConnectionHealthTelemetry(this.connectionToken, 'good');
} else if (newState === 'high-latency' && oldState === 'online') {
this.logNetworkConnectionHealthTelemetry(this.connectionToken, 'poor');
}
}
// update status
this.updateRemoteStatusIndicator();
}
}
private logNetworkConnectionHealthTelemetry(connectionToken: string, connectionHealth: 'good' | 'poor'): void {
type RemoteConnectionHealthClassification = {
owner: 'alexdima';
comment: 'The remote connection health has changed (round trip time)';
remoteName: { classification: 'SystemMetaData'; purpose: 'PerformanceAndHealth'; comment: 'The name of the resolver.' };
reconnectionToken: { classification: 'SystemMetaData'; purpose: 'PerformanceAndHealth'; comment: 'The identifier of the connection.' };
connectionHealth: { classification: 'SystemMetaData'; purpose: 'PerformanceAndHealth'; comment: 'The health of the connection: good or poor.' };
};
type RemoteConnectionHealthEvent = {
remoteName: string | undefined;
reconnectionToken: string;
connectionHealth: 'good' | 'poor';
};
this.telemetryService.publicLog2<RemoteConnectionHealthEvent, RemoteConnectionHealthClassification>('remoteConnectionHealth', {
remoteName: getRemoteName(this.remoteAuthority),
reconnectionToken: connectionToken,
connectionHealth
});
}
private validatedGroup(group: string) {
if (!group.match(/^(remote|virtualfs)_(\d\d)_(([a-z][a-z0-9+.-]*)_(.*))$/)) {
if (!this.loggedInvalidGroupNames[group]) {
@@ -232,7 +232,7 @@ const workbenchContributionsRegistry = Registry.as<IWorkbenchContributionsRegist
workbenchContributionsRegistry.registerWorkbenchContribution(LabelContribution, LifecyclePhase.Starting);
workbenchContributionsRegistry.registerWorkbenchContribution(RemoteChannelsContribution, LifecyclePhase.Restored);
workbenchContributionsRegistry.registerWorkbenchContribution(RemoteInvalidWorkspaceDetector, LifecyclePhase.Starting);
workbenchContributionsRegistry.registerWorkbenchContribution(InitialRemoteConnectionHealthContribution, LifecyclePhase.Ready);
workbenchContributionsRegistry.registerWorkbenchContribution(InitialRemoteConnectionHealthContribution, LifecyclePhase.Restored);
const enableDiagnostics = true;