Skip to content

Latest commit

 

History

History
334 lines (257 loc) · 8.97 KB

File metadata and controls

334 lines (257 loc) · 8.97 KB

Ballista Examples

This directory contains examples for executing distributed queries with Ballista.

Standalone Examples

The standalone example is the easiest to get started with. Ballista supports a standalone mode where a scheduler and executor are started in-process.

cargo run --example standalone_sql --features="ballista/standalone"

Source code for standalone SQL example

use ballista::{
    extension::SessionConfigExt,
    prelude::*
};
use datafusion::{
    execution::{options::ParquetReadOptions, SessionStateBuilder},
    prelude::{SessionConfig, SessionContext},
};

#[tokio::main]
async fn main() -> Result<()> {
    let config = SessionConfig::new_with_ballista()
        .with_target_partitions(1)
        .with_ballista_standalone_parallelism(2);

    let state = SessionStateBuilder::new()
        .with_config(config)
        .with_default_features()
        .build();

    let ctx = SessionContext::standalone_with_state(state).await?;

    let test_data = test_util::examples_test_data();

    // register parquet file with the execution context
    ctx.register_parquet(
        "test",
        &format!("{test_data}/alltypes_plain.parquet"),
        ParquetReadOptions::default(),
    )
    .await?;

    let df = ctx.sql("select count(1) from test").await?;

    df.show().await?;
    Ok(())
}
cargo run --example standalone-substrait --features="ballista/standalone","substrait"

Source code for standalone Substrait example

use std::sync::Arc;

use ballista::datafusion::common::Result;
use ballista::extension::{Extension, SubstraitExec};
use ballista_core::extension::SessionConfigExt;
use ballista_examples::test_util;
use datafusion::catalog::MemoryCatalogProviderList;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_substrait::serializer::serialize_bytes;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    let catalog_list = Arc::new(MemoryCatalogProviderList::new());

    let config = SessionConfig::new_with_ballista();
    let state = SessionStateBuilder::new()
        .with_config(config)
        .with_default_features()
        .with_catalog_list(catalog_list.clone())
        .build();
    let ctx = SessionContext::new_with_state(state.clone());

    // Use any frontend to serialize a Substrait plan
    let test_data = test_util::examples_test_data();
    let ddl_plan_bytes = serialize_bytes(&format!(
        "CREATE EXTERNAL TABLE IF NOT EXISTS another_data \
         STORED AS PARQUET \
         LOCATION '{}/alltypes_plain.parquet'",
        test_data
    ), &ctx).await?;
    let select_plan_bytes = serialize_bytes(
        "SELECT id, string_col FROM another_data",
        &ctx)
        .await?;

    let session_id = ctx.session_id();
    let scheduler_url = Extension::setup_standalone(Some(&state)).await?;
    let client = SubstraitSchedulerClient::new(scheduler_url, session_id.to_string()).await?;

    client.execute_query(ddl_plan_bytes).await?;
    let mut stream = client.execute_query(select_plan_bytes).await?;

    let mut batch_count = 0;
    let mut total_rows = 0;
    while let Some(batch_result) = stream.next().await {
        let batch = batch_result?;
        batch_count += 1;
        total_rows += batch.num_rows();

        println!("Batch {}: {} rows", batch_count, batch.num_rows());
        println!("{:?}", batch);
    }
    println!("---------");
    println!("Query executed successfully!");
    println!("Total batches: {}, Total rows: {}", batch_count, total_rows);

    Ok(())
}

Distributed Examples

For background information on the Ballista architecture, refer to the Ballista README.

Start a standalone cluster

From the root of the project, build release binaries.

cargo build --release

Start a Ballista scheduler process in a new terminal session.

RUST_LOG=info ./target/release/ballista-scheduler

Start one or more Ballista executor processes in new terminal sessions. When starting more than one executor, a unique port number must be specified for each executor.

RUST_LOG=info ./target/release/ballista-executor -c 2 -p 50051
RUST_LOG=info ./target/release/ballista-executor -c 2 -p 50052

Running the examples

The examples can be run using the cargo run --bin syntax.

Distributed SQL Example

cargo run --release --example remote-sql

Source code for distributed SQL example

use ballista::{extension::SessionConfigExt, prelude::*};
use datafusion::{
    execution::SessionStateBuilder,
    prelude::{CsvReadOptions, SessionConfig, SessionContext},
};

/// This example demonstrates executing a simple query against an Arrow data source (CSV) and
/// fetching results, using SQL
#[tokio::main]
async fn main() -> Result<()> {
    let config = SessionConfig::new_with_ballista()
        .with_target_partitions(4)
        .with_ballista_job_name("Remote SQL Example");

    let state = SessionStateBuilder::new()
        .with_config(config)
        .with_default_features()
        .build();

    let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;

    let test_data = test_util::examples_test_data();

    ctx.register_csv(
        "test",
        &format!("{test_data}/aggregate_test_100.csv"),
        CsvReadOptions::new(),
    )
    .await?;

    let df = ctx
        .sql(
            "SELECT c1, MIN(c12), MAX(c12) \
        FROM test \
        WHERE c11 > 0.1 AND c11 < 0.9 \
        GROUP BY c1",
        )
        .await?;

    df.show().await?;

    Ok(())
}

Distributed DataFrame Example

cargo run --release --example remote-dataframe

Source code for distributed DataFrame example

use ballista::{extension::SessionConfigExt, prelude::*};
use datafusion::{
    execution::SessionStateBuilder,
    prelude::{col, lit, ParquetReadOptions, SessionConfig, SessionContext},
};

/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
/// fetching results, using the DataFrame trait
#[tokio::main]
async fn main() -> Result<()> {
    let config = SessionConfig::new_with_ballista().with_target_partitions(4);

    let state = SessionStateBuilder::new()
        .with_config(config)
        .with_default_features()
        .build();

    let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;

    let test_data = test_util::examples_test_data();
    let filename = format!("{test_data}/alltypes_plain.parquet");

    let df = ctx
        .read_parquet(filename, ParquetReadOptions::default())
        .await?
        .select_columns(&["id", "bool_col", "timestamp_col"])?
        .filter(col("id").gt(lit(1)))?;

    df.show().await?;

    Ok(())
}

Distributed datafusion-spark example

The release binaries must be built with the spark-compat feature enabled for this example.

cargo build --release --features spark-compat
cargo run --release --example remote-spark-functions --features="spark-compat"

Source code for distributed spark functions example

use ballista::datafusion::{
    common::Result,
    execution::SessionStateBuilder,
    prelude::{CsvReadOptions, SessionConfig, SessionContext},
};
use ballista::prelude::*;
use ballista_examples::test_util;

#[tokio::main]
async fn main() -> Result<()> {
    let config = SessionConfig::new_with_ballista()
        .with_target_partitions(4)
        .with_ballista_job_name("Remote Datafusion Spark Example");

    let state = SessionStateBuilder::new()
        .with_config(config)
        .with_default_features()
        .build();

    let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;

    let test_data = test_util::examples_test_data();

    ctx.register_csv(
        "test",
        &format!("{test_data}/aggregate_test_100.csv"),
        CsvReadOptions::new(),
    )
    .await?;

    let df = ctx
        .sql(
            "SELECT 
            sha1(c13) AS hash, 
            upper(c13) AS uppercase,
            length(c13) AS length,
            expm1(0.001) AS precise_value,
            exp(0.001) - 1 AS standard_value
        FROM test",
        )
        .await?;

    df.show().await?;

    Ok(())
}