NATS JetStream work distribution + adaptive chunk parallelism#138
Draft
ShubhamRasal wants to merge 26 commits intomainfrom
Draft
NATS JetStream work distribution + adaptive chunk parallelism#138ShubhamRasal wants to merge 26 commits intomainfrom
ShubhamRasal wants to merge 26 commits intomainfrom
Conversation
Use nuclei Go SDK (lib.NucleiEngine) to execute retest scans directly in-process, avoiding the slow binary startup that loads all templates and parses CLI flags. Returns a single output.ResultEvent matching the aurora/retest API response format. Co-Authored-By: Claude <[email protected]>
- Implement handleHTTPX using httpx runner SDK with OnResult callback for in-process HTTP probing (status code, title, tech detect, IP, CNAME) - Add handlePortProbe using raw net.DialTimeout for single TCP port checks (naabu is overkill — its CONNECT scan wraps the same net.DialTimeout) - Switch encoding/json to json-iterator/go for faster marshaling - Remove scan_id from HTTPXRequest (caller already knows it) - Add PortProbeRequest type to natsrpc types Co-Authored-By: Claude <[email protected]>
Replace file-based nats.UserCredentials with nats.UserJWT callbacks that read credentials from memory on every connect/reconnect. This eliminates temp file management and the full disconnect-reconnect cycle on credential refresh — now just nc.ForceReconnect() triggers the callbacks to pick up fresh creds from GetNATSCredentials(). Co-Authored-By: Claude <[email protected]>
Add per-agent direct subscription on {groupPrefix}.direct.{agentID}
for agent-specific commands. Includes:
- debug: returns system info, process RSS/CPU, Go runtime memory stats
- health-check: same as broadcast (agent identity + status)
- stop: graceful SIGTERM shutdown via direct command
Also add SubscribeDirect to natsrpc router and structured DebugData
types for UI-friendly JSON output.
Co-Authored-By: Claude <[email protected]>
Defines ScanRequest (ZSTD-compressed scan chunks) and AssetEnrichmentRequest (plain protobuf enumeration chunks) matching the server's serialization format. Co-Authored-By: Claude <[email protected]>
WorkerPool: pulls work notifications from the group stream using a FilterSubject-scoped durable consumer. Supports configurable parallelism via MaxAckPending. ConsumeChunks: shared durable consumer with FilterSubject for per-scan chunk subjects. Competing delivery across agents. NumPending-only drain check enables multi-agent flow-through without idle blocking. Chunk decoding: ZSTD magic detection — scan chunks (ZSTD+protobuf) vs enumeration chunks (plain protobuf). Poison-pill protection via Term after 5 failed redeliveries. Co-Authored-By: Claude <[email protected]>
Tests with embedded NATS+JetStream server: - WorkerPool receives and acks work messages - ConsumeChunks processes all chunks with ZSTD+protobuf decode - Competing consumers split chunks across agents - Heartbeat prevents redelivery past AckWait - MaxAckPending limits concurrent work - FilterSubject isolates work messages from chunk messages Co-Authored-By: Claude <[email protected]>
Adds --use-jetstream / PDCP_USE_JETSTREAM flag to switch between JetStream push model and legacy HTTP polling. When enabled: - Work consumer created on group stream with FilterSubject - processJetStreamScan: port filtering + nuclei execution per chunk - processJetStreamEnumeration: extracts steps from chunk config - Consumer names derived from agent ID (work) and scan ID (chunks) - Unknown work types Term'd immediately (no poison-pill loops) - Improved nuclei execution logging (targets, templates, ports) Legacy HTTP polling runs when flag is not set. Core NATS RPC (httpx, port-probe, nuclei-retest) runs in both modes. Co-Authored-By: Claude <[email protected]>
Consistent with cmd/pd-agent/main.go which already uses jsoniter. Switched in: natsrpc (router, worker, types), scanlog (parser, uploader), and types (scan_log). Co-Authored-By: Claude <[email protected]>
- Upgrade interactsh v1.2.4 → v1.3.0 to fix encrypted OOB response decoding against current public servers (oast.pro, oast.live, etc.) - Call ne.Close() before reading results so the interactsh cooldown poll fires while the callback is still active — OOB matches now land in results instead of being lost - Add template_url support (priority: encoded > url > id) - Clear result.Interaction to prevent JSON marshal failures from raw binary interactsh DNS data Co-Authored-By: Claude <[email protected]>
HTTPXRequest now takes a single target string instead of a list. This matches the sync request-reply pattern — one target, one response. Removed unused flags field. Co-Authored-By: Claude <[email protected]>
BREAKING: CLI and env var changes: - --agent-networks (slice) → --agent-network (single string) - --agent-group → removed (agent-network used for NATS queue group) - PDCP_AGENT_NETWORKS → PDCP_AGENT_NETWORK - AGENT_GROUP → AGENT_NETWORK - POST /v1/agents/in query param: agent_group → agent_network Also: - Add PDCP_ENABLE_AGENT_LOG_UPLOAD=false to disable agent log upload - Suppress verbose log upload confirmation messages Co-Authored-By: Claude <[email protected]>
C1: Update router_test.go to match HTTPXRequest single target field
(Targets[]string → Target string from commit 5355ae3)
C3: Remove 80-line debug block (main.go:1649-1729) that printed scan
configs, targets, templates, and ran redundant naabu port scans
to stdout via fmt.Println on every HTTP-polled scan
Co-Authored-By: Claude <[email protected]>
H3: reuse zstd decoder via package-level init (no per-chunk alloc) H4: store NATS subscriptions, unsubscribe before drain on reconnect H5: serialize cache file writes with saveMu, remove fire-and-forget goroutines H6: propagate lifecycle context through Router to all handlers M2: extract hardcoded version to build-time var (ldflags -X main.Version) M3: replace syscall.Rusage with runtime.MemStats (cross-platform) M4: document why SubscribeBroadcast/SubscribeDirect are separate methods M7: use encoding/json.RawMessage for Response.Data (cross-package compat) L2: remove commented-out passive discovery and deleteCacheFileForTesting Also: add PDCP_ENABLE_AGENT_LOG_UPLOAD toggle, suppress log upload noise Co-Authored-By: Claude <[email protected]>
- Move Agent last updated, Discovered network subnets, /in requests sent logs from INFO to DEBUG to reduce noise during normal operation - Gate DEBUG level in logHelper with early return unless -verbose is set - Remove ***** debug log lines from processJetStreamEnumeration Co-Authored-By: Claude <[email protected]>
… messages Plain Ack() is fire-and-forget — if the ack packet is lost on the wire, NATS keeps the message as outstanding. Once all agents stop pulling (Waiting Pulls: 0), there is nobody to redeliver to and the messages are orphaned forever. Switch to DoubleAck (server-confirmed ack) with up to 3 retries for both work and chunk message acknowledgments. Also set MaxDeliver=1 on the chunk consumer so that if a chunk is delivered and not acked within AckWait, NATS terminates it instead of keeping it as an outstanding ack indefinitely. Co-Authored-By: Claude <[email protected]>
…check, and request logging - restart: new NATS RPC on direct router for graceful in-process restart without external supervisor (K8s/systemd). Run() loops on restart flag, resetForRestart() clears transient state between cycles. - logs: in-memory ring buffer (5000 entries) exposed via "logs" RPC with offset/limit pagination and level filtering. - health-check: add idle/idle_since fields. Tracks active workers via atomic counter on WorkerPool; reports idle only when no tasks running and agent up > 1 min. - router: log all NATS RPC requests with method, subject, and duration. - portfilter: strip URL scheme before SplitHostPort to fix bogus port extraction from targets like https://host. - execute: replace gologger with slog, remove debug prints. Co-Authored-By: Claude <[email protected]>
… and runtime scaler - Resource detection (pkg/resourceprofile): cross-platform CPU, memory, FD detection with cgroup v1/v2 support for containers - Auto-detect formula: min(NumCPU*4, availMem/200MB, (fdLimit-1024)/20) clamped [1,64], using 0 as sentinel for auto-detect - ResizableSemaphore: channel-based concurrency control with runtime Resize for grow/shrink without disrupting in-flight work - Adaptive scaler: 30s control loop monitoring CPU/mem/FD pressure and chunk duration trends, with hysteresis (75%/80%/90%) and 60s cooldown - ConsumeChunks: parallel chunk fan-out via semaphore (replaces sequential batch processing), heartbeat before Acquire to prevent AckWait expiry, context.WithoutCancel only inside chunk goroutines for graceful shutdown - automaxprocs: cgroup-aware GOMAXPROCS via uber-go/automaxprocs - New test: TestConsumeChunks_ParallelProcessing verifying semaphore bounds Co-Authored-By: Claude <[email protected]>
- Tool prerequisites: checks nuclei, naabu, httpx, dnsx, tlsx, nmap at startup. Missing PD tools installed via go install or GitHub binary download. System tools (nmap) installed via apt/brew when running as root. - Idempotent: on restarts with all tools present, cost is N LookPath calls - Browser validation: runs httpx -screenshot test to download Chrome and verify shared library dependencies. Exits if Chrome deps are missing with install instructions. - Cross-platform: Linux (apt), macOS (brew), Windows (binary download + PowerShell), with Go archive/zip stdlib for extraction (no shell-out) Co-Authored-By: Claude <[email protected]>
…ixes Enumeration pipeline: - Linear flow: dnsx → port scan → httpx probe → screenshot → tlsx, each step gates the next (no httpx on dead hosts, no Chrome without confirmed web services) - Skip dnsx for IP-only targets - Quick port filter (naabu 80/443/8443) when port_scan step not present - No double uploads: tools with -dashboard upload themselves, manual upload only for tools without it and only if output non-empty - httpx -irr flag, output file collision fix (probe vs screenshot) Graceful shutdown: - Signal handler cancels contexts only, agentMode defer handles blocking wait and cleanup sequence (NATS drain, batcher flush) - context.WithoutCancel inside chunk goroutines so subprocesses finish, fetch loop still stops on cancellation Operational fixes: - /in heartbeat: 5min interval, 30s timeout, non-fatal after first registration (agent keeps scanning via NATS if API is down) - Health check: includes chunk-level activity (semaphore InUse) in tasks_running and idle status - NATS credential warning when JetStream enabled but creds not received - ensureNucleiTemplates always updates (was skipping if dir existed) - Resource profiler wired into agentMode with per-scan snapshots - Prereq system wired into main() with auto-install and browser check Co-Authored-By: Claude <[email protected]>
…ion pipeline rewrite Co-Authored-By: Claude <[email protected]>
- pkg/selfupdate: download binary from GitHub releases, verify with -version, replace running binary via rename + syscall.Exec - Two-phase update: DownloadAndVerify (agent still operational) then Apply (after draining connections). If download fails, agent keeps running with NATS connected. - Container detection: skips self-update in Docker/k8s (checks KUBERNETES_SERVICE_HOST, /.dockerenv, cgroup, mountinfo) - PDCP_UPDATE_URL env override for local testing/staging - Asset matching: maps darwin → macOS for GitHub release filenames - Handles older binaries without -version flag gracefully Co-Authored-By: Claude <[email protected]>
…filer - Makefile: VERSION from git describe, injected via -X main.Version into all build targets (build, build-linux-amd64, build-linux-arm64) - -version/--version flag: prints version and exits before any init - configureLogging: --verbose now enables slog debug level - Resource profiler sample log changed from Info to Debug (only visible with --verbose) Co-Authored-By: Claude <[email protected]>
- NATS RPC: handleUpdate registered on broadcast + direct routers for fleet-wide or targeted updates - Two-phase handler: downloads while agent is operational, drains connections only after binary is verified. On failure, agent keeps running. On Apply failure, restarts same binary to recover NATS. - Agent ID persistence: --agent-id CLI flag, injected into syscall.Exec args via ensureArg so new process keeps the same identity - xid validation: user-provided --agent-id must pass xid.FromString to prevent injection via CLI args - Startup log includes version: "pd-agent v1.0.0 — running in agent mode" Co-Authored-By: Claude <[email protected]>
Co-Authored-By: Claude <[email protected]>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Key changes
Adaptive parallelism
min(NumCPU×4, availMem/200MB, (fdLimit-1024)/20), clamped [1,64]automaxprocsfor cgroup-aware GOMAXPROCSEnumeration pipeline
New packages
pkg/resourceprofile/— cross-platform resource detection, profiler, semaphore, scalerpkg/prereq/— tool prerequisite checking and auto-installationBenchmarks (c2-standard-16, ~100 public targets)
Test plan
go test ./pkg/natsrpc/ -count=1 -timeout=60s— all tests pass including newTestConsumeChunks_ParallelProcessinggo vet ./...— cleanidle: falseduring active scanning