This document provides a high-level overview of how async-cassandra bridges the gap between the synchronous DataStax Cassandra driver and Python's async/await ecosystem.
The DataStax Cassandra Python driver uses a thread pool for I/O operations, which can create bottlenecks in async applications:
```mermaid
sequenceDiagram
    participant App as Async Application
    participant Driver as Cassandra Driver
    participant ThreadPool as Thread Pool
    participant Cassandra as Cassandra DB

    App->>Driver: execute(query)
    Driver->>ThreadPool: Submit to thread
    Note over ThreadPool: Thread blocked
    ThreadPool->>Cassandra: Send query
    Cassandra-->>ThreadPool: Response
    ThreadPool-->>Driver: Result
    Driver-->>App: Return result

    Note over App,ThreadPool: Thread pool can become bottleneck<br/>under high concurrency
```
async-cassandra bridges the gap between the DataStax driver's thread-based callbacks and Python's asyncio event loop. Here's how it actually works:
The DataStax driver uses a ThreadPoolExecutor for I/O operations and callbacks. We don't replace this - instead, we bridge between the driver's threading model and asyncio:
```mermaid
sequenceDiagram
    participant App as Async Application
    participant AsyncWrapper as async-cassandra
    participant Driver as Cassandra Driver
    participant ThreadPool as Driver Thread Pool
    participant EventLoop as Asyncio Event Loop
    participant Cassandra as Cassandra DB

    App->>AsyncWrapper: await execute(query)
    AsyncWrapper->>Driver: execute_async(query)
    Driver->>ThreadPool: Submit I/O task
    ThreadPool->>Cassandra: Send query (in thread)
    AsyncWrapper->>EventLoop: Create asyncio.Future
    AsyncWrapper-->>App: Return control (await)
    Note over App: Free to handle other requests

    Cassandra-->>ThreadPool: Response data
    ThreadPool->>AsyncWrapper: Callback fired (in driver thread)
    AsyncWrapper->>EventLoop: call_soon_threadsafe(set_result)
    EventLoop-->>App: Resume coroutine with result
```
- No extra thread pool: we don't create our own thread pool; we use the driver's existing `ThreadPoolExecutor`.
- Thread-safe communication: callbacks from driver threads use `call_soon_threadsafe()` to safely communicate with the asyncio event loop.
- Futures bridging: we create asyncio Futures that are resolved by callbacks from driver threads.
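The three points above fit in a few lines of standard-library code. This is a minimal sketch, not async-cassandra's implementation: `FakeResponseFuture` is a hypothetical stand-in for the driver's `ResponseFuture` that only mimics `add_callbacks(callback=..., errback=...)` and completes on a plain thread, and `bridge()` is an illustrative helper name. One deliberate choice: the `done()` check runs on the event loop thread (inside the scheduled function), so a future that was already cancelled, for example by a timeout, is never set a second time.

```python
import asyncio
import threading
from typing import Any, Callable


class FakeResponseFuture:
    """Hypothetical stand-in for the driver's ResponseFuture: completes in a worker thread."""

    def __init__(self, work: Callable[[], Any]) -> None:
        self._work = work

    def add_callbacks(self, callback: Callable[[Any], None],
                      errback: Callable[[BaseException], None]) -> None:
        def run() -> None:
            try:
                result = self._work()
            except Exception as exc:  # failures are delivered through the errback
                errback(exc)
            else:
                callback(result)

        # Simulate the driver completing the request on one of its own threads
        threading.Thread(target=run, daemon=True).start()


def bridge(response_future: FakeResponseFuture) -> "asyncio.Future[Any]":
    """Return an asyncio.Future that is resolved by callbacks fired in another thread."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def _set_result(value: Any) -> None:
        # Runs on the event loop thread; skip if the awaiter already gave up
        if not future.done():
            future.set_result(value)

    def _set_exception(exc: BaseException) -> None:
        if not future.done():
            future.set_exception(exc)

    response_future.add_callbacks(
        # These lambdas run in the worker thread and only schedule work on the loop
        callback=lambda value: loop.call_soon_threadsafe(_set_result, value),
        errback=lambda exc: loop.call_soon_threadsafe(_set_exception, exc),
    )
    return future


async def main() -> None:
    rows = await bridge(FakeResponseFuture(lambda: ["row1", "row2"]))
    print(rows)
    try:
        await bridge(FakeResponseFuture(lambda: 1 / 0))
    except ZeroDivisionError as exc:
        print("errback delivered:", type(exc).__name__)


if __name__ == "__main__":
    asyncio.run(main())
```

Note that the worker thread never touches the asyncio Future directly: everything it does goes through `call_soon_threadsafe()`, which is the only loop method documented as safe to call from other threads.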
When you call `await session.execute(query)`, here's what happens:

```python
# In AsyncCassandraSession.execute() - src/async_cassandra/session.py:156-168
response_future = self._session.execute_async(
    query, parameters, trace, custom_payload, timeout,
    execution_profile, paging_state, host, execute_as
)
handler = AsyncResultHandler(response_future)
result = await handler.get_result(timeout=query_timeout)
```

The DataStax driver's `execute_async()` returns a `ResponseFuture` that will be completed by a driver thread.
The magic happens in `AsyncResultHandler` (`src/async_cassandra/result.py`):

```python
import asyncio
import threading
from typing import Any, List, Optional

from cassandra.cluster import ResponseFuture

# AsyncResultSet is defined alongside the handler in src/async_cassandra/result.py


class AsyncResultHandler:
    def __init__(self, response_future: ResponseFuture):
        self.response_future = response_future
        self.rows: List[Any] = []
        self._future: Optional[asyncio.Future[AsyncResultSet]] = None
        self._lock = threading.Lock()  # Thread safety!

        # Register callbacks with the driver
        self.response_future.add_callbacks(
            callback=self._handle_page,
            errback=self._handle_error
        )
```

When the driver completes the query (in a driver thread), our callback is invoked:
```python
def _handle_page(self, rows: List[Any]) -> None:
    """Called from driver thread - must be thread-safe!"""
    with self._lock:
        if rows is not None:
            self.rows.extend(list(rows))  # Defensive copy

        if self.response_future.has_more_pages:
            self.response_future.start_fetching_next_page()
        else:
            # All done - notify the asyncio Future
            final_result = AsyncResultSet(list(self.rows), self.response_future)
            if self._future and not self._future.done():
                loop = getattr(self, "_loop", None)
                if loop:
                    # CRITICAL: Use call_soon_threadsafe to bridge threads!
                    loop.call_soon_threadsafe(self._future.set_result, final_result)
```

Meanwhile, the asyncio coroutine is waiting:
```python
async def get_result(self, timeout: Optional[float] = None) -> "AsyncResultSet":
    # Create asyncio Future in the current event loop
    loop = asyncio.get_running_loop()
    self._future = loop.create_future()
    self._loop = loop  # Store for callbacks to use

    # Wait for the driver thread to complete
    if timeout is not None:
        return await asyncio.wait_for(self._future, timeout=timeout)
    else:
        return await self._future
```

The driver's thread pool size is configurable (`src/async_cassandra/cluster.py`):
```python
def __init__(self, ..., executor_threads: int = 2, ...):
    # This is passed to the DataStax Cluster constructor
    # The driver creates: ThreadPoolExecutor(max_workers=executor_threads)
```

Important: this thread pool is shared by ALL I/O operations, so under high concurrency you may need to increase `executor_threads`.
The cluster wrapper (`src/async_cassandra/cluster.py`):

- Wraps the DataStax `Cluster` class
- Manages cluster lifecycle (connect, shutdown)
- Provides async context manager support
- Handles authentication and configuration

`AsyncCassandraSession`:

- Wraps the DataStax `Session` class
- Converts synchronous operations to async/await
- Provides streaming support for large result sets
- Integrates with metrics collection

`AsyncResultSet`:

- Wraps query results for async consumption
- Handles paging transparently
- Provides familiar result access methods (`one()`, `all()`)

`AsyncStreamingResultSet`:

- Enables memory-efficient processing of large results
- Supports async iteration over rows
- Provides page-level access for batch processing
- Includes progress tracking capabilities
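For the result-set wrapper, "familiar access methods" plus async iteration can be pictured with a small sketch. `SketchResultSet` is a hypothetical name and this is not async-cassandra's `AsyncResultSet`; it assumes, as in the handler excerpt, that all rows have already been collected, so `one()` and `all()` are plain in-memory lookups and `async for` simply yields them. `one()` follows the driver's `ResultSet.one()` convention of returning the first row or `None`.

```python
import asyncio
from typing import Any, AsyncIterator, List, Optional


class SketchResultSet:
    """Hypothetical result wrapper over rows that were already fully fetched."""

    def __init__(self, rows: List[Any]) -> None:
        self._rows = list(rows)  # defensive copy, mirroring the handler excerpt

    def one(self) -> Optional[Any]:
        # First row, or None for an empty result
        return self._rows[0] if self._rows else None

    def all(self) -> List[Any]:
        # Return a copy so callers cannot mutate the internal list
        return list(self._rows)

    def __len__(self) -> int:
        return len(self._rows)

    def __aiter__(self) -> AsyncIterator[Any]:
        return self._iterate()

    async def _iterate(self) -> AsyncIterator[Any]:
        # Async generator: lets callers write `async for row in result`
        for row in self._rows:
            yield row


async def main() -> None:
    result = SketchResultSet([{"id": 1}, {"id": 2}])
    print(result.one())
    print([row async for row in result])


if __name__ == "__main__":
    asyncio.run(main())
```

Because every row is already in memory, this shape suits small and medium results; the streaming variant exists precisely to avoid holding everything at once.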
The following diagram shows how a standard query flows through the async wrapper:
```mermaid
sequenceDiagram
    participant User as User Code
    participant Session as AsyncCassandraSession
    participant Handler as AsyncResultHandler
    participant EventLoop as Event Loop
    participant Driver as Cassandra Driver
    participant ThreadPool as Driver Thread Pool
    participant DB as Cassandra

    User->>Session: await execute(query)
    Session->>Driver: execute_async(query)
    Driver->>ThreadPool: Submit I/O task
    ThreadPool->>DB: Send CQL query
    Driver-->>Session: ResponseFuture
    Session->>Handler: new AsyncResultHandler(ResponseFuture)
    Handler->>Driver: add_callbacks(callback, errback)
    Handler->>EventLoop: create_future()
    Session-->>User: await handler.get_result()
    Note over User: Coroutine suspended, event loop free

    DB-->>ThreadPool: Query result
    ThreadPool->>Handler: _handle_page(rows) [in driver thread]
    Handler->>Handler: Lock, process rows
    Handler->>EventLoop: call_soon_threadsafe(future.set_result)
    EventLoop-->>User: Resume coroutine with AsyncResultSet
```
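The flow in this diagram, including multi-page results, can be exercised without a cluster. In the sketch below `FakePagedFuture` is a hypothetical stand-in for `ResponseFuture` (it models only `has_more_pages`, `start_fetching_next_page()` and the success callback; errors are left out to keep it short), and `SketchResultHandler` is an illustrative variant of the handler, not the library's code. It adds two safeguards under stated assumptions. First, the driver may invoke a callback immediately if the result is already available, so the last page can arrive before `get_result()` has created the asyncio Future; the rows are then parked and picked up later. Second, the `done()` check runs on the event loop thread, so a future already cancelled by `asyncio.wait_for` is left alone.

```python
import asyncio
import threading
from typing import Any, Callable, List, Optional


class FakePagedFuture:
    """Hypothetical ResponseFuture stand-in: delivers pre-built pages from worker threads."""

    def __init__(self, pages: List[List[Any]]) -> None:
        self._pages = list(pages) or [[]]  # an empty result is a single empty page
        self._delivered = 0
        self._callback: Optional[Callable[[List[Any]], None]] = None

    @property
    def has_more_pages(self) -> bool:
        return self._delivered < len(self._pages)

    def add_callbacks(self, callback: Callable[[List[Any]], None], errback: Any = None) -> None:
        # Only the success path is modeled; errback is accepted but never called
        self._callback = callback
        self.start_fetching_next_page()

    def start_fetching_next_page(self) -> None:
        page = self._pages[self._delivered]
        self._delivered += 1
        threading.Thread(target=self._callback, args=(page,), daemon=True).start()


class SketchResultHandler:
    """Collects every page, then resolves one asyncio.Future exactly once."""

    def __init__(self, response_future: FakePagedFuture) -> None:
        self.response_future = response_future
        self.rows: List[Any] = []
        self._lock = threading.Lock()
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._future: Optional[asyncio.Future] = None
        self._early: Optional[List[Any]] = None  # final rows that arrived before get_result()
        self.response_future.add_callbacks(callback=self._handle_page)

    def _handle_page(self, rows: List[Any]) -> None:
        # Runs in a "driver" thread
        with self._lock:
            self.rows.extend(rows)
            if self.response_future.has_more_pages:
                self.response_future.start_fetching_next_page()
                return
            final = list(self.rows)
            if self._future is None:
                self._early = final  # get_result() has not run yet; it will pick this up
                return
            loop, future = self._loop, self._future
        try:
            loop.call_soon_threadsafe(self._resolve, future, final)
        except RuntimeError:
            pass  # the event loop is already closed, so nobody is waiting any more

    @staticmethod
    def _resolve(future: asyncio.Future, value: Any) -> None:
        # Runs on the event loop thread, so done() cannot change between check and set
        if not future.done():
            future.set_result(value)

    async def get_result(self, timeout: Optional[float] = None) -> List[Any]:
        loop = asyncio.get_running_loop()
        with self._lock:
            self._loop = loop
            self._future = loop.create_future()
            if self._early is not None:
                self._future.set_result(self._early)
        # On timeout, wait_for cancels the future; _resolve then leaves it alone
        return await asyncio.wait_for(self._future, timeout=timeout)
```

The lock is what makes the early-result check safe: either the driver thread sees the Future and schedules `_resolve`, or `get_result()` sees the parked rows, never neither.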
For large result sets, streaming provides memory-efficient processing:
```mermaid
sequenceDiagram
    participant App as Application
    participant Session as AsyncCassandraSession
    participant Stream as AsyncStreamingResultSet
    participant EventLoop as Event Loop
    participant Driver as Cassandra Driver
    participant ThreadPool as Driver Thread Pool
    participant DB as Cassandra

    App->>Session: await execute_stream(query, config)
    Session->>Driver: execute_async(query)
    Driver->>ThreadPool: Submit I/O task
    ThreadPool->>DB: Send CQL query
    Driver-->>Session: ResponseFuture
    Session->>Stream: Create AsyncStreamingResultSet(future, config)
    Stream->>Driver: add_callbacks(callback, errback)
    Session-->>App: Return Stream

    loop For each row iteration
        App->>Stream: async for row in stream
        alt Current page has rows
            Stream-->>App: Return next row
        else Page exhausted, need next page
            Stream->>EventLoop: Create page_ready Event
            Note over App: Coroutine suspended
            DB-->>ThreadPool: Next page data
            ThreadPool->>Stream: _handle_page(rows) [in driver thread]
            Stream->>Stream: Lock, replace current page
            Stream->>EventLoop: call_soon_threadsafe(page_ready.set)
            EventLoop->>Stream: Resume iteration
            Stream-->>App: Return first row of new page
        end
    end
```
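The row-by-row loop in this diagram can be sketched as follows. `FakeStreamingFuture` and `SketchStream` are hypothetical names and this is not `AsyncStreamingResultSet`'s actual implementation; it assumes a driver-style contract of one callback per fetched page. It shows the two moves from the diagram: the driver thread swaps in the new page under a lock and wakes the loop with `call_soon_threadsafe(page_ready.set)`, and the next page is requested only once the current one is exhausted, so at most one page is held in memory at a time.

```python
import asyncio
import threading
from collections import deque
from typing import Any, Callable, List, Optional


class FakeStreamingFuture:
    """Hypothetical ResponseFuture stand-in: each fetch delivers one page from a thread."""

    def __init__(self, pages: List[List[Any]]) -> None:
        self._pages = deque(pages)
        self._callback: Optional[Callable[[List[Any]], None]] = None

    @property
    def has_more_pages(self) -> bool:
        return bool(self._pages)

    def add_callbacks(self, callback: Callable[[List[Any]], None], errback: Any = None) -> None:
        self._callback = callback
        self.start_fetching_next_page()

    def start_fetching_next_page(self) -> None:
        page = self._pages.popleft() if self._pages else []
        threading.Thread(target=self._callback, args=(page,), daemon=True).start()


class SketchStream:
    """Async iterator holding at most one page; must be created inside a coroutine."""

    def __init__(self, response_future: FakeStreamingFuture) -> None:
        self._rf = response_future
        self._loop = asyncio.get_running_loop()
        self._lock = threading.Lock()
        self._page: List[Any] = []
        self._pos = 0
        self._done = False
        self._page_ready = asyncio.Event()
        self._rf.add_callbacks(callback=self._handle_page)

    def _handle_page(self, rows: List[Any]) -> None:
        # Driver thread: replace the current page, then wake the event loop
        with self._lock:
            self._page, self._pos = list(rows), 0
            self._done = not self._rf.has_more_pages
        self._loop.call_soon_threadsafe(self._page_ready.set)

    def __aiter__(self) -> "SketchStream":
        return self

    async def __anext__(self) -> Any:
        while True:
            await self._page_ready.wait()
            with self._lock:
                if self._pos < len(self._page):
                    row = self._page[self._pos]
                    self._pos += 1
                    return row
                if self._done:
                    raise StopAsyncIteration
                # Page exhausted: reset the event before asking for the next page
                self._page_ready.clear()
            self._rf.start_fetching_next_page()


async def main() -> None:
    async for row in SketchStream(FakeStreamingFuture([[1, 2], [], [3]])):
        print(row)


if __name__ == "__main__":
    asyncio.run(main())
```

Clearing the event before requesting the next page matters: if the fetch were issued first, a fast driver thread could set the event and the subsequent `clear()` would erase that wake-up.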
The design follows these principles:

- We wrap, not reimplement, the DataStax driver
- All driver features remain accessible
- Minimal performance overhead
- All blocking operations converted to async
- Proper integration with Python's event loop
- No blocking of the event loop
- Streaming support for large result sets
- Configurable fetch sizes
- Page-based processing options
- Familiar async/await syntax
- Context manager support
- Type hints throughout
- Comprehensive error handling
- Metrics and monitoring built-in
- Battle-tested retry policies
async-cassandra bridges two distinct execution environments:
- Asyncio world (your application)
  - Single-threaded event loop
  - Coroutines and async/await
  - Non-blocking I/O via the event loop
  - Can handle thousands of concurrent operations
- Driver thread pool world (DataStax driver)
  - `ThreadPoolExecutor` with `executor_threads` workers (default: 2)
  - Blocking I/O operations
  - Callback-based completion
  - Limited by thread count
We don't "map" thread pools - we bridge between them:
```
Your Async Code → asyncio.Future → Callback Bridge → Driver ResponseFuture → Thread Pool
       ↑                                                                          ↓
       └──────────────────────── call_soon_threadsafe() ←─────────────────────────┘
```
Key bridging components:
- asyncio.Future: Created in the event loop, awaited by your code
- ResponseFuture: Created by the driver, completed by driver threads
- Callbacks: Registered with ResponseFuture, called from driver threads
- call_soon_threadsafe(): The critical method that safely notifies the event loop from driver threads
Since we use the driver's thread pool:
- Maximum concurrent I/O operations = `executor_threads` (default: 2)
- Under high load, increase `executor_threads` when creating the cluster
- Each blocking operation ties up a thread until completion
- The event loop stays free, but total throughput is limited by thread count
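The throughput ceiling described above is ordinary `ThreadPoolExecutor` behavior, and it can be observed without Cassandra. The sketch below is not driver code: `ConcurrencyProbe` and `run_batch()` are hypothetical helpers, `time.sleep()` stands in for a call that ties up a worker thread, and `asyncio.wrap_future()` (a standard-library bridge that itself schedules completion with `call_soon_threadsafe()`) lets the event loop await the pool's futures. With two workers, no more than two simulated calls are in flight at once, however many coroutines are waiting.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ConcurrencyProbe:
    """Records how many simulated blocking calls are in flight at the same time."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    def blocking_call(self, seconds: float) -> float:
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        time.sleep(seconds)  # stands in for a call that occupies a worker thread
        with self._lock:
            self.active -= 1
        return seconds


async def run_batch(workers: int, calls: int, seconds: float = 0.05) -> int:
    """Await `calls` blocking tasks on a pool of `workers` threads; return the peak overlap."""
    probe = ConcurrencyProbe()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Each concurrent.futures.Future is wrapped so the event loop can await it
        futures = [asyncio.wrap_future(pool.submit(probe.blocking_call, seconds))
                   for _ in range(calls)]
        await asyncio.gather(*futures)
    return probe.peak


async def main() -> None:
    for workers in (2, 8):
        start = time.perf_counter()
        peak = await run_batch(workers, calls=8)
        elapsed = time.perf_counter() - start
        print(f"workers={workers} peak_in_flight={peak} elapsed={elapsed:.2f}s")


if __name__ == "__main__":
    asyncio.run(main())
```

The event loop itself stays responsive in both runs; only the pool size decides how many blocking calls make progress at once, which is the same trade-off the `executor_threads` setting controls.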
While async-cassandra provides async/await syntax, it's important to understand:
- The underlying I/O is still synchronous: the DataStax driver uses blocking sockets in threads.
- Thread pool constraints apply: concurrency is limited by the driver's thread pool size (default: 2 threads).
- Not a true async driver: this is a compatibility layer, not a ground-up async implementation.
- No thread pool multiplication: we use the driver's thread pool as-is and don't add threads of our own.
For more details on these limitations and when to use this wrapper, see Why an Async Wrapper is Necessary.