from __future__ import annotations  # Postpone evaluation of annotations

import logging
import os
import re
import yaml
import zipfile
import base64
import io
from typing import Dict, Any, Optional, List, TYPE_CHECKING

from codeflare_sdk.common.utils.constants import MOUNT_PATH
from kubernetes import client
from ray.runtime_env import RuntimeEnv
from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig
from ...common.kubernetes_cluster.auth import get_api_client

# Use TYPE_CHECKING to avoid circular import at runtime
if TYPE_CHECKING:
    from codeflare_sdk.ray.rayjobs.rayjob import RayJob

logger = logging.getLogger(__name__)

# Regex pattern for finding Python files in entrypoint commands
# Matches paths like: test.py, ./test.py, dir/test.py, my-dir/test.py
PYTHON_FILE_PATTERN = r"(?:python\s+)?([./\w/-]+\.py)"

# Path where working_dir will be unzipped on the submitter pod
UNZIP_PATH = "/tmp/rayjob-working-dir"

# Exclude Jupyter notebook and Markdown files from working directory zips
JUPYTER_NOTEBOOK_PATTERN = r"\.ipynb$"
MARKDOWN_FILE_PATTERN = r"\.md$"

def _should_exclude_file(file_path: str) -> bool:
    """
    Check if file should be excluded from working directory zip.

    Currently excludes:
    - Jupyter notebook files (.ipynb)
    - Markdown files (.md)

    Args:
        file_path: Relative file path within the working directory

    Returns:
        True if file should be excluded, False otherwise
    """
    return bool(
        re.search(JUPYTER_NOTEBOOK_PATTERN, file_path, re.IGNORECASE)
        or re.search(MARKDOWN_FILE_PATTERN, file_path, re.IGNORECASE)
    )

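# Example (illustrative, not executed): given relative paths inside a working
# directory, the exclusion filter behaves as follows:
#
#     _should_exclude_file("notebooks/analysis.ipynb")  # True  (.ipynb excluded)
#     _should_exclude_file("README.md")                 # True  (.md excluded)
#     _should_exclude_file("src/train.py")              # False (kept in the zip)
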
def _normalize_runtime_env(
    runtime_env: Optional[RuntimeEnv],
) -> Optional[Dict[str, Any]]:
    """Convert a RuntimeEnv object to a plain dict, or return None if it is unset."""
    if runtime_env is None:
        return None
    return runtime_env.to_dict()

def _zip_directory(directory_path: str) -> Optional[bytes]:
    """
    Zip entire directory preserving structure, excluding Jupyter notebook and Markdown files.

    Args:
        directory_path: Path to directory to zip

    Returns:
        Bytes of zip file, or None on error
    """
    try:
        # Create in-memory zip file
        zip_buffer = io.BytesIO()
        excluded_count = 0

        with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zipf:
            # Walk through directory and add all files
            for root, dirs, files in os.walk(directory_path):
                for file in files:
                    file_path = os.path.join(root, file)
                    # Calculate relative path from directory_path
                    arcname = os.path.relpath(file_path, directory_path)

                    # Check if file should be excluded
                    if _should_exclude_file(arcname):
                        excluded_count += 1
                        logger.debug(f"Excluded from zip: {arcname}")
                        continue

                    zipf.write(file_path, arcname)
                    logger.debug(f"Added to zip: {arcname}")

        zip_data = zip_buffer.getvalue()

        # Log summary with exclusion count
        log_message = (
            f"Successfully zipped directory: {directory_path} ({len(zip_data)} bytes)"
        )
        if excluded_count > 0:
            log_message += f" - Excluded {excluded_count} file(s) (.ipynb, .md)"
        logger.info(log_message)

        return zip_data
    except (IOError, OSError) as e:
        logger.error(f"Failed to zip directory {directory_path}: {e}")
        return None

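# Example (illustrative sketch): the returned zip bytes are base64-encoded before
# being placed into a Kubernetes Secret (see create_secret_from_spec below, which
# expects the values to already be base64 strings). "./my-job-src" is a
# hypothetical local directory:
#
#     zip_bytes = _zip_directory("./my-job-src")
#     if zip_bytes is not None:
#         files = {"working_dir.zip": base64.b64encode(zip_bytes).decode("utf-8")}
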
def _extract_single_entrypoint_file(job: RayJob) -> Optional[Dict[str, str]]:
    """
    Extract single Python file from entrypoint if no working_dir specified.

    Returns a dict with metadata about the file path structure so we can
    preserve it when mounting via Secret.

    Args:
        job: RayJob instance

    Returns:
        Dict with special format: {"__entrypoint_path__": path, "filename": content}
        This allows us to preserve directory structure when mounting
    """
    if not job.entrypoint:
        return None

    # Look for Python file in entrypoint
    matches = re.findall(PYTHON_FILE_PATTERN, job.entrypoint)

    for file_path in matches:
        # Check if it's a local file
        if os.path.isfile(file_path):
            try:
                with open(file_path, "r") as f:
                    content = f.read()

                # Use basename as key (Secret keys can't have slashes)
                # but store the full path for later use in Secret item.path
                filename = os.path.basename(file_path)
                relative_path = file_path.lstrip("./")

                logger.info(f"Extracted single entrypoint file: {file_path}")

                # Return special format with metadata
                return {"__entrypoint_path__": relative_path, filename: content}
            except (IOError, OSError) as e:
                logger.warning(f"Could not read entrypoint file {file_path}: {e}")

    return None

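# Example (illustrative): for an entrypoint such as "python ./scripts/train.py",
# assuming ./scripts/train.py exists locally, the returned dict would look like:
#
#     {
#         "__entrypoint_path__": "scripts/train.py",    # relative path to preserve
#         "train.py": "<contents of scripts/train.py>", # Secret key uses the basename
#     }
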
def process_runtime_env(
    job: RayJob, files: Optional[Dict[str, str]] = None
) -> Optional[str]:
    """
    Process runtime_env field to handle env_vars, pip dependencies, and working_dir.

    Args:
        job: RayJob instance whose runtime_env is being processed
        files: Optional mapping of file names to contents (single entrypoint file case)

    Returns:
        Processed runtime environment as YAML string, or None if no processing needed
    """
    # Convert RuntimeEnv to dict for processing
    runtime_env_dict = _normalize_runtime_env(job.runtime_env)

    processed_env = {}

    # Handle env_vars
    if runtime_env_dict and "env_vars" in runtime_env_dict:
        processed_env["env_vars"] = runtime_env_dict["env_vars"]
        logger.info(
            f"Added {len(runtime_env_dict['env_vars'])} environment variables to runtime_env"
        )

    # Handle pip dependencies
    if runtime_env_dict and "pip" in runtime_env_dict:
        pip_deps = process_pip_dependencies(job, runtime_env_dict["pip"])
        if pip_deps:
            processed_env["pip"] = pip_deps

    # Handle working_dir
    if runtime_env_dict and "working_dir" in runtime_env_dict:
        working_dir = runtime_env_dict["working_dir"]
        if os.path.isdir(working_dir):
            # Local working directory - will be zipped and unzipped to UNZIP_PATH by the submitter pod
            processed_env["working_dir"] = UNZIP_PATH
            logger.info(
                f"Local working_dir will be zipped, mounted, and unzipped to: {UNZIP_PATH}"
            )
        else:
            # Remote URI (e.g., GitHub, S3) - pass through as-is
            processed_env["working_dir"] = working_dir
            logger.info(f"Using remote working_dir: {working_dir}")
    # If no working_dir specified but we have files (single file case)
    elif not runtime_env_dict or "working_dir" not in runtime_env_dict:
        if files and "working_dir.zip" not in files:
            # Single file case - mount at MOUNT_PATH
            processed_env["working_dir"] = MOUNT_PATH
            logger.info(f"Single file will be mounted at: {MOUNT_PATH}")

    # Convert to YAML string if we have any processed environment
    if processed_env:
        return yaml.dump(processed_env, default_flow_style=False)

    return None

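# Example (illustrative): for a runtime_env with env_vars, a pip list, and a local
# working_dir (values below are hypothetical), the returned YAML is roughly:
#
#     env_vars:
#       LOG_LEVEL: DEBUG
#     pip:
#     - requests
#     working_dir: /tmp/rayjob-working-dir
#
# A remote working_dir (e.g. an S3 or GitHub URI) would be passed through unchanged
# instead of being rewritten to the unzip path.
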
def process_pip_dependencies(job: RayJob, pip_spec) -> Optional[List[str]]:
    """
    Process pip dependencies from runtime_env.

    Args:
        job: RayJob instance
        pip_spec: Can be a list of packages, a string path to requirements.txt, or a dict

    Returns:
        List of pip dependencies, or None if the format is unsupported
    """
    if isinstance(pip_spec, list):
        # Already a list of dependencies
        logger.info(f"Using provided pip dependencies: {len(pip_spec)} packages")
        return pip_spec
    elif isinstance(pip_spec, str):
        # Assume it's a path to requirements.txt
        return parse_requirements_file(pip_spec)
    elif isinstance(pip_spec, dict):
        # Handle dict format (e.g., {"packages": [...], "pip_check": False})
        if "packages" in pip_spec:
            logger.info(
                f"Using pip dependencies from dict: {len(pip_spec['packages'])} packages"
            )
            return pip_spec["packages"]

    logger.warning(f"Unsupported pip specification format: {type(pip_spec)}")
    return None

def parse_requirements_file(requirements_path: str) -> Optional[List[str]]:
    """
    Parse a requirements.txt file and return list of dependencies.

    Args:
        requirements_path: Path to requirements.txt file

    Returns:
        List of pip dependencies
    """
    if not os.path.isfile(requirements_path):
        logger.warning(f"Requirements file not found: {requirements_path}")
        return None

    try:
        with open(requirements_path, "r") as f:
            lines = f.readlines()

        # Parse requirements, filtering out comments and empty lines
        requirements = []
        for line in lines:
            line = line.strip()
            if line and not line.startswith("#"):
                requirements.append(line)

        logger.info(f"Parsed {len(requirements)} dependencies from {requirements_path}")
        return requirements
    except (IOError, OSError) as e:
        logger.warning(f"Could not read requirements file {requirements_path}: {e}")
        return None

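# Example (illustrative): given a requirements.txt containing
#
#     # training dependencies
#     torch==2.3.0
#     numpy>=1.26
#
# parse_requirements_file("requirements.txt") returns ["torch==2.3.0", "numpy>=1.26"];
# the comment line and any blank lines are dropped.
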
def create_secret_from_spec(
    job: RayJob,
    secret_spec: Dict[str, Any],
    rayjob_result: Optional[Dict[str, Any]] = None,
) -> str:
    """
    Create Secret from specification via Kubernetes API.

    Args:
        job: RayJob instance the Secret belongs to
        secret_spec: Secret specification dictionary
        rayjob_result: The result from RayJob creation containing the UID

    Returns:
        str: Name of the created Secret
    """
    secret_name = secret_spec["metadata"]["name"]

    metadata = client.V1ObjectMeta(**secret_spec["metadata"])

    # Add owner reference to ensure proper cleanup.
    # We can trust that rayjob_result contains a UID since submit_job() only returns
    # complete K8s resources or None, and we already validated that the result exists.
    logger.info(
        f"Adding owner reference to Secret '{secret_name}' with RayJob UID: {rayjob_result['metadata']['uid']}"
    )
    metadata.owner_references = [
        client.V1OwnerReference(
            api_version="ray.io/v1",
            kind="RayJob",
            name=job.name,
            uid=rayjob_result["metadata"]["uid"],
            controller=True,
            block_owner_deletion=True,
        )
    ]

    # Convert dict spec to V1Secret.
    # Use stringData instead of data to avoid double base64 encoding:
    # the zip files are already base64-encoded strings, so stringData lets the
    # API server handle the final encoding.
    secret = client.V1Secret(
        metadata=metadata,
        type=secret_spec.get("type", "Opaque"),
        string_data=secret_spec["data"],
    )

    # Create Secret via Kubernetes API
    k8s_api = client.CoreV1Api(get_api_client())
    try:
        k8s_api.create_namespaced_secret(namespace=job.namespace, body=secret)
        logger.info(
            f"Created Secret '{secret_name}' with {len(secret_spec['data'])} files"
        )
    except client.ApiException as e:
        if e.status == 409:  # Already exists
            logger.info(f"Secret '{secret_name}' already exists, updating...")
            k8s_api.replace_namespaced_secret(
                name=secret_name, namespace=job.namespace, body=secret
            )
        else:
            raise RuntimeError(f"Failed to create Secret '{secret_name}': {e}")

    return secret_name

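# Example (illustrative sketch of the dict shape this function consumes; in practice
# the spec is produced by ManagedClusterConfig.build_file_secret_spec, and the names
# below are hypothetical):
#
#     secret_spec = {
#         "metadata": {"name": "myjob-files", "namespace": "default"},
#         "type": "Opaque",
#         "data": {"train.py": "<file contents>"},
#     }
#     create_secret_from_spec(job, secret_spec, rayjob_result)
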
def create_file_secret(
    job: RayJob, files: Dict[str, str], rayjob_result: Dict[str, Any]
):
    """
    Create Secret with owner reference for local files.

    Args:
        job: RayJob instance the files belong to
        files: Mapping of file names to file contents
        rayjob_result: The result from RayJob creation containing the UID
    """
    # Use a basic config builder for Secret creation
    config_builder = ManagedClusterConfig()

    # Filter out metadata keys (like __entrypoint_path__) from Secret data
    secret_files = {k: v for k, v in files.items() if not k.startswith("__")}

    # Validate and build Secret spec
    config_builder.validate_secret_size(secret_files)
    secret_spec = config_builder.build_file_secret_spec(
        job_name=job.name, namespace=job.namespace, files=secret_files
    )

    # Create Secret with owner reference
    create_secret_from_spec(job, secret_spec, rayjob_result)
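# Example (illustrative; the actual orchestration lives in RayJob's submission path):
# once a RayJob has been created and its creation result (including metadata.uid) is
# available, local entrypoint files can be packaged into a Secret like so:
#
#     files = _extract_single_entrypoint_file(job) or {}
#     if files:
#         create_file_secret(job, files, rayjob_result)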