Source code for codeflare_sdk.ray.rayjobs.config

# Copyright 2022 IBM, Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The config sub-module contains the definition of the ManagedClusterConfig dataclass,
which is used to specify resource requirements and other details for the Ray cluster
that a RayJob creates and manages.
"""

import pathlib
from dataclasses import dataclass, field, fields
from typing import Dict, List, Optional, Union, get_args, get_origin, Any, Tuple
from kubernetes.client import (
    V1LocalObjectReference,
    V1SecretVolumeSource,
    V1Toleration,
    V1Volume,
    V1VolumeMount,
    V1ObjectMeta,
    V1Container,
    V1ContainerPort,
    V1Lifecycle,
    V1ExecAction,
    V1LifecycleHandler,
    V1EnvVar,
    V1PodTemplateSpec,
    V1PodSpec,
    V1ResourceRequirements,
)

import logging

from ...common.utils.constants import MOUNT_PATH, RAY_VERSION
from ...common.utils.utils import update_image

logger = logging.getLogger(__name__)

dir = pathlib.Path(__file__).parent.parent.resolve()

# https://docs.ray.io/en/latest/ray-core/scheduling/accelerators.html
DEFAULT_ACCELERATORS = {
    "nvidia.com/gpu": "GPU",
    "intel.com/gpu": "GPU",
    "amd.com/gpu": "GPU",
    "aws.amazon.com/neuroncore": "neuron_cores",
    "google.com/tpu": "TPU",
    "habana.ai/gaudi": "HPU",
    "huawei.com/Ascend910": "NPU",
    "huawei.com/Ascend310": "NPU",
}
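
# Illustrative sketch (not part of the module): the mapping above is the default for
# the `accelerator_configs` field of ManagedClusterConfig (defined below) and can be
# extended per-config, e.g. for a hypothetical custom device-plugin resource:
#
#   config = ManagedClusterConfig(
#       accelerator_configs={**DEFAULT_ACCELERATORS, "example.com/accel": "GPU"},
#       head_accelerators={"example.com/accel": 1},
#   )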


@dataclass
class ManagedClusterConfig:
    """
    This dataclass is used to specify resource requirements and other details for RayJobs.
    The cluster name and namespace are automatically derived from the RayJob configuration.

    Args:
        head_accelerators: A dictionary of extended resource requests for the head node. ex: {"nvidia.com/gpu": 1}
        head_tolerations: List of tolerations for head nodes.
        num_workers: The number of workers to create.
        worker_tolerations: List of tolerations for worker nodes.
        envs: A dictionary of environment variables to set for the cluster.
        image: The image to use for the cluster.
        image_pull_secrets: A list of image pull secrets to use for the cluster.
        labels: A dictionary of labels to apply to the cluster.
        worker_accelerators: A dictionary of extended resource requests for each worker. ex: {"nvidia.com/gpu": 1}
        accelerator_configs: A dictionary of custom resource mappings to map extended resource requests to RayCluster resource names.
            Defaults to DEFAULT_ACCELERATORS but can be overridden with custom mappings.
        annotations: A dictionary of annotations to apply to the Job.
        volumes: A list of V1Volume objects to add to the Cluster.
        volume_mounts: A list of V1VolumeMount objects to add to the Cluster.
    """

    head_cpu_requests: Union[int, str] = 2
    head_cpu_limits: Union[int, str] = 2
    head_memory_requests: Union[int, str] = 8
    head_memory_limits: Union[int, str] = 8
    head_accelerators: Dict[str, Union[str, int]] = field(default_factory=dict)
    head_tolerations: Optional[List[V1Toleration]] = None
    worker_cpu_requests: Union[int, str] = 1
    worker_cpu_limits: Union[int, str] = 1
    num_workers: int = 1
    worker_memory_requests: Union[int, str] = 2
    worker_memory_limits: Union[int, str] = 2
    worker_tolerations: Optional[List[V1Toleration]] = None
    envs: Dict[str, str] = field(default_factory=dict)
    image: str = ""
    image_pull_secrets: List[str] = field(default_factory=list)
    labels: Dict[str, str] = field(default_factory=dict)
    worker_accelerators: Dict[str, Union[str, int]] = field(default_factory=dict)
    accelerator_configs: Dict[str, str] = field(
        default_factory=lambda: DEFAULT_ACCELERATORS.copy()
    )
    annotations: Dict[str, str] = field(default_factory=dict)
    volumes: list[V1Volume] = field(default_factory=list)
    volume_mounts: list[V1VolumeMount] = field(default_factory=list)

    def __post_init__(self):
        self.envs["RAY_USAGE_STATS_ENABLED"] = "0"

        self._validate_types()
        self._memory_to_string()
        self._validate_gpu_config(self.head_accelerators)
        self._validate_gpu_config(self.worker_accelerators)

    def _validate_gpu_config(self, gpu_config: Dict[str, int]):
        for k in gpu_config.keys():
            if k not in self.accelerator_configs.keys():
                raise ValueError(
                    f"GPU configuration '{k}' not found in accelerator_configs, "
                    f"available resources are {list(self.accelerator_configs.keys())}, "
                    f"to add more supported resources use accelerator_configs. "
                    f"i.e. accelerator_configs = {{'{k}': 'FOO_BAR'}}"
                )

    def _memory_to_string(self):
        if isinstance(self.head_memory_requests, int):
            self.head_memory_requests = f"{self.head_memory_requests}G"
        if isinstance(self.head_memory_limits, int):
            self.head_memory_limits = f"{self.head_memory_limits}G"
        if isinstance(self.worker_memory_requests, int):
            self.worker_memory_requests = f"{self.worker_memory_requests}G"
        if isinstance(self.worker_memory_limits, int):
            self.worker_memory_limits = f"{self.worker_memory_limits}G"

    def _validate_types(self):
        """Validate the types of all fields in the ManagedClusterConfig dataclass."""
        errors = []
        for field_info in fields(self):
            value = getattr(self, field_info.name)
            expected_type = field_info.type
            if not self._is_type(value, expected_type):
                errors.append(f"'{field_info.name}' should be of type {expected_type}.")

        if errors:
            raise TypeError("Type validation failed:\n" + "\n".join(errors))

    @staticmethod
    def _is_type(value, expected_type):
        """Check if the value matches the expected type."""

        def check_type(value, expected_type):
            origin_type = get_origin(expected_type)
            args = get_args(expected_type)
            if origin_type is Union:
                return any(check_type(value, union_type) for union_type in args)
            if origin_type is list:
                if value is not None:
                    return all(check_type(elem, args[0]) for elem in (value or []))
                else:
                    return True
            if origin_type is dict:
                if value is not None:
                    return all(
                        check_type(k, args[0]) and check_type(v, args[1])
                        for k, v in value.items()
                    )
                else:
                    return True
            if origin_type is tuple:
                return all(check_type(elem, etype) for elem, etype in zip(value, args))
            if expected_type is int:
                return isinstance(value, int) and not isinstance(value, bool)
            if expected_type is bool:
                return isinstance(value, bool)
            return isinstance(value, expected_type)

        return check_type(value, expected_type)
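
    # Illustrative sketch (not part of the library source): a typical configuration.
    # Integer memory values are normalised to "<n>G" strings by __post_init__, and
    # accelerator keys that are missing from accelerator_configs raise a ValueError.
    #
    #   config = ManagedClusterConfig(
    #       num_workers=2,
    #       worker_memory_requests=4,                # becomes "4G"
    #       worker_memory_limits=4,                  # becomes "4G"
    #       head_accelerators={"nvidia.com/gpu": 1},
    #       envs={"LOG_LEVEL": "DEBUG"},
    #   )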
    def build_ray_cluster_spec(self, cluster_name: str) -> Dict[str, Any]:
        """
        Build the RayCluster spec from ManagedClusterConfig for embedding in RayJob.

        Args:
            self: The cluster configuration object (ManagedClusterConfig)
            cluster_name: The name for the cluster (derived from RayJob name)

        Returns:
            Dict containing the RayCluster spec for embedding in RayJob
        """
        ray_cluster_spec = {
            "rayVersion": RAY_VERSION,
            "enableInTreeAutoscaling": False,  # Required for Kueue-managed jobs
            "headGroupSpec": self._build_head_group_spec(),
            "workerGroupSpecs": [self._build_worker_group_spec(cluster_name)],
        }

        return ray_cluster_spec
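
    # Illustrative sketch (assumed caller, hypothetical names): the spec returned
    # here is what a RayJob custom resource embeds under spec.rayClusterSpec; the
    # cluster name would normally be derived from the RayJob name.
    #
    #   spec = config.build_ray_cluster_spec(cluster_name="my-rayjob")
    #   rayjob = {
    #       "apiVersion": "ray.io/v1",
    #       "kind": "RayJob",
    #       "metadata": {"name": "my-rayjob"},
    #       "spec": {"rayClusterSpec": spec, "entrypoint": "python main.py"},
    #   }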
    def _build_head_group_spec(self) -> Dict[str, Any]:
        """Build the head group specification."""
        return {
            "serviceType": "ClusterIP",
            "enableIngress": False,
            "rayStartParams": self._build_head_ray_params(),
            "template": V1PodTemplateSpec(
                metadata=V1ObjectMeta(annotations=self.annotations),
                spec=self._build_pod_spec(self._build_head_container(), is_head=True),
            ),
        }

    def _build_worker_group_spec(self, cluster_name: str) -> Dict[str, Any]:
        """Build the worker group specification."""
        return {
            "replicas": self.num_workers,
            "minReplicas": self.num_workers,
            "maxReplicas": self.num_workers,
            "groupName": f"worker-group-{cluster_name}",
            "rayStartParams": self._build_worker_ray_params(),
            "template": V1PodTemplateSpec(
                metadata=V1ObjectMeta(annotations=self.annotations),
                spec=self._build_pod_spec(
                    self._build_worker_container(),
                    is_head=False,
                ),
            ),
        }

    def _build_head_ray_params(self) -> Dict[str, str]:
        """Build Ray start parameters for head node."""
        params = {
            "dashboard-host": "0.0.0.0",
            "block": "true",
        }

        # Add GPU count if specified
        if self.head_accelerators:
            gpu_count = sum(
                count
                for resource_type, count in self.head_accelerators.items()
                if "gpu" in resource_type.lower()
            )
            if gpu_count > 0:
                params["num-gpus"] = str(gpu_count)

        return params

    def _build_worker_ray_params(self) -> Dict[str, str]:
        """Build Ray start parameters for worker nodes."""
        params = {
            "block": "true",
        }

        # Add GPU count if specified
        if self.worker_accelerators:
            gpu_count = sum(
                count
                for resource_type, count in self.worker_accelerators.items()
                if "gpu" in resource_type.lower()
            )
            if gpu_count > 0:
                params["num-gpus"] = str(gpu_count)

        return params

    def _build_head_container(self) -> V1Container:
        """Build the head container specification."""
        container = V1Container(
            name="ray-head",
            image=update_image(self.image),
            image_pull_policy="IfNotPresent",  # Always IfNotPresent for RayJobs
            ports=[
                V1ContainerPort(name="gcs", container_port=6379),
                V1ContainerPort(name="dashboard", container_port=8265),
                V1ContainerPort(name="client", container_port=10001),
            ],
            lifecycle=V1Lifecycle(
                pre_stop=V1LifecycleHandler(
                    _exec=V1ExecAction(command=["/bin/sh", "-c", "ray stop"])
                )
            ),
            resources=self._build_resource_requirements(
                self.head_cpu_requests,
                self.head_cpu_limits,
                self.head_memory_requests,
                self.head_memory_limits,
                self.head_accelerators,
            ),
            volume_mounts=self._generate_volume_mounts(),
            env=self._build_env_vars() if hasattr(self, "envs") and self.envs else None,
        )

        return container

    def _build_worker_container(self) -> V1Container:
        """Build the worker container specification."""
        container = V1Container(
            name="ray-worker",
            image=update_image(self.image),
            image_pull_policy="IfNotPresent",  # Always IfNotPresent for RayJobs
            lifecycle=V1Lifecycle(
                pre_stop=V1LifecycleHandler(
                    _exec=V1ExecAction(command=["/bin/sh", "-c", "ray stop"])
                )
            ),
            resources=self._build_resource_requirements(
                self.worker_cpu_requests,
                self.worker_cpu_limits,
                self.worker_memory_requests,
                self.worker_memory_limits,
                self.worker_accelerators,
            ),
            volume_mounts=self._generate_volume_mounts(),
            env=self._build_env_vars() if hasattr(self, "envs") and self.envs else None,
        )

        return container

    def _build_resource_requirements(
        self,
        cpu_requests: Union[int, str],
        cpu_limits: Union[int, str],
        memory_requests: Union[int, str],
        memory_limits: Union[int, str],
        extended_resource_requests: Dict[str, Union[int, str]] = None,
    ) -> V1ResourceRequirements:
        """Build Kubernetes resource requirements."""
        resource_requirements = V1ResourceRequirements(
            requests={"cpu": cpu_requests, "memory": memory_requests},
            limits={"cpu": cpu_limits, "memory": memory_limits},
        )

        # Add extended resources (e.g., GPUs)
        if extended_resource_requests:
            for resource_type, amount in extended_resource_requests.items():
                resource_requirements.limits[resource_type] = amount
                resource_requirements.requests[resource_type] = amount

        return resource_requirements

    def _build_pod_spec(self, container: V1Container, is_head: bool) -> V1PodSpec:
        """Build the pod specification."""
        pod_spec = V1PodSpec(
            containers=[container],
            volumes=self._generate_volumes(),
            restart_policy="Never",  # RayJobs should not restart
        )

        # Add tolerations if specified
        if is_head and hasattr(self, "head_tolerations") and self.head_tolerations:
            pod_spec.tolerations = self.head_tolerations
        elif (
            not is_head
            and hasattr(self, "worker_tolerations")
            and self.worker_tolerations
        ):
            pod_spec.tolerations = self.worker_tolerations

        # Add image pull secrets if specified
        if hasattr(self, "image_pull_secrets") and self.image_pull_secrets:
            pod_spec.image_pull_secrets = [
                V1LocalObjectReference(name=secret)
                for secret in self.image_pull_secrets
            ]

        return pod_spec

    def _generate_volume_mounts(self) -> list:
        """Generate volume mounts for the container."""
        volume_mounts = []

        # Add custom volume mounts if specified
        if hasattr(self, "volume_mounts") and self.volume_mounts:
            volume_mounts.extend(self.volume_mounts)

        return volume_mounts

    def _generate_volumes(self) -> list:
        """Generate volumes for the pod."""
        volumes = []

        # Add custom volumes if specified
        if hasattr(self, "volumes") and self.volumes:
            volumes.extend(self.volumes)

        return volumes

    def _build_env_vars(self) -> list:
        """Build environment variables list."""
        return [V1EnvVar(name=key, value=value) for key, value in self.envs.items()]
    def add_file_volumes(self, secret_name: str, mount_path: str = MOUNT_PATH):
        """
        Add file volume and mount references to cluster configuration.

        Args:
            secret_name: Name of the Secret containing files
            mount_path: Where to mount files in containers (default: /home/ray/scripts)
        """
        # Check if file volume already exists
        volume_name = "ray-job-files"
        existing_volume = next(
            (v for v in self.volumes if getattr(v, "name", None) == volume_name), None
        )
        if existing_volume:
            logger.debug(f"File volume '{volume_name}' already exists, skipping...")
            return

        # Check if file mount already exists
        existing_mount = next(
            (m for m in self.volume_mounts if getattr(m, "name", None) == volume_name),
            None,
        )
        if existing_mount:
            logger.debug(
                f"File volume mount '{volume_name}' already exists, skipping..."
            )
            return

        # Add file volume to cluster configuration
        file_volume = V1Volume(
            name=volume_name, secret=V1SecretVolumeSource(secret_name=secret_name)
        )
        self.volumes.append(file_volume)

        # Add file volume mount to cluster configuration
        file_mount = V1VolumeMount(name=volume_name, mount_path=mount_path)
        self.volume_mounts.append(file_mount)

        logger.info(
            f"Added file volume '{secret_name}' to cluster config: mount_path={mount_path}"
        )
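
    # Illustrative sketch (assumed Secret name, not part of the library source):
    # wiring a files Secret into the cluster pods; a second call with the same
    # volume name is a no-op.
    #
    #   config.add_file_volumes(secret_name="my-rayjob-files")
    #   # -> V1Volume(name="ray-job-files", secret=...) appended to config.volumes
    #   # -> V1VolumeMount(name="ray-job-files", mount_path=MOUNT_PATH) appended
    #   #    to config.volume_mounts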
    def validate_secret_size(self, files: Dict[str, str]) -> None:
        """Validate that the combined size of the given files stays under the 1MB Kubernetes Secret limit."""
        total_size = sum(len(content.encode("utf-8")) for content in files.values())
        if total_size > 1024 * 1024:  # 1MB
            raise ValueError(
                f"Secret size exceeds 1MB limit. Total size: {total_size} bytes"
            )
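
    # Illustrative sketch: fails fast before any Secret is created.
    #
    #   config.validate_secret_size({"main.py": "print('hello')"})          # fine
    #   config.validate_secret_size({"big.txt": "x" * (2 * 1024 * 1024)})   # raises ValueError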
    def build_file_secret_spec(
        self, job_name: str, namespace: str, files: Dict[str, str]
    ) -> Dict[str, Any]:
        """
        Build Secret specification for files.

        Args:
            job_name: Name of the RayJob (used for Secret naming)
            namespace: Kubernetes namespace
            files: Dictionary of file_name -> file_content

        Returns:
            Dict: Secret specification ready for Kubernetes API
        """
        secret_name = f"{job_name}-files"
        return {
            "apiVersion": "v1",
            "kind": "Secret",
            "type": "Opaque",
            "metadata": {
                "name": secret_name,
                "namespace": namespace,
                "labels": {
                    "ray.io/job-name": job_name,
                    "app.kubernetes.io/managed-by": "codeflare-sdk",
                    "app.kubernetes.io/component": "rayjob-files",
                },
            },
            "data": files,
        }
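
    # Illustrative sketch (hypothetical names): the resulting manifest is intended
    # to be submitted to the Kubernetes API as-is.
    #
    #   secret = config.build_file_secret_spec(
    #       job_name="my-rayjob",
    #       namespace="default",
    #       files={"main.py": "print('hello')"},
    #   )
    #   # secret["metadata"]["name"] == "my-rayjob-files"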
    def build_file_volume_specs(
        self, secret_name: str, mount_path: str = MOUNT_PATH
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """
        Build volume and mount specifications for files.

        Args:
            secret_name: Name of the Secret containing files
            mount_path: Where to mount files in containers

        Returns:
            Tuple of (volume_spec, mount_spec) as dictionaries
        """
        volume_spec = {"name": "ray-job-files", "secret": {"secretName": secret_name}}
        mount_spec = {"name": "ray-job-files", "mountPath": mount_path}
        return volume_spec, mount_spec
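
    # Illustrative sketch (hypothetical Secret name): the dict form mirrors
    # add_file_volumes() for callers that build raw pod specs instead of
    # kubernetes client objects.
    #
    #   volume, mount = config.build_file_volume_specs(secret_name="my-rayjob-files")
    #   # volume == {"name": "ray-job-files", "secret": {"secretName": "my-rayjob-files"}}
    #   # mount  == {"name": "ray-job-files", "mountPath": MOUNT_PATH}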