Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ci/docker/integration/runner/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.1,\
org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.3,\
org.apache.hadoop:hadoop-aws:3.3.4,\
com.amazonaws:aws-java-sdk-bundle:1.12.262,\
org.apache.hadoop:hadoop-azure:3.3.4,\
com.microsoft.azure:azure-storage:8.6.6,\
org.apache.spark:spark-avro_2.12:3.5.1"\
&& /spark-3.5.5-bin-hadoop3/bin/spark-shell --packages "$packages" \
&& find /root/.ivy2/ -name '*.jar' -exec ln -sf {} /spark-3.5.5-bin-hadoop3/jars/ \;
Expand Down
28 changes: 11 additions & 17 deletions src/Databases/DataLake/GlueCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,20 @@
#include <DataTypes/DataTypesNumber.h>


#include <IO/S3/Credentials.h>
#include <IO/S3/Client.h>
#include <IO/S3Settings.h>
#include <Databases/DataLake/Common.h>
#include <IO/CompressionMethod.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Common/ProxyConfigurationResolverProvider.h>
#include <Databases/DataLake/Common.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Utils.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <IO/S3/Client.h>
#include <IO/S3/Credentials.h>
#include <IO/S3Settings.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Utils.h>
#include <Common/ProxyConfigurationResolverProvider.h>

namespace DB::ErrorCodes
{
Expand Down Expand Up @@ -554,14 +555,7 @@ String GlueCatalog::resolveMetadataPathFromTableLocation(const String & table_lo
try
{
auto [metadata_version, metadata_path, compression_method] = DB::Iceberg::getLatestOrExplicitMetadataFileAndVersion(
object_storage,
table_path,
*storage_settings,
nullptr,
getContext(),
log.get(),
std::nullopt
);
object_storage, table_path, *storage_settings, nullptr, getContext(), log.get(), std::nullopt, DB::CompressionMethod::None);

LOG_TRACE(log, "Resolved metadata path '{}' (version {}) for table location '{}'", metadata_path, metadata_version, table_location);

Expand Down
2 changes: 1 addition & 1 deletion src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ size_t ReadBufferFromAzureBlobStorage::readBigAt(char * to, size_t n, size_t ran

ObjectMetadata ReadBufferFromAzureBlobStorage::getObjectMetadataFromTheLastRequest() const
{
if (last_object_metadata.get()->has_value())
if (!last_object_metadata.get()->has_value())
throw Exception(ErrorCodes::NOT_INITIALIZED, "No Azure object metadata available because there were no successful requests");

return last_object_metadata.get()->value();
Expand Down
4 changes: 2 additions & 2 deletions src/Interpreters/IcebergMetadataLog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void insertRowToLogTable(
std::function<String()> get_row,
IcebergMetadataLogLevel row_log_level,
const String & table_path,
const String & file_path,
const Iceberg::IcebergPathFromMetadata & file_path,
std::optional<UInt64> row_in_file,
std::optional<Iceberg::PruningReturnStatus> pruning_status)
{
Expand All @@ -108,7 +108,7 @@ void insertRowToLogTable(
.query_id = local_context->getCurrentQueryId(),
.content_type = row_log_level,
.table_path = table_path,
.file_path = file_path,
.file_path = file_path.serialize(),
.metadata_content = get_row(),
.row_in_file = row_in_file,
.pruning_status = pruning_status});
Expand Down
3 changes: 2 additions & 1 deletion src/Interpreters/IcebergMetadataLog.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <Core/SettingsEnums.h>
#include <Interpreters/SystemLog.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.h>

namespace DB
Expand Down Expand Up @@ -33,7 +34,7 @@ void insertRowToLogTable(
std::function<String()> get_row,
IcebergMetadataLogLevel row_log_level,
const String & table_path,
const String & file_path,
const Iceberg::IcebergPathFromMetadata & file_path,
std::optional<UInt64> row_in_file,
std::optional<Iceberg::PruningReturnStatus> pruning_status);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <IO/WriteBufferFromString.h>
#include <Storages/ObjectStorage/DataLakes/Common/AvroForIcebergDeserializer.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.h>
#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Common/UniqueLock.h>
Expand Down Expand Up @@ -35,7 +36,9 @@ namespace DB::Iceberg
using namespace DB;

AvroForIcebergDeserializer::AvroForIcebergDeserializer(
std::unique_ptr<ReadBufferFromFileBase> buffer_, const std::string & manifest_file_path_, const DB::FormatSettings & format_settings)
std::unique_ptr<ReadBufferFromFileBase> buffer_,
const IcebergPathFromMetadata & manifest_file_path_,
const DB::FormatSettings & format_settings)
try
: buffer(std::move(buffer_))
, manifest_file_path(manifest_file_path_)
Expand Down Expand Up @@ -156,7 +159,8 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
}


const auto file_path_key = getValueFromRowByName(row_index, c_data_file_file_path, TypeIndex::String).safeGet<String>();
const auto file_path_key = IcebergPathFromMetadata::deserialize(
getValueFromRowByName(row_index, c_data_file_file_path, TypeIndex::String).safeGet<String>());
/// NOTE: This is weird, because in the manifest file the partition looks like this:
/// {
/// ...
Expand Down Expand Up @@ -257,16 +261,18 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
}
case FileContentType::POSITION_DELETE: {
/// reference_file_path can be absent from the schema for some reason, though it is present in the specification: https://iceberg.apache.org/spec/#manifests
std::optional<String> lower_reference_data_file_path = std::nullopt;
std::optional<String> upper_reference_data_file_path = std::nullopt;
std::optional<Iceberg::IcebergPathFromMetadata> lower_reference_data_file_path;
std::optional<Iceberg::IcebergPathFromMetadata> upper_reference_data_file_path;
bool bounds_set_by_referenced_data_file = false;
if (hasPath(c_data_file_referenced_data_file))
{
Field reference_file_path_field = getValueFromRowByName(row_index, c_data_file_referenced_data_file);
if (!reference_file_path_field.isNull())
{
lower_reference_data_file_path = reference_file_path_field.safeGet<String>();
upper_reference_data_file_path = reference_file_path_field.safeGet<String>();
lower_reference_data_file_path.emplace(
Iceberg::IcebergPathFromMetadata::deserialize(reference_file_path_field.safeGet<String>()));
upper_reference_data_file_path.emplace(
Iceberg::IcebergPathFromMetadata::deserialize(reference_file_path_field.safeGet<String>()));
bounds_set_by_referenced_data_file = true;
}
}
Expand All @@ -277,9 +283,9 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
{
auto & [lower, upper] = it->second;
if (!lower.isNull())
lower_reference_data_file_path = lower.safeGet<String>();
lower_reference_data_file_path.emplace(Iceberg::IcebergPathFromMetadata::deserialize(lower.safeGet<String>()));
if (!upper.isNull())
upper_reference_data_file_path = upper.safeGet<String>();
upper_reference_data_file_path.emplace(Iceberg::IcebergPathFromMetadata::deserialize(upper.safeGet<String>()));
}
}
return std::make_shared<const ParsedManifestFileEntry>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <Core/Field.h>
#include <DataTypes/DataTypeTuple.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.h>
#include <Common/SharedMutex.h>


Expand Down Expand Up @@ -36,7 +37,7 @@ class AvroForIcebergDeserializer
{
private:
std::unique_ptr<DB::ReadBufferFromFileBase> buffer;
std::string manifest_file_path;
Iceberg::IcebergPathFromMetadata manifest_file_path;
DB::ColumnPtr parsed_column;
std::shared_ptr<const DB::DataTypeTuple> parsed_column_data_type;
mutable std::optional<ColumnsWithTypeAndName> cache_parsed_columns TSA_GUARDED_BY(cache_mutex);
Expand All @@ -61,7 +62,7 @@ class AvroForIcebergDeserializer
public:
AvroForIcebergDeserializer(
std::unique_ptr<DB::ReadBufferFromFileBase> buffer_,
const std::string & manifest_file_path_,
const Iceberg::IcebergPathFromMetadata & manifest_file_path_,
const DB::FormatSettings & format_settings);

size_t rows() const;
Expand Down
1 change: 0 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ class IDataLakeMetadata : boost::noncopyable
const std::vector<Field> & /* partition_values */,
SharedHeader /* sample_block */,
const std::vector<String> & /* data_file_paths */,
StorageObjectStorageConfigurationPtr /* configuration */,
ContextPtr /* context */)
{
throwNotImplemented("commitExportPartitionTransaction");
Expand Down
Loading
Loading