diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc index ffeba9eadc3..86b7dd58c24 100644 --- a/cpp/src/arrow/filesystem/gcsfs.cc +++ b/cpp/src/arrow/filesystem/gcsfs.cc @@ -334,8 +334,8 @@ class GcsRandomAccessFile : public arrow::io::RandomAccessFile { class GcsFileSystem::Impl { public: - explicit Impl(GcsOptions o) - : options_(std::move(o)), client_(internal::AsGoogleCloudOptions(options_)) {} + explicit Impl(GcsOptions o, const io::IOContext& io_context) + : options_(std::move(o)), client_(internal::AsGoogleCloudOptions(options_, &io_context)) {} const GcsOptions& options() const { return options_; } @@ -975,7 +975,7 @@ Result> GcsFileSystem::Make( } GcsFileSystem::GcsFileSystem(const GcsOptions& options, const io::IOContext& context) - : FileSystem(context), impl_(std::make_shared(options)) {} + : FileSystem(context), impl_(std::make_shared(options, context)) {} } // namespace fs } // namespace arrow diff --git a/cpp/src/arrow/filesystem/gcsfs_internal.cc b/cpp/src/arrow/filesystem/gcsfs_internal.cc index 038fe3097b1..49c30d8fde3 100644 --- a/cpp/src/arrow/filesystem/gcsfs_internal.cc +++ b/cpp/src/arrow/filesystem/gcsfs_internal.cc @@ -25,6 +25,10 @@ #include // NOLINT +#include "arrow/io/interfaces.h" +#include "arrow/io/type_fwd.h" +#include "arrow/util/thread_pool.h" + #include "arrow/filesystem/path_util.h" #include "arrow/util/io_util.h" #include "arrow/util/key_value_metadata.h" @@ -314,7 +318,7 @@ std::int64_t Depth(std::string_view path) { // client library will upload this buffer with zero copies if possible. auto constexpr kUploadBufferSize = 256 * 1024; -google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) { +google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o, const io::IOContext* io_context) { auto options = google::cloud::Options{}; std::string scheme = o.scheme; if (scheme.empty()) scheme = "https"; @@ -342,6 +346,15 @@ google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) { if (o.project_id.has_value()) { options.set(*o.project_id); } + + if (io_context && io_context->executor()) { + options.set( + std::max(io_context->executor()->GetCapacity(), 4)); + } else { + options.set( + std::max(::arrow::io::GetIOThreadPoolCapacity(), 4)); + } + return options; } diff --git a/cpp/src/arrow/filesystem/gcsfs_internal.h b/cpp/src/arrow/filesystem/gcsfs_internal.h index c1fd7891c38..741d8e9fe13 100644 --- a/cpp/src/arrow/filesystem/gcsfs_internal.h +++ b/cpp/src/arrow/filesystem/gcsfs_internal.h @@ -65,7 +65,8 @@ ARROW_EXPORT Result> FromObjectMetadata( ARROW_EXPORT std::int64_t Depth(std::string_view path); -ARROW_EXPORT google::cloud::Options AsGoogleCloudOptions(const GcsOptions& options); +ARROW_EXPORT google::cloud::Options AsGoogleCloudOptions(const GcsOptions& options, + const io::IOContext* io_context = nullptr); } // namespace internal } // namespace fs diff --git a/cpp/src/arrow/filesystem/gcsfs_test.cc b/cpp/src/arrow/filesystem/gcsfs_test.cc index e174638b535..949de9bc190 100644 --- a/cpp/src/arrow/filesystem/gcsfs_test.cc +++ b/cpp/src/arrow/filesystem/gcsfs_test.cc @@ -37,6 +37,7 @@ #include "arrow/testing/util.h" #include "arrow/util/future.h" #include "arrow/util/key_value_metadata.h" +#include "arrow/util/thread_pool.h" namespace arrow { namespace fs { @@ -423,22 +424,45 @@ TEST(GcsFileSystem, OptionsAsGoogleCloudOptions) { a.retry_limit_seconds = 40.5; a.project_id = "test-only-invalid-project-id"; - const auto o1 = internal::AsGoogleCloudOptions(a); + auto io_context = io::default_io_context(); + const auto o1 = internal::AsGoogleCloudOptions(a, &io_context); EXPECT_TRUE(o1.has()); EXPECT_TRUE(o1.has()); EXPECT_EQ(o1.get(), "http://localhost:8080"); EXPECT_EQ(o1.get(), "test-only-invalid-project-id"); + EXPECT_TRUE(o1.has()); + EXPECT_EQ(o1.get(), std::max(io_context.executor()->GetCapacity(), 4)); a.scheme.clear(); a.endpoint_override.clear(); a.retry_limit_seconds.reset(); a.project_id.reset(); - const auto o2 = internal::AsGoogleCloudOptions(a); + const auto o2 = internal::AsGoogleCloudOptions(a, &io_context); EXPECT_TRUE(o2.has()); EXPECT_FALSE(o2.has()); EXPECT_FALSE(o2.has()); EXPECT_FALSE(o2.has()); + EXPECT_TRUE(o2.has()); + EXPECT_EQ(o2.get(), std::max(io_context.executor()->GetCapacity(), 4)); +} + +TEST(GcsFileSystem, OptionsConnectionPoolSizeFallback) { + auto a = GcsOptions::Anonymous(); + + // Without passing io_context, it should fallback to arrow::io::GetIOThreadPoolCapacity() + int initial_capacity = arrow::io::GetIOThreadPoolCapacity(); + const auto o1 = internal::AsGoogleCloudOptions(a); + EXPECT_TRUE(o1.has()); + EXPECT_EQ(o1.get(), std::max(initial_capacity, 4)); + + // Change the thread pool capacity and verify mapping reflects changes + ASSERT_OK(arrow::io::SetIOThreadPoolCapacity(initial_capacity + 10)); + const auto o2 = internal::AsGoogleCloudOptions(a); + EXPECT_EQ(o2.get(), std::max(initial_capacity + 10, 4)); + + // Restore the original pool capacity + ASSERT_OK(arrow::io::SetIOThreadPoolCapacity(initial_capacity)); } TEST(GcsFileSystem, ToArrowStatusOK) {