diff --git a/internal/controller/platform/standalone/nimservice.go b/internal/controller/platform/standalone/nimservice.go index d564f3ab..872b18c0 100644 --- a/internal/controller/platform/standalone/nimservice.go +++ b/internal/controller/platform/standalone/nimservice.go @@ -23,6 +23,7 @@ import ( appsv1alpha1 "github.com/NVIDIA/k8s-nim-operator/api/apps/v1alpha1" "github.com/NVIDIA/k8s-nim-operator/internal/conditions" "github.com/NVIDIA/k8s-nim-operator/internal/render" + rendertypes "github.com/NVIDIA/k8s-nim-operator/internal/render/types" "github.com/NVIDIA/k8s-nim-operator/internal/shared" "github.com/NVIDIA/k8s-nim-operator/internal/utils" "github.com/go-logr/logr" @@ -33,6 +34,7 @@ import ( networkingv1 "k8s.io/api/networking/v1" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/errors" + apiResource "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -189,9 +191,21 @@ func (r *NIMServiceReconciler) reconcileNIMService(ctx context.Context, nimServi } deploymentParams.Env = append(deploymentParams.Env, profileEnv) + // Retrieve and set profile details from NIMCache + profile, err := r.getNIMCacheProfile(ctx, nimService, modelProfile) + if err != nil { + logger.Error(err, "Failed to get cached NIM profile") + return ctrl.Result{}, err + } + + // Auto assign GPU resources in case of the optimized profile + if profile != nil { + if err := r.assignGPUResources(ctx, nimService, profile, deploymentParams); err != nil { + return ctrl.Result{}, err + } + } + // TODO: assign GPU resources and node selector that is required for the selected profile - // TODO: assign GPU resources - // TODO: update the node selector } // Sync deployment @@ -413,3 +427,114 @@ func (r *NIMServiceReconciler) reconcilePVC(ctx context.Context, nimService *app } return &nimService.Spec.Storage.PVC, nil } + +// getNIMCacheProfile returns model profile info from the NIM cache instance +func (r *NIMServiceReconciler) getNIMCacheProfile(ctx context.Context, nimService *appsv1alpha1.NIMService, profile string) (*appsv1alpha1.NIMProfile, error) { + logger := log.FromContext(ctx) + + if nimService.GetNIMCacheName() == "" { + // NIM cache is not used + return nil, nil + } + + // Lookup NIMCache instance in the same namespace as the NIMService instance + nimCache := &appsv1alpha1.NIMCache{} + if err := r.Get(ctx, types.NamespacedName{Name: nimService.GetNIMCacheName(), Namespace: nimService.Namespace}, nimCache); err != nil { + logger.Error(err, "unable to fetch nimcache", "nimcache", nimService.GetNIMCacheName(), "nimservice", nimService.Name) + return nil, err + } + + // Get the status of NIMCache + if nimCache.Status.State != appsv1alpha1.NimCacheStatusReady { + return nil, fmt.Errorf("nimcache %s is not ready, nimservice %s", nimCache.GetName(), nimService.GetName()) + } + + for _, cachedProfile := range nimCache.Status.Profiles { + if cachedProfile.Name == profile { + return &cachedProfile, nil + } + } + + // If the specified profile is not cached, return nil + return nil, nil +} + +// getTensorParallelismByProfile returns the value of tensor parallelism parameter in the given NIM profile +func (r *NIMServiceReconciler) getTensorParallelismByProfile(ctx context.Context, profile *appsv1alpha1.NIMProfile) (string, error) { + // List of possible keys for tensor parallelism + possibleKeys := []string{"tensorParallelism", "tp"} + + tensorParallelism := "" + + // Iterate through possible keys and return the first valid value + for _, key := range possibleKeys { + if value, exists := profile.Config[key]; exists { + tensorParallelism = value + break + } + } + + return tensorParallelism, nil +} + +// assignGPUResources automatically assigns GPU resources to the NIMService based on the provided profile, +// but retains any user-specified GPU resources if they are explicitly provided. +// +// This function retrieves the tensor parallelism (TP) value from the provided profile config to determine +// the number of GPUs to be allocated. If the TP value is defined and no GPU resources have been +// explicitly provided by the user, the function allocates GPUs according to the TP value. +// If the TP value is not present, the function defaults to allocating 1 GPU. +func (r *NIMServiceReconciler) assignGPUResources(ctx context.Context, nimService *appsv1alpha1.NIMService, profile *appsv1alpha1.NIMProfile, deploymentParams *rendertypes.DeploymentParams) error { + logger := log.FromContext(ctx) + + // TODO: Make the resource name configurable + const gpuResourceName = corev1.ResourceName("nvidia.com/gpu") + + // Check if the user has already provided a GPU resource quantity in the requests or limits + if deploymentParams.Resources != nil { + if _, gpuRequested := deploymentParams.Resources.Requests[gpuResourceName]; gpuRequested { + logger.V(2).Info("User has provided GPU resource requests, skipping auto-assignment", "gpuResource", gpuResourceName) + return nil + } + if _, gpuLimit := deploymentParams.Resources.Limits[gpuResourceName]; gpuLimit { + logger.V(2).Info("User has provided GPU resource limits, skipping auto-assignment", "gpuResource", gpuResourceName) + return nil + } + } + + // If no user-provided GPU resource is found, proceed with auto-assignment + // Get tensorParallelism from the profile + tensorParallelism, err := r.getTensorParallelismByProfile(ctx, profile) + if err != nil { + logger.Error(err, "Failed to retrieve tensorParallelism") + return err + } + + // Initialize the Resources field if not already initialized + if deploymentParams.Resources == nil { + deploymentParams.Resources = &corev1.ResourceRequirements{ + Requests: corev1.ResourceList{}, + Limits: corev1.ResourceList{}, + } + } + + // Assign GPU resources based on tensorParallelism, or default to 1 GPU if tensorParallelism is not available + gpuQuantity := apiResource.MustParse("1") // Default to 1 GPU + + if tensorParallelism != "" { + gpuQuantity, err = apiResource.ParseQuantity(tensorParallelism) + if err != nil { + return fmt.Errorf("failed to parse tensorParallelism: %w", err) + } + + logger.V(2).Info("Auto-assigning GPU resources based on tensorParallelism", "tensorParallelism", tensorParallelism, "gpuQuantity", gpuQuantity.String()) + } else { + logger.V(2).Info("tensorParallelism not found, assigning 1 GPU by default", "Profile", profile.Name) + } + + // Assign the GPU quantity for both requests and limits + deploymentParams.Resources.Requests[gpuResourceName] = gpuQuantity + deploymentParams.Resources.Limits[gpuResourceName] = gpuQuantity + + return nil +} diff --git a/internal/controller/platform/standalone/nimservice_test.go b/internal/controller/platform/standalone/nimservice_test.go index 8245c75c..58c0fdb3 100644 --- a/internal/controller/platform/standalone/nimservice_test.go +++ b/internal/controller/platform/standalone/nimservice_test.go @@ -30,6 +30,7 @@ import ( appsv1alpha1 "github.com/NVIDIA/k8s-nim-operator/api/apps/v1alpha1" "github.com/NVIDIA/k8s-nim-operator/internal/conditions" "github.com/NVIDIA/k8s-nim-operator/internal/render" + rendertypes "github.com/NVIDIA/k8s-nim-operator/internal/render/types" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" @@ -38,6 +39,7 @@ import ( networkingv1 "k8s.io/api/networking/v1" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/resource" + apiResource "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -72,6 +74,7 @@ var _ = Describe("NIMServiceReconciler for a standalone platform", func() { reconciler *NIMServiceReconciler scheme *runtime.Scheme nimService *appsv1alpha1.NIMService + nimCache *appsv1alpha1.NIMCache volumeMounts []corev1.VolumeMount volumes []corev1.Volume ) @@ -86,6 +89,7 @@ var _ = Describe("NIMServiceReconciler for a standalone platform", func() { client = fake.NewClientBuilder().WithScheme(scheme). WithStatusSubresource(&appsv1alpha1.NIMService{}). + WithStatusSubresource(&appsv1alpha1.NIMCache{}). Build() boolTrue := true cwd, err := os.Getwd() @@ -269,7 +273,7 @@ var _ = Describe("NIMServiceReconciler for a standalone platform", func() { MountPath: "/dev/shm", }, } - NIMCache := &appsv1alpha1.NIMCache{ + nimCache = &appsv1alpha1.NIMCache{ ObjectMeta: metav1.ObjectMeta{ Name: "test-nimcache", Namespace: "default", @@ -281,9 +285,13 @@ var _ = Describe("NIMServiceReconciler for a standalone platform", func() { Status: appsv1alpha1.NIMCacheStatus{ State: appsv1alpha1.NimCacheStatusReady, PVC: pvcName, + Profiles: []appsv1alpha1.NIMProfile{{ + Name: "test-profile", + Config: map[string]string{"tp": "4"}}, + }, }, } - _ = client.Create(context.TODO(), NIMCache) + _ = client.Create(context.TODO(), nimCache) pvc := &corev1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ Name: pvcName, @@ -309,6 +317,14 @@ var _ = Describe("NIMServiceReconciler for a standalone platform", func() { }, } _ = client.Delete(context.TODO(), nimService) + + // Ensure that nimCache status is ready before each test + nimCache.Status = appsv1alpha1.NIMCacheStatus{ + State: appsv1alpha1.NimCacheStatusReady, + } + + // Update nimCache status + Expect(client.Status().Update(context.TODO(), nimCache)).To(Succeed()) }) Describe("Reconcile", func() { @@ -525,4 +541,133 @@ var _ = Describe("NIMServiceReconciler for a standalone platform", func() { Expect(msg).To(Equal(fmt.Sprintf("deployment %q successfully rolled out\n", deployment.Name))) }) }) + + Describe("getNIMCacheProfile", func() { + It("should return nil when NIMCache is not used", func() { + nimService.Spec.Storage.NIMCache.Name = "" + profile, err := reconciler.getNIMCacheProfile(context.TODO(), nimService, "some-profile") + Expect(err).To(BeNil()) + Expect(profile).To(BeNil()) + }) + + It("should return an error when NIMCache is not found", func() { + nimService.Spec.Storage.NIMCache.Name = "non-existent-cache" + _, err := reconciler.getNIMCacheProfile(context.TODO(), nimService, "some-profile") + Expect(err).To(HaveOccurred()) + }) + + It("should return an error when NIMCache is not ready", func() { + nimService.Spec.Storage.NIMCache.Name = "test-nimcache" + nimCache.Status = appsv1alpha1.NIMCacheStatus{ + State: appsv1alpha1.NimCacheStatusPending, + } + + // Update nimCache status + Expect(reconciler.Client.Status().Update(context.TODO(), nimCache)).To(Succeed()) + _, err := reconciler.getNIMCacheProfile(context.TODO(), nimService, "test-profile") + Expect(err).To(HaveOccurred()) + }) + + It("should return nil when NIMCache profile is not found", func() { + nimService.Spec.Storage.NIMCache.Name = "test-nimcache" + profile, err := reconciler.getNIMCacheProfile(context.TODO(), nimService, "non-existent-profile") + Expect(err).To(BeNil()) + Expect(profile).To(BeNil()) + }) + + It("should return the profile if found in NIMCache", func() { + nimService.Spec.Storage.NIMCache.Name = "test-nimcache" + profile, err := reconciler.getNIMCacheProfile(context.TODO(), nimService, "test-profile") + Expect(err).To(BeNil()) + Expect(profile.Name).To(Equal("test-profile")) + }) + }) + + Describe("getTensorParallelismByProfile", func() { + It("should return tensor parallelism value if exists", func() { + profile := &appsv1alpha1.NIMProfile{ + Name: "test-profile", + Config: map[string]string{"tp": "4"}, + } + + tensorParallelism, err := reconciler.getTensorParallelismByProfile(context.TODO(), profile) + Expect(err).To(BeNil()) + Expect(tensorParallelism).To(Equal("4")) + }) + + It("should return empty string if tensor parallelism does not exist", func() { + profile := &appsv1alpha1.NIMProfile{ + Name: "test-profile", + Config: map[string]string{}, + } + tensorParallelism, err := reconciler.getTensorParallelismByProfile(context.TODO(), profile) + Expect(err).To(BeNil()) + Expect(tensorParallelism).To(BeEmpty()) + }) + }) + + Describe("assignGPUResources", func() { + It("should retain user-provided GPU resources and not override them", func() { + profile := &appsv1alpha1.NIMProfile{ + Name: "test-profile", + Config: map[string]string{"tp": "4"}, + } + + // Initialize deployment params with user-provided GPU resources + deploymentParams := &rendertypes.DeploymentParams{ + Resources: &corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceName("nvidia.com/gpu"): apiResource.MustParse("8"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceName("nvidia.com/gpu"): apiResource.MustParse("8"), + }, + }, + } + + Expect(reconciler.assignGPUResources(context.TODO(), nimService, profile, deploymentParams)).To(Succeed()) + + // Ensure the user-provided GPU resources (8) are retained and not overridden + Expect(deploymentParams.Resources.Requests).To(HaveKeyWithValue(corev1.ResourceName("nvidia.com/gpu"), apiResource.MustParse("8"))) + Expect(deploymentParams.Resources.Limits).To(HaveKeyWithValue(corev1.ResourceName("nvidia.com/gpu"), apiResource.MustParse("8"))) + }) + + It("should assign GPU resources when tensor parallelism is provided", func() { + profile := &appsv1alpha1.NIMProfile{ + Name: "test-profile", + Config: map[string]string{"tp": "4"}, + } + // Initialize deployment params with no user-provided GPU resources + deploymentParams := &rendertypes.DeploymentParams{} + + Expect(reconciler.assignGPUResources(context.TODO(), nimService, profile, deploymentParams)).To(Succeed()) + Expect(deploymentParams.Resources.Requests).To(HaveKeyWithValue(corev1.ResourceName("nvidia.com/gpu"), apiResource.MustParse("4"))) + Expect(deploymentParams.Resources.Limits).To(HaveKeyWithValue(corev1.ResourceName("nvidia.com/gpu"), apiResource.MustParse("4"))) + }) + + It("should assign 1 GPU resource if tensor parallelism is not provided", func() { + profile := &appsv1alpha1.NIMProfile{ + Name: "test-profile", + Config: map[string]string{}, + } + // Initialize deployment params with no user-provided GPU resources + deploymentParams := &rendertypes.DeploymentParams{} + + Expect(reconciler.assignGPUResources(context.TODO(), nimService, profile, deploymentParams)).To(Succeed()) + Expect(deploymentParams.Resources.Requests).To(HaveKeyWithValue(corev1.ResourceName("nvidia.com/gpu"), apiResource.MustParse("1"))) + Expect(deploymentParams.Resources.Limits).To(HaveKeyWithValue(corev1.ResourceName("nvidia.com/gpu"), apiResource.MustParse("1"))) + }) + + It("should return an error if tensor parallelism cannot be parsed", func() { + profile := &appsv1alpha1.NIMProfile{ + Name: "test-profile", + Config: map[string]string{"tp": "invalid"}, + } + // Initialize deployment params with no user-provided GPU resources + deploymentParams := &rendertypes.DeploymentParams{} + + err := reconciler.assignGPUResources(context.TODO(), nimService, profile, deploymentParams) + Expect(err).To(HaveOccurred()) + }) + }) })