Skip to content

Commit

Permalink
experiment sendfile
Browse files Browse the repository at this point in the history
  • Loading branch information
altkatz committed Feb 12, 2021
1 parent a47a1a2 commit 85ced8c
Show file tree
Hide file tree
Showing 13 changed files with 294 additions and 97 deletions.
2 changes: 1 addition & 1 deletion cmd/jocko/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func run(cmd *cobra.Command, args []string) {
Param: 1,
},
Reporter: &jaegercfg.ReporterConfig{
LogSpans: true,
//LogSpans: true,
},
}

Expand Down
56 changes: 56 additions & 0 deletions commitlog/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package commitlog

import (
"io"
"os"
"sync"

"github.com/travisjeffery/jocko/log"

"github.com/pkg/errors"
)

Expand All @@ -14,6 +17,17 @@ type Reader struct {
pos int64
}

func (r *Reader) FileAndOffset() (*os.File, int64, int) {
r.mu.Lock()
defer r.mu.Unlock()

segments := r.cl.Segments()
segment := segments[r.idx]
file := segment.File()
//todo size
size := 0
return file, r.pos, size
}
func (r *Reader) Read(p []byte) (n int, err error) {
r.mu.Lock()
defer r.mu.Unlock()
Expand Down Expand Up @@ -56,13 +70,55 @@ func (l *CommitLog) NewReader(offset int64, maxBytes int32) (io.Reader, error) {
if s == nil {
return nil, errors.Wrapf(ErrSegmentNotFound, "segments: %d, offset: %d", len(l.Segments()), offset)
}
//TODO: offset relative?
offset = offset - s.BaseOffset
e, err := s.findEntry(offset)
if err != nil {
return nil, err
}
{
log.Info.Printf("entry: %+v err: %v", e, err)
idx := s.Index
log.Info.Printf("idx: %p idx.position %d mmap: %v \n", idx, idx.position, idx.mmap[0:idx.position])
}
return &Reader{
cl: l,
idx: idx,
pos: e.Position,
}, nil
}

func (l *CommitLog) SendfileParams(offset int64, maxBytes int32) (*os.File, int64, int, error) {
var s *Segment
var idx int
if offset == 0 {
// TODO: seems hackish, should at least check if segments are set.
s, idx = l.Segments()[0], 0
} else {
s, idx = findSegment(l.Segments(), offset)
}
if s == nil {
return nil, 0, 0, errors.Wrapf(ErrSegmentNotFound, "segments: %d, offset: %d", len(l.Segments()), offset)
}
//TODO: offset relative?
offset = offset - s.BaseOffset
e, err := s.findEntry(offset)
if err != nil {
return nil, 0, 0, err
}
{
log.Info.Printf("entry: %+v err: %v", e, err)
idx := s.Index
log.Info.Printf("idx: %p idx.position %d mmap: %v \n", idx, idx.position, idx.mmap[0:idx.position])
}
file := s.File()
_ = idx
//todo : calculate fileOffset and sendSize
fileOffset := int64(0)
sendSize := s.Position
// log.Info.Println("logfile:", file.Name(),
// "fileOffset", fileOffset,
// "sendSize", sendSize)

return file, fileOffset, int(sendSize), nil
}
9 changes: 7 additions & 2 deletions commitlog/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ func (s *Segment) IsFull() bool {
defer s.Unlock()
return s.Position >= s.maxBytes
}
func (s *Segment) File() *os.File {
s.Lock()
defer s.Unlock()
return s.log
}

// Write writes a byte slice to the log at the current position.
// It increments the offset as well as sets the position to the new tail.
Expand Down Expand Up @@ -214,10 +219,10 @@ func (s *Segment) findEntry(offset int64) (e *Entry, err error) {
s.Lock()
defer s.Unlock()
e = &Entry{}
n := int(s.Index.bytes / entryWidth)
n := int(s.Index.position) / entryWidth
idx := sort.Search(n, func(i int) bool {
_ = s.Index.ReadEntryAtFileOffset(e, int64(i*entryWidth))
return e.Offset >= offset || e.Offset == 0
return e.Offset >= offset
})
if idx == n {
return nil, errors.New("entry not found")
Expand Down
69 changes: 69 additions & 0 deletions commitlog/sendfile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package commitlog

import (
"log"
"net"
"os"
"runtime"
"syscall"
)

const maxSendfileSize int = 4 << 20

func Sendfile(conn *net.TCPConn, file *os.File, offsetInt int64, size int, chunkSize int) (int, error) {
offset := &offsetInt
defer func() {
runtime.KeepAlive(offset)
}()
written := 0
var remain int = size
n := chunkSize
if chunkSize > maxSendfileSize {
chunkSize = maxSendfileSize
}
src := int(file.Fd())
rawConn, err := conn.SyscallConn()
rawConn.Write(func(dst uintptr) bool {
defer func() { log.Println("returned") }()
for remain > 0 {
if n > remain {
n = remain
}
var err1 error
log.Println("params:", n, "offset:", *offset)
//todo: for bsd and darwin, pass different offset
n, err1 = syscall.Sendfile(int(dst), src, offset, n)
log.Println("after:", n, "offset:", *offset)
if err1 != nil {
log.Println("sent error:", err1.Error())
}
if n > 0 {
written += n
remain -= n
} else if n == 0 && err1 == nil {
break
}
if err1 == syscall.EAGAIN || err1 == syscall.EWOULDBLOCK {

if n == -1 {
n = chunkSize
}
log.Println("got eagain")
return false
// waitpread, gopark
}
if err1 != nil {
// This includes syscall.ENOSYS (no kernel
// support) and syscall.EINVAL (fd types which
// don't implement sendfile)
err = err1
break
}
}
return true
})
log.Println("written", written)

return written, err

}
6 changes: 5 additions & 1 deletion jocko/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ func (b *Broker) Run(ctx context.Context, reqCtx *Context) *Context {
case *protocol.ProduceRequest:
res = b.handleProduce(reqCtx, req)
case *protocol.FetchRequest:
if b.config.UseSendfile {
b.handleFetchSendFile(reqCtx, req)
return nil
}
res = b.handleFetch(reqCtx, req)
case *protocol.OffsetsRequest:
res = b.handleOffsets(reqCtx, req)
Expand Down Expand Up @@ -196,7 +200,7 @@ func (b *Broker) Run(ctx context.Context, reqCtx *Context) *Context {

return &Context{
parent: responseCtx,
conn: reqCtx.conn,
Conn: reqCtx.Conn,
header: reqCtx.header,
res: &protocol.Response{
CorrelationID: reqCtx.header.CorrelationID,
Expand Down
6 changes: 5 additions & 1 deletion jocko/commitlog.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package jocko

import "io"
import (
"io"
"os"
)

type CommitLog interface {
Delete() error
Expand All @@ -9,4 +12,5 @@ type CommitLog interface {
NewestOffset() int64
OldestOffset() int64
Append([]byte) (int64, error)
SendfileParams(offset int64, maxBytes int32) (*os.File, int64, int, error)
}
2 changes: 2 additions & 0 deletions jocko/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Config struct {
LeaveDrainTime time.Duration
ReconcileInterval time.Duration
OffsetsTopicReplicationFactor int16
UseSendfile bool
}

// DefaultConfig creates/returns a default configuration.
Expand All @@ -48,6 +49,7 @@ func DefaultConfig() *Config {
LeaveDrainTime: 5 * time.Second,
ReconcileInterval: 60 * time.Second,
OffsetsTopicReplicationFactor: 3,
UseSendfile: true,
}

conf.SerfLANConfig.ReconnectTimeout = 3 * 24 * time.Hour
Expand Down
4 changes: 2 additions & 2 deletions jocko/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package jocko
import (
"context"
"fmt"
"io"
"net"
"sync"
"time"

Expand All @@ -12,7 +12,7 @@ import (

type Context struct {
mu sync.Mutex
conn io.ReadWriter
Conn *net.TCPConn
err error
header *protocol.RequestHeader
parent context.Context
Expand Down
127 changes: 127 additions & 0 deletions jocko/fetch_sendfile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package jocko

import (
"net"
"time"

"github.com/travisjeffery/jocko/commitlog"
"github.com/travisjeffery/jocko/protocol"
)

func (b *Broker) handleFetchSendFile(ctx *Context, r *protocol.FetchRequest) {
sp := span(ctx, b.tracer, "fetch")
defer sp.Finish()
resp := protocol.Response{
CorrelationID: ctx.header.CorrelationID,
}
conn := ctx.Conn
fres := &protocol.FetchResponse{
Responses: make(protocol.FetchTopicResponses, len(r.Topics)),
}
msgSetLen := 0
fres.APIVersion = r.APIVersion
maxBufSize := 0
// calculate total length of message set
//TODO calc max buf length
maxBufSize = 1024
for i, topic := range r.Topics {
fr := &protocol.FetchTopicResponse{
Topic: topic.Topic,
PartitionResponses: make([]*protocol.FetchPartitionResponse, len(topic.Partitions)),
}
for j, p := range topic.Partitions {
fpres := &protocol.FetchPartitionResponse{}
fpres.Partition = p.Partition
replica, err := b.replicaLookup.Replica(topic.Topic, p.Partition)
if err != nil {
panic(err)
}
var rdrErr error
fpres.FileHandle, fpres.SendOffset, fpres.SendSize, rdrErr = replica.Log.SendfileParams(p.FetchOffset, p.MaxBytes)
if rdrErr != nil {
panic(rdrErr)
}
msgSetLen += fpres.SendSize
//get length of record
//
fr.PartitionResponses[j] = fpres
}
fres.Responses[i] = fr
}
lenEnc := new(protocol.LenEncoder)
err := fres.Encode(lenEnc)
if err != nil {
panic(err)
}
// set length field
resp.Size = int32(lenEnc.Length + msgSetLen)
err = sendRes(&resp, maxBufSize, fres, conn)
if err != nil {
panic(err)
}
return
}
func sendRes(resp *protocol.Response,
maxSize int,
r *protocol.FetchResponse,
conn *net.TCPConn) error {
b := make([]byte, maxSize)
e := protocol.NewByteEncoder(b)
// outer response
correlationIDSize := int32(4)
e.PutInt32(resp.Size + correlationIDSize)
e.PutInt32(resp.CorrelationID)
//fetch response
var err error
if r.APIVersion >= 1 {
e.PutInt32(int32(r.ThrottleTime / time.Millisecond))
}

if err = e.PutArrayLength(len(r.Responses)); err != nil {
return err
}
for _, response := range r.Responses {
if err = e.PutString(response.Topic); err != nil {
return err
}
if err = e.PutArrayLength(len(response.PartitionResponses)); err != nil {
return err
}
for _, p := range response.PartitionResponses {
if err = sendResOfPartition(conn, p, e, r.APIVersion); err != nil {
return err
}
}
}
return nil
}
func sendResOfPartition(
conn *net.TCPConn,
r *protocol.FetchPartitionResponse,
e *protocol.ByteEncoder,
version int16) error {
e.PutInt32(r.Partition)
e.PutInt16(r.ErrorCode)
e.PutInt64(r.HighWatermark)
var err error
if version >= 4 {
e.PutInt64(r.LastStableOffset)

if err = e.PutArrayLength(len(r.AbortedTransactions)); err != nil {
return err
}
for _, t := range r.AbortedTransactions {
t.Encode(e)
}
}
e.PutInt32(int32(r.SendSize))
//log.Info.Println("encoder offset", e.GetOffset())
conn.Write(e.Bytes()[:e.GetOffset()])
e.SetOffset(0)
chunkSize := 4096
if _, err = commitlog.Sendfile(conn, r.FileHandle, r.SendOffset, r.SendSize, chunkSize); err != nil {
return err
}

return nil
}
Loading

0 comments on commit 85ced8c

Please sign in to comment.