Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pool: add external load balancing methods support #401

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
`ConnectionPool.controller()` and `ConnectionPool.reconnect()`
- Methods that are implemented but not included in the pooler interface (#395).
- Implemented stringer methods for pool.Role (#405).
- Add support for external load balancing methods when connecting via Pool (#400).

### Changed

Expand Down
38 changes: 38 additions & 0 deletions pool/balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package pool

import "github.com/tarantool/go-tarantool/v2"

// BalancerFactory is an interface for creating a balancing pool of connections.
type BalancerFactory interface {
// Create initializes a new BalancingPool with the specified size.
// The size parameter indicates the intended number of connections to manage within the pool.
Create(size int) BalancingPool
}

// BalancingPool represents a connection pool with load balancing.
type BalancingPool interface {
// GetConnection returns the connection associated with the specified identifier.
// If no connection with the given identifier is found, it returns nil.
GetConnection(string) *tarantool.Connection

// DeleteConnection removes the connection with the specified identifier from the pool
// and returns the removed connection. If no connection is found, it returns nil.
DeleteConnection(string) *tarantool.Connection

// AddConnection adds a new connection to the pool under the specified identifier.
// If a connection with that identifier already exists, the behavior may depend
// on the implementation (e.g., it may overwrite the existing connection).
AddConnection(id string, conn *tarantool.Connection)

// GetNextConnection returns the next available connection from the pool.
// The implementation may use load balancing algorithms to select the connection.
GetNextConnection() *tarantool.Connection

// GetConnections returns the current map of all connections in the pool,
// where the key is the connection identifier and the value is a pointer to the connection object.

Check failure on line 32 in pool/balancer.go

View workflow job for this annotation

GitHub Actions / golangci-lint

the line is 102 characters long, which exceeds the maximum of 100 characters. (lll)
GetConnections() map[string]*tarantool.Connection
}

func IsEmpty(pool BalancingPool) bool {
return len(pool.GetConnections()) == 0
}
27 changes: 17 additions & 10 deletions pool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ type Opts struct {
CheckTimeout time.Duration
// ConnectionHandler provides an ability to handle connection updates.
ConnectionHandler ConnectionHandler
// BalancerFactory - a factory for creating balancing,
// contains the pool size as well as the connections for which it is used.
BalancerFactory BalancerFactory
}

/*
Expand Down Expand Up @@ -110,9 +113,9 @@ type ConnectionPool struct {

state state
done chan struct{}
roPool *roundRobinStrategy
rwPool *roundRobinStrategy
anyPool *roundRobinStrategy
roPool BalancingPool
rwPool BalancingPool
anyPool BalancingPool
poolsMutex sync.RWMutex
watcherContainer watcherContainer
}
Expand Down Expand Up @@ -153,6 +156,10 @@ func newEndpoint(name string, dialer tarantool.Dialer, opts tarantool.Opts) *end
// opts. Instances must have unique names.
func ConnectWithOpts(ctx context.Context, instances []Instance,
opts Opts) (*ConnectionPool, error) {
if opts.BalancerFactory == nil {
opts.BalancerFactory = &RoundRobinFactory{}
}

unique := make(map[string]bool)
for _, instance := range instances {
if _, ok := unique[instance.Name]; ok {
Expand All @@ -166,9 +173,9 @@ func ConnectWithOpts(ctx context.Context, instances []Instance,
}

size := len(instances)
rwPool := newRoundRobinStrategy(size)
roPool := newRoundRobinStrategy(size)
anyPool := newRoundRobinStrategy(size)
rwPool := opts.BalancerFactory.Create(size)
roPool := opts.BalancerFactory.Create(size)
anyPool := opts.BalancerFactory.Create(size)

connPool := &ConnectionPool{
ends: make(map[string]*endpoint),
Expand Down Expand Up @@ -218,15 +225,15 @@ func (p *ConnectionPool) ConnectedNow(mode Mode) (bool, error) {
}
switch mode {
case ANY:
return !p.anyPool.IsEmpty(), nil
return !IsEmpty(p.anyPool), nil
case RW:
return !p.rwPool.IsEmpty(), nil
return !IsEmpty(p.rwPool), nil
case RO:
return !p.roPool.IsEmpty(), nil
return !IsEmpty(p.roPool), nil
case PreferRW:
fallthrough
case PreferRO:
return !p.rwPool.IsEmpty() || !p.roPool.IsEmpty(), nil
return !IsEmpty(p.rwPool) || !IsEmpty(p.roPool), nil
default:
return false, ErrNoHealthyInstance
}
Expand Down
28 changes: 18 additions & 10 deletions pool/round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,32 @@ import (
"github.com/tarantool/go-tarantool/v2"
)

type roundRobinStrategy struct {
var _ BalancingPool = (*RoundRobinStrategy)(nil)

type RoundRobinStrategy struct {
conns []*tarantool.Connection
indexById map[string]uint
mutex sync.RWMutex
size uint64
current uint64
}

func newRoundRobinStrategy(size int) *roundRobinStrategy {
return &roundRobinStrategy{
type RoundRobinFactory struct{}

func (r *RoundRobinFactory) Create(size int) BalancingPool {
return NewRoundRobinStrategy(size)
}

func NewRoundRobinStrategy(size int) BalancingPool {
return &RoundRobinStrategy{
conns: make([]*tarantool.Connection, 0, size),
indexById: make(map[string]uint, size),
size: 0,
current: 0,
}
}

func (r *roundRobinStrategy) GetConnection(id string) *tarantool.Connection {
func (r *RoundRobinStrategy) GetConnection(id string) *tarantool.Connection {
Comment on lines +26 to +35
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, add comments to the public API methods.

r.mutex.RLock()
defer r.mutex.RUnlock()

Expand All @@ -36,7 +44,7 @@ func (r *roundRobinStrategy) GetConnection(id string) *tarantool.Connection {
return r.conns[index]
}

func (r *roundRobinStrategy) DeleteConnection(id string) *tarantool.Connection {
func (r *RoundRobinStrategy) DeleteConnection(id string) *tarantool.Connection {
r.mutex.Lock()
defer r.mutex.Unlock()

Expand Down Expand Up @@ -64,14 +72,14 @@ func (r *roundRobinStrategy) DeleteConnection(id string) *tarantool.Connection {
return conn
}

func (r *roundRobinStrategy) IsEmpty() bool {
func (r *RoundRobinStrategy) IsEmpty() bool {
r.mutex.RLock()
defer r.mutex.RUnlock()

return r.size == 0
}

func (r *roundRobinStrategy) GetNextConnection() *tarantool.Connection {
func (r *RoundRobinStrategy) GetNextConnection() *tarantool.Connection {
r.mutex.RLock()
defer r.mutex.RUnlock()

Expand All @@ -81,7 +89,7 @@ func (r *roundRobinStrategy) GetNextConnection() *tarantool.Connection {
return r.conns[r.nextIndex()]
}

func (r *roundRobinStrategy) GetConnections() map[string]*tarantool.Connection {
func (r *RoundRobinStrategy) GetConnections() map[string]*tarantool.Connection {
r.mutex.RLock()
defer r.mutex.RUnlock()

Expand All @@ -93,7 +101,7 @@ func (r *roundRobinStrategy) GetConnections() map[string]*tarantool.Connection {
return conns
}

func (r *roundRobinStrategy) AddConnection(id string, conn *tarantool.Connection) {
func (r *RoundRobinStrategy) AddConnection(id string, conn *tarantool.Connection) {
r.mutex.Lock()
defer r.mutex.Unlock()

Expand All @@ -106,7 +114,7 @@ func (r *roundRobinStrategy) AddConnection(id string, conn *tarantool.Connection
}
}

func (r *roundRobinStrategy) nextIndex() uint64 {
func (r *RoundRobinStrategy) nextIndex() uint64 {
next := atomic.AddUint64(&r.current, 1)
return (next - 1) % r.size
}
15 changes: 8 additions & 7 deletions pool/round_robin_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package pool
package pool_test

import (
"testing"

"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/go-tarantool/v2/pool"
)

const (
Expand All @@ -12,7 +13,7 @@ const (
)

func TestRoundRobinAddDelete(t *testing.T) {
rr := newRoundRobinStrategy(10)
rr := pool.NewRoundRobinStrategy(10)

addrs := []string{validAddr1, validAddr2}
conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}}
Expand All @@ -26,13 +27,13 @@ func TestRoundRobinAddDelete(t *testing.T) {
t.Errorf("Unexpected connection on address %s", addr)
}
}
if !rr.IsEmpty() {
if !pool.IsEmpty(rr) {
t.Errorf("RoundRobin does not empty")
}
}

func TestRoundRobinAddDuplicateDelete(t *testing.T) {
rr := newRoundRobinStrategy(10)
rr := pool.NewRoundRobinStrategy(10)

conn1 := &tarantool.Connection{}
conn2 := &tarantool.Connection{}
Expand All @@ -43,7 +44,7 @@ func TestRoundRobinAddDuplicateDelete(t *testing.T) {
if rr.DeleteConnection(validAddr1) != conn2 {
t.Errorf("Unexpected deleted connection")
}
if !rr.IsEmpty() {
if !pool.IsEmpty(rr) {
t.Errorf("RoundRobin does not empty")
}
if rr.DeleteConnection(validAddr1) != nil {
Expand All @@ -52,7 +53,7 @@ func TestRoundRobinAddDuplicateDelete(t *testing.T) {
}

func TestRoundRobinGetNextConnection(t *testing.T) {
rr := newRoundRobinStrategy(10)
rr := pool.NewRoundRobinStrategy(10)

addrs := []string{validAddr1, validAddr2}
conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}}
Expand All @@ -70,7 +71,7 @@ func TestRoundRobinGetNextConnection(t *testing.T) {
}

func TestRoundRobinStrategy_GetConnections(t *testing.T) {
rr := newRoundRobinStrategy(10)
rr := pool.NewRoundRobinStrategy(10)

addrs := []string{validAddr1, validAddr2}
conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}}
Expand Down
Loading