# Source code for codeflare_sdk.ray.client.ray_jobs

# Copyright 2022 IBM, Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The ray_jobs sub-module contains methods needed to submit jobs and connect to Ray Clusters that were not created by CodeFlare.
The SDK acts as a wrapper for the Ray Job Submission Client.
"""

from typing import Any, AsyncIterator, Dict, Iterator, List, Optional, Tuple, Union

from ray.dashboard.modules.job.pydantic_models import JobDetails
from ray.job_submission import JobSubmissionClient


class RayJobClient:
    """
    A wrapper class for the Ray Job Submission Client, used for interacting with
    Ray clusters to manage job submissions, deletions, and other job-related
    information.

    Every method delegates to the wrapped ``JobSubmissionClient`` stored on
    ``self.rayJobClient``; exceptions raised by that client propagate unchanged.

    Args:
        address (Optional[str]):
            The Ray cluster's address, which may be either the Ray Client address,
            HTTP address of the dashboard server on the head node, or
            "auto" / "localhost:<port>" for a local cluster. This is overridden by
            the RAY_ADDRESS environment variable if set.
        create_cluster_if_needed (bool):
            If True, a new cluster will be created if not already running at the
            specified address. By default, Ray requires an existing cluster.
        cookies (Optional[Dict[str, Any]]):
            HTTP cookies to send with requests to the job server.
        metadata (Optional[Dict[str, Any]]):
            Global metadata to store with all jobs, merged with job-specific
            metadata during job submission.
        headers (Optional[Dict[str, Any]]):
            HTTP headers to send with requests to the job server, can be used for
            authentication.
        verify (Optional[Union[str, bool]]):
            If True, verifies the server's TLS certificate. Can also be a path to
            trusted certificates. Default is True.
    """

    def __init__(
        self,
        address: Optional[str] = None,
        create_cluster_if_needed: bool = False,
        cookies: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, Any]] = None,
        verify: Optional[Union[str, bool]] = True,
    ):
        # All arguments are forwarded verbatim; this class adds no connection
        # logic of its own.
        self.rayJobClient = JobSubmissionClient(
            address=address,
            create_cluster_if_needed=create_cluster_if_needed,
            cookies=cookies,
            metadata=metadata,
            headers=headers,
            verify=verify,
        )

    def submit_job(
        self,
        entrypoint: str,
        job_id: Optional[str] = None,
        runtime_env: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, str]] = None,
        submission_id: Optional[str] = None,
        entrypoint_num_cpus: Optional[Union[int, float]] = None,
        entrypoint_num_gpus: Optional[Union[int, float]] = None,
        entrypoint_memory: Optional[int] = None,
        entrypoint_resources: Optional[Dict[str, float]] = None,
    ) -> str:
        """
        Submits a job to the Ray cluster with specified resources and returns the job ID.

        Args:
            entrypoint (str):
                The command to execute for this job.
            job_id (Optional[str]):
                Deprecated, use `submission_id`. A unique job identifier.
            runtime_env (Optional[Dict[str, Any]]):
                The runtime environment for this job.
            metadata (Optional[Dict[str, str]]):
                Metadata associated with the job, merged with global metadata.
            submission_id (Optional[str]):
                Unique ID for the job submission.
            entrypoint_num_cpus (Optional[Union[int, float]]):
                The quantity of CPU cores to reserve for the execution of the
                entrypoint command, separately from any tasks or actors launched
                by it. Defaults to 0.
            entrypoint_num_gpus (Optional[Union[int, float]]):
                The quantity of GPUs to reserve for the execution of the
                entrypoint command, separately from any tasks or actors launched
                by it. Defaults to 0.
            entrypoint_memory (Optional[int]):
                The quantity of memory to reserve for the execution of the
                entrypoint command, separately from any tasks or actors launched
                by it. Defaults to 0.
            entrypoint_resources (Optional[Dict[str, float]]):
                The quantity of custom resources to reserve for the execution of
                the entrypoint command, separately from any tasks or actors
                launched by it.

        Returns:
            str:
                The unique identifier for the submitted job.
        """
        return self.rayJobClient.submit_job(
            entrypoint=entrypoint,
            job_id=job_id,
            runtime_env=runtime_env,
            metadata=metadata,
            submission_id=submission_id,
            entrypoint_num_cpus=entrypoint_num_cpus,
            entrypoint_num_gpus=entrypoint_num_gpus,
            entrypoint_memory=entrypoint_memory,
            entrypoint_resources=entrypoint_resources,
        )

    def delete_job(self, job_id: str) -> Tuple[bool, str]:
        """
        Deletes a job by job ID.

        Args:
            job_id (str):
                The unique identifier of the job to delete.

        Returns:
            Tuple[bool, str]:
                The deletion status reported by the underlying client and a
                human-readable message describing the outcome.
        """
        deletion_status = self.rayJobClient.delete_job(job_id=job_id)

        if deletion_status:
            message = f"Successfully deleted Job {job_id}"
        else:
            message = f"Failed to delete Job {job_id}"

        return deletion_status, message

    def get_address(self) -> str:
        """
        Retrieves the address of the connected Ray cluster.

        Returns:
            str:
                The Ray cluster's address.
        """
        return self.rayJobClient.get_address()

    def get_job_info(self, job_id: str) -> JobDetails:
        """
        Fetches information about a job by job ID.

        Args:
            job_id (str):
                The unique identifier of the job.

        Returns:
            JobDetails:
                Information about the job's status, progress, and other details,
                exactly as returned by the underlying client.
        """
        return self.rayJobClient.get_job_info(job_id=job_id)

    def get_job_logs(self, job_id: str) -> str:
        """
        Retrieves the logs for a specific job by job ID.

        Args:
            job_id (str):
                The unique identifier of the job.

        Returns:
            str:
                Logs output from the job.
        """
        return self.rayJobClient.get_job_logs(job_id=job_id)

    def get_job_status(self, job_id: str) -> str:
        """
        Fetches the current status of a job by job ID.

        Args:
            job_id (str):
                The unique identifier of the job.

        Returns:
            str:
                The job's status.
        """
        return self.rayJobClient.get_job_status(job_id=job_id)

    def list_jobs(self) -> List[JobDetails]:
        """
        Lists all current jobs in the Ray cluster.

        Returns:
            List[JobDetails]:
                A list of job details for each current job in the cluster.
        """
        return self.rayJobClient.list_jobs()

    def stop_job(self, job_id: str) -> Tuple[bool, str]:
        """
        Stops a running job by job ID.

        Args:
            job_id (str):
                The unique identifier of the job to stop.

        Returns:
            Tuple[bool, str]:
                The stop status reported by the underlying client and a
                human-readable message describing the outcome.
        """
        stop_job_status = self.rayJobClient.stop_job(job_id=job_id)

        if stop_job_status:
            message = f"Successfully stopped Job {job_id}"
        else:
            # A falsy status is reported as a likely already-finished job
            # rather than as a hard failure.
            message = f"Failed to stop Job, {job_id} could have already completed."

        return stop_job_status, message

    def tail_job_logs(self, job_id: str) -> AsyncIterator[str]:
        """
        Continuously streams the logs of a job.

        The underlying Ray client implements this as an asynchronous generator,
        so the returned object must be consumed with ``async for`` (or driven by
        an event loop), not a plain ``for`` loop.

        Args:
            job_id (str):
                The unique identifier of the job.

        Returns:
            AsyncIterator[str]:
                An asynchronous iterator that yields log entries in real-time.
        """
        return self.rayJobClient.tail_job_logs(job_id=job_id)