[Iceberg AddFiles] Support creating namespaces if needed#39041
[Iceberg AddFiles] Support creating namespaces if needed#39041ahmedabu98 wants to merge 1 commit into
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request enhances the Iceberg AddFiles functionality by enabling automatic namespace creation and optimizing table loading through the use of TableCache. These changes ensure more robust table initialization and reduce unnecessary overhead during the write process. Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request refactors table initialization in AddFiles.java to use a thread-safe TableCache and adds support for automatic namespace creation. The review feedback highlights three key issues: first, a potential pipeline failure due to an uncaught AlreadyExistsException when concurrent table creation occurs inside a sibling catch block; second, a data race caused by storing the retrieved table in a shared instance field instead of a local variable; and third, a potential crash if checkStateNotNull receives a null exception message, which can be resolved by falling back to toString().
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| try { | ||
| return catalog.loadTable(tableId); | ||
| } catch (NoSuchTableException e) { // Otherwise, create the table | ||
| org.apache.iceberg.Schema tableSchema = getSchema(filePath, format); | ||
| PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, tableSchema); | ||
| SortOrder sortOrder = SortOrderUtils.toSortOrder(sortFields, tableSchema); | ||
|
|
||
| Catalog.TableBuilder builder = | ||
| catalogConfig | ||
| .catalog() | ||
| .buildTable(tableId, tableSchema) | ||
| .withPartitionSpec(spec) | ||
| .withSortOrder(sortOrder); | ||
| if (tableProps != null) { | ||
| builder.withProperties(tableProps); | ||
| } | ||
| Table table = builder.create(); | ||
| LOG.info( | ||
| "Created Iceberg table '{}' with schema: {}\n" | ||
| + ", partition spec: {}, sort order: {}, table properties: {}", | ||
| tableId, | ||
| tableSchema, | ||
| spec, | ||
| sortOrder, | ||
| tableProps); | ||
| return table; | ||
| } catch (AlreadyExistsException e2) { // if table already exists, just load it | ||
| return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); | ||
| } |
There was a problem hiding this comment.
In Java, an exception thrown inside a catch block is not caught by sibling catch blocks of the same try statement. Here, if catalog.loadTable(tableId) throws NoSuchTableException, we enter the first catch block. If builder.create() then throws AlreadyExistsException (due to concurrent table creation by another worker), it will not be caught by the sibling catch (AlreadyExistsException e2) block, causing the pipeline to fail. We should nest the table creation in its own try-catch block to properly handle this race condition. Additionally, we can use the already resolved catalog variable instead of calling catalogConfig.catalog() again.
try {
return catalog.loadTable(tableId);
} catch (NoSuchTableException e) { // Otherwise, create the table
org.apache.iceberg.Schema tableSchema = getSchema(filePath, format);
PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, tableSchema);
SortOrder sortOrder = SortOrderUtils.toSortOrder(sortFields, tableSchema);
Catalog.TableBuilder builder =
catalog
.buildTable(tableId, tableSchema)
.withPartitionSpec(spec)
.withSortOrder(sortOrder);
if (tableProps != null) {
builder.withProperties(tableProps);
}
try {
Table table = builder.create();
LOG.info(
"Created Iceberg table '{}' with schema: {}\n"
+ ", partition spec: {}, sort order: {}, table properties: {}",
tableId,
tableSchema,
spec,
sortOrder,
tableProps);
return table;
} catch (AlreadyExistsException e2) { // if table already exists, just load it
return catalog.loadTable(tableId);
}
}| try { | ||
| table = | ||
| TableCache.get( | ||
| catalogConfig, | ||
| TableIdentifier.parse(identifier), | ||
| () -> loadOrCreateTable(filePath, format)); |
There was a problem hiding this comment.
Since we are now using TableCache.get (which is thread-safe and cached), we no longer need to store the table in a shared instance field. Storing it in a shared field without synchronization (as done here) introduces a data race when multiple threads execute this concurrently. Declaring table as a local variable completely avoids this issue. Please also remove the table field from the enclosing class if it is no longer used elsewhere.
Table table;
try {
table =
TableCache.get(
catalogConfig,
TableIdentifier.parse(identifier),
() -> loadOrCreateTable(filePath, format));| } catch (Exception e) { | ||
| Throwable cause = e.getCause(); | ||
| return new ProcessResult( | ||
| null, | ||
| Row.withSchema(ERROR_SCHEMA) | ||
| .addValues( | ||
| filePath, | ||
| checkStateNotNull(cause != null ? cause.getMessage() : e.getMessage())) | ||
| .build(), | ||
| timestamp, | ||
| window, | ||
| paneInfo); | ||
| } |
There was a problem hiding this comment.
If cause.getMessage() or e.getMessage() is null (which can happen for certain exceptions like NullPointerException), checkStateNotNull will throw an IllegalStateException. This will crash the pipeline instead of returning a proper ProcessResult with the error. We should fall back to toString() if the message is null to ensure robustness.
| } catch (Exception e) { | |
| Throwable cause = e.getCause(); | |
| return new ProcessResult( | |
| null, | |
| Row.withSchema(ERROR_SCHEMA) | |
| .addValues( | |
| filePath, | |
| checkStateNotNull(cause != null ? cause.getMessage() : e.getMessage())) | |
| .build(), | |
| timestamp, | |
| window, | |
| paneInfo); | |
| } | |
| } catch (Exception e) { | |
| Throwable cause = e.getCause(); | |
| String errorMsg = cause != null ? cause.getMessage() : e.getMessage(); | |
| if (errorMsg == null) { | |
| errorMsg = cause != null ? cause.toString() : e.toString(); | |
| } | |
| return new ProcessResult( | |
| null, | |
| Row.withSchema(ERROR_SCHEMA) | |
| .addValues(filePath, errorMsg) | |
| .build(), | |
| timestamp, | |
| window, | |
| paneInfo); | |
| } |
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
Allows AddFiles to create namespaces if needed, similar to the normal write sink.
Also switches to using the improved TableCache to avoid multiple API calls
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.