diff --git a/pkg/controller/populators/BUILD.bazel b/pkg/controller/populators/BUILD.bazel index 8d25206a92..7d20d81395 100644 --- a/pkg/controller/populators/BUILD.bazel +++ b/pkg/controller/populators/BUILD.bazel @@ -24,11 +24,13 @@ go_library( "//staging/src/kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1:go_default_library", "//staging/src/kubevirt.io/containerized-data-importer-api/pkg/apis/forklift/v1beta1:go_default_library", "//vendor/github.com/go-logr/logr:go_default_library", + "//vendor/github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1:go_default_library", "//vendor/github.com/pkg/errors:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/storage/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", @@ -37,8 +39,10 @@ go_library( "//vendor/k8s.io/utils/ptr:go_default_library", "//vendor/sigs.k8s.io/controller-runtime/pkg/client:go_default_library", "//vendor/sigs.k8s.io/controller-runtime/pkg/controller:go_default_library", + "//vendor/sigs.k8s.io/controller-runtime/pkg/event:go_default_library", "//vendor/sigs.k8s.io/controller-runtime/pkg/handler:go_default_library", "//vendor/sigs.k8s.io/controller-runtime/pkg/manager:go_default_library", + "//vendor/sigs.k8s.io/controller-runtime/pkg/predicate:go_default_library", "//vendor/sigs.k8s.io/controller-runtime/pkg/reconcile:go_default_library", "//vendor/sigs.k8s.io/controller-runtime/pkg/source:go_default_library", ], diff --git a/pkg/controller/populators/clone-populator.go b/pkg/controller/populators/clone-populator.go index 3ec9b3849f..8f74602f0f 100644 --- a/pkg/controller/populators/clone-populator.go +++ b/pkg/controller/populators/clone-populator.go @@ -23,16 +23,23 @@ import ( "time" "github.com/go-logr/logr" + snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1" corev1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1" "kubevirt.io/containerized-data-importer/pkg/controller/clone" @@ -81,6 +88,103 @@ type ClonePopulatorReconciler struct { multiTokenValidator *cc.MultiTokenValidator } +var supportedCloneSources = map[string]client.Object{ + "VolumeSnapshot": &snapshotv1.VolumeSnapshot{}, + "PersistentVolumeClaim": &corev1.PersistentVolumeClaim{}, +} + +func isSourceReady(e event.UpdateEvent) bool { + switch objNew := e.ObjectNew.(type) { + case *snapshotv1.VolumeSnapshot: + objOld, ok := e.ObjectOld.(*snapshotv1.VolumeSnapshot) + if !ok { + return false + } + return !cc.IsSnapshotReady(objOld) && cc.IsSnapshotReady(objNew) + + case *corev1.PersistentVolumeClaim: + objOld, ok := e.ObjectOld.(*corev1.PersistentVolumeClaim) + if !ok { + return false + } + return !cc.IsBound(objOld) && cc.IsBound(objNew) + } + return false +} + +func addCloneSourceWatches(mgr manager.Manager, c controller.Controller, log logr.Logger) error { + getKey := func(kind, namespace, name string) string { + return kind + "/" + namespace + "/" + name + } + + cloneSourceList := make(map[string]client.Object, len(supportedCloneSources)) + for k, v := range supportedCloneSources { + cloneSourceList[k] = v + } + + if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil { + if meta.IsNoMatchError(err) { + // Remove VolumeSnapshot from supported sources if it's not present in the cluster + delete(cloneSourceList, "VolumeSnapshot") + } else if !cc.IsErrCacheNotStarted(err) { + return err + } + } + + indexingKey := "spec.source" + + // Indexing field for clone sources + if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &cdiv1.VolumeCloneSource{}, indexingKey, func(obj client.Object) []string { + cloneSource := obj.(*cdiv1.VolumeCloneSource) + sourceName := cloneSource.Spec.Source.Name + sourceNamespace := cloneSource.Namespace + sourceKind := cloneSource.Spec.Source.Kind + + if _, supported := cloneSourceList[sourceKind]; !supported || sourceName == "" { + return nil + } + + ns := cc.GetNamespace(sourceNamespace, obj.GetNamespace()) + return []string{getKey(sourceKind, ns, sourceName)} + }); err != nil { + return err + } + + // Generic mapper function for supported clone sources + genericSourceMapper := func(sourceKind string) handler.MapFunc { + return func(ctx context.Context, obj client.Object) (reqs []reconcile.Request) { + var volumeCloneSources cdiv1.VolumeCloneSourceList + matchingFields := client.MatchingFields{indexingKey: getKey(sourceKind, obj.GetNamespace(), obj.GetName())} + + if err := mgr.GetClient().List(ctx, &volumeCloneSources, matchingFields); err != nil { + log.Error(err, "Failed to list VolumeCloneSources") + return nil + } + + for _, cs := range volumeCloneSources.Items { + reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cs.Namespace, Name: cs.Name}}) + } + return reqs + } + } + + // Register watches for all supported clone source types + for kind, obj := range cloneSourceList { + if err := c.Watch(source.Kind(mgr.GetCache(), obj, + handler.EnqueueRequestsFromMapFunc(genericSourceMapper(kind)), + predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { return true }, + DeleteFunc: func(e event.DeleteEvent) bool { return false }, + UpdateFunc: func(e event.UpdateEvent) bool { return isSourceReady(e) }, + }, + )); err != nil { + return err + } + } + + return nil +} + // NewClonePopulator creates a new instance of the clone-populator controller func NewClonePopulator( ctx context.Context, @@ -131,6 +235,10 @@ func NewClonePopulator( return nil, err } + if err := addCloneSourceWatches(mgr, clonePopulator, log); err != nil { + return nil, err + } + if err := planner.AddCoreWatches(reconciler.log); err != nil { return nil, err }