diff --git a/Cargo.lock b/Cargo.lock index 97f5a18f61c1d..de3eaa71be47e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1872,7 +1872,7 @@ dependencies = [ "bitflags 2.10.0", "cexpr", "clang-sys", - "itertools 0.13.0", + "itertools 0.12.1", "proc-macro2 1.0.103", "quote 1.0.42", "regex", @@ -3451,7 +3451,7 @@ dependencies = [ "libc", "option-ext", "redox_users 0.5.2", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -3853,7 +3853,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -5347,7 +5347,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.6.1", + "socket2 0.5.10", "tokio", "tower-service", "tracing 0.1.41", @@ -5380,7 +5380,7 @@ dependencies = [ "js-sys", "log", "wasm-bindgen", - "windows-core 0.62.2", + "windows-core", ] [[package]] @@ -5720,7 +5720,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi 0.5.2", "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -6837,18 +6837,6 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" -[[package]] -name = "multimap" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" - -[[package]] -name = "murmur3" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9252111cf132ba0929b6f8e030cac2a24b507f3a4d6db6fb2896f27b354c714b" - [[package]] name = "native-tls" version = "0.2.14" @@ -7135,7 +7123,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -7321,16 +7309,16 @@ checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" [[package]] name = "oauth2" -version = "5.0.0" +version = "4.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51e219e79014df21a225b1860a479e2dcd7cbd9130f4defd4bd0e191ea31d67d" +checksum = "c38841cdd844847e3e7c8d29cef9dcfed8877f8f56f9071f77843ecf3baf937f" dependencies = [ - "base64 0.22.1", + "base64 0.13.1", "chrono", "getrandom 0.2.16", - "http 1.3.1", + "http 0.2.12", "rand 0.8.5", - "reqwest 0.12.24", + "reqwest 0.11.27", "serde", "serde_json", "serde_path_to_error", @@ -7481,16 +7469,16 @@ dependencies = [ [[package]] name = "openidconnect" -version = "4.0.1" +version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d8c6709ba2ea764bbed26bce1adf3c10517113ddea6f2d4196e4851757ef2b2" +checksum = "f47e80a9cfae4462dd29c41e987edd228971d6565553fbc14b8a11e666d91590" dependencies = [ - "base64 0.21.7", + "base64 0.13.1", "chrono", "dyn-clone", "ed25519-dalek", "hmac", - "http 1.3.1", + "http 0.2.12", "itertools 0.10.5", "log", "oauth2", @@ -7500,6 +7488,7 @@ dependencies = [ "rsa", "serde", "serde-value", + "serde_derive", "serde_json", "serde_path_to_error", "serde_plain", @@ -7613,7 +7602,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d8fae84b431384b68627d0f9b3b1245fcf9f46f6c0e3dc902e9dce64edd1967" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.45.0", ] [[package]] @@ -8382,7 +8371,7 @@ dependencies = [ "itertools 0.10.5", "lazy_static", "log", - "multimap 0.8.3", + "multimap", "petgraph 0.6.5", "prettyplease 0.1.25", "prost 0.11.9", @@ -8400,10 +8389,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes 1.10.1", - "heck 0.5.0", + "heck 0.4.1", "itertools 0.12.1", "log", - "multimap 0.10.1", + "multimap", "once_cell", "petgraph 0.6.5", "prettyplease 0.2.37", @@ -8420,12 +8409,12 @@ version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ - "heck 0.5.0", - "itertools 0.14.0", + "heck 0.4.1", + "itertools 0.12.1", "log", - "multimap 0.10.1", + "multimap", "once_cell", - "petgraph 0.7.1", + "petgraph 0.6.5", "prettyplease 0.2.37", "prost 0.13.5", "prost-types 0.13.5", @@ -8467,7 +8456,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.12.1", "proc-macro2 1.0.103", "quote 1.0.42", "syn 2.0.110", @@ -8561,9 +8550,9 @@ dependencies = [ [[package]] name = "pulsar" -version = "6.5.0" +version = "6.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1235e80b398545bcc4efdad05079964a7596033de2fd9893cc9baa24b56bfaf9" +checksum = "6cee616af00383c461f9ceb0067d15dee68e7d313ae47dbd7f8543236aed7ee9" dependencies = [ "async-channel 2.5.0", "async-trait", @@ -8575,7 +8564,6 @@ dependencies = [ "futures 0.3.31", "log", "lz4", - "murmur3", "native-tls", "nom 7.1.3", "oauth2", @@ -8694,7 +8682,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls 0.23.35", - "socket2 0.6.1", + "socket2 0.5.10", "thiserror 2.0.17", "tokio", "tracing 0.1.41", @@ -8731,9 +8719,9 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.6.1", + "socket2 0.5.10", "tracing 0.1.41", - "windows-sys 0.60.2", + "windows-sys 0.59.0", ] [[package]] @@ -9183,6 +9171,7 @@ dependencies = [ "http 0.2.12", "http-body 0.4.6", "hyper 0.14.32", + "hyper-rustls 0.24.2", "hyper-tls 0.5.0", "ipnet", "js-sys", @@ -9192,6 +9181,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", + "rustls 0.21.12", "rustls-pemfile 1.0.4", "serde", "serde_json", @@ -9200,11 +9190,13 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", + "tokio-rustls 0.24.1", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", + "webpki-roots 0.25.4", "winreg", ] @@ -9567,7 +9559,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.11.0", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -10479,7 +10471,7 @@ version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1c97747dbf44bb1ca44a561ece23508e99cb592e862f22222dcf42f51d1e451" dependencies = [ - "heck 0.5.0", + "heck 0.4.1", "proc-macro2 1.0.103", "quote 1.0.42", "syn 2.0.110", @@ -11070,7 +11062,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix 1.1.2", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -11090,7 +11082,7 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2111ef44dae28680ae9752bb89409e7310ca33a8c621ebe7b106cf5c928b3ac0" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -13479,7 +13471,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]] @@ -13495,7 +13487,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893" dependencies = [ "windows-collections", - "windows-core 0.61.2", + "windows-core", "windows-future", "windows-link 0.1.3", "windows-numerics", @@ -13507,7 +13499,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8" dependencies = [ - "windows-core 0.61.2", + "windows-core", ] [[package]] @@ -13519,21 +13511,8 @@ dependencies = [ "windows-implement", "windows-interface", "windows-link 0.1.3", - "windows-result 0.3.4", - "windows-strings 0.4.2", -] - -[[package]] -name = "windows-core" -version = "0.62.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" -dependencies = [ - "windows-implement", - "windows-interface", - "windows-link 0.2.1", - "windows-result 0.4.1", - "windows-strings 0.5.1", + "windows-result", + "windows-strings", ] [[package]] @@ -13542,7 +13521,7 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" dependencies = [ - "windows-core 0.61.2", + "windows-core", "windows-link 0.1.3", "windows-threading", ] @@ -13587,7 +13566,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1" dependencies = [ - "windows-core 0.61.2", + "windows-core", "windows-link 0.1.3", ] @@ -13600,15 +13579,6 @@ dependencies = [ "windows-link 0.1.3", ] -[[package]] -name = "windows-result" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" -dependencies = [ - "windows-link 0.2.1", -] - [[package]] name = "windows-service" version = "0.8.0" @@ -13629,15 +13599,6 @@ dependencies = [ "windows-link 0.1.3", ] -[[package]] -name = "windows-strings" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" -dependencies = [ - "windows-link 0.2.1", -] - [[package]] name = "windows-sys" version = "0.45.0" diff --git a/lib/codecs/src/encoding/format/parquet.rs b/lib/codecs/src/encoding/format/parquet.rs index 9b5fdf5f2174b..bf62381865b1c 100644 --- a/lib/codecs/src/encoding/format/parquet.rs +++ b/lib/codecs/src/encoding/format/parquet.rs @@ -299,7 +299,7 @@ impl Encoder> for ParquetSerializer { let mut row_group_writer = parquet_writer.next_row_group()?; while let Some(mut column_writer) = row_group_writer.next_column()? { match column_writer.untyped() { - BoolColumnWriter(ref mut writer) => { + BoolColumnWriter(writer) => { let desc = writer.get_descriptor().clone(); self.process( &events, diff --git a/lib/codecs/src/encoding/mod.rs b/lib/codecs/src/encoding/mod.rs index ed1eb2be18f78..1d5bd05b45972 100644 --- a/lib/codecs/src/encoding/mod.rs +++ b/lib/codecs/src/encoding/mod.rs @@ -25,7 +25,7 @@ pub use framing::{ NewlineDelimitedEncoderConfig, VarintLengthDelimitedEncoder, VarintLengthDelimitedEncoderConfig, }; -pub use serializer::{Serializer, SerializerConfig}; +pub use serializer::{BatchSerializer, Serializer, SerializerConfig}; /// An error that occurred while building an encoder. pub type BuildError = Box; diff --git a/lib/codecs/src/encoding/serializer.rs b/lib/codecs/src/encoding/serializer.rs index 925bf7a8145b7..d2464c1ac9972 100644 --- a/lib/codecs/src/encoding/serializer.rs +++ b/lib/codecs/src/encoding/serializer.rs @@ -10,8 +10,9 @@ use super::format::{ CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig, JsonSerializer, JsonSerializerConfig, LogfmtSerializer, LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer, NativeSerializerConfig, - ProtobufSerializer, ProtobufSerializerConfig, RawMessageSerializer, RawMessageSerializerConfig, - TextSerializer, TextSerializerConfig, + ParquetSerializer, ParquetSerializerConfig, ParquetSerializerOptions, ProtobufSerializer, + ProtobufSerializerConfig, RawMessageSerializer, RawMessageSerializerConfig, TextSerializer, + TextSerializerConfig, }; #[cfg(feature = "opentelemetry")] use super::format::{OtlpSerializer, OtlpSerializerConfig}; @@ -256,7 +257,7 @@ impl SerializerConfig { ))), SerializerConfig::Avro { .. } | SerializerConfig::Csv(..) - | SerializerConfig::Gelf + | SerializerConfig::Gelf(..) | SerializerConfig::Json(..) | SerializerConfig::Logfmt | SerializerConfig::Native @@ -265,6 +266,8 @@ impl SerializerConfig { | SerializerConfig::Text(..) | SerializerConfig::Cef(..) | SerializerConfig::Protobuf(..) => Ok(None), + #[cfg(feature = "opentelemetry")] + SerializerConfig::Otlp => Ok(None), } } diff --git a/scripts/environment/bootstrap-ubuntu-24.04.sh b/scripts/environment/bootstrap-ubuntu-24.04.sh index f91742839f4b9..5e59d383dbc1a 100755 --- a/scripts/environment/bootstrap-ubuntu-24.04.sh +++ b/scripts/environment/bootstrap-ubuntu-24.04.sh @@ -1,4 +1,4 @@ -#! /usr/bin/env bash +#!/usr/bin/env bash # Refer to https://github.com/actions/runner-images/blob/main/images/ubuntu/Ubuntu2404-Readme.md # for all runner information such as OS version and installed software. @@ -26,12 +26,9 @@ apt-get update --yes # Install all base dependencies in one go apt-get install --yes \ - software-properties-common \ - apt-utils \ - apt-transport-https - -# Deps -apt-get install --yes --no-install-recommends \ + software-properties-common \ + apt-utils \ + apt-transport-https \ build-essential \ ca-certificates \ cmake \ diff --git a/scripts/environment/prepare.sh b/scripts/environment/prepare.sh index af5b93cc57bb5..368e4287f4bce 100755 --- a/scripts/environment/prepare.sh +++ b/scripts/environment/prepare.sh @@ -148,27 +148,6 @@ if contains_module cargo-deb; then if [[ "$(cargo-deb --version 2>/dev/null)" != "2.9.3" ]]; then cargo "${install[@]}" cargo-deb --version 2.9.3 --force --locked fi - - - -# git config --global --add safe.directory /git/vectordotdev/vector - -# rustup show # causes installation of version from rust-toolchain.toml -# rustup default "$(rustup show active-toolchain | awk '{print $1;}' | head -n 1)" -# if [[ "$(cargo-deb --version)" != "2.0.2" ]] ; then -# rustup run stable cargo install cargo-deb --version 2.0.0 --force --locked -# fi -# if [[ "$(cross --version | grep cross)" != "cross 0.2.5" ]] ; then -# rustup run stable cargo install cross --version 0.2.5 --force --locked -# fi -# if [[ "$(cargo-nextest --version)" != "cargo-nextest 0.9.72" ]] ; then -# rustup run stable cargo install cargo-nextest --version 0.9.72 --force --locked -# fi -# if [[ "$(cargo-deny --version)" != "cargo-deny 0.16.1" ]] ; then -# rustup run stable cargo install cargo-deny --version 0.16.1 --force --locked -# fi -# if ! dd-rust-license-tool --help >& /dev/null ; then -# rustup run stable cargo install dd-rust-license-tool --version 1.0.2 --force --locked fi if contains_module cross; then diff --git a/scripts/integration/azure/compose.yaml b/scripts/integration/azure/compose.yaml index fec6975508506..933a72d235a24 100644 --- a/scripts/integration/azure/compose.yaml +++ b/scripts/integration/azure/compose.yaml @@ -3,7 +3,7 @@ version: '3' services: local-azure-blob: image: mcr.microsoft.com/azure-storage/azurite:${CONFIG_VERSION} - command: azurite --blobHost 0.0.0.0 --queueHost 0.0.0.0 --loose + command: azurite --blobHost 0.0.0.0 --loose volumes: - /var/run:/var/run diff --git a/src/azure/mod.rs b/src/azure/mod.rs deleted file mode 100644 index ddaf9b436d90c..0000000000000 --- a/src/azure/mod.rs +++ /dev/null @@ -1,235 +0,0 @@ -//! Shared functionality for the Azure components. -use std::sync::Arc; - -use azure_core::{auth::TokenCredential, new_http_client, HttpClient, RetryOptions}; -use azure_identity::{ - AutoRefreshingTokenCredential, ClientSecretCredential, DefaultAzureCredential, - TokenCredentialOptions, -}; -use azure_storage::{prelude::*, CloudLocation, ConnectionString}; -use azure_storage_blobs; -use azure_storage_queues; -use serde_with::serde_as; - -use vector_lib::configurable::configurable_component; - -/// Stores credentials used to build Azure Clients. -#[serde_as] -#[configurable_component] -#[derive(Clone, Debug, Derivative)] -#[derivative(Default)] -#[serde(deny_unknown_fields)] -pub struct ClientCredentials { - /// Check how to get Tenant ID in [the docs][docs]. - /// - /// [docs]: https://learn.microsoft.com/en-us/azure/azure-portal/get-subscription-tenant-id - tenant_id: String, - - /// Check how to get Client ID in [the docs][docs]. - /// - /// [docs]: https://learn.microsoft.com/en-us/entra/identity-platform/quickstart-register-app#add-credentials - client_id: String, - - /// Check how to get Client Secret in [the docs][docs]. - /// - /// [docs]: https://learn.microsoft.com/en-us/entra/identity-platform/quickstart-register-app#add-credentials - client_secret: String, -} - -/// Builds Azure Storage Container Client. -/// -/// To authenticate only **one** of the following should be set: -/// 1. `connection_string` -/// 2. `storage_account` - optionally you can set `client_credentials` to provide credentials, -/// if `client_credentials` is None, [`DefaultAzureCredential`][dac] would be used. -/// -/// [dac]: https://docs.rs/azure_identity/0.17.0/azure_identity/struct.DefaultAzureCredential.html -pub fn build_container_client( - connection_string: Option, - storage_account: Option, - container_name: String, - endpoint: Option, - client_credentials: Option, -) -> crate::Result> { - let client; - match (connection_string, storage_account) { - (Some(connection_string_p), None) => { - let connection_string = ConnectionString::new(&connection_string_p)?; - - client = match connection_string.blob_endpoint { - // When the blob_endpoint is provided, we use the Custom CloudLocation since it is - // required to contain the full URI to the blob storage API endpoint, this means - // that account_name is not required to exist in the connection_string since - // account_name is only used with the default CloudLocation in the Azure SDK to - // generate the storage API endpoint - Some(uri) => azure_storage_blobs::prelude::ClientBuilder::with_location( - CloudLocation::Custom { - uri: uri.to_string(), - }, - connection_string.storage_credentials()?, - ), - // Without a valid blob_endpoint in the connection_string, assume we are in Azure - // Commercial (AzureCloud location) and create a default Blob Storage Client that - // builds the API endpoint location using the account_name as input - None => azure_storage_blobs::prelude::ClientBuilder::new( - connection_string - .account_name - .ok_or("Account name missing in connection string")?, - connection_string.storage_credentials()?, - ), - } - .retry(RetryOptions::none()) - .container_client(container_name); - } - (None, Some(storage_account_p)) => { - let creds: Arc = match client_credentials { - Some(client_credentials_p) => { - let http_client: Arc = new_http_client(); - let options = TokenCredentialOptions::default(); - let creds = std::sync::Arc::new(ClientSecretCredential::new( - http_client.clone(), - client_credentials_p.tenant_id, - client_credentials_p.client_id, - client_credentials_p.client_secret, - options, - )); - creds - } - None => { - let creds = std::sync::Arc::new(DefaultAzureCredential::default()); - creds - } - }; - let auto_creds = std::sync::Arc::new(AutoRefreshingTokenCredential::new(creds)); - let storage_credentials = StorageCredentials::token_credential(auto_creds); - - client = match endpoint { - // If a blob_endpoint is provided in the configuration, use it with a Custom - // CloudLocation, to allow overriding the blob storage API endpoint - Some(endpoint) => azure_storage_blobs::prelude::ClientBuilder::with_location( - CloudLocation::Custom { uri: endpoint }, - storage_credentials, - ), - // Use the storage_account configuration parameter and assume we are in Azure - // Commercial (AzureCloud location) and build the blob storage API endpoint using - // the storage_account as input. - None => azure_storage_blobs::prelude::ClientBuilder::new( - storage_account_p, - storage_credentials, - ), - } - .retry(RetryOptions::none()) - .container_client(container_name); - } - (None, None) => { - return Err("Either `connection_string` or `storage_account` has to be provided".into()) - } - (Some(_), Some(_)) => { - return Err( - "`connection_string` and `storage_account` can't be provided at the same time" - .into(), - ) - } - } - Ok(std::sync::Arc::new(client)) -} - -/// Builds Azure Queue Service Client. -/// -/// To authenticate only **one** of the following should be set: -/// 1. `connection_string` -/// 2. `storage_account` - optionally you can set `client_credentials` to provide credentials, -/// if `client_credentials` is None, [`DefaultAzureCredential`][dac] would be used. -/// -/// [dac]: https://docs.rs/azure_identity/0.17.0/azure_identity/struct.DefaultAzureCredential.html -pub fn build_queue_client( - connection_string: Option, - storage_account: Option, - queue_name: String, - endpoint: Option, - client_credentials: Option, -) -> crate::Result> { - let client; - match (connection_string, storage_account) { - (Some(connection_string_p), None) => { - let connection_string = ConnectionString::new(&connection_string_p)?; - - client = match connection_string.queue_endpoint { - // When the queue_endpoint is provided, we use the Custom CloudLocation since it is - // required to contain the full URI to the storage queue API endpoint, this means - // that account_name is not required to exist in the connection_string since - // account_name is only used with the default CloudLocation in the Azure SDK to - // generate the storage API endpoint - Some(uri) => azure_storage_queues::QueueServiceClientBuilder::with_location( - CloudLocation::Custom { - uri: uri.to_string(), - }, - connection_string.storage_credentials()?, - ), - // Without a valid queue_endpoint in the connection_string, assume we are in Azure - // Commercial (AzureCloud location) and create a default Blob Storage Client that - // builds the API endpoint location using the account_name as input - None => azure_storage_queues::QueueServiceClientBuilder::new( - connection_string - .account_name - .ok_or("Account name missing in connection string")?, - connection_string.storage_credentials()?, - ), - } - .retry(RetryOptions::none()) - .build() - .queue_client(queue_name); - } - (None, Some(storage_account_p)) => { - let creds: Arc = match client_credentials { - Some(client_credentials_p) => { - let http_client: Arc = new_http_client(); - let options = TokenCredentialOptions::default(); - let creds = std::sync::Arc::new(ClientSecretCredential::new( - http_client.clone(), - client_credentials_p.tenant_id, - client_credentials_p.client_id, - client_credentials_p.client_secret, - options, - )); - creds - } - None => { - let creds = std::sync::Arc::new(DefaultAzureCredential::default()); - creds - } - }; - let auto_creds = std::sync::Arc::new(AutoRefreshingTokenCredential::new(creds)); - let storage_credentials = StorageCredentials::token_credential(auto_creds); - - client = match endpoint { - // If a queue_endpoint is provided in the configuration, use it with a Custom - // CloudLocation, to allow overriding the storage queue API endpoint - Some(endpoint) => azure_storage_queues::QueueServiceClientBuilder::with_location( - CloudLocation::Custom { uri: endpoint }, - storage_credentials, - ), - // Use the storage_account configuration parameter and assume we are in Azure - // Commercial (AzureCloud location) and build the blob storage API endpoint using - // the storage_account as input. - None => azure_storage_queues::QueueServiceClientBuilder::new( - storage_account_p, - storage_credentials, - ), - } - .retry(RetryOptions::none()) - .build() - .queue_client(queue_name); - } - (None, None) => { - return Err("Either `connection_string` or `storage_account` has to be provided".into()) - } - (Some(_), Some(_)) => { - return Err( - "`connection_string` and `storage_account` can't be provided at the same time" - .into(), - ) - } - } - Ok(std::sync::Arc::new(client)) -} diff --git a/src/codecs/encoding/config.rs b/src/codecs/encoding/config.rs index 8ef90e8a50a18..ccf9b0f263847 100644 --- a/src/codecs/encoding/config.rs +++ b/src/codecs/encoding/config.rs @@ -1,7 +1,10 @@ use crate::codecs::Transformer; -use vector_lib::codecs::{ - encoding::{BatchSerializer, Framer, FramingConfig, Serializer, SerializerConfig}, - CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder, +use vector_lib::{ + codecs::{ + encoding::{BatchSerializer, Framer, FramingConfig, Serializer, SerializerConfig}, + CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder, + }, + configurable::configurable_component, }; #[cfg(feature = "codecs-opentelemetry")] diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index 3b3a74721e23b..dc6e227c50252 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -237,8 +237,9 @@ fn serializer_config_to_deserializer( }) } SerializerConfig::RawMessage | SerializerConfig::Text(_) => DeserializerConfig::Bytes, - #[cfg(feature = "codecs-opentelemetry")] SerializerConfig::Parquet { .. } => todo!(), + #[cfg(feature = "codecs-opentelemetry")] + SerializerConfig::Otlp => todo!(), }; deserializer_config.build() diff --git a/src/gcp.rs b/src/gcp.rs index df8524b937a20..770283e17033b 100644 --- a/src/gcp.rs +++ b/src/gcp.rs @@ -4,8 +4,7 @@ use std::{ time::Duration, }; -use base64::prelude::{Engine as _, BASE64_URL_SAFE}; -use chrono::{DateTime, Utc}; +use base64::prelude::{BASE64_URL_SAFE, Engine as _}; pub use goauth::scopes::Scope; use goauth::{ GoErr, @@ -14,15 +13,10 @@ use goauth::{ }; use http::{Uri, uri::PathAndQuery}; use hyper::header::AUTHORIZATION; -use reqwest::{Client, Response}; -use serde_json::{from_value, json}; -use serde_with::serde_derive::Deserialize; use smpl_jwt::Jwt; use snafu::{ResultExt, Snafu}; use tokio::sync::watch; -use typetag::serde; -use vector_lib::configurable::configurable_component; -use vector_lib::sensitive_string::SensitiveString; +use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString}; use crate::{ config::ProxyConfig, @@ -108,10 +102,6 @@ pub struct GcpAuthConfig { #[serde(default, skip_serializing)] #[configurable(metadata(docs::hidden))] pub skip_authentication: bool, - - /// The service account to impersonate. The impersonated service account must have the - /// `roles/iam.serviceAccountTokenCreator` role on the target service account. - pub impersonated_service_account: Option, } impl GcpAuthConfig { @@ -122,7 +112,7 @@ impl GcpAuthConfig { let gap = std::env::var("GOOGLE_APPLICATION_CREDENTIALS").ok(); let creds_path = self.credentials_path.as_ref().or(gap.as_ref()); match (&creds_path, &self.api_key) { - (Some(path), _) => GcpAuthenticator::from_file(path, scope, self.impersonated_service_account.clone()).await?, + (Some(path), _) => GcpAuthenticator::from_file(path, scope).await?, (None, Some(api_key)) => GcpAuthenticator::from_api_key(api_key.inner())?, (None, None) => GcpAuthenticator::new_implicit().await?, } @@ -137,30 +127,18 @@ pub enum GcpAuthenticator { None, } -type ServiceAccount = String; -#[derive(Debug)] -pub enum Creds { - Regular(Credentials, Scope), - Impersonated(Credentials, Scope, ServiceAccount), -} #[derive(Debug)] pub struct InnerCreds { - creds: Option, + creds: Option<(Credentials, Scope)>, token: RwLock, } impl GcpAuthenticator { - async fn from_file(path: &str, scope: Scope, service_account: Option) -> crate::Result { + async fn from_file(path: &str, scope: Scope) -> crate::Result { let creds = Credentials::from_file(path).context(InvalidCredentialsSnafu)?; - let token = RwLock::new(fetch_token(&creds, &scope, service_account.as_deref()).await?); - - let creds = Some(match service_account { - Some(service_account) => - Creds::Impersonated(creds, scope, service_account), - None => - Creds::Regular(creds, scope), - }); - Ok(Self::Credentials(Arc::new(InnerCreds { creds, token, }))) + let token = RwLock::new(fetch_token(&creds, &scope).await?); + let creds = Some((creds, scope)); + Ok(Self::Credentials(Arc::new(InnerCreds { creds, token }))) } async fn new_implicit() -> crate::Result { @@ -267,12 +245,8 @@ impl GcpAuthenticator { impl InnerCreds { async fn regenerate_token(&self) -> crate::Result<()> { let token = match &self.creds { - Some(Creds::Regular(creds, scope)) => - fetch_regular_token(creds, scope).await?, - Some(Creds::Impersonated(creds, scope, service_account)) => - fetch_impersonated_token(creds, scope, service_account).await?, - None => - get_token_implicit().await?, + Some((creds, scope)) => fetch_token(creds, scope).await?, + None => get_token_implicit().await?, }; *self.token.write().unwrap() = token; Ok(()) @@ -284,16 +258,14 @@ impl InnerCreds { } } -async fn fetch_token(creds: &Credentials, scope: &Scope, impersonated_service_account: Option<&str> -) -> crate::Result { - match impersonated_service_account { - Some(service_account) => fetch_impersonated_token(creds, scope, service_account).await, - None => fetch_regular_token(creds, scope).await, - } -} - -async fn fetch_regular_token(creds: &Credentials, scope: &Scope) -> crate::Result { - let claims = JwtClaims::new(creds.iss(), scope, creds.token_uri(), None, None); +async fn fetch_token(creds: &Credentials, scope: &Scope) -> crate::Result { + let claims = JwtClaims::new( + creds.iss(), + std::slice::from_ref(scope), + creds.token_uri(), + None, + None, + ); let rsa_key = creds.rsa_key().context(InvalidRsaKeySnafu)?; let jwt = Jwt::new(claims, rsa_key, None); @@ -309,118 +281,6 @@ async fn fetch_regular_token(creds: &Credentials, scope: &Scope) -> crate::Resul .map_err(Into::into) } -async fn fetch_impersonated_token( - creds: &Credentials, - impersonated_scope: &Scope, - impersonated_service_account: &str, -) -> crate::Result { - // base scope is used only for impersonation from base service account to target service account - let base_scope = Scope::CloudPlatform; - let claims = JwtClaims::new(creds.iss(), &base_scope, creds.token_uri(), None, None); - let rsa_key = creds.rsa_key().context(InvalidRsaKeySnafu)?; - let jwt = Jwt::new(claims, rsa_key, None); - - debug!( - message = "Fetching base service account GCP authentication token.", - project = ?creds.project(), - iss = ?creds.iss(), - token_uri = ?creds.token_uri(), - ); - let token = goauth::get_token(&jwt, creds) - .await - .context(GetTokenSnafu)?; - - debug!( - message = "Fetching impersonated service account GCP authentication token.", - project = ?creds.project(), - impersonated_service_account = impersonated_service_account - ); - let token = do_fetch_impersonated_token(token.access_token(), - impersonated_service_account, - &[&impersonated_scope.url()]) - .await - .map_err(move |e| { - error!( - message = "Failed to generate impersonated token.", - impersonated_service_account = impersonated_service_account, - error = %e, - ); - e - })?; - Ok(token) -} - -async fn do_fetch_impersonated_token( - base_token: &str, - target_service_account: &str, - scopes: &[&str], -) -> crate::Result { - // Define the IAM Credentials API endpoint for generating impersonated tokens - let url = format!( - "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{target_service_account}:generateAccessToken", - ); - - // Construct the JSON payload with the requested scopes - let body = json!({ - "scope": scopes, - }); - - // Create an HTTP client and make the POST request - let client = Client::new(); - let response = client - .post(&url) - .bearer_auth(base_token) // Use the base token for authorization - .json(&body) - .send() - .await?; - - token_from_json(response).await -} - -async fn token_from_json(resp: Response) -> crate::Result { - let is_success = resp.status().is_success(); - let resp = resp.bytes().await?; - if !is_success { - error!( - message = "No success in response.", - raw_resp = String::from_utf8_lossy(&resp).into_owned(), - ); - let token_err: TokenErr = serde_json::from_slice(&resp)?; - return Err(token_err.into()) - } - - #[derive(Deserialize)] - #[serde(rename_all = "camelCase")] - struct TokenCamelCase { - access_token: String, - expire_time: String, - } - let token: TokenCamelCase = serde_json::from_slice(&resp.clone()).map_err(|e| { - error!( - message = "Failed to parse OAuth token JSON.", - error = %e, - raw_resp = String::from_utf8_lossy(&resp).into_owned(), - ); - e - })?; - - let remapped = json!({ - "access_token": token.access_token, - "token_type": "Bearer", - "expires_in": seconds_from_now_to_timestamp(&token.expire_time)?, - }); - - let token: Token = from_value(remapped)?; - Ok(token) -} - -fn seconds_from_now_to_timestamp(timestamp: &str) -> crate::Result { - let future_time: DateTime = timestamp.parse()?; - let now = Utc::now(); - let duration = future_time.signed_duration_since(now); - Ok(duration.num_seconds() as u32) -} - async fn get_token_implicit() -> Result { debug!("Fetching implicit GCP authentication token."); let req = http::Request::get(SERVICE_ACCOUNT_TOKEN_URL) diff --git a/src/internal_events/azure_queue.rs b/src/internal_events/azure_queue.rs deleted file mode 100644 index 31f864b1d0d73..0000000000000 --- a/src/internal_events/azure_queue.rs +++ /dev/null @@ -1,196 +0,0 @@ -#[cfg(feature = "sources-azure_blob")] -pub use azure_blob::*; -use metrics::counter; -use vector_lib::internal_event::{error_stage, error_type, InternalEvent}; - -#[cfg(feature = "sources-azure_blob")] -mod azure_blob { - use super::*; - use crate::event::Event; - use crate::sources::azure_blob::queue::ProcessingError; - - #[derive(Debug)] - pub struct QueueMessageProcessingError<'a> { - pub message_id: &'a str, - pub error: &'a ProcessingError, - } - - impl<'a> InternalEvent for QueueMessageProcessingError<'a> { - fn emit(self) { - error!( - message = "Failed to process Queue message.", - message_id = %self.message_id, - error = %self.error, - error_code = "failed_processing_azure_queue_message", - error_type = error_type::PARSER_FAILED, - stage = error_stage::PROCESSING, - internal_log_rate_limit = true, - ); - counter!( - "component_errors_total", - "error_code" => "failed_processing_azure_queue_message", - "error_type" => error_type::PARSER_FAILED, - "stage" => error_stage::PROCESSING, - ).increment(1); - } - } - - #[derive(Debug)] - pub struct InvalidRowEventType<'a> { - pub event: &'a Event, - } - - impl<'a> InternalEvent for InvalidRowEventType<'a> { - fn emit(self) { - error!( - message = "Expected Azure rows as Log Events", - event = ?self.event, - error_code = "invalid_azure_row_event", - error_type = error_type::CONDITION_FAILED, - stage = error_stage::PROCESSING, - ); - counter!( - "component_errors_total", - "error_code" => "invalid_azure_row_event", - "error_type" => error_type::CONDITION_FAILED, - "stage" => error_stage::PROCESSING, - ).increment(1); - } - } -} - -#[derive(Debug)] -pub struct QueueMessageReceiveError<'a, E> { - pub error: &'a E, -} - -impl<'a, E: std::fmt::Display + std::fmt::Debug> InternalEvent for QueueMessageReceiveError<'a, E> { - fn emit(self) { - error!( - message = "Failed reading messages", - event = format!("{:?}", self.error), - error_code = "failed_fetching_azure_queue_events", - error_type = error_type::REQUEST_FAILED, - stage = error_stage::RECEIVING, - ); - counter!( - "component_errors_total", - "error_code" => "failed_fetching_azure_queue_events", - "error_type" => error_type::REQUEST_FAILED, - "stage" => error_stage::RECEIVING, - ).increment(1); - } -} - -#[derive(Debug)] -pub struct QueueMessageDeleteError<'a, E> { - pub error: &'a E, -} - -impl<'a, E: std::fmt::Display> InternalEvent for QueueMessageDeleteError<'a, E> { - fn emit(self) { - error!( - message = "Failed deleting message", - error = %self.error, - error_code = "failed_deleting_azure_queue_event", - error_type = error_type::ACKNOWLEDGMENT_FAILED, - stage = error_stage::PROCESSING, - ); - counter!( - "component_errors_total", - "error_code" => "failed_deleting_azure_queue_event", - "error_type" => error_type::WRITER_FAILED, - "stage" => error_stage::RECEIVING, - ).increment(1); - } -} - -#[derive(Debug)] -pub struct QueueStorageInvalidEventIgnored<'a> { - pub container: &'a str, - pub subject: &'a str, - pub event_type: &'a str, -} - -impl<'a> InternalEvent for QueueStorageInvalidEventIgnored<'a> { - fn emit(self) { - trace!( - message = "Ignoring event because of wrong event type", - container = %self.container, - subject = %self.subject, - event_type = %self.event_type - ); - counter!( - "azure_queue_event_ignored_total", - "ignore_type" => "invalid_event_type" - ).increment(1); - } -} - -#[derive(Debug)] -pub struct QueueStorageMismatchingContainerName<'a> { - pub container: &'a str, - pub configured_container: &'a str, -} - -impl<'a> InternalEvent for QueueStorageMismatchingContainerName<'a> { - fn emit(self) { - warn!( - message = "Ignoring event because of wrong container name", - configured_container = %self.configured_container, - container = %self.container, - ); - counter!( - "azure_queue_event_ignored_total", - "ignore_type" => "mismatching_container_name" - ).increment(1); - } -} - -#[derive(Debug)] -pub struct QueueMessageProcessingSucceeded {} - -impl InternalEvent for QueueMessageProcessingSucceeded { - fn emit(self) { - trace!(message = "Processed azure queue message successfully."); - counter!("azure_queue_message_processing_succeeded_total").increment(1); - } -} - -#[derive(Debug)] -pub struct QueueMessageProcessingErrored {} - -impl InternalEvent for QueueMessageProcessingErrored { - fn emit(self) { - error!(message = "Batch event had a transient error in delivery."); - counter!("azure_queue_message_processing_errored_total").increment(1); - } -} - -#[derive(Debug)] -pub struct QueueMessageProcessingRejected {} - -impl InternalEvent for QueueMessageProcessingRejected { - fn emit(self) { - error!(message = "Batch event had a permanent failure or rejection."); - counter!("azure_queue_message_processing_rejected_total").increment(1); - } -} - -#[derive(Debug)] -pub struct BlobDoesntExist<'a> { - pub nonexistent_blob_name: &'a str, -} - -impl<'a> InternalEvent for BlobDoesntExist<'a> { - fn emit(self) { - warn!( - message = "Ignoring event because blob doesn't exist in storage", - blob_name = self.nonexistent_blob_name - ); - counter!( - "azure_queue_event_ignored_total", - "ignore_type" => "blob_doesnt_exist" - ).increment(1); - } -} diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index 27177a9881c37..1647e2b8ff0ca 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -155,8 +155,6 @@ mod windows; pub mod config; #[cfg(any(feature = "transforms-log_to_metric", feature = "sinks-loki"))] mod expansion; -#[cfg(any(feature = "sources-azure_blob", feature = "sources-azure_blob",))] -mod azure_queue; #[cfg(feature = "sources-mongodb_metrics")] pub(crate) use mongodb_metrics::*; @@ -185,8 +183,6 @@ pub(crate) use self::aws_kinesis::*; pub(crate) use self::aws_kinesis_firehose::*; #[cfg(any(feature = "sources-aws_s3", feature = "sources-aws_sqs",))] pub(crate) use self::aws_sqs::*; -#[cfg(any(feature = "sources-azure_blob"))] -pub(crate) use self::azure_queue::*; pub(crate) use self::codecs::*; #[cfg(feature = "sources-datadog_agent")] pub(crate) use self::datadog_agent::*; diff --git a/src/lib.rs b/src/lib.rs index b3e20ee68d2aa..6ed250658b65f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,7 +73,6 @@ pub mod async_read; #[cfg(feature = "aws-config")] pub mod aws; #[allow(unreachable_pub)] -pub mod azure; pub mod codecs; pub mod common; mod convert_config; diff --git a/src/sinks/azure_blob/config.rs b/src/sinks/azure_blob/config.rs index fc4d628ada5c5..3c2437c554106 100644 --- a/src/sinks/azure_blob/config.rs +++ b/src/sinks/azure_blob/config.rs @@ -10,7 +10,7 @@ use vector_lib::{ use super::request_builder::AzureBlobRequestOptions; use crate::{ - azure, + Result, codecs::{Encoder, EncodingConfigWithFraming, SinkType}, config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext}, sinks::{ @@ -166,14 +166,9 @@ impl GenerateConfig for AzureBlobSinkConfig { #[typetag::serde(name = "azure_blob")] impl SinkConfig for AzureBlobSinkConfig { async fn build(&self, _cx: SinkContext) -> Result<(VectorSink, Healthcheck)> { - let client = azure::build_container_client( - self.connection_string - .as_ref() - .map(|v| v.inner().to_string()), - self.storage_account.as_ref().map(|v| v.to_string()), + let client = azure_common::config::build_client( + self.connection_string.clone().into(), self.container_name.clone(), - self.endpoint.clone(), - None, )?; let healthcheck = azure_common::config::build_healthcheck( diff --git a/src/sinks/azure_blob/integration_tests.rs b/src/sinks/azure_blob/integration_tests.rs index e6946ecc8a7c9..a48b330982eb5 100644 --- a/src/sinks/azure_blob/integration_tests.rs +++ b/src/sinks/azure_blob/integration_tests.rs @@ -19,7 +19,6 @@ use vector_lib::{ use super::config::AzureBlobSinkConfig; use crate::{ - azure, event::{Event, EventArray, LogEvent}, sinks::{ VectorSink, azure_common, @@ -34,12 +33,9 @@ use crate::{ #[tokio::test] async fn azure_blob_healthcheck_passed() { let config = AzureBlobSinkConfig::new_emulator().await; - let client = azure::build_container_client( - config.connection_string.map(Into::into), - None, + let client = azure_common::config::build_client( + config.connection_string.clone().into(), config.container_name.clone(), - None, - None, ) .expect("Failed to create client"); @@ -56,12 +52,9 @@ async fn azure_blob_healthcheck_unknown_container() { container_name: String::from("other-container-name"), ..config }; - let client = azure::build_container_client( - config.connection_string.map(Into::into), - config.storage_account.map(Into::into), + let client = azure_common::config::build_client( + config.connection_string.clone().into(), config.container_name.clone(), - config.endpoint.clone(), - None, ) .expect("Failed to create client"); @@ -247,12 +240,9 @@ impl AzureBlobSinkConfig { } fn to_sink(&self) -> VectorSink { - let client = azure::build_container_client( - self.connection_string.clone().map(Into::into), - self.storage_account.clone().map(Into::into), + let client = azure_common::config::build_client( + self.connection_string.clone().into(), self.container_name.clone(), - self.endpoint.clone(), - None, ) .expect("Failed to create client"); @@ -267,12 +257,9 @@ impl AzureBlobSinkConfig { } pub async fn list_blobs(&self, prefix: String) -> Vec { - let client = azure::build_container_client( - self.connection_string.clone().map(Into::into), - self.storage_account.clone().map(Into::into), + let client = azure_common::config::build_client( + self.connection_string.clone().into(), self.container_name.clone(), - self.endpoint.clone(), - None, ) .unwrap(); let response = client @@ -295,12 +282,9 @@ impl AzureBlobSinkConfig { } pub async fn get_blob(&self, blob: String) -> (Blob, Vec) { - let client = azure::build_container_client( - self.connection_string.clone().map(Into::into), - self.storage_account.clone().map(Into::into), + let client = azure_common::config::build_client( + self.connection_string.clone().into(), self.container_name.clone(), - self.endpoint.clone(), - None, ) .unwrap(); let response = client @@ -333,12 +317,9 @@ impl AzureBlobSinkConfig { } async fn ensure_container(&self) { - let client = azure::build_container_client( - self.connection_string.clone().map(Into::into), - self.storage_account.clone().map(Into::into), + let client = azure_common::config::build_client( + self.connection_string.clone().into(), self.container_name.clone(), - self.endpoint.clone(), - None, ) .unwrap(); let request = client diff --git a/src/sinks/azure_common/config.rs b/src/sinks/azure_common/config.rs index f5bd9c485cd3d..fadd0e2c4707c 100644 --- a/src/sinks/azure_common/config.rs +++ b/src/sinks/azure_common/config.rs @@ -1,6 +1,8 @@ use std::sync::Arc; use azure_core::error::HttpError; +use azure_core_for_storage::RetryOptions; +use azure_storage::{CloudLocation, ConnectionString}; use azure_storage_blobs::{blob::operations::PutBlockBlobResponse, prelude::*}; use bytes::Bytes; use futures::FutureExt; @@ -121,3 +123,37 @@ pub fn build_healthcheck( Ok(healthcheck.boxed()) } + +pub fn build_client( + connection_string: String, + container_name: String, +) -> crate::Result> { + let client = { + let connection_string = ConnectionString::new(&connection_string)?; + let account_name = connection_string + .account_name + .ok_or("Account name missing in connection string")?; + + match connection_string.blob_endpoint { + // When the blob_endpoint is provided, we use the Custom CloudLocation since it is + // required to contain the full URI to the blob storage API endpoint, this means + // that account_name is not required to exist in the connection_string since + // account_name is only used with the default CloudLocation in the Azure SDK to + // generate the storage API endpoint + Some(uri) => ClientBuilder::with_location( + CloudLocation::Custom { + uri: uri.to_string(), + account: account_name.to_string(), + }, + connection_string.storage_credentials()?, + ), + // Without a valid blob_endpoint in the connection_string, assume we are in Azure + // Commercial (AzureCloud location) and create a default Blob Storage Client that + // builds the API endpoint location using the account_name as input + None => ClientBuilder::new(account_name, connection_string.storage_credentials()?), + } + .retry(RetryOptions::none()) + .container_client(container_name) + }; + Ok(Arc::new(client)) +} diff --git a/src/sinks/gcp/stackdriver/metrics/tests.rs b/src/sinks/gcp/stackdriver/metrics/tests.rs index 4f31682cae4fe..a5b3a40649d1f 100644 --- a/src/sinks/gcp/stackdriver/metrics/tests.rs +++ b/src/sinks/gcp/stackdriver/metrics/tests.rs @@ -56,7 +56,6 @@ async fn sends_metric() { api_key: None, credentials_path: None, skip_authentication: true, - impersonated_service_account: None, }, ..Default::default() }; @@ -117,7 +116,6 @@ async fn sends_multiple_metrics() { api_key: None, credentials_path: None, skip_authentication: true, - impersonated_service_account: None, }, batch, ..Default::default() @@ -205,7 +203,6 @@ async fn does_not_aggregate_metrics() { api_key: None, credentials_path: None, skip_authentication: true, - impersonated_service_account: None, }, batch, ..Default::default() diff --git a/src/sources/azure_blob/integration_tests.rs b/src/sources/azure_blob/integration_tests.rs deleted file mode 100644 index 2e453296d8335..0000000000000 --- a/src/sources/azure_blob/integration_tests.rs +++ /dev/null @@ -1,215 +0,0 @@ -use azure_core::error::HttpError; -use azure_storage_blobs::prelude::PublicAccess; -use base64::{prelude::BASE64_STANDARD, Engine}; -use http::StatusCode; - -use super::{ - queue::{make_container_client, make_queue_client, Config}, - time::Duration, - AzureBlobConfig, Strategy, -}; -use crate::{ - event::Event, - serde::default_decoding, - test_util::components::{ - run_and_assert_source_compliance, run_and_assert_source_error, COMPONENT_ERROR_TAGS, - SOURCE_TAGS, - }, -}; - -impl AzureBlobConfig { - pub async fn new_emulator() -> AzureBlobConfig { - let address = std::env::var("AZURE_ADDRESS").unwrap_or_else(|_| "localhost".into()); - let config = AzureBlobConfig { - connection_string: Some(format!("UseDevelopmentStorage=true;DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://{}:10000/devstoreaccount1;QueueEndpoint=http://{}:10001/devstoreaccount1;TableEndpoint=http://{}:10002/devstoreaccount1;", address, address, address).into()), - storage_account: None, - container_name: "logs".to_string(), - strategy: Strategy::StorageQueue, - queue: Some(Config { - queue_name: format!("test-{}", rand::random::()), - poll_secs: 1, - }), - // TODO shouldn't we have blob_endpoint and queue_endpoint? - endpoint: None, - acknowledgements: Default::default(), - // TODO this should be option - exec_interval_secs: 0, - log_namespace: None, - decoding: default_decoding(), - client_credentials: None, - }; - - config.ensure_container().await; - config.ensure_queue().await; - - config - } - - async fn run_assert(&self) -> Vec { - run_and_assert_source_compliance(self.clone(), Duration::from_secs(1), &SOURCE_TAGS).await - } - - async fn run_error(&self) -> Vec { - run_and_assert_source_error(self.clone(), Duration::from_secs(1), &COMPONENT_ERROR_TAGS) - .await - } - - async fn ensure_container(&self) { - let client = make_container_client(self).expect("Failed to create container client"); - let request = client - .create() - .public_access(PublicAccess::None) - .into_future(); - - let response = match request.await { - Ok(_) => Ok(()), - Err(reason) => match reason.downcast_ref::() { - Some(err) => match StatusCode::from_u16(err.status().into()) { - Ok(StatusCode::CONFLICT) => Ok(()), - _ => Err(format!("Unexpected status code {}", err.status())), - }, - _ => Err(format!("Unexpected error {}", reason)), - }, - }; - - response.expect("Failed to create container") - } - - async fn ensure_queue(&self) { - let client = make_queue_client(self).expect("Failed to create queue client"); - let request = client.create().into_future(); - - let response = match request.await { - Ok(_) => Ok(()), - Err(reason) => match reason.downcast_ref::() { - Some(err) => match StatusCode::from_u16(err.status().into()) { - Ok(StatusCode::CONFLICT) => Ok(()), - _ => Err(format!("Unexpected status code {}", err.status())), - }, - _ => Err(format!("Unexpected error {}", reason)), - }, - }; - - response.expect("Failed to create queue") - } - - async fn upload_blob(&self, name: String, content: String) { - let container_client = - make_container_client(self).expect("Failed to create container client"); - let blob_client = container_client.blob_client(name.clone()); - blob_client - .put_block_blob(content) - .await - .expect("Failed putting blob"); - - self.queue_notify_blob_created(&name).await; - } - - async fn queue_notify_blob_created(&self, name: &str) { - let queue_client = make_queue_client(self).expect("Failed to create queue client"); - let message = format!( - r#"{{ - "topic": "/subscriptions/fa5f2180-1451-4461-9b1f-aae7d4b33cf8/resourceGroups/events_poc/providers/Microsoft.Storage/storageAccounts/eventspocaccount", - "subject": "/blobServices/default/containers/logs/blobs/{}", - "eventType": "Microsoft.Storage.BlobCreated", - "id": "be3f21f7-201e-000b-7605-a29195062628", - "data": {{ - "api": "PutBlob", - "clientRequestId": "1fa42c94-6dd3-4172-95c4-fd9cf56b5009", - "requestId": "be3f21f7-201e-000b-7605-a29195000000", - "eTag": "0x8DC701C5D3FFDF6", - "contentType": "application/octet-stream", - "contentLength": 0, - "blobType": "BlockBlob", - "url": "https://eventspocaccount.blob.core.windows.net/logs/{}", - "sequencer": "0000000000000000000000000005C5360000000000276a63", - "storageDiagnostics": {{ - "batchId": "fec5b12c-2006-0034-0005-a25936000000" - }} - }}, - "dataVersion": "", - "metadataVersion": "1", - "eventTime": "2024-05-09T11:37:10.5637878Z" - }}"#, - name, name - ); - queue_client - .put_message(BASE64_STANDARD.encode(message)) - .await - .expect("Failed putting message"); - } -} - -#[tokio::test] -async fn azure_blob_read_single_line_from_blob() { - let config = AzureBlobConfig::new_emulator().await; - let content = "a"; - config - .upload_blob("file.txt".to_string(), content.to_string()) - .await; - - let events = config.run_assert().await; - assert_eq!(events.len(), 1); - assert_eq!(events[0].as_log()["message"], "a".into()); -} - -#[tokio::test] -async fn azure_blob_read_multiple_lines_from_blob() { - let config = AzureBlobConfig::new_emulator().await; - let content = "a\nb\nc"; - config - .upload_blob("file.txt".to_string(), content.to_string()) - .await; - - let events = config.run_assert().await; - assert_eq!(events.len(), 3); - assert_eq!(events[0].as_log()["message"], "a".into()); - assert_eq!(events[1].as_log()["message"], "b".into()); - assert_eq!(events[2].as_log()["message"], "c".into()); -} - -#[tokio::test] -async fn azure_blob_read_single_line_from_multiple_blobs() { - let config = AzureBlobConfig::new_emulator().await; - let contents = vec!["a", "b", "c"]; - for (i, content) in contents.clone().iter().enumerate() { - config - .upload_blob(format!("file{}.txt", i), content.to_string()) - .await; - } - - let events = - run_and_assert_source_compliance(config.clone(), Duration::from_secs(4), &SOURCE_TAGS) - .await; - assert_eq!(events.len(), contents.len()); - for (i, event) in events.iter().enumerate() { - assert_eq!(event.as_log()["message"], contents[i].into()); - } -} - -#[tokio::test] -async fn azure_blob_emit_error_on_message_read() { - let mut config = AzureBlobConfig::new_emulator().await; - let content = "a\nb\nc"; - config - .upload_blob("file.txt".to_string(), content.to_string()) - .await; - config.queue = Some(Config { - queue_name: "nonexistent".to_string(), - poll_secs: 1, - }); - - let events = config.run_error().await; - assert!(events.is_empty()); -} - -#[tokio::test] -async fn azure_blob_ignore_missing_blob() { - let config = AzureBlobConfig::new_emulator().await; - - config.queue_notify_blob_created("non-existent").await; - config.upload_blob("file.txt".to_string(), "some_content".to_string()).await; - - let events = config.run_assert().await; - assert_eq!(events.len(), 1); -} diff --git a/src/sources/azure_blob/mod.rs b/src/sources/azure_blob/mod.rs deleted file mode 100644 index e2aa8d4892aeb..0000000000000 --- a/src/sources/azure_blob/mod.rs +++ /dev/null @@ -1,392 +0,0 @@ -use std::{future::Future, pin::Pin, time::Duration}; - -use async_stream::stream; -use bytes::Bytes; -use futures::{stream::StreamExt, Stream}; -use tokio::{select, time}; -use tokio_stream::wrappers::IntervalStream; -use vrl::path; - -use vector_lib::internal_event::Registered; -use vector_lib::{ - codecs::{ - decoding::{DeserializerConfig, FramingConfig, NewlineDelimitedDecoderOptions}, - NewlineDelimitedDecoderConfig, - }, - config::LegacyKey, - internal_event::{ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol}, - sensitive_string::SensitiveString, -}; - -use crate::{ - azure::ClientCredentials, - codecs::{Decoder, DecodingConfig}, - config::{ - LogNamespace, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput, - }, - event::{BatchNotifier, BatchStatus, EstimatedJsonEncodedSizeOf, Event}, - internal_events::{ - EventsReceived, InvalidRowEventType, QueueMessageProcessingErrored, - QueueMessageProcessingRejected, QueueMessageProcessingSucceeded, StreamClosedError, - }, - serde::{bool_or_struct, default_decoding}, - shutdown::ShutdownSignal, - sinks::prelude::configurable_component, - sources::azure_blob::queue::make_azure_row_stream, - SourceSender, -}; - -#[cfg(all(test, feature = "azure-blob-source-integration-tests"))] -mod integration_tests; -pub mod queue; -#[cfg(test)] -mod test; - -/// Strategies for consuming objects from Azure Storage. -#[configurable_component] -#[derive(Clone, Copy, Debug, Derivative)] -#[serde(rename_all = "lowercase")] -#[derivative(Default)] -enum Strategy { - /// Consumes objects by processing events sent to an [Azure Storage Queue][azure_storage_queue]. - /// - /// [azure_storage_queue]: https://learn.microsoft.com/en-us/azure/storage/queues/storage-queues-introduction - StorageQueue, - - /// This is a test strategy used only of development and PoC. Should be removed - /// once development is done. - #[derivative(Default)] - Test, -} - -/// WIP -/// A dummy implementation is used as a starter. -/// The source will send dummy messages at a fixed interval, incrementing a counter every -/// exec_interval_secs seconds. -#[configurable_component(source("azure_blob", "Collect logs from Azure Container."))] -#[derive(Clone, Debug, Derivative)] -#[derivative(Default)] -#[serde(default, deny_unknown_fields)] -pub struct AzureBlobConfig { - /// The namespace to use for logs. This overrides the global setting. - #[configurable(metadata(docs::hidden))] - #[serde(default)] - log_namespace: Option, - - /// The interval, in seconds, between subsequent dummy messages - #[serde(default = "default_exec_interval_secs")] - exec_interval_secs: u64, - - /// The strategy to use to consume objects from Azure Storage. - #[configurable(metadata(docs::hidden))] - strategy: Strategy, - - /// Configuration options for Storage Queue. - queue: Option, - - /// The Azure Blob Storage Account connection string. - /// - /// Authentication with access key is the only supported authentication method. - /// - /// Either `storage_account`, or this field, must be specified. - #[configurable(metadata( - docs::examples = "DefaultEndpointsProtocol=https;AccountName=mylogstorage;AccountKey=storageaccountkeybase64encoded;EndpointSuffix=core.windows.net" - ))] - pub connection_string: Option, - - /// The Azure Blob Storage Account name. - /// - /// Attempts to load credentials for the account in the following ways, in order: - /// - /// - read from environment variables ([more information][env_cred_docs]) - /// - looks for a [Managed Identity][managed_ident_docs] - /// - uses the `az` CLI tool to get an access token ([more information][az_cli_docs]) - /// - /// Either `connection_string`, or this field, must be specified. - /// - /// [env_cred_docs]: https://docs.rs/azure_identity/latest/azure_identity/struct.EnvironmentCredential.html - /// [managed_ident_docs]: https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview - /// [az_cli_docs]: https://docs.microsoft.com/en-us/cli/azure/account?view=azure-cli-latest#az-account-get-access-token - #[configurable(metadata(docs::examples = "mylogstorage"))] - pub storage_account: Option, - - #[configurable(derived)] - pub client_credentials: Option, - - /// The Azure Blob Storage Endpoint URL. - /// - /// This is used to override the default blob storage endpoint URL in cases where you are using - /// credentials read from the environment/managed identities or access tokens without using an - /// explicit connection_string (which already explicitly supports overriding the blob endpoint - /// URL). - /// - /// This may only be used with `storage_account` and is ignored when used with - /// `connection_string`. - #[configurable(metadata(docs::examples = "https://test.blob.core.usgovcloudapi.net/"))] - #[configurable(metadata(docs::examples = "https://test.blob.core.windows.net/"))] - pub endpoint: Option, - - /// The Azure Blob Storage Account container name. - #[configurable(metadata(docs::examples = "my-logs"))] - pub(super) container_name: String, - - #[configurable(derived)] - #[serde(default, deserialize_with = "bool_or_struct")] - pub acknowledgements: SourceAcknowledgementsConfig, - - #[configurable(derived)] - #[serde(default = "default_decoding")] - #[derivative(Default(value = "default_decoding()"))] - pub decoding: DeserializerConfig, -} - -impl_generate_config_from_default!(AzureBlobConfig); - -impl AzureBlobConfig { - /// Self validation - pub fn validate(&self) -> crate::Result<()> { - match self.strategy { - Strategy::StorageQueue => { - if self.queue.is_none() || self.queue.as_ref().unwrap().queue_name.is_empty() { - return Err("Azure event grid queue must be set.".into()); - } - if self.storage_account.clone().unwrap_or_default().is_empty() - && self - .connection_string - .clone() - .unwrap_or_default() - .inner() - .is_empty() - { - return Err("Azure Storage Account or Connection String must be set.".into()); - } - if self.container_name.is_empty() { - return Err("Azure Container must be set.".into()); - } - } - Strategy::Test => { - if self.exec_interval_secs == 0 { - return Err("exec_interval_secs must be greater than 0".into()); - } - } - } - - Ok(()) - } -} - -type BlobStream = Pin> + Send>>; - -pub struct BlobPack { - row_stream: BlobStream, - success_handler: Box Pin + Send>> + Send>, -} - -type BlobPackStream = Pin + Send>>; - -struct AzureBlobStreamer { - shutdown: ShutdownSignal, - out: SourceSender, - log_namespace: LogNamespace, - acknowledge: bool, - decoder: Decoder, - bytes_received: Registered, - events_received: Registered, -} - -impl AzureBlobStreamer { - pub fn new( - shutdown: ShutdownSignal, - out: SourceSender, - log_namespace: LogNamespace, - acknowledge: bool, - decoding: DeserializerConfig, - ) -> crate::Result { - Ok(Self { - shutdown, - out, - log_namespace: log_namespace.clone(), - acknowledge, - decoder: { - let framing = FramingConfig::NewlineDelimited(NewlineDelimitedDecoderConfig { - newline_delimited: NewlineDelimitedDecoderOptions { max_length: None }, - }); - DecodingConfig::new(framing, decoding, log_namespace).build()? - }, - bytes_received: register!(BytesReceived::from(Protocol::HTTP)), - events_received: register!(EventsReceived), - }) - } - - pub async fn run_streaming(mut self, mut blob_stream: BlobPackStream) -> Result<(), ()> { - debug!("Starting Azure streaming."); - - loop { - select! { - blob_pack = blob_stream.next() => { - match blob_pack{ - Some(blob_pack) => { - self.process_blob_pack(blob_pack).await?; - } - None => { - break; // end of stream - } - } - }, - _ = self.shutdown.clone() => { - break; - } - } - } - - Ok(()) - } - - async fn process_blob_pack(&mut self, blob_pack: BlobPack) -> Result<(), ()> { - let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(self.acknowledge); - let mut row_stream = blob_pack.row_stream; - let mut output_stream = { - let bytes_received = self.bytes_received.clone(); - let events_received = self.events_received.clone(); - let log_namespace = self.log_namespace.clone(); - let decoder = self.decoder.clone(); - stream! { - // TODO: consider selecting with a shutdown - while let Some(row) = row_stream.next().await { - bytes_received.emit(ByteSize(row.len())); - let deser_result = decoder.deserializer_parse(Bytes::from(row)); - if deser_result.is_err(){ - continue; - } - // Error handling is done above, so we don't mind doing unwrap. - let (events, _) = deser_result.unwrap(); - for mut event in events.into_iter(){ - event = event.with_batch_notifier_option(&batch); - match event { - Event::Log(ref mut log_event) => { - log_namespace.insert_source_metadata( - AzureBlobConfig::NAME, - log_event, - Some(LegacyKey::Overwrite("ingest_timestamp")), - path!("ingest_timestamp"), - chrono::Utc::now().to_rfc3339(), - ); - events_received.emit(CountByteSize(1, event.estimated_json_encoded_size_of())); - yield event - } - _ => { - emit!(InvalidRowEventType{event: &event}) - } - } - } - } - // Explicitly dropping to showcase that the status of the batch is sent to the channel. - drop(batch); - }.boxed() - }; - - // Return if send was unsuccessful. - if let Err(send_error) = self.out.send_event_stream(&mut output_stream).await { - // TODO: consider dedicated error. - error!("Failed to send event stream: {}.", send_error); - let (count, _) = output_stream.size_hint(); - emit!(StreamClosedError { count }); - return Ok(()); - } - - // dropping like s3 sender - drop(output_stream); // TODO: better explanation - - // Run success handler if there are no errors in send or acknowledgement. - match receiver { - None => (blob_pack.success_handler)().await, - Some(receiver) => { - let result = receiver.await; - match result { - BatchStatus::Delivered => { - (blob_pack.success_handler)().await; - emit!(QueueMessageProcessingSucceeded {}); - } - BatchStatus::Errored => { - emit!(QueueMessageProcessingErrored {}); - } - BatchStatus::Rejected => { - // TODO: consider allowing rejected events wihtout retrying, like s3 - emit!(QueueMessageProcessingRejected {}); - } - } - } - } - - Ok(()) - } -} - -#[async_trait::async_trait] -#[typetag::serde(name = "azure_blob")] -impl SourceConfig for AzureBlobConfig { - async fn build(&self, cx: SourceContext) -> crate::Result { - self.validate()?; - let azure_blob_streamer = AzureBlobStreamer::new( - cx.shutdown.clone(), - cx.out.clone(), - cx.log_namespace(self.log_namespace), - cx.do_acknowledgements(self.acknowledgements), - self.decoding.clone(), - )?; - - let blob_pack_stream: BlobPackStream = match self.strategy { - Strategy::Test => { - // streaming incremented numbers periodically - let exec_interval_secs = self.exec_interval_secs; - let shutdown = cx.shutdown.clone(); - stream! { - let schedule = Duration::from_secs(exec_interval_secs); - let mut counter = 0; - let mut interval = IntervalStream::new(time::interval(schedule)).take_until(shutdown); - while interval.next().await.is_some() { - counter += 1; - let counter_copy = counter; - yield BlobPack { - row_stream: stream! { - for i in 0..=counter { - yield format!("{}:{}", counter, i).into_bytes(); - } - }.boxed(), - success_handler: Box::new(move || { - Box::pin(async move { - debug!("Successfully processed blob pack for counter {}.", counter_copy); - }) - }), - } - } - }.boxed() - } - Strategy::StorageQueue => make_azure_row_stream(self, cx.shutdown.clone())?, - }; - Ok(Box::pin( - azure_blob_streamer.run_streaming(blob_pack_stream), - )) - } - - fn outputs(&self, global_log_namespace: LogNamespace) -> Vec { - let log_namespace = global_log_namespace.merge(self.log_namespace); - let schema_definition = self - .decoding - .schema_definition(log_namespace) - .with_standard_vector_source_metadata(); - - vec![SourceOutput::new_maybe_logs( - self.decoding.output_type(), - schema_definition, - )] - } - - fn can_acknowledge(&self) -> bool { - true - } -} - -fn default_exec_interval_secs() -> u64 { - 1 -} diff --git a/src/sources/azure_blob/queue.rs b/src/sources/azure_blob/queue.rs deleted file mode 100644 index 8754c3443ef4d..0000000000000 --- a/src/sources/azure_blob/queue.rs +++ /dev/null @@ -1,363 +0,0 @@ -use std::{ - io::{BufRead, BufReader, Cursor}, - panic, - sync::Arc, -}; - -use anyhow::anyhow; -use async_stream::stream; -use azure_core; -use azure_storage_blobs::prelude::ContainerClient; -use azure_storage_queues::{operations::Message, QueueClient}; -use base64::{prelude::BASE64_STANDARD, Engine}; -use futures::stream::StreamExt; -use serde::Deserialize; -use serde_with::serde_as; -use snafu::Snafu; -use tokio::{select, time}; - -use vector_lib::{ - configurable::configurable_component, - internal_event::{ByteSize, BytesReceived, InternalEventHandle, Protocol, Registered}, -}; - -use crate::{ - azure, - internal_events::{ - QueueMessageDeleteError, QueueMessageProcessingError, QueueMessageReceiveError, - QueueStorageInvalidEventIgnored, QueueStorageMismatchingContainerName, BlobDoesntExist, - }, - shutdown::ShutdownSignal, - sources::azure_blob::{AzureBlobConfig, BlobPack, BlobPackStream}, -}; - -/// Azure Queue configuration options. -#[serde_as] -#[configurable_component] -#[derive(Clone, Debug, Derivative)] -#[derivative(Default)] -#[serde(deny_unknown_fields)] -pub(super) struct Config { - /// The name of the storage queue to poll for events. - pub(super) queue_name: String, - - /// How long to wait while polling the event grid queue for new messages, in seconds. - /// - // NOTE: We restrict this to u32 for safe conversion to i32 later. - #[serde(default = "default_poll_secs")] - #[derivative(Default(value = "default_poll_secs()"))] - #[configurable(metadata(docs::type_unit = "seconds"))] - pub(super) poll_secs: u32, -} - -pub fn make_azure_row_stream( - cfg: &AzureBlobConfig, - shutdown: ShutdownSignal, -) -> crate::Result { - let queue_client = make_queue_client(cfg)?; - let container_client = make_container_client(cfg)?; - let bytes_received = register!(BytesReceived::from(Protocol::HTTP)); - let poll_interval = std::time::Duration::from_secs( - cfg.queue - .as_ref() - .ok_or(anyhow!("Missing Event Grid queue config."))? - .poll_secs as u64, - ); - - Ok(Box::pin(stream! { - // TODO: add a way to stop this loop, possibly with shutdown - loop { - let messages = match queue_client.get_messages().number_of_messages(num_messages()).await { - Ok(messages) => messages, - Err(e) => { - emit!(QueueMessageReceiveError{error: &e}); - continue; - } - }; - if !messages.messages.is_empty() { - for message in messages.messages { - let msg_id = message.message_id.clone(); - match proccess_event_grid_message( - message, - &container_client, - &queue_client, - bytes_received.clone() - ).await { - Ok(blob_pack) => { - match blob_pack { - None => trace!("Message {msg_id} is ignored, \ - no blob stream stream created from it. \ - Will retry on next message."), - Some(bp) => yield bp - } - }, - Err(e) => { - emit!(QueueMessageProcessingError{ - error: &e, - message_id: &msg_id - }); - } - } - } - } else { - // sleep or shutdown - select! { - _ = shutdown.clone() => { - info!("Shutdown signal received, terminating azure row stream."); - break; - }, - _ = time::sleep(poll_interval) => { } - } - } - } - })) -} - -pub fn make_queue_client(cfg: &AzureBlobConfig) -> crate::Result> { - let q = cfg.queue.clone().ok_or("Missing queue.")?; - azure::build_queue_client( - cfg.connection_string - .as_ref() - .map(|v| v.inner().to_string()), - cfg.storage_account.as_ref().map(|v| v.to_string()), - q.queue_name.clone(), - cfg.endpoint.clone(), - cfg.client_credentials.clone(), - ) -} - -pub fn make_container_client(cfg: &AzureBlobConfig) -> crate::Result> { - azure::build_container_client( - cfg.connection_string - .as_ref() - .map(|v| v.inner().to_string()), - cfg.storage_account.as_ref().map(|v| v.to_string()), - cfg.container_name.clone(), - cfg.endpoint.clone(), - cfg.client_credentials.clone(), - ) -} - -#[derive(Clone, Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -struct AzureStorageEvent { - pub subject: String, - pub event_type: String, -} - -#[derive(Debug, Snafu)] -pub enum ProcessingError { - #[snafu(display("Could not decode Queue message with id {}: {}", message_id, error))] - InvalidQueueMessage { - error: serde_json::Error, - message_id: String, - }, - - #[snafu(display("Failed to base64 decode message: {}", error))] - FailedDecodingMessageBase64 { error: base64::DecodeError }, - - #[snafu(display("Failed to utf8 decode message: {}", error))] - FailedDecodingUTF8 { error: std::string::FromUtf8Error }, - - #[snafu(display("Failed to get blob: {}", error))] - FailedToGetBlob { error: azure_core::Error }, - - #[snafu(display("Failed to parse {} as subject", subject))] - FailedToParseSubject { subject: String }, -} - -async fn proccess_event_grid_message( - message: Message, - container_client: &ContainerClient, - queue_client: &QueueClient, - bytes_received: Registered, -) -> Result, ProcessingError> { - let msg_id = message.message_id.clone(); - let decoded_bytes = BASE64_STANDARD - .decode(&message.message_text) - .map_err(|e| ProcessingError::FailedDecodingMessageBase64 { error: e })?; - let decoded_string = String::from_utf8(decoded_bytes) - .map_err(|e| ProcessingError::FailedDecodingUTF8 { error: e })?; - let body: AzureStorageEvent = serde_json::from_str(decoded_string.as_str()).map_err(|e| { - ProcessingError::InvalidQueueMessage { - error: e, - message_id: msg_id, - } - })?; - if body.event_type != "Microsoft.Storage.BlobCreated" { - emit!(QueueStorageInvalidEventIgnored { - container: container_client.container_name(), - subject: &body.subject, - event_type: &body.event_type, - }); - return Ok(None); - } - match parse_subject(body.subject.clone()) { - Some((container, blob)) => { - if container != container_client.container_name() { - emit!(QueueStorageMismatchingContainerName { - configured_container: container_client.container_name(), - container: container.as_str(), - }); - - return Ok(None); - } - trace!( - "Detected new blob creation in container '{}': '{}'", - &container, - &blob - ); - let blob_client = container_client.blob_client(blob); - let mut result: Vec = vec![]; - let mut stream = blob_client.get().into_stream(); - while let Some(value) = stream.next().await { - match value { - Ok(response) => { - let mut body = response.data; - while let Some(value) = body.next().await { - match value { - Ok(chunk) => result.extend(&chunk), - Err(e) => { - // This should now happen as long as `next()` is working - // correctly. Leaving just a safeguard, not to crash Vector. - trace!("Failed to read body chunk: {}", e); - break; - } - } - } - } - Err(e) => { - if let Some(http_error) = e.as_http_error() { - if http_error.status() == azure_core::StatusCode::NotFound { - emit!(BlobDoesntExist{ - nonexistent_blob_name: blob_client.blob_name(), - }); - remove_message_from_queue(queue_client, message).await; - return Ok(None); - } - } - return Err(ProcessingError::FailedToGetBlob { error: e }); - } - } - } - - let reader = Cursor::new(result); - let buffered = BufReader::new(reader); - let queue_client_copy = queue_client.clone(); - let bytes_received_copy = bytes_received.clone(); - - Ok(Some(BlobPack { - row_stream: Box::pin(stream! { - for line in buffered.lines() { - let line = line.map(|line| line.as_bytes().to_vec()); - let line = match line { - Ok(l) => l, - Err(e) => { - error!("Failed to map line: {}", e); - break; - } - }; - bytes_received_copy.emit(ByteSize(line.len())); - yield line; - } - }), - success_handler: Box::new(|| { - Box::pin(async move { - remove_message_from_queue(&queue_client_copy, message).await; - }) - }), - })) - } - None => { - return Err(ProcessingError::FailedToParseSubject { - subject: body.subject, - }); - } - } -} - -fn parse_subject(subject: String) -> Option<(String, String)> { - let parts: Vec<&str> = subject.split('/').collect(); - if parts.len() < 7 { - warn!("Ignoring event because of wrong subject format"); - return None; - } - let container = parts[4]; - let blob = parts[6..].join("/"); - Some((container.to_string(), blob)) -} - -const fn default_poll_secs() -> u32 { - 15 -} - -// Number of messages to consume from the queue at once. This is the maximum -// value allowed by the Azure API. -const fn num_messages() -> u8 { - 32 -} - -async fn remove_message_from_queue(queue_client: &QueueClient, message: Message) { - _ = queue_client.pop_receipt_client(message).delete().await.inspect_err(move |e| { - emit!(QueueMessageDeleteError { error: &e }) - }) -} - -#[test] -fn test_azure_storage_event() { - let event_value: AzureStorageEvent = serde_json::from_str( - r#"{ - "topic": "/subscriptions/fa5f2180-1451-4461-9b1f-aae7d4b33cf8/resourceGroups/events_poc/providers/Microsoft.Storage/storageAccounts/eventspocaccount", - "subject": "/blobServices/default/containers/content/blobs/foo", - "eventType": "Microsoft.Storage.BlobCreated", - "id": "be3f21f7-201e-000b-7605-a29195062628", - "data": { - "api": "PutBlob", - "clientRequestId": "1fa42c94-6dd3-4172-95c4-fd9cf56b5009", - "requestId": "be3f21f7-201e-000b-7605-a29195000000", - "eTag": "0x8DC701C5D3FFDF6", - "contentType": "application/octet-stream", - "contentLength": 0, - "blobType": "BlockBlob", - "url": "https://eventspocaccount.blob.core.windows.net/content/foo", - "sequencer": "0000000000000000000000000005C5360000000000276a63", - "storageDiagnostics": { - "batchId": "fec5b12c-2006-0034-0005-a25936000000" - } - }, - "dataVersion": "", - "metadataVersion": "1", - "eventTime": "2024-05-09T11:37:10.5637878Z" - }"#, - ).unwrap(); - - assert_eq!( - event_value.subject, - "/blobServices/default/containers/content/blobs/foo".to_string() - ); - assert_eq!( - event_value.event_type, - "Microsoft.Storage.BlobCreated".to_string() - ); -} - -#[test] -fn test_parse_subject_no_dir() { - let subject = "/blobServices/default/containers/content/blobs/foo".to_string(); - let result = parse_subject(subject); - assert_eq!(result, Some(("content".to_string(), "foo".to_string()))); -} - -#[test] -fn test_parse_subject_with_dirs() { - let subject = "/blobServices/default/containers/insights-logs-signinlogs/blobs/tenantId=0e35ee7a-425d-45a5-9013-218c1eae8fd4/y=2024/m=06/d=20/h=05/m=00/PT1H.json".to_string(); - let result = parse_subject(subject); - assert_eq!( - result, - Some(( - "insights-logs-signinlogs".to_string(), - "tenantId=0e35ee7a-425d-45a5-9013-218c1eae8fd4/y=2024/m=06/d=20/h=05/m=00/PT1H.json" - .to_string() - )) - ); -} diff --git a/src/sources/azure_blob/test.rs b/src/sources/azure_blob/test.rs deleted file mode 100644 index 83b0e820ca02d..0000000000000 --- a/src/sources/azure_blob/test.rs +++ /dev/null @@ -1,95 +0,0 @@ -use super::*; -use crate::{ - config::LogNamespace, event::EventStatus, serde::default_decoding, shutdown::ShutdownSignal, - test_util::collect_n, SourceSender, -}; -use tokio::{select, sync::oneshot}; - -#[tokio::test] -async fn test_messages_delivered() { - let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered); - let streamer = super::AzureBlobStreamer::new( - ShutdownSignal::noop(), - tx, - LogNamespace::Vector, - true, - default_decoding(), - ); - let mut streamer = streamer.expect("Failed to create streamer"); - let (success_sender, success_receiver) = oneshot::channel(); - let blob_pack = BlobPack { - row_stream: Box::pin(stream! { - let lines = vec!["foo", "bar"]; - for line in lines { - yield line.as_bytes().to_vec(); - } - }), - success_handler: Box::new(move || { - Box::pin(async move { - success_sender.send(()).unwrap(); - }) - }), - }; - let (events_collector, events_receiver) = oneshot::channel(); - tokio::spawn(async move { - events_collector.send(collect_n(rx, 2).await).unwrap(); - }); - streamer - .process_blob_pack(blob_pack) - .await - .expect("Failed processing blob pack"); - - let events = select! { - value = events_receiver => value.expect("Failed to receive events"), - _ = time::sleep(Duration::from_secs(5)) => panic!("Timeout waiting for events"), - }; - assert_eq!(events[0].as_log().value().to_string(), "\"foo\""); - assert_eq!(events[1].as_log().value().to_string(), "\"bar\""); - select! { - _ = success_receiver => {} - _ = time::sleep(Duration::from_secs(5)) => panic!("Timeout waiting for success handler"), - } -} - -#[tokio::test] -async fn test_messages_rejected() { - let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Rejected); - let streamer = super::AzureBlobStreamer::new( - ShutdownSignal::noop(), - tx, - LogNamespace::Vector, - true, - default_decoding(), - ); - let mut streamer = streamer.expect("Failed to create streamer"); - let (success_sender, mut success_receiver) = oneshot::channel(); - let blob_pack = BlobPack { - row_stream: Box::pin(stream! { - let lines = vec!["foo", "bar"]; - for line in lines { - yield line.as_bytes().to_vec(); - } - }), - success_handler: Box::new(move || { - Box::pin(async move { - success_sender.send(()).unwrap(); - }) - }), - }; - let (events_collector, events_receiver) = oneshot::channel(); - tokio::spawn(async move { - events_collector.send(collect_n(rx, 2).await).unwrap(); - }); - streamer - .process_blob_pack(blob_pack) - .await - .expect("Failed processing blob pack"); - - let events = select! { - value = events_receiver => value.expect("Failed to receive events"), - _ = time::sleep(Duration::from_secs(5)) => panic!("Timeout waiting for events"), - }; - assert_eq!(events[0].as_log().value().to_string(), "\"foo\""); - assert_eq!(events[1].as_log().value().to_string(), "\"bar\""); - assert!(success_receiver.try_recv().is_err()); // assert success handler not called -} diff --git a/src/sources/mod.rs b/src/sources/mod.rs index bdaa17166d89e..f7b2b6bb534e8 100644 --- a/src/sources/mod.rs +++ b/src/sources/mod.rs @@ -13,8 +13,6 @@ pub mod aws_kinesis_firehose; pub mod aws_s3; #[cfg(feature = "sources-aws_sqs")] pub mod aws_sqs; -#[cfg(feature = "sources-azure_blob")] -pub mod azure_blob; #[cfg(feature = "sources-datadog_agent")] pub mod datadog_agent; #[cfg(feature = "sources-demo_logs")]