@@ -9,8 +9,10 @@ import (
9
9
"github.com/pluralsh/deployment-operator/api/v1alpha1"
10
10
"github.com/pluralsh/deployment-operator/internal/utils"
11
11
"github.com/pluralsh/deployment-operator/pkg/client"
12
+ "github.com/pluralsh/deployment-operator/pkg/controller"
12
13
"github.com/pluralsh/deployment-operator/pkg/ping"
13
14
"github.com/pluralsh/deployment-operator/pkg/websocket"
15
+ "github.com/pluralsh/polly/algorithms"
14
16
apierrors "k8s.io/apimachinery/pkg/api/errors"
15
17
"k8s.io/apimachinery/pkg/types"
16
18
"k8s.io/client-go/discovery"
@@ -83,26 +85,39 @@ func (s *GateReconciler) ShutdownQueue() {
83
85
s .GateQueue .ShutDown ()
84
86
}
85
87
88
+ func (s * GateReconciler ) ListGates (ctx context.Context ) * algorithms.Pager [* console.PipelineGateEdgeFragment ] {
89
+ logger := log .FromContext (ctx )
90
+ logger .Info ("create pipeline gate pager" )
91
+ fetch := func (page * string , size int64 ) ([]* console.PipelineGateEdgeFragment , * algorithms.PageInfo , error ) {
92
+ resp , err := s .ConsoleClient .GetClusterGates (page , & size )
93
+ if err != nil {
94
+ logger .Error (err , "failed to fetch gates" )
95
+ return nil , nil , err
96
+ }
97
+ pageInfo := & algorithms.PageInfo {
98
+ HasNext : resp .PagedClusterGates .PageInfo .HasNextPage ,
99
+ After : resp .PagedClusterGates .PageInfo .EndCursor ,
100
+ PageSize : size ,
101
+ }
102
+ return resp .PagedClusterGates .Edges , pageInfo , nil
103
+ }
104
+ return algorithms .NewPager [* console.PipelineGateEdgeFragment ](controller .DefaultPageSize , fetch )
105
+ }
106
+
86
107
func (s * GateReconciler ) Poll (ctx context.Context ) (done bool , err error ) {
87
108
logger := log .FromContext (ctx )
88
109
logger .V (1 ).Info ("fetching gates for cluster" )
89
110
90
- var after * string
91
- var pageSize int64
92
- pageSize = 100
93
- hasNextPage := true
111
+ pager := s .ListGates (ctx )
94
112
95
- for hasNextPage {
96
- resp , err := s . ConsoleClient . GetClusterGates ( after , & pageSize )
113
+ for pager . HasNext () {
114
+ gates , err := pager . NextPage ( )
97
115
if err != nil {
98
116
logger .Error (err , "failed to fetch gates list" )
99
117
return false , nil
100
118
}
101
119
102
- hasNextPage = resp .PagedClusterGates .PageInfo .HasNextPage
103
- after = resp .PagedClusterGates .PageInfo .EndCursor
104
-
105
- for _ , gate := range resp .PagedClusterGates .Edges {
120
+ for _ , gate := range gates {
106
121
logger .V (2 ).Info ("sending update for" , "gate" , gate .Node .ID )
107
122
s .GateQueue .Add (gate .Node .ID )
108
123
}
0 commit comments