Source code for kube_authkit.factory

"""
Authentication factory for strategy selection.

This module provides the main entry point for authentication and implements
the auto-detection logic that selects the appropriate authentication strategy
based on the environment and configuration.
"""

import logging
import os

from kubernetes.client import ApiClient, Configuration

from .config import AuthConfig
from .exceptions import AuthenticationError, ConfigurationError
from .strategies.base import AuthStrategy
from .strategies.incluster import InClusterStrategy
from .strategies.kubeconfig import KubeConfigStrategy
from .strategies.oidc import OIDCStrategy
from .strategies.openshift import OpenShiftOAuthStrategy

logger = logging.getLogger(__name__)


def get_k8s_config(config: AuthConfig | None = None) -> Configuration:
    """Get authenticated Kubernetes client configuration.

    This function returns just the Configuration object, allowing users to
    inspect or customize it before creating an ApiClient. This is useful for
    advanced use cases where fine-grained control over the client configuration
    is needed.

    Args:
        config: AuthConfig instance. Must specify a method.
            If None, raises ConfigurationError.

    Returns:
        Configured Kubernetes Configuration object with authentication set up

    Raises:
        ConfigurationError: If configuration is invalid or method not specified
        AuthenticationError: If authentication fails
        StrategyNotAvailableError: If requested method is not available

    Example:
        >>> # Explicit auth config
        >>> auth_config = AuthConfig(method="oidc", oidc_issuer="...", client_id="...")
        >>> k8s_config = get_k8s_config(auth_config)
        >>> k8s_config.debug = True  # Enable debug logging
        >>> api_client = ApiClient(k8s_config)
        >>>
        >>> # Auto-detection (opt-in)
        >>> k8s_config = get_k8s_config(AuthConfig(method="auto"))
    """
    # Use default config if none provided
    if config is None:
        config = AuthConfig()

    logger.debug(f"Getting Kubernetes configuration with config: {config}")

    # Select and execute authentication strategy
    factory = AuthFactory(config)
    strategy = factory.get_strategy()

    logger.info(f"Using authentication strategy: {strategy.get_description()}")

    # Authenticate and return Configuration
    api_client = strategy.authenticate()
    return api_client.configuration


[docs] def get_k8s_client(config: AuthConfig | None = None) -> ApiClient: """Get authenticated Kubernetes API client. This is the main entry point for the library. It automatically detects the best authentication method based on the environment and configuration, then returns a ready-to-use Kubernetes ApiClient. Args: config: AuthConfig instance. Must specify a method. If None, raises ConfigurationError. Returns: Configured Kubernetes ApiClient ready to make API calls Raises: ConfigurationError: If configuration is invalid or method not specified AuthenticationError: If authentication fails StrategyNotAvailableError: If requested method is not available Example: >>> # Explicit configuration >>> config = AuthConfig(method="oidc", oidc_issuer="...", client_id="...") >>> api_client = get_k8s_client(config) >>> v1 = client.CoreV1Api(api_client) >>> >>> # Auto-detection (opt-in) >>> api_client = get_k8s_client(AuthConfig(method="auto")) Note: This is a convenience wrapper around get_k8s_config(). If you need to customize the configuration before creating the client, use get_k8s_config() instead. """ # Get authenticated configuration and wrap in ApiClient return ApiClient(get_k8s_config(config))
def get_token(config: AuthConfig | None = None) -> str: """Get raw authentication token. Authenticates using the selected strategy and returns the raw bearer token string. This is useful for consumers that need the token itself rather than a pre-configured Kubernetes client (e.g., Feast). Args: config: Optional AuthConfig. If None, uses auto-detection with defaults. Returns: Raw bearer token string Raises: ConfigurationError: If configuration is invalid AuthenticationError: If authentication fails or no token is available """ if config is None: config = AuthConfig() factory = AuthFactory(config) strategy = factory.get_strategy() strategy.authenticate() return strategy.get_token() class AuthFactory: """Factory for selecting and creating authentication strategies. This class implements the strategy selection logic, including auto-detection of the best authentication method based on the current environment. Args: config: AuthConfig instance with authentication parameters """ def __init__(self, config: AuthConfig) -> None: """Initialize factory with configuration. Args: config: AuthConfig instance """ self.config = config def get_strategy(self) -> AuthStrategy: """Select and return appropriate authentication strategy. This method implements the auto-detection logic when method="auto", or returns the explicitly requested strategy when method is specified. Returns: AuthStrategy instance ready to authenticate Raises: ConfigurationError: If method is invalid or required params missing StrategyNotAvailableError: If requested strategy not available """ # If method is explicitly specified, use that if self.config.method != "auto": return self._get_strategy_by_name(self.config.method) # Auto-detection logic logger.debug("Auto-detecting authentication method") return self._auto_detect_strategy() def _get_strategy_by_name(self, method: str) -> AuthStrategy: """Get strategy by explicit method name. Args: method: Strategy name ("kubeconfig", "incluster", "oidc", "openshift") Returns: AuthStrategy instance Raises: ConfigurationError: If method is unknown StrategyNotAvailableError: If strategy prerequisites not met """ strategy_map = { "kubeconfig": KubeConfigStrategy, "incluster": InClusterStrategy, "oidc": OIDCStrategy, "openshift": OpenShiftOAuthStrategy, } strategy_class = strategy_map.get(method) if strategy_class is None: available = ", ".join(sorted(strategy_map.keys())) raise ConfigurationError( f"Unknown authentication method: {method}", f"Available methods: {available}" ) # Create and validate strategy strategy = strategy_class(self.config) if not strategy.is_available(): raise ConfigurationError( f"Authentication method '{method}' is not available in this environment", f"Strategy check failed: {strategy.get_description()}", ) return strategy def _auto_detect_strategy(self) -> AuthStrategy: """Auto-detect best authentication strategy for current environment. Detection precedence: 1. Environment variables (OIDC_CLIENT_ID, etc.) -> OIDC 2. In-cluster (KUBERNETES_SERVICE_HOST, service account) -> InCluster 3. KubeConfig (~/.kube/config or KUBECONFIG) -> KubeConfig 4. Interactive fallback (Phase 2) -> OIDC with device/browser flow Returns: AuthStrategy instance Raises: AuthenticationError: If no suitable strategy found """ # Check for OIDC environment variables if self._has_oidc_env_vars(): logger.debug("Detected OIDC environment variables") oidc_strategy = OIDCStrategy(self.config) if oidc_strategy.is_available(): logger.debug("OIDC authentication available") return oidc_strategy # Check for OpenShift OAuth (token or k8s_api_host) if self.config.token or os.getenv("AUTHKIT_TOKEN") or os.getenv("OPENSHIFT_TOKEN"): logger.debug("Detected authentication token") openshift_strategy = OpenShiftOAuthStrategy(self.config) if openshift_strategy.is_available(): logger.debug("OpenShift OAuth available") return openshift_strategy # Check for in-cluster environment incluster_strategy = InClusterStrategy(self.config) if incluster_strategy.is_available(): logger.debug("Detected in-cluster environment") return incluster_strategy # Check for kubeconfig kubeconfig_strategy = KubeConfigStrategy(self.config) if kubeconfig_strategy.is_available(): logger.debug("Detected kubeconfig") return kubeconfig_strategy # No strategy available raise AuthenticationError( "No authentication method available", "Could not find any valid authentication credentials. Tried:\n" "1. OIDC environment variables (OIDC_ISSUER, OIDC_CLIENT_ID) - not found or invalid\n" "2. In-cluster service account - not available\n" "3. KubeConfig file (~/.kube/config) - not found\n\n" "Please provide authentication credentials using one of these methods:\n" "- Use kubectl to log in and generate ~/.kube/config\n" "- Run inside a Kubernetes Pod with a service account\n" "- Provide explicit OIDC configuration via AuthConfig\n" "- Set OIDC environment variables (OIDC_ISSUER, OIDC_CLIENT_ID)", ) def _has_oidc_env_vars(self) -> bool: """Check if OIDC environment variables are present. Returns: True if OIDC env vars are set, False otherwise """ # Check for minimal OIDC configuration in environment has_issuer = os.getenv("AUTHKIT_OIDC_ISSUER") is not None has_client_id = os.getenv("AUTHKIT_CLIENT_ID") is not None return has_issuer and has_client_id