diff --git a/src/vs/platform/agentHost/common/state/jsonSerialization.ts b/src/vs/platform/agentHost/common/state/jsonSerialization.ts index aeb12524c27..356128af148 100644 --- a/src/vs/platform/agentHost/common/state/jsonSerialization.ts +++ b/src/vs/platform/agentHost/common/state/jsonSerialization.ts @@ -10,22 +10,19 @@ import { URI } from '../../../../base/common/uri.js'; /** - * JSON.stringify replacer that serializes {@link URI} instances and Maps - * into a revivable format. + * JSON.stringify replacer that serializes {@link URI} instances + * into a wire-friendly format. */ export function protocolReplacer(_key: string, value: unknown): unknown { if (value instanceof URI) { return value.toJSON(); } - if (value instanceof Map) { - return { $type: 'Map', entries: [...value.entries()] }; - } return value; } /** - * JSON.parse reviver that restores {@link URI} instances and Maps from - * their serialized format. + * JSON.parse reviver that restores {@link URI} instances from their + * serialized format. */ export function protocolReviver(_key: string, value: unknown): unknown { if (value && typeof value === 'object') { @@ -33,9 +30,6 @@ export function protocolReviver(_key: string, value: unknown): unknown { if (obj.$mid === 1) { return URI.revive(value as URI); } - if (obj.$type === 'Map' && Array.isArray(obj.entries)) { - return new Map(obj.entries as [unknown, unknown][]); - } } return value; } diff --git a/src/vs/platform/agentHost/common/state/sessionActions.ts b/src/vs/platform/agentHost/common/state/sessionActions.ts index 21a41879e14..a8b33f048ab 100644 --- a/src/vs/platform/agentHost/common/state/sessionActions.ts +++ b/src/vs/platform/agentHost/common/state/sessionActions.ts @@ -44,10 +44,11 @@ export interface IActionEnvelope { */ readonly origin: IActionOrigin | undefined; /** - * Set to `true` when the server rejected the command that produced this - * action. The client should revert its optimistic prediction. + * When present, indicates the server rejected the action. The client + * should revert its optimistic prediction. Contains a human-readable + * explanation (e.g. `"no active turn to cancel"`). */ - readonly rejected?: true; + readonly rejectionReason?: string; } export interface IActionOrigin { diff --git a/src/vs/platform/agentHost/common/state/sessionClientState.ts b/src/vs/platform/agentHost/common/state/sessionClientState.ts index 3d26433161d..de1a5333821 100644 --- a/src/vs/platform/agentHost/common/state/sessionClientState.ts +++ b/src/vs/platform/agentHost/common/state/sessionClientState.ts @@ -181,7 +181,7 @@ export class SessionClientState extends Disposable { const headIdx = this._pendingActions.findIndex(p => p.clientSeq === origin.clientSeq); if (headIdx !== -1) { - if (envelope.rejected) { + if (envelope.rejectionReason) { this._pendingActions.splice(headIdx, 1); } else { this._applyToConfirmed(envelope.action); diff --git a/src/vs/platform/agentHost/common/state/sessionProtocol.ts b/src/vs/platform/agentHost/common/state/sessionProtocol.ts index 9a2380278b2..3db9024a3b4 100644 --- a/src/vs/platform/agentHost/common/state/sessionProtocol.ts +++ b/src/vs/platform/agentHost/common/state/sessionProtocol.ts @@ -7,12 +7,12 @@ // See protocol.md for the full design. // // Client → Server messages are either: -// - Notifications (fire-and-forget): initialize, reconnect, unsubscribe, dispatchAction -// - Requests (expect a correlated response): subscribe, createSession, disposeSession, -// listSessions, fetchTurns, fetchContent +// - Notifications (fire-and-forget): unsubscribe, dispatchAction +// - Requests (expect a correlated response): initialize, reconnect, subscribe, +// createSession, disposeSession, listSessions, fetchTurns, fetchContent // // Server → Client messages are either: -// - Notifications (pushed to clients): serverHello, reconnectResponse, action, notification +// - Notifications (pushed to clients): action, notification // - Responses (correlated to a client request by id) import { hasKey } from '../../../../base/common/types.js'; @@ -79,6 +79,24 @@ export function isJsonRpcResponse(msg: IProtocolMessage): msg is IJsonRpcRespons export const JSON_RPC_PARSE_ERROR = -32700; export const JSON_RPC_INTERNAL_ERROR = -32603; +// ---- AHP application error codes ------------------------------------------- + +export const AHP_SESSION_NOT_FOUND = -32001; +export const AHP_PROVIDER_NOT_FOUND = -32002; +export const AHP_SESSION_ALREADY_EXISTS = -32003; +export const AHP_TURN_IN_PROGRESS = -32004; +export const AHP_UNSUPPORTED_PROTOCOL_VERSION = -32005; +export const AHP_CONTENT_NOT_FOUND = -32006; + +/** + * Error with a JSON-RPC error code for protocol-level failures. + */ +export class ProtocolError extends Error { + constructor(readonly code: number, message: string) { + super(message); + } +} + // ---- Shared data types ------------------------------------------------------ /** State snapshot returned by subscribe and included in handshake/reconnect. */ @@ -90,18 +108,6 @@ export interface IStateSnapshot { // ---- Client → Server: Notification params ----------------------------------- -export interface IInitializeParams { - readonly protocolVersion: number; - readonly clientId: string; - readonly initialSubscriptions?: readonly URI[]; -} - -export interface IReconnectParams { - readonly clientId: string; - readonly lastSeenServerSeq: number; - readonly subscriptions: readonly URI[]; -} - export interface IUnsubscribeParams { readonly resource: URI; } @@ -113,6 +119,38 @@ export interface IDispatchActionParams { // ---- Client → Server: Request params and results ---------------------------- +export interface IInitializeParams { + readonly protocolVersion: number; + readonly clientId: string; + readonly initialSubscriptions?: readonly URI[]; +} + +export interface IInitializeResult { + readonly protocolVersion: number; + readonly serverSeq: number; + readonly snapshots: readonly IStateSnapshot[]; +} + +export interface IReconnectParams { + readonly clientId: string; + readonly lastSeenServerSeq: number; + readonly subscriptions: readonly URI[]; +} + +export type IReconnectResult = + | IReconnectReplayResult + | IReconnectSnapshotResult; + +export interface IReconnectReplayResult { + readonly type: 'replay'; + readonly actions: readonly IActionEnvelope[]; +} + +export interface IReconnectSnapshotResult { + readonly type: 'snapshot'; + readonly snapshots: readonly IStateSnapshot[]; +} + export interface ISubscribeParams { readonly resource: URI; } @@ -142,15 +180,13 @@ export interface IListSessionsResult { export interface IFetchTurnsParams { readonly session: URI; - readonly startTurn: number; - readonly count: number; + readonly before?: string; + readonly limit?: number; } export interface IFetchTurnsResult { - readonly session: URI; - readonly startTurn: number; readonly turns: ISessionState['turns']; - readonly totalTurns: number; + readonly hasMore: boolean; } export interface IFetchContentParams { @@ -158,24 +194,13 @@ export interface IFetchContentParams { } export interface IFetchContentResult { - readonly uri: URI; - readonly data: string; // base64-encoded for binary safety + readonly data: string; + readonly encoding: 'base64' | 'utf-8'; readonly mimeType?: string; } // ---- Server → Client: Notification params ----------------------------------- -export interface IServerHelloParams { - readonly protocolVersion: number; - readonly serverSeq: number; - readonly snapshots: readonly IStateSnapshot[]; -} - -export interface IReconnectResponseParams { - readonly serverSeq: number; - readonly snapshots: readonly IStateSnapshot[]; -} - export interface IActionBroadcastParams { readonly envelope: IActionEnvelope; } diff --git a/src/vs/platform/agentHost/common/state/sessionReducers.ts b/src/vs/platform/agentHost/common/state/sessionReducers.ts index f0799879b51..400f51a2d21 100644 --- a/src/vs/platform/agentHost/common/state/sessionReducers.ts +++ b/src/vs/platform/agentHost/common/state/sessionReducers.ts @@ -20,7 +20,6 @@ import { type IErrorInfo, type IRootState, type ISessionState, - type IToolCallState, type ITurn, createActiveTurn, SessionLifecycle, @@ -100,50 +99,49 @@ export function sessionReducer(state: ISessionState, action: ISessionAction): IS if (!state.activeTurn || state.activeTurn.id !== action.turnId) { return state; } - const toolCalls = new Map(state.activeTurn.toolCalls); - toolCalls.set(action.toolCall.toolCallId, action.toolCall); return { ...state, - activeTurn: { ...state.activeTurn, toolCalls }, + activeTurn: { + ...state.activeTurn, + toolCalls: { ...state.activeTurn.toolCalls, [action.toolCall.toolCallId]: action.toolCall }, + }, }; } case 'session/toolComplete': { if (!state.activeTurn || state.activeTurn.id !== action.turnId) { return state; } - const toolCall = state.activeTurn.toolCalls.get(action.toolCallId); + const toolCall = state.activeTurn.toolCalls[action.toolCallId]; if (!toolCall) { return state; } - const toolCalls = new Map(state.activeTurn.toolCalls); - toolCalls.set(action.toolCallId, { - ...toolCall, - status: action.result.success ? ToolCallStatus.Completed : ToolCallStatus.Failed, - pastTenseMessage: action.result.pastTenseMessage, - toolOutput: action.result.toolOutput, - error: action.result.error, - }); return { ...state, - activeTurn: { ...state.activeTurn, toolCalls }, + activeTurn: { + ...state.activeTurn, + toolCalls: { + ...state.activeTurn.toolCalls, + [action.toolCallId]: { + ...toolCall, + status: action.result.success ? ToolCallStatus.Completed : ToolCallStatus.Failed, + pastTenseMessage: action.result.pastTenseMessage, + toolOutput: action.result.toolOutput, + error: action.result.error, + }, + }, + }, }; } case 'session/permissionRequest': { if (!state.activeTurn || state.activeTurn.id !== action.turnId) { return state; } - const pendingPermissions = new Map(state.activeTurn.pendingPermissions); - pendingPermissions.set(action.request.requestId, action.request); - let toolCalls: ReadonlyMap = state.activeTurn.toolCalls; + const pendingPermissions = { ...state.activeTurn.pendingPermissions, [action.request.requestId]: action.request }; + let toolCalls = state.activeTurn.toolCalls; if (action.request.toolCallId) { - const toolCall = toolCalls.get(action.request.toolCallId); + const toolCall = toolCalls[action.request.toolCallId]; if (toolCall) { - const mutable = new Map(toolCalls); - mutable.set(action.request.toolCallId, { - ...toolCall, - status: ToolCallStatus.PendingPermission, - }); - toolCalls = mutable; + toolCalls = { ...toolCalls, [action.request.toolCallId]: { ...toolCall, status: ToolCallStatus.PendingPermission } }; } } return { @@ -155,21 +153,21 @@ export function sessionReducer(state: ISessionState, action: ISessionAction): IS if (!state.activeTurn || state.activeTurn.id !== action.turnId) { return state; } - const pendingPermissions = new Map(state.activeTurn.pendingPermissions); - const resolved = pendingPermissions.get(action.requestId); - pendingPermissions.delete(action.requestId); - let toolCalls: ReadonlyMap = state.activeTurn.toolCalls; + const resolved = state.activeTurn.pendingPermissions[action.requestId]; + const { [action.requestId]: _, ...pendingPermissions } = state.activeTurn.pendingPermissions; + let toolCalls = state.activeTurn.toolCalls; if (resolved?.toolCallId) { - const toolCall = toolCalls.get(resolved.toolCallId); + const toolCall = toolCalls[resolved.toolCallId]; if (toolCall && toolCall.status === ToolCallStatus.PendingPermission) { - const mutable = new Map(toolCalls); - mutable.set(resolved.toolCallId, { - ...toolCall, - status: action.approved ? ToolCallStatus.Running : ToolCallStatus.Cancelled, - confirmed: action.approved ? 'user-action' : 'denied', - cancellationReason: action.approved ? undefined : 'denied', - }); - toolCalls = mutable; + toolCalls = { + ...toolCalls, + [resolved.toolCallId]: { + ...toolCall, + status: action.approved ? ToolCallStatus.Running : ToolCallStatus.Cancelled, + confirmed: action.approved ? 'user-action' : 'denied', + cancellationReason: action.approved ? undefined : 'denied', + }, + }; } } return { @@ -237,7 +235,7 @@ function finalizeTurn(state: ISessionState, turnId: string, turnState: TurnState const active = state.activeTurn; const completedToolCalls: ICompletedToolCall[] = []; - for (const tc of active.toolCalls.values()) { + for (const tc of Object.values(active.toolCalls)) { completedToolCalls.push({ toolCallId: tc.toolCallId, toolName: tc.toolName, diff --git a/src/vs/platform/agentHost/common/state/sessionState.ts b/src/vs/platform/agentHost/common/state/sessionState.ts index 85772454df4..c6872559708 100644 --- a/src/vs/platform/agentHost/common/state/sessionState.ts +++ b/src/vs/platform/agentHost/common/state/sessionState.ts @@ -138,8 +138,8 @@ export interface IActiveTurn { readonly userMessage: IUserMessage; readonly streamingText: string; readonly responseParts: readonly IResponsePart[]; - readonly toolCalls: ReadonlyMap; - readonly pendingPermissions: ReadonlyMap; + readonly toolCalls: Readonly>; + readonly pendingPermissions: Readonly>; readonly reasoning: string; readonly usage: IUsageInfo | undefined; } @@ -281,8 +281,8 @@ export function createActiveTurn(id: string, userMessage: IUserMessage): IActive userMessage, streamingText: '', responseParts: [], - toolCalls: new Map(), - pendingPermissions: new Map(), + toolCalls: {}, + pendingPermissions: {}, reasoning: '', usage: undefined, }; diff --git a/src/vs/platform/agentHost/common/state/versions/v1.ts b/src/vs/platform/agentHost/common/state/versions/v1.ts index 78a66215c64..1df83cd7f22 100644 --- a/src/vs/platform/agentHost/common/state/versions/v1.ts +++ b/src/vs/platform/agentHost/common/state/versions/v1.ts @@ -80,8 +80,8 @@ export interface IV1_ActiveTurn { readonly userMessage: IV1_UserMessage; readonly streamingText: string; readonly responseParts: readonly IV1_ResponsePart[]; - readonly toolCalls: ReadonlyMap; - readonly pendingPermissions: ReadonlyMap; + readonly toolCalls: Readonly>; + readonly pendingPermissions: Readonly>; readonly reasoning: string; readonly usage: IV1_UsageInfo | undefined; } @@ -168,6 +168,11 @@ export interface IV1_AgentsChangedAction { readonly agents: readonly IV1_AgentInfo[]; } +export interface IV1_ActiveSessionsChangedAction { + readonly type: 'root/activeSessionsChanged'; + readonly activeSessions: number; +} + export interface IV1_SessionReadyAction extends IV1_SessionActionBase { readonly type: 'session/ready'; } diff --git a/src/vs/platform/agentHost/common/state/versions/versionRegistry.ts b/src/vs/platform/agentHost/common/state/versions/versionRegistry.ts index c650d977a9a..c629f1c8788 100644 --- a/src/vs/platform/agentHost/common/state/versions/versionRegistry.ts +++ b/src/vs/platform/agentHost/common/state/versions/versionRegistry.ts @@ -7,6 +7,7 @@ // See ../AGENTS.md for modification instructions. import type { + IActiveSessionsChangedAction, IAgentsChangedAction, IDeltaAction, IModelChangedAction, @@ -49,6 +50,7 @@ import type { } from '../sessionState.js'; import type { + IV1_ActiveSessionsChangedAction, IV1_ActiveTurn, IV1_AgentInfo, IV1_AgentsChangedAction, @@ -135,6 +137,7 @@ type _v1_ErrorInfo = AssertCompatible; // -- v1 action compatibility -- type _v1_AgentsChanged = AssertCompatible; +type _v1_ActiveSessionsChanged = AssertCompatible; type _v1_SessionReady = AssertCompatible; type _v1_CreationFailed = AssertCompatible; type _v1_TurnStarted = AssertCompatible; @@ -159,7 +162,7 @@ void (0 as unknown as _v1_ActiveTurn & _v1_MarkdownResponsePart & _v1_ContentRef & _v1_ToolCallState & _v1_CompletedToolCall & _v1_PermissionRequest & _v1_UsageInfo & _v1_ErrorInfo & - _v1_AgentsChanged & _v1_SessionReady & _v1_CreationFailed & + _v1_AgentsChanged & _v1_ActiveSessionsChanged & _v1_SessionReady & _v1_CreationFailed & _v1_TurnStarted & _v1_Delta & _v1_ResponsePart & _v1_ToolStart & _v1_ToolComplete & _v1_PermissionRequestAction & _v1_PermissionResolved & _v1_TurnComplete & _v1_TurnCancelled & _v1_SessionError & _v1_TitleChanged & @@ -179,6 +182,7 @@ void (0 as unknown as export const ACTION_INTRODUCED_IN: { readonly [K in IStateAction['type']]: number } = { // Root actions (v1) 'root/agentsChanged': 1, + 'root/activeSessionsChanged': 1, // Session lifecycle (v1) 'session/ready': 1, 'session/creationFailed': 1, @@ -233,7 +237,7 @@ export function isNotificationKnownToVersion(notification: INotification, client // When you add a new protocol version, define its additions and extend the map. /** Action types introduced in v1. */ -type IRootAction_v1 = IV1_AgentsChangedAction; +type IRootAction_v1 = IV1_AgentsChangedAction | IV1_ActiveSessionsChangedAction; type ISessionAction_v1 = IV1_SessionReadyAction | IV1_SessionCreationFailedAction | IV1_TurnStartedAction | IV1_DeltaAction | IV1_ResponsePartAction | IV1_ToolStartAction | IV1_ToolCompleteAction diff --git a/src/vs/platform/agentHost/node/agentSideEffects.ts b/src/vs/platform/agentHost/node/agentSideEffects.ts index 2d35315c745..0d4be17701a 100644 --- a/src/vs/platform/agentHost/node/agentSideEffects.ts +++ b/src/vs/platform/agentHost/node/agentSideEffects.ts @@ -7,9 +7,9 @@ import { Disposable, DisposableStore, IDisposable } from '../../../base/common/l import { autorun, IObservable } from '../../../base/common/observable.js'; import { URI } from '../../../base/common/uri.js'; import { ILogService } from '../../log/common/log.js'; -import { AgentProvider, IAgentAttachment, IAgent } from '../common/agentService.js'; +import { AgentProvider, IAgent, IAgentAttachment } from '../common/agentService.js'; import type { ISessionAction } from '../common/state/sessionActions.js'; -import type { ICreateSessionParams } from '../common/state/sessionProtocol.js'; +import { ICreateSessionParams, AHP_PROVIDER_NOT_FOUND, ProtocolError } from '../common/state/sessionProtocol.js'; import { ISessionModelInfo, SessionStatus, type ISessionSummary @@ -168,11 +168,11 @@ export class AgentSideEffects extends Disposable implements IProtocolSideEffectH async handleCreateSession(command: ICreateSessionParams): Promise { const provider = command.provider as AgentProvider | undefined; if (!provider) { - throw new Error('No provider specified for session creation'); + throw new ProtocolError(AHP_PROVIDER_NOT_FOUND, 'No provider specified for session creation'); } const agent = this._options.agents.get().find(a => a.id === provider); if (!agent) { - throw new Error(`No agent registered for provider: ${provider}`); + throw new ProtocolError(AHP_PROVIDER_NOT_FOUND, `No agent registered for provider: ${provider}`); } const session = await agent.createSession({ provider, diff --git a/src/vs/platform/agentHost/node/protocolServerHandler.ts b/src/vs/platform/agentHost/node/protocolServerHandler.ts index 0f60bf44de8..774b471b012 100644 --- a/src/vs/platform/agentHost/node/protocolServerHandler.ts +++ b/src/vs/platform/agentHost/node/protocolServerHandler.ts @@ -7,11 +7,14 @@ import { Disposable, DisposableStore } from '../../../base/common/lifecycle.js'; import { URI } from '../../../base/common/uri.js'; import { ILogService } from '../../log/common/log.js'; import { IActionEnvelope, INotification, isSessionAction } from '../common/state/sessionActions.js'; -import { isActionKnownToVersion, PROTOCOL_VERSION } from '../common/state/sessionCapabilities.js'; +import { isActionKnownToVersion, MIN_PROTOCOL_VERSION, PROTOCOL_VERSION } from '../common/state/sessionCapabilities.js'; import { isJsonRpcRequest, isJsonRpcNotification, JSON_RPC_INTERNAL_ERROR, + AHP_SESSION_NOT_FOUND, + AHP_UNSUPPORTED_PROTOCOL_VERSION, + ProtocolError, type ICreateSessionParams, type IDispatchActionParams, type IDisposeSessionParams, @@ -86,7 +89,23 @@ export class ProtocolServerHandler extends Disposable { disposables.add(transport.onMessage(msg => { if (isJsonRpcRequest(msg)) { this._logService.trace(`[ProtocolServer] request: method=${msg.method} id=${msg.id}`); - // Request - expects a correlated response + + // Handle initialize/reconnect as requests that set up the client + if (!client && (msg.method === 'initialize' || msg.method === 'reconnect')) { + try { + const result = msg.method === 'initialize' + ? this._handleInitialize(msg.params as IInitializeParams, transport, disposables) + : this._handleReconnect(msg.params as IReconnectParams, transport, disposables); + client = result.client; + transport.send({ jsonrpc: '2.0', id: msg.id, result: result.response }); + } catch (err) { + const code = err instanceof ProtocolError ? err.code : JSON_RPC_INTERNAL_ERROR; + const message = err instanceof Error ? err.message : String(err); + transport.send({ jsonrpc: '2.0', id: msg.id, error: { code, message } }); + } + return; + } + if (!client) { return; } @@ -95,12 +114,6 @@ export class ProtocolServerHandler extends Disposable { this._logService.trace(`[ProtocolServer] notification: method=${msg.method}`); // Notification — fire-and-forget switch (msg.method) { - case 'initialize': - client = this._handleInitialize(msg.params as IInitializeParams, transport, disposables); - break; - case 'reconnect': - client = this._handleReconnect(msg.params as IReconnectParams, transport, disposables); - break; case 'unsubscribe': if (client) { client.subscriptions.delete((msg.params as IUnsubscribeParams).resource.toString()); @@ -136,15 +149,22 @@ export class ProtocolServerHandler extends Disposable { disposables.add(transport); } - // ---- Notifications (fire-and-forget) ------------------------------------ + // ---- Handshake handlers ---------------------------------------------------- private _handleInitialize( params: IInitializeParams, transport: IProtocolTransport, disposables: DisposableStore, - ): IConnectedClient { + ): { client: IConnectedClient; response: unknown } { this._logService.info(`[ProtocolServer] Initialize: clientId=${params.clientId}, version=${params.protocolVersion}`); + if (params.protocolVersion < MIN_PROTOCOL_VERSION) { + throw new ProtocolError( + AHP_UNSUPPORTED_PROTOCOL_VERSION, + `Client protocol version ${params.protocolVersion} is below minimum ${MIN_PROTOCOL_VERSION}`, + ); + } + const client: IConnectedClient = { clientId: params.clientId, protocolVersion: params.protocolVersion, @@ -165,20 +185,21 @@ export class ProtocolServerHandler extends Disposable { } } - this._sendNotification(transport, 'serverHello', { - protocolVersion: PROTOCOL_VERSION, - serverSeq: this._stateManager.serverSeq, - snapshots, - }); - - return client; + return { + client, + response: { + protocolVersion: PROTOCOL_VERSION, + serverSeq: this._stateManager.serverSeq, + snapshots, + }, + }; } private _handleReconnect( params: IReconnectParams, transport: IProtocolTransport, disposables: DisposableStore, - ): IConnectedClient { + ): { client: IConnectedClient; response: unknown } { this._logService.info(`[ProtocolServer] Reconnect: clientId=${params.clientId}, lastSeenSeq=${params.lastSeenServerSeq}`); const client: IConnectedClient = { @@ -194,16 +215,18 @@ export class ProtocolServerHandler extends Disposable { const canReplay = params.lastSeenServerSeq >= oldestBuffered; if (canReplay) { + const actions: IActionEnvelope[] = []; for (const sub of params.subscriptions) { client.subscriptions.add(sub.toString()); } for (const envelope of this._replayBuffer) { if (envelope.serverSeq > params.lastSeenServerSeq) { if (this._isRelevantToClient(client, envelope)) { - this._sendNotification(transport, 'action', { envelope }); + actions.push(envelope); } } } + return { client, response: { type: 'replay', actions } }; } else { const snapshots: IStateSnapshot[] = []; for (const sub of params.subscriptions) { @@ -214,13 +237,8 @@ export class ProtocolServerHandler extends Disposable { client.subscriptions.add(resource.toString()); } } - this._sendNotification(transport, 'reconnectResponse', { - serverSeq: this._stateManager.serverSeq, - snapshots, - }); + return { client, response: { type: 'snapshot', snapshots } }; } - - return client; } // ---- Requests (expect a response) --------------------------------------- @@ -231,13 +249,14 @@ export class ProtocolServerHandler extends Disposable { client.transport.send({ jsonrpc: '2.0', id, result: result ?? null }); }).catch(err => { this._logService.error(`[ProtocolServer] Request '${method}' failed`, err); + const code = err instanceof ProtocolError ? err.code : JSON_RPC_INTERNAL_ERROR; const message = err instanceof Error && err.stack ? err.stack : String(err?.message ?? err); client.transport.send({ jsonrpc: '2.0', id, - error: { code: JSON_RPC_INTERNAL_ERROR, message }, + error: { code, message }, }); }); } @@ -269,22 +288,24 @@ export class ProtocolServerHandler extends Disposable { const p = params as IFetchTurnsParams; const session = URI.revive(p.session); const state = this._stateManager.getSessionState(session); - if (state) { - const turns = state.turns; - const start = Math.max(0, p.startTurn); - const end = Math.min(turns.length, start + p.count); - return { - session, - startTurn: start, - turns: turns.slice(start, end), - totalTurns: turns.length, - }; + if (!state) { + throw new ProtocolError(AHP_SESSION_NOT_FOUND, `Session not found: ${session.toString()}`); } + const turns = state.turns; + const limit = Math.min(p.limit ?? 50, 100); + + let endIndex = turns.length; + if (p.before) { + const idx = turns.findIndex(t => t.id === p.before); + if (idx !== -1) { + endIndex = idx; + } + } + + const startIndex = Math.max(0, endIndex - limit); return { - session, - startTurn: p.startTurn, - turns: [], - totalTurns: 0, + turns: turns.slice(startIndex, endIndex), + hasMore: startIndex > 0, }; } default: @@ -294,11 +315,6 @@ export class ProtocolServerHandler extends Disposable { // ---- Broadcasting ------------------------------------------------------- - private _sendNotification(transport: IProtocolTransport, method: string, params: unknown): void { - this._logService.trace(`[ProtocolServer] Sending notification: ${method}`); - transport.send({ jsonrpc: '2.0', method, params }); - } - private _broadcastAction(envelope: IActionEnvelope): void { this._logService.trace(`[ProtocolServer] Broadcasting action: ${envelope.action.type}`); const msg: IProtocolMessage = { jsonrpc: '2.0', method: 'action', params: { envelope } }; diff --git a/src/vs/platform/agentHost/node/remoteAgentHostProtocolClient.ts b/src/vs/platform/agentHost/node/remoteAgentHostProtocolClient.ts index c64277915e6..3a0c29101df 100644 --- a/src/vs/platform/agentHost/node/remoteAgentHostProtocolClient.ts +++ b/src/vs/platform/agentHost/node/remoteAgentHostProtocolClient.ts @@ -75,27 +75,12 @@ export class RemoteAgentHostProtocolClient extends Disposable implements IAgentC async connect(): Promise { await this._transport.connect(); - // Send initialize notification - this._sendNotification('initialize', { + // Send initialize request and await the response + const result = await this._sendRequest('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: this._clientId, - }); - - // Wait for serverHello notification - await new Promise((resolve, reject) => { - const listener = this._transport.onMessage(msg => { - if (isJsonRpcNotification(msg) && msg.method === 'serverHello') { - listener.dispose(); - const params = msg.params as { serverSeq: number }; - this._serverSeq = params.serverSeq; - resolve(); - } - }); - this._register(this._transport.onClose(() => { - listener.dispose(); - reject(new Error('Connection closed during handshake')); - })); - }); + }) as { serverSeq: number }; + this._serverSeq = result.serverSeq; } /** diff --git a/src/vs/platform/agentHost/protocol.md b/src/vs/platform/agentHost/protocol.md index 37c2b2ca1e1..1505f155bb5 100644 --- a/src/vs/platform/agentHost/protocol.md +++ b/src/vs/platform/agentHost/protocol.md @@ -129,8 +129,8 @@ ActiveTurn { userMessage: UserMessage streamingText: string responseParts: ResponsePart[] - toolCalls: Map - pendingPermissions: Map + toolCalls: Record + pendingPermissions: Record reasoning: string usage: UsageInfo | undefined } @@ -169,6 +169,7 @@ ActionEnvelope { action: Action serverSeq: number // monotonic, assigned by server origin: { clientId: string, clientSeq: number } | undefined // undefined = server-originated + rejectionReason?: string // present when the server rejected the action } ``` @@ -258,19 +259,21 @@ The protocol uses **JSON-RPC 2.0** framing over the transport (WebSocket, Messag ### Message categories -- **Client → Server notifications** (fire-and-forget): `initialize`, `reconnect`, `unsubscribe`, `dispatchAction` -- **Client → Server requests** (expect a correlated response): `subscribe`, `createSession`, `disposeSession`, `listSessions`, `fetchTurns`, `fetchContent` -- **Server → Client notifications** (pushed): `serverHello`, `reconnectResponse`, `action`, `notification` +- **Client → Server notifications** (fire-and-forget): `unsubscribe`, `dispatchAction` +- **Client → Server requests** (expect a correlated response): `initialize`, `reconnect`, `subscribe`, `createSession`, `disposeSession`, `listSessions`, `fetchTurns`, `fetchContent` +- **Server → Client notifications** (pushed): `action`, `notification` - **Server → Client responses** (correlated to requests by `id`): success result or JSON-RPC error ### Connection handshake +`initialize` is a JSON-RPC **request** — the server MUST respond with a result or error: + ``` -1. Client → Server: { "jsonrpc": "2.0", "method": "initialize", "params": { protocolVersion, clientId, initialSubscriptions? } } -2. Server → Client: { "jsonrpc": "2.0", "method": "serverHello", "params": { protocolVersion, serverSeq, snapshots[] } } +1. Client → Server: { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { protocolVersion, clientId, initialSubscriptions? } } +2. Server → Client: { "jsonrpc": "2.0", "id": 1, "result": { protocolVersion, serverSeq, snapshots[] } } ``` -`initialSubscriptions` allows the client to subscribe to root state (and any previously-open sessions on reconnect) in the same round-trip as the handshake. The server responds with snapshots for each. +`initialSubscriptions` allows the client to subscribe to root state (and any previously-open sessions on reconnect) in the same round-trip as the handshake. The server returns snapshots for each in the response. ### URI subscription @@ -331,11 +334,23 @@ Client → Server: { "jsonrpc": "2.0", "method": "dispatchAction", "params": { ### Reconnection +`reconnect` is a JSON-RPC **request**. The server MUST include all replayed data in the response: + ``` -Client → Server: { "jsonrpc": "2.0", "method": "reconnect", "params": { clientId, lastSeenServerSeq, subscriptions } } +Client → Server: { "jsonrpc": "2.0", "id": 2, "method": "reconnect", "params": { clientId, lastSeenServerSeq, subscriptions } } ``` -Server replays actions since `lastSeenServerSeq` from a bounded replay buffer. If the gap exceeds the buffer, sends fresh snapshots via a `reconnectResponse` notification. Notifications are **not** replayed — the client should re-fetch the session list. +If the gap is within the replay buffer, the response contains missed action envelopes: +``` +Server → Client: { "jsonrpc": "2.0", "id": 2, "result": { "type": "replay", "actions": [...] } } +``` + +If the gap exceeds the buffer, the response contains fresh snapshots: +``` +Server → Client: { "jsonrpc": "2.0", "id": 2, "result": { "type": "snapshot", "snapshots": [...] } } +``` + +Protocol notifications are **not** replayed — the client should re-fetch the session list. ## Write-ahead reconciliation @@ -352,7 +367,7 @@ When the client receives an `ActionEnvelope` from the server: 1. **Own action echoed**: `origin.clientId === myId` and matches head of `pendingActions` → pop from pending, apply to `confirmedState` 2. **Foreign action**: different origin → apply to `confirmedState`, rebase remaining `pendingActions` -3. **Rejected action**: server echoed with `rejected: true` → remove from pending (optimistic effect reverted) +3. **Rejected action**: server echoed with `rejectionReason` present → remove from pending (optimistic effect reverted). The `rejectionReason` MAY be surfaced to the user. 4. Recompute `optimisticState` from `confirmedState` + remaining `pendingActions` ### Why rebasing is simple @@ -444,7 +459,7 @@ interface ProtocolCapabilities { ### Forward compatibility A newer client connecting to an older server: -1. During handshake, the client learns the server's protocol version. +1. During handshake, the client learns the server's protocol version from the `initialize` response. 2. The client derives `ProtocolCapabilities` from the server version. 3. Command factories check capabilities before dispatching; if unsupported, the client degrades gracefully. 4. The server only sends action types known to the client's declared version (via `isActionKnownToVersion`). diff --git a/src/vs/platform/agentHost/test/node/protocolServerHandler.test.ts b/src/vs/platform/agentHost/test/node/protocolServerHandler.test.ts index 76f1ef6b39f..7763a8be72f 100644 --- a/src/vs/platform/agentHost/test/node/protocolServerHandler.test.ts +++ b/src/vs/platform/agentHost/test/node/protocolServerHandler.test.ts @@ -10,7 +10,7 @@ import { URI } from '../../../../base/common/uri.js'; import { ensureNoDisposablesAreLeakedInTestSuite } from '../../../../base/test/common/utils.js'; import { NullLogService } from '../../../log/common/log.js'; import type { ISessionAction } from '../../common/state/sessionActions.js'; -import { isJsonRpcNotification, isJsonRpcResponse, type ICreateSessionParams, type IProtocolMessage, type IProtocolNotification, type IServerHelloParams, type IStateSnapshot } from '../../common/state/sessionProtocol.js'; +import { isJsonRpcNotification, isJsonRpcResponse, type ICreateSessionParams, type IInitializeResult, type IProtocolMessage, type IProtocolNotification, type IReconnectResult, type IStateSnapshot } from '../../common/state/sessionProtocol.js'; import { SessionStatus, type ISessionSummary } from '../../common/state/sessionState.js'; import { PROTOCOL_VERSION } from '../../common/state/sessionCapabilities.js'; import type { IProtocolServer, IProtocolTransport } from '../../common/state/sessionTransport.js'; @@ -117,7 +117,7 @@ suite('ProtocolServerHandler', () => { function connectClient(clientId: string, initialSubscriptions?: readonly URI[]): MockProtocolTransport { const transport = new MockProtocolTransport(); server.simulateConnection(transport); - transport.simulateMessage(notification('initialize', { + transport.simulateMessage(request(1, 'initialize', { protocolVersion: PROTOCOL_VERSION, clientId, initialSubscriptions, @@ -144,14 +144,14 @@ suite('ProtocolServerHandler', () => { ensureNoDisposablesAreLeakedInTestSuite(); - test('handshake sends serverHello notification', () => { + test('handshake returns initialize response', () => { const transport = connectClient('client-1'); - const hello = findNotification(transport.sent, 'serverHello'); - assert.ok(hello, 'should have sent serverHello'); - const params = hello.params as IServerHelloParams; - assert.strictEqual(params.protocolVersion, PROTOCOL_VERSION); - assert.strictEqual(params.serverSeq, stateManager.serverSeq); + const resp = findResponse(transport.sent, 1); + assert.ok(resp, 'should have sent initialize response'); + const result = (resp as { result: IInitializeResult }).result; + assert.strictEqual(result.protocolVersion, PROTOCOL_VERSION); + assert.strictEqual(result.serverSeq, stateManager.serverSeq); }); test('handshake with initialSubscriptions returns snapshots', () => { @@ -159,11 +159,11 @@ suite('ProtocolServerHandler', () => { const transport = connectClient('client-1', [sessionUri]); - const hello = findNotification(transport.sent, 'serverHello'); - assert.ok(hello); - const params = hello.params as IServerHelloParams; - assert.strictEqual(params.snapshots.length, 1); - assert.strictEqual(params.snapshots[0].resource.toString(), sessionUri.toString()); + const resp = findResponse(transport.sent, 1); + assert.ok(resp); + const result = (resp as { result: IInitializeResult }).result; + assert.strictEqual(result.snapshots.length, 1); + assert.strictEqual(result.snapshots[0].resource.toString(), sessionUri.toString()); }); test('subscribe request returns snapshot', async () => { @@ -249,8 +249,8 @@ suite('ProtocolServerHandler', () => { stateManager.dispatchServerAction({ type: 'session/ready', session: sessionUri }); const transport1 = connectClient('client-r', [sessionUri]); - const hello = findNotification(transport1.sent, 'serverHello'); - const helloSeq = (hello!.params as IServerHelloParams).serverSeq; + const resp = findResponse(transport1.sent, 1); + const initSeq = (resp as { result: IInitializeResult }).result.serverSeq; transport1.simulateClose(); stateManager.dispatchServerAction({ type: 'session/titleChanged', session: sessionUri, title: 'Title A' }); @@ -258,14 +258,19 @@ suite('ProtocolServerHandler', () => { const transport2 = new MockProtocolTransport(); server.simulateConnection(transport2); - transport2.simulateMessage(notification('reconnect', { + transport2.simulateMessage(request(1, 'reconnect', { clientId: 'client-r', - lastSeenServerSeq: helloSeq, + lastSeenServerSeq: initSeq, subscriptions: [sessionUri], })); - const replayed = findNotifications(transport2.sent, 'action'); - assert.strictEqual(replayed.length, 2); + const reconnectResp = findResponse(transport2.sent, 1); + assert.ok(reconnectResp, 'should have sent reconnect response'); + const result = (reconnectResp as { result: IReconnectResult }).result; + assert.strictEqual(result.type, 'replay'); + if (result.type === 'replay') { + assert.strictEqual(result.actions.length, 2); + } }); test('reconnect sends fresh snapshots when gap too large', () => { @@ -281,16 +286,19 @@ suite('ProtocolServerHandler', () => { const transport2 = new MockProtocolTransport(); server.simulateConnection(transport2); - transport2.simulateMessage(notification('reconnect', { + transport2.simulateMessage(request(1, 'reconnect', { clientId: 'client-g', lastSeenServerSeq: 0, subscriptions: [sessionUri], })); - const reconnectResp = findNotification(transport2.sent, 'reconnectResponse'); - assert.ok(reconnectResp, 'should receive a reconnectResponse'); - const params = reconnectResp!.params as { snapshots: IStateSnapshot[] }; - assert.ok(params.snapshots.length > 0, 'should contain snapshots'); + const reconnectResp = findResponse(transport2.sent, 1); + assert.ok(reconnectResp, 'should have sent reconnect response'); + const result = (reconnectResp as { result: IReconnectResult }).result; + assert.strictEqual(result.type, 'snapshot'); + if (result.type === 'snapshot') { + assert.ok(result.snapshots.length > 0, 'should contain snapshots'); + } }); test('client disconnect cleans up', () => { diff --git a/src/vs/platform/agentHost/test/node/protocolWebSocket.integrationTest.ts b/src/vs/platform/agentHost/test/node/protocolWebSocket.integrationTest.ts index 367a0369f1a..3441e560bbe 100644 --- a/src/vs/platform/agentHost/test/node/protocolWebSocket.integrationTest.ts +++ b/src/vs/platform/agentHost/test/node/protocolWebSocket.integrationTest.ts @@ -16,13 +16,14 @@ import { JSON_RPC_PARSE_ERROR, type IActionBroadcastParams, type IFetchTurnsResult, + type IInitializeResult, type IJsonRpcErrorResponse, type IJsonRpcSuccessResponse, type IListSessionsResult, type INotificationBroadcastParams, type IProtocolMessage, type IProtocolNotification, - type IServerHelloParams, + type IReconnectResult, type IStateSnapshot, } from '../../common/state/sessionProtocol.js'; import type { IDeltaAction, ISessionAddedNotification, ISessionRemovedNotification, IUsageAction } from '../../common/state/sessionActions.js'; @@ -244,8 +245,7 @@ function getActionParams(n: IProtocolNotification): IActionBroadcastParams { /** Perform handshake, create a session, subscribe, and return its URI. */ async function createAndSubscribeSession(c: TestProtocolClient, clientId: string): Promise { - c.notify('initialize', { protocolVersion: PROTOCOL_VERSION, clientId }); - await c.waitForNotification(n => n.method === 'serverHello'); + await c.call('initialize', { protocolVersion: PROTOCOL_VERSION, clientId }); await c.call('createSession', { session: nextSessionUri(), provider: 'mock' }); @@ -299,28 +299,25 @@ suite('Protocol WebSocket E2E', function () { }); // 1. Handshake - test('handshake returns serverHello with protocol version', async function () { + test('handshake returns initialize response with protocol version', async function () { this.timeout(5_000); - client.notify('initialize', { + const result = await client.call('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-handshake', initialSubscriptions: [URI.from({ scheme: 'agenthost', path: '/root' })], }); - const hello = await client.waitForNotification(n => n.method === 'serverHello'); - const params = hello.params as IServerHelloParams; - assert.strictEqual(params.protocolVersion, PROTOCOL_VERSION); - assert.ok(params.serverSeq >= 0); - assert.ok(params.snapshots.length >= 1, 'should have root state snapshot'); + assert.strictEqual(result.protocolVersion, PROTOCOL_VERSION); + assert.ok(result.serverSeq >= 0); + assert.ok(result.snapshots.length >= 1, 'should have root state snapshot'); }); // 2. Create session test('create session triggers sessionAdded notification', async function () { this.timeout(10_000); - client.notify('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-create-session' }); - await client.waitForNotification(n => n.method === 'serverHello'); + await client.call('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-create-session' }); await client.call('createSession', { session: nextSessionUri(), provider: 'mock' }); @@ -411,8 +408,7 @@ suite('Protocol WebSocket E2E', function () { test('listSessions returns sessions', async function () { this.timeout(10_000); - client.notify('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-list-sessions' }); - await client.waitForNotification(n => n.method === 'serverHello'); + await client.call('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-list-sessions' }); await client.call('createSession', { session: nextSessionUri(), provider: 'mock' }); await client.waitForNotification(n => @@ -440,19 +436,16 @@ suite('Protocol WebSocket E2E', function () { const client2 = new TestProtocolClient(server.port); await client2.connect(); - client2.notify('reconnect', { + const result = await client2.call('reconnect', { clientId: 'test-reconnect', lastSeenServerSeq: missedFromSeq, subscriptions: [sessionUri], }); - await new Promise(resolve => setTimeout(resolve, 500)); - - const replayed = client2.receivedNotifications(); - assert.ok(replayed.length > 0, 'should receive replayed actions or reconnect response'); - const hasActions = replayed.some(n => n.method === 'action'); - const hasReconnect = replayed.some(n => n.method === 'reconnectResponse'); - assert.ok(hasActions || hasReconnect); + assert.ok(result.type === 'replay' || result.type === 'snapshot', 'should receive replay or snapshot'); + if (result.type === 'replay') { + assert.ok(result.actions.length > 0, 'should have replayed actions'); + } client2.close(); }); @@ -502,8 +495,7 @@ suite('Protocol WebSocket E2E', function () { test('createSession with invalid provider does not crash server', async function () { this.timeout(10_000); - client.notify('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-invalid-create' }); - await client.waitForNotification(n => n.method === 'serverHello'); + await client.call('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-invalid-create' }); // This should return a JSON-RPC error let gotError = false; @@ -534,9 +526,9 @@ suite('Protocol WebSocket E2E', function () { await new Promise(resolve => setTimeout(resolve, 200)); await client.waitForNotification(n => isActionNotification(n, 'session/turnComplete')); - const result = await client.call('fetchTurns', { session: sessionUri, startTurn: 0, count: 10 }); + const result = await client.call('fetchTurns', { session: sessionUri, limit: 10 }); assert.ok(result.turns.length >= 2); - assert.ok(result.totalTurns >= 2); + assert.strictEqual(typeof result.hasMore, 'boolean'); }); // ---- Gap tests: coverage --------------------------------------------------- @@ -599,8 +591,7 @@ suite('Protocol WebSocket E2E', function () { const client2 = new TestProtocolClient(server.port); await client2.connect(); - client2.notify('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-multi-client-2' }); - await client2.waitForNotification(n => n.method === 'serverHello'); + await client2.call('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-multi-client-2' }); await client2.call('subscribe', { resource: sessionUri }); client2.clearReceived(); @@ -627,8 +618,7 @@ suite('Protocol WebSocket E2E', function () { const client2 = new TestProtocolClient(server.port); await client2.connect(); - client2.notify('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-unsub-helper' }); - await client2.waitForNotification(n => n.method === 'serverHello'); + await client2.call('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-unsub-helper' }); await client2.call('subscribe', { resource: sessionUri }); dispatchTurnStarted(client2, sessionUri, 'turn-unsub', 'hello', 1); diff --git a/src/vs/workbench/contrib/chat/browser/agentSessions/agentHost/agentHostSessionHandler.ts b/src/vs/workbench/contrib/chat/browser/agentSessions/agentHost/agentHostSessionHandler.ts index d583a5af76a..09f9f680e89 100644 --- a/src/vs/workbench/contrib/chat/browser/agentSessions/agentHost/agentHostSessionHandler.ts +++ b/src/vs/workbench/contrib/chat/browser/agentSessions/agentHost/agentHostSessionHandler.ts @@ -351,7 +351,7 @@ export class AgentHostSessionHandler extends Disposable implements IChatSessionC } // Handle tool calls — create/finalize ChatToolInvocations - for (const [toolCallId, tc] of activeTurn.toolCalls) { + for (const [toolCallId, tc] of Object.entries(activeTurn.toolCalls)) { const existing = activeToolInvocations.get(toolCallId); if (!existing) { if (tc.status === ToolCallStatus.Running || tc.status === ToolCallStatus.PendingPermission) { @@ -366,7 +366,7 @@ export class AgentHostSessionHandler extends Disposable implements IChatSessionC } // Handle permission requests - for (const [requestId, perm] of activeTurn.pendingPermissions) { + for (const [requestId, perm] of Object.entries(activeTurn.pendingPermissions)) { if (activePermissions.has(requestId)) { continue; }