Skip to content

Commit

Permalink
operator: the operator timeout duration depends on all the steps not se…
Browse files Browse the repository at this point in the history
…parated (#5600) (#5679)

ref #5596, ref #5600

Signed-off-by: ti-chi-bot <[email protected]>
Signed-off-by: bufferflies <[email protected]>

Co-authored-by: buffer <[email protected]>
Co-authored-by: bufferflies <[email protected]>
  • Loading branch information
ti-chi-bot and bufferflies authored Jan 18, 2023
1 parent 055399d commit 5222943
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 129 deletions.
1 change: 1 addition & 0 deletions server/schedule/operator/create_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func CreateMergeRegionOperator(desc string, ci ClusterInformer, source *core.Reg
ToRegion: target.GetMeta(),
IsPassive: true,
})
op2.Sync(op1)

return []*Operator{op1, op2}, nil
}
Expand Down
28 changes: 15 additions & 13 deletions server/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,6 @@ const (
// OperatorExpireTime is the duration that when an operator is not started
// after it, the operator will be considered expired.
OperatorExpireTime = 3 * time.Second
// FastOperatorWaitTime is the duration that when an operator that is not marked
// `OpRegion` runs longer than it, the operator will be considered timeout.
FastOperatorWaitTime = 10 * time.Second
// SlowOperatorWaitTime is the duration that when an operator marked `OpRegion`
// runs longer than it, the operator will be considered timeout.
SlowOperatorWaitTime = 10 * time.Minute
)

// Operator contains execution steps generated by scheduler.
Expand All @@ -56,6 +50,7 @@ type Operator struct {
FinishedCounters []prometheus.Counter
AdditionalInfos map[string]string
ApproximateSize int64
timeout time.Duration
}

// NewOperator creates a new operator.
Expand All @@ -64,6 +59,10 @@ func NewOperator(desc, brief string, regionID uint64, regionEpoch *metapb.Region
if kind&OpAdmin != 0 {
level = core.HighPriority
}
maxDuration := float64(0)
for _, v := range steps {
maxDuration += v.Timeout(approximateSize).Seconds()
}
return &Operator{
desc: desc,
brief: brief,
Expand All @@ -76,17 +75,23 @@ func NewOperator(desc, brief string, regionID uint64, regionEpoch *metapb.Region
level: level,
AdditionalInfos: make(map[string]string),
ApproximateSize: approximateSize,
timeout: time.Duration(maxDuration) * time.Second,
}
}

// Sync copies the timeout of the given operator onto this operator, so that
// both operators share the same timeout duration.
func (o *Operator) Sync(other *Operator) {
// Only the timeout field is synchronized; no other attribute is touched.
o.timeout = other.timeout
}

func (o *Operator) String() string {
stepStrs := make([]string, len(o.steps))
for i := range o.steps {
stepStrs[i] = o.steps[i].String()
}
s := fmt.Sprintf("%s {%s} (kind:%s, region:%v(%v, %v), createAt:%s, startAt:%s, currentStep:%v, size:%d, steps:[%s])",
s := fmt.Sprintf("%s {%s} (kind:%s, region:%v(%v, %v), createAt:%s, startAt:%s, currentStep:%v, size:%d, steps:[%s],timeout:[%s])",
o.desc, o.brief, o.kind, o.regionID, o.regionEpoch.GetVersion(), o.regionEpoch.GetConfVer(), o.GetCreateTime(),
o.GetStartTime(), atomic.LoadInt32(&o.currentStep), o.ApproximateSize, strings.Join(stepStrs, ", "))
o.GetStartTime(), atomic.LoadInt32(&o.currentStep), o.ApproximateSize, strings.Join(stepStrs, ", "), o.timeout.String())
if o.CheckSuccess() {
s += " finished"
}
Expand Down Expand Up @@ -224,15 +229,12 @@ func (o *Operator) CheckExpired() bool {
return o.status.CheckExpired(OperatorExpireTime)
}

// CheckTimeout checks if the operator is timeout, and update the status.
// CheckTimeout returns true if the operator has timed out, and updates the status.
func (o *Operator) CheckTimeout() bool {
if o.CheckSuccess() {
return false
}
if startTime, step := o.getCurrentTimeAndStep(); step != nil {
return o.status.CheckStepTimeout(startTime, step, o.ApproximateSize)
}
return false
return o.status.CheckTimeout(o.timeout)
}

// Len returns the operator's steps count.
Expand Down
88 changes: 26 additions & 62 deletions server/schedule/operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package operator
import (
"context"
"encoding/json"
"fmt"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -119,9 +120,8 @@ func (s *testOperatorSuite) TestOperator(c *C) {
op.Start()
c.Assert(op.Check(region), IsNil)
c.Assert(op.Status(), Equals, SUCCESS)
SetOperatorStatusReachTime(op, STARTED, time.Now().Add(-SlowOperatorWaitTime-time.Second))
SetOperatorStatusReachTime(op, STARTED, time.Now().Add(-SlowStepWaitTime-time.Second))
c.Assert(op.CheckTimeout(), IsFalse)

// addPeer1, transferLeader1, removePeer2
steps = []OpStep{
AddPeer{ToStore: 1, PeerID: 1},
Expand All @@ -131,13 +131,12 @@ func (s *testOperatorSuite) TestOperator(c *C) {
op = s.newTestOperator(1, OpLeader|OpRegion, steps...)
s.checkSteps(c, op, steps)
op.Start()
c.Assert(op.Check(region), Equals, RemovePeer{FromStore: 2})
c.Assert(atomic.LoadInt32(&op.currentStep), Equals, int32(2))
c.Assert(RemovePeer{FromStore: 2}, Equals, op.Check(region))
c.Assert(int32(2), Equals, atomic.LoadInt32(&op.currentStep))
c.Assert(op.CheckTimeout(), IsFalse)
SetOperatorStatusReachTime(op, STARTED, op.GetStartTime().Add(-FastOperatorWaitTime-time.Second))
SetOperatorStatusReachTime(op, STARTED, op.GetStartTime().Add(-FastStepWaitTime-2*FastStepWaitTime+time.Second))
c.Assert(op.CheckTimeout(), IsFalse)
op.stepsTime[op.currentStep-1] = op.GetReachTimeOf(STARTED).Unix()
SetOperatorStatusReachTime(op, STARTED, op.GetStartTime().Add(-SlowOperatorWaitTime-time.Second))
SetOperatorStatusReachTime(op, STARTED, op.GetStartTime().Add(-SlowStepWaitTime-2*FastStepWaitTime-time.Second))
c.Assert(op.CheckTimeout(), IsTrue)
res, err := json.Marshal(op)
c.Assert(err, IsNil)
Expand All @@ -147,17 +146,19 @@ func (s *testOperatorSuite) TestOperator(c *C) {
steps = []OpStep{TransferLeader{FromStore: 2, ToStore: 1}}
op = s.newTestOperator(1, OpLeader, steps...)
op.Start()

c.Assert(op.CheckTimeout(), IsFalse)
SetOperatorStatusReachTime(op, STARTED, op.GetStartTime().Add(-FastOperatorWaitTime-time.Second))
SetOperatorStatusReachTime(op, STARTED, op.GetStartTime().Add(-FastStepWaitTime-time.Second))
c.Assert(op.CheckTimeout(), IsTrue)

// case2: checking an operator that has already timed out returns true and does not panic.
op = NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 1, FromStore: 4})
op.currentStep = 1

c.Assert(op.status.To(STARTED), IsTrue)
c.Assert(op.status.To(TIMEOUT), IsTrue)
c.Assert(op.CheckSuccess(), IsFalse)
c.Assert(op.CheckTimeout(), IsFalse)
c.Assert(op.CheckTimeout(), IsTrue)
}

func (s *testOperatorSuite) TestInfluence(c *C) {
Expand Down Expand Up @@ -309,7 +310,7 @@ func (s *testOperatorSuite) TestCheckTimeout(c *C) {
c.Assert(op.Status(), Equals, CREATED)
c.Assert(op.Start(), IsTrue)
op.currentStep = int32(len(op.steps))
SetOperatorStatusReachTime(op, STARTED, time.Now().Add(-SlowOperatorWaitTime))
SetOperatorStatusReachTime(op, STARTED, time.Now().Add(-SlowStepWaitTime))
c.Assert(op.CheckTimeout(), IsFalse)
c.Assert(op.Status(), Equals, SUCCESS)
}
Expand Down Expand Up @@ -370,7 +371,7 @@ func (s *testOperatorSuite) TestCheck(c *C) {
c.Assert(op.Start(), IsTrue)
c.Assert(op.Check(region), NotNil)
c.Assert(op.Status(), Equals, STARTED)
op.stepsTime[op.currentStep-1] = time.Now().Add(-SlowOperatorWaitTime).Unix()
SetOperatorStatusReachTime(op, STARTED, time.Now().Add(-SlowStepWaitTime-2*FastStepWaitTime))
c.Assert(op.Check(region), NotNil)
c.Assert(op.Status(), Equals, TIMEOUT)
}
Expand All @@ -385,7 +386,7 @@ func (s *testOperatorSuite) TestCheck(c *C) {
c.Assert(op.Start(), IsTrue)
c.Assert(op.Check(region), NotNil)
c.Assert(op.Status(), Equals, STARTED)
op.status.setTime(STARTED, time.Now().Add(-SlowOperatorWaitTime))
op.status.setTime(STARTED, time.Now().Add(-SlowStepWaitTime))
region = s.newTestRegion(1, 1, [2]uint64{1, 1})
c.Assert(op.Check(region), IsNil)
c.Assert(op.Status(), Equals, SUCCESS)
Expand Down Expand Up @@ -432,83 +433,46 @@ func (s *testOperatorSuite) TestOpStepTimeout(c *C) {
testdata := []struct {
step []OpStep
regionSize int64
start time.Time
expect bool
expect time.Duration
}{
{
// case1: 10GB region will have 60,000s to execute.
step: []OpStep{AddLearner{}, AddPeer{}},
regionSize: 10 * 1000,
start: time.Now().Add(-(time.Second*(6*10*1000) + time.Second)),
expect: true,
},
{
step: []OpStep{AddLearner{}, AddPeer{}},
regionSize: 10 * 1000,
start: time.Now().Add(-(time.Second*(6*10*1000) - time.Second)),
expect: false,
expect: time.Second * (6 * 10 * 1000),
}, {
// case2: 10MB region will have at least SlowOperatorWaitTime(10min) to executor.
// case2: 10MB region will have at least SlowStepWaitTime(10min) to execute.
step: []OpStep{AddLearner{}, AddPeer{}},
regionSize: 10,
start: time.Now().Add(-(SlowOperatorWaitTime + time.Second)),
expect: true,
}, {
step: []OpStep{AddLearner{}, AddPeer{}},
regionSize: 10,
start: time.Now().Add(-(time.Second*(6*10) - time.Second)),
expect: false,
expect: SlowStepWaitTime,
}, {
// case3: 10GB region will have 6,000s to execute for RemovePeer, TransferLeader, SplitRegion, PromoteLearner.
step: []OpStep{RemovePeer{}, TransferLeader{}, SplitRegion{}, PromoteLearner{}},
start: time.Now().Add(-(time.Second*(1000) + time.Second)),
regionSize: 10 * 1000,
expect: true,
expect: time.Second * (10 * 1000 * 0.6),
}, {
// case4: 10MB region will have at least FastStepWaitTime(10s) to execute for RemovePeer, TransferLeader, SplitRegion, PromoteLearner.
step: []OpStep{RemovePeer{}, TransferLeader{}, SplitRegion{}, PromoteLearner{}},
start: time.Now().Add(-(time.Second*(1000) - time.Second)),
regionSize: 10 * 1000,
expect: false,
}, {
// case4: 10MB will have at lease FastOperatorWaitTime(10s) to executor for RemovePeer, TransferLeader, SplitRegion, PromoteLearner.
step: []OpStep{RemovePeer{}, TransferLeader{}, SplitRegion{}, PromoteLearner{}},
start: time.Now().Add(-(FastOperatorWaitTime + time.Second)),
regionSize: 10,
expect: true,
}, {
step: []OpStep{RemovePeer{}, TransferLeader{}, SplitRegion{}, PromoteLearner{}},
start: time.Now().Add(-(FastOperatorWaitTime - time.Second)),
regionSize: 10,
expect: false,
expect: FastStepWaitTime,
}, {
// case5: 10GB region will have 6,000*3s to execute for ChangePeerV2Enter, ChangePeerV2Leave.
step: []OpStep{ChangePeerV2Enter{PromoteLearners: []PromoteLearner{{}, {}}},
ChangePeerV2Leave{PromoteLearners: []PromoteLearner{{}, {}}}},
start: time.Now().Add(-(time.Second*(3000) + time.Second)),
regionSize: 10 * 1000,
expect: true,
}, {
step: []OpStep{ChangePeerV2Enter{PromoteLearners: []PromoteLearner{{}, {}}},
ChangePeerV2Leave{PromoteLearners: []PromoteLearner{{}, {}}}},
start: time.Now().Add(-(time.Second*(3000) - time.Second)),
regionSize: 10 * 1000,
expect: false,
expect: time.Second * (10 * 1000 * 0.6 * 3),
}, {
// case6: 10GB region will have 6,000*10s to execute for MergeRegion.
step: []OpStep{MergeRegion{}},
start: time.Now().Add(-(time.Second*(10000) + time.Second)),
regionSize: 10 * 1000,
expect: true,
}, {
step: []OpStep{MergeRegion{}},
start: time.Now().Add(-(time.Second*(10000) - time.Second)),
regionSize: 10 * 1000,
expect: false,
expect: time.Second * (10 * 1000 * 0.6 * 10),
},
}
for _, v := range testdata {

for i, v := range testdata {
fmt.Printf("case:%d\n", i)
for _, step := range v.step {
c.Assert(v.expect, Equals, step.Timeout(v.start, v.regionSize))
c.Assert(v.expect, Equals, step.Timeout(v.regionSize))
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions server/schedule/operator/status_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,13 @@ func (trk *OpStatusTracker) CheckExpired(exp time.Duration) bool {
return trk.current == EXPIRED
}

// CheckStepTimeout checks if timeout, and update the current status.
func (trk *OpStatusTracker) CheckStepTimeout(start time.Time, step OpStep, approximateSize int64) bool {
// CheckTimeout returns true if the operator has timed out, and updates the current status.
func (trk *OpStatusTracker) CheckTimeout(duration time.Duration) bool {
trk.rw.Lock()
defer trk.rw.Unlock()
if trk.current == STARTED {
if !step.Timeout(start, approximateSize) {
start := trk.getTime(STARTED)
if time.Since(start) < duration {
return false
}
_ = trk.toLocked(TIMEOUT)
Expand Down
12 changes: 7 additions & 5 deletions server/schedule/operator/status_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,23 @@ func (s *testOpStatusTrackerSuite) TestCheckStepTimeout(c *C) {
start time.Time
status OpStatus
}{{
step: AddLearner{},
start: time.Now().Add(-(SlowOperatorWaitTime - 1*time.Second)),
step: AddLearner{},

start: time.Now().Add(-(SlowStepWaitTime - time.Second)),
status: STARTED,
}, {
step: AddLearner{},
start: time.Now().Add(-(SlowOperatorWaitTime + 1*time.Second)),
start: time.Now().Add(-(SlowStepWaitTime + time.Second)),
status: TIMEOUT,
}}

for _, v := range testdata {
// Timeout and status changed
trk := NewOpStatusTracker()
trk.To(STARTED)
c.Assert(trk.CheckStepTimeout(v.start, v.step, 0), Equals, v.status == TIMEOUT)
c.Assert(trk.Status(), Equals, v.status)
trk.reachTimes[STARTED] = v.start
c.Assert(v.status == TIMEOUT, Equals, trk.CheckTimeout(SlowStepWaitTime))
c.Assert(v.status, Equals, trk.Status())
}
}

Expand Down
Loading

0 comments on commit 5222943

Please sign in to comment.