Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rework restore to simplify and split resource creation from permissio… #71

Merged
merged 1 commit into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/plakar/subcommands/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func cmd_restore(ctx *context.Context, repo *repository.Repository, args []strin
if err != nil {
return 1
}
snap.Restore(exporterInstance, dir, opts)
snap.Restore(exporterInstance, dir, dir, opts)
return 0
}
}
Expand All @@ -102,7 +102,7 @@ func cmd_restore(ctx *context.Context, repo *repository.Repository, args []strin

for offset, snap := range snapshots {
_, pattern := utils.ParseSnapshotID(flags.Args()[offset])
snap.Restore(exporterInstance, pattern, opts)
snap.Restore(exporterInstance, dir, pattern, opts)
}

return 0
Expand Down
27 changes: 19 additions & 8 deletions snapshot/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (

type ExporterBackend interface {
Root() string
CreateDirectory(pathname string, fileinfo *objects.FileInfo) error
StoreFile(pathname string, fileinfo *objects.FileInfo, fp io.Reader) error
CreateDirectory(pathname string) error
StoreFile(pathname string, fp io.Reader) error
SetPermissions(pathname string, fileinfo *objects.FileInfo) error
Close() error
}

Expand Down Expand Up @@ -94,24 +95,34 @@ func (exporter *Exporter) Root() string {
return exporter.backend.Root()
}

func (exporter *Exporter) CreateDirectory(pathname string, fileinfo *objects.FileInfo) error {
func (exporter *Exporter) CreateDirectory(pathname string) error {
t0 := time.Now()
defer func() {
profiler.RecordEvent("vfs.exporter.CreateDirectory", time.Since(t0))
logger.Trace("vfs", "exporter.CreateDirectory(%s): %s", pathname, time.Since(t0))
}()

return exporter.backend.CreateDirectory(pathname, fileinfo)
return exporter.backend.CreateDirectory(pathname)
}

func (exporter *Exporter) StoreFile(pathname string, fileinfo *objects.FileInfo, fp io.Reader) error {
func (exporter *Exporter) StoreFile(pathname string, fp io.Reader) error {
t0 := time.Now()
defer func() {
profiler.RecordEvent("vfs.exporter.Store", time.Since(t0))
logger.Trace("vfs", "exporter.Store(%s): %s", pathname, time.Since(t0))
profiler.RecordEvent("vfs.exporter.StoreFile", time.Since(t0))
logger.Trace("vfs", "exporter.StoreFile(%s): %s", pathname, time.Since(t0))
}()

return exporter.backend.StoreFile(pathname, fileinfo, fp)
return exporter.backend.StoreFile(pathname, fp)
}

func (exporter *Exporter) SetPermissions(pathname string, fileinfo *objects.FileInfo) error {
t0 := time.Now()
defer func() {
profiler.RecordEvent("vfs.exporter.SetPermissions", time.Since(t0))
logger.Trace("vfs", "exporter.SetPermissions(%s): %s", pathname, time.Since(t0))
}()

return exporter.backend.SetPermissions(pathname, fileinfo)
}

func (exporter *Exporter) Close() error {
Expand Down
29 changes: 15 additions & 14 deletions snapshot/exporter/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,11 @@ func (p *FSExporter) Root() string {
return p.rootDir
}

func (p *FSExporter) CreateDirectory(pathname string, fileinfo *objects.FileInfo) error {
os.MkdirAll(pathname, 0700)
//os.Chmod(pathname, fileinfo.Mode())
//os.Chown(pathname, int(fileinfo.Uid()), int(fileinfo.Gid()))
return nil
func (p *FSExporter) CreateDirectory(pathname string) error {
return os.MkdirAll(pathname, 0700)
}

func (p *FSExporter) StoreFile(pathname string, fileinfo *objects.FileInfo, fp io.Reader) error {
func (p *FSExporter) StoreFile(pathname string, fp io.Reader) error {
f, err := os.Create(pathname)
if err != nil {
return err
Expand All @@ -81,14 +78,18 @@ func (p *FSExporter) StoreFile(pathname string, fileinfo *objects.FileInfo, fp i
if err := f.Close(); err != nil {
logger.Warn("close failure: %s: %s", pathname, err)
}
//if err := os.Chmod(pathname, fileinfo.Mode()); err != nil {
// logger.Warn("chmod failure: %s: %s", pathname, err)
//}
//if err := os.Chown(pathname, int(fileinfo.Uid()), int(fileinfo.Gid())); err != nil {
// if err == os.ErrPermission {
// logger.Warn("chown failure: %s: %s", pathname, err)
// }
//}
return nil
}

func (p *FSExporter) SetPermissions(pathname string, fileinfo *objects.FileInfo) error {
if err := os.Chmod(pathname, fileinfo.Mode()); err != nil {
return err
}
if os.Getuid() == 0 {
if err := os.Chown(pathname, int(fileinfo.Uid()), int(fileinfo.Gid())); err != nil {
return err
}
}
return nil
}

Expand Down
8 changes: 6 additions & 2 deletions snapshot/exporter/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,22 @@ func (p *S3Exporter) Root() string {
return p.rootDir
}

func (p *S3Exporter) CreateDirectory(pathname string, fileinfo *objects.FileInfo) error {
func (p *S3Exporter) CreateDirectory(pathname string) error {
return nil
}

func (p *S3Exporter) StoreFile(pathname string, fileinfo *objects.FileInfo, fp io.Reader) error {
func (p *S3Exporter) StoreFile(pathname string, fp io.Reader) error {
_, err := p.minioClient.PutObject(context.Background(),
strings.TrimPrefix(p.rootDir, "/"),
strings.TrimPrefix(pathname, p.rootDir+"/"),
fp, -1, minio.PutObjectOptions{})
return err
}

func (p *S3Exporter) SetPermissions(pathname string, fileinfo *objects.FileInfo) error {
return nil
}

func (p *S3Exporter) Close() error {
return nil
}
199 changes: 92 additions & 107 deletions snapshot/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,144 +18,129 @@ type RestoreOptions struct {
Rebase bool
}

func (snap *Snapshot) Restore(exp *exporter.Exporter, pattern string, opts *RestoreOptions) error {
rebase := opts.Rebase

hardlinks := make(map[string]string)
hardlinksMutex := sync.Mutex{}

var wg sync.WaitGroup
maxConcurrency := make(chan bool, opts.MaxConcurrency)

dpattern := path.Clean(pattern)
fpattern := path.Clean(pattern)

/* if at root, pretend there's no pattern */
if dpattern == "/" || dpattern == "." {
dpattern = ""
fpattern = ""
}

/* if pattern is a file, we rebase dpattern to parent */
//patternIsFile := false
type restoreContext struct {
hardlinks map[string]string
hardlinksMutex sync.Mutex
maxConcurrency chan bool
}

fs, err := vfs.NewFilesystem(snap.repository, snap.Header.Root)
func snapshotRestorePath(snap *Snapshot, fs *vfs.Filesystem, exp *exporter.Exporter, target string, base string, pathname string, opts *RestoreOptions, restoreContext *restoreContext, wg *sync.WaitGroup) error {
snap.Event(events.PathEvent(snap.Header.SnapshotID, pathname))
fsinfo, err := fs.Stat(pathname)
if err != nil {
snap.Event(events.DirectoryMissingEvent(snap.Header.SnapshotID, pathname))
return err
}
if _, err := fs.Stat(fpattern); err != nil {
//patternIsFile = true
tmp := strings.Split(dpattern, "/")
if len(tmp) > 1 {
dpattern = strings.Join(tmp[:len(tmp)-1], "/")
}
}

directoriesCount := 0
for directory := range fs.Directories() {
if dpattern != "" {
if directory != dpattern &&
(!strings.HasPrefix(directory, fmt.Sprintf("%s/", dpattern)) ||
len(directory) > len(dpattern)) {
continue
}
}

maxConcurrency <- true
wg.Add(1)
go func(directory string) {
defer wg.Done()
defer func() { <-maxConcurrency }()

snap.Event(events.DirectoryEvent(snap.Header.SnapshotID, directory))
var dest string
if opts.Rebase && strings.HasPrefix(pathname, base) {
dest = filepath.Join(target, pathname[len(base):])
} else {
dest = filepath.Join(target, pathname)
}

var dest string
if dirEntry, isDir := fsinfo.(*vfs.DirEntry); isDir {
snap.Event(events.DirectoryEvent(snap.Header.SnapshotID, pathname))

fi, _ := fs.Stat(directory)
rel := path.Clean(filepath.Join(".", directory))
if rebase && strings.HasPrefix(directory, dpattern) {
dest = filepath.Join(exp.Root(), directory[len(dpattern):])
} else {
dest = filepath.Join(exp.Root(), directory)
if pathname != "/" {
if err := exp.CreateDirectory(dest); err != nil {
snap.Event(events.DirectoryErrorEvent(snap.Header.SnapshotID, pathname, err.Error()))
return err
}
_ = rel
}
complete := true

dest = filepath.FromSlash(dest)
if err := exp.CreateDirectory(dest, fi.(*vfs.DirEntry).FileInfo()); err != nil {
snap.Event(events.DirectoryErrorEvent(snap.Header.SnapshotID, directory, err.Error()))
} else {
snap.Event(events.DirectoryOKEvent(snap.Header.SnapshotID, directory))
subwg := sync.WaitGroup{}
for _, child := range dirEntry.Children {
err := snapshotRestorePath(snap, fs, exp, target, base, filepath.Join(pathname, child.FileInfo.Name()), opts, restoreContext, &subwg)
if err != nil {
complete = false
}
directoriesCount++
}(directory)
}
wg.Wait()

filesCount := 0
var filesSize uint64 = 0
for filename := range fs.Files() {
if fpattern != "" {
if filename != fpattern &&
!strings.HasPrefix(filename, fmt.Sprintf("%s/", fpattern)) {
continue
}
subwg.Wait()

if !complete {
snap.Event(events.DirectoryCorruptedEvent(snap.Header.SnapshotID, pathname))
return err
} else {
if pathname != "/" {
if err := exp.SetPermissions(dest, dirEntry.FileInfo()); err != nil {
snap.Event(events.DirectoryErrorEvent(snap.Header.SnapshotID, pathname, err.Error()))
return err
}
}
snap.Event(events.DirectoryOKEvent(snap.Header.SnapshotID, pathname))
return nil
}
} else if fileEntry, isFile := fsinfo.(*vfs.FileEntry); isFile && fileEntry.FileInfo().Mode().IsRegular() {
snap.Event(events.FileEvent(snap.Header.SnapshotID, pathname))

maxConcurrency <- true
restoreContext.maxConcurrency <- true
wg.Add(1)
go func(file string) {
go func(_fileEntry *vfs.FileEntry) {
defer wg.Done()
defer func() { <-maxConcurrency }()

snap.Event(events.FileEvent(snap.Header.SnapshotID, file))

var dest string
defer func() { <-restoreContext.maxConcurrency }()

fi, _ := fs.Stat(file)

//rel := path.Clean(filepath.Join(".", file))
if rebase && strings.HasPrefix(file, dpattern) {
dest = filepath.Join(exp.Root(), file[len(dpattern):])
} else {
dest = filepath.Join(exp.Root(), file)
}
dest = filepath.Clean(dest)

if fi.(*vfs.FileEntry).NumLinks > 1 {
key := fmt.Sprintf("%d:%d", fi.(*vfs.FileEntry).DeviceID, fi.(*vfs.FileEntry).InodeID)
hardlinksMutex.Lock()
v, ok := hardlinks[key]
hardlinksMutex.Unlock()
if fileEntry.NumLinks > 1 {
key := fmt.Sprintf("%d:%d", fileEntry.DeviceID, fileEntry.InodeID)
restoreContext.hardlinksMutex.Lock()
v, ok := restoreContext.hardlinks[key]
restoreContext.hardlinksMutex.Unlock()
if ok {
os.Link(v, dest)
filesSize += uint64(fi.(*vfs.FileEntry).Size)
filesCount++
return
} else {
hardlinksMutex.Lock()
hardlinks[key] = dest
hardlinksMutex.Unlock()
restoreContext.hardlinksMutex.Lock()
restoreContext.hardlinks[key] = dest
restoreContext.hardlinksMutex.Unlock()
}
}

rd, err := snap.NewReader(file)
rd, err := snap.NewReader(pathname)
if err != nil {
snap.Event(events.FileErrorEvent(snap.Header.SnapshotID, file, err.Error()))
snap.Event(events.FileErrorEvent(snap.Header.SnapshotID, pathname, err.Error()))
return
}
defer rd.Close()

if err := exp.StoreFile(dest, fi.(*vfs.FileEntry).FileInfo(), rd); err != nil {
snap.Event(events.FileErrorEvent(snap.Header.SnapshotID, file, err.Error()))
if err := exp.StoreFile(dest, rd); err != nil {
snap.Event(events.FileErrorEvent(snap.Header.SnapshotID, pathname, err.Error()))
} else if err := exp.SetPermissions(dest, fileEntry.FileInfo()); err != nil {
snap.Event(events.FileErrorEvent(snap.Header.SnapshotID, pathname, err.Error()))
} else {
snap.Event(events.FileOKEvent(snap.Header.SnapshotID, file))
snap.Event(events.FileOKEvent(snap.Header.SnapshotID, pathname))
}

filesSize += uint64(fi.(*vfs.FileEntry).Size)
filesCount++
}(filename)
}(fileEntry)
return nil
} else {
return fmt.Errorf("unexpected vfs entry type")
}
}

func (snap *Snapshot) Restore(exp *exporter.Exporter, base string, pathname string, opts *RestoreOptions) error {
snap.Event(events.StartEvent())
defer snap.Event(events.DoneEvent())

fs, err := snap.Filesystem()
if err != nil {
return err
}

restoreContext := &restoreContext{
hardlinks: make(map[string]string),
hardlinksMutex: sync.Mutex{},
maxConcurrency: make(chan bool, opts.MaxConcurrency),
}
wg.Wait()
defer close(restoreContext.maxConcurrency)

base = path.Clean(base)
if base != "/" && !strings.HasSuffix(base, "/") {
base = base + "/"
}

wg := sync.WaitGroup{}
defer wg.Wait()

return nil
return snapshotRestorePath(snap, fs, exp, base, pathname, pathname, opts, restoreContext, &wg)
}
Loading