From cc7d2cc72499c659943547a60cf887c77bf996c2 Mon Sep 17 00:00:00 2001 From: Sudarshan Date: Sat, 18 Apr 2026 13:21:34 +0530 Subject: [PATCH 1/9] added support for MapFromEntries --- datafusion/common/src/config.rs | 10 + .../spark/src/function/map/map_from_arrays.rs | 3 +- .../src/function/map/map_from_entries.rs | 242 ++++++++++++++++-- datafusion/spark/src/function/map/utils.rs | 56 +++- 4 files changed, 275 insertions(+), 36 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 85361ef5e17e1..b5021d6bd05f8 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -692,6 +692,16 @@ config_namespace! { /// `false` — ANSI SQL mode is disabled by default. pub enable_ansi_mode: bool, default = false + /// Duplicate-key policy used by Spark-compatible map construction functions + /// (e.g. `map_from_entries`, `map_from_arrays`). + /// + /// The flag is experimental and relevant only for DataFusion Spark built-in functions. + /// It mirrors Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala). + /// + /// Accepted values (case-insensitive): `"EXCEPTION"` (default, matches Spark's + /// default) and `"LAST_WIN"`. Any other value falls back to `"EXCEPTION"`. + pub map_key_dedup_policy: String, default = "EXCEPTION".to_string() + /// How many bytes to buffer in the probe side of hash joins while the build side is /// concurrently being built. /// diff --git a/datafusion/spark/src/function/map/map_from_arrays.rs b/datafusion/spark/src/function/map/map_from_arrays.rs index 692e837d00f5e..b096e2faf5350 100644 --- a/datafusion/spark/src/function/map/map_from_arrays.rs +++ b/datafusion/spark/src/function/map/map_from_arrays.rs @@ -16,7 +16,7 @@ // under the License. 
use crate::function::map::utils::{ - get_element_type, get_list_offsets, get_list_values, + MapKeyDedupPolicy, get_element_type, get_list_offsets, get_list_values, map_from_keys_values_offsets_nulls, map_type_from_key_value_types, }; use arrow::array::{Array, ArrayRef, NullArray}; @@ -105,6 +105,7 @@ fn map_from_arrays_inner(args: &[ArrayRef]) -> Result { &get_list_offsets(values)?, keys.nulls(), values.nulls(), + MapKeyDedupPolicy::Exception, ) } diff --git a/datafusion/spark/src/function/map/map_from_entries.rs b/datafusion/spark/src/function/map/map_from_entries.rs index facf9f8c53473..698ca719006d1 100644 --- a/datafusion/spark/src/function/map/map_from_entries.rs +++ b/datafusion/spark/src/function/map/map_from_entries.rs @@ -18,11 +18,10 @@ use std::sync::Arc; use crate::function::map::utils::{ - get_list_offsets, get_list_values, map_from_keys_values_offsets_nulls, - map_type_from_key_value_types, + MapKeyDedupPolicy, get_list_offsets, get_list_values, + map_from_keys_values_offsets_nulls, map_type_from_key_value_types, }; -use arrow::array::{Array, ArrayRef, NullBufferBuilder, StructArray}; -use arrow::buffer::NullBuffer; +use arrow::array::{Array, ArrayRef, StructArray}; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::utils::take_function_args; use datafusion_common::{Result, exec_err, internal_err}; @@ -101,11 +100,20 @@ impl ScalarUDFImpl for MapFromEntries { } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - make_scalar_function(map_from_entries_inner, vec![])(&args.args) + let dedup_policy = MapKeyDedupPolicy::from_config_str( + &args.config_options.execution.map_key_dedup_policy, + ); + make_scalar_function( + move |arrays: &[ArrayRef]| map_from_entries_inner(arrays, dedup_policy), + vec![], + )(&args.args) } } -fn map_from_entries_inner(args: &[ArrayRef]) -> Result { +fn map_from_entries_inner( + args: &[ArrayRef], + dedup_policy: MapKeyDedupPolicy, +) -> Result { let [entries] = take_function_args("map_from_entries", args)?; let entries_offsets = get_list_offsets(entries)?; let entries_values = get_list_values(entries)?; @@ -119,27 +127,36 @@ fn map_from_entries_inner(args: &[ArrayRef]) -> Result { ), }?; - let entries_with_nulls = entries_values.nulls().and_then(|entries_inner_nulls| { - let mut builder = NullBufferBuilder::new_with_len(0); - let mut cur_offset = entries_offsets + // Spark throws on: + // * a null struct entry inside a non-null list row — Spark error class `NULL_MAP_KEY` + // (see `QueryExecutionErrors.nullAsMapKeyNotAllowedError`) + // * a null key inside a non-null struct entry — Spark error class `NULL_MAP_KEY` + // A null outer list row is valid and propagates to a null output row. 
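+    // For example, an entry list like `array(struct(1, 'a'), NULL)` errors with
+    // NULL_MAP_KEY, while a NULL list argument yields a NULL map for that row.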
+ let outer_nulls = entries.nulls(); + let struct_nulls = entries_values.nulls(); + let key_nulls = flat_keys.nulls(); + + if struct_nulls.is_some() || key_nulls.is_some() { + let start = entries_offsets .first() .map(|offset| *offset as usize) .unwrap_or(0); - - for next_offset in entries_offsets.iter().skip(1) { - let num_entries = *next_offset as usize - cur_offset; - builder.append( - entries_inner_nulls - .slice(cur_offset, num_entries) - .null_count() - == 0, - ); - cur_offset = *next_offset as usize; + let mut cur_offset = start; + for (row_idx, next_offset) in entries_offsets.iter().skip(1).enumerate() { + let next = *next_offset as usize; + let row_is_null = outer_nulls.is_some_and(|n| n.is_null(row_idx)); + if !row_is_null { + for i in cur_offset..next { + if struct_nulls.is_some_and(|n| n.is_null(i)) + || key_nulls.is_some_and(|n| n.is_null(i)) + { + return exec_err!("[NULL_MAP_KEY] Cannot use null as map key."); + } + } + } + cur_offset = next; } - builder.finish() - }); - - let res_nulls = NullBuffer::union(entries.nulls(), entries_with_nulls.as_ref()); + } map_from_keys_values_offsets_nulls( flat_keys, @@ -147,13 +164,19 @@ fn map_from_entries_inner(args: &[ArrayRef]) -> Result { &entries_offsets, &entries_offsets, None, - res_nulls.as_ref(), + outer_nulls, + dedup_policy, ) } #[cfg(test)] mod tests { use super::*; + use arrow::array::{ + Int32Array, Int32Builder, ListArray, MapArray, StringArray, StringBuilder, + StructArray, + }; + use arrow::buffer::{NullBuffer, OffsetBuffer}; use arrow::datatypes::Fields; fn make_entries_field(array_nullable: bool, element_nullable: bool) -> FieldRef { @@ -207,4 +230,175 @@ mod tests { assert!(result.is_nullable()); assert_eq!(result.data_type(), &expected_type); } + + fn struct_fields() -> Fields { + Fields::from(vec![ + Field::new("key", DataType::Int32, false), + Field::new("value", DataType::Utf8, true), + ]) + } + + type TestRow<'a> = Option)>>; + + /// Build a `List>` from per-row entries. + /// `rows` is a list of rows; `None` means the outer list row is null; a row is a vector + /// of `(key, value)` pairs where `key` is always present and `value` may be `None`. 
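+    /// e.g. `build_list(vec![Some(vec![(1, Some("a"))]), None])` produces two rows:
+    /// row 0 with the single entry `(1, "a")` and row 1 as a null list.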
+ fn build_list(rows: Vec) -> ArrayRef { + let fields = struct_fields(); + let mut key_builder = Int32Builder::new(); + let mut val_builder = StringBuilder::new(); + let mut offsets: Vec = vec![0]; + let mut nulls = vec![]; + let mut cur: i32 = 0; + for row in rows { + match row { + Some(entries) => { + for (k, v) in entries { + key_builder.append_value(k); + match v { + Some(s) => val_builder.append_value(s), + None => val_builder.append_null(), + } + cur += 1; + } + nulls.push(true); + } + None => nulls.push(false), + } + offsets.push(cur); + } + let keys: ArrayRef = Arc::new(key_builder.finish()); + let values: ArrayRef = Arc::new(val_builder.finish()); + let entries = StructArray::try_new(fields.clone(), vec![keys, values], None) + .expect("struct array"); + let list_field = Arc::new(Field::new("item", DataType::Struct(fields), false)); + let list = ListArray::try_new( + list_field, + OffsetBuffer::new(offsets.into()), + Arc::new(entries), + Some(NullBuffer::from(nulls)), + ) + .expect("list array"); + Arc::new(list) + } + + #[test] + fn test_map_from_entries_happy_path() { + let input = build_list(vec![ + Some(vec![(1, Some("a")), (2, Some("b"))]), + Some(vec![]), + None, + ]); + let out = map_from_entries_inner(&[input], MapKeyDedupPolicy::Exception).unwrap(); + let map = out.as_any().downcast_ref::().unwrap(); + assert_eq!(map.len(), 3); + assert!(!map.is_null(0)); + assert!(!map.is_null(1)); + assert!(map.is_null(2)); + let row0 = map.value(0); + let row0 = row0.as_any().downcast_ref::().unwrap(); + let keys = row0 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let values = row0 + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(keys.values(), &[1, 2]); + assert_eq!(values.value(0), "a"); + assert_eq!(values.value(1), "b"); + } + + #[test] + fn test_map_from_entries_duplicate_keys_exception() { + let input = build_list(vec![Some(vec![(1, Some("a")), (1, Some("b"))])]); + let err = map_from_entries_inner(&[input], MapKeyDedupPolicy::Exception) + .expect_err("should error on duplicate key under Exception policy"); + assert!( + err.to_string().contains("[DUPLICATED_MAP_KEY]"), + "unexpected error: {err}" + ); + } + + #[test] + fn test_map_from_entries_duplicate_keys_last_win() { + let input = build_list(vec![Some(vec![(1, Some("a")), (1, Some("b"))])]); + let out = map_from_entries_inner(&[input], MapKeyDedupPolicy::LastWin).unwrap(); + let map = out.as_any().downcast_ref::().unwrap(); + assert_eq!(map.len(), 1); + let row0 = map.value(0); + let row0 = row0.as_any().downcast_ref::().unwrap(); + let keys = row0 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let values = row0 + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(keys.len(), 1); + assert_eq!(keys.value(0), 1); + assert_eq!(values.value(0), "b"); + } + + #[test] + fn test_map_from_entries_null_struct_entry_throws() { + // Build List where the struct element has a null at position 1 + // inside a non-null list row. 
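+        // Layout: keys = [1, 0], values = ["a", "x"], struct validity = [true, false],
+        // so entry 1 is a null struct inside the single non-null list row.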
+ let fields = struct_fields(); + let keys: ArrayRef = Arc::new(Int32Array::from(vec![1, 0])); + let values: ArrayRef = Arc::new(StringArray::from(vec![Some("a"), Some("x")])); + let struct_nulls = NullBuffer::from(vec![true, false]); + let entries = + StructArray::try_new(fields.clone(), vec![keys, values], Some(struct_nulls)) + .unwrap(); + let list_field = Arc::new(Field::new("item", DataType::Struct(fields), true)); + let list = ListArray::try_new( + list_field, + OffsetBuffer::new(vec![0, 2].into()), + Arc::new(entries), + None, + ) + .unwrap(); + let input: ArrayRef = Arc::new(list); + let err = map_from_entries_inner(&[input], MapKeyDedupPolicy::Exception) + .expect_err("should error on null struct entry"); + assert!( + err.to_string().contains("[NULL_MAP_KEY]"), + "unexpected error: {err}" + ); + } + + #[test] + fn test_map_from_entries_null_key_throws() { + // Build List where the struct itself is non-null but the key column has a null. + let fields = Fields::from(vec![ + Field::new("key", DataType::Int32, true), + Field::new("value", DataType::Utf8, true), + ]); + let keys: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None])); + let values: ArrayRef = Arc::new(StringArray::from(vec![Some("a"), Some("b")])); + let entries = + StructArray::try_new(fields.clone(), vec![keys, values], None).unwrap(); + let list_field = Arc::new(Field::new("item", DataType::Struct(fields), false)); + let list = ListArray::try_new( + list_field, + OffsetBuffer::new(vec![0, 2].into()), + Arc::new(entries), + None, + ) + .unwrap(); + let input: ArrayRef = Arc::new(list); + let err = map_from_entries_inner(&[input], MapKeyDedupPolicy::Exception) + .expect_err("should error on null key"); + assert!( + err.to_string().contains("[NULL_MAP_KEY]"), + "unexpected error: {err}" + ); + } } diff --git a/datafusion/spark/src/function/map/utils.rs b/datafusion/spark/src/function/map/utils.rs index f5fff0c4b4c46..75954dbc35147 100644 --- a/datafusion/spark/src/function/map/utils.rs +++ b/datafusion/spark/src/function/map/utils.rs @@ -25,6 +25,31 @@ use arrow::compute::filter; use arrow::datatypes::{DataType, Field, Fields}; use datafusion_common::{Result, ScalarValue, exec_err}; +/// Policy for handling duplicate keys when constructing a Spark `MapType`. +/// +/// Mirrors Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961). +/// Spark's default is [`Exception`](Self::Exception). +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] +pub enum MapKeyDedupPolicy { + /// Raise a runtime error when a duplicate key is encountered (Spark default). + #[default] + Exception, + /// Keep the last occurrence of each key. + LastWin, +} + +impl MapKeyDedupPolicy { + /// Parse from a case-insensitive string. Unknown values fall back to + /// [`MapKeyDedupPolicy::Exception`] (Spark's default). + pub fn from_config_str(value: &str) -> Self { + if value.eq_ignore_ascii_case("LAST_WIN") { + Self::LastWin + } else { + Self::Exception + } + } +} + /// Helper function to get element [`DataType`] /// from [`List`](DataType::List)/[`LargeList`](DataType::LargeList)/[`FixedSizeList`](DataType::FixedSizeList)
/// [`Null`](DataType::Null) can be coerced to `ListType`([`Null`](DataType::Null)), so [`Null`](DataType::Null) is returned
@@ -111,13 +136,8 @@ pub fn map_type_from_key_value_types( /// So the inputs can be [`ListArray`](`arrow::array::ListArray`)/[`LargeListArray`](`arrow::array::LargeListArray`)/[`FixedSizeListArray`](`arrow::array::FixedSizeListArray`)
/// To preserve the row info, [`offsets`](arrow::array::ListArray::offsets) and [`nulls`](arrow::array::ListArray::nulls) for both keys and values need to be provided
/// [`FixedSizeListArray`](`arrow::array::FixedSizeListArray`) has no `offsets`, so they can be generated as a cumulative sum of its `Size`
-/// 2. Spark provides [spark.sql.mapKeyDedupPolicy](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961)
-/// to handle duplicate keys
-/// For now, configurable functions are not supported by Datafusion
-/// So more permissive `LAST_WIN` option is used in this implementation (instead of `EXCEPTION`)
-/// `EXCEPTION` behaviour can still be achieved externally in cost of performance:
-/// `when(array_length(array_distinct(keys)) == array_length(keys), constructed_map)`
-/// `.otherwise(raise_error("duplicate keys occurred during map construction"))` +/// 2. Duplicate key handling follows [`MapKeyDedupPolicy`], mirroring Spark's +/// [spark.sql.mapKeyDedupPolicy](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961). pub fn map_from_keys_values_offsets_nulls( flat_keys: &ArrayRef, flat_values: &ArrayRef, @@ -125,6 +145,7 @@ pub fn map_from_keys_values_offsets_nulls( values_offsets: &[i32], keys_nulls: Option<&NullBuffer>, values_nulls: Option<&NullBuffer>, + dedup_policy: MapKeyDedupPolicy, ) -> Result { let (keys, values, offsets) = map_deduplicate_keys( flat_keys, @@ -133,6 +154,7 @@ pub fn map_from_keys_values_offsets_nulls( values_offsets, keys_nulls, values_nulls, + dedup_policy, )?; let nulls = NullBuffer::union(keys_nulls, values_nulls); @@ -155,6 +177,7 @@ fn map_deduplicate_keys( values_offsets: &[i32], keys_nulls: Option<&NullBuffer>, values_nulls: Option<&NullBuffer>, + dedup_policy: MapKeyDedupPolicy, ) -> Result<(ArrayRef, ArrayRef, OffsetBuffer)> { let offsets_len = keys_offsets.len(); let mut new_offsets = Vec::with_capacity(offsets_len); @@ -203,11 +226,22 @@ fn map_deduplicate_keys( )? .compacted(); if seen_keys.contains(&key) { - // TODO: implement configuration and logic for spark.sql.mapKeyDedupPolicy=EXCEPTION (this is default spark-config) - // exec_err!("invalid argument: duplicate keys in map") - // https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961 + match dedup_policy { + MapKeyDedupPolicy::Exception => { + // Message matches Spark's `duplicateMapKeyFoundError` + // (org.apache.spark.sql.errors.QueryExecutionErrors). + return exec_err!( + "[DUPLICATED_MAP_KEY] Duplicate map key {key} was found, \ + please check the input data. If you want to remove the \ + duplicated keys, you can set spark.sql.mapKeyDedupPolicy \ + to LAST_WIN so that the key inserted at last takes precedence." + ); + } + MapKeyDedupPolicy::LastWin => { + // Earlier occurrence is dropped; the later (already-kept) entry wins. + } + } } else { - // This code implements deduplication logic for spark.sql.mapKeyDedupPolicy=LAST_WIN (this is NOT default spark-config) keys_mask_one[cur_entry_idx] = true; values_mask_one[cur_entry_idx] = true; seen_keys.insert(key); From ba3c933d0851fadb34108da74c06100ef997fc6d Mon Sep 17 00:00:00 2001 From: Sudarshan Date: Sat, 18 Apr 2026 22:04:06 +0530 Subject: [PATCH 2/9] removed MapKeyDedupPolicy --- datafusion/common/src/config.rs | 10 - .../spark/src/function/map/map_from_arrays.rs | 3 +- .../src/function/map/map_from_entries.rs | 242 ++---------------- datafusion/spark/src/function/map/utils.rs | 69 ++--- .../test_files/spark/map/map_from_arrays.slt | 6 +- .../test_files/spark/map/map_from_entries.slt | 12 +- 6 files changed, 51 insertions(+), 291 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index b5021d6bd05f8..85361ef5e17e1 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -692,16 +692,6 @@ config_namespace! { /// `false` — ANSI SQL mode is disabled by default. pub enable_ansi_mode: bool, default = false - /// Duplicate-key policy used by Spark-compatible map construction functions - /// (e.g. `map_from_entries`, `map_from_arrays`). - /// - /// The flag is experimental and relevant only for DataFusion Spark built-in functions. 
- /// It mirrors Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala). - /// - /// Accepted values (case-insensitive): `"EXCEPTION"` (default, matches Spark's - /// default) and `"LAST_WIN"`. Any other value falls back to `"EXCEPTION"`. - pub map_key_dedup_policy: String, default = "EXCEPTION".to_string() - /// How many bytes to buffer in the probe side of hash joins while the build side is /// concurrently being built. /// diff --git a/datafusion/spark/src/function/map/map_from_arrays.rs b/datafusion/spark/src/function/map/map_from_arrays.rs index b096e2faf5350..692e837d00f5e 100644 --- a/datafusion/spark/src/function/map/map_from_arrays.rs +++ b/datafusion/spark/src/function/map/map_from_arrays.rs @@ -16,7 +16,7 @@ // under the License. use crate::function::map::utils::{ - MapKeyDedupPolicy, get_element_type, get_list_offsets, get_list_values, + get_element_type, get_list_offsets, get_list_values, map_from_keys_values_offsets_nulls, map_type_from_key_value_types, }; use arrow::array::{Array, ArrayRef, NullArray}; @@ -105,7 +105,6 @@ fn map_from_arrays_inner(args: &[ArrayRef]) -> Result { &get_list_offsets(values)?, keys.nulls(), values.nulls(), - MapKeyDedupPolicy::Exception, ) } diff --git a/datafusion/spark/src/function/map/map_from_entries.rs b/datafusion/spark/src/function/map/map_from_entries.rs index 698ca719006d1..facf9f8c53473 100644 --- a/datafusion/spark/src/function/map/map_from_entries.rs +++ b/datafusion/spark/src/function/map/map_from_entries.rs @@ -18,10 +18,11 @@ use std::sync::Arc; use crate::function::map::utils::{ - MapKeyDedupPolicy, get_list_offsets, get_list_values, - map_from_keys_values_offsets_nulls, map_type_from_key_value_types, + get_list_offsets, get_list_values, map_from_keys_values_offsets_nulls, + map_type_from_key_value_types, }; -use arrow::array::{Array, ArrayRef, StructArray}; +use arrow::array::{Array, ArrayRef, NullBufferBuilder, StructArray}; +use arrow::buffer::NullBuffer; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::utils::take_function_args; use datafusion_common::{Result, exec_err, internal_err}; @@ -100,20 +101,11 @@ impl ScalarUDFImpl for MapFromEntries { } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - let dedup_policy = MapKeyDedupPolicy::from_config_str( - &args.config_options.execution.map_key_dedup_policy, - ); - make_scalar_function( - move |arrays: &[ArrayRef]| map_from_entries_inner(arrays, dedup_policy), - vec![], - )(&args.args) + make_scalar_function(map_from_entries_inner, vec![])(&args.args) } } -fn map_from_entries_inner( - args: &[ArrayRef], - dedup_policy: MapKeyDedupPolicy, -) -> Result { +fn map_from_entries_inner(args: &[ArrayRef]) -> Result { let [entries] = take_function_args("map_from_entries", args)?; let entries_offsets = get_list_offsets(entries)?; let entries_values = get_list_values(entries)?; @@ -127,36 +119,27 @@ fn map_from_entries_inner( ), }?; - // Spark throws on: - // * a null struct entry inside a non-null list row — Spark error class `NULL_MAP_KEY` - // (see `QueryExecutionErrors.nullAsMapKeyNotAllowedError`) - // * a null key inside a non-null struct entry — Spark error class `NULL_MAP_KEY` - // A null outer list row is valid and propagates to a null output row. 
- let outer_nulls = entries.nulls(); - let struct_nulls = entries_values.nulls(); - let key_nulls = flat_keys.nulls(); - - if struct_nulls.is_some() || key_nulls.is_some() { - let start = entries_offsets + let entries_with_nulls = entries_values.nulls().and_then(|entries_inner_nulls| { + let mut builder = NullBufferBuilder::new_with_len(0); + let mut cur_offset = entries_offsets .first() .map(|offset| *offset as usize) .unwrap_or(0); - let mut cur_offset = start; - for (row_idx, next_offset) in entries_offsets.iter().skip(1).enumerate() { - let next = *next_offset as usize; - let row_is_null = outer_nulls.is_some_and(|n| n.is_null(row_idx)); - if !row_is_null { - for i in cur_offset..next { - if struct_nulls.is_some_and(|n| n.is_null(i)) - || key_nulls.is_some_and(|n| n.is_null(i)) - { - return exec_err!("[NULL_MAP_KEY] Cannot use null as map key."); - } - } - } - cur_offset = next; + + for next_offset in entries_offsets.iter().skip(1) { + let num_entries = *next_offset as usize - cur_offset; + builder.append( + entries_inner_nulls + .slice(cur_offset, num_entries) + .null_count() + == 0, + ); + cur_offset = *next_offset as usize; } - } + builder.finish() + }); + + let res_nulls = NullBuffer::union(entries.nulls(), entries_with_nulls.as_ref()); map_from_keys_values_offsets_nulls( flat_keys, @@ -164,19 +147,13 @@ fn map_from_entries_inner( &entries_offsets, &entries_offsets, None, - outer_nulls, - dedup_policy, + res_nulls.as_ref(), ) } #[cfg(test)] mod tests { use super::*; - use arrow::array::{ - Int32Array, Int32Builder, ListArray, MapArray, StringArray, StringBuilder, - StructArray, - }; - use arrow::buffer::{NullBuffer, OffsetBuffer}; use arrow::datatypes::Fields; fn make_entries_field(array_nullable: bool, element_nullable: bool) -> FieldRef { @@ -230,175 +207,4 @@ mod tests { assert!(result.is_nullable()); assert_eq!(result.data_type(), &expected_type); } - - fn struct_fields() -> Fields { - Fields::from(vec![ - Field::new("key", DataType::Int32, false), - Field::new("value", DataType::Utf8, true), - ]) - } - - type TestRow<'a> = Option)>>; - - /// Build a `List>` from per-row entries. - /// `rows` is a list of rows; `None` means the outer list row is null; a row is a vector - /// of `(key, value)` pairs where `key` is always present and `value` may be `None`. 
- fn build_list(rows: Vec) -> ArrayRef { - let fields = struct_fields(); - let mut key_builder = Int32Builder::new(); - let mut val_builder = StringBuilder::new(); - let mut offsets: Vec = vec![0]; - let mut nulls = vec![]; - let mut cur: i32 = 0; - for row in rows { - match row { - Some(entries) => { - for (k, v) in entries { - key_builder.append_value(k); - match v { - Some(s) => val_builder.append_value(s), - None => val_builder.append_null(), - } - cur += 1; - } - nulls.push(true); - } - None => nulls.push(false), - } - offsets.push(cur); - } - let keys: ArrayRef = Arc::new(key_builder.finish()); - let values: ArrayRef = Arc::new(val_builder.finish()); - let entries = StructArray::try_new(fields.clone(), vec![keys, values], None) - .expect("struct array"); - let list_field = Arc::new(Field::new("item", DataType::Struct(fields), false)); - let list = ListArray::try_new( - list_field, - OffsetBuffer::new(offsets.into()), - Arc::new(entries), - Some(NullBuffer::from(nulls)), - ) - .expect("list array"); - Arc::new(list) - } - - #[test] - fn test_map_from_entries_happy_path() { - let input = build_list(vec![ - Some(vec![(1, Some("a")), (2, Some("b"))]), - Some(vec![]), - None, - ]); - let out = map_from_entries_inner(&[input], MapKeyDedupPolicy::Exception).unwrap(); - let map = out.as_any().downcast_ref::().unwrap(); - assert_eq!(map.len(), 3); - assert!(!map.is_null(0)); - assert!(!map.is_null(1)); - assert!(map.is_null(2)); - let row0 = map.value(0); - let row0 = row0.as_any().downcast_ref::().unwrap(); - let keys = row0 - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - let values = row0 - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(keys.values(), &[1, 2]); - assert_eq!(values.value(0), "a"); - assert_eq!(values.value(1), "b"); - } - - #[test] - fn test_map_from_entries_duplicate_keys_exception() { - let input = build_list(vec![Some(vec![(1, Some("a")), (1, Some("b"))])]); - let err = map_from_entries_inner(&[input], MapKeyDedupPolicy::Exception) - .expect_err("should error on duplicate key under Exception policy"); - assert!( - err.to_string().contains("[DUPLICATED_MAP_KEY]"), - "unexpected error: {err}" - ); - } - - #[test] - fn test_map_from_entries_duplicate_keys_last_win() { - let input = build_list(vec![Some(vec![(1, Some("a")), (1, Some("b"))])]); - let out = map_from_entries_inner(&[input], MapKeyDedupPolicy::LastWin).unwrap(); - let map = out.as_any().downcast_ref::().unwrap(); - assert_eq!(map.len(), 1); - let row0 = map.value(0); - let row0 = row0.as_any().downcast_ref::().unwrap(); - let keys = row0 - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - let values = row0 - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(keys.len(), 1); - assert_eq!(keys.value(0), 1); - assert_eq!(values.value(0), "b"); - } - - #[test] - fn test_map_from_entries_null_struct_entry_throws() { - // Build List where the struct element has a null at position 1 - // inside a non-null list row. 
- let fields = struct_fields(); - let keys: ArrayRef = Arc::new(Int32Array::from(vec![1, 0])); - let values: ArrayRef = Arc::new(StringArray::from(vec![Some("a"), Some("x")])); - let struct_nulls = NullBuffer::from(vec![true, false]); - let entries = - StructArray::try_new(fields.clone(), vec![keys, values], Some(struct_nulls)) - .unwrap(); - let list_field = Arc::new(Field::new("item", DataType::Struct(fields), true)); - let list = ListArray::try_new( - list_field, - OffsetBuffer::new(vec![0, 2].into()), - Arc::new(entries), - None, - ) - .unwrap(); - let input: ArrayRef = Arc::new(list); - let err = map_from_entries_inner(&[input], MapKeyDedupPolicy::Exception) - .expect_err("should error on null struct entry"); - assert!( - err.to_string().contains("[NULL_MAP_KEY]"), - "unexpected error: {err}" - ); - } - - #[test] - fn test_map_from_entries_null_key_throws() { - // Build List where the struct itself is non-null but the key column has a null. - let fields = Fields::from(vec![ - Field::new("key", DataType::Int32, true), - Field::new("value", DataType::Utf8, true), - ]); - let keys: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None])); - let values: ArrayRef = Arc::new(StringArray::from(vec![Some("a"), Some("b")])); - let entries = - StructArray::try_new(fields.clone(), vec![keys, values], None).unwrap(); - let list_field = Arc::new(Field::new("item", DataType::Struct(fields), false)); - let list = ListArray::try_new( - list_field, - OffsetBuffer::new(vec![0, 2].into()), - Arc::new(entries), - None, - ) - .unwrap(); - let input: ArrayRef = Arc::new(list); - let err = map_from_entries_inner(&[input], MapKeyDedupPolicy::Exception) - .expect_err("should error on null key"); - assert!( - err.to_string().contains("[NULL_MAP_KEY]"), - "unexpected error: {err}" - ); - } } diff --git a/datafusion/spark/src/function/map/utils.rs b/datafusion/spark/src/function/map/utils.rs index 75954dbc35147..0cf5b45ffd0b7 100644 --- a/datafusion/spark/src/function/map/utils.rs +++ b/datafusion/spark/src/function/map/utils.rs @@ -25,31 +25,6 @@ use arrow::compute::filter; use arrow::datatypes::{DataType, Field, Fields}; use datafusion_common::{Result, ScalarValue, exec_err}; -/// Policy for handling duplicate keys when constructing a Spark `MapType`. -/// -/// Mirrors Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961). -/// Spark's default is [`Exception`](Self::Exception). -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] -pub enum MapKeyDedupPolicy { - /// Raise a runtime error when a duplicate key is encountered (Spark default). - #[default] - Exception, - /// Keep the last occurrence of each key. - LastWin, -} - -impl MapKeyDedupPolicy { - /// Parse from a case-insensitive string. Unknown values fall back to - /// [`MapKeyDedupPolicy::Exception`] (Spark's default). - pub fn from_config_str(value: &str) -> Self { - if value.eq_ignore_ascii_case("LAST_WIN") { - Self::LastWin - } else { - Self::Exception - } - } -} - /// Helper function to get element [`DataType`] /// from [`List`](DataType::List)/[`LargeList`](DataType::LargeList)/[`FixedSizeList`](DataType::FixedSizeList)
/// [`Null`](DataType::Null) can be coerced to `ListType`([`Null`](DataType::Null)), so [`Null`](DataType::Null) is returned
@@ -136,8 +111,13 @@ pub fn map_type_from_key_value_types( /// So the inputs can be [`ListArray`](`arrow::array::ListArray`)/[`LargeListArray`](`arrow::array::LargeListArray`)/[`FixedSizeListArray`](`arrow::array::FixedSizeListArray`)
/// To preserve the row info, [`offsets`](arrow::array::ListArray::offsets) and [`nulls`](arrow::array::ListArray::nulls) for both keys and values need to be provided
/// [`FixedSizeListArray`](`arrow::array::FixedSizeListArray`) has no `offsets`, so they can be generated as a cumulative sum of its `Size`
-/// 2. Duplicate key handling follows [`MapKeyDedupPolicy`], mirroring Spark's
-/// [spark.sql.mapKeyDedupPolicy](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961).
+/// 2. Spark provides [spark.sql.mapKeyDedupPolicy](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961)
+/// to handle duplicate keys
+/// For now, configurable functions are not supported by DataFusion,
+/// so Spark's default `EXCEPTION` policy is enforced: a duplicate key raises `[DUPLICATED_MAP_KEY]`.
+/// The same behaviour can be expressed externally, at the cost of performance, as:
+/// `when(array_length(array_distinct(keys)) == array_length(keys), constructed_map)`
+/// `.otherwise(raise_error("duplicate keys occurred during map construction"))` pub fn map_from_keys_values_offsets_nulls( flat_keys: &ArrayRef, flat_values: &ArrayRef, @@ -145,7 +125,6 @@ pub fn map_from_keys_values_offsets_nulls( values_offsets: &[i32], keys_nulls: Option<&NullBuffer>, values_nulls: Option<&NullBuffer>, - dedup_policy: MapKeyDedupPolicy, ) -> Result { let (keys, values, offsets) = map_deduplicate_keys( flat_keys, @@ -154,7 +133,6 @@ pub fn map_from_keys_values_offsets_nulls( values_offsets, keys_nulls, values_nulls, - dedup_policy, )?; let nulls = NullBuffer::union(keys_nulls, values_nulls); @@ -177,7 +155,6 @@ fn map_deduplicate_keys( values_offsets: &[i32], keys_nulls: Option<&NullBuffer>, values_nulls: Option<&NullBuffer>, - dedup_policy: MapKeyDedupPolicy, ) -> Result<(ArrayRef, ArrayRef, OffsetBuffer)> { let offsets_len = keys_offsets.len(); let mut new_offsets = Vec::with_capacity(offsets_len); @@ -225,28 +202,20 @@ fn map_deduplicate_keys( cur_keys_offset + cur_entry_idx, )? .compacted(); + // Enforce Spark's default `spark.sql.mapKeyDedupPolicy=EXCEPTION`. + // Native LAST_WIN support is deferred to a follow-up. if seen_keys.contains(&key) { - match dedup_policy { - MapKeyDedupPolicy::Exception => { - // Message matches Spark's `duplicateMapKeyFoundError` - // (org.apache.spark.sql.errors.QueryExecutionErrors). - return exec_err!( - "[DUPLICATED_MAP_KEY] Duplicate map key {key} was found, \ - please check the input data. If you want to remove the \ - duplicated keys, you can set spark.sql.mapKeyDedupPolicy \ - to LAST_WIN so that the key inserted at last takes precedence." - ); - } - MapKeyDedupPolicy::LastWin => { - // Earlier occurrence is dropped; the later (already-kept) entry wins. - } - } - } else { - keys_mask_one[cur_entry_idx] = true; - values_mask_one[cur_entry_idx] = true; - seen_keys.insert(key); - new_last_offset += 1; + return exec_err!( + "[DUPLICATED_MAP_KEY] Duplicate map key {key} was found, \ + please check the input data. If you want to remove the \ + duplicated keys, you can set spark.sql.mapKeyDedupPolicy \ + to LAST_WIN so that the key inserted at last takes precedence." + ); } + keys_mask_one[cur_entry_idx] = true; + values_mask_one[cur_entry_idx] = true; + seen_keys.insert(key); + new_last_offset += 1; } } } else { diff --git a/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt b/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt index a26b0435c9291..c961f183befd9 100644 --- a/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt +++ b/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt @@ -118,11 +118,9 @@ SELECT ---- {outer_key1: {inner_a: 1, inner_b: 2}, outer_key2: {inner_x: 10, inner_y: 20, inner_z: 30}} -# Test with duplicate keys -query ? +# Test with duplicate keys: raises DUPLICATED_MAP_KEY under Spark's default policy +query error DataFusion error: Execution error: \[DUPLICATED_MAP_KEY\] Duplicate map key true was found SELECT map_from_arrays(array(true, false, true), array('a', NULL, 'b')); ----- -{false: NULL, true: b} # Tests with different list types query ? 
diff --git a/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt b/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt index 19b46886a027e..7aec6b3264f4a 100644 --- a/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt +++ b/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt @@ -151,14 +151,12 @@ SELECT ---- {outer_key1: {inner_a: 1, inner_b: 2}, outer_key2: {inner_x: 10, inner_y: 20, inner_z: 30}} -# Test with duplicate keys -query ? +# Test with duplicate keys: raises DUPLICATED_MAP_KEY under Spark's default policy +query error DataFusion error: Execution error: \[DUPLICATED_MAP_KEY\] Duplicate map key true was found SELECT map_from_entries(array( - struct(true, 'a'), - struct(false, 'b'), + struct(true, 'a'), + struct(false, 'b'), struct(true, 'c'), - struct(false, cast(NULL as string)), + struct(false, cast(NULL as string)), struct(true, 'd') )); ----- -{false: NULL, true: d} From b2a210212ae15613d19345ddc3855b06c4650619 Mon Sep 17 00:00:00 2001 From: Sudarshan Date: Mon, 20 Apr 2026 01:00:49 +0530 Subject: [PATCH 3/9] changed error message --- datafusion/spark/src/function/map/utils.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/spark/src/function/map/utils.rs b/datafusion/spark/src/function/map/utils.rs index 0cf5b45ffd0b7..9ffc99847b644 100644 --- a/datafusion/spark/src/function/map/utils.rs +++ b/datafusion/spark/src/function/map/utils.rs @@ -207,9 +207,9 @@ fn map_deduplicate_keys( if seen_keys.contains(&key) { return exec_err!( "[DUPLICATED_MAP_KEY] Duplicate map key {key} was found, \ - please check the input data. If you want to remove the \ - duplicated keys, you can set spark.sql.mapKeyDedupPolicy \ - to LAST_WIN so that the key inserted at last takes precedence." + please check the input data. DataFusion currently only \ + supports the EXCEPTION policy for duplicate map keys; \ + deduplicate the input before calling this function." ); } keys_mask_one[cur_entry_idx] = true; From db7dddbec2fe406acd20331e91be97ce8cd60ab2 Mon Sep 17 00:00:00 2001 From: Sudarshan Date: Mon, 20 Apr 2026 01:19:49 +0530 Subject: [PATCH 4/9] revert unwanted formatting, added tests --- .../test_files/spark/map/map_from_arrays.slt | 16 ++++++++++++++ .../test_files/spark/map/map_from_entries.slt | 22 ++++++++++++++++--- .../test_files/spark/map/str_to_map.slt | 15 +++++++++++++ 3 files changed, 50 insertions(+), 3 deletions(-) diff --git a/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt b/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt index c961f183befd9..ea9c5cf24f8d3 100644 --- a/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt +++ b/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt @@ -122,6 +122,22 @@ SELECT query error DataFusion error: Execution error: \[DUPLICATED_MAP_KEY\] Duplicate map key true was found SELECT map_from_arrays(array(true, false, true), array('a', NULL, 'b')); +# Integer keys with a duplicate also raise DUPLICATED_MAP_KEY. +query error DataFusion error: Execution error: \[DUPLICATED_MAP_KEY\] Duplicate map key 1 was found +SELECT map_from_arrays(array(1, 2, 1), array('a', 'b', 'c')); + +# String keys with a duplicate also raise DUPLICATED_MAP_KEY. +query error DataFusion error: Execution error: \[DUPLICATED_MAP_KEY\] Duplicate map key k was found +SELECT map_from_arrays(array('k', 'k', 'k'), array(1, 2, 3)); + +# Multi-row: a clean row and a duplicate row still errors. 
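+# (row 1 is valid on its own; the duplicate key 1 in row 2 fails the whole query)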
+query error DataFusion error: Execution error: \[DUPLICATED_MAP_KEY\] Duplicate map key 1 was found +SELECT map_from_arrays(a, b) +FROM values + (array[1, 2], array['a', 'b']), + (array[1, 1], array['x', 'y']) +AS tab(a, b); + # Tests with different list types query ? SELECT map_from_arrays(arrow_cast(array(2, 5), 'LargeList(Int32)'), arrow_cast(array('a', 'b'), 'FixedSizeList(2, Utf8)')); diff --git a/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt b/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt index 7aec6b3264f4a..3acf338d2bb34 100644 --- a/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt +++ b/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt @@ -154,9 +154,25 @@ SELECT # Test with duplicate keys: raises DUPLICATED_MAP_KEY under Spark's default policy query error DataFusion error: Execution error: \[DUPLICATED_MAP_KEY\] Duplicate map key true was found SELECT map_from_entries(array( - struct(true, 'a'), - struct(false, 'b'), + struct(true, 'a'), + struct(false, 'b'), struct(true, 'c'), - struct(false, cast(NULL as string)), + struct(false, cast(NULL as string)), struct(true, 'd') )); + +# Integer keys with a duplicate also raise DUPLICATED_MAP_KEY. +query error DataFusion error: Execution error: \[DUPLICATED_MAP_KEY\] Duplicate map key 1 was found +SELECT map_from_entries(array(struct(1, 'a'), struct(2, 'b'), struct(1, 'c'))); + +# String keys with triple occurrence also raise DUPLICATED_MAP_KEY. +query error DataFusion error: Execution error: \[DUPLICATED_MAP_KEY\] Duplicate map key k was found +SELECT map_from_entries(array(struct('k', 1), struct('k', 2), struct('k', 3))); + +# Multi-row: a clean row followed by a duplicate row still errors. +query error DataFusion error: Execution error: \[DUPLICATED_MAP_KEY\] Duplicate map key 1 was found +SELECT map_from_entries(data) +FROM values + (array[struct(1, 'a'), struct(2, 'b')]), + (array[struct(1, 'x'), struct(1, 'y')]) +AS tab(data); diff --git a/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt b/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt index 30d1672aef0ae..ee29afef4e579 100644 --- a/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt +++ b/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt @@ -69,6 +69,21 @@ statement error Duplicate map key SELECT str_to_map('a:1,b:2,a:3'); +# Triple+ occurrences of the same key still raise DUPLICATED_MAP_KEY. +statement error +Duplicate map key 'a' +SELECT str_to_map('a:1,a:2,a:3'); + +# Duplicate where one occurrence is missing the kv_delim (value = NULL) still errors. +statement error +Duplicate map key 'a' +SELECT str_to_map('a,b:2,a:3'); + +# Multi-row input: a clean row followed by a duplicate row fails on the duplicate row. 
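+# ('a:1,b:2' is clean; 'a:3,a:4' repeats key 'a')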
+statement error +Duplicate map key 'a' +SELECT str_to_map(col) FROM (VALUES ('a:1,b:2'), ('a:3,a:4')) AS t(col); + # Additional tests (DataFusion-specific) # NULL input returns NULL From 8858003f18ea29b8dc1bba869c593bbaf9598c1b Mon Sep 17 00:00:00 2001 From: Sudarshan Date: Mon, 27 Apr 2026 22:39:16 +0530 Subject: [PATCH 5/9] added mapKeyDedupPolicy, LAST_WIN for MapFromEntries --- datafusion/common/src/config.rs | 11 + .../spark/src/function/map/map_from_arrays.rs | 12 +- .../src/function/map/map_from_entries.rs | 13 +- .../spark/src/function/map/str_to_map.rs | 120 +++++--- datafusion/spark/src/function/map/utils.rs | 256 +++++++++++++++--- .../test_files/spark/map/map_from_arrays.slt | 37 +++ .../test_files/spark/map/map_from_entries.slt | 39 +++ .../test_files/spark/map/str_to_map.slt | 44 ++- 8 files changed, 446 insertions(+), 86 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 85361ef5e17e1..5332e2f669538 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -692,6 +692,17 @@ config_namespace! { /// `false` — ANSI SQL mode is disabled by default. pub enable_ansi_mode: bool, default = false + /// Policy for handling duplicate keys in Spark-compatible map-construction + /// functions (`map_from_arrays`, `map_from_entries`, `str_to_map`). + /// + /// Mirrors Spark's + /// [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961): + /// - `"EXCEPTION"` (default): raise `[DUPLICATED_MAP_KEY]` at runtime on any duplicate key. + /// - `"LAST_WIN"`: keep the last occurrence of each duplicate key. + /// + /// Values are case-insensitive. Only affects functions under `datafusion/spark`. + pub map_key_dedup_policy: String, default = "EXCEPTION".to_string() + /// How many bytes to buffer in the probe side of hash joins while the build side is /// concurrently being built. 
/// diff --git a/datafusion/spark/src/function/map/map_from_arrays.rs b/datafusion/spark/src/function/map/map_from_arrays.rs index 692e837d00f5e..10c57deafaf87 100644 --- a/datafusion/spark/src/function/map/map_from_arrays.rs +++ b/datafusion/spark/src/function/map/map_from_arrays.rs @@ -18,6 +18,7 @@ use crate::function::map::utils::{ get_element_type, get_list_offsets, get_list_values, map_from_keys_values_offsets_nulls, map_type_from_key_value_types, + parse_map_key_dedup_policy, }; use arrow::array::{Array, ArrayRef, NullArray}; use arrow::compute::kernels::cast; @@ -81,11 +82,17 @@ impl ScalarUDFImpl for MapFromArrays { } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - make_scalar_function(map_from_arrays_inner, vec![])(&args.args) + let last_value_wins = parse_map_key_dedup_policy( + &args.config_options.execution.map_key_dedup_policy, + )?; + make_scalar_function( + move |args: &[ArrayRef]| map_from_arrays_inner(args, last_value_wins), + vec![], + )(&args.args) } } -fn map_from_arrays_inner(args: &[ArrayRef]) -> Result { +fn map_from_arrays_inner(args: &[ArrayRef], last_value_wins: bool) -> Result { let [keys, values] = take_function_args("map_from_arrays", args)?; if *keys.data_type() == DataType::Null || *values.data_type() == DataType::Null { @@ -105,6 +112,7 @@ fn map_from_arrays_inner(args: &[ArrayRef]) -> Result { &get_list_offsets(values)?, keys.nulls(), values.nulls(), + last_value_wins, ) } diff --git a/datafusion/spark/src/function/map/map_from_entries.rs b/datafusion/spark/src/function/map/map_from_entries.rs index facf9f8c53473..c6254151ee41c 100644 --- a/datafusion/spark/src/function/map/map_from_entries.rs +++ b/datafusion/spark/src/function/map/map_from_entries.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use crate::function::map::utils::{ get_list_offsets, get_list_values, map_from_keys_values_offsets_nulls, - map_type_from_key_value_types, + map_type_from_key_value_types, parse_map_key_dedup_policy, }; use arrow::array::{Array, ArrayRef, NullBufferBuilder, StructArray}; use arrow::buffer::NullBuffer; @@ -101,11 +101,17 @@ impl ScalarUDFImpl for MapFromEntries { } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - make_scalar_function(map_from_entries_inner, vec![])(&args.args) + let last_value_wins = parse_map_key_dedup_policy( + &args.config_options.execution.map_key_dedup_policy, + )?; + make_scalar_function( + move |args: &[ArrayRef]| map_from_entries_inner(args, last_value_wins), + vec![], + )(&args.args) } } -fn map_from_entries_inner(args: &[ArrayRef]) -> Result { +fn map_from_entries_inner(args: &[ArrayRef], last_value_wins: bool) -> Result { let [entries] = take_function_args("map_from_entries", args)?; let entries_offsets = get_list_offsets(entries)?; let entries_values = get_list_values(entries)?; @@ -148,6 +154,7 @@ fn map_from_entries_inner(args: &[ArrayRef]) -> Result { &entries_offsets, None, res_nulls.as_ref(), + last_value_wins, ) } diff --git a/datafusion/spark/src/function/map/str_to_map.rs b/datafusion/spark/src/function/map/str_to_map.rs index c603e775a6031..da4a3e8b1d759 100644 --- a/datafusion/spark/src/function/map/str_to_map.rs +++ b/datafusion/spark/src/function/map/str_to_map.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use arrow::array::{ @@ -32,7 +32,9 @@ use datafusion_expr::{ TypeSignature, Volatility, }; -use crate::function::map::utils::map_type_from_key_value_types; +use crate::function::map::utils::{ + map_type_from_key_value_types, parse_map_key_dedup_policy, +}; const DEFAULT_PAIR_DELIM: &str = ","; const DEFAULT_KV_DELIM: &str = ":"; @@ -48,11 +50,10 @@ const DEFAULT_KV_DELIM: &str = ":"; /// - keyValueDelim: Delimiter between key and value (default: ':') /// /// # Duplicate Key Handling -/// Uses EXCEPTION behavior (Spark 3.0+ default): errors on duplicate keys. -/// See `spark.sql.mapKeyDedupPolicy`: -/// -/// -/// TODO: Support configurable `spark.sql.mapKeyDedupPolicy` (LAST_WIN) in a follow-up PR. +/// Mirrors Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/v4.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4502-L4511), +/// wired through DataFusion's `datafusion.execution.map_key_dedup_policy`: +/// - `EXCEPTION` (default): error on duplicate keys. +/// - `LAST_WIN`: keep the last occurrence of each duplicate key. #[derive(Debug, PartialEq, Eq, Hash)] pub struct SparkStrToMap { signature: Signature, @@ -102,22 +103,33 @@ impl ScalarUDFImpl for SparkStrToMap { } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let last_value_wins = parse_map_key_dedup_policy( + &args.config_options.execution.map_key_dedup_policy, + )?; let arrays: Vec = ColumnarValue::values_to_arrays(&args.args)?; - let result = str_to_map_inner(&arrays)?; + let result = str_to_map_inner(&arrays, last_value_wins)?; Ok(ColumnarValue::Array(result)) } } -fn str_to_map_inner(args: &[ArrayRef]) -> Result { +fn str_to_map_inner(args: &[ArrayRef], last_value_wins: bool) -> Result { match args.len() { 1 => match args[0].data_type() { - DataType::Utf8 => str_to_map_impl(as_string_array(&args[0])?, None, None), - DataType::LargeUtf8 => { - str_to_map_impl(as_large_string_array(&args[0])?, None, None) - } - DataType::Utf8View => { - str_to_map_impl(as_string_view_array(&args[0])?, None, None) + DataType::Utf8 => { + str_to_map_impl(as_string_array(&args[0])?, None, None, last_value_wins) } + DataType::LargeUtf8 => str_to_map_impl( + as_large_string_array(&args[0])?, + None, + None, + last_value_wins, + ), + DataType::Utf8View => str_to_map_impl( + as_string_view_array(&args[0])?, + None, + None, + last_value_wins, + ), other => exec_err!( "Unsupported data type {other:?} for str_to_map, \ expected Utf8, LargeUtf8, or Utf8View" @@ -128,16 +140,19 @@ fn str_to_map_inner(args: &[ArrayRef]) -> Result { as_string_array(&args[0])?, Some(as_string_array(&args[1])?), None, + last_value_wins, ), (DataType::LargeUtf8, DataType::LargeUtf8) => str_to_map_impl( as_large_string_array(&args[0])?, Some(as_large_string_array(&args[1])?), None, + last_value_wins, ), (DataType::Utf8View, DataType::Utf8View) => str_to_map_impl( as_string_view_array(&args[0])?, Some(as_string_view_array(&args[1])?), None, + last_value_wins, ), (t1, t2) => exec_err!( "Unsupported data types ({t1:?}, {t2:?}) for str_to_map, \ @@ -153,12 +168,14 @@ fn str_to_map_inner(args: &[ArrayRef]) -> Result { as_string_array(&args[0])?, Some(as_string_array(&args[1])?), Some(as_string_array(&args[2])?), + last_value_wins, ), (DataType::LargeUtf8, DataType::LargeUtf8, DataType::LargeUtf8) => { str_to_map_impl( as_large_string_array(&args[0])?, Some(as_large_string_array(&args[1])?), 
Some(as_large_string_array(&args[2])?), + last_value_wins, ) } (DataType::Utf8View, DataType::Utf8View, DataType::Utf8View) => { @@ -166,6 +183,7 @@ fn str_to_map_inner(args: &[ArrayRef]) -> Result { as_string_view_array(&args[0])?, Some(as_string_view_array(&args[1])?), Some(as_string_view_array(&args[2])?), + last_value_wins, ) } (t1, t2, t3) => exec_err!( @@ -181,6 +199,7 @@ fn str_to_map_impl<'a, V: StringArrayType<'a> + Copy>( text_array: V, pair_delim_array: Option, kv_delim_array: Option, + last_value_wins: bool, ) -> Result { let num_rows = text_array.len(); @@ -207,6 +226,10 @@ fn str_to_map_impl<'a, V: StringArrayType<'a> + Copy>( ); let mut seen_keys = HashSet::new(); + // LAST_WIN buffers pairs to support in-place value overwrite at the key's + // first-seen position — matches Spark's `ArrayBasedMapBuilder`. + let mut pairs: Vec<(&str, Option<&str>)> = Vec::new(); + let mut key_positions: HashMap<&str, usize> = HashMap::new(); for row_idx in 0..num_rows { if combined_nulls.as_ref().is_some_and(|n| n.is_null(row_idx)) { map_builder.append(false)?; @@ -227,31 +250,56 @@ fn str_to_map_impl<'a, V: StringArrayType<'a> + Copy>( continue; } - seen_keys.clear(); - for pair in text.split(pair_delim) { - if pair.is_empty() { - continue; + if last_value_wins { + pairs.clear(); + key_positions.clear(); + for pair in text.split(pair_delim) { + if pair.is_empty() { + continue; + } + let mut kv_iter = pair.splitn(2, kv_delim); + let key = kv_iter.next().unwrap_or(""); + let value = kv_iter.next(); + match key_positions.get(key) { + Some(&idx) => pairs[idx].1 = value, + None => { + key_positions.insert(key, pairs.len()); + pairs.push((key, value)); + } + } + } + for (key, value) in &pairs { + map_builder.keys().append_value(key); + match value { + Some(v) => map_builder.values().append_value(v), + None => map_builder.values().append_null(), + } } + } else { + seen_keys.clear(); + for pair in text.split(pair_delim) { + if pair.is_empty() { + continue; + } - let mut kv_iter = pair.splitn(2, kv_delim); - let key = kv_iter.next().unwrap_or(""); - let value = kv_iter.next(); + let mut kv_iter = pair.splitn(2, kv_delim); + let key = kv_iter.next().unwrap_or(""); + let value = kv_iter.next(); - // TODO: Support LAST_WIN policy via spark.sql.mapKeyDedupPolicy config - // EXCEPTION policy: error on duplicate keys (Spark 3.0+ default) - if !seen_keys.insert(key) { - return exec_err!( - "Duplicate map key '{key}' was found, please check the input data. \ - If you want to remove the duplicated keys, you can set \ - spark.sql.mapKeyDedupPolicy to \"LAST_WIN\" so that the key \ - inserted at last takes precedence." - ); - } + if !seen_keys.insert(key) { + return exec_err!( + "[DUPLICATED_MAP_KEY] Duplicate map key '{key}' was found, \ + please check the input data. To allow duplicate keys with \ + last-value-wins semantics, set \ + `datafusion.execution.map_key_dedup_policy` to `LAST_WIN`." 
+ ); + } - map_builder.keys().append_value(key); - match value { - Some(v) => map_builder.values().append_value(v), - None => map_builder.values().append_null(), + map_builder.keys().append_value(key); + match value { + Some(v) => map_builder.values().append_value(v), + None => map_builder.values().append_null(), + } } } map_builder.append(true)?; diff --git a/datafusion/spark/src/function/map/utils.rs b/datafusion/spark/src/function/map/utils.rs index 9ffc99847b644..9967cfee2cb6e 100644 --- a/datafusion/spark/src/function/map/utils.rs +++ b/datafusion/spark/src/function/map/utils.rs @@ -16,15 +16,32 @@ // under the License. use std::borrow::Cow; -use std::collections::HashSet; +use std::collections::HashMap; use std::sync::Arc; -use arrow::array::{Array, ArrayRef, AsArray, BooleanBuilder, MapArray, StructArray}; +use arrow::array::{ + Array, ArrayRef, AsArray, BooleanBuilder, Int32Array, MapArray, StructArray, +}; use arrow::buffer::{NullBuffer, OffsetBuffer}; -use arrow::compute::filter; +use arrow::compute::{filter, take}; use arrow::datatypes::{DataType, Field, Fields}; use datafusion_common::{Result, ScalarValue, exec_err}; +/// Parse a `map_key_dedup_policy` config string into a `last_value_wins` bool. +/// +/// Mirrors Spark's `spark.sql.mapKeyDedupPolicy` (case-insensitive): +/// - `"EXCEPTION"` — raise on any duplicate key (default). +/// - `"LAST_WIN"` — keep the last occurrence of each duplicate key. +pub fn parse_map_key_dedup_policy(policy: &str) -> Result { + match policy.to_ascii_uppercase().as_str() { + "EXCEPTION" => Ok(false), + "LAST_WIN" => Ok(true), + other => exec_err!( + "Unknown map_key_dedup_policy '{other}', expected 'EXCEPTION' or 'LAST_WIN'" + ), + } +} + /// Helper function to get element [`DataType`] /// from [`List`](DataType::List)/[`LargeList`](DataType::LargeList)/[`FixedSizeList`](DataType::FixedSizeList)
/// [`Null`](DataType::Null) can be coerced to `ListType`([`Null`](DataType::Null)), so [`Null`](DataType::Null) is returned
@@ -111,13 +128,13 @@ pub fn map_type_from_key_value_types( /// So the inputs can be [`ListArray`](`arrow::array::ListArray`)/[`LargeListArray`](`arrow::array::LargeListArray`)/[`FixedSizeListArray`](`arrow::array::FixedSizeListArray`)
/// To preserve the row info, [`offsets`](arrow::array::ListArray::offsets) and [`nulls`](arrow::array::ListArray::nulls) for both keys and values need to be provided
/// [`FixedSizeListArray`](`arrow::array::FixedSizeListArray`) has no `offsets`, so they can be generated as a cumulative sum of its `Size`
-/// 2. Spark provides [spark.sql.mapKeyDedupPolicy](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961)
-/// to handle duplicate keys
-/// For now, configurable functions are not supported by DataFusion,
-/// so Spark's default `EXCEPTION` policy is enforced: a duplicate key raises `[DUPLICATED_MAP_KEY]`.
-/// The same behaviour can be expressed externally, at the cost of performance, as:
-/// `when(array_length(array_distinct(keys)) == array_length(keys), constructed_map)`
-/// `.otherwise(raise_error("duplicate keys occurred during map construction"))` +/// 2. Duplicate-key handling mirrors Spark's +/// [spark.sql.mapKeyDedupPolicy](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961) +/// and is driven by `last_value_wins`: +/// - `false` (Spark's default `EXCEPTION`): raise `[DUPLICATED_MAP_KEY]` on any duplicate. +/// - `true` (`LAST_WIN`): keep the last occurrence of each duplicate key. +/// +/// Callers wire this from `datafusion.execution.map_key_dedup_policy`. pub fn map_from_keys_values_offsets_nulls( flat_keys: &ArrayRef, flat_values: &ArrayRef, @@ -125,6 +142,7 @@ pub fn map_from_keys_values_offsets_nulls( values_offsets: &[i32], keys_nulls: Option<&NullBuffer>, values_nulls: Option<&NullBuffer>, + last_value_wins: bool, ) -> Result { let (keys, values, offsets) = map_deduplicate_keys( flat_keys, @@ -133,6 +151,7 @@ pub fn map_from_keys_values_offsets_nulls( values_offsets, keys_nulls, values_nulls, + last_value_wins, )?; let nulls = NullBuffer::union(keys_nulls, values_nulls); @@ -155,6 +174,7 @@ fn map_deduplicate_keys( values_offsets: &[i32], keys_nulls: Option<&NullBuffer>, values_nulls: Option<&NullBuffer>, + last_value_wins: bool, ) -> Result<(ArrayRef, ArrayRef, OffsetBuffer)> { let offsets_len = keys_offsets.len(); let mut new_offsets = Vec::with_capacity(offsets_len); @@ -171,8 +191,14 @@ fn map_deduplicate_keys( let mut new_last_offset = 0; new_offsets.push(new_last_offset); + // Mirror Spark's `ArrayBasedMapBuilder`: the first occurrence of a key + // fixes its position in the output; under LAST_WIN a later duplicate + // overwrites that slot's value. `keys_mask` selects the first-seen keys, + // `value_indices` records the source index in `flat_values` to materialize + // for each output slot (updated in place on overwrite). let mut keys_mask_builder = BooleanBuilder::new(); - let mut values_mask_builder = BooleanBuilder::new(); + let mut value_indices: Vec = Vec::new(); + let mut key_to_output_idx: HashMap = HashMap::new(); for (row_idx, (next_keys_offset, next_values_offset)) in keys_offsets .iter() .zip(values_offsets.iter()) @@ -182,9 +208,6 @@ fn map_deduplicate_keys( let num_keys_entries = *next_keys_offset as usize - cur_keys_offset; let num_values_entries = *next_values_offset as usize - cur_values_offset; - let mut keys_mask_one = vec![false; num_keys_entries]; - let mut values_mask_one = vec![false; num_values_entries]; - let key_is_valid = keys_nulls.is_none_or(|buf| buf.is_valid(row_idx)); let value_is_valid = values_nulls.is_none_or(|buf| buf.is_valid(row_idx)); @@ -193,46 +216,193 @@ fn map_deduplicate_keys( return exec_err!( "map_deduplicate_keys: keys and values lists in the same row must have equal lengths" ); - } else if num_keys_entries != 0 { - let mut seen_keys = HashSet::new(); - - for cur_entry_idx in (0..num_keys_entries).rev() { - let key = ScalarValue::try_from_array( - &flat_keys, - cur_keys_offset + cur_entry_idx, - )? - .compacted(); - // Enforce Spark's default `spark.sql.mapKeyDedupPolicy=EXCEPTION`. - // Native LAST_WIN support is deferred to a follow-up. - if seen_keys.contains(&key) { - return exec_err!( - "[DUPLICATED_MAP_KEY] Duplicate map key {key} was found, \ - please check the input data. DataFusion currently only \ - supports the EXCEPTION policy for duplicate map keys; \ - deduplicate the input before calling this function." 
-                        );
+            }
+            key_to_output_idx.clear();
+            for cur_entry_idx in 0..num_keys_entries {
+                let key = ScalarValue::try_from_array(
+                    &flat_keys,
+                    cur_keys_offset + cur_entry_idx,
+                )?
+                .compacted();
+                let abs_value_idx = (cur_values_offset + cur_entry_idx) as i32;
+
+                if let Some(&output_idx) = key_to_output_idx.get(&key) {
+                    if last_value_wins {
+                        value_indices[output_idx] = abs_value_idx;
+                        keys_mask_builder.append_value(false);
+                        continue;
                     }
-                    keys_mask_one[cur_entry_idx] = true;
-                    values_mask_one[cur_entry_idx] = true;
-                    seen_keys.insert(key);
-                    new_last_offset += 1;
+                    return exec_err!(
+                        "[DUPLICATED_MAP_KEY] Duplicate map key {key} was found, \
+                        please check the input data. To allow duplicate keys with \
+                        last-value-wins semantics, set \
+                        `datafusion.execution.map_key_dedup_policy` to `LAST_WIN`."
+                    );
                 }
+                keys_mask_builder.append_value(true);
+                key_to_output_idx.insert(key, value_indices.len());
+                value_indices.push(abs_value_idx);
+                new_last_offset += 1;
             }
         } else {
-            // the result entry is NULL
-            // both current row offsets are skipped
-            // keys or values in the current row are marked false in the masks
+            // The result entry is NULL — no keys/values emitted. Still pad the
+            // mask so it stays aligned with `flat_keys`.
+            keys_mask_builder.append_n(num_keys_entries, false);
         }
-        keys_mask_builder.append_array(&keys_mask_one.into());
-        values_mask_builder.append_array(&values_mask_one.into());
         new_offsets.push(new_last_offset);
         cur_keys_offset += num_keys_entries;
         cur_values_offset += num_values_entries;
     }
     let keys_mask = keys_mask_builder.finish();
-    let values_mask = values_mask_builder.finish();
     let needed_keys = filter(&flat_keys, &keys_mask)?;
-    let needed_values = filter(&flat_values, &values_mask)?;
+    let value_indices_array = Int32Array::from(value_indices);
+    let needed_values = take(&flat_values, &value_indices_array, None)?;
     let offsets = OffsetBuffer::new(new_offsets.into());
     Ok((needed_keys, needed_values, offsets))
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Int32Array, MapArray, StringArray};
+
+    fn as_map(array: &ArrayRef) -> &MapArray {
+        array.as_any().downcast_ref::<MapArray>().expect("MapArray")
+    }
+
+    fn int32_utf8_inputs(
+        keys: Vec<i32>,
+        values: Vec<Option<&str>>,
+    ) -> (ArrayRef, ArrayRef) {
+        let keys: ArrayRef = Arc::new(Int32Array::from(keys));
+        let values: ArrayRef = Arc::new(StringArray::from(values));
+        (keys, values)
+    }
+
+    #[test]
+    fn parse_policy_accepts_both_values_case_insensitively() {
+        assert!(!parse_map_key_dedup_policy("EXCEPTION").unwrap());
+        assert!(!parse_map_key_dedup_policy("exception").unwrap());
+        assert!(parse_map_key_dedup_policy("LAST_WIN").unwrap());
+        assert!(parse_map_key_dedup_policy("last_win").unwrap());
+    }
+
+    #[test]
+    fn parse_policy_rejects_unknown() {
+        let err = parse_map_key_dedup_policy("BOGUS").unwrap_err().to_string();
+        assert!(err.contains("Unknown map_key_dedup_policy"), "{err}");
+    }
+
+    #[test]
+    fn happy_path_two_rows_no_duplicates() {
+        let (keys, values) =
+            int32_utf8_inputs(vec![1, 2, 3], vec![Some("a"), Some("b"), Some("c")]);
+        let offsets = [0i32, 2, 3];
+
+        let result = map_from_keys_values_offsets_nulls(
+            &keys, &values, &offsets, &offsets, None, None, false,
+        )
+        .unwrap();
+
+        let map = as_map(&result);
+        assert_eq!(map.len(), 2);
+        assert_eq!(map.value_offsets(), &[0, 2, 3]);
+    }
+
+    #[test]
+    fn single_row_duplicate_errors_under_exception() {
+        let (keys, values) =
+            int32_utf8_inputs(vec![1, 2, 1], vec![Some("a"), Some("b"), Some("c")]);
+        let offsets = [0i32, 3];
+
+        let err = map_from_keys_values_offsets_nulls(
+            &keys, &values, &offsets, &offsets, None, None, false,
+        )
+        .unwrap_err()
+        .to_string();
+
+        assert!(err.contains("[DUPLICATED_MAP_KEY]"), "{err}");
+        assert!(err.contains("map_key_dedup_policy"), "{err}");
+    }
+
+    #[test]
+    fn last_win_keeps_final_occurrence() {
+        let (keys, values) = int32_utf8_inputs(
+            vec![1, 2, 1, 3, 2],
+            vec![Some("a"), Some("b"), Some("c"), Some("d"), Some("e")],
+        );
+        let offsets = [0i32, 5];
+
+        let result = map_from_keys_values_offsets_nulls(
+            &keys, &values, &offsets, &offsets, None, None, true,
+        )
+        .unwrap();
+
+        let map = as_map(&result);
+        assert_eq!(map.len(), 1);
+        // 5 entries in, 3 unique keys -> offsets [0, 3]
+        assert_eq!(map.value_offsets(), &[0, 3]);
+    }
+
+    #[test]
+    fn duplicate_in_later_row_still_errors() {
+        let (keys, values) = int32_utf8_inputs(
+            vec![1, 2, 1, 1],
+            vec![Some("a"), Some("b"), Some("x"), Some("y")],
+        );
+        let offsets = [0i32, 2, 4];
+
+        let err = map_from_keys_values_offsets_nulls(
+            &keys, &values, &offsets, &offsets, None, None, false,
+        )
+        .unwrap_err()
+        .to_string();
+
+        assert!(err.contains("[DUPLICATED_MAP_KEY]"), "{err}");
+    }
+
+    #[test]
+    fn empty_row_does_not_trigger_dedup() {
+        let (keys, values) = int32_utf8_inputs(vec![], vec![]);
+        let offsets = [0i32, 0];
+
+        let result = map_from_keys_values_offsets_nulls(
+            &keys, &values, &offsets, &offsets, None, None, false,
+        )
+        .unwrap();
+
+        let map = as_map(&result);
+        assert_eq!(map.len(), 1);
+        assert_eq!(map.value_offsets(), &[0, 0]);
+    }
+
+    #[test]
+    fn null_row_is_skipped_and_not_checked() {
+        // Row 0 is NULL (keys null). Its duplicate keys should be ignored;
+        // row 1 is a clean row.
+        let (keys, values) = int32_utf8_inputs(
+            vec![1, 1, 2, 3],
+            vec![Some("dup-a"), Some("dup-b"), Some("x"), Some("y")],
+        );
+        let offsets = [0i32, 2, 4];
+        let keys_nulls = NullBuffer::from(vec![false, true]);
+
+        let result = map_from_keys_values_offsets_nulls(
+            &keys,
+            &values,
+            &offsets,
+            &offsets,
+            Some(&keys_nulls),
+            None,
+            false,
+        )
+        .unwrap();
+
+        let map = as_map(&result);
+        assert_eq!(map.len(), 2);
+        // First row is NULL (no entries emitted), second row keeps both entries.
+        assert_eq!(map.value_offsets(), &[0, 0, 2]);
+        assert!(map.is_null(0));
+        assert!(!map.is_null(1));
+    }
+}
diff --git a/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt b/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt
index ea9c5cf24f8d3..d34e448395fee 100644
--- a/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt
+++ b/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt
@@ -148,3 +148,40 @@ query ?
 SELECT map_from_arrays(arrow_cast(array('a', 'b', 'c'), 'FixedSizeList(3, Utf8)'), arrow_cast(array(1, 2, 3), 'LargeList(Int32)'));
 ----
 {a: 1, b: 2, c: 3}
+
+# LAST_WIN policy: duplicates are allowed; later occurrences overwrite earlier ones.
+statement ok
+set datafusion.execution.map_key_dedup_policy = 'LAST_WIN';
+
+query ?
+SELECT map_from_arrays(array(1, 2, 1), array('a', 'b', 'c'));
+----
+{1: c, 2: b}
+
+query ?
+SELECT map_from_arrays(array('k', 'k', 'k'), array(1, 2, 3));
+----
+{k: 3}
+
+query ?
+SELECT map_from_arrays(array(true, false, true), array('a', NULL, 'b'));
+----
+{true: b, false: NULL}
+
+# Multi-row mix under LAST_WIN: clean, duplicate, empty and NULL rows all work.
+query ?
+SELECT map_from_arrays(a, b)
+FROM values
+    (array[1, 2], array['a', 'b']),
+    (array[1, 1], array['x', 'y']),
+    (array[], array[]),
+    (NULL, NULL)
+AS tab(a, b);
+----
+{1: a, 2: b}
+{1: y}
+{}
+NULL
+
+statement ok
+set datafusion.execution.map_key_dedup_policy = 'EXCEPTION';
diff --git a/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt b/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt
index 3acf338d2bb34..66f9f07a4b67c 100644
--- a/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt
+++ b/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt
@@ -176,3 +176,42 @@ FROM values
     (array[struct(1, 'a'), struct(2, 'b')]),
     (array[struct(1, 'x'), struct(1, 'y')])
 AS tab(data);
+
+# LAST_WIN policy: duplicates are allowed; later occurrences overwrite earlier ones.
+statement ok
+set datafusion.execution.map_key_dedup_policy = 'LAST_WIN';
+
+query ?
+SELECT map_from_entries(array(
+    struct(true, 'a'),
+    struct(false, 'b'),
+    struct(true, 'c'),
+    struct(false, cast(NULL as string)),
+    struct(true, 'd')
+));
+----
+{true: d, false: NULL}
+
+query ?
+SELECT map_from_entries(array(struct(1, 'a'), struct(2, 'b'), struct(1, 'c')));
+----
+{1: c, 2: b}
+
+query ?
+SELECT map_from_entries(array(struct('k', 1), struct('k', 2), struct('k', 3)));
+----
+{k: 3}
+
+# Multi-row mix under LAST_WIN: clean row + duplicate row both succeed.
+query ?
+SELECT map_from_entries(data)
+FROM values
+    (array[struct(1, 'a'), struct(2, 'b')]),
+    (array[struct(1, 'x'), struct(1, 'y')])
+AS tab(data);
+----
+{1: a, 2: b}
+{1: y}
+
+statement ok
+set datafusion.execution.map_key_dedup_policy = 'EXCEPTION';
diff --git a/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt b/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt
index ee29afef4e579..e2a68f26ace16 100644
--- a/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt
+++ b/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt
@@ -64,7 +64,6 @@ SELECT str_to_map('a=1&b=2&c=3', '&', '=');
 {a: 1, b: 2, c: 3}
 
 # Duplicate keys: EXCEPTION policy (Spark 3.0+ default)
-# TODO: Add LAST_WIN policy tests when spark.sql.mapKeyDedupPolicy config is supported
 statement error Duplicate map key
 SELECT str_to_map('a:1,b:2,a:3');
@@ -126,4 +125,45 @@ SELECT str_to_map(col1, col2, col3) FROM (VALUES ('a=1,b=2', ',', '='), ('x#9',
 ----
 {a: 1, b: 2}
 {x: 9}
-NULL
\ No newline at end of file
+NULL
+
+# LAST_WIN policy: duplicates are allowed; later occurrences overwrite earlier ones.
+statement ok
+set datafusion.execution.map_key_dedup_policy = 'LAST_WIN';
+
+query ?
+SELECT str_to_map('a:1,b:2,a:3');
+----
+{a: 3, b: 2}
+
+query ?
+SELECT str_to_map('a:1,a:2,a:3');
+----
+{a: 3}
+
+# Missing kv_delim: the later occurrence overwrites the value at the key's
+# first-seen position.
+query ?
+SELECT str_to_map('a:1,b:2,a');
+----
+{a: NULL, b: 2}
+
+# Multi-row: both clean and duplicate rows succeed under LAST_WIN.
+query ?
+SELECT str_to_map(col) FROM (VALUES ('a:1,b:2'), ('a:3,a:4')) AS t(col);
+----
+{a: 1, b: 2}
+{a: 4}
+
+statement ok
+set datafusion.execution.map_key_dedup_policy = 'EXCEPTION';
+
+# Invalid policy values raise an error with a clear message.
+statement ok
+set datafusion.execution.map_key_dedup_policy = 'BOGUS';
+
+query error DataFusion error: Execution error: Unknown map_key_dedup_policy 'BOGUS', expected 'EXCEPTION' or 'LAST_WIN'
+SELECT str_to_map('a:1,b:2');
+
+statement ok
+set datafusion.execution.map_key_dedup_policy = 'EXCEPTION';
\ No newline at end of file
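The dedup loop introduced in this patch keeps each key at its first-seen output position and, under LAST_WIN, overwrites the value recorded for that position when a later duplicate arrives. The following standalone sketch shows the same scheme over plain vectors (illustrative only; the patch itself operates on flattened Arrow arrays through `keys_mask` and `value_indices`, and the names used here are hypothetical):

```rust
use std::collections::HashMap;

/// Simplified last-win dedup over (key, value) pairs: the first occurrence
/// of a key fixes its output position; a later duplicate overwrites the
/// value stored at that position, matching Spark's LAST_WIN policy.
fn dedup_last_win(pairs: &[(i32, &str)]) -> Vec<(i32, String)> {
    let mut out: Vec<(i32, String)> = Vec::new();
    let mut seen: HashMap<i32, usize> = HashMap::new();
    for &(key, value) in pairs {
        match seen.get(&key) {
            // Overwrite in place: position stays, value is replaced.
            Some(&idx) => out[idx].1 = value.to_string(),
            None => {
                seen.insert(key, out.len());
                out.push((key, value.to_string()));
            }
        }
    }
    out
}

fn main() {
    // Keys keep their first-seen order; values come from the last
    // occurrence, so the result is [(1, "c"), (2, "b")].
    let deduped = dedup_last_win(&[(1, "a"), (2, "b"), (1, "c")]);
    assert_eq!(deduped, vec![(1, "c".to_string()), (2, "b".to_string())]);
}
```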
From b98720d639c9bbc87e056df2b61aeb38ca186add Mon Sep 17 00:00:00 2001
From: Sudarshan
Date: Thu, 7 May 2026 22:27:56 +0530
Subject: [PATCH 6/9] added MapKeyDedupPolicy enum, dropped
 parse_map_key_dedup_policy

---
 datafusion/common/src/config.rs               | 53 +++++++++++++++++--
 .../spark/src/function/map/map_from_arrays.rs |  7 ++-
 .../src/function/map/map_from_entries.rs      |  8 +--
 .../spark/src/function/map/str_to_map.rs      | 10 ++--
 datafusion/spark/src/function/map/utils.rs    | 43 ++-------------
 .../test_files/information_schema.slt        |  2 +
 .../test_files/spark/map/str_to_map.slt      | 12 ++---
 7 files changed, 71 insertions(+), 64 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index f0b9fd13e203f..bed431685bc3e 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -456,6 +456,53 @@ impl Display for SpillCompression {
     }
 }
 
+/// Policy for handling duplicate keys in Spark-compatible map-construction
+/// functions (`map_from_arrays`, `map_from_entries`, `str_to_map`). Mirrors
+/// Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961).
+#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
+pub enum MapKeyDedupPolicy {
+    /// Raise `[DUPLICATED_MAP_KEY]` at runtime on any duplicate key.
+    #[default]
+    Exception,
+    /// Keep the last occurrence of each duplicate key.
+    LastWin,
+}
+
+impl FromStr for MapKeyDedupPolicy {
+    type Err = DataFusionError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_ascii_uppercase().as_str() {
+            "EXCEPTION" => Ok(Self::Exception),
+            "LAST_WIN" => Ok(Self::LastWin),
+            other => Err(DataFusionError::Configuration(format!(
+                "Invalid MapKeyDedupPolicy: {other}. Expected one of: EXCEPTION, LAST_WIN"
+            ))),
+        }
+    }
+}
+
+impl ConfigField for MapKeyDedupPolicy {
+    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
+        v.some(key, self, description)
+    }
+
+    fn set(&mut self, _: &str, value: &str) -> Result<()> {
+        *self = MapKeyDedupPolicy::from_str(value)?;
+        Ok(())
+    }
+}
+
+impl Display for MapKeyDedupPolicy {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let str = match self {
+            Self::Exception => "EXCEPTION",
+            Self::LastWin => "LAST_WIN",
+        };
+        write!(f, "{str}")
+    }
+}
+
 impl From<SpillCompression> for Option<CompressionType> {
     fn from(c: SpillCompression) -> Self {
         match c {
@@ -697,11 +744,11 @@ config_namespace! {
     ///
     /// Mirrors Spark's
     /// [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961):
-    /// - `"EXCEPTION"` (default): raise `[DUPLICATED_MAP_KEY]` at runtime on any duplicate key.
-    /// - `"LAST_WIN"`: keep the last occurrence of each duplicate key.
+    /// - `EXCEPTION` (default): raise `[DUPLICATED_MAP_KEY]` at runtime on any duplicate key.
+    /// - `LAST_WIN`: keep the last occurrence of each duplicate key.
     ///
     /// Values are case-insensitive. Only affects functions under `datafusion/spark`.
-    pub map_key_dedup_policy: String, default = "EXCEPTION".to_string()
+    pub map_key_dedup_policy: MapKeyDedupPolicy, default = MapKeyDedupPolicy::Exception
 
     /// How many bytes to buffer in the probe side of hash joins while the build side is
     /// concurrently being built.
diff --git a/datafusion/spark/src/function/map/map_from_arrays.rs b/datafusion/spark/src/function/map/map_from_arrays.rs
index 10c57deafaf87..5a6d1c614716b 100644
--- a/datafusion/spark/src/function/map/map_from_arrays.rs
+++ b/datafusion/spark/src/function/map/map_from_arrays.rs
@@ -18,11 +18,11 @@
 use crate::function::map::utils::{
     get_element_type, get_list_offsets, get_list_values,
     map_from_keys_values_offsets_nulls, map_type_from_key_value_types,
-    parse_map_key_dedup_policy,
 };
 use arrow::array::{Array, ArrayRef, NullArray};
 use arrow::compute::kernels::cast;
 use arrow::datatypes::{DataType, Field, FieldRef};
+use datafusion_common::config::MapKeyDedupPolicy;
 use datafusion_common::utils::take_function_args;
 use datafusion_common::{Result, internal_err};
 use datafusion_expr::{
@@ -82,9 +82,8 @@ impl ScalarUDFImpl for MapFromArrays {
     }
 
     fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
-        let last_value_wins = parse_map_key_dedup_policy(
-            &args.config_options.execution.map_key_dedup_policy,
-        )?;
+        let last_value_wins = args.config_options.execution.map_key_dedup_policy
+            == MapKeyDedupPolicy::LastWin;
         make_scalar_function(
             move |args: &[ArrayRef]| map_from_arrays_inner(args, last_value_wins),
             vec![],
diff --git a/datafusion/spark/src/function/map/map_from_entries.rs b/datafusion/spark/src/function/map/map_from_entries.rs
index c6254151ee41c..82bad0da435b8 100644
--- a/datafusion/spark/src/function/map/map_from_entries.rs
+++ b/datafusion/spark/src/function/map/map_from_entries.rs
@@ -19,11 +19,12 @@ use std::sync::Arc;
 
 use crate::function::map::utils::{
     get_list_offsets, get_list_values, map_from_keys_values_offsets_nulls,
-    map_type_from_key_value_types, parse_map_key_dedup_policy,
+    map_type_from_key_value_types,
 };
 use arrow::array::{Array, ArrayRef, NullBufferBuilder, StructArray};
 use arrow::buffer::NullBuffer;
 use arrow::datatypes::{DataType, Field, FieldRef};
+use datafusion_common::config::MapKeyDedupPolicy;
 use datafusion_common::utils::take_function_args;
 use datafusion_common::{Result, exec_err, internal_err};
 use datafusion_expr::{
@@ -101,9 +102,8 @@ impl ScalarUDFImpl for MapFromEntries {
     }
 
     fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
-        let last_value_wins = parse_map_key_dedup_policy(
-            &args.config_options.execution.map_key_dedup_policy,
-        )?;
+        let last_value_wins = args.config_options.execution.map_key_dedup_policy
+            == MapKeyDedupPolicy::LastWin;
         make_scalar_function(
             move |args: &[ArrayRef]| map_from_entries_inner(args, last_value_wins),
             vec![],
diff --git a/datafusion/spark/src/function/map/str_to_map.rs b/datafusion/spark/src/function/map/str_to_map.rs
index da4a3e8b1d759..b47d5c1850361 100644
--- a/datafusion/spark/src/function/map/str_to_map.rs
+++ b/datafusion/spark/src/function/map/str_to_map.rs
@@ -32,9 +32,8 @@ use datafusion_expr::{
     TypeSignature, Volatility,
 };
 
-use crate::function::map::utils::{
-    map_type_from_key_value_types, parse_map_key_dedup_policy,
-};
+use crate::function::map::utils::map_type_from_key_value_types;
+use datafusion_common::config::MapKeyDedupPolicy;
 
 const DEFAULT_PAIR_DELIM: &str = ",";
 const DEFAULT_KV_DELIM: &str = ":";
@@ -103,9 +102,8 @@ impl ScalarUDFImpl for SparkStrToMap {
     }
 
     fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
-        let last_value_wins = parse_map_key_dedup_policy(
-            &args.config_options.execution.map_key_dedup_policy,
-        )?;
+        let last_value_wins = args.config_options.execution.map_key_dedup_policy
+            == MapKeyDedupPolicy::LastWin;
         let arrays: Vec<ArrayRef> = ColumnarValue::values_to_arrays(&args.args)?;
         let result = str_to_map_inner(&arrays, last_value_wins)?;
         Ok(ColumnarValue::Array(result))
diff --git a/datafusion/spark/src/function/map/utils.rs b/datafusion/spark/src/function/map/utils.rs
index 9967cfee2cb6e..899c9f64ca5aa 100644
--- a/datafusion/spark/src/function/map/utils.rs
+++ b/datafusion/spark/src/function/map/utils.rs
@@ -27,21 +27,6 @@
 use arrow::compute::{filter, take};
 use arrow::datatypes::{DataType, Field, Fields};
 use datafusion_common::{Result, ScalarValue, exec_err};
 
-/// Parse a `map_key_dedup_policy` config string into a `last_value_wins` bool.
-///
-/// Mirrors Spark's `spark.sql.mapKeyDedupPolicy` (case-insensitive):
-/// - `"EXCEPTION"` — raise on any duplicate key (default).
-/// - `"LAST_WIN"` — keep the last occurrence of each duplicate key.
-pub fn parse_map_key_dedup_policy(policy: &str) -> Result<bool> {
-    match policy.to_ascii_uppercase().as_str() {
-        "EXCEPTION" => Ok(false),
-        "LAST_WIN" => Ok(true),
-        other => exec_err!(
-            "Unknown map_key_dedup_policy '{other}', expected 'EXCEPTION' or 'LAST_WIN'"
-        ),
-    }
-}
-
 /// Helper function to get element [`DataType`]
 /// from [`List`](DataType::List)/[`LargeList`](DataType::LargeList)/[`FixedSizeList`](DataType::FixedSizeList)
/// [`Null`](DataType::Null) can be coerced to `ListType`([`Null`](DataType::Null)), so [`Null`](DataType::Null) is returned
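Once the mask and value indices are built, the surviving keys and values are materialized with Arrow's `filter` and `take` kernels. A self-contained sketch of that final step for a single row with keys `[1, 2, 1]` under LAST_WIN (the mask and indices are hard-coded here for illustration; in the patch they are derived inside `map_deduplicate_keys`):

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BooleanArray, Int32Array, StringArray};
use arrow::compute::{filter, take};
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    let flat_keys: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 1]));
    let flat_values: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));

    // First-seen keys survive; the duplicate key `1` at index 2 is masked out.
    let keys_mask = BooleanArray::from(vec![true, true, false]);
    // Under LAST_WIN the slot for key `1` points at the *last* value ("c").
    let value_indices = Int32Array::from(vec![2, 1]);

    let needed_keys = filter(&flat_keys, &keys_mask)?;
    let needed_values = take(&flat_values, &value_indices, None)?;

    assert_eq!(needed_keys.len(), 2); // keys [1, 2]
    assert_eq!(needed_values.len(), 2); // values ["c", "b"]
    Ok(())
}
```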
@@ -264,11 +249,7 @@ fn map_deduplicate_keys(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use arrow::array::{Int32Array, MapArray, StringArray};
-
-    fn as_map(array: &ArrayRef) -> &MapArray {
-        array.as_any().downcast_ref::<MapArray>().expect("MapArray")
-    }
+    use arrow::array::{AsArray, Int32Array, StringArray};
 
     fn int32_utf8_inputs(
         keys: Vec<i32>,
         values: Vec<Option<&str>>,
@@ -279,20 +260,6 @@ mod tests {
         (keys, values)
     }
 
-    #[test]
-    fn parse_policy_accepts_both_values_case_insensitively() {
-        assert!(!parse_map_key_dedup_policy("EXCEPTION").unwrap());
-        assert!(!parse_map_key_dedup_policy("exception").unwrap());
-        assert!(parse_map_key_dedup_policy("LAST_WIN").unwrap());
-        assert!(parse_map_key_dedup_policy("last_win").unwrap());
-    }
-
-    #[test]
-    fn parse_policy_rejects_unknown() {
-        let err = parse_map_key_dedup_policy("BOGUS").unwrap_err().to_string();
-        assert!(err.contains("Unknown map_key_dedup_policy"), "{err}");
-    }
-
     #[test]
     fn happy_path_two_rows_no_duplicates() {
         let (keys, values) =
@@ -304,7 +271,7 @@ mod tests {
         )
         .unwrap();
 
-        let map = as_map(&result);
+        let map = result.as_map();
         assert_eq!(map.len(), 2);
         assert_eq!(map.value_offsets(), &[0, 2, 3]);
     }
@@ -338,7 +305,7 @@ mod tests {
         )
         .unwrap();
 
-        let map = as_map(&result);
+        let map = result.as_map();
         assert_eq!(map.len(), 1);
         // 5 entries in, 3 unique keys -> offsets [0, 3]
         assert_eq!(map.value_offsets(), &[0, 3]);
     }
@@ -371,7 +338,7 @@ mod tests {
         )
         .unwrap();
 
-        let map = as_map(&result);
+        let map = result.as_map();
         assert_eq!(map.len(), 1);
         assert_eq!(map.value_offsets(), &[0, 0]);
     }
@@ -398,7 +365,7 @@ mod tests {
         )
         .unwrap();
 
-        let map = as_map(&result);
+        let map = result.as_map();
         assert_eq!(map.len(), 2);
         // First row is NULL (no entries emitted), second row keeps both entries.
         assert_eq!(map.value_offsets(), &[0, 0, 2]);
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index b04c78bd2774c..b950708eae608 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -224,6 +224,7 @@ datafusion.execution.hash_join_buffering_capacity 0
 datafusion.execution.keep_partition_by_columns false
 datafusion.execution.listing_table_factory_infer_partitions true
 datafusion.execution.listing_table_ignore_subdirectory true
+datafusion.execution.map_key_dedup_policy EXCEPTION
 datafusion.execution.max_buffered_batches_per_output_file 2
 datafusion.execution.max_spill_file_size_bytes 134217728
 datafusion.execution.meta_fetch_concurrency 32
@@ -371,6 +372,7 @@ datafusion.execution.hash_join_buffering_capacity 0 How many bytes to buffer in
 datafusion.execution.keep_partition_by_columns false Should DataFusion keep the columns used for partition_by in the output RecordBatches
 datafusion.execution.listing_table_factory_infer_partitions true Should a `ListingTable` created through the `ListingTableFactory` infer table partitions from Hive compliant directories. Defaults to true (partition columns are inferred and will be represented in the table schema).
 datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`).
+datafusion.execution.map_key_dedup_policy EXCEPTION Policy for handling duplicate keys in Spark-compatible map-construction functions (`map_from_arrays`, `map_from_entries`, `str_to_map`). Mirrors Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961): - `EXCEPTION` (default): raise `[DUPLICATED_MAP_KEY]` at runtime on any duplicate key. - `LAST_WIN`: keep the last occurrence of each duplicate key. Values are case-insensitive. Only affects functions under `datafusion/spark`.
 datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption
 datafusion.execution.max_spill_file_size_bytes 134217728 Maximum size in bytes for individual spill files before rotating to a new file. When operators spill data to disk (e.g., RepartitionExec), they write multiple batches to the same file until this size limit is reached, then rotate to a new file. This reduces syscall overhead compared to one-file-per-batch while preventing files from growing too large. A larger value reduces file creation overhead but may hold more disk space. A smaller value creates more files but allows finer-grained space reclamation as files can be deleted once fully consumed. Now only `RepartitionExec` supports this spill file rotation feature, other spilling operators may create spill files larger than the limit. Default: 128 MB
 datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics
diff --git a/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt b/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt
index e2a68f26ace16..aeddc368c3a37 100644
--- a/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt
+++ b/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt
@@ -158,12 +158,6 @@ SELECT str_to_map(col) FROM (VALUES ('a:1,b:2'), ('a:3,a:4')) AS t(col);
 statement ok
 set datafusion.execution.map_key_dedup_policy = 'EXCEPTION';
 
-# Invalid policy values raise an error with a clear message.
-statement ok
-set datafusion.execution.map_key_dedup_policy = 'BOGUS';
-
-query error DataFusion error: Execution error: Unknown map_key_dedup_policy 'BOGUS', expected 'EXCEPTION' or 'LAST_WIN'
-SELECT str_to_map('a:1,b:2');
-
-statement ok
-set datafusion.execution.map_key_dedup_policy = 'EXCEPTION';
\ No newline at end of file
+# Invalid policy values are rejected at SET time with a clear message.
+statement error DataFusion error: Invalid or Unsupported Configuration: Invalid MapKeyDedupPolicy: BOGUS\. Expected one of: EXCEPTION, LAST_WIN
+set datafusion.execution.map_key_dedup_policy = 'BOGUS';
\ No newline at end of file
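The new enum centralizes parsing and rendering of the policy. A quick sketch of its contract, assuming the `datafusion_common` build from this patch:

```rust
use std::str::FromStr;

use datafusion_common::config::MapKeyDedupPolicy;

fn main() {
    // Parsing is case-insensitive.
    assert_eq!(
        MapKeyDedupPolicy::from_str("last_win").unwrap(),
        MapKeyDedupPolicy::LastWin
    );
    // Display renders the canonical form shown by `SHOW ALL`.
    assert_eq!(MapKeyDedupPolicy::Exception.to_string(), "EXCEPTION");
    // Unknown values are rejected with a configuration error.
    assert!(MapKeyDedupPolicy::from_str("BOGUS").is_err());
}
```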
From ebb1557df7928d0928a0d4ddb50c746eea08 Mon Sep 17 00:00:00 2001
From: Sudarshan
Date: Thu, 7 May 2026 23:10:59 +0530
Subject: [PATCH 7/9] added map_key_dedup_policy in config docs

---
 docs/source/user-guide/configs.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 46039f3c99c27..89d7664026335 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -135,6 +135,7 @@ The following configuration settings are available:
 | datafusion.execution.enforce_batch_size_in_joins | false | Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. |
 | datafusion.execution.objectstore_writer_buffer_size | 10485760 | Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point. |
 | datafusion.execution.enable_ansi_mode | false | Whether to enable ANSI SQL mode. The flag is experimental and relevant only for DataFusion Spark built-in functions When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL semantics for expressions, casting, and error handling. This means: - **Strict type coercion rules:** implicit casts between incompatible types are disallowed. - **Standard SQL arithmetic behavior:** operations such as division by zero, numeric overflow, or invalid casts raise runtime errors rather than returning `NULL` or adjusted values. - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling. When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive, non-ANSI mode designed for user convenience and backward compatibility. In this mode: - Implicit casts between types are allowed (e.g., string to integer when possible). - Arithmetic operations are more lenient — for example, `abs()` on the minimum representable integer value returns the input value instead of raising overflow. - Division by zero or invalid casts may return `NULL` instead of failing. # Default `false` — ANSI SQL mode is disabled by default. |
+| datafusion.execution.map_key_dedup_policy | EXCEPTION | Policy for handling duplicate keys in Spark-compatible map-construction functions (`map_from_arrays`, `map_from_entries`, `str_to_map`). Mirrors Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961): - `EXCEPTION` (default): raise `[DUPLICATED_MAP_KEY]` at runtime on any duplicate key. - `LAST_WIN`: keep the last occurrence of each duplicate key. Values are case-insensitive. Only affects functions under `datafusion/spark`. |
 | datafusion.execution.hash_join_buffering_capacity | 0 | How many bytes to buffer in the probe side of hash joins while the build side is concurrently being built. Without this, hash joins will wait until the full materialization of the build side before polling the probe side. This is useful in scenarios where the query is not completely CPU bounded, allowing to do some early work concurrently and reducing the latency of the query. Note that when hash join buffering is enabled, the probe side will start eagerly polling data, not giving time for the producer side of dynamic filters to produce any meaningful predicate. Queries with dynamic filters might see performance degradation. Disabled by default, set to a number greater than 0 for enabling it. |
 | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. |
 | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
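For context, this is how the `invoke_with_args` implementations in the previous patch derive their `last_value_wins` flag from a session-level `ConfigOptions`. A sketch only: `options` stands in for `args.config_options`, and at this point in the series the key still lives under `datafusion.execution` (the next patch relocates it):

```rust
use datafusion_common::config::{ConfigOptions, MapKeyDedupPolicy};

fn main() -> datafusion_common::Result<()> {
    let mut options = ConfigOptions::new();
    options.set("datafusion.execution.map_key_dedup_policy", "LAST_WIN")?;

    // The comparison the UDFs perform before building the map.
    let last_value_wins =
        options.execution.map_key_dedup_policy == MapKeyDedupPolicy::LastWin;
    assert!(last_value_wins);
    Ok(())
}
```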
From e0146c08598e4e11f1b2222415218a185236f8bd Mon Sep 17 00:00:00 2001
From: Sudarshan
Date: Thu, 7 May 2026 23:43:49 +0530
Subject: [PATCH 8/9] changed map_key_dedup_policy to namespace

---
 datafusion/common/src/config.rs               | 34 +++++++++++++------
 .../spark/src/function/map/map_from_arrays.rs |  2 +-
 .../src/function/map/map_from_entries.rs      |  2 +-
 .../spark/src/function/map/str_to_map.rs      |  6 ++--
 datafusion/spark/src/function/map/utils.rs    |  4 +--
 .../test_files/information_schema.slt        |  4 +--
 .../test_files/spark/map/map_from_arrays.slt |  4 +--
 .../test_files/spark/map/map_from_entries.slt |  4 +--
 .../test_files/spark/map/str_to_map.slt      |  6 ++--
 docs/source/user-guide/configs.md             |  2 +-
 10 files changed, 40 insertions(+), 28 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index bed431685bc3e..d143ee228f122 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -739,17 +744,11 @@ config_namespace! {
     /// `false` — ANSI SQL mode is disabled by default.
     pub enable_ansi_mode: bool, default = false
 
-    /// Policy for handling duplicate keys in Spark-compatible map-construction
-    /// functions (`map_from_arrays`, `map_from_entries`, `str_to_map`).
-    ///
-    /// Mirrors Spark's
-    /// [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961):
-    /// - `EXCEPTION` (default): raise `[DUPLICATED_MAP_KEY]` at runtime on any duplicate key.
-    /// - `LAST_WIN`: keep the last occurrence of each duplicate key.
-    ///
-    /// Values are case-insensitive. Only affects functions under `datafusion/spark`.
-    pub map_key_dedup_policy: MapKeyDedupPolicy, default = MapKeyDedupPolicy::Exception
-
     /// How many bytes to buffer in the probe side of hash joins while the build side is
     /// concurrently being built.
@@ -1519,6 +1508,24 @@ impl<'a> TryFrom<&'a FormatOptions> for arrow::util::display::FormatOptions<'a>
     }
 }
 
+config_namespace! {
+    /// Options controlling DataFusion's Spark-compatibility layer (functions
+    /// under `datafusion/spark`). Keys here mirror their `spark.sql.*`
+    /// equivalents in Apache Spark.
+    pub struct SparkOptions {
+        /// Policy for handling duplicate keys in Spark-compatible map-construction
+        /// functions (`map_from_arrays`, `map_from_entries`, `str_to_map`).
+        ///
+        /// Mirrors Spark's
+        /// [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961):
+        /// - `EXCEPTION` (default): raise `[DUPLICATED_MAP_KEY]` at runtime on any duplicate key.
+        /// - `LAST_WIN`: keep the last occurrence of each duplicate key.
+        ///
+        /// Values are case-insensitive.
+        pub map_key_dedup_policy: MapKeyDedupPolicy, default = MapKeyDedupPolicy::Exception
+    }
+}
+
 /// A key value pair, with a corresponding description
 #[derive(Debug, Clone, Hash, PartialEq, Eq)]
 pub struct ConfigEntry {
@@ -1550,6 +1557,8 @@ pub struct ConfigOptions {
     pub extensions: Extensions,
     /// Formatting options when printing batches
     pub format: FormatOptions,
+    /// Spark-compatibility options (functions under `datafusion/spark`)
+    pub spark: SparkOptions,
 }
 
 impl ConfigField for ConfigOptions {
@@ -1560,6 +1569,7 @@ impl ConfigField for ConfigOptions {
         self.explain.visit(v, "datafusion.explain", "");
         self.sql_parser.visit(v, "datafusion.sql_parser", "");
         self.format.visit(v, "datafusion.format", "");
+        self.spark.visit(v, "datafusion.spark", "");
     }
 
     fn set(&mut self, key: &str, value: &str) -> Result<()> {
@@ -1572,6 +1582,7 @@ impl ConfigField for ConfigOptions {
             "explain" => self.explain.set(rem, value),
             "sql_parser" => self.sql_parser.set(rem, value),
             "format" => self.format.set(rem, value),
+            "spark" => self.spark.set(rem, value),
             _ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
         }
     }
@@ -1611,6 +1622,7 @@ impl ConfigField for ConfigOptions {
             "explain" => self.explain.reset(rem),
             "sql_parser" => self.sql_parser.reset(rem),
             "format" => self.format.reset(rem),
+            "spark" => self.spark.reset(rem),
             other => _config_err!("Config value \"{other}\" not found on ConfigOptions"),
         }
     }
diff --git a/datafusion/spark/src/function/map/map_from_arrays.rs b/datafusion/spark/src/function/map/map_from_arrays.rs
index 5a6d1c614716b..48b9dc55938ea 100644
--- a/datafusion/spark/src/function/map/map_from_arrays.rs
+++ b/datafusion/spark/src/function/map/map_from_arrays.rs
@@ -82,7 +82,7 @@ impl ScalarUDFImpl for MapFromArrays {
     }
 
     fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
-        let last_value_wins = args.config_options.execution.map_key_dedup_policy
+        let last_value_wins = args.config_options.spark.map_key_dedup_policy
             == MapKeyDedupPolicy::LastWin;
         make_scalar_function(
             move |args: &[ArrayRef]| map_from_arrays_inner(args, last_value_wins),
diff --git a/datafusion/spark/src/function/map/map_from_entries.rs b/datafusion/spark/src/function/map/map_from_entries.rs
index 82bad0da435b8..cc4dd1677110d 100644
--- a/datafusion/spark/src/function/map/map_from_entries.rs
+++ b/datafusion/spark/src/function/map/map_from_entries.rs
@@ -102,7 +102,7 @@ impl ScalarUDFImpl for MapFromEntries {
     }
 
     fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
-        let last_value_wins = args.config_options.execution.map_key_dedup_policy
+        let last_value_wins = args.config_options.spark.map_key_dedup_policy
             == MapKeyDedupPolicy::LastWin;
         make_scalar_function(
             move |args: &[ArrayRef]| map_from_entries_inner(args, last_value_wins),
diff --git a/datafusion/spark/src/function/map/str_to_map.rs b/datafusion/spark/src/function/map/str_to_map.rs
index b47d5c1850361..e0edf69cf5a9b 100644
--- a/datafusion/spark/src/function/map/str_to_map.rs
+++ b/datafusion/spark/src/function/map/str_to_map.rs
@@ -50,7 +50,7 @@ const DEFAULT_KV_DELIM: &str = ":";
 
 /// # Duplicate Key Handling
 /// Mirrors Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/v4.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4502-L4511),
-/// wired through DataFusion's `datafusion.execution.map_key_dedup_policy`:
+/// wired through DataFusion's `datafusion.spark.map_key_dedup_policy`:
 /// - `EXCEPTION` (default): error on duplicate keys.
 /// - `LAST_WIN`: keep the last occurrence of each duplicate key.
 #[derive(Debug, PartialEq, Eq, Hash)]
@@ -102,7 +102,7 @@ impl ScalarUDFImpl for SparkStrToMap {
     }
 
     fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
-        let last_value_wins = args.config_options.execution.map_key_dedup_policy
+        let last_value_wins = args.config_options.spark.map_key_dedup_policy
             == MapKeyDedupPolicy::LastWin;
         let arrays: Vec<ArrayRef> = ColumnarValue::values_to_arrays(&args.args)?;
         let result = str_to_map_inner(&arrays, last_value_wins)?;
         Ok(ColumnarValue::Array(result))
@@ -289,7 +289,7 @@ fn str_to_map_impl<'a, V: StringArrayType<'a> + Copy>(
                 "[DUPLICATED_MAP_KEY] Duplicate map key '{key}' was found, \
                 please check the input data. To allow duplicate keys with \
                 last-value-wins semantics, set \
-                `datafusion.execution.map_key_dedup_policy` to `LAST_WIN`."
+                `datafusion.spark.map_key_dedup_policy` to `LAST_WIN`."
             );
         }
diff --git a/datafusion/spark/src/function/map/utils.rs b/datafusion/spark/src/function/map/utils.rs
index 899c9f64ca5aa..fa6b2a960dabb 100644
--- a/datafusion/spark/src/function/map/utils.rs
+++ b/datafusion/spark/src/function/map/utils.rs
@@ -119,7 +119,7 @@ pub fn map_type_from_key_value_types(
 /// - `false` (Spark's default `EXCEPTION`): raise `[DUPLICATED_MAP_KEY]` on any duplicate.
 /// - `true` (`LAST_WIN`): keep the last occurrence of each duplicate key.
 ///
-/// Callers wire this from `datafusion.execution.map_key_dedup_policy`.
+/// Callers wire this from `datafusion.spark.map_key_dedup_policy`.
 pub fn map_from_keys_values_offsets_nulls(
     flat_keys: &ArrayRef,
     flat_values: &ArrayRef,
@@ -221,7 +221,7 @@ fn map_deduplicate_keys(
                     "[DUPLICATED_MAP_KEY] Duplicate map key {key} was found, \
                     please check the input data. To allow duplicate keys with \
                     last-value-wins semantics, set \
-                    `datafusion.execution.map_key_dedup_policy` to `LAST_WIN`."
+                    `datafusion.spark.map_key_dedup_policy` to `LAST_WIN`."
                     );
                 }
                 keys_mask_builder.append_value(true);
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index b950708eae608..5a758062fe0c2 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -224,7 +224,6 @@ datafusion.execution.hash_join_buffering_capacity 0
 datafusion.execution.keep_partition_by_columns false
 datafusion.execution.listing_table_factory_infer_partitions true
 datafusion.execution.listing_table_ignore_subdirectory true
-datafusion.execution.map_key_dedup_policy EXCEPTION
 datafusion.execution.max_buffered_batches_per_output_file 2
 datafusion.execution.max_spill_file_size_bytes 134217728
 datafusion.execution.meta_fetch_concurrency 32
@@ -339,6 +338,7 @@ datafusion.runtime.max_temp_directory_size 100G
 datafusion.runtime.memory_limit unlimited
 datafusion.runtime.metadata_cache_limit 50M
 datafusion.runtime.temp_directory NULL
+datafusion.spark.map_key_dedup_policy EXCEPTION
 datafusion.sql_parser.collect_spans false
 datafusion.sql_parser.default_null_ordering nulls_max
 datafusion.sql_parser.dialect generic
@@ -372,7 +372,6 @@ datafusion.execution.hash_join_buffering_capacity 0 How many bytes to buffer in
 datafusion.execution.keep_partition_by_columns false Should DataFusion keep the columns used for partition_by in the output RecordBatches
 datafusion.execution.listing_table_factory_infer_partitions true Should a `ListingTable` created through the `ListingTableFactory` infer table partitions from Hive compliant directories. Defaults to true (partition columns are inferred and will be represented in the table schema).
 datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`).
-datafusion.execution.map_key_dedup_policy EXCEPTION Policy for handling duplicate keys in Spark-compatible map-construction functions (`map_from_arrays`, `map_from_entries`, `str_to_map`). Mirrors Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961): - `EXCEPTION` (default): raise `[DUPLICATED_MAP_KEY]` at runtime on any duplicate key. - `LAST_WIN`: keep the last occurrence of each duplicate key. Values are case-insensitive. Only affects functions under `datafusion/spark`.
 datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption
 datafusion.execution.max_spill_file_size_bytes 134217728 Maximum size in bytes for individual spill files before rotating to a new file. When operators spill data to disk (e.g., RepartitionExec), they write multiple batches to the same file until this size limit is reached, then rotate to a new file. This reduces syscall overhead compared to one-file-per-batch while preventing files from growing too large. A larger value reduces file creation overhead but may hold more disk space. A smaller value creates more files but allows finer-grained space reclamation as files can be deleted once fully consumed. Now only `RepartitionExec` supports this spill file rotation feature, other spilling operators may create spill files larger than the limit. Default: 128 MB
 datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics
@@ -487,6 +486,7 @@ datafusion.runtime.max_temp_directory_size 100G Maximum temporary file directory
 datafusion.runtime.memory_limit unlimited Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.
 datafusion.runtime.metadata_cache_limit 50M Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.
 datafusion.runtime.temp_directory NULL The path to the temporary file directory.
+datafusion.spark.map_key_dedup_policy EXCEPTION Policy for handling duplicate keys in Spark-compatible map-construction functions (`map_from_arrays`, `map_from_entries`, `str_to_map`). Mirrors Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961): - `EXCEPTION` (default): raise `[DUPLICATED_MAP_KEY]` at runtime on any duplicate key. - `LAST_WIN`: keep the last occurrence of each duplicate key. Values are case-insensitive.
 datafusion.sql_parser.collect_spans false When set to true, the source locations relative to the original SQL query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected and recorded in the logical plan nodes.
 datafusion.sql_parser.default_null_ordering nulls_max Specifies the default null ordering for query results. There are 4 options: - `nulls_max`: Nulls appear last in ascending order. - `nulls_min`: Nulls appear first in ascending order. - `nulls_first`: Nulls always be first in any order. - `nulls_last`: Nulls always be last in any order. By default, `nulls_max` is used to follow Postgres's behavior. postgres rule:
 datafusion.sql_parser.dialect generic Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.
diff --git a/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt b/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt
index d34e448395fee..7e501a31628e1 100644
--- a/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt
+++ b/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt
@@ -151,7 +151,7 @@ SELECT map_from_arrays(arrow_cast(array('a', 'b', 'c'), 'FixedSizeList(3, Utf8)'
 
 # LAST_WIN policy: duplicates are allowed; later occurrences overwrite earlier ones.
 statement ok
-set datafusion.execution.map_key_dedup_policy = 'LAST_WIN';
+set datafusion.spark.map_key_dedup_policy = 'LAST_WIN';
 
 query ?
 SELECT map_from_arrays(array(1, 2, 1), array('a', 'b', 'c'));
@@ -184,4 +184,4 @@ AS tab(a, b);
 NULL
 
 statement ok
-set datafusion.execution.map_key_dedup_policy = 'EXCEPTION';
+set datafusion.spark.map_key_dedup_policy = 'EXCEPTION';
diff --git a/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt b/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt
index 66f9f07a4b67c..21f41f5ad976b 100644
--- a/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt
+++ b/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt
@@ -179,7 +179,7 @@ AS tab(data);
 
 # LAST_WIN policy: duplicates are allowed; later occurrences overwrite earlier ones.
 statement ok
-set datafusion.execution.map_key_dedup_policy = 'LAST_WIN';
+set datafusion.spark.map_key_dedup_policy = 'LAST_WIN';
 
 query ?
 SELECT map_from_entries(array(
@@ -214,4 +214,4 @@ AS tab(data);
 {1: y}
 
 statement ok
-set datafusion.execution.map_key_dedup_policy = 'EXCEPTION';
+set datafusion.spark.map_key_dedup_policy = 'EXCEPTION';
diff --git a/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt b/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt
index aeddc368c3a37..68d856d8545ae 100644
--- a/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt
+++ b/datafusion/sqllogictest/test_files/spark/map/str_to_map.slt
@@ -129,7 +129,7 @@ NULL
 
 # LAST_WIN policy: duplicates are allowed; later occurrences overwrite earlier ones.
 statement ok
-set datafusion.execution.map_key_dedup_policy = 'LAST_WIN';
+set datafusion.spark.map_key_dedup_policy = 'LAST_WIN';
 
 query ?
 SELECT str_to_map('a:1,b:2,a:3');
@@ -156,8 +156,8 @@ SELECT str_to_map(col) FROM (VALUES ('a:1,b:2'), ('a:3,a:4')) AS t(col);
 {a: 4}
 
 statement ok
-set datafusion.execution.map_key_dedup_policy = 'EXCEPTION';
+set datafusion.spark.map_key_dedup_policy = 'EXCEPTION';
 
 # Invalid policy values are rejected at SET time with a clear message.
 statement error DataFusion error: Invalid or Unsupported Configuration: Invalid MapKeyDedupPolicy: BOGUS\. Expected one of: EXCEPTION, LAST_WIN
-set datafusion.execution.map_key_dedup_policy = 'BOGUS';
\ No newline at end of file
+set datafusion.spark.map_key_dedup_policy = 'BOGUS';
\ No newline at end of file
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 89d7664026335..58eaffb9bf9f3 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -135,7 +135,6 @@ The following configuration settings are available:
 | datafusion.execution.enforce_batch_size_in_joins | false | Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. |
 | datafusion.execution.objectstore_writer_buffer_size | 10485760 | Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point. |
 | datafusion.execution.enable_ansi_mode | false | Whether to enable ANSI SQL mode. The flag is experimental and relevant only for DataFusion Spark built-in functions When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL semantics for expressions, casting, and error handling. This means: - **Strict type coercion rules:** implicit casts between incompatible types are disallowed. - **Standard SQL arithmetic behavior:** operations such as division by zero, numeric overflow, or invalid casts raise runtime errors rather than returning `NULL` or adjusted values. - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling. When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive, non-ANSI mode designed for user convenience and backward compatibility. In this mode: - Implicit casts between types are allowed (e.g., string to integer when possible). - Arithmetic operations are more lenient — for example, `abs()` on the minimum representable integer value returns the input value instead of raising overflow. - Division by zero or invalid casts may return `NULL` instead of failing. # Default `false` — ANSI SQL mode is disabled by default. |
-| datafusion.execution.map_key_dedup_policy | EXCEPTION | Policy for handling duplicate keys in Spark-compatible map-construction functions (`map_from_arrays`, `map_from_entries`, `str_to_map`). Mirrors Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961): - `EXCEPTION` (default): raise `[DUPLICATED_MAP_KEY]` at runtime on any duplicate key. - `LAST_WIN`: keep the last occurrence of each duplicate key. Values are case-insensitive. Only affects functions under `datafusion/spark`. |
 | datafusion.execution.hash_join_buffering_capacity | 0 | How many bytes to buffer in the probe side of hash joins while the build side is concurrently being built. Without this, hash joins will wait until the full materialization of the build side before polling the probe side. This is useful in scenarios where the query is not completely CPU bounded, allowing to do some early work concurrently and reducing the latency of the query. Note that when hash join buffering is enabled, the probe side will start eagerly polling data, not giving time for the producer side of dynamic filters to produce any meaningful predicate. Queries with dynamic filters might see performance degradation. Disabled by default, set to a number greater than 0 for enabling it. |
 | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. |
 | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
@@ -202,6 +201,7 @@ The following configuration settings are available:
 | datafusion.format.time_format | %H:%M:%S%.f | Time format for time arrays |
 | datafusion.format.duration_format | pretty | Duration format. Can be either `"pretty"` or `"ISO8601"` |
 | datafusion.format.types_info | false | Show types in visual representation batches |
+| datafusion.spark.map_key_dedup_policy | EXCEPTION | Policy for handling duplicate keys in Spark-compatible map-construction functions (`map_from_arrays`, `map_from_entries`, `str_to_map`). Mirrors Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961): - `EXCEPTION` (default): raise `[DUPLICATED_MAP_KEY]` at runtime on any duplicate key. - `LAST_WIN`: keep the last occurrence of each duplicate key. Values are case-insensitive. |
 
 You can also reset configuration options to default settings via SQL using the `RESET` command. For example, to set and reset `datafusion.execution.batch_size`:
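After this patch the option is reached through the new `spark` namespace, and bad values fail when the option is set rather than when a map function later runs. A sketch, assuming the patched `datafusion_common`:

```rust
use datafusion_common::config::{ConfigOptions, MapKeyDedupPolicy};

fn main() -> datafusion_common::Result<()> {
    let mut options = ConfigOptions::new();

    // The key now lives under `datafusion.spark`, matching spark.sql.* names.
    options.set("datafusion.spark.map_key_dedup_policy", "last_win")?;
    assert_eq!(options.spark.map_key_dedup_policy, MapKeyDedupPolicy::LastWin);

    // Rejected at SET time with a configuration error.
    assert!(options
        .set("datafusion.spark.map_key_dedup_policy", "BOGUS")
        .is_err());
    Ok(())
}
```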
From b76e97113348fb43d715b25d98154a4073c872e7 Mon Sep 17 00:00:00 2001
From: Sudarshan
Date: Thu, 7 May 2026 23:51:16 +0530
Subject: [PATCH 9/9] fixed formatting

---
 datafusion/spark/src/function/map/map_from_arrays.rs  | 4 ++--
 datafusion/spark/src/function/map/map_from_entries.rs | 4 ++--
 datafusion/spark/src/function/map/str_to_map.rs       | 4 ++--
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/datafusion/spark/src/function/map/map_from_arrays.rs b/datafusion/spark/src/function/map/map_from_arrays.rs
index 48b9dc55938ea..92dea2720fbfc 100644
--- a/datafusion/spark/src/function/map/map_from_arrays.rs
+++ b/datafusion/spark/src/function/map/map_from_arrays.rs
@@ -82,8 +82,8 @@ impl ScalarUDFImpl for MapFromArrays {
     }
 
     fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
-        let last_value_wins = args.config_options.spark.map_key_dedup_policy
-            == MapKeyDedupPolicy::LastWin;
+        let last_value_wins =
+            args.config_options.spark.map_key_dedup_policy == MapKeyDedupPolicy::LastWin;
         make_scalar_function(
             move |args: &[ArrayRef]| map_from_arrays_inner(args, last_value_wins),
             vec![],
diff --git a/datafusion/spark/src/function/map/map_from_entries.rs b/datafusion/spark/src/function/map/map_from_entries.rs
index cc4dd1677110d..69ce352694bd1 100644
--- a/datafusion/spark/src/function/map/map_from_entries.rs
+++ b/datafusion/spark/src/function/map/map_from_entries.rs
@@ -102,8 +102,8 @@ impl ScalarUDFImpl for MapFromEntries {
     }
 
     fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
-        let last_value_wins = args.config_options.spark.map_key_dedup_policy
-            == MapKeyDedupPolicy::LastWin;
+        let last_value_wins =
+            args.config_options.spark.map_key_dedup_policy == MapKeyDedupPolicy::LastWin;
         make_scalar_function(
             move |args: &[ArrayRef]| map_from_entries_inner(args, last_value_wins),
             vec![],
diff --git a/datafusion/spark/src/function/map/str_to_map.rs b/datafusion/spark/src/function/map/str_to_map.rs
index e0edf69cf5a9b..8f4feb130bcb0 100644
--- a/datafusion/spark/src/function/map/str_to_map.rs
+++ b/datafusion/spark/src/function/map/str_to_map.rs
@@ -102,8 +102,8 @@ impl ScalarUDFImpl for SparkStrToMap {
     }
 
     fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
-        let last_value_wins = args.config_options.spark.map_key_dedup_policy
-            == MapKeyDedupPolicy::LastWin;
+        let last_value_wins =
+            args.config_options.spark.map_key_dedup_policy == MapKeyDedupPolicy::LastWin;
         let arrays: Vec<ArrayRef> = ColumnarValue::values_to_arrays(&args.args)?;
         let result = str_to_map_inner(&arrays, last_value_wins)?;
         Ok(ColumnarValue::Array(result))
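Taken together, the series lets a Spark-style session opt into last-value-wins maps. An end-to-end sketch (assumes the `datafusion` and `tokio` crates, and that the Spark-compatible functions from `datafusion-spark` have already been registered with the context, which this snippet does not show):

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    // `ctx` must have `map_from_arrays` etc. registered; how that is done
    // is outside this patch series.
    let ctx = SessionContext::new();

    ctx.sql("SET datafusion.spark.map_key_dedup_policy = 'LAST_WIN'")
        .await?
        .collect()
        .await?;

    // With LAST_WIN, the duplicate key 1 keeps the last value: {1: c, 2: b}.
    let df = ctx
        .sql("SELECT map_from_arrays(array(1, 2, 1), array('a', 'b', 'c'))")
        .await?;
    df.show().await?;
    Ok(())
}
```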