Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clone populator: Add clone source watches #3639

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/controller/populators/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ go_library(
"//staging/src/kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1:go_default_library",
"//staging/src/kubevirt.io/containerized-data-importer-api/pkg/apis/forklift/v1beta1:go_default_library",
"//vendor/github.com/go-logr/logr:go_default_library",
"//vendor/github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/storage/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
Expand All @@ -37,8 +39,10 @@ go_library(
"//vendor/k8s.io/utils/ptr:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/client:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/controller:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/event:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/handler:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/manager:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/predicate:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/reconcile:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/source:go_default_library",
],
Expand Down
95 changes: 95 additions & 0 deletions pkg/controller/populators/clone-populator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,23 @@ import (
"time"

"github.com/go-logr/logr"
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"

corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
"kubevirt.io/containerized-data-importer/pkg/controller/clone"
Expand Down Expand Up @@ -81,6 +88,90 @@ type ClonePopulatorReconciler struct {
multiTokenValidator *cc.MultiTokenValidator
}

var supportedCloneSources = map[string]client.Object{
"VolumeSnapshot": &snapshotv1.VolumeSnapshot{},
"PersistentVolumeClaim": &corev1.PersistentVolumeClaim{},
}

func isSourceReady(obj client.Object) bool {
switch obj := obj.(type) {
case *snapshotv1.VolumeSnapshot:
return cc.IsSnapshotReady(obj)
case *corev1.PersistentVolumeClaim:
return cc.IsBound(obj)
}
return false
}

func addCloneSourceWatches(mgr manager.Manager, c controller.Controller, log logr.Logger) error {
getKey := func(kind, namespace, name string) string {
return kind + "/" + namespace + "/" + name
}

if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I don't know if this is a real world issue, but, if there is not snapshot support in the cluster, you'd back out and not set up a PVC watch

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered it but I don't know how much I should worry about this case... Don't mind handling it if necessary but code might become more complex.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not so complex:

if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
if meta.IsNoMatchError(err) {
// Back out if there's no point to attempt watch
return nil
}
if !cc.IsErrCacheNotStarted(err) {
return err
}
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By complex I mean that it requires duplicating the watches and mappers instead of having a list of supported sources, but yeah I'll change it if we prefer to handle this error.

if meta.IsNoMatchError(err) {
// Back out if there's no point to attempt watch
return nil
}
if !cc.IsErrCacheNotStarted(err) {
return err
}
}

indexingKey := "spec.source"

// Indexing field for clone sources
if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &cdiv1.VolumeCloneSource{}, indexingKey, func(obj client.Object) []string {
cloneSource := obj.(*cdiv1.VolumeCloneSource)
sourceName := cloneSource.Spec.Source.Name
sourceNamespace := cloneSource.Namespace
sourceKind := cloneSource.Spec.Source.Kind

if _, supported := supportedCloneSources[sourceKind]; !supported || sourceName == "" {
return nil
}

ns := cc.GetNamespace(sourceNamespace, obj.GetNamespace())
return []string{getKey(sourceKind, ns, sourceName)}
}); err != nil {
return err
}

// Generic mapper function for supported clone sources
genericSourceMapper := func(sourceKind string) handler.MapFunc {
return func(ctx context.Context, obj client.Object) (reqs []reconcile.Request) {
var cloneSources cdiv1.VolumeCloneSourceList
matchingFields := client.MatchingFields{indexingKey: getKey(sourceKind, obj.GetNamespace(), obj.GetName())}

if err := mgr.GetClient().List(ctx, &cloneSources, matchingFields); err != nil {
log.Error(err, "Failed to list VolumeCloneSources")
return nil
}

for _, cs := range cloneSources.Items {
reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cs.Namespace, Name: cs.Name}})
}
return reqs
}
}

// Register watches for all supported clone source types
for kind, obj := range supportedCloneSources {
if err := c.Watch(source.Kind(mgr.GetCache(), obj,
handler.EnqueueRequestsFromMapFunc(genericSourceMapper(kind)),
predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool { return true },
DeleteFunc: func(e event.DeleteEvent) bool { return false },
UpdateFunc: func(e event.UpdateEvent) bool { return isSourceReady(obj) },
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will requeue for any readyToUse snapshot. Instead, what you're after is a single case where readyToUse switches from false to true, so something like

UpdateFunc: func(e event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool {
return !reflect.DeepEqual(e.ObjectOld.Status, e.ObjectNew.Status) ||
!reflect.DeepEqual(e.ObjectOld.Labels, e.ObjectNew.Labels)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that we still want to requeue any volume snapshot update even after it becomes ready? This would only allow for one single update requeue.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The controller requeues the request if there's an error/some other reason. You only have to catch the transition

},
)); err != nil {
return err
}
}

return nil
}

// NewClonePopulator creates a new instance of the clone-populator controller
func NewClonePopulator(
ctx context.Context,
Expand Down Expand Up @@ -131,6 +222,10 @@ func NewClonePopulator(
return nil, err
}

if err := addCloneSourceWatches(mgr, clonePopulator, log); err != nil {
return nil, err
}

if err := planner.AddCoreWatches(reconciler.log); err != nil {
return nil, err
}
Expand Down