protocol parity

Goes with 009e4e93c3
This commit is contained in:
Connor Peet
2026-03-13 14:50:22 -07:00
parent 706e49e462
commit f8311c303d
15 changed files with 271 additions and 230 deletions

View File

@@ -10,22 +10,19 @@
import { URI } from '../../../../base/common/uri.js';
/**
* JSON.stringify replacer that serializes {@link URI} instances and Maps
* into a revivable format.
* JSON.stringify replacer that serializes {@link URI} instances
* into a wire-friendly format.
*/
export function protocolReplacer(_key: string, value: unknown): unknown {
if (value instanceof URI) {
return value.toJSON();
}
if (value instanceof Map) {
return { $type: 'Map', entries: [...value.entries()] };
}
return value;
}
/**
* JSON.parse reviver that restores {@link URI} instances and Maps from
* their serialized format.
* JSON.parse reviver that restores {@link URI} instances from their
* serialized format.
*/
export function protocolReviver(_key: string, value: unknown): unknown {
if (value && typeof value === 'object') {
@@ -33,9 +30,6 @@ export function protocolReviver(_key: string, value: unknown): unknown {
if (obj.$mid === 1) {
return URI.revive(value as URI);
}
if (obj.$type === 'Map' && Array.isArray(obj.entries)) {
return new Map(obj.entries as [unknown, unknown][]);
}
}
return value;
}

View File

@@ -44,10 +44,11 @@ export interface IActionEnvelope<A extends IStateAction = IStateAction> {
*/
readonly origin: IActionOrigin | undefined;
/**
* Set to `true` when the server rejected the command that produced this
* action. The client should revert its optimistic prediction.
* When present, indicates the server rejected the action. The client
* should revert its optimistic prediction. Contains a human-readable
* explanation (e.g. `"no active turn to cancel"`).
*/
readonly rejected?: true;
readonly rejectionReason?: string;
}
export interface IActionOrigin {

View File

@@ -181,7 +181,7 @@ export class SessionClientState extends Disposable {
const headIdx = this._pendingActions.findIndex(p => p.clientSeq === origin.clientSeq);
if (headIdx !== -1) {
if (envelope.rejected) {
if (envelope.rejectionReason) {
this._pendingActions.splice(headIdx, 1);
} else {
this._applyToConfirmed(envelope.action);

View File

@@ -7,12 +7,12 @@
// See protocol.md for the full design.
//
// Client → Server messages are either:
// - Notifications (fire-and-forget): initialize, reconnect, unsubscribe, dispatchAction
// - Requests (expect a correlated response): subscribe, createSession, disposeSession,
// listSessions, fetchTurns, fetchContent
// - Notifications (fire-and-forget): unsubscribe, dispatchAction
// - Requests (expect a correlated response): initialize, reconnect, subscribe,
// createSession, disposeSession, listSessions, fetchTurns, fetchContent
//
// Server → Client messages are either:
// - Notifications (pushed to clients): serverHello, reconnectResponse, action, notification
// - Notifications (pushed to clients): action, notification
// - Responses (correlated to a client request by id)
import { hasKey } from '../../../../base/common/types.js';
@@ -79,6 +79,24 @@ export function isJsonRpcResponse(msg: IProtocolMessage): msg is IJsonRpcRespons
export const JSON_RPC_PARSE_ERROR = -32700;
export const JSON_RPC_INTERNAL_ERROR = -32603;
// ---- AHP application error codes -------------------------------------------
export const AHP_SESSION_NOT_FOUND = -32001;
export const AHP_PROVIDER_NOT_FOUND = -32002;
export const AHP_SESSION_ALREADY_EXISTS = -32003;
export const AHP_TURN_IN_PROGRESS = -32004;
export const AHP_UNSUPPORTED_PROTOCOL_VERSION = -32005;
export const AHP_CONTENT_NOT_FOUND = -32006;
/**
* Error with a JSON-RPC error code for protocol-level failures.
*/
export class ProtocolError extends Error {
constructor(readonly code: number, message: string) {
super(message);
}
}
// ---- Shared data types ------------------------------------------------------
/** State snapshot returned by subscribe and included in handshake/reconnect. */
@@ -90,18 +108,6 @@ export interface IStateSnapshot {
// ---- Client → Server: Notification params -----------------------------------
export interface IInitializeParams {
readonly protocolVersion: number;
readonly clientId: string;
readonly initialSubscriptions?: readonly URI[];
}
export interface IReconnectParams {
readonly clientId: string;
readonly lastSeenServerSeq: number;
readonly subscriptions: readonly URI[];
}
export interface IUnsubscribeParams {
readonly resource: URI;
}
@@ -113,6 +119,38 @@ export interface IDispatchActionParams {
// ---- Client → Server: Request params and results ----------------------------
export interface IInitializeParams {
readonly protocolVersion: number;
readonly clientId: string;
readonly initialSubscriptions?: readonly URI[];
}
export interface IInitializeResult {
readonly protocolVersion: number;
readonly serverSeq: number;
readonly snapshots: readonly IStateSnapshot[];
}
export interface IReconnectParams {
readonly clientId: string;
readonly lastSeenServerSeq: number;
readonly subscriptions: readonly URI[];
}
export type IReconnectResult =
| IReconnectReplayResult
| IReconnectSnapshotResult;
export interface IReconnectReplayResult {
readonly type: 'replay';
readonly actions: readonly IActionEnvelope[];
}
export interface IReconnectSnapshotResult {
readonly type: 'snapshot';
readonly snapshots: readonly IStateSnapshot[];
}
export interface ISubscribeParams {
readonly resource: URI;
}
@@ -142,15 +180,13 @@ export interface IListSessionsResult {
export interface IFetchTurnsParams {
readonly session: URI;
readonly startTurn: number;
readonly count: number;
readonly before?: string;
readonly limit?: number;
}
export interface IFetchTurnsResult {
readonly session: URI;
readonly startTurn: number;
readonly turns: ISessionState['turns'];
readonly totalTurns: number;
readonly hasMore: boolean;
}
export interface IFetchContentParams {
@@ -158,24 +194,13 @@ export interface IFetchContentParams {
}
export interface IFetchContentResult {
readonly uri: URI;
readonly data: string; // base64-encoded for binary safety
readonly data: string;
readonly encoding: 'base64' | 'utf-8';
readonly mimeType?: string;
}
// ---- Server → Client: Notification params -----------------------------------
export interface IServerHelloParams {
readonly protocolVersion: number;
readonly serverSeq: number;
readonly snapshots: readonly IStateSnapshot[];
}
export interface IReconnectResponseParams {
readonly serverSeq: number;
readonly snapshots: readonly IStateSnapshot[];
}
export interface IActionBroadcastParams {
readonly envelope: IActionEnvelope<IStateAction>;
}

View File

@@ -20,7 +20,6 @@ import {
type IErrorInfo,
type IRootState,
type ISessionState,
type IToolCallState,
type ITurn,
createActiveTurn,
SessionLifecycle,
@@ -100,50 +99,49 @@ export function sessionReducer(state: ISessionState, action: ISessionAction): IS
if (!state.activeTurn || state.activeTurn.id !== action.turnId) {
return state;
}
const toolCalls = new Map(state.activeTurn.toolCalls);
toolCalls.set(action.toolCall.toolCallId, action.toolCall);
return {
...state,
activeTurn: { ...state.activeTurn, toolCalls },
activeTurn: {
...state.activeTurn,
toolCalls: { ...state.activeTurn.toolCalls, [action.toolCall.toolCallId]: action.toolCall },
},
};
}
case 'session/toolComplete': {
if (!state.activeTurn || state.activeTurn.id !== action.turnId) {
return state;
}
const toolCall = state.activeTurn.toolCalls.get(action.toolCallId);
const toolCall = state.activeTurn.toolCalls[action.toolCallId];
if (!toolCall) {
return state;
}
const toolCalls = new Map(state.activeTurn.toolCalls);
toolCalls.set(action.toolCallId, {
...toolCall,
status: action.result.success ? ToolCallStatus.Completed : ToolCallStatus.Failed,
pastTenseMessage: action.result.pastTenseMessage,
toolOutput: action.result.toolOutput,
error: action.result.error,
});
return {
...state,
activeTurn: { ...state.activeTurn, toolCalls },
activeTurn: {
...state.activeTurn,
toolCalls: {
...state.activeTurn.toolCalls,
[action.toolCallId]: {
...toolCall,
status: action.result.success ? ToolCallStatus.Completed : ToolCallStatus.Failed,
pastTenseMessage: action.result.pastTenseMessage,
toolOutput: action.result.toolOutput,
error: action.result.error,
},
},
},
};
}
case 'session/permissionRequest': {
if (!state.activeTurn || state.activeTurn.id !== action.turnId) {
return state;
}
const pendingPermissions = new Map(state.activeTurn.pendingPermissions);
pendingPermissions.set(action.request.requestId, action.request);
let toolCalls: ReadonlyMap<string, IToolCallState> = state.activeTurn.toolCalls;
const pendingPermissions = { ...state.activeTurn.pendingPermissions, [action.request.requestId]: action.request };
let toolCalls = state.activeTurn.toolCalls;
if (action.request.toolCallId) {
const toolCall = toolCalls.get(action.request.toolCallId);
const toolCall = toolCalls[action.request.toolCallId];
if (toolCall) {
const mutable = new Map(toolCalls);
mutable.set(action.request.toolCallId, {
...toolCall,
status: ToolCallStatus.PendingPermission,
});
toolCalls = mutable;
toolCalls = { ...toolCalls, [action.request.toolCallId]: { ...toolCall, status: ToolCallStatus.PendingPermission } };
}
}
return {
@@ -155,21 +153,21 @@ export function sessionReducer(state: ISessionState, action: ISessionAction): IS
if (!state.activeTurn || state.activeTurn.id !== action.turnId) {
return state;
}
const pendingPermissions = new Map(state.activeTurn.pendingPermissions);
const resolved = pendingPermissions.get(action.requestId);
pendingPermissions.delete(action.requestId);
let toolCalls: ReadonlyMap<string, IToolCallState> = state.activeTurn.toolCalls;
const resolved = state.activeTurn.pendingPermissions[action.requestId];
const { [action.requestId]: _, ...pendingPermissions } = state.activeTurn.pendingPermissions;
let toolCalls = state.activeTurn.toolCalls;
if (resolved?.toolCallId) {
const toolCall = toolCalls.get(resolved.toolCallId);
const toolCall = toolCalls[resolved.toolCallId];
if (toolCall && toolCall.status === ToolCallStatus.PendingPermission) {
const mutable = new Map(toolCalls);
mutable.set(resolved.toolCallId, {
...toolCall,
status: action.approved ? ToolCallStatus.Running : ToolCallStatus.Cancelled,
confirmed: action.approved ? 'user-action' : 'denied',
cancellationReason: action.approved ? undefined : 'denied',
});
toolCalls = mutable;
toolCalls = {
...toolCalls,
[resolved.toolCallId]: {
...toolCall,
status: action.approved ? ToolCallStatus.Running : ToolCallStatus.Cancelled,
confirmed: action.approved ? 'user-action' : 'denied',
cancellationReason: action.approved ? undefined : 'denied',
},
};
}
}
return {
@@ -237,7 +235,7 @@ function finalizeTurn(state: ISessionState, turnId: string, turnState: TurnState
const active = state.activeTurn;
const completedToolCalls: ICompletedToolCall[] = [];
for (const tc of active.toolCalls.values()) {
for (const tc of Object.values(active.toolCalls)) {
completedToolCalls.push({
toolCallId: tc.toolCallId,
toolName: tc.toolName,

View File

@@ -138,8 +138,8 @@ export interface IActiveTurn {
readonly userMessage: IUserMessage;
readonly streamingText: string;
readonly responseParts: readonly IResponsePart[];
readonly toolCalls: ReadonlyMap<string, IToolCallState>;
readonly pendingPermissions: ReadonlyMap<string, IPermissionRequest>;
readonly toolCalls: Readonly<Record<string, IToolCallState>>;
readonly pendingPermissions: Readonly<Record<string, IPermissionRequest>>;
readonly reasoning: string;
readonly usage: IUsageInfo | undefined;
}
@@ -281,8 +281,8 @@ export function createActiveTurn(id: string, userMessage: IUserMessage): IActive
userMessage,
streamingText: '',
responseParts: [],
toolCalls: new Map(),
pendingPermissions: new Map(),
toolCalls: {},
pendingPermissions: {},
reasoning: '',
usage: undefined,
};

View File

@@ -80,8 +80,8 @@ export interface IV1_ActiveTurn {
readonly userMessage: IV1_UserMessage;
readonly streamingText: string;
readonly responseParts: readonly IV1_ResponsePart[];
readonly toolCalls: ReadonlyMap<string, IV1_ToolCallState>;
readonly pendingPermissions: ReadonlyMap<string, IV1_PermissionRequest>;
readonly toolCalls: Readonly<Record<string, IV1_ToolCallState>>;
readonly pendingPermissions: Readonly<Record<string, IV1_PermissionRequest>>;
readonly reasoning: string;
readonly usage: IV1_UsageInfo | undefined;
}
@@ -168,6 +168,11 @@ export interface IV1_AgentsChangedAction {
readonly agents: readonly IV1_AgentInfo[];
}
export interface IV1_ActiveSessionsChangedAction {
readonly type: 'root/activeSessionsChanged';
readonly activeSessions: number;
}
export interface IV1_SessionReadyAction extends IV1_SessionActionBase {
readonly type: 'session/ready';
}

View File

@@ -7,6 +7,7 @@
// See ../AGENTS.md for modification instructions.
import type {
IActiveSessionsChangedAction,
IAgentsChangedAction,
IDeltaAction,
IModelChangedAction,
@@ -49,6 +50,7 @@ import type {
} from '../sessionState.js';
import type {
IV1_ActiveSessionsChangedAction,
IV1_ActiveTurn,
IV1_AgentInfo,
IV1_AgentsChangedAction,
@@ -135,6 +137,7 @@ type _v1_ErrorInfo = AssertCompatible<IV1_ErrorInfo, IErrorInfo>;
// -- v1 action compatibility --
type _v1_AgentsChanged = AssertCompatible<IV1_AgentsChangedAction, IAgentsChangedAction>;
type _v1_ActiveSessionsChanged = AssertCompatible<IV1_ActiveSessionsChangedAction, IActiveSessionsChangedAction>;
type _v1_SessionReady = AssertCompatible<IV1_SessionReadyAction, ISessionReadyAction>;
type _v1_CreationFailed = AssertCompatible<IV1_SessionCreationFailedAction, ISessionCreationFailedAction>;
type _v1_TurnStarted = AssertCompatible<IV1_TurnStartedAction, ITurnStartedAction>;
@@ -159,7 +162,7 @@ void (0 as unknown as
_v1_ActiveTurn & _v1_MarkdownResponsePart & _v1_ContentRef &
_v1_ToolCallState & _v1_CompletedToolCall & _v1_PermissionRequest &
_v1_UsageInfo & _v1_ErrorInfo &
_v1_AgentsChanged & _v1_SessionReady & _v1_CreationFailed &
_v1_AgentsChanged & _v1_ActiveSessionsChanged & _v1_SessionReady & _v1_CreationFailed &
_v1_TurnStarted & _v1_Delta & _v1_ResponsePart & _v1_ToolStart &
_v1_ToolComplete & _v1_PermissionRequestAction & _v1_PermissionResolved &
_v1_TurnComplete & _v1_TurnCancelled & _v1_SessionError & _v1_TitleChanged &
@@ -179,6 +182,7 @@ void (0 as unknown as
export const ACTION_INTRODUCED_IN: { readonly [K in IStateAction['type']]: number } = {
// Root actions (v1)
'root/agentsChanged': 1,
'root/activeSessionsChanged': 1,
// Session lifecycle (v1)
'session/ready': 1,
'session/creationFailed': 1,
@@ -233,7 +237,7 @@ export function isNotificationKnownToVersion(notification: INotification, client
// When you add a new protocol version, define its additions and extend the map.
/** Action types introduced in v1. */
type IRootAction_v1 = IV1_AgentsChangedAction;
type IRootAction_v1 = IV1_AgentsChangedAction | IV1_ActiveSessionsChangedAction;
type ISessionAction_v1 = IV1_SessionReadyAction | IV1_SessionCreationFailedAction
| IV1_TurnStartedAction | IV1_DeltaAction | IV1_ResponsePartAction
| IV1_ToolStartAction | IV1_ToolCompleteAction

View File

@@ -7,9 +7,9 @@ import { Disposable, DisposableStore, IDisposable } from '../../../base/common/l
import { autorun, IObservable } from '../../../base/common/observable.js';
import { URI } from '../../../base/common/uri.js';
import { ILogService } from '../../log/common/log.js';
import { AgentProvider, IAgentAttachment, IAgent } from '../common/agentService.js';
import { AgentProvider, IAgent, IAgentAttachment } from '../common/agentService.js';
import type { ISessionAction } from '../common/state/sessionActions.js';
import type { ICreateSessionParams } from '../common/state/sessionProtocol.js';
import { ICreateSessionParams, AHP_PROVIDER_NOT_FOUND, ProtocolError } from '../common/state/sessionProtocol.js';
import {
ISessionModelInfo,
SessionStatus, type ISessionSummary
@@ -168,11 +168,11 @@ export class AgentSideEffects extends Disposable implements IProtocolSideEffectH
async handleCreateSession(command: ICreateSessionParams): Promise<URI> {
const provider = command.provider as AgentProvider | undefined;
if (!provider) {
throw new Error('No provider specified for session creation');
throw new ProtocolError(AHP_PROVIDER_NOT_FOUND, 'No provider specified for session creation');
}
const agent = this._options.agents.get().find(a => a.id === provider);
if (!agent) {
throw new Error(`No agent registered for provider: ${provider}`);
throw new ProtocolError(AHP_PROVIDER_NOT_FOUND, `No agent registered for provider: ${provider}`);
}
const session = await agent.createSession({
provider,

View File

@@ -7,11 +7,14 @@ import { Disposable, DisposableStore } from '../../../base/common/lifecycle.js';
import { URI } from '../../../base/common/uri.js';
import { ILogService } from '../../log/common/log.js';
import { IActionEnvelope, INotification, isSessionAction } from '../common/state/sessionActions.js';
import { isActionKnownToVersion, PROTOCOL_VERSION } from '../common/state/sessionCapabilities.js';
import { isActionKnownToVersion, MIN_PROTOCOL_VERSION, PROTOCOL_VERSION } from '../common/state/sessionCapabilities.js';
import {
isJsonRpcRequest,
isJsonRpcNotification,
JSON_RPC_INTERNAL_ERROR,
AHP_SESSION_NOT_FOUND,
AHP_UNSUPPORTED_PROTOCOL_VERSION,
ProtocolError,
type ICreateSessionParams,
type IDispatchActionParams,
type IDisposeSessionParams,
@@ -86,7 +89,23 @@ export class ProtocolServerHandler extends Disposable {
disposables.add(transport.onMessage(msg => {
if (isJsonRpcRequest(msg)) {
this._logService.trace(`[ProtocolServer] request: method=${msg.method} id=${msg.id}`);
// Request - expects a correlated response
// Handle initialize/reconnect as requests that set up the client
if (!client && (msg.method === 'initialize' || msg.method === 'reconnect')) {
try {
const result = msg.method === 'initialize'
? this._handleInitialize(msg.params as IInitializeParams, transport, disposables)
: this._handleReconnect(msg.params as IReconnectParams, transport, disposables);
client = result.client;
transport.send({ jsonrpc: '2.0', id: msg.id, result: result.response });
} catch (err) {
const code = err instanceof ProtocolError ? err.code : JSON_RPC_INTERNAL_ERROR;
const message = err instanceof Error ? err.message : String(err);
transport.send({ jsonrpc: '2.0', id: msg.id, error: { code, message } });
}
return;
}
if (!client) {
return;
}
@@ -95,12 +114,6 @@ export class ProtocolServerHandler extends Disposable {
this._logService.trace(`[ProtocolServer] notification: method=${msg.method}`);
// Notification — fire-and-forget
switch (msg.method) {
case 'initialize':
client = this._handleInitialize(msg.params as IInitializeParams, transport, disposables);
break;
case 'reconnect':
client = this._handleReconnect(msg.params as IReconnectParams, transport, disposables);
break;
case 'unsubscribe':
if (client) {
client.subscriptions.delete((msg.params as IUnsubscribeParams).resource.toString());
@@ -136,15 +149,22 @@ export class ProtocolServerHandler extends Disposable {
disposables.add(transport);
}
// ---- Notifications (fire-and-forget) ------------------------------------
// ---- Handshake handlers ----------------------------------------------------
private _handleInitialize(
params: IInitializeParams,
transport: IProtocolTransport,
disposables: DisposableStore,
): IConnectedClient {
): { client: IConnectedClient; response: unknown } {
this._logService.info(`[ProtocolServer] Initialize: clientId=${params.clientId}, version=${params.protocolVersion}`);
if (params.protocolVersion < MIN_PROTOCOL_VERSION) {
throw new ProtocolError(
AHP_UNSUPPORTED_PROTOCOL_VERSION,
`Client protocol version ${params.protocolVersion} is below minimum ${MIN_PROTOCOL_VERSION}`,
);
}
const client: IConnectedClient = {
clientId: params.clientId,
protocolVersion: params.protocolVersion,
@@ -165,20 +185,21 @@ export class ProtocolServerHandler extends Disposable {
}
}
this._sendNotification(transport, 'serverHello', {
protocolVersion: PROTOCOL_VERSION,
serverSeq: this._stateManager.serverSeq,
snapshots,
});
return client;
return {
client,
response: {
protocolVersion: PROTOCOL_VERSION,
serverSeq: this._stateManager.serverSeq,
snapshots,
},
};
}
private _handleReconnect(
params: IReconnectParams,
transport: IProtocolTransport,
disposables: DisposableStore,
): IConnectedClient {
): { client: IConnectedClient; response: unknown } {
this._logService.info(`[ProtocolServer] Reconnect: clientId=${params.clientId}, lastSeenSeq=${params.lastSeenServerSeq}`);
const client: IConnectedClient = {
@@ -194,16 +215,18 @@ export class ProtocolServerHandler extends Disposable {
const canReplay = params.lastSeenServerSeq >= oldestBuffered;
if (canReplay) {
const actions: IActionEnvelope[] = [];
for (const sub of params.subscriptions) {
client.subscriptions.add(sub.toString());
}
for (const envelope of this._replayBuffer) {
if (envelope.serverSeq > params.lastSeenServerSeq) {
if (this._isRelevantToClient(client, envelope)) {
this._sendNotification(transport, 'action', { envelope });
actions.push(envelope);
}
}
}
return { client, response: { type: 'replay', actions } };
} else {
const snapshots: IStateSnapshot[] = [];
for (const sub of params.subscriptions) {
@@ -214,13 +237,8 @@ export class ProtocolServerHandler extends Disposable {
client.subscriptions.add(resource.toString());
}
}
this._sendNotification(transport, 'reconnectResponse', {
serverSeq: this._stateManager.serverSeq,
snapshots,
});
return { client, response: { type: 'snapshot', snapshots } };
}
return client;
}
// ---- Requests (expect a response) ---------------------------------------
@@ -231,13 +249,14 @@ export class ProtocolServerHandler extends Disposable {
client.transport.send({ jsonrpc: '2.0', id, result: result ?? null });
}).catch(err => {
this._logService.error(`[ProtocolServer] Request '${method}' failed`, err);
const code = err instanceof ProtocolError ? err.code : JSON_RPC_INTERNAL_ERROR;
const message = err instanceof Error && err.stack
? err.stack
: String(err?.message ?? err);
client.transport.send({
jsonrpc: '2.0',
id,
error: { code: JSON_RPC_INTERNAL_ERROR, message },
error: { code, message },
});
});
}
@@ -269,22 +288,24 @@ export class ProtocolServerHandler extends Disposable {
const p = params as IFetchTurnsParams;
const session = URI.revive(p.session);
const state = this._stateManager.getSessionState(session);
if (state) {
const turns = state.turns;
const start = Math.max(0, p.startTurn);
const end = Math.min(turns.length, start + p.count);
return {
session,
startTurn: start,
turns: turns.slice(start, end),
totalTurns: turns.length,
};
if (!state) {
throw new ProtocolError(AHP_SESSION_NOT_FOUND, `Session not found: ${session.toString()}`);
}
const turns = state.turns;
const limit = Math.min(p.limit ?? 50, 100);
let endIndex = turns.length;
if (p.before) {
const idx = turns.findIndex(t => t.id === p.before);
if (idx !== -1) {
endIndex = idx;
}
}
const startIndex = Math.max(0, endIndex - limit);
return {
session,
startTurn: p.startTurn,
turns: [],
totalTurns: 0,
turns: turns.slice(startIndex, endIndex),
hasMore: startIndex > 0,
};
}
default:
@@ -294,11 +315,6 @@ export class ProtocolServerHandler extends Disposable {
// ---- Broadcasting -------------------------------------------------------
private _sendNotification(transport: IProtocolTransport, method: string, params: unknown): void {
this._logService.trace(`[ProtocolServer] Sending notification: ${method}`);
transport.send({ jsonrpc: '2.0', method, params });
}
private _broadcastAction(envelope: IActionEnvelope): void {
this._logService.trace(`[ProtocolServer] Broadcasting action: ${envelope.action.type}`);
const msg: IProtocolMessage = { jsonrpc: '2.0', method: 'action', params: { envelope } };

View File

@@ -75,27 +75,12 @@ export class RemoteAgentHostProtocolClient extends Disposable implements IAgentC
async connect(): Promise<void> {
await this._transport.connect();
// Send initialize notification
this._sendNotification('initialize', {
// Send initialize request and await the response
const result = await this._sendRequest('initialize', {
protocolVersion: PROTOCOL_VERSION,
clientId: this._clientId,
});
// Wait for serverHello notification
await new Promise<void>((resolve, reject) => {
const listener = this._transport.onMessage(msg => {
if (isJsonRpcNotification(msg) && msg.method === 'serverHello') {
listener.dispose();
const params = msg.params as { serverSeq: number };
this._serverSeq = params.serverSeq;
resolve();
}
});
this._register(this._transport.onClose(() => {
listener.dispose();
reject(new Error('Connection closed during handshake'));
}));
});
}) as { serverSeq: number };
this._serverSeq = result.serverSeq;
}
/**

View File

@@ -129,8 +129,8 @@ ActiveTurn {
userMessage: UserMessage
streamingText: string
responseParts: ResponsePart[]
toolCalls: Map<toolCallId, ToolCallState>
pendingPermissions: Map<requestId, PermissionRequest>
toolCalls: Record<toolCallId, ToolCallState>
pendingPermissions: Record<requestId, PermissionRequest>
reasoning: string
usage: UsageInfo | undefined
}
@@ -169,6 +169,7 @@ ActionEnvelope {
action: Action
serverSeq: number // monotonic, assigned by server
origin: { clientId: string, clientSeq: number } | undefined // undefined = server-originated
rejectionReason?: string // present when the server rejected the action
}
```
@@ -258,19 +259,21 @@ The protocol uses **JSON-RPC 2.0** framing over the transport (WebSocket, Messag
### Message categories
- **Client → Server notifications** (fire-and-forget): `initialize`, `reconnect`, `unsubscribe`, `dispatchAction`
- **Client → Server requests** (expect a correlated response): `subscribe`, `createSession`, `disposeSession`, `listSessions`, `fetchTurns`, `fetchContent`
- **Server → Client notifications** (pushed): `serverHello`, `reconnectResponse`, `action`, `notification`
- **Client → Server notifications** (fire-and-forget): `unsubscribe`, `dispatchAction`
- **Client → Server requests** (expect a correlated response): `initialize`, `reconnect`, `subscribe`, `createSession`, `disposeSession`, `listSessions`, `fetchTurns`, `fetchContent`
- **Server → Client notifications** (pushed): `action`, `notification`
- **Server → Client responses** (correlated to requests by `id`): success result or JSON-RPC error
### Connection handshake
`initialize` is a JSON-RPC **request** — the server MUST respond with a result or error:
```
1. Client → Server: { "jsonrpc": "2.0", "method": "initialize", "params": { protocolVersion, clientId, initialSubscriptions? } }
2. Server → Client: { "jsonrpc": "2.0", "method": "serverHello", "params": { protocolVersion, serverSeq, snapshots[] } }
1. Client → Server: { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { protocolVersion, clientId, initialSubscriptions? } }
2. Server → Client: { "jsonrpc": "2.0", "id": 1, "result": { protocolVersion, serverSeq, snapshots[] } }
```
`initialSubscriptions` allows the client to subscribe to root state (and any previously-open sessions on reconnect) in the same round-trip as the handshake. The server responds with snapshots for each.
`initialSubscriptions` allows the client to subscribe to root state (and any previously-open sessions on reconnect) in the same round-trip as the handshake. The server returns snapshots for each in the response.
### URI subscription
@@ -331,11 +334,23 @@ Client → Server: { "jsonrpc": "2.0", "method": "dispatchAction", "params": {
### Reconnection
`reconnect` is a JSON-RPC **request**. The server MUST include all replayed data in the response:
```
Client → Server: { "jsonrpc": "2.0", "method": "reconnect", "params": { clientId, lastSeenServerSeq, subscriptions } }
Client → Server: { "jsonrpc": "2.0", "id": 2, "method": "reconnect", "params": { clientId, lastSeenServerSeq, subscriptions } }
```
Server replays actions since `lastSeenServerSeq` from a bounded replay buffer. If the gap exceeds the buffer, sends fresh snapshots via a `reconnectResponse` notification. Notifications are **not** replayed — the client should re-fetch the session list.
If the gap is within the replay buffer, the response contains missed action envelopes:
```
Server → Client: { "jsonrpc": "2.0", "id": 2, "result": { "type": "replay", "actions": [...] } }
```
If the gap exceeds the buffer, the response contains fresh snapshots:
```
Server → Client: { "jsonrpc": "2.0", "id": 2, "result": { "type": "snapshot", "snapshots": [...] } }
```
Protocol notifications are **not** replayed — the client should re-fetch the session list.
## Write-ahead reconciliation
@@ -352,7 +367,7 @@ When the client receives an `ActionEnvelope` from the server:
1. **Own action echoed**: `origin.clientId === myId` and matches head of `pendingActions` → pop from pending, apply to `confirmedState`
2. **Foreign action**: different origin → apply to `confirmedState`, rebase remaining `pendingActions`
3. **Rejected action**: server echoed with `rejected: true` → remove from pending (optimistic effect reverted)
3. **Rejected action**: server echoed with `rejectionReason` present → remove from pending (optimistic effect reverted). The `rejectionReason` MAY be surfaced to the user.
4. Recompute `optimisticState` from `confirmedState` + remaining `pendingActions`
### Why rebasing is simple
@@ -444,7 +459,7 @@ interface ProtocolCapabilities {
### Forward compatibility
A newer client connecting to an older server:
1. During handshake, the client learns the server's protocol version.
1. During handshake, the client learns the server's protocol version from the `initialize` response.
2. The client derives `ProtocolCapabilities` from the server version.
3. Command factories check capabilities before dispatching; if unsupported, the client degrades gracefully.
4. The server only sends action types known to the client's declared version (via `isActionKnownToVersion`).

View File

@@ -10,7 +10,7 @@ import { URI } from '../../../../base/common/uri.js';
import { ensureNoDisposablesAreLeakedInTestSuite } from '../../../../base/test/common/utils.js';
import { NullLogService } from '../../../log/common/log.js';
import type { ISessionAction } from '../../common/state/sessionActions.js';
import { isJsonRpcNotification, isJsonRpcResponse, type ICreateSessionParams, type IProtocolMessage, type IProtocolNotification, type IServerHelloParams, type IStateSnapshot } from '../../common/state/sessionProtocol.js';
import { isJsonRpcNotification, isJsonRpcResponse, type ICreateSessionParams, type IInitializeResult, type IProtocolMessage, type IProtocolNotification, type IReconnectResult, type IStateSnapshot } from '../../common/state/sessionProtocol.js';
import { SessionStatus, type ISessionSummary } from '../../common/state/sessionState.js';
import { PROTOCOL_VERSION } from '../../common/state/sessionCapabilities.js';
import type { IProtocolServer, IProtocolTransport } from '../../common/state/sessionTransport.js';
@@ -117,7 +117,7 @@ suite('ProtocolServerHandler', () => {
function connectClient(clientId: string, initialSubscriptions?: readonly URI[]): MockProtocolTransport {
const transport = new MockProtocolTransport();
server.simulateConnection(transport);
transport.simulateMessage(notification('initialize', {
transport.simulateMessage(request(1, 'initialize', {
protocolVersion: PROTOCOL_VERSION,
clientId,
initialSubscriptions,
@@ -144,14 +144,14 @@ suite('ProtocolServerHandler', () => {
ensureNoDisposablesAreLeakedInTestSuite();
test('handshake sends serverHello notification', () => {
test('handshake returns initialize response', () => {
const transport = connectClient('client-1');
const hello = findNotification(transport.sent, 'serverHello');
assert.ok(hello, 'should have sent serverHello');
const params = hello.params as IServerHelloParams;
assert.strictEqual(params.protocolVersion, PROTOCOL_VERSION);
assert.strictEqual(params.serverSeq, stateManager.serverSeq);
const resp = findResponse(transport.sent, 1);
assert.ok(resp, 'should have sent initialize response');
const result = (resp as { result: IInitializeResult }).result;
assert.strictEqual(result.protocolVersion, PROTOCOL_VERSION);
assert.strictEqual(result.serverSeq, stateManager.serverSeq);
});
test('handshake with initialSubscriptions returns snapshots', () => {
@@ -159,11 +159,11 @@ suite('ProtocolServerHandler', () => {
const transport = connectClient('client-1', [sessionUri]);
const hello = findNotification(transport.sent, 'serverHello');
assert.ok(hello);
const params = hello.params as IServerHelloParams;
assert.strictEqual(params.snapshots.length, 1);
assert.strictEqual(params.snapshots[0].resource.toString(), sessionUri.toString());
const resp = findResponse(transport.sent, 1);
assert.ok(resp);
const result = (resp as { result: IInitializeResult }).result;
assert.strictEqual(result.snapshots.length, 1);
assert.strictEqual(result.snapshots[0].resource.toString(), sessionUri.toString());
});
test('subscribe request returns snapshot', async () => {
@@ -249,8 +249,8 @@ suite('ProtocolServerHandler', () => {
stateManager.dispatchServerAction({ type: 'session/ready', session: sessionUri });
const transport1 = connectClient('client-r', [sessionUri]);
const hello = findNotification(transport1.sent, 'serverHello');
const helloSeq = (hello!.params as IServerHelloParams).serverSeq;
const resp = findResponse(transport1.sent, 1);
const initSeq = (resp as { result: IInitializeResult }).result.serverSeq;
transport1.simulateClose();
stateManager.dispatchServerAction({ type: 'session/titleChanged', session: sessionUri, title: 'Title A' });
@@ -258,14 +258,19 @@ suite('ProtocolServerHandler', () => {
const transport2 = new MockProtocolTransport();
server.simulateConnection(transport2);
transport2.simulateMessage(notification('reconnect', {
transport2.simulateMessage(request(1, 'reconnect', {
clientId: 'client-r',
lastSeenServerSeq: helloSeq,
lastSeenServerSeq: initSeq,
subscriptions: [sessionUri],
}));
const replayed = findNotifications(transport2.sent, 'action');
assert.strictEqual(replayed.length, 2);
const reconnectResp = findResponse(transport2.sent, 1);
assert.ok(reconnectResp, 'should have sent reconnect response');
const result = (reconnectResp as { result: IReconnectResult }).result;
assert.strictEqual(result.type, 'replay');
if (result.type === 'replay') {
assert.strictEqual(result.actions.length, 2);
}
});
test('reconnect sends fresh snapshots when gap too large', () => {
@@ -281,16 +286,19 @@ suite('ProtocolServerHandler', () => {
const transport2 = new MockProtocolTransport();
server.simulateConnection(transport2);
transport2.simulateMessage(notification('reconnect', {
transport2.simulateMessage(request(1, 'reconnect', {
clientId: 'client-g',
lastSeenServerSeq: 0,
subscriptions: [sessionUri],
}));
const reconnectResp = findNotification(transport2.sent, 'reconnectResponse');
assert.ok(reconnectResp, 'should receive a reconnectResponse');
const params = reconnectResp!.params as { snapshots: IStateSnapshot[] };
assert.ok(params.snapshots.length > 0, 'should contain snapshots');
const reconnectResp = findResponse(transport2.sent, 1);
assert.ok(reconnectResp, 'should have sent reconnect response');
const result = (reconnectResp as { result: IReconnectResult }).result;
assert.strictEqual(result.type, 'snapshot');
if (result.type === 'snapshot') {
assert.ok(result.snapshots.length > 0, 'should contain snapshots');
}
});
test('client disconnect cleans up', () => {

View File

@@ -16,13 +16,14 @@ import {
JSON_RPC_PARSE_ERROR,
type IActionBroadcastParams,
type IFetchTurnsResult,
type IInitializeResult,
type IJsonRpcErrorResponse,
type IJsonRpcSuccessResponse,
type IListSessionsResult,
type INotificationBroadcastParams,
type IProtocolMessage,
type IProtocolNotification,
type IServerHelloParams,
type IReconnectResult,
type IStateSnapshot,
} from '../../common/state/sessionProtocol.js';
import type { IDeltaAction, ISessionAddedNotification, ISessionRemovedNotification, IUsageAction } from '../../common/state/sessionActions.js';
@@ -244,8 +245,7 @@ function getActionParams(n: IProtocolNotification): IActionBroadcastParams {
/** Perform handshake, create a session, subscribe, and return its URI. */
async function createAndSubscribeSession(c: TestProtocolClient, clientId: string): Promise<URI> {
c.notify('initialize', { protocolVersion: PROTOCOL_VERSION, clientId });
await c.waitForNotification(n => n.method === 'serverHello');
await c.call('initialize', { protocolVersion: PROTOCOL_VERSION, clientId });
await c.call('createSession', { session: nextSessionUri(), provider: 'mock' });
@@ -299,28 +299,25 @@ suite('Protocol WebSocket E2E', function () {
});
// 1. Handshake
test('handshake returns serverHello with protocol version', async function () {
test('handshake returns initialize response with protocol version', async function () {
this.timeout(5_000);
client.notify('initialize', {
const result = await client.call<IInitializeResult>('initialize', {
protocolVersion: PROTOCOL_VERSION,
clientId: 'test-handshake',
initialSubscriptions: [URI.from({ scheme: 'agenthost', path: '/root' })],
});
const hello = await client.waitForNotification(n => n.method === 'serverHello');
const params = hello.params as IServerHelloParams;
assert.strictEqual(params.protocolVersion, PROTOCOL_VERSION);
assert.ok(params.serverSeq >= 0);
assert.ok(params.snapshots.length >= 1, 'should have root state snapshot');
assert.strictEqual(result.protocolVersion, PROTOCOL_VERSION);
assert.ok(result.serverSeq >= 0);
assert.ok(result.snapshots.length >= 1, 'should have root state snapshot');
});
// 2. Create session
test('create session triggers sessionAdded notification', async function () {
this.timeout(10_000);
client.notify('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-create-session' });
await client.waitForNotification(n => n.method === 'serverHello');
await client.call('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-create-session' });
await client.call('createSession', { session: nextSessionUri(), provider: 'mock' });
@@ -411,8 +408,7 @@ suite('Protocol WebSocket E2E', function () {
test('listSessions returns sessions', async function () {
this.timeout(10_000);
client.notify('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-list-sessions' });
await client.waitForNotification(n => n.method === 'serverHello');
await client.call('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-list-sessions' });
await client.call('createSession', { session: nextSessionUri(), provider: 'mock' });
await client.waitForNotification(n =>
@@ -440,19 +436,16 @@ suite('Protocol WebSocket E2E', function () {
const client2 = new TestProtocolClient(server.port);
await client2.connect();
client2.notify('reconnect', {
const result = await client2.call<IReconnectResult>('reconnect', {
clientId: 'test-reconnect',
lastSeenServerSeq: missedFromSeq,
subscriptions: [sessionUri],
});
await new Promise(resolve => setTimeout(resolve, 500));
const replayed = client2.receivedNotifications();
assert.ok(replayed.length > 0, 'should receive replayed actions or reconnect response');
const hasActions = replayed.some(n => n.method === 'action');
const hasReconnect = replayed.some(n => n.method === 'reconnectResponse');
assert.ok(hasActions || hasReconnect);
assert.ok(result.type === 'replay' || result.type === 'snapshot', 'should receive replay or snapshot');
if (result.type === 'replay') {
assert.ok(result.actions.length > 0, 'should have replayed actions');
}
client2.close();
});
@@ -502,8 +495,7 @@ suite('Protocol WebSocket E2E', function () {
test('createSession with invalid provider does not crash server', async function () {
this.timeout(10_000);
client.notify('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-invalid-create' });
await client.waitForNotification(n => n.method === 'serverHello');
await client.call('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-invalid-create' });
// This should return a JSON-RPC error
let gotError = false;
@@ -534,9 +526,9 @@ suite('Protocol WebSocket E2E', function () {
await new Promise(resolve => setTimeout(resolve, 200));
await client.waitForNotification(n => isActionNotification(n, 'session/turnComplete'));
const result = await client.call<IFetchTurnsResult>('fetchTurns', { session: sessionUri, startTurn: 0, count: 10 });
const result = await client.call<IFetchTurnsResult>('fetchTurns', { session: sessionUri, limit: 10 });
assert.ok(result.turns.length >= 2);
assert.ok(result.totalTurns >= 2);
assert.strictEqual(typeof result.hasMore, 'boolean');
});
// ---- Gap tests: coverage ---------------------------------------------------
@@ -599,8 +591,7 @@ suite('Protocol WebSocket E2E', function () {
const client2 = new TestProtocolClient(server.port);
await client2.connect();
client2.notify('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-multi-client-2' });
await client2.waitForNotification(n => n.method === 'serverHello');
await client2.call('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-multi-client-2' });
await client2.call('subscribe', { resource: sessionUri });
client2.clearReceived();
@@ -627,8 +618,7 @@ suite('Protocol WebSocket E2E', function () {
const client2 = new TestProtocolClient(server.port);
await client2.connect();
client2.notify('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-unsub-helper' });
await client2.waitForNotification(n => n.method === 'serverHello');
await client2.call('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-unsub-helper' });
await client2.call('subscribe', { resource: sessionUri });
dispatchTurnStarted(client2, sessionUri, 'turn-unsub', 'hello', 1);

View File

@@ -351,7 +351,7 @@ export class AgentHostSessionHandler extends Disposable implements IChatSessionC
}
// Handle tool calls — create/finalize ChatToolInvocations
for (const [toolCallId, tc] of activeTurn.toolCalls) {
for (const [toolCallId, tc] of Object.entries(activeTurn.toolCalls)) {
const existing = activeToolInvocations.get(toolCallId);
if (!existing) {
if (tc.status === ToolCallStatus.Running || tc.status === ToolCallStatus.PendingPermission) {
@@ -366,7 +366,7 @@ export class AgentHostSessionHandler extends Disposable implements IChatSessionC
}
// Handle permission requests
for (const [requestId, perm] of activeTurn.pendingPermissions) {
for (const [requestId, perm] of Object.entries(activeTurn.pendingPermissions)) {
if (activePermissions.has(requestId)) {
continue;
}