Source code for kube_authkit.config

"""
Configuration dataclass for authentication.

This module provides the AuthConfig dataclass that centralizes all
authentication configuration options.
"""

import os
import warnings
from dataclasses import dataclass, field

from .exceptions import ConfigurationError


[docs] @dataclass class AuthConfig: """Configuration for Kubernetes/OpenShift authentication. This dataclass holds all configuration options for authenticating to a Kubernetes or OpenShift cluster. It supports multiple authentication methods and can be configured explicitly or through environment variables. Args: method: Authentication method to use. Must be specified. Options: - "auto": Auto-detect best method by probing the environment - "kubeconfig": Use ~/.kube/config or KUBECONFIG - "incluster": Use in-cluster service account - "oidc": Use OpenID Connect - "openshift": Use OpenShift OAuth - None (default): Raises ConfigurationError; caller must choose k8s_api_host: Kubernetes API server URL (auto-detected if None) oidc_issuer: OIDC issuer URL (required for OIDC) client_id: OIDC client ID (required for OIDC) client_secret: OIDC client secret (for confidential clients) token: Bearer token for authentication (optional, for OIDC/OpenShift) scopes: OIDC scopes to request use_device_flow: Use Device Code Flow instead of Authorization Code use_keyring: Store refresh tokens in system keyring oidc_callback_port: Port for OAuth callback server (default: 8080) ca_cert: Path to custom CA certificate bundle verify_ssl: Verify SSL certificates (WARNING: only disable for development) kubeconfig_path: Path to kubeconfig file (overrides KUBECONFIG env var) Example: >>> # Explicit OIDC with device flow >>> config = AuthConfig( ... method="oidc", ... oidc_issuer="https://keycloak.example.com/auth/realms/myrealm", ... client_id="my-client", ... use_device_flow=True ... ) >>> >>> # Auto-detection (opt-in) >>> config = AuthConfig(method="auto") """ method: str | None = None k8s_api_host: str | None = None oidc_issuer: str | None = None client_id: str | None = None client_secret: str | None = None token: str | None = None scopes: list[str] = field(default_factory=lambda: ["openid"]) use_device_flow: bool = False use_keyring: bool = False oidc_callback_port: int = 8080 ca_cert: str | None = None verify_ssl: bool = True kubeconfig_path: str | None = None def __post_init__(self) -> None: """Validate configuration after initialization. This method is automatically called after __init__ to: 1. Load configuration from environment variables if not explicitly set 2. Validate that required parameters are present for the chosen method 3. Emit security warnings for dangerous configurations Raises: ConfigurationError: If configuration is invalid or incomplete """ # Load from environment variables if not explicitly set self._load_from_environment() # Validate that method is specified if self.method is None: raise ConfigurationError( "Authentication method must be specified", "Provide method='auto' to auto-detect, or specify an explicit method " "(e.g., method='kubeconfig', method='oidc', method='incluster', " "method='openshift')", ) # Normalize method name self.method = self.method.lower() # Validate method is known valid_methods = {"auto", "kubeconfig", "incluster", "oidc", "openshift"} if self.method not in valid_methods: raise ConfigurationError( f"Invalid authentication method: {self.method}", f"Valid methods are: {', '.join(sorted(valid_methods))}", ) # Validate OIDC-specific configuration if self.method == "oidc": if not self.oidc_issuer: raise ConfigurationError( "OIDC authentication requires 'oidc_issuer' parameter", "Provide oidc_issuer or set OIDC_ISSUER environment variable", ) if not self.client_id: raise ConfigurationError( "OIDC authentication requires 'client_id' parameter", "Provide client_id or set OIDC_CLIENT_ID environment variable", ) # Validate issuer URL format if not self.oidc_issuer.startswith(("https://", "http://")): raise ConfigurationError( f"OIDC issuer must be a valid URL: {self.oidc_issuer}", "issuer should start with https:// (or http:// for local development)", ) # Warn about http:// (insecure) if self.oidc_issuer.startswith("http://"): warnings.warn( f"OIDC issuer uses insecure http:// protocol: {self.oidc_issuer}. " "This should only be used for local development.", SecurityWarning, stacklevel=2, ) # Security warning for disabled SSL verification if not self.verify_ssl: warnings.warn( "TLS/SSL verification is disabled (verify_ssl=False). " "This is insecure and should only be used in development environments. " "Your credentials and data may be exposed to man-in-the-middle attacks.", SecurityWarning, stacklevel=2, ) # Validate CA cert path if provided if self.ca_cert and not os.path.exists(self.ca_cert): raise ConfigurationError( f"CA certificate file not found: {self.ca_cert}", "Provide a valid path to a CA certificate bundle", ) # Validate kubeconfig path if provided if self.kubeconfig_path and not os.path.exists(self.kubeconfig_path): raise ConfigurationError( f"Kubeconfig file not found: {self.kubeconfig_path}", "Provide a valid path to a kubeconfig file", ) def _load_from_environment(self) -> None: """Load configuration from environment variables. This method loads configuration from environment variables for parameters that were not explicitly set. It allows users to configure authentication through environment variables without modifying code. Environment Variables: AUTHKIT_OIDC_ISSUER: OIDC issuer URL AUTHKIT_CLIENT_ID: OIDC client ID AUTHKIT_CLIENT_SECRET: OIDC client secret AUTHKIT_TOKEN: Bearer token for authentication AUTHKIT_API_HOST: Kubernetes API server URL KUBECONFIG: Path to kubeconfig file (standard Kubernetes variable) """ # Load OIDC configuration from environment if not self.oidc_issuer: self.oidc_issuer = os.getenv("AUTHKIT_OIDC_ISSUER") if not self.client_id: self.client_id = os.getenv("AUTHKIT_CLIENT_ID") if not self.client_secret: self.client_secret = os.getenv("AUTHKIT_CLIENT_SECRET") # Load token from environment (support legacy OPENSHIFT_TOKEN) if not self.token: self.token = os.getenv("AUTHKIT_TOKEN") or os.getenv("OPENSHIFT_TOKEN") # Load kubeconfig path from environment (standard Kubernetes variable) if not self.kubeconfig_path: self.kubeconfig_path = os.getenv("KUBECONFIG") # Load API host from environment if not self.k8s_api_host: self.k8s_api_host = os.getenv("AUTHKIT_API_HOST") # Validate callback port if self.oidc_callback_port < 1 or self.oidc_callback_port > 65535: raise ConfigurationError( f"Invalid OIDC callback port: {self.oidc_callback_port}", "Port must be between 1 and 65535", ) def __repr__(self) -> str: """Return string representation with sensitive fields redacted. This ensures that sensitive information like client_secret is not exposed in logs or error messages. Returns: String representation with secrets redacted """ # Create a copy of the config dict and redact sensitive fields config_dict = { "method": self.method, "k8s_api_host": self.k8s_api_host, "oidc_issuer": self.oidc_issuer, "client_id": self.client_id, "client_secret": "***REDACTED***" if self.client_secret else None, "token": "***REDACTED***" if self.token else None, "scopes": self.scopes, "use_device_flow": self.use_device_flow, "use_keyring": self.use_keyring, "oidc_callback_port": self.oidc_callback_port, "ca_cert": self.ca_cert, "verify_ssl": self.verify_ssl, "kubeconfig_path": self.kubeconfig_path, } params = ", ".join(f"{k}={v!r}" for k, v in config_dict.items() if v is not None) return f"AuthConfig({params})"
[docs] @classmethod def from_dict(cls, config_dict: dict) -> "AuthConfig": """Create AuthConfig from dictionary. This is useful for loading configuration from JSON or YAML files. Args: config_dict: Dictionary containing configuration parameters Returns: AuthConfig instance Example: >>> config_data = { ... "method": "oidc", ... "oidc_issuer": "https://keycloak.example.com", ... "client_id": "my-client" ... } >>> config = AuthConfig.from_dict(config_data) """ # Filter out unknown keys to avoid TypeError valid_fields = {f.name for f in cls.__dataclass_fields__.values()} filtered_dict = {k: v for k, v in config_dict.items() if k in valid_fields} return cls(**filtered_dict)
class SecurityWarning(UserWarning): """Warning category for security-related issues. This custom warning category allows users to filter security warnings separately from other warnings if desired. """ pass