From 4a1977dda2394ff603daa78a3b17e033bb03314d Mon Sep 17 00:00:00 2001 From: Anupam Yadav Date: Tue, 21 Apr 2026 20:54:36 +0000 Subject: [PATCH] Kafka Connect: Handle AccessDeniedException in auto-create (#13758) GlueCatalog.createNamespace() did not catch AccessDeniedException from the AWS Glue SDK. When a user lacks glue:CreateDatabase permission, Glue throws AccessDeniedException (HTTP 400) which propagated uncaught, crashing the Kafka Connect connector even when the database already exists. This fix: - Catches AccessDeniedException in GlueCatalog.createNamespace() and wraps it as ForbiddenException, consistent with how GlueTableOperations already handles this exception. - Adds NotAuthorizedException to the catch block in IcebergWriterFactory.createNamespaceIfNotExist() for defense-in-depth against auth exceptions from any catalog implementation. Closes #13758 --- .../apache/iceberg/aws/glue/GlueCatalog.java | 7 +++++ .../iceberg/aws/glue/TestGlueCatalog.java | 14 +++++++++ .../connect/data/IcebergWriterFactory.java | 3 +- .../data/TestIcebergWriterFactory.java | 29 +++++++++++++++++++ 4 files changed, 52 insertions(+), 1 deletion(-) diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java index 47807a2b9f37..6518236e5b93 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java @@ -41,6 +41,7 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; @@ -61,6 +62,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.AccessDeniedException; import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest; import software.amazon.awssdk.services.glue.model.CreateTableRequest; import software.amazon.awssdk.services.glue.model.Database; @@ -459,6 +461,11 @@ public void createNamespace(Namespace namespace, Map metadata) { } catch (software.amazon.awssdk.services.glue.model.AlreadyExistsException e) { throw new AlreadyExistsException( "Cannot create namespace %s because it already exists in Glue", namespace); + } catch (AccessDeniedException e) { + throw new ForbiddenException( + e, + "Cannot create namespace %s because Glue cannot access the requested resources", + namespace); } } diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalog.java b/aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalog.java index 2042948eb3c9..02f89e081fd4 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalog.java +++ b/aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalog.java @@ -44,6 +44,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.AccessDeniedException; import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest; import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse; import software.amazon.awssdk.services.glue.model.CreateTableRequest; @@ -427,6 +428,19 @@ public void testCreateNamespaceBadName() { } } + @Test + public void testCreateNamespaceAccessDenied() { + Mockito.doThrow( + AccessDeniedException.builder() + .message("User is not authorized to perform: glue:CreateDatabase") + .build()) + .when(glue) + .createDatabase(Mockito.any(CreateDatabaseRequest.class)); + assertThatThrownBy(() -> glueCatalog.createNamespace(Namespace.of("db"))) + .isInstanceOf(org.apache.iceberg.exceptions.ForbiddenException.class) + .hasMessageContaining("cannot access the requested resources"); + } + @Test public void testListAllNamespaces() { Mockito.doReturn( diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java index afb68f170136..c05d7079cfb7 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java @@ -33,6 +33,7 @@ import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.NotAuthorizedException; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types.StructType; @@ -139,7 +140,7 @@ static void createNamespaceIfNotExist(Catalog catalog, Namespace identifierNames Namespace namespace = Namespace.of(Arrays.copyOfRange(levels, 0, index + 1)); try { ((SupportsNamespaces) catalog).createNamespace(namespace); - } catch (AlreadyExistsException | ForbiddenException ex) { + } catch (AlreadyExistsException | ForbiddenException | NotAuthorizedException ex) { // Ignoring the error as forcefully creating the namespace even if it exists // to avoid double namespaceExists() check. } diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestIcebergWriterFactory.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestIcebergWriterFactory.java index 2b18d4b24a1f..0ace3ccc9119 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestIcebergWriterFactory.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestIcebergWriterFactory.java @@ -19,7 +19,9 @@ package org.apache.iceberg.connect.data; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -36,12 +38,15 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.connect.IcebergSinkConfig; import org.apache.iceberg.connect.TableSinkConfig; +import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.NotAuthorizedException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Types.LongType; import org.apache.iceberg.types.Types.StringType; import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; @@ -96,4 +101,28 @@ public void testAutoCreateTable(boolean partitioned) { assertThat(capturedArguments.get(1)).isEqualTo(Namespace.of("foo1", "foo2")); assertThat(capturedArguments.get(2)).isEqualTo(Namespace.of("foo1", "foo2", "foo3")); } + + @Test + public void testCreateNamespaceHandlesForbiddenException() { + Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class)); + doThrow(new ForbiddenException("access denied")) + .when((SupportsNamespaces) catalog) + .createNamespace(any()); + + assertThatNoException() + .isThrownBy( + () -> IcebergWriterFactory.createNamespaceIfNotExist(catalog, Namespace.of("db"))); + } + + @Test + public void testCreateNamespaceHandlesNotAuthorizedException() { + Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class)); + doThrow(new NotAuthorizedException("not authorized")) + .when((SupportsNamespaces) catalog) + .createNamespace(any()); + + assertThatNoException() + .isThrownBy( + () -> IcebergWriterFactory.createNamespaceIfNotExist(catalog, Namespace.of("db"))); + } }