From 582592492be2acbc9fbbab9c8b9c55069ae9d6c6 Mon Sep 17 00:00:00 2001
From: sundaresanr
Date: Mon, 20 Apr 2026 20:23:40 -0700
Subject: [PATCH] Fix parquet compression handling in aws_s3 sink
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Two bugs caused parquet sinks to emit unreadable files unless users
explicitly set `compression: none`:

1. Sink-level `compression` (default gzip) wrapped the parquet bytes, so
   the S3 object started with gzip magic instead of PAR1; DuckDB and
   Snowflake rejected it.
2. Parquet internal compression was hardcoded to UNCOMPRESSED, and the
   parquet crate was built without the snap/flate2/zstd features, so even
   trying SNAPPY would panic at runtime.

Fix: when codec=parquet, feed the sink-level compression into the parquet
writer's WriterProperties and force transport-layer compression to None.
Non-parquet codecs are unaffected.

Vector Compression -> parquet: None -> UNCOMPRESSED, Snappy -> SNAPPY,
Gzip -> GZIP, Zstd -> ZSTD. Zlib is rejected at build time (it has no
parquet equivalent).

Also fixes a broken test import (vrl::value::btreemap was made private)
so the parquet test module actually compiles.
---
 Cargo.lock                                |  3 +
 lib/codecs/Cargo.toml                     |  2 +-
 lib/codecs/src/encoding/config.rs         | 10 +++-
 lib/codecs/src/encoding/format/parquet.rs | 70 +++++++++++++++++++----
 lib/codecs/src/encoding/mod.rs            |  8 +++
 lib/codecs/src/encoding/serializer.rs     |  7 ++-
 src/sinks/aws_s3/config.rs                | 51 +++++++++++++++--
 7 files changed, 131 insertions(+), 20 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index d7095c55b158d..1ecb861658bbe 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7826,13 +7826,16 @@ dependencies = [
  "ahash 0.8.11",
  "bytes 1.11.1",
  "chrono",
+ "flate2",
  "hashbrown 0.13.2",
  "num",
  "num-bigint",
  "paste",
  "seq-macro",
+ "snap",
  "thrift",
  "twox-hash 1.6.3",
+ "zstd 0.12.4",
 ]
 
 [[package]]
diff --git a/lib/codecs/Cargo.toml b/lib/codecs/Cargo.toml
index a81de3e51239f..7cbc0c1c3aaca 100644
--- a/lib/codecs/Cargo.toml
+++ b/lib/codecs/Cargo.toml
@@ -31,7 +31,7 @@ memchr = { version = "2", default-features = false }
 metrics.workspace = true
 opentelemetry-proto = { path = "../opentelemetry-proto", optional = true }
 ordered-float.workspace = true
-parquet = { version = "39.0.0", default-features = false }
+parquet = { version = "39.0.0", default-features = false, features = ["snap", "flate2", "zstd"] }
 pin-project.workspace = true
 prost.workspace = true
 prost-reflect.workspace = true
diff --git a/lib/codecs/src/encoding/config.rs b/lib/codecs/src/encoding/config.rs
index 096c28f86b6ef..15ee5c53ddfef 100644
--- a/lib/codecs/src/encoding/config.rs
+++ b/lib/codecs/src/encoding/config.rs
@@ -152,8 +152,14 @@ impl EncodingConfigWithFraming {
 
     /// Build a `BatchSerializer` for this config, if the configured serializer is batched
     /// (e.g. Parquet). Returns `None` for per-event serializers.
-    pub fn build_batched(&self) -> vector_common::Result<Option<BatchSerializer>> {
-        self.encoding.config().build_batched()
-    }
+    ///
+    /// `parquet_compression` is applied to parquet column chunks when the inner serializer
+    /// is `Parquet`; it is ignored for other codecs.
+    pub fn build_batched(
+        &self,
+        parquet_compression: parquet::basic::Compression,
+    ) -> vector_common::Result<Option<BatchSerializer>> {
+        self.encoding.config().build_batched(parquet_compression)
+    }
 }
 
diff --git a/lib/codecs/src/encoding/format/parquet.rs b/lib/codecs/src/encoding/format/parquet.rs
index bf62381865b1c..bcba326b60aba 100644
--- a/lib/codecs/src/encoding/format/parquet.rs
+++ b/lib/codecs/src/encoding/format/parquet.rs
@@ -3,7 +3,7 @@ use std::{io, sync::Arc};
 
 use bytes::{BufMut, BytesMut};
 use parquet::{
-    basic::{LogicalType, Repetition, Type as PhysicalType},
+    basic::{Compression, LogicalType, Repetition, Type as PhysicalType},
     column::writer::{ColumnWriter::*, ColumnWriterImpl},
     data_type::DataType,
     errors::ParquetError,
@@ -96,13 +96,16 @@ impl ParquetSerializerConfig {
     }
 
     /// Build the `ParquetSerializerConfig` from this configuration.
-    pub fn build(&self) -> Result<ParquetSerializer, String> {
+    ///
+    /// `compression` is applied to parquet column chunks at write time.
+    pub fn build(&self, compression: Compression) -> Result<ParquetSerializer, String> {
         let schema = parse_message_type(&self.parquet.schema)
             .map_err(|error| format!("Failed building Parquet serializer: {}", error))?;
         self.validate_logical_schema(&schema)
             .map_err(|error| format!("Failed building Parquet serializer: {}", error))?;
         Ok(ParquetSerializer {
             schema: Arc::new(schema),
+            compression,
         })
     }
 
@@ -254,12 +257,16 @@ pub struct ParquetSerializerOptions {
 
 #[derive(Debug, Clone)]
 pub struct ParquetSerializer {
     schema: TypePtr,
+    compression: Compression,
 }
 
 impl ParquetSerializer {
-    /// Creates a new `ParquetSerializer`.
-    pub const fn new(schema: TypePtr) -> Self {
-        Self { schema }
+    /// Creates a new `ParquetSerializer` with the given compression applied to column chunks.
+    pub const fn new(schema: TypePtr, compression: Compression) -> Self {
+        Self {
+            schema,
+            compression,
+        }
     }
 }
@@ -292,7 +299,11 @@ impl Encoder<Vec<Event>> for ParquetSerializer {
     /// Expects that all events satisfy the schema, else whole batch can fail.
     fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
         // Encode events
-        let props = Arc::new(WriterProperties::builder().build());
+        let props = Arc::new(
+            WriterProperties::builder()
+                .set_compression(self.compression)
+                .build(),
+        );
         let mut parquet_writer =
             SerializedFileWriter::new(buffer.writer(), self.schema.clone(), props)?;
 
@@ -721,7 +732,7 @@ mod tests {
     use std::panic;
     use std::{collections::HashSet, sync::Arc};
     use vector_core::event::LogEvent;
-    use vrl::value::btreemap;
+    use vrl::btreemap;
 
     macro_rules! log_event {
         ($($key:expr => $value:expr),* $(,)?) => {
@@ -778,7 +789,7 @@ mod tests {
         validate: impl Fn(usize, &str, &dyn RowGroupReader),
     ) {
         let schema = Arc::new(parse_message_type(schema).unwrap());
-        let mut encoder = ParquetSerializer::new(schema);
+        let mut encoder = ParquetSerializer::new(schema, Compression::UNCOMPRESSED);
 
         let mut buffer = BytesMut::new();
         encoder.encode(events, &mut buffer).unwrap();
@@ -1210,6 +1221,45 @@ mod tests {
         );
     }
 
+    #[test]
+    fn snappy_compression_round_trips() {
+        // Build a parquet file with SNAPPY column-chunk compression and verify:
+        // - the output is a valid parquet file (PAR1 magic at start and end)
+        // - the emitted column chunk metadata reports Compression::SNAPPY
+        let message_type = r#"
+            message test {
+                required binary name (STRING);
+            }
+        "#;
+        let schema = Arc::new(parse_message_type(message_type).unwrap());
+        let mut encoder = ParquetSerializer::new(schema, Compression::SNAPPY);
+
+        let mut buffer = BytesMut::new();
+        let events: Vec<Event> = vec![
+            LogEvent::from(btreemap! { "name" => "a" }).into(),
+            LogEvent::from(btreemap! { "name" => "b" }).into(),
+        ];
+        encoder.encode(events, &mut buffer).unwrap();
+        let bytes = buffer.freeze();
+
+        assert!(bytes.len() >= 8, "output too short to be parquet");
+        assert_eq!(&bytes[..4], b"PAR1", "missing parquet header magic");
+        assert_eq!(
+            &bytes[bytes.len() - 4..],
+            b"PAR1",
+            "missing parquet footer magic"
+        );
+
+        let reader = SerializedFileReader::new(bytes).unwrap();
+        let metadata = reader.metadata();
+        let column = metadata.row_group(0).column(0);
+        assert_eq!(
+            column.compression(),
+            Compression::SNAPPY,
+            "expected column chunk to be snappy-compressed"
+        );
+    }
+
     #[test]
     fn illegal_list_scheme() {
         let config = ParquetSerializerConfig {
@@ -1227,7 +1277,7 @@ mod tests {
             },
         };
 
-        assert!(config.build().is_err());
+        assert!(config.build(Compression::UNCOMPRESSED).is_err());
     }
 
     #[test]
@@ -1312,6 +1362,6 @@ mod tests {
             },
         };
 
-        assert!(config.build().is_err());
+        assert!(config.build(Compression::UNCOMPRESSED).is_err());
     }
 }
diff --git a/lib/codecs/src/encoding/mod.rs b/lib/codecs/src/encoding/mod.rs
index d07b661d04db4..f4ba9fd8b1a79 100644
--- a/lib/codecs/src/encoding/mod.rs
+++ b/lib/codecs/src/encoding/mod.rs
@@ -41,6 +41,14 @@ pub use serializer::BatchSerializerConfig;
 pub use serializer::{Serializer, SerializerConfig};
 pub use transformer::{TimestampFormat, Transformer};
 
+/// Re-exports of the parquet crate's column-chunk compression types, for callers
+/// (e.g. the `aws_s3` sink) that build a batched parquet serializer and need to
+/// pass a compression setting through `EncodingConfigWithFraming::build_batched`.
+pub use ::parquet::basic::{
+    Compression as ParquetCompression, GzipLevel as ParquetGzipLevel,
+    ZstdLevel as ParquetZstdLevel,
+};
+
 /// An error that occurred while building an encoder.
 pub type BuildError = Box<dyn std::error::Error + Send + Sync>;
 
diff --git a/lib/codecs/src/encoding/serializer.rs b/lib/codecs/src/encoding/serializer.rs
index 5ce8478e4c53a..a483af8d56a9f 100644
--- a/lib/codecs/src/encoding/serializer.rs
+++ b/lib/codecs/src/encoding/serializer.rs
@@ -312,13 +312,16 @@ impl SerializerConfig {
     }
 
     /// Build the `BatchSerializer` from this configuration.
-    /// Returns `None` if the serializer is not batched.
+    ///
+    /// Returns `None` if the serializer is not batched. `parquet_compression` is applied to
+    /// parquet column chunks when the serializer is `Parquet`; it is ignored for other codecs.
     pub fn build_batched(
         &self,
+        parquet_compression: parquet::basic::Compression,
     ) -> Result<Option<BatchSerializer>, Box<dyn std::error::Error + Send + Sync>> {
         match self {
             SerializerConfig::Parquet { parquet } => Ok(Some(BatchSerializer::Parquet(
-                ParquetSerializerConfig::new(parquet.schema.clone()).build()?,
+                ParquetSerializerConfig::new(parquet.schema.clone()).build(parquet_compression)?,
             ))),
             _ => Ok(None),
         }
diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs
index 217526629e55f..3c324f1c85746 100644
--- a/src/sinks/aws_s3/config.rs
+++ b/src/sinks/aws_s3/config.rs
@@ -4,7 +4,10 @@ use vector_lib::{
     TimeZone,
     codecs::{
         BatchEncoder, TextSerializerConfig,
-        encoding::{EncoderKind, Framer, FramingConfig},
+        encoding::{
+            EncoderKind, Framer, FramingConfig, ParquetCompression, ParquetGzipLevel,
+            ParquetZstdLevel, SerializerConfig,
+        },
     },
     configurable::configurable_component,
     sink::VectorSink,
@@ -246,11 +249,29 @@ impl S3SinkConfig {
         let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id, None);
 
         let transformer = self.encoding.transformer();
-        let encoder = if let Some(batch_serializer) = self.encoding.build_batched()? {
-            EncoderKind::Batch(BatchEncoder::new(batch_serializer))
+        // For the parquet codec, the sink-level `compression` value feeds into parquet's
+        // column-chunk compression (via `WriterProperties`). The transport-layer wrap is
+        // then forced to `None` so the resulting S3 object is a raw parquet file starting
+        // with `PAR1`; gzipping it again would make DuckDB / Snowflake reject it.
+        let (encoder, compression) = if matches!(
+            self.encoding.config().1,
+            SerializerConfig::Parquet { .. }
+        ) {
+            let parquet_compression = vector_compression_to_parquet(self.compression)?;
+            let batch = self
+                .encoding
+                .build_batched(parquet_compression)?
+                .expect("codec=parquet but build_batched returned None");
+            (
+                EncoderKind::Batch(BatchEncoder::new(batch)),
+                Compression::None,
+            )
         } else {
             let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
-            EncoderKind::Framed(Box::new(Encoder::<Framer>::new(framer, serializer)))
+            (
+                EncoderKind::Framed(Box::new(Encoder::<Framer>::new(framer, serializer))),
+                self.compression,
+            )
         };
 
         let request_options = S3RequestOptions {
@@ -260,7 +281,7 @@ impl S3SinkConfig {
             filename_time_format: self.filename_time_format.clone(),
             filename_append_uuid: self.filename_append_uuid,
             encoder: (transformer, encoder),
-            compression: self.compression,
+            compression,
             filename_tz_offset: offset,
         };
 
@@ -285,6 +306,26 @@ impl S3SinkConfig {
     }
 }
 
+/// Map the sink-level `Compression` enum to the parquet crate's column-chunk
+/// `Compression`. Parquet-native codecs only; `Zlib` has no parquet equivalent.
+///
+/// Level hints from `CompressionLevel` are intentionally discarded; parquet's own
+/// default levels (e.g. `GzipLevel::default()`) are used. If a user needs to tune
+/// parquet compression level specifically, we can add it as a follow-up.
+fn vector_compression_to_parquet(c: Compression) -> crate::Result<ParquetCompression> {
+    Ok(match c {
+        Compression::None => ParquetCompression::UNCOMPRESSED,
+        Compression::Snappy => ParquetCompression::SNAPPY,
+        Compression::Gzip(_) => ParquetCompression::GZIP(ParquetGzipLevel::default()),
+        Compression::Zstd(_) => ParquetCompression::ZSTD(ParquetZstdLevel::default()),
+        Compression::Zlib(_) => {
+            return Err("zlib compression is not supported for the parquet codec; \
+                use gzip, zstd, snappy, or none"
+                .into());
+        }
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use super::S3SinkConfig;
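
Note for reviewers, not part of the diff above: a minimal sketch of a follow-up
unit test for `vector_compression_to_parquet`. It assumes the sink-level
`Compression` enum exposes `gzip_default()` / `zstd_default()` / `zlib_default()`
constructors and that the parquet `Compression` type implements `PartialEq`;
adjust to the actual API if those differ.

    #[test]
    fn vector_compression_maps_to_parquet_codecs() {
        // Parquet-native codecs map one-to-one; level hints are dropped in
        // favor of parquet's own defaults, and zlib is rejected with an error.
        assert_eq!(
            vector_compression_to_parquet(Compression::None).unwrap(),
            ParquetCompression::UNCOMPRESSED
        );
        assert_eq!(
            vector_compression_to_parquet(Compression::Snappy).unwrap(),
            ParquetCompression::SNAPPY
        );
        assert_eq!(
            // gzip_default()/zstd_default() are assumed constructors here.
            vector_compression_to_parquet(Compression::gzip_default()).unwrap(),
            ParquetCompression::GZIP(ParquetGzipLevel::default())
        );
        assert_eq!(
            vector_compression_to_parquet(Compression::zstd_default()).unwrap(),
            ParquetCompression::ZSTD(ParquetZstdLevel::default())
        );
        // Zlib has no parquet equivalent and must fail at build time.
        assert!(vector_compression_to_parquet(Compression::zlib_default()).is_err());
    }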