Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion src/Database/Adapter/MariaDB.php
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,29 @@ public function createDocument(Document $collection, Document $document): Docume
}

if (isset($stmtPermissions)) {
$stmtPermissions->execute();
try {
$stmtPermissions->execute();
} catch (PDOException $e) {
$isOrphanedPermission = $e->getCode() === '23000'
&& isset($e->errorInfo[1])
&& $e->errorInfo[1] === 1062
&& \str_contains($e->getMessage(), '_index1');

if (!$isOrphanedPermission) {
throw $e;
}

// Clean up orphaned permissions from a previous failed delete, then retry
$sql = "DELETE FROM {$this->getSQLTable($name . '_perms')} WHERE _document = :_uid {$this->getTenantQuery($collection)}";
$cleanup = $this->getPDO()->prepare($sql);
$cleanup->bindValue(':_uid', $document->getId());
if ($this->sharedTables) {
$cleanup->bindValue(':_tenant', $document->getTenant());
}
$cleanup->execute();

$stmtPermissions->execute();
}
}
} catch (PDOException $e) {
throw $this->processException($e);
Expand Down Expand Up @@ -1883,6 +1905,13 @@ protected function processException(PDOException $e): \Exception

// Duplicate row
if ($e->getCode() === '23000' && isset($e->errorInfo[1]) && $e->errorInfo[1] === 1062) {
$message = $e->getMessage();
if (\str_contains($message, '_index1')) {
return new DuplicateException('Duplicate permissions for document', $e->getCode(), $e);
}
if (!\str_contains($message, '_uid')) {
return new DuplicateException('Document with the requested unique attributes already exists', $e->getCode(), $e);
}
return new DuplicateException('Document already exists', $e->getCode(), $e);
}

Expand Down
11 changes: 5 additions & 6 deletions src/Database/Adapter/Mongo.php
Original file line number Diff line number Diff line change
Expand Up @@ -3483,12 +3483,11 @@ protected function processException(\Throwable $e): \Throwable
}

// Duplicate key error
if ($e->getCode() === 11000) {
return new DuplicateException('Document already exists', $e->getCode(), $e);
}

// Duplicate key error for unique index
if ($e->getCode() === 11001) {
if ($e->getCode() === 11000 || $e->getCode() === 11001) {
$message = $e->getMessage();
if (!\str_contains($message, '_uid')) {
return new DuplicateException('Document with the requested unique attributes already exists', $e->getCode(), $e);
}
return new DuplicateException('Document already exists', $e->getCode(), $e);
}

Expand Down
56 changes: 56 additions & 0 deletions src/Database/Adapter/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ class Pool extends Adapter
*/
protected UtopiaPool $pool;

/**
* When a transaction is active, all delegate calls are routed through
* this pinned adapter to ensure they run on the same connection.
*/
protected ?Adapter $pinnedAdapter = null;

/**
* @param UtopiaPool<covariant Adapter> $pool The pool to use for connections. Must contain instances of Adapter.
*/
Expand All @@ -36,6 +42,10 @@ public function __construct(UtopiaPool $pool)
*/
public function delegate(string $method, array $args): mixed
{
if ($this->pinnedAdapter !== null) {
return $this->pinnedAdapter->{$method}(...$args);
}

return $this->pool->use(function (Adapter $adapter) use ($method, $args) {
// Run setters in case config changed since this connection was last used
$adapter->setDatabase($this->getDatabase());
Expand Down Expand Up @@ -92,6 +102,52 @@ public function rollbackTransaction(): bool
return $this->delegate(__FUNCTION__, \func_get_args());
}

/**
* Pin a single connection from the pool for the entire transaction lifecycle.
* This prevents startTransaction(), the callback, and commitTransaction()
* from running on different connections.
*
* @template T
* @param callable(): T $callback
* @return T
* @throws \Throwable
*/
public function withTransaction(callable $callback): mixed
{
// If already inside a transaction, reuse the pinned adapter
// so nested withTransaction calls use the same connection
if ($this->pinnedAdapter !== null) {
return $this->pinnedAdapter->withTransaction($callback);
}

return $this->pool->use(function (Adapter $adapter) use ($callback) {
$adapter->setDatabase($this->getDatabase());
$adapter->setNamespace($this->getNamespace());
$adapter->setSharedTables($this->getSharedTables());
$adapter->setTenant($this->getTenant());
$adapter->setAuthorization($this->authorization);

if ($this->getTimeout() > 0) {
$adapter->setTimeout($this->getTimeout());
}
$adapter->resetDebug();
foreach ($this->getDebug() as $key => $value) {
$adapter->setDebug($key, $value);
}
$adapter->resetMetadata();
foreach ($this->getMetadata() as $key => $value) {
$adapter->setMetadata($key, $value);
}

$this->pinnedAdapter = $adapter;
try {
return $adapter->withTransaction($callback);
} finally {
$this->pinnedAdapter = null;
}
});
}
Comment on lines +115 to +149
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check how Pool adapter instances are created/shared in the codebase
echo "=== Pool instantiation patterns ==="
rg -n 'new Pool\(' --type php -g '!vendor/**' -g '!tests/**' | head -100

echo ""
echo "=== Pool::class bindings in DI containers ==="
rg -n 'Pool::class' --type php -g '!vendor/**' -g '!tests/**' | head -100

echo ""
echo "=== Check for singleton/shared container bindings ==="
rg -n -B2 -A2 '(singleton|share|shared).*[Pp]ool' --type php -g '!vendor/**' -g '!tests/**' | head -100

Repository: utopia-php/database

Length of output: 200


🏁 Script executed:

#!/bin/bash
# Check for Pool usage patterns (imports, instanceof, etc.)
echo "=== Import/use Pool patterns ==="
rg -n '(use.*Pool|instanceof Pool)' --type php -g '!vendor/**' | head -100

echo ""
echo "=== Pool class definition and interface ==="
rg -n 'class Pool\|interface.*Pool' --type php -g '!vendor/**' | head -50

echo ""
echo "=== Check test files for Pool usage ==="
rg -n 'new Pool\|Pool::class' --type php tests/ 2>/dev/null | head -100

echo ""
echo "=== Check if Pool is created through a factory or builder ==="
rg -n -B3 -A3 'function.*[Pp]ool|Pool.*factory' --type php -g '!vendor/**' | head -150

Repository: utopia-php/database

Length of output: 2067


🏁 Script executed:

#!/bin/bash
# Read the Pool test file to understand usage patterns
echo "=== Pool Test File ==="
cat -n tests/e2e/Adapter/PoolTest.php

echo ""
echo "=== Search for pool->use() patterns ==="
rg -n -B2 -A5 'pool->use\(' --type php -g '!vendor/**' | head -150

echo ""
echo "=== Check for Fiber/async documentation or comments ==="
rg -n -i '(fiber|coroutine|concurrent|async|swoole|thread)' --type php src/Database/Adapter/Pool.php

Repository: utopia-php/database

Length of output: 11805


🏁 Script executed:

#!/bin/bash
# Check for Database instantiation patterns in production code
echo "=== Database instantiation patterns ==="
rg -n 'new Database\(' --type php src/ | head -100

echo ""
echo "=== Check Pool class structure and properties ==="
head -100 src/Database/Adapter/Pool.php

echo ""
echo "=== Look for README or documentation ==="
find . -maxdepth 2 -name 'README*' -o -name '*.md' | head -10

Repository: utopia-php/database

Length of output: 3362


🏁 Script executed:

#!/bin/bash
# Check README for Pool usage documentation
echo "=== README Pool usage section ==="
grep -n -i -A10 -B2 'pool' README.md | head -100

echo ""
echo "=== Check SPEC for Pool design ==="
grep -n -i -A10 'pool\|transaction\|concurrent\|coroutine' SPEC.md | head -150

Repository: utopia-php/database

Length of output: 132


🏁 Script executed:

#!/bin/bash
# Read full README
echo "=== Full README.md ==="
cat README.md

echo ""
echo "=== Search for UtopiaPool usage documentation ==="
rg -n 'UtopiaPool\|Pools\\Pool' --type php -B5 -A5 src/ | head -200

Repository: utopia-php/database

Length of output: 26019


🏁 Script executed:

#!/bin/bash
# Check if there are any concurrency-related comments or patterns
echo "=== Search for concurrency/thread safety patterns in Pool ==="
cat -n src/Database/Adapter/Pool.php | grep -i -A5 -B5 'thread\|concurrent\|coroutine\|fiber\|async\|race\|mutex\|lock'

echo ""
echo "=== Check Pool's usage of delegate() with pinnedAdapter ==="
cat -n src/Database/Adapter/Pool.php | head -150

echo ""
echo "=== Search for any container/DI configuration files ==="
find . -maxdepth 3 -name 'config*' -o -name 'container*' -o -name 'services*' | grep -v vendor | grep -v node_modules

echo ""
echo "=== Check if there are any examples or integration tests showing shared Pool usage ==="
find . -path ./vendor -prune -o -name '*example*' -o -name '*integration*' -type f -print 2>/dev/null | head -20

Repository: utopia-php/database

Length of output: 7023


Document Pool ownership model regarding shared instances in async runtimes.

pinnedAdapter stored as mutable instance state could race if a single Pool instance were shared across concurrent coroutines (e.g., Swoole, Fibers). Coroutine B entering withTransaction while A's transaction is active would see A's pinnedAdapter and incorrectly route to A's connection, silently corrupting both transactions.

While Pool is dependency-injected (suggesting per-request instantiation), the codebase lacks documentation or configuration showing the intended ownership model. Add documentation clarifying whether Pool instances must be instantiated per-request/per-coroutine, or refactor to use context-aware storage (e.g., Fiber::getCurrent() key or similar) instead of instance state to make the design explicitly safe for shared instances.

🤖 Prompt for AI Agents
In `@src/Database/Adapter/Pool.php` around lines 115 - 149, The Pool class
currently stores pinnedAdapter as instance state which races when a Pool is
shared across concurrent coroutines/fibers; update the codebase to either (A)
clearly document the ownership model so callers know Pool must be instantiated
per-request/per-coroutine (add a class-level docblock on Pool and notes in
constructor and DI config) or (B) make pinnedAdapter context-aware by replacing
the single property with a context map keyed by the current execution context
(e.g., Fiber::getCurrent() or the runtime's coroutine id) so withTransaction and
the pinnedAdapter lookup store/retrieve adapter using that key; refer to the
Pool::withTransaction method and the pinnedAdapter property when making the
change.


protected function quote(string $string): string
{
return $this->delegate(__FUNCTION__, \func_get_args());
Expand Down
4 changes: 4 additions & 0 deletions src/Database/Adapter/Postgres.php
Original file line number Diff line number Diff line change
Expand Up @@ -2189,6 +2189,10 @@ protected function processException(PDOException $e): \Exception

// Duplicate row
if ($e->getCode() === '23505' && isset($e->errorInfo[1]) && $e->errorInfo[1] === 7) {
$message = $e->getMessage();
if (!\str_contains($message, '_uid')) {
return new DuplicateException('Document with the requested unique attributes already exists', $e->getCode(), $e);
}
return new DuplicateException('Document already exists', $e->getCode(), $e);
}

Expand Down
3 changes: 3 additions & 0 deletions src/Database/Adapter/SQLite.php
Original file line number Diff line number Diff line change
Expand Up @@ -1328,6 +1328,9 @@ protected function processException(PDOException $e): \Exception
stripos($message, 'unique') !== false ||
stripos($message, 'duplicate') !== false
) {
if (!\str_contains($message, '_uid')) {
return new DuplicateException('Document with the requested unique attributes already exists', $e->getCode(), $e);
}
return new DuplicateException('Document already exists', $e->getCode(), $e);
}
}
Expand Down
88 changes: 88 additions & 0 deletions tests/e2e/Adapter/PoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
use Utopia\Database\Adapter\MySQL;
use Utopia\Database\Adapter\Pool;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Database\Exception;
use Utopia\Database\Exception\Duplicate;
use Utopia\Database\Exception\Limit;
use Utopia\Database\Helpers\Permission;
use Utopia\Database\Helpers\Role;
use Utopia\Database\PDO;
use Utopia\Pools\Adapter\Stack;
use Utopia\Pools\Pool as UtopiaPool;
Expand Down Expand Up @@ -109,4 +112,89 @@ protected function deleteIndex(string $collection, string $index): bool

return true;
}

/**
* Execute raw SQL via the pool using reflection to access the adapter's PDO.
*
* @param string $sql
* @param array<string, mixed> $binds
*/
private function execRawSQL(string $sql, array $binds = []): void
{
self::$pool->use(function (Adapter $adapter) use ($sql, $binds) {
$class = new ReflectionClass($adapter);
$property = $class->getProperty('pdo');
$property->setAccessible(true);
$pdo = $property->getValue($adapter);
$stmt = $pdo->prepare($sql);
foreach ($binds as $key => $value) {
$stmt->bindValue($key, $value);
}
$stmt->execute();
});
}

/**
* Test that orphaned permission records from a previous failed delete
* don't block document recreation. The createDocument method should
* clean up orphaned perms and retry.
*/
public function testOrphanedPermissionsRecovery(): void
{
$database = $this->getDatabase();
$collection = 'orphanedPermsRecovery';

$database->createCollection($collection);
$database->createAttribute($collection, 'title', Database::VAR_STRING, 128, true);

// Step 1: Create a document with permissions
$doc = $database->createDocument($collection, new Document([
'$id' => 'orphan_test',
'$permissions' => [
Permission::read(Role::any()),
Permission::update(Role::any()),
Permission::delete(Role::any()),
],
'title' => 'Original',
]));
$this->assertEquals('orphan_test', $doc->getId());

// Step 2: Delete the document normally (cleans up both doc and perms)
$database->deleteDocument($collection, 'orphan_test');
$deleted = $database->getDocument($collection, 'orphan_test');
$this->assertTrue($deleted->isEmpty());

// Step 3: Manually re-insert orphaned permission rows (simulating a partial delete failure)
$namespace = $this->getDatabase()->getNamespace();
$dbName = $this->getDatabase()->getDatabase();
$permsTable = "`{$dbName}`.`{$namespace}_{$collection}_perms`";

$this->execRawSQL(
"INSERT INTO {$permsTable} (_type, _permission, _document) VALUES (:type, :perm, :doc)",
[':type' => 'read', ':perm' => 'any', ':doc' => 'orphan_test']
);
$this->execRawSQL(
"INSERT INTO {$permsTable} (_type, _permission, _document) VALUES (:type, :perm, :doc)",
[':type' => 'update', ':perm' => 'any', ':doc' => 'orphan_test']
);

// Step 4: Recreate a document with the same ID - should succeed by cleaning up orphaned perms
$newDoc = $database->createDocument($collection, new Document([
'$id' => 'orphan_test',
'$permissions' => [
Permission::read(Role::any()),
Permission::update(Role::any()),
],
'title' => 'Recreated',
]));
$this->assertEquals('orphan_test', $newDoc->getId());
$this->assertEquals('Recreated', $newDoc->getAttribute('title'));

// Verify the document can be fetched
$found = $database->getDocument($collection, 'orphan_test');
$this->assertFalse($found->isEmpty());
$this->assertEquals('Recreated', $found->getAttribute('title'));

$database->deleteCollection($collection);
}
}
58 changes: 58 additions & 0 deletions tests/e2e/Adapter/Scopes/DocumentTests.php
Original file line number Diff line number Diff line change
Expand Up @@ -5007,6 +5007,64 @@ public function testUniqueIndexDuplicate(): void
$this->assertInstanceOf(DuplicateException::class, $e);
}
}

/**
* Test that DuplicateException messages differentiate between
* document ID duplicates and unique index violations.
*/
public function testDuplicateExceptionMessages(): void
{
/** @var Database $database */
$database = $this->getDatabase();

if (!$database->getAdapter()->getSupportForUniqueIndex()) {
$this->expectNotToPerformAssertions();
return;
}

$database->createCollection('duplicateMessages');
$database->createAttribute('duplicateMessages', 'email', Database::VAR_STRING, 128, true);
$database->createIndex('duplicateMessages', 'emailUnique', Database::INDEX_UNIQUE, ['email'], [128]);

// Create first document
$database->createDocument('duplicateMessages', new Document([
'$id' => 'dup_msg_1',
'$permissions' => [
Permission::read(Role::any()),
],
'email' => 'test@example.com',
]));

// Test 1: Duplicate document ID should say "Document already exists"
try {
$database->createDocument('duplicateMessages', new Document([
'$id' => 'dup_msg_1',
'$permissions' => [
Permission::read(Role::any()),
],
'email' => 'different@example.com',
]));
$this->fail('Expected DuplicateException for duplicate document ID');
} catch (DuplicateException $e) {
$this->assertStringContainsString('Document already exists', $e->getMessage());
}

// Test 2: Unique index violation should mention "unique attributes"
try {
$database->createDocument('duplicateMessages', new Document([
'$id' => 'dup_msg_2',
'$permissions' => [
Permission::read(Role::any()),
],
'email' => 'test@example.com',
]));
$this->fail('Expected DuplicateException for unique index violation');
} catch (DuplicateException $e) {
$this->assertStringContainsString('unique attributes', $e->getMessage());
}

$database->deleteCollection('duplicateMessages');
}
/**
* @depends testUniqueIndexDuplicate
*/
Expand Down
52 changes: 52 additions & 0 deletions tests/e2e/Adapter/Scopes/GeneralTests.php
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,58 @@ public function testCacheReconnect(): void
}
}

/**
* Test that withTransaction properly rolls back on failure.
* With the Pool adapter, this verifies that the entire transaction
* (start, callback, commit/rollback) runs on a single pinned connection.
*/
public function testTransactionAtomicity(): void
{
/** @var Database $database */
$database = $this->getDatabase();

$database->createCollection('transactionAtomicity');
$database->createAttribute('transactionAtomicity', 'title', Database::VAR_STRING, 128, true);

// Verify a successful transaction commits
$doc = $database->withTransaction(function () use ($database) {
return $database->createDocument('transactionAtomicity', new Document([
'$id' => 'tx_success',
'$permissions' => [
Permission::read(Role::any()),
],
'title' => 'Committed',
]));
});
$this->assertEquals('tx_success', $doc->getId());
$found = $database->getDocument('transactionAtomicity', 'tx_success');
$this->assertFalse($found->isEmpty());
$this->assertEquals('Committed', $found->getAttribute('title'));

// Verify a failed transaction rolls back completely
try {
$database->withTransaction(function () use ($database) {
$database->createDocument('transactionAtomicity', new Document([
'$id' => 'tx_fail',
'$permissions' => [
Permission::read(Role::any()),
],
'title' => 'Should be rolled back',
]));

throw new \Exception('Intentional failure to trigger rollback');
});
} catch (\Exception $e) {
$this->assertEquals('Intentional failure to trigger rollback', $e->getMessage());
}

// Document should NOT exist since the transaction was rolled back
$notFound = $database->getDocument('transactionAtomicity', 'tx_fail');
$this->assertTrue($notFound->isEmpty(), 'Document should not exist after transaction rollback');

$database->deleteCollection('transactionAtomicity');
}

/**
* Wait for Redis to be ready with a readiness probe
*/
Expand Down