Lightweight Python asynchronous task queue manager, no extra services required, ready to use out of the box.
NeoTask is a pure Python-based asynchronous task queue scheduling system specifically designed for time-consuming tasks (such as AI generation, video processing, data scraping, etc.). It supports scheduled tasks, periodic tasks, and delayed tasks. There is no need to deploy external services like Redis or PostgreSQL. After installation, it can be directly used in any Python project.
中文 | English | Documentation | PyPI | website
- Zero-Dependency Deployment - Pure Python implementation, no Redis/PostgreSQL required
- Immediate Tasks - Supports priority scheduling, high-priority tasks execute first
- Scheduled Tasks - Supports delayed execution, fixed intervals, and Cron expressions
- Asynchronous Concurrency - Based on asyncio, multi-worker concurrent processing
- Automatic Retry - Failed tasks automatically retry with configurable attempts
- Persistence - Multiple storage backends: Memory/SQLite/Redis
- DAG Workflow - Support for task orchestration, conditional branches, and parallel execution
- Distributed Support - Distributed task scheduling, high availability, and fault tolerance
- Event Callbacks - Supports task lifecycle event listeners
| Scenario | Description | Recommended Configuration | Entry Point |
|---|---|---|---|
| AI Text-to-Image/Video Generation | Queue time-consuming tasks to avoid blocking main flow | worker_concurrency=3 |
TaskPool |
| Batch File Processing | Batch operations like transcoding, compression, uploading | worker_concurrency=10 |
TaskPool |
| Web Scraping Scheduling | Distributed scraping to prevent being blocked | storage_type="redis" |
TaskPool |
| Scheduled Report Sending | Send daily reports at 9 AM | cron="0 9 * * *" |
TaskScheduler |
| Delayed Notifications | Send reminders 5 minutes after user action | delay_seconds=300 |
TaskScheduler |
| Heartbeat Detection | Check service health status every 30 seconds | interval_seconds=30 |
TaskScheduler |
| Background Data Analysis | Execute data aggregation tasks at night | cron="0 2 * * *" |
TaskScheduler |
graph TB
subgraph User["User Application Layer"]
APP[User Code]
end
subgraph NeoTask["NeoTask Core"]
TP[TaskPool<br/>Immediate Task Entry]
TS[TaskScheduler<br/>Scheduled Task Entry]
subgraph Core["Shared Core Components"]
LM[LifecycleManager<br/>Task Lifecycle Management]
QS[QueueScheduler<br/>Priority + Delayed Queue]
WP[WorkerPool<br/>Worker Pool/Concurrency Control]
FM[FutureManager<br/>Async Wait/Result Callback]
end
subgraph Internal["Internal Components"]
EB[EventBus<br/>Event Bus]
MC[MetricsCollector<br/>Metrics Collection]
LF[LockFactory<br/>Distributed Lock]
end
EX[TaskExecutor<br/>User Business Logic]
end
subgraph Storage["Storage Layer"]
MEM[MemoryStorage]
SQLITE[(SQLiteStorage)]
REDIS[(RedisStorage)]
end
APP -->|Immediate Task| TP
APP -->|Scheduled Task| TS
TS -->|Delegates| TP
TP --> LM
TP --> QS
TP --> WP
TP --> FM
LM --> MEM
LM --> SQLITE
LM --> REDIS
WP --> EX
WP --> EB
WP --> MC
WP --> LF
Development Roadmap
timeline
title NeoTask Architecture Evolution Roadmap
section v0.1
Basic Task Pool : Local Memory Queue
: Async Execution Engine
: Memory/SQLite Storage
section v0.2
Observability : Event Bus
: Metrics Collection
: Health Check
section v0.3
Scheduled Tasks : Delayed Queue/Time Wheel
: Periodic Tasks
: Cron Expression
section v0.4
Distributed Base : Redis Shared Queue
: Distributed Lock
section v0.5
Performance Optimization : Prefetch Mechanism
: Batch Operations
: Connection Pool
section v1.0
High Availability : Watchdog Renewal
: Timeout Detection
: Automatic Fault Recovery
section v1.5
Task Orchestration : DAG Workflow
: Conditional Branch
: Parallel Execution
section v2.0
Enterprise Features : Independent Web UI
: Multi-Tenancy Isolation
: Prometheus Integration
# Basic installation
pip install neotask
# With Redis distributed support
pip install neotask[redis]
# Full installation
pip install neotask[full]from neotask import TaskPool
async def process(data):
return {"result": "done", "data": data}
# Create task pool
pool = TaskPool(executor=process)
# Submit task
task_id = pool.submit({"id": 123})
# Wait for result
result = pool.wait_for_result(task_id)
pool.shutdown()from neotask import TaskScheduler
scheduler = TaskScheduler(executor=process)
# Execute after 60 seconds delay
scheduler.submit_delayed({"id": 123}, delay_seconds=60)
# Execute every 5 minutes
scheduler.submit_interval({"id": 123}, interval_seconds=300)
# Execute daily at 9 AM
scheduler.submit_cron({"id": 123}, "0 9 * * *")
scheduler.shutdown()with TaskPool(executor=process) as pool:
task_id = pool.submit({"id": 123})
result = pool.wait_for_result(task_id)from neotask import TaskPool
async def on_task_created(event):
print(f"Task created: {event.task_id}")
async def on_task_completed(event):
print(f"Task completed: {event.task_id}, Result: {event.data}")
async def on_task_failed(event):
print(f"Task failed: {event.task_id}, Error: {event.data}")
pool = TaskPool(executor=my_executor)
pool.start()
# Register event callbacks
pool.on_created(on_task_created)
pool.on_completed(on_task_completed)
pool.on_failed(on_task_failed)
task_id = pool.submit({"test": "event"})
result = pool.wait_for_result(task_id)| Method | Description |
|---|---|
pool.submit(data, priority=2, delay=0) |
Submit task |
pool.wait_for_result(task_id, timeout=300) |
Wait for result |
pool.get_status(task_id) |
Get status |
pool.cancel(task_id) |
Cancel task |
scheduler.submit_delayed(data, delay) |
Delayed task |
scheduler.submit_interval(data, interval) |
Periodic task |
scheduler.submit_cron(data, cron) |
Cron task |
engine.submit_workflow(definition) |
Submit workflow (v1.5) |
engine.wait_workflow(execution_id) |
Wait for workflow (v1.5) |
Detailed API documentation can be found here
from neotask import TaskPool, TaskPoolConfig
config = TaskPoolConfig(
worker_concurrency=10, # Number of concurrent workers
max_retries=3, # Number of retries
storage_type="sqlite", # Storage type
)
pool = TaskPool(executor=process, config=config)Detailed usage examples can be found here
# Clone repository
git clone https://github.com/neopen/neotask.git
cd neotask
# Create virtual environment
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
# Install development dependencies
pip install -e ".[dev]"
# Run tests
pytest tests/
# View test coverage
pytest --cov=neotask tests/
# Run specific module tests
pytest tests/test_task_pool.py -v
pytest tests/test_task_scheduler.py -vneotask/
├── api/ # TaskPool, TaskScheduler
├── core/ # Lifecycle, Queue, Worker
├── workflow/ # Workflow Engine
├── engine/ # Task Orchestration
├── executor/ # Async Execution Engine
├── storage/ # Memory/SQLite/Redis
├── event/ # Event Bus
└── models/ # Data Models
Welcome to submit Issues and Pull Requests
- Fork the project
- Create a feature branch (
git checkout -b feature/amazing) - Commit changes (
git commit -m 'Add amazing feature') - Push branch (
git push origin feature/amazing) - Submit Pull Request
- Follow PEP 8 code style
- Add appropriate type annotations
- Write unit tests for new features (coverage ≥ 80%)
- Update relevant documentation and example code
- Commit messages follow Conventional Commits
# Run all tests
pytest tests/
# Run specific module tests
pytest tests/unit/test_task.py
# Run manual tests
python examples/01_simple.py
python examples/05_webui.py
- Submit Issue: https://github.com/neopen/neotask/issues
- Feature Suggestions: Use Enhancement label
- Bug Reports: Use Bug label and provide reproduction steps
- Security Vulnerabilities: Please send email directly to the author's email
MIT License © 2026 NeoPen
Thanks to all contributors and the open source community for their support.