Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
765960e
[refactor](be) Refactor file table reader stack
Gabriel39 Jun 12, 2026
b000e93
Fix 0612 (#64456)
Gabriel39 Jun 12, 2026
b4948c3
[refactor](be) Normalize parquet complex schema projection (#64451)
suxiaogang223 Jun 12, 2026
9166a9a
support truncate_char_or_varchar (#64463)
zhangstar333 Jun 12, 2026
c82510f
Fix wrong map (#64474)
Gabriel39 Jun 12, 2026
67d2e6e
fix (#64475)
Gabriel39 Jun 12, 2026
e9db346
fix compile (#64477)
Gabriel39 Jun 12, 2026
0a92095
fix(column-mapper): handle nested predicates on evolved complex colum…
Gabriel39 Jun 13, 2026
cdebd7e
fix rf (#64481)
Gabriel39 Jun 13, 2026
d336826
fix paimon (#64482)
Gabriel39 Jun 13, 2026
c57eb1d
[test](regression) Update parquet int96 timestamp outputs
suxiaogang223 Jun 15, 2026
a1041ad
[fix](be) Allow nullable parquet map keys
suxiaogang223 Jun 15, 2026
f842166
[fix](be) Execute runtime filter wrapper expressions
suxiaogang223 Jun 15, 2026
c17033c
[fix](be) Support decimal256 in new parquet reader
suxiaogang223 Jun 15, 2026
4f81823
[fix](be) Count parquet lazy materialized rows
suxiaogang223 Jun 15, 2026
8058e3f
fix (#64526)
Gabriel39 Jun 15, 2026
04a4ae6
[chore](be) Clean up format v2 code style
suxiaogang223 Jun 15, 2026
6b72505
fix some hive case failed (#64534)
zhangstar333 Jun 16, 2026
ba2c63d
unit tests for file scanner and AccessPathParser (#64543)
Gabriel39 Jun 16, 2026
a156331
clang format
suxiaogang223 Jun 16, 2026
95900c7
[test](regression) Update external timestamp outputs
suxiaogang223 Jun 16, 2026
f8c4689
fix by_index column mapper with struct type (#64567)
zhangstar333 Jun 16, 2026
f21fc29
[refactor](be) Unify new parquet profile definitions
suxiaogang223 Jun 16, 2026
7db9584
[refactor](be) Move format v2 implementations under format namespace
suxiaogang223 Jun 16, 2026
3a625c4
[test](regression) Update timestamp regression outputs
suxiaogang223 Jun 16, 2026
a66cad5
[fix](be) Support parquet timestamp nanos in new reader
suxiaogang223 Jun 16, 2026
e3379ea
[refactor](be) Refine parquet reader pruning and documentation
suxiaogang223 Jun 16, 2026
974f56b
[comment](be) Rewrite documentation comments for core Parquet modules
suxiaogang223 Jun 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,4 @@ compile_commands.json
.github

.worktrees/
.worktree_initialized
148 changes: 148 additions & 0 deletions be/src/core/data_type_serde/data_type_datetimev2_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "core/data_type/data_type_decimal.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/primitive_type.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "core/types.h"
#include "core/value/vdatetime_value.h"
#include "exprs/function/cast/cast_to_datetimev2_impl.hpp"
Expand All @@ -43,6 +44,95 @@ enum {
namespace doris {
static const int64_t micro_to_nano_second = 1000;

namespace {

#pragma pack(1)
// In-memory image of one decoded INT96 timestamp: an 8-byte nanosecond-of-day
// field followed by a 4-byte Julian day number. The surrounding
// `#pragma pack(1)` keeps the struct at exactly 12 bytes so an array of raw
// INT96 values can be reinterpreted as an array of this type.
struct DecodedInt96Timestamp {
    int64_t nanos_of_day;
    int32_t julian_day;

    // Converts the value to microseconds relative to 1970-01-01 00:00:00
    // (Julian day 2440588). Sub-microsecond digits of `nanos_of_day` are dropped.
    //
    // The fields come straight from file data, so they may hold arbitrary bit
    // patterns. All arithmetic is therefore done in 64 bits, and day counts whose
    // microsecond value cannot be represented saturate to INT64_MAX / INT64_MIN
    // instead of overflowing (signed overflow is undefined behavior).
    // NOTE(review): saturated results are presumably rejected as out of range by
    // the consumer of this value - confirm.
    int64_t to_timestamp_micros() const {
        static constexpr int64_t JULIAN_EPOCH_OFFSET_DAYS = 2440588;
        static constexpr int64_t MICROS_IN_DAY = 86400000000;
        static constexpr int64_t NANOS_PER_MICROSECOND = 1000;
        // Largest magnitude the nanos-of-day term can contribute after scaling.
        static constexpr int64_t MAX_ABS_SUB_DAY_MICROS = INT64_MAX / NANOS_PER_MICROSECOND;
        // Largest |days| for which days * MICROS_IN_DAY plus any sub-day term
        // still fits in int64_t.
        static constexpr int64_t MAX_ABS_DAYS =
                (INT64_MAX - MAX_ABS_SUB_DAY_MICROS) / MICROS_IN_DAY;

        // Widen before subtracting: int32 - int32 can overflow for extreme input.
        const int64_t days_since_epoch =
                static_cast<int64_t>(julian_day) - JULIAN_EPOCH_OFFSET_DAYS;
        if (days_since_epoch > MAX_ABS_DAYS) {
            return INT64_MAX;
        }
        if (days_since_epoch < -MAX_ABS_DAYS) {
            return INT64_MIN;
        }
        return days_since_epoch * MICROS_IN_DAY + nanos_of_day / NANOS_PER_MICROSECOND;
    }
};
#pragma pack()
static_assert(sizeof(DecodedInt96Timestamp) == 12);

// Appends one DATETIMEV2 value to `data`, built from `timestamp_micros`
// (microseconds relative to 1970-01-01 00:00:00). No time-zone conversion is
// applied. Returns DataQualityError, leaving `data` untouched, when the resulting
// day number is not positive or is rejected by get_date_from_daynr().
Status append_datetimev2_from_epoch_micros(ColumnDateTimeV2::Container& data,
                                           int64_t timestamp_micros) {
    static constexpr int64_t MICROS_PER_SECOND = 1000000;
    static constexpr int64_t MICROS_PER_MINUTE = 60 * MICROS_PER_SECOND;
    static constexpr int64_t MICROS_PER_HOUR = 60 * MICROS_PER_MINUTE;
    static constexpr int64_t MICROS_PER_DAY = 24 * MICROS_PER_HOUR;
    static const int64_t EPOCH_DAYNR = calc_daynr(1970, 1, 1);

    // Floor-divide into (day, time-of-day): C++ division truncates toward zero,
    // so a negative remainder borrows one day to become non-negative.
    int64_t time_of_day_micros = timestamp_micros % MICROS_PER_DAY;
    const int64_t borrowed_days = time_of_day_micros < 0 ? 1 : 0;
    const int64_t days_since_epoch = timestamp_micros / MICROS_PER_DAY - borrowed_days;
    time_of_day_micros += borrowed_days * MICROS_PER_DAY;

    const int64_t daynr = EPOCH_DAYNR + days_since_epoch;
    const auto out_of_range = [&]() {
        return Status::DataQualityError(
                "Decoded DATETIMEV2 timestamp is out of range: micros={}, daynr={}",
                timestamp_micros, daynr);
    };

    DateV2Value<DateTimeV2ValueType> result;
    if (daynr <= 0 || !result.get_date_from_daynr(static_cast<uint64_t>(daynr))) {
        return out_of_range();
    }

    // Each unit is a multiple of the next smaller one, so every field can be
    // taken directly from the time-of-day value.
    const auto hour = static_cast<uint8_t>(time_of_day_micros / MICROS_PER_HOUR);
    const auto minute =
            static_cast<uint8_t>((time_of_day_micros % MICROS_PER_HOUR) / MICROS_PER_MINUTE);
    const auto second =
            static_cast<uint16_t>((time_of_day_micros % MICROS_PER_MINUTE) / MICROS_PER_SECOND);
    const auto microsecond = static_cast<uint32_t>(time_of_day_micros % MICROS_PER_SECOND);

    result.unchecked_set_time(result.year(), result.month(), result.day(), hour, minute, second,
                              microsecond);
    data.push_back(result);
    return Status::OK();
}

// Appends one DATETIMEV2 value to `data`: `timestamp_micros` is split into whole
// epoch seconds, which are passed to from_unixtime() together with `timezone`,
// and a microsecond-of-second part that is set on the result afterwards.
void append_datetimev2_from_utc_epoch_micros(ColumnDateTimeV2::Container& data,
                                             int64_t timestamp_micros,
                                             const cctz::time_zone& timezone) {
    static constexpr int64_t MICROS_PER_SECOND = 1000000;

    // Floor-divide so the sub-second part is always in [0, MICROS_PER_SECOND),
    // including for timestamps before the epoch.
    int64_t sub_second_micros = timestamp_micros % MICROS_PER_SECOND;
    const bool needs_borrow = sub_second_micros < 0;
    const int64_t whole_seconds = timestamp_micros / MICROS_PER_SECOND - (needs_borrow ? 1 : 0);
    if (needs_borrow) {
        sub_second_micros += MICROS_PER_SECOND;
    }

    DateV2Value<DateTimeV2ValueType> converted;
    converted.from_unixtime(whole_seconds, timezone);
    converted.set_microsecond(static_cast<uint32_t>(sub_second_micros));
    data.push_back(converted);
}

// Rescales a raw decoded timestamp `value` to microseconds according to
// `view.time_unit`:
//   MILLIS -> multiplied by 1000
//   NANOS  -> divided by 1000, rounding toward negative infinity
//   other  -> returned unchanged
//
// The NANOS branch floors rather than truncates toward zero. Truncation would
// move pre-epoch instants to a later microsecond (e.g. -1 ns would become 0 us),
// whereas the consumers of this value floor negative timestamps when splitting
// them into days/seconds.
//
// NOTE(review): the MILLIS branch overflows for |value| > INT64_MAX / 1000;
// presumably such values never occur in valid data - confirm.
int64_t decoded_timestamp_micros(const DecodedColumnView& view, int64_t value) {
    static constexpr int64_t SCALE = 1000;

    if (view.time_unit == DecodedTimeUnit::MILLIS) {
        return value * SCALE;
    }
    if (view.time_unit == DecodedTimeUnit::NANOS) {
        int64_t micros = value / SCALE;
        // C++ division truncates toward zero; step down one for a negative remainder.
        if (value % SCALE < 0) {
            --micros;
        }
        return micros;
    }
    return value;
}

} // namespace

// NOLINTBEGIN(readability-function-size)
// NOLINTBEGIN(readability-function-cognitive-complexity)
Status DataTypeDateTimeV2SerDe::from_string_batch(const ColumnString& col_str,
Expand Down Expand Up @@ -451,6 +541,64 @@ Status DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column,
return Status::OK();
}

// Appends `view.row_count` DATETIMEV2 values decoded from `view` to `column`.
//
// Accepted sources are INT96 and INT64; any other kind is routed to
// decoded_column_view_handle_conversion_failure(). Null rows append a
// default-constructed value. INT96 values and INT64 values that are not
// UTC-adjusted are converted without a time zone; UTC-adjusted INT64 values are
// converted with `view.timezone`, or UTC when it is null.
//
// When a conversion fails, the row becomes NULL if the view allows it; otherwise
// the data container is resized back to its size on entry and the error returned.
Status DataTypeDateTimeV2SerDe::read_column_from_decoded_values(
        IColumn& column, const DecodedColumnView& view) const {
    const bool source_is_int96 = view.value_kind == DecodedValueKind::INT96;
    if (!source_is_int96 && view.value_kind != DecodedValueKind::INT64) {
        return decoded_column_view_handle_conversion_failure(
                column, view,
                Status::NotSupported("DATETIMEV2 decoded reader expects INT64 or INT96 source"));
    }
    if (view.values == nullptr && decoded_column_view_has_non_null_value(view)) {
        return Status::Corruption("Decoded value buffer is null for {}", column.get_name());
    }

    auto& data = assert_cast<ColumnDateTimeV2&>(column).get_data();
    const auto size_on_entry = data.size();

    // Shared by both source kinds: append one zone-less timestamp, degrading to
    // NULL or rolling back on failure. A non-OK result means the caller must stop.
    const auto append_or_recover = [&](int64_t row, int64_t micros) -> Status {
        Status st = append_datetimev2_from_epoch_micros(data, micros);
        if (st.ok()) {
            return st;
        }
        if (decoded_column_view_can_null_on_conversion_failure(view)) {
            decoded_column_view_insert_null_on_conversion_failure(column, view, row);
            return Status::OK();
        }
        data.resize(size_on_entry);
        return st;
    };

    if (source_is_int96) {
        const auto* int96_values = reinterpret_cast<const DecodedInt96Timestamp*>(view.values);
        for (int64_t row = 0; row < view.row_count; ++row) {
            if (decoded_column_view_row_is_null(view, row)) {
                data.push_back(DateV2Value<DateTimeV2ValueType>());
                continue;
            }
            Status st = append_or_recover(row, int96_values[row].to_timestamp_micros());
            if (!st.ok()) {
                return st;
            }
        }
        return Status::OK();
    }

    const auto* int64_values = reinterpret_cast<const int64_t*>(view.values);
    static const auto utc_timezone = cctz::utc_time_zone();
    const auto& timezone = view.timezone != nullptr ? *view.timezone : utc_timezone;
    for (int64_t row = 0; row < view.row_count; ++row) {
        if (decoded_column_view_row_is_null(view, row)) {
            data.push_back(DateV2Value<DateTimeV2ValueType>());
            continue;
        }
        const int64_t micros = decoded_timestamp_micros(view, int64_values[row]);
        if (view.timestamp_is_adjusted_to_utc) {
            append_datetimev2_from_utc_epoch_micros(data, micros, timezone);
            continue;
        }
        Status st = append_or_recover(row, micros);
        if (!st.ok()) {
            return st;
        }
    }
    return Status::OK();
}

Status DataTypeDateTimeV2SerDe::write_column_to_mysql_binary(const IColumn& column,
MysqlRowBinaryBuffer& result,
int64_t row_idx, bool col_const,
Expand Down
2 changes: 2 additions & 0 deletions be/src/core/data_type_serde/data_type_datetimev2_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ class DataTypeDateTimeV2SerDe : public DataTypeNumberSerDe<PrimitiveType::TYPE_D
const cctz::time_zone& ctz) const override;
Status read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int64_t start,
int64_t end, const cctz::time_zone& ctz) const override;
// Appends view.row_count DATETIMEV2 values decoded from `view` to `column`.
// The source value kind must be INT64 or INT96. INT64 values are scaled to
// microseconds by view.time_unit and, when view.timestamp_is_adjusted_to_utc is
// set, converted using view.timezone (UTC when it is null).
Status read_column_from_decoded_values(IColumn& column,
const DecodedColumnView& view) const override;

Status write_column_to_mysql_binary(const IColumn& column, MysqlRowBinaryBuffer& row_buffer,
int64_t row_idx, bool col_const,
Expand Down
24 changes: 24 additions & 0 deletions be/src/core/data_type_serde/data_type_datev2_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "core/data_type/data_type_decimal.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/define_primitive_type.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "core/types.h"
#include "core/value/vdatetime_value.h"
#include "exprs/function/cast/cast_to_datev2_impl.hpp"
Expand Down Expand Up @@ -125,6 +126,29 @@ Status DataTypeDateV2SerDe::read_column_from_arrow(IColumn& column, const arrow:
return Status::OK();
}

// Appends `view.row_count` DATEV2 values decoded from `view` to `column`.
//
// The source must be INT32; any other kind is routed to
// decoded_column_view_handle_conversion_failure(). Each non-null value is offset
// by `date_threshold` to form a day number. Null rows append a default-constructed
// value.
//
// A day number that is not positive, or that get_date_from_daynr() rejects, is a
// conversion failure: the row becomes NULL if the view allows it, otherwise the
// data container is resized back to its size on entry and DataQualityError is
// returned. Previously the result of get_date_from_daynr() was ignored and a
// default date was appended silently.
Status DataTypeDateV2SerDe::read_column_from_decoded_values(IColumn& column,
                                                            const DecodedColumnView& view) const {
    if (view.value_kind != DecodedValueKind::INT32) {
        return decoded_column_view_handle_conversion_failure(
                column, view, Status::NotSupported("DATEV2 decoded reader expects INT32 source"));
    }
    if (view.values == nullptr && decoded_column_view_has_non_null_value(view)) {
        return Status::Corruption("Decoded value buffer is null for {}", column.get_name());
    }
    auto& data = assert_cast<ColumnDateV2&>(column).get_data();
    const auto old_size = data.size();
    const auto* values = reinterpret_cast<const int32_t*>(view.values);
    for (int64_t row = 0; row < view.row_count; ++row) {
        if (decoded_column_view_row_is_null(view, row)) {
            data.push_back(DateV2Value<DateV2ValueType>());
            continue;
        }
        // Widen before adding so extreme inputs cannot overflow 32-bit arithmetic.
        const int64_t daynr =
                static_cast<int64_t>(values[row]) + static_cast<int64_t>(date_threshold);
        DateV2Value<DateV2ValueType> date_v2;
        if (daynr > 0 && date_v2.get_date_from_daynr(static_cast<uint64_t>(daynr))) {
            data.push_back(date_v2);
            continue;
        }
        if (decoded_column_view_can_null_on_conversion_failure(view)) {
            decoded_column_view_insert_null_on_conversion_failure(column, view, row);
            continue;
        }
        // Undo the rows appended by this call so the column is left unchanged.
        data.resize(old_size);
        return Status::DataQualityError("Decoded DATEV2 value is out of range: days={}, daynr={}",
                                        values[row], daynr);
    }
    return Status::OK();
}

Status DataTypeDateV2SerDe::write_column_to_mysql_binary(const IColumn& column,
MysqlRowBinaryBuffer& result,
int64_t row_idx, bool col_const,
Expand Down
2 changes: 2 additions & 0 deletions be/src/core/data_type_serde/data_type_datev2_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class DataTypeDateV2SerDe : public DataTypeNumberSerDe<PrimitiveType::TYPE_DATEV
const cctz::time_zone& ctz) const override;
Status read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int64_t start,
int64_t end, const cctz::time_zone& ctz) const override;
// Appends view.row_count DATEV2 values decoded from `view` to `column`.
// The source value kind must be INT32; each non-null value is offset by
// date_threshold and interpreted as a day number.
Status read_column_from_decoded_values(IColumn& column,
const DecodedColumnView& view) const override;
Status write_column_to_mysql_binary(const IColumn& column, MysqlRowBinaryBuffer& row_buffer,
int64_t row_idx, bool col_const,
const FormatOptions& options) const override;
Expand Down
Loading
Loading