diff --git a/.github/actions/setup-default-test-properties/test-properties.json b/.github/actions/setup-default-test-properties/test-properties.json index f06de5174e6c..7a3f9890a294 100644 --- a/.github/actions/setup-default-test-properties/test-properties.json +++ b/.github/actions/setup-default-test-properties/test-properties.json @@ -14,7 +14,7 @@ }, "JavaTestProperties": { "SUPPORTED_VERSIONS": ["8", "11", "17", "21", "25"], - "FLINK_VERSIONS": ["1.17", "1.18", "1.19", "1.20", "2.0"], + "FLINK_VERSIONS": ["1.17", "1.18", "1.19", "1.20", "2.0", "2.1"], "SPARK_VERSIONS": ["3"] }, "GoTestProperties": { diff --git a/.github/workflows/run_rc_validation_java_quickstart.yml b/.github/workflows/run_rc_validation_java_quickstart.yml index dce9b7f3fedb..41a6991d14ee 100644 --- a/.github/workflows/run_rc_validation_java_quickstart.yml +++ b/.github/workflows/run_rc_validation_java_quickstart.yml @@ -88,7 +88,7 @@ jobs: - name: Run QuickStart Java Flink Runner uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :runners:flink:2.0:runQuickstartJavaFlinkLocal + gradle-command: :runners:flink:2.1:runQuickstartJavaFlinkLocal arguments: | -Prepourl=${{ env.APACHE_REPO_URL }} \ -Pver=${{ env.RELEASE_VERSION }} diff --git a/.test-infra/validate-runner/build.gradle b/.test-infra/validate-runner/build.gradle index 1817d7014a6c..3992abb24dd4 100644 --- a/.test-infra/validate-runner/build.gradle +++ b/.test-infra/validate-runner/build.gradle @@ -31,6 +31,19 @@ repositories { } } +// Flink 2.1+ uses at.yawk.lz4:lz4-java while Spark uses org.lz4:lz4-java +// Resolve capability conflict by preferring Flink's version when both are present +configurations.all { + resolutionStrategy.capabilitiesResolution.withCapability('org.lz4:lz4-java') { + def candidate = candidates.find { it.id.toString().contains('at.yawk.lz4') } + if (candidate != null) { + select(candidate) + } else { + selectHighestVersion() + } + } +} + dependencies { implementation 'com.offbytwo.jenkins:jenkins-client:0.3.8' implementation library.java.jackson_databind diff --git a/examples/java/common.gradle b/examples/java/common.gradle index 10ea43628bc8..0e800e7cf987 100644 --- a/examples/java/common.gradle +++ b/examples/java/common.gradle @@ -35,6 +35,16 @@ configurations.sparkRunnerPreCommit { exclude group: "org.slf4j", module: "jul-to-slf4j" exclude group: "org.slf4j", module: "slf4j-jdk14" } +configurations.flinkRunnerPreCommit { + resolutionStrategy.capabilitiesResolution.withCapability("org.lz4:lz4-java") { + def candidate = candidates.find { it.id.toString().contains('at.yawk.lz4') } + if (candidate != null) { + select(candidate) + } else { + selectHighestVersion() + } + } +} dependencies { directRunnerPreCommit project(path: ":runners:direct-java", configuration: "shadow") diff --git a/gradle.properties b/gradle.properties index 95e50105a494..0289d02722b3 100644 --- a/gradle.properties +++ b/gradle.properties @@ -39,7 +39,7 @@ docker_image_default_repo_root=apache docker_image_default_repo_prefix=beam_ # supported flink versions -flink_versions=1.17,1.18,1.19,1.20,2.0 +flink_versions=1.17,1.18,1.19,1.20,2.0,2.1 # supported spark versions spark_versions=3,4 # supported python versions diff --git a/runners/flink/2.1/build.gradle b/runners/flink/2.1/build.gradle new file mode 100644 index 000000000000..e9092c2977f7 --- /dev/null +++ b/runners/flink/2.1/build.gradle @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +project.ext { + flink_major = '2.1' + flink_version = '2.1.3' + excluded_files = [ + 'main': [ + // Used by DataSet API only + "org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapter.java", + "org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java", + "org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java", + "org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java", + "org/apache/beam/runners/flink/translation/functions/FlinkNonMergingReduceFunction.java", + // Moved to org.apache.flink.runtime.state.StateBackendFactory + "org/apache/beam/runners/flink/FlinkStateBackendFactory.java", + ], + 'test': [ + // Used by DataSet API only + "org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapterTest.java", + "org/apache/beam/runners/flink/batch/NonMergingGroupByKeyTest.java", + "org/apache/beam/runners/flink/batch/ReshuffleTest.java", + ] + ] +} + +// Load the main build script which contains all build logic. +apply from: "../flink_runner.gradle" + +// Flink 2.1 uses at.yawk.lz4:lz4-java instead of org.lz4:lz4-java +// Explicitly prefer Flink's at.yawk.lz4 version to resolve capability conflict +configurations.all { + resolutionStrategy.capabilitiesResolution.withCapability('org.lz4:lz4-java') { + def candidate = candidates.find { it.id.toString().contains('at.yawk.lz4') } + if (candidate != null) { + select(candidate) + } else { + selectHighestVersion() + } + } +} diff --git a/runners/flink/2.1/job-server-container/build.gradle b/runners/flink/2.1/job-server-container/build.gradle new file mode 100644 index 000000000000..afdb68a0fc91 --- /dev/null +++ b/runners/flink/2.1/job-server-container/build.gradle @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server-container' + +project.ext { + resource_path = basePath +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/2.1/job-server/build.gradle b/runners/flink/2.1/job-server/build.gradle new file mode 100644 index 000000000000..277ddc07fdaf --- /dev/null +++ b/runners/flink/2.1/job-server/build.gradle @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server' + +project.ext { + // Look for the source code in the parent module + main_source_dirs = ["$basePath/src/main/java"] + test_source_dirs = ["$basePath/src/test/java"] + main_resources_dirs = ["$basePath/src/main/resources"] + test_resources_dirs = ["$basePath/src/test/resources"] + archives_base_name = 'beam-runners-flink-2.1-job-server' +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server.gradle" + +// Flink 2.1 uses at.yawk.lz4:lz4-java instead of org.lz4:lz4-java +// Explicitly prefer Flink's at.yawk.lz4 version to resolve capability conflict +configurations.all { + resolutionStrategy.capabilitiesResolution.withCapability('org.lz4:lz4-java') { + def candidate = candidates.find { it.id.toString().contains('at.yawk.lz4') } + if (candidate != null) { + select(candidate) + } else { + selectHighestVersion() + } + } +} diff --git a/sdks/go/examples/wasm/README.md b/sdks/go/examples/wasm/README.md index e4ab54d4a3ed..30fd22f624be 100644 --- a/sdks/go/examples/wasm/README.md +++ b/sdks/go/examples/wasm/README.md @@ -68,13 +68,13 @@ cd $BEAM_HOME Expected output should include the following, from which you acquire the latest flink runner version. ```shell -'flink_versions: 1.17,1.18,1.19,1.20' +'flink_versions: 1.17,1.18,1.19,1.20,2.0,2.1' ``` -#### 2. Set to the latest flink runner version i.e. 1.16 +#### 2. Set to the latest flink runner version i.e. 2.1 ```shell -FLINK_VERSION=1.16 +FLINK_VERSION=2.1 ``` #### 3. In a separate terminal, start the flink runner (It should take a few minutes on the first execution) diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle index 677134716062..05c019c6d93a 100644 --- a/sdks/go/test/build.gradle +++ b/sdks/go/test/build.gradle @@ -89,7 +89,7 @@ task flinkValidatesRunner { doFirst { // Copy Flink conf file copy { - from "${project.rootDir}/runners/flink/2.0/src/test/resources/flink-test-config.yaml" + from "${project.rootDir}/runners/flink/${flinkVersion}/src/test/resources/flink-test-config.yaml" into "${project.buildDir}/flink-conf" // Rename the file during the copy process diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index c813939d53f1..978a79bf6172 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -2106,7 +2106,7 @@ def _add_argparse_args(cls, parser): class FlinkRunnerOptions(PipelineOptions): # These should stay in sync with gradle.properties. - PUBLISHED_FLINK_VERSIONS = ['1.17', '1.18', '1.19', '1.20', '2.0'] + PUBLISHED_FLINK_VERSIONS = ['1.17', '1.18', '1.19', '1.20', '2.0', '2.1'] @classmethod def _add_argparse_args(cls, parser): diff --git a/sdks/typescript/src/apache_beam/runners/flink.ts b/sdks/typescript/src/apache_beam/runners/flink.ts index 8f80b971da2a..c8f8f57eb080 100644 --- a/sdks/typescript/src/apache_beam/runners/flink.ts +++ b/sdks/typescript/src/apache_beam/runners/flink.ts @@ -28,7 +28,7 @@ import { JavaJarService } from "../utils/service"; const MAGIC_HOST_NAMES = ["[local]", "[auto]"]; // These should stay in sync with gradle.properties. -const PUBLISHED_FLINK_VERSIONS = ["1.17", "1.18", "1.19", "1.20", "2.0"]; +const PUBLISHED_FLINK_VERSIONS = ["1.17", "1.18", "1.19", "1.20", "2.0", "2.1"]; const defaultOptions = { flinkMaster: "[local]",