From 83f4dfdff121dee60169620e738dc2b9dfea0bec Mon Sep 17 00:00:00 2001 From: Benjamin Pasero Date: Wed, 10 Feb 2021 14:34:13 +0100 Subject: [PATCH] streams - introduce and adopt listenStream cc @alexdima --- src/vs/base/common/stream.ts | 138 +++++++++++------- src/vs/base/test/common/stream.test.ts | 86 +++++++---- src/vs/editor/common/model/textModel.ts | 49 ++++--- src/vs/platform/files/common/fileService.ts | 49 +++---- .../electron-browser/diskFileService.test.ts | 14 ++ .../electron-main/webviewProtocolProvider.ts | 38 ++--- .../contrib/files/browser/fileActions.ts | 29 ++-- .../services/textfile/common/encoding.ts | 79 +++++----- .../test/browser/workbenchTestServices.ts | 17 +-- 9 files changed, 281 insertions(+), 218 deletions(-) diff --git a/src/vs/base/common/stream.ts b/src/vs/base/common/stream.ts index 8874a0fa41a..d9e26f56af9 100644 --- a/src/vs/base/common/stream.ts +++ b/src/vs/base/common/stream.ts @@ -16,6 +16,13 @@ export interface ReadableStreamEvents { /** * The 'data' event is emitted whenever the stream is * relinquishing ownership of a chunk of data to a consumer. + * + * NOTE: PLEASE UNDERSTAND THAT ADDING A DATA LISTENER CAN + * TURN THE STREAM INTO FLOWING MODE. IT IS THEREFOR THE + * LAST LISTENER THAT SHOULD BE ADDED AND NOT THE FIRST + * + * Use `listenStream` as a helper method to listen to + * stream events in the right order. */ on(event: 'data', callback: (data: T) => void): void; @@ -268,7 +275,7 @@ class WriteableStreamImpl implements WriteableStream { // end with data or error if provided if (result instanceof Error) { this.error(result); - } else if (result) { + } else if (typeof result !== 'undefined') { this.write(result); } @@ -489,18 +496,74 @@ export function peekReadable(readable: Readable, reducer: IReducer, max } /** - * Helper to fully read a T stream into a T. + * Helper to fully read a T stream into a T or consuming + * a stream fully, awaiting all the events without caring + * about the data. */ -export function consumeStream(stream: ReadableStreamEvents, reducer: IReducer): Promise { +export function consumeStream(stream: ReadableStreamEvents, reducer: IReducer): Promise; +export function consumeStream(stream: ReadableStreamEvents): Promise; +export function consumeStream(stream: ReadableStreamEvents, reducer?: IReducer): Promise { return new Promise((resolve, reject) => { const chunks: T[] = []; - stream.on('data', data => chunks.push(data)); - stream.on('error', error => reject(error)); - stream.on('end', () => resolve(reducer(chunks))); + listenStream(stream, { + onData: chunk => { + if (reducer) { + chunks.push(chunk); + } + }, + onError: error => { + if (reducer) { + reject(error); + } else { + resolve(undefined); + } + }, + onEnd: () => { + if (reducer) { + resolve(reducer(chunks)); + } else { + resolve(undefined); + } + } + }); }); } +export interface IStreamListener { + + /** + * The 'data' event is emitted whenever the stream is + * relinquishing ownership of a chunk of data to a consumer. + */ + onData(data: T): void; + + /** + * Emitted when any error occurs. + */ + onError(err: Error): void; + + /** + * The 'end' event is emitted when there is no more data + * to be consumed from the stream. The 'end' event will + * not be emitted unless the data is completely consumed. + */ + onEnd(): void; +} + +/** + * Helper to listen to all events of a T stream in proper order. + */ +export function listenStream(stream: ReadableStreamEvents, listener: IStreamListener): void { + stream.on('error', error => listener.onError(error)); + stream.on('end', () => listener.onEnd()); + + // Adding the `data` listener will turn the stream + // into flowing mode. As such it is important to + // add this listener last (DO NOT CHANGE!) + stream.on('data', data => listener.onData(data)); +} + /** * Helper to peek up to `maxChunks` into a stream. The return type signals if * the stream has ended or not. If not, caller needs to add a `data` listener @@ -509,9 +572,9 @@ export function consumeStream(stream: ReadableStreamEvents, reducer: IRedu export function peekStream(stream: ReadableStream, maxChunks: number): Promise> { return new Promise((resolve, reject) => { const streamListeners = new DisposableStore(); + const buffer: T[] = []; // Data Listener - const buffer: T[] = []; const dataListener = (chunk: T) => { // Add to buffer @@ -529,23 +592,27 @@ export function peekStream(stream: ReadableStream, maxChunks: number): Pro } }; - streamListeners.add(toDisposable(() => stream.removeListener('data', dataListener))); - stream.on('data', dataListener); - // Error Listener const errorListener = (error: Error) => { return reject(error); }; - streamListeners.add(toDisposable(() => stream.removeListener('error', errorListener))); - stream.on('error', errorListener); - + // End Listener const endListener = () => { return resolve({ stream, buffer, ended: true }); }; + streamListeners.add(toDisposable(() => stream.removeListener('error', errorListener))); + stream.on('error', errorListener); + streamListeners.add(toDisposable(() => stream.removeListener('end', endListener))); stream.on('end', endListener); + + // Important: leave the `data` listener last because + // this can turn the stream into flowing mode and we + // want `error` events to be received as well. + streamListeners.add(toDisposable(() => stream.removeListener('data', dataListener))); + stream.on('data', dataListener); }); } @@ -585,46 +652,11 @@ export function toReadable(t: T): Readable { export function transform(stream: ReadableStreamEvents, transformer: ITransformer, reducer: IReducer): ReadableStream { const target = newWriteableStream(reducer); - stream.on('data', data => target.write(transformer.data(data))); - stream.on('end', () => target.end()); - stream.on('error', error => target.error(transformer.error ? transformer.error(error) : error)); + listenStream(stream, { + onData: data => target.write(transformer.data(data)), + onError: error => target.error(transformer.error ? transformer.error(error) : error), + onEnd: () => target.end() + }); return target; } - -export interface IReadableStreamObservable { - - /** - * A promise to await the `end` or `error` event - * of a stream. - */ - errorOrEnd: () => Promise; -} - -/** - * Helper to observe a stream for certain events through - * a promise based API. - */ -export function observe(stream: ReadableStream): IReadableStreamObservable { - - // A stream is closed when it ended or errord - // We install this listener right from the - // beginning to catch the events early. - const errorOrEnd = Promise.race([ - new Promise(resolve => stream.on('end', () => resolve())), - new Promise(resolve => stream.on('error', () => resolve())) - ]); - - return { - errorOrEnd(): Promise { - - // We need to ensure the stream is flowing so that our - // listeners are getting triggered. It is possible that - // the stream is not flowing because no `data` listener - // was attached yet. - stream.resume(); - - return errorOrEnd; - } - }; -} diff --git a/src/vs/base/test/common/stream.test.ts b/src/vs/base/test/common/stream.test.ts index 0fb36467b2f..7594d7cb8e6 100644 --- a/src/vs/base/test/common/stream.test.ts +++ b/src/vs/base/test/common/stream.test.ts @@ -4,7 +4,7 @@ *--------------------------------------------------------------------------------------------*/ import * as assert from 'assert'; -import { isReadableStream, newWriteableStream, Readable, consumeReadable, peekReadable, consumeStream, ReadableStream, toStream, toReadable, transform, peekStream, isReadableBufferedStream, observe } from 'vs/base/common/stream'; +import { isReadableStream, newWriteableStream, Readable, consumeReadable, peekReadable, consumeStream, ReadableStream, toStream, toReadable, transform, peekStream, isReadableBufferedStream, listenStream } from 'vs/base/common/stream'; import { timeout } from 'vs/base/common/async'; suite('Stream', () => { @@ -69,6 +69,7 @@ suite('Stream', () => { stream.end('Final Bit'); assert.strictEqual(chunks.length, 4); assert.strictEqual(chunks[3], 'Final Bit'); + assert.strictEqual(end, true); stream.destroy(); @@ -76,6 +77,15 @@ suite('Stream', () => { assert.strictEqual(chunks.length, 4); }); + test('WriteableStream - end with empty string works', async () => { + const reducer = (strings: string[]) => strings.length > 0 ? strings.join() : 'error'; + const stream = newWriteableStream(reducer); + stream.end(''); + + const result = await consumeStream(stream, reducer); + assert.strictEqual(result, ''); + }); + test('WriteableStream - removeListener', () => { const stream = newWriteableStream(strings => strings.join()); @@ -270,6 +280,56 @@ suite('Stream', () => { assert.strictEqual(consumed, '1,2,3,4,5'); }); + test('consumeStream - without reducer', async () => { + const stream = readableToStream(arrayToReadable(['1', '2', '3', '4', '5'])); + const consumed = await consumeStream(stream); + assert.strictEqual(consumed, undefined); + }); + + test('consumeStream - without reducer and error', async () => { + const stream = newWriteableStream(strings => strings.join()); + stream.error(new Error()); + + const consumed = await consumeStream(stream); + assert.strictEqual(consumed, undefined); + }); + + test('listenStream', () => { + const stream = newWriteableStream(strings => strings.join()); + + let error = false; + let end = false; + let data = ''; + + listenStream(stream, { + onData: d => { + data = d; + }, + onError: e => { + error = true; + }, + onEnd: () => { + end = true; + } + }); + + stream.write('Hello'); + + assert.strictEqual(data, 'Hello'); + + stream.write('World'); + assert.strictEqual(data, 'World'); + + assert.strictEqual(error, false); + assert.strictEqual(end, false); + + stream.error(new Error()); + assert.strictEqual(error, true); + + stream.end('Final Bit'); + assert.strictEqual(end, true); + }); + test('peekStream', async () => { for (let i = 0; i < 5; i++) { const stream = readableToStream(arrayToReadable(['1', '2', '3', '4', '5'])); @@ -335,30 +395,6 @@ suite('Stream', () => { assert.strictEqual(consumed, '11,22,33,44,55'); }); - test('observer', async () => { - const source1 = newWriteableStream(strings => strings.join()); - setTimeout(() => source1.error(new Error())); - await observe(source1).errorOrEnd(); - - const source2 = newWriteableStream(strings => strings.join()); - setTimeout(() => source2.end('Hello Test')); - await observe(source2).errorOrEnd(); - - const source3 = newWriteableStream(strings => strings.join()); - setTimeout(() => { - source3.write('Hello Test'); - source3.error(new Error()); - }); - await observe(source3).errorOrEnd(); - - const source4 = newWriteableStream(strings => strings.join()); - setTimeout(() => { - source4.write('Hello Test'); - source4.end(); - }); - await observe(source4).errorOrEnd(); - }); - test('events are delivered even if a listener is removed during delivery', () => { const stream = newWriteableStream(strings => strings.join()); diff --git a/src/vs/editor/common/model/textModel.ts b/src/vs/editor/common/model/textModel.ts index aab72a48cd6..3680d8b5185 100644 --- a/src/vs/editor/common/model/textModel.ts +++ b/src/vs/editor/common/model/textModel.ts @@ -38,6 +38,7 @@ import { IUndoRedoService, ResourceEditStackSnapshot } from 'vs/platform/undoRed import { TextChange } from 'vs/editor/common/model/textChange'; import { Constants } from 'vs/base/common/uint'; import { PieceTreeTextBuffer } from 'vs/editor/common/model/pieceTreeTextBuffer/pieceTreeTextBuffer'; +import { listenStream } from 'vs/base/common/stream'; function createTextBufferBuilder() { return new PieceTreeTextBufferBuilder(); @@ -64,33 +65,33 @@ export function createTextBufferFactoryFromStream(stream: ITextStream | VSBuffer let done = false; - stream.on('data', (chunk: string | VSBuffer) => { - if (validator) { - const error = validator(chunk); - if (error) { + listenStream(stream, { + onData: chunk => { + if (validator) { + const error = validator(chunk); + if (error) { + done = true; + reject(error); + } + } + + if (filter) { + chunk = filter(chunk); + } + + builder.acceptChunk((typeof chunk === 'string') ? chunk : chunk.toString()); + }, + onError: error => { + if (!done) { done = true; reject(error); } - } - - if (filter) { - chunk = filter(chunk); - } - - builder.acceptChunk((typeof chunk === 'string') ? chunk : chunk.toString()); - }); - - stream.on('error', (error) => { - if (!done) { - done = true; - reject(error); - } - }); - - stream.on('end', () => { - if (!done) { - done = true; - resolve(builder.finish()); + }, + onEnd: () => { + if (!done) { + done = true; + resolve(builder.finish()); + } } }); }); diff --git a/src/vs/platform/files/common/fileService.ts b/src/vs/platform/files/common/fileService.ts index 9fddb5507f2..ab2a5236895 100644 --- a/src/vs/platform/files/common/fileService.ts +++ b/src/vs/platform/files/common/fileService.ts @@ -14,7 +14,7 @@ import { TernarySearchTree } from 'vs/base/common/map'; import { isNonEmptyArray, coalesce } from 'vs/base/common/arrays'; import { ILogService } from 'vs/platform/log/common/log'; import { VSBuffer, VSBufferReadable, readableToBuffer, bufferToReadable, streamToBuffer, VSBufferReadableStream, VSBufferReadableBufferedStream, bufferedStreamToBuffer, newWriteableBufferStream } from 'vs/base/common/buffer'; -import { isReadableStream, transform, peekReadable, peekStream, isReadableBufferedStream, newWriteableStream, IReadableStreamObservable, observe } from 'vs/base/common/stream'; +import { isReadableStream, transform, peekReadable, peekStream, isReadableBufferedStream, newWriteableStream, listenStream, consumeStream } from 'vs/base/common/stream'; import { Promises, Queue } from 'vs/base/common/async'; import { CancellationTokenSource, CancellationToken } from 'vs/base/common/cancellation'; import { Schemas } from 'vs/base/common/network'; @@ -454,8 +454,7 @@ export class FileService extends Disposable implements IFileService { throw error; }); - let fileStreamObserver: IReadableStreamObservable | undefined = undefined; - + let fileStream: VSBufferReadableStream | undefined = undefined; try { // if the etag is provided, we await the result of the validation @@ -466,8 +465,6 @@ export class FileService extends Disposable implements IFileService { await statPromise; } - let fileStream: VSBufferReadableStream | undefined = undefined; - // read unbuffered (only if either preferred, or the provider has no buffered read capability) if (!(hasOpenReadWriteCloseCapability(provider) || hasFileReadStreamCapability(provider)) || (hasReadWriteCapability(provider) && options?.preferUnbuffered)) { fileStream = this.readFileUnbuffered(provider, resource, options); @@ -483,9 +480,6 @@ export class FileService extends Disposable implements IFileService { fileStream = this.readFileBuffered(provider, resource, cancellableSource.token, options); } - // observe the stream for the error case below - fileStreamObserver = observe(fileStream); - const fileStat = await statPromise; return { @@ -497,8 +491,8 @@ export class FileService extends Disposable implements IFileService { // Await the stream to finish so that we exit this method // in a consistent state with file handles closed // (https://github.com/microsoft/vscode/issues/114024) - if (fileStreamObserver) { - await fileStreamObserver.errorOrEnd(); + if (fileStream) { + await consumeStream(fileStream); } throw new FileOperationError(localize('err.read', "Unable to read file '{0}' ({1})", this.resourceForError(resource), ensureFileSystemProviderError(error).toString()), toFileOperationResult(error), options); @@ -1065,28 +1059,29 @@ export class FileService extends Disposable implements IFileService { return new Promise(async (resolve, reject) => { - stream.on('data', async chunk => { + listenStream(stream, { + onData: async chunk => { - // pause stream to perform async write operation - stream.pause(); + // pause stream to perform async write operation + stream.pause(); - try { - await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0); - } catch (error) { - return reject(error); - } + try { + await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0); + } catch (error) { + return reject(error); + } - posInFile += chunk.byteLength; + posInFile += chunk.byteLength; - // resume stream now that we have successfully written - // run this on the next tick to prevent increasing the - // execution stack because resume() may call the event - // handler again before finishing. - setTimeout(() => stream.resume()); + // resume stream now that we have successfully written + // run this on the next tick to prevent increasing the + // execution stack because resume() may call the event + // handler again before finishing. + setTimeout(() => stream.resume()); + }, + onError: error => reject(error), + onEnd: () => resolve() }); - - stream.on('error', error => reject(error)); - stream.on('end', () => resolve()); }); } diff --git a/src/vs/platform/files/test/electron-browser/diskFileService.test.ts b/src/vs/platform/files/test/electron-browser/diskFileService.test.ts index 667e135bca6..b5dfa9db4d8 100644 --- a/src/vs/platform/files/test/electron-browser/diskFileService.test.ts +++ b/src/vs/platform/files/test/electron-browser/diskFileService.test.ts @@ -1584,6 +1584,20 @@ flakySuite('Disk File Service', function () { assert.strictEqual(error!.fileOperationResult, FileOperationResult.FILE_TOO_LARGE); } + (isWindows ? test.skip /* windows: cannot create file symbolic link without elevated context */ : test)('readFile - dangling symbolic link - https://github.com/microsoft/vscode/issues/116049', async () => { + const link = URI.file(join(testDir, 'small.js-link')); + await promises.symlink(join(testDir, 'small.js'), link.fsPath); + + let error: FileOperationError | undefined = undefined; + try { + await service.readFile(link); + } catch (err) { + error = err; + } + + assert.ok(error); + }); + test('createFile', async () => { return assertCreateFile(contents => VSBuffer.fromString(contents)); }); diff --git a/src/vs/platform/webview/electron-main/webviewProtocolProvider.ts b/src/vs/platform/webview/electron-main/webviewProtocolProvider.ts index e4788996e60..276b7492e63 100644 --- a/src/vs/platform/webview/electron-main/webviewProtocolProvider.ts +++ b/src/vs/platform/webview/electron-main/webviewProtocolProvider.ts @@ -9,6 +9,7 @@ import { bufferToStream, VSBufferReadableStream } from 'vs/base/common/buffer'; import { CancellationToken } from 'vs/base/common/cancellation'; import { Disposable, toDisposable } from 'vs/base/common/lifecycle'; import { FileAccess, Schemas } from 'vs/base/common/network'; +import { listenStream } from 'vs/base/common/stream'; import { URI } from 'vs/base/common/uri'; import { FileOperationError, FileOperationResult, IFileService } from 'vs/platform/files/common/files'; import { ILogService } from 'vs/platform/log/common/log'; @@ -75,28 +76,27 @@ export class WebviewProtocolProvider extends Disposable { if (!this.listening) { this.listening = true; - // Data - stream.on('data', data => { - try { - if (!this.push(data.buffer)) { - stream.pause(); // pause the stream if we should not push anymore + listenStream(stream, { + onData: data => { + try { + if (!this.push(data.buffer)) { + stream.pause(); // pause the stream if we should not push anymore + } + } catch (error) { + this.emit(error); + } + }, + onError: error => { + this.emit('error', error); + }, + onEnd: () => { + try { + this.push(null); // signal EOS + } catch (error) { + this.emit(error); } - } catch (error) { - this.emit(error); } }); - - // End - stream.on('end', () => { - try { - this.push(null); // signal EOS - } catch (error) { - this.emit(error); - } - }); - - // Error - stream.on('error', error => this.emit('error', error)); } // ensure the stream is flowing diff --git a/src/vs/workbench/contrib/files/browser/fileActions.ts b/src/vs/workbench/contrib/files/browser/fileActions.ts index d8289ff2b9a..61b0b7f33f9 100644 --- a/src/vs/workbench/contrib/files/browser/fileActions.ts +++ b/src/vs/workbench/contrib/files/browser/fileActions.ts @@ -53,6 +53,7 @@ import { ILogService } from 'vs/platform/log/common/log'; import { IUriIdentityService } from 'vs/workbench/services/uriIdentity/common/uriIdentity'; import { ResourceFileEdit } from 'vs/editor/browser/services/bulkEditService'; import { IExplorerService } from 'vs/workbench/contrib/files/browser/files'; +import { listenStream } from 'vs/base/common/stream'; export const NEW_FILE_COMMAND_ID = 'explorer.newFile'; export const NEW_FILE_LABEL = nls.localize('newFile', "New File"); @@ -1034,22 +1035,22 @@ const downloadFileHandler = async (accessor: ServicesAccessor) => { reject(); })); - sourceStream.on('data', data => { - if (!disposed) { - target.write(data.buffer); - reportProgress(contents.name, contents.size, data.byteLength, operation); + listenStream(sourceStream, { + onData: data => { + if (!disposed) { + target.write(data.buffer); + reportProgress(contents.name, contents.size, data.byteLength, operation); + } + }, + onError: error => { + disposables.dispose(); + reject(error); + }, + onEnd: () => { + disposables.dispose(); + resolve(); } }); - - sourceStream.on('error', error => { - disposables.dispose(); - reject(error); - }); - - sourceStream.on('end', () => { - disposables.dispose(); - resolve(); - }); }); } diff --git a/src/vs/workbench/services/textfile/common/encoding.ts b/src/vs/workbench/services/textfile/common/encoding.ts index c4f80a86739..fa303894781 100644 --- a/src/vs/workbench/services/textfile/common/encoding.ts +++ b/src/vs/workbench/services/textfile/common/encoding.ts @@ -3,7 +3,7 @@ * Licensed under the MIT License. See License.txt in the project root for license information. *--------------------------------------------------------------------------------------------*/ -import { Readable, ReadableStream, newWriteableStream } from 'vs/base/common/stream'; +import { Readable, ReadableStream, newWriteableStream, listenStream } from 'vs/base/common/stream'; import { VSBuffer, VSBufferReadable, VSBufferReadableStream } from 'vs/base/common/buffer'; export const UTF8 = 'utf8'; @@ -133,50 +133,47 @@ export function toDecodeStream(source: VSBufferReadableStream, options: IDecodeS } }; - // Stream error: forward to target - source.on('error', error => target.error(error)); + listenStream(source, { + onData: async chunk => { - // Stream data - source.on('data', async chunk => { - - // if the decoder is ready, we just write directly - if (decoder) { - target.write(decoder.write(chunk.buffer)); - } - - // otherwise we need to buffer the data until the stream is ready - else { - bufferedChunks.push(chunk); - bytesBuffered += chunk.byteLength; - - // buffered enough data for encoding detection, create stream - if (bytesBuffered >= minBytesRequiredForDetection) { - - // pause stream here until the decoder is ready - source.pause(); - - await createDecoder(); - - // resume stream now that decoder is ready but - // outside of this stack to reduce recursion - setTimeout(() => source.resume()); + // if the decoder is ready, we just write directly + if (decoder) { + target.write(decoder.write(chunk.buffer)); } + + // otherwise we need to buffer the data until the stream is ready + else { + bufferedChunks.push(chunk); + bytesBuffered += chunk.byteLength; + + // buffered enough data for encoding detection, create stream + if (bytesBuffered >= minBytesRequiredForDetection) { + + // pause stream here until the decoder is ready + source.pause(); + + await createDecoder(); + + // resume stream now that decoder is ready but + // outside of this stack to reduce recursion + setTimeout(() => source.resume()); + } + } + }, + onError: error => target.error(error), // simply forward to target + onEnd: async () => { + + // we were still waiting for data to do the encoding + // detection. thus, wrap up starting the stream even + // without all the data to get things going + if (!decoder) { + await createDecoder(); + } + + // end the target with the remainders of the decoder + target.end(decoder?.end()); } }); - - // Stream end - source.on('end', async () => { - - // we were still waiting for data to do the encoding - // detection. thus, wrap up starting the stream even - // without all the data to get things going - if (!decoder) { - await createDecoder(); - } - - // end the target with the remainders of the decoder - target.end(decoder?.end()); - }); }); } diff --git a/src/vs/workbench/test/browser/workbenchTestServices.ts b/src/vs/workbench/test/browser/workbenchTestServices.ts index ab712e8834c..30c54d04952 100644 --- a/src/vs/workbench/test/browser/workbenchTestServices.ts +++ b/src/vs/workbench/test/browser/workbenchTestServices.ts @@ -69,7 +69,7 @@ import { Part } from 'vs/workbench/browser/part'; import { IPanelService } from 'vs/workbench/services/panel/common/panelService'; import { IPanel } from 'vs/workbench/common/panel'; import { IBadge } from 'vs/workbench/services/activity/common/activity'; -import { VSBuffer, VSBufferReadable } from 'vs/base/common/buffer'; +import { bufferToStream, VSBuffer, VSBufferReadable } from 'vs/base/common/buffer'; import { Schemas } from 'vs/base/common/network'; import { IProductService } from 'vs/platform/product/common/productService'; import product from 'vs/platform/product/common/product'; @@ -839,20 +839,7 @@ export class TestFileService implements IFileService { return Promise.resolve({ resource, - value: { - on: (event: string, callback: Function): void => { - if (event === 'data') { - callback(this.content); - } - if (event === 'end') { - callback(); - } - }, - removeListener: () => { }, - resume: () => { }, - pause: () => { }, - destroy: () => { } - }, + value: bufferToStream(VSBuffer.fromString(this.content)), etag: 'index.txt', encoding: 'utf8', mtime: Date.now(),