diff --git a/crates/rustapi-jobs/src/backend/memory.rs b/crates/rustapi-jobs/src/backend/memory.rs index a71addd..eaabd0b 100644 --- a/crates/rustapi-jobs/src/backend/memory.rs +++ b/crates/rustapi-jobs/src/backend/memory.rs @@ -34,19 +34,26 @@ impl JobBackend for InMemoryBackend { .lock() .map_err(|_| JobError::BackendError("Lock poisoned".to_string()))?; - // Simple FIFO for now, ignoring run_at logic complexity for basic in-memory - // In reality we should scan for ready jobs - if let Some(job) = q.front() { + let now = chrono::Utc::now(); + let mut index_to_remove = None; + + // Scan the queue for the first ready job + for (i, job) in q.iter().enumerate() { if let Some(run_at) = job.run_at { - if run_at > chrono::Utc::now() { - return Ok(None); + if run_at > now { + continue; } } - } else { - return Ok(None); + // Found a ready job (no run_at, or run_at <= now) + index_to_remove = Some(i); + break; } - Ok(q.pop_front()) + if let Some(i) = index_to_remove { + Ok(q.remove(i)) + } else { + Ok(None) + } } async fn complete(&self, _job_id: &str) -> Result<()> { diff --git a/crates/rustapi-jobs/tests/repro_blocking.rs b/crates/rustapi-jobs/tests/repro_blocking.rs new file mode 100644 index 0000000..40da453 --- /dev/null +++ b/crates/rustapi-jobs/tests/repro_blocking.rs @@ -0,0 +1,71 @@ +use async_trait::async_trait; +use rustapi_jobs::{EnqueueOptions, InMemoryBackend, Job, JobContext, JobQueue, Result}; +use serde::{Deserialize, Serialize}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct SimpleJobData { + id: i32, +} + +#[derive(Clone)] +struct SimpleJob { + processed_ids: Arc>>, +} + +#[async_trait] +impl Job for SimpleJob { + const NAME: &'static str = "simple_job"; + type Data = SimpleJobData; + + async fn execute(&self, _ctx: JobContext, data: Self::Data) -> Result<()> { + self.processed_ids.lock().unwrap().push(data.id); + Ok(()) + } +} + +#[tokio::test] +async fn test_head_of_line_blocking() { + let backend = InMemoryBackend::new(); + let queue = JobQueue::new(backend); + + let processed_ids = Arc::new(Mutex::new(Vec::new())); + let job = SimpleJob { + processed_ids: processed_ids.clone(), + }; + + queue.register_job(job).await; + + // 1. Enqueue a job scheduled far in the future (Job 1) + let opts_future = EnqueueOptions::new().delay(Duration::from_secs(3600)); + queue + .enqueue_opts::(SimpleJobData { id: 1 }, opts_future) + .await + .unwrap(); + + // 2. Enqueue a job scheduled now (Job 2) + queue + .enqueue::(SimpleJobData { id: 2 }) + .await + .unwrap(); + + // 3. Attempt to process one job. + // Job 2 should be picked up because Job 1 is not ready. + let result = queue.process_one().await.unwrap(); + + // Verify + if result { + // If it processed something, it MUST be Job 2 + let ids = processed_ids.lock().unwrap().clone(); + assert_eq!(ids.len(), 1); + assert_eq!( + ids[0], 2, + "Should have processed Job 2, but processed {:?}", + ids + ); + } else { + // If it returned false, it means it was blocked by Job 1 + panic!("Head-of-line blocking detected! Failed to process Job 2 because Job 1 is blocking the queue."); + } +}