diff --git a/.github/actions/setup-default-test-properties/test-properties.json b/.github/actions/setup-default-test-properties/test-properties.json index 7a3f9890a294..ddb4a8b73ffb 100644 --- a/.github/actions/setup-default-test-properties/test-properties.json +++ b/.github/actions/setup-default-test-properties/test-properties.json @@ -14,7 +14,7 @@ }, "JavaTestProperties": { "SUPPORTED_VERSIONS": ["8", "11", "17", "21", "25"], - "FLINK_VERSIONS": ["1.17", "1.18", "1.19", "1.20", "2.0", "2.1"], + "FLINK_VERSIONS": ["1.19", "1.20", "2.0", "2.1"], "SPARK_VERSIONS": ["3"] }, "GoTestProperties": { diff --git a/.github/workflows/beam_PreCommit_Flink_Container.yml b/.github/workflows/beam_PreCommit_Flink_Container.yml index 9d4ff5e9636c..1f6c209cfd2b 100644 --- a/.github/workflows/beam_PreCommit_Flink_Container.yml +++ b/.github/workflows/beam_PreCommit_Flink_Container.yml @@ -173,7 +173,7 @@ jobs: gradle-command: :sdks:java:testing:load-tests:run arguments: | -PloadTest.mainClass=org.apache.beam.sdk.loadtests.CombineLoadTest \ - -Prunner=:runners:flink:1.17 \ + -Prunner=:runners:flink:2.0 \ '-PloadTest.args=${{ env.beam_PreCommit_Flink_Container_test_arguments_3 }} --jobName=flink-tests-java11-${{env.NOW_UTC}}' - name: Teardown Flink diff --git a/gradle.properties b/gradle.properties index 0289d02722b3..c20a8373cb60 100644 --- a/gradle.properties +++ b/gradle.properties @@ -39,7 +39,7 @@ docker_image_default_repo_root=apache docker_image_default_repo_prefix=beam_ # supported flink versions -flink_versions=1.17,1.18,1.19,1.20,2.0,2.1 +flink_versions=1.19,1.20,2.0,2.1 # supported spark versions spark_versions=3,4 # supported python versions diff --git a/runners/flink/1.17/build.gradle b/runners/flink/1.17/build.gradle deleted file mode 100644 index ae69b879eba9..000000000000 --- a/runners/flink/1.17/build.gradle +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -project.ext { - flink_major = '1.17' - flink_version = '1.17.0' -} - -// Load the main build script which contains all build logic. -apply from: "../flink_runner.gradle" diff --git a/runners/flink/1.17/job-server-container/build.gradle b/runners/flink/1.17/job-server-container/build.gradle deleted file mode 100644 index afdb68a0fc91..000000000000 --- a/runners/flink/1.17/job-server-container/build.gradle +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server-container' - -project.ext { - resource_path = basePath -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/1.17/job-server/build.gradle b/runners/flink/1.17/job-server/build.gradle deleted file mode 100644 index 89915349ae9a..000000000000 --- a/runners/flink/1.17/job-server/build.gradle +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server' - -project.ext { - // Look for the source code in the parent module - main_source_dirs = ["$basePath/src/main/java"] - test_source_dirs = ["$basePath/src/test/java"] - main_resources_dirs = ["$basePath/src/main/resources"] - test_resources_dirs = ["$basePath/src/test/resources"] - archives_base_name = 'beam-runners-flink-1.17-job-server' -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server.gradle" diff --git a/runners/flink/1.18/build.gradle b/runners/flink/1.18/build.gradle deleted file mode 100644 index ab6e6b63b773..000000000000 --- a/runners/flink/1.18/build.gradle +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -project.ext { - flink_major = '1.18' - flink_version = '1.18.0' -} - -// Load the main build script which contains all build logic. -apply from: "../flink_runner.gradle" diff --git a/runners/flink/1.18/job-server-container/build.gradle b/runners/flink/1.18/job-server-container/build.gradle deleted file mode 100644 index afdb68a0fc91..000000000000 --- a/runners/flink/1.18/job-server-container/build.gradle +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server-container' - -project.ext { - resource_path = basePath -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/1.18/job-server/build.gradle b/runners/flink/1.18/job-server/build.gradle deleted file mode 100644 index e70fdcc0c581..000000000000 --- a/runners/flink/1.18/job-server/build.gradle +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -def basePath = '../../job-server' - -project.ext { - // Look for the source code in the parent module - main_source_dirs = ["$basePath/src/main/java"] - test_source_dirs = ["$basePath/src/test/java"] - main_resources_dirs = ["$basePath/src/main/resources"] - test_resources_dirs = ["$basePath/src/test/resources"] - archives_base_name = 'beam-runners-flink-1.18-job-server' -} - -// Load the main build script which contains all build logic. -apply from: "$basePath/flink_job_server.gradle" diff --git a/runners/flink/1.19/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java b/runners/flink/1.19/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java deleted file mode 100644 index cbaa6fd3a8c4..000000000000 --- a/runners/flink/1.19/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.streaming; - -import java.util.Collection; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.fs.CloseableRegistry; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.BackendBuildingException; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl; -import org.apache.flink.runtime.state.KeyedStateHandle; -import org.apache.flink.runtime.state.OperatorStateBackend; -import org.apache.flink.runtime.state.OperatorStateBackendParametersImpl; -import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.runtime.state.ttl.TtlTimeProvider; - -class MemoryStateBackendWrapper { - static AbstractKeyedStateBackend createKeyedStateBackend( - Environment env, - JobID jobID, - String operatorIdentifier, - TypeSerializer keySerializer, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry, - TtlTimeProvider ttlTimeProvider, - MetricGroup metricGroup, - Collection stateHandles, - CloseableRegistry cancelStreamRegistry) - throws BackendBuildingException { - - MemoryStateBackend backend = new MemoryStateBackend(); - return backend.createKeyedStateBackend( - new KeyedStateBackendParametersImpl<>( - env, - jobID, - operatorIdentifier, - keySerializer, - numberOfKeyGroups, - keyGroupRange, - kvStateRegistry, - ttlTimeProvider, - metricGroup, - stateHandles, - cancelStreamRegistry)); - } - - static OperatorStateBackend createOperatorStateBackend( - Environment env, - String operatorIdentifier, - Collection stateHandles, - CloseableRegistry cancelStreamRegistry) - throws Exception { - MemoryStateBackend backend = new MemoryStateBackend(); - return backend.createOperatorStateBackend( - new OperatorStateBackendParametersImpl( - env, operatorIdentifier, stateHandles, cancelStreamRegistry)); - } -} diff --git a/runners/flink/1.19/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java b/runners/flink/1.19/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java deleted file mode 100644 index 793959c5e693..000000000000 --- a/runners/flink/1.19/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.streaming; - -import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorChain; -import org.apache.flink.streaming.runtime.tasks.RegularOperatorChain; -import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; - -/** {@link StreamSource} utilities, that bridge incompatibilities between Flink releases. */ -public class StreamSources { - - public static > void run( - StreamSource streamSource, - Object lockingObject, - Output> collector) - throws Exception { - streamSource.run(lockingObject, collector, createOperatorChain(streamSource)); - } - - private static OperatorChain createOperatorChain(AbstractStreamOperator operator) { - return new RegularOperatorChain<>( - operator.getContainingTask(), - StreamTask.createRecordWriterDelegate( - operator.getOperatorConfig(), new MockEnvironmentBuilder().build())); - } - - /** The emitWatermarkStatus method was added in Flink 1.14, so we need to wrap Output. */ - public interface OutputWrapper extends Output { - @Override - default void emitWatermarkStatus(WatermarkStatus watermarkStatus) {} - - /** In Flink 1.19 the {@code recordAttributes} method was added. */ - @Override - default void emitRecordAttributes(RecordAttributes recordAttributes) { - throw new UnsupportedOperationException("emitRecordAttributes not implemented"); - } - } -} diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java index 7317788a72ee..cbaa6fd3a8c4 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java @@ -27,8 +27,10 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.BackendBuildingException; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackendParametersImpl; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; @@ -50,17 +52,18 @@ static AbstractKeyedStateBackend createKeyedStateBackend( MemoryStateBackend backend = new MemoryStateBackend(); return backend.createKeyedStateBackend( - env, - jobID, - operatorIdentifier, - keySerializer, - numberOfKeyGroups, - keyGroupRange, - kvStateRegistry, - ttlTimeProvider, - metricGroup, - stateHandles, - cancelStreamRegistry); + new KeyedStateBackendParametersImpl<>( + env, + jobID, + operatorIdentifier, + keySerializer, + numberOfKeyGroups, + keyGroupRange, + kvStateRegistry, + ttlTimeProvider, + metricGroup, + stateHandles, + cancelStreamRegistry)); } static OperatorStateBackend createOperatorStateBackend( @@ -71,6 +74,7 @@ static OperatorStateBackend createOperatorStateBackend( throws Exception { MemoryStateBackend backend = new MemoryStateBackend(); return backend.createOperatorStateBackend( - env, operatorIdentifier, stateHandles, cancelStreamRegistry); + new OperatorStateBackendParametersImpl( + env, operatorIdentifier, stateHandles, cancelStreamRegistry)); } } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java index d1e38549d8ed..793959c5e693 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java @@ -22,6 +22,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OperatorChain; import org.apache.flink.streaming.runtime.tasks.RegularOperatorChain; @@ -50,5 +51,11 @@ public static > void run( public interface OutputWrapper extends Output { @Override default void emitWatermarkStatus(WatermarkStatus watermarkStatus) {} + + /** In Flink 1.19 the {@code recordAttributes} method was added. */ + @Override + default void emitRecordAttributes(RecordAttributes recordAttributes) { + throw new UnsupportedOperationException("emitRecordAttributes not implemented"); + } } } diff --git a/sdks/go/examples/wasm/README.md b/sdks/go/examples/wasm/README.md index 30fd22f624be..deb2e9196f41 100644 --- a/sdks/go/examples/wasm/README.md +++ b/sdks/go/examples/wasm/README.md @@ -68,7 +68,7 @@ cd $BEAM_HOME Expected output should include the following, from which you acquire the latest flink runner version. ```shell -'flink_versions: 1.17,1.18,1.19,1.20,2.0,2.1' +'flink_versions: 1.19,1.20,2.0,2.1' ``` #### 2. Set to the latest flink runner version i.e. 2.1 diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 978a79bf6172..8bd220776312 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -2106,7 +2106,7 @@ def _add_argparse_args(cls, parser): class FlinkRunnerOptions(PipelineOptions): # These should stay in sync with gradle.properties. - PUBLISHED_FLINK_VERSIONS = ['1.17', '1.18', '1.19', '1.20', '2.0', '2.1'] + PUBLISHED_FLINK_VERSIONS = ['1.19', '1.20', '2.0', '2.1'] @classmethod def _add_argparse_args(cls, parser): diff --git a/sdks/typescript/src/apache_beam/runners/flink.ts b/sdks/typescript/src/apache_beam/runners/flink.ts index c8f8f57eb080..bb5137b8f9b8 100644 --- a/sdks/typescript/src/apache_beam/runners/flink.ts +++ b/sdks/typescript/src/apache_beam/runners/flink.ts @@ -28,7 +28,7 @@ import { JavaJarService } from "../utils/service"; const MAGIC_HOST_NAMES = ["[local]", "[auto]"]; // These should stay in sync with gradle.properties. -const PUBLISHED_FLINK_VERSIONS = ["1.17", "1.18", "1.19", "1.20", "2.0", "2.1"]; +const PUBLISHED_FLINK_VERSIONS = ["1.19", "1.20", "2.0", "2.1"]; const defaultOptions = { flinkMaster: "[local]",