Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 29 additions & 23 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -197,19 +197,12 @@ public function connect(): self
if ($this->client->isConnected()) {
return $this;
}

// Validate connection parameters before attempting connection
$validateConnectionParams = function () {
if (empty($this->host)) {
throw new Exception('MongoDB host cannot be empty');
}
if ($this->port <= 0 || $this->port > 65535) {
throw new Exception('MongoDB port must be between 1 and 65535');
}
};

$validateConnectionParams();

if (empty($this->host)) {
throw new Exception('MongoDB host cannot be empty');
}
if ($this->port <= 0 || $this->port > 65535) {
throw new Exception('MongoDB port must be between 1 and 65535');
}
if (!$this->client->connect($this->host, $this->port)) {
throw new Exception("Failed to connect to MongoDB at {$this->host}:{$this->port}");
}
Expand Down Expand Up @@ -261,17 +254,15 @@ public function query(array $command, ?string $db = null): stdClass|array|int
if (is_array($sessionData) && isset($sessionData['id'])) {
$command['lsid'] = $sessionData['id'];
$rawId = $sessionData['id']->id ?? null;
$sessionId = $rawId instanceof \MongoDB\BSON\Binary
? bin2hex($rawId->getData())
: $rawId;
} else {
$command['lsid'] = $sessionData;
$rawId = $sessionData->id ?? null;
$sessionId = $rawId instanceof \MongoDB\BSON\Binary
? bin2hex($rawId->getData())
: $rawId;
}

$sessionId = $rawId instanceof \MongoDB\BSON\Binary
? bin2hex($rawId->getData())
: $rawId;

// Add transaction parameters if session is in transaction
if ($sessionId && isset($this->sessions[$sessionId]) &&
$this->sessions[$sessionId]['state'] === self::TRANSACTION_IN_PROGRESS) {
Expand Down Expand Up @@ -346,7 +337,6 @@ public function query(array $command, ?string $db = null): stdClass|array|int
$message = pack('V*', 21 + strlen($sections), $this->id, 0, 2013, 0) . "\0" . $sections;
$result = $this->send($message);

// Update causal consistency timestamps from response
$this->updateCausalConsistency($result);

// Update session last use time if session was provided
Expand All @@ -367,7 +357,23 @@ public function query(array $command, ?string $db = null): stdClass|array|int
*/
public function send(mixed $data): stdClass|array|int
{
$this->client->send($data);
// Check if connection is alive, connect if not
if (!$this->client->isConnected()) {
$this->connect();
}

$result = $this->client->send($data);

// If send fails, try to reconnect once
if ($result === false) {
$this->close();
$this->connect();
$result = $this->client->send($data);
if ($result === false) {
throw new Exception('Failed to send data to MongoDB after reconnection attempt');
}
}

return $this->receive();
}

Expand Down Expand Up @@ -397,8 +403,8 @@ private function receive(): stdClass|array|int
if ($this->client instanceof CoroutineClient) {
Coroutine::sleep(0.001); // 1ms for coroutines
} else {
\usleep($sleepTime); // Microsecond precision for sync client
$sleepTime = \min($sleepTime * 1.2, 10000); // Cap at 10ms for faster checking
\usleep((int)$sleepTime); // Microsecond precision for sync client
$sleepTime = (int)\min($sleepTime * 1.2, 10000); // Cap at 10ms for faster checking
}
continue;
}
Expand Down
Loading