Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "paimon/core/mergetree/compact/aggregate/aggregate_merge_function.h"

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <variant>

#include "arrow/api.h"
#include "paimon/common/data/data_define.h"
#include "paimon/common/types/row_kind.h"
#include "paimon/common/utils/internal_row_utils.h"
#include "paimon/core/core_options.h"
#include "paimon/core/mergetree/compact/aggregate/field_aggregator_factory.h"
#include "paimon/core/mergetree/compact/aggregate/field_last_non_null_value_agg.h"
#include "paimon/core/mergetree/compact/aggregate/field_last_value_agg.h"
#include "paimon/core/mergetree/compact/aggregate/field_primary_key_agg.h"

namespace paimon {
/// Builds an `AggregateMergeFunction` for value rows laid out as `value_schema`.
///
/// One `FieldAggregator` is created per schema field, in schema order. The aggregate
/// function name of each field is resolved through `GetAggFuncName`. Any error raised while
/// resolving a name, creating an aggregator, or creating the field getters is propagated
/// to the caller.
///
/// @param value_schema schema of the `value` row carried by each `KeyValue`
/// @param primary_keys names of the primary key fields
/// @param options      table options consulted for per-field aggregate functions and for
///                     the remove-record-on-delete switch
Result<std::unique_ptr<AggregateMergeFunction>> AggregateMergeFunction::Create(
    const std::shared_ptr<arrow::Schema>& value_schema,
    const std::vector<std::string>& primary_keys, const CoreOptions& options) {
    const int32_t field_count = value_schema->num_fields();

    std::vector<std::unique_ptr<FieldAggregator>> field_aggs;
    field_aggs.reserve(field_count);
    for (int32_t idx = 0; idx < field_count; ++idx) {
        const auto& field = value_schema->field(idx);
        PAIMON_ASSIGN_OR_RAISE(std::string agg_name,
                               GetAggFuncName(field->name(), primary_keys, options));
        PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FieldAggregator> field_agg,
                               FieldAggregatorFactory::CreateFieldAggregator(
                                   field->name(), field->type(), agg_name, options));
        field_aggs.push_back(std::move(field_agg));
    }

    const bool remove_on_delete = options.AggregationRemoveRecordOnDelete();

    PAIMON_ASSIGN_OR_RAISE(std::vector<InternalRow::FieldGetterFunc> field_getters,
                           InternalRowUtils::CreateFieldGetters(value_schema, /*use_view=*/true));
    // The constructor is private, so std::make_unique cannot be used here.
    return std::unique_ptr<AggregateMergeFunction>(new AggregateMergeFunction(
        std::move(field_getters), std::move(field_aggs), remove_on_delete));
}

/// Folds one more `KeyValue` into the row being accumulated.
///
/// - If remove-record-on-delete is enabled and `kv` is a DELETE row, the accumulated row is
///   replaced by a fresh row holding the input's own field values, and the result is marked
///   as a delete row.
/// - Otherwise each field of the accumulated row is combined with the matching input field,
///   through `Retract` when the row kind reports `IsRetract()` and through `Agg` otherwise.
///
/// @return `Status::OK()` on success; an error from a field aggregator's `Retract` is
///         returned as-is. In that case the fields processed before the failure have already
///         been written and `latest_kv_` is left untouched.
Status AggregateMergeFunction::Add(KeyValue&& kv) {
    const size_t field_count = getters_.size();

    // A DELETE row only bypasses aggregation when removeRecordOnDelete is enabled.
    current_delete_row_ = remove_record_on_delete_ && kv.value_kind == RowKind::Delete();

    if (current_delete_row_) {
        // Discard what was accumulated so far and start from the input values.
        row_ = std::make_unique<GenericRow>(getters_.size());
        for (size_t pos = 0; pos < field_count; ++pos) {
            row_->SetField(pos, getters_[pos](*(kv.value)));
        }
    } else {
        const bool retracting = kv.value_kind->IsRetract();
        for (size_t pos = 0; pos < field_count; ++pos) {
            auto current = getters_[pos](*row_);
            auto incoming = getters_[pos](*(kv.value));
            VariantType combined;
            if (!retracting) {
                combined = aggregators_[pos]->Agg(current, incoming);
            } else {
                PAIMON_ASSIGN_OR_RAISE(combined,
                                       aggregators_[pos]->Retract(current, incoming));
            }
            row_->SetField(pos, combined);
        }
    }

    // Hand the input row to row_ as a data holder before moving kv itself.
    // NOTE(review): presumably this keeps memory referenced by the extracted field values
    // alive (getters are created with use_view=true) — confirm against GenericRow.
    row_->AddDataHolder(std::move(kv.value));
    latest_kv_ = std::move(kv);
    return Status::OK();
}

/// Emits the merged record built from the rows added so far.
///
/// The most recently added `KeyValue` is reused as the carrier: its value becomes the
/// accumulated row, its kind becomes DELETE or INSERT depending on `current_delete_row_`,
/// and its level is set to `KeyValue::UNKNOWN_LEVEL`.
///
/// Precondition (asserted): at least one `Add` happened since the last `Reset`.
/// After this call `row_` has been moved from, so it must be recreated (as `Reset` does)
/// before `Add` is used again.
Result<std::optional<KeyValue>> AggregateMergeFunction::GetResult() {
    assert(latest_kv_);
    KeyValue& merged = latest_kv_.value();
    merged.value = std::move(row_);
    if (current_delete_row_) {
        merged.value_kind = RowKind::Delete();
    } else {
        merged.value_kind = RowKind::Insert();
    }
    merged.level = KeyValue::UNKNOWN_LEVEL;
    return std::move(latest_kv_);
}

/// Resolves the aggregate function name to use for `field_name`.
///
/// Resolution order:
///   1. sequence fields       -> `FieldLastValueAgg::NAME` (not aggregated, latest value wins)
///   2. primary key fields    -> `FieldPrimaryKeyAgg::NAME`
///   3. the function configured for this field in `options`, if any
///   4. the default function configured in `options`, if any
///   5. `FieldLastNonNullValueAgg::NAME`
///
/// @return the resolved name, or the error reported by `options.GetFieldAggFunc`.
Result<std::string> AggregateMergeFunction::GetAggFuncName(
    const std::string& field_name, const std::vector<std::string>& primary_keys,
    const CoreOptions& options) {
    // True when `names` contains the field being resolved.
    const auto contains_field = [&field_name](const auto& names) {
        return std::find(names.begin(), names.end(), field_name) != names.end();
    };

    if (contains_field(options.GetSequenceField())) {
        // no agg for sequence fields, use last_value to do cover
        return std::string(FieldLastValueAgg::NAME);
    }
    if (contains_field(primary_keys)) {
        return std::string(FieldPrimaryKeyAgg::NAME);
    }

    PAIMON_ASSIGN_OR_RAISE(std::optional<std::string> agg_name,
                           options.GetFieldAggFunc(field_name));
    if (!agg_name.has_value()) {
        // No per-field setting: fall back to the table-wide default.
        agg_name = options.GetFieldsDefaultFunc();
    }
    if (!agg_name.has_value()) {
        // No default either: fall back to last_non_null_value.
        agg_name = std::string(FieldLastNonNullValueAgg::NAME);
    }
    return agg_name.value();
}
} // namespace paimon
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once
#include <cassert>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

#include "arrow/api.h"
#include "paimon/common/data/data_define.h"
#include "paimon/common/data/generic_row.h"
#include "paimon/common/data/internal_row.h"
#include "paimon/core/core_options.h"
#include "paimon/core/key_value.h"
#include "paimon/core/mergetree/compact/aggregate/field_aggregator.h"
#include "paimon/core/mergetree/compact/merge_function.h"
#include "paimon/result.h"
#include "paimon/status.h"

namespace arrow {
class Schema;
} // namespace arrow

namespace paimon {
class CoreOptions;

/// A `MergeFunction` where key is primary key (unique) and value is the partial record,
/// pre-aggregate non-null fields on merge.
class AggregateMergeFunction : public MergeFunction {
public:
// value_schema is the schema of parameter value in KeyValue object
static Result<std::unique_ptr<AggregateMergeFunction>> Create(
const std::shared_ptr<arrow::Schema>& value_schema,
const std::vector<std::string>& primary_keys, const CoreOptions& options);

void Reset() override {
latest_kv_ = std::nullopt;
current_delete_row_ = false;
row_ = std::make_unique<GenericRow>(getters_.size());
for (const auto& agg : aggregators_) {
agg->Reset();
}
}

Status Add(KeyValue&& kv) override;

Result<std::optional<KeyValue>> GetResult() override;

private:
AggregateMergeFunction(std::vector<InternalRow::FieldGetterFunc>&& getters,
std::vector<std::unique_ptr<FieldAggregator>>&& aggregators,
bool remove_record_on_delete)
: getters_(std::move(getters)),
aggregators_(std::move(aggregators)),
remove_record_on_delete_(remove_record_on_delete),
row_(std::make_unique<GenericRow>(getters_.size())) {
assert(getters_.size() == aggregators_.size());
}
static Result<std::string> GetAggFuncName(const std::string& field_name,
const std::vector<std::string>& primary_keys,
const CoreOptions& options);

private:
std::vector<InternalRow::FieldGetterFunc> getters_;
std::vector<std::unique_ptr<FieldAggregator>> aggregators_;
bool remove_record_on_delete_;
bool current_delete_row_ = false;
std::optional<KeyValue> latest_kv_;
std::unique_ptr<GenericRow> row_;
};
} // namespace paimon
Loading