diff --git a/cli/src/tunnels/control_server.rs b/cli/src/tunnels/control_server.rs index bb3715832b1..48c11fc1b35 100644 --- a/cli/src/tunnels/control_server.rs +++ b/cli/src/tunnels/control_server.rs @@ -21,6 +21,7 @@ use crate::util::http::{ }; use crate::util::io::SilentCopyProgress; use crate::util::is_integrated_cli; +use crate::util::machine::kill_pid; use crate::util::os::os_release; use crate::util::sync::{new_barrier, Barrier, BarrierOpener}; @@ -29,6 +30,7 @@ use futures::FutureExt; use opentelemetry::trace::SpanKind; use opentelemetry::KeyValue; use std::collections::HashMap; +use std::path::PathBuf; use std::process::Stdio; use tokio::pin; use tokio::process::{ChildStderr, ChildStdin}; @@ -51,9 +53,10 @@ use super::port_forwarder::{PortForwarding, PortForwardingProcessor}; use super::protocol::{ AcquireCliParams, CallServerHttpParams, CallServerHttpResult, ChallengeIssueParams, ChallengeIssueResponse, ChallengeVerifyParams, ClientRequestMethod, EmptyObject, ForwardParams, - ForwardResult, FsStatRequest, FsStatResponse, GetEnvResponse, GetHostnameResponse, - HttpBodyParams, HttpHeadersParams, ServeParams, ServerLog, ServerMessageParams, SpawnParams, - SpawnResult, ToClientRequest, UnforwardParams, UpdateParams, UpdateResult, VersionResponse, + ForwardResult, FsReadDirEntry, FsReadDirResponse, FsRenameRequest, FsSinglePathRequest, + FsStatResponse, GetEnvResponse, GetHostnameResponse, HttpBodyParams, HttpHeadersParams, + ServeParams, ServerLog, ServerMessageParams, SpawnParams, SpawnResult, SysKillRequest, + SysKillResponse, ToClientRequest, UnforwardParams, UpdateParams, UpdateResult, VersionResponse, METHOD_CHALLENGE_VERIFY, }; use super::server_bridge::ServerBridge; @@ -306,10 +309,54 @@ fn make_socket_rpc( rpc.register_sync("ping", |_: EmptyObject, _| Ok(EmptyObject {})); rpc.register_sync("gethostname", |_: EmptyObject, _| handle_get_hostname()); - rpc.register_sync("fs_stat", |p: FsStatRequest, c| { + rpc.register_sync("sys_kill", |p: SysKillRequest, c| { + ensure_auth(&c.auth_state)?; + handle_sys_kill(p.pid) + }); + rpc.register_sync("fs_stat", |p: FsSinglePathRequest, c| { ensure_auth(&c.auth_state)?; handle_stat(p.path) }); + rpc.register_duplex( + "fs_read", + 1, + move |mut streams, p: FsSinglePathRequest, c| async move { + ensure_auth(&c.auth_state)?; + handle_fs_read(streams.remove(0), p.path).await + }, + ); + rpc.register_duplex( + "fs_write", + 1, + move |mut streams, p: FsSinglePathRequest, c| async move { + ensure_auth(&c.auth_state)?; + handle_fs_write(streams.remove(0), p.path).await + }, + ); + rpc.register_duplex( + "fs_connect", + 1, + move |mut streams, p: FsSinglePathRequest, c| async move { + ensure_auth(&c.auth_state)?; + handle_fs_connect(streams.remove(0), p.path).await + }, + ); + rpc.register_async("fs_rm", move |p: FsSinglePathRequest, c| async move { + ensure_auth(&c.auth_state)?; + handle_fs_remove(p.path).await + }); + rpc.register_sync("fs_mkdirp", |p: FsSinglePathRequest, c| { + ensure_auth(&c.auth_state)?; + handle_fs_mkdirp(p.path) + }); + rpc.register_sync("fs_rename", |p: FsRenameRequest, c| { + ensure_auth(&c.auth_state)?; + handle_fs_rename(p.from_path, p.to_path) + }); + rpc.register_sync("fs_readdir", |p: FsSinglePathRequest, c| { + ensure_auth(&c.auth_state)?; + handle_fs_readdir(p.path) + }); rpc.register_sync("get_env", |_: EmptyObject, c| { ensure_auth(&c.auth_state)?; handle_get_env() @@ -820,16 +867,87 @@ fn handle_stat(path: String) -> Result { .map(|m| FsStatResponse { exists: true, size: Some(m.len()), - kind: Some(match m.file_type() { - t if t.is_dir() => "dir", - t if t.is_file() => "file", - t if t.is_symlink() => "link", - _ => "unknown", - }), + kind: Some(m.file_type().into()), }) .unwrap_or_default()) } +async fn handle_fs_read(mut out: DuplexStream, path: String) -> Result { + let mut f = tokio::fs::File::open(path) + .await + .map_err(|e| wrap(e, "file not found"))?; + + tokio::io::copy(&mut f, &mut out) + .await + .map_err(|e| wrap(e, "error reading file"))?; + + Ok(EmptyObject {}) +} + +async fn handle_fs_write(mut input: DuplexStream, path: String) -> Result { + let mut f = tokio::fs::File::create(path) + .await + .map_err(|e| wrap(e, "file not found"))?; + + tokio::io::copy(&mut input, &mut f) + .await + .map_err(|e| wrap(e, "error writing file"))?; + + Ok(EmptyObject {}) +} + +async fn handle_fs_connect( + mut stream: DuplexStream, + path: String, +) -> Result { + let mut s = get_socket_rw_stream(&PathBuf::from(path)) + .await + .map_err(|e| wrap(e, "could not connect to socket"))?; + + tokio::io::copy_bidirectional(&mut stream, &mut s) + .await + .map_err(|e| wrap(e, "error copying stream data"))?; + + Ok(EmptyObject {}) +} + +async fn handle_fs_remove(path: String) -> Result { + tokio::fs::remove_dir_all(path) + .await + .map_err(|e| wrap(e, "error removing directory"))?; + Ok(EmptyObject {}) +} + +fn handle_fs_rename(from_path: String, to_path: String) -> Result { + std::fs::rename(from_path, to_path).map_err(|e| wrap(e, "error renaming"))?; + Ok(EmptyObject {}) +} + +fn handle_fs_mkdirp(path: String) -> Result { + std::fs::create_dir_all(path).map_err(|e| wrap(e, "error creating directory"))?; + Ok(EmptyObject {}) +} + +fn handle_fs_readdir(path: String) -> Result { + let mut entries = std::fs::read_dir(path).map_err(|e| wrap(e, "error listing directory"))?; + + let mut contents = Vec::new(); + while let Some(Ok(child)) = entries.next() { + contents.push(FsReadDirEntry { + name: child.file_name().to_string_lossy().into_owned(), + kind: child.file_type().ok().map(|v| v.into()), + }); + } + + Ok(FsReadDirResponse { contents }) +} + +fn handle_sys_kill(pid: u32) -> Result { + Ok(SysKillResponse { + success: kill_pid(pid), + }) +} + fn handle_get_env() -> Result { Ok(GetEnvResponse { env: std::env::vars().collect(), @@ -1037,14 +1155,7 @@ where let futs = FuturesUnordered::new(); if let (Some(mut a), Some(mut b)) = (p.stdout.take(), stdout) { - futs.push( - async move { - let r = tokio::io::copy(&mut a, &mut b).await; - // println!("copy exited with {:?}", r); - r - } - .boxed(), - ); + futs.push(async move { tokio::io::copy(&mut a, &mut b).await }.boxed()); } if let (Some(mut a), Some(mut b)) = (p.stderr.take(), stderr) { futs.push(async move { tokio::io::copy(&mut a, &mut b).await }.boxed()); diff --git a/cli/src/tunnels/protocol.rs b/cli/src/tunnels/protocol.rs index 5665714fed9..547ffba82a8 100644 --- a/cli/src/tunnels/protocol.rs +++ b/cli/src/tunnels/protocol.rs @@ -133,17 +133,80 @@ pub struct GetEnvResponse { pub os_release: String, } +/// Method: `kill`. Sends a generic, platform-specific kill command to the process. #[derive(Deserialize)] -pub struct FsStatRequest { +pub struct SysKillRequest { + pub pid: u32, +} + +#[derive(Serialize)] +pub struct SysKillResponse { + pub success: bool, +} + +/// Methods: `fs_read`/`fs_write`/`fs_rm`/`fs_mkdirp`/`fs_stat` +/// - fs_read: reads into a stream returned from the method, +/// - fs_write: writes from a stream passed to the method. +/// - fs_rm: recursively removes the file +/// - fs_mkdirp: recursively creates the directory +/// - fs_readdir: reads directory contents +/// - fs_stat: stats the given path +/// - fs_connect: connect to the given unix or named pipe socket, streaming +/// data in and out from the method's stream. +#[derive(Deserialize)] +pub struct FsSinglePathRequest { pub path: String, } +#[derive(Serialize)] +pub enum FsFileKind { + #[serde(rename = "dir")] + Directory, + #[serde(rename = "file")] + File, + #[serde(rename = "link")] + Link, +} + +impl From for FsFileKind { + fn from(kind: std::fs::FileType) -> Self { + if kind.is_dir() { + Self::Directory + } else if kind.is_file() { + Self::File + } else if kind.is_symlink() { + Self::Link + } else { + unreachable!() + } + } +} + #[derive(Serialize, Default)] pub struct FsStatResponse { pub exists: bool, pub size: Option, #[serde(rename = "type")] - pub kind: Option<&'static str>, + pub kind: Option, +} + +#[derive(Serialize)] +pub struct FsReadDirResponse { + pub contents: Vec, +} + +#[derive(Serialize)] +pub struct FsReadDirEntry { + pub name: String, + #[serde(rename = "type")] + pub kind: Option, +} + +/// Method: `fs_reaname`. Renames a file. +#[derive(Deserialize)] +pub struct FsRenameRequest { + pub from_path: String, + pub to_path: String, } #[derive(Deserialize, Debug)] diff --git a/cli/src/util/machine.rs b/cli/src/util/machine.rs index 1df4a7843ff..4c7b6729e43 100644 --- a/cli/src/util/machine.rs +++ b/cli/src/util/machine.rs @@ -29,6 +29,18 @@ pub fn process_exists(pid: u32) -> bool { sys.refresh_process(Pid::from_u32(pid)) } +pub fn kill_pid(pid: u32) -> bool { + let mut sys = System::new(); + let pid = Pid::from_u32(pid); + sys.refresh_process(pid); + + if let Some(p) = sys.process(pid) { + p.kill() + } else { + false + } +} + pub async fn wait_until_process_exits(pid: Pid, poll_ms: u64) { let mut s = System::new(); let duration = Duration::from_millis(poll_ms);