# Source code for codeflare_sdk.common.widgets.widgets
# Copyright 2024 IBM, Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The widgets sub-module contains the UI widgets created using the ipywidgets package.
"""
import contextlib
import io
import os
import warnings
import time
import codeflare_sdk
from kubernetes import client
from kubernetes.client.rest import ApiException
import ipywidgets as widgets
from IPython.display import display, HTML, Javascript
import pandas as pd
from ...common.utils import get_current_namespace
from ...ray.cluster.config import ClusterConfiguration
from ...ray.cluster.status import RayClusterStatus
from ..kubernetes_cluster import _kube_api_error_handling
from ..kubernetes_cluster.auth import (
config_check,
get_api_client,
)
class RayClusterManagerWidgets:
    """
    The RayClusterManagerWidgets class is responsible for initialising the ToggleButtons, Button, and Output widgets.
    It also handles the user interactions and displays the cluster details.
    Used when calling the view_clusters function.
    """

    # Columns of the cluster DataFrame shown in the details table, in display order.
    _DISPLAY_COLUMNS = [
        "Name",
        "Namespace",
        "Num Workers",
        "Head GPUs",
        "Head CPU Req~Lim",
        "Head Memory Req~Lim",
        "Worker GPUs",
        "Worker CPU Req~Lim",
        "Worker Memory Req~Lim",
        "status",
    ]

    def __init__(self, ray_clusters_df: pd.DataFrame, namespace: str = None):
        """
        Build the widgets, wire up their callbacks and render the initial cluster details.

        Args:
            ray_clusters_df: Cluster table (as returned by _fetch_cluster_data). It must
                be non-empty and contain a "Name" column; the first name is selected
                initially.
            namespace: Namespace the clusters live in. Falls back to
                get_current_namespace() when omitted or empty.
        """
        # Data
        self.ray_clusters_df = ray_clusters_df
        self.namespace = namespace if namespace else get_current_namespace()
        self.raycluster_data_output = widgets.Output()
        self.user_output = widgets.Output()
        self.url_output = widgets.Output()

        # Widgets
        cluster_names = ray_clusters_df["Name"].tolist()
        self.classification_widget = widgets.ToggleButtons(
            options=cluster_names,
            value=cluster_names[0],
            description="Select an existing cluster:",
        )
        self.delete_button = widgets.Button(
            description="Delete Cluster",
            icon="trash",
            tooltip="Delete the selected cluster",
        )
        self.list_jobs_button = widgets.Button(
            description="View Jobs",
            icon="suitcase",
            tooltip="Open the Ray Job Dashboard",
        )
        self.ray_dashboard_button = widgets.Button(
            description="Open Ray Dashboard",
            icon="dashboard",
            tooltip="Open the Ray Dashboard in a new tab",
            layout=widgets.Layout(width="auto"),
        )
        self.refresh_data_button = widgets.Button(
            description="Refresh Data",
            icon="refresh",
            tooltip="Refresh the list of Ray Clusters",
            layout=widgets.Layout(width="auto", left="1em"),
        )

        # Set up interactions
        self._initialize_callbacks()
        self._trigger_initial_display()

    def _initialize_callbacks(self):
        """
        Called upon RayClusterManagerWidgets initialisation.
        Sets up event handlers and callbacks for UI interactions.
        """
        # Re-render whenever the selected cluster changes.
        self.classification_widget.observe(self._on_cluster_click, names="value")
        # Button clicks
        self.delete_button.on_click(self._on_delete_button_click)
        self.list_jobs_button.on_click(self._on_list_jobs_button_click)
        self.ray_dashboard_button.on_click(self._on_ray_dashboard_button_click)
        self.refresh_data_button.on_click(self._on_refresh_data_button_click)

    def _trigger_initial_display(self):
        """
        Called upon RayClusterManagerWidgets initialisation.
        Triggers an initial display update with the current cluster value.
        """
        self._on_cluster_click({"new": self.classification_widget.value})

    def _on_cluster_click(self, selection_change):
        """
        Handle a selection change on the toggle buttons.

        Args:
            selection_change: Change mapping whose "new" key holds the selected name.

        Re-fetches the cluster list via _refresh_dataframe, which also redraws the
        details table for the selected cluster.
        """
        new_value = selection_change["new"]
        self.classification_widget.value = new_value
        self._refresh_dataframe()

    def _on_delete_button_click(self, b):
        """
        Handle a click on "Delete Cluster": delete the selected cluster, report it in
        the user output area and refresh the cluster list.
        """
        cluster_name = self.classification_widget.value
        # NOTE(review): the success message below is printed regardless of what
        # _delete_cluster returns; if it reports failures via its return value rather
        # than raising, this message can be misleading - TODO confirm.
        _delete_cluster(cluster_name, self.namespace)
        with self.user_output:
            self.user_output.clear_output()
            print(
                f"Cluster {cluster_name} in the {self.namespace} namespace was deleted successfully."
            )
        # Refresh the dataframe
        self._refresh_dataframe()

    def _on_list_jobs_button_click(self, b):
        """
        Handle a click on "View Jobs": open the Ray Jobs Dashboard of the selected
        cluster in a new browser tab and print its URL.
        """
        cluster_name = self.classification_widget.value
        jobs_url = f"{self._get_dashboard_url(cluster_name)}/#/jobs"
        with self.user_output:
            self.user_output.clear_output()
            print(f"Opening Ray Jobs Dashboard for {cluster_name} cluster:\n{jobs_url}")
        with self.url_output:
            display(Javascript(self._window_open_js(jobs_url)))

    def _on_ray_dashboard_button_click(self, b):
        """
        Handle a click on "Open Ray Dashboard": open the Ray Dashboard of the selected
        cluster in a new browser tab and print its URL.
        """
        cluster_name = self.classification_widget.value
        dashboard_url = self._get_dashboard_url(cluster_name)
        with self.user_output:
            self.user_output.clear_output()
            print(f"Opening Ray Dashboard for {cluster_name} cluster:\n{dashboard_url}")
        with self.url_output:
            display(Javascript(self._window_open_js(dashboard_url)))

    def _get_dashboard_url(self, cluster_name: str) -> str:
        """
        Return cluster.cluster_dashboard_uri() for ``cluster_name`` in this namespace.

        Any widgets, stdout and stderr produced while constructing the Cluster object
        are captured and discarded.
        """
        from codeflare_sdk import Cluster

        # Suppress from Cluster Object initialisation widgets and outputs
        with widgets.Output(), contextlib.redirect_stdout(
            io.StringIO()
        ), contextlib.redirect_stderr(io.StringIO()):
            cluster = Cluster(ClusterConfiguration(cluster_name, self.namespace))
        return cluster.cluster_dashboard_uri()

    @staticmethod
    def _window_open_js(url) -> str:
        """
        Return a JavaScript snippet that opens ``url`` in a new browser tab.

        json.dumps renders the URL as a properly escaped JavaScript string literal, so
        quotes or backslashes in it cannot break out of the window.open(...) call.
        """
        import json

        return f'window.open({json.dumps(str(url))}, "_blank");'

    def _on_refresh_data_button_click(self, b):
        """
        _on_refresh_button_click handles the event when the Refresh Data button is clicked, refreshing the list of Ray Clusters.
        The button is disabled while the refresh runs.
        """
        self.refresh_data_button.disabled = True
        try:
            self._refresh_dataframe()
        finally:
            # Re-enable even if the refresh raised; otherwise the button would stay
            # disabled for the rest of the widget's life.
            self.refresh_data_button.disabled = False

    def _refresh_dataframe(self):
        """
        Re-fetch the cluster list and update the widgets to match.

        If the namespace no longer has any clusters, all controls are closed and a
        "No clusters found" message is shown. Otherwise the toggle-button options are
        updated, the previous selection is kept if that cluster still exists (else the
        first cluster is selected), and the details table is redrawn.
        """
        self.ray_clusters_df = _fetch_cluster_data(self.namespace)
        if self.ray_clusters_df.empty:
            for control in (
                self.classification_widget,
                self.delete_button,
                self.list_jobs_button,
                self.ray_dashboard_button,
                self.refresh_data_button,
            ):
                control.close()
            with self.raycluster_data_output:
                self.raycluster_data_output.clear_output()
                print(f"No clusters found in the {self.namespace} namespace.")
            return

        cluster_names = self.ray_clusters_df["Name"].tolist()
        # Remember the current selection only if it still exists (it may have just been deleted).
        current = self.classification_widget.value
        selected_cluster = current if current in cluster_names else None
        # Update list of Ray Clusters.
        self.classification_widget.options = cluster_names
        # Preserve the selection when possible; otherwise default to the first cluster.
        if selected_cluster:
            self.classification_widget.value = selected_cluster
        else:
            self.classification_widget.value = self.ray_clusters_df["Name"].iloc[0]
        # Update the output with the current Ray Cluster details.
        self._display_cluster_details()

    def _display_cluster_details(self):
        """
        Render the row of the currently selected cluster as an HTML table in the
        details output widget.
        """
        self.raycluster_data_output.clear_output()
        is_selected = self.ray_clusters_df["Name"] == self.classification_widget.value
        selected_cluster = self.ray_clusters_df[is_selected]
        with self.raycluster_data_output:
            # escape=False because the "status" column holds pre-rendered HTML spans.
            display(
                HTML(
                    selected_cluster[self._DISPLAY_COLUMNS].to_html(
                        escape=False, index=False, border=2
                    )
                )
            )

    def display_widgets(self):
        """
        Display the cluster selector with its details table, then the action buttons
        followed by the URL and user-message output areas.
        """
        display(widgets.VBox([self.classification_widget, self.raycluster_data_output]))
        display(
            widgets.HBox(
                [
                    self.delete_button,
                    self.list_jobs_button,
                    self.ray_dashboard_button,
                    self.refresh_data_button,
                ]
            ),
            self.url_output,
            self.user_output,
        )
def cluster_apply_down_buttons(
    cluster: "codeflare_sdk.ray.cluster.cluster.Cluster",
) -> None:
    """
    Display "Cluster Apply" / "Cluster Down" buttons and a "Wait for Cluster?"
    checkbox for ``cluster``, plus an Output widget that captures their output.

    - "Cluster Apply" calls ``cluster.apply()`` and, if the checkbox is ticked,
      ``cluster.wait_ready()``.
    - "Cluster Down" calls ``cluster.down()``.

    The button tooltips name the resource "AppWrapper" when ``cluster.config.appwrapper``
    is truthy and "Ray Cluster" otherwise.

    Nothing is returned; the widgets are displayed as a side effect.
    """
    resource = "AppWrapper" if cluster.config.appwrapper else "Ray Cluster"

    apply_button = widgets.Button(
        description="Cluster Apply",
        tooltip=f"Create the {resource}",
        icon="play",
    )
    delete_button = widgets.Button(
        description="Cluster Down",
        tooltip=f"Delete the {resource}",
        icon="trash",
    )
    wait_ready_check = _wait_ready_check_box()
    output = widgets.Output()

    # Display the buttons in an HBox wrapped in a VBox which includes the wait_ready Checkbox
    button_display = widgets.HBox([apply_button, delete_button])
    display(widgets.VBox([button_display, wait_ready_check]), output)

    def on_apply_button_clicked(b):  # Handle the apply button click event
        with output:
            output.clear_output()
            cluster.apply()
            # If the wait_ready Checkbox is ticked (value == True) also wait for readiness.
            if wait_ready_check.value:
                cluster.wait_ready()

    def on_down_button_clicked(b):  # Handle the down button click event
        with output:
            output.clear_output()
            cluster.down()

    apply_button.on_click(on_apply_button_clicked)
    delete_button.on_click(on_down_button_clicked)
def _wait_ready_check_box():
    """
    Create the unticked "Wait for Cluster?" checkbox.

    Callers read its ``value`` to decide whether to wait for the resource to reach
    the READY state after it has been applied.
    """
    return widgets.Checkbox(value=False, description="Wait for Cluster?")
def is_notebook() -> bool:
    """
    Check whether the process appears to be running inside a Jupyter Notebook.

    Returns:
        True if either ``PYDEVD_IPYTHON_COMPATIBLE_DEBUGGING`` or ``JPY_SESSION_NAME``
        is set in ``os.environ`` (present when running notebooks in VS Code or in
        RHOAI/ODH respectively), otherwise False.
    """
    notebook_env_vars = ("PYDEVD_IPYTHON_COMPATIBLE_DEBUGGING", "JPY_SESSION_NAME")
    return any(var in os.environ for var in notebook_env_vars)
def view_clusters(namespace: str = None):
    """
    Show the Ray Clusters of a namespace, with their specs, in an interactive widget.

    Args:
        namespace: Namespace to list clusters from. When omitted or empty,
            get_current_namespace() is used.

    Outside a Jupyter Notebook (as judged by is_notebook) a warning is issued and
    nothing is displayed. If the namespace has no clusters a message is printed
    instead of the widget.
    """
    if is_notebook():
        target_namespace = namespace or get_current_namespace()
        clusters_df = _fetch_cluster_data(target_namespace)
        if clusters_df.empty:
            print(f"No clusters found in the {target_namespace} namespace.")
        else:
            # Build the manager, then render its selector, details table and buttons.
            manager = RayClusterManagerWidgets(
                ray_clusters_df=clusters_df, namespace=target_namespace
            )
            manager.display_widgets()
        return None

    warnings.warn(
        "view_clusters can only be used in a Jupyter Notebook environment."
    )
def _delete_cluster(
cluster_name: str,
namespace: str,
timeout: int = 5,
interval: int = 1,
):
"""
_delete_cluster function deletes the cluster with the given name and namespace.
It optionally waits for the cluster to be deleted.
"""
from ...ray.cluster.cluster import _check_aw_exists
try:
config_check()
api_instance = client.CustomObjectsApi(get_api_client())
if _check_aw_exists(cluster_name, namespace):
api_instance.delete_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta2",
namespace=namespace,
plural="appwrappers",
name=cluster_name,
)
group = "workload.codeflare.dev"
version = "v1beta2"
plural = "appwrappers"
else:
api_instance.delete_namespaced_custom_object(
group="ray.io",
version="v1",
namespace=namespace,
plural="rayclusters",
name=cluster_name,
)
group = "ray.io"
version = "v1"
plural = "rayclusters"
# Wait for the resource to be deleted
while timeout > 0:
try:
api_instance.get_namespaced_custom_object(
group=group,
version=version,
namespace=namespace,
plural=plural,
name=cluster_name,
)
# Retry if resource still exists
time.sleep(interval)
timeout -= interval
if timeout <= 0:
raise TimeoutError(
f"Timeout waiting for {cluster_name} to be deleted."
)
except ApiException as e:
# Resource is deleted
if e.status == 404:
break
except Exception as e: # pragma: no cover
return _kube_api_error_handling(e)
def _fetch_cluster_data(namespace):
    """
    Fetch all clusters in ``namespace`` and return their specs as a DataFrame.

    Args:
        namespace: Namespace passed to list_all_clusters.

    Returns:
        An empty DataFrame when no clusters are found. Otherwise one row per cluster
        with the columns "Name", "Namespace", "Num Workers", "Head GPUs",
        "Worker GPUs", "Head CPU Req~Lim", "Head Memory Req~Lim",
        "Worker CPU Req~Lim", "Worker Memory Req~Lim" and "status" (the value
        returned by _format_status for the cluster's status).
    """
    from ...ray.cluster.cluster import list_all_clusters

    rayclusters = list_all_clusters(namespace, False)
    if not rayclusters:
        return pd.DataFrame()

    def first_resource(resources):
        # Show only the first extended resource as "name: amount"; "0" when there are none.
        if not resources:
            return "0"
        key, value = next(iter(resources.items()))
        return f"{key}: {value}"

    def req_lim(requests, limits):
        # Render "requests~limits", substituting 0 for missing (falsy) values.
        return f"{requests or 0}~{limits or 0}"

    data = {
        "Name": [c.name for c in rayclusters],
        "Namespace": [c.namespace for c in rayclusters],
        "Num Workers": [c.num_workers for c in rayclusters],
        "Head GPUs": [first_resource(c.head_extended_resources) for c in rayclusters],
        "Worker GPUs": [
            first_resource(c.worker_extended_resources) for c in rayclusters
        ],
        "Head CPU Req~Lim": [
            req_lim(c.head_cpu_requests, c.head_cpu_limits) for c in rayclusters
        ],
        "Head Memory Req~Lim": [
            req_lim(c.head_mem_requests, c.head_mem_limits) for c in rayclusters
        ],
        "Worker CPU Req~Lim": [
            req_lim(c.worker_cpu_requests, c.worker_cpu_limits) for c in rayclusters
        ],
        "Worker Memory Req~Lim": [
            req_lim(c.worker_mem_requests, c.worker_mem_limits) for c in rayclusters
        ],
        "status": [_format_status(c.status) for c in rayclusters],
    }
    return pd.DataFrame(data)
def _format_status(status):
    """
    Convert a RayClusterStatus member into a coloured HTML label.

    Args:
        status: The cluster status to render.

    Returns:
        An HTML ``<span>`` for READY, SUSPENDED, FAILED, UNHEALTHY and UNKNOWN;
        any other value is returned unchanged.
    """
    html_by_status = {
        RayClusterStatus.READY: '<span style="color: green;">Ready ✓</span>',
        RayClusterStatus.SUSPENDED: '<span style="color: #007BFF;">Suspended ❄️</span>',
        RayClusterStatus.FAILED: '<span style="color: red;">Failed ✗</span>',
        RayClusterStatus.UNHEALTHY: '<span style="color: purple;">Unhealthy</span>',
        RayClusterStatus.UNKNOWN: '<span style="color: purple;">Unknown</span>',
    }
    try:
        return html_by_status[status]
    except KeyError:
        # Unmapped statuses pass through untouched.
        return status