>> */ private array $connections = []; /** @var array Idle connections, indexed by connection object ID. */ private array $idleConnections = []; /** @var array Map of connection object IDs to request counts. */ private array $activeRequestCounts = []; /** @var array> */ private array $waiting = []; /** @var array Map of URIs to flags to wait for potential HTTP/2 connection. */ private array $waitForPriorConnection = []; private int $totalConnectionAttempts = 0; private int $totalStreamRequests = 0; private int $openConnectionCount = 0; /** * Create a connection pool that limits the number of connections per authority. * * @param int $connectionLimit Maximum number of connections allowed to a single authority. */ public static function byAuthority(int $connectionLimit, ?ConnectionFactory $connectionFactory = null): self { return new self($connectionLimit, $connectionFactory); } private static function formatUri(Request $request): string { $uri = $request->getUri(); $scheme = $uri->getScheme(); $isHttps = $scheme === 'https'; $defaultPort = $isHttps ? 443 : 80; $host = $uri->getHost(); $port = $uri->getPort() ?? $defaultPort; $authority = $host . ':' . $port; return $scheme . '://' . $authority; } private function __construct( private readonly int $connectionLimit, ?ConnectionFactory $connectionFactory = null, ) { if ($connectionLimit < 1) { throw new \Error('The connection limit must be greater than 0'); } $this->connectionFactory = $connectionFactory ?? new DefaultConnectionFactory(); } public function __clone() { $this->connections = []; $this->totalConnectionAttempts = 0; $this->totalStreamRequests = 0; $this->openConnectionCount = 0; } public function getTotalConnectionAttempts(): int { return $this->totalConnectionAttempts; } public function getTotalStreamRequests(): int { return $this->totalStreamRequests; } public function getOpenConnectionCount(): int { return $this->openConnectionCount; } public function getStream(Request $request, Cancellation $cancellation): Stream { $this->totalStreamRequests++; $uri = self::formatUri($request); [$connection, $stream] = $this->getStreamFor($uri, $request, $cancellation); $connectionId = \spl_object_id($connection); $this->activeRequestCounts[$connectionId] = ($this->activeRequestCounts[$connectionId] ?? 0) + 1; unset($this->idleConnections[$connectionId]); $poolRef = \WeakReference::create($this); $releaseCallback = static function () use ($poolRef, $connection, $uri): void { $pool = $poolRef->get(); if ($pool) { $pool->onReadyConnection($connection, $uri); } elseif ($connection->isIdle()) { $connection->close(); } }; return HttpStream::fromStream( $stream, function (Request $request, Cancellation $cancellation) use ( $releaseCallback, $connection, $stream, $uri ): Response { try { $response = $stream->request($request, $cancellation); } catch (\Throwable $e) { $this->onReadyConnection($connection, $uri); throw $e; } $response->getTrailers()->finally($releaseCallback)->ignore(); return $response; }, $releaseCallback, ); } /** * @return array{Connection, Stream} */ private function getStreamFor(string $uri, Request $request, Cancellation $cancellation): array { $isHttps = $request->getUri()->getScheme() === 'https'; $connections = $this->connections[$uri] ?? []; do { foreach ($connections as $connectionFuture) { \assert($connectionFuture instanceof Future); try { if ($isHttps && ($this->waitForPriorConnection[$uri] ?? true)) { // Wait for first successful connection if using a secure connection (maybe we can use HTTP/2). $connection = $connectionFuture->await(); } elseif ($connectionFuture->isComplete()) { $connection = $connectionFuture->await(); } else { continue; } } catch (\Exception $exception) { continue; // Ignore cancellations and errors of other requests. } \assert($connection instanceof Connection); $stream = $this->getStreamFromConnection($connection, $request); if ($stream === null) { if (!$this->isAdditionalConnectionAllowed($uri) && $this->isConnectionIdle($connection)) { $connection->close(); break; } continue; // No stream available for the given request. } return [$connection, $stream]; } $deferred = new DeferredFuture; $futureFromDeferred = $deferred->getFuture(); $this->waiting[$uri][\spl_object_id($deferred)] = $deferred; if ($this->isAdditionalConnectionAllowed($uri)) { break; } $connection = $futureFromDeferred->await(); \assert($connection instanceof Connection); $stream = $this->getStreamFromConnection($connection, $request); if ($stream === null) { continue; // Wait for a different connection to become available. } return [$connection, $stream]; } while (true); $this->totalConnectionAttempts++; $connectionFuture = async($this->connectionFactory->create(...), $request, $cancellation); $futureId = \spl_object_id($connectionFuture); $this->connections[$uri] ??= new \ArrayObject(); $this->connections[$uri][$futureId] = $connectionFuture; EventLoop::queue(function () use ( $connectionFuture, $uri, $futureId, $isHttps ): void { try { /** @var Connection $connection */ $connection = $connectionFuture->await(); } catch (\Throwable) { $this->dropConnection($uri, null, $futureId); return; } $connectionId = \spl_object_id($connection); $this->openConnectionCount++; if ($isHttps) { $this->waitForPriorConnection[$uri] = \in_array('2', $connection->getProtocolVersions(), true); } $poolRef = \WeakReference::create($this); $connection->onClose(static function () use ($poolRef, $uri, $connectionId, $futureId): void { $pool = $poolRef->get(); if ($pool) { $pool->openConnectionCount--; $pool->dropConnection($uri, $connectionId, $futureId); } }); }); try { // Await both new connection future and deferred to reuse an existing connection. $connection = Future\awaitFirst([$connectionFuture, $futureFromDeferred]); } catch (CompositeException $exception) { [$exception] = $exception->getReasons(); // The first reason is why the connection failed. throw $exception; } $this->removeWaiting($uri, \spl_object_id($deferred)); // DeferredFuture no longer needed for this request. \assert($connection instanceof Connection); $stream = $this->getStreamFromConnection($connection, $request); if ($stream === null) { // Potentially reused connection did not have an available stream for the given request. $connection = $connectionFuture->await(); // Wait for new connection request instead. $stream = $this->getStreamFromConnection($connection, $request); if ($stream === null) { // Other requests used the new connection first, so we need to go around again. return $this->getStreamFor($uri, $request, $cancellation); } } return [$connection, $stream]; } private function getStreamFromConnection(Connection $connection, Request $request): ?Stream { if ($connection->isClosed()) { return null; // Connection closed during iteration over available connections. } if (!\array_intersect($request->getProtocolVersions(), $connection->getProtocolVersions())) { return null; // Connection does not support any of the requested protocol versions. } return $connection->getStream($request); } private function isAdditionalConnectionAllowed(string $uri): bool { return \count($this->connections[$uri] ?? []) < $this->connectionLimit; } private function onReadyConnection(Connection $connection, string $uri): void { $connectionId = \spl_object_id($connection); if (isset($this->activeRequestCounts[$connectionId])) { $this->activeRequestCounts[$connectionId]--; if ($this->activeRequestCounts[$connectionId] === 0) { while (\count($this->idleConnections) > 64) { // not customizable for now $idleConnection = \reset($this->idleConnections); $key = \key($this->idleConnections); unset($this->idleConnections[$key]); $idleConnection->close(); } $this->idleConnections[$connectionId] = $connection; } } if (empty($this->waiting[$uri])) { return; } /** @var DeferredFuture $deferred */ $deferred = \reset($this->waiting[$uri]); $this->removeWaiting($uri, \spl_object_id($deferred)); $deferred->complete($connection); } private function isConnectionIdle(Connection $connection): bool { $connectionId = \spl_object_id($connection); \assert( !isset($this->activeRequestCounts[$connectionId]) || $this->activeRequestCounts[$connectionId] >= 0 ); return ($this->activeRequestCounts[$connectionId] ?? 0) === 0; } private function removeWaiting(string $uri, int $deferredId): void { unset($this->waiting[$uri][$deferredId]); if (empty($this->waiting[$uri])) { unset($this->waiting[$uri]); } } private function dropConnection(string $uri, ?int $connectionId, int $futureId): void { unset($this->connections[$uri][$futureId]); if ($connectionId !== null) { unset($this->activeRequestCounts[$connectionId], $this->idleConnections[$connectionId]); } if (\count($this->connections[$uri]) === 0) { unset($this->connections[$uri], $this->waitForPriorConnection[$uri]); } } } __halt_compiler();----SIGNATURE:----IJZm1FQ4LxgeS/AxFcrzgVooU81hxOL3M4mdKs7Yin/xuFcMxLEenNQWXJ7jYGrhBF/NmAHpcvQE+AMTqYteu8xhaY52IQmXqnd1wiwvod/TTL6ZsalZ4K+GxUMRmrkTrDv0uVmF6O2MlGYX7XfHC0vW9hprYj243GexeSE9tE2VpkYNkgTa4793z5KwsCc69BIIBkzSIpeboxUHtuufFcp9ua7eMdGCg3lj7t4ZT/7F14ZMm/0UXh3vgqxEuj1CpsXRXqJFD2pp8d47UfilcnEKyTvJEK2UvE8H+OgeSMJvOrg1osQr8RG6lNXJbmOTQ6TYfY0vU9K0zKTKvluN1/Gf9IkVwatVcC0p2q4DIL5/xEKHhwNPNPAZhG5m5iH31j38fRqEgItRHkZZ8BegzJV4rQ9Jeee3bg1PuKWDGCYkPXhMEWNyJU7Ozd7BZYaV8KcosLH8Kc5KmQ7EcDaanwejLwJGPurMQiIZ1VlTMGhs2UzL5O/xFtjAbuUbmTpQzbyUBvYUjw98BB1MRuGeZIB98p449drOdFgH76d4tDGsA6Xcitdfii8yLBWCde9/K8cEnXc03035rxF9hHxMb9aNDM75EnEvMRbATKIaZ1eSo6bEf+PNISQaM+4nsAN8hL6IQsm62JBsqUuS2g7L+QsqNdz1dnX6GDxleTvZGP4=----ATTACHMENT:----Njc0NDg4NTA1MTEyMTcwMiAxMTY3NzAxNDY5MzcwMjg0IDczMTc1Mzg1MTkzMDMxNw==