Skip to content

Commit 5f60f17

Browse files
committed
net-tokio: add fn socks5_connect_outbound
1 parent c722443 commit 5f60f17

File tree

3 files changed

+241
-2
lines changed

3 files changed

+241
-2
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,5 @@ check-cfg = [
6767
"cfg(require_route_graph_test)",
6868
"cfg(simple_close)",
6969
"cfg(peer_storage)",
70+
"cfg(tor_socks5)",
7071
]

lightning-net-tokio/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ rustdoc-args = ["--cfg", "docsrs"]
1919
[dependencies]
2020
bitcoin = "0.32.2"
2121
lightning = { version = "0.3.0", path = "../lightning" }
22-
tokio = { version = "1.35", features = [ "rt", "sync", "net", "time" ] }
22+
tokio = { version = "1.35", features = [ "rt", "sync", "net", "time", "io-util" ] }
2323

2424
[dev-dependencies]
2525
tokio = { version = "1.35", features = [ "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] }

lightning-net-tokio/src/lib.rs

Lines changed: 239 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ use std::time::Duration;
5151

5252
static ID_COUNTER: AtomicU64 = AtomicU64::new(0);
5353

54+
const CONNECT_OUTBOUND_TIMEOUT: u64 = 10;
55+
const SOCKS5_CONNECT_OUTBOUND_TIMEOUT: u64 = 30;
56+
5457
// We only need to select over multiple futures in one place, and taking on the full `tokio/macros`
5558
// dependency tree in order to do so (which has broken our MSRV before) is excessive. Instead, we
5659
// define a trivial two- and three- select macro with the specific types we need and just use that.
@@ -462,13 +465,177 @@ where
462465
PM::Target: APeerManager<Descriptor = SocketDescriptor>,
463466
{
464467
let connect_fut = async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) };
465-
if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), connect_fut).await {
468+
if let Ok(Ok(stream)) =
469+
time::timeout(Duration::from_secs(CONNECT_OUTBOUND_TIMEOUT), connect_fut).await
470+
{
466471
Some(setup_outbound(peer_manager, their_node_id, stream))
467472
} else {
468473
None
469474
}
470475
}
471476

477+
/// Same as [`connect_outbound`], using a SOCKS5 proxy
478+
pub async fn socks5_connect_outbound<PM: Deref + 'static + Send + Sync + Clone>(
479+
peer_manager: PM, their_node_id: PublicKey, socks5_proxy_addr: SocketAddr, addr: SocketAddress,
480+
user_pass: Option<(&str, &str)>,
481+
) -> Option<impl std::future::Future<Output = ()>>
482+
where
483+
PM::Target: APeerManager<Descriptor = SocketDescriptor>,
484+
{
485+
let connect_fut = async {
486+
socks5_connect(socks5_proxy_addr, addr, user_pass).await.map(|s| s.into_std().unwrap())
487+
};
488+
if let Ok(Ok(stream)) =
489+
time::timeout(Duration::from_secs(SOCKS5_CONNECT_OUTBOUND_TIMEOUT), connect_fut).await
490+
{
491+
Some(setup_outbound(peer_manager, their_node_id, stream))
492+
} else {
493+
None
494+
}
495+
}
496+
497+
struct BufWriter<'a> {
498+
buf: &'a mut [u8],
499+
pos: usize,
500+
}
501+
502+
impl<'a> BufWriter<'a> {
503+
fn new(buf: &'a mut [u8]) -> Self {
504+
Self { buf, pos: 0 }
505+
}
506+
fn write_u8(&mut self, byte: u8) {
507+
self.buf[self.pos] = byte;
508+
self.pos += 1;
509+
}
510+
fn write_u16_be(&mut self, two_bytes: u16) {
511+
self.buf[self.pos..self.pos + 2].copy_from_slice(&two_bytes.to_be_bytes());
512+
self.pos += 2;
513+
}
514+
fn write_slice(&mut self, slice: &[u8]) {
515+
self.buf[self.pos..self.pos + slice.len()].copy_from_slice(slice);
516+
self.pos += slice.len();
517+
}
518+
}
519+
520+
async fn socks5_connect(
521+
socks5_proxy_addr: SocketAddr, addr: SocketAddress, user_pass: Option<(&str, &str)>,
522+
) -> Result<TcpStream, ()> {
523+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
524+
525+
const IPV4_ADDR_LEN: usize = 4;
526+
const IPV6_ADDR_LEN: usize = 16;
527+
const HOSTNAME_MAX_LEN: usize = 255;
528+
529+
// Constants defined in RFC 1928 and RFC 1929
530+
const VERSION: u8 = 5;
531+
const NMETHODS: u8 = 1;
532+
const NO_AUTH: u8 = 0;
533+
const USERNAME_PASSWORD_AUTH: u8 = 2;
534+
const METHOD_SELECT_REPLY_LEN: usize = 2;
535+
const USERNAME_PASSWORD_VERSION: u8 = 1;
536+
const USERNAME_PASSWORD_REPLY_LEN: usize = 2;
537+
const CMD_CONNECT: u8 = 1;
538+
const RSV: u8 = 0;
539+
const ATYP_IPV4: u8 = 1;
540+
const ATYP_DOMAINNAME: u8 = 3;
541+
const ATYP_IPV6: u8 = 4;
542+
const SUCCESS: u8 = 0;
543+
const USERNAME_MAX_LEN: usize = 255;
544+
const PASSWORD_MAX_LEN: usize = 255;
545+
546+
const USERNAME_PASSWORD_REQUEST_MAX_LEN: usize = 1 /* VER */ + 1 /* ULEN */ + USERNAME_MAX_LEN /* UNAME max len */
547+
+ 1 /* PLEN */ + PASSWORD_MAX_LEN /* PASSWD max len */;
548+
const SOCKS5_REQUEST_MAX_LEN: usize = 1 /* VER */ + 1 /* CMD */ + 1 /* RSV */ + 1 /* ATYP */
549+
+ 1 /* HOSTNAME len */ + HOSTNAME_MAX_LEN /* HOSTNAME */ + 2 /* PORT */;
550+
551+
let selected_auth = if user_pass.is_some() { USERNAME_PASSWORD_AUTH } else { NO_AUTH };
552+
let method_selection_request = [VERSION, NMETHODS, selected_auth];
553+
let mut tcp_stream = TcpStream::connect(&socks5_proxy_addr).await.map_err(|_| ())?;
554+
tcp_stream.write_all(&method_selection_request).await.map_err(|_| ())?;
555+
556+
let mut method_selection_reply = [0u8; METHOD_SELECT_REPLY_LEN];
557+
tcp_stream.read_exact(&mut method_selection_reply).await.map_err(|_| ())?;
558+
if method_selection_reply != [VERSION, selected_auth] {
559+
return Err(());
560+
}
561+
562+
if let Some((username, password)) = user_pass {
563+
if username.len() > USERNAME_MAX_LEN || password.len() > PASSWORD_MAX_LEN {
564+
return Err(());
565+
}
566+
567+
let mut username_password_request = [0u8; USERNAME_PASSWORD_REQUEST_MAX_LEN];
568+
let mut writer = BufWriter::new(&mut username_password_request);
569+
writer.write_u8(USERNAME_PASSWORD_VERSION);
570+
writer.write_u8(username.len() as u8);
571+
writer.write_slice(username.as_bytes());
572+
writer.write_u8(password.len() as u8);
573+
writer.write_slice(password.as_bytes());
574+
let pos = writer.pos;
575+
tcp_stream.write_all(&username_password_request[..pos]).await.map_err(|_| ())?;
576+
577+
let mut username_password_reply = [0u8; USERNAME_PASSWORD_REPLY_LEN];
578+
tcp_stream.read_exact(&mut username_password_reply).await.map_err(|_| ())?;
579+
if username_password_reply != [USERNAME_PASSWORD_VERSION, SUCCESS] {
580+
return Err(());
581+
}
582+
}
583+
584+
let mut socks5_request = [0u8; SOCKS5_REQUEST_MAX_LEN];
585+
let mut writer = BufWriter::new(&mut socks5_request);
586+
writer.write_u8(VERSION);
587+
writer.write_u8(CMD_CONNECT);
588+
writer.write_u8(RSV);
589+
match addr {
590+
SocketAddress::TcpIpV4 { addr, port } => {
591+
writer.write_u8(ATYP_IPV4);
592+
writer.write_slice(&addr);
593+
writer.write_u16_be(port);
594+
},
595+
SocketAddress::TcpIpV6 { addr, port } => {
596+
writer.write_u8(ATYP_IPV6);
597+
writer.write_slice(&addr);
598+
writer.write_u16_be(port);
599+
},
600+
ref onion_v3 @ SocketAddress::OnionV3 { port, .. } => {
601+
let onion_v3_url = onion_v3.to_string();
602+
let hostname = onion_v3_url.split_once(':').ok_or(())?.0.as_bytes();
603+
writer.write_u8(ATYP_DOMAINNAME);
604+
writer.write_u8(hostname.len() as u8);
605+
writer.write_slice(hostname);
606+
writer.write_u16_be(port);
607+
},
608+
SocketAddress::Hostname { hostname, port } => {
609+
writer.write_u8(ATYP_DOMAINNAME);
610+
writer.write_u8(hostname.len());
611+
writer.write_slice(hostname.as_bytes());
612+
writer.write_u16_be(port);
613+
},
614+
SocketAddress::OnionV2 { .. } => return Err(()),
615+
};
616+
let pos = writer.pos;
617+
tcp_stream.write_all(&socks5_request[..pos]).await.map_err(|_| ())?;
618+
619+
let mut reply_buffer = [0u8; 4];
620+
tcp_stream.read_exact(&mut reply_buffer).await.map_err(|_| ())?;
621+
if reply_buffer[..3] != [VERSION, SUCCESS, RSV] {
622+
return Err(());
623+
}
624+
match reply_buffer[3] {
625+
ATYP_IPV4 => tcp_stream.read_exact(&mut [0u8; IPV4_ADDR_LEN]).await.map_err(|_| ())?,
626+
ATYP_DOMAINNAME => {
627+
let hostname_len = tcp_stream.read_u8().await.map_err(|_| ())? as usize;
628+
let mut hostname_buffer = [0u8; HOSTNAME_MAX_LEN];
629+
tcp_stream.read_exact(&mut hostname_buffer[..hostname_len]).await.map_err(|_| ())?
630+
},
631+
ATYP_IPV6 => tcp_stream.read_exact(&mut [0u8; IPV6_ADDR_LEN]).await.map_err(|_| ())?,
632+
_ => return Err(()),
633+
};
634+
tcp_stream.read_u16().await.map_err(|_| ())?;
635+
636+
Ok(tcp_stream)
637+
}
638+
472639
const SOCK_WAKER_VTABLE: task::RawWakerVTable = task::RawWakerVTable::new(
473640
clone_socket_waker,
474641
wake_socket_waker,
@@ -608,6 +775,9 @@ impl Hash for SocketDescriptor {
608775

609776
#[cfg(test)]
610777
mod tests {
778+
#[cfg(tor_socks5)]
779+
use super::socks5_connect;
780+
611781
use bitcoin::constants::ChainHash;
612782
use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
613783
use bitcoin::Network;
@@ -621,6 +791,8 @@ mod tests {
621791
use tokio::sync::mpsc;
622792

623793
use std::mem;
794+
#[cfg(tor_socks5)]
795+
use std::net::SocketAddr;
624796
use std::sync::atomic::{AtomicBool, Ordering};
625797
use std::sync::{Arc, Mutex};
626798
use std::time::Duration;
@@ -941,4 +1113,70 @@ mod tests {
9411113
async fn unthreaded_race_disconnect_accept() {
9421114
race_disconnect_accept().await;
9431115
}
1116+
1117+
#[cfg(tor_socks5)]
1118+
#[tokio::test]
1119+
async fn test_socks5_connect() {
1120+
// Set TOR_SOCKS5_PROXY=127.0.0.1:9050
1121+
let socks5_proxy_addr: SocketAddr = std::env!("TOR_SOCKS5_PROXY").parse().unwrap();
1122+
1123+
// Success cases
1124+
1125+
for (addr_str, user_pass) in [
1126+
// google.com
1127+
("142.250.189.196:80", None),
1128+
// google.com
1129+
("[2607:f8b0:4005:813::2004]:80", None),
1130+
// torproject.org
1131+
("torproject.org:80", None),
1132+
// torproject.org
1133+
("2gzyxa5ihm7nsggfxnu52rck2vv4rvmdlkiu3zzui5du4xyclen53wid.onion:80", None),
1134+
// Same vectors as above, with a username and password
1135+
("142.250.189.196:80", Some(("<torS0X>0", ""))),
1136+
("[2607:f8b0:4005:813::2004]:80", Some(("<torS0X>0", "123"))),
1137+
("torproject.org:80", Some(("<torS0X>1abc", ""))),
1138+
(
1139+
"2gzyxa5ihm7nsggfxnu52rck2vv4rvmdlkiu3zzui5du4xyclen53wid.onion:80",
1140+
Some(("<torS0X>1abc", "123")),
1141+
),
1142+
] {
1143+
let addr: SocketAddress = addr_str.parse().unwrap();
1144+
let tcp_stream = socks5_connect(socks5_proxy_addr, addr, user_pass).await.unwrap();
1145+
assert_eq!(
1146+
tcp_stream.try_read(&mut [0u8; 1]).unwrap_err().kind(),
1147+
std::io::ErrorKind::WouldBlock
1148+
);
1149+
}
1150+
1151+
// Failure cases
1152+
1153+
for (addr_str, user_pass) in [
1154+
// google.com, with some invalid port
1155+
("142.250.189.196:1234", None),
1156+
// google.com, with some invalid port
1157+
("[2607:f8b0:4005:813::2004]:1234", None),
1158+
// torproject.org, with some invalid port
1159+
("torproject.org:1234", None),
1160+
// torproject.org, with a typo
1161+
("3gzyxa5ihm7nsggfxnu52rck2vv4rvmdlkiu3zzui5du4xyclen53wid.onion:80", None),
1162+
// Same vectors as above, with a username and password
1163+
("142.250.189.196:1234", Some(("<torS0X>0", ""))),
1164+
("[2607:f8b0:4005:813::2004]:1234", Some(("<torS0X>0", "123"))),
1165+
("torproject.org:1234", Some(("<torS0X>1abc", ""))),
1166+
(
1167+
"3gzyxa5ihm7nsggfxnu52rck2vv4rvmdlkiu3zzui5du4xyclen53wid.onion:80",
1168+
Some(("<torS0X>1abc", "123")),
1169+
),
1170+
/* TODO: Uncomment when format types 30 and 31 land in tor stable, see https://spec.torproject.org/socks-extensions.html,
1171+
these are invalid usernames according to those standards.
1172+
("142.250.189.196:80", Some(("<torS0X>0abc", "123"))),
1173+
("[2607:f8b0:4005:813::2004]:80", Some(("<torS0X>1", "123"))),
1174+
("torproject.org:80", Some(("<torS0X>9", "123"))),
1175+
("2gzyxa5ihm7nsggfxnu52rck2vv4rvmdlkiu3zzui5du4xyclen53wid.onion:80", Some(("<torS0X>", "123"))),
1176+
*/
1177+
] {
1178+
let addr: SocketAddress = addr_str.parse().unwrap();
1179+
assert!(socks5_connect(socks5_proxy_addr, addr, user_pass).await.is_err());
1180+
}
1181+
}
9441182
}

0 commit comments

Comments
 (0)