diff --git a/Cargo.lock b/Cargo.lock index f92b955d32..a0544f1312 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -789,13 +789,15 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.38" +version = "0.4.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", + "wasm-bindgen", "windows-targets", ] @@ -2154,7 +2156,10 @@ name = "metric_engine" version = "2.2.0-alpha" dependencies = [ "anyhow", + "arrow", + "chrono", "common", + "futures", "seahash", "storage", "temp-dir", diff --git a/Cargo.toml b/Cargo.toml index 84ae73c6e3..9260e2d286 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ members = [ [workspace.dependencies] anyhow = { version = "1.0" } seahash = { version = "4" } +chrono = { version = "0.4.39" } metric_engine = { path = "src/metric_engine" } horaedb_storage = { package = "storage", path = "src/storage" } common = { path = "src/common" } diff --git a/src/metric_engine/Cargo.toml b/src/metric_engine/Cargo.toml index ceaf134241..27b9d1bde0 100644 --- a/src/metric_engine/Cargo.toml +++ b/src/metric_engine/Cargo.toml @@ -27,7 +27,10 @@ description.workspace = true [dependencies] anyhow = { workspace = true } +arrow = { workspace = true } +chrono = { workspace = true } common = { workspace = true } +futures = { workspace = true } horaedb_storage = { workspace = true } seahash = { workspace = true } thiserror = { workspace = true } diff --git a/src/metric_engine/src/metric/mod.rs b/src/metric_engine/src/metric/mod.rs index bdd1d6623c..729d46d4bf 100644 --- a/src/metric_engine/src/metric/mod.rs +++ b/src/metric_engine/src/metric/mod.rs @@ -15,36 +15,352 @@ // specific language governing permissions and limitations // under the License. 
-use std::sync::Arc; +use std::{collections::HashMap, sync::Arc, time::Duration}; -use horaedb_storage::storage::TimeMergeStorageRef; +use anyhow::Context; +use arrow::{ + array::{ArrayRef, BinaryBuilder, Int64Builder, RecordBatch, UInt64Builder, UInt8Builder}, + datatypes::{DataType, Field, Schema, ToByteSlice}, +}; +use chrono::{Datelike, Timelike, Utc}; +use horaedb_storage::{ + storage::{TimeMergeStorageRef, WriteRequest}, + types::Timestamp, +}; +use tokio::{ + sync::{ + mpsc::{self, Receiver, Sender}, + RwLock, + }, + time::{interval_at, Instant}, +}; +use tracing::{debug, error, warn}; -use crate::{types::Sample, Result}; +use crate::{ + types::{ + hash, FieldName, FieldType, MetricName, Sample, SectionedHashMap, SegmentTimeStamp, Task, + TaskData, TimeStamp, DEFAULT_FIELD_NAME, DEFAULT_FIELD_TYPE, + }, + Result, +}; + +type MetricsData = RwLock<HashMap<MetricName, (FieldName, FieldType)>>; + +struct MetricsCache { + pub inner: Arc<RwLock<MetricsInner>>, +} + +struct MetricsInner { + // global cache data older than X days + pub global: Arc<SectionedHashMap<Arc<MetricsData>>>, + // section of cache data for latest X days + pub local: Arc<SectionedHashMap<Arc<MetricsData>>>, + // TODO: compute next day cache in advance + pub next_day: Arc<MetricsData>, +} + +impl MetricsCache { + pub fn new(num_of_days: usize) -> Self { + let global = Arc::new(SectionedHashMap::new(128)); + let local = Arc::new(SectionedHashMap::new(num_of_days)); + let inner = Arc::new(RwLock::new(MetricsInner { + global, + local: local.clone(), + next_day: Arc::new(RwLock::new(HashMap::new())), + })); + let inner_clone = inner.clone(); + let inner_clone2 = inner.clone(); + tokio::spawn(async move { Self::evict_local(inner_clone, num_of_days).await }); + tokio::spawn(async move { Self::evict_global(inner_clone2).await }); + Self { inner } + } + + async fn evict_local(inner: Arc<RwLock<MetricsInner>>, num_of_days: usize) { + // evict day cache of the earliest one from local cache + let mut interval = tokio::time::interval(Duration::from_millis(100)); + let mut last_eviction_day: i32 = 0; + loop { + interval.tick().await; + let now = chrono::Utc::now(); + 
let hour = now.hour(); + let day = now.num_days_from_ce(); + + if hour == 0 && day > last_eviction_day { + last_eviction_day = day; + // This write lock is acquired only here once per day. + let mut inner_guard = inner.write().await; + // use pre-computed next day cache to initialize the coming day cache. + let next_day = inner_guard.next_day.clone(); + inner_guard.local.insert(day as usize, next_day).await; + // invalidate the next day cache + inner_guard.next_day = Arc::new(RwLock::new(HashMap::new())); + // evict the earliest day cache + inner_guard + .local + .evict_oldest(last_eviction_day as usize - num_of_days) + .await; + } + } + } + + async fn evict_global(_inner: Arc<RwLock<MetricsInner>>) { + todo!("to evict the global cache"); + } +} + +impl MetricsInner { + pub async fn _update( + &self, + days: i32, + name: &[u8], + cache: Arc<SectionedHashMap<Arc<MetricsData>>>, + ) -> Result<bool> { + let cache_day = cache.get(days as usize).await; + // hit the cache + if let Some(ref cache_day) = cache_day { + if cache_day.read().await.contains_key(name) { + return Ok(false); + } + } + // not hit... 
+ let cache_day = if let Some(cache_day) = cache_day { + cache_day + } else { + cache + .insert(days as usize, Arc::new(RwLock::new(HashMap::new()))) + .await + .unwrap() + }; + + // insert metric + let mut write_guard = cache_day.write().await; + let entry = write_guard.entry(name.to_vec()); + match entry { + std::collections::hash_map::Entry::Vacant(e) => { + e.insert((DEFAULT_FIELD_NAME.as_bytes().to_vec(), DEFAULT_FIELD_TYPE)); + Ok(true) + } + std::collections::hash_map::Entry::Occupied(_) => Ok(false), + } + } + + pub async fn update(&self, timestamp: TimeStamp, name: &[u8]) -> Result<bool> { + // find the cache + let now_ts = Utc::now().timestamp(); + let days = SegmentTimeStamp::day_diff(now_ts, timestamp); + // input timestamp is too old to stay in local cache + if days >= self.local.get_section_count() as i32 { + self._update(days, name, self.global.clone()).await + } else { + self._update(days, name, self.local.clone()).await + } + } +} pub struct MetricManager { inner: Arc<Inner>, + cache_read: MetricsCache, + cache_write: MetricsCache, } impl MetricManager { pub fn new(storage: TimeMergeStorageRef) -> Self { + let inner = Arc::new(Inner::new(storage)); Self { - inner: Arc::new(Inner { storage }), + inner, + cache_read: MetricsCache::new(30), + cache_write: MetricsCache::new(30), } } + // update metric name cache with return value regarding if to write back + pub async fn update_metric(&self, timestamp: TimeStamp, name: &[u8]) -> Result<bool> { + let read_guard = self.cache_write.inner.read().await; + read_guard.update(timestamp, name).await + } + /// Populate metric ids from names. /// If a name does not exist, it will be created on demand. - pub async fn populate_metric_ids(&self, _samples: &mut [Sample]) -> Result<()> { - todo!() + pub async fn populate_metric_ids(&self, samples: &mut [Sample]) -> Result<()> { + // 1. 
update cache + let res = futures::future::join_all( + samples + .iter() + .map(|s| self.update_metric(s.timestamp, s.name.as_slice())), + ) + .await; + + // 2. TODO: as the eviction may invalidate some day cache, shall we verify if the + // day-metric exists in storage, or just keep redundant data in the storage + + // 3. write back to storage if not exists + futures::future::join_all(res.iter().zip(samples.iter()).filter_map( + |(write_back, sample)| { + if let Ok(true) = write_back { + Some( + self.inner + .populate_metric_ids(sample.timestamp, sample.name.as_slice()), + ) + } else { + None + } + }, + )) + .await; + + Ok(()) + } } struct Inner { - storage: TimeMergeStorageRef, + sender: Sender<Task>, } impl Inner { - async fn populate_metric_ids(&self, _samples: &mut [Sample]) -> Result<()> { - todo!() + fn new(storage: TimeMergeStorageRef) -> Self { + // TODO: need channel capacity configuration ? + let (sender, receiver) = mpsc::channel(1024); + tokio::spawn(async move { Inner::execute_write(receiver, storage).await }); + Self { sender } + } + + async fn populate_metric_ids(&self, timestamp: i64, name: &[u8]) -> Result<()> { + self.sender + .send(Task::metric_task( + timestamp, + name.to_vec(), + DEFAULT_FIELD_NAME.as_bytes().to_vec(), + DEFAULT_FIELD_TYPE, + )) + .await + .context("send ")?; + Ok(()) + } + + fn schema() -> Arc<Schema> { + Arc::new(Schema::new(vec![ + Field::new("metric_name", DataType::Binary, true), + Field::new("metric_id", DataType::UInt64, true), + Field::new("field_name", DataType::Binary, true), + Field::new("field_id", DataType::UInt64, true), + Field::new("field_type", DataType::UInt8, true), + Field::new("timestamp", DataType::Int64, true), + ])) + } + + async fn batch_write_metrics(batch_tasks: Vec<Task>, storage: TimeMergeStorageRef) { + let mut start_ts: i64 = 0; + let mut end_ts: i64 = 0; + let arrays: Vec<ArrayRef> = { + let mut metric_name_builder = BinaryBuilder::new(); + let mut metric_id_builder = UInt64Builder::new(); + let mut field_name_builder = 
BinaryBuilder::new(); + let mut field_id_builder = UInt64Builder::new(); + let mut field_type_builder = UInt8Builder::new(); + let mut field_duration_builder = Int64Builder::new(); + let task_len = batch_tasks.len(); + + batch_tasks + .into_iter() + .enumerate() + .for_each(|(index, task)| { + let (timestamp, TaskData::Metric(name, field_name, field_type)) = + (task.timestamp, task.data); + + if index == 0 { + start_ts = timestamp; + } else if index == task_len - 1 { + end_ts = timestamp; + } + + metric_id_builder.append_value(hash(&name)); + metric_name_builder.append_value(name); + field_id_builder.append_value(hash(field_name.to_byte_slice())); + field_name_builder.append_value(field_name); + field_type_builder.append_value(field_type); + field_duration_builder.append_value(timestamp); + }); + + vec![ + Arc::new(metric_name_builder.finish()), + Arc::new(metric_id_builder.finish()), + Arc::new(field_name_builder.finish()), + Arc::new(field_id_builder.finish()), + Arc::new(field_type_builder.finish()), + Arc::new(field_duration_builder.finish()), + ] + }; + let batch = RecordBatch::try_new(Self::schema(), arrays).unwrap(); + storage + .write(WriteRequest { + batch, + time_range: (Timestamp(start_ts)..Timestamp(end_ts)).into(), + enable_check: true, + }) + .await + .unwrap_or_else(|e| { + error!("write metrics failed: {:?}", e); + }); + } + + /// Batching the tasks from the receiver + /// + /// # Cancel safety + /// + /// This method is cancellation safe. 
+ async fn batching( + batch_tasks: &mut Vec<Task>, + receiver: &mut Receiver<Task>, + max_batch_size: usize, + ) -> bool { + match receiver.recv().await { + Some(task) => { + if batch_tasks.is_empty() + || SegmentTimeStamp::day_diff(batch_tasks[0].timestamp, task.timestamp) == 0 + { + batch_tasks.push(task); + batch_tasks.len() >= max_batch_size + } else { + true + } + } + None => { + warn!("Channel closed"); + true + } + } + } + + async fn execute_write(mut receiver: Receiver<Task>, storage: TimeMergeStorageRef) { + // TODO: make it configurable + let max_wait_time_ms = 1000; + let max_batch_size: usize = 16; + + loop { + let start = Instant::now() + Duration::from_millis(max_wait_time_ms); + let mut wait_interval = interval_at(start, Duration::from_millis(max_wait_time_ms)); + let mut batch_tasks: Vec<Task> = Vec::new(); + loop { + // loop until a batch is ready + tokio::select! { + _ = wait_interval.tick() => { + debug!("reach max wait time."); + break; + }, + finished = Self::batching(&mut batch_tasks, &mut receiver, max_batch_size) => { + debug!("get one task"); + if finished { + debug!("batching finished."); + break; + } + } + } + } + if batch_tasks.is_empty() { + continue; + } + + Self::batch_write_metrics(batch_tasks, storage.clone()).await; + } } } diff --git a/src/metric_engine/src/types.rs b/src/metric_engine/src/types.rs index 6203bc174d..b85a93b654 100644 --- a/src/metric_engine/src/types.rs +++ b/src/metric_engine/src/types.rs @@ -15,9 +15,73 @@ // specific language governing permissions and limitations // under the License. 
-pub struct MetricId(u64); +use std::{ + cmp::Reverse, + collections::{BinaryHeap, HashMap}, + time::{Duration, SystemTime}, +}; + +use chrono::{DateTime, Datelike, TimeZone, Utc}; +use tokio::sync::RwLock; +pub struct MetricId(pub u64); pub struct SeriesId(u64); +pub const METRIC_NAME: &str = "__name__"; +pub const DEFAULT_FIELD_NAME: &str = "value"; +pub const DEFAULT_FIELD_TYPE: u8 = 0; + +pub type MetricName = Vec<u8>; +pub type FieldName = Vec<u8>; +pub type FieldType = u8; +pub type TimeStamp = i64; + +pub struct Task { + pub create_time: Duration, + pub timestamp: TimeStamp, + pub data: TaskData, +} + +pub enum TaskData { + Metric(MetricName, FieldName, FieldType), +} + +impl Task { + pub fn metric_task( + timestamp: TimeStamp, + metric_name: MetricName, + field_name: FieldName, + field_type: FieldType, + ) -> Self { + Self { + create_time: SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(), + timestamp, + data: TaskData::Metric(metric_name, field_name, field_type), + } + } +} + +#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] +pub struct SegmentTimeStamp(i64); + +impl SegmentTimeStamp { + pub fn date(timestamp: i64) -> Self { + let dt = DateTime::from_timestamp_millis(timestamp).unwrap(); + let start_of_day = Utc + .with_ymd_and_hms(dt.year(), dt.month(), dt.day(), 0, 0, 0) + .unwrap(); + + Self(start_of_day.timestamp()) + } + + pub fn day_diff(lhs: i64, rhs: i64) -> i32 { + let lhs = DateTime::from_timestamp_millis(lhs).unwrap(); + let rhs = DateTime::from_timestamp_millis(rhs).unwrap(); + lhs.num_days_from_ce() - rhs.num_days_from_ce() + } +} + pub struct Label { pub name: Vec<u8>, pub value: Vec<u8>, @@ -28,7 +92,7 @@ pub struct Label { pub struct Sample { pub name: Vec<u8>, pub lables: Vec<Label>