diff --git a/src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function.cpp b/src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function.cpp new file mode 100644 index 0000000..45b5f16 --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function.cpp @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/mergetree/compact/aggregate/aggregate_merge_function.h" + +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "paimon/common/data/data_define.h" +#include "paimon/common/types/row_kind.h" +#include "paimon/common/utils/internal_row_utils.h" +#include "paimon/core/core_options.h" +#include "paimon/core/mergetree/compact/aggregate/field_aggregator_factory.h" +#include "paimon/core/mergetree/compact/aggregate/field_last_non_null_value_agg.h" +#include "paimon/core/mergetree/compact/aggregate/field_last_value_agg.h" +#include "paimon/core/mergetree/compact/aggregate/field_primary_key_agg.h" + +namespace paimon { +Result> AggregateMergeFunction::Create( + const std::shared_ptr& value_schema, + const std::vector& primary_keys, const CoreOptions& options) { + std::vector> aggregators; + aggregators.reserve(value_schema->num_fields()); + for (int32_t i = 0; i < value_schema->num_fields(); i++) { + const auto& field_name = value_schema->field(i)->name(); + const auto& field_type = value_schema->field(i)->type(); + PAIMON_ASSIGN_OR_RAISE(std::string str_agg, + GetAggFuncName(field_name, primary_keys, options)); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr agg, + FieldAggregatorFactory::CreateFieldAggregator(field_name, field_type, + str_agg, options)); + aggregators.push_back(std::move(agg)); + } + + bool remove_record_on_delete = options.AggregationRemoveRecordOnDelete(); + + PAIMON_ASSIGN_OR_RAISE(std::vector getters, + InternalRowUtils::CreateFieldGetters(value_schema, /*use_view=*/true)); + return std::unique_ptr(new AggregateMergeFunction( + std::move(getters), std::move(aggregators), remove_record_on_delete)); +} + +Status AggregateMergeFunction::Add(KeyValue&& kv) { + // When removeRecordOnDelete is enabled, if we receive a DELETE row, + // mark the current row for deletion and initialize the row with input values. 
+ if (remove_record_on_delete_ && kv.value_kind == RowKind::Delete()) { + current_delete_row_ = true; + row_ = std::make_unique(getters_.size()); + for (size_t i = 0; i < getters_.size(); i++) { + row_->SetField(i, getters_[i](*(kv.value))); + } + row_->AddDataHolder(std::move(kv.value)); + latest_kv_ = std::move(kv); + return Status::OK(); + } + + current_delete_row_ = false; + bool is_retract = kv.value_kind->IsRetract(); + for (size_t i = 0; i < getters_.size(); i++) { + auto accumulator = getters_[i](*row_); + auto input_field = getters_[i](*(kv.value)); + VariantType merged_field; + if (is_retract) { + PAIMON_ASSIGN_OR_RAISE(merged_field, + aggregators_[i]->Retract(accumulator, input_field)); + } else { + merged_field = aggregators_[i]->Agg(accumulator, input_field); + } + row_->SetField(i, merged_field); + } + row_->AddDataHolder(std::move(kv.value)); + latest_kv_ = std::move(kv); + return Status::OK(); +} + +Result> AggregateMergeFunction::GetResult() { + assert(latest_kv_); + latest_kv_.value().value = std::move(row_); + latest_kv_.value().value_kind = current_delete_row_ ? 
RowKind::Delete() : RowKind::Insert(); + latest_kv_.value().level = KeyValue::UNKNOWN_LEVEL; + return std::move(latest_kv_); +} + +Result AggregateMergeFunction::GetAggFuncName( + const std::string& field_name, const std::vector& primary_keys, + const CoreOptions& options) { + const auto& seq_fields = options.GetSequenceField(); + auto seq_iter = std::find(seq_fields.begin(), seq_fields.end(), field_name); + if (seq_iter != seq_fields.end()) { + // no agg for sequence fields, use last_value to do cover + return std::string(FieldLastValueAgg::NAME); + } + + auto pk_iter = std::find(primary_keys.begin(), primary_keys.end(), field_name); + if (pk_iter != primary_keys.end()) { + return std::string(FieldPrimaryKeyAgg::NAME); + } + + PAIMON_ASSIGN_OR_RAISE(std::optional str_agg, options.GetFieldAggFunc(field_name)); + if (str_agg == std::nullopt) { + str_agg = options.GetFieldsDefaultFunc(); + } + if (!str_agg) { + str_agg = std::string(FieldLastNonNullValueAgg::NAME); + } + return str_agg.value(); +} +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function.h b/src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function.h new file mode 100644 index 0000000..243ce6f --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function.h @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "paimon/common/data/data_define.h" +#include "paimon/common/data/generic_row.h" +#include "paimon/common/data/internal_row.h" +#include "paimon/core/core_options.h" +#include "paimon/core/key_value.h" +#include "paimon/core/mergetree/compact/aggregate/field_aggregator.h" +#include "paimon/core/mergetree/compact/merge_function.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace arrow { +class Schema; +} // namespace arrow + +namespace paimon { +class CoreOptions; + +/// A `MergeFunction` where key is primary key (unique) and value is the partial record, +/// pre-aggregate non-null fields on merge. 
+class AggregateMergeFunction : public MergeFunction {
+ public:
+    /// Creates a merge function with one field aggregator per field of `value_schema`.
+    // value_schema is the schema of parameter value in KeyValue object
+    static Result> Create(
+        const std::shared_ptr& value_schema,
+        const std::vector& primary_keys, const CoreOptions& options);
+
+    /// Clears all per-key state: forgets the latest record, clears the delete flag, allocates a
+    /// fresh accumulated row sized from getters_.size() and resets every field aggregator.
+    void Reset() override {
+        latest_kv_ = std::nullopt;
+        current_delete_row_ = false;
+        row_ = std::make_unique(getters_.size());
+        for (const auto& agg : aggregators_) {
+            agg->Reset();
+        }
+    }
+
+    /// Merges one record into the accumulated row (defined in the .cpp file).
+    Status Add(KeyValue&& kv) override;
+
+    /// Returns the merged record built from the records added so far (defined in the .cpp file).
+    Result> GetResult() override;
+
+ private:
+    /// Takes ownership of the getters and aggregators; both vectors are expected to have one
+    /// entry per value field (checked by the assert, in debug builds only).
+    AggregateMergeFunction(std::vector&& getters,
+                           std::vector>&& aggregators,
+                           bool remove_record_on_delete)
+        : getters_(std::move(getters)),
+          aggregators_(std::move(aggregators)),
+          remove_record_on_delete_(remove_record_on_delete),
+          // Reads getters_.size(): this relies on getters_ being declared before row_, because
+          // members are initialized in declaration order.
+          row_(std::make_unique(getters_.size())) {
+        assert(getters_.size() == aggregators_.size());
+    }
+    /// Resolves the aggregate function name to use for `field_name` (defined in the .cpp file).
+    static Result GetAggFuncName(const std::string& field_name,
+                                 const std::vector& primary_keys,
+                                 const CoreOptions& options);
+
+ private:
+    // Per-field accessors; applied both to incoming value rows and to the accumulated row.
+    std::vector getters_;
+    // Per-field aggregators, index-aligned with getters_.
+    std::vector> aggregators_;
+    // When true, a DELETE record replaces the accumulated row instead of being retracted.
+    bool remove_record_on_delete_;
+    // True when the most recently added record was a DELETE handled by remove-record-on-delete.
+    bool current_delete_row_ = false;
+    // Most recently added record; empty until the first Add() after Reset().
+    std::optional latest_kv_;
+    // Accumulated row; moved out by GetResult() and re-created by Reset().
+    std::unique_ptr row_;
+};
+}  // namespace paimon
diff --git a/src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function_test.cpp b/src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function_test.cpp
new file mode 100644
index 0000000..30c3240
--- /dev/null
+++ b/src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function_test.cpp
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/mergetree/compact/aggregate/aggregate_merge_function.h"
+
+#include
+#include
+
+#include "arrow/api.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/data_define.h"
+#include "paimon/common/types/row_kind.h"
+#include "paimon/core/core_options.h"
+#include "paimon/core/mergetree/compact/aggregate/field_last_non_null_value_agg.h"
+#include "paimon/core/mergetree/compact/aggregate/field_last_value_agg.h"
+#include "paimon/core/mergetree/compact/aggregate/field_min_agg.h"
+#include "paimon/core/mergetree/compact/aggregate/field_primary_key_agg.h"
+#include "paimon/core/mergetree/compact/aggregate/field_sum_agg.h"
+#include "paimon/defs.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/binary_row_generator.h"
+#include "paimon/testing/utils/key_value_checker.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// Checks the name-resolution precedence of GetAggFuncName(): per-field option, default option,
+// built-in fallback, primary key and sequence field.
+TEST(AggregateMergeFunctionTest, TestGetAggFuncName) {
+    {
+        // test with specified agg
+        ASSERT_OK_AND_ASSIGN(CoreOptions options,
+                             CoreOptions::FromMap({{Options::FIELDS_DEFAULT_AGG_FUNC, "sum"},
+                                                   {"fields.f0.aggregate-function", "min"}}));
+        ASSERT_OK_AND_ASSIGN(std::string str_agg, AggregateMergeFunction::GetAggFuncName(
+                                                      "f0", /*primary_keys=*/{"f1"}, options));
+        ASSERT_EQ(FieldMinAgg::NAME, str_agg);
+    }
+    {
+        // test with default agg
+        ASSERT_OK_AND_ASSIGN(CoreOptions options,
+                             CoreOptions::FromMap({{Options::FIELDS_DEFAULT_AGG_FUNC, "sum"}}));
+        ASSERT_OK_AND_ASSIGN(std::string str_agg, AggregateMergeFunction::GetAggFuncName(
+                                                      "f0", /*primary_keys=*/{"f1"}, options));
+        ASSERT_EQ(FieldSumAgg::NAME, str_agg);
+    }
+    {
+        // test no agg configuration
+        ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({}));
+        ASSERT_OK_AND_ASSIGN(std::string str_agg, AggregateMergeFunction::GetAggFuncName(
+                                                      "f0", /*primary_keys=*/{"f1"}, options));
+        ASSERT_EQ(FieldLastNonNullValueAgg::NAME, str_agg);
+    }
+    {
+        // test primary key
+        ASSERT_OK_AND_ASSIGN(CoreOptions options,
+                             CoreOptions::FromMap({{Options::FIELDS_DEFAULT_AGG_FUNC, "sum"}}));
+        ASSERT_OK_AND_ASSIGN(std::string str_agg, AggregateMergeFunction::GetAggFuncName(
+                                                      "f0", /*primary_keys=*/{"f0"}, options));
+        ASSERT_EQ(FieldPrimaryKeyAgg::NAME, str_agg);
+    }
+    {
+        // test sequence fields
+        ASSERT_OK_AND_ASSIGN(CoreOptions options,
+                             CoreOptions::FromMap({{Options::SEQUENCE_FIELD, "f0"}}));
+        ASSERT_OK_AND_ASSIGN(std::string str_agg, AggregateMergeFunction::GetAggFuncName(
+                                                      "f0", /*primary_keys=*/{"f1"}, options));
+        ASSERT_EQ(FieldLastValueAgg::NAME, str_agg);
+    }
+}
+// Sum aggregation: a single INSERT is returned unchanged; an INSERT followed by a DELETE is
+// retracted (200 - 300 = -100) and still reported as an INSERT.
+TEST(AggregateMergeFunctionTest, TestSimple) {
+    arrow::FieldVector fields = {arrow::field("k0", arrow::int32()),
+                                 arrow::field("v0", arrow::int32())};
+    auto value_schema = arrow::schema(fields);
+    ASSERT_OK_AND_ASSIGN(CoreOptions core_options,
+                         CoreOptions::FromMap({{Options::FIELDS_DEFAULT_AGG_FUNC, "sum"}}));
+    ASSERT_OK_AND_ASSIGN(
+        std::unique_ptr merge_func,
+        AggregateMergeFunction::Create(value_schema, /*primary_keys=*/{"k0"}, core_options));
+
+    auto pool = GetDefaultPool();
+    KeyValue kv1(RowKind::Insert(), /*sequence_number=*/0, /*level=*/0, /*key=*/
+                 BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                 /*value=*/BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get()));
+    KeyValue kv2(RowKind::Insert(), /*sequence_number=*/0, /*level=*/1,
+                 /*key=*/BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                 /*value=*/BinaryRowGenerator::GenerateRowPtr({10, 200}, pool.get()));
+    KeyValue kv3(RowKind::Delete(), /*sequence_number=*/0, /*level=*/2, /*key=*/
+                 BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                 /*value=*/BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get()));
+    ASSERT_OK(merge_func->Add(std::move(kv1)));
+    auto result_kv = std::move(merge_func->GetResult().value().value());
+    KeyValue expected(RowKind::Insert(), /*sequence_number=*/0,
+                      /*level=*/KeyValue::UNKNOWN_LEVEL, /*key=*/
+                      BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                      /*value=*/BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get()));
+    KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2);
+
+    merge_func->Reset();
+    ASSERT_OK(merge_func->Add(std::move(kv2)));
+    ASSERT_OK(merge_func->Add(std::move(kv3)));
+    result_kv = std::move(merge_func->GetResult().value().value());
+    KeyValue expected2(RowKind::Insert(), /*sequence_number=*/0,
+                       /*level=*/KeyValue::UNKNOWN_LEVEL, /*key=*/
+                       BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                       /*value=*/BinaryRowGenerator::GenerateRowPtr({10, -100}, pool.get()));
+    KeyValueChecker::CheckResult(expected2, result_kv, /*key_arity=*/1, /*value_arity=*/2);
+}
+
+// With fields.v0.ignore-retract=true the DELETE leaves v0 at 200, while the result still takes
+// the sequence number of the last added record.
+TEST(AggregateMergeFunctionTest, TestIgnoreRetract) {
+    arrow::FieldVector fields = {arrow::field("k0", arrow::int32()),
+                                 arrow::field("v0", arrow::int32())};
+    auto value_schema = arrow::schema(fields);
+    ASSERT_OK_AND_ASSIGN(CoreOptions core_options,
+                         CoreOptions::FromMap({{Options::FIELDS_DEFAULT_AGG_FUNC, "sum"},
+                                               {"fields.v0.ignore-retract", "true"}}));
+    ASSERT_OK_AND_ASSIGN(
+        std::unique_ptr merge_func,
+        AggregateMergeFunction::Create(value_schema, /*primary_keys=*/{"k0"}, core_options));
+
+    auto pool = GetDefaultPool();
+    KeyValue kv1(RowKind::Insert(), /*sequence_number=*/0, /*level=*/0, /*key=*/
+                 BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                 /*value=*/BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get()));
+    KeyValue kv2(RowKind::Insert(), /*sequence_number=*/0, /*level=*/1,
+                 /*key=*/BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                 /*value=*/BinaryRowGenerator::GenerateRowPtr({10, 200}, pool.get()));
+    KeyValue kv3(RowKind::Delete(), /*sequence_number=*/1, /*level=*/2, /*key=*/
+                 BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                 /*value=*/BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get()));
+    ASSERT_OK(merge_func->Add(std::move(kv1)));
+    auto result_kv = std::move(merge_func->GetResult().value().value());
+    KeyValue expected(RowKind::Insert(), /*sequence_number=*/0,
+                      /*level=*/KeyValue::UNKNOWN_LEVEL, /*key=*/
+                      BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                      /*value=*/BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get()));
+    KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2);
+
+    merge_func->Reset();
+    ASSERT_OK(merge_func->Add(std::move(kv2)));
+    ASSERT_OK(merge_func->Add(std::move(kv3)));
+    result_kv = std::move(merge_func->GetResult().value().value());
+    KeyValue expected2(RowKind::Insert(), /*sequence_number=*/1,
+                       /*level=*/KeyValue::UNKNOWN_LEVEL, /*key=*/
+                       BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                       /*value=*/BinaryRowGenerator::GenerateRowPtr({10, 200}, pool.get()));
+    KeyValueChecker::CheckResult(expected2, result_kv, /*key_arity=*/1, /*value_arity=*/2);
+}
+
+// Sequence fields (s0, s1) take the values of the last added record, nulls included, while the
+// remaining value field is summed.
+TEST(AggregateMergeFunctionTest, TestSequenceFields) {
+    arrow::FieldVector fields = {
+        arrow::field("k0", arrow::int32()), arrow::field("s0", arrow::int32()),
+        arrow::field("s1", arrow::int32()), arrow::field("v0", arrow::int32())};
+    auto value_schema = arrow::schema(fields);
+    ASSERT_OK_AND_ASSIGN(CoreOptions core_options,
+                         CoreOptions::FromMap({{Options::SEQUENCE_FIELD, "s0,s1"},
+                                               {Options::FIELDS_DEFAULT_AGG_FUNC, "sum"}}));
+    ASSERT_OK_AND_ASSIGN(
+        std::unique_ptr merge_func,
+        AggregateMergeFunction::Create(value_schema, /*primary_keys=*/{"k0"}, core_options));
+    auto pool = GetDefaultPool();
+    // sequence: null, 2
+    KeyValue kv1(
+        RowKind::Insert(), /*sequence_number=*/1, /*level=*/0,
+        /*key=*/BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+        /*value=*/BinaryRowGenerator::GenerateRowPtr({10, NullType(), 2, 200}, pool.get()));
+    // sequence: 1, null
+    KeyValue kv2(
+        RowKind::Insert(), /*sequence_number=*/0, /*level=*/0, /*key=*/
+        BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+        /*value=*/BinaryRowGenerator::GenerateRowPtr({10, 1, NullType(), 100}, pool.get()));
+    merge_func->Reset();
+    ASSERT_OK(merge_func->Add(std::move(kv1)));
+    ASSERT_OK(merge_func->Add(std::move(kv2)));
+    KeyValue result_kv = std::move(merge_func->GetResult().value().value());
+    // expect sequence: 1, null
+    KeyValue expected(
+        RowKind::Insert(), /*sequence_number=*/0, /*level=*/KeyValue::UNKNOWN_LEVEL, /*key=*/
+        BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+        /*value=*/BinaryRowGenerator::GenerateRowPtr({10, 1, NullType(), 300}, pool.get()));
+    KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/4);
+}
+
+// Exercises aggregation.remove-record-on-delete: a DELETE replaces the accumulated row and the
+// result is a DELETE only when the DELETE is the last record added.
+TEST(AggregateMergeFunctionTest, TestRemoveRecordOnDelete) {
+    arrow::FieldVector fields = {arrow::field("k0", arrow::int32()),
+                                 arrow::field("v0", arrow::int32())};
+    auto value_schema = arrow::schema(fields);
+    ASSERT_OK_AND_ASSIGN(
+        CoreOptions core_options,
+        CoreOptions::FromMap({{Options::FIELDS_DEFAULT_AGG_FUNC, "sum"},
+                              {Options::AGGREGATION_REMOVE_RECORD_ON_DELETE, "true"}}));
+    ASSERT_OK_AND_ASSIGN(
+        std::unique_ptr merge_func,
+        AggregateMergeFunction::Create(value_schema, /*primary_keys=*/{"k0"}, core_options));
+
+    auto pool = GetDefaultPool();
+
+    // Case 1: INSERT + INSERT, then DELETE -> result should be RowKind::Delete
+    {
+        merge_func->Reset();
+        KeyValue kv1(RowKind::Insert(), /*sequence_number=*/0, /*level=*/0,
+                     BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                     BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get()));
+        KeyValue kv2(RowKind::Insert(), /*sequence_number=*/1, /*level=*/0,
+                     BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                     BinaryRowGenerator::GenerateRowPtr({10, 200}, pool.get()));
+        KeyValue kv3(RowKind::Delete(), /*sequence_number=*/2, /*level=*/0,
+                     BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                     BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get()));
+        ASSERT_OK(merge_func->Add(std::move(kv1)));
+        ASSERT_OK(merge_func->Add(std::move(kv2)));
+        ASSERT_OK(merge_func->Add(std::move(kv3)));
+        auto result_kv = std::move(merge_func->GetResult().value().value());
+        // Should return DELETE row kind with the original values from the delete record
+        KeyValue expected(RowKind::Delete(), /*sequence_number=*/2,
+                          /*level=*/KeyValue::UNKNOWN_LEVEL,
+                          BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                          BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get()));
+        KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2);
+    }
+
+    // Case 2: Only INSERT rows, no DELETE -> result should be RowKind::Insert with aggregated
+    // values
+    {
+        merge_func->Reset();
+        KeyValue kv1(RowKind::Insert(), /*sequence_number=*/0, /*level=*/0,
+                     BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                     BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get()));
+        KeyValue kv2(RowKind::Insert(), /*sequence_number=*/1, /*level=*/0,
+                     BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                     BinaryRowGenerator::GenerateRowPtr({10, 200}, pool.get()));
+        ASSERT_OK(merge_func->Add(std::move(kv1)));
+        ASSERT_OK(merge_func->Add(std::move(kv2)));
+        auto result_kv = std::move(merge_func->GetResult().value().value());
+        // Should return INSERT with sum aggregation: 100 + 200 = 300
+        KeyValue expected(RowKind::Insert(), /*sequence_number=*/1,
+                          /*level=*/KeyValue::UNKNOWN_LEVEL,
+                          BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                          BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get()));
+        KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2);
+    }
+
+    // Case 3: DELETE only -> result should be RowKind::Delete
+    {
+        merge_func->Reset();
+        KeyValue kv1(RowKind::Delete(), /*sequence_number=*/0, /*level=*/0,
+                     BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                     BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get()));
+        ASSERT_OK(merge_func->Add(std::move(kv1)));
+        auto result_kv = std::move(merge_func->GetResult().value().value());
+        KeyValue expected(RowKind::Delete(), /*sequence_number=*/0,
+                          /*level=*/KeyValue::UNKNOWN_LEVEL,
+                          BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                          BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get()));
+        KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2);
+    }
+
+    // Case 4: INSERT + DELETE + INSERT -> DELETE resets row, then INSERT aggregates on top
+    {
+        merge_func->Reset();
+        KeyValue kv1(RowKind::Insert(), /*sequence_number=*/0, /*level=*/0,
+                     BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                     BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get()));
+        KeyValue kv2(RowKind::Delete(), /*sequence_number=*/1, /*level=*/0,
+                     BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                     BinaryRowGenerator::GenerateRowPtr({10, 200}, pool.get()));
+        KeyValue kv3(RowKind::Insert(), /*sequence_number=*/2, /*level=*/0,
+                     BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                     BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get()));
+        ASSERT_OK(merge_func->Add(std::move(kv1)));
+        ASSERT_OK(merge_func->Add(std::move(kv2)));
+        ASSERT_OK(merge_func->Add(std::move(kv3)));
+        auto result_kv = std::move(merge_func->GetResult().value().value());
+        // DELETE resets row_ to {10, 200}, then INSERT aggregates: 200 + 300 = 500
+        // current_delete_row_ is false because last record is INSERT
+        KeyValue expected(RowKind::Insert(), /*sequence_number=*/2,
+                          /*level=*/KeyValue::UNKNOWN_LEVEL,
+                          BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                          BinaryRowGenerator::GenerateRowPtr({10, 500}, pool.get()));
+        KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2);
+    }
+}
+
+// Counterpart of TestRemoveRecordOnDelete with the option left unset.
+TEST(AggregateMergeFunctionTest, TestDeleteWithoutRemoveRecordOnDelete) {
+    // Without removeRecordOnDelete, DELETE row should be treated as retract (subtract)
+    arrow::FieldVector fields = {arrow::field("k0", arrow::int32()),
+                                 arrow::field("v0", arrow::int32())};
+    auto value_schema = arrow::schema(fields);
+    ASSERT_OK_AND_ASSIGN(CoreOptions core_options,
+                         CoreOptions::FromMap({{Options::FIELDS_DEFAULT_AGG_FUNC, "sum"}}));
+    ASSERT_OK_AND_ASSIGN(
+        std::unique_ptr merge_func,
+        AggregateMergeFunction::Create(value_schema, /*primary_keys=*/{"k0"}, core_options));
+
+    auto pool = GetDefaultPool();
+    merge_func->Reset();
+    KeyValue kv1(RowKind::Insert(), /*sequence_number=*/0, /*level=*/0,
+                 BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                 BinaryRowGenerator::GenerateRowPtr({10, 200}, pool.get()));
+    KeyValue kv2(RowKind::Delete(), /*sequence_number=*/1, /*level=*/0,
+                 BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                 BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get()));
+    ASSERT_OK(merge_func->Add(std::move(kv1)));
+    ASSERT_OK(merge_func->Add(std::move(kv2)));
+    auto result_kv = std::move(merge_func->GetResult().value().value());
+    // Without removeRecordOnDelete, DELETE is retract: 200 - 300 = -100, result is INSERT
+    KeyValue expected(RowKind::Insert(), /*sequence_number=*/1,
+                      /*level=*/KeyValue::UNKNOWN_LEVEL,
+                      BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
+                      BinaryRowGenerator::GenerateRowPtr({10, -100}, pool.get()));
+    KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2);
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/core/mergetree/compact/aggregate/field_aggregator.h b/src/paimon/core/mergetree/compact/aggregate/field_aggregator.h
new file mode 100644
index 0000000..d151eec
--- /dev/null
+++ b/src/paimon/core/mergetree/compact/aggregate/field_aggregator.h
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+
+#include "arrow/api.h"
+#include "fmt/format.h"
+#include "paimon/common/data/data_define.h"
+#include "paimon/result.h"
+
+namespace paimon {
+/// Abstract base class for aggregating a single field of a row.
+///
+/// A concrete aggregator implements Agg(); Reset(), AggReversed() and Retract() have default
+/// implementations that subclasses may override.
+class FieldAggregator {
+ public:
+    virtual ~FieldAggregator() = default;
+
+    /// @param name name of the aggregate function; used in the Retract() error message.
+    /// @param field_type type of the aggregated field.
+    FieldAggregator(const std::string& name, const std::shared_ptr& field_type)
+        : name_(name), field_type_(field_type) {}
+
+    /// Merges `input_field` into `accumulator` and returns the new accumulated value.
+    virtual VariantType Agg(const VariantType& accumulator, const VariantType& input_field) = 0;
+
+    /// Resets the aggregator to a clean start state. The default implementation does nothing.
+    virtual void Reset() {}
+
+    /// Same as Agg() with the two arguments swapped, unless overridden.
+    virtual VariantType AggReversed(const VariantType& accumulator,
+                                    const VariantType& input_field) {
+        return Agg(input_field, accumulator);
+    }
+
+    /// Retracts `input_field` from `accumulator`.
+    ///
+    /// The default implementation ignores both arguments and returns an Invalid status naming
+    /// this aggregate function; aggregators that support retraction override it.
+    virtual Result Retract(const VariantType& accumulator,
+                           const VariantType& input_field) const {
+        return Status::Invalid(fmt::format(
+            "Aggregate function {} does not support retraction, if you allow this function to "
+            "ignore retraction messages, you can configure fields.field_name.ignore-retract=true.",
+            name_));
+    }
+
+    /// Returns the aggregate function name passed to the constructor.
+    const std::string& GetName() const {
+        return name_;
+    }
+    /// Returns the field type passed to the constructor.
+    std::shared_ptr GetFieldType() const {
+        return field_type_;
+    }
+
+ protected:
+    // Name of the aggregate function.
+    std::string name_;
+    // Type of the aggregated field.
+    std::shared_ptr field_type_;
+};
+}  // namespace paimon
diff --git a/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory.h b/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory.h
new file mode 100644
index 0000000..3519bfc
--- /dev/null
+++ b/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory.h
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +#pragma once +#include +#include +#include + +#include "fmt/format.h" +#include "paimon/core/core_options.h" +#include "paimon/core/mergetree/compact/aggregate/field_aggregator.h" +#include "paimon/core/mergetree/compact/aggregate/field_bool_and_agg.h" +#include "paimon/core/mergetree/compact/aggregate/field_bool_or_agg.h" +#include "paimon/core/mergetree/compact/aggregate/field_first_non_null_value_agg.h" +#include "paimon/core/mergetree/compact/aggregate/field_first_value_agg.h" +#include "paimon/core/mergetree/compact/aggregate/field_ignore_retract_agg.h" +#include "paimon/core/mergetree/compact/aggregate/field_last_non_null_value_agg.h" +#include "paimon/core/mergetree/compact/aggregate/field_last_value_agg.h" +#include "paimon/core/mergetree/compact/aggregate/field_listagg_agg.h" +#include "paimon/core/mergetree/compact/aggregate/field_max_agg.h" +#include "paimon/core/mergetree/compact/aggregate/field_min_agg.h" +#include "paimon/core/mergetree/compact/aggregate/field_primary_key_agg.h" +#include "paimon/core/mergetree/compact/aggregate/field_sum_agg.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace arrow { +class DataType; +} // namespace arrow + +namespace paimon { +/// %Factory for `FieldAggregator`. 
+class FieldAggregatorFactory {
+ public:
+    // Static-only utility: instances can be neither created nor destroyed.
+    FieldAggregatorFactory() = delete;
+    ~FieldAggregatorFactory() = delete;
+
+    /// Creates the aggregator registered under `str_agg` for the given field.
+    ///
+    /// @param field_name name of the field; used to look up per-field options.
+    /// @param field_type type of the field, forwarded to the aggregator.
+    /// @param str_agg aggregate function name, compared against each aggregator's NAME.
+    /// @param options table options (ignore-retract, remove-record-on-delete, listagg settings).
+    /// @return the aggregator, wrapped in an ignore-retract decorator when ignore-retract is
+    ///         enabled for the field. Returns Invalid when `str_agg` matches no known aggregator
+    ///         or when ignore-retract and remove-record-on-delete are both enabled, and
+    ///         propagates errors from the aggregators' own Create() functions and from
+    ///         options.FieldAggIgnoreRetract().
+    static Result> CreateFieldAggregator(
+        const std::string& field_name, const std::shared_ptr& field_type,
+        const std::string& str_agg, const CoreOptions& options) {
+        std::unique_ptr field_aggregator;
+        // Aggregators built with std::make_unique cannot fail here; those built through Create()
+        // can, and their error is propagated.
+        if (str_agg == FieldPrimaryKeyAgg::NAME) {
+            field_aggregator = std::make_unique(field_type);
+        } else if (str_agg == FieldLastNonNullValueAgg::NAME) {
+            field_aggregator = std::make_unique(field_type);
+        } else if (str_agg == FieldFirstNonNullValueAgg::NAME) {
+            field_aggregator = std::make_unique(field_type);
+        } else if (str_agg == FieldLastValueAgg::NAME) {
+            field_aggregator = std::make_unique(field_type);
+        } else if (str_agg == FieldFirstValueAgg::NAME) {
+            field_aggregator = std::make_unique(field_type);
+        } else if (str_agg == FieldSumAgg::NAME) {
+            PAIMON_ASSIGN_OR_RAISE(field_aggregator, FieldSumAgg::Create(field_type));
+        } else if (str_agg == FieldMinAgg::NAME) {
+            PAIMON_ASSIGN_OR_RAISE(field_aggregator, FieldMinAgg::Create(field_type));
+        } else if (str_agg == FieldMaxAgg::NAME) {
+            PAIMON_ASSIGN_OR_RAISE(field_aggregator, FieldMaxAgg::Create(field_type));
+        } else if (str_agg == FieldBoolOrAgg::NAME) {
+            PAIMON_ASSIGN_OR_RAISE(field_aggregator, FieldBoolOrAgg::Create(field_type));
+        } else if (str_agg == FieldBoolAndAgg::NAME) {
+            PAIMON_ASSIGN_OR_RAISE(field_aggregator, FieldBoolAndAgg::Create(field_type));
+        } else if (str_agg == FieldListaggAgg::NAME) {
+            PAIMON_ASSIGN_OR_RAISE(field_aggregator,
+                                   FieldListaggAgg::Create(field_type, options, field_name));
+        } else {
+            return Status::Invalid(fmt::format(
+                "Use unsupported aggregation {} or spell aggregate function incorrectly!",
+                str_agg));
+        }
+        // Despite its name, this local holds the remove-record-on-delete option.
+        bool remove_record_on_retract = options.AggregationRemoveRecordOnDelete();
+        PAIMON_ASSIGN_OR_RAISE(bool ignore_retract, options.FieldAggIgnoreRetract(field_name));
+        // The two options are rejected together: one acts on DELETE records, the other tells the
+        // field to ignore retractions.
+        if (remove_record_on_retract && ignore_retract) {
+            return Status::Invalid(fmt::format(
+                "{} and {}.{}.{} have conflicting behavior so should not be enabled at the same "
+                "time.",
+                Options::AGGREGATION_REMOVE_RECORD_ON_DELETE, Options::FIELDS_PREFIX, field_name,
+                Options::IGNORE_RETRACT));
+        }
+        if (ignore_retract) {
+            // Wrap the aggregator in the ignore-retract decorator, which takes ownership of it.
+            field_aggregator = std::make_unique(std::move(field_aggregator));
+        }
+        return field_aggregator;
+    }
+};
+}  // namespace paimon
diff --git a/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory_test.cpp b/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory_test.cpp
new file mode 100644
index 0000000..3a5de4a
--- /dev/null
+++ b/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory_test.cpp
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +#include "paimon/core/mergetree/compact/aggregate/field_aggregator_factory.h" + +#include + +#include "arrow/type_fwd.h" +#include "gtest/gtest.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +TEST(FieldAggregatorFactoryTest, TestSimple) { + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({})); + ASSERT_OK_AND_ASSIGN(std::unique_ptr agg, + FieldAggregatorFactory::CreateFieldAggregator("f0", arrow::int32(), + "primary-key", options)); + ASSERT_TRUE(dynamic_cast(agg.get())); + } + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({})); + ASSERT_OK_AND_ASSIGN( + std::unique_ptr agg, + FieldAggregatorFactory::CreateFieldAggregator("f0", arrow::int32(), "sum", options)); + ASSERT_TRUE(dynamic_cast(agg.get())); + } + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({})); + ASSERT_OK_AND_ASSIGN( + std::unique_ptr agg, + FieldAggregatorFactory::CreateFieldAggregator("f0", arrow::int32(), "min", options)); + ASSERT_TRUE(dynamic_cast(agg.get())); + } + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({})); + ASSERT_OK_AND_ASSIGN( + std::unique_ptr agg, + FieldAggregatorFactory::CreateFieldAggregator("f0", arrow::int32(), "max", options)); + ASSERT_TRUE(dynamic_cast(agg.get())); + } + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({})); + ASSERT_OK_AND_ASSIGN(std::unique_ptr agg, + FieldAggregatorFactory::CreateFieldAggregator("f0", arrow::boolean(), + "bool_and", options)); + ASSERT_TRUE(dynamic_cast(agg.get())); + } + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({})); + ASSERT_OK_AND_ASSIGN(std::unique_ptr agg, + FieldAggregatorFactory::CreateFieldAggregator("f0", arrow::boolean(), + "bool_or", options)); + ASSERT_TRUE(dynamic_cast(agg.get())); + } + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({})); + ASSERT_OK_AND_ASSIGN(std::unique_ptr agg, + FieldAggregatorFactory::CreateFieldAggregator( + "f0", 
arrow::int32(), "last_non_null_value", options)); + ASSERT_TRUE(dynamic_cast(agg.get())); + } + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({})); + ASSERT_OK_AND_ASSIGN(std::unique_ptr agg, + FieldAggregatorFactory::CreateFieldAggregator( + "f0", arrow::int32(), "first_non_null_value", options)); + ASSERT_TRUE(dynamic_cast(agg.get())); + } + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({})); + ASSERT_OK_AND_ASSIGN(std::unique_ptr agg, + FieldAggregatorFactory::CreateFieldAggregator("f0", arrow::int32(), + "last_value", options)); + ASSERT_TRUE(dynamic_cast(agg.get())); + } + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({})); + ASSERT_OK_AND_ASSIGN(std::unique_ptr agg, + FieldAggregatorFactory::CreateFieldAggregator("f0", arrow::int32(), + "first_value", options)); + ASSERT_TRUE(dynamic_cast(agg.get())); + } + { + // test ignore_retract is true + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{"fields.f0.ignore-retract", "true"}})); + ASSERT_OK_AND_ASSIGN( + std::unique_ptr agg, + FieldAggregatorFactory::CreateFieldAggregator("f0", arrow::int32(), "sum", options)); + auto ignore_retract_agg = dynamic_cast(agg.get()); + ASSERT_TRUE(ignore_retract_agg); + ASSERT_TRUE(dynamic_cast(ignore_retract_agg->agg_.get())); + } + { + // test non exist agg + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({})); + auto agg = FieldAggregatorFactory::CreateFieldAggregator("f0", arrow::int32(), + "non-exist-agg", options); + ASSERT_FALSE(agg.ok()); + } +} + +TEST(FieldAggregatorFactoryTest, TestRemoveRecordOnDeleteConflictsWithIgnoreRetract) { + ASSERT_OK_AND_ASSIGN( + CoreOptions options, + CoreOptions::FromMap({{Options::AGGREGATION_REMOVE_RECORD_ON_DELETE, "true"}, + {"fields.f0.ignore-retract", "true"}})); + ASSERT_NOK_WITH_MSG( + FieldAggregatorFactory::CreateFieldAggregator("f0", arrow::int32(), "sum", options), + "conflicting behavior"); +} + +} // namespace paimon::test diff 
--git a/src/paimon/core/mergetree/compact/aggregate/field_bool_agg_test.cpp b/src/paimon/core/mergetree/compact/aggregate/field_bool_agg_test.cpp new file mode 100644 index 0000000..eede288 --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_bool_agg_test.cpp @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include + +#include "arrow/type_fwd.h" +#include "gtest/gtest.h" +#include "paimon/common/data/data_define.h" +#include "paimon/core/mergetree/compact/aggregate/field_bool_and_agg.h" +#include "paimon/core/mergetree/compact/aggregate/field_bool_or_agg.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +TEST(FieldBoolAndAggTest, TestSimple) { + ASSERT_OK_AND_ASSIGN(auto agg, FieldBoolAndAgg::Create(arrow::boolean())); + auto agg_ret = agg->Agg(true, true); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), true); + agg_ret = agg->Agg(true, false); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), false); + agg_ret = agg->Agg(false, true); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), false); + agg_ret = agg->Agg(false, false); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), false); + + auto retract_ret = agg->Retract(false, false); + ASSERT_FALSE(retract_ret.ok()); +} + +TEST(FieldBoolAndAggTest, TestInvalidType) { + auto agg = FieldBoolAndAgg::Create(arrow::utf8()); + ASSERT_FALSE(agg.ok()); +} + +TEST(FieldBoolAndAggTest, TestNull) { + ASSERT_OK_AND_ASSIGN(auto agg, FieldBoolAndAgg::Create(arrow::boolean())); + { + auto agg_ret = agg->Agg(true, NullType()); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), true); + } + { + auto agg_ret = agg->Agg(NullType(), true); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), true); + } + { + auto agg_ret = agg->Agg(NullType(), NullType()); + ASSERT_TRUE(DataDefine::IsVariantNull(agg_ret)); + } +} + +TEST(FieldBoolOrAggTest, TestSimple) { + ASSERT_OK_AND_ASSIGN(auto agg, FieldBoolOrAgg::Create(arrow::boolean())); + auto agg_ret = agg->Agg(true, true); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), true); + agg_ret = agg->Agg(true, false); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), true); + agg_ret = agg->Agg(false, true); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), true); + agg_ret = agg->Agg(false, false); + 
ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), false); + + auto retract_ret = agg->Retract(false, false); + ASSERT_FALSE(retract_ret.ok()); +} + +TEST(FieldBoolOrAggTest, TestInvalidType) { + auto agg = FieldBoolOrAgg::Create(arrow::utf8()); + ASSERT_FALSE(agg.ok()); +} + +TEST(FieldBoolOrAggTest, TestNull) { + ASSERT_OK_AND_ASSIGN(auto agg, FieldBoolOrAgg::Create(arrow::boolean())); + { + auto agg_ret = agg->Agg(true, NullType()); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), true); + } + { + auto agg_ret = agg->Agg(NullType(), true); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), true); + } + { + auto agg_ret = agg->Agg(NullType(), NullType()); + ASSERT_TRUE(DataDefine::IsVariantNull(agg_ret)); + } +} + +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/compact/aggregate/field_bool_and_agg.h b/src/paimon/core/mergetree/compact/aggregate/field_bool_and_agg.h new file mode 100644 index 0000000..30e6596 --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_bool_and_agg.h @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include + +#include "paimon/core/mergetree/compact/aggregate/field_aggregator.h" + +namespace paimon { +/// bool_and aggregate a field of a row. +class FieldBoolAndAgg : public FieldAggregator { + public: + static Result> Create( + const std::shared_ptr& field_type) { + if (field_type->id() != arrow::Type::type::BOOL) { + return Status::Invalid( + fmt::format("invalid field type {} for {}, supposed to be boolean", + field_type->ToString(), NAME)); + } + return std::unique_ptr(new FieldBoolAndAgg(field_type)); + } + + VariantType Agg(const VariantType& accumulator, const VariantType& input_field) override { + bool accumulator_null = DataDefine::IsVariantNull(accumulator); + bool input_null = DataDefine::IsVariantNull(input_field); + if (accumulator_null || input_null) { + return accumulator_null ? input_field : accumulator; + } + bool accumulator_value = DataDefine::GetVariantValue(accumulator); + bool input_value = DataDefine::GetVariantValue(input_field); + return accumulator_value && input_value; + } + + public: + static constexpr char NAME[] = "bool_and"; + + private: + explicit FieldBoolAndAgg(const std::shared_ptr& field_type) + : FieldAggregator(std::string(NAME), field_type) {} +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/aggregate/field_bool_or_agg.h b/src/paimon/core/mergetree/compact/aggregate/field_bool_or_agg.h new file mode 100644 index 0000000..da89fd5 --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_bool_or_agg.h @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/core/mergetree/compact/aggregate/field_aggregator.h" + +namespace paimon { +/// bool_or aggregate a field of a row. +class FieldBoolOrAgg : public FieldAggregator { + public: + static Result> Create( + const std::shared_ptr& field_type) { + if (field_type->id() != arrow::Type::type::BOOL) { + return Status::Invalid( + fmt::format("invalid field type {} for {}, supposed to be boolean", + field_type->ToString(), NAME)); + } + return std::unique_ptr(new FieldBoolOrAgg(field_type)); + } + + VariantType Agg(const VariantType& accumulator, const VariantType& input_field) override { + bool accumulator_null = DataDefine::IsVariantNull(accumulator); + bool input_null = DataDefine::IsVariantNull(input_field); + if (accumulator_null || input_null) { + return accumulator_null ? 
input_field : accumulator; + } + bool accumulator_value = DataDefine::GetVariantValue(accumulator); + bool input_value = DataDefine::GetVariantValue(input_field); + return accumulator_value || input_value; + } + + public: + static constexpr char NAME[] = "bool_or"; + + private: + explicit FieldBoolOrAgg(const std::shared_ptr& field_type) + : FieldAggregator(std::string(NAME), field_type) {} +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/aggregate/field_first_non_null_value_agg.h b/src/paimon/core/mergetree/compact/aggregate/field_first_non_null_value_agg.h new file mode 100644 index 0000000..19f12cd --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_first_non_null_value_agg.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/common/data/data_define.h" +#include "paimon/core/mergetree/compact/aggregate/field_aggregator.h" + +namespace arrow { +class DataType; +} // namespace arrow + +namespace paimon { +/// first non-null value aggregate a field of a row. 
+class FieldFirstNonNullValueAgg : public FieldAggregator { + public: + explicit FieldFirstNonNullValueAgg(const std::shared_ptr& field_type) + : FieldAggregator(std::string(NAME), field_type) {} + + VariantType Agg(const VariantType& accumulator, const VariantType& input_field) override { + if (!initialized_ && !DataDefine::IsVariantNull(input_field)) { + initialized_ = true; + return input_field; + } + return accumulator; + } + + void Reset() override { + initialized_ = false; + } + + public: + static constexpr char NAME[] = "first_non_null_value"; + + private: + bool initialized_ = false; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/aggregate/field_first_non_null_value_agg_test.cpp b/src/paimon/core/mergetree/compact/aggregate/field_first_non_null_value_agg_test.cpp new file mode 100644 index 0000000..c77b5c2 --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_first_non_null_value_agg_test.cpp @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/mergetree/compact/aggregate/field_first_non_null_value_agg.h" + +#include + +#include "arrow/type_fwd.h" +#include "gtest/gtest.h" +#include "paimon/result.h" + +namespace paimon::test { +TEST(FieldFirstNonNullValueAggTest, TestSimple) { + auto agg = std::make_unique(arrow::int32()); + + auto agg_ret = agg->Agg(5, 10); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 10); + agg_ret = agg->Agg(10, 20); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 10); + agg_ret = agg->Agg(10, 30); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 10); + + agg->Reset(); + agg_ret = agg->Agg(10, 30); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 30); + + auto retract_ret = agg->Retract(10, 30); + ASSERT_FALSE(retract_ret.ok()); +} + +TEST(FieldFirstNonNullValueAggTest, TestNull) { + auto agg = std::make_unique(arrow::int32()); + auto agg_ret = agg->Agg(5, NullType()); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 5); + agg_ret = agg->Agg(5, 10); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 10); + agg_ret = agg->Agg(10, NullType()); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 10); + agg_ret = agg->Agg(10, 20); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 10); + + agg->Reset(); + + agg_ret = agg->Agg(NullType(), NullType()); + ASSERT_TRUE(DataDefine::IsVariantNull(agg_ret)); + + agg->Reset(); + + agg_ret = agg->Agg(NullType(), 5); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 5); + agg_ret = agg->Agg(5, NullType()); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 5); +} + +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/compact/aggregate/field_first_value_agg.h b/src/paimon/core/mergetree/compact/aggregate/field_first_value_agg.h new file mode 100644 index 0000000..0a08c06 --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_first_value_agg.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/common/data/data_define.h" +#include "paimon/core/mergetree/compact/aggregate/field_aggregator.h" + +namespace arrow { +class DataType; +} // namespace arrow + +namespace paimon { +/// first value aggregate a field of a row. +class FieldFirstValueAgg : public FieldAggregator { + public: + explicit FieldFirstValueAgg(const std::shared_ptr& field_type) + : FieldAggregator(std::string(NAME), field_type) {} + + VariantType Agg(const VariantType& accumulator, const VariantType& input_field) override { + if (!initialized_) { + initialized_ = true; + return input_field; + } + return accumulator; + } + + void Reset() override { + initialized_ = false; + } + + public: + static constexpr char NAME[] = "first_value"; + + private: + bool initialized_ = false; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/aggregate/field_first_value_agg_test.cpp b/src/paimon/core/mergetree/compact/aggregate/field_first_value_agg_test.cpp new file mode 100644 index 0000000..997a1c2 --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_first_value_agg_test.cpp @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/compact/aggregate/field_first_value_agg.h" + +#include + +#include "arrow/type_fwd.h" +#include "gtest/gtest.h" +#include "paimon/result.h" + +namespace paimon::test { +TEST(FieldFirstValueAggTest, TestSimple) { + auto agg = std::make_unique(arrow::int32()); + + auto agg_ret = agg->Agg(5, 10); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 10); + agg_ret = agg->Agg(10, 20); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 10); + agg_ret = agg->Agg(10, 30); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 10); + + agg->Reset(); + agg_ret = agg->Agg(10, 30); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 30); + + auto retract_ret = agg->Retract(10, 30); + ASSERT_FALSE(retract_ret.ok()); +} + +TEST(FieldFirstValueAggTest, TestNull) { + auto agg = std::make_unique(arrow::int32()); + auto agg_ret = agg->Agg(5, NullType()); + ASSERT_TRUE(DataDefine::IsVariantNull(agg_ret)); + agg_ret = agg->Agg(NullType(), 10); + ASSERT_TRUE(DataDefine::IsVariantNull(agg_ret)); + + agg->Reset(); + + agg_ret = agg->Agg(NullType(), NullType()); + ASSERT_TRUE(DataDefine::IsVariantNull(agg_ret)); + + agg->Reset(); + + agg_ret = agg->Agg(NullType(), 5); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 5); + agg_ret = agg->Agg(5, NullType()); + 
ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 5); +} + +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/compact/aggregate/field_ignore_retract_agg.h b/src/paimon/core/mergetree/compact/aggregate/field_ignore_retract_agg.h new file mode 100644 index 0000000..c24803b --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_ignore_retract_agg.h @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/common/data/data_define.h" +#include "paimon/core/mergetree/compact/aggregate/field_aggregator.h" +#include "paimon/result.h" + +namespace paimon { +/// An aggregator which ignores retraction messages. 
+class FieldIgnoreRetractAgg : public FieldAggregator { + public: + explicit FieldIgnoreRetractAgg(std::unique_ptr&& agg) + : FieldAggregator(agg->GetName(), agg->GetFieldType()), agg_(std::move(agg)) {} + + VariantType Agg(const VariantType& accumulator, const VariantType& input_field) override { + return agg_->Agg(accumulator, input_field); + } + + Result Retract(const VariantType& accumulator, + const VariantType& input_field) const override { + return accumulator; + } + + void Reset() override { + agg_->Reset(); + } + + private: + std::unique_ptr agg_; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/aggregate/field_ignore_retract_agg_test.cpp b/src/paimon/core/mergetree/compact/aggregate/field_ignore_retract_agg_test.cpp new file mode 100644 index 0000000..e0efa4e --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_ignore_retract_agg_test.cpp @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/mergetree/compact/aggregate/field_ignore_retract_agg.h" + +#include + +#include "arrow/type_fwd.h" +#include "gtest/gtest.h" +#include "paimon/core/mergetree/compact/aggregate/field_sum_agg.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +TEST(FieldIgnoreRetractAggTest, TestSimple) { + ASSERT_OK_AND_ASSIGN(auto field_sum_agg, FieldSumAgg::Create(arrow::int32())); + auto agg = std::make_unique(std::move(field_sum_agg)); + + auto agg_ret = agg->Agg(5, 10); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 15); + + ASSERT_OK_AND_ASSIGN(auto retract_ret, agg->Retract(5, 10)); + ASSERT_EQ(DataDefine::GetVariantValue(retract_ret), 5); +} +TEST(FieldIgnoreRetractAggTest, TestNull) { + ASSERT_OK_AND_ASSIGN(auto field_sum_agg, FieldSumAgg::Create(arrow::int32())); + auto agg = std::make_unique(std::move(field_sum_agg)); + { + auto agg_ret = agg->Agg(5, NullType()); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 5); + } + { + auto agg_ret = agg->Agg(NullType(), 10); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 10); + } + { + auto agg_ret = agg->Agg(NullType(), NullType()); + ASSERT_TRUE(DataDefine::IsVariantNull(agg_ret)); + } + { + ASSERT_OK_AND_ASSIGN(auto retract_ret, agg->Retract(5, NullType())); + ASSERT_EQ(DataDefine::GetVariantValue(retract_ret), 5); + } + { + ASSERT_OK_AND_ASSIGN(auto retract_ret, agg->Retract(NullType(), 10)); + ASSERT_TRUE(DataDefine::IsVariantNull(retract_ret)); + } + { + ASSERT_OK_AND_ASSIGN(auto retract_ret, agg->Retract(NullType(), NullType())); + ASSERT_TRUE(DataDefine::IsVariantNull(retract_ret)); + } +} + +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/compact/aggregate/field_last_non_null_value_agg.h b/src/paimon/core/mergetree/compact/aggregate/field_last_non_null_value_agg.h new file mode 100644 index 0000000..988c31f --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_last_non_null_value_agg.h @@ -0,0 
+1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/common/data/data_define.h" +#include "paimon/core/mergetree/compact/aggregate/field_aggregator.h" +#include "paimon/result.h" + +namespace arrow { +class DataType; +} // namespace arrow + +namespace paimon { +/// last non-null value aggregate a field of a row. +class FieldLastNonNullValueAgg : public FieldAggregator { + public: + explicit FieldLastNonNullValueAgg(const std::shared_ptr& field_type) + : FieldAggregator(std::string(NAME), field_type) {} + + VariantType Agg(const VariantType& accumulator, const VariantType& input_field) override { + return DataDefine::IsVariantNull(input_field) ? accumulator : input_field; + } + + Result Retract(const VariantType& accumulator, + const VariantType& input_field) const override { + return DataDefine::IsVariantNull(input_field) ? 
accumulator : NullType(); + } + + public: + static constexpr char NAME[] = "last_non_null_value"; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/aggregate/field_last_non_null_value_agg_test.cpp b/src/paimon/core/mergetree/compact/aggregate/field_last_non_null_value_agg_test.cpp new file mode 100644 index 0000000..b417261 --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_last_non_null_value_agg_test.cpp @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/mergetree/compact/aggregate/field_last_non_null_value_agg.h" + +#include + +#include "arrow/type_fwd.h" +#include "gtest/gtest.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +TEST(FieldLastNonNullValueAggTest, TestSimple) { + auto agg = std::make_unique(arrow::int32()); + + auto agg_ret = agg->Agg(5, 10); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 10); + + ASSERT_OK_AND_ASSIGN(auto retract_ret, agg->Retract(5, 10)); + ASSERT_TRUE(DataDefine::IsVariantNull(retract_ret)); +} +TEST(FieldLastNonNullValueAggTest, TestNull) { + auto agg = std::make_unique(arrow::int32()); + { + auto agg_ret = agg->Agg(5, NullType()); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 5); + } + { + auto agg_ret = agg->Agg(NullType(), 10); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 10); + } + { + auto agg_ret = agg->Agg(NullType(), NullType()); + ASSERT_TRUE(DataDefine::IsVariantNull(agg_ret)); + } + + { + ASSERT_OK_AND_ASSIGN(auto retract_ret, agg->Retract(5, NullType())); + ASSERT_EQ(DataDefine::GetVariantValue(retract_ret), 5); + } + { + ASSERT_OK_AND_ASSIGN(auto retract_ret, agg->Retract(NullType(), 10)); + ASSERT_TRUE(DataDefine::IsVariantNull(retract_ret)); + } + { + ASSERT_OK_AND_ASSIGN(auto retract_ret, agg->Retract(NullType(), NullType())); + ASSERT_TRUE(DataDefine::IsVariantNull(retract_ret)); + } +} + +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/compact/aggregate/field_last_value_agg.h b/src/paimon/core/mergetree/compact/aggregate/field_last_value_agg.h new file mode 100644 index 0000000..ac35d03 --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_last_value_agg.h @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/common/data/data_define.h" +#include "paimon/core/mergetree/compact/aggregate/field_aggregator.h" +#include "paimon/result.h" + +namespace arrow { +class DataType; +} // namespace arrow + +namespace paimon { +/// last value aggregate a field of a row. +class FieldLastValueAgg : public FieldAggregator { + public: + explicit FieldLastValueAgg(const std::shared_ptr& field_type) + : FieldAggregator(std::string(NAME), field_type) {} + + VariantType Agg(const VariantType& accumulator, const VariantType& input_field) override { + return input_field; + } + + Result Retract(const VariantType& accumulator, + const VariantType& input_field) const override { + return VariantType(NullType()); + } + + public: + static constexpr char NAME[] = "last_value"; +}; + +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/aggregate/field_last_value_agg_test.cpp b/src/paimon/core/mergetree/compact/aggregate/field_last_value_agg_test.cpp new file mode 100644 index 0000000..b7eb2e1 --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_last_value_agg_test.cpp @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/compact/aggregate/field_last_value_agg.h" + +#include + +#include "arrow/type_fwd.h" +#include "gtest/gtest.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +TEST(FieldLastValueAggTest, TestSimple) { + auto agg = std::make_unique(arrow::int32()); + + auto agg_ret = agg->Agg(5, 10); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 10); + + ASSERT_OK_AND_ASSIGN(auto retract_ret, agg->Retract(5, 10)); + ASSERT_TRUE(DataDefine::IsVariantNull(retract_ret)); +} + +TEST(FieldLastValueAggTest, TestNull) { + auto agg = std::make_unique(arrow::int32()); + { + auto agg_ret = agg->Agg(5, NullType()); + ASSERT_TRUE(DataDefine::IsVariantNull(agg_ret)); + } + { + auto agg_ret = agg->Agg(NullType(), 10); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 10); + } + { + auto agg_ret = agg->Agg(NullType(), NullType()); + ASSERT_TRUE(DataDefine::IsVariantNull(agg_ret)); + } + + { + ASSERT_OK_AND_ASSIGN(auto retract_ret, agg->Retract(5, NullType())); + ASSERT_TRUE(DataDefine::IsVariantNull(retract_ret)); + } + { + ASSERT_OK_AND_ASSIGN(auto retract_ret, agg->Retract(NullType(), 10)); + ASSERT_TRUE(DataDefine::IsVariantNull(retract_ret)); + } + { + ASSERT_OK_AND_ASSIGN(auto retract_ret, agg->Retract(NullType(), NullType())); + ASSERT_TRUE(DataDefine::IsVariantNull(retract_ret)); + } +} + +} // namespace paimon::test 
diff --git a/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h new file mode 100644 index 0000000..c8a2f5e --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/common/data/data_define.h" +#include "paimon/core/core_options.h" +#include "paimon/core/mergetree/compact/aggregate/field_aggregator.h" + +namespace arrow { +class DataType; +} // namespace arrow + +namespace paimon { +/// listagg aggregate a field of a row. +/// Concatenates string values with a delimiter. 
+class FieldListaggAgg : public FieldAggregator { + public: + static constexpr char NAME[] = "listagg"; + + static Result> Create( + const std::shared_ptr& field_type, const CoreOptions& options, + const std::string& field_name) { + if (field_type->id() != arrow::Type::type::STRING) { + return Status::Invalid( + fmt::format("invalid field type {} for field '{}' of {}, supposed to be string", + field_type->ToString(), field_name, NAME)); + } + PAIMON_ASSIGN_OR_RAISE(std::string delimiter, options.FieldListAggDelimiter(field_name)); + PAIMON_ASSIGN_OR_RAISE(bool distinct, options.FieldCollectAggDistinct(field_name)); + // When delimiter is empty and distinct is true, fall back to whitespace split. + if (distinct && delimiter.empty()) { + delimiter = " "; + } + return std::unique_ptr( + new FieldListaggAgg(field_type, std::move(delimiter), distinct)); + } + + VariantType Agg(const VariantType& accumulator, const VariantType& input_field) override { + bool accumulator_null = DataDefine::IsVariantNull(accumulator); + bool input_null = DataDefine::IsVariantNull(input_field); + if (accumulator_null || input_null) { + return accumulator_null ? 
input_field : accumulator; + } + std::string_view acc_str = DataDefine::GetStringView(accumulator); + std::string_view in_str = DataDefine::GetStringView(input_field); + if (in_str.empty()) { + return accumulator; + } + if (acc_str.empty()) { + return input_field; + } + + if (distinct_) { + result_ = AggDistinctImpl(acc_str, in_str); + } else { + // Build into a local string to avoid aliasing when acc_str points into result_ + std::string new_result; + new_result.reserve(acc_str.size() + delimiter_.size() + in_str.size()); + new_result.append(acc_str); + new_result.append(delimiter_); + new_result.append(in_str); + result_ = std::move(new_result); + } + return std::string_view{result_}; + } + + private: + std::string AggDistinctImpl(std::string_view acc_str, std::string_view in_str) const { + // Split accumulator tokens into a set for dedup + std::unordered_set seen; + std::string_view remaining = acc_str; + while (true) { + size_t pos = remaining.find(delimiter_); + std::string_view token = + (pos == std::string_view::npos) ? remaining : remaining.substr(0, pos); + if (!token.empty()) { + seen.insert(token); + } + if (pos == std::string_view::npos) { + break; + } + remaining = remaining.substr(pos + delimiter_.size()); + } + + // Start with the full accumulator, then append delimiter + new distinct tokens from input + std::string result; + result.reserve(acc_str.size() + in_str.size()); + result.append(acc_str); + remaining = in_str; + while (true) { + size_t pos = remaining.find(delimiter_); + std::string_view token = + (pos == std::string_view::npos) ? 
remaining : remaining.substr(0, pos); + if (!token.empty() && seen.insert(token).second) { + result.append(delimiter_); + result.append(token); + } + if (pos == std::string_view::npos) { + break; + } + remaining = remaining.substr(pos + delimiter_.size()); + } + return result; + } + + explicit FieldListaggAgg(const std::shared_ptr& field_type, + std::string delimiter, bool distinct) + : FieldAggregator(std::string(NAME), field_type), + delimiter_(std::move(delimiter)), + distinct_(distinct) {} + + std::string delimiter_; + bool distinct_; + std::string result_; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp new file mode 100644 index 0000000..beb1aeb --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/mergetree/compact/aggregate/field_listagg_agg.h" + +#include +#include +#include + +#include "arrow/type_fwd.h" +#include "gtest/gtest.h" +#include "paimon/core/core_options.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class FieldListaggAggTest : public testing::Test { + protected: + static Result> MakeAgg(const std::string& delimiter = ",", + bool distinct = false) { + std::map opts; + opts["fields.f.list-agg-delimiter"] = delimiter; + opts["fields.f.distinct"] = distinct ? "true" : "false"; + PAIMON_ASSIGN_OR_RAISE(auto options, CoreOptions::FromMap(opts)); + return FieldListaggAgg::Create(arrow::utf8(), std::move(options), "f"); + } +}; + +TEST_F(FieldListaggAggTest, TestSimple) { + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg()); + auto ret = agg->Agg(std::string_view("hello"), std::string_view(" world")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "hello, world"); +} + +TEST_F(FieldListaggAggTest, TestDelimiter) { + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg("-")); + auto ret = agg->Agg(std::string_view("user1"), std::string_view("user2")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "user1-user2"); +} + +TEST_F(FieldListaggAggTest, TestNull) { + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg()); + + // input null -> return accumulator + { + auto ret = agg->Agg(std::string_view("hello"), NullType()); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "hello"); + } + // accumulator null -> return input + { + auto ret = agg->Agg(NullType(), std::string_view("world")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "world"); + } + // both null -> return null + { + auto ret = agg->Agg(NullType(), NullType()); + ASSERT_TRUE(DataDefine::IsVariantNull(ret)); + } +} + +TEST_F(FieldListaggAggTest, TestEmptyString) { + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg()); + + // empty input -> return accumulator + { + auto ret = agg->Agg(std::string_view("hello"), std::string_view("")); + 
ASSERT_EQ(DataDefine::GetVariantValue(ret), "hello"); + } + // empty accumulator -> return input + { + auto ret = agg->Agg(std::string_view(""), std::string_view("world")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "world"); + } + // both empty -> return input (which is empty) + { + auto ret = agg->Agg(std::string_view(""), std::string_view("")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), ""); + } +} + +TEST_F(FieldListaggAggTest, TestMultipleAccumulation) { + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg()); + + // "a" + "," + "b" = "a,b", then "a,b" + "," + "c" = "a,b,c" + auto ret = agg->Agg(std::string_view("a"), std::string_view("b")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "a,b"); + ret = agg->Agg(std::move(ret), std::string_view("c")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "a,b,c"); +} + +TEST_F(FieldListaggAggTest, TestDistinct) { + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg(";", true)); + + // "a;b" + "b;c" -> "a;b;c" (deduplicate "b") + auto ret = agg->Agg(std::string_view("a;b"), std::string_view("b;c")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "a;b;c"); +} + +TEST_F(FieldListaggAggTest, TestDistinctNoDuplicates) { + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg(" ", true)); + + // "a b" + "c d" -> "a b c d" (no dups to remove) + auto ret = agg->Agg(std::string_view("a b"), std::string_view("c d")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "a b c d"); +} + +TEST_F(FieldListaggAggTest, TestDistinctEmptyInput) { + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg(";", true)); + + // empty input -> return accumulator + auto ret = agg->Agg(std::string_view("a;b"), std::string_view("")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "a;b"); +} + +TEST_F(FieldListaggAggTest, TestDistinctFalse) { + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg(";", false)); + + // "a;b" + "b;c" -> "a;b;b;c" (no dedup) + auto ret = agg->Agg(std::string_view("a;b"), std::string_view("b;c")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "a;b;b;c"); +} + +TEST_F(FieldListaggAggTest, 
TestInvalidType) { + EXPECT_OK_AND_ASSIGN(auto options, CoreOptions::FromMap({})); + auto result = FieldListaggAgg::Create(arrow::int32(), options, "f"); + ASSERT_FALSE(result.ok()); + ASSERT_TRUE(result.status().ToString().find("supposed to be string") != std::string::npos) + << result.status().ToString(); +} + +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/compact/aggregate/field_max_agg.h b/src/paimon/core/mergetree/compact/aggregate/field_max_agg.h new file mode 100644 index 0000000..e4c04dc --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_max_agg.h @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/core/mergetree/compact/aggregate/field_aggregator.h" + +namespace paimon { +/// max aggregate a field of a row. 
+class FieldMaxAgg : public FieldAggregator { + public: + static Result> Create( + const std::shared_ptr& field_type) { + PAIMON_ASSIGN_OR_RAISE(FieldMaxFunc max_func, CreateMaxFunc(field_type)); + return std::unique_ptr(new FieldMaxAgg(field_type, max_func)); + } + + VariantType Agg(const VariantType& accumulator, const VariantType& input_field) override { + bool accumulator_null = DataDefine::IsVariantNull(accumulator); + bool input_null = DataDefine::IsVariantNull(input_field); + if (accumulator_null || input_null) { + return accumulator_null ? input_field : accumulator; + } + return max_func_(accumulator, input_field); + } + + public: + static constexpr char NAME[] = "max"; + + private: + using FieldMaxFunc = + std::function; + + FieldMaxAgg(const std::shared_ptr& field_type, const FieldMaxFunc& max_func) + : FieldAggregator(std::string(NAME), field_type), max_func_(max_func) {} + + static Result CreateMaxFunc(const std::shared_ptr& field_type) { + arrow::Type::type type = field_type->id(); + switch (type) { + case arrow::Type::type::INT8: + case arrow::Type::type::INT16: + case arrow::Type::type::INT32: + case arrow::Type::type::DATE32: + case arrow::Type::type::INT64: + case arrow::Type::type::FLOAT: + case arrow::Type::type::DOUBLE: + case arrow::Type::type::TIMESTAMP: + case arrow::Type::type::DECIMAL: + case arrow::Type::type::STRING: + case arrow::Type::type::BINARY: + return FieldMaxFunc([](const VariantType& accumulator, + const VariantType& input_field) -> VariantType { + return accumulator < input_field ? 
input_field : accumulator; + }); + default: + return Status::Invalid( + fmt::format("type {} not support in FieldMaxAgg", field_type->ToString())); + } + } + + private: + FieldMaxFunc max_func_; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/aggregate/field_min_agg.h b/src/paimon/core/mergetree/compact/aggregate/field_min_agg.h new file mode 100644 index 0000000..69e1073 --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_min_agg.h @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/core/mergetree/compact/aggregate/field_aggregator.h" + +namespace paimon { +/// min aggregate a field of a row. 
+class FieldMinAgg : public FieldAggregator { + public: + static Result> Create( + const std::shared_ptr& field_type) { + PAIMON_ASSIGN_OR_RAISE(FieldMinFunc min_func, CreateMinFunc(field_type)); + return std::unique_ptr(new FieldMinAgg(field_type, min_func)); + } + + VariantType Agg(const VariantType& accumulator, const VariantType& input_field) override { + bool accumulator_null = DataDefine::IsVariantNull(accumulator); + bool input_null = DataDefine::IsVariantNull(input_field); + if (accumulator_null || input_null) { + return accumulator_null ? input_field : accumulator; + } + return min_func_(accumulator, input_field); + } + + public: + static constexpr char NAME[] = "min"; + + private: + using FieldMinFunc = + std::function; + + FieldMinAgg(const std::shared_ptr& field_type, const FieldMinFunc& min_func) + : FieldAggregator(std::string(NAME), field_type), min_func_(min_func) {} + + static Result CreateMinFunc(const std::shared_ptr& field_type) { + arrow::Type::type type = field_type->id(); + switch (type) { + case arrow::Type::type::INT8: + case arrow::Type::type::INT16: + case arrow::Type::type::INT32: + case arrow::Type::type::DATE32: + case arrow::Type::type::INT64: + case arrow::Type::type::FLOAT: + case arrow::Type::type::DOUBLE: + case arrow::Type::type::TIMESTAMP: + case arrow::Type::type::DECIMAL: + case arrow::Type::type::STRING: + case arrow::Type::type::BINARY: + return FieldMinFunc([](const VariantType& accumulator, + const VariantType& input_field) -> VariantType { + return accumulator < input_field ? 
accumulator : input_field; + }); + default: + return Status::Invalid( + fmt::format("type {} not support in FieldMinAgg", field_type->ToString())); + } + } + + private: + FieldMinFunc min_func_; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/aggregate/field_min_max_agg_test.cpp b/src/paimon/core/mergetree/compact/aggregate/field_min_max_agg_test.cpp new file mode 100644 index 0000000..0f79a52 --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_min_max_agg_test.cpp @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include +#include + +#include "arrow/type_fwd.h" +#include "gtest/gtest.h" +#include "paimon/common/data/data_define.h" +#include "paimon/common/utils/decimal_utils.h" +#include "paimon/core/mergetree/compact/aggregate/field_max_agg.h" +#include "paimon/core/mergetree/compact/aggregate/field_min_agg.h" +#include "paimon/data/decimal.h" +#include "paimon/data/timestamp.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace arrow { +class DataType; +} // namespace arrow + +namespace paimon::test { + +TEST(FieldMinMaxAggTest, TestSimple) { + { + ASSERT_OK_AND_ASSIGN(auto field_min_agg, FieldMinAgg::Create(arrow::int32())); + auto agg_ret = field_min_agg->Agg(5, 10); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 5); + } + { + ASSERT_OK_AND_ASSIGN(auto field_max_agg, FieldMaxAgg::Create(arrow::int32())); + auto agg_ret = field_max_agg->Agg(5, 10); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 10); + } +} + +TEST(FieldMinMaxAggTest, TestInvalidType) { + auto field_min_agg = FieldMinAgg::Create(arrow::boolean()); + ASSERT_FALSE(field_min_agg.ok()); + auto field_max_agg = FieldMaxAgg::Create(arrow::boolean()); + ASSERT_FALSE(field_max_agg.ok()); +} + +TEST(FieldMinMaxAggTest, TestNull) { + ASSERT_OK_AND_ASSIGN(auto field_min_agg, FieldMinAgg::Create(arrow::int32())); + ASSERT_OK_AND_ASSIGN(auto field_max_agg, FieldMaxAgg::Create(arrow::int32())); + { + auto agg_ret = field_min_agg->Agg(5, NullType()); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 5); + agg_ret = field_max_agg->Agg(5, NullType()); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 5); + } + { + auto agg_ret = field_min_agg->Agg(NullType(), 10); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 10); + agg_ret = field_max_agg->Agg(NullType(), 10); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 10); + } + { + auto agg_ret = field_min_agg->Agg(NullType(), NullType()); + 
ASSERT_TRUE(DataDefine::IsVariantNull(agg_ret)); + agg_ret = field_max_agg->Agg(NullType(), NullType()); + ASSERT_TRUE(DataDefine::IsVariantNull(agg_ret)); + } +} + +TEST(FieldMinMaxAggTest, TestVariantType) { + auto CheckResult = [](const std::shared_ptr& type, const VariantType& large, + const VariantType& small) { + ASSERT_OK_AND_ASSIGN(auto field_min_agg, FieldMinAgg::Create(type)); + ASSERT_OK_AND_ASSIGN(auto field_max_agg, FieldMaxAgg::Create(type)); + auto agg_ret = field_min_agg->Agg(small, large); + ASSERT_EQ(agg_ret, small); + agg_ret = field_min_agg->Agg(large, small); + ASSERT_EQ(agg_ret, small); + + agg_ret = field_max_agg->Agg(small, large); + ASSERT_EQ(agg_ret, large); + agg_ret = field_max_agg->Agg(large, small); + ASSERT_EQ(agg_ret, large); + }; + + CheckResult(arrow::int8(), static_cast(100), static_cast(15)); + CheckResult(arrow::int16(), static_cast(100), static_cast(15)); + CheckResult(arrow::int32(), static_cast(100), static_cast(15)); + CheckResult(arrow::date32(), static_cast(100), static_cast(15)); + CheckResult(arrow::int64(), static_cast(100), static_cast(15)); + CheckResult(arrow::float32(), static_cast(100.2), static_cast(15.1)); + CheckResult(arrow::float64(), 100.23, 15.11); + CheckResult(arrow::timestamp(arrow::TimeUnit::NANO), + Timestamp(/*millisecond=*/100, /*nano_of_millisecond=*/999), + Timestamp(/*millisecond=*/10, /*nano_of_millisecond=*/999)); + CheckResult(arrow::decimal128(30, 20), + Decimal(/*precision=*/30, /*scale=*/20, + DecimalUtils::StrToInt128("12345678998765432145678").value()), + Decimal(/*precision=*/30, /*scale=*/20, + DecimalUtils::StrToInt128("2345679987639475677478").value())); + std::string str1 = "bcd"; + std::string str2 = "abc"; + CheckResult(arrow::utf8(), std::string_view(str1.data(), str1.size()), + std::string_view(str2.data(), str2.size())); + CheckResult(arrow::binary(), std::string_view(str1.data(), str1.size()), + std::string_view(str2.data(), str2.size())); +} +} // namespace paimon::test diff 
--git a/src/paimon/core/mergetree/compact/aggregate/field_primary_key_agg.h b/src/paimon/core/mergetree/compact/aggregate/field_primary_key_agg.h new file mode 100644 index 0000000..4027cf2 --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_primary_key_agg.h @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/common/data/data_define.h" +#include "paimon/core/mergetree/compact/aggregate/field_aggregator.h" +#include "paimon/result.h" + +namespace arrow { +class DataType; +} // namespace arrow + +namespace paimon { +/// primary key aggregate a field of a row. 
+class FieldPrimaryKeyAgg : public FieldAggregator { + public: + explicit FieldPrimaryKeyAgg(const std::shared_ptr& field_type) + : FieldAggregator(std::string(NAME), field_type) {} + + VariantType Agg(const VariantType& accumulator, const VariantType& input_field) override { + return input_field; + } + + Result Retract(const VariantType& accumulator, + const VariantType& input_field) const override { + return input_field; + } + + public: + static constexpr char NAME[] = "primary-key"; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/aggregate/field_primary_key_agg_test.cpp b/src/paimon/core/mergetree/compact/aggregate/field_primary_key_agg_test.cpp new file mode 100644 index 0000000..545bb25 --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_primary_key_agg_test.cpp @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/mergetree/compact/aggregate/field_primary_key_agg.h" + +#include + +#include "arrow/type_fwd.h" +#include "gtest/gtest.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +// just for test, in practice, for primary key, accumulator will always equals to input_field +TEST(FieldPrimaryKeyAggTest, TestSimple) { + auto agg = std::make_unique(arrow::int32()); + + auto agg_ret = agg->Agg(5, 10); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 10); + + ASSERT_OK_AND_ASSIGN(auto retract_ret, agg->Retract(5, 10)); + ASSERT_EQ(DataDefine::GetVariantValue(retract_ret), 10); +} +TEST(FieldPrimaryKeyAggTest, TestNull) { + auto agg = std::make_unique(arrow::int32()); + { + auto agg_ret = agg->Agg(5, NullType()); + ASSERT_TRUE(DataDefine::IsVariantNull(agg_ret)); + } + { + auto agg_ret = agg->Agg(NullType(), 10); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 10); + } + { + auto agg_ret = agg->Agg(NullType(), NullType()); + ASSERT_TRUE(DataDefine::IsVariantNull(agg_ret)); + } + + { + ASSERT_OK_AND_ASSIGN(auto retract_ret, agg->Retract(5, NullType())); + ASSERT_TRUE(DataDefine::IsVariantNull(retract_ret)); + } + { + ASSERT_OK_AND_ASSIGN(auto retract_ret, agg->Retract(NullType(), 10)); + ASSERT_EQ(DataDefine::GetVariantValue(retract_ret), 10); + } + { + ASSERT_OK_AND_ASSIGN(auto retract_ret, agg->Retract(NullType(), NullType())); + ASSERT_TRUE(DataDefine::IsVariantNull(retract_ret)); + } +} + +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/compact/aggregate/field_sum_agg.cpp b/src/paimon/core/mergetree/compact/aggregate/field_sum_agg.cpp new file mode 100644 index 0000000..021f020 --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_sum_agg.cpp @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/compact/aggregate/field_sum_agg.h" + +#include +#include + +#include "arrow/type.h" +#include "fmt/format.h" +#include "paimon/data/decimal.h" +#include "paimon/status.h" + +namespace paimon { +Result FieldSumAgg::CreateSumFunc( + const std::shared_ptr& field_type) { + arrow::Type::type type = field_type->id(); + switch (type) { + case arrow::Type::type::INT8: + return FieldSumFunc( + [](const VariantType& accumulator, const VariantType& input_field) -> VariantType { + char sum = DataDefine::GetVariantValue(accumulator) + + DataDefine::GetVariantValue(input_field); + return sum; + }); + case arrow::Type::type::INT16: + return FieldSumFunc( + [](const VariantType& accumulator, const VariantType& input_field) -> VariantType { + int16_t sum = DataDefine::GetVariantValue(accumulator) + + DataDefine::GetVariantValue(input_field); + return sum; + }); + case arrow::Type::type::INT32: + return FieldSumFunc( + [](const VariantType& accumulator, const VariantType& input_field) -> VariantType { + int32_t sum = DataDefine::GetVariantValue(accumulator) + + DataDefine::GetVariantValue(input_field); + return sum; + }); + case arrow::Type::type::INT64: + return FieldSumFunc( + [](const VariantType& accumulator, const VariantType& input_field) -> 
VariantType { + int64_t sum = DataDefine::GetVariantValue(accumulator) + + DataDefine::GetVariantValue(input_field); + return sum; + }); + case arrow::Type::type::FLOAT: + return FieldSumFunc( + [](const VariantType& accumulator, const VariantType& input_field) -> VariantType { + float sum = DataDefine::GetVariantValue(accumulator) + + DataDefine::GetVariantValue(input_field); + return sum; + }); + case arrow::Type::type::DOUBLE: + return FieldSumFunc( + [](const VariantType& accumulator, const VariantType& input_field) -> VariantType { + double sum = DataDefine::GetVariantValue(accumulator) + + DataDefine::GetVariantValue(input_field); + return sum; + }); + case arrow::Type::type::DECIMAL: { + return FieldSumFunc( + [](const VariantType& accumulator, const VariantType& input_field) -> VariantType { + auto v1 = DataDefine::GetVariantValue(accumulator); + auto v2 = DataDefine::GetVariantValue(input_field); + assert(v1.Precision() == v2.Precision() && v1.Scale() == v2.Scale()); + return Decimal(v1.Precision(), v1.Scale(), v1.Value() + v2.Value()); + }); + } + default: + return Status::Invalid( + fmt::format("type {} not support in FieldSumAgg", field_type->ToString())); + } +} + +Result FieldSumAgg::CreateNegFunc( + const std::shared_ptr& field_type) { + arrow::Type::type type = field_type->id(); + switch (type) { + case arrow::Type::type::INT8: + return FieldNegFunc([](const VariantType& input_field) -> VariantType { + char value = DataDefine::GetVariantValue(input_field); + return static_cast(-value); + }); + case arrow::Type::type::INT16: + return FieldNegFunc([](const VariantType& input_field) -> VariantType { + auto value = DataDefine::GetVariantValue(input_field); + return static_cast(-value); + }); + case arrow::Type::type::INT32: + return FieldNegFunc([](const VariantType& input_field) -> VariantType { + auto value = DataDefine::GetVariantValue(input_field); + return (-value); + }); + case arrow::Type::type::INT64: + return FieldNegFunc([](const VariantType& 
input_field) -> VariantType { + auto value = DataDefine::GetVariantValue(input_field); + return (-value); + }); + case arrow::Type::type::FLOAT: + return FieldNegFunc([](const VariantType& input_field) -> VariantType { + auto value = DataDefine::GetVariantValue(input_field); + return (-value); + }); + case arrow::Type::type::DOUBLE: + return FieldNegFunc([](const VariantType& input_field) -> VariantType { + auto value = DataDefine::GetVariantValue(input_field); + return (-value); + }); + case arrow::Type::type::DECIMAL: { + return FieldNegFunc([](const VariantType& input_field) -> VariantType { + auto value = DataDefine::GetVariantValue(input_field); + return Decimal(value.Precision(), value.Scale(), -value.Value()); + }); + } + default: + return Status::Invalid( + fmt::format("type {} not support in FieldSumAgg", field_type->ToString())); + } +} + +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/aggregate/field_sum_agg.h b/src/paimon/core/mergetree/compact/aggregate/field_sum_agg.h new file mode 100644 index 0000000..b183532 --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_sum_agg.h @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "paimon/common/data/data_define.h" +#include "paimon/core/mergetree/compact/aggregate/field_aggregator.h" +#include "paimon/result.h" + +namespace arrow { +class DataType; +} // namespace arrow + +namespace paimon { +/// sum aggregate a field of a row. +class FieldSumAgg : public FieldAggregator { + public: + static Result> Create( + const std::shared_ptr& field_type) { + PAIMON_ASSIGN_OR_RAISE(FieldSumFunc sum_func, CreateSumFunc(field_type)); + PAIMON_ASSIGN_OR_RAISE(FieldNegFunc neg_func, CreateNegFunc(field_type)); + return std::unique_ptr(new FieldSumAgg(field_type, sum_func, neg_func)); + } + + VariantType Agg(const VariantType& accumulator, const VariantType& input_field) override { + bool accumulator_null = DataDefine::IsVariantNull(accumulator); + bool input_null = DataDefine::IsVariantNull(input_field); + if (accumulator_null || input_null) { + return accumulator_null ? input_field : accumulator; + } + return sum_func_(accumulator, input_field); + } + + Result Retract(const VariantType& accumulator, + const VariantType& input_field) const override { + bool accumulator_null = DataDefine::IsVariantNull(accumulator); + bool input_null = DataDefine::IsVariantNull(input_field); + if (!accumulator_null && !input_null) { + return sum_func_(accumulator, neg_func_(input_field)); + } + if (!accumulator_null) { + return accumulator; + } + if (!input_null) { + return neg_func_(input_field); + } + // accumulator and input_field are both null + return accumulator; + } + + public: + static constexpr char NAME[] = "sum"; + + private: + using FieldSumFunc = + std::function; + using FieldNegFunc = std::function; + + FieldSumAgg(const std::shared_ptr& field_type, const FieldSumFunc& sum_func, + const FieldNegFunc& neg_func) + : FieldAggregator(std::string(NAME), field_type), + sum_func_(sum_func), + neg_func_(neg_func) {} + + static Result CreateSumFunc(const std::shared_ptr& field_type); + 
+ static Result CreateNegFunc(const std::shared_ptr& field_type); + + private: + FieldSumFunc sum_func_; + FieldNegFunc neg_func_; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/aggregate/field_sum_agg_test.cpp b/src/paimon/core/mergetree/compact/aggregate/field_sum_agg_test.cpp new file mode 100644 index 0000000..85081da --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_sum_agg_test.cpp @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/mergetree/compact/aggregate/field_sum_agg.h" + +#include + +#include "arrow/type_fwd.h" +#include "gtest/gtest.h" +#include "paimon/common/utils/decimal_utils.h" +#include "paimon/data/decimal.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +TEST(FieldSumAggTest, TestSimple) { + ASSERT_OK_AND_ASSIGN(std::unique_ptr field_sum_agg, + FieldSumAgg::Create(arrow::int32())); + auto agg_ret = field_sum_agg->Agg(5, 10); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 15); + + ASSERT_OK_AND_ASSIGN(auto retract_ret, field_sum_agg->Retract(5, 10)); + ASSERT_EQ(DataDefine::GetVariantValue(retract_ret), -5); +} +TEST(FieldSumAggTest, TestNull) { + ASSERT_OK_AND_ASSIGN(std::unique_ptr field_sum_agg, + FieldSumAgg::Create(arrow::int32())); + { + auto agg_ret = field_sum_agg->Agg(5, NullType()); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 5); + } + { + auto agg_ret = field_sum_agg->Agg(NullType(), 10); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 10); + } + { + auto agg_ret = field_sum_agg->Agg(NullType(), NullType()); + ASSERT_TRUE(DataDefine::IsVariantNull(agg_ret)); + } + + { + ASSERT_OK_AND_ASSIGN(auto retract_ret, field_sum_agg->Retract(5, NullType())); + ASSERT_EQ(DataDefine::GetVariantValue(retract_ret), 5); + } + { + ASSERT_OK_AND_ASSIGN(auto retract_ret, field_sum_agg->Retract(NullType(), 10)); + ASSERT_EQ(DataDefine::GetVariantValue(retract_ret), -10); + } + { + ASSERT_OK_AND_ASSIGN(auto retract_ret, field_sum_agg->Retract(NullType(), NullType())); + ASSERT_TRUE(DataDefine::IsVariantNull(retract_ret)); + } +} + +TEST(FieldSumAggTest, TestVariantType) { + { + ASSERT_OK_AND_ASSIGN(auto field_sum_agg, FieldSumAgg::Create(arrow::int8())); + auto agg_ret = field_sum_agg->Agg(static_cast(100), static_cast(15)); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 115); + ASSERT_OK_AND_ASSIGN(auto retract_ret, + field_sum_agg->Retract(static_cast(100), static_cast(15))); + 
ASSERT_EQ(DataDefine::GetVariantValue(retract_ret), 85); + } + { + ASSERT_OK_AND_ASSIGN(auto field_sum_agg, FieldSumAgg::Create(arrow::int16())); + auto agg_ret = field_sum_agg->Agg(static_cast(100), static_cast(15)); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 115); + ASSERT_OK_AND_ASSIGN(auto retract_ret, field_sum_agg->Retract(static_cast(100), + static_cast(15))); + ASSERT_EQ(DataDefine::GetVariantValue(retract_ret), 85); + } + { + ASSERT_OK_AND_ASSIGN(auto field_sum_agg, FieldSumAgg::Create(arrow::int32())); + auto agg_ret = field_sum_agg->Agg(static_cast(100), static_cast(15)); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 115); + ASSERT_OK_AND_ASSIGN(auto retract_ret, field_sum_agg->Retract(static_cast(100), + static_cast(15))); + ASSERT_EQ(DataDefine::GetVariantValue(retract_ret), 85); + } + { + ASSERT_OK_AND_ASSIGN(auto field_sum_agg, FieldSumAgg::Create(arrow::int64())); + auto agg_ret = field_sum_agg->Agg(static_cast(100), static_cast(15)); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), 115); + ASSERT_OK_AND_ASSIGN(auto retract_ret, field_sum_agg->Retract(static_cast(100), + static_cast(15))); + ASSERT_EQ(DataDefine::GetVariantValue(retract_ret), 85); + } + { + ASSERT_OK_AND_ASSIGN(auto field_sum_agg, FieldSumAgg::Create(arrow::float32())); + auto agg_ret = field_sum_agg->Agg(static_cast(100.2), static_cast(15.1)); + ASSERT_NEAR(DataDefine::GetVariantValue(agg_ret), 115.3, 0.0001); + ASSERT_OK_AND_ASSIGN(auto retract_ret, field_sum_agg->Retract(static_cast(100.2), + static_cast(15.1))); + ASSERT_NEAR(DataDefine::GetVariantValue(retract_ret), 85.1, 0.0001); + } + { + ASSERT_OK_AND_ASSIGN(auto field_sum_agg, FieldSumAgg::Create(arrow::float64())); + auto agg_ret = field_sum_agg->Agg(100.23, 15.11); + ASSERT_NEAR(DataDefine::GetVariantValue(agg_ret), 115.34, 0.0001); + ASSERT_OK_AND_ASSIGN(auto retract_ret, field_sum_agg->Retract(static_cast(100.23), + static_cast(15.11))); + ASSERT_NEAR(DataDefine::GetVariantValue(retract_ret), 85.12, 0.0001); 
+ } + { + Decimal decimal1(/*precision=*/30, /*scale=*/20, + DecimalUtils::StrToInt128("12345678998765432145678").value()); + Decimal decimal2(/*precision=*/30, /*scale=*/20, + DecimalUtils::StrToInt128("2345679987639475677478").value()); + ASSERT_OK_AND_ASSIGN(auto field_sum_agg, FieldSumAgg::Create(arrow::decimal128(30, 20))); + auto agg_ret = field_sum_agg->Agg(decimal1, decimal2); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), + Decimal(/*precision=*/30, /*scale=*/20, + DecimalUtils::StrToInt128("14691358986404907823156").value())); + ASSERT_OK_AND_ASSIGN(auto retract_ret, field_sum_agg->Retract(decimal1, decimal2)); + ASSERT_EQ(DataDefine::GetVariantValue(retract_ret), + Decimal(/*precision=*/30, /*scale=*/20, + DecimalUtils::StrToInt128("9999999011125956468200").value())); + } +} + +TEST(FieldSumAggTest, TestInvalidType) { + auto field_sum_agg = FieldSumAgg::Create(arrow::boolean()); + ASSERT_FALSE(field_sum_agg.ok()); + ASSERT_TRUE(field_sum_agg.status().ToString().find("type bool not support in FieldSumAgg") != + std::string::npos) + << field_sum_agg.status().ToString(); +} +} // namespace paimon::test