Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -320,3 +320,19 @@ jobs:
run: cargo fmt --check
- name: Run rustfmt checks on lightning-tests
run: cd lightning-tests && cargo fmt --check
tor-connect:
runs-on: ubuntu-latest
env:
TOOLCHAIN: 1.75.0
steps:
- name: Checkout source code
uses: actions/checkout@v4
- name: Install tor
run: |
sudo apt install -y tor
- name: Install Rust ${{ env.TOOLCHAIN }} toolchain
run: |
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --profile=minimal --default-toolchain ${{ env.TOOLCHAIN }}
- name: Test tor connections using lightning-net-tokio
run: |
TOR_PROXY="127.0.0.1:9050" RUSTFLAGS="--cfg=tor" cargo test --verbose --color always -p lightning-net-tokio
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,5 @@ check-cfg = [
"cfg(require_route_graph_test)",
"cfg(simple_close)",
"cfg(peer_storage)",
"cfg(tor)",
]
2 changes: 1 addition & 1 deletion lightning-net-tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ rustdoc-args = ["--cfg", "docsrs"]
[dependencies]
bitcoin = "0.32.2"
lightning = { version = "0.3.0", path = "../lightning" }
tokio = { version = "1.35", features = [ "rt", "sync", "net", "time" ] }
tokio = { version = "1.35", features = [ "rt", "sync", "net", "time", "io-util" ] }

[dev-dependencies]
tokio = { version = "1.35", features = [ "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] }
Expand Down
207 changes: 206 additions & 1 deletion lightning-net-tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use lightning::ln::msgs::SocketAddress;
use lightning::ln::peer_handler;
use lightning::ln::peer_handler::APeerManager;
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
use lightning::sign::EntropySource;

use std::future::Future;
use std::hash::Hash;
Expand All @@ -51,6 +52,9 @@ use std::time::Duration;

static ID_COUNTER: AtomicU64 = AtomicU64::new(0);

const CONNECT_OUTBOUND_TIMEOUT: u64 = 10;
const SOCKS5_CONNECT_OUTBOUND_TIMEOUT: u64 = 30;

// We only need to select over multiple futures in one place, and taking on the full `tokio/macros`
// dependency tree in order to do so (which has broken our MSRV before) is excessive. Instead, we
// define a trivial two- and three- select macro with the specific types we need and just use that.
Expand Down Expand Up @@ -462,13 +466,157 @@ where
PM::Target: APeerManager<Descriptor = SocketDescriptor>,
{
let connect_fut = async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) };
if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), connect_fut).await {
if let Ok(Ok(stream)) =
time::timeout(Duration::from_secs(CONNECT_OUTBOUND_TIMEOUT), connect_fut).await
{
Some(setup_outbound(peer_manager, their_node_id, stream))
} else {
None
}
}

/// Routes [`connect_outbound`] through Tor. Implements stream isolation for each connection
/// using a stream isolation parameter sourced from [`EntropySource::get_secure_random_bytes`].
pub async fn tor_connect_outbound<PM: Deref + 'static + Send + Sync + Clone, ES: Deref>(
peer_manager: PM, their_node_id: PublicKey, addr: SocketAddress, tor_proxy_addr: SocketAddr,
entropy_source: ES,
) -> Option<impl std::future::Future<Output = ()>>
where
PM::Target: APeerManager<Descriptor = SocketDescriptor>,
ES::Target: EntropySource,
{
let connect_fut = async {
tor_connect(addr, tor_proxy_addr, entropy_source).await.map(|s| s.into_std().unwrap())
};
if let Ok(Ok(stream)) =
time::timeout(Duration::from_secs(SOCKS5_CONNECT_OUTBOUND_TIMEOUT), connect_fut).await
{
Some(setup_outbound(peer_manager, their_node_id, stream))
} else {
None
}
}

async fn tor_connect<ES: Deref>(
addr: SocketAddress, tor_proxy_addr: SocketAddr, entropy_source: ES,
) -> Result<TcpStream, ()>
where
ES::Target: EntropySource,
{
use std::io::{Cursor, Write};
use tokio::io::AsyncReadExt;

const IPV4_ADDR_LEN: usize = 4;
const IPV6_ADDR_LEN: usize = 16;
const HOSTNAME_MAX_LEN: usize = 255;

// Constants defined in RFC 1928 and RFC 1929
const VERSION: u8 = 5;
const NMETHODS: u8 = 1;
const USERNAME_PASSWORD_AUTH: u8 = 2;
const METHOD_SELECT_REPLY_LEN: usize = 2;
const USERNAME_PASSWORD_VERSION: u8 = 1;
const USERNAME_PASSWORD_REPLY_LEN: usize = 2;
const CMD_CONNECT: u8 = 1;
const RSV: u8 = 0;
const ATYP_IPV4: u8 = 1;
const ATYP_DOMAINNAME: u8 = 3;
const ATYP_IPV6: u8 = 4;
const SUCCESS: u8 = 0;

// Tor extensions, see https://spec.torproject.org/socks-extensions.html for further details
const USERNAME: &[u8] = b"<torS0X>0";
const USERNAME_LEN: usize = USERNAME.len();
const PASSWORD_LEN: usize = 32;

const USERNAME_PASSWORD_REQUEST_LEN: usize =
1 /* VER */ + 1 /* ULEN */ + USERNAME_LEN + 1 /* PLEN */ + PASSWORD_LEN;
const SOCKS5_REQUEST_MAX_LEN: usize = 1 /* VER */ + 1 /* CMD */ + 1 /* RSV */ + 1 /* ATYP */
+ 1 /* HOSTNAME len */ + HOSTNAME_MAX_LEN /* HOSTNAME */ + 2 /* PORT */;

let method_selection_request = [VERSION, NMETHODS, USERNAME_PASSWORD_AUTH];
let mut tcp_stream = TcpStream::connect(&tor_proxy_addr).await.map_err(|_| ())?;
tokio::io::AsyncWriteExt::write_all(&mut tcp_stream, &method_selection_request)
.await
.map_err(|_| ())?;

let mut method_selection_reply = [0u8; METHOD_SELECT_REPLY_LEN];
tcp_stream.read_exact(&mut method_selection_reply).await.map_err(|_| ())?;
if method_selection_reply != [VERSION, USERNAME_PASSWORD_AUTH] {
return Err(());
}

let password: [u8; PASSWORD_LEN] = entropy_source.get_secure_random_bytes();
let mut username_password_request = [0u8; USERNAME_PASSWORD_REQUEST_LEN];
let mut writer = Cursor::new(&mut username_password_request[..]);
writer.write_all(&[USERNAME_PASSWORD_VERSION, USERNAME_LEN as u8]).map_err(|_| ())?;
writer.write_all(USERNAME).map_err(|_| ())?;
writer.write_all(&[PASSWORD_LEN as u8]).map_err(|_| ())?;
writer.write_all(&password).map_err(|_| ())?;
debug_assert_eq!(writer.position() as usize, USERNAME_PASSWORD_REQUEST_LEN);
tokio::io::AsyncWriteExt::write_all(&mut tcp_stream, &username_password_request)
.await
.map_err(|_| ())?;

let mut username_password_reply = [0u8; USERNAME_PASSWORD_REPLY_LEN];
tcp_stream.read_exact(&mut username_password_reply).await.map_err(|_| ())?;
if username_password_reply != [USERNAME_PASSWORD_VERSION, SUCCESS] {
return Err(());
}

let mut socks5_request = [0u8; SOCKS5_REQUEST_MAX_LEN];
let mut writer = Cursor::new(&mut socks5_request[..]);
writer.write_all(&[VERSION, CMD_CONNECT, RSV]).map_err(|_| ())?;
match addr {
SocketAddress::TcpIpV4 { addr, port } => {
writer.write_all(&[ATYP_IPV4]).map_err(|_| ())?;
writer.write_all(&addr).map_err(|_| ())?;
writer.write_all(&port.to_be_bytes()).map_err(|_| ())?;
},
SocketAddress::TcpIpV6 { addr, port } => {
writer.write_all(&[ATYP_IPV6]).map_err(|_| ())?;
writer.write_all(&addr).map_err(|_| ())?;
writer.write_all(&port.to_be_bytes()).map_err(|_| ())?;
},
ref onion_v3 @ SocketAddress::OnionV3 { port, .. } => {
let onion_v3_url = onion_v3.to_string();
let hostname = onion_v3_url.split_once(':').ok_or(())?.0.as_bytes();
writer.write_all(&[ATYP_DOMAINNAME, hostname.len() as u8]).map_err(|_| ())?;
writer.write_all(hostname).map_err(|_| ())?;
writer.write_all(&port.to_be_bytes()).map_err(|_| ())?;
},
SocketAddress::Hostname { hostname, port } => {
writer.write_all(&[ATYP_DOMAINNAME, hostname.len()]).map_err(|_| ())?;
writer.write_all(hostname.as_bytes()).map_err(|_| ())?;
writer.write_all(&port.to_be_bytes()).map_err(|_| ())?;
},
SocketAddress::OnionV2 { .. } => return Err(()),
};
let pos = writer.position() as usize;
tokio::io::AsyncWriteExt::write_all(&mut tcp_stream, &socks5_request[..pos])
.await
.map_err(|_| ())?;

let mut reply_buffer = [0u8; 4];
tcp_stream.read_exact(&mut reply_buffer).await.map_err(|_| ())?;
if reply_buffer[..3] != [VERSION, SUCCESS, RSV] {
return Err(());
}
match reply_buffer[3] {
ATYP_IPV4 => tcp_stream.read_exact(&mut [0u8; IPV4_ADDR_LEN]).await.map_err(|_| ())?,
ATYP_DOMAINNAME => {
let hostname_len = tcp_stream.read_u8().await.map_err(|_| ())? as usize;
let mut hostname_buffer = [0u8; HOSTNAME_MAX_LEN];
tcp_stream.read_exact(&mut hostname_buffer[..hostname_len]).await.map_err(|_| ())?
},
ATYP_IPV6 => tcp_stream.read_exact(&mut [0u8; IPV6_ADDR_LEN]).await.map_err(|_| ())?,
_ => return Err(()),
};
tcp_stream.read_u16().await.map_err(|_| ())?;

Ok(tcp_stream)
}

const SOCK_WAKER_VTABLE: task::RawWakerVTable = task::RawWakerVTable::new(
clone_socket_waker,
wake_socket_waker,
Expand Down Expand Up @@ -941,4 +1089,61 @@ mod tests {
async fn unthreaded_race_disconnect_accept() {
race_disconnect_accept().await;
}

#[cfg(tor)]
#[tokio::test]
async fn test_tor_connect() {
use super::tor_connect;
use lightning::sign::EntropySource;
use std::net::SocketAddr;

// Set TOR_PROXY=127.0.0.1:9050
let tor_proxy_addr: SocketAddr = std::env!("TOR_PROXY").parse().unwrap();

struct TestEntropySource;

impl EntropySource for TestEntropySource {
fn get_secure_random_bytes(&self) -> [u8; 32] {
[0xffu8; 32]
}
}

let entropy_source = TestEntropySource;

// Success cases

for addr_str in [
// google.com
"142.250.189.196:80",
// google.com
"[2607:f8b0:4005:813::2004]:80",
// torproject.org
"torproject.org:80",
// torproject.org
"2gzyxa5ihm7nsggfxnu52rck2vv4rvmdlkiu3zzui5du4xyclen53wid.onion:80",
] {
let addr: SocketAddress = addr_str.parse().unwrap();
let tcp_stream = tor_connect(addr, tor_proxy_addr, &entropy_source).await.unwrap();
assert_eq!(
tcp_stream.try_read(&mut [0u8; 1]).unwrap_err().kind(),
std::io::ErrorKind::WouldBlock
);
}

// Failure cases

for addr_str in [
// google.com, with some invalid port
"142.250.189.196:1234",
// google.com, with some invalid port
"[2607:f8b0:4005:813::2004]:1234",
// torproject.org, with some invalid port
"torproject.org:1234",
// torproject.org, with a typo
"3gzyxa5ihm7nsggfxnu52rck2vv4rvmdlkiu3zzui5du4xyclen53wid.onion:80",
] {
let addr: SocketAddress = addr_str.parse().unwrap();
assert!(tor_connect(addr, tor_proxy_addr, &entropy_source).await.is_err());
}
}
}