Wait for chat cancellation to complete before proceeding (#300001)

* Wait for chat cancellation to complete before proceeding
This is an inherent race condition in the architecture, working on changing it, in the meantime, work around this...

* Fix test
This commit is contained in:
Rob Lourens
2026-03-07 12:44:53 -08:00
committed by GitHub
parent 5734afd12b
commit 010781f467
11 changed files with 99 additions and 19 deletions

View File

@@ -896,7 +896,7 @@ class SendToNewChatAction extends Action2 {
// Cancel any in-progress request before clearing
if (widget.viewModel) {
chatService.cancelCurrentRequestForSession(widget.viewModel.sessionResource, 'newSessionAction');
await chatService.cancelCurrentRequestForSession(widget.viewModel.sessionResource, 'newSessionAction');
}
if (widget.viewModel?.model) {
@@ -956,7 +956,7 @@ export class CancelAction extends Action2 {
});
}
run(accessor: ServicesAccessor, ...args: unknown[]) {
async run(accessor: ServicesAccessor, ...args: unknown[]) {
const context = args[0] as IChatExecuteActionContext | undefined;
const widgetService = accessor.get(IChatWidgetService);
const logService = accessor.get(ILogService);
@@ -975,7 +975,7 @@ export class CancelAction extends Action2 {
const chatService = accessor.get(IChatService);
if (widget.viewModel) {
chatService.cancelCurrentRequestForSession(widget.viewModel.sessionResource, 'cancelAction');
await chatService.cancelCurrentRequestForSession(widget.viewModel.sessionResource, 'cancelAction');
} else {
telemetryService.publicLog2<ChatStopCancellationNoopEvent, ChatStopCancellationNoopClassification>(ChatStopCancellationNoopEventName, {
source: 'cancelAction',

View File

@@ -219,7 +219,7 @@ export class ChatSendPendingImmediatelyAction extends Action2 {
});
}
override run(accessor: ServicesAccessor, ...args: unknown[]): void {
override async run(accessor: ServicesAccessor, ...args: unknown[]): Promise<void> {
const chatService = accessor.get(IChatService);
const widgetService = accessor.get(IChatWidgetService);
const [context] = args;
@@ -250,7 +250,7 @@ export class ChatSendPendingImmediatelyAction extends Action2 {
];
chatService.setPendingRequests(context.sessionResource, reordered);
chatService.cancelCurrentRequestForSession(context.sessionResource, 'queueRunNext');
await chatService.cancelCurrentRequestForSession(context.sessionResource, 'queueRunNext');
chatService.processPendingRequests(context.sessionResource);
}
}

View File

@@ -2246,7 +2246,7 @@ export class ChatWidget extends Disposable implements IChatWidget {
this.chatService.removePendingRequest(this.viewModel.sessionResource, editingRequestId);
options.queue ??= editingPendingRequest;
} else {
this.chatService.cancelCurrentRequestForSession(this.viewModel.sessionResource, 'acceptInput-editing');
await this.chatService.cancelCurrentRequestForSession(this.viewModel.sessionResource, 'acceptInput-editing');
options.queue = undefined;
}
@@ -2266,7 +2266,7 @@ export class ChatWidget extends Disposable implements IChatWidget {
options.queue ??= ChatRequestQueueKind.Queued;
}
if (model.requestNeedsInput.get() && !model.getPendingRequests().length) {
this.chatService.cancelCurrentRequestForSession(this.viewModel.sessionResource, 'acceptInput-needsInput');
await this.chatService.cancelCurrentRequestForSession(this.viewModel.sessionResource, 'acceptInput-needsInput');
options.queue ??= ChatRequestQueueKind.Queued;
}
if (requestInProgress) {

View File

@@ -1410,7 +1410,7 @@ export interface IChatService {
resendRequest(request: IChatRequestModel, options?: IChatSendRequestOptions): Promise<void>;
adoptRequest(sessionResource: URI, request: IChatRequestModel): Promise<void>;
removeRequest(sessionResource: URI, requestId: string): Promise<void>;
cancelCurrentRequestForSession(sessionResource: URI, source?: string): void;
cancelCurrentRequestForSession(sessionResource: URI, source?: string): Promise<void>;
/**
* Sets yieldRequested on the active request for the given session.
*/

View File

@@ -3,7 +3,7 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
import { DeferredPromise } from '../../../../../base/common/async.js';
import { DeferredPromise, raceTimeout } from '../../../../../base/common/async.js';
import { CancellationToken, CancellationTokenSource } from '../../../../../base/common/cancellation.js';
import { toErrorMessage } from '../../../../../base/common/errorMessage.js';
import { BugIndicatingError, ErrorNoTelemetry } from '../../../../../base/common/errors.js';
@@ -67,6 +67,7 @@ class CancellableRequest implements IDisposable {
constructor(
public readonly cancellationTokenSource: CancellationTokenSource,
public requestId: string | undefined,
public readonly responseCompletePromise: Promise<void> | undefined,
@ILanguageModelToolsService private readonly toolsService: ILanguageModelToolsService
) { }
@@ -679,7 +680,7 @@ export class ChatService extends Disposable implements IChatService {
}
if (providedSession.progressObs && lastRequest && providedSession.interruptActiveResponseCallback) {
const initialCancellationRequest = this.instantiationService.createInstance(CancellableRequest, new CancellationTokenSource(), undefined);
const initialCancellationRequest = this.instantiationService.createInstance(CancellableRequest, new CancellationTokenSource(), undefined, undefined);
this._pendingRequests.set(model.sessionResource, initialCancellationRequest);
this.telemetryService.publicLog2<ChatPendingRequestChangeEvent, ChatPendingRequestChangeClassification>(ChatPendingRequestChangeEventName, { action: 'add', source: 'remoteSession', chatSessionId: chatSessionResourceToId(model.sessionResource) });
const cancellationListener = disposables.add(new MutableDisposable());
@@ -689,7 +690,7 @@ export class ChatService extends Disposable implements IChatService {
providedSession.interruptActiveResponseCallback?.().then(userConfirmedInterruption => {
if (!userConfirmedInterruption) {
// User cancelled the interruption
const newCancellationRequest = this.instantiationService.createInstance(CancellableRequest, new CancellationTokenSource(), undefined);
const newCancellationRequest = this.instantiationService.createInstance(CancellableRequest, new CancellationTokenSource(), undefined, undefined);
this._pendingRequests.set(model.sessionResource, newCancellationRequest);
this.telemetryService.publicLog2<ChatPendingRequestChangeEvent, ChatPendingRequestChangeClassification>(ChatPendingRequestChangeEventName, { action: 'add', source: 'remoteSession', chatSessionId: chatSessionResourceToId(model.sessionResource) });
cancellationListener.value = createCancellationListener(newCancellationRequest.cancellationTokenSource.token);
@@ -1230,7 +1231,7 @@ export class ChatService extends Disposable implements IChatService {
let shouldProcessPending = false;
const rawResponsePromise = sendRequestInternal();
// Note- requestId is not known at this point, assigned later
const cancellableRequest = this.instantiationService.createInstance(CancellableRequest, source, undefined);
const cancellableRequest = this.instantiationService.createInstance(CancellableRequest, source, undefined, rawResponsePromise);
this._pendingRequests.set(model.sessionResource, cancellableRequest);
this.telemetryService.publicLog2<ChatPendingRequestChangeEvent, ChatPendingRequestChangeClassification>(ChatPendingRequestChangeEventName, { action: 'add', source: 'sendRequest', chatSessionId: chatSessionResourceToId(model.sessionResource) });
rawResponsePromise.finally(() => {
@@ -1493,7 +1494,7 @@ export class ChatService extends Disposable implements IChatService {
request.response?.complete();
}
cancelCurrentRequestForSession(sessionResource: URI, source?: string): void {
async cancelCurrentRequestForSession(sessionResource: URI, source?: string): Promise<void> {
this.trace('cancelCurrentRequestForSession', `session: ${sessionResource}`);
const pendingRequest = this._pendingRequests.get(sessionResource);
if (!pendingRequest) {
@@ -1514,9 +1515,14 @@ export class ChatService extends Disposable implements IChatService {
return;
}
const responseCompletePromise = pendingRequest.responseCompletePromise;
pendingRequest.cancel();
this._pendingRequests.deleteAndDispose(sessionResource);
this.telemetryService.publicLog2<ChatPendingRequestChangeEvent, ChatPendingRequestChangeClassification>(ChatPendingRequestChangeEventName, { action: 'remove', source: source ?? 'cancelRequest', requestId: pendingRequest.requestId, chatSessionId: chatSessionResourceToId(sessionResource) });
if (responseCompletePromise) {
await raceTimeout(responseCompletePromise, 1000);
}
}
setYieldRequested(sessionResource: URI): void {

View File

@@ -149,7 +149,7 @@ class MockChatService implements IChatService {
throw new Error('Method not implemented.');
}
cancelCurrentRequestForSession(_sessionResource: URI, _source?: string): void { }
async cancelCurrentRequestForSession(_sessionResource: URI, _source?: string): Promise<void> { }
setYieldRequested(_sessionResource: URI): void { }

View File

@@ -12,6 +12,7 @@ import { constObservable, observableValue } from '../../../../../../base/common/
import { URI } from '../../../../../../base/common/uri.js';
import { assertSnapshot } from '../../../../../../base/test/common/snapshot.js';
import { ensureNoDisposablesAreLeakedInTestSuite } from '../../../../../../base/test/common/utils.js';
import { runWithFakedTimers } from '../../../../../../base/test/common/timeTravelScheduler.js';
import { Range } from '../../../../../../editor/common/core/range.js';
import { IConfigurationService } from '../../../../../../platform/configuration/common/configuration.js';
import { TestConfigurationService } from '../../../../../../platform/configuration/test/common/testConfigurationService.js';
@@ -40,7 +41,7 @@ import { TestMcpService } from '../../../../mcp/test/common/testMcpService.js';
import { ChatAgentService, IChatAgent, IChatAgentData, IChatAgentImplementation, IChatAgentService } from '../../../common/participants/chatAgents.js';
import { IChatEditingService, IChatEditingSession } from '../../../common/editing/chatEditingService.js';
import { ChatModel, IChatModel, ISerializableChatData } from '../../../common/model/chatModel.js';
import { ChatRequestQueueKind, ChatSendResult, IChatFollowup, IChatModelReference, IChatService } from '../../../common/chatService/chatService.js';
import { ChatRequestQueueKind, ChatSendResult, IChatFollowup, IChatModelReference, IChatService, ResponseModelState } from '../../../common/chatService/chatService.js';
import { ChatService } from '../../../common/chatService/chatServiceImpl.js';
import { ChatSlashCommandService, IChatSlashCommandService } from '../../../common/participants/chatSlashCommands.js';
import { IChatVariablesService } from '../../../common/attachments/chatVariables.js';
@@ -535,6 +536,79 @@ suite('ChatService', () => {
assert.ok(invokedRequests[1].includes('steering3'), 'Combined message should include steering3');
assert.ok(invokedRequests[1].includes('\n\n'), 'Combined message should use \\n\\n as separator');
});
test('cancelCurrentRequestForSession waits for response completion', async () => {
const requestStarted = new DeferredPromise<void>();
const completeRequest = new DeferredPromise<void>();
const slowAgent: IChatAgentImplementation = {
async invoke(request, progress, history, token) {
requestStarted.complete();
const listener = token.onCancellationRequested(() => {
listener.dispose();
// Simulate some cleanup delay before completing
setTimeout(() => completeRequest.complete(), 10);
});
await completeRequest.p;
return {};
},
};
testDisposables.add(chatAgentService.registerAgent('slowAgent', { ...getAgentData('slowAgent'), isDefault: true }));
testDisposables.add(chatAgentService.registerAgentImplementation('slowAgent', slowAgent));
const testService = createChatService();
const modelRef = testDisposables.add(startSessionModel(testService));
const model = modelRef.object;
const response = await testService.sendRequest(model.sessionResource, 'test request', { agentId: 'slowAgent' });
ChatSendResult.assertSent(response);
await requestStarted.p;
// Cancel and await - should wait for the response to complete
await testService.cancelCurrentRequestForSession(model.sessionResource, 'test');
// After cancel resolves, the response model should have a result
const lastRequest = model.getRequests()[0];
assert.ok(lastRequest.response, 'Response should exist after cancellation completes');
assert.strictEqual(lastRequest.response.state, ResponseModelState.Cancelled, 'Response should be in Cancelled state');
});
test('cancelCurrentRequestForSession returns after timeout if response does not complete', async () => {
const requestStarted = new DeferredPromise<void>();
const completeRequest = new DeferredPromise<void>();
const hangingAgent: IChatAgentImplementation = {
async invoke(request, progress, history, token) {
requestStarted.complete();
// Wait for external signal, ignoring cancellation to simulate a hung agent
await completeRequest.p;
return {};
},
};
testDisposables.add(chatAgentService.registerAgent('hangingAgent', { ...getAgentData('hangingAgent'), isDefault: true }));
testDisposables.add(chatAgentService.registerAgentImplementation('hangingAgent', hangingAgent));
const testService = createChatService();
const modelRef = testDisposables.add(startSessionModel(testService));
const model = modelRef.object;
const response = await testService.sendRequest(model.sessionResource, 'test request', { agentId: 'hangingAgent' });
ChatSendResult.assertSent(response);
await requestStarted.p;
// Cancel should return after timeout even though the agent has not completed.
// Use faked timers so raceTimeout's 1s setTimeout fires instantly.
await runWithFakedTimers({ useFakeTimers: true }, async () => {
await testService.cancelCurrentRequestForSession(model.sessionResource, 'test');
});
// Let the agent finish so the test cleans up properly
completeRequest.complete();
await response.data.responseCompletePromise;
});
});

View File

@@ -88,7 +88,7 @@ export class MockChatService implements IChatService {
removeRequest(sessionResource: URI, requestId: string): Promise<void> {
throw new Error('Method not implemented.');
}
cancelCurrentRequestForSession(sessionResource: URI, source?: string): void {
async cancelCurrentRequestForSession(sessionResource: URI, source?: string): Promise<void> {
throw new Error('Method not implemented.');
}
setYieldRequested(sessionResource: URI): void {

View File

@@ -605,7 +605,7 @@ export class InlineChatController implements IEditorContribution {
if (!session) {
return;
}
this._chatService.cancelCurrentRequestForSession(session.chatModel.sessionResource, 'inlineChatReject');
await this._chatService.cancelCurrentRequestForSession(session.chatModel.sessionResource, 'inlineChatReject');
await session.editingSession.reject();
session.dispose();
}

View File

@@ -86,7 +86,7 @@ export class InlineChatSessionServiceImpl implements IInlineChatSessionService {
const store = new DisposableStore();
store.add(toDisposable(() => {
this._chatService.cancelCurrentRequestForSession(chatModel.sessionResource, 'inlineChatSession');
void this._chatService.cancelCurrentRequestForSession(chatModel.sessionResource, 'inlineChatSession');
chatModel.editingSession?.reject();
this._sessions.delete(uri);
this._onDidChangeSessions.fire(this);

View File

@@ -412,7 +412,7 @@ export class TerminalChatWidget extends Disposable {
if (!model?.sessionResource) {
return;
}
this._chatService.cancelCurrentRequestForSession(model?.sessionResource, 'terminalChat');
void this._chatService.cancelCurrentRequestForSession(model?.sessionResource, 'terminalChat');
}
async viewInChat(): Promise<void> {