/*--------------------------------------------------------------------------------------------- * Copyright (c) Microsoft Corporation. All rights reserved. * Licensed under the MIT License. See License.txt in the project root for license information. *--------------------------------------------------------------------------------------------*/ use super::paths::{InstalledServer, ServerPaths}; use crate::async_pipe::get_socket_name; use crate::constants::{ APPLICATION_NAME, EDITOR_WEB_URL, QUALITYLESS_PRODUCT_NAME, QUALITYLESS_SERVER_NAME, }; use crate::download_cache::DownloadCache; use crate::options::{Quality, TelemetryLevel}; use crate::state::LauncherPaths; use crate::tunnels::paths::{get_server_folder_name, SERVER_FOLDER_NAME}; use crate::update_service::{ unzip_downloaded_release, Platform, Release, TargetKind, UpdateService, }; use crate::util::command::{ capture_command, capture_command_and_check_status, check_output_status, kill_tree, new_script_command, }; use crate::util::errors::{wrap, AnyError, CodeError, ExtensionInstallFailed, WrappedError}; use crate::util::http::{self, BoxedHttp}; use crate::util::io::SilentCopyProgress; use crate::util::machine::process_exists; use crate::util::prereqs::skip_requirements_check; use crate::{debug, info, log, spanf, trace, warning}; use lazy_static::lazy_static; use opentelemetry::KeyValue; use regex::Regex; use serde::Deserialize; use std::fs; use std::fs::File; use std::io::Write; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; use tokio::fs::remove_file; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child, Command}; use tokio::sync::oneshot::Receiver; use tokio::time::{interval, timeout}; lazy_static! { static ref LISTENING_PORT_RE: Regex = Regex::new(r"Extension host agent listening on (.+)").unwrap(); static ref WEB_UI_RE: Regex = Regex::new(r"Web UI available at (.+)").unwrap(); } #[derive(Clone, Debug, Default)] pub struct CodeServerArgs { pub host: Option, pub port: Option, pub socket_path: Option, // common argument pub telemetry_level: Option, pub log: Option, pub accept_server_license_terms: bool, pub verbose: bool, pub server_data_dir: Option, pub extensions_dir: Option, // extension management pub install_extensions: Vec, pub uninstall_extensions: Vec, pub update_extensions: bool, pub list_extensions: bool, pub show_versions: bool, pub category: Option, pub pre_release: bool, pub donot_include_pack_and_dependencies: bool, pub force: bool, pub start_server: bool, // connection tokens pub connection_token: Option, pub connection_token_file: Option, pub without_connection_token: bool, // reconnection pub reconnection_grace_time: Option, } impl CodeServerArgs { pub fn log_level(&self) -> log::Level { if self.verbose { log::Level::Trace } else { self.log.unwrap_or(log::Level::Info) } } pub fn telemetry_disabled(&self) -> bool { self.telemetry_level == Some(TelemetryLevel::Off) } pub fn command_arguments(&self) -> Vec { let mut args = Vec::new(); if let Some(i) = &self.socket_path { args.push(format!("--socket-path={i}")); } else { if let Some(i) = &self.host { args.push(format!("--host={i}")); } if let Some(i) = &self.port { args.push(format!("--port={i}")); } } if let Some(i) = &self.connection_token { args.push(format!("--connection-token={i}")); } if let Some(i) = &self.connection_token_file { args.push(format!("--connection-token-file={i}")); } if self.without_connection_token { args.push(String::from("--without-connection-token")); } if self.accept_server_license_terms { args.push(String::from("--accept-server-license-terms")); } if let Some(i) = self.telemetry_level { args.push(format!("--telemetry-level={i}")); } if let Some(i) = self.log { args.push(format!("--log={i}")); } if let Some(t) = self.reconnection_grace_time { args.push(format!("--reconnection-grace-time={t}")); } for extension in &self.install_extensions { args.push(format!("--install-extension={extension}")); } if !&self.install_extensions.is_empty() { if self.pre_release { args.push(String::from("--pre-release")); } if self.force { args.push(String::from("--force")); } } for extension in &self.uninstall_extensions { args.push(format!("--uninstall-extension={extension}")); } if self.update_extensions { args.push(String::from("--update-extensions")); } if self.list_extensions { args.push(String::from("--list-extensions")); if self.show_versions { args.push(String::from("--show-versions")); } if let Some(i) = &self.category { args.push(format!("--category={i}")); } } if let Some(d) = &self.server_data_dir { args.push(format!("--server-data-dir={d}")); } if let Some(d) = &self.extensions_dir { args.push(format!("--extensions-dir={d}")); } if self.start_server { args.push(String::from("--start-server")); } args } } /// Base server params that can be `resolve()`d to a `ResolvedServerParams`. /// Doing so fetches additional information like a commit ID if previously /// unspecified. pub struct ServerParamsRaw { pub commit_id: Option, pub quality: Quality, pub code_server_args: CodeServerArgs, pub headless: bool, pub platform: Platform, } /// Server params that can be used to start a VS Code server. pub struct ResolvedServerParams { pub release: Release, pub code_server_args: CodeServerArgs, } impl ResolvedServerParams { fn as_installed_server(&self) -> InstalledServer { InstalledServer { commit: self.release.commit.clone(), quality: self.release.quality, headless: self.release.target == TargetKind::Server, } } } impl ServerParamsRaw { pub async fn resolve( self, log: &log::Logger, http: BoxedHttp, ) -> Result { Ok(ResolvedServerParams { release: self.get_or_fetch_commit_id(log, http).await?, code_server_args: self.code_server_args, }) } async fn get_or_fetch_commit_id( &self, log: &log::Logger, http: BoxedHttp, ) -> Result { let target = match self.headless { true => TargetKind::Server, false => TargetKind::Web, }; if let Some(c) = &self.commit_id { return Ok(Release { commit: c.clone(), quality: self.quality, target, name: String::new(), platform: self.platform, }); } UpdateService::new(log.clone(), http) .get_latest_commit(self.platform, target, self.quality) .await } } #[derive(Deserialize)] #[serde(rename_all = "camelCase")] #[allow(dead_code)] struct UpdateServerVersion { pub name: String, pub version: String, pub product_version: String, pub timestamp: i64, } /// Code server listening on a port address. #[derive(Clone)] pub struct SocketCodeServer { pub commit_id: String, pub socket: PathBuf, pub origin: Arc, } /// Code server listening on a socket address. #[derive(Clone)] pub struct PortCodeServer { pub commit_id: String, pub port: u16, pub origin: Arc, } /// A server listening on any address/location. pub enum AnyCodeServer { Socket(SocketCodeServer), Port(PortCodeServer), } pub enum CodeServerOrigin { /// A new code server, that opens the barrier when it exits. New(Box), /// An existing code server with a PID. Existing(u32), } impl CodeServerOrigin { pub async fn wait_for_exit(&mut self) { match self { CodeServerOrigin::New(child) => { child.wait().await.ok(); } CodeServerOrigin::Existing(pid) => { let mut interval = interval(Duration::from_secs(30)); while process_exists(*pid) { interval.tick().await; } } } } pub async fn kill(&mut self) { match self { CodeServerOrigin::New(child) => { child.kill().await.ok(); } CodeServerOrigin::Existing(pid) => { kill_tree(*pid).await.ok(); } } } } /// Ensures the given list of extensions are installed on the running server. async fn do_extension_install_on_running_server( start_script_path: &Path, extensions: &[String], log: &log::Logger, ) -> Result<(), AnyError> { if extensions.is_empty() { return Ok(()); } debug!(log, "Installing extensions..."); let command = format!( "{} {}", start_script_path.display(), extensions .iter() .map(|s| get_extensions_flag(s)) .collect::>() .join(" ") ); let result = capture_command("bash", &["-c", &command]).await?; if !result.status.success() { Err(AnyError::from(ExtensionInstallFailed( String::from_utf8_lossy(&result.stderr).to_string(), ))) } else { Ok(()) } } pub struct ServerBuilder<'a> { logger: &'a log::Logger, server_params: &'a ResolvedServerParams, launcher_paths: &'a LauncherPaths, server_paths: ServerPaths, http: BoxedHttp, } impl<'a> ServerBuilder<'a> { pub fn new( logger: &'a log::Logger, server_params: &'a ResolvedServerParams, launcher_paths: &'a LauncherPaths, http: BoxedHttp, ) -> Self { Self { logger, server_params, launcher_paths, server_paths: server_params .as_installed_server() .server_paths(launcher_paths), http, } } /// Gets any already-running server from this directory. pub async fn get_running(&self) -> Result, AnyError> { info!( self.logger, "Checking {} and {} for a running server...", self.server_paths.logfile.display(), self.server_paths.pidfile.display() ); let pid = match self.server_paths.get_running_pid() { Some(pid) => pid, None => return Ok(None), }; info!(self.logger, "Found running server (pid={})", pid); if !Path::new(&self.server_paths.logfile).exists() { warning!(self.logger, "{} Server is running but its logfile is missing. Don't delete the {} Server manually, run the command '{} prune'.", QUALITYLESS_PRODUCT_NAME, QUALITYLESS_PRODUCT_NAME, APPLICATION_NAME); return Ok(None); } do_extension_install_on_running_server( &self.server_paths.executable, &self.server_params.code_server_args.install_extensions, self.logger, ) .await?; let origin = Arc::new(CodeServerOrigin::Existing(pid)); let contents = fs::read_to_string(&self.server_paths.logfile) .expect("Something went wrong reading log file"); if let Some(port) = parse_port_from(&contents) { Ok(Some(AnyCodeServer::Port(PortCodeServer { commit_id: self.server_params.release.commit.to_owned(), port, origin, }))) } else if let Some(socket) = parse_socket_from(&contents) { Ok(Some(AnyCodeServer::Socket(SocketCodeServer { commit_id: self.server_params.release.commit.to_owned(), socket, origin, }))) } else { Ok(None) } } /// Removes a cached server. pub async fn evict(&self) -> Result<(), WrappedError> { let name = get_server_folder_name( self.server_params.release.quality, &self.server_params.release.commit, ); self.launcher_paths.server_cache.delete(&name) } /// Ensures the server is set up in the configured directory. pub async fn setup(&self) -> Result<(), AnyError> { debug!( self.logger, "Installing and setting up {}...", QUALITYLESS_SERVER_NAME ); let update_service = UpdateService::new(self.logger.clone(), self.http.clone()); let name = get_server_folder_name( self.server_params.release.quality, &self.server_params.release.commit, ); let result = self .launcher_paths .server_cache .create(name, |target_dir| async move { let tmpdir = tempfile::tempdir().map_err(|e| wrap(e, "error creating temp download dir"))?; let response = update_service .get_download_stream(&self.server_params.release) .await?; let archive_path = tmpdir.path().join(response.url_path_basename().unwrap()); info!( self.logger, "Downloading {} server -> {}", QUALITYLESS_PRODUCT_NAME, archive_path.display() ); http::download_into_file( &archive_path, self.logger.get_download_logger("server download progress:"), response, ) .await?; let server_dir = target_dir.join(SERVER_FOLDER_NAME); unzip_downloaded_release( &archive_path, &server_dir, self.logger.get_download_logger("server inflate progress:"), )?; if !skip_requirements_check().await { let output = capture_command_and_check_status( server_dir .join("bin") .join(self.server_params.release.quality.server_entrypoint()), &["--version"], ) .await .map_err(|e| wrap(e, "error checking server integrity"))?; trace!( self.logger, "Server integrity verified, version: {}", String::from_utf8_lossy(&output.stdout).replace('\n', " / ") ); } else { info!(self.logger, "Skipping server integrity check"); } Ok(()) }) .await; if let Err(e) = result { error!(self.logger, "Error installing server: {}", e); return Err(e); } debug!(self.logger, "Server setup complete"); Ok(()) } pub async fn listen_on_port(&self, port: u16) -> Result { let mut cmd = self.get_base_command(); cmd.arg("--start-server") .arg("--enable-remote-auto-shutdown") .arg(format!("--port={port}")); let child = self.spawn_server_process(cmd).await?; let log_file = self.get_logfile()?; let plog = self.logger.prefixed(&log::new_code_server_prefix()); let (mut origin, listen_rx) = monitor_server::(child, Some(log_file), plog, false); let port = match timeout(Duration::from_secs(8), listen_rx).await { Err(_) => { origin.kill().await; return Err(CodeError::ServerOriginTimeout.into()); } Ok(Err(s)) => { origin.kill().await; return Err(CodeError::ServerUnexpectedExit(format!("{s}")).into()); } Ok(Ok(p)) => p, }; info!(self.logger, "Server started"); Ok(PortCodeServer { commit_id: self.server_params.release.commit.to_owned(), port, origin: Arc::new(origin), }) } /// Runs the command that just installs extensions and exits. pub async fn install_extensions(&self) -> Result<(), AnyError> { // cmd already has --install-extensions from base let mut cmd = self.get_base_command(); let cmd_str = || { self.server_params .code_server_args .command_arguments() .join(" ") }; let r = cmd.output().await.map_err(|e| CodeError::CommandFailed { command: cmd_str(), code: -1, output: e.to_string(), })?; check_output_status(r, cmd_str)?; Ok(()) } pub async fn listen_on_default_socket(&self) -> Result { let requested_file = get_socket_name(); self.listen_on_socket(&requested_file).await } pub async fn listen_on_socket(&self, socket: &Path) -> Result { Ok(spanf!( self.logger, self.logger.span("server.start").with_attributes(vec! { KeyValue::new("commit_id", self.server_params.release.commit.to_string()), KeyValue::new("quality", format!("{}", self.server_params.release.quality)), }), self._listen_on_socket(socket) )?) } async fn _listen_on_socket(&self, socket: &Path) -> Result { remove_file(&socket).await.ok(); // ignore any error if it doesn't exist let mut cmd = self.get_base_command(); cmd.arg("--start-server") .arg("--enable-remote-auto-shutdown") .arg(format!("--socket-path={}", socket.display())); let child = self.spawn_server_process(cmd).await?; let log_file = self.get_logfile()?; let plog = self.logger.prefixed(&log::new_code_server_prefix()); let (mut origin, listen_rx) = monitor_server::(child, Some(log_file), plog, false); let socket = match timeout(Duration::from_secs(30), listen_rx).await { Err(_) => { origin.kill().await; return Err(CodeError::ServerOriginTimeout.into()); } Ok(Err(s)) => { origin.kill().await; return Err(CodeError::ServerUnexpectedExit(format!("{s}")).into()); } Ok(Ok(socket)) => socket, }; info!(self.logger, "Server started"); Ok(SocketCodeServer { commit_id: self.server_params.release.commit.to_owned(), socket, origin: Arc::new(origin), }) } async fn spawn_server_process(&self, mut cmd: Command) -> Result { info!(self.logger, "Starting server..."); debug!(self.logger, "Starting server with command... {:?}", cmd); // On Windows spawning a code-server binary will run cmd.exe /c C:\path\to\code-server.cmd... // This spawns a cmd.exe window for the user, which if they close will kill the code-server process // and disconnect the tunnel. To prevent this, pass the CREATE_NO_WINDOW flag to the Command // only on Windows. // Original issue: https://github.com/microsoft/vscode/issues/184058 // Partial fix: https://github.com/microsoft/vscode/pull/184621 #[cfg(target_os = "windows")] let cmd = cmd.creation_flags( winapi::um::winbase::CREATE_NO_WINDOW | winapi::um::winbase::CREATE_NEW_PROCESS_GROUP | get_should_use_breakaway_from_job() .await .then_some(winapi::um::winbase::CREATE_BREAKAWAY_FROM_JOB) .unwrap_or_default(), ); let child = cmd .stderr(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped()) .spawn() .map_err(|e| CodeError::ServerUnexpectedExit(format!("{e}")))?; self.server_paths .write_pid(child.id().expect("expected server to have pid"))?; Ok(child) } fn get_logfile(&self) -> Result { File::create(&self.server_paths.logfile).map_err(|e| { wrap( e, format!( "error creating log file {}", self.server_paths.logfile.display() ), ) }) } fn get_base_command(&self) -> Command { let mut cmd = new_script_command(&self.server_paths.executable); cmd.stdin(std::process::Stdio::null()) .args(self.server_params.code_server_args.command_arguments()); cmd } } fn monitor_server( mut child: Child, log_file: Option, plog: log::Logger, write_directly: bool, ) -> (CodeServerOrigin, Receiver) where M: ServerOutputMatcher, R: 'static + Send + std::fmt::Debug, { let stdout = child .stdout .take() .expect("child did not have a handle to stdout"); let stderr = child .stderr .take() .expect("child did not have a handle to stdout"); let (listen_tx, listen_rx) = tokio::sync::oneshot::channel(); // Handle stderr and stdout in a separate task. Initially scan lines looking // for the listening port. Afterwards, just scan and write out to the file. tokio::spawn(async move { let mut stdout_reader = BufReader::new(stdout).lines(); let mut stderr_reader = BufReader::new(stderr).lines(); let write_line = |line: &str| -> std::io::Result<()> { if let Some(mut f) = log_file.as_ref() { f.write_all(line.as_bytes())?; f.write_all(b"\n")?; } if write_directly { println!("{line}"); } else { trace!(plog, line); } Ok(()) }; loop { let line = tokio::select! { l = stderr_reader.next_line() => l, l = stdout_reader.next_line() => l, }; match line { Err(e) => { trace!(plog, "error reading from stdout/stderr: {}", e); return; } Ok(None) => break, Ok(Some(l)) => { write_line(&l).ok(); if let Some(listen_on) = M::match_line(&l) { trace!(plog, "parsed location: {:?}", listen_on); listen_tx.send(listen_on).ok(); break; } } } } loop { let line = tokio::select! { l = stderr_reader.next_line() => l, l = stdout_reader.next_line() => l, }; match line { Err(e) => { trace!(plog, "error reading from stdout/stderr: {}", e); break; } Ok(None) => break, Ok(Some(l)) => { write_line(&l).ok(); } } } }); let origin = CodeServerOrigin::New(Box::new(child)); (origin, listen_rx) } fn get_extensions_flag(extension_id: &str) -> String { format!("--install-extension={extension_id}") } /// A type that can be used to scan stdout from the VS Code server. Returns /// some other type that, in turn, is returned from starting the server. pub trait ServerOutputMatcher where R: Send, { fn match_line(line: &str) -> Option; } /// Parses a line like "Extension host agent listening on /tmp/foo.sock" struct SocketMatcher(); impl ServerOutputMatcher for SocketMatcher { fn match_line(line: &str) -> Option { parse_socket_from(line) } } /// Parses a line like "Extension host agent listening on 9000" pub struct PortMatcher(); impl ServerOutputMatcher for PortMatcher { fn match_line(line: &str) -> Option { parse_port_from(line) } } /// Parses a line like "Web UI available at http://localhost:9000/?tkn=..." pub struct WebUiMatcher(); impl ServerOutputMatcher for WebUiMatcher { fn match_line(line: &str) -> Option { WEB_UI_RE.captures(line).and_then(|cap| { cap.get(1) .and_then(|uri| reqwest::Url::parse(uri.as_str()).ok()) }) } } /// Does not do any parsing and just immediately returns an empty result. pub struct NoOpMatcher(); impl ServerOutputMatcher<()> for NoOpMatcher { fn match_line(_: &str) -> Option<()> { Some(()) } } fn parse_socket_from(text: &str) -> Option { LISTENING_PORT_RE .captures(text) .and_then(|cap| cap.get(1).map(|path| PathBuf::from(path.as_str()))) } fn parse_port_from(text: &str) -> Option { LISTENING_PORT_RE.captures(text).and_then(|cap| { cap.get(1) .and_then(|path| path.as_str().parse::().ok()) }) } pub fn print_listening(log: &log::Logger, tunnel_name: &str) { debug!( log, "{} is listening for incoming connections", QUALITYLESS_SERVER_NAME ); let home_dir = dirs::home_dir().unwrap_or_else(|| PathBuf::from("")); let current_dir = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("")); let dir = if home_dir == current_dir { PathBuf::from("") } else { current_dir }; let base_web_url = match EDITOR_WEB_URL { Some(u) => u, None => return, }; let mut addr = url::Url::parse(base_web_url).unwrap(); { let mut ps = addr.path_segments_mut().unwrap(); ps.push("tunnel"); ps.push(tunnel_name); for segment in &dir { let as_str = segment.to_string_lossy(); if !(as_str.len() == 1 && as_str.starts_with(std::path::MAIN_SEPARATOR)) { ps.push(as_str.as_ref()); } } } let message = &format!("\nOpen this link in your browser {addr}\n"); log.result(message); } pub async fn download_cli_into_cache( cache: &DownloadCache, release: &Release, update_service: &UpdateService, ) -> Result { let cache_name = format!( "{}-{}-{}", release.quality, release.commit, release.platform ); let cli_dir = cache .create(&cache_name, |target_dir| async move { let tmpdir = tempfile::tempdir().map_err(|e| wrap(e, "error creating temp download dir"))?; let response = update_service.get_download_stream(release).await?; let name = response.url_path_basename().unwrap(); let archive_path = tmpdir.path().join(name); http::download_into_file(&archive_path, SilentCopyProgress(), response).await?; unzip_downloaded_release(&archive_path, &target_dir, SilentCopyProgress())?; Ok(()) }) .await?; let cli = std::fs::read_dir(cli_dir) .map_err(|_| CodeError::CorruptDownload("could not read cli folder contents"))? .next(); match cli { Some(Ok(cli)) => Ok(cli.path()), _ => { let _ = cache.delete(&cache_name); Err(CodeError::CorruptDownload("cli directory is empty").into()) } } } #[cfg(target_os = "windows")] async fn get_should_use_breakaway_from_job() -> bool { let mut cmd = Command::new("cmd"); cmd.creation_flags( winapi::um::winbase::CREATE_NO_WINDOW | winapi::um::winbase::CREATE_BREAKAWAY_FROM_JOB, ); cmd.args(["/C", "echo ok"]).output().await.is_ok() }