From e8a677d9c0961d5c3e64d36d1d50371a3c244d4c Mon Sep 17 00:00:00 2001 From: Yaniv Michael Kaul Date: Sat, 24 Jan 2026 13:21:19 +0200 Subject: [PATCH 1/3] (improvement) unit tests for benchmarking query planning. Not a very scientific one, but reasonable to get some measurements in terms of how different optimizations work. Example run (on https://github.com/scylladb/python-driver/pull/650 branch): ykaul@ykaul:~/github/python-driver$ pytest -s tests/unit/test_policy_performance.py /usr/lib/python3.14/site-packages/pytest_asyncio/plugin.py:211: PytestDeprecationWarning: The configuration option "asyncio_default_fixture_loop_scope" is unset. The event loop scope for asynchronous fixtures will default to the fixture caching scope. Future versions of pytest-asyncio will default the loop scope for asynchronous fixtures to function scope. Set the default fixture loop scope explicitly in order to avoid unexpected behavior in the future. Valid fixture loop scopes are: "function", "class", "module", "package", "session" warnings.warn(PytestDeprecationWarning(_DEFAULT_FIXTURE_LOOP_SCOPE_UNSET)) ============================================================================================================ test session starts ============================================================================================================= platform linux -- Python 3.14.2, pytest-8.3.5, pluggy-1.6.0 rootdir: /home/ykaul/github/python-driver configfile: pyproject.toml plugins: asyncio-1.1.0, anyio-4.12.1 asyncio: mode=Mode.STRICT, asyncio_default_fixture_loop_scope=None, asyncio_default_test_loop_scope=function collected 4 items tests/unit/test_policy_performance.py Pinned to CPU 0 .... 
=== Performance Benchmarks === Policy | Ops | Time (s) | Kops/s ---------------------------------------------------------------------- DCAware | 100000 | 0.2328 | 429 RackAware | 100000 | 0.3637 | 274 TokenAware(DCAware) | 100000 | 1.5884 | 62 TokenAware(RackAware) | 100000 | 1.6816 | 59 ---------------------------------------------------------------------- Signed-off-by: Yaniv Kaul --- tests/unit/test_policy_performance.py | 214 ++++++++++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100644 tests/unit/test_policy_performance.py diff --git a/tests/unit/test_policy_performance.py b/tests/unit/test_policy_performance.py new file mode 100644 index 0000000000..ecb31ca939 --- /dev/null +++ b/tests/unit/test_policy_performance.py @@ -0,0 +1,214 @@ +import unittest +import time +import uuid +import struct +import os +import statistics +from unittest.mock import Mock + +from cassandra.policies import ( + DCAwareRoundRobinPolicy, + RackAwareRoundRobinPolicy, + TokenAwarePolicy, + DefaultLoadBalancingPolicy, + HostFilterPolicy +) +from cassandra.pool import Host +from cassandra.cluster import SimpleConvictionPolicy + +# Mock for Connection/EndPoint since Host expects it +class MockEndPoint(object): + __slots__ = ('address',) + + def __init__(self, address): + self.address = address + def __str__(self): + return self.address + +class MockStatement(object): + __slots__ = ('routing_key', 'keyspace', 'table') + + def __init__(self, routing_key, keyspace="ks", table="tbl"): + self.routing_key = routing_key + self.keyspace = keyspace + self.table = table + + def is_lwt(self): + return False + +class MockTokenMap(object): + __slots__ = ('token_class', 'get_replicas_func') + def __init__(self, get_replicas_func): + self.token_class = Mock() + self.token_class.from_key = lambda k: k + self.get_replicas_func = get_replicas_func + + def get_replicas(self, keyspace, token): + return self.get_replicas_func(keyspace, token) + +class MockTablets(object): + __slots__ = () + 
def get_tablet_for_key(self, keyspace, table, key): + return None + +class MockMetadata(object): + __slots__ = ('_tablets', 'token_map', 'get_replicas_func', 'hosts_by_address') + def __init__(self, get_replicas_func, hosts_by_address): + self._tablets = MockTablets() + self.token_map = MockTokenMap(get_replicas_func) + self.get_replicas_func = get_replicas_func + self.hosts_by_address = hosts_by_address + + def can_support_partitioner(self): + return True + + def get_replicas(self, keyspace, key): + return self.get_replicas_func(keyspace, key) + + def get_host(self, addr): + return self.hosts_by_address.get(addr) + +class MockCluster(object): + __slots__ = ('metadata',) + def __init__(self, metadata): + self.metadata = metadata + +class TestPolicyPerformance(unittest.TestCase): + @classmethod + def setUpClass(cls): + if hasattr(os, 'sched_setaffinity'): + try: + # Pin to the first available CPU + cpu = list(os.sched_getaffinity(0))[0] + os.sched_setaffinity(0, {cpu}) + print(f"Pinned to CPU {cpu}") + except Exception as e: + print(f"Could not pin CPU: {e}") + + # 1. Topology: 5 DCs, 3 Racks/DC, 3 Nodes/Rack = 45 Nodes + cls.hosts = [] + cls.hosts_map = {} # host_id -> Host + cls.replicas_map = {} # routing_key -> list of replica hosts + + # Deterministic generation + dcs = ['dc{}'.format(i) for i in range(5)] + racks = ['rack{}'.format(i) for i in range(3)] + nodes_per_rack = 3 + + ip_counter = 0 + subnet_counter = 0 + for dc in dcs: + for rack in racks: + subnet_counter += 1 + for node_idx in range(nodes_per_rack): + ip_counter += 1 + address = "127.0.{}.{}".format(subnet_counter, node_idx + 1) + h_id = uuid.UUID(int=ip_counter) + h = Host(MockEndPoint(address), SimpleConvictionPolicy, host_id=h_id) + h.set_location_info(dc, rack) + cls.hosts.append(h) + cls.hosts_map[h_id] = h + + # 2. 
Queries: 100,000 deterministic queries + cls.query_count = 100000 + cls.queries = [] + cls.results = [] + # We'll use simple packed integers as routing keys + for i in range(cls.query_count): + key = struct.pack('>I', i) + cls.queries.append(MockStatement(routing_key=key)) + + # Pre-calculate replicas for TokenAware: + # Deterministically pick 3 replicas based on the key index + # This simulates the metadata.get_replicas behavior + # We pick index i, i+1, i+2 mod 45 + replicas = [] + for r in range(3): + idx = (i + r) % len(cls.hosts) + replicas.append(cls.hosts[idx]) + cls.replicas_map[key] = replicas + + def _get_replicas_side_effect(self, keyspace, key): + return self.replicas_map.get(key, []) + + def _setup_cluster_mock(self): + hosts_by_address = {} + for host in self.hosts: + addr = getattr(host, 'address', None) + if addr is None and getattr(host, 'endpoint', None) is not None: + addr = getattr(host.endpoint, 'address', None) + if addr is not None: + hosts_by_address[addr] = host + metadata = MockMetadata(self._get_replicas_side_effect, hosts_by_address) + return MockCluster(metadata) + + def _run_benchmark(self, name, policy): + # Setup + cluster = self._setup_cluster_mock() + policy.populate(cluster, self.hosts) + + # Warmup + for _ in range(100): + list(policy.make_query_plan(working_keyspace="ks", query=self.queries[0])) + + # Run multiple iterations to reduce noise + iterations = 5 + timings = [] + + for _ in range(iterations): + start_time = time.perf_counter() + for q in self.queries: + # We consume the iterator to ensure full plan generation cost is paid + for _ in policy.make_query_plan(working_keyspace="ks", query=q): + pass + end_time = time.perf_counter() + timings.append(end_time - start_time) + + # Use median to filter outliers + duration = statistics.median(timings) + + count = len(self.queries) + ops_per_sec = count / duration + kops = int(ops_per_sec / 1000) + + self.results.append((name, count, duration, kops)) + return ops_per_sec + + 
@classmethod + def tearDownClass(cls): + print("\n\n=== Performance Benchmarks ===") + print(f"{'Policy':<30} | {'Ops':<10} | {'Time (s)':<10} | {'Kops/s':<10}") + print("-" * 70) + for name, count, duration, kops in cls.results: + print(f"{name:<30} | {count:<10} | {duration:<10.4f} | {kops:<10}") + print("-" * 70) + + def test_dc_aware(self): + # Local DC = dc0, 1 remote host per DC + policy = DCAwareRoundRobinPolicy(local_dc='dc0', used_hosts_per_remote_dc=1) + self._run_benchmark("DCAware", policy) + + def test_rack_aware(self): + # Local DC = dc0, Local Rack = rack0, 1 remote host per DC + policy = RackAwareRoundRobinPolicy(local_dc='dc0', local_rack='rack0', used_hosts_per_remote_dc=1) + self._run_benchmark("RackAware", policy) + + def test_token_aware_wrapping_dc_aware(self): + child = DCAwareRoundRobinPolicy(local_dc='dc0', used_hosts_per_remote_dc=1) + policy = TokenAwarePolicy(child, shuffle_replicas=False) # False for strict determinism in test if needed + self._run_benchmark("TokenAware(DCAware)", policy) + + def test_token_aware_wrapping_rack_aware(self): + child = RackAwareRoundRobinPolicy(local_dc='dc0', local_rack='rack0', used_hosts_per_remote_dc=1) + policy = TokenAwarePolicy(child, shuffle_replicas=False) + self._run_benchmark("TokenAware(RackAware)", policy) + + def test_default_wrapping_dc_aware(self): + child = DCAwareRoundRobinPolicy(local_dc='dc0', used_hosts_per_remote_dc=1) + policy = DefaultLoadBalancingPolicy(child) + self._run_benchmark("Default(DCAware)", policy) + + def test_host_filter_wrapping_dc_aware(self): + child = DCAwareRoundRobinPolicy(local_dc='dc0', used_hosts_per_remote_dc=1) + policy = HostFilterPolicy(child_policy=child, predicate=lambda host: host.rack != 'rack2') + self._run_benchmark("HostFilter(DCAware)", policy) From 47b64aa51b4bb1ff7173da6360468aa3d296eb87 Mon Sep 17 00:00:00 2001 From: Yaniv Michael Kaul Date: Sun, 25 Jan 2026 16:01:26 +0200 Subject: [PATCH 2/3] (improvement) add HostPolicy to the benchmark suite. 
Signed-off-by: Yaniv Kaul --- tests/unit/test_policy_performance.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit/test_policy_performance.py b/tests/unit/test_policy_performance.py index ecb31ca939..fbe80cfb26 100644 --- a/tests/unit/test_policy_performance.py +++ b/tests/unit/test_policy_performance.py @@ -6,6 +6,8 @@ import statistics from unittest.mock import Mock +"A micro-bechmark for performance of policies" + from cassandra.policies import ( DCAwareRoundRobinPolicy, RackAwareRoundRobinPolicy, From d850f81bae5b841fcbedec975d2ca4b5344f37aa Mon Sep 17 00:00:00 2001 From: Yaniv Michael Kaul Date: Fri, 6 Feb 2026 10:58:44 +0200 Subject: [PATCH 3/3] (improvement) change test_policy_performance tests to be a benchmark test 1. Move to regular Pytest, within the performance subdir, as part of a benchmark module 2. Renamed tests/integration/standard/column_encryption/test_policies.py -> test_encrypted_policies.py - we had two test_policies.py which conflicted when trying to run all unit tests. Example run: (scylla-driver) ykaul@ykaul:~/github/python-driver$ SCYLLA_VERSION=release:2025.4.2 PROTOCOL_VERSION=4 pytest -s -m benchmark ============================================================================================================ test session starts ============================================================================================================= platform linux -- Python 3.14.2, pytest-8.4.2, pluggy-1.6.0 rootdir: /home/ykaul/github/python-driver configfile: pyproject.toml collected 1798 items / 1792 deselected / 6 selected tests/performance/test_policy_performance.py Pinned to CPU 0 DCAware | 100000 | 0.1176 | 850 Kops/s . RackAware | 100000 | 0.1774 | 563 Kops/s . TokenAware(DCAware) | 100000 | 0.6666 | 150 Kops/s . TokenAware(RackAware) | 100000 | 0.7195 | 138 Kops/s . Default(DCAware) | 100000 | 0.1481 | 675 Kops/s . HostFilter(DCAware) | 100000 | 0.2416 | 413 Kops/s . 
Signed-off-by: Yaniv Kaul --- pyproject.toml | 3 + ...policies.py => test_encrypted_policies.py} | 0 tests/performance/test_policy_performance.py | 242 ++++++++++++++++++ tests/unit/test_policy_performance.py | 216 ---------------- 4 files changed, 245 insertions(+), 216 deletions(-) rename tests/integration/standard/column_encryption/{test_policies.py => test_encrypted_policies.py} (100%) create mode 100644 tests/performance/test_policy_performance.py delete mode 100644 tests/unit/test_policy_performance.py diff --git a/pyproject.toml b/pyproject.toml index b19b38934c..aa303a5f32 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -122,6 +122,9 @@ log_level = "DEBUG" log_date_format = "%Y-%m-%d %H:%M:%S" xfail_strict = true addopts = "-rf" +markers = [ + "benchmark: marks tests as performance benchmarks (deselect with '-m \"not benchmark\"')", +] [tool.setuptools_scm] version_file = "cassandra/_version.py" diff --git a/tests/integration/standard/column_encryption/test_policies.py b/tests/integration/standard/column_encryption/test_encrypted_policies.py similarity index 100% rename from tests/integration/standard/column_encryption/test_policies.py rename to tests/integration/standard/column_encryption/test_encrypted_policies.py diff --git a/tests/performance/test_policy_performance.py b/tests/performance/test_policy_performance.py new file mode 100644 index 0000000000..7a422f2550 --- /dev/null +++ b/tests/performance/test_policy_performance.py @@ -0,0 +1,242 @@ +import time +import uuid +import struct +import os +import statistics +from unittest.mock import Mock +import pytest + +"A micro-benchmark for performance of policies" + +from cassandra.policies import ( + DCAwareRoundRobinPolicy, + RackAwareRoundRobinPolicy, + TokenAwarePolicy, + DefaultLoadBalancingPolicy, + HostFilterPolicy +) +from cassandra.pool import Host +from cassandra.cluster import SimpleConvictionPolicy + +# Mock for Connection/EndPoint since Host expects it +class MockEndPoint(object): + 
__slots__ = ('address',) + + def __init__(self, address): + self.address = address + def __str__(self): + return self.address + +class MockStatement(object): + __slots__ = ('routing_key', 'keyspace', 'table') + + def __init__(self, routing_key, keyspace="ks", table="tbl"): + self.routing_key = routing_key + self.keyspace = keyspace + self.table = table + + def is_lwt(self): + return False + +class MockTokenMap(object): + __slots__ = ('token_class', 'get_replicas_func') + def __init__(self, get_replicas_func): + self.token_class = Mock() + self.token_class.from_key = lambda k: k + self.get_replicas_func = get_replicas_func + + def get_replicas(self, keyspace, token): + return self.get_replicas_func(keyspace, token) + +class MockTablets(object): + __slots__ = () + def get_tablet_for_key(self, keyspace, table, key): + return None + +class MockMetadata(object): + __slots__ = ('_tablets', 'token_map', 'get_replicas_func', 'hosts_by_address') + def __init__(self, get_replicas_func, hosts_by_address): + self._tablets = MockTablets() + self.token_map = MockTokenMap(get_replicas_func) + self.get_replicas_func = get_replicas_func + self.hosts_by_address = hosts_by_address + + def can_support_partitioner(self): + return True + + def get_replicas(self, keyspace, key): + return self.get_replicas_func(keyspace, key) + + def get_host(self, addr): + return self.hosts_by_address.get(addr) + +class MockCluster(object): + __slots__ = ('metadata',) + def __init__(self, metadata): + self.metadata = metadata + +@pytest.fixture(scope="module") +def benchmark_setup(): + """Setup test data that will be shared across all benchmark tests""" + if hasattr(os, 'sched_setaffinity'): + try: + # Pin to the first available CPU + cpu = list(os.sched_getaffinity(0))[0] + os.sched_setaffinity(0, {cpu}) + print(f"Pinned to CPU {cpu}") + except Exception as e: + print(f"Could not pin CPU: {e}") + + # 1. 
Topology: 5 DCs, 3 Racks/DC, 3 Nodes/Rack = 45 Nodes + hosts = [] + hosts_map = {} # host_id -> Host + replicas_map = {} # routing_key -> list of replica hosts + + # Deterministic generation + dcs = ['dc{}'.format(i) for i in range(5)] + racks = ['rack{}'.format(i) for i in range(3)] + nodes_per_rack = 3 + + ip_counter = 0 + subnet_counter = 0 + for dc in dcs: + for rack in racks: + subnet_counter += 1 + for node_idx in range(nodes_per_rack): + ip_counter += 1 + address = "127.0.{}.{}".format(subnet_counter, node_idx + 1) + h_id = uuid.UUID(int=ip_counter) + h = Host(MockEndPoint(address), SimpleConvictionPolicy, host_id=h_id) + h.set_location_info(dc, rack) + hosts.append(h) + hosts_map[h_id] = h + + # 2. Queries: 100,000 deterministic queries + query_count = 100000 + queries = [] + # We'll use simple packed integers as routing keys + for i in range(query_count): + key = struct.pack('>I', i) + queries.append(MockStatement(routing_key=key)) + + # Pre-calculate replicas for TokenAware: + # Deterministically pick 3 replicas based on the key index + # This simulates the metadata.get_replicas behavior + # We pick index i, i+1, i+2 mod 45 + replicas = [] + for r in range(3): + idx = (i + r) % len(hosts) + replicas.append(hosts[idx]) + replicas_map[key] = replicas + + return { + 'hosts': hosts, + 'hosts_map': hosts_map, + 'replicas_map': replicas_map, + 'queries': queries, + 'query_count': query_count, + } + + +def _get_replicas_side_effect(replicas_map, keyspace, key): + return replicas_map.get(key, []) + + +def _setup_cluster_mock(hosts, replicas_map): + hosts_by_address = {} + for host in hosts: + addr = getattr(host, 'address', None) + if addr is None and getattr(host, 'endpoint', None) is not None: + addr = getattr(host.endpoint, 'address', None) + if addr is not None: + hosts_by_address[addr] = host + + get_replicas_func = lambda ks, key: _get_replicas_side_effect(replicas_map, ks, key) + metadata = MockMetadata(get_replicas_func, hosts_by_address) + return 
MockCluster(metadata) + + +def _run_benchmark(name, policy, setup_data): + """Run a benchmark for a given policy""" + hosts = setup_data['hosts'] + queries = setup_data['queries'] + replicas_map = setup_data['replicas_map'] + + # Setup + cluster = _setup_cluster_mock(hosts, replicas_map) + policy.populate(cluster, hosts) + + # Warmup + for _ in range(100): + list(policy.make_query_plan(working_keyspace="ks", query=queries[0])) + + # Run multiple iterations to reduce noise + iterations = 5 + timings = [] + + for _ in range(iterations): + start_time = time.perf_counter() + for q in queries: + # We consume the iterator to ensure full plan generation cost is paid + for _ in policy.make_query_plan(working_keyspace="ks", query=q): + pass + end_time = time.perf_counter() + timings.append(end_time - start_time) + + # Use median to filter outliers + duration = statistics.median(timings) + + count = len(queries) + ops_per_sec = count / duration + kops = int(ops_per_sec / 1000) + + print(f"\n{name:<30} | {count:<10} | {duration:<10.4f} | {kops:<10} Kops/s") + return ops_per_sec + + +@pytest.mark.benchmark +def test_dc_aware(benchmark_setup): + """Benchmark DCAwareRoundRobinPolicy""" + # Local DC = dc0, 1 remote host per DC + policy = DCAwareRoundRobinPolicy(local_dc='dc0', used_hosts_per_remote_dc=1) + _run_benchmark("DCAware", policy, benchmark_setup) + + +@pytest.mark.benchmark +def test_rack_aware(benchmark_setup): + """Benchmark RackAwareRoundRobinPolicy""" + # Local DC = dc0, Local Rack = rack0, 1 remote host per DC + policy = RackAwareRoundRobinPolicy(local_dc='dc0', local_rack='rack0', used_hosts_per_remote_dc=1) + _run_benchmark("RackAware", policy, benchmark_setup) + + +@pytest.mark.benchmark +def test_token_aware_wrapping_dc_aware(benchmark_setup): + """Benchmark TokenAwarePolicy wrapping DCAwareRoundRobinPolicy""" + child = DCAwareRoundRobinPolicy(local_dc='dc0', used_hosts_per_remote_dc=1) + policy = TokenAwarePolicy(child, shuffle_replicas=False) # False for 
strict determinism in test if needed + _run_benchmark("TokenAware(DCAware)", policy, benchmark_setup) + + +@pytest.mark.benchmark +def test_token_aware_wrapping_rack_aware(benchmark_setup): + """Benchmark TokenAwarePolicy wrapping RackAwareRoundRobinPolicy""" + child = RackAwareRoundRobinPolicy(local_dc='dc0', local_rack='rack0', used_hosts_per_remote_dc=1) + policy = TokenAwarePolicy(child, shuffle_replicas=False) + _run_benchmark("TokenAware(RackAware)", policy, benchmark_setup) + + +@pytest.mark.benchmark +def test_default_wrapping_dc_aware(benchmark_setup): + """Benchmark DefaultLoadBalancingPolicy wrapping DCAwareRoundRobinPolicy""" + child = DCAwareRoundRobinPolicy(local_dc='dc0', used_hosts_per_remote_dc=1) + policy = DefaultLoadBalancingPolicy(child) + _run_benchmark("Default(DCAware)", policy, benchmark_setup) + + +@pytest.mark.benchmark +def test_host_filter_wrapping_dc_aware(benchmark_setup): + """Benchmark HostFilterPolicy wrapping DCAwareRoundRobinPolicy""" + child = DCAwareRoundRobinPolicy(local_dc='dc0', used_hosts_per_remote_dc=1) + policy = HostFilterPolicy(child_policy=child, predicate=lambda host: host.rack != 'rack2') + _run_benchmark("HostFilter(DCAware)", policy, benchmark_setup) diff --git a/tests/unit/test_policy_performance.py b/tests/unit/test_policy_performance.py deleted file mode 100644 index fbe80cfb26..0000000000 --- a/tests/unit/test_policy_performance.py +++ /dev/null @@ -1,216 +0,0 @@ -import unittest -import time -import uuid -import struct -import os -import statistics -from unittest.mock import Mock - -"A micro-bechmark for performance of policies" - -from cassandra.policies import ( - DCAwareRoundRobinPolicy, - RackAwareRoundRobinPolicy, - TokenAwarePolicy, - DefaultLoadBalancingPolicy, - HostFilterPolicy -) -from cassandra.pool import Host -from cassandra.cluster import SimpleConvictionPolicy - -# Mock for Connection/EndPoint since Host expects it -class MockEndPoint(object): - __slots__ = ('address',) - - def __init__(self, 
address): - self.address = address - def __str__(self): - return self.address - -class MockStatement(object): - __slots__ = ('routing_key', 'keyspace', 'table') - - def __init__(self, routing_key, keyspace="ks", table="tbl"): - self.routing_key = routing_key - self.keyspace = keyspace - self.table = table - - def is_lwt(self): - return False - -class MockTokenMap(object): - __slots__ = ('token_class', 'get_replicas_func') - def __init__(self, get_replicas_func): - self.token_class = Mock() - self.token_class.from_key = lambda k: k - self.get_replicas_func = get_replicas_func - - def get_replicas(self, keyspace, token): - return self.get_replicas_func(keyspace, token) - -class MockTablets(object): - __slots__ = () - def get_tablet_for_key(self, keyspace, table, key): - return None - -class MockMetadata(object): - __slots__ = ('_tablets', 'token_map', 'get_replicas_func', 'hosts_by_address') - def __init__(self, get_replicas_func, hosts_by_address): - self._tablets = MockTablets() - self.token_map = MockTokenMap(get_replicas_func) - self.get_replicas_func = get_replicas_func - self.hosts_by_address = hosts_by_address - - def can_support_partitioner(self): - return True - - def get_replicas(self, keyspace, key): - return self.get_replicas_func(keyspace, key) - - def get_host(self, addr): - return self.hosts_by_address.get(addr) - -class MockCluster(object): - __slots__ = ('metadata',) - def __init__(self, metadata): - self.metadata = metadata - -class TestPolicyPerformance(unittest.TestCase): - @classmethod - def setUpClass(cls): - if hasattr(os, 'sched_setaffinity'): - try: - # Pin to the first available CPU - cpu = list(os.sched_getaffinity(0))[0] - os.sched_setaffinity(0, {cpu}) - print(f"Pinned to CPU {cpu}") - except Exception as e: - print(f"Could not pin CPU: {e}") - - # 1. 
Topology: 5 DCs, 3 Racks/DC, 3 Nodes/Rack = 45 Nodes - cls.hosts = [] - cls.hosts_map = {} # host_id -> Host - cls.replicas_map = {} # routing_key -> list of replica hosts - - # Deterministic generation - dcs = ['dc{}'.format(i) for i in range(5)] - racks = ['rack{}'.format(i) for i in range(3)] - nodes_per_rack = 3 - - ip_counter = 0 - subnet_counter = 0 - for dc in dcs: - for rack in racks: - subnet_counter += 1 - for node_idx in range(nodes_per_rack): - ip_counter += 1 - address = "127.0.{}.{}".format(subnet_counter, node_idx + 1) - h_id = uuid.UUID(int=ip_counter) - h = Host(MockEndPoint(address), SimpleConvictionPolicy, host_id=h_id) - h.set_location_info(dc, rack) - cls.hosts.append(h) - cls.hosts_map[h_id] = h - - # 2. Queries: 100,000 deterministic queries - cls.query_count = 100000 - cls.queries = [] - cls.results = [] - # We'll use simple packed integers as routing keys - for i in range(cls.query_count): - key = struct.pack('>I', i) - cls.queries.append(MockStatement(routing_key=key)) - - # Pre-calculate replicas for TokenAware: - # Deterministically pick 3 replicas based on the key index - # This simulates the metadata.get_replicas behavior - # We pick index i, i+1, i+2 mod 45 - replicas = [] - for r in range(3): - idx = (i + r) % len(cls.hosts) - replicas.append(cls.hosts[idx]) - cls.replicas_map[key] = replicas - - def _get_replicas_side_effect(self, keyspace, key): - return self.replicas_map.get(key, []) - - def _setup_cluster_mock(self): - hosts_by_address = {} - for host in self.hosts: - addr = getattr(host, 'address', None) - if addr is None and getattr(host, 'endpoint', None) is not None: - addr = getattr(host.endpoint, 'address', None) - if addr is not None: - hosts_by_address[addr] = host - metadata = MockMetadata(self._get_replicas_side_effect, hosts_by_address) - return MockCluster(metadata) - - def _run_benchmark(self, name, policy): - # Setup - cluster = self._setup_cluster_mock() - policy.populate(cluster, self.hosts) - - # Warmup - for _ 
in range(100): - list(policy.make_query_plan(working_keyspace="ks", query=self.queries[0])) - - # Run multiple iterations to reduce noise - iterations = 5 - timings = [] - - for _ in range(iterations): - start_time = time.perf_counter() - for q in self.queries: - # We consume the iterator to ensure full plan generation cost is paid - for _ in policy.make_query_plan(working_keyspace="ks", query=q): - pass - end_time = time.perf_counter() - timings.append(end_time - start_time) - - # Use median to filter outliers - duration = statistics.median(timings) - - count = len(self.queries) - ops_per_sec = count / duration - kops = int(ops_per_sec / 1000) - - self.results.append((name, count, duration, kops)) - return ops_per_sec - - @classmethod - def tearDownClass(cls): - print("\n\n=== Performance Benchmarks ===") - print(f"{'Policy':<30} | {'Ops':<10} | {'Time (s)':<10} | {'Kops/s':<10}") - print("-" * 70) - for name, count, duration, kops in cls.results: - print(f"{name:<30} | {count:<10} | {duration:<10.4f} | {kops:<10}") - print("-" * 70) - - def test_dc_aware(self): - # Local DC = dc0, 1 remote host per DC - policy = DCAwareRoundRobinPolicy(local_dc='dc0', used_hosts_per_remote_dc=1) - self._run_benchmark("DCAware", policy) - - def test_rack_aware(self): - # Local DC = dc0, Local Rack = rack0, 1 remote host per DC - policy = RackAwareRoundRobinPolicy(local_dc='dc0', local_rack='rack0', used_hosts_per_remote_dc=1) - self._run_benchmark("RackAware", policy) - - def test_token_aware_wrapping_dc_aware(self): - child = DCAwareRoundRobinPolicy(local_dc='dc0', used_hosts_per_remote_dc=1) - policy = TokenAwarePolicy(child, shuffle_replicas=False) # False for strict determinism in test if needed - self._run_benchmark("TokenAware(DCAware)", policy) - - def test_token_aware_wrapping_rack_aware(self): - child = RackAwareRoundRobinPolicy(local_dc='dc0', local_rack='rack0', used_hosts_per_remote_dc=1) - policy = TokenAwarePolicy(child, shuffle_replicas=False) - 
self._run_benchmark("TokenAware(RackAware)", policy) - - def test_default_wrapping_dc_aware(self): - child = DCAwareRoundRobinPolicy(local_dc='dc0', used_hosts_per_remote_dc=1) - policy = DefaultLoadBalancingPolicy(child) - self._run_benchmark("Default(DCAware)", policy) - - def test_host_filter_wrapping_dc_aware(self): - child = DCAwareRoundRobinPolicy(local_dc='dc0', used_hosts_per_remote_dc=1) - policy = HostFilterPolicy(child_policy=child, predicate=lambda host: host.rack != 'rack2') - self._run_benchmark("HostFilter(DCAware)", policy)