-
Notifications
You must be signed in to change notification settings - Fork 29.3k
[SPARK-57715][SQL] PIVOT count() empty buckets should return 0 instead of NULL #56808
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,8 +22,10 @@ import org.apache.spark.sql.catalyst.expressions.{ | |
| Alias, | ||
| AliasHelper, | ||
| Attribute, | ||
| AttributeReference, | ||
| AttributeSet, | ||
| Cast, | ||
| Coalesce, | ||
| EmptyRow, | ||
| EqualNullSafe, | ||
| Expression, | ||
|
|
@@ -43,6 +45,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{ | |
| import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project} | ||
| import org.apache.spark.sql.catalyst.util.toPrettySQL | ||
| import org.apache.spark.sql.errors.QueryCompilationErrors | ||
| import org.apache.spark.sql.internal.SQLConf | ||
| import org.apache.spark.sql.types.StringType | ||
|
|
||
| /** | ||
|
|
@@ -138,12 +141,25 @@ object PivotTransformer extends AliasHelper with SQLConfHelper { | |
| firstAgg | ||
| ) | ||
| val pivotAggregatesAttributes = pivotAggregates.map(_.toAttribute) | ||
| // When the flag is disabled, supply no empty-input defaults: empty buckets stay NULL and no | ||
| // Coalesce is added, so the pivoted columns stay nullable. | ||
| val aggregateEmptyInputDefaults: Seq[Option[Expression]] = | ||
| if (conf.getConf(SQLConf.PIVOT_EMPTY_BUCKET_RETURNS_AGGREGATE_DEFAULT)) { | ||
| aggregates.map(aggregateEmptyInputDefault) | ||
| } else { | ||
| Seq.fill(aggregates.size)(None) | ||
| } | ||
| val pivotOutputs = pivotValues.zipWithIndex.flatMap { | ||
| case (value, i) => | ||
| aggregates.zip(pivotAggregatesAttributes).map { | ||
| case (aggregate, pivotAtt) => | ||
| aggregates.zip(pivotAggregatesAttributes).zip(aggregateEmptyInputDefaults).map { | ||
| case ((aggregate, pivotAtt), emptyInputDefault) => | ||
| val extractedValue = ExtractValue(pivotAtt, Literal(i), conf.resolver) | ||
| val withEmptyInputDefault = emptyInputDefault match { | ||
| case Some(default) => Coalesce(Seq(extractedValue, default)) | ||
| case None => extractedValue | ||
| } | ||
| newAlias( | ||
| ExtractValue(pivotAtt, Literal(i), conf.resolver), | ||
| withEmptyInputDefault, | ||
| Some(outputName(value, aggregate, isSingleAggregate = aggregates.size == 1)) | ||
| ) | ||
| } | ||
|
|
@@ -185,6 +201,42 @@ object PivotTransformer extends AliasHelper with SQLConfHelper { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Empty-input default for a pivot aggregate to coalesce into its extracted value, or `None` to | ||
| * leave the value unchanged. The fast path's [[PivotFirst]] leaves an unmatched pivot category's | ||
| * slot unset, so the caller wraps the result in a [[Coalesce]] to recover the value the slow path | ||
| * produces on an empty bucket (`count` -> 0; `sum`/`avg`/`min`/`max` -> NULL). | ||
| * | ||
| * Returned unevaluated so later constant folding defers a default that throws under ANSI (e.g. | ||
| * `count(v1) / count(v2)` -> `0 / 0`) to runtime, where [[Coalesce]] only evaluates it for | ||
| * actually-empty buckets -- matching the slow path. Mirrors | ||
| * `RewriteCorrelatedScalarSubquery.evalAggExprOnZeroTups`. | ||
| */ | ||
| private def aggregateEmptyInputDefault(aggregate: Expression): Option[Expression] = { | ||
| trimAliases(aggregate) match { | ||
| // Bare aggregate: use its published default result (count -> 0, sum/avg/min/max -> None). | ||
| case AggregateExpression(aggregateFunction, _, _, _, _) => | ||
| aggregateFunction.defaultResult | ||
| // Composite over aggregate(s): substitute each aggregate/attribute with its empty-input | ||
| // value. Return None for a non-foldable default or a literal NULL (nothing to coalesce in). A | ||
| // default that only folds to NULL (e.g. sum(x) + 1) is still returned; its Coalesce evaluates | ||
| // to NULL on an empty bucket, which is the correct result. | ||
| case other => | ||
| val default = other.transform { | ||
| case AggregateExpression(aggregateFunction, _, _, _, _) => | ||
| aggregateFunction.defaultResult.getOrElse( | ||
| Literal.create(null, aggregateFunction.dataType)) | ||
| case attribute: AttributeReference => | ||
| Literal.create(null, attribute.dataType) | ||
| } | ||
| default match { | ||
| case _ if !trimAliases(default).foldable => None | ||
| case Literal(null, _) => None | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: this guard only catches a syntactic |
||
| case _ => Some(default) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private def outputName( | ||
| value: Expression, | ||
| aggregate: Expression, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| Project [id#0L, __pivot_count(b) AS `count(b)`#0[0] AS 1#0L, __pivot_count(b) AS `count(b)`#0[1] AS 2#0L, __pivot_count(b) AS `count(b)`#0[2] AS 3#0L] | ||
| Project [id#0L, coalesce(__pivot_count(b) AS `count(b)`#0[0], 0) AS 1#0L, coalesce(__pivot_count(b) AS `count(b)`#0[1], 0) AS 2#0L, coalesce(__pivot_count(b) AS `count(b)`#0[2], 0) AS 3#0L] | ||
| +- Aggregate [id#0L], [id#0L, pivotfirst(a#0, count(b)#0L, 1, 2, 3, 0, 0) AS __pivot_count(b) AS `count(b)`#0] | ||
| +- Aggregate [id#0L, a#0], [id#0L, a#0, count(b#0) AS count(b)#0L] | ||
| +- LocalRelation <empty>, [id#0L, a#0, b#0] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The composite branch can return a non-null
defaultfor an aggregate that is NULL-capable on a non-empty bucket, and the caller then wraps it inCoalesce(extractedValue, default). The problem is thatPivotFirst.updatestores a value onlyif (value != null)and keeps no presence bit, so a buffer slot is NULL iff the pivot category is absent or present-but-the-value-is-NULL —ExtractValuecan't distinguish them, and theCoalescedefaults both. That's correct for bare count-family aggregates (never NULL on a non-empty bucket), but wrong for composites over a NULL-capable sub-aggregate:PIVOT(nullif(count(v), 5) ...)— a bucket with exactly 5 matching non-null rows: slow path =nullif(5,5)= NULL, fast path =Coalesce(NULL, nullif(0,5)=0)=0.PIVOT(if(count(v1) > 0, sum(v2), 0) ...)— a non-empty bucket with all-NULLv2: slow path = NULL, fast path =0.Since the flag defaults to
true, these are live wrong results, and the pivoted column is also declared non-nullable while it can be NULL. The simplest safe fix is to apply the empty-default only for bare aggregates whosedefaultResultis non-null (count-family, where a NULL slot provably means empty) and leave NULL-capable composites to NULL / the slow path; a fuller fix would add a per-slot presence signal toPivotFirst.