[bugfix] ensure predict threads are joined on exception#433
Conversation
Wrap prediction loop in try/finally to ensure forward and write threads are properly signaled and joined even when exceptions occur, preventing potential resource leaks and hanging processes. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
tzrec/main.py
Outdated
| for t in forward_t_list: | ||
| t.join() |
There was a problem hiding this comment.
Bug risk: join() without timeout can hang the process indefinitely
If a forward thread is blocked on pred_queue.put() (queue full because write thread stopped consuming), the sentinel None on data_queue won't unblock it. t.join() will then block forever. In a distributed job this hangs all ranks.
Consider adding a timeout:
| for t in forward_t_list: | |
| t.join() | |
| for t in forward_t_list: | |
| t.join(timeout=PREDICT_QUEUE_TIMEOUT) |
tzrec/main.py
Outdated
| pred_queue.put((None, None), timeout=PREDICT_QUEUE_TIMEOUT) | ||
| except Exception: | ||
| pass | ||
| write_t.join() |
There was a problem hiding this comment.
Same concern — add a timeout to prevent indefinite hang if the sentinel put above failed silently:
| write_t.join() | |
| write_t.join(timeout=PREDICT_QUEUE_TIMEOUT) |
tzrec/main.py
Outdated
| except Exception: | ||
| pass |
There was a problem hiding this comment.
Nit: Silently swallowing exceptions makes cleanup hangs very hard to diagnose in production. Consider logging instead of bare pass:
except Exception:
logger.warning("Failed to send sentinel to data_queue during cleanup", exc_info=True)Same for the pred_queue.put handler at line 1242.
Code Review SummaryGood bugfix — wrapping the prediction loop in Issues to address1. If a forward thread is blocked on 2. The function at line ~1412–1439 uses the same thread pattern ( Minor suggestions
🤖 Generated with Claude Code |
predict_checkpoint had the same vulnerability as predict: threads and writer not cleaned up on exception, plus assert on write_t that would crash if no steps ran. Apply the same try/finally pattern. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
tzrec/main.py
Outdated
| for t in forward_t_list: | ||
| t.join() |
There was a problem hiding this comment.
Potential hang: t.join() has no timeout. If the sentinel put(None) above failed silently (caught by except Exception: pass), the worker thread never receives the stop signal and this join() blocks indefinitely. Same applies to write_t.join() on line 1251.
Consider adding a timeout and logging a warning:
| for t in forward_t_list: | |
| t.join() | |
| for t in forward_t_list: | |
| t.join(timeout=PREDICT_QUEUE_TIMEOUT) |
And after each join:
if t.is_alive():
logger.warning("Forward thread did not exit within timeout.")This bounds worst-case hang time and makes failures diagnosable. The same pattern should be applied to write_t.join() below and in predict_checkpoint (line 1454).
tzrec/main.py
Outdated
| try: | ||
| data_queue.put(None, timeout=PREDICT_QUEUE_TIMEOUT) | ||
| except Exception: | ||
| pass |
There was a problem hiding this comment.
Silent failure → deadlock risk: If the sentinel put times out (e.g. queue is full because a downstream thread crashed), the exception is silently swallowed. The subsequent t.join() then blocks forever since the thread never got the stop signal.
At minimum, log the failure so hangs are diagnosable:
| try: | |
| data_queue.put(None, timeout=PREDICT_QUEUE_TIMEOUT) | |
| except Exception: | |
| pass | |
| try: | |
| data_queue.put(None, timeout=PREDICT_QUEUE_TIMEOUT) | |
| except Exception: | |
| logger.warning("Failed to send sentinel to data_queue during cleanup.", exc_info=True) |
Same applies to the pred_queue sentinel put on lines 1247-1250 and in predict_checkpoint (lines 1449-1453).
Code Review SummaryOverall: Good bugfix — wrapping prediction loops in Actionable Feedback
Non-blocking Observations (pre-existing, not introduced by this PR)
🤖 Generated with Claude Code |
If Thread.start() raises RuntimeError (e.g., OS can't create thread), write_t would be non-None but never started. In the finally cleanup, write_t.join() would raise RuntimeError, causing writer.close() to be skipped — leaking the writer resource. Fix: assign write_t only after start() succeeds, matching the pattern already used for forward threads via forward_t_list.append(). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
tzrec/main.py
Outdated
| except Exception: | ||
| pass | ||
| for t in forward_t_list: | ||
| t.join() |
There was a problem hiding this comment.
join() without timeout can hang forever — If the sentinel put above fails (caught by except Exception: pass), the thread never sees its termination signal and join() blocks indefinitely. This creates a new deadlock path in the very code meant to fix deadlocks.
Consider adding a timeout and logging:
| t.join() | |
| t.join(timeout=PREDICT_QUEUE_TIMEOUT) | |
| if t.is_alive(): | |
| logger.warning("Forward thread %s did not terminate in time.", t.name) |
tzrec/main.py
Outdated
| pred_queue.put((None, None), timeout=PREDICT_QUEUE_TIMEOUT) | ||
| except Exception: | ||
| pass | ||
| write_t.join() |
There was a problem hiding this comment.
Same concern here — write_t.join() without a timeout can hang if the sentinel didn't reach the write thread.
| write_t.join() | |
| write_t.join(timeout=PREDICT_QUEUE_TIMEOUT) | |
| if write_t.is_alive(): | |
| logger.warning("Write thread did not terminate in time.") |
tzrec/main.py
Outdated
| except Exception: | ||
| pass |
There was a problem hiding this comment.
Bare except Exception: pass silently swallows the sentinel enqueue failure, giving zero signal that a thread will never see its termination signal. At minimum, log a warning so operators can diagnose hangs:
| except Exception: | |
| pass | |
| except Exception: | |
| logger.warning("Failed to send sentinel to data_queue", exc_info=True) |
tzrec/main.py
Outdated
| except Exception: | ||
| pass |
There was a problem hiding this comment.
Same here — log instead of silently passing:
| except Exception: | |
| pass | |
| except Exception: | |
| logger.warning("Failed to send sentinel to pred_queue", exc_info=True) |
tzrec/main.py
Outdated
| pred_queue.put((None, None), timeout=PREDICT_QUEUE_TIMEOUT) | ||
| except Exception: | ||
| pass | ||
| write_t.join() |
There was a problem hiding this comment.
Same two issues as the predict function: join() needs a timeout, and the except Exception: pass above should log.
| write_t.join() | |
| write_t.join(timeout=PREDICT_QUEUE_TIMEOUT) | |
| if write_t.is_alive(): | |
| logger.warning("Write thread did not terminate in time.") |
Code Review SummaryOverall: This is a well-motivated bugfix. The Key Finding:
|
Replace silent except-pass with warning logs so cleanup failures are visible. Helps debugging when threads are stuck or queues are broken during error-path cleanup. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…eption writer.close() can fail in real scenarios (ODPS commit error, disk full during flush, network drop). When called unprotected in a finally block, such failures replace the original exception that triggered cleanup, making root-cause debugging harder. Wrap writer.close() in try/except with a warning log, consistent with the queue sentinel cleanup. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
tzrec/main.py
Outdated
| for t in forward_t_list: | ||
| t.join() | ||
| if write_t is not None: | ||
| try: | ||
| pred_queue.put((None, None), timeout=PREDICT_QUEUE_TIMEOUT) | ||
| except Exception as e: | ||
| logger.warning(f"Failed to send sentinel to pred_queue: {e}") | ||
| write_t.join() |
There was a problem hiding this comment.
Potential indefinite hang if sentinel delivery fails.
When pred_queue.put((None, None)) fails (caught on line 1250), write_t.join() on line 1252 will block indefinitely — _write_loop is stuck waiting on pred_queue.get(timeout=PREDICT_QUEUE_TIMEOUT) and will only unblock after the queue timeout (default 600s). The same applies to the forward thread joins on line 1246 if data_queue.put(None) fails.
With multiple forward threads, this could block cleanup for up to N_threads * 600s sequentially.
Consider adding a timeout to join() calls:
for t in forward_t_list:
t.join(timeout=PREDICT_QUEUE_TIMEOUT + 30)
if t.is_alive():
logger.warning("Forward thread did not exit within timeout")
tzrec/main.py
Outdated
| if write_t is not None: | ||
| try: | ||
| pred_queue.put((None, None), timeout=PREDICT_QUEUE_TIMEOUT) | ||
| except Exception as e: | ||
| logger.warning(f"Failed to send sentinel to pred_queue: {e}") | ||
| write_t.join() |
There was a problem hiding this comment.
Same concern here — write_t.join() on line 1459 has no timeout. If the sentinel put fails, the join blocks until _write_loop's queue get times out (default 600s).
Code Review SummaryGood bugfix — the original code had a real resource-leak risk where threads and the writer would never be cleaned up if an exception occurred mid-prediction. The What looks good
Issues to consider1. If a sentinel 2. No test coverage for the exception cleanup paths The entire purpose of this PR — exception-path thread cleanup — has no test coverage. Existing tests only cover the happy path via subprocess-based integration tests. A unit test that mocks the model to raise mid-prediction and verifies all threads are joined and 🤖 Generated with Claude Code |
…angs Per PR review feedback: t.join() and write_t.join() without a timeout can hang the cleanup path indefinitely if: - The sentinel put failed (caught and logged), so the worker never sees its termination signal - The worker is stuck in a non-timed operation (CUDA op, network call, blocking writer flush) In a distributed job, hanging cleanup hangs all ranks. Add bounded join(timeout=PREDICT_QUEUE_TIMEOUT) and log a warning if the thread is still alive after the timeout, so operators can diagnose the leak. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
tzrec/main.pyto ensure forward and write threads are properly cleaned up on exceptionTest plan
🤖 Generated with Claude Code