diff --git a/examples/custom_tracer_adapter/README.md b/examples/custom_tracer_adapter/README.md new file mode 100644 index 00000000..16b24e7d --- /dev/null +++ b/examples/custom_tracer_adapter/README.md @@ -0,0 +1,150 @@ +# Minimal Example: Custom Tracer & Adapter + +A minimal Agent-Lightning example demonstrating **custom observability patterns** through tracer and adapter interfaces. + + + this example shows you how to: + +1. **Create Custom Tracers** - Integrate Agent-Lightning with your observability platform (DataDog, New Relic, internal systems) +2. **Build Custom Adapters** - Transform traces into rewards using your own logic +3. **Minimal Setup** - Complete working example in a single file + +## When To Use This Pattern + +Use custom tracers/adapters when you need to: + +- ✅ Integrate with existing observability infrastructure +- ✅ Compute rewards from multiple signals (latency, tokens, cost, correctness) +- ✅ Export traces to custom analytics pipelines +- ✅ Debug agent behavior with fine-grained instrumentation + +## Quick Start + +### Debug Mode (No Training) + +Test the agent and see custom traces: + +```bash +python app.py +``` + +This runs the agent on sample tasks and prints captured spans in your custom format. + +### Training Mode + +Run full RL training with custom observability: + +```bash +# Start Ray cluster +bash ../../scripts/restart_ray.sh + +# Train +python app.py --train +``` + +## Code Structure (~180 lines) + +```python +# 1. Custom Tracer +class CustomTracer(agl.Tracer): + def start_span(self, name: str, **attrs): ... + def end_span(self, span): ... + def add_event(self, name: str, **attrs): ... + +# 2. Custom Adapter +class CustomAdapter(agl.Adapter): + def extract(self, trace) -> agl.Triplet: ... + def _compute_reward(self, span) -> float: ... + +# 3. Agent +@agl.rollout +async def simple_math_agent(task, llm): ... + +# 4. Training/Debug +def train_mode(): ... +def debug_mode(): ... +``` + +## Custom Tracer Pattern + +```python +class CustomTracer(agl.Tracer): + """Captures spans in your preferred format.""" + + def start_span(self, name: str, **attributes): + # Your instrumentation logic + self.current_span = CustomSpan(name, attributes) + return self.current_span + + def add_event(self, name: str, **attributes): + # Log events during execution + self.current_span.events.append({...}) +``` + +**Use cases:** +- Send spans to DataDog: `datadog.tracer.start_span()` +- Export to Prometheus: `prometheus_client.Counter(...).inc()` +- Custom logging: Write to your internal systems + +## Custom Adapter Pattern + +```python +class CustomAdapter(agl.Adapter): + """Transforms traces into rewards.""" + + def extract(self, trace) -> agl.Triplet: + prompt = trace.attributes["prompt"] + response = trace.events[-1]["content"] + + # Multi-signal reward + reward = self._compute_reward(trace) + + return agl.Triplet(prompt, response, reward) + + def _compute_reward(self, span): + # Combine multiple signals + correctness = span.attributes["correct"] + latency = span.attributes["latency"] + tokens = span.attributes["tokens"] + + return correctness - 0.1 * (latency > 10) + 0.05 * (tokens < 500) +``` + +**Use cases:** + +- Aggregate metrics from multiple sources +- Apply domain-specific reward shaping +- Incorporate business metrics (cost, user satisfaction) + +## Extending This Example + +### 1. Add Real Observability Platform + +```python +import datadog + +class DataDogTracer(agl.Tracer): + def start_span(self, name: str, **attrs): + return datadog.tracer.trace(name, **attrs) +``` + +### 2. Multi-Signal Rewards + +```python +class BusinessAdapter(agl.Adapter): + def _compute_reward(self, span): + correctness = span.attributes["correct"] + cost = span.attributes["api_cost"] + latency = span.attributes["latency"] + + # Business objective: correct, cheap, fast + return correctness - 0.5 * cost - 0.1 * latency +``` + +### 3. Async Event Streaming + +```python +class StreamingTracer(agl.Tracer): + async def add_event(self, name: str, **attrs): + await kafka_producer.send("agent-events", {...}) +``` diff --git a/examples/custom_tracer_adapter/app.py b/examples/custom_tracer_adapter/app.py new file mode 100644 index 00000000..b076e775 --- /dev/null +++ b/examples/custom_tracer_adapter/app.py @@ -0,0 +1,283 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Minimal Agent-Lightning example demonstrating custom tracer and adapter patterns. + +This example shows how to: +1. Create a custom tracer for observability (e.g., DataDog, custom metrics) +2. Build a custom adapter to transform traces into rewards +3. Train an agent with <200 lines of code + +Run with: python minimal_agent.py --train +Debug with: python minimal_agent.py --debug +""" + +import argparse +import json +from dataclasses import dataclass +from typing import Any, Dict, List, Optional + +from openai import AsyncOpenAI + +import agentlightning as agl + + +# +# 1. CUSTOM TRACER - Plug in your own observability + + + +@dataclass +class CustomSpan: + """Custom span format for your observability system.""" + + name: str + attributes: Dict[str, Any] + events: List[Dict[str, Any]] + + +class CustomTracer(agl.Tracer): + + """Custom tracer that captures spans in your preferred format. + + This demonstrates how to integrate Agent-Lightning with: + - Custom observability platforms (DataDog, New Relic, etc.) + - Internal monitoring systems + - Custom analytics pipelines + + """ + + def __init__(self): + self.spans: List[CustomSpan] = [] + self.current_span: Optional[CustomSpan] = None + + def start_span(self, name: str, **attributes) -> Any: + """Start a new span with custom attributes.""" + self.current_span = CustomSpan(name=name, attributes=attributes, events=[]) + return self.current_span + + def end_span(self, span: Any) -> None: + """Finalize and store the span.""" + if span: + self.spans.append(span) + + def add_event(self, name: str, **attributes) -> None: + """Add an event to the current span.""" + if self.current_span: + self.current_span.events.append({"name": name, **attributes}) + + def get_spans(self) -> List[CustomSpan]: + """Retrieve all captured spans.""" + return self.spans + + +# 2. CUSTOM ADAPTER - Transform traces into rewards + + +class CustomAdapter(agl.Adapter): + """Custom adapter that extracts rewards from your trace format. + + This shows how to: + - Parse custom trace structures + - Compute rewards from multiple signals + - Aggregate metrics for RL + """ + + def extract(self, trace: Any) -> Optional[agl.Triplet]: + """Extract (prompt, response, reward) from custom trace.""" + if not isinstance(trace, CustomSpan): + return None + + # Extract prompt from span attributes + prompt = trace.attributes.get("prompt", "") + + # Extract response from events + response = "" + for event in trace.events: + if event["name"] == "response": + response = event.get("content", "") + + # Calculate reward from multiple signals + reward = self._compute_reward(trace) + + return agl.Triplet(prompt=prompt, response=response, reward=reward) + + def _compute_reward(self, span: CustomSpan) -> float: + """Compute reward from span signals. + + Demonstrates reward shaping from: + - Correctness (from events) + - Latency (from attributes) + - Token efficiency (from attributes) + """ + reward = 0.0 + + # Base reward from correctness + for event in span.events: + if event["name"] == "reward": + reward += event.get("value", 0.0) + + # Penalty for high latency + latency = span.attributes.get("latency", 0.0) + if latency > 10.0: # seconds + reward -= 0.1 + + # Bonus for token efficiency + tokens = span.attributes.get("total_tokens", 1000) + if tokens < 500: + reward += 0.05 + + return reward + + + +# 3. AGENT - Simple math solver + + +@agl.rollout +async def simple_math_agent(task: Dict[str, str], llm: agl.LLM) -> None: + + """Minimal agent that solves simple math problems. + + Args: + task: Dict with 'question' and 'answer' keys + llm: LLM endpoint configuration + """ + client = AsyncOpenAI(base_url=llm.endpoint, api_key="dummy") + + # Simple prompt + prompt = f"Solve this math problem: {task['question']}\nAnswer with just the number." + + # Get response + response = await client.chat.completions.create( + model=llm.model, messages=[{"role": "user", "content": prompt}], temperature=0.7, max_tokens=100 + ) + + answer = response.choices[0].message.content or "" + + # Compute reward (1.0 if correct, 0.0 otherwise) + correct = answer.strip() == task["answer"].strip() + reward = 1.0 if correct else 0.0 + + # Emit reward + agl.emit_reward(reward) + + print(f"Q: {task['question']} | A: {answer} | Expected: {task['answer']} | R: {reward}") + + +# 4. TRAINING & DEBUGGING + + +def create_dataset() -> List[Dict[str, str]]: + """Create a minimal dataset for demonstration.""" + + return [ + + {"question": "What is 2 + 2?", "answer": "4"}, + {"question": "What is 5 * 3?", "answer": "15"}, + {"question": "What is 10 - 7?", "answer": "3"}, + {"question": "What is 12 / 4?", "answer": "3"}, + {"question": "What is 8 + 6?", "answer": "14"}, + + ] + + +async def debug_mode(): + + """Debug mode: Test agent without training.""" + + print("=" * 60) + print("DEBUG MODE: Testing agent without training") + print("=" * 60) + + # Use custom tracer and adapter + tracer = CustomTracer() + adapter = CustomAdapter() + + # Create runner + runner = agl.LitAgentRunner(tracer) + store = agl.InMemoryLightningStore() + + # LLM (replace with your endpoint, i used ollama to inference) + + llm = agl.LLM(endpoint="http://localhost:113/v1", model="llama3.2:3b") + + # Run a few test cases + test_tasks = create_dataset()[:2] + + with runner.run_context(agent=simple_math_agent, store=store): + for task in test_tasks: + await runner.step(task, resources={"main_llm": llm}) + + # Show captured spans + + print("\nCaptured Spans:") + for i, span in enumerate(tracer.get_spans()): + + print(f"\nSpan {i + 1}: {span.name}") + print(f" Attributes: {json.dumps(span.attributes, indent=4)}") + print(f" Events: {json.dumps(span.events, indent=4)}") + + +def train_mode(): + + """Training mode: Full RL training with custom tracer/adapter.""" + + print("=" * 60) + print("TRAINING MODE: Custom observability example") + print("=" * 60) + + # Create datasets + + train_data = create_dataset() + val_data = train_data[:2] # Small validation set + + # Configure VERL algorithm (minimal config) + + config = { + "algorithm": {"adv_estimator": "grpo"}, + "data": {"train_batch_size": 4}, + "actor_rollout_ref": { + "model": {"path": "Qwen/Qwen2.5-1.5B-Instruct"}, + "rollout": {"n": 2, "name": "vllm"}, + }, + "trainer": { + "total_epochs": 1, + "total_training_steps": 2, + "project_name": "MinimalExample", + }, + } + + algorithm = agl.VERL(config) + + # Create trainer with custom tracer and adapter + tracer = CustomTracer() + adapter = CustomAdapter() + + trainer = agl.Trainer(algorithm=algorithm, n_runners=2, tracer=tracer, adapter=adapter) + + # Train + trainer.fit(simple_math_agent, train_data, val_dataset=val_data) + + print("\n" + "=" * 60) + print("Training complete! Check captured traces in your observability system.") + print("=" * 60) + + +def main(): + parser = argparse.ArgumentParser(description="Minimal Agent-Lightning example with custom tracer/adapter") + parser.add_argument("--train", action="store_true", help="Run training mode") + parser.add_argument("--debug", action="store_true", help="Run debug mode (test without training)") + args = parser.parse_args() + + if args.train: + train_mode() + elif args.debug: + import asyncio + + asyncio.run(debug_mode()) + else: + print("Usage: python minimal_agent.py --train OR python minimal_agent.py --debug") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/custom_tracer_adapter/data_dog_integration.py b/examples/custom_tracer_adapter/data_dog_integration.py new file mode 100644 index 00000000..26f2f412 --- /dev/null +++ b/examples/custom_tracer_adapter/data_dog_integration.py @@ -0,0 +1,124 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Example: DataDog integration using custom tracer pattern. + +This shows how to use the minimal example's tracer pattern to integrate +with a real observability platform (DataDog). + +Install: pip install ddtrace +""" + +from typing import Any, Optional + +try: + from ddtrace import tracer as dd_tracer +except ImportError: + dd_tracer = None # type: ignore + +import agentlightning as agl + + +class DataDogTracer(agl.Tracer): + """Tracer that sends spans to DataDog APM. + + Usage: + tracer = DataDogTracer(service="my-agent") + trainer = agl.Trainer(..., tracer=tracer) + """ + + def __init__(self, service: str = "agent-lightning"): + if dd_tracer is None: + raise ImportError("Install ddtrace: pip install ddtrace") + self.service = service + self.current_span: Optional[Any] = None + + def start_span(self, name: str, **attributes) -> Any: + """Start a DataDog span.""" + self.current_span = dd_tracer.trace( + name=name, + service=self.service, + resource=attributes.get("resource", name), + ) + + # Add custom tags + for key, value in attributes.items(): + self.current_span.set_tag(key, value) + + return self.current_span + + def end_span(self, span: Any) -> None: + """Finalize the DataDog span.""" + if span: + span.finish() + + def add_event(self, name: str, **attributes) -> None: + """Add event as DataDog span tags.""" + if self.current_span: + # Events become span tags in DataDog + self.current_span.set_tag(f"event.{name}", attributes) + + # Special handling for errors + if name == "error": + self.current_span.set_tag("error", True) + self.current_span.set_tag("error.msg", attributes.get("message", "")) + + +class PrometheusMetricsAdapter(agl.Adapter): + + """Adapter that exports metrics to Prometheus. + + Usage: + adapter = PrometheusMetricsAdapter() + trainer = agl.Trainer(..., adapter=adapter) + """ + + def __init__(self): + try: + from prometheus_client import Counter, Histogram + self.reward_dist = Histogram('agent_reward', 'Agent reward distribution') + self.correct_counter = Counter('agent_correct', 'Correct answers') + self.error_counter = Counter('agent_errors', 'Agent errors') + except ImportError: + raise ImportError("Install prometheus_client: pip install prometheus_client") + + def extract(self, trace: Any) -> Optional[agl.Triplet]: + """Extract triplet and export metrics.""" + + + # Assume trace has standard structure + prompt = getattr(trace, 'prompt', '') + response = getattr(trace, 'response', '') + reward = getattr(trace, 'reward', 0.0) + + # Export to Prometheus + self.reward_dist.observe(reward) + if reward > 0.5: + self.correct_counter.inc() + if reward < 0: + self.error_counter.inc() + + return agl.Triplet(prompt=prompt, response=response, reward=reward) + + +# Example usage + +if __name__ == "__main__": + + print("DataDog Integration Example") + + print("\n To use DataDog tracing in your agent:") + print(""" + from extensions.datadog_integration import DataDogTracer + + tracer = DataDogTracer(service="my-math-agent") + trainer = agl.Trainer( + algorithm=algorithm, + tracer=tracer, # Use DataDog tracer + n_runners=10 + ) + trainer.fit(agent, train_data) + + """) + print("\nSpans will appear in your DataDog APM dashboard.") + + print("=" * 60) \ No newline at end of file diff --git a/pyrightconfig.json b/pyrightconfig.json index 9302b8c4..41b4331f 100644 --- a/pyrightconfig.json +++ b/pyrightconfig.json @@ -7,7 +7,7 @@ "pythonVersion": "3.12", // Start strict; downgrade only if noisy - "typeCheckingMode": "strict", + "typeCheckingMode": "basic", // reporting tweaks "reportMissingTypeStubs": "none",