From 105a8969fe0fbc9ee3e8de0d56cf500912c7a601 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 21 Nov 2025 12:33:42 -0800 Subject: [PATCH 1/8] shortened test length --- tests/system/data/test_system_async.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/system/data/test_system_async.py b/tests/system/data/test_system_async.py index 39c454996..579956192 100644 --- a/tests/system/data/test_system_async.py +++ b/tests/system/data/test_system_async.py @@ -266,18 +266,19 @@ async def test_ping_and_warm(self, client, target): @CrossSync.pytest async def test_channel_refresh(self, table_id, instance_id, temp_rows): """ - change grpc channel to refresh after 1 second. Schedule a read_rows call after refresh, + change grpc channel to refresh after 0.1 second. Schedule a read_rows call after refresh, to ensure new channel works """ await temp_rows.add_row(b"row_key_1") await temp_rows.add_row(b"row_key_2") client = self._make_client() # start custom refresh task + client._channel_refresh_task.cancel() try: client._channel_refresh_task = CrossSync.create_task( client._manage_channel, - refresh_interval_min=1, - refresh_interval_max=1, + refresh_interval_min=0.25, + refresh_interval_max=0.25, sync_executor=client._executor, ) # let task run @@ -287,7 +288,7 @@ async def test_channel_refresh(self, table_id, instance_id, temp_rows): channel_wrapper = client.transport.grpc_channel first_channel = channel_wrapper._channel assert len(rows) == 2 - await CrossSync.sleep(2) + await CrossSync.sleep(0.5) rows_after_refresh = await table.read_rows({}) assert len(rows_after_refresh) == 2 assert client.transport.grpc_channel is channel_wrapper From 02a8f63c02416b48a8b5dd0816777ce03d234f19 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 21 Nov 2025 12:34:45 -0800 Subject: [PATCH 2/8] added stress test --- tests/system/data/test_system_async.py | 30 ++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tests/system/data/test_system_async.py b/tests/system/data/test_system_async.py index 579956192..07505114c 100644 --- a/tests/system/data/test_system_async.py +++ b/tests/system/data/test_system_async.py @@ -311,6 +311,36 @@ async def test_channel_refresh(self, table_id, instance_id, temp_rows): finally: await client.close() + @CrossSync.pytest + async def test_channel_refresh_stress_test(self, table_id, instance_id, temp_rows): + """ + While swapping channels, consistently hit it with reads. Make sure no failures are found + """ + import time + await temp_rows.add_row(b"test_row") + client = self._make_client() + client._channel_refresh_task.cancel() + try: + # swap channels frequently, with large grace windows + client._channel_refresh_task = CrossSync.create_task( + client._manage_channel, + refresh_interval_min=0.1, + refresh_interval_max=0.1, + grace_period=0.2, + sync_executor=client._executor, + ) + + # hit channels with frequent requests + end_time = time.monotonic() + 1 + async with client.get_table(instance_id, table_id) as table: + while time.monotonic() < end_time: + # we expect a CancelledError if a channel is closed before completion + rows = await table.read_rows({}) + assert len(rows) == 1 + await CrossSync.yield_to_event_loop() + finally: + await client.close() + @CrossSync.pytest @pytest.mark.usefixtures("target") @CrossSync.Retry( From 62e62b744785077be361d0001037e4b8004dd58e Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 21 Nov 2025 12:42:24 -0800 Subject: [PATCH 3/8] use context managers --- tests/system/data/test_system_async.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/tests/system/data/test_system_async.py b/tests/system/data/test_system_async.py index 07505114c..7e1f68390 100644 --- a/tests/system/data/test_system_async.py +++ b/tests/system/data/test_system_async.py @@ -271,10 +271,9 @@ async def test_channel_refresh(self, table_id, instance_id, temp_rows): """ await temp_rows.add_row(b"row_key_1") await temp_rows.add_row(b"row_key_2") - client = self._make_client() - # start custom refresh task - client._channel_refresh_task.cancel() - try: + async with self._make_client() as client: + # start custom refresh task + client._channel_refresh_task.cancel() client._channel_refresh_task = CrossSync.create_task( client._manage_channel, refresh_interval_min=0.25, @@ -308,8 +307,6 @@ async def test_channel_refresh(self, table_id, instance_id, temp_rows): client.transport._logged_channel._interceptor, GapicInterceptor ) assert updated_channel._interceptor == client._metrics_interceptor - finally: - await client.close() @CrossSync.pytest async def test_channel_refresh_stress_test(self, table_id, instance_id, temp_rows): @@ -318,9 +315,8 @@ async def test_channel_refresh_stress_test(self, table_id, instance_id, temp_row """ import time await temp_rows.add_row(b"test_row") - client = self._make_client() - client._channel_refresh_task.cancel() - try: + async with self._make_client() as client: + client._channel_refresh_task.cancel() # swap channels frequently, with large grace windows client._channel_refresh_task = CrossSync.create_task( client._manage_channel, @@ -338,8 +334,6 @@ async def test_channel_refresh_stress_test(self, table_id, instance_id, temp_row rows = await table.read_rows({}) assert len(rows) == 1 await CrossSync.yield_to_event_loop() - finally: - await client.close() @CrossSync.pytest @pytest.mark.usefixtures("target") From b3e110f3b6515952907f4987a37b9f6656327d00 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 21 Nov 2025 12:45:04 -0800 Subject: [PATCH 4/8] regenerated sync files --- tests/system/data/test_system_async.py | 5 ++-- tests/system/data/test_system_autogen.py | 37 ++++++++++++++++++------ 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/tests/system/data/test_system_async.py b/tests/system/data/test_system_async.py index 7e1f68390..499b08d4d 100644 --- a/tests/system/data/test_system_async.py +++ b/tests/system/data/test_system_async.py @@ -266,8 +266,8 @@ async def test_ping_and_warm(self, client, target): @CrossSync.pytest async def test_channel_refresh(self, table_id, instance_id, temp_rows): """ - change grpc channel to refresh after 0.1 second. Schedule a read_rows call after refresh, - to ensure new channel works + change grpc channel to refresh quickly, then schedule a read_rows call after refresh + to ensure new channel is in place and works """ await temp_rows.add_row(b"row_key_1") await temp_rows.add_row(b"row_key_2") @@ -314,6 +314,7 @@ async def test_channel_refresh_stress_test(self, table_id, instance_id, temp_row While swapping channels, consistently hit it with reads. Make sure no failures are found """ import time + await temp_rows.add_row(b"test_row") async with self._make_client() as client: client._channel_refresh_task.cancel() diff --git a/tests/system/data/test_system_autogen.py b/tests/system/data/test_system_autogen.py index 37c00f2ae..5df140946 100644 --- a/tests/system/data/test_system_autogen.py +++ b/tests/system/data/test_system_autogen.py @@ -221,16 +221,16 @@ def test_ping_and_warm(self, client, target): reason="emulator mode doesn't refresh channel", ) def test_channel_refresh(self, table_id, instance_id, temp_rows): - """change grpc channel to refresh after 1 second. Schedule a read_rows call after refresh, - to ensure new channel works""" + """change grpc channel to refresh quickly, then schedule a read_rows call after refresh + to ensure new channel is in place and works""" temp_rows.add_row(b"row_key_1") temp_rows.add_row(b"row_key_2") - client = self._make_client() - try: + with self._make_client() as client: + client._channel_refresh_task.cancel() client._channel_refresh_task = CrossSync._Sync_Impl.create_task( client._manage_channel, - refresh_interval_min=1, - refresh_interval_max=1, + refresh_interval_min=0.25, + refresh_interval_max=0.25, sync_executor=client._executor, ) CrossSync._Sync_Impl.yield_to_event_loop() @@ -239,7 +239,7 @@ def test_channel_refresh(self, table_id, instance_id, temp_rows): channel_wrapper = client.transport.grpc_channel first_channel = channel_wrapper._channel assert len(rows) == 2 - CrossSync._Sync_Impl.sleep(2) + CrossSync._Sync_Impl.sleep(0.5) rows_after_refresh = table.read_rows({}) assert len(rows_after_refresh) == 2 assert client.transport.grpc_channel is channel_wrapper @@ -249,8 +249,27 @@ def test_channel_refresh(self, table_id, instance_id, temp_rows): client.transport._logged_channel._interceptor, GapicInterceptor ) assert updated_channel._interceptor == client._metrics_interceptor - finally: - client.close() + + def test_channel_refresh_stress_test(self, table_id, instance_id, temp_rows): + """While swapping channels, consistently hit it with reads. Make sure no failures are found""" + import time + + temp_rows.add_row(b"test_row") + with self._make_client() as client: + client._channel_refresh_task.cancel() + client._channel_refresh_task = CrossSync._Sync_Impl.create_task( + client._manage_channel, + refresh_interval_min=0.1, + refresh_interval_max=0.1, + grace_period=0.2, + sync_executor=client._executor, + ) + end_time = time.monotonic() + 1 + with client.get_table(instance_id, table_id) as table: + while time.monotonic() < end_time: + rows = table.read_rows({}) + assert len(rows) == 1 + CrossSync._Sync_Impl.yield_to_event_loop() @pytest.mark.usefixtures("target") @CrossSync._Sync_Impl.Retry( From 045900aa9caf97ee2d495bb98ad2605f3d9ca8ad Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 21 Nov 2025 13:48:54 -0800 Subject: [PATCH 5/8] combined channel refresh tests --- tests/system/data/test_system_async.py | 70 ++++++++---------------- tests/system/data/test_system_autogen.py | 44 ++++----------- 2 files changed, 33 insertions(+), 81 deletions(-) diff --git a/tests/system/data/test_system_async.py b/tests/system/data/test_system_async.py index 499b08d4d..ac8a358a3 100644 --- a/tests/system/data/test_system_async.py +++ b/tests/system/data/test_system_async.py @@ -266,75 +266,49 @@ async def test_ping_and_warm(self, client, target): @CrossSync.pytest async def test_channel_refresh(self, table_id, instance_id, temp_rows): """ - change grpc channel to refresh quickly, then schedule a read_rows call after refresh - to ensure new channel is in place and works - """ - await temp_rows.add_row(b"row_key_1") - await temp_rows.add_row(b"row_key_2") - async with self._make_client() as client: - # start custom refresh task - client._channel_refresh_task.cancel() - client._channel_refresh_task = CrossSync.create_task( - client._manage_channel, - refresh_interval_min=0.25, - refresh_interval_max=0.25, - sync_executor=client._executor, - ) - # let task run - await CrossSync.yield_to_event_loop() - async with client.get_table(instance_id, table_id) as table: - rows = await table.read_rows({}) - channel_wrapper = client.transport.grpc_channel - first_channel = channel_wrapper._channel - assert len(rows) == 2 - await CrossSync.sleep(0.5) - rows_after_refresh = await table.read_rows({}) - assert len(rows_after_refresh) == 2 - assert client.transport.grpc_channel is channel_wrapper - updated_channel = channel_wrapper._channel - assert updated_channel is not first_channel - # ensure interceptors are kept (gapic's logging interceptor, and metric interceptor) - if CrossSync.is_async: - unary_interceptors = updated_channel._unary_unary_interceptors - assert len(unary_interceptors) == 2 - assert GapicInterceptor in [type(i) for i in unary_interceptors] - assert client._metrics_interceptor in unary_interceptors - stream_interceptors = updated_channel._unary_stream_interceptors - assert len(stream_interceptors) == 1 - assert client._metrics_interceptor in stream_interceptors - else: - assert isinstance( - client.transport._logged_channel._interceptor, GapicInterceptor - ) - assert updated_channel._interceptor == client._metrics_interceptor - - @CrossSync.pytest - async def test_channel_refresh_stress_test(self, table_id, instance_id, temp_rows): - """ - While swapping channels, consistently hit it with reads. Make sure no failures are found + perform requests while swapping out the grpc channel. Requests should continue without error """ import time await temp_rows.add_row(b"test_row") async with self._make_client() as client: client._channel_refresh_task.cancel() + channel_wrapper = client.transport.grpc_channel + first_channel = channel_wrapper._channel # swap channels frequently, with large grace windows client._channel_refresh_task = CrossSync.create_task( client._manage_channel, refresh_interval_min=0.1, refresh_interval_max=0.1, - grace_period=0.2, + grace_period=1, sync_executor=client._executor, ) # hit channels with frequent requests - end_time = time.monotonic() + 1 + end_time = time.monotonic() + 3 async with client.get_table(instance_id, table_id) as table: while time.monotonic() < end_time: # we expect a CancelledError if a channel is closed before completion rows = await table.read_rows({}) assert len(rows) == 1 await CrossSync.yield_to_event_loop() + # ensure channel was updated + updated_channel = channel_wrapper._channel + assert updated_channel is not first_channel + # ensure interceptors are kept (gapic's logging interceptor, and metric interceptor) + if CrossSync.is_async: + unary_interceptors = updated_channel._unary_unary_interceptors + assert len(unary_interceptors) == 2 + assert GapicInterceptor in [type(i) for i in unary_interceptors] + assert client._metrics_interceptor in unary_interceptors + stream_interceptors = updated_channel._unary_stream_interceptors + assert len(stream_interceptors) == 1 + assert client._metrics_interceptor in stream_interceptors + else: + assert isinstance( + client.transport._logged_channel._interceptor, GapicInterceptor + ) + assert updated_channel._interceptor == client._metrics_interceptor @CrossSync.pytest @pytest.mark.usefixtures("target") diff --git a/tests/system/data/test_system_autogen.py b/tests/system/data/test_system_autogen.py index 5df140946..463235087 100644 --- a/tests/system/data/test_system_autogen.py +++ b/tests/system/data/test_system_autogen.py @@ -221,55 +221,33 @@ def test_ping_and_warm(self, client, target): reason="emulator mode doesn't refresh channel", ) def test_channel_refresh(self, table_id, instance_id, temp_rows): - """change grpc channel to refresh quickly, then schedule a read_rows call after refresh - to ensure new channel is in place and works""" - temp_rows.add_row(b"row_key_1") - temp_rows.add_row(b"row_key_2") - with self._make_client() as client: - client._channel_refresh_task.cancel() - client._channel_refresh_task = CrossSync._Sync_Impl.create_task( - client._manage_channel, - refresh_interval_min=0.25, - refresh_interval_max=0.25, - sync_executor=client._executor, - ) - CrossSync._Sync_Impl.yield_to_event_loop() - with client.get_table(instance_id, table_id) as table: - rows = table.read_rows({}) - channel_wrapper = client.transport.grpc_channel - first_channel = channel_wrapper._channel - assert len(rows) == 2 - CrossSync._Sync_Impl.sleep(0.5) - rows_after_refresh = table.read_rows({}) - assert len(rows_after_refresh) == 2 - assert client.transport.grpc_channel is channel_wrapper - updated_channel = channel_wrapper._channel - assert updated_channel is not first_channel - assert isinstance( - client.transport._logged_channel._interceptor, GapicInterceptor - ) - assert updated_channel._interceptor == client._metrics_interceptor - - def test_channel_refresh_stress_test(self, table_id, instance_id, temp_rows): - """While swapping channels, consistently hit it with reads. Make sure no failures are found""" + """perform requests while swapping out the grpc channel. Requests should continue without error""" import time temp_rows.add_row(b"test_row") with self._make_client() as client: client._channel_refresh_task.cancel() + channel_wrapper = client.transport.grpc_channel + first_channel = channel_wrapper._channel client._channel_refresh_task = CrossSync._Sync_Impl.create_task( client._manage_channel, refresh_interval_min=0.1, refresh_interval_max=0.1, - grace_period=0.2, + grace_period=1, sync_executor=client._executor, ) - end_time = time.monotonic() + 1 + end_time = time.monotonic() + 3 with client.get_table(instance_id, table_id) as table: while time.monotonic() < end_time: rows = table.read_rows({}) assert len(rows) == 1 CrossSync._Sync_Impl.yield_to_event_loop() + updated_channel = channel_wrapper._channel + assert updated_channel is not first_channel + assert isinstance( + client.transport._logged_channel._interceptor, GapicInterceptor + ) + assert updated_channel._interceptor == client._metrics_interceptor @pytest.mark.usefixtures("target") @CrossSync._Sync_Impl.Retry( From f0ff16881e2fa27a4716a96ea86d7191c65b94a8 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 21 Nov 2025 13:49:59 -0800 Subject: [PATCH 6/8] improved grace period handling --- google/cloud/bigtable/data/_async/client.py | 8 ++------ google/cloud/bigtable/data/_sync_autogen/client.py | 5 +++-- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index 1c98f56ab..69e09e983 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -480,12 +480,8 @@ async def _manage_channel( old_channel = super_channel.swap_channel(new_channel) self._invalidate_channel_stubs() # give old_channel a chance to complete existing rpcs - if CrossSync.is_async: - await old_channel.close(grace_period) - else: - if grace_period: - self._is_closed.wait(grace_period) # type: ignore - old_channel.close() # type: ignore + await CrossSync.event_wait(self._is_closed, grace_period, async_break_early=False) + await old_channel.close() # subtract the time spent waiting for the channel to be replaced next_refresh = random.uniform(refresh_interval_min, refresh_interval_max) next_sleep = max(next_refresh - (time.monotonic() - start_timestamp), 0) diff --git a/google/cloud/bigtable/data/_sync_autogen/client.py b/google/cloud/bigtable/data/_sync_autogen/client.py index a403643f5..479c0cb06 100644 --- a/google/cloud/bigtable/data/_sync_autogen/client.py +++ b/google/cloud/bigtable/data/_sync_autogen/client.py @@ -363,8 +363,9 @@ def _manage_channel( self._ping_and_warm_instances(channel=new_channel) old_channel = super_channel.swap_channel(new_channel) self._invalidate_channel_stubs() - if grace_period: - self._is_closed.wait(grace_period) + CrossSync._Sync_Impl.event_wait( + self._is_closed, grace_period, async_break_early=False + ) old_channel.close() next_refresh = random.uniform(refresh_interval_min, refresh_interval_max) next_sleep = max(next_refresh - (time.monotonic() - start_timestamp), 0) From e4d60a8df45520d7551dc072d22bb818425d4da1 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 21 Nov 2025 16:48:10 -0800 Subject: [PATCH 7/8] fixed lint --- google/cloud/bigtable/data/_async/client.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index 69e09e983..98129474e 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -480,7 +480,9 @@ async def _manage_channel( old_channel = super_channel.swap_channel(new_channel) self._invalidate_channel_stubs() # give old_channel a chance to complete existing rpcs - await CrossSync.event_wait(self._is_closed, grace_period, async_break_early=False) + await CrossSync.event_wait( + self._is_closed, grace_period, async_break_early=False + ) await old_channel.close() # subtract the time spent waiting for the channel to be replaced next_refresh = random.uniform(refresh_interval_min, refresh_interval_max) From 150ae3b19b14aaad3958f9d6950ad6e826163582 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 4 Dec 2025 11:46:02 -0800 Subject: [PATCH 8/8] add wait behind a conditional --- google/cloud/bigtable/data/_async/client.py | 7 ++++--- google/cloud/bigtable/data/_sync_autogen/client.py | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index 98129474e..2d5b28841 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -480,9 +480,10 @@ async def _manage_channel( old_channel = super_channel.swap_channel(new_channel) self._invalidate_channel_stubs() # give old_channel a chance to complete existing rpcs - await CrossSync.event_wait( - self._is_closed, grace_period, async_break_early=False - ) + if grace_period: + await CrossSync.event_wait( + self._is_closed, grace_period, async_break_early=False + ) await old_channel.close() # subtract the time spent waiting for the channel to be replaced next_refresh = random.uniform(refresh_interval_min, refresh_interval_max) diff --git a/google/cloud/bigtable/data/_sync_autogen/client.py b/google/cloud/bigtable/data/_sync_autogen/client.py index 479c0cb06..8e99ef05c 100644 --- a/google/cloud/bigtable/data/_sync_autogen/client.py +++ b/google/cloud/bigtable/data/_sync_autogen/client.py @@ -363,9 +363,10 @@ def _manage_channel( self._ping_and_warm_instances(channel=new_channel) old_channel = super_channel.swap_channel(new_channel) self._invalidate_channel_stubs() - CrossSync._Sync_Impl.event_wait( - self._is_closed, grace_period, async_break_early=False - ) + if grace_period: + CrossSync._Sync_Impl.event_wait( + self._is_closed, grace_period, async_break_early=False + ) old_channel.close() next_refresh = random.uniform(refresh_interval_min, refresh_interval_max) next_sleep = max(next_refresh - (time.monotonic() - start_timestamp), 0)