cli: propagate server closing (#192824)

Previously this was never needed since the connection was only used for
the ext host, which never closed.

Part 1 of fixing #192521
This commit is contained in:
Connor Peet
2023-09-11 15:45:39 -07:00
committed by GitHub
parent 17015750a3
commit 35425d369a
3 changed files with 31 additions and 6 deletions

View File

@@ -16,6 +16,7 @@ use serde::{Deserialize, Serialize};
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
pub enum ClientRequestMethod<'a> { pub enum ClientRequestMethod<'a> {
servermsg(RefServerMessageParams<'a>), servermsg(RefServerMessageParams<'a>),
serverclose(ServerClosedParams),
serverlog(ServerLog<'a>), serverlog(ServerLog<'a>),
makehttpreq(HttpRequestParams<'a>), makehttpreq(HttpRequestParams<'a>),
version(VersionResponse), version(VersionResponse),
@@ -89,6 +90,11 @@ pub struct ServerMessageParams {
pub body: Vec<u8>, pub body: Vec<u8>,
} }
#[derive(Serialize, Debug)]
pub struct ServerClosedParams {
pub i: u16,
}
#[derive(Serialize, Debug)] #[derive(Serialize, Debug)]
pub struct RefServerMessageParams<'a> { pub struct RefServerMessageParams<'a> {
pub i: u16, pub i: u16,

View File

@@ -32,6 +32,7 @@ impl ServerBridge {
match read.read(&mut read_buf).await { match read.read(&mut read_buf).await {
Err(_) => return, Err(_) => return,
Ok(0) => { Ok(0) => {
let _ = target.server_closed().await;
return; // EOF return; // EOF
} }
Ok(s) => { Ok(s) => {

View File

@@ -9,7 +9,7 @@ use tokio::sync::mpsc;
use crate::msgpack_rpc::MsgPackCaller; use crate::msgpack_rpc::MsgPackCaller;
use super::{ use super::{
protocol::{ClientRequestMethod, RefServerMessageParams, ToClientRequest}, protocol::{ClientRequestMethod, RefServerMessageParams, ServerClosedParams, ToClientRequest},
server_multiplexer::ServerMultiplexer, server_multiplexer::ServerMultiplexer,
}; };
@@ -81,25 +81,43 @@ impl ServerMessageSink {
} }
} }
pub async fn server_closed(&mut self) -> Result<(), mpsc::error::SendError<SocketSignal>> {
self.server_message_or_closed(None).await
}
pub async fn server_message( pub async fn server_message(
&mut self, &mut self,
body: &[u8], body: &[u8],
) -> Result<(), mpsc::error::SendError<SocketSignal>> { ) -> Result<(), mpsc::error::SendError<SocketSignal>> {
let id = self.id; self.server_message_or_closed(Some(body)).await
}
async fn server_message_or_closed(
&mut self,
body: Option<&[u8]>,
) -> Result<(), mpsc::error::SendError<SocketSignal>> {
let i = self.id;
let mut tx = self.tx.take().unwrap(); let mut tx = self.tx.take().unwrap();
let body = self.get_server_msg_content(body); let msg = body
let msg = RefServerMessageParams { i: id, body }; .map(|b| self.get_server_msg_content(b))
.map(|body| RefServerMessageParams { i, body });
let r = match &mut tx { let r = match &mut tx {
ServerMessageDestination::Channel(tx) => { ServerMessageDestination::Channel(tx) => {
tx.send(SocketSignal::from_message(&ToClientRequest { tx.send(SocketSignal::from_message(&ToClientRequest {
id: None, id: None,
params: ClientRequestMethod::servermsg(msg), params: match msg {
Some(msg) => ClientRequestMethod::servermsg(msg),
None => ClientRequestMethod::serverclose(ServerClosedParams { i }),
},
})) }))
.await .await
} }
ServerMessageDestination::Rpc(caller) => { ServerMessageDestination::Rpc(caller) => {
caller.notify("servermsg", msg); match msg {
Some(msg) => caller.notify("servermsg", msg),
None => caller.notify("serverclose", ServerClosedParams { i }),
};
Ok(()) Ok(())
} }
}; };