Skip to content

Commit

Permalink
[RayCluster][Feature] add redis username to head pod from GcsFaultTol…
Browse files Browse the repository at this point in the history
…eranceOptions

Signed-off-by: win5923 <[email protected]>
  • Loading branch information
win5923 committed Jan 16, 2025
1 parent 28ab5c9 commit 4793752
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 4 deletions.
15 changes: 15 additions & 0 deletions ray-operator/controllers/ray/common/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, head
podTemplate.Spec.Containers = append(podTemplate.Spec.Containers, autoscalerContainer)
}

if gcsFtOptions := instance.Spec.GcsFaultToleranceOptions; gcsFtOptions != nil {
// If `GcsFaultToleranceOptions.RedisUsername` is set, it will be put into the `REDIS_USERNAME` environment variable later.
// Here, we use `$REDIS_USERNAME` in rayStartParams to refer to the environment variable.
if gcsFtOptions.RedisUsername != nil {
headSpec.RayStartParams["redis-username"] = "$REDIS_USERNAME"
}
}

// If the metrics port does not exist in the Ray container, add a default one for Prometheus.
isMetricsPortExists := utils.FindContainerPort(&podTemplate.Spec.Containers[utils.RayContainerIndex], utils.MetricsPortName, -1) != -1
if !isMetricsPortExists {
Expand Down Expand Up @@ -695,6 +703,13 @@ func setContainerEnvVars(pod *corev1.Pod, rayNodeType rayv1.RayNodeType, gcsOpti
Name: utils.RAY_REDIS_ADDRESS,
Value: gcsOptions.RedisAddress,
})
if gcsOptions.RedisUsername != nil {
container.Env = append(container.Env, corev1.EnvVar{
Name: utils.REDIS_USERNAME,
Value: gcsOptions.RedisUsername.Value,
ValueFrom: gcsOptions.RedisUsername.ValueFrom,
})
}
if gcsOptions.RedisPassword != nil {
container.Env = utils.UpsertEnvVar(container.Env, corev1.EnvVar{
Name: utils.REDIS_PASSWORD,
Expand Down
41 changes: 39 additions & 2 deletions ray-operator/controllers/ray/common/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,8 @@ func TestBuildPod_WithGcsFtEnabled(t *testing.T) {
cluster.Spec.GcsFaultToleranceOptions = &rayv1.GcsFaultToleranceOptions{
ExternalStorageNamespace: "myns",
RedisAddress: "redis://127.0.0.1:6379",
RedisUsername: &rayv1.RedisCredential{Value: "myuser"},
RedisPassword: &rayv1.RedisCredential{Value: "mypass"},
}
podTemplateSpec = DefaultHeadPodTemplate(ctx, *cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
pod = BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.GcsFaultToleranceOptions, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, utils.GetCRDType(""), "")
Expand All @@ -665,31 +667,55 @@ func TestBuildPod_WithGcsFtEnabled(t *testing.T) {
}
checkContainerEnv(t, rayContainer, utils.RAY_EXTERNAL_STORAGE_NS, "myns")
checkContainerEnv(t, rayContainer, utils.RAY_REDIS_ADDRESS, "redis://127.0.0.1:6379")
checkContainerEnv(t, rayContainer, utils.REDIS_USERNAME, "myuser")
checkContainerEnv(t, rayContainer, utils.REDIS_PASSWORD, "mypass")

// Test 7 with a plain text redis password in GcsFaultToleranceOptions
if !strings.Contains(rayContainer.Args[0], "--redis-username=$REDIS_USERNAME") {
t.Fatalf("redis username not found in the ray start command %s", rayContainer.Args[0])
}
if !strings.Contains(rayContainer.Args[0], "--redis-password=$REDIS_PASSWORD") {
t.Fatalf("redis password not found in the ray start command %s", rayContainer.Args[0])
}

// Test 7 with a plain text redis password and username in GcsFaultToleranceOptions
cluster = instance.DeepCopy()
cluster.UID = "mycluster"
cluster.Annotations = map[string]string{}
cluster.Spec.GcsFaultToleranceOptions = &rayv1.GcsFaultToleranceOptions{
RedisAddress: "redis://127.0.0.1:6379",
RedisUsername: &rayv1.RedisCredential{Value: "myuser"},
RedisPassword: &rayv1.RedisCredential{Value: "mypassword"},
}

podTemplateSpec = DefaultHeadPodTemplate(ctx, *cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
pod = BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.GcsFaultToleranceOptions, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, utils.GetCRDType(""), "")
rayContainer = pod.Spec.Containers[utils.RayContainerIndex]

if !strings.Contains(rayContainer.Args[0], "--redis-username=$REDIS_USERNAME") {
t.Fatalf("redis username not found in the ray start command %s", rayContainer.Args[0])
}
if !strings.Contains(rayContainer.Args[0], "--redis-password=$REDIS_PASSWORD") {
t.Fatalf("Ray container expected `--redis-password=$REDIS_PASSWORD` in `%v`", rayContainer.Args[0])
}
checkContainerEnv(t, rayContainer, utils.REDIS_USERNAME, "myuser")
checkContainerEnv(t, rayContainer, utils.REDIS_PASSWORD, "mypassword")

// Test 8 with a redis password from secret in GcsFaultToleranceOptions
// Test 8 with a redis password and username from secret in GcsFaultToleranceOptions
cluster = instance.DeepCopy()
cluster.UID = "mycluster"
cluster.Annotations = map[string]string{}
cluster.Spec.GcsFaultToleranceOptions = &rayv1.GcsFaultToleranceOptions{
RedisAddress: "redis://127.0.0.1:6379",
RedisUsername: &rayv1.RedisCredential{
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: "redis-username-secret",
},
Key: "username",
},
},
},
RedisPassword: &rayv1.RedisCredential{
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
Expand All @@ -706,10 +732,21 @@ func TestBuildPod_WithGcsFtEnabled(t *testing.T) {
pod = BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.GcsFaultToleranceOptions, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, utils.GetCRDType(""), "")
rayContainer = pod.Spec.Containers[utils.RayContainerIndex]

if !strings.Contains(rayContainer.Args[0], "--redis-username=$REDIS_USERNAME") {
t.Fatalf("redis username not found in the ray start command %s", rayContainer.Args[0])
}
if !strings.Contains(rayContainer.Args[0], "--redis-password=$REDIS_PASSWORD") {
t.Fatalf("Ray container expected `--redis-password=$REDIS_PASSWORD` in `%v`", rayContainer.Args[0])
}

usernameEnv := getEnvVar(rayContainer, utils.REDIS_USERNAME)
if usernameEnv == nil || usernameEnv.ValueFrom == nil || usernameEnv.ValueFrom.SecretKeyRef == nil {
t.Fatalf("Ray container expected env `%v` in `%v`", utils.REDIS_USERNAME, rayContainer.Env)
}
if usernameEnv.ValueFrom.SecretKeyRef.LocalObjectReference.Name != "redis-username-secret" ||
usernameEnv.ValueFrom.SecretKeyRef.Key != "username" {
t.Fatalf("Ray container expected secret `redis-username-secret` with key `username` ")
}
passwordEnv := getEnvVar(rayContainer, utils.REDIS_PASSWORD)
if passwordEnv == nil || passwordEnv.ValueFrom == nil || passwordEnv.ValueFrom.SecretKeyRef == nil {
t.Fatalf("Ray container expected env `%v` in `%v`", utils.REDIS_PASSWORD, rayContainer.Env)
Expand Down
8 changes: 6 additions & 2 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1212,8 +1212,12 @@ func (r *RayClusterReconciler) buildRedisCleanupJob(ctx context.Context, instanc
"import sys; " +
"redis_address = os.getenv('RAY_REDIS_ADDRESS', '').split(',')[0]; " +
"redis_address = redis_address if '://' in redis_address else 'redis://' + redis_address; " +
"parsed = urlparse(redis_address); " +
"sys.exit(1) if not cleanup_redis_storage(host=parsed.hostname, port=parsed.port, password=os.getenv('REDIS_PASSWORD', parsed.password), use_ssl=parsed.scheme=='rediss', storage_namespace=os.getenv('RAY_external_storage_namespace')) else None\"",
"parsed = urlparse(redis_address); ",
}
if utils.EnvVarExists(utils.REDIS_USERNAME, pod.Spec.Containers[utils.RayContainerIndex].Env) {
pod.Spec.Containers[utils.RayContainerIndex].Args[0] += "sys.exit(1) if not cleanup_redis_storage(host=parsed.hostname, port=parsed.port, username=os.getenv('REDIS_USERNAME', parsed.username), password=os.getenv('REDIS_PASSWORD', parsed.password), use_ssl=parsed.scheme=='rediss', storage_namespace=os.getenv('RAY_external_storage_namespace')) else None\""
} else {
pod.Spec.Containers[utils.RayContainerIndex].Args[0] += "sys.exit(1) if not cleanup_redis_storage(host=parsed.hostname, port=parsed.port, password=os.getenv('REDIS_PASSWORD', parsed.password), use_ssl=parsed.scheme=='rediss', storage_namespace=os.getenv('RAY_external_storage_namespace')) else None\""
}

// Disable liveness and readiness probes because the Job will not launch processes like Raylet and GCS.
Expand Down
1 change: 1 addition & 0 deletions ray-operator/controllers/ray/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ const (
RAY_ADDRESS = "RAY_ADDRESS"
RAY_REDIS_ADDRESS = "RAY_REDIS_ADDRESS"
REDIS_PASSWORD = "REDIS_PASSWORD"
REDIS_USERNAME = "REDIS_USERNAME"
RAY_DASHBOARD_ENABLE_K8S_DISK_USAGE = "RAY_DASHBOARD_ENABLE_K8S_DISK_USAGE"
RAY_EXTERNAL_STORAGE_NS = "RAY_external_storage_namespace"
RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S = "RAY_gcs_rpc_server_reconnect_timeout_s"
Expand Down

0 comments on commit 4793752

Please sign in to comment.