Coverage for mcpgateway / services / system_stats_service.py: 100%

100 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/system_stats_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7System Metrics Service Implementation. 

8This module provides comprehensive system metrics for monitoring deployment scale 

9and resource utilization across all entity types in the MCP Gateway. 

10 

11It includes: 

12- User and team counts (users, teams, memberships) 

13- MCP resource counts (servers, tools, resources, prompts, A2A agents, gateways) 

14- API token counts (active, revoked, total) 

15- Session and activity metrics 

16- Comprehensive metrics and analytics counts 

17- Security and audit log counts 

18- Workflow state tracking 

19 

20Examples: 

21 >>> from mcpgateway.services.system_stats_service import SystemStatsService 

22 >>> service = SystemStatsService() 

23 >>> # Get all metrics (requires database session) 

24 >>> # stats = service.get_comprehensive_stats(db) 

25 >>> # stats["users"]["total"] # Total user count 

26 >>> # stats["mcp_resources"]["breakdown"]["tools"] # Tool count 

27""" 

28 

29# Standard 

30import logging 

31from typing import Any, Dict 

32 

33# Third-Party 

34from sqlalchemy import case, func, literal, select 

35from sqlalchemy.orm import Session 

36 

37# First-Party 

38from mcpgateway.db import ( 

39 A2AAgent, 

40 A2AAgentMetric, 

41 EmailApiToken, 

42 EmailAuthEvent, 

43 EmailTeam, 

44 EmailTeamInvitation, 

45 EmailTeamJoinRequest, 

46 EmailTeamMember, 

47 EmailUser, 

48 Gateway, 

49 OAuthToken, 

50 PendingUserApproval, 

51 PermissionAuditLog, 

52 Prompt, 

53 PromptMetric, 

54 Resource, 

55 ResourceMetric, 

56 ResourceSubscription, 

57 Server, 

58 ServerMetric, 

59 SessionMessageRecord, 

60 SessionRecord, 

61 SSOProvider, 

62 TokenRevocation, 

63 TokenUsageLog, 

64 Tool, 

65 ToolMetric, 

66) 

67 

logger = logging.getLogger(__name__)

# Memoized admin stats cache object. It starts out unset and is filled in by
# _get_admin_stats_cache() on first use; the import is deferred (lazy) to
# avoid circular dependencies.
_ADMIN_STATS_CACHE = None


def _get_admin_stats_cache():
    """Return the admin stats cache singleton, importing it on first use.

    The resolved object is stored in the module-level ``_ADMIN_STATS_CACHE``
    so subsequent calls return it directly without re-running the import.

    Returns:
        AdminStatsCache instance.
    """
    global _ADMIN_STATS_CACHE  # pylint: disable=global-statement
    if _ADMIN_STATS_CACHE is not None:
        # Already resolved by an earlier call.
        return _ADMIN_STATS_CACHE

    # First-Party
    from mcpgateway.cache.admin_stats_cache import admin_stats_cache as resolved_cache  # pylint: disable=import-outside-toplevel

    _ADMIN_STATS_CACHE = resolved_cache
    return _ADMIN_STATS_CACHE

87 

88 

# pylint: disable=not-callable
# SQLAlchemy's func.count() is callable at runtime but pylint cannot detect this
class SystemStatsService:
    """Service for retrieving comprehensive system metrics.

    This service provides read-only access to system-wide metrics across
    all entity types, providing administrators with at-a-glance visibility
    into deployment scale and resource utilization.

    Every category is returned as ``{"total": int, "breakdown": {...}}``.
    All counts are coerced to plain ``int`` before being returned, so the
    result never carries driver-specific numeric types.

    Examples:
        >>> service = SystemStatsService()
        >>> # With database session
        >>> # stats = service.get_comprehensive_stats(db)
        >>> # print(f"Total users: {stats['users']['total']}")
        >>> # print(f"Total tools: {stats['mcp_resources']['breakdown']['tools']}")
    """

    def get_comprehensive_stats(self, db: Session) -> Dict[str, Any]:
        """Get comprehensive system metrics across all categories.

        Args:
            db: Database session for querying metrics

        Returns:
            Dictionary containing categorized metrics with totals and breakdowns

        Raises:
            Exception: If database queries fail or metrics collection encounters errors

        Examples:
            >>> service = SystemStatsService()
            >>> # stats = service.get_comprehensive_stats(db)
            >>> # assert "users" in stats
            >>> # assert "mcp_resources" in stats
            >>> # assert "total" in stats["users"]
            >>> # assert "breakdown" in stats["users"]
        """
        logger.info("Collecting comprehensive system metrics")

        try:
            stats = {
                "users": self._get_user_stats(db),
                "teams": self._get_team_stats(db),
                "mcp_resources": self._get_mcp_resource_stats(db),
                "tokens": self._get_token_stats(db),
                "sessions": self._get_session_stats(db),
                "metrics": self._get_metrics_stats(db),
                "security": self._get_security_stats(db),
                "workflow": self._get_workflow_stats(db),
            }

            logger.info("Successfully collected system metrics")
            return stats

        except Exception as e:
            # Log for operators, then let the caller decide how to handle the failure.
            logger.error(f"Error collecting system metrics: {e}")
            raise

    async def get_comprehensive_stats_cached(self, db: Session) -> Dict[str, Any]:
        """Get comprehensive system metrics with caching.

        This is the async-friendly version that uses the admin stats cache.
        Call this from async endpoints for optimal performance.

        Args:
            db: Database session for querying metrics

        Returns:
            Dictionary containing categorized metrics with totals and breakdowns

        Examples:
            >>> service = SystemStatsService()
            >>> # import asyncio
            >>> # stats = asyncio.run(service.get_comprehensive_stats_cached(db))
        """
        cache = _get_admin_stats_cache()
        cached = await cache.get_system_stats()
        if cached is not None:
            return cached

        # Cache miss - compute and cache. Note that get_comprehensive_stats()
        # is a plain synchronous call, so the queries run inline in this coroutine.
        stats = self.get_comprehensive_stats(db)
        await cache.set_system_stats(stats)
        return stats

    @staticmethod
    def _as_int(value: Any) -> int:
        """Normalize an aggregate result to a plain ``int``.

        ``SUM()`` yields NULL (``None``) when no rows match, and depending on
        the backend it may come back as ``Decimal`` rather than ``int``.

        Args:
            value: Raw aggregate value read from a result row (may be ``None``)

        Returns:
            The value as ``int``; ``0`` when the value is ``None`` or zero
        """
        return int(value or 0)

    @staticmethod
    def _count_select(label: str, model: Any, column: Any, *criteria: Any) -> Any:
        """Build ``SELECT '<label>' AS type, COUNT(<column>) AS cnt FROM <model>``.

        Args:
            label: Literal placed in the ``type`` column to identify the row
            model: ORM model passed to ``select_from()``
            column: Column handed to ``COUNT()``
            *criteria: Optional WHERE conditions restricting the counted rows

        Returns:
            SQLAlchemy select statement producing exactly the columns ``type`` and ``cnt``
        """
        stmt = select(literal(label).label("type"), func.count(column).label("cnt")).select_from(model)
        if criteria:
            stmt = stmt.where(*criteria)
        return stmt

    @staticmethod
    def _labelled_counts(db: Session, first: Any, *others: Any) -> Dict[str, int]:
        """Run several labelled COUNT selects as a single UNION ALL query.

        Args:
            db: Database session
            first: First statement built by ``_count_select``
            *others: Further statements appended to ``first`` with UNION ALL

        Returns:
            Mapping of each row's ``type`` label to its ``cnt`` as ``int``
        """
        stmt = first.union_all(*others) if others else first
        # One execute() call regardless of how many tables are being counted.
        return {row.type: int(row.cnt or 0) for row in db.execute(stmt).all()}

    def _get_user_stats(self, db: Session) -> Dict[str, Any]:
        """Get user-related metrics.

        Args:
            db: Database session

        Returns:
            Dictionary with total user count and breakdown by status

        Examples:
            >>> service = SystemStatsService()
            >>> # stats = service._get_user_stats(db)
            >>> # assert stats["total"] >= 0
            >>> # assert "breakdown" in stats
            >>> # assert "active" in stats["breakdown"]
        """
        # Optimized from 3 queries to 1 using aggregated SELECT
        row = db.execute(
            select(
                func.count(EmailUser.email).label("total"),
                func.sum(case((EmailUser.is_active.is_(True), 1), else_=0)).label("active"),
                func.sum(case((EmailUser.is_admin.is_(True), 1), else_=0)).label("admins"),
            )
        ).one()

        total = self._as_int(row.total)
        active = self._as_int(row.active)
        admins = self._as_int(row.admins)

        return {"total": total, "breakdown": {"active": active, "inactive": total - active, "admins": admins}}

    def _get_team_stats(self, db: Session) -> Dict[str, Any]:
        """Get team-related metrics.

        Args:
            db: Database session

        Returns:
            Dictionary with total team count and breakdown by type

        Examples:
            >>> service = SystemStatsService()
            >>> # stats = service._get_team_stats(db)
            >>> # assert stats["total"] >= 0
            >>> # assert "personal" in stats["breakdown"]
            >>> # assert "organizational" in stats["breakdown"]
        """
        # Optimized from 3 queries to 2 using aggregated SELECT (separate tables need separate queries)
        team_row = db.execute(
            select(
                func.count(EmailTeam.id).label("total_teams"),
                func.sum(case((EmailTeam.is_personal.is_(True), 1), else_=0)).label("personal_teams"),
            ).select_from(EmailTeam)
        ).one()
        team_members = self._as_int(db.execute(select(func.count(EmailTeamMember.id))).scalar())

        total_teams = self._as_int(team_row.total_teams)
        personal_teams = self._as_int(team_row.personal_teams)

        return {
            "total": total_teams,
            "breakdown": {
                "personal": personal_teams,
                "organizational": total_teams - personal_teams,
                "members": team_members,
            },
        }

    def _get_mcp_resource_stats(self, db: Session) -> Dict[str, Any]:
        """Get MCP resource metrics in a SINGLE query using UNION ALL.

        Optimized from 6 queries to 1.

        Args:
            db: Database session

        Returns:
            Dictionary with total MCP resource count and breakdown by type

        Examples:
            >>> service = SystemStatsService()
            >>> # stats = service._get_mcp_resource_stats(db)
            >>> # assert stats["total"] >= 0
            >>> # assert "tools" in stats["breakdown"]
        """
        counts = self._labelled_counts(
            db,
            self._count_select("servers", Server, Server.id),
            self._count_select("gateways", Gateway, Gateway.id),
            self._count_select("tools", Tool, Tool.id),
            self._count_select("resources", Resource, Resource.uri),
            self._count_select("prompts", Prompt, Prompt.name),
            self._count_select("a2a_agents", A2AAgent, A2AAgent.id),
        )

        # Safe lookups: a label missing from the result set counts as 0.
        labels = ("servers", "gateways", "tools", "resources", "prompts", "a2a_agents")
        breakdown = {label: counts.get(label, 0) for label in labels}

        return {"total": sum(breakdown.values()), "breakdown": breakdown}

    def _get_token_stats(self, db: Session) -> Dict[str, Any]:
        """Get API token metrics.

        Args:
            db: Database session

        Returns:
            Dictionary with total token count and breakdown by status

        Examples:
            >>> service = SystemStatsService()
            >>> # stats = service._get_token_stats(db)
            >>> # assert stats["total"] >= 0
            >>> # assert "active" in stats["breakdown"]
        """
        # Optimized from 3 queries to 2 using aggregated SELECT (separate tables need separate queries)
        token_row = db.execute(
            select(
                func.count(EmailApiToken.id).label("total"),
                func.sum(case((EmailApiToken.is_active.is_(True), 1), else_=0)).label("active"),
            ).select_from(EmailApiToken)
        ).one()
        revoked = self._as_int(db.execute(select(func.count(TokenRevocation.jti))).scalar())

        total = self._as_int(token_row.total)
        active = self._as_int(token_row.active)

        return {"total": total, "breakdown": {"active": active, "inactive": total - active, "revoked": revoked}}

    def _get_session_stats(self, db: Session) -> Dict[str, Any]:
        """Get session and activity metrics.

        Args:
            db: Database session

        Returns:
            Dictionary with total session count and breakdown by type

        Examples:
            >>> service = SystemStatsService()
            >>> # stats = service._get_session_stats(db)
            >>> # assert stats["total"] >= 0
            >>> # assert "mcp_sessions" in stats["breakdown"]
        """
        # Optimized from 4 queries to 1 using UNION ALL (separate tables need separate selects)
        counts = self._labelled_counts(
            db,
            self._count_select("mcp_sessions", SessionRecord, SessionRecord.session_id),
            self._count_select("mcp_messages", SessionMessageRecord, SessionMessageRecord.id),
            self._count_select("subscriptions", ResourceSubscription, ResourceSubscription.id),
            self._count_select("oauth_tokens", OAuthToken, OAuthToken.access_token),
        )

        labels = ("mcp_sessions", "mcp_messages", "subscriptions", "oauth_tokens")
        breakdown = {label: counts.get(label, 0) for label in labels}

        return {"total": sum(breakdown.values()), "breakdown": breakdown}

    def _get_metrics_stats(self, db: Session) -> Dict[str, Any]:
        """Get metrics and analytics counts.

        Args:
            db: Database session

        Returns:
            Dictionary with total metrics count and breakdown by type

        Examples:
            >>> service = SystemStatsService()
            >>> # stats = service._get_metrics_stats(db)
            >>> # assert stats["total"] >= 0
            >>> # assert "tool_metrics" in stats["breakdown"]
        """
        # Optimized from 6 queries to 1 using UNION ALL (separate tables need separate selects)
        counts = self._labelled_counts(
            db,
            self._count_select("tool_metrics", ToolMetric, ToolMetric.id),
            self._count_select("resource_metrics", ResourceMetric, ResourceMetric.id),
            self._count_select("prompt_metrics", PromptMetric, PromptMetric.id),
            self._count_select("server_metrics", ServerMetric, ServerMetric.id),
            self._count_select("a2a_agent_metrics", A2AAgentMetric, A2AAgentMetric.id),
            self._count_select("token_usage_logs", TokenUsageLog, TokenUsageLog.id),
        )

        labels = (
            "tool_metrics",
            "resource_metrics",
            "prompt_metrics",
            "server_metrics",
            "a2a_agent_metrics",
            "token_usage_logs",
        )
        breakdown = {label: counts.get(label, 0) for label in labels}

        return {"total": sum(breakdown.values()), "breakdown": breakdown}

    def _get_security_stats(self, db: Session) -> Dict[str, Any]:
        """Get security and audit metrics.

        Pending approvals are restricted to rows whose status is ``"pending"``
        and SSO providers to rows that are enabled.

        Args:
            db: Database session

        Returns:
            Dictionary with total security event count and breakdown by type

        Examples:
            >>> service = SystemStatsService()
            >>> # stats = service._get_security_stats(db)
            >>> # assert stats["total"] >= 0
            >>> # assert "auth_events" in stats["breakdown"]
        """
        # Optimized from 4 queries to 1 using UNION ALL (separate tables need separate selects)
        counts = self._labelled_counts(
            db,
            self._count_select("auth_events", EmailAuthEvent, EmailAuthEvent.id),
            self._count_select("audit_logs", PermissionAuditLog, PermissionAuditLog.id),
            self._count_select("pending_approvals", PendingUserApproval, PendingUserApproval.id, PendingUserApproval.status == "pending"),
            self._count_select("sso_providers", SSOProvider, SSOProvider.id, SSOProvider.is_enabled.is_(True)),
        )

        auth_events = counts.get("auth_events", 0)
        audit_logs = counts.get("audit_logs", 0)
        pending_approvals = counts.get("pending_approvals", 0)
        sso_providers = counts.get("sso_providers", 0)

        # NOTE(review): sso_providers is reported in the breakdown but is not
        # added to the total, unlike the other categories where total equals
        # the sum of the breakdown. Presumably intentional (providers are
        # configuration rather than events) - confirm before changing.
        total = auth_events + audit_logs + pending_approvals

        return {
            "total": total,
            "breakdown": {
                "auth_events": auth_events,
                "audit_logs": audit_logs,
                "pending_approvals": pending_approvals,
                "sso_providers": sso_providers,
            },
        }

    def _get_workflow_stats(self, db: Session) -> Dict[str, Any]:
        """Get workflow state metrics.

        Counts active team invitations and join requests whose status is
        ``"pending"``.

        Args:
            db: Database session

        Returns:
            Dictionary with total workflow item count and breakdown by type

        Examples:
            >>> service = SystemStatsService()
            >>> # stats = service._get_workflow_stats(db)
            >>> # assert stats["total"] >= 0
            >>> # assert "team_invitations" in stats["breakdown"]
        """
        # Optimized from 2 queries to 1 using UNION ALL (separate tables need separate selects)
        counts = self._labelled_counts(
            db,
            self._count_select("invitations", EmailTeamInvitation, EmailTeamInvitation.id, EmailTeamInvitation.is_active.is_(True)),
            self._count_select("join_requests", EmailTeamJoinRequest, EmailTeamJoinRequest.id, EmailTeamJoinRequest.status == "pending"),
        )

        invitations = counts.get("invitations", 0)
        join_requests = counts.get("join_requests", 0)

        # The query label is "invitations" but the public breakdown key is "team_invitations".
        return {"total": invitations + join_requests, "breakdown": {"team_invitations": invitations, "join_requests": join_requests}}