Coverage for mcpgateway / utils / keycloak_discovery.py: 100%
46 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/utils/keycloak_discovery.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Keycloak OIDC endpoint discovery utility.
8"""
10# Standard
11import logging
12from typing import Dict, Optional
14# Third-Party
15import httpx
17# Logger
18logger = logging.getLogger(__name__)
21async def discover_keycloak_endpoints(base_url: str, realm: str, timeout: int = 10) -> Optional[Dict[str, str]]:
22 """
23 Discover Keycloak OIDC endpoints from well-known configuration.
25 Args:
26 base_url: Keycloak base URL (e.g., https://keycloak.example.com)
27 realm: Realm name (e.g., master)
28 timeout: HTTP request timeout in seconds
30 Returns:
31 Dict containing authorization_url, token_url, userinfo_url, issuer, jwks_uri
32 Returns None if discovery fails
34 Examples:
35 >>> import asyncio
36 >>> # Mock successful discovery
37 >>> async def test():
38 ... # This would require a real Keycloak instance
39 ... result = await discover_keycloak_endpoints('https://keycloak.example.com', 'master')
40 ... return result is None or isinstance(result, dict)
41 >>> asyncio.run(test())
42 True
43 """
44 well_known_url = f"{base_url}/realms/{realm}/.well-known/openid-configuration"
46 try:
47 # First-Party
48 from mcpgateway.services.http_client_service import get_http_client # pylint: disable=import-outside-toplevel
50 client = await get_http_client()
51 logger.info(f"Discovering Keycloak endpoints from {well_known_url}")
52 response = await client.get(well_known_url, timeout=timeout)
53 response.raise_for_status()
54 config = response.json()
56 endpoints = {
57 "authorization_url": config.get("authorization_endpoint"),
58 "token_url": config.get("token_endpoint"),
59 "userinfo_url": config.get("userinfo_endpoint"),
60 "issuer": config.get("issuer"),
61 "jwks_uri": config.get("jwks_uri"),
62 }
64 # Validate that all required endpoints are present
65 if not all(endpoints.values()):
66 logger.error(f"Incomplete OIDC configuration from {well_known_url}")
67 return None
69 logger.info(f"Successfully discovered Keycloak endpoints for realm '{realm}'")
70 return endpoints
72 except httpx.HTTPError as e:
73 logger.error(f"Failed to discover Keycloak endpoints from {well_known_url}: {e}")
74 return None
75 except Exception as e:
76 logger.error(f"Unexpected error discovering Keycloak endpoints: {e}")
77 return None
80def discover_keycloak_endpoints_sync(base_url: str, realm: str, timeout: int = 10) -> Optional[Dict[str, str]]:
81 """
82 Synchronous version of discover_keycloak_endpoints.
84 Args:
85 base_url: Keycloak base URL (e.g., https://keycloak.example.com)
86 realm: Realm name (e.g., master)
87 timeout: HTTP request timeout in seconds
89 Returns:
90 Dict containing authorization_url, token_url, userinfo_url, issuer, jwks_uri
91 Returns None if discovery fails
92 """
93 well_known_url = f"{base_url}/realms/{realm}/.well-known/openid-configuration"
95 try:
96 # First-Party
97 from mcpgateway.config import settings # pylint: disable=import-outside-toplevel
99 with httpx.Client(
100 timeout=timeout,
101 limits=httpx.Limits(
102 max_connections=settings.httpx_max_connections,
103 max_keepalive_connections=settings.httpx_max_keepalive_connections,
104 keepalive_expiry=settings.httpx_keepalive_expiry,
105 ),
106 verify=not settings.skip_ssl_verify,
107 ) as client:
108 logger.info(f"Discovering Keycloak endpoints from {well_known_url}")
109 response = client.get(well_known_url)
110 response.raise_for_status()
111 config = response.json()
113 endpoints = {
114 "authorization_url": config.get("authorization_endpoint"),
115 "token_url": config.get("token_endpoint"),
116 "userinfo_url": config.get("userinfo_endpoint"),
117 "issuer": config.get("issuer"),
118 "jwks_uri": config.get("jwks_uri"),
119 }
121 # Validate that all required endpoints are present
122 if not all(endpoints.values()):
123 logger.error(f"Incomplete OIDC configuration from {well_known_url}")
124 return None
126 logger.info(f"Successfully discovered Keycloak endpoints for realm '{realm}'")
127 return endpoints
129 except httpx.HTTPError as e:
130 logger.error(f"Failed to discover Keycloak endpoints from {well_known_url}: {e}")
131 return None
132 except Exception as e:
133 logger.error(f"Unexpected error discovering Keycloak endpoints: {e}")
134 return None