Skip to content

Comments

Fix/thread safety and logging#4540

Open
Waqar53 wants to merge 6 commits intocrewAIInc:mainfrom
Waqar53:fix/thread-safety-and-logging
Open

Fix/thread safety and logging#4540
Waqar53 wants to merge 6 commits intocrewAIInc:mainfrom
Waqar53:fix/thread-safety-and-logging

Conversation

@Waqar53
Copy link

@Waqar53 Waqar53 commented Feb 20, 2026

Summary

Three targeted fixes improving thread-safety and debuggability in core utilities. 24 new tests, 0 failures, all ruff checks pass.


1. RPM Controller Thread-Safety & Timer Lifecycle Bugs

Files: utilities/rpm_controller.py, tests/utilities/test_rpm_controller.py (new)

Bug Fix
_wait_for_next_minute() slept 60s holding the lock, blocking all threads Wait outside lock using Event.wait(60)
_shutdown_flag read/written without lock (race condition) Replaced with threading.Event (atomic)
Timer could fire after stop_rpm_counter() Cancel under lock, check _shutdown_event
_lock could be None despite being required Always initialized via default_factory

Tests added: 9 new dedicated tests (previously 0 existed for RPMController)


2. RWLock Writer Starvation Fix

Files: utilities/rw_lock.py, tests/utilities/events/test_rw_lock.py

New readers could indefinitely starve waiting writers because r_acquire only checked if a writer held the lock, not if one was waiting.

Fix: Added _waiting_writers counter — standard practice for production RWLock implementations. r_acquire now blocks when _waiting_writers > 0. No behavior change when there are no waiting writers, so all 10 existing tests pass unchanged.

Tests added: 1 new test (test_writer_not_starved_by_continuous_readers)


3. Debug Logging for OutputParser Retries (Closes #4246)

Files: utilities/agent_utils.py, tests/utilities/test_output_parser_logging.py (new)

Added logger.debug() to handle_output_parser_exception so every retry is logged at DEBUG level, regardless of verbose setting or iteration count.

Previously, retry information was only visible when verbose=True AND iterations exceeded log_error_after (default 3), making it nearly impossible to debug retry loops in production.

Tests added: 4 new tests


Verification

# RPM Controller (9 tests)
pytest lib/crewai/tests/utilities/test_rpm_controller.py -v

# RWLock (11 tests — 10 existing + 1 new)
pytest lib/crewai/tests/utilities/events/test_rw_lock.py -v

# OutputParser logging (4 tests)
pytest lib/crewai/tests/utilities/test_output_parser_logging.py -v

# Linting
ruff check lib/crewai/src/crewai/utilities/rpm_controller.py lib/crewai/src/crewai/utilities/rw_lock.py lib/crewai/src/crewai/utilities/agent_utils.py

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> **Medium Risk**
> Touches core concurrency and rate-limiting utilities used across agents/crew; changes are well-covered by new multithreaded regression tests but could still affect timing-sensitive behavior in production.
> 
> **Overview**
> Adds **DEBUG-level logging** to `handle_output_parser_exception` so every `OutputParserError` retry records the iteration and error message independent of `verbose` settings.
> 
> Refactors `RPMController` to be thread-safe and stoppable: always initializes `_lock`, replaces the shutdown flag with a `threading.Event`, moves the rate-limit wait outside the lock, and ensures timer reset/stop operations are synchronized to avoid races and limit bypass on concurrent wakeups.
> 
> Updates `RWLock` to prevent writer starvation by blocking new readers when writers are queued, and adds regression tests for the new logging behavior, RPM controller lifecycle/limit correctness under concurrency, and RWLock fairness.
> 
> <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 8ace304570a039f7e8c2322319707b546db22463. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup>
<!-- /CURSOR_SUMMARY -->

Add _waiting_writers counter so new readers block when writers are
waiting for the lock. Without this, a continuous stream of readers
can indefinitely starve writers.

Changes:
- Add _waiting_writers field initialized to 0
- r_acquire now blocks when _waiting_writers > 0
- w_acquire increments _waiting_writers before waiting, decrements
  in finally block after acquiring
- Add test_writer_not_starved_by_continuous_readers test

This is standard practice for production RWLock implementations
and does not change behavior when there are no waiting writers.
Add logger.debug() call to handle_output_parser_exception so that
every OutputParserError retry is logged at DEBUG level, regardless
of the verbose setting or iteration count.

Previously, retry information was only visible when verbose=True AND
iterations exceeded log_error_after (default 3), making it nearly
impossible to debug retry loops in production.

The debug log includes the iteration number and error message,
enabling operators to trace agent retry behavior by setting
DEBUG level for crewai.utilities.agent_utils.

Closes crewAIInc#4246

Add 4 tests verifying:
- Debug log emission on parser error
- Error details included in log message
- Log emitted regardless of verbose setting
- Error still appended to messages list
Fix four concurrency issues in RPMController:

1. _wait_for_next_minute() blocked for 60s while holding the lock,
   starving all other threads. Now waits OUTSIDE the lock.

2. _shutdown_flag was a plain bool without synchronization. Replaced
   with threading.Event for atomic reads and wake-up signaling.

3. Timer callback could race with stop_rpm_counter() because
   _shutdown_flag was read outside the lock. Now all timer and
   shutdown operations are protected by _lock.

4. _lock was Optional (could be None) despite being required for
   correctness. Now always initialized via default_factory.

Add comprehensive test suite (9 tests) covering:
- Basic rate limiting and no-limit passthrough
- Timer reset and cancellation
- Concurrent request enforcement
- Shutdown unblocking waiting threads
- Lock availability during wait periods
When multiple threads hit the RPM limit and wait simultaneously,
they all wake up at the same time. Previously each thread set
_current_rpm = 1 (unconditional overwrite), allowing all N threads
through while the counter only reflected 1 request.

Fix: replace linear check-wait-assign with a while-loop that
re-checks the limit under the lock after waking. Threads that
lose the race loop back and wait again, ensuring _current_rpm
never exceeds max_rpm.

Add regression test: test_concurrent_wakeup_respects_limit
Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.

"""Acquire a read lock, blocking if a writer holds or is waiting for the lock."""
with self._cond:
while self._writer:
while self._writer or self._waiting_writers > 0:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nested read locks deadlock when writer is waiting

Medium Severity

The new _waiting_writers > 0 check in r_acquire introduces a deadlock for reentrant (nested) read locks under writer contention. If Thread A holds a read lock and Thread B calls w_acquire (incrementing _waiting_writers), Thread A's inner r_acquire blocks on _waiting_writers > 0, while Thread B blocks on _readers > 0 — neither can proceed. The existing test_nested_read_locks_same_thread test validates nested reads as a supported pattern but only passes because no writer is contending; under real-world contention it would deadlock.

Additional Locations (1)

Fix in Cursor Fix in Web

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Enhancement] Add debug logging when OutputParserError triggers agent retry

1 participant