-
Notifications
You must be signed in to change notification settings - Fork 1.7k
feat(bigtable): add client side metric instrumentation to basic rpcs #16712
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
8a3af45
cf58c57
00618f8
ead3bf1
af0beba
d0c03d0
01f1739
d40fa0a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -63,7 +63,11 @@ | |
| _validate_timeouts, | ||
| _WarmedInstanceKey, | ||
| ) | ||
| from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController | ||
| from google.cloud.bigtable.data._metrics import ( | ||
| BigtableClientSideMetricsController, | ||
| OperationType, | ||
| tracked_retry, | ||
| ) | ||
| from google.cloud.bigtable.data.exceptions import ( | ||
| FailedQueryShardError, | ||
| ShardedReadRowsExceptionGroup, | ||
|
|
@@ -1431,26 +1435,28 @@ async def sample_row_keys( | |
| retryable_excs = _get_retryable_errors(retryable_errors, self) | ||
| predicate = retries.if_exception_type(*retryable_excs) | ||
|
|
||
| sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60) | ||
|
|
||
| @CrossSync.convert | ||
| async def execute_rpc(): | ||
| results = await self.client._gapic_client.sample_row_keys( | ||
| request=SampleRowKeysRequest( | ||
| app_profile_id=self.app_profile_id, **self._request_path | ||
| ), | ||
| timeout=next(attempt_timeout_gen), | ||
| retry=None, | ||
| with self._metrics.create_operation( | ||
| OperationType.SAMPLE_ROW_KEYS | ||
| ) as operation_metric: | ||
|
|
||
| @CrossSync.convert | ||
| async def execute_rpc(): | ||
| results = await self.client._gapic_client.sample_row_keys( | ||
| request=SampleRowKeysRequest( | ||
| app_profile_id=self.app_profile_id, **self._request_path | ||
| ), | ||
| timeout=next(attempt_timeout_gen), | ||
| retry=None, | ||
| ) | ||
| return [(s.row_key, s.offset_bytes) async for s in results] | ||
|
|
||
| return await tracked_retry( | ||
| retry_fn=CrossSync.retry_target, | ||
| operation=operation_metric, | ||
| target=execute_rpc, | ||
| predicate=predicate, | ||
| timeout=operation_timeout, | ||
| ) | ||
|
daniel-sanche marked this conversation as resolved.
|
||
| return [(s.row_key, s.offset_bytes) async for s in results] | ||
|
|
||
| return await CrossSync.retry_target( | ||
| execute_rpc, | ||
| predicate, | ||
| sleep_generator, | ||
| operation_timeout, | ||
| exception_factory=_retry_exception_factory, | ||
| ) | ||
|
|
||
| @CrossSync.convert(replace_symbols={"MutationsBatcherAsync": "MutationsBatcher"}) | ||
| def mutations_batcher( | ||
|
|
@@ -1561,28 +1567,29 @@ async def mutate_row( | |
| # mutations should not be retried | ||
| predicate = retries.if_exception_type() | ||
|
|
||
| sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60) | ||
|
|
||
| target = partial( | ||
| self.client._gapic_client.mutate_row, | ||
| request=MutateRowRequest( | ||
| row_key=row_key.encode("utf-8") | ||
| if isinstance(row_key, str) | ||
| else row_key, | ||
| mutations=[mutation._to_pb() for mutation in mutations_list], | ||
| app_profile_id=self.app_profile_id, | ||
| **self._request_path, | ||
| ), | ||
| timeout=attempt_timeout, | ||
| retry=None, | ||
| ) | ||
| return await CrossSync.retry_target( | ||
| target, | ||
| predicate, | ||
| sleep_generator, | ||
| operation_timeout, | ||
| exception_factory=_retry_exception_factory, | ||
| ) | ||
| with self._metrics.create_operation( | ||
| OperationType.MUTATE_ROW | ||
| ) as operation_metric: | ||
| target = partial( | ||
| self.client._gapic_client.mutate_row, | ||
| request=MutateRowRequest( | ||
| row_key=row_key.encode("utf-8") | ||
| if isinstance(row_key, str) | ||
| else row_key, | ||
| mutations=[mutation._to_pb() for mutation in mutations_list], | ||
| app_profile_id=self.app_profile_id, | ||
| **self._request_path, | ||
| ), | ||
| timeout=attempt_timeout, | ||
| retry=None, | ||
| ) | ||
| return await tracked_retry( | ||
| retry_fn=CrossSync.retry_target, | ||
| operation=operation_metric, | ||
| target=target, | ||
| predicate=predicate, | ||
| timeout=operation_timeout, | ||
| ) | ||
|
daniel-sanche marked this conversation as resolved.
|
||
|
|
||
| @CrossSync.convert | ||
| async def bulk_mutate_rows( | ||
|
|
@@ -1693,21 +1700,25 @@ async def check_and_mutate_row( | |
| ): | ||
| false_case_mutations = [false_case_mutations] | ||
| false_case_list = [m._to_pb() for m in false_case_mutations or []] | ||
| result = await self.client._gapic_client.check_and_mutate_row( | ||
| request=CheckAndMutateRowRequest( | ||
| true_mutations=true_case_list, | ||
| false_mutations=false_case_list, | ||
| predicate_filter=predicate._to_pb() if predicate is not None else None, | ||
| row_key=row_key.encode("utf-8") | ||
| if isinstance(row_key, str) | ||
| else row_key, | ||
| app_profile_id=self.app_profile_id, | ||
| **self._request_path, | ||
| ), | ||
| timeout=operation_timeout, | ||
| retry=None, | ||
| ) | ||
| return result.predicate_matched | ||
|
|
||
| with self._metrics.create_operation(OperationType.CHECK_AND_MUTATE): | ||
| result = await self.client._gapic_client.check_and_mutate_row( | ||
| request=CheckAndMutateRowRequest( | ||
| true_mutations=true_case_list, | ||
| false_mutations=false_case_list, | ||
| predicate_filter=predicate._to_pb() | ||
| if predicate is not None | ||
| else None, | ||
| row_key=row_key.encode("utf-8") | ||
| if isinstance(row_key, str) | ||
| else row_key, | ||
| app_profile_id=self.app_profile_id, | ||
| **self._request_path, | ||
| ), | ||
| timeout=operation_timeout, | ||
| retry=None, | ||
| ) | ||
| return result.predicate_matched | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is not wrapped in tracked_retry. Will the attempt-level metrics (attempt latencies, server latencies, and connectivity error count) still be recorded? |
||
|
|
||
| @CrossSync.convert | ||
| async def read_modify_write_row( | ||
|
|
@@ -1747,20 +1758,22 @@ async def read_modify_write_row( | |
| rules = [rules] | ||
| if not rules: | ||
| raise ValueError("rules must contain at least one item") | ||
| result = await self.client._gapic_client.read_modify_write_row( | ||
| request=ReadModifyWriteRowRequest( | ||
| rules=[rule._to_pb() for rule in rules], | ||
| row_key=row_key.encode("utf-8") | ||
| if isinstance(row_key, str) | ||
| else row_key, | ||
| app_profile_id=self.app_profile_id, | ||
| **self._request_path, | ||
| ), | ||
| timeout=operation_timeout, | ||
| retry=None, | ||
| ) | ||
| # construct Row from result | ||
| return Row._from_pb(result.row) | ||
|
|
||
| with self._metrics.create_operation(OperationType.READ_MODIFY_WRITE): | ||
| result = await self.client._gapic_client.read_modify_write_row( | ||
| request=ReadModifyWriteRowRequest( | ||
| rules=[rule._to_pb() for rule in rules], | ||
| row_key=row_key.encode("utf-8") | ||
| if isinstance(row_key, str) | ||
| else row_key, | ||
| app_profile_id=self.app_profile_id, | ||
| **self._request_path, | ||
| ), | ||
| timeout=operation_timeout, | ||
| retry=None, | ||
| ) | ||
| # construct Row from result | ||
| return Row._from_pb(result.row) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same question as check_and_mutate_row: without tracked_retry, will the attempt-level metrics still be recorded? |
||
|
|
||
| @CrossSync.convert | ||
| async def close(self): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -105,6 +105,7 @@ def _retry_exception_factory( | |
| tuple[Exception, Exception|None]: | ||
| tuple of the exception to raise, and a cause exception if applicable | ||
| """ | ||
| exc_list = exc_list.copy() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why do we need to copy the exception list now? |
||
| if reason == RetryFailureReason.TIMEOUT: | ||
| timeout_val_str = f"of {timeout_val:0.1f}s " if timeout_val is not None else "" | ||
| # if failed due to timeout, raise deadline exceeded as primary exception | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where did sleep_generator and exception_factory go?