Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions .github/workflows/nightly.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
name: Nightly ITK

on:
  push:
    branches:
      - '**'
  schedule:
    - cron: '0 2 * * *' # 2:00 AM UTC daily
  workflow_dispatch: # Allow manual execution

# Write access is required so the release step can update the rolling
# "nightly-metrics" release with the new history asset.
permissions:
  contents: write

jobs:
  nightly:
    name: Nightly ITK Run
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v6

      - name: Install uv
        uses: astral-sh/setup-uv@v7

      - name: Run Nightly ITK Tests
        run: bash run_itk.sh
        working-directory: itk
        env:
          A2A_SAMPLES_REVISION: feat/enable-building-subtests
          # Switches the test runner to the full scenario set (see process_results.py).
          ITK_NIGHTLY_RUN: "True"

      - name: Upload Results to Rolling Release
        uses: softprops/action-gh-release@v2
        with:
          tag_name: "nightly-metrics"
          prerelease: true
          files: |
            itk/itk_python.json
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
39 changes: 33 additions & 6 deletions itk/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,50 +57,63 @@
logger = logging.getLogger(__name__)


def extract_instruction(
    message: Message | None,
) -> instruction_pb2.Instruction | None:
    """Extracts an Instruction proto from an A2A Message.

    Scans the message parts in order and returns the first one that decodes
    into an ``instruction_pb2.Instruction``:

    1. A binary protobuf part (``media_type`` of ``application/x-protobuf``
       or filename ``instruction.bin``), read from ``part.raw`` or, as a
       fallback, from a base64-encoded ``part.text`` payload.
    2. Any other text part whose content is a base64-encoded serialized
       Instruction.

    Parts that fail to parse are skipped with a debug log rather than
    raising, so one malformed part never aborts the scan.

    Args:
        message: The A2A message to inspect; may be ``None``.

    Returns:
        The first successfully parsed Instruction, or ``None`` when the
        message is empty or no part contains a parseable Instruction.
    """
    if not message or not message.parts:
        return None

    for part in message.parts:
        # 1. Handle binary protobuf part (media_type or filename).
        if (
            part.media_type == 'application/x-protobuf'
            or part.filename == 'instruction.bin'
        ):
            try:
                inst = instruction_pb2.Instruction()
                if part.raw:
                    inst.ParseFromString(part.raw)
                elif part.text:
                    # Some clients might send it as base64 in text part.
                    raw = base64.b64decode(part.text)
                    inst.ParseFromString(raw)
            except Exception:
                logger.debug(
                    'Failed to parse instruction from binary part',
                    exc_info=True,
                )
                continue
            else:
                return inst

        # 2. Handle base64 encoded instruction in any text part.
        if part.text:
            try:
                raw = base64.b64decode(part.text)
                inst = instruction_pb2.Instruction()
                inst.ParseFromString(raw)
            except Exception:
                logger.debug(
                    'Failed to parse instruction from text part', exc_info=True
                )
                continue
            else:
                return inst

    # Explicit fall-through (ruff RET503): no part parsed successfully.
    return None
def _get_text_from_part(part: Any) -> str | None:
"""Safely extracts text string from a Part object supporting protobuf, pydantic, and raw dict."""
if not part:
return None
if hasattr(part, 'HasField'):
try:
if part.HasField('text'):
return part.text
except ValueError:
pass
root = getattr(part, 'root', part)
if isinstance(root, dict):
return root.get('text')
return getattr(root, 'text', None)


def _extract_text_from_event(event: Any) -> list[str]:
Expand Down Expand Up @@ -128,7 +141,7 @@
return results


async def _handle_call_agent_with_resubscribe(

Check failure on line 144 in itk/main.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

ruff (PLR0915)

itk/main.py:144:11: PLR0915 Too many statements (52 > 50)

Check failure on line 144 in itk/main.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

ruff (PLR0912)

itk/main.py:144:11: PLR0912 Too many branches (21 > 12)
client: Client, request: SendMessageRequest
) -> list[str]:
"""Handles the send-disconnect-resubscribe flow."""
Expand Down Expand Up @@ -157,6 +170,21 @@
if hasattr(event, 'HasField') and event.HasField('task'):
task_obj = event.task

if task_obj and hasattr(task_obj, 'history'):
for msg in task_obj.history:
if str(msg.role) in {'2', 'ROLE_AGENT', 'agent'}:
for part in msg.parts:
text = _get_text_from_part(part)
if text and 'task-finished' in text:
logger.info('Found task-finished in history, breaking loop!')
results.append(text.replace('task-finished', ''))
finished = True
break
if finished:
break
if finished:
break

extracted_text = _extract_text_from_event(event)
for text in extracted_text:
processed_text = text.replace('task-finished', '')
Expand All @@ -174,11 +202,10 @@
# Check stringified role to support protobuf enums (2 for ROLE_AGENT in v0.3 and v1.0)
# as well as string descriptors from dict/JSON forms.
if str(msg.role) in {'2', 'ROLE_AGENT', 'agent'}:
results.extend(
part.text.replace('task-finished', '')
for part in msg.parts
if part.text
)
for part in msg.parts:
text = _get_text_from_part(part)
if text:
results.append(text.replace('task-finished', ''))

if not finished:
logger.info('Canceling task %s after retrieval.', task_id)
Expand Down
206 changes: 206 additions & 0 deletions itk/process_results.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
#!/usr/bin/env python3
"""ITK Compatibility Metrics Processor.

Compiles test outcomes from raw JSON results, retrieves and aggregates historical
runs from GitHub Release assets, and outputs the updated historical metrics log.
"""

import datetime
import json
import logging
import os
import pathlib
import sys
import urllib.error
import urllib.request


# --- CONSTANTS ---
RESULTS_FILE = 'raw_results.json'
HISTORY_OUTPUT_FILE = 'itk_python.json'
HISTORY_URL = 'https://github.com/a2aproject/a2a-python/releases/download/nightly-metrics/itk_python.json'
SCENARIOS_FILE = 'scenarios.json'
DEFAULT_HISTORY_LIMIT = 50

HTTP_STATUS_OK = 200
HTTP_STATUS_NOT_FOUND = 404

# Configure logging to match standard ITK formatting
logging.basicConfig(
level=logging.INFO,
)
logger = logging.getLogger(__name__)


def load_raw_results(filepath: str) -> dict:
    """Loads the raw compatibility results from raw_results.json."""
    results_path = pathlib.Path(filepath)
    if not results_path.exists():
        logger.error('Results file %s not found.', filepath)
        raise SystemExit(1)

    try:
        # Read then decode so one except clause covers both I/O and parse errors.
        return json.loads(results_path.read_text())
    except (OSError, json.JSONDecodeError):
        logger.exception('Error loading results JSON')
        raise SystemExit(1) from None


def fetch_existing_history(url: str) -> list:
    """Fetches the existing compatibility history from the GitHub release asset.

    If the asset does not exist (HTTP 404), a fresh empty history list is returned.
    For all other network or server errors, the script exits with a non-zero status
    to prevent overwriting and losing historical metrics.
    """
    try:
        request = urllib.request.Request(  # noqa: S310
            url, headers={'User-Agent': 'Mozilla/5.0'}
        )
        with urllib.request.urlopen(request, timeout=15) as response:  # noqa: S310
            if response.status != HTTP_STATUS_OK:
                logger.error(
                    'Unexpected HTTP status when downloading existing history: %d',
                    response.status,
                )
                # SystemExit is a BaseException, so it escapes the broad
                # `except Exception` handler below and terminates the run.
                raise SystemExit(1)  # noqa: TRY301
            history = json.loads(response.read().decode('utf-8'))
            logger.info(
                'Successfully retrieved history. Current entries: %d',
                len(history),
            )
            return history
    except urllib.error.HTTPError as e:
        if e.code == HTTP_STATUS_NOT_FOUND:
            logger.warning(
                'No existing history found (HTTP %d). Initializing fresh history.',
                e.code,
            )
            return []
        logger.exception(
            'HTTP error downloading existing history: %d. Aborting to preserve metrics.',
            e.code,
        )
        raise SystemExit(1) from None
    except Exception:
        logger.exception(
            'Failed to download existing history. Aborting to preserve metrics.'
        )
        raise SystemExit(1) from None


def load_scenarios(filepath: str) -> list:
    """Loads the list of tests from the scenarios.json definitions."""
    scenarios_path = pathlib.Path(filepath)
    if not scenarios_path.exists():
        logger.error('Scenarios file %s not found.', filepath)
        raise SystemExit(1)

    try:
        definitions = json.loads(scenarios_path.read_text())
        # KeyError here means the definitions file is malformed.
        return definitions['tests']
    except (OSError, json.JSONDecodeError, KeyError):
        logger.exception('Failed to load scenarios.json definitions')
        raise SystemExit(1) from None


def save_history(filepath: str, history: list) -> None:
    """Saves the updated history back to disk as a release asset candidate."""
    try:
        with pathlib.Path(filepath).open('w') as out:
            json.dump(history, out, indent=2)
    except (OSError, TypeError):
        # TypeError covers non-JSON-serializable history entries.
        logger.exception('Error writing history file')
        sys.exit(1)
    else:
        logger.info(
            'Successfully compiled and wrote nightly history to: %s',
            filepath,
        )


def _compile_scenario_records(results: dict, base_scenarios: list) -> list:
    """Merges raw per-test outcomes with their base scenario metadata.

    Args:
        results: Mapping of result key (scenario name, or
            '<scenario>-sub-<n>' for subtests) to either a bool pass flag or
            a dict carrying per-run overrides ('passed', 'sdks', 'edges').
        base_scenarios: Scenario definitions loaded from the scenarios file.

    Returns:
        A list of metadata-rich scenario records; result keys with no exact
        matching base scenario are logged and skipped.
    """
    compiled = []
    for name, details in results.items():
        # Subtests are keyed '<parent>-sub-<n>'; strip the suffix to find
        # the parent scenario definition.
        parent_name = name.split('-sub-')[0]

        # Find the matching base scenario with an EXACT name match.
        matched_base = next(
            (base for base in base_scenarios if base['name'] == parent_name),
            None,
        )
        if not matched_base:
            logger.warning(
                'No matching base scenario found for result key: %s', name
            )
            continue

        # Result values may be a plain bool or a dict with overrides.
        passed = False
        sdks = matched_base.get('sdks', [])
        edges = matched_base.get('edges')
        if isinstance(details, dict):
            passed = details.get('passed', False)
            sdks = details.get('sdks', sdks)
            edges = details.get('edges', edges)
        elif isinstance(details, bool):
            passed = details

        record = {
            'name': name,
            'sdks': sdks,
            'edges': edges,
            'protocols': matched_base.get('protocols'),
            'behavior': matched_base.get('behavior'),
            'traversal': matched_base.get('traversal', 'euler'),
            'passed': passed,
        }
        # Optional fields are only carried over when the base defines them.
        if 'streaming' in matched_base:
            record['streaming'] = matched_base['streaming']
        if 'build_subtests' in matched_base:
            record['build_subtests'] = matched_base['build_subtests']

        compiled.append(record)
    return compiled


def main() -> None:
    """Orchestrates nightly ITK metrics processing and compiles rolling history."""
    # 1. Load raw compatibility results.
    data = load_raw_results(RESULTS_FILE)
    all_passed = data.get('all_passed', False)
    results = data.get('results', {})

    # 2. Fetch existing history from the rolling release.
    history = fetch_existing_history(HISTORY_URL)

    # 3. Load scenarios list for base metadata; nightly runs use the full set.
    scenarios_file = (
        'scenarios_full.json'
        if os.environ.get('ITK_NIGHTLY_RUN') == 'True'
        else SCENARIOS_FILE
    )
    base_scenarios = load_scenarios(scenarios_file)
    compiled_scenarios = _compile_scenario_records(results, base_scenarios)

    # 4. Compile new run metadata.
    new_run = {
        'timestamp': datetime.datetime.now(datetime.timezone.utc).isoformat(),
        'commit_sha': os.environ.get('GITHUB_SHA', 'local-dev'),
        'github_run_id': os.environ.get('GITHUB_RUN_ID', '0'),
        'all_passed': all_passed,
        'scenarios': compiled_scenarios,
    }

    # 5. Merge and prune rolling window.
    history.append(new_run)
    history_limit = int(
        os.environ.get('ITK_HISTORY_LIMIT', str(DEFAULT_HISTORY_LIMIT))
    )
    if len(history) > history_limit:
        history = history[-history_limit:]
        logger.info('Pruned history to last %d entries.', history_limit)

    # 6. Save candidates back to disk.
    save_history(HISTORY_OUTPUT_FILE, history)
    sys.exit(0)


if __name__ == '__main__':
main()
Loading
Loading