Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ ReadFromFormatInfo IDataLakeMetadata::prepareReadingFromFormat(
DataFileMetaInfo::DataFileMetaInfo(
const Iceberg::IcebergSchemaProcessor & schema_processor,
Int32 schema_id,
const std::unordered_map<Int32, Iceberg::ColumnInfo> & columns_info_)
const std::unordered_map<Int32, Iceberg::ColumnInfo> & columns_info_,
bool any_stats_field_present)
: stats_were_read(any_stats_field_present)
{

std::vector<Int32> column_ids;
Expand Down Expand Up @@ -134,6 +136,10 @@ DataFileMetaInfo::DataFileMetaInfo(Poco::JSON::Object::Ptr file_info)

auto log = getLogger("DataFileMetaInfo");

// Missing field means old coordinator — default to false (safe: no absent-NULL).
if (file_info->has("stats_were_read"))
stats_were_read = static_cast<bool>(file_info->get("stats_were_read").convert<bool>());

if (file_info->has("columns"))
{
auto columns = file_info->getArray("columns");
Expand Down Expand Up @@ -179,6 +185,8 @@ Poco::JSON::Object::Ptr DataFileMetaInfo::toJson() const
{
Poco::JSON::Object::Ptr file_info = new Poco::JSON::Object();

file_info->set("stats_were_read", stats_were_read);

if (!columns_info.empty())
{
Poco::JSON::Array::Ptr columns = new Poco::JSON::Array();
Expand Down Expand Up @@ -210,6 +218,7 @@ constexpr size_t FIELD_MASK_ALL = 0x7;

void DataFileMetaInfo::serialize(WriteBuffer & out) const
{
writeIntBinary(static_cast<UInt8>(stats_were_read), out);
auto size = columns_info.size();
writeIntBinary(size, out);
for (const auto & column : columns_info)
Expand Down Expand Up @@ -240,6 +249,10 @@ DataFileMetaInfo DataFileMetaInfo::deserialize(ReadBuffer & in)
{
DataFileMetaInfo result;

UInt8 stats_were_read_uint;
readIntBinary(stats_were_read_uint, in);
result.stats_were_read = static_cast<bool>(stats_were_read_uint);

size_t size;
readIntBinary(size, in);

Expand Down
21 changes: 19 additions & 2 deletions src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,35 @@ class DataFileMetaInfo
std::optional<DB::Range> hyperrectangle;
};

// Extract metadata from Iceberg structure
// Extract metadata from Iceberg structure.
// any_stats_field_present must be true when at least one of the three per-column
// stats fields (value_counts, column_sizes, null_value_counts) was present in the
// manifest Avro writer schema. When false, the absent-NULL optimization is
// suppressed: we cannot distinguish a schema-evolution absent column from a column
// that merely has no stats written, so letting Parquet handle it is the only safe
// choice.
explicit DataFileMetaInfo(
const Iceberg::IcebergSchemaProcessor & schema_processor,
Int32 schema_id,
const std::unordered_map<Int32, Iceberg::ColumnInfo> & columns_info_);
const std::unordered_map<Int32, Iceberg::ColumnInfo> & columns_info_,
bool any_stats_field_present);

void serialize(WriteBuffer & out) const;
static DataFileMetaInfo deserialize(ReadBuffer & in);

bool empty() const { return columns_info.empty(); }

std::unordered_map<std::string, ColumnInfo> columns_info;

// True when the manifest Avro schema included at least one of value_counts,
// column_sizes, or null_value_counts. columns_info is authoritative only when
// this is true: a column absent from columns_info was not written to the data
// file and may be substituted with constant NULL.
//
// Serialization note: old nodes do not write this field. On deserialization of
// a missing field (JSON or binary), default to false so that the absent-NULL
// optimization is skipped entirely — safe fallback, minor optimization loss only.
bool stats_were_read = false;
};

using DataFileMetaInfoPtr = std::shared_ptr<DataFileMetaInfo>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,8 @@ ObjectInfoPtr IcebergIterator::next(size_t)
object_info->relative_path_with_metadata.setFileMetaInfo(std::make_shared<DataFileMetaInfo>(
*persistent_components.schema_processor,
table_schema_id, /// current schema id to use current column names
manifest_file_entry->columns_infos));
manifest_file_entry->columns_infos,
manifest_file_entry->any_stats_field_present));
ProfileEvents::increment(ProfileEvents::IcebergMetadataReturnedObjectInfos);
return object_info;
}
Expand Down
5 changes: 5 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,13 @@ ManifestFileContent::ManifestFileContent(
partition_key_value.emplace_back(value);

std::unordered_map<Int32, ColumnInfo> columns_infos;
bool any_stats_field_present = false;

for (const auto & path : {c_data_file_value_counts, c_data_file_column_sizes, c_data_file_null_value_counts})
{
if (manifest_file_deserializer.hasPath(path))
{
any_stats_field_present = true;
Field values_count = manifest_file_deserializer.getValueFromRowByName(i, path);
for (const auto & column_stats : values_count.safeGet<Array>())
{
Expand Down Expand Up @@ -454,6 +456,7 @@ ManifestFileContent::ManifestFileContent(
partition_key_value,
common_partition_specification,
columns_infos,
any_stats_field_present,
file_format,
/*lower_reference_data_file_path_ = */ std::nullopt,
/*upper_reference_data_file_path_ = */ std::nullopt,
Expand Down Expand Up @@ -500,6 +503,7 @@ ManifestFileContent::ManifestFileContent(
partition_key_value,
common_partition_specification,
columns_infos,
any_stats_field_present,
file_format,
lower_reference_data_file_path,
upper_reference_data_file_path,
Expand Down Expand Up @@ -532,6 +536,7 @@ ManifestFileContent::ManifestFileContent(
partition_key_value,
common_partition_specification,
columns_infos,
any_stats_field_present,
file_format,
/*lower_reference_data_file_path_ = */ std::nullopt,
/*upper_reference_data_file_path_ = */ std::nullopt,
Expand Down
6 changes: 6 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ struct ManifestFileEntry : public boost::noncopyable
DB::Row partition_key_value;
PartitionSpecification common_partition_specification;
std::unordered_map<Int32, ColumnInfo> columns_infos;
// True when at least one of value_counts, column_sizes, null_value_counts was
// present in the manifest Avro writer schema. Forwarded to DataFileMetaInfo so
// the absent-NULL optimization can be suppressed when stats are absent entirely.
bool any_stats_field_present = false;

String file_format;
std::optional<String> lower_reference_data_file_path; // For position delete files only.
Expand All @@ -104,6 +108,7 @@ struct ManifestFileEntry : public boost::noncopyable
DB::Row& partition_key_value_,
PartitionSpecification& common_partition_specification_,
std::unordered_map<Int32, ColumnInfo>& columns_infos_,
bool any_stats_field_present_,
const String& file_format_,
std::optional<String> lower_reference_data_file_path_,
std::optional<String> upper_reference_data_file_path_,
Expand All @@ -119,6 +124,7 @@ struct ManifestFileEntry : public boost::noncopyable
, partition_key_value(std::move(partition_key_value_))
, common_partition_specification(common_partition_specification_)
, columns_infos(std::move(columns_infos_))
, any_stats_field_present(any_stats_field_present_)
, file_format(file_format_)
, lower_reference_data_file_path(lower_reference_data_file_path_)
, upper_reference_data_file_path(upper_reference_data_file_path_)
Expand Down
58 changes: 34 additions & 24 deletions src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -763,35 +763,45 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
}
}
}
for (const auto & column : requested_columns_list)
/// Absent-column NULL injection: only when we have positive evidence from
/// stats that a column is not in the file. When stats_were_read is false
/// the manifest had no stats fields at all — we cannot distinguish a
/// schema-evolution absent column from a column that is present but has no
/// stats written. In that case skip this loop entirely; Parquet will return
/// NULL naturally for truly absent columns.
if (file_meta_data.value()->stats_were_read)
{
const auto & column_name = column.first;
for (const auto & column : requested_columns_list)
{
const auto & column_name = column.first;

if (file_meta_data.value()->columns_info.contains(column_name))
continue;
/// Column has stats → it exists in the file; do not treat as absent.
if (file_meta_data.value()->columns_info.contains(column_name))
continue;

if (!column.second.second.type->isNullable())
continue;
if (!column.second.second.type->isNullable())
continue;

/// Skip columns produced by prewhere or row-level filter expressions —
/// they are computed at read time, not stored in the file.
if (format_filter_info
&& ((format_filter_info->prewhere_info && column_name == format_filter_info->prewhere_info->prewhere_column_name)
|| (format_filter_info->row_level_filter && column_name == format_filter_info->row_level_filter->column_name)))
continue;
/// Skip columns produced by prewhere or row-level filter expressions —
/// they are computed at read time, not stored in the file.
if (format_filter_info
&& ((format_filter_info->prewhere_info && column_name == format_filter_info->prewhere_info->prewhere_column_name)
|| (format_filter_info->row_level_filter && column_name == format_filter_info->row_level_filter->column_name)))
continue;

/// Column is nullable and absent in file
constant_columns_with_values[column.second.first] =
ConstColumnWithValue{
column.second.second,
Field()
};
constant_columns.insert(column_name);

LOG_DEBUG(log, "In file {} constant column '{}' type '{}' with value 'NULL'",
object_info->getPath(),
column_name,
column.second.second.type);
/// Column is nullable and absent in file (schema evolution)
constant_columns_with_values[column.second.first] =
ConstColumnWithValue{
column.second.second,
Field()
};
constant_columns.insert(column_name);

LOG_DEBUG(log, "In file {} constant column '{}' type '{}' with value 'NULL'",
object_info->getPath(),
column_name,
column.second.second.type);
}
}
}

Expand Down
Loading
Loading