diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 2658ff454e9..6ae6d83ddd3 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -320,3 +320,19 @@ jobs: run: cargo fmt --check - name: Run rustfmt checks on lightning-tests run: cd lightning-tests && cargo fmt --check + tor-connect: + runs-on: ubuntu-latest + env: + TOOLCHAIN: 1.75.0 + steps: + - name: Checkout source code + uses: actions/checkout@v4 + - name: Install tor + run: | + sudo apt install -y tor + - name: Install Rust ${{ env.TOOLCHAIN }} toolchain + run: | + curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --profile=minimal --default-toolchain ${{ env.TOOLCHAIN }} + - name: Test tor connections using lightning-net-tokio + run: | + TOR_PROXY="127.0.0.1:9050" RUSTFLAGS="--cfg=tor" cargo test --verbose --color always -p lightning-net-tokio diff --git a/Cargo.toml b/Cargo.toml index a0895fe1641..1eb7b572d8b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,4 +67,5 @@ check-cfg = [ "cfg(require_route_graph_test)", "cfg(simple_close)", "cfg(peer_storage)", + "cfg(tor)", ] diff --git a/lightning-net-tokio/Cargo.toml b/lightning-net-tokio/Cargo.toml index 6c45f40e3c8..af4845b7397 100644 --- a/lightning-net-tokio/Cargo.toml +++ b/lightning-net-tokio/Cargo.toml @@ -19,7 +19,7 @@ rustdoc-args = ["--cfg", "docsrs"] [dependencies] bitcoin = "0.32.2" lightning = { version = "0.3.0", path = "../lightning" } -tokio = { version = "1.35", features = [ "rt", "sync", "net", "time" ] } +tokio = { version = "1.35", features = [ "rt", "sync", "net", "time", "io-util" ] } [dev-dependencies] tokio = { version = "1.35", features = [ "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] } diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 75886ebfb5f..0131929ddd6 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -37,6 +37,7 @@ use lightning::ln::msgs::SocketAddress; use lightning::ln::peer_handler; use lightning::ln::peer_handler::APeerManager; use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait; +use lightning::sign::EntropySource; use std::future::Future; use std::hash::Hash; @@ -51,6 +52,9 @@ use std::time::Duration; static ID_COUNTER: AtomicU64 = AtomicU64::new(0); +const CONNECT_OUTBOUND_TIMEOUT: u64 = 10; +const SOCKS5_CONNECT_OUTBOUND_TIMEOUT: u64 = 30; + // We only need to select over multiple futures in one place, and taking on the full `tokio/macros` // dependency tree in order to do so (which has broken our MSRV before) is excessive. Instead, we // define a trivial two- and three- select macro with the specific types we need and just use that. @@ -462,13 +466,157 @@ where PM::Target: APeerManager, { let connect_fut = async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }; - if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), connect_fut).await { + if let Ok(Ok(stream)) = + time::timeout(Duration::from_secs(CONNECT_OUTBOUND_TIMEOUT), connect_fut).await + { + Some(setup_outbound(peer_manager, their_node_id, stream)) + } else { + None + } +} + +/// Routes [`connect_outbound`] through Tor. Implements stream isolation for each connection +/// using a stream isolation parameter sourced from [`EntropySource::get_secure_random_bytes`]. +pub async fn tor_connect_outbound( + peer_manager: PM, their_node_id: PublicKey, addr: SocketAddress, tor_proxy_addr: SocketAddr, + entropy_source: ES, +) -> Option> +where + PM::Target: APeerManager, + ES::Target: EntropySource, +{ + let connect_fut = async { + tor_connect(addr, tor_proxy_addr, entropy_source).await.map(|s| s.into_std().unwrap()) + }; + if let Ok(Ok(stream)) = + time::timeout(Duration::from_secs(SOCKS5_CONNECT_OUTBOUND_TIMEOUT), connect_fut).await + { Some(setup_outbound(peer_manager, their_node_id, stream)) } else { None } } +async fn tor_connect( + addr: SocketAddress, tor_proxy_addr: SocketAddr, entropy_source: ES, +) -> Result +where + ES::Target: EntropySource, +{ + use std::io::{Cursor, Write}; + use tokio::io::AsyncReadExt; + + const IPV4_ADDR_LEN: usize = 4; + const IPV6_ADDR_LEN: usize = 16; + const HOSTNAME_MAX_LEN: usize = 255; + + // Constants defined in RFC 1928 and RFC 1929 + const VERSION: u8 = 5; + const NMETHODS: u8 = 1; + const USERNAME_PASSWORD_AUTH: u8 = 2; + const METHOD_SELECT_REPLY_LEN: usize = 2; + const USERNAME_PASSWORD_VERSION: u8 = 1; + const USERNAME_PASSWORD_REPLY_LEN: usize = 2; + const CMD_CONNECT: u8 = 1; + const RSV: u8 = 0; + const ATYP_IPV4: u8 = 1; + const ATYP_DOMAINNAME: u8 = 3; + const ATYP_IPV6: u8 = 4; + const SUCCESS: u8 = 0; + + // Tor extensions, see https://spec.torproject.org/socks-extensions.html for further details + const USERNAME: &[u8] = b"0"; + const USERNAME_LEN: usize = USERNAME.len(); + const PASSWORD_LEN: usize = 32; + + const USERNAME_PASSWORD_REQUEST_LEN: usize = + 1 /* VER */ + 1 /* ULEN */ + USERNAME_LEN + 1 /* PLEN */ + PASSWORD_LEN; + const SOCKS5_REQUEST_MAX_LEN: usize = 1 /* VER */ + 1 /* CMD */ + 1 /* RSV */ + 1 /* ATYP */ + + 1 /* HOSTNAME len */ + HOSTNAME_MAX_LEN /* HOSTNAME */ + 2 /* PORT */; + + let method_selection_request = [VERSION, NMETHODS, USERNAME_PASSWORD_AUTH]; + let mut tcp_stream = TcpStream::connect(&tor_proxy_addr).await.map_err(|_| ())?; + tokio::io::AsyncWriteExt::write_all(&mut tcp_stream, &method_selection_request) + .await + .map_err(|_| ())?; + + let mut method_selection_reply = [0u8; METHOD_SELECT_REPLY_LEN]; + tcp_stream.read_exact(&mut method_selection_reply).await.map_err(|_| ())?; + if method_selection_reply != [VERSION, USERNAME_PASSWORD_AUTH] { + return Err(()); + } + + let password: [u8; PASSWORD_LEN] = entropy_source.get_secure_random_bytes(); + let mut username_password_request = [0u8; USERNAME_PASSWORD_REQUEST_LEN]; + let mut writer = Cursor::new(&mut username_password_request[..]); + writer.write_all(&[USERNAME_PASSWORD_VERSION, USERNAME_LEN as u8]).map_err(|_| ())?; + writer.write_all(USERNAME).map_err(|_| ())?; + writer.write_all(&[PASSWORD_LEN as u8]).map_err(|_| ())?; + writer.write_all(&password).map_err(|_| ())?; + debug_assert_eq!(writer.position() as usize, USERNAME_PASSWORD_REQUEST_LEN); + tokio::io::AsyncWriteExt::write_all(&mut tcp_stream, &username_password_request) + .await + .map_err(|_| ())?; + + let mut username_password_reply = [0u8; USERNAME_PASSWORD_REPLY_LEN]; + tcp_stream.read_exact(&mut username_password_reply).await.map_err(|_| ())?; + if username_password_reply != [USERNAME_PASSWORD_VERSION, SUCCESS] { + return Err(()); + } + + let mut socks5_request = [0u8; SOCKS5_REQUEST_MAX_LEN]; + let mut writer = Cursor::new(&mut socks5_request[..]); + writer.write_all(&[VERSION, CMD_CONNECT, RSV]).map_err(|_| ())?; + match addr { + SocketAddress::TcpIpV4 { addr, port } => { + writer.write_all(&[ATYP_IPV4]).map_err(|_| ())?; + writer.write_all(&addr).map_err(|_| ())?; + writer.write_all(&port.to_be_bytes()).map_err(|_| ())?; + }, + SocketAddress::TcpIpV6 { addr, port } => { + writer.write_all(&[ATYP_IPV6]).map_err(|_| ())?; + writer.write_all(&addr).map_err(|_| ())?; + writer.write_all(&port.to_be_bytes()).map_err(|_| ())?; + }, + ref onion_v3 @ SocketAddress::OnionV3 { port, .. } => { + let onion_v3_url = onion_v3.to_string(); + let hostname = onion_v3_url.split_once(':').ok_or(())?.0.as_bytes(); + writer.write_all(&[ATYP_DOMAINNAME, hostname.len() as u8]).map_err(|_| ())?; + writer.write_all(hostname).map_err(|_| ())?; + writer.write_all(&port.to_be_bytes()).map_err(|_| ())?; + }, + SocketAddress::Hostname { hostname, port } => { + writer.write_all(&[ATYP_DOMAINNAME, hostname.len()]).map_err(|_| ())?; + writer.write_all(hostname.as_bytes()).map_err(|_| ())?; + writer.write_all(&port.to_be_bytes()).map_err(|_| ())?; + }, + SocketAddress::OnionV2 { .. } => return Err(()), + }; + let pos = writer.position() as usize; + tokio::io::AsyncWriteExt::write_all(&mut tcp_stream, &socks5_request[..pos]) + .await + .map_err(|_| ())?; + + let mut reply_buffer = [0u8; 4]; + tcp_stream.read_exact(&mut reply_buffer).await.map_err(|_| ())?; + if reply_buffer[..3] != [VERSION, SUCCESS, RSV] { + return Err(()); + } + match reply_buffer[3] { + ATYP_IPV4 => tcp_stream.read_exact(&mut [0u8; IPV4_ADDR_LEN]).await.map_err(|_| ())?, + ATYP_DOMAINNAME => { + let hostname_len = tcp_stream.read_u8().await.map_err(|_| ())? as usize; + let mut hostname_buffer = [0u8; HOSTNAME_MAX_LEN]; + tcp_stream.read_exact(&mut hostname_buffer[..hostname_len]).await.map_err(|_| ())? + }, + ATYP_IPV6 => tcp_stream.read_exact(&mut [0u8; IPV6_ADDR_LEN]).await.map_err(|_| ())?, + _ => return Err(()), + }; + tcp_stream.read_u16().await.map_err(|_| ())?; + + Ok(tcp_stream) +} + const SOCK_WAKER_VTABLE: task::RawWakerVTable = task::RawWakerVTable::new( clone_socket_waker, wake_socket_waker, @@ -941,4 +1089,61 @@ mod tests { async fn unthreaded_race_disconnect_accept() { race_disconnect_accept().await; } + + #[cfg(tor)] + #[tokio::test] + async fn test_tor_connect() { + use super::tor_connect; + use lightning::sign::EntropySource; + use std::net::SocketAddr; + + // Set TOR_PROXY=127.0.0.1:9050 + let tor_proxy_addr: SocketAddr = std::env!("TOR_PROXY").parse().unwrap(); + + struct TestEntropySource; + + impl EntropySource for TestEntropySource { + fn get_secure_random_bytes(&self) -> [u8; 32] { + [0xffu8; 32] + } + } + + let entropy_source = TestEntropySource; + + // Success cases + + for addr_str in [ + // google.com + "142.250.189.196:80", + // google.com + "[2607:f8b0:4005:813::2004]:80", + // torproject.org + "torproject.org:80", + // torproject.org + "2gzyxa5ihm7nsggfxnu52rck2vv4rvmdlkiu3zzui5du4xyclen53wid.onion:80", + ] { + let addr: SocketAddress = addr_str.parse().unwrap(); + let tcp_stream = tor_connect(addr, tor_proxy_addr, &entropy_source).await.unwrap(); + assert_eq!( + tcp_stream.try_read(&mut [0u8; 1]).unwrap_err().kind(), + std::io::ErrorKind::WouldBlock + ); + } + + // Failure cases + + for addr_str in [ + // google.com, with some invalid port + "142.250.189.196:1234", + // google.com, with some invalid port + "[2607:f8b0:4005:813::2004]:1234", + // torproject.org, with some invalid port + "torproject.org:1234", + // torproject.org, with a typo + "3gzyxa5ihm7nsggfxnu52rck2vv4rvmdlkiu3zzui5du4xyclen53wid.onion:80", + ] { + let addr: SocketAddress = addr_str.parse().unwrap(); + assert!(tor_connect(addr, tor_proxy_addr, &entropy_source).await.is_err()); + } + } }