Coverage for mcpgateway / utils / time_restrictions.py: 100%
77 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/utils/time_restrictions.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Sebastian Iozu
7Time restriction validation utilities for token access control.
8This module provides functions to validate time-based access restrictions
9on API tokens, ensuring tokens can only be used during specified time windows.
10"""
12# Standard
13from datetime import datetime
14import logging
15from typing import Any, Dict
16from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
18# Third-Party
19from fastapi import HTTPException, status
21# Initialize logging
22logger = logging.getLogger(__name__)
24# Valid day names for day restrictions
25VALID_DAYS = {"Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"}
28def validate_time_restrictions(payload: Dict[str, Any]) -> None:
29 """Validate time restrictions from JWT token payload.
31 Checks if the current time falls within the allowed time windows and days
32 specified in the token's time_restrictions. Raises HTTPException if the
33 token is being used outside of its allowed time periods.
35 Time restrictions format in JWT payload:
36 {
37 "scopes": {
38 "time_restrictions": {
39 "start_time": "09:00",
40 "end_time": "17:00",
41 "timezone": "UTC",
42 "days": ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"]
43 }
44 }
45 }
47 Args:
48 payload: Decoded JWT payload containing time_restrictions in scopes
50 Raises:
51 HTTPException: 403 if current time is outside allowed windows
53 Examples:
54 >>> payload = {
55 ... "sub": "user@example.com",
56 ... "scopes": {
57 ... "time_restrictions": {
58 ... "start_time": "09:00",
59 ... "end_time": "17:00",
60 ... "timezone": "UTC",
61 ... "days": ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"]
62 ... }
63 ... }
64 ... }
65 >>> # Will raise HTTPException if called outside business hours
66 >>> # validate_time_restrictions(payload)
67 """
68 # Extract time restrictions from token scopes
69 scopes = payload.get("scopes", {})
70 if not isinstance(scopes, dict):
71 return # No restrictions to validate
73 time_restrictions = scopes.get("time_restrictions", {})
74 if not time_restrictions or not isinstance(time_restrictions, dict):
75 return # No time restrictions configured
77 # Get restriction parameters
78 start_time_str = time_restrictions.get("start_time")
79 end_time_str = time_restrictions.get("end_time")
80 timezone_str = time_restrictions.get("timezone", "UTC")
81 allowed_days = time_restrictions.get("days", [])
83 # If no restrictions are actually set, allow access
84 if not (start_time_str or end_time_str or allowed_days):
85 return
87 # SECURITY: Type-validate all fields before processing.
88 # Malformed types (e.g. start_time=123, days="Monday") would cause
89 # TypeError in strptime/set operations; fail closed to prevent bypass.
90 if (start_time_str is not None and not isinstance(start_time_str, str)) or (end_time_str is not None and not isinstance(end_time_str, str)):
91 logger.warning(
92 "Invalid type for start_time or end_time in time_restrictions",
93 extra={
94 "security_event": "time_restriction_validation_error",
95 "error_type": "invalid_field_type",
96 "user": payload.get("sub"),
97 },
98 )
99 raise HTTPException(
100 status_code=status.HTTP_403_FORBIDDEN,
101 detail="Token has invalid time restriction format: start_time and end_time must be strings",
102 )
104 if not isinstance(timezone_str, str):
105 logger.warning(
106 "Invalid type for timezone in time_restrictions",
107 extra={
108 "security_event": "time_restriction_validation_error",
109 "error_type": "invalid_field_type",
110 "user": payload.get("sub"),
111 },
112 )
113 raise HTTPException(
114 status_code=status.HTTP_403_FORBIDDEN,
115 detail="Token has invalid time restriction format: timezone must be a string",
116 )
118 if not isinstance(allowed_days, list) or not all(isinstance(d, str) for d in allowed_days):
119 logger.warning(
120 "Invalid type for days in time_restrictions",
121 extra={
122 "security_event": "time_restriction_validation_error",
123 "error_type": "invalid_field_type",
124 "user": payload.get("sub"),
125 },
126 )
127 raise HTTPException(
128 status_code=status.HTTP_403_FORBIDDEN,
129 detail="Token has invalid time restriction format: days must be a list of strings",
130 )
132 # SECURITY: Fail closed on half-configured time windows.
133 # Only start_time or only end_time is malformed — deny to prevent bypass.
134 if bool(start_time_str) != bool(end_time_str):
135 logger.warning(
136 f"Incomplete time window in time_restrictions: start_time={start_time_str!r}, end_time={end_time_str!r}",
137 extra={
138 "security_event": "time_restriction_validation_error",
139 "error_type": "incomplete_time_window",
140 "start_time": start_time_str,
141 "end_time": end_time_str,
142 "user": payload.get("sub"),
143 },
144 )
145 raise HTTPException(
146 status_code=status.HTTP_403_FORBIDDEN,
147 detail="Token has incomplete time restriction: both start_time and end_time are required",
148 )
150 # Validate day names if day restrictions are present
151 if allowed_days:
152 invalid_days = set(allowed_days) - VALID_DAYS
153 if invalid_days:
154 logger.warning(
155 f"Invalid day names in time_restrictions: {invalid_days}",
156 extra={
157 "security_event": "time_restriction_validation_error",
158 "error_type": "invalid_day_names",
159 "invalid_days": list(invalid_days),
160 "user": payload.get("sub"),
161 },
162 )
163 raise HTTPException(
164 status_code=status.HTTP_403_FORBIDDEN,
165 detail=f"Token has invalid day names in time restrictions: {', '.join(sorted(invalid_days))}",
166 )
168 # Get current time in the specified timezone
169 try:
170 tz = ZoneInfo(timezone_str)
171 except (ZoneInfoNotFoundError, KeyError):
172 logger.warning(
173 f"Invalid timezone in time_restrictions: {timezone_str}",
174 extra={
175 "security_event": "time_restriction_validation_error",
176 "error_type": "invalid_timezone",
177 "timezone": timezone_str,
178 "user": payload.get("sub"),
179 },
180 )
181 # SECURITY: Fail closed - deny access on invalid timezone to prevent bypass
182 raise HTTPException(
183 status_code=status.HTTP_403_FORBIDDEN,
184 detail=f"Token has invalid timezone in time restrictions: {timezone_str}",
185 )
187 now = datetime.now(tz)
188 current_day = now.strftime("%A") # e.g., "Monday", "Tuesday", etc.
189 current_time = now.time()
191 # Check day restriction
192 if allowed_days and current_day not in allowed_days:
193 logger.warning(
194 f"Token access denied: current day '{current_day}' not in allowed days {allowed_days}",
195 extra={
196 "security_event": "time_restriction_violation",
197 "violation_type": "day_restriction",
198 "current_day": current_day,
199 "allowed_days": allowed_days,
200 "user": payload.get("sub"),
201 },
202 )
203 raise HTTPException(
204 status_code=status.HTTP_403_FORBIDDEN,
205 detail=f"Token access is restricted to specific days. Current day: {current_day}. Allowed days: {', '.join(allowed_days)}",
206 )
208 # Check time range restriction
209 if start_time_str and end_time_str:
210 try:
211 # Parse time strings (format: "HH:MM" or "HH:MM:SS")
212 # Try HH:MM:SS first, then HH:MM as fallback
213 start_time = None
214 for fmt in ["%H:%M:%S", "%H:%M"]:
215 try:
216 start_time = datetime.strptime(start_time_str, fmt).time()
217 break
218 except ValueError:
219 continue
221 if start_time is None:
222 raise ValueError(f"Invalid start_time format: {start_time_str}")
224 end_time = None
225 for fmt in ["%H:%M:%S", "%H:%M"]:
226 try:
227 end_time = datetime.strptime(end_time_str, fmt).time()
228 break
229 except ValueError:
230 continue
232 if end_time is None:
233 raise ValueError(f"Invalid end_time format: {end_time_str}")
235 # Handle time ranges that cross midnight
236 if start_time <= end_time:
237 # Normal range (e.g., 09:00 - 17:00)
238 time_allowed = start_time <= current_time <= end_time
239 else:
240 # Range crosses midnight (e.g., 22:00 - 06:00)
241 time_allowed = current_time >= start_time or current_time <= end_time
243 if not time_allowed:
244 logger.warning(
245 f"Token access denied: current time {current_time.strftime('%H:%M')} outside allowed range {start_time_str} - {end_time_str}",
246 extra={
247 "security_event": "time_restriction_violation",
248 "violation_type": "time_range_restriction",
249 "current_time": current_time.isoformat(),
250 "start_time": start_time_str,
251 "end_time": end_time_str,
252 "timezone": timezone_str,
253 "user": payload.get("sub"),
254 },
255 )
256 raise HTTPException(
257 status_code=status.HTTP_403_FORBIDDEN,
258 detail=f"Token access is restricted to {start_time_str} - {end_time_str} {timezone_str}. Current time: {current_time.strftime('%H:%M')} {timezone_str}",
259 )
260 except ValueError as e:
261 logger.error(
262 f"Invalid time format in time_restrictions: {e}",
263 extra={
264 "security_event": "time_restriction_validation_error",
265 "error_type": "invalid_time_format",
266 "start_time": start_time_str,
267 "end_time": end_time_str,
268 "user": payload.get("sub"),
269 },
270 )
271 # SECURITY: Fail closed - deny access on parsing errors to prevent bypass
272 raise HTTPException(
273 status_code=status.HTTP_403_FORBIDDEN,
274 detail=f"Token has invalid time format in time restrictions: {e}",
275 )