Coverage for mcpgateway/utils/analyze_query_log.py: 100%
97 statements
coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
# -*- coding: utf-8 -*-
"""Analyze database query logs for N+1 patterns and performance issues.

Copyright 2025
SPDX-License-Identifier: Apache-2.0

Usage:
    python -m mcpgateway.utils.analyze_query_log [--json logs/db-queries.jsonl]
    make query-log-analyze
"""

# Standard
import argparse
from collections import Counter, defaultdict
from pathlib import Path
import sys
from typing import Any, Dict, List

# Third-Party
import orjson
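
# Illustrative shape of a single JSONL entry, inferred from the fields this
# module reads (method, path, query_count, total_query_ms, n1_issues); the
# exact schema is determined by the middleware that writes the log, and any
# missing field is defaulted when read:
#
#   {"method": "GET", "path": "/tools", "query_count": 12, "total_query_ms": 34.5,
#    "n1_issues": [{"pattern": "SELECT ...", "table": "tools", "count": 11}]}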


def load_json_log(filepath: Path) -> List[Dict[str, Any]]:
    """Load JSON Lines log file.

    Args:
        filepath: Path to the JSONL file

    Returns:
        List of log entries

    Examples:
        >>> import tempfile
        >>> from pathlib import Path
        >>> from mcpgateway.utils.analyze_query_log import load_json_log
        >>> with tempfile.TemporaryDirectory() as d:
        ...     p = Path(d) / "db-queries.jsonl"
        ...     _ = p.write_text('{\"query_count\": 2}\\nnot-json\\n{\"query_count\": 1}\\n', encoding="utf-8")
        ...     entries = load_json_log(p)
        ...     [e["query_count"] for e in entries]
        [2, 1]
    """
    entries = []
    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                # Tolerate malformed lines: skip anything that is not valid JSON
                try:
                    entries.append(orjson.loads(line))
                except orjson.JSONDecodeError:
                    continue
    return entries


def analyze_logs(entries: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Analyze log entries for patterns and issues.

    Args:
        entries: List of log entries

    Returns:
        Analysis results

    Examples:
        >>> from mcpgateway.utils.analyze_query_log import analyze_logs
        >>> analysis = analyze_logs(
        ...     [
        ...         {"method": "GET", "path": "/x", "query_count": 2, "total_query_ms": 5, "n1_issues": None},
        ...         {"method": "GET", "path": "/x", "query_count": 3, "total_query_ms": 7, "n1_issues": [{"pattern": "SELECT 1", "table": "t", "count": 2}]},
        ...     ]
        ... )
        >>> (analysis["total_requests"], analysis["total_queries"], analysis["requests_with_n1"])
        (2, 5, 1)
        >>> analysis["endpoint_stats"][0][0]
        'GET /x'
        >>> analysis["endpoint_stats"][0][1]["avg_queries"]
        2.5
        >>> analysis["top_n1_patterns"][0]
        ('t: SELECT 1', 2)
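
        An empty log exercises the zero-division guards:

        >>> analyze_logs([])["avg_queries_per_request"]
        0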
80 """
81 total_requests = len(entries)
82 total_queries = sum(e.get("query_count", 0) for e in entries)
84 # Find requests with N+1 issues
85 n1_requests = [e for e in entries if e.get("n1_issues")]
87 # Group by endpoint
88 endpoint_stats: Dict[str, Dict[str, Any]] = defaultdict(
89 lambda: {
90 "count": 0,
91 "total_queries": 0,
92 "total_query_ms": 0,
93 "n1_count": 0,
94 "max_queries": 0,
95 }
96 )
98 for e in entries:
99 key = f"{e.get('method', '?')} {e.get('path', '?')}"
100 stats = endpoint_stats[key]
101 stats["count"] += 1
102 stats["total_queries"] += e.get("query_count", 0)
103 stats["total_query_ms"] += e.get("total_query_ms", 0)
104 if e.get("n1_issues"):
105 stats["n1_count"] += 1
106 stats["max_queries"] = max(stats["max_queries"], e.get("query_count", 0))
108 # Calculate averages
109 for stats in endpoint_stats.values():
110 if stats["count"] > 0:
111 stats["avg_queries"] = round(stats["total_queries"] / stats["count"], 1)
112 stats["avg_query_ms"] = round(stats["total_query_ms"] / stats["count"], 1)
114 # Sort by total queries (most queries first)
115 sorted_endpoints = sorted(endpoint_stats.items(), key=lambda x: x[1]["total_queries"], reverse=True)
117 # Find most common N+1 patterns
118 n1_patterns: Counter = Counter()
119 for e in entries:
120 for issue in e.get("n1_issues") or []:
121 pattern = issue.get("pattern", "")[:100]
122 table = issue.get("table", "unknown")
123 n1_patterns[f"{table}: {pattern}"] += issue.get("count", 1)
125 return {
126 "total_requests": total_requests,
127 "total_queries": total_queries,
128 "avg_queries_per_request": round(total_queries / total_requests, 1) if total_requests else 0,
129 "requests_with_n1": len(n1_requests),
130 "n1_percentage": round(len(n1_requests) / total_requests * 100, 1) if total_requests else 0,
131 "endpoint_stats": sorted_endpoints,
132 "top_n1_patterns": n1_patterns.most_common(10),
133 }


def print_report(analysis: Dict[str, Any]) -> None:
    """Print analysis report to stdout.

    Args:
        analysis: Analysis results from analyze_logs()

    Examples:
        >>> import io
        >>> from contextlib import redirect_stdout
        >>> from mcpgateway.utils.analyze_query_log import print_report
        >>> buf = io.StringIO()
        >>> with redirect_stdout(buf):
        ...     print_report(
        ...         {
        ...             "total_requests": 0,
        ...             "total_queries": 0,
        ...             "avg_queries_per_request": 0,
        ...             "requests_with_n1": 0,
        ...             "n1_percentage": 0,
        ...             "endpoint_stats": [],
        ...             "top_n1_patterns": [],
        ...         }
        ...     )
        >>> "DATABASE QUERY LOG ANALYSIS" in buf.getvalue()
        True
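
        When N+1 issues are present, the report flags them (the stats dict below
        carries the keys the report reads; values are illustrative):

        >>> buf = io.StringIO()
        >>> with redirect_stdout(buf):
        ...     print_report(
        ...         {
        ...             "total_requests": 1,
        ...             "total_queries": 3,
        ...             "avg_queries_per_request": 3.0,
        ...             "requests_with_n1": 1,
        ...             "n1_percentage": 100.0,
        ...             "endpoint_stats": [("GET /x", {"count": 1, "total_queries": 3, "avg_queries": 3.0, "max_queries": 3, "n1_count": 1})],
        ...             "top_n1_patterns": [("t: SELECT 1", 2)],
        ...         }
        ...     )
        >>> "N+1 ISSUES DETECTED" in buf.getvalue()
        True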
161 """
162 print("\n" + "=" * 80)
163 print("DATABASE QUERY LOG ANALYSIS")
164 print("=" * 80)
166 print("\n📊 SUMMARY")
167 print(f" Total requests analyzed: {analysis['total_requests']}")
168 print(f" Total queries executed: {analysis['total_queries']}")
169 print(f" Avg queries per request: {analysis['avg_queries_per_request']}")
170 print(f" Requests with N+1: {analysis['requests_with_n1']} ({analysis['n1_percentage']}%)")
172 if analysis["requests_with_n1"] > 0:
173 print("\n⚠️ N+1 ISSUES DETECTED")
174 print(f" {analysis['requests_with_n1']} requests have potential N+1 query patterns")
176 if analysis["top_n1_patterns"]:
177 print("\n🔴 TOP N+1 PATTERNS")
178 for pattern, count in analysis["top_n1_patterns"]:
179 print(f" {count:4}x {pattern[:70]}...")
181 print("\n📈 ENDPOINTS BY QUERY COUNT (top 15)")
182 print(f" {'Endpoint':<40} {'Reqs':>6} {'Queries':>8} {'Avg':>6} {'Max':>5} {'N+1':>4}")
183 print(" " + "-" * 75)
185 for endpoint, stats in analysis["endpoint_stats"][:15]:
186 n1_marker = "⚠️" if stats["n1_count"] > 0 else " "
187 print(f" {endpoint:<40} {stats['count']:>6} {stats['total_queries']:>8} " f"{stats['avg_queries']:>6} {stats['max_queries']:>5} {n1_marker}{stats['n1_count']:>2}")
189 # Recommendations
190 print("\n💡 RECOMMENDATIONS")
192 high_query_endpoints = [(ep, s) for ep, s in analysis["endpoint_stats"] if s["avg_queries"] > 10]
193 if high_query_endpoints:
194 print(f" • {len(high_query_endpoints)} endpoints average >10 queries - consider eager loading")
195 for ep, stats in high_query_endpoints[:3]:
196 print(f" - {ep} (avg: {stats['avg_queries']} queries)")
198 if analysis["requests_with_n1"] > 0:
199 print(" • Review N+1 patterns above and add joinedload/selectinload")
200 print(" • See: docs/docs/development/db-performance.md")
202 print("\n" + "=" * 80 + "\n")


def main() -> int:
    """Main entry point for the analysis script.

    Returns:
        Exit code (0 for success, 1 for error)

    Examples:
        Missing logs return a non-zero exit code:

        >>> import io
        >>> import tempfile
        >>> from contextlib import redirect_stdout
        >>> from pathlib import Path
        >>> from unittest.mock import patch
        >>> from mcpgateway.utils.analyze_query_log import main
        >>> with tempfile.TemporaryDirectory() as d:
        ...     missing = str(Path(d) / "missing.jsonl")
        ...     buf = io.StringIO()
        ...     with patch("sys.argv", ["prog", "--json", missing]), redirect_stdout(buf):
        ...         code = main()
        ...     code
        1
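
        An empty log file also fails fast:

        >>> with tempfile.TemporaryDirectory() as d:
        ...     empty = Path(d) / "empty.jsonl"
        ...     _ = empty.write_text("", encoding="utf-8")
        ...     buf = io.StringIO()
        ...     with patch("sys.argv", ["prog", "--json", str(empty)]), redirect_stdout(buf):
        ...         code = main()
        ...     code
        1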
227 """
228 parser = argparse.ArgumentParser(description="Analyze database query logs for N+1 patterns")
229 parser.add_argument("--json", default="logs/db-queries.jsonl", help="Path to JSON Lines log file (default: logs/db-queries.jsonl)")
230 args = parser.parse_args()
232 log_path = Path(args.json)
234 if not log_path.exists():
235 print(f"❌ Log file not found: {log_path}")
236 print(" Start the server with 'make dev-query-log' to generate logs")
237 return 1
239 if log_path.stat().st_size == 0:
240 print(f"❌ Log file is empty: {log_path}")
241 print(" Make some API requests to generate query logs")
242 return 1
244 print(f"📊 Loading {log_path}...")
245 entries = load_json_log(log_path)
247 if not entries:
248 print(f"❌ No valid entries found in {log_path}")
249 return 1
251 print(f" Loaded {len(entries)} request entries")
253 analysis = analyze_logs(entries)
254 print_report(analysis)
256 # Return non-zero if N+1 issues found (useful for CI)
257 return 1 if analysis["requests_with_n1"] > 0 else 0


if __name__ == "__main__":
    sys.exit(main())
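
# Example invocations, per the module docstring:
#
#   python -m mcpgateway.utils.analyze_query_log --json logs/db-queries.jsonl
#   make query-log-analyze
#
# Because main() returns 1 when N+1 patterns are detected, the command can
# double as a CI gate, e.g.:
#
#   python -m mcpgateway.utils.analyze_query_log && echo "no N+1 issues"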