Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 143 additions & 50 deletions src/Databases/DataLake/RestCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <Interpreters/Context.h>
#include <filesystem>

#include <Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h>
#include <Server/HTTP/HTMLForm.h>
#include <Formats/FormatFactory.h>
Expand All @@ -44,6 +45,8 @@
#include <Poco/Net/SSLManager.h>
#include <Poco/StreamCopier.h>

#include <sstream>


namespace DB::ErrorCodes
{
Expand Down Expand Up @@ -118,6 +121,139 @@ String encodeNamespaceForURI(const String & namespace_name)

}

namespace
{
Poco::JSON::Object::Ptr cloneJsonObject(const Poco::JSON::Object::Ptr & obj)
{
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
obj->stringify(oss);

Poco::JSON::Parser parser;
return parser.parse(oss.str()).extract<Poco::JSON::Object::Ptr>();
}
}

Poco::JSON::Object::Ptr buildUpdateMetadataRequestBody(
const String & namespace_name, const String & table_name, Poco::JSON::Object::Ptr new_snapshot)
{
if (!new_snapshot)
return nullptr;

Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object;
{
Poco::JSON::Object::Ptr identifier = new Poco::JSON::Object;
identifier->set("name", table_name);
Poco::JSON::Array::Ptr namespaces = new Poco::JSON::Array;
namespaces->add(namespace_name);
identifier->set("namespace", namespaces);

request_body->set("identifier", identifier);
}

// Schema-change commit path (ALTER TABLE add/drop/modify/rename column).
if (new_snapshot->has(DB::Iceberg::f_schemas))
{
if (!new_snapshot->has(DB::Iceberg::f_current_schema_id))
throw DB::Exception(
DB::ErrorCodes::DATALAKE_DATABASE_ERROR,
"Iceberg update-metadata for {}.{} is missing '{}' field",
namespace_name, table_name, DB::Iceberg::f_current_schema_id);

const Int32 new_schema_id = new_snapshot->getValue<Int32>(DB::Iceberg::f_current_schema_id);
const Int32 old_schema_id = new_schema_id - 1;

Poco::JSON::Object::Ptr new_schema_obj;
auto schemas = new_snapshot->getArray(DB::Iceberg::f_schemas);
for (UInt32 i = 0; i < schemas->size(); ++i)
{
auto s = schemas->getObject(i);
if (s->getValue<Int32>(DB::Iceberg::f_schema_id) == new_schema_id)
{
new_schema_obj = s;
break;
}
}
if (!new_schema_obj)
throw DB::Exception(
DB::ErrorCodes::DATALAKE_DATABASE_ERROR,
"Iceberg update-metadata for {}.{}: no schema object matching current-schema-id={}",
namespace_name, table_name, new_schema_id);

Poco::JSON::Object::Ptr schema_for_rest = cloneJsonObject(new_schema_obj);
if (!schema_for_rest->has("identifier-field-ids"))
{
Poco::JSON::Array::Ptr empty_identifier_field_ids = new Poco::JSON::Array;
schema_for_rest->set("identifier-field-ids", empty_identifier_field_ids);
}

if (old_schema_id >= 0)
{
Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object;
requirement->set("type", "assert-current-schema-id");
requirement->set("current-schema-id", old_schema_id);

Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array;
requirements->add(requirement);
request_body->set("requirements", requirements);
}

Poco::JSON::Array::Ptr updates = new Poco::JSON::Array;
{
Poco::JSON::Object::Ptr add_schema = new Poco::JSON::Object;
add_schema->set("action", "add-schema");
add_schema->set("schema", schema_for_rest);
if (new_snapshot->has(DB::Iceberg::f_last_column_id))
add_schema->set("last-column-id", new_snapshot->getValue<Int32>(DB::Iceberg::f_last_column_id));
updates->add(add_schema);
}
{
Poco::JSON::Object::Ptr set_current_schema = new Poco::JSON::Object;
set_current_schema->set("action", "set-current-schema");
set_current_schema->set("schema-id", new_schema_id);
updates->add(set_current_schema);
}
request_body->set("updates", updates);
}
else
{
// Snapshot-append commit path (INSERT / position-delete mutation).
if (new_snapshot->has("parent-snapshot-id"))
{
auto parent_snapshot_id = new_snapshot->getValue<Int64>("parent-snapshot-id");
if (parent_snapshot_id != -1)
{
Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object;
requirement->set("type", "assert-ref-snapshot-id");
requirement->set("ref", "main");
requirement->set("snapshot-id", parent_snapshot_id);

Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array;
requirements->add(requirement);
request_body->set("requirements", requirements);
}
}

Poco::JSON::Array::Ptr updates = new Poco::JSON::Array;
{
Poco::JSON::Object::Ptr add_snapshot = new Poco::JSON::Object;
add_snapshot->set("action", "add-snapshot");
add_snapshot->set("snapshot", new_snapshot);
updates->add(add_snapshot);
}
{
Poco::JSON::Object::Ptr set_snapshot = new Poco::JSON::Object;
set_snapshot->set("action", "set-snapshot-ref");
set_snapshot->set("ref-name", "main");
set_snapshot->set("type", "branch");
set_snapshot->set("snapshot-id", new_snapshot->getValue<Int64>("snapshot-id"));
updates->add(set_snapshot);
}
request_body->set("updates", updates);
}

return request_body;
}

std::string RestCatalog::Config::toString() const
{
DB::WriteBufferFromOwnString wb;
Expand Down Expand Up @@ -1085,62 +1221,19 @@ bool RestCatalog::updateMetadata(const String & namespace_name, const String & t
{
const std::string endpoint = fmt::format("{}/namespaces/{}/tables/{}", base_url, namespace_name, table_name);

Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object;
{
Poco::JSON::Object::Ptr identifier = new Poco::JSON::Object;
identifier->set("name", table_name);
Poco::JSON::Array::Ptr namespaces = new Poco::JSON::Array;
namespaces->add(namespace_name);
identifier->set("namespace", namespaces);

request_body->set("identifier", identifier);
}

if (new_snapshot->has("parent-snapshot-id"))
{
auto parent_snapshot_id = new_snapshot->getValue<Int64>("parent-snapshot-id");
if (parent_snapshot_id != -1)
{
Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object;
requirement->set("type", "assert-ref-snapshot-id");
requirement->set("ref", "main");
requirement->set("snapshot-id", parent_snapshot_id);

Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array;
requirements->add(requirement);

request_body->set("requirements", requirements);
}
}

{
Poco::JSON::Array::Ptr updates = new Poco::JSON::Array;

{
Poco::JSON::Object::Ptr add_snapshot = new Poco::JSON::Object;
add_snapshot->set("action", "add-snapshot");
add_snapshot->set("snapshot", new_snapshot);
updates->add(add_snapshot);
}

{
Poco::JSON::Object::Ptr set_snapshot = new Poco::JSON::Object;
set_snapshot->set("action", "set-snapshot-ref");
set_snapshot->set("ref-name", "main");
set_snapshot->set("type", "branch");
set_snapshot->set("snapshot-id", new_snapshot->getValue<Int64>("snapshot-id"));

updates->add(set_snapshot);
}
request_body->set("updates", updates);
}
// Throws DB::Exception(DATALAKE_DATABASE_ERROR) on malformed metadata (programming error).
auto request_body = buildUpdateMetadataRequestBody(namespace_name, table_name, new_snapshot);
if (!request_body)
return true; // nothing to commit

try
{
sendRequest(endpoint, request_body);
}
catch (const DB::HTTPException &)
catch (const DB::HTTPException & ex)
{
LOG_WARNING(log, "Iceberg REST updateMetadata for {}.{} failed: {}",
namespace_name, table_name, ex.displayText());
return false;
}
return true;
Expand Down
10 changes: 10 additions & 0 deletions src/Databases/DataLake/RestCatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ struct AccessToken
}
};

/// Builds the JSON body for `POST .../namespaces/{ns}/tables/{table}` (Iceberg REST update).
///
/// Returns `nullptr` when `new_snapshot` is null (nothing to commit). Throws
/// `DB::Exception(DATALAKE_DATABASE_ERROR)` with a specific message when the metadata
/// blob is malformed (e.g. missing `current-schema-id`, no schema object matching it).
Poco::JSON::Object::Ptr buildUpdateMetadataRequestBody(
const String & namespace_name,
const String & table_name,
Poco::JSON::Object::Ptr new_snapshot);

class RestCatalog : public ICatalog, public DB::WithContext
{
public:
Expand Down
Loading
Loading