Coverage for mcpgateway / services / system_stats_service.py: 100%
100 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/system_stats_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7System Metrics Service Implementation.
8This module provides comprehensive system metrics for monitoring deployment scale
9and resource utilization across all entity types in the MCP Gateway.
11It includes:
12- User and team counts (users, teams, memberships)
13- MCP resource counts (servers, tools, resources, prompts, A2A agents, gateways)
14- API token counts (active, revoked, total)
15- Session and activity metrics
16- Comprehensive metrics and analytics counts
17- Security and audit log counts
18- Workflow state tracking
20Examples:
21 >>> from mcpgateway.services.system_stats_service import SystemStatsService
22 >>> service = SystemStatsService()
23 >>> # Get all metrics (requires database session)
24 >>> # stats = service.get_comprehensive_stats(db)
25 >>> # stats["users"]["total"] # Total user count
26 >>> # stats["mcp_resources"]["breakdown"]["tools"] # Tool count
27"""
29# Standard
30import logging
31from typing import Any, Dict
33# Third-Party
34from sqlalchemy import case, func, literal, select
35from sqlalchemy.orm import Session
37# First-Party
38from mcpgateway.db import (
39 A2AAgent,
40 A2AAgentMetric,
41 EmailApiToken,
42 EmailAuthEvent,
43 EmailTeam,
44 EmailTeamInvitation,
45 EmailTeamJoinRequest,
46 EmailTeamMember,
47 EmailUser,
48 Gateway,
49 OAuthToken,
50 PendingUserApproval,
51 PermissionAuditLog,
52 Prompt,
53 PromptMetric,
54 Resource,
55 ResourceMetric,
56 ResourceSubscription,
57 Server,
58 ServerMetric,
59 SessionMessageRecord,
60 SessionRecord,
61 SSOProvider,
62 TokenRevocation,
63 TokenUsageLog,
64 Tool,
65 ToolMetric,
66)
68logger = logging.getLogger(__name__)
70# Cache import (lazy to avoid circular dependencies)
71_ADMIN_STATS_CACHE = None
74def _get_admin_stats_cache():
75 """Get admin stats cache singleton lazily.
77 Returns:
78 AdminStatsCache instance.
79 """
80 global _ADMIN_STATS_CACHE # pylint: disable=global-statement
81 if _ADMIN_STATS_CACHE is None:
82 # First-Party
83 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel
85 _ADMIN_STATS_CACHE = admin_stats_cache
86 return _ADMIN_STATS_CACHE
89# pylint: disable=not-callable
90# SQLAlchemy's func.count() is callable at runtime but pylint cannot detect this
91class SystemStatsService:
92 """Service for retrieving comprehensive system metrics.
94 This service provides read-only access to system-wide metrics across
95 all entity types, providing administrators with at-a-glance visibility
96 into deployment scale and resource utilization.
98 Examples:
99 >>> service = SystemStatsService()
100 >>> # With database session
101 >>> # stats = service.get_comprehensive_stats(db)
102 >>> # print(f"Total users: {stats['users']['total']}")
103 >>> # print(f"Total tools: {stats['mcp_resources']['breakdown']['tools']}")
104 """
106 def get_comprehensive_stats(self, db: Session) -> Dict[str, Any]:
107 """Get comprehensive system metrics across all categories.
109 Args:
110 db: Database session for querying metrics
112 Returns:
113 Dictionary containing categorized metrics with totals and breakdowns
115 Raises:
116 Exception: If database queries fail or metrics collection encounters errors
118 Examples:
119 >>> service = SystemStatsService()
120 >>> # stats = service.get_comprehensive_stats(db)
121 >>> # assert "users" in stats
122 >>> # assert "mcp_resources" in stats
123 >>> # assert "total" in stats["users"]
124 >>> # assert "breakdown" in stats["users"]
125 """
126 logger.info("Collecting comprehensive system metrics")
128 try:
129 stats = {
130 "users": self._get_user_stats(db),
131 "teams": self._get_team_stats(db),
132 "mcp_resources": self._get_mcp_resource_stats(db),
133 "tokens": self._get_token_stats(db),
134 "sessions": self._get_session_stats(db),
135 "metrics": self._get_metrics_stats(db),
136 "security": self._get_security_stats(db),
137 "workflow": self._get_workflow_stats(db),
138 }
140 logger.info("Successfully collected system metrics")
141 return stats
143 except Exception as e:
144 logger.error(f"Error collecting system metrics: {str(e)}")
145 raise
147 async def get_comprehensive_stats_cached(self, db: Session) -> Dict[str, Any]:
148 """Get comprehensive system metrics with caching.
150 This is the async-friendly version that uses the admin stats cache.
151 Call this from async endpoints for optimal performance.
153 Args:
154 db: Database session for querying metrics
156 Returns:
157 Dictionary containing categorized metrics with totals and breakdowns
159 Examples:
160 >>> service = SystemStatsService()
161 >>> # import asyncio
162 >>> # stats = asyncio.run(service.get_comprehensive_stats_cached(db))
163 """
164 cache = _get_admin_stats_cache()
165 cached = await cache.get_system_stats()
166 if cached is not None:
167 return cached
169 # Cache miss - compute and cache
170 stats = self.get_comprehensive_stats(db)
171 await cache.set_system_stats(stats)
172 return stats
174 def _get_user_stats(self, db: Session) -> Dict[str, Any]:
175 """Get user-related metrics.
177 Args:
178 db: Database session
180 Returns:
181 Dictionary with total user count and breakdown by status
183 Examples:
184 >>> service = SystemStatsService()
185 >>> # stats = service._get_user_stats(db)
186 >>> # assert stats["total"] >= 0
187 >>> # assert "breakdown" in stats
188 >>> # assert "active" in stats["breakdown"]
189 """
190 # Optimized from 3 queries to 1 using aggregated SELECT
191 result = db.execute(
192 select(
193 func.count(EmailUser.email).label("total"),
194 func.sum(case((EmailUser.is_active.is_(True), 1), else_=0)).label("active"),
195 func.sum(case((EmailUser.is_admin.is_(True), 1), else_=0)).label("admins"),
196 )
197 ).one()
199 total = result.total or 0
200 active = result.active or 0
201 admins = result.admins or 0
203 return {"total": total, "breakdown": {"active": active, "inactive": total - active, "admins": admins}}
205 def _get_team_stats(self, db: Session) -> Dict[str, Any]:
206 """Get team-related metrics.
208 Args:
209 db: Database session
211 Returns:
212 Dictionary with total team count and breakdown by type
214 Examples:
215 >>> service = SystemStatsService()
216 >>> # stats = service._get_team_stats(db)
217 >>> # assert stats["total"] >= 0
218 >>> # assert "personal" in stats["breakdown"]
219 >>> # assert "organizational" in stats["breakdown"]
220 """
221 # Optimized from 3 queries to 2 using aggregated SELECT (separate tables need separate queries)
222 team_result = db.execute(
223 select(
224 func.count(EmailTeam.id).label("total_teams"),
225 func.sum(case((EmailTeam.is_personal.is_(True), 1), else_=0)).label("personal_teams"),
226 ).select_from(EmailTeam)
227 ).one()
228 team_members = db.execute(select(func.count(EmailTeamMember.id))).scalar() or 0
230 total_teams = team_result.total_teams or 0
231 personal_teams = team_result.personal_teams or 0
233 return {"total": total_teams, "breakdown": {"personal": personal_teams, "organizational": total_teams - personal_teams, "members": team_members}}
235 def _get_mcp_resource_stats(self, db: Session) -> Dict[str, Any]:
236 """Get MCP resource metrics in a SINGLE query using UNION ALL.
238 Optimized from 6 queries to 1.
240 Args:
241 db: Database session
243 Returns:
244 Dictionary with total MCP resource count and breakdown by type
245 """
246 # Create a single query that combines counts from all tables with consistent column labels
247 stmt = (
248 select(literal("servers").label("type"), func.count(Server.id).label("cnt"))
249 .select_from(Server)
250 .union_all(
251 select(literal("gateways").label("type"), func.count(Gateway.id).label("cnt")).select_from(Gateway),
252 select(literal("tools").label("type"), func.count(Tool.id).label("cnt")).select_from(Tool),
253 select(literal("resources").label("type"), func.count(Resource.uri).label("cnt")).select_from(Resource),
254 select(literal("prompts").label("type"), func.count(Prompt.name).label("cnt")).select_from(Prompt),
255 select(literal("a2a_agents").label("type"), func.count(A2AAgent.id).label("cnt")).select_from(A2AAgent),
256 )
257 )
259 # Execute once - this is now a single database query instead of 6 separate queries
260 results = db.execute(stmt).all()
262 # Convert list of rows to a dictionary
263 counts = {row.type: row.cnt for row in results}
265 # Safe lookups (defaults to 0 if table is empty)
266 servers = counts.get("servers", 0)
267 gateways = counts.get("gateways", 0)
268 tools = counts.get("tools", 0)
269 resources = counts.get("resources", 0)
270 prompts = counts.get("prompts", 0)
271 agents = counts.get("a2a_agents", 0)
273 total = servers + gateways + tools + resources + prompts + agents
275 return {"total": total, "breakdown": {"servers": servers, "gateways": gateways, "tools": tools, "resources": resources, "prompts": prompts, "a2a_agents": agents}}
277 def _get_token_stats(self, db: Session) -> Dict[str, Any]:
278 """Get API token metrics.
280 Args:
281 db: Database session
283 Returns:
284 Dictionary with total token count and breakdown by status
286 Examples:
287 >>> service = SystemStatsService()
288 >>> # stats = service._get_token_stats(db)
289 >>> # assert stats["total"] >= 0
290 >>> # assert "active" in stats["breakdown"]
291 """
292 # Optimized from 3 queries to 2 using aggregated SELECT (separate tables need separate queries)
293 token_result = db.execute(
294 select(
295 func.count(EmailApiToken.id).label("total"),
296 func.sum(case((EmailApiToken.is_active.is_(True), 1), else_=0)).label("active"),
297 ).select_from(EmailApiToken)
298 ).one()
299 revoked = db.execute(select(func.count(TokenRevocation.jti))).scalar() or 0
301 total = token_result.total or 0
302 active = token_result.active or 0
304 return {"total": total, "breakdown": {"active": active, "inactive": total - active, "revoked": revoked}}
306 def _get_session_stats(self, db: Session) -> Dict[str, Any]:
307 """Get session and activity metrics.
309 Args:
310 db: Database session
312 Returns:
313 Dictionary with total session count and breakdown by type
315 Examples:
316 >>> service = SystemStatsService()
317 >>> # stats = service._get_session_stats(db)
318 >>> # assert stats["total"] >= 0
319 >>> # assert "mcp_sessions" in stats["breakdown"]
320 """
321 # Optimized from 4 queries to 1 using UNION ALL (separate tables need separate selects)
322 stmt = (
323 select(literal("mcp_sessions").label("type"), func.count(SessionRecord.session_id).label("cnt"))
324 .select_from(SessionRecord)
325 .union_all(
326 select(literal("mcp_messages").label("type"), func.count(SessionMessageRecord.id).label("cnt")).select_from(SessionMessageRecord),
327 select(literal("subscriptions").label("type"), func.count(ResourceSubscription.id).label("cnt")).select_from(ResourceSubscription),
328 select(literal("oauth_tokens").label("type"), func.count(OAuthToken.access_token).label("cnt")).select_from(OAuthToken),
329 )
330 )
331 results = db.execute(stmt).all()
332 counts = {row.type: row.cnt for row in results}
334 mcp_sessions = counts.get("mcp_sessions", 0)
335 mcp_messages = counts.get("mcp_messages", 0)
336 subscriptions = counts.get("subscriptions", 0)
337 oauth_tokens = counts.get("oauth_tokens", 0)
338 total = mcp_sessions + mcp_messages + subscriptions + oauth_tokens
340 return {"total": total, "breakdown": {"mcp_sessions": mcp_sessions, "mcp_messages": mcp_messages, "subscriptions": subscriptions, "oauth_tokens": oauth_tokens}}
342 def _get_metrics_stats(self, db: Session) -> Dict[str, Any]:
343 """Get metrics and analytics counts.
345 Args:
346 db: Database session
348 Returns:
349 Dictionary with total metrics count and breakdown by type
351 Examples:
352 >>> service = SystemStatsService()
353 >>> # stats = service._get_metrics_stats(db)
354 >>> # assert stats["total"] >= 0
355 >>> # assert "tool_metrics" in stats["breakdown"]
356 """
357 # Optimized from 6 queries to 1 using UNION ALL (separate tables need separate selects)
358 stmt = (
359 select(literal("tool_metrics").label("type"), func.count(ToolMetric.id).label("cnt"))
360 .select_from(ToolMetric)
361 .union_all(
362 select(literal("resource_metrics").label("type"), func.count(ResourceMetric.id).label("cnt")).select_from(ResourceMetric),
363 select(literal("prompt_metrics").label("type"), func.count(PromptMetric.id).label("cnt")).select_from(PromptMetric),
364 select(literal("server_metrics").label("type"), func.count(ServerMetric.id).label("cnt")).select_from(ServerMetric),
365 select(literal("a2a_agent_metrics").label("type"), func.count(A2AAgentMetric.id).label("cnt")).select_from(A2AAgentMetric),
366 select(literal("token_usage_logs").label("type"), func.count(TokenUsageLog.id).label("cnt")).select_from(TokenUsageLog),
367 )
368 )
369 results = db.execute(stmt).all()
370 counts = {row.type: row.cnt for row in results}
372 tool_metrics = counts.get("tool_metrics", 0)
373 resource_metrics = counts.get("resource_metrics", 0)
374 prompt_metrics = counts.get("prompt_metrics", 0)
375 server_metrics = counts.get("server_metrics", 0)
376 a2a_agent_metrics = counts.get("a2a_agent_metrics", 0)
377 token_usage_logs = counts.get("token_usage_logs", 0)
378 total = tool_metrics + resource_metrics + prompt_metrics + server_metrics + a2a_agent_metrics + token_usage_logs
380 return {
381 "total": total,
382 "breakdown": {
383 "tool_metrics": tool_metrics,
384 "resource_metrics": resource_metrics,
385 "prompt_metrics": prompt_metrics,
386 "server_metrics": server_metrics,
387 "a2a_agent_metrics": a2a_agent_metrics,
388 "token_usage_logs": token_usage_logs,
389 },
390 }
392 def _get_security_stats(self, db: Session) -> Dict[str, Any]:
393 """Get security and audit metrics.
395 Args:
396 db: Database session
398 Returns:
399 Dictionary with total security event count and breakdown by type
401 Examples:
402 >>> service = SystemStatsService()
403 >>> # stats = service._get_security_stats(db)
404 >>> # assert stats["total"] >= 0
405 >>> # assert "auth_events" in stats["breakdown"]
406 """
407 # Optimized from 4 queries to 1 using UNION ALL (separate tables need separate selects)
408 stmt = (
409 select(literal("auth_events").label("type"), func.count(EmailAuthEvent.id).label("cnt"))
410 .select_from(EmailAuthEvent)
411 .union_all(
412 select(literal("audit_logs").label("type"), func.count(PermissionAuditLog.id).label("cnt")).select_from(PermissionAuditLog),
413 select(literal("pending_approvals").label("type"), func.count(PendingUserApproval.id).label("cnt")).select_from(PendingUserApproval).where(PendingUserApproval.status == "pending"),
414 select(literal("sso_providers").label("type"), func.count(SSOProvider.id).label("cnt")).select_from(SSOProvider).where(SSOProvider.is_enabled.is_(True)),
415 )
416 )
417 results = db.execute(stmt).all()
418 counts = {row.type: row.cnt for row in results}
420 auth_events = counts.get("auth_events", 0)
421 audit_logs = counts.get("audit_logs", 0)
422 pending_approvals = counts.get("pending_approvals", 0)
423 sso_providers = counts.get("sso_providers", 0)
424 total = auth_events + audit_logs + pending_approvals
426 return {"total": total, "breakdown": {"auth_events": auth_events, "audit_logs": audit_logs, "pending_approvals": pending_approvals, "sso_providers": sso_providers}}
428 def _get_workflow_stats(self, db: Session) -> Dict[str, Any]:
429 """Get workflow state metrics.
431 Args:
432 db: Database session
434 Returns:
435 Dictionary with total workflow item count and breakdown by type
437 Examples:
438 >>> service = SystemStatsService()
439 >>> # stats = service._get_workflow_stats(db)
440 >>> # assert stats["total"] >= 0
441 >>> # assert "team_invitations" in stats["breakdown"]
442 """
443 # Optimized from 2 queries to 1 using UNION ALL (separate tables need separate selects)
444 stmt = (
445 select(literal("invitations").label("type"), func.count(EmailTeamInvitation.id).label("cnt"))
446 .select_from(EmailTeamInvitation)
447 .where(EmailTeamInvitation.is_active.is_(True))
448 .union_all(
449 select(literal("join_requests").label("type"), func.count(EmailTeamJoinRequest.id).label("cnt")).select_from(EmailTeamJoinRequest).where(EmailTeamJoinRequest.status == "pending"),
450 )
451 )
452 results = db.execute(stmt).all()
453 counts = {row.type: row.cnt for row in results}
455 invitations = counts.get("invitations", 0)
456 join_requests = counts.get("join_requests", 0)
457 total = invitations + join_requests
459 return {"total": total, "breakdown": {"team_invitations": invitations, "join_requests": join_requests}}