[SPARK-57681][SQL] Support dynamic table options for DELETE and UPDATE #56792
base: master
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,7 +25,6 @@ import org.apache.spark.sql.connector.catalog.{SupportsDeleteV2, SupportsRowLeve | |
| import org.apache.spark.sql.connector.write.{RowLevelOperationTable, SupportsDelta} | ||
| import org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE | ||
| import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table} | ||
| import org.apache.spark.sql.util.CaseInsensitiveStringMap | ||
|
|
||
| /** | ||
| * A rule that rewrites DELETE operations using plans that operate on individual or groups of rows. | ||
|
|
@@ -45,7 +44,7 @@ object RewriteDeleteFromTable extends RewriteRowLevelCommand { | |
| d | ||
|
|
||
| case r @ ExtractV2Table(t: SupportsRowLevelOperations) => | ||
|
Member
Please consider this concern: silent option-drop on non-rewrite delete paths. For a table implementing both SupportsRowLevelOperations and SupportsDeleteV2 (e.g. Iceberg), a DELETE … WITH(...) WHERE can take the metadata-only/deleteWhere path (OptimizeMetadataOnlyDeleteFromTable / DataSourceV2Strategy), which has no options parameter, so the user's options are silently ignored. The same applies to the TruncatableTable truncate path. This would be a new user-visible "silently ignored" surprise, and no test currently covers this path.
Author
Great catch. I looked at those paths, and the options are indeed not passed all the way through. Fixing this requires DSv2 API changes, something like the following default methods:
// SupportsDeleteV2
default void deleteWhere(Predicate[] predicates, CaseInsensitiveStringMap options) {
  deleteWhere(predicates); // default ignores options → fully back-compatible
}
// TruncatableTable
default boolean truncateTable(CaseInsensitiveStringMap options) {
  return truncateTable();
}
I can make this change in this PR, or in a follow-up if DSv2 changes need separate PRs. Let me know which you think is the best way forward. |
||
| - val table = buildOperationTable(t, DELETE, CaseInsensitiveStringMap.empty()) | ||
| + val table = buildOperationTable(t, DELETE, r.options) | ||
| table.operation match { | ||
| case _: SupportsDelta => | ||
| buildWriteDeltaPlan(r, table, cond) | ||
|
|
||
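The default-method overloads proposed in the review thread above can be sketched in isolation. The Java below is a minimal, self-contained illustration, not Spark's API: `DeleteSupport`, `LegacyTable`, and `OptionAwareTable` are hypothetical stand-ins, and `String[]` and `Map<String, String>` replace `Predicate[]` and `CaseInsensitiveStringMap` so the example compiles without Spark on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for SupportsDeleteV2 (assumption, not the Spark interface).
interface DeleteSupport {
    void deleteWhere(String[] predicates);

    // New overload: the default drops the options, so connectors written
    // before it existed keep compiling and behaving as before.
    default void deleteWhere(String[] predicates, Map<String, String> options) {
        deleteWhere(predicates);
    }
}

// A connector written before the overload existed: options never reach it.
class LegacyTable implements DeleteSupport {
    final List<String> log = new ArrayList<>();

    @Override
    public void deleteWhere(String[] predicates) {
        log.add("delete " + String.join(" AND ", predicates));
    }
}

// A connector that opts in by overriding the new overload.
class OptionAwareTable implements DeleteSupport {
    final List<String> log = new ArrayList<>();

    @Override
    public void deleteWhere(String[] predicates) {
        deleteWhere(predicates, Map.of());
    }

    @Override
    public void deleteWhere(String[] predicates, Map<String, String> options) {
        log.add("delete " + String.join(" AND ", predicates) + " options=" + options);
    }
}

final class DeleteOptionsDemo {
    public static void main(String[] args) {
        String[] predicates = {"a = 2"};
        Map<String, String> options = Map.of("split-size", "5");

        LegacyTable legacy = new LegacyTable();
        legacy.deleteWhere(predicates, options); // options are dropped by the default
        System.out.println(legacy.log);

        OptionAwareTable aware = new OptionAwareTable();
        aware.deleteWhere(predicates, options);  // options reach the connector
        System.out.println(aware.log);
    }
}
```

Under this pattern a connector that does not override the new overload still drops the options, so the default preserves source compatibility but does not by itself remove the silent-ignore behavior raised in the review.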
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,6 +31,7 @@ import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransfo | |
| import org.apache.spark.sql.connector.expressions.LogicalExpressions.bucket | ||
| import org.apache.spark.sql.internal.SQLConf | ||
| import org.apache.spark.sql.types.{DataType, Decimal, IntegerType, LongType, StringType, StructType, TimestampLTZNanosType, TimestampNTZNanosType, TimestampType, TimeType} | ||
| import org.apache.spark.sql.util.CaseInsensitiveStringMap | ||
| import org.apache.spark.storage.StorageLevelMapper | ||
| import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} | ||
|
|
||
|
|
@@ -2211,6 +2212,32 @@ class DDLParserSuite extends AnalysisTest { | |
| stop = 56)) | ||
| } | ||
|
|
||
| test("delete from table: with options") { | ||
|
Member
Please make sure to either update the SQL reference docs, or file a follow-up Jira ticket to do so later.
Author
For sure. I took a look, and it looks like the INSERT docs have not been updated either. I will create a follow-up JIRA to update the docs for all DML statements that accept options at once. |
||
| parseCompare("DELETE FROM testcat.ns1.ns2.tbl WITH (`split-size` = 5) WHERE a = 2", | ||
| DeleteFromTable( | ||
| UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl"), | ||
| new CaseInsensitiveStringMap( | ||
| java.util.Map.of("split-size", "5"))), | ||
| EqualTo(UnresolvedAttribute("a"), Literal(2)))) | ||
| } | ||
|
|
||
| test("delete from table: with options and alias") { | ||
| parseCompare("DELETE FROM testcat.ns1.ns2.tbl WITH (`k` = 'v') AS t WHERE t.a = 2", | ||
| DeleteFromTable( | ||
| SubqueryAlias("t", | ||
| UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl"), | ||
| new CaseInsensitiveStringMap( | ||
| java.util.Map.of("k", "v")))), | ||
| EqualTo(UnresolvedAttribute("t.a"), Literal(2)))) | ||
| } | ||
|
|
||
| test("delete from table: options without values are not allowed") { | ||
| val e = intercept[ParseException] { | ||
| parsePlan("DELETE FROM testcat.ns1.ns2.tbl WITH (`split-size`)") | ||
| } | ||
| assert(e.getMessage.contains("Values must be specified for key(s): [split-size]")) | ||
| } | ||
|
|
||
| test("update table: basic") { | ||
| parseCompare( | ||
| """ | ||
|
|
@@ -2253,6 +2280,45 @@ class DDLParserSuite extends AnalysisTest { | |
| stop = 70)) | ||
| } | ||
|
|
||
| test("update table: with options") { | ||
| parseCompare( | ||
| """ | ||
| |UPDATE testcat.ns1.ns2.tbl WITH (`write.split-size` = 10) | ||
| |SET a='Robert', b=32 | ||
| """.stripMargin, | ||
| UpdateTable( | ||
| UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl"), | ||
| new CaseInsensitiveStringMap( | ||
| java.util.Map.of("write.split-size", "10"))), | ||
| Seq(Assignment(UnresolvedAttribute("a"), Literal("Robert")), | ||
| Assignment(UnresolvedAttribute("b"), Literal(32))), | ||
| None)) | ||
| } | ||
|
|
||
| test("update table: with options and alias") { | ||
| parseCompare( | ||
| """ | ||
| |UPDATE testcat.ns1.ns2.tbl WITH (`k` = 'v') AS t | ||
| |SET t.a='Robert', t.b=32 | ||
| |WHERE t.c=2 | ||
| """.stripMargin, | ||
| UpdateTable( | ||
| SubqueryAlias("t", | ||
| UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl"), | ||
| new CaseInsensitiveStringMap( | ||
| java.util.Map.of("k", "v")))), | ||
| Seq(Assignment(UnresolvedAttribute("t.a"), Literal("Robert")), | ||
| Assignment(UnresolvedAttribute("t.b"), Literal(32))), | ||
| Some(EqualTo(UnresolvedAttribute("t.c"), Literal(2))))) | ||
| } | ||
|
|
||
| test("update table: options without values are not allowed") { | ||
| val e = intercept[ParseException] { | ||
| parsePlan("UPDATE testcat.ns1.ns2.tbl WITH (`split-size`) SET a = 1") | ||
| } | ||
| assert(e.getMessage.contains("Values must be specified for key(s): [split-size]")) | ||
| } | ||
|
|
||
| test("merge into table: basic") { | ||
| parseCompare( | ||
| """ | ||
|
|
||
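The two negative tests above pin down the error message for a `WITH (...)` key that has no value. As a rough illustration of that kind of check, and not Spark's parser code, the sketch below uses a hypothetical `OptionChecks.toOptions` helper; modelling a missing value as `null` and throwing `IllegalArgumentException` (rather than `ParseException`) are assumptions made to keep it self-contained.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (assumption): not part of Spark.
final class OptionChecks {
    // Each entry is a {key, value} pair; a null value models WITH (`key`)
    // written without "= value".
    static Map<String, String> toOptions(String[][] entries) {
        List<String> missing = new ArrayList<>();
        Map<String, String> options = new LinkedHashMap<>();
        for (String[] entry : entries) {
            if (entry[1] == null) {
                missing.add(entry[0]);
            } else {
                options.put(entry[0], entry[1]);
            }
        }
        // Report every key without a value in a single error.
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException(
                "Values must be specified for key(s): [" + String.join(", ", missing) + "]");
        }
        return options;
    }
}
```

Collecting all offending keys before throwing lets one message list every key, which matches the bracketed list format the tests assert on.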
So this PR makes INSERT/DELETE/UPDATE accept WITH(...), but how about MERGE?
I intentionally did not add MERGE because the semantics are a bit different: MERGE statements have both source and target relations, and I'm still thinking about what options would look like there. I can file a separate JIRA and PR for MERGE later.