Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Missing Changes for Release 2.4.2 #1622

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
11 changes: 0 additions & 11 deletions blobfuse2-nightly.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ stages:
steps:
- script: |
sudo apt-get update --fix-missing
sudo apt update
sudo apt-get install cmake gcc $(fuselib) git parallel -y
if [ $(tags) == "fuse2" ]; then
sudo apt-get install fuse -y
Expand Down Expand Up @@ -243,7 +242,6 @@ stages:
steps:
- script: |
sudo apt-get update --fix-missing
sudo apt update
sudo apt-get install cmake gcc $(fuselib) git parallel -y
if [ $(tags) == "fuse2" ]; then
sudo apt-get install fuse -y
Expand Down Expand Up @@ -383,7 +381,6 @@ stages:
steps:
- script: |
sudo apt-get update --fix-missing
sudo apt update
sudo apt-get install cmake gcc $(fuselib) git -y
if [ $(tags) == "fuse2" ]; then
sudo apt-get install fuse -y
Expand Down Expand Up @@ -506,7 +503,6 @@ stages:
steps:
- script: |
sudo apt-get update --fix-missing
sudo apt update
sudo apt-get install cmake gcc $(fuselib) git -y
if [ $(tags) == "fuse2" ]; then
sudo apt-get install fuse -y
Expand Down Expand Up @@ -620,7 +616,6 @@ stages:
steps:
- script: |
sudo apt-get update --fix-missing
sudo apt update
sudo apt-get install cmake gcc libfuse3-dev git -y
displayName: 'Install libfuse'

Expand Down Expand Up @@ -1385,7 +1380,6 @@ stages:
installStep:
script: |
sudo apt-get update --fix-missing
sudo apt update
sudo apt-get install cmake gcc $(fuselib) git parallel -y
if [ $(tags) == "fuse2" ]; then
sudo apt-get install fuse -y
Expand Down Expand Up @@ -1462,7 +1456,6 @@ stages:
installStep:
script: |
sudo apt-get update --fix-missing
sudo apt update
sudo apt-get install cmake gcc $(fuselib) git parallel -y
if [ $(tags) == "fuse2" ]; then
sudo apt-get install fuse -y
Expand Down Expand Up @@ -1541,7 +1534,6 @@ stages:
installStep:
script: |
sudo apt-get update --fix-missing
sudo apt update
sudo apt-get install cmake gcc $(fuselib) git parallel -y
if [ $(tags) == "fuse2" ]; then
sudo apt-get install fuse -y
Expand Down Expand Up @@ -1639,7 +1631,6 @@ stages:
installStep:
script: |
sudo apt-get update --fix-missing
sudo apt update
sudo apt-get install cmake gcc $(fuselib) git parallel -y
if [ $(tags) == "fuse2" ]; then
sudo apt-get install fuse -y
Expand Down Expand Up @@ -1716,7 +1707,6 @@ stages:
installStep:
script: |
sudo apt-get update --fix-missing
sudo apt update
sudo apt-get install cmake gcc $(fuselib) git parallel -y
if [ $(tags) == "fuse2" ]; then
sudo apt-get install fuse -y
Expand Down Expand Up @@ -1793,7 +1783,6 @@ stages:
installStep:
script: |
sudo apt-get update --fix-missing
sudo apt update
sudo apt-get install cmake gcc $(fuselib) git parallel -y
if [ $(tags) == "fuse2" ]; then
sudo apt-get install fuse -y
Expand Down
7 changes: 6 additions & 1 deletion component/attr_cache/attr_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,11 @@ func (ac *AttrCache) TruncateFile(options internal.TruncateFileOptions) error {
if found && value.valid() && value.exists() {
value.setSize(options.Size)
}
// todo: invalidating path here rather than updating with etag
// due to some changes that are required in az storage comp which
// were not necessarily required. Once they were done invalidation
// of the attribute can be removed.
ac.invalidatePath(options.Name)
}
return err
}
Expand Down Expand Up @@ -589,7 +594,7 @@ func (ac *AttrCache) CommitData(options internal.CommitDataOptions) error {
if err == nil {
ac.cacheLock.RLock()
defer ac.cacheLock.RUnlock()

// TODO: Could we just update the size, etag, modtime of the file here?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking this way.
We can check all the uses of invalidatePath.

ac.invalidatePath(options.Name)
}
return err
Expand Down
13 changes: 7 additions & 6 deletions component/attr_cache/attr_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,12 +815,13 @@ func (suite *attrCacheTestSuite) TestTruncateFile() {

err = suite.attrCache.TruncateFile(options)
suite.assert.Nil(err)
suite.assert.Contains(suite.attrCache.cacheMap, path)
suite.assert.NotEqualValues(suite.attrCache.cacheMap[path].attr, &internal.ObjAttr{})
suite.assert.EqualValues(suite.attrCache.cacheMap[path].attr.Size, size) // new size should be set
suite.assert.EqualValues(suite.attrCache.cacheMap[path].attr.Mode, defaultMode)
suite.assert.True(suite.attrCache.cacheMap[path].valid())
suite.assert.True(suite.attrCache.cacheMap[path].exists())
// suite.assert.Contains(suite.attrCache.cacheMap, path)
// suite.assert.NotEqualValues(suite.attrCache.cacheMap[path].attr, &internal.ObjAttr{})
// suite.assert.EqualValues(suite.attrCache.cacheMap[path].attr.Size, size) // new size should be set
// suite.assert.EqualValues(suite.attrCache.cacheMap[path].attr.Mode, defaultMode)
// suite.assert.True(suite.attrCache.cacheMap[path].valid())
// suite.assert.True(suite.attrCache.cacheMap[path].exists())
suite.assert.False(suite.attrCache.cacheMap[path].valid())
}

// Tests CopyFromFile
Expand Down
13 changes: 8 additions & 5 deletions component/azstorage/block_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func (bb *BlockBlob) DeleteDirectory(name string) (err error) {
// Source file must exist in storage account before calling this method.
// When the rename is success, Data, metadata, of the blob will be copied to the destination.
// Creation time and LMT is not preserved for copyBlob API.
// Etag of the destination blob changes.
// Copy the LMT to the src attr if the copy is success.
// https://learn.microsoft.com/en-us/rest/api/storageservices/copy-blob?tabs=microsoft-entra-id
func (bb *BlockBlob) RenameFile(source string, target string, srcAttr *internal.ObjAttr) error {
Expand Down Expand Up @@ -326,6 +327,7 @@ func (bb *BlockBlob) RenameFile(source string, target string, srcAttr *internal.
}

var dstLMT *time.Time = copyResponse.LastModified
var dstETag string = sanitizeEtag(copyResponse.ETag)

copyStatus := copyResponse.CopyStatus
var prop blob.GetPropertiesResponse
Expand All @@ -344,10 +346,11 @@ func (bb *BlockBlob) RenameFile(source string, target string, srcAttr *internal.

if pollCnt > 0 {
dstLMT = prop.LastModified
dstETag = sanitizeEtag(prop.ETag)
}

if copyStatus != nil && *copyStatus == blob.CopyStatusTypeSuccess {
modifyLMT(srcAttr, dstLMT)
modifyLMTandEtag(srcAttr, dstLMT, dstETag)
}

log.Trace("BlockBlob::RenameFile : %s -> %s done", source, target)
Expand Down Expand Up @@ -453,7 +456,7 @@ func (bb *BlockBlob) getAttrUsingRest(name string) (attr *internal.ObjAttr, err
Crtime: *prop.CreationTime,
Flags: internal.NewFileBitMap(),
MD5: prop.ContentMD5,
ETag: strings.Trim(string(*prop.ETag), `"`),
ETag: sanitizeEtag(prop.ETag),
}

parseMetadata(attr, prop.Metadata)
Expand Down Expand Up @@ -662,7 +665,7 @@ func (bb *BlockBlob) getBlobAttr(blobInfo *container.BlobItem) (*internal.ObjAtt
Crtime: bb.dereferenceTime(blobInfo.Properties.CreationTime, *blobInfo.Properties.LastModified),
Flags: internal.NewFileBitMap(),
MD5: blobInfo.Properties.ContentMD5,
ETag: strings.Trim((string)(*blobInfo.Properties.ETag), `"`),
ETag: sanitizeEtag(blobInfo.Properties.ETag),
}

parseMetadata(attr, blobInfo.Metadata)
Expand Down Expand Up @@ -940,7 +943,7 @@ func (bb *BlockBlob) ReadInBuffer(name string, offset int64, len int64, data []b
}

if etag != nil {
*etag = strings.Trim(string(*downloadResponse.ETag), `"`)
*etag = sanitizeEtag(downloadResponse.ETag)
}

return nil
Expand Down Expand Up @@ -1611,7 +1614,7 @@ func (bb *BlockBlob) CommitBlocks(name string, blockList []string, newEtag *stri
}

if newEtag != nil {
*newEtag = strings.Trim(string(*resp.ETag), `"`)
*newEtag = sanitizeEtag(resp.ETag)
}

return nil
Expand Down
5 changes: 3 additions & 2 deletions component/azstorage/datalake.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ func (dl *Datalake) DeleteDirectory(name string) (err error) {

// RenameFile : Rename the file
// While renaming the file, Creation time is preserved but LMT is changed for the destination blob.
// and also Etag of the destination blob changes
func (dl *Datalake) RenameFile(source string, target string, srcAttr *internal.ObjAttr) error {
log.Trace("Datalake::RenameFile : %s -> %s", source, target)

Expand All @@ -337,7 +338,7 @@ func (dl *Datalake) RenameFile(source string, target string, srcAttr *internal.O
return err
}
}
modifyLMT(srcAttr, renameResponse.LastModified)
modifyLMTandEtag(srcAttr, renameResponse.LastModified, sanitizeEtag(renameResponse.ETag))
return nil
}

Expand Down Expand Up @@ -400,7 +401,7 @@ func (dl *Datalake) GetAttr(name string) (blobAttr *internal.ObjAttr, err error)
Ctime: *prop.LastModified,
Crtime: *prop.LastModified,
Flags: internal.NewFileBitMap(),
ETag: (string)(*prop.ETag),
ETag: sanitizeEtag(prop.ETag),
}
parseMetadata(blobAttr, prop.Metadata)

Expand Down
10 changes: 9 additions & 1 deletion component/azstorage/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,14 +590,22 @@ func removeLeadingSlashes(s string) string {
return s
}

func modifyLMT(attr *internal.ObjAttr, lmt *time.Time) {
func modifyLMTandEtag(attr *internal.ObjAttr, lmt *time.Time, ETag string) {
if attr != nil {
attr.Atime = *lmt
attr.Mtime = *lmt
attr.Ctime = *lmt
attr.ETag = ETag
}
}

func sanitizeEtag(ETag *azcore.ETag) string {
if ETag != nil {
return strings.Trim(string(*ETag), `"`)
}
return ""
}

// func parseBlobTags(tags *container.BlobTags) map[string]string {

// if tags == nil {
Expand Down
11 changes: 9 additions & 2 deletions component/block_cache/block_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,12 +938,18 @@ func (bc *BlockCache) refreshBlock(handle *handlemap.Handle, index uint64, prefe

// lineupDownload : Create a work item and schedule the download
func (bc *BlockCache) lineupDownload(handle *handlemap.Handle, block *Block, prefetch bool) {
etagInterface, found := handle.GetValue("ETAG")
etag := ""
if found {
etag = etagInterface.(string)
}
item := &workItem{
handle: handle,
block: block,
prefetch: prefetch,
failCnt: 0,
upload: false,
ETag: etag,
}

// Remove this block from free block list and add to in-process list
Expand Down Expand Up @@ -1055,8 +1061,7 @@ func (bc *BlockCache) download(item *workItem) {

// Compare the ETAG value and fail download if blob has changed
if etag != "" {
etagVal, found := item.handle.GetValue("ETAG")
if found && etagVal != etag {
if item.ETag != "" && item.ETag != etag {
log.Err("BlockCache::download : Blob has changed for %v=>%s (index %v, offset %v)", item.handle.ID, item.handle.Path, item.block.id, item.block.offset)
item.block.Failed()
item.block.Ready(BlockStatusDownloadFailed)
Expand Down Expand Up @@ -1531,6 +1536,7 @@ return_safe:
}

// Stage the given number of blocks from this handle
// handle lock must be taken before calling this function
func (bc *BlockCache) commitBlocks(handle *handlemap.Handle) error {
log.Debug("BlockCache::commitBlocks : Staging blocks for %s", handle.Path)

Expand Down Expand Up @@ -1580,6 +1586,7 @@ func (bc *BlockCache) commitBlocks(handle *handlemap.Handle) error {
return err
}

// Lock was already acquired on the handle.
if newEtag != "" {
handle.SetValue("ETAG", newEtag)
}
Expand Down
10 changes: 10 additions & 0 deletions component/block_cache/threadpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,18 @@ type workItem struct {
failCnt int32 // How many times this item has failed to download
upload bool // Flag marking this is a upload request or not
blockId string // BlockId of the block
ETag string // Etag of the file before scheduling.
}

// Reason for storing Etag in workitem struct:
// here getting the value of ETag inside upload/download methods
// from the handle is somewhat tricker.
// firstly we need to acquire a lock to read it from the handle.
// In these methods the handle may/maynot be locked by
// other go routine hence acquiring it again would cause a deadlock.
// It is already locked if the call came from the readInBuffer.
// It is may be locked if the call come from the prefetch.

// newThreadPool creates a new thread pool
func newThreadPool(count uint32, reader func(*workItem), writer func(*workItem)) *ThreadPool {
if count == 0 || reader == nil {
Expand Down
13 changes: 12 additions & 1 deletion component/libfuse/libfuse2_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ func populateFuseArgs(opts *C.fuse_options_t, args *C.fuse_args_t) (*C.fuse_opti
options += fmt.Sprintf(",umask=%04d", opts.umask)
}

// force the fuse library to always pass O_TRUNC flag on open call
// Not checking the options since we don't allow user to configure this flag.
// This is the default behaviour for the fuse3 hence we don't pass this flag there.
// ref: https://github.com/libfuse/libfuse/blob/7f86f3be7148b15b71b63357813c66dd32177cf6/lib/fuse_lowlevel.c#L2161C2-L2161C16
options += ",atomic_o_trunc"

// direct_io option is used to bypass the kernel cache. It disables the use of
// page cache (file content cache) in the kernel for the filesystem.
if fuseFS.directIO {
Expand Down Expand Up @@ -920,7 +926,12 @@ func libfuse2_rename(src *C.char, dst *C.char) C.int {
libfuseStatsCollector.UpdateStats(stats_manager.Increment, renameDir, (int64)(1))

} else {
err := fuseFS.NextComponent().RenameFile(internal.RenameFileOptions{Src: srcPath, Dst: dstPath})
err := fuseFS.NextComponent().RenameFile(internal.RenameFileOptions{
Src: srcPath,
Dst: dstPath,
SrcAttr: srcAttr,
DstAttr: dstAttr,
})
if err != nil {
log.Err("Libfuse::libfuse2_rename : error renaming file %s -> %s [%s]", srcPath, dstPath, err.Error())
return -C.EIO
Expand Down
30 changes: 30 additions & 0 deletions test/e2e_tests/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,36 @@ func (suite *fileTestSuite) TestFileCreate() {
suite.fileTestCleanup([]string{fileName})
}

func (suite *fileTestSuite) TestOpenFlag_O_TRUNC() {
fileName := suite.testPath + "/test_on_open"
buf := "foo"
srcFile, err := os.OpenFile(fileName, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0666)
suite.Nil(err)
bytesWritten, err := srcFile.Write([]byte(buf))
suite.Equal(len(buf), bytesWritten)
suite.Nil(err)
err = srcFile.Close()
suite.Nil(err)

srcFile, err = os.OpenFile(fileName, os.O_WRONLY, 0666)
suite.Nil(err)
err = srcFile.Close()
suite.Nil(err)

fileInfo, err := os.Stat(fileName)
suite.Equal(int64(len(buf)), fileInfo.Size())
suite.Nil(err)

srcFile, err = os.OpenFile(fileName, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0666)
suite.Nil(err)
err = srcFile.Close()
suite.Nil(err)

fileInfo, err = os.Stat(fileName)
suite.Equal(int64(0), fileInfo.Size())
suite.Nil(err)
}

func (suite *fileTestSuite) TestFileCreateUtf8Char() {
fileName := suite.testPath + "/भारत.txt"
srcFile, err := os.OpenFile(fileName, os.O_CREATE, 0777)
Expand Down
Loading