Source code for codeflare_sdk.common.kubernetes_cluster.auth

# Copyright 2022 IBM, Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The auth sub-module contains the definitions for the Authentication objects, which represent
the methods by which a user can authenticate to their cluster(s). The abstract class, `Authentication`,
contains two required methods `login()` and `logout()`. Users can use one of the existing concrete classes to
authenticate to their cluster or add their own custom concrete classes here.
"""

import abc
from kubernetes import client, config
import os
import urllib3
from .kube_api_helpers import _kube_api_error_handling

from typing import Optional

global api_client
api_client = None
global config_path
config_path = None

WORKBENCH_CA_CERT_PATH = "/etc/pki/tls/custom-certs/ca-bundle.crt"


[docs] class Authentication(metaclass=abc.ABCMeta): """ An abstract class that defines the necessary methods for authenticating to a remote environment. Specifically, this class defines the need for a `login()` and a `logout()` function. """
[docs] def login(self): """ Method for logging in to a remote cluster. """ pass
[docs] def logout(self): """ Method for logging out of the remote cluster. """ pass
[docs] class KubeConfiguration(metaclass=abc.ABCMeta): """ An abstract class that defines the method for loading a user defined config file using the `load_kube_config()` function """
[docs] def load_kube_config(self): """ Method for setting your Kubernetes configuration to a certain file """ pass
[docs] def logout(self): """ Method for logging out of the remote cluster """ pass
[docs] class TokenAuthentication(Authentication): """ `TokenAuthentication` is a subclass of `Authentication`. It can be used to authenticate to a Kubernetes cluster when the user has an API token and the API server address. """ def __init__( self, token: str, server: str, skip_tls: bool = False, ca_cert_path: str = None, ): """ Initialize a TokenAuthentication object that requires a value for `token`, the API Token and `server`, the API server address for authenticating to a Kubernetes cluster. """ self.token = token self.server = server self.skip_tls = skip_tls self.ca_cert_path = _gen_ca_cert_path(ca_cert_path)
[docs] def login(self) -> str: """ This function is used to log in to a Kubernetes cluster using the user's API token and API server address. Depending on the cluster, a user can choose to login in with `--insecure-skip-tls-verify` by setting `skip_tls` to `True` or `--certificate-authority` by setting `skip_tls` to False and providing a path to a ca bundle with `ca_cert_path`. """ global config_path global api_client try: configuration = client.Configuration() configuration.api_key_prefix["authorization"] = "Bearer" configuration.host = self.server configuration.api_key["authorization"] = self.token if self.skip_tls: urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) print("Insecure request warnings have been disabled") configuration.verify_ssl = False api_client = client.ApiClient(configuration) if not self.skip_tls: _client_with_cert(api_client, self.ca_cert_path) client.AuthenticationApi(api_client).get_api_group() config_path = None return "Logged into %s" % self.server except client.ApiException as e: _kube_api_error_handling(e)
[docs] def logout(self) -> str: """ This function is used to logout of a Kubernetes cluster. """ global config_path config_path = None global api_client api_client = None return "Successfully logged out of %s" % self.server
[docs] class KubeConfigFileAuthentication(KubeConfiguration): """ A class that defines the necessary methods for passing a user's own Kubernetes config file. Specifically this class defines the `load_kube_config()` and `config_check()` functions. """ def __init__(self, kube_config_path: str = None): self.kube_config_path = kube_config_path
[docs] def load_kube_config(self): """ Function for loading a user's own predefined Kubernetes config file. """ global config_path global api_client try: if self.kube_config_path == None: return "Please specify a config file path" config_path = self.kube_config_path api_client = None config.load_kube_config(config_path) response = "Loaded user config file at path %s" % self.kube_config_path except config.ConfigException: # pragma: no cover config_path = None raise Exception("Please specify a config file path") return response
[docs] def config_check() -> str: """ Check and load the Kubernetes config from the default location. This function checks if a Kubernetes config file exists at the default path (`~/.kube/config`). If none is provided, it tries to load in-cluster config. If the `config_path` global variable is set by an external module (e.g., `auth.py`), this path will be used directly. Returns: str: The loaded config path if successful. Raises: PermissionError: If no valid credentials or config file is found. """ global config_path global api_client home_directory = os.path.expanduser("~") if config_path == None and api_client == None: if os.path.isfile("%s/.kube/config" % home_directory): try: config.load_kube_config() except Exception as e: # pragma: no cover _kube_api_error_handling(e) elif "KUBERNETES_PORT" in os.environ: try: config.load_incluster_config() except Exception as e: # pragma: no cover _kube_api_error_handling(e) else: raise PermissionError( "Action not permitted, have you put in correct/up-to-date auth credentials?" ) if config_path != None and api_client == None: return config_path
def _client_with_cert(client: client.ApiClient, ca_cert_path: Optional[str] = None): client.configuration.verify_ssl = True cert_path = _gen_ca_cert_path(ca_cert_path) if cert_path is None: client.configuration.ssl_ca_cert = None elif os.path.isfile(cert_path): client.configuration.ssl_ca_cert = cert_path else: raise FileNotFoundError(f"Certificate file not found at {cert_path}") def _gen_ca_cert_path(ca_cert_path: Optional[str]): """Gets the path to the default CA certificate file either through env config or default path""" if ca_cert_path is not None: return ca_cert_path elif "CF_SDK_CA_CERT_PATH" in os.environ: return os.environ.get("CF_SDK_CA_CERT_PATH") elif os.path.exists(WORKBENCH_CA_CERT_PATH): return WORKBENCH_CA_CERT_PATH else: return None
[docs] def get_api_client() -> client.ApiClient: """ Retrieve the Kubernetes API client with the default configuration. This function returns the current API client instance if already loaded, or creates a new API client with the default configuration. Returns: client.ApiClient: The Kubernetes API client object. """ if api_client != None: return api_client to_return = client.ApiClient() _client_with_cert(to_return) return to_return