Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions crates/rustapi-jobs/src/backend/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,26 @@ impl JobBackend for InMemoryBackend {
.lock()
.map_err(|_| JobError::BackendError("Lock poisoned".to_string()))?;

// Simple FIFO for now, ignoring run_at logic complexity for basic in-memory
// In reality we should scan for ready jobs
if let Some(job) = q.front() {
let now = chrono::Utc::now();
let mut index_to_remove = None;

// Scan the queue for the first ready job
for (i, job) in q.iter().enumerate() {
if let Some(run_at) = job.run_at {
if run_at > chrono::Utc::now() {
return Ok(None);
if run_at > now {
continue;
}
}
} else {
return Ok(None);
// Found a ready job (no run_at, or run_at <= now)
index_to_remove = Some(i);
break;
}

Ok(q.pop_front())
if let Some(i) = index_to_remove {
Ok(q.remove(i))
} else {
Ok(None)
}
}

async fn complete(&self, _job_id: &str) -> Result<()> {
Expand Down
71 changes: 71 additions & 0 deletions crates/rustapi-jobs/tests/repro_blocking.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use async_trait::async_trait;
use rustapi_jobs::{EnqueueOptions, InMemoryBackend, Job, JobContext, JobQueue, Result};
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
use std::time::Duration;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct SimpleJobData {
id: i32,
}

#[derive(Clone)]
struct SimpleJob {
processed_ids: Arc<Mutex<Vec<i32>>>,
}

#[async_trait]
impl Job for SimpleJob {
const NAME: &'static str = "simple_job";
type Data = SimpleJobData;

async fn execute(&self, _ctx: JobContext, data: Self::Data) -> Result<()> {
self.processed_ids.lock().unwrap().push(data.id);
Ok(())
}
}

#[tokio::test]
async fn test_head_of_line_blocking() {
let backend = InMemoryBackend::new();
let queue = JobQueue::new(backend);

let processed_ids = Arc::new(Mutex::new(Vec::new()));
let job = SimpleJob {
processed_ids: processed_ids.clone(),
};

queue.register_job(job).await;

// 1. Enqueue a job scheduled far in the future (Job 1)
let opts_future = EnqueueOptions::new().delay(Duration::from_secs(3600));
queue
.enqueue_opts::<SimpleJob>(SimpleJobData { id: 1 }, opts_future)
.await
.unwrap();

// 2. Enqueue a job scheduled now (Job 2)
queue
.enqueue::<SimpleJob>(SimpleJobData { id: 2 })
.await
.unwrap();

// 3. Attempt to process one job.
// Job 2 should be picked up because Job 1 is not ready.
let result = queue.process_one().await.unwrap();

// Verify
if result {
// If it processed something, it MUST be Job 2
let ids = processed_ids.lock().unwrap().clone();
assert_eq!(ids.len(), 1);
assert_eq!(
ids[0], 2,
"Should have processed Job 2, but processed {:?}",
ids
);
} else {
// If it returned false, it means it was blocked by Job 1
panic!("Head-of-line blocking detected! Failed to process Job 2 because Job 1 is blocking the queue.");
}
}
Loading