Skip to content

Commit

Permalink
Add support for capturing Ray job logs via a sidecar
Browse files Browse the repository at this point in the history
Signed-off-by: Jeev B <[email protected]>
  • Loading branch information
jeevb committed Oct 27, 2023
1 parent e1c05db commit 1c6ac06
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 1 deletion.
3 changes: 3 additions & 0 deletions flyteplugins/go/tasks/plugins/k8s/ray/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package ray
import (
"context"

v1 "k8s.io/api/core/v1"

pluginsConfig "github.com/flyteorg/flyte/flyteplugins/go/tasks/config"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/logs"
pluginmachinery "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s"
Expand Down Expand Up @@ -78,6 +80,7 @@ type Config struct {
// Remote Ray Cluster Config
RemoteClusterConfig pluginmachinery.ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for ray jobs"`
Logs logs.LogConfig `json:"logs" pflag:"-,Log configuration for ray jobs"`
LogsSidecar *v1.Container `json:"logsSidecar" pflag:"-,Sidecar to inject into head pods for capturing ray job logs"`
Defaults DefaultConfig `json:"defaults" pflag:"-,Default configuration for ray jobs"`
EnableUsageStats bool `json:"enableUsageStats" pflag:",Enable usage stats for ray jobs. These stats are submitted to usage-stats.ray.io per https://docs.ray.io/en/latest/cluster/usage-stats.html"`
}
Expand Down
63 changes: 62 additions & 1 deletion flyteplugins/go/tasks/plugins/k8s/ray/ray.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
)

const (
headPodPrimaryContainerName = "ray-head"
rayStateMountPath = "/tmp/ray"
defaultRayStateVolName = "system-ray-state"
rayTaskType = "ray"
KindRayJob = "RayJob"
IncludeDashboard = "include-dashboard"
Expand Down Expand Up @@ -179,11 +182,68 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC
return &rayJobObject, nil
}

func addLogsSidecarToHeadPodSpec(primaryContainer *v1.Container, podSpec *v1.PodSpec) {
cfg := GetConfig()
if cfg.LogsSidecar == nil {
return
}
sidecar := cfg.LogsSidecar.DeepCopy()

// Ray logs integration
var rayStateVolMount *v1.VolumeMount
// Look for an existing volume mount on the primary container, mounted at /tmp/ray
for _, vm := range primaryContainer.VolumeMounts {
if vm.MountPath == rayStateMountPath {
vm := vm
rayStateVolMount = &vm
break

Check warning on line 199 in flyteplugins/go/tasks/plugins/k8s/ray/ray.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/ray/ray.go#L190-L199

Added lines #L190 - L199 were not covered by tests
}
}
// No existing volume mount exists at /tmp/ray. We create a new volume and volume
// mount and add it to the pod and container specs respectively
if rayStateVolMount == nil {
vol := v1.Volume{
Name: defaultRayStateVolName,
VolumeSource: v1.VolumeSource{
EmptyDir: &v1.EmptyDirVolumeSource{},
},
}
podSpec.Volumes = append(podSpec.Volumes, vol)
volMount := v1.VolumeMount{
Name: defaultRayStateVolName,
MountPath: rayStateMountPath,
}
primaryContainer.VolumeMounts = append(primaryContainer.VolumeMounts, volMount)
rayStateVolMount = &volMount
}

Check warning on line 218 in flyteplugins/go/tasks/plugins/k8s/ray/ray.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/ray/ray.go#L204-L218

Added lines #L204 - L218 were not covered by tests
// We need to mirror the ray state volume mount into the sidecar as readonly,
// so that we can read the logs written by the head node.
readOnlyRayStateVolMount := *rayStateVolMount.DeepCopy()
readOnlyRayStateVolMount.ReadOnly = true

// Update volume mounts on sidecar
// If one already exists with the desired mount path, simply replace it. Otherwise,
// add it to sidecar's volume mounts.
foundExistingSidecarVolMount := false
for idx, vm := range sidecar.VolumeMounts {
if vm.MountPath == rayStateMountPath {
foundExistingSidecarVolMount = true
sidecar.VolumeMounts[idx] = readOnlyRayStateVolMount
}

Check warning on line 232 in flyteplugins/go/tasks/plugins/k8s/ray/ray.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/ray/ray.go#L221-L232

Added lines #L221 - L232 were not covered by tests
}
if !foundExistingSidecarVolMount {
sidecar.VolumeMounts = append(sidecar.VolumeMounts, readOnlyRayStateVolMount)
}

Check warning on line 236 in flyteplugins/go/tasks/plugins/k8s/ray/ray.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/ray/ray.go#L234-L236

Added lines #L234 - L236 were not covered by tests

// Add sidecar to containers
podSpec.Containers = append(podSpec.Containers, *sidecar)

Check warning on line 239 in flyteplugins/go/tasks/plugins/k8s/ray/ray.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/ray/ray.go#L239

Added line #L239 was not covered by tests
}

func buildHeadPodTemplate(container *v1.Container, podSpec *v1.PodSpec, objectMeta *metav1.ObjectMeta, taskCtx pluginsCore.TaskExecutionContext) v1.PodTemplateSpec {
// Some configs are copied from https://github.com/ray-project/kuberay/blob/b72e6bdcd9b8c77a9dc6b5da8560910f3a0c3ffd/apiserver/pkg/util/cluster.go#L97
// They should always be the same, so we can hard-code them here.
primaryContainer := container.DeepCopy()
primaryContainer.Name = "ray-head"
primaryContainer.Name = headPodPrimaryContainerName

envs := []v1.EnvVar{
{
Expand Down Expand Up @@ -220,6 +280,7 @@ func buildHeadPodTemplate(container *v1.Container, podSpec *v1.PodSpec, objectMe
headPodSpec := podSpec.DeepCopy()

headPodSpec.Containers = []v1.Container{*primaryContainer}
addLogsSidecarToHeadPodSpec(primaryContainer, headPodSpec)

podTemplateSpec := v1.PodTemplateSpec{
Spec: *headPodSpec,
Expand Down

0 comments on commit 1c6ac06

Please sign in to comment.