Coverage for mcpgateway / utils / keycloak_discovery.py: 100%
69 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/utils/keycloak_discovery.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Keycloak OIDC endpoint discovery utility.
8"""
10# Standard
11import logging
12from typing import Dict, Optional
13from urllib.parse import urlsplit, urlunsplit
15# Third-Party
16import httpx
18# Logger
19logger = logging.getLogger(__name__)
22def _rewrite_endpoint_base(endpoint_url: Optional[str], target_base_url: Optional[str], endpoint_type: str) -> Optional[str]:
23 """Rewrite a discovered endpoint URL to use a target base URL.
25 Keeps the discovered path/query/fragment and swaps scheme+host+port only.
27 Args:
28 endpoint_url: Endpoint URL discovered from OIDC metadata.
29 target_base_url: Replacement base URL to apply (scheme/host/port).
30 endpoint_type: Endpoint identifier used for logging.
32 Returns:
33 Rewritten URL when target base is valid and differs, otherwise the original URL.
34 """
35 if not endpoint_url or not target_base_url:
36 return endpoint_url
38 parsed_endpoint = urlsplit(endpoint_url)
39 parsed_base = urlsplit(target_base_url)
41 if not parsed_base.scheme or not parsed_base.netloc:
42 return endpoint_url
44 if parsed_endpoint.scheme == parsed_base.scheme and parsed_endpoint.netloc == parsed_base.netloc:
45 return endpoint_url
47 rewritten = urlunsplit(
48 (
49 parsed_base.scheme,
50 parsed_base.netloc,
51 parsed_endpoint.path,
52 parsed_endpoint.query,
53 parsed_endpoint.fragment,
54 )
55 )
56 logger.info("Rewrote Keycloak %s URL from %s to %s", endpoint_type, endpoint_url, rewritten)
57 return rewritten
60async def discover_keycloak_endpoints(base_url: str, realm: str, timeout: int = 10, public_base_url: Optional[str] = None) -> Optional[Dict[str, str]]:
61 """
62 Discover Keycloak OIDC endpoints from well-known configuration.
64 Args:
65 base_url: Keycloak base URL (e.g., https://keycloak.example.com)
66 realm: Realm name (e.g., master)
67 timeout: HTTP request timeout in seconds
68 public_base_url: Optional browser-facing Keycloak base URL for authorization URL rewrite
70 Returns:
71 Dict containing authorization_url, token_url, userinfo_url, issuer, jwks_uri
72 Returns None if discovery fails
74 Examples:
75 >>> import asyncio
76 >>> # Mock successful discovery
77 >>> async def test():
78 ... # This would require a real Keycloak instance
79 ... result = await discover_keycloak_endpoints('https://keycloak.example.com', 'master')
80 ... return result is None or isinstance(result, dict)
81 >>> asyncio.run(test())
82 True
83 """
84 well_known_url = f"{base_url}/realms/{realm}/.well-known/openid-configuration"
86 try:
87 # First-Party
88 from mcpgateway.services.http_client_service import get_http_client # pylint: disable=import-outside-toplevel
90 client = await get_http_client()
91 logger.info(f"Discovering Keycloak endpoints from {well_known_url}")
92 response = await client.get(well_known_url, timeout=timeout)
93 response.raise_for_status()
94 config = response.json()
96 endpoints = {
97 "authorization_url": config.get("authorization_endpoint"),
98 "token_url": config.get("token_endpoint"),
99 "userinfo_url": config.get("userinfo_endpoint"),
100 "issuer": config.get("issuer"),
101 "jwks_uri": config.get("jwks_uri"),
102 }
104 # Use optional browser-facing base for authorization endpoint and issuer
105 # (tokens issued via browser flow contain the public-facing issuer) while
106 # keeping token/userinfo/jwks endpoints reachable from the gateway runtime.
107 endpoints["authorization_url"] = _rewrite_endpoint_base(endpoints.get("authorization_url"), public_base_url, "authorization")
108 endpoints["issuer"] = _rewrite_endpoint_base(endpoints.get("issuer"), public_base_url, "issuer")
109 endpoints["token_url"] = _rewrite_endpoint_base(endpoints.get("token_url"), base_url, "token")
110 endpoints["userinfo_url"] = _rewrite_endpoint_base(endpoints.get("userinfo_url"), base_url, "userinfo")
111 endpoints["jwks_uri"] = _rewrite_endpoint_base(endpoints.get("jwks_uri"), base_url, "jwks")
113 # Validate that all required endpoints are present
114 if not all(endpoints.values()):
115 logger.error(f"Incomplete OIDC configuration from {well_known_url}")
116 return None
118 logger.info(f"Successfully discovered Keycloak endpoints for realm '{realm}'")
119 return endpoints
121 except httpx.HTTPError as e:
122 logger.error(f"Failed to discover Keycloak endpoints from {well_known_url}: {e}")
123 return None
124 except Exception as e:
125 logger.error(f"Unexpected error discovering Keycloak endpoints: {e}")
126 return None
129def discover_keycloak_endpoints_sync(base_url: str, realm: str, timeout: int = 10, public_base_url: Optional[str] = None) -> Optional[Dict[str, str]]:
130 """
131 Synchronous version of discover_keycloak_endpoints.
133 Args:
134 base_url: Keycloak base URL (e.g., https://keycloak.example.com)
135 realm: Realm name (e.g., master)
136 timeout: HTTP request timeout in seconds
137 public_base_url: Optional browser-facing Keycloak base URL for authorization URL rewrite
139 Returns:
140 Dict containing authorization_url, token_url, userinfo_url, issuer, jwks_uri
141 Returns None if discovery fails
142 """
143 well_known_url = f"{base_url}/realms/{realm}/.well-known/openid-configuration"
145 try:
146 # First-Party
147 from mcpgateway.config import settings # pylint: disable=import-outside-toplevel
149 with httpx.Client(
150 timeout=timeout,
151 limits=httpx.Limits(
152 max_connections=settings.httpx_max_connections,
153 max_keepalive_connections=settings.httpx_max_keepalive_connections,
154 keepalive_expiry=settings.httpx_keepalive_expiry,
155 ),
156 verify=not settings.skip_ssl_verify,
157 ) as client:
158 logger.info(f"Discovering Keycloak endpoints from {well_known_url}")
159 response = client.get(well_known_url)
160 response.raise_for_status()
161 config = response.json()
163 endpoints = {
164 "authorization_url": config.get("authorization_endpoint"),
165 "token_url": config.get("token_endpoint"),
166 "userinfo_url": config.get("userinfo_endpoint"),
167 "issuer": config.get("issuer"),
168 "jwks_uri": config.get("jwks_uri"),
169 }
171 # Use optional browser-facing base for authorization endpoint and issuer
172 # (tokens issued via browser flow contain the public-facing issuer) while
173 # keeping token/userinfo/jwks endpoints reachable from the gateway runtime.
174 endpoints["authorization_url"] = _rewrite_endpoint_base(endpoints.get("authorization_url"), public_base_url, "authorization")
175 endpoints["issuer"] = _rewrite_endpoint_base(endpoints.get("issuer"), public_base_url, "issuer")
176 endpoints["token_url"] = _rewrite_endpoint_base(endpoints.get("token_url"), base_url, "token")
177 endpoints["userinfo_url"] = _rewrite_endpoint_base(endpoints.get("userinfo_url"), base_url, "userinfo")
178 endpoints["jwks_uri"] = _rewrite_endpoint_base(endpoints.get("jwks_uri"), base_url, "jwks")
180 # Validate that all required endpoints are present
181 if not all(endpoints.values()):
182 logger.error(f"Incomplete OIDC configuration from {well_known_url}")
183 return None
185 logger.info(f"Successfully discovered Keycloak endpoints for realm '{realm}'")
186 return endpoints
188 except httpx.HTTPError as e:
189 logger.error(f"Failed to discover Keycloak endpoints from {well_known_url}: {e}")
190 return None
191 except Exception as e:
192 logger.error(f"Unexpected error discovering Keycloak endpoints: {e}")
193 return None