Coverage for mcpgateway / routers / metrics_maintenance.py: 100%

93 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Metrics Maintenance Router. 

3 

4This router provides admin endpoints for metrics cleanup and rollup operations. 

5 

6Copyright 2025 

7SPDX-License-Identifier: Apache-2.0 

8""" 

9 

10# Standard 

11import logging 

12from typing import Dict, Optional 

13 

14# Third-Party 

15from fastapi import APIRouter, Depends, HTTPException 

16from pydantic import BaseModel, Field 

17 

18# First-Party 

19from mcpgateway.config import settings 

20from mcpgateway.utils.verify_credentials import require_admin_auth 

21 

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Router for the metrics maintenance endpoints. Every route registered on it
# is served under /api/metrics and declares require_admin_auth as a
# router-level dependency.
router = APIRouter(
    dependencies=[Depends(require_admin_auth)],
    tags=["Metrics Maintenance"],
    prefix="/api/metrics",
)

29 

30 

class CleanupRequest(BaseModel):
    """Request body accepted by the manual cleanup endpoint."""

    # Optional retention override, validated to the range 0..365; defaults to None.
    retention_days: Optional[int] = Field(
        default=None,
        ge=0,
        le=365,
        description="Override retention period in days (0 = delete all)",
    )
    # Forwarded to the all-tables cleanup call; defaults to True.
    include_rollup: bool = Field(
        default=True,
        description="Also clean up old rollup data",
    )
    # When set (truthy), the endpoint cleans only this table type instead of all tables.
    table_type: Optional[str] = Field(
        default=None,
        description="Clean specific table: tool, resource, prompt, server, a2a_agent",
    )

37 

38 

class RollupRequest(BaseModel):
    """Request body accepted by the manual rollup endpoint."""

    # Look-back window in hours, validated to 1..8760; defaults to 24.
    hours_back: int = Field(
        default=24,
        ge=1,
        le=8760,
        description="How many hours back to process (max 365 days)",
    )
    # Still accepted and forwarded to the rollup service; the description
    # below marks it as deprecated and without effect.
    force_reprocess: bool = Field(
        default=False,
        description=(
            "Deprecated: rollup now always re-aggregates hours with raw data to include late-arriving metrics. "
            "This parameter is kept for API compatibility but has no effect."
        ),
    )

47 

48 

class CleanupResultResponse(BaseModel):
    """Per-table outcome of a cleanup run, as returned by the cleanup endpoint."""

    # Name of the table this entry describes.
    table_name: str
    # Counts copied from the service result for this table.
    deleted_count: int
    remaining_count: int
    # The endpoint fills this with the result's cutoff_date rendered via isoformat().
    cutoff_date: str
    duration_seconds: float
    # Copied from the service result's error attribute; defaults to None.
    error: Optional[str] = None

58 

59 

class CleanupSummaryResponse(BaseModel):
    """Overall outcome of a cleanup request, with one entry per cleaned table."""

    # Total reported by the cleanup service (or the single table's deleted
    # count when only one table was cleaned).
    total_deleted: int
    # Per-table results keyed by table name.
    tables: Dict[str, CleanupResultResponse]
    duration_seconds: float
    # Timestamps rendered by the endpoint with isoformat().
    started_at: str
    completed_at: str

68 

69 

class RollupResultResponse(BaseModel):
    """Per-table outcome of a rollup run, as returned by the rollup endpoint."""

    # Name of the table this entry describes.
    table_name: str
    # Counters copied one-to-one from the rollup service's per-table result.
    hours_processed: int
    records_aggregated: int
    rollups_created: int
    rollups_updated: int
    raw_deleted: int
    duration_seconds: float
    # Copied from the service result's error attribute; defaults to None.
    error: Optional[str] = None

81 

82 

class RollupSummaryResponse(BaseModel):
    """Overall outcome of a rollup request, with one entry per processed table."""

    # Totals copied from the rollup service's summary.
    total_hours_processed: int
    total_records_aggregated: int
    total_rollups_created: int
    total_rollups_updated: int
    # Per-table results keyed by the names used in the service summary.
    tables: Dict[str, RollupResultResponse]
    duration_seconds: float
    # Timestamps rendered by the endpoint with isoformat().
    started_at: str
    completed_at: str

94 

95 

class MetricsStatsResponse(BaseModel):
    """Payload of the stats endpoint: service stats plus table sizes."""

    # Stats dict from the cleanup service, or {"enabled": False} when cleanup is disabled.
    cleanup: Dict
    # Stats dict from the rollup service, or {"enabled": False} when rollup is disabled.
    rollup: Dict
    # Sizes reported by the cleanup service; empty when cleanup is disabled.
    table_sizes: Dict[str, int]

102 

103 

def _cleanup_result_to_response(table_result) -> CleanupResultResponse:
    """Convert one per-table cleanup result from the service into its response model."""
    return CleanupResultResponse(
        table_name=table_result.table_name,
        deleted_count=table_result.deleted_count,
        remaining_count=table_result.remaining_count,
        cutoff_date=table_result.cutoff_date.isoformat(),
        duration_seconds=table_result.duration_seconds,
        error=table_result.error,
    )


@router.post("/cleanup", response_model=CleanupSummaryResponse)
async def trigger_cleanup(request: CleanupRequest = CleanupRequest()):
    """Run a metrics cleanup on demand.

    When ``request.table_type`` is set, only that table is cleaned and the
    start/end timestamps are measured here around the service call. Otherwise
    all tables are cleaned and the timestamps come from the service summary.
    ``request.retention_days`` is forwarded to the service in both cases;
    ``request.include_rollup`` is only forwarded on the all-tables path.

    Args:
        request: Cleanup request parameters

    Returns:
        CleanupSummaryResponse: Summary of the cleanup operation

    Raises:
        HTTPException: 400 if metrics cleanup is disabled, or if the
            single-table cleanup raises ``ValueError``.
    """
    if not settings.metrics_cleanup_enabled:
        raise HTTPException(status_code=400, detail="Metrics cleanup is disabled")

    # First-Party
    from mcpgateway.services.metrics_cleanup_service import get_metrics_cleanup_service

    cleanup_service = get_metrics_cleanup_service()

    if not request.table_type:
        # No specific table requested: clean everything in one service call.
        overall = await cleanup_service.cleanup_all(
            retention_days=request.retention_days,
            include_rollup=request.include_rollup,
        )
        return CleanupSummaryResponse(
            total_deleted=overall.total_deleted,
            tables={table_key: _cleanup_result_to_response(entry) for table_key, entry in overall.tables.items()},
            duration_seconds=overall.duration_seconds,
            started_at=overall.started_at.isoformat(),
            completed_at=overall.completed_at.isoformat(),
        )

    # Single-table cleanup.
    # Standard
    from datetime import datetime, timezone

    began = datetime.now(timezone.utc)
    try:
        single = await cleanup_service.cleanup_table(
            table_type=request.table_type,
            retention_days=request.retention_days,
        )
    except ValueError as exc:
        # The service's ValueError is reported to the client as a 400.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    finished = datetime.now(timezone.utc)

    return CleanupSummaryResponse(
        total_deleted=single.deleted_count,
        tables={single.table_name: _cleanup_result_to_response(single)},
        duration_seconds=single.duration_seconds,
        started_at=began.isoformat(),
        completed_at=finished.isoformat(),
    )

183 

184 

@router.post("/rollup", response_model=RollupSummaryResponse)
async def trigger_rollup(request: RollupRequest = RollupRequest()):
    """Run a metrics rollup on demand.

    Calls the rollup service's ``rollup_all`` with the requested look-back
    window and converts the returned summary, including every per-table
    result, into the response models.

    Args:
        request: Rollup request parameters

    Returns:
        RollupSummaryResponse: Summary of the rollup operation

    Raises:
        HTTPException: If metrics rollup is disabled (400).
    """
    if not settings.metrics_rollup_enabled:
        raise HTTPException(status_code=400, detail="Metrics rollup is disabled")

    # First-Party
    from mcpgateway.services.metrics_rollup_service import get_metrics_rollup_service

    rollup_service = get_metrics_rollup_service()
    outcome = await rollup_service.rollup_all(
        hours_back=request.hours_back,
        force_reprocess=request.force_reprocess,
    )

    # Translate each per-table service result into its response model.
    per_table = {}
    for table_key, entry in outcome.tables.items():
        per_table[table_key] = RollupResultResponse(
            table_name=entry.table_name,
            hours_processed=entry.hours_processed,
            records_aggregated=entry.records_aggregated,
            rollups_created=entry.rollups_created,
            rollups_updated=entry.rollups_updated,
            raw_deleted=entry.raw_deleted,
            duration_seconds=entry.duration_seconds,
            error=entry.error,
        )

    return RollupSummaryResponse(
        total_hours_processed=outcome.total_hours_processed,
        total_records_aggregated=outcome.total_records_aggregated,
        total_rollups_created=outcome.total_rollups_created,
        total_rollups_updated=outcome.total_rollups_updated,
        tables=per_table,
        duration_seconds=outcome.duration_seconds,
        started_at=outcome.started_at.isoformat(),
        completed_at=outcome.completed_at.isoformat(),
    )

236 

237 

@router.get("/stats", response_model=MetricsStatsResponse)
async def get_metrics_stats():
    """Report the state of the metrics cleanup and rollup services.

    A service that is disabled in settings is reported as
    ``{"enabled": False}``. Table sizes are fetched from the cleanup service,
    so they are empty when cleanup is disabled.

    Returns:
        MetricsStatsResponse: Statistics including service status and table sizes
    """
    if settings.metrics_cleanup_enabled:
        # First-Party
        from mcpgateway.services.metrics_cleanup_service import get_metrics_cleanup_service

        cleaner = get_metrics_cleanup_service()
        cleanup_info = cleaner.get_stats()
        sizes = await cleaner.get_table_sizes()
    else:
        cleanup_info = {"enabled": False}
        sizes = {}

    if settings.metrics_rollup_enabled:
        # First-Party
        from mcpgateway.services.metrics_rollup_service import get_metrics_rollup_service

        rollup_info = get_metrics_rollup_service().get_stats()
    else:
        rollup_info = {"enabled": False}

    return MetricsStatsResponse(cleanup=cleanup_info, rollup=rollup_info, table_sizes=sizes)

269 

270 

@router.get("/config")
async def get_metrics_config():
    """Expose the current metrics maintenance settings.

    The values are read directly from ``settings`` and grouped into a
    ``cleanup`` section and a ``rollup`` section.

    Returns:
        dict: Current configuration settings
    """
    cleanup_section = {
        "enabled": settings.metrics_cleanup_enabled,
        "retention_days": settings.metrics_retention_days,
        "interval_hours": settings.metrics_cleanup_interval_hours,
        "batch_size": settings.metrics_cleanup_batch_size,
    }
    rollup_section = {
        "enabled": settings.metrics_rollup_enabled,
        "interval_hours": settings.metrics_rollup_interval_hours,
        "retention_days": settings.metrics_rollup_retention_days,
        "late_data_hours": settings.metrics_rollup_late_data_hours,
        "delete_raw_after_rollup": settings.metrics_delete_raw_after_rollup,
        "delete_raw_after_rollup_hours": settings.metrics_delete_raw_after_rollup_hours,
    }
    return {"cleanup": cleanup_section, "rollup": rollup_section}