fix: don't wait for stdin pump to end before propagating process exit (#196306)

This commit is contained in:
Connor Peet
2023-10-23 10:59:05 -07:00
committed by GitHub
parent 46566fa29d
commit ad29091f39

View File

@@ -1153,18 +1153,19 @@ where
let mut p = p.spawn().map_err(CodeError::ProcessSpawnFailed)?;
let futs = FuturesUnordered::new();
let block_futs = FuturesUnordered::new();
let poll_futs = FuturesUnordered::new();
if let (Some(mut a), Some(mut b)) = (p.stdout.take(), stdout) {
futs.push(async move { tokio::io::copy(&mut a, &mut b).await }.boxed());
block_futs.push(async move { tokio::io::copy(&mut a, &mut b).await }.boxed());
}
if let (Some(mut a), Some(mut b)) = (p.stderr.take(), stderr) {
futs.push(async move { tokio::io::copy(&mut a, &mut b).await }.boxed());
block_futs.push(async move { tokio::io::copy(&mut a, &mut b).await }.boxed());
}
if let (Some(mut b), Some(mut a)) = (p.stdin.take(), stdin) {
futs.push(async move { tokio::io::copy(&mut a, &mut b).await }.boxed());
poll_futs.push(async move { tokio::io::copy(&mut a, &mut b).await }.boxed());
}
wait_for_process_exit(log, &params.command, p, futs).await
wait_for_process_exit(log, &params.command, p, block_futs, poll_futs).await
}
async fn handle_spawn_cli(
@@ -1212,23 +1213,42 @@ async fn handle_spawn_cli(
}
debug!(log, "cli authenticated, attaching stdio");
let futs = FuturesUnordered::new();
futs.push(async move { tokio::io::copy(&mut protocol_in, &mut stdin).await }.boxed());
futs.push(async move { tokio::io::copy(&mut stderr, &mut protocol_out).await }.boxed());
futs.push(async move { log_pump.await.unwrap() }.boxed());
let block_futs = FuturesUnordered::new();
let poll_futs = FuturesUnordered::new();
poll_futs.push(async move { tokio::io::copy(&mut protocol_in, &mut stdin).await }.boxed());
block_futs.push(async move { tokio::io::copy(&mut stderr, &mut protocol_out).await }.boxed());
block_futs.push(async move { log_pump.await.unwrap() }.boxed());
wait_for_process_exit(log, &params.command, p, futs).await
wait_for_process_exit(log, &params.command, p, block_futs, poll_futs).await
}
type TokioCopyFuture = dyn futures::Future<Output = Result<u64, std::io::Error>> + Send;
async fn get_joined_result(
mut process: tokio::process::Child,
block_futs: FuturesUnordered<std::pin::Pin<Box<TokioCopyFuture>>>,
) -> Result<std::process::ExitStatus, std::io::Error> {
let (_, r) = tokio::join!(futures::future::join_all(block_futs), process.wait());
r
}
/// Wait for the process to exit and sends the spawn result. Waits until the
/// `block_futs` and the process have exited, and polls the `poll_futs` while
/// doing so.
async fn wait_for_process_exit(
log: &log::Logger,
command: &str,
mut process: tokio::process::Child,
futs: FuturesUnordered<std::pin::Pin<Box<TokioCopyFuture>>>,
process: tokio::process::Child,
block_futs: FuturesUnordered<std::pin::Pin<Box<TokioCopyFuture>>>,
poll_futs: FuturesUnordered<std::pin::Pin<Box<TokioCopyFuture>>>,
) -> Result<SpawnResult, AnyError> {
let (_, r) = tokio::join!(futures::future::join_all(futs), process.wait());
let joined = get_joined_result(process, block_futs);
pin!(joined);
let r = tokio::select! {
_ = futures::future::join_all(poll_futs) => joined.await,
r = &mut joined => r,
};
let r = match r {
Ok(e) => SpawnResult {