
nshkrdotcom/slither


Slither

Bounded, observable BEAM<->Python pipelines with shared state on ETS


Slither is a low-level concurrency substrate for BEAM + Python systems, built on top of SnakeBridge and Snakepit.

It gives you three primitives:

  • Store: ETS-backed shared state (lock-free reads, serialized writes), exposed to Python via views.
  • Dispatch: batched fan-out to Python workers with bounded in-flight work (max_in_flight).
  • Pipe: a DSL to compose BEAM stages, Python stages, and routing in one supervised flow.

Why use Slither

Use Slither when you want Python to keep doing compute, while BEAM owns concurrency, state, and coordination.

Common wins:

  • no unbounded queue growth (backpressure)
  • no shared-thread state races in Python workers
  • run-scoped session affinity for Python state
  • consistent telemetry across runs, stages, and batches

Roadmap: 0.2.0 (unreleased)

Current released version: 0.1.0.

The next exploration track is to make Slither the control plane for mixed Python concurrency models. Python 3.13+ free-threading can let one Python process run CPU-bound Python threads in parallel, but it does not replace BEAM-owned supervision, pool routing, state cleanup, backpressure, or process isolation. Slither should make those tradeoffs explicit and measurable.

Planned work:

  • Hybrid pool routing: first-class examples and helpers for routing I/O-bound work to process-isolated Snakepit pools and CPU-bound, thread-safe work to free-threaded Python pools.
  • Free-threaded runtime detection: distinguish Python version support from an actually free-threaded build with the GIL disabled, then expose that in diagnostics and routing decisions.
  • Capacity-aware dispatch: teach Slither.Dispatch to account for process capacity (pool_size) and threaded capacity (pool_size * threads_per_worker) when sizing batches and max_in_flight.
  • Stateful affinity patterns: document and test long-lived Python refs, model instances, caches, and open connections under strict affinity across both process and thread profiles.
  • Failure-domain demos: compare crash containment for process pools against shared-process thread pools, including worker recycling and session cleanup behavior.
  • Shared data benchmarks: measure when ETS-backed shared state plus Python views beats Python-thread shared mutable state, and when in-process free-threaded Python sharing is the better choice.
  • Library compatibility matrix: classify workloads by thread safety, GIL behavior, native-library thread pools, and memory profile so Slither can recommend process, thread, or hybrid execution.
  • Observability: add telemetry views that report queue depth, worker load, effective capacity, saturation, retries, and per-stage routing decisions.
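The capacity arithmetic in the capacity-aware dispatch item can be sketched as follows. CapacitySketch, effective_capacity/2 and clamp_max_in_flight/2 are hypothetical names for illustration, not part of Slither 0.1.0. The sketch also assumes that each in-flight batch occupies one worker slot: one process in a process pool, or one thread in a free-threaded worker.

```elixir
defmodule CapacitySketch do
  # Hypothetical helper; not a Slither API.
  # Process pools: threads_per_worker defaults to 1, so capacity == pool_size.
  # Threaded pools: capacity == pool_size * threads_per_worker.
  def effective_capacity(pool_size, threads_per_worker \\ 1)
      when is_integer(pool_size) and pool_size > 0 and
             is_integer(threads_per_worker) and threads_per_worker > 0 do
    pool_size * threads_per_worker
  end

  # Never allow more batches in flight than the pool can run concurrently.
  def clamp_max_in_flight(requested, capacity), do: min(requested, capacity)
end

# 4 isolated worker processes, one request each.
process_cap = CapacitySketch.effective_capacity(4)
# 2 free-threaded workers, 8 threads each.
thread_cap = CapacitySketch.effective_capacity(2, 8)
# A requested max_in_flight of 32 is capped at the threaded capacity.
clamped = CapacitySketch.clamp_max_in_flight(32, thread_cap)
```

Clamping is only one possible policy. The open question about batch sizing could equally be answered by shrinking batch_size as capacity grows.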

Target use case:

A multi-tenant service that keeps Python compute close to the libraries that need it, while BEAM owns concurrency policy. Stateless or thread-unsafe calls run in many isolated Python processes. Verified thread-safe CPU work runs in fewer free-threaded Python workers. Slither coordinates both, keeps backpressure bounded, routes session-scoped refs correctly, and cleans up Python state when the owning Elixir process exits.

Open questions for the 0.2.0 cycle:

  • Which workloads are genuinely better with free-threaded Python threads instead of more isolated Python processes?
  • Where does BEAM-owned ETS state outperform Python shared memory after serialization and callback costs are included?
  • What runtime checks are needed before Slither can safely select a thread-profile pool automatically?
  • How should batch sizing adapt when one logical worker can accept many concurrent requests?

Installation

# mix.exs
defp deps do
  [
    {:slither, "~> 0.1.0"}
  ]
end

Then:

mix deps.get
mix compile

Quick Start

1) Configure a Python worker pool

# config/runtime.exs
import Config

SnakeBridge.ConfigHelper.configure_snakepit!(pool_size: 2)

2) Define a pipeline

defmodule MyApp.ScorePipe do
  use Slither.Pipe

  pipe :score do
    stage :prepare, :beam,
      handler: fn item, _ctx ->
        %{"text" => item.payload}
      end

    stage :predict, :python,
      executor: Slither.Dispatch.Executors.SnakeBridge,
      module: "my_model",
      function: "predict_batch",
      pool: :default,
      batch_size: 32,
      max_in_flight: 4

    stage :route, :router,
      routes: [
        {fn item -> item.payload["score"] >= 0.8 end, :accept},
        {fn item -> item.payload["score"] >= 0.4 end, :review}
      ]

    output :accept
    output :review
    output :default

    on_error :predict, :skip
    on_error :*, :halt
  end
end

3) Run it

{:ok, outputs} = Slither.run_pipe(MyApp.ScorePipe, ["first", "second", "third"])

accepted_payloads = Enum.map(outputs.accept, & &1.payload)
review_payloads = Enum.map(outputs.review, & &1.payload)
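The :route stage in the pipe above pairs predicates with output buckets. This README does not spell out the exact semantics. Assuming first-match routing, with items that match no predicate falling through to :default, the bucketing can be modelled in plain Elixir. RouteSketch is hypothetical, and its predicates take the payload directly rather than the wrapped item.

```elixir
defmodule RouteSketch do
  # Hypothetical first-match router: the first predicate that returns truthy wins.
  # Items that match nothing go to :default (an assumption about Slither's behavior).
  def route(payload, routes) do
    Enum.find_value(routes, :default, fn {pred, bucket} ->
      if pred.(payload), do: bucket
    end)
  end

  # Group payloads into a map of bucket => payloads, preserving input order per bucket.
  def bucket(payloads, routes), do: Enum.group_by(payloads, &route(&1, routes))
end

routes = [
  {fn p -> p["score"] >= 0.8 end, :accept},
  {fn p -> p["score"] >= 0.4 end, :review}
]

buckets = RouteSketch.bucket([%{"score" => 0.9}, %{"score" => 0.5}, %{"score" => 0.1}], routes)
```

Route order matters under this model. A score of 0.9 also satisfies the :review predicate, but :accept is checked first.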

Run the built-in demos

mix slither.example
mix slither.example text_analysis
mix slither.example batch_stats
mix slither.example data_etl
mix slither.example ml_scoring
mix slither.example image_pipeline
mix slither.example --all
mix slither.example --no-baseline

The ml_scoring and image_pipeline demos install their Python dependencies automatically via Snakepit/uv.

Core APIs

Pipe (orchestration)

  • Slither.Pipe.Runner.run/3: run a pipe and return output buckets.
  • Slither.Pipe.Runner.stream/3: stream outputs lazily.
  • Slither.run_pipe/3: convenience wrapper.

Dispatch (batched Python calls)

items = Slither.Item.wrap_many([1, 2, 3, 4])

{:ok, results} =
  Slither.dispatch(items,
    executor: Slither.Dispatch.Executors.SnakeBridge,
    module: "my_model",
    function: "predict_batch",
    pool: :default,
    batch_size: 64,
    max_in_flight: 8,
    ordering: :preserve,
    on_error: :halt
  )

For large workloads, use Slither.Dispatch.stream/2 to consume results lazily instead of collecting them all at once.
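The batch_size and max_in_flight options bound how much work is outstanding at any moment. A rough standard-library analogue is Stream.chunk_every/2 feeding Task.async_stream/3 with max_concurrency, which starts a new batch only when a running one finishes. BoundedDispatchSketch is a hypothetical module, and the batch function stands in for the Python call. This is not how Slither is implemented internally.

```elixir
defmodule BoundedDispatchSketch do
  # Hypothetical stand-in for Slither.dispatch/2 using only Elixir's standard library.
  def run(items, batch_fun, opts) do
    batch_size = Keyword.fetch!(opts, :batch_size)
    max_in_flight = Keyword.fetch!(opts, :max_in_flight)

    items
    # Lazily split the input into batches; nothing is materialized up front.
    |> Stream.chunk_every(batch_size)
    |> Task.async_stream(batch_fun,
      # At most this many batches run concurrently (bounded in-flight work).
      max_concurrency: max_in_flight,
      # Emit results in input order, like ordering: :preserve.
      ordered: true,
      timeout: 30_000
    )
    # A crashing batch crashes the caller here, roughly like on_error: :halt.
    |> Enum.flat_map(fn {:ok, batch_results} -> batch_results end)
  end
end

# Fake "Python" batch function: doubles each item in the batch.
results =
  BoundedDispatchSketch.run(1..10, fn batch -> Enum.map(batch, &(&1 * 2)) end,
    batch_size: 3,
    max_in_flight: 2
  )
```

Because Task.async_stream/3 pulls from its input on demand, the producer never runs more than max_in_flight batches ahead of the consumer. That is the backpressure property the README describes.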

Store (shared state on ETS)

defmodule MyApp.FeatureStore do
  @behaviour Slither.Store

  @impl true
  def tables do
    [
      %{name: :features, type: :set, read_concurrency: true}
    ]
  end

  @impl true
  def views do
    [
      %{
        name: :lookup_feature,
        mode: :scalar,
        scope: :session,
        handler: fn %{"key" => key}, _ctx ->
          case Slither.Store.Server.get(__MODULE__, :features, key) do
            nil -> %{"error" => "not_found"}
            value -> %{"value" => value}
          end
        end,
        timeout_ms: 5_000
      }
    ]
  end

  @impl true
  def load(tables) do
    :ets.insert(tables[:features], {"user:42", %{tier: "gold"}})
    :ok
  end
end

Start store processes by listing modules in config:

config :slither,
  stores: [MyApp.FeatureStore]

Read/write API:

Slither.Store.Server.get(MyApp.FeatureStore, :features, "user:42")
Slither.Store.Server.put(MyApp.FeatureStore, :features, "user:99", %{tier: "silver"})
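The "lock-free reads, serialized writes" split described for Store is a common ETS pattern. Callers read the table directly, while every write goes through the single process that owns it. The sketch below shows that pattern with a GenServer and a :protected ETS table. SketchStore is a hypothetical module for illustration, not Slither.Store.Server's implementation.

```elixir
defmodule SketchStore do
  # Hypothetical illustration of direct ETS reads plus owner-serialized writes.
  use GenServer

  @table :sketch_store

  def start_link(_opts), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  # Reads hit ETS from the caller's process; no message to the GenServer.
  def get(key) do
    case :ets.lookup(@table, key) do
      [{^key, value}] -> value
      [] -> nil
    end
  end

  # Writes are funneled through the owner process, so they apply one at a time.
  def put(key, value), do: GenServer.call(__MODULE__, {:put, key, value})

  @impl true
  def init(:ok) do
    # :protected lets any process read, but only the owner (this GenServer) write.
    table = :ets.new(@table, [:set, :protected, :named_table, read_concurrency: true])
    {:ok, table}
  end

  @impl true
  def handle_call({:put, key, value}, _from, table) do
    true = :ets.insert(table, {key, value})
    {:reply, :ok, table}
  end
end

{:ok, _pid} = SketchStore.start_link([])
:ok = SketchStore.put("user:42", %{tier: "gold"})
gold = SketchStore.get("user:42")
missing = SketchStore.get("user:99")
```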

Configuration knobs

config :slither,
  stores: [],
  dispatch: [
    default_batch_size: 64,
    default_max_in_flight: 8,
    default_ordering: :preserve,
    default_on_error: :halt
  ],
  bridge: [
    default_scope: :session
  ]
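The on_error values used in this README (:halt as the dispatch default, :skip for the :predict stage in the pipe example) suggest two policies: drop the failing item and continue, or stop at the first failure. The sketch below models that difference in plain Elixir. ErrorPolicySketch is hypothetical, and Slither's actual behavior may differ, for example in whether skipped items are reported or where halted errors surface.

```elixir
defmodule ErrorPolicySketch do
  # Hypothetical model of :skip vs :halt; stage functions return {:ok, result} or {:error, reason}.
  def apply_stage(items, fun, policy) when policy in [:skip, :halt] do
    outcome =
      Enum.reduce_while(items, {:ok, []}, fn item, {:ok, acc} ->
        case fun.(item) do
          {:ok, result} -> {:cont, {:ok, [result | acc]}}
          # :skip drops the failing item and keeps going.
          {:error, _reason} when policy == :skip -> {:cont, {:ok, acc}}
          # :halt stops at the first failure and reports which item failed.
          {:error, reason} -> {:halt, {:error, {item, reason}}}
        end
      end)

    case outcome do
      {:ok, acc} -> {:ok, Enum.reverse(acc)}
      error -> error
    end
  end
end

score = fn
  n when n >= 0 -> {:ok, n * 10}
  n -> {:error, {:negative, n}}
end

skipped = ErrorPolicySketch.apply_stage([1, -2, 3], score, :skip)
halted = ErrorPolicySketch.apply_stage([1, -2, 3], score, :halt)
```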

Telemetry events

Slither emits telemetry events under the [:slither, ...] prefix:

  • dispatch: [:slither, :dispatch, :batch, :start|:stop|:exception]
  • pipe run: [:slither, :pipe, :run, :start|:stop|:exception]
  • pipe stage: [:slither, :pipe, :stage, :start|:stop|:exception]
  • store: [:slither, :store, :write], [:slither, :store, :reload, :start|:stop]
  • bridge view callbacks: [:slither, :bridge, :view, :start|:stop]
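To consume these events, attach a handler with the :telemetry library. The sketch below is a standalone script. SlitherTelemetrySketch and the "slither-telemetry-sketch" handler id are made-up names. This README does not list the measurement or metadata keys, so the handler forwards them unchanged, and the demo emits a simulated event rather than a real Slither one.

```elixir
# Standalone script: fetches :telemetry. Drop this line inside a Mix project that already depends on it.
Mix.install([{:telemetry, "~> 1.2"}])

defmodule SlitherTelemetrySketch do
  # Event names come from the list above; payload keys are treated as opaque.
  @events [
    [:slither, :dispatch, :batch, :stop],
    [:slither, :pipe, :stage, :stop],
    [:slither, :store, :write]
  ]

  # Attach one handler for all events; `collector` receives each event as a message.
  def attach(collector) when is_pid(collector) do
    :telemetry.attach_many(
      "slither-telemetry-sketch",
      @events,
      &__MODULE__.handle_event/4,
      %{collector: collector}
    )
  end

  # Handlers run synchronously in the emitting process, so keep them cheap.
  def handle_event(event, measurements, metadata, %{collector: collector}) do
    send(collector, {:slither_event, event, measurements, metadata})
  end
end

:ok = SlitherTelemetrySketch.attach(self())

# Simulated store write; the %{count: 1} measurement is invented for this demo.
:telemetry.execute([:slither, :store, :write], %{count: 1}, %{table: :features})

received =
  receive do
    {:slither_event, event, measurements, _metadata} -> {event, measurements}
  after
    1_000 -> :timeout
  end
```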

Troubleshooting

  • Python module not found: run examples through mix slither.example ... so that PYTHONPATH is set correctly.
  • SnakeBridge/Snakepit calls failing: verify runtime pool config in config/runtime.exs.
  • Optional package import errors: run mix slither.example ml_scoring or mix slither.example image_pipeline to auto-install deps.

Guides

  • guides/getting-started.md
  • guides/store.md
  • guides/dispatch.md
  • guides/pipe.md
  • guides/examples.md
  • guides/operations.md

License

MIT

About

Lightweight Elixir runtime for composing and executing Python-backed data pipelines with automatic dependency resolution, lazy evaluation, fault-tolerant stage supervision, and seamless SnakeBridge/Snakepit integration for production-grade cross-language AI and scientific computing workflows on the BEAM.
