Skip to content

Commit

Permalink
feat: add file upload
Browse files Browse the repository at this point in the history
  • Loading branch information
ernado committed Feb 3, 2025
1 parent 1157f8b commit 8087444
Show file tree
Hide file tree
Showing 18 changed files with 1,025 additions and 59 deletions.
36 changes: 35 additions & 1 deletion _oas/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,33 @@ info:
servers:
- url: 'https://localhost:8080'
paths:
/upload:
post:
operationId: "uploadFile"
description: "Upload a file"
requestBody:
required: true
content:
multipart/form-data:
schema:
type: object
required:
- file
properties:
file:
type: string
format: binary
iterations:
type: integer
responses:
200:
description: "File uploaded successfully"
content:
application/json:
schema:
$ref: "#/components/schemas/UploadResponse"
default:
$ref: "#/components/responses/Error"
/status:
get:
operationId: "status"
Expand Down Expand Up @@ -34,7 +61,14 @@ components:
message:
type: string
required: [ message ]

UploadResponse:
type: object
properties:
message:
type: string
hash:
type: string
required: [ message, hash ]
Error:
type: object
description: "error description"
Expand Down
5 changes: 4 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ services:
context: .
dockerfile: build.Dockerfile
restart: always
command: ["client"]
command:
- "client"
- --upload-rps=1
- --upload-hash-iterations=500
environment:
- SERVER_ADDR=http://server:8080
- OTEL_ZAP_TEE=0
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ toolchain go1.22.10
require (
github.com/go-faster/errors v0.7.1
github.com/go-faster/jx v1.1.0
github.com/go-faster/sdk v0.22.0
github.com/go-faster/sdk v0.23.0
github.com/ogen-go/ogen v1.9.0
github.com/rs/cors v1.11.1
github.com/spf13/cobra v1.8.1
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0
go.opentelemetry.io/otel v1.34.0
go.opentelemetry.io/otel/metric v1.34.0
go.opentelemetry.io/otel/trace v1.34.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.10.0
golang.org/x/time v0.9.0
)

require (
Expand Down Expand Up @@ -72,7 +74,6 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
go.uber.org/automaxprocs v1.6.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20230725093048-515e97ebf090 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/sys v0.29.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AY
github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo=
github.com/go-faster/jx v1.1.0 h1:ZsW3wD+snOdmTDy9eIVgQdjUpXRRV4rqW8NS3t+20bg=
github.com/go-faster/jx v1.1.0/go.mod h1:vKDNikrKoyUmpzaJ0OkIkRQClNHFX/nF3dnTJZb3skg=
github.com/go-faster/sdk v0.22.0 h1:s/Pq3lJ00MrHMoEUzeAemPz7kXzfETw2iN6outyOHoY=
github.com/go-faster/sdk v0.22.0/go.mod h1:UJWFlbuRJHmXJwl4JxStMbbIZtMAz4fxrD4CnuDXCIc=
github.com/go-faster/sdk v0.23.0 h1:A7+DHEUau3N8uzqh8GY2l3p0YuhdIGYxLF2p4EmYcWw=
github.com/go-faster/sdk v0.23.0/go.mod h1:UJWFlbuRJHmXJwl4JxStMbbIZtMAz4fxrD4CnuDXCIc=
github.com/go-faster/yaml v0.4.6 h1:lOK/EhI04gCpPgPhgt0bChS6bvw7G3WwI8xxVe0sw9I=
github.com/go-faster/yaml v0.4.6/go.mod h1:390dRIvV4zbnO7qC9FGo6YYutc+wyyUSHBgbXL52eXk=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
Expand Down Expand Up @@ -181,6 +181,8 @@ golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY=
golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.29.0 h1:Xx0h3TtM9rzQpQuR4dKLrdglAmCEN5Oi+P74JdhdzXE=
golang.org/x/tools v0.29.0/go.mod h1:KMQVMRsVxU6nHCFXrBPhDB8XncLNLM0lIy/F14RP588=
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f h1:gap6+3Gk41EItBuyi4XX/bp4oqJ3UwuIMl25yGinuAA=
Expand Down
150 changes: 126 additions & 24 deletions internal/cmd/client.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,37 @@
package cmd

import (
"bytes"
"context"
"crypto/sha256"
"fmt"
"math/rand"
"net/http"
"os"
"time"

"github.com/go-faster/errors"
sdka "github.com/go-faster/sdk/app"
"github.com/go-faster/sdk/zctx"
ohttp "github.com/ogen-go/ogen/http"
"github.com/spf13/cobra"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"

"github.com/go-faster/simon/internal/app"
"github.com/go-faster/simon/internal/oas"
)

func cmdClient() *cobra.Command {
return &cobra.Command{
var arg struct {
UploadRPS int
UploadHashIterations int
}
cmd := &cobra.Command{
Use: "client",
Short: "Run a HTTP client",
Run: func(cmd *cobra.Command, args []string) {
Expand All @@ -44,37 +57,126 @@ func cmdClient() *cobra.Command {
if err != nil {
return errors.Wrap(err, "client")
}
ticker := time.NewTicker(time.Second)
tracer := t.TracerProvider().Tracer("")
tick := func() {
ctx, cancel := context.WithTimeout(ctx, time.Millisecond*250)
defer cancel()
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
ticker := time.NewTicker(time.Second)
tracer := t.TracerProvider().Tracer("")
tick := func() {
ctx, cancel := context.WithTimeout(ctx, time.Millisecond*250)
defer cancel()

ctx, span := tracer.Start(ctx, "client.tick")
defer span.End()
ctx, span := tracer.Start(ctx, "client.tick")
defer span.End()

lg := zctx.From(ctx)
lg.Info("Sending request")
lg := zctx.From(ctx)
lg.Info("Sending request")

status, err := c.Status(ctx)
if err != nil {
lg.Error("Request failed", zap.Error(err))
return
status, err := c.Status(ctx)
if err != nil {
lg.Error("Request failed", zap.Error(err))
return
}
lg.Info("Request succeeded", zap.String("message", status.Message))
}
lg.Info("Request succeeded", zap.String("message", status.Message))
}
tick()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
tick()
tick()
for {
select {
case <-t.ShutdownContext().Done():
return ctx.Err()
case <-ticker.C:
tick()
}
}
}
})
g.Go(func() error {
// Uploads.
const burst = 1
limiter := rate.NewLimiter(rate.Limit(arg.UploadRPS), burst)
rnd := rand.New(rand.NewSource(10)) // #nosec G404

tracer := t.TracerProvider().Tracer("")

tick := func() error {
if err := limiter.Wait(ctx); err != nil {
return errors.Wrap(err, "limiter")
}

ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()

ctx, span := tracer.Start(ctx, "client.upload",
trace.WithAttributes(
attribute.Int("rps", arg.UploadRPS),
attribute.Int("hash_iterations", arg.UploadHashIterations),
),
)
defer span.End()

lg := zctx.From(ctx)
lg.Info("Uploading data")

// Generate payload.
const payloadSize = 1024 * 1024 * 1 // 1MB
payload := make([]byte, payloadSize)
if _, err := rnd.Read(payload); err != nil {
return errors.Wrap(err, "gen payload")
}

msg, err := c.UploadFile(ctx, &oas.UploadFileReq{
File: ohttp.MultipartFile{
Name: "random.bin",
Size: int64(len(payload)),
File: bytes.NewReader(payload),
},
Iterations: oas.NewOptInt(arg.UploadHashIterations),
})
if err != nil {
return errors.Wrap(err, "upload file")
}

lg.Info("Upload succeeded", zap.String("hash", msg.Hash))
// Verifying hash.
h := sha256.New()
for i := 0; i < arg.UploadHashIterations; i++ {
if _, err := h.Write(payload); err != nil {
return errors.Wrap(err, "write")
}
}
gotHash := fmt.Sprintf("%x", h.Sum(nil))
span.AddEvent("Hash verification",
trace.WithAttributes(
attribute.String("expected", gotHash),
attribute.String("got", msg.Hash),
attribute.Bool("equal", gotHash == msg.Hash),
),
)

return nil
}

lg := zctx.From(ctx)
for {
select {
case <-t.ShutdownContext().Done():
return ctx.Err()
default:
if err := tick(); err != nil {
lg.Error("Upload failed", zap.Error(err))
} else {
lg.Info("Upload succeeded")
}
}
}
})
return g.Wait()
},
sdka.WithServiceName("simon.client"),
)
},
}

cmd.Flags().IntVar(&arg.UploadRPS, "upload-rps", 1, "Upload requests per second")
cmd.Flags().IntVar(&arg.UploadHashIterations, "upload-hash-iterations", 10, "Upload hash iterations")

return cmd
}
Loading

0 comments on commit 8087444

Please sign in to comment.