3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion lib/codecs/Cargo.toml
@@ -31,7 +31,7 @@ memchr = { version = "2", default-features = false }
metrics.workspace = true
opentelemetry-proto = { path = "../opentelemetry-proto", optional = true }
ordered-float.workspace = true
parquet = { version = "39.0.0", default-features = false }
parquet = { version = "39.0.0", default-features = false, features = ["snap", "flate2", "zstd"] }
pin-project.workspace = true
prost.workspace = true
prost-reflect.workspace = true
10 changes: 8 additions & 2 deletions lib/codecs/src/encoding/config.rs
@@ -152,8 +152,14 @@ impl EncodingConfigWithFraming {

/// Build a `BatchSerializer` for this config, if the configured serializer is batched
/// (e.g. Parquet). Returns `None` for per-event serializers.
pub fn build_batched(&self) -> vector_common::Result<Option<BatchSerializer>> {
self.encoding.config().build_batched()
///
/// `parquet_compression` is applied to parquet column chunks when the inner serializer
/// is `Parquet`; it is ignored for other codecs.
pub fn build_batched(
&self,
parquet_compression: parquet::basic::Compression,
) -> vector_common::Result<Option<BatchSerializer>> {
self.encoding.config().build_batched(parquet_compression)
}
}

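Usage sketch (not part of the diff; names and import paths are assumed): a caller checks whether the configured codec is batched and only then cares about the compression argument.

```rust
use vector_lib::codecs::encoding::ParquetCompression;

// Illustrative only; the real call site is the aws_s3 sink further down in this PR.
// `EncodingConfigWithFraming` is the type defined in this file.
fn describe_encoder_path(
    encoding: &EncodingConfigWithFraming,
) -> vector_common::Result<&'static str> {
    Ok(
        if encoding.build_batched(ParquetCompression::SNAPPY)?.is_some() {
            "batched (parquet): wrap in a BatchEncoder and skip transport compression"
        } else {
            "framed: build the usual framer plus per-event serializer"
        },
    )
}
```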
70 changes: 60 additions & 10 deletions lib/codecs/src/encoding/format/parquet.rs
@@ -3,7 +3,7 @@ use std::{io, sync::Arc};

use bytes::{BufMut, BytesMut};
use parquet::{
basic::{LogicalType, Repetition, Type as PhysicalType},
basic::{Compression, LogicalType, Repetition, Type as PhysicalType},
column::writer::{ColumnWriter::*, ColumnWriterImpl},
data_type::DataType,
errors::ParquetError,
@@ -96,13 +96,16 @@ impl ParquetSerializerConfig {
}

/// Build a `ParquetSerializer` from this configuration.
pub fn build(&self) -> Result<ParquetSerializer, BuildError> {
///
/// `compression` is applied to parquet column chunks at write time.
pub fn build(&self, compression: Compression) -> Result<ParquetSerializer, BuildError> {
let schema = parse_message_type(&self.parquet.schema)
.map_err(|error| format!("Failed building Parquet serializer: {}", error))?;
self.validate_logical_schema(&schema)
.map_err(|error| format!("Failed building Parquet serializer: {}", error))?;
Ok(ParquetSerializer {
schema: Arc::new(schema),
compression,
})
}

@@ -254,12 +257,16 @@ pub struct ParquetSerializerOptions {
#[derive(Debug, Clone)]
pub struct ParquetSerializer {
schema: TypePtr,
compression: Compression,
}

impl ParquetSerializer {
/// Creates a new `ParquetSerializer`.
pub const fn new(schema: TypePtr) -> Self {
Self { schema }
/// Creates a new `ParquetSerializer` with the given compression applied to column chunks.
pub const fn new(schema: TypePtr, compression: Compression) -> Self {
Self {
schema,
compression,
}
}
}

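A minimal construction sketch (the schema string and the ZSTD choice are illustrative; the level comes from the parquet crate's default):

```rust
use std::sync::Arc;
use parquet::{basic::Compression, schema::parser::parse_message_type};

// Sketch: build a serializer for a single required string column.
fn example_serializer() -> ParquetSerializer {
    let schema = parse_message_type("message example { required binary name (STRING); }")
        .expect("valid parquet schema");
    ParquetSerializer::new(Arc::new(schema), Compression::ZSTD(Default::default()))
}
```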
@@ -292,7 +299,11 @@ impl Encoder<Vec<Event>> for ParquetSerializer {
/// Expects that all events satisfy the schema; otherwise the whole batch can fail.
fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
// Encode events
let props = Arc::new(WriterProperties::builder().build());
let props = Arc::new(
WriterProperties::builder()
.set_compression(self.compression)
.build(),
);
let mut parquet_writer =
SerializedFileWriter::new(buffer.writer(), self.schema.clone(), props)?;

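For reference, a self-contained sketch of the parquet crate API used here: `set_compression` sets the default codec for every column chunk, and `WriterProperties::compression` reports what a given column will get (the column name is illustrative).

```rust
use parquet::{
    basic::Compression, file::properties::WriterProperties, schema::types::ColumnPath,
};

fn main() {
    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();
    // Every column inherits the default unless a per-column override is set.
    assert_eq!(
        props.compression(&ColumnPath::from("name")),
        Compression::SNAPPY
    );
}
```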
@@ -721,7 +732,7 @@ mod tests {
use std::panic;
use std::{collections::HashSet, sync::Arc};
use vector_core::event::LogEvent;
use vrl::value::btreemap;
use vrl::btreemap;

macro_rules! log_event {
($($key:expr => $value:expr),* $(,)?) => {
@@ -778,7 +789,7 @@
validate: impl Fn(usize, &str, &dyn RowGroupReader),
) {
let schema = Arc::new(parse_message_type(schema).unwrap());
let mut encoder = ParquetSerializer::new(schema);
let mut encoder = ParquetSerializer::new(schema, Compression::UNCOMPRESSED);

let mut buffer = BytesMut::new();
encoder.encode(events, &mut buffer).unwrap();
@@ -1210,6 +1221,45 @@
);
}

#[test]
fn snappy_compression_round_trips() {
// Build a parquet file with SNAPPY column-chunk compression and verify:
// - the output is a valid parquet file (PAR1 magic at start and end)
// - the emitted column chunk metadata reports Compression::SNAPPY
let message_type = r#"
message test {
required binary name (STRING);
}
"#;
let schema = Arc::new(parse_message_type(message_type).unwrap());
let mut encoder = ParquetSerializer::new(schema, Compression::SNAPPY);

let mut buffer = BytesMut::new();
let events: Vec<Event> = vec![
LogEvent::from(btreemap! { "name" => "a" }).into(),
LogEvent::from(btreemap! { "name" => "b" }).into(),
];
encoder.encode(events, &mut buffer).unwrap();
let bytes = buffer.freeze();

assert!(bytes.len() >= 8, "output too short to be parquet");
assert_eq!(&bytes[..4], b"PAR1", "missing parquet header magic");
assert_eq!(
&bytes[bytes.len() - 4..],
b"PAR1",
"missing parquet footer magic"
);

let reader = SerializedFileReader::new(bytes).unwrap();
let metadata = reader.metadata();
let column = metadata.row_group(0).column(0);
assert_eq!(
column.compression(),
Compression::SNAPPY,
"expected column chunk to be snappy-compressed"
);
}

#[test]
fn illegal_list_scheme() {
let config = ParquetSerializerConfig {
@@ -1227,7 +1277,7 @@
},
};

assert!(config.build().is_err());
assert!(config.build(Compression::UNCOMPRESSED).is_err());
}

#[test]
@@ -1312,6 +1362,6 @@
},
};

assert!(config.build().is_err());
assert!(config.build(Compression::UNCOMPRESSED).is_err());
}
}
8 changes: 8 additions & 0 deletions lib/codecs/src/encoding/mod.rs
@@ -41,6 +41,14 @@ pub use serializer::BatchSerializerConfig;
pub use serializer::{Serializer, SerializerConfig};
pub use transformer::{TimestampFormat, Transformer};

/// Re-exports of the parquet crate's column-chunk compression types, for callers
/// (e.g. the `aws_s3` sink) that build a batched parquet serializer and need to
/// pass a compression setting through `EncodingConfigWithFraming::build_batched`.
pub use ::parquet::basic::{
Compression as ParquetCompression, GzipLevel as ParquetGzipLevel,
ZstdLevel as ParquetZstdLevel,
};

/// An error that occurred while building an encoder.
pub type BuildError = Box<dyn std::error::Error + Send + Sync + 'static>;

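A small sketch of what the re-exports above give a caller: the sink can name parquet's column-chunk codecs without taking its own `parquet` dependency (the `vector_lib::codecs` path matches the aws_s3 import later in this PR).

```rust
use vector_lib::codecs::encoding::{ParquetCompression, ParquetGzipLevel, ParquetZstdLevel};

fn main() {
    // These are the same types as parquet::basic::{Compression, GzipLevel, ZstdLevel}.
    let _choices = [
        ParquetCompression::UNCOMPRESSED,
        ParquetCompression::SNAPPY,
        ParquetCompression::GZIP(ParquetGzipLevel::default()),
        ParquetCompression::ZSTD(ParquetZstdLevel::default()),
    ];
}
```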
7 changes: 5 additions & 2 deletions lib/codecs/src/encoding/serializer.rs
@@ -312,13 +312,16 @@ impl SerializerConfig {
}

/// Build the `BatchSerializer` from this configuration.
/// Returns `None` if the serializer is not batched.
///
/// Returns `None` if the serializer is not batched. `parquet_compression` is applied to
/// parquet column chunks when the serializer is `Parquet`; it is ignored for other codecs.
pub fn build_batched(
&self,
parquet_compression: parquet::basic::Compression,
) -> Result<Option<BatchSerializer>, Box<dyn std::error::Error + Send + Sync + 'static>> {
match self {
SerializerConfig::Parquet { parquet } => Ok(Some(BatchSerializer::Parquet(
ParquetSerializerConfig::new(parquet.schema.clone()).build()?,
ParquetSerializerConfig::new(parquet.schema.clone()).build(parquet_compression)?,
))),
_ => Ok(None),
}
51 changes: 46 additions & 5 deletions src/sinks/aws_s3/config.rs
@@ -4,7 +4,10 @@ use vector_lib::{
TimeZone,
codecs::{
BatchEncoder, TextSerializerConfig,
encoding::{EncoderKind, Framer, FramingConfig},
encoding::{
EncoderKind, Framer, FramingConfig, ParquetCompression, ParquetGzipLevel,
ParquetZstdLevel, SerializerConfig,
},
},
configurable::configurable_component,
sink::VectorSink,
@@ -246,11 +249,29 @@ impl S3SinkConfig {
let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id, None);

let transformer = self.encoding.transformer();
let encoder = if let Some(batch_serializer) = self.encoding.build_batched()? {
EncoderKind::Batch(BatchEncoder::new(batch_serializer))
// For the parquet codec, the sink-level `compression` value feeds into parquet's
// column-chunk compression (via `WriterProperties`). The transport-layer wrap is
// then forced to `None` so the resulting S3 object is a raw parquet file starting
// with `PAR1` — gzipping it again would make DuckDB / Snowflake reject it.
let (encoder, compression) = if matches!(
self.encoding.config().1,
SerializerConfig::Parquet { .. }
) {
let parquet_compression = vector_compression_to_parquet(self.compression)?;
let batch = self
.encoding
.build_batched(parquet_compression)?
.expect("codec=parquet but build_batched returned None");
(
EncoderKind::Batch(BatchEncoder::new(batch)),
Compression::None,
)
} else {
let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
EncoderKind::Framed(Box::new(Encoder::<Framer>::new(framer, serializer)))
(
EncoderKind::Framed(Box::new(Encoder::<Framer>::new(framer, serializer))),
self.compression,
)
};

let request_options = S3RequestOptions {
@@ -260,7 +281,7 @@
filename_time_format: self.filename_time_format.clone(),
filename_append_uuid: self.filename_append_uuid,
encoder: (transformer, encoder),
compression: self.compression,
compression,
filename_tz_offset: offset,
};

@@ -285,6 +306,26 @@
}
}

/// Map the sink-level `Compression` enum to the parquet crate's column-chunk
/// `Compression`. Parquet-native codecs only; `Zlib` has no parquet equivalent.
///
/// Level hints from `CompressionLevel` are intentionally discarded — parquet's own
/// default levels (e.g. `GzipLevel::default()`) are used. If a user needs to tune
/// parquet compression level specifically, we can add it as a follow-up.
fn vector_compression_to_parquet(c: Compression) -> crate::Result<ParquetCompression> {
Ok(match c {
Compression::None => ParquetCompression::UNCOMPRESSED,
Compression::Snappy => ParquetCompression::SNAPPY,
Compression::Gzip(_) => ParquetCompression::GZIP(ParquetGzipLevel::default()),
Compression::Zstd(_) => ParquetCompression::ZSTD(ParquetZstdLevel::default()),
Compression::Zlib(_) => {
return Err("zlib compression is not supported for the parquet codec; \
use gzip, zstd, snappy, or none"
.into());
}
})
}

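A possible follow-up unit test for this mapping (sketch only, suitable for the `tests` module below; it sticks to the variants that need no level constructor, since the sink's `CompressionLevel` constructors are not shown in this diff):

```rust
#[test]
fn parquet_compression_mapping() {
    // The level-free variants map directly to parquet's unit variants.
    assert_eq!(
        vector_compression_to_parquet(Compression::None).unwrap(),
        ParquetCompression::UNCOMPRESSED
    );
    assert_eq!(
        vector_compression_to_parquet(Compression::Snappy).unwrap(),
        ParquetCompression::SNAPPY
    );
}
```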
#[cfg(test)]
mod tests {
use super::S3SinkConfig;