Skip to content

Commit

Permalink
add parallel downloads
Browse files Browse the repository at this point in the history
Signed-off-by: dwillist <[email protected]>
  • Loading branch information
dwillist committed Feb 22, 2021
1 parent 125cb59 commit 9f93ecd
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 14 deletions.
87 changes: 77 additions & 10 deletions create_asset_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package pack
import (
"context"
"fmt"
"strings"

"github.com/google/go-containerregistry/pkg/name"

"github.com/buildpacks/pack/internal/blob"
"github.com/buildpacks/pack/internal/dist"
)

const assetDownloadWorkers = 4

type CreateAssetCacheOptions struct {
ImageName string
Assets dist.Assets
Expand Down Expand Up @@ -39,20 +42,84 @@ func (c *Client) CreateAssetCache(ctx context.Context, opts CreateAssetCacheOpti
return assetCacheImage.Save()
}

// TODO -Dan- downloads should be concurrent.
type downloadResult struct {
index int
pair dist.BlobAssetPair
sha256 string
err error
}

type downloadJob struct {
asset dist.Asset
index int
}

// TODO -Dan- parallel downloads should cleanly exit with Ctrl-C
// TODO -Dan- parallel download output is a bit messed up.
// existing behavior is a bit dangerous and can poison the cache.
func (c *Client) downloadAssets(assets []dist.Asset) (dist.BlobMap, error) {
result := make(dist.BlobMap)
for _, asset := range assets {
if asset.URI == "" {
continue
resultMap := make(dist.BlobMap)
results := make(chan downloadResult, len(assets))
jobs := make(chan downloadJob, len(assets))
for workerCount := 0; workerCount < assetDownloadWorkers; workerCount++ {
go downloadWorker(c.downloader, jobs, results)
}

for assetIdx, asset := range assets {
jobs <- downloadJob{
asset: asset,
index: assetIdx,
}
}
close(jobs)

resultIndiciesMap := map[string]int{}
errors := []error{}
for i := 0; i < len(assets); i++ {
r := <-results
prevIdx, ok := resultIndiciesMap[r.sha256]
switch {
case r.err != nil:
errors = append(errors, r.err)
case !ok || prevIdx < r.index:
resultIndiciesMap[r.sha256] = r.index
resultMap[r.sha256] = r.pair
}
}

joinedErrs := fmt.Errorf("the following errors occurred during download: %q", errorJoin(errors, ", "))
if len(errors) > 0 {
return resultMap, joinedErrs
}
return resultMap, nil
}

func errorJoin(elems []error, sep string) string {
strArr := []string{}
for _, elem := range elems {
strArr = append(strArr, elem.Error())
}

return strings.Join(strArr, sep)
}

func downloadWorker(downloader Downloader, jobs <-chan downloadJob, results chan<- downloadResult) {
for j := range jobs {
var b blob.Blob = nil
var err error
if j.asset.URI != "" {
b, err = downloader.Download(context.Background(), j.asset.URI, blob.RawDownload, blob.ValidateDownload(j.asset.Sha256))
}
b, err := c.downloader.Download(context.Background(), asset.URI, blob.RawDownload, blob.ValidateDownload(asset.Sha256))
if err != nil {
return dist.BlobMap{}, err
results <- downloadResult{
index: j.index,
pair: dist.BlobAssetPair{
Blob: b,
AssetVal: j.asset.ToAssetValue(""),
},
sha256: j.asset.Sha256,
err: err,
}
result[asset.Sha256] = dist.NewBlobAssetPair(b, asset.ToAssetValue(""))
}
return result, nil
}

func validateConfig(cfg CreateAssetCacheOptions) (CreateAssetCacheOptions, error) {
Expand Down
20 changes: 16 additions & 4 deletions create_asset_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"

"github.com/buildpacks/imgutil/fakes"
"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -93,7 +94,10 @@ second-asset-blob-contents.
fakeImage = fakes.NewImage(imageName, "somesha256", imgRef)

mockImageFactory.EXPECT().NewImage(imageName, true).Return(fakeImage, nil)
mockDownloader.EXPECT().Download(gomock.Any(), "https://first-asset-uri", gomock.Any()).Return(firstAssetBlob, nil)
mockDownloader.EXPECT().Download(gomock.Any(), "https://first-asset-uri", gomock.Any()).Do(func(_ ...interface{}) {
time.Sleep(2 * time.Second)
}).Return(firstAssetBlob, nil)
mockDownloader.EXPECT().Download(gomock.Any(), "https://first-asset-replace-uri", gomock.Any()).Return(firstAssetBlob, nil)
mockDownloader.EXPECT().Download(gomock.Any(), "https://second-asset-uri", gomock.Any()).Return(secondAssetBlob, nil)

assert.Succeeds(client.CreateAssetCache(context.Background(), pack.CreateAssetCacheOptions{
Expand All @@ -107,6 +111,14 @@ second-asset-blob-contents.
URI: "https://first-asset-uri",
Version: "1.2.3",
},
{
ID: "first-asset-replace",
Name: "First Asset Replace",
Sha256: "first-sha256",
Stacks: []string{"io.buildpacks.stacks.bionic"},
URI: "https://first-asset-replace-uri",
Version: "1.2.3",
},
{
ID: "second-asset",
Name: "Second Asset",
Expand Down Expand Up @@ -138,11 +150,11 @@ second-asset-blob-contents.
assert.Succeeds(json.NewDecoder(strings.NewReader(layersLabel)).Decode(&assetMap))
assert.Equal(assetMap, dist.AssetMap{
"first-sha256": dist.AssetValue{
ID: "first-asset",
Name: "First Asset",
ID: "first-asset-replace",
Name: "First Asset Replace",
LayerDiffID: "sha256:edde92682d3bc9b299b52a0af4a3934ae6742e0eb90bc7168e81af5ab6241722",
Stacks: []string{"io.buildpacks.stacks.bionic"},
URI: "https://first-asset-uri",
URI: "https://first-asset-replace-uri",
Version: "1.2.3",
}, "second-sha256": dist.AssetValue{
ID: "second-asset",
Expand Down

0 comments on commit 9f93ecd

Please sign in to comment.