From 29d9808be826c3688ec00bf20c4d66b73e777468 Mon Sep 17 00:00:00 2001 From: Connor Peet Date: Thu, 26 Mar 2026 09:45:15 -0700 Subject: [PATCH] agentHost: ui side for queued messages (#304954) * agentHost: initial queuing/steering data flows * agentHost: ui side for queued messages Steering messages still don't quite work yet, I need to hook up some more stuff in the CLI for that I believe. * comments * rm dead code --- .../platform/agentHost/common/agentService.ts | 12 +- .../state/protocol/action-origin.generated.ts | 19 +- .../common/state/protocol/actions.ts | 126 +++++- .../common/state/protocol/commands.ts | 2 +- .../agentHost/common/state/protocol/errors.ts | 2 +- .../common/state/protocol/messages.ts | 2 +- .../common/state/protocol/notifications.ts | 2 +- .../common/state/protocol/reducers.ts | 102 ++++- .../agentHost/common/state/protocol/state.ts | 154 +++++++- .../common/state/protocol/version/registry.ts | 7 +- .../agentHost/common/state/sessionActions.ts | 10 + .../common/state/sessionCapabilities.ts | 10 +- .../agentHost/common/state/sessionState.ts | 2 + .../common/state/versions/versionRegistry.ts | 71 ---- .../agentHost/node/agentSideEffects.ts | 105 +++++ .../agentHost/node/copilot/copilotAgent.ts | 26 +- .../agentHost/node/protocolServerHandler.ts | 5 +- .../test/node/agentSideEffects.test.ts | 202 +++++++++- .../platform/agentHost/test/node/mockAgent.ts | 7 +- .../agentHost/agentHostSessionHandler.ts | 371 +++++++++++++++++- .../common/chatService/chatServiceImpl.ts | 78 +++- .../chat/common/chatSessionsService.ts | 7 + .../agentHostChatContribution.test.ts | 133 +++++++ 23 files changed, 1336 insertions(+), 119 deletions(-) delete mode 100644 src/vs/platform/agentHost/common/state/versions/versionRegistry.ts diff --git a/src/vs/platform/agentHost/common/agentService.ts b/src/vs/platform/agentHost/common/agentService.ts index 701d346ff1d..0c99ed3addc 100644 --- a/src/vs/platform/agentHost/common/agentService.ts +++ b/src/vs/platform/agentHost/common/agentService.ts @@ -9,7 +9,7 @@ import { URI } from '../../../base/common/uri.js'; import { createDecorator } from '../../instantiation/common/instantiation.js'; import type { IActionEnvelope, INotification, ISessionAction } from './state/sessionActions.js'; import type { IBrowseDirectoryResult, IFetchContentResult, IStateSnapshot } from './state/sessionProtocol.js'; -import { AttachmentType, type IToolCallResult, type PolicyState, type StringOrMarkdown } from './state/sessionState.js'; +import { AttachmentType, type IPendingMessage, type IToolCallResult, type PolicyState, type StringOrMarkdown } from './state/sessionState.js'; // IPC contract between the renderer and the agent host utility process. // Defines all serializable event types, the IAgent provider interface, @@ -310,6 +310,16 @@ export interface IAgent { /** Send a user message into an existing session. */ sendMessage(session: URI, prompt: string, attachments?: IAgentAttachment[]): Promise; + /** + * Called when the session's pending (steering) message changes. + * The agent harness decides how to react — e.g. inject steering + * mid-turn via `mode: 'immediate'`. + * + * Queued messages are consumed on the server side and are not + * forwarded to the agent; `queuedMessages` will always be empty. + */ + setPendingMessages?(session: URI, steeringMessage: IPendingMessage | undefined, queuedMessages: readonly IPendingMessage[]): void; + /** Retrieve all session events/messages for reconstruction. */ getSessionMessages(session: URI): Promise<(IAgentMessageEvent | IAgentToolStartEvent | IAgentToolCompleteEvent)[]>; diff --git a/src/vs/platform/agentHost/common/state/protocol/action-origin.generated.ts b/src/vs/platform/agentHost/common/state/protocol/action-origin.generated.ts index 307e98daa2a..c65d536c7d3 100644 --- a/src/vs/platform/agentHost/common/state/protocol/action-origin.generated.ts +++ b/src/vs/platform/agentHost/common/state/protocol/action-origin.generated.ts @@ -5,12 +5,12 @@ // allow-any-unicode-comment-file // DO NOT EDIT -- auto-generated by scripts/sync-agent-host-protocol.ts -// Synced from agent-host-protocol @ 69603e5 +// Synced from agent-host-protocol @ 32572fc // Generated from types/actions.ts — do not edit // Run `npm run generate` to regenerate. -import { ActionType, type IStateAction, type IRootAgentsChangedAction, type IRootActiveSessionsChangedAction, type ISessionReadyAction, type ISessionCreationFailedAction, type ISessionTurnStartedAction, type ISessionDeltaAction, type ISessionResponsePartAction, type ISessionToolCallStartAction, type ISessionToolCallDeltaAction, type ISessionToolCallReadyAction, type ISessionToolCallConfirmedAction, type ISessionToolCallCompleteAction, type ISessionToolCallResultConfirmedAction, type ISessionTurnCompleteAction, type ISessionTurnCancelledAction, type ISessionErrorAction, type ISessionTitleChangedAction, type ISessionUsageAction, type ISessionReasoningAction, type ISessionModelChangedAction, type ISessionServerToolsChangedAction, type ISessionActiveClientChangedAction, type ISessionActiveClientToolsChangedAction } from './actions.js'; +import { ActionType, type IStateAction, type IRootAgentsChangedAction, type IRootActiveSessionsChangedAction, type ISessionReadyAction, type ISessionCreationFailedAction, type ISessionTurnStartedAction, type ISessionDeltaAction, type ISessionResponsePartAction, type ISessionToolCallStartAction, type ISessionToolCallDeltaAction, type ISessionToolCallReadyAction, type ISessionToolCallConfirmedAction, type ISessionToolCallCompleteAction, type ISessionToolCallResultConfirmedAction, type ISessionTurnCompleteAction, type ISessionTurnCancelledAction, type ISessionErrorAction, type ISessionTitleChangedAction, type ISessionUsageAction, type ISessionReasoningAction, type ISessionModelChangedAction, type ISessionServerToolsChangedAction, type ISessionActiveClientChangedAction, type ISessionActiveClientToolsChangedAction, type ISessionPendingMessageSetAction, type ISessionPendingMessageRemovedAction, type ISessionQueuedMessagesReorderedAction, type ISessionCustomizationsChangedAction, type ISessionCustomizationToggledAction } from './actions.js'; // ─── Root vs Session Action Unions ─────────────────────────────────────────── @@ -44,6 +44,11 @@ export type ISessionAction = | ISessionServerToolsChangedAction | ISessionActiveClientChangedAction | ISessionActiveClientToolsChangedAction + | ISessionPendingMessageSetAction + | ISessionPendingMessageRemovedAction + | ISessionQueuedMessagesReorderedAction + | ISessionCustomizationsChangedAction + | ISessionCustomizationToggledAction ; /** Union of session actions that clients may dispatch. */ @@ -56,6 +61,10 @@ export type IClientSessionAction = | ISessionModelChangedAction | ISessionActiveClientChangedAction | ISessionActiveClientToolsChangedAction + | ISessionPendingMessageSetAction + | ISessionPendingMessageRemovedAction + | ISessionQueuedMessagesReorderedAction + | ISessionCustomizationToggledAction ; /** Union of session actions that only the server may produce. */ @@ -73,6 +82,7 @@ export type IServerSessionAction = | ISessionUsageAction | ISessionReasoningAction | ISessionServerToolsChangedAction + | ISessionCustomizationsChangedAction ; // ─── Client-Dispatchable Map ───────────────────────────────────────────────── @@ -105,4 +115,9 @@ export const IS_CLIENT_DISPATCHABLE: { readonly [K in IStateAction['type']]: boo [ActionType.SessionServerToolsChanged]: false, [ActionType.SessionActiveClientChanged]: true, [ActionType.SessionActiveClientToolsChanged]: true, + [ActionType.SessionPendingMessageSet]: true, + [ActionType.SessionPendingMessageRemoved]: true, + [ActionType.SessionQueuedMessagesReordered]: true, + [ActionType.SessionCustomizationsChanged]: false, + [ActionType.SessionCustomizationToggled]: true, }; diff --git a/src/vs/platform/agentHost/common/state/protocol/actions.ts b/src/vs/platform/agentHost/common/state/protocol/actions.ts index 769b34629b1..ee22225be03 100644 --- a/src/vs/platform/agentHost/common/state/protocol/actions.ts +++ b/src/vs/platform/agentHost/common/state/protocol/actions.ts @@ -5,9 +5,9 @@ // allow-any-unicode-comment-file // DO NOT EDIT -- auto-generated by scripts/sync-agent-host-protocol.ts -// Synced from agent-host-protocol @ 69603e5 +// Synced from agent-host-protocol @ 32572fc -import { ToolCallConfirmationReason, ToolCallCancellationReason, type URI, type StringOrMarkdown, type IAgentInfo, type IErrorInfo, type IUserMessage, type IResponsePart, type IToolCallResult, type IToolDefinition, type ISessionActiveClient, type IUsageInfo } from './state.js'; +import { ToolCallConfirmationReason, ToolCallCancellationReason, PendingMessageKind, type URI, type StringOrMarkdown, type IAgentInfo, type IErrorInfo, type IUserMessage, type IResponsePart, type IToolCallResult, type IToolDefinition, type ISessionActiveClient, type IUsageInfo, type ISessionCustomization } from './state.js'; // ─── Action Type Enum ──────────────────────────────────────────────────────── @@ -41,6 +41,11 @@ export const enum ActionType { SessionServerToolsChanged = 'session/serverToolsChanged', SessionActiveClientChanged = 'session/activeClientChanged', SessionActiveClientToolsChanged = 'session/activeClientToolsChanged', + SessionPendingMessageSet = 'session/pendingMessageSet', + SessionPendingMessageRemoved = 'session/pendingMessageRemoved', + SessionQueuedMessagesReordered = 'session/queuedMessagesReordered', + SessionCustomizationsChanged = 'session/customizationsChanged', + SessionCustomizationToggled = 'session/customizationToggled', } // ─── Action Envelope ───────────────────────────────────────────────────────── @@ -156,6 +161,8 @@ export interface ISessionTurnStartedAction { turnId: string; /** User's message */ userMessage: IUserMessage; + /** If this turn was auto-started from a queued message, the ID of that message */ + queuedMessageId?: string; } /** @@ -515,6 +522,114 @@ export interface ISessionActiveClientToolsChangedAction { tools: IToolDefinition[]; } +// ─── Customization Actions ─────────────────────────────────────────────────── + +/** + * The session's customizations have changed. + * + * Full-replacement semantics: the `customizations` array replaces the + * previous `customizations` entirely. + * + * @category Session Actions + * @version 1 + */ +export interface ISessionCustomizationsChangedAction { + type: ActionType.SessionCustomizationsChanged; + /** Session URI */ + session: URI; + /** Updated customization list (full replacement) */ + customizations: ISessionCustomization[]; +} + +/** + * A client toggled a customization on or off. + * + * The server locates the customization by `uri` in the session's + * customization list and sets its `enabled` flag. + * + * @category Session Actions + * @version 1 + * @clientDispatchable + */ +export interface ISessionCustomizationToggledAction { + type: ActionType.SessionCustomizationToggled; + /** Session URI */ + session: URI; + /** The URI of the customization to toggle */ + uri: URI; + /** Whether to enable or disable the customization */ + enabled: boolean; +} + +// ─── Pending Message Actions ───────────────────────────────────────────────── + +/** + * A pending message was set (upsert semantics: creates or replaces). + * + * For steering messages, this always replaces the single steering message. + * For queued messages, if a message with the given `id` already exists it is + * updated in place; otherwise it is appended to the queue. If the session is + * idle when a queued message is set, the server SHOULD immediately consume it + * and start a new turn. + * + * @category Session Actions + * @version 1 + * @clientDispatchable + */ +export interface ISessionPendingMessageSetAction { + type: ActionType.SessionPendingMessageSet; + /** Session URI */ + session: URI; + /** Whether this is a steering or queued message */ + kind: PendingMessageKind; + /** Unique identifier for this pending message */ + id: string; + /** The message content */ + userMessage: IUserMessage; +} + +/** + * A pending message was removed (steering or queued). + * + * Dispatched by clients to cancel a pending message, or by the server when + * it consumes a message (e.g. starting a turn from a queued message or + * injecting a steering message into the current turn). + * + * @category Session Actions + * @version 1 + * @clientDispatchable + */ +export interface ISessionPendingMessageRemovedAction { + type: ActionType.SessionPendingMessageRemoved; + /** Session URI */ + session: URI; + /** Whether this is a steering or queued message */ + kind: PendingMessageKind; + /** Identifier of the pending message to remove */ + id: string; +} + +/** + * Reorder the queued messages. + * + * The `order` array contains the IDs of queued messages in their new + * desired order. IDs not present in the current queue are ignored. + * Queued messages whose IDs are absent from `order` are appended at + * the end in their original relative order (so a client with a stale + * view of the queue never silently drops messages). + * + * @category Session Actions + * @version 1 + * @clientDispatchable + */ +export interface ISessionQueuedMessagesReorderedAction { + type: ActionType.SessionQueuedMessagesReordered; + /** Session URI */ + session: URI; + /** Queued message IDs in the desired order */ + order: string[]; +} + // ─── Discriminated Union ───────────────────────────────────────────────────── /** @@ -543,4 +658,9 @@ export type IStateAction = | ISessionModelChangedAction | ISessionServerToolsChangedAction | ISessionActiveClientChangedAction - | ISessionActiveClientToolsChangedAction; + | ISessionActiveClientToolsChangedAction + | ISessionPendingMessageSetAction + | ISessionPendingMessageRemovedAction + | ISessionQueuedMessagesReorderedAction + | ISessionCustomizationsChangedAction + | ISessionCustomizationToggledAction; diff --git a/src/vs/platform/agentHost/common/state/protocol/commands.ts b/src/vs/platform/agentHost/common/state/protocol/commands.ts index ba6ef070667..1793016f86b 100644 --- a/src/vs/platform/agentHost/common/state/protocol/commands.ts +++ b/src/vs/platform/agentHost/common/state/protocol/commands.ts @@ -5,7 +5,7 @@ // allow-any-unicode-comment-file // DO NOT EDIT -- auto-generated by scripts/sync-agent-host-protocol.ts -// Synced from agent-host-protocol @ 69603e5 +// Synced from agent-host-protocol @ 32572fc import type { URI, ISnapshot, ISessionSummary, ITurn } from './state.js'; import type { IActionEnvelope, IStateAction } from './actions.js'; diff --git a/src/vs/platform/agentHost/common/state/protocol/errors.ts b/src/vs/platform/agentHost/common/state/protocol/errors.ts index e33c52c2343..cfa1291eaab 100644 --- a/src/vs/platform/agentHost/common/state/protocol/errors.ts +++ b/src/vs/platform/agentHost/common/state/protocol/errors.ts @@ -5,7 +5,7 @@ // allow-any-unicode-comment-file // DO NOT EDIT -- auto-generated by scripts/sync-agent-host-protocol.ts -// Synced from agent-host-protocol @ 69603e5 +// Synced from agent-host-protocol @ 32572fc // ─── Standard JSON-RPC Codes ───────────────────────────────────────────────── diff --git a/src/vs/platform/agentHost/common/state/protocol/messages.ts b/src/vs/platform/agentHost/common/state/protocol/messages.ts index 1e053c29579..e3cd6567b10 100644 --- a/src/vs/platform/agentHost/common/state/protocol/messages.ts +++ b/src/vs/platform/agentHost/common/state/protocol/messages.ts @@ -5,7 +5,7 @@ // allow-any-unicode-comment-file // DO NOT EDIT -- auto-generated by scripts/sync-agent-host-protocol.ts -// Synced from agent-host-protocol @ 69603e5 +// Synced from agent-host-protocol @ 32572fc import type { IInitializeParams, IInitializeResult, IReconnectParams, IReconnectResult, ISubscribeParams, ISubscribeResult, ICreateSessionParams, IDisposeSessionParams, IListSessionsParams, IListSessionsResult, IFetchContentParams, IFetchContentResult, IBrowseDirectoryParams, IBrowseDirectoryResult, IFetchTurnsParams, IFetchTurnsResult, IUnsubscribeParams, IDispatchActionParams, IAuthenticateParams, IAuthenticateResult } from './commands.js'; diff --git a/src/vs/platform/agentHost/common/state/protocol/notifications.ts b/src/vs/platform/agentHost/common/state/protocol/notifications.ts index ec2a1878c9f..7e255adac8f 100644 --- a/src/vs/platform/agentHost/common/state/protocol/notifications.ts +++ b/src/vs/platform/agentHost/common/state/protocol/notifications.ts @@ -5,7 +5,7 @@ // allow-any-unicode-comment-file // DO NOT EDIT -- auto-generated by scripts/sync-agent-host-protocol.ts -// Synced from agent-host-protocol @ 69603e5 +// Synced from agent-host-protocol @ 32572fc import type { URI, ISessionSummary } from './state.js'; diff --git a/src/vs/platform/agentHost/common/state/protocol/reducers.ts b/src/vs/platform/agentHost/common/state/protocol/reducers.ts index 581201add39..39a6aec57e5 100644 --- a/src/vs/platform/agentHost/common/state/protocol/reducers.ts +++ b/src/vs/platform/agentHost/common/state/protocol/reducers.ts @@ -5,10 +5,10 @@ // allow-any-unicode-comment-file // DO NOT EDIT -- auto-generated by scripts/sync-agent-host-protocol.ts -// Synced from agent-host-protocol @ 69603e5 +// Synced from agent-host-protocol @ 32572fc import { ActionType } from './actions.js'; -import { SessionLifecycle, SessionStatus, TurnState, ToolCallStatus, ToolCallConfirmationReason, ToolCallCancellationReason, ResponsePartKind, type IRootState, type ISessionState, type IToolCallState, type IResponsePart, type IToolCallResponsePart, type ITurn } from './state.js'; +import { SessionLifecycle, SessionStatus, TurnState, ToolCallStatus, ToolCallConfirmationReason, ToolCallCancellationReason, ResponsePartKind, PendingMessageKind, type IRootState, type ISessionState, type IToolCallState, type IResponsePart, type IToolCallResponsePart, type ITurn, type IPendingMessage } from './state.js'; import { IS_CLIENT_DISPATCHABLE, type IRootAction, type ISessionAction, type IClientSessionAction } from './action-origin.generated.js'; // ─── Helpers ───────────────────────────────────────────────────────────────── @@ -22,7 +22,7 @@ import { IS_CLIENT_DISPATCHABLE, type IRootAction, type ISessionAction, type ICl * clients receiving unknown actions from a newer server degrade gracefully. */ export function softAssertNever(value: never, log?: (msg: string) => void): void { - const msg = `Unhandled action type: ${(value as { type: string }).type}`; + const msg = `Unhandled action type: ${JSON.stringify(value)}`; (log ?? console.warn)(msg); } @@ -216,8 +216,8 @@ export function sessionReducer(state: ISessionState, action: ISessionAction, log // ── Turn Lifecycle ──────────────────────────────────────────────────── - case ActionType.SessionTurnStarted: - return { + case ActionType.SessionTurnStarted: { + let next: ISessionState = { ...state, summary: { ...state.summary, status: SessionStatus.InProgress, modifiedAt: Date.now() }, activeTurn: { @@ -228,6 +228,20 @@ export function sessionReducer(state: ISessionState, action: ISessionAction, log }, }; + // If this turn was auto-started from a pending message, remove it + if (action.queuedMessageId) { + if (next.steeringMessage?.id === action.queuedMessageId) { + next = { ...next, steeringMessage: undefined }; + } + if (next.queuedMessages) { + const filtered = next.queuedMessages.filter(m => m.id !== action.queuedMessageId); + next = { ...next, queuedMessages: filtered.length > 0 ? filtered : undefined }; + } + } + + return next; + } + case ActionType.SessionDelta: return updateResponsePart(state, action.turnId, action.partId, part => { if (part.kind === ResponsePartKind.Markdown) { @@ -453,6 +467,84 @@ export function sessionReducer(state: ISessionState, action: ISessionAction, log activeClient: { ...state.activeClient, tools: action.tools }, }; + // ── Customizations ────────────────────────────────────────────────── + + case ActionType.SessionCustomizationsChanged: + return { ...state, customizations: action.customizations }; + + case ActionType.SessionCustomizationToggled: { + const list = state.customizations; + if (!list) { + return state; + } + const idx = list.findIndex(c => c.customization.uri === action.uri); + if (idx < 0) { + return state; + } + const updated = [...list]; + updated[idx] = { ...list[idx], enabled: action.enabled }; + return { ...state, customizations: updated }; + } + + // ── Pending Messages ────────────────────────────────────────────────── + + case ActionType.SessionPendingMessageSet: { + const entry: IPendingMessage = { id: action.id, userMessage: action.userMessage }; + if (action.kind === PendingMessageKind.Steering) { + return { ...state, steeringMessage: entry }; + } + const existing = state.queuedMessages ?? []; + const idx = existing.findIndex(m => m.id === action.id); + if (idx >= 0) { + const updated = [...existing]; + updated[idx] = entry; + return { ...state, queuedMessages: updated }; + } + return { ...state, queuedMessages: [...existing, entry] }; + } + + case ActionType.SessionPendingMessageRemoved: { + if (action.kind === PendingMessageKind.Steering) { + if (!state.steeringMessage || state.steeringMessage.id !== action.id) { + return state; + } + return { ...state, steeringMessage: undefined }; + } + const existing = state.queuedMessages; + if (!existing) { + return state; + } + const filtered = existing.filter(m => m.id !== action.id); + return filtered.length === existing.length + ? state + : { ...state, queuedMessages: filtered.length > 0 ? filtered : undefined }; + } + + case ActionType.SessionQueuedMessagesReordered: { + const existing = state.queuedMessages; + if (!existing) { + return state; + } + const byId = new Map(existing.map(m => [m.id, m])); + const ordered = new Set(); + const reordered = action.order + .filter(id => { + if (byId.has(id) && !ordered.has(id)) { + ordered.add(id); + return true; + } + return false; + }) + .map(id => byId.get(id)!); + // Append any messages not mentioned in order, preserving original order + for (const m of existing) { + if (!ordered.has(m.id)) { + reordered.push(m); + } + } + return { ...state, queuedMessages: reordered }; + } + default: softAssertNever(action, log); return state; diff --git a/src/vs/platform/agentHost/common/state/protocol/state.ts b/src/vs/platform/agentHost/common/state/protocol/state.ts index 1cd9bb66cc4..42a5f0138f5 100644 --- a/src/vs/platform/agentHost/common/state/protocol/state.ts +++ b/src/vs/platform/agentHost/common/state/protocol/state.ts @@ -5,7 +5,7 @@ // allow-any-unicode-comment-file // DO NOT EDIT -- auto-generated by scripts/sync-agent-host-protocol.ts -// Synced from agent-host-protocol @ 69603e5 +// Synced from agent-host-protocol @ 32572fc // ─── Type Aliases ──────────────────────────────────────────────────────────── @@ -20,6 +20,50 @@ export type URI = string; */ export type StringOrMarkdown = string | { markdown: string }; +// ─── Icon ──────────────────────────────────────────────────────────────────── + +/** + * An optionally-sized icon that can be displayed in a user interface. + * + * @category Common Types + */ +export interface Icon { + /** + * A standard URI pointing to an icon resource. May be an HTTP/HTTPS URL or a + * `data:` URI with Base64-encoded image data. + * + * Consumers SHOULD take steps to ensure URLs serving icons are from the + * same domain as the client/server or a trusted domain. + * + * Consumers SHOULD take appropriate precautions when consuming SVGs as they can contain + * executable JavaScript. + */ + src: URI; + + /** + * Optional MIME type override if the source MIME type is missing or generic. + * For example: `"image/png"`, `"image/jpeg"`, or `"image/svg+xml"`. + */ + contentType?: string; + + /** + * Optional array of strings that specify sizes at which the icon can be used. + * Each string should be in WxH format (e.g., `"48x48"`, `"96x96"`) or `"any"` for scalable formats like SVG. + * + * If not provided, the client should assume that the icon can be used at any size. + */ + sizes?: string[]; + + /** + * Optional specifier for the theme this icon is designed for. `"light"` indicates + * the icon is designed to be used with a light background, and `"dark"` indicates + * the icon is designed to be used with a dark background. + * + * If not provided, the client should assume the icon can be used with any theme. + */ + theme?: 'light' | 'dark'; +} + // ─── Protected Resource Metadata (RFC 9728) ───────────────────────────────── /** @@ -135,6 +179,13 @@ export interface IAgentInfo { * @see {@link /specification/authentication | Authentication} */ protectedResources?: IProtectedResourceMetadata[]; + /** + * Customizations (Open Plugins) associated with this agent. + * + * Each entry is a reference to an [Open Plugins](https://open-plugins.com/) + * plugin that the agent host can activate for sessions using this agent. + */ + customizations?: ICustomizationRef[]; } /** @@ -155,6 +206,36 @@ export interface ISessionModelInfo { policyState?: PolicyState; } +// ─── Pending Message Types ─────────────────────────────────────────────────── + +/** + * Discriminant for pending message kinds. + * + * @category Pending Message Types + */ +export const enum PendingMessageKind { + /** Injected into the current turn at a convenient point */ + Steering = 'steering', + /** Sent automatically as a new turn after the current turn finishes */ + Queued = 'queued', +} + +/** + * A message queued for future delivery to the agent. + * + * Steering messages are injected into the current turn mid-flight. + * Queued messages are automatically started as new turns after the + * current turn naturally finishes. + * + * @category Pending Message Types + */ +export interface IPendingMessage { + /** Unique identifier for this pending message */ + id: string; + /** The message content */ + userMessage: IUserMessage; +} + // ─── Session State ─────────────────────────────────────────────────────────── /** @@ -201,6 +282,17 @@ export interface ISessionState { turns: ITurn[]; /** Currently in-progress turn */ activeTurn?: IActiveTurn; + /** Message to inject into the current turn at a convenient point */ + steeringMessage?: IPendingMessage; + /** Messages to send automatically as new turns after the current turn finishes */ + queuedMessages?: IPendingMessage[]; + /** + * Server-provided customizations active in this session. + * + * Client-provided customizations are available on + * {@link ISessionActiveClient.customizations | activeClient.customizations}. + */ + customizations?: ISessionCustomization[]; } /** @@ -218,6 +310,8 @@ export interface ISessionActiveClient { displayName?: string; /** Tools this client provides to the session */ tools: IToolDefinition[]; + /** Customizations this client contributes to the session */ + customizations?: ICustomizationRef[]; } /** @@ -762,6 +856,64 @@ export type IToolResultContent = | IToolResultFileEditContent | IContentRef; +// ─── Customization Types ───────────────────────────────────────────────────── + +/** + * A reference to an [Open Plugins](https://open-plugins.com/) plugin. + * + * This is intentionally thin — AHP specifies plugin identity and metadata + * but not implementation details, which are defined by the Open Plugins spec. + * + * @category Customization Types + */ +export interface ICustomizationRef { + /** Plugin URI (e.g. an HTTPS URL or marketplace identifier) */ + uri: URI; + /** Human-readable name */ + displayName: string; + /** Description of what the plugin provides */ + description?: string; + /** Icons for the plugin */ + icons?: Icon[]; +} + +/** + * Loading status for a server-managed customization. + * + * @category Customization Types + */ +export const enum CustomizationStatus { + /** Plugin is being loaded */ + Loading = 'loading', + /** Plugin is fully operational */ + Loaded = 'loaded', + /** Plugin partially loaded but has warnings */ + Degraded = 'degraded', + /** Plugin was unable to load */ + Error = 'error', +} + +/** + * A customization active in a session. + * + * Entries without a `clientId` are server-provided; entries with a `clientId` + * originate from that client. + * + * @category Customization Types + */ +export interface ISessionCustomization { + /** The plugin this customization refers to */ + customization: ICustomizationRef; + /** Whether this customization is currently enabled */ + enabled: boolean; + /** Server-reported loading status */ + status?: CustomizationStatus; + /** + * Human-readable status detail (e.g. error message or degradation warning). + */ + statusMessage?: string; +} + // ─── Common Types ──────────────────────────────────────────────────────────── /** diff --git a/src/vs/platform/agentHost/common/state/protocol/version/registry.ts b/src/vs/platform/agentHost/common/state/protocol/version/registry.ts index 5a5ce1a5380..14833d6e340 100644 --- a/src/vs/platform/agentHost/common/state/protocol/version/registry.ts +++ b/src/vs/platform/agentHost/common/state/protocol/version/registry.ts @@ -5,7 +5,7 @@ // allow-any-unicode-comment-file // DO NOT EDIT -- auto-generated by scripts/sync-agent-host-protocol.ts -// Synced from agent-host-protocol @ 69603e5 +// Synced from agent-host-protocol @ 32572fc import { ActionType, type IStateAction } from '../actions.js'; import { NotificationType, type IProtocolNotification } from '../notifications.js'; @@ -48,6 +48,11 @@ export const ACTION_INTRODUCED_IN: { readonly [K in IStateAction['type']]: numbe [ActionType.SessionServerToolsChanged]: 1, [ActionType.SessionActiveClientChanged]: 1, [ActionType.SessionActiveClientToolsChanged]: 1, + [ActionType.SessionPendingMessageSet]: 1, + [ActionType.SessionPendingMessageRemoved]: 1, + [ActionType.SessionQueuedMessagesReordered]: 1, + [ActionType.SessionCustomizationsChanged]: 1, + [ActionType.SessionCustomizationToggled]: 1, }; /** diff --git a/src/vs/platform/agentHost/common/state/sessionActions.ts b/src/vs/platform/agentHost/common/state/sessionActions.ts index d63ec1d7e5e..42f872196df 100644 --- a/src/vs/platform/agentHost/common/state/sessionActions.ts +++ b/src/vs/platform/agentHost/common/state/sessionActions.ts @@ -42,6 +42,9 @@ export { type ISessionServerToolsChangedAction, type ISessionActiveClientChangedAction, type ISessionActiveClientToolsChangedAction, + type ISessionPendingMessageSetAction, + type ISessionPendingMessageRemovedAction, + type ISessionQueuedMessagesReorderedAction, type IStateAction, } from './protocol/actions.js'; @@ -75,6 +78,9 @@ import type { ISessionTurnStartedAction, ISessionUsageAction, IStateAction, + ISessionPendingMessageSetAction, + ISessionPendingMessageRemovedAction, + ISessionQueuedMessagesReorderedAction, } from './protocol/actions.js'; import type { IProtocolNotification } from './protocol/notifications.js'; @@ -108,6 +114,10 @@ export type IUsageAction = ISessionUsageAction; export type IReasoningAction = ISessionReasoningAction; export type IModelChangedAction = ISessionModelChangedAction; +export type IPendingMessageSetAction = ISessionPendingMessageSetAction; +export type IPendingMessageRemovedAction = ISessionPendingMessageRemovedAction; +export type IQueuedMessagesReorderedAction = ISessionQueuedMessagesReorderedAction; + // Notifications export type INotification = IProtocolNotification; diff --git a/src/vs/platform/agentHost/common/state/sessionCapabilities.ts b/src/vs/platform/agentHost/common/state/sessionCapabilities.ts index b10b8ca4664..dad2824e77d 100644 --- a/src/vs/platform/agentHost/common/state/sessionCapabilities.ts +++ b/src/vs/platform/agentHost/common/state/sessionCapabilities.ts @@ -10,14 +10,8 @@ // versions/versionRegistry.ts. This file re-exports them and provides the // capability-object API that client code uses to gate features. -export { - ACTION_INTRODUCED_IN, - isActionKnownToVersion, - isNotificationKnownToVersion, - MIN_PROTOCOL_VERSION, - NOTIFICATION_INTRODUCED_IN, - PROTOCOL_VERSION, -} from './versions/versionRegistry.js'; +export const PROTOCOL_VERSION = 1; +export const MIN_PROTOCOL_VERSION = 1; /** * Capabilities derived from a protocol version. diff --git a/src/vs/platform/agentHost/common/state/sessionState.ts b/src/vs/platform/agentHost/common/state/sessionState.ts index 6915161eac9..37a1080211a 100644 --- a/src/vs/platform/agentHost/common/state/sessionState.ts +++ b/src/vs/platform/agentHost/common/state/sessionState.ts @@ -61,9 +61,11 @@ export { type ITurn, type IUsageInfo, type IUserMessage, + type IPendingMessage, type StringOrMarkdown, type URI, AttachmentType, + PendingMessageKind, PolicyState, ResponsePartKind, SessionLifecycle, diff --git a/src/vs/platform/agentHost/common/state/versions/versionRegistry.ts b/src/vs/platform/agentHost/common/state/versions/versionRegistry.ts deleted file mode 100644 index 0ea965c5542..00000000000 --- a/src/vs/platform/agentHost/common/state/versions/versionRegistry.ts +++ /dev/null @@ -1,71 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -// Version registry: re-exports protocol version constants and provides -// runtime action-filtering helpers using the local action union types. - -import type { INotification, IStateAction } from '../sessionActions.js'; - -// Re-export version constants from the protocol. -export { MIN_PROTOCOL_VERSION, PROTOCOL_VERSION } from '../protocol/version/registry.js'; - -// ---- Runtime action → version map ------------------------------------------- - -/** Maps every action type string to the protocol version that introduced it. */ -export const ACTION_INTRODUCED_IN: { readonly [K in IStateAction['type']]: number } = { - // Root actions (v1) - 'root/agentsChanged': 1, - 'root/activeSessionsChanged': 1, - // Session lifecycle (v1) - 'session/ready': 1, - 'session/creationFailed': 1, - // Turn lifecycle (v1) - 'session/turnStarted': 1, - 'session/delta': 1, - 'session/responsePart': 1, - // Tool calls (v1) - 'session/toolCallStart': 1, - 'session/toolCallDelta': 1, - 'session/toolCallReady': 1, - 'session/toolCallConfirmed': 1, - 'session/toolCallComplete': 1, - 'session/toolCallResultConfirmed': 1, - // Turn completion (v1) - 'session/turnComplete': 1, - 'session/turnCancelled': 1, - 'session/error': 1, - // Metadata & informational (v1) - 'session/titleChanged': 1, - 'session/usage': 1, - 'session/reasoning': 1, - 'session/modelChanged': 1, - // Server tools & active client (v1) - 'session/serverToolsChanged': 1, - 'session/activeClientChanged': 1, - 'session/activeClientToolsChanged': 1, -}; - -/** Maps every notification type string to the protocol version that introduced it. */ -export const NOTIFICATION_INTRODUCED_IN: { readonly [K in INotification['type']]: number } = { - 'notify/sessionAdded': 1, - 'notify/sessionRemoved': 1, - 'notify/authRequired': 1, -}; - -// ---- Runtime filtering helpers ---------------------------------------------- - -/** - * Returns `true` if the given action type is known to a client at `clientVersion`. - */ -export function isActionKnownToVersion(action: IStateAction, clientVersion: number): boolean { - return ACTION_INTRODUCED_IN[action.type] <= clientVersion; -} - -/** - * Returns `true` if the given notification type is known to a client at `clientVersion`. - */ -export function isNotificationKnownToVersion(notification: INotification, clientVersion: number): boolean { - return NOTIFICATION_INTRODUCED_IN[notification.type] <= clientVersion; -} diff --git a/src/vs/platform/agentHost/node/agentSideEffects.ts b/src/vs/platform/agentHost/node/agentSideEffects.ts index 5a01e7f4201..b7592c59f47 100644 --- a/src/vs/platform/agentHost/node/agentSideEffects.ts +++ b/src/vs/platform/agentHost/node/agentSideEffects.ts @@ -15,6 +15,7 @@ import { ISessionDataService } from '../common/sessionDataService.js'; import { ActionType, ISessionAction } from '../common/state/sessionActions.js'; import { AhpErrorCodes, AHP_PROVIDER_NOT_FOUND, AHP_SESSION_NOT_FOUND, ContentEncoding, IBrowseDirectoryResult, ICreateSessionParams, IDirectoryEntry, IFetchContentResult, JSON_RPC_INTERNAL_ERROR, ProtocolError } from '../common/state/sessionProtocol.js'; import { + PendingMessageKind, ResponsePartKind, SessionStatus, ToolCallConfirmationReason, @@ -133,6 +134,11 @@ export class AgentSideEffects extends Disposable implements IProtocolSideEffectH } } } + + // After a turn completes (idle event), try to consume the next queued message + if (e.type === 'idle') { + this._tryConsumeNextQueuedMessage(sessionKey); + } })); return disposables; } @@ -198,9 +204,108 @@ export class AgentSideEffects extends Disposable implements IProtocolSideEffectH }); break; } + case ActionType.SessionPendingMessageSet: + case ActionType.SessionPendingMessageRemoved: + case ActionType.SessionQueuedMessagesReordered: { + this._syncPendingMessages(action.session); + break; + } } } + /** + * Pushes the current pending message state from the session to the agent. + * The server controls queued message consumption; only steering messages + * are forwarded to the agent for mid-turn injection. + */ + private _syncPendingMessages(session: ProtocolURI): void { + const state = this._stateManager.getSessionState(session); + if (!state) { + return; + } + const agent = this._options.getAgent(session); + agent?.setPendingMessages?.( + URI.parse(session), + state.steeringMessage, + [], + ); + + // Steering messages are consumed immediately by the agent; + // remove from protocol state so clients see the consumption. + if (state.steeringMessage) { + this._stateManager.dispatchServerAction({ + type: ActionType.SessionPendingMessageRemoved, + session, + kind: PendingMessageKind.Steering, + id: state.steeringMessage.id, + }); + } + + // If the session is idle, try to consume the next queued message + this._tryConsumeNextQueuedMessage(session); + } + + /** + * Consumes the next queued message by dispatching a server-initiated + * `SessionTurnStarted` action with `queuedMessageId` set. The reducer + * atomically creates the active turn and removes the message from the + * queue. Only consumes one message at a time; subsequent messages are + * consumed when the next `idle` event fires. + */ + private _tryConsumeNextQueuedMessage(session: ProtocolURI): void { + // Bail if there's already an active turn + if (this._stateManager.getActiveTurnId(session)) { + return; + } + const state = this._stateManager.getSessionState(session); + if (!state?.queuedMessages?.length) { + return; + } + + const msg = state.queuedMessages[0]; + const turnId = generateUuid(); + + // Reset event mappers for the new turn (same as handleAction does for SessionTurnStarted) + for (const mapper of this._eventMappers.values()) { + mapper.reset(session); + } + + // Dispatch server-initiated turn start; the reducer removes the queued message atomically + this._stateManager.dispatchServerAction({ + type: ActionType.SessionTurnStarted, + session, + turnId, + userMessage: msg.userMessage, + queuedMessageId: msg.id, + }); + + // Send the message to the agent backend + const agent = this._options.getAgent(session); + if (!agent) { + this._stateManager.dispatchServerAction({ + type: ActionType.SessionError, + session, + turnId, + error: { errorType: 'noAgent', message: 'No agent found for session' }, + }); + return; + } + const attachments = msg.userMessage.attachments?.map((a): IAgentAttachment => ({ + type: a.type, + path: a.path, + displayName: a.displayName, + })); + agent.sendMessage(URI.parse(session), msg.userMessage.text, attachments).catch(err => { + this._logService.error('[AgentSideEffects] sendMessage failed (queued)', err); + this._stateManager.dispatchServerAction({ + type: ActionType.SessionError, + session, + turnId, + error: { errorType: 'sendFailed', message: String(err) }, + }); + }); + } + async handleCreateSession(command: ICreateSessionParams): Promise { const provider = command.provider; if (!provider) { diff --git a/src/vs/platform/agentHost/node/copilot/copilotAgent.ts b/src/vs/platform/agentHost/node/copilot/copilotAgent.ts index a4bfbf4a65f..31a4fad42e9 100644 --- a/src/vs/platform/agentHost/node/copilot/copilotAgent.ts +++ b/src/vs/platform/agentHost/node/copilot/copilotAgent.ts @@ -17,7 +17,7 @@ import { ILogService } from '../../../log/common/log.js'; import { localize } from '../../../../nls.js'; import { AgentSession, IAgent, IAgentAttachment, IAgentCreateSessionConfig, IAgentDescriptor, IAgentMessageEvent, IAgentModelInfo, IAgentProgressEvent, IAgentSessionMetadata, IAgentToolCompleteEvent, IAgentToolStartEvent } from '../../common/agentService.js'; import { ISessionDataService } from '../../common/sessionDataService.js'; -import { ToolResultContentType, type IToolResultContent, type PolicyState } from '../../common/state/sessionState.js'; +import { ToolResultContentType, type IPendingMessage, type IToolResultContent, type PolicyState } from '../../common/state/sessionState.js'; import { CopilotSessionWrapper } from './copilotSessionWrapper.js'; import { getEditFilePath, getInvocationMessage, getPastTenseMessage, getShellLanguage, getToolDisplayName, getToolInputString, getToolKind, isEditTool, isHiddenTool } from './copilotToolDisplay.js'; import { FileEditTracker } from './fileEditTracker.js'; @@ -238,6 +238,30 @@ export class CopilotAgent extends Disposable implements IAgent { this._logService.info(`[Copilot:${sessionId}] session.send() returned`); } + setPendingMessages(session: URI, steeringMessage: IPendingMessage | undefined, queuedMessages: readonly IPendingMessage[]): void { + const sessionId = AgentSession.id(session); + const entry = this._sessions.get(sessionId); + if (!entry) { + this._logService.warn(`[Copilot:${sessionId}] setPendingMessages: session not found`); + return; + } + + // Steering: send with mode 'immediate' so the SDK injects it mid-turn + if (steeringMessage) { + this._logService.info(`[Copilot:${sessionId}] Sending steering message: "${steeringMessage.userMessage.text.substring(0, 100)}"`); + entry.session.send({ + prompt: steeringMessage.userMessage.text, + mode: 'immediate', + }).catch(err => { + this._logService.error(`[Copilot:${sessionId}] Steering message failed`, err); + }); + } + + // Queued messages are consumed by the server (AgentSideEffects) + // which dispatches SessionTurnStarted and calls sendMessage directly. + // No SDK-level enqueue is needed. + } + async getSessionMessages(session: URI): Promise<(IAgentMessageEvent | IAgentToolStartEvent | IAgentToolCompleteEvent)[]> { const sessionId = AgentSession.id(session); const entry = this._sessions.get(sessionId) ?? await this._resumeSession(sessionId).catch(() => undefined); diff --git a/src/vs/platform/agentHost/node/protocolServerHandler.ts b/src/vs/platform/agentHost/node/protocolServerHandler.ts index 6e71a13fba7..50442444a36 100644 --- a/src/vs/platform/agentHost/node/protocolServerHandler.ts +++ b/src/vs/platform/agentHost/node/protocolServerHandler.ts @@ -9,7 +9,7 @@ import { ILogService } from '../../log/common/log.js'; import type { IAgentDescriptor, IAuthenticateParams, IAuthenticateResult, IResourceMetadata } from '../common/agentService.js'; import type { ICommandMap } from '../common/state/protocol/messages.js'; import { IActionEnvelope, INotification, isSessionAction, type ISessionAction } from '../common/state/sessionActions.js'; -import { isActionKnownToVersion, MIN_PROTOCOL_VERSION, PROTOCOL_VERSION } from '../common/state/sessionCapabilities.js'; +import { MIN_PROTOCOL_VERSION, PROTOCOL_VERSION } from '../common/state/sessionCapabilities.js'; import { AHP_SESSION_NOT_FOUND, AHP_UNSUPPORTED_PROTOCOL_VERSION, @@ -418,9 +418,6 @@ export class ProtocolServerHandler extends Disposable { private _isRelevantToClient(client: IConnectedClient, envelope: IActionEnvelope): boolean { const action = envelope.action; - if (!isActionKnownToVersion(action, client.protocolVersion)) { - return false; - } if (action.type.startsWith('root/')) { return client.subscriptions.has(ROOT_STATE_URI); } diff --git a/src/vs/platform/agentHost/test/node/agentSideEffects.test.ts b/src/vs/platform/agentHost/test/node/agentSideEffects.test.ts index 5822a802fff..3d36928963b 100644 --- a/src/vs/platform/agentHost/test/node/agentSideEffects.test.ts +++ b/src/vs/platform/agentHost/test/node/agentSideEffects.test.ts @@ -16,7 +16,7 @@ import { NullLogService } from '../../../log/common/log.js'; import { AgentSession, IAgent } from '../../common/agentService.js'; import { ISessionDataService } from '../../common/sessionDataService.js'; import { ActionType, IActionEnvelope, ISessionAction } from '../../common/state/sessionActions.js'; -import { ResponsePartKind, SessionLifecycle, SessionStatus, ToolCallConfirmationReason, ToolCallStatus, ToolResultContentType, TurnState, type IMarkdownResponsePart, type IToolCallCompletedState, type IToolCallResponsePart } from '../../common/state/sessionState.js'; +import { PendingMessageKind, ResponsePartKind, SessionLifecycle, SessionStatus, ToolCallConfirmationReason, ToolCallStatus, ToolResultContentType, TurnState, type IMarkdownResponsePart, type IToolCallCompletedState, type IToolCallResponsePart } from '../../common/state/sessionState.js'; import { AgentSideEffects } from '../../node/agentSideEffects.js'; import { SessionStateManager } from '../../node/sessionStateManager.js'; import { MockAgent } from './mockAgent.js'; @@ -522,4 +522,204 @@ suite('AgentSideEffects', () => { assert.deepStrictEqual(result, { authenticated: false }); }); }); + + // ---- Pending message sync ----------------------------------------------- + + suite('pending message sync', () => { + + test('syncs steering message to agent on SessionPendingMessageSet', () => { + setupSession(); + + const action = { + type: ActionType.SessionPendingMessageSet as const, + session: sessionUri.toString(), + kind: PendingMessageKind.Steering, + id: 'steer-1', + userMessage: { text: 'focus on tests' }, + }; + stateManager.dispatchClientAction(action, { clientId: 'test', clientSeq: 1 }); + sideEffects.handleAction(action); + + assert.strictEqual(agent.setPendingMessagesCalls.length, 1); + assert.deepStrictEqual(agent.setPendingMessagesCalls[0].steeringMessage, { id: 'steer-1', userMessage: { text: 'focus on tests' } }); + assert.deepStrictEqual(agent.setPendingMessagesCalls[0].queuedMessages, []); + }); + + test('syncs queued message to agent on SessionPendingMessageSet', () => { + setupSession(); + + const action = { + type: ActionType.SessionPendingMessageSet as const, + session: sessionUri.toString(), + kind: PendingMessageKind.Queued, + id: 'q-1', + userMessage: { text: 'queued message' }, + }; + stateManager.dispatchClientAction(action, { clientId: 'test', clientSeq: 1 }); + sideEffects.handleAction(action); + + // Queued messages are not forwarded to the agent; the server controls consumption + assert.strictEqual(agent.setPendingMessagesCalls.length, 1); + assert.strictEqual(agent.setPendingMessagesCalls[0].steeringMessage, undefined); + assert.deepStrictEqual(agent.setPendingMessagesCalls[0].queuedMessages, []); + + // Session was idle, so the queued message is consumed immediately + assert.strictEqual(agent.sendMessageCalls.length, 1); + assert.strictEqual(agent.sendMessageCalls[0].prompt, 'queued message'); + }); + + test('syncs on SessionPendingMessageRemoved', () => { + setupSession(); + + // Add a queued message + const setAction = { + type: ActionType.SessionPendingMessageSet as const, + session: sessionUri.toString(), + kind: PendingMessageKind.Queued, + id: 'q-rm', + userMessage: { text: 'will be removed' }, + }; + stateManager.dispatchClientAction(setAction, { clientId: 'test', clientSeq: 1 }); + sideEffects.handleAction(setAction); + + agent.setPendingMessagesCalls.length = 0; + + // Remove + const removeAction = { + type: ActionType.SessionPendingMessageRemoved as const, + session: sessionUri.toString(), + kind: PendingMessageKind.Queued, + id: 'q-rm', + }; + stateManager.dispatchClientAction(removeAction, { clientId: 'test', clientSeq: 2 }); + sideEffects.handleAction(removeAction); + + assert.strictEqual(agent.setPendingMessagesCalls.length, 1); + assert.deepStrictEqual(agent.setPendingMessagesCalls[0].queuedMessages, []); + }); + + test('syncs on SessionQueuedMessagesReordered', () => { + setupSession(); + + // Add two queued messages + const setA = { type: ActionType.SessionPendingMessageSet as const, session: sessionUri.toString(), kind: PendingMessageKind.Queued, id: 'q-a', userMessage: { text: 'A' } }; + stateManager.dispatchClientAction(setA, { clientId: 'test', clientSeq: 1 }); + sideEffects.handleAction(setA); + + const setB = { type: ActionType.SessionPendingMessageSet as const, session: sessionUri.toString(), kind: PendingMessageKind.Queued, id: 'q-b', userMessage: { text: 'B' } }; + stateManager.dispatchClientAction(setB, { clientId: 'test', clientSeq: 2 }); + sideEffects.handleAction(setB); + + agent.setPendingMessagesCalls.length = 0; + + // Reorder + const reorderAction = { type: ActionType.SessionQueuedMessagesReordered as const, session: sessionUri.toString(), order: ['q-b', 'q-a'] }; + stateManager.dispatchClientAction(reorderAction, { clientId: 'test', clientSeq: 3 }); + sideEffects.handleAction(reorderAction); + + assert.strictEqual(agent.setPendingMessagesCalls.length, 1); + assert.deepStrictEqual(agent.setPendingMessagesCalls[0].queuedMessages, []); + }); + }); + + // ---- Queued message consumption ----------------------------------------- + + suite('queued message consumption', () => { + + test('auto-starts turn from queued message on idle', () => { + setupSession(); + disposables.add(sideEffects.registerProgressListener(agent)); + + // Queue a message while a turn is active + startTurn('turn-1'); + const setAction = { + type: ActionType.SessionPendingMessageSet as const, + session: sessionUri.toString(), + kind: PendingMessageKind.Queued, + id: 'q-auto', + userMessage: { text: 'auto queued' }, + }; + stateManager.dispatchClientAction(setAction, { clientId: 'test', clientSeq: 1 }); + sideEffects.handleAction(setAction); + + // Message should NOT be consumed yet (turn is active) + assert.strictEqual(agent.sendMessageCalls.length, 0); + + const envelopes: IActionEnvelope[] = []; + disposables.add(stateManager.onDidEmitEnvelope(e => envelopes.push(e))); + + // Fire idle → turn completes → queued message should be consumed + agent.fireProgress({ session: sessionUri, type: 'idle' }); + + const turnComplete = envelopes.find(e => e.action.type === ActionType.SessionTurnComplete); + assert.ok(turnComplete, 'should dispatch session/turnComplete'); + + const turnStarted = envelopes.find(e => e.action.type === ActionType.SessionTurnStarted); + assert.ok(turnStarted, 'should dispatch session/turnStarted for queued message'); + assert.strictEqual((turnStarted!.action as { queuedMessageId?: string }).queuedMessageId, 'q-auto'); + + assert.strictEqual(agent.sendMessageCalls.length, 1); + assert.strictEqual(agent.sendMessageCalls[0].prompt, 'auto queued'); + + // Queued message should be removed from state + const state = stateManager.getSessionState(sessionUri.toString()); + assert.strictEqual(state?.queuedMessages, undefined); + }); + + test('does not consume queued message while a turn is active', () => { + setupSession(); + startTurn('turn-1'); + + const envelopes: IActionEnvelope[] = []; + disposables.add(stateManager.onDidEmitEnvelope(e => envelopes.push(e))); + + const setAction = { + type: ActionType.SessionPendingMessageSet as const, + session: sessionUri.toString(), + kind: PendingMessageKind.Queued, + id: 'q-wait', + userMessage: { text: 'should wait' }, + }; + stateManager.dispatchClientAction(setAction, { clientId: 'test', clientSeq: 1 }); + sideEffects.handleAction(setAction); + + // No turn started for the queued message + const turnStarted = envelopes.find(e => e.action.type === ActionType.SessionTurnStarted); + assert.strictEqual(turnStarted, undefined, 'should not start a turn while one is active'); + assert.strictEqual(agent.sendMessageCalls.length, 0); + + // Queued message still in state + const state = stateManager.getSessionState(sessionUri.toString()); + assert.strictEqual(state?.queuedMessages?.length, 1); + assert.strictEqual(state?.queuedMessages?.[0].id, 'q-wait'); + }); + + test('dispatches SessionPendingMessageRemoved for steering messages', () => { + setupSession(); + + const envelopes: IActionEnvelope[] = []; + disposables.add(stateManager.onDidEmitEnvelope(e => envelopes.push(e))); + + const action = { + type: ActionType.SessionPendingMessageSet as const, + session: sessionUri.toString(), + kind: PendingMessageKind.Steering, + id: 'steer-rm', + userMessage: { text: 'steer me' }, + }; + stateManager.dispatchClientAction(action, { clientId: 'test', clientSeq: 1 }); + sideEffects.handleAction(action); + + const removal = envelopes.find(e => + e.action.type === ActionType.SessionPendingMessageRemoved && + (e.action as { kind: PendingMessageKind }).kind === PendingMessageKind.Steering + ); + assert.ok(removal, 'should dispatch SessionPendingMessageRemoved for steering'); + assert.strictEqual((removal!.action as { id: string }).id, 'steer-rm'); + + // Steering message should be removed from state + const state = stateManager.getSessionState(sessionUri.toString()); + assert.strictEqual(state?.steeringMessage, undefined); + }); + }); }); diff --git a/src/vs/platform/agentHost/test/node/mockAgent.ts b/src/vs/platform/agentHost/test/node/mockAgent.ts index bd20573d73a..6f324ace25b 100644 --- a/src/vs/platform/agentHost/test/node/mockAgent.ts +++ b/src/vs/platform/agentHost/test/node/mockAgent.ts @@ -7,7 +7,7 @@ import { Emitter } from '../../../../base/common/event.js'; import { URI } from '../../../../base/common/uri.js'; import type { IAuthorizationProtectedResourceMetadata } from '../../../../base/common/oauth.js'; import { AgentSession, type AgentProvider, type IAgent, type IAgentAttachment, type IAgentCreateSessionConfig, type IAgentDescriptor, type IAgentMessageEvent, type IAgentModelInfo, type IAgentProgressEvent, type IAgentSessionMetadata, type IAgentToolCompleteEvent, type IAgentToolStartEvent } from '../../common/agentService.js'; -import { ToolResultContentType, type IToolCallResult } from '../../common/state/sessionState.js'; +import { ToolResultContentType, type IPendingMessage, type IToolCallResult } from '../../common/state/sessionState.js'; /** * General-purpose mock agent for unit tests. Tracks all method calls @@ -22,6 +22,7 @@ export class MockAgent implements IAgent { readonly sendMessageCalls: { session: URI; prompt: string }[] = []; + readonly setPendingMessagesCalls: { session: URI; steeringMessage: IPendingMessage | undefined; queuedMessages: readonly IPendingMessage[] }[] = []; readonly disposeSessionCalls: URI[] = []; readonly abortSessionCalls: URI[] = []; readonly respondToPermissionCalls: { requestId: string; approved: boolean }[] = []; @@ -66,6 +67,10 @@ export class MockAgent implements IAgent { this.sendMessageCalls.push({ session, prompt }); } + setPendingMessages(session: URI, steeringMessage: IPendingMessage | undefined, queuedMessages: readonly IPendingMessage[]): void { + this.setPendingMessagesCalls.push({ session, steeringMessage, queuedMessages }); + } + async getSessionMessages(_session: URI): Promise<(IAgentMessageEvent | IAgentToolStartEvent | IAgentToolCompleteEvent)[]> { return this.sessionMessages; } diff --git a/src/vs/workbench/contrib/chat/browser/agentSessions/agentHost/agentHostSessionHandler.ts b/src/vs/workbench/contrib/chat/browser/agentSessions/agentHost/agentHostSessionHandler.ts index 010944b09fd..f51a429171c 100644 --- a/src/vs/workbench/contrib/chat/browser/agentSessions/agentHost/agentHostSessionHandler.ts +++ b/src/vs/workbench/contrib/chat/browser/agentSessions/agentHost/agentHostSessionHandler.ts @@ -7,24 +7,24 @@ import { Throttler } from '../../../../../../base/common/async.js'; import { CancellationToken } from '../../../../../../base/common/cancellation.js'; import { Emitter } from '../../../../../../base/common/event.js'; import { MarkdownString } from '../../../../../../base/common/htmlContent.js'; -import { Disposable, DisposableStore, toDisposable } from '../../../../../../base/common/lifecycle.js'; +import { Disposable, DisposableMap, DisposableStore, MutableDisposable, toDisposable } from '../../../../../../base/common/lifecycle.js'; import { observableValue } from '../../../../../../base/common/observable.js'; import { URI } from '../../../../../../base/common/uri.js'; import { generateUuid } from '../../../../../../base/common/uuid.js'; import { localize } from '../../../../../../nls.js'; import { toAgentHostUri } from '../../../../../../platform/agentHost/common/agentHostUri.js'; import { AgentProvider, AgentSession, IAgentAttachment, type IAgentConnection } from '../../../../../../platform/agentHost/common/agentService.js'; -import { ActionType, isSessionAction } from '../../../../../../platform/agentHost/common/state/sessionActions.js'; +import { ActionType, isSessionAction, type ISessionAction } from '../../../../../../platform/agentHost/common/state/sessionActions.js'; import { SessionClientState } from '../../../../../../platform/agentHost/common/state/sessionClientState.js'; import { AHP_AUTH_REQUIRED, ProtocolError } from '../../../../../../platform/agentHost/common/state/sessionProtocol.js'; import { getToolKind, getToolLanguage } from '../../../../../../platform/agentHost/common/state/sessionReducers.js'; -import { AttachmentType, ResponsePartKind, ToolCallCancellationReason, ToolCallConfirmationReason, ToolCallStatus, TurnState, type IMessageAttachment } from '../../../../../../platform/agentHost/common/state/sessionState.js'; +import { AttachmentType, PendingMessageKind, ResponsePartKind, ToolCallCancellationReason, ToolCallConfirmationReason, ToolCallStatus, TurnState, type IMessageAttachment } from '../../../../../../platform/agentHost/common/state/sessionState.js'; import { ExtensionIdentifier } from '../../../../../../platform/extensions/common/extensions.js'; import { IInstantiationService } from '../../../../../../platform/instantiation/common/instantiation.js'; import { ILogService } from '../../../../../../platform/log/common/log.js'; import { IProductService } from '../../../../../../platform/product/common/productService.js'; import { IWorkspaceContextService } from '../../../../../../platform/workspace/common/workspace.js'; -import { IChatProgress, IChatService, IChatToolInvocation, ToolConfirmKind } from '../../../common/chatService/chatService.js'; +import { IChatProgress, IChatService, IChatToolInvocation, ToolConfirmKind, ChatRequestQueueKind } from '../../../common/chatService/chatService.js'; import { IChatSession, IChatSessionContentProvider, IChatSessionHistoryItem } from '../../../common/chatSessionsService.js'; import { ChatAgentLocation, ChatModeKind } from '../../../common/constants.js'; import { ChatToolInvocation } from '../../../common/model/chatProgressTypes/chatToolInvocation.js'; @@ -51,6 +51,9 @@ class AgentHostChatSession extends Disposable implements IChatSession { private readonly _onWillDispose = this._register(new Emitter()); readonly onWillDispose = this._onWillDispose.event; + private readonly _onDidStartServerRequest = this._register(new Emitter<{ prompt: string }>()); + readonly onDidStartServerRequest = this._onDidStartServerRequest.event; + readonly requestHandler: IChatSession['requestHandler']; readonly interruptActiveResponseCallback: IChatSession['interruptActiveResponseCallback']; @@ -77,6 +80,18 @@ class AgentHostChatSession extends Disposable implements IChatSession { return true; }; } + + /** + * Called by the session handler when a server-initiated turn starts. + * Resets the progress observable and signals listeners to create a new + * request+response pair in the chat model. + */ + startServerRequest(prompt: string): void { + this._logService.info('[AgentHost] Server-initiated request started'); + this.progressObs.set([], undefined); + this.isCompleteObs.set(false, undefined); + this._onDidStartServerRequest.fire({ prompt }); + } } // ============================================================================= @@ -115,6 +130,12 @@ export class AgentHostSessionHandler extends Disposable implements IChatSessionC private readonly _activeSessions = new Map(); /** Maps UI resource keys to resolved backend session URIs. */ private readonly _sessionToBackend = new Map(); + /** Per-session subscription to chat model pending request changes. */ + private readonly _pendingMessageSubscriptions = this._register(new DisposableMap()); + /** Per-session subscription watching for server-initiated turns. */ + private readonly _serverTurnWatchers = this._register(new DisposableMap()); + /** Turn IDs dispatched by this client, used to distinguish server-originated turns. */ + private readonly _clientDispatchedTurnIds = new Set(); private readonly _config: IAgentHostSessionHandlerConfig; /** Client state manager shared across all sessions for this handler. */ @@ -176,13 +197,20 @@ export class AgentHostSessionHandler extends Disposable implements IChatSessionC history, async (request: IChatAgentRequest, progress: (parts: IChatProgress[]) => void, token: CancellationToken) => { const backendSession = resolvedSession ?? await this._createAndSubscribe(sessionResource, request.userSelectedModelId); - resolvedSession = backendSession; - this._sessionToBackend.set(resourceKey, backendSession); + if (!resolvedSession) { + resolvedSession = backendSession; + this._sessionToBackend.set(resourceKey, backendSession); + } + // For existing sessions, set up pending message sync on the first turn + // (after the ChatModel becomes available in the ChatService). + this._ensurePendingMessageSubscription(resourceKey, sessionResource, backendSession); return this._handleTurn(backendSession, request, progress, token); }, () => { this._activeSessions.delete(resourceKey); this._sessionToBackend.delete(resourceKey); + this._pendingMessageSubscriptions.deleteAndDispose(resourceKey); + this._serverTurnWatchers.deleteAndDispose(resourceKey); if (resolvedSession) { this._clientState.unsubscribe(resolvedSession.toString()); this._config.connection.unsubscribe(resolvedSession); @@ -191,6 +219,13 @@ export class AgentHostSessionHandler extends Disposable implements IChatSessionC }, ); this._activeSessions.set(resourceKey, session); + + // For existing (non-untitled) sessions, start watching for server-initiated turns + // immediately. For untitled sessions, this is deferred to _createAndSubscribe. + if (resolvedSession) { + this._watchForServerInitiatedTurns(resolvedSession, sessionResource); + } + return session; } @@ -250,6 +285,304 @@ export class AgentHostSessionHandler extends Disposable implements IChatSessionC return {}; } + // ---- Pending message sync ----------------------------------------------- + + /** + * Diffs the chat model's pending requests against the protocol state in + * `_clientState` and dispatches Set/Removed/Reordered actions as needed. + */ + private _syncPendingMessages(sessionResource: URI, backendSession: URI): void { + const chatModel = this._chatService.getSession(sessionResource); + if (!chatModel) { + return; + } + const session = backendSession.toString(); + const pending = chatModel.getPendingRequests(); + const protocolState = this._clientState.getSessionState(session); + const prevSteering = protocolState?.steeringMessage; + const prevQueued = protocolState?.queuedMessages ?? []; + + // Compute current state from chat model + let currentSteering: { id: string; text: string } | undefined; + const currentQueued: { id: string; text: string }[] = []; + for (const p of pending) { + if (p.kind === ChatRequestQueueKind.Steering) { + currentSteering = { id: p.request.id, text: p.request.message.text }; + } else { + currentQueued.push({ id: p.request.id, text: p.request.message.text }); + } + } + + // --- Steering --- + if (currentSteering) { + if (currentSteering.id !== prevSteering?.id) { + this._dispatchAction({ + type: ActionType.SessionPendingMessageSet, + session, + kind: PendingMessageKind.Steering, + id: currentSteering.id, + userMessage: { text: currentSteering.text }, + }); + } + } else if (prevSteering) { + this._dispatchAction({ + type: ActionType.SessionPendingMessageRemoved, + session, + kind: PendingMessageKind.Steering, + id: prevSteering.id, + }); + } + + // --- Queued: removals --- + const currentQueuedIds = new Set(currentQueued.map(q => q.id)); + for (const prev of prevQueued) { + if (!currentQueuedIds.has(prev.id)) { + this._dispatchAction({ + type: ActionType.SessionPendingMessageRemoved, + session, + kind: PendingMessageKind.Queued, + id: prev.id, + }); + } + } + + // --- Queued: additions --- + const prevQueuedIds = new Set(prevQueued.map(q => q.id)); + for (const q of currentQueued) { + if (!prevQueuedIds.has(q.id)) { + this._dispatchAction({ + type: ActionType.SessionPendingMessageSet, + session, + kind: PendingMessageKind.Queued, + id: q.id, + userMessage: { text: q.text }, + }); + } + } + + // --- Queued: reordering --- + // After additions/removals, check if the remaining common items changed order. + // Re-read protocol state since dispatches above may have mutated it. + const updatedProtocol = this._clientState.getSessionState(session); + const updatedQueued = updatedProtocol?.queuedMessages ?? []; + if (updatedQueued.length > 1 && currentQueued.length === updatedQueued.length) { + const needsReorder = currentQueued.some((q, i) => q.id !== updatedQueued[i].id); + if (needsReorder) { + this._dispatchAction({ + type: ActionType.SessionQueuedMessagesReordered, + session, + order: currentQueued.map(q => q.id), + }); + } + } + } + + private _dispatchAction(action: ISessionAction): void { + const seq = this._clientState.applyOptimistic(action); + this._config.connection.dispatchAction(action, this._clientState.clientId, seq); + } + + // ---- Server-initiated turn detection ------------------------------------ + + /** + * Sets up a persistent listener on the session's protocol state that + * detects server-initiated turns (e.g. auto-consumed queued messages). + * When a new `activeTurn` appears whose `turnId` was NOT dispatched by + * this client, it signals the {@link AgentHostChatSession} to create a + * new request in the chat model, removes the consumed pending request + * if applicable, and pipes turn progress through `progressObs`. + */ + private _watchForServerInitiatedTurns(backendSession: URI, sessionResource: URI): void { + const resourceKey = sessionResource.path.substring(1); + const sessionStr = backendSession.toString(); + let lastSeenTurnId: string | undefined; + let previousQueuedIds: Set | undefined; + + const disposables = new DisposableStore(); + + // MutableDisposable for per-turn progress tracking (replaced each turn) + const turnProgressDisposable = new MutableDisposable(); + disposables.add(turnProgressDisposable); + + disposables.add(this._clientState.onDidChangeSessionState(e => { + if (e.session !== sessionStr) { + return; + } + + // Track queued message IDs so we can detect which one was consumed + const currentQueuedIds = new Set((e.state.queuedMessages ?? []).map(m => m.id)); + + const activeTurn = e.state.activeTurn; + if (!activeTurn || activeTurn.id === lastSeenTurnId) { + previousQueuedIds = currentQueuedIds; + return; + } + lastSeenTurnId = activeTurn.id; + + // If we dispatched this turn, the existing _handleTurn flow handles it + if (this._clientDispatchedTurnIds.has(activeTurn.id)) { + previousQueuedIds = currentQueuedIds; + return; + } + + const chatSession = this._activeSessions.get(resourceKey); + if (!chatSession) { + previousQueuedIds = currentQueuedIds; + return; + } + + this._logService.info(`[AgentHost] Server-initiated turn detected: ${activeTurn.id}`); + + // Determine which queued message was consumed by diffing queue state + if (previousQueuedIds) { + for (const prevId of previousQueuedIds) { + if (!currentQueuedIds.has(prevId)) { + this._chatService.removePendingRequest(sessionResource, prevId); + } + } + } + previousQueuedIds = currentQueuedIds; + + // Signal the session to create a new request+response pair + chatSession.startServerRequest(activeTurn.userMessage.text); + + // Set up turn progress tracking — reuse the same state-to-progress + // translation as _handleTurn, but pipe output to progressObs/isCompleteObs + const turnStore = new DisposableStore(); + turnProgressDisposable.value = turnStore; + this._trackServerTurnProgress(backendSession, activeTurn.id, chatSession, sessionResource, turnStore); + })); + + this._serverTurnWatchers.set(resourceKey, disposables); + } + + /** + * Tracks protocol state changes for a specific server-initiated turn and + * pushes `IChatProgress[]` items into the session's `progressObs`. + * When the turn finishes, sets `isCompleteObs` to true. + */ + private _trackServerTurnProgress( + backendSession: URI, + turnId: string, + chatSession: AgentHostChatSession, + sessionResource: URI, + turnDisposables: DisposableStore, + ): void { + const sessionStr = backendSession.toString(); + const activeToolInvocations = new Map(); + const lastEmittedLengths = new Map(); + const throttler = new Throttler(); + turnDisposables.add(throttler); + + const progress = (parts: IChatProgress[]) => { + const current = chatSession.progressObs.get(); + chatSession.progressObs.set([...current, ...parts], undefined); + }; + + let finished = false; + const finish = () => throttler.queue(async () => { + if (finished) { + return; + } + finished = true; + for (const [, invocation] of activeToolInvocations) { + invocation.didExecuteTool(undefined); + } + activeToolInvocations.clear(); + chatSession.isCompleteObs.set(true, undefined); + }); + + turnDisposables.add(this._clientState.onDidChangeSessionState(e => { + throttler.queue(async () => { + if (e.session !== sessionStr) { + return; + } + + const activeTurn = e.state.activeTurn; + const isActive = activeTurn?.id === turnId; + const responseParts = isActive + ? activeTurn.responseParts + : e.state.turns.find(t => t.id === turnId)?.responseParts; + + if (responseParts) { + for (const rp of responseParts) { + switch (rp.kind) { + case ResponsePartKind.Markdown: { + const lastLen = lastEmittedLengths.get(rp.id) ?? 0; + if (rp.content.length > lastLen) { + const delta = rp.content.substring(lastLen); + lastEmittedLengths.set(rp.id, rp.content.length); + progress([{ kind: 'markdownContent', content: new MarkdownString(delta, { supportHtml: true }) }]); + } + break; + } + case ResponsePartKind.Reasoning: { + const lastLen = lastEmittedLengths.get(rp.id) ?? 0; + if (rp.content.length > lastLen) { + const delta = rp.content.substring(lastLen); + lastEmittedLengths.set(rp.id, rp.content.length); + progress([{ kind: 'thinking', value: delta }]); + } + break; + } + case ResponsePartKind.ToolCall: { + const tc = rp.toolCall; + const toolCallId = tc.toolCallId; + let existing = activeToolInvocations.get(toolCallId); + + if (!existing) { + existing = toolCallStateToInvocation(tc); + activeToolInvocations.set(toolCallId, existing); + progress([existing]); + + if (tc.status === ToolCallStatus.PendingConfirmation) { + this._awaitToolConfirmation(existing, toolCallId, backendSession, turnId, CancellationToken.None); + } + } else if (tc.status === ToolCallStatus.PendingConfirmation) { + existing.didExecuteTool(undefined); + const confirmInvocation = toolCallStateToInvocation(tc); + activeToolInvocations.set(toolCallId, confirmInvocation); + progress([confirmInvocation]); + this._awaitToolConfirmation(confirmInvocation, toolCallId, backendSession, turnId, CancellationToken.None); + } else if (tc.status === ToolCallStatus.Running) { + existing.invocationMessage = typeof tc.invocationMessage === 'string' + ? tc.invocationMessage + : new MarkdownString(tc.invocationMessage.markdown); + if (getToolKind(tc) === 'terminal' && tc.toolInput) { + existing.toolSpecificData = { + kind: 'terminal', + commandLine: { original: tc.toolInput }, + language: getToolLanguage(tc) ?? 'shellscript', + }; + } + } + + if (existing && (tc.status === ToolCallStatus.Completed || tc.status === ToolCallStatus.Cancelled) && !IChatToolInvocation.isComplete(existing)) { + activeToolInvocations.delete(toolCallId); + const fileEdits = finalizeToolInvocation(existing, tc); + if (fileEdits.length > 0) { + // File edits from server-initiated turns are not routed through + // the editing session here; the request is not yet available + // in the ChatModel at this point. + } + } + break; + } + } + } + } + + if (!isActive && !finished) { + const lastTurn = e.state.turns.find(t => t.id === turnId); + if (lastTurn?.state === TurnState.Error && lastTurn.error) { + progress([{ kind: 'markdownContent', content: new MarkdownString(`\n\nError: (${lastTurn.error.errorType}) ${lastTurn.error.message}`) }]); + } + finish(); + } + }); + })); + } + // ---- Turn handling (state-driven) --------------------------------------- private async _handleTurn( @@ -263,6 +596,8 @@ export class AgentHostSessionHandler extends Disposable implements IChatSessionC } const turnId = generateUuid(); + this._clientDispatchedTurnIds.add(turnId); + const cleanUpTurnId = () => this._clientDispatchedTurnIds.delete(turnId); const attachments = this._convertVariablesToAttachments(request); const messageAttachments: IMessageAttachment[] = attachments.map(a => ({ type: a.type, @@ -323,6 +658,7 @@ export class AgentHostSessionHandler extends Disposable implements IChatSessionC return; } finished = true; + cleanUpTurnId(); // Finalize any outstanding tool invocations for (const [, invocation] of activeToolInvocations) { invocation.didExecuteTool(undefined); @@ -408,7 +744,6 @@ export class AgentHostSessionHandler extends Disposable implements IChatSessionC // Finalize terminal-state tools (whether just created or pre-existing) if (existing && (tc.status === ToolCallStatus.Completed || tc.status === ToolCallStatus.Cancelled) && !IChatToolInvocation.isComplete(existing)) { - activeToolInvocations.delete(toolCallId); const fileEdits = finalizeToolInvocation(existing, tc); if (fileEdits.length > 0) { await this._applyFileEdits(request.sessionResource, request, fileEdits, progress); @@ -590,9 +925,31 @@ export class AgentHostSessionHandler extends Disposable implements IChatSessionC this._logService.error(`[AgentHost] Failed to subscribe to new session: ${session.toString()}`, err); } + // Start syncing the chat model's pending requests to the protocol + this._ensurePendingMessageSubscription(resourceKey, sessionResource, session); + + // Start watching for server-initiated turns on this session + this._watchForServerInitiatedTurns(session, sessionResource); + return session; } + /** + * Ensures that the chat model's pending request changes are synced to the + * protocol for a given session. No-ops if already subscribed. + */ + private _ensurePendingMessageSubscription(resourceKey: string, sessionResource: URI, backendSession: URI): void { + if (this._pendingMessageSubscriptions.has(resourceKey)) { + return; + } + const chatModel = this._chatService?.getSession(sessionResource); + if (chatModel) { + this._pendingMessageSubscriptions.set(resourceKey, chatModel.onDidChangePendingRequests(() => { + this._syncPendingMessages(sessionResource, backendSession); + })); + } + } + /** * Check if an error is an "authentication required" error. * Checks for the AHP_AUTH_REQUIRED error code when available, diff --git a/src/vs/workbench/contrib/chat/common/chatService/chatServiceImpl.ts b/src/vs/workbench/contrib/chat/common/chatService/chatServiceImpl.ts index fadfe7a3745..1a1dd8bd7e5 100644 --- a/src/vs/workbench/contrib/chat/common/chatService/chatServiceImpl.ts +++ b/src/vs/workbench/contrib/chat/common/chatService/chatServiceImpl.ts @@ -732,12 +732,14 @@ export class ChatService extends Disposable implements IChatService { lastRequest?.response?.complete(); } - if (providedSession.progressObs && lastRequest && providedSession.interruptActiveResponseCallback) { - const initialCancellationRequest = this.instantiationService.createInstance(CancellableRequest, new CancellationTokenSource(), undefined, undefined, undefined); - this._pendingRequests.set(model.sessionResource, initialCancellationRequest); - this.telemetryService.publicLog2(ChatPendingRequestChangeEventName, { action: 'add', source: 'remoteSession', chatSessionId: chatSessionResourceToId(model.sessionResource) }); - const cancellationListener = disposables.add(new MutableDisposable()); + // Set up progress streaming and cancellation for contributed sessions. + // This handles both the initial in-flight response (from session load) + // and any subsequent server-initiated turns (e.g. consumed queued messages). + const hasProgressStreaming = providedSession.progressObs && providedSession.interruptActiveResponseCallback; + if (hasProgressStreaming) { + let lastProgressLength = 0; + const cancellationListener = disposables.add(new MutableDisposable()); const createCancellationListener = (token: CancellationToken) => { return token.onCancellationRequested(() => { providedSession.interruptActiveResponseCallback?.().then(userConfirmedInterruption => { @@ -752,15 +754,58 @@ export class ChatService extends Disposable implements IChatService { }); }; - cancellationListener.value = createCancellationListener(initialCancellationRequest.cancellationTokenSource.token); + const ensureCancellationTracking = () => { + if (!this._pendingRequests.has(model.sessionResource)) { + const cts = this.instantiationService.createInstance(CancellableRequest, new CancellationTokenSource(), undefined, undefined, undefined); + this._pendingRequests.set(model.sessionResource, cts); + this.telemetryService.publicLog2(ChatPendingRequestChangeEventName, { action: 'add', source: 'remoteSession', chatSessionId: chatSessionResourceToId(model.sessionResource) }); + cancellationListener.value = createCancellationListener(cts.cancellationTokenSource.token); + } + }; - let lastProgressLength = 0; + if (lastRequest) { + const initialCancellationRequest = this.instantiationService.createInstance(CancellableRequest, new CancellationTokenSource(), undefined, undefined, undefined); + this._pendingRequests.set(model.sessionResource, initialCancellationRequest); + this.telemetryService.publicLog2(ChatPendingRequestChangeEventName, { action: 'add', source: 'remoteSession', chatSessionId: chatSessionResourceToId(model.sessionResource) }); + cancellationListener.value = createCancellationListener(initialCancellationRequest.cancellationTokenSource.token); + } + + // Handle server-initiated requests (e.g. consumed queued messages). + if (providedSession.onDidStartServerRequest) { + disposables.add(providedSession.onDidStartServerRequest(({ prompt }) => { + // Complete any in-flight request + if (lastRequest?.response && !lastRequest.response.isComplete) { + lastRequest.response.complete(); + } + + // Create a new request in the model + const requestText = prompt; + const parsedRequest: IParsedChatRequest = { + text: requestText, + parts: [new ChatRequestTextPart( + new OffsetRange(0, requestText.length), + { startLineNumber: 1, startColumn: 1, endLineNumber: 1, endColumn: requestText.length + 1 }, + requestText + )] + }; + const agent = this.chatAgentService.getAgent(chatSessionType); + lastRequest = model.addRequest(parsedRequest, { variables: [] }, 0, undefined, agent); + + // Reset progress tracking for the new turn + lastProgressLength = 0; + + // Ensure cancellation tracking is active + ensureCancellationTracking(); + })); + } + + // Single autorun that streams progress for whichever request is current. disposables.add(autorun(reader => { const progressArray = providedSession.progressObs?.read(reader) ?? []; const isComplete = providedSession.isCompleteObs?.read(reader) ?? false; // Process only new progress items - if (progressArray.length > lastProgressLength) { + if (lastRequest && progressArray.length > lastProgressLength) { const newProgress = progressArray.slice(lastProgressLength); for (const progress of newProgress) { model?.acceptResponseProgress(lastRequest, progress); @@ -769,7 +814,7 @@ export class ChatService extends Disposable implements IChatService { } // Handle completion - if (isComplete) { + if (isComplete && lastRequest) { lastRequest.response?.complete(); cancellationListener.clear(); } @@ -1397,12 +1442,27 @@ export class ChatService extends Disposable implements IChatService { } } + /** + * Returns true if the session is backed by an agent host server, which + * controls queued-message dequeuing on the server side. + */ + private _isServerManagedQueue(sessionResource: URI): boolean { + return sessionResource.scheme.startsWith('agent-host-'); + } + /** * Process the next pending request from the model's queue, if any. * Called after a request completes to continue processing queued requests. * Multiple consecutive steering requests are combined into a single request. */ private processNextPendingRequest(model: ChatModel): void { + // Agent host sessions delegate queue management to the server. + // The server dispatches SessionTurnStarted with queuedMessageId when + // it consumes a queued message, so the client should not dequeue eagerly. + if (this._isServerManagedQueue(model.sessionResource)) { + return; + } + // Dequeue all consecutive steering requests and combine them into one const steeringRequests = model.dequeueAllSteeringRequests(); diff --git a/src/vs/workbench/contrib/chat/common/chatSessionsService.ts b/src/vs/workbench/contrib/chat/common/chatSessionsService.ts index 16bab792915..75d912c2289 100644 --- a/src/vs/workbench/contrib/chat/common/chatSessionsService.ts +++ b/src/vs/workbench/contrib/chat/common/chatSessionsService.ts @@ -186,6 +186,13 @@ export interface IChatSession extends IDisposable { readonly isCompleteObs?: IObservable; readonly interruptActiveResponseCallback?: () => Promise; + /** + * Event fired when the server initiates a new request (e.g. from a consumed + * queued message). The consumer should create a new request+response pair in + * the model and prepare to receive progress via {@link progressObs}. + */ + readonly onDidStartServerRequest?: Event<{ prompt: string }>; + /** * Editing session transferred from a previously-untitled chat session in `onDidCommitChatSessionItem`. */ diff --git a/src/vs/workbench/contrib/chat/test/browser/agentSessions/agentHostChatContribution.test.ts b/src/vs/workbench/contrib/chat/test/browser/agentSessions/agentHostChatContribution.test.ts index de7987cf99e..ed53e2ac21a 100644 --- a/src/vs/workbench/contrib/chat/test/browser/agentSessions/agentHostChatContribution.test.ts +++ b/src/vs/workbench/contrib/chat/test/browser/agentSessions/agentHostChatContribution.test.ts @@ -1515,4 +1515,137 @@ suite('AgentHostChatContribution', () => { assert.strictEqual((agentHostService.dispatchedActions[0].action as ITurnStartedAction).userMessage.text, 'Test message'); }); }); + + // ---- Server-initiated turns ------------------------------------------- + + suite('server-initiated turns', () => { + + test('detects server-initiated turn and fires onDidStartServerRequest', async () => { + const { sessionHandler, agentHostService } = createContribution(disposables); + + // Create and subscribe a session + const sessionResource = URI.from({ scheme: 'agent-host-copilot', path: '/untitled-server-turn' }); + const chatSession = await sessionHandler.provideChatSessionContent(sessionResource, CancellationToken.None); + disposables.add(toDisposable(() => chatSession.dispose())); + + // First, do a normal turn so the backend session is created + const turn1Promise = chatSession.requestHandler!( + makeRequest({ message: 'Hello', sessionResource }), + () => { }, [], CancellationToken.None, + ); + await timeout(10); + const dispatch1 = agentHostService.dispatchedActions[0]; + const action1 = dispatch1.action as ITurnStartedAction; + const session = action1.session; + // Echo + complete the first turn + agentHostService.fireAction({ action: dispatch1.action, serverSeq: 1, origin: { clientId: agentHostService.clientId, clientSeq: dispatch1.clientSeq } }); + agentHostService.fireAction({ action: { type: 'session/turnComplete', session, turnId: action1.turnId } as ISessionAction, serverSeq: 2, origin: undefined }); + await turn1Promise; + + // Now simulate a server-initiated turn (e.g. from a consumed queued message) + const serverTurnId = 'server-turn-1'; + const serverRequestEvents: { prompt: string }[] = []; + disposables.add(chatSession.onDidStartServerRequest!(e => serverRequestEvents.push(e))); + + agentHostService.fireAction({ + action: { + type: 'session/turnStarted', + session, + turnId: serverTurnId, + userMessage: { text: 'queued message text' }, + } as ISessionAction, + serverSeq: 3, + origin: undefined, // Server-originated — no client origin + }); + + await timeout(10); + + // onDidStartServerRequest should have fired + assert.strictEqual(serverRequestEvents.length, 1); + assert.strictEqual(serverRequestEvents[0].prompt, 'queued message text'); + + // isCompleteObs should be false (turn in progress) + assert.strictEqual(chatSession.isCompleteObs!.get(), false); + }); + + test('server-initiated turn streams progress through progressObs', async () => { + const { sessionHandler, agentHostService } = createContribution(disposables); + + const sessionResource = URI.from({ scheme: 'agent-host-copilot', path: '/untitled-server-progress' }); + const chatSession = await sessionHandler.provideChatSessionContent(sessionResource, CancellationToken.None); + disposables.add(toDisposable(() => chatSession.dispose())); + + // Normal turn to create backend session + const turn1Promise = chatSession.requestHandler!( + makeRequest({ message: 'Init', sessionResource }), + () => { }, [], CancellationToken.None, + ); + await timeout(10); + const dispatch1 = agentHostService.dispatchedActions[0]; + const action1 = dispatch1.action as ITurnStartedAction; + const session = action1.session; + agentHostService.fireAction({ action: dispatch1.action, serverSeq: 1, origin: { clientId: agentHostService.clientId, clientSeq: dispatch1.clientSeq } }); + agentHostService.fireAction({ action: { type: 'session/turnComplete', session, turnId: action1.turnId } as ISessionAction, serverSeq: 2, origin: undefined }); + await turn1Promise; + + // Server-initiated turn + const serverTurnId = 'server-turn-progress'; + agentHostService.fireAction({ + action: { type: 'session/turnStarted', session, turnId: serverTurnId, userMessage: { text: 'auto queued' } } as ISessionAction, + serverSeq: 3, origin: undefined, + }); + await timeout(10); + + // Stream a response part + delta + agentHostService.fireAction({ + action: { type: 'session/responsePart', session, turnId: serverTurnId, part: { kind: 'markdown', id: 'md-srv', content: 'Hello ' } } as ISessionAction, + serverSeq: 4, origin: undefined, + }); + agentHostService.fireAction({ + action: { type: 'session/delta', session, turnId: serverTurnId, partId: 'md-srv', content: 'world' } as ISessionAction, + serverSeq: 5, origin: undefined, + }); + await timeout(50); + + // Progress should be in progressObs + const progress = chatSession.progressObs!.get(); + const markdownParts = progress.filter((p): p is IChatMarkdownContent => p.kind === 'markdownContent'); + const totalContent = markdownParts.map(p => p.content.value).join(''); + assert.strictEqual(totalContent, 'Hello world'); + + // Complete the turn + agentHostService.fireAction({ + action: { type: 'session/turnComplete', session, turnId: serverTurnId } as ISessionAction, + serverSeq: 6, origin: undefined, + }); + await timeout(10); + + assert.strictEqual(chatSession.isCompleteObs!.get(), true); + }); + + test('client-dispatched turns are not treated as server-initiated', async () => { + const { sessionHandler, agentHostService } = createContribution(disposables); + + const sessionResource = URI.from({ scheme: 'agent-host-copilot', path: '/untitled-no-dupe' }); + const chatSession = await sessionHandler.provideChatSessionContent(sessionResource, CancellationToken.None); + disposables.add(toDisposable(() => chatSession.dispose())); + + const serverRequestEvents: { prompt: string }[] = []; + disposables.add(chatSession.onDidStartServerRequest!(e => serverRequestEvents.push(e))); + + // Normal client turn — should NOT fire onDidStartServerRequest + const turnPromise = chatSession.requestHandler!( + makeRequest({ message: 'Client turn', sessionResource }), + () => { }, [], CancellationToken.None, + ); + await timeout(10); + const dispatch = agentHostService.dispatchedActions[0]; + const action = dispatch.action as ITurnStartedAction; + agentHostService.fireAction({ action: dispatch.action, serverSeq: 1, origin: { clientId: agentHostService.clientId, clientSeq: dispatch.clientSeq } }); + agentHostService.fireAction({ action: { type: 'session/turnComplete', session: action.session, turnId: action.turnId } as ISessionAction, serverSeq: 2, origin: undefined }); + await turnPromise; + + assert.strictEqual(serverRequestEvents.length, 0, 'Client-dispatched turns should not trigger onDidStartServerRequest'); + }); + }); });