Skip to content

Commit

Permalink
experiment sendfile
Browse files Browse the repository at this point in the history
  • Loading branch information
altkatz committed Feb 12, 2021
1 parent a47a1a2 commit 9b9bb44
Show file tree
Hide file tree
Showing 11 changed files with 167 additions and 13 deletions.
2 changes: 1 addition & 1 deletion cmd/jocko/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func run(cmd *cobra.Command, args []string) {
Param: 1,
},
Reporter: &jaegercfg.ReporterConfig{
LogSpans: true,
//LogSpans: true,
},
}

Expand Down
56 changes: 56 additions & 0 deletions commitlog/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package commitlog

import (
"io"
"os"
"sync"

"github.com/travisjeffery/jocko/log"

"github.com/pkg/errors"
)

Expand All @@ -14,6 +17,17 @@ type Reader struct {
pos int64
}

func (r *Reader) FileAndOffset() (*os.File, int64, int) {
r.mu.Lock()
defer r.mu.Unlock()

segments := r.cl.Segments()
segment := segments[r.idx]
file := segment.File()
//todo size
size := 0
return file, r.pos, size
}
func (r *Reader) Read(p []byte) (n int, err error) {
r.mu.Lock()
defer r.mu.Unlock()
Expand Down Expand Up @@ -56,13 +70,55 @@ func (l *CommitLog) NewReader(offset int64, maxBytes int32) (io.Reader, error) {
if s == nil {
return nil, errors.Wrapf(ErrSegmentNotFound, "segments: %d, offset: %d", len(l.Segments()), offset)
}
//TODO: offset relative?
offset = offset - s.BaseOffset
e, err := s.findEntry(offset)
if err != nil {
return nil, err
}
{
log.Info.Printf("entry: %+v err: %v", e, err)
idx := s.Index
log.Info.Printf("idx: %p idx.position %d mmap: %v \n", idx, idx.position, idx.mmap[0:idx.position])
}
return &Reader{
cl: l,
idx: idx,
pos: e.Position,
}, nil
}

func (l *CommitLog) SendfileParams(offset int64, maxBytes int32) (*os.File, int64, int, error) {
var s *Segment
var idx int
if offset == 0 {
// TODO: seems hackish, should at least check if segments are set.
s, idx = l.Segments()[0], 0
} else {
s, idx = findSegment(l.Segments(), offset)
}
if s == nil {
return nil, 0, 0, errors.Wrapf(ErrSegmentNotFound, "segments: %d, offset: %d", len(l.Segments()), offset)
}
//TODO: offset relative?
offset = offset - s.BaseOffset
e, err := s.findEntry(offset)
if err != nil {
return nil, 0, 0, err
}
{
log.Info.Printf("entry: %+v err: %v", e, err)
idx := s.Index
log.Info.Printf("idx: %p idx.position %d mmap: %v \n", idx, idx.position, idx.mmap[0:idx.position])
}
file := s.File()
_ = idx
//todo : calculate fileOffset and sendSize
fileOffset := int64(0)
sendSize := s.Position
// log.Info.Println("logfile:", file.Name(),
// "fileOffset", fileOffset,
// "sendSize", sendSize)

return file, fileOffset, int(sendSize), nil
}
9 changes: 7 additions & 2 deletions commitlog/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ func (s *Segment) IsFull() bool {
defer s.Unlock()
return s.Position >= s.maxBytes
}
func (s *Segment) File() *os.File {
s.Lock()
defer s.Unlock()
return s.log
}

// Write writes a byte slice to the log at the current position.
// It increments the offset as well as sets the position to the new tail.
Expand Down Expand Up @@ -214,10 +219,10 @@ func (s *Segment) findEntry(offset int64) (e *Entry, err error) {
s.Lock()
defer s.Unlock()
e = &Entry{}
n := int(s.Index.bytes / entryWidth)
n := int(s.Index.position) / entryWidth
idx := sort.Search(n, func(i int) bool {
_ = s.Index.ReadEntryAtFileOffset(e, int64(i*entryWidth))
return e.Offset >= offset || e.Offset == 0
return e.Offset >= offset
})
if idx == n {
return nil, errors.New("entry not found")
Expand Down
69 changes: 69 additions & 0 deletions commitlog/sendfile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package commitlog

import (
"log"
"net"
"os"
"runtime"
"syscall"
)

const maxSendfileSize int = 4 << 20

func Sendfile(conn *net.TCPConn, file *os.File, offsetInt int64, size int, chunkSize int) (int, error) {
offset := &offsetInt
defer func() {
runtime.KeepAlive(offset)
}()
written := 0
var remain int = size
n := chunkSize
if chunkSize > maxSendfileSize {
chunkSize = maxSendfileSize
}
src := int(file.Fd())
rawConn, err := conn.SyscallConn()
rawConn.Write(func(dst uintptr) bool {
defer func() { log.Println("returned") }()
for remain > 0 {
if n > remain {
n = remain
}
var err1 error
log.Println("params:", n, "offset:", *offset)
//todo: for bsd and darwin, pass different offset
n, err1 = syscall.Sendfile(int(dst), src, offset, n)
log.Println("after:", n, "offset:", *offset)
if err1 != nil {
log.Println("sent error:", err1.Error())
}
if n > 0 {
written += n
remain -= n
} else if n == 0 && err1 == nil {
break
}
if err1 == syscall.EAGAIN || err1 == syscall.EWOULDBLOCK {

if n == -1 {
n = chunkSize
}
log.Println("got eagain")
return false
// waitpread, gopark
}
if err1 != nil {
// This includes syscall.ENOSYS (no kernel
// support) and syscall.EINVAL (fd types which
// don't implement sendfile)
err = err1
break
}
}
return true
})
log.Println("written", written)

return written, err

}
6 changes: 5 additions & 1 deletion jocko/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ func (b *Broker) Run(ctx context.Context, reqCtx *Context) *Context {
case *protocol.ProduceRequest:
res = b.handleProduce(reqCtx, req)
case *protocol.FetchRequest:
if b.config.UseSendfile {
b.handleFetchSendFile(reqCtx, req)
return nil
}
res = b.handleFetch(reqCtx, req)
case *protocol.OffsetsRequest:
res = b.handleOffsets(reqCtx, req)
Expand Down Expand Up @@ -196,7 +200,7 @@ func (b *Broker) Run(ctx context.Context, reqCtx *Context) *Context {

return &Context{
parent: responseCtx,
conn: reqCtx.conn,
Conn: reqCtx.Conn,
header: reqCtx.header,
res: &protocol.Response{
CorrelationID: reqCtx.header.CorrelationID,
Expand Down
6 changes: 5 additions & 1 deletion jocko/commitlog.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package jocko

import "io"
import (
"io"
"os"
)

type CommitLog interface {
Delete() error
Expand All @@ -9,4 +12,5 @@ type CommitLog interface {
NewestOffset() int64
OldestOffset() int64
Append([]byte) (int64, error)
SendfileParams(offset int64, maxBytes int32) (*os.File, int64, int, error)
}
2 changes: 2 additions & 0 deletions jocko/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Config struct {
LeaveDrainTime time.Duration
ReconcileInterval time.Duration
OffsetsTopicReplicationFactor int16
UseSendfile bool
}

// DefaultConfig creates/returns a default configuration.
Expand All @@ -48,6 +49,7 @@ func DefaultConfig() *Config {
LeaveDrainTime: 5 * time.Second,
ReconcileInterval: 60 * time.Second,
OffsetsTopicReplicationFactor: 3,
UseSendfile: true,
}

conf.SerfLANConfig.ReconnectTimeout = 3 * 24 * time.Hour
Expand Down
4 changes: 2 additions & 2 deletions jocko/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package jocko
import (
"context"
"fmt"
"io"
"net"
"sync"
"time"

Expand All @@ -12,7 +12,7 @@ import (

type Context struct {
mu sync.Mutex
conn io.ReadWriter
Conn *net.TCPConn
err error
header *protocol.RequestHeader
parent context.Context
Expand Down
12 changes: 7 additions & 5 deletions jocko/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,17 +231,19 @@ func (s *Server) handleRequest(conn net.Conn) {
ctx := opentracing.ContextWithSpan(context.Background(), span)
queueSpan := s.tracer.StartSpan("server: queue request", opentracing.ChildOf(span.Context()))
ctx = context.WithValue(ctx, requestQueueSpanKey, queueSpan)

reqCtx := &Context{
parent: ctx,
header: header,
req: req,
conn: conn,
Conn: conn.(*net.TCPConn),
}

//log.Debug.Printf("server/%d: handle request: %s", s.config.ID, reqCtx)
log.Info.Printf("server/%d: handle request: %s", s.config.ID, reqCtx)

respCtx := s.handler.Run(ctx, reqCtx)
if header.APIKey == protocol.FetchKey && s.config.UseSendfile {
continue
}
if queueSpan, ok := respCtx.Value(responseQueueSpanKey).(opentracing.Span); ok {
queueSpan.Finish()
}
Expand All @@ -255,7 +257,7 @@ func (s *Server) handleResponse(respCtx *Context) error {
psp := opentracing.SpanFromContext(respCtx)
sp := s.tracer.StartSpan("server: handle response", opentracing.ChildOf(psp.Context()))

//log.Debug.Printf("server/%d: handle response: %s", s.config.ID, respCtx)
log.Info.Printf("server/%d: handle response: %s", s.config.ID, respCtx)

defer psp.Finish()
defer sp.Finish()
Expand All @@ -264,7 +266,7 @@ func (s *Server) handleResponse(respCtx *Context) error {
if err != nil {
return err
}
_, err = respCtx.conn.Write(b)
_, err = respCtx.Conn.Write(b)
return err
}

Expand Down
6 changes: 6 additions & 0 deletions protocol/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,3 +294,9 @@ func (e *ByteEncoder) Pop() {
e.stack = e.stack[:len(e.stack)-1]
pe.Fill(e.off, e.b)
}
func (e *ByteEncoder) SetOffset(offset int) {
e.off = offset
}
func (e *ByteEncoder) GetOffset() int {
return e.off
}
8 changes: 7 additions & 1 deletion protocol/fetch_response.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package protocol

import "time"
import (
"os"
"time"
)

type AbortedTransaction struct {
ProducerID int64
Expand Down Expand Up @@ -30,6 +33,9 @@ type FetchPartitionResponse struct {
LastStableOffset int64
AbortedTransactions []*AbortedTransaction
RecordSet []byte
FileHandle *os.File
SendOffset int64
SendSize int
}

func (r *FetchPartitionResponse) Decode(d PacketDecoder, version int16) (err error) {
Expand Down

0 comments on commit 9b9bb44

Please sign in to comment.