@@ -50,6 +50,7 @@ import (
50
50
"k8s.io/apiserver/pkg/util/flag"
51
51
"k8s.io/client-go/dynamic"
52
52
clientset "k8s.io/client-go/kubernetes"
53
+ certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
53
54
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
54
55
restclient "k8s.io/client-go/rest"
55
56
"k8s.io/client-go/tools/clientcmd"
@@ -537,66 +538,39 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan
537
538
return err
538
539
}
539
540
540
- if s .BootstrapKubeconfig != "" {
541
- if err := bootstrap .LoadClientCert (s .KubeConfig , s .BootstrapKubeconfig , s .CertDirectory , nodeName ); err != nil {
542
- return err
543
- }
544
- }
545
-
546
541
// if in standalone mode, indicate as much by setting all clients to nil
547
- if standaloneMode {
542
+ switch {
543
+ case standaloneMode :
548
544
kubeDeps .KubeClient = nil
549
545
kubeDeps .DynamicKubeClient = nil
550
546
kubeDeps .EventClient = nil
551
547
kubeDeps .HeartbeatClient = nil
552
548
klog .Warningf ("standalone mode, no API client" )
553
- } else if kubeDeps .KubeClient == nil || kubeDeps .EventClient == nil || kubeDeps .HeartbeatClient == nil || kubeDeps .DynamicKubeClient == nil {
554
- // initialize clients if not standalone mode and any of the clients are not provided
555
- var kubeClient clientset.Interface
556
- var eventClient v1core.EventsGetter
557
- var heartbeatClient clientset.Interface
558
- var dynamicKubeClient dynamic.Interface
559
-
560
- clientConfig , err := createAPIServerClientConfig (s )
561
- if err != nil {
562
- return fmt .Errorf ("invalid kubeconfig: %v" , err )
563
- }
564
549
565
- var clientCertificateManager certificate.Manager
566
- if s .RotateCertificates && utilfeature .DefaultFeatureGate .Enabled (features .RotateKubeletClientCertificate ) {
567
- clientCertificateManager , err = kubeletcertificate .NewKubeletClientCertificateManager (s .CertDirectory , nodeName , clientConfig .CertData , clientConfig .KeyData , clientConfig .CertFile , clientConfig .KeyFile )
568
- if err != nil {
569
- return err
570
- }
571
- }
572
- // we set exitAfter to five minutes because we use this client configuration to request new certs - if we are unable
573
- // to request new certs, we will be unable to continue normal operation. Exiting the process allows a wrapper
574
- // or the bootstrapping credentials to potentially lay down new initial config.
575
- closeAllConns , err := kubeletcertificate .UpdateTransport (wait .NeverStop , clientConfig , clientCertificateManager , 5 * time .Minute )
550
+ case kubeDeps .KubeClient == nil , kubeDeps .EventClient == nil , kubeDeps .HeartbeatClient == nil , kubeDeps .DynamicKubeClient == nil :
551
+ clientConfig , closeAllConns , err := buildKubeletClientConfig (s , nodeName )
576
552
if err != nil {
577
553
return err
578
554
}
555
+ kubeDeps .OnHeartbeatFailure = closeAllConns
579
556
580
- kubeClient , err = clientset .NewForConfig (clientConfig )
557
+ kubeDeps . KubeClient , err = clientset .NewForConfig (clientConfig )
581
558
if err != nil {
582
- klog .Warningf ("New kubeClient from clientConfig error: %v" , err )
583
- } else if kubeClient .CertificatesV1beta1 () != nil && clientCertificateManager != nil {
584
- klog .V (2 ).Info ("Starting client certificate rotation." )
585
- clientCertificateManager .SetCertificateSigningRequestClient (kubeClient .CertificatesV1beta1 ().CertificateSigningRequests ())
586
- clientCertificateManager .Start ()
559
+ return fmt .Errorf ("failed to initialize kubelet client: %v" , err )
587
560
}
588
- dynamicKubeClient , err = dynamic .NewForConfig (clientConfig )
561
+
562
+ kubeDeps .DynamicKubeClient , err = dynamic .NewForConfig (clientConfig )
589
563
if err != nil {
590
- klog . Warningf ( "Failed to initialize dynamic KubeClient : %v" , err )
564
+ return fmt . Errorf ( "failed to initialize kubelet dynamic client : %v" , err )
591
565
}
592
566
593
567
// make a separate client for events
594
568
eventClientConfig := * clientConfig
595
569
eventClientConfig .QPS = float32 (s .EventRecordQPS )
596
570
eventClientConfig .Burst = int (s .EventBurst )
597
- eventClient , err = v1core .NewForConfig (& eventClientConfig )
571
+ kubeDeps . EventClient , err = v1core .NewForConfig (& eventClientConfig )
598
572
if err != nil {
599
- klog . Warningf ( "Failed to create API Server client for Events : %v" , err )
573
+ return fmt . Errorf ( "failed to initialize kubelet event client: %v" , err )
600
574
}
601
575
602
576
// make a separate client for heartbeat with throttling disabled and a timeout attached
@@ -610,28 +584,18 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan
610
584
}
611
585
}
612
586
heartbeatClientConfig .QPS = float32 (- 1 )
613
- heartbeatClient , err = clientset .NewForConfig (& heartbeatClientConfig )
587
+ kubeDeps . HeartbeatClient , err = clientset .NewForConfig (& heartbeatClientConfig )
614
588
if err != nil {
615
- klog . Warningf ( "Failed to create API Server client for heartbeat : %v" , err )
589
+ return fmt . Errorf ( "failed to initialize kubelet heartbeat client: %v" , err )
616
590
}
617
591
618
- // csiClient works with CRDs that support json only
619
- clientConfig .ContentType = "application/json"
620
- csiClient , err := csiclientset .NewForConfig (clientConfig )
592
+ // CRDs are JSON only, and client renegotiation for streaming is not correct as per #67803
593
+ csiClientConfig := restclient .CopyConfig (clientConfig )
594
+ csiClientConfig .ContentType = "application/json"
595
+ kubeDeps .CSIClient , err = csiclientset .NewForConfig (csiClientConfig )
621
596
if err != nil {
622
- klog .Warningf ("Failed to create CSI API client: %v" , err )
623
- }
624
-
625
- kubeDeps .KubeClient = kubeClient
626
- kubeDeps .DynamicKubeClient = dynamicKubeClient
627
- if heartbeatClient != nil {
628
- kubeDeps .HeartbeatClient = heartbeatClient
629
- kubeDeps .OnHeartbeatFailure = closeAllConns
630
- }
631
- if eventClient != nil {
632
- kubeDeps .EventClient = eventClient
597
+ return fmt .Errorf ("failed to initialize kubelet storage client: %v" , err )
633
598
}
634
- kubeDeps .CSIClient = csiClient
635
599
}
636
600
637
601
// If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops
@@ -771,6 +735,118 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan
771
735
return nil
772
736
}
773
737
738
+ // buildKubeletClientConfig constructs the appropriate client config for the kubelet depending on whether
739
+ // bootstrapping is enabled or client certificate rotation is enabled.
740
+ func buildKubeletClientConfig (s * options.KubeletServer , nodeName types.NodeName ) (* restclient.Config , func (), error ) {
741
+ if s .RotateCertificates && utilfeature .DefaultFeatureGate .Enabled (features .RotateKubeletClientCertificate ) {
742
+ // Rules for client rotation and the handling of kube config files:
743
+ //
744
+ // 1. If the client provides only a kubeconfig file, we must use that as the initial client
745
+ // kubeadm needs the initial data in the kubeconfig to be placed into the cert store
746
+ // 2. If the client provides only an initial bootstrap kubeconfig file, we must create a
747
+ // kubeconfig file at the target location that points to the cert store, but until
748
+ // the file is present the client config will have no certs
749
+ // 3. If the client provides both and the kubeconfig is valid, we must ignore the bootstrap
750
+ // kubeconfig.
751
+ // 4. If the client provides both and the kubeconfig is expired or otherwise invalid, we must
752
+ // replace the kubeconfig with a new file that points to the cert dir
753
+ //
754
+ // The desired configuration for bootstrapping is to use a bootstrap kubeconfig and to have
755
+ // the kubeconfig file be managed by this process. For backwards compatibility with kubeadm,
756
+ // which provides a high powered kubeconfig on the master with cert/key data, we must
757
+ // bootstrap the cert manager with the contents of the initial client config.
758
+
759
+ klog .Infof ("Client rotation is on, will bootstrap in background" )
760
+ certConfig , clientConfig , err := bootstrap .LoadClientConfig (s .KubeConfig , s .BootstrapKubeconfig , s .CertDirectory )
761
+ if err != nil {
762
+ return nil , nil , err
763
+ }
764
+
765
+ clientCertificateManager , err := buildClientCertificateManager (certConfig , clientConfig , s .CertDirectory , nodeName )
766
+ if err != nil {
767
+ return nil , nil , err
768
+ }
769
+
770
+ // the rotating transport will use the cert from the cert manager instead of these files
771
+ transportConfig := restclient .AnonymousClientConfig (clientConfig )
772
+ kubeClientConfigOverrides (s , transportConfig )
773
+
774
+ // we set exitAfter to five minutes because we use this client configuration to request new certs - if we are unable
775
+ // to request new certs, we will be unable to continue normal operation. Exiting the process allows a wrapper
776
+ // or the bootstrapping credentials to potentially lay down new initial config.
777
+ closeAllConns , err := kubeletcertificate .UpdateTransport (wait .NeverStop , transportConfig , clientCertificateManager , 5 * time .Minute )
778
+ if err != nil {
779
+ return nil , nil , err
780
+ }
781
+
782
+ klog .V (2 ).Info ("Starting client certificate rotation." )
783
+ clientCertificateManager .Start ()
784
+
785
+ return transportConfig , closeAllConns , nil
786
+ }
787
+
788
+ if len (s .BootstrapKubeconfig ) > 0 {
789
+ if err := bootstrap .LoadClientCert (s .KubeConfig , s .BootstrapKubeconfig , s .CertDirectory , nodeName ); err != nil {
790
+ return nil , nil , err
791
+ }
792
+ }
793
+
794
+ clientConfig , err := clientcmd .NewNonInteractiveDeferredLoadingClientConfig (
795
+ & clientcmd.ClientConfigLoadingRules {ExplicitPath : s .KubeConfig },
796
+ & clientcmd.ConfigOverrides {},
797
+ ).ClientConfig ()
798
+ if err != nil {
799
+ return nil , nil , fmt .Errorf ("invalid kubeconfig: %v" , err )
800
+ }
801
+
802
+ kubeClientConfigOverrides (s , clientConfig )
803
+
804
+ return clientConfig , nil , nil
805
+ }
806
+
807
+ // buildClientCertificateManager creates a certificate manager that will use certConfig to request a client certificate
808
+ // if no certificate is available, or the most recent clientConfig (which is assumed to point to the cert that the manager will
809
+ // write out).
810
+ func buildClientCertificateManager (certConfig , clientConfig * restclient.Config , certDir string , nodeName types.NodeName ) (certificate.Manager , error ) {
811
+ newClientFn := func (current * tls.Certificate ) (certificatesclient.CertificateSigningRequestInterface , error ) {
812
+ // If we have a valid certificate, use that to fetch CSRs. Otherwise use the bootstrap
813
+ // credentials. In the future it would be desirable to change the behavior of bootstrap
814
+ // to always fall back to the external bootstrap credentials when such credentials are
815
+ // provided by a fundamental trust system like cloud VM identity or an HSM module.
816
+ config := certConfig
817
+ if current != nil {
818
+ config = clientConfig
819
+ }
820
+ client , err := clientset .NewForConfig (config )
821
+ if err != nil {
822
+ return nil , err
823
+ }
824
+ return client .CertificatesV1beta1 ().CertificateSigningRequests (), nil
825
+ }
826
+
827
+ return kubeletcertificate .NewKubeletClientCertificateManager (
828
+ certDir ,
829
+ nodeName ,
830
+
831
+ // this preserves backwards compatibility with kubeadm which passes
832
+ // a high powered certificate to the kubelet as --kubeconfig and expects
833
+ // it to be rotated out immediately
834
+ clientConfig .CertData ,
835
+ clientConfig .KeyData ,
836
+
837
+ clientConfig .CertFile ,
838
+ clientConfig .KeyFile ,
839
+ newClientFn ,
840
+ )
841
+ }
842
+
843
+ func kubeClientConfigOverrides (s * options.KubeletServer , clientConfig * restclient.Config ) {
844
+ clientConfig .ContentType = s .ContentType
845
+ // Override kubeconfig qps/burst settings from flags
846
+ clientConfig .QPS = float32 (s .KubeAPIQPS )
847
+ clientConfig .Burst = int (s .KubeAPIBurst )
848
+ }
849
+
774
850
// getNodeName returns the node name according to the cloud provider
775
851
// if cloud provider is specified. Otherwise, returns the hostname of the node.
776
852
func getNodeName (cloud cloudprovider.Interface , hostname string ) (types.NodeName , error ) {
@@ -859,39 +935,6 @@ func InitializeTLS(kf *options.KubeletFlags, kc *kubeletconfiginternal.KubeletCo
859
935
return tlsOptions , nil
860
936
}
861
937
862
- func kubeconfigClientConfig (s * options.KubeletServer ) (* restclient.Config , error ) {
863
- return clientcmd .NewNonInteractiveDeferredLoadingClientConfig (
864
- & clientcmd.ClientConfigLoadingRules {ExplicitPath : s .KubeConfig },
865
- & clientcmd.ConfigOverrides {},
866
- ).ClientConfig ()
867
- }
868
-
869
- // createClientConfig creates a client configuration from the command line arguments.
870
- // If --kubeconfig is explicitly set, it will be used.
871
- func createClientConfig (s * options.KubeletServer ) (* restclient.Config , error ) {
872
- if s .BootstrapKubeconfig != "" || len (s .KubeConfig ) > 0 {
873
- return kubeconfigClientConfig (s )
874
- } else {
875
- return nil , fmt .Errorf ("createClientConfig called in standalone mode" )
876
- }
877
- }
878
-
879
- // createAPIServerClientConfig generates a client.Config from command line flags
880
- // via createClientConfig and then injects chaos into the configuration via addChaosToClientConfig.
881
- func createAPIServerClientConfig (s * options.KubeletServer ) (* restclient.Config , error ) {
882
- clientConfig , err := createClientConfig (s )
883
- if err != nil {
884
- return nil , err
885
- }
886
-
887
- clientConfig .ContentType = s .ContentType
888
- // Override kubeconfig qps/burst settings from flags
889
- clientConfig .QPS = float32 (s .KubeAPIQPS )
890
- clientConfig .Burst = int (s .KubeAPIBurst )
891
-
892
- return clientConfig , nil
893
- }
894
-
895
938
// RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications:
896
939
// 1 Integration tests
897
940
// 2 Kubelet binary
0 commit comments