Skip to content

Commit

Permalink
Auto assign GPUs for NIM deployments when a cached optimized profile …
Browse files Browse the repository at this point in the history
…is specified (#142)

* Auto assign GPUs for NIM deployments when a cached optimized profile is specified

Optimized NIM profile config contains a field called tensorParallelism which indicates required number of GPUs. Use that to auto-assign
GPU resources for NIM deployments.

Signed-off-by: Shiva Krishna, Merla <[email protected]>

* Don't override GPU resources if specified by the user

Signed-off-by: Shiva Krishna, Merla <[email protected]>

---------

Signed-off-by: Shiva Krishna, Merla <[email protected]>
  • Loading branch information
shivamerla authored Sep 16, 2024
1 parent 2923194 commit de5a37f
Show file tree
Hide file tree
Showing 2 changed files with 274 additions and 4 deletions.
129 changes: 127 additions & 2 deletions internal/controller/platform/standalone/nimservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
appsv1alpha1 "github.com/NVIDIA/k8s-nim-operator/api/apps/v1alpha1"
"github.com/NVIDIA/k8s-nim-operator/internal/conditions"
"github.com/NVIDIA/k8s-nim-operator/internal/render"
rendertypes "github.com/NVIDIA/k8s-nim-operator/internal/render/types"
"github.com/NVIDIA/k8s-nim-operator/internal/shared"
"github.com/NVIDIA/k8s-nim-operator/internal/utils"
"github.com/go-logr/logr"
Expand All @@ -33,6 +34,7 @@ import (
networkingv1 "k8s.io/api/networking/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/errors"
apiResource "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -189,9 +191,21 @@ func (r *NIMServiceReconciler) reconcileNIMService(ctx context.Context, nimServi
}
deploymentParams.Env = append(deploymentParams.Env, profileEnv)

// Retrieve and set profile details from NIMCache
profile, err := r.getNIMCacheProfile(ctx, nimService, modelProfile)
if err != nil {
logger.Error(err, "Failed to get cached NIM profile")
return ctrl.Result{}, err
}

// Auto assign GPU resources in case of the optimized profile
if profile != nil {
if err := r.assignGPUResources(ctx, nimService, profile, deploymentParams); err != nil {
return ctrl.Result{}, err
}
}

// TODO: assign GPU resources and node selector that is required for the selected profile
// TODO: assign GPU resources
// TODO: update the node selector
}

// Sync deployment
Expand Down Expand Up @@ -413,3 +427,114 @@ func (r *NIMServiceReconciler) reconcilePVC(ctx context.Context, nimService *app
}
return &nimService.Spec.Storage.PVC, nil
}

// getNIMCacheProfile returns model profile info from the NIM cache instance
func (r *NIMServiceReconciler) getNIMCacheProfile(ctx context.Context, nimService *appsv1alpha1.NIMService, profile string) (*appsv1alpha1.NIMProfile, error) {
logger := log.FromContext(ctx)

if nimService.GetNIMCacheName() == "" {
// NIM cache is not used
return nil, nil
}

// Lookup NIMCache instance in the same namespace as the NIMService instance
nimCache := &appsv1alpha1.NIMCache{}
if err := r.Get(ctx, types.NamespacedName{Name: nimService.GetNIMCacheName(), Namespace: nimService.Namespace}, nimCache); err != nil {
logger.Error(err, "unable to fetch nimcache", "nimcache", nimService.GetNIMCacheName(), "nimservice", nimService.Name)
return nil, err
}

// Get the status of NIMCache
if nimCache.Status.State != appsv1alpha1.NimCacheStatusReady {
return nil, fmt.Errorf("nimcache %s is not ready, nimservice %s", nimCache.GetName(), nimService.GetName())
}

for _, cachedProfile := range nimCache.Status.Profiles {
if cachedProfile.Name == profile {
return &cachedProfile, nil
}
}

// If the specified profile is not cached, return nil
return nil, nil
}

// getTensorParallelismByProfile returns the value of tensor parallelism parameter in the given NIM profile
func (r *NIMServiceReconciler) getTensorParallelismByProfile(ctx context.Context, profile *appsv1alpha1.NIMProfile) (string, error) {
// List of possible keys for tensor parallelism
possibleKeys := []string{"tensorParallelism", "tp"}

tensorParallelism := ""

// Iterate through possible keys and return the first valid value
for _, key := range possibleKeys {
if value, exists := profile.Config[key]; exists {
tensorParallelism = value
break
}
}

return tensorParallelism, nil
}

// assignGPUResources automatically assigns GPU resources to the NIMService based on the provided profile,
// but retains any user-specified GPU resources if they are explicitly provided.
//
// This function retrieves the tensor parallelism (TP) value from the provided profile config to determine
// the number of GPUs to be allocated. If the TP value is defined and no GPU resources have been
// explicitly provided by the user, the function allocates GPUs according to the TP value.
// If the TP value is not present, the function defaults to allocating 1 GPU.
func (r *NIMServiceReconciler) assignGPUResources(ctx context.Context, nimService *appsv1alpha1.NIMService, profile *appsv1alpha1.NIMProfile, deploymentParams *rendertypes.DeploymentParams) error {
logger := log.FromContext(ctx)

// TODO: Make the resource name configurable
const gpuResourceName = corev1.ResourceName("nvidia.com/gpu")

// Check if the user has already provided a GPU resource quantity in the requests or limits
if deploymentParams.Resources != nil {
if _, gpuRequested := deploymentParams.Resources.Requests[gpuResourceName]; gpuRequested {
logger.V(2).Info("User has provided GPU resource requests, skipping auto-assignment", "gpuResource", gpuResourceName)
return nil
}
if _, gpuLimit := deploymentParams.Resources.Limits[gpuResourceName]; gpuLimit {
logger.V(2).Info("User has provided GPU resource limits, skipping auto-assignment", "gpuResource", gpuResourceName)
return nil
}
}

// If no user-provided GPU resource is found, proceed with auto-assignment
// Get tensorParallelism from the profile
tensorParallelism, err := r.getTensorParallelismByProfile(ctx, profile)
if err != nil {
logger.Error(err, "Failed to retrieve tensorParallelism")
return err
}

// Initialize the Resources field if not already initialized
if deploymentParams.Resources == nil {
deploymentParams.Resources = &corev1.ResourceRequirements{
Requests: corev1.ResourceList{},
Limits: corev1.ResourceList{},
}
}

// Assign GPU resources based on tensorParallelism, or default to 1 GPU if tensorParallelism is not available
gpuQuantity := apiResource.MustParse("1") // Default to 1 GPU

if tensorParallelism != "" {
gpuQuantity, err = apiResource.ParseQuantity(tensorParallelism)
if err != nil {
return fmt.Errorf("failed to parse tensorParallelism: %w", err)
}

logger.V(2).Info("Auto-assigning GPU resources based on tensorParallelism", "tensorParallelism", tensorParallelism, "gpuQuantity", gpuQuantity.String())
} else {
logger.V(2).Info("tensorParallelism not found, assigning 1 GPU by default", "Profile", profile.Name)
}

// Assign the GPU quantity for both requests and limits
deploymentParams.Resources.Requests[gpuResourceName] = gpuQuantity
deploymentParams.Resources.Limits[gpuResourceName] = gpuQuantity

return nil
}
149 changes: 147 additions & 2 deletions internal/controller/platform/standalone/nimservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
appsv1alpha1 "github.com/NVIDIA/k8s-nim-operator/api/apps/v1alpha1"
"github.com/NVIDIA/k8s-nim-operator/internal/conditions"
"github.com/NVIDIA/k8s-nim-operator/internal/render"
rendertypes "github.com/NVIDIA/k8s-nim-operator/internal/render/types"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
Expand All @@ -38,6 +39,7 @@ import (
networkingv1 "k8s.io/api/networking/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/resource"
apiResource "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -72,6 +74,7 @@ var _ = Describe("NIMServiceReconciler for a standalone platform", func() {
reconciler *NIMServiceReconciler
scheme *runtime.Scheme
nimService *appsv1alpha1.NIMService
nimCache *appsv1alpha1.NIMCache
volumeMounts []corev1.VolumeMount
volumes []corev1.Volume
)
Expand All @@ -86,6 +89,7 @@ var _ = Describe("NIMServiceReconciler for a standalone platform", func() {

client = fake.NewClientBuilder().WithScheme(scheme).
WithStatusSubresource(&appsv1alpha1.NIMService{}).
WithStatusSubresource(&appsv1alpha1.NIMCache{}).
Build()
boolTrue := true
cwd, err := os.Getwd()
Expand Down Expand Up @@ -269,7 +273,7 @@ var _ = Describe("NIMServiceReconciler for a standalone platform", func() {
MountPath: "/dev/shm",
},
}
NIMCache := &appsv1alpha1.NIMCache{
nimCache = &appsv1alpha1.NIMCache{
ObjectMeta: metav1.ObjectMeta{
Name: "test-nimcache",
Namespace: "default",
Expand All @@ -281,9 +285,13 @@ var _ = Describe("NIMServiceReconciler for a standalone platform", func() {
Status: appsv1alpha1.NIMCacheStatus{
State: appsv1alpha1.NimCacheStatusReady,
PVC: pvcName,
Profiles: []appsv1alpha1.NIMProfile{{
Name: "test-profile",
Config: map[string]string{"tp": "4"}},
},
},
}
_ = client.Create(context.TODO(), NIMCache)
_ = client.Create(context.TODO(), nimCache)
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: pvcName,
Expand All @@ -309,6 +317,14 @@ var _ = Describe("NIMServiceReconciler for a standalone platform", func() {
},
}
_ = client.Delete(context.TODO(), nimService)

// Ensure that nimCache status is ready before each test
nimCache.Status = appsv1alpha1.NIMCacheStatus{
State: appsv1alpha1.NimCacheStatusReady,
}

// Update nimCache status
Expect(client.Status().Update(context.TODO(), nimCache)).To(Succeed())
})

Describe("Reconcile", func() {
Expand Down Expand Up @@ -525,4 +541,133 @@ var _ = Describe("NIMServiceReconciler for a standalone platform", func() {
Expect(msg).To(Equal(fmt.Sprintf("deployment %q successfully rolled out\n", deployment.Name)))
})
})

Describe("getNIMCacheProfile", func() {
It("should return nil when NIMCache is not used", func() {
nimService.Spec.Storage.NIMCache.Name = ""
profile, err := reconciler.getNIMCacheProfile(context.TODO(), nimService, "some-profile")
Expect(err).To(BeNil())
Expect(profile).To(BeNil())
})

It("should return an error when NIMCache is not found", func() {
nimService.Spec.Storage.NIMCache.Name = "non-existent-cache"
_, err := reconciler.getNIMCacheProfile(context.TODO(), nimService, "some-profile")
Expect(err).To(HaveOccurred())
})

It("should return an error when NIMCache is not ready", func() {
nimService.Spec.Storage.NIMCache.Name = "test-nimcache"
nimCache.Status = appsv1alpha1.NIMCacheStatus{
State: appsv1alpha1.NimCacheStatusPending,
}

// Update nimCache status
Expect(reconciler.Client.Status().Update(context.TODO(), nimCache)).To(Succeed())
_, err := reconciler.getNIMCacheProfile(context.TODO(), nimService, "test-profile")
Expect(err).To(HaveOccurred())
})

It("should return nil when NIMCache profile is not found", func() {
nimService.Spec.Storage.NIMCache.Name = "test-nimcache"
profile, err := reconciler.getNIMCacheProfile(context.TODO(), nimService, "non-existent-profile")
Expect(err).To(BeNil())
Expect(profile).To(BeNil())
})

It("should return the profile if found in NIMCache", func() {
nimService.Spec.Storage.NIMCache.Name = "test-nimcache"
profile, err := reconciler.getNIMCacheProfile(context.TODO(), nimService, "test-profile")
Expect(err).To(BeNil())
Expect(profile.Name).To(Equal("test-profile"))
})
})

Describe("getTensorParallelismByProfile", func() {
It("should return tensor parallelism value if exists", func() {
profile := &appsv1alpha1.NIMProfile{
Name: "test-profile",
Config: map[string]string{"tp": "4"},
}

tensorParallelism, err := reconciler.getTensorParallelismByProfile(context.TODO(), profile)
Expect(err).To(BeNil())
Expect(tensorParallelism).To(Equal("4"))
})

It("should return empty string if tensor parallelism does not exist", func() {
profile := &appsv1alpha1.NIMProfile{
Name: "test-profile",
Config: map[string]string{},
}
tensorParallelism, err := reconciler.getTensorParallelismByProfile(context.TODO(), profile)
Expect(err).To(BeNil())
Expect(tensorParallelism).To(BeEmpty())
})
})

Describe("assignGPUResources", func() {
It("should retain user-provided GPU resources and not override them", func() {
profile := &appsv1alpha1.NIMProfile{
Name: "test-profile",
Config: map[string]string{"tp": "4"},
}

// Initialize deployment params with user-provided GPU resources
deploymentParams := &rendertypes.DeploymentParams{
Resources: &corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceName("nvidia.com/gpu"): apiResource.MustParse("8"),
},
Limits: corev1.ResourceList{
corev1.ResourceName("nvidia.com/gpu"): apiResource.MustParse("8"),
},
},
}

Expect(reconciler.assignGPUResources(context.TODO(), nimService, profile, deploymentParams)).To(Succeed())

// Ensure the user-provided GPU resources (8) are retained and not overridden
Expect(deploymentParams.Resources.Requests).To(HaveKeyWithValue(corev1.ResourceName("nvidia.com/gpu"), apiResource.MustParse("8")))
Expect(deploymentParams.Resources.Limits).To(HaveKeyWithValue(corev1.ResourceName("nvidia.com/gpu"), apiResource.MustParse("8")))
})

It("should assign GPU resources when tensor parallelism is provided", func() {
profile := &appsv1alpha1.NIMProfile{
Name: "test-profile",
Config: map[string]string{"tp": "4"},
}
// Initialize deployment params with no user-provided GPU resources
deploymentParams := &rendertypes.DeploymentParams{}

Expect(reconciler.assignGPUResources(context.TODO(), nimService, profile, deploymentParams)).To(Succeed())
Expect(deploymentParams.Resources.Requests).To(HaveKeyWithValue(corev1.ResourceName("nvidia.com/gpu"), apiResource.MustParse("4")))
Expect(deploymentParams.Resources.Limits).To(HaveKeyWithValue(corev1.ResourceName("nvidia.com/gpu"), apiResource.MustParse("4")))
})

It("should assign 1 GPU resource if tensor parallelism is not provided", func() {
profile := &appsv1alpha1.NIMProfile{
Name: "test-profile",
Config: map[string]string{},
}
// Initialize deployment params with no user-provided GPU resources
deploymentParams := &rendertypes.DeploymentParams{}

Expect(reconciler.assignGPUResources(context.TODO(), nimService, profile, deploymentParams)).To(Succeed())
Expect(deploymentParams.Resources.Requests).To(HaveKeyWithValue(corev1.ResourceName("nvidia.com/gpu"), apiResource.MustParse("1")))
Expect(deploymentParams.Resources.Limits).To(HaveKeyWithValue(corev1.ResourceName("nvidia.com/gpu"), apiResource.MustParse("1")))
})

It("should return an error if tensor parallelism cannot be parsed", func() {
profile := &appsv1alpha1.NIMProfile{
Name: "test-profile",
Config: map[string]string{"tp": "invalid"},
}
// Initialize deployment params with no user-provided GPU resources
deploymentParams := &rendertypes.DeploymentParams{}

err := reconciler.assignGPUResources(context.TODO(), nimService, profile, deploymentParams)
Expect(err).To(HaveOccurred())
})
})
})

0 comments on commit de5a37f

Please sign in to comment.