diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 3f1d8bdd3..c5b10b35e 100755 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -83,15 +83,24 @@ else () endif () message("CMAKE BUILD TYPE " ${CMAKE_BUILD_TYPE}) -if (CMAKE_BUILD_TYPE STREQUAL "Debug") - set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g") -elseif (CMAKE_BUILD_TYPE STREQUAL "Release") - set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O2") -elseif (CMAKE_BUILD_TYPE STREQUAL "RelWithDebInfo") - set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -O2 -g") -elseif (CMAKE_BUILD_TYPE STREQUAL "MinSizeRel") - set(CMAKE_CXX_FLAGS_MINSIZEREL "${CMAKE_CXX_FLAGS_MINSIZEREL} -ffunction-sections -fdata-sections -Os") - set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--gc-sections") +# Keep optimization policy external by default (caller/toolchain/CMake defaults). +set(TSFILE_OPTIMIZATION_FLAGS "" + CACHE STRING + "Optional extra optimization flags for tsfile-cpp (e.g. -O3). Empty means inherit caller defaults.") +if (TSFILE_OPTIMIZATION_FLAGS) + # Apply after CMake defaults for each config so explicit optimization can + # override default -O flags in Release/RelWithDebInfo/Debug/MinSizeRel. + set(CMAKE_CXX_FLAGS_DEBUG + "${CMAKE_CXX_FLAGS_DEBUG} ${TSFILE_OPTIMIZATION_FLAGS}") + set(CMAKE_CXX_FLAGS_RELEASE + "${CMAKE_CXX_FLAGS_RELEASE} ${TSFILE_OPTIMIZATION_FLAGS}") + set(CMAKE_CXX_FLAGS_RELWITHDEBINFO + "${CMAKE_CXX_FLAGS_RELWITHDEBINFO} ${TSFILE_OPTIMIZATION_FLAGS}") + set(CMAKE_CXX_FLAGS_MINSIZEREL + "${CMAKE_CXX_FLAGS_MINSIZEREL} ${TSFILE_OPTIMIZATION_FLAGS}") + message("cmake using: TSFILE_OPTIMIZATION_FLAGS=${TSFILE_OPTIMIZATION_FLAGS}") +else () + message("cmake using: TSFILE_OPTIMIZATION_FLAGS=") endif () message("CMAKE DEBUG: CMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS}") @@ -177,10 +186,7 @@ set(PROJECT_SRC_DIR ${PROJECT_SOURCE_DIR}/src) set(LIBRARY_INCLUDE_DIR ${PROJECT_BINARY_DIR}/include CACHE STRING "TsFile includes") set(THIRD_PARTY_INCLUDE ${PROJECT_BINARY_DIR}/third_party) -set(SAVED_CXX_FLAGS "${CMAKE_CXX_FLAGS}") -set(CMAKE_CXX_FLAGS "$ENV{CXXFLAGS} -Wall -std=c++11") add_subdirectory(third_party) -set(CMAKE_CXX_FLAGS "${SAVED_CXX_FLAGS}") add_subdirectory(src) if (BUILD_TEST) diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt index ebe6c66c8..308e285bb 100644 --- a/cpp/examples/CMakeLists.txt +++ b/cpp/examples/CMakeLists.txt @@ -21,6 +21,7 @@ project(examples) message("Running in exampes directory") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c11") # TsFile include dir set(SDK_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/../src/) @@ -37,8 +38,13 @@ include_directories(${PROJECT_SOURCE_DIR}/../third_party/antlr4-cpp-runtime-4/ru set(BUILD_TYPE "Release") include_directories(${SDK_INCLUDE_DIR}) -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O0 -g") -set(CMAKE_CXX_FLAGS_DEBUG" ${CMAKE_CXX_FLAGS} -O0 -g") +if (DEFINED TSFILE_OPTIMIZATION_FLAGS AND NOT "${TSFILE_OPTIMIZATION_FLAGS}" STREQUAL "") + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${TSFILE_OPTIMIZATION_FLAGS}") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${TSFILE_OPTIMIZATION_FLAGS}") + message("examples using: TSFILE_OPTIMIZATION_FLAGS=${TSFILE_OPTIMIZATION_FLAGS}") +else () + message("examples using: TSFILE_OPTIMIZATION_FLAGS=") +endif () add_subdirectory(cpp_examples) add_subdirectory(c_examples) diff --git a/cpp/examples/README.md b/cpp/examples/README.md index 5f5af186a..5503eb6f3 100644 --- a/cpp/examples/README.md +++ b/cpp/examples/README.md @@ -55,6 +55,14 @@ target_link_libraries(your_target ${TSFILE_LIB}) Note: Set ${SDK_LIB} to your TSFile library directory. +### Optional Optimization Control + +By default, `tsfile-cpp` inherits optimization settings from the caller/toolchain. +If you want to override optimization for `tsfile-cpp`, pass +`TSFILE_OPTIMIZATION_FLAGS` during configure: + +Leave `TSFILE_OPTIMIZATION_FLAGS` empty to keep inherited behavior. + ## 3. Implementation Examples ### Directory Structure diff --git a/cpp/src/common/allocator/byte_stream.h b/cpp/src/common/allocator/byte_stream.h index 6d08744c3..dddc00b81 100644 --- a/cpp/src/common/allocator/byte_stream.h +++ b/cpp/src/common/allocator/byte_stream.h @@ -347,6 +347,21 @@ class ByteStream { FORCE_INLINE uint32_t total_size() const { return total_size_.load(); } FORCE_INLINE uint32_t read_pos() const { return read_pos_; }; + /** + * Seek the read cursor to an absolute offset. Re-anchors read_page_ for + * multi-page streams. + */ + void set_read_pos(uint32_t pos) { + ASSERT(pos <= total_size()); + read_pos_ = pos; + Page* p = head_.load(); + uint32_t skipped = 0; + while (p != nullptr && skipped + page_size_ <= pos) { + skipped += page_size_; + p = p->next_.load(); + } + read_page_ = p; + } FORCE_INLINE void wrapped_buf_advance_read_pos(uint32_t size) { if (size + read_pos_ > total_size_.load()) { read_pos_ = total_size_.load(); diff --git a/cpp/src/encoding/ts2diff_decoder.h b/cpp/src/encoding/ts2diff_decoder.h index 32584546d..fcaff771c 100644 --- a/cpp/src/encoding/ts2diff_decoder.h +++ b/cpp/src/encoding/ts2diff_decoder.h @@ -22,7 +22,9 @@ #include +#include #include +#include #include "common/allocator/alloc_base.h" #include "common/allocator/byte_stream.h" @@ -30,6 +32,106 @@ #include "utils/util_define.h" namespace storage { + +namespace ts2diff_java_detail { + +// Java float/double TS_2DIFF overflow page markers. +constexpr uint32_t kJavaFloatTs2DiffOverflowMagic = + 2147483646u; // Integer.MAX_VALUE - 1 +constexpr uint32_t kJavaFloatTs2DiffUnderflowMagic = + 2147483647u; // Integer.MAX_VALUE + +inline bool bitmap_marked(const std::vector& bm, int idx) { + if (bm.empty()) { + return false; + } + size_t byte_idx = static_cast(idx / 8); + if (byte_idx >= bm.size()) { + return false; + } + return (bm[byte_idx] & static_cast(1u << (idx % 8))) != 0; +} + +inline bool looks_like_ts2diff_header(common::ByteStream& in) { + int ret = common::E_OK; + uint32_t probe_mark = in.read_pos(); + int32_t write_index = 0; + int32_t bit_width = 0; + if (RET_FAIL(common::SerializationUtil::read_i32(write_index, in)) || + RET_FAIL(common::SerializationUtil::read_i32(bit_width, in))) { + in.set_read_pos(probe_mark); + return false; + } + in.set_read_pos(probe_mark); + if (write_index < 0 || write_index > 128) { + return false; + } + if (bit_width < 0 || bit_width > 64) { + return false; + } + return true; +} + +inline int consume_float_double_ts2diff_prefix( + common::ByteStream& in, bool& is_legacy_raw, int& max_point_number, + std::vector& underflow_bm, std::vector& value_overflow_bm, + int& segment_size) { + int ret = common::E_OK; + is_legacy_raw = false; + max_point_number = 0; + underflow_bm.clear(); + value_overflow_bm.clear(); + segment_size = 0; + uint32_t mark = in.read_pos(); + uint32_t tag = 0; + if (RET_FAIL(common::SerializationUtil::read_var_uint(tag, in))) { + return ret; + } + if (tag == kJavaFloatTs2DiffOverflowMagic || + tag == kJavaFloatTs2DiffUnderflowMagic) { + uint32_t n = 0; + if (RET_FAIL(common::SerializationUtil::read_var_uint(n, in))) { + return ret; + } + segment_size = static_cast(n); + int bm_len = segment_size / 8 + 1; + underflow_bm.resize(static_cast(bm_len), 0); + uint32_t read_len = 0; + if (RET_FAIL(in.read_buf(underflow_bm.data(), + static_cast(bm_len), read_len)) || + read_len != static_cast(bm_len)) { + return ret; + } + if (tag == kJavaFloatTs2DiffOverflowMagic) { + value_overflow_bm.resize(static_cast(bm_len), 0); + if (RET_FAIL(in.read_buf(value_overflow_bm.data(), + static_cast(bm_len), + read_len)) || + read_len != static_cast(bm_len)) { + return ret; + } + } + uint32_t mpn = 0; + if (RET_FAIL(common::SerializationUtil::read_var_uint(mpn, in))) { + return ret; + } + max_point_number = static_cast(mpn); + return common::E_OK; + } + + // Distinguish Java maxPointNumber prefix from legacy raw C++ block. + max_point_number = static_cast(tag); + if (!looks_like_ts2diff_header(in)) { + in.set_read_pos(mark); + is_legacy_raw = true; + } else { + segment_size = 0; + } + return common::E_OK; +} + +} // namespace ts2diff_java_detail + template class TS2DIFFDecoder : public Decoder { public: @@ -174,6 +276,7 @@ inline int64_t TS2DIFFDecoder::decode(common::ByteStream& in) { class FloatTS2DIFFDecoder : public TS2DIFFDecoder { public: + FloatTS2DIFFDecoder() = default; float decode(common::ByteStream& in) { int32_t value_int = TS2DIFFDecoder::decode(in); return common::int_to_float(value_int); @@ -184,10 +287,20 @@ class FloatTS2DIFFDecoder : public TS2DIFFDecoder { int read_int64(int64_t& ret_value, common::ByteStream& in); int read_float(float& ret_value, common::ByteStream& in); int read_double(double& ret_value, common::ByteStream& in); + + private: + bool is_legacy_raw_{false}; + int max_point_number_{0}; + double max_point_value_{1.0}; + int segment_pos_{0}; + int segment_size_{0}; + std::vector underflow_bm_; + std::vector value_overflow_bm_; }; class DoubleTS2DIFFDecoder : public TS2DIFFDecoder { public: + DoubleTS2DIFFDecoder() = default; double decode(common::ByteStream& in) { int64_t value_long = TS2DIFFDecoder::decode(in); return common::long_to_double(value_long); @@ -198,6 +311,15 @@ class DoubleTS2DIFFDecoder : public TS2DIFFDecoder { int read_int64(int64_t& ret_value, common::ByteStream& in); int read_float(float& ret_value, common::ByteStream& in); int read_double(double& ret_value, common::ByteStream& in); + + private: + bool is_legacy_raw_{false}; + int max_point_number_{0}; + double max_point_value_{1.0}; + int segment_pos_{0}; + int segment_size_{0}; + std::vector underflow_bm_; + std::vector value_overflow_bm_; }; typedef TS2DIFFDecoder IntTS2DIFFDecoder; @@ -295,7 +417,38 @@ FORCE_INLINE int FloatTS2DIFFDecoder::read_int64(int64_t& ret_value, } FORCE_INLINE int FloatTS2DIFFDecoder::read_float(float& ret_value, common::ByteStream& in) { - ret_value = decode(in); + int ret = common::E_OK; + if (current_index_ == 0) { + if (RET_FAIL(ts2diff_java_detail::consume_float_double_ts2diff_prefix( + in, is_legacy_raw_, max_point_number_, underflow_bm_, + value_overflow_bm_, segment_size_))) { + return ret; + } + max_point_value_ = + max_point_number_ <= 0 + ? 1.0 + : std::pow(10.0, static_cast(max_point_number_)); + segment_pos_ = 0; + } + if (is_legacy_raw_) { + ret_value = decode(in); + return common::E_OK; + } + int32_t value_int = TS2DIFFDecoder::decode(in); + if (!value_overflow_bm_.empty() && + ts2diff_java_detail::bitmap_marked(value_overflow_bm_, segment_pos_)) { + ret_value = common::int_to_float(value_int); + } else { + bool use_scaled = true; + if (!underflow_bm_.empty()) { + use_scaled = + ts2diff_java_detail::bitmap_marked(underflow_bm_, segment_pos_); + } + const double divisor = use_scaled ? max_point_value_ : 1.0; + ret_value = + static_cast(static_cast(value_int) / divisor); + } + segment_pos_++; return common::E_OK; } FORCE_INLINE int FloatTS2DIFFDecoder::read_double(double& ret_value, @@ -325,7 +478,37 @@ FORCE_INLINE int DoubleTS2DIFFDecoder::read_float(float& ret_value, } FORCE_INLINE int DoubleTS2DIFFDecoder::read_double(double& ret_value, common::ByteStream& in) { - ret_value = decode(in); + int ret = common::E_OK; + if (current_index_ == 0) { + if (RET_FAIL(ts2diff_java_detail::consume_float_double_ts2diff_prefix( + in, is_legacy_raw_, max_point_number_, underflow_bm_, + value_overflow_bm_, segment_size_))) { + return ret; + } + max_point_value_ = + max_point_number_ <= 0 + ? 1.0 + : std::pow(10.0, static_cast(max_point_number_)); + segment_pos_ = 0; + } + if (is_legacy_raw_) { + ret_value = decode(in); + return common::E_OK; + } + int64_t value_long = TS2DIFFDecoder::decode(in); + if (!value_overflow_bm_.empty() && + ts2diff_java_detail::bitmap_marked(value_overflow_bm_, segment_pos_)) { + ret_value = common::long_to_double(value_long); + } else { + bool use_scaled = true; + if (!underflow_bm_.empty()) { + use_scaled = + ts2diff_java_detail::bitmap_marked(underflow_bm_, segment_pos_); + } + const double divisor = use_scaled ? max_point_value_ : 1.0; + ret_value = static_cast(value_long) / divisor; + } + segment_pos_++; return common::E_OK; } diff --git a/cpp/src/encoding/ts2diff_encoder.h b/cpp/src/encoding/ts2diff_encoder.h index 8c5ddafc7..4e5d0ba4e 100644 --- a/cpp/src/encoding/ts2diff_encoder.h +++ b/cpp/src/encoding/ts2diff_encoder.h @@ -22,6 +22,10 @@ #include +#include +#include +#include + #include "common/allocator/alloc_base.h" #include "common/allocator/byte_stream.h" #include "encoder.h" @@ -271,28 +275,106 @@ inline int TS2DIFFEncoder::flush(common::ByteStream& out_stream) { class FloatTS2DIFFEncoder : public TS2DIFFEncoder { public: + FloatTS2DIFFEncoder() : max_point_number_(2), max_point_value_(100.0) {} int do_encode(float value, common::ByteStream& out_stream) { - int32_t value_int = common::float_to_int(value); + int32_t value_int = convert_float_to_int(value); return TS2DIFFEncoder::do_encode(value_int, out_stream); } + int flush(common::ByteStream& out_stream) override; int encode(bool value, common::ByteStream& out_stream); int encode(int32_t value, common::ByteStream& out_stream); int encode(int64_t value, common::ByteStream& out_stream); int encode(float value, common::ByteStream& out_stream); int encode(double value, common::ByteStream& out_stream); + + private: + int32_t convert_float_to_int(float value) { + const double scaled = static_cast(value) * max_point_value_; + if (scaled > static_cast(std::numeric_limits::max()) || + scaled < static_cast(std::numeric_limits::min())) { + if (std::isnan(value) || + value > + static_cast(std::numeric_limits::max()) || + value < + static_cast(std::numeric_limits::min())) { + underflow_flags_.push_back(-1); + return common::float_to_int(value); + } + underflow_flags_.push_back(0); + return static_cast(std::lround(value)); + } + if (std::isnan(value)) { + underflow_flags_.push_back(-1); + return common::float_to_int(value); + } + underflow_flags_.push_back(1); + return static_cast(std::lround(scaled)); + } + bool has_overflow() const { + for (int8_t f : underflow_flags_) { + if (f != 1) { + return true; + } + } + return false; + } + + private: + int max_point_number_; + double max_point_value_; + std::vector underflow_flags_; }; class DoubleTS2DIFFEncoder : public TS2DIFFEncoder { public: + DoubleTS2DIFFEncoder() : max_point_number_(2), max_point_value_(100.0) {} int do_encode(double value, common::ByteStream& out_stream) { - int64_t value_long = common::double_to_long(value); + int64_t value_long = convert_double_to_long(value); return TS2DIFFEncoder::do_encode(value_long, out_stream); } + int flush(common::ByteStream& out_stream) override; int encode(bool value, common::ByteStream& out_stream); int encode(int32_t value, common::ByteStream& out_stream); int encode(int64_t value, common::ByteStream& out_stream); int encode(float value, common::ByteStream& out_stream); int encode(double value, common::ByteStream& out_stream); + + private: + int64_t convert_double_to_long(double value) { + const double scaled = value * max_point_value_; + if (scaled > static_cast(std::numeric_limits::max()) || + scaled < static_cast(std::numeric_limits::min())) { + if (std::isnan(value) || + value > + static_cast(std::numeric_limits::max()) || + value < + static_cast(std::numeric_limits::min())) { + underflow_flags_.push_back(-1); + return common::double_to_long(value); + } + underflow_flags_.push_back(0); + return static_cast(std::llround(value)); + } + if (std::isnan(value)) { + underflow_flags_.push_back(-1); + return common::double_to_long(value); + } + underflow_flags_.push_back(1); + return static_cast(std::llround(scaled)); + } + bool has_overflow() const { + for (int8_t f : underflow_flags_) { + if (f != 1) { + return true; + } + } + return false; + } + + private: + int max_point_number_; + double max_point_value_; + std::vector underflow_flags_; }; typedef TS2DIFFEncoder IntTS2DIFFEncoder; @@ -402,5 +484,168 @@ FORCE_INLINE int DoubleTS2DIFFEncoder::encode(double value, return do_encode(value, out); } +// Keep float/double TS_2DIFF page layout compatible with Java. +FORCE_INLINE int FloatTS2DIFFEncoder::flush(common::ByteStream& out_stream) { + int ret = common::E_OK; + if (write_index_ == -1) { + return common::E_OK; + } + const int num_values = write_index_ + 1; + common::ByteStream inner(1024, common::MOD_TS2DIFF_OBJ, false); + if (RET_FAIL(common::SerializationUtil::write_var_uint( + static_cast(max_point_number_), inner))) { + return ret; + } + SIMDOps::rebase(delta_arr_, delta_arr_min_, write_index_); + int bit_width = cal_bit_width(delta_arr_max_ - delta_arr_min_); + if (RET_FAIL(common::SerializationUtil::write_ui32( + static_cast(write_index_), inner))) { + return ret; + } + if (RET_FAIL(common::SerializationUtil::write_ui32( + static_cast(bit_width), inner))) { + return ret; + } + if (RET_FAIL(common::SerializationUtil::write_ui32( + static_cast(delta_arr_min_), inner))) { + return ret; + } + if (RET_FAIL(common::SerializationUtil::write_ui32( + static_cast(first_value_), inner))) { + return ret; + } + for (int i = 0; i < write_index_; i++) { + write_bits(delta_arr_[i], bit_width, inner); + } + flush_remaining(inner); + reset(); + + const bool overflow = has_overflow(); + if (overflow) { + std::vector underflow_bitmap( + static_cast(num_values / 8 + 1), 0); + std::vector value_overflow_bitmap( + static_cast(num_values / 8 + 1), 0); + bool has_value_overflow = false; + for (int i = 0; i < num_values; i++) { + int8_t f = underflow_flags_[static_cast(i)]; + if (f == 1) { + underflow_bitmap[static_cast(i / 8)] |= + static_cast(1u << (i % 8)); + } else if (f == -1) { + has_value_overflow = true; + value_overflow_bitmap[static_cast(i / 8)] |= + static_cast(1u << (i % 8)); + } + } + constexpr uint32_t kJavaOverflowMagic = + 2147483647u; // Integer.MAX_VALUE + constexpr uint32_t kJavaValueOverflowMagic = + 2147483646u; // Integer.MAX_VALUE - 1 + if (RET_FAIL(common::SerializationUtil::write_var_uint( + has_value_overflow ? kJavaValueOverflowMagic + : kJavaOverflowMagic, + out_stream))) { + return ret; + } + if (RET_FAIL(common::SerializationUtil::write_var_uint( + static_cast(num_values), out_stream))) { + return ret; + } + const uint32_t bm_len = static_cast(num_values / 8 + 1); + if (RET_FAIL(out_stream.write_buf(underflow_bitmap.data(), bm_len))) { + return ret; + } + if (has_value_overflow && RET_FAIL(out_stream.write_buf( + value_overflow_bitmap.data(), bm_len))) { + return ret; + } + } + if (RET_FAIL(merge_byte_stream(out_stream, inner, true))) { + return ret; + } + underflow_flags_.clear(); + return ret; +} + +FORCE_INLINE int DoubleTS2DIFFEncoder::flush(common::ByteStream& out_stream) { + int ret = common::E_OK; + if (write_index_ == -1) { + return common::E_OK; + } + const int num_values = write_index_ + 1; + common::ByteStream inner(1024, common::MOD_TS2DIFF_OBJ, false); + if (RET_FAIL(common::SerializationUtil::write_var_uint( + static_cast(max_point_number_), inner))) { + return ret; + } + SIMDOps::rebase(delta_arr_, delta_arr_min_, write_index_); + int bit_width = cal_bit_width(delta_arr_max_ - delta_arr_min_); + if (RET_FAIL(common::SerializationUtil::write_i32(write_index_, inner))) { + return ret; + } + if (RET_FAIL(common::SerializationUtil::write_i32(bit_width, inner))) { + return ret; + } + if (RET_FAIL(common::SerializationUtil::write_i64(delta_arr_min_, inner))) { + return ret; + } + if (RET_FAIL(common::SerializationUtil::write_i64(first_value_, inner))) { + return ret; + } + for (int i = 0; i < write_index_; i++) { + write_bits(delta_arr_[i], bit_width, inner); + } + flush_remaining(inner); + reset(); + + const bool overflow = has_overflow(); + if (overflow) { + std::vector underflow_bitmap( + static_cast(num_values / 8 + 1), 0); + std::vector value_overflow_bitmap( + static_cast(num_values / 8 + 1), 0); + bool has_value_overflow = false; + for (int i = 0; i < num_values; i++) { + int8_t f = underflow_flags_[static_cast(i)]; + if (f == 1) { + underflow_bitmap[static_cast(i / 8)] |= + static_cast(1u << (i % 8)); + } else if (f == -1) { + has_value_overflow = true; + value_overflow_bitmap[static_cast(i / 8)] |= + static_cast(1u << (i % 8)); + } + } + constexpr uint32_t kJavaOverflowMagic = + 2147483647u; // Integer.MAX_VALUE + constexpr uint32_t kJavaValueOverflowMagic = + 2147483646u; // Integer.MAX_VALUE - 1 + if (RET_FAIL(common::SerializationUtil::write_var_uint( + has_value_overflow ? kJavaValueOverflowMagic + : kJavaOverflowMagic, + out_stream))) { + return ret; + } + if (RET_FAIL(common::SerializationUtil::write_var_uint( + static_cast(num_values), out_stream))) { + return ret; + } + const uint32_t bm_len = static_cast(num_values / 8 + 1); + if (RET_FAIL(out_stream.write_buf(underflow_bitmap.data(), bm_len))) { + return ret; + } + if (has_value_overflow && RET_FAIL(out_stream.write_buf( + value_overflow_bitmap.data(), bm_len))) { + return ret; + } + } + if (RET_FAIL(merge_byte_stream(out_stream, inner, true))) { + return ret; + } + underflow_flags_.clear(); + return ret; +} + } // end namespace storage #endif // ENCODING_TS2DIFF_ENCODER_H diff --git a/cpp/test/CMakeLists.txt b/cpp/test/CMakeLists.txt index 2be9c1b2c..9b84bf883 100644 --- a/cpp/test/CMakeLists.txt +++ b/cpp/test/CMakeLists.txt @@ -18,7 +18,6 @@ under the License. ]] cmake_minimum_required(VERSION 3.11) project(TsFile_CPP_TEST) -include(FetchContent) set(CMAKE_VERBOSE_MAKEFILE ON) @@ -33,36 +32,84 @@ set(DOWNLOADED 0) set(GTEST_URL "") set(TIMEOUT 30) -if (EXISTS ${GTEST_ZIP_PATH}) +# Treat only a real ZIP as valid (local header magic PK\x03\x04 -> hex 504b0304). +# EXISTS alone is wrong: failed downloads often leave a 0-byte file. +# Do not use plain file(READ)+string LENGTH on binary: CMake may report length > LIMIT. +set(GTEST_ZIP_LOCAL_VALID 0) +if (EXISTS "${GTEST_ZIP_PATH}") + file(READ "${GTEST_ZIP_PATH}" GTEST_ZIP_HEX_PROBE LIMIT 4 HEX) + string(STRIP "${GTEST_ZIP_HEX_PROBE}" GTEST_ZIP_HEX_PROBE) + string(TOLOWER "${GTEST_ZIP_HEX_PROBE}" GTEST_ZIP_HEX_PROBE) + if (GTEST_ZIP_HEX_PROBE MATCHES "^504b03") + set(GTEST_ZIP_LOCAL_VALID 1) + else () + message( + WARNING + "Local googletest zip is empty or not a zip (${GTEST_ZIP_PATH}); " + "will try download." + ) + file(REMOVE "${GTEST_ZIP_PATH}") + endif () +endif () + +if (GTEST_ZIP_LOCAL_VALID) message(STATUS "Using local gtest zip file: ${GTEST_ZIP_PATH}") set(DOWNLOADED 1) set(GTEST_URL ${GTEST_ZIP_PATH}) else () - message(STATUS "Local gtest zip file not found, trying to download from network...") + message(STATUS "Local gtest zip missing or invalid, trying to download from network...") endif () if (NOT DOWNLOADED) foreach (URL ${GTEST_URL_LIST}) message(STATUS "Trying to download from ${URL}") - file(DOWNLOAD ${URL} "${CMAKE_SOURCE_DIR}/third_party/googletest-release-1.12.1.zip" STATUS DOWNLOAD_STATUS TIMEOUT ${TIMEOUT}) + file(DOWNLOAD ${URL} "${GTEST_ZIP_PATH}" STATUS DOWNLOAD_STATUS TIMEOUT + ${TIMEOUT}) list(GET DOWNLOAD_STATUS 0 DOWNLOAD_RESULT) - if (${DOWNLOAD_RESULT} EQUAL 0) - set(DOWNLOADED 1) - set(GTEST_URL ${GTEST_ZIP_PATH}) - break() + if (${DOWNLOAD_RESULT} EQUAL 0 AND EXISTS "${GTEST_ZIP_PATH}") + file(READ "${GTEST_ZIP_PATH}" GTEST_ZIP_HEX_PROBE LIMIT 4 HEX) + string(STRIP "${GTEST_ZIP_HEX_PROBE}" GTEST_ZIP_HEX_PROBE) + string(TOLOWER "${GTEST_ZIP_HEX_PROBE}" GTEST_ZIP_HEX_PROBE) + if (GTEST_ZIP_HEX_PROBE MATCHES "^504b03") + set(DOWNLOADED 1) + set(GTEST_URL ${GTEST_ZIP_PATH}) + break() + else () + message(WARNING "Download from ${URL} did not yield a valid zip; trying next URL...") + file(REMOVE "${GTEST_ZIP_PATH}") + endif () endif () endforeach () endif () if (${DOWNLOADED}) message(STATUS "Successfully get googletest from ${GTEST_URL}") - FetchContent_Declare( - googletest - URL ${GTEST_URL} - ) set(gtest_force_shared_crt ON CACHE BOOL "" FORCE) - FetchContent_MakeAvailable(googletest) + # Extract GitHub release zip via CMake (top folder googletest-release-1.12.1/). + # Avoid FetchContent here: deferred populate / wrong extract dir broke configure. + set(_gtest_stage "${CMAKE_BINARY_DIR}/googletest-extract") + set(GTEST_SRC_ROOT "${_gtest_stage}/googletest-release-1.12.1") + if (NOT EXISTS "${GTEST_SRC_ROOT}/CMakeLists.txt") + file(REMOVE_RECURSE "${_gtest_stage}") + file(MAKE_DIRECTORY "${_gtest_stage}") + execute_process( + COMMAND ${CMAKE_COMMAND} -E tar xf "${GTEST_ZIP_PATH}" + WORKING_DIRECTORY "${_gtest_stage}" + RESULT_VARIABLE _gtest_tar_result + ) + if (NOT _gtest_tar_result EQUAL 0) + message(FATAL_ERROR "Failed to extract googletest zip: ${GTEST_ZIP_PATH}") + endif () + endif () + if (NOT EXISTS "${GTEST_SRC_ROOT}/CMakeLists.txt") + message( + FATAL_ERROR + "googletest zip layout unexpected (missing ${GTEST_SRC_ROOT}/CMakeLists.txt)." + ) + endif () + add_subdirectory("${GTEST_SRC_ROOT}" "${CMAKE_BINARY_DIR}/googletest-build" + EXCLUDE_FROM_ALL) set(TESTS_ENABLED ON PARENT_SCOPE) else () message(WARNING "Failed to download googletest from all provided URLs, setting TESTS_ENABLED to OFF") diff --git a/cpp/test/encoding/ts2diff_codec_test.cc b/cpp/test/encoding/ts2diff_codec_test.cc index be16d4af2..755338b30 100644 --- a/cpp/test/encoding/ts2diff_codec_test.cc +++ b/cpp/test/encoding/ts2diff_codec_test.cc @@ -19,7 +19,13 @@ #include #include +#include +#include +#include +#include #include +#include +#include #include "encoding/ts2diff_decoder.h" #include "encoding/ts2diff_encoder.h" @@ -59,6 +65,113 @@ class TS2DIFFCodecTest : public ::testing::Test { LongTS2DIFFDecoder* decoder_long_; }; +class FloatDoubleTS2DIFFCodecTest : public ::testing::Test { + protected: + void SetUp() override { + encoder_float_ = new FloatTS2DIFFEncoder(); + decoder_float_ = new FloatTS2DIFFDecoder(); + encoder_double_ = new DoubleTS2DIFFEncoder(); + decoder_double_ = new DoubleTS2DIFFDecoder(); + } + + void TearDown() override { + if (encoder_float_ != nullptr) { + encoder_float_->destroy(); + delete encoder_float_; + encoder_float_ = nullptr; + } + if (encoder_double_ != nullptr) { + encoder_double_->destroy(); + delete encoder_double_; + encoder_double_ = nullptr; + } + delete decoder_float_; + decoder_float_ = nullptr; + delete decoder_double_; + decoder_double_ = nullptr; + } + + FloatTS2DIFFEncoder* encoder_float_{nullptr}; + DoubleTS2DIFFEncoder* encoder_double_{nullptr}; + FloatTS2DIFFDecoder* decoder_float_{nullptr}; + DoubleTS2DIFFDecoder* decoder_double_{nullptr}; +}; + +static std::string byte_stream_to_hex(common::ByteStream& stream) { + uint32_t mark = stream.read_pos(); + uint32_t size = stream.total_size(); + std::vector buf(size); + uint32_t read_len = 0; + EXPECT_EQ(stream.read_buf(buf.data(), size, read_len), common::E_OK); + EXPECT_EQ(read_len, size); + stream.set_read_pos(mark); + + std::ostringstream oss; + for (uint32_t i = 0; i < size; i++) { + if (i > 0) { + oss << " "; + } + oss << std::uppercase << std::hex << std::setw(2) << std::setfill('0') + << static_cast(buf[i]); + } + return oss.str(); +} + +TEST_F(FloatDoubleTS2DIFFCodecTest, TestFloatRoundTrip) { + common::ByteStream out_stream(1024, common::MOD_TS2DIFF_OBJ, false); + const int row_num = 1000; + std::vector data(row_num); + for (int i = 0; i < row_num; i++) { + data[i] = static_cast(i) * 0.25f + 0.50f; + } + for (int i = 0; i < row_num; i++) { + EXPECT_EQ(encoder_float_->encode(data[i], out_stream), common::E_OK); + } + EXPECT_EQ(encoder_float_->flush(out_stream), common::E_OK); + + float x = 0.f; + for (int i = 0; i < row_num; i++) { + EXPECT_EQ(decoder_float_->read_float(x, out_stream), common::E_OK); + EXPECT_FLOAT_EQ(x, data[i]) << "row " << i; + } + EXPECT_FALSE(decoder_float_->has_remaining(out_stream)); +} + +TEST_F(FloatDoubleTS2DIFFCodecTest, TestFloatJavaDefaultHexCompatibility) { + common::ByteStream out_stream(1024, common::MOD_TS2DIFF_OBJ, false); + const float data[] = {3.123456768E20f, std::nanf("")}; + + for (float v : data) { + EXPECT_EQ(encoder_float_->encode(v, out_stream), common::E_OK); + } + EXPECT_EQ(encoder_float_->flush(out_stream), common::E_OK); + + const std::string expected_hex = + "FE FF FF FF 07 02 00 03 02 00 00 00 01 00 00 00 00 1E 38 8A AA 61 87 " + "75 56"; + EXPECT_EQ(byte_stream_to_hex(out_stream), expected_hex); +} + +TEST_F(FloatDoubleTS2DIFFCodecTest, TestDoubleRoundTrip) { + common::ByteStream out_stream(1024, common::MOD_TS2DIFF_OBJ, false); + const int row_num = 800; + std::vector data(row_num); + for (int i = 0; i < row_num; i++) { + data[i] = static_cast(i) * 0.25 + 0.5; + } + for (int i = 0; i < row_num; i++) { + EXPECT_EQ(encoder_double_->encode(data[i], out_stream), common::E_OK); + } + EXPECT_EQ(encoder_double_->flush(out_stream), common::E_OK); + + double y = 0.; + for (int i = 0; i < row_num; i++) { + EXPECT_EQ(decoder_double_->read_double(y, out_stream), common::E_OK); + EXPECT_DOUBLE_EQ(y, data[i]) << "row " << i; + } + EXPECT_FALSE(decoder_double_->has_remaining(out_stream)); +} + TEST_F(TS2DIFFCodecTest, TestIntEncoding1) { common::ByteStream out_stream(1024, common::MOD_TS2DIFF_OBJ, false); const int row_num = 10000; diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java index 481a4d6c2..69aded79d 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java @@ -2433,11 +2433,15 @@ public long selfCheck( Decoder.getDecoderByType( chunkHeader.getEncodingType(), chunkHeader.getDataType()); ByteBuffer pageData = readPage(pageHeader, chunkHeader.getCompressionType()); + TSEncoding configuredTimeEncoding = + TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()); + boolean isTimeColumn = + (chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) + == TsFileConstant.TIME_COLUMN_MASK; + TSEncoding selectedTimeEncoding = + isTimeColumn ? chunkHeader.getEncodingType() : configuredTimeEncoding; Decoder timeDecoder = - Decoder.getDecoderByType( - TSEncoding.valueOf( - TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), - TSDataType.INT64); + Decoder.getDecoderByType(selectedTimeEncoding, TSDataType.INT64); if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) == TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk with only one page diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java index 85073a456..acc9789e4 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java @@ -250,7 +250,7 @@ private AbstractAlignedPageReader constructAlignedPageReader( return constructPageReader( timePageHeader, timePageData, - defaultTimeDecoder, + getTimeDecoder(timeChunkHeader.getEncodingType()), valuePageHeaderList, lazyLoadPageDataArray, valueDataTypeList, diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractChunkReader.java index caaa58751..aa675ba38 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractChunkReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractChunkReader.java @@ -19,7 +19,6 @@ package org.apache.tsfile.read.reader.chunk; -import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.encoding.decoder.Decoder; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.enums.TSEncoding; @@ -35,10 +34,9 @@ public abstract class AbstractChunkReader implements IChunkReader { - protected final Decoder defaultTimeDecoder = - Decoder.getDecoderByType( - TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), - TSDataType.INT64); + protected Decoder getTimeDecoder(TSEncoding actualTimeEncoding) { + return Decoder.getDecoderByType(actualTimeEncoding, TSDataType.INT64); + } protected final long readStopTime; diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java index c2b2301d7..9d917fa8d 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java @@ -153,7 +153,7 @@ private PageReader constructPageReader(PageHeader pageHeader) { chunkDataBuffer.array(), currentPagePosition, unCompressor, encryptParam), chunkHeader.getDataType(), chunkHeader.calculateDecoderForNonTimeChunk(), - defaultTimeDecoder, + getTimeDecoder(chunkHeader.getEncodingType()), queryFilter); reader.setDeleteIntervalList(deleteIntervalList); return reader;