Coverage for mcpgateway / utils / keycloak_discovery.py: 100%

69 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/utils/keycloak_discovery.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Keycloak OIDC endpoint discovery utility. 

8""" 

9 

10# Standard 

11import logging 

12from typing import Dict, Optional 

13from urllib.parse import urlsplit, urlunsplit 

14 

15# Third-Party 

16import httpx 

17 

18# Logger 

19logger = logging.getLogger(__name__) 

20 

21 

22def _rewrite_endpoint_base(endpoint_url: Optional[str], target_base_url: Optional[str], endpoint_type: str) -> Optional[str]: 

23 """Rewrite a discovered endpoint URL to use a target base URL. 

24 

25 Keeps the discovered path/query/fragment and swaps scheme+host+port only. 

26 

27 Args: 

28 endpoint_url: Endpoint URL discovered from OIDC metadata. 

29 target_base_url: Replacement base URL to apply (scheme/host/port). 

30 endpoint_type: Endpoint identifier used for logging. 

31 

32 Returns: 

33 Rewritten URL when target base is valid and differs, otherwise the original URL. 

34 """ 

35 if not endpoint_url or not target_base_url: 

36 return endpoint_url 

37 

38 parsed_endpoint = urlsplit(endpoint_url) 

39 parsed_base = urlsplit(target_base_url) 

40 

41 if not parsed_base.scheme or not parsed_base.netloc: 

42 return endpoint_url 

43 

44 if parsed_endpoint.scheme == parsed_base.scheme and parsed_endpoint.netloc == parsed_base.netloc: 

45 return endpoint_url 

46 

47 rewritten = urlunsplit( 

48 ( 

49 parsed_base.scheme, 

50 parsed_base.netloc, 

51 parsed_endpoint.path, 

52 parsed_endpoint.query, 

53 parsed_endpoint.fragment, 

54 ) 

55 ) 

56 logger.info("Rewrote Keycloak %s URL from %s to %s", endpoint_type, endpoint_url, rewritten) 

57 return rewritten 

58 

59 

60async def discover_keycloak_endpoints(base_url: str, realm: str, timeout: int = 10, public_base_url: Optional[str] = None) -> Optional[Dict[str, str]]: 

61 """ 

62 Discover Keycloak OIDC endpoints from well-known configuration. 

63 

64 Args: 

65 base_url: Keycloak base URL (e.g., https://keycloak.example.com) 

66 realm: Realm name (e.g., master) 

67 timeout: HTTP request timeout in seconds 

68 public_base_url: Optional browser-facing Keycloak base URL for authorization URL rewrite 

69 

70 Returns: 

71 Dict containing authorization_url, token_url, userinfo_url, issuer, jwks_uri 

72 Returns None if discovery fails 

73 

74 Examples: 

75 >>> import asyncio 

76 >>> # Mock successful discovery 

77 >>> async def test(): 

78 ... # This would require a real Keycloak instance 

79 ... result = await discover_keycloak_endpoints('https://keycloak.example.com', 'master') 

80 ... return result is None or isinstance(result, dict) 

81 >>> asyncio.run(test()) 

82 True 

83 """ 

84 well_known_url = f"{base_url}/realms/{realm}/.well-known/openid-configuration" 

85 

86 try: 

87 # First-Party 

88 from mcpgateway.services.http_client_service import get_http_client # pylint: disable=import-outside-toplevel 

89 

90 client = await get_http_client() 

91 logger.info(f"Discovering Keycloak endpoints from {well_known_url}") 

92 response = await client.get(well_known_url, timeout=timeout) 

93 response.raise_for_status() 

94 config = response.json() 

95 

96 endpoints = { 

97 "authorization_url": config.get("authorization_endpoint"), 

98 "token_url": config.get("token_endpoint"), 

99 "userinfo_url": config.get("userinfo_endpoint"), 

100 "issuer": config.get("issuer"), 

101 "jwks_uri": config.get("jwks_uri"), 

102 } 

103 

104 # Use optional browser-facing base for authorization endpoint and issuer 

105 # (tokens issued via browser flow contain the public-facing issuer) while 

106 # keeping token/userinfo/jwks endpoints reachable from the gateway runtime. 

107 endpoints["authorization_url"] = _rewrite_endpoint_base(endpoints.get("authorization_url"), public_base_url, "authorization") 

108 endpoints["issuer"] = _rewrite_endpoint_base(endpoints.get("issuer"), public_base_url, "issuer") 

109 endpoints["token_url"] = _rewrite_endpoint_base(endpoints.get("token_url"), base_url, "token") 

110 endpoints["userinfo_url"] = _rewrite_endpoint_base(endpoints.get("userinfo_url"), base_url, "userinfo") 

111 endpoints["jwks_uri"] = _rewrite_endpoint_base(endpoints.get("jwks_uri"), base_url, "jwks") 

112 

113 # Validate that all required endpoints are present 

114 if not all(endpoints.values()): 

115 logger.error(f"Incomplete OIDC configuration from {well_known_url}") 

116 return None 

117 

118 logger.info(f"Successfully discovered Keycloak endpoints for realm '{realm}'") 

119 return endpoints 

120 

121 except httpx.HTTPError as e: 

122 logger.error(f"Failed to discover Keycloak endpoints from {well_known_url}: {e}") 

123 return None 

124 except Exception as e: 

125 logger.error(f"Unexpected error discovering Keycloak endpoints: {e}") 

126 return None 

127 

128 

129def discover_keycloak_endpoints_sync(base_url: str, realm: str, timeout: int = 10, public_base_url: Optional[str] = None) -> Optional[Dict[str, str]]: 

130 """ 

131 Synchronous version of discover_keycloak_endpoints. 

132 

133 Args: 

134 base_url: Keycloak base URL (e.g., https://keycloak.example.com) 

135 realm: Realm name (e.g., master) 

136 timeout: HTTP request timeout in seconds 

137 public_base_url: Optional browser-facing Keycloak base URL for authorization URL rewrite 

138 

139 Returns: 

140 Dict containing authorization_url, token_url, userinfo_url, issuer, jwks_uri 

141 Returns None if discovery fails 

142 """ 

143 well_known_url = f"{base_url}/realms/{realm}/.well-known/openid-configuration" 

144 

145 try: 

146 # First-Party 

147 from mcpgateway.config import settings # pylint: disable=import-outside-toplevel 

148 

149 with httpx.Client( 

150 timeout=timeout, 

151 limits=httpx.Limits( 

152 max_connections=settings.httpx_max_connections, 

153 max_keepalive_connections=settings.httpx_max_keepalive_connections, 

154 keepalive_expiry=settings.httpx_keepalive_expiry, 

155 ), 

156 verify=not settings.skip_ssl_verify, 

157 ) as client: 

158 logger.info(f"Discovering Keycloak endpoints from {well_known_url}") 

159 response = client.get(well_known_url) 

160 response.raise_for_status() 

161 config = response.json() 

162 

163 endpoints = { 

164 "authorization_url": config.get("authorization_endpoint"), 

165 "token_url": config.get("token_endpoint"), 

166 "userinfo_url": config.get("userinfo_endpoint"), 

167 "issuer": config.get("issuer"), 

168 "jwks_uri": config.get("jwks_uri"), 

169 } 

170 

171 # Use optional browser-facing base for authorization endpoint and issuer 

172 # (tokens issued via browser flow contain the public-facing issuer) while 

173 # keeping token/userinfo/jwks endpoints reachable from the gateway runtime. 

174 endpoints["authorization_url"] = _rewrite_endpoint_base(endpoints.get("authorization_url"), public_base_url, "authorization") 

175 endpoints["issuer"] = _rewrite_endpoint_base(endpoints.get("issuer"), public_base_url, "issuer") 

176 endpoints["token_url"] = _rewrite_endpoint_base(endpoints.get("token_url"), base_url, "token") 

177 endpoints["userinfo_url"] = _rewrite_endpoint_base(endpoints.get("userinfo_url"), base_url, "userinfo") 

178 endpoints["jwks_uri"] = _rewrite_endpoint_base(endpoints.get("jwks_uri"), base_url, "jwks") 

179 

180 # Validate that all required endpoints are present 

181 if not all(endpoints.values()): 

182 logger.error(f"Incomplete OIDC configuration from {well_known_url}") 

183 return None 

184 

185 logger.info(f"Successfully discovered Keycloak endpoints for realm '{realm}'") 

186 return endpoints 

187 

188 except httpx.HTTPError as e: 

189 logger.error(f"Failed to discover Keycloak endpoints from {well_known_url}: {e}") 

190 return None 

191 except Exception as e: 

192 logger.error(f"Unexpected error discovering Keycloak endpoints: {e}") 

193 return None