diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConnectorOptions.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConnectorOptions.scala index 71b1689850..65f8bce678 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConnectorOptions.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConnectorOptions.scala @@ -23,14 +23,14 @@ object SparkConnectorOptions { val PRIMARY_KEY: ConfigOption[String] = ConfigBuilder - .key("primary.key") + .key("primaryKey") .stringType() .noDefaultValue() .withDescription("The primary keys of a Fluss table.") val BUCKET_KEY: ConfigOption[String] = ConfigBuilder - .key("bucket.key") + .key("bucketKey") .stringType() .noDefaultValue() .withDescription( @@ -44,7 +44,7 @@ object SparkConnectorOptions { val BUCKET_NUMBER: ConfigOption[Integer] = ConfigBuilder - .key("bucket.num") + .key("bucketNum") .intType() .noDefaultValue() .withDescription("The number of buckets of a Fluss table.") diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala index aac6a698da..7b96f58c2a 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala @@ -27,7 +27,7 @@ object SparkFlussConf { val SPARK_FLUSS_CONF_PREFIX = "spark.sql.fluss." val READ_OPTIMIZED_OPTION: ConfigOption[java.lang.Boolean] = - key("read.optimized") + key("readOptimized") .booleanType() .defaultValue(false) .withDescription( @@ -39,21 +39,21 @@ object SparkFlussConf { val SCAN_START_UP_MODE: ConfigOption[String] = ConfigBuilder - .key("scan.startup.mode") + .key("scanStartupMode") .stringType() .defaultValue(StartUpMode.FULL.toString) .withDescription("The start up mode when read Fluss table.") val SCAN_POLL_TIMEOUT: ConfigOption[Duration] = ConfigBuilder - .key("scan.poll.timeout") + .key("scanPollTimeout") .durationType() .defaultValue(Duration.ofMillis(10000L)) .withDescription("The timeout for log scanner to poll records.") val SCAN_MAX_RECORDS_PER_PARTITION: ConfigOption[java.lang.Long] = ConfigBuilder - .key("scan.maxRecordsPerPartition") + .key("scanMaxRecordsPerPartition") .longType() .noDefaultValue() .withDescription( diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala index e30b03baac..0eb633a134 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala @@ -129,7 +129,7 @@ class SparkCatalogTest extends FlussSparkTestBase { sql(s""" |CREATE TABLE $DEFAULT_DATABASE.test_tbl (id int, name string, pt string) |PARTITIONED BY (pt) - |TBLPROPERTIES("primary.key" = "id,pt") + |TBLPROPERTIES("primaryKey" = "id,pt") |""".stripMargin) val tbl1 = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "test_tbl")).get() @@ -142,7 +142,7 @@ class SparkCatalogTest extends FlussSparkTestBase { s""" |CREATE TABLE $DEFAULT_DATABASE.test_tbl2 (pk1 int, pk2 long, name string, pt1 string, pt2 string) |PARTITIONED BY (pt1, pt2) - |TBLPROPERTIES("primary.key" = "pk1,pk2,pt1,pt2", "bucket.num" = 3, "bucket.key" = "pk1") + |TBLPROPERTIES("primaryKey" = "pk1,pk2,pt1,pt2", "bucketNum" = 3, "bucketKey" = "pk1") |""".stripMargin) val tbl2 = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "test_tbl2")).get() diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala index 4e30d552d1..f519aba545 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala @@ -54,7 +54,7 @@ class SparkPrimaryKeyTableReadTest extends FlussSparkTestBase { val tablePath = createTablePath("t") sql(s""" |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, amount INT, address STRING) - |TBLPROPERTIES("primary.key" = "orderId", "bucket.num" = 1) + |TBLPROPERTIES("primaryKey" = "orderId", "bucketNum" = 1) |""".stripMargin) sql(s""" @@ -151,7 +151,7 @@ class SparkPrimaryKeyTableReadTest extends FlussSparkTestBase { sql(s""" |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, amount INT, address STRING, dt STRING) |PARTITIONED BY (dt) - |TBLPROPERTIES("primary.key" = "orderId,dt", "bucket.num" = 1) + |TBLPROPERTIES("primaryKey" = "orderId,dt", "bucketNum" = 1) |""".stripMargin) sql(s""" @@ -248,7 +248,7 @@ class SparkPrimaryKeyTableReadTest extends FlussSparkTestBase { test("Spark Read: primary key table with random project") { withTable("t") { sql( - "CREATE TABLE t (id int, name string, pk int, pk2 string) TBLPROPERTIES('primary.key'='pk,pk2')") + "CREATE TABLE t (id int, name string, pk int, pk2 string) TBLPROPERTIES('primaryKey'='pk,pk2')") checkAnswer(sql("SELECT * FROM t"), Nil) sql("INSERT INTO t VALUES (1, 'a', 10, 'x'), (2, 'b', 20, 'y')") checkAnswer( @@ -269,7 +269,7 @@ class SparkPrimaryKeyTableReadTest extends FlussSparkTestBase { |arr ARRAY, |struct_col STRUCT, |ts_ltz TIMESTAMP_LTZ - |) TBLPROPERTIES("primary.key" = "pk", "bucket.num" = 1) + |) TBLPROPERTIES("primaryKey" = "pk", "bucketNum" = 1) |""".stripMargin) sql(s""" @@ -344,7 +344,7 @@ class SparkPrimaryKeyTableReadTest extends FlussSparkTestBase { val tablePath = createTablePath("t") sql(s""" |CREATE TABLE $DEFAULT_DATABASE.t (id INT, name STRING) - |TBLPROPERTIES("primary.key" = "id", "bucket.num" = 1) + |TBLPROPERTIES("primaryKey" = "id", "bucketNum" = 1) |""".stripMargin) sql(s""" @@ -491,7 +491,7 @@ class SparkPrimaryKeyTableReadTest extends FlussSparkTestBase { | orderId BIGINT, itemId BIGINT, amount INT, address STRING, dt STRING |) |PARTITIONED BY (dt) - |TBLPROPERTIES("primary.key" = "orderId,dt", "bucket.num" = 1)""".stripMargin) + |TBLPROPERTIES("primaryKey" = "orderId,dt", "bucketNum" = 1)""".stripMargin) sql(s""" |INSERT INTO $DEFAULT_DATABASE.t VALUES |(600L, 21L, 601, "addr1", "2026-01-01"), (700L, 22L, 602, "addr2", "2026-01-01"), diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala index 6921952c7f..8a0f5c585e 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala @@ -123,7 +123,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest { withTable("t") { val tablePath = createTablePath("t") spark.sql(s""" - |CREATE TABLE t (id bigint, data string) TBLPROPERTIES("primary.key" = "id") + |CREATE TABLE t (id bigint, data string) TBLPROPERTIES("primaryKey" = "id") |""".stripMargin) val table = loadFlussTable(tablePath) assert(table.getTableInfo.hasBucketKey) @@ -157,7 +157,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest { val schema = StructType(Seq(StructField("id", IntegerType), StructField("data", StringType))) // Test with ProcessAllAvailable - testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))( + testStream(spark.readStream.options(Map("scanStartupMode" -> "latest")).table(tableName))( ProcessAllAvailable(), CheckLastBatch(), StopStream, @@ -169,7 +169,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest { // Test with timed trigger val clock = new StreamManualClock - testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))( + testStream(spark.readStream.options(Map("scanStartupMode" -> "latest")).table(tableName))( StartStream(trigger = Trigger.ProcessingTime(500), clock), AdvanceManualClock(500), CheckNewAnswer(), @@ -198,7 +198,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest { StructField("pt", StringType))) // Test with ProcessAllAvailable - testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))( + testStream(spark.readStream.options(Map("scanStartupMode" -> "latest")).table(tableName))( ProcessAllAvailable(), CheckLastBatch(), StopStream, @@ -210,7 +210,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest { // Test with timed trigger val clock = new StreamManualClock - testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))( + testStream(spark.readStream.options(Map("scanStartupMode" -> "latest")).table(tableName))( StartStream(trigger = Trigger.ProcessingTime(500), clock), AdvanceManualClock(500), CheckNewAnswer(), @@ -234,7 +234,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest { val tableName = "t" withTable(tableName) { sql( - "CREATE TABLE t (pk1 int, pk2 string, id int, data string) TBLPROPERTIES('primary.key' = 'pk1, pk2', 'bucket.num' = 1)") + "CREATE TABLE t (pk1 int, pk2 string, id int, data string) TBLPROPERTIES('primaryKey' = 'pk1, pk2', 'bucketNum' = 1)") sql("INSERT INTO t VALUES (1, 'a', 11, 'aa'), (2, 'b', 22, 'bb'), (3, 'c', 33, 'cc')") val schema = StructType( @@ -245,7 +245,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest { StructField("data", StringType))) // Test with ProcessAllAvailable - testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))( + testStream(spark.readStream.options(Map("scanStartupMode" -> "latest")).table(tableName))( ProcessAllAvailable(), CheckLastBatch(), StopStream, @@ -257,7 +257,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest { // Test with timed trigger val clock = new StreamManualClock - testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))( + testStream(spark.readStream.options(Map("scanStartupMode" -> "latest")).table(tableName))( StartStream(trigger = Trigger.ProcessingTime(500), clock), AdvanceManualClock(500), CheckNewAnswer(), @@ -281,7 +281,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest { val tableName = "t" withTable(tableName) { sql( - "CREATE TABLE t (pk1 int, pk2 string, id int, data string, dt string) PARTITIONED BY(dt) TBLPROPERTIES('primary.key' = 'pk1, pk2, dt', 'bucket.num' = 1)") + "CREATE TABLE t (pk1 int, pk2 string, id int, data string, dt string) PARTITIONED BY(dt) TBLPROPERTIES('primaryKey' = 'pk1, pk2, dt', 'bucketNum' = 1)") sql( "INSERT INTO t VALUES (1, 'a', 11, 'aa', 'a'), (2, 'b', 22, 'bb', 'b'), (3, 'c', 33, 'cc', 'b')") @@ -295,7 +295,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest { )) // Test with ProcessAllAvailable - testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))( + testStream(spark.readStream.options(Map("scanStartupMode" -> "latest")).table(tableName))( ProcessAllAvailable(), CheckLastBatch(), StopStream, @@ -310,7 +310,7 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest { // Test with timed trigger val clock = new StreamManualClock - testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))( + testStream(spark.readStream.options(Map("scanStartupMode" -> "latest")).table(tableName))( StartStream(trigger = Trigger.ProcessingTime(500), clock), AdvanceManualClock(500), CheckNewAnswer(), diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala index 5fd9109684..aae1ea19fe 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala @@ -671,14 +671,14 @@ abstract class SparkLakeLogTableReadTest extends SparkLakeTableReadTestBase { tierToLake("t_earliest") try { - spark.conf.set("spark.sql.fluss.scan.startup.mode", "earliest") + spark.conf.set("spark.sql.fluss.scanStartupMode", "earliest") checkAnswer( sql(s"SELECT * FROM $DEFAULT_DATABASE.t_earliest ORDER BY id"), Row(1, "alpha") :: Row(2, "beta") :: Row(3, "gamma") :: Nil ) } finally { - spark.conf.set("spark.sql.fluss.scan.startup.mode", "full") + spark.conf.set("spark.sql.fluss.scanStartupMode", "full") } } } diff --git a/website/docs/engine-spark/ddl.md b/website/docs/engine-spark/ddl.md index 0eee2d3ed8..d5b0cced37 100644 --- a/website/docs/engine-spark/ddl.md +++ b/website/docs/engine-spark/ddl.md @@ -57,7 +57,7 @@ DROP DATABASE my_db; ### Log Table -The following SQL statement creates a Log Table by not specifying `primary.key` property. +The following SQL statement creates a Log Table by not specifying `primaryKey` property. ```sql title="Spark SQL" CREATE TABLE my_log_table ( @@ -70,7 +70,7 @@ CREATE TABLE my_log_table ( ### Primary Key Table -The following SQL statement creates a Primary Key Table by specifying `primary.key` in `TBLPROPERTIES`. +The following SQL statement creates a Primary Key Table by specifying `primaryKey` in `TBLPROPERTIES`. ```sql title="Spark SQL" CREATE TABLE my_pk_table ( @@ -79,8 +79,8 @@ CREATE TABLE my_pk_table ( num_orders INT, total_amount INT ) TBLPROPERTIES ( - 'primary.key' = 'shop_id,user_id', - 'bucket.num' = '4' + 'primaryKey' = 'shop_id,user_id', + 'bucketNum' = '4' ); ``` @@ -106,7 +106,7 @@ CREATE TABLE my_part_pk_table ( name STRING, pt STRING ) PARTITIONED BY (pt) TBLPROPERTIES ( - 'primary.key' = 'id,pt' + 'primaryKey' = 'id,pt' ); ``` @@ -134,9 +134,9 @@ The following table properties can be specified when creating a table: | Property | Required | Description | |--------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| primary.key | optional | The primary keys of the Fluss table. Multiple columns are separated by commas (e.g., `'col1,col2'`). | -| bucket.key | optional | The distribution key of the Fluss table. Data will be distributed to each bucket according to the hash value of the bucket key. Must be a subset of the primary keys (excluding partition keys). If not specified, defaults to the primary key (excluding partition keys) for PK tables, or random distribution for Log tables. | -| bucket.num | optional | The number of buckets of the Fluss table. | +| primaryKey | optional | The primary keys of the Fluss table. Multiple columns are separated by commas (e.g., `'col1,col2'`). | +| bucketKey | optional | The distribution key of the Fluss table. Data will be distributed to each bucket according to the hash value of the bucket key. Must be a subset of the primary keys (excluding partition keys). If not specified, defaults to the primary key (excluding partition keys) for PK tables, or random distribution for Log tables. | +| bucketNum | optional | The number of buckets of the Fluss table. | You can also pass additional custom properties and Fluss storage options through `TBLPROPERTIES`: @@ -145,8 +145,8 @@ CREATE TABLE my_table ( id INT, name STRING ) TBLPROPERTIES ( - 'primary.key' = 'id', - 'bucket.num' = '4', + 'primaryKey' = 'id', + 'bucketNum' = '4', 'key1' = 'value1' ); ``` diff --git a/website/docs/engine-spark/getting-started.md b/website/docs/engine-spark/getting-started.md index 667227729e..fa75c5107e 100644 --- a/website/docs/engine-spark/getting-started.md +++ b/website/docs/engine-spark/getting-started.md @@ -144,8 +144,8 @@ CREATE TABLE pk_table ( num_orders INT, total_amount INT ) TBLPROPERTIES ( - 'primary.key' = 'shop_id,user_id', - 'bucket.num' = '4' + 'primaryKey' = 'shop_id,user_id', + 'bucketNum' = '4' ); ``` diff --git a/website/docs/engine-spark/options.md b/website/docs/engine-spark/options.md index b1f74cbc5e..491f2cefc2 100644 --- a/website/docs/engine-spark/options.md +++ b/website/docs/engine-spark/options.md @@ -14,6 +14,6 @@ The following Spark configurations can be used to control read behavior for both | Option | Default | Description | |--------|---------|-------------| -| `spark.sql.fluss.scan.startup.mode` | `full` | The startup mode when reading a Fluss table. Supported values: **Note:** For Structured Streaming read, only `latest` mode is currently supported. | -| `spark.sql.fluss.read.optimized` | `false` | If `true`, Spark will only read data from the data lake snapshot or KV snapshot, without merging log changes. This can improve read performance but may return stale data for primary key tables. | -| `spark.sql.fluss.scan.poll.timeout` | `10000ms` | The timeout for the log scanner to poll records. | +| `spark.sql.fluss.scanStartupMode` | `full` | The startup mode when reading a Fluss table. Supported values: **Note:** For Structured Streaming read, only `latest` mode is currently supported. | +| `spark.sql.fluss.readOptimized` | `false` | If `true`, Spark will only read data from the data lake snapshot or KV snapshot, without merging log changes. This can improve read performance but may return stale data for primary key tables. | +| `spark.sql.fluss.scanPollTimeout` | `10000ms` | The timeout for the log scanner to poll records. | diff --git a/website/docs/engine-spark/reads.md b/website/docs/engine-spark/reads.md index c9d745e45d..ab610939e0 100644 --- a/website/docs/engine-spark/reads.md +++ b/website/docs/engine-spark/reads.md @@ -121,8 +121,8 @@ CREATE TABLE pk_table ( amount INT, address STRING ) TBLPROPERTIES ( - 'primary.key' = 'order_id', - 'bucket.num' = '1' + 'primaryKey' = 'order_id', + 'bucketNum' = '1' ); ``` @@ -169,8 +169,8 @@ CREATE TABLE part_pk_table ( address STRING, dt STRING ) PARTITIONED BY (dt) TBLPROPERTIES ( - 'primary.key' = 'order_id,dt', - 'bucket.num' = '1' + 'primaryKey' = 'order_id,dt', + 'bucketNum' = '1' ); ``` @@ -301,7 +301,7 @@ For primary key tables, Fluss by default reads the latest snapshot and merges it ```sql title="Spark SQL" -- Enable read-optimized mode for primary key tables -SET spark.sql.fluss.read.optimized=true; +SET spark.sql.fluss.readOptimized=true; -- Query returns only snapshot data (may be stale) SELECT * FROM pk_table; diff --git a/website/docs/engine-spark/structured-streaming.md b/website/docs/engine-spark/structured-streaming.md index 4fcf2fafd7..e8b40f27df 100644 --- a/website/docs/engine-spark/structured-streaming.md +++ b/website/docs/engine-spark/structured-streaming.md @@ -58,7 +58,7 @@ Fluss supports reading data from Fluss tables using Spark Structured Streaming. ```scala title="Spark Application" val df = spark.readStream - .option("scan.startup.mode", "latest") + .option("scanStartupMode", "latest") .table("fluss_catalog.fluss.log_table") val query = df.writeStream @@ -72,7 +72,7 @@ query.awaitTermination() ```scala title="Spark Application" val df = spark.readStream - .option("scan.startup.mode", "latest") + .option("scanStartupMode", "latest") .table("fluss_catalog.fluss.pk_table") val query = df.writeStream @@ -97,7 +97,7 @@ Fluss Spark streaming source supports the following Spark trigger modes: import org.apache.spark.sql.streaming.Trigger val df = spark.readStream - .option("scan.startup.mode", "latest") + .option("scanStartupMode", "latest") .table("fluss_catalog.fluss.my_table") // Processing time trigger (every 5 seconds) @@ -127,12 +127,12 @@ val spark = SparkSession.builder() spark.sql("CREATE TABLE IF NOT EXISTS source_table (id INT, data STRING)") spark.sql(""" CREATE TABLE IF NOT EXISTS sink_table (id INT, data STRING) - TBLPROPERTIES ('primary.key' = 'id') + TBLPROPERTIES ('primaryKey' = 'id') """) // Read from source table val sourceDF = spark.readStream - .option("scan.startup.mode", "latest") + .option("scanStartupMode", "latest") .table("fluss_catalog.fluss.source_table") // Write to sink table diff --git a/website/docs/engine-spark/writes.md b/website/docs/engine-spark/writes.md index 3036855f95..88dea24be8 100644 --- a/website/docs/engine-spark/writes.md +++ b/website/docs/engine-spark/writes.md @@ -49,8 +49,8 @@ CREATE TABLE pk_table ( amount INT, address STRING ) TBLPROPERTIES ( - 'primary.key' = 'order_id', - 'bucket.num' = '1' + 'primaryKey' = 'order_id', + 'bucketNum' = '1' ); ```