Skip to content

Commit bdf8fe5

Browse files
committed
fix: add TTL to job records to prevent memory leak
Job records stored at {namespace}.jobs.{queue}.{pid} were accumulating indefinitely because no TTL was set. This adds: - TTL parameter to Connection interface set() and setArray() methods - TTL support in Redis connection using setex() - Configurable jobTtl property on Queue class (default: 24 hours) - Cleanup of old job records after retry re-enqueue
1 parent e551606 commit bdf8fe5

4 files changed

Lines changed: 16 additions & 6 deletions

File tree

src/Queue/Broker/Redis.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
4545
/**
4646
* Move Job to Jobs and it's PID to the processing list.
4747
*/
48-
$this->connection->setArray("{$queue->namespace}.jobs.{$queue->name}.{$message->getPid()}", $nextMessage);
48+
$this->connection->setArray("{$queue->namespace}.jobs.{$queue->name}.{$message->getPid()}", $nextMessage, $queue->jobTtl);
4949
$this->connection->leftPush("{$queue->namespace}.processing.{$queue->name}", $message->getPid());
5050

5151
/**
@@ -150,6 +150,12 @@ public function retry(Queue $queue, ?int $limit = null): void
150150
}
151151

152152
$this->enqueue($queue, $job->getPayload());
153+
154+
/**
155+
* Remove old job record after re-enqueueing to prevent memory leak.
156+
*/
157+
$this->connection->remove("{$queue->namespace}.jobs.{$queue->name}.{$pid}");
158+
153159
$processed++;
154160
}
155161
}

src/Queue/Connection.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ public function listSize(string $key): int;
1919
public function listRange(string $key, int $total, int $offset): array;
2020
public function remove(string $key): bool;
2121
public function move(string $queue, string $destination): bool;
22-
public function set(string $key, string $value): bool;
22+
public function set(string $key, string $value, int $ttl = 0): bool;
2323
public function get(string $key): array|string|null;
24-
public function setArray(string $key, array $value): bool;
24+
public function setArray(string $key, array $value, int $ttl = 0): bool;
2525
public function increment(string $key): int;
2626
public function decrement(string $key): int;
2727
public function ping(): bool;

src/Queue/Connection/Redis.php

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,13 +119,16 @@ public function move(string $queue, string $destination): bool
119119
return $this->getRedis()->move($queue, $destination);
120120
}
121121

122-
public function setArray(string $key, array $value): bool
122+
public function setArray(string $key, array $value, int $ttl = 0): bool
123123
{
124-
return $this->set($key, json_encode($value));
124+
return $this->set($key, json_encode($value), $ttl);
125125
}
126126

127-
public function set(string $key, string $value): bool
127+
public function set(string $key, string $value, int $ttl = 0): bool
128128
{
129+
if ($ttl > 0) {
130+
return $this->getRedis()->setex($key, $ttl, $value);
131+
}
129132
return $this->getRedis()->set($key, $value);
130133
}
131134

src/Queue/Queue.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
public function __construct(
88
public string $name,
99
public string $namespace = 'utopia-queue',
10+
public int $jobTtl = 86400,
1011
) {
1112
if (empty($this->name)) {
1213
throw new \InvalidArgumentException('Cannot create queue with empty name.');

0 commit comments

Comments
 (0)