cli: add acquire_cli (#179837)

* cli: add acquire_cli

As given in my draft document, pipes a CLI of the given platform to the
specified process, for example:

```js
const cmd = await rpc.call('acquire_cli', {
	command: 'node',
	args: [
		'-e',
		'process.stdin.pipe(fs.createWriteStream("c:/users/conno/downloads/hello-cli"))',
	],
	platform: Platform.LinuxX64,
	quality: 'insider',
});
```

It genericizes caching so that the CLI is also cached on the host, just
like servers.

* fix bug
This commit is contained in:
Connor Peet
2023-04-13 11:18:48 -07:00
committed by GitHub
parent 24c44070ae
commit f743297aa1
15 changed files with 489 additions and 405 deletions

View File

@@ -3,7 +3,7 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
use crate::async_pipe::get_socket_rw_stream;
use crate::constants::CONTROL_PORT;
use crate::constants::{CONTROL_PORT, PRODUCT_NAME_LONG};
use crate::log;
use crate::msgpack_rpc::U32PrefixedCodec;
use crate::rpc::{MaybeSync, RpcBuilder, RpcDispatcher, Serialization};
@@ -11,7 +11,7 @@ use crate::self_update::SelfUpdate;
use crate::state::LauncherPaths;
use crate::tunnels::protocol::HttpRequestParams;
use crate::tunnels::socket_signal::CloseReason;
use crate::update_service::{Platform, UpdateService};
use crate::update_service::{Platform, Release, TargetKind, UpdateService};
use crate::util::errors::{
wrap, AnyError, CodeError, InvalidRpcDataError, MismatchedLaunchModeError,
NoAttachedServerError,
@@ -23,6 +23,8 @@ use crate::util::io::SilentCopyProgress;
use crate::util::is_integrated_cli;
use crate::util::sync::{new_barrier, Barrier};
use futures::stream::FuturesUnordered;
use futures::FutureExt;
use opentelemetry::trace::SpanKind;
use opentelemetry::KeyValue;
use std::collections::HashMap;
@@ -37,16 +39,17 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, D
use tokio::sync::{mpsc, Mutex};
use super::code_server::{
AnyCodeServer, CodeServerArgs, ServerBuilder, ServerParamsRaw, SocketCodeServer,
download_cli_into_cache, AnyCodeServer, CodeServerArgs, ServerBuilder, ServerParamsRaw,
SocketCodeServer,
};
use super::dev_tunnels::ActiveTunnel;
use super::paths::prune_stopped_servers;
use super::port_forwarder::{PortForwarding, PortForwardingProcessor};
use super::protocol::{
CallServerHttpParams, CallServerHttpResult, ClientRequestMethod, EmptyObject, ForwardParams,
ForwardResult, GetHostnameResponse, HttpBodyParams, HttpHeadersParams, ServeParams, ServerLog,
ServerMessageParams, SpawnParams, SpawnResult, ToClientRequest, UnforwardParams, UpdateParams,
UpdateResult, VersionParams,
AcquireCliParams, CallServerHttpParams, CallServerHttpResult, ClientRequestMethod, EmptyObject,
ForwardParams, ForwardResult, GetHostnameResponse, HttpBodyParams, HttpHeadersParams,
ServeParams, ServerLog, ServerMessageParams, SpawnParams, SpawnResult, ToClientRequest,
UnforwardParams, UpdateParams, UpdateResult, VersionParams,
};
use super::server_bridge::ServerBridge;
use super::server_multiplexer::ServerMultiplexer;
@@ -284,8 +287,18 @@ async fn process_socket(
rpc.register_async("unforward", |p: UnforwardParams, c| async move {
handle_unforward(&c.log, &c.port_forwarding, p).await
});
rpc.register_duplex("spawn", |stream, p: SpawnParams, c| async move {
handle_spawn(&c.log, stream, p).await
rpc.register_async("acquire_cli", |p: AcquireCliParams, c| async move {
handle_acquire_cli(&c.launcher_paths, &c.http, &c.log, p).await
});
rpc.register_duplex("spawn", 3, |mut streams, p: SpawnParams, c| async move {
handle_spawn(
&c.log,
p,
Some(streams.remove(0)),
Some(streams.remove(0)),
Some(streams.remove(0)),
)
.await
});
rpc.register_sync("httpheaders", |p: HttpHeadersParams, c| {
if let Some(req) = c.http_requests.lock().unwrap().get(&p.req_id) {
@@ -507,7 +520,7 @@ async fn handle_serve(
Some(AnyCodeServer::Socket(s)) => s,
Some(_) => return Err(AnyError::from(MismatchedLaunchModeError())),
None => {
$sb.setup(None).await?;
$sb.setup().await?;
$sb.listen_on_default_socket().await?
}
}
@@ -734,82 +747,106 @@ async fn handle_call_server_http(
})
}
async fn handle_spawn(
async fn handle_acquire_cli(
paths: &LauncherPaths,
http: &Arc<FallbackSimpleHttp>,
log: &log::Logger,
mut duplex: DuplexStream,
params: SpawnParams,
params: AcquireCliParams,
) -> Result<SpawnResult, AnyError> {
let update_service = UpdateService::new(log.clone(), http.clone());
let release = match params.commit_id {
Some(commit) => Release {
name: format!("{} CLI", PRODUCT_NAME_LONG),
commit,
platform: params.platform,
quality: params.quality,
target: TargetKind::Cli,
},
None => {
update_service
.get_latest_commit(params.platform, TargetKind::Cli, params.quality)
.await?
}
};
let cli = download_cli_into_cache(&paths.cli_cache, &release, &update_service).await?;
let file = tokio::fs::File::open(cli)
.await
.map_err(|e| wrap(e, "error opening cli file"))?;
handle_spawn::<_, DuplexStream>(log, params.spawn, Some(file), None, None).await
}
async fn handle_spawn<Stdin, StdoutAndErr>(
log: &log::Logger,
params: SpawnParams,
stdin: Option<Stdin>,
stdout: Option<StdoutAndErr>,
stderr: Option<StdoutAndErr>,
) -> Result<SpawnResult, AnyError>
where
Stdin: AsyncRead + Unpin + Send,
StdoutAndErr: AsyncWrite + Unpin + Send,
{
debug!(
log,
"requested to spawn {} with args {:?}", params.command, params.args
);
let mut p = tokio::process::Command::new(&params.command)
.args(&params.args)
.envs(&params.env)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(CodeError::ProcessSpawnFailed)?;
let mut stdout = p.stdout.take().unwrap();
let mut stderr = p.stderr.take().unwrap();
let mut stdin = p.stdin.take().unwrap();
let (tx, mut rx) = mpsc::channel(4);
macro_rules! copy_stream_to {
($target:expr) => {
let tx = tx.clone();
tokio::spawn(async move {
let mut buf = vec![0; 4096];
loop {
let n = match $target.read(&mut buf).await {
Ok(0) | Err(_) => return,
Ok(n) => n,
};
if !tx.send(buf[..n].to_vec()).await.is_ok() {
return;
}
}
});
macro_rules! pipe_if_some {
($e: expr) => {
if $e.is_some() {
Stdio::piped()
} else {
Stdio::null()
}
};
}
copy_stream_to!(stdout);
copy_stream_to!(stderr);
let mut p = tokio::process::Command::new(&params.command)
.args(&params.args)
.envs(&params.env)
.stdin(pipe_if_some!(stdin))
.stdout(pipe_if_some!(stdout))
.stderr(pipe_if_some!(stderr))
.spawn()
.map_err(CodeError::ProcessSpawnFailed)?;
let futs = FuturesUnordered::new();
if let (Some(mut a), Some(mut b)) = (p.stdout.take(), stdout) {
futs.push(async move { tokio::io::copy(&mut a, &mut b).await }.boxed());
}
if let (Some(mut a), Some(mut b)) = (p.stderr.take(), stderr) {
futs.push(async move { tokio::io::copy(&mut a, &mut b).await }.boxed());
}
if let (Some(mut b), Some(mut a)) = (p.stdin.take(), stdin) {
futs.push(async move { tokio::io::copy(&mut a, &mut b).await }.boxed());
}
let mut stdin_buf = vec![0; 4096];
let closed = p.wait();
pin!(closed);
loop {
tokio::select! {
Ok(n) = duplex.read(&mut stdin_buf) => {
let _ = stdin.write_all(&stdin_buf[..n]).await;
},
Some(m) = rx.recv() => {
let _ = duplex.write_all(&m).await;
},
r = &mut closed => {
let r = match r {
Ok(e) => SpawnResult {
message: e.to_string(),
exit_code: e.code().unwrap_or(-1),
},
Err(e) => SpawnResult {
message: e.to_string(),
exit_code: -1,
},
};
let r = tokio::select! {
_ = futures::future::join_all(futs) => closed.await,
r = &mut closed => r
};
debug!(
log,
"spawned command {} exited with code {}", params.command, r.exit_code
);
let r = match r {
Ok(e) => SpawnResult {
message: e.to_string(),
exit_code: e.code().unwrap_or(-1),
},
Err(e) => SpawnResult {
message: e.to_string(),
exit_code: -1,
},
};
return Ok(r)
},
}
}
debug!(
log,
"spawned command {} exited with code {}", params.command, r.exit_code
);
Ok(r)
}