Skip to content

Commit

Permalink
buffered write handler changes to integrate upload handler
Browse files Browse the repository at this point in the history
  • Loading branch information
ashmeenkaur committed Nov 11, 2024
1 parent 3dd81ac commit a251009
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 17 deletions.
34 changes: 22 additions & 12 deletions internal/bufferedwrites/buffered_write_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
package bufferedwrites

import (
"context"
"fmt"
"math"
"time"

"github.com/googlecloudplatform/gcsfuse/v2/internal/block"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"golang.org/x/sync/semaphore"
)

Expand All @@ -29,8 +31,9 @@ import (
// BufferedWriteHandler is responsible for filling up the buffers with the data
// as it receives and handing over to uploadHandler which uploads to GCS.
type BufferedWriteHandler struct {
current block.Block
blockPool *block.BlockPool
current block.Block
blockPool *block.BlockPool
uploadHandler *UploadHandler

Check failure on line 36 in internal/bufferedwrites/buffered_write_handler.go

View workflow job for this annotation

GitHub Actions / Lint

undefined: UploadHandler
// Total size of data buffered so far. Some part of buffered data might have
// been uploaded to GCS as well.
totalSize int64
Expand All @@ -45,24 +48,25 @@ type WriteFileInfo struct {
}

// NewBWHandler creates the bufferedWriteHandler struct.
func NewBWHandler(blockSize int64, maxBlocks int32, globalMaxBlocksSem *semaphore.Weighted) (bwh *BufferedWriteHandler, err error) {
func NewBWHandler(objectName string, bucket gcs.Bucket, blockSize int64, maxBlocks int32, globalMaxBlocksSem *semaphore.Weighted) (bwh *BufferedWriteHandler, err error) {
bp, err := block.NewBlockPool(blockSize, maxBlocks, globalMaxBlocksSem)
if err != nil {
return
}

bwh = &BufferedWriteHandler{
current: nil,
blockPool: bp,
totalSize: 0,
mtime: time.Now(),
current: nil,
blockPool: bp,
uploadHandler: newUploadHandler(objectName, bucket, bp.FreeBlocksChannel(), blockSize),

Check failure on line 60 in internal/bufferedwrites/buffered_write_handler.go

View workflow job for this annotation

GitHub Actions / Lint

undefined: newUploadHandler

Check failure on line 60 in internal/bufferedwrites/buffered_write_handler.go

View workflow job for this annotation

GitHub Actions / Lint

bp.FreeBlocksChannel undefined (type *block.BlockPool has no field or method FreeBlocksChannel) (typecheck)
totalSize: 0,
mtime: time.Now(),
}
return
}

// Write writes the given data to the buffer. It writes to an existing buffer if
// the capacity is available otherwise writes to a new buffer.
func (wh *BufferedWriteHandler) Write(data []byte, offset int64) (err error) {
func (wh *BufferedWriteHandler) Write(ctx context.Context, data []byte, offset int64) (err error) {
if offset > wh.totalSize {
// TODO: Will be handled as part of ordered writes.
return fmt.Errorf("non sequential writes")
Expand All @@ -88,7 +92,7 @@ func (wh *BufferedWriteHandler) Write(data []byte, offset int64) (err error) {
dataWritten += bytesToCopy

if wh.current.Size() == wh.blockPool.BlockSize() {
// TODO: err = trigger upload
err := wh.uploadHandler.Upload(ctx, wh.current)
if err != nil {
return err
}
Expand All @@ -107,9 +111,15 @@ func (wh *BufferedWriteHandler) Sync() (err error) {
}

// Flush finalizes the upload.
func (wh *BufferedWriteHandler) Flush() (err error) {
// TODO: Will be added after uploadHandler changes are done.
return fmt.Errorf("not implemented")
func (wh *BufferedWriteHandler) Flush(ctx context.Context) (err error) {
if wh.current != nil {
err := wh.uploadHandler.Upload(ctx, wh.current)
if err != nil {
return err
}
wh.current = nil
}
return wh.uploadHandler.Finalize()
}

// SetMtime stores the mtime with the bufferedWriteHandler.
Expand Down
36 changes: 31 additions & 5 deletions internal/bufferedwrites/buffered_write_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
package bufferedwrites

import (
"context"
"strings"
"testing"
"time"

"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/fake"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/jacobsa/timeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand All @@ -35,7 +39,8 @@ func TestBufferedWriteTestSuite(t *testing.T) {
}

func (testSuite *BufferedWriteTest) SetupTest() {
bwh, err := NewBWHandler(1024, 10, semaphore.NewWeighted(10))
bucket := fake.NewFakeBucket(timeutil.RealClock(), "FakeBucketName", gcs.NonHierarchical)
bwh, err := NewBWHandler("testObject", bucket, 1024, 10, semaphore.NewWeighted(10))
require.Nil(testSuite.T(), err)
testSuite.bwh = bwh
}
Expand All @@ -49,7 +54,7 @@ func (testSuite *BufferedWriteTest) TestSetMTime() {
}

func (testSuite *BufferedWriteTest) TestWrite() {
err := testSuite.bwh.Write([]byte("hi"), 0)
err := testSuite.bwh.Write(context.Background(), []byte("hi"), 0)

require.Nil(testSuite.T(), err)
fileInfo := testSuite.bwh.WriteFileInfo()
Expand All @@ -58,7 +63,7 @@ func (testSuite *BufferedWriteTest) TestWrite() {
}

func (testSuite *BufferedWriteTest) TestWriteWithEmptyBuffer() {
err := testSuite.bwh.Write([]byte{}, 0)
err := testSuite.bwh.Write(context.Background(), []byte{}, 0)

require.Nil(testSuite.T(), err)
fileInfo := testSuite.bwh.WriteFileInfo()
Expand All @@ -69,7 +74,7 @@ func (testSuite *BufferedWriteTest) TestWriteWithEmptyBuffer() {
func (testSuite *BufferedWriteTest) TestWriteEqualToBlockSize() {
size := 1024
data := strings.Repeat("A", size)
err := testSuite.bwh.Write([]byte(data), 0)
err := testSuite.bwh.Write(context.Background(), []byte(data), 0)

require.Nil(testSuite.T(), err)
fileInfo := testSuite.bwh.WriteFileInfo()
Expand All @@ -80,10 +85,31 @@ func (testSuite *BufferedWriteTest) TestWriteEqualToBlockSize() {
func (testSuite *BufferedWriteTest) TestWriteDataSizeGreaterThanBlockSize() {
size := 2000
data := strings.Repeat("A", size)
err := testSuite.bwh.Write([]byte(data), 0)
err := testSuite.bwh.Write(context.Background(), []byte(data), 0)

require.Nil(testSuite.T(), err)
fileInfo := testSuite.bwh.WriteFileInfo()
assert.Equal(testSuite.T(), testSuite.bwh.mtime, fileInfo.Mtime)
assert.Equal(testSuite.T(), int64(size), fileInfo.TotalSize)
}

func (testSuite *BufferedWriteTest) TestFlushWithNonNilCurrentBlock() {
err := testSuite.bwh.Write(context.Background(), []byte("hi"), 0)
currentBlock := testSuite.bwh.current
require.Nil(testSuite.T(), err)

err = testSuite.bwh.Flush(context.Background())
require.NoError(testSuite.T(), err)
assert.Equal(testSuite.T(), nil, testSuite.bwh.current)
// The current block should be available on the free channel as flush triggers
// an upload before finalize.
got := testSuite.bwh.blockPool.ClearFreeBlockChannel()
assert.Equal(testSuite.T(), currentBlock, got)
}

func (testSuite *BufferedWriteTest) TestFlushWithNilCurrentBlock() {
require.Nil(testSuite.T(), testSuite.bwh.current)

err := testSuite.bwh.Flush(context.Background())
assert.NoError(testSuite.T(), err)
}

0 comments on commit a251009

Please sign in to comment.