|
6 | 6 | use React\EventLoop\Loop; |
7 | 7 | use React\EventLoop\LoopInterface; |
8 | 8 | use React\Promise\Deferred; |
| 9 | +use React\Promise\Promise; |
9 | 10 | use React\Promise\PromiseInterface; |
10 | | -use React\Promise\Timer\TimeoutException; |
11 | 11 | use React\Socket\ConnectionInterface; |
12 | 12 | use React\Socket\Connector; |
13 | 13 | use React\Socket\ConnectorInterface; |
14 | 14 | use function React\Promise\reject; |
15 | | -use function React\Promise\Timer\timeout; |
16 | 15 |
|
17 | 16 | /** |
18 | 17 | * @internal |
@@ -175,14 +174,53 @@ function (\Exception $e) use ($redis, $uri) { |
175 | 174 | return $deferred->promise(); |
176 | 175 | } |
177 | 176 |
|
178 | | - return timeout($deferred->promise(), $timeout, $this->loop)->then(null, function (\Throwable $e) use ($uri) { |
179 | | - if ($e instanceof TimeoutException) { |
180 | | - throw new \RuntimeException( |
181 | | - 'Connection to ' . $uri . ' timed out after ' . $e->getTimeout() . ' seconds (ETIMEDOUT)', |
182 | | - defined('SOCKET_ETIMEDOUT') ? SOCKET_ETIMEDOUT : 110 |
183 | | - ); |
| 177 | + $promise = $deferred->promise(); |
| 178 | + |
| 179 | + /** @var Promise<StreamingClient> */ |
| 180 | + $ret = new Promise(function (callable $resolve, callable $reject) use ($timeout, $promise, $uri): void { |
| 181 | + /** @var ?\React\EventLoop\TimerInterface */ |
| 182 | + $timer = null; |
| 183 | + $promise = $promise->then(function (StreamingClient $v) use (&$timer, $resolve): void { |
| 184 | + if ($timer) { |
| 185 | + $this->loop->cancelTimer($timer); |
| 186 | + } |
| 187 | + $timer = false; |
| 188 | + $resolve($v); |
| 189 | + }, function (\Throwable $e) use (&$timer, $reject): void { |
| 190 | + if ($timer) { |
| 191 | + $this->loop->cancelTimer($timer); |
| 192 | + } |
| 193 | + $timer = false; |
| 194 | + $reject($e); |
| 195 | + }); |
| 196 | + |
| 197 | + // promise already settled => no need to start timer |
| 198 | + if ($timer === false) { |
| 199 | + return; |
184 | 200 | } |
185 | | - throw $e; |
| 201 | + |
| 202 | + // start timeout timer which will cancel the pending promise |
| 203 | + $timer = $this->loop->addTimer($timeout, function () use ($timeout, &$promise, $reject, $uri): void { |
| 204 | + $reject(new \RuntimeException( |
| 205 | + 'Connection to ' . $uri . ' timed out after ' . $timeout . ' seconds (ETIMEDOUT)', |
| 206 | + \defined('SOCKET_ETIMEDOUT') ? \SOCKET_ETIMEDOUT : 110 |
| 207 | + )); |
| 208 | + |
| 209 | + // Cancel pending connection to clean up any underlying resources and references. |
| 210 | + // Avoid garbage references in call stack by passing pending promise by reference. |
| 211 | + \assert($promise instanceof PromiseInterface); |
| 212 | + $promise->cancel(); |
| 213 | + $promise = null; |
| 214 | + }); |
| 215 | + }, function () use (&$promise): void { |
| 216 | + // Cancelling this promise will cancel the pending connection, thus triggering the rejection logic above. |
| 217 | + // Avoid garbage references in call stack by passing pending promise by reference. |
| 218 | + \assert($promise instanceof PromiseInterface); |
| 219 | + $promise->cancel(); |
| 220 | + $promise = null; |
186 | 221 | }); |
| 222 | + |
| 223 | + // variable assignment needed for legacy PHPStan on PHP 7.1 only |
| 224 | + return $ret; |
187 | 225 | } |
188 | 226 | } |
0 commit comments