Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cpp/src/arrow/filesystem/gcsfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,8 @@ class GcsRandomAccessFile : public arrow::io::RandomAccessFile {

class GcsFileSystem::Impl {
public:
explicit Impl(GcsOptions o)
: options_(std::move(o)), client_(internal::AsGoogleCloudOptions(options_)) {}
explicit Impl(GcsOptions o, const io::IOContext& io_context)
: options_(std::move(o)), client_(internal::AsGoogleCloudOptions(options_, &io_context)) {}

const GcsOptions& options() const { return options_; }

Expand Down Expand Up @@ -975,7 +975,7 @@ Result<std::shared_ptr<GcsFileSystem>> GcsFileSystem::Make(
}

GcsFileSystem::GcsFileSystem(const GcsOptions& options, const io::IOContext& context)
: FileSystem(context), impl_(std::make_shared<Impl>(options)) {}
: FileSystem(context), impl_(std::make_shared<Impl>(options, context)) {}

} // namespace fs
} // namespace arrow
15 changes: 14 additions & 1 deletion cpp/src/arrow/filesystem/gcsfs_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@

#include <absl/time/time.h> // NOLINT

#include "arrow/io/interfaces.h"
#include "arrow/io/type_fwd.h"
#include "arrow/util/thread_pool.h"

#include "arrow/filesystem/path_util.h"
#include "arrow/util/io_util.h"
#include "arrow/util/key_value_metadata.h"
Expand Down Expand Up @@ -314,7 +318,7 @@ std::int64_t Depth(std::string_view path) {
// client library will upload this buffer with zero copies if possible.
auto constexpr kUploadBufferSize = 256 * 1024;

google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) {
google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o, const io::IOContext* io_context) {
auto options = google::cloud::Options{};
std::string scheme = o.scheme;
if (scheme.empty()) scheme = "https";
Expand Down Expand Up @@ -342,6 +346,15 @@ google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) {
if (o.project_id.has_value()) {
options.set<gcs::ProjectIdOption>(*o.project_id);
}

if (io_context && io_context->executor()) {
options.set<gcs::ConnectionPoolSizeOption>(
std::max(io_context->executor()->GetCapacity(), 4));
} else {
options.set<gcs::ConnectionPoolSizeOption>(
std::max(::arrow::io::GetIOThreadPoolCapacity(), 4));
}

return options;
}

Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/filesystem/gcsfs_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ ARROW_EXPORT Result<std::shared_ptr<const KeyValueMetadata>> FromObjectMetadata(

ARROW_EXPORT std::int64_t Depth(std::string_view path);

ARROW_EXPORT google::cloud::Options AsGoogleCloudOptions(const GcsOptions& options);
ARROW_EXPORT google::cloud::Options AsGoogleCloudOptions(const GcsOptions& options,
const io::IOContext* io_context = nullptr);

} // namespace internal
} // namespace fs
Expand Down
28 changes: 26 additions & 2 deletions cpp/src/arrow/filesystem/gcsfs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "arrow/testing/util.h"
#include "arrow/util/future.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/thread_pool.h"

namespace arrow {
namespace fs {
Expand Down Expand Up @@ -423,22 +424,45 @@ TEST(GcsFileSystem, OptionsAsGoogleCloudOptions) {
a.retry_limit_seconds = 40.5;
a.project_id = "test-only-invalid-project-id";

const auto o1 = internal::AsGoogleCloudOptions(a);
auto io_context = io::default_io_context();
const auto o1 = internal::AsGoogleCloudOptions(a, &io_context);
EXPECT_TRUE(o1.has<google::cloud::UnifiedCredentialsOption>());
EXPECT_TRUE(o1.has<gcs::RetryPolicyOption>());
EXPECT_EQ(o1.get<gcs::RestEndpointOption>(), "http://localhost:8080");
EXPECT_EQ(o1.get<gcs::ProjectIdOption>(), "test-only-invalid-project-id");
EXPECT_TRUE(o1.has<gcs::ConnectionPoolSizeOption>());
EXPECT_EQ(o1.get<gcs::ConnectionPoolSizeOption>(), std::max(io_context.executor()->GetCapacity(), 4));

a.scheme.clear();
a.endpoint_override.clear();
a.retry_limit_seconds.reset();
a.project_id.reset();

const auto o2 = internal::AsGoogleCloudOptions(a);
const auto o2 = internal::AsGoogleCloudOptions(a, &io_context);
EXPECT_TRUE(o2.has<google::cloud::UnifiedCredentialsOption>());
EXPECT_FALSE(o2.has<gcs::RetryPolicyOption>());
EXPECT_FALSE(o2.has<gcs::RestEndpointOption>());
EXPECT_FALSE(o2.has<gcs::ProjectIdOption>());
EXPECT_TRUE(o2.has<gcs::ConnectionPoolSizeOption>());
EXPECT_EQ(o2.get<gcs::ConnectionPoolSizeOption>(), std::max(io_context.executor()->GetCapacity(), 4));
}

TEST(GcsFileSystem, OptionsConnectionPoolSizeFallback) {
auto a = GcsOptions::Anonymous();

// Without passing io_context, it should fallback to arrow::io::GetIOThreadPoolCapacity()
int initial_capacity = arrow::io::GetIOThreadPoolCapacity();
const auto o1 = internal::AsGoogleCloudOptions(a);
EXPECT_TRUE(o1.has<gcs::ConnectionPoolSizeOption>());
EXPECT_EQ(o1.get<gcs::ConnectionPoolSizeOption>(), std::max(initial_capacity, 4));

// Change the thread pool capacity and verify mapping reflects changes
ASSERT_OK(arrow::io::SetIOThreadPoolCapacity(initial_capacity + 10));
const auto o2 = internal::AsGoogleCloudOptions(a);
EXPECT_EQ(o2.get<gcs::ConnectionPoolSizeOption>(), std::max(initial_capacity + 10, 4));

// Restore the original pool capacity
ASSERT_OK(arrow::io::SetIOThreadPoolCapacity(initial_capacity));
}

TEST(GcsFileSystem, ToArrowStatusOK) {
Expand Down