From d29aec89a83a2ba7aa0385b0118cdc7d1ebf801a Mon Sep 17 00:00:00 2001
From: Kanthi Subramanian
Date: Thu, 14 May 2026 22:07:05 +0200
Subject: [PATCH] Fixed ALTER TABLE operations (add, modify, drop, rename) for
 Iceberg

Signed-off-by: Kanthi Subramanian
---
 src/Databases/DataLake/RestCatalog.cpp        | 193 +++++++++++++-----
 src/Databases/DataLake/RestCatalog.h          |  10 +
 .../gtest_rest_catalog_update_metadata.cpp    | 186 +++++++++++++++++
 .../DataLakes/DataLakeConfiguration.h         |  15 +-
 .../DataLakes/IDataLakeMetadata.h             |   7 +-
 .../DataLakes/Iceberg/IcebergMetadata.cpp     |  13 +-
 .../DataLakes/Iceberg/IcebergMetadata.h       |   7 +-
 .../DataLakes/Iceberg/Mutations.cpp           |  67 ++++--
 .../DataLakes/Iceberg/Mutations.h             |   6 +-
 .../ObjectStorage/StorageObjectStorage.cpp    |  11 +-
 .../StorageObjectStorageConfiguration.h       |   3 +-
 .../test_writes_add_column.py                 |  72 +++++++
 .../test_writes_drop_column.py                |  62 ++++++
 .../test_writes_modify_column.py              |  70 +++++++
 14 files changed, 639 insertions(+), 83 deletions(-)
 create mode 100644 src/Databases/DataLake/tests/gtest_rest_catalog_update_metadata.cpp
 create mode 100644 tests/integration/test_storage_iceberg_no_spark/test_writes_add_column.py
 create mode 100644 tests/integration/test_storage_iceberg_no_spark/test_writes_drop_column.py
 create mode 100644 tests/integration/test_storage_iceberg_no_spark/test_writes_modify_column.py

diff --git a/src/Databases/DataLake/RestCatalog.cpp b/src/Databases/DataLake/RestCatalog.cpp
index d7bf5ff9dd3c..d90f81a3e41f 100644
--- a/src/Databases/DataLake/RestCatalog.cpp
+++ b/src/Databases/DataLake/RestCatalog.cpp
@@ -31,6 +31,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -44,6 +45,8 @@
 #include
 #include
+#include
+
 namespace DB::ErrorCodes
 {
@@ -118,6 +121,139 @@ String encodeNamespaceForURI(const String & namespace_name)
 }
 
+namespace
+{
+Poco::JSON::Object::Ptr cloneJsonObject(const Poco::JSON::Object::Ptr & obj)
+{
+    std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
+    obj->stringify(oss);
+
+    Poco::JSON::Parser parser;
+    return parser.parse(oss.str()).extract<Poco::JSON::Object::Ptr>();
+}
+}
+
+Poco::JSON::Object::Ptr buildUpdateMetadataRequestBody(
+    const String & namespace_name, const String & table_name, Poco::JSON::Object::Ptr new_snapshot)
+{
+    if (!new_snapshot)
+        return nullptr;
+
+    Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object;
+    {
+        Poco::JSON::Object::Ptr identifier = new Poco::JSON::Object;
+        identifier->set("name", table_name);
+        Poco::JSON::Array::Ptr namespaces = new Poco::JSON::Array;
+        namespaces->add(namespace_name);
+        identifier->set("namespace", namespaces);
+
+        request_body->set("identifier", identifier);
+    }
+
+    // Schema-change commit path (ALTER TABLE add/drop/modify/rename column).
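+    //
+    // Illustrative request body produced by this path (values are made up for the example):
+    //
+    //   {
+    //     "identifier": {"name": "tbl", "namespace": ["ns"]},
+    //     "requirements": [{"type": "assert-current-schema-id", "current-schema-id": 0}],
+    //     "updates": [
+    //       {"action": "add-schema", "schema": {...}, "last-column-id": 3},
+    //       {"action": "set-current-schema", "schema-id": 1}
+    //     ]
+    //   }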
+    if (new_snapshot->has(DB::Iceberg::f_schemas))
+    {
+        if (!new_snapshot->has(DB::Iceberg::f_current_schema_id))
+            throw DB::Exception(
+                DB::ErrorCodes::DATALAKE_DATABASE_ERROR,
+                "Iceberg update-metadata for {}.{} is missing '{}' field",
+                namespace_name, table_name, DB::Iceberg::f_current_schema_id);
+
+        const Int32 new_schema_id = new_snapshot->getValue<Int32>(DB::Iceberg::f_current_schema_id);
+        const Int32 old_schema_id = new_schema_id - 1;
+
+        Poco::JSON::Object::Ptr new_schema_obj;
+        auto schemas = new_snapshot->getArray(DB::Iceberg::f_schemas);
+        for (UInt32 i = 0; i < schemas->size(); ++i)
+        {
+            auto s = schemas->getObject(i);
+            if (s->getValue<Int32>(DB::Iceberg::f_schema_id) == new_schema_id)
+            {
+                new_schema_obj = s;
+                break;
+            }
+        }
+        if (!new_schema_obj)
+            throw DB::Exception(
+                DB::ErrorCodes::DATALAKE_DATABASE_ERROR,
+                "Iceberg update-metadata for {}.{}: no schema object matching current-schema-id={}",
+                namespace_name, table_name, new_schema_id);
+
+        Poco::JSON::Object::Ptr schema_for_rest = cloneJsonObject(new_schema_obj);
+        if (!schema_for_rest->has("identifier-field-ids"))
+        {
+            Poco::JSON::Array::Ptr empty_identifier_field_ids = new Poco::JSON::Array;
+            schema_for_rest->set("identifier-field-ids", empty_identifier_field_ids);
+        }
+
+        if (old_schema_id >= 0)
+        {
+            Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object;
+            requirement->set("type", "assert-current-schema-id");
+            requirement->set("current-schema-id", old_schema_id);
+
+            Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array;
+            requirements->add(requirement);
+            request_body->set("requirements", requirements);
+        }
+
+        Poco::JSON::Array::Ptr updates = new Poco::JSON::Array;
+        {
+            Poco::JSON::Object::Ptr add_schema = new Poco::JSON::Object;
+            add_schema->set("action", "add-schema");
+            add_schema->set("schema", schema_for_rest);
+            if (new_snapshot->has(DB::Iceberg::f_last_column_id))
+                add_schema->set("last-column-id", new_snapshot->getValue<Int32>(DB::Iceberg::f_last_column_id));
+            updates->add(add_schema);
+        }
+        {
+            Poco::JSON::Object::Ptr set_current_schema = new Poco::JSON::Object;
+            set_current_schema->set("action", "set-current-schema");
+            set_current_schema->set("schema-id", new_schema_id);
+            updates->add(set_current_schema);
+        }
+        request_body->set("updates", updates);
+    }
+    else
+    {
+        // Snapshot-append commit path (INSERT / position-delete mutation).
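+        //
+        // Illustrative request body produced by this path (values are made up for the example):
+        //
+        //   {
+        //     "identifier": {"name": "tbl", "namespace": ["ns"]},
+        //     "requirements": [{"type": "assert-ref-snapshot-id", "ref": "main", "snapshot-id": 12344}],
+        //     "updates": [
+        //       {"action": "add-snapshot", "snapshot": {...}},
+        //       {"action": "set-snapshot-ref", "ref-name": "main", "type": "branch", "snapshot-id": 12345}
+        //     ]
+        //   }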
+ if (new_snapshot->has("parent-snapshot-id")) + { + auto parent_snapshot_id = new_snapshot->getValue("parent-snapshot-id"); + if (parent_snapshot_id != -1) + { + Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object; + requirement->set("type", "assert-ref-snapshot-id"); + requirement->set("ref", "main"); + requirement->set("snapshot-id", parent_snapshot_id); + + Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array; + requirements->add(requirement); + request_body->set("requirements", requirements); + } + } + + Poco::JSON::Array::Ptr updates = new Poco::JSON::Array; + { + Poco::JSON::Object::Ptr add_snapshot = new Poco::JSON::Object; + add_snapshot->set("action", "add-snapshot"); + add_snapshot->set("snapshot", new_snapshot); + updates->add(add_snapshot); + } + { + Poco::JSON::Object::Ptr set_snapshot = new Poco::JSON::Object; + set_snapshot->set("action", "set-snapshot-ref"); + set_snapshot->set("ref-name", "main"); + set_snapshot->set("type", "branch"); + set_snapshot->set("snapshot-id", new_snapshot->getValue("snapshot-id")); + updates->add(set_snapshot); + } + request_body->set("updates", updates); + } + + return request_body; +} + std::string RestCatalog::Config::toString() const { DB::WriteBufferFromOwnString wb; @@ -1085,62 +1221,19 @@ bool RestCatalog::updateMetadata(const String & namespace_name, const String & t { const std::string endpoint = fmt::format("{}/namespaces/{}/tables/{}", base_url, namespace_name, table_name); - Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object; - { - Poco::JSON::Object::Ptr identifier = new Poco::JSON::Object; - identifier->set("name", table_name); - Poco::JSON::Array::Ptr namespaces = new Poco::JSON::Array; - namespaces->add(namespace_name); - identifier->set("namespace", namespaces); - - request_body->set("identifier", identifier); - } - - if (new_snapshot->has("parent-snapshot-id")) - { - auto parent_snapshot_id = new_snapshot->getValue("parent-snapshot-id"); - if (parent_snapshot_id != -1) - { - Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object; - requirement->set("type", "assert-ref-snapshot-id"); - requirement->set("ref", "main"); - requirement->set("snapshot-id", parent_snapshot_id); - - Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array; - requirements->add(requirement); - - request_body->set("requirements", requirements); - } - } - - { - Poco::JSON::Array::Ptr updates = new Poco::JSON::Array; - - { - Poco::JSON::Object::Ptr add_snapshot = new Poco::JSON::Object; - add_snapshot->set("action", "add-snapshot"); - add_snapshot->set("snapshot", new_snapshot); - updates->add(add_snapshot); - } - - { - Poco::JSON::Object::Ptr set_snapshot = new Poco::JSON::Object; - set_snapshot->set("action", "set-snapshot-ref"); - set_snapshot->set("ref-name", "main"); - set_snapshot->set("type", "branch"); - set_snapshot->set("snapshot-id", new_snapshot->getValue("snapshot-id")); - - updates->add(set_snapshot); - } - request_body->set("updates", updates); - } + // Throws DB::Exception(DATALAKE_DATABASE_ERROR) on malformed metadata (programming error). 
+    auto request_body = buildUpdateMetadataRequestBody(namespace_name, table_name, new_snapshot);
+    if (!request_body)
+        return true; // nothing to commit
 
     try
     {
         sendRequest(endpoint, request_body);
     }
-    catch (const DB::HTTPException &)
+    catch (const DB::HTTPException & ex)
     {
+        LOG_WARNING(log, "Iceberg REST updateMetadata for {}.{} failed: {}",
+            namespace_name, table_name, ex.displayText());
         return false;
     }
     return true;
diff --git a/src/Databases/DataLake/RestCatalog.h b/src/Databases/DataLake/RestCatalog.h
index ff951c961e90..217957bf2406 100644
--- a/src/Databases/DataLake/RestCatalog.h
+++ b/src/Databases/DataLake/RestCatalog.h
@@ -31,6 +31,16 @@ struct AccessToken
     }
 };
 
+/// Builds the JSON body for `POST .../namespaces/{ns}/tables/{table}` (Iceberg REST update).
+///
+/// Returns `nullptr` when `new_snapshot` is null (nothing to commit). Throws
+/// `DB::Exception(DATALAKE_DATABASE_ERROR)` with a specific message when the metadata
+/// blob is malformed (e.g. missing `current-schema-id`, no schema object matching it).
+Poco::JSON::Object::Ptr buildUpdateMetadataRequestBody(
+    const String & namespace_name,
+    const String & table_name,
+    Poco::JSON::Object::Ptr new_snapshot);
+
 class RestCatalog : public ICatalog, public DB::WithContext
 {
 public:
diff --git a/src/Databases/DataLake/tests/gtest_rest_catalog_update_metadata.cpp b/src/Databases/DataLake/tests/gtest_rest_catalog_update_metadata.cpp
new file mode 100644
index 000000000000..09d1e29d288e
--- /dev/null
+++ b/src/Databases/DataLake/tests/gtest_rest_catalog_update_metadata.cpp
@@ -0,0 +1,186 @@
+#include "config.h"
+
+#if USE_AVRO
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+using namespace DB;
+
+namespace
+{
+Poco::JSON::Object::Ptr findUpdateByAction(const Poco::JSON::Array::Ptr & updates, const std::string & action)
+{
+    for (unsigned int i = 0; i < updates->size(); ++i)
+    {
+        auto o = updates->getObject(i);
+        if (o->getValue<std::string>("action") == action)
+            return o;
+    }
+    return nullptr;
+}
+}
+
+TEST(RestCatalogUpdateMetadataBody, NullSnapshotReturnsNull)
+{
+    auto body = DataLake::buildUpdateMetadataRequestBody("ns", "t", nullptr);
+    EXPECT_FALSE(body);
+}
+
+TEST(RestCatalogUpdateMetadataBody, SchemaUpdateValid)
+{
+    Poco::JSON::Object::Ptr snapshot = new Poco::JSON::Object;
+    Poco::JSON::Array::Ptr schemas = new Poco::JSON::Array;
+    Poco::JSON::Object::Ptr schema = new Poco::JSON::Object;
+    schema->set(Iceberg::f_schema_id, 1);
+    schema->set(Iceberg::f_type, "struct");
+    schema->set(Iceberg::f_fields, Poco::JSON::Array::Ptr(new Poco::JSON::Array));
+    schemas->add(schema);
+    snapshot->set(Iceberg::f_schemas, schemas);
+    snapshot->set(Iceberg::f_current_schema_id, 1);
+    snapshot->set(Iceberg::f_last_column_id, 3);
+
+    auto body = DataLake::buildUpdateMetadataRequestBody("my.ns", "tbl", snapshot);
+    ASSERT_TRUE(body);
+
+    auto id = body->getObject("identifier");
+    EXPECT_EQ(id->getValue<std::string>("name"), "tbl");
+    auto ns = id->getArray("namespace");
+    ASSERT_EQ(ns->size(), 1u);
+    EXPECT_EQ(ns->getElement<std::string>(0), "my.ns");
+
+    ASSERT_TRUE(body->has("requirements"));
+    auto req = body->getArray("requirements")->getObject(0);
+    EXPECT_EQ(req->getValue<std::string>("type"), "assert-current-schema-id");
+    EXPECT_EQ(req->getValue<Int32>("current-schema-id"), 0);
+
+    auto updates = body->getArray("updates");
+    auto add_schema = findUpdateByAction(updates, "add-schema");
+    ASSERT_TRUE(add_schema);
+    EXPECT_TRUE(add_schema->has("schema"));
+    EXPECT_EQ(add_schema->getValue<Int32>("last-column-id"), 3);
+
+    auto set_schema = findUpdateByAction(updates, "set-current-schema");
+    ASSERT_TRUE(set_schema);
+    EXPECT_EQ(set_schema->getValue<Int32>("schema-id"), 1);
+}
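+
+// Suggested additional check (sketch): buildUpdateMetadataRequestBody embeds the
+// schema via cloneJsonObject(), i.e. a deep copy, so mutating the caller's
+// snapshot after the call must not leak into an already-built request body.
+TEST(RestCatalogUpdateMetadataBody, SchemaUpdateSchemaIsDeepCopied)
+{
+    Poco::JSON::Object::Ptr snapshot = new Poco::JSON::Object;
+    Poco::JSON::Array::Ptr schemas = new Poco::JSON::Array;
+    Poco::JSON::Object::Ptr schema = new Poco::JSON::Object;
+    schema->set(Iceberg::f_schema_id, 1);
+    schema->set(Iceberg::f_type, "struct");
+    schema->set(Iceberg::f_fields, Poco::JSON::Array::Ptr(new Poco::JSON::Array));
+    schemas->add(schema);
+    snapshot->set(Iceberg::f_schemas, schemas);
+    snapshot->set(Iceberg::f_current_schema_id, 1);
+
+    auto body = DataLake::buildUpdateMetadataRequestBody("ns", "t", snapshot);
+    ASSERT_TRUE(body);
+
+    // Mutate the source schema after the body was built.
+    schema->set(Iceberg::f_type, "mutated");
+
+    auto add_schema = findUpdateByAction(body->getArray("updates"), "add-schema");
+    ASSERT_TRUE(add_schema);
+    EXPECT_EQ(add_schema->getObject("schema")->getValue<std::string>(Iceberg::f_type), "struct");
+}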
findUpdateByAction(updates, "set-current-schema"); + ASSERT_TRUE(set_schema); + EXPECT_EQ(set_schema->getValue("schema-id"), 1); +} + +TEST(RestCatalogUpdateMetadataBody, SchemaUpdateCurrentIdZeroNoRequirement) +{ + Poco::JSON::Object::Ptr snapshot = new Poco::JSON::Object; + Poco::JSON::Array::Ptr schemas = new Poco::JSON::Array; + Poco::JSON::Object::Ptr schema = new Poco::JSON::Object; + schema->set(Iceberg::f_schema_id, 0); + schema->set(Iceberg::f_type, "struct"); + schema->set(Iceberg::f_fields, Poco::JSON::Array::Ptr(new Poco::JSON::Array)); + schemas->add(schema); + snapshot->set(Iceberg::f_schemas, schemas); + snapshot->set(Iceberg::f_current_schema_id, 0); + + auto body = DataLake::buildUpdateMetadataRequestBody("ns", "t", snapshot); + ASSERT_TRUE(body); + EXPECT_FALSE(body->has("requirements")); +} + +TEST(RestCatalogUpdateMetadataBody, SchemaUpdateBodyIsStringifiable) +{ + Poco::JSON::Object::Ptr snapshot = new Poco::JSON::Object; + Poco::JSON::Array::Ptr schemas = new Poco::JSON::Array; + Poco::JSON::Object::Ptr schema = new Poco::JSON::Object; + schema->set(Iceberg::f_schema_id, 1); + schema->set(Iceberg::f_type, "struct"); + schema->set(Iceberg::f_fields, Poco::JSON::Array::Ptr(new Poco::JSON::Array)); + schemas->add(schema); + snapshot->set(Iceberg::f_schemas, schemas); + snapshot->set(Iceberg::f_current_schema_id, 1); + + auto body = DataLake::buildUpdateMetadataRequestBody("ns", "t", snapshot); + ASSERT_TRUE(body); + + std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + ASSERT_NO_THROW(body->stringify(oss)); + EXPECT_NE(oss.str().find("\"identifier-field-ids\""), std::string::npos); + EXPECT_NE(oss.str().find("\"add-schema\""), std::string::npos); +} + +TEST(RestCatalogUpdateMetadataBody, SchemaUpdateMissingCurrentSchemaIdThrows) +{ + Poco::JSON::Object::Ptr snapshot = new Poco::JSON::Object; + snapshot->set(Iceberg::f_schemas, Poco::JSON::Array::Ptr(new Poco::JSON::Array)); + + EXPECT_THROW(DataLake::buildUpdateMetadataRequestBody("ns", "t", snapshot), DB::Exception); +} + +TEST(RestCatalogUpdateMetadataBody, SchemaUpdateNoMatchingSchemaIdThrows) +{ + Poco::JSON::Object::Ptr snapshot = new Poco::JSON::Object; + Poco::JSON::Array::Ptr schemas = new Poco::JSON::Array; + Poco::JSON::Object::Ptr schema = new Poco::JSON::Object; + schema->set(Iceberg::f_schema_id, 1); + schema->set(Iceberg::f_type, "struct"); + schema->set(Iceberg::f_fields, Poco::JSON::Array::Ptr(new Poco::JSON::Array)); + schemas->add(schema); + snapshot->set(Iceberg::f_schemas, schemas); + snapshot->set(Iceberg::f_current_schema_id, 99); + + EXPECT_THROW(DataLake::buildUpdateMetadataRequestBody("ns", "t", snapshot), DB::Exception); +} + +TEST(RestCatalogUpdateMetadataBody, SnapshotUpdateWithParent) +{ + Poco::JSON::Object::Ptr snapshot = new Poco::JSON::Object; + snapshot->set("snapshot-id", static_cast(12345)); + snapshot->set("parent-snapshot-id", static_cast(12344)); + snapshot->set(Iceberg::f_timestamp_ms, static_cast(1700000000000LL)); + + auto body = DataLake::buildUpdateMetadataRequestBody("ns", "t", snapshot); + ASSERT_TRUE(body); + + ASSERT_TRUE(body->has("requirements")); + auto req = body->getArray("requirements")->getObject(0); + EXPECT_EQ(req->getValue("type"), "assert-ref-snapshot-id"); + EXPECT_EQ(req->getValue("ref"), "main"); + EXPECT_EQ(req->getValue("snapshot-id"), 12344); + + auto updates = body->getArray("updates"); + auto add_snap = findUpdateByAction(updates, "add-snapshot"); + ASSERT_TRUE(add_snap); + EXPECT_EQ(add_snap->getObject("snapshot")->getValue("snapshot-id"), 
+
+TEST(RestCatalogUpdateMetadataBody, SnapshotUpdateWithoutParent)
+{
+    Poco::JSON::Object::Ptr snapshot = new Poco::JSON::Object;
+    snapshot->set("snapshot-id", static_cast<Int64>(999));
+
+    auto body = DataLake::buildUpdateMetadataRequestBody("ns", "t", snapshot);
+    ASSERT_TRUE(body);
+    EXPECT_FALSE(body->has("requirements"));
+
+    auto updates = body->getArray("updates");
+    ASSERT_TRUE(findUpdateByAction(updates, "add-snapshot"));
+    ASSERT_TRUE(findUpdateByAction(updates, "set-snapshot-ref"));
+}
+
+TEST(RestCatalogUpdateMetadataBody, SnapshotUpdateParentMinusOneNoRequirement)
+{
+    Poco::JSON::Object::Ptr snapshot = new Poco::JSON::Object;
+    snapshot->set("snapshot-id", static_cast<Int64>(1));
+    snapshot->set("parent-snapshot-id", static_cast<Int64>(-1));
+
+    auto body = DataLake::buildUpdateMetadataRequestBody("ns", "t", snapshot);
+    ASSERT_TRUE(body);
+    EXPECT_FALSE(body->has("requirements"));
+}
+
+#endif
diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
index 7ae6fdbd682b..d3f4d44e78b2 100644
--- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
+++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
@@ -176,15 +176,15 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enable_shared_from_this<DataLakeConfiguration>
 
     void checkAlterIsPossible(const AlterCommands & commands) override
     {
-        assertInitializedDL();
-        current_metadata->checkAlterIsPossible(commands);
+        if (current_metadata)
+            current_metadata->checkAlterIsPossible(commands);
     }
 
-    void alter(const AlterCommands & params, ContextPtr context) override
+    void alter(const AlterCommands & params, ContextPtr context,
+        const StorageID & storage_id, std::shared_ptr<DataLake::ICatalog> catalog) override
     {
         assertInitializedDL();
-        current_metadata->alter(params, context);
-
+        current_metadata->alter(params, shared_from_this(), context, storage_id, catalog);
     }
 
     ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly, StorageObjectStorageConfiguration::CredentialsConfigurationCallback refresh_credentials_callback) override
@@ -478,7 +478,6 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enable_shared_from_this<DataLakeConfiguration>
 
     void assertInitializedDL() const
     {
-        BaseStorageConfiguration::assertInitialized();
         if (!current_metadata)
             throw Exception(ErrorCodes::LOGICAL_ERROR, "Metadata is not initialized");
     }
@@ -708,7 +707,9 @@ class StorageIcebergConfiguration : public StorageObjectStorageConfiguration, public std::enable_shared_from_this<StorageIcebergConfiguration>
 
     void checkAlterIsPossible(const AlterCommands & commands) override { getImpl().checkAlterIsPossible(commands); }
 
-    void alter(const AlterCommands & params, ContextPtr context) override { getImpl().alter(params, context); }
+    void alter(const AlterCommands & params, ContextPtr context,
+        const StorageID & storage_id, std::shared_ptr<DataLake::ICatalog> catalog) override
+    { getImpl().alter(params, context, storage_id, catalog); }
 
     const DataLakeStorageSettings & getDataLakeSettings() const override { return getImpl().getDataLakeSettings(); }
 
diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
index d7423dd115d6..7deb3fc6ba6c 100644
--- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
+++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
@@ -255,7 +255,12 @@ class IDataLakeMetadata : boost::noncopyable
     virtual void addDeleteTransformers(ObjectInfoPtr, QueryPipelineBuilder &, const std::optional<FormatSettings> &, FormatParserSharedResourcesPtr, ContextPtr) const { }
 
     virtual void checkAlterIsPossible(const AlterCommands & /*commands*/) { throwNotImplemented("alter"); }
-    virtual void alter(const AlterCommands & /*params*/, ContextPtr /*context*/) { throwNotImplemented("alter"); }
+    virtual void alter(
+        const AlterCommands & /*params*/,
+        StorageObjectStorageConfigurationPtr /*configuration*/,
+        ContextPtr /*context*/,
+        const StorageID & /*storage_id*/,
+        std::shared_ptr<DataLake::ICatalog> /*catalog*/) { throwNotImplemented("alter"); }
 
     virtual Pipe executeCommand(
         const String & command_name,
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
index f6c7d4e86999..b6d028c515ea 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
@@ -685,7 +685,12 @@ void IcebergMetadata::checkAlterIsPossible(const AlterCommands & commands)
     }
 }
 
-void IcebergMetadata::alter(const AlterCommands & params, ContextPtr context)
+void IcebergMetadata::alter(
+    const AlterCommands & params,
+    StorageObjectStorageConfigurationPtr configuration,
+    ContextPtr context,
+    const StorageID & storage_id,
+    std::shared_ptr<DataLake::ICatalog> catalog)
 {
     if (!context->getSettingsRef()[Setting::allow_insert_into_iceberg].value)
     {
@@ -695,7 +700,11 @@ void IcebergMetadata::alter(const AlterCommands & params, ContextPtr context)
             "To allow its usage, enable setting allow_insert_into_iceberg");
     }
 
-    Iceberg::alter(params, context, object_storage, data_lake_settings, persistent_components, write_format);
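+    // The trailing type/namespace arguments let Mutations::alter build the fully
+    // qualified metadata location for the catalog commit, e.g. (illustrative values)
+    // type "s3" plus namespace "bucket" yield "s3://bucket/<metadata file>".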
+    Iceberg::alter(
+        params, context, object_storage, data_lake_settings, persistent_components, write_format,
+        storage_id, catalog,
+        configuration ? configuration->getTypeName() : "",
+        configuration ? configuration->getNamespace() : "");
 }
 
 static Pipe expireSnapshotsResultToPipe(const Iceberg::ExpireSnapshotsResult & result)
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h
index e8a8e9a0c6ac..590fef74a1bc 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h
@@ -162,7 +162,12 @@ class IcebergMetadata : public IDataLakeMetadata
     void modifyFormatSettings(FormatSettings & format_settings, const Context & local_context) const override;
     void addDeleteTransformers(ObjectInfoPtr object_info, QueryPipelineBuilder & builder, const std::optional<FormatSettings> & format_settings, FormatParserSharedResourcesPtr parser_shared_resources, ContextPtr local_context) const override;
     void checkAlterIsPossible(const AlterCommands & commands) override;
-    void alter(const AlterCommands & params, ContextPtr context) override;
+    void alter(
+        const AlterCommands & params,
+        StorageObjectStorageConfigurationPtr configuration,
+        ContextPtr context,
+        const StorageID & storage_id,
+        std::shared_ptr<DataLake::ICatalog> catalog) override;
 
     Pipe executeCommand(
         const String & command_name,
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp
index e9ee547b1b44..807b4b2811ec 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp
@@ -39,6 +39,7 @@ namespace DB::ErrorCodes
 {
 extern const int BAD_ARGUMENTS;
+extern const int DATALAKE_DATABASE_ERROR;
 extern const int LOGICAL_ERROR;
 extern const int LIMIT_EXCEEDED;
 }
@@ -714,13 +715,16 @@ void alter(
     ObjectStoragePtr object_storage,
     const DataLakeStorageSettings & data_lake_settings,
     PersistentTableComponents & persistent_table_components,
-    const String & write_format)
+    const String & write_format,
+    StorageID storage_id,
+    std::shared_ptr<DataLake::ICatalog> catalog,
+    const String & blob_storage_type_name,
+    const String & blob_storage_namespace_name)
 {
     if (params.size() != 1)
-        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Params with size 1 is not supported");
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Iceberg alter supports exactly one command at a time, got {}", params.size());
 
-    size_t i = 0;
-    while (i++ < MAX_TRANSACTION_RETRIES)
+    for (size_t i = 0; i < MAX_TRANSACTION_RETRIES; ++i)
     {
         FileNamesGenerator filename_generator(
             persistent_table_components.table_path, persistent_table_components.table_path, false, CompressionMethod::None, write_format);
@@ -769,21 +773,52 @@ void alter(
 
         auto [metadata_name, storage_metadata_name] = filename_generator.generateMetadataName();
 
+        LOG_INFO(log, "Iceberg alter: writing metadata to '{}', latest version was {}", storage_metadata_name, last_version);
+
         auto hint = filename_generator.generateVersionHint();
-        if (writeMetadataFileAndVersionHint(
-            storage_metadata_name,
-            json_representation,
-            hint.path_in_storage,
-            storage_metadata_name,
-            object_storage,
-            context,
-            compression_method,
-            data_lake_settings[DataLakeStorageSetting::iceberg_use_version_hint]))
-            break;
+        const bool wrote_ok = writeMetadataFileAndVersionHint(
+            storage_metadata_name,
+            json_representation,
+            hint.path_in_storage,
+            storage_metadata_name,
+            object_storage,
+            context,
+            compression_method,
+            data_lake_settings[DataLakeStorageSetting::iceberg_use_version_hint]);
+
+        const bool file_exists = wrote_ok || object_storage->exists(StoredObject(storage_metadata_name));
+
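+        // Commit outcomes:
+        //   wrote_ok, no catalog                      -> done
+        //   wrote_ok, catalog accepts                 -> done
+        //   wrote_ok, catalog rejects                 -> throw (the file is already durable)
+        //   !wrote_ok, file absent                    -> retry with a fresh version
+        //   !wrote_ok, file present, catalog accepts  -> adopt the existing file
+        //   !wrote_ok, file present, catalog rejects  -> retry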
+        if (!wrote_ok)
+        {
+            LOG_WARNING(log, "Iceberg alter: failed to write metadata to '{}' (attempt {}, file exists: {})",
+                storage_metadata_name, i + 1, file_exists);
+            if (!file_exists)
+                continue;
+        }
+
+        if (catalog)
+        {
+            String catalog_filename = metadata_name;
+            if (!catalog_filename.starts_with(blob_storage_type_name))
+                catalog_filename = blob_storage_type_name + "://" + blob_storage_namespace_name + "/" + metadata_name;
+
+            const auto & [namespace_name, table_name] = DataLake::parseTableName(storage_id.getTableName());
+            if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, metadata))
+            {
+                if (wrote_ok)
+                    throw Exception(
+                        ErrorCodes::DATALAKE_DATABASE_ERROR,
+                        "Iceberg alter: catalog commit failed for '{}' after metadata file was written successfully",
+                        catalog_filename);
+                continue;
+            }
+            if (!wrote_ok)
+                LOG_INFO(log, "Iceberg alter: adopted existing metadata file '{}' and updated catalog", storage_metadata_name);
+        }
+        return;
     }
 
-    if (i == MAX_TRANSACTION_RETRIES)
-        throw Exception(ErrorCodes::LIMIT_EXCEEDED, "Too many unsuccessed retries to alter iceberg table");
+    throw Exception(ErrorCodes::LIMIT_EXCEEDED, "Too many unsuccessful retries to alter iceberg table");
 }
 
 /// Table-level snapshot retention policy read from Iceberg table properties.
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h
index f46ec2823509..8b0401565397 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h
@@ -41,7 +41,11 @@ void alter(
     ObjectStoragePtr object_storage,
     const DataLakeStorageSettings & data_lake_settings,
     PersistentTableComponents & persistent_table_components,
-    const String & write_format);
+    const String & write_format,
+    StorageID storage_id,
+    std::shared_ptr<DataLake::ICatalog> catalog,
+    const String & blob_storage_type_name,
+    const String & blob_storage_namespace_name);
 
 ExpireSnapshotsResult expireSnapshots(
     const ExpireSnapshotsOptions & options,
diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp
index 18caf93c588f..bec8d29da2f1 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp
@@ -910,14 +910,17 @@ Pipe StorageObjectStorage::executeCommand(const String & command_name, const AST
 
 void StorageObjectStorage::alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & /*alter_lock_holder*/)
 {
+    configuration->update(object_storage, context);
+
     StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
     params.apply(new_metadata, context);
 
-    configuration->alter(params, context);
+    configuration->alter(params, context, getStorageID(), catalog);
+
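+    // For tables attached through a datalake catalog database, the schema change was
+    // already committed through the catalog (configuration->alter() above), so the
+    // local database DDL rewrite is skipped.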
+    auto database = DatabaseCatalog::instance().getDatabase(storage_id.database_name);
+    if (!database->isDatalakeCatalog())
+        database->alterTable(context, storage_id, new_metadata, /*validate_new_create_query=*/true);
 
-    DatabaseCatalog::instance()
-        .getDatabase(storage_id.database_name)
-        ->alterTable(context, storage_id, new_metadata, /*validate_new_create_query=*/true);
     setInMemoryMetadata(new_metadata);
 }
 
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h
index 91cd56571906..308d90934bfd 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h
@@ -258,7 +258,8 @@ class StorageObjectStorageConfiguration
         }
     }
 
-    virtual void alter(const AlterCommands & /*params*/, ContextPtr /*context*/) {}
+    virtual void alter(const AlterCommands & /*params*/, ContextPtr /*context*/,
+        const StorageID & /*storage_id*/, std::shared_ptr<DataLake::ICatalog> /*catalog*/) {}
 
     virtual const DataLakeStorageSettings & getDataLakeSettings() const
     {
diff --git a/tests/integration/test_storage_iceberg_no_spark/test_writes_add_column.py b/tests/integration/test_storage_iceberg_no_spark/test_writes_add_column.py
new file mode 100644
index 000000000000..630cafdf3a06
--- /dev/null
+++ b/tests/integration/test_storage_iceberg_no_spark/test_writes_add_column.py
@@ -0,0 +1,72 @@
+import pytest
+
+from helpers.iceberg_utils import (
+    create_iceberg_table,
+    get_uuid_str,
+)
+
+INSERT_SETTINGS = {"allow_insert_into_iceberg": 1}
+
+
+@pytest.mark.parametrize("format_version", [1, 2])
+@pytest.mark.parametrize("storage_type", ["local", "s3"])
+def test_add_column_basic(started_cluster_iceberg_no_spark, format_version, storage_type):
+    """ADD COLUMN (nullable): existing rows read with NULL in the new column; new inserts can set it."""
+    instance = started_cluster_iceberg_no_spark.instances["node1"]
+    TABLE_NAME = "test_add_column_basic_" + storage_type + "_" + get_uuid_str()
+
+    create_iceberg_table(
+        storage_type,
+        instance,
+        TABLE_NAME,
+        started_cluster_iceberg_no_spark,
+        "(id Int32, value Nullable(String))",
+        format_version,
+    )
+
+    instance.query(f"INSERT INTO {TABLE_NAME} VALUES (1, 'hello'), (2, 'world');", settings=INSERT_SETTINGS)
+    assert instance.query(f"SELECT id, value FROM {TABLE_NAME} ORDER BY id") == "1\thello\n2\tworld\n"
+
+    instance.query(f"ALTER TABLE {TABLE_NAME} ADD COLUMN extra Nullable(Int32);", settings=INSERT_SETTINGS)
+
+    assert instance.query(f"SELECT id, value, extra FROM {TABLE_NAME} ORDER BY id") == (
+        "1\thello\t\\N\n2\tworld\t\\N\n"
+    )
+
+    instance.query(f"INSERT INTO {TABLE_NAME} VALUES (3, 'foo', 7);", settings=INSERT_SETTINGS)
+    assert instance.query(f"SELECT id, value, extra FROM {TABLE_NAME} ORDER BY id") == (
+        "1\thello\t\\N\n2\tworld\t\\N\n3\tfoo\t7\n"
+    )
+
+
+@pytest.mark.parametrize("format_version", [1, 2])
+@pytest.mark.parametrize("storage_type", ["local", "s3"])
+def test_add_column_errors(started_cluster_iceberg_no_spark, format_version, storage_type):
+    """Non-nullable ADD COLUMN and duplicate name must fail; schema unchanged."""
+    instance = started_cluster_iceberg_no_spark.instances["node1"]
+    TABLE_NAME = "test_add_column_errors_" + storage_type + "_" + get_uuid_str()
+
+    create_iceberg_table(
+        storage_type,
+        instance,
+        TABLE_NAME,
+        started_cluster_iceberg_no_spark,
+        "(id Int32, value Nullable(String))",
+        format_version,
+    )
+
+    error = instance.query_and_get_error(
+        f"ALTER TABLE {TABLE_NAME} ADD COLUMN bad Int32;",
+        settings=INSERT_SETTINGS,
+    )
+    assert "non-nullable" in error.lower() or "doesn't allow" in error.lower()
+
+    error = instance.query_and_get_error(
+        f"ALTER TABLE {TABLE_NAME} ADD COLUMN value Nullable(Int32);",
+        settings=INSERT_SETTINGS,
+    )
+    assert "DUPLICATE_COLUMN" in error or "already exists" in error
+
+    assert instance.query(
+        f"SELECT name FROM system.columns WHERE database = currentDatabase() AND table = '{TABLE_NAME}' ORDER BY name"
+    ) == "id\nvalue\n"
diff --git a/tests/integration/test_storage_iceberg_no_spark/test_writes_drop_column.py b/tests/integration/test_storage_iceberg_no_spark/test_writes_drop_column.py
new file mode 100644
index 000000000000..f29f8903e2c6
--- /dev/null
+++ b/tests/integration/test_storage_iceberg_no_spark/test_writes_drop_column.py
@@ -0,0 +1,62 @@
+import pytest
+
+from helpers.iceberg_utils import (
+    create_iceberg_table,
+    get_uuid_str,
+)
+
+INSERT_SETTINGS = {"allow_insert_into_iceberg": 1}
+
+
+@pytest.mark.parametrize("format_version", [1, 2])
+@pytest.mark.parametrize("storage_type", ["local", "s3"])
+def test_drop_column_basic(started_cluster_iceberg_no_spark, format_version, storage_type):
+    """DROP COLUMN removes the column from reads and inserts; remaining columns unchanged."""
+    instance = started_cluster_iceberg_no_spark.instances["node1"]
+    TABLE_NAME = "test_drop_column_basic_" + storage_type + "_" + get_uuid_str()
+
+    create_iceberg_table(
+        storage_type,
+        instance,
+        TABLE_NAME,
+        started_cluster_iceberg_no_spark,
+        "(id Int32, value Nullable(String))",
+        format_version,
+    )
+
+    instance.query(f"INSERT INTO {TABLE_NAME} VALUES (1, 'hello'), (2, 'world');", settings=INSERT_SETTINGS)
+    assert instance.query(f"SELECT id, value FROM {TABLE_NAME} ORDER BY id") == "1\thello\n2\tworld\n"
+
+    instance.query(f"ALTER TABLE {TABLE_NAME} DROP COLUMN value;", settings=INSERT_SETTINGS)
+
+    assert instance.query(f"SELECT id FROM {TABLE_NAME} ORDER BY id") == "1\n2\n"
+
+    instance.query(f"INSERT INTO {TABLE_NAME} VALUES (3);", settings=INSERT_SETTINGS)
+    assert instance.query(f"SELECT id FROM {TABLE_NAME} ORDER BY id") == "1\n2\n3\n"
+
+
+@pytest.mark.parametrize("format_version", [1, 2])
+@pytest.mark.parametrize("storage_type", ["local", "s3"])
+def test_drop_column_errors(started_cluster_iceberg_no_spark, format_version, storage_type):
+    """Dropping a non-existent column must fail; table structure unchanged."""
+    instance = started_cluster_iceberg_no_spark.instances["node1"]
+    TABLE_NAME = "test_drop_column_errors_" + storage_type + "_" + get_uuid_str()
+
+    create_iceberg_table(
+        storage_type,
+        instance,
+        TABLE_NAME,
+        started_cluster_iceberg_no_spark,
+        "(id Int32, value Nullable(String))",
+        format_version,
+    )
+
+    error = instance.query_and_get_error(
+        f"ALTER TABLE {TABLE_NAME} DROP COLUMN nonexistent;",
+        settings=INSERT_SETTINGS,
+    )
+    assert "nonexistent" in error
+
+    assert instance.query(
+        f"SELECT name FROM system.columns WHERE database = currentDatabase() AND table = '{TABLE_NAME}' ORDER BY name"
+    ) == "id\nvalue\n"
diff --git a/tests/integration/test_storage_iceberg_no_spark/test_writes_modify_column.py b/tests/integration/test_storage_iceberg_no_spark/test_writes_modify_column.py
new file mode 100644
index 000000000000..21e871b359ef
--- /dev/null
+++ b/tests/integration/test_storage_iceberg_no_spark/test_writes_modify_column.py
@@ -0,0 +1,70 @@
+import pytest
+
+from helpers.iceberg_utils import (
+    create_iceberg_table,
+    get_uuid_str,
+)
+
+INSERT_SETTINGS = {"allow_insert_into_iceberg": 1}
+
+
+@pytest.mark.parametrize("format_version", [1, 2])
+@pytest.mark.parametrize("storage_type", ["local", "s3"])
+def test_modify_column_basic(started_cluster_iceberg_no_spark, format_version, storage_type):
+    """Widen Int32 to Int64 (Iceberg int→long); existing and new rows read correctly."""
+    instance = started_cluster_iceberg_no_spark.instances["node1"]
+    TABLE_NAME = "test_modify_column_basic_" + storage_type + "_" + get_uuid_str()
+
+    create_iceberg_table(
+        storage_type,
+        instance,
+        TABLE_NAME,
+        started_cluster_iceberg_no_spark,
+        "(id Int32, value Nullable(String))",
+        format_version,
+    )
+
+    instance.query(f"INSERT INTO {TABLE_NAME} VALUES (1, 'hello'), (2, 'world');", settings=INSERT_SETTINGS)
+    assert instance.query(f"SELECT id, value FROM {TABLE_NAME} ORDER BY id") == "1\thello\n2\tworld\n"
+
+    instance.query(f"ALTER TABLE {TABLE_NAME} MODIFY COLUMN id Int64;", settings=INSERT_SETTINGS)
+
+    assert instance.query(f"SELECT id, value FROM {TABLE_NAME} ORDER BY id") == "1\thello\n2\tworld\n"
+
+    instance.query(f"INSERT INTO {TABLE_NAME} VALUES (3, 'foo');", settings=INSERT_SETTINGS)
+    assert instance.query(f"SELECT id, value FROM {TABLE_NAME} ORDER BY id") == "1\thello\n2\tworld\n3\tfoo\n"
+
+
+@pytest.mark.parametrize("format_version", [1, 2])
+@pytest.mark.parametrize("storage_type", ["local", "s3"])
+def test_modify_column_errors(started_cluster_iceberg_no_spark, format_version, storage_type):
+    """Invalid schema evolution (e.g. String→Int64) must fail; columns unchanged."""
+    instance = started_cluster_iceberg_no_spark.instances["node1"]
+    TABLE_NAME = "test_modify_column_errors_" + storage_type + "_" + get_uuid_str()
+
+    create_iceberg_table(
+        storage_type,
+        instance,
+        TABLE_NAME,
+        started_cluster_iceberg_no_spark,
+        "(id Int32, value Nullable(String))",
+        format_version,
+    )
+
+    error = instance.query_and_get_error(
+        f"ALTER TABLE {TABLE_NAME} MODIFY COLUMN value Int64;",
+        settings=INSERT_SETTINGS,
+    )
+    el = error.lower()
+    # String→integer: mismatched Poco::Var kinds in checkValidSchemaEvolution → BadCastException
+    assert (
+        "bad cast" in el
+        or "can not convert" in el
+        or "cannot convert" in el
+        or "schema evolution" in el
+        or "doesn't allow" in el
+    )
+
+    assert instance.query(
+        f"SELECT name FROM system.columns WHERE database = currentDatabase() AND table = '{TABLE_NAME}' ORDER BY name"
+    ) == "id\nvalue\n"
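
A possible follow-up test for RENAME COLUMN (sketch, not part of this patch): the
subject lists rename among the fixed operations, but no integration test above
exercises it. Assuming ALTER TABLE ... RENAME COLUMN goes through the same
schema-evolution path, a test in the style of the others could look like this
(file name and helper usage mirror the existing tests; assertions are hypothetical):

import pytest

from helpers.iceberg_utils import (
    create_iceberg_table,
    get_uuid_str,
)

INSERT_SETTINGS = {"allow_insert_into_iceberg": 1}


@pytest.mark.parametrize("format_version", [1, 2])
@pytest.mark.parametrize("storage_type", ["local", "s3"])
def test_rename_column_basic(started_cluster_iceberg_no_spark, format_version, storage_type):
    """RENAME COLUMN keeps existing data readable under the new name."""
    instance = started_cluster_iceberg_no_spark.instances["node1"]
    TABLE_NAME = "test_rename_column_basic_" + storage_type + "_" + get_uuid_str()

    create_iceberg_table(
        storage_type,
        instance,
        TABLE_NAME,
        started_cluster_iceberg_no_spark,
        "(id Int32, value Nullable(String))",
        format_version,
    )

    instance.query(f"INSERT INTO {TABLE_NAME} VALUES (1, 'hello');", settings=INSERT_SETTINGS)

    instance.query(f"ALTER TABLE {TABLE_NAME} RENAME COLUMN value TO renamed_value;", settings=INSERT_SETTINGS)

    assert instance.query(f"SELECT id, renamed_value FROM {TABLE_NAME} ORDER BY id") == "1\thello\n"

    instance.query(f"INSERT INTO {TABLE_NAME} VALUES (2, 'world');", settings=INSERT_SETTINGS)
    assert instance.query(f"SELECT id, renamed_value FROM {TABLE_NAME} ORDER BY id") == "1\thello\n2\tworld\n"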