diff --git a/cli/src/commands/agent_host.rs b/cli/src/commands/agent_host.rs index 132212408e0..1535142a4d1 100644 --- a/cli/src/commands/agent_host.rs +++ b/cli/src/commands/agent_host.rs @@ -9,24 +9,28 @@ use std::io::{Read, Write}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::{Path, PathBuf}; use std::sync::Arc; - -use std::time::Duration; +use std::time::{Duration, Instant}; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response, Server}; use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::sync::Mutex; use crate::async_pipe::{get_socket_name, get_socket_rw_stream, AsyncPipe}; use crate::constants::VSCODE_CLI_QUALITY; +use crate::download_cache::DownloadCache; use crate::log; use crate::options::Quality; -use crate::tunnels::paths::{get_server_folder_name, SERVER_FOLDER_NAME}; +use crate::tunnels::paths::SERVER_FOLDER_NAME; use crate::tunnels::shutdown_signal::ShutdownRequest; -use crate::update_service::{unzip_downloaded_release, Platform, TargetKind, UpdateService}; +use crate::update_service::{ + unzip_downloaded_release, Platform, Release, TargetKind, UpdateService, +}; use crate::util::command::new_script_command; use crate::util::errors::AnyError; use crate::util::http::{self, ReqwestSimpleHttp}; use crate::util::io::SilentCopyProgress; +use crate::util::sync::{new_barrier, Barrier, BarrierOpener}; use crate::{ tunnels::legal, util::{errors::CodeError, prereqs::PreReqChecker}, @@ -34,9 +38,18 @@ use crate::{ use super::{args::AgentHostArgs, CommandContext}; -/// Runs a local agent host server. Downloads the latest VS Code server, -/// starts it with `--agent-host-path`, and proxies connections from a -/// local TCP port to the server's agent host socket. +/// How often to check for server updates. +const UPDATE_CHECK_INTERVAL: Duration = Duration::from_secs(24 * 60 * 60); +/// How often to re-check whether the server has exited when an update is pending. +const UPDATE_POLL_INTERVAL: Duration = Duration::from_secs(10 * 60); +/// How long to wait for the server to signal readiness. +const STARTUP_TIMEOUT: Duration = Duration::from_secs(30); + +/// Runs a local agent host server. Downloads the latest VS Code server on +/// demand, starts it with `--enable-remote-auto-shutdown`, and proxies +/// WebSocket connections from a local TCP port to the server's agent host +/// socket. The server auto-shuts down when idle; the CLI checks for updates +/// in the background and starts the latest version on the next connection. pub async fn agent_host(ctx: CommandContext, mut args: AgentHostArgs) -> Result { legal::require_consent(&ctx.paths, args.accept_server_license_terms)?; @@ -56,158 +69,20 @@ pub async fn agent_host(ctx: CommandContext, mut args: AgentHostArgs) -> Result< } } - let quality = VSCODE_CLI_QUALITY - .ok_or_else(|| CodeError::UpdatesNotConfigured("no configured quality")) - .and_then(|q| { - Quality::try_from(q).map_err(|_| CodeError::UpdatesNotConfigured("unknown quality")) - })?; + let manager = AgentHostManager::new(&ctx, platform, args.clone())?; - let update_service = UpdateService::new( - ctx.log.clone(), - Arc::new(ReqwestSimpleHttp::with_client(ctx.http.clone())), - ); - - // Download the latest headless server - let release = update_service - .get_latest_commit(platform, TargetKind::Server, quality) - .await?; - info!(ctx.log, "Resolved server version: {}", release); - - let name = get_server_folder_name(quality, &release.commit); - let server_dir = if let Some(dir) = ctx.paths.server_cache.exists(&name) { - info!(ctx.log, "Server already downloaded"); - dir - } else { - info!(ctx.log, "Downloading server {}", release.commit); - let release_for_download = release.clone(); - let log_for_download = ctx.log.clone(); - ctx.paths - .server_cache - .create(name, |target_dir| async move { - let tmpdir = tempfile::tempdir().unwrap(); - let response = update_service - .get_download_stream(&release_for_download) - .await?; - let name = response.url_path_basename().unwrap(); - let archive_path = tmpdir.path().join(name); - http::download_into_file( - &archive_path, - log_for_download.get_download_logger("Downloading server:"), - response, - ) - .await?; - unzip_downloaded_release(&archive_path, &target_dir, SilentCopyProgress())?; - Ok(()) - }) - .await? - }; - - // Start the server with --agent-host-path pointing to a local socket - // allow using the OSS server in development via an override - let executable = if let Some(p) = option_env!("VSCODE_CLI_OVERRIDE_SERVER_PATH") { - PathBuf::from(p) - } else { - server_dir - .join(SERVER_FOLDER_NAME) - .join("bin") - .join(release.quality.server_entrypoint()) - }; - let agent_host_socket = get_socket_name(); - - let mut cmd = new_script_command(&executable); - cmd.stdin(std::process::Stdio::null()); - cmd.stderr(std::process::Stdio::piped()); - cmd.stdout(std::process::Stdio::piped()); - cmd.arg("--agent-host-path"); - cmd.arg(&agent_host_socket); - cmd.args(["--accept-server-license-terms"]); - - if let Some(a) = &args.server_data_dir { - cmd.arg("--server-data-dir"); - cmd.arg(a); - } - if args.without_connection_token { - cmd.arg("--without-connection-token"); - } - if let Some(ct) = &args.connection_token_file { - cmd.arg("--connection-token-file"); - cmd.arg(ct); + // Eagerly resolve the latest version so the first connection is fast + if let Err(e) = manager.get_latest_release().await { + warning!(ctx.log, "Error resolving initial server version: {}", e); } - cmd.env_remove("VSCODE_DEV"); - - let mut child = cmd.spawn().map_err(|e| CodeError::CommandFailed { - command: executable.to_string_lossy().to_string(), - code: -1, - output: e.to_string(), - })?; - - let (mut stdout, mut stderr) = ( - BufReader::new(child.stdout.take().unwrap()).lines(), - BufReader::new(child.stderr.take().unwrap()).lines(), - ); - - // Wait for the server to signal readiness. Buffer stderr at debug level; - // if the server fails to start within 30s, dump everything as a warning. - let startup_timeout = Duration::from_secs(30); - let mut stderr_buffer: Vec = Vec::new(); - let ready = tokio::time::timeout(startup_timeout, async { - loop { - tokio::select! { - Ok(Some(l)) = stdout.next_line() => { - debug!(ctx.log, "[server stdout]: {}", l); - if l.contains("Agent host server listening on") { - return; - } - } - Ok(Some(l)) = stderr.next_line() => { - debug!(ctx.log, "[server stderr]: {}", l); - stderr_buffer.push(l); - } - else => break, - } - } - }) - .await; - - if ready.is_err() { - warning!( - ctx.log, - "Server did not become ready within {}s. It may still be starting up.", - startup_timeout.as_secs() - ); - for line in &stderr_buffer { - warning!(ctx.log, "[server stderr]: {}", line); - } - } - - // Continue reading server output in the background - let log_clone = ctx.log.clone(); + // Start background update checker + let manager_for_updates = manager.clone(); tokio::spawn(async move { - loop { - tokio::select! { - Ok(Some(l)) = stdout.next_line() => { - debug!(log_clone, "[server stdout]: {}", l); - } - Ok(Some(l)) = stderr.next_line() => { - debug!(log_clone, "[server stderr]: {}", l); - } - else => break, - } - } + manager_for_updates.run_update_loop().await; }); - // Start HTTP/WebSocket proxy - let agent_socket = agent_host_socket.clone(); - let make_svc = move || { - let socket_path = agent_socket.clone(); - let service = service_fn(move |req| { - let socket_path = socket_path.clone(); - async move { handle_request(socket_path, req).await } - }); - async move { Ok::<_, Infallible>(service) } - }; - + // Bind the HTTP/WebSocket proxy let mut shutdown = ShutdownRequest::create_rx([ShutdownRequest::CtrlC]); let addr: SocketAddr = match &args.host { @@ -223,6 +98,16 @@ pub async fn agent_host(ctx: CommandContext, mut args: AgentHostArgs) -> Result< } ctx.log.result(format!("Listening on {url}")); + let manager_for_svc = manager.clone(); + let make_svc = move || { + let mgr = manager_for_svc.clone(); + let service = service_fn(move |req| { + let mgr = mgr.clone(); + async move { handle_request(mgr, req).await } + }); + async move { Ok::<_, Infallible>(service) } + }; + let server_future = builder .serve(make_service_fn(|_| make_svc())) .with_graceful_shutdown(async { @@ -230,17 +115,431 @@ pub async fn agent_host(ctx: CommandContext, mut args: AgentHostArgs) -> Result< }); let r = server_future.await; - let _ = child.kill().await; + manager.kill_running_server().await; r.map_err(CodeError::CouldNotListenOnInterface)?; Ok(0) } +// ---- AgentHostManager ------------------------------------------------------- + +/// State of the running VS Code server process. +struct RunningServer { + child: tokio::process::Child, + commit: String, +} + +/// Manages the VS Code server lifecycle: on-demand start, auto-restart +/// after idle shutdown, and background update checking. +struct AgentHostManager { + log: log::Logger, + args: AgentHostArgs, + platform: Platform, + cache: DownloadCache, + update_service: UpdateService, + /// The latest known release, with the time it was checked. + latest_release: Mutex>, + /// The currently running server, if any. + running: Mutex>, + /// Barrier that opens when a server is ready (socket path available). + /// Reset each time a new server is started. + ready: Mutex>>>, +} + +impl AgentHostManager { + fn new( + ctx: &CommandContext, + platform: Platform, + args: AgentHostArgs, + ) -> Result, CodeError> { + // Seed latest_release from cache if available + let cache = ctx.paths.server_cache.clone(); + Ok(Arc::new(Self { + log: ctx.log.clone(), + args, + platform, + cache, + update_service: UpdateService::new( + ctx.log.clone(), + Arc::new(ReqwestSimpleHttp::with_client(ctx.http.clone())), + ), + latest_release: Mutex::new(None), + running: Mutex::new(None), + ready: Mutex::new(None), + })) + } + + /// Returns the socket path to a running server, starting one if needed. + async fn ensure_server(self: &Arc) -> Result { + // Fast path: if we already have a barrier, wait on it + { + let ready = self.ready.lock().await; + if let Some(barrier) = &*ready { + if barrier.is_open() { + // Check if the process is still running + let running = self.running.lock().await; + if running.is_some() { + return barrier + .clone() + .wait() + .await + .unwrap() + .map_err(CodeError::ServerDownloadError); + } + } else { + // Still starting up, wait for it + let mut barrier = barrier.clone(); + drop(ready); + return barrier + .wait() + .await + .unwrap() + .map_err(CodeError::ServerDownloadError); + } + } + } + + // Need to start a new server + self.start_server().await + } + + /// Starts the server with the latest already-downloaded version. + /// Only blocks on a network fetch if no version has been downloaded yet. + async fn start_server(self: &Arc) -> Result { + let (release, server_dir) = self.get_cached_or_download().await?; + + let (mut barrier, opener) = new_barrier::>(); + { + let mut ready = self.ready.lock().await; + *ready = Some(barrier.clone()); + } + + let self_clone = self.clone(); + let release_clone = release.clone(); + tokio::spawn(async move { + self_clone + .run_server(release_clone, server_dir, opener) + .await; + }); + + barrier + .wait() + .await + .unwrap() + .map_err(CodeError::ServerDownloadError) + } + + /// Runs the server process to completion, handling readiness signaling. + async fn run_server( + self: &Arc, + release: Release, + server_dir: PathBuf, + opener: BarrierOpener>, + ) { + let executable = if let Some(p) = option_env!("VSCODE_CLI_OVERRIDE_SERVER_PATH") { + PathBuf::from(p) + } else { + server_dir + .join(SERVER_FOLDER_NAME) + .join("bin") + .join(release.quality.server_entrypoint()) + }; + + let agent_host_socket = get_socket_name(); + let mut cmd = new_script_command(&executable); + cmd.stdin(std::process::Stdio::null()); + cmd.stderr(std::process::Stdio::piped()); + cmd.stdout(std::process::Stdio::piped()); + cmd.arg("--agent-host-path"); + cmd.arg(&agent_host_socket); + cmd.args([ + "--accept-server-license-terms", + "--enable-remote-auto-shutdown", + ]); + + if let Some(a) = &self.args.server_data_dir { + cmd.arg("--server-data-dir"); + cmd.arg(a); + } + if self.args.without_connection_token { + cmd.arg("--without-connection-token"); + } + if let Some(ct) = &self.args.connection_token_file { + cmd.arg("--connection-token-file"); + cmd.arg(ct); + } + cmd.env_remove("VSCODE_DEV"); + + let mut child = match cmd.spawn() { + Ok(c) => c, + Err(e) => { + opener.open(Err(e.to_string())); + return; + } + }; + + let commit_prefix = &release.commit[..release.commit.len().min(7)]; + let (mut stdout, mut stderr) = ( + BufReader::new(child.stdout.take().unwrap()).lines(), + BufReader::new(child.stderr.take().unwrap()).lines(), + ); + + // Wait for readiness with a timeout + let mut opener = Some(opener); + let socket_path = agent_host_socket.clone(); + let startup_deadline = tokio::time::sleep(STARTUP_TIMEOUT); + tokio::pin!(startup_deadline); + + let mut ready = false; + loop { + tokio::select! { + Ok(Some(l)) = stdout.next_line() => { + debug!(self.log, "[{} stdout]: {}", commit_prefix, l); + if !ready && l.contains("Agent host server listening on") { + ready = true; + if let Some(o) = opener.take() { + o.open(Ok(socket_path.clone())); + } + } + } + Ok(Some(l)) = stderr.next_line() => { + debug!(self.log, "[{} stderr]: {}", commit_prefix, l); + } + _ = &mut startup_deadline, if !ready => { + warning!(self.log, "[{}]: Server did not become ready within {}s", commit_prefix, STARTUP_TIMEOUT.as_secs()); + // Don't fail — the server may still start up, just slowly + if let Some(o) = opener.take() { + o.open(Ok(socket_path.clone())); + } + ready = true; + } + e = child.wait() => { + info!(self.log, "[{} process]: exited: {:?}", commit_prefix, e); + if let Some(o) = opener.take() { + o.open(Err(format!("Server exited before ready: {:?}", e))); + } + break; + } + } + + if ready { + break; + } + } + + // Store the running server state + { + let mut running = self.running.lock().await; + *running = Some(RunningServer { + child, + commit: release.commit.clone(), + }); + } + + if !ready { + return; + } + + info!(self.log, "[{}]: Server ready", commit_prefix); + + // Continue reading output until the process exits + let log = self.log.clone(); + let commit_prefix = commit_prefix.to_string(); + let self_clone = self.clone(); + tokio::spawn(async move { + loop { + tokio::select! { + Ok(Some(l)) = stdout.next_line() => { + debug!(log, "[{} stdout]: {}", commit_prefix, l); + } + Ok(Some(l)) = stderr.next_line() => { + debug!(log, "[{} stderr]: {}", commit_prefix, l); + } + else => break, + } + } + + // Server process has exited (auto-shutdown or crash) + info!(log, "[{}]: Server process ended", commit_prefix); + let mut running = self_clone.running.lock().await; + if let Some(r) = &*running { + if r.commit == commit_prefix || r.commit.starts_with(&commit_prefix) { + // Only clear if it's still our server + } + } + *running = None; + }); + } + + /// Returns a release and its local directory. Prefers the latest known + /// release if it has already been downloaded; otherwise falls back to any + /// cached version. Only fetches from the network and downloads if + /// nothing is cached at all. + async fn get_cached_or_download(&self) -> Result<(Release, PathBuf), CodeError> { + // Best case: the latest known release is already downloaded + if let Some((_, release)) = &*self.latest_release.lock().await { + if let Some(dir) = self.cache.exists(&release.commit) { + return Ok((release.clone(), dir)); + } + } + + let quality = VSCODE_CLI_QUALITY + .ok_or_else(|| CodeError::UpdatesNotConfigured("no configured quality")) + .and_then(|q| { + Quality::try_from(q).map_err(|_| CodeError::UpdatesNotConfigured("unknown quality")) + })?; + + // Fall back to any cached version (still instant, just not the newest) + for commit in self.cache.get() { + if let Some(dir) = self.cache.exists(&commit) { + let release = Release { + name: String::new(), + commit, + platform: self.platform, + target: TargetKind::Server, + quality, + }; + return Ok((release, dir)); + } + } + + // Nothing cached — must fetch and download (blocks the first connection) + info!(self.log, "No cached server version, downloading latest..."); + let release = self.get_latest_release().await?; + let dir = self.ensure_downloaded(&release).await?; + Ok((release, dir)) + } + + /// Ensures the release is downloaded, returning the server directory. + async fn ensure_downloaded(&self, release: &Release) -> Result { + if let Some(dir) = self.cache.exists(&release.commit) { + return Ok(dir); + } + + info!(self.log, "Downloading server {}", release.commit); + let release = release.clone(); + let log = self.log.clone(); + let update_service = self.update_service.clone(); + let commit = release.commit.clone(); + self.cache + .create(&commit, |target_dir| async move { + let tmpdir = tempfile::tempdir().unwrap(); + let response = update_service.get_download_stream(&release).await?; + let name = response.url_path_basename().unwrap(); + let archive_path = tmpdir.path().join(name); + http::download_into_file( + &archive_path, + log.get_download_logger("Downloading server:"), + response, + ) + .await?; + unzip_downloaded_release(&archive_path, &target_dir, SilentCopyProgress())?; + Ok(()) + }) + .await + .map_err(|e| CodeError::ServerDownloadError(e.to_string())) + } + + /// Gets the latest release, caching the result. + async fn get_latest_release(&self) -> Result { + let mut latest = self.latest_release.lock().await; + let now = Instant::now(); + + let quality = VSCODE_CLI_QUALITY + .ok_or_else(|| CodeError::UpdatesNotConfigured("no configured quality")) + .and_then(|q| { + Quality::try_from(q).map_err(|_| CodeError::UpdatesNotConfigured("unknown quality")) + })?; + + let result = self + .update_service + .get_latest_commit(self.platform, TargetKind::Server, quality) + .await + .map_err(|e| CodeError::UpdateCheckFailed(e.to_string())); + + // If the update service is unavailable, fall back to the cached version + if let (Err(e), Some((_, previous))) = (&result, latest.clone()) { + warning!(self.log, "Error checking for updates, using cached: {}", e); + *latest = Some((now, previous.clone())); + return Ok(previous); + } + + let release = result?; + debug!(self.log, "Resolved server version: {}", release); + *latest = Some((now, release.clone())); + Ok(release) + } + + /// Background loop: checks for updates periodically and pre-downloads + /// new versions when the server is idle. + async fn run_update_loop(self: Arc) { + let mut interval = tokio::time::interval(UPDATE_CHECK_INTERVAL); + interval.tick().await; // skip the immediate first tick + + loop { + interval.tick().await; + + let new_release = match self.get_latest_release().await { + Ok(r) => r, + Err(e) => { + warning!(self.log, "Update check failed: {}", e); + continue; + } + }; + + // Check if we already have this version + if self.cache.exists(&new_release.commit).is_some() { + continue; + } + + info!(self.log, "New server version available: {}", new_release); + + // Wait until the server is not running before downloading + loop { + { + let running = self.running.lock().await; + if running.is_none() { + break; + } + } + debug!(self.log, "Server still running, waiting before updating..."); + tokio::time::sleep(UPDATE_POLL_INTERVAL).await; + } + + // Download the new version + match self.ensure_downloaded(&new_release).await { + Ok(_) => info!(self.log, "Updated server to {}", new_release), + Err(e) => warning!(self.log, "Failed to download update: {}", e), + } + } + } + + /// Kills the currently running server, if any. + async fn kill_running_server(&self) { + let mut running = self.running.lock().await; + if let Some(mut server) = running.take() { + let _ = server.child.kill().await; + } + } +} + +// ---- HTTP/WebSocket proxy --------------------------------------------------- + /// Proxies an incoming HTTP/WebSocket request to the agent host's Unix socket. async fn handle_request( - socket_path: PathBuf, + manager: Arc, req: Request, ) -> Result, Infallible> { + let socket_path = match manager.ensure_server().await { + Ok(p) => p, + Err(e) => { + return Ok(Response::builder() + .status(503) + .body(Body::from(format!("Error starting agent host: {e:?}"))) + .unwrap()); + } + }; + let is_upgrade = req.headers().contains_key(hyper::header::UPGRADE); let rw = match get_socket_rw_stream(&socket_path).await { @@ -351,3 +650,48 @@ fn mint_connection_token(path: &Path, prefer_token: Option) -> std::io:: f.write_all(prefer_token.as_bytes())?; Ok(prefer_token) } + +#[cfg(test)] +mod tests { + use super::*; + use std::fs; + + #[test] + fn mint_connection_token_generates_and_persists() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("token"); + + // First call with no preference generates a UUID and persists it + let token1 = mint_connection_token(&path, None).unwrap(); + assert!(!token1.is_empty()); + assert_eq!(fs::read_to_string(&path).unwrap(), token1); + + // Second call with no preference reads the existing token + let token2 = mint_connection_token(&path, None).unwrap(); + assert_eq!(token1, token2); + } + + #[test] + fn mint_connection_token_respects_preferred() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("token"); + + // Providing a preferred token writes it to the file + let token = mint_connection_token(&path, Some("my-token".to_string())).unwrap(); + assert_eq!(token, "my-token"); + assert_eq!(fs::read_to_string(&path).unwrap(), "my-token"); + } + + #[test] + fn mint_connection_token_preferred_overwrites_existing() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("token"); + + mint_connection_token(&path, None).unwrap(); + + // Providing a preference overwrites any existing token + let token = mint_connection_token(&path, Some("override".to_string())).unwrap(); + assert_eq!(token, "override"); + assert_eq!(fs::read_to_string(&path).unwrap(), "override"); + } +}