mirror of
https://github.com/microsoft/vscode.git
synced 2025-12-19 17:58:39 +00:00
files - lock all resources involved in atomic writes (#194009)
* files - lock all resources involved in atomic writes * do not acquire locks in random order * add test * stop support for buffered atomic writes
This commit is contained in:
@@ -14,7 +14,7 @@ import { Disposable, DisposableStore, dispose, IDisposable, toDisposable } from
|
||||
import { TernarySearchTree } from 'vs/base/common/ternarySearchTree';
|
||||
import { Schemas } from 'vs/base/common/network';
|
||||
import { mark } from 'vs/base/common/performance';
|
||||
import { basename, dirname, extUri, extUriIgnorePathCase, IExtUri, isAbsolutePath, joinPath } from 'vs/base/common/resources';
|
||||
import { extUri, extUriIgnorePathCase, IExtUri, isAbsolutePath } from 'vs/base/common/resources';
|
||||
import { consumeStream, isReadableBufferedStream, isReadableStream, listenStream, newWriteableStream, peekReadable, peekStream, transform } from 'vs/base/common/stream';
|
||||
import { URI } from 'vs/base/common/uri';
|
||||
import { localize } from 'vs/nls';
|
||||
@@ -408,17 +408,7 @@ export class FileService extends Disposable implements IFileService {
|
||||
|
||||
// write file: buffered
|
||||
else {
|
||||
const contents = bufferOrReadableOrStreamOrBufferedStream instanceof VSBuffer ? bufferToReadable(bufferOrReadableOrStreamOrBufferedStream) : bufferOrReadableOrStreamOrBufferedStream;
|
||||
|
||||
// atomic write
|
||||
if (writeFileOptions?.atomic !== false && writeFileOptions?.atomic?.postfix) {
|
||||
await this.doWriteBufferedAtomic(provider, resource, joinPath(dirname(resource), `${basename(resource)}${writeFileOptions.atomic.postfix}`), writeFileOptions, contents);
|
||||
}
|
||||
|
||||
// non-atomic write
|
||||
else {
|
||||
await this.doWriteBuffered(provider, resource, writeFileOptions, contents);
|
||||
}
|
||||
await this.doWriteBuffered(provider, resource, writeFileOptions, bufferOrReadableOrStreamOrBufferedStream instanceof VSBuffer ? bufferToReadable(bufferOrReadableOrStreamOrBufferedStream) : bufferOrReadableOrStreamOrBufferedStream);
|
||||
}
|
||||
|
||||
// events
|
||||
@@ -442,7 +432,11 @@ export class FileService extends Disposable implements IFileService {
|
||||
const atomic = !!options?.atomic;
|
||||
if (atomic) {
|
||||
if (!(provider.capabilities & FileSystemProviderCapabilities.FileAtomicWrite)) {
|
||||
throw new Error(localize('writeFailedAtomicUnsupported', "Unable to atomically write file '{0}' because provider does not support it.", this.resourceForError(resource)));
|
||||
throw new Error(localize('writeFailedAtomicUnsupported1', "Unable to atomically write file '{0}' because provider does not support it.", this.resourceForError(resource)));
|
||||
}
|
||||
|
||||
if (!(provider.capabilities & FileSystemProviderCapabilities.FileReadWrite)) {
|
||||
throw new Error(localize('writeFailedAtomicUnsupported2', "Unable to atomically write file '{0}' because provider does not support unbuffered writes.", this.resourceForError(resource)));
|
||||
}
|
||||
|
||||
if (unlock) {
|
||||
@@ -1183,28 +1177,6 @@ export class FileService extends Disposable implements IFileService {
|
||||
|
||||
private readonly writeQueue = this._register(new ResourceQueue());
|
||||
|
||||
private async doWriteBufferedAtomic(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, tempResource: URI, options: IWriteFileOptions | undefined, readableOrStreamOrBufferedStream: VSBufferReadable | VSBufferReadableStream | VSBufferReadableBufferedStream): Promise<void> {
|
||||
|
||||
// Write to temp resource first
|
||||
await this.doWriteBuffered(provider, tempResource, options, readableOrStreamOrBufferedStream);
|
||||
|
||||
try {
|
||||
|
||||
// Rename over existing to ensure atomic replace
|
||||
await provider.rename(tempResource, resource, { overwrite: true });
|
||||
} catch (error) {
|
||||
|
||||
// Cleanup in case of rename error
|
||||
try {
|
||||
await provider.delete(tempResource, { recursive: false, useTrash: false, atomic: false });
|
||||
} catch (error) {
|
||||
// ignore - we want the outer error to bubble up
|
||||
}
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
private async doWriteBuffered(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, options: IWriteFileOptions | undefined, readableOrStreamOrBufferedStream: VSBufferReadable | VSBufferReadableStream | VSBufferReadableBufferedStream): Promise<void> {
|
||||
return this.writeQueue.queueFor(resource, this.getExtUri(provider).providerExtUri).queue(async () => {
|
||||
|
||||
|
||||
@@ -252,28 +252,41 @@ export class DiskFileSystemProvider extends AbstractDiskFileSystemProvider imple
|
||||
|
||||
private async doWriteFileAtomic(resource: URI, tempResource: URI, content: Uint8Array, opts: IFileWriteOptions): Promise<void> {
|
||||
|
||||
// Write to temp resource first
|
||||
await this.doWriteFile(tempResource, content, opts);
|
||||
// Ensure to create locks for all resources involved
|
||||
// since atomic write involves mutiple disk operations
|
||||
// and resources.
|
||||
|
||||
const locks = new DisposableStore();
|
||||
|
||||
try {
|
||||
locks.add(await this.createResourceLock(resource));
|
||||
locks.add(await this.createResourceLock(tempResource));
|
||||
|
||||
// Rename over existing to ensure atomic replace
|
||||
await this.rename(tempResource, resource, { overwrite: true });
|
||||
// Write to temp resource first
|
||||
await this.doWriteFile(tempResource, content, opts, true /* disable write lock */);
|
||||
|
||||
} catch (error) {
|
||||
|
||||
// Cleanup in case of rename error
|
||||
try {
|
||||
await this.delete(tempResource, { recursive: false, useTrash: false, atomic: false });
|
||||
} catch (error) {
|
||||
// ignore - we want the outer error to bubble up
|
||||
}
|
||||
|
||||
throw error;
|
||||
// Rename over existing to ensure atomic replace
|
||||
await this.rename(tempResource, resource, { overwrite: true });
|
||||
|
||||
} catch (error) {
|
||||
|
||||
// Cleanup in case of rename error
|
||||
try {
|
||||
await this.delete(tempResource, { recursive: false, useTrash: false, atomic: false });
|
||||
} catch (error) {
|
||||
// ignore - we want the outer error to bubble up
|
||||
}
|
||||
|
||||
throw error;
|
||||
}
|
||||
} finally {
|
||||
locks.dispose();
|
||||
}
|
||||
}
|
||||
|
||||
private async doWriteFile(resource: URI, content: Uint8Array, opts: IFileWriteOptions): Promise<void> {
|
||||
private async doWriteFile(resource: URI, content: Uint8Array, opts: IFileWriteOptions, disableWriteLock?: boolean): Promise<void> {
|
||||
let handle: number | undefined = undefined;
|
||||
try {
|
||||
const filePath = this.toFilePath(resource);
|
||||
@@ -293,7 +306,7 @@ export class DiskFileSystemProvider extends AbstractDiskFileSystemProvider imple
|
||||
}
|
||||
|
||||
// Open
|
||||
handle = await this.open(resource, { create: true, unlock: opts.unlock });
|
||||
handle = await this.open(resource, { create: true, unlock: opts.unlock }, disableWriteLock);
|
||||
|
||||
// Write content at once
|
||||
await this.write(handle, 0, content, 0, content.byteLength);
|
||||
@@ -317,14 +330,14 @@ export class DiskFileSystemProvider extends AbstractDiskFileSystemProvider imple
|
||||
DiskFileSystemProvider.canFlush = enabled;
|
||||
}
|
||||
|
||||
async open(resource: URI, opts: IFileOpenOptions): Promise<number> {
|
||||
async open(resource: URI, opts: IFileOpenOptions, disableWriteLock?: boolean): Promise<number> {
|
||||
const filePath = this.toFilePath(resource);
|
||||
|
||||
// Writes: guard multiple writes to the same resource
|
||||
// behind a single lock to prevent races when writing
|
||||
// from multiple places at the same time to the same file
|
||||
let lock: IDisposable | undefined = undefined;
|
||||
if (isFileOpenForWriteOptions(opts)) {
|
||||
if (isFileOpenForWriteOptions(opts) && !disableWriteLock) {
|
||||
lock = await this.createResourceLock(resource);
|
||||
}
|
||||
|
||||
@@ -756,13 +769,8 @@ export class DiskFileSystemProvider extends AbstractDiskFileSystemProvider imple
|
||||
const locks = new DisposableStore();
|
||||
|
||||
try {
|
||||
const [fromLock, toLock] = await Promise.all([
|
||||
this.createResourceLock(from),
|
||||
this.createResourceLock(to)
|
||||
]);
|
||||
|
||||
locks.add(fromLock);
|
||||
locks.add(toLock);
|
||||
locks.add(await this.createResourceLock(from));
|
||||
locks.add(await this.createResourceLock(to));
|
||||
|
||||
if (mkdir) {
|
||||
await Promises.mkdir(dirname(toFilePath), { recursive: true });
|
||||
|
||||
@@ -146,16 +146,13 @@ flakySuite('Disk File Service', function () {
|
||||
setup(async () => {
|
||||
const logService = new NullLogService();
|
||||
|
||||
service = new FileService(logService);
|
||||
disposables.add(service);
|
||||
service = disposables.add(new FileService(logService));
|
||||
|
||||
fileProvider = new TestDiskFileSystemProvider(logService);
|
||||
fileProvider = disposables.add(new TestDiskFileSystemProvider(logService));
|
||||
disposables.add(service.registerProvider(Schemas.file, fileProvider));
|
||||
disposables.add(fileProvider);
|
||||
|
||||
testProvider = new TestDiskFileSystemProvider(logService);
|
||||
testProvider = disposables.add(new TestDiskFileSystemProvider(logService));
|
||||
disposables.add(service.registerProvider(testSchema, testProvider));
|
||||
disposables.add(testProvider);
|
||||
|
||||
testDir = getRandomTestPath(tmpdir(), 'vsctests', 'diskfileservice');
|
||||
|
||||
@@ -1818,7 +1815,14 @@ flakySuite('Disk File Service', function () {
|
||||
test('writeFile - buffered (atomic)', async () => {
|
||||
setCapabilities(fileProvider, FileSystemProviderCapabilities.FileOpenReadWriteClose | FileSystemProviderCapabilities.FileAtomicWrite);
|
||||
|
||||
return testWriteFile(true);
|
||||
let e;
|
||||
try {
|
||||
await testWriteFile(true);
|
||||
} catch (error) {
|
||||
e = error;
|
||||
}
|
||||
|
||||
assert.ok(e);
|
||||
});
|
||||
|
||||
test('writeFile - unbuffered (atomic)', async () => {
|
||||
@@ -1869,7 +1873,14 @@ flakySuite('Disk File Service', function () {
|
||||
test('writeFile (large file) - buffered (atomic)', async () => {
|
||||
setCapabilities(fileProvider, FileSystemProviderCapabilities.FileOpenReadWriteClose | FileSystemProviderCapabilities.FileAtomicWrite);
|
||||
|
||||
return testWriteFileLarge(true);
|
||||
let e;
|
||||
try {
|
||||
await testWriteFileLarge(true);
|
||||
} catch (error) {
|
||||
e = error;
|
||||
}
|
||||
|
||||
assert.ok(e);
|
||||
});
|
||||
|
||||
test('writeFile (large file) - unbuffered (atomic)', async () => {
|
||||
@@ -1890,6 +1901,29 @@ flakySuite('Disk File Service', function () {
|
||||
assert.strictEqual(readFileSync(resource.fsPath).toString(), newContent);
|
||||
}
|
||||
|
||||
test('writeFile (large file) - unbuffered (atomic) - concurrent writes with multiple services', async () => {
|
||||
setCapabilities(fileProvider, FileSystemProviderCapabilities.FileReadWrite | FileSystemProviderCapabilities.FileAtomicWrite);
|
||||
|
||||
const resource = URI.file(join(testDir, 'lorem.txt'));
|
||||
|
||||
const content = readFileSync(resource.fsPath);
|
||||
const newContent = content.toString() + content.toString();
|
||||
|
||||
const promises: Promise<IFileStatWithMetadata>[] = [];
|
||||
let suffix = 0;
|
||||
for (let i = 0; i < 10; i++) {
|
||||
const service = disposables.add(new FileService(new NullLogService()));
|
||||
disposables.add(service.registerProvider(Schemas.file, fileProvider));
|
||||
|
||||
promises.push(service.writeFile(resource, VSBuffer.fromString(`${newContent}${++suffix}`), { atomic: { postfix: '.vsctmp' } }));
|
||||
await timeout(0);
|
||||
}
|
||||
|
||||
await Promise.allSettled(promises);
|
||||
|
||||
assert.strictEqual(readFileSync(resource.fsPath).toString(), `${newContent}${suffix}`);
|
||||
});
|
||||
|
||||
test('writeFile - buffered - readonly throws', async () => {
|
||||
setCapabilities(fileProvider, FileSystemProviderCapabilities.FileOpenReadWriteClose | FileSystemProviderCapabilities.Readonly);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user