Streaming http request (content) while reading (by streaming) the response seems not to be working #377

Open
Nek- opened this issue Feb 18, 2025 · 0 comments

Nek- commented Feb 18, 2025

Hello

I found out that it's not possible to stream an HTTP/2 request body while also streaming the response, even though the HTTP/2 protocol supports it in theory (and some clients/servers in other languages support it as well).

Context unrelated to the issue

I'm writing a gRPC client. gRPC uses HTTP/2 to its full potential. For example, for bi-directional streaming, the client streams messages by writing to the request body while the server may respond with many messages streamed in the response body.

Here is a diagram explaining how it works (I hope it makes things clear):

                                                                                                         
                               GRPC Frame (encapsulated in HTTP Frame)                ┌────HTTP/2 Stream 
                                                                                      │                  
                                     │                                                │                  
                                     │                                                │                  
           ┌─────────────────────────┼────────────────────────────────────────────────▼──────┐           
           │┌──────────────┐  ┌──────▼──────┐  ┌─────────────┐                               │           
 Client    ││  Protobuf    │  │ Encoded     │  │ Messages    │  ─────────►                   │           
           │└──────────────┘  └─────────────┘  └─────────────┘                               │           
           │                               ┌─────────────┐   ┌────────────┐  ┌─────────────┐ │           
           │               ◄────────────── │             │   │            │  │             │ │   Server  
           │                               └───────▲─────┘   └────────────┘  └─────────────┘ │           
           └───────────────────────────────────────┼─────────────────────────────────────────┘           
                                                   │                                                     
                                                   │                                                     
                                             GRPC Frame                                                  
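
For readers unfamiliar with the "GRPC Frame" boxes in the diagram, here is a minimal sketch of the length-prefixed framing, under the assumption that messages are uncompressed. It mirrors the `pack('CN', ...)` call in the `write()` method below; the function names `grpcEncodeFrame` and `grpcDecodeFrames` and the string payloads are hypothetical and only stand in for serialized protobuf messages.

```php
<?php

declare(strict_types=1);

/**
 * Wraps a serialized message in a gRPC length-prefixed frame:
 * 1 byte compressed flag, 4 bytes big-endian length, then the payload.
 */
function grpcEncodeFrame(string $message, bool $compressed = false): string
{
    return pack('CN', $compressed ? 1 : 0, strlen($message)) . $message;
}

/**
 * Extracts every complete frame from $buffer and leaves any trailing
 * partial frame in $buffer for the next call.
 *
 * @return list<string> payloads of the complete frames
 */
function grpcDecodeFrames(string &$buffer): array
{
    $messages = [];
    while (strlen($buffer) >= 5) {
        $header = unpack('Cflag/Nlength', $buffer);
        if (strlen($buffer) < 5 + $header['length']) {
            break; // payload not fully received yet
        }
        $messages[] = substr($buffer, 5, $header['length']);
        $buffer = substr($buffer, 5 + $header['length']);
    }

    return $messages;
}

// Hypothetical payloads standing in for serialized protobuf messages.
$wire = grpcEncodeFrame('Maxime') . grpcEncodeFrame('Elodie');

// Simulate the bytes arriving in two arbitrary chunks.
$buffer = substr($wire, 0, 8);
$first = grpcDecodeFrames($buffer);  // header seen, payload incomplete
$buffer .= substr($wire, 8);
$second = grpcDecodeFrames($buffer); // both frames are now complete
```

The same layout is visible in the stack trace further down: the value passed to `Fiber->resume()` starts with a zero flag byte and a 4-byte length of 10, followed by the 10-byte protobuf payload for "Clément".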
                                                                                                         
This is why I have this code (simplified here for the purpose of this issue):
<?php

namespace Grpc;

use Amp\ByteStream\ReadableIterableStream;
use Amp\Future;
use Amp\Http\Client\HttpClient;
use Amp\Http\Client\Request;
use Amp\Http\Client\StreamedContent;
use Amp\Pipeline\Queue;
use Grpc\Protocol\GrpcStreamReader;
use function Amp\async;

class BidiStreamingCall
{
    private Request $request;
    private ?Future $response = null;
    private Queue $writeStream;
    public function __construct(
        /** @var array{0: class-string, 1: 'decode'} */
        private array $reply,
        private HttpClient $httpClient,
        private string $path
    )
    {
        $this->writeStream = new Queue();
        $this->request = new Request(
            $this->path,
            'POST',
            StreamedContent::fromStream(new ReadableIterableStream($this->writeStream->iterate()))
        );
        $this->request->setProtocolVersions(['2']);
        $this->request->addHeader('Content-Type', 'application/grpc+proto');
        $this->request->addHeader('TE', 'trailers');
    }

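    /** Lazily starts the HTTP/2 request once and reuses the same Future afterwards. */
    private function start(): Future
    {
        return $this->response ??= async(fn () => $this->httpClient->request($this->request));
    }

    public function read(): \Generator
    {
        return (new GrpcStreamReader($this->reply[0], $this->start()->await()))();
    }

    public function write(\Google\Protobuf\Internal\Message $data): void
    {
        // Make sure the request is running so the body stream has a consumer,
        // since push() may suspend until the value is consumed.
        $this->start();

        // gRPC frame: 1-byte compressed flag (0) + 4-byte big-endian length + payload.
        $payload = $data->serializeToString();
        $payload = pack('CN', 0, strlen($payload)).$payload;
        $this->writeStream->push($payload);
    }

    /** Completes the request body stream once the client has no more messages to send. */
    public function end(): void
    {
        $this->writeStream->complete();
    }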
}


$call = $client->SayHelloBidiStream();

$greets = ['Maxime', 'Elodie', 'Clément', 'Gabriel'];

\Amp\Future\await([
    \Amp\async(function () use ($call) {
        // read() returns a Generator, so iterate it instead of testing it in a while loop.
        foreach ($call->read() as $response) {
            echo $response->getMessage() . PHP_EOL;
        }
    }),
    \Amp\async(function () use ($greets, $call) {
        foreach ($greets as $greet) {
            $request = new \Helloworld\HelloRequest();
            $request->setName($greet);

            // !! The second call to write is failing!
            $call->write($request);
            \Amp\delay(5);
        }
        $call->end();
    })
]);

The actual issue

Basically, I cannot "write" once streaming of the response has started, because by then the request phase is already "ResponseBody".
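
To illustrate the kind of check that seems to be rejecting the write, here is a deliberately simplified model. It is not the amphp/http-client source: the `Phase` enum and `LinearPhaseTracker` class are invented names, and the only assumption carried over from the error is that request-body progress is refused once the phase has reached "ResponseBody".

```php
<?php

declare(strict_types=1);

// Hypothetical names throughout: an illustrative model, not library code.
enum Phase: int
{
    case RequestHeaders = 0;
    case RequestBody = 1;
    case ResponseHeaders = 2;
    case ResponseBody = 3;
}

final class LinearPhaseTracker
{
    private Phase $phase = Phase::RequestHeaders;

    public function requestBodyProgress(): void
    {
        // A strictly linear model only accepts body writes before the response starts.
        if ($this->phase->value > Phase::RequestBody->value) {
            throw new \Error('Invalid request phase: ' . $this->phase->name);
        }
        $this->phase = Phase::RequestBody;
    }

    public function responseBodyStart(): void
    {
        $this->phase = Phase::ResponseBody;
    }
}

$tracker = new LinearPhaseTracker();
$tracker->requestBodyProgress(); // first message written: accepted
$tracker->responseBodyStart();   // server starts streaming its reply

try {
    $tracker->requestBodyProgress(); // second message written: rejected
    $error = null;
} catch (\Error $e) {
    $error = $e->getMessage();
}
```

With bi-directional streaming the request body and the response body overlap in time, so a model that assumes the request is fully sent before the response begins cannot represent this call.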

The error is the following:
  PHP Fatal error:  Uncaught Error: Invalid request phase: ResponseBody in ...../vendor/amphp/http-client/src/Internal/EventInvoker.php:163
  Stack trace:
  #0 ...../vendor/amphp/http-client/src/Connection/Internal/Http2ConnectionProcessor.php(1049): Amp\Http\Client\Internal\EventInvoker->requestBodyProgress(Object(Amp\Http\Client\Request), Object(Amp\Http\Client\Connection\HttpStream))
  #1 ...../vendor/amphp/http-client/src/Connection/Http2Connection.php(104): Amp\Http\Client\Connection\Internal\Http2ConnectionProcessor->request(Object(Amp\Http\Client\Request), Object(Amp\CompositeCancellation), Object(Amp\Http\Client\Connection\HttpStream))
  #2 ...../vendor/amphp/http-client/src/Connection/HttpStream.php(96): Amp\Http\Client\Connection\Http2Connection->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\HttpStream))
  #3 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\HttpStream->{closure:Amp\Http\Client\Connection\HttpStream::request():96}(Object(Amp\Http\Client\Request))
  #4 ...../vendor/amphp/http-client/src/Connection/HttpStream.php(96): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #5 ...../vendor/amphp/http-client/src/Connection/ConnectionLimitingPool.php(131): Amp\Http\Client\Connection\HttpStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #6 ...../vendor/amphp/http-client/src/Connection/HttpStream.php(96): Amp\Http\Client\Connection\ConnectionLimitingPool->{closure:Amp\Http\Client\Connection\ConnectionLimitingPool::getStream():124}(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\HttpStream))
  #7 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\HttpStream->{closure:Amp\Http\Client\Connection\HttpStream::request():96}(Object(Amp\Http\Client\Request))
  #8 ...../vendor/amphp/http-client/src/Connection/HttpStream.php(96): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #9 ...../vendor/amphp/http-client/src/Interceptor/DecompressResponse.php(44): Amp\Http\Client\Connection\HttpStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #10 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(54): Amp\Http\Client\Interceptor\DecompressResponse->requestViaNetwork(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\HttpStream))
  #11 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\InterceptedStream->{closure:Amp\Http\Client\Connection\InterceptedStream::request():36}(Object(Amp\Http\Client\Request))
  #12 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(36): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #13 ...../vendor/amphp/http-client/src/Interceptor/ModifyRequest.php(36): Amp\Http\Client\Connection\InterceptedStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #14 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(54): Amp\Http\Client\Interceptor\ModifyRequest->requestViaNetwork(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\InterceptedStream))
  #15 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\InterceptedStream->{closure:Amp\Http\Client\Connection\InterceptedStream::request():36}(Object(Amp\Http\Client\Request))
  #16 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(36): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #17 ...../vendor/amphp/http-client/src/Interceptor/ModifyRequest.php(36): Amp\Http\Client\Connection\InterceptedStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #18 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(54): Amp\Http\Client\Interceptor\ModifyRequest->requestViaNetwork(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\InterceptedStream))
  #19 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\InterceptedStream->{closure:Amp\Http\Client\Connection\InterceptedStream::request():36}(Object(Amp\Http\Client\Request))
  #20 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(36): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #21 ...../vendor/amphp/http-client/src/Interceptor/ModifyRequest.php(36): Amp\Http\Client\Connection\InterceptedStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #22 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(54): Amp\Http\Client\Interceptor\ModifyRequest->requestViaNetwork(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\InterceptedStream))
  #23 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\InterceptedStream->{closure:Amp\Http\Client\Connection\InterceptedStream::request():36}(Object(Amp\Http\Client\Request))
  #24 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(36): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #25 ...../vendor/amphp/http-client/src/PooledHttpClient.php(38): Amp\Http\Client\Connection\InterceptedStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #26 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\PooledHttpClient->{closure:Amp\Http\Client\PooledHttpClient::request():29}(Object(Amp\Http\Client\Request))
  #27 ...../vendor/amphp/http-client/src/PooledHttpClient.php(29): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #28 ...../vendor/amphp/http-client/src/Interceptor/RetryRequests.php(34): Amp\Http\Client\PooledHttpClient->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #29 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(40): Amp\Http\Client\Interceptor\RetryRequests->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\PooledHttpClient))
  #30 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\InterceptedHttpClient->{closure:Amp\Http\Client\InterceptedHttpClient::request():28}(Object(Amp\Http\Client\Request))
  #31 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(28): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #32 ...../vendor/amphp/http-client/src/Interceptor/FollowRedirects.php(131): Amp\Http\Client\InterceptedHttpClient->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #33 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(40): Amp\Http\Client\Interceptor\FollowRedirects->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\InterceptedHttpClient))
  #34 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\InterceptedHttpClient->{closure:Amp\Http\Client\InterceptedHttpClient::request():28}(Object(Amp\Http\Client\Request))
  #35 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(28): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #36 ...../vendor/amphp/http-client/src/Interceptor/ModifyRequest.php(48): Amp\Http\Client\InterceptedHttpClient->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #37 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(40): Amp\Http\Client\Interceptor\ModifyRequest->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\InterceptedHttpClient))
  #38 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\InterceptedHttpClient->{closure:Amp\Http\Client\InterceptedHttpClient::request():28}(Object(Amp\Http\Client\Request))
  #39 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(28): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #40 ...../vendor/amphp/http-client/src/HttpClient.php(34): Amp\Http\Client\InterceptedHttpClient->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #41 ...../vendor/amphp/http-client/src/functions.php(30): Amp\Http\Client\HttpClient->{closure:Amp\Http\Client\HttpClient::request():34}(Object(Amp\Http\Client\Request))
  #42 ...../vendor/amphp/http-client/src/HttpClient.php(31): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #43 ...../lib/swag-industries/grpc/src/BidiStreamingCall.php(64): Amp\Http\Client\HttpClient->request(Object(Amp\Http\Client\Request))
  #44 ...../vendor/amphp/amp/src/functions.php(33): Grpc\BidiStreamingCall->{closure:Grpc\BidiStreamingCall::read():64}()
  #45 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(430): {closure:Amp\async():23}(NULL, NULL, Array)
  #46 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(567): Revolt\EventLoop\Internal\AbstractDriver->invokeMicrotasks()
  #47 [internal function]: Revolt\EventLoop\Internal\AbstractDriver->{closure:Revolt\EventLoop\Internal\AbstractDriver::createCallbackFiber():565}()
  #48 ...../vendor/revolt/event-loop/src/EventLoop/Internal/DriverSuspension.php(64): Fiber->resume('\x00\x00\x00\x00\n\n\x08Cl\xC3\xA9ment')
  #49 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(430): Revolt\EventLoop\Internal\DriverSuspension::{closure:Revolt\EventLoop\Internal\DriverSuspension::resume():61}()
  #50 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(621): Revolt\EventLoop\Internal\AbstractDriver->invokeMicrotasks()
  #51 [internal function]: Revolt\EventLoop\Internal\AbstractDriver->{closure:Revolt\EventLoop\Internal\AbstractDriver::createCallbackFiber():565}()
  #52 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(502): Fiber->resume()
  #53 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(558): Revolt\EventLoop\Internal\AbstractDriver->invokeCallbacks()
  #54 [internal function]: Revolt\EventLoop\Internal\AbstractDriver->{closure:Revolt\EventLoop\Internal\AbstractDriver::createLoopFiber():538}()
  #55 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(96): Fiber->start()
  #56 ...../vendor/revolt/event-loop/src/EventLoop/Internal/DriverSuspension.php(117): Revolt\EventLoop\Internal\AbstractDriver->{closure:Revolt\EventLoop\Internal\AbstractDriver::__construct():90}()
  #57 ...../vendor/amphp/amp/src/Internal/FutureIterator.php(133): Revolt\EventLoop\Internal\DriverSuspension->suspend()
  #58 ...../vendor/amphp/amp/src/Future.php(59): Amp\Internal\FutureIterator->consume()
  #59 ...../vendor/amphp/amp/src/Future/functions.php(151): Amp\Future::iterate(Array, NULL)
  #60 ...../src/client/client.php(55): Amp\Future\await(Array)
  #61 ...../src/client/client.php(93): manygreet('localhost:50052', 'world')
  #62 {main}

  Next Amp\Http\Client\HttpException: Failed to write request (stream 1) to socket: Invalid request phase: ResponseBody in ...../vendor/amphp/http-client/src/Connection/Internal/Http2ConnectionProcessor.php:1622
  Stack trace:
  #0 ...../vendor/amphp/http-client/src/Connection/Internal/Http2ConnectionProcessor.php(1060): Amp\Http\Client\Connection\Internal\Http2ConnectionProcessor->wrapException(Object(Error), 'Failed to write...')
  #1 ...../vendor/amphp/http-client/src/Connection/Http2Connection.php(104): Amp\Http\Client\Connection\Internal\Http2ConnectionProcessor->request(Object(Amp\Http\Client\Request), Object(Amp\CompositeCancellation), Object(Amp\Http\Client\Connection\HttpStream))
  #2 ...../vendor/amphp/http-client/src/Connection/HttpStream.php(96): Amp\Http\Client\Connection\Http2Connection->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\HttpStream))
  #3 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\HttpStream->{closure:Amp\Http\Client\Connection\HttpStream::request():96}(Object(Amp\Http\Client\Request))
  #4 ...../vendor/amphp/http-client/src/Connection/HttpStream.php(96): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #5 ...../vendor/amphp/http-client/src/Connection/ConnectionLimitingPool.php(131): Amp\Http\Client\Connection\HttpStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #6 ...../vendor/amphp/http-client/src/Connection/HttpStream.php(96): Amp\Http\Client\Connection\ConnectionLimitingPool->{closure:Amp\Http\Client\Connection\ConnectionLimitingPool::getStream():124}(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\HttpStream))
  #7 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\HttpStream->{closure:Amp\Http\Client\Connection\HttpStream::request():96}(Object(Amp\Http\Client\Request))
  #8 ...../vendor/amphp/http-client/src/Connection/HttpStream.php(96): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #9 ...../vendor/amphp/http-client/src/Interceptor/DecompressResponse.php(44): Amp\Http\Client\Connection\HttpStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #10 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(54): Amp\Http\Client\Interceptor\DecompressResponse->requestViaNetwork(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\HttpStream))
  #11 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\InterceptedStream->{closure:Amp\Http\Client\Connection\InterceptedStream::request():36}(Object(Amp\Http\Client\Request))
  #12 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(36): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #13 ...../vendor/amphp/http-client/src/Interceptor/ModifyRequest.php(36): Amp\Http\Client\Connection\InterceptedStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #14 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(54): Amp\Http\Client\Interceptor\ModifyRequest->requestViaNetwork(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\InterceptedStream))
  #15 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\InterceptedStream->{closure:Amp\Http\Client\Connection\InterceptedStream::request():36}(Object(Amp\Http\Client\Request))
  #16 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(36): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #17 ...../vendor/amphp/http-client/src/Interceptor/ModifyRequest.php(36): Amp\Http\Client\Connection\InterceptedStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #18 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(54): Amp\Http\Client\Interceptor\ModifyRequest->requestViaNetwork(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\InterceptedStream))
  #19 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\InterceptedStream->{closure:Amp\Http\Client\Connection\InterceptedStream::request():36}(Object(Amp\Http\Client\Request))
  #20 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(36): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #21 ...../vendor/amphp/http-client/src/Interceptor/ModifyRequest.php(36): Amp\Http\Client\Connection\InterceptedStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #22 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(54): Amp\Http\Client\Interceptor\ModifyRequest->requestViaNetwork(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\InterceptedStream))
  #23 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\InterceptedStream->{closure:Amp\Http\Client\Connection\InterceptedStream::request():36}(Object(Amp\Http\Client\Request))
  #24 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(36): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #25 ...../vendor/amphp/http-client/src/PooledHttpClient.php(38): Amp\Http\Client\Connection\InterceptedStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #26 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\PooledHttpClient->{closure:Amp\Http\Client\PooledHttpClient::request():29}(Object(Amp\Http\Client\Request))
  #27 ...../vendor/amphp/http-client/src/PooledHttpClient.php(29): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #28 ...../vendor/amphp/http-client/src/Interceptor/RetryRequests.php(34): Amp\Http\Client\PooledHttpClient->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #29 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(40): Amp\Http\Client\Interceptor\RetryRequests->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\PooledHttpClient))
  #30 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\InterceptedHttpClient->{closure:Amp\Http\Client\InterceptedHttpClient::request():28}(Object(Amp\Http\Client\Request))
  #31 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(28): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #32 ...../vendor/amphp/http-client/src/Interceptor/FollowRedirects.php(131): Amp\Http\Client\InterceptedHttpClient->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #33 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(40): Amp\Http\Client\Interceptor\FollowRedirects->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\InterceptedHttpClient))
  #34 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\InterceptedHttpClient->{closure:Amp\Http\Client\InterceptedHttpClient::request():28}(Object(Amp\Http\Client\Request))
  #35 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(28): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #36 ...../vendor/amphp/http-client/src/Interceptor/ModifyRequest.php(48): Amp\Http\Client\InterceptedHttpClient->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #37 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(40): Amp\Http\Client\Interceptor\ModifyRequest->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\InterceptedHttpClient))
  #38 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\InterceptedHttpClient->{closure:Amp\Http\Client\InterceptedHttpClient::request():28}(Object(Amp\Http\Client\Request))
  #39 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(28): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #40 ...../vendor/amphp/http-client/src/HttpClient.php(34): Amp\Http\Client\InterceptedHttpClient->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #41 ...../vendor/amphp/http-client/src/functions.php(30): Amp\Http\Client\HttpClient->{closure:Amp\Http\Client\HttpClient::request():34}(Object(Amp\Http\Client\Request))
  #42 ...../vendor/amphp/http-client/src/HttpClient.php(31): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #43 ...../lib/swag-industries/grpc/src/BidiStreamingCall.php(64): Amp\Http\Client\HttpClient->request(Object(Amp\Http\Client\Request))
  #44 ...../vendor/amphp/amp/src/functions.php(33): Grpc\BidiStreamingCall->{closure:Grpc\BidiStreamingCall::read():64}()
  #45 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(430): {closure:Amp\async():23}(NULL, NULL, Array)
  #46 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(567): Revolt\EventLoop\Internal\AbstractDriver->invokeMicrotasks()
  #47 [internal function]: Revolt\EventLoop\Internal\AbstractDriver->{closure:Revolt\EventLoop\Internal\AbstractDriver::createCallbackFiber():565}()
  #48 ...../vendor/revolt/event-loop/src/EventLoop/Internal/DriverSuspension.php(64): Fiber->resume('\x00\x00\x00\x00\n\n\x08Cl\xC3\xA9ment')
  #49 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(430): Revolt\EventLoop\Internal\DriverSuspension::{closure:Revolt\EventLoop\Internal\DriverSuspension::resume():61}()
  #50 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(621): Revolt\EventLoop\Internal\AbstractDriver->invokeMicrotasks()
  #51 [internal function]: Revolt\EventLoop\Internal\AbstractDriver->{closure:Revolt\EventLoop\Internal\AbstractDriver::createCallbackFiber():565}()
  #52 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(502): Fiber->resume()
  #53 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(558): Revolt\EventLoop\Internal\AbstractDriver->invokeCallbacks()
  #54 [internal function]: Revolt\EventLoop\Internal\AbstractDriver->{closure:Revolt\EventLoop\Internal\AbstractDriver::createLoopFiber():538}()
  #55 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(96): Fiber->start()
  #56 ...../vendor/revolt/event-loop/src/EventLoop/Internal/DriverSuspension.php(117): Revolt\EventLoop\Internal\AbstractDriver->{closure:Revolt\EventLoop\Internal\AbstractDriver::__construct():90}()
  #57 ...../vendor/amphp/amp/src/Internal/FutureIterator.php(133): Revolt\EventLoop\Internal\DriverSuspension->suspend()
  #58 ...../vendor/amphp/amp/src/Future.php(59): Amp\Internal\FutureIterator->consume()
  #59 ...../vendor/amphp/amp/src/Future/functions.php(151): Amp\Future::iterate(Array, NULL)
  #60 ...../src/client/client.php(55): Amp\Future\await(Array)
  #61 ...../src/client/client.php(93): manygreet('localhost:50052', 'world')
  #62 {main}
    thrown in ...../vendor/amphp/http-client/src/Connection/Internal/Http2ConnectionProcessor.php on line 1622

  Fatal error: Uncaught Error: Invalid request phase: ResponseBody in ...../vendor/amphp/http-client/src/Internal/EventInvoker.php:163
  Stack trace:
  #0 ...../vendor/amphp/http-client/src/Connection/Internal/Http2ConnectionProcessor.php(1049): Amp\Http\Client\Internal\EventInvoker->requestBodyProgress(Object(Amp\Http\Client\Request), Object(Amp\Http\Client\Connection\HttpStream))
  #1 ...../vendor/amphp/http-client/src/Connection/Http2Connection.php(104): Amp\Http\Client\Connection\Internal\Http2ConnectionProcessor->request(Object(Amp\Http\Client\Request), Object(Amp\CompositeCancellation), Object(Amp\Http\Client\Connection\HttpStream))
  #2 ...../vendor/amphp/http-client/src/Connection/HttpStream.php(96): Amp\Http\Client\Connection\Http2Connection->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\HttpStream))
  #3 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\HttpStream->{closure:Amp\Http\Client\Connection\HttpStream::request():96}(Object(Amp\Http\Client\Request))
  #4 ...../vendor/amphp/http-client/src/Connection/HttpStream.php(96): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #5 ...../vendor/amphp/http-client/src/Connection/ConnectionLimitingPool.php(131): Amp\Http\Client\Connection\HttpStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #6 ...../vendor/amphp/http-client/src/Connection/HttpStream.php(96): Amp\Http\Client\Connection\ConnectionLimitingPool->{closure:Amp\Http\Client\Connection\ConnectionLimitingPool::getStream():124}(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\HttpStream))
  #7 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\HttpStream->{closure:Amp\Http\Client\Connection\HttpStream::request():96}(Object(Amp\Http\Client\Request))
  #8 ...../vendor/amphp/http-client/src/Connection/HttpStream.php(96): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #9 ...../vendor/amphp/http-client/src/Interceptor/DecompressResponse.php(44): Amp\Http\Client\Connection\HttpStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #10 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(54): Amp\Http\Client\Interceptor\DecompressResponse->requestViaNetwork(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\HttpStream))
  #11 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\InterceptedStream->{closure:Amp\Http\Client\Connection\InterceptedStream::request():36}(Object(Amp\Http\Client\Request))
  #12 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(36): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #13 ...../vendor/amphp/http-client/src/Interceptor/ModifyRequest.php(36): Amp\Http\Client\Connection\InterceptedStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #14 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(54): Amp\Http\Client\Interceptor\ModifyRequest->requestViaNetwork(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\InterceptedStream))
  #15 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\InterceptedStream->{closure:Amp\Http\Client\Connection\InterceptedStream::request():36}(Object(Amp\Http\Client\Request))
  #16 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(36): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #17 ...../vendor/amphp/http-client/src/Interceptor/ModifyRequest.php(36): Amp\Http\Client\Connection\InterceptedStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #18 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(54): Amp\Http\Client\Interceptor\ModifyRequest->requestViaNetwork(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\InterceptedStream))
  #19 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\InterceptedStream->{closure:Amp\Http\Client\Connection\InterceptedStream::request():36}(Object(Amp\Http\Client\Request))
  #20 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(36): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #21 ...../vendor/amphp/http-client/src/Interceptor/ModifyRequest.php(36): Amp\Http\Client\Connection\InterceptedStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #22 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(54): Amp\Http\Client\Interceptor\ModifyRequest->requestViaNetwork(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\InterceptedStream))
  #23 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\InterceptedStream->{closure:Amp\Http\Client\Connection\InterceptedStream::request():36}(Object(Amp\Http\Client\Request))
  #24 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(36): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #25 ...../vendor/amphp/http-client/src/PooledHttpClient.php(38): Amp\Http\Client\Connection\InterceptedStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #26 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\PooledHttpClient->{closure:Amp\Http\Client\PooledHttpClient::request():29}(Object(Amp\Http\Client\Request))
  #27 ...../vendor/amphp/http-client/src/PooledHttpClient.php(29): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #28 ...../vendor/amphp/http-client/src/Interceptor/RetryRequests.php(34): Amp\Http\Client\PooledHttpClient->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #29 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(40): Amp\Http\Client\Interceptor\RetryRequests->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\PooledHttpClient))
  #30 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\InterceptedHttpClient->{closure:Amp\Http\Client\InterceptedHttpClient::request():28}(Object(Amp\Http\Client\Request))
  #31 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(28): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #32 ...../vendor/amphp/http-client/src/Interceptor/FollowRedirects.php(131): Amp\Http\Client\InterceptedHttpClient->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #33 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(40): Amp\Http\Client\Interceptor\FollowRedirects->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\InterceptedHttpClient))
  #34 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\InterceptedHttpClient->{closure:Amp\Http\Client\InterceptedHttpClient::request():28}(Object(Amp\Http\Client\Request))
  #35 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(28): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #36 ...../vendor/amphp/http-client/src/Interceptor/ModifyRequest.php(48): Amp\Http\Client\InterceptedHttpClient->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #37 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(40): Amp\Http\Client\Interceptor\ModifyRequest->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\InterceptedHttpClient))
  #38 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\InterceptedHttpClient->{closure:Amp\Http\Client\InterceptedHttpClient::request():28}(Object(Amp\Http\Client\Request))
  #39 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(28): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #40 ...../vendor/amphp/http-client/src/HttpClient.php(34): Amp\Http\Client\InterceptedHttpClient->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #41 ...../vendor/amphp/http-client/src/functions.php(30): Amp\Http\Client\HttpClient->{closure:Amp\Http\Client\HttpClient::request():34}(Object(Amp\Http\Client\Request))
  #42 ...../vendor/amphp/http-client/src/HttpClient.php(31): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #43 ...../lib/swag-industries/grpc/src/BidiStreamingCall.php(64): Amp\Http\Client\HttpClient->request(Object(Amp\Http\Client\Request))
  #44 ...../vendor/amphp/amp/src/functions.php(33): Grpc\BidiStreamingCall->{closure:Grpc\BidiStreamingCall::read():64}()
  #45 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(430): {closure:Amp\async():23}(NULL, NULL, Array)
  #46 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(567): Revolt\EventLoop\Internal\AbstractDriver->invokeMicrotasks()
  #47 [internal function]: Revolt\EventLoop\Internal\AbstractDriver->{closure:Revolt\EventLoop\Internal\AbstractDriver::createCallbackFiber():565}()
  #48 ...../vendor/revolt/event-loop/src/EventLoop/Internal/DriverSuspension.php(64): Fiber->resume('\x00\x00\x00\x00\n\n\x08Cl\xC3\xA9ment')
  #49 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(430): Revolt\EventLoop\Internal\DriverSuspension::{closure:Revolt\EventLoop\Internal\DriverSuspension::resume():61}()
  #50 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(621): Revolt\EventLoop\Internal\AbstractDriver->invokeMicrotasks()
  #51 [internal function]: Revolt\EventLoop\Internal\AbstractDriver->{closure:Revolt\EventLoop\Internal\AbstractDriver::createCallbackFiber():565}()
  #52 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(502): Fiber->resume()
  #53 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(558): Revolt\EventLoop\Internal\AbstractDriver->invokeCallbacks()
  #54 [internal function]: Revolt\EventLoop\Internal\AbstractDriver->{closure:Revolt\EventLoop\Internal\AbstractDriver::createLoopFiber():538}()
  #55 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(96): Fiber->start()
  #56 ...../vendor/revolt/event-loop/src/EventLoop/Internal/DriverSuspension.php(117): Revolt\EventLoop\Internal\AbstractDriver->{closure:Revolt\EventLoop\Internal\AbstractDriver::__construct():90}()
  #57 ...../vendor/amphp/amp/src/Internal/FutureIterator.php(133): Revolt\EventLoop\Internal\DriverSuspension->suspend()
  #58 ...../vendor/amphp/amp/src/Future.php(59): Amp\Internal\FutureIterator->consume()
  #59 ...../vendor/amphp/amp/src/Future/functions.php(151): Amp\Future::iterate(Array, NULL)
  #60 ...../src/client/client.php(55): Amp\Future\await(Array)
  #61 ...../src/client/client.php(93): manygreet('localhost:50052', 'world')
  #62 {main}

  Next Amp\Http\Client\HttpException: Failed to write request (stream 1) to socket: Invalid request phase: ResponseBody in ...../vendor/amphp/http-client/src/Connection/Internal/Http2ConnectionProcessor.php:1622
  Stack trace:
  #0 ...../vendor/amphp/http-client/src/Connection/Internal/Http2ConnectionProcessor.php(1060): Amp\Http\Client\Connection\Internal\Http2ConnectionProcessor->wrapException(Object(Error), 'Failed to write...')
  #1 ...../vendor/amphp/http-client/src/Connection/Http2Connection.php(104): Amp\Http\Client\Connection\Internal\Http2ConnectionProcessor->request(Object(Amp\Http\Client\Request), Object(Amp\CompositeCancellation), Object(Amp\Http\Client\Connection\HttpStream))
  #2 ...../vendor/amphp/http-client/src/Connection/HttpStream.php(96): Amp\Http\Client\Connection\Http2Connection->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\HttpStream))
  #3 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\HttpStream->{closure:Amp\Http\Client\Connection\HttpStream::request():96}(Object(Amp\Http\Client\Request))
  #4 ...../vendor/amphp/http-client/src/Connection/HttpStream.php(96): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #5 ...../vendor/amphp/http-client/src/Connection/ConnectionLimitingPool.php(131): Amp\Http\Client\Connection\HttpStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #6 ...../vendor/amphp/http-client/src/Connection/HttpStream.php(96): Amp\Http\Client\Connection\ConnectionLimitingPool->{closure:Amp\Http\Client\Connection\ConnectionLimitingPool::getStream():124}(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\HttpStream))
  #7 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\HttpStream->{closure:Amp\Http\Client\Connection\HttpStream::request():96}(Object(Amp\Http\Client\Request))
  #8 ...../vendor/amphp/http-client/src/Connection/HttpStream.php(96): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #9 ...../vendor/amphp/http-client/src/Interceptor/DecompressResponse.php(44): Amp\Http\Client\Connection\HttpStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #10 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(54): Amp\Http\Client\Interceptor\DecompressResponse->requestViaNetwork(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\HttpStream))
  #11 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\InterceptedStream->{closure:Amp\Http\Client\Connection\InterceptedStream::request():36}(Object(Amp\Http\Client\Request))
  #12 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(36): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #13 ...../vendor/amphp/http-client/src/Interceptor/ModifyRequest.php(36): Amp\Http\Client\Connection\InterceptedStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #14 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(54): Amp\Http\Client\Interceptor\ModifyRequest->requestViaNetwork(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\InterceptedStream))
  #15 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\InterceptedStream->{closure:Amp\Http\Client\Connection\InterceptedStream::request():36}(Object(Amp\Http\Client\Request))
  #16 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(36): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #17 ...../vendor/amphp/http-client/src/Interceptor/ModifyRequest.php(36): Amp\Http\Client\Connection\InterceptedStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #18 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(54): Amp\Http\Client\Interceptor\ModifyRequest->requestViaNetwork(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\InterceptedStream))
  #19 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\InterceptedStream->{closure:Amp\Http\Client\Connection\InterceptedStream::request():36}(Object(Amp\Http\Client\Request))
  #20 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(36): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #21 ...../vendor/amphp/http-client/src/Interceptor/ModifyRequest.php(36): Amp\Http\Client\Connection\InterceptedStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #22 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(54): Amp\Http\Client\Interceptor\ModifyRequest->requestViaNetwork(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\Connection\InterceptedStream))
  #23 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\Connection\InterceptedStream->{closure:Amp\Http\Client\Connection\InterceptedStream::request():36}(Object(Amp\Http\Client\Request))
  #24 ...../vendor/amphp/http-client/src/Connection/InterceptedStream.php(36): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #25 ...../vendor/amphp/http-client/src/PooledHttpClient.php(38): Amp\Http\Client\Connection\InterceptedStream->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #26 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\PooledHttpClient->{closure:Amp\Http\Client\PooledHttpClient::request():29}(Object(Amp\Http\Client\Request))
  #27 ...../vendor/amphp/http-client/src/PooledHttpClient.php(29): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #28 ...../vendor/amphp/http-client/src/Interceptor/RetryRequests.php(34): Amp\Http\Client\PooledHttpClient->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #29 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(40): Amp\Http\Client\Interceptor\RetryRequests->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\PooledHttpClient))
  #30 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\InterceptedHttpClient->{closure:Amp\Http\Client\InterceptedHttpClient::request():28}(Object(Amp\Http\Client\Request))
  #31 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(28): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #32 ...../vendor/amphp/http-client/src/Interceptor/FollowRedirects.php(131): Amp\Http\Client\InterceptedHttpClient->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #33 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(40): Amp\Http\Client\Interceptor\FollowRedirects->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\InterceptedHttpClient))
  #34 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\InterceptedHttpClient->{closure:Amp\Http\Client\InterceptedHttpClient::request():28}(Object(Amp\Http\Client\Request))
  #35 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(28): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #36 ...../vendor/amphp/http-client/src/Interceptor/ModifyRequest.php(48): Amp\Http\Client\InterceptedHttpClient->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #37 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(40): Amp\Http\Client\Interceptor\ModifyRequest->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation), Object(Amp\Http\Client\InterceptedHttpClient))
  #38 ...../vendor/amphp/http-client/src/functions.php(20): Amp\Http\Client\InterceptedHttpClient->{closure:Amp\Http\Client\InterceptedHttpClient::request():28}(Object(Amp\Http\Client\Request))
  #39 ...../vendor/amphp/http-client/src/InterceptedHttpClient.php(28): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #40 ...../vendor/amphp/http-client/src/HttpClient.php(34): Amp\Http\Client\InterceptedHttpClient->request(Object(Amp\Http\Client\Request), Object(Amp\NullCancellation))
  #41 ...../vendor/amphp/http-client/src/functions.php(30): Amp\Http\Client\HttpClient->{closure:Amp\Http\Client\HttpClient::request():34}(Object(Amp\Http\Client\Request))
  #42 ...../vendor/amphp/http-client/src/HttpClient.php(31): Amp\Http\Client\processRequest(Object(Amp\Http\Client\Request), Array, Object(Closure))
  #43 ...../lib/swag-industries/grpc/src/BidiStreamingCall.php(64): Amp\Http\Client\HttpClient->request(Object(Amp\Http\Client\Request))
  #44 ...../vendor/amphp/amp/src/functions.php(33): Grpc\BidiStreamingCall->{closure:Grpc\BidiStreamingCall::read():64}()
  #45 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(430): {closure:Amp\async():23}(NULL, NULL, Array)
  #46 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(567): Revolt\EventLoop\Internal\AbstractDriver->invokeMicrotasks()
  #47 [internal function]: Revolt\EventLoop\Internal\AbstractDriver->{closure:Revolt\EventLoop\Internal\AbstractDriver::createCallbackFiber():565}()
  #48 ...../vendor/revolt/event-loop/src/EventLoop/Internal/DriverSuspension.php(64): Fiber->resume('\x00\x00\x00\x00\n\n\x08Cl\xC3\xA9ment')
  #49 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(430): Revolt\EventLoop\Internal\DriverSuspension::{closure:Revolt\EventLoop\Internal\DriverSuspension::resume():61}()
  #50 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(621): Revolt\EventLoop\Internal\AbstractDriver->invokeMicrotasks()
  #51 [internal function]: Revolt\EventLoop\Internal\AbstractDriver->{closure:Revolt\EventLoop\Internal\AbstractDriver::createCallbackFiber():565}()
  #52 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(502): Fiber->resume()
  #53 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(558): Revolt\EventLoop\Internal\AbstractDriver->invokeCallbacks()
  #54 [internal function]: Revolt\EventLoop\Internal\AbstractDriver->{closure:Revolt\EventLoop\Internal\AbstractDriver::createLoopFiber():538}()
  #55 ...../vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php(96): Fiber->start()
  #56 ...../vendor/revolt/event-loop/src/EventLoop/Internal/DriverSuspension.php(117): Revolt\EventLoop\Internal\AbstractDriver->{closure:Revolt\EventLoop\Internal\AbstractDriver::__construct():90}()
  #57 ...../vendor/amphp/amp/src/Internal/FutureIterator.php(133): Revolt\EventLoop\Internal\DriverSuspension->suspend()
  #58 ...../vendor/amphp/amp/src/Future.php(59): Amp\Internal\FutureIterator->consume()
  #59 ...../vendor/amphp/amp/src/Future/functions.php(151): Amp\Future::iterate(Array, NULL)
  #60 ...../src/client/client.php(55): Amp\Future\await(Array)
  #61 ...../src/client/client.php(93): manygreet('localhost:50052', 'world')
  #62 {main}
    thrown in ...../vendor/amphp/http-client/src/Connection/Internal/Http2ConnectionProcessor.php on line 1622

Fix hints

The response needs to be writable while the request body is not finished yet

I have no definitive fix for now, but I started investigating the issue. It seems that this line may need to change:

if ($previousPhase !== Phase::RequestBody) {

To this:

  public function requestBodyProgress(Request $request, Stream $stream): void
  {
      $previousPhase = self::getPhase($request);

      // requestBody can be processed while responsebody is in process.
      if (!in_array($previousPhase, [Phase::RequestBody, Phase::ResponseBody], true)) {
          throw new \Error('Invalid request phase: ' . $previousPhase->name);
      }

      $this->invoke($request, fn (EventListener $eventListener) => $eventListener->requestBodyProgress($request, $stream));
  }

I'm not sure what the downsides are at this point.
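To make the intent of the relaxed check easier to discuss, here is a minimal, self-contained sketch of the same condition outside the library. The `Phase` enum below is a hypothetical stand-in (only the `RequestBody` and `ResponseBody` case names come from this issue), and `mayReportRequestBodyProgress()` is a name I made up for illustration; it is not part of amphp/http-client.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for the library's Phase enum. Only RequestBody and
// ResponseBody are names taken from this issue; the other cases are illustrative.
enum Phase
{
    case RequestHeaders;
    case RequestBody;
    case ResponseHeaders;
    case ResponseBody;
}

// Illustrative helper (not a library function): returns true when a
// request-body progress event would be accepted in the given phase.
// With $allowDuringResponse = false it mirrors the current strict check,
// with true it mirrors the relaxed check proposed above.
function mayReportRequestBodyProgress(Phase $phase, bool $allowDuringResponse): bool
{
    $allowed = $allowDuringResponse
        ? [Phase::RequestBody, Phase::ResponseBody]
        : [Phase::RequestBody];

    // Strict comparison works because enum cases are singletons.
    return in_array($phase, $allowed, true);
}

// Strict check rejects progress once the response body has started.
var_dump(mayReportRequestBodyProgress(Phase::ResponseBody, false));
// Relaxed check accepts it, which is what bidirectional streaming needs.
var_dump(mayReportRequestBodyProgress(Phase::ResponseBody, true));
```

The relaxed variant still rejects every phase other than these two, so it only widens the check for the overlap case described in this issue.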

Hmm. I found that the response body is never filled even though the server has started to answer. This seems to be a separate problem :/

The request body needs to be streamable

I think the following code is blocking the streaming of the request:

$body = $request->getBody()->getContent();
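To illustrate why collecting the whole body up front is a problem for bidirectional streaming, here is a small sketch that does not use any amphp class. It uses plain PHP generators, and `produceChunks()`, `sendBuffered()` and `sendStreamed()` are hypothetical names for illustration only. The log shows that the buffered variant writes nothing until the producer is finished, while the streamed variant writes each chunk as soon as it is produced.

```php
<?php
declare(strict_types=1);

// Hypothetical request-body source: yields chunks one at a time and
// records in $log when each chunk is produced.
function produceChunks(array &$log): Generator
{
    foreach (['a', 'b', 'c'] as $chunk) {
        $log[] = "produce $chunk";
        yield $chunk;
    }
}

// Buffered sending: the whole body is collected before anything is written.
// A producer that waits for a response before emitting its next chunk would
// never finish here, so nothing would ever be written.
function sendBuffered(iterable $body, array &$log): void
{
    $content = '';
    foreach ($body as $chunk) {
        $content .= $chunk;
    }
    $log[] = "write $content";
}

// Streamed sending: each chunk is written as soon as it is available.
function sendStreamed(iterable $body, array &$log): void
{
    foreach ($body as $chunk) {
        $log[] = "write $chunk";
    }
}

$bufferedLog = [];
sendBuffered(produceChunks($bufferedLog), $bufferedLog);

$streamedLog = [];
sendStreamed(produceChunks($streamedLog), $streamedLog);

print_r($bufferedLog);
print_r($streamedLog);
```

In a gRPC bidirectional call the client may decide what to send next based on what the server just answered, so only the streamed ordering can work there.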

Here is a test that may be interesting (it's a WIP):

In Http2ConnectionTest:

    public function testResponseIsFullyAsynchronous(): void
    {
        echo "init\n";
        $writeStream = new Queue();
        $writeContent = StreamedContent::fromStream(new ReadableIterableStream($writeStream->iterate()));
        $request = new Request('http://localhost/', 'POST', $writeContent);

        echo "request\n";
        $writeStream->push('something');

        events()->requestStart($request);

        echo "start\n";
        $stream = $this->connection->getStream($request);

        EventLoop::queue(function (): void {
            delay(0.1);

            $this->server->write(self::packFrame($this->hpack->encode([
                [":status", (string) HttpStatus::OK],
            ]), Http2Parser::HEADERS, Http2Parser::END_HEADERS, 1));

            delay(0.1);

            $this->server->write(self::packFrame('test', Http2Parser::DATA, 0, 1));

            delay(0.1);

        });

        echo "start request\n";
        $response = $stream->request($request, new NullCancellation);

        self::assertSame(200, $response->getStatus());

        echo "first read\n";
        $foo = $response->getBody()->read();
        self::assertSame('test', $foo);

        $writeStream->push('wololo');

        $this->server->write(self::packFrame('test2', Http2Parser::DATA, 0, 1));

        $foo = $response->getBody()->read();
        self::assertSame('test2', $foo);

        $writeStream->complete();
    }