Skip to content

Commit 69296f6

Browse files
committed
Implement streaming S3 facility
1 parent e73ac45 commit 69296f6

File tree

8 files changed

+309
-15
lines changed

8 files changed

+309
-15
lines changed

README.md

+2
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ Examples:
117117
```s3sync --tk KEY2 --ts SECRET2 --sk KEY1 --ss SECRET1 --se "http://127.0.0.1:7484" --te "http://127.0.0.1:7484" -w 128 s3://shared s3://shared_new```
118118
* Sync one Amazon bucket directory to another Amazon bucket:
119119
```s3sync --tk KEY2 --ts SECRET2 --sk KEY1 --ss SECRET1 -w 128 s3://shared/test/ s3://shared_new```
120+
* Use streaming S3 transfers (reduces memory usage, but may be slower for small files):
121+
```s3sync --sk KEY --ss SECRET --tk KEY --ts SECRET -w 128 s3s://shared/test/ s3s://shared_new```
120122

121123
SOURCE and TARGET should both be directories. Syncing a single file is not supported (this will not work: `s3sync --sk KEY --ss SECRET s3://shared/megafile.zip fs:///opt/backups/s3/`).
122124

cli/cli.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@ package main
22

33
import (
44
"fmt"
5-
"github.com/alexflint/go-arg"
6-
"github.com/larrabee/s3sync/storage"
7-
"github.com/mattn/go-isatty"
85
"net/url"
96
"os"
107
"strconv"
118
"strings"
129
"time"
1310
"unicode"
11+
12+
"github.com/alexflint/go-arg"
13+
"github.com/larrabee/s3sync/storage"
14+
"github.com/mattn/go-isatty"
1415
)
1516

1617
var (
@@ -202,6 +203,10 @@ func parseConn(cStr string) (conn connect, err error) {
202203
conn.Type = storage.TypeS3
203204
conn.Bucket = u.Host
204205
conn.Path = strings.TrimPrefix(u.Path, "/")
206+
case "s3s":
207+
conn.Type = storage.TypeS3Stream
208+
conn.Bucket = u.Host
209+
conn.Path = strings.TrimPrefix(u.Path, "/")
205210
case "fs":
206211
conn.Type = storage.TypeFS
207212
conn.Path = strings.TrimPrefix(cStr, "fs://")

cli/setup.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ package main
33
import (
44
"context"
55
"fmt"
6+
"os"
7+
68
"github.com/larrabee/s3sync/pipeline"
79
"github.com/larrabee/s3sync/pipeline/collection"
810
"github.com/larrabee/s3sync/storage"
911
"github.com/larrabee/s3sync/storage/fs"
1012
"github.com/larrabee/s3sync/storage/s3"
11-
"os"
13+
"github.com/larrabee/s3sync/storage/s3stream"
1214
)
1315

1416
func setupStorages(ctx context.Context, syncGroup *pipeline.Group, cli *argsParsed) error {
@@ -18,6 +20,10 @@ func setupStorages(ctx context.Context, syncGroup *pipeline.Group, cli *argsPars
1820
sourceStorage = s3.NewS3Storage(cli.SourceNoSign, cli.SourceKey, cli.SourceSecret, cli.SourceToken, cli.SourceRegion, cli.SourceEndpoint,
1921
cli.Source.Bucket, cli.Source.Path, cli.S3KeysPerReq, cli.S3Retry, cli.S3RetryInterval,
2022
)
23+
case storage.TypeS3Stream:
24+
sourceStorage = s3stream.NewS3StreamStorage(cli.SourceNoSign, cli.SourceKey, cli.SourceSecret, cli.SourceToken, cli.SourceRegion, cli.SourceEndpoint,
25+
cli.Source.Bucket, cli.Source.Path, cli.S3KeysPerReq, cli.S3Retry, cli.S3RetryInterval,
26+
)
2127
case storage.TypeFS:
2228
sourceStorage = fs.NewFSStorage(cli.Source.Path, cli.FSFilePerm, cli.FSDirPerm, os.Getpagesize()*256*32, !cli.FSDisableXattr, cli.ErrorHandlingMask, cli.FSAtomicWrite)
2329
}
@@ -27,6 +33,10 @@ func setupStorages(ctx context.Context, syncGroup *pipeline.Group, cli *argsPars
2733
targetStorage = s3.NewS3Storage(cli.TargetNoSign, cli.TargetKey, cli.TargetSecret, cli.TargetToken, cli.TargetRegion, cli.TargetEndpoint,
2834
cli.Target.Bucket, cli.Target.Path, cli.S3KeysPerReq, cli.S3Retry, cli.S3RetryInterval,
2935
)
36+
case storage.TypeS3Stream:
37+
targetStorage = s3stream.NewS3StreamStorage(cli.TargetNoSign, cli.TargetKey, cli.TargetSecret, cli.TargetToken, cli.TargetRegion, cli.TargetEndpoint,
38+
cli.Target.Bucket, cli.Target.Path, cli.S3KeysPerReq, cli.S3Retry, cli.S3RetryInterval,
39+
)
3040
case storage.TypeFS:
3141
targetStorage = fs.NewFSStorage(cli.Target.Path, cli.FSFilePerm, cli.FSDirPerm, 0, !cli.FSDisableXattr, cli.ErrorHandlingMask, cli.FSAtomicWrite)
3242
}

pipeline/collection/misc.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ var Logger pipeline.StepFn = func(group *pipeline.Group, stepNum int, input <-ch
2828
if ok {
2929
cfg.WithFields(logrus.Fields{
3030
"key": *obj.Key,
31-
"size": len(*obj.Content),
31+
"size": *obj.ContentLength,
3232
"Content-Type": *obj.ContentType,
3333
}).Infof("Sync file")
3434
output <- obj

storage/fs/fs.go

+9-5
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,17 @@ import (
55
"context"
66
"encoding/json"
77
"errors"
8-
"github.com/karrick/godirwalk"
9-
"github.com/larrabee/ratelimit"
10-
"github.com/larrabee/s3sync/storage"
11-
"github.com/pkg/xattr"
128
"io"
139
"io/ioutil"
1410
"mime"
1511
"os"
1612
"path/filepath"
1713
"strings"
14+
15+
"github.com/karrick/godirwalk"
16+
"github.com/larrabee/ratelimit"
17+
"github.com/larrabee/s3sync/storage"
18+
"github.com/pkg/xattr"
1819
)
1920

2021
const tempFileSuffixLen = 8
@@ -188,7 +189,10 @@ func (st *FSStorage) GetObjectContent(obj *storage.Object) error {
188189
return err
189190
}
190191

192+
dataSize := int64(len(data))
193+
191194
obj.Content = &data
195+
obj.ContentLength = &dataSize
192196

193197
if err := st.GetObjectMeta(obj); err != nil {
194198
return err
@@ -256,4 +260,4 @@ func (st *FSStorage) DeleteObject(obj *storage.Object) error {
256260
}
257261

258262
return nil
259-
}
263+
}

storage/s3/s3.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,18 @@ package s3
33
import (
44
"bytes"
55
"context"
6+
"io"
7+
"net/url"
8+
"strings"
9+
"time"
10+
611
"github.com/aws/aws-sdk-go/aws"
712
"github.com/aws/aws-sdk-go/aws/credentials"
813
"github.com/aws/aws-sdk-go/aws/defaults"
914
"github.com/aws/aws-sdk-go/aws/session"
1015
"github.com/aws/aws-sdk-go/service/s3"
1116
"github.com/larrabee/ratelimit"
1217
"github.com/larrabee/s3sync/storage"
13-
"io"
14-
"net/url"
15-
"strings"
16-
"time"
1718
)
1819

1920
// S3Storage configuration.
@@ -186,6 +187,7 @@ func (st *S3Storage) GetObjectContent(obj *storage.Object) error {
186187
data := buf.Bytes()
187188
obj.Content = &data
188189
obj.ContentType = result.ContentType
190+
obj.ContentLength = result.ContentLength
189191
obj.ContentDisposition = result.ContentDisposition
190192
obj.ContentEncoding = result.ContentEncoding
191193
obj.ContentLanguage = result.ContentLanguage

0 commit comments

Comments
 (0)