NATS JetStream work distribution + adaptive chunk parallelism #138

Draft
ShubhamRasal wants to merge 26 commits into main from nats-apis
@ShubhamRasal
Member

Summary

  • NATS JetStream work distribution: replaces HTTP polling for scan/enumeration dispatch with push-based JetStream consumers, competing consumer pattern for multi-agent chunk distribution
  • Adaptive chunk parallelism: auto-detects machine resources (CPU, memory, FDs, cgroup-aware), computes optimal chunk parallelism at startup, runtime scaler adjusts based on resource pressure and chunk duration trends
  • Enumeration pipeline rewrite: linear gated flow (dnsx → port scan → httpx probe → screenshot → tlsx), each step gates the next — no httpx on dead hosts, no Chrome without confirmed web services
  • Prereq system: auto-installs missing PD tools, validates browser/Chrome dependencies at startup
  • Graceful shutdown: in-flight chunks finish on SIGTERM, proper context cancellation chain
  • Operational fixes: non-fatal heartbeat, health check with chunk-level activity, resource profiler

Key changes

Adaptive parallelism

  • Formula: min(NumCPU×4, availMem/200MB, (fdLimit-1024)/20), clamped [1,64]
  • ResizableSemaphore for runtime concurrency adjustment
  • Scaler control loop (30s): monitors CPU/mem/FD pressure + chunk duration trends
  • automaxprocs for cgroup-aware GOMAXPROCS
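
The formula above can be sketched as a pure function. This is a minimal illustration, not the code in pkg/resourceprofile: the function name, the constants, and the reading of "200MB" as 200 MiB are assumptions, and the real detection of available memory and FD limits (including cgroup handling) is left out.

```go
package main

import (
	"fmt"
	"runtime"
)

const (
	memPerChunk = 200 << 20 // assumed: 200 MiB of available memory per chunk
	fdReserve   = 1024      // file descriptors held back for the agent itself
	fdsPerChunk = 20        // file descriptors budgeted per chunk
	minParallel = 1
	maxParallel = 64
)

// autoParallelism applies min(NumCPU*4, availMem/200MB, (fdLimit-1024)/20)
// and clamps the result to [1, 64].
func autoParallelism(numCPU int, availMemBytes, fdLimit uint64) int {
	p := numCPU * 4
	if byMem := int(availMemBytes / memPerChunk); byMem < p {
		p = byMem
	}
	byFD := 0
	if fdLimit > fdReserve { // guard against unsigned underflow
		byFD = int((fdLimit - fdReserve) / fdsPerChunk)
	}
	if byFD < p {
		p = byFD
	}
	if p < minParallel {
		return minParallel
	}
	if p > maxParallel {
		return maxParallel
	}
	return p
}

func main() {
	// The memory and FD values here are illustrative inputs, not detected ones.
	fmt.Println(autoParallelism(runtime.NumCPU(), 8<<30, 65536))
}
```

With these constants, a 16-CPU host with ample memory and descriptors is CPU-bound at 64, while a host with 1 GiB available is memory-bound at 5.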

Enumeration pipeline

  • Skip dnsx for IP-only targets
  • Quick port filter (naabu 80/443/8443) before httpx when port_scan not in steps
  • Two-pass httpx: probe first (no Chrome), screenshot only on confirmed web services
  • No double uploads to dashboard
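
As a rough illustration of the "skip dnsx for IP-only targets" rule, the sketch below checks whether every target is a literal IP or CIDR and only then drops the DNS step. The function names and step labels are hypothetical and chosen for this example; the agent's actual step handling may differ.

```go
package main

import (
	"fmt"
	"net/netip"
)

// ipOnly reports whether every target is a literal IP address or CIDR
// prefix, in which case DNS resolution has nothing to add.
func ipOnly(targets []string) bool {
	if len(targets) == 0 {
		return false
	}
	for _, t := range targets {
		if _, err := netip.ParseAddr(t); err == nil {
			continue
		}
		if _, err := netip.ParsePrefix(t); err == nil {
			continue
		}
		return false // hostname or something else that needs resolving
	}
	return true
}

// planSteps returns the linear pipeline order, omitting dnsx when it
// cannot contribute. Step names are illustrative labels only.
func planSteps(targets []string) []string {
	steps := []string{"port_scan", "httpx_probe", "screenshot", "tlsx"}
	if !ipOnly(targets) {
		steps = append([]string{"dnsx"}, steps...)
	}
	return steps
}

func main() {
	fmt.Println(planSteps([]string{"10.0.0.1", "192.168.0.0/24"}))
	fmt.Println(planSteps([]string{"example.com", "10.0.0.1"}))
}
```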

New packages

  • pkg/resourceprofile/ — cross-platform resource detection, profiler, semaphore, scaler
  • pkg/prereq/ — tool prerequisite checking and auto-installation

Benchmarks (c2-standard-16, ~100 public targets)

| Chunk parallelism | Wall clock | Peak RSS | Peak CPU |
|-------------------|------------|----------|----------|
| P=4               | 9m 56s     | 268 MB   | 4.6%     |
| P=16              | 2m 52s     | 274 MB   | 92%      |
| P=32              | 3m 06s     | 480 MB   | 125%     |

Test plan

  • go test ./pkg/natsrpc/ -count=1 -timeout=60s — all tests pass including new TestConsumeChunks_ParallelProcessing
  • go vet ./... — clean
  • Deploy to bench VM, run enumeration with 256+ targets, verify adaptive scaling logs
  • Verify graceful shutdown: SIGTERM → in-flight chunks finish → clean exit
  • Verify prereq: fresh VM → tools auto-installed → browser validated
  • Verify health check shows idle: false during active scanning

ShubhamRasal and others added 26 commits March 25, 2026 15:50
Use nuclei Go SDK (lib.NucleiEngine) to execute retest scans directly
in-process, avoiding the slow binary startup that loads all templates
and parses CLI flags. Returns a single output.ResultEvent matching
the aurora/retest API response format.

Co-Authored-By: Claude <[email protected]>
- Implement handleHTTPX using httpx runner SDK with OnResult callback
  for in-process HTTP probing (status code, title, tech detect, IP, CNAME)
- Add handlePortProbe using raw net.DialTimeout for single TCP port checks
  (naabu is overkill — its CONNECT scan wraps the same net.DialTimeout)
- Switch encoding/json to json-iterator/go for faster marshaling
- Remove scan_id from HTTPXRequest (caller already knows it)
- Add PortProbeRequest type to natsrpc types

Co-Authored-By: Claude <[email protected]>
Replace file-based nats.UserCredentials with nats.UserJWT callbacks
that read credentials from memory on every connect/reconnect. This
eliminates temp file management and the full disconnect-reconnect
cycle on credential refresh — now just nc.ForceReconnect() triggers
the callbacks to pick up fresh creds from GetNATSCredentials().

Co-Authored-By: Claude <[email protected]>
Add per-agent direct subscription on {groupPrefix}.direct.{agentID}
for agent-specific commands. Includes:
- debug: returns system info, process RSS/CPU, Go runtime memory stats
- health-check: same as broadcast (agent identity + status)
- stop: graceful SIGTERM shutdown via direct command

Also add SubscribeDirect to natsrpc router and structured DebugData
types for UI-friendly JSON output.

Co-Authored-By: Claude <[email protected]>
Defines ScanRequest (ZSTD-compressed scan chunks) and
AssetEnrichmentRequest (plain protobuf enumeration chunks) matching
the server's serialization format.

Co-Authored-By: Claude <[email protected]>
WorkerPool: pulls work notifications from the group stream using a
FilterSubject-scoped durable consumer. Supports configurable parallelism
via MaxAckPending.

ConsumeChunks: shared durable consumer with FilterSubject for per-scan
chunk subjects. Competing delivery across agents. NumPending-only drain
check enables multi-agent flow-through without idle blocking.

Chunk decoding: ZSTD magic detection — scan chunks (ZSTD+protobuf)
vs enumeration chunks (plain protobuf). Poison-pill protection via
Term after 5 failed redeliveries.
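
A minimal sketch of the magic-byte check described above: a Zstandard frame starts with the bytes 28 B5 2F FD, so a payload with that prefix is treated as a scan chunk and anything else as plain protobuf. The type and function names are hypothetical, and reading "5 failed redeliveries" as "more than 5 deliveries" is an assumption.

```go
package main

import (
	"bytes"
	"fmt"
)

// zstdMagic is the 4-byte Zstandard frame magic number (0xFD2FB528, little-endian).
var zstdMagic = []byte{0x28, 0xB5, 0x2F, 0xFD}

// maxDeliveries is the assumed delivery count after which a chunk is Term'd.
const maxDeliveries = 5

type chunkKind int

const (
	kindEnumeration chunkKind = iota // plain protobuf
	kindScan                         // ZSTD-compressed protobuf
)

// classifyChunk picks the decode path from the payload's leading bytes.
func classifyChunk(payload []byte) chunkKind {
	if bytes.HasPrefix(payload, zstdMagic) {
		return kindScan
	}
	return kindEnumeration
}

// shouldTerminate reports whether a message has been delivered often enough
// to be treated as a poison pill instead of being retried again.
func shouldTerminate(numDelivered uint64) bool {
	return numDelivered > maxDeliveries
}

func main() {
	fmt.Println(classifyChunk([]byte{0x28, 0xB5, 0x2F, 0xFD, 0x00}) == kindScan)
	fmt.Println(classifyChunk([]byte{0x0A, 0x03, 'a', 'b', 'c'}) == kindScan)
}
```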

Co-Authored-By: Claude <[email protected]>
Tests with embedded NATS+JetStream server:
- WorkerPool receives and acks work messages
- ConsumeChunks processes all chunks with ZSTD+protobuf decode
- Competing consumers split chunks across agents
- Heartbeat prevents redelivery past AckWait
- MaxAckPending limits concurrent work
- FilterSubject isolates work messages from chunk messages

Co-Authored-By: Claude <[email protected]>
Adds --use-jetstream / PDCP_USE_JETSTREAM flag to switch between
JetStream push model and legacy HTTP polling. When enabled:

- Work consumer created on group stream with FilterSubject
- processJetStreamScan: port filtering + nuclei execution per chunk
- processJetStreamEnumeration: extracts steps from chunk config
- Consumer names derived from agent ID (work) and scan ID (chunks)
- Unknown work types Term'd immediately (no poison-pill loops)
- Improved nuclei execution logging (targets, templates, ports)

Legacy HTTP polling runs when flag is not set. Core NATS RPC
(httpx, port-probe, nuclei-retest) runs in both modes.

Co-Authored-By: Claude <[email protected]>
Consistent with cmd/pd-agent/main.go which already uses jsoniter.
Switched in: natsrpc (router, worker, types), scanlog (parser,
uploader), and types (scan_log).

Co-Authored-By: Claude <[email protected]>
- Upgrade interactsh v1.2.4 → v1.3.0 to fix encrypted OOB response
  decoding against current public servers (oast.pro, oast.live, etc.)
- Call ne.Close() before reading results so the interactsh cooldown
  poll fires while the callback is still active — OOB matches now
  land in results instead of being lost
- Add template_url support (priority: encoded > url > id)
- Clear result.Interaction to prevent JSON marshal failures from
  raw binary interactsh DNS data

Co-Authored-By: Claude <[email protected]>
HTTPXRequest now takes a single target string instead of a list.
This matches the sync request-reply pattern — one target, one response.
Removed unused flags field.

Co-Authored-By: Claude <[email protected]>
BREAKING: CLI and env var changes:
- --agent-networks (slice) → --agent-network (single string)
- --agent-group → removed (agent-network used for NATS queue group)
- PDCP_AGENT_NETWORKS → PDCP_AGENT_NETWORK
- AGENT_GROUP → AGENT_NETWORK
- POST /v1/agents/in query param: agent_group → agent_network

Also:
- Add PDCP_ENABLE_AGENT_LOG_UPLOAD=false to disable agent log upload
- Suppress verbose log upload confirmation messages

Co-Authored-By: Claude <[email protected]>
C1: Update router_test.go to match HTTPXRequest single target field
    (Targets[]string → Target string from commit 5355ae3)

C3: Remove 80-line debug block (main.go:1649-1729) that printed scan
    configs, targets, templates, and ran redundant naabu port scans
    to stdout via fmt.Println on every HTTP-polled scan

Co-Authored-By: Claude <[email protected]>
H3: reuse zstd decoder via package-level init (no per-chunk alloc)
H4: store NATS subscriptions, unsubscribe before drain on reconnect
H5: serialize cache file writes with saveMu, remove fire-and-forget goroutines
H6: propagate lifecycle context through Router to all handlers
M2: extract hardcoded version to build-time var (ldflags -X main.Version)
M3: replace syscall.Rusage with runtime.MemStats (cross-platform)
M4: document why SubscribeBroadcast/SubscribeDirect are separate methods
M7: use encoding/json.RawMessage for Response.Data (cross-package compat)
L2: remove commented-out passive discovery and deleteCacheFileForTesting
Also: add PDCP_ENABLE_AGENT_LOG_UPLOAD toggle, suppress log upload noise

Co-Authored-By: Claude <[email protected]>
- Move Agent last updated, Discovered network subnets, /in requests sent
  logs from INFO to DEBUG to reduce noise during normal operation
- Gate DEBUG level in logHelper with early return unless -verbose is set
- Remove ***** debug log lines from processJetStreamEnumeration

Co-Authored-By: Claude <[email protected]>
… messages

Plain Ack() is fire-and-forget — if the ack packet is lost on the wire,
NATS keeps the message as outstanding. Once all agents stop pulling
(Waiting Pulls: 0), there is nobody to redeliver to and the messages
are orphaned forever. Switch to DoubleAck (server-confirmed ack) with
up to 3 retries for both work and chunk message acknowledgments.

Also set MaxDeliver=1 on the chunk consumer so that if a chunk is
delivered and not acked within AckWait, NATS terminates it instead of
keeping it as an outstanding ack indefinitely.
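
The retry behaviour can be sketched independently of the NATS client. In the example below, confirmedAck stands in for any server-confirmed ack call; ackWithRetry, simulateAcks, the one-second per-attempt timeout, and the reading of "up to 3 retries" as three attempts in total are all assumptions made for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// confirmedAck is any ack call that blocks until the server confirms it.
type confirmedAck func(ctx context.Context) error

// ackWithRetry tries the ack up to attempts times, each with its own timeout,
// and stops early if the parent context is cancelled.
func ackWithRetry(ctx context.Context, ack confirmedAck, attempts int, perTry time.Duration) error {
	if attempts < 1 {
		attempts = 1
	}
	var lastErr error
	for i := 0; i < attempts; i++ {
		tryCtx, cancel := context.WithTimeout(ctx, perTry)
		lastErr = ack(tryCtx)
		cancel()
		if lastErr == nil {
			return nil
		}
		if ctx.Err() != nil {
			break // shutting down: do not keep retrying
		}
	}
	return fmt.Errorf("ack not confirmed: %w", lastErr)
}

// simulateAcks runs ackWithRetry against a fake ack that fails the first
// `failures` calls, and reports how many calls were made.
func simulateAcks(failures int) (calls int, err error) {
	ack := func(context.Context) error {
		calls++
		if calls <= failures {
			return errors.New("ack timed out")
		}
		return nil
	}
	err = ackWithRetry(context.Background(), ack, 3, time.Second)
	return calls, err
}

func main() {
	calls, err := simulateAcks(2)
	fmt.Println(calls, err)
}
```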

Co-Authored-By: Claude <[email protected]>
…check, and request logging

- restart: new NATS RPC on direct router for graceful in-process restart
  without external supervisor (K8s/systemd). Run() loops on restart flag,
  resetForRestart() clears transient state between cycles.
- logs: in-memory ring buffer (5000 entries) exposed via "logs" RPC with
  offset/limit pagination and level filtering.
- health-check: add idle/idle_since fields. Tracks active workers via
  atomic counter on WorkerPool; reports idle only when no tasks running
  and agent up > 1 min.
- router: log all NATS RPC requests with method, subject, and duration.
- portfilter: strip URL scheme before SplitHostPort to fix bogus port
  extraction from targets like https://host.
- execute: replace gologger with slog, remove debug prints.
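
To illustrate the portfilter fix: net.SplitHostPort splits at the last colon, so a target like https://host can come back as host "https" with a bogus port "//host". The sketch below strips the scheme and any path first; splitTarget is a hypothetical name and this is not the agent's actual implementation.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// splitTarget returns the host and port ("" when absent) for targets such as
// "host", "host:8080", "https://host" or "https://host:8443/path".
func splitTarget(target string) (host, port string) {
	// Drop the URL scheme so its colon is not mistaken for a port separator.
	if i := strings.Index(target, "://"); i >= 0 {
		target = target[i+3:]
	}
	// Drop any path, query, or fragment.
	if i := strings.IndexAny(target, "/?#"); i >= 0 {
		target = target[:i]
	}
	h, p, err := net.SplitHostPort(target)
	if err != nil {
		// No port present: treat the whole value as the host.
		return strings.Trim(target, "[]"), ""
	}
	return h, p
}

func main() {
	for _, t := range []string{"https://host", "https://host:8443/login", "10.0.0.1:22"} {
		h, p := splitTarget(t)
		fmt.Printf("%q -> host=%q port=%q\n", t, h, p)
	}
}
```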

Co-Authored-By: Claude <[email protected]>
… and runtime scaler

- Resource detection (pkg/resourceprofile): cross-platform CPU, memory, FD
  detection with cgroup v1/v2 support for containers
- Auto-detect formula: min(NumCPU*4, availMem/200MB, (fdLimit-1024)/20)
  clamped [1,64], using 0 as sentinel for auto-detect
- ResizableSemaphore: channel-based concurrency control with runtime Resize
  for grow/shrink without disrupting in-flight work
- Adaptive scaler: 30s control loop monitoring CPU/mem/FD pressure and chunk
  duration trends, with hysteresis (75%/80%/90%) and 60s cooldown
- ConsumeChunks: parallel chunk fan-out via semaphore (replaces sequential
  batch processing), heartbeat before Acquire to prevent AckWait expiry,
  context.WithoutCancel only inside chunk goroutines for graceful shutdown
- automaxprocs: cgroup-aware GOMAXPROCS via uber-go/automaxprocs
- New test: TestConsumeChunks_ParallelProcessing verifying semaphore bounds
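
One way a channel-based semaphore can be resized without disturbing in-flight work is sketched below. It is an illustration under assumptions, not the code in pkg/resourceprofile: the debt counter, TryAcquire, and the fixed maxLimit capacity are choices made for this example. Growing adds tokens; shrinking removes free tokens where possible and otherwise retires slots as running work releases them.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// ResizableSemaphore is a sketch of a token-channel semaphore whose limit
// can change at runtime.
type ResizableSemaphore struct {
	mu     sync.Mutex
	tokens chan struct{} // one buffered token per free slot
	limit  int           // current target concurrency
	debt   int           // slots to retire as in-flight work releases
	inUse  atomic.Int64
}

func NewResizableSemaphore(limit, maxLimit int) *ResizableSemaphore {
	if maxLimit < 1 {
		maxLimit = 1
	}
	s := &ResizableSemaphore{tokens: make(chan struct{}, maxLimit)}
	s.Resize(limit)
	return s
}

// Acquire blocks until a slot is free or ctx is done.
func (s *ResizableSemaphore) Acquire(ctx context.Context) error {
	select {
	case <-s.tokens:
		s.inUse.Add(1)
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// TryAcquire takes a slot only if one is free right now.
func (s *ResizableSemaphore) TryAcquire() bool {
	select {
	case <-s.tokens:
		s.inUse.Add(1)
		return true
	default:
		return false
	}
}

// Release frees a slot, or retires it if a shrink is still owed.
func (s *ResizableSemaphore) Release() {
	s.inUse.Add(-1)
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.debt > 0 {
		s.debt--
		return
	}
	s.tokens <- struct{}{}
}

// Resize sets a new limit, clamped to [1, maxLimit].
func (s *ResizableSemaphore) Resize(n int) {
	if n < 1 {
		n = 1
	}
	if n > cap(s.tokens) {
		n = cap(s.tokens)
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	delta := n - s.limit
	s.limit = n
	for ; delta > 0; delta-- { // grow: cancel owed shrinks first, then add tokens
		if s.debt > 0 {
			s.debt--
		} else {
			s.tokens <- struct{}{}
		}
	}
	for ; delta < 0; delta++ { // shrink: take a free token or owe one
		select {
		case <-s.tokens:
		default:
			s.debt++
		}
	}
}

// InUse reports how many slots are currently held.
func (s *ResizableSemaphore) InUse() int { return int(s.inUse.Load()) }

func main() {
	s := NewResizableSemaphore(2, 64)
	_ = s.Acquire(context.Background())
	s.Resize(1) // the held slot keeps running; one free token is removed
	fmt.Println("in use:", s.InUse(), "can acquire:", s.TryAcquire())
}
```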

Co-Authored-By: Claude <[email protected]>
- Tool prerequisites: checks nuclei, naabu, httpx, dnsx, tlsx, nmap at
  startup. Missing PD tools installed via go install or GitHub binary
  download. System tools (nmap) installed via apt/brew when running as root.
- Idempotent: on restarts with all tools present, cost is N LookPath calls
- Browser validation: runs httpx -screenshot test to download Chrome and
  verify shared library dependencies. Exits if Chrome deps are missing
  with install instructions.
- Cross-platform: Linux (apt), macOS (brew), Windows (binary download +
  PowerShell), with Go archive/zip stdlib for extraction (no shell-out)
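
The idempotent check described above amounts to one exec.LookPath call per tool. A minimal sketch, with missingTools as a hypothetical helper name and the installation step omitted:

```go
package main

import (
	"fmt"
	"os/exec"
)

// missingTools returns the names that cannot be found on PATH.
// When everything is installed, the cost is one LookPath call per tool.
func missingTools(names []string) []string {
	var missing []string
	for _, name := range names {
		if _, err := exec.LookPath(name); err != nil {
			missing = append(missing, name)
		}
	}
	return missing
}

func main() {
	// Tool list taken from the commit message above.
	tools := []string{"nuclei", "naabu", "httpx", "dnsx", "tlsx", "nmap"}
	fmt.Println("missing:", missingTools(tools))
}
```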

Co-Authored-By: Claude <[email protected]>
…ixes

Enumeration pipeline:
- Linear flow: dnsx → port scan → httpx probe → screenshot → tlsx,
  each step gates the next (no httpx on dead hosts, no Chrome without
  confirmed web services)
- Skip dnsx for IP-only targets
- Quick port filter (naabu 80/443/8443) when port_scan step not present
- No double uploads: tools with -dashboard upload themselves, manual
  upload only for tools without it and only if output non-empty
- httpx -irr flag, output file collision fix (probe vs screenshot)

Graceful shutdown:
- Signal handler cancels contexts only, agentMode defer handles blocking
  wait and cleanup sequence (NATS drain, batcher flush)
- context.WithoutCancel inside chunk goroutines so subprocesses finish,
  fetch loop still stops on cancellation
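
The shutdown pattern can be sketched as follows: the fetch loop watches the cancellable context, while each chunk goroutine receives context.WithoutCancel(ctx) (Go 1.21+) so its work is not interrupted by SIGTERM. runChunks and shutdownDemo are hypothetical names, and the channel of ints stands in for JetStream chunk messages.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runChunks starts one goroutine per chunk until ctx is cancelled, then
// waits for the in-flight goroutines to finish before returning.
func runChunks(ctx context.Context, chunks <-chan int, process func(context.Context, int)) {
	var wg sync.WaitGroup
	defer wg.Wait() // in-flight chunks finish even after cancellation
	for {
		select {
		case <-ctx.Done():
			return // stop fetching new chunks
		case c, ok := <-chunks:
			if !ok {
				return
			}
			wg.Add(1)
			go func() {
				defer wg.Done()
				// Keeps ctx values but drops cancellation and deadline.
				process(context.WithoutCancel(ctx), c)
			}()
		}
	}
}

// shutdownDemo cancels the parent context while one chunk is running and
// reports whether that chunk finished and whether it observed cancellation.
func shutdownDemo() (finished int, sawCancel bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	chunks := make(chan int, 1)
	chunks <- 1
	started := make(chan struct{})
	go func() { <-started; cancel() }() // simulate SIGTERM once work has begun
	runChunks(ctx, chunks, func(workCtx context.Context, _ int) {
		close(started)
		time.Sleep(50 * time.Millisecond) // simulated subprocess run
		if workCtx.Err() != nil {
			sawCancel = true
		}
		finished++
	})
	return finished, sawCancel
}

func main() {
	finished, sawCancel := shutdownDemo()
	fmt.Println("finished:", finished, "saw cancel:", sawCancel)
}
```

One consequence of this design is that chunk work no longer has a cancellation path of its own, so any hard stop has to come from a separate timeout or from the process supervisor.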

Operational fixes:
- /in heartbeat: 5min interval, 30s timeout, non-fatal after first
  registration (agent keeps scanning via NATS if API is down)
- Health check: includes chunk-level activity (semaphore InUse) in
  tasks_running and idle status
- NATS credential warning when JetStream enabled but creds not received
- ensureNucleiTemplates always updates (was skipping if dir existed)
- Resource profiler wired into agentMode with per-scan snapshots
- Prereq system wired into main() with auto-install and browser check

Co-Authored-By: Claude <[email protected]>
- pkg/selfupdate: download binary from GitHub releases, verify with
  -version, replace running binary via rename + syscall.Exec
- Two-phase update: DownloadAndVerify (agent still operational) then
  Apply (after draining connections). If download fails, agent keeps
  running with NATS connected.
- Container detection: skips self-update in Docker/k8s (checks
  KUBERNETES_SERVICE_HOST, /.dockerenv, cgroup, mountinfo)
- PDCP_UPDATE_URL env override for local testing/staging
- Asset matching: maps darwin → macOS for GitHub release filenames
- Handles older binaries without -version flag gracefully

Co-Authored-By: Claude <[email protected]>
…filer

- Makefile: VERSION from git describe, injected via -X main.Version
  into all build targets (build, build-linux-amd64, build-linux-arm64)
- -version/--version flag: prints version and exits before any init
- configureLogging: --verbose now enables slog debug level
- Resource profiler sample log changed from Info to Debug (only visible
  with --verbose)

Co-Authored-By: Claude <[email protected]>
- NATS RPC: handleUpdate registered on broadcast + direct routers
  for fleet-wide or targeted updates
- Two-phase handler: downloads while agent is operational, drains
  connections only after binary is verified. On failure, agent keeps
  running. On Apply failure, restarts same binary to recover NATS.
- Agent ID persistence: --agent-id CLI flag, injected into syscall.Exec
  args via ensureArg so new process keeps the same identity
- xid validation: user-provided --agent-id must pass xid.FromString
  to prevent injection via CLI args
- Startup log includes version: "pd-agent v1.0.0 — running in agent mode"

Co-Authored-By: Claude <[email protected]>