/*--------------------------------------------------------------------------------------------- * Copyright (c) Microsoft Corporation. All rights reserved. * Licensed under the MIT License. See License.txt in the project root for license information. *--------------------------------------------------------------------------------------------*/ import { CancellationToken } from 'vs/base/common/cancellation'; import { Emitter, Event } from 'vs/base/common/event'; import { Disposable, DisposableStore } from 'vs/base/common/lifecycle'; import { isArray } from 'vs/base/common/types'; import { URI } from 'vs/base/common/uri'; import { IChannel, IServerChannel } from 'vs/base/parts/ipc/common/ipc'; import { ILogService } from 'vs/platform/log/common/log'; import { IManualSyncTask, IResourcePreview, ISyncResourceHandle, ISyncResourcePreview, ISyncTask, IUserDataManifest, IUserDataSyncService, SyncResource, SyncStatus, UserDataSyncError } from 'vs/platform/userDataSync/common/userDataSync'; type ManualSyncTaskEvent = { manualSyncTaskId: string; data: T }; export class UserDataSyncChannel implements IServerChannel { private readonly manualSyncTasks = new Map(); private readonly onManualSynchronizeResources = new Emitter>(); constructor(private readonly service: IUserDataSyncService, private readonly logService: ILogService) { } listen(_: unknown, event: string): Event { switch (event) { // sync case 'onDidChangeStatus': return this.service.onDidChangeStatus; case 'onDidChangeConflicts': return this.service.onDidChangeConflicts; case 'onDidChangeLocal': return this.service.onDidChangeLocal; case 'onDidChangeLastSyncTime': return this.service.onDidChangeLastSyncTime; case 'onSyncErrors': return this.service.onSyncErrors; case 'onDidResetLocal': return this.service.onDidResetLocal; case 'onDidResetRemote': return this.service.onDidResetRemote; // manual sync case 'manualSync/onSynchronizeResources': return this.onManualSynchronizeResources.event; } throw new Error(`Event not found: ${event}`); } async call(context: any, command: string, args?: any): Promise { try { const result = await this._call(context, command, args); return result; } catch (e) { this.logService.error(e); throw e; } } private async _call(context: any, command: string, args?: any): Promise { switch (command) { // sync case '_getInitialData': return Promise.resolve([this.service.status, this.service.conflicts, this.service.lastSyncTime]); case 'replace': return this.service.replace(URI.revive(args[0])); case 'reset': return this.service.reset(); case 'resetRemote': return this.service.resetRemote(); case 'resetLocal': return this.service.resetLocal(); case 'hasPreviouslySynced': return this.service.hasPreviouslySynced(); case 'hasLocalData': return this.service.hasLocalData(); case 'accept': return this.service.accept(args[0], URI.revive(args[1]), args[2], args[3]); case 'resolveContent': return this.service.resolveContent(URI.revive(args[0])); case 'getLocalSyncResourceHandles': return this.service.getLocalSyncResourceHandles(args[0]); case 'getRemoteSyncResourceHandles': return this.service.getRemoteSyncResourceHandles(args[0]); case 'getAssociatedResources': return this.service.getAssociatedResources(args[0], { created: args[1].created, uri: URI.revive(args[1].uri) }); case 'getMachineId': return this.service.getMachineId(args[0], { created: args[1].created, uri: URI.revive(args[1].uri) }); case 'createManualSyncTask': return this.createManualSyncTask(); } // manual sync if (command.startsWith('manualSync/')) { const manualSyncTaskCommand = command.substring('manualSync/'.length); const manualSyncTaskId = args[0]; const manualSyncTask = this.getManualSyncTask(manualSyncTaskId); args = (>args).slice(1); switch (manualSyncTaskCommand) { case 'preview': return manualSyncTask.preview(); case 'accept': return manualSyncTask.accept(URI.revive(args[0]), args[1]); case 'merge': return manualSyncTask.merge(URI.revive(args[0])); case 'discard': return manualSyncTask.discard(URI.revive(args[0])); case 'discardConflicts': return manualSyncTask.discardConflicts(); case 'apply': return manualSyncTask.apply(); case 'pull': return manualSyncTask.pull(); case 'push': return manualSyncTask.push(); case 'stop': return manualSyncTask.stop(); case '_getStatus': return manualSyncTask.status; case 'dispose': return this.disposeManualSyncTask(manualSyncTask); } } throw new Error('Invalid call'); } private getManualSyncTask(manualSyncTaskId: string): IManualSyncTask { const value = this.manualSyncTasks.get(this.createKey(manualSyncTaskId)); if (!value) { throw new Error(`Manual sync taks not found: ${manualSyncTaskId}`); } return value.manualSyncTask; } private async createManualSyncTask(): Promise<{ id: string; manifest: IUserDataManifest | null; status: SyncStatus }> { const disposables = new DisposableStore(); const manualSyncTask = disposables.add(await this.service.createManualSyncTask()); disposables.add(manualSyncTask.onSynchronizeResources(synchronizeResources => this.onManualSynchronizeResources.fire({ manualSyncTaskId: manualSyncTask.id, data: synchronizeResources }))); this.manualSyncTasks.set(this.createKey(manualSyncTask.id), { manualSyncTask, disposables }); return { id: manualSyncTask.id, manifest: manualSyncTask.manifest, status: manualSyncTask.status }; } private disposeManualSyncTask(manualSyncTask: IManualSyncTask): void { manualSyncTask.dispose(); const key = this.createKey(manualSyncTask.id); this.manualSyncTasks.get(key)?.disposables.dispose(); this.manualSyncTasks.delete(key); } private createKey(manualSyncTaskId: string): string { return `manualSyncTask-${manualSyncTaskId}`; } } export class UserDataSyncChannelClient extends Disposable implements IUserDataSyncService { declare readonly _serviceBrand: undefined; private readonly channel: IChannel; private _status: SyncStatus = SyncStatus.Uninitialized; get status(): SyncStatus { return this._status; } private _onDidChangeStatus: Emitter = this._register(new Emitter()); readonly onDidChangeStatus: Event = this._onDidChangeStatus.event; get onDidChangeLocal(): Event { return this.channel.listen('onDidChangeLocal'); } private _conflicts: [SyncResource, IResourcePreview[]][] = []; get conflicts(): [SyncResource, IResourcePreview[]][] { return this._conflicts; } private _onDidChangeConflicts: Emitter<[SyncResource, IResourcePreview[]][]> = this._register(new Emitter<[SyncResource, IResourcePreview[]][]>()); readonly onDidChangeConflicts: Event<[SyncResource, IResourcePreview[]][]> = this._onDidChangeConflicts.event; private _lastSyncTime: number | undefined = undefined; get lastSyncTime(): number | undefined { return this._lastSyncTime; } private _onDidChangeLastSyncTime: Emitter = this._register(new Emitter()); readonly onDidChangeLastSyncTime: Event = this._onDidChangeLastSyncTime.event; private _onSyncErrors: Emitter<[SyncResource, UserDataSyncError][]> = this._register(new Emitter<[SyncResource, UserDataSyncError][]>()); readonly onSyncErrors: Event<[SyncResource, UserDataSyncError][]> = this._onSyncErrors.event; get onDidResetLocal(): Event { return this.channel.listen('onDidResetLocal'); } get onDidResetRemote(): Event { return this.channel.listen('onDidResetRemote'); } constructor(userDataSyncChannel: IChannel) { super(); this.channel = { call(command: string, arg?: any, cancellationToken?: CancellationToken): Promise { return userDataSyncChannel.call(command, arg, cancellationToken) .then(null, error => { throw UserDataSyncError.toUserDataSyncError(error); }); }, listen(event: string, arg?: any): Event { return userDataSyncChannel.listen(event, arg); } }; this.channel.call<[SyncStatus, [SyncResource, IResourcePreview[]][], number | undefined]>('_getInitialData').then(([status, conflicts, lastSyncTime]) => { this.updateStatus(status); this.updateConflicts(conflicts); if (lastSyncTime) { this.updateLastSyncTime(lastSyncTime); } this._register(this.channel.listen('onDidChangeStatus')(status => this.updateStatus(status))); this._register(this.channel.listen('onDidChangeLastSyncTime')(lastSyncTime => this.updateLastSyncTime(lastSyncTime))); }); this._register(this.channel.listen<[SyncResource, IResourcePreview[]][]>('onDidChangeConflicts')(conflicts => this.updateConflicts(conflicts))); this._register(this.channel.listen<[SyncResource, Error][]>('onSyncErrors')(errors => this._onSyncErrors.fire(errors.map(([source, error]) => ([source, UserDataSyncError.toUserDataSyncError(error)]))))); } createSyncTask(): Promise { throw new Error('not supported'); } async createManualSyncTask(): Promise { const { id, manifest, status } = await this.channel.call<{ id: string; manifest: IUserDataManifest | null; status: SyncStatus }>('createManualSyncTask'); const that = this; const manualSyncTaskChannelClient = new ManualSyncTaskChannelClient(id, manifest, status, { async call(command: string, arg?: any, cancellationToken?: CancellationToken): Promise { return that.channel.call(`manualSync/${command}`, [id, ...(isArray(arg) ? arg : [arg])], cancellationToken); }, listen(event: string, arg?: any): Event { return Event.map( Event.filter(that.channel.listen<{ manualSyncTaskId: string; data: T }>(`manualSync/${event}`, arg), e => !manualSyncTaskChannelClient.isDiposed() && e.manualSyncTaskId === id), e => e.data); } }); return manualSyncTaskChannelClient; } replace(uri: URI): Promise { return this.channel.call('replace', [uri]); } reset(): Promise { return this.channel.call('reset'); } resetRemote(): Promise { return this.channel.call('resetRemote'); } resetLocal(): Promise { return this.channel.call('resetLocal'); } hasPreviouslySynced(): Promise { return this.channel.call('hasPreviouslySynced'); } hasLocalData(): Promise { return this.channel.call('hasLocalData'); } accept(syncResource: SyncResource, resource: URI, content: string | null, apply: boolean): Promise { return this.channel.call('accept', [syncResource, resource, content, apply]); } resolveContent(resource: URI): Promise { return this.channel.call('resolveContent', [resource]); } async getLocalSyncResourceHandles(resource: SyncResource): Promise { const handles = await this.channel.call('getLocalSyncResourceHandles', [resource]); return handles.map(({ created, uri }) => ({ created, uri: URI.revive(uri) })); } async getRemoteSyncResourceHandles(resource: SyncResource): Promise { const handles = await this.channel.call('getRemoteSyncResourceHandles', [resource]); return handles.map(({ created, uri }) => ({ created, uri: URI.revive(uri) })); } async getAssociatedResources(resource: SyncResource, syncResourceHandle: ISyncResourceHandle): Promise<{ resource: URI; comparableResource: URI }[]> { const result = await this.channel.call<{ resource: URI; comparableResource: URI }[]>('getAssociatedResources', [resource, syncResourceHandle]); return result.map(({ resource, comparableResource }) => ({ resource: URI.revive(resource), comparableResource: URI.revive(comparableResource) })); } async getMachineId(resource: SyncResource, syncResourceHandle: ISyncResourceHandle): Promise { return this.channel.call('getMachineId', [resource, syncResourceHandle]); } private async updateStatus(status: SyncStatus): Promise { this._status = status; this._onDidChangeStatus.fire(status); } private async updateConflicts(conflicts: [SyncResource, IResourcePreview[]][]): Promise { // Revive URIs this._conflicts = conflicts.map(([syncResource, conflicts]) => ([ syncResource, conflicts.map(r => ({ ...r, localResource: URI.revive(r.localResource), remoteResource: URI.revive(r.remoteResource), previewResource: URI.revive(r.previewResource), })) ])); this._onDidChangeConflicts.fire(this._conflicts); } private updateLastSyncTime(lastSyncTime: number): void { if (this._lastSyncTime !== lastSyncTime) { this._lastSyncTime = lastSyncTime; this._onDidChangeLastSyncTime.fire(lastSyncTime); } } } class ManualSyncTaskChannelClient extends Disposable implements IManualSyncTask { private readonly channel: IChannel; get onSynchronizeResources(): Event<[SyncResource, URI[]][]> { return this.channel.listen<[SyncResource, URI[]][]>('onSynchronizeResources'); } private _status: SyncStatus; get status(): SyncStatus { return this._status; } constructor( readonly id: string, readonly manifest: IUserDataManifest | null, status: SyncStatus, manualSyncTaskChannel: IChannel ) { super(); this._status = status; const that = this; this.channel = { async call(command: string, arg?: any, cancellationToken?: CancellationToken): Promise { try { const result = await manualSyncTaskChannel.call(command, arg, cancellationToken); if (!that.isDiposed()) { that._status = await manualSyncTaskChannel.call('_getStatus'); } return result; } catch (error) { throw UserDataSyncError.toUserDataSyncError(error); } }, listen(event: string, arg?: any): Event { return manualSyncTaskChannel.listen(event, arg); } }; } async preview(): Promise<[SyncResource, ISyncResourcePreview][]> { const previews = await this.channel.call<[SyncResource, ISyncResourcePreview][]>('preview'); return this.deserializePreviews(previews); } async accept(resource: URI, content?: string | null): Promise<[SyncResource, ISyncResourcePreview][]> { const previews = await this.channel.call<[SyncResource, ISyncResourcePreview][]>('accept', [resource, content]); return this.deserializePreviews(previews); } async merge(resource?: URI): Promise<[SyncResource, ISyncResourcePreview][]> { const previews = await this.channel.call<[SyncResource, ISyncResourcePreview][]>('merge', [resource]); return this.deserializePreviews(previews); } async discard(resource: URI): Promise<[SyncResource, ISyncResourcePreview][]> { const previews = await this.channel.call<[SyncResource, ISyncResourcePreview][]>('discard', [resource]); return this.deserializePreviews(previews); } async discardConflicts(): Promise<[SyncResource, ISyncResourcePreview][]> { const previews = await this.channel.call<[SyncResource, ISyncResourcePreview][]>('discardConflicts'); return this.deserializePreviews(previews); } async apply(): Promise<[SyncResource, ISyncResourcePreview][]> { const previews = await this.channel.call<[SyncResource, ISyncResourcePreview][]>('apply'); return this.deserializePreviews(previews); } pull(): Promise { return this.channel.call('pull'); } push(): Promise { return this.channel.call('push'); } stop(): Promise { return this.channel.call('stop'); } private _disposed = false; isDiposed() { return this._disposed; } override dispose(): void { this._disposed = true; this.channel.call('dispose'); } private deserializePreviews(previews: [SyncResource, ISyncResourcePreview][]): [SyncResource, ISyncResourcePreview][] { return previews.map(([syncResource, preview]) => ([ syncResource, { isLastSyncFromCurrentMachine: preview.isLastSyncFromCurrentMachine, resourcePreviews: preview.resourcePreviews.map(r => ({ ...r, localResource: URI.revive(r.localResource), remoteResource: URI.revive(r.remoteResource), previewResource: URI.revive(r.previewResource), acceptedResource: URI.revive(r.acceptedResource), })) } ])); } }