Coverage for mcpgateway / utils / time_restrictions.py: 100%

77 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/utils/time_restrictions.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Sebastian Iozu 

6 

7Time restriction validation utilities for token access control. 

8This module provides functions to validate time-based access restrictions 

9on API tokens, ensuring tokens can only be used during specified time windows. 

10""" 

11 

12# Standard 

13from datetime import datetime 

14import logging 

15from typing import Any, Dict 

16from zoneinfo import ZoneInfo, ZoneInfoNotFoundError 

17 

18# Third-Party 

19from fastapi import HTTPException, status 

20 

# Initialize logging
logger = logging.getLogger(__name__)

# Valid day names for day restrictions.
# Stored as a frozenset so this allow-list cannot be mutated at runtime;
# it still supports membership tests and set difference (set(x) - VALID_DAYS).
VALID_DAYS = frozenset({"Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"})

26 

27 

# English day names indexed by datetime.weekday() (Monday == 0).
# Used instead of strftime("%A"), whose output depends on the process locale
# and would never match the English names stored in tokens under a
# non-English LC_TIME.
_WEEKDAY_NAMES = ("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday")

# Accepted clock formats, tried in this order.
_TIME_FORMATS = ("%H:%M:%S", "%H:%M")


def _parse_time_of_day(raw: str, field_name: str):
    """Parse ``raw`` as "HH:MM:SS" or "HH:MM" and return a ``datetime.time``.

    Args:
        raw: Time string taken from the token's time_restrictions.
        field_name: Name of the field being parsed; used in the error message.

    Returns:
        The parsed time of day.

    Raises:
        ValueError: If ``raw`` matches neither accepted format.
    """
    for fmt in _TIME_FORMATS:
        try:
            return datetime.strptime(raw, fmt).time()
        except ValueError:
            continue
    raise ValueError(f"Invalid {field_name} format: {raw}")


def validate_time_restrictions(payload: Dict[str, Any]) -> None:
    """Validate time restrictions from JWT token payload.

    Checks if the current time falls within the allowed time windows and days
    specified in the token's time_restrictions. Raises HTTPException if the
    token is being used outside of its allowed time periods, or if the
    restrictions themselves are malformed (fail closed).

    Time restrictions format in JWT payload:
        {
            "scopes": {
                "time_restrictions": {
                    "start_time": "09:00",
                    "end_time": "17:00",
                    "timezone": "UTC",
                    "days": ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"]
                }
            }
        }

    ``start_time``/``end_time`` accept "HH:MM" or "HH:MM:SS". A window whose
    start is later than its end is treated as crossing midnight. ``timezone``
    defaults to "UTC" when absent.

    Args:
        payload: Decoded JWT payload containing time_restrictions in scopes

    Raises:
        HTTPException: 403 if current time is outside allowed windows, or if
            any restriction field has an invalid type, day name, timezone,
            or time format, or if only one of start_time/end_time is set.

    Examples:
        >>> payload = {
        ...     "sub": "user@example.com",
        ...     "scopes": {
        ...         "time_restrictions": {
        ...             "start_time": "09:00",
        ...             "end_time": "17:00",
        ...             "timezone": "UTC",
        ...             "days": ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"]
        ...         }
        ...     }
        ... }
        >>> # Will raise HTTPException if called outside business hours
        >>> # validate_time_restrictions(payload)
    """
    # Extract time restrictions from token scopes
    scopes = payload.get("scopes", {})
    if not isinstance(scopes, dict):
        return  # No restrictions to validate

    time_restrictions = scopes.get("time_restrictions", {})
    if not time_restrictions or not isinstance(time_restrictions, dict):
        return  # No time restrictions configured

    # Get restriction parameters
    start_time_str = time_restrictions.get("start_time")
    end_time_str = time_restrictions.get("end_time")
    timezone_str = time_restrictions.get("timezone", "UTC")
    allowed_days = time_restrictions.get("days", [])

    # If no restrictions are actually set, allow access
    if not (start_time_str or end_time_str or allowed_days):
        return

    # SECURITY: Type-validate all fields before processing.
    # Malformed types (e.g. start_time=123, days="Monday") would cause
    # TypeError in strptime/set operations; fail closed to prevent bypass.
    if (start_time_str is not None and not isinstance(start_time_str, str)) or (end_time_str is not None and not isinstance(end_time_str, str)):
        logger.warning(
            "Invalid type for start_time or end_time in time_restrictions",
            extra={
                "security_event": "time_restriction_validation_error",
                "error_type": "invalid_field_type",
                "user": payload.get("sub"),
            },
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Token has invalid time restriction format: start_time and end_time must be strings",
        )

    if not isinstance(timezone_str, str):
        logger.warning(
            "Invalid type for timezone in time_restrictions",
            extra={
                "security_event": "time_restriction_validation_error",
                "error_type": "invalid_field_type",
                "user": payload.get("sub"),
            },
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Token has invalid time restriction format: timezone must be a string",
        )

    if not isinstance(allowed_days, list) or not all(isinstance(d, str) for d in allowed_days):
        logger.warning(
            "Invalid type for days in time_restrictions",
            extra={
                "security_event": "time_restriction_validation_error",
                "error_type": "invalid_field_type",
                "user": payload.get("sub"),
            },
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Token has invalid time restriction format: days must be a list of strings",
        )

    # SECURITY: Fail closed on half-configured time windows.
    # A window with only start_time or only end_time is malformed; deny
    # rather than guess the missing bound.
    if bool(start_time_str) != bool(end_time_str):
        logger.warning(
            f"Incomplete time window in time_restrictions: start_time={start_time_str!r}, end_time={end_time_str!r}",
            extra={
                "security_event": "time_restriction_validation_error",
                "error_type": "incomplete_time_window",
                "start_time": start_time_str,
                "end_time": end_time_str,
                "user": payload.get("sub"),
            },
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Token has incomplete time restriction: both start_time and end_time are required",
        )

    # Validate day names if day restrictions are present
    if allowed_days:
        invalid_days = set(allowed_days) - VALID_DAYS
        if invalid_days:
            logger.warning(
                f"Invalid day names in time_restrictions: {invalid_days}",
                extra={
                    "security_event": "time_restriction_validation_error",
                    "error_type": "invalid_day_names",
                    "invalid_days": list(invalid_days),
                    "user": payload.get("sub"),
                },
            )
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Token has invalid day names in time restrictions: {', '.join(sorted(invalid_days))}",
            )

    # Get current time in the specified timezone.
    # ZoneInfo raises ZoneInfoNotFoundError (a KeyError) for unknown keys,
    # ValueError for non-normalized/absolute keys such as "../x", and may
    # surface OSError from the filesystem lookup; all of them mean the token
    # carries an unusable timezone and must be denied with a 403.
    try:
        tz = ZoneInfo(timezone_str)
    except (ZoneInfoNotFoundError, KeyError, ValueError, OSError) as exc:
        logger.warning(
            f"Invalid timezone in time_restrictions: {timezone_str}",
            extra={
                "security_event": "time_restriction_validation_error",
                "error_type": "invalid_timezone",
                "timezone": timezone_str,
                "user": payload.get("sub"),
            },
        )
        # SECURITY: Fail closed - deny access on invalid timezone to prevent bypass
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Token has invalid timezone in time restrictions: {timezone_str}",
        ) from exc

    now = datetime.now(tz)
    # Locale-independent English day name, e.g. "Monday", "Tuesday", etc.
    current_day = _WEEKDAY_NAMES[now.weekday()]
    current_time = now.time()

    # Check day restriction
    if allowed_days and current_day not in allowed_days:
        logger.warning(
            f"Token access denied: current day '{current_day}' not in allowed days {allowed_days}",
            extra={
                "security_event": "time_restriction_violation",
                "violation_type": "day_restriction",
                "current_day": current_day,
                "allowed_days": allowed_days,
                "user": payload.get("sub"),
            },
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Token access is restricted to specific days. Current day: {current_day}. Allowed days: {', '.join(allowed_days)}",
        )

    # No time window configured: the day check above was the only restriction.
    if not (start_time_str and end_time_str):
        return

    # Parse the window bounds (format: "HH:MM" or "HH:MM:SS").
    try:
        start_time = _parse_time_of_day(start_time_str, "start_time")
        end_time = _parse_time_of_day(end_time_str, "end_time")
    except ValueError as e:
        logger.error(
            f"Invalid time format in time_restrictions: {e}",
            extra={
                "security_event": "time_restriction_validation_error",
                "error_type": "invalid_time_format",
                "start_time": start_time_str,
                "end_time": end_time_str,
                "user": payload.get("sub"),
            },
        )
        # SECURITY: Fail closed - deny access on parsing errors to prevent bypass
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Token has invalid time format in time restrictions: {e}",
        ) from e

    # Handle time ranges that cross midnight
    if start_time <= end_time:
        # Normal range (e.g., 09:00 - 17:00)
        time_allowed = start_time <= current_time <= end_time
    else:
        # Range crosses midnight (e.g., 22:00 - 06:00)
        time_allowed = current_time >= start_time or current_time <= end_time

    if not time_allowed:
        logger.warning(
            f"Token access denied: current time {current_time.strftime('%H:%M')} outside allowed range {start_time_str} - {end_time_str}",
            extra={
                "security_event": "time_restriction_violation",
                "violation_type": "time_range_restriction",
                "current_time": current_time.isoformat(),
                "start_time": start_time_str,
                "end_time": end_time_str,
                "timezone": timezone_str,
                "user": payload.get("sub"),
            },
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Token access is restricted to {start_time_str} - {end_time_str} {timezone_str}. Current time: {current_time.strftime('%H:%M')} {timezone_str}",
        )