feat: Add glob support #116

Open · wants to merge 7 commits into master
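For context on the matching semantics (not spelled out in the diff): the glob expansion presumably follows Go's path.Match rules, where '*' matches any run of non-separator characters and '?' a single one. A minimal, standalone illustration under that assumption:

package main

import (
	"fmt"
	"path"
)

func main() {
	// '*' does not cross '/' separators, so "logs/*.txt" matches
	// "logs/a.txt" but not "logs/2021/a.txt".
	for _, key := range []string{"logs/a.txt", "logs/2021/a.txt", "logs/b.log"} {
		ok, _ := path.Match("logs/*.txt", key)
		fmt.Println(key, ok)
	}
}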
67 changes: 55 additions & 12 deletions cmd/byctl/cat.go
@@ -1,6 +1,7 @@
package main

import (
"errors"
"fmt"

"github.com/urfave/cli/v2"
@@ -30,35 +31,77 @@ var catCmd = &cli.Command{
return err
}

var storePathMap = make(map[*operations.SingleOperator][]string)
for i := 0; i < c.Args().Len(); i++ {
conn, key, err := cfg.ParseProfileInput(c.Args().Get(i))
arg := c.Args().Get(i)
conn, key, err := cfg.ParseProfileInput(arg)
if err != nil {
logger.Error("parse profile input from src", zap.Error(err))
logger.Error("parse profile input from target", zap.Error(err))
continue
}

store, err := services.NewStoragerFromString(conn)
if err != nil {
logger.Error("init src storager", zap.Error(err), zap.String("conn string", conn))
logger.Error("init target storager", zap.Error(err), zap.String("conn string", conn))
continue
}

so := operations.NewSingleOperator(store)

ch, err := so.CatFile(key)
if err != nil {
logger.Error("run cat", zap.Error(err))
continue
if hasMeta(key) {
objects, err := so.Glob(key)
if err != nil {
logger.Error("glob", zap.Error(err), zap.String("path", arg))
continue
}
for _, o := range objects {
if o.Mode.IsDir() {
// so.StatStorager().Service + ":" + o.Path
fmt.Printf("cat: '%s': Is a directory\n", o.Path)
continue
}
storePathMap[so] = append(storePathMap[so], o.Path)
}
} else {
o, err := so.Stat(key)
if err != nil {
if errors.Is(err, services.ErrObjectNotExist) {
fmt.Printf("cat: '%s': No such file or directory\n", arg)
} else {
logger.Error("stat", zap.Error(err), zap.String("path", arg))
}
continue
} else {
if o.Mode.IsDir() {
fmt.Printf("cat: '%s': Is a directory\n", arg)
continue
} else if o.Mode.IsPart() {
fmt.Printf("cat: '%s': Is an in progress multipart upload task\n", arg)
continue
}
}
err = nil
storePathMap[so] = append(storePathMap[so], key)
}
}

for v := range ch {
if v.Error != nil {
logger.Error("cat", zap.Error(err))
for so, paths := range storePathMap {
for _, path := range paths {
ch, err := so.CatFile(path)
if err != nil {
logger.Error("run cat", zap.Error(err))
continue
}
}

fmt.Printf("\n")
for v := range ch {
if v.Error != nil {
logger.Error("cat", zap.Error(err))
continue
}
}

fmt.Printf("\n")
}
}

return nil
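The new branches in cat.go (and cp.go below) call a hasMeta helper that is not shown in this diff. A minimal sketch of what it might look like, assuming it mirrors the unexported hasMeta in Go's path/filepath package; the exact set of metacharacters is an assumption:

package main

import "strings"

// hasMeta reports whether the key contains glob metacharacters, so that
// plain keys keep taking the direct Stat/CatFile path while patterns are
// handed to Glob. (Hypothetical sketch; defined elsewhere in the PR.)
func hasMeta(path string) bool {
	return strings.ContainsAny(path, "*?[")
}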
203 changes: 121 additions & 82 deletions cmd/byctl/cp.go
@@ -58,38 +58,6 @@ var cpCmd = &cli.Command{
return err
}

argsNum := c.Args().Len()

dstConn, dstKey, err := cfg.ParseProfileInput(c.Args().Get(argsNum - 1))
if err != nil {
logger.Error("parse profile input from dst", zap.Error(err))
return err
}

dst, err := services.NewStoragerFromString(dstConn)
if err != nil {
logger.Error("init dst storager", zap.Error(err), zap.String("conn string", dstConn))
return err
}

dstSo := operations.NewSingleOperator(dst)

dstObject, err := dstSo.Stat(dstKey)
if err != nil {
if errors.Is(err, services.ErrObjectNotExist) {
err = nil
} else {
logger.Error("stat", zap.Error(err), zap.String("dst path", dstKey))
return err
}
}
if argsNum > 2 {
if dstObject != nil && !dstObject.Mode.IsDir() {
fmt.Printf("cp: target '%s' is not a directory\n", dstKey)
return fmt.Errorf("cp: target '%s' is not a directory", dstKey)
}
}

// Handle read pairs.
var readPairs []types.Pair
if c.IsSet(flagReadSpeedLimitName) {
@@ -127,79 +95,150 @@ var cpCmd = &cli.Command{
return err
}

for i := 0; i < argsNum-1; i++ {
srcConn, srcKey, err := cfg.ParseProfileInput(c.Args().Get(i))
// parse src args
srcNum := 0
var storeObjectMap = make(map[types.Storager][]*types.Object)
for i := 0; i < c.Args().Len()-1; i++ {
arg := c.Args().Get(i)
conn, key, err := cfg.ParseProfileInput(arg)
if err != nil {
logger.Error("parse profile input from src", zap.Error(err))
continue
}

src, err := services.NewStoragerFromString(srcConn)
if err != nil {
logger.Error("init src storager", zap.Error(err), zap.String("conn string", srcConn))
continue
}

so := operations.NewSingleOperator(src)

srcObject, err := so.Stat(srcKey)
store, err := services.NewStoragerFromString(conn)
if err != nil {
logger.Error("stat", zap.String("path", srcKey), zap.Error(err))
logger.Error("init src storager", zap.Error(err), zap.String("conn string", conn))
continue
}

if srcObject.Mode.IsDir() && !c.Bool(cpFlagRecursive) {
fmt.Printf("cp: -r not specified; omitting directory '%s'\n", srcKey)
continue
}
so := operations.NewSingleOperator(store)

var size int64
if srcObject.Mode.IsRead() {
n, ok := srcObject.GetContentLength()
if !ok {
logger.Error("can't get object content length", zap.String("path", srcKey))
if hasMeta(key) {
objects, err := so.Glob(key)
if err != nil {
logger.Error("glob", zap.Error(err), zap.String("path", arg))
continue
}
size = n
}
for _, o := range objects {
if o.Mode.IsDir() && !c.Bool(cpFlagRecursive) {
// so.StatStorager().Service + ":" + o.Path
fmt.Printf("cp: -r not specified; omitting directory '%s'\n", o.Path)
continue
}
storeObjectMap[store] = append(storeObjectMap[store], o)
srcNum++
}
} else {
o, err := so.Stat(key)
if err != nil && !errors.Is(err, services.ErrObjectNotExist) {
if errors.Is(err, services.ErrObjectNotExist) {
fmt.Printf("cp: cannot stat '%s': No such file or directory\n", arg)
} else {
logger.Error("stat", zap.Error(err), zap.String("path", arg))
}
continue
}
if err == nil {
if o.Mode.IsDir() && !c.Bool(cpFlagRecursive) {
fmt.Printf("cp: -r not specified; omitting directory '%s'\n", arg)
continue
} else if o.Mode.IsPart() {
fmt.Printf("cp: cannot copy '%s': Is an in progress multipart upload task\n", arg)
continue
}
}

do := operations.NewDualOperator(src, dst)
if c.IsSet(flagWorkersName) {
do.WithWorkers(c.Int(flagWorkersName))
err = nil
storeObjectMap[store] = append(storeObjectMap[store], o)
srcNum++
}
}

// set read pairs
do.WithReadPairs(readPairs...)
// set write pairs
do.WithWritePairs(writePairs...)
// check dst
dstConn, dstKey, err := cfg.ParseProfileInput(c.Args().Get(c.Args().Len() - 1))
if err != nil {
logger.Error("parse profile input from dst", zap.Error(err))
return err
}

realDstKey := dstKey
if argsNum > 2 || (dstObject != nil && dstObject.Mode.IsDir()) {
realDstKey = filepath.Join(dstKey, filepath.Base(srcKey))
}
dst, err := services.NewStoragerFromString(dstConn)
if err != nil {
logger.Error("init dst storager", zap.Error(err), zap.String("conn string", dstConn))
return err
}

dstSo := operations.NewSingleOperator(dst)

var ch chan *operations.EmptyResult
if c.Bool(cpFlagRecursive) && srcObject.Mode.IsDir() {
ch, err = do.CopyRecursively(srcKey, realDstKey, multipartThreshold)
} else if size < multipartThreshold {
ch, err = do.CopyFileViaWrite(srcKey, realDstKey, size)
dstObject, err := dstSo.Stat(dstKey)
if err != nil {
if errors.Is(err, services.ErrObjectNotExist) {
err = nil
} else {
// TODO: we will support other copy method later.
ch, err = do.CopyFileViaMultipart(srcKey, realDstKey, size)
logger.Error("stat", zap.Error(err), zap.String("dst path", dstKey))
return err
}
if err != nil {
logger.Error("start copy",
zap.String("src", srcKey),
zap.String("dst", realDstKey),
zap.Error(err))
continue
}
if dstObject != nil {
if dstObject.Mode.IsPart() {
fmt.Printf("cp: target '%s' is an in progress multipart upload task\n", dstKey)
return fmt.Errorf("cp: target '%s' is an in progress multipart upload task", dstKey)
}

for v := range ch {
logger.Error("read next result", zap.Error(v.Error))
if srcNum > 1 && !dstObject.Mode.IsDir() {
fmt.Printf("cp: target '%s' is not a directory\n", dstKey)
return fmt.Errorf("cp: target '%s' is not a directory", dstKey)
}
}

for store, objects := range storeObjectMap {
for _, o := range objects {
var size int64
if o.Mode.IsRead() {
n, ok := o.GetContentLength()
if !ok {
logger.Error("can't get object content length", zap.String("path", o.Path))
continue
}
size = n
}

do := operations.NewDualOperator(store, dst)
if c.IsSet(flagWorkersName) {
do.WithWorkers(c.Int(flagWorkersName))
}

// set read pairs
do.WithReadPairs(readPairs...)
// set write pairs
do.WithWritePairs(writePairs...)

realDstKey := dstKey
if srcNum > 1 || (dstObject != nil && dstObject.Mode.IsDir()) {
realDstKey = filepath.Join(dstKey, filepath.Base(o.Path))
}

var ch chan *operations.EmptyResult
if c.Bool(cpFlagRecursive) && o.Mode.IsDir() {
ch, err = do.CopyRecursively(o.Path, realDstKey, multipartThreshold)
} else if size < multipartThreshold {
ch, err = do.CopyFileViaWrite(o.Path, realDstKey, size)
} else {
// TODO: we will support other copy method later.
ch, err = do.CopyFileViaMultipart(o.Path, realDstKey, size)
}
if err != nil {
logger.Error("start copy",
zap.String("src", o.Path),
zap.String("dst", realDstKey),
zap.Error(err))
continue
}

for v := range ch {
logger.Error("read next result", zap.Error(v.Error))
}
}
}
return
},
}
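Both commands delegate pattern expansion to so.Glob, which is also defined outside this diff. A rough, illustrative sketch of how such a helper could be built on the go-storage v4 List iterator plus path.Match; the import path, the decision to list from the root rather than a narrowed prefix, and the error handling are assumptions, not the PR's actual implementation:

package main

import (
	"errors"
	"path"

	"github.com/beyondstorage/go-storage/v4/types"
)

// glob lists the store and keeps the objects whose Path satisfies the
// pattern. A real implementation would likely narrow List to the longest
// non-wildcard prefix of the pattern instead of scanning everything.
func glob(store types.Storager, pattern string) ([]*types.Object, error) {
	it, err := store.List("")
	if err != nil {
		return nil, err
	}

	var matched []*types.Object
	for {
		o, err := it.Next()
		if err != nil {
			if errors.Is(err, types.IterateDone) {
				break
			}
			return nil, err
		}
		ok, err := path.Match(pattern, o.Path)
		if err != nil {
			return nil, err // malformed pattern, e.g. an unclosed '['
		}
		if ok {
			matched = append(matched, o)
		}
	}
	return matched, nil
}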