Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
5c99b58
[WIP] Migrate to Google Cloud Dataflow Client
jrmccluskey Feb 18, 2026
45f0f11
Trigger relevant postcommits
jrmccluskey Feb 18, 2026
8fadf1f
base image update
jrmccluskey Feb 18, 2026
c123d82
fix camel case
jrmccluskey Feb 18, 2026
16de19b
update dataflow runner + tests
jrmccluskey Feb 18, 2026
fdd8804
slide import to avoid triggering unit tests
jrmccluskey Feb 18, 2026
c957d8d
yapf stuff
jrmccluskey Feb 18, 2026
6d60ea9
remove extra print
jrmccluskey Feb 18, 2026
0f34295
further spec structs, fix incorrect piplineUrl option, remove old cli…
jrmccluskey Feb 19, 2026
0b2213c
suppress line-too-longs
jrmccluskey Feb 19, 2026
60500e7
formatting
jrmccluskey Feb 19, 2026
fcc3e91
linting, tweak metrics tests
jrmccluskey Feb 24, 2026
e20481f
Proto-specific changes to metric processing tests
jrmccluskey Feb 24, 2026
326c7da
try to dump logging
jrmccluskey Feb 24, 2026
cf12d0f
handle more straightforward metrics values
jrmccluskey Feb 24, 2026
fc70a66
add skips since the unit tests now depend on the proto library
jrmccluskey Feb 24, 2026
9dc9c54
testing if there's a disconnect between proto behavior locally and in…
jrmccluskey Feb 25, 2026
02a6eda
correct scalar access
jrmccluskey Feb 25, 2026
25a05b4
clean up dist accesses
jrmccluskey Feb 25, 2026
4806c1e
linting, various fixes
jrmccluskey Feb 25, 2026
469846e
fix unit test setup for direct accesses
jrmccluskey Feb 25, 2026
f905719
linting
jrmccluskey Feb 25, 2026
8e800e7
more linting
jrmccluskey Feb 26, 2026
efda6b4
re-enable histograms
jrmccluskey Mar 25, 2026
132c421
Bump dataflow client version, restore pausing/paused concept
jrmccluskey Apr 9, 2026
c3e4b6a
formatting
jrmccluskey Apr 9, 2026
4314fbe
fix enum selection
jrmccluskey Apr 16, 2026
9cfe59c
handle the proto hash PR
jrmccluskey May 7, 2026
a653425
remove unnecessary try/except block
jrmccluskey May 7, 2026
4e9b173
re-delete old messages
jrmccluskey May 12, 2026
e043e28
fix disk_provisioned_iops/throughput_mibps tests
jrmccluskey May 12, 2026
f153862
code bot suggestions
jrmccluskey May 12, 2026
ba83e4f
revert pipeline options interaction
jrmccluskey May 12, 2026
aab7f51
leftover update
jrmccluskey May 12, 2026
9ff3e51
yapf
jrmccluskey May 12, 2026
f46b857
swap credential loading
jrmccluskey May 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/trigger_files/beam_PostCommit_Python.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"pr": "38069",
"modification": 41
}
"modification": 40
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
60 changes: 15 additions & 45 deletions sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def _get_match(proto, filter_fn):


# V1b3 MetricStructuredName keys to accept and copy to the MetricKey labels.
STEP_LABEL = 'step'
STRUCTURED_NAME_LABELS = set(
['execution_step', 'original_name', 'output_user_name'])

Expand Down Expand Up @@ -112,9 +111,7 @@ def _translate_step_name(self, internal_name):
try:
step = _get_match(
self._job_graph.proto.steps, lambda x: x.name == internal_name)
user_step_name = _get_match(
step.properties.additionalProperties,
lambda x: x.key == 'user_name').value.string_value
user_step_name = step.properties.get('user_name')
except ValueError:
pass # Exception is handled below.
if not user_step_name:
Expand All @@ -135,24 +132,19 @@ def _get_metric_key(self, metric):
# step name (only happens for unstructured-named metrics).
# 2. Unable to unpack [step] or [namespace], which should only happen
# for unstructured names.
step = _get_match(
metric.name.context.additionalProperties,
lambda x: x.key == STEP_LABEL).value
step = metric.name.context['step']
step = self._translate_step_name(step)
except ValueError:
pass

namespace = "dataflow/v1b3" # Try to extract namespace or add a default.
try:
namespace = _get_match(
metric.name.context.additionalProperties,
lambda x: x.key == 'namespace').value
except ValueError:
pass
carried_namespace = metric.name.context.get('namespace', None)
if carried_namespace:
namespace = carried_namespace

for kv in metric.name.context.additionalProperties:
if kv.key in STRUCTURED_NAME_LABELS:
labels[kv.key] = kv.value
for key in metric.name.context:
if key in STRUCTURED_NAME_LABELS:
labels[key] = metric.name.context[key]
# Package everything besides namespace and name into the labels as well,
# including unmodified step names, to assist in integrating with the exact
# unmodified values which come from Dataflow.
Expand Down Expand Up @@ -185,10 +177,7 @@ def _populate_metrics(self, response, result, user_metrics=False):
# in the service.
# The second way is only useful for the UI, and should be ignored.
continue
is_tentative = [
prop for prop in metric.name.context.additionalProperties
if prop.key == 'tentative' and prop.value == 'true'
]
is_tentative = metric.name.context['tentative']
tentative_or_committed = 'tentative' if is_tentative else 'committed'

metric_key = self._get_metric_key(metric)
Expand All @@ -209,32 +198,13 @@ def _get_metric_value(self, metric):
return None

if metric.scalar is not None:
return metric.scalar.integer_value
# This will always be a single value if there is any data in the field.
return metric.scalar
elif metric.distribution is not None:
dist_count = _get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'count').value.integer_value
dist_min = _get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'min').value.integer_value
dist_max = _get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'max').value.integer_value
dist_sum = _get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'sum').value.integer_value
if dist_sum is None:
# distribution metric is not meant to use on large values, but in case
# it is, the value can overflow and become double_value, the correctness
# of the value may not be guaranteed.
_LOGGER.info(
"Distribution metric sum value seems to have "
"overflowed integer_value range, the correctness of sum or mean "
"value may not be guaranteed: %s" % metric.distribution)
dist_sum = int(
_get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'sum').value.double_value)
dist_count = metric.distribution['count']
dist_min = metric.distribution['min']
dist_max = metric.distribution['max']
dist_sum = metric.distribution['sum']
return DistributionResult(
DistributionData(dist_sum, dist_count, dist_min, dist_max))
#TODO(https://github.com/apache/beam/issues/31788) support StringSet after
Expand Down
Loading
Loading