@@ -87,6 +87,12 @@ final class StreamingServer extends EventEmitter
87
87
/** @var Clock */
88
88
private $ clock ;
89
89
90
+ /** @var LoopInterface */
91
+ private $ loop ;
92
+
93
+ /** @var int */
94
+ private $ idleConnectionTimeout ;
95
+
90
96
/**
91
97
* Creates an HTTP server that invokes the given callback for each incoming HTTP request
92
98
*
@@ -95,19 +101,21 @@ final class StreamingServer extends EventEmitter
95
101
* connections in order to then parse incoming data as HTTP.
96
102
* See also [listen()](#listen) for more details.
97
103
*
98
- * @param LoopInterface $loop
99
104
* @param callable $requestHandler
105
+ * @param int $idleConnectionTimeout
100
106
* @see self::listen()
101
107
*/
102
- public function __construct (LoopInterface $ loop , $ requestHandler )
108
+ public function __construct (LoopInterface $ loop , $ requestHandler, $ idleConnectionTimeout )
103
109
{
104
110
if (!\is_callable ($ requestHandler )) {
105
111
throw new \InvalidArgumentException ('Invalid request handler given ' );
106
112
}
107
113
114
+ $ this ->loop = $ loop ;
108
115
$ this ->callback = $ requestHandler ;
109
116
$ this ->clock = new Clock ($ loop );
110
117
$ this ->parser = new RequestHeaderParser ($ this ->clock );
118
+ $ this ->idleConnectionTimeout = $ idleConnectionTimeout ;
111
119
112
120
$ that = $ this ;
113
121
$ this ->parser ->on ('headers ' , function (ServerRequestInterface $ request , ConnectionInterface $ conn ) use ($ that ) {
@@ -134,7 +142,35 @@ public function __construct(LoopInterface $loop, $requestHandler)
134
142
*/
135
143
public function listen (ServerInterface $ socket )
136
144
{
137
- $ socket ->on ('connection ' , array ($ this ->parser , 'handle ' ));
145
+ $ socket ->on ('connection ' , array ($ this , 'handleConnection ' ));
146
+ }
147
+
148
+ /** @internal */
149
+ public function handleConnection (ConnectionInterface $ connection )
150
+ {
151
+ $ idleConnectionTimeout = $ this ->idleConnectionTimeout ;
152
+ $ loop = $ this ->loop ;
153
+ $ idleConnectionTimeoutHandler = function () use ($ connection , &$ closeEventHandler , &$ dataEventHandler ) {
154
+ $ connection ->removeListener ('close ' , $ closeEventHandler );
155
+ $ connection ->removeListener ('data ' , $ dataEventHandler );
156
+
157
+ $ connection ->close ();
158
+ };
159
+ $ timer = $ this ->loop ->addTimer ($ idleConnectionTimeout , $ idleConnectionTimeoutHandler );
160
+ $ closeEventHandler = function () use ($ connection , &$ closeEventHandler , &$ dataEventHandler , $ loop , &$ timer ) {
161
+ $ connection ->removeListener ('close ' , $ closeEventHandler );
162
+ $ connection ->removeListener ('data ' , $ dataEventHandler );
163
+
164
+ $ loop ->cancelTimer ($ timer );
165
+ };
166
+ $ dataEventHandler = function () use ($ loop , $ idleConnectionTimeout , $ idleConnectionTimeoutHandler , &$ timer ) {
167
+ $ loop ->cancelTimer ($ timer );
168
+ $ timer = $ loop ->addTimer ($ idleConnectionTimeout , $ idleConnectionTimeoutHandler );
169
+ };
170
+ $ connection ->on ('close ' , $ closeEventHandler );
171
+ $ connection ->on ('data ' , $ dataEventHandler );
172
+
173
+ $ this ->parseRequest ($ connection );
138
174
}
139
175
140
176
/** @internal */
@@ -372,7 +408,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
372
408
373
409
// either wait for next request over persistent connection or end connection
374
410
if ($ persist ) {
375
- $ this ->parser -> handle ($ connection );
411
+ $ this ->parseRequest ($ connection );
376
412
} else {
377
413
$ connection ->end ();
378
414
}
@@ -393,13 +429,67 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
393
429
// write streaming body and then wait for next request over persistent connection
394
430
if ($ persist ) {
395
431
$ body ->pipe ($ connection , array ('end ' => false ));
396
- $ parser = $ this -> parser ;
397
- $ body ->on ('end ' , function () use ($ connection , $ parser , $ body ) {
432
+ $ that = $ this ;
433
+ $ body ->on ('end ' , function () use ($ connection , $ body , & $ that ) {
398
434
$ connection ->removeListener ('close ' , array ($ body , 'close ' ));
399
- $ parser -> handle ($ connection );
435
+ $ that -> parseRequest ($ connection );
400
436
});
401
437
} else {
402
438
$ body ->pipe ($ connection );
403
439
}
404
440
}
441
+
442
+ /**
443
+ * @internal
444
+ */
445
+ public function parseRequest (ConnectionInterface $ connection )
446
+ {
447
+ $ idleConnectionTimeout = $ this ->idleConnectionTimeout ;
448
+ $ loop = $ this ->loop ;
449
+ $ parser = $ this ->parser ;
450
+ $ idleConnectionTimeoutHandler = function () use ($ connection , $ parser , &$ removeTimerHandler ) {
451
+ $ parser ->removeListener ('headers ' , $ removeTimerHandler );
452
+ $ parser ->removeListener ('error ' , $ removeTimerHandler );
453
+
454
+ $ parser ->emit ('error ' , array (
455
+ new \RuntimeException ('Request timed out ' , Response::STATUS_REQUEST_TIMEOUT ),
456
+ $ connection
457
+ ));
458
+ };
459
+ $ timer = $ this ->loop ->addTimer ($ idleConnectionTimeout , $ idleConnectionTimeoutHandler );
460
+ $ removeTimerHandler = function ($ requestOrError , $ conn ) use ($ loop , &$ timer , $ parser , $ connection , &$ removeTimerHandler , $ idleConnectionTimeout , $ idleConnectionTimeoutHandler ) {
461
+ if ($ conn !== $ connection ) {
462
+ return ;
463
+ }
464
+
465
+ $ loop ->cancelTimer ($ timer );
466
+ $ parser ->removeListener ('headers ' , $ removeTimerHandler );
467
+ $ parser ->removeListener ('error ' , $ removeTimerHandler );
468
+
469
+ if (!($ requestOrError instanceof ServerRequestInterface)) {
470
+ return ;
471
+ }
472
+
473
+ $ requestBody = $ requestOrError ->getBody ();
474
+ if (!($ requestBody instanceof HttpBodyStream)) {
475
+ return ;
476
+ }
477
+
478
+ $ timer = $ loop ->addTimer ($ idleConnectionTimeout , $ idleConnectionTimeoutHandler );
479
+ $ requestBody ->on ('data ' , function () use (&$ timer , $ loop , $ idleConnectionTimeout , $ idleConnectionTimeoutHandler ) {
480
+ $ loop ->cancelTimer ($ timer );
481
+ $ timer = $ loop ->addTimer ($ idleConnectionTimeout , $ idleConnectionTimeoutHandler );
482
+ });
483
+ $ requestBody ->on ('end ' , function () use (&$ timer , $ loop ) {
484
+ $ loop ->cancelTimer ($ timer );
485
+ });
486
+ $ requestBody ->on ('close ' , function () use (&$ timer , $ loop ) {
487
+ $ loop ->cancelTimer ($ timer );
488
+ });
489
+ };
490
+ $ this ->parser ->on ('headers ' , $ removeTimerHandler );
491
+ $ this ->parser ->on ('error ' , $ removeTimerHandler );
492
+
493
+ $ this ->parser ->handle ($ connection );
494
+ }
405
495
}
0 commit comments