Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import org.apache.spark.sql.catalyst.expressions.{
Alias,
AliasHelper,
Attribute,
AttributeReference,
AttributeSet,
Cast,
Coalesce,
EmptyRow,
EqualNullSafe,
Expression,
Expand All @@ -43,6 +45,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.util.toPrettySQL
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StringType

/**
Expand Down Expand Up @@ -138,12 +141,25 @@ object PivotTransformer extends AliasHelper with SQLConfHelper {
firstAgg
)
val pivotAggregatesAttributes = pivotAggregates.map(_.toAttribute)
// When the flag is disabled, supply no empty-input defaults: empty buckets stay NULL and no
// Coalesce is added, so the pivoted columns stay nullable.
val aggregateEmptyInputDefaults: Seq[Option[Expression]] =
if (conf.getConf(SQLConf.PIVOT_EMPTY_BUCKET_RETURNS_AGGREGATE_DEFAULT)) {
aggregates.map(aggregateEmptyInputDefault)
} else {
Seq.fill(aggregates.size)(None)
}
val pivotOutputs = pivotValues.zipWithIndex.flatMap {
case (value, i) =>
aggregates.zip(pivotAggregatesAttributes).map {
case (aggregate, pivotAtt) =>
aggregates.zip(pivotAggregatesAttributes).zip(aggregateEmptyInputDefaults).map {
case ((aggregate, pivotAtt), emptyInputDefault) =>
val extractedValue = ExtractValue(pivotAtt, Literal(i), conf.resolver)
val withEmptyInputDefault = emptyInputDefault match {
case Some(default) => Coalesce(Seq(extractedValue, default))
case None => extractedValue
}
newAlias(
ExtractValue(pivotAtt, Literal(i), conf.resolver),
withEmptyInputDefault,
Some(outputName(value, aggregate, isSingleAggregate = aggregates.size == 1))
)
}
Expand Down Expand Up @@ -185,6 +201,42 @@ object PivotTransformer extends AliasHelper with SQLConfHelper {
}
}

/**
* Empty-input default for a pivot aggregate to coalesce into its extracted value, or `None` to
* leave the value unchanged. The fast path's [[PivotFirst]] leaves an unmatched pivot category's
* slot unset, so the caller wraps the result in a [[Coalesce]] to recover the value the slow path
* produces on an empty bucket (`count` -> 0; `sum`/`avg`/`min`/`max` -> NULL).
*
* Returned unevaluated so later constant folding defers a default that throws under ANSI (e.g.
* `count(v1) / count(v2)` -> `0 / 0`) to runtime, where [[Coalesce]] only evaluates it for
* actually-empty buckets -- matching the slow path. Mirrors
* `RewriteCorrelatedScalarSubquery.evalAggExprOnZeroTups`.
*/
private def aggregateEmptyInputDefault(aggregate: Expression): Option[Expression] = {
trimAliases(aggregate) match {
// Bare aggregate: use its published default result (count -> 0, sum/avg/min/max -> None).
case AggregateExpression(aggregateFunction, _, _, _, _) =>
aggregateFunction.defaultResult
// Composite over aggregate(s): substitute each aggregate/attribute with its empty-input
// value. Return None for a non-foldable default or a literal NULL (nothing to coalesce in). A
// default that only folds to NULL (e.g. sum(x) + 1) is still returned; its Coalesce evaluates
// to NULL on an empty bucket, which is the correct result.
case other =>
val default = other.transform {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The composite branch can return a non-null default for an aggregate that is NULL-capable on a non-empty bucket, and the caller then wraps it in Coalesce(extractedValue, default). The problem is that PivotFirst.update stores a value only if (value != null) and keeps no presence bit, so a buffer slot is NULL iff the pivot category is absent or present-but-the-value-is-NULLExtractValue can't distinguish them, and the Coalesce defaults both. That's correct for bare count-family aggregates (never NULL on a non-empty bucket), but wrong for composites over a NULL-capable sub-aggregate:

  • PIVOT(nullif(count(v), 5) ...) — a bucket with exactly 5 matching non-null rows: slow path = nullif(5,5) = NULL, fast path = Coalesce(NULL, nullif(0,5)=0) = 0.
  • PIVOT(if(count(v1) > 0, sum(v2), 0) ...) — a non-empty bucket with all-NULL v2: slow path = NULL, fast path = 0.
    Since the flag defaults to true, these are live wrong results, and the pivoted column is also declared non-nullable while it can be NULL. The simplest safe fix is to apply the empty-default only for bare aggregates whose defaultResult is non-null (count-family, where a NULL slot provably means empty) and leave NULL-capable composites to NULL / the slow path; a fuller fix would add a per-slot presence signal to PivotFirst.

case AggregateExpression(aggregateFunction, _, _, _, _) =>
aggregateFunction.defaultResult.getOrElse(
Literal.create(null, aggregateFunction.dataType))
case attribute: AttributeReference =>
Literal.create(null, attribute.dataType)
}
default match {
case _ if !trimAliases(default).foldable => None
case Literal(null, _) => None

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this guard only catches a syntactic Literal(null), but a default that evaluates to null without being a literal — e.g. CEIL(null) (from CEIL(sum(x))) or (null + cast(1 as bigint)) (from sum(x) + 1) — slips through and yields Some(default), so a Coalesce(extractedValue, <expr→null>) wrapper is emitted (you can see it in the analyzer-results golden, e.g. coalesce(__pivot_CEIL(sum…), CEIL(null))). It's harmless to results (Coalesce(x, null) ≡ x, and nullability is unchanged) but adds dead plan nodes and doesn't match the doc's "Return None for … a literal NULL". Folding the default first would catch these: default.foldable && default.eval() == null => None.

case _ => Some(default)
}
}
}

private def outputName(
value: Expression,
aggregate: Expression,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2551,6 +2551,19 @@ object SQLConf {
.intConf
.createWithDefault(10000)

val PIVOT_EMPTY_BUCKET_RETURNS_AGGREGATE_DEFAULT =
buildConf("spark.sql.pivot.emptyBucketReturnsAggregateDefault")
.internal()
.doc("When true, a pivot value with no matching rows returns the value the aggregate " +
"produces on empty input, e.g. 0 for count, as required by the SQL standard. The same " +
"applies to other aggregates with a non-null result on empty input (e.g. " +
"approx_count_distinct) and to expressions over them (e.g. count(x) + 1). When false, " +
"such cells return NULL.")
.version("4.3.0")
.withBindingPolicy(ConfigBindingPolicy.SESSION)
.booleanConf
.createWithDefault(true)

val DATAFRAME_TRANSPOSE_MAX_VALUES = buildConf("spark.sql.transposeMaxValues")
.doc("When doing a transpose without specifying values for the index column this is" +
" the maximum number of values that will be transposed without error.")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Project [id#0L, __pivot_count(b) AS `count(b)`#0[0] AS 1#0L, __pivot_count(b) AS `count(b)`#0[1] AS 2#0L, __pivot_count(b) AS `count(b)`#0[2] AS 3#0L]
Project [id#0L, coalesce(__pivot_count(b) AS `count(b)`#0[0], 0) AS 1#0L, coalesce(__pivot_count(b) AS `count(b)`#0[1], 0) AS 2#0L, coalesce(__pivot_count(b) AS `count(b)`#0[2], 0) AS 3#0L]
+- Aggregate [id#0L], [id#0L, pivotfirst(a#0, count(b)#0L, 1, 2, 3, 0, 0) AS __pivot_count(b) AS `count(b)`#0]
+- Aggregate [id#0L, a#0], [id#0L, a#0, count(b#0) AS count(b)#0L]
+- LocalRelation <empty>, [id#0L, a#0, b#0]
Loading