From 7cb8bbe74e450dc00167f25beabce409c3887e62 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 4 Jun 2026 15:03:21 +0000 Subject: [PATCH 1/7] make changes to support delta --- sdks/java/io/expansion-service/build.gradle | 1 + .../apache_beam/yaml/integration_tests.py | 23 +++++++++++++++++++ sdks/python/apache_beam/yaml/standard_io.yaml | 19 +++++++++++++++ sdks/standard_external_transforms.yaml | 18 +++++++++++++++ 4 files changed, 61 insertions(+) diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle index f7b241a75944..08caa5357640 100644 --- a/sdks/java/io/expansion-service/build.gradle +++ b/sdks/java/io/expansion-service/build.gradle @@ -97,6 +97,7 @@ dependencies { runtimeOnly project(":sdks:java:io:datadog") runtimeOnly project(":sdks:java:io:mongodb") + runtimeOnly project(":sdks:java:io:delta") runtimeOnly library.java.kafka_clients runtimeOnly library.java.slf4j_jdk14 diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index e319a3d3a9bd..b38be32eee05 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -20,6 +20,7 @@ import contextlib import copy import glob +import tempfile import itertools import json import logging @@ -70,6 +71,8 @@ def get_impl(self): import psycopg2 import pytds import sqlalchemy +import pyarrow as pa +import pyarrow.parquet as pq import yaml from apitools.base.py.exceptions import HttpError from google.cloud import pubsub_v1 @@ -618,6 +621,26 @@ def temp_pubsub_emulator(project_id="apache-beam-testing"): yield created_topic_object.name +@contextlib.contextmanager +def temp_delta_table(): + with tempfile.TemporaryDirectory() as temp_dir: + log_dir = os.path.join(temp_dir, "_delta_log") + os.makedirs(log_dir, exist_ok=True) + table_data = pa.table({"name": ["a", "b", "c"]}) + parquet_path = os.path.join(temp_dir, "part-00000.parquet") + pq.write_table(table_data, parquet_path) + file_size = os.path.getsize(parquet_path) + commit_content = ( + '{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}\n' + '{"metaData":{"id":"test-id","format":{"provider":"parquet","options":{}},"schemaString":"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"name\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\",\"partitionColumns\":[],\"configuration\":{},\"createdAt\":123456789}}\n' + f'{{"add":{{"path":"part-00000.parquet","partitionValues":{{}},"size":{file_size},"modificationTime":123456789,"dataChange":true}}}}\n' + ) + commit_file = os.path.join(log_dir, "00000000000000000000.json") + with open(commit_file, "w") as f: + f.write(commit_content) + yield temp_dir + + def replace_recursive(spec, vars): """Recursively replaces string placeholders in a spec with values from vars. diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index 520c466b600b..f9b3510f9138 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -470,3 +470,22 @@ 'WriteToMongoDB': 'beam:schematransform:org.apache.beam:mongodb_write:v1' config: gradle_target: 'sdks:java:io:expansion-service:shadowJar' + +#Delta Lake +- type: renaming + transforms: + 'ReadFromDelta': 'ReadFromDelta' + config: + mappings: + 'ReadFromDelta': + table_path: 'table_path' + version: 'version' + timestamp: 'timestamp' + hadoop_config: 'hadoop_config' + underlying_provider: + type: beamJar + transforms: + 'ReadFromDelta': 'beam:schematransform:org.apache.beam:delta_read:v1' + config: + gradle_target: 'sdks:java:io:expansion-service:shadowJar' + diff --git a/sdks/standard_external_transforms.yaml b/sdks/standard_external_transforms.yaml index b9802f11b6cf..f08dbb325a9c 100644 --- a/sdks/standard_external_transforms.yaml +++ b/sdks/standard_external_transforms.yaml @@ -56,6 +56,24 @@ type: str identifier: beam:schematransform:org.apache.beam:datadog_write:v1 name: DatadogWrite + - description: Hadoop configuration properties. + name: hadoop_config + nullable: true + type: map[str, str] + - description: Path of the Delta Lake table. + name: table_path + nullable: false + type: str + - description: Timestamp of the Delta Lake table to read. + name: timestamp + nullable: true + type: str + - description: Version of the Delta Lake table to read. + name: version + nullable: true + type: int64 + identifier: beam:schematransform:org.apache.beam:delta_read:v1 + name: DeltaRead - default_service: sdks:java:io:expansion-service:shadowJar description: 'Outputs a PCollection of Beam Rows, each containing a single INT64 number called "value". The count is produced from the given "start" value and From 5908e88900fb77a195ae6a770ce0266220aab0e2 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 4 Jun 2026 15:03:52 +0000 Subject: [PATCH 2/7] add transformprovider and yaml file --- .../DeltaReadSchemaTransformProviderTest.java | 128 ++++++++++++++++++ sdks/python/apache_beam/yaml/tests/delta.yaml | 34 +++++ 2 files changed, 162 insertions(+) create mode 100644 sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProviderTest.java create mode 100644 sdks/python/apache_beam/yaml/tests/delta.yaml diff --git a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProviderTest.java b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProviderTest.java new file mode 100644 index 000000000000..6a513cb33edf --- /dev/null +++ b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProviderTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.delta; + +import static org.apache.beam.sdk.io.delta.DeltaReadSchemaTransformProvider.Configuration; +import static org.apache.beam.sdk.io.delta.DeltaReadSchemaTransformProvider.OUTPUT_TAG; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; +import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.parquet.ParquetIO; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link DeltaReadSchemaTransformProvider}. */ +@RunWith(JUnit4.class) +public class DeltaReadSchemaTransformProviderTest { + + @Rule public TestPipeline writePipeline = TestPipeline.create(); + @Rule public TestPipeline readPipeline = TestPipeline.create(); + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testBuildTransformWithRow() { + java.util.Map hadoopConfig = new java.util.HashMap<>(); + hadoopConfig.put("fs.gs.project.id", "test-project"); + + Row config = + Row.withSchema(new DeltaReadSchemaTransformProvider().configurationSchema()) + .withFieldValue("table_path", "/path/to/table") + .withFieldValue("version", 5L) + .withFieldValue("timestamp", "2026-06-04T12:00:00Z") + .withFieldValue("hadoop_config", hadoopConfig) + .build(); + + new DeltaReadSchemaTransformProvider().from(config); + } + + @Test + public void testSimpleScan() throws Exception { + File tableDir = tempFolder.newFolder("delta-table-simple"); + + // 1. Write a Parquet file using Beam + Schema schema = Schema.builder().addField("name", Schema.FieldType.STRING).build(); + Row row = Row.withSchema(schema).addValues("test-name").build(); + + org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(schema); + GenericRecord record = AvroUtils.toGenericRecord(row, avroSchema); + + writePipeline + .apply("Create Input", Create.of(record).withCoder(AvroCoder.of(avroSchema))) + .apply( + "Write Parquet", + FileIO.write() + .via(ParquetIO.sink(avroSchema)) + .to(tableDir.getAbsolutePath() + "/") + .withNaming( + (BoundedWindow window, + PaneInfo paneInfo, + int numShards, + int shardIndex, + Compression compression) -> "part-00000.parquet")); + + writePipeline.run().waitUntilFinish(); + + File parquetFile = new File(tableDir, "part-00000.parquet"); + byte[] fileBytes = Files.readAllBytes(parquetFile.toPath()); + + // 2. Create the Delta log + File logDir = new File(tableDir, "_delta_log"); + logDir.mkdirs(); + File commitFile = new File(logDir, "00000000000000000000.json"); + + String commitContent = + "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}\n" + + "{\"metaData\":{\"id\":\"test-id\",\"format\":{\"provider\":\"parquet\",\"options\":{}},\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"name\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\",\"partitionColumns\":[],\"configuration\":{},\"createdAt\":123456789}}\n" + + "{\"add\":{\"path\":\"part-00000.parquet\",\"partitionValues\":{},\"size\":" + + fileBytes.length + + ",\"modificationTime\":123456789,\"dataChange\":true}}"; + + Files.write(commitFile.toPath(), commitContent.getBytes(StandardCharsets.UTF_8)); + + // 3. Read it using DeltaReadSchemaTransformProvider + Configuration readConfig = + Configuration.builder().setTablePath(tableDir.getAbsolutePath()).build(); + + PCollection output = + PCollectionRowTuple.empty(readPipeline) + .apply(new DeltaReadSchemaTransformProvider().from(readConfig)) + .get(OUTPUT_TAG); + + PAssert.that(output).containsInAnyOrder(row); + + readPipeline.run().waitUntilFinish(); + } +} diff --git a/sdks/python/apache_beam/yaml/tests/delta.yaml b/sdks/python/apache_beam/yaml/tests/delta.yaml new file mode 100644 index 000000000000..a38e0750d633 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/delta.yaml @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +fixtures: + - name: DELTA_TABLE + type: "apache_beam.yaml.integration_tests.temp_delta_table" + +pipelines: + - pipeline: + type: chain + transforms: + - type: ReadFromDelta + config: + table_path: "{DELTA_TABLE}" + - type: AssertEqual + config: + elements: + - {name: "a"} + - {name: "b"} + - {name: "c"} From d545063aeef67a93729d2b81e009931716d14467 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 4 Jun 2026 15:32:20 +0000 Subject: [PATCH 3/7] add managed --- .../pipeline/v1/external_transforms.proto | 5 ++++ .../python/apache_beam/transforms/external.py | 1 + sdks/python/apache_beam/transforms/managed.py | 4 +++- sdks/python/apache_beam/yaml/standard_io.yaml | 19 +-------------- sdks/python/apache_beam/yaml/yaml_io.py | 23 +++++++++++++++++++ sdks/standard_expansion_services.yaml | 1 + 6 files changed, 34 insertions(+), 19 deletions(-) diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto index c73986eed48b..f29f2a34920f 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto @@ -88,8 +88,13 @@ message ManagedTransforms { "beam:schematransform:org.apache.beam:sql_server_read:v1"]; SQL_SERVER_WRITE = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:schematransform:org.apache.beam:sql_server_write:v1"]; +<<<<<<< HEAD DELTA_LAKE_READ = 13 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:schematransform:org.apache.beam:delta_lake_read:v1"]; +======= + DELTA_READ = 13 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:delta_read:v1"]; +>>>>>>> 7bce23329c5 (add managed) } } diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index 9469ac717dfc..5bd22c44d485 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -86,6 +86,7 @@ ManagedTransforms.Urns.MYSQL_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, ManagedTransforms.Urns.SQL_SERVER_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long ManagedTransforms.Urns.SQL_SERVER_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long + ManagedTransforms.Urns.DELTA_READ.urn: _IO_EXPANSION_SERVICE_JAR_TARGET, } diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py index 3f1342229ae8..13bdf986ced9 100644 --- a/sdks/python/apache_beam/transforms/managed.py +++ b/sdks/python/apache_beam/transforms/managed.py @@ -88,8 +88,9 @@ POSTGRES = "postgres" MYSQL = "mysql" SQL_SERVER = "sqlserver" +DELTA = "delta" -__all__ = ["ICEBERG", "KAFKA", "BIGQUERY", "Read", "Write"] +__all__ = ["ICEBERG", "KAFKA", "BIGQUERY", "DELTA", "Read", "Write"] class Read(PTransform): @@ -102,6 +103,7 @@ class Read(PTransform): POSTGRES: ManagedTransforms.Urns.POSTGRES_READ.urn, MYSQL: ManagedTransforms.Urns.MYSQL_READ.urn, SQL_SERVER: ManagedTransforms.Urns.SQL_SERVER_READ.urn, + DELTA: ManagedTransforms.Urns.DELTA_READ.urn, } def __init__( diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index f9b3510f9138..a543c6aa5df2 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -116,6 +116,7 @@ 'ReadFromTFRecord': 'apache_beam.yaml.yaml_io.read_from_tfrecord' 'WriteToTFRecord': 'apache_beam.yaml.yaml_io.write_to_tfrecord' 'WriteToMongoDB': 'apache_beam.yaml.yaml_io.write_to_mongodb' + 'ReadFromDelta': 'apache_beam.yaml.yaml_io.read_from_delta' # General File Formats @@ -471,21 +472,3 @@ config: gradle_target: 'sdks:java:io:expansion-service:shadowJar' -#Delta Lake -- type: renaming - transforms: - 'ReadFromDelta': 'ReadFromDelta' - config: - mappings: - 'ReadFromDelta': - table_path: 'table_path' - version: 'version' - timestamp: 'timestamp' - hadoop_config: 'hadoop_config' - underlying_provider: - type: beamJar - transforms: - 'ReadFromDelta': 'beam:schematransform:org.apache.beam:delta_read:v1' - config: - gradle_target: 'sdks:java:io:expansion-service:shadowJar' - diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index 77cbc41def32..600cc6800512 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -565,6 +565,29 @@ def read_from_iceberg( config_properties=config_properties)) +def read_from_delta( + table_path: str, + version: Optional[int] = None, + timestamp: Optional[str] = None, + hadoop_config: Optional[Mapping[str, str]] = None, +): + """Reads a Delta Lake table. + + Args: + table_path: Path of the Delta Lake table. + version: Version of the Delta Lake table to read. + timestamp: Timestamp of the Delta Lake table to read. + hadoop_config: Hadoop configuration properties. + """ + return beam.managed.Read( + "delta", + config=dict( + table_path=table_path, + version=version, + timestamp=timestamp, + hadoop_config=hadoop_config)) + + def write_to_iceberg( table: str, catalog_name: Optional[str] = None, diff --git a/sdks/standard_expansion_services.yaml b/sdks/standard_expansion_services.yaml index 79c7e06280df..4b04b59bdb03 100644 --- a/sdks/standard_expansion_services.yaml +++ b/sdks/standard_expansion_services.yaml @@ -52,6 +52,7 @@ - 'beam:schematransform:org.apache.beam:iceberg_write:v1' - 'beam:schematransform:org.apache.beam:iceberg_read:v1' - 'beam:schematransform:org.apache.beam:iceberg_cdc_read:v1' + - 'beam:schematransform:org.apache.beam:delta_read:v1' - gradle_target: 'sdks:java:io:messaging-expansion-service:shadowJar' destinations: From 0eb47c7716424f226a6a75c4b742dc5f2c157f82 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 10 Jun 2026 14:28:03 +0000 Subject: [PATCH 4/7] fix lint --- sdks/python/apache_beam/yaml/integration_tests.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index b38be32eee05..ce67b0122992 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -20,7 +20,6 @@ import contextlib import copy import glob -import tempfile import itertools import json import logging @@ -69,10 +68,10 @@ def get_impl(self): None, lambda payload, components, context: BigEndianIntegerCoder()) import psycopg2 -import pytds -import sqlalchemy import pyarrow as pa import pyarrow.parquet as pq +import pytds +import sqlalchemy import yaml from apitools.base.py.exceptions import HttpError from google.cloud import pubsub_v1 From 19b86e5a69366bb5b03321ba3c95b2885329bb11 Mon Sep 17 00:00:00 2001 From: Chamikara Jayalath Date: Wed, 10 Jun 2026 13:08:22 -0700 Subject: [PATCH 5/7] Add Delta Lake source to the Java Managed API --- .../apache/beam/model/pipeline/v1/external_transforms.proto | 5 ----- 1 file changed, 5 deletions(-) diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto index f29f2a34920f..c73986eed48b 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto @@ -88,13 +88,8 @@ message ManagedTransforms { "beam:schematransform:org.apache.beam:sql_server_read:v1"]; SQL_SERVER_WRITE = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:schematransform:org.apache.beam:sql_server_write:v1"]; -<<<<<<< HEAD DELTA_LAKE_READ = 13 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:schematransform:org.apache.beam:delta_lake_read:v1"]; -======= - DELTA_READ = 13 [(org.apache.beam.model.pipeline.v1.beam_urn) = - "beam:schematransform:org.apache.beam:delta_read:v1"]; ->>>>>>> 7bce23329c5 (add managed) } } From ec43328595170da7b29a4827a23da79efa9d28d5 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 11 Jun 2026 14:56:27 +0000 Subject: [PATCH 6/7] match Cham's PR changes --- .../sdk/expansion/service/WindowIntoTransformProvider.java | 1 + .../sdk/io/delta/DeltaReadSchemaTransformProviderTest.java | 5 ++--- sdks/python/apache_beam/transforms/external.py | 2 +- sdks/python/apache_beam/transforms/managed.py | 2 +- sdks/python/apache_beam/yaml/tests/delta.yaml | 2 +- sdks/python/apache_beam/yaml/yaml_io.py | 6 +++--- sdks/standard_expansion_services.yaml | 2 +- 7 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java index d060d5916e9f..ca97ed287a0a 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java @@ -88,6 +88,7 @@ public List outputCollectionNames() { @DefaultSchema(AutoValueSchema.class) @AutoValue + @SuppressWarnings("mutable") public abstract static class Configuration { @SuppressWarnings({"AutoValueMutable", "mutable"}) diff --git a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProviderTest.java b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProviderTest.java index 6a513cb33edf..77aef7bce494 100644 --- a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProviderTest.java +++ b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProviderTest.java @@ -59,7 +59,7 @@ public void testBuildTransformWithRow() { Row config = Row.withSchema(new DeltaReadSchemaTransformProvider().configurationSchema()) - .withFieldValue("table_path", "/path/to/table") + .withFieldValue("table", "/path/to/table") .withFieldValue("version", 5L) .withFieldValue("timestamp", "2026-06-04T12:00:00Z") .withFieldValue("hadoop_config", hadoopConfig) @@ -113,8 +113,7 @@ public void testSimpleScan() throws Exception { Files.write(commitFile.toPath(), commitContent.getBytes(StandardCharsets.UTF_8)); // 3. Read it using DeltaReadSchemaTransformProvider - Configuration readConfig = - Configuration.builder().setTablePath(tableDir.getAbsolutePath()).build(); + Configuration readConfig = Configuration.builder().setTable(tableDir.getAbsolutePath()).build(); PCollection output = PCollectionRowTuple.empty(readPipeline) diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index 5bd22c44d485..90de7aed24a6 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -86,7 +86,7 @@ ManagedTransforms.Urns.MYSQL_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, ManagedTransforms.Urns.SQL_SERVER_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long ManagedTransforms.Urns.SQL_SERVER_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long - ManagedTransforms.Urns.DELTA_READ.urn: _IO_EXPANSION_SERVICE_JAR_TARGET, + ManagedTransforms.Urns.DELTA_LAKE_READ.urn: _IO_EXPANSION_SERVICE_JAR_TARGET, } diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py index 13bdf986ced9..ba4cb38a011e 100644 --- a/sdks/python/apache_beam/transforms/managed.py +++ b/sdks/python/apache_beam/transforms/managed.py @@ -103,7 +103,7 @@ class Read(PTransform): POSTGRES: ManagedTransforms.Urns.POSTGRES_READ.urn, MYSQL: ManagedTransforms.Urns.MYSQL_READ.urn, SQL_SERVER: ManagedTransforms.Urns.SQL_SERVER_READ.urn, - DELTA: ManagedTransforms.Urns.DELTA_READ.urn, + DELTA: ManagedTransforms.Urns.DELTA_LAKE_READ.urn, } def __init__( diff --git a/sdks/python/apache_beam/yaml/tests/delta.yaml b/sdks/python/apache_beam/yaml/tests/delta.yaml index a38e0750d633..842c876759f0 100644 --- a/sdks/python/apache_beam/yaml/tests/delta.yaml +++ b/sdks/python/apache_beam/yaml/tests/delta.yaml @@ -25,7 +25,7 @@ pipelines: transforms: - type: ReadFromDelta config: - table_path: "{DELTA_TABLE}" + table: "{DELTA_TABLE}" - type: AssertEqual config: elements: diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index 600cc6800512..b7a43641161f 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -566,7 +566,7 @@ def read_from_iceberg( def read_from_delta( - table_path: str, + table: str, version: Optional[int] = None, timestamp: Optional[str] = None, hadoop_config: Optional[Mapping[str, str]] = None, @@ -574,7 +574,7 @@ def read_from_delta( """Reads a Delta Lake table. Args: - table_path: Path of the Delta Lake table. + table: Identifier of the Delta Lake table. version: Version of the Delta Lake table to read. timestamp: Timestamp of the Delta Lake table to read. hadoop_config: Hadoop configuration properties. @@ -582,7 +582,7 @@ def read_from_delta( return beam.managed.Read( "delta", config=dict( - table_path=table_path, + table=table, version=version, timestamp=timestamp, hadoop_config=hadoop_config)) diff --git a/sdks/standard_expansion_services.yaml b/sdks/standard_expansion_services.yaml index 4b04b59bdb03..d0c7f125c44b 100644 --- a/sdks/standard_expansion_services.yaml +++ b/sdks/standard_expansion_services.yaml @@ -52,7 +52,7 @@ - 'beam:schematransform:org.apache.beam:iceberg_write:v1' - 'beam:schematransform:org.apache.beam:iceberg_read:v1' - 'beam:schematransform:org.apache.beam:iceberg_cdc_read:v1' - - 'beam:schematransform:org.apache.beam:delta_read:v1' + - 'beam:schematransform:org.apache.beam:delta_lake_read:v1' - gradle_target: 'sdks:java:io:messaging-expansion-service:shadowJar' destinations: From 06cd80527f038bf0d62f523597b79aec1700975d Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Sun, 21 Jun 2026 14:30:29 +0000 Subject: [PATCH 7/7] remove delta lake entry from external transforms --- .../service/WindowIntoTransformProvider.java | 1 - sdks/standard_external_transforms.yaml | 18 ------------------ 2 files changed, 19 deletions(-) diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java index ca97ed287a0a..d060d5916e9f 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java @@ -88,7 +88,6 @@ public List outputCollectionNames() { @DefaultSchema(AutoValueSchema.class) @AutoValue - @SuppressWarnings("mutable") public abstract static class Configuration { @SuppressWarnings({"AutoValueMutable", "mutable"}) diff --git a/sdks/standard_external_transforms.yaml b/sdks/standard_external_transforms.yaml index f08dbb325a9c..b9802f11b6cf 100644 --- a/sdks/standard_external_transforms.yaml +++ b/sdks/standard_external_transforms.yaml @@ -56,24 +56,6 @@ type: str identifier: beam:schematransform:org.apache.beam:datadog_write:v1 name: DatadogWrite - - description: Hadoop configuration properties. - name: hadoop_config - nullable: true - type: map[str, str] - - description: Path of the Delta Lake table. - name: table_path - nullable: false - type: str - - description: Timestamp of the Delta Lake table to read. - name: timestamp - nullable: true - type: str - - description: Version of the Delta Lake table to read. - name: version - nullable: true - type: int64 - identifier: beam:schematransform:org.apache.beam:delta_read:v1 - name: DeltaRead - default_service: sdks:java:io:expansion-service:shadowJar description: 'Outputs a PCollection of Beam Rows, each containing a single INT64 number called "value". The count is produced from the given "start" value and