Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ object SparkConnectorOptions {

val PRIMARY_KEY: ConfigOption[String] =
ConfigBuilder
.key("primary.key")
.key("primaryKey")
.stringType()
.noDefaultValue()
.withDescription("The primary keys of a Fluss table.")

val BUCKET_KEY: ConfigOption[String] =
ConfigBuilder
.key("bucket.key")
.key("bucketKey")
.stringType()
.noDefaultValue()
.withDescription(
Expand All @@ -44,7 +44,7 @@ object SparkConnectorOptions {

val BUCKET_NUMBER: ConfigOption[Integer] =
ConfigBuilder
.key("bucket.num")
.key("bucketNum")
.intType()
.noDefaultValue()
.withDescription("The number of buckets of a Fluss table.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ object SparkFlussConf {
val SPARK_FLUSS_CONF_PREFIX = "spark.sql.fluss."

val READ_OPTIMIZED_OPTION: ConfigOption[java.lang.Boolean] =
key("read.optimized")
key("readOptimized")
.booleanType()
.defaultValue(false)
.withDescription(
Expand All @@ -39,21 +39,21 @@ object SparkFlussConf {

val SCAN_START_UP_MODE: ConfigOption[String] =
ConfigBuilder
.key("scan.startup.mode")
.key("scanStartupMode")
.stringType()
.defaultValue(StartUpMode.FULL.toString)
.withDescription("The start up mode when read Fluss table.")

val SCAN_POLL_TIMEOUT: ConfigOption[Duration] =
ConfigBuilder
.key("scan.poll.timeout")
.key("scanPollTimeout")
.durationType()
.defaultValue(Duration.ofMillis(10000L))
.withDescription("The timeout for log scanner to poll records.")

val SCAN_MAX_RECORDS_PER_PARTITION: ConfigOption[java.lang.Long] =
ConfigBuilder
.key("scan.maxRecordsPerPartition")
.key("scanMaxRecordsPerPartition")
.longType()
.noDefaultValue()
.withDescription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class SparkCatalogTest extends FlussSparkTestBase {
sql(s"""
|CREATE TABLE $DEFAULT_DATABASE.test_tbl (id int, name string, pt string)
|PARTITIONED BY (pt)
|TBLPROPERTIES("primary.key" = "id,pt")
|TBLPROPERTIES("primaryKey" = "id,pt")
|""".stripMargin)

val tbl1 = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "test_tbl")).get()
Expand All @@ -142,7 +142,7 @@ class SparkCatalogTest extends FlussSparkTestBase {
s"""
|CREATE TABLE $DEFAULT_DATABASE.test_tbl2 (pk1 int, pk2 long, name string, pt1 string, pt2 string)
|PARTITIONED BY (pt1, pt2)
|TBLPROPERTIES("primary.key" = "pk1,pk2,pt1,pt2", "bucket.num" = 3, "bucket.key" = "pk1")
|TBLPROPERTIES("primaryKey" = "pk1,pk2,pt1,pt2", "bucketNum" = 3, "bucketKey" = "pk1")
|""".stripMargin)

val tbl2 = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "test_tbl2")).get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class SparkPrimaryKeyTableReadTest extends FlussSparkTestBase {
val tablePath = createTablePath("t")
sql(s"""
|CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, amount INT, address STRING)
|TBLPROPERTIES("primary.key" = "orderId", "bucket.num" = 1)
|TBLPROPERTIES("primaryKey" = "orderId", "bucketNum" = 1)
|""".stripMargin)

sql(s"""
Expand Down Expand Up @@ -151,7 +151,7 @@ class SparkPrimaryKeyTableReadTest extends FlussSparkTestBase {
sql(s"""
|CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, amount INT, address STRING, dt STRING)
|PARTITIONED BY (dt)
|TBLPROPERTIES("primary.key" = "orderId,dt", "bucket.num" = 1)
|TBLPROPERTIES("primaryKey" = "orderId,dt", "bucketNum" = 1)
|""".stripMargin)

sql(s"""
Expand Down Expand Up @@ -248,7 +248,7 @@ class SparkPrimaryKeyTableReadTest extends FlussSparkTestBase {
test("Spark Read: primary key table with random project") {
withTable("t") {
sql(
"CREATE TABLE t (id int, name string, pk int, pk2 string) TBLPROPERTIES('primary.key'='pk,pk2')")
"CREATE TABLE t (id int, name string, pk int, pk2 string) TBLPROPERTIES('primaryKey'='pk,pk2')")
checkAnswer(sql("SELECT * FROM t"), Nil)
sql("INSERT INTO t VALUES (1, 'a', 10, 'x'), (2, 'b', 20, 'y')")
checkAnswer(
Expand All @@ -269,7 +269,7 @@ class SparkPrimaryKeyTableReadTest extends FlussSparkTestBase {
|arr ARRAY<INT>,
|struct_col STRUCT<col1: INT, col2: STRING>,
|ts_ltz TIMESTAMP_LTZ
|) TBLPROPERTIES("primary.key" = "pk", "bucket.num" = 1)
|) TBLPROPERTIES("primaryKey" = "pk", "bucketNum" = 1)
|""".stripMargin)

sql(s"""
Expand Down Expand Up @@ -344,7 +344,7 @@ class SparkPrimaryKeyTableReadTest extends FlussSparkTestBase {
val tablePath = createTablePath("t")
sql(s"""
|CREATE TABLE $DEFAULT_DATABASE.t (id INT, name STRING)
|TBLPROPERTIES("primary.key" = "id", "bucket.num" = 1)
|TBLPROPERTIES("primaryKey" = "id", "bucketNum" = 1)
|""".stripMargin)

sql(s"""
Expand Down Expand Up @@ -491,7 +491,7 @@ class SparkPrimaryKeyTableReadTest extends FlussSparkTestBase {
| orderId BIGINT, itemId BIGINT, amount INT, address STRING, dt STRING
|)
|PARTITIONED BY (dt)
|TBLPROPERTIES("primary.key" = "orderId,dt", "bucket.num" = 1)""".stripMargin)
|TBLPROPERTIES("primaryKey" = "orderId,dt", "bucketNum" = 1)""".stripMargin)
sql(s"""
|INSERT INTO $DEFAULT_DATABASE.t VALUES
|(600L, 21L, 601, "addr1", "2026-01-01"), (700L, 22L, 602, "addr2", "2026-01-01"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest {
withTable("t") {
val tablePath = createTablePath("t")
spark.sql(s"""
|CREATE TABLE t (id bigint, data string) TBLPROPERTIES("primary.key" = "id")
|CREATE TABLE t (id bigint, data string) TBLPROPERTIES("primaryKey" = "id")
|""".stripMargin)
val table = loadFlussTable(tablePath)
assert(table.getTableInfo.hasBucketKey)
Expand Down Expand Up @@ -157,7 +157,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest {
val schema = StructType(Seq(StructField("id", IntegerType), StructField("data", StringType)))

// Test with ProcessAllAvailable
testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))(
testStream(spark.readStream.options(Map("scanStartupMode" -> "latest")).table(tableName))(
ProcessAllAvailable(),
CheckLastBatch(),
StopStream,
Expand All @@ -169,7 +169,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest {

// Test with timed trigger
val clock = new StreamManualClock
testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))(
testStream(spark.readStream.options(Map("scanStartupMode" -> "latest")).table(tableName))(
StartStream(trigger = Trigger.ProcessingTime(500), clock),
AdvanceManualClock(500),
CheckNewAnswer(),
Expand Down Expand Up @@ -198,7 +198,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest {
StructField("pt", StringType)))

// Test with ProcessAllAvailable
testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))(
testStream(spark.readStream.options(Map("scanStartupMode" -> "latest")).table(tableName))(
ProcessAllAvailable(),
CheckLastBatch(),
StopStream,
Expand All @@ -210,7 +210,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest {

// Test with timed trigger
val clock = new StreamManualClock
testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))(
testStream(spark.readStream.options(Map("scanStartupMode" -> "latest")).table(tableName))(
StartStream(trigger = Trigger.ProcessingTime(500), clock),
AdvanceManualClock(500),
CheckNewAnswer(),
Expand All @@ -234,7 +234,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest {
val tableName = "t"
withTable(tableName) {
sql(
"CREATE TABLE t (pk1 int, pk2 string, id int, data string) TBLPROPERTIES('primary.key' = 'pk1, pk2', 'bucket.num' = 1)")
"CREATE TABLE t (pk1 int, pk2 string, id int, data string) TBLPROPERTIES('primaryKey' = 'pk1, pk2', 'bucketNum' = 1)")
sql("INSERT INTO t VALUES (1, 'a', 11, 'aa'), (2, 'b', 22, 'bb'), (3, 'c', 33, 'cc')")

val schema = StructType(
Expand All @@ -245,7 +245,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest {
StructField("data", StringType)))

// Test with ProcessAllAvailable
testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))(
testStream(spark.readStream.options(Map("scanStartupMode" -> "latest")).table(tableName))(
ProcessAllAvailable(),
CheckLastBatch(),
StopStream,
Expand All @@ -257,7 +257,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest {

// Test with timed trigger
val clock = new StreamManualClock
testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))(
testStream(spark.readStream.options(Map("scanStartupMode" -> "latest")).table(tableName))(
StartStream(trigger = Trigger.ProcessingTime(500), clock),
AdvanceManualClock(500),
CheckNewAnswer(),
Expand All @@ -281,7 +281,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest {
val tableName = "t"
withTable(tableName) {
sql(
"CREATE TABLE t (pk1 int, pk2 string, id int, data string, dt string) PARTITIONED BY(dt) TBLPROPERTIES('primary.key' = 'pk1, pk2, dt', 'bucket.num' = 1)")
"CREATE TABLE t (pk1 int, pk2 string, id int, data string, dt string) PARTITIONED BY(dt) TBLPROPERTIES('primaryKey' = 'pk1, pk2, dt', 'bucketNum' = 1)")
sql(
"INSERT INTO t VALUES (1, 'a', 11, 'aa', 'a'), (2, 'b', 22, 'bb', 'b'), (3, 'c', 33, 'cc', 'b')")

Expand All @@ -295,7 +295,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest {
))

// Test with ProcessAllAvailable
testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))(
testStream(spark.readStream.options(Map("scanStartupMode" -> "latest")).table(tableName))(
ProcessAllAvailable(),
CheckLastBatch(),
StopStream,
Expand All @@ -310,7 +310,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest {

// Test with timed trigger
val clock = new StreamManualClock
testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))(
testStream(spark.readStream.options(Map("scanStartupMode" -> "latest")).table(tableName))(
StartStream(trigger = Trigger.ProcessingTime(500), clock),
AdvanceManualClock(500),
CheckNewAnswer(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,14 +671,14 @@ abstract class SparkLakeLogTableReadTest extends SparkLakeTableReadTestBase {
tierToLake("t_earliest")

try {
spark.conf.set("spark.sql.fluss.scan.startup.mode", "earliest")
spark.conf.set("spark.sql.fluss.scanStartupMode", "earliest")

checkAnswer(
sql(s"SELECT * FROM $DEFAULT_DATABASE.t_earliest ORDER BY id"),
Row(1, "alpha") :: Row(2, "beta") :: Row(3, "gamma") :: Nil
)
} finally {
spark.conf.set("spark.sql.fluss.scan.startup.mode", "full")
spark.conf.set("spark.sql.fluss.scanStartupMode", "full")
}
}
}
Expand Down
20 changes: 10 additions & 10 deletions website/docs/engine-spark/ddl.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ DROP DATABASE my_db;

### Log Table

The following SQL statement creates a Log Table by omitting the `primary.key` property.
The following SQL statement creates a Log Table by omitting the `primaryKey` property.

```sql title="Spark SQL"
CREATE TABLE my_log_table (
Expand All @@ -70,7 +70,7 @@ CREATE TABLE my_log_table (

### Primary Key Table

The following SQL statement creates a Primary Key Table by specifying `primary.key` in `TBLPROPERTIES`.
The following SQL statement creates a Primary Key Table by specifying `primaryKey` in `TBLPROPERTIES`.

```sql title="Spark SQL"
CREATE TABLE my_pk_table (
Expand All @@ -79,8 +79,8 @@ CREATE TABLE my_pk_table (
num_orders INT,
total_amount INT
) TBLPROPERTIES (
'primary.key' = 'shop_id,user_id',
'bucket.num' = '4'
'primaryKey' = 'shop_id,user_id',
'bucketNum' = '4'
);
```

Expand All @@ -106,7 +106,7 @@ CREATE TABLE my_part_pk_table (
name STRING,
pt STRING
) PARTITIONED BY (pt) TBLPROPERTIES (
'primary.key' = 'id,pt'
'primaryKey' = 'id,pt'
);
```

Expand Down Expand Up @@ -134,9 +134,9 @@ The following table properties can be specified when creating a table:

| Property | Required | Description |
|--------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| primary.key | optional | The primary keys of the Fluss table. Multiple columns are separated by commas (e.g., `'col1,col2'`). |
| bucket.key | optional | The distribution key of the Fluss table. Data will be distributed to each bucket according to the hash value of the bucket key. Must be a subset of the primary keys (excluding partition keys). If not specified, defaults to the primary key (excluding partition keys) for PK tables, or random distribution for Log tables. |
| bucket.num | optional | The number of buckets of the Fluss table. |
| primaryKey | optional | The primary keys of the Fluss table. Multiple columns are separated by commas (e.g., `'col1,col2'`). |
| bucketKey | optional | The distribution key of the Fluss table. Data will be distributed to each bucket according to the hash value of the bucket key. Must be a subset of the primary keys (excluding partition keys). If not specified, defaults to the primary key (excluding partition keys) for PK tables, or random distribution for Log tables. |
| bucketNum | optional | The number of buckets of the Fluss table. |

You can also pass additional custom properties and Fluss storage options through `TBLPROPERTIES`:

Expand All @@ -145,8 +145,8 @@ CREATE TABLE my_table (
id INT,
name STRING
) TBLPROPERTIES (
'primary.key' = 'id',
'bucket.num' = '4',
'primaryKey' = 'id',
'bucketNum' = '4',
'key1' = 'value1'
);
```
Expand Down
4 changes: 2 additions & 2 deletions website/docs/engine-spark/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ CREATE TABLE pk_table (
num_orders INT,
total_amount INT
) TBLPROPERTIES (
'primary.key' = 'shop_id,user_id',
'bucket.num' = '4'
'primaryKey' = 'shop_id,user_id',
'bucketNum' = '4'
);
```

Expand Down
6 changes: 3 additions & 3 deletions website/docs/engine-spark/options.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ The following Spark configurations can be used to control read behavior for both

| Option | Default | Description |
|--------|---------|-------------|
| `spark.sql.fluss.scan.startup.mode` | `full` | The startup mode when reading a Fluss table. Supported values: <ul><li>`full` (default): For primary key tables, reads the full snapshot and merges with log changes. For log tables, reads from the earliest offset.</li><li>`earliest`: Reads from the earliest log/changelog offset.</li><li>`latest`: Reads from the latest log/changelog offset.</li><li>`timestamp`: Reads from a specified timestamp (requires `scan.startup.timestamp`).</li></ul>**Note:** For Structured Streaming read, only `latest` mode is currently supported. |
| `spark.sql.fluss.read.optimized` | `false` | If `true`, Spark will only read data from the data lake snapshot or KV snapshot, without merging log changes. This can improve read performance but may return stale data for primary key tables. |
| `spark.sql.fluss.scan.poll.timeout` | `10000ms` | The timeout for the log scanner to poll records. |
| `spark.sql.fluss.scanStartupMode` | `full` | The startup mode when reading a Fluss table. Supported values: <ul><li>`full` (default): For primary key tables, reads the full snapshot and merges with log changes. For log tables, reads from the earliest offset.</li><li>`earliest`: Reads from the earliest log/changelog offset.</li><li>`latest`: Reads from the latest log/changelog offset.</li><li>`timestamp`: Reads from a specified timestamp (requires `scanStartupTimestamp`).</li></ul>**Note:** For Structured Streaming read, only `latest` mode is currently supported. |
| `spark.sql.fluss.readOptimized` | `false` | If `true`, Spark will only read data from the data lake snapshot or KV snapshot, without merging log changes. This can improve read performance but may return stale data for primary key tables. |
| `spark.sql.fluss.scanPollTimeout` | `10000ms` | The timeout for the log scanner to poll records. |
10 changes: 5 additions & 5 deletions website/docs/engine-spark/reads.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ CREATE TABLE pk_table (
amount INT,
address STRING
) TBLPROPERTIES (
'primary.key' = 'order_id',
'bucket.num' = '1'
'primaryKey' = 'order_id',
'bucketNum' = '1'
);
```

Expand Down Expand Up @@ -169,8 +169,8 @@ CREATE TABLE part_pk_table (
address STRING,
dt STRING
) PARTITIONED BY (dt) TBLPROPERTIES (
'primary.key' = 'order_id,dt',
'bucket.num' = '1'
'primaryKey' = 'order_id,dt',
'bucketNum' = '1'
);
```

Expand Down Expand Up @@ -301,7 +301,7 @@ For primary key tables, Fluss by default reads the latest snapshot and merges it

```sql title="Spark SQL"
-- Enable read-optimized mode for primary key tables
SET spark.sql.fluss.read.optimized=true;
SET spark.sql.fluss.readOptimized=true;

-- Query returns only snapshot data (may be stale)
SELECT * FROM pk_table;
Expand Down
Loading