Skip to content

Commit

Permalink
fix(rest): do not override content type (#3024)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying authored Jul 19, 2024
1 parent 5e18d8b commit 4c4206d
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 23 deletions.
46 changes: 25 additions & 21 deletions internal/io/http/rest_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http/httptest"
"net/url"
"reflect"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -76,6 +77,9 @@ func TestRestSink_Apply(t *testing.T) {
config: map[string]interface{}{
"method": "post",
//"url": "http://localhost/test", //set dynamically to the test server
"headers": map[string]any{
"Content-Type": "application/vnd.microsoft.servicebus.json",
},
},
data: []map[string]interface{}{{
"ab": "hello1",
Expand All @@ -85,7 +89,7 @@ func TestRestSink_Apply(t *testing.T) {
result: []request{{
Method: "POST",
Body: `[{"ab":"hello1"},{"ab":"hello2"}]`,
ContentType: "application/json",
ContentType: "application/vnd.microsoft.servicebus.json",
}},
}, {
config: map[string]interface{}{
Expand Down Expand Up @@ -202,28 +206,28 @@ func TestRestSink_Apply(t *testing.T) {
tf, _ := transform.GenTransform("", "json", "", "", "", []string{})
defer ts.Close()
for i, tt := range tests {
requests = nil
ss, ok := tt.config["sendSingle"]
if !ok {
ss = false
}
s := &RestSink{}
tt.config["url"] = ts.URL
s.Configure(tt.config)
s.Open(ctx)
vCtx := context.WithValue(ctx, context.TransKey, tf)
if ss.(bool) {
for _, d := range tt.data {
s.Collect(vCtx, d)
t.Run(strconv.Itoa(i), func(t *testing.T) {
requests = nil
ss, ok := tt.config["sendSingle"]
if !ok {
ss = false
}
s := &RestSink{}
tt.config["url"] = ts.URL
s.Configure(tt.config)
s.Open(ctx)
vCtx := context.WithValue(ctx, context.TransKey, tf)
if ss.(bool) {
for _, d := range tt.data {
s.Collect(vCtx, d)
}
} else {
s.Collect(vCtx, tt.data)
}
} else {
s.Collect(vCtx, tt.data)
}

s.Close(ctx)
if !reflect.DeepEqual(tt.result, requests) {
t.Errorf("%d \tresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, requests)
}
s.Close(ctx)
assert.Equal(t, tt.result, requests)
})
}
}

Expand Down
8 changes: 6 additions & 2 deletions internal/pkg/httpx/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ func WithBody(body any, bodyType string, retErrOnConvertFailed bool, compressor
}
req.Body = rc
// set content type with body type
req.Header.Set("Content-Type", BodyTypeMap[bodyType])
if req.Header.Get("Content-Type") == "" {
req.Header.Set("Content-Type", BodyTypeMap[bodyType])
}
case "form":
form := url.Values{}
im, err := convertToMap(body, retErrOnConvertFailed)
Expand All @@ -136,7 +138,9 @@ func WithBody(body any, bodyType string, retErrOnConvertFailed bool, compressor

encodedFormBody := form.Encode()
req.Body = io.NopCloser(strings.NewReader(encodedFormBody))
req.Header.Set("Content-Type", "application/x-www-form-urlencoded;param=value")
if req.Header.Get("Content-Type") == "" {
req.Header.Set("Content-Type", "application/x-www-form-urlencoded;param=value")
}
default:
return fmt.Errorf("unsupported body type %s", bodyType)
}
Expand Down

0 comments on commit 4c4206d

Please sign in to comment.