From 07c8b1862fd740659e398723f438f2d03b4cc284 Mon Sep 17 00:00:00 2001 From: Flavio Cruz Date: Mon, 20 Apr 2026 20:16:01 +0000 Subject: [PATCH 1/8] feat(databricks zerobus): add new databricks_zerobus for ingesting data into Databricks --- .github/actions/spelling/expect.txt | 2 + Cargo.lock | 241 +++++++- Cargo.toml | 5 + .../24840_databricks_zerobus_sink.feature.md | 3 + lib/codecs/src/encoding/encoder.rs | 54 +- lib/codecs/src/encoding/format/arrow.rs | 13 + lib/codecs/src/encoding/format/mod.rs | 2 + lib/codecs/src/encoding/format/proto_batch.rs | 127 ++++ lib/codecs/src/encoding/mod.rs | 9 +- lib/codecs/src/encoding/serializer.rs | 18 +- src/sinks/clickhouse/config.rs | 6 +- src/sinks/databricks_zerobus/config.rs | 431 ++++++++++++++ src/sinks/databricks_zerobus/error.rs | 166 ++++++ src/sinks/databricks_zerobus/mod.rs | 12 + src/sinks/databricks_zerobus/service.rs | 563 ++++++++++++++++++ src/sinks/databricks_zerobus/sink.rs | 123 ++++ .../nested_structs_complete_schema.json | 127 ++++ .../unity_catalog_schema.rs | 456 ++++++++++++++ src/sinks/mod.rs | 2 + src/sinks/util/encoding.rs | 2 - .../components/sinks/databricks_zerobus.cue | 156 +++++ .../reference/services/databricks_zerobus.cue | 10 + website/cue/reference/urls.cue | 2 + 23 files changed, 2506 insertions(+), 24 deletions(-) create mode 100644 changelog.d/24840_databricks_zerobus_sink.feature.md create mode 100644 lib/codecs/src/encoding/format/proto_batch.rs create mode 100644 src/sinks/databricks_zerobus/config.rs create mode 100644 src/sinks/databricks_zerobus/error.rs create mode 100644 src/sinks/databricks_zerobus/mod.rs create mode 100644 src/sinks/databricks_zerobus/service.rs create mode 100644 src/sinks/databricks_zerobus/sink.rs create mode 100644 src/sinks/databricks_zerobus/tests/fixtures/nested_structs_complete_schema.json create mode 100644 src/sinks/databricks_zerobus/unity_catalog_schema.rs create mode 100644 website/cue/reference/components/sinks/databricks_zerobus.cue create mode 
100644 website/cue/reference/services/databricks_zerobus.cue diff --git a/.github/actions/spelling/expect.txt b/.github/actions/spelling/expect.txt index 3656732c51bba..0ff760753c259 100644 --- a/.github/actions/spelling/expect.txt +++ b/.github/actions/spelling/expect.txt @@ -124,6 +124,7 @@ customizability CVS cwl daschl +databricks dataflows datafuselabs datasources @@ -677,6 +678,7 @@ ytt YXRR yyy zieme +zerobus zoog zork zorp diff --git a/Cargo.lock b/Cargo.lock index a39a96a6500b1..da8f8943c24f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1548,7 +1548,7 @@ dependencies = [ "http-body 0.4.6", "hyper 0.14.32", "itoa", - "matchit", + "matchit 0.7.3", "memchr", "mime", "percent-encoding", @@ -1576,7 +1576,7 @@ dependencies = [ "http-body 1.0.1", "http-body-util", "itoa", - "matchit", + "matchit 0.7.3", "memchr", "mime", "percent-encoding", @@ -1589,6 +1589,31 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8" +dependencies = [ + "axum-core 0.5.6", + "bytes", + "futures-util", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "itoa", + "matchit 0.8.4", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde_core", + "sync_wrapper 1.0.1", + "tower 0.5.3", + "tower-layer", + "tower-service", +] + [[package]] name = "axum-core" version = "0.3.4" @@ -1626,6 +1651,24 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-core" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" +dependencies = [ + "bytes", + "futures-core", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "sync_wrapper 1.0.1", + "tower-layer", + "tower-service", +] + [[package]] name = "azure_core" version = "0.30.1" @@ -3185,6 
+3228,32 @@ dependencies = [ "uuid", ] +[[package]] +name = "databricks-zerobus-ingest-sdk" +version = "1.1.0" +source = "git+https://github.com/databricks/zerobus-sdk-rs?rev=a963e81#a963e81d6f3da61b8436714b1f23a3c64d663c33" +dependencies = [ + "async-trait", + "bytes", + "hyper-http-proxy", + "hyper-util", + "prost 0.13.5", + "prost-types 0.13.5", + "protoc-bin-vendored", + "reqwest 0.12.28", + "serde", + "serde_json", + "smallvec", + "thiserror 1.0.68", + "tokio", + "tokio-retry", + "tokio-stream", + "tokio-util", + "tonic 0.13.1", + "tonic-build 0.13.1", + "tracing 0.1.44", +] + [[package]] name = "dbl" version = "0.3.2" @@ -4595,13 +4664,28 @@ checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" dependencies = [ "base64 0.21.7", "bytes", - "headers-core", + "headers-core 0.2.0", "http 0.2.12", "httpdate", "mime", "sha1", ] +[[package]] +name = "headers" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3314d5adb5d94bcdf56771f2e50dbbc80bb4bdf88967526706205ac9eff24eb" +dependencies = [ + "base64 0.22.1", + "bytes", + "headers-core 0.3.0", + "http 1.3.1", + "httpdate", + "mime", + "sha1", +] + [[package]] name = "headers-core" version = "0.2.0" @@ -4611,6 +4695,15 @@ dependencies = [ "http 0.2.12", ] +[[package]] +name = "headers-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" +dependencies = [ + "http 1.3.1", +] + [[package]] name = "heck" version = "0.4.1" @@ -5045,6 +5138,23 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-http-proxy" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ad4b0a1e37510028bc4ba81d0e38d239c39671b0f0ce9e02dfa93a8133f7c08" +dependencies = [ + "bytes", + "futures-util", + "headers 0.4.1", + "http 1.3.1", + "hyper 1.7.0", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", 
+] + [[package]] name = "hyper-named-pipe" version = "0.1.0" @@ -5105,7 +5215,7 @@ checksum = "ca815a891b24fdfb243fa3239c86154392b0953ee584aa1a2a1f66d20cbe75cc" dependencies = [ "bytes", "futures", - "headers", + "headers 0.3.9", "http 0.2.12", "hyper 0.14.32", "openssl", @@ -6433,6 +6543,12 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "matrixmultiply" version = "0.3.8" @@ -8346,6 +8462,70 @@ dependencies = [ "prost 0.13.5", ] +[[package]] +name = "protoc-bin-vendored" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1c381df33c98266b5f08186583660090a4ffa0889e76c7e9a5e175f645a67fa" +dependencies = [ + "protoc-bin-vendored-linux-aarch_64", + "protoc-bin-vendored-linux-ppcle_64", + "protoc-bin-vendored-linux-s390_64", + "protoc-bin-vendored-linux-x86_32", + "protoc-bin-vendored-linux-x86_64", + "protoc-bin-vendored-macos-aarch_64", + "protoc-bin-vendored-macos-x86_64", + "protoc-bin-vendored-win32", +] + +[[package]] +name = "protoc-bin-vendored-linux-aarch_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c350df4d49b5b9e3ca79f7e646fde2377b199e13cfa87320308397e1f37e1a4c" + +[[package]] +name = "protoc-bin-vendored-linux-ppcle_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a55a63e6c7244f19b5c6393f025017eb5d793fd5467823a099740a7a4222440c" + +[[package]] +name = "protoc-bin-vendored-linux-s390_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dba5565db4288e935d5330a07c264a4ee8e4a5b4a4e6f4e83fad824cc32f3b0" + +[[package]] +name = 
"protoc-bin-vendored-linux-x86_32" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8854774b24ee28b7868cd71dccaae8e02a2365e67a4a87a6cd11ee6cdbdf9cf5" + +[[package]] +name = "protoc-bin-vendored-linux-x86_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b38b07546580df720fa464ce124c4b03630a6fb83e05c336fea2a241df7e5d78" + +[[package]] +name = "protoc-bin-vendored-macos-aarch_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89278a9926ce312e51f1d999fee8825d324d603213344a9a706daa009f1d8092" + +[[package]] +name = "protoc-bin-vendored-macos-x86_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81745feda7ccfb9471d7a4de888f0652e806d5795b61480605d4943176299756" + +[[package]] +name = "protoc-bin-vendored-win32" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95067976aca6421a523e491fce939a3e65249bac4b977adee0ee9771568e8aa3" + [[package]] name = "psl" version = "2.1.22" @@ -11337,6 +11517,7 @@ dependencies = [ "futures-core", "futures-io", "futures-sink", + "futures-util", "pin-project-lite", "slab", "tokio", @@ -11513,6 +11694,37 @@ dependencies = [ "tracing 0.1.44", ] +[[package]] +name = "tonic" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" +dependencies = [ + "async-trait", + "axum 0.8.8", + "base64 0.22.1", + "bytes", + "h2 0.4.13", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "hyper 1.7.0", + "hyper-timeout 0.5.1", + "hyper-util", + "percent-encoding", + "pin-project", + "prost 0.13.5", + "rustls-native-certs 0.8.1", + "socket2 0.5.10", + "tokio", + "tokio-rustls 0.26.2", + "tokio-stream", + "tower 0.5.3", + "tower-layer", + "tower-service", + "tracing 0.1.44", +] + [[package]] name = 
"tonic-build" version = "0.9.2" @@ -11539,6 +11751,20 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "tonic-build" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac6f67be712d12f0b41328db3137e0d0757645d8904b4cb7d51cd9c2279e847" +dependencies = [ + "prettyplease 0.2.37", + "proc-macro2 1.0.106", + "prost-build 0.13.5", + "prost-types 0.13.5", + "quote 1.0.44", + "syn 2.0.117", +] + [[package]] name = "tonic-health" version = "0.11.0" @@ -12330,6 +12556,7 @@ dependencies = [ "criterion", "csv", "databend-client", + "databricks-zerobus-ingest-sdk", "deadpool 0.13.0", "derivative", "dirs-next", @@ -12352,7 +12579,7 @@ dependencies = [ "h2 0.4.13", "hash_hasher", "hashbrown 0.14.5", - "headers", + "headers 0.3.9", "heim", "hex", "hickory-proto 0.25.2", @@ -12654,7 +12881,7 @@ dependencies = [ "float_eq", "futures", "futures-util", - "headers", + "headers 0.3.9", "http 0.2.12", "hyper-proxy", "indexmap 2.12.0", @@ -13078,7 +13305,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "headers", + "headers 0.3.9", "http 0.2.12", "hyper 0.14.32", "log", diff --git a/Cargo.toml b/Cargo.toml index 1045e07aae1a1..612ff2ecdeb55 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -331,6 +331,9 @@ prost = { workspace = true, optional = true } prost-reflect = { workspace = true, optional = true } prost-types = { workspace = true, optional = true } +# Databricks Zerobus +databricks-zerobus-ingest-sdk = { git = "https://github.com/databricks/zerobus-sdk-rs", rev = "a963e81", optional = true } + # GCP goauth = { version = "0.16.0", optional = true } smpl_jwt = { version = "0.8.0", default-features = false, optional = true } @@ -826,6 +829,7 @@ sinks-logs = [ "sinks-clickhouse", "sinks-console", "sinks-databend", + "sinks-databricks-zerobus", "sinks-datadog_events", "sinks-datadog_logs", "sinks-datadog_traces", @@ -894,6 +898,7 @@ sinks-chronicle = [] sinks-clickhouse = ["dep:nom", "dep:rust_decimal", "codecs-arrow"] 
sinks-console = [] sinks-databend = ["dep:databend-client"] +sinks-databricks-zerobus = ["dep:databricks-zerobus-ingest-sdk", "dep:prost-reflect", "dep:base64"] sinks-datadog_events = [] sinks-datadog_logs = [] sinks-datadog_metrics = ["protobuf-build", "dep:prost", "dep:prost-reflect"] diff --git a/changelog.d/24840_databricks_zerobus_sink.feature.md b/changelog.d/24840_databricks_zerobus_sink.feature.md new file mode 100644 index 0000000000000..34a8ecb1c9224 --- /dev/null +++ b/changelog.d/24840_databricks_zerobus_sink.feature.md @@ -0,0 +1,3 @@ +Add a new `databricks_zerobus` sink that streams log data to Databricks Unity Catalog tables via the Zerobus ingestion service. Supports OAuth 2.0 authentication, automatic schema fetching from Unity Catalog, and protobuf batch encoding. + +authors: flaviocruz diff --git a/lib/codecs/src/encoding/encoder.rs b/lib/codecs/src/encoding/encoder.rs index b3ca9a00216ec..b2ff3cd2c5d7b 100644 --- a/lib/codecs/src/encoding/encoder.rs +++ b/lib/codecs/src/encoding/encoder.rs @@ -8,10 +8,24 @@ use crate::encoding::ArrowStreamSerializer; #[cfg(feature = "parquet")] use crate::encoding::ParquetSerializer; use crate::{ - encoding::{Error, Framer, Serializer}, + encoding::{Error, Framer, ProtoBatchSerializer, Serializer}, internal_events::{EncoderFramingError, EncoderSerializeError}, }; +/// The output of a batch encoding operation. +/// +/// Different batch serializers produce different output types: +/// - Arrow serializer produces a `RecordBatch` +/// - Proto serializer produces individual byte buffers per event +#[derive(Debug)] +pub enum BatchOutput { + /// An Arrow RecordBatch containing all events encoded as columnar data. + #[cfg(feature = "arrow")] + Arrow(arrow::record_batch::RecordBatch), + /// A list of individually-serialized records (one per event). + Records(Vec>), +} + /// Serializers that support batch encoding (encoding all events at once). 
#[derive(Debug, Clone)] pub enum BatchSerializer { @@ -21,6 +35,8 @@ pub enum BatchSerializer { /// Parquet format serializer. #[cfg(feature = "parquet")] Parquet(Box), + /// Protobuf batch serializer that encodes each event individually. + ProtoBatch(ProtoBatchSerializer), } /// An encoder that encodes batches of events. @@ -47,6 +63,34 @@ impl BatchEncoder { BatchSerializer::Arrow(_) => "application/vnd.apache.arrow.stream", #[cfg(feature = "parquet")] BatchSerializer::Parquet(_) => "application/vnd.apache.parquet", + _ => unreachable!(), + } + } + + /// Encode a batch of events into a `BatchOutput`. + pub fn encode_batch(&self, events: &[Event]) -> Result { + match &self.serializer { + #[cfg(feature = "arrow")] + BatchSerializer::Arrow(serializer) => { + let record_batch = serializer.encode_to_record_batch(events).map_err(|err| { + use crate::encoding::ArrowEncodingError; + match err { + ArrowEncodingError::NullConstraint { .. } => { + Error::SchemaConstraintViolation(Box::new(err)) + } + _ => Error::SerializingError(Box::new(err)), + } + })?; + Ok(BatchOutput::Arrow(record_batch)) + } + BatchSerializer::ProtoBatch(serializer) => { + let records = serializer + .encode_batch(events) + .map_err(|err| Error::SerializingError(Box::new(err)))?; + Ok(BatchOutput::Records(records)) + } + #[allow(unreachable_patterns)] + _ => unreachable!("encode_batch not supported for this serializer"), } } } @@ -74,19 +118,19 @@ impl tokio_util::codec::Encoder> for BatchEncoder { BatchSerializer::Parquet(serializer) => serializer .encode(events, buffer) .map_err(Error::SerializingError), - #[allow(unreachable_patterns)] - _ => unreachable!("BatchSerializer cannot be constructed without encode()"), + _ => unreachable!( + "tokio Encoder trait is only used for Arrow/Parquet; other batch serializers use encode_batch()" + ), } } } -/// An wrapper that supports both framed and batch encoding modes. +/// A wrapper that supports both framed and batch encoding modes. 
#[derive(Debug, Clone)] pub enum EncoderKind { /// Uses framing to encode individual events Framed(Box>), /// Encodes events in batches without framing - #[cfg(any(feature = "arrow", feature = "parquet"))] Batch(BatchEncoder), } diff --git a/lib/codecs/src/encoding/format/arrow.rs b/lib/codecs/src/encoding/format/arrow.rs index b8c86239ddb6a..f943f05d26ed7 100644 --- a/lib/codecs/src/encoding/format/arrow.rs +++ b/lib/codecs/src/encoding/format/arrow.rs @@ -93,6 +93,19 @@ pub struct ArrowStreamSerializer { } impl ArrowStreamSerializer { + /// Encode events into a `RecordBatch` without writing to IPC stream format. + pub fn encode_to_record_batch( + &self, + events: &[Event], + ) -> Result { + let values = vector_log_events_to_json_values(events).map_err(|e| { + ArrowEncodingError::RecordBatchCreation { + source: arrow::error::ArrowError::JsonError(e.to_string()), + } + })?; + build_record_batch(self.schema.clone(), &values) + } + /// Create a new ArrowStreamSerializer with the given configuration pub fn new(config: ArrowStreamSerializerConfig) -> Result { let schema = config.schema.ok_or(ArrowEncodingError::MissingSchema)?; diff --git a/lib/codecs/src/encoding/format/mod.rs b/lib/codecs/src/encoding/format/mod.rs index f760ea5c8dd83..33030fd41ff2e 100644 --- a/lib/codecs/src/encoding/format/mod.rs +++ b/lib/codecs/src/encoding/format/mod.rs @@ -18,6 +18,7 @@ mod native_json; mod otlp; #[cfg(feature = "parquet")] mod parquet; +mod proto_batch; mod protobuf; mod raw_message; #[cfg(feature = "syslog")] @@ -45,6 +46,7 @@ pub use native::{NativeSerializer, NativeSerializerConfig}; pub use native_json::{NativeJsonSerializer, NativeJsonSerializerConfig}; #[cfg(feature = "opentelemetry")] pub use otlp::{OtlpSerializer, OtlpSerializerConfig}; +pub use proto_batch::{ProtoBatchEncodingError, ProtoBatchSerializer, ProtoBatchSerializerConfig}; pub use protobuf::{ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions}; pub use raw_message::{RawMessageSerializer, 
RawMessageSerializerConfig}; #[cfg(feature = "syslog")] diff --git a/lib/codecs/src/encoding/format/proto_batch.rs b/lib/codecs/src/encoding/format/proto_batch.rs new file mode 100644 index 0000000000000..3bfc50dbce204 --- /dev/null +++ b/lib/codecs/src/encoding/format/proto_batch.rs @@ -0,0 +1,127 @@ +//! Protobuf batch serializer for encoding events as individual protobuf records. +//! +//! Encodes each event in a batch independently into protobuf bytes, producing +//! a `Vec>` where each element is a single serialized protobuf message. + +use prost_reflect::{MessageDescriptor, prost::Message as _}; +use snafu::Snafu; +use std::sync::Arc; +use vector_config::configurable_component; +use vector_core::{config::DataType, event::Event, schema}; +use vrl::protobuf::encode::{Options, encode_message}; + +/// Errors that can occur during protobuf batch encoding +#[derive(Debug, Snafu)] +pub enum ProtoBatchEncodingError { + /// No events provided + #[snafu(display("Cannot encode an empty batch"))] + NoEvents, + + /// Unsupported event type + #[snafu(display("Unsupported event type: only Log events are supported"))] + UnsupportedEventType, + + /// Protobuf encoding failed + #[snafu(display("Protobuf encoding failed: {}", source))] + EncodingFailed { + /// The underlying encoding error + source: vector_common::Error, + }, + + /// Protobuf prost encoding failed + #[snafu(display("Protobuf prost encoding failed: {}", source))] + ProstEncodingFailed { + /// The underlying prost error + source: prost_reflect::prost::EncodeError, + }, +} + +/// Configuration for protobuf batch serialization +#[configurable_component] +#[derive(Clone, Default)] +pub struct ProtoBatchSerializerConfig { + /// The protobuf message descriptor to use for encoding. 
+ #[serde(skip)] + #[configurable(derived)] + pub descriptor: Option, +} + +impl std::fmt::Debug for ProtoBatchSerializerConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ProtoBatchSerializerConfig") + .field( + "descriptor", + &self.descriptor.as_ref().map(|d| d.full_name().to_string()), + ) + .finish() + } +} + +impl ProtoBatchSerializerConfig { + /// Create a new ProtoBatchSerializerConfig with a message descriptor + pub fn new(descriptor: MessageDescriptor) -> Self { + Self { + descriptor: Some(descriptor), + } + } + + /// The data type of events that are accepted by this serializer. + pub fn input_type(&self) -> DataType { + DataType::Log + } + + /// The schema required by the serializer. + pub fn schema_requirement(&self) -> schema::Requirement { + schema::Requirement::empty() + } +} + +/// Protobuf batch serializer that encodes each event into individual protobuf bytes. +#[derive(Clone, Debug)] +pub struct ProtoBatchSerializer { + descriptor: Arc, + options: Options, +} + +impl ProtoBatchSerializer { + /// Create a new ProtoBatchSerializer with the given configuration. + pub fn new(config: ProtoBatchSerializerConfig) -> Result { + let descriptor = config.descriptor.ok_or_else(|| { + vector_common::Error::from("Proto batch serializer requires a message descriptor.") + })?; + + Ok(Self { + descriptor: Arc::new(descriptor), + options: Options { + use_json_names: false, + }, + }) + } + + /// Encode a batch of events into individual protobuf byte buffers. 
+ pub fn encode_batch(&self, events: &[Event]) -> Result>, ProtoBatchEncodingError> { + if events.is_empty() { + return Err(ProtoBatchEncodingError::NoEvents); + } + + let mut records = Vec::with_capacity(events.len()); + + for event in events { + let dynamic_message = match event { + Event::Log(log) => { + encode_message(&self.descriptor, log.value().clone(), &self.options) + } + Event::Trace(_) | Event::Metric(_) => { + return Err(ProtoBatchEncodingError::UnsupportedEventType); + } + } + .map_err(|source| ProtoBatchEncodingError::EncodingFailed { + source: source.into(), + })?; + + records.push(dynamic_message.encode_to_vec()); + } + + Ok(records) + } +} diff --git a/lib/codecs/src/encoding/mod.rs b/lib/codecs/src/encoding/mod.rs index a9f698f79153d..2c8b485a49d8f 100644 --- a/lib/codecs/src/encoding/mod.rs +++ b/lib/codecs/src/encoding/mod.rs @@ -10,7 +10,7 @@ pub mod serializer; mod transformer; pub use chunking::{Chunker, Chunking, GelfChunker}; pub use config::{EncodingConfig, EncodingConfigWithFraming, SinkType}; -pub use encoder::{BatchEncoder, BatchSerializer, Encoder, EncoderKind}; +pub use encoder::{BatchEncoder, BatchOutput, BatchSerializer, Encoder, EncoderKind}; #[cfg(feature = "arrow")] pub use format::{ ArrowEncodingError, ArrowStreamSerializer, ArrowStreamSerializerConfig, SchemaProvider, @@ -21,7 +21,8 @@ pub use format::{ CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig, JsonSerializer, JsonSerializerConfig, JsonSerializerOptions, LogfmtSerializer, LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer, - NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig, + NativeSerializerConfig, ProtoBatchEncodingError, ProtoBatchSerializer, + ProtoBatchSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions, RawMessageSerializer, RawMessageSerializerConfig, TextSerializer, TextSerializerConfig, }; @@ -40,9 +41,7 @@ pub use 
framing::{ NewlineDelimitedEncoderConfig, VarintLengthDelimitedEncoder, VarintLengthDelimitedEncoderConfig, }; -#[cfg(feature = "arrow")] -pub use serializer::BatchSerializerConfig; -pub use serializer::{Serializer, SerializerConfig}; +pub use serializer::{BatchSerializerConfig, Serializer, SerializerConfig}; pub use transformer::{TimestampFormat, Transformer}; /// An error that occurred while building an encoder. diff --git a/lib/codecs/src/encoding/serializer.rs b/lib/codecs/src/encoding/serializer.rs index c6081e3e2f274..0a7b07a00fd04 100644 --- a/lib/codecs/src/encoding/serializer.rs +++ b/lib/codecs/src/encoding/serializer.rs @@ -10,6 +10,7 @@ use super::format::{ArrowStreamSerializer, ArrowStreamSerializerConfig}; use super::format::{OtlpSerializer, OtlpSerializerConfig}; #[cfg(feature = "parquet")] use super::format::{ParquetSerializer, ParquetSerializerConfig}; +use super::format::{ProtoBatchSerializer, ProtoBatchSerializerConfig}; #[cfg(feature = "syslog")] use super::format::{SyslogSerializer, SyslogSerializerConfig}; use super::{ @@ -168,15 +169,24 @@ pub enum BatchSerializerConfig { #[cfg(feature = "parquet")] #[serde(rename = "parquet")] Parquet(ParquetSerializerConfig), + + /// Encodes each event individually as a [Protocol Buffers][protobuf] message. + /// + /// Each event in the batch is serialized to protobuf bytes independently, + /// producing a list of byte buffers (one per event). + /// + /// [protobuf]: https://protobuf.dev/ + #[serde(rename = "proto_batch")] + ProtoBatch(ProtoBatchSerializerConfig), } -#[cfg(any(feature = "arrow", feature = "parquet"))] impl BatchSerializerConfig { /// Build the batch serializer from this configuration. 
pub fn build_batch_serializer( &self, ) -> Result> { match self { + #[cfg(feature = "arrow")] BatchSerializerConfig::ArrowStream(arrow_config) => { let serializer = ArrowStreamSerializer::new(arrow_config.clone())?; Ok(super::BatchSerializer::Arrow(serializer)) @@ -186,6 +196,10 @@ impl BatchSerializerConfig { let serializer = ParquetSerializer::new(parquet_config.clone())?; Ok(super::BatchSerializer::Parquet(Box::new(serializer))) } + BatchSerializerConfig::ProtoBatch(proto_config) => { + let serializer = ProtoBatchSerializer::new(proto_config.clone())?; + Ok(super::BatchSerializer::ProtoBatch(serializer)) + } } } @@ -196,6 +210,7 @@ impl BatchSerializerConfig { BatchSerializerConfig::ArrowStream(arrow_config) => arrow_config.input_type(), #[cfg(feature = "parquet")] BatchSerializerConfig::Parquet(parquet_config) => parquet_config.input_type(), + BatchSerializerConfig::ProtoBatch(proto_config) => proto_config.input_type(), } } @@ -206,6 +221,7 @@ impl BatchSerializerConfig { BatchSerializerConfig::ArrowStream(arrow_config) => arrow_config.schema_requirement(), #[cfg(feature = "parquet")] BatchSerializerConfig::Parquet(parquet_config) => parquet_config.schema_requirement(), + BatchSerializerConfig::ProtoBatch(proto_config) => proto_config.schema_requirement(), } } } diff --git a/src/sinks/clickhouse/config.rs b/src/sinks/clickhouse/config.rs index d7af3413c6b97..ff243d46bfb68 100644 --- a/src/sinks/clickhouse/config.rs +++ b/src/sinks/clickhouse/config.rs @@ -292,11 +292,9 @@ impl ClickhouseConfig { let mut arrow_config = match batch_encoding { BatchSerializerConfig::ArrowStream(config) => config.clone(), - #[cfg(feature = "codecs-parquet")] - BatchSerializerConfig::Parquet(_) => { + _ => { return Err( - "ClickHouse sink does not support Parquet batch encoding. Use 'arrow_stream' instead." 
- .into(), + "'batch_encoding' for ClickHouse must use 'arrow_stream' codec.".into(), ); } }; diff --git a/src/sinks/databricks_zerobus/config.rs b/src/sinks/databricks_zerobus/config.rs new file mode 100644 index 0000000000000..bfbdf550d8934 --- /dev/null +++ b/src/sinks/databricks_zerobus/config.rs @@ -0,0 +1,431 @@ +//! Configuration for the Zerobus sink. + +use vector_lib::configurable::configurable_component; +use vector_lib::sensitive_string::SensitiveString; + +use crate::config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}; +use crate::sinks::{ + prelude::*, + util::{BatchConfig, RealtimeSizeBasedDefaultBatchSettings}, +}; + +use vector_lib::codecs::encoding::{ + BatchEncoder, BatchSerializerConfig, ProtoBatchSerializerConfig, +}; + +use super::{ + error::ZerobusSinkError, + service::{StreamMode, ZerobusService}, + sink::ZerobusSink, +}; + +/// Authentication configuration for Databricks. +#[configurable_component] +#[derive(Clone, Debug)] +#[serde(tag = "strategy", rename_all = "snake_case")] +pub enum DatabricksAuthentication { + /// Authenticate using OAuth 2.0 client credentials. + #[serde(rename = "oauth")] + OAuth { + /// OAuth 2.0 client ID. + #[configurable(metadata(docs::examples = "${DATABRICKS_CLIENT_ID}"))] + #[configurable(metadata(docs::examples = "abc123..."))] + client_id: SensitiveString, + + /// OAuth 2.0 client secret. + #[configurable(metadata(docs::examples = "${DATABRICKS_CLIENT_SECRET}"))] + #[configurable(metadata(docs::examples = "secret123..."))] + client_secret: SensitiveString, + }, +} + +impl DatabricksAuthentication { + /// Extract the client ID and client secret as string references. + pub fn credentials(&self) -> (&str, &str) { + match self { + DatabricksAuthentication::OAuth { + client_id, + client_secret, + } => (client_id.inner(), client_secret.inner()), + } + } +} + +/// Zerobus stream configuration options. 
+/// +/// This is a thin wrapper around the SDK's `StreamConfigurationOptions` with Vector-specific +/// configuration attributes and custom defaults suitable for Vector's use case. +#[configurable_component] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct ZerobusStreamOptions { + /// Timeout in milliseconds for flush operations. + #[serde(default = "default_flush_timeout_ms")] + #[configurable(metadata(docs::examples = 30000))] + pub flush_timeout_ms: u64, + + /// Timeout in milliseconds for server acknowledgements. + #[serde(default = "default_server_ack_timeout_ms")] + #[configurable(metadata(docs::examples = 60000))] + pub server_lack_of_ack_timeout_ms: u64, +} + +impl Default for ZerobusStreamOptions { + fn default() -> Self { + Self { + flush_timeout_ms: default_flush_timeout_ms(), + server_lack_of_ack_timeout_ms: default_server_ack_timeout_ms(), + } + } +} + +impl From for databricks_zerobus_ingest_sdk::StreamConfigurationOptions { + fn from(options: ZerobusStreamOptions) -> Self { + Self { + recovery: true, + recovery_retries: 4, + server_lack_of_ack_timeout_ms: options.server_lack_of_ack_timeout_ms, + flush_timeout_ms: options.flush_timeout_ms, + ..Default::default() + } + } +} + +/// Configuration for the Databricks Zerobus sink. +#[configurable_component(sink( + "databricks_zerobus", + "Stream observability data to Databricks Unity Catalog via Zerobus." +))] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct ZerobusSinkConfig { + /// The Zerobus ingestion endpoint URL. + /// + /// This should be the full URL to the Zerobus ingestion service. + #[configurable(metadata(docs::examples = "https://ingest.dev.databricks.com"))] + #[configurable(metadata(docs::examples = "https://ingest.prod.databricks.com"))] + pub ingestion_endpoint: String, + + /// The Unity Catalog table name to write to. + /// + /// This should be in the format `catalog.schema.table`. 
+ #[configurable(metadata(docs::examples = "logging_platform.my_team.logs"))] + #[configurable(metadata(docs::examples = "main.default.vector_logs"))] + pub table_name: String, + + /// The Unity Catalog endpoint URL. + /// + /// This is used for authentication and table metadata. + #[configurable(metadata( + docs::examples = "https://dbc-e2f0eb31-2b0e.staging.cloud.databricks.com" + ))] + #[configurable(metadata(docs::examples = "https://your-workspace.cloud.databricks.com"))] + pub unity_catalog_endpoint: String, + + /// Databricks authentication configuration. + #[configurable(derived)] + pub auth: DatabricksAuthentication, + + /// Zerobus stream configuration options. + #[serde(default)] + pub stream_options: ZerobusStreamOptions, + + #[configurable(derived)] + #[serde(default)] + pub batch: BatchConfig, + + #[configurable(derived)] + #[serde(default)] + pub request: TowerRequestConfig, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::is_default" + )] + pub acknowledgements: AcknowledgementsConfig, +} + +impl GenerateConfig for ZerobusSinkConfig { + fn generate_config() -> toml::Value { + toml::Value::try_from(Self { + ingestion_endpoint: "https://ingest.dev.databricks.com".to_string(), + table_name: "catalog.schema.table".to_string(), + unity_catalog_endpoint: "https://your-workspace.cloud.databricks.com".to_string(), + auth: DatabricksAuthentication::OAuth { + client_id: SensitiveString::from("${DATABRICKS_CLIENT_ID}".to_string()), + client_secret: SensitiveString::from("${DATABRICKS_CLIENT_SECRET}".to_string()), + }, + stream_options: ZerobusStreamOptions::default(), + batch: BatchConfig::default(), + request: TowerRequestConfig::default(), + acknowledgements: AcknowledgementsConfig::default(), + }) + .unwrap() + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "databricks_zerobus")] +impl SinkConfig for ZerobusSinkConfig { + async fn build(&self, cx: 
SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + self.validate()?; + + let descriptor = ZerobusService::resolve_descriptor(self, cx.proxy()).await?; + + // The zerobus sink always encodes in proto_batch form — the stream + // descriptor is the one we just resolved from Unity Catalog. + let descriptor_proto = std::sync::Arc::new(descriptor.descriptor_proto().clone()); + let stream_mode = StreamMode::Proto { descriptor_proto }; + + let proto_config = ProtoBatchSerializerConfig { + descriptor: Some(descriptor), + }; + let batch_serializer = BatchSerializerConfig::ProtoBatch(proto_config) + .build_batch_serializer() + .map_err(|e| format!("Failed to build batch serializer: {}", e))?; + let encoder = BatchEncoder::new(batch_serializer); + + let acknowledgements_enabled = self + .acknowledgements + .merge_default(&cx.globals.acknowledgements) + .enabled(); + + let service = + ZerobusService::new(self.clone(), stream_mode, acknowledgements_enabled).await?; + let healthcheck_service = service.clone(); + + let request_limits = self.request.into_settings(); + + let sink = ZerobusSink::new(service, request_limits, self.batch, encoder)?; + + let healthcheck = async move { + healthcheck_service + .ensure_stream() + .await + .map_err(|e| e.into()) + }; + + Ok(( + VectorSink::from_event_streamsink(sink), + Box::pin(healthcheck), + )) + } + + fn input(&self) -> Input { + Input::log() + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +impl ZerobusSinkConfig { + pub fn validate(&self) -> Result<(), ZerobusSinkError> { + if self.ingestion_endpoint.is_empty() { + return Err(ZerobusSinkError::ConfigError { + message: "ingestion_endpoint cannot be empty".to_string(), + }); + } + + if self.table_name.is_empty() { + return Err(ZerobusSinkError::ConfigError { + message: "table_name cannot be empty".to_string(), + }); + } + + if self.table_name.matches('.').count() != 2 { + return Err(ZerobusSinkError::ConfigError { + message: 
"table_name must be in format 'catalog.schema.table' (exactly 3 parts)" + .to_string(), + }); + } + + if self.unity_catalog_endpoint.is_empty() { + return Err(ZerobusSinkError::ConfigError { + message: "unity_catalog_endpoint cannot be empty".to_string(), + }); + } + + // Validate authentication credentials + match &self.auth { + DatabricksAuthentication::OAuth { + client_id, + client_secret, + } => { + if client_id.inner().is_empty() { + return Err(ZerobusSinkError::ConfigError { + message: "OAuth client_id cannot be empty".to_string(), + }); + } + if client_secret.inner().is_empty() { + return Err(ZerobusSinkError::ConfigError { + message: "OAuth client_secret cannot be empty".to_string(), + }); + } + } + } + + if let Some(max_bytes) = self.batch.max_bytes { + // Zerobus SDK limits max bytes to 10MB. + // NOTE: The size of the batch in Vector is not exactly the same as the size of the + // batch in the SDK since they are encoded differently. Though it is expected that + // Vector encoded data will be larger than the SDK encoded data since SDK encodes the + // data in protobuf format. 
+ if max_bytes > 10_000_000 { + return Err(ZerobusSinkError::ConfigError { + message: "max_bytes must be less than or equal to 10MB".to_string(), + }); + } + } + + Ok(()) + } +} + +// Default value functions +const fn default_flush_timeout_ms() -> u64 { + 30000 +} + +const fn default_server_ack_timeout_ms() -> u64 { + 60000 +} + + +#[cfg(test)] +mod tests { + use super::*; + use vector_lib::sensitive_string::SensitiveString; + + fn create_test_config() -> ZerobusSinkConfig { + ZerobusSinkConfig { + ingestion_endpoint: "https://test.databricks.com".to_string(), + table_name: "test.default.logs".to_string(), + unity_catalog_endpoint: "https://test-workspace.databricks.com".to_string(), + auth: DatabricksAuthentication::OAuth { + client_id: SensitiveString::from("test-client-id".to_string()), + client_secret: SensitiveString::from("test-client-secret".to_string()), + }, + stream_options: ZerobusStreamOptions::default(), + batch: Default::default(), + request: Default::default(), + acknowledgements: Default::default(), + } + } + + #[test] + fn test_config_validation_success() { + let config = create_test_config(); + assert!(config.validate().is_ok()); + } + + #[test] + fn test_config_validation_empty_endpoint() { + let mut config = create_test_config(); + config.ingestion_endpoint = "".to_string(); + + let result = config.validate(); + assert!(result.is_err()); + + if let Err(crate::sinks::databricks_zerobus::error::ZerobusSinkError::ConfigError { + message, + }) = result + { + assert!(message.contains("ingestion_endpoint cannot be empty")); + } else { + panic!("Expected ConfigError for empty ingestion_endpoint"); + } + } + + #[test] + fn test_config_validation_empty_table_name() { + let mut config = create_test_config(); + config.table_name = "".to_string(); + + let result = config.validate(); + assert!(result.is_err()); + + if let Err(crate::sinks::databricks_zerobus::error::ZerobusSinkError::ConfigError { + message, + }) = result + { + 
assert!(message.contains("table_name cannot be empty")); + } else { + panic!("Expected ConfigError for empty table_name"); + } + } + + #[test] + fn test_config_validation_invalid_table_name() { + let mut config = create_test_config(); + config.table_name = "invalid_table".to_string(); // Missing dots + + let result = config.validate(); + assert!(result.is_err()); + + if let Err(crate::sinks::databricks_zerobus::error::ZerobusSinkError::ConfigError { + message, + }) = result + { + assert!(message.contains("catalog.schema.table")); + } else { + panic!("Expected ConfigError for invalid table_name format"); + } + } + + #[test] + fn test_config_validation_empty_unity_catalog_endpoint() { + let mut config = create_test_config(); + config.unity_catalog_endpoint = "".to_string(); + + let result = config.validate(); + assert!(result.is_err()); + + if let Err(crate::sinks::databricks_zerobus::error::ZerobusSinkError::ConfigError { + message, + }) = result + { + assert!(message.contains("unity_catalog_endpoint cannot be empty")); + } else { + panic!("Expected ConfigError for empty unity_catalog_endpoint"); + } + } + + #[test] + fn test_config_validation_empty_oauth_credentials() { + let mut config = create_test_config(); + config.auth = DatabricksAuthentication::OAuth { + client_id: SensitiveString::from("".to_string()), + client_secret: SensitiveString::from("test-secret".to_string()), + }; + + let result = config.validate(); + assert!(result.is_err()); + + if let Err(crate::sinks::databricks_zerobus::error::ZerobusSinkError::ConfigError { + message, + }) = result + { + assert!(message.contains("OAuth client_id cannot be empty")); + } else { + panic!("Expected ConfigError for empty OAuth client_id"); + } + } + + #[test] + fn test_stream_options_conversion() { + let options = ZerobusStreamOptions { + flush_timeout_ms: 45000, + server_lack_of_ack_timeout_ms: 90000, + }; + + let sdk_options: databricks_zerobus_ingest_sdk::StreamConfigurationOptions = options.into(); + 
        assert_eq!(sdk_options.flush_timeout_ms, 45000);
+        assert_eq!(sdk_options.server_lack_of_ack_timeout_ms, 90000);
+        assert!(sdk_options.recovery);
+        assert_eq!(sdk_options.recovery_retries, 4);
+    }
+}
diff --git a/src/sinks/databricks_zerobus/error.rs b/src/sinks/databricks_zerobus/error.rs
new file mode 100644
index 0000000000000..053898e46c6e0
--- /dev/null
+++ b/src/sinks/databricks_zerobus/error.rs
@@ -0,0 +1,166 @@
+//! Error types for the Zerobus sink.
+
+use databricks_zerobus_ingest_sdk::ZerobusError;
+use snafu::Snafu;
+use vector_lib::event::EventStatus;
+
+/// Errors that can occur when using the Zerobus sink.
+#[derive(Debug, Snafu)]
+#[allow(clippy::enum_variant_names)]
+pub enum ZerobusSinkError {
+    /// Configuration validation failed.
+    #[snafu(display("Configuration error: {}", message))]
+    ConfigError { message: String },
+
+    /// Event encoding failed.
+    #[snafu(display("Encoding error: {}", message))]
+    EncodingError { message: String },
+
+    /// Zerobus SDK error.
+    #[snafu(display("Zerobus error: {}", source))]
+    ZerobusError { source: ZerobusError },
+
+    /// Stream initialization failed.
+    #[snafu(display("Stream initialization failed: {}", source))]
+    StreamInitError { source: ZerobusError },
+
+    /// Record ingestion failed.
+    #[snafu(display("Record ingestion failed: {}", source))]
+    IngestionError { source: ZerobusError },
+
+    /// Acknowledgement was requested but the SDK returned no offset to wait on,
+    /// so durability cannot be confirmed. Treated as non-retryable.
+    #[snafu(display("Zerobus ingest returned no offset in acknowledgement mode"))]
+    MissingAckOffset,
+}
+
+impl From<ZerobusError> for ZerobusSinkError {
+    fn from(error: ZerobusError) -> Self {
+        ZerobusSinkError::ZerobusError { source: error }
+    }
+}
+
+/// Convert Zerobus errors to Vector event status.
+impl From<ZerobusSinkError> for EventStatus {
+    fn from(error: ZerobusSinkError) -> Self {
+        match error {
+            ZerobusSinkError::ConfigError { .. }
+            | ZerobusSinkError::EncodingError { ..
} + | ZerobusSinkError::MissingAckOffset => EventStatus::Rejected, + ZerobusSinkError::ZerobusError { source } + | ZerobusSinkError::StreamInitError { source } + | ZerobusSinkError::IngestionError { source } => { + if source.is_retryable() { + EventStatus::Errored + } else { + EventStatus::Rejected + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::sinks::databricks_zerobus::service::ZerobusRetryLogic; + use crate::sinks::util::retries::RetryLogic; + + fn retryable_error() -> ZerobusError { + // ChannelCreationError is always retryable + ZerobusError::ChannelCreationError("connection reset".to_string()) + } + + fn non_retryable_error() -> ZerobusError { + // InvalidArgument is never retryable + ZerobusError::InvalidArgument("bad field".to_string()) + } + + #[test] + fn retryable_ingestion_error_maps_to_errored() { + let error = ZerobusSinkError::IngestionError { + source: retryable_error(), + }; + assert_eq!(EventStatus::from(error), EventStatus::Errored); + } + + #[test] + fn non_retryable_ingestion_error_maps_to_rejected() { + let error = ZerobusSinkError::IngestionError { + source: non_retryable_error(), + }; + assert_eq!(EventStatus::from(error), EventStatus::Rejected); + } + + #[test] + fn retryable_stream_init_error_maps_to_errored() { + let error = ZerobusSinkError::StreamInitError { + source: retryable_error(), + }; + assert_eq!(EventStatus::from(error), EventStatus::Errored); + } + + #[test] + fn non_retryable_stream_init_error_maps_to_rejected() { + let error = ZerobusSinkError::StreamInitError { + source: non_retryable_error(), + }; + assert_eq!(EventStatus::from(error), EventStatus::Rejected); + } + + #[test] + fn config_error_maps_to_rejected() { + let error = ZerobusSinkError::ConfigError { + message: "bad config".to_string(), + }; + assert_eq!(EventStatus::from(error), EventStatus::Rejected); + } + + #[test] + fn encoding_error_maps_to_rejected() { + let error = ZerobusSinkError::EncodingError { + message: "encode 
failed".to_string(), + }; + assert_eq!(EventStatus::from(error), EventStatus::Rejected); + } + + #[test] + fn retry_logic_retryable_errors() { + let logic = ZerobusRetryLogic; + + let error = ZerobusSinkError::IngestionError { + source: retryable_error(), + }; + assert!(logic.is_retriable_error(&error)); + + let error = ZerobusSinkError::StreamInitError { + source: retryable_error(), + }; + assert!(logic.is_retriable_error(&error)); + + let error = ZerobusSinkError::ZerobusError { + source: retryable_error(), + }; + assert!(logic.is_retriable_error(&error)); + } + + #[test] + fn retry_logic_non_retryable_errors() { + let logic = ZerobusRetryLogic; + + let error = ZerobusSinkError::IngestionError { + source: non_retryable_error(), + }; + assert!(!logic.is_retriable_error(&error)); + + let error = ZerobusSinkError::ConfigError { + message: "bad".to_string(), + }; + assert!(!logic.is_retriable_error(&error)); + + let error = ZerobusSinkError::EncodingError { + message: "bad".to_string(), + }; + assert!(!logic.is_retriable_error(&error)); + } +} diff --git a/src/sinks/databricks_zerobus/mod.rs b/src/sinks/databricks_zerobus/mod.rs new file mode 100644 index 0000000000000..19419b3216e4d --- /dev/null +++ b/src/sinks/databricks_zerobus/mod.rs @@ -0,0 +1,12 @@ +//! The Zerobus sink. +//! +//! This sink streams observability data to Databricks Unity Catalog tables +//! via the Zerobus/Shinkansen ingestion service. + +mod config; +mod error; +mod service; +mod sink; +mod unity_catalog_schema; + +pub use config::ZerobusSinkConfig; diff --git a/src/sinks/databricks_zerobus/service.rs b/src/sinks/databricks_zerobus/service.rs new file mode 100644 index 0000000000000..0dfa968bf40a0 --- /dev/null +++ b/src/sinks/databricks_zerobus/service.rs @@ -0,0 +1,563 @@ +//! Zerobus service wrapper for Vector sink integration. 
+
+use databricks_zerobus_ingest_sdk::{TableProperties, ZerobusSdk, ZerobusStream};
+use futures::future::BoxFuture;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+use tower::Service;
+use tracing::warn;
+use vector_lib::finalization::{EventFinalizers, Finalizable};
+use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
+use vector_lib::stream::DriverResponse;
+use crate::sinks::util::retries::RetryLogic;
+
+use super::{config::ZerobusSinkConfig, error::ZerobusSinkError, unity_catalog_schema};
+
+/// The payload for a Zerobus request.
+///
+/// The zerobus sink only supports proto-encoded records.
+#[derive(Clone, Debug)]
+pub enum ZerobusPayload {
+    /// Pre-encoded protobuf records (one byte buffer per event).
+    Records(Vec<Vec<u8>>),
+}
+
+/// Request type for the Zerobus service.
+#[derive(Clone, Debug)]
+pub struct ZerobusRequest {
+    pub payload: ZerobusPayload,
+    pub metadata: RequestMetadata,
+    pub finalizers: EventFinalizers,
+}
+
+/// Response type for the Zerobus service.
+#[derive(Debug)]
+pub struct ZerobusResponse {
+    pub events_byte_size: GroupedCountByteSize,
+}
+
+impl DriverResponse for ZerobusResponse {
+    fn event_status(&self) -> vector_lib::event::EventStatus {
+        vector_lib::event::EventStatus::Delivered
+    }
+
+    fn events_sent(&self) -> &GroupedCountByteSize {
+        &self.events_byte_size
+    }
+}
+
+impl Finalizable for ZerobusRequest {
+    fn take_finalizers(&mut self) -> EventFinalizers {
+        std::mem::take(&mut self.finalizers)
+    }
+}
+
+impl MetaDescriptive for ZerobusRequest {
+    fn get_metadata(&self) -> &RequestMetadata {
+        &self.metadata
+    }
+
+    fn metadata_mut(&mut self) -> &mut RequestMetadata {
+        &mut self.metadata
+    }
+}
+
+/// Determines what kind of stream the service creates and how payloads are ingested.
+///
+/// The sink only supports proto streams today; this is kept as an enum to
+/// leave room for future stream modes without reshaping all the call sites.
+#[derive(Clone)]
+pub enum StreamMode {
+    /// Proto stream using `ZerobusStream::ingest_records_offset`.
+    Proto {
+        descriptor_proto: Arc<databricks_zerobus_ingest_sdk::DescriptorProto>,
+    },
+}
+
+/// The active stream.
+enum ActiveStream {
+    Proto(Box<ZerobusStream>),
+    /// Test-only variant that returns a pre-configured error on ingest.
+    #[cfg(test)]
+    Mock(MockStream),
+}
+
+impl ActiveStream {
+    /// Gracefully flush and close the underlying SDK stream.
+    ///
+    /// Safe to call before the value is dropped — the SDK's own `Drop`
+    /// implementation is a no-op on already-closed streams.
+    async fn close(&mut self) {
+        let result = match self {
+            ActiveStream::Proto(s) => s.close().await,
+            #[cfg(test)]
+            ActiveStream::Mock(m) => {
+                m.closed.store(true, std::sync::atomic::Ordering::Relaxed);
+                Ok(())
+            }
+        };
+        if let Err(e) = result {
+            warn!(message = "Failed to close Zerobus stream.", error = %e);
+        }
+    }
+}
+
+/// A mock stream that returns a configurable error on the next ingest call.
+#[cfg(test)]
+pub struct MockStream {
+    /// When `Some`, the next ingest returns this error; when `None`, ingest succeeds.
+    next_error: std::sync::Mutex<Option<databricks_zerobus_ingest_sdk::ZerobusError>>,
+    /// Shared flag set to `true` when `ActiveStream::close()` is called.
+    closed: Arc<std::sync::atomic::AtomicBool>,
+}
+
+#[cfg(test)]
+impl MockStream {
+    pub fn succeeding() -> Self {
+        Self {
+            next_error: std::sync::Mutex::new(None),
+            closed: Arc::new(std::sync::atomic::AtomicBool::new(false)),
+        }
+    }
+
+    pub fn failing(error: databricks_zerobus_ingest_sdk::ZerobusError) -> Self {
+        Self {
+            next_error: std::sync::Mutex::new(Some(error)),
+            closed: Arc::new(std::sync::atomic::AtomicBool::new(false)),
+        }
+    }
+
+    /// Returns a shared handle to the closed flag for test assertions.
+    pub fn closed_flag(&self) -> Arc<std::sync::atomic::AtomicBool> {
+        Arc::clone(&self.closed)
+    }
+
+    /// Set the error that will be returned on the next ingest call.
+    pub fn set_next_error(&self, error: databricks_zerobus_ingest_sdk::ZerobusError) {
+        *self.next_error.lock().unwrap() = Some(error);
+    }
+
+    fn try_ingest(&self) -> Result<(), databricks_zerobus_ingest_sdk::ZerobusError> {
+        match self.next_error.lock().unwrap().take() {
+            Some(e) => Err(e),
+            None => Ok(()),
+        }
+    }
+}
+
+/// Service for handling Zerobus requests.
+pub struct ZerobusService {
+    sdk: Arc<ZerobusSdk>,
+    config: Arc<ZerobusSinkConfig>,
+    stream: Arc<Mutex<Option<Arc<ActiveStream>>>>,
+    stream_mode: StreamMode,
+    /// When true, the service waits for server-side acknowledgment after each
+    /// ingest call. Derived from `AcknowledgementsConfig`.
+    require_acknowledgements: bool,
+}
+
+impl ZerobusService {
+    pub async fn new(
+        config: ZerobusSinkConfig,
+        stream_mode: StreamMode,
+        require_acknowledgements: bool,
+    ) -> Result<Self, ZerobusSinkError> {
+        // Create SDK instance
+        let sdk = ZerobusSdk::builder()
+            .endpoint(&config.ingestion_endpoint)
+            .unity_catalog_url(&config.unity_catalog_endpoint)
+            .build()
+            .map_err(|e| ZerobusSinkError::ConfigError {
+                message: format!("Failed to create Zerobus SDK: {}", e),
+            })?;
+
+        Ok(Self {
+            sdk: Arc::new(sdk),
+            config: Arc::new(config),
+            stream: Arc::new(Mutex::new(None)),
+            stream_mode,
+            require_acknowledgements,
+        })
+    }
+
+    /// Resolve the protobuf message descriptor from the schema configuration.
+    pub async fn resolve_descriptor(
+        config: &ZerobusSinkConfig,
+        proxy: &crate::config::ProxyConfig,
+    ) -> Result<prost_reflect::MessageDescriptor, ZerobusSinkError> {
+        let (client_id, client_secret) = config.auth.credentials();
+
+        let table_schema = unity_catalog_schema::fetch_table_schema(
+            &config.unity_catalog_endpoint,
+            &config.table_name,
+            client_id,
+            client_secret,
+            proxy,
+        )
+        .await?;
+
+        unity_catalog_schema::generate_descriptor_from_schema(&table_schema)
+    }
+
+    /// Ensure we have an active stream, creating one if necessary.
+    ///
+    /// Also used as the healthcheck: eagerly creating a stream verifies
+    /// OAuth credentials, endpoint connectivity, and table validity.
+    pub async fn ensure_stream(&self) -> Result<(), ZerobusSinkError> {
+        self.get_or_create_stream().await.map(|_| ())
+    }
+
+    /// Return an `Arc` handle to the active stream, creating one if needed.
+    ///
+    /// The lock is held only while checking/creating the stream; callers can
+    /// then use the returned `Arc` without holding the lock.
+    async fn get_or_create_stream(&self) -> Result<Arc<ActiveStream>, ZerobusSinkError> {
+        let mut stream_guard = self.stream.lock().await;
+
+        if stream_guard.is_none() {
+            let (client_id, client_secret) = self.config.auth.credentials();
+            let (client_id, client_secret) = (client_id.to_string(), client_secret.to_string());
+
+            let active_stream = match &self.stream_mode {
+                StreamMode::Proto { descriptor_proto } => {
+                    let table_properties = TableProperties {
+                        table_name: self.config.table_name.clone(),
+                        descriptor_proto: Some((**descriptor_proto).clone()),
+                    };
+                    let stream_options = Some(self.config.stream_options.clone().into());
+                    let stream = self
+                        .sdk
+                        .create_stream(table_properties, client_id, client_secret, stream_options)
+                        .await
+                        .map_err(|e| ZerobusSinkError::StreamInitError { source: e })?;
+                    ActiveStream::Proto(Box::new(stream))
+                }
+            };
+
+            *stream_guard = Some(Arc::new(active_stream));
+        }
+
+        Ok(Arc::clone(stream_guard.as_ref().unwrap()))
+    }
+
+    /// Gracefully close and remove the active stream.
+    ///
+    /// Should be called after all in-flight ingests have completed (e.g.,
+    /// after the driver returns) so that the slot holds the sole `Arc`
+    /// reference to the stream.
+    pub async fn close_stream(&self) {
+        if let Some(stream) = self.stream.lock().await.take() {
+            match Arc::try_unwrap(stream) {
+                Ok(mut stream) => stream.close().await,
+                Err(_) => {
+                    warn!(
+                        message =
+                            "Zerobus stream has outstanding references, skipping graceful close."
+                    );
+                }
+            }
+        }
+    }
+
+    /// Ingest a payload (proto records or Arrow batch).
+    ///
+    /// Obtains an `Arc` handle to the stream (creating one if needed) and
+    /// then releases the lock before calling into the SDK so that concurrent
+    /// ingests are not serialized.
+    ///
+    /// On retryable errors the active stream is removed from the slot so that
+    /// the next attempt (driven by Tower retry) creates a fresh one.
+    pub async fn ingest(
+        &self,
+        payload: ZerobusPayload,
+        events_byte_size: GroupedCountByteSize,
+    ) -> Result<ZerobusResponse, ZerobusSinkError> {
+        let mut stream = self.get_or_create_stream().await?;
+
+        // Lock is not held here — other tasks can ingest concurrently.
+        let result = match (payload, stream.as_ref()) {
+            (ZerobusPayload::Records(records), ActiveStream::Proto(stream)) => {
+                match stream.ingest_records_offset(records).await {
+                    Ok(Some(offset)) if self.require_acknowledgements => {
+                        stream.wait_for_offset(offset).await.map(|_| ())
+                    }
+                    Ok(None) if self.require_acknowledgements => {
+                        return Err(ZerobusSinkError::MissingAckOffset);
+                    }
+                    Ok(_) => Ok(()),
+                    Err(e) => Err(e),
+                }
+            }
+            #[cfg(test)]
+            (ZerobusPayload::Records(_), ActiveStream::Mock(mock)) => mock.try_ingest(),
+        };
+
+        match result {
+            Ok(()) => Ok(ZerobusResponse { events_byte_size }),
+            Err(e) => {
+                if e.is_retryable() {
+                    // Only clear the slot if it still points to the same stream that
+                    // failed. Another concurrent task may have already replaced it
+                    // with a fresh stream after recovering from its own error.
+                    {
+                        let mut guard = self.stream.lock().await;
+                        if guard.as_ref().is_some_and(|s| Arc::ptr_eq(s, &stream)) {
+                            guard.take();
+                        }
+                    }
+                    if let Some(active) = Arc::get_mut(&mut stream) {
+                        active.close().await;
+                    }
+                }
+                Err(ZerobusSinkError::IngestionError { source: e })
+            }
+        }
+    }
+}
+
+impl Service<ZerobusRequest> for ZerobusService {
+    type Response = ZerobusResponse;
+    type Error = ZerobusSinkError;
+    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
+
+    fn poll_ready(
+        &mut self,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), Self::Error>> {
+        std::task::Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, mut request: ZerobusRequest) -> Self::Future {
+        let service = self.clone();
+        let events_byte_size =
+            std::mem::take(request.metadata_mut()).into_events_estimated_json_encoded_byte_size();
+
+        Box::pin(async move { service.ingest(request.payload, events_byte_size).await })
+    }
+}
+
+impl Clone for ZerobusService {
+    fn clone(&self) -> Self {
+        Self {
+            sdk: Arc::clone(&self.sdk),
+            config: Arc::clone(&self.config),
+            stream: Arc::clone(&self.stream),
+            stream_mode: self.stream_mode.clone(),
+            require_acknowledgements: self.require_acknowledgements,
+        }
+    }
+}
+
+/// Retry logic for the Zerobus service.
+///
+/// For SDK errors (`ZerobusError`), delegates to the SDK's `is_retryable()` which
+/// correctly marks transient errors (stream closed, channel issues) as retriable
+/// and permanent errors (invalid table name, invalid argument, invalid endpoint)
+/// as non-retriable.
+#[derive(Debug, Default, Clone)]
+pub struct ZerobusRetryLogic;
+
+#[cfg(test)]
+impl ZerobusService {
+    /// Create a service with a mock stream already installed for testing.
+    pub async fn new_with_mock(
+        config: ZerobusSinkConfig,
+        mock: MockStream,
+        require_acknowledgements: bool,
+    ) -> Result<Self, ZerobusSinkError> {
+        config.validate()?;
+
+        let sdk = ZerobusSdk::builder()
+            .endpoint(&config.ingestion_endpoint)
+            .unity_catalog_url(&config.unity_catalog_endpoint)
+            .build()
+            .map_err(|e| ZerobusSinkError::ConfigError {
+                message: format!("Failed to create Zerobus SDK: {}", e),
+            })?;
+
+        Ok(Self {
+            sdk: Arc::new(sdk),
+            config: Arc::new(config),
+            stream: Arc::new(Mutex::new(Some(Arc::new(ActiveStream::Mock(mock))))),
+            stream_mode: StreamMode::Proto {
+                descriptor_proto: Arc::new(Default::default()),
+            },
+            require_acknowledgements,
+        })
+    }
+
+    /// Returns true if the service currently has an active stream.
+    pub async fn has_active_stream(&self) -> bool {
+        self.stream.lock().await.is_some()
+    }
+}
+
+impl RetryLogic for ZerobusRetryLogic {
+    type Error = ZerobusSinkError;
+    type Request = ZerobusRequest;
+    type Response = ZerobusResponse;
+
+    fn is_retriable_error(&self, error: &Self::Error) -> bool {
+        match error {
+            ZerobusSinkError::ZerobusError { source }
+            | ZerobusSinkError::StreamInitError { source }
+            | ZerobusSinkError::IngestionError { source } => source.is_retryable(),
+            ZerobusSinkError::ConfigError { .. }
+            | ZerobusSinkError::EncodingError { ..
} + | ZerobusSinkError::MissingAckOffset => false, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::sinks::databricks_zerobus::config::{ + DatabricksAuthentication, ZerobusStreamOptions, + }; + use databricks_zerobus_ingest_sdk::ZerobusError; + use vector_lib::sensitive_string::SensitiveString; + + fn test_config() -> ZerobusSinkConfig { + ZerobusSinkConfig { + ingestion_endpoint: "https://127.0.0.1:1".to_string(), + table_name: "test.default.logs".to_string(), + unity_catalog_endpoint: "https://127.0.0.1:1".to_string(), + auth: DatabricksAuthentication::OAuth { + client_id: SensitiveString::from("id".to_string()), + client_secret: SensitiveString::from("secret".to_string()), + }, + stream_options: ZerobusStreamOptions::default(), + batch: Default::default(), + request: Default::default(), + acknowledgements: Default::default(), + } + } + + fn dummy_payload() -> ZerobusPayload { + ZerobusPayload::Records(vec![vec![1, 2, 3]]) + } + + #[tokio::test] + async fn ingest_succeeds_with_mock_stream() { + let service = ZerobusService::new_with_mock(test_config(), MockStream::succeeding(), false) + .await + .unwrap(); + + let result = service + .ingest(dummy_payload(), GroupedCountByteSize::new_untagged()) + .await; + + assert!(result.is_ok()); + assert!(service.has_active_stream().await); + } + + #[tokio::test] + async fn retryable_error_clears_stream() { + let mock = MockStream::failing(ZerobusError::ChannelCreationError( + "connection reset".to_string(), + )); + let service = ZerobusService::new_with_mock(test_config(), mock, false) + .await + .unwrap(); + + assert!(service.has_active_stream().await); + + let err = service + .ingest(dummy_payload(), GroupedCountByteSize::new_untagged()) + .await + .unwrap_err(); + + assert!(matches!(err, ZerobusSinkError::IngestionError { .. })); + assert!(ZerobusRetryLogic.is_retriable_error(&err)); + // Stream must have been cleared for the next retry. 
+ assert!(!service.has_active_stream().await); + } + + #[tokio::test] + async fn non_retryable_error_keeps_stream() { + let mock = MockStream::failing(ZerobusError::InvalidArgument("bad field".to_string())); + let service = ZerobusService::new_with_mock(test_config(), mock, false) + .await + .unwrap(); + + assert!(service.has_active_stream().await); + + let err = service + .ingest(dummy_payload(), GroupedCountByteSize::new_untagged()) + .await + .unwrap_err(); + + assert!(matches!(err, ZerobusSinkError::IngestionError { .. })); + assert!(!ZerobusRetryLogic.is_retriable_error(&err)); + // Stream should NOT be cleared for non-retryable errors. + assert!(service.has_active_stream().await); + } + + #[tokio::test] + async fn stream_recovers_after_retryable_failure() { + // Simulate: success → retryable failure → success again. + let mock = MockStream::succeeding(); + let service = ZerobusService::new_with_mock(test_config(), mock, false) + .await + .unwrap(); + + // First ingest succeeds. + assert!( + service + .ingest(dummy_payload(), GroupedCountByteSize::new_untagged()) + .await + .is_ok() + ); + assert!(service.has_active_stream().await); + + // Inject a retryable error for the next call. + { + let guard = service.stream.lock().await; + if let Some(arc) = guard.as_ref() + && let ActiveStream::Mock(mock) = arc.as_ref() + { + mock.set_next_error(ZerobusError::ChannelCreationError("reset".to_string())); + } + } + + // Second ingest fails and clears the stream. + let err = service + .ingest(dummy_payload(), GroupedCountByteSize::new_untagged()) + .await + .unwrap_err(); + assert!(ZerobusRetryLogic.is_retriable_error(&err)); + assert!(!service.has_active_stream().await); + + // Simulate Tower retry: re-inject a fresh mock stream + // (in production, ensure_stream() would create a new real stream). + *service.stream.lock().await = Some(Arc::new(ActiveStream::Mock(MockStream::succeeding()))); + + // Third ingest succeeds on the new stream. 
+ assert!( + service + .ingest(dummy_payload(), GroupedCountByteSize::new_untagged()) + .await + .is_ok() + ); + assert!(service.has_active_stream().await); + } + + #[tokio::test] + async fn close_stream_calls_close_on_active_stream() { + let mock = MockStream::succeeding(); + let closed = mock.closed_flag(); + + let service = ZerobusService::new_with_mock(test_config(), mock, false) + .await + .unwrap(); + + assert!(service.has_active_stream().await); + assert!(!closed.load(std::sync::atomic::Ordering::Relaxed)); + + service.close_stream().await; + + assert!(!service.has_active_stream().await); + assert!(closed.load(std::sync::atomic::Ordering::Relaxed)); + } +} diff --git a/src/sinks/databricks_zerobus/sink.rs b/src/sinks/databricks_zerobus/sink.rs new file mode 100644 index 0000000000000..30b473c2c041b --- /dev/null +++ b/src/sinks/databricks_zerobus/sink.rs @@ -0,0 +1,123 @@ +//! The main Zerobus sink implementation. + +use std::num::NonZeroUsize; +use std::sync::Arc; + +use futures::stream::BoxStream; + +use vector_lib::codecs::encoding::{BatchEncoder, BatchOutput}; +use vector_lib::event::EventStatus; +use vector_lib::finalization::Finalizable; + +use crate::sinks::prelude::*; +use crate::sinks::util::metadata::RequestMetadataBuilder; +use crate::sinks::util::request_builder::default_request_builder_concurrency_limit; +use crate::sinks::util::{RealtimeSizeBasedDefaultBatchSettings, TowerRequestSettings}; + +use super::service::{ZerobusPayload, ZerobusRequest, ZerobusRetryLogic, ZerobusService}; + +/// The main Zerobus sink. 
+pub struct ZerobusSink {
+    service: ZerobusService,
+    request_limits: TowerRequestSettings,
+    batch_settings: BatcherSettings,
+    encoder: BatchEncoder,
+}
+
+impl ZerobusSink {
+    pub fn new(
+        service: ZerobusService,
+        request_limits: TowerRequestSettings,
+        batch_config: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
+        encoder: BatchEncoder,
+    ) -> Result<Self, crate::Error> {
+        let batch_settings = batch_config.into_batcher_settings()?;
+
+        Ok(Self {
+            service,
+            request_limits,
+            batch_settings,
+            encoder,
+        })
+    }
+
+    fn encode_batch(
+        encoder: &BatchEncoder,
+        mut events: Vec<Event>,
+    ) -> Result<ZerobusRequest, String> {
+        let finalizers = events.take_finalizers();
+        let metadata_builder = RequestMetadataBuilder::from_events(&events);
+
+        let batch_output = match encoder.encode_batch(&events) {
+            Ok(output) => output,
+            Err(e) => {
+                finalizers.update_status(EventStatus::Rejected);
+                return Err(format!("Failed to encode batch: {}", e));
+            }
+        };
+
+        let (payload, byte_size) = match batch_output {
+            BatchOutput::Records(records) => {
+                let size = records.iter().map(|r| r.len()).sum::<usize>();
+                (ZerobusPayload::Records(records), size)
+            }
+            // The sink only builds a proto-batch encoder, so this arm is
+            // only reachable if the shared codec adds a new `BatchOutput`
+            // variant without a corresponding zerobus payload type.
+            #[cfg(feature = "codecs-arrow")]
+            BatchOutput::Arrow(_) => {
+                finalizers.update_status(EventStatus::Rejected);
+                return Err("The Databricks Zerobus sink only supports proto-batch output.".into());
+            }
+        };
+
+        let request_size = NonZeroUsize::new(byte_size).unwrap_or(NonZeroUsize::MIN);
+        let metadata = metadata_builder.with_request_size(request_size);
+
+        Ok(ZerobusRequest {
+            payload,
+            metadata,
+            finalizers,
+        })
+    }
+
+    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
+        let encoder = Arc::new(self.encoder.clone());
+
+        let result = {
+            let tower_service = ServiceBuilder::new()
+                .settings(self.request_limits, ZerobusRetryLogic)
+                .service(self.service.clone());
+
+            input
+                .batched(self.batch_settings.as_byte_size_config())
+                .concurrent_map(default_request_builder_concurrency_limit(), move |events| {
+                    let encoder = Arc::clone(&encoder);
+                    Box::pin(async move { Self::encode_batch(&encoder, events) })
+                })
+                .filter_map(|result| async move {
+                    match result {
+                        Err(error) => {
+                            emit!(SinkRequestBuildError { error });
+                            None
+                        }
+                        Ok(req) => Some(req),
+                    }
+                })
+                .into_driver(tower_service)
+                .run()
+                .await
+        };
+
+        self.service.close_stream().await;
+
+        result
+    }
+}
+
+#[async_trait::async_trait]
+impl StreamSink<Event> for ZerobusSink {
+    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
+        self.run_inner(input).await
+    }
+}
diff --git a/src/sinks/databricks_zerobus/tests/fixtures/nested_structs_complete_schema.json b/src/sinks/databricks_zerobus/tests/fixtures/nested_structs_complete_schema.json
new file mode 100644
index 0000000000000..f30f749d13230
--- /dev/null
+++ b/src/sinks/databricks_zerobus/tests/fixtures/nested_structs_complete_schema.json
@@ -0,0 +1,127 @@
+{
+  "name": "nested_structs_table",
+  "catalog_name": "test_catalog",
+  "schema_name": "test_schema",
+  "columns": [
+    {
+      "name": "field_001",
+      "type_text": "bigint",
+      "type_name": "LONG",
+      "position": 0,
+      "nullable": true,
+      "type_json":
"{\"name\":\"field_001\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}" + }, + { + "name": "field_002", + "type_text": "string", + "type_name": "STRING", + "position": 1, + "nullable": true, + "type_json": "{\"name\":\"field_002\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}" + }, + { + "name": "field_003", + "type_text": "string", + "type_name": "STRING", + "position": 2, + "nullable": true, + "type_json": "{\"name\":\"field_003\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}" + }, + { + "name": "field_004", + "type_text": "string", + "type_name": "STRING", + "position": 3, + "nullable": true, + "type_json": "{\"name\":\"field_004\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}" + }, + { + "name": "field_005", + "type_text": "string", + "type_name": "STRING", + "position": 4, + "nullable": true, + "type_json": "{\"name\":\"field_005\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}" + }, + { + "name": "field_006", + "type_text": "bigint", + "type_name": "LONG", + "position": 5, + "nullable": true, + "type_json": "{\"name\":\"field_006\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}" + }, + { + "name": "field_007", + "type_text": "bigint", + "type_name": "LONG", + "position": 6, + "nullable": true, + "type_json": "{\"name\":\"field_007\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}" + }, + { + "name": "field_008", + "type_text": "struct,field_012:struct,field_014:struct,field_016:struct>", + "type_name": "STRUCT", + "position": 7, + "nullable": true, + "type_json": 
"{\"name\":\"field_008\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"field_009\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"field_010\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"field_011\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"field_012\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"field_013\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"field_014\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"field_015\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"field_016\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"field_017\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}" + }, + { + "name": "field_018", + "type_text": "struct", + "type_name": "STRUCT", + "position": 8, + "nullable": true, + "type_json": "{\"name\":\"field_018\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"field_019\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"field_020\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}" + }, + { + "name": "field_021", + "type_text": "struct", + "type_name": "STRUCT", + "position": 9, + "nullable": true, + "type_json": 
"{\"name\":\"field_021\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"field_022\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"field_023\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"field_020\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"field_024\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"field_025\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"field_026\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}" + }, + { + "name": "field_027", + "type_text": "array", + "type_name": "ARRAY", + "position": 10, + "nullable": true, + "type_json": "{\"name\":\"field_027\",\"type\":{\"type\":\"array\",\"elementType\":\"long\",\"containsNull\":true},\"nullable\":true,\"metadata\":{}}" + }, + { + "name": "field_028", + "type_text": "boolean", + "type_name": "BOOLEAN", + "position": 11, + "nullable": true, + "type_json": "{\"name\":\"field_028\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}}" + }, + { + "name": "field_029", + "type_text": "string", + "type_name": "STRING", + "position": 12, + "nullable": true, + "type_json": "{\"name\":\"field_029\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}" + }, + { + "name": "field_030", + "type_text": "string", + "type_name": "STRING", + "position": 13, + "nullable": true, + "type_json": "{\"name\":\"field_030\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}" + }, + { + "name": "field_031", + "type_text": "boolean", + "type_name": "BOOLEAN", + "position": 14, + "nullable": true, + "type_json": "{\"name\":\"field_031\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}}" + } + ] +} diff --git a/src/sinks/databricks_zerobus/unity_catalog_schema.rs b/src/sinks/databricks_zerobus/unity_catalog_schema.rs new file mode 100644 index 0000000000000..2d540c13da582 --- /dev/null +++ b/src/sinks/databricks_zerobus/unity_catalog_schema.rs @@ -0,0 +1,456 @@ 
+//! Unity Catalog schema fetching and protobuf descriptor generation. +//! +//! The UC-to-protobuf conversion is delegated to +//! [`databricks_zerobus_ingest_sdk::schema`]; this module only wraps it with +//! the HTTP fetching + descriptor-pool assembly that the sink needs. + +use bytes::Buf; +use databricks_zerobus_ingest_sdk::schema::descriptor_from_uc_schema; +use http::{Request, Uri}; +use http_body::Body as HttpBody; +use hyper::Body; +use percent_encoding::{NON_ALPHANUMERIC, percent_encode}; +use prost_reflect::prost_types; +use serde::Deserialize; + +use super::error::ZerobusSinkError; +use crate::config::ProxyConfig; +use crate::http::HttpClient; +use crate::tls::TlsSettings; + +// Alias the SDK types under the names the rest of the sink already uses. +#[cfg(test)] +use databricks_zerobus_ingest_sdk::schema::UcColumn as UnityCatalogColumn; +pub use databricks_zerobus_ingest_sdk::schema::UcTableSchema as UnityCatalogTableSchema; + +/// OAuth token response from Databricks +#[derive(Debug, Deserialize)] +struct OAuthTokenResponse { + access_token: String, +} + +/// Fetch table schema from Unity Catalog API +pub async fn fetch_table_schema( + unity_catalog_endpoint: &str, + table_name: &str, + client_id: &str, + client_secret: &str, + proxy: &ProxyConfig, +) -> Result { + let http_client = + HttpClient::new(TlsSettings::default(), proxy).map_err(|e| { + ZerobusSinkError::ConfigError { + message: format!("Failed to create HTTP client: {}", e), + } + })?; + + // First, get OAuth token + let token = get_oauth_token( + &http_client, + unity_catalog_endpoint, + client_id, + client_secret, + ) + .await?; + + // Fetch table schema. + // Encode each segment of the fully-qualified table name (catalog.schema.table) + // so that reserved URI characters in quoted Unity Catalog identifiers (spaces, + // #, /, etc.) don't break URI parsing or hit the wrong endpoint. 
+ let encoded_table_name: String = table_name + .split('.') + .map(|seg| percent_encode(seg.as_bytes(), NON_ALPHANUMERIC).to_string()) + .collect::>() + .join("."); + let url = format!( + "{}/api/2.0/unity-catalog/tables/{}", + unity_catalog_endpoint.trim_end_matches('/'), + encoded_table_name + ); + + let uri: Uri = url.parse().map_err(|e| ZerobusSinkError::ConfigError { + message: format!("Invalid Unity Catalog endpoint URL: {}", e), + })?; + + let request = Request::get(uri) + .header("Authorization", format!("Bearer {}", token)) + .header("Content-Type", "application/json") + .body(Body::empty()) + .map_err(|e| ZerobusSinkError::ConfigError { + message: format!("Failed to build request: {}", e), + })?; + + let response = http_client + .send(request) + .await + .map_err(|e| ZerobusSinkError::ConfigError { + message: format!("Failed to fetch table schema: {}", e), + })?; + + let status = response.status(); + if !status.is_success() { + let body_bytes = response + .into_body() + .collect() + .await + .map(|c| c.to_bytes()) + .unwrap_or_default(); + let error_text = String::from_utf8_lossy(&body_bytes); + return Err(ZerobusSinkError::ConfigError { + message: format!( + "Unity Catalog API returned error {}: {}", + status, error_text + ), + }); + } + + let body_bytes = response + .into_body() + .collect() + .await + .map(|c| c.to_bytes()) + .map_err(|e| ZerobusSinkError::ConfigError { + message: format!("Failed to read response body: {}", e), + })?; + + let schema: UnityCatalogTableSchema = + serde_json::from_reader(body_bytes.reader()).map_err(|e| { + ZerobusSinkError::ConfigError { + message: format!("Failed to parse table schema response: {}", e), + } + })?; + + Ok(schema) +} + +/// Get OAuth token from Databricks +async fn get_oauth_token( + http_client: &HttpClient, + unity_catalog_endpoint: &str, + client_id: &str, + client_secret: &str, +) -> Result { + let token_url = format!( + "{}/oidc/v1/token", + unity_catalog_endpoint.trim_end_matches('/') + ); + + let 
uri: Uri = token_url + .parse() + .map_err(|e| ZerobusSinkError::ConfigError { + message: format!("Invalid token endpoint URL: {}", e), + })?; + + // Build form-encoded body + let form_body = format!( + "grant_type=client_credentials&client_id={}&client_secret={}&scope=all-apis", + percent_encode(client_id.as_bytes(), NON_ALPHANUMERIC), + percent_encode(client_secret.as_bytes(), NON_ALPHANUMERIC) + ); + + let request = Request::post(uri) + .header("Content-Type", "application/x-www-form-urlencoded") + .body(Body::from(form_body)) + .map_err(|e| ZerobusSinkError::ConfigError { + message: format!("Failed to build OAuth request: {}", e), + })?; + + let response = http_client + .send(request) + .await + .map_err(|e| ZerobusSinkError::ConfigError { + message: format!("Failed to get OAuth token: {}", e), + })?; + + let status = response.status(); + if !status.is_success() { + let body_bytes = response + .into_body() + .collect() + .await + .map(|c| c.to_bytes()) + .unwrap_or_default(); + let error_text = String::from_utf8_lossy(&body_bytes); + return Err(ZerobusSinkError::ConfigError { + message: format!("OAuth token request failed {}: {}", status, error_text), + }); + } + + let body_bytes = response + .into_body() + .collect() + .await + .map(|c| c.to_bytes()) + .map_err(|e| ZerobusSinkError::ConfigError { + message: format!("Failed to read OAuth response body: {}", e), + })?; + + let token_response: OAuthTokenResponse = + serde_json::from_reader(body_bytes.reader()).map_err(|e| { + ZerobusSinkError::ConfigError { + message: format!("Failed to parse OAuth token response: {}", e), + } + })?; + + Ok(token_response.access_token) +} + +/// Format a protobuf MessageDescriptor as a .proto file string for logging +fn format_descriptor_as_proto(descriptor: &prost_reflect::MessageDescriptor) -> String { + let mut output = String::new(); + format_message_as_proto(descriptor, &mut output, 0); + output +} + +/// Recursively format a message and its nested types +fn 
format_message_as_proto( + descriptor: &prost_reflect::MessageDescriptor, + output: &mut String, + indent_level: usize, +) { + let indent = " ".repeat(indent_level); + + // Write message header + output.push_str(&format!("{}message {} {{\n", indent, descriptor.name())); + + // Write fields + for field in descriptor.fields() { + let field_indent = " ".repeat(indent_level + 1); + let field_type = format_field_type(&field); + let field_number = field.number(); + output.push_str(&format!( + "{}{}{} = {};\n", + field_indent, + field_type, + field.name(), + field_number + )); + } + + output.push_str(&format!("{}}}\n", indent)); + + // Write nested message types + for nested in descriptor.child_messages() { + output.push('\n'); + format_message_as_proto(&nested, output, indent_level); + } +} + +/// Format a field's type declaration +fn format_field_type(field: &prost_reflect::FieldDescriptor) -> String { + use prost_reflect::Kind; + + if field.is_map() { + // Map fields: map field_name + if let Kind::Message(map_entry) = field.kind() { + let key_field = map_entry.fields().find(|f| f.name() == "key").unwrap(); + let value_field = map_entry.fields().find(|f| f.name() == "value").unwrap(); + let key_type = format_scalar_type(&key_field); + let value_type = format_scalar_type(&value_field); + return format!("map<{}, {}> ", key_type, value_type); + } + } + + let base_type = match field.kind() { + Kind::Message(msg) => msg.name().to_string(), + kind => format_kind_type(&kind), + }; + + if field.is_list() { + format!("repeated {} ", base_type) + } else { + format!("{} ", base_type) + } +} + +/// Format a scalar field type (for map keys/values) +fn format_scalar_type(field: &prost_reflect::FieldDescriptor) -> String { + match field.kind() { + prost_reflect::Kind::Message(msg) => msg.name().to_string(), + kind => format_kind_type(&kind), + } +} + +/// Map Kind enum to proto type string +fn format_kind_type(kind: &prost_reflect::Kind) -> String { + use prost_reflect::Kind; + match 
kind { + Kind::Double => "double".into(), + Kind::Float => "float".into(), + Kind::Int32 => "int32".into(), + Kind::Int64 => "int64".into(), + Kind::Uint32 => "uint32".into(), + Kind::Uint64 => "uint64".into(), + Kind::Sint32 => "sint32".into(), + Kind::Sint64 => "sint64".into(), + Kind::Fixed32 => "fixed32".into(), + Kind::Fixed64 => "fixed64".into(), + Kind::Sfixed32 => "sfixed32".into(), + Kind::Sfixed64 => "sfixed64".into(), + Kind::Bool => "bool".into(), + Kind::String => "string".into(), + Kind::Bytes => "bytes".into(), + Kind::Message(msg) => msg.name().to_string(), + Kind::Enum(e) => e.name().to_string(), + } +} + +/// Generate a protobuf message descriptor from a Unity Catalog table schema. +/// +/// The core UC-type → protobuf conversion lives in +/// [`databricks_zerobus_ingest_sdk::schema::descriptor_from_uc_schema`]; this +/// wrapper adds the `FileDescriptorProto` / `DescriptorPool` plumbing that +/// Vector needs to get a `prost_reflect::MessageDescriptor` usable for +/// dynamic message encoding. 
+pub fn generate_descriptor_from_schema( + schema: &UnityCatalogTableSchema, +) -> Result { + let message_proto = + descriptor_from_uc_schema(schema).map_err(|e| ZerobusSinkError::ConfigError { + message: format!("Failed to convert Unity Catalog schema to protobuf: {}", e), + })?; + + let message_name = message_proto.name().to_string(); + let package_name = sanitize_package_name(&schema.catalog_name); + + let file_proto = prost_types::FileDescriptorProto { + name: Some(format!("{}.proto", message_name)), + package: Some(package_name.clone()), + message_type: vec![message_proto], + ..Default::default() + }; + + let file_descriptor_set = prost_types::FileDescriptorSet { + file: vec![file_proto], + }; + + let pool = prost_reflect::DescriptorPool::from_file_descriptor_set(file_descriptor_set) + .map_err(|e| ZerobusSinkError::ConfigError { + message: format!("Failed to build descriptor pool: {}", e), + })?; + + let full_message_name = format!("{}.{}", package_name, message_name); + let message_descriptor = pool + .get_message_by_name(&full_message_name) + .ok_or_else(|| ZerobusSinkError::ConfigError { + message: format!("Failed to get message descriptor for {}", full_message_name), + })?; + + if tracing::enabled!(tracing::Level::INFO) { + let proto_schema = format_descriptor_as_proto(&message_descriptor); + info!( + "Inferred protobuf schema from Unity Catalog table {}.{}.{}:\n{}", + schema.catalog_name, schema.schema_name, schema.name, proto_schema + ); + } + + Ok(message_descriptor) +} + +/// Default prefix for package name segments that start with a non-letter. +const PACKAGE_SEGMENT_PREFIX: char = 'p'; + +/// Sanitize a string for use as a protobuf package name. +/// +/// Package identifiers allow `[a-zA-Z][a-zA-Z0-9_]*` segments separated by `.`. +/// Invalid characters are replaced with `_` and each segment is ensured to start +/// with a letter. 
+fn sanitize_package_name(name: &str) -> String { + name.split('.') + .map(|segment| { + let mut s: String = segment + .chars() + .map(|c| if c.is_alphanumeric() || c == '_' { c } else { '_' }) + .collect(); + if s.is_empty() || !s.starts_with(|c: char| c.is_alphabetic()) { + s.insert(0, PACKAGE_SEGMENT_PREFIX); + } + s + }) + .collect::>() + .join(".") +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Smoke test: the wrapper calls into the SDK and builds a usable + /// `MessageDescriptor` via the descriptor pool. + #[test] + fn test_generate_descriptor_simple_schema() { + let schema = UnityCatalogTableSchema { + name: "test_table".to_string(), + catalog_name: "test_catalog".to_string(), + schema_name: "test_schema".to_string(), + columns: vec![ + UnityCatalogColumn { + name: "id".to_string(), + type_text: "bigint".to_string(), + type_name: "BIGINT".to_string(), + position: 1, + nullable: false, + type_json: "{}".to_string(), + }, + UnityCatalogColumn { + name: "body".to_string(), + type_text: "string".to_string(), + type_name: "STRING".to_string(), + position: 2, + nullable: true, + type_json: "{}".to_string(), + }, + ], + }; + + let descriptor = generate_descriptor_from_schema(&schema) + .expect("descriptor should be generated"); + assert_eq!(descriptor.fields().len(), 2); + assert!(descriptor.get_field_by_name("id").is_some()); + assert!(descriptor.get_field_by_name("body").is_some()); + } + + /// Snapshot test for the proto-text formatter used in info logging. + /// The UC→proto conversion itself is covered by the SDK's own tests; + /// this guards the local `format_descriptor_as_proto` rendering. 
+ #[test] + fn test_proto_schema_snapshot() { + let json = include_str!("tests/fixtures/nested_structs_complete_schema.json"); + let schema: UnityCatalogTableSchema = + serde_json::from_str(json).expect("Failed to parse nested_structs_complete schema"); + + let descriptor = + generate_descriptor_from_schema(&schema).expect("Failed to generate descriptor"); + + let proto_text = format_descriptor_as_proto(&descriptor); + + assert!( + proto_text.contains("message TestSchemaNestedStructsTable"), + "Proto should have main message definition" + ); + assert!( + proto_text.contains("string field_003"), + "Proto should have field_003 (string)" + ); + assert!( + proto_text.contains("int64 field_007"), + "Proto should have field_007 (int64)" + ); + assert!( + proto_text.contains("repeated int64 field_027"), + "Proto should have field_027 as repeated int64" + ); + assert!( + proto_text.contains("message Field018"), + "Proto should have Field018 nested message" + ); + assert!( + proto_text.contains("message Field021"), + "Proto should have Field021 nested message" + ); + assert!( + proto_text.contains("message Field008"), + "Proto should have Field008 nested message" + ); + } +} diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index e5584de61dd02..95967c645aa16 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -40,6 +40,8 @@ pub mod clickhouse; pub mod console; #[cfg(feature = "sinks-databend")] pub mod databend; +#[cfg(feature = "sinks-databricks-zerobus")] +pub mod databricks_zerobus; #[cfg(any( feature = "sinks-datadog_events", feature = "sinks-datadog_logs", diff --git a/src/sinks/util/encoding.rs b/src/sinks/util/encoding.rs index 4cc49b00f358b..d615b2eb69ff5 100644 --- a/src/sinks/util/encoding.rs +++ b/src/sinks/util/encoding.rs @@ -99,7 +99,6 @@ impl Encoder for (Transformer, vector_lib::codecs::Encoder<()>) { } } -#[cfg(feature = "codecs-arrow")] impl Encoder> for (Transformer, vector_lib::codecs::BatchEncoder) { fn encode_input( &self, @@ -140,7 +139,6 @@ impl 
Encoder> for (Transformer, vector_lib::codecs::EncoderKind) { vector_lib::codecs::EncoderKind::Framed(encoder) => { (self.0.clone(), *encoder.clone()).encode_input(events, writer) } - #[cfg(feature = "codecs-arrow")] vector_lib::codecs::EncoderKind::Batch(encoder) => { (self.0.clone(), encoder.clone()).encode_input(events, writer) } diff --git a/website/cue/reference/components/sinks/databricks_zerobus.cue b/website/cue/reference/components/sinks/databricks_zerobus.cue new file mode 100644 index 0000000000000..6329bd4fb75c2 --- /dev/null +++ b/website/cue/reference/components/sinks/databricks_zerobus.cue @@ -0,0 +1,156 @@ +package metadata + +components: sinks: databricks_zerobus: { + title: "Databricks Zerobus" + + classes: { + commonly_used: false + delivery: "at_least_once" + development: "beta" + egress_method: "batch" + service_providers: ["Databricks"] + stateful: false + } + + features: { + auto_generated: true + acknowledgements: true + healthcheck: enabled: true + send: { + batch: { + enabled: true + common: false + max_bytes: 10_000_000 + timeout_secs: 1.0 + } + compression: enabled: false + encoding: enabled: false + proxy: enabled: false + request: { + enabled: true + headers: false + } + tls: enabled: false + to: { + service: services.databricks_zerobus + + interface: { + socket: { + api: { + title: "Databricks Zerobus Ingestion API" + url: urls.databricks + } + direction: "outgoing" + protocols: ["http"] + ssl: "required" + } + } + } + } + } + + support: { + requirements: [ + """ + A [Databricks](\(urls.databricks)) workspace with [Unity Catalog](\(urls.databricks_unity_catalog)) enabled. + """, + """ + OAuth 2.0 client credentials (client ID and client secret) with permissions to write to the target table. 
+ """, + ] + warnings: [] + notices: [] + } + + configuration: generated.components.sinks.databricks_zerobus.configuration + + input: { + logs: true + metrics: null + traces: false + } + + how_it_works: { + authentication: { + title: "Authentication" + body: """ + The Databricks Zerobus sink authenticates using OAuth 2.0 client credentials. + You must provide a `client_id` and `client_secret` that have been granted + permissions to write to the target Unity Catalog table. + """ + } + + schema: { + title: "Schema" + body: """ + The sink requires a schema to encode events into protobuf format. + + The sink automatically fetches the table schema from the Unity Catalog API + at startup, ensuring the schema always matches the target table. + + ```yaml + sinks: + zerobus: + type: databricks_zerobus + schema: + type: unity_catalog + ``` + """ + } + + batching: { + title: "Batching" + body: """ + Events are batched before being sent to Zerobus. Each event is individually + serialized as a protobuf message, and the batch is sent as a single request. + The maximum batch size is 10MB, which is enforced by the Zerobus SDK. + """ + } + + error_handling: { + title: "Error Handling & Retries" + body: """ + The sink classifies errors from the Zerobus SDK into retryable and non-retryable + categories: + + - **Retryable errors** (connection failures, stream closed, channel errors): The + sink automatically discards the current gRPC stream and creates a fresh one on + the next retry attempt. This ensures recovery from transient network issues + without manual intervention. + + - **Non-retryable errors** (invalid table, invalid endpoint, invalid arguments): + Events are rejected permanently and the existing stream is kept alive. + + Retry behavior (backoff, concurrency, timeouts) is controlled by the standard + `request` configuration options. 
+ """ + } + + proxy: { + title: "Proxy" + body: """ + Vector's `proxy` configuration is not supported for this sink because the + underlying Zerobus SDK manages its own gRPC connections. The SDK reads proxy + settings from standard environment variables (`grpc_proxy`, `https_proxy`, + `http_proxy`, and their uppercase equivalents). Set these environment variables + if your environment requires egress through an HTTP proxy. + + The Unity Catalog schema discovery requests (used when `schema.type` is set to + `unity_catalog`) do respect Vector's runtime proxy configuration. + """ + } + + acknowledgements: { + title: "Acknowledgements" + body: """ + When `acknowledgements` is enabled, the sink waits for a server-side + acknowledgement after each batch is ingested. This confirms that the Zerobus + service has received and accepted the data before marking events as delivered. + + When disabled (the default), events are marked as delivered as soon as the + ingestion call completes without error, without waiting for an explicit + server acknowledgement. + """ + } + } +} diff --git a/website/cue/reference/services/databricks_zerobus.cue b/website/cue/reference/services/databricks_zerobus.cue new file mode 100644 index 0000000000000..1091c215a5d68 --- /dev/null +++ b/website/cue/reference/services/databricks_zerobus.cue @@ -0,0 +1,10 @@ +package metadata + +services: databricks_zerobus: { + name: "Databricks Zerobus" + thing: "a \(name) ingestion stream" + url: urls.databricks + versions: null + + description: "[Databricks](\(urls.databricks)) is a unified analytics platform. The Zerobus sink streams observability data to Databricks Unity Catalog tables via the Zerobus ingestion service, using protobuf encoding for efficient data transfer." 
+} diff --git a/website/cue/reference/urls.cue b/website/cue/reference/urls.cue index 239599effed75..0133e581f82e5 100644 --- a/website/cue/reference/urls.cue +++ b/website/cue/reference/urls.cue @@ -124,6 +124,8 @@ urls: { cue: "https://cuelang.org/" csv: "\(wikipedia)/wiki/Comma-separated_values" dag: "\(wikipedia)/wiki/Directed_acyclic_graph" + databricks: "https://www.databricks.com" + databricks_unity_catalog: "https://docs.databricks.com/en/data-governance/unity-catalog/index.html" databend: "https://databend.rs" databend_rest: "https://databend.rs/doc/integrations/api/rest" databend_cloud: "https://www.databend.com" From de47d26db48a9ad9446f4d792cc5fb600e4d45fb Mon Sep 17 00:00:00 2001 From: Flavio Cruz Date: Tue, 21 Apr 2026 21:02:00 +0000 Subject: [PATCH 2/8] Some cosmetic changes --- src/sinks/databricks_zerobus/config.rs | 1 - src/sinks/databricks_zerobus/service.rs | 2 +- .../unity_catalog_schema.rs | 23 +++++++++++-------- .../configuration/sinks/databricks_zerobus.md | 14 +++++++++++ .../components/sinks/databricks_zerobus.cue | 16 ++++--------- 5 files changed, 34 insertions(+), 22 deletions(-) create mode 100644 website/content/en/docs/reference/configuration/sinks/databricks_zerobus.md diff --git a/src/sinks/databricks_zerobus/config.rs b/src/sinks/databricks_zerobus/config.rs index bfbdf550d8934..469b5eb83328f 100644 --- a/src/sinks/databricks_zerobus/config.rs +++ b/src/sinks/databricks_zerobus/config.rs @@ -294,7 +294,6 @@ const fn default_server_ack_timeout_ms() -> u64 { 60000 } - #[cfg(test)] mod tests { use super::*; diff --git a/src/sinks/databricks_zerobus/service.rs b/src/sinks/databricks_zerobus/service.rs index 0dfa968bf40a0..51597026a5817 100644 --- a/src/sinks/databricks_zerobus/service.rs +++ b/src/sinks/databricks_zerobus/service.rs @@ -1,5 +1,6 @@ //! Zerobus service wrapper for Vector sink integration. 
+use crate::sinks::util::retries::RetryLogic; use databricks_zerobus_ingest_sdk::{TableProperties, ZerobusSdk, ZerobusStream}; use futures::future::BoxFuture; use std::sync::Arc; @@ -9,7 +10,6 @@ use tracing::warn; use vector_lib::finalization::{EventFinalizers, Finalizable}; use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; use vector_lib::stream::DriverResponse; -use crate::sinks::util::retries::RetryLogic; use super::{config::ZerobusSinkConfig, error::ZerobusSinkError, unity_catalog_schema}; diff --git a/src/sinks/databricks_zerobus/unity_catalog_schema.rs b/src/sinks/databricks_zerobus/unity_catalog_schema.rs index 2d540c13da582..9a2863f270dcd 100644 --- a/src/sinks/databricks_zerobus/unity_catalog_schema.rs +++ b/src/sinks/databricks_zerobus/unity_catalog_schema.rs @@ -37,12 +37,11 @@ pub async fn fetch_table_schema( client_secret: &str, proxy: &ProxyConfig, ) -> Result { - let http_client = - HttpClient::new(TlsSettings::default(), proxy).map_err(|e| { - ZerobusSinkError::ConfigError { - message: format!("Failed to create HTTP client: {}", e), - } - })?; + let http_client = HttpClient::new(TlsSettings::default(), proxy).map_err(|e| { + ZerobusSinkError::ConfigError { + message: format!("Failed to create HTTP client: {}", e), + } + })?; // First, get OAuth token let token = get_oauth_token( @@ -360,7 +359,13 @@ fn sanitize_package_name(name: &str) -> String { .map(|segment| { let mut s: String = segment .chars() - .map(|c| if c.is_alphanumeric() || c == '_' { c } else { '_' }) + .map(|c| { + if c.is_alphanumeric() || c == '_' { + c + } else { + '_' + } + }) .collect(); if s.is_empty() || !s.starts_with(|c: char| c.is_alphabetic()) { s.insert(0, PACKAGE_SEGMENT_PREFIX); @@ -403,8 +408,8 @@ mod tests { ], }; - let descriptor = generate_descriptor_from_schema(&schema) - .expect("descriptor should be generated"); + let descriptor = + generate_descriptor_from_schema(&schema).expect("descriptor should be generated"); 
assert_eq!(descriptor.fields().len(), 2); assert!(descriptor.get_field_by_name("id").is_some()); assert!(descriptor.get_field_by_name("body").is_some()); diff --git a/website/content/en/docs/reference/configuration/sinks/databricks_zerobus.md b/website/content/en/docs/reference/configuration/sinks/databricks_zerobus.md new file mode 100644 index 0000000000000..70275b4141504 --- /dev/null +++ b/website/content/en/docs/reference/configuration/sinks/databricks_zerobus.md @@ -0,0 +1,14 @@ +--- +title: Databricks Zerobus +description: Stream observability data to Databricks Unity Catalog via Zerobus +component_kind: sink +layout: component +tags: ["databricks", "component", "sink", "logs"] +--- + +{{/* +This doc is generated using: + +1. The template in layouts/docs/component.html +2. The relevant CUE data in cue/reference/components/... +*/}} diff --git a/website/cue/reference/components/sinks/databricks_zerobus.cue b/website/cue/reference/components/sinks/databricks_zerobus.cue index 6329bd4fb75c2..8a97e5b2205c0 100644 --- a/website/cue/reference/components/sinks/databricks_zerobus.cue +++ b/website/cue/reference/components/sinks/databricks_zerobus.cue @@ -86,15 +86,9 @@ components: sinks: databricks_zerobus: { The sink requires a schema to encode events into protobuf format. The sink automatically fetches the table schema from the Unity Catalog API - at startup, ensuring the schema always matches the target table. - - ```yaml - sinks: - zerobus: - type: databricks_zerobus - schema: - type: unity_catalog - ``` + at startup using the configured `table_name` and `unity_catalog_endpoint`, + ensuring the schema always matches the target table. No additional schema + configuration is required. """ } @@ -135,8 +129,8 @@ components: sinks: databricks_zerobus: { `http_proxy`, and their uppercase equivalents). Set these environment variables if your environment requires egress through an HTTP proxy. 
- The Unity Catalog schema discovery requests (used when `schema.type` is set to - `unity_catalog`) do respect Vector's runtime proxy configuration. + The Unity Catalog schema discovery requests do respect Vector's runtime proxy + configuration. """ } From 1f559d01b397ddd3c77ff9162f12311419e4b15c Mon Sep 17 00:00:00 2001 From: Flavio Cruz Date: Tue, 21 Apr 2026 21:32:43 +0000 Subject: [PATCH 3/8] Add generated file --- Cargo.lock | 23 +- Cargo.toml | 2 +- LICENSE-3rdparty.csv | 2 + src/sinks/databricks_zerobus/config.rs | 41 ++- src/sinks/databricks_zerobus/service.rs | 58 +++- .../components/sinks/databricks_zerobus.cue | 23 +- .../components/sinks/generated/clickhouse.cue | 32 +- .../sinks/generated/databricks_zerobus.cue | 322 ++++++++++++++++++ 8 files changed, 457 insertions(+), 46 deletions(-) create mode 100644 website/cue/reference/components/sinks/generated/databricks_zerobus.cue diff --git a/Cargo.lock b/Cargo.lock index da8f8943c24f5..f57d4907b7f7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1824,7 +1824,7 @@ dependencies = [ "bitflags 2.10.0", "cexpr", "clang-sys", - "itertools 0.13.0", + "itertools 0.11.0", "proc-macro2 1.0.106", "quote 1.0.44", "regex", @@ -3230,8 +3230,8 @@ dependencies = [ [[package]] name = "databricks-zerobus-ingest-sdk" -version = "1.1.0" -source = "git+https://github.com/databricks/zerobus-sdk-rs?rev=a963e81#a963e81d6f3da61b8436714b1f23a3c64d663c33" +version = "1.2.0" +source = "git+https://github.com/databricks/zerobus-sdk-rs?rev=082928884828d9012bd988b2863f1670f097b466#082928884828d9012bd988b2863f1670f097b466" dependencies = [ "async-trait", "bytes", @@ -5149,9 +5149,12 @@ dependencies = [ "headers 0.4.1", "http 1.3.1", "hyper 1.7.0", + "hyper-rustls 0.27.5", "hyper-util", "pin-project-lite", + "rustls-native-certs 0.7.0", "tokio", + "tokio-rustls 0.26.2", "tower-service", ] @@ -8369,7 +8372,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck 0.5.0", - "itertools 0.14.0", + "itertools 0.10.5", "log", "multimap", "once_cell", @@ -8415,7 +8418,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.11.0", "proc-macro2 1.0.106", "quote 1.0.44", "syn 2.0.117", @@ -9653,7 +9656,7 @@ dependencies = [ "once_cell", "ring", "rustls-pki-types", - "rustls-webpki 0.103.13", + "rustls-webpki 0.103.12", "subtle", "zeroize", ] @@ -9735,9 +9738,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.13" +version = "0.103.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e" +checksum = "8279bb85272c9f10811ae6a6c547ff594d6a7f3c6c6b02ee9726d1d0dcfcdd06" dependencies = [ "ring", "rustls-pki-types", @@ -13139,8 +13142,8 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "vrl" -version = "0.32.0" -source = "git+https://github.com/vectordotdev/vrl.git?branch=main#8a5b4711f37d0b3bd4d0d2ea8bbb500942bd8e98" +version = "0.31.0" +source = "git+https://github.com/vectordotdev/vrl.git?branch=main#1357941d4481e84fb0323bd89c69cedf35c323f8" dependencies = [ "aes", "aes-siv", diff --git a/Cargo.toml b/Cargo.toml index 612ff2ecdeb55..2cc2cea689139 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -332,7 +332,7 @@ prost-reflect = { workspace = true, optional = true } prost-types = { workspace = true, optional = true } # Databricks Zerobus -databricks-zerobus-ingest-sdk = { git = "https://github.com/databricks/zerobus-sdk-rs", rev = "a963e81", optional = true } +databricks-zerobus-ingest-sdk = { git = "https://github.com/databricks/zerobus-sdk-rs", rev = "082928884828d9012bd988b2863f1670f097b466", optional = true } # GCP goauth = { version = 
"0.16.0", optional = true } diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index eb9e179e57c2b..be21bc0d4a114 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -215,6 +215,7 @@ dashmap,https://github.com/xacrimon/dashmap,MIT,Acrimon data-url,https://github.com/servo/rust-url,MIT OR Apache-2.0,Simon Sapin databend-client,https://github.com/databendlabs/bendsql,Apache-2.0,Databend Authors +databricks-zerobus-ingest-sdk,https://github.com/databricks/zerobus-sdk,Apache-2.0,Databricks dbl,https://github.com/RustCrypto/utils,MIT OR Apache-2.0,RustCrypto Developers deadpool,https://github.com/deadpool-rs/deadpool,MIT OR Apache-2.0,Michael P. Jung deadpool-runtime,https://github.com/deadpool-rs/deadpool,MIT OR Apache-2.0,Michael P. Jung @@ -349,6 +350,7 @@ httparse,https://github.com/seanmonstar/httparse,MIT OR Apache-2.0,Sean McArthur httpdate,https://github.com/pyfisch/httpdate,MIT OR Apache-2.0,Pyfisch humantime,https://github.com/chronotope/humantime,MIT OR Apache-2.0,The humantime Authors hyper,https://github.com/hyperium/hyper,MIT,Sean McArthur +hyper-http-proxy,https://github.com/metalbear-co/hyper-http-proxy,MIT,MetalBear Tech LTD hyper-named-pipe,https://github.com/fussybeaver/hyper-named-pipe,Apache-2.0,The hyper-named-pipe Authors hyper-openssl,https://github.com/sfackler/hyper-openssl,MIT OR Apache-2.0,Steven Fackler hyper-proxy,https://github.com/tafia/hyper-proxy,MIT,Johann Tuffe diff --git a/src/sinks/databricks_zerobus/config.rs b/src/sinks/databricks_zerobus/config.rs index 469b5eb83328f..c686a4aa0c0c3 100644 --- a/src/sinks/databricks_zerobus/config.rs +++ b/src/sinks/databricks_zerobus/config.rs @@ -23,6 +23,9 @@ use super::{ #[configurable_component] #[derive(Clone, Debug)] #[serde(tag = "strategy", rename_all = "snake_case")] +#[configurable(metadata( + docs::enum_tag_description = "The authentication strategy to use for Databricks." +))] pub enum DatabricksAuthentication { /// Authenticate using OAuth 2.0 client credentials. 
#[serde(rename = "oauth")] @@ -192,8 +195,13 @@ impl SinkConfig for ZerobusSinkConfig { .merge_default(&cx.globals.acknowledgements) .enabled(); - let service = - ZerobusService::new(self.clone(), stream_mode, acknowledgements_enabled).await?; + let service = ZerobusService::new( + self.clone(), + stream_mode, + acknowledgements_enabled, + cx.proxy(), + ) + .await?; let healthcheck_service = service.clone(); let request_limits = self.request.into_settings(); @@ -236,9 +244,10 @@ impl ZerobusSinkConfig { }); } - if self.table_name.matches('.').count() != 2 { + let parts: Vec<&str> = self.table_name.split('.').collect(); + if parts.len() != 3 || parts.iter().any(|p| p.is_empty()) { return Err(ZerobusSinkError::ConfigError { - message: "table_name must be in format 'catalog.schema.table' (exactly 3 parts)" + message: "table_name must be in format 'catalog.schema.table' (exactly 3 non-empty parts)" .to_string(), }); } @@ -375,6 +384,30 @@ mod tests { } } + #[test] + fn test_config_validation_table_name_empty_segments() { + for bad in [ + "catalog..table", + ".schema.table", + "catalog.schema.", + "..", + "catalog.schema.table.extra", + ] { + let mut config = create_test_config(); + config.table_name = bad.to_string(); + let result = config.validate(); + assert!(result.is_err(), "expected error for table_name={bad:?}"); + if let Err(crate::sinks::databricks_zerobus::error::ZerobusSinkError::ConfigError { + message, + }) = result + { + assert!(message.contains("catalog.schema.table")); + } else { + panic!("Expected ConfigError for table_name={bad:?}"); + } + } + } + #[test] fn test_config_validation_empty_unity_catalog_endpoint() { let mut config = create_test_config(); diff --git a/src/sinks/databricks_zerobus/service.rs b/src/sinks/databricks_zerobus/service.rs index 51597026a5817..5fa9fb911e6c1 100644 --- a/src/sinks/databricks_zerobus/service.rs +++ b/src/sinks/databricks_zerobus/service.rs @@ -1,7 +1,10 @@ //! Zerobus service wrapper for Vector sink integration. 
+use crate::config::ProxyConfig; use crate::sinks::util::retries::RetryLogic; -use databricks_zerobus_ingest_sdk::{TableProperties, ZerobusSdk, ZerobusStream}; +use databricks_zerobus_ingest_sdk::{ + ConnectorFactory, ProxyConnector, TableProperties, ZerobusSdk, ZerobusStream, +}; use futures::future::BoxFuture; use std::sync::Arc; use tokio::sync::Mutex; @@ -13,6 +16,43 @@ use vector_lib::stream::DriverResponse; use super::{config::ZerobusSinkConfig, error::ZerobusSinkError, unity_catalog_schema}; +/// Build a connector factory that routes Zerobus gRPC traffic through +/// Vector's configured proxy, honoring `no_proxy` rules. +/// +/// The Zerobus endpoint is always HTTPS gRPC, so the `https` proxy is +/// preferred; the `http` proxy is used as a fallback if only that is set. +/// When the factory is installed on the SDK it fully replaces the SDK's +/// default env-var proxy detection — which is intentional, since Vector's +/// `ProxyConfig` has already merged the process environment at a higher +/// layer and is the single source of truth for proxy decisions. +/// +/// Returns `None` when `enabled = false` or no proxy URL is configured, in +/// which case the SDK falls back to its built-in env-var detection. Because +/// Vector has already merged env vars into `ProxyConfig`, reaching that +/// fallback path means the user has not configured a proxy anywhere. +fn build_connector_factory(proxy: &ProxyConfig) -> Option { + if !proxy.enabled { + return None; + } + let proxy_url = proxy.https.clone().or_else(|| proxy.http.clone())?; + let no_proxy = proxy.no_proxy.clone(); + Some(Arc::new(move |host: &str| { + if no_proxy.matches(host) { + return None; + } + match ProxyConnector::new(&proxy_url) { + Ok(c) => Some(c), + Err(e) => { + warn!( + message = "Failed to build Zerobus proxy connector; falling back to direct connection.", + error = %e, + ); + None + } + } + })) +} + /// The payload for a Zerobus request. 
/// /// The zerobus sink only supports proto-encoded records. @@ -161,15 +201,17 @@ impl ZerobusService { config: ZerobusSinkConfig, stream_mode: StreamMode, require_acknowledgements: bool, + proxy: &ProxyConfig, ) -> Result { - // Create SDK instance - let sdk = ZerobusSdk::builder() + let mut builder = ZerobusSdk::builder() .endpoint(&config.ingestion_endpoint) - .unity_catalog_url(&config.unity_catalog_endpoint) - .build() - .map_err(|e| ZerobusSinkError::ConfigError { - message: format!("Failed to create Zerobus SDK: {}", e), - })?; + .unity_catalog_url(&config.unity_catalog_endpoint); + if let Some(factory) = build_connector_factory(proxy) { + builder = builder.connector_factory(factory); + } + let sdk = builder.build().map_err(|e| ZerobusSinkError::ConfigError { + message: format!("Failed to create Zerobus SDK: {}", e), + })?; Ok(Self { sdk: Arc::new(sdk), diff --git a/website/cue/reference/components/sinks/databricks_zerobus.cue b/website/cue/reference/components/sinks/databricks_zerobus.cue index 8a97e5b2205c0..9a3c773416bf7 100644 --- a/website/cue/reference/components/sinks/databricks_zerobus.cue +++ b/website/cue/reference/components/sinks/databricks_zerobus.cue @@ -25,7 +25,7 @@ components: sinks: databricks_zerobus: { } compression: enabled: false encoding: enabled: false - proxy: enabled: false + proxy: enabled: true request: { enabled: true headers: false @@ -123,14 +123,19 @@ components: sinks: databricks_zerobus: { proxy: { title: "Proxy" body: """ - Vector's `proxy` configuration is not supported for this sink because the - underlying Zerobus SDK manages its own gRPC connections. The SDK reads proxy - settings from standard environment variables (`grpc_proxy`, `https_proxy`, - `http_proxy`, and their uppercase equivalents). Set these environment variables - if your environment requires egress through an HTTP proxy. - - The Unity Catalog schema discovery requests do respect Vector's runtime proxy - configuration. 
+ Both the Zerobus gRPC ingestion channel and the Unity Catalog schema + discovery requests honor Vector's `proxy` configuration (`proxy.http`, + `proxy.https`, `proxy.no_proxy`), which itself is merged with the standard + `HTTP_PROXY`, `HTTPS_PROXY`, and `NO_PROXY` environment variables. + + Because the Zerobus endpoint is always HTTPS gRPC, the `proxy.https` URL is + used when set; `proxy.http` is used as a fallback only if `proxy.https` is + not configured. Hosts matching `proxy.no_proxy` bypass the proxy. Both + `http://` and `https://` proxy URIs are supported — for HTTPS proxies, the + client-to-proxy hop does its own TLS handshake using the system trust store. + + Setting `proxy.enabled = false` disables proxying entirely, including the + SDK's built-in env-var fallback. """ } diff --git a/website/cue/reference/components/sinks/generated/clickhouse.cue b/website/cue/reference/components/sinks/generated/clickhouse.cue index a0f4399bdc112..a4799ecee18ec 100644 --- a/website/cue/reference/components/sinks/generated/clickhouse.cue +++ b/website/cue/reference/components/sinks/generated/clickhouse.cue @@ -263,27 +263,31 @@ generated: components: sinks: clickhouse: configuration: { When disabled (default), missing values for non-nullable fields will cause encoding errors, ensuring all required data is present before sending to the sink. """ - required: false + relevant_when: "codec = \"arrow_stream\"" + required: false type: bool: default: false } codec: { - description: """ - Encodes events in [Apache Arrow][apache_arrow] IPC streaming format. + description: "The codec to use for batch encoding events." + required: true + type: string: enum: { + arrow_stream: """ + Encodes events in [Apache Arrow][apache_arrow] IPC streaming format. - This is the streaming variant of the Arrow IPC format, which writes - a continuous stream of record batches. + This is the streaming variant of the Arrow IPC format, which writes + a continuous stream of record batches. 
- [apache_arrow]: https://arrow.apache.org/ - """ - required: true - type: string: enum: arrow_stream: """ - Encodes events in [Apache Arrow][apache_arrow] IPC streaming format. + [apache_arrow]: https://arrow.apache.org/ + """ + proto_batch: """ + Encodes each event individually as a [Protocol Buffers][protobuf] message. - This is the streaming variant of the Arrow IPC format, which writes - a continuous stream of record batches. + Each event in the batch is serialized to protobuf bytes independently, + producing a list of byte buffers (one per event). - [apache_arrow]: https://arrow.apache.org/ - """ + [protobuf]: https://protobuf.dev/ + """ + } } } } diff --git a/website/cue/reference/components/sinks/generated/databricks_zerobus.cue b/website/cue/reference/components/sinks/generated/databricks_zerobus.cue new file mode 100644 index 0000000000000..b82a660028b76 --- /dev/null +++ b/website/cue/reference/components/sinks/generated/databricks_zerobus.cue @@ -0,0 +1,322 @@ +package metadata + +generated: components: sinks: databricks_zerobus: configuration: { + acknowledgements: { + description: """ + Controls how acknowledgements are handled for this sink. + + See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled. + + [e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/ + """ + required: false + type: object: options: enabled: { + description: """ + Controls whether or not end-to-end acknowledgements are enabled. + + When enabled for a sink, any source that supports end-to-end + acknowledgements that is connected to that sink waits for events + to be acknowledged by **all connected sinks** before acknowledging them at the source. + + Enabling or disabling acknowledgements at the sink level takes precedence over any global + [`acknowledgements`][global_acks] configuration. 
+ + [global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements + """ + required: false + type: bool: {} + } + } + auth: { + description: "Databricks authentication configuration." + required: true + type: object: options: { + client_id: { + description: "OAuth 2.0 client ID." + required: true + type: string: examples: ["${DATABRICKS_CLIENT_ID}", "abc123..."] + } + client_secret: { + description: "OAuth 2.0 client secret." + required: true + type: string: examples: ["${DATABRICKS_CLIENT_SECRET}", "secret123..."] + } + strategy: { + description: "The authentication strategy to use for Databricks." + required: true + type: string: enum: oauth: "Authenticate using OAuth 2.0 client credentials." + } + } + } + batch: { + description: "Event batching behavior." + required: false + type: object: options: { + max_bytes: { + description: """ + The maximum size of a batch that is processed by a sink. + + This is based on the uncompressed size of the batched events, before they are + serialized or compressed. + """ + required: false + type: uint: { + default: 10000000 + unit: "bytes" + } + } + max_events: { + description: "The maximum size of a batch before it is flushed." + required: false + type: uint: unit: "events" + } + timeout_secs: { + description: "The maximum age of a batch before it is flushed." + required: false + type: float: { + default: 1.0 + unit: "seconds" + } + } + } + } + ingestion_endpoint: { + description: """ + The Zerobus ingestion endpoint URL. + + This should be the full URL to the Zerobus ingestion service. + """ + required: true + type: string: examples: ["https://ingest.dev.databricks.com", "https://ingest.prod.databricks.com"] + } + request: { + description: """ + Middleware settings for outbound requests. + + Various settings can be configured, such as concurrency and rate limits, timeouts, and retry behavior. + + Note that the retry backoff policy follows the Fibonacci sequence. 
+ """ + required: false + type: object: options: { + adaptive_concurrency: { + description: """ + Configuration of adaptive concurrency parameters. + + These parameters typically do not require changes from the default, and incorrect values can lead to meta-stable or + unstable performance and sink behavior. Proceed with caution. + """ + required: false + type: object: options: { + decrease_ratio: { + description: """ + The fraction of the current value to set the new concurrency limit when decreasing the limit. + + Valid values are greater than `0` and less than `1`. Smaller values cause the algorithm to scale back rapidly + when latency increases. + + **Note**: The new limit is rounded down after applying this ratio. + """ + required: false + type: float: default: 0.9 + } + ewma_alpha: { + description: """ + The weighting of new measurements compared to older measurements. + + Valid values are greater than `0` and less than `1`. + + ARC uses an exponentially weighted moving average (EWMA) of past RTT measurements as a reference to compare with + the current RTT. Smaller values cause this reference to adjust more slowly, which may be useful if a service has + unusually high response variability. + """ + required: false + type: float: default: 0.4 + } + initial_concurrency: { + description: """ + The initial concurrency limit to use. If not specified, the initial limit is 1 (no concurrency). + + Datadog recommends setting this value to your service's average limit if you're seeing that it takes a + long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the + `adaptive_concurrency_limit` metric. + """ + required: false + type: uint: default: 1 + } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit does not go above this bound. This is put in place as a safeguard. 
+ """ + required: false + type: uint: default: 200 + } + rtt_deviation_scale: { + description: """ + Scale of RTT deviations which are not considered anomalous. + + Valid values are greater than or equal to `0`, and reasonable values range from `1.0` to `3.0`. + + When calculating the past RTT average, a secondary “deviation” value is also computed that indicates how variable + those values are. That deviation is used when comparing the past RTT average to the current measurements, so we + can ignore increases in RTT that are within an expected range. This factor is used to scale up the deviation to + an appropriate range. Larger values cause the algorithm to ignore larger increases in the RTT. + """ + required: false + type: float: default: 2.5 + } + } + } + concurrency: { + description: """ + Configuration for outbound request concurrency. + + This can be set either to one of the below enum values or to a positive integer, which denotes + a fixed concurrency limit. + """ + required: false + type: { + string: { + default: "adaptive" + enum: { + adaptive: """ + Concurrency is managed by Vector's [Adaptive Request Concurrency][arc] feature. + + [arc]: https://vector.dev/docs/architecture/arc/ + """ + none: """ + A fixed concurrency of 1. + + Only one request can be outstanding at any given time. + """ + } + } + uint: {} + } + } + rate_limit_duration_secs: { + description: "The time window used for the `rate_limit_num` option." + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + rate_limit_num: { + description: "The maximum number of requests allowed within the `rate_limit_duration_secs` time window." + required: false + type: uint: { + default: 9223372036854775807 + unit: "requests" + } + } + retry_attempts: { + description: "The maximum number of retries to make for failed requests." 
+ required: false + type: uint: { + default: 9223372036854775807 + unit: "retries" + } + } + retry_initial_backoff_secs: { + description: """ + The amount of time to wait before attempting the first retry for a failed request. + + After the first retry has failed, the Fibonacci sequence is used to select future backoffs. + """ + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + retry_jitter_mode: { + description: "The jitter mode to use for retry backoff behavior." + required: false + type: string: { + default: "Full" + enum: { + Full: """ + Full jitter. + + The random delay is anywhere from 0 up to the maximum current delay calculated by the backoff + strategy. + + Incorporating full jitter into your backoff strategy can greatly reduce the likelihood + of creating accidental denial of service (DoS) conditions against your own systems when + many clients are recovering from a failure state. + """ + None: "No jitter." + } + } + } + retry_max_duration_secs: { + description: "The maximum amount of time to wait between retries." + required: false + type: uint: { + default: 30 + unit: "seconds" + } + } + timeout_secs: { + description: """ + The time a request can take before being aborted. + + Datadog highly recommends that you do not lower this value below the service's internal timeout, as this could + create orphaned requests, pile on retries, and result in duplicate data downstream. + """ + required: false + type: uint: { + default: 60 + unit: "seconds" + } + } + } + } + stream_options: { + description: """ + Zerobus stream configuration options. + + Zerobus stream configuration options. + """ + required: false + type: object: options: { + flush_timeout_ms: { + description: "Timeout in milliseconds for flush operations." + required: false + type: uint: { + default: 30000 + examples: [30000] + } + } + server_lack_of_ack_timeout_ms: { + description: "Timeout in milliseconds for server acknowledgements." 
+ required: false + type: uint: { + default: 60000 + examples: [60000] + } + } + } + } + table_name: { + description: """ + The Unity Catalog table name to write to. + + This should be in the format `catalog.schema.table`. + """ + required: true + type: string: examples: ["logging_platform.my_team.logs", "main.default.vector_logs"] + } + unity_catalog_endpoint: { + description: """ + The Unity Catalog endpoint URL. + + This is used for authentication and table metadata. + """ + required: true + type: string: examples: ["https://dbc-e2f0eb31-2b0e.staging.cloud.databricks.com", "https://your-workspace.cloud.databricks.com"] + } +} From 07a23d277890c0cbc007b5e6e5e94491897f54f3 Mon Sep 17 00:00:00 2001 From: Flavio Cruz Date: Wed, 22 Apr 2026 16:50:16 +0000 Subject: [PATCH 4/8] Force direct connection when proxy is disabled to avoid the sdk to pick up the environment variables --- src/sinks/databricks_zerobus/service.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/sinks/databricks_zerobus/service.rs b/src/sinks/databricks_zerobus/service.rs index 5fa9fb911e6c1..660bc16e3d497 100644 --- a/src/sinks/databricks_zerobus/service.rs +++ b/src/sinks/databricks_zerobus/service.rs @@ -26,13 +26,21 @@ use super::{config::ZerobusSinkConfig, error::ZerobusSinkError, unity_catalog_sc /// `ProxyConfig` has already merged the process environment at a higher /// layer and is the single source of truth for proxy decisions. /// -/// Returns `None` when `enabled = false` or no proxy URL is configured, in -/// which case the SDK falls back to its built-in env-var detection. Because -/// Vector has already merged env vars into `ProxyConfig`, reaching that -/// fallback path means the user has not configured a proxy anywhere. 
+/// When `enabled = false`, returns a factory that unconditionally yields +/// `None`, forcing the SDK to use direct connections and overriding any +/// ambient `HTTP_PROXY`/`HTTPS_PROXY` environment variables that would +/// otherwise be picked up by the SDK's default autodetection. +/// +/// When `enabled = true` but no proxy URL is configured, returns `None` so +/// the SDK falls back to its built-in env-var detection. Because Vector has +/// already merged env vars into `ProxyConfig`, reaching that fallback path +/// means the user has not configured a proxy anywhere. fn build_connector_factory(proxy: &ProxyConfig) -> Option { if !proxy.enabled { - return None; + // Explicit direct-connection factory: prevents the SDK from + // autodetecting proxies from the process environment when the user + // has explicitly disabled proxying in Vector's config. + return Some(Arc::new(|_host: &str| None)); } let proxy_url = proxy.https.clone().or_else(|| proxy.http.clone())?; let no_proxy = proxy.no_proxy.clone(); From ef5d13ac0f5652e2ee4559dddd964b59b75db486 Mon Sep 17 00:00:00 2001 From: Flavio Cruz Date: Wed, 22 Apr 2026 17:38:34 +0000 Subject: [PATCH 5/8] Throw a config error if proxy url is invalid. Use direct connection if no URL is set up --- src/sinks/databricks_zerobus/service.rs | 59 ++++++++++--------------- 1 file changed, 24 insertions(+), 35 deletions(-) diff --git a/src/sinks/databricks_zerobus/service.rs b/src/sinks/databricks_zerobus/service.rs index 660bc16e3d497..46d7646393b44 100644 --- a/src/sinks/databricks_zerobus/service.rs +++ b/src/sinks/databricks_zerobus/service.rs @@ -21,43 +21,34 @@ use super::{config::ZerobusSinkConfig, error::ZerobusSinkError, unity_catalog_sc /// /// The Zerobus endpoint is always HTTPS gRPC, so the `https` proxy is /// preferred; the `http` proxy is used as a fallback if only that is set.
-/// When the factory is installed on the SDK it fully replaces the SDK's -/// default env-var proxy detection — which is intentional, since Vector's -/// `ProxyConfig` has already merged the process environment at a higher -/// layer and is the single source of truth for proxy decisions. +/// The returned factory fully replaces the SDK's default env-var proxy +/// detection — Vector's `ProxyConfig` has already merged the process +/// environment at a higher layer and is the single source of truth. /// -/// When `enabled = false`, returns a factory that unconditionally yields -/// `None`, forcing the SDK to use direct connections and overriding any -/// ambient `HTTP_PROXY`/`HTTPS_PROXY` environment variables that would -/// otherwise be picked up by the SDK's default autodetection. -/// -/// When `enabled = true` but no proxy URL is configured, returns `None` so -/// the SDK falls back to its built-in env-var detection. Because Vector has -/// already merged env vars into `ProxyConfig`, reaching that fallback path -/// means the user has not configured a proxy anywhere. -fn build_connector_factory(proxy: &ProxyConfig) -> Option { - if !proxy.enabled { - // Explicit direct-connection factory: prevents the SDK from - // autodetecting proxies from the process environment when the user - // has explicitly disabled proxying in Vector's config. - return Some(Arc::new(|_host: &str| None)); - } - let proxy_url = proxy.https.clone().or_else(|| proxy.http.clone())?; +/// When proxying is disabled or no proxy URL is configured, returns a +/// factory that unconditionally yields `None`, forcing direct connections. +/// Returns an error if the configured proxy URL is malformed, so the +/// problem surfaces at sink startup rather than per-connection. 
+fn build_connector_factory(proxy: &ProxyConfig) -> Result { + let proxy_url = if proxy.enabled { + proxy.https.clone().or_else(|| proxy.http.clone()) + } else { + None + }; + let Some(proxy_url) = proxy_url else { + return Ok(Arc::new(|_host: &str| None)); + }; + // Validate the proxy URL once up-front so a malformed value surfaces at + // sink startup rather than per-connection. + ProxyConnector::new(&proxy_url).map_err(|e| ZerobusSinkError::ConfigError { + message: format!("Invalid proxy URL '{}': {}", proxy_url, e), + })?; let no_proxy = proxy.no_proxy.clone(); - Some(Arc::new(move |host: &str| { + Ok(Arc::new(move |host: &str| { if no_proxy.matches(host) { return None; } - match ProxyConnector::new(&proxy_url) { - Ok(c) => Some(c), - Err(e) => { - warn!( - message = "Failed to build Zerobus proxy connector; falling back to direct connection.", - error = %e, - ); - None - } - } + ProxyConnector::new(&proxy_url).ok() })) } @@ -214,9 +205,7 @@ impl ZerobusService { let mut builder = ZerobusSdk::builder() .endpoint(&config.ingestion_endpoint) .unity_catalog_url(&config.unity_catalog_endpoint); - if let Some(factory) = build_connector_factory(proxy) { - builder = builder.connector_factory(factory); - } + builder = builder.connector_factory(build_connector_factory(proxy)?); let sdk = builder.build().map_err(|e| ZerobusSinkError::ConfigError { message: format!("Failed to create Zerobus SDK: {}", e), })?; From b1b2566cf95adb7acfe7ff9cf55aedf68144c243 Mon Sep 17 00:00:00 2001 From: Flavio Cruz Date: Wed, 22 Apr 2026 18:54:47 +0000 Subject: [PATCH 6/8] Fix the clickhouse sink so that it only accepts arrow --- src/sinks/clickhouse/config.rs | 38 +++++++++++++------ src/sinks/clickhouse/integration_tests.rs | 14 +++---- src/sinks/databricks_zerobus/config.rs | 7 ++-- .../components/sinks/generated/clickhouse.cue | 32 +++++++--------- 4 files changed, 50 insertions(+), 41 deletions(-) diff --git a/src/sinks/clickhouse/config.rs b/src/sinks/clickhouse/config.rs index 
ff243d46bfb68..3c161006e9ed0 100644 --- a/src/sinks/clickhouse/config.rs +++ b/src/sinks/clickhouse/config.rs @@ -5,7 +5,7 @@ use std::fmt; use http::{Request, StatusCode, Uri}; use hyper::Body; use vector_lib::codecs::encoding::format::SchemaProvider; -use vector_lib::codecs::encoding::{ArrowStreamSerializerConfig, BatchSerializerConfig}; +use vector_lib::codecs::encoding::ArrowStreamSerializerConfig; use super::{ request_builder::ClickhouseRequestBuilder, @@ -45,6 +45,25 @@ pub enum Format { ArrowStream, } +/// Batch encoding configuration for the `clickhouse` sink. +/// +/// ClickHouse only supports the `arrow_stream` codec for batch encoding. +#[configurable_component] +#[derive(Clone, Debug)] +#[serde(tag = "codec", rename_all = "snake_case")] +#[configurable(metadata( + docs::enum_tag_description = "The codec to use for batch encoding events." +))] +pub enum ClickhouseBatchEncoding { + /// Encodes events in [Apache Arrow][apache_arrow] IPC streaming format. + /// + /// This is the streaming variant of the Arrow IPC format, which writes + /// a continuous stream of record batches. + /// + /// [apache_arrow]: https://arrow.apache.org/ + ArrowStream(ArrowStreamSerializerConfig), +} + impl fmt::Display for Format { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -106,7 +125,7 @@ pub struct ClickhouseConfig { /// This is mutually exclusive with per-event encoding based on the `format` field. 
#[configurable(derived)] #[serde(default)] - pub batch_encoding: Option, + pub batch_encoding: Option, #[configurable(derived)] #[serde(default)] @@ -280,6 +299,7 @@ impl ClickhouseConfig { if let Some(batch_encoding) = &self.batch_encoding { use vector_lib::codecs::BatchEncoder; + use vector_lib::codecs::encoding::BatchSerializerConfig; // Validate that batch_encoding is only compatible with ArrowStream format if self.format != Format::ArrowStream { @@ -290,14 +310,8 @@ impl ClickhouseConfig { .into()); } - let mut arrow_config = match batch_encoding { - BatchSerializerConfig::ArrowStream(config) => config.clone(), - _ => { - return Err( - "'batch_encoding' for ClickHouse must use 'arrow_stream' codec.".into(), - ); - } - }; + let ClickhouseBatchEncoding::ArrowStream(arrow_config) = batch_encoding; + let mut arrow_config = arrow_config.clone(); self.resolve_arrow_schema( client, @@ -432,7 +446,7 @@ mod tests { /// Helper to create a minimal ClickhouseConfig for testing fn create_test_config( format: Format, - batch_encoding: Option, + batch_encoding: Option, ) -> ClickhouseConfig { ClickhouseConfig { endpoint: "http://localhost:8123".parse::().unwrap().into(), @@ -465,7 +479,7 @@ mod tests { for (format, format_name) in incompatible_formats { let config = create_test_config( format, - Some(BatchSerializerConfig::ArrowStream( + Some(ClickhouseBatchEncoding::ArrowStream( ArrowStreamSerializerConfig::default(), )), ); diff --git a/src/sinks/clickhouse/integration_tests.rs b/src/sinks/clickhouse/integration_tests.rs index a32771bf55730..2e27a20340dfa 100644 --- a/src/sinks/clickhouse/integration_tests.rs +++ b/src/sinks/clickhouse/integration_tests.rs @@ -18,7 +18,7 @@ use serde::Deserialize; use serde_json::Value; use tokio::time::{Duration, timeout}; use vector_lib::{ - codecs::encoding::{ArrowStreamSerializerConfig, BatchSerializerConfig}, + codecs::encoding::ArrowStreamSerializerConfig, event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event, LogEvent}, 
lookup::PathPrefix, }; @@ -28,7 +28,7 @@ use crate::{ codecs::{TimestampFormat, Transformer}, config::{SinkConfig, SinkContext, log_schema}, sinks::{ - clickhouse::config::ClickhouseConfig, + clickhouse::config::{ClickhouseBatchEncoding, ClickhouseConfig}, util::{BatchConfig, Compression, TowerRequestConfig}, }, test_util::{ @@ -502,7 +502,7 @@ async fn insert_events_arrow_format() { table: table.clone().try_into().unwrap(), compression: Compression::None, format: crate::sinks::clickhouse::config::Format::ArrowStream, - batch_encoding: Some(BatchSerializerConfig::ArrowStream(Default::default())), + batch_encoding: Some(ClickhouseBatchEncoding::ArrowStream(Default::default())), batch, request: TowerRequestConfig { retry_attempts: 1, @@ -574,7 +574,7 @@ async fn insert_events_arrow_with_schema_fetching() { table: table.clone().try_into().unwrap(), compression: Compression::None, format: crate::sinks::clickhouse::config::Format::ArrowStream, - batch_encoding: Some(BatchSerializerConfig::ArrowStream(Default::default())), + batch_encoding: Some(ClickhouseBatchEncoding::ArrowStream(Default::default())), batch, request: TowerRequestConfig { retry_attempts: 1, @@ -657,7 +657,7 @@ async fn test_complex_types() { table: table.clone().try_into().unwrap(), compression: Compression::None, format: crate::sinks::clickhouse::config::Format::ArrowStream, - batch_encoding: Some(BatchSerializerConfig::ArrowStream(arrow_config)), + batch_encoding: Some(ClickhouseBatchEncoding::ArrowStream(arrow_config)), batch, request: TowerRequestConfig { retry_attempts: 1, @@ -1231,7 +1231,7 @@ async fn test_missing_required_field_emits_null_constraint_error() { table: table.clone().try_into().unwrap(), compression: Compression::None, format: crate::sinks::clickhouse::config::Format::ArrowStream, - batch_encoding: Some(BatchSerializerConfig::ArrowStream(Default::default())), + batch_encoding: Some(ClickhouseBatchEncoding::ArrowStream(Default::default())), batch, request: TowerRequestConfig { 
retry_attempts: 1, @@ -1323,7 +1323,7 @@ async fn arrow_schema_excludes_non_insertable_columns() { table: table.clone().try_into().unwrap(), compression: Compression::None, format: crate::sinks::clickhouse::config::Format::ArrowStream, - batch_encoding: Some(BatchSerializerConfig::ArrowStream( + batch_encoding: Some(ClickhouseBatchEncoding::ArrowStream( ArrowStreamSerializerConfig::default(), )), batch, diff --git a/src/sinks/databricks_zerobus/config.rs b/src/sinks/databricks_zerobus/config.rs index c686a4aa0c0c3..0f74b1d4e6f43 100644 --- a/src/sinks/databricks_zerobus/config.rs +++ b/src/sinks/databricks_zerobus/config.rs @@ -10,7 +10,7 @@ use crate::sinks::{ }; use vector_lib::codecs::encoding::{ - BatchEncoder, BatchSerializerConfig, ProtoBatchSerializerConfig, + BatchEncoder, BatchSerializer, ProtoBatchSerializer, ProtoBatchSerializerConfig, }; use super::{ @@ -185,10 +185,9 @@ impl SinkConfig for ZerobusSinkConfig { let proto_config = ProtoBatchSerializerConfig { descriptor: Some(descriptor), }; - let batch_serializer = BatchSerializerConfig::ProtoBatch(proto_config) - .build_batch_serializer() + let serializer = ProtoBatchSerializer::new(proto_config) .map_err(|e| format!("Failed to build batch serializer: {}", e))?; - let encoder = BatchEncoder::new(batch_serializer); + let encoder = BatchEncoder::new(BatchSerializer::ProtoBatch(serializer)); let acknowledgements_enabled = self .acknowledgements diff --git a/website/cue/reference/components/sinks/generated/clickhouse.cue b/website/cue/reference/components/sinks/generated/clickhouse.cue index a4799ecee18ec..a0f4399bdc112 100644 --- a/website/cue/reference/components/sinks/generated/clickhouse.cue +++ b/website/cue/reference/components/sinks/generated/clickhouse.cue @@ -263,31 +263,27 @@ generated: components: sinks: clickhouse: configuration: { When disabled (default), missing values for non-nullable fields will cause encoding errors, ensuring all required data is present before sending to the sink. 
""" - relevant_when: "codec = \"arrow_stream\"" - required: false + required: false type: bool: default: false } codec: { - description: "The codec to use for batch encoding events." - required: true - type: string: enum: { - arrow_stream: """ - Encodes events in [Apache Arrow][apache_arrow] IPC streaming format. + description: """ + Encodes events in [Apache Arrow][apache_arrow] IPC streaming format. - This is the streaming variant of the Arrow IPC format, which writes - a continuous stream of record batches. + This is the streaming variant of the Arrow IPC format, which writes + a continuous stream of record batches. - [apache_arrow]: https://arrow.apache.org/ - """ - proto_batch: """ - Encodes each event individually as a [Protocol Buffers][protobuf] message. + [apache_arrow]: https://arrow.apache.org/ + """ + required: true + type: string: enum: arrow_stream: """ + Encodes events in [Apache Arrow][apache_arrow] IPC streaming format. - Each event in the batch is serialized to protobuf bytes independently, - producing a list of byte buffers (one per event). + This is the streaming variant of the Arrow IPC format, which writes + a continuous stream of record batches. 
- [protobuf]: https://protobuf.dev/ - """ - } + [apache_arrow]: https://arrow.apache.org/ + """ } } } From 73f567a675cdb7bf93f22153ff758b78a10addc3 Mon Sep 17 00:00:00 2001 From: Flavio Cruz Date: Wed, 22 Apr 2026 20:02:27 +0000 Subject: [PATCH 7/8] Format the rust code --- src/sinks/clickhouse/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sinks/clickhouse/config.rs b/src/sinks/clickhouse/config.rs index 3c161006e9ed0..ec5da250d45e4 100644 --- a/src/sinks/clickhouse/config.rs +++ b/src/sinks/clickhouse/config.rs @@ -4,8 +4,8 @@ use std::fmt; use http::{Request, StatusCode, Uri}; use hyper::Body; -use vector_lib::codecs::encoding::format::SchemaProvider; use vector_lib::codecs::encoding::ArrowStreamSerializerConfig; +use vector_lib::codecs::encoding::format::SchemaProvider; use super::{ request_builder::ClickhouseRequestBuilder, From ea457d1b09d9863477635a3e8f17be2729b11b85 Mon Sep 17 00:00:00 2001 From: Flavio Cruz Date: Wed, 22 Apr 2026 21:21:53 +0000 Subject: [PATCH 8/8] Add test to confirm we always cap at 10MB for the batch size --- src/sinks/databricks_zerobus/config.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/sinks/databricks_zerobus/config.rs b/src/sinks/databricks_zerobus/config.rs index 0f74b1d4e6f43..98b4b1da8a073 100644 --- a/src/sinks/databricks_zerobus/config.rs +++ b/src/sinks/databricks_zerobus/config.rs @@ -446,6 +446,24 @@ mod tests { } } + /// When `batch.max_bytes` is `None` (user omitted the field or set it to `null`), + /// `into_batcher_settings()` must merge it against + /// `RealtimeSizeBasedDefaultBatchSettings::MAX_BYTES` (10MB) — never unbounded. + /// This guarantees the Zerobus SDK's 10MB limit cannot be exceeded at runtime + /// even without an explicit user cap. 
+ #[test] + fn test_batch_max_bytes_none_defaults_to_10mb() { + let mut config = create_test_config(); + config.batch.max_bytes = None; + + let settings = config + .batch + .into_batcher_settings() + .expect("batch settings should build"); + + assert_eq!(settings.size_limit, 10_000_000); + } + #[test] fn test_stream_options_conversion() { let options = ZerobusStreamOptions {