Skip to content

Commit

Permalink
Merge pull request #291 from YenchangChan/main
Browse files Browse the repository at this point in the history
rebalance
  • Loading branch information
YenchangChan authored Mar 11, 2024
2 parents d9a1311 + f0df3a0 commit f581646
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 96 deletions.
3 changes: 0 additions & 3 deletions common/ck.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ func ConnectClickHouse(host string, database string, opt model.ConnetOption) (*C
Username: opt.User,
Password: opt.Password,
},
Settings: clickhouse.Settings{
"max_execution_time": 60,
},
Protocol: opt.Protocol,
DialTimeout: time.Duration(10) * time.Second,
ConnOpenStrategy: clickhouse.ConnOpenInOrder,
Expand Down
16 changes: 11 additions & 5 deletions service/clickhouse/clickhouse_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1744,7 +1744,7 @@ func getShardingType(key *model.RebalanceShardingkey, conn *common.Conn) error {

func RebalanceByPartition(conf *model.CKManClickHouseConfig, rebalancer *CKRebalance) error {
var err error
if err = rebalancer.InitCKConns(); err != nil {
if err = rebalancer.InitCKConns(false); err != nil {
log.Logger.Errorf("got error %+v", err)
return err
}
Expand All @@ -1767,7 +1767,7 @@ func RebalanceByShardingkey(conf *model.CKManClickHouseConfig, rebalancer *CKReb
var err error
start := time.Now()
log.Logger.Info("[rebalance] STEP InitCKConns")
if err = rebalancer.InitCKConns(); err != nil {
if err = rebalancer.InitCKConns(true); err != nil {
log.Logger.Errorf("got error %+v", err)
return err
}
Expand All @@ -1780,19 +1780,25 @@ func RebalanceByShardingkey(conf *model.CKManClickHouseConfig, rebalancer *CKReb
return err
}
if err = rebalancer.CheckCounts(rebalancer.TmpTable); err != nil {
return err
time.Sleep(5 * time.Second)
if err = rebalancer.CheckCounts(rebalancer.TmpTable); err != nil {
return err
}
}
log.Logger.Info("[rebalance] STEP InsertPlan")
if err = rebalancer.InsertPlan(); err != nil {
return errors.Wrapf(err, "table %s.%s rebalance failed, data can be corrupted, please move back from temp table[%s] manually", rebalancer.Database, rebalancer.Table, rebalancer.TmpTable)
}
if err = rebalancer.CheckCounts(rebalancer.Table); err != nil {
return err
time.Sleep(5 * time.Second)
if err = rebalancer.CheckCounts(rebalancer.Table); err != nil {
return err
}
}
log.Logger.Info("[rebalance] STEP Cleanup")
rebalancer.Cleanup()

log.Logger.Infof("[rebalance] DONE, Elapsed: %v sec", time.Since(start).Seconds())
log.Logger.Infof("[rebalance] DONE, Total counts: %d, Elapsed: %v sec", rebalancer.OriCount, time.Since(start).Seconds())
return nil
}

Expand Down
183 changes: 95 additions & 88 deletions service/clickhouse/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@ package clickhouse
import (
"fmt"
"path/filepath"
"regexp"
"runtime"
"sort"
"strings"
"sync"

"github.com/housepower/ckman/common"
"github.com/housepower/ckman/log"
"github.com/housepower/ckman/model"
"github.com/housepower/ckman/repository"
"github.com/k0kubun/pp"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -41,6 +38,7 @@ type CKRebalance struct {
Engine string
EngineFull string
OriCount uint64
SortingKey []string
}

// TblPartitions is partitions status of a host. A host never move out and move in at the same iteration.
Expand All @@ -54,7 +52,7 @@ type TblPartitions struct {
ToMoveIn bool // plan to move some partitions in
}

func (r *CKRebalance) InitCKConns() (err error) {
func (r *CKRebalance) InitCKConns(withShardingkey bool) (err error) {
locks = make(map[string]*sync.Mutex)
for _, host := range r.Hosts {
_, err = common.ConnectClickHouse(host, model.ClickHouseDefaultDB, r.ConnOpt)
Expand All @@ -65,32 +63,56 @@ func (r *CKRebalance) InitCKConns() (err error) {
locks[host] = &sync.Mutex{}
}

conn := common.GetConnection(r.Hosts[0])
query := fmt.Sprintf("SELECT engine, engine_full FROM system.tables WHERE database = '%s' AND table = '%s'", r.Database, r.Table)
log.Logger.Debugf("query:%s", query)
rows, _ := conn.Query(query)
for rows.Next() {
err = rows.Scan(&r.Engine, &r.EngineFull)
if err != nil {
return
if withShardingkey {
conn := common.GetConnection(r.Hosts[0])
// get engine
query := fmt.Sprintf("SELECT engine, engine_full FROM system.tables WHERE database = '%s' AND table = '%s'", r.Database, r.Table)
log.Logger.Debugf("query:%s", query)
rows, _ := conn.Query(query)
for rows.Next() {
err = rows.Scan(&r.Engine, &r.EngineFull)
if err != nil {
return
}
}
}
rows.Close()
log.Logger.Infof("table: %s.%s, engine: %s, engine_full:%s", r.Database, r.Table, r.Engine, r.EngineFull)
query = fmt.Sprintf("SELECT count() FROM cluster('%s', '%s.%s')", r.Cluster, r.Database, r.Table)
if strings.Contains(r.Engine, "Replacing") {
query += " FINAL"
}
log.Logger.Debugf("query: %s", query)
rows, _ = conn.Query(query)
for rows.Next() {
err = rows.Scan(&r.OriCount)
if err != nil {
return
rows.Close()
log.Logger.Infof("table: %s.%s, engine: %s, engine_full:%s", r.Database, r.Table, r.Engine, r.EngineFull)

//get sortingkey
if strings.Contains(r.Engine, "Replacing") {
query = fmt.Sprintf("SELECT name FROM system.columns WHERE (database = '%s') AND (table = '%s') AND (is_in_sorting_key = 1)", r.Database, r.Table)
log.Logger.Debugf("query:%s", query)
rows, _ := conn.Query(query)
for rows.Next() {
var sortingkey string
err = rows.Scan(&sortingkey)
if err != nil {
return
}
r.SortingKey = append(r.SortingKey, sortingkey)
}
rows.Close()
log.Logger.Infof("table: %s.%s, sortingkey:%s", r.Database, r.Table, r.SortingKey)

}

//get original count
if strings.Contains(r.Engine, "Replacing") {
query = fmt.Sprintf("SELECT count() FROM (SELECT DISTINCT %s FROM cluster('%s', '%s.%s') FINAL)", strings.Join(r.SortingKey, ","), r.Cluster, r.Database, r.Table)
} else {
query = fmt.Sprintf("SELECT count() FROM cluster('%s', '%s.%s')", r.Cluster, r.Database, r.Table)
}
log.Logger.Debugf("query: %s", query)
rows, _ = conn.Query(query)
for rows.Next() {
err = rows.Scan(&r.OriCount)
if err != nil {
return
}
}
log.Logger.Infof("table: %s.%s, count: %d", r.Database, r.Table, r.OriCount)
rows.Close()
}
log.Logger.Infof("table: %s.%s, count: %d", r.Database, r.Table, r.OriCount)
rows.Close()
return
}

Expand Down Expand Up @@ -402,9 +424,11 @@ func (r *CKRebalance) CreateTemporaryTable() error {
}

func (r *CKRebalance) CheckCounts(tableName string) error {
query := fmt.Sprintf("SELECT count() FROM cluster('%s', '%s.%s')", r.Cluster, r.Database, tableName)
var query string
if strings.Contains(r.Engine, "Replacing") {
query += " FINAL"
query = fmt.Sprintf("SELECT count() FROM (SELECT DISTINCT %s FROM cluster('%s', '%s.%s') FINAL)", strings.Join(r.SortingKey, ","), r.Cluster, r.Database, r.Table)
} else {
query = fmt.Sprintf("SELECT count() FROM cluster('%s', '%s.%s')", r.Cluster, r.Database, r.Table)
}
log.Logger.Debugf("query: %s", query)
conn := common.GetConnection(r.Hosts[0])
Expand All @@ -428,8 +452,8 @@ func (r *CKRebalance) CheckCounts(tableName string) error {
return nil
}

// moveback from tmp_table to ori_table after rehash
func (r *CKRebalance) InsertPlan() error {
max_insert_threads := runtime.NumCPU()*3/4 + 1 // add 1 to ensure threads not zero
var lastError error
var wg sync.WaitGroup
for idx, host := range r.Hosts {
Expand All @@ -445,26 +469,43 @@ func (r *CKRebalance) InsertPlan() error {
lastError = errors.Wrap(err, host)
return
}

query = fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM cluster('%s', '%s.%s') WHERE %s %% %d = %d SETTINGS max_insert_threads=%d, max_execution_time=0",
r.Database, r.Table, r.Cluster, r.Database, r.TmpTable, ShardingFunc(r.Shardingkey), len(r.Hosts), idx, max_insert_threads)
query = fmt.Sprintf(`SELECT distinct partition_id FROM cluster('%s', 'system.parts') WHERE database = '%s' AND table = '%s' AND active=1 ORDER BY partition_id`, r.Cluster, r.Database, r.TmpTable)
log.Logger.Debugf("[%s]%s", host, query)
if err := conn.Exec(query); err != nil {
rows, err := conn.Query(query)
if err != nil {
lastError = errors.Wrap(err, host)
return
}
partitions := make([]string, 0)
for rows.Next() {
var partitionId string
err = rows.Scan(&partitionId)
if err != nil {
lastError = errors.Wrap(err, host)
return
}
partitions = append(partitions, partitionId)
}
rows.Close()
log.Logger.Debugf("host:[%s], parts: %v", host, partitions)

for i, partition := range partitions {
query = fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM cluster('%s', '%s.%s') WHERE _partition_id = '%s' AND %s %% %d = %d SETTINGS insert_deduplicate=false,max_execution_time=0,max_insert_threads=8",
r.Database, r.Table, r.Cluster, r.Database, r.TmpTable, partition, ShardingFunc(r.Shardingkey), len(r.Hosts), idx)
log.Logger.Debugf("[%s](%d/%d) %s", host, i+1, len(partitions), query)
if err = conn.Exec(query); err != nil {
lastError = errors.Wrap(err, host)
return
}
}
})
}
wg.Wait()
return lastError
}

// backup from ori_table to tmp_table
func (r *CKRebalance) MoveBackup() error {
conf, err := repository.Ps.GetClusterbyName(r.Cluster)
if err != nil {
return err
}
var wg sync.WaitGroup
var lastError error
for _, host := range r.Hosts {
Expand All @@ -473,67 +514,33 @@ func (r *CKRebalance) MoveBackup() error {
_ = common.Pool.Submit(func() {
defer wg.Done()
conn := common.GetConnection(host)
// copy data
cmd := fmt.Sprintf("ls -l %sclickhouse/data/%s/%s/ |grep -v total |awk '{print $9}'", r.DataDir, r.Database, r.Table)
sshOpts := common.SshOptions{
User: conf.SshUser,
Password: conf.SshPassword,
Port: conf.SshPort,
Host: host,
NeedSudo: conf.NeedSudo,
AuthenticateType: conf.AuthenticateType,
}
out, err := common.RemoteExecute(sshOpts, cmd)
query := fmt.Sprintf(`SELECT distinct partition_id FROM system.parts WHERE database = '%s' AND table = '%s' AND active=1 order by partition_id`, r.Database, r.Table)
log.Logger.Debugf("[%s]%s", host, query)
rows, err := conn.Query(query)
if err != nil {
lastError = errors.Wrap(err, host)
return
}
parts := make([]string, 0)
for _, file := range strings.Split(out, "\n") {
file = strings.TrimSpace(strings.TrimSuffix(file, "\r"))
reg, err := regexp.Compile(`[^_]+(_\d+){3,}$`) //parts name
if err != nil {
lastError = errors.Wrap(err, host)
return
}
if reg.MatchString(file) && !strings.HasPrefix(file, "tmp_merge") {
parts = append(parts, file)
}
}
log.Logger.Debugf("host:[%s], parts: %v", host, parts)
var cmds []string
for _, part := range parts {
cmds = append(cmds, fmt.Sprintf("cp -prf %sclickhouse/data/%s/%s/%s %sclickhouse/data/%s/%s/detached/", r.DataDir, r.Database, r.Table, part, r.DataDir, r.Database, r.TmpTable))
}
if len(cmds) > 0 {
log.Logger.Debugf("host:[%s], cmds: %v", host, cmds)
_, err = common.RemoteExecute(sshOpts, strings.Join(cmds, ";"))
partitions := make([]string, 0)
for rows.Next() {
var partitionId string
err = rows.Scan(&partitionId)
if err != nil {
lastError = errors.Wrap(err, host)
return
}
partitions = append(partitions, partitionId)
}
rows.Close()
log.Logger.Debugf("host:[%s], partitions: %v", host, partitions)

var failedParts []string
for _, part := range parts {
query := fmt.Sprintf("ALTER TABLE `%s`.`%s` ATTACH PART '%s' settings mutations_sync=1", r.Database, r.TmpTable, part)
log.Logger.Debugf("[%s]%s", host, query)
for idx, partition := range partitions {
query = fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE _partition_id = '%s' SETTINGS insert_deduplicate=false,max_execution_time=0,max_insert_threads=8",
r.Database, r.TmpTable, r.Database, r.Table, partition)
log.Logger.Debugf("[%s](%d/%d) %s", host, idx+1, len(partitions), query)
if err = conn.Exec(query); err != nil {
failedParts = append(failedParts, part)
continue
}
}

if len(failedParts) > 0 {
max_insert_threads := runtime.NumCPU()*3/4 + 1
log.Logger.Infof("[%s]failed parts: %v, retry again", host, failedParts)
for _, part := range failedParts {
query := fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE _part = '%s' SETTINGS max_insert_threads=%d, max_execution_time=0", r.Database, r.TmpTable, r.Database, r.Table, part, max_insert_threads)
log.Logger.Debugf("[%s]%s", host, query)
if err = conn.Exec(query); err != nil {
lastError = errors.Wrap(err, host)
return
}
lastError = errors.Wrap(err, host)
return
}
}
})
Expand Down

0 comments on commit f581646

Please sign in to comment.