Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
cc7d2cc
added support for MapFromEntries
athlcode Apr 18, 2026
28ab5d2
Merge branch 'main' into support/MapFromEntries
athlcode Apr 18, 2026
ba3c933
removed MapKeyDedupPolicy
athlcode Apr 18, 2026
bc2296f
Merge branch 'support/MapFromEntries' of https://github.com/KrishnaSu…
athlcode Apr 18, 2026
85bd989
Merge branch 'main' of https://github.com/apache/datafusion into supp…
athlcode Apr 19, 2026
b2a2102
changed error message
athlcode Apr 19, 2026
db7dddb
revert unwanted formatting, added tests
athlcode Apr 19, 2026
1b35bc3
Merge branch 'main' into support/MapFromEntries
athlcode Apr 20, 2026
8858003
added mapKeyDedupPolicy, LAST_WIN for MapFromEntries
athlcode Apr 27, 2026
ba4e1b2
Merge branch 'support/MapFromEntries' of https://github.com/KrishnaSu…
athlcode Apr 27, 2026
233792f
Merge branch 'main' of https://github.com/apache/datafusion into supp…
athlcode Apr 27, 2026
539b906
Merge branch 'main' into support/MapFromEntries
athlcode Apr 29, 2026
9e2a8c0
Merge branch 'main' of https://github.com/apache/datafusion into supp…
athlcode May 7, 2026
3b7b6d2
Merge branch 'support/MapFromEntries' of https://github.com/KrishnaSu…
athlcode May 7, 2026
b98720d
added MapKeyDedupPolicy enum, dropped parse_map_key_dedup_policy
athlcode May 7, 2026
ebb1557
added map_key_dedup_policy in config docs
athlcode May 7, 2026
e0146c0
changed map_key_dedup_policy to namespace
athlcode May 7, 2026
e14eb59
Merge branch 'main' of https://github.com/apache/datafusion into supp…
athlcode May 7, 2026
b76e971
fixed formatting
athlcode May 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,53 @@ impl Display for SpillCompression {
}
}

/// How Spark-compatible map-building functions (`map_from_arrays`,
/// `map_from_entries`, `str_to_map`) react to duplicate map keys. Mirrors
/// Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum MapKeyDedupPolicy {
    /// Fail with a `[DUPLICATED_MAP_KEY]` runtime error as soon as any
    /// duplicate key is seen (matches Spark's default behavior).
    #[default]
    Exception,
    /// Silently deduplicate: the value paired with the last occurrence of a
    /// key takes precedence.
    LastWin,
}

impl FromStr for MapKeyDedupPolicy {
    type Err = DataFusionError;

    /// Parses a policy name case-insensitively: `EXCEPTION` or `LAST_WIN`.
    ///
    /// Any other value yields a `DataFusionError::Configuration` whose
    /// message echoes the (upper-cased) rejected input.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let normalized = s.to_ascii_uppercase();
        if normalized == "EXCEPTION" {
            Ok(Self::Exception)
        } else if normalized == "LAST_WIN" {
            Ok(Self::LastWin)
        } else {
            Err(DataFusionError::Configuration(format!(
                "Invalid MapKeyDedupPolicy: {normalized}. Expected one of: EXCEPTION, LAST_WIN"
            )))
        }
    }
}

impl ConfigField for MapKeyDedupPolicy {
    // Report this leaf value to the visitor under its full config key.
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    // Assign from a string; delegates validation to the `FromStr` impl
    // (case-insensitive, rejects unknown names). The key is ignored because
    // this is a leaf field.
    fn set(&mut self, _: &str, value: &str) -> Result<()> {
        *self = value.parse()?;
        Ok(())
    }
}

impl Display for MapKeyDedupPolicy {
    /// Renders the policy using its canonical Spark spelling
    /// (`EXCEPTION` / `LAST_WIN`), the inverse of the `FromStr` impl.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Exception => "EXCEPTION",
            Self::LastWin => "LAST_WIN",
        })
    }
}

impl From<SpillCompression> for Option<CompressionType> {
fn from(c: SpillCompression) -> Self {
match c {
Expand Down Expand Up @@ -1461,6 +1508,24 @@ impl<'a> TryFrom<&'a FormatOptions> for arrow::util::display::FormatOptions<'a>
}
}

config_namespace! {
    /// Options controlling DataFusion's Spark-compatibility layer (functions
    /// under `datafusion/spark`). Keys here mirror their `spark.sql.*`
    /// equivalents in Apache Spark.
    pub struct SparkOptions {
        /// Policy for handling duplicate keys in Spark-compatible map-construction
        /// functions (`map_from_arrays`, `map_from_entries`, `str_to_map`).
        ///
        /// Mirrors Spark's
        /// [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961):
        /// - `EXCEPTION` (default): raise `[DUPLICATED_MAP_KEY]` at runtime on any duplicate key.
        /// - `LAST_WIN`: keep the last occurrence of each duplicate key.
        ///
        /// Values are case-insensitive.
        ///
        /// Exposed to users as `datafusion.spark.map_key_dedup_policy` (this
        /// namespace is registered under `datafusion.spark` on `ConfigOptions`).
        pub map_key_dedup_policy: MapKeyDedupPolicy, default = MapKeyDedupPolicy::Exception
    }
}

/// A key value pair, with a corresponding description
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct ConfigEntry {
Expand Down Expand Up @@ -1492,6 +1557,8 @@ pub struct ConfigOptions {
pub extensions: Extensions,
/// Formatting options when printing batches
pub format: FormatOptions,
/// Spark-compatibility options (functions under `datafusion/spark`)
pub spark: SparkOptions,
}

impl ConfigField for ConfigOptions {
Expand All @@ -1502,6 +1569,7 @@ impl ConfigField for ConfigOptions {
self.explain.visit(v, "datafusion.explain", "");
self.sql_parser.visit(v, "datafusion.sql_parser", "");
self.format.visit(v, "datafusion.format", "");
self.spark.visit(v, "datafusion.spark", "");
}

fn set(&mut self, key: &str, value: &str) -> Result<()> {
Expand All @@ -1514,6 +1582,7 @@ impl ConfigField for ConfigOptions {
"explain" => self.explain.set(rem, value),
"sql_parser" => self.sql_parser.set(rem, value),
"format" => self.format.set(rem, value),
"spark" => self.spark.set(rem, value),
_ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
}
}
Expand Down Expand Up @@ -1553,6 +1622,7 @@ impl ConfigField for ConfigOptions {
"explain" => self.explain.reset(rem),
"sql_parser" => self.sql_parser.reset(rem),
"format" => self.format.reset(rem),
"spark" => self.spark.reset(rem),
other => _config_err!("Config value \"{other}\" not found on ConfigOptions"),
}
}
Expand Down
11 changes: 9 additions & 2 deletions datafusion/spark/src/function/map/map_from_arrays.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::function::map::utils::{
use arrow::array::{Array, ArrayRef, NullArray};
use arrow::compute::kernels::cast;
use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::config::MapKeyDedupPolicy;
use datafusion_common::utils::take_function_args;
use datafusion_common::{Result, internal_err};
use datafusion_expr::{
Expand Down Expand Up @@ -81,11 +82,16 @@ impl ScalarUDFImpl for MapFromArrays {
}

fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
make_scalar_function(map_from_arrays_inner, vec![])(&args.args)
let last_value_wins =
args.config_options.spark.map_key_dedup_policy == MapKeyDedupPolicy::LastWin;
make_scalar_function(
move |args: &[ArrayRef]| map_from_arrays_inner(args, last_value_wins),
vec![],
)(&args.args)
}
}

fn map_from_arrays_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
fn map_from_arrays_inner(args: &[ArrayRef], last_value_wins: bool) -> Result<ArrayRef> {
let [keys, values] = take_function_args("map_from_arrays", args)?;

if *keys.data_type() == DataType::Null || *values.data_type() == DataType::Null {
Expand All @@ -105,6 +111,7 @@ fn map_from_arrays_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
&get_list_offsets(values)?,
keys.nulls(),
values.nulls(),
last_value_wins,
)
}

Expand Down
11 changes: 9 additions & 2 deletions datafusion/spark/src/function/map/map_from_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::function::map::utils::{
use arrow::array::{Array, ArrayRef, NullBufferBuilder, StructArray};
use arrow::buffer::NullBuffer;
use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::config::MapKeyDedupPolicy;
use datafusion_common::utils::take_function_args;
use datafusion_common::{Result, exec_err, internal_err};
use datafusion_expr::{
Expand Down Expand Up @@ -101,11 +102,16 @@ impl ScalarUDFImpl for MapFromEntries {
}

fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
make_scalar_function(map_from_entries_inner, vec![])(&args.args)
let last_value_wins =
args.config_options.spark.map_key_dedup_policy == MapKeyDedupPolicy::LastWin;
make_scalar_function(
move |args: &[ArrayRef]| map_from_entries_inner(args, last_value_wins),
vec![],
)(&args.args)
}
}

fn map_from_entries_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
fn map_from_entries_inner(args: &[ArrayRef], last_value_wins: bool) -> Result<ArrayRef> {
let [entries] = take_function_args("map_from_entries", args)?;
let entries_offsets = get_list_offsets(entries)?;
let entries_values = get_list_values(entries)?;
Expand Down Expand Up @@ -148,6 +154,7 @@ fn map_from_entries_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
&entries_offsets,
None,
res_nulls.as_ref(),
last_value_wins,
)
}

Expand Down
116 changes: 81 additions & 35 deletions datafusion/spark/src/function/map/str_to_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use arrow::array::{
Expand All @@ -33,6 +33,7 @@ use datafusion_expr::{
};

use crate::function::map::utils::map_type_from_key_value_types;
use datafusion_common::config::MapKeyDedupPolicy;

const DEFAULT_PAIR_DELIM: &str = ",";
const DEFAULT_KV_DELIM: &str = ":";
Expand All @@ -48,11 +49,10 @@ const DEFAULT_KV_DELIM: &str = ":";
/// - keyValueDelim: Delimiter between key and value (default: ':')
///
/// # Duplicate Key Handling
/// Uses EXCEPTION behavior (Spark 3.0+ default): errors on duplicate keys.
/// See `spark.sql.mapKeyDedupPolicy`:
/// <https://github.com/apache/spark/blob/v4.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4502-L4511>
///
/// TODO: Support configurable `spark.sql.mapKeyDedupPolicy` (LAST_WIN) in a follow-up PR.
/// Mirrors Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/v4.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4502-L4511),
/// wired through DataFusion's `datafusion.spark.map_key_dedup_policy`:
/// - `EXCEPTION` (default): error on duplicate keys.
/// - `LAST_WIN`: keep the last occurrence of each duplicate key.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkStrToMap {
signature: Signature,
Expand Down Expand Up @@ -102,22 +102,32 @@ impl ScalarUDFImpl for SparkStrToMap {
}

fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
let last_value_wins =
args.config_options.spark.map_key_dedup_policy == MapKeyDedupPolicy::LastWin;
let arrays: Vec<ArrayRef> = ColumnarValue::values_to_arrays(&args.args)?;
let result = str_to_map_inner(&arrays)?;
let result = str_to_map_inner(&arrays, last_value_wins)?;
Ok(ColumnarValue::Array(result))
}
}

fn str_to_map_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
fn str_to_map_inner(args: &[ArrayRef], last_value_wins: bool) -> Result<ArrayRef> {
match args.len() {
1 => match args[0].data_type() {
DataType::Utf8 => str_to_map_impl(as_string_array(&args[0])?, None, None),
DataType::LargeUtf8 => {
str_to_map_impl(as_large_string_array(&args[0])?, None, None)
}
DataType::Utf8View => {
str_to_map_impl(as_string_view_array(&args[0])?, None, None)
DataType::Utf8 => {
str_to_map_impl(as_string_array(&args[0])?, None, None, last_value_wins)
}
DataType::LargeUtf8 => str_to_map_impl(
as_large_string_array(&args[0])?,
None,
None,
last_value_wins,
),
DataType::Utf8View => str_to_map_impl(
as_string_view_array(&args[0])?,
None,
None,
last_value_wins,
),
other => exec_err!(
"Unsupported data type {other:?} for str_to_map, \
expected Utf8, LargeUtf8, or Utf8View"
Expand All @@ -128,16 +138,19 @@ fn str_to_map_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
as_string_array(&args[0])?,
Some(as_string_array(&args[1])?),
None,
last_value_wins,
),
(DataType::LargeUtf8, DataType::LargeUtf8) => str_to_map_impl(
as_large_string_array(&args[0])?,
Some(as_large_string_array(&args[1])?),
None,
last_value_wins,
),
(DataType::Utf8View, DataType::Utf8View) => str_to_map_impl(
as_string_view_array(&args[0])?,
Some(as_string_view_array(&args[1])?),
None,
last_value_wins,
),
(t1, t2) => exec_err!(
"Unsupported data types ({t1:?}, {t2:?}) for str_to_map, \
Expand All @@ -153,19 +166,22 @@ fn str_to_map_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
as_string_array(&args[0])?,
Some(as_string_array(&args[1])?),
Some(as_string_array(&args[2])?),
last_value_wins,
),
(DataType::LargeUtf8, DataType::LargeUtf8, DataType::LargeUtf8) => {
str_to_map_impl(
as_large_string_array(&args[0])?,
Some(as_large_string_array(&args[1])?),
Some(as_large_string_array(&args[2])?),
last_value_wins,
)
}
(DataType::Utf8View, DataType::Utf8View, DataType::Utf8View) => {
str_to_map_impl(
as_string_view_array(&args[0])?,
Some(as_string_view_array(&args[1])?),
Some(as_string_view_array(&args[2])?),
last_value_wins,
)
}
(t1, t2, t3) => exec_err!(
Expand All @@ -181,6 +197,7 @@ fn str_to_map_impl<'a, V: StringArrayType<'a> + Copy>(
text_array: V,
pair_delim_array: Option<V>,
kv_delim_array: Option<V>,
last_value_wins: bool,
) -> Result<ArrayRef> {
let num_rows = text_array.len();

Expand All @@ -207,6 +224,10 @@ fn str_to_map_impl<'a, V: StringArrayType<'a> + Copy>(
);

let mut seen_keys = HashSet::new();
// LAST_WIN buffers pairs to support in-place value overwrite at the key's
// first-seen position — matches Spark's `ArrayBasedMapBuilder`.
let mut pairs: Vec<(&str, Option<&str>)> = Vec::new();
let mut key_positions: HashMap<&str, usize> = HashMap::new();
for row_idx in 0..num_rows {
if combined_nulls.as_ref().is_some_and(|n| n.is_null(row_idx)) {
map_builder.append(false)?;
Expand All @@ -227,31 +248,56 @@ fn str_to_map_impl<'a, V: StringArrayType<'a> + Copy>(
continue;
}

seen_keys.clear();
for pair in text.split(pair_delim) {
if pair.is_empty() {
continue;
if last_value_wins {
pairs.clear();
key_positions.clear();
for pair in text.split(pair_delim) {
if pair.is_empty() {
continue;
}
let mut kv_iter = pair.splitn(2, kv_delim);
let key = kv_iter.next().unwrap_or("");
let value = kv_iter.next();
match key_positions.get(key) {
Some(&idx) => pairs[idx].1 = value,
None => {
key_positions.insert(key, pairs.len());
pairs.push((key, value));
}
}
}
for (key, value) in &pairs {
map_builder.keys().append_value(key);
match value {
Some(v) => map_builder.values().append_value(v),
None => map_builder.values().append_null(),
}
}
} else {
seen_keys.clear();
for pair in text.split(pair_delim) {
if pair.is_empty() {
continue;
}

let mut kv_iter = pair.splitn(2, kv_delim);
let key = kv_iter.next().unwrap_or("");
let value = kv_iter.next();
let mut kv_iter = pair.splitn(2, kv_delim);
let key = kv_iter.next().unwrap_or("");
let value = kv_iter.next();

// TODO: Support LAST_WIN policy via spark.sql.mapKeyDedupPolicy config
// EXCEPTION policy: error on duplicate keys (Spark 3.0+ default)
if !seen_keys.insert(key) {
return exec_err!(
"Duplicate map key '{key}' was found, please check the input data. \
If you want to remove the duplicated keys, you can set \
spark.sql.mapKeyDedupPolicy to \"LAST_WIN\" so that the key \
inserted at last takes precedence."
);
}
if !seen_keys.insert(key) {
return exec_err!(
"[DUPLICATED_MAP_KEY] Duplicate map key '{key}' was found, \
please check the input data. To allow duplicate keys with \
last-value-wins semantics, set \
`datafusion.spark.map_key_dedup_policy` to `LAST_WIN`."
);
}

map_builder.keys().append_value(key);
match value {
Some(v) => map_builder.values().append_value(v),
None => map_builder.values().append_null(),
map_builder.keys().append_value(key);
match value {
Some(v) => map_builder.values().append_value(v),
None => map_builder.values().append_null(),
}
}
}
map_builder.append(true)?;
Expand Down
Loading
Loading