From 9a9640fb4e5b845bbadef442d4a785b693882727 Mon Sep 17 00:00:00 2001 From: Dylan Jew Date: Tue, 28 Apr 2026 14:01:08 -0400 Subject: [PATCH 01/11] Index the Fuzzer.builtin field From 1c59409dac547d3292998e938acdab80d081ad82 Mon Sep 17 00:00:00 2001 From: Dylan Jew Date: Tue, 28 Apr 2026 15:25:55 -0400 Subject: [PATCH 02/11] WIP cronjob to aggregate fuzzer stats table --- .../_internal/cron/aggregate_fuzzer_stats.py | 172 ++++++++++++++++++ .../cron/aggregate_fuzzer_stats_test.py | 99 ++++++++++ 2 files changed, 271 insertions(+) create mode 100644 src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py create mode 100644 src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py diff --git a/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py b/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py new file mode 100644 index 0000000000..d342f70011 --- /dev/null +++ b/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py @@ -0,0 +1,172 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Cron job to aggregate fuzzer stats in BigQuery.""" + +import argparse + +from clusterfuzz._internal.base import utils +from clusterfuzz._internal.datastore import data_types +from clusterfuzz._internal.google_cloud_utils import big_query +from clusterfuzz._internal.metrics import fuzzer_stats +from clusterfuzz._internal.metrics import logs + +DAILY_STATS_SCHEMA = { + 'fields': [{ + 'name': 'fuzzer_name', + 'type': 'STRING', + 'mode': 'NULLABLE' + }, { + 'name': 'date', + 'type': 'DATE', + 'mode': 'NULLABLE' + }, { + 'name': 'testcases_executed', + 'type': 'INTEGER', + 'mode': 'NULLABLE' + }, { + 'name': 'testcase_execution_duration', + 'type': 'INTERVAL', + 'mode': 'NULLABLE' + }, { + 'name': 'testcases_generated', + 'type': 'INTEGER', + 'mode': 'NULLABLE' + }, { + 'name': 'testcase_generation_duration', + 'type': 'INTERVAL', + 'mode': 'NULLABLE' + }, { + 'name': 'fuzzing_duration', + 'type': 'INTERVAL', + 'mode': 'NULLABLE' + }] +} + + +def _create_dataset_if_needed(bigquery, dataset_id): + """Create a new dataset if necessary.""" + project_id = utils.get_application_id() + dataset_body = { + 'datasetReference': { + 'datasetId': dataset_id, + 'projectId': project_id, + }, + } + try: + bigquery.datasets().insert( + projectId=project_id, body=dataset_body).execute() + logs.info(f'Created dataset {dataset_id}.') + except Exception as e: + if '409' not in str(e): + logs.error(f'Failed to create dataset {dataset_id}: {e}') + + +def _create_table_if_needed(bigquery, dataset_id, table_id, schema): + """Create a new table if needed.""" + project_id = utils.get_application_id() + table_body = { + 'tableReference': { + 'datasetId': dataset_id, + 'projectId': project_id, + 'tableId': table_id, + }, + 'schema': schema + } + try: + bigquery.tables().insert( + projectId=project_id, datasetId=dataset_id, body=table_body).execute() + logs.info(f'Created table {dataset_id}.{table_id}.') + except Exception as e: + if '409' not in str(e): + 
logs.error(f'Failed to create table {dataset_id}.{table_id}: {e}') + + +def main(argv): + """Main entry point for the aggregate_fuzzer_stats cron job.""" + parser = argparse.ArgumentParser(prog='aggregate_fuzzer_stats') + parser.add_argument( + '--fuzzer', required=False, help='Specific fuzzer to write') + parser.add_argument( + '--non-dry-run', action='store_true', help='Whether to write to BigQuery') + args = parser.parse_args(argv) + + logs.info('Starting fuzzer stats aggregation cron.') + + bigquery_client = big_query.get_api_client() + project_id = utils.get_application_id() + + _create_dataset_if_needed(bigquery_client, 'fuzzer_stats') + _create_table_if_needed(bigquery_client, 'fuzzer_stats', 'daily_stats', + DAILY_STATS_SCHEMA) + + fuzzers = data_types.Fuzzer.query(data_types.Fuzzer.builtin == False) + dest_client = big_query.Client( + dataset_id='fuzzer_stats', table_id='daily_stats') + + for fuzzer in fuzzers: + if args.fuzzer and args.fuzzer != fuzzer.name: + continue + + logs.info(f'Processing stats for fuzzer: {fuzzer.name}') + dataset_id = fuzzer_stats.dataset_name(fuzzer.name) + table_id = 'JobRun' + + query = f""" + SELECT + '{fuzzer.name}' as fuzzer_name, + CAST(DATE(TIMESTAMP_SECONDS(CAST(timestamp AS INT64))) AS STRING) as date, + SUM(testcases_executed) as testcases_executed, + CAST(SUM(testcase_execution_duration) AS STRING) as testcase_execution_duration, + SUM(testcases_generated) as testcases_generated, + CAST(SUM(testcase_generation_duration) AS STRING) as testcase_generation_duration, + CAST(SUM(fuzzing_duration) AS STRING) as fuzzing_duration + FROM + `{project_id}.{dataset_id}.{table_id}` + WHERE + DATE(TIMESTAMP_SECONDS(CAST(timestamp AS INT64))) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) + GROUP BY + DATE(TIMESTAMP_SECONDS(CAST(timestamp AS INT64))) + """ + + try: + source_client = big_query.Client() + result = source_client.query(query) + + if not result.rows: + logs.info(f'No data for {fuzzer.name} for yesterday.') + continue + + inserts = [] + for row in result.rows: + date_str = row['date'] + insert_id = fuzzer.name + '_' + date_str + inserts.append(big_query.Insert(row=row, insert_id=insert_id)) + + if inserts: + if not args.non_dry_run: + logs.info( + f'DRY RUN: Would insert {len(inserts)} rows for {fuzzer.name}.') + else: + insert_result = dest_client.insert(inserts) + errors = insert_result.get('insertErrors') + if errors: + logs.error(f'Failed to insert rows for {fuzzer.name}: {errors}') + else: + logs.info( + f'Successfully inserted {len(inserts)} rows for {fuzzer.name}.') + + except Exception as e: + logs.error(f'Failed to process {fuzzer.name}: {e}') + + logs.info('Fuzzer stats aggregation cron complete.') diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py new file mode 100644 index 0000000000..b83f644bf3 --- /dev/null +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py @@ -0,0 +1,99 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
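For context on what the `insert_id` built above (fuzzer name plus date) buys: BigQuery's streaming `tabledata.insertAll` treats `insertId` as a best-effort dedup key over a short window, so a retried batch does not double-count rows. A minimal sketch against the raw discovery client, where the project ID and helper name are placeholders rather than anything from this patch:

```python
# Sketch only: how big_query.Insert(row, insert_id) maps onto insertAll.
from googleapiclient.discovery import build

def stream_daily_rows(rows_with_ids, project_id='my-project'):
  bigquery = build('bigquery', 'v2')
  body = {
      'rows': [{'insertId': insert_id, 'json': row}
               for insert_id, row in rows_with_ids],
  }
  # Rows sharing an insertId within roughly a minute are dropped
  # server-side, so retrying a failed batch is safe.
  return bigquery.tabledata().insertAll(
      projectId=project_id,
      datasetId='fuzzer_stats',
      tableId='daily_stats',
      body=body).execute()
```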
+# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for aggregate_fuzzer_stats.""" + +import unittest +from unittest import mock + +from clusterfuzz._internal.cron import aggregate_fuzzer_stats +from clusterfuzz._internal.datastore import data_types +from clusterfuzz._internal.google_cloud_utils.big_query import QueryResult +from clusterfuzz._internal.tests.test_libs import helpers as test_helpers +from clusterfuzz._internal.tests.test_libs import test_utils + + +@test_utils.with_cloud_emulators('datastore') +class AggregateFuzzerStatsTest(unittest.TestCase): + """Test AggregateFuzzerStats.""" + + def setUp(self): + # Create a non-builtin fuzzer + data_types.Fuzzer(name='fuzzer', jobs=['job'], builtin=False).put() + data_types.Job(name='job').put() + + test_helpers.patch(self, [ + 'clusterfuzz._internal.google_cloud_utils.big_query.get_api_client', + 'clusterfuzz._internal.google_cloud_utils.big_query.Client', + 'clusterfuzz._internal.base.utils.get_application_id', + ]) + + self.mock.get_application_id.return_value = 'test-clusterfuzz' + self.mock_api_client = mock.MagicMock() + self.mock.get_api_client.return_value = self.mock_api_client + + self.mock_client_instance = mock.MagicMock() + self.mock.Client.return_value = self.mock_client_instance + + self.mock_client_instance.query.return_value = QueryResult( + rows=[{ + 'fuzzer_name': 'fuzzer', + 'date': '2026-05-01', + 'testcases_executed': 10, + 'testcase_execution_duration': 'P0DT0H1M0S', + 'testcases_generated': 5, + 'testcase_generation_duration': 'P0DT0H0M30S', + 'fuzzing_duration': 'P0DT0H1M30S' + }], + total_count=1) + + self.mock_client_instance.insert.return_value = {} + + def test_aggregate(self): + """Tests execution of the aggregate_fuzzer_stats cron job.""" + # Pass argv list instead of mock args object + aggregate_fuzzer_stats.main(['--non-dry-run']) + + # Verify dataset creation attempt + self.mock_api_client.datasets().insert.assert_called_with( + projectId='test-clusterfuzz', + body={ + 'datasetReference': { + 'projectId': 'test-clusterfuzz', + 'datasetId': 'fuzzer_stats' + } + }) + + # Verify table creation attempt + self.mock_api_client.tables().insert.assert_called_with( + body={ + 'tableReference': { + 'projectId': 'test-clusterfuzz', + 'datasetId': 'fuzzer_stats', + 'tableId': 'daily_stats', + }, + 'schema': aggregate_fuzzer_stats.DAILY_STATS_SCHEMA, + }, + datasetId='fuzzer_stats', + projectId='test-clusterfuzz') + + # Verify query execution + self.mock_client_instance.query.assert_called_once() + + # Verify insert + self.mock_client_instance.insert.assert_called_once() + args_list = self.mock_client_instance.insert.call_args_list + inserts = args_list[0][0][0] + self.assertEqual(len(inserts), 1) + self.assertEqual(inserts[0].row['fuzzer_name'], 'fuzzer') + self.assertEqual(inserts[0].insert_id, 'fuzzer_2026-05-01') From c3df31d40557e00580d9943c81210831d734a5e8 Mon Sep 17 00:00:00 2001 From: Dylan Jew Date: Wed, 29 Apr 2026 17:51:31 -0400 Subject: [PATCH 03/11] Change upload to WRITE_TRUNCATE --- .../_internal/cron/aggregate_fuzzer_stats.py | 150 +++++++++++++----- .../cron/aggregate_fuzzer_stats_test.py | 59 ++++++- src/local/butler/run.py | 11 ++ 3 files changed, 169 insertions(+), 51 deletions(-) diff --git a/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py b/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py index d342f70011..beff449023 100644 --- a/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py +++ 
b/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py @@ -1,25 +1,18 @@ -# Copyright 2026 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Cron job to aggregate fuzzer stats in BigQuery.""" - import argparse +import datetime +import io +import json + +from googleapiclient.http import MediaIoBaseUpload from clusterfuzz._internal.base import utils from clusterfuzz._internal.datastore import data_types from clusterfuzz._internal.google_cloud_utils import big_query from clusterfuzz._internal.metrics import fuzzer_stats from clusterfuzz._internal.metrics import logs +from clusterfuzz._internal.system import environment + +# pylint: disable=no-member DAILY_STATS_SCHEMA = { 'fields': [{ @@ -54,6 +47,10 @@ } +class TableConfigurationError(Exception): + """Exception raised for structural mismatches in target databases.""" + + def _create_dataset_if_needed(bigquery, dataset_id): """Create a new dataset if necessary.""" project_id = utils.get_application_id() @@ -81,8 +78,34 @@ def _create_table_if_needed(bigquery, dataset_id, table_id, schema): 'projectId': project_id, 'tableId': table_id, }, + 'timePartitioning': { + 'type': 'DAY', + 'field': 'date', + }, 'schema': schema } + + try: + # Validate that existing partitioned state holds right parameters + table_info = bigquery.tables().get( + projectId=project_id, datasetId=dataset_id, tableId=table_id).execute() + time_partitioning = table_info.get('timePartitioning') + if not time_partitioning or time_partitioning.get('field') != 'date': + logs.info( + f'Table {dataset_id}.{table_id} exists but is unpartitioned or ' + f'configured differently. Re-creating.' + ) + bigquery.tables().delete( + projectId=project_id, + datasetId=dataset_id, + tableId=table_id).execute() + raise TableConfigurationError('Table dropped for re-creation') + except Exception as e: + if '404' not in str(e) and 'dropped for re-creation' not in str(e): + logs.error( + f'Error checking metadata for table {dataset_id}.{table_id}: {e}') + return + try: bigquery.tables().insert( projectId=project_id, datasetId=dataset_id, body=table_body).execute() @@ -95,30 +118,39 @@ def _create_table_if_needed(bigquery, dataset_id, table_id, schema): def main(argv): """Main entry point for the aggregate_fuzzer_stats cron job.""" parser = argparse.ArgumentParser(prog='aggregate_fuzzer_stats') - parser.add_argument( - '--fuzzer', required=False, help='Specific fuzzer to write') parser.add_argument( '--non-dry-run', action='store_true', help='Whether to write to BigQuery') args = parser.parse_args(argv) logs.info('Starting fuzzer stats aggregation cron.') + if environment.is_local_development(): + logs.error( + 'BigQuery requires a cloud project to run. ' + 'This cron job cannot run locally.' 
+ ) + return + bigquery_client = big_query.get_api_client() project_id = utils.get_application_id() - _create_dataset_if_needed(bigquery_client, 'fuzzer_stats') - _create_table_if_needed(bigquery_client, 'fuzzer_stats', 'daily_stats', - DAILY_STATS_SCHEMA) + if args.non_dry_run: + _create_dataset_if_needed(bigquery_client, 'fuzzer_stats') + _create_table_if_needed(bigquery_client, 'fuzzer_stats', 'daily_stats', + DAILY_STATS_SCHEMA) + # The linter suggests a comparison that isn't supported by query() filters. + # pylint: disable=singleton-comparison fuzzers = data_types.Fuzzer.query(data_types.Fuzzer.builtin == False) - dest_client = big_query.Client( - dataset_id='fuzzer_stats', table_id='daily_stats') - for fuzzer in fuzzers: - if args.fuzzer and args.fuzzer != fuzzer.name: - continue + yesterday = (datetime.datetime.utcnow().date() - datetime.timedelta(days=1)) + date_partition_str = yesterday.strftime('%Y%m%d') + all_rows = [] + + for fuzzer in fuzzers: logs.info(f'Processing stats for fuzzer: {fuzzer.name}') + dataset_id = fuzzer_stats.dataset_name(fuzzer.name) table_id = 'JobRun' @@ -136,7 +168,7 @@ def main(argv): WHERE DATE(TIMESTAMP_SECONDS(CAST(timestamp AS INT64))) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) GROUP BY - DATE(TIMESTAMP_SECONDS(CAST(timestamp AS INT64))) + date """ try: @@ -147,26 +179,58 @@ def main(argv): logs.info(f'No data for {fuzzer.name} for yesterday.') continue - inserts = [] for row in result.rows: - date_str = row['date'] - insert_id = fuzzer.name + '_' + date_str - inserts.append(big_query.Insert(row=row, insert_id=insert_id)) - - if inserts: - if not args.non_dry_run: - logs.info( - f'DRY RUN: Would insert {len(inserts)} rows for {fuzzer.name}.') - else: - insert_result = dest_client.insert(inserts) - errors = insert_result.get('insertErrors') - if errors: - logs.error(f'Failed to insert rows for {fuzzer.name}: {errors}') - else: - logs.info( - f'Successfully inserted {len(inserts)} rows for {fuzzer.name}.') + all_rows.append(row) except Exception as e: logs.error(f'Failed to process {fuzzer.name}: {e}') + if all_rows: + if not args.non_dry_run: + logs.info( + f'DRY RUN: Would insert {len(all_rows)} rows across all fuzzers.' 
+ ) + logs.info(all_rows) + else: + try: + output = io.StringIO() + for row in all_rows: + output.write(json.dumps(row) + '\n') + + content = output.getvalue().encode('utf-8') + media_body = MediaIoBaseUpload( + io.BytesIO(content), + mimetype='application/octet-stream', + resumable=False + ) + + body = { + 'configuration': { + 'load': { + 'destinationTable': { + 'projectId': project_id, + 'datasetId': 'fuzzer_stats', + 'tableId': f'daily_stats${date_partition_str}' + }, + 'sourceFormat': 'NEWLINE_DELIMITED_JSON', + 'writeDisposition': 'WRITE_TRUNCATE', + 'schema': DAILY_STATS_SCHEMA + } + } + } + + request = bigquery_client.jobs().insert( + projectId=project_id, + body=body, + media_body=media_body + ) + response = request.execute(num_retries=2) + logs.info( + f'Successfully loaded data to ' + f'daily_stats${date_partition_str}: {response}' + ) + + except Exception as e: + logs.error(f'Failed to execute batch load job in BigQuery: {e}') + logs.info('Fuzzer stats aggregation cron complete.') diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py index b83f644bf3..5f1990cf63 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py @@ -13,9 +13,14 @@ # limitations under the License. """Tests for aggregate_fuzzer_stats.""" +import datetime +import json import unittest from unittest import mock +from googleapiclient.errors import HttpError +import httplib2 + from clusterfuzz._internal.cron import aggregate_fuzzer_stats from clusterfuzz._internal.datastore import data_types from clusterfuzz._internal.google_cloud_utils.big_query import QueryResult @@ -36,12 +41,18 @@ def setUp(self): 'clusterfuzz._internal.google_cloud_utils.big_query.get_api_client', 'clusterfuzz._internal.google_cloud_utils.big_query.Client', 'clusterfuzz._internal.base.utils.get_application_id', + 'googleapiclient.http.MediaIoBaseUpload', ]) self.mock.get_application_id.return_value = 'test-clusterfuzz' self.mock_api_client = mock.MagicMock() self.mock.get_api_client.return_value = self.mock_api_client + # Mock tables().get() to throw 404 (simulate table doesn't exist) + resp = httplib2.Response({'status': 404}) + self.mock_api_client.tables().get().execute.side_effect = HttpError( + resp, b'Not found') + self.mock_client_instance = mock.MagicMock() self.mock.Client.return_value = self.mock_client_instance @@ -57,7 +68,13 @@ def setUp(self): }], total_count=1) - self.mock_client_instance.insert.return_value = {} + self.mock_job = mock.MagicMock() + self.mock_api_client.jobs().insert.return_value = self.mock_job + self.mock_job.execute.return_value = { + 'status': { + 'state': 'DONE' + } + } def test_aggregate(self): """Tests execution of the aggregate_fuzzer_stats cron job.""" @@ -82,6 +99,10 @@ def test_aggregate(self): 'datasetId': 'fuzzer_stats', 'tableId': 'daily_stats', }, + 'timePartitioning': { + 'type': 'DAY', + 'field': 'date', + }, 'schema': aggregate_fuzzer_stats.DAILY_STATS_SCHEMA, }, datasetId='fuzzer_stats', @@ -90,10 +111,32 @@ def test_aggregate(self): # Verify query execution self.mock_client_instance.query.assert_called_once() - # Verify insert - self.mock_client_instance.insert.assert_called_once() - args_list = self.mock_client_instance.insert.call_args_list - inserts = args_list[0][0][0] - self.assertEqual(len(inserts), 1) - 
self.assertEqual(inserts[0].row['fuzzer_name'], 'fuzzer') - self.assertEqual(inserts[0].insert_id, 'fuzzer_2026-05-01') + # Verify load jobs insert + self.mock_api_client.jobs().insert.assert_called_once() + call_kwargs = self.mock_api_client.jobs().insert.call_args[1] + self.assertEqual(call_kwargs['projectId'], 'test-clusterfuzz') + + body = call_kwargs['body'] + load_config = body['configuration']['load'] + self.assertEqual( + load_config['destinationTable']['datasetId'], 'fuzzer_stats') + self.assertEqual(load_config['writeDisposition'], 'WRITE_TRUNCATE') + self.assertEqual(load_config['sourceFormat'], 'NEWLINE_DELIMITED_JSON') + + yesterday = (datetime.datetime.utcnow().date() - datetime.timedelta(days=1)) + expected_table_id = f"daily_stats${yesterday.strftime('%Y%m%d')}" + self.assertEqual( + load_config['destinationTable']['tableId'], expected_table_id) + + # Verify JSON uploaded media content using the patched wrapper inputs + self.mock.MediaIoBaseUpload.assert_called_once() + media_call_args = self.mock.MediaIoBaseUpload.call_args[0] + bytes_io_arg = media_call_args[0] + stream_content = bytes_io_arg.getvalue().decode('utf-8') + uploaded_dict = json.loads(stream_content.strip()) + + self.assertEqual(uploaded_dict['fuzzer_name'], 'fuzzer') + self.assertEqual(uploaded_dict['date'], '2026-05-01') + self.assertEqual(uploaded_dict['testcases_executed'], 10) + + diff --git a/src/local/butler/run.py b/src/local/butler/run.py index c77d92b4e2..518eab1f19 100644 --- a/src/local/butler/run.py +++ b/src/local/butler/run.py @@ -21,6 +21,7 @@ from local.butler import constants from src.clusterfuzz._internal.config import local_config from src.clusterfuzz._internal.datastore import ndb_init +from src.clusterfuzz._internal.base import utils def execute(args): @@ -34,6 +35,16 @@ def execute(args): os.environ['CONFIG_DIR_OVERRIDE'] = os.path.abspath(args.config_dir) local_config.ProjectConfig().set_environment() + app_id = utils.get_application_id() + # Required to use the correct project ID when interacting with the BigQuery client. + if app_id: + # os.environ['GOOGLE_CLOUD_PROJECT'] = app_id + if 'GOOGLE_CLOUD_PROJECT' not in os.environ: + print('empty google cloud project') + else: + print(f'google cloud project {os.environ["GOOGLE_CLOUD_PROJECT"]}') + print(f'app id {app_id}') + if args.local: os.environ['DATASTORE_EMULATOR_HOST'] = constants.DATASTORE_EMULATOR_HOST os.environ['PUBSUB_EMULATOR_HOST'] = constants.PUBSUB_EMULATOR_HOST From 5178d2303792de327740fb0b3e7a3f7427bc93b4 Mon Sep 17 00:00:00 2001 From: Dylan Jew Date: Thu, 30 Apr 2026 14:52:46 -0400 Subject: [PATCH 04/11] Fix interval format and poll completion --- .../_internal/cron/aggregate_fuzzer_stats.py | 159 ++++++++++++++---- .../cron/aggregate_fuzzer_stats_test.py | 19 +-- src/local/butler/run.py | 2 +- 3 files changed, 135 insertions(+), 45 deletions(-) diff --git a/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py b/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py index beff449023..2814060a0f 100644 --- a/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py +++ b/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py @@ -1,7 +1,23 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
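The switch in PATCH 03 from streaming inserts to a load job aimed at `daily_stats$YYYYMMDD` is what makes the cron idempotent: with `WRITE_TRUNCATE`, a re-run atomically replaces just that day's partition instead of appending duplicates. A condensed sketch of the idea, with helper names that are illustrative rather than the patch's own:

```python
# Targeting a partition decorator scopes WRITE_TRUNCATE to a single day.
import datetime

def partition_table_id(base_table: str, day: datetime.date) -> str:
  """partition_table_id('daily_stats', datetime.date(2026, 4, 29))
  returns 'daily_stats$20260429'."""
  return f"{base_table}${day.strftime('%Y%m%d')}"

def load_job_config(project_id: str, day: datetime.date,
                    schema: dict) -> dict:
  return {
      'configuration': {
          'load': {
              'destinationTable': {
                  'projectId': project_id,
                  'datasetId': 'fuzzer_stats',
                  'tableId': partition_table_id('daily_stats', day),
              },
              'sourceFormat': 'NEWLINE_DELIMITED_JSON',
              # Replaces only the addressed partition, never the table.
              'writeDisposition': 'WRITE_TRUNCATE',
              'schema': schema,
          }
      }
  }
```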
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Cron job to aggregate fuzzer stats onto a daily_stats BigQuery table.""" + import argparse import datetime import io import json +import time from googleapiclient.http import MediaIoBaseUpload @@ -12,8 +28,6 @@ from clusterfuzz._internal.metrics import logs from clusterfuzz._internal.system import environment -# pylint: disable=no-member - DAILY_STATS_SCHEMA = { 'fields': [{ 'name': 'fuzzer_name', @@ -47,10 +61,6 @@ } -class TableConfigurationError(Exception): - """Exception raised for structural mismatches in target databases.""" - - def _create_dataset_if_needed(bigquery, dataset_id): """Create a new dataset if necessary.""" project_id = utils.get_application_id() @@ -91,16 +101,14 @@ def _create_table_if_needed(bigquery, dataset_id, table_id, schema): projectId=project_id, datasetId=dataset_id, tableId=table_id).execute() time_partitioning = table_info.get('timePartitioning') if not time_partitioning or time_partitioning.get('field') != 'date': - logs.info( - f'Table {dataset_id}.{table_id} exists but is unpartitioned or ' - f'configured differently. Re-creating.' - ) + logs.info(f'Table {dataset_id}.{table_id} exists but is unpartitioned or ' + f'configured differently. Re-creating.') bigquery.tables().delete( - projectId=project_id, - datasetId=dataset_id, + projectId=project_id, datasetId=dataset_id, tableId=table_id).execute() - raise TableConfigurationError('Table dropped for re-creation') + except Exception as e: + # TODO: Use real exceptions from the API instead of string matching codes if '404' not in str(e) and 'dropped for re-creation' not in str(e): logs.error( f'Error checking metadata for table {dataset_id}.{table_id}: {e}') @@ -115,6 +123,87 @@ def _create_table_if_needed(bigquery, dataset_id, table_id, schema): logs.error(f'Failed to create table {dataset_id}.{table_id}: {e}') +def _parse_bq_interval_string(s): + """Parses canonical BigQuery INTERVAL string representation to ISO 8601.""" + if not s: + return s + + s = s.strip() + if s.startswith('P') or s.startswith('-P'): + return s + + try: + parts = s.split() + years = 0 + months = 0 + days = 0 + hours = 0 + minutes = 0 + seconds = 0.0 + + for p in parts: + if '-' in p: + is_neg = p.startswith('-') + if is_neg or p.startswith('+'): + p = p[1:] + sub_parts = p.split('-') + if len(sub_parts) == 2: + years = int(sub_parts[0]) + months = int(sub_parts[1]) + if is_neg: + years = -years + months = -months + elif ':' in p: + is_neg = p.startswith('-') + if is_neg or p.startswith('+'): + p = p[1:] + sub_parts = p.split(':') + if len(sub_parts) >= 3: + hours = int(sub_parts[0]) + minutes = int(sub_parts[1]) + seconds = float(sub_parts[2]) + if is_neg: + hours = -hours + minutes = -minutes + seconds = -seconds + else: + days = int(p) + + is_negative_duration = ( + years < 0 or months < 0 or days < 0 or hours < 0 or minutes < 0 or + seconds < 0) + + abs_years = abs(years) + abs_months = abs(months) + abs_days = abs(days) + abs_hours = abs(hours) + abs_minutes = abs(minutes) + abs_seconds = abs(seconds) + + secs = int(abs_seconds) + microseconds = int(round((abs_seconds - secs) * 1000000)) + + duration = '-' if 
is_negative_duration else '' + duration += 'P' + + return f"{duration}{abs_days}DT{abs_hours}H{abs_minutes}M{secs}.{microseconds:06d}S" + + except Exception: + return s + + +def _poll_completion(bigquery, project_id, job_id): + """Poll for completion.""" + response = bigquery.jobs().get( + projectId=project_id, jobId=job_id).execute(num_retries=2) + while response['status']['state'] == 'RUNNING': + time.sleep(5) + response = bigquery.jobs().get( + projectId=project_id, jobId=job_id).execute(num_retries=2) + + return response + + def main(argv): """Main entry point for the aggregate_fuzzer_stats cron job.""" parser = argparse.ArgumentParser(prog='aggregate_fuzzer_stats') @@ -125,10 +214,8 @@ def main(argv): logs.info('Starting fuzzer stats aggregation cron.') if environment.is_local_development(): - logs.error( - 'BigQuery requires a cloud project to run. ' - 'This cron job cannot run locally.' - ) + logs.error('BigQuery requires a cloud project to run. ' + 'This cron job cannot run locally.') return bigquery_client = big_query.get_api_client() @@ -143,7 +230,7 @@ def main(argv): # pylint: disable=singleton-comparison fuzzers = data_types.Fuzzer.query(data_types.Fuzzer.builtin == False) - yesterday = (datetime.datetime.utcnow().date() - datetime.timedelta(days=1)) + yesterday = utils.utcnow().date() - datetime.timedelta(days=1) date_partition_str = yesterday.strftime('%Y%m%d') all_rows = [] @@ -151,6 +238,9 @@ def main(argv): for fuzzer in fuzzers: logs.info(f'Processing stats for fuzzer: {fuzzer.name}') + if fuzzer.name != 'ochang_js_fuzzer': + continue + dataset_id = fuzzer_stats.dataset_name(fuzzer.name) table_id = 'JobRun' @@ -188,21 +278,24 @@ def main(argv): if all_rows: if not args.non_dry_run: logs.info( - f'DRY RUN: Would insert {len(all_rows)} rows across all fuzzers.' 
- ) + f'DRY RUN: Would insert {len(all_rows)} rows across all fuzzers.') logs.info(all_rows) else: try: output = io.StringIO() for row in all_rows: - output.write(json.dumps(row) + '\n') + row_dict = dict(row) + for field in ('testcase_execution_duration', + 'testcase_generation_duration', 'fuzzing_duration'): + if field in row_dict and row_dict[field] is not None: + row_dict[field] = _parse_bq_interval_string(row_dict[field]) + output.write(json.dumps(row_dict) + '\n') content = output.getvalue().encode('utf-8') media_body = MediaIoBaseUpload( io.BytesIO(content), mimetype='application/octet-stream', - resumable=False - ) + resumable=False) body = { 'configuration': { @@ -220,15 +313,19 @@ def main(argv): } request = bigquery_client.jobs().insert( - projectId=project_id, - body=body, - media_body=media_body - ) + projectId=project_id, body=body, media_body=media_body) response = request.execute(num_retries=2) - logs.info( - f'Successfully loaded data to ' - f'daily_stats${date_partition_str}: {response}' - ) + job_id = response['jobReference']['jobId'] + + logs.info(f'Monitoring completion for load job id: {job_id}') + poll_response = _poll_completion(bigquery_client, project_id, job_id) + + errors = poll_response['status'].get('errors') + if errors: + logs.error(f'Failed load for {job_id} with errors: {str(errors)})') + else: + logs.info(f'Successfully loaded data to ' + f'daily_stats${date_partition_str}: {poll_response}') except Exception as e: logs.error(f'Failed to execute batch load job in BigQuery: {e}') diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py index 5f1990cf63..33f1c70e05 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py @@ -70,15 +70,10 @@ def setUp(self): self.mock_job = mock.MagicMock() self.mock_api_client.jobs().insert.return_value = self.mock_job - self.mock_job.execute.return_value = { - 'status': { - 'state': 'DONE' - } - } + self.mock_job.execute.return_value = {'status': {'state': 'DONE'}} - def test_aggregate(self): + def test_aggregate_fuzzer_stats(self): """Tests execution of the aggregate_fuzzer_stats cron job.""" - # Pass argv list instead of mock args object aggregate_fuzzer_stats.main(['--non-dry-run']) # Verify dataset creation attempt @@ -118,15 +113,15 @@ def test_aggregate(self): body = call_kwargs['body'] load_config = body['configuration']['load'] - self.assertEqual( - load_config['destinationTable']['datasetId'], 'fuzzer_stats') + self.assertEqual(load_config['destinationTable']['datasetId'], + 'fuzzer_stats') self.assertEqual(load_config['writeDisposition'], 'WRITE_TRUNCATE') self.assertEqual(load_config['sourceFormat'], 'NEWLINE_DELIMITED_JSON') yesterday = (datetime.datetime.utcnow().date() - datetime.timedelta(days=1)) expected_table_id = f"daily_stats${yesterday.strftime('%Y%m%d')}" - self.assertEqual( - load_config['destinationTable']['tableId'], expected_table_id) + self.assertEqual(load_config['destinationTable']['tableId'], + expected_table_id) # Verify JSON uploaded media content using the patched wrapper inputs self.mock.MediaIoBaseUpload.assert_called_once() @@ -138,5 +133,3 @@ def test_aggregate(self): self.assertEqual(uploaded_dict['fuzzer_name'], 'fuzzer') self.assertEqual(uploaded_dict['date'], '2026-05-01') 
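The `_poll_completion` helper above spins on `jobs().get` every five seconds until the job leaves `RUNNING`, with no upper bound. A bounded variant for comparison; the deadline handling is an assumption added here, not something the patch implements:

```python
# Poll a BigQuery job until it finishes or a deadline passes (sketch).
import time

def poll_completion(bigquery, project_id, job_id, timeout_secs=600):
  deadline = time.monotonic() + timeout_secs
  while True:
    response = bigquery.jobs().get(
        projectId=project_id, jobId=job_id).execute(num_retries=2)
    if response['status']['state'] not in ('PENDING', 'RUNNING'):
      return response
    if time.monotonic() > deadline:
      raise TimeoutError(f'BigQuery job {job_id} did not finish in time.')
    time.sleep(5)
```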
self.assertEqual(uploaded_dict['testcases_executed'], 10) - - diff --git a/src/local/butler/run.py b/src/local/butler/run.py index 518eab1f19..1c2fea85e7 100644 --- a/src/local/butler/run.py +++ b/src/local/butler/run.py @@ -19,9 +19,9 @@ import sys from local.butler import constants +from src.clusterfuzz._internal.base import utils from src.clusterfuzz._internal.config import local_config from src.clusterfuzz._internal.datastore import ndb_init -from src.clusterfuzz._internal.base import utils def execute(args): From ceec8d17350317ae95ab1382ffb371b126d9affb Mon Sep 17 00:00:00 2001 From: Dylan Jew Date: Thu, 30 Apr 2026 15:55:35 -0400 Subject: [PATCH 05/11] switch to SQL formatting --- .../_internal/cron/aggregate_fuzzer_stats.py | 117 +++++------------- 1 file changed, 29 insertions(+), 88 deletions(-) diff --git a/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py b/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py index 2814060a0f..96a6b7d3e9 100644 --- a/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py +++ b/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py @@ -71,7 +71,7 @@ def _create_dataset_if_needed(bigquery, dataset_id): }, } try: - bigquery.datasets().insert( + bigquery.datasets().insert( # pylint: disable=no-member projectId=project_id, body=dataset_body).execute() logs.info(f'Created dataset {dataset_id}.') except Exception as e: @@ -97,13 +97,13 @@ def _create_table_if_needed(bigquery, dataset_id, table_id, schema): try: # Validate that existing partitioned state holds right parameters - table_info = bigquery.tables().get( + table_info = bigquery.tables().get( # pylint: disable=no-member projectId=project_id, datasetId=dataset_id, tableId=table_id).execute() time_partitioning = table_info.get('timePartitioning') if not time_partitioning or time_partitioning.get('field') != 'date': logs.info(f'Table {dataset_id}.{table_id} exists but is unpartitioned or ' f'configured differently. 
Re-creating.') - bigquery.tables().delete( + bigquery.tables().delete( # pylint: disable=no-member projectId=project_id, datasetId=dataset_id, tableId=table_id).execute() @@ -115,7 +115,7 @@ def _create_table_if_needed(bigquery, dataset_id, table_id, schema): return try: - bigquery.tables().insert( + bigquery.tables().insert( # pylint: disable=no-member projectId=project_id, datasetId=dataset_id, body=table_body).execute() logs.info(f'Created table {dataset_id}.{table_id}.') except Exception as e: @@ -123,82 +123,13 @@ def _create_table_if_needed(bigquery, dataset_id, table_id, schema): logs.error(f'Failed to create table {dataset_id}.{table_id}: {e}') -def _parse_bq_interval_string(s): - """Parses canonical BigQuery INTERVAL string representation to ISO 8601.""" - if not s: - return s - - s = s.strip() - if s.startswith('P') or s.startswith('-P'): - return s - - try: - parts = s.split() - years = 0 - months = 0 - days = 0 - hours = 0 - minutes = 0 - seconds = 0.0 - - for p in parts: - if '-' in p: - is_neg = p.startswith('-') - if is_neg or p.startswith('+'): - p = p[1:] - sub_parts = p.split('-') - if len(sub_parts) == 2: - years = int(sub_parts[0]) - months = int(sub_parts[1]) - if is_neg: - years = -years - months = -months - elif ':' in p: - is_neg = p.startswith('-') - if is_neg or p.startswith('+'): - p = p[1:] - sub_parts = p.split(':') - if len(sub_parts) >= 3: - hours = int(sub_parts[0]) - minutes = int(sub_parts[1]) - seconds = float(sub_parts[2]) - if is_neg: - hours = -hours - minutes = -minutes - seconds = -seconds - else: - days = int(p) - - is_negative_duration = ( - years < 0 or months < 0 or days < 0 or hours < 0 or minutes < 0 or - seconds < 0) - - abs_years = abs(years) - abs_months = abs(months) - abs_days = abs(days) - abs_hours = abs(hours) - abs_minutes = abs(minutes) - abs_seconds = abs(seconds) - - secs = int(abs_seconds) - microseconds = int(round((abs_seconds - secs) * 1000000)) - - duration = '-' if is_negative_duration else '' - duration += 'P' - - return f"{duration}{abs_days}DT{abs_hours}H{abs_minutes}M{secs}.{microseconds:06d}S" - - except Exception: - return s - - def _poll_completion(bigquery, project_id, job_id): """Poll for completion.""" - response = bigquery.jobs().get( + response = bigquery.jobs().get( # pylint: disable=no-member projectId=project_id, jobId=job_id).execute(num_retries=2) while response['status']['state'] == 'RUNNING': time.sleep(5) - response = bigquery.jobs().get( + response = bigquery.jobs().get( # pylint: disable=no-member projectId=project_id, jobId=job_id).execute(num_retries=2) return response @@ -238,9 +169,6 @@ def main(argv): for fuzzer in fuzzers: logs.info(f'Processing stats for fuzzer: {fuzzer.name}') - if fuzzer.name != 'ochang_js_fuzzer': - continue - dataset_id = fuzzer_stats.dataset_name(fuzzer.name) table_id = 'JobRun' @@ -249,10 +177,28 @@ def main(argv): '{fuzzer.name}' as fuzzer_name, CAST(DATE(TIMESTAMP_SECONDS(CAST(timestamp AS INT64))) AS STRING) as date, SUM(testcases_executed) as testcases_executed, - CAST(SUM(testcase_execution_duration) AS STRING) as testcase_execution_duration, + CONCAT( + 'P', + CAST(EXTRACT(DAY FROM SUM(testcase_execution_duration)) AS STRING), 'DT', + CAST(EXTRACT(HOUR FROM SUM(testcase_execution_duration)) AS STRING), 'H', + CAST(EXTRACT(MINUTE FROM SUM(testcase_execution_duration)) AS STRING), 'M', + CAST(EXTRACT(SECOND FROM SUM(testcase_execution_duration)) AS STRING), 'S' + ) as testcase_execution_duration, SUM(testcases_generated) as testcases_generated, - 
CAST(SUM(testcase_generation_duration) AS STRING) as testcase_generation_duration, - CAST(SUM(fuzzing_duration) AS STRING) as fuzzing_duration + CONCAT( + 'P', + CAST(EXTRACT(DAY FROM SUM(testcase_generation_duration)) AS STRING), 'DT', + CAST(EXTRACT(HOUR FROM SUM(testcase_generation_duration)) AS STRING), 'H', + CAST(EXTRACT(MINUTE FROM SUM(testcase_generation_duration)) AS STRING), 'M', + CAST(EXTRACT(SECOND FROM SUM(testcase_generation_duration)) AS STRING), 'S' + ) as testcase_generation_duration, + CONCAT( + 'P', + CAST(EXTRACT(DAY FROM SUM(fuzzing_duration)) AS STRING), 'DT', + CAST(EXTRACT(HOUR FROM SUM(fuzzing_duration)) AS STRING), 'H', + CAST(EXTRACT(MINUTE FROM SUM(fuzzing_duration)) AS STRING), 'M', + CAST(EXTRACT(SECOND FROM SUM(fuzzing_duration)) AS STRING), 'S' + ) as fuzzing_duration FROM `{project_id}.{dataset_id}.{table_id}` WHERE @@ -284,12 +230,7 @@ def main(argv): try: output = io.StringIO() for row in all_rows: - row_dict = dict(row) - for field in ('testcase_execution_duration', - 'testcase_generation_duration', 'fuzzing_duration'): - if field in row_dict and row_dict[field] is not None: - row_dict[field] = _parse_bq_interval_string(row_dict[field]) - output.write(json.dumps(row_dict) + '\n') + output.write(json.dumps(row) + '\n') content = output.getvalue().encode('utf-8') media_body = MediaIoBaseUpload( @@ -312,7 +253,7 @@ def main(argv): } } - request = bigquery_client.jobs().insert( + request = bigquery_client.jobs().insert( # pylint: disable=no-member projectId=project_id, body=body, media_body=media_body) response = request.execute(num_retries=2) job_id = response['jobReference']['jobId'] From 10c574dc4a6d820751ff5402539e24c8d794280e Mon Sep 17 00:00:00 2001 From: Dylan Jew Date: Thu, 30 Apr 2026 16:18:39 -0400 Subject: [PATCH 06/11] update test --- .../_internal/cron/aggregate_fuzzer_stats.py | 17 ++++---- .../cron/aggregate_fuzzer_stats_test.py | 40 +++++++++---------- 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py b/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py index 96a6b7d3e9..e92729b236 100644 --- a/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py +++ b/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py @@ -71,7 +71,7 @@ def _create_dataset_if_needed(bigquery, dataset_id): }, } try: - bigquery.datasets().insert( # pylint: disable=no-member + bigquery.datasets().insert( projectId=project_id, body=dataset_body).execute() logs.info(f'Created dataset {dataset_id}.') except Exception as e: @@ -97,13 +97,13 @@ def _create_table_if_needed(bigquery, dataset_id, table_id, schema): try: # Validate that existing partitioned state holds right parameters - table_info = bigquery.tables().get( # pylint: disable=no-member + table_info = bigquery.tables().get( projectId=project_id, datasetId=dataset_id, tableId=table_id).execute() time_partitioning = table_info.get('timePartitioning') if not time_partitioning or time_partitioning.get('field') != 'date': logs.info(f'Table {dataset_id}.{table_id} exists but is unpartitioned or ' f'configured differently. 
Re-creating.') - bigquery.tables().delete( # pylint: disable=no-member + bigquery.tables().delete( projectId=project_id, datasetId=dataset_id, tableId=table_id).execute() @@ -115,7 +115,7 @@ def _create_table_if_needed(bigquery, dataset_id, table_id, schema): return try: - bigquery.tables().insert( # pylint: disable=no-member + bigquery.tables().insert( projectId=project_id, datasetId=dataset_id, body=table_body).execute() logs.info(f'Created table {dataset_id}.{table_id}.') except Exception as e: @@ -125,11 +125,11 @@ def _create_table_if_needed(bigquery, dataset_id, table_id, schema): def _poll_completion(bigquery, project_id, job_id): """Poll for completion.""" - response = bigquery.jobs().get( # pylint: disable=no-member + response = bigquery.jobs().get( projectId=project_id, jobId=job_id).execute(num_retries=2) while response['status']['state'] == 'RUNNING': time.sleep(5) - response = bigquery.jobs().get( # pylint: disable=no-member + response = bigquery.jobs().get( projectId=project_id, jobId=job_id).execute(num_retries=2) return response @@ -140,6 +140,9 @@ def main(argv): parser = argparse.ArgumentParser(prog='aggregate_fuzzer_stats') parser.add_argument( '--non-dry-run', action='store_true', help='Whether to write to BigQuery') + # parser.add_argument( + # '--force-replace', action='store_true', help='Force replace the existing date partition rather than returning a successful no-op by generating a unique BigQuery' + # ) args = parser.parse_args(argv) logs.info('Starting fuzzer stats aggregation cron.') @@ -253,7 +256,7 @@ def main(argv): } } - request = bigquery_client.jobs().insert( # pylint: disable=no-member + request = bigquery_client.jobs().insert( projectId=project_id, body=body, media_body=media_body) response = request.execute(num_retries=2) job_id = response['jobReference']['jobId'] diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py index 33f1c70e05..94a5823b97 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py @@ -34,7 +34,7 @@ class AggregateFuzzerStatsTest(unittest.TestCase): def setUp(self): # Create a non-builtin fuzzer - data_types.Fuzzer(name='fuzzer', jobs=['job'], builtin=False).put() + data_types.Fuzzer(name='ochang_js_fuzzer', jobs=['job'], builtin=False).put() data_types.Job(name='job').put() test_helpers.patch(self, [ @@ -56,24 +56,24 @@ def setUp(self): self.mock_client_instance = mock.MagicMock() self.mock.Client.return_value = self.mock_client_instance - self.mock_client_instance.query.return_value = QueryResult( - rows=[{ - 'fuzzer_name': 'fuzzer', - 'date': '2026-05-01', - 'testcases_executed': 10, - 'testcase_execution_duration': 'P0DT0H1M0S', - 'testcases_generated': 5, - 'testcase_generation_duration': 'P0DT0H0M30S', - 'fuzzing_duration': 'P0DT0H1M30S' - }], - total_count=1) - self.mock_job = mock.MagicMock() self.mock_api_client.jobs().insert.return_value = self.mock_job self.mock_job.execute.return_value = {'status': {'state': 'DONE'}} def test_aggregate_fuzzer_stats(self): """Tests execution of the aggregate_fuzzer_stats cron job.""" + self.mock_client_instance.query.return_value = QueryResult( + rows=[{ + 'fuzzer_name': 'ochang_js_fuzzer', + 'date': '2026-04-29', + 'testcases_executed': 10495, + 'testcase_execution_duration': 'P0DT11H12M11S', + 'testcases_generated': 
10495, + 'testcase_generation_duration': 'P0DT1H15M33S', + 'fuzzing_duration': 'P0DT12H49M49S' + }], + total_count=1) + aggregate_fuzzer_stats.main(['--non-dry-run']) # Verify dataset creation attempt @@ -85,8 +85,6 @@ def test_aggregate_fuzzer_stats(self): 'datasetId': 'fuzzer_stats' } }) - - # Verify table creation attempt self.mock_api_client.tables().insert.assert_called_with( body={ 'tableReference': { @@ -103,8 +101,6 @@ def test_aggregate_fuzzer_stats(self): datasetId='fuzzer_stats', projectId='test-clusterfuzz') - # Verify query execution - self.mock_client_instance.query.assert_called_once() # Verify load jobs insert self.mock_api_client.jobs().insert.assert_called_once() @@ -130,6 +126,10 @@ def test_aggregate_fuzzer_stats(self): stream_content = bytes_io_arg.getvalue().decode('utf-8') uploaded_dict = json.loads(stream_content.strip()) - self.assertEqual(uploaded_dict['fuzzer_name'], 'fuzzer') - self.assertEqual(uploaded_dict['date'], '2026-05-01') - self.assertEqual(uploaded_dict['testcases_executed'], 10) + self.assertEqual(uploaded_dict['fuzzer_name'], 'ochang_js_fuzzer') + self.assertEqual(uploaded_dict['date'], '2026-04-29') + self.assertEqual(uploaded_dict['testcases_executed'], 10495) + self.assertEqual(uploaded_dict['testcase_execution_duration'], 'P0DT11H12M11S') + self.assertEqual(uploaded_dict['testcases_generated'], 10495) + self.assertEqual(uploaded_dict['testcase_generation_duration'], 'P0DT1H15M33S') + self.assertEqual(uploaded_dict['fuzzing_duration'], 'P0DT12H49M49S') From 3e77b137503c071da77b295ee56074b6bbf6c3fb Mon Sep 17 00:00:00 2001 From: Dylan Jew Date: Thu, 30 Apr 2026 17:07:38 -0400 Subject: [PATCH 07/11] use concurrent threads --- .../_internal/cron/aggregate_fuzzer_stats.py | 258 ++++++++++-------- .../cron/aggregate_fuzzer_stats_test.py | 10 +- 2 files changed, 155 insertions(+), 113 deletions(-) diff --git a/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py b/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py index e92729b236..cbf7c4eab9 100644 --- a/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py +++ b/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py @@ -14,6 +14,8 @@ """Cron job to aggregate fuzzer stats onto a daily_stats BigQuery table.""" import argparse +from concurrent.futures import as_completed +from concurrent.futures import ThreadPoolExecutor import datetime import io import json @@ -28,6 +30,11 @@ from clusterfuzz._internal.metrics import logs from clusterfuzz._internal.system import environment +# Limit orker count to 4 concurrent threads to prevents exhaustion of +# project-wide queued interactive queries quota (1,000 maximum). 
See +# https://cloud.google.com/bigquery/quotas#query_jobs +NUM_THREADS = 4 + DAILY_STATS_SCHEMA = { 'fields': [{ 'name': 'fuzzer_name', @@ -135,14 +142,145 @@ def _poll_completion(bigquery, project_id, job_id): return response +def _query_fuzzer_stats(fuzzer_name, project_id): + """Queries single fuzzer stats for yesterday.""" + dataset_id = fuzzer_stats.dataset_name(fuzzer_name) + table_id = 'JobRun' + + # TODO: Pass timestamp from an optional flag with default value for yesterday + query = f""" + SELECT + '{fuzzer_name}' as fuzzer_name, + CAST(DATE(TIMESTAMP_SECONDS(CAST(timestamp AS INT64))) AS STRING) as date, + SUM(testcases_executed) as testcases_executed, + CONCAT( + 'P', + CAST(EXTRACT(DAY FROM SUM(testcase_execution_duration)) AS STRING), 'DT', + CAST(EXTRACT(HOUR FROM SUM(testcase_execution_duration)) AS STRING), 'H', + CAST(EXTRACT(MINUTE FROM SUM(testcase_execution_duration)) AS STRING), 'M', + CAST(EXTRACT(SECOND FROM SUM(testcase_execution_duration)) AS STRING), 'S' + ) as testcase_execution_duration, + SUM(testcases_generated) as testcases_generated, + CONCAT( + 'P', + CAST(EXTRACT(DAY FROM SUM(testcase_generation_duration)) AS STRING), 'DT', + CAST(EXTRACT(HOUR FROM SUM(testcase_generation_duration)) AS STRING), 'H', + CAST(EXTRACT(MINUTE FROM SUM(testcase_generation_duration)) AS STRING), 'M', + CAST(EXTRACT(SECOND FROM SUM(testcase_generation_duration)) AS STRING), 'S' + ) as testcase_generation_duration, + CONCAT( + 'P', + CAST(EXTRACT(DAY FROM SUM(fuzzing_duration)) AS STRING), 'DT', + CAST(EXTRACT(HOUR FROM SUM(fuzzing_duration)) AS STRING), 'H', + CAST(EXTRACT(MINUTE FROM SUM(fuzzing_duration)) AS STRING), 'M', + CAST(EXTRACT(SECOND FROM SUM(fuzzing_duration)) AS STRING), 'S' + ) as fuzzing_duration + FROM + `{project_id}.{dataset_id}.{table_id}` + WHERE + DATE(TIMESTAMP_SECONDS(CAST(timestamp AS INT64))) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) + GROUP BY + date + """ + + try: + source_client = big_query.Client() + result = source_client.query(query) + + if not result.rows: + logs.info(f'No data for {fuzzer_name} for yesterday.') + return [] + + return list(result.rows) + + except Exception as e: + logs.error(f'Failed to process {fuzzer_name}: {e}') + return [] + + +def _gather_all_stats(fuzzers, project_id): + """Gathers fuzzer statistics concurrently using a thread pool.""" + all_rows = [] + with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor: + future_to_fuzzer = { + executor.submit(_query_fuzzer_stats, fuzzer.name, project_id): fuzzer + for fuzzer in fuzzers + } + + for future in as_completed(future_to_fuzzer): + fuzzer = future_to_fuzzer[future] + try: + rows = future.result() + if rows: + all_rows.extend(rows) + except Exception as e: + logs.error(f'Task execution crashed for {fuzzer.name}: {e}') + + return all_rows + + +def _persist_daily_stats(all_rows, bigquery_client, project_id, + date_partition_str, non_dry_run): + """Writes gathered row statistics to destination table.""" + if not all_rows: + logs.error(f'No data to write to daily_stats on {date_partition_str}') + return + + if not non_dry_run: + logs.info(f'DRY RUN: Would insert {len(all_rows)} rows across all fuzzers.') + logs.info(all_rows) + return + + try: + output = io.StringIO() + for row in all_rows: + output.write(json.dumps(row) + '\n') + + content = output.getvalue().encode('utf-8') + media_body = MediaIoBaseUpload( + io.BytesIO(content), + mimetype='application/octet-stream', + resumable=False) + + body = { + 'configuration': { + 'load': { + 'destinationTable': { + 'projectId': 
project_id, + 'datasetId': 'fuzzer_stats', + 'tableId': f'daily_stats${date_partition_str}' + }, + 'sourceFormat': 'NEWLINE_DELIMITED_JSON', + 'writeDisposition': 'WRITE_TRUNCATE', + 'schema': DAILY_STATS_SCHEMA + } + } + } + + request = bigquery_client.jobs().insert( + projectId=project_id, body=body, media_body=media_body) + response = request.execute(num_retries=2) + job_id = response['jobReference']['jobId'] + + logs.info(f'Monitoring completion for load job id: {job_id}') + poll_response = _poll_completion(bigquery_client, project_id, job_id) + + errors = poll_response['status'].get('errors') + if errors: + logs.error(f'Failed load for {job_id} with errors: {str(errors)})') + else: + logs.info(f'Successfully loaded data to ' + f'daily_stats${date_partition_str}: {poll_response}') + + except Exception as e: + logs.error(f'Failed to execute batch load job in BigQuery: {e}') + + def main(argv): """Main entry point for the aggregate_fuzzer_stats cron job.""" parser = argparse.ArgumentParser(prog='aggregate_fuzzer_stats') parser.add_argument( '--non-dry-run', action='store_true', help='Whether to write to BigQuery') - # parser.add_argument( - # '--force-replace', action='store_true', help='Force replace the existing date partition rather than returning a successful no-op by generating a unique BigQuery' - # ) args = parser.parse_args(argv) logs.info('Starting fuzzer stats aggregation cron.') @@ -162,116 +300,18 @@ def main(argv): # The linter suggests a comparison that isn't supported by query() filters. # pylint: disable=singleton-comparison - fuzzers = data_types.Fuzzer.query(data_types.Fuzzer.builtin == False) + fuzzers = list(data_types.Fuzzer.query(data_types.Fuzzer.builtin == False)) yesterday = utils.utcnow().date() - datetime.timedelta(days=1) date_partition_str = yesterday.strftime('%Y%m%d') - all_rows = [] - - for fuzzer in fuzzers: - logs.info(f'Processing stats for fuzzer: {fuzzer.name}') - - dataset_id = fuzzer_stats.dataset_name(fuzzer.name) - table_id = 'JobRun' - - query = f""" - SELECT - '{fuzzer.name}' as fuzzer_name, - CAST(DATE(TIMESTAMP_SECONDS(CAST(timestamp AS INT64))) AS STRING) as date, - SUM(testcases_executed) as testcases_executed, - CONCAT( - 'P', - CAST(EXTRACT(DAY FROM SUM(testcase_execution_duration)) AS STRING), 'DT', - CAST(EXTRACT(HOUR FROM SUM(testcase_execution_duration)) AS STRING), 'H', - CAST(EXTRACT(MINUTE FROM SUM(testcase_execution_duration)) AS STRING), 'M', - CAST(EXTRACT(SECOND FROM SUM(testcase_execution_duration)) AS STRING), 'S' - ) as testcase_execution_duration, - SUM(testcases_generated) as testcases_generated, - CONCAT( - 'P', - CAST(EXTRACT(DAY FROM SUM(testcase_generation_duration)) AS STRING), 'DT', - CAST(EXTRACT(HOUR FROM SUM(testcase_generation_duration)) AS STRING), 'H', - CAST(EXTRACT(MINUTE FROM SUM(testcase_generation_duration)) AS STRING), 'M', - CAST(EXTRACT(SECOND FROM SUM(testcase_generation_duration)) AS STRING), 'S' - ) as testcase_generation_duration, - CONCAT( - 'P', - CAST(EXTRACT(DAY FROM SUM(fuzzing_duration)) AS STRING), 'DT', - CAST(EXTRACT(HOUR FROM SUM(fuzzing_duration)) AS STRING), 'H', - CAST(EXTRACT(MINUTE FROM SUM(fuzzing_duration)) AS STRING), 'M', - CAST(EXTRACT(SECOND FROM SUM(fuzzing_duration)) AS STRING), 'S' - ) as fuzzing_duration - FROM - `{project_id}.{dataset_id}.{table_id}` - WHERE - DATE(TIMESTAMP_SECONDS(CAST(timestamp AS INT64))) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) - GROUP BY - date - """ - - try: - source_client = big_query.Client() - result = source_client.query(query) - - if not 
result.rows: - logs.info(f'No data for {fuzzer.name} for yesterday.') - continue - - for row in result.rows: - all_rows.append(row) - - except Exception as e: - logs.error(f'Failed to process {fuzzer.name}: {e}') - - if all_rows: - if not args.non_dry_run: - logs.info( - f'DRY RUN: Would insert {len(all_rows)} rows across all fuzzers.') - logs.info(all_rows) - else: - try: - output = io.StringIO() - for row in all_rows: - output.write(json.dumps(row) + '\n') - - content = output.getvalue().encode('utf-8') - media_body = MediaIoBaseUpload( - io.BytesIO(content), - mimetype='application/octet-stream', - resumable=False) - - body = { - 'configuration': { - 'load': { - 'destinationTable': { - 'projectId': project_id, - 'datasetId': 'fuzzer_stats', - 'tableId': f'daily_stats${date_partition_str}' - }, - 'sourceFormat': 'NEWLINE_DELIMITED_JSON', - 'writeDisposition': 'WRITE_TRUNCATE', - 'schema': DAILY_STATS_SCHEMA - } - } - } - - request = bigquery_client.jobs().insert( - projectId=project_id, body=body, media_body=media_body) - response = request.execute(num_retries=2) - job_id = response['jobReference']['jobId'] - - logs.info(f'Monitoring completion for load job id: {job_id}') - poll_response = _poll_completion(bigquery_client, project_id, job_id) + all_rows = _gather_all_stats(fuzzers, project_id) - errors = poll_response['status'].get('errors') - if errors: - logs.error(f'Failed load for {job_id} with errors: {str(errors)})') - else: - logs.info(f'Successfully loaded data to ' - f'daily_stats${date_partition_str}: {poll_response}') - - except Exception as e: - logs.error(f'Failed to execute batch load job in BigQuery: {e}') + _persist_daily_stats( + all_rows=all_rows, + bigquery_client=bigquery_client, + project_id=project_id, + date_partition_str=date_partition_str, + non_dry_run=args.non_dry_run) logs.info('Fuzzer stats aggregation cron complete.') diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py index 94a5823b97..0433ba46db 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py @@ -34,7 +34,8 @@ class AggregateFuzzerStatsTest(unittest.TestCase): def setUp(self): # Create a non-builtin fuzzer - data_types.Fuzzer(name='ochang_js_fuzzer', jobs=['job'], builtin=False).put() + data_types.Fuzzer( + name='ochang_js_fuzzer', jobs=['job'], builtin=False).put() data_types.Job(name='job').put() test_helpers.patch(self, [ @@ -101,7 +102,6 @@ def test_aggregate_fuzzer_stats(self): datasetId='fuzzer_stats', projectId='test-clusterfuzz') - # Verify load jobs insert self.mock_api_client.jobs().insert.assert_called_once() call_kwargs = self.mock_api_client.jobs().insert.call_args[1] @@ -129,7 +129,9 @@ def test_aggregate_fuzzer_stats(self): self.assertEqual(uploaded_dict['fuzzer_name'], 'ochang_js_fuzzer') self.assertEqual(uploaded_dict['date'], '2026-04-29') self.assertEqual(uploaded_dict['testcases_executed'], 10495) - self.assertEqual(uploaded_dict['testcase_execution_duration'], 'P0DT11H12M11S') + self.assertEqual(uploaded_dict['testcase_execution_duration'], + 'P0DT11H12M11S') self.assertEqual(uploaded_dict['testcases_generated'], 10495) - self.assertEqual(uploaded_dict['testcase_generation_duration'], 'P0DT1H15M33S') + self.assertEqual(uploaded_dict['testcase_generation_duration'], + 'P0DT1H15M33S') 
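The concurrency PATCH 07 adds is the standard bounded fan-out: a `ThreadPoolExecutor` capped at `NUM_THREADS`, with `as_completed` harvesting results so one failing fuzzer cannot abort the whole batch. A self-contained sketch, with `query_fn` standing in for `_query_fuzzer_stats`:

```python
# Bounded fan-out: submit one task per fuzzer, harvest as they finish.
from concurrent.futures import ThreadPoolExecutor, as_completed

def gather_rows(fuzzer_names, query_fn, num_threads=4):
  all_rows = []
  with ThreadPoolExecutor(max_workers=num_threads) as executor:
    futures = {executor.submit(query_fn, name): name for name in fuzzer_names}
    for future in as_completed(futures):
      name = futures[future]
      try:
        all_rows.extend(future.result())  # Re-raises the task's exception.
      except Exception as e:
        print(f'Query for {name} failed: {e}')  # Log and keep going.
  return all_rows
```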
self.assertEqual(uploaded_dict['fuzzing_duration'], 'P0DT12H49M49S') From 28274f6ae7d7716b4fd59973c662d186da87a724 Mon Sep 17 00:00:00 2001 From: Dylan Jew Date: Thu, 30 Apr 2026 17:40:41 -0400 Subject: [PATCH 08/11] error handling --- .../_internal/cron/aggregate_fuzzer_stats.py | 92 ++++++++++--------- .../cron/aggregate_fuzzer_stats_test.py | 37 +++++++- .../scripts/update_bigquery_stats_schema.py | 86 +++++++++++++++++ 3 files changed, 169 insertions(+), 46 deletions(-) create mode 100644 src/local/butler/scripts/update_bigquery_stats_schema.py diff --git a/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py b/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py index cbf7c4eab9..8d678fd535 100644 --- a/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py +++ b/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py @@ -19,21 +19,26 @@ import datetime import io import json +import random import time +from googleapiclient.errors import HttpError from googleapiclient.http import MediaIoBaseUpload +import httplib2 from clusterfuzz._internal.base import utils -from clusterfuzz._internal.datastore import data_types +from clusterfuzz._internal.datastore import data_types, ndb_utils from clusterfuzz._internal.google_cloud_utils import big_query from clusterfuzz._internal.metrics import fuzzer_stats from clusterfuzz._internal.metrics import logs from clusterfuzz._internal.system import environment -# Limit orker count to 4 concurrent threads to prevents exhaustion of +# Limit worker count to 4 concurrent threads to prevent exhaustion of # project-wide queued interactive queries quota (1,000 maximum). See # https://cloud.google.com/bigquery/quotas#query_jobs NUM_THREADS = 4 +NUM_RETRIES = 2 +RETRY_SLEEP_TIME = 5 DAILY_STATS_SCHEMA = { 'fields': [{ @@ -68,6 +73,29 @@ } +def _execute_insert_request(request): + """Executes a table/dataset insert request, retrying on transport errors.""" + for i in range(NUM_RETRIES + 1): + try: + request.execute() + return + except HttpError as e: + if e.resp.status == 409: + # 409 Conflict: Returned when the resource already exists. This is + # expected in after the first execution because tables are created + # exactly once. + return + + logs.error('Failed to insert table/dataset.', exception=e) + raise + except httplib2.HttpLib2Error as e: + # Network or transport error, retry operation with exponential back-off. 
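The retry helper introduced in this patch uses capped exponential backoff with full jitter. A self-contained sketch of the same idea (ConnectionError stands in for httplib2.HttpLib2Error; the constants mirror NUM_RETRIES and RETRY_SLEEP_TIME from the patch):

import random
import time

NUM_RETRIES = 2
RETRY_SLEEP_TIME = 5  # Base delay in seconds.


def with_backoff(call):
  """Runs `call()`, retrying transport errors with full-jitter backoff."""
  for i in range(NUM_RETRIES + 1):
    try:
      return call()
    except ConnectionError:  # Stand-in for httplib2.HttpLib2Error.
      if i == NUM_RETRIES:
        raise
      # Sleep a random amount in [0, 2^i * base). Full jitter spreads the
      # retries of many concurrent workers so they do not stampede the API.
      time.sleep(random.uniform(0, (1 << i) * RETRY_SLEEP_TIME))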
+ if i == NUM_RETRIES: + logs.error('Failed to insert table/dataset after retries.', exception=e) + raise + time.sleep(random.uniform(0, (1 << i) * RETRY_SLEEP_TIME)) + + def _create_dataset_if_needed(bigquery, dataset_id): """Create a new dataset if necessary.""" project_id = utils.get_application_id() @@ -77,13 +105,10 @@ def _create_dataset_if_needed(bigquery, dataset_id): 'projectId': project_id, }, } - try: - bigquery.datasets().insert( - projectId=project_id, body=dataset_body).execute() - logs.info(f'Created dataset {dataset_id}.') - except Exception as e: - if '409' not in str(e): - logs.error(f'Failed to create dataset {dataset_id}: {e}') + dataset_insert = bigquery.datasets().insert( + projectId=project_id, body=dataset_body) + + _execute_insert_request(dataset_insert) def _create_table_if_needed(bigquery, dataset_id, table_id, schema): @@ -102,32 +127,9 @@ def _create_table_if_needed(bigquery, dataset_id, table_id, schema): 'schema': schema } - try: - # Validate that existing partitioned state holds right parameters - table_info = bigquery.tables().get( - projectId=project_id, datasetId=dataset_id, tableId=table_id).execute() - time_partitioning = table_info.get('timePartitioning') - if not time_partitioning or time_partitioning.get('field') != 'date': - logs.info(f'Table {dataset_id}.{table_id} exists but is unpartitioned or ' - f'configured differently. Re-creating.') - bigquery.tables().delete( - projectId=project_id, datasetId=dataset_id, - tableId=table_id).execute() - - except Exception as e: - # TODO: Use real exceptions from the API instead of string matching codes - if '404' not in str(e) and 'dropped for re-creation' not in str(e): - logs.error( - f'Error checking metadata for table {dataset_id}.{table_id}: {e}') - return - - try: - bigquery.tables().insert( - projectId=project_id, datasetId=dataset_id, body=table_body).execute() - logs.info(f'Created table {dataset_id}.{table_id}.') - except Exception as e: - if '409' not in str(e): - logs.error(f'Failed to create table {dataset_id}.{table_id}: {e}') + table_insert = bigquery.tables().insert( + projectId=project_id, datasetId=dataset_id, body=table_body) + _execute_insert_request(table_insert) def _poll_completion(bigquery, project_id, job_id): @@ -193,8 +195,16 @@ def _query_fuzzer_stats(fuzzer_name, project_id): return list(result.rows) + except HttpError as e: + if e.resp.status == 404: + logs.info( + f'JobRun table or dataset does not exist for {fuzzer_name}. Skipping.' + ) + return [] + else: + raise # fallback to general exception except Exception as e: - logs.error(f'Failed to process {fuzzer_name}: {e}') + logs.error(f'Failed to process {fuzzer_name}', exception=e) return [] @@ -214,7 +224,7 @@ def _gather_all_stats(fuzzers, project_id): if rows: all_rows.extend(rows) except Exception as e: - logs.error(f'Task execution crashed for {fuzzer.name}: {e}') + logs.error(f'Task execution crashed for {fuzzer.name}', exception=e) return all_rows @@ -273,7 +283,7 @@ def _persist_daily_stats(all_rows, bigquery_client, project_id, f'daily_stats${date_partition_str}: {poll_response}') except Exception as e: - logs.error(f'Failed to execute batch load job in BigQuery: {e}') + logs.error('Failed to execute batch load job in BigQuery', exception=e) def main(argv): @@ -298,9 +308,9 @@ def main(argv): _create_table_if_needed(bigquery_client, 'fuzzer_stats', 'daily_stats', DAILY_STATS_SCHEMA) - # The linter suggests a comparison that isn't supported by query() filters. 
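The NUM_THREADS = 4 cap exists because each per-fuzzer query counts against the project-wide interactive query quota. A minimal sketch of the bounded fan-out pattern _gather_all_stats uses (query_one stands in for _query_fuzzer_stats):

from concurrent.futures import ThreadPoolExecutor, as_completed

NUM_THREADS = 4  # Keeps concurrent BigQuery queries well under quota.


def gather(names, query_one):
  """Runs `query_one(name)` for each name on a small thread pool."""
  rows = []
  with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
    futures = {executor.submit(query_one, name): name for name in names}
    for future in as_completed(futures):
      name = futures[future]
      try:
        # result() re-raises anything the worker raised.
        rows.extend(future.result())
      except Exception as e:  # One bad fuzzer should not sink the batch.
        print(f'Task execution crashed for {name}: {e}')
  return rows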
- # pylint: disable=singleton-comparison - fuzzers = list(data_types.Fuzzer.query(data_types.Fuzzer.builtin == False)) + fuzzers = list(data_types.Fuzzer.query( + ndb_utils.is_false(data_types.Fuzzer.builtin))) + yesterday = utils.utcnow().date() - datetime.timedelta(days=1) date_partition_str = yesterday.strftime('%Y%m%d') diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py index 0433ba46db..f096016a88 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py @@ -49,11 +49,6 @@ def setUp(self): self.mock_api_client = mock.MagicMock() self.mock.get_api_client.return_value = self.mock_api_client - # Mock tables().get() to throw 404 (simulate table doesn't exist) - resp = httplib2.Response({'status': 404}) - self.mock_api_client.tables().get().execute.side_effect = HttpError( - resp, b'Not found') - self.mock_client_instance = mock.MagicMock() self.mock.Client.return_value = self.mock_client_instance @@ -135,3 +130,35 @@ def test_aggregate_fuzzer_stats(self): self.assertEqual(uploaded_dict['testcase_generation_duration'], 'P0DT1H15M33S') self.assertEqual(uploaded_dict['fuzzing_duration'], 'P0DT12H49M49S') + + def test_aggregate_fuzzer_stats_ignoring_409(self): + """Tests that execution successfully proceeds past HTTP 409 Conflict scenarios.""" + resp = httplib2.Response({'status': 409}) + self.mock_api_client.datasets().insert().execute.side_effect = HttpError( + resp, b'Already exists') + + self.mock_client_instance.query.return_value = QueryResult( + rows=[{ + 'fuzzer_name': 'ochang_js_fuzzer', + 'date': '2026-04-29', + 'testcases_executed': 10495, + 'testcase_execution_duration': 'P0DT11H12M11S', + 'testcases_generated': 10495, + 'testcase_generation_duration': 'P0DT1H15M33S', + 'fuzzing_duration': 'P0DT12H49M49S' + }], + total_count=1) + + aggregate_fuzzer_stats.main(['--non-dry-run']) + + # Should attempt inserting the table nonetheless + self.mock_api_client.tables().insert.assert_called_once() + + def test_aggregate_fuzzer_stats_raises_non_409(self): + """Tests that unexpected HTTP errors cause job failures.""" + resp = httplib2.Response({'status': 500}) + self.mock_api_client.datasets().insert().execute.side_effect = HttpError( + resp, b'Internal Server Error') + + with self.assertRaises(HttpError): + aggregate_fuzzer_stats.main(['--non-dry-run']) diff --git a/src/local/butler/scripts/update_bigquery_stats_schema.py b/src/local/butler/scripts/update_bigquery_stats_schema.py new file mode 100644 index 0000000000..7b66e21f7e --- /dev/null +++ b/src/local/butler/scripts/update_bigquery_stats_schema.py @@ -0,0 +1,86 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
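+# See the License for the specific language governing permissions and
+# limitations under the License.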
+"""Script to migrate BigQuery JobRun table schemas for old fuzzers.""" + +from googleapiclient.errors import HttpError + +from clusterfuzz._internal.base import utils +from clusterfuzz._internal.datastore import data_types +from clusterfuzz._internal.google_cloud_utils import big_query +from clusterfuzz._internal.metrics import fuzzer_stats + + +def execute(args): + """Migrate historical BigQuery JobRun tables to current JOB_RUN_SCHEMA. + + Adds missing duration fields to fuzzer statistics datasets in BigQuery. + """ + print('Starting BigQuery statistics tables schema migration.') + + bigquery_client = big_query.get_api_client() + project_id = utils.get_application_id() + + fuzzers = list(data_types.Fuzzer.query(data_types.Fuzzer.builtin == False)) + count = 0 + + for fuzzer in fuzzers: + dataset_id = fuzzer_stats.dataset_name(fuzzer.name) + table_id = 'JobRun' + + try: + table = bigquery_client.tables().get( + projectId=project_id, datasetId=dataset_id, + tableId=table_id).execute() + except HttpError as e: + if e.resp.status == 404: + # Table or dataset doesn't exist. No schema to update. + continue + print(f'Failed getting table details for {fuzzer.name}: {e}') + continue + except Exception as e: + print(f'Failed getting table details for {fuzzer.name}: {e}') + continue + + fields = table.get('schema', {}).get('fields', []) + existing_names = {f['name'] for f in fields} + + expected_fields = fuzzer_stats.JobRun.SCHEMA['fields'] + missing_fields = [ + f for f in expected_fields if f['name'] not in existing_names + ] + + if not missing_fields: + continue + + updated_fields = list(fields) + missing_fields + body = {'schema': {'fields': updated_fields}} + + if not args.non_dry_run: + missing_names = [f['name'] for f in missing_fields] + print(f'DRY RUN: Would append {missing_names} to {fuzzer.name}.') + else: + try: + bigquery_client.tables().patch( + projectId=project_id, + datasetId=dataset_id, + tableId=table_id, + body=body).execute() + print(f'Successfully updated schema for fuzzer: {fuzzer.name}') + count += 1 + except Exception as e: + print(f'Error updating schema for {fuzzer.name}: {e}') + + print( + f'BigQuery schema migration complete. Updated {count} fuzzer stats schemas.' + ) From 910fa1b542c2bd2ea8739766a40417e0015255e7 Mon Sep 17 00:00:00 2001 From: Dylan Jew Date: Sat, 2 May 2026 11:50:12 -0400 Subject: [PATCH 09/11] Clean up and fixes --- .../_internal/cron/aggregate_fuzzer_stats.py | 26 +++++++++---------- .../cron/aggregate_fuzzer_stats_test.py | 11 ++++---- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py b/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py index 8d678fd535..2671022384 100644 --- a/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py +++ b/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py @@ -27,7 +27,8 @@ import httplib2 from clusterfuzz._internal.base import utils -from clusterfuzz._internal.datastore import data_types, ndb_utils +from clusterfuzz._internal.datastore import data_types +from clusterfuzz._internal.datastore import ndb_utils from clusterfuzz._internal.google_cloud_utils import big_query from clusterfuzz._internal.metrics import fuzzer_stats from clusterfuzz._internal.metrics import logs @@ -82,8 +83,8 @@ def _execute_insert_request(request): except HttpError as e: if e.resp.status == 409: # 409 Conflict: Returned when the resource already exists. This is - # expected in after the first execution because tables are created - # exactly once. 
+ # expected after the first execution because tables are created exactly + # once. return logs.error('Failed to insert table/dataset.', exception=e) @@ -96,7 +97,7 @@ def _execute_insert_request(request): time.sleep(random.uniform(0, (1 << i) * RETRY_SLEEP_TIME)) -def _create_dataset_if_needed(bigquery, dataset_id): +def _create_dataset_if_needed(bigquery_client, dataset_id): """Create a new dataset if necessary.""" project_id = utils.get_application_id() dataset_body = { @@ -105,13 +106,13 @@ def _create_dataset_if_needed(bigquery, dataset_id): 'projectId': project_id, }, } - dataset_insert = bigquery.datasets().insert( + dataset_insert = bigquery_client.datasets().insert( projectId=project_id, body=dataset_body) _execute_insert_request(dataset_insert) -def _create_table_if_needed(bigquery, dataset_id, table_id, schema): +def _create_table_if_needed(bigquery_client, dataset_id, table_id, schema): """Create a new table if needed.""" project_id = utils.get_application_id() table_body = { @@ -127,18 +128,18 @@ def _create_table_if_needed(bigquery, dataset_id, table_id, schema): 'schema': schema } - table_insert = bigquery.tables().insert( + table_insert = bigquery_client.tables().insert( projectId=project_id, datasetId=dataset_id, body=table_body) _execute_insert_request(table_insert) -def _poll_completion(bigquery, project_id, job_id): +def _poll_completion(bigquery_client, project_id, job_id): """Poll for completion.""" - response = bigquery.jobs().get( + response = bigquery_client.jobs().get( projectId=project_id, jobId=job_id).execute(num_retries=2) while response['status']['state'] == 'RUNNING': time.sleep(5) - response = bigquery.jobs().get( + response = bigquery_client.jobs().get( projectId=project_id, jobId=job_id).execute(num_retries=2) return response @@ -308,9 +309,8 @@ def main(argv): _create_table_if_needed(bigquery_client, 'fuzzer_stats', 'daily_stats', DAILY_STATS_SCHEMA) - fuzzers = list(data_types.Fuzzer.query( - ndb_utils.is_false(data_types.Fuzzer.builtin))) - + fuzzers = list( + data_types.Fuzzer.query(ndb_utils.is_false(data_types.Fuzzer.builtin))) yesterday = utils.utcnow().date() - datetime.timedelta(days=1) date_partition_str = yesterday.strftime('%Y%m%d') diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py index f096016a88..dc0a2adc43 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
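On the ndb_utils.is_false switch above: NDB builds query filters by overloading comparison operators on model properties, so the linter-preferred forms (`not prop` or `prop is False`) never produce a filter node. An is_false-style helper, sketched below (the real one lives in clusterfuzz._internal.datastore.ndb_utils), confines the unavoidable `== False` and its pylint suppression to one place:

def is_false(prop):
  """Returns an NDB filter node matching entities where `prop` is False."""
  # The equality comparison is what builds the filter node; `is False` or
  # `not prop` would evaluate to a plain bool instead of a query filter.
  return prop == False  # pylint: disable=singleton-comparison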
-"""Tests for aggregate_fuzzer_stats.""" +"""Tests for aggregate_fuzzer_stats cron job""" import datetime import json @@ -33,7 +33,6 @@ class AggregateFuzzerStatsTest(unittest.TestCase): """Test AggregateFuzzerStats.""" def setUp(self): - # Create a non-builtin fuzzer data_types.Fuzzer( name='ochang_js_fuzzer', jobs=['job'], builtin=False).put() data_types.Job(name='job').put() @@ -42,7 +41,7 @@ def setUp(self): 'clusterfuzz._internal.google_cloud_utils.big_query.get_api_client', 'clusterfuzz._internal.google_cloud_utils.big_query.Client', 'clusterfuzz._internal.base.utils.get_application_id', - 'googleapiclient.http.MediaIoBaseUpload', + 'clusterfuzz._internal.cron.aggregate_fuzzer_stats.MediaIoBaseUpload', ]) self.mock.get_application_id.return_value = 'test-clusterfuzz' @@ -132,7 +131,7 @@ def test_aggregate_fuzzer_stats(self): self.assertEqual(uploaded_dict['fuzzing_duration'], 'P0DT12H49M49S') def test_aggregate_fuzzer_stats_ignoring_409(self): - """Tests that execution successfully proceeds past HTTP 409 Conflict scenarios.""" + """Tests that execution successfully proceeds when the table already exists.""" resp = httplib2.Response({'status': 409}) self.mock_api_client.datasets().insert().execute.side_effect = HttpError( resp, b'Already exists') @@ -156,9 +155,9 @@ def test_aggregate_fuzzer_stats_ignoring_409(self): def test_aggregate_fuzzer_stats_raises_non_409(self): """Tests that unexpected HTTP errors cause job failures.""" - resp = httplib2.Response({'status': 500}) + response = httplib2.Response({'status': 500}) self.mock_api_client.datasets().insert().execute.side_effect = HttpError( - resp, b'Internal Server Error') + response, b'Internal Server Error') with self.assertRaises(HttpError): aggregate_fuzzer_stats.main(['--non-dry-run']) From 4004139433177dc8881049fc4469ab832b084358 Mon Sep 17 00:00:00 2001 From: Dylan Jew Date: Sat, 2 May 2026 12:26:36 -0400 Subject: [PATCH 10/11] Add date flag --- .../_internal/cron/aggregate_fuzzer_stats.py | 46 +++++++++------- .../cron/aggregate_fuzzer_stats_test.py | 53 +++++++++++++++---- 2 files changed, 71 insertions(+), 28 deletions(-) diff --git a/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py b/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py index 2671022384..fe48f6af2b 100644 --- a/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py +++ b/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py @@ -98,7 +98,6 @@ def _execute_insert_request(request): def _create_dataset_if_needed(bigquery_client, dataset_id): - """Create a new dataset if necessary.""" project_id = utils.get_application_id() dataset_body = { 'datasetReference': { @@ -113,7 +112,6 @@ def _create_dataset_if_needed(bigquery_client, dataset_id): def _create_table_if_needed(bigquery_client, dataset_id, table_id, schema): - """Create a new table if needed.""" project_id = utils.get_application_id() table_body = { 'tableReference': { @@ -134,7 +132,7 @@ def _create_table_if_needed(bigquery_client, dataset_id, table_id, schema): def _poll_completion(bigquery_client, project_id, job_id): - """Poll for completion.""" + """Poll bigquery for job completion.""" response = bigquery_client.jobs().get( projectId=project_id, jobId=job_id).execute(num_retries=2) while response['status']['state'] == 'RUNNING': @@ -145,12 +143,11 @@ def _poll_completion(bigquery_client, project_id, job_id): return response -def _query_fuzzer_stats(fuzzer_name, project_id): - """Queries single fuzzer stats for yesterday.""" +def _query_fuzzer_stats(fuzzer_name, project_id, 
target_date_str): + """Queries single fuzzer stats for the given target date.""" dataset_id = fuzzer_stats.dataset_name(fuzzer_name) table_id = 'JobRun' - # TODO: Pass timestamp from an optional flag with default value for yesterday query = f""" SELECT '{fuzzer_name}' as fuzzer_name, @@ -181,7 +178,7 @@ FROM `{project_id}.{dataset_id}.{table_id}` WHERE - DATE(TIMESTAMP_SECONDS(CAST(timestamp AS INT64))) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) + DATE(TIMESTAMP_SECONDS(CAST(timestamp AS INT64))) = '{target_date_str}' GROUP BY date """ @@ -191,31 +188,28 @@ result = source_client.query(query) if not result.rows: - logs.info(f'No data for {fuzzer_name} for yesterday.') + logs.info(f'No data for {fuzzer_name} for {target_date_str}.') return [] return list(result.rows) except HttpError as e: if e.resp.status == 404: - logs.info( - f'JobRun table or dataset does not exist for {fuzzer_name}. Skipping.' - ) + logs.info(f'JobRun table does not exist for {fuzzer_name}. Skipping.') return [] - else: - raise # fallback to general exception + raise # Non-404 HTTP errors propagate to the caller. except Exception as e: logs.error(f'Failed to process {fuzzer_name}', exception=e) return [] -def _gather_all_stats(fuzzers, project_id): +def _gather_all_stats(fuzzers, project_id, target_date_str): """Gathers fuzzer statistics concurrently using a thread pool.""" all_rows = [] with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor: future_to_fuzzer = { - executor.submit(_query_fuzzer_stats, fuzzer.name, project_id): fuzzer - for fuzzer in fuzzers + executor.submit(_query_fuzzer_stats, fuzzer.name, project_id, + target_date_str): fuzzer for fuzzer in fuzzers } for future in as_completed(future_to_fuzzer): @@ -292,10 +286,24 @@ def main(argv): parser = argparse.ArgumentParser(prog='aggregate_fuzzer_stats') parser.add_argument( '--non-dry-run', action='store_true', help='Whether to write to BigQuery') + parser.add_argument( + '--date', + help=('Date for fuzzer stats aggregation (YYYY-MM-DD). Defaults to ' + 'yesterday (UTC).'), + type=str) args = parser.parse_args(argv) logs.info('Starting fuzzer stats aggregation cron.') + if args.date: + try: + target_date = datetime.datetime.strptime(args.date, '%Y-%m-%d').date() + except ValueError: + parser.error(f'Invalid date format: {args.date}. Expected YYYY-MM-DD.') + else: + # Default to yesterday. + target_date = utils.utcnow().date() - datetime.timedelta(days=1) + if environment.is_local_development(): logs.error('BigQuery requires a cloud project to run.
' 'This cron job cannot run locally.') @@ -312,10 +320,10 @@ def main(argv): fuzzers = list( data_types.Fuzzer.query(ndb_utils.is_false(data_types.Fuzzer.builtin))) - yesterday = utils.utcnow().date() - datetime.timedelta(days=1) - date_partition_str = yesterday.strftime('%Y%m%d') + date_partition_str = target_date.strftime('%Y%m%d') + target_date_str = target_date.strftime('%Y-%m-%d') - all_rows = _gather_all_stats(fuzzers, project_id) + all_rows = _gather_all_stats(fuzzers, project_id, target_date_str) _persist_daily_stats( all_rows=all_rows, diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py index dc0a2adc43..9f65309510 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/aggregate_fuzzer_stats_test.py @@ -21,6 +21,7 @@ from googleapiclient.errors import HttpError import httplib2 +from clusterfuzz._internal.base import utils from clusterfuzz._internal.cron import aggregate_fuzzer_stats from clusterfuzz._internal.datastore import data_types from clusterfuzz._internal.google_cloud_utils.big_query import QueryResult @@ -41,10 +42,12 @@ def setUp(self): 'clusterfuzz._internal.google_cloud_utils.big_query.get_api_client', 'clusterfuzz._internal.google_cloud_utils.big_query.Client', 'clusterfuzz._internal.base.utils.get_application_id', + 'clusterfuzz._internal.base.utils.utcnow', 'clusterfuzz._internal.cron.aggregate_fuzzer_stats.MediaIoBaseUpload', ]) self.mock.get_application_id.return_value = 'test-clusterfuzz' + self.mock.utcnow.return_value = datetime.datetime(2026, 5, 1, 12, 0, 0) self.mock_api_client = mock.MagicMock() self.mock.get_api_client.return_value = self.mock_api_client @@ -60,7 +63,7 @@ def test_aggregate_fuzzer_stats(self): self.mock_client_instance.query.return_value = QueryResult( rows=[{ 'fuzzer_name': 'ochang_js_fuzzer', - 'date': '2026-04-29', + 'date': '2026-04-30', 'testcases_executed': 10495, 'testcase_execution_duration': 'P0DT11H12M11S', 'testcases_generated': 10495, @@ -71,7 +74,6 @@ def test_aggregate_fuzzer_stats(self): aggregate_fuzzer_stats.main(['--non-dry-run']) - # Verify dataset creation attempt self.mock_api_client.datasets().insert.assert_called_with( projectId='test-clusterfuzz', body={ @@ -96,7 +98,6 @@ def test_aggregate_fuzzer_stats(self): datasetId='fuzzer_stats', projectId='test-clusterfuzz') - # Verify load jobs insert self.mock_api_client.jobs().insert.assert_called_once() call_kwargs = self.mock_api_client.jobs().insert.call_args[1] self.assertEqual(call_kwargs['projectId'], 'test-clusterfuzz') @@ -106,14 +107,18 @@ def test_aggregate_fuzzer_stats(self): self.assertEqual(load_config['destinationTable']['datasetId'], 'fuzzer_stats') self.assertEqual(load_config['writeDisposition'], 'WRITE_TRUNCATE') - self.assertEqual(load_config['sourceFormat'], 'NEWLINE_DELIMITED_JSON') - yesterday = (datetime.datetime.utcnow().date() - datetime.timedelta(days=1)) + yesterday = utils.utcnow().date() - datetime.timedelta(days=1) expected_table_id = f"daily_stats${yesterday.strftime('%Y%m%d')}" self.assertEqual(load_config['destinationTable']['tableId'], expected_table_id) - # Verify JSON uploaded media content using the patched wrapper inputs + self.mock_client_instance.query.assert_called_once() + query_str = self.mock_client_instance.query.call_args[0][0] + self.assertIn( + "DATE(TIMESTAMP_SECONDS(CAST(timestamp AS 
INT64))) = '2026-04-30'", + query_str) + self.mock.MediaIoBaseUpload.assert_called_once() media_call_args = self.mock.MediaIoBaseUpload.call_args[0] bytes_io_arg = media_call_args[0] @@ -121,7 +126,7 @@ def test_aggregate_fuzzer_stats(self): uploaded_dict = json.loads(stream_content.strip()) self.assertEqual(uploaded_dict['fuzzer_name'], 'ochang_js_fuzzer') - self.assertEqual(uploaded_dict['date'], '2026-04-29') + self.assertEqual(uploaded_dict['date'], '2026-04-30') self.assertEqual(uploaded_dict['testcases_executed'], 10495) self.assertEqual(uploaded_dict['testcase_execution_duration'], 'P0DT11H12M11S') @@ -139,7 +144,7 @@ def test_aggregate_fuzzer_stats_ignoring_409(self): self.mock_client_instance.query.return_value = QueryResult( rows=[{ 'fuzzer_name': 'ochang_js_fuzzer', - 'date': '2026-04-29', + 'date': '2026-04-30', 'testcases_executed': 10495, 'testcase_execution_duration': 'P0DT11H12M11S', 'testcases_generated': 10495, @@ -150,7 +155,6 @@ def test_aggregate_fuzzer_stats_ignoring_409(self): aggregate_fuzzer_stats.main(['--non-dry-run']) - # Should attempt inserting the table nonetheless self.mock_api_client.tables().insert.assert_called_once() def test_aggregate_fuzzer_stats_raises_non_409(self): @@ -161,3 +165,34 @@ def test_aggregate_fuzzer_stats_raises_non_409(self): with self.assertRaises(HttpError): aggregate_fuzzer_stats.main(['--non-dry-run']) + + def test_aggregate_fuzzer_stats_with_date_flag(self): + """Tests execution of the cron job with a specific --date flag.""" + self.mock_client_instance.query.return_value = QueryResult( + rows=[{ + 'fuzzer_name': 'ochang_js_fuzzer', + 'date': '2026-04-30', + 'testcases_executed': 100, + 'testcase_execution_duration': 'P0DT1H0M0S', + 'testcases_generated': 100, + 'testcase_generation_duration': 'P0DT0H10M0S', + 'fuzzing_duration': 'P0DT1H10M0S' + }], + total_count=1) + + aggregate_fuzzer_stats.main(['--non-dry-run', '--date', '2026-04-30']) + + call_kwargs = self.mock_api_client.jobs().insert.call_args[1] + load_config = call_kwargs['body']['configuration']['load'] + self.assertEqual(load_config['destinationTable']['tableId'], + 'daily_stats$20260430') + + query_str = self.mock_client_instance.query.call_args[0][0] + self.assertIn( + "DATE(TIMESTAMP_SECONDS(CAST(timestamp AS INT64))) = '2026-04-30'", + query_str) + + def test_aggregate_fuzzer_stats_with_invalid_date_flag(self): + """Tests that an invalid --date format causes argument parsing error.""" + with self.assertRaises(SystemExit): + aggregate_fuzzer_stats.main(['--non-dry-run', '--date', 'invalid-date']) From fefb232f961b557ec31d567825dc44b62f3fb64f Mon Sep 17 00:00:00 2001 From: Dylan Jew Date: Mon, 4 May 2026 10:34:51 -0400 Subject: [PATCH 11/11] remove bigquery script --- .../_internal/cron/aggregate_fuzzer_stats.py | 2 + src/local/butler/run.py | 11 --- .../scripts/update_bigquery_stats_schema.py | 86 ------------------- 3 files changed, 2 insertions(+), 97 deletions(-) delete mode 100644 src/local/butler/scripts/update_bigquery_stats_schema.py diff --git a/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py b/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py index fe48f6af2b..e390aa8c19 100644 --- a/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py +++ b/src/clusterfuzz/_internal/cron/aggregate_fuzzer_stats.py @@ -98,6 +98,7 @@ def _execute_insert_request(request): def _create_dataset_if_needed(bigquery_client, dataset_id): + """Writes a dataset for the `dataset_id`. 
No-op if it already exists""" project_id = utils.get_application_id() dataset_body = { 'datasetReference': { @@ -112,6 +113,7 @@ def _create_dataset_if_needed(bigquery_client, dataset_id): def _create_table_if_needed(bigquery_client, dataset_id, table_id, schema): + """Writes a table for the `table_id`. No-op if it already exists""" project_id = utils.get_application_id() table_body = { 'tableReference': { diff --git a/src/local/butler/run.py b/src/local/butler/run.py index 1c2fea85e7..c77d92b4e2 100644 --- a/src/local/butler/run.py +++ b/src/local/butler/run.py @@ -19,7 +19,6 @@ import sys from local.butler import constants -from src.clusterfuzz._internal.base import utils from src.clusterfuzz._internal.config import local_config from src.clusterfuzz._internal.datastore import ndb_init @@ -35,16 +34,6 @@ def execute(args): os.environ['CONFIG_DIR_OVERRIDE'] = os.path.abspath(args.config_dir) local_config.ProjectConfig().set_environment() - app_id = utils.get_application_id() - # Required to use the correct project ID when interacting with the BigQuery client. - if app_id: - # os.environ['GOOGLE_CLOUD_PROJECT'] = app_id - if 'GOOGLE_CLOUD_PROJECT' not in os.environ: - print('empty google cloud project') - else: - print(f'google cloud project {os.environ["GOOGLE_CLOUD_PROJECT"]}') - print(f'app id {app_id}') - if args.local: os.environ['DATASTORE_EMULATOR_HOST'] = constants.DATASTORE_EMULATOR_HOST os.environ['PUBSUB_EMULATOR_HOST'] = constants.PUBSUB_EMULATOR_HOST diff --git a/src/local/butler/scripts/update_bigquery_stats_schema.py b/src/local/butler/scripts/update_bigquery_stats_schema.py deleted file mode 100644 index 7b66e21f7e..0000000000 --- a/src/local/butler/scripts/update_bigquery_stats_schema.py +++ /dev/null @@ -1,86 +0,0 @@ -# Copyright 2026 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Script to migrate BigQuery JobRun table schemas for old fuzzers.""" - -from googleapiclient.errors import HttpError - -from clusterfuzz._internal.base import utils -from clusterfuzz._internal.datastore import data_types -from clusterfuzz._internal.google_cloud_utils import big_query -from clusterfuzz._internal.metrics import fuzzer_stats - - -def execute(args): - """Migrate historical BigQuery JobRun tables to current JOB_RUN_SCHEMA. - - Adds missing duration fields to fuzzer statistics datasets in BigQuery. - """ - print('Starting BigQuery statistics tables schema migration.') - - bigquery_client = big_query.get_api_client() - project_id = utils.get_application_id() - - fuzzers = list(data_types.Fuzzer.query(data_types.Fuzzer.builtin == False)) - count = 0 - - for fuzzer in fuzzers: - dataset_id = fuzzer_stats.dataset_name(fuzzer.name) - table_id = 'JobRun' - - try: - table = bigquery_client.tables().get( - projectId=project_id, datasetId=dataset_id, - tableId=table_id).execute() - except HttpError as e: - if e.resp.status == 404: - # Table or dataset doesn't exist. No schema to update. 
- continue - print(f'Failed getting table details for {fuzzer.name}: {e}') - continue - except Exception as e: - print(f'Failed getting table details for {fuzzer.name}: {e}') - continue - - fields = table.get('schema', {}).get('fields', []) - existing_names = {f['name'] for f in fields} - - expected_fields = fuzzer_stats.JobRun.SCHEMA['fields'] - missing_fields = [ - f for f in expected_fields if f['name'] not in existing_names - ] - - if not missing_fields: - continue - - updated_fields = list(fields) + missing_fields - body = {'schema': {'fields': updated_fields}} - - if not args.non_dry_run: - missing_names = [f['name'] for f in missing_fields] - print(f'DRY RUN: Would append {missing_names} to {fuzzer.name}.') - else: - try: - bigquery_client.tables().patch( - projectId=project_id, - datasetId=dataset_id, - tableId=table_id, - body=body).execute() - print(f'Successfully updated schema for fuzzer: {fuzzer.name}') - count += 1 - except Exception as e: - print(f'Error updating schema for {fuzzer.name}: {e}') - - print( - f'BigQuery schema migration complete. Updated {count} fuzzer stats schemas.' - )
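With the --date flag from PATCH 10, the cron can also be driven by hand for backfills. A hypothetical loop (not part of this series) that re-aggregates a month; WRITE_TRUNCATE on each daily_stats$YYYYMMDD partition makes the re-runs idempotent:

import datetime

from clusterfuzz._internal.cron import aggregate_fuzzer_stats

# Re-aggregate each day in April 2026 into its own daily_stats partition.
day = datetime.date(2026, 4, 1)
while day < datetime.date(2026, 5, 1):
  aggregate_fuzzer_stats.main(
      ['--non-dry-run', '--date', day.strftime('%Y-%m-%d')])
  day += datetime.timedelta(days=1)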