diff --git a/src/Connection/ConnectionLimitingPool.php b/src/Connection/ConnectionLimitingPool.php index 52c8f8ea..1bc38ea7 100644 --- a/src/Connection/ConnectionLimitingPool.php +++ b/src/Connection/ConnectionLimitingPool.php @@ -187,8 +187,9 @@ private function getStreamFor(string $uri, Request $request, Cancellation $cance $deferred = new DeferredFuture; $futureFromDeferred = $deferred->getFuture(); + $deferredId = \spl_object_id($deferred); - $this->waiting[$uri][\spl_object_id($deferred)] = $deferred; + $this->waiting[$uri][$deferredId] = $deferred; if ($this->isAdditionalConnectionAllowed($uri)) { break; @@ -251,10 +252,10 @@ private function getStreamFor(string $uri, Request $request, Cancellation $cance } catch (CompositeException $exception) { [$exception] = $exception->getReasons(); // The first reason is why the connection failed. throw $exception; + } finally { + $this->removeWaiting($uri, $deferredId); // DeferredFuture no longer needed for this request. } - $this->removeWaiting($uri, \spl_object_id($deferred)); // DeferredFuture no longer needed for this request. - \assert($connection instanceof Connection); $stream = $this->getStreamFromConnection($connection, $request); diff --git a/src/Connection/Http1Connection.php b/src/Connection/Http1Connection.php index 19a0b76d..c233b0b3 100644 --- a/src/Connection/Http1Connection.php +++ b/src/Connection/Http1Connection.php @@ -468,7 +468,11 @@ private function readResponse( $this->priorTimeout = $priorTimeout ?? $this->priorTimeout; if ($requestTimeout > 0 && $parser->getState() !== Http1Parser::BODY_IDENTITY_EOF) { - $this->timeoutWatcher = EventLoop::delay($requestTimeout, $this->close(...)); + $connectionRef = \WeakReference::create($this); + $this->timeoutWatcher = EventLoop::delay( + $requestTimeout, + static fn () => $connectionRef->get()?->close(), + ); EventLoop::unreference($this->timeoutWatcher); $this->watchIdleConnection(); } else { @@ -736,16 +740,19 @@ private function watchIdleConnection(): void $this->socket->unreference(); } - $this->idleRead = async(function (): ?string { + $socket = $this->socket; + $connectionRef = \WeakReference::create($this); + + $this->idleRead = async(static function () use ($socket, $connectionRef): ?string { $chunk = null; try { - $chunk = $this->socket?->read(); + $chunk = $socket?->read(); } catch (\Throwable) { // Close connection below. } if ($chunk === null) { - $this->close(); + $connectionRef->get()?->close(); } return $chunk; diff --git a/test/Connection/ConnectionLimitingPoolTest.php b/test/Connection/ConnectionLimitingPoolTest.php index e8570fb1..9a119e79 100644 --- a/test/Connection/ConnectionLimitingPoolTest.php +++ b/test/Connection/ConnectionLimitingPoolTest.php @@ -7,6 +7,7 @@ use Amp\Http\Client\HttpClientBuilder; use Amp\Http\Client\Request; use Amp\Http\Client\Response; +use Amp\Http\Client\SocketException; use Amp\Http\Client\Trailers; use Amp\PHPUnit\AsyncTestCase; use Amp\Socket\InternetAddress; @@ -195,6 +196,34 @@ public function testConnectionNotClosedWhileInUse(): void } } + public function testWaitingRequestRemovedIfConnectionAttemptFails(): void + { + $factory = $this->createMock(ConnectionFactory::class); + $factory->expects(self::once()) + ->method('create') + ->willThrowException(new SocketException('Connection failed')); + + $pool = ConnectionLimitingPool::byAuthority(1, $factory); + + $client = (new HttpClientBuilder) + ->retry(0) + ->usingPool($pool) + ->build(); + + try { + $client->request(new Request('http://localhost')); + self::fail('Connection attempt should have failed'); + } catch (SocketException) { + // Expected. + } + + delay(0); + + $property = new \ReflectionProperty($pool, 'waiting'); + + self::assertSame([], $property->getValue($pool)); + } + private function createMockConnection(Request $request): Connection&MockObject { $response = new Response('1.1', 200, null, [], new ReadableBuffer, $request, Future::complete(new Trailers([]))); diff --git a/test/Connection/Http1ConnectionTest.php b/test/Connection/Http1ConnectionTest.php index 278e3eeb..c6fb721b 100644 --- a/test/Connection/Http1ConnectionTest.php +++ b/test/Connection/Http1ConnectionTest.php @@ -80,6 +80,33 @@ public function testConnectionNotBusyWithoutRequestGarbageCollected(): void self::assertNotNull($connection->getStream($secondRequest)); } + public function testIdleKeepAliveConnectionCanBeGarbageCollected(): void + { + [$server, $client] = Socket\createSocketPair(); + + $connection = new Http1Connection($client, 0, null, 5); + $connectionRef = \WeakReference::create($connection); + + $request = new Request('http://localhost'); + events()->requestStart($request); + + $stream = $connection->getStream($request); + $server->write("HTTP/1.1 204 No Content\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n"); + + $response = $stream->request($request, new NullCancellation); + $response->getBody()->buffer(); + + unset($client, $connection, $request, $response, $stream); + + do { + delay(0); + } while (\gc_collect_cycles()); + + self::assertNull($connectionRef->get()); + + $server->close(); + } + public function test100Continue(): void { [$server, $client] = Socket\createSocketPair();