diff --git a/Cargo.lock b/Cargo.lock
index f92b955d32..ca5e02012a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2154,7 +2154,11 @@ name = "metric_engine"
 version = "2.2.0-alpha"
 dependencies = [
  "anyhow",
+ "arrow",
  "common",
+ "dashmap",
+ "futures",
+ "object_store",
  "seahash",
  "storage",
  "temp-dir",
diff --git a/Cargo.toml b/Cargo.toml
index 84ae73c6e3..1a55200b4c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -44,6 +44,7 @@ common = { path = "src/common" }
 thiserror = "1"
 bytes = "1"
 byteorder = "1"
+dashmap = { version = "6.1.0" }
 datafusion = "43"
 parquet = { version = "53" }
 object_store = { version = "0.11" }
diff --git a/src/metric_engine/Cargo.toml b/src/metric_engine/Cargo.toml
index ceaf134241..0ecf1492e9 100644
--- a/src/metric_engine/Cargo.toml
+++ b/src/metric_engine/Cargo.toml
@@ -27,8 +27,12 @@ description.workspace = true
 [dependencies]
 anyhow = { workspace = true }
+arrow = { workspace = true }
 common = { workspace = true }
+dashmap = { workspace = true }
+futures = { workspace = true }
 horaedb_storage = { workspace = true }
+object_store = { workspace = true }
 seahash = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
diff --git a/src/metric_engine/src/data/mod.rs b/src/metric_engine/src/data/mod.rs
index 96d1a9d78f..ae6a9b89d7 100644
--- a/src/metric_engine/src/data/mod.rs
+++ b/src/metric_engine/src/data/mod.rs
@@ -35,6 +35,7 @@ impl SampleManager {
     /// Populate series ids from labels.
     /// It will also build inverted index for labels.
     pub async fn persist(&self, _samples: Vec<Sample>) -> Result<()> {
+        // 1. just write to TM storage directly
         todo!()
     }
 }
diff --git a/src/metric_engine/src/index/cache.rs b/src/metric_engine/src/index/cache.rs
new file mode 100644
index 0000000000..08624ef497
--- /dev/null
+++ b/src/metric_engine/src/index/cache.rs
@@ -0,0 +1,1293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    collections::{HashMap, HashSet},
+    path::Path,
+    sync::Arc,
+    time::{Duration, SystemTime},
+};
+
+use anyhow::Context;
+use arrow::{
+    array::{
+        Array, ArrayRef, BinaryArray, BinaryBuilder, ListArray, UInt64Array, UInt64Builder,
+        UInt8Array, UInt8Builder,
+    },
+    buffer::OffsetBuffer,
+    datatypes::{DataType, Field, Schema, ToByteSlice},
+    record_batch::RecordBatch,
+};
+use dashmap::DashMap;
+use futures::StreamExt;
+use horaedb_storage::{
+    config::StorageConfig,
+    storage::{
+        CloudObjectStorage, ScanRequest, StorageRuntimes, TimeMergeStorageRef, WriteRequest,
+    },
+    types::{ObjectStoreRef, TimeRange, Timestamp},
+};
+use tokio::{
+    sync::{
+        mpsc::{self, Receiver, Sender},
+        RwLock,
+    },
+    time::timeout,
+};
+use tracing::{error, warn};
+
+use crate::types::{
+    hash, FieldName, FieldType, Label, MetricId, MetricName, Result, SegmentDuration, SeriesId,
+    SeriesKey, TagName, TagNames, TagValue, TagValues, DEFAULT_FIELD_NAME, DEFAULT_FIELD_TYPE,
+};
+
+const COLUMN_DURATION: &str = "duration";
+const COLUMN_METRIC_NAME: &str = "metric_name";
+const COLUMN_METRIC_ID: &str = "metric_id";
+const COLUMN_SERIES_ID: &str = "series_id";
+const COLUMN_FIELD_ID: &str = "field_id";
+const COLUMN_FIELD_NAME: &str = "field_name";
+const COLUMN_FIELD_TYPE: &str = "field_type";
+const COLUMN_TAG_NAMES: &str = "tag_names";
+const COLUMN_TAG_VALUES: &str = "tag_values";
+const COLUMN_TAG_NAME: &str = "tag_name";
+const COLUMN_TAG_VALUE: &str = "tag_value";
+const COLUMN_TAG_ITEM: &str = "item";
+
+type ConcurrentMetricMap = RwLock<HashMap<MetricName, (FieldName, FieldType)>>;
+type ConcurrentSeriesMap = RwLock<HashMap<SeriesId, SeriesKey>>;
+type ConcurrentTagKVMap =
+    RwLock<HashMap<TagName, HashMap<TagValue, HashMap<MetricId, HashSet<SeriesId>>>>>;
+
+struct MetricsCache {
+    cache: DashMap<SegmentDuration, ConcurrentMetricMap>,
+    pub storage: TimeMergeStorageRef,
+    sender: Sender<Task>,
+}
+
+struct SeriesCache {
+    cache: DashMap<SegmentDuration, ConcurrentSeriesMap>,
+    pub storage: TimeMergeStorageRef,
+    sender: Sender<Task>,
+}
+
+#[derive(PartialEq, Eq, Hash, Debug)]
+struct SegmentSeries {
+    segment: SegmentDuration,
+    series_id: SeriesId,
+}
+
+struct TagIndexCache {
+    cache: DashMap<SegmentDuration, ConcurrentTagKVMap>,
+    series_records: RwLock<HashSet<SegmentSeries>>,
+    storage: TimeMergeStorageRef,
+    sender: Sender<Task>,
+}
+
+impl MetricsCache {
+    fn new(storage: TimeMergeStorageRef, sender: Sender<Task>) -> Self {
+        Self {
+            cache: DashMap::new(),
+            storage,
+            sender,
+        }
+    }
+
+    #[allow(clippy::type_complexity)]
+    fn parse_record_batch(
+        batch: &RecordBatch,
+        index: usize,
+    ) -> Result<(&[u8], &[u8], u8, u64, u64, u64)> {
+        let metric_name = batch
+            .column_by_name(COLUMN_METRIC_NAME)
+            .context("get column failed")?
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .context("parse column failed")?
+            .value(index);
+
+        let field_name = batch
+            .column_by_name(COLUMN_FIELD_NAME)
+            .context("get column failed")?
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .context("parse column failed")?
+            .value(index);
+
+        let field_type = batch
+            .column_by_name(COLUMN_FIELD_TYPE)
+            .context("get column failed")?
+            .as_any()
+            .downcast_ref::<UInt8Array>()
+            .context("parse column failed")?
+            .value(index);
+
+        let field_id = batch
+            .column_by_name(COLUMN_FIELD_ID)
+            .context("get column failed")?
+            .as_any()
+            .downcast_ref::<UInt64Array>()
+            .context("parse column failed")?
+            .value(index);
+
+        let metric_id = batch
+            .column_by_name(COLUMN_METRIC_ID)
+            .context("get column failed")?
+            .as_any()
+            .downcast_ref::<UInt64Array>()
+            .context("parse column failed")?
+            .value(index);
+
+        let duration = batch
+            .column_by_name(COLUMN_DURATION)
+            .context("get column failed")?
+            .as_any()
+            .downcast_ref::<UInt64Array>()
+            .context("parse column failed")?
+            .value(index);
+
+        Ok((
+            metric_name,
+            field_name,
+            field_type,
+            field_id,
+            metric_id,
+            duration,
+        ))
+    }
+
+    async fn load_from_storage(&mut self) -> Result<()> {
+        let mut result_stream = self
+            .storage
+            .scan(ScanRequest {
+                range: TimeRange::new(Timestamp(0), Timestamp::MAX),
+                predicate: vec![],
+                projections: None,
+            })
+            .await?;
+        while let Some(item) = result_stream.next().await {
+            let batch = item.context("get next batch failed")?;
+            for index in 0..batch.num_rows() {
+                let (metric_name, field_name, field_type, _, _, duration) =
+                    MetricsCache::parse_record_batch(&batch, index)?;
+                self.update(
+                    SegmentDuration::date(Duration::from_millis(duration)),
+                    metric_name,
+                    field_name,
+                    field_type,
+                )
+                .await?;
+            }
+        }
+        Ok(())
+    }
+
+    fn schema() -> Arc<Schema> {
+        Arc::new(Schema::new(vec![
+            Field::new(COLUMN_METRIC_NAME, DataType::Binary, true),
+            Field::new(COLUMN_METRIC_ID, DataType::UInt64, true),
+            Field::new(COLUMN_FIELD_NAME, DataType::Binary, true),
+            Field::new(COLUMN_FIELD_ID, DataType::UInt64, true),
+            Field::new(COLUMN_FIELD_TYPE, DataType::UInt8, true),
+            Field::new(COLUMN_DURATION, DataType::UInt64, true),
+        ]))
+    }
+
+    async fn update(
+        &self,
+        date: SegmentDuration,
+        name: &[u8],
+        field_name: &[u8],
+        field_type: u8,
+    ) -> Result<bool> {
+        if self.cache.contains_key(&date)
+            && self
+                .cache
+                .get(&date)
+                .context("get key failed")?
+                .read()
+                .await
+                .contains_key(name)
+        {
+            Ok(false)
+        } else {
+            let result = self
+                .cache
+                .entry(date)
+                .or_default()
+                .write()
+                .await
+                .insert(name.to_vec(), (field_name.to_vec(), field_type));
+
+            Ok(result.is_none())
+        }
+    }
+
+    async fn notify_write(
+        &self,
+        current: Duration,
+        name: &[u8],
+        field_name: &[u8],
+        field_type: u8,
+    ) -> Result<()> {
+        self.sender
+            .send(Task::Metric(
+                current,
+                name.to_vec(),
+                field_name.to_vec(),
+                field_type,
+            ))
+            .await
+            .context("notify write failed")?;
+        Ok(())
+    }
+}
+
+impl SeriesCache {
+    fn new(storage: TimeMergeStorageRef, sender: Sender<Task>) -> Self {
+        Self {
+            cache: DashMap::new(),
+            storage,
+            sender,
+        }
+    }
+
+    async fn parse_record_batch(
+        batch: &RecordBatch,
+        index: usize,
+    ) -> Result<(u64, Vec<Vec<u8>>, Vec<Vec<u8>>, u64)> {
+        let series_id = batch
+            .column_by_name(COLUMN_SERIES_ID)
+            .context("get column failed")?
+            .as_any()
+            .downcast_ref::<UInt64Array>()
+            .context("parse column failed")?
+            .value(index);
+
+        let tag_names = {
+            let tag_name_array = batch
+                .column_by_name(COLUMN_TAG_NAMES)
+                .context("get column failed")?
+                .as_any()
+                .downcast_ref::<ListArray>()
+                .context("parse column failed")?
+                .value(index);
+            let tag_names = tag_name_array
+                .as_any()
+                .downcast_ref::<BinaryArray>()
+                .context("parse column failed")?;
+            tag_names
+                .iter()
+                .map(|item| item.unwrap_or(b"").to_vec())
+                .collect::<Vec<_>>()
+        };
+
+        let tag_values = {
+            let tag_value_array = batch
+                .column_by_name(COLUMN_TAG_VALUES)
+                .context("get column failed")?
+                .as_any()
+                .downcast_ref::<ListArray>()
+                .context("parse column failed")?
+                .value(index);
+            let tag_values = tag_value_array
+                .as_any()
+                .downcast_ref::<BinaryArray>()
+                .context("parse column failed")?;
+            tag_values
+                .iter()
+                .map(|item| item.unwrap_or(b"").to_vec())
+                .collect::<Vec<_>>()
+        };
+
+        let duration = batch
+            .column_by_name(COLUMN_DURATION)
+            .context("get column failed")?
+            .as_any()
+            .downcast_ref::<UInt64Array>()
+            .context("parse column failed")?
+            .value(index);
+        Ok((series_id, tag_names, tag_values, duration))
+    }
+
+    async fn load_from_storage(&mut self) -> Result<()> {
+        let mut result_stream = self
+            .storage
+            .scan(ScanRequest {
+                range: TimeRange::new(Timestamp(0), Timestamp::MAX),
+                predicate: vec![],
+                projections: None,
+            })
+            .await?;
+        while let Some(item) = result_stream.next().await {
+            let batch = item.context("get next batch failed")?;
+            for index in 0..batch.num_rows() {
+                let (series_id, tag_names, tag_values, duration) =
+                    SeriesCache::parse_record_batch(&batch, index).await?;
+                let labels = tag_names
+                    .into_iter()
+                    .zip(tag_values.into_iter())
+                    .map(|(name, value)| Label { name, value })
+                    .collect::<Vec<_>>();
+                let key = SeriesKey::new(None, labels.as_slice());
+                self.update(
+                    SegmentDuration::date(Duration::from_millis(duration)),
+                    &SeriesId(series_id),
+                    &key,
+                )
+                .await?;
+            }
+        }
+        Ok(())
+    }
+
+    pub fn schema() -> Arc<Schema> {
+        Arc::new(Schema::new(vec![
+            Field::new(COLUMN_METRIC_ID, DataType::UInt64, true),
+            Field::new(COLUMN_SERIES_ID, DataType::UInt64, true),
+            Field::new(
+                COLUMN_TAG_NAMES,
+                DataType::List(Arc::new(Field::new(
+                    COLUMN_TAG_ITEM,
+                    DataType::Binary,
+                    true,
+                ))),
+                true,
+            ),
+            Field::new(
+                COLUMN_TAG_VALUES,
+                DataType::List(Arc::new(Field::new(
+                    COLUMN_TAG_ITEM,
+                    DataType::Binary,
+                    true,
+                ))),
+                true,
+            ),
+            Field::new(COLUMN_DURATION, DataType::UInt64, true),
+        ]))
+    }
+
+    async fn update(&self, date: SegmentDuration, id: &SeriesId, key: &SeriesKey) -> Result<bool> {
+        if self.cache.contains_key(&date)
+            && self
+                .cache
+                .get(&date)
+                .context("get key failed")?
+                .read()
+                .await
+                .contains_key(id)
+        {
+            Ok(false)
+        } else {
+            let result = self
+                .cache
+                .entry(date)
+                .or_default()
+                .write()
+                .await
+                .insert(*id, key.clone());
+
+            Ok(result.is_none())
+        }
+    }
+
+    async fn notify_write(
+        &self,
+        current: Duration,
+        id: &SeriesId,
+        key: &SeriesKey,
+        metric_id: &MetricId,
+    ) -> Result<()> {
+        self.sender
+            .send(Task::Series(current, *id, key.clone(), *metric_id))
+            .await
+            .context("notify write failed")?;
+        Ok(())
+    }
+}
+
+impl TagIndexCache {
+    fn new(storage: TimeMergeStorageRef, sender: Sender<Task>) -> Self {
+        Self {
+            cache: DashMap::new(),
+            series_records: RwLock::new(HashSet::new()),
+            storage,
+            sender,
+        }
+    }
+
+    async fn parse_record_batch(
+        batch: &RecordBatch,
+        index: usize,
+    ) -> Result<(u64, &[u8], &[u8], u64, u64)> {
+        let metric_id = batch
+            .column_by_name(COLUMN_METRIC_ID)
+            .context("get column failed")?
+            .as_any()
+            .downcast_ref::<UInt64Array>()
+            .context("parse column failed")?
+            .value(index);
+
+        let tag_name = batch
+            .column_by_name(COLUMN_TAG_NAME)
+            .context("get column failed")?
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .context("parse column failed")?
+            .value(index);
+
+        let tag_value = batch
+            .column_by_name(COLUMN_TAG_VALUE)
+            .context("get column failed")?
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .context("parse column failed")?
+            .value(index);
+
+        let series_id = batch
+            .column_by_name(COLUMN_SERIES_ID)
+            .context("get column failed")?
+            .as_any()
+            .downcast_ref::<UInt64Array>()
+            .context("parse column failed")?
+            .value(index);
+
+        let duration = batch
+            .column_by_name(COLUMN_DURATION)
+            .context("get column failed")?
+            .as_any()
+            .downcast_ref::<UInt64Array>()
+            .context("parse column failed")?
+            .value(index);
+
+        Ok((metric_id, tag_name, tag_value, series_id, duration))
+    }
+
+    async fn load_from_storage(&mut self) -> Result<()> {
+        let mut result_stream = self
+            .storage
+            .scan(ScanRequest {
+                range: TimeRange::new(Timestamp(0), Timestamp::MAX),
+                predicate: vec![],
+                projections: None,
+            })
+            .await?;
+        while let Some(item) = result_stream.next().await {
+            let batch = item.context("get next batch failed")?;
+            for index in 0..batch.num_rows() {
+                // Keep the tuple order in sync with `parse_record_batch`, which
+                // returns (metric_id, tag_name, tag_value, series_id, duration).
+                let (metric_id, tag_name, tag_value, series_id, duration) =
+                    TagIndexCache::parse_record_batch(&batch, index).await?;
+                self.update(
+                    SegmentDuration::date(Duration::from_millis(duration)),
+                    &SeriesId(series_id),
+                    &vec![tag_name.to_vec()],
+                    &vec![tag_value.to_vec()],
+                    &MetricId(metric_id),
+                )
+                .await?;
+            }
+        }
+        Ok(())
+    }
+
+    fn schema() -> Arc<Schema> {
+        Arc::new(Schema::new(vec![
+            Field::new(COLUMN_METRIC_ID, DataType::UInt64, true),
+            Field::new(COLUMN_TAG_NAME, DataType::Binary, true),
+            Field::new(COLUMN_TAG_VALUE, DataType::Binary, true),
+            Field::new(COLUMN_SERIES_ID, DataType::UInt64, true),
+            Field::new(COLUMN_DURATION, DataType::UInt64, true),
+        ]))
+    }
+
+    async fn update(
+        &self,
+        date: SegmentDuration,
+        series_id: &SeriesId,
+        tag_names: &TagNames,
+        tag_values: &TagValues,
+        metric_id: &MetricId,
+    ) -> Result<bool> {
+        let segment_series = SegmentSeries {
+            segment: date,
+            series_id: *series_id,
+        };
+        if self.series_records.read().await.contains(&segment_series) {
+            Ok(false)
+        } else {
+            let mut series_records = self.series_records.write().await;
+            // Re-check after taking the write lock: another task may have
+            // inserted this series in the meantime.
+            if series_records.contains(&segment_series) {
+                Ok(false)
+            } else {
+                series_records.insert(segment_series);
+                let cache_lock = self.cache.entry(date).or_default();
+                let mut cache_guard = cache_lock.write().await;
+
+                let mut tag_names = tag_names.clone();
+                let mut tag_values = tag_values.clone();
+                remove_default_tag(&mut tag_names, &mut tag_values);
+                tag_names
+                    .into_iter()
+                    .zip(tag_values.into_iter())
+                    .for_each(|(name, value)| {
+                        cache_guard
+                            .entry(name)
+                            .or_default()
+                            .entry(value)
+                            .or_default()
+                            .entry(*metric_id)
+                            .or_default()
+                            .insert(*series_id);
+                    });
+                Ok(true)
+            }
+        }
+    }
+
+    async fn notify_write(
+        &self,
+        current: Duration,
+        series_id: &SeriesId,
+        tag_names: &TagNames,
+        tag_values: &TagValues,
+        metric_id: &MetricId,
+    ) -> Result<()> {
+        self.sender
+            .send(Task::TagIndex(
+                current,
+                *series_id,
+                tag_names.clone(),
+                tag_values.clone(),
+                *metric_id,
+            ))
+            .await
+            .context("notify write failed")?;
+        Ok(())
+    }
+}
+
+pub struct CacheManager {
+    metrics: MetricsCache,
+    series: SeriesCache,
+    tag_index: TagIndexCache,
+}
+
+enum Task {
+    Metric(Duration, MetricName, FieldName, FieldType),
+    Series(Duration, SeriesId, SeriesKey, MetricId),
+    TagIndex(Duration, SeriesId, TagNames, TagValues, MetricId),
+}
+
+struct CacheWriter {
+    pub receiver: Receiver<Task>,
+    pub storage: TimeMergeStorageRef,
+    pub schema: Arc<Schema>,
+}
+
+impl CacheWriter {
+    pub fn new(
+        receiver: Receiver<Task>,
+        storage: TimeMergeStorageRef,
+        schema: Arc<Schema>,
+    ) -> Self {
+        Self {
+            receiver,
+            storage,
+            schema,
+        }
+    }
+}
+
+async fn make_storage(
+    runtimes: StorageRuntimes,
+    store: ObjectStoreRef,
+    root_dir: String,
+    num_primary_keys: usize,
+    schema: Arc<Schema>,
+) -> Result<TimeMergeStorageRef> {
+    Ok(Arc::new(
+        CloudObjectStorage::try_new(
+            root_dir,
+            Duration::from_secs(3600 * 24), // 1 day
+            store.clone(),
+            schema,
+            num_primary_keys,
+            StorageConfig::default(),
+            runtimes.clone(),
+        )
+        .await?,
+    ))
+}
+
+impl CacheManager {
+    pub async fn try_new(
+        runtimes: StorageRuntimes,
+        store: ObjectStoreRef,
+        root_dir: &str,
+    ) -> Result<Self> {
+        let metrics = {
+            let path = Path::new(root_dir).join("metrics");
+            let root_dir = path.to_string_lossy().to_string();
+            let schema = MetricsCache::schema();
+            let storage =
+                make_storage(runtimes.clone(), store.clone(), root_dir, 2, schema.clone()).await?;
+            let (sender, receiver) = mpsc::channel(1024);
+            let writer = CacheWriter::new(receiver, storage.clone(), schema.clone());
+            tokio::spawn(async move { execute_write(writer).await });
+            let mut cache = MetricsCache::new(storage, sender);
+            cache.load_from_storage().await?;
+            cache
+        };
+
+        let series = {
+            let path = Path::new(root_dir).join("series");
+            let root_dir = path.to_string_lossy().to_string();
+            let schema = SeriesCache::schema();
+            let storage =
+                make_storage(runtimes.clone(), store.clone(), root_dir, 2, schema.clone()).await?;
+            let (sender, receiver) = mpsc::channel(1024);
+            let writer = CacheWriter::new(receiver, storage.clone(), schema.clone());
+            tokio::spawn(async move { execute_write(writer).await });
+            let mut cache = SeriesCache::new(storage, sender);
+            cache.load_from_storage().await?;
+            cache
+        };
+
+        let tag_index = {
+            let path = Path::new(root_dir).join("tag_index");
+            let root_dir = path.to_string_lossy().to_string();
+            let schema = TagIndexCache::schema();
+            let storage = make_storage(runtimes, store, root_dir, 3, schema.clone()).await?;
+            let (sender, receiver) = mpsc::channel(1024);
+            let writer = CacheWriter::new(receiver, storage.clone(), schema);
+            tokio::spawn(async move { execute_write(writer).await });
+            let mut cache = TagIndexCache::new(storage, sender);
+            cache.load_from_storage().await?;
+            cache
+        };
+
+        Ok(Self {
+            metrics,
+            series,
+            tag_index,
+        })
+    }
+
+    pub async fn update_metric(&self, name: &[u8]) -> Result<()> {
+        let current = SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .unwrap();
+        let date = SegmentDuration::date(current);
+        let updated = self
+            .metrics
+            .update(
+                date,
+                name,
+                DEFAULT_FIELD_NAME.as_bytes(),
+                DEFAULT_FIELD_TYPE,
+            )
+            .await?;
+        if updated {
+            self.metrics
+                .notify_write(
+                    current,
+                    name,
+                    DEFAULT_FIELD_NAME.as_bytes(),
+                    DEFAULT_FIELD_TYPE,
+                )
+                .await?;
+        }
+        Ok(())
+    }
+
+    pub async fn update_series(
+        &self,
+        id: &SeriesId,
+        key: &SeriesKey,
+        metric_id: &MetricId,
+    ) -> Result<()> {
+        let current = SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .unwrap();
+        let date = SegmentDuration::date(current);
+        let updated = self.series.update(date, id, key).await?;
+        if updated {
+            self.series
+                .notify_write(current, id, key, metric_id)
+                .await?;
+        }
+        Ok(())
+    }
+
+    pub async fn update_tag_index(
+        &self,
+        series_id: &SeriesId,
+        series_key: &SeriesKey,
+        metric_id: &MetricId,
+    ) -> Result<()> {
+        let current = SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .unwrap();
+        let date = SegmentDuration::date(current);
+        let updated = self
+            .tag_index
+            .update(
+                date,
+                series_id,
+                &series_key.names,
+                &series_key.values,
+                metric_id,
+            )
+            .await?;
+        if updated {
+            self.tag_index
+                .notify_write(
+                    current,
+                    series_id,
+                    &series_key.names,
+                    &series_key.values,
+                    metric_id,
+                )
+                .await?;
+        }
+        Ok(())
+    }
+}
+
+async fn execute_write(mut writer: CacheWriter) {
+    let mut task_queue: Vec<Task> = Vec::new();
+    let mut batch_tasks: Vec<Task>;
+    // TODO: make it configurable
+    let max_wait_time_ms: u64 = 1000;
+    let max_queue_length: usize = 16;
+
+    let task_duration = |task: &Task| -> Duration {
+        match task {
+            Task::Metric(duration, _, _, _) => *duration,
+            Task::Series(duration, _, _, _) => *duration,
+            Task::TagIndex(duration, _, _, _, _) => *duration,
+        }
+    };
+
+    loop {
+        loop {
+            let wait_time: Duration = {
+                if task_queue.is_empty() {
+                    Duration::from_millis(max_wait_time_ms)
+                } else {
+                    let current = SystemTime::now()
+                        .duration_since(SystemTime::UNIX_EPOCH)
+                        .unwrap();
+                    // Wait out the remaining flush budget of the oldest queued
+                    // task, but at least 1ms.
+                    let elapsed_ms =
+                        (current - task_duration(&task_queue[0])).as_millis() as u64;
+                    let wait_in_ms =
+                        std::cmp::max(max_wait_time_ms.saturating_sub(elapsed_ms), 1);
+                    Duration::from_millis(wait_in_ms)
+                }
+            };
+            match timeout(wait_time, writer.receiver.recv()).await {
+                Ok(Some(task)) => {
+                    if task_queue.is_empty()
+                        || SegmentDuration::same_segment(
+                            task_duration(&task_queue[0]),
+                            task_duration(&task),
+                        )
+                    {
+                        task_queue.push(task);
+                        if task_queue.len() >= max_queue_length {
+                            batch_tasks = std::mem::take(&mut task_queue);
+                            break;
+                        }
+                    } else {
+                        // The new task belongs to another segment: flush the
+                        // queued batch and start a new queue with this task.
+                        batch_tasks = std::mem::take(&mut task_queue);
+                        task_queue.push(task);
+                        break;
+                    }
+                }
+                Ok(None) => {
+                    warn!("Channel closed");
+                    return;
+                }
+                Err(_) => {
+                    batch_tasks = std::mem::take(&mut task_queue);
+                    break;
+                }
+            }
+        }
+
+        if batch_tasks.is_empty() {
+            continue;
+        }
+
+        match &batch_tasks[0] {
+            Task::Metric(_, _, _, _) => {
+                batch_write_metrics(batch_tasks, &writer).await;
+            }
+            Task::Series(_, _, _, _) => {
+                batch_write_series(batch_tasks, &writer).await;
+            }
+            Task::TagIndex(_, _, _, _, _) => {
+                batch_write_tag_index(batch_tasks, &writer).await;
+            }
+        }
+    }
+}
+
+async fn batch_write_tag_index(batch_tasks: Vec<Task>, writer: &CacheWriter) {
+    let mut metrics_id_builder = UInt64Builder::new();
+    let mut series_id_builder = UInt64Builder::new();
+    let mut field_duration_builder = UInt64Builder::new();
+    let mut tag_name_builder = BinaryBuilder::new();
+    let mut tag_value_builder = BinaryBuilder::new();
+
+    let mut start_ts: i64 = 0;
+    let mut end_ts: i64 = 0;
+    let task_len = batch_tasks.len();
+
+    batch_tasks
+        .into_iter()
+        .enumerate()
+        .for_each(|(index, mut task)| {
+            if let Task::TagIndex(duration, series_id, ref mut names, ref mut values, metric_id) =
+                task
+            {
+                if index == 0 {
+                    start_ts = duration.as_millis() as i64;
+                }
+                if index == task_len - 1 {
+                    end_ts = duration.as_millis() as i64;
+                }
+
+                remove_default_tag(names, values);
+
+                names.iter().zip(values.iter()).for_each(|(name, value)| {
+                    metrics_id_builder.append_value(metric_id.0);
+                    tag_name_builder.append_value(name.to_byte_slice());
+                    tag_value_builder.append_value(value.to_byte_slice());
+                    series_id_builder.append_value(series_id.0);
+                    field_duration_builder.append_value(duration.as_millis() as u64);
+                });
+            } else {
+                error!("Some tasks are not tag index tasks.");
+            }
+        });
+
+    let arrays: Vec<ArrayRef> = vec![
+        Arc::new(metrics_id_builder.finish()),
+        Arc::new(tag_name_builder.finish()),
+        Arc::new(tag_value_builder.finish()),
+        Arc::new(series_id_builder.finish()),
+        Arc::new(field_duration_builder.finish()),
+    ];
+    let batch = RecordBatch::try_new(writer.schema.clone(), arrays).unwrap();
+    writer
+        .storage
+        .write(WriteRequest {
+            batch,
+            time_range: (Timestamp(start_ts)..Timestamp(end_ts)).into(),
+            enable_check: true,
+        })
+        .await
+        .unwrap_or_else(|e| {
+            error!("write tag index failed: {:?}", e);
+        });
+}
+
+async fn batch_write_series(batch_tasks: Vec<Task>, writer: &CacheWriter) {
+    let mut metric_id_builder = UInt64Builder::new();
+    let mut series_id_builder = UInt64Builder::new();
+    let mut field_duration_builder = UInt64Builder::new();
+
+    let mut name_binary_values: Vec<&[u8]> = Vec::new();
+    let mut value_binary_values: Vec<&[u8]> = Vec::new();
+
+    let mut start_ts: i64 = 0;
+    let mut end_ts: i64 = 0;
+    let task_len = batch_tasks.len();
+
+    let mut offsets: Vec<i32> = vec![0; batch_tasks.len() + 1];
+    batch_tasks.iter().enumerate().for_each(|(index, task)| {
+        if let Task::Series(duration, id, key, metric_id) = task {
+            if index == 0 {
+                start_ts = duration.as_millis() as i64;
+            }
+            if index == task_len - 1 {
+                end_ts = duration.as_millis() as i64;
+            }
+
+            metric_id_builder.append_value(metric_id.0);
+            series_id_builder.append_value(id.0);
+            field_duration_builder.append_value(duration.as_millis() as u64);
+            key.names
+                .iter()
+                .for_each(|item| name_binary_values.push(item));
+            key.values
+                .iter()
+                .for_each(|item| value_binary_values.push(item));
+            offsets[index + 1] = offsets[index] + key.names.len() as i32;
+        } else {
+            error!("Some tasks are not series tasks.");
+        }
+    });
+
+    let tag_names_array = {
+        let name_binary_array = BinaryArray::from_vec(name_binary_values);
+        ListArray::try_new(
+            Arc::new(Field::new(COLUMN_TAG_ITEM, DataType::Binary, true)),
+            OffsetBuffer::new(offsets.clone().into()),
+            Arc::new(name_binary_array),
+            None,
+        )
+        .unwrap()
+    };
+
+    let tag_values_array = {
+        let value_binary_array = BinaryArray::from_vec(value_binary_values);
+        ListArray::try_new(
+            Arc::new(Field::new(COLUMN_TAG_ITEM, DataType::Binary, true)),
+            OffsetBuffer::new(offsets.into()),
+            Arc::new(value_binary_array),
+            None,
+        )
+        .unwrap()
+    };
+
+    let arrays: Vec<ArrayRef> = vec![
+        Arc::new(metric_id_builder.finish()),
+        Arc::new(series_id_builder.finish()),
+        Arc::new(tag_names_array),
+        Arc::new(tag_values_array),
+        Arc::new(field_duration_builder.finish()),
+    ];
+    let batch = RecordBatch::try_new(writer.schema.clone(), arrays).unwrap();
+    writer
+        .storage
+        .write(WriteRequest {
+            batch,
+            time_range: (Timestamp(start_ts)..Timestamp(end_ts)).into(),
+            enable_check: true,
+        })
+        .await
+        .unwrap_or_else(|e| {
+            error!("write series failed: {:?}", e);
+        });
+}
+
+async fn batch_write_metrics(batch_tasks: Vec<Task>, writer: &CacheWriter) {
+    let mut start_ts: i64 = 0;
+    let mut end_ts: i64 = 0;
+
+    let arrays: Vec<ArrayRef> = {
+        let mut metric_name_builder = BinaryBuilder::new();
+        let mut metric_id_builder = UInt64Builder::new();
+        let mut field_name_builder = BinaryBuilder::new();
+        let mut field_id_builder = UInt64Builder::new();
+        let mut field_type_builder = UInt8Builder::new();
+        let mut field_duration_builder = UInt64Builder::new();
+
+        let task_len = batch_tasks.len();
+
+        batch_tasks
+            .into_iter()
+            .enumerate()
+            .for_each(|(index, task)| {
+                if let Task::Metric(duration, name, field_name, field_type) = task {
+                    if index == 0 {
+                        start_ts = duration.as_millis() as i64;
+                    }
+                    if index == task_len - 1 {
+                        end_ts = duration.as_millis() as i64;
+                    }
+
+                    metric_id_builder.append_value(hash(&name));
+                    metric_name_builder.append_value(name);
+                    field_id_builder.append_value(hash(field_name.to_byte_slice()));
+                    field_name_builder.append_value(field_name);
+                    field_type_builder.append_value(field_type);
+                    field_duration_builder.append_value(duration.as_millis() as u64);
+                } else {
+                    error!("Some tasks are not metric tasks.");
+                }
+            });
+
+        vec![
+            Arc::new(metric_name_builder.finish()),
+            Arc::new(metric_id_builder.finish()),
+            Arc::new(field_name_builder.finish()),
+            Arc::new(field_id_builder.finish()),
+            Arc::new(field_type_builder.finish()),
+            Arc::new(field_duration_builder.finish()),
+        ]
+    };
+    let batch = RecordBatch::try_new(writer.schema.clone(), arrays).unwrap();
+    writer
+        .storage
+        .write(WriteRequest {
+            batch,
+            time_range: (Timestamp(start_ts)..Timestamp(end_ts)).into(),
+            enable_check: true,
+        })
+        .await
+        .unwrap_or_else(|e| {
+            error!("write metrics failed: {:?}", e);
+        });
+}
+
+fn remove_default_tag(names: &mut Vec<Vec<u8>>, values: &mut Vec<Vec<u8>>) {
+    // Drop internal tags (names starting with "__"), together with the values
+    // at the same positions.
+    let mut to_remove_index = HashSet::new();
+    let mut index = 0;
+    names.retain(|item| {
+        let keep = !item.starts_with(b"__");
+        if !keep {
+            to_remove_index.insert(index);
+        }
+        index += 1;
+        keep
+    });
+    let mut index = 0;
+    values.retain(|_| {
+        let keep = !to_remove_index.contains(&index);
+        index += 1;
+        keep
+    });
+}
+
+#[cfg(test)]
+mod tests {
+    use horaedb_storage::{
+        storage::ScanRequest,
+        types::{TimeRange, Timestamp},
+    };
+    use object_store::local::LocalFileSystem;
+    use tokio::runtime::Runtime;
+
+    use super::*;
+    use crate::types::{hash, Label};
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+    async fn test_cache_manager_updates() {
+        let rt = Arc::new(Runtime::new().unwrap());
+        let runtimes = StorageRuntimes::new(rt.clone(), rt);
+        let store = Arc::new(LocalFileSystem::new());
+        let root_dir = "/tmp/horaedb".to_string();
+        let cache_manager = CacheManager::try_new(runtimes, store, root_dir.as_str())
+            .await
+            .unwrap();
+
+        {
+            // Test update_metric
+            let metric_name = "metric_neo".as_bytes();
+            let metric_id = MetricId(hash(metric_name));
+            cache_manager.update_metric(metric_name).await.unwrap();
+
+            let series_id = SeriesId(11);
+            let labels = vec![
+                Label {
+                    name: b"label_a".to_vec(),
+                    value: b"111".to_vec(),
+                },
+                Label {
+                    name: b"label_b".to_vec(),
+                    value: b"222".to_vec(),
+                },
+            ];
+            let series_key = SeriesKey::new(Some(metric_name), labels.as_slice());
+            // Test update_series
+            cache_manager
+                .update_series(&series_id, &series_key, &metric_id)
+                .await
+                .unwrap();
+            // Test update_tag_index
+            cache_manager
+                .update_tag_index(&series_id, &series_key, &metric_id)
+                .await
+                .unwrap();
+        }
+
+        {
+            // Test update_metric
+            let metric_name = "metric_neo2".as_bytes();
+            let metric_id = MetricId(hash(metric_name));
+            cache_manager.update_metric(metric_name).await.unwrap();
+
+            let series_id = SeriesId(22);
+            let labels = vec![
+                Label {
+                    name: b"label_a".to_vec(),
+                    value: b"111".to_vec(),
+                },
+                Label {
+                    name: b"label_c".to_vec(),
+                    value: b"333".to_vec(),
+                },
+            ];
+            let series_key = SeriesKey::new(Some(metric_name), labels.as_slice());
+            // Test update_series
+            cache_manager
+                .update_series(&series_id, &series_key, &metric_id)
+                .await
+                .unwrap();
+            // Test update_tag_index
+            cache_manager
+                .update_tag_index(&series_id, &series_key, &metric_id)
+                .await
+                .unwrap();
+        }
+
+        // Wait for the background writers to flush.
+        tokio::time::sleep(std::time::Duration::from_secs(3)).await;
+
+        // Scan and check the persisted data.
+        {
+            let mut result_stream = cache_manager
+                .metrics
+                .storage
+                .scan(ScanRequest {
+                    range: TimeRange::new(Timestamp(0), Timestamp::MAX),
+                    predicate: vec![],
+                    projections: None,
+                })
+                .await
+                .unwrap();
+
+            {
+                let item = result_stream.next().await;
+                let batch = item.unwrap().unwrap();
+                let (metric_name, field_name, field_type, field_id, metric_id, _) =
+                    MetricsCache::parse_record_batch(&batch, 0).unwrap();
+                assert_eq!(metric_name, b"metric_neo");
+                assert_eq!(field_name, b"value");
+                assert_eq!(field_type, 0);
+                assert_eq!(field_id, 17612580310495814266);
+                assert_eq!(metric_id, 12417319948205937109);
+            }
+
+            {
+                let item = result_stream.next().await;
+                let batch = item.unwrap().unwrap();
+                let (metric_name, field_name, field_type, field_id, metric_id, _) =
+                    MetricsCache::parse_record_batch(&batch, 0).unwrap();
+                assert_eq!(metric_name, b"metric_neo2");
+                assert_eq!(field_name, b"value");
+                assert_eq!(field_type, 0);
+                assert_eq!(field_id, 17612580310495814266);
+                assert_eq!(metric_id, 17578343207158939466);
+            }
+        }
+
+        {
+            let mut result_stream = cache_manager
+                .series
+                .storage
+                .scan(ScanRequest {
+                    range: TimeRange::new(Timestamp(0), Timestamp::MAX),
+                    predicate: vec![],
+                    projections: None,
+                })
+                .await
+                .unwrap();
+            {
+                let item = result_stream.next().await;
+                let batch = item.unwrap().unwrap();
+                let (series_id, tag_names, tag_values, _) =
+                    SeriesCache::parse_record_batch(&batch, 0).await.unwrap();
+
+                assert_eq!(series_id, 11);
+                assert_eq!(
+                    tag_names,
+                    vec![
+                        b"__name__".to_vec(),
+                        b"label_a".to_vec(),
+                        b"label_b".to_vec()
+                    ]
+                );
+                assert_eq!(
+                    tag_values,
+                    vec![b"metric_neo".to_vec(), b"111".to_vec(), b"222".to_vec()]
+                );
+            }
+            {
+                let item = result_stream.next().await;
+                let batch = item.unwrap().unwrap();
+                let (series_id, tag_names, tag_values, _) =
+                    SeriesCache::parse_record_batch(&batch, 0).await.unwrap();
+
+                assert_eq!(series_id, 22);
+                assert_eq!(
+                    tag_names,
+                    vec![
+                        b"__name__".to_vec(),
+                        b"label_a".to_vec(),
+                        b"label_c".to_vec()
+                    ]
+                );
+                assert_eq!(
+                    tag_values,
+                    vec![b"metric_neo2".to_vec(), b"111".to_vec(), b"333".to_vec()]
+                );
+            }
+        }
+
+        {
+            let mut result_stream = cache_manager
+                .tag_index
+                .storage
+                .scan(ScanRequest {
+                    range: TimeRange::new(Timestamp(0), Timestamp::MAX),
+                    predicate: vec![],
+                    projections: None,
+                })
+                .await
+                .unwrap();
+            {
+                let item = result_stream.next().await;
+                let batch = item.unwrap().unwrap();
+                assert_eq!(batch.num_rows(), 3);
+                {
+                    let (metric_id, tag_name, tag_value, series_id, _) =
+                        TagIndexCache::parse_record_batch(&batch, 0).await.unwrap();
+                    assert_eq!(metric_id, 12417319948205937109);
+                    assert_eq!(tag_name, b"label_a");
+                    assert_eq!(tag_value, b"111");
+                    assert_eq!(series_id, 11);
+                }
+                {
+                    let (metric_id, tag_name, tag_value, series_id, _) =
+                        TagIndexCache::parse_record_batch(&batch, 1).await.unwrap();
+                    assert_eq!(metric_id, 12417319948205937109);
+                    assert_eq!(tag_name, b"label_b");
+                    assert_eq!(tag_value, b"222");
+                    assert_eq!(series_id, 11);
+                }
+                {
+                    let (metric_id, tag_name, tag_value, series_id, _) =
+                        TagIndexCache::parse_record_batch(&batch, 2).await.unwrap();
+                    assert_eq!(metric_id, 17578343207158939466);
+                    assert_eq!(tag_name, b"label_a");
+                    assert_eq!(tag_value, b"111");
+                    assert_eq!(series_id, 22);
+                }
+            }
+            {
+                let item = result_stream.next().await;
+                let batch = item.unwrap().unwrap();
+                {
+                    let (metric_id, tag_name, tag_value, series_id, _) =
+                        TagIndexCache::parse_record_batch(&batch, 0).await.unwrap();
+                    assert_eq!(metric_id, 17578343207158939466);
+                    assert_eq!(tag_name, b"label_c");
+                    assert_eq!(tag_value, b"333");
+                    assert_eq!(series_id, 22);
+                }
+            }
+        }
+    }
+}
diff --git a/src/metric_engine/src/index/mod.rs b/src/metric_engine/src/index/mod.rs
index 4d98db4f5b..07eba82866 100644
--- a/src/metric_engine/src/index/mod.rs
+++ b/src/metric_engine/src/index/mod.rs
@@ -15,30 +15,123 @@
 // specific language governing permissions and limitations
 // under the License.
+mod cache;
+
 use std::sync::Arc;
 
-use horaedb_storage::storage::TimeMergeStorageRef;
+use cache::CacheManager;
+use horaedb_storage::storage::StorageRuntimes;
+use object_store::local::LocalFileSystem;
+use tokio::runtime::Runtime;
 
-use crate::{types::Sample, Result};
+use crate::{
+    types::{hash, MetricId, Sample, SeriesId, SeriesKey},
+    Result,
+};
 
 pub struct IndexManager {
     inner: Arc<Inner>,
 }
 
 impl IndexManager {
-    pub fn new(storage: TimeMergeStorageRef) -> Self {
-        Self {
-            inner: Arc::new(Inner { storage }),
-        }
+    pub async fn try_new() -> Result<Self> {
+        // TODO: initialize the runtime and store from config; for now this is
+        // just enough to compile.
+        let rt = Arc::new(Runtime::new().unwrap());
+        let runtimes = StorageRuntimes::new(rt.clone(), rt);
+        let store = Arc::new(LocalFileSystem::new());
+        let root_dir = "/tmp/horaedb".to_string();
+
+        Ok(Self {
+            inner: Arc::new(Inner {
+                cache: CacheManager::try_new(runtimes, store, root_dir.as_str()).await?,
+            }),
+        })
     }
 
     /// Populate series ids from labels.
     /// It will also build inverted index for labels.
-    pub async fn populate_series_ids(&self, _samples: &mut [Sample]) -> Result<()> {
-        todo!()
+    pub async fn populate_series_ids(&self, samples: &mut [Sample]) -> Result<()> {
+        // 1.1 Create metric ids and series ids.
+        let metric_ids = samples
+            .iter()
+            .map(|s| MetricId(hash(s.name.as_slice())))
+            .collect::<Vec<_>>();
+
+        let series_keys = samples
+            .iter()
+            .map(|s| SeriesKey::new(Some(s.name.as_slice()), s.lables.as_slice()))
+            .collect::<Vec<_>>();
+        let series_ids = series_keys
+            .iter()
+            .map(|e| SeriesId(hash(e.make_bytes().as_slice())))
+            .collect::<Vec<_>>();
+
+        // 1.2 Populate metric ids and series ids.
+        samples.iter_mut().enumerate().for_each(|(i, sample)| {
+            sample.name_id = Some(metric_ids[i]);
+            sample.series_id = Some(series_ids[i]);
+        });
+
+        // 2.1 Update cached metrics, propagating the first error if any.
+        futures::future::join_all(
+            samples
+                .iter()
+                .map(|s| self.inner.update_metrics(s.name.as_slice())),
+        )
+        .await
+        .into_iter()
+        .collect::<Result<()>>()?;
+
+        // 2.2 Update cached series.
+        futures::future::join_all(
+            series_ids
+                .iter()
+                .zip(series_keys.iter().zip(metric_ids.iter()))
+                .map(|(id, (key, metric_id))| self.inner.update_series(id, key, metric_id)),
+        )
+        .await
+        .into_iter()
+        .collect::<Result<()>>()?;
+
+        // 2.3 Update the cached tag index.
+        futures::future::join_all(
+            series_ids
+                .iter()
+                .zip(series_keys.iter())
+                .zip(metric_ids.iter())
+                .map(|((series_id, series_key), metric_id)| {
+                    self.inner
+                        .update_tag_index(series_id, series_key, metric_id)
+                }),
+        )
+        .await
+        .into_iter()
+        .collect::<Result<()>>()?;
+
+        Ok(())
     }
 }
 
 struct Inner {
-    storage: TimeMergeStorageRef,
+    cache: CacheManager,
+}
+
+impl Inner {
+    pub async fn update_metrics(&self, name: &[u8]) -> Result<()> {
+        self.cache.update_metric(name).await
+    }
+
+    pub async fn update_series(
+        &self,
+        id: &SeriesId,
+        key: &SeriesKey,
+        metric_id: &MetricId,
+    ) -> Result<()> {
+        self.cache.update_series(id, key, metric_id).await
+    }
+
+    pub async fn update_tag_index(
+        &self,
+        series_id: &SeriesId,
+        series_key: &SeriesKey,
+        metric_id: &MetricId,
+    ) -> Result<()> {
+        self.cache
+            .update_tag_index(series_id, series_key, metric_id)
+            .await
+    }
 }
diff --git a/src/metric_engine/src/lib.rs b/src/metric_engine/src/lib.rs
index 2c64fe3361..e705f00276 100644
--- a/src/metric_engine/src/lib.rs
+++ b/src/metric_engine/src/lib.rs
@@ -17,6 +17,8 @@
 
 //! Metric Engine entry point.
 
+mod data;
+mod index;
 mod metric;
 mod types;
diff --git a/src/metric_engine/src/metric/mod.rs b/src/metric_engine/src/metric/mod.rs
index bdd1d6623c..28342da717 100644
--- a/src/metric_engine/src/metric/mod.rs
+++ b/src/metric_engine/src/metric/mod.rs
@@ -19,32 +19,36 @@
 use std::sync::Arc;
 
 use horaedb_storage::storage::TimeMergeStorageRef;
 
-use crate::{types::Sample, Result};
+use crate::{index::IndexManager, types::Sample, Result};
 
 pub struct MetricManager {
     inner: Arc<Inner>,
 }
 
 impl MetricManager {
-    pub fn new(storage: TimeMergeStorageRef) -> Self {
-        Self {
-            inner: Arc::new(Inner { storage }),
-        }
+    pub async fn try_new(storage: TimeMergeStorageRef) -> Result<Self> {
+        Ok(Self {
+            inner: Arc::new(Inner {
+                storage: storage.clone(),
+                index: IndexManager::try_new().await?,
+            }),
+        })
     }
 
     /// Populate metric ids from names.
     /// If a name does not exist, it will be created on demand.
-    pub async fn populate_metric_ids(&self, _samples: &mut [Sample]) -> Result<()> {
-        todo!()
+    pub async fn populate_metric_ids(&self, samples: &mut [Sample]) -> Result<()> {
+        self.inner.populate_metric_ids(samples).await
     }
 }
 
 struct Inner {
     storage: TimeMergeStorageRef,
+    index: IndexManager,
 }
 
 impl Inner {
-    async fn populate_metric_ids(&self, _samples: &mut [Sample]) -> Result<()> {
-        todo!()
+    async fn populate_metric_ids(&self, samples: &mut [Sample]) -> Result<()> {
+        self.index.populate_series_ids(samples).await
     }
 }
diff --git a/src/metric_engine/src/types.rs b/src/metric_engine/src/types.rs
index 6203bc174d..89effc0d97 100644
--- a/src/metric_engine/src/types.rs
+++ b/src/metric_engine/src/types.rs
@@ -15,14 +15,65 @@
 // specific language governing permissions and limitations
 // under the License.
 
-pub struct MetricId(u64);
-pub struct SeriesId(u64);
+use std::{
+    collections::{BTreeSet, HashMap, HashSet},
+    io::Write,
+    time::Duration,
+};
+
+pub type Result<T> = common::Result<T>;
+
+#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
+pub struct MetricId(pub u64);
+#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
+pub struct SeriesId(pub u64);
+pub type MetricName = Vec<u8>;
+pub type FieldName = Vec<u8>;
+pub type FieldType = u8;
+pub type TagName = Vec<u8>;
+pub type TagValue = Vec<u8>;
+pub type TagNames = Vec<TagName>;
+pub type TagValues = Vec<TagValue>;
+
+pub const METRIC_NAME: &str = "__name__";
+pub const DEFAULT_FIELD_NAME: &str = "value";
+pub const DEFAULT_FIELD_TYPE: u8 = 0;
+
+#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
+pub struct SegmentDuration(Duration);
+
+impl SegmentDuration {
+    const ONE_DAY_SECOND: u64 = 24 * 60 * 60;
+
+    pub fn date(time: Duration) -> Self {
+        let secs = time.as_secs();
+        Self(Duration::from_secs(
+            secs / SegmentDuration::ONE_DAY_SECOND * SegmentDuration::ONE_DAY_SECOND,
+        ))
+    }
+
+    pub fn same_segment(lhs: Duration, rhs: Duration) -> bool {
+        lhs.as_secs() / SegmentDuration::ONE_DAY_SECOND
+            == rhs.as_secs() / SegmentDuration::ONE_DAY_SECOND
+    }
+}
 
+#[derive(PartialEq, PartialOrd, Eq, Ord, Clone)]
 pub struct Label {
     pub name: Vec<u8>,
     pub value: Vec<u8>,
 }
 
+impl std::fmt::Debug for Label {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "Label {{ name: {:?}, value: {:?} }}",
+            String::from_utf8_lossy(&self.name),
+            String::from_utf8_lossy(&self.value),
+        )
+    }
+}
+
 /// This is the main struct used for write, optional values will be filled in
 /// different modules.
 pub struct Sample {
@@ -36,6 +87,57 @@ pub struct Sample {
     pub series_id: Option<SeriesId>,
 }
 
+#[derive(Clone, Debug)]
+pub struct SeriesKey {
+    pub names: TagNames,
+    pub values: TagValues,
+}
+
+impl SeriesKey {
+    pub fn new(metric_name: Option<&[u8]>, labels: &[Label]) -> Self {
+        let mut sorted_key: BTreeSet<&Label> = BTreeSet::new();
+        labels.iter().for_each(|item| {
+            sorted_key.insert(item);
+        });
+
+        let mut names = sorted_key
+            .iter()
+            .map(|item| item.name.clone())
+            .collect::<Vec<_>>();
+        let mut values = sorted_key
+            .iter()
+            .map(|item| item.value.clone())
+            .collect::<Vec<_>>();
+        if let Some(metric_name) = metric_name {
+            names.insert(0, METRIC_NAME.as_bytes().to_vec());
+            values.insert(0, metric_name.to_vec());
+        }
+        Self { names, values }
+    }
+
+    pub fn make_bytes(&self) -> Vec<u8> {
+        let mut series_bytes: Vec<u8> = Vec::new();
+        self.names
+            .iter()
+            .zip(self.values.iter())
+            .for_each(|(name, value)| {
+                series_bytes
+                    .write_all(name.as_slice())
+                    .expect("could write");
+                series_bytes
+                    .write_all(value.as_slice())
+                    .expect("could write");
+            });
+        series_bytes
+    }
+}
+
 pub fn hash(buf: &[u8]) -> u64 {
     seahash::hash(buf)
 }
+
+pub type MetricFieldMap = HashMap<MetricName, (FieldName, FieldType)>;
+pub type SeriesKeyMap = HashMap<SeriesId, SeriesKey>;
+pub type MetricSeriesMap = HashMap<MetricId, HashSet<SeriesId>>;
+pub type TagValueMap = HashMap<TagValue, MetricSeriesMap>;
+pub type TagIndexMap = HashMap<TagName, TagValueMap>;
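
A note on the segmenting logic used throughout the patch: `SegmentDuration::date` and `same_segment` both reduce an epoch timestamp to its UTC day by integer division on seconds. A minimal standalone sketch of the same arithmetic (the constant is duplicated from types.rs so the snippet runs without the crate):

use std::time::Duration;

const ONE_DAY_SECOND: u64 = 24 * 60 * 60;

// Truncate a timestamp to the start of the UTC day containing it.
fn segment_start(time: Duration) -> Duration {
    Duration::from_secs(time.as_secs() / ONE_DAY_SECOND * ONE_DAY_SECOND)
}

fn main() {
    let t1 = Duration::from_secs(1_700_000_000); // some instant
    let t2 = t1 + Duration::from_secs(3600); // one hour later, same day
    let t3 = t1 + Duration::from_secs(ONE_DAY_SECOND); // one day later
    assert_eq!(segment_start(t1), segment_start(t2)); // same segment
    assert_ne!(segment_start(t1), segment_start(t3)); // next segment
    println!("segment starts at {}s", segment_start(t1).as_secs());
}

This is why `execute_write` flushes a batch as soon as an incoming task falls into a different segment: all rows in one write share a day bucket.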
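One caveat around `SeriesKey::make_bytes`: it concatenates tag names and values with no separators, so distinct keys such as (name "ab", value "c") and (name "a", value "bc") encode to the same bytes and would therefore hash to the same `SeriesId`. Below is a sketch of a collision-free alternative using length prefixes; it is only an illustration, not what the patch implements, and adopting it would change the ids of already-persisted series:

use std::io::Write;

// Length-prefixed encoding: each component is preceded by its u32 length,
// so (["ab"], ["c"]) and (["a"], ["bc"]) encode differently.
fn make_bytes_unambiguous(names: &[Vec<u8>], values: &[Vec<u8>]) -> Vec<u8> {
    let mut out = Vec::new();
    for (name, value) in names.iter().zip(values.iter()) {
        for part in [name, value] {
            out.write_all(&(part.len() as u32).to_le_bytes())
                .expect("writing to a Vec cannot fail");
            out.write_all(part).expect("writing to a Vec cannot fail");
        }
    }
    out
}

fn main() {
    let a = make_bytes_unambiguous(&[b"ab".to_vec()], &[b"c".to_vec()]);
    let b = make_bytes_unambiguous(&[b"a".to_vec()], &[b"bc".to_vec()]);
    assert_ne!(a, b); // the naive concatenation would make these equal
}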
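For reference, the id scheme in `populate_series_ids` is content-addressed: a `MetricId` is the seahash of the metric name, and a `SeriesId` is the seahash of the series-key bytes (sorted labels, `__name__` first). A small sketch with the seahash crate showing the derivation for one series; the label ordering and `__name__` prefix mirror `SeriesKey::new`, and the concatenation mirrors `make_bytes`, but treat this as an illustration rather than a guarantee of byte-for-byte id equality with the engine:

fn main() {
    let metric_name = b"metric_neo";
    let metric_id = seahash::hash(metric_name);

    // Series key bytes: sorted label pairs, with __name__ first.
    let mut key_bytes = Vec::new();
    for (name, value) in [
        (b"__name__".as_slice(), b"metric_neo".as_slice()),
        (b"label_a".as_slice(), b"111".as_slice()),
        (b"label_b".as_slice(), b"222".as_slice()),
    ] {
        key_bytes.extend_from_slice(name);
        key_bytes.extend_from_slice(value);
    }
    let series_id = seahash::hash(&key_bytes);
    println!("metric_id={metric_id}, series_id={series_id}");
}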