From 11f84d3f010211234ea1bb4929f8e0204e06aef6 Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Thu, 16 Apr 2026 18:11:00 +0000 Subject: [PATCH 1/8] Improve logging for swarming bots --- .../_internal/bot/tasks/utasks/__init__.py | 2 +- src/clusterfuzz/_internal/metrics/logs.py | 39 ++++++++- .../_internal/system/environment.py | 5 +- .../core/bot/tasks/utasks/utasks_test.py | 7 +- .../_internal/tests/core/metrics/logs_test.py | 79 +++++++++++++++++++ 5 files changed, 123 insertions(+), 9 deletions(-) diff --git a/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py b/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py index 107ea5d907a..ef40e35b195 100644 --- a/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py +++ b/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py @@ -454,7 +454,7 @@ def uworker_main(input_download_url) -> None: _start_web_server_if_needed(uworker_input.job_type) utask_module = get_utask_module(uworker_input.module_name) - execution_mode = Mode.SWARMING if environment.is_swarming_bot( + execution_mode = Mode.SWARMING if environment._is_running_on_swarming( # pylint: disable=protected-access ) else Mode.BATCH recorder.set_task_details( utask_module, uworker_input.job_type, execution_mode, diff --git a/src/clusterfuzz/_internal/metrics/logs.py b/src/clusterfuzz/_internal/metrics/logs.py index 8fa6c026795..b0b4c0e34fa 100644 --- a/src/clusterfuzz/_internal/metrics/logs.py +++ b/src/clusterfuzz/_internal/metrics/logs.py @@ -99,9 +99,10 @@ def _file_logging_enabled(): This is disabled if we are running in app engine or kubernetes as these have their dedicated loggers, see configure_appengine() and configure_k8s(). """ + from clusterfuzz._internal.system import environment return bool(os.getenv( - 'LOG_TO_FILE', - 'True')) and not _is_running_on_app_engine() and not _is_running_on_k8s() + 'LOG_TO_FILE', 'True')) and not _is_running_on_app_engine() and ( + not _is_running_on_k8s() or environment._is_running_on_swarming()) # pylint: disable=protected-access def _cloud_logging_enabled(): @@ -110,9 +111,11 @@ def _cloud_logging_enabled(): This is disabled for local development and if we are running in a app engine or kubernetes as these have their dedicated loggers, see configure_appengine() and configure_k8s().""" + from clusterfuzz._internal.system import environment return (bool(os.getenv('LOG_TO_GCP', 'True')) and not os.getenv("PY_UNITTESTS") and not _is_local() and - not _is_running_on_app_engine() and not _is_running_on_k8s()) + not _is_running_on_app_engine() and + (not _is_running_on_k8s() or environment._is_running_on_swarming())) # pylint: disable=protected-access def suppress_unwanted_warnings(): @@ -554,12 +557,42 @@ def cloud_label_filter(record): logging.getLogger().addHandler(handler) +def configure_swarming(name, extras=None): + """Configure logging for swarming bots.""" + if extras is None: + extras = {} + extras['task_id'] = os.getenv('TASK_ID') + extras['instance_id'] = os.getenv('BOT_NAME') + extras['platform'] = 'swarming' + + global _default_extras + _default_extras = extras + + if _console_logging_enabled(): + logging.basicConfig(level=logging.INFO) + if _file_logging_enabled(): + config.dictConfig(get_logging_config_dict(name)) + if _cloud_logging_enabled(): + configure_cloud_logging() + + logger = logging.getLogger(name) + logger.setLevel(logging.INFO) + set_logger(logger) + + sys.excepthook = uncaught_exception_handler + + def configure(name, extras=None): """Set logger. See the list of loggers in bot/config/logging.yaml. Also configures the process to log any uncaught exceptions as an error. |extras| will be included by emit() in log messages.""" suppress_unwanted_warnings() + from clusterfuzz._internal.system import environment + if environment._is_running_on_swarming(): # pylint: disable=protected-access + configure_swarming(name, extras) + return + if _is_running_on_k8s(): configure_k8s() return diff --git a/src/clusterfuzz/_internal/system/environment.py b/src/clusterfuzz/_internal/system/environment.py index 2e6dd86f5e6..548631eeebd 100644 --- a/src/clusterfuzz/_internal/system/environment.py +++ b/src/clusterfuzz/_internal/system/environment.py @@ -749,9 +749,10 @@ def get_runtime() -> UtaskMainRuntime: return UtaskMainRuntime.INSTANCE_GROUP -def is_swarming_bot(): +def _is_running_on_swarming(): """Return whether or not the current bot is a swarming bot.""" - return get_value('SWARMING_BOT') + value = get_value('SWARMING_BOT') + return value is True or value == 'true' def is_running_on_app_engine(): diff --git a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py index 0c16e120d6a..333b957b643 100644 --- a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py +++ b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py @@ -199,7 +199,7 @@ def setUp(self): 'clusterfuzz._internal.bot.tasks.utasks.uworker_io.download_and_deserialize_uworker_input', 'clusterfuzz._internal.bot.tasks.utasks.uworker_io.serialize_and_upload_uworker_output', 'clusterfuzz._internal.bot.tasks.utasks.get_utask_module', - 'clusterfuzz._internal.system.environment.is_swarming_bot', + 'clusterfuzz._internal.system.environment._is_running_on_swarming', 'clusterfuzz._internal.metrics.events.emit', ]) self.module = mock.MagicMock(__name__='tasks.analyze_task') @@ -208,12 +208,13 @@ def setUp(self): @parameterized.parameterized.expand([utasks.Mode.BATCH, utasks.Mode.SWARMING]) def test_uworker_main(self, execution_mode: utasks.Mode): """Tests that uworker_main works as intended.""" + # pylint: disable=protected-access start_time_ns = time.time_ns() if execution_mode == utasks.Mode.SWARMING: - self.mock.is_swarming_bot.return_value = True # pylint: disable=protected-access + self.mock._is_running_on_swarming.return_value = True # pylint: disable=protected-access else: - self.mock.is_swarming_bot.return_value = False + self.mock._is_running_on_swarming.return_value = False preprocess_start_time_ns = start_time_ns - 42 * 10**9 # In the past. preprocess_start_timestamp = timestamp_pb2.Timestamp() diff --git a/src/clusterfuzz/_internal/tests/core/metrics/logs_test.py b/src/clusterfuzz/_internal/tests/core/metrics/logs_test.py index e9dc16b6176..2aff6020c5e 100644 --- a/src/clusterfuzz/_internal/tests/core/metrics/logs_test.py +++ b/src/clusterfuzz/_internal/tests/core/metrics/logs_test.py @@ -521,6 +521,85 @@ def test_configure_appengine(self): logs.configure('test') self.assertEqual(0, self.mock.dictConfig.call_count) + def test_configure_swarming(self): + """Test configure for swarming bot.""" + # pylint: disable=protected-access + os.environ['SWARMING_BOT'] = 'true' + os.environ['TASK_ID'] = 'task-123' + os.environ['BOT_NAME'] = 'bot-123' + + helpers.patch( + self, + ['clusterfuzz._internal.system.environment._is_running_on_swarming']) + self.mock._is_running_on_swarming.return_value = True + + logger = mock.MagicMock() + self.mock.getLogger.return_value = logger + + logs.configure('test') + + self.assertEqual(logs._default_extras['task_id'], 'task-123') + self.assertEqual(logs._default_extras['instance_id'], 'bot-123') + self.assertEqual(logs._default_extras['platform'], 'swarming') + + +class LoggingEnabledTest(unittest.TestCase): + """Test _cloud_logging_enabled and _file_logging_enabled.""" + + # pylint: disable=protected-access + + def setUp(self): + helpers.patch_environ(self) + helpers.patch(self, [ + 'clusterfuzz._internal.metrics.logs._is_running_on_app_engine', + 'clusterfuzz._internal.metrics.logs._is_running_on_k8s', + 'clusterfuzz._internal.metrics.logs._is_local', + 'clusterfuzz._internal.system.environment._is_running_on_swarming', + ]) + + def test_cloud_logging_enabled_swarming_k8s(self): + """Test cloud logging enabled for swarming bot on K8S.""" + self.mock._is_running_on_app_engine.return_value = False + self.mock._is_running_on_k8s.return_value = True + self.mock._is_local.return_value = False + self.mock._is_running_on_swarming.return_value = True + + os.environ['LOG_TO_GCP'] = 'True' + os.environ['PY_UNITTESTS'] = '' + + self.assertTrue(logs._cloud_logging_enabled()) + + def test_cloud_logging_disabled_non_swarming_k8s(self): + """Test cloud logging disabled for non-swarming bot on K8S.""" + self.mock._is_running_on_app_engine.return_value = False + self.mock._is_running_on_k8s.return_value = True + self.mock._is_local.return_value = False + self.mock._is_running_on_swarming.return_value = False + + os.environ['LOG_TO_GCP'] = 'True' + + self.assertFalse(logs._cloud_logging_enabled()) + + def test_file_logging_enabled_swarming_k8s(self): + """Test file logging enabled for swarming bot on K8S.""" + self.mock._is_running_on_app_engine.return_value = False + self.mock._is_running_on_k8s.return_value = True + self.mock._is_running_on_swarming.return_value = True + + os.environ['LOG_TO_FILE'] = 'True' + + self.assertTrue(logs._file_logging_enabled()) + + def test_file_logging_disabled_non_swarming_k8s(self): + """Test file logging disabled for non-swarming bot on K8S.""" + self.mock._is_running_on_app_engine.return_value = False + self.mock._is_running_on_k8s.return_value = True + self.mock._is_running_on_swarming.return_value = False + + os.environ['LOG_TO_FILE'] = 'True' + + self.assertFalse(logs._file_logging_enabled()) + @test_utils.with_cloud_emulators('datastore') class EmitTest(unittest.TestCase): From d811e0132ed1c963e5e64e2536d5e1d55e1b60ba Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Fri, 17 Apr 2026 00:16:52 +0000 Subject: [PATCH 2/8] Make is_running_on_swarming public and fix linter issues --- .../_internal/bot/tasks/utasks/__init__.py | 2 +- src/clusterfuzz/_internal/metrics/logs.py | 6 +++--- src/clusterfuzz/_internal/system/environment.py | 2 +- .../tests/core/bot/tasks/utasks/utasks_test.py | 6 +++--- .../_internal/tests/core/metrics/logs_test.py | 14 +++++++------- 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py b/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py index ef40e35b195..e7b5f7938e2 100644 --- a/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py +++ b/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py @@ -454,7 +454,7 @@ def uworker_main(input_download_url) -> None: _start_web_server_if_needed(uworker_input.job_type) utask_module = get_utask_module(uworker_input.module_name) - execution_mode = Mode.SWARMING if environment._is_running_on_swarming( # pylint: disable=protected-access + execution_mode = Mode.SWARMING if environment.is_running_on_swarming( ) else Mode.BATCH recorder.set_task_details( utask_module, uworker_input.job_type, execution_mode, diff --git a/src/clusterfuzz/_internal/metrics/logs.py b/src/clusterfuzz/_internal/metrics/logs.py index b0b4c0e34fa..03667560b2f 100644 --- a/src/clusterfuzz/_internal/metrics/logs.py +++ b/src/clusterfuzz/_internal/metrics/logs.py @@ -102,7 +102,7 @@ def _file_logging_enabled(): from clusterfuzz._internal.system import environment return bool(os.getenv( 'LOG_TO_FILE', 'True')) and not _is_running_on_app_engine() and ( - not _is_running_on_k8s() or environment._is_running_on_swarming()) # pylint: disable=protected-access + not _is_running_on_k8s() or environment.is_running_on_swarming()) def _cloud_logging_enabled(): @@ -115,7 +115,7 @@ def _cloud_logging_enabled(): return (bool(os.getenv('LOG_TO_GCP', 'True')) and not os.getenv("PY_UNITTESTS") and not _is_local() and not _is_running_on_app_engine() and - (not _is_running_on_k8s() or environment._is_running_on_swarming())) # pylint: disable=protected-access + (not _is_running_on_k8s() or environment.is_running_on_swarming())) def suppress_unwanted_warnings(): @@ -589,7 +589,7 @@ def configure(name, extras=None): suppress_unwanted_warnings() from clusterfuzz._internal.system import environment - if environment._is_running_on_swarming(): # pylint: disable=protected-access + if environment.is_running_on_swarming(): configure_swarming(name, extras) return diff --git a/src/clusterfuzz/_internal/system/environment.py b/src/clusterfuzz/_internal/system/environment.py index 548631eeebd..d878df8b6cb 100644 --- a/src/clusterfuzz/_internal/system/environment.py +++ b/src/clusterfuzz/_internal/system/environment.py @@ -749,7 +749,7 @@ def get_runtime() -> UtaskMainRuntime: return UtaskMainRuntime.INSTANCE_GROUP -def _is_running_on_swarming(): +def is_running_on_swarming(): """Return whether or not the current bot is a swarming bot.""" value = get_value('SWARMING_BOT') return value is True or value == 'true' diff --git a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py index 333b957b643..fd2df623f1c 100644 --- a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py +++ b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py @@ -199,7 +199,7 @@ def setUp(self): 'clusterfuzz._internal.bot.tasks.utasks.uworker_io.download_and_deserialize_uworker_input', 'clusterfuzz._internal.bot.tasks.utasks.uworker_io.serialize_and_upload_uworker_output', 'clusterfuzz._internal.bot.tasks.utasks.get_utask_module', - 'clusterfuzz._internal.system.environment._is_running_on_swarming', + 'clusterfuzz._internal.system.environment.is_running_on_swarming', 'clusterfuzz._internal.metrics.events.emit', ]) self.module = mock.MagicMock(__name__='tasks.analyze_task') @@ -212,9 +212,9 @@ def test_uworker_main(self, execution_mode: utasks.Mode): start_time_ns = time.time_ns() if execution_mode == utasks.Mode.SWARMING: - self.mock._is_running_on_swarming.return_value = True # pylint: disable=protected-access + self.mock.is_running_on_swarming.return_value = True else: - self.mock._is_running_on_swarming.return_value = False + self.mock.is_running_on_swarming.return_value = False preprocess_start_time_ns = start_time_ns - 42 * 10**9 # In the past. preprocess_start_timestamp = timestamp_pb2.Timestamp() diff --git a/src/clusterfuzz/_internal/tests/core/metrics/logs_test.py b/src/clusterfuzz/_internal/tests/core/metrics/logs_test.py index 2aff6020c5e..19d445ecca7 100644 --- a/src/clusterfuzz/_internal/tests/core/metrics/logs_test.py +++ b/src/clusterfuzz/_internal/tests/core/metrics/logs_test.py @@ -530,8 +530,8 @@ def test_configure_swarming(self): helpers.patch( self, - ['clusterfuzz._internal.system.environment._is_running_on_swarming']) - self.mock._is_running_on_swarming.return_value = True + ['clusterfuzz._internal.system.environment.is_running_on_swarming']) + self.mock.is_running_on_swarming.return_value = True logger = mock.MagicMock() self.mock.getLogger.return_value = logger @@ -554,7 +554,7 @@ def setUp(self): 'clusterfuzz._internal.metrics.logs._is_running_on_app_engine', 'clusterfuzz._internal.metrics.logs._is_running_on_k8s', 'clusterfuzz._internal.metrics.logs._is_local', - 'clusterfuzz._internal.system.environment._is_running_on_swarming', + 'clusterfuzz._internal.system.environment.is_running_on_swarming', ]) def test_cloud_logging_enabled_swarming_k8s(self): @@ -562,7 +562,7 @@ def test_cloud_logging_enabled_swarming_k8s(self): self.mock._is_running_on_app_engine.return_value = False self.mock._is_running_on_k8s.return_value = True self.mock._is_local.return_value = False - self.mock._is_running_on_swarming.return_value = True + self.mock.is_running_on_swarming.return_value = True os.environ['LOG_TO_GCP'] = 'True' os.environ['PY_UNITTESTS'] = '' @@ -574,7 +574,7 @@ def test_cloud_logging_disabled_non_swarming_k8s(self): self.mock._is_running_on_app_engine.return_value = False self.mock._is_running_on_k8s.return_value = True self.mock._is_local.return_value = False - self.mock._is_running_on_swarming.return_value = False + self.mock.is_running_on_swarming.return_value = False os.environ['LOG_TO_GCP'] = 'True' @@ -584,7 +584,7 @@ def test_file_logging_enabled_swarming_k8s(self): """Test file logging enabled for swarming bot on K8S.""" self.mock._is_running_on_app_engine.return_value = False self.mock._is_running_on_k8s.return_value = True - self.mock._is_running_on_swarming.return_value = True + self.mock.is_running_on_swarming.return_value = True os.environ['LOG_TO_FILE'] = 'True' @@ -594,7 +594,7 @@ def test_file_logging_disabled_non_swarming_k8s(self): """Test file logging disabled for non-swarming bot on K8S.""" self.mock._is_running_on_app_engine.return_value = False self.mock._is_running_on_k8s.return_value = True - self.mock._is_running_on_swarming.return_value = False + self.mock.is_running_on_swarming.return_value = False os.environ['LOG_TO_FILE'] = 'True' From aeceb807e6a03e301c3c01f61bf7adfaf20ddab9 Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Fri, 17 Apr 2026 18:51:25 +0000 Subject: [PATCH 3/8] Removes circular dependency in logs.py & Adresses comments --- src/clusterfuzz/_internal/metrics/logs.py | 38 ++++-------- .../_internal/system/environment.py | 48 +-------------- .../core/bot/tasks/utasks/utasks_test.py | 7 +-- .../_internal/tests/core/metrics/logs_test.py | 61 +------------------ src/python/bot/startup/run_bot.py | 45 +++++++++++++- 5 files changed, 61 insertions(+), 138 deletions(-) diff --git a/src/clusterfuzz/_internal/metrics/logs.py b/src/clusterfuzz/_internal/metrics/logs.py index 03667560b2f..a6424565a68 100644 --- a/src/clusterfuzz/_internal/metrics/logs.py +++ b/src/clusterfuzz/_internal/metrics/logs.py @@ -32,6 +32,8 @@ from typing import NamedTuple from typing import TYPE_CHECKING +from clusterfuzz._internal.system import environment + # This is needed to avoid circular import if TYPE_CHECKING: from clusterfuzz._internal.cron.grouper import TestcaseAttributes @@ -93,29 +95,18 @@ def _console_logging_enabled(): # TODO(pmeuleman) Revert the changeset that added these once # https://github.com/google/clusterfuzz/pull/3422 lands. -def _file_logging_enabled(): +def _file_logging_enabled() -> bool: """Return bool True when logging to files (bot/logs/*.log) is enabled. - This is enabled by default. - This is disabled if we are running in app engine or kubernetes as these have - their dedicated loggers, see configure_appengine() and configure_k8s(). - """ - from clusterfuzz._internal.system import environment - return bool(os.getenv( - 'LOG_TO_FILE', 'True')) and not _is_running_on_app_engine() and ( - not _is_running_on_k8s() or environment.is_running_on_swarming()) + This is enabled by default.""" + return environment.get_value('LOG_TO_FILE', True) -def _cloud_logging_enabled(): +def _cloud_logging_enabled() -> bool: """Return bool True where Google Cloud Logging is enabled. - This is enabled by default. - This is disabled for local development and if we are running in a app engine - or kubernetes as these have their dedicated loggers, see - configure_appengine() and configure_k8s().""" - from clusterfuzz._internal.system import environment - return (bool(os.getenv('LOG_TO_GCP', 'True')) and - not os.getenv("PY_UNITTESTS") and not _is_local() and - not _is_running_on_app_engine() and - (not _is_running_on_k8s() or environment.is_running_on_swarming())) + This is enabled by default but disabled for local development.""" + return environment.get_value('LOG_TO_GCP', + True) and (not os.getenv('PY_UNITTESTS') and + not _is_local()) def suppress_unwanted_warnings(): @@ -557,7 +548,7 @@ def cloud_label_filter(record): logging.getLogger().addHandler(handler) -def configure_swarming(name, extras=None): +def configure_swarming(name: str, extras: dict[str, str] = None) -> None: """Configure logging for swarming bots.""" if extras is None: extras = {} @@ -568,10 +559,7 @@ def configure_swarming(name, extras=None): global _default_extras _default_extras = extras - if _console_logging_enabled(): - logging.basicConfig(level=logging.INFO) - if _file_logging_enabled(): - config.dictConfig(get_logging_config_dict(name)) + logging.basicConfig(level=logging.INFO) if _cloud_logging_enabled(): configure_cloud_logging() @@ -588,7 +576,6 @@ def configure(name, extras=None): |extras| will be included by emit() in log messages.""" suppress_unwanted_warnings() - from clusterfuzz._internal.system import environment if environment.is_running_on_swarming(): configure_swarming(name, extras) return @@ -825,7 +812,6 @@ def get_common_log_context() -> dict[str, str]: """Return common context to be propagated by logs.""" # Avoid circular imports on the top level. from clusterfuzz._internal.base import utils - from clusterfuzz._internal.system import environment try: os_type = environment.platform() diff --git a/src/clusterfuzz/_internal/system/environment.py b/src/clusterfuzz/_internal/system/environment.py index d878df8b6cb..90fe163f16f 100644 --- a/src/clusterfuzz/_internal/system/environment.py +++ b/src/clusterfuzz/_internal/system/environment.py @@ -23,11 +23,9 @@ import sys import uuid -import requests import yaml from clusterfuzz._internal import fuzzing -from clusterfuzz._internal.metrics import logs # Tools supporting customization of options via ADDITIONAL_{TOOL_NAME}_OPTIONS. # FIXME: Support ADDITIONAL_UBSAN_OPTIONS and ADDITIONAL_LSAN_OPTIONS in an @@ -749,10 +747,9 @@ def get_runtime() -> UtaskMainRuntime: return UtaskMainRuntime.INSTANCE_GROUP -def is_running_on_swarming(): +def is_running_on_swarming() -> bool: """Return whether or not the current bot is a swarming bot.""" - value = get_value('SWARMING_BOT') - return value is True or value == 'true' + return get_value('SWARMING_BOT') is True def is_running_on_app_engine(): @@ -1239,44 +1236,3 @@ def can_testcase_run_on_platform(testcase_platform_id, current_platform_id): def is_tworker(): return get_value('TWORKER', False) - - -def update_task_enabled() -> bool: - """ It uses the GCE VM metadata server `update_task_enabled` flag. - - This flag will be used to rollout the update_task deprecation - by disabling it progressively for each instance group through - the instance template metadata - """ - metadata_url = ("http://metadata.google.internal/computeMetadata/v1/" + - "instance/attributes/") - metadata_header = {"Metadata-Flavor": "Google"} - metadata_key = "update_task_enabled" - - running_on_batch = bool(is_uworker()) - - try: - # Construct the full URL for your specific metadata key - response = requests.get( - f"{metadata_url}{metadata_key}", headers=metadata_header, timeout=10) - - # Raise an exception for bad status codes (4xx or 5xx) - response.raise_for_status() - - # The metadata value is in the response text - metadata_value = response.text - logs.info(f"The value for '{metadata_key}' is: {metadata_value}") - is_update_task_enabled = metadata_value.lower() != 'false' - - # The flag is_uworker is true for Batch environment - # The update task should run if it's not a Batch environment - # and the flag is enabled on the VM template metadata - return not running_on_batch and is_update_task_enabled - - except requests.exceptions.HTTPError as http_error: - logs.warning(f"Http error fetching metadata: {http_error}") - - except Exception as ex: - logs.error(f"Unknown exception fetching metadata: {ex}") - - return not running_on_batch diff --git a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py index fd2df623f1c..38db68e09ee 100644 --- a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py +++ b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py @@ -208,13 +208,10 @@ def setUp(self): @parameterized.parameterized.expand([utasks.Mode.BATCH, utasks.Mode.SWARMING]) def test_uworker_main(self, execution_mode: utasks.Mode): """Tests that uworker_main works as intended.""" - # pylint: disable=protected-access start_time_ns = time.time_ns() - if execution_mode == utasks.Mode.SWARMING: - self.mock.is_running_on_swarming.return_value = True - else: - self.mock.is_running_on_swarming.return_value = False + self.mock.is_running_on_swarming.return_value = ( + execution_mode == utasks.Mode.SWARMING) preprocess_start_time_ns = start_time_ns - 42 * 10**9 # In the past. preprocess_start_timestamp = timestamp_pb2.Timestamp() diff --git a/src/clusterfuzz/_internal/tests/core/metrics/logs_test.py b/src/clusterfuzz/_internal/tests/core/metrics/logs_test.py index 19d445ecca7..68ad9153a10 100644 --- a/src/clusterfuzz/_internal/tests/core/metrics/logs_test.py +++ b/src/clusterfuzz/_internal/tests/core/metrics/logs_test.py @@ -524,14 +524,13 @@ def test_configure_appengine(self): def test_configure_swarming(self): """Test configure for swarming bot.""" # pylint: disable=protected-access - os.environ['SWARMING_BOT'] = 'true' + os.environ['SWARMING_BOT'] = 'True' os.environ['TASK_ID'] = 'task-123' os.environ['BOT_NAME'] = 'bot-123' helpers.patch( self, ['clusterfuzz._internal.system.environment.is_running_on_swarming']) - self.mock.is_running_on_swarming.return_value = True logger = mock.MagicMock() self.mock.getLogger.return_value = logger @@ -543,64 +542,6 @@ def test_configure_swarming(self): self.assertEqual(logs._default_extras['platform'], 'swarming') -class LoggingEnabledTest(unittest.TestCase): - """Test _cloud_logging_enabled and _file_logging_enabled.""" - - # pylint: disable=protected-access - - def setUp(self): - helpers.patch_environ(self) - helpers.patch(self, [ - 'clusterfuzz._internal.metrics.logs._is_running_on_app_engine', - 'clusterfuzz._internal.metrics.logs._is_running_on_k8s', - 'clusterfuzz._internal.metrics.logs._is_local', - 'clusterfuzz._internal.system.environment.is_running_on_swarming', - ]) - - def test_cloud_logging_enabled_swarming_k8s(self): - """Test cloud logging enabled for swarming bot on K8S.""" - self.mock._is_running_on_app_engine.return_value = False - self.mock._is_running_on_k8s.return_value = True - self.mock._is_local.return_value = False - self.mock.is_running_on_swarming.return_value = True - - os.environ['LOG_TO_GCP'] = 'True' - os.environ['PY_UNITTESTS'] = '' - - self.assertTrue(logs._cloud_logging_enabled()) - - def test_cloud_logging_disabled_non_swarming_k8s(self): - """Test cloud logging disabled for non-swarming bot on K8S.""" - self.mock._is_running_on_app_engine.return_value = False - self.mock._is_running_on_k8s.return_value = True - self.mock._is_local.return_value = False - self.mock.is_running_on_swarming.return_value = False - - os.environ['LOG_TO_GCP'] = 'True' - - self.assertFalse(logs._cloud_logging_enabled()) - - def test_file_logging_enabled_swarming_k8s(self): - """Test file logging enabled for swarming bot on K8S.""" - self.mock._is_running_on_app_engine.return_value = False - self.mock._is_running_on_k8s.return_value = True - self.mock.is_running_on_swarming.return_value = True - - os.environ['LOG_TO_FILE'] = 'True' - - self.assertTrue(logs._file_logging_enabled()) - - def test_file_logging_disabled_non_swarming_k8s(self): - """Test file logging disabled for non-swarming bot on K8S.""" - self.mock._is_running_on_app_engine.return_value = False - self.mock._is_running_on_k8s.return_value = True - self.mock.is_running_on_swarming.return_value = False - - os.environ['LOG_TO_FILE'] = 'True' - - self.assertFalse(logs._file_logging_enabled()) - - @test_utils.with_cloud_emulators('datastore') class EmitTest(unittest.TestCase): """Test emit.""" diff --git a/src/python/bot/startup/run_bot.py b/src/python/bot/startup/run_bot.py index de7fcaf849e..6825a28e149 100644 --- a/src/python/bot/startup/run_bot.py +++ b/src/python/bot/startup/run_bot.py @@ -27,6 +27,8 @@ import time import traceback +import requests + from clusterfuzz._internal.base import dates from clusterfuzz._internal.base import errors from clusterfuzz._internal.base import tasks @@ -125,7 +127,7 @@ def task_loop(): # This caches the current environment on first run. Don't move this. environment.reset_environment() try: - if environment.update_task_enabled(): + if update_task_enabled(): logs.info("Running update task.") # Run regular updates. # TODO(metzman): Move this after utask_main execution @@ -193,6 +195,47 @@ def task_loop(): return stacktrace, clean_exit, task_payload +def update_task_enabled() -> bool: + """ It uses the GCE VM metadata server `update_task_enabled` flag. + + This flag will be used to rollout the update_task deprecation + by disabling it progressively for each instance group through + the instance template metadata + """ + metadata_url = ("http://metadata.google.internal/computeMetadata/v1/" + + "instance/attributes/") + metadata_header = {"Metadata-Flavor": "Google"} + metadata_key = "update_task_enabled" + + running_on_batch = bool(environment.is_uworker()) + + try: + # Construct the full URL for your specific metadata key + response = requests.get( + f"{metadata_url}{metadata_key}", headers=metadata_header, timeout=10) + + # Raise an exception for bad status codes (4xx or 5xx) + response.raise_for_status() + + # The metadata value is in the response text + metadata_value = response.text + logs.info(f"The value for '{metadata_key}' is: {metadata_value}") + is_update_task_enabled = metadata_value.lower() != 'false' + + # The flag is_uworker is true for Batch environment + # The update task should run if it's not a Batch environment + # and the flag is enabled on the VM template metadata + return not running_on_batch and is_update_task_enabled + + except requests.exceptions.HTTPError as http_error: + logs.warning(f"Http error fetching metadata: {http_error}") + + except Exception as ex: + logs.error(f"Unknown exception fetching metadata: {ex}") + + return not running_on_batch + + def main(): """Prepare the configuration options and start requesting tasks.""" logs.configure('run_bot') From a47f3bfb693bb60b54fd031f469c714ba818ab7b Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Mon, 20 Apr 2026 19:46:53 +0000 Subject: [PATCH 4/8] Metrics now register swarming fuzzing hours & adds unit tests --- .../_internal/system/environment.py | 24 ++++++++------- .../tests/core/system/environment_test.py | 29 +++++++++++++++++++ 2 files changed, 43 insertions(+), 10 deletions(-) diff --git a/src/clusterfuzz/_internal/system/environment.py b/src/clusterfuzz/_internal/system/environment.py index 90fe163f16f..f3afdf3f333 100644 --- a/src/clusterfuzz/_internal/system/environment.py +++ b/src/clusterfuzz/_internal/system/environment.py @@ -60,6 +60,7 @@ class UtaskMainRuntime(enum.Enum): BATCH = 'batch' KATA_CONTAINER = 'kata_container' INSTANCE_GROUP = 'instance_group' + SWARMING = 'swarming' def _eval_value(value_string): @@ -723,7 +724,7 @@ def is_untrusted_worker(): return get_value('UNTRUSTED_WORKER') -def is_uworker(): +def is_uworker() -> bool: """Return whether or not the current bot is a uworker. This is not the same as OSS-Fuzz's untrusted worker.""" return get_value('UWORKER') @@ -732,19 +733,22 @@ def is_uworker(): def get_runtime() -> UtaskMainRuntime: """ Get the current runtime for running the tasks. - It can be KATA_CONTAINER, BATCH or INSTANCE_GROUP. + It can be KATA_CONTAINER, BATCH, SWARMING or INSTANCE_GROUP. :return: Enum UtaskMainRuntime with one of KATA_CONTAINER, - BATCH or INSTANCE_GROUP + BATCH, SWARMING or INSTANCE_GROUP :rtype: UtaskMainRuntime """ - if is_uworker() and is_running_on_k8s(): - return UtaskMainRuntime.KATA_CONTAINER + if not is_uworker(): + return UtaskMainRuntime.INSTANCE_GROUP + + if is_running_on_swarming(): + return UtaskMainRuntime.SWARMING - if is_uworker() and not is_running_on_k8s(): - return UtaskMainRuntime.BATCH + if is_running_on_k8s(): + return UtaskMainRuntime.KATA_CONTAINER - return UtaskMainRuntime.INSTANCE_GROUP + return UtaskMainRuntime.BATCH def is_running_on_swarming() -> bool: @@ -785,9 +789,9 @@ def parse_environment_definition(environment_string): return values -def is_running_on_k8s(): +def is_running_on_k8s() -> bool: """Returns whether or not we're running on K8s.""" - return os.getenv('IS_K8S_ENV') == 'true' + return get_value('IS_K8S_ENV', False) def base_platform(override): diff --git a/src/clusterfuzz/_internal/tests/core/system/environment_test.py b/src/clusterfuzz/_internal/tests/core/system/environment_test.py index e70159d94cb..8de8ef603b8 100644 --- a/src/clusterfuzz/_internal/tests/core/system/environment_test.py +++ b/src/clusterfuzz/_internal/tests/core/system/environment_test.py @@ -51,6 +51,35 @@ def test_set_bot_environment_default_variables(self): self.assertEqual(environment.get_value('VERSION_PATTERN'), '') self.assertEqual(environment.get_value('WATCH_FOR_PROCESS_EXIT'), False) + def test_get_runtime_instance_group(self): + """Tests that get_runtime() returns INSTANCE_GROUP when not a uworker.""" + environment.set_value('UWORKER', False) + self.assertEqual(environment.get_runtime(), + environment.UtaskMainRuntime.INSTANCE_GROUP) + + def test_get_runtime_kata_container(self): + """Tests that get_runtime() returns KATA_CONTAINER when running on k8s.""" + environment.set_value('UWORKER', True) + environment.set_value('IS_K8S_ENV', 'True') + self.assertEqual(environment.get_runtime(), + environment.UtaskMainRuntime.KATA_CONTAINER) + + def test_get_runtime_swarming(self): + """Tests that get_runtime() returns SWARMING when running on swarming.""" + environment.set_value('UWORKER', True) + environment.set_value('IS_K8S_ENV', 'False') + environment.set_value('SWARMING_BOT', True) + self.assertEqual(environment.get_runtime(), + environment.UtaskMainRuntime.SWARMING) + + def test_get_runtime_batch(self): + """Tests that get_runtime() returns BATCH as fallback.""" + environment.set_value('UWORKER', True) + environment.set_value('IS_K8S_ENV', 'False') + environment.set_value('SWARMING_BOT', False) + self.assertEqual(environment.get_runtime(), + environment.UtaskMainRuntime.BATCH) + class GetExecutableFileNameTest(unittest.TestCase): """Tests for get_executable_filename.""" From 7e87ce5f7076135c490cda6d47b025ce4d31ab18 Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Tue, 28 Apr 2026 23:52:59 +0000 Subject: [PATCH 5/8] Refines conditional and removes repetead code --- src/clusterfuzz/_internal/metrics/logs.py | 29 +++++-------------- .../_internal/system/environment.py | 5 +++- 2 files changed, 11 insertions(+), 23 deletions(-) diff --git a/src/clusterfuzz/_internal/metrics/logs.py b/src/clusterfuzz/_internal/metrics/logs.py index f5d00c2d551..dcc0436db17 100644 --- a/src/clusterfuzz/_internal/metrics/logs.py +++ b/src/clusterfuzz/_internal/metrics/logs.py @@ -54,17 +54,11 @@ _default_extras = {} -def _is_running_on_k8s(): - """Returns whether or not we're running on K8s.""" - # We do this here to avoid circular imports with environment. - return os.getenv('IS_K8S_ENV') == 'true' - - def _increment_error_count(): """"Increment the error count metric.""" - if _is_running_on_k8s(): + if environment.is_running_on_k8s(): task_name = 'k8s' - elif _is_running_on_app_engine(): + elif environment.is_running_on_app_engine(): task_name = 'appengine' else: task_name = os.getenv('TASK_NAME', 'unknown') @@ -79,15 +73,6 @@ def _is_local(): os.getenv('SERVER_SOFTWARE', '').startswith('Development/')) -def _is_running_on_app_engine(): - """Return whether or not we're running on App Engine (production or - development).""" - return os.getenv('GAE_ENV') or ( - os.getenv('SERVER_SOFTWARE') and - (os.getenv('SERVER_SOFTWARE').startswith('Development/') or - os.getenv('SERVER_SOFTWARE').startswith('Google App Engine/'))) - - def _console_logging_enabled(): """Return bool on where console logging is enabled, usually for tests.""" return bool(os.getenv('LOG_TO_CONSOLE')) @@ -579,11 +564,11 @@ def configure(name, extras=None): configure_swarming(name, extras) return - if _is_running_on_k8s(): + if environment.is_running_on_k8s(): configure_k8s() return - if _is_running_on_app_engine(): + if environment.is_running_on_app_engine(): configure_appengine() return @@ -613,7 +598,7 @@ def get_logger(): if _logger: return _logger - if _is_running_on_app_engine() or _is_running_on_k8s(): + if environment.is_running_on_app_engine() or environment.is_running_on_k8s(): # Running on App Engine. set_logger(logging.getLogger()) @@ -645,7 +630,7 @@ def get_source_location(): def _add_appengine_trace(extras): """Add App Engine tracing information.""" - if not _is_running_on_app_engine(): + if not environment.is_running_on_app_engine(): return from libs import auth @@ -724,7 +709,7 @@ def emit(level, message, exc_info=None, **extras): path_name, line_number, method_name = get_source_location() - if _is_running_on_app_engine(): + if environment.is_running_on_app_engine(): if exc_info == (None, None, None): # Don't pass exc_info at all, as otherwise cloud logging will append # "NoneType: None" to the message. diff --git a/src/clusterfuzz/_internal/system/environment.py b/src/clusterfuzz/_internal/system/environment.py index 6f76c0fdc6c..cea2241979c 100644 --- a/src/clusterfuzz/_internal/system/environment.py +++ b/src/clusterfuzz/_internal/system/environment.py @@ -791,7 +791,10 @@ def parse_environment_definition(environment_string): def is_running_on_k8s() -> bool: """Returns whether or not we're running on K8s.""" - return get_value('IS_K8S_ENV', False) + env_value = get_value('IS_K8S_ENV', False) + if isinstance(env_value, str): + return env_value.lower() == 'true' + return bool(env_value) def base_platform(override): From 5e83623e12f27e32d74af8b8c8a00062c48fd3bd Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Wed, 29 Apr 2026 03:21:52 +0000 Subject: [PATCH 6/8] Fix Failing Unit Tests --- .../tests/appengine/handlers/issue_redirector_test.py | 4 ++-- .../_internal/tests/appengine/server_test.py | 6 +++--- .../_internal/tests/core/metrics/logs_test.py | 10 +++++----- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/issue_redirector_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/issue_redirector_test.py index 318074a89a1..b6a36d6cab0 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/issue_redirector_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/issue_redirector_test.py @@ -29,9 +29,9 @@ def setUp(self): test_helpers.patch(self, [ 'clusterfuzz._internal.issue_management.issue_tracker_utils.get_issue_url', 'libs.helpers.get_testcase', - 'clusterfuzz._internal.metrics.logs._is_running_on_app_engine', + 'clusterfuzz._internal.system.environment.is_running_on_app_engine', ]) - self.mock._is_running_on_app_engine.return_value = True # pylint: disable=protected-access + self.mock.is_running_on_app_engine.return_value = True import server self.app = webtest.TestApp(server.app) diff --git a/src/clusterfuzz/_internal/tests/appengine/server_test.py b/src/clusterfuzz/_internal/tests/appengine/server_test.py index 825c0a042a6..f8c6fee04d1 100644 --- a/src/clusterfuzz/_internal/tests/appengine/server_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/server_test.py @@ -22,12 +22,12 @@ class ServerTest(unittest.TestCase): def setUp(self): helpers.patch(self, [ - 'clusterfuzz._internal.metrics.logs._is_running_on_app_engine', + 'clusterfuzz._internal.system.environment.is_running_on_app_engine', ]) - self.mock._is_running_on_app_engine.return_value = True # pylint: disable=protected-access + self.mock.is_running_on_app_engine.return_value = True - # pylint: disable=protected-access def test(self): + # pylint: disable=import-outside-toplevel import server self.assertIsNotNone(server.handlers) self.assertIsNotNone(server.cron_routes) diff --git a/src/clusterfuzz/_internal/tests/core/metrics/logs_test.py b/src/clusterfuzz/_internal/tests/core/metrics/logs_test.py index 68ad9153a10..fd92f73173b 100644 --- a/src/clusterfuzz/_internal/tests/core/metrics/logs_test.py +++ b/src/clusterfuzz/_internal/tests/core/metrics/logs_test.py @@ -495,14 +495,14 @@ def setUp(self): 'clusterfuzz._internal.metrics.logs.set_logger', 'logging.config.dictConfig', 'logging.getLogger', - 'clusterfuzz._internal.metrics.logs._is_running_on_app_engine', + 'clusterfuzz._internal.system.environment.is_running_on_app_engine', 'clusterfuzz._internal.metrics.logs.suppress_unwanted_warnings', 'google.cloud.logging.Client', ]) def test_configure(self): """Test configure.""" - self.mock._is_running_on_app_engine.return_value = False # pylint: disable=protected-access + self.mock.is_running_on_app_engine.return_value = False logs._logger = None # pylint: disable=protected-access logger = mock.MagicMock() self.mock.getLogger.return_value = logger @@ -517,7 +517,7 @@ def test_configure(self): def test_configure_appengine(self): """Test configure on App Engine.""" - self.mock._is_running_on_app_engine.return_value = True # pylint: disable=protected-access + self.mock.is_running_on_app_engine.return_value = True logs.configure('test') self.assertEqual(0, self.mock.dictConfig.call_count) @@ -549,7 +549,7 @@ class EmitTest(unittest.TestCase): def setUp(self): helpers.patch(self, [ 'clusterfuzz._internal.metrics.logs.get_logger', - 'clusterfuzz._internal.metrics.logs._is_running_on_app_engine', + 'clusterfuzz._internal.system.environment.is_running_on_app_engine', 'clusterfuzz._internal.datastore.data_types.Testcase.get_fuzz_target', 'clusterfuzz._internal.base.utils.get_instance_name', ]) @@ -578,7 +578,7 @@ def setUp(self): # Reset the `common_ctx` metadata as it may be setted by other test runs. logs.log_contexts.delete_metadata('common_ctx') logs.log_contexts.clear() - self.mock._is_running_on_app_engine.return_value = False # pylint: disable=protected-access + self.mock.is_running_on_app_engine.return_value = False def tearDown(self): os.environ.clear() From 5bfbde157bcec6f93fee4b872771a899cffc638f Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Thu, 30 Apr 2026 20:43:59 +0000 Subject: [PATCH 7/8] Cleans UT so env vars dont look misleading --- src/clusterfuzz/_internal/metrics/logs.py | 1 - .../_internal/tests/core/system/environment_test.py | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/clusterfuzz/_internal/metrics/logs.py b/src/clusterfuzz/_internal/metrics/logs.py index dcc0436db17..8322a73ed11 100644 --- a/src/clusterfuzz/_internal/metrics/logs.py +++ b/src/clusterfuzz/_internal/metrics/logs.py @@ -543,7 +543,6 @@ def configure_swarming(name: str, extras: dict[str, str] | None = None) -> None: global _default_extras _default_extras = extras - logging.basicConfig(level=logging.INFO) if _cloud_logging_enabled(): configure_cloud_logging() diff --git a/src/clusterfuzz/_internal/tests/core/system/environment_test.py b/src/clusterfuzz/_internal/tests/core/system/environment_test.py index 8de8ef603b8..3751e7b842d 100644 --- a/src/clusterfuzz/_internal/tests/core/system/environment_test.py +++ b/src/clusterfuzz/_internal/tests/core/system/environment_test.py @@ -60,14 +60,14 @@ def test_get_runtime_instance_group(self): def test_get_runtime_kata_container(self): """Tests that get_runtime() returns KATA_CONTAINER when running on k8s.""" environment.set_value('UWORKER', True) - environment.set_value('IS_K8S_ENV', 'True') + environment.set_value('IS_K8S_ENV', True) self.assertEqual(environment.get_runtime(), environment.UtaskMainRuntime.KATA_CONTAINER) def test_get_runtime_swarming(self): """Tests that get_runtime() returns SWARMING when running on swarming.""" environment.set_value('UWORKER', True) - environment.set_value('IS_K8S_ENV', 'False') + environment.set_value('IS_K8S_ENV', False) environment.set_value('SWARMING_BOT', True) self.assertEqual(environment.get_runtime(), environment.UtaskMainRuntime.SWARMING) @@ -75,7 +75,7 @@ def test_get_runtime_swarming(self): def test_get_runtime_batch(self): """Tests that get_runtime() returns BATCH as fallback.""" environment.set_value('UWORKER', True) - environment.set_value('IS_K8S_ENV', 'False') + environment.set_value('IS_K8S_ENV', False) environment.set_value('SWARMING_BOT', False) self.assertEqual(environment.get_runtime(), environment.UtaskMainRuntime.BATCH) From e41ede7729a0254476c0375c52d66b4af32e8f09 Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Thu, 30 Apr 2026 22:20:23 +0000 Subject: [PATCH 8/8] Tags error count for swarming --- src/clusterfuzz/_internal/metrics/logs.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/clusterfuzz/_internal/metrics/logs.py b/src/clusterfuzz/_internal/metrics/logs.py index 8322a73ed11..a66fa138ba4 100644 --- a/src/clusterfuzz/_internal/metrics/logs.py +++ b/src/clusterfuzz/_internal/metrics/logs.py @@ -56,7 +56,9 @@ def _increment_error_count(): """"Increment the error count metric.""" - if environment.is_running_on_k8s(): + if environment.is_running_on_swarming(): + task_name = 'swarming' + elif environment.is_running_on_k8s(): task_name = 'k8s' elif environment.is_running_on_app_engine(): task_name = 'appengine'