Skip to content

Commit

Permalink
Adding concurrent batch writes
Browse files Browse the repository at this point in the history
  • Loading branch information
David n. Jumani committed May 20, 2019
1 parent 41f0249 commit a18f213
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 5 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,14 @@ err := db.Table("Books").Get("ID", 555).One(dynamo.AWSEncoding(&someBook))

By default, tests are run in offline mode. Create a table called `TestDB`, with a Number Parition Key called `UserID` and a String Sort Key called `Time`. Change the table name with the environment variable `DYNAMO_TEST_TABLE`. You must specify `DYNAMO_TEST_REGION`, setting it to the AWS region where your test table is.

```bash
```bash
DYNAMO_TEST_REGION=us-west-2 go test github.com/guregu/dynamo/... -cover
```

Or simply run the following command to test it locally :

```bash
./run_tests.sh
```

### License
Expand Down
18 changes: 16 additions & 2 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

const batchSize = 101

func TestBatchGetWrite(t *testing.T) {
func testBatchGetWrite(t *testing.T, isSequential bool) {
if testDB == nil {
t.Skip(offlineSkipMsg)
}
Expand All @@ -29,7 +29,13 @@ func TestBatchGetWrite(t *testing.T) {
}

var wcc ConsumedCapacity
wrote, err := table.Batch().Write().Put(items...).ConsumedCapacity(&wcc).Run()
var wrote int
var err error
if isSequential {
wrote, err = table.Batch().Write().Put(items...).ConsumedCapacity(&wcc).Run()
} else {
wrote, err = table.Batch().Write().Put(items...).ConsumedCapacity(&wcc).RunConcurrently()
}
if wrote != batchSize {
t.Error("unexpected wrote:", wrote, "≠", batchSize)
}
Expand Down Expand Up @@ -90,3 +96,11 @@ func TestBatchGetWrite(t *testing.T) {
t.Error("expected 0 results, got", len(results))
}
}

func TestSequentialBatchGetWrite(t *testing.T) {
testBatchGetWrite(t, true)
}

func TestConcurrentBatchGetWrite(t *testing.T) {
testBatchGetWrite(t, false)
}
177 changes: 177 additions & 0 deletions batchwrite.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package dynamo

import (
"errors"
"math"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/cenkalti/backoff"
multierror "github.com/hashicorp/go-multierror"
)

// DynamoDB API limit, 25 operations per request
Expand Down Expand Up @@ -61,13 +63,188 @@ func (bw *BatchWrite) ConsumedCapacity(cc *ConsumedCapacity) *BatchWrite {
return bw
}

// Structure passed to the concurrent batch write operation
type batchRequest struct {
ctx aws.Context
ops []*dynamodb.WriteRequest
}

// Structure returned after a concurrent batch operation
type batchResponse struct {
Result *dynamodb.BatchWriteItemOutput
Error error
Wrote int
}

// Config used when calling RunConcurrently
type batchWriteConfig struct {
poolSize int
}

// Parameter type to be passed to RunConcurrently
type BatchWriteOption func(*batchWriteConfig)

// Sets the default config
func defaults(cfg *batchWriteConfig) {
cfg.poolSize = 10
}

// Sets the pool size to process the request
func WithPoolSize(poolSize int) BatchWriteOption {
return func(cfg *batchWriteConfig) {
cfg.poolSize = poolSize
}
}

func (bw *BatchWrite) writeBatch(ctx aws.Context, ops []*dynamodb.WriteRequest) batchResponse {

boff := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
wrote := 0

for {
var res *dynamodb.BatchWriteItemOutput
req := bw.input(ops)
err := retry(ctx, func() error {
var err error
res, err = bw.batch.table.db.client.BatchWriteItemWithContext(ctx, req)
return err
})
if err != nil {
return batchResponse{
Result: res,
Error: err,
Wrote: 0,
}
}
if bw.cc != nil {
for _, cc := range res.ConsumedCapacity {
addConsumedCapacity(bw.cc, cc)
}
}

unprocessed := res.UnprocessedItems[bw.batch.table.Name()]
wrote = len(ops) - len(unprocessed)
if len(unprocessed) == 0 {
return batchResponse{
Result: res,
Error: err,
Wrote: wrote,
}
}
ops = unprocessed

// need to sleep when re-requesting, per spec
if err := aws.SleepWithContext(ctx, boff.NextBackOff()); err != nil {
return batchResponse{
Result: nil,
Error: err,
Wrote: wrote,
}
}
}
}

func (bw *BatchWrite) writeBatchWorker(worker int, requests <-chan batchRequest, response chan<- batchResponse) {
for request := range requests {
response <- bw.writeBatch(request.ctx, request.ops)
}
}

func splitBatches(requests []*dynamodb.WriteRequest) (batches [][]*dynamodb.WriteRequest) {
batches = [][]*dynamodb.WriteRequest{}
requestsLength := len(requests)
for i := 0; i < requestsLength; i += maxWriteOps {
end := i + maxWriteOps
if end > requestsLength {
end = requestsLength
}
batches = append(batches, requests[i:end])
}
return batches
}

func min(a int, b int) int {
if a < b {
return a
}
return b
}

// RunConcurrently executes this batch concurrently with the pool size specified.
// By default, the pool size is 10
func (bw *BatchWrite) RunConcurrently(opts ...BatchWriteOption) (wrote int, err error) {
ctx, cancel := defaultContext()
defer cancel()
return bw.RunConcurrentlyWithContext(ctx, opts...)
}

func (bw *BatchWrite) RunConcurrentlyWithContext(ctx aws.Context, opts ...BatchWriteOption) (wrote int, err error) {

if bw.err != nil {
return 0, bw.err
}

cfg := new(batchWriteConfig)
defaults(cfg)
for _, fn := range opts {
fn(cfg)
}

// TODO : Can split the batches and run them concurrently ?
batches := splitBatches(bw.ops)
totalBatches := len(batches)

requests := make(chan batchRequest, totalBatches)
response := make(chan batchResponse, totalBatches)
defer close(response)

// Create the workers
for i := 0; i < cfg.poolSize; i++ {
go bw.writeBatchWorker(i, requests, response)
}

// Push the write requests
for i := 0; i < totalBatches; i++ {
requests <- batchRequest{
ctx: ctx,
ops: batches[i],
}
}
close(requests)

// Capture the response
wrote = 0
batchCounter := 0
for {
select {
case batchResponse, ok := <-response:
if !ok {
err = multierror.Append(err, errors.New("channel unexpectedly closed"))
return wrote, err
}
if batchResponse.Error != nil {
err = multierror.Append(err, batchResponse.Error)
}
wrote += batchResponse.Wrote
batchCounter++
if batchCounter == totalBatches {
return wrote, err
}
case <-ctx.Done():
err = multierror.Append(err, ctx.Err())
return wrote, err
}
}
}

// Run executes this batch.
// For batches with more than 25 operations, an error could indicate that
// some records have been written and some have not. Consult the wrote
// return amount to figure out which operations have succeeded.
func (bw *BatchWrite) Run() (wrote int, err error) {
ctx, cancel := defaultContext()
defer cancel()
// TODO : Perhaps use RunConcurrentlyWithContext(dynamo.WithPoolSize(1)) instead ?
return bw.RunWithContext(ctx)
}

Expand Down
9 changes: 7 additions & 2 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@ var (
const offlineSkipMsg = "DYNAMO_TEST_REGION not set"

func init() {
if region := os.Getenv("DYNAMO_TEST_REGION"); region != "" {
testDB = New(session.New(), &aws.Config{Region: aws.String(region)})
region := os.Getenv("DYNAMO_TEST_REGION")
endpoint := os.Getenv("DYNAMO_ENDPOINT")
if region != "" && endpoint != "" {
testDB = New(session.New(), &aws.Config{
Region: aws.String(region),
Endpoint: aws.String(endpoint),
})
}
if table := os.Getenv("DYNAMO_TEST_TABLE"); table != "" {
testTable = table
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ require (
github.com/cenkalti/backoff v2.1.1+incompatible
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gofrs/uuid v3.2.0+incompatible
github.com/hashicorp/go-multierror v1.0.0
github.com/stretchr/testify v1.3.0 // indirect
golang.org/x/net v0.0.0-20190318221613-d196dffd7c2b
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE=
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
27 changes: 27 additions & 0 deletions run_tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env bash

docker rm -f dynamodb > /dev/null
docker run --name dynamodb -p 8000:8000 amazon/dynamodb-local > /dev/null &


export DYNAMO_ENDPOINT="http://localhost:8000"
export DYNAMO_TEST_REGION="us-west-2"
export DYNAMO_TEST_TABLE="TestDB"

aws dynamodb delete-table \
--table-name $DYNAMO_TEST_TABLE \
--endpoint-url $DYNAMO_ENDPOINT > /dev/null 2>&1

aws dynamodb create-table \
--table-name $DYNAMO_TEST_TABLE \
--attribute-definitions \
AttributeName=UserID,AttributeType=N \
AttributeName=Time,AttributeType=S \
--key-schema \
AttributeName=UserID,KeyType=HASH \
AttributeName=Time,KeyType=RANGE \
--provisioned-throughput ReadCapacityUnits=1000,WriteCapacityUnits=1000 \
--region $DYNAMO_TEST_REGION \
--endpoint-url $DYNAMO_ENDPOINT > /dev/null

go test . -cover

0 comments on commit a18f213

Please sign in to comment.