Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -739,8 +739,8 @@ resource
dmlStatementNoWith
: insertInto (query | LEFT_PAREN query RIGHT_PAREN queryAlias=tableAlias) #singleInsertQuery
| fromClause multiInsertQueryBody+ #multiInsertQuery
| DELETE FROM identifierReference tableAlias whereClause? #deleteFromTable
| UPDATE identifierReference tableAlias setClause whereClause? #updateTable
| DELETE FROM identifierReference optionsClause? tableAlias whereClause? #deleteFromTable
| UPDATE identifierReference optionsClause? tableAlias setClause whereClause? #updateTable
| MERGE (WITH SCHEMA EVOLUTION)? INTO target=identifierReference targetAlias=tableAlias

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this PR makes INSERT/DELETE/UPDATE accept WITH(...), but how about MERGE?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intentionally did not add MERGE because its semantics are a bit different: MERGE statements have both source and target relations, and I'm still thinking about what options would look like there. I can file a separate JIRA and PR for MERGE later.

USING (source=identifierReference |
LEFT_PAREN sourceQuery=query RIGHT_PAREN) sourceAlias=tableAlias
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import org.apache.spark.sql.connector.catalog.{SupportsDeleteV2, SupportsRowLeve
import org.apache.spark.sql.connector.write.{RowLevelOperationTable, SupportsDelta}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* A rule that rewrites DELETE operations using plans that operate on individual or groups of rows.
Expand All @@ -45,7 +44,7 @@ object RewriteDeleteFromTable extends RewriteRowLevelCommand {
d

case r @ ExtractV2Table(t: SupportsRowLevelOperations) =>

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please consider this concern: options are silently dropped on the non-rewrite delete paths. For a table implementing both SupportsRowLevelOperations and SupportsDeleteV2 (e.g. Iceberg), a DELETE … WITH(...) WHERE can take the metadata-only/deleteWhere path (OptimizeMetadataOnlyDeleteFromTable / DataSourceV2Strategy), which has no options parameter, so the user's options are silently ignored. The same applies to the TruncatableTable truncate path. This would be a new user-visible "silently ignored" surprise, and no test currently covers this path.

@anuragmantri anuragmantri Jun 27, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch. I looked at those paths, and the options are indeed not passed all the way through. Fixing this requires DSv2 API changes, something like the following default methods:

  // SupportsDeleteV2
  default void deleteWhere(Predicate[] predicates, CaseInsensitiveStringMap options) {
    deleteWhere(predicates);   // default ignores options → fully back-compatible
  }
  // TruncatableTable
  default boolean truncateTable(CaseInsensitiveStringMap options) {
    return truncateTable();
  }

I can make this change in this same PR, or do it in a follow-up if DSv2 API changes need separate PRs. Let me know what you think is the best way forward.

val table = buildOperationTable(t, DELETE, CaseInsensitiveStringMap.empty())
val table = buildOperationTable(t, DELETE, r.options)
table.operation match {
case _: SupportsDelta =>
buildWriteDeltaPlan(r, table, cond)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import org.apache.spark.sql.connector.write.{RowLevelOperationTable, SupportsDel
import org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table}
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* A rule that rewrites UPDATE operations using plans that operate on individual or groups of rows.
Expand All @@ -41,7 +40,7 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {

EliminateSubqueryAliases(aliasedTable) match {
case r @ ExtractV2Table(tbl: SupportsRowLevelOperations) =>
val table = buildOperationTable(tbl, UPDATE, CaseInsensitiveStringMap.empty())
val table = buildOperationTable(tbl, UPDATE, r.options)
val updateCond = cond.getOrElse(TrueLiteral)
table.operation match {
case _: SupportsDelta =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1224,7 +1224,8 @@ class AstBuilder extends DataTypeAstBuilder
override def visitDeleteFromTable(
ctx: DeleteFromTableContext): LogicalPlan = withOrigin(ctx) {
val table = createUnresolvedRelation(
ctx.identifierReference, writePrivileges = Set(TableWritePrivilege.DELETE))
ctx.identifierReference, Option(ctx.optionsClause()),
writePrivileges = Set(TableWritePrivilege.DELETE))
val tableAlias = getTableAliasWithoutColumnAlias(ctx.tableAlias(), "DELETE")
val aliasedTable = tableAlias.map(SubqueryAlias(_, table)).getOrElse(table)
val predicate = if (ctx.whereClause() != null) {
Expand All @@ -1237,7 +1238,8 @@ class AstBuilder extends DataTypeAstBuilder

override def visitUpdateTable(ctx: UpdateTableContext): LogicalPlan = withOrigin(ctx) {
val table = createUnresolvedRelation(
ctx.identifierReference, writePrivileges = Set(TableWritePrivilege.UPDATE))
ctx.identifierReference, Option(ctx.optionsClause()),
writePrivileges = Set(TableWritePrivilege.UPDATE))
val tableAlias = getTableAliasWithoutColumnAlias(ctx.tableAlias(), "UPDATE")
val aliasedTable = tableAlias.map(SubqueryAlias(_, table)).getOrElse(table)
val assignments = withAssignments(ctx.setClause().assignmentList())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransfo
import org.apache.spark.sql.connector.expressions.LogicalExpressions.bucket
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, Decimal, IntegerType, LongType, StringType, StructType, TimestampLTZNanosType, TimestampNTZNanosType, TimestampType, TimeType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.StorageLevelMapper
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

Expand Down Expand Up @@ -2211,6 +2212,32 @@ class DDLParserSuite extends AnalysisTest {
stop = 56))
}

test("delete from table: with options") {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make sure to either update the SQL reference docs, or file a followup Jira ticket to do so later.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For sure. I took a look, and it looks like the INSERT docs have not been updated either. I will create a follow-up JIRA to update the docs for all DML statements that accept options at once.

parseCompare("DELETE FROM testcat.ns1.ns2.tbl WITH (`split-size` = 5) WHERE a = 2",
DeleteFromTable(
UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl"),
new CaseInsensitiveStringMap(
java.util.Map.of("split-size", "5"))),
EqualTo(UnresolvedAttribute("a"), Literal(2))))
}

test("delete from table: with options and alias") {
  // The WITH (...) options belong to the relation itself; the alias wraps that relation.
  val relationOptions = new CaseInsensitiveStringMap(java.util.Map.of("k", "v"))
  val target = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl"), relationOptions)
  val expectedPlan = DeleteFromTable(
    SubqueryAlias("t", target),
    EqualTo(UnresolvedAttribute("t.a"), Literal(2)))

  parseCompare(
    "DELETE FROM testcat.ns1.ns2.tbl WITH (`k` = 'v') AS t WHERE t.a = 2",
    expectedPlan)
}

test("delete from table: options without values are not allowed") {
  // An option key that is listed without `= value` has to be rejected by the parser.
  val statement = "DELETE FROM testcat.ns1.ns2.tbl WITH (`split-size`)"
  val error = intercept[ParseException](parsePlan(statement))
  val message = error.getMessage
  assert(message.contains("Values must be specified for key(s): [split-size]"))
}

test("update table: basic") {
parseCompare(
"""
Expand Down Expand Up @@ -2253,6 +2280,45 @@ class DDLParserSuite extends AnalysisTest {
stop = 70))
}

test("update table: with options") {
  val statement =
    """
      |UPDATE testcat.ns1.ns2.tbl WITH (`write.split-size` = 10)
      |SET a='Robert', b=32
      |""".stripMargin

  // The option value is written as the number 10 in SQL and expected as the string "10".
  val target = UnresolvedRelation(
    Seq("testcat", "ns1", "ns2", "tbl"),
    new CaseInsensitiveStringMap(java.util.Map.of("write.split-size", "10")))
  val assignments = Seq(
    Assignment(UnresolvedAttribute("a"), Literal("Robert")),
    Assignment(UnresolvedAttribute("b"), Literal(32)))

  // No WHERE clause in the statement, hence no condition in the expected plan.
  parseCompare(statement, UpdateTable(target, assignments, None))
}

test("update table: with options and alias") {
  val statement =
    """
      |UPDATE testcat.ns1.ns2.tbl WITH (`k` = 'v') AS t
      |SET t.a='Robert', t.b=32
      |WHERE t.c=2
      |""".stripMargin

  // The options stay on the relation; the alias `t` wraps the relation that carries them.
  val relationOptions = new CaseInsensitiveStringMap(java.util.Map.of("k", "v"))
  val target = SubqueryAlias(
    "t",
    UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl"), relationOptions))
  val assignments = Seq(
    Assignment(UnresolvedAttribute("t.a"), Literal("Robert")),
    Assignment(UnresolvedAttribute("t.b"), Literal(32)))
  val condition = Some(EqualTo(UnresolvedAttribute("t.c"), Literal(2)))

  parseCompare(statement, UpdateTable(target, assignments, condition))
}

test("update table: options without values are not allowed") {
  // An option key that is listed without `= value` has to be rejected by the parser.
  val statement = "UPDATE testcat.ns1.ns2.tbl WITH (`split-size`) SET a = 1"
  val error = intercept[ParseException](parsePlan(statement))
  val message = error.getMessage
  assert(message.contains("Values must be specified for key(s): [split-size]"))
}

test("merge into table: basic") {
parseCompare(
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.ArrayImplicits._

/**
 * Test helper trait mixed into the in-memory row-level operations so tests can verify that
 * per-statement SQL options reach the operation via [[RowLevelOperationInfo#options]].
 *
 * Test code can downcast a row-level operation to this trait to read back the options the
 * operation was constructed with.
 */
trait RowLevelOperationWithOptions {
  /** The options handed to the operation when it was built. */
  def options: CaseInsensitiveStringMap
}

class InMemoryRowLevelOperationTable private (
name: String,
columns: Array[Column],
Expand Down Expand Up @@ -108,13 +116,14 @@ class InMemoryRowLevelOperationTable private (
override def newRowLevelOperationBuilder(
info: RowLevelOperationInfo): RowLevelOperationBuilder = {
if (properties.getOrDefault(SUPPORTS_DELTAS, "false") == "true") {
() => DeltaBasedOperation(info.command)
() => DeltaBasedOperation(info.command, info.options)
} else {
() => PartitionBasedOperation(info.command)
() => PartitionBasedOperation(info.command, info.options)
}
}

case class PartitionBasedOperation(command: Command) extends RowLevelOperation {
case class PartitionBasedOperation(command: Command, options: CaseInsensitiveStringMap)
extends RowLevelOperation with RowLevelOperationWithOptions {
var configuredScan: InMemoryBatchScan = _

override def requiredMetadataAttributes(): Array[NamedReference] = {
Expand Down Expand Up @@ -183,7 +192,8 @@ class InMemoryRowLevelOperationTable private (
}
}

case class DeltaBasedOperation(command: Command) extends RowLevelOperation with SupportsDelta {
case class DeltaBasedOperation(command: Command, options: CaseInsensitiveStringMap)
extends RowLevelOperation with SupportsDelta with RowLevelOperationWithOptions {
private final val PK_COLUMN_REF = FieldReference("pk")

override def requiredMetadataAttributes(): Array[NamedReference] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1051,4 +1051,21 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase {
fail("unexpected executed plan: " + other)
}
}

test("delete with dynamic options") {
  val initialRows =
    """{ "pk": 1, "id": 1, "dep": "hr" }
      |{ "pk": 2, "id": 2, "dep": "software" }
      |{ "pk": 3, "id": 3, "dep": "hr" }
      |""".stripMargin
  createAndInitTable("pk INT NOT NULL, id INT, dep STRING", initialRows)

  // The WITH options must reach the relation, the RowLevelOperationInfo, and the write builder.
  // Only the statement text is bound to a val: `sql(...)` itself has to stay inside the by-name
  // argument so the command runs while query executions are being captured.
  val deleteStatement =
    s"DELETE FROM $tableNameAsString WITH (`write.split-size` = 10) WHERE id IN (1, 100)"
  checkRowLevelOperationOptions(
    sql(deleteStatement),
    "write.split-size" -> "10")

  // Only the row with id = 1 matches the predicate; id = 100 does not exist.
  val remainingRows = Row(2, 2, "software") :: Row(3, 3, "hr") :: Nil
  checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), remainingRows)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expr
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReplaceData, WriteDelta}
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Delete, Identifier, InMemoryRowLevelOperationTable, InMemoryRowLevelOperationTableCatalog, Insert, MetadataColumn, Operation, Reinsert, Table, TableInfo, Txn, TxnTable, Update, Write}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Delete, Identifier, InMemoryRowLevelOperationTable, InMemoryRowLevelOperationTableCatalog, Insert, MetadataColumn, Operation, Reinsert, RowLevelOperationWithOptions, Table, TableInfo, Txn, TxnTable, Update, Write}
import org.apache.spark.sql.connector.expressions.LogicalExpressions.{identity, reference}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.write.RowLevelOperationTable
Expand Down Expand Up @@ -179,6 +179,28 @@ abstract class RowLevelOperationSuiteBase
}.getOrElse(fail("couldn't find row-level operation in optimized plan"))
}

// Runs a row-level command and asserts that the given SQL options reached every layer that
// should carry them: the rewritten DataSourceV2Relation, the RowLevelOperationInfo passed to
// the operation builder (the row-level scan/write planning hook), and the write builder's
// LogicalWriteInfo.
//
// `func` is by-name so the command executes while query executions are being captured; exactly
// one captured execution is expected. Unexpected plan shapes are reported through `fail` with a
// descriptive message instead of surfacing as a ClassCastException from an unchecked cast.
protected def checkRowLevelOperationOptions(
    func: => Unit,
    expectedOptions: (String, String)*): Unit = {
  val Seq(qe) = withQueryExecutionsCaptured(spark)(func)

  // Locate the relation that the rewritten row-level plan writes to.
  val writeRelation = qe.optimizedPlan.collectFirst {
    case rd: ReplaceData => rd.table
    case wd: WriteDelta => wd.table
  } match {
    case Some(relation: DataSourceV2Relation) => relation
    case Some(other) => fail(s"unexpected write relation in optimized plan: $other")
    case None => fail("couldn't find row-level operation in optimized plan")
  }

  // The operation must be one of the test operations that expose the options they received.
  val operation = writeRelation.table match {
    case operationTable: RowLevelOperationTable =>
      operationTable.operation match {
        case withOptions: RowLevelOperationWithOptions => withOptions
        case other => fail(s"row-level operation does not expose its options: $other")
      }
    case other => fail(s"write relation is not backed by a row-level operation table: $other")
  }

  expectedOptions.foreach { case (key, value) =>
    assert(writeRelation.options.get(key) === value, s"relation option '$key'")
    assert(operation.options.get(key) === value, s"row-level operation option '$key'")
    assert(table.lastWriteInfo.options().get(key) === value, s"write option '$key'")
  }
}

protected def assertNoScanPlanning(plan: LogicalPlan): Unit = {
val relations = plan.collect { case r: DataSourceV2Relation => r }
assert(relations.nonEmpty, "plan must contain relations")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1232,4 +1232,20 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase {
Row(1, 100, "hr"),
Row(2, 200, "software")))
}

test("update with dynamic options") {
  val initialRows =
    """{ "pk": 1, "salary": 100, "dep": "hr" }
      |{ "pk": 2, "salary": 200, "dep": "software" }
      |""".stripMargin
  createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", initialRows)

  // The WITH options must reach the relation, the RowLevelOperationInfo, and the write builder.
  // Only the statement text is bound to a val: `sql(...)` itself has to stay inside the by-name
  // argument so the command runs while query executions are being captured.
  val updateStatement =
    s"UPDATE $tableNameAsString WITH (`write.split-size` = 10) SET salary = -1 WHERE pk = 1"
  checkRowLevelOperationOptions(
    sql(updateStatement),
    "write.split-size" -> "10")

  // Only the row with pk = 1 is updated; the other row keeps its salary.
  val expectedRows = Row(1, -1, "hr") :: Row(2, 200, "software") :: Nil
  checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), expectedRows)
}
}