Skip to content
Open
51 changes: 40 additions & 11 deletions include/paimon/read_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <string>
#include <vector>

#include "arrow/c/abi.h"
#include "paimon/cache/cache.h"
#include "paimon/predicate/predicate.h"
#include "paimon/result.h"
Expand All @@ -44,7 +45,7 @@ class FileSystem;
class PAIMON_EXPORT ReadContext {
public:
ReadContext(const std::string& path, const std::string& branch,
const std::vector<std::string>& read_schema,
const std::vector<std::string>& read_field_names,
const std::vector<int32_t>& read_field_ids,
const std::shared_ptr<Predicate>& predicate, bool enable_predicate_filter,
bool enable_prefetch, uint32_t prefetch_batch_count,
Expand Down Expand Up @@ -75,8 +76,8 @@ class PAIMON_EXPORT ReadContext {
return options_;
}

const std::vector<std::string>& GetReadSchema() const {
return read_schema_;
const std::vector<std::string>& GetReadFieldNames() const {
return read_field_names_;
}

const std::vector<int32_t>& GetReadFieldIds() const {
Expand Down Expand Up @@ -130,10 +131,25 @@ class PAIMON_EXPORT ReadContext {
return cache_;
}

/// Reports whether a C ArrowSchema used for nested column pruning has been attached
/// to this context, i.e. whether the stored schema pointer is non-null.
bool HasReadSchema() const {
    const bool schema_provided = (read_schema_ != nullptr);
    return schema_provided;
}

/// Returns the stored read schema as a mutable C ArrowSchema pointer.
/// The pointer is handed out non-const because ImportSchema will consume (release)
/// the schema content.
ArrowSchema* GetReadSchema() {
    ArrowSchema* const schema = read_schema_;
    return schema;
}

/// Set the read schema from a C ArrowSchema pointer. Does NOT take ownership.
/// Called internally by ReadContextBuilder.
void SetReadSchema(ArrowSchema* schema);
Comment thread
lucasfang marked this conversation as resolved.

private:
std::string path_;
std::string branch_;
std::vector<std::string> read_schema_;
std::vector<std::string> read_field_names_;
std::vector<int32_t> read_field_ids_;
std::shared_ptr<Predicate> predicate_;
bool enable_predicate_filter_;
Expand All @@ -151,6 +167,7 @@ class PAIMON_EXPORT ReadContext {
PrefetchCacheMode prefetch_cache_mode_;
CacheConfig cache_config_;
std::shared_ptr<Cache> cache_;
ArrowSchema* read_schema_ = nullptr;
};

/// `ReadContextBuilder` used to build a `ReadContext`, has input validation.
Expand All @@ -173,9 +190,9 @@ class PAIMON_EXPORT ReadContextBuilder {
///
/// @param read_field_names Vector of field names to read from the table.
/// @return Reference to this builder for method chaining.
/// @note Currently supports top-level field selection. Future versions may support
/// nested field selection using ArrowSchema for more granular projection
ReadContextBuilder& SetReadSchema(const std::vector<std::string>& read_field_names);
/// @note Currently supports top-level field selection. For nested field selection
/// use SetReadSchema(ArrowSchema*) instead.
ReadContextBuilder& SetReadFieldNames(const std::vector<std::string>& read_field_names);
/// Set the schema fields to read from the table.
///
/// If not set, all fields from the table schema will be read. This is useful for
Expand All @@ -184,12 +201,24 @@ class PAIMON_EXPORT ReadContextBuilder {
///
/// @param read_field_ids Vector of field ids to read from the table.
/// @return Reference to this builder for method chaining.
/// @note Currently supports top-level field selection. Future versions may support
/// nested field selection using ArrowSchema for more granular projection.
/// @note SetReadFieldIds() and SetReadSchema() are mutually exclusive.
/// Calling both will ignore the read schema set by SetReadSchema().
/// @note Currently supports top-level field selection.
/// @note SetReadFieldIds() and SetReadFieldNames() are mutually exclusive.
/// Calling both will ignore the field names set by SetReadFieldNames().
ReadContextBuilder& SetReadFieldIds(const std::vector<int32_t>& read_field_ids);

/// Set the read Arrow Schema for nested column pruning.
///
/// The read schema is an Arrow C Data Interface schema where STRUCT types
/// may contain only a subset of the original sub-fields, enabling nested column
/// pruning to reduce I/O. Each Arrow field must carry a "paimon.id" metadata
/// entry for field matching.
///
/// @param read_schema Arrow C Schema. The caller retains ownership.
/// @return Reference to this builder for method chaining.
/// @note Priority: read_schema > read_field_ids > read_field_names.
/// When set, read_field_ids and read_field_names are ignored.
ReadContextBuilder& SetReadSchema(ArrowSchema* read_schema);

/// Set a configuration options map to set some option entries which are not defined in the
/// table schema or whose values you want to overwrite.
/// @note The options map will clear the options added by `AddOption()` before.
Expand Down
2 changes: 2 additions & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ set(PAIMON_CORE_SRCS
core/utils/blob_view_lookup.cpp
core/utils/consumer_manager.cpp
core/utils/field_mapping.cpp
core/utils/nested_projection_utils.cpp
core/utils/file_store_path_factory.cpp
core/utils/file_utils.cpp
core/utils/manifest_meta_reader.cpp
Expand Down Expand Up @@ -735,6 +736,7 @@ if(PAIMON_BUILD_TESTS)
core/utils/consumer_manager_test.cpp
core/utils/file_store_path_factory_cache_test.cpp
core/utils/field_mapping_test.cpp
core/utils/nested_projection_utils_test.cpp
core/utils/file_store_path_factory_test.cpp
core/utils/file_utils_test.cpp
core/utils/manifest_meta_reader_test.cpp
Expand Down
5 changes: 1 addition & 4 deletions src/paimon/common/memory/memory_segment_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -555,10 +555,7 @@ TEST(MemorySegmentTest, TestDoubleAccess) {
delete[] occupied;
}

// ------------------------------------------------------------------------
// Bulk Byte Movements
// ------------------------------------------------------------------------

// Bulk Byte Movements
TEST(MemorySegmentTest, TestBulkByteAccess) {
auto pool = paimon::GetDefaultPool();
// test expected correct behavior with default offset / length
Expand Down
3 changes: 3 additions & 0 deletions src/paimon/common/types/data_field.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class DataField : public Jsonizable<DataField> {

static constexpr char FIELD_ID[] = "paimon.id";
static constexpr char DESCRIPTION[] = "paimon.description";
/// Metadata key for map field selected keys. The value is a comma-separated
/// string of key names, e.g. 'key1,key2'. Only string-keyed maps are supported.
static constexpr char MAP_SELECTED_KEYS[] = "paimon.map.selected-keys";

public:
static std::shared_ptr<arrow::Field> ConvertDataFieldToArrowField(const DataField& field);
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/core/global_index/global_index_write_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ Result<std::unique_ptr<BatchReader>> CreateBatchReader(
.WithFileSystem(core_options.GetFileSystem())
.EnablePrefetch(true)
.WithMemoryPool(pool)
.SetReadSchema({field_name, SpecialFields::RowId().Name()});
.SetReadFieldNames({field_name, SpecialFields::RowId().Name()});
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<ReadContext> read_context,
read_context_builder.Finish());
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<TableRead> table_read,
Expand Down
67 changes: 50 additions & 17 deletions src/paimon/core/io/field_mapping_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <cassert>
#include <cstddef>
#include <set>
#include <utility>

#include "arrow/api.h"
Expand All @@ -35,6 +36,7 @@
#include "paimon/core/casting/cast_executor.h"
#include "paimon/core/casting/casting_utils.h"
#include "paimon/core/utils/field_mapping.h"
#include "paimon/core/utils/nested_projection_utils.h"
#include "paimon/memory/bytes.h"
#include "paimon/reader/batch_reader.h"

Expand Down Expand Up @@ -74,6 +76,23 @@ FieldMappingReader::FieldMappingReader(int32_t field_count,
non_partition_info_.non_partition_read_schema[i].Name()) {
need_mapping_ = true;
}
// Map selected-keys metadata also requires mapping so that
// FilterMapArrayBySelectedKeys can filter out unwanted entries.
if (!need_mapping_ &&
non_partition_info_.non_partition_read_schema[i].Type()->id() == arrow::Type::MAP) {
auto selected_keys_or = NestedProjectionUtils::GetMapSelectedKeys(
non_partition_info_.non_partition_read_schema[i].ArrowField());
if (!selected_keys_or.ok()) {
// Turn mapping on so the parse error can be surfaced in
// MappingFields, which is able to return a Status.
need_mapping_ = true;
continue;
}
auto& selected_keys = selected_keys_or.value();
if (!selected_keys.empty()) {
need_mapping_ = true;
}
}
}
}

Expand Down Expand Up @@ -142,9 +161,9 @@ Result<BatchReader::ReadBatchWithBitmap> FieldMappingReader::NextBatchWithBitmap
// mapping non-partition array
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> casted_non_partition_array,
CastNonPartitionArrayIfNeed(non_partition_array));
MappingFields(casted_non_partition_array, non_partition_info_.non_partition_read_schema,
non_partition_info_.idx_in_target_read_schema, &target_array,
&target_field_names);
PAIMON_RETURN_NOT_OK(MappingFields(
casted_non_partition_array, non_partition_info_.non_partition_read_schema,
non_partition_info_.idx_in_target_read_schema, &target_array, &target_field_names));

// mapping partition array
if (partition_info_ != std::nullopt) {
Expand All @@ -153,9 +172,9 @@ Result<BatchReader::ReadBatchWithBitmap> FieldMappingReader::NextBatchWithBitmap
GeneratePartitionArray(non_partition_array->length()));
}
auto trim_partition_array = partition_array_->Slice(0, non_partition_array->length());
MappingFields(trim_partition_array, partition_info_.value().partition_read_schema,
partition_info_.value().idx_in_target_read_schema, &target_array,
&target_field_names);
PAIMON_RETURN_NOT_OK(MappingFields(
trim_partition_array, partition_info_.value().partition_read_schema,
partition_info_.value().idx_in_target_read_schema, &target_array, &target_field_names));
}
// mapping non-exist array
if (non_exist_field_info_ != std::nullopt) {
Expand All @@ -164,9 +183,10 @@ Result<BatchReader::ReadBatchWithBitmap> FieldMappingReader::NextBatchWithBitmap
GenerateNonExistArray(non_partition_array->length()));
}
auto trim_non_exist_array = non_exist_array_->Slice(0, non_partition_array->length());
MappingFields(trim_non_exist_array, non_exist_field_info_.value().non_exist_read_schema,
non_exist_field_info_.value().idx_in_target_read_schema, &target_array,
&target_field_names);
PAIMON_RETURN_NOT_OK(MappingFields(trim_non_exist_array,
non_exist_field_info_.value().non_exist_read_schema,
non_exist_field_info_.value().idx_in_target_read_schema,
&target_array, &target_field_names));
}

// construct target array
Expand Down Expand Up @@ -283,20 +303,33 @@ Result<std::shared_ptr<arrow::Array>> FieldMappingReader::GenerateNonExistArray(
return arrow_array;
}

void FieldMappingReader::MappingFields(const std::shared_ptr<arrow::Array>& data_array,
const std::vector<DataField>& read_fields_of_data_array,
const std::vector<int32_t>& idx_in_target_schema,
arrow::ArrayVector* target_array,
std::vector<std::string>* target_field_names) {
/// Scatter the children of `data_array` into their slots of the target read schema.
///
/// `data_array` is treated as an arrow::StructArray. Child `i` is written to
/// `(*target_array)[idx_in_target_schema[i]]`, and the name of
/// `read_fields_of_data_array[i]` is written to the same index of `*target_field_names`.
/// A MAP-typed child is first narrowed to the keys listed in the selected-keys metadata
/// of its read field, when that list is non-empty.
///
/// @param data_array Struct array whose children are mapped. Its child count must equal
///        `idx_in_target_schema.size()` (checked by assert, i.e. debug builds only).
/// @param read_fields_of_data_array Read fields describing the children of `data_array`,
///        indexed in parallel with `idx_in_target_schema`.
/// @param idx_in_target_schema Destination index in the target schema for each child.
/// @param target_array Output arrays; must already be sized to cover every destination index.
/// @param target_field_names Output names; must already be sized to cover every destination
///        index.
/// @return Status::OK() on success; otherwise the error produced while reading the
///         selected-keys metadata or while filtering a map array.
Status FieldMappingReader::MappingFields(const std::shared_ptr<arrow::Array>& data_array,
                                         const std::vector<DataField>& read_fields_of_data_array,
                                         const std::vector<int32_t>& idx_in_target_schema,
                                         arrow::ArrayVector* target_array,
                                         std::vector<std::string>* target_field_names) {
    auto* struct_array = arrow::internal::checked_cast<arrow::StructArray*>(data_array.get());
    assert(struct_array);
    assert(struct_array->fields().size() == idx_in_target_schema.size());
    for (size_t i = 0; i < idx_in_target_schema.size(); i++) {
        // target type may be string type, but after adapter transform, type may be dictionary,
        // need reconstruct struct type
        std::shared_ptr<arrow::Array> field_array = struct_array->field(i);

        // Filter map entries by selected keys if metadata is present.
        if (field_array->type()->id() == arrow::Type::MAP) {
            PAIMON_ASSIGN_OR_RAISE(std::vector<std::string> selected_keys,
                                   NestedProjectionUtils::GetMapSelectedKeys(
                                       read_fields_of_data_array[i].ArrowField()));
            if (!selected_keys.empty()) {
                PAIMON_ASSIGN_OR_RAISE(field_array,
                                       NestedProjectionUtils::FilterMapArrayBySelectedKeys(
                                           field_array, selected_keys));
            }
        }

        // Store into the target slot exactly once, after any filtering, so an error return
        // above never leaves an unfiltered array behind in the caller's output.
        (*target_array)[idx_in_target_schema[i]] = std::move(field_array);
        (*target_field_names)[idx_in_target_schema[i]] = read_fields_of_data_array[i].Name();
    }
    return Status::OK();
}

} // namespace paimon
10 changes: 5 additions & 5 deletions src/paimon/core/io/field_mapping_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ class FieldMappingReader : public FileBatchReader {
Result<std::shared_ptr<arrow::Array>> CastNonPartitionArrayIfNeed(
const std::shared_ptr<arrow::Array>& src_array) const;

static void MappingFields(const std::shared_ptr<arrow::Array>& src_array,
const std::vector<DataField>& read_fields_of_data_array,
const std::vector<int32_t>& idx_in_target_schema,
arrow::ArrayVector* target_array,
std::vector<std::string>* target_field_names);
static Status MappingFields(const std::shared_ptr<arrow::Array>& src_array,
const std::vector<DataField>& read_fields_of_data_array,
const std::vector<int32_t>& idx_in_target_schema,
arrow::ArrayVector* target_array,
std::vector<std::string>* target_field_names);

private:
bool need_mapping_ = false;
Expand Down
1 change: 1 addition & 0 deletions src/paimon/core/io/field_mapping_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "paimon/memory/memory_pool.h"
#include "paimon/predicate/literal.h"
#include "paimon/predicate/predicate_builder.h"
#include "paimon/testing/mock/mock_file_batch_reader.h"
#include "paimon/testing/utils/binary_row_generator.h"
#include "paimon/testing/utils/read_result_collector.h"
#include "paimon/testing/utils/testharness.h"
Expand Down
Loading