Bounded, observable BEAM<->Python pipelines with shared state on ETS
Slither is a low-level concurrency substrate for BEAM + Python systems, built on top of SnakeBridge and Snakepit.
It gives you three primitives:
Store: ETS-backed shared state (lock-free reads, serialized writes), exposed to Python via views.Dispatch: batched fan-out to Python workers with bounded in-flight work (max_in_flight).Pipe: a DSL to compose BEAM stages, Python stages, and routing in one supervised flow.
Use Slither when you want Python to keep doing compute, while BEAM owns concurrency, state, and coordination.
Common wins:
- no unbounded queue growth (backpressure)
- no shared-thread state races in Python workers
- run-scoped session affinity for Python state
- consistent telemetry across runs, stages, and batches
Current released version: 0.1.0.
The next exploration track is to make Slither the control plane for mixed Python concurrency models. Python 3.13+ free-threading can let one Python process run CPU-bound Python threads in parallel, but it does not replace BEAM-owned supervision, pool routing, state cleanup, backpressure, or process isolation. Slither should make those tradeoffs explicit and measurable.
Planned work:
- Hybrid pool routing: first-class examples and helpers for routing I/O-bound work to process-isolated Snakepit pools and CPU-bound, thread-safe work to free-threaded Python pools.
- Free-threaded runtime detection: distinguish Python version support from an actually free-threaded build with the GIL disabled, then expose that in diagnostics and routing decisions.
- Capacity-aware dispatch: teach
Slither.Dispatchto account for process capacity (pool_size) and threaded capacity (pool_size * threads_per_worker) when sizing batches andmax_in_flight. - Stateful affinity patterns: document and test long-lived Python refs, model instances, caches, and open connections under strict affinity across both process and thread profiles.
- Failure-domain demos: compare crash containment for process pools against shared-process thread pools, including worker recycling and session cleanup behavior.
- Shared data benchmarks: measure when ETS-backed shared state plus Python views beats Python-thread shared mutable state, and when in-process free-threaded Python sharing is the better choice.
- Library compatibility matrix: classify workloads by thread safety, GIL behavior, native-library thread pools, and memory profile so Slither can recommend process, thread, or hybrid execution.
- Observability: add telemetry views that report queue depth, worker load, effective capacity, saturation, retries, and per-stage routing decisions.
Target use case:
A multi-tenant service that keeps Python compute close to the libraries that need it, while BEAM owns concurrency policy. Stateless or unsafe calls run in many isolated Python processes. Verified thread-safe CPU work runs in fewer free-threaded Python workers. Slither coordinates both, keeps backpressure bounded, routes session-scoped refs correctly, and cleans up Python state when the owning Elixir process exits.
Open questions for the 0.2.0 cycle:
- Which workloads are genuinely better with free-threaded Python threads instead of more isolated Python processes?
- Where does BEAM-owned ETS state outperform Python shared memory after serialization and callback costs are included?
- What runtime checks are needed before Slither can safely select a thread-profile pool automatically?
- How should batch sizing adapt when one logical worker can accept many concurrent requests?
defp deps do
[
{:slither, "~> 0.1.0"}
]
endThen:
mix deps.get
mix compile# config/runtime.exs
import Config
SnakeBridge.ConfigHelper.configure_snakepit!(pool_size: 2)defmodule MyApp.ScorePipe do
use Slither.Pipe
pipe :score do
stage :prepare, :beam,
handler: fn item, _ctx ->
%{"text" => item.payload}
end
stage :predict, :python,
executor: Slither.Dispatch.Executors.SnakeBridge,
module: "my_model",
function: "predict_batch",
pool: :default,
batch_size: 32,
max_in_flight: 4
stage :route, :router,
routes: [
{fn item -> item.payload["score"] >= 0.8 end, :accept},
{fn item -> item.payload["score"] >= 0.4 end, :review}
]
output :accept
output :review
output :default
on_error :predict, :skip
on_error :*, :halt
end
end{:ok, outputs} = Slither.run_pipe(MyApp.ScorePipe, ["first", "second", "third"])
accepted_payloads = Enum.map(outputs.accept, & &1.payload)
review_payloads = Enum.map(outputs.review, & &1.payload)mix slither.example
mix slither.example text_analysis
mix slither.example batch_stats
mix slither.example data_etl
mix slither.example ml_scoring
mix slither.example image_pipeline
mix slither.example --all
mix slither.example --no-baselineml_scoring and image_pipeline auto-install Python deps via Snakepit/uv.
Slither.Pipe.Runner.run/3: run a pipe and return output buckets.Slither.Pipe.Runner.stream/3: stream outputs lazily.Slither.run_pipe/3: convenience wrapper.
items = Slither.Item.wrap_many([1, 2, 3, 4])
{:ok, results} =
Slither.dispatch(items,
executor: Slither.Dispatch.Executors.SnakeBridge,
module: "my_model",
function: "predict_batch",
pool: :default,
batch_size: 64,
max_in_flight: 8,
ordering: :preserve,
on_error: :halt
)For large workloads, use Slither.Dispatch.stream/2.
defmodule MyApp.FeatureStore do
@behaviour Slither.Store
@impl true
def tables do
[
%{name: :features, type: :set, read_concurrency: true}
]
end
@impl true
def views do
[
%{
name: :lookup_feature,
mode: :scalar,
scope: :session,
handler: fn %{"key" => key}, _ctx ->
case Slither.Store.Server.get(__MODULE__, :features, key) do
nil -> %{"error" => "not_found"}
value -> %{"value" => value}
end
end,
timeout_ms: 5_000
}
]
end
@impl true
def load(tables) do
:ets.insert(tables[:features], {"user:42", %{tier: "gold"}})
:ok
end
endStart store processes by listing modules in config:
config :slither,
stores: [MyApp.FeatureStore]Read/write API:
Slither.Store.Server.get(MyApp.FeatureStore, :features, "user:42")
Slither.Store.Server.put(MyApp.FeatureStore, :features, "user:99", %{tier: "silver"})config :slither,
stores: [],
dispatch: [
default_batch_size: 64,
default_max_in_flight: 8,
default_ordering: :preserve,
default_on_error: :halt
],
bridge: [
default_scope: :session
]Slither emits under [:slither, ...].
- dispatch:
[:slither, :dispatch, :batch, :start|:stop|:exception] - pipe run:
[:slither, :pipe, :run, :start|:stop|:exception] - pipe stage:
[:slither, :pipe, :stage, :start|:stop|:exception] - store:
[:slither, :store, :write],[:slither, :store, :reload, :start|:stop] - bridge view callbacks:
[:slither, :bridge, :view, :start|:stop]
- Python module not found:
run examples through
mix slither.example ...soPYTHONPATHis set. - SnakeBridge/Snakepit calls failing:
verify runtime pool config in
config/runtime.exs. - Optional package import errors:
run
mix slither.example ml_scoringormix slither.example image_pipelineto auto-install deps.
guides/getting-started.mdguides/store.mdguides/dispatch.mdguides/pipe.mdguides/examples.mdguides/operations.md
MIT