Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add basic consumer group support, only tested with range assignment #172

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/jocko/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func run(cmd *cobra.Command, args []string) {
Param: 1,
},
Reporter: &jaegercfg.ReporterConfig{
LogSpans: true,
//LogSpans: true,
},
}

Expand Down
56 changes: 56 additions & 0 deletions commitlog/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package commitlog

import (
"io"
"os"
"sync"

"github.com/travisjeffery/jocko/log"

"github.com/pkg/errors"
)

Expand All @@ -14,6 +17,17 @@ type Reader struct {
pos int64
}

func (r *Reader) FileAndOffset() (*os.File, int64, int) {
r.mu.Lock()
defer r.mu.Unlock()

segments := r.cl.Segments()
segment := segments[r.idx]
file := segment.File()
//todo size
size := 0
return file, r.pos, size
}
func (r *Reader) Read(p []byte) (n int, err error) {
r.mu.Lock()
defer r.mu.Unlock()
Expand Down Expand Up @@ -56,13 +70,55 @@ func (l *CommitLog) NewReader(offset int64, maxBytes int32) (io.Reader, error) {
if s == nil {
return nil, errors.Wrapf(ErrSegmentNotFound, "segments: %d, offset: %d", len(l.Segments()), offset)
}
//TODO: offset relative?
offset = offset - s.BaseOffset
e, err := s.findEntry(offset)
if err != nil {
return nil, err
}
{
log.Info.Printf("entry: %+v err: %v", e, err)
idx := s.Index
log.Info.Printf("idx: %p idx.position %d mmap: %v \n", idx, idx.position, idx.mmap[0:idx.position])
}
return &Reader{
cl: l,
idx: idx,
pos: e.Position,
}, nil
}

func (l *CommitLog) SendfileParams(offset int64, maxBytes int32) (*os.File, int64, int, error) {
var s *Segment
var idx int
if offset == 0 {
// TODO: seems hackish, should at least check if segments are set.
s, idx = l.Segments()[0], 0
} else {
s, idx = findSegment(l.Segments(), offset)
}
if s == nil {
return nil, 0, 0, errors.Wrapf(ErrSegmentNotFound, "segments: %d, offset: %d", len(l.Segments()), offset)
}
//TODO: offset relative?
offset = offset - s.BaseOffset
e, err := s.findEntry(offset)
if err != nil {
return nil, 0, 0, err
}
{
log.Info.Printf("entry: %+v err: %v", e, err)
idx := s.Index
log.Info.Printf("idx: %p idx.position %d mmap: %v \n", idx, idx.position, idx.mmap[0:idx.position])
}
file := s.File()
_ = idx
//todo : calculate fileOffset and sendSize
fileOffset := int64(0)
sendSize := s.Position
// log.Info.Println("logfile:", file.Name(),
// "fileOffset", fileOffset,
// "sendSize", sendSize)

return file, fileOffset, int(sendSize), nil
}
9 changes: 7 additions & 2 deletions commitlog/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ func (s *Segment) IsFull() bool {
defer s.Unlock()
return s.Position >= s.maxBytes
}
func (s *Segment) File() *os.File {
s.Lock()
defer s.Unlock()
return s.log
}

// Write writes a byte slice to the log at the current position.
// It increments the offset as well as sets the position to the new tail.
Expand Down Expand Up @@ -214,10 +219,10 @@ func (s *Segment) findEntry(offset int64) (e *Entry, err error) {
s.Lock()
defer s.Unlock()
e = &Entry{}
n := int(s.Index.bytes / entryWidth)
n := int(s.Index.position) / entryWidth
idx := sort.Search(n, func(i int) bool {
_ = s.Index.ReadEntryAtFileOffset(e, int64(i*entryWidth))
return e.Offset >= offset || e.Offset == 0
return e.Offset >= offset
})
if idx == n {
return nil, errors.New("entry not found")
Expand Down
65 changes: 65 additions & 0 deletions commitlog/sendfile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package commitlog

import (
"log"
"net"
"os"
"syscall"
)

const maxSendfileSize int = 4 << 20

func Sendfile(conn *net.TCPConn, file *os.File, offsetInt int64, size int, chunkSize int) (int, error) {
offset := &offsetInt
written := 0
var remain int = size
n := chunkSize
if chunkSize > maxSendfileSize {
chunkSize = maxSendfileSize
}
src := int(file.Fd())
rawConn, err := conn.SyscallConn()
rawConn.Write(func(dst uintptr) bool {
defer func() { log.Println("returned") }()
for remain > 0 {
if n > remain {
n = remain
}
var err1 error
log.Println("params:", n, "offset:", *offset)
//todo: for bsd and darwin, pass different offset
n, err1 = syscall.Sendfile(int(dst), src, offset, n)
log.Println("after:", n, "offset:", *offset)
if err1 != nil {
log.Println("sent error:", err1.Error())
}
if n > 0 {
written += n
remain -= n
} else if n == 0 && err1 == nil {
break
}
if err1 == syscall.EAGAIN || err1 == syscall.EWOULDBLOCK {

if n == -1 {
n = chunkSize
}
log.Println("got eagain")
return false
// waitpread, gopark
}
if err1 != nil {
// This includes syscall.ENOSYS (no kernel
// support) and syscall.EINVAL (fd types which
// don't implement sendfile)
err = err1
break
}
}
return true
})
log.Println("written", written)

return written, err

}
20 changes: 10 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/travisjeffery/jocko
go 1.12

require (
github.com/Shopify/sarama v1.13.0
github.com/Shopify/sarama v1.27.0
github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973
Expand All @@ -13,12 +13,12 @@ require (
github.com/cespare/xxhash v1.0.0
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
github.com/davecgh/go-spew v1.1.1
github.com/eapache/go-resiliency v1.0.0
github.com/eapache/go-xerial-snappy v0.0.0-20160609142408-bb955e01b934
github.com/eapache/queue v1.0.2
github.com/eapache/go-resiliency v1.2.0
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21
github.com/eapache/queue v1.1.0
github.com/go-kit/kit v0.6.0
github.com/golang/protobuf v1.2.0
github.com/golang/snappy v0.0.0-20170215233205-553a64147049
github.com/golang/snappy v0.0.1
github.com/hashicorp/consul v1.0.3
github.com/hashicorp/errwrap v1.0.0
github.com/hashicorp/go-immutable-radix v1.0.0
Expand All @@ -36,27 +36,27 @@ require (
github.com/miekg/dns v1.0.14
github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77
github.com/opentracing/opentracing-go v1.0.2
github.com/pierrec/lz4 v1.0.1
github.com/pierrec/lz4 v2.5.2+incompatible
github.com/pierrec/xxHash v0.1.1
github.com/pkg/errors v0.8.1
github.com/pmezard/go-difflib v1.0.0
github.com/prometheus/client_golang v0.9.2
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a
github.com/rcrowley/go-metrics v0.0.0-20161128210544-1f30fe9094a5
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
github.com/satori/go.uuid v1.2.0
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529
github.com/spf13/cobra v0.0.1
github.com/spf13/pflag v1.0.0
github.com/stretchr/testify v1.3.0
github.com/stretchr/testify v1.6.0
github.com/tj/go-gracefully v0.0.0-20141227061038-005c1d102f1b
github.com/travisjeffery/go-dynaport v0.0.0-20171203090423-24009f4f2f49
github.com/tysontate/gommap v0.0.0-20131202084435-e87a6e482c2c
github.com/uber/jaeger-client-go v2.11.2+incompatible
github.com/uber/jaeger-lib v1.3.1
github.com/ugorji/go v0.0.0-20180112141927-9831f2c3ac10
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5
golang.org/x/net v0.0.0-20200528225125-3c3fba18258b
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd
upspin.io v0.0.0-20180517055408-63f1073c7a3a
)
Loading