diff --git a/datafusion/functions/benches/replace.rs b/datafusion/functions/benches/replace.rs index 7ad198995a028..b117968bad039 100644 --- a/datafusion/functions/benches/replace.rs +++ b/datafusion/functions/benches/replace.rs @@ -162,6 +162,36 @@ fn criterion_benchmark(c: &mut Criterion) { } } + // Empty-`from` path: insert `to` between every char of the input and at + // both ends. + if size == 1024 { + for &str_len in &[32_usize, 128] { + let args = create_args::(size, str_len, false, 0, 3, 0.0); + group.bench_function( + format!("replace_string_empty_from [size={size}, str_len={str_len}]"), + |b| { + b.iter(|| { + let args_cloned = args.clone(); + black_box(invoke_replace_with_args(args_cloned, size)) + }) + }, + ); + + let args = create_args::(size, str_len, true, 0, 3, 0.0); + group.bench_function( + format!( + "replace_string_view_empty_from [size={size}, str_len={str_len}]" + ), + |b| { + b.iter(|| { + let args_cloned = args.clone(); + black_box(invoke_replace_with_args(args_cloned, size)) + }) + }, + ); + } + } + group.finish(); } } diff --git a/datafusion/functions/src/string/replace.rs b/datafusion/functions/src/string/replace.rs index b980baeb9975e..769727999ea05 100644 --- a/datafusion/functions/src/string/replace.rs +++ b/datafusion/functions/src/string/replace.rs @@ -21,7 +21,9 @@ use arrow::array::{Array, ArrayRef, OffsetSizeTrait}; use arrow::buffer::NullBuffer; use arrow::datatypes::DataType; -use crate::strings::GenericStringArrayBuilder; +use crate::strings::{ + BulkNullStringArrayBuilder, GenericStringArrayBuilder, StringWriter, +}; use crate::utils::{make_scalar_function, utf8_to_str_type}; use datafusion_common::cast::{as_generic_string_array, as_string_view_array}; use datafusion_common::types::logical_string; @@ -164,7 +166,6 @@ fn replace_view(args: &[ArrayRef]) -> Result { let len = string_array.len(); let mut builder = GenericStringArrayBuilder::::with_capacity(len, 0); - let mut buffer = String::new(); let nulls = 
NullBuffer::union_many([ string_array.nulls(), from_array.nulls(), @@ -184,9 +185,7 @@ fn replace_view(args: &[ArrayRef]) -> Result { let string = unsafe { string_array.value_unchecked(i) }; let from = unsafe { from_array.value_unchecked(i) }; let to = unsafe { to_array.value_unchecked(i) }; - buffer.clear(); - replace_into_string(&mut buffer, string, from, to); - builder.append_value(&buffer); + apply_replace(&mut builder, string, from, to); } } else { for i in 0..len { @@ -194,9 +193,7 @@ fn replace_view(args: &[ArrayRef]) -> Result { let string = unsafe { string_array.value_unchecked(i) }; let from = unsafe { from_array.value_unchecked(i) }; let to = unsafe { to_array.value_unchecked(i) }; - buffer.clear(); - replace_into_string(&mut buffer, string, from, to); - builder.append_value(&buffer); + apply_replace(&mut builder, string, from, to); } } @@ -212,7 +209,6 @@ fn replace(args: &[ArrayRef]) -> Result { let len = string_array.len(); let mut builder = GenericStringArrayBuilder::::with_capacity(len, 0); - let mut buffer = String::new(); let nulls = NullBuffer::union_many([ string_array.nulls(), from_array.nulls(), @@ -232,9 +228,7 @@ fn replace(args: &[ArrayRef]) -> Result { let string = unsafe { string_array.value_unchecked(i) }; let from = unsafe { from_array.value_unchecked(i) }; let to = unsafe { to_array.value_unchecked(i) }; - buffer.clear(); - replace_into_string(&mut buffer, string, from, to); - builder.append_value(&buffer); + apply_replace(&mut builder, string, from, to); } } else { for i in 0..len { @@ -242,61 +236,67 @@ fn replace(args: &[ArrayRef]) -> Result { let string = unsafe { string_array.value_unchecked(i) }; let from = unsafe { from_array.value_unchecked(i) }; let to = unsafe { to_array.value_unchecked(i) }; - buffer.clear(); - replace_into_string(&mut buffer, string, from, to); - builder.append_value(&buffer); + apply_replace(&mut builder, string, from, to); } } Ok(Arc::new(builder.finish(nulls)?) 
as ArrayRef) } -/// Helper function to perform string replacement into a reusable String buffer #[inline] -fn replace_into_string(buffer: &mut String, string: &str, from: &str, to: &str) { - if from.is_empty() { - // When from is empty, insert 'to' at the beginning, between each character, and at the end - // This matches the behavior of str::replace() - buffer.push_str(to); - for ch in string.chars() { - buffer.push(ch); - buffer.push_str(to); - } - return; - } - - // Fast path for replacing a single ASCII character with another single ASCII character. - // Extends the buffer's underlying Vec directly, for performance. - if let ([from_byte], [to_byte]) = (from.as_bytes(), to.as_bytes()) +fn apply_replace( + builder: &mut B, + string: &str, + from: &str, + to: &str, +) { + // Hot path: single ASCII byte → single ASCII byte. An ASCII byte (< 0x80) + // cannot appear inside a multi-byte UTF-8 sequence, so any multi-byte + // sequences in `string` pass through unchanged and output stays valid + // UTF-8. + if let (&[from_byte], &[to_byte]) = (from.as_bytes(), to.as_bytes()) && from_byte.is_ascii() && to_byte.is_ascii() { - // SAFETY: Replacing an ASCII byte with another ASCII byte preserves UTF-8 validity. + // SAFETY: see the contract above. unsafe { - buffer.as_mut_vec().extend( - string - .as_bytes() - .iter() - .map(|&b| if b == *from_byte { *to_byte } else { b }), - ); + builder.append_byte_map(string.as_bytes(), |b| { + if b == from_byte { to_byte } else { b } + }); } return; } + if from.is_empty() { + // Empty `from`: insert `to` before each character and at both ends. 
+ builder.append_with(|w| { + w.write_str(to); + for ch in string.chars() { + w.write_char(ch); + w.write_str(to); + } + }); + return; + } + + builder.append_with(|w| replace_into_writer(w, string, from, to)); +} + +#[inline] +fn replace_into_writer(w: &mut W, string: &str, from: &str, to: &str) { let mut last_end = 0; for (start, _part) in string.match_indices(from) { - buffer.push_str(&string[last_end..start]); - buffer.push_str(to); + w.write_str(&string[last_end..start]); + w.write_str(to); last_end = start + from.len(); } - buffer.push_str(&string[last_end..]); + w.write_str(&string[last_end..]); } #[cfg(test)] mod tests { use super::*; use crate::utils::test::test_function; - use arrow::array::Array; use arrow::array::LargeStringArray; use arrow::array::StringArray; use arrow::datatypes::DataType::{LargeUtf8, Utf8}; diff --git a/datafusion/functions/src/strings.rs b/datafusion/functions/src/strings.rs index 872539854a9eb..1d02def4765cc 100644 --- a/datafusion/functions/src/strings.rs +++ b/datafusion/functions/src/strings.rs @@ -432,20 +432,20 @@ impl ConcatLargeStringBuilder { // Bulk-nulls builders // // These builders are similar to Arrow's `GenericStringBuilder` and -// `StringViewBuilder`, except that callers must pass the NULL bitmap to -// `finish()`, rather than maintaining it iteratively (per-row). For callers -// that can compute the NULL bitmap in bulk (which is true of many -// string-related UDFs), this can be significantly more efficient. +// `StringViewBuilder` but tuned for string UDFs along two axes: // -// For a row known to be null, call `append_placeholder` to advance the row -// count without touching the value buffer; the caller MUST ensure that the -// corresponding bit is cleared (0 = null) in the null buffer passed to -// `finish`. +// * Bulk-NULL handling. The NULL bitmap is passed to `finish()` rather than +// maintained per-row. Many string UDFs can compute the bitmap in bulk, +// where this is significantly more efficient. 
+// * Closure-based row emission. Beyond `append_value(&str)`, the builders +// expose `append_with` (fragments written into the builder via a +// `StringWriter`) and `append_byte_map` (byte-to-byte mapping of an input +// slice), letting UDFs emit a row without first assembling it in a scratch +// `String`. // ---------------------------------------------------------------------------- -/// Builder for a [`GenericStringArray`] that defers null tracking to -/// `finish`. Instantiate with `O = i32` for [`StringArray`] (Utf8) or -/// `O = i64` for [`LargeStringArray`] (LargeUtf8). +/// Builder for a [`GenericStringArray`]. Instantiate with `O = i32` for +/// [`StringArray`] (Utf8) or `O = i64` for [`LargeStringArray`] (LargeUtf8). pub(crate) struct GenericStringArrayBuilder { offsets_buffer: MutableBuffer, value_buffer: MutableBuffer, @@ -470,7 +470,7 @@ impl GenericStringArrayBuilder { } } - /// Append `value` as the next row. + /// See [`BulkNullStringArrayBuilder::append_value`]. /// /// # Panics /// @@ -483,8 +483,7 @@ impl GenericStringArrayBuilder { self.offsets_buffer.push(next_offset); } - /// Append an empty placeholder row. The corresponding slot must be masked - /// as null by the null buffer passed to `finish`. + /// See [`BulkNullStringArrayBuilder::append_placeholder`]. #[inline] pub fn append_placeholder(&mut self) { let next_offset = @@ -493,6 +492,43 @@ impl GenericStringArrayBuilder { self.placeholder_count += 1; } + /// See [`BulkNullStringArrayBuilder::append_byte_map`]. + /// + /// # Safety + /// + /// The bytes produced by applying `map` to each byte of `src`, in order, + /// must form valid UTF-8. + /// + /// # Panics + /// + /// Panics if the cumulative byte length exceeds `O::MAX`. 
+ #[inline] + pub unsafe fn append_byte_map u8>(&mut self, src: &[u8], mut map: F) { + self.value_buffer.extend(src.iter().map(|&b| map(b))); + let next_offset = + O::from_usize(self.value_buffer.len()).expect("byte array offset overflow"); + self.offsets_buffer.push(next_offset); + } + + /// See [`BulkNullStringArrayBuilder::append_with`]. + /// + /// # Panics + /// + /// Panics if the cumulative byte length exceeds `O::MAX`. + #[inline] + pub fn append_with(&mut self, f: F) + where + F: FnOnce(&mut GenericStringWriter<'_>), + { + let mut writer = GenericStringWriter { + value_buffer: &mut self.value_buffer, + }; + f(&mut writer); + let next_offset = + O::from_usize(self.value_buffer.len()).expect("byte array offset overflow"); + self.offsets_buffer.push(next_offset); + } + /// Finalize into a [`GenericStringArray`] using the caller-supplied /// null buffer. /// @@ -538,10 +574,81 @@ pub(crate) const STRING_VIEW_INIT_BLOCK_SIZE: u32 = 8 * 1024; /// grows to; matches Arrow's `GenericByteViewBuilder` default. pub(crate) const STRING_VIEW_MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024; -/// Builder for a [`StringViewArray`] that defers null tracking to `finish`. +/// Append-only writer handed to closures passed to `append_with`. +pub(crate) trait StringWriter { + fn write_str(&mut self, s: &str); + fn write_char(&mut self, c: char); +} + +/// [`StringWriter`] for [`GenericStringArrayBuilder`]. Writes go straight to +/// the value buffer. +pub(crate) struct GenericStringWriter<'a> { + value_buffer: &'a mut MutableBuffer, +} + +impl StringWriter for GenericStringWriter<'_> { + #[inline(always)] + fn write_str(&mut self, s: &str) { + push_bytes_to_mutable_buffer(self.value_buffer, s.as_bytes()); + } + + #[inline(always)] + fn write_char(&mut self, c: char) { + push_char_to_mutable_buffer(self.value_buffer, c); + } +} + +/// Write `bytes` into `value_buffer`. 
For repeated small writes, +/// MutableBuffer::extend_from_slice can be slow (memcpy per call), so we extend +/// the buffer here directly and force inlining. +#[inline(always)] +fn push_bytes_to_mutable_buffer(value_buffer: &mut MutableBuffer, bytes: &[u8]) { + let n = bytes.len(); + let old_len = value_buffer.len(); + value_buffer.reserve(n); + + // SAFETY: we reserved `n` bytes; `value_buffer` is borrowed exclusively + // (`&mut`), so the shared `bytes` slice cannot overlap the destination. + unsafe { + let dst = value_buffer.as_mut_ptr().add(old_len); + let src = bytes.as_ptr(); + match n { + 0 => {} + 1 => std::ptr::copy_nonoverlapping(src, dst, 1), + 2 => std::ptr::copy_nonoverlapping(src, dst, 2), + 3 => std::ptr::copy_nonoverlapping(src, dst, 3), + 4 => std::ptr::copy_nonoverlapping(src, dst, 4), + 5 => std::ptr::copy_nonoverlapping(src, dst, 5), + 6 => std::ptr::copy_nonoverlapping(src, dst, 6), + 7 => std::ptr::copy_nonoverlapping(src, dst, 7), + 8 => std::ptr::copy_nonoverlapping(src, dst, 8), + _ => std::ptr::copy_nonoverlapping(src, dst, n), + } + value_buffer.set_len(old_len + n); + } +} + +#[inline(always)] +fn push_char_to_mutable_buffer(value_buffer: &mut MutableBuffer, c: char) { + let len = c.len_utf8(); + let old_len = value_buffer.len(); + value_buffer.reserve(len); + + // SAFETY: we reserved `len` bytes above, write valid UTF-8 into those + // bytes, then update the initialized length to include them. + unsafe { + let dst = value_buffer.as_mut_ptr().add(old_len); + if len == 1 { + *dst = c as u8; + } else { + c.encode_utf8(std::slice::from_raw_parts_mut(dst, len)); + } + value_buffer.set_len(old_len + len); + } +} + +/// Builder for a [`StringViewArray`]. /// -/// Modeled on Arrow's [`arrow::array::builder::StringViewBuilder`] but -/// without per-row [`arrow::array::builder::NullBufferBuilder`] maintenance. /// Short strings (≤ 12 bytes) are inlined into the view itself; long strings /// are appended into an in-progress data block. 
When the in-progress block /// fills up it is flushed into `completed` and a new block — double the size @@ -573,7 +680,7 @@ impl StringViewArrayBuilder { self.block_size } - /// Append `value` as the next row. + /// See [`BulkNullStringArrayBuilder::append_value`]. /// /// # Panics /// @@ -599,27 +706,13 @@ impl StringViewArrayBuilder { self.in_progress.reserve(to_reserve); } - let buffer_index: u32 = i32::try_from(self.completed.len()) - .expect("buffer count exceeds i32::MAX") - as u32; let offset: u32 = i32::try_from(self.in_progress.len()) .expect("offset exceeds i32::MAX") as u32; self.in_progress.extend_from_slice(v); - - // Build the ByteView inline rather than going through `make_view`, - // which is marked as `[inline(never)]`. - let view = ByteView { - length, - // SAFETY: length > 12 here, so v has at least 4 bytes. - prefix: u32::from_le_bytes(v[0..4].try_into().unwrap()), - buffer_index, - offset, - }; - self.views.push(view.into()); + self.views.push(self.make_long_view(length, offset, v)); } - /// Append an empty placeholder row. The corresponding slot must be - /// masked as null by the null buffer passed to `finish`. + /// See [`BulkNullStringArrayBuilder::append_placeholder`]. #[inline] pub fn append_placeholder(&mut self) { // Zero-length inline view — `length` field is 0, no buffer ref. @@ -627,6 +720,126 @@ impl StringViewArrayBuilder { self.placeholder_count += 1; } + /// Ensure the in-progress block has room for `length` more bytes, + /// flushing the current block and starting a new (doubled) one if not. + /// Caller must invoke this only when no bytes of the current row are + /// yet in `in_progress` — flushing mid-row would orphan partial data. 
+ #[inline] + fn ensure_long_capacity(&mut self, length: u32) { + let required_cap = self.in_progress.len() + length as usize; + if self.in_progress.capacity() < required_cap { + self.flush_in_progress(); + let to_reserve = (length as usize).max(self.next_block_size() as usize); + self.in_progress.reserve(to_reserve); + } + } + + /// Encode a long-form view referencing `length` bytes already written + /// into the in-progress block at `offset`. `prefix_bytes` is the row's + /// data slice (or any slice starting with the row's first 4 bytes). + /// + /// Built inline rather than going through Arrow's `make_view`: that + /// function is `#[inline(never)]` and has to handle short strings, so + /// building the view here ourselves is faster. + #[inline] + fn make_long_view(&self, length: u32, offset: u32, prefix_bytes: &[u8]) -> u128 { + let buffer_index: u32 = i32::try_from(self.completed.len()) + .expect("buffer count exceeds i32::MAX") + as u32; + ByteView { + length, + // length > 12, so prefix_bytes has at least 4 bytes. + prefix: u32::from_le_bytes(prefix_bytes[..4].try_into().unwrap()), + buffer_index, + offset, + } + .into() + } + + /// See [`BulkNullStringArrayBuilder::append_byte_map`]. + /// + /// # Safety + /// + /// The bytes produced by applying `map` to each byte of `src`, in order, + /// must form valid UTF-8. + /// + /// # Panics + /// + /// Panics under the same conditions as [`Self::append_value`]: if + /// `src.len()`, the in-progress buffer offset, or the number of completed + /// buffers exceeds `i32::MAX`. 
+ #[inline] + pub unsafe fn append_byte_map u8>(&mut self, src: &[u8], mut map: F) { + let length: u32 = + i32::try_from(src.len()).expect("value length exceeds i32::MAX") as u32; + if length <= 12 { + let mut bytes = [0u8; 12]; + for (d, &b) in bytes[..src.len()].iter_mut().zip(src) { + *d = map(b); + } + self.views.push(make_view(&bytes[..src.len()], 0, 0)); + return; + } + + self.ensure_long_capacity(length); + + let cursor = self.in_progress.len(); + let offset: u32 = i32::try_from(cursor).expect("offset exceeds i32::MAX") as u32; + self.in_progress.extend(src.iter().map(|&b| map(b))); + self.views + .push(self.make_long_view(length, offset, &self.in_progress[cursor..])); + } + + /// See [`BulkNullStringArrayBuilder::append_with`]. + /// + /// # Panics + /// + /// Panics under the same conditions as [`Self::append_value`]: if the + /// row's byte length, the in-progress buffer offset, or the number of + /// completed buffers exceeds `i32::MAX`. + #[inline] + pub fn append_with(&mut self, f: F) + where + F: FnOnce(&mut StringViewWriter<'_>), + { + let mut writer = StringViewWriter { + inline_buf: [0u8; 12], + inline_len: 0, + spill_cursor: None, + builder: self, + }; + f(&mut writer); + // Destructure to release the borrow on `self` and pull out the + // inline-buffer state by-value. Copy types only; the &mut self is + // dropped here, ending the borrow. + let StringViewWriter { + inline_buf, + inline_len, + spill_cursor, + .. 
+ } = writer; + + match spill_cursor { + None => { + self.views + .push(make_view(&inline_buf[..inline_len as usize], 0, 0)); + } + Some(start) => { + let end = self.in_progress.len(); + let length: u32 = i32::try_from(end - start) + .expect("value length exceeds i32::MAX") + as u32; + let offset: u32 = + i32::try_from(start).expect("offset exceeds i32::MAX") as u32; + self.views.push(self.make_long_view( + length, + offset, + &self.in_progress[start..], + )); + } + } + } + fn flush_in_progress(&mut self) { if !self.in_progress.is_empty() { let block = std::mem::take(&mut self.in_progress); @@ -673,18 +886,173 @@ impl StringViewArrayBuilder { } } +/// [`StringWriter`] for [`StringViewArrayBuilder`]. +/// +/// The writer accumulates the first up-to-12 bytes of a row in a stack +/// buffer; if the row stays inline-sized, it never touches the data block. +/// On the first write that would exceed 12 bytes, the stack buffer is +/// spilled into the builder's in-progress block and subsequent writes go +/// directly there. +pub(crate) struct StringViewWriter<'a> { + inline_buf: [u8; 12], + inline_len: u8, + /// `None` while the row fits inline; becomes `Some(start)` (offset of + /// the row's first byte in `in_progress`) at first spill. + spill_cursor: Option, + builder: &'a mut StringViewArrayBuilder, +} + +impl StringWriter for StringViewWriter<'_> { + #[inline] + fn write_str(&mut self, s: &str) { + let bytes = s.as_bytes(); + if self.spill_cursor.is_some() { + self.builder.in_progress.extend_from_slice(bytes); + return; + } + + let inline_len = self.inline_len as usize; + let new_len = inline_len + bytes.len(); + if new_len <= 12 { + self.inline_buf[inline_len..new_len].copy_from_slice(bytes); + self.inline_len = new_len as u8; + return; + } + + // First spill of this row: `ensure_long_capacity` may flush the + // current block, which is safe because no row-data for this row + // is in it yet — the inline prefix is still in `inline_buf`. 
+ self.builder.ensure_long_capacity(new_len as u32); + let cursor = self.builder.in_progress.len(); + self.builder + .in_progress + .extend_from_slice(&self.inline_buf[..inline_len]); + self.builder.in_progress.extend_from_slice(bytes); + self.spill_cursor = Some(cursor); + } + + #[inline] + fn write_char(&mut self, c: char) { + let len = c.len_utf8(); + if self.spill_cursor.is_some() { + push_char_to_vec(&mut self.builder.in_progress, c); + return; + } + + let inline_len = self.inline_len as usize; + let new_len = inline_len + len; + if new_len <= 12 { + c.encode_utf8(&mut self.inline_buf[inline_len..new_len]); + self.inline_len = new_len as u8; + return; + } + + self.builder.ensure_long_capacity(new_len as u32); + let cursor = self.builder.in_progress.len(); + self.builder + .in_progress + .extend_from_slice(&self.inline_buf[..inline_len]); + push_char_to_vec(&mut self.builder.in_progress, c); + self.spill_cursor = Some(cursor); + } +} + +#[inline] +fn push_char_to_vec(v: &mut Vec, c: char) { + let mut buf = [0u8; 4]; + v.extend_from_slice(c.encode_utf8(&mut buf).as_bytes()); +} + /// Trait abstracting over the bulk-NULL string array builders. /// /// Similar to Arrow's `StringLikeArrayBuilder`, this allows generic dispatch /// over the three string array types (Utf8, LargeUtf8, Utf8View) when the /// function body is uniform across them. +/// +/// Three methods append a non-null row; which method to pick depends on how the +/// row is produced: +/// +/// - [`append_value`](Self::append_value) pushes an already-finished `&str`. +/// Use it when the row is forwarded from an existing slice (e.g. an input +/// column) — there is nothing to elide. +/// - [`append_byte_map`](Self::append_byte_map) emits a row whose bytes are a +/// byte-to-byte mapping of an input slice. Output length is known up front +/// and the inner loop is straight-line, so this is the fastest path when the +/// shape fits. 
+/// - [`append_with`](Self::append_with) emits a row by feeding fragments to a +/// [`StringWriter`]. Use it when the row is computed from multiple sources or +/// when the output length is not known up front. Bytes are written directly +/// into the builder, so it is typically faster than assembling a `String` and +/// calling `append_value(&scratch)`. +/// +/// For a NULL row, call [`append_placeholder`](Self::append_placeholder) to +/// advance the row count without writing into the value buffer; the caller MUST +/// clear the corresponding bit in the null buffer passed to +/// [`finish`](Self::finish). pub(crate) trait BulkNullStringArrayBuilder { + /// Per-builder concrete writer type, exposed as a GAT so generic callers + /// can use the inherent (non-`dyn`) writer methods without vtable + /// dispatch. + type Writer<'a>: StringWriter + where + Self: 'a; + + /// Append `value` as the next row. + /// + /// # Panics + /// + /// Panics if the resulting array would exceed the per-implementation + /// size limit. See the inherent method on each builder for specifics. fn append_value(&mut self, value: &str); + + /// Append an empty placeholder row. The corresponding slot MUST be masked + /// as null by the null buffer passed to [`finish`](Self::finish). fn append_placeholder(&mut self); + + /// Append a row whose bytes are produced by `f` calling write methods on + /// the supplied [`StringWriter`]. + /// + /// The closure can call `write_str` or `write_char` on the supplied + /// `StringWriter` zero or more times. Zero calls produce a row containing + /// the empty string. + /// + /// # Panics + /// + /// See [`append_value`](Self::append_value). + fn append_with(&mut self, f: F) + where + F: for<'a> FnOnce(&mut Self::Writer<'a>); + + /// Append a row whose bytes are produced by mapping each byte of `src` + /// through `map`, in order. Output length equals `src.len()`. 
+ /// + /// Because the output length is known up front and the inner loop is + /// straight-line, this is more efficient than + /// [`append_with`](Self::append_with) for byte-to-byte mappings and + /// autovectorizes well. + /// + /// # Safety + /// + /// The bytes produced by applying `map` to each byte of `src`, in order, + /// must form valid UTF-8. + /// + /// # Panics + /// + /// See [`append_value`](Self::append_value). + unsafe fn append_byte_map u8>(&mut self, src: &[u8], map: F); + + /// Finalize into a concrete array using the caller-supplied null buffer. + /// + /// # Errors + /// + /// Returns an error when `null_buffer.len()` does not match the number + /// of appended rows. fn finish(self, nulls: Option) -> Result; } impl BulkNullStringArrayBuilder for GenericStringArrayBuilder { + type Writer<'a> = GenericStringWriter<'a>; + #[inline] fn append_value(&mut self, value: &str) { GenericStringArrayBuilder::::append_value(self, value) @@ -693,6 +1061,18 @@ impl BulkNullStringArrayBuilder for GenericStringArrayBuilde fn append_placeholder(&mut self) { GenericStringArrayBuilder::::append_placeholder(self) } + #[inline] + fn append_with(&mut self, f: F) + where + F: for<'a> FnOnce(&mut Self::Writer<'a>), + { + GenericStringArrayBuilder::::append_with(self, f) + } + #[inline] + unsafe fn append_byte_map u8>(&mut self, src: &[u8], map: F) { + // SAFETY: contract forwarded. 
+ unsafe { GenericStringArrayBuilder::::append_byte_map(self, src, map) } + } fn finish(self, nulls: Option) -> Result { Ok(Arc::new(GenericStringArrayBuilder::::finish( self, nulls, @@ -701,6 +1081,8 @@ impl BulkNullStringArrayBuilder for GenericStringArrayBuilde } impl BulkNullStringArrayBuilder for StringViewArrayBuilder { + type Writer<'a> = StringViewWriter<'a>; + #[inline] fn append_value(&mut self, value: &str) { StringViewArrayBuilder::append_value(self, value) @@ -709,6 +1091,18 @@ impl BulkNullStringArrayBuilder for StringViewArrayBuilder { fn append_placeholder(&mut self) { StringViewArrayBuilder::append_placeholder(self) } + #[inline] + fn append_with(&mut self, f: F) + where + F: for<'a> FnOnce(&mut Self::Writer<'a>), + { + StringViewArrayBuilder::append_with(self, f) + } + #[inline] + unsafe fn append_byte_map u8>(&mut self, src: &[u8], map: F) { + // SAFETY: contract forwarded. + unsafe { StringViewArrayBuilder::append_byte_map(self, src, map) } + } fn finish(self, nulls: Option) -> Result { Ok(Arc::new(StringViewArrayBuilder::finish(self, nulls)?)) } @@ -801,6 +1195,75 @@ impl ColumnarValueRef<'_> { mod tests { use super::*; + /// Run `scenario` against `builder`, finish with a null buffer derived + /// from `expected` (a bit is set wherever `expected[i].is_some()`), and + /// assert the resulting array equals the corresponding + /// `*Array::from(expected)`. + /// + /// The caller is responsible for driving NULLs in `scenario` — usually + /// by calling `append_placeholder` at each index where `expected[i]` is + /// `None`. 
+ fn run_scenario(mut builder: B, expected: &[Option<&str>], scenario: F) + where + B: BulkNullStringArrayBuilder, + F: FnOnce(&mut B), + { + scenario(&mut builder); + let bits: Vec = expected.iter().map(|x| x.is_some()).collect(); + let nulls = if bits.iter().any(|v| !v) { + Some(NullBuffer::from(bits)) + } else { + None + }; + let array = builder.finish(nulls).unwrap(); + let owned: Vec> = expected.to_vec(); + if let Some(a) = array.as_any().downcast_ref::() { + assert_eq!(a, &StringArray::from(owned)); + } else if let Some(a) = array.as_any().downcast_ref::() { + assert_eq!(a, &LargeStringArray::from(owned)); + } else if let Some(a) = array.as_any().downcast_ref::() { + assert_eq!(a, &StringViewArray::from(owned)); + } else { + panic!("unexpected array type"); + } + } + + /// Run `$scenario` against all three bulk-null builders, asserting each + /// produces an array equivalent to `$expected`. `$scenario` is a closure + /// `|builder| { ... }`; it is duplicated syntactically at each call site + /// so the `BulkNullStringArrayBuilder::Writer` GAT can specialize per + /// builder. + macro_rules! check_on_all_builders { + ($expected:expr, $scenario:expr $(,)?) 
=> {{ + let expected = $expected; + run_scenario( + GenericStringArrayBuilder::::with_capacity(0, 0), + expected, + $scenario, + ); + run_scenario( + GenericStringArrayBuilder::::with_capacity(0, 0), + expected, + $scenario, + ); + run_scenario( + StringViewArrayBuilder::with_capacity(0), + expected, + $scenario, + ); + }}; + } + + fn assert_finish_errs_on_length_mismatch(mut builder: B) + where + B: BulkNullStringArrayBuilder, + { + builder.append_value("a"); + builder.append_value("b"); + let nulls = NullBuffer::from(vec![true, false, true]); + assert!(builder.finish(Some(nulls)).is_err()); + } + #[test] #[should_panic(expected = "capacity integer overflow")] fn test_overflow_concat_string_builder() { @@ -814,47 +1277,107 @@ mod tests { } #[test] - fn string_array_builder_empty() { - let builder = GenericStringArrayBuilder::::with_capacity(0, 0); - let array = builder.finish(None).unwrap(); - assert_eq!(array.len(), 0); + fn bulk_append_value_with_nulls() { + check_on_all_builders!( + &[ + Some("a string longer than twelve bytes"), + None, + Some("short"), + None, + ], + |b| { + b.append_value("a string longer than twelve bytes"); + b.append_placeholder(); + b.append_value("short"); + b.append_placeholder(); + }, + ); } #[test] - fn string_array_builder_no_nulls() { - let mut builder = GenericStringArrayBuilder::::with_capacity(3, 16); - builder.append_value("foo"); - builder.append_value(""); - builder.append_value("hello world"); - let array = builder.finish(None).unwrap(); - assert_eq!(array.len(), 3); - assert_eq!(array.value(0), "foo"); - assert_eq!(array.value(1), ""); - assert_eq!(array.value(2), "hello world"); - assert_eq!(array.null_count(), 0); + fn bulk_empty_builder() { + check_on_all_builders!(&[], |_b| {}); } #[test] - fn string_array_builder_with_nulls() { - let mut builder = GenericStringArrayBuilder::::with_capacity(3, 8); - builder.append_value("a"); - builder.append_placeholder(); - builder.append_value("c"); - let nulls = 
NullBuffer::from(vec![true, false, true]); - let array = builder.finish(Some(nulls)).unwrap(); - assert_eq!(array.len(), 3); - assert_eq!(array.value(0), "a"); - assert!(array.is_null(1)); - assert_eq!(array.value(2), "c"); + fn bulk_all_placeholders() { + check_on_all_builders!(&[None, None, None], |b| { + b.append_placeholder(); + b.append_placeholder(); + b.append_placeholder(); + }); } #[test] - fn string_array_builder_null_buffer_length_mismatch() { - let mut builder = GenericStringArrayBuilder::::with_capacity(2, 4); - builder.append_value("a"); - builder.append_value("b"); - let nulls = NullBuffer::from(vec![true, false, true]); - assert!(builder.finish(Some(nulls)).is_err()); + fn bulk_append_value_no_nulls() { + check_on_all_builders!( + &[ + Some("foo"), + Some(""), + Some("a string longer than twelve bytes") + ], + |b| { + b.append_value("foo"); + b.append_value(""); + b.append_value("a string longer than twelve bytes"); + }, + ); + } + + #[test] + fn bulk_append_with() { + check_on_all_builders!( + &[ + Some("hello"), + None, + Some("hello world"), + Some("a long string of 25 bytes"), + Some(""), + ], + |b| { + b.append_with(|w| w.write_str("hello")); + b.append_placeholder(); + b.append_with(|w| { + w.write_str("hello "); + w.write_str("world"); + }); + b.append_with(|w| w.write_str("a long string of 25 bytes")); + b.append_with(|_w| {}); + }, + ); + } + + #[test] + fn bulk_append_with_chars() { + check_on_all_builders!(&[Some("hé!"), Some("x")], |b| { + b.append_with(|w| { + w.write_char('h'); + w.write_char('é'); + w.write_char('!'); + }); + b.append_with(|w| w.write_char('x')); + }); + } + + #[test] + fn bulk_append_byte_map() { + // SAFETY: ASCII inputs and ASCII outputs in every call. 
+ check_on_all_builders!(&[Some("HELLO"), Some("aXcaX"), Some("")], |b| unsafe { + b.append_byte_map(b"hello", |x| x.to_ascii_uppercase()); + b.append_byte_map(b"abcab", |x| if x == b'b' { b'X' } else { x }); + b.append_byte_map(b"", |x| x); + },); + } + + #[test] + fn bulk_finish_errors_on_null_buffer_length_mismatch() { + assert_finish_errs_on_length_mismatch( + GenericStringArrayBuilder::::with_capacity(2, 4), + ); + assert_finish_errs_on_length_mismatch( + GenericStringArrayBuilder::::with_capacity(2, 4), + ); + assert_finish_errs_on_length_mismatch(StringViewArrayBuilder::with_capacity(2)); } #[test] @@ -879,108 +1402,165 @@ mod tests { } #[test] - fn string_array_builder_all_placeholders() { - let mut builder = GenericStringArrayBuilder::::with_capacity(3, 0); - builder.append_placeholder(); - builder.append_placeholder(); + #[cfg(debug_assertions)] + #[should_panic(expected = "placeholder rows")] + fn string_view_array_builder_placeholder_without_null_mask() { + let mut builder = StringViewArrayBuilder::with_capacity(2); + builder.append_value("a"); builder.append_placeholder(); - let nulls = NullBuffer::from(vec![false, false, false]); - let array = builder.finish(Some(nulls)).unwrap(); - assert_eq!(array.len(), 3); - assert_eq!(array.null_count(), 3); - assert!((0..3).all(|i| array.is_null(i))); + let nulls = NullBuffer::from(vec![true, true]); + let _ = builder.finish(Some(nulls)); } #[test] - fn large_string_array_builder_with_nulls() { - let mut builder = GenericStringArrayBuilder::::with_capacity(3, 8); - builder.append_value("a"); + #[cfg(debug_assertions)] + #[should_panic(expected = "placeholder rows")] + fn string_view_array_builder_placeholder_with_none_null_buffer() { + let mut builder = StringViewArrayBuilder::with_capacity(1); builder.append_placeholder(); - builder.append_value("c"); - let nulls = NullBuffer::from(vec![true, false, true]); - let array = builder.finish(Some(nulls)).unwrap(); - assert_eq!(array.len(), 3); - 
assert_eq!(array.value(0), "a"); - assert!(array.is_null(1)); - assert_eq!(array.value(2), "c"); + let _ = builder.finish(None); } #[test] - fn string_view_array_builder_empty() { - let builder = StringViewArrayBuilder::with_capacity(0); + fn string_view_array_builder_append_with_inline() { + // Rows that stay ≤ 12 bytes never touch the data block. + let mut builder = StringViewArrayBuilder::with_capacity(4); + let inputs = ["hello", "world!", "", "0123456789ab"]; + for s in &inputs { + builder.append_with(|w| w.write_str(s)); + } let array = builder.finish(None).unwrap(); - assert_eq!(array.len(), 0); + assert_eq!(array.len(), inputs.len()); + for (i, s) in inputs.iter().enumerate() { + assert_eq!(array.value(i), *s); + } + assert_eq!(array.data_buffers().len(), 0); } #[test] - fn string_view_array_builder_inline_and_buffer() { - let mut builder = StringViewArrayBuilder::with_capacity(3); - builder.append_value("short"); // ≤ 12 bytes, inline - builder.append_value("a string longer than twelve bytes"); - builder.append_value(""); + fn string_view_array_builder_append_byte_map() { + let mut builder = StringViewArrayBuilder::with_capacity(4); + // SAFETY: ASCII inputs and ASCII outputs in every call. + unsafe { + builder.append_byte_map(b"hello", |b| b.to_ascii_uppercase()); + builder.append_byte_map(b"a long string of 25 bytes", |b| { + if b == b' ' { b'_' } else { b } + }); + // 12 bytes — exactly at the inline boundary. 
+ builder.append_byte_map(b"abcdefghijkl", |b| b); + builder.append_byte_map(b"", |b| b); + } let array = builder.finish(None).unwrap(); - assert_eq!(array.len(), 3); - assert_eq!(array.value(0), "short"); - assert_eq!(array.value(1), "a string longer than twelve bytes"); - assert_eq!(array.value(2), ""); + assert_eq!(array.value(0), "HELLO"); + assert_eq!(array.value(1), "a_long_string_of_25_bytes"); + assert_eq!(array.value(2), "abcdefghijkl"); + assert_eq!(array.value(3), ""); + assert_eq!(array.data_buffers().len(), 1); + assert_eq!(array.data_buffers()[0].len(), 25); } #[test] - fn string_view_array_builder_with_nulls() { - let mut builder = StringViewArrayBuilder::with_capacity(4); - builder.append_value("a string longer than twelve bytes"); - builder.append_placeholder(); - builder.append_value("short"); - builder.append_placeholder(); - let nulls = NullBuffer::from(vec![true, false, true, false]); - let array = builder.finish(Some(nulls)).unwrap(); - assert_eq!(array.len(), 4); - assert_eq!(array.value(0), "a string longer than twelve bytes"); - assert!(array.is_null(1)); - assert_eq!(array.value(2), "short"); - assert!(array.is_null(3)); + fn string_view_array_builder_append_with_at_inline_boundary() { + // Building exactly 12 bytes via several writes should still go inline. 
+ let mut builder = StringViewArrayBuilder::with_capacity(2); + builder.append_with(|w| { + w.write_str("hello"); + w.write_str(" world!"); + }); + builder.append_with(|w| { + for _ in 0..6 { + w.write_str("ab"); + } + }); + let array = builder.finish(None).unwrap(); + assert_eq!(array.value(0), "hello world!"); + assert_eq!(array.value(1), "abababababab"); + assert_eq!(array.data_buffers().len(), 0); } #[test] - fn string_view_array_builder_all_placeholders() { - let mut builder = StringViewArrayBuilder::with_capacity(3); - builder.append_placeholder(); - builder.append_placeholder(); - builder.append_placeholder(); - let nulls = NullBuffer::from(vec![false, false, false]); - let array = builder.finish(Some(nulls)).unwrap(); - assert_eq!(array.len(), 3); - assert_eq!(array.null_count(), 3); - assert!((0..3).all(|i| array.is_null(i))); + fn string_view_array_builder_append_with_spill_on_overflow() { + // 12 bytes from one write, +1 byte from another → spill at boundary. + let mut builder = StringViewArrayBuilder::with_capacity(1); + builder.append_with(|w| { + w.write_str("hello world!"); + w.write_str("X"); + }); + let array = builder.finish(None).unwrap(); + assert_eq!(array.value(0), "hello world!X"); + assert_eq!(array.data_buffers().len(), 1); + assert_eq!(array.data_buffers()[0].len(), 13); } #[test] - fn string_view_array_builder_null_buffer_length_mismatch() { - let mut builder = StringViewArrayBuilder::with_capacity(2); - builder.append_value("a"); - builder.append_value("b"); - let nulls = NullBuffer::from(vec![true, false, true]); - assert!(builder.finish(Some(nulls)).is_err()); + fn string_view_array_builder_append_with_long_single_write() { + // A single write larger than 12 bytes spills immediately with an + // empty inline_buf prefix. 
+ let mut builder = StringViewArrayBuilder::with_capacity(1); + builder.append_with(|w| w.write_str("a long string of 25 bytes")); + let array = builder.finish(None).unwrap(); + assert_eq!(array.value(0), "a long string of 25 bytes"); + assert_eq!(array.data_buffers().len(), 1); + assert_eq!(array.data_buffers()[0].len(), 25); } #[test] - #[cfg(debug_assertions)] - #[should_panic(expected = "placeholder rows")] - fn string_view_array_builder_placeholder_without_null_mask() { + fn string_view_array_builder_append_with_many_small_writes_spilling() { + // 30 × "ab" (60 bytes total): first 6 fit inline, remainder spills. + let mut builder = StringViewArrayBuilder::with_capacity(1); + builder.append_with(|w| { + for _ in 0..30 { + w.write_str("ab"); + } + }); + let array = builder.finish(None).unwrap(); + assert_eq!(array.value(0), "ab".repeat(30)); + assert_eq!(array.data_buffers().len(), 1); + assert_eq!(array.data_buffers()[0].len(), 60); + } + + #[test] + fn string_view_array_builder_append_with_chars() { + // write_char with multi-byte UTF-8: row 0 stays inline (3 bytes), + // row 1 spills (40 bytes). 
let mut builder = StringViewArrayBuilder::with_capacity(2); - builder.append_value("a"); - builder.append_placeholder(); - let nulls = NullBuffer::from(vec![true, true]); - let _ = builder.finish(Some(nulls)); + builder.append_with(|w| { + w.write_char('é'); + w.write_char('!'); + }); + builder.append_with(|w| { + for _ in 0..10 { + w.write_char('🦀'); + } + }); + let array = builder.finish(None).unwrap(); + assert_eq!(array.value(0), "é!"); + assert_eq!(array.value(1), "🦀".repeat(10)); } #[test] - #[cfg(debug_assertions)] - #[should_panic(expected = "placeholder rows")] - fn string_view_array_builder_placeholder_with_none_null_buffer() { - let mut builder = StringViewArrayBuilder::with_capacity(1); - builder.append_placeholder(); - let _ = builder.finish(None); + fn string_view_array_builder_append_with_block_rotation() { + // 40 long rows, 500 bytes each, exceeds the first doubled block + // (~16 KiB). Forces the builder to rotate blocks between rows. + const STR_LEN: usize = 500; + const N: usize = 40; + let s = "x".repeat(STR_LEN); + let mut builder = StringViewArrayBuilder::with_capacity(N); + for _ in 0..N { + builder.append_with(|w| w.write_str(&s)); + } + let array = builder.finish(None).unwrap(); + assert_eq!(array.len(), N); + assert!( + array.data_buffers().len() >= 2, + "expected multiple data buffers, got {}", + array.data_buffers().len() + ); + let total: usize = array.data_buffers().iter().map(|b| b.len()).sum(); + assert_eq!(total, N * STR_LEN); + for i in 0..N { + assert_eq!(array.value(i), s); + } } #[test] @@ -1004,65 +1584,4 @@ mod tests { assert_eq!(array.value(i), value); } } - - /// Build an array via `BulkNullStringArrayBuilder` to verify that the - /// trait methods produce the same result as the inherent methods. 
- fn build_via_trait( - mut builder: B, - nulls: Option, - ) -> ArrayRef { - builder.append_value("a"); - builder.append_placeholder(); - builder.append_value("hello world!"); - builder.finish(nulls).unwrap() - } - - #[test] - fn bulk_null_trait_string_i32() { - let builder = GenericStringArrayBuilder::::with_capacity(3, 16); - let nulls = NullBuffer::from(vec![true, false, true]); - let array = build_via_trait(builder, Some(nulls)); - let array = array.as_any().downcast_ref::().unwrap(); - assert_eq!(array.len(), 3); - assert_eq!(array.value(0), "a"); - assert!(array.is_null(1)); - assert_eq!(array.value(2), "hello world!"); - } - - #[test] - fn bulk_null_trait_string_i64() { - let builder = GenericStringArrayBuilder::::with_capacity(3, 16); - let nulls = NullBuffer::from(vec![true, false, true]); - let array = build_via_trait(builder, Some(nulls)); - let array = array.as_any().downcast_ref::().unwrap(); - assert_eq!(array.len(), 3); - assert_eq!(array.value(0), "a"); - assert!(array.is_null(1)); - assert_eq!(array.value(2), "hello world!"); - } - - #[test] - fn bulk_null_trait_string_view() { - let builder = StringViewArrayBuilder::with_capacity(3); - let nulls = NullBuffer::from(vec![true, false, true]); - let array = build_via_trait(builder, Some(nulls)); - let array = array.as_any().downcast_ref::().unwrap(); - assert_eq!(array.len(), 3); - assert_eq!(array.value(0), "a"); - assert!(array.is_null(1)); - assert_eq!(array.value(2), "hello world!"); - } - - #[test] - fn bulk_null_trait_no_nulls() { - let mut builder = GenericStringArrayBuilder::::with_capacity(2, 8); - BulkNullStringArrayBuilder::append_value(&mut builder, "x"); - BulkNullStringArrayBuilder::append_value(&mut builder, "yy"); - let array = BulkNullStringArrayBuilder::finish(builder, None).unwrap(); - let array = array.as_any().downcast_ref::().unwrap(); - assert_eq!(array.len(), 2); - assert_eq!(array.value(0), "x"); - assert_eq!(array.value(1), "yy"); - assert_eq!(array.null_count(), 0); - } }