# Zerobus TypeScript SDK Examples

This directory contains examples demonstrating how to use the Zerobus TypeScript SDK to ingest data into Databricks Delta tables.
- Overview
- JSON Examples
- Protocol Buffers Examples
- Arrow Flight Examples (Experimental)
- Prerequisites
- Common Code Patterns
- API Styles
- Single-Record vs Batch Ingestion
- Choosing a Format
## Overview

The SDK supports three serialization formats and two ingestion methods:
Serialization Formats:
- JSON - Simpler, no schema generation required. Great for getting started.
- Protocol Buffers - Type-safe with compile-time validation. Better for production.
- Arrow Flight - High-performance columnar format for analytics. (Experimental/Unsupported)
Ingestion Methods:
- Single-record (`ingestRecordOffset`) - Ingest records one at a time
- Batch (`ingestRecordsOffset`) - Ingest multiple records at once with all-or-nothing semantics
Available Examples:
| Example | Format | Method | Script |
|---|---|---|---|
| JSON Single | JSON | Single-record | `npm run example:json:single` |
| JSON Batch | JSON | Batch | `npm run example:json:batch` |
| Proto Single | Protocol Buffers | Single-record | `npm run example:proto:single` |
| Proto Batch | Protocol Buffers | Batch | `npm run example:proto:batch` |
| Arrow Single | Arrow Flight | Single batch | `npm run example:arrow:single` |
| Arrow Batch | Arrow Flight | Multi-row batch | `npm run example:arrow:batch` |
## Prerequisites

Create a table in your Databricks workspace:
```sql
CREATE TABLE catalog.schema.air_quality (
  device_name STRING,
  temp INT,
  humidity BIGINT
);
```

Then set up a service principal:

- In your Databricks workspace, go to Settings > Identity and Access
- Create a service principal or use an existing one
- Generate OAuth credentials (client ID and secret)
- Grant the service principal these permissions on your table:
  - `SELECT` - Read table schema
  - `MODIFY` - Write data to the table
  - `USE CATALOG` and `USE SCHEMA` - Access catalog and schema
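These grants can be issued in SQL; a sketch assuming Unity Catalog and the example table above (substitute your service principal's application ID for `<application-id>`):

```sql
-- Catalog and schema access
GRANT USE CATALOG ON CATALOG catalog TO `<application-id>`;
GRANT USE SCHEMA ON SCHEMA catalog.schema TO `<application-id>`;

-- Read the table schema and write data
GRANT SELECT, MODIFY ON TABLE catalog.schema.air_quality TO `<application-id>`;
```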
Set the following environment variables:

```bash
export ZEROBUS_SERVER_ENDPOINT="workspace-id.zerobus.region.cloud.databricks.com"
export DATABRICKS_WORKSPACE_URL="https://your-workspace.cloud.databricks.com"
export ZEROBUS_TABLE_NAME="catalog.schema.air_quality"
export DATABRICKS_CLIENT_ID="your-client-id"
export DATABRICKS_CLIENT_SECRET="your-client-secret"
```

## Common Code Patterns

All examples follow the same general flow:
1. Create the SDK client:

   ```typescript
   const sdk = new ZerobusSdk(SERVER_ENDPOINT, DATABRICKS_WORKSPACE_URL);
   ```

2. Define the table properties.

   JSON:

   ```typescript
   const tableProperties: TableProperties = {
     tableName: TABLE_NAME
     // No descriptor needed for JSON
   };
   ```

   Protocol Buffers:

   ```typescript
   const descriptorBase64 = loadDescriptorProto({ ... });
   const tableProperties: TableProperties = {
     tableName: TABLE_NAME,
     descriptorProto: descriptorBase64
   };
   ```

3. Configure the stream:

   ```typescript
   const options: StreamConfigurationOptions = {
     recordType: RecordType.Json, // or RecordType.Proto
     maxInflightRequests: 100,
     recovery: true
   };
   ```

4. Open the stream:

   ```typescript
   const stream = await sdk.createStream(
     tableProperties,
     CLIENT_ID,
     CLIENT_SECRET,
     options
   );
   ```

5. Ingest data and wait for acknowledgment:

   ```typescript
   const offset = await stream.ingestRecordOffset(data);
   await stream.waitForOffset(offset);
   ```

6. Close the stream:

   ```typescript
   await stream.close();
   ```

## API Styles

The SDK provides two API styles for ingestion:
| Style | Method | Returns | Promise resolves |
|---|---|---|---|
| Offset-based (Recommended) | `ingestRecordOffset()` | `Promise<bigint>` | Immediately after queuing (before server ack) |
| Future-based (Deprecated) | `ingestRecord()` | `Promise<bigint>` | After server acknowledgment |
Both methods return `Promise<bigint>`, but the key difference is when the promise resolves.
Offset-based (Recommended):

```typescript
// Promise resolves immediately with the offset (doesn't wait for server ack)
const offset = await stream.ingestRecordOffset(data);

// Do other work, then wait for acknowledgment when needed
await stream.waitForOffset(offset);
```

Future-based (Deprecated):

```typescript
// Promise resolves only after the server acknowledges - slower for
// high-throughput workloads
const offset = await stream.ingestRecord(data);
```

## Single-Record vs Batch Ingestion

| Aspect | Single-Record | Batch |
|---|---|---|
| Method | `ingestRecordOffset()` | `ingestRecordsOffset()` |
| Use case | Records arrive one at a time | Multiple records ready at once |
| Semantics | Each record independent | All-or-nothing (atomic) |
| Acknowledgment | Per record | Per batch |
| Throughput | Lower | Higher |
Single-record:
```typescript
for (const record of records) {
  const offset = await stream.ingestRecordOffset(record);
}
await stream.flush();
```

Batch:
```typescript
const offset = await stream.ingestRecordsOffset(records);
if (offset !== null) {
  await stream.waitForOffset(offset);
}
```

## Choosing a Format

| Feature | JSON | Protocol Buffers | Arrow Flight |
|---|---|---|---|
| Setup | Simple - no schema files | Schema files required | Schema in code |
| Type Safety | Runtime validation | Compile-time validation | Runtime validation |
| Performance | Text-based | Efficient binary encoding | High-performance columnar |
| Flexibility | Easy to modify | Schema changes require regeneration | Dynamic schema |
| Best For | Prototyping, simple use cases | Production, high-throughput | Analytics, data science |
| Status | Stable | Stable | Experimental/Unsupported |
Recommendation: Start with JSON for quick prototyping, then migrate to Protocol Buffers for production row-oriented workloads. Use Arrow Flight (when stable) for analytics and columnar workloads.