@@ -476,8 +476,19 @@ public static List<CommitEventTable> populateCommitEventTable(Table table, Spark
.cast("long")
.multiply(1000)
.as("commitTimestampMs"),
functions.col("summary").getItem("spark.app.id").as("commitAppId"),
functions.col("summary").getItem("spark.app.name").as("commitAppName"),
functions
.coalesce(
functions.col("summary").getItem("spark.app.id"),
functions.col("summary").getItem("trino_query_id"))
.as("commitAppId"),
functions
.when(
functions.col("summary").getItem("spark.app.id").isNotNull(),
functions.col("summary").getItem("spark.app.name"))
.when(
functions.col("summary").getItem("trino_query_id").isNotNull(),
functions.lit("trino"))
.as("commitAppName"),
functions.upper(functions.col("operation")).as("commitOperation"))
.as("commitMetadata"),
functions.lit(System.currentTimeMillis()).as("eventTimestampMs"))
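
Side note on the new expressions above: coalesce() returns its first non-null argument, and a when() chain with no trailing .otherwise() evaluates to null when no branch matches, so commits carrying neither summary key produce null for both fields. Below is a minimal, self-contained sketch of the two columns in isolation; the wrapper class and method names are illustrative only, not part of this change.

import org.apache.spark.sql.Column;
import org.apache.spark.sql.functions;

class CommitMetadataColumnsSketch {
  // commitAppId: prefer spark.app.id, fall back to trino_query_id.
  static Column commitAppId() {
    return functions.coalesce(
        functions.col("summary").getItem("spark.app.id"),
        functions.col("summary").getItem("trino_query_id"));
  }

  // commitAppName: spark.app.name for Spark-written commits, the literal
  // "trino" for Trino-written commits; null when neither key is present,
  // because the when-chain has no .otherwise() fallback.
  static Column commitAppName() {
    return functions
        .when(
            functions.col("summary").getItem("spark.app.id").isNotNull(),
            functions.col("summary").getItem("spark.app.name"))
        .when(
            functions.col("summary").getItem("trino_query_id").isNotNull(),
            functions.lit("trino"));
  }
}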
@@ -14,6 +14,10 @@
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.junit.jupiter.api.Assertions;
@@ -273,6 +277,250 @@ public void testCommitEventsOrdering() throws Exception {
}
}

// ==================== Commit Metadata (Spark/Trino) Tests ====================
//
// These tests validate the coalesce logic for commitAppId and commitAppName:
// - commitAppId = coalesce(spark.app.id, trino_query_id)
// - commitAppName = when(spark.app.id.isNotNull, spark.app.name)
// .when(trino_query_id.isNotNull, "trino")
//
// We use the Iceberg Table API directly to create commits with controlled
// summary properties, bypassing Spark SQL, which automatically sets spark.app.id.
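//
// Expected outcomes, as exercised by the four tests below:
//
//   spark.app.id | trino_query_id | commitAppId      | commitAppName
//   absent       | absent         | null             | null
//   absent       | present        | trino_query_id   | "trino"
//   present      | absent         | spark.app.id     | spark.app.name
//   present      | present        | spark.app.id     | spark.app.name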

/**
* Creates a DataFile for testing commits with controlled summary properties. The file doesn't
* need to physically exist since we're testing metadata collection.
*/
private DataFile createTestDataFile(Table table) {
PartitionSpec spec = table.spec();
Schema schema = table.schema();

if (spec.isUnpartitioned()) {
return DataFiles.builder(spec)
.withPath("/test/data-" + System.nanoTime() + ".parquet")
.withFileSizeInBytes(100)
.withRecordCount(1)
.build();
} else {
// For partitioned tables, we need to provide partition data.
// Our test tables are partitioned by days(ts), so the partition path looks like "ts_day=19000".
return DataFiles.builder(spec)
.withPath("/test/data-" + System.nanoTime() + ".parquet")
.withFileSizeInBytes(100)
.withRecordCount(1)
.withPartitionPath("ts_day=19000")
.build();
}
}
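
/**
 * Illustrative sketch, not called by the tests: how the controlled summary
 * properties reach the snapshot. AppendFiles extends SnapshotUpdate, whose
 * set(key, value) records a custom entry in the committed snapshot's summary
 * map, the same map populateCommitEventTable later reads through the "summary"
 * column. The "q-123" value is a made-up example.
 */
private void sketchSummaryPropagation(Table table, DataFile dataFile) {
  table.newAppend().appendFile(dataFile).set("trino_query_id", "q-123").commit();
  table.refresh();
  // table.currentSnapshot().summary().get("trino_query_id") now returns "q-123".
}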

@Test
public void testCommitMetadata_BothSparkAndTrinoNull() throws Exception {
final String tableName = "db.test_commit_metadata_both_null";

try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
// Setup: Create table
prepareTable(ops, tableName);

// Create a commit using Iceberg Table API directly (without Spark SQL)
// This bypasses Spark's automatic spark.app.id/spark.app.name injection
Table table = ops.getTable(tableName);
DataFile dataFile = createTestDataFile(table);
table.newAppend().appendFile(dataFile).commit();

// Verify the commit actually succeeded by checking snapshot exists
table.refresh();
Assertions.assertNotNull(table.currentSnapshot(), "Commit should have created a snapshot");
log.info("Commit succeeded with snapshot ID: {}", table.currentSnapshot().snapshotId());

// Refresh Spark's catalog cache to see the new metadata
ops.spark().sql("REFRESH TABLE " + tableName);

// Action: Collect commit events
List<CommitEventTable> commitEvents = ops.collectCommitEventTable(tableName);

// Verify: Both commitAppId and commitAppName should be null
// when neither spark.app.id nor trino_query_id is present in the summary
Assertions.assertFalse(commitEvents.isEmpty(), "Should have at least one commit event");
CommitEventTable event = commitEvents.get(0);

Assertions.assertNull(
event.getCommitMetadata().getCommitAppId(),
"commitAppId should be null when both spark.app.id and trino_query_id are absent");
Assertions.assertNull(
event.getCommitMetadata().getCommitAppName(),
"commitAppName should be null when both spark.app.id and trino_query_id are absent");

log.info(
"Both null scenario validated: commitAppId={}, commitAppName={}",
event.getCommitMetadata().getCommitAppId(),
event.getCommitMetadata().getCommitAppName());
}
}

@Test
public void testCommitMetadata_SparkNullTrinoPresent() throws Exception {
final String tableName = "db.test_commit_metadata_trino_only";
final String trinoQueryId = "20240101_123456_00001_abcde";

try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
// Setup: Create table
prepareTable(ops, tableName);

// Create a commit using Iceberg Table API with only trino_query_id set
// This simulates a Trino commit where spark.app.id is not present
Table table = ops.getTable(tableName);
DataFile dataFile = createTestDataFile(table);
table.newAppend().appendFile(dataFile).set("trino_query_id", trinoQueryId).commit();

// Verify the commit actually succeeded by checking snapshot exists
table.refresh();
Assertions.assertNotNull(table.currentSnapshot(), "Commit should have created a snapshot");
log.info("Commit succeeded with snapshot ID: {}", table.currentSnapshot().snapshotId());

// Refresh Spark's catalog cache to see the new metadata
ops.spark().sql("REFRESH TABLE " + tableName);

// Action: Collect commit events
List<CommitEventTable> commitEvents = ops.collectCommitEventTable(tableName);

// Verify: commitAppId should be trino_query_id, commitAppName should be "trino"
Assertions.assertFalse(commitEvents.isEmpty(), "Should have at least one commit event");
CommitEventTable event = commitEvents.get(0);

Assertions.assertEquals(
trinoQueryId,
event.getCommitMetadata().getCommitAppId(),
"commitAppId should be trino_query_id when spark.app.id is null");
Assertions.assertEquals(
"trino",
event.getCommitMetadata().getCommitAppName(),
"commitAppName should be 'trino' when trino_query_id is present and spark.app.id is null");

log.info(
"Trino-only scenario validated: commitAppId={}, commitAppName={}",
event.getCommitMetadata().getCommitAppId(),
event.getCommitMetadata().getCommitAppName());
}
}

@Test
public void testCommitMetadata_SparkPresentTrinoNull() throws Exception {
final String tableName = "db.test_commit_metadata_spark_only";
final String sparkAppId = "local-1704067200000";
final String sparkAppName = "TestSparkApp";

try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
// Setup: Create table
prepareTable(ops, tableName);

// Create a commit using Iceberg Table API with only spark.app.id and spark.app.name set
// This simulates a Spark commit where trino_query_id is not present
Table table = ops.getTable(tableName);
DataFile dataFile = createTestDataFile(table);
table
.newAppend()
.appendFile(dataFile)
.set("spark.app.id", sparkAppId)
.set("spark.app.name", sparkAppName)
.commit();

// Verify the commit actually succeeded by checking snapshot exists
table.refresh();
Assertions.assertNotNull(table.currentSnapshot(), "Commit should have created a snapshot");
log.info("Commit succeeded with snapshot ID: {}", table.currentSnapshot().snapshotId());

// Refresh Spark's catalog cache to see the new metadata
ops.spark().sql("REFRESH TABLE " + tableName);

// Action: Collect commit events
List<CommitEventTable> commitEvents = ops.collectCommitEventTable(tableName);

// Verify: commitAppId should be spark.app.id, commitAppName should be spark.app.name
Assertions.assertFalse(commitEvents.isEmpty(), "Should have at least one commit event");
CommitEventTable event = commitEvents.get(0);

Assertions.assertEquals(
sparkAppId,
event.getCommitMetadata().getCommitAppId(),
"commitAppId should be spark.app.id when present");
Assertions.assertEquals(
sparkAppName,
event.getCommitMetadata().getCommitAppName(),
"commitAppName should be spark.app.name when spark.app.id is present");

log.info(
"Spark-only scenario validated: commitAppId={}, commitAppName={}",
event.getCommitMetadata().getCommitAppId(),
event.getCommitMetadata().getCommitAppName());
}
}

@Test
public void testCommitMetadata_BothPresentSparkTakesPrecedence() throws Exception {
final String tableName = "db.test_commit_metadata_both_present";
final String sparkAppId = "local-1704067200000";
final String sparkAppName = "TestSparkApp";
final String trinoQueryId = "20240101_123456_00001_abcde";

try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
// Setup: Create table
prepareTable(ops, tableName);

// Create a commit using Iceberg Table API with BOTH spark.app.id and trino_query_id set
// This edge case shouldn't happen in production, but we verify that Spark takes precedence
Table table = ops.getTable(tableName);
DataFile dataFile = createTestDataFile(table);
table
.newAppend()
.appendFile(dataFile)
.set("spark.app.id", sparkAppId)
.set("spark.app.name", sparkAppName)
.set("trino_query_id", trinoQueryId)
.commit();

// Verify the commit actually succeeded by checking snapshot exists
table.refresh();
Assertions.assertNotNull(table.currentSnapshot(), "Commit should have created a snapshot");
log.info("Commit succeeded with snapshot ID: {}", table.currentSnapshot().snapshotId());

// Refresh Spark's catalog cache to see the new metadata
ops.spark().sql("REFRESH TABLE " + tableName);

// Action: Collect commit events
List<CommitEventTable> commitEvents = ops.collectCommitEventTable(tableName);

// Verify: spark.app.id takes precedence due to coalesce() ordering
// commitAppId = coalesce(spark.app.id, trino_query_id) → spark.app.id
// commitAppName = when(spark.app.id.isNotNull, spark.app.name) → spark.app.name
Assertions.assertFalse(commitEvents.isEmpty(), "Should have at least one commit event");
CommitEventTable event = commitEvents.get(0);

Assertions.assertEquals(
sparkAppId,
event.getCommitMetadata().getCommitAppId(),
"commitAppId should be spark.app.id (takes precedence over trino_query_id)");
Assertions.assertEquals(
sparkAppName,
event.getCommitMetadata().getCommitAppName(),
"commitAppName should be spark.app.name when spark.app.id is present");

// Verify it's NOT using the Trino values
Assertions.assertNotEquals(
trinoQueryId,
event.getCommitMetadata().getCommitAppId(),
"commitAppId should NOT be trino_query_id when spark.app.id is also present");
Assertions.assertNotEquals(
"trino",
event.getCommitMetadata().getCommitAppName(),
"commitAppName should NOT be 'trino' when spark.app.id is also present");

log.info(
"Both present scenario validated - Spark takes precedence: commitAppId={}, commitAppName={}",
event.getCommitMetadata().getCommitAppId(),
event.getCommitMetadata().getCommitAppName());
}
}

// ==================== Error Handling Tests ====================

@Test