diff --git a/Cargo.lock b/Cargo.lock index 717c027460233..93840b5dd8307 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -94,6 +94,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", + "const-random", "getrandom 0.2.15", "once_cell", "serde", @@ -362,6 +363,93 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "arrow-array" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7a27466d897d99654357a6d95dc0a26931d9e4306e60c14fc31a894edb86579" +dependencies = [ + "ahash 0.8.11", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "hashbrown 0.13.2", + "num", +] + +[[package]] +name = "arrow-buffer" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9405b78106a9d767c7b97c78a70ee1b23ee51a74f5188a821a716d9a85d1af2b" +dependencies = [ + "half", + "num", +] + +[[package]] +name = "arrow-cast" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be0ec5a79a87783dc828b7ff8f89f62880b3f553bc5f5b932a82f4a1035024b4" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "chrono", + "lexical-core", + "num", +] + +[[package]] +name = "arrow-data" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6f710d98964d2c069b8baf566130045e79e11baa105623f038a6c942f805681" +dependencies = [ + "arrow-buffer", + "arrow-schema", + "half", + "num", +] + +[[package]] +name = "arrow-ipc" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c99787cb8fabc187285da9e7182d22f2b80ecfac61ca0a42c4299e9eecdf903" +dependencies = [ + "arrow-array", + 
"arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "flatbuffers", +] + +[[package]] +name = "arrow-schema" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18c41d058b2895a12f46dfafc306ee3529ad9660406be0ab8a7967d5e27c417e" + +[[package]] +name = "arrow-select" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fcbdda2772b7e712e77444f3a71f4ee517095aceb993b35de71de41c70d9b4f" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "num", +] + [[package]] name = "ascii" version = "0.9.3" @@ -1938,6 +2026,17 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3eeab4423108c5d7c744f4d234de88d18d636100093ae04caf4825134b9c3a32" +[[package]] +name = "brotli" +version = "3.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d640d25bc63c50fb1f0b545ffd80207d2e10a4c965530809b40ba3386825c391" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor 2.5.1", +] + [[package]] name = "brotli" version = "8.0.0" @@ -1946,7 +2045,17 @@ checksum = "cf19e729cdbd51af9a397fb9ef8ac8378007b797f8273cfbfdf45dcaa316167b" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", - "brotli-decompressor", + "brotli-decompressor 5.0.0", +] + +[[package]] +name = "brotli-decompressor" +version = "2.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e2e4afe60d7dd600fdd3de8d0f08c2b7ec039712e3b6137ff98b7004e82de4f" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", ] [[package]] @@ -2373,6 +2482,7 @@ dependencies = [ "memchr", "opentelemetry-proto", "ordered-float 4.6.0", + "parquet", "prost 0.12.6", "prost-reflect", "rand 0.9.2", @@ -2485,7 +2595,7 @@ version = "0.4.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "485abf41ac0c8047c07c87c72c8fb3eb5197f6e9d7ded615dfd1a00ae00a0f64" dependencies 
= [ - "brotli", + "brotli 8.0.0", "compression-core", "flate2", "memchr", @@ -2590,6 +2700,26 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f" +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom 0.2.15", + "once_cell", + "tiny-keccak", +] + [[package]] name = "convert_case" version = "0.4.0" @@ -3946,6 +4076,16 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flatbuffers" +version = "23.5.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dac53e22462d78c16d64a1cd22371b54cc3fe94aa15e7886a2fa6e5d1ab8640" +dependencies = [ + "bitflags 1.3.2", + "rustc_version 0.4.1", +] + [[package]] name = "flate2" version = "1.1.2" @@ -4522,6 +4662,7 @@ checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" dependencies = [ "cfg-if", "crunchy", + "num-traits", ] [[package]] @@ -4545,6 +4686,12 @@ dependencies = [ "ahash 0.7.7", ] +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" + [[package]] name = "hashbrown" version = "0.14.5" @@ -4981,7 +5128,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.10", "tokio", "tower-service", "tracing 0.1.41", @@ -5495,6 +5642,12 
@@ dependencies = [ "web-sys", ] +[[package]] +name = "integer-encoding" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" + [[package]] name = "inventory" version = "0.3.21" @@ -5991,6 +6144,70 @@ dependencies = [ "spin 0.5.2", ] +[[package]] +name = "lexical-core" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cde5de06e8d4c2faabc400238f9ae1c74d5412d03a7bd067645ccbc47070e46" +dependencies = [ + "lexical-parse-float", + "lexical-parse-integer", + "lexical-util", + "lexical-write-float", + "lexical-write-integer", +] + +[[package]] +name = "lexical-parse-float" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f" +dependencies = [ + "lexical-parse-integer", + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-parse-integer" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9" +dependencies = [ + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-util" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5255b9ff16ff898710eb9eb63cb39248ea8a5bb036bea8085b1a767ff6c4e3fc" +dependencies = [ + "static_assertions", +] + +[[package]] +name = "lexical-write-float" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862" +dependencies = [ + "lexical-util", + "lexical-write-integer", + "static_assertions", +] + +[[package]] +name = "lexical-write-integer" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446" +dependencies = [ + "lexical-util", + "static_assertions", +] + [[package]] name = "libc" version = "0.2.175" @@ -6252,7 +6469,7 @@ version = "0.11.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a" dependencies = [ - "twox-hash", + "twox-hash 2.1.2", ] [[package]] @@ -7495,6 +7712,37 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "parquet" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0a1e6fa27f09ebddba280f5966ef435f3ac4d74cfc3ffe370fd3fd59c2e004d" +dependencies = [ + "ahash 0.8.11", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-schema", + "arrow-select", + "base64 0.21.7", + "brotli 3.5.0", + "bytes 1.10.1", + "chrono", + "flate2", + "hashbrown 0.13.2", + "lz4", + "num", + "num-bigint", + "paste", + "seq-macro", + "snap", + "thrift", + "twox-hash 1.6.3", + "zstd 0.12.4", +] + [[package]] name = "parse-size" version = "1.1.0" @@ -8168,7 +8416,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck 0.5.0", - "itertools 0.10.5", + "itertools 0.14.0", "log", "multimap", "once_cell", @@ -8214,7 +8462,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.14.0", "proc-macro2 1.0.101", "quote 1.0.40", "syn 2.0.106", @@ -9706,6 +9954,12 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" +[[package]] +name = "seq-macro" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"1bc711410fbe7399f390ca1c3b60ad0f53f80e95c5eb935e52268a0e2cd49acc" + [[package]] name = "serde" version = "1.0.228" @@ -10930,6 +11184,17 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "thrift" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" +dependencies = [ + "byteorder", + "integer-encoding", + "ordered-float 2.10.1", +] + [[package]] name = "tikv-jemalloc-sys" version = "0.6.0+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7" @@ -10984,6 +11249,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinystr" version = "0.7.6" @@ -11807,6 +12081,16 @@ dependencies = [ "utf-8", ] +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if", + "static_assertions", +] + [[package]] name = "twox-hash" version = "2.1.2" @@ -12659,7 +12943,7 @@ dependencies = [ "tokio-util", "tower 0.5.2", "tracing 0.1.41", - "twox-hash", + "twox-hash 2.1.2", "vector-common", "vector-core", ] diff --git a/lib/codecs/Cargo.toml b/lib/codecs/Cargo.toml index 741c95b84a895..43955adb685bd 100644 --- a/lib/codecs/Cargo.toml +++ b/lib/codecs/Cargo.toml @@ -1,9 +1,9 @@ [package] name = "codecs" -version = "0.1.0" authors = ["Vector Contributors "] edition = "2024" publish = false +version = "0.1.0" [[bin]] name = "generate-avro-fixtures" @@ -22,6 +22,7 @@ lookup = { package = "vector-lookup", path = "../vector-lookup", default-feature memchr = { version = "2", default-features = false } opentelemetry-proto = { path = "../opentelemetry-proto", optional = true } 
 ordered-float.workspace = true
+parquet = {version = "39.0.0", default-features = false}
 prost.workspace = true
 prost-reflect.workspace = true
 rand.workspace = true
diff --git a/lib/codecs/src/encoding/format/mod.rs b/lib/codecs/src/encoding/format/mod.rs
index 9377cdca5d906..66902f5121cd8 100644
--- a/lib/codecs/src/encoding/format/mod.rs
+++ b/lib/codecs/src/encoding/format/mod.rs
@@ -15,11 +15,13 @@ mod native_json;
 #[cfg(feature = "opentelemetry")]
 mod otlp;
 mod protobuf;
+mod parquet;
 mod raw_message;
 mod text;
 
 use std::fmt::Debug;
 
+pub use self::parquet::{ParquetSerializer, ParquetSerializerConfig, ParquetSerializerOptions};
 pub use avro::{AvroSerializer, AvroSerializerConfig, AvroSerializerOptions};
 pub use cef::{CefSerializer, CefSerializerConfig};
 use dyn_clone::DynClone;
diff --git a/lib/codecs/src/encoding/format/parquet.rs b/lib/codecs/src/encoding/format/parquet.rs
new file mode 100644
index 0000000000000..bf62381865b1c
--- /dev/null
+++ b/lib/codecs/src/encoding/format/parquet.rs
@@ -0,0 +1,1317 @@
+use core::panic;
+use std::{io, sync::Arc};
+
+use bytes::{BufMut, BytesMut};
+use parquet::{
+    basic::{LogicalType, Repetition, Type as PhysicalType},
+    column::writer::{ColumnWriter::*, ColumnWriterImpl},
+    data_type::DataType,
+    errors::ParquetError,
+    file::{properties::WriterProperties, writer::SerializedFileWriter},
+    schema::{
+        parser::parse_message_type,
+        types::{BasicTypeInfo, ColumnDescriptor, Type, TypePtr},
+    },
+};
+use serde::{Deserialize, Serialize};
+use snafu::*;
+use tokio_util::codec::Encoder;
+use tracing::error;
+
+use vector_config::configurable_component;
+use vector_core::{
+    config,
+    event::{Event, Value},
+    schema,
+};
+
+use crate::encoding::BuildError;
+
+/// Errors that can occur during Parquet serialization.
+#[derive(Debug, Snafu)]
+pub enum ParquetSerializerError {
+    #[snafu(display(r#"Event does not contain required field.
field = "{}""#, field))] + MissingField { + field: String, + }, + #[snafu(display( + r#"Event contains a value with an invalid type. field = "{}" type = "{}" expected type = "{}""#, + field, + actual_type, + expected_type + ))] + InvalidValueType { + field: String, + actual_type: String, + expected_type: String, + }, + #[snafu(display("Failed to write. error: {}", error))] + ParquetError { + error: ParquetError, + }, + IoError { + source: io::Error, + }, +} + +impl ParquetSerializerError { + fn invalid_type( + desc: &ColumnDescriptor, + value: &Value, + expected: &str, + ) -> ParquetSerializerError { + ParquetSerializerError::InvalidValueType { + field: desc.name().to_string(), + actual_type: value.kind_str().to_string(), + expected_type: expected.to_string(), + } + } +} + +impl From for ParquetSerializerError { + fn from(error: ParquetError) -> Self { + Self::ParquetError { error } + } +} + +impl From for ParquetSerializerError { + fn from(source: io::Error) -> Self { + Self::IoError { source } + } +} + +/// Config used to build a `ParquetSerializer`. +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct ParquetSerializerConfig { + /// Options for the Parquet serializer. + pub parquet: ParquetSerializerOptions, +} + +impl ParquetSerializerConfig { + /// Creates a new `ParquetSerializerConfig`. + pub const fn new(schema: String) -> Self { + Self { + parquet: ParquetSerializerOptions { schema }, + } + } + + /// Build the `ParquetSerializerConfig` from this configuration. + pub fn build(&self) -> Result { + let schema = parse_message_type(&self.parquet.schema) + .map_err(|error| format!("Failed building Parquet serializer: {}", error))?; + self.validate_logical_schema(&schema) + .map_err(|error| format!("Failed building Parquet serializer: {}", error))?; + Ok(ParquetSerializer { + schema: Arc::new(schema), + }) + } + + /// The data type of events that are accepted by `ParquetSerializer`. 
+ pub fn input_type(&self) -> config::DataType { + config::DataType::Log + } + + /// The schema required by the serializer. + pub fn schema_requirement(&self) -> schema::Requirement { + // TODO: Convert the Parquet schema to a vector schema requirement. + // NOTE: This isn't yet doable. We don't have meanings to + // to specify for requirement. + schema::Requirement::empty() + } + + fn validate_logical_schema(&self, schema: &Type) -> Result<(), String> { + let info = schema.get_basic_info(); + match info.logical_type() { + // Validate LIST types + Some(LogicalType::List) => { + Self::not_repeated(schema, "LIST")?; + let list = Self::single_child(schema, "LIST")?; + + Self::repeated(list, "child of LIST")?; + let element = Self::single_child(list, "list of LIST")?; + + Self::not_repeated(element, "element of LIST")?; + self.validate_logical_schema(element)?; + } + // Validate MAP types + Some(LogicalType::Map) => { + Self::not_repeated(schema, "MAP")?; + let key_value = Self::single_child(schema, "MAP")?; + + Self::repeated(key_value, "child of MAP")?; + match key_value.get_fields().len() { + 1 | 2 => (), + _ => { + return Err(format!( + "Invalid MAP type. key_value of MAP type must have one or two children, found {}.", + key_value.get_fields().len() + )); + } + } + + let mut found_key = false; + for element in key_value.get_fields() { + match element.name() { + "key" => { + found_key = true; + Self::required(element, "key of MAP")?; + if !element.is_primitive() + || element.get_physical_type() != PhysicalType::BYTE_ARRAY + { + return Err( + "Invalid primitive type for key of MAP type. Must be binary." + .to_string(), + ); + } + } + _ => self.validate_logical_schema(element)?, + } + } + if !found_key { + return Err( + "Invalid MAP type. key_value of MAP type must have a child named \"key\"." 
+ .to_string(), + ); + } + } + _ if schema.is_group() => { + for field in schema.get_fields() { + self.validate_logical_schema(field)?; + } + } + _ => (), + } + Ok(()) + } + + fn not_repeated(ty: &Type, kind: &str) -> Result<(), String> { + let info = ty.get_basic_info(); + if info.has_repetition() && info.repetition() == Repetition::REPEATED { + Err(format!( + "Invalid repetition for {kind} type. repetition = {:?}", + info.repetition() + )) + } else { + Ok(()) + } + } + + fn repeated(ty: &Type, kind: &str) -> Result<(), String> { + let info = ty.get_basic_info(); + if !info.has_repetition() || info.repetition() != Repetition::REPEATED { + Err(format!( + "Invalid repetition for {kind} type. repetition = {:?}", + if info.has_repetition() { + info.repetition() + } else { + Repetition::REQUIRED + } + )) + } else { + Ok(()) + } + } + + fn required(ty: &Type, kind: &str) -> Result<(), String> { + let info = ty.get_basic_info(); + if !info.has_repetition() || info.repetition() == Repetition::REQUIRED { + Err(format!( + "Invalid repetition for {kind} type. repetition = {:?}", + info.repetition() + )) + } else { + Ok(()) + } + } + + fn single_child<'a>(ty: &'a Type, kind: &str) -> Result<&'a Type, String> { + let len = ty.get_fields().len(); + if len != 1 { + Err(format!( + "Invalid {kind} type. Expected one child, found {len}.", + )) + } else { + Ok(ty.get_fields().get(0).expect("Should have a child.")) + } + } +} + +/// Options for the Parquet serializer. +#[configurable_component] +#[derive(Clone, Debug)] +pub struct ParquetSerializerOptions { + /// The Parquet schema. + #[configurable(metadata(docs::examples = r#"message test { + required group data { + required binary name; + repeated int64 values; + } + }"#))] + pub schema: String, +} + +/// Serializer that converts `Vec` to bytes using the Apache Parquet format. +#[derive(Debug, Clone)] +pub struct ParquetSerializer { + schema: TypePtr, +} + +impl ParquetSerializer { + /// Creates a new `ParquetSerializer`. 
+ pub const fn new(schema: TypePtr) -> Self { + Self { schema } + } +} + +impl ParquetSerializer { + fn process( + &self, + events: &[Event], + desc: &ColumnDescriptor, + extractor: impl Fn(&Value) -> Result<::T, ParquetSerializerError>, + writer: &mut ColumnWriterImpl, + ) -> Result<(), ParquetSerializerError> { + let mut column = Column::<::T, _>::new(&*self.schema, desc, extractor); + column.extract_column(events)?; + let written_values = writer.write_batch( + &column.values, + column.def_levels.as_ref().map(|vec| vec.as_slice()), + column.rep_levels.as_ref().map(|vec| vec.as_slice()), + )?; + + assert_eq!(written_values, column.values.len()); + Ok(()) + } +} + +impl Encoder> for ParquetSerializer { + type Error = vector_common::Error; + + /// Builds columns from events and writes them to the writer. + /// + /// Expects that all events satisfy the schema, else whole batch can fail. + fn encode(&mut self, events: Vec, buffer: &mut BytesMut) -> Result<(), Self::Error> { + // Encode events + let props = Arc::new(WriterProperties::builder().build()); + let mut parquet_writer = + SerializedFileWriter::new(buffer.writer(), self.schema.clone(), props)?; + + let mut row_group_writer = parquet_writer.next_row_group()?; + while let Some(mut column_writer) = row_group_writer.next_column()? { + match column_writer.untyped() { + BoolColumnWriter(writer) => { + let desc = writer.get_descriptor().clone(); + self.process( + &events, + &desc, + |value| match value { + Value::Boolean(value) => Ok(*value), + _ => Err(ParquetSerializerError::invalid_type( + &desc, value, "boolean", + )), + }, + writer, + )? + } + Int64ColumnWriter(writer) => { + let desc = writer.get_descriptor().clone(); + self.process( + &events, + &desc, + |value| match value { + Value::Integer(value) => Ok(*value), + _ => Err(ParquetSerializerError::invalid_type( + &desc, value, "integer", + )), + }, + writer, + )? 
+ } + DoubleColumnWriter(writer) => { + let desc = writer.get_descriptor().clone(); + self.process( + &events, + &desc, + |value| match value { + Value::Float(value) => Ok(value.into_inner()), + _ => Err(ParquetSerializerError::invalid_type(&desc, value, "float")), + }, + writer, + )? + } + ByteArrayColumnWriter(writer) => { + let desc = writer.get_descriptor().clone(); + self.process( + &events, + &desc, + |value| match value { + Value::Bytes(value) => Ok(value.clone().into()), + _ => Err(ParquetSerializerError::invalid_type(&desc, value, "string")), + }, + writer, + )? + } + FixedLenByteArrayColumnWriter(_) => { + panic!("Fixed len byte array is not supported."); + } + Int32ColumnWriter(_) => panic!("Int32 is not supported."), + Int96ColumnWriter(_) => panic!("Int96 is not supported."), + FloatColumnWriter(_) => panic!("Float32 is not supported."), + } + column_writer.close()?; + } + + row_group_writer.close()?; + parquet_writer.close()?; + + Ok(()) + } +} + +struct Column<'a, T, F: Fn(&Value) -> Result> { + levels: Vec<&'a Type>, + extract: F, + values: Vec, + // If present encodes definition level. From 0 to column.max_def_level() inclusive. + // With any value bellow max encoding null on that level. + // One thing to keep in mind, if a column is required on some "level" then that level is not counted here. + // This is needed when values are optional. + // In case of null, that value is skipped in values. + def_levels: Option>, + // If present encodes repetition level. + // From 0 to column.max_rep_level() inclusive. With 0 starting a new record and any value bellow max encoding + // starting new list at that level. With max level just adding element to leaf list. + // This is needed when values are repeated. Where that repetition can have multiple nested levels. 
+ rep_levels: Option>, +} + +impl<'a, T, F: Fn(&Value) -> Result> Column<'a, T, F> { + fn new(schema: &'a Type, column: &'a ColumnDescriptor, extract: F) -> Self { + let mut levels = vec![schema]; + for part in column.path().parts() { + match &levels[levels.len() - 1] { + Type::GroupType { fields, .. } => { + let field = fields + .iter() + .find(|field| field.name() == part) + .expect("Field not found in schema."); + levels.push(field); + } + Type::PrimitiveType { .. } => unreachable!(), + } + } + + let def_levels = if levels.iter().any(|ty| ty.is_optional()) { + Some(Vec::new()) + } else { + None + }; + + let rep_levels = if levels.iter().any(|ty| { + let info = ty.get_basic_info(); + info.has_repetition() && info.repetition() == Repetition::REPEATED + }) { + Some(Vec::new()) + } else { + None + }; + + Self { + levels, + extract, + values: Vec::new(), + def_levels, + rep_levels, + } + } + + fn extract_column(&mut self, events: &[Event]) -> Result<(), ParquetSerializerError> { + for event in events { + let res = match event { + Event::Log(log) => { + self.extract_value(log.value(), Level::root()) + } + Event::Trace(trace) => { + self.extract_value(trace.value(), Level::root()) + } + Event::Metric(_) => { + panic!("Metrics are not supported."); + } + }; + res.inspect_err(|error| { + // event to json string + match serde_json::to_string(&event) { + Ok(event) => error!( + error = ?error, + event = event, + ), + Err(e) => error!( + error = ?error, + event = ?event, + serde_error = %e, + ), + } + })?; + } + Ok(()) + } + + /// Will push at least one value, or error. 
+ fn extract_value(&mut self, value: &Value, level: Level) -> Result<(), ParquetSerializerError> { + if let Some(part) = self.levels.get(level.level) { + let sub = match value { + Value::Object(object) => object.get(part.name()), + Value::Null => None, + // Invalid type, error + value => { + return Err(ParquetSerializerError::InvalidValueType { + field: self.path(level), + actual_type: value.kind_str().to_string(), + expected_type: "object".to_string(), + }); + } + }; + + self.process_value(sub, level) + } else if matches!(value, Value::Null) { + self.push_value(None, level.undefine()); + Ok(()) + } else { + let value = (self.extract)(value)?; + self.push_value(Some(value), level); + Ok(()) + } + } + + /// Will push at least one value, or error. + fn process_value( + &mut self, + value: Option<&Value>, + level: Level, + ) -> Result<(), ParquetSerializerError> { + let part = self + .levels + .get(level.level) + .expect("We should have checked this before hand."); + match value { + Some(Value::Null) | None if part.is_optional() => { + self.push_value(None, level); + Ok(()) + } + // Illegal null, error + Some(Value::Null) | None => Err(ParquetSerializerError::MissingField { + field: self.path(level), + }), + Some(value) => { + let info = part.get_basic_info(); + match info.logical_type() { + Some(LogicalType::List) => { + if let Value::Array(array) = value { + let list_level = level.descend(info); + let element_level = list_level.descend_repeated(); + + if array.is_empty() { + self.push_value(None, list_level); + } else { + let mut now = element_level; + for element in array { + self.process_value(Some(element), now)?; + now = now.next(); + } + } + + Ok(()) + } else { + return Err(ParquetSerializerError::InvalidValueType { + field: self.path(level), + actual_type: value.kind_str().to_string(), + expected_type: "array".to_string(), + }); + } + } + Some(LogicalType::Map) => { + if let Value::Object(object) = value { + let key_value = level.descend(info); + let 
element_level = key_value.descend_repeated(); + + if self + .levels + .get(element_level.level) + .expect("This must be valid MAP") + .name() + == "key" + { + // Key field + if object.is_empty() { + self.push_value(None, key_value); + } else { + let mut now = element_level; + for key in object.keys() { + let value = (self.extract)(&Value::from(key.as_str()))?; + self.push_value(Some(value), now.descend_required()); + now = now.next(); + } + } + } else { + // Value field + if object.is_empty() { + self.push_value(None, key_value); + } else { + let mut now = element_level; + for value in object.values() { + self.process_value(Some(value), now)?; + now = now.next(); + } + } + } + + Ok(()) + } else { + return Err(ParquetSerializerError::InvalidValueType { + field: self.path(level), + actual_type: value.kind_str().to_string(), + expected_type: "object".to_string(), + }); + } + } + _ => self.extract_flat(value, level.descend(info)), + } + } + } + } + + /// Will push at least one value, or error. 
+ fn extract_flat(&mut self, value: &Value, level: Level) -> Result<(), ParquetSerializerError> { + match value { + Value::Array(array) if level.repeated => { + if array.is_empty() { + self.push_value(None, level.undefine()); + } else { + let mut now = level; + for value in array { + self.extract_value(value, now)?; + now = now.next(); + } + } + + Ok(()) + } + _ => self.extract_value(value, level), + } + } + + fn push_value(&mut self, value: Option, level: Level) { + if let Some(rep_levels) = &mut self.rep_levels { + rep_levels.push(level.start_rep_level); + } + if let Some(def_levels) = &mut self.def_levels { + def_levels.push(level.def_level); + } + if let Some(value) = value { + self.values.push(value); + } + } + + fn path(&self, level: Level) -> String { + let mut path = String::new(); + for level in &self.levels[1..level.level] { + path.push_str(level.name()); + path.push('.'); + } + path.push_str(self.levels[level.level].name()); + path + } +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +struct Level { + start_rep_level: i16, + rep_level: i16, + def_level: i16, + level: usize, + // If this level can be null + optional: bool, + // If this level is repeated + repeated: bool, +} + +impl Level { + fn root() -> Self { + Self { + start_rep_level: 0, + rep_level: 0, + def_level: 0, + level: 1, + optional: false, + repeated: false, + } + } + + fn next(self) -> Self { + assert!(self.repeated); + Self { + start_rep_level: self.rep_level, + ..self + } + } + + /// Descend implies that the level is defined. 
+ fn descend(self, info: &BasicTypeInfo) -> Self { + if info.has_repetition() { + match info.repetition() { + Repetition::OPTIONAL => self.descend_optional(), + Repetition::REPEATED => self.descend_repeated(), + Repetition::REQUIRED => self.descend_required(), + } + } else { + self.descend_required() + } + } + + fn descend_required(self) -> Self { + Self { + level: self.level + 1, + repeated: false, + optional: false, + ..self + } + } + + fn descend_optional(self) -> Self { + Self { + def_level: self.def_level + 1, + level: self.level + 1, + repeated: false, + optional: true, + ..self + } + } + + fn descend_repeated(self) -> Self { + Self { + rep_level: self.rep_level + 1, + def_level: self.def_level + 1, + level: self.level + 1, + repeated: true, + optional: true, + ..self + } + } + + /// Undefine by one level + fn undefine(self) -> Self { + Self { + def_level: self.def_level - 1, + ..self + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::Bytes; + use parquet::{ + column::reader::{ColumnReader, ColumnReaderImpl}, + file::reader::*, + file::serialized_reader::SerializedFileReader, + schema::parser::parse_message_type, + }; + use similar_asserts::assert_eq; + use std::panic; + use std::{collections::HashSet, sync::Arc}; + use vector_core::event::LogEvent; + use vrl::value::btreemap; + + macro_rules! log_event { + ($($key:expr => $value:expr),* $(,)?) 
=> { + #[allow(unused_variables)] + { + let mut event = Event::Log(LogEvent::default()); + let log = event.as_mut_log(); + $( + log.insert($key, $value); + )* + event + } + }; + } + + fn assert_column( + count: usize, + expect_values: &[::T], + expect_rep_levels: Option<&[i16]>, + expect_def_levels: Option<&[i16]>, + mut column_reader: ColumnReaderImpl, + ) where + ::T: Default, + { + let mut values = Vec::new(); + values.resize(count, ::T::default()); + let mut def_levels = Vec::new(); + def_levels.resize(count, 0); + let mut rep_levels = Vec::new(); + rep_levels.resize(count, 0); + let (read, level) = column_reader + .read_batch( + count, + Some(def_levels.as_mut_slice()).filter(|_| expect_def_levels.is_some()), + Some(rep_levels.as_mut_slice()).filter(|_| expect_rep_levels.is_some()), + &mut values, + ) + .unwrap(); + + assert_eq!(level, count); + assert_eq!(&values[..read], expect_values); + if expect_rep_levels.is_some() { + assert_eq!(rep_levels, expect_rep_levels.unwrap()); + } + if expect_def_levels.is_some() { + assert_eq!(def_levels, expect_def_levels.unwrap()); + } + } + + fn validate( + schema: &str, + events: Vec, + num_columns: usize, + validate: impl Fn(usize, &str, &dyn RowGroupReader), + ) { + let schema = Arc::new(parse_message_type(schema).unwrap()); + let mut encoder = ParquetSerializer::new(schema); + + let mut buffer = BytesMut::new(); + encoder.encode(events, &mut buffer).unwrap(); + + let reader = SerializedFileReader::new(buffer.freeze()).unwrap(); + + let parquet_metadata = reader.metadata(); + assert_eq!(parquet_metadata.num_row_groups(), 1); + + let row_group_reader = reader.get_row_group(0).unwrap(); + assert_eq!(row_group_reader.num_columns(), num_columns); + + let metadata = row_group_reader.metadata(); + let mut visited = HashSet::new(); + for (i, column) in metadata.columns().iter().enumerate() { + let path = column.column_path().string(); + assert!(visited.insert(path.clone())); + validate(i, &path, row_group_reader.as_ref()); + } 
+ + assert_eq!(visited.len(), num_columns); + } + + #[test] + fn test_serialize() { + let message_type = r#" + message test { + required group a { + required boolean b; + optional int64 c; + } + optional group d { + optional int64 e; + } + required group f { + repeated int64 g; + } + required binary h; + repeated group i { + required int64 j; + repeated double k; + } + } + "#; + + let events = vec![ + log_event! { + "a.b" => true, + "a.c" => 2, + "d.e" => 3, + "f.g" => vec![4, 5], + "h" => "hello", + "i" => vec![btreemap! { + "j" => 6, + "k" => vec![7.0, 8.0] + }] + }, + log_event! { + "a.b" => false, + "f" => Value::Object(Default::default()), + "h" => "world", + "i" => vec![ + btreemap! { + "j" => 9, + "k" => vec![10.0] + }, btreemap! { + "j" => 11, + }] + }, + ]; + + validate( + message_type, + events, + 7, + |i, path, row_group_reader| match path { + "a.b" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::BoolColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(2, &[true, false], None, None, reader); + } + "a.c" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::Int64ColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(2, &[2], None, Some(&[1, 0]), reader); + } + "d.e" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::Int64ColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(2, &[3], None, Some(&[2, 0]), reader); + } + "f.g" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::Int64ColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(3, &[4, 5], Some(&[0, 1, 0]), Some(&[1, 1, 0]), reader); + } + "h" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::ByteArrayColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 2, + 
&[Bytes::from("hello").into(), Bytes::from("world").into()], + None, + None, + reader, + ); + } + "i.j" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::Int64ColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(3, &[6, 9, 11], Some(&[0, 0, 1]), Some(&[1, 1, 1]), reader); + } + "i.k" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::DoubleColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 4, + &[7.0, 8.0, 10.0], + Some(&[0, 2, 0, 1]), + Some(&[2, 2, 2, 1]), + reader, + ); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn test_value_null() { + let message_type = r#" + message test { + optional group geo{ + optional binary city_name (UTF8); + } + } + "#; + + let events = vec![ + log_event! { + "geo.city_name" => "hello", + }, + log_event! { + "geo.city_name" => Value::Null, + }, + ]; + + validate( + message_type, + events, + 1, + |i, path, row_group_reader| match path { + "geo.city_name" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::ByteArrayColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 2, + &[Bytes::from("hello").into()], + None, + Some(&[2, 1]), + reader, + ); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn test_value_null_stack_optional() { + let message_type = r#" + message test { + optional group a{ + optional group b{ + optional boolean c; + } + } + } + "#; + + let events = vec![ + log_event! {}, + log_event! {"a" => Value::Null}, + log_event! {"a.b" => Value::Null}, + log_event! {"a.b.c" => Value::Null}, + log_event! 
{"a.b.c" => Value::Boolean(true)}, + ]; + + validate( + message_type, + events, + 1, + |i, path, row_group_reader| match path { + "a.b.c" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::BoolColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(5, &[true], None, Some(&[0, 0, 1, 2, 3]), reader); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn test_value_null_repeated_optional() { + let message_type = r#" + message test { + repeated group a{ + optional boolean b; + } + } + "#; + + let events = vec![ + log_event! {}, + log_event! {"a" => Value::Null}, + log_event! {"a.b" => Value::Null}, + log_event! {"a.b" => Value::Boolean(false)}, + log_event! {"a" => vec![ + btreemap! { "b" => Value::Null }, + btreemap! { "b" => Value::Boolean(true) } + ]}, + ]; + validate( + message_type, + events, + 1, + |i, path, row_group_reader| match path { + "a.b" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::BoolColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 6, + &[false, true], + Some(&[0, 0, 0, 0, 0, 1]), + Some(&[0, 0, 1, 2, 1, 2]), + reader, + ); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn test_value_null_repeated() { + let message_type = r#" + message test { + repeated boolean a; + } + "#; + + let events = vec![ + log_event! {}, + log_event! {"a" => Value::Null}, + log_event! {"a" => Value::Boolean(false)}, + log_event! 
{"a" => vec![ + Value::Null, + Value::Boolean(true), + Value::Null, + ]}, + ]; + + validate( + message_type, + events, + 1, + |i, path, row_group_reader| match path { + "a" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::BoolColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 6, + &[false, true], + Some(&[0, 0, 0, 0, 1, 1]), + Some(&[0, 0, 1, 0, 1, 0]), + reader, + ); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn test_value_empty_repeated() { + let message_type = r#" + message test { + repeated boolean a; + } + "#; + + let events = vec![log_event! {"a" => Vec::::new()}]; + + validate( + message_type, + events, + 1, + |i, path, row_group_reader| match path { + "a" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::BoolColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(1, &[], Some(&[0]), Some(&[0]), reader); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn test_repeated() { + let message_type = r#" + message test { + repeated group answer { + optional binary name (UTF8); + optional INT64 ttl; + } + } + "#; + + let events = vec![log_event! { + "answer" => vec![ + btreemap! { + "name" => "test1", + "ttl" => 0, + }, btreemap! 
{ + "name" => "test2", + "ttl" => 3600, + }] + }]; + + validate( + message_type, + events, + 2, + |i, path, row_group_reader| match path { + "answer.name" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::ByteArrayColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 2, + &[Bytes::from("test1").into(), Bytes::from("test2").into()], + Some(&[0, 1]), + Some(&[2, 2]), + reader, + ); + } + "answer.ttl" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::Int64ColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(2, &[0, 3600], Some(&[0, 1]), Some(&[2, 2]), reader); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn test_list() { + let message_type = r#" + message test { + optional group answers (LIST){ + repeated group list { + optional boolean element; + } + } + } + "#; + + let events = vec![ + log_event! {}, + log_event! {"answers" => Value::Null}, + log_event! {"answers" => Vec::::new()}, + log_event! 
{"answers" => vec![ + Value::Null, + Value::Boolean(true), + Value::Null, + ]}, + ]; + + validate( + message_type, + events, + 1, + |i, path, row_group_reader| match path { + "answers.list.element" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::BoolColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 6, + &[true], + Some(&[0, 0, 0, 0, 1, 1]), + Some(&[0, 0, 1, 2, 3, 2]), + reader, + ); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn illegal_list_scheme() { + let config = ParquetSerializerConfig { + parquet: ParquetSerializerOptions { + schema: r#" + message test { + optional group answers (LIST){ + optional group list { + optional boolean element; + } + } + } + "# + .to_string(), + }, + }; + + assert!(config.build().is_err()); + } + + #[test] + fn test_map() { + let message_type = r#" + message test { + optional group answers (MAP){ + repeated group key_value { + required binary key (UTF8); + optional boolean value; + } + } + } + "#; + + let events = vec![ + log_event! {}, + log_event! {"answers" => Value::Null}, + log_event! {"answers" => btreemap!{}}, + log_event! 
{"answers" => btreemap!{ + "test1" => Value::Null, + "test2" => Value::Boolean(true), + "test3" => Value::Null, + }}, + ]; + + validate( + message_type, + events, + 2, + |i, path, row_group_reader| match path { + "answers.key_value.key" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::ByteArrayColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 6, + &[ + Bytes::from("test1").into(), + Bytes::from("test2").into(), + Bytes::from("test3").into(), + ], + Some(&[0, 0, 0, 0, 1, 1]), + Some(&[0, 0, 1, 2, 2, 2]), + reader, + ); + } + "answers.key_value.value" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::BoolColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 6, + &[true], + Some(&[0, 0, 0, 0, 1, 1]), + Some(&[0, 0, 1, 2, 3, 2]), + reader, + ); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn illegal_map_scheme() { + let config = ParquetSerializerConfig { + parquet: ParquetSerializerOptions { + schema: r#" + message test { + optional group answers (MAP){ + repeated group key_value { + required binary str (UTF8); + optional boolean value; + } + } + } + "# + .to_string(), + }, + }; + + assert!(config.build().is_err()); + } +} diff --git a/lib/codecs/src/encoding/mod.rs b/lib/codecs/src/encoding/mod.rs index 8dd2c4ddc79a5..1d5bd05b45972 100644 --- a/lib/codecs/src/encoding/mod.rs +++ b/lib/codecs/src/encoding/mod.rs @@ -11,7 +11,8 @@ pub use format::{ CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig, JsonSerializer, JsonSerializerConfig, JsonSerializerOptions, LogfmtSerializer, LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer, - NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig, + NativeSerializerConfig, ParquetSerializer, + ParquetSerializerConfig, ParquetSerializerOptions, ProtobufSerializer, 
ProtobufSerializerConfig, ProtobufSerializerOptions, RawMessageSerializer, RawMessageSerializerConfig, TextSerializer, TextSerializerConfig, }; @@ -24,7 +25,7 @@ pub use framing::{ NewlineDelimitedEncoderConfig, VarintLengthDelimitedEncoder, VarintLengthDelimitedEncoderConfig, }; -pub use serializer::{Serializer, SerializerConfig}; +pub use serializer::{BatchSerializer, Serializer, SerializerConfig}; /// An error that occurred while building an encoder. pub type BuildError = Box; @@ -53,4 +54,4 @@ impl From for Error { fn from(error: std::io::Error) -> Self { Self::FramingError(Box::new(error)) } -} +} \ No newline at end of file diff --git a/lib/codecs/src/encoding/serializer.rs b/lib/codecs/src/encoding/serializer.rs index eef088fca4b72..d2464c1ac9972 100644 --- a/lib/codecs/src/encoding/serializer.rs +++ b/lib/codecs/src/encoding/serializer.rs @@ -10,8 +10,9 @@ use super::format::{ CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig, JsonSerializer, JsonSerializerConfig, LogfmtSerializer, LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer, NativeSerializerConfig, - ProtobufSerializer, ProtobufSerializerConfig, RawMessageSerializer, RawMessageSerializerConfig, - TextSerializer, TextSerializerConfig, + ParquetSerializer, ParquetSerializerConfig, ParquetSerializerOptions, ProtobufSerializer, + ProtobufSerializerConfig, RawMessageSerializer, RawMessageSerializerConfig, TextSerializer, + TextSerializerConfig, }; #[cfg(feature = "opentelemetry")] use super::format::{OtlpSerializer, OtlpSerializerConfig}; @@ -115,6 +116,14 @@ pub enum SerializerConfig { /// could lead to the encoding emitting empty strings for the given event. RawMessage, + + /// Encodes events in [Apache Parquet format][parquet]. + /// + /// [parquet]: https://parquet.apache.org/ + Parquet { + /// Apache Parquet-specific encoder options. + parquet: ParquetSerializerOptions, + }, /// Plain text encoding. 
/// /// This encoding uses the `message` field of a log event. For metrics, it uses an @@ -207,6 +216,7 @@ impl From for SerializerConfig { impl SerializerConfig { /// Build the `Serializer` from this configuration. + /// Fails if serializer is batched. pub fn build(&self) -> Result> { match self { SerializerConfig::Avro { avro } => Ok(Serializer::Avro( @@ -230,6 +240,34 @@ impl SerializerConfig { Ok(Serializer::RawMessage(RawMessageSerializerConfig.build())) } SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())), + SerializerConfig::Parquet { .. } => { + Err("Parquet serializer is not for single event encoding.".into()) + } + } + } + + /// Build the `BatchSerializer` from this configuration. + /// Returns `None` if the serializer is not batched. + pub fn build_batched( + &self, + ) -> Result, Box> { + match self { + SerializerConfig::Parquet { parquet } => Ok(Some(BatchSerializer::Parquet( + ParquetSerializerConfig::new(parquet.schema.clone()).build()?, + ))), + SerializerConfig::Avro { .. } + | SerializerConfig::Csv(..) + | SerializerConfig::Gelf(..) + | SerializerConfig::Json(..) + | SerializerConfig::Logfmt + | SerializerConfig::Native + | SerializerConfig::NativeJson + | SerializerConfig::RawMessage + | SerializerConfig::Text(..) + | SerializerConfig::Cef(..) + | SerializerConfig::Protobuf(..) => Ok(None), + #[cfg(feature = "opentelemetry")] + SerializerConfig::Otlp => Ok(None), } } @@ -264,7 +302,8 @@ impl SerializerConfig { | SerializerConfig::Text(_) => FramingConfig::NewlineDelimited, SerializerConfig::Gelf(_) => { FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0)) - } + }, + SerializerConfig::Parquet { .. 
} => FramingConfig::Bytes, } } @@ -286,6 +325,9 @@ impl SerializerConfig { SerializerConfig::Protobuf(config) => config.input_type(), SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(), SerializerConfig::Text(config) => config.input_type(), + SerializerConfig::Parquet { parquet } => { + ParquetSerializerConfig::new(parquet.schema.clone()).input_type() + } } } @@ -307,6 +349,9 @@ impl SerializerConfig { SerializerConfig::Protobuf(config) => config.schema_requirement(), SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(), SerializerConfig::Text(config) => config.schema_requirement(), + SerializerConfig::Parquet { parquet } => { + ParquetSerializerConfig::new(parquet.schema.clone()).schema_requirement() + } } } } @@ -575,3 +620,25 @@ mod tests { )); } } +/// Serialize structured batches of events as bytes. +#[derive(Debug, Clone)] +pub enum BatchSerializer { + /// Uses a `ParquetSerializer` for serialization. + Parquet(ParquetSerializer), +} + +impl From for BatchSerializer { + fn from(serializer: ParquetSerializer) -> Self { + Self::Parquet(serializer) + } +} + +impl tokio_util::codec::Encoder> for BatchSerializer { + type Error = vector_common::Error; + + fn encode(&mut self, events: Vec, buffer: &mut BytesMut) -> Result<(), Self::Error> { + match self { + BatchSerializer::Parquet(serializer) => serializer.encode(events, buffer), + } + } +} \ No newline at end of file diff --git a/src/codecs/encoding/config.rs b/src/codecs/encoding/config.rs index a04f44315047a..ccf9b0f263847 100644 --- a/src/codecs/encoding/config.rs +++ b/src/codecs/encoding/config.rs @@ -1,8 +1,8 @@ use crate::codecs::Transformer; use vector_lib::{ codecs::{ + encoding::{BatchSerializer, Framer, FramingConfig, Serializer, SerializerConfig}, CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder, - encoding::{Framer, FramingConfig, Serializer, SerializerConfig}, }, configurable::configurable_component, }; @@ -138,6 +138,13 @@ 
impl EncodingConfigWithFraming { Ok((framer, serializer)) } + + /// Build `BatchSerializer` for this config. + /// None if serializer is not batched. + pub fn build_batched(&self) -> crate::Result> { + let serializer = self.encoding.config().build_batched()?; + Ok(serializer) + } } /// The way a sink processes outgoing events. diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index 67c22910e07b0..dc6e227c50252 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -237,6 +237,7 @@ fn serializer_config_to_deserializer( }) } SerializerConfig::RawMessage | SerializerConfig::Text(_) => DeserializerConfig::Bytes, + SerializerConfig::Parquet { .. } => todo!(), #[cfg(feature = "codecs-opentelemetry")] SerializerConfig::Otlp => todo!(), }; diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index 08ba92245d65c..b9092ce0c808b 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use aws_sdk_s3::Client as S3Client; use tower::ServiceBuilder; use vector_lib::{ @@ -15,6 +16,7 @@ use crate::{ aws::{AwsAuthentication, RegionOrEndpoint}, codecs::{Encoder, EncodingConfigWithFraming, SinkType}, config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext}, + event::Event, sinks::{ Healthcheck, s3_common::{ @@ -246,8 +248,14 @@ impl S3SinkConfig { let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id, None); let transformer = self.encoding.transformer(); - let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?; - let encoder = Encoder::::new(framer, serializer); + let encoder = if let Some(serializer) = self.encoding.build_batched()? 
{ + Arc::new((transformer, serializer)) + as Arc> + Send + Sync> + } else { + let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?; + let encoder = Encoder::::new(framer, serializer); + Arc::new((transformer, encoder)) as _ + }; let request_options = S3RequestOptions { bucket: self.bucket.clone(), @@ -255,7 +263,7 @@ impl S3SinkConfig { filename_extension: self.filename_extension.clone(), filename_time_format: self.filename_time_format.clone(), filename_append_uuid: self.filename_append_uuid, - encoder: (transformer, encoder), + encoder, compression: self.compression, filename_tz_offset: offset, }; diff --git a/src/sinks/aws_s3/sink.rs b/src/sinks/aws_s3/sink.rs index 26d47cdb7039c..c98f5a49af777 100644 --- a/src/sinks/aws_s3/sink.rs +++ b/src/sinks/aws_s3/sink.rs @@ -1,12 +1,12 @@ -use std::io; +use std::{io, sync::Arc}; use bytes::Bytes; use chrono::{FixedOffset, Utc}; use uuid::Uuid; -use vector_lib::{codecs::encoding::Framer, event::Finalizable, request_metadata::RequestMetadata}; +use vector_lib::event::Finalizable; +use vector_lib::request_metadata::RequestMetadata; use crate::{ - codecs::{Encoder, Transformer}, event::Event, sinks::{ s3_common::{ @@ -15,8 +15,8 @@ use crate::{ service::{S3Metadata, S3Request}, }, util::{ - Compression, RequestBuilder, metadata::RequestMetadataBuilder, - request_builder::EncodeResult, + encoding::Encoder, metadata::RequestMetadataBuilder, request_builder::EncodeResult, + Compression, RequestBuilder, }, }, }; @@ -28,7 +28,7 @@ pub struct S3RequestOptions { pub filename_append_uuid: bool, pub filename_extension: Option, pub api_options: S3Options, - pub encoder: (Transformer, Encoder), + pub encoder: Arc> + Send + Sync>, pub compression: Compression, pub filename_tz_offset: Option, } @@ -36,7 +36,7 @@ pub struct S3RequestOptions { impl RequestBuilder<(S3PartitionKey, Vec)> for S3RequestOptions { type Metadata = S3Metadata; type Events = Vec; - type Encoder = (Transformer, Encoder); + type Encoder = Arc> + 
Send + Sync>; type Payload = Bytes; type Request = S3Request; type Error = io::Error; // TODO: this is ugly. diff --git a/src/sinks/util/encoding.rs b/src/sinks/util/encoding.rs index bb5a938ec017f..f5c7f0b3376be 100644 --- a/src/sinks/util/encoding.rs +++ b/src/sinks/util/encoding.rs @@ -1,4 +1,4 @@ -use std::io; +use std::{io, sync::Arc}; use bytes::BytesMut; use itertools::{Itertools, Position}; @@ -8,7 +8,11 @@ use vector_lib::{ request_metadata::GroupedCountByteSize, }; -use crate::{codecs::Transformer, event::Event, internal_events::EncoderWriteError}; +use crate::{ + codecs::Transformer, + event::Event, + internal_events::{EncoderSerializeError, EncoderWriteError}, +}; pub trait Encoder { /// Encodes the input into the provided writer. @@ -97,6 +101,44 @@ impl Encoder for (Transformer, crate::codecs::Encoder<()>) { } } +impl + ?Sized> Encoder for Arc { + fn encode_input( + &self, + input: T, + writer: &mut dyn io::Write + ) -> io::Result<(usize, GroupedCountByteSize)> { + (**self).encode_input(input, writer) + } +} + +impl Encoder> for (Transformer, vector_lib::codecs::encoding::BatchSerializer) { + fn encode_input( + &self, + mut events: Vec, + writer: &mut dyn io::Write, + ) -> io::Result<(usize, GroupedCountByteSize)> { + let mut encoder = self.1.clone(); + let n_events_pending = events.len(); + + let mut byte_size = telemetry().create_request_count_byte_size(); + for event in &mut events { + self.0.transform(event); + byte_size.add_event(event, event.estimated_json_encoded_size_of()); + } + + let mut bytes = BytesMut::new(); + encoder.encode(events, &mut bytes).map_err(|error| { + let error: crate::Error = error.into(); + emit!(EncoderSerializeError { error: &error }); + io::Error::new(io::ErrorKind::InvalidData, error) + })?; + + write_all(writer, n_events_pending, &bytes)?; + + Ok((bytes.len(), byte_size)) + } +} + /// Write the buffer to the writer. 
If the operation fails, emit an internal event which complies with the /// instrumentation spec — as this necessitates both an Error and EventsDropped event. ///