Skip to content

Commit

Permalink
Fix empty request body on retries with compression enabled (#543)
Browse files Browse the repository at this point in the history
* fix empty request body on retries with compression enabled

Signed-off-by: merlinz01 <[email protected]>

* update changelog

Signed-off-by: merlinz01 <[email protected]>

* fix lint error

Signed-off-by: merlinz01 <[email protected]>

* update changelog

Signed-off-by: merlinz01 <[email protected]>

---------

Signed-off-by: merlinz01 <[email protected]>
  • Loading branch information
merlinz01 authored May 14, 2024
1 parent 41fd58f commit 9cb314d
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)

### Fixed

- Fixes empty request body on retry with compression enabled ([#543](https://github.com/opensearch-project/opensearch-go/pull/543))

### Security

### Dependencies
Expand Down
9 changes: 6 additions & 3 deletions opensearchtransport/opensearchtransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,9 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) {
}

req.GetBody = func() (io.ReadCloser, error) {
return io.NopCloser(buf), nil
// We have to return a new reader each time so that retries don't read from an already-consumed body.
reader := bytes.NewReader(buf.Bytes())
return io.NopCloser(reader), nil
}
//nolint:errcheck // error is always nil
req.Body, _ = req.GetBody()
Expand All @@ -258,8 +260,9 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) {
//nolint:errcheck // ignored as this is only for logging
buf.ReadFrom(req.Body)
req.GetBody = func() (io.ReadCloser, error) {
r := buf
return io.NopCloser(&r), nil
// Return a new reader each time
reader := bytes.NewReader(buf.Bytes())
return io.NopCloser(reader), nil
}
//nolint:errcheck // error is always nil
req.Body, _ = req.GetBody()
Expand Down
41 changes: 41 additions & 0 deletions opensearchtransport/opensearchtransport_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,47 @@ func TestTransportPerformRetries(t *testing.T) {
}
})

t.Run("Reset request body during retry with request body compression", func(t *testing.T) {
var bodies []string
u, _ := url.Parse("https://foo.com/bar")
tp, _ := New(
Config{
URLs: []*url.URL{u},
CompressRequestBody: true,
Transport: &mockTransp{
RoundTripFunc: func(req *http.Request) (*http.Response, error) {
body, err := io.ReadAll(req.Body)
if err != nil {
panic(err)
}
bodies = append(bodies, string(body))
return &http.Response{Status: "MOCK", StatusCode: http.StatusBadGateway}, nil
},
},
},
)

foobar := "FOOBAR"
foobarGzipped := "\x1f\x8b\b\x00\x00\x00\x00\x00\x00\xffr\xf3\xf7wr\f\x02\x04\x00\x00\xff\xff\x13\xd8\x0en\x06\x00\x00\x00"

req, _ := http.NewRequest(http.MethodPost, "/abc", strings.NewReader(foobar))
//nolint:bodyclose // Mock response does not have a body to close
res, err := tp.Perform(req)
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
_ = res

if n := len(bodies); n != 4 {
t.Fatalf("expected 4 requests, got %d", n)
}
for i, body := range bodies {
if body != foobarGzipped {
t.Fatalf("request %d body: expected %q, got %q", i, foobarGzipped, body)
}
}
})

t.Run("Don't retry request on regular error", func(t *testing.T) {
var i int

Expand Down

0 comments on commit 9cb314d

Please sign in to comment.