diff --git a/cli/Cargo.lock b/cli/Cargo.lock index fcbb3fee49f..f7c60045c12 100644 --- a/cli/Cargo.lock +++ b/cli/Cargo.lock @@ -502,9 +502,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.12" +version = "0.8.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edbafec5fa1f196ca66527c1b12c2ec4745ca14b50f1ad8f9f6f720b55d11fac" +checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" dependencies = [ "cfg-if", ] diff --git a/cli/src/commands/tunnels.rs b/cli/src/commands/tunnels.rs index c1b361fc81b..0cc0b8d264b 100644 --- a/cli/src/commands/tunnels.rs +++ b/cli/src/commands/tunnels.rs @@ -35,7 +35,7 @@ use crate::{ code_server::CodeServerArgs, create_service_manager, dev_tunnels::{self, DevTunnels}, - local_forwarding, legal, + legal, local_forwarding, paths::get_all_servers, protocol, serve_stream, shutdown_signal::ShutdownRequest, @@ -326,12 +326,12 @@ pub async fn kill(ctx: CommandContext) -> Result { #[derive(Serialize)] pub struct StatusOutput { - pub tunnel: Option, + pub tunnel: Option, pub service_installed: bool, } pub async fn status(ctx: CommandContext) -> Result { - let tunnel_status = do_single_rpc_call::<_, protocol::singleton::Status>( + let tunnel = do_single_rpc_call::<_, protocol::singleton::StatusWithTunnelName>( &ctx.paths.tunnel_lockfile(), ctx.log.clone(), protocol::singleton::METHOD_STATUS, @@ -347,8 +347,8 @@ pub async fn status(ctx: CommandContext) -> Result { ctx.log.result( serde_json::to_string(&StatusOutput { service_installed, - tunnel: match tunnel_status { - Ok(s) => Some(s.tunnel), + tunnel: match tunnel { + Ok(s) => Some(s), Err(CodeError::NoRunningTunnel) => None, Err(e) => return Err(e.into()), }, diff --git a/cli/src/tunnels/dev_tunnels.rs b/cli/src/tunnels/dev_tunnels.rs index 334ebc7d2bd..b77f6da5f2e 100644 --- a/cli/src/tunnels/dev_tunnels.rs +++ b/cli/src/tunnels/dev_tunnels.rs @@ -31,7 +31,7 @@ use tunnels::management::{ NO_REQUEST_OPTIONS, }; -use super::protocol::PortPrivacy; +use super::protocol::{self, PortPrivacy}; use super::wsl_detect::is_wsl_installed; static TUNNEL_COUNT_LIMIT_NAME: &str = "TunnelsPerUserPerLocation"; @@ -203,6 +203,11 @@ impl ActiveTunnel { self.get_port_format() .map(|f| f.replace(PORT_TOKEN, &port.to_string())) } + + /// Gets an object to read the current tunnel status. + pub fn status(&self) -> StatusLock { + self.manager.get_status() + } } const VSCODE_CLI_TUNNEL_TAG: &str = "vscode-server-launcher"; @@ -843,10 +848,36 @@ impl DevTunnels { } } +#[derive(Clone, Default)] +pub struct StatusLock(Arc>); + +impl StatusLock { + fn succeed(&self) { + let mut status = self.0.lock().unwrap(); + status.tunnel = protocol::singleton::TunnelState::Connected; + status.last_connected_at = Some(chrono::Utc::now()); + } + + fn fail(&self, reason: String) { + let mut status = self.0.lock().unwrap(); + if let protocol::singleton::TunnelState::Connected = status.tunnel { + status.last_disconnected_at = Some(chrono::Utc::now()); + status.tunnel = protocol::singleton::TunnelState::Disconnected; + } + status.last_fail_reason = Some(reason); + } + + pub fn read(&self) -> protocol::singleton::Status { + let status = self.0.lock().unwrap(); + status.clone() + } +} + struct ActiveTunnelManager { close_tx: Option>, endpoint_rx: watch::Receiver>>, relay: Arc>, + status: StatusLock, } impl ActiveTunnelManager { @@ -862,6 +893,9 @@ impl ActiveTunnelManager { let relay = Arc::new(tokio::sync::Mutex::new(RelayTunnelHost::new(locator, mgmt))); let relay_spawned = relay.clone(); + let status = StatusLock::default(); + + let status_spawned = status.clone(); tokio::spawn(async move { ActiveTunnelManager::spawn_tunnel( log, @@ -869,6 +903,7 @@ impl ActiveTunnelManager { close_rx, endpoint_tx, access_token, + status_spawned, ) .await; }); @@ -877,9 +912,15 @@ impl ActiveTunnelManager { endpoint_rx, relay, close_tx: Some(close_tx), + status, } } + /// Gets a copy of the current tunnel status information + pub fn get_status(&self) -> StatusLock { + self.status.clone() + } + /// Adds a port for TCP/IP forwarding. #[allow(dead_code)] // todo: port forwarding pub async fn add_port_tcp( @@ -967,12 +1008,15 @@ impl ActiveTunnelManager { mut close_rx: mpsc::Receiver<()>, endpoint_tx: watch::Sender>>, access_token_provider: impl AccessTokenProvider + 'static, + status: StatusLock, ) { let mut backoff = Backoff::new(Duration::from_secs(5), Duration::from_secs(120)); macro_rules! fail { ($e: expr, $msg: expr) => { - warning!(log, "{}: {}", $msg, $e); + let fmt = format!("{}: {}", $msg, $e); + warning!(log, &fmt); + status.fail(fmt); endpoint_tx.send(Some(Err($e))).ok(); backoff.delay().await; }; @@ -1008,6 +1052,7 @@ impl ActiveTunnelManager { }; backoff.reset(); + status.succeed(); endpoint_tx.send(Some(Ok(handle.endpoint().clone()))).ok(); tokio::select! { diff --git a/cli/src/tunnels/protocol.rs b/cli/src/tunnels/protocol.rs index 89c4e384229..316e3672ba6 100644 --- a/cli/src/tunnels/protocol.rs +++ b/cli/src/tunnels/protocol.rs @@ -249,6 +249,7 @@ pub mod forward_singleton { pub mod singleton { use crate::log; + use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; pub const METHOD_RESTART: &str = "restart"; @@ -271,17 +272,41 @@ pub mod singleton { pub message: String, } - #[derive(Serialize, Deserialize)] + #[derive(Serialize, Deserialize, Clone, Default)] + pub struct StatusWithTunnelName { + pub name: Option, + #[serde(flatten)] + pub status: Status, + } + + #[derive(Serialize, Deserialize, Clone)] pub struct Status { + pub started_at: DateTime, pub tunnel: TunnelState, + pub last_connected_at: Option>, + pub last_disconnected_at: Option>, + pub last_fail_reason: Option, + } + + impl Default for Status { + fn default() -> Self { + Self { + started_at: Utc::now(), + tunnel: TunnelState::Disconnected, + last_connected_at: None, + last_disconnected_at: None, + last_fail_reason: None, + } + } } #[derive(Deserialize, Serialize, Debug)] pub struct LogReplayFinished {} - #[derive(Deserialize, Serialize, Debug)] + #[derive(Deserialize, Serialize, Debug, Default, Clone)] pub enum TunnelState { + #[default] Disconnected, - Connected { name: String }, + Connected, } } diff --git a/cli/src/tunnels/singleton_client.rs b/cli/src/tunnels/singleton_client.rs index ef9fdf85cc0..ca29b7349f1 100644 --- a/cli/src/tunnels/singleton_client.rs +++ b/cli/src/tunnels/singleton_client.rs @@ -114,16 +114,18 @@ pub async fn start_singleton_client(args: SingletonClientArgs) -> bool { CONTROL_INSTRUCTIONS_COMMON }); - let res = c.caller.call::<_, _, protocol::singleton::Status>( - protocol::singleton::METHOD_STATUS, - protocol::EmptyObject {}, - ); + let res = c + .caller + .call::<_, _, protocol::singleton::StatusWithTunnelName>( + protocol::singleton::METHOD_STATUS, + protocol::EmptyObject {}, + ); // we want to ensure the "listening" string always gets printed for // consumers (i.e. VS Code). Ask for it. If the tunnel is not currently // connected though, it will be soon, and that'll be in the log replays. if let Ok(Ok(s)) = res.await { - if let protocol::singleton::TunnelState::Connected { name } = s.tunnel { + if let Some(name) = s.name { print_listening(&c.log, &name); } } diff --git a/cli/src/tunnels/singleton_server.rs b/cli/src/tunnels/singleton_server.rs index 77cc701ca46..6189c06a493 100644 --- a/cli/src/tunnels/singleton_server.rs +++ b/cli/src/tunnels/singleton_server.rs @@ -11,7 +11,7 @@ use std::{ use super::{ code_server::CodeServerArgs, control_server::ServerTermination, - dev_tunnels::ActiveTunnel, + dev_tunnels::{ActiveTunnel, StatusLock}, protocol, shutdown_signal::{ShutdownRequest, ShutdownSignal}, }; @@ -48,18 +48,28 @@ pub struct SingletonServerArgs<'a> { pub log_broadcast: &'a BroadcastLogSink, } +struct StatusInfo { + name: String, + lock: StatusLock, +} + #[derive(Clone)] struct SingletonServerContext { log: log::Logger, shutdown_tx: broadcast::Sender, broadcast_tx: broadcast::Sender>, - current_name: Arc>>, + // ugly: a lock in a lock. current_status needs to be provided only + // after we set up the tunnel, however the tunnel is created after the + // singleton server starts to avoid a gap in singleton availability. + // However, this should be safe, as the lock is only used for immediate + // data reads (in the `status` method). + current_status: Arc>>, } pub struct RpcServer { fut: JoinHandle>, shutdown_broadcast: broadcast::Sender, - current_name: Arc>>, + current_status: Arc>>, } pub fn make_singleton_server( @@ -71,12 +81,12 @@ pub fn make_singleton_server( let (shutdown_broadcast, _) = broadcast::channel(4); let rpc = new_json_rpc(); - let current_name = Arc::new(Mutex::new(None)); + let current_status = Arc::new(Mutex::default()); let mut rpc = rpc.methods(SingletonServerContext { log: log.clone(), shutdown_tx: shutdown_broadcast.clone(), broadcast_tx: log_broadcast.get_brocaster(), - current_name: current_name.clone(), + current_status: current_status.clone(), }); rpc.register_sync( @@ -91,12 +101,15 @@ pub fn make_singleton_server( rpc.register_sync( protocol::singleton::METHOD_STATUS, |_: protocol::EmptyObject, c| { - Ok(protocol::singleton::Status { - tunnel: match c.current_name.lock().unwrap().clone() { - Some(name) => protocol::singleton::TunnelState::Connected { name }, - None => protocol::singleton::TunnelState::Disconnected, - }, - }) + Ok(c.current_status + .lock() + .unwrap() + .as_ref() + .map(|s| protocol::singleton::StatusWithTunnelName { + name: Some(s.name.clone()), + status: s.lock.read(), + }) + .unwrap_or_default()) }, ); @@ -124,7 +137,7 @@ pub fn make_singleton_server( }); RpcServer { shutdown_broadcast, - current_name, + current_status, fut, } } @@ -139,8 +152,11 @@ pub async fn start_singleton_server<'a>( { print_listening(&args.log, &args.tunnel.name); - let mut name = args.server.current_name.lock().unwrap(); - *name = Some(args.tunnel.name.clone()) + let mut status = args.server.current_status.lock().unwrap(); + *status = Some(StatusInfo { + name: args.tunnel.name.clone(), + lock: args.tunnel.status(), + }) } let serve_fut = super::serve(