Skip to content

Commit 399f52b

Browse files
committed
support referencing cluster authentication information from secrets
Signed-off-by: Iceber Gu <[email protected]>
1 parent 9728156 commit 399f52b

File tree

9 files changed

+224
-9
lines changed

9 files changed

+224
-9
lines changed

cmd/apiserver/app/options/options.go

+6
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
2727
)
2828

29+
const DefaultNamespace = "clusterpedia-system"
30+
2931
type ClusterPediaServerOptions struct {
3032
MaxRequestsInFlight int
3133
MaxMutatingRequestsInFlight int
@@ -41,6 +43,7 @@ type ClusterPediaServerOptions struct {
4143
Traces *genericoptions.TracingOptions
4244
Metrics *metrics.Options
4345

46+
RunInNamespace string
4447
Storage *storageoptions.StorageOptions
4548
ResourceServer *kubeapiserver.Options
4649
}
@@ -72,6 +75,7 @@ func NewServerOptions() *ClusterPediaServerOptions {
7275
Traces: genericoptions.NewTracingOptions(),
7376
Metrics: metrics.NewOptions(),
7477

78+
RunInNamespace: DefaultNamespace,
7579
Storage: storageoptions.NewStorageOptions(),
7680
ResourceServer: kubeapiserver.NewOptions(),
7781
}
@@ -101,6 +105,7 @@ func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, error) {
101105
if err != nil {
102106
return nil, err
103107
}
108+
resourceServerConfig.SecretNamespace = o.RunInNamespace
104109

105110
if err := o.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{net.ParseIP("127.0.0.1")}); err != nil {
106111
return nil, fmt.Errorf("error create self-signed certificates: %v", err)
@@ -171,6 +176,7 @@ func (o *ClusterPediaServerOptions) Flags() cliflag.NamedFlagSets {
171176
var fss cliflag.NamedFlagSets
172177

173178
genericfs := fss.FlagSet("generic")
179+
genericfs.StringVar(&o.RunInNamespace, "namespace", o.RunInNamespace, "The namespace in which the Pod is running.")
174180
genericfs.IntVar(&o.MaxRequestsInFlight, "max-requests-inflight", o.MaxRequestsInFlight, ""+
175181
"Otherwise, this flag limits the maximum number of non-mutating requests in flight, or a zero value disables the limit completely.")
176182
genericfs.IntVar(&o.MaxMutatingRequestsInFlight, "max-mutating-requests-inflight", o.MaxMutatingRequestsInFlight, ""+

cmd/clustersynchro-manager/app/config/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
type Config struct {
1616
Kubeconfig *restclient.Config
17+
Namespace string
1718
CRDClient *crdclientset.Clientset
1819
EventRecorder record.EventRecorder
1920

cmd/clustersynchro-manager/app/options/options.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import (
3131

3232
const (
3333
ClusterSynchroManagerUserAgent = "cluster-synchro-manager"
34+
35+
DefaultNamespace = "clusterpedia-system"
3436
)
3537

3638
type Options struct {
@@ -45,6 +47,7 @@ type Options struct {
4547
Metrics *MetricsOptions
4648
KubeStateMetrics *kubestatemetrics.Options
4749

50+
RunInNamespace string
4851
WorkerNumber int // WorkerNumber is the number of worker goroutines
4952
PageSizeForResourceSync int64
5053
ShardingName string
@@ -59,7 +62,7 @@ func NewClusterSynchroManagerOptions() (*Options, error) {
5962
componentbaseconfigv1alpha1.RecommendedDefaultClientConnectionConfiguration(&clientConnection)
6063

6164
leaderElection.ResourceName = "clusterpedia-clustersynchro-manager"
62-
leaderElection.ResourceNamespace = "clusterpedia-system"
65+
leaderElection.ResourceNamespace = DefaultNamespace
6366
leaderElection.ResourceLock = resourcelock.LeasesResourceLock
6467

6568
clientConnection.ContentType = runtime.ContentTypeJSON
@@ -78,6 +81,7 @@ func NewClusterSynchroManagerOptions() (*Options, error) {
7881
options.Storage = storageoptions.NewStorageOptions()
7982
options.Metrics = NewMetricsOptions()
8083
options.KubeStateMetrics = kubestatemetrics.NewOptions()
84+
options.RunInNamespace = DefaultNamespace
8185

8286
options.WorkerNumber = 5
8387
return &options, nil
@@ -92,6 +96,7 @@ func (o *Options) Flags() cliflag.NamedFlagSets {
9296
genericfs.Int32Var(&o.ClientConnection.Burst, "kube-api-burst", o.ClientConnection.Burst, "Burst to use while talking with kubernetes apiserver.")
9397
genericfs.IntVar(&o.WorkerNumber, "worker-number", o.WorkerNumber, "The number of worker goroutines.")
9498
genericfs.StringVar(&o.ShardingName, "sharding-name", o.ShardingName, "The sharding name of manager.")
99+
genericfs.StringVar(&o.RunInNamespace, "namespace", o.RunInNamespace, "The namespace in which the Pod is running.")
95100

96101
syncfs := fss.FlagSet("resource sync")
97102
syncfs.Int64Var(&o.PageSizeForResourceSync, "page-size", o.PageSizeForResourceSync, "The requested chunk size of initial and resync watch lists for resource sync")
@@ -169,8 +174,11 @@ func (o *Options) Config() (*config.Config, error) {
169174
if o.ShardingName != "" {
170175
o.LeaderElection.ResourceName = fmt.Sprintf("%s-%s", o.LeaderElection.ResourceName, o.ShardingName)
171176
}
177+
// Override the namespace for leader election resource.
178+
o.LeaderElection.ResourceNamespace = o.RunInNamespace
172179

173180
return &config.Config{
181+
Namespace: o.RunInNamespace,
174182
CRDClient: crdclient,
175183
Kubeconfig: kubeconfig,
176184
EventRecorder: eventRecorder,

pkg/kubeapiserver/apiserver.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ func NewDefaultConfig() *Config {
6666
}
6767

6868
type ExtraConfig struct {
69+
SecretNamespace string
6970
AllowPediaClusterConfigReuse bool
7071
ExtraProxyRequestHeaderPrefixes []string
7172
AllowedProxySubresources map[schema.GroupResource]sets.Set[string]
@@ -103,6 +104,7 @@ func (c *Config) Complete() CompletedConfig {
103104
type completedConfig struct {
104105
GenericConfig genericapiserver.CompletedConfig
105106

107+
SecretNamespace string
106108
StorageFactory storage.StorageFactory
107109
InformerFactory informers.SharedInformerFactory
108110
InitialAPIGroupResources []*restmapper.APIGroupResources
@@ -138,8 +140,9 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
138140
restManager := NewRESTManager(c.GenericConfig.Serializer, runtime.ContentTypeJSON, c.StorageFactory, c.InitialAPIGroupResources)
139141
discoveryManager := discovery.NewDiscoveryManager(c.GenericConfig.Serializer, restManager, delegate)
140142

143+
secretLister := c.GenericConfig.SharedInformerFactory.Core().V1().Secrets().Lister().Secrets(c.ExtraConfig.SecretNamespace)
141144
clusterInformer := c.InformerFactory.Cluster().V1alpha2().PediaClusters()
142-
connector := proxyrest.NewProxyConnector(clusterInformer.Lister(), c.ExtraConfig.AllowPediaClusterConfigReuse, c.ExtraConfig.ExtraProxyRequestHeaderPrefixes)
145+
connector := proxyrest.NewProxyConnector(clusterInformer.Lister(), secretLister, c.ExtraConfig.AllowPediaClusterConfigReuse, c.ExtraConfig.ExtraProxyRequestHeaderPrefixes)
143146

144147
methodSet := sets.New("GET")
145148
for _, rest := range proxyrest.GetSubresourceRESTs(connector) {

pkg/kubeapiserver/options.go

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
)
1313

1414
type Options struct {
15+
ClusterCertificationSecretNamespace string
16+
1517
// AllowPediaClusterConfigForProxyRequest controls all proxy requests.
1618
// TODO(iceber): Perhaps we could add a separate setting specifically for subresource proxy request.
1719
AllowPediaClusterConfigForProxyRequest bool

pkg/kubeapiserver/resourcerest/proxy/proxy_connector.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
clusterlister "github.com/clusterpedia-io/clusterpedia/pkg/generated/listers/cluster/v1alpha2"
1414
"github.com/clusterpedia-io/clusterpedia/pkg/utils"
15+
corev1listers "k8s.io/client-go/listers/core/v1"
1516
)
1617

1718
const DefaultProxyRequestHeaderPrefix = "X-Clusterpedia-Proxy-"
@@ -20,16 +21,18 @@ type Connector struct {
2021
allowConfigReuse bool
2122
extraHeaderPrefixes []string
2223
clusterLister clusterlister.PediaClusterLister
24+
secretLister corev1listers.SecretNamespaceLister
2325
}
2426

25-
func NewProxyConnector(clusterLister clusterlister.PediaClusterLister, allowPediaClusterConfigReuse bool, extraHeaderPrefixes []string) ClusterConnectionGetter {
27+
func NewProxyConnector(clusterLister clusterlister.PediaClusterLister, secretLister corev1listers.SecretNamespaceLister, allowPediaClusterConfigReuse bool, extraHeaderPrefixes []string) ClusterConnectionGetter {
2628
if len(extraHeaderPrefixes) == 0 {
2729
extraHeaderPrefixes = []string{DefaultProxyRequestHeaderPrefix}
2830
}
2931
return &Connector{
3032
allowConfigReuse: allowPediaClusterConfigReuse,
3133
extraHeaderPrefixes: extraHeaderPrefixes,
3234
clusterLister: clusterLister,
35+
secretLister: secretLister,
3336
}
3437
}
3538

@@ -100,7 +103,7 @@ func (c *Connector) GetClusterConnection(ctx context.Context, name string, req *
100103
}
101104

102105
if !authInHeader && c.allowConfigReuse {
103-
config, err = utils.BuildClusterRestConfig(cluster)
106+
config, err = utils.BuildClusterRestConfig(cluster, c.secretLister)
104107
if err != nil {
105108
return "", nil, err
106109
}

pkg/synchromanager/clustersynchro_manager.go

+87-4
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,16 @@ import (
99
"sync"
1010
"time"
1111

12+
corev1 "k8s.io/api/core/v1"
1213
"k8s.io/apimachinery/pkg/api/equality"
1314
apierrors "k8s.io/apimachinery/pkg/api/errors"
1415
"k8s.io/apimachinery/pkg/api/meta"
1516
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1617
"k8s.io/apimachinery/pkg/labels"
1718
"k8s.io/apimachinery/pkg/util/wait"
19+
v1 "k8s.io/client-go/informers/core/v1"
20+
"k8s.io/client-go/kubernetes"
21+
corev1listers "k8s.io/client-go/listers/core/v1"
1822
"k8s.io/client-go/tools/cache"
1923
"k8s.io/client-go/util/retry"
2024
"k8s.io/client-go/util/workqueue"
@@ -45,6 +49,8 @@ type Manager struct {
4549

4650
clusterpediaclient crdclientset.Interface
4751
informerFactory externalversions.SharedInformerFactory
52+
secretLister corev1listers.SecretNamespaceLister
53+
secretInformer cache.SharedIndexInformer
4854

4955
shardingName string
5056
queue workqueue.RateLimitingInterface
@@ -57,18 +63,20 @@ type Manager struct {
5763
synchrolock sync.RWMutex
5864
synchros map[string]*clustersynchro.ClusterSynchro
5965
synchroWaitGroup wait.Group
66+
67+
clusterSecretsMap sync.Map
6068
}
6169

6270
var _ kubestatemetrics.ClusterMetricsWriterListGetter = &Manager{}
6371

64-
func NewManager(client crdclientset.Interface, storage storage.StorageFactory, syncConfig clustersynchro.ClusterSyncConfig, shardingName string) *Manager {
65-
factory := externalversions.NewSharedInformerFactory(client, 0)
72+
func NewManager(client kubernetes.Interface, clusterpediaClient crdclientset.Interface, storage storage.StorageFactory, syncConfig clustersynchro.ClusterSyncConfig, shardingName string, secretNamespace string) *Manager {
73+
factory := externalversions.NewSharedInformerFactory(clusterpediaClient, 0)
6674
clusterinformer := factory.Cluster().V1alpha2().PediaClusters()
6775
clusterSyncResourcesInformer := factory.Cluster().V1alpha2().ClusterSyncResources()
6876

6977
manager := &Manager{
7078
informerFactory: factory,
71-
clusterpediaclient: client,
79+
clusterpediaclient: clusterpediaClient,
7280
shardingName: shardingName,
7381

7482
storage: storage,
@@ -83,6 +91,23 @@ func NewManager(client crdclientset.Interface, storage storage.StorageFactory, s
8391
synchros: make(map[string]*clustersynchro.ClusterSynchro),
8492
}
8593

94+
secretInformer := v1.NewSecretInformer(client, secretNamespace, 0, nil)
95+
if _, err := secretInformer.AddEventHandler(
96+
cache.ResourceEventHandlerFuncs{
97+
AddFunc: func(obj any) { manager.handleSecret(nil, obj.(*corev1.Secret)) },
98+
UpdateFunc: func(older, newer any) { manager.handleSecret(older.(*corev1.Secret), newer.(*corev1.Secret)) },
99+
DeleteFunc: func(obj any) {
100+
objName, err := cache.DeletionHandlingObjectToName(obj)
101+
if err != nil {
102+
return
103+
}
104+
manager.handleDeletedSecret(objName.Name)
105+
},
106+
},
107+
); err != nil {
108+
klog.ErrorS(err, "error when adding event handler to informer")
109+
}
110+
86111
if _, err := clusterinformer.Informer().AddEventHandler(
87112
cache.ResourceEventHandlerFuncs{
88113
AddFunc: manager.addCluster,
@@ -130,6 +155,21 @@ func (manager *Manager) Run(workers int, stopCh <-chan struct{}) {
130155

131156
// informerFactory should not be controlled by stopCh
132157
stopInformer := make(chan struct{})
158+
159+
// 优先启动 secret informer
160+
manager.secretInformer.Run(stopInformer)
161+
timeout := make(chan struct{})
162+
go func() {
163+
select {
164+
case <-stopCh:
165+
case <-time.After(60 * time.Second):
166+
}
167+
close(timeout)
168+
}()
169+
if !cache.WaitForCacheSync(timeout, manager.secretInformer.HasSynced) {
170+
klog.Fatal("clustersynchro manager: wait for secret informer failed")
171+
}
172+
133173
manager.informerFactory.Start(stopInformer)
134174
if !cache.WaitForCacheSync(stopCh, manager.clusterInformer.HasSynced) {
135175
klog.Fatal("clustersynchro manager: wait for informer factory failed")
@@ -159,6 +199,31 @@ func (manager *Manager) Run(workers int, stopCh <-chan struct{}) {
159199
klog.Info("cluster synchro manager stopped.")
160200
}
161201

202+
func (manager *Manager) handleSecret(old *corev1.Secret, obj *corev1.Secret) {
203+
if old != nil && reflect.DeepEqual(old.Data, obj.Data) {
204+
return
205+
}
206+
207+
// TODO(Iceber): 通过字段 Key 来优化 cluster 的查找
208+
manager.clusterSecretsMap.Range(func(k, v any) bool {
209+
secrets := v.(map[string]struct{})
210+
if _, ok := secrets[obj.Name]; ok {
211+
manager.enqueue(k.(string))
212+
}
213+
return true
214+
})
215+
}
216+
217+
func (manager *Manager) handleDeletedSecret(name string) {
218+
manager.clusterSecretsMap.Range(func(k, v any) bool {
219+
secrets := v.(map[string]struct{})
220+
if _, ok := secrets[name]; ok {
221+
manager.enqueue(k.(string))
222+
}
223+
return true
224+
})
225+
}
226+
162227
func (manager *Manager) addCluster(obj interface{}) {
163228
manager.enqueue(obj)
164229
}
@@ -328,7 +393,23 @@ func (manager *Manager) reconcileCluster(cluster *clusterv1alpha2.PediaCluster)
328393
synchro := manager.synchros[cluster.Name]
329394
manager.synchrolock.RUnlock()
330395

331-
config, err := utils.BuildClusterRestConfig(cluster)
396+
// TODO(Iceber): Need to optimize and simplify the following code
397+
secrets := make(map[string]struct{})
398+
if source := cluster.Spec.CertificationFrom.Cert; source != nil {
399+
secrets[source.Name] = struct{}{}
400+
}
401+
if source := cluster.Spec.CertificationFrom.Token; source != nil {
402+
secrets[source.Name] = struct{}{}
403+
}
404+
if source := cluster.Spec.CertificationFrom.KubeConfig; source != nil {
405+
secrets[source.Name] = struct{}{}
406+
}
407+
if source := cluster.Spec.CertificationFrom.Key; source != nil {
408+
secrets[source.Name] = struct{}{}
409+
}
410+
manager.clusterSecretsMap.Store(cluster.Name, secrets)
411+
412+
config, err := utils.BuildClusterRestConfig(cluster, manager.secretLister)
332413
if err != nil {
333414
klog.ErrorS(err, "Failed to build cluster config", "cluster", cluster.Name)
334415
manager.UpdateClusterAPIServerAndValidatedCondition(cluster.Name, cluster.Spec.APIServer, synchro, clusterv1alpha2.InvalidConfigReason,
@@ -453,6 +534,8 @@ func (manager *Manager) stopClusterSynchro(name string) {
453534
}
454535

455536
func (manager *Manager) removeCluster(name string) error {
537+
manager.clusterSecretsMap.Delete(name)
538+
456539
manager.synchrolock.Lock()
457540
synchro := manager.synchros[name]
458541
delete(manager.synchros, name)

0 commit comments

Comments
 (0)