diff --git a/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts b/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts index a7013d9b5b2b6..ff9a971d3dba5 100644 --- a/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts +++ b/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts @@ -55,7 +55,6 @@ tasks.named("shadowJ include(dependency("commons-.*:.*")) include(dependency("io.airlift:.*")) include(dependency("io.grpc:.*")) - include(dependency("io.netty.incubator:.*")) include(dependency("io.netty:.*")) include(dependency("io.opencensus:.*")) include(dependency("io.perfmark:.*")) diff --git a/build-logic/conventions/src/main/kotlin/pulsar.java-conventions.gradle.kts b/build-logic/conventions/src/main/kotlin/pulsar.java-conventions.gradle.kts index bb0fc131a2981..a9090dd930cf9 100644 --- a/build-logic/conventions/src/main/kotlin/pulsar.java-conventions.gradle.kts +++ b/build-logic/conventions/src/main/kotlin/pulsar.java-conventions.gradle.kts @@ -193,6 +193,11 @@ tasks.withType().configureEach { "-Dpulsar.allocator.exit_on_oom=false", "-Dpulsar.allocator.out_of_memory_policy=FallbackToHeap", "-Dpulsar.test.preventExit=true", + // Force IPv4 to match Pulsar's runtime scripts (bin/pulsar, bin/bookkeeper). BookKeeper's + // BookieId validation rejects IPv6 zone identifiers (e.g. fe80::1%lo0), so on hosts where the + // loopback interface resolves to an IPv6 link-local address (notably macOS) bookies bound to + // loopback would otherwise fail to start. + "-Djava.net.preferIPv4Stack=true", ) } diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index afe1c3d5c0177..cfe01dbde4dc5 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -262,7 +262,8 @@ The Apache Software License, Version 2.0 - com.fasterxml.jackson.module-jackson-module-parameter-names-2.21.3.jar * Caffeine -- com.github.ben-manes.caffeine-caffeine-3.2.4.jar * Conscrypt -- org.conscrypt-conscrypt-openjdk-uber-2.5.2.jar - * Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.59.2.jar + * LMAX Disruptor -- com.lmax-disruptor-4.0.0.jar + * Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.63.2.jar * Bitbucket -- org.bitbucket.b_c-jose4j-0.9.6.jar * Gson - com.google.code.gson-gson-2.13.2.jar @@ -277,7 +278,7 @@ The Apache Software License, Version 2.0 - io.swagger-swagger-annotations-1.6.2.jar - io.swagger-swagger-core-1.6.2.jar - io.swagger-swagger-models-1.6.2.jar - * slog -- io.github.merlimat.slog-slog-0.9.7.jar + * slog -- io.github.merlimat.slog-slog-0.9.9.jar * DataSketches - com.yahoo.datasketches-memory-0.8.3.jar - com.yahoo.datasketches-sketches-core-0.8.3.jar @@ -293,26 +294,27 @@ The Apache Software License, Version 2.0 - org.apache.commons-commons-lang3-3.20.0.jar - org.apache.commons-commons-text-1.14.0.jar * Netty - - io.netty-netty-buffer-4.1.134.Final.jar - - io.netty-netty-codec-4.1.134.Final.jar - - io.netty-netty-codec-dns-4.1.134.Final.jar - - io.netty-netty-codec-http-4.1.134.Final.jar - - io.netty-netty-codec-http2-4.1.134.Final.jar - - io.netty-netty-codec-socks-4.1.134.Final.jar - - io.netty-netty-codec-haproxy-4.1.134.Final.jar - - io.netty-netty-common-4.1.134.Final.jar - - io.netty-netty-handler-4.1.134.Final.jar - - io.netty-netty-handler-proxy-4.1.134.Final.jar - - io.netty-netty-resolver-4.1.134.Final.jar - - io.netty-netty-resolver-dns-4.1.134.Final.jar - - io.netty-netty-resolver-dns-classes-macos-4.1.134.Final.jar - - io.netty-netty-resolver-dns-native-macos-4.1.134.Final-osx-aarch_64.jar - - io.netty-netty-resolver-dns-native-macos-4.1.134.Final-osx-x86_64.jar - - io.netty-netty-transport-4.1.134.Final.jar - - io.netty-netty-transport-classes-epoll-4.1.134.Final.jar - - io.netty-netty-transport-native-epoll-4.1.134.Final-linux-aarch_64.jar - - io.netty-netty-transport-native-epoll-4.1.134.Final-linux-x86_64.jar - - io.netty-netty-transport-native-unix-common-4.1.134.Final.jar + - io.netty-netty-buffer-4.2.14.Final.jar + - io.netty-netty-codec-base-4.2.14.Final.jar + - io.netty-netty-codec-compression-4.2.14.Final.jar + - io.netty-netty-codec-dns-4.2.14.Final.jar + - io.netty-netty-codec-http-4.2.14.Final.jar + - io.netty-netty-codec-http2-4.2.14.Final.jar + - io.netty-netty-codec-socks-4.2.14.Final.jar + - io.netty-netty-codec-haproxy-4.2.14.Final.jar + - io.netty-netty-common-4.2.14.Final.jar + - io.netty-netty-handler-4.2.14.Final.jar + - io.netty-netty-handler-proxy-4.2.14.Final.jar + - io.netty-netty-resolver-4.2.14.Final.jar + - io.netty-netty-resolver-dns-4.2.14.Final.jar + - io.netty-netty-resolver-dns-classes-macos-4.2.14.Final.jar + - io.netty-netty-resolver-dns-native-macos-4.2.14.Final-osx-aarch_64.jar + - io.netty-netty-resolver-dns-native-macos-4.2.14.Final-osx-x86_64.jar + - io.netty-netty-transport-4.2.14.Final.jar + - io.netty-netty-transport-classes-epoll-4.2.14.Final.jar + - io.netty-netty-transport-native-epoll-4.2.14.Final-linux-aarch_64.jar + - io.netty-netty-transport-native-epoll-4.2.14.Final-linux-x86_64.jar + - io.netty-netty-transport-native-unix-common-4.2.14.Final.jar - io.netty-netty-tcnative-boringssl-static-2.0.77.Final.jar - io.netty-netty-tcnative-boringssl-static-2.0.77.Final-linux-aarch_64.jar - io.netty-netty-tcnative-boringssl-static-2.0.77.Final-linux-x86_64.jar @@ -320,9 +322,9 @@ The Apache Software License, Version 2.0 - io.netty-netty-tcnative-boringssl-static-2.0.77.Final-osx-x86_64.jar - io.netty-netty-tcnative-boringssl-static-2.0.77.Final-windows-x86_64.jar - io.netty-netty-tcnative-classes-2.0.77.Final.jar - - io.netty.incubator-netty-incubator-transport-classes-io_uring-0.0.26.Final.jar - - io.netty.incubator-netty-incubator-transport-native-io_uring-0.0.26.Final-linux-x86_64.jar - - io.netty.incubator-netty-incubator-transport-native-io_uring-0.0.26.Final-linux-aarch_64.jar + - io.netty-netty-transport-classes-io_uring-4.2.14.Final.jar + - io.netty-netty-transport-native-io_uring-4.2.14.Final-linux-x86_64.jar + - io.netty-netty-transport-native-io_uring-4.2.14.Final-linux-aarch_64.jar * Prometheus client - io.prometheus.jmx-collector-0.16.1.jar - io.prometheus-simpleclient-0.16.0.jar @@ -356,32 +358,31 @@ The Apache Software License, Version 2.0 - net.java.dev.jna-jna-jpms-5.18.1.jar - net.java.dev.jna-jna-platform-jpms-5.18.1.jar * BookKeeper - - org.apache.bookkeeper-bookkeeper-common-4.17.3.jar - - org.apache.bookkeeper-bookkeeper-common-allocator-4.17.3.jar - - org.apache.bookkeeper-bookkeeper-proto-4.17.3.jar - - org.apache.bookkeeper-bookkeeper-server-4.17.3.jar - - org.apache.bookkeeper-bookkeeper-tools-framework-4.17.3.jar - - org.apache.bookkeeper-circe-checksum-4.17.3.jar - - org.apache.bookkeeper-cpu-affinity-4.17.3.jar - - org.apache.bookkeeper-statelib-4.17.3.jar - - org.apache.bookkeeper-stream-storage-api-4.17.3.jar - - org.apache.bookkeeper-stream-storage-common-4.17.3.jar - - org.apache.bookkeeper-stream-storage-java-client-4.17.3.jar - - org.apache.bookkeeper-stream-storage-java-client-base-4.17.3.jar - - org.apache.bookkeeper-stream-storage-proto-4.17.3.jar - - org.apache.bookkeeper-stream-storage-server-4.17.3.jar - - org.apache.bookkeeper-stream-storage-service-api-4.17.3.jar - - org.apache.bookkeeper-stream-storage-service-impl-4.17.3.jar - - org.apache.bookkeeper.http-http-server-4.17.3.jar - - org.apache.bookkeeper.http-vertx-http-server-4.17.3.jar - - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.17.3.jar - - org.apache.distributedlog-distributedlog-common-4.17.3.jar - - org.apache.distributedlog-distributedlog-core-4.17.3-tests.jar - - org.apache.distributedlog-distributedlog-core-4.17.3.jar - - org.apache.distributedlog-distributedlog-protocol-4.17.3.jar - - org.apache.bookkeeper-bookkeeper-slogger-api-4.17.3.jar - - org.apache.bookkeeper-bookkeeper-slogger-slf4j-4.17.3.jar - - org.apache.bookkeeper-native-io-4.17.3.jar + - org.apache.bookkeeper-bookkeeper-common-4.18.0.jar + - org.apache.bookkeeper-bookkeeper-common-allocator-4.18.0.jar + - org.apache.bookkeeper-bookkeeper-proto-4.18.0.jar + - org.apache.bookkeeper-bookkeeper-server-4.18.0.jar + - org.apache.bookkeeper-bookkeeper-tools-framework-4.18.0.jar + - org.apache.bookkeeper-circe-checksum-4.18.0.jar + - org.apache.bookkeeper-cpu-affinity-4.18.0.jar + - org.apache.bookkeeper-statelib-4.18.0.jar + - org.apache.bookkeeper-stream-storage-api-4.18.0.jar + - org.apache.bookkeeper-stream-storage-common-4.18.0.jar + - org.apache.bookkeeper-stream-storage-java-client-4.18.0.jar + - org.apache.bookkeeper-stream-storage-java-client-base-4.18.0.jar + - org.apache.bookkeeper-stream-storage-proto-4.18.0.jar + - org.apache.bookkeeper-stream-storage-server-4.18.0.jar + - org.apache.bookkeeper-stream-storage-service-api-4.18.0.jar + - org.apache.bookkeeper-stream-storage-service-impl-4.18.0.jar + - org.apache.bookkeeper.http-http-server-4.18.0.jar + - org.apache.bookkeeper.http-vertx-http-server-4.18.0.jar + - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.18.0.jar + - org.apache.distributedlog-distributedlog-common-4.18.0.jar + - org.apache.distributedlog-distributedlog-core-4.18.0-tests.jar + - org.apache.distributedlog-distributedlog-core-4.18.0.jar + - org.apache.distributedlog-distributedlog-protocol-4.18.0.jar + - org.apache.bookkeeper-native-io-4.18.0.jar + - org.apache.bookkeeper-native-library-common-4.18.0.jar - at.yawk.lz4-lz4-java-1.10.3.jar * Apache HTTP Client - org.apache.httpcomponents-httpclient-4.5.13.jar @@ -430,7 +431,7 @@ The Apache Software License, Version 2.0 - org.eclipse.jetty.websocket-jetty-websocket-jetty-common-12.1.9.jar - org.eclipse.jetty.websocket-jetty-websocket-jetty-server-12.1.9.jar * SnakeYaml -- org.yaml-snakeyaml-2.0.jar - * RocksDB - org.rocksdb-rocksdbjni-7.9.2.jar + * RocksDB - org.rocksdb-rocksdbjni-9.9.3.jar * Google Error Prone Annotations - com.google.errorprone-error_prone_annotations-2.45.0.jar * Apache Thrift - org.apache.thrift-libthrift-0.23.0.jar * OkHttp3 @@ -443,26 +444,26 @@ The Apache Software License, Version 2.0 - org.jetbrains.kotlin-kotlin-stdlib-2.2.21.jar - org.jetbrains-annotations-13.0.jar * gRPC - - io.grpc-grpc-all-1.75.0.jar - - io.grpc-grpc-auth-1.75.0.jar - - io.grpc-grpc-context-1.75.0.jar - - io.grpc-grpc-core-1.75.0.jar - - io.grpc-grpc-protobuf-1.75.0.jar - - io.grpc-grpc-protobuf-lite-1.75.0.jar - - io.grpc-grpc-stub-1.75.0.jar - - io.grpc-grpc-alts-1.75.0.jar - - io.grpc-grpc-api-1.75.0.jar - - io.grpc-grpc-grpclb-1.75.0.jar - - io.grpc-grpc-netty-shaded-1.75.0.jar - - io.grpc-grpc-services-1.75.0.jar - - io.grpc-grpc-xds-1.75.0.jar - - io.grpc-grpc-rls-1.75.0.jar - - io.grpc-grpc-servlet-1.75.0.jar - - io.grpc-grpc-servlet-jakarta-1.75.0.jar - - io.grpc-grpc-util-1.75.0.jar - - io.grpc-grpc-opentelemetry-1.75.0.jar - - io.grpc-grpc-gcp-csm-observability-1.75.0.jar - - io.grpc-grpc-inprocess-1.75.0.jar + - io.grpc-grpc-all-1.79.0.jar + - io.grpc-grpc-auth-1.79.0.jar + - io.grpc-grpc-context-1.79.0.jar + - io.grpc-grpc-core-1.79.0.jar + - io.grpc-grpc-protobuf-1.79.0.jar + - io.grpc-grpc-protobuf-lite-1.79.0.jar + - io.grpc-grpc-stub-1.79.0.jar + - io.grpc-grpc-alts-1.79.0.jar + - io.grpc-grpc-api-1.79.0.jar + - io.grpc-grpc-grpclb-1.79.0.jar + - io.grpc-grpc-netty-shaded-1.79.0.jar + - io.grpc-grpc-services-1.79.0.jar + - io.grpc-grpc-xds-1.79.0.jar + - io.grpc-grpc-rls-1.79.0.jar + - io.grpc-grpc-servlet-1.79.0.jar + - io.grpc-grpc-servlet-jakarta-1.79.0.jar + - io.grpc-grpc-util-1.79.0.jar + - io.grpc-grpc-opentelemetry-1.79.0.jar + - io.grpc-grpc-gcp-csm-observability-1.79.0.jar + - io.grpc-grpc-inprocess-1.79.0.jar * Perfmark - io.perfmark-perfmark-api-0.26.0.jar * OpenCensus @@ -548,7 +549,6 @@ The Apache Software License, Version 2.0 - io.opentelemetry.instrumentation-opentelemetry-resources-2.28.1-alpha.jar - io.opentelemetry.instrumentation-opentelemetry-runtime-telemetry-2.28.1-alpha.jar - io.opentelemetry.semconv-opentelemetry-semconv-1.41.1.jar - - com.google.cloud.opentelemetry-detector-resources-support-0.36.0.jar - io.opentelemetry.contrib-opentelemetry-gcp-resources-1.57.0-alpha.jar * Spotify completable-futures - com.spotify-completable-futures-0.3.6.jar diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt index 36de53c73e9f1..9029e6eaab3ac 100644 --- a/distribution/shell/src/assemble/LICENSE.bin.txt +++ b/distribution/shell/src/assemble/LICENSE.bin.txt @@ -345,22 +345,23 @@ The Apache Software License, Version 2.0 - commons-text-1.14.0.jar - commons-compress-1.28.0.jar * Netty - - netty-buffer-4.1.134.Final.jar - - netty-codec-4.1.134.Final.jar - - netty-codec-dns-4.1.134.Final.jar - - netty-codec-http-4.1.134.Final.jar - - netty-codec-socks-4.1.134.Final.jar - - netty-codec-haproxy-4.1.134.Final.jar - - netty-common-4.1.134.Final.jar - - netty-handler-4.1.134.Final.jar - - netty-handler-proxy-4.1.134.Final.jar - - netty-resolver-4.1.134.Final.jar - - netty-resolver-dns-4.1.134.Final.jar - - netty-transport-4.1.134.Final.jar - - netty-transport-classes-epoll-4.1.134.Final.jar - - netty-transport-native-epoll-4.1.134.Final-linux-aarch_64.jar - - netty-transport-native-epoll-4.1.134.Final-linux-x86_64.jar - - netty-transport-native-unix-common-4.1.134.Final.jar + - netty-buffer-4.2.14.Final.jar + - netty-codec-base-4.2.14.Final.jar + - netty-codec-compression-4.2.14.Final.jar + - netty-codec-dns-4.2.14.Final.jar + - netty-codec-http-4.2.14.Final.jar + - netty-codec-socks-4.2.14.Final.jar + - netty-codec-haproxy-4.2.14.Final.jar + - netty-common-4.2.14.Final.jar + - netty-handler-4.2.14.Final.jar + - netty-handler-proxy-4.2.14.Final.jar + - netty-resolver-4.2.14.Final.jar + - netty-resolver-dns-4.2.14.Final.jar + - netty-transport-4.2.14.Final.jar + - netty-transport-classes-epoll-4.2.14.Final.jar + - netty-transport-native-epoll-4.2.14.Final-linux-aarch_64.jar + - netty-transport-native-epoll-4.2.14.Final-linux-x86_64.jar + - netty-transport-native-unix-common-4.2.14.Final.jar - netty-tcnative-boringssl-static-2.0.77.Final.jar - netty-tcnative-boringssl-static-2.0.77.Final-linux-aarch_64.jar - netty-tcnative-boringssl-static-2.0.77.Final-linux-x86_64.jar @@ -368,12 +369,12 @@ The Apache Software License, Version 2.0 - netty-tcnative-boringssl-static-2.0.77.Final-osx-x86_64.jar - netty-tcnative-boringssl-static-2.0.77.Final-windows-x86_64.jar - netty-tcnative-classes-2.0.77.Final.jar - - netty-incubator-transport-classes-io_uring-0.0.26.Final.jar - - netty-incubator-transport-native-io_uring-0.0.26.Final-linux-aarch_64.jar - - netty-incubator-transport-native-io_uring-0.0.26.Final-linux-x86_64.jar - - netty-resolver-dns-classes-macos-4.1.134.Final.jar - - netty-resolver-dns-native-macos-4.1.134.Final-osx-aarch_64.jar - - netty-resolver-dns-native-macos-4.1.134.Final-osx-x86_64.jar + - netty-transport-classes-io_uring-4.2.14.Final.jar + - netty-transport-native-io_uring-4.2.14.Final-linux-aarch_64.jar + - netty-transport-native-io_uring-4.2.14.Final-linux-x86_64.jar + - netty-resolver-dns-classes-macos-4.2.14.Final.jar + - netty-resolver-dns-native-macos-4.2.14.Final-osx-aarch_64.jar + - netty-resolver-dns-native-macos-4.2.14.Final-osx-x86_64.jar * Prometheus client - simpleclient-0.16.0.jar - simpleclient_log4j2-0.16.0.jar @@ -392,12 +393,13 @@ The Apache Software License, Version 2.0 - opentelemetry-common-1.62.0.jar - opentelemetry-context-1.62.0.jar * Slog - - slog-0.9.7.jar + - slog-0.9.9.jar * BookKeeper - - bookkeeper-common-allocator-4.17.3.jar - - cpu-affinity-4.17.3.jar - - circe-checksum-4.17.3.jar + - bookkeeper-common-allocator-4.18.0.jar + - cpu-affinity-4.18.0.jar + - circe-checksum-4.18.0.jar + - native-library-common-4.18.0.jar * AirCompressor - aircompressor-2.0.3.jar * AsyncHttpClient diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 26f6af028e944..1ba7561347631 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -23,18 +23,17 @@ pulsar-client-python = "3.10.0" # Code quality checkstyle = "13.3.0" # Major frameworks -bookkeeper = "4.17.3" +bookkeeper = "4.18.0" zookeeper = "3.9.5" -netty = "4.1.134.Final" -netty-iouring = "0.0.26.Final" +netty = "4.2.14.Final" jetty = "12.1.9" jersey = "3.1.10" jackson = "2.21.3" jackson-annotations = "2.21" protobuf = "3.25.5" -grpc = "1.75.0" +grpc = "1.79.0" slf4j = "2.0.17" -slog = "0.9.7" +slog = "0.9.9" log4j2 = "2.25.4" lombok = "1.18.42" # OpenTelemetry @@ -107,7 +106,7 @@ picocli = "4.7.5" jline3 = "3.21.0" jline2 = "2.14.6" javassist = "3.25.0-GA" -rocksdb = "7.9.2" +rocksdb = "9.9.3" audience-annotations = "0.12.0" # Misc curator = "5.7.1" @@ -219,8 +218,8 @@ netty-resolver-dns-native-macos = { module = "io.netty:netty-resolver-dns-native netty-transport-native-epoll = { module = "io.netty:netty-transport-native-epoll", version.ref = "netty" } netty-transport-native-unix-common = { module = "io.netty:netty-transport-native-unix-common", version.ref = "netty" } netty-tcnative-boringssl-static = { module = "io.netty:netty-tcnative-boringssl-static", version.ref = "netty-tcnative" } -netty-incubator-transport-classes-io_uring = { module = "io.netty.incubator:netty-incubator-transport-classes-io_uring", version.ref = "netty-iouring" } -netty-incubator-transport-native-io-uring = { module = "io.netty.incubator:netty-incubator-transport-native-io_uring", version.ref = "netty-iouring" } +netty-transport-classes-io_uring = { module = "io.netty:netty-transport-classes-io_uring", version.ref = "netty" } +netty-transport-native-io-uring = { module = "io.netty:netty-transport-native-io_uring", version.ref = "netty" } netty-reactive-streams = { module = "com.typesafe.netty:netty-reactive-streams", version.ref = "netty-reactive-streams" } # Protobuf / gRPC protobuf-bom = { module = "com.google.protobuf:protobuf-bom", version.ref = "protobuf" } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java index a284f802c7837..3a45e74b04514 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java @@ -18,7 +18,6 @@ */ package org.apache.bookkeeper.mledger.offload; -import com.google.protobuf.ByteString; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -41,7 +40,7 @@ import org.apache.bookkeeper.mledger.proto.OffloadContext; import org.apache.bookkeeper.mledger.proto.OffloadDriverMetadata; import org.apache.bookkeeper.net.BookieId; -import org.apache.bookkeeper.proto.DataFormats; +import org.apache.bookkeeper.proto.LedgerMetadataFormat; @CustomLog public final class OffloadUtils { @@ -107,38 +106,38 @@ public static void setOffloadDriverMetadata(LedgerInfo infoBuilder, } public static byte[] buildLedgerMetadataFormat(LedgerMetadata metadata) { - DataFormats.LedgerMetadataFormat.Builder builder = DataFormats.LedgerMetadataFormat.newBuilder(); + LedgerMetadataFormat builder = new LedgerMetadataFormat(); builder.setQuorumSize(metadata.getWriteQuorumSize()) .setAckQuorumSize(metadata.getAckQuorumSize()) .setEnsembleSize(metadata.getEnsembleSize()) .setLength(metadata.getLength()) - .setState(metadata.isClosed() ? DataFormats.LedgerMetadataFormat.State.CLOSED : - DataFormats.LedgerMetadataFormat.State.OPEN) + .setState(metadata.isClosed() ? LedgerMetadataFormat.State.CLOSED : + LedgerMetadataFormat.State.OPEN) .setLastEntryId(metadata.getLastEntryId()) .setCtime(metadata.getCtime()) .setDigestType(BookKeeper.DigestType.toProtoDigestType( BookKeeper.DigestType.fromApiDigestType(metadata.getDigestType()))); for (Map.Entry e : metadata.getCustomMetadata().entrySet()) { - builder.addCustomMetadataBuilder() - .setKey(e.getKey()).setValue(ByteString.copyFrom(e.getValue())); + builder.addCustomMetadata() + .setKey(e.getKey()).setValue(e.getValue()); } for (Map.Entry> e : metadata.getAllEnsembles().entrySet()) { - builder.addSegmentBuilder() + builder.addSegment() .setFirstEntryId(e.getKey()) - .addAllEnsembleMember(e.getValue().stream().map(BookieId::toString).collect(Collectors.toList())); + .addAllEnsembleMembers(e.getValue().stream().map(BookieId::toString).collect(Collectors.toList())); } - return builder.build().toByteArray(); + return builder.toByteArray(); } public static LedgerMetadata parseLedgerMetadata(long id, byte[] bytes) throws IOException { - DataFormats.LedgerMetadataFormat ledgerMetadataFormat = DataFormats.LedgerMetadataFormat.newBuilder() - .mergeFrom(bytes).build(); + LedgerMetadataFormat ledgerMetadataFormat = new LedgerMetadataFormat(); + ledgerMetadataFormat.parseFrom(bytes); LedgerMetadataBuilder builder = LedgerMetadataBuilder.create() .withLastEntryId(ledgerMetadataFormat.getLastEntryId()) - .withPassword(ledgerMetadataFormat.getPassword().toByteArray()) + .withPassword(ledgerMetadataFormat.getPassword()) .withClosedState() .withId(id) .withMetadataFormatVersion(2) @@ -147,9 +146,9 @@ public static LedgerMetadata parseLedgerMetadata(long id, byte[] bytes) throws I .withCreationTime(ledgerMetadataFormat.getCtime()) .withWriteQuorumSize(ledgerMetadataFormat.getQuorumSize()) .withEnsembleSize(ledgerMetadataFormat.getEnsembleSize()); - ledgerMetadataFormat.getSegmentList().forEach(segment -> { + ledgerMetadataFormat.getSegmentsList().forEach(segment -> { ArrayList addressArrayList = new ArrayList<>(); - segment.getEnsembleMemberList().forEach(address -> { + segment.getEnsembleMembersList().forEach(address -> { try { addressArrayList.add(BookieId.parse(address)); } catch (IllegalArgumentException e) { @@ -159,10 +158,10 @@ public static LedgerMetadata parseLedgerMetadata(long id, byte[] bytes) throws I builder.newEnsembleEntry(segment.getFirstEntryId(), addressArrayList); }); - if (ledgerMetadataFormat.getCustomMetadataCount() > 0) { + if (ledgerMetadataFormat.getCustomMetadatasCount() > 0) { Map customMetadata = new HashMap<>(); - ledgerMetadataFormat.getCustomMetadataList().forEach( - entry -> customMetadata.put(entry.getKey(), entry.getValue().toByteArray())); + ledgerMetadataFormat.getCustomMetadatasList().forEach( + entry -> customMetadata.put(entry.getKey(), entry.getValue())); builder.withCustomMetadata(customMetadata); } diff --git a/pulsar-broker/build.gradle.kts b/pulsar-broker/build.gradle.kts index 80c9ade313695..0e9ab444a3f02 100644 --- a/pulsar-broker/build.gradle.kts +++ b/pulsar-broker/build.gradle.kts @@ -120,6 +120,7 @@ dependencies { testImplementation(project(":pulsar-functions:pulsar-functions-api-examples")) testImplementation(project(":pulsar-io:pulsar-io-batch-discovery-triggerers")) testImplementation(libs.zt.zip) + testImplementation(libs.re2j) testImplementation(libs.asynchttpclient) testImplementation(libs.bcprov.jdk18on) testImplementation(libs.commons.math3) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java index 2f68928d6d6c7..a16b5b79c2f5d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java @@ -355,10 +355,10 @@ public void runStreamStorage(CompositeConfiguration conf) throws Exception { } catch (NamespaceNotFoundException nnfe) { log.info("Creating default namespace"); try { + NamespaceConfiguration nsConf = new NamespaceConfiguration(); + nsConf.setDefaultStreamConf().copyFrom(DEFAULT_STREAM_CONF); NamespaceProperties ns = - FutureUtils.result(admin.createNamespace("default", NamespaceConfiguration.newBuilder() - .setDefaultStreamConf(DEFAULT_STREAM_CONF) - .build())); + FutureUtils.result(admin.createNamespace("default", nsConf)); log.info().attr("n", ns).log("Successfully created 'default' namespace :\n"); } catch (NamespaceExistsException nee) { // namespace already exists @@ -411,6 +411,10 @@ public void startStandalone() throws Exception { public void startStandalone(ServerConfiguration conf, boolean enableStreamStorage) throws Exception { log.debug("Local ZK/BK starting ..."); conf.setAdvertisedAddress(advertisedAddress); + // This is an embedded, localhost-only ensemble (advertised address is a loopback address), so + // loopback binding must be permitted. BookKeeper forbids binding to a loopback address unless + // allowLoopback is set. Mirror the start(boolean) path, which sets this for the same reason. + conf.setAllowLoopback(true); runZookeeper(1000); initializeZookeper(); diff --git a/pulsar-common/build.gradle.kts b/pulsar-common/build.gradle.kts index ef5d3cb8c1ae3..b3aa7c1bf2d4a 100644 --- a/pulsar-common/build.gradle.kts +++ b/pulsar-common/build.gradle.kts @@ -141,9 +141,9 @@ dependencies { implementation(variantOf(libs.netty.tcnative.boringssl.static) { classifier("linux-aarch_64") }) implementation(variantOf(libs.netty.tcnative.boringssl.static) { classifier("osx-x86_64") }) implementation(variantOf(libs.netty.tcnative.boringssl.static) { classifier("osx-aarch_64") }) - implementation(libs.netty.incubator.transport.classes.io.uring) - implementation(variantOf(libs.netty.incubator.transport.native.io.uring) { classifier("linux-x86_64") }) - implementation(variantOf(libs.netty.incubator.transport.native.io.uring) { classifier("linux-aarch_64") }) + implementation(libs.netty.transport.classes.io.uring) + implementation(variantOf(libs.netty.transport.native.io.uring) { classifier("linux-x86_64") }) + implementation(variantOf(libs.netty.transport.native.io.uring) { classifier("linux-aarch_64") }) implementation(libs.netty.codec.haproxy) implementation(libs.commons.lang3) implementation(libs.jakarta.ws.rs.api) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/DefaultPulsarSslFactory.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/DefaultPulsarSslFactory.java index 9be16f835b28b..b7902b4543fc5 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/DefaultPulsarSslFactory.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/DefaultPulsarSslFactory.java @@ -332,6 +332,13 @@ private SSLEngine createSSLEngine(ByteBufAllocator buf, String peerHost, int pee sslParams.setWantClientAuth(true); } } + // Netty 4.2 changed SslContext.newEngine(alloc, peerHost, peerPort) to default the endpoint + // identification algorithm to "HTTPS" (it was unset in 4.1). Pulsar applies TLS hostname + // verification separately via SecurityUtility.configureSSLHandler(), and only when + // tlsHostnameVerificationEnable is set. Clear it here so the engine does not perform unintended + // hostname verification (which would otherwise fail whenever the peer host is absent from the + // certificate SANs, e.g. internal "localhost" broker connections). + sslParams.setEndpointIdentificationAlgorithm(null); if (this.config.getTlsProtocols() != null && !this.config.getTlsProtocols().isEmpty()) { sslParams.setProtocols(this.config.getTlsProtocols().toArray(new String[0])); } else { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java index 3dbdd28871532..cabb434bf7cae 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java @@ -20,6 +20,8 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.EventLoopGroup; +import io.netty.channel.IoEventLoopGroup; +import io.netty.channel.MultiThreadIoEventLoopGroup; import io.netty.channel.SelectStrategy; import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollChannelOption; @@ -35,11 +37,11 @@ import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.incubator.channel.uring.IOUring; -import io.netty.incubator.channel.uring.IOUringDatagramChannel; -import io.netty.incubator.channel.uring.IOUringEventLoopGroup; -import io.netty.incubator.channel.uring.IOUringServerSocketChannel; -import io.netty.incubator.channel.uring.IOUringSocketChannel; +import io.netty.channel.uring.IoUring; +import io.netty.channel.uring.IoUringDatagramChannel; +import io.netty.channel.uring.IoUringIoHandler; +import io.netty.channel.uring.IoUringServerSocketChannel; +import io.netty.channel.uring.IoUringSocketChannel; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadFactory; import lombok.CustomLog; @@ -55,10 +57,13 @@ public class EventLoopUtil { /** * @return an EventLoopGroup suitable for the current platform */ + // Epoll/NioEventLoopGroup are deprecated since Netty 4.2 in favor of MultiThreadIoEventLoopGroup, but are + // intentionally kept here as the (still-supported) epoll/nio transports; io_uring uses the new IoHandler API. + @SuppressWarnings("deprecation") public static EventLoopGroup newEventLoopGroup(int nThreads, boolean enableBusyWait, ThreadFactory threadFactory) { if (Epoll.isAvailable()) { if (isIoUringEnabledAndAvailable()) { - return new IOUringEventLoopGroup(nThreads, threadFactory); + return new MultiThreadIoEventLoopGroup(nThreads, threadFactory, IoUringIoHandler.newFactory()); } else { if (!enableBusyWait) { // Regular Epoll based event loop @@ -96,21 +101,33 @@ private static boolean isIoUringEnabledAndAvailable() { String ioUringSetting = System.getProperty(ENABLE_IO_URING); boolean ioUringEnabled = "1".equalsIgnoreCase(ioUringSetting) || "true".equalsIgnoreCase(ioUringSetting); if (ioUringEnabled) { - // Throw exception if IOUring cannot be used - IOUring.ensureAvailability(); + // Throw exception if io_uring cannot be used + IoUring.ensureAvailability(); } return ioUringEnabled; } + /** + * Returns true if the given EventLoopGroup is backed by Netty's io_uring transport. + * Since Netty 4.2 the classic {@code IOUringEventLoopGroup} no longer exists; io_uring groups are + * {@link MultiThreadIoEventLoopGroup} instances (as are epoll/nio groups), so the IO handler type is + * queried via {@link IoEventLoopGroup#isIoType} rather than {@code instanceof}. + */ + private static boolean isIoUring(EventLoopGroup eventLoopGroup) { + return eventLoopGroup instanceof IoEventLoopGroup + && ((IoEventLoopGroup) eventLoopGroup).isIoType(IoUringIoHandler.class); + } + /** * Return a SocketChannel class suitable for the given EventLoopGroup implementation. * * @param eventLoopGroup * @return */ + @SuppressWarnings("deprecation") // EpollEventLoopGroup is deprecated since Netty 4.2 but still supported public static Class getClientSocketChannelClass(EventLoopGroup eventLoopGroup) { - if (eventLoopGroup instanceof IOUringEventLoopGroup) { - return IOUringSocketChannel.class; + if (isIoUring(eventLoopGroup)) { + return IoUringSocketChannel.class; } else if (eventLoopGroup instanceof EpollEventLoopGroup) { return EpollSocketChannel.class; } else { @@ -128,7 +145,7 @@ public static Class getClientSocketChannelClass(EventLo public static Class getClientSocketChannelClass() { if (Epoll.isAvailable()) { if (isIoUringEnabledAndAvailable()) { - return IOUringSocketChannel.class; + return IoUringSocketChannel.class; } else { return EpollSocketChannel.class; } @@ -137,9 +154,10 @@ public static Class getClientSocketChannelClass() { } } + @SuppressWarnings("deprecation") // EpollEventLoopGroup is deprecated since Netty 4.2 but still supported public static Class getServerSocketChannelClass(EventLoopGroup eventLoopGroup) { - if (eventLoopGroup instanceof IOUringEventLoopGroup) { - return IOUringServerSocketChannel.class; + if (isIoUring(eventLoopGroup)) { + return IoUringServerSocketChannel.class; } else if (eventLoopGroup instanceof EpollEventLoopGroup) { return EpollServerSocketChannel.class; } else { @@ -157,7 +175,7 @@ public static Class getServerSocketChannelClass(E public static Class getServerSocketChannelClass() { if (Epoll.isAvailable()) { if (isIoUringEnabledAndAvailable()) { - return IOUringServerSocketChannel.class; + return IoUringServerSocketChannel.class; } else { return EpollServerSocketChannel.class; } @@ -166,9 +184,10 @@ public static Class getServerSocketChannelClass() } } + @SuppressWarnings("deprecation") // EpollEventLoopGroup is deprecated since Netty 4.2 but still supported public static Class getDatagramChannelClass(EventLoopGroup eventLoopGroup) { - if (eventLoopGroup instanceof IOUringEventLoopGroup) { - return IOUringDatagramChannel.class; + if (isIoUring(eventLoopGroup)) { + return IoUringDatagramChannel.class; } else if (eventLoopGroup instanceof EpollEventLoopGroup) { return EpollDatagramChannel.class; } else { @@ -186,7 +205,7 @@ public static Class getDatagramChannelClass(EventLoop public static Class getDatagramChannelClass() { if (Epoll.isAvailable()) { if (isIoUringEnabledAndAvailable()) { - return IOUringDatagramChannel.class; + return IoUringDatagramChannel.class; } else { return EpollDatagramChannel.class; } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/BitSetRecyclableRecyclableTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/BitSetRecyclableRecyclableTest.java index 8061f853d66c1..5bb615b19ecb7 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/BitSetRecyclableRecyclableTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/BitSetRecyclableRecyclableTest.java @@ -28,11 +28,14 @@ public void testRecycle() { BitSetRecyclable bitset1 = BitSetRecyclable.create(); bitset1.set(3); bitset1.recycle(); + // Note: netty 4.2 reworked the Recycler and no longer returns the just-recycled instance on the + // next get() (the previous LIFO thread-local reuse was an implementation detail, not a contract). + // Verify only the invariants that hold under any Recycler: recycle() clears state, and two + // concurrently-live instances are always distinct objects. BitSetRecyclable bitset2 = BitSetRecyclable.create(); BitSetRecyclable bitset3 = BitSetRecyclable.create(); - Assert.assertSame(bitset2, bitset1); Assert.assertFalse(bitset2.get(3)); - Assert.assertNotSame(bitset3, bitset1); + Assert.assertNotSame(bitset2, bitset3); } @Test diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java index 7ffda74156969..e835504637882 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java @@ -30,11 +30,14 @@ public void testRecycle() { ConcurrentBitSetRecyclable bitset1 = ConcurrentBitSetRecyclable.create(); bitset1.set(3); bitset1.recycle(); + // Note: netty 4.2 reworked the Recycler and no longer returns the just-recycled instance on the + // next get() (the previous LIFO thread-local reuse was an implementation detail, not a contract). + // Verify only the invariants that hold under any Recycler: recycle() clears state, and two + // concurrently-live instances are always distinct objects. ConcurrentBitSetRecyclable bitset2 = ConcurrentBitSetRecyclable.create(); ConcurrentBitSetRecyclable bitset3 = ConcurrentBitSetRecyclable.create(); - Assert.assertSame(bitset2, bitset1); Assert.assertFalse(bitset2.get(3)); - Assert.assertNotSame(bitset3, bitset1); + Assert.assertNotSame(bitset2, bitset3); } @SuppressWarnings("deprecation") diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java index 607154ffdf4cf..6743034447517 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java @@ -101,11 +101,10 @@ private void createStateTable(String stateStorageServiceUrl, try (StorageAdminClient storageAdminClient = new SimpleStorageAdminClientImpl( StorageClientSettings.newBuilder().serviceUri(stateStorageServiceUrl).build(), ClientResources.create().scheduler())){ - StreamConfiguration streamConf = StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF) + StreamConfiguration streamConf = new StreamConfiguration().copyFrom(DEFAULT_STREAM_CONF) .setInitialNumRanges(4) .setMinNumRanges(4) - .setStorageType(StorageType.TABLE) - .build(); + .setStorageType(StorageType.TABLE); Stopwatch elapsedWatch = Stopwatch.createStarted(); Exception lastException = null; @@ -115,9 +114,9 @@ private void createStateTable(String stateStorageServiceUrl, return; } catch (NamespaceNotFoundException nnfe) { try { - result(storageAdminClient.createNamespace(tableNs, NamespaceConfiguration.newBuilder() - .setDefaultStreamConf(streamConf) - .build())); + NamespaceConfiguration nsConf = new NamespaceConfiguration(); + nsConf.setDefaultStreamConf().copyFrom(streamConf); + result(storageAdminClient.createNamespace(tableNs, nsConf)); } catch (Exception e) { // there might be two clients conflicting at creating table, so let's retrieve the table again // to make sure the table is created. diff --git a/pulsar-functions/localrun-shaded/build.gradle.kts b/pulsar-functions/localrun-shaded/build.gradle.kts index 5777ed96b2ac8..06e8fa1f1c4a0 100644 --- a/pulsar-functions/localrun-shaded/build.gradle.kts +++ b/pulsar-functions/localrun-shaded/build.gradle.kts @@ -42,7 +42,6 @@ tasks.shadowJar { include(dependency("org.apache.commons:.*")) include(dependency("com.fasterxml.jackson.*:.*")) include(dependency("io.netty:.*")) - include(dependency("io.netty.incubator:.*")) include(dependency("com.google.*:.*")) include(dependency("jakarta.servlet:.*")) include(dependency("org.reactivestreams:reactive-streams")) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java index a697f32f0c2fe..86f7a44c94110 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java @@ -18,14 +18,15 @@ */ package org.apache.pulsar.metadata.bookkeeper; -import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import lombok.CustomLog; import org.apache.bookkeeper.discover.BookieServiceInfo; import org.apache.bookkeeper.discover.BookieServiceInfoUtils; -import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat; +import org.apache.bookkeeper.proto.BookieServiceInfoFormat; import org.apache.pulsar.metadata.api.MetadataSerde; import org.apache.pulsar.metadata.api.Stat; @@ -40,25 +41,19 @@ private BookieServiceInfoSerde() { @Override public byte[] serialize(String path, BookieServiceInfo bookieServiceInfo) throws IOException { log.debug().attr("bookieServiceInfo", bookieServiceInfo).log("serialize BookieServiceInfo"); - try (ByteArrayOutputStream os = new ByteArrayOutputStream()) { - BookieServiceInfoFormat.Builder builder = BookieServiceInfoFormat.newBuilder(); - List bsiEndpoints = bookieServiceInfo.getEndpoints().stream() - .map(e -> BookieServiceInfoFormat.Endpoint.newBuilder() - .setId(e.getId()) - .setPort(e.getPort()) - .setHost(e.getHost()) - .setProtocol(e.getProtocol()) - .addAllAuth(e.getAuth()) - .addAllExtensions(e.getExtensions()) - .build()) - .collect(Collectors.toList()); - - builder.addAllEndpoints(bsiEndpoints); - builder.putAllProperties(bookieServiceInfo.getProperties()); - - builder.build().writeTo(os); - return os.toByteArray(); + BookieServiceInfoFormat builder = new BookieServiceInfoFormat(); + for (BookieServiceInfo.Endpoint e : bookieServiceInfo.getEndpoints()) { + builder.addEndpoint() + .setId(e.getId()) + .setPort(e.getPort()) + .setHost(e.getHost()) + .setProtocol(e.getProtocol()) + .addAllAuths(e.getAuth()) + .addAllExtensions(e.getExtensions()); } + bookieServiceInfo.getProperties().forEach(builder::putProperties); + + return builder.toByteArray(); } @Override @@ -71,7 +66,8 @@ public BookieServiceInfo deserialize(String path, byte[] bookieServiceInfo, Stat return BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookieId); } - BookieServiceInfoFormat builder = BookieServiceInfoFormat.parseFrom(bookieServiceInfo); + BookieServiceInfoFormat builder = new BookieServiceInfoFormat(); + builder.parseFrom(bookieServiceInfo); BookieServiceInfo bsi = new BookieServiceInfo(); List endpoints = builder.getEndpointsList().stream() .map(e -> { @@ -80,14 +76,16 @@ public BookieServiceInfo deserialize(String path, byte[] bookieServiceInfo, Stat endpoint.setPort(e.getPort()); endpoint.setHost(e.getHost()); endpoint.setProtocol(e.getProtocol()); - endpoint.setAuth(e.getAuthList()); + endpoint.setAuth(e.getAuthsList()); endpoint.setExtensions(e.getExtensionsList()); return endpoint; }) .collect(Collectors.toList()); bsi.setEndpoints(endpoints); - bsi.setProperties(builder.getPropertiesMap()); + Map properties = new HashMap<>(); + builder.forEachProperties(properties::put); + bsi.setProperties(properties); return bsi; diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java index 86bb1b85cd1e7..38d7a5cd54a61 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java @@ -20,16 +20,8 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.apache.bookkeeper.proto.DataFormats.CheckAllLedgersFormat; -import static org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat; -import static org.apache.bookkeeper.proto.DataFormats.LockDataFormat; -import static org.apache.bookkeeper.proto.DataFormats.PlacementPolicyCheckFormat; -import static org.apache.bookkeeper.proto.DataFormats.ReplicasCheckFormat; -import static org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat; import static org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT; import com.google.common.base.Joiner; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.TextFormat; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; @@ -56,7 +48,12 @@ import org.apache.bookkeeper.meta.UnderreplicatedLedger; import org.apache.bookkeeper.net.DNS; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; -import org.apache.bookkeeper.proto.DataFormats; +import org.apache.bookkeeper.proto.CheckAllLedgersFormat; +import org.apache.bookkeeper.proto.LedgerRereplicationLayoutFormat; +import org.apache.bookkeeper.proto.LockDataFormat; +import org.apache.bookkeeper.proto.PlacementPolicyCheckFormat; +import org.apache.bookkeeper.proto.ReplicasCheckFormat; +import org.apache.bookkeeper.proto.UnderreplicatedLedgerFormat; import org.apache.bookkeeper.replication.ReplicationEnableCb; import org.apache.bookkeeper.replication.ReplicationException; import org.apache.bookkeeper.util.BookKeeperConstants; @@ -164,23 +161,23 @@ static String getUrLockPath(String rootPath) { } public static byte[] getLockData() { - DataFormats.LockDataFormat.Builder lockDataBuilder = DataFormats.LockDataFormat.newBuilder(); + LockDataFormat lockData = new LockDataFormat(); try { - lockDataBuilder.setBookieId(DNS.getDefaultHost("default")); + lockData.setBookieId(DNS.getDefaultHost("default")); } catch (UnknownHostException uhe) { // if we cant get the address, ignore. it's optional // in the data structure in any case } - return lockDataBuilder.build().toString().getBytes(UTF_8); + return lockData.toTextFormat().getBytes(UTF_8); } private void checkLayout() throws ReplicationException.CompatibilityException { while (true) { if (!store.exists(layoutPath).join()) { - LedgerRereplicationLayoutFormat.Builder builder = LedgerRereplicationLayoutFormat.newBuilder(); - builder.setType(LAYOUT).setVersion(LAYOUT_VERSION); + LedgerRereplicationLayoutFormat layoutFormat = new LedgerRereplicationLayoutFormat(); + layoutFormat.setType(LAYOUT).setVersion(LAYOUT_VERSION); try { - store.put(layoutPath, builder.build().toString().getBytes(UTF_8), Optional.of(-1L)).get(); + store.put(layoutPath, layoutFormat.toTextFormat().getBytes(UTF_8), Optional.of(-1L)).get(); } catch (ExecutionException | InterruptedException e) { if (!(e.getCause() instanceof MetadataStoreException.BadVersionException)) { throw new RuntimeException(e); @@ -189,17 +186,16 @@ private void checkLayout() throws ReplicationException.CompatibilityException { } else { byte[] layoutData = store.get(layoutPath).join().get().getValue(); - LedgerRereplicationLayoutFormat.Builder builder = LedgerRereplicationLayoutFormat.newBuilder(); + LedgerRereplicationLayoutFormat layout = new LedgerRereplicationLayoutFormat(); try { - TextFormat.merge(new String(layoutData, UTF_8), builder); - LedgerRereplicationLayoutFormat layout = builder.build(); + layout.parseFromTextFormat(layoutData); if (!layout.getType().equals(LAYOUT) || layout.getVersion() != LAYOUT_VERSION) { throw new ReplicationException.CompatibilityException( "Incompatible layout found (" + LAYOUT + ":" + LAYOUT_VERSION + ")"); } - } catch (TextFormat.ParseException pe) { + } catch (RuntimeException pe) { throw new ReplicationException.CompatibilityException( "Invalid data found", pe); } @@ -293,12 +289,11 @@ public UnderreplicatedLedger getLedgerUnreplicationInfo(long ledgerId) byte[] data = optRes.get().getValue(); - UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder(); + UnderreplicatedLedgerFormat underreplicatedLedgerFormat = new UnderreplicatedLedgerFormat(); - TextFormat.merge(new String(data, UTF_8), builder); - UnderreplicatedLedgerFormat underreplicatedLedgerFormat = builder.build(); + underreplicatedLedgerFormat.parseFromTextFormat(data); PulsarUnderreplicatedLedger underreplicatedLedger = new PulsarUnderreplicatedLedger(ledgerId); - List replicaList = underreplicatedLedgerFormat.getReplicaList(); + List replicaList = underreplicatedLedgerFormat.getReplicasList(); long ctime = (underreplicatedLedgerFormat.hasCtime() ? underreplicatedLedgerFormat.getCtime() : UnderreplicatedLedger.UNASSIGNED_CTIME); underreplicatedLedger.setCtime(ctime); @@ -309,7 +304,7 @@ public UnderreplicatedLedger getLedgerUnreplicationInfo(long ledgerId) } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while connecting metadata store", ie); - } catch (TextFormat.ParseException pe) { + } catch (RuntimeException pe) { throw new ReplicationException.UnavailableException("Error parsing proto message", pe); } } @@ -327,12 +322,12 @@ public CompletableFuture markLedgerUnderreplicatedAsync(long ledgerId, Col private void tryMarkLedgerUnderreplicatedAsync(final String path, final Collection missingReplicas, final CompletableFuture finalFuture) { - final UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder(); + final UnderreplicatedLedgerFormat builder = new UnderreplicatedLedgerFormat(); if (conf.getStoreSystemTimeAsLedgerUnderreplicatedMarkTime()) { builder.setCtime(System.currentTimeMillis()); } missingReplicas.forEach(builder::addReplica); - final byte[] urLedgerData = builder.build().toString().getBytes(UTF_8); + final byte[] urLedgerData = builder.toTextFormat().getBytes(UTF_8); store.put(path, urLedgerData, Optional.of(-1L)) .thenRun(() -> { FutureUtils.complete(finalFuture, null); @@ -361,20 +356,19 @@ private void handleLedgerUnderreplicatedAlreadyMarked(final String path, byte[] existingUrLedgerData = optRes.get().getValue(); // deserialize existing underreplicated ledger data - final UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder(); + final UnderreplicatedLedgerFormat builder = new UnderreplicatedLedgerFormat(); try { - TextFormat.merge(new String(existingUrLedgerData, UTF_8), builder); - } catch (TextFormat.ParseException e) { + builder.parseFromTextFormat(existingUrLedgerData); + } catch (RuntimeException e) { // corrupted metadata in zookeeper FutureUtils.completeExceptionally(finalFuture, new ReplicationException.UnavailableException( "Invalid underreplicated ledger data for ledger " + path, e)); return; } - UnderreplicatedLedgerFormat existingUrLedgerFormat = builder.build(); boolean replicaAdded = false; for (String missingReplica : missingReplicas) { - if (existingUrLedgerFormat.getReplicaList().contains(missingReplica)) { + if (builder.getReplicasList().contains(missingReplica)) { continue; } else { builder.addReplica(missingReplica); @@ -388,7 +382,7 @@ private void handleLedgerUnderreplicatedAlreadyMarked(final String path, if (conf.getStoreSystemTimeAsLedgerUnderreplicatedMarkTime()) { builder.setCtime(System.currentTimeMillis()); } - final byte[] newUrLedgerData = builder.build().toString().getBytes(UTF_8); + final byte[] newUrLedgerData = builder.toTextFormat().getBytes(UTF_8); store.put(path, newUrLedgerData, Optional.of(optRes.get().getStat().getVersion())) .thenRun(() -> { @@ -883,9 +877,8 @@ public String getReplicationWorkerIdRereplicatingLedger(long ledgerId) } byte[] lockData = optRes.get().getValue(); - LockDataFormat.Builder lockDataBuilder = LockDataFormat.newBuilder(); - TextFormat.merge(new String(lockData, UTF_8), lockDataBuilder); - LockDataFormat lock = lockDataBuilder.build(); + LockDataFormat lock = new LockDataFormat(); + lock.parseFromTextFormat(lockData); return lock.getBookieId(); } catch (ExecutionException | TimeoutException e) { log.error().exception(e).log("Error while getting ReplicationWorkerId rereplicating Ledger"); @@ -895,7 +888,7 @@ public String getReplicationWorkerIdRereplicatingLedger(long ledgerId) log.error().exception(e).log("Got interrupted while getting ReplicationWorkerId rereplicating Ledger"); Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", e); - } catch (TextFormat.ParseException e) { + } catch (RuntimeException e) { log.error().exception(e).log("Error while parsing ZK data of lock"); throw new ReplicationException.UnavailableException("Error while parsing ZK data of lock", e); } @@ -905,9 +898,9 @@ public String getReplicationWorkerIdRereplicatingLedger(long ledgerId) public void setCheckAllLedgersCTime(long checkAllLedgersCTime) throws ReplicationException.UnavailableException { log.debug("setCheckAllLedgersCTime"); try { - CheckAllLedgersFormat.Builder builder = CheckAllLedgersFormat.newBuilder(); + CheckAllLedgersFormat builder = new CheckAllLedgersFormat(); builder.setCheckAllLedgersCTime(checkAllLedgersCTime); - byte[] checkAllLedgersFormatByteArray = builder.build().toByteArray(); + byte[] checkAllLedgersFormatByteArray = builder.toByteArray(); store.put(checkAllLedgersCtimePath, checkAllLedgersFormatByteArray, Optional.empty()) .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); @@ -929,7 +922,8 @@ public long getCheckAllLedgersCTime() throws ReplicationException.UnavailableExc return -1; } byte[] data = optRes.get().getValue(); - CheckAllLedgersFormat checkAllLedgersFormat = CheckAllLedgersFormat.parseFrom(data); + CheckAllLedgersFormat checkAllLedgersFormat = new CheckAllLedgersFormat(); + checkAllLedgersFormat.parseFrom(data); return checkAllLedgersFormat.hasCheckAllLedgersCTime() ? checkAllLedgersFormat.getCheckAllLedgersCTime() : -1; } catch (ExecutionException | TimeoutException ee) { @@ -937,7 +931,7 @@ public long getCheckAllLedgersCTime() throws ReplicationException.UnavailableExc } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); - } catch (InvalidProtocolBufferException ipbe) { + } catch (RuntimeException ipbe) { throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", ipbe); } } @@ -947,9 +941,9 @@ public void setPlacementPolicyCheckCTime(long placementPolicyCheckCTime) throws ReplicationException.UnavailableException { log.debug("setPlacementPolicyCheckCTime"); try { - PlacementPolicyCheckFormat.Builder builder = PlacementPolicyCheckFormat.newBuilder(); + PlacementPolicyCheckFormat builder = new PlacementPolicyCheckFormat(); builder.setPlacementPolicyCheckCTime(placementPolicyCheckCTime); - byte[] placementPolicyCheckFormatByteArray = builder.build().toByteArray(); + byte[] placementPolicyCheckFormatByteArray = builder.toByteArray(); store.put(placementPolicyCheckCtimePath, placementPolicyCheckFormatByteArray, Optional.empty()) .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); } catch (ExecutionException | TimeoutException ke) { @@ -971,7 +965,8 @@ public long getPlacementPolicyCheckCTime() throws ReplicationException.Unavailab return -1; } byte[] data = optRes.get().getValue(); - PlacementPolicyCheckFormat placementPolicyCheckFormat = PlacementPolicyCheckFormat.parseFrom(data); + PlacementPolicyCheckFormat placementPolicyCheckFormat = new PlacementPolicyCheckFormat(); + placementPolicyCheckFormat.parseFrom(data); return placementPolicyCheckFormat.hasPlacementPolicyCheckCTime() ? placementPolicyCheckFormat.getPlacementPolicyCheckCTime() : -1; } catch (ExecutionException | TimeoutException ee) { @@ -979,7 +974,7 @@ public long getPlacementPolicyCheckCTime() throws ReplicationException.Unavailab } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); - } catch (InvalidProtocolBufferException ipbe) { + } catch (RuntimeException ipbe) { throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", ipbe); } } @@ -987,9 +982,9 @@ public long getPlacementPolicyCheckCTime() throws ReplicationException.Unavailab @Override public void setReplicasCheckCTime(long replicasCheckCTime) throws ReplicationException.UnavailableException { try { - ReplicasCheckFormat.Builder builder = ReplicasCheckFormat.newBuilder(); + ReplicasCheckFormat builder = new ReplicasCheckFormat(); builder.setReplicasCheckCTime(replicasCheckCTime); - byte[] replicasCheckFormatByteArray = builder.build().toByteArray(); + byte[] replicasCheckFormatByteArray = builder.toByteArray(); store.put(replicasCheckCtimePath, replicasCheckFormatByteArray, Optional.empty()) .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); log.debug("setReplicasCheckCTime completed successfully"); @@ -1011,7 +1006,8 @@ public long getReplicasCheckCTime() throws ReplicationException.UnavailableExcep return -1; } byte[] data = optRes.get().getValue(); - ReplicasCheckFormat replicasCheckFormat = ReplicasCheckFormat.parseFrom(data); + ReplicasCheckFormat replicasCheckFormat = new ReplicasCheckFormat(); + replicasCheckFormat.parseFrom(data); log.debug("getReplicasCheckCTime completed successfully"); return replicasCheckFormat.hasReplicasCheckCTime() ? replicasCheckFormat.getReplicasCheckCTime() : -1; } catch (ExecutionException | TimeoutException ee) { @@ -1019,7 +1015,7 @@ public long getReplicasCheckCTime() throws ReplicationException.UnavailableExcep } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); - } catch (InvalidProtocolBufferException ipbe) { + } catch (RuntimeException ipbe) { throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", ipbe); } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java index aba28ec1fc357..2c9571ea64e7f 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java @@ -35,10 +35,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import lombok.CustomLog; +import org.apache.bookkeeper.common.util.MathUtils; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.util.MathUtils; import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; import org.apache.bookkeeper.zookeeper.RetryPolicy; import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase; diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/FaultInjectableZKRegistrationManager.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/FaultInjectableZKRegistrationManager.java index d82fa1f76a6f5..4de78b8f507f2 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/FaultInjectableZKRegistrationManager.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/FaultInjectableZKRegistrationManager.java @@ -27,7 +27,6 @@ import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -37,7 +36,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import java.util.stream.Collectors; import lombok.CustomLog; import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.bookie.BookieException.BookieIllegalOpException; @@ -60,7 +58,7 @@ import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager; import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; import org.apache.bookkeeper.net.BookieId; -import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat; +import org.apache.bookkeeper.proto.BookieServiceInfoFormat; import org.apache.bookkeeper.util.BookKeeperConstants; import org.apache.bookkeeper.util.ZkUtils; import org.apache.bookkeeper.versioning.LongVersion; @@ -238,30 +236,19 @@ public void registerBookie(BookieId bookieId, boolean readOnly, @VisibleForTesting static byte[] serializeBookieServiceInfo(BookieServiceInfo bookieServiceInfo) { log.debug().attr("bookieServiceInfo", bookieServiceInfo).log("serialize BookieServiceInfo"); - try (ByteArrayOutputStream os = new ByteArrayOutputStream()) { - BookieServiceInfoFormat.Builder builder = BookieServiceInfoFormat.newBuilder(); - List bsiEndpoints = bookieServiceInfo.getEndpoints().stream() - .map(e -> { - return BookieServiceInfoFormat.Endpoint.newBuilder() - .setId(e.getId()) - .setPort(e.getPort()) - .setHost(e.getHost()) - .setProtocol(e.getProtocol()) - .addAllAuth(e.getAuth()) - .addAllExtensions(e.getExtensions()) - .build(); - }) - .collect(Collectors.toList()); - - builder.addAllEndpoints(bsiEndpoints); - builder.putAllProperties(bookieServiceInfo.getProperties()); - - builder.build().writeTo(os); - return os.toByteArray(); - } catch (IOException err) { - log.error("Cannot serialize bookieServiceInfo from " + bookieServiceInfo); - throw new RuntimeException(err); + BookieServiceInfoFormat builder = new BookieServiceInfoFormat(); + for (BookieServiceInfo.Endpoint e : bookieServiceInfo.getEndpoints()) { + builder.addEndpoint() + .setId(e.getId()) + .setPort(e.getPort()) + .setHost(e.getHost()) + .setProtocol(e.getProtocol()) + .addAllAuths(e.getAuth()) + .addAllExtensions(e.getExtensions()); } + bookieServiceInfo.getProperties().forEach(builder::putProperties); + + return builder.toByteArray(); } private void doRegisterBookie(String regPath, BookieServiceInfo bookieServiceInfo) throws BookieException { diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerManagerIteratorTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerManagerIteratorTest.java index c476146f4d550..16295d8c04119 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerManagerIteratorTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerManagerIteratorTest.java @@ -49,11 +49,11 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerMetadataBuilder; import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.common.util.MathUtils; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.net.BookieSocketAddress; -import org.apache.bookkeeper.util.MathUtils; import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Versioned; import org.apache.pulsar.common.util.FutureUtil; diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java index 165ce82dd7c1f..205d617c4aca1 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java @@ -22,9 +22,7 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; -import com.google.protobuf.TextFormat; import java.lang.reflect.Field; -import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -52,7 +50,7 @@ import org.apache.bookkeeper.meta.UnderreplicatedLedger; import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager; import org.apache.bookkeeper.net.DNS; -import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat; +import org.apache.bookkeeper.proto.UnderreplicatedLedgerFormat; import org.apache.bookkeeper.replication.ReplicationException.UnavailableException; import org.apache.bookkeeper.util.BookKeeperConstants; import org.apache.commons.lang3.StringUtils; @@ -500,11 +498,10 @@ public void test2reportSame(String provider, Supplier urlSupplier) m2.markLedgerUnderreplicated(ledgerA, missingReplica1); // verify duplicate missing replica - UnderreplicatedLedgerFormat.Builder builderA = UnderreplicatedLedgerFormat - .newBuilder(); + UnderreplicatedLedgerFormat builderA = new UnderreplicatedLedgerFormat(); byte[] data = store.get(getUrLedgerZnode(ledgerA)).join().get().getValue(); - TextFormat.merge(new String(data, Charset.forName("UTF-8")), builderA); - List replicaList = builderA.getReplicaList(); + builderA.parseFromTextFormat(data); + List replicaList = builderA.getReplicasList(); assertEquals(replicaList.size(), 1, "Published duplicate missing replica : " + replicaList); assertTrue(replicaList.contains(missingReplica1), "Published duplicate missing replica : " + replicaList); @@ -774,12 +771,11 @@ private void verifyMarkLedgerUnderreplicated(Collection missingReplica) } String urLedgerA = new String(store.get(znodeA).join().get().getValue()); - UnderreplicatedLedgerFormat.Builder builderA = UnderreplicatedLedgerFormat - .newBuilder(); + UnderreplicatedLedgerFormat builderA = new UnderreplicatedLedgerFormat(); for (String replica : missingReplica) { builderA.addReplica(replica); } - List replicaList = builderA.getReplicaList(); + List replicaList = builderA.getReplicasList(); for (String replica : missingReplica) { assertTrue(replicaList.contains(replica), diff --git a/settings.gradle.kts b/settings.gradle.kts index f8f16f386039f..45f9d487fc221 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -35,6 +35,19 @@ dependencyResolutionManagement { includeGroupByRegex("io\\.confluent(\\..*)?") } } + // BookKeeper 4.18.0 staging repository (release candidate, not yet on Maven Central). + // Remove once BookKeeper 4.18.0 is released to Maven Central. + maven { + name = "bk-staging" + url = uri("https://repository.apache.org/content/repositories/orgapachebookkeeper-1105/") + mavenContent { + releasesOnly() + } + content { + includeGroupByRegex("org\\.apache\\.bookkeeper(\\..*)?") + includeGroupByRegex("org\\.apache\\.distributedlog(\\..*)?") + } + } } // override docker-jdk version with -PdockerJavaVersion=21|25 diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java index d86ec1034d588..40395d79c8d5b 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java @@ -38,8 +38,7 @@ import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry; import org.apache.bookkeeper.net.BookieId; -import org.apache.bookkeeper.proto.DataFormats; -import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat; +import org.apache.bookkeeper.proto.LedgerMetadataFormat; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; @CustomLog @@ -181,7 +180,7 @@ private static class InternalLedgerMetadata implements LedgerMetadata { private int ackQuorumSize; private long lastEntryId; private long length; - private DataFormats.LedgerMetadataFormat.DigestType digestType; + private LedgerMetadataFormat.DigestType digestType; private long ctime; private State state; private Map customMetadata = Maps.newHashMap(); @@ -199,14 +198,14 @@ private static class InternalLedgerMetadata implements LedgerMetadata { this.state = org.apache.bookkeeper.client.api.LedgerMetadata.State.valueOf( ledgerMetadataFormat.getState().toString()); - if (ledgerMetadataFormat.getCustomMetadataCount() > 0) { - ledgerMetadataFormat.getCustomMetadataList().forEach( - entry -> this.customMetadata.put(entry.getKey(), entry.getValue().toByteArray())); + if (ledgerMetadataFormat.getCustomMetadatasCount() > 0) { + ledgerMetadataFormat.getCustomMetadatasList().forEach( + entry -> this.customMetadata.put(entry.getKey(), entry.getValue())); } - ledgerMetadataFormat.getSegmentList().forEach(segment -> { + ledgerMetadataFormat.getSegmentsList().forEach(segment -> { ArrayList addressArrayList = new ArrayList<>(); - segment.getEnsembleMemberList().forEach(address -> { + segment.getEnsembleMembersList().forEach(address -> { try { addressArrayList.add(BookieId.parse(address)); } catch (IllegalArgumentException e) { @@ -325,9 +324,9 @@ public String toSafeString() { } private static LedgerMetadata parseLedgerMetadata(byte[] bytes) throws IOException { - LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder(); - builder.mergeFrom(bytes); - return new InternalLedgerMetadata(builder.build()); + LedgerMetadataFormat builder = new LedgerMetadataFormat(); + builder.parseFrom(bytes); + return new InternalLedgerMetadata(builder); } private OffloadIndexBlock fromStream(DataInputStream dis) throws IOException {