Neumann Server API Reference

The Neumann Server (neumann_server) provides a gRPC server that exposes the Neumann database over the network. It wraps the Query Router with authentication, TLS encryption, and streaming support for large result sets and blob storage.

The server follows four design principles: zero-configuration startup (works out of the box with sensible defaults), security-first (API key authentication with constant-time comparison, TLS support), streaming-native (all large operations use gRPC streaming), and health monitoring (automatic failure tracking with configurable thresholds).

Key Types

| Type | Description |
|---|---|
| NeumannServer | Main server struct with router, blob store, and configuration |
| ServerConfig | Configuration for bind address, TLS, auth, and limits |
| TlsConfig | TLS certificate paths and client certificate settings |
| AuthConfig | API key list, header name, and anonymous access control |
| ApiKey | Individual API key with identity and optional description |
| QueryServiceImpl | gRPC service for query execution with streaming |
| BlobServiceImpl | gRPC service for artifact storage with streaming |
| HealthServiceImpl | gRPC service for health checks |
| HealthState | Shared health state across services |
| ServerError | Error type for server operations |

Architecture Overview

flowchart TD
    subgraph Clients
        CLI[neumann_client]
        gRPC[gRPC Clients]
        Web[gRPC-Web Browsers]
    end

    subgraph NeumannServer
        QS[QueryService]
        BS[BlobService]
        HS[HealthService]
        RS[ReflectionService]
        Auth[Auth Middleware]
        TLS[TLS Layer]
    end

    subgraph Backend
        QR[QueryRouter]
        Blob[BlobStore]
    end

    CLI --> TLS
    gRPC --> TLS
    Web --> TLS
    TLS --> Auth
    Auth --> QS
    Auth --> BS
    Auth --> HS
    QS --> QR
    BS --> Blob
    RS --> |Service Discovery| gRPC

Server Configuration

| Field | Type | Default | Description |
|---|---|---|---|
| bind_addr | SocketAddr | 127.0.0.1:9200 | Server bind address |
| tls | Option<TlsConfig> | None | TLS configuration |
| auth | Option<AuthConfig> | None | Authentication configuration |
| max_message_size | usize | 64 MB | Maximum gRPC message size |
| max_upload_size | usize | 512 MB | Maximum blob upload size |
| enable_grpc_web | bool | true | Enable gRPC-web for browsers |
| enable_reflection | bool | true | Enable service reflection |
| blob_chunk_size | usize | 64 KB | Chunk size for blob streaming |
| stream_channel_capacity | usize | 32 | Bounded channel capacity for backpressure |

Configuration Builder

use neumann_server::{ApiKey, AuthConfig, ServerConfig, TlsConfig};
use std::path::PathBuf;

// `?` needs a fallible context, so the builder chain lives in a function
fn build_config() -> Result<ServerConfig, Box<dyn std::error::Error>> {
    let config = ServerConfig::new()
        .with_bind_addr("0.0.0.0:9443".parse()?)
        .with_tls(TlsConfig::new(
            PathBuf::from("server.crt"),
            PathBuf::from("server.key"),
        ))
        .with_auth(
            AuthConfig::new()
                .with_api_key(ApiKey::new(
                    "sk-prod-key-12345678".to_string(),
                    "service:backend".to_string(),
                ))
                .with_anonymous(false),
        )
        .with_max_message_size(128 * 1024 * 1024) // 128 MB
        .with_grpc_web(true)
        .with_reflection(true);
    Ok(config)
}

TLS Configuration

| Field | Type | Default | Description |
|---|---|---|---|
| cert_path | PathBuf | Required | Path to certificate file (PEM) |
| key_path | PathBuf | Required | Path to private key file (PEM) |
| ca_cert_path | Option<PathBuf> | None | CA certificate used to verify client certificates |
| require_client_cert | bool | false | Require client certificates (mTLS) |

TLS Setup

use neumann_server::TlsConfig;
use std::path::PathBuf;

// Basic TLS
let tls = TlsConfig::new(
    PathBuf::from("/etc/neumann/server.crt"),
    PathBuf::from("/etc/neumann/server.key"),
);

// Mutual TLS (mTLS)
let tls = TlsConfig::new(
    PathBuf::from("/etc/neumann/server.crt"),
    PathBuf::from("/etc/neumann/server.key"),
)
.with_ca_cert(PathBuf::from("/etc/neumann/ca.crt"))
.with_required_client_cert(true);

Authentication

AuthConfig Options

| Field | Type | Default | Description |
|---|---|---|---|
| api_keys | Vec<ApiKey> | Empty | List of valid API keys |
| api_key_header | String | x-api-key | Header name for the API key |
| allow_anonymous | bool | false | Allow unauthenticated access |

Authentication Flow

flowchart TD
    A[Request arrives] --> B{Auth configured?}
    B -->|No| C[Allow with no identity]
    B -->|Yes| D{API key header present?}
    D -->|No| E{Anonymous allowed?}
    E -->|Yes| C
    E -->|No| F[Return UNAUTHENTICATED]
    D -->|Yes| G{Key valid?}
    G -->|Yes| H[Allow with identity from key]
    G -->|No| F

API key validation uses constant-time comparison to prevent timing attacks. Every configured key is compared, even after a match is found, so response timing reveals neither how much of a guessed key was correct nor which configured key matched.
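The flowchart and comparison rule above can be sketched in plain Rust. This is a minimal model, not the crate's implementation. The `ApiKey` and `AuthConfig` structs are simplified stand-ins with assumed field names, and `authenticate` and `constant_time_eq` are hypothetical helpers. A production implementation would more likely use a vetted primitive such as the `subtle` crate's `ConstantTimeEq`, because a hand-written loop has no guarantee of surviving compiler optimizations unchanged.

```rust
/// Simplified stand-ins for the crate's types; field names are assumptions
/// based on the AuthConfig table above.
pub struct ApiKey {
    pub key: String,
    pub identity: String,
}

pub struct AuthConfig {
    pub api_keys: Vec<ApiKey>,
    pub allow_anonymous: bool,
}

#[derive(Debug, PartialEq)]
pub struct Unauthenticated;

/// Compares two byte strings without returning early at the first mismatch.
/// Keys of different length are rejected immediately, so length is not hidden.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b) {
        diff |= x ^ y;
    }
    diff == 0
}

/// Follows the authentication flowchart.
/// Ok(None): allowed without identity; Ok(Some(id)): allowed as `id`.
pub fn authenticate(
    auth: Option<&AuthConfig>,
    presented_key: Option<&str>,
) -> Result<Option<String>, Unauthenticated> {
    // Auth not configured: allow with no identity.
    let Some(auth) = auth else {
        return Ok(None);
    };
    // No API key header: fall back to the anonymous-access setting.
    let Some(presented) = presented_key else {
        return if auth.allow_anonymous { Ok(None) } else { Err(Unauthenticated) };
    };
    // Compare against every key, with no early exit on a match.
    let mut identity = None;
    for k in &auth.api_keys {
        if constant_time_eq(k.key.as_bytes(), presented.as_bytes()) {
            identity = Some(k.identity.clone());
        }
    }
    identity.map(Some).ok_or(Unauthenticated)
}
```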

gRPC Services

QueryService

| Method | Type | Description |
|---|---|---|
| Execute | Unary | Execute a single query and return the full result |
| ExecuteStream | Server streaming | Execute a query and stream results chunk by chunk |
| ExecuteBatch | Unary | Execute multiple queries and return all results |

Execute RPC

rpc Execute(QueryRequest) returns (QueryResponse);

message QueryRequest {
    string query = 1;
    optional string identity = 2;
}

message QueryResponse {
    oneof result {
        EmptyResult empty = 1;
        CountResult count = 2;
        RowsResult rows = 3;
        NodesResult nodes = 4;
        EdgesResult edges = 5;
        PathResult path = 6;
        SimilarResult similar = 7;
        TableListResult table_list = 8;
        BlobResult blob = 9;
        IdsResult ids = 10;
    }
    optional ErrorInfo error = 15;
}

ExecuteStream RPC

rpc ExecuteStream(QueryRequest) returns (stream QueryResponseChunk);

message QueryResponseChunk {
    oneof chunk {
        RowChunk row = 1;
        NodeChunk node = 2;
        EdgeChunk edge = 3;
        SimilarChunk similar_item = 4;
        bytes blob_data = 5;
        ErrorInfo error = 15;
    }
    bool is_final = 16;
}
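A client consuming `ExecuteStream` has to decide when a result is complete. The sketch below assumes that the server sets `is_final = true` on the last message and that an `error` chunk ends the stream. The `Chunk` enum models only the row and error variants, with string payloads, so it is a simplified stand-in for the generated `QueryResponseChunk` type rather than the real one.

```rust
/// Simplified stand-in for the `chunk` oneof: only rows and errors are
/// modelled, and rows are plain strings (an assumption).
pub enum Chunk {
    Row(String),
    Error(String),
}

pub struct QueryResponseChunk {
    pub chunk: Option<Chunk>,
    pub is_final: bool,
}

/// Drains chunks until `is_final`, collecting rows. An error chunk aborts,
/// and a stream that ends without `is_final` is treated as truncated.
pub fn collect_rows<I>(stream: I) -> Result<Vec<String>, String>
where
    I: IntoIterator<Item = QueryResponseChunk>,
{
    let mut rows = Vec::new();
    for msg in stream {
        match msg.chunk {
            Some(Chunk::Row(r)) => rows.push(r),
            Some(Chunk::Error(e)) => return Err(e),
            None => {}
        }
        if msg.is_final {
            return Ok(rows);
        }
    }
    Err("stream ended before is_final".to_string())
}
```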

ExecuteBatch RPC

rpc ExecuteBatch(BatchQueryRequest) returns (BatchQueryResponse);

message BatchQueryRequest {
    repeated QueryRequest queries = 1;
}

message BatchQueryResponse {
    repeated QueryResponse results = 1;
}

Security note: In batch execution, the authenticated request identity is always used. Per-query identity fields are ignored to prevent privilege escalation attacks.
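One way to enforce this rule is to overwrite each query's identity with the identity established for the RPC before any query runs. The sketch below uses a hypothetical helper, `bind_batch_identity`, on a minimal mirror of `QueryRequest`. The server may instead ignore the field at execution time, which has the same effect.

```rust
/// Minimal mirror of QueryRequest (field names from the proto above).
#[derive(Clone, Debug, PartialEq)]
pub struct QueryRequest {
    pub query: String,
    pub identity: Option<String>,
}

/// Replaces every per-query `identity` with the identity authenticated
/// for the whole RPC, so a caller cannot claim someone else's identity.
pub fn bind_batch_identity(
    queries: Vec<QueryRequest>,
    authenticated: Option<&str>,
) -> Vec<QueryRequest> {
    queries
        .into_iter()
        .map(|q| QueryRequest {
            identity: authenticated.map(str::to_owned),
            ..q
        })
        .collect()
}
```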

BlobService

| Method | Type | Description |
|---|---|---|
| Upload | Client streaming | Upload an artifact with metadata |
| Download | Server streaming | Download an artifact in chunks |
| Delete | Unary | Delete an artifact |
| GetMetadata | Unary | Get artifact metadata |

Upload Protocol

sequenceDiagram
    participant C as Client
    participant S as BlobService

    C->>S: UploadMetadata (filename, content_type, tags)
    C->>S: Chunk 1
    C->>S: Chunk 2
    C->>S: ...
    C->>S: Chunk N (end stream)
    S->>C: UploadResponse (artifact_id, size, checksum)

The first message must be metadata, followed by data chunks:

rpc Upload(stream BlobUploadRequest) returns (BlobUploadResponse);

message BlobUploadRequest {
    oneof request {
        UploadMetadata metadata = 1;
        bytes chunk = 2;
    }
}

message UploadMetadata {
    string filename = 1;
    optional string content_type = 2;
    repeated string tags = 3;
}

message BlobUploadResponse {
    string artifact_id = 1;
    uint64 size = 2;
    string checksum = 3;
}
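On the client side, an upload therefore consists of one `metadata` message followed by the payload split into chunks. This sketch uses a simplified mirror of `BlobUploadRequest` (`UploadMessage`, a hypothetical name) and takes the chunk size as a parameter. A size of 64 KB matches the server's default `blob_chunk_size`, but whether clients are expected to use the same size is an assumption.

```rust
/// Simplified mirror of the BlobUploadRequest oneof.
#[derive(Debug, PartialEq)]
pub enum UploadMessage {
    Metadata { filename: String, content_type: Option<String>, tags: Vec<String> },
    Chunk(Vec<u8>),
}

/// Builds the message sequence for one upload: metadata first, then the
/// payload in chunks of at most `chunk_size` bytes.
pub fn upload_messages(
    filename: &str,
    content_type: Option<&str>,
    tags: &[&str],
    data: &[u8],
    chunk_size: usize,
) -> Vec<UploadMessage> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    let mut msgs = vec![UploadMessage::Metadata {
        filename: filename.to_string(),
        content_type: content_type.map(str::to_string),
        tags: tags.iter().map(|t| t.to_string()).collect(),
    }];
    // `chunks` yields full-size slices plus a shorter final slice, if any
    msgs.extend(data.chunks(chunk_size).map(|c| UploadMessage::Chunk(c.to_vec())));
    msgs
}
```

For a 150,000-byte payload with 64 KB (65,536-byte) chunks, this yields one metadata message and three chunks. The last chunk holds 150,000 - 131,072 = 18,928 bytes.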

Download Protocol

rpc Download(BlobDownloadRequest) returns (stream BlobDownloadChunk);

message BlobDownloadRequest {
    string artifact_id = 1;
}

message BlobDownloadChunk {
    bytes data = 1;
    bool is_final = 2;
}
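A downloader concatenates `data` until it receives a chunk with `is_final` set. The sketch below uses a hypothetical `reassemble` helper over a plain-struct mirror of `BlobDownloadChunk`. It also treats a stream that ends without a final chunk as incomplete, which is an assumption about how clients should handle dropped connections.

```rust
/// Plain-struct mirror of BlobDownloadChunk.
pub struct BlobDownloadChunk {
    pub data: Vec<u8>,
    pub is_final: bool,
}

/// Concatenates chunk payloads until one is marked `is_final`.
/// Returns None if the stream ends first (e.g. the connection dropped).
pub fn reassemble<I>(chunks: I) -> Option<Vec<u8>>
where
    I: IntoIterator<Item = BlobDownloadChunk>,
{
    let mut out = Vec::new();
    for c in chunks {
        out.extend_from_slice(&c.data);
        if c.is_final {
            return Some(out);
        }
    }
    None
}
```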

HealthService

The HealthService is modelled on the standard gRPC health checking protocol:

rpc Check(HealthCheckRequest) returns (HealthCheckResponse);

message HealthCheckRequest {
    optional string service = 1;
}

message HealthCheckResponse {
    ServingStatus status = 1;
}

enum ServingStatus {
    UNSPECIFIED = 0;
    SERVING = 1;
    NOT_SERVING = 2;
}

Health Check Targets

| Service Name | Checks |
|---|---|
| Unset or "" | Overall server health (all services) |
| neumann.v1.QueryService | Query service health |
| neumann.v1.BlobService | Blob service health |
| Any other name | Returns UNSPECIFIED |
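The lookup in the table reduces to a small match. In this sketch, `HealthFlags` is a hypothetical stand-in for `HealthState`, and "overall" health is assumed to mean that every tracked service is healthy. Note that the standard grpc.health.v1 protocol fails unknown services with a NOT_FOUND status. The table above specifies UNSPECIFIED instead.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ServingStatus {
    Unspecified = 0,
    Serving = 1,
    NotServing = 2,
}

/// Hypothetical stand-in for HealthState: one flag per service.
pub struct HealthFlags {
    pub query_ok: bool,
    pub blob_ok: bool,
}

fn status(ok: bool) -> ServingStatus {
    if ok { ServingStatus::Serving } else { ServingStatus::NotServing }
}

/// Resolves a Check request's optional `service` field per the table.
pub fn check(flags: &HealthFlags, service: Option<&str>) -> ServingStatus {
    match service.unwrap_or("") {
        // Overall health: assumed to require every service to be healthy
        "" => status(flags.query_ok && flags.blob_ok),
        "neumann.v1.QueryService" => status(flags.query_ok),
        "neumann.v1.BlobService" => status(flags.blob_ok),
        _ => ServingStatus::Unspecified,
    }
}
```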

Automatic Health Tracking

The QueryService tracks consecutive failures and marks itself unhealthy once the count reaches FAILURE_THRESHOLD (5). A single successful query resets the counter and marks the service healthy again:

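use std::sync::atomic::Ordering;

const FAILURE_THRESHOLD: u32 = 5;

impl QueryServiceImpl {
    // Called after a failed query: extend the streak and trip the health flag at the threshold
    fn record_failure(&self) {
        let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1;
        if failures >= FAILURE_THRESHOLD {
            if let Some(ref health) = self.health_state {
                health.set_query_service_healthy(false);
            }
        }
    }

    // Called after a successful query: reset the streak and report healthy again
    fn record_success(&self) {
        self.consecutive_failures.store(0, Ordering::SeqCst);
        if let Some(ref health) = self.health_state {
            health.set_query_service_healthy(true);
        }
    }
}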
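To show the threshold behaviour in isolation, here is a self-contained model that uses the same counter logic. `FailureTracker` and its `healthy` flag are hypothetical stand-ins for `QueryServiceImpl` and `HealthState::set_query_service_healthy`. Four consecutive failures leave the service healthy, the fifth marks it unhealthy, and any success restores it.

```rust
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};

const FAILURE_THRESHOLD: u32 = 5;

/// Self-contained model of the tracker; `healthy` stands in for HealthState.
pub struct FailureTracker {
    consecutive_failures: AtomicU32,
    healthy: AtomicBool,
}

impl FailureTracker {
    pub fn new() -> Self {
        Self {
            consecutive_failures: AtomicU32::new(0),
            healthy: AtomicBool::new(true),
        }
    }

    pub fn record_failure(&self) {
        // fetch_add returns the previous value, so add 1 for the new count
        let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1;
        if failures >= FAILURE_THRESHOLD {
            self.healthy.store(false, Ordering::SeqCst);
        }
    }

    pub fn record_success(&self) {
        self.consecutive_failures.store(0, Ordering::SeqCst);
        self.healthy.store(true, Ordering::SeqCst);
    }

    pub fn is_healthy(&self) -> bool {
        self.healthy.load(Ordering::SeqCst)
    }
}
```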

Server Lifecycle

Startup Sequence

flowchart TD
    A[Create NeumannServer] --> B[Validate configuration]
    B --> C{TLS configured?}
    C -->|Yes| D[Load certificates]
    C -->|No| E[Plain TCP]
    D --> F[Build TLS config]
    F --> G[Create services]
    E --> G
    G --> H{gRPC-web enabled?}
    H -->|Yes| I[Add gRPC-web layer]
    H -->|No| J[Standard gRPC]
    I --> K{Reflection enabled?}
    J --> K
    K -->|Yes| L[Add reflection service]
    K -->|No| M[Start serving]
    L --> M
    M --> N[Accept connections]

Basic Server Setup

use neumann_server::{NeumannServer, ServerConfig};
use query_router::QueryRouter;
use std::sync::Arc;
use parking_lot::RwLock;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let router = Arc::new(RwLock::new(QueryRouter::new()));
    let server = NeumannServer::new(router, ServerConfig::default());
    server.serve().await?;
    Ok(())
}

Server with Shared Storage

use neumann_server::{NeumannServer, ServerConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = ServerConfig::default();
    // Creates QueryRouter and BlobStore sharing the same TensorStore
    let server = NeumannServer::with_shared_storage(config).await?;
    server.serve().await?;
    Ok(())
}

Graceful Shutdown

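use neumann_server::{NeumannServer, ServerConfig};
use tokio::signal;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let server = NeumannServer::with_shared_storage(ServerConfig::default()).await?;
    // Completes on Ctrl+C (or at once if the signal handler fails to install)
    let shutdown = async {
        let _ = signal::ctrl_c().await;
    };
    server.serve_with_shutdown(shutdown).await?;
    Ok(())
}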

Error Types

| Error | Cause | gRPC Status |
|---|---|---|
| Config | Invalid configuration | INVALID_ARGUMENT |
| Transport | Network/TLS failure | UNAVAILABLE |
| Query | Query execution failed | INVALID_ARGUMENT |
| Auth | Authentication failed | UNAUTHENTICATED |
| Blob | Blob operation failed | INTERNAL |
| Internal | Unexpected server error | INTERNAL |
| InvalidArgument | Bad request data | INVALID_ARGUMENT |
| NotFound | Resource not found | NOT_FOUND |
| PermissionDenied | Access denied | PERMISSION_DENIED |
| Io | I/O error | INTERNAL |
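The table defines a total mapping from error kind to status code. The sketch below uses simplified, payload-free mirrors with hypothetical names (`ServerErrorKind`, `Code`; these are not tonic's types). The numeric values are the codes from the gRPC status code specification. In a tonic service, the code would typically be wrapped into a response with `tonic::Status::new(code, message)`.

```rust
/// gRPC status codes used in the table, with their spec-defined numbers.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Code {
    InvalidArgument = 3,
    NotFound = 5,
    PermissionDenied = 7,
    Internal = 13,
    Unavailable = 14,
    Unauthenticated = 16,
}

/// Payload-free mirror of ServerError's variants.
#[derive(Debug, Clone, Copy)]
pub enum ServerErrorKind {
    Config, Transport, Query, Auth, Blob,
    Internal, InvalidArgument, NotFound, PermissionDenied, Io,
}

/// Maps each error kind to the status listed in the table.
pub fn to_code(e: ServerErrorKind) -> Code {
    match e {
        ServerErrorKind::Config
        | ServerErrorKind::Query
        | ServerErrorKind::InvalidArgument => Code::InvalidArgument,
        ServerErrorKind::Transport => Code::Unavailable,
        ServerErrorKind::Auth => Code::Unauthenticated,
        ServerErrorKind::NotFound => Code::NotFound,
        ServerErrorKind::PermissionDenied => Code::PermissionDenied,
        ServerErrorKind::Blob | ServerErrorKind::Internal | ServerErrorKind::Io => Code::Internal,
    }
}
```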

Backpressure and Flow Control

Streaming Backpressure

The server uses bounded channels for streaming responses to prevent memory exhaustion:

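use tokio::sync::mpsc;

// Capacity comes from stream_channel_capacity (default: 32 items)
let (tx, rx) = mpsc::channel(self.stream_channel_capacity);

tokio::spawn(async move {
    for item in results {
        // send().await suspends this task (without blocking a thread) while
        // the channel is full, so production is paced by the client's reads
        if tx.send(Ok(item)).await.is_err() {
            // Receiver dropped (client disconnected): stop sending
            return;
        }
    }
});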
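The backpressure effect can be shown without tokio by swapping in the standard library's bounded `std::sync::mpsc::sync_channel`. Its `send` blocks the producing thread when the buffer is full, whereas tokio's `send().await` suspends a task. The producer stops as soon as `send` fails because the receiver was dropped, which mirrors the server loop above. `produce` and `run` are hypothetical helpers.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread;

/// Sends items until done or until the receiver disappears.
/// Returns how many items were accepted by the channel.
fn produce(tx: SyncSender<u32>, items: impl IntoIterator<Item = u32>) -> usize {
    let mut sent = 0;
    for item in items {
        // Blocks while `capacity` items are already buffered
        if tx.send(item).is_err() {
            // Receiver dropped: stop producing
            return sent;
        }
        sent += 1;
    }
    sent
}

/// Reads `consume` items from a producer of `total` items, then disconnects.
pub fn run(capacity: usize, total: u32, consume: usize) -> (Vec<u32>, usize) {
    let (tx, rx) = sync_channel(capacity);
    let producer = thread::spawn(move || produce(tx, 0..total));
    let received: Vec<u32> = rx.iter().take(consume).collect();
    drop(rx); // simulate the client going away
    let sent = producer.join().expect("producer panicked");
    (received, sent)
}
```

With a capacity of 4 and a consumer that reads 5 items before disconnecting, at most 5 + 4 = 9 items enter the channel before `send` fails, however many the producer had lined up.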

Upload Size Limits

The BlobService enforces upload size limits:

if data.len().saturating_add(chunk.len()) > max_size {
    return Err(Status::resource_exhausted(format!(
        "upload exceeds maximum size of {max_size} bytes"
    )));
}
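Putting the upload rules together, a server-side receive loop checks that the first message is metadata and applies the size check to every chunk. This is a sketch over a simplified `UploadRequest` mirror. The `UploadError` variants are hypothetical, and rejecting a second metadata message is an assumption that the document does not state. In the real service, these errors presumably map to gRPC statuses such as INVALID_ARGUMENT and RESOURCE_EXHAUSTED.

```rust
/// Simplified mirror of the BlobUploadRequest oneof.
pub enum UploadRequest {
    Metadata { filename: String },
    Chunk(Vec<u8>),
}

#[derive(Debug, PartialEq)]
pub enum UploadError {
    MissingMetadata,
    DuplicateMetadata,
    TooLarge { max_size: usize },
}

/// Consumes an upload stream: metadata first, then chunks, enforcing `max_size`.
pub fn receive_upload<I>(stream: I, max_size: usize) -> Result<(String, Vec<u8>), UploadError>
where
    I: IntoIterator<Item = UploadRequest>,
{
    let mut iter = stream.into_iter();
    let filename = match iter.next() {
        Some(UploadRequest::Metadata { filename }) => filename,
        _ => return Err(UploadError::MissingMetadata),
    };
    let mut data: Vec<u8> = Vec::new();
    for msg in iter {
        match msg {
            UploadRequest::Chunk(chunk) => {
                // Same overflow-safe check as the server snippet above
                if data.len().saturating_add(chunk.len()) > max_size {
                    return Err(UploadError::TooLarge { max_size });
                }
                data.extend_from_slice(&chunk);
            }
            UploadRequest::Metadata { .. } => return Err(UploadError::DuplicateMetadata),
        }
    }
    Ok((filename, data))
}
```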

Production Deployment

Recommended Configuration

use neumann_server::{ApiKey, AuthConfig, ServerConfig, TlsConfig};
use std::path::PathBuf;

fn production_config() -> Result<ServerConfig, Box<dyn std::error::Error>> {
    let config = ServerConfig::new()
        .with_bind_addr("0.0.0.0:9443".parse()?)
        .with_tls(TlsConfig::new(
            PathBuf::from("/etc/neumann/tls/server.crt"),
            PathBuf::from("/etc/neumann/tls/server.key"),
        ))
        .with_auth(
            AuthConfig::new()
                .with_api_key(ApiKey::new(
                    // Read the key from the environment instead of hard-coding it
                    std::env::var("NEUMANN_API_KEY")?,
                    "service:default".to_string(),
                ))
                .with_anonymous(false),
        )
        .with_max_message_size(64 * 1024 * 1024)
        .with_max_upload_size(1024 * 1024 * 1024) // 1 GiB
        .with_stream_channel_capacity(64)
        .with_grpc_web(true)
        .with_reflection(false); // Disable reflection in production
    Ok(config)
}

Health Check Integration

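Point load-balancer health probes at the Health service. The commands below assume the default plaintext listener on port 9200 with reflection enabled. If reflection is disabled, as in the production configuration above, grpcurl needs the .proto files (-import-path and -proto). A TLS listener must be queried without -plaintext: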

# grpcurl health check
grpcurl -plaintext localhost:9200 neumann.v1.Health/Check

# With service name
grpcurl -plaintext -d '{"service":"neumann.v1.QueryService"}' \
    localhost:9200 neumann.v1.Health/Check

Logging

The server uses the tracing crate for structured logging:

use tracing_subscriber::FmtSubscriber;

let subscriber = FmtSubscriber::builder()
    .with_max_level(tracing::Level::INFO)
    .finish();
tracing::subscriber::set_global_default(subscriber)?;

// Server logs connection info and errors
// INFO: Starting Neumann gRPC server with TLS on 0.0.0.0:9443
// ERROR: Query execution error: table 'users' not found

Dependencies

| Crate | Purpose |
|---|---|
| query_router | Query execution backend |
| tensor_blob | Blob storage backend |
| tensor_store | Shared storage for both query and blob |
| tonic | gRPC server framework |
| tonic-web | gRPC-web layer for browser support |
| tonic-reflection | Service reflection for debugging |
| tokio | Async runtime |
| parking_lot | Thread-safe router access |
| tracing | Structured logging |
| thiserror | Error type derivation |