Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add nodeSelector and securityContext for apiserver #2556

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions apiserver/pkg/model/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ func convertSecurityContext(securityCtx *corev1.SecurityContext) *api.SecurityCo
result := &api.SecurityContext{
Privileged: securityCtx.Privileged,
Capabilities: &api.Capabilities{},
RunAsUser: *securityCtx.RunAsUser,
RunAsGroup: *securityCtx.RunAsGroup,
}
if securityCtx.Capabilities != nil {
for _, cap := range securityCtx.Capabilities.Add {
Expand Down Expand Up @@ -423,6 +425,15 @@ func FromKubeToAPIComputeTemplate(configMap *corev1.ConfigMap) *api.ComputeTempl
}
}

val, ok = configMap.Data["node_selector"]
if ok {
err := json.Unmarshal([]byte(val), &runtime.NodeSelector)
if err != nil {
klog.Error("failed to unmarshall node selector for compute template ", runtime.Name, " value ",
runtime.NodeSelector, " error ", err)
}
}

val, ok = configMap.Data["tolerations"]
if ok {
err := json.Unmarshal([]byte(val), &runtime.Tolerations)
Expand Down
12 changes: 10 additions & 2 deletions apiserver/pkg/model/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ var headSpecTest = rayv1api.HeadGroupSpec{
"SYS_PTRACE",
},
},
RunAsUser: ptr.To[int64](int64(1000)),
RunAsGroup: ptr.To[int64](int64(3000)),
},
},
},
Expand All @@ -151,6 +153,7 @@ var configMapWithTolerations = corev1.ConfigMap{
"gpu_accelerator": "",
"memory": "8",
"extended_resources": "{\"vpc.amazonaws.com/efa\": 32}",
"node_selector": "{\"node-role\": \"util\"}",
"name": "head-node-template",
"namespace": "max",
"tolerations": "[{\"key\":\"blah1\",\"operator\":\"Exists\",\"effect\":\"NoExecute\"}]",
Expand Down Expand Up @@ -238,6 +241,8 @@ var workerSpecTest = rayv1api.WorkerGroupSpec{
"SYS_PTRACE",
},
},
RunAsUser: ptr.To[int64](int64(1000)),
RunAsGroup: ptr.To[int64](int64(3000)),
},
},
},
Expand Down Expand Up @@ -527,7 +532,8 @@ func TestPopulateHeadNodeSpec(t *testing.T) {
t.Errorf("failed to convert environment, got %v, expected %v", groupSpec.Environment, expectedHeadEnv)
}
// Cannot use deep equal since protobuf locks copying
if groupSpec.SecurityContext == nil || groupSpec.SecurityContext.Capabilities == nil || len(groupSpec.SecurityContext.Capabilities.Add) != 1 {
if groupSpec.SecurityContext == nil || groupSpec.SecurityContext.Capabilities == nil || len(groupSpec.SecurityContext.Capabilities.Add) != 1 ||
groupSpec.SecurityContext.RunAsUser != 1000 || groupSpec.SecurityContext.RunAsGroup != 3000 {
t.Errorf("failed to convert security context")
}
}
Expand All @@ -550,7 +556,8 @@ func TestPopulateWorkerNodeSpec(t *testing.T) {
if !reflect.DeepEqual(groupSpec.Environment, expectedEnv) {
t.Errorf("failed to convert environment, got %v, expected %v", groupSpec.Environment, expectedEnv)
}
if groupSpec.SecurityContext == nil || groupSpec.SecurityContext.Capabilities == nil || len(groupSpec.SecurityContext.Capabilities.Add) != 1 {
if groupSpec.SecurityContext == nil || groupSpec.SecurityContext.Capabilities == nil || len(groupSpec.SecurityContext.Capabilities.Add) != 1 ||
groupSpec.SecurityContext.RunAsUser != 1000 || groupSpec.SecurityContext.RunAsGroup != 3000 {
t.Errorf("failed to convert security context")
}
}
Expand Down Expand Up @@ -636,6 +643,7 @@ func TestPopulateTemplate(t *testing.T) {
template.ExtendedResources,
"Extended resources mismatch",
)
assert.Equal(t, map[string]string{"node-role": "util"}, template.NodeSelector, "Node selector mismatch")
}

func tolerationToString(toleration *api.PodToleration) string {
Expand Down
27 changes: 25 additions & 2 deletions apiserver/pkg/util/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ func buildHeadPodTemplate(imageVersion string, envs *api.EnvironmentVariables, s
Labels: map[string]string{},
},
Spec: corev1.PodSpec{
Tolerations: []corev1.Toleration{},
NodeSelector: map[string]string{},
Tolerations: []corev1.Toleration{},
Containers: []corev1.Container{
{
Name: "ray-head",
Expand Down Expand Up @@ -297,6 +298,13 @@ func buildHeadPodTemplate(imageVersion string, envs *api.EnvironmentVariables, s
}
}

// Add specific node_selector
if computeRuntime.NodeSelector != nil {
for k, v := range computeRuntime.NodeSelector {
podTemplateSpec.Spec.NodeSelector[k] = v
}
}

// Add specific tolerations
if computeRuntime.Tolerations != nil {
for _, t := range computeRuntime.Tolerations {
Expand Down Expand Up @@ -447,7 +455,8 @@ func buildWorkerPodTemplate(imageVersion string, envs *api.EnvironmentVariables,
Labels: map[string]string{},
},
Spec: corev1.PodSpec{
Tolerations: []corev1.Toleration{},
NodeSelector: map[string]string{},
Tolerations: []corev1.Toleration{},
Containers: []corev1.Container{
{
Name: "ray-worker",
Expand Down Expand Up @@ -591,6 +600,13 @@ func buildWorkerPodTemplate(imageVersion string, envs *api.EnvironmentVariables,
}
}

// Add specific node_selector
if computeRuntime.NodeSelector != nil {
for k, v := range computeRuntime.NodeSelector {
podTemplateSpec.Spec.NodeSelector[k] = v
}
}

// Add specific tolerations
if computeRuntime.Tolerations != nil {
for _, t := range computeRuntime.Tolerations {
Expand Down Expand Up @@ -813,6 +829,8 @@ func buildSecurityContext(securityCtx *api.SecurityContext) *corev1.SecurityCont
result := &corev1.SecurityContext{
Privileged: securityCtx.Privileged,
Capabilities: &corev1.Capabilities{},
RunAsUser: &securityCtx.RunAsUser,
RunAsGroup: &securityCtx.RunAsGroup,
}
if securityCtx.Capabilities != nil {
for _, cap := range securityCtx.Capabilities.Add {
Expand Down Expand Up @@ -846,6 +864,10 @@ func NewComputeTemplate(runtime *api.ComputeTemplate) (*corev1.ConfigMap, error)
if err != nil {
return nil, fmt.Errorf("failed to marshal extended resources: %v", err)
}
nodeSelectorJSON, err := json.Marshal(runtime.NodeSelector)
if err != nil {
return nil, fmt.Errorf("failed to marshal extended resources: %v", err)
}

// Create data map
dmap := map[string]string{
Expand All @@ -856,6 +878,7 @@ func NewComputeTemplate(runtime *api.ComputeTemplate) (*corev1.ConfigMap, error)
"gpu": strconv.FormatUint(uint64(runtime.Gpu), 10),
"gpu_accelerator": runtime.GpuAccelerator,
"extended_resources": string(extendedResourcesJSON),
"node_selector": string(nodeSelectorJSON),
}
// Add tolerations if defined
if runtime.Tolerations != nil && len(runtime.Tolerations) > 0 {
Expand Down
6 changes: 6 additions & 0 deletions apiserver/pkg/util/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ var templateWorker = api.ComputeTemplate{
Memory: 8,
Gpu: 4,
ExtendedResources: map[string]uint32{"vpc.amazonaws.com/efa": 32},
NodeSelector: map[string]string{"node-role": "util"},
Tolerations: []*api.PodToleration{
{
Key: "blah1",
Expand All @@ -279,6 +280,10 @@ var expectedLabels = map[string]string{
"foo": "bar",
}

var expectedNodeSelector = map[string]string{
"node-role": "util",
}

var expectedHeadNodeEnv = []corev1.EnvVar{
{
Name: "MY_POD_IP",
Expand Down Expand Up @@ -640,6 +645,7 @@ func TestBuilWorkerPodTemplate(t *testing.T) {
assert.Equal(t, expectedToleration, podSpec.Spec.Tolerations[0], "failed to propagate tolerations")
assert.Equal(t, "bar", podSpec.Annotations["foo"], "failed to convert annotations")
assert.Equal(t, expectedLabels, podSpec.Labels, "failed to convert labels")
assert.Equal(t, expectedNodeSelector, podSpec.Spec.NodeSelector, "failed to propagate node selector")
assert.True(t, containsEnvValueFrom(podSpec.Spec.Containers[0].Env, "CPU_REQUEST", &corev1.EnvVarSource{ResourceFieldRef: &corev1.ResourceFieldSelector{ContainerName: "ray-worker", Resource: "requests.cpu"}}), "failed to propagate environment variable: CPU_REQUEST")
assert.True(t, containsEnvValueFrom(podSpec.Spec.Containers[0].Env, "CPU_LIMITS", &corev1.EnvVarSource{ResourceFieldRef: &corev1.ResourceFieldSelector{ContainerName: "ray-worker", Resource: "limits.cpu"}}), "failed to propagate environment variable: CPU_LIMITS")
assert.True(t, containsEnvValueFrom(podSpec.Spec.Containers[0].Env, "MEMORY_REQUESTS", &corev1.EnvVarSource{ResourceFieldRef: &corev1.ResourceFieldSelector{ContainerName: "ray-worker", Resource: "requests.memory"}}), "failed to propagate environment variable: MEMORY_REQUESTS")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
env_var_from_decoder,
environment_variables_decoder,
)
from python_apiserver_client.params.securitycontext import (
SecurityContext,
security_context_decoder,
)
from python_apiserver_client.params.headnode import (
ServiceType,
HeadNodeSpec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class HeadNodeSpec:
annotations - optional, annotations for head node
labels - optional, labels for head node
image_pull_policy - optional, head node pull image policy. Default IfNotPresent
security_context - optional, security context for head node
"""

def __init__(
Expand All @@ -56,6 +57,7 @@ def __init__(
annotations: dict[str, str] = None,
labels: dict[str, str] = None,
image_pull_policy: str = None,
security_context = None,
):
"""
Initialization
Expand All @@ -71,6 +73,7 @@ def __init__(
:param annotations: head node annotation
:param labels: labels
:param image_pull_policy: image pull policy
:param security_context: head node security context
"""

self.compute_template = compute_template
Expand All @@ -86,6 +89,7 @@ def __init__(
self.annotations = annotations
self.labels = labels
self.image_pull_policy = image_pull_policy
self.security_context = security_context

def to_string(self) -> str:
"""
Expand Down Expand Up @@ -117,6 +121,8 @@ def to_string(self) -> str:
val = val + "]"
if self.environment is not None:
val = val + f",\n environment = {self.environment.to_string()}"
if self.security_context is not None:
val = val + f",\n security_context = {self.security_context.to_string()}"
if self.annotations is not None:
val = val + f",\n annotations = {str(self.annotations)}"
if self.labels is not None:
Expand Down Expand Up @@ -145,6 +151,8 @@ def to_dict(self) -> dict[str, Any]:
dct["volumes"] = [v.to_dict() for v in self.volumes]
if self.environment is not None:
dct["environment"] = self.environment.to_dict()
if self.security_context is not None:
dct["securityContext"] = self.security_context.to_dict()
if self.annotations is not None:
dct["annotations"] = self.annotations
if self.labels is not None:
Expand Down Expand Up @@ -173,6 +181,10 @@ def head_node_spec_decoder(dct: dict[str, Any]) -> HeadNodeSpec:
environments = None
if "environment" in dct and len(dct.get("environment")) > 0:
environments = environment_variables_decoder(dct.get("environment"))
security_context = None
if "securityContext" in dct and len(dct.get("securityContext")) > 0:
from .securitycontext import security_context_decoder
security_context = security_context_decoder(dct["securityContext"])
return HeadNodeSpec(
compute_template=dct.get("computeTemplate"),
ray_start_params=dct.get("rayStartParams"),
Expand All @@ -184,6 +196,7 @@ def head_node_spec_decoder(dct: dict[str, Any]) -> HeadNodeSpec:
image_pull_secret=dct.get("imagePullSecret", None),
image_pull_policy=dct.get("imagePullPolicy", None),
environment=environments,
security_context=security_context,
annotations=dct.get("annotations", None),
labels=dct.get("labels", None),
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from typing import Any


class Capabilities:
    """
    Capability names to add and to drop, as used inside a security context.

    Methods:
    - to_string() -> str: render both lists on one line for printing
    - to_dict() -> dict[str, Any]: convert to a dict with "add" and "drop" keys
    """

    def __init__(self, add: list[str], drop: list[str]):
        """
        Initialization
        :param add: capability names to add
        :param drop: capability names to drop
        """
        self.add, self.drop = add, drop

    def to_string(self) -> str:
        """
        Convert to string
        :return: string representation of the add and drop lists
        """
        return "add = {}, drop = {}".format(self.add, self.drop)

    def to_dict(self) -> dict[str, Any]:
        """
        Convert to dictionary
        :return: dictionary holding the add and drop lists as stored
        """
        return dict(add=self.add, drop=self.drop)


class SecurityContext:
    """
    Security context settings for a node's container.

    Fields:
    capabilities - optional, capabilities to add/drop, default None
    privileged - optional, privileged flag, default False
    runAsUser - optional, user id, default 500
    runAsGroup - optional, group id, default 100

    Methods:
    - to_string() -> str: convert to string for printing
    - to_dict() -> dict[str, Any]: convert to dict
    """

    def __init__(self, capabilities: Capabilities = None, privileged: bool = False, runAsUser: int = 500,
                 runAsGroup: int = 100):
        """
        Initialization
        :param capabilities: capabilities to add/drop (omitted from output when None)
        :param privileged: privileged flag
        :param runAsUser: user id
        :param runAsGroup: group id
        """
        self.privileged = privileged
        self.capabilities = capabilities
        self.runAsUser = runAsUser
        self.runAsGroup = runAsGroup

    def to_string(self) -> str:
        """
        Convert to string
        :return: string representation; capabilities, when set, are listed first
        """
        base = f"privileged = {self.privileged}, runAsUser = {self.runAsUser}, runAsGroup = {self.runAsGroup}"
        if self.capabilities is None:
            return base
        return f"capabilities = {self.capabilities.to_string()}, " + base

    def to_dict(self) -> dict[str, Any]:
        """
        Convert to dictionary
        :return: dictionary with the scalar settings, plus "capabilities" only when set
        """
        result: dict[str, Any] = {
            "privileged": self.privileged,
            "runAsUser": self.runAsUser,
            "runAsGroup": self.runAsGroup,
        }
        if self.capabilities is None:
            return result
        result["capabilities"] = self.capabilities.to_dict()
        return result


def security_context_decoder(dct: dict[str, Any]) -> "SecurityContext":
    """
    Create a security context from its dictionary representation
    :param dct: dictionary that may carry "capabilities", "privileged", "runAsUser" and "runAsGroup"
    :return: security context; missing keys fall back to the SecurityContext defaults
    """
    raw_capabilities = dct.get("capabilities", None)
    if isinstance(raw_capabilities, dict):
        # SecurityContext.to_string()/to_dict() call methods on the capabilities object,
        # so a plain dict must be turned into Capabilities; an empty dict means "not set".
        if len(raw_capabilities) > 0:
            capabilities = Capabilities(
                add=raw_capabilities.get("add", []),
                drop=raw_capabilities.get("drop", []),
            )
        else:
            capabilities = None
    else:
        # None or an already-built object is passed through unchanged
        capabilities = raw_capabilities
    return SecurityContext(
        capabilities=capabilities,
        privileged=dct.get("privileged", False),
        runAsUser=dct.get("runAsUser", 500),
        runAsGroup=dct.get("runAsGroup", 100),
    )
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class Template:
gpus - optional, number of GPUs, default 0
gpu_accelerator - optional, if not defined nvidia.com/gpu is assumed
extended_resources - optional, name and number of the extended resources
node_selector - optional, nodeSelector for pod placing, default none
tolerations - optional, tolerations for pod placing, default none
- to_string() -> str: convert toleration to string for printing
- to_dict() -> dict[str, Any] convert to dict
Expand All @@ -108,6 +109,7 @@ def __init__(
gpu: int = 0,
gpu_accelerator: str = None,
extended_resources: dict[str, int] = None,
node_selector: dict[str, str] = None,
tolerations: list[Toleration] = None,
):
"""
Expand All @@ -119,6 +121,7 @@ def __init__(
:param gpu: gpu
:param gpu_accelerator: accelerator type
:param extended_resources: extended resources
:param node_selector: nodeSelector
:param tolerations: tolerations
"""
self.name = name
Expand All @@ -128,6 +131,7 @@ def __init__(
self.gpu = gpu
self.gpu_accelerator = gpu_accelerator
self.extended_resources = extended_resources
self.node_selector = node_selector
self.tolerations = tolerations

def to_string(self) -> str:
Expand All @@ -139,9 +143,11 @@ def to_string(self) -> str:
if self.gpu > 0:
val = val + f", gpu {self.gpu}"
if self.gpu_accelerator is not None:
val = val + f", gpu accelerator {self.gpu_accelerator}"
val = val + f", gpu_accelerator {self.gpu_accelerator}"
if self.extended_resources is not None:
val = val + f", extended resources {self.extended_resources}"
val = val + f", extended_resources {self.extended_resources}"
if self.node_selector is not None:
val = val + f", node_selector {self.node_selector}"
if self.tolerations is None:
return val
val = val + ", tolerations ["
Expand All @@ -163,9 +169,11 @@ def to_dict(self) -> dict[str, Any]:
if self.gpu > 0:
dct["gpu"] = self.gpu
if self.gpu_accelerator is not None:
dct["gpu accelerator"] = self.gpu_accelerator
dct["gpu_accelerator"] = self.gpu_accelerator
if self.extended_resources is not None:
dct["extended resources"] = self.extended_resources
dct["extended_resources"] = self.extended_resources
if self.node_selector is not None:
dct["node_selector"] = self.node_selector
if self.tolerations is not None:
dct["tolerations"] = [tl.to_dict() for tl in self.tolerations]
return dct
Expand Down Expand Up @@ -208,6 +216,7 @@ def template_decoder(dct: dict[str, Any]) -> Template:
gpu=int(dct.get("gpu", "0")),
gpu_accelerator=dct.get("gpu_accelerator"),
extended_resources=dct.get("extended_resources"),
node_selector=dct.get("node_selector"),
tolerations=tolerations,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,16 @@ def _get_access_mode() -> AccessMode:
return AccessMode(int(dst.get("accessMode", "0")))
return None

if "volumeType" not in dst: # TODO
# PVC
return PVCVolume(
name=dst.get("name", ""),
mount_path=dst.get("mountPath", ""),
source=dst.get("source", ""),
read_only=dst.get("readOnly", False),
mount_propagation=_get_mount_propagation(),
)

match dst["volumeType"]:
case 0:
# PVC
Expand Down
Loading