forwarding: add built-in tunnel forwarding extension (#189874)

* forwarding: add built-in tunnel forwarding extension

- Support public/private ports, which accounts for most of the work in
  the CLI. Previously ports were only privat.
- Make the extension built-in. Ported from the remote-containers
  extension with some tweaks for privacy and durability.
- This also removes the opt-in flag, by not reimplementing it 😛

Fixes https://github.com/microsoft/vscode/issues/189677
Fixes https://github.com/microsoft/vscode/issues/189678

* fixup! comments

---------

Co-authored-by: Raymond Zhao <7199958+rzhao271@users.noreply.github.com>
This commit is contained in:
Connor Peet
2023-08-08 14:00:03 -07:00
committed by GitHub
parent 2159f75d4b
commit 71282c3d52
19 changed files with 664 additions and 35 deletions
+3 -3
View File
@@ -35,7 +35,7 @@ use crate::{
code_server::CodeServerArgs,
create_service_manager,
dev_tunnels::{self, DevTunnels},
forwarding, legal,
local_forwarding, legal,
paths::get_all_servers,
protocol, serve_stream,
shutdown_signal::ShutdownRequest,
@@ -444,7 +444,7 @@ pub async fn forward(
match acquire_singleton(&ctx.paths.forwarding_lockfile()).await {
Ok(SingletonConnection::Client(stream)) => {
debug!(ctx.log, "starting as client to singleton");
let r = forwarding::client(forwarding::SingletonClientArgs {
let r = local_forwarding::client(local_forwarding::SingletonClientArgs {
log: ctx.log.clone(),
shutdown: shutdown.clone(),
stream,
@@ -477,7 +477,7 @@ pub async fn forward(
.start_new_launcher_tunnel(None, true, &forward_args.ports)
.await?;
forwarding::server(ctx.log, tunnel, server, own_ports_rx, shutdown).await?;
local_forwarding::server(ctx.log, tunnel, server, own_ports_rx, shutdown).await?;
Ok(0)
}
+1 -1
View File
@@ -11,7 +11,7 @@ pub mod protocol;
pub mod shutdown_signal;
pub mod singleton_client;
pub mod singleton_server;
pub mod forwarding;
pub mod local_forwarding;
mod wsl_detect;
mod challenge;
+35 -4
View File
@@ -23,13 +23,15 @@ use std::time::Duration;
use tokio::sync::{mpsc, watch};
use tunnels::connections::{ForwardedPortConnection, RelayTunnelHost};
use tunnels::contracts::{
Tunnel, TunnelPort, TunnelRelayTunnelEndpoint, PORT_TOKEN, TUNNEL_PROTOCOL_AUTO,
Tunnel, TunnelAccessControl, TunnelPort, TunnelRelayTunnelEndpoint, PORT_TOKEN,
TUNNEL_ACCESS_SCOPES_CONNECT, TUNNEL_PROTOCOL_AUTO,
};
use tunnels::management::{
new_tunnel_management, HttpError, TunnelLocator, TunnelManagementClient, TunnelRequestOptions,
NO_REQUEST_OPTIONS,
};
use super::protocol::PortPrivacy;
use super::wsl_detect::is_wsl_installed;
static TUNNEL_COUNT_LIMIT_NAME: &str = "TunnelsPerUserPerLocation";
@@ -164,8 +166,12 @@ impl ActiveTunnel {
}
/// Forwards a port over TCP.
pub async fn add_port_tcp(&self, port_number: u16) -> Result<(), AnyError> {
self.manager.add_port_tcp(port_number).await?;
pub async fn add_port_tcp(
&self,
port_number: u16,
privacy: PortPrivacy,
) -> Result<(), AnyError> {
self.manager.add_port_tcp(port_number, privacy).await?;
Ok(())
}
@@ -866,13 +872,18 @@ impl ActiveTunnelManager {
/// Adds a port for TCP/IP forwarding.
#[allow(dead_code)] // todo: port forwarding
pub async fn add_port_tcp(&self, port_number: u16) -> Result<(), WrappedError> {
pub async fn add_port_tcp(
&self,
port_number: u16,
privacy: PortPrivacy,
) -> Result<(), WrappedError> {
self.relay
.lock()
.await
.add_port(&TunnelPort {
port_number,
protocol: Some(TUNNEL_PROTOCOL_AUTO.to_owned()),
access_control: Some(privacy_to_tunnel_acl(privacy)),
..Default::default()
})
.await
@@ -1081,6 +1092,26 @@ fn vec_eq_as_set(a: &[String], b: &[String]) -> bool {
true
}
fn privacy_to_tunnel_acl(privacy: PortPrivacy) -> TunnelAccessControl {
let mut acl = TunnelAccessControl { entries: vec![] };
if privacy == PortPrivacy::Public {
acl.entries
.push(tunnels::contracts::TunnelAccessControlEntry {
kind: tunnels::contracts::TunnelAccessControlEntryType::Anonymous,
provider: None,
is_inherited: false,
is_deny: false,
is_inverse: false,
organization: None,
subjects: vec![],
scopes: vec![TUNNEL_ACCESS_SCOPES_CONNECT.to_string()],
});
}
acl
}
#[cfg(test)]
mod test {
use super::*;
@@ -5,6 +5,7 @@
use std::{
collections::HashMap,
ops::{Index, IndexMut},
sync::{Arc, Mutex},
};
@@ -26,11 +27,52 @@ use super::{
protocol::{
self,
forward_singleton::{PortList, SetPortsResponse},
PortPrivacy,
},
shutdown_signal::ShutdownSignal,
};
type PortMap = HashMap<u16, u32>;
#[derive(Default, Clone)]
struct PortCount {
public: u32,
private: u32,
}
impl Index<PortPrivacy> for PortCount {
type Output = u32;
fn index(&self, privacy: PortPrivacy) -> &Self::Output {
match privacy {
PortPrivacy::Public => &self.public,
PortPrivacy::Private => &self.private,
}
}
}
impl IndexMut<PortPrivacy> for PortCount {
fn index_mut(&mut self, privacy: PortPrivacy) -> &mut Self::Output {
match privacy {
PortPrivacy::Public => &mut self.public,
PortPrivacy::Private => &mut self.private,
}
}
}
impl PortCount {
fn is_empty(&self) -> bool {
self.public == 0 && self.private == 0
}
fn primary_privacy(&self) -> PortPrivacy {
if self.public > 0 {
PortPrivacy::Public
} else {
PortPrivacy::Private
}
}
}
type PortMap = HashMap<u16, PortCount>;
/// The PortForwardingHandle is given out to multiple consumers to allow
/// them to set_ports that they want to be forwarded.
@@ -56,23 +98,25 @@ impl PortForwardingSender {
self.sender.lock().unwrap().send_modify(|v| {
for p in current.iter() {
if !ports.contains(p) {
match v.get(p) {
Some(1) => {
v.remove(p);
}
Some(n) => {
v.insert(*p, n - 1);
}
None => unreachable!("removed port not in map"),
let n = v.get_mut(&p.number).expect("expected port in map");
n[p.privacy] -= 1;
if n.is_empty() {
v.remove(&p.number);
}
}
}
for p in ports.iter() {
if !current.contains(p) {
match v.get(p) {
Some(n) => v.insert(*p, n + 1),
None => v.insert(*p, 1),
match v.get_mut(&p.number) {
Some(n) => {
n[p.privacy] += 1;
}
None => {
let mut pc = PortCount::default();
pc[p.privacy] += 1;
v.insert(p.number, pc);
}
};
}
}
@@ -116,23 +160,26 @@ impl PortForwardingReceiver {
/// Applies all changes from PortForwardingHandles to the tunnel.
pub async fn apply_to(&mut self, log: log::Logger, tunnel: Arc<ActiveTunnel>) {
let mut current = vec![];
let mut current: PortMap = HashMap::new();
while self.receiver.changed().await.is_ok() {
let next = self.receiver.borrow().keys().copied().collect::<Vec<_>>();
let next = self.receiver.borrow().clone();
for p in current.iter() {
if !next.contains(p) {
match tunnel.remove_port(*p).await {
Ok(_) => info!(log, "stopped forwarding port {}", p),
Err(e) => error!(log, "failed to stop forwarding port {}: {}", p, e),
for (port, count) in current.iter() {
let privacy = count.primary_privacy();
if !matches!(next.get(port), Some(n) if n.primary_privacy() == privacy) {
match tunnel.remove_port(*port).await {
Ok(_) => info!(log, "stopped forwarding port {} at {:?}", *port, privacy),
Err(e) => error!(log, "failed to stop forwarding port {}: {}", port, e),
}
}
}
for p in next.iter() {
if !current.contains(p) {
match tunnel.add_port_tcp(*p).await {
Ok(_) => info!(log, "forwarding port {}", p),
Err(e) => error!(log, "failed to forward port {}: {}", p, e),
for (port, count) in next.iter() {
let privacy = count.primary_privacy();
if !matches!(current.get(port), Some(n) if n.primary_privacy() == privacy) {
match tunnel.add_port_tcp(*port, privacy).await {
Ok(_) => info!(log, "forwarding port {} at {:?}", port, privacy),
Err(e) => error!(log, "failed to forward port {}: {}", port, e),
}
}
}
+2 -2
View File
@@ -12,7 +12,7 @@ use crate::{
util::errors::{AnyError, CannotForwardControlPort, ServerHasClosed},
};
use super::dev_tunnels::ActiveTunnel;
use super::{dev_tunnels::ActiveTunnel, protocol::PortPrivacy};
pub enum PortForwardingRec {
Forward(u16, oneshot::Sender<Result<String, AnyError>>),
@@ -87,7 +87,7 @@ impl PortForwardingProcessor {
}
if !self.forwarded.contains(&port) {
tunnel.add_port_tcp(port).await?;
tunnel.add_port_tcp(port, PortPrivacy::Private).await?;
self.forwarded.insert(port);
}
+16 -1
View File
@@ -214,12 +214,27 @@ pub struct ChallengeVerifyParams {
pub response: String,
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Copy, Clone, Debug)]
#[serde(rename_all = "lowercase")]
pub enum PortPrivacy {
Public,
Private,
}
pub mod forward_singleton {
use serde::{Deserialize, Serialize};
use super::PortPrivacy;
pub const METHOD_SET_PORTS: &str = "set_ports";
pub type PortList = Vec<u16>;
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct PortRec {
pub number: u16,
pub privacy: PortPrivacy,
}
pub type PortList = Vec<PortRec>;
#[derive(Serialize, Deserialize)]
pub struct SetPortsParams {