Skip to content

Commit 698db70

Browse files
authored
Merge pull request kubernetes#71551 from mlmhl/scheduler_optimization
activate unschedulable pods only if the node became more schedulable
2 parents a69b565 + 2fe9b14 commit 698db70

File tree

4 files changed

+214
-1
lines changed

4 files changed

+214
-1
lines changed

pkg/scheduler/factory/BUILD

+1
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ go_test(
7474
"//pkg/scheduler/internal/queue:go_default_library",
7575
"//pkg/scheduler/util:go_default_library",
7676
"//staging/src/k8s.io/api/core/v1:go_default_library",
77+
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
7778
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
7879
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
7980
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",

pkg/scheduler/factory/factory.go

+55-1
Original file line numberDiff line numberDiff line change
@@ -998,7 +998,14 @@ func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) {
998998
}
999999

10001000
c.invalidateCachedPredicatesOnNodeUpdate(newNode, oldNode)
1001-
c.podQueue.MoveAllToActiveQueue()
1001+
// Only activate unschedulable pods if the node became more schedulable.
1002+
// We skip the node property comparison when there are no unschedulable pods in the queue
1003+
// to save processing cycles. We still trigger a move to active queue to cover the case
1004+
// that a pod being processed by the scheduler is determined unschedulable. We want this
1005+
// pod to be reevaluated when a change in the cluster happens.
1006+
if c.podQueue.NumUnschedulablePods() == 0 || nodeSchedulingPropertiesChanged(newNode, oldNode) {
1007+
c.podQueue.MoveAllToActiveQueue()
1008+
}
10021009
}
10031010

10041011
func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) {
@@ -1070,6 +1077,53 @@ func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node,
10701077
}
10711078
}
10721079

1080+
func nodeSchedulingPropertiesChanged(newNode *v1.Node, oldNode *v1.Node) bool {
1081+
if nodeSpecUnschedulableChanged(newNode, oldNode) {
1082+
return true
1083+
}
1084+
if nodeAllocatableChanged(newNode, oldNode) {
1085+
return true
1086+
}
1087+
if nodeLabelsChanged(newNode, oldNode) {
1088+
return true
1089+
}
1090+
if nodeTaintsChanged(newNode, oldNode) {
1091+
return true
1092+
}
1093+
if nodeConditionsChanged(newNode, oldNode) {
1094+
return true
1095+
}
1096+
1097+
return false
1098+
}
1099+
1100+
func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
1101+
return !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable)
1102+
}
1103+
1104+
func nodeLabelsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
1105+
return !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels())
1106+
}
1107+
1108+
func nodeTaintsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
1109+
return !reflect.DeepEqual(newNode.Spec.Taints, oldNode.Spec.Taints)
1110+
}
1111+
1112+
func nodeConditionsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
1113+
strip := func(conditions []v1.NodeCondition) map[v1.NodeConditionType]v1.ConditionStatus {
1114+
conditionStatuses := make(map[v1.NodeConditionType]v1.ConditionStatus, len(conditions))
1115+
for i := range conditions {
1116+
conditionStatuses[conditions[i].Type] = conditions[i].Status
1117+
}
1118+
return conditionStatuses
1119+
}
1120+
return !reflect.DeepEqual(strip(oldNode.Status.Conditions), strip(newNode.Status.Conditions))
1121+
}
1122+
1123+
func nodeSpecUnschedulableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
1124+
return newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && newNode.Spec.Unschedulable == false
1125+
}
1126+
10731127
func (c *configFactory) deleteNodeFromCache(obj interface{}) {
10741128
var node *v1.Node
10751129
switch t := obj.(type) {

pkg/scheduler/factory/factory_test.go

+144
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"time"
2525

2626
"k8s.io/api/core/v1"
27+
"k8s.io/apimachinery/pkg/api/resource"
2728
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2829
"k8s.io/apimachinery/pkg/runtime"
2930
"k8s.io/apimachinery/pkg/util/sets"
@@ -653,3 +654,146 @@ func testGetBinderFunc(expectedBinderType, podName string, extenders []algorithm
653654
t.Errorf("Expected binder %q but got %q", expectedBinderType, binderType)
654655
}
655656
}
657+
658+
func TestNodeAllocatableChanged(t *testing.T) {
659+
newQuantity := func(value int64) resource.Quantity {
660+
return *resource.NewQuantity(value, resource.BinarySI)
661+
}
662+
for _, c := range []struct {
663+
Name string
664+
Changed bool
665+
OldAllocatable v1.ResourceList
666+
NewAllocatable v1.ResourceList
667+
}{
668+
{
669+
Name: "no allocatable resources changed",
670+
Changed: false,
671+
OldAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)},
672+
NewAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)},
673+
},
674+
{
675+
Name: "new node has more allocatable resources",
676+
Changed: true,
677+
OldAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)},
678+
NewAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024), v1.ResourceStorage: newQuantity(1024)},
679+
},
680+
} {
681+
oldNode := &v1.Node{Status: v1.NodeStatus{Allocatable: c.OldAllocatable}}
682+
newNode := &v1.Node{Status: v1.NodeStatus{Allocatable: c.NewAllocatable}}
683+
changed := nodeAllocatableChanged(newNode, oldNode)
684+
if changed != c.Changed {
685+
t.Errorf("nodeAllocatableChanged should be %t, got %t", c.Changed, changed)
686+
}
687+
}
688+
}
689+
690+
func TestNodeLabelsChanged(t *testing.T) {
691+
for _, c := range []struct {
692+
Name string
693+
Changed bool
694+
OldLabels map[string]string
695+
NewLabels map[string]string
696+
}{
697+
{
698+
Name: "no labels changed",
699+
Changed: false,
700+
OldLabels: map[string]string{"foo": "bar"},
701+
NewLabels: map[string]string{"foo": "bar"},
702+
},
703+
// Labels changed.
704+
{
705+
Name: "new node has more labels",
706+
Changed: true,
707+
OldLabels: map[string]string{"foo": "bar"},
708+
NewLabels: map[string]string{"foo": "bar", "test": "value"},
709+
},
710+
} {
711+
oldNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: c.OldLabels}}
712+
newNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: c.NewLabels}}
713+
changed := nodeLabelsChanged(newNode, oldNode)
714+
if changed != c.Changed {
715+
t.Errorf("Test case %q failed: should be %t, got %t", c.Name, c.Changed, changed)
716+
}
717+
}
718+
}
719+
720+
func TestNodeTaintsChanged(t *testing.T) {
721+
for _, c := range []struct {
722+
Name string
723+
Changed bool
724+
OldTaints []v1.Taint
725+
NewTaints []v1.Taint
726+
}{
727+
{
728+
Name: "no taint changed",
729+
Changed: false,
730+
OldTaints: []v1.Taint{{Key: "key", Value: "value"}},
731+
NewTaints: []v1.Taint{{Key: "key", Value: "value"}},
732+
},
733+
{
734+
Name: "taint value changed",
735+
Changed: true,
736+
OldTaints: []v1.Taint{{Key: "key", Value: "value1"}},
737+
NewTaints: []v1.Taint{{Key: "key", Value: "value2"}},
738+
},
739+
} {
740+
oldNode := &v1.Node{Spec: v1.NodeSpec{Taints: c.OldTaints}}
741+
newNode := &v1.Node{Spec: v1.NodeSpec{Taints: c.NewTaints}}
742+
changed := nodeTaintsChanged(newNode, oldNode)
743+
if changed != c.Changed {
744+
t.Errorf("Test case %q failed: should be %t, not %t", c.Name, c.Changed, changed)
745+
}
746+
}
747+
}
748+
749+
func TestNodeConditionsChanged(t *testing.T) {
750+
nodeConditionType := reflect.TypeOf(v1.NodeCondition{})
751+
if nodeConditionType.NumField() != 6 {
752+
t.Errorf("NodeCondition type has changed. The nodeConditionsChanged() function must be reevaluated.")
753+
}
754+
755+
for _, c := range []struct {
756+
Name string
757+
Changed bool
758+
OldConditions []v1.NodeCondition
759+
NewConditions []v1.NodeCondition
760+
}{
761+
{
762+
Name: "no condition changed",
763+
Changed: false,
764+
OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
765+
NewConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
766+
},
767+
{
768+
Name: "only LastHeartbeatTime changed",
769+
Changed: false,
770+
OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue, LastHeartbeatTime: metav1.Unix(1, 0)}},
771+
NewConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue, LastHeartbeatTime: metav1.Unix(2, 0)}},
772+
},
773+
{
774+
Name: "new node has more healthy conditions",
775+
Changed: true,
776+
OldConditions: []v1.NodeCondition{},
777+
NewConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}},
778+
},
779+
{
780+
Name: "new node has less unhealthy conditions",
781+
Changed: true,
782+
OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
783+
NewConditions: []v1.NodeCondition{},
784+
},
785+
{
786+
Name: "condition status changed",
787+
Changed: true,
788+
OldConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}},
789+
NewConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}},
790+
},
791+
} {
792+
oldNode := &v1.Node{Status: v1.NodeStatus{Conditions: c.OldConditions}}
793+
newNode := &v1.Node{Status: v1.NodeStatus{Conditions: c.NewConditions}}
794+
changed := nodeConditionsChanged(newNode, oldNode)
795+
if changed != c.Changed {
796+
t.Errorf("Test case %q failed: should be %t, got %t", c.Name, c.Changed, changed)
797+
}
798+
}
799+
}

pkg/scheduler/internal/queue/scheduling_queue.go

+14
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ type SchedulingQueue interface {
7171
Close()
7272
// DeleteNominatedPodIfExists deletes nominatedPod from internal cache
7373
DeleteNominatedPodIfExists(pod *v1.Pod)
74+
// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
75+
NumUnschedulablePods() int
7476
}
7577

7678
// NewSchedulingQueue initializes a new scheduling queue. If pod priority is
@@ -164,6 +166,11 @@ func (f *FIFO) Close() {
164166
// DeleteNominatedPodIfExists does nothing in FIFO.
165167
func (f *FIFO) DeleteNominatedPodIfExists(pod *v1.Pod) {}
166168

// NumUnschedulablePods returns the number of unschedulable pods that exist in
// the SchedulingQueue. A FIFO keeps no separate unschedulable queue, so this
// is always 0.
func (f *FIFO) NumUnschedulablePods() int {
	return 0
}
173+
167174
// NewFIFO creates a FIFO object.
168175
func NewFIFO() *FIFO {
169176
return &FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
@@ -701,6 +708,13 @@ func (p *PriorityQueue) podsCompareBackoffCompleted(p1, p2 interface{}) bool {
701708
return bo1.Before(bo2)
702709
}
703710

// NumUnschedulablePods returns the number of unschedulable pods that exist in
// the SchedulingQueue. A read lock is taken because unschedulableQ may be
// mutated concurrently by other queue operations.
func (p *PriorityQueue) NumUnschedulablePods() int {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return len(p.unschedulableQ.pods)
}
717+
704718
// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
705719
// is used to implement unschedulableQ.
706720
type UnschedulablePodsMap struct {

0 commit comments

Comments
 (0)