From d0ff6def453b73810d7b3ef2f295af9e0fe08ad5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dominik=20D=C4=99bowczyk?= Date: Fri, 12 Jun 2026 17:26:21 +0200 Subject: [PATCH 1/4] [runners-flink] Add support for Apache Flink 2.1.3 --- .../test-properties.json | 2 +- .../run_rc_validation_java_quickstart.yml | 2 +- gradle.properties | 2 +- runners/flink/2.1/build.gradle | 51 +++++++++++++++++++ .../2.1/job-server-container/build.gradle | 26 ++++++++++ runners/flink/2.1/job-server/build.gradle | 39 ++++++++++++++ .../src/test/resources/flink-test-config.yaml | 27 ++++++++++ sdks/go/examples/wasm/README.md | 6 +-- sdks/go/test/build.gradle | 2 +- .../apache_beam/options/pipeline_options.py | 2 +- .../src/apache_beam/runners/flink.ts | 2 +- 11 files changed, 152 insertions(+), 9 deletions(-) create mode 100644 runners/flink/2.1/build.gradle create mode 100644 runners/flink/2.1/job-server-container/build.gradle create mode 100644 runners/flink/2.1/job-server/build.gradle create mode 100644 runners/flink/2.1/src/test/resources/flink-test-config.yaml diff --git a/.github/actions/setup-default-test-properties/test-properties.json b/.github/actions/setup-default-test-properties/test-properties.json index f06de5174e6c..7a3f9890a294 100644 --- a/.github/actions/setup-default-test-properties/test-properties.json +++ b/.github/actions/setup-default-test-properties/test-properties.json @@ -14,7 +14,7 @@ }, "JavaTestProperties": { "SUPPORTED_VERSIONS": ["8", "11", "17", "21", "25"], - "FLINK_VERSIONS": ["1.17", "1.18", "1.19", "1.20", "2.0"], + "FLINK_VERSIONS": ["1.17", "1.18", "1.19", "1.20", "2.0", "2.1"], "SPARK_VERSIONS": ["3"] }, "GoTestProperties": { diff --git a/.github/workflows/run_rc_validation_java_quickstart.yml b/.github/workflows/run_rc_validation_java_quickstart.yml index dce9b7f3fedb..41a6991d14ee 100644 --- a/.github/workflows/run_rc_validation_java_quickstart.yml +++ b/.github/workflows/run_rc_validation_java_quickstart.yml @@ -88,7 +88,7 @@ jobs: - name: Run QuickStart Java Flink Runner uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :runners:flink:2.0:runQuickstartJavaFlinkLocal + gradle-command: :runners:flink:2.1:runQuickstartJavaFlinkLocal arguments: | -Prepourl=${{ env.APACHE_REPO_URL }} \ -Pver=${{ env.RELEASE_VERSION }} diff --git a/gradle.properties b/gradle.properties index 95e50105a494..0289d02722b3 100644 --- a/gradle.properties +++ b/gradle.properties @@ -39,7 +39,7 @@ docker_image_default_repo_root=apache docker_image_default_repo_prefix=beam_ # supported flink versions -flink_versions=1.17,1.18,1.19,1.20,2.0 +flink_versions=1.17,1.18,1.19,1.20,2.0,2.1 # supported spark versions spark_versions=3,4 # supported python versions diff --git a/runners/flink/2.1/build.gradle b/runners/flink/2.1/build.gradle new file mode 100644 index 000000000000..ced0757c7046 --- /dev/null +++ b/runners/flink/2.1/build.gradle @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +project.ext { + flink_major = '2.1' + flink_version = '2.1.3' + excluded_files = [ + 'main': [ + // Used by DataSet API only + "org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapter.java", + "org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java", + "org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java", + "org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java", + "org/apache/beam/runners/flink/translation/functions/FlinkNonMergingReduceFunction.java", + // Moved to org.apache.flink.runtime.state.StateBackendFactory + "org/apache/beam/runners/flink/FlinkStateBackendFactory.java", + ], + 'test': [ + // Used by DataSet API only + "org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapterTest.java", + "org/apache/beam/runners/flink/batch/NonMergingGroupByKeyTest.java", + "org/apache/beam/runners/flink/batch/ReshuffleTest.java", + ] + ] +} + +// Load the main build script which contains all build logic. +apply from: "../flink_runner.gradle" + +// Flink 2.1 uses at.yawk.lz4:lz4-java instead of org.lz4:lz4-java +// Resolve capability conflict by preferring Flink's version +configurations.all { + resolutionStrategy.capabilitiesResolution.withCapability('org.lz4:lz4-java') { + selectHighestVersion() + } +} diff --git a/runners/flink/2.1/job-server-container/build.gradle b/runners/flink/2.1/job-server-container/build.gradle new file mode 100644 index 000000000000..afdb68a0fc91 --- /dev/null +++ b/runners/flink/2.1/job-server-container/build.gradle @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server-container' + +project.ext { + resource_path = basePath +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/2.1/job-server/build.gradle b/runners/flink/2.1/job-server/build.gradle new file mode 100644 index 000000000000..6b4517e6ad10 --- /dev/null +++ b/runners/flink/2.1/job-server/build.gradle @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server' + +project.ext { + // Look for the source code in the parent module + main_source_dirs = ["$basePath/src/main/java"] + test_source_dirs = ["$basePath/src/test/java"] + main_resources_dirs = ["$basePath/src/main/resources"] + test_resources_dirs = ["$basePath/src/test/resources"] + archives_base_name = 'beam-runners-flink-2.1-job-server' +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server.gradle" + +// Flink 2.1 uses at.yawk.lz4:lz4-java instead of org.lz4:lz4-java +// Resolve capability conflict by preferring Flink's version +configurations.all { + resolutionStrategy.capabilitiesResolution.withCapability('org.lz4:lz4-java') { + selectHighestVersion() + } +} diff --git a/runners/flink/2.1/src/test/resources/flink-test-config.yaml b/runners/flink/2.1/src/test/resources/flink-test-config.yaml new file mode 100644 index 000000000000..d34264859bce --- /dev/null +++ b/runners/flink/2.1/src/test/resources/flink-test-config.yaml @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +taskmanager: + memory: + network: + max: 2gb + fraction: '0.2' + managed: + size: 1gb +parallelism: + default: '2' diff --git a/sdks/go/examples/wasm/README.md b/sdks/go/examples/wasm/README.md index e4ab54d4a3ed..30fd22f624be 100644 --- a/sdks/go/examples/wasm/README.md +++ b/sdks/go/examples/wasm/README.md @@ -68,13 +68,13 @@ cd $BEAM_HOME Expected output should include the following, from which you acquire the latest flink runner version. ```shell -'flink_versions: 1.17,1.18,1.19,1.20' +'flink_versions: 1.17,1.18,1.19,1.20,2.0,2.1' ``` -#### 2. Set to the latest flink runner version i.e. 1.16 +#### 2. Set to the latest flink runner version i.e. 2.1 ```shell -FLINK_VERSION=1.16 +FLINK_VERSION=2.1 ``` #### 3. In a separate terminal, start the flink runner (It should take a few minutes on the first execution) diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle index 677134716062..05c019c6d93a 100644 --- a/sdks/go/test/build.gradle +++ b/sdks/go/test/build.gradle @@ -89,7 +89,7 @@ task flinkValidatesRunner { doFirst { // Copy Flink conf file copy { - from "${project.rootDir}/runners/flink/2.0/src/test/resources/flink-test-config.yaml" + from "${project.rootDir}/runners/flink/${flinkVersion}/src/test/resources/flink-test-config.yaml" into "${project.buildDir}/flink-conf" // Rename the file during the copy process diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index c813939d53f1..978a79bf6172 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -2106,7 +2106,7 @@ def _add_argparse_args(cls, parser): class FlinkRunnerOptions(PipelineOptions): # These should stay in sync with gradle.properties. - PUBLISHED_FLINK_VERSIONS = ['1.17', '1.18', '1.19', '1.20', '2.0'] + PUBLISHED_FLINK_VERSIONS = ['1.17', '1.18', '1.19', '1.20', '2.0', '2.1'] @classmethod def _add_argparse_args(cls, parser): diff --git a/sdks/typescript/src/apache_beam/runners/flink.ts b/sdks/typescript/src/apache_beam/runners/flink.ts index 8f80b971da2a..c8f8f57eb080 100644 --- a/sdks/typescript/src/apache_beam/runners/flink.ts +++ b/sdks/typescript/src/apache_beam/runners/flink.ts @@ -28,7 +28,7 @@ import { JavaJarService } from "../utils/service"; const MAGIC_HOST_NAMES = ["[local]", "[auto]"]; // These should stay in sync with gradle.properties. -const PUBLISHED_FLINK_VERSIONS = ["1.17", "1.18", "1.19", "1.20", "2.0"]; +const PUBLISHED_FLINK_VERSIONS = ["1.17", "1.18", "1.19", "1.20", "2.0", "2.1"]; const defaultOptions = { flinkMaster: "[local]", From a1d6aa1597975e66a6b7435dc9d9ebbf0d8f3fba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dominik=20D=C4=99bowczyk?= Date: Mon, 15 Jun 2026 15:38:37 +0200 Subject: [PATCH 2/4] Improve lz4-java dependency resolution strategy Explicitly prefer Flink's at.yawk.lz4:lz4-java over org.lz4:lz4-java to ensure the Flink-compatible version is always selected, regardless of version numbers. This is more robust than selectHighestVersion() which could theoretically select org.lz4 if it had a higher version number. --- runners/flink/2.1/build.gradle | 9 +++++++-- runners/flink/2.1/job-server/build.gradle | 9 +++++++-- .../flink/2.1/src/test/resources/flink-test-config.yaml | 2 +- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/runners/flink/2.1/build.gradle b/runners/flink/2.1/build.gradle index ced0757c7046..e9092c2977f7 100644 --- a/runners/flink/2.1/build.gradle +++ b/runners/flink/2.1/build.gradle @@ -43,9 +43,14 @@ project.ext { apply from: "../flink_runner.gradle" // Flink 2.1 uses at.yawk.lz4:lz4-java instead of org.lz4:lz4-java -// Resolve capability conflict by preferring Flink's version +// Explicitly prefer Flink's at.yawk.lz4 version to resolve capability conflict configurations.all { resolutionStrategy.capabilitiesResolution.withCapability('org.lz4:lz4-java') { - selectHighestVersion() + def candidate = candidates.find { it.id.toString().contains('at.yawk.lz4') } + if (candidate != null) { + select(candidate) + } else { + selectHighestVersion() + } } } diff --git a/runners/flink/2.1/job-server/build.gradle b/runners/flink/2.1/job-server/build.gradle index 6b4517e6ad10..277ddc07fdaf 100644 --- a/runners/flink/2.1/job-server/build.gradle +++ b/runners/flink/2.1/job-server/build.gradle @@ -31,9 +31,14 @@ project.ext { apply from: "$basePath/flink_job_server.gradle" // Flink 2.1 uses at.yawk.lz4:lz4-java instead of org.lz4:lz4-java -// Resolve capability conflict by preferring Flink's version +// Explicitly prefer Flink's at.yawk.lz4 version to resolve capability conflict configurations.all { resolutionStrategy.capabilitiesResolution.withCapability('org.lz4:lz4-java') { - selectHighestVersion() + def candidate = candidates.find { it.id.toString().contains('at.yawk.lz4') } + if (candidate != null) { + select(candidate) + } else { + selectHighestVersion() + } } } diff --git a/runners/flink/2.1/src/test/resources/flink-test-config.yaml b/runners/flink/2.1/src/test/resources/flink-test-config.yaml index d34264859bce..d34134695dd6 100644 --- a/runners/flink/2.1/src/test/resources/flink-test-config.yaml +++ b/runners/flink/2.1/src/test/resources/flink-test-config.yaml @@ -24,4 +24,4 @@ taskmanager: managed: size: 1gb parallelism: - default: '2' + default: '23' From b7b4aa59d55377709b244a64a7116b9abd65ea86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dominik=20D=C4=99bowczyk?= Date: Tue, 16 Jun 2026 15:36:49 +0200 Subject: [PATCH 3/4] Fix lz4-java capability conflict for examples:java flinkRunnerPreCommit Flink 2.1.3 uses at.yawk.lz4:lz4-java:1.10.3 while Kafka uses org.lz4:lz4-java:1.6.0, causing capability conflicts in configurations that depend on both (like examples:java flinkRunnerPreCommit). Add capability resolution strategy to select the highest version, consistent with the approach in runners/flink/2.1/build.gradle. --- examples/java/common.gradle | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/examples/java/common.gradle b/examples/java/common.gradle index 10ea43628bc8..0e800e7cf987 100644 --- a/examples/java/common.gradle +++ b/examples/java/common.gradle @@ -35,6 +35,16 @@ configurations.sparkRunnerPreCommit { exclude group: "org.slf4j", module: "jul-to-slf4j" exclude group: "org.slf4j", module: "slf4j-jdk14" } +configurations.flinkRunnerPreCommit { + resolutionStrategy.capabilitiesResolution.withCapability("org.lz4:lz4-java") { + def candidate = candidates.find { it.id.toString().contains('at.yawk.lz4') } + if (candidate != null) { + select(candidate) + } else { + selectHighestVersion() + } + } +} dependencies { directRunnerPreCommit project(path: ":runners:direct-java", configuration: "shadow") From b20493135d1f30da929076802a82d884c99bfb76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dominik=20D=C4=99bowczyk?= Date: Wed, 17 Jun 2026 10:51:51 +0200 Subject: [PATCH 4/4] Remove unnecessary test resources for Flink 2.1 Flink 2.1 inherits test resources from parent versions via Beam's resource layering mechanism. No version-specific config needed. --- .test-infra/validate-runner/build.gradle | 13 +++++++++ .../src/test/resources/flink-test-config.yaml | 27 ------------------- 2 files changed, 13 insertions(+), 27 deletions(-) delete mode 100644 runners/flink/2.1/src/test/resources/flink-test-config.yaml diff --git a/.test-infra/validate-runner/build.gradle b/.test-infra/validate-runner/build.gradle index 1817d7014a6c..3992abb24dd4 100644 --- a/.test-infra/validate-runner/build.gradle +++ b/.test-infra/validate-runner/build.gradle @@ -31,6 +31,19 @@ repositories { } } +// Flink 2.1+ uses at.yawk.lz4:lz4-java while Spark uses org.lz4:lz4-java +// Resolve capability conflict by preferring Flink's version when both are present +configurations.all { + resolutionStrategy.capabilitiesResolution.withCapability('org.lz4:lz4-java') { + def candidate = candidates.find { it.id.toString().contains('at.yawk.lz4') } + if (candidate != null) { + select(candidate) + } else { + selectHighestVersion() + } + } +} + dependencies { implementation 'com.offbytwo.jenkins:jenkins-client:0.3.8' implementation library.java.jackson_databind diff --git a/runners/flink/2.1/src/test/resources/flink-test-config.yaml b/runners/flink/2.1/src/test/resources/flink-test-config.yaml deleted file mode 100644 index d34134695dd6..000000000000 --- a/runners/flink/2.1/src/test/resources/flink-test-config.yaml +++ /dev/null @@ -1,27 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -taskmanager: - memory: - network: - max: 2gb - fraction: '0.2' - managed: - size: 1gb -parallelism: - default: '23'