Skip to content

NIFI-15754 Add Google Cloud Storage Provider for Iceberg#11052

Open
exceptionfactory wants to merge 1 commit intoapache:mainfrom
exceptionfactory:NIFI-15754
Open

NIFI-15754 Add Google Cloud Storage Provider for Iceberg#11052
exceptionfactory wants to merge 1 commit intoapache:mainfrom
exceptionfactory:NIFI-15754

Conversation

@exceptionfactory
Copy link
Copy Markdown
Contributor

Summary

NIFI-15754 Adds Google Cloud Storage support to Iceberg modules with a GCSIcebergFileIOProvider Controller Service implementation.

The new Controller Service is packaged in a separate nifi-iceberg-gcs module and bundled in nifi-iceberg-gcs-nar, following the pattern of the AWS S3 and Azure Data Lake Storage implementations.

Although the iceberg-gcp module from the Apache Iceberg project includes a GCSFileIO implementation, the class depends on the google-cloud-storage library which has dozens of direct and transitive dependencies.

Instead of using the GCSFileIO class, the Controller Service package includes a direct implementation of the Iceberg FileIO interface named GoogleCloudStorageFileIO. The implementation uses the Java HttpClient for REST operations with the GCS API. Supporting Bearer Token authentication with vended credentials from an Iceberg REST Catalog, the FileIO implementation avoids multiple layers of dependencies. The new implementation reads the same properties defined in the Iceberg GCPProperties to support compatibility with Iceberg Catalogs.

The InputStream and OutputStream implementations handle direct interaction with Google Cloud Storage, including support for resumable uploads.

Tests for the FileIO implementation include interaction with the OkHttp MockWebServer to verify expected HTTP requests and responses.

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000
  • Pull request contains commits signed with a registered key indicating Verified status

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

- Added Iceberg GCS module and NAR
- Added Iceberg FileIO implementation using Java HttpClient for GCS REST operations

try {
final HttpResponse<Void> response = httpClientProvider.send(request, HttpResponse.BodyHandlers.discarding());
return response.statusCode() == HTTP_OK;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it risky here to only check for HTTP_OK? what if you get something like error 500 or error 429 and assume that the file does not exist and you then overwrite the file (I mean OutputFile.create() path, which calls toInputFile().exists() to guard against overwriting an existing file).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point, I'll make an adjustment to throw an exception for status codes other than 200 and 404.

*/
@Override
public void close() {
httpClientProvider.close();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a null check here? (if close() is called before initialize() is re-invoked)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although initialize() is required before use, yes, adding a null check is a reasonable safeguard.

Comment on lines +52 to +54
try {
return httpClient.send(request, bodyHandler);
} catch (final InterruptedException e) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we implement retry with backoff for transient failures (429, 500, 503, ...)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it looks like that would more closely align with the Google Cloud Storage library behavior, I will implement some basic retry and backoff strategy.

static final String DEFAULT_SERVICE_HOST = "https://storage.googleapis.com";
static final int DEFAULT_WRITE_CHUNK_SIZE = 8 * 1024 * 1024;
static final int DEFAULT_READ_CHUNK_SIZE = 2 * 1024 * 1024;
static final int MINIMUM_CHUNK_SIZE = 256 * 1024;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We never seem to be enforcing this compared to what we get with WRITE_CHUNK_SIZE_BYTES. I understand that if the catalog is well configured that should not be an issue but thought I'd flag.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, that was leftover from some earlier iteration. I will remove this value to avoid confusion.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants