From 35425d369ada2ba625304f9adb626332f13d1753 Mon Sep 17 00:00:00 2001 From: Connor Peet Date: Mon, 11 Sep 2023 15:45:39 -0700 Subject: [PATCH] cli: propagate server closing (#192824) Previously this was never needed since the connection was only used for the ext host, which never closed. Part 1 of fixing #192521 --- cli/src/tunnels/protocol.rs | 6 ++++++ cli/src/tunnels/server_bridge.rs | 1 + cli/src/tunnels/socket_signal.rs | 30 ++++++++++++++++++++++++------ 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/cli/src/tunnels/protocol.rs b/cli/src/tunnels/protocol.rs index 316e3672ba6..5665714fed9 100644 --- a/cli/src/tunnels/protocol.rs +++ b/cli/src/tunnels/protocol.rs @@ -16,6 +16,7 @@ use serde::{Deserialize, Serialize}; #[allow(non_camel_case_types)] pub enum ClientRequestMethod<'a> { servermsg(RefServerMessageParams<'a>), + serverclose(ServerClosedParams), serverlog(ServerLog<'a>), makehttpreq(HttpRequestParams<'a>), version(VersionResponse), @@ -89,6 +90,11 @@ pub struct ServerMessageParams { pub body: Vec, } +#[derive(Serialize, Debug)] +pub struct ServerClosedParams { + pub i: u16, +} + #[derive(Serialize, Debug)] pub struct RefServerMessageParams<'a> { pub i: u16, diff --git a/cli/src/tunnels/server_bridge.rs b/cli/src/tunnels/server_bridge.rs index 50dde8e7303..f1a358279af 100644 --- a/cli/src/tunnels/server_bridge.rs +++ b/cli/src/tunnels/server_bridge.rs @@ -32,6 +32,7 @@ impl ServerBridge { match read.read(&mut read_buf).await { Err(_) => return, Ok(0) => { + let _ = target.server_closed().await; return; // EOF } Ok(s) => { diff --git a/cli/src/tunnels/socket_signal.rs b/cli/src/tunnels/socket_signal.rs index 53e6cd51567..9036c6ae3f9 100644 --- a/cli/src/tunnels/socket_signal.rs +++ b/cli/src/tunnels/socket_signal.rs @@ -9,7 +9,7 @@ use tokio::sync::mpsc; use crate::msgpack_rpc::MsgPackCaller; use super::{ - protocol::{ClientRequestMethod, RefServerMessageParams, ToClientRequest}, + protocol::{ClientRequestMethod, RefServerMessageParams, ServerClosedParams, ToClientRequest}, server_multiplexer::ServerMultiplexer, }; @@ -81,25 +81,43 @@ impl ServerMessageSink { } } + pub async fn server_closed(&mut self) -> Result<(), mpsc::error::SendError> { + self.server_message_or_closed(None).await + } + pub async fn server_message( &mut self, body: &[u8], ) -> Result<(), mpsc::error::SendError> { - let id = self.id; + self.server_message_or_closed(Some(body)).await + } + + async fn server_message_or_closed( + &mut self, + body: Option<&[u8]>, + ) -> Result<(), mpsc::error::SendError> { + let i = self.id; let mut tx = self.tx.take().unwrap(); - let body = self.get_server_msg_content(body); - let msg = RefServerMessageParams { i: id, body }; + let msg = body + .map(|b| self.get_server_msg_content(b)) + .map(|body| RefServerMessageParams { i, body }); let r = match &mut tx { ServerMessageDestination::Channel(tx) => { tx.send(SocketSignal::from_message(&ToClientRequest { id: None, - params: ClientRequestMethod::servermsg(msg), + params: match msg { + Some(msg) => ClientRequestMethod::servermsg(msg), + None => ClientRequestMethod::serverclose(ServerClosedParams { i }), + }, })) .await } ServerMessageDestination::Rpc(caller) => { - caller.notify("servermsg", msg); + match msg { + Some(msg) => caller.notify("servermsg", msg), + None => caller.notify("serverclose", ServerClosedParams { i }), + }; Ok(()) } };