feat(dataproc): Add dataproc source and list/get clusters/jobs tools #2407

dborowitz wants to merge 5 commits into googleapis:main
Conversation
Summary of Changes

Hello @dborowitz, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly expands the system's capabilities by integrating with Google Cloud Dataproc. It provides a new data source and a suite of tools that enable users to programmatically interact with Dataproc clusters and jobs, offering functionality such as retrieving specific resource details and listing resources with filtering options. This enhancement mirrors existing functionality for Serverless Spark, providing a consistent experience for managing big data processing environments.
Code Review
This pull request introduces a new Dataproc source and associated tools for interacting with Dataproc clusters and jobs. While the implementation is generally solid, several critical security and stability issues have been identified. Specifically, the URL generation for Cloud Logging is vulnerable to filter injection if resource names contain double quotes, and the Invoke methods in the list tools use unsafe type assertions on user-supplied parameters, which can lead to application panics and Denial of Service (DoS). Additionally, there are areas for improvement regarding resource handling, data correctness in one of the tools, and inconsistencies in documentation and tests. Addressing these points will significantly enhance the robustness, clarity, and security of the new features.
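To make the filter-injection point concrete, here is a minimal, self-contained sketch; `escapeFilterValue` and the exact filter shape are illustrative assumptions for this review, not the PR's actual code:

```go
package main

import (
	"fmt"
	"strings"
)

// escapeFilterValue escapes a value for interpolation into a quoted Cloud
// Logging filter string: backslashes first, then double quotes, so a
// resource name containing `"` cannot terminate the quoted value early.
func escapeFilterValue(s string) string {
	s = strings.ReplaceAll(s, `\`, `\\`)
	return strings.ReplaceAll(s, `"`, `\"`)
}

func main() {
	// An adversarial cluster name that would otherwise inject extra filter terms.
	name := `evil" OR severity>=ERROR OR x="`
	raw := fmt.Sprintf(`resource.labels.cluster_name="%s"`, name)                     // unsafe interpolation
	escaped := fmt.Sprintf(`resource.labels.cluster_name="%s"`, escapeFilterValue(name)) // safe
	fmt.Println(raw)
	fmt.Println(escaped)
}
```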
Integration tests are failing because the Cloud Build SA is missing the Dataproc Viewer/Editor IAM role.
Force-pushed from 182297a to d0c3c41.
I made a mistake basing my original PR on a several-week-old local repo without syncing first. I cribbed from the Serverless implementation but missed this important refactoring; guess I need to go rewrite this now...
Done, as a separate commit. I would have tried harder to change the old commits in order if the tests weren't already passing, but they were; this is a pure refactoring.
Very closely analogous to the serverless-spark source and serverless-spark-list-batches tool; these are separate APIs for two closely related GCP products. There are of course minor differences in the APIs; for example, Dataproc generally supports only regions, not general locations. One wrinkle is that a known issue (KI) with the list jobs RPC causes it to be very slow in a project with many serverless batches (like the test project), unless filtering by cluster. This is mentioned in the param description so LLMs can provide it; in the tests, we always add it based on an env var. Unlike other env vars, the cluster name in the test project is arbitrary but not a secret.
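A sketch of how that env-var-driven cluster filter might look in a test helper; the variable name `DATAPROC_CLUSTER_NAME` and the helper itself are assumptions for illustration, not the PR's actual identifiers:

```go
package dataproc_test

import (
	"os"
	"testing"
)

// listJobsParams builds the list-jobs parameters for integration tests,
// always including the cluster filter to sidestep the slow unfiltered RPC.
func listJobsParams(t *testing.T) map[string]any {
	t.Helper()
	cluster := os.Getenv("DATAPROC_CLUSTER_NAME") // assumed variable name
	if cluster == "" {
		t.Skip("DATAPROC_CLUSTER_NAME not set; unfiltered list jobs is too slow in the test project")
	}
	return map[string]any{"clusterName": cluster}
}
```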
The previous commits adding Dataproc tool implementations were based on an old version of main, before rebasing past 0691a6f, which refactored serverlessspark to use the new pattern of putting business logic into the Source. This commit refactors all Dataproc tools at once, rather than refactoring incrementally and dealing with the rebase of the commits in the pending PR.
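For readers unfamiliar with that pattern, a rough sketch of what "business logic into the Source" means here; the method names and signatures are assumptions, not the repository's actual interface:

```go
package dataproc

import "context"

// compatibleSource is the narrow interface each tool declares; the concrete
// Dataproc source implements the API calls, keeping each tool's Invoke a
// thin wrapper around a Source method.
type compatibleSource interface {
	GetCluster(ctx context.Context, clusterName string) (any, error)
	ListClusters(ctx context.Context, filter string, pageSize int) (any, error)
}
```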
and `labels`. For example: `status.state = ACTIVE AND clusterName = mycluster`.
Supported `status.state` values are: `ACTIVE`, `INACTIVE`, `CREATING`, `RUNNING`,
`ERROR`, `DELETING`, `UPDATING`, `STOPPING`, `STOPPED`.
- **`pageSize`** (optional): The maximum number of clusters to return in a single
Can we also specify the default value here?
    func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, params parameters.ParamValues, accessToken tools.AccessToken) (any, error) {
        source, err := tools.GetCompatibleSource[compatibleSource](resourceMgr, t.Config.Source, t.Name, kind)
        if err != nil {
            return nil, err
We just had a refactor of the Tool Invoke() function. We need to return util.ClientServerError for developer errors and util.AgentError for agent errors that can be self-corrected. Let me know if there's confusion around it.
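Putting the convention together (reusing declarations from the diffs in this PR; the `util` signatures are taken from the suggestions below and not independently verified, and `GetCluster` is an assumed source method), an Invoke following the refactor might look like:

```go
func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, params parameters.ParamValues, accessToken tools.AccessToken) (any, error) {
	source, err := tools.GetCompatibleSource[compatibleSource](resourceMgr, t.Config.Source, t.Name, kind)
	if err != nil {
		// Developer error: a misconfigured source is not something the agent can self-correct.
		return nil, util.NewClientServerError("source used is not compatible with the tool", http.StatusInternalServerError, err)
	}
	name, ok := params.AsMap()["clusterName"].(string)
	if !ok {
		// Agent error: the model can retry with a corrected 'clusterName' parameter.
		return nil, util.NewAgentError("invalid or missing 'clusterName' parameter", nil)
	}
	return source.GetCluster(ctx, name)
}
```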
        paramMap := params.AsMap()
        name, ok := paramMap["clusterName"].(string)
        if !ok {
            return nil, fmt.Errorf("missing required parameter: clusterName")
Suggested change:
-            return nil, fmt.Errorf("missing required parameter: clusterName")
+            return nil, util.NewAgentError("invalid or missing 'clusterName' parameter", nil)
    func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, params parameters.ParamValues, accessToken tools.AccessToken) (any, error) {
        source, err := tools.GetCompatibleSource[compatibleSource](resourceMgr, t.Config.Source, t.Name, kind)
        if err != nil {
            return nil, err
Suggested change:
-            return nil, err
+            return nil, util.NewClientServerError("source used is not compatible with the tool", http.StatusInternalServerError, err)
            return nil, fmt.Errorf("missing required parameter: clusterName")
        }
        if strings.Contains(name, "/") {
            return nil, fmt.Errorf("clusterName must be a short name without '/': %s", name)
return nil, util.NewAgentError(fmt.Sprintf("clusterName must be a short name without '/': %s", name), nil)
    }

    // Invoke executes the tool's operation.
    func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, params parameters.ParamValues, accessToken tools.AccessToken) (any, error) {
Suggested change:
-    func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, params parameters.ParamValues, accessToken tools.AccessToken) (any, error) {
+    func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, params parameters.ParamValues, accessToken tools.AccessToken) (any, util.ToolboxError) {
Hi @dborowitz, other than some refactoring and nits, the PR LGTM. Let me know when the updates are made and I'll approve it. Thanks!
Description
Add a new source for Dataproc, which is closely related to Serverless Spark. Similar to get/list batches, we have get/list clusters and jobs, with minor API differences.
PR Checklist

- I have read CONTRIBUTING.md
- Make sure to open a bug/issue before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
- Append a `!` to the commit title if this involves a breaking change

🛠️ Part of #2405