Coverage for mcpgateway / utils / analyze_query_log.py: 100%

97 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Analyze database query logs for N+1 patterns and performance issues. 

3 

4Copyright 2025 

5SPDX-License-Identifier: Apache-2.0 

6 

7Usage: 

8 python -m mcpgateway.utils.analyze_query_log [--json logs/db-queries.jsonl] 

9 make query-log-analyze 

10""" 

11 

12# Standard 

13import argparse 

14from collections import Counter, defaultdict 

15from pathlib import Path 

16import sys 

17from typing import Any, Dict, List 

18 

19# Third-Party 

20import orjson 

21 

22 

def load_json_log(filepath: Path) -> List[Dict[str, Any]]:
    """Load a JSON Lines log file, skipping blank and malformed lines.

    Args:
        filepath: Path to the JSONL file

    Returns:
        List of log entries

    Examples:
        >>> import tempfile
        >>> from pathlib import Path
        >>> from mcpgateway.utils.analyze_query_log import load_json_log
        >>> with tempfile.TemporaryDirectory() as d:
        ...     p = Path(d) / "db-queries.jsonl"
        ...     _ = p.write_text('{\"query_count\": 2}\\nnot-json\\n{\"query_count\": 1}\\n', encoding="utf-8")
        ...     entries = load_json_log(p)
        ...     [e["query_count"] for e in entries]
        [2, 1]
    """
    parsed: List[Dict[str, Any]] = []
    with open(filepath, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            candidate = raw_line.strip()
            if not candidate:
                continue
            try:
                parsed.append(orjson.loads(candidate))
            except orjson.JSONDecodeError:
                # Tolerate partially-written or corrupt lines instead of aborting
                # the whole analysis run.
                pass
    return parsed

53 

54 

def analyze_logs(entries: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Analyze log entries for patterns and issues.

    Aggregates per-endpoint query statistics, flags requests that carried
    N+1 issues, and tallies the most frequent N+1 patterns.

    Args:
        entries: List of log entries

    Returns:
        Analysis results

    Examples:
        >>> from mcpgateway.utils.analyze_query_log import analyze_logs
        >>> analysis = analyze_logs(
        ...     [
        ...         {"method": "GET", "path": "/x", "query_count": 2, "total_query_ms": 5, "n1_issues": None},
        ...         {"method": "GET", "path": "/x", "query_count": 3, "total_query_ms": 7, "n1_issues": [{"pattern": "SELECT 1", "table": "t", "count": 2}]},
        ...     ]
        ... )
        >>> (analysis["total_requests"], analysis["total_queries"], analysis["requests_with_n1"])
        (2, 5, 1)
        >>> analysis["endpoint_stats"][0][0]
        'GET /x'
        >>> analysis["endpoint_stats"][0][1]["avg_queries"]
        2.5
        >>> analysis["top_n1_patterns"][0]
        ('t: SELECT 1', 2)
    """
    request_total = len(entries)
    query_total = sum(entry.get("query_count", 0) for entry in entries)

    # Requests carrying at least one detected N+1 issue.
    flagged = [entry for entry in entries if entry.get("n1_issues")]

    # Per-endpoint accumulators, keyed by "METHOD /path".
    per_endpoint: Dict[str, Dict[str, Any]] = defaultdict(
        lambda: {
            "count": 0,
            "total_queries": 0,
            "total_query_ms": 0,
            "n1_count": 0,
            "max_queries": 0,
        }
    )
    for entry in entries:
        bucket = per_endpoint[f"{entry.get('method', '?')} {entry.get('path', '?')}"]
        queries = entry.get("query_count", 0)
        bucket["count"] += 1
        bucket["total_queries"] += queries
        bucket["total_query_ms"] += entry.get("total_query_ms", 0)
        if entry.get("n1_issues"):
            bucket["n1_count"] += 1
        bucket["max_queries"] = max(bucket["max_queries"], queries)

    # Derive per-endpoint averages (count is always >= 1 for created buckets,
    # but keep the guard for safety).
    for bucket in per_endpoint.values():
        if bucket["count"] > 0:
            bucket["avg_queries"] = round(bucket["total_queries"] / bucket["count"], 1)
            bucket["avg_query_ms"] = round(bucket["total_query_ms"] / bucket["count"], 1)

    # Heaviest endpoints (by total queries) first.
    by_load = sorted(per_endpoint.items(), key=lambda item: item[1]["total_queries"], reverse=True)

    # Tally N+1 patterns across all requests, weighted by each issue's count.
    pattern_tally: Counter = Counter()
    for entry in entries:
        for issue in entry.get("n1_issues") or []:
            label = f"{issue.get('table', 'unknown')}: {issue.get('pattern', '')[:100]}"
            pattern_tally[label] += issue.get("count", 1)

    return {
        "total_requests": request_total,
        "total_queries": query_total,
        "avg_queries_per_request": round(query_total / request_total, 1) if request_total else 0,
        "requests_with_n1": len(flagged),
        "n1_percentage": round(len(flagged) / request_total * 100, 1) if request_total else 0,
        "endpoint_stats": by_load,
        "top_n1_patterns": pattern_tally.most_common(10),
    }

134 

135 

def print_report(analysis: Dict[str, Any]) -> None:
    """Print analysis report to stdout.

    Endpoint stats produced by analyze_logs() carry ``avg_queries`` whenever
    the bucket has at least one request, but hand-built or degraded stats
    dicts may lack it; missing averages are treated as 0 rather than raising
    KeyError.

    Args:
        analysis: Analysis results from analyze_logs()

    Examples:
        >>> import io
        >>> from contextlib import redirect_stdout
        >>> from mcpgateway.utils.analyze_query_log import print_report
        >>> buf = io.StringIO()
        >>> with redirect_stdout(buf):
        ...     print_report(
        ...         {
        ...             "total_requests": 0,
        ...             "total_queries": 0,
        ...             "avg_queries_per_request": 0,
        ...             "requests_with_n1": 0,
        ...             "n1_percentage": 0,
        ...             "endpoint_stats": [],
        ...             "top_n1_patterns": [],
        ...         }
        ...     )
        >>> "DATABASE QUERY LOG ANALYSIS" in buf.getvalue()
        True
    """
    print("\n" + "=" * 80)
    print("DATABASE QUERY LOG ANALYSIS")
    print("=" * 80)

    print("\n📊 SUMMARY")
    print(f" Total requests analyzed: {analysis['total_requests']}")
    print(f" Total queries executed: {analysis['total_queries']}")
    print(f" Avg queries per request: {analysis['avg_queries_per_request']}")
    print(f" Requests with N+1: {analysis['requests_with_n1']} ({analysis['n1_percentage']}%)")

    if analysis["requests_with_n1"] > 0:
        print("\n⚠️ N+1 ISSUES DETECTED")
        print(f" {analysis['requests_with_n1']} requests have potential N+1 query patterns")

    if analysis["top_n1_patterns"]:
        print("\n🔴 TOP N+1 PATTERNS")
        for pattern, count in analysis["top_n1_patterns"]:
            print(f" {count:4}x {pattern[:70]}...")

    print("\n📈 ENDPOINTS BY QUERY COUNT (top 15)")
    print(f" {'Endpoint':<40} {'Reqs':>6} {'Queries':>8} {'Avg':>6} {'Max':>5} {'N+1':>4}")
    print(" " + "-" * 75)

    for endpoint, stats in analysis["endpoint_stats"][:15]:
        n1_marker = "⚠️" if stats["n1_count"] > 0 else " "
        # .get(): avg_queries is only present when analyze_logs() computed it.
        print(f" {endpoint:<40} {stats['count']:>6} {stats['total_queries']:>8} " f"{stats.get('avg_queries', 0):>6} {stats['max_queries']:>5} {n1_marker}{stats['n1_count']:>2}")

    # Recommendations
    print("\n💡 RECOMMENDATIONS")

    # Guard with .get() so a stats dict lacking avg_queries cannot raise KeyError.
    high_query_endpoints = [(ep, s) for ep, s in analysis["endpoint_stats"] if s.get("avg_queries", 0) > 10]
    if high_query_endpoints:
        print(f"{len(high_query_endpoints)} endpoints average >10 queries - consider eager loading")
        for ep, stats in high_query_endpoints[:3]:
            # Direct access is safe here: the > 10 filter guarantees the key exists.
            print(f" - {ep} (avg: {stats['avg_queries']} queries)")

    if analysis["requests_with_n1"] > 0:
        print(" • Review N+1 patterns above and add joinedload/selectinload")
        print(" • See: docs/docs/development/db-performance.md")

    print("\n" + "=" * 80 + "\n")

203 

204 

def main() -> int:
    """Main entry point for the analysis script.

    Returns:
        Exit code (0 for success, 1 for error)

    Examples:
        Missing logs return a non-zero exit code:

        >>> import io
        >>> import tempfile
        >>> from contextlib import redirect_stdout
        >>> from pathlib import Path
        >>> from unittest.mock import patch
        >>> from mcpgateway.utils.analyze_query_log import main
        >>> with tempfile.TemporaryDirectory() as d:
        ...     missing = str(Path(d) / "missing.jsonl")
        ...     buf = io.StringIO()
        ...     with patch("sys.argv", ["prog", "--json", missing]), redirect_stdout(buf):
        ...         code = main()
        ...     code
        1
    """
    arg_parser = argparse.ArgumentParser(description="Analyze database query logs for N+1 patterns")
    arg_parser.add_argument("--json", default="logs/db-queries.jsonl", help="Path to JSON Lines log file (default: logs/db-queries.jsonl)")
    options = arg_parser.parse_args()

    target = Path(options.json)

    # Guard clauses: bail out early on missing or empty log files.
    if not target.exists():
        print(f"❌ Log file not found: {target}")
        print(" Start the server with 'make dev-query-log' to generate logs")
        return 1
    if target.stat().st_size == 0:
        print(f"❌ Log file is empty: {target}")
        print(" Make some API requests to generate query logs")
        return 1

    print(f"📊 Loading {target}...")
    records = load_json_log(target)
    if not records:
        print(f"❌ No valid entries found in {target}")
        return 1
    print(f" Loaded {len(records)} request entries")

    report = analyze_logs(records)
    print_report(report)

    # Non-zero exit when N+1 issues were detected (useful as a CI gate).
    return 1 if report["requests_with_n1"] > 0 else 0

258 

259 

# Script entry point: propagate main()'s return code as the process exit status.
if __name__ == "__main__":
    sys.exit(main())