From 7ea32b830ee27a6262835dab77228cb02505247c Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Wed, 20 May 2026 15:55:31 +0300 Subject: [PATCH 1/3] Fix table row inference benchmark model_path --- .../table_row_inference_benchmark.py | 42 ++++++++++--------- .../apache_beam/testing/test_pipeline.py | 4 +- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py b/sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py index b8591a0fea83..56c3b332b47f 100644 --- a/sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py +++ b/sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py @@ -72,32 +72,34 @@ def __init__(self): metrics_namespace=self.metrics_namespace, is_streaming=False, pcollection='RunInference/BeamML_RunInference_Postprocess-0.out0') - self.is_streaming = ((self.pipeline.get_option('mode') or - 'batch') == 'streaming') + opts = self.pipeline.get_pipeline_options().view_as( + TableRowInferenceOptions) + mode = opts.mode or 'batch' + self.is_streaming = mode == 'streaming' if self.is_streaming: - self.subscription = self.pipeline.get_option('input_subscription') + self.subscription = opts.input_subscription def test(self): """Execute the table row inference pipeline for benchmarking.""" - extra_opts = {} - - mode = self.pipeline.get_option('mode') or 'batch' - extra_opts['mode'] = mode + opts = self.pipeline.get_pipeline_options().view_as( + TableRowInferenceOptions) + mode = opts.mode or 'batch' + extra_opts = {'mode': mode} if mode == 'streaming': - extra_opts['input_subscription'] = self.pipeline.get_option( - 'input_subscription') - extra_opts['window_size_sec'] = int( - self.pipeline.get_option('window_size_sec') or 60) - extra_opts['trigger_interval_sec'] = int( - self.pipeline.get_option('trigger_interval_sec') or 30) - else: - extra_opts['input_file'] = self.pipeline.get_option('input_file') - - for opt in ['output_table', 'model_path', 'feature_columns']: - val = self.pipeline.get_option(opt) - if val: - extra_opts[opt] = val + if opts.input_subscription: + extra_opts['input_subscription'] = opts.input_subscription + extra_opts['window_size_sec'] = opts.window_size_sec or 60 + extra_opts['trigger_interval_sec'] = opts.trigger_interval_sec or 30 + elif opts.input_file: + extra_opts['input_file'] = opts.input_file + + if opts.output_table: + extra_opts['output_table'] = opts.output_table + if opts.model_path: + extra_opts['model_path'] = opts.model_path + if opts.feature_columns: + extra_opts['feature_columns'] = opts.feature_columns self.result = table_row_inference.run( self.pipeline.get_full_options_as_args(**extra_opts), diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py index 712da8636234..1fe2d86e35b3 100644 --- a/sdks/python/apache_beam/testing/test_pipeline.py +++ b/sdks/python/apache_beam/testing/test_pipeline.py @@ -209,7 +209,9 @@ def get_option(self, opt_name, bool_option=False): None if option is not found in existing option list which is generated by parsing value of argument `test-pipeline-options`. """ - parser = argparse.ArgumentParser() + # Parse one flag at a time; disable prefix matching so e.g. --mode does + # not satisfy --model_path when both appear in options_list. + parser = argparse.ArgumentParser(allow_abbrev=False) opt_name = opt_name[:2] if opt_name[:2] == '--' else opt_name # Option name should start with '--' when it's used for parsing. if bool_option: From 264c2a11216e963ffcfec3743fdfcd1fe1eba4f4 Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Thu, 21 May 2026 00:24:14 +0300 Subject: [PATCH 2/3] refactor --- .../table_row_inference_benchmark.py | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py b/sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py index 56c3b332b47f..265759a1cd7c 100644 --- a/sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py +++ b/sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py @@ -72,34 +72,36 @@ def __init__(self): metrics_namespace=self.metrics_namespace, is_streaming=False, pcollection='RunInference/BeamML_RunInference_Postprocess-0.out0') - opts = self.pipeline.get_pipeline_options().view_as( + self.opts = self.pipeline.get_pipeline_options().view_as( TableRowInferenceOptions) - mode = opts.mode or 'batch' + mode = self.opts.mode or 'batch' self.is_streaming = mode == 'streaming' if self.is_streaming: - self.subscription = opts.input_subscription + self.subscription = self.opts.input_subscription def test(self): """Execute the table row inference pipeline for benchmarking.""" - opts = self.pipeline.get_pipeline_options().view_as( - TableRowInferenceOptions) - mode = opts.mode or 'batch' + mode = self.opts.mode or 'batch' extra_opts = {'mode': mode} if mode == 'streaming': - if opts.input_subscription: - extra_opts['input_subscription'] = opts.input_subscription - extra_opts['window_size_sec'] = opts.window_size_sec or 60 - extra_opts['trigger_interval_sec'] = opts.trigger_interval_sec or 30 - elif opts.input_file: - extra_opts['input_file'] = opts.input_file - - if opts.output_table: - extra_opts['output_table'] = opts.output_table - if opts.model_path: - extra_opts['model_path'] = opts.model_path - if opts.feature_columns: - extra_opts['feature_columns'] = opts.feature_columns + if self.opts.input_subscription: + extra_opts['input_subscription'] = self.opts.input_subscription + extra_opts['window_size_sec'] = ( + self.opts.window_size_sec if self.opts.window_size_sec is not None + else 60) + extra_opts['trigger_interval_sec'] = ( + self.opts.trigger_interval_sec + if self.opts.trigger_interval_sec is not None else 30) + elif self.opts.input_file: + extra_opts['input_file'] = self.opts.input_file + + if self.opts.output_table: + extra_opts['output_table'] = self.opts.output_table + if self.opts.model_path: + extra_opts['model_path'] = self.opts.model_path + if self.opts.feature_columns: + extra_opts['feature_columns'] = self.opts.feature_columns self.result = table_row_inference.run( self.pipeline.get_full_options_as_args(**extra_opts), From 0b4b5b7237d5fe457896044726e510872600e262 Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Thu, 21 May 2026 00:46:44 +0300 Subject: [PATCH 3/3] fix formatter --- .../benchmarks/inference/table_row_inference_benchmark.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py b/sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py index 265759a1cd7c..e3de24574391 100644 --- a/sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py +++ b/sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py @@ -88,8 +88,8 @@ def test(self): if self.opts.input_subscription: extra_opts['input_subscription'] = self.opts.input_subscription extra_opts['window_size_sec'] = ( - self.opts.window_size_sec if self.opts.window_size_sec is not None - else 60) + self.opts.window_size_sec + if self.opts.window_size_sec is not None else 60) extra_opts['trigger_interval_sec'] = ( self.opts.trigger_interval_sec if self.opts.trigger_interval_sec is not None else 30)