Coverage for mcpgateway / utils / keycloak_discovery.py: 100%

46 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/utils/keycloak_discovery.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Keycloak OIDC endpoint discovery utility. 

8""" 

9 

10# Standard 

11import logging 

12from typing import Dict, Optional 

13 

14# Third-Party 

15import httpx 

16 

17# Logger 

18logger = logging.getLogger(__name__) 

19 

20 

21async def discover_keycloak_endpoints(base_url: str, realm: str, timeout: int = 10) -> Optional[Dict[str, str]]: 

22 """ 

23 Discover Keycloak OIDC endpoints from well-known configuration. 

24 

25 Args: 

26 base_url: Keycloak base URL (e.g., https://keycloak.example.com) 

27 realm: Realm name (e.g., master) 

28 timeout: HTTP request timeout in seconds 

29 

30 Returns: 

31 Dict containing authorization_url, token_url, userinfo_url, issuer, jwks_uri 

32 Returns None if discovery fails 

33 

34 Examples: 

35 >>> import asyncio 

36 >>> # Mock successful discovery 

37 >>> async def test(): 

38 ... # This would require a real Keycloak instance 

39 ... result = await discover_keycloak_endpoints('https://keycloak.example.com', 'master') 

40 ... return result is None or isinstance(result, dict) 

41 >>> asyncio.run(test()) 

42 True 

43 """ 

44 well_known_url = f"{base_url}/realms/{realm}/.well-known/openid-configuration" 

45 

46 try: 

47 # First-Party 

48 from mcpgateway.services.http_client_service import get_http_client # pylint: disable=import-outside-toplevel 

49 

50 client = await get_http_client() 

51 logger.info(f"Discovering Keycloak endpoints from {well_known_url}") 

52 response = await client.get(well_known_url, timeout=timeout) 

53 response.raise_for_status() 

54 config = response.json() 

55 

56 endpoints = { 

57 "authorization_url": config.get("authorization_endpoint"), 

58 "token_url": config.get("token_endpoint"), 

59 "userinfo_url": config.get("userinfo_endpoint"), 

60 "issuer": config.get("issuer"), 

61 "jwks_uri": config.get("jwks_uri"), 

62 } 

63 

64 # Validate that all required endpoints are present 

65 if not all(endpoints.values()): 

66 logger.error(f"Incomplete OIDC configuration from {well_known_url}") 

67 return None 

68 

69 logger.info(f"Successfully discovered Keycloak endpoints for realm '{realm}'") 

70 return endpoints 

71 

72 except httpx.HTTPError as e: 

73 logger.error(f"Failed to discover Keycloak endpoints from {well_known_url}: {e}") 

74 return None 

75 except Exception as e: 

76 logger.error(f"Unexpected error discovering Keycloak endpoints: {e}") 

77 return None 

78 

79 

80def discover_keycloak_endpoints_sync(base_url: str, realm: str, timeout: int = 10) -> Optional[Dict[str, str]]: 

81 """ 

82 Synchronous version of discover_keycloak_endpoints. 

83 

84 Args: 

85 base_url: Keycloak base URL (e.g., https://keycloak.example.com) 

86 realm: Realm name (e.g., master) 

87 timeout: HTTP request timeout in seconds 

88 

89 Returns: 

90 Dict containing authorization_url, token_url, userinfo_url, issuer, jwks_uri 

91 Returns None if discovery fails 

92 """ 

93 well_known_url = f"{base_url}/realms/{realm}/.well-known/openid-configuration" 

94 

95 try: 

96 # First-Party 

97 from mcpgateway.config import settings # pylint: disable=import-outside-toplevel 

98 

99 with httpx.Client( 

100 timeout=timeout, 

101 limits=httpx.Limits( 

102 max_connections=settings.httpx_max_connections, 

103 max_keepalive_connections=settings.httpx_max_keepalive_connections, 

104 keepalive_expiry=settings.httpx_keepalive_expiry, 

105 ), 

106 verify=not settings.skip_ssl_verify, 

107 ) as client: 

108 logger.info(f"Discovering Keycloak endpoints from {well_known_url}") 

109 response = client.get(well_known_url) 

110 response.raise_for_status() 

111 config = response.json() 

112 

113 endpoints = { 

114 "authorization_url": config.get("authorization_endpoint"), 

115 "token_url": config.get("token_endpoint"), 

116 "userinfo_url": config.get("userinfo_endpoint"), 

117 "issuer": config.get("issuer"), 

118 "jwks_uri": config.get("jwks_uri"), 

119 } 

120 

121 # Validate that all required endpoints are present 

122 if not all(endpoints.values()): 

123 logger.error(f"Incomplete OIDC configuration from {well_known_url}") 

124 return None 

125 

126 logger.info(f"Successfully discovered Keycloak endpoints for realm '{realm}'") 

127 return endpoints 

128 

129 except httpx.HTTPError as e: 

130 logger.error(f"Failed to discover Keycloak endpoints from {well_known_url}: {e}") 

131 return None 

132 except Exception as e: 

133 logger.error(f"Unexpected error discovering Keycloak endpoints: {e}") 

134 return None