A configurable DAG-based pipeline framework for processing large text datasets with LLM integration.
Trawler is a scalable pipeline framework for running multi-stage text processing workflows over large datasets. Built on Ray Data for distributed processing and vLLM for efficient LLM inference, Trawler enables researchers and engineers to:
- Define complex multi-stage pipelines as YAML DAGs
- Process millions of documents using distributed computing
- Integrate LLMs for classification, extraction, and synthesis
- Track experiments automatically with Weights & Biases
- Deploy on local machines or SLURM clusters
Key features:

- **Configuration-Driven**: Define pipelines in YAML; no code changes needed
- **Dagspace Architecture**: Modular, domain-specific pipeline configurations
- **Scalable**: Process millions of records using Ray Data and SLURM clusters
- **LLM-Integrated**: Built-in vLLM support with automatic GPU management
- **Extensible**: Easy to add custom processing stages
- **Tracked**: Automatic experiment logging with Weights & Biases
```
┌────────────────────────────────────────────────────────────────┐
│                   Pipeline Definition (YAML)                   │
├────────────────────────────────────────────────────────────────┤
│  Sources → Stage 1 → Stage 2 → Stage 3 → Outputs               │
│  (Data)    (filter)  (extract) (synth)   (Parquet)             │
├────────────────────────────────────────────────────────────────┤
│              Orchestrator (DAG Execution Engine)               │
├────────────────────────────────────────────────────────────────┤
│  Ray Data (Distributed) | vLLM (GPU) | SLURM (Cluster) | W&B   │
├────────────────────────────────────────────────────────────────┤
│                Hydra (Configuration Management)                │
└────────────────────────────────────────────────────────────────┘
```
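The orchestrator's job is to walk this DAG in dependency order: a node runs only once every node it depends on has produced its outputs. Below is a minimal sketch of that execution model, not Trawler's actual internals; the spec dict shapes mirror the YAML pipeline format shown later, and `stage_registry` anticipates the `STAGE_REGISTRY` used for custom stages.

```python
from graphlib import TopologicalSorter

def run_pipeline(nodes, sources, stage_registry):
    """Run DAG nodes in dependency order (illustrative sketch, not Trawler's code).

    nodes: {name: {"stage": ..., "depends_on": [...], "inputs": {...}, "outputs": [...]}}
    sources: named input datasets that seed the pipeline.
    """
    datasets = dict(sources)
    # Map each node to the set of nodes that must finish before it
    graph = {name: set(spec.get("depends_on", [])) for name, spec in nodes.items()}
    for name in TopologicalSorter(graph).static_order():
        spec = nodes[name]
        runner = stage_registry[spec["stage"]]
        # Resolve this node's inputs from datasets produced upstream
        inputs = {alias: datasets[src] for alias, src in spec["inputs"].items()}
        results = runner.run(inputs)
        if not isinstance(results, (list, tuple)):
            results = [results]  # one result per declared output
        for out, result in zip(spec["outputs"], results):
            datasets[out] = result  # publish for downstream nodes
    return datasets
```

In Trawler itself this traversal sits on top of Ray Data for distributed execution and W&B for logging, per the layers in the diagram.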
Trawler organizes domain-specific pipelines into *dagspaces*, self-contained modules with their own stages, configurations, and prompts:
| Dagspace | Domain | Description |
|---|---|---|
| `uair` | News Analysis | AI risk assessment in news media coverage |
| `historical_norms` | Literature | Norm extraction from historical texts |
| `rule_tuples` | Social Media | Rule classification from Reddit communities |
Each dagspace follows a consistent structure:
```
dagspaces/{name}/
├── cli.py            # Hydra CLI entry point
├── orchestrator.py   # Pipeline execution engine
├── conf/             # Configuration files
│   ├── config.yaml   # Base config
│   ├── pipeline/     # Pipeline DAG definitions
│   ├── prompt/       # LLM prompt templates
│   └── model/        # Model configurations
├── runners/          # Stage runner classes
└── stages/           # Stage implementations
```
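The `cli.py` entry point is where Hydra composes the configuration before handing it to the orchestrator. A minimal sketch of what such an entry point can look like (the `Orchestrator` class name and its `run()` method are assumptions for illustration; the real entry points may differ):

```python
# dagspaces/{name}/cli.py -- illustrative sketch
import hydra
from omegaconf import DictConfig

@hydra.main(config_path="conf", config_name="config", version_base=None)
def main(cfg: DictConfig) -> None:
    # Hydra merges conf/config.yaml with command-line overrides
    # (e.g. pipeline=..., data.parquet_path=...) before calling us.
    from .orchestrator import Orchestrator  # hypothetical class name
    Orchestrator(cfg).run()

if __name__ == "__main__":
    main()
```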
Install from source:

```bash
# Clone the repository
git clone https://github.com/your-org/trawler.git
cd trawler

# Create a virtual environment
python -m venv .venv
source .venv/bin/activate

# Install with uv (recommended)
uv pip install -e .
```

Quick start:

```bash
# Run the UAIR news analysis pipeline
python -m dagspaces.uair.cli \
    pipeline=full_event_pipeline \
    data.parquet_path=/path/to/articles.parquet

# Run historical norms extraction
python -m dagspaces.historical_norms.cli \
    pipeline=norm_extraction \
    data.parquet_path=/path/to/texts.parquet

# Run with sampling for quick iteration
python -m dagspaces.uair.cli \
    runtime.debug=true \
    runtime.sample_n=100 \
    data.parquet_path=/path/to/data.parquet
```

Complete documentation is available in `docs/`:
| Document | Description |
|---|---|
| User Guide | Complete introduction and Quick Start |
| Configuration Guide | Pipeline recipes and config patterns |
| Custom Stages Guide | Building custom processing stages |
| Quick Reference | Command cheat sheet |
```
trawler/
├── dagspaces/               # Domain-specific pipelines
│   ├── uair/                # News AI analysis
│   │   ├── cli.py           # CLI entry point
│   │   ├── orchestrator.py  # Pipeline orchestrator
│   │   ├── conf/            # Configuration files
│   │   ├── runners/         # Stage runners
│   │   └── stages/          # Stage implementations
│   ├── historical_norms/    # Historical text analysis
│   └── rule_tuples/         # Social media rules
├── docs/                    # Documentation
├── notebooks/               # Analysis notebooks
├── scripts/                 # Utility scripts
├── viz/                     # Visualization projects
└── pyproject.toml           # Project configuration
```
Trawler uses Hydra for hierarchical configuration. Pipelines are defined as DAGs in YAML:
```yaml
# conf/pipeline/my_pipeline.yaml
pipeline:
  sources:
    articles:
      path: ${data.parquet_path}
  graph:
    nodes:
      classify:
        stage: classify_relevance
        inputs: {articles: articles}
        outputs: [classified]
      extract:
        stage: decompose
        depends_on: [classify]
        inputs: {articles: classified}
        outputs: [extracted]
```

Override any value from the command line:
```bash
python -m dagspaces.uair.cli \
    pipeline=my_pipeline \
    model.batch_size=16 \
    runtime.sample_n=1000
```

Run locally, bypassing the SLURM launcher:

```bash
python -m dagspaces.uair.cli \
    hydra/launcher=null \
    runtime.sample_n=100
```

Submit to a SLURM cluster (here with a 4-GPU launcher config):

```bash
python -m dagspaces.uair.cli \
    pipeline=full_event_pipeline \
    hydra/launcher=g2_slurm_gpu_4x
```

To add a custom stage:

1. Implement the stage function in `dagspaces/{name}/stages/mystage.py`:
```python
def run_mystage(df, cfg):
    """Process dataframe with custom logic."""
    # Your processing logic
    return df
```

2. Create a runner in `dagspaces/{name}/runners/mystage.py`:
```python
from .base import StageRunner

class MyStageRunner(StageRunner):
    stage_name = "mystage"

    def run(self, context):
        from ..stages.mystage import run_mystage
        return run_mystage(context.input_df, context.cfg)
```

3. Register it in `dagspaces/{name}/runners/__init__.py`:
```python
STAGE_REGISTRY["mystage"] = MyStageRunner()
```

See the Custom Stages Guide for details.
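For orientation, the `StageRunner` base class that runners inherit from presumably defines little more than the run contract; a rough sketch only, since the real `runners/base.py` may add hooks for I/O, Ray, or logging:

```python
# Illustrative sketch; the actual base class in runners/base.py may differ.
class StageRunner:
    stage_name: str = ""  # key used when registering in STAGE_REGISTRY

    def run(self, context):
        """Take a stage context (input dataframe, resolved config, ...)
        and return the processed result. Subclasses override this."""
        raise NotImplementedError
```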
Analyze AI-related risks and benefits in news coverage:
```bash
python -m dagspaces.uair.cli \
    pipeline=classify_risks_and_benefits_from_decompose \
    data.parquet_path=/data/news_articles.parquet
```

Stages: `classify_relevance` → `decompose` → `verify` → `taxonomy` → `topic` → `synthesis`
Extract structured norms from historical texts:
```bash
python -m dagspaces.historical_norms.cli \
    pipeline=norm_extraction \
    data.parquet_path=/data/gutenberg_texts.parquet
```

Stages: `fetch_gutenberg` → `norm_reasoning` → `norm_extraction`
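Both examples write their results as Parquet (see the architecture diagram above), so outputs can be inspected directly with pandas. A quick sketch; the output path here is illustrative, since actual locations depend on your run configuration:

```python
import pandas as pd

# Illustrative output path; real locations depend on your run config
df = pd.read_parquet("/data/outputs/norm_extraction/extracted.parquet")
print(df.columns.tolist())  # fields produced by the pipeline
print(df.head())
```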
We welcome contributions! Areas of interest:

- **New Dagspaces**: Domain-specific pipeline configurations
- **Stages**: Additional processing capabilities
- **Optimizations**: Performance improvements
- **Documentation**: Examples, tutorials, guides
MIT License - see LICENSE file for details.
Built with:
- Hydra - Configuration management
- Ray Data - Distributed processing
- vLLM - LLM inference
- Weights & Biases - Experiment tracking
- SLURM - Cluster scheduling