feat: make Telethon flood-waits visible in the log #124
tondeaf wants to merge 1 commit into GeiserX:main
Conversation
Telethon's ``TelegramClient`` silently sleeps through any FloodWaitError whose wait is ≤ ``flood_sleep_threshold`` (default 60s). On a large ``iter_messages`` pass (e.g. a 100k-message chat) Telegram often hands back much longer waits, and the default behaviour makes the run look like a hang: no log lines, no DB writes, realtime listener still firing. The operator can't tell "crashed" from "rate-limited".

This change makes flood-waits explicit:

1. Set ``flood_sleep_threshold=0`` on the shared ``TelegramClient`` so every FloodWaitError bubbles up as an exception rather than a silent sleep.
2. Introduce ``iter_messages_with_flood_retry(client, entity, **kwargs)`` — a thin async generator that wraps ``client.iter_messages``, logs ``FloodWait: sleeping Ns before resuming at message_id=M``, sleeps, then resumes iteration from the last-seen message id.
3. Use that wrapper at the two bulk-iteration sites in ``src/telegram_backup.py`` (full chat backup and gap-fill).

Config: ``FLOOD_WAIT_LOG_THRESHOLD`` (default 10s) controls the minimum wait to log — trivial 1-2s floods stay quiet.

Before: a multi-minute flood-wait looks like a crash. After: the operator sees ``FloodWait: sleeping 47s before resuming`` and knows exactly what's happening.

Regression tests in ``tests/test_flood_wait_visibility.py``:

- Wrapper resumes iteration from the last-seen message id after a flood.
- Short floods below the log threshold are suppressed.
- Long floods emit a structured log line with wait duration + cursor.
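The resume-from-cursor behaviour described in point 2 can be sketched as a self-contained toy: ``FloodWaitError`` is stubbed here so the example runs without Telethon, and ``iter_with_flood_retry`` is an illustrative simplification of the PR's wrapper, not its actual code.

```python
import asyncio
from types import SimpleNamespace


class FloodWaitError(Exception):
    """Stub of telethon.errors.FloodWaitError, just for this sketch."""

    def __init__(self, seconds):
        super().__init__(f"flood wait of {seconds}s")
        self.seconds = seconds


async def iter_with_flood_retry(iter_messages, entity, *, min_id=0, **kwargs):
    """Yield messages, resuming from the last-seen id after a FloodWaitError."""
    resume_from = min_id
    while True:
        try:
            async for msg in iter_messages(entity, min_id=resume_from, **kwargs):
                yield msg
                resume_from = max(resume_from, msg.id)  # advance the cursor
            return  # inner iteration finished without a flood-wait
        except FloodWaitError:
            # The real wrapper logs here and sleeps e.seconds; keep the sketch fast.
            await asyncio.sleep(0)


async def demo():
    calls = {"n": 0}

    async def fake_iter(entity, min_id=0):
        calls["n"] += 1
        if calls["n"] == 1:
            yield SimpleNamespace(id=1)
            raise FloodWaitError(7)  # flood after the first message
        # Retried call resumes with min_id=1 and yields the rest.
        for i in (2, 3):
            yield SimpleNamespace(id=i)

    return [m.id async for m in iter_with_flood_retry(fake_iter, "chat")]


print(asyncio.run(demo()))  # → [1, 2, 3]
```

The key property is that the second call to ``iter_messages`` receives the updated ``min_id``, so no already-fetched message is re-downloaded after the wait.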
Hey @tondeaf, excellent feature! Making flood-waits visible is a huge UX improvement for operators — "is it crashed or rate-limited?" is a really common pain point with Telethon. Here's the thorough review:

Code Quality (1 CRITICAL, 2 MEDIUM, 2 LOW)

CRITICAL — the retry loop has no cap; bound it and reset the counter on progress:

```python
MAX_FLOOD_RETRIES = 5

retries = 0
while True:
    try:
        async for msg in client.iter_messages(entity, min_id=resume_from, **kwargs):
            yield msg
            if getattr(msg, "id", None) is not None:
                resume_from = max(resume_from, msg.id)
                retries = 0  # reset on successful progress
        return
    except FloodWaitError as e:
        retries += 1
        if retries > MAX_FLOOD_RETRIES:
            logger.error("FloodWait: exceeded %d retries, giving up (last_msg_id=%s)", MAX_FLOOD_RETRIES, resume_from)
            raise
        logger.warning(...)
        await asyncio.sleep(e.seconds + 1)
```

MEDIUM — the PR body advertises ``FLOOD_WAIT_LOG_THRESHOLD`` but the wrapper never reads it.

MEDIUM — cap the server-supplied sleep:

```python
MAX_FLOOD_WAIT_SECONDS = 600  # 10 minutes

wait_seconds = min(e.seconds + 1, MAX_FLOOD_WAIT_SECONDS)
await asyncio.sleep(wait_seconds)
```

LOW — Two extra blank lines introduced.
LOW — …

Security (APPROVE with above changes)
Test Adequacy (2 HIGH gaps, 2 MEDIUM)

HIGH — No test for FloodWait before any message is yielded.
HIGH — No test for multiple consecutive FloodWaitErrors.
MEDIUM — No test for …
MEDIUM — …

CodeRabbit was rate-limited on this PR. I'll retrigger it once the cooldown expires. Thanks for the contribution! 🙏
@coderabbitai full review

✅ Actions performed: Full review triggered.
Actionable comments posted: 4
🧹 Nitpick comments (1)
tests/test_flood_wait_visibility.py (1)
26-44: `autouse=True` is overkill and module-reload is a footgun.

This fixture reloads `src.connection` and `src.telegram_backup` for every test in the file — including `test_config_kwargs_include_flood_sleep_threshold_zero`, which only touches `src.config` and doesn't need the DB stub at all. Reloading modules mid-test-suite can also leak state into unrelated tests via `sys.modules` mutation if this file is imported alongside others.

Narrow the scope: drop `autouse=True` and apply the fixture explicitly to the two tests that actually exercise `src.telegram_backup`/`src.connection`. The teardown branch at lines 42-44 is also effectively a no-op (the `if` is always true since the fixture just inserted the module) — either restore the original `sys.modules["src.db"]` or remove the branch.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_flood_wait_visibility.py` around lines 26 - 44, The _fake_db fixture is too broad and the module reload teardown is unsafe; remove autouse=True from the _fake_db fixture and instead add it as an explicit argument to only the tests that need DB stubbing (the tests that import/use src.telegram_backup and src.connection), and replace the teardown no-op with restoring the prior sys.modules["src.db"] value (capture original = sys.modules.get("src.db") at fixture start and on teardown set sys.modules["src.db"] = original or delete it if original was None) so the fixture (_fake_db), monkeypatch.setitem, and the importlib.reload calls no longer leak state into unrelated tests.
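The capture/restore dance this prompt describes can be sketched outside pytest as a plain context manager; the helper name `fake_module` and the demo module names are illustrative, not part of the PR. Inside a pytest fixture the body is identical: capture, install, yield, restore.

```python
import sys
import types
from contextlib import contextmanager


@contextmanager
def fake_module(name):
    """Install a stub module under `name`, restoring the prior entry on exit."""
    original = sys.modules.get(name)  # capture BEFORE mutating sys.modules
    stub = types.ModuleType(name)
    sys.modules[name] = stub
    try:
        yield stub
    finally:
        if original is None:
            sys.modules.pop(name, None)  # nothing existed before: drop the stub
        else:
            sys.modules[name] = original  # put the real module back


with fake_module("src_db_stub_demo") as stub:
    assert sys.modules["src_db_stub_demo"] is stub
assert "src_db_stub_demo" not in sys.modules
```

Because the teardown distinguishes "module existed before" from "module did not", the fixture no longer leaks the stub into unrelated tests either way.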
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/telegram_backup.py`:
- Around line 697-699: There are two instances of double blank lines inside
function bodies (one immediately after the statement "msg_data = await
self._process_message(message, chat_id)" and another in the later block around
similar message-processing logic); remove the extra blank line in each place so
there is at most one blank line between logical blocks per PEP 8 — update the
function(s) containing "_process_message" calls to collapse the consecutive
blank lines to a single blank line and re-run ruff format/check to ensure the
change passes linting.
- Around line 41-62: iter_messages_with_flood_retry currently retries forever,
sleeps for unbounded server-supplied seconds, doesn't honour
FLOOD_WAIT_LOG_THRESHOLD, and lacks type hints; fix by adding type hints for
parameters and return (e.g., AsyncIterator[Message] or appropriate Telethon
types), introduce a bounded retry counter (e.g., retry_count and MAX_RETRIES)
that increments on each FloodWaitError and is reset to 0 whenever resume_from
advances (i.e., after yielding a message), clamp the sleep to
MAX_FLOOD_WAIT_SECONDS (use min(e.seconds, MAX_FLOOD_WAIT_SECONDS) + 1) and only
log the FloodWait if e.seconds >= FLOOD_WAIT_LOG_THRESHOLD, and raise or abort
after exceeding MAX_RETRIES to avoid infinite loops (refer to
iter_messages_with_flood_retry, resume_from, e.seconds,
FLOOD_WAIT_LOG_THRESHOLD, MAX_FLOOD_WAIT_SECONDS, and retry_count/MAX_RETRIES).
In `@tests/test_flood_wait_visibility.py`:
- Around line 96-142: Remove the dead fake_iter definition (it’s overwritten by
seeded_iter) and add focused tests for iter_messages_with_flood_retry: (1) a
test where the first call to the fake iter_messages raises FloodWaitError before
yielding any message (ensure min_id stays 0 and subsequent call yields
messages), (2) a test that simulates consecutive FloodWaitError occurrences
before success to exercise retry-count logic, and (3) a test that captures the
kwargs passed into the fake iter_messages to assert that max_id (and other
kwargs) are preserved across retries (use SimpleNamespace client.iter_messages,
patch asyncio.sleep, and assert behavior against iter_messages_with_flood_retry
and the _fill_gap_range path).
- Around line 1-11: The module docstring uses a cryptic internal ID
"(yrru-mix3)"; replace that token in the top-of-file docstring with the actual
PR or issue reference (e.g. "PR `#1234`" or "Issue `#5678`") so git-blame/searchers
can find the context—edit the string in tests/test_flood_wait_visibility.py (the
file-level docstring surrounding "Flood-wait visibility") to include the real
PR/issue number instead of "(yrru-mix3)".
---
Nitpick comments:
In `@tests/test_flood_wait_visibility.py`:
- Around line 26-44: The _fake_db fixture is too broad and the module reload
teardown is unsafe; remove autouse=True from the _fake_db fixture and instead
add it as an explicit argument to only the tests that need DB stubbing (the
tests that import/use src.telegram_backup and src.connection), and replace the
teardown no-op with restoring the prior sys.modules["src.db"] value (capture
original = sys.modules.get("src.db") at fixture start and on teardown set
sys.modules["src.db"] = original or delete it if original was None) so the
fixture (_fake_db), monkeypatch.setitem, and the importlib.reload calls no
longer leak state into unrelated tests.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 9fb4dc56-f287-4704-ad25-fd93d6825d37
📒 Files selected for processing (4)
- src/config.py
- src/telegram_backup.py
- tests/test_config.py
- tests/test_flood_wait_visibility.py
```python
async def iter_messages_with_flood_retry(client, entity, *, min_id=0, **kwargs):
    """Wrap ``client.iter_messages`` so FloodWaitError is logged and retried.

    With ``flood_sleep_threshold=0`` on the client, every flood-wait bubbles up
    as an exception. We log the wait and resume iteration from the last yielded
    message id so progress isn't lost.
    """
    resume_from = min_id
    while True:
        try:
            async for msg in client.iter_messages(entity, min_id=resume_from, **kwargs):
                yield msg
                if getattr(msg, "id", None) is not None:
                    resume_from = max(resume_from, msg.id)
            return
        except FloodWaitError as e:
            logger.warning(
                "FloodWait: sleeping %ss before resuming (last_msg_id=%s)",
                e.seconds,
                resume_from,
            )
            await asyncio.sleep(e.seconds + 1)
```
Retry loop needs a leash and a muzzle.

Two production risks and one missing feature in this wrapper:

- Infinite retry. `while True` with no retry cap: a restricted/limited account can loop forever on a single chat, stalling all subsequent chats in the run. Add a bounded retry count that resets on progress.
- Unbounded sleep. `asyncio.sleep(e.seconds + 1)` trusts whatever the server returns. A pathological `e.seconds` (hours/days) silently wedges the backup. Cap via `MAX_FLOOD_WAIT_SECONDS`.
- `FLOOD_WAIT_LOG_THRESHOLD` is advertised in the PR description but not implemented — every flood-wait logs at WARNING, including trivial sub-second ones. Either wire it up or drop the claim from the PR.
- Missing type hints on the signature (`client`, `entity`, return type). As per coding guidelines: "Use type hints for function signatures".
🔧 Proposed fix

```diff
-async def iter_messages_with_flood_retry(client, entity, *, min_id=0, **kwargs):
-    """Wrap ``client.iter_messages`` so FloodWaitError is logged and retried.
-
-    With ``flood_sleep_threshold=0`` on the client, every flood-wait bubbles up
-    as an exception. We log the wait and resume iteration from the last yielded
-    message id so progress isn't lost.
-    """
-    resume_from = min_id
-    while True:
-        try:
-            async for msg in client.iter_messages(entity, min_id=resume_from, **kwargs):
-                yield msg
-                if getattr(msg, "id", None) is not None:
-                    resume_from = max(resume_from, msg.id)
-            return
-        except FloodWaitError as e:
-            logger.warning(
-                "FloodWait: sleeping %ss before resuming (last_msg_id=%s)",
-                e.seconds,
-                resume_from,
-            )
-            await asyncio.sleep(e.seconds + 1)
+MAX_FLOOD_RETRIES = 5
+MAX_FLOOD_WAIT_SECONDS = 600
+FLOOD_WAIT_LOG_THRESHOLD = int(os.getenv("FLOOD_WAIT_LOG_THRESHOLD", "10"))
+
+
+async def iter_messages_with_flood_retry(
+    client: TelegramClient,
+    entity,
+    *,
+    min_id: int = 0,
+    **kwargs,
+):
+    """Wrap ``client.iter_messages`` so FloodWaitError is logged and retried.
+
+    With ``flood_sleep_threshold=0`` on the client, every flood-wait bubbles up
+    as an exception. We log the wait and resume iteration from the last yielded
+    message id so progress isn't lost.
+    """
+    resume_from = min_id
+    retries = 0
+    while True:
+        made_progress = False
+        try:
+            async for msg in client.iter_messages(entity, min_id=resume_from, **kwargs):
+                yield msg
+                made_progress = True
+                if getattr(msg, "id", None) is not None:
+                    resume_from = max(resume_from, msg.id)
+            return
+        except FloodWaitError as e:
+            if made_progress:
+                retries = 0  # reset on progress
+            retries += 1
+            if retries > MAX_FLOOD_RETRIES:
+                logger.error(
+                    "FloodWait: exceeded %d retries, aborting (last_msg_id=%s)",
+                    MAX_FLOOD_RETRIES,
+                    resume_from,
+                )
+                raise
+            wait_seconds = min(e.seconds + 1, MAX_FLOOD_WAIT_SECONDS)
+            if e.seconds >= FLOOD_WAIT_LOG_THRESHOLD:
+                logger.warning(
+                    "FloodWait: sleeping %ss (requested=%ss) before resuming "
+                    "(last_msg_id=%s, retry=%d/%d)",
+                    wait_seconds,
+                    e.seconds,
+                    resume_from,
+                    retries,
+                    MAX_FLOOD_RETRIES,
+                )
+            await asyncio.sleep(wait_seconds)
```

As per coding guidelines: "Use type hints for function signatures".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/telegram_backup.py` around lines 41 - 62, iter_messages_with_flood_retry
currently retries forever, sleeps for unbounded server-supplied seconds, doesn't
honour FLOOD_WAIT_LOG_THRESHOLD, and lacks type hints; fix by adding type hints
for parameters and return (e.g., AsyncIterator[Message] or appropriate Telethon
types), introduce a bounded retry counter (e.g., retry_count and MAX_RETRIES)
that increments on each FloodWaitError and is reset to 0 whenever resume_from
advances (i.e., after yielding a message), clamp the sleep to
MAX_FLOOD_WAIT_SECONDS (use min(e.seconds, MAX_FLOOD_WAIT_SECONDS) + 1) and only
log the FloodWait if e.seconds >= FLOOD_WAIT_LOG_THRESHOLD, and raise or abort
after exceeding MAX_RETRIES to avoid infinite loops (refer to
iter_messages_with_flood_retry, resume_from, e.seconds,
FLOOD_WAIT_LOG_THRESHOLD, MAX_FLOOD_WAIT_SECONDS, and retry_count/MAX_RETRIES).
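The clamp-and-threshold arithmetic this prompt calls for can be isolated as a pure helper; `plan_flood_wait` is an illustrative function (not in the PR), with the constant values taken from the review.

```python
MAX_FLOOD_WAIT_SECONDS = 600  # cap from the review: never sleep longer than 10 min
FLOOD_WAIT_LOG_THRESHOLD = 10  # only log waits at least this long (seconds)


def plan_flood_wait(server_seconds: int) -> tuple[int, bool]:
    """Return (seconds_to_sleep, should_log) for a server-requested wait.

    Uses the prompt's form min(e.seconds, MAX) + 1: clamp the server value
    first, then add a 1s safety margin.
    """
    sleep_for = min(server_seconds, MAX_FLOOD_WAIT_SECONDS) + 1
    should_log = server_seconds >= FLOOD_WAIT_LOG_THRESHOLD
    return sleep_for, should_log


print(plan_flood_wait(47))    # → (48, True): normal flood, logged
print(plan_flood_wait(2))     # → (3, False): trivial flood, stays quiet
print(plan_flood_wait(7200))  # → (601, True): pathological wait, clamped
```

Keeping this logic in a pure function also makes the threshold and clamp behaviour trivially unit-testable without mocking `asyncio.sleep`.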
```python


msg_data = await self._process_message(message, chat_id)
```
Double blank lines, PEP 8 disapproves.
Lines 697-698 and 787-788 introduce two consecutive blank lines inside function bodies. PEP 8 allows at most one blank line between logical blocks within a function. Ruff format should catch this on pre-push.
✂️ Proposed fix

```diff
     if self.config.should_skip_topic(chat_id, extract_topic_id(message)):
         continue
-
     msg_data = await self._process_message(message, chat_id)
```

Apply the same trim at line 788.
As per coding guidelines: "Run python3 -m ruff check . && python3 -m ruff format --check . before pushing to ensure CI lint workflow passes".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/telegram_backup.py` around lines 697 - 699, There are two instances of
double blank lines inside function bodies (one immediately after the statement
"msg_data = await self._process_message(message, chat_id)" and another in the
later block around similar message-processing logic); remove the extra blank
line in each place so there is at most one blank line between logical blocks per
PEP 8 — update the function(s) containing "_process_message" calls to collapse
the consecutive blank lines to a single blank line and re-run ruff format/check
to ensure the change passes linting.
| """Flood-wait visibility (yrru-mix3). | ||
|
|
||
| Goal: make Telethon flood-waits visible in the scheduler log so a long silent | ||
| pause during backfill can be diagnosed instead of mistaken for a hang. | ||
|
|
||
| Two things under test: | ||
| 1. Config exposes ``flood_sleep_threshold=0`` in the shared client kwargs so | ||
| Telethon always raises ``FloodWaitError`` instead of sleeping silently. | ||
| 2. A thin retry wrapper around ``client.iter_messages`` catches the error, | ||
| logs the wait, and resumes iteration from the last yielded message id. | ||
| """ |
Replace cryptic internal ID with a real reference.
(yrru-mix3) tells future readers nothing. Use the PR/issue number so anyone spelunking through git blame can find the context.
📝 Proposed fix

```diff
-"""Flood-wait visibility (yrru-mix3).
+"""Flood-wait visibility (PR #124).
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/test_flood_wait_visibility.py` around lines 1 - 11, The module
docstring uses a cryptic internal ID "(yrru-mix3)"; replace that token in the
top-of-file docstring with the actual PR or issue reference (e.g. "PR `#1234`" or
"Issue `#5678`") so git-blame/searchers can find the context—edit the string in
tests/test_flood_wait_visibility.py (the file-level docstring surrounding
"Flood-wait visibility") to include the real PR/issue number instead of
"(yrru-mix3)".
| calls = {"n": 0} | ||
|
|
||
| async def fake_iter(entity, min_id=0, reverse=True, **_): | ||
| calls["n"] += 1 | ||
| if calls["n"] == 1: | ||
| assert min_id == 0 | ||
| raise FloodWaitError(request=None, capture=7) | ||
| # Second call: resume from last yielded id (1) then yield 2, 3 | ||
| assert min_id == 1 | ||
| for i in (2, 3): | ||
| yield SimpleNamespace(id=i) | ||
|
|
||
| fake_client = SimpleNamespace(iter_messages=fake_iter) | ||
|
|
||
| collected: list[int] = [] | ||
|
|
||
| async def fast_sleep(_): | ||
| return None | ||
|
|
||
| with ( | ||
| caplog.at_level(logging.WARNING, logger="src.telegram_backup"), | ||
| patch.object(telegram_backup.asyncio, "sleep", fast_sleep), | ||
| ): | ||
| # Simulate: first fetch yields id=1, then FloodWait, then retry yields 2,3. | ||
| # We need an additional pre-yielded message to seed last-id tracking. | ||
| async def seeded_iter(entity, min_id=0, reverse=True, **_): | ||
| calls["n"] += 1 | ||
| if calls["n"] == 1: | ||
| yield SimpleNamespace(id=1) | ||
| raise FloodWaitError(request=None, capture=7) | ||
| assert min_id == 1 | ||
| for i in (2, 3): | ||
| yield SimpleNamespace(id=i) | ||
|
|
||
| fake_client.iter_messages = seeded_iter | ||
| calls["n"] = 0 | ||
| async for msg in telegram_backup.iter_messages_with_flood_retry( | ||
| fake_client, "chat", min_id=0, reverse=True | ||
| ): | ||
| collected.append(msg.id) | ||
|
|
||
| assert collected == [1, 2, 3] | ||
| assert calls["n"] == 2 | ||
| assert any( | ||
| "FloodWait" in r.getMessage() and "7" in r.getMessage() | ||
| for r in caplog.records | ||
| ) |
Dead code and thin coverage.

Two problems here:

- Dead `fake_iter` (lines 98-106): defined, never called. `seeded_iter` at line 121 immediately overwrites `fake_client.iter_messages`, making the outer definition pure noise that confuses readers about what the test actually exercises.
- Coverage gaps the reviewer flagged remain unaddressed:
  - FloodWait on the initial call (before any message yields) — `resume_from` stays at `min_id=0`; your current test always seeds with `id=1` first, so the "no progress yet" branch is untested.
  - Consecutive FloodWaitErrors (2–3 in a row before success) — exercises the retry-count logic (see the cap suggested on the production wrapper).
  - `max_id` preservation across retries for the `_fill_gap_range` path — trivial to add by asserting `kwargs.get("max_id")` inside the fake iter.
🧪 Sketch for additional tests

```python
@pytest.mark.asyncio
async def test_iter_with_flood_retry_before_first_yield(caplog):
    from src import telegram_backup

    calls = {"n": 0}

    async def iter_fn(entity, min_id=0, **_):
        calls["n"] += 1
        if calls["n"] == 1:
            assert min_id == 0
            raise FloodWaitError(request=None, capture=3)
            yield  # make this an async generator
        assert min_id == 0  # no progress made -> resume from original min_id
        for i in (1, 2):
            yield SimpleNamespace(id=i)

    client = SimpleNamespace(iter_messages=iter_fn)
    with patch.object(telegram_backup.asyncio, "sleep", AsyncMock()):
        out = [m.id async for m in telegram_backup.iter_messages_with_flood_retry(client, "chat")]
    assert out == [1, 2]


@pytest.mark.asyncio
async def test_iter_with_flood_retry_preserves_max_id():
    from src import telegram_backup

    seen_kwargs = []

    async def iter_fn(entity, min_id=0, **kwargs):
        seen_kwargs.append({"min_id": min_id, **kwargs})
        if len(seen_kwargs) == 1:
            raise FloodWaitError(request=None, capture=1)
            yield
        for i in (10, 11):
            yield SimpleNamespace(id=i)

    client = SimpleNamespace(iter_messages=iter_fn)
    with patch.object(telegram_backup.asyncio, "sleep", AsyncMock()):
        async for _ in telegram_backup.iter_messages_with_flood_retry(
            client, "chat", min_id=5, max_id=20, reverse=True
        ):
            pass
    assert all(k.get("max_id") == 20 for k in seen_kwargs)
```
Verify each finding against the current code and only fix it if needed.
In `@tests/test_flood_wait_visibility.py` around lines 96 - 142, Remove the dead
fake_iter definition (it’s overwritten by seeded_iter) and add focused tests for
iter_messages_with_flood_retry: (1) a test where the first call to the fake
iter_messages raises FloodWaitError before yielding any message (ensure min_id
stays 0 and subsequent call yields messages), (2) a test that simulates
consecutive FloodWaitError occurrences before success to exercise retry-count
logic, and (3) a test that captures the kwargs passed into the fake
iter_messages to assert that max_id (and other kwargs) are preserved across
retries (use SimpleNamespace client.iter_messages, patch asyncio.sleep, and
assert behavior against iter_messages_with_flood_retry and the _fill_gap_range
path).
The problem

Telethon's `TelegramClient` silently sleeps through any `FloodWaitError` whose wait is ≤ `flood_sleep_threshold` (default 60 s). On a large `iter_messages` pass — e.g. initial backup of a 100 k-message chat — Telegram often hands back much longer waits, and the operator can't tell "crashed" from "rate-limited": `systemctl status` still shows the service as active.

The fix

- `flood_sleep_threshold=0` on the shared `TelegramClient` so every `FloodWaitError` bubbles up as an exception rather than a silent sleep.
- `iter_messages_with_flood_retry(client, entity, **kwargs)` — a thin async generator that wraps `client.iter_messages`, logs `FloodWait: sleeping Ns before resuming at message_id=M`, sleeps, then resumes iteration from the last-seen message id.
- Wired in at the two bulk-iteration sites in `src/telegram_backup.py` (full chat backup and gap-fill).

New config knob

`FLOOD_WAIT_LOG_THRESHOLD` (default 10 s) controls the minimum wait to log, so trivial 1-2 s floods don't spam the log.

Before/after is qualitative: `FloodWait: sleeping 47s before resuming at message_id=87342` — operator immediately knows what's happening.

Conflict notes

Rebased on top of the recently-merged `SKIP_TOPIC_IDS` feature (#118). Both conflict sites in `src/telegram_backup.py` are trivial — our flood-retry wrapper wraps the `iter_messages` call, and the topic-skip check stays inside the loop body where upstream added it.

Test plan

`pytest tests/test_flood_wait_visibility.py tests/test_config.py` — 101 pass

Summary by CodeRabbit

- Configuration
- Bug Fixes