diff --git a/Cargo.lock b/Cargo.lock index bd72b08a71049..d7095c55b158d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2536,6 +2536,7 @@ dependencies = [ "metrics", "opentelemetry-proto", "ordered-float 4.6.0", + "parquet", "pin-project", "prost 0.12.6", "prost-reflect", @@ -4704,6 +4705,12 @@ dependencies = [ "ahash 0.7.7", ] +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" + [[package]] name = "hashbrown" version = "0.14.5" @@ -5672,6 +5679,12 @@ dependencies = [ "web-sys", ] +[[package]] +name = "integer-encoding" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" + [[package]] name = "inventory" version = "0.3.22" @@ -6509,7 +6522,7 @@ version = "0.11.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a" dependencies = [ - "twox-hash", + "twox-hash 2.1.2", ] [[package]] @@ -7804,6 +7817,24 @@ dependencies = [ "windows-link 0.2.0", ] +[[package]] +name = "parquet" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0a1e6fa27f09ebddba280f5966ef435f3ac4d74cfc3ffe370fd3fd59c2e004d" +dependencies = [ + "ahash 0.8.11", + "bytes 1.11.1", + "chrono", + "hashbrown 0.13.2", + "num", + "num-bigint", + "paste", + "seq-macro", + "thrift", + "twox-hash 1.6.3", +] + [[package]] name = "parse-size" version = "1.1.0" @@ -9986,6 +10017,12 @@ dependencies = [ "serde_core", ] +[[package]] +name = "seq-macro" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc711410fbe7399f390ca1c3b60ad0f53f80e95c5eb935e52268a0e2cd49acc" + [[package]] name = "serde" version = "1.0.228" @@ -11228,6 +11265,17 @@ dependencies = [ "cfg-if", ] +[[package]] +name 
= "thrift" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" +dependencies = [ + "byteorder", + "integer-encoding", + "ordered-float 2.10.1", +] + [[package]] name = "tikv-jemalloc-sys" version = "0.6.1+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7" @@ -12101,6 +12149,16 @@ dependencies = [ "utf-8", ] +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if", + "static_assertions", +] + [[package]] name = "twox-hash" version = "2.1.2" @@ -12985,7 +13043,7 @@ dependencies = [ "tokio-util", "tower 0.5.3", "tracing 0.1.44", - "twox-hash", + "twox-hash 2.1.2", "vector-common", "vector-core", ] diff --git a/lib/codecs/Cargo.toml b/lib/codecs/Cargo.toml index 0ed26d0b16a41..a81de3e51239f 100644 --- a/lib/codecs/Cargo.toml +++ b/lib/codecs/Cargo.toml @@ -31,6 +31,7 @@ memchr = { version = "2", default-features = false } metrics.workspace = true opentelemetry-proto = { path = "../opentelemetry-proto", optional = true } ordered-float.workspace = true +parquet = { version = "39.0.0", default-features = false } pin-project.workspace = true prost.workspace = true prost-reflect.workspace = true diff --git a/lib/codecs/src/encoding/config.rs b/lib/codecs/src/encoding/config.rs index 32d18c9e83d00..096c28f86b6ef 100644 --- a/lib/codecs/src/encoding/config.rs +++ b/lib/codecs/src/encoding/config.rs @@ -1,6 +1,6 @@ use vector_config::configurable_component; -use super::{Encoder, EncoderKind, Transformer}; +use super::{BatchSerializer, Encoder, EncoderKind, Transformer}; use crate::encoding::{ CharacterDelimitedEncoder, Framer, FramingConfig, LengthDelimitedEncoder, NewlineDelimitedEncoder, Serializer, SerializerConfig, @@ -149,6 +149,12 @@ impl EncodingConfigWithFraming { let encoder = 
EncoderKind::Framed(Box::new(Encoder::::new(framer, serializer))); Ok((self.transformer(), encoder)) } + + /// Build a `BatchSerializer` for this config, if the configured serializer is batched + /// (e.g. Parquet). Returns `None` for per-event serializers. + pub fn build_batched(&self) -> vector_common::Result> { + self.encoding.config().build_batched() + } } /// The way a sink processes outgoing events. diff --git a/lib/codecs/src/encoding/encoder.rs b/lib/codecs/src/encoding/encoder.rs index 4924dd05447b1..0e6499afa1be8 100644 --- a/lib/codecs/src/encoding/encoder.rs +++ b/lib/codecs/src/encoding/encoder.rs @@ -6,7 +6,7 @@ use vector_core::event::Event; #[cfg(feature = "arrow")] use crate::encoding::ArrowStreamSerializer; use crate::{ - encoding::{Error, Framer, Serializer}, + encoding::{Error, Framer, ParquetSerializer, Serializer}, internal_events::{EncoderFramingError, EncoderSerializeError}, }; @@ -16,6 +16,14 @@ pub enum BatchSerializer { /// Arrow IPC stream format serializer. #[cfg(feature = "arrow")] Arrow(ArrowStreamSerializer), + /// Apache Parquet file format serializer. + Parquet(ParquetSerializer), +} + +impl From for BatchSerializer { + fn from(serializer: ParquetSerializer) -> Self { + Self::Parquet(serializer) + } } /// An encoder that encodes batches of events. @@ -36,10 +44,11 @@ impl BatchEncoder { } /// Get the HTTP content type. 
- #[cfg(feature = "arrow")] pub const fn content_type(&self) -> &'static str { match &self.serializer { + #[cfg(feature = "arrow")] BatchSerializer::Arrow(_) => "application/vnd.apache.arrow.stream", + BatchSerializer::Parquet(_) => "application/vnd.apache.parquet", } } } @@ -47,9 +56,7 @@ impl BatchEncoder { impl tokio_util::codec::Encoder> for BatchEncoder { type Error = Error; - #[allow(unused_variables)] fn encode(&mut self, events: Vec, buffer: &mut BytesMut) -> Result<(), Self::Error> { - #[allow(unreachable_patterns)] match &mut self.serializer { #[cfg(feature = "arrow")] BatchSerializer::Arrow(serializer) => { @@ -63,7 +70,9 @@ impl tokio_util::codec::Encoder> for BatchEncoder { } }) } - _ => unreachable!("BatchSerializer cannot be constructed without encode()"), + BatchSerializer::Parquet(serializer) => serializer + .encode(events, buffer) + .map_err(Error::SerializingError), } } } @@ -74,7 +83,6 @@ pub enum EncoderKind { /// Uses framing to encode individual events Framed(Box>), /// Encodes events in batches without framing - #[cfg(feature = "arrow")] Batch(BatchEncoder), } diff --git a/lib/codecs/src/encoding/format/mod.rs b/lib/codecs/src/encoding/format/mod.rs index 85bc094b26947..dbfea7f618ebd 100644 --- a/lib/codecs/src/encoding/format/mod.rs +++ b/lib/codecs/src/encoding/format/mod.rs @@ -16,6 +16,7 @@ mod native; mod native_json; #[cfg(feature = "opentelemetry")] mod otlp; +mod parquet; mod protobuf; mod raw_message; #[cfg(feature = "syslog")] @@ -39,6 +40,7 @@ pub use native::{NativeSerializer, NativeSerializerConfig}; pub use native_json::{NativeJsonSerializer, NativeJsonSerializerConfig}; #[cfg(feature = "opentelemetry")] pub use otlp::{OtlpSerializer, OtlpSerializerConfig}; +pub use parquet::{ParquetSerializer, ParquetSerializerConfig, ParquetSerializerOptions}; pub use protobuf::{ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions}; pub use raw_message::{RawMessageSerializer, RawMessageSerializerConfig}; #[cfg(feature = 
"syslog")] diff --git a/lib/codecs/src/encoding/format/parquet.rs b/lib/codecs/src/encoding/format/parquet.rs new file mode 100644 index 0000000000000..bf62381865b1c --- /dev/null +++ b/lib/codecs/src/encoding/format/parquet.rs @@ -0,0 +1,1317 @@ +use core::panic; +use std::{io, sync::Arc}; + +use bytes::{BufMut, BytesMut}; +use parquet::{ + basic::{LogicalType, Repetition, Type as PhysicalType}, + column::writer::{ColumnWriter::*, ColumnWriterImpl}, + data_type::DataType, + errors::ParquetError, + file::{properties::WriterProperties, writer::SerializedFileWriter}, + schema::{ + parser::parse_message_type, + types::{BasicTypeInfo, ColumnDescriptor, Type, TypePtr}, + }, +}; +use serde::{Deserialize, Serialize}; +use snafu::*; +use tokio_util::codec::Encoder; +use tracing::error; + +use vector_config::configurable_component; +use vector_core::{ + config, + event::{Event, Value}, + schema, +}; + +use crate::encoding::BuildError; + +/// Errors that can occur during Parquet serialization. +#[derive(Debug, Snafu)] +pub enum ParquetSerializerError { + #[snafu(display(r#"Event does not contain required field. field = "{}""#, field))] + MissingField { + field: String, + }, + #[snafu(display( + r#"Event contains a value with an invalid type. field = "{}" type = "{}" expected type = "{}""#, + field, + actual_type, + expected_type + ))] + InvalidValueType { + field: String, + actual_type: String, + expected_type: String, + }, + #[snafu(display("Failed to write. 
error: {}", error))] + ParquetError { + error: ParquetError, + }, + IoError { + source: io::Error, + }, +} + +impl ParquetSerializerError { + fn invalid_type( + desc: &ColumnDescriptor, + value: &Value, + expected: &str, + ) -> ParquetSerializerError { + ParquetSerializerError::InvalidValueType { + field: desc.name().to_string(), + actual_type: value.kind_str().to_string(), + expected_type: expected.to_string(), + } + } +} + +impl From for ParquetSerializerError { + fn from(error: ParquetError) -> Self { + Self::ParquetError { error } + } +} + +impl From for ParquetSerializerError { + fn from(source: io::Error) -> Self { + Self::IoError { source } + } +} + +/// Config used to build a `ParquetSerializer`. +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct ParquetSerializerConfig { + /// Options for the Parquet serializer. + pub parquet: ParquetSerializerOptions, +} + +impl ParquetSerializerConfig { + /// Creates a new `ParquetSerializerConfig`. + pub const fn new(schema: String) -> Self { + Self { + parquet: ParquetSerializerOptions { schema }, + } + } + + /// Build the `ParquetSerializerConfig` from this configuration. + pub fn build(&self) -> Result { + let schema = parse_message_type(&self.parquet.schema) + .map_err(|error| format!("Failed building Parquet serializer: {}", error))?; + self.validate_logical_schema(&schema) + .map_err(|error| format!("Failed building Parquet serializer: {}", error))?; + Ok(ParquetSerializer { + schema: Arc::new(schema), + }) + } + + /// The data type of events that are accepted by `ParquetSerializer`. + pub fn input_type(&self) -> config::DataType { + config::DataType::Log + } + + /// The schema required by the serializer. + pub fn schema_requirement(&self) -> schema::Requirement { + // TODO: Convert the Parquet schema to a vector schema requirement. + // NOTE: This isn't yet doable. We don't have meanings + // to specify for requirement. 
+ schema::Requirement::empty() + } + + fn validate_logical_schema(&self, schema: &Type) -> Result<(), String> { + let info = schema.get_basic_info(); + match info.logical_type() { + // Validate LIST types + Some(LogicalType::List) => { + Self::not_repeated(schema, "LIST")?; + let list = Self::single_child(schema, "LIST")?; + + Self::repeated(list, "child of LIST")?; + let element = Self::single_child(list, "list of LIST")?; + + Self::not_repeated(element, "element of LIST")?; + self.validate_logical_schema(element)?; + } + // Validate MAP types + Some(LogicalType::Map) => { + Self::not_repeated(schema, "MAP")?; + let key_value = Self::single_child(schema, "MAP")?; + + Self::repeated(key_value, "child of MAP")?; + match key_value.get_fields().len() { + 1 | 2 => (), + _ => { + return Err(format!( + "Invalid MAP type. key_value of MAP type must have one or two children, found {}.", + key_value.get_fields().len() + )); + } + } + + let mut found_key = false; + for element in key_value.get_fields() { + match element.name() { + "key" => { + found_key = true; + Self::required(element, "key of MAP")?; + if !element.is_primitive() + || element.get_physical_type() != PhysicalType::BYTE_ARRAY + { + return Err( + "Invalid primitive type for key of MAP type. Must be binary." + .to_string(), + ); + } + } + _ => self.validate_logical_schema(element)?, + } + } + if !found_key { + return Err( + "Invalid MAP type. key_value of MAP type must have a child named \"key\"." + .to_string(), + ); + } + } + _ if schema.is_group() => { + for field in schema.get_fields() { + self.validate_logical_schema(field)?; + } + } + _ => (), + } + Ok(()) + } + + fn not_repeated(ty: &Type, kind: &str) -> Result<(), String> { + let info = ty.get_basic_info(); + if info.has_repetition() && info.repetition() == Repetition::REPEATED { + Err(format!( + "Invalid repetition for {kind} type. 
repetition = {:?}", + info.repetition() + )) + } else { + Ok(()) + } + } + + fn repeated(ty: &Type, kind: &str) -> Result<(), String> { + let info = ty.get_basic_info(); + if !info.has_repetition() || info.repetition() != Repetition::REPEATED { + Err(format!( + "Invalid repetition for {kind} type. repetition = {:?}", + if info.has_repetition() { + info.repetition() + } else { + Repetition::REQUIRED + } + )) + } else { + Ok(()) + } + } + + fn required(ty: &Type, kind: &str) -> Result<(), String> { + let info = ty.get_basic_info(); + if !info.has_repetition() || info.repetition() == Repetition::REQUIRED { + Err(format!( + "Invalid repetition for {kind} type. repetition = {:?}", + info.repetition() + )) + } else { + Ok(()) + } + } + + fn single_child<'a>(ty: &'a Type, kind: &str) -> Result<&'a Type, String> { + let len = ty.get_fields().len(); + if len != 1 { + Err(format!( + "Invalid {kind} type. Expected one child, found {len}.", + )) + } else { + Ok(ty.get_fields().get(0).expect("Should have a child.")) + } + } +} + +/// Options for the Parquet serializer. +#[configurable_component] +#[derive(Clone, Debug)] +pub struct ParquetSerializerOptions { + /// The Parquet schema. + #[configurable(metadata(docs::examples = r#"message test { + required group data { + required binary name; + repeated int64 values; + } + }"#))] + pub schema: String, +} + +/// Serializer that converts `Vec` to bytes using the Apache Parquet format. +#[derive(Debug, Clone)] +pub struct ParquetSerializer { + schema: TypePtr, +} + +impl ParquetSerializer { + /// Creates a new `ParquetSerializer`. 
+ pub const fn new(schema: TypePtr) -> Self { + Self { schema } + } +} + +impl ParquetSerializer { + fn process( + &self, + events: &[Event], + desc: &ColumnDescriptor, + extractor: impl Fn(&Value) -> Result<::T, ParquetSerializerError>, + writer: &mut ColumnWriterImpl, + ) -> Result<(), ParquetSerializerError> { + let mut column = Column::<::T, _>::new(&*self.schema, desc, extractor); + column.extract_column(events)?; + let written_values = writer.write_batch( + &column.values, + column.def_levels.as_ref().map(|vec| vec.as_slice()), + column.rep_levels.as_ref().map(|vec| vec.as_slice()), + )?; + + assert_eq!(written_values, column.values.len()); + Ok(()) + } +} + +impl Encoder> for ParquetSerializer { + type Error = vector_common::Error; + + /// Builds columns from events and writes them to the writer. + /// + /// Expects that all events satisfy the schema, else whole batch can fail. + fn encode(&mut self, events: Vec, buffer: &mut BytesMut) -> Result<(), Self::Error> { + // Encode events + let props = Arc::new(WriterProperties::builder().build()); + let mut parquet_writer = + SerializedFileWriter::new(buffer.writer(), self.schema.clone(), props)?; + + let mut row_group_writer = parquet_writer.next_row_group()?; + while let Some(mut column_writer) = row_group_writer.next_column()? { + match column_writer.untyped() { + BoolColumnWriter(writer) => { + let desc = writer.get_descriptor().clone(); + self.process( + &events, + &desc, + |value| match value { + Value::Boolean(value) => Ok(*value), + _ => Err(ParquetSerializerError::invalid_type( + &desc, value, "boolean", + )), + }, + writer, + )? + } + Int64ColumnWriter(writer) => { + let desc = writer.get_descriptor().clone(); + self.process( + &events, + &desc, + |value| match value { + Value::Integer(value) => Ok(*value), + _ => Err(ParquetSerializerError::invalid_type( + &desc, value, "integer", + )), + }, + writer, + )? 
 + } + DoubleColumnWriter(writer) => { + let desc = writer.get_descriptor().clone(); + self.process( + &events, + &desc, + |value| match value { + Value::Float(value) => Ok(value.into_inner()), + _ => Err(ParquetSerializerError::invalid_type(&desc, value, "float")), + }, + writer, + )? + } + ByteArrayColumnWriter(writer) => { + let desc = writer.get_descriptor().clone(); + self.process( + &events, + &desc, + |value| match value { + Value::Bytes(value) => Ok(value.clone().into()), + _ => Err(ParquetSerializerError::invalid_type(&desc, value, "string")), + }, + writer, + )? + } + FixedLenByteArrayColumnWriter(_) => { + panic!("Fixed len byte array is not supported."); + } + Int32ColumnWriter(_) => panic!("Int32 is not supported."), + Int96ColumnWriter(_) => panic!("Int96 is not supported."), + FloatColumnWriter(_) => panic!("Float32 is not supported."), + } + column_writer.close()?; + } + + row_group_writer.close()?; + parquet_writer.close()?; + + Ok(()) + } +} + +struct Column<'a, T, F: Fn(&Value) -> Result> { + levels: Vec<&'a Type>, + extract: F, + values: Vec, + // If present encodes definition level. From 0 to column.max_def_level() inclusive. + // With any value below max encoding null on that level. + // One thing to keep in mind, if a column is required on some "level" then that level is not counted here. + // This is needed when values are optional. + // In case of null, that value is skipped in values. + def_levels: Option>, + // If present encodes repetition level. + // From 0 to column.max_rep_level() inclusive. With 0 starting a new record and any value below max encoding + // starting new list at that level. With max level just adding element to leaf list. + // This is needed when values are repeated. Where that repetition can have multiple nested levels. 
+ rep_levels: Option>, +} + +impl<'a, T, F: Fn(&Value) -> Result> Column<'a, T, F> { + fn new(schema: &'a Type, column: &'a ColumnDescriptor, extract: F) -> Self { + let mut levels = vec![schema]; + for part in column.path().parts() { + match &levels[levels.len() - 1] { + Type::GroupType { fields, .. } => { + let field = fields + .iter() + .find(|field| field.name() == part) + .expect("Field not found in schema."); + levels.push(field); + } + Type::PrimitiveType { .. } => unreachable!(), + } + } + + let def_levels = if levels.iter().any(|ty| ty.is_optional()) { + Some(Vec::new()) + } else { + None + }; + + let rep_levels = if levels.iter().any(|ty| { + let info = ty.get_basic_info(); + info.has_repetition() && info.repetition() == Repetition::REPEATED + }) { + Some(Vec::new()) + } else { + None + }; + + Self { + levels, + extract, + values: Vec::new(), + def_levels, + rep_levels, + } + } + + fn extract_column(&mut self, events: &[Event]) -> Result<(), ParquetSerializerError> { + for event in events { + let res = match event { + Event::Log(log) => { + self.extract_value(log.value(), Level::root()) + } + Event::Trace(trace) => { + self.extract_value(trace.value(), Level::root()) + } + Event::Metric(_) => { + panic!("Metrics are not supported."); + } + }; + res.inspect_err(|error| { + // event to json string + match serde_json::to_string(&event) { + Ok(event) => error!( + error = ?error, + event = event, + ), + Err(e) => error!( + error = ?error, + event = ?event, + serde_error = %e, + ), + } + })?; + } + Ok(()) + } + + /// Will push at least one value, or error. 
+ fn extract_value(&mut self, value: &Value, level: Level) -> Result<(), ParquetSerializerError> { + if let Some(part) = self.levels.get(level.level) { + let sub = match value { + Value::Object(object) => object.get(part.name()), + Value::Null => None, + // Invalid type, error + value => { + return Err(ParquetSerializerError::InvalidValueType { + field: self.path(level), + actual_type: value.kind_str().to_string(), + expected_type: "object".to_string(), + }); + } + }; + + self.process_value(sub, level) + } else if matches!(value, Value::Null) { + self.push_value(None, level.undefine()); + Ok(()) + } else { + let value = (self.extract)(value)?; + self.push_value(Some(value), level); + Ok(()) + } + } + + /// Will push at least one value, or error. + fn process_value( + &mut self, + value: Option<&Value>, + level: Level, + ) -> Result<(), ParquetSerializerError> { + let part = self + .levels + .get(level.level) + .expect("We should have checked this before hand."); + match value { + Some(Value::Null) | None if part.is_optional() => { + self.push_value(None, level); + Ok(()) + } + // Illegal null, error + Some(Value::Null) | None => Err(ParquetSerializerError::MissingField { + field: self.path(level), + }), + Some(value) => { + let info = part.get_basic_info(); + match info.logical_type() { + Some(LogicalType::List) => { + if let Value::Array(array) = value { + let list_level = level.descend(info); + let element_level = list_level.descend_repeated(); + + if array.is_empty() { + self.push_value(None, list_level); + } else { + let mut now = element_level; + for element in array { + self.process_value(Some(element), now)?; + now = now.next(); + } + } + + Ok(()) + } else { + return Err(ParquetSerializerError::InvalidValueType { + field: self.path(level), + actual_type: value.kind_str().to_string(), + expected_type: "array".to_string(), + }); + } + } + Some(LogicalType::Map) => { + if let Value::Object(object) = value { + let key_value = level.descend(info); + let 
element_level = key_value.descend_repeated(); + + if self + .levels + .get(element_level.level) + .expect("This must be valid MAP") + .name() + == "key" + { + // Key field + if object.is_empty() { + self.push_value(None, key_value); + } else { + let mut now = element_level; + for key in object.keys() { + let value = (self.extract)(&Value::from(key.as_str()))?; + self.push_value(Some(value), now.descend_required()); + now = now.next(); + } + } + } else { + // Value field + if object.is_empty() { + self.push_value(None, key_value); + } else { + let mut now = element_level; + for value in object.values() { + self.process_value(Some(value), now)?; + now = now.next(); + } + } + } + + Ok(()) + } else { + return Err(ParquetSerializerError::InvalidValueType { + field: self.path(level), + actual_type: value.kind_str().to_string(), + expected_type: "object".to_string(), + }); + } + } + _ => self.extract_flat(value, level.descend(info)), + } + } + } + } + + /// Will push at least one value, or error. 
+ fn extract_flat(&mut self, value: &Value, level: Level) -> Result<(), ParquetSerializerError> { + match value { + Value::Array(array) if level.repeated => { + if array.is_empty() { + self.push_value(None, level.undefine()); + } else { + let mut now = level; + for value in array { + self.extract_value(value, now)?; + now = now.next(); + } + } + + Ok(()) + } + _ => self.extract_value(value, level), + } + } + + fn push_value(&mut self, value: Option, level: Level) { + if let Some(rep_levels) = &mut self.rep_levels { + rep_levels.push(level.start_rep_level); + } + if let Some(def_levels) = &mut self.def_levels { + def_levels.push(level.def_level); + } + if let Some(value) = value { + self.values.push(value); + } + } + + fn path(&self, level: Level) -> String { + let mut path = String::new(); + for level in &self.levels[1..level.level] { + path.push_str(level.name()); + path.push('.'); + } + path.push_str(self.levels[level.level].name()); + path + } +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +struct Level { + start_rep_level: i16, + rep_level: i16, + def_level: i16, + level: usize, + // If this level can be null + optional: bool, + // If this level is repeated + repeated: bool, +} + +impl Level { + fn root() -> Self { + Self { + start_rep_level: 0, + rep_level: 0, + def_level: 0, + level: 1, + optional: false, + repeated: false, + } + } + + fn next(self) -> Self { + assert!(self.repeated); + Self { + start_rep_level: self.rep_level, + ..self + } + } + + /// Descend implies that the level is defined. 
+ fn descend(self, info: &BasicTypeInfo) -> Self { + if info.has_repetition() { + match info.repetition() { + Repetition::OPTIONAL => self.descend_optional(), + Repetition::REPEATED => self.descend_repeated(), + Repetition::REQUIRED => self.descend_required(), + } + } else { + self.descend_required() + } + } + + fn descend_required(self) -> Self { + Self { + level: self.level + 1, + repeated: false, + optional: false, + ..self + } + } + + fn descend_optional(self) -> Self { + Self { + def_level: self.def_level + 1, + level: self.level + 1, + repeated: false, + optional: true, + ..self + } + } + + fn descend_repeated(self) -> Self { + Self { + rep_level: self.rep_level + 1, + def_level: self.def_level + 1, + level: self.level + 1, + repeated: true, + optional: true, + ..self + } + } + + /// Undefine by one level + fn undefine(self) -> Self { + Self { + def_level: self.def_level - 1, + ..self + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::Bytes; + use parquet::{ + column::reader::{ColumnReader, ColumnReaderImpl}, + file::reader::*, + file::serialized_reader::SerializedFileReader, + schema::parser::parse_message_type, + }; + use similar_asserts::assert_eq; + use std::panic; + use std::{collections::HashSet, sync::Arc}; + use vector_core::event::LogEvent; + use vrl::value::btreemap; + + macro_rules! log_event { + ($($key:expr => $value:expr),* $(,)?) 
=> { + #[allow(unused_variables)] + { + let mut event = Event::Log(LogEvent::default()); + let log = event.as_mut_log(); + $( + log.insert($key, $value); + )* + event + } + }; + } + + fn assert_column( + count: usize, + expect_values: &[::T], + expect_rep_levels: Option<&[i16]>, + expect_def_levels: Option<&[i16]>, + mut column_reader: ColumnReaderImpl, + ) where + ::T: Default, + { + let mut values = Vec::new(); + values.resize(count, ::T::default()); + let mut def_levels = Vec::new(); + def_levels.resize(count, 0); + let mut rep_levels = Vec::new(); + rep_levels.resize(count, 0); + let (read, level) = column_reader + .read_batch( + count, + Some(def_levels.as_mut_slice()).filter(|_| expect_def_levels.is_some()), + Some(rep_levels.as_mut_slice()).filter(|_| expect_rep_levels.is_some()), + &mut values, + ) + .unwrap(); + + assert_eq!(level, count); + assert_eq!(&values[..read], expect_values); + if expect_rep_levels.is_some() { + assert_eq!(rep_levels, expect_rep_levels.unwrap()); + } + if expect_def_levels.is_some() { + assert_eq!(def_levels, expect_def_levels.unwrap()); + } + } + + fn validate( + schema: &str, + events: Vec, + num_columns: usize, + validate: impl Fn(usize, &str, &dyn RowGroupReader), + ) { + let schema = Arc::new(parse_message_type(schema).unwrap()); + let mut encoder = ParquetSerializer::new(schema); + + let mut buffer = BytesMut::new(); + encoder.encode(events, &mut buffer).unwrap(); + + let reader = SerializedFileReader::new(buffer.freeze()).unwrap(); + + let parquet_metadata = reader.metadata(); + assert_eq!(parquet_metadata.num_row_groups(), 1); + + let row_group_reader = reader.get_row_group(0).unwrap(); + assert_eq!(row_group_reader.num_columns(), num_columns); + + let metadata = row_group_reader.metadata(); + let mut visited = HashSet::new(); + for (i, column) in metadata.columns().iter().enumerate() { + let path = column.column_path().string(); + assert!(visited.insert(path.clone())); + validate(i, &path, row_group_reader.as_ref()); + } 
+ + assert_eq!(visited.len(), num_columns); + } + + #[test] + fn test_serialize() { + let message_type = r#" + message test { + required group a { + required boolean b; + optional int64 c; + } + optional group d { + optional int64 e; + } + required group f { + repeated int64 g; + } + required binary h; + repeated group i { + required int64 j; + repeated double k; + } + } + "#; + + let events = vec![ + log_event! { + "a.b" => true, + "a.c" => 2, + "d.e" => 3, + "f.g" => vec![4, 5], + "h" => "hello", + "i" => vec![btreemap! { + "j" => 6, + "k" => vec![7.0, 8.0] + }] + }, + log_event! { + "a.b" => false, + "f" => Value::Object(Default::default()), + "h" => "world", + "i" => vec![ + btreemap! { + "j" => 9, + "k" => vec![10.0] + }, btreemap! { + "j" => 11, + }] + }, + ]; + + validate( + message_type, + events, + 7, + |i, path, row_group_reader| match path { + "a.b" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::BoolColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(2, &[true, false], None, None, reader); + } + "a.c" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::Int64ColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(2, &[2], None, Some(&[1, 0]), reader); + } + "d.e" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::Int64ColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(2, &[3], None, Some(&[2, 0]), reader); + } + "f.g" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::Int64ColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(3, &[4, 5], Some(&[0, 1, 0]), Some(&[1, 1, 0]), reader); + } + "h" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::ByteArrayColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 2, + 
&[Bytes::from("hello").into(), Bytes::from("world").into()], + None, + None, + reader, + ); + } + "i.j" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::Int64ColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(3, &[6, 9, 11], Some(&[0, 0, 1]), Some(&[1, 1, 1]), reader); + } + "i.k" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::DoubleColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 4, + &[7.0, 8.0, 10.0], + Some(&[0, 2, 0, 1]), + Some(&[2, 2, 2, 1]), + reader, + ); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn test_value_null() { + let message_type = r#" + message test { + optional group geo{ + optional binary city_name (UTF8); + } + } + "#; + + let events = vec![ + log_event! { + "geo.city_name" => "hello", + }, + log_event! { + "geo.city_name" => Value::Null, + }, + ]; + + validate( + message_type, + events, + 1, + |i, path, row_group_reader| match path { + "geo.city_name" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::ByteArrayColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 2, + &[Bytes::from("hello").into()], + None, + Some(&[2, 1]), + reader, + ); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn test_value_null_stack_optional() { + let message_type = r#" + message test { + optional group a{ + optional group b{ + optional boolean c; + } + } + } + "#; + + let events = vec![ + log_event! {}, + log_event! {"a" => Value::Null}, + log_event! {"a.b" => Value::Null}, + log_event! {"a.b.c" => Value::Null}, + log_event! 
{"a.b.c" => Value::Boolean(true)}, + ]; + + validate( + message_type, + events, + 1, + |i, path, row_group_reader| match path { + "a.b.c" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::BoolColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(5, &[true], None, Some(&[0, 0, 1, 2, 3]), reader); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn test_value_null_repeated_optional() { + let message_type = r#" + message test { + repeated group a{ + optional boolean b; + } + } + "#; + + let events = vec![ + log_event! {}, + log_event! {"a" => Value::Null}, + log_event! {"a.b" => Value::Null}, + log_event! {"a.b" => Value::Boolean(false)}, + log_event! {"a" => vec![ + btreemap! { "b" => Value::Null }, + btreemap! { "b" => Value::Boolean(true) } + ]}, + ]; + validate( + message_type, + events, + 1, + |i, path, row_group_reader| match path { + "a.b" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::BoolColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 6, + &[false, true], + Some(&[0, 0, 0, 0, 0, 1]), + Some(&[0, 0, 1, 2, 1, 2]), + reader, + ); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn test_value_null_repeated() { + let message_type = r#" + message test { + repeated boolean a; + } + "#; + + let events = vec![ + log_event! {}, + log_event! {"a" => Value::Null}, + log_event! {"a" => Value::Boolean(false)}, + log_event! 
{"a" => vec![ + Value::Null, + Value::Boolean(true), + Value::Null, + ]}, + ]; + + validate( + message_type, + events, + 1, + |i, path, row_group_reader| match path { + "a" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::BoolColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 6, + &[false, true], + Some(&[0, 0, 0, 0, 1, 1]), + Some(&[0, 0, 1, 0, 1, 0]), + reader, + ); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn test_value_empty_repeated() { + let message_type = r#" + message test { + repeated boolean a; + } + "#; + + let events = vec![log_event! {"a" => Vec::::new()}]; + + validate( + message_type, + events, + 1, + |i, path, row_group_reader| match path { + "a" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::BoolColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(1, &[], Some(&[0]), Some(&[0]), reader); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn test_repeated() { + let message_type = r#" + message test { + repeated group answer { + optional binary name (UTF8); + optional INT64 ttl; + } + } + "#; + + let events = vec![log_event! { + "answer" => vec![ + btreemap! { + "name" => "test1", + "ttl" => 0, + }, btreemap! 
{ + "name" => "test2", + "ttl" => 3600, + }] + }]; + + validate( + message_type, + events, + 2, + |i, path, row_group_reader| match path { + "answer.name" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::ByteArrayColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 2, + &[Bytes::from("test1").into(), Bytes::from("test2").into()], + Some(&[0, 1]), + Some(&[2, 2]), + reader, + ); + } + "answer.ttl" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::Int64ColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(2, &[0, 3600], Some(&[0, 1]), Some(&[2, 2]), reader); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn test_list() { + let message_type = r#" + message test { + optional group answers (LIST){ + repeated group list { + optional boolean element; + } + } + } + "#; + + let events = vec![ + log_event! {}, + log_event! {"answers" => Value::Null}, + log_event! {"answers" => Vec::::new()}, + log_event! 
{"answers" => vec![ + Value::Null, + Value::Boolean(true), + Value::Null, + ]}, + ]; + + validate( + message_type, + events, + 1, + |i, path, row_group_reader| match path { + "answers.list.element" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::BoolColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 6, + &[true], + Some(&[0, 0, 0, 0, 1, 1]), + Some(&[0, 0, 1, 2, 3, 2]), + reader, + ); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn illegal_list_scheme() { + let config = ParquetSerializerConfig { + parquet: ParquetSerializerOptions { + schema: r#" + message test { + optional group answers (LIST){ + optional group list { + optional boolean element; + } + } + } + "# + .to_string(), + }, + }; + + assert!(config.build().is_err()); + } + + #[test] + fn test_map() { + let message_type = r#" + message test { + optional group answers (MAP){ + repeated group key_value { + required binary key (UTF8); + optional boolean value; + } + } + } + "#; + + let events = vec![ + log_event! {}, + log_event! {"answers" => Value::Null}, + log_event! {"answers" => btreemap!{}}, + log_event! 
{"answers" => btreemap!{ + "test1" => Value::Null, + "test2" => Value::Boolean(true), + "test3" => Value::Null, + }}, + ]; + + validate( + message_type, + events, + 2, + |i, path, row_group_reader| match path { + "answers.key_value.key" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::ByteArrayColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 6, + &[ + Bytes::from("test1").into(), + Bytes::from("test2").into(), + Bytes::from("test3").into(), + ], + Some(&[0, 0, 0, 0, 1, 1]), + Some(&[0, 0, 1, 2, 2, 2]), + reader, + ); + } + "answers.key_value.value" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::BoolColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 6, + &[true], + Some(&[0, 0, 0, 0, 1, 1]), + Some(&[0, 0, 1, 2, 3, 2]), + reader, + ); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn illegal_map_scheme() { + let config = ParquetSerializerConfig { + parquet: ParquetSerializerOptions { + schema: r#" + message test { + optional group answers (MAP){ + repeated group key_value { + required binary str (UTF8); + optional boolean value; + } + } + } + "# + .to_string(), + }, + }; + + assert!(config.build().is_err()); + } +} diff --git a/lib/codecs/src/encoding/mod.rs b/lib/codecs/src/encoding/mod.rs index 25f88406fbae9..d07b661d04db4 100644 --- a/lib/codecs/src/encoding/mod.rs +++ b/lib/codecs/src/encoding/mod.rs @@ -21,9 +21,9 @@ pub use format::{ CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig, JsonSerializer, JsonSerializerConfig, JsonSerializerOptions, LogfmtSerializer, LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer, - NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig, - ProtobufSerializerOptions, RawMessageSerializer, RawMessageSerializerConfig, TextSerializer, - TextSerializerConfig, + 
NativeSerializerConfig, ParquetSerializer, ParquetSerializerConfig, ParquetSerializerOptions, + ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions, RawMessageSerializer, + RawMessageSerializerConfig, TextSerializer, TextSerializerConfig, }; #[cfg(feature = "opentelemetry")] pub use format::{OtlpSerializer, OtlpSerializerConfig}; diff --git a/lib/codecs/src/encoding/serializer.rs b/lib/codecs/src/encoding/serializer.rs index 536f836ad8163..5ce8478e4c53a 100644 --- a/lib/codecs/src/encoding/serializer.rs +++ b/lib/codecs/src/encoding/serializer.rs @@ -12,12 +12,14 @@ use super::format::{OtlpSerializer, OtlpSerializerConfig}; use super::format::{SyslogSerializer, SyslogSerializerConfig}; use super::{ chunking::Chunker, + encoder::BatchSerializer, format::{ AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer, CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig, JsonSerializer, JsonSerializerConfig, LogfmtSerializer, LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer, - NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig, RawMessageSerializer, + NativeSerializerConfig, ParquetSerializerConfig, ParquetSerializerOptions, + ProtobufSerializer, ProtobufSerializerConfig, RawMessageSerializer, RawMessageSerializerConfig, TextSerializer, TextSerializerConfig, }, framing::{ @@ -121,6 +123,18 @@ pub enum SerializerConfig { /// could lead to the encoding emitting empty strings for the given event. RawMessage, + /// Encodes events in [Apache Parquet format][parquet]. + /// + /// Parquet is a batched columnar format, so events are buffered and written as + /// a parquet file rather than framed one-at-a-time. Sinks that use this codec + /// must route encoding through `build_batched()` on the encoding config. + /// + /// [parquet]: https://parquet.apache.org/ + Parquet { + /// Apache Parquet-specific encoder options. 
+ parquet: ParquetSerializerOptions, + }, + /// Plain text encoding. /// /// This encoding uses the `message` field of a log event. For metrics, it uses an @@ -265,6 +279,7 @@ impl From for SerializerConfig { impl SerializerConfig { /// Build the `Serializer` from this configuration. + /// Fails if the serializer is batched (e.g. Parquet) — use `build_batched` instead. pub fn build(&self) -> Result> { match self { SerializerConfig::Avro { avro } => Ok(Serializer::Avro( @@ -283,6 +298,9 @@ impl SerializerConfig { SerializerConfig::Otlp => { Ok(Serializer::Otlp(OtlpSerializerConfig::default().build()?)) } + SerializerConfig::Parquet { .. } => { + Err("Parquet serializer is batched; use build_batched() instead.".into()) + } SerializerConfig::Protobuf(config) => Ok(Serializer::Protobuf(config.build()?)), SerializerConfig::RawMessage => { Ok(Serializer::RawMessage(RawMessageSerializerConfig.build())) @@ -293,6 +311,19 @@ impl SerializerConfig { } } + /// Build the `BatchSerializer` from this configuration. + /// Returns `None` if the serializer is not batched. + pub fn build_batched( + &self, + ) -> Result, Box> { + match self { + SerializerConfig::Parquet { parquet } => Ok(Some(BatchSerializer::Parquet( + ParquetSerializerConfig::new(parquet.schema.clone()).build()?, + ))), + _ => Ok(None), + } + } + /// Return an appropriate default framer for the given serializer. pub fn default_stream_framing(&self) -> FramingConfig { match self { @@ -327,6 +358,7 @@ impl SerializerConfig { SerializerConfig::Gelf(_) => { FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0)) } + SerializerConfig::Parquet { .. 
} => FramingConfig::Bytes, } } @@ -345,6 +377,9 @@ impl SerializerConfig { SerializerConfig::NativeJson => NativeJsonSerializerConfig.input_type(), #[cfg(feature = "opentelemetry")] SerializerConfig::Otlp => OtlpSerializerConfig::default().input_type(), + SerializerConfig::Parquet { parquet } => { + ParquetSerializerConfig::new(parquet.schema.clone()).input_type() + } SerializerConfig::Protobuf(config) => config.input_type(), SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(), SerializerConfig::Text(config) => config.input_type(), @@ -368,6 +403,9 @@ impl SerializerConfig { SerializerConfig::NativeJson => NativeJsonSerializerConfig.schema_requirement(), #[cfg(feature = "opentelemetry")] SerializerConfig::Otlp => OtlpSerializerConfig::default().schema_requirement(), + SerializerConfig::Parquet { parquet } => { + ParquetSerializerConfig::new(parquet.schema.clone()).schema_requirement() + } SerializerConfig::Protobuf(config) => config.schema_requirement(), SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(), SerializerConfig::Text(config) => config.schema_requirement(), diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index 6b3fa43a1b6ba..4bf104bcdbcdb 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -235,6 +235,7 @@ fn serializer_config_to_deserializer( }, }) } + SerializerConfig::Parquet { .. 
} => todo!(), SerializerConfig::RawMessage | SerializerConfig::Text(_) => DeserializerConfig::Bytes, #[cfg(feature = "codecs-opentelemetry")] SerializerConfig::Otlp => todo!(), diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index 08ba92245d65c..217526629e55f 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -3,8 +3,8 @@ use tower::ServiceBuilder; use vector_lib::{ TimeZone, codecs::{ - TextSerializerConfig, - encoding::{Framer, FramingConfig}, + BatchEncoder, TextSerializerConfig, + encoding::{EncoderKind, Framer, FramingConfig}, }, configurable::configurable_component, sink::VectorSink, @@ -246,8 +246,12 @@ impl S3SinkConfig { let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id, None); let transformer = self.encoding.transformer(); - let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?; - let encoder = Encoder::::new(framer, serializer); + let encoder = if let Some(batch_serializer) = self.encoding.build_batched()? 
{ + EncoderKind::Batch(BatchEncoder::new(batch_serializer)) + } else { + let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?; + EncoderKind::Framed(Box::new(Encoder::::new(framer, serializer))) + }; let request_options = S3RequestOptions { bucket: self.bucket.clone(), diff --git a/src/sinks/aws_s3/sink.rs b/src/sinks/aws_s3/sink.rs index 26d47cdb7039c..7ee8bb4e9972f 100644 --- a/src/sinks/aws_s3/sink.rs +++ b/src/sinks/aws_s3/sink.rs @@ -3,10 +3,12 @@ use std::io; use bytes::Bytes; use chrono::{FixedOffset, Utc}; use uuid::Uuid; -use vector_lib::{codecs::encoding::Framer, event::Finalizable, request_metadata::RequestMetadata}; +use vector_lib::{ + codecs::encoding::EncoderKind, event::Finalizable, request_metadata::RequestMetadata, +}; use crate::{ - codecs::{Encoder, Transformer}, + codecs::Transformer, event::Event, sinks::{ s3_common::{ @@ -28,7 +30,7 @@ pub struct S3RequestOptions { pub filename_append_uuid: bool, pub filename_extension: Option, pub api_options: S3Options, - pub encoder: (Transformer, Encoder), + pub encoder: (Transformer, EncoderKind), pub compression: Compression, pub filename_tz_offset: Option, } @@ -36,7 +38,7 @@ pub struct S3RequestOptions { impl RequestBuilder<(S3PartitionKey, Vec)> for S3RequestOptions { type Metadata = S3Metadata; type Events = Vec; - type Encoder = (Transformer, Encoder); + type Encoder = (Transformer, EncoderKind); type Payload = Bytes; type Request = S3Request; type Error = io::Error; // TODO: this is ugly. 
diff --git a/src/sinks/util/encoding.rs b/src/sinks/util/encoding.rs index 4cc49b00f358b..d615b2eb69ff5 100644 --- a/src/sinks/util/encoding.rs +++ b/src/sinks/util/encoding.rs @@ -99,7 +99,6 @@ impl Encoder for (Transformer, vector_lib::codecs::Encoder<()>) { } } -#[cfg(feature = "codecs-arrow")] impl Encoder> for (Transformer, vector_lib::codecs::BatchEncoder) { fn encode_input( &self, @@ -140,7 +139,6 @@ impl Encoder> for (Transformer, vector_lib::codecs::EncoderKind) { vector_lib::codecs::EncoderKind::Framed(encoder) => { (self.0.clone(), *encoder.clone()).encode_input(events, writer) } - #[cfg(feature = "codecs-arrow")] vector_lib::codecs::EncoderKind::Batch(encoder) => { (self.0.clone(), encoder.clone()).encode_input(events, writer) }