cli: add extra fs operations for container

Ref https://github.com/microsoft/vscode-remote-tunnels/issues/695
This commit is contained in:
Connor Peet
2023-10-18 19:08:19 -07:00
parent 96d5db84d1
commit 4aa04c7db2
3 changed files with 206 additions and 20 deletions

View File

@@ -21,6 +21,7 @@ use crate::util::http::{
};
use crate::util::io::SilentCopyProgress;
use crate::util::is_integrated_cli;
use crate::util::machine::kill_pid;
use crate::util::os::os_release;
use crate::util::sync::{new_barrier, Barrier, BarrierOpener};
@@ -29,6 +30,7 @@ use futures::FutureExt;
use opentelemetry::trace::SpanKind;
use opentelemetry::KeyValue;
use std::collections::HashMap;
use std::path::PathBuf;
use std::process::Stdio;
use tokio::pin;
use tokio::process::{ChildStderr, ChildStdin};
@@ -51,9 +53,10 @@ use super::port_forwarder::{PortForwarding, PortForwardingProcessor};
use super::protocol::{
AcquireCliParams, CallServerHttpParams, CallServerHttpResult, ChallengeIssueParams,
ChallengeIssueResponse, ChallengeVerifyParams, ClientRequestMethod, EmptyObject, ForwardParams,
ForwardResult, FsStatRequest, FsStatResponse, GetEnvResponse, GetHostnameResponse,
HttpBodyParams, HttpHeadersParams, ServeParams, ServerLog, ServerMessageParams, SpawnParams,
SpawnResult, ToClientRequest, UnforwardParams, UpdateParams, UpdateResult, VersionResponse,
ForwardResult, FsReadDirEntry, FsReadDirResponse, FsRenameRequest, FsSinglePathRequest,
FsStatResponse, GetEnvResponse, GetHostnameResponse, HttpBodyParams, HttpHeadersParams,
ServeParams, ServerLog, ServerMessageParams, SpawnParams, SpawnResult, SysKillRequest,
SysKillResponse, ToClientRequest, UnforwardParams, UpdateParams, UpdateResult, VersionResponse,
METHOD_CHALLENGE_VERIFY,
};
use super::server_bridge::ServerBridge;
@@ -306,10 +309,54 @@ fn make_socket_rpc(
rpc.register_sync("ping", |_: EmptyObject, _| Ok(EmptyObject {}));
rpc.register_sync("gethostname", |_: EmptyObject, _| handle_get_hostname());
rpc.register_sync("fs_stat", |p: FsStatRequest, c| {
rpc.register_sync("sys_kill", |p: SysKillRequest, c| {
ensure_auth(&c.auth_state)?;
handle_sys_kill(p.pid)
});
rpc.register_sync("fs_stat", |p: FsSinglePathRequest, c| {
ensure_auth(&c.auth_state)?;
handle_stat(p.path)
});
rpc.register_duplex(
"fs_read",
1,
move |mut streams, p: FsSinglePathRequest, c| async move {
ensure_auth(&c.auth_state)?;
handle_fs_read(streams.remove(0), p.path).await
},
);
rpc.register_duplex(
"fs_write",
1,
move |mut streams, p: FsSinglePathRequest, c| async move {
ensure_auth(&c.auth_state)?;
handle_fs_write(streams.remove(0), p.path).await
},
);
rpc.register_duplex(
"fs_connect",
1,
move |mut streams, p: FsSinglePathRequest, c| async move {
ensure_auth(&c.auth_state)?;
handle_fs_connect(streams.remove(0), p.path).await
},
);
rpc.register_async("fs_rm", move |p: FsSinglePathRequest, c| async move {
ensure_auth(&c.auth_state)?;
handle_fs_remove(p.path).await
});
rpc.register_sync("fs_mkdirp", |p: FsSinglePathRequest, c| {
ensure_auth(&c.auth_state)?;
handle_fs_mkdirp(p.path)
});
rpc.register_sync("fs_rename", |p: FsRenameRequest, c| {
ensure_auth(&c.auth_state)?;
handle_fs_rename(p.from_path, p.to_path)
});
rpc.register_sync("fs_readdir", |p: FsSinglePathRequest, c| {
ensure_auth(&c.auth_state)?;
handle_fs_readdir(p.path)
});
rpc.register_sync("get_env", |_: EmptyObject, c| {
ensure_auth(&c.auth_state)?;
handle_get_env()
@@ -820,16 +867,87 @@ fn handle_stat(path: String) -> Result<FsStatResponse, AnyError> {
.map(|m| FsStatResponse {
exists: true,
size: Some(m.len()),
kind: Some(match m.file_type() {
t if t.is_dir() => "dir",
t if t.is_file() => "file",
t if t.is_symlink() => "link",
_ => "unknown",
}),
kind: Some(m.file_type().into()),
})
.unwrap_or_default())
}
async fn handle_fs_read(mut out: DuplexStream, path: String) -> Result<EmptyObject, AnyError> {
let mut f = tokio::fs::File::open(path)
.await
.map_err(|e| wrap(e, "file not found"))?;
tokio::io::copy(&mut f, &mut out)
.await
.map_err(|e| wrap(e, "error reading file"))?;
Ok(EmptyObject {})
}
async fn handle_fs_write(mut input: DuplexStream, path: String) -> Result<EmptyObject, AnyError> {
let mut f = tokio::fs::File::create(path)
.await
.map_err(|e| wrap(e, "file not found"))?;
tokio::io::copy(&mut input, &mut f)
.await
.map_err(|e| wrap(e, "error writing file"))?;
Ok(EmptyObject {})
}
async fn handle_fs_connect(
mut stream: DuplexStream,
path: String,
) -> Result<EmptyObject, AnyError> {
let mut s = get_socket_rw_stream(&PathBuf::from(path))
.await
.map_err(|e| wrap(e, "could not connect to socket"))?;
tokio::io::copy_bidirectional(&mut stream, &mut s)
.await
.map_err(|e| wrap(e, "error copying stream data"))?;
Ok(EmptyObject {})
}
async fn handle_fs_remove(path: String) -> Result<EmptyObject, AnyError> {
tokio::fs::remove_dir_all(path)
.await
.map_err(|e| wrap(e, "error removing directory"))?;
Ok(EmptyObject {})
}
fn handle_fs_rename(from_path: String, to_path: String) -> Result<EmptyObject, AnyError> {
std::fs::rename(from_path, to_path).map_err(|e| wrap(e, "error renaming"))?;
Ok(EmptyObject {})
}
fn handle_fs_mkdirp(path: String) -> Result<EmptyObject, AnyError> {
std::fs::create_dir_all(path).map_err(|e| wrap(e, "error creating directory"))?;
Ok(EmptyObject {})
}
fn handle_fs_readdir(path: String) -> Result<FsReadDirResponse, AnyError> {
let mut entries = std::fs::read_dir(path).map_err(|e| wrap(e, "error listing directory"))?;
let mut contents = Vec::new();
while let Some(Ok(child)) = entries.next() {
contents.push(FsReadDirEntry {
name: child.file_name().to_string_lossy().into_owned(),
kind: child.file_type().ok().map(|v| v.into()),
});
}
Ok(FsReadDirResponse { contents })
}
fn handle_sys_kill(pid: u32) -> Result<SysKillResponse, AnyError> {
Ok(SysKillResponse {
success: kill_pid(pid),
})
}
fn handle_get_env() -> Result<GetEnvResponse, AnyError> {
Ok(GetEnvResponse {
env: std::env::vars().collect(),
@@ -1037,14 +1155,7 @@ where
let futs = FuturesUnordered::new();
if let (Some(mut a), Some(mut b)) = (p.stdout.take(), stdout) {
futs.push(
async move {
let r = tokio::io::copy(&mut a, &mut b).await;
// println!("copy exited with {:?}", r);
r
}
.boxed(),
);
futs.push(async move { tokio::io::copy(&mut a, &mut b).await }.boxed());
}
if let (Some(mut a), Some(mut b)) = (p.stderr.take(), stderr) {
futs.push(async move { tokio::io::copy(&mut a, &mut b).await }.boxed());