refactor token stream to remove floating promises

This commit is contained in:
Oleg Solomko
2025-05-05 19:04:40 -07:00
parent b49c8c8245
commit 03c1c34cda
8 changed files with 70 additions and 81 deletions

View File

@@ -1502,8 +1502,8 @@ export default tseslint.config(
'no-loop-func': 'off', '@typescript-eslint/no-loop-func': 'error',
'@typescript-eslint/no-misused-new': 'warn',
'@typescript-eslint/no-mixed-enums': 'error',
// '@typescript-eslint/no-floating-promises': 'error',
// '@typescript-eslint/no-misused-promises': 'error',
'@typescript-eslint/no-floating-promises': 'error',
'@typescript-eslint/no-misused-promises': 'error',
'@typescript-eslint/no-non-null-asserted-nullish-coalescing': 'error',
'@typescript-eslint/no-non-null-asserted-optional-chain': 'error',
'@typescript-eslint/no-non-null-assertion': 'error',

View File

@@ -4,78 +4,71 @@
*--------------------------------------------------------------------------------------------*/
import { BaseToken } from '../baseToken.js';
import { assert, assertNever } from '../../../../base/common/assert.js';
import { assertNever } from '../../../../base/common/assert.js';
import { assertDefined } from '../../../../base/common/types.js';
import { ObservableDisposable } from '../../../../base/common/observableDisposable.js';
import { newWriteableStream, WriteableStream, ReadableStream } from '../../../../base/common/stream.js';
/**
* A readable stream of provided tokens.
*/
// TODO: @legomushroom - add unit tests
export class TokenStream<T extends BaseToken> extends ObservableDisposable implements ReadableStream<T> {
/**
* Underlying writable stream instance.
*/
private readonly stream: WriteableStream<T>;
/**
* Index of the next token to be sent.
*/
private index: number;
/**
* Interval reference that is used to periodically send
* tokens to the stream in the background.
*/
private interval: ReturnType<typeof setInterval> | undefined;
private interval: ReturnType<typeof setImmediate> | undefined;
/**
* Number of tokens left to be sent.
* TODO: @legomushroom
*/
private get tokensLeft(): number {
return this.tokens.length - this.index;
}
private readonly tokens: T[];
constructor(
private readonly tokens: readonly T[],
tokens: readonly T[],
) {
super();
this.stream = newWriteableStream<T>(null);
this.index = 0;
// copy and reverse the tokens list so we can pop items from its e end
this.tokens = [...tokens].reverse();
// send couple of tokens immediately
this.sendTokens();
this.send(false);
}
/**
* Start periodically sending tokens to the stream
* asynchronously in the background.
* TODO: @legomushroom
*/
public startStream(): this {
// already running, noop
if (this.interval !== undefined) {
return this;
}
public send(
play: boolean = true,
): void {
this.sendTokens()
.then(() => {
if (this.tokens.length === 0) {
this.stream.end();
this.stopStream();
return;
}
// no tokens to send, end the stream immediately
if (this.tokens.length === 0) {
this.stream.end();
return this;
}
if (play === false) {
this.stopStream();
return;
}
// periodically send tokens to the stream
this.interval = setInterval(async () => {
if (this.tokensLeft === 0) {
clearInterval(this.interval);
delete this.interval;
return;
}
await this.sendTokens();
}, 1);
return this;
this.interval = setImmediate(this.send.bind(this));
})
.catch(() => {
this.stream.destroy();
this.stream.end();
this.stopStream();
});
}
/**
@@ -86,7 +79,7 @@ export class TokenStream<T extends BaseToken> extends ObservableDisposable imple
return this;
}
clearInterval(this.interval);
clearImmediate(this.interval);
delete this.interval;
return this;
@@ -98,32 +91,28 @@ export class TokenStream<T extends BaseToken> extends ObservableDisposable imple
private async sendTokens(
tokensCount: number = 25,
): Promise<void> {
if (this.tokensLeft <= 0) {
return;
}
// send up to 10 tokens at a time
let tokensToSend = Math.min(this.tokensLeft, tokensCount);
while (tokensToSend > 0) {
assert(
this.index < this.tokens.length,
`Token index '${this.index}' is out of bounds.`,
);
// if (this.tokens.length === 0) {
// return;
// }
// send up to 'tokensCount' tokens at a time
while ((tokensCount > 0) && (this.tokens.length > 0)) {
try {
await this.stream.write(this.tokens[this.index]);
this.index++;
tokensToSend--;
const token = this.tokens.pop();
assertDefined(
token,
`Token must be defined. Tokens left: ${this.tokens.length}.`,
);
await this.stream.write(token);
} catch {
this.stream.destroy();
this.stream.end();
this.stopStream();
return;
}
}
// if sent all tokens, end the stream immediately
if (this.tokensLeft === 0) {
this.stream.end();
}
}
public pause(): void {
@@ -134,7 +123,7 @@ export class TokenStream<T extends BaseToken> extends ObservableDisposable imple
}
public resume(): void {
this.startStream();
this.send();
this.stream.resume();
return;
@@ -158,7 +147,7 @@ export class TokenStream<T extends BaseToken> extends ObservableDisposable imple
this.stream.on(event, callback);
// this is the convention of the readable stream, - when
// the `data` event is registered, the stream is started
this.startStream();
this.send();
return;
}

View File

@@ -79,7 +79,7 @@ export class TextModelContentsProvider extends PromptContentsProviderBase<IModel
// to avoid blocking the main thread and save system resources used
let i = 1;
const linesCount = this.model.getLineCount();
const interval = setInterval(async () => {
const interval = setInterval(() => {
// if we have written all lines or lines count is zero,
// end the stream and stop the interval timer
if (i >= linesCount) {
@@ -99,14 +99,14 @@ export class TextModelContentsProvider extends PromptContentsProviderBase<IModel
try {
// write the current line to the stream
await stream.write(
stream.write(
VSBuffer.fromString(this.model.getLineContent(i)),
);
// for all lines except the last one, write the EOL character
// to separate the lines in the stream
if (i !== linesCount) {
await stream.write(
stream.write(
VSBuffer.fromString(this.model.getEOL()),
);
}

View File

@@ -45,9 +45,9 @@ export class PromptDecorator extends ProviderInstanceBase {
this.watchCursorPosition();
}
protected override async onPromptSettled(
protected override onPromptSettled(
_error?: Error,
): Promise<this> {
): this {
// by the time the promise above completes, either this object
// or the text model might be already has been disposed
if (this.disposed || this.model.isDisposed()) {

View File

@@ -32,10 +32,7 @@ class PromptHeaderDiagnosticsProvider extends ProviderInstanceBase {
/**
* Update diagnostic markers for the current editor.
*/
protected override async onPromptSettled(): Promise<this> {
// ensure that parsing process is settled
await this.parser.allSettled();
protected override onPromptSettled(): this {
// clean up all previously added markers
this.markerService.remove(MARKERS_OWNER_ID, [this.model.uri]);

View File

@@ -33,10 +33,7 @@ class PromptLinkDiagnosticsProvider extends ProviderInstanceBase {
/**
* Update diagnostic markers for the current editor.
*/
protected override async onPromptSettled(): Promise<this> {
// ensure that parsing process is settled
await this.parser.allSettled();
protected override onPromptSettled(): this {
// clean up all previously added markers
this.markerService.remove(MARKERS_OWNER_ID, [this.model.uri]);

View File

@@ -14,7 +14,7 @@ export abstract class ProviderInstanceBase extends ObservableDisposable {
/**
* Function that is called when the prompt parser is settled.
*/
protected abstract onPromptSettled(error: Error | undefined): Promise<this>;
protected abstract onPromptSettled(error: Error | undefined): this;
/**
* Returns a string representation of this object.

View File

@@ -260,7 +260,7 @@ export class BasePromptParser<TContentsProvider extends IPromptContentsProvider>
seenReferences,
);
this._onUpdate.fire();
this.firstParseResult.complete();
this.firstParseResult.end();
return this;
}
@@ -276,7 +276,7 @@ export class BasePromptParser<TContentsProvider extends IPromptContentsProvider>
this.onContentsChanged(streamOrError, seenReferences);
// indicate that we've received at least one `onContentChanged` event
this.firstParseResult.complete();
this.firstParseResult.end();
}),
);
@@ -978,8 +978,14 @@ class FirstParseResult extends DeferredPromise<void> {
/**
* Complete the underlying promise.
*/
public override complete(): Promise<void> {
public end(): void {
this._gotResult = true;
return super.complete(void 0);
super.complete(void 0)
.catch(() => {
// noop
// TODO: @legomushroom
});
return;
}
}