From 5ebd441c88b0e57a5d131e219c44af29d78e4fab Mon Sep 17 00:00:00 2001 From: neochen Date: Sun, 19 Jan 2025 18:02:01 -0600 Subject: [PATCH 1/9] add index cache layer --- src/metric_engine/src/data/mod.rs | 1 + src/metric_engine/src/index/cache.rs | 49 +++++++++++++++++ src/metric_engine/src/index/mod.rs | 78 ++++++++++++++++++++++++++-- src/metric_engine/src/lib.rs | 2 + src/metric_engine/src/metric/mod.rs | 15 ++++-- src/metric_engine/src/types.rs | 44 +++++++++++++++- 6 files changed, 180 insertions(+), 9 deletions(-) create mode 100644 src/metric_engine/src/index/cache.rs diff --git a/src/metric_engine/src/data/mod.rs b/src/metric_engine/src/data/mod.rs index 96d1a9d78f..ae6a9b89d7 100644 --- a/src/metric_engine/src/data/mod.rs +++ b/src/metric_engine/src/data/mod.rs @@ -35,6 +35,7 @@ impl SampleManager { /// Populate series ids from labels. /// It will also build inverted index for labels. pub async fn persist(&self, _samples: Vec) -> Result<()> { + // 1. just write to TM storage directly todo!() } } diff --git a/src/metric_engine/src/index/cache.rs b/src/metric_engine/src/index/cache.rs new file mode 100644 index 0000000000..48f6cc3799 --- /dev/null +++ b/src/metric_engine/src/index/cache.rs @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::{HashMap, HashSet}; + +use crate::types::{ + FieldName, FieldType, MetricId, MetricName, SegmentDuration, SeriesId, SeriesKey, TagName, + TagValue, +}; + +type MetricsCache = HashMap>; +type SeriesCache = HashMap>; +type TagIndexCache = + HashMap>>>; + +#[derive(Default)] +pub struct CacheManager { + metrics: MetricsCache, + series: SeriesCache, + tag_index: TagIndexCache, +} + +impl CacheManager { + pub fn update_metric() -> Option<()> { + todo!() + } + + pub fn update_series() -> Option<()> { + todo!() + } + + pub fn update_tag_index() -> Option<()> { + todo!() + } +} diff --git a/src/metric_engine/src/index/mod.rs b/src/metric_engine/src/index/mod.rs index 4d98db4f5b..2f1cd35d71 100644 --- a/src/metric_engine/src/index/mod.rs +++ b/src/metric_engine/src/index/mod.rs @@ -15,11 +15,19 @@ // specific language governing permissions and limitations // under the License. +mod cache; use std::sync::Arc; +use cache::CacheManager; use horaedb_storage::storage::TimeMergeStorageRef; -use crate::{types::Sample, Result}; +use crate::{ + types::{ + hash, FieldName, FieldType, MetricId, MetricName, Sample, SegmentDuration, SeriesId, + SeriesKey, TagName, TagValue, + }, + Result, +}; pub struct IndexManager { inner: Arc, @@ -28,17 +36,81 @@ pub struct IndexManager { impl IndexManager { pub fn new(storage: TimeMergeStorageRef) -> Self { Self { - inner: Arc::new(Inner { storage }), + inner: Arc::new(Inner { + storage, + cache: CacheManager::default(), + }), } } /// Populate series ids from labels. /// It will also build inverted index for labels. - pub async fn populate_series_ids(&self, _samples: &mut [Sample]) -> Result<()> { + pub async fn populate_series_ids(&self, samples: &mut [Sample]) -> Result<()> { + // 1. create metric id and series id + let metric_ids = samples + .iter() + .map(|s| MetricId(hash(s.name.as_slice()))) + .collect::>(); + + let series_keys = samples + .iter() + .map(|s| SeriesKey::new(s.name.as_slice(), s.lables.as_slice())) + .collect::>(); + let series_ids = series_keys + .iter() + .map(|e| SeriesId(hash(e.make_bytes().as_slice()))) + .collect::>(); + + samples.iter_mut().enumerate().for_each(|(i, sample)| { + sample.name_id = Some(metric_ids[i]); + sample.series_id = Some(series_ids[i]); + }); + // 2. cache metrics + samples + .iter() + .for_each(|s| self.inner.create_metrics(s.name.as_slice())); + + // 3. cache series + series_ids + .iter() + .zip(series_keys.iter()) + .for_each(|(id, key)| self.inner.create_series(id, key)); + + // 4. cache tag index + series_ids + .iter() + .zip(series_keys.iter()) + .zip(metric_ids.iter()) + .for_each(|((series_id, series_key), metric_id)| { + self.inner + .create_tag_index(series_id, series_key, metric_id) + }); + + // 5. write all todo!() } } struct Inner { storage: TimeMergeStorageRef, + cache: CacheManager, +} + +impl Inner { + pub fn create_metrics(&self, name: &[u8]) { + todo!() + } + + pub fn create_series(&self, id: &SeriesId, key: &SeriesKey) { + todo!() + } + + pub fn create_tag_index( + &self, + series_id: &SeriesId, + series_key: &SeriesKey, + metric_id: &MetricId, + ) { + todo!() + } } diff --git a/src/metric_engine/src/lib.rs b/src/metric_engine/src/lib.rs index 2c64fe3361..e705f00276 100644 --- a/src/metric_engine/src/lib.rs +++ b/src/metric_engine/src/lib.rs @@ -17,6 +17,8 @@ //! Metric Engine entry point. +mod data; +mod index; mod metric; mod types; diff --git a/src/metric_engine/src/metric/mod.rs b/src/metric_engine/src/metric/mod.rs index bdd1d6623c..69f8e77baf 100644 --- a/src/metric_engine/src/metric/mod.rs +++ b/src/metric_engine/src/metric/mod.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use horaedb_storage::storage::TimeMergeStorageRef; -use crate::{types::Sample, Result}; +use crate::{index::IndexManager, types::Sample, Result}; pub struct MetricManager { inner: Arc, @@ -28,23 +28,30 @@ pub struct MetricManager { impl MetricManager { pub fn new(storage: TimeMergeStorageRef) -> Self { Self { - inner: Arc::new(Inner { storage }), + inner: Arc::new(Inner { + storage: storage.clone(), + index: IndexManager::new(storage), + }), } } /// Populate metric ids from names. /// If a name does not exist, it will be created on demand. pub async fn populate_metric_ids(&self, _samples: &mut [Sample]) -> Result<()> { + // 1. call index manager to create index + + // 2. call data manager to write samples todo!() } } struct Inner { storage: TimeMergeStorageRef, + index: IndexManager, } impl Inner { - async fn populate_metric_ids(&self, _samples: &mut [Sample]) -> Result<()> { - todo!() + async fn populate_metric_ids(&self, samples: &mut [Sample]) -> Result<()> { + self.index.populate_series_ids(samples).await } } diff --git a/src/metric_engine/src/types.rs b/src/metric_engine/src/types.rs index 6203bc174d..9f6f1e35ee 100644 --- a/src/metric_engine/src/types.rs +++ b/src/metric_engine/src/types.rs @@ -14,10 +14,20 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +use std::{collections::BTreeSet, io::Write, time::Duration}; -pub struct MetricId(u64); -pub struct SeriesId(u64); +#[derive(Copy, Clone)] +pub struct MetricId(pub u64); +#[derive(Copy, Clone)] +pub struct SeriesId(pub u64); +pub type SegmentDuration = Duration; +pub type MetricName = Vec; +pub type FieldName = Vec; +pub type FieldType = usize; +pub type TagName = Vec; +pub type TagValue = Vec; +#[derive(PartialEq, PartialOrd, Eq, Ord, Clone)] pub struct Label { pub name: Vec, pub value: Vec, @@ -36,6 +46,36 @@ pub struct Sample { pub series_id: Option, } +pub struct SeriesKey(Vec