Skip to content

Scheduled cron tasks don't execute when using ListRedisScheduleSource #105

@ZhymabekRoman

Description

@ZhymabekRoman

Cron tasks configured with the @taskiq_broker.task(schedule=[...]) decorator fail to execute when using ListRedisScheduleSource as the scheduler source. The same tasks work correctly with LabelScheduleSource.

Steps to reproduce:

  1. Clone and start the test project:
git clone https://github.com/build-on-ai/test_taskiq_project
cd test_taskiq_project/
sudo docker compose up
  1. Wait for the scheduled task to execute. You should see the "super big" ASCII art message printed every minute:
Expected output showing ASCII art

Expected behavior:
The heartbeat task should execute every minute (as defined by the */1 * * * * cron expression) regardless of which schedule source is used.

Actual behavior:

  • ✅ Tasks execute correctly with LabelScheduleSource (default)
  • ❌ Tasks do NOT execute with ListRedisScheduleSource

Configuration:
To switch between schedule sources, update TASKIQ_USE_LABEL_SCHEDULER in envs/dev.env:

  • TASKIQ_USE_LABEL_SCHEDULER=1 → Uses LabelScheduleSource (works)
  • TASKIQ_USE_LABEL_SCHEDULER=0 → Uses ListRedisScheduleSource (doesn't work)

Relevant code:

Task definition (test_taskiq_app/tasks.py)
from datetime import datetime, timezone

import logfire as logger

from test_taskiq_app.clients.taskiq import taskiq_broker, taskiq_scheduler


@taskiq_broker.task(schedule=[{"cron": "*/1 * * * *"}])
async def heartbeat() -> None:
    print(
        """
  sssss   u    u  pppp   eeeee  rrrrrr   ttttt
 s        u    u  p   p  e      r     r    t
 ssssss   u    u  pppp   eeeee  rrrrrr     t
      s   u    u  p      e      r   r      t
 sssss     uuuu   p      eeeee  r    r     t

  bbbbb   iiii  ggggg
  b    b   ii   g
  bbbbb    ii   g  ggg
  b    b   ii   g    g
  bbbbb   iiii   gggg
"""
    )


__all__ = ("taskiq_broker", "taskiq_scheduler", "heartbeat")
Broker and scheduler configuration (test_taskiq_app/clients/taskiq/base.py)
from typing import Any

from taskiq import TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_redis import (
    ListRedisScheduleSource,
    RedisAsyncResultBackend,
    RedisStreamBroker,
)

from test_taskiq_app.config import cnf
from test_taskiq_app.utils.taskiq.otel_middleware import OtelMiddleware
import logfire as logger

redis_async_result: RedisAsyncResultBackend[Any] = RedisAsyncResultBackend(
    redis_url=cnf.redis.url,
    max_connection_pool_size=cnf.redis.max_connection_pool_size,
    prefix_str=f"{cnf.taskiq.queue_key}-results",
    result_ex_time=60 * 15,
    keep_results=False,
)

otel_middleware = OtelMiddleware()
middlewares = [otel_middleware]

taskiq_broker = (
    RedisStreamBroker(
        url=cnf.redis.url,
        queue_name=cnf.taskiq.queue_key,
        approximate=True,
        maxlen=cnf.taskiq.max_stream_length,
    )
    .with_middlewares(*middlewares)
    .with_result_backend(redis_async_result)
)

if cnf.taskiq.use_label_scheduler:
    scheduler_source = LabelScheduleSource(taskiq_broker)
    logger.info("Using LabelScheduleSource for TaskIQ scheduler.")
else:
    scheduler_source = ListRedisScheduleSource(
        url=cnf.redis.url,
        prefix=cnf.taskiq.queue_key,
        buffer_size=cnf.taskiq.schedule_buffer_size,
    )
    logger.info("Using ListRedisScheduleSource for TaskIQ scheduler.")

taskiq_scheduler = TaskiqScheduler(
    broker=taskiq_broker,
    sources=[scheduler_source],
)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions