Skip to content

Commit

Permalink
XiaoMi#218 [new feture] support client-user level rate limit
Browse files Browse the repository at this point in the history
  • Loading branch information
funnyAnt committed May 13, 2022
1 parent 9e16060 commit 2246ca7
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 7 deletions.
3 changes: 3 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ namespace的配置格式为json,包含分表、非分表、实例等配置信
| rw_flag | int | 读写标识, 只读=1, 读写=2 |
| rw_split | int | 是否读写分离, 非读写分离=0, 读写分离=1 |
| other_property | int | 目前用来标识是否走统计从实例, 普通用户=0, 统计用户=1 |
| open_rate_limit | bool | 是否开启流控 |
| max_rate_limit | int | 最大流控速率 |
| max_token_wait_time | int | 流控时获取不到流控颁发的令牌,最大等待时间,单位ms |

### 全局序列号配置

Expand Down
23 changes: 17 additions & 6 deletions models/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,15 @@ const (

// User meand user struct
type User struct {
UserName string `json:"user_name"`
Password string `json:"password"`
Namespace string `json:"namespace"`
RWFlag int `json:"rw_flag"` //1: 只读 2:读写
RWSplit int `json:"rw_split"` //0: 不采用读写分离 1:读写分离
OtherProperty int `json:"other_property"` // 1:统计用户
UserName string `json:"user_name"`
Password string `json:"password"`
Namespace string `json:"namespace"`
RWFlag int `json:"rw_flag"` //1: 只读 2:读写
RWSplit int `json:"rw_split"` //0: 不采用读写分离 1:读写分离
OtherProperty int `json:"other_property"` // 1:统计用户
OpenRateLimit bool `json:"open_rate_limit"` // false:非流控;true:流控
MaxRateLimit int `json:"max_rate_limit"` // 流控,最大QPS速率,默认值是0.
MaxTokenWaitTime int `json:"max_token_wait_time"` //流控,获取流控token默认最大等待时间。默认值0,表示不等待
}

func (p *User) verify() error {
Expand Down Expand Up @@ -77,5 +80,13 @@ func (p *User) verify() error {
return fmt.Errorf("invalid other property, user: %s, %d", p.UserName, p.OtherProperty)
}

if p.MaxRateLimit < 0 {
return fmt.Errorf("invalid MaxRateLimit, MaxRateLimit: %d, max_rate_limit should >= 0", p.MaxRateLimit)
}

if p.MaxTokenWaitTime < 0 {
return fmt.Errorf("invalid MaxTokenWaitTime, MaxTokenWaitTime: %d, max_token_wait_time should >= 0", p.MaxTokenWaitTime)
}

return nil
}
14 changes: 14 additions & 0 deletions proxy/server/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,22 @@ func (se *SessionExecutor) GetDatabase() string {
return se.db
}

func (se *SessionExecutor) checkFlowControl() (bool, error) {
ns := se.GetNamespace()
if ns.userProperties != nil && ns.userProperties[se.user] != nil {
return ns.userProperties[se.user].GetRateLimiterToken()
} else {
return true, nil
}
}

// ExecuteCommand execute command
func (se *SessionExecutor) ExecuteCommand(cmd byte, data []byte) Response {
if ok, err := se.checkFlowControl(); !ok {
log.Warn("flow control error: %v", err)
return CreateErrorResponse(se.status, err)
}

switch cmd {
case mysql.ComQuit:
se.handleRollback(nil)
Expand Down
46 changes: 45 additions & 1 deletion proxy/server/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
package server

import (
"context"
"errors"
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"

"github.com/XiaoMi/Gaea/backend"
Expand All @@ -31,6 +33,7 @@ import (
"github.com/XiaoMi/Gaea/proxy/sequence"
"github.com/XiaoMi/Gaea/util"
"github.com/XiaoMi/Gaea/util/cache"
"golang.org/x/time/rate"
)

const (
Expand All @@ -51,6 +54,46 @@ type UserProperty struct {
RWFlag int
RWSplit int
OtherProperty int
rateLimiter *rate.Limiter
maxTokenWaitTimeMs int64 //流控,获取流控token默认最大等待时间。默认值0,表示不等待
lock sync.RWMutex
}
func (cc *UserProperty) initRateLimiter(openRateLimit bool, maxRateLimit int, maxTokenWaitTime int) {
cc.lock.Lock()
defer cc.lock.Unlock()
if !openRateLimit {
cc.rateLimiter = nil
return
}

//cc.maxRateLimit = maxRateLimit
cc.rateLimiter = rate.NewLimiter(rate.Limit(maxRateLimit), maxRateLimit)
cc.maxTokenWaitTimeMs = int64(maxTokenWaitTime)
}

func (cc *UserProperty) GetRateLimiterToken() (ok bool, err error) {
cc.lock.RLock()
defer cc.lock.RUnlock()
if cc.rateLimiter == nil {
return true, nil
}

ok = true
if cc.maxTokenWaitTimeMs > 0 {
getCtx, cancel := context.WithTimeout(context.Background(), time.Duration(cc.maxTokenWaitTimeMs)*time.Millisecond)
defer cancel()
err = cc.rateLimiter.Wait(getCtx)
if err != nil {
ok = false
}
} else {
ok = cc.rateLimiter.Allow()
if !ok {
err = fmt.Errorf("rate limit %v,can't get rate limit token now", cc.rateLimiter.Limit())
}
}

return ok, err
}

// Namespace is struct driected used by server
Expand Down Expand Up @@ -160,7 +203,8 @@ func NewNamespace(namespaceConfig *models.Namespace) (*Namespace, error) {
// init user properties
for _, user := range namespaceConfig.Users {
up := &UserProperty{RWFlag: user.RWFlag, RWSplit: user.RWSplit, OtherProperty: user.OtherProperty}
namespace.userProperties[user.UserName] = up
up.initRateLimiter(user.OpenRateLimit, user.MaxRateLimit, user.MaxTokenWaitTime)
namespace.userProperties[user.UserName] = up
}

// init backend slices
Expand Down

0 comments on commit 2246ca7

Please sign in to comment.