Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
},
"JavaTestProperties": {
"SUPPORTED_VERSIONS": ["8", "11", "17", "21", "25"],
"FLINK_VERSIONS": ["1.17", "1.18", "1.19", "1.20", "2.0"],
"FLINK_VERSIONS": ["1.17", "1.18", "1.19", "1.20", "2.0", "2.1"],
"SPARK_VERSIONS": ["3"]
},
"GoTestProperties": {
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/run_rc_validation_java_quickstart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ jobs:
- name: Run QuickStart Java Flink Runner
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :runners:flink:2.0:runQuickstartJavaFlinkLocal
gradle-command: :runners:flink:2.1:runQuickstartJavaFlinkLocal
arguments: |
-Prepourl=${{ env.APACHE_REPO_URL }} \
-Pver=${{ env.RELEASE_VERSION }}
Expand Down
13 changes: 13 additions & 0 deletions .test-infra/validate-runner/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ repositories {
}
}

// Flink 2.1+ uses at.yawk.lz4:lz4-java while Spark uses org.lz4:lz4-java
// Resolve capability conflict by preferring Flink's version when both are present
configurations.all {
resolutionStrategy.capabilitiesResolution.withCapability('org.lz4:lz4-java') {
def candidate = candidates.find { it.id.toString().contains('at.yawk.lz4') }
if (candidate != null) {
select(candidate)
} else {
selectHighestVersion()
}
}
}

dependencies {
implementation 'com.offbytwo.jenkins:jenkins-client:0.3.8'
implementation library.java.jackson_databind
Expand Down
10 changes: 10 additions & 0 deletions examples/java/common.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ configurations.sparkRunnerPreCommit {
exclude group: "org.slf4j", module: "jul-to-slf4j"
exclude group: "org.slf4j", module: "slf4j-jdk14"
}
configurations.flinkRunnerPreCommit {

@Abacn Abacn Jun 16, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this chunk do?

Is it due to https://issues.apache.org/jira/browse/FLINK-38764?

We don't need this workaround in examples/java. If we really want to patch it, it should happens in module that ships for production (runners/flink/2.1). Here it's just Java examples where handling CVEs doesn't adds much value

If it's required to make Flink 2.1 working it's more concerning, as it indicates users would need the same hack here

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that Flink 2.1+ comes with at.yawk.lz4:lz4-java instead of org.lz4:lz4-java. This becomes an issue in modules where both libraries are present; this change simply favors the former.

> Could not resolve org.lz4:lz4-java:1.8.0.
         > Module 'org.lz4:lz4-java' has been rejected:
              Cannot select module with conflict on capability 'org.lz4:lz4-java:1.8.0' also provided by [at.yawk.lz4:lz4-java:1.10.3(runtimeElements)]
      > Could not resolve at.yawk.lz4:lz4-java:1.10.3.
         > Module 'at.yawk.lz4:lz4-java' has been rejected:
              Cannot select module with conflict on capability 'org.lz4:lz4-java:1.10.3' also provided by [org.lz4:lz4-java:1.8.0(runtime)]

In the case of this specific module, the conflict arises because of the kafka-clients dependency, which brings in the old version of org.lz4.

The parents of the clashing dependencies can be checked via this command:
./gradlew :examples:java:dependencies --configuration flinkRunnerPreCommit | grep -B2 "org.lz4:lz4-java:1.6.0"

I made these changes to pass the ./gradlew :javaPreCommit CICD check

resolutionStrategy.capabilitiesResolution.withCapability("org.lz4:lz4-java") {
def candidate = candidates.find { it.id.toString().contains('at.yawk.lz4') }
if (candidate != null) {
select(candidate)
} else {
selectHighestVersion()
}
}
}

dependencies {
directRunnerPreCommit project(path: ":runners:direct-java", configuration: "shadow")
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ docker_image_default_repo_root=apache
docker_image_default_repo_prefix=beam_

# supported flink versions
flink_versions=1.17,1.18,1.19,1.20,2.0
flink_versions=1.17,1.18,1.19,1.20,2.0,2.1
# supported spark versions
spark_versions=3,4
# supported python versions
Expand Down
56 changes: 56 additions & 0 deletions runners/flink/2.1/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

project.ext {
flink_major = '2.1'
flink_version = '2.1.3'
excluded_files = [
'main': [
// Used by DataSet API only
"org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapter.java",
"org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java",
"org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java",
"org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java",
"org/apache/beam/runners/flink/translation/functions/FlinkNonMergingReduceFunction.java",
// Moved to org.apache.flink.runtime.state.StateBackendFactory
"org/apache/beam/runners/flink/FlinkStateBackendFactory.java",
],
'test': [
// Used by DataSet API only
"org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapterTest.java",
"org/apache/beam/runners/flink/batch/NonMergingGroupByKeyTest.java",
"org/apache/beam/runners/flink/batch/ReshuffleTest.java",
]
]
}

// Load the main build script which contains all build logic.
apply from: "../flink_runner.gradle"

// Flink 2.1 uses at.yawk.lz4:lz4-java instead of org.lz4:lz4-java
// Explicitly prefer Flink's at.yawk.lz4 version to resolve capability conflict
configurations.all {
resolutionStrategy.capabilitiesResolution.withCapability('org.lz4:lz4-java') {
def candidate = candidates.find { it.id.toString().contains('at.yawk.lz4') }
if (candidate != null) {
select(candidate)
} else {
selectHighestVersion()
}
}
}
Comment thread
ddebowczyk92 marked this conversation as resolved.
26 changes: 26 additions & 0 deletions runners/flink/2.1/job-server-container/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '../../job-server-container'

project.ext {
resource_path = basePath
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_job_server_container.gradle"
44 changes: 44 additions & 0 deletions runners/flink/2.1/job-server/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '../../job-server'

project.ext {
// Look for the source code in the parent module
main_source_dirs = ["$basePath/src/main/java"]
test_source_dirs = ["$basePath/src/test/java"]
main_resources_dirs = ["$basePath/src/main/resources"]
test_resources_dirs = ["$basePath/src/test/resources"]
archives_base_name = 'beam-runners-flink-2.1-job-server'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_job_server.gradle"

// Flink 2.1 uses at.yawk.lz4:lz4-java instead of org.lz4:lz4-java
// Explicitly prefer Flink's at.yawk.lz4 version to resolve capability conflict
configurations.all {
resolutionStrategy.capabilitiesResolution.withCapability('org.lz4:lz4-java') {
def candidate = candidates.find { it.id.toString().contains('at.yawk.lz4') }
if (candidate != null) {
select(candidate)
} else {
selectHighestVersion()
}
}
}
Comment thread
ddebowczyk92 marked this conversation as resolved.
6 changes: 3 additions & 3 deletions sdks/go/examples/wasm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ cd $BEAM_HOME
Expected output should include the following, from which you acquire the latest flink runner version.

```shell
'flink_versions: 1.17,1.18,1.19,1.20'
'flink_versions: 1.17,1.18,1.19,1.20,2.0,2.1'
```

#### 2. Set to the latest flink runner version i.e. 1.16
#### 2. Set to the latest flink runner version i.e. 2.1

```shell
FLINK_VERSION=1.16
FLINK_VERSION=2.1
```

#### 3. In a separate terminal, start the flink runner (It should take a few minutes on the first execution)
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ task flinkValidatesRunner {
doFirst {
// Copy Flink conf file
copy {
from "${project.rootDir}/runners/flink/2.0/src/test/resources/flink-test-config.yaml"
from "${project.rootDir}/runners/flink/${flinkVersion}/src/test/resources/flink-test-config.yaml"
into "${project.buildDir}/flink-conf"

// Rename the file during the copy process
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -2106,7 +2106,7 @@ def _add_argparse_args(cls, parser):
class FlinkRunnerOptions(PipelineOptions):

# These should stay in sync with gradle.properties.
PUBLISHED_FLINK_VERSIONS = ['1.17', '1.18', '1.19', '1.20', '2.0']
PUBLISHED_FLINK_VERSIONS = ['1.17', '1.18', '1.19', '1.20', '2.0', '2.1']

@classmethod
def _add_argparse_args(cls, parser):
Expand Down
2 changes: 1 addition & 1 deletion sdks/typescript/src/apache_beam/runners/flink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import { JavaJarService } from "../utils/service";
const MAGIC_HOST_NAMES = ["[local]", "[auto]"];

// These should stay in sync with gradle.properties.
const PUBLISHED_FLINK_VERSIONS = ["1.17", "1.18", "1.19", "1.20", "2.0"];
const PUBLISHED_FLINK_VERSIONS = ["1.17", "1.18", "1.19", "1.20", "2.0", "2.1"];

const defaultOptions = {
flinkMaster: "[local]",
Expand Down
Loading