@@ -23,6 +23,8 @@
)
from google.cloud.storage import __version__

_DEFAULT_HOST = "storage.googleapis.com"


class AsyncGrpcClient:
"""An asynchronous client for interacting with Google Cloud Storage using the gRPC API.
@@ -109,7 +111,15 @@ def _create_async_grpc_client(

primary_user_agent = client_info.to_user_agent()

host = _DEFAULT_HOST
quota_project_id = None
if client_options:
host = getattr(client_options, "api_endpoint", None) or _DEFAULT_HOST
quota_project_id = getattr(client_options, "quota_project_id", None)

channel = transport_cls.create_channel(
host=host,
quota_project_id=quota_project_id,
attempt_direct_path=attempt_direct_path,
credentials=credentials,
options=(("grpc.primary_user_agent", primary_user_agent),),
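For context, a minimal sketch of how the new client_options path above might be exercised. ClientOptions is the real google.api_core type; passing it straight through an AsyncGrpcClient(client_options=...) keyword is assumed from the surrounding diff rather than confirmed:

# Sketch only: exercising the client_options handling above.
# ClientOptions is the real google.api_core type; the AsyncGrpcClient
# constructor keyword is an assumption based on this diff.
from google.api_core.client_options import ClientOptions

options = ClientOptions(
    api_endpoint="private.googleapis.com",  # overrides _DEFAULT_HOST
    quota_project_id="my-billing-project",  # forwarded to create_channel
)
client = AsyncGrpcClient(client_options=options)

# With no client_options, host falls back to "storage.googleapis.com"
# and quota_project_id stays None.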
@@ -46,13 +46,29 @@ def publish_benchmark_extra_info(
benchmark.extra_info["bucket_name"] = params.bucket_name
benchmark.extra_info["bucket_type"] = params.bucket_type
benchmark.extra_info["processes"] = params.num_processes
benchmark.extra_info["num_downloads_after_open"] = params.num_downloads_after_open
benchmark.extra_info["ignore_first_download"] = params.ignore_first_download
benchmark.group = benchmark_group

if download_bytes_list is not None:
assert duration is not None, (
"Duration must be provided if total_bytes_transferred is provided."
)
effective_downloads = params.num_downloads_after_open
if params.ignore_first_download:
effective_downloads -= 1
effective_downloads = max(1, effective_downloads)

if params.pattern == "whole":
# duration is total time for all rounds
duration_per_round = duration / len(download_bytes_list)
throughputs_list = [
(x / duration_per_round / effective_downloads) / (1024 * 1024)
for x in download_bytes_list
]
else:
# duration is time per round
throughputs_list = [x / duration / (1024 * 1024) for x in download_bytes_list]
min_throughput = min(throughputs_list)
max_throughput = max(throughputs_list)
mean_throughput = statistics.mean(throughputs_list)
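A worked example of the throughput arithmetic above, with assumed inputs: one round of a "whole" workload, num_downloads_after_open=3 and ignore_first_download=True, so two downloads are counted:

# Worked example of the "whole"-pattern throughput math (assumed inputs).
download_bytes_list = [2 * 10 * 1024**3]  # 20 GiB counted for one worker
duration = 40.0                           # total seconds across all rounds
num_downloads_after_open = 3
ignore_first_download = True

effective_downloads = max(
    1, num_downloads_after_open - (1 if ignore_first_download else 0)
)  # -> 2
duration_per_round = duration / len(download_bytes_list)  # -> 40.0
throughputs_list = [
    (x / duration_per_round / effective_downloads) / (1024 * 1024)
    for x in download_bytes_list
]
print(throughputs_list)  # [256.0] MiB/s per counted download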
@@ -0,0 +1,105 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import os
from typing import Dict, List

import yaml

try:
from tests.perf.microbenchmarks.time_based.reads_regional.parameters import (
TimeBasedReadParameters,
)
except ModuleNotFoundError:
from reads_regional.parameters import TimeBasedReadParameters


def _get_params() -> Dict[str, List[TimeBasedReadParameters]]:
"""Generates a dictionary of benchmark parameters for time based read operations."""
params: Dict[str, List[TimeBasedReadParameters]] = {}
config_path = os.path.join(os.path.dirname(__file__), "config.yaml")
with open(config_path, "r") as f:
Contributor comment (severity: medium): It is recommended to specify an explicit encoding (e.g., encoding="utf-8") when opening files to ensure consistent behavior across different platforms and locales.

Suggested change:
-    with open(config_path, "r") as f:
+    with open(config_path, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)

common_params = config["common"]
read_types = common_params["read_types"]
file_sizes = common_params["file_sizes"]
chunk_sizes_kib = common_params["chunk_sizes_kib"]
num_ranges = common_params["num_ranges"]
rounds = common_params["rounds"]
duration = common_params["duration"]
warmup_duration = common_params["warmup_duration"]
num_downloads_after_open = common_params["num_downloads_after_open"]
ignore_first_download = common_params["ignore_first_download"]

# All read types use the same regional bucket
bucket_name = os.environ.get(
"DEFAULT_STANDARD_BUCKET", config["defaults"]["DEFAULT_STANDARD_BUCKET"]
)

for workload in config["workload"]:
workload_name = workload["name"]
params[workload_name] = []
pattern = workload["pattern"]
processes = workload["processes"]
coros = workload["coros"]

# Create a product of all parameter combinations
product = itertools.product(
read_types,
file_sizes,
chunk_sizes_kib,
num_ranges,
processes,
coros,
)

for (
read_type,
file_size,
chunk_size_kib,
num_ranges_val,
num_processes,
num_coros,
) in product:
file_size_bytes = file_size
chunk_size_bytes = chunk_size_kib * 1024

num_files = num_processes

            # Create a descriptive name for the parameter set (integer division
            # keeps the size readable, e.g. "10240MiB" rather than "10240.0MiB").
            name = f"{pattern}_{read_type}_{num_processes}p_{num_coros}c_{file_size // (1024 * 1024)}MiB_{chunk_size_kib}KiB_{num_ranges_val}ranges"

params[workload_name].append(
TimeBasedReadParameters(
name=name,
workload_name=workload_name,
pattern=pattern,
bucket_name=bucket_name,
bucket_type="regional",
read_type=read_type,
num_coros=num_coros,
num_processes=num_processes,
num_files=num_files,
rounds=rounds,
chunk_size_bytes=chunk_size_bytes,
file_size_bytes=file_size_bytes,
duration=duration,
warmup_duration=warmup_duration,
num_ranges=num_ranges_val,
num_downloads_after_open=num_downloads_after_open,
ignore_first_download=ignore_first_download,
)
)
return params
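Illustrative only: one way the generated grid could be inspected, relying solely on the dict shape established by _get_params above:

# Illustrative only: walk the parameter grid produced by _get_params().
if __name__ == "__main__":
    for workload_name, param_list in _get_params().items():
        print(f"{workload_name}: {len(param_list)} parameter sets")
        for p in param_list:
            print(f"  {p.name} (pattern={p.pattern}, duration={p.duration}s)")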
@@ -0,0 +1,34 @@
common:
read_types:
- "async_json"
- "async_grpc_dp"
- "async_grpc_cp"
file_sizes:
- 10737418240 # 10GiB in bytes
chunk_sizes_kib: [64]
num_ranges: [1]
rounds: 1
num_downloads_after_open: 3
ignore_first_download: true
duration: 30 # seconds
warmup_duration: 5 # seconds

workload:
############# multi process multi coroutine #########
- name: "read_seq_multi_process"
pattern: "seq"
coros: [1]
processes: [96]

- name: "read_rand_multi_process"
pattern: "rand"
coros: [1, 16]
processes: [1]

- name: "read_whole_multi_process"
pattern: "whole"
coros: [1]
processes: [1]

defaults:
DEFAULT_STANDARD_BUCKET: "chandrasiri-benchmarks-rb"
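For reference, each workload's grid size is the product of the common lists with its own coros and processes lists; a quick sketch with the values copied from this YAML:

# Grid sizes implied by the YAML above (values copied by hand).
common = 3 * 1 * 1 * 1  # read_types x file_sizes x chunk_sizes_kib x num_ranges
print(common * 1 * 1)   # read_seq_multi_process:   coros [1]     x processes [96] -> 3
print(common * 2 * 1)   # read_rand_multi_process:  coros [1, 16] x processes [1]  -> 6
print(common * 1 * 1)   # read_whole_multi_process: coros [1]     x processes [1]  -> 3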
@@ -0,0 +1,27 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass

from tests.perf.microbenchmarks.parameters import IOBenchmarkParameters


@dataclass
class TimeBasedReadParameters(IOBenchmarkParameters):
pattern: str
duration: int
warmup_duration: int
num_ranges: int
read_type: str
num_downloads_after_open: int
ignore_first_download: bool