Coverage for mcpgateway / utils / psycopg3_optimizations.py: 100%

100 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/utils/psycopg3_optimizations.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5 

6psycopg3-specific optimizations for database operations. 

7 

8This module provides optimized database operations leveraging psycopg3's 

9advanced features: 

10- COPY protocol for bulk inserts (5-10x faster than INSERT) 

11- Pipeline mode for batch queries (reduced round-trips) 

12- Prepared statement hints 

13 

14These optimizations are PostgreSQL-specific and gracefully fall back to 

15standard SQLAlchemy operations for other databases. 

16 

17Examples: 

18 >>> from mcpgateway.utils.psycopg3_optimizations import is_psycopg3_backend 

19 >>> isinstance(is_psycopg3_backend(), bool) 

20 True 

21""" 

22 

23# Standard 

24from datetime import datetime 

25import io 

26import logging 

27from typing import Any, Iterable, List, Optional, Sequence, Tuple, TypeVar 

28 

29# Third-Party 

30from sqlalchemy import text 

31from sqlalchemy.orm import Session 

32 

33logger = logging.getLogger(__name__) 

34 

# Cached backend-detection result; None means "not determined yet"
_is_psycopg3: Optional[bool] = None


def is_psycopg3_backend() -> bool:
    """Report whether the active database backend is PostgreSQL via psycopg3.

    The answer is computed once and memoized in the module-level
    ``_is_psycopg3`` cache, so repeated calls are cheap.

    Returns:
        True if using PostgreSQL with the psycopg (v3) driver, False otherwise.

    Examples:
        >>> isinstance(is_psycopg3_backend(), bool)
        True
    """
    global _is_psycopg3
    if _is_psycopg3 is not None:
        return _is_psycopg3
    try:
        # First-Party
        from mcpgateway.db import backend, driver
    except ImportError:
        # mcpgateway.db unavailable: treat as "not psycopg3"
        _is_psycopg3 = False
    else:
        _is_psycopg3 = backend == "postgresql" and driver in ("psycopg", "default", "")
    return _is_psycopg3

59 

60 

61def _format_value_for_copy(value: Any) -> str: 

62 """Format a Python value for PostgreSQL COPY TEXT format. 

63 

64 Args: 

65 value: The value to format. 

66 

67 Returns: 

68 String representation suitable for COPY TEXT format. 

69 """ 

70 if value is None: 

71 return "\\N" # NULL representation in COPY 

72 if isinstance(value, bool): 

73 return "t" if value else "f" 

74 if isinstance(value, datetime): 

75 return value.isoformat() 

76 if isinstance(value, str): 

77 # Escape special characters for COPY TEXT format 

78 return value.replace("\\", "\\\\").replace("\t", "\\t").replace("\n", "\\n").replace("\r", "\\r") 

79 return str(value) 

80 

81 

def bulk_insert_with_copy(
    db: Session,
    table_name: str,
    columns: Sequence[str],
    rows: Iterable[Sequence[Any]],
    schema: Optional[str] = None,
) -> int:
    """Bulk insert rows using PostgreSQL COPY protocol.

    This is significantly faster than individual INSERT statements or even
    bulk_insert_mappings for large datasets. The COPY protocol streams data
    directly to PostgreSQL with minimal overhead.

    Args:
        db: SQLAlchemy session.
        table_name: Name of the target table.
        columns: Sequence of column names to insert.
        rows: Iterable of row tuples matching column order.
        schema: Optional schema name (defaults to search_path).

    Returns:
        Number of rows inserted.

    Note:
        Falls back to executemany for non-PostgreSQL databases, and to the
        same fallback if the COPY attempt raises.
    """
    # Materialize the iterable exactly once. The COPY path consumes it, and
    # the exception fallback must see the same rows: previously a generator
    # argument was already exhausted by the time _bulk_insert_fallback ran,
    # silently inserting 0 rows after a COPY failure.
    row_list = list(rows)
    if not row_list:
        return 0

    if not is_psycopg3_backend():
        # Fallback to standard INSERT for non-PostgreSQL
        return _bulk_insert_fallback(db, table_name, columns, row_list, schema)

    try:
        # Get raw psycopg connection from SQLAlchemy
        raw_conn = db.connection().connection.dbapi_connection

        # Build the qualified table name
        qualified_table = f"{schema}.{table_name}" if schema else table_name
        columns_str = ", ".join(columns)

        # Serialize all rows into COPY TEXT format (tab-separated, newline-terminated)
        buffer = io.StringIO()
        for row in row_list:
            buffer.write("\t".join(_format_value_for_copy(v) for v in row) + "\n")
        buffer.seek(0)

        # Use psycopg3's COPY FROM, streaming the buffer in fixed-size chunks
        with raw_conn.cursor() as cur:
            with cur.copy(f"COPY {qualified_table} ({columns_str}) FROM STDIN") as copy:
                while data := buffer.read(8192):
                    copy.write(data)

        logger.debug("COPY inserted %d rows into %s", len(row_list), qualified_table)
        return len(row_list)

    except Exception as e:
        # Best-effort degradation: any COPY failure (permissions, type mismatch,
        # driver quirk) falls back to a portable INSERT over the buffered rows.
        logger.warning("COPY failed, falling back to INSERT: %s", e)
        return _bulk_insert_fallback(db, table_name, columns, row_list, schema)

146 

147 

def _bulk_insert_fallback(
    db: Session,
    table_name: str,
    columns: Sequence[str],
    rows: Iterable[Sequence[Any]],
    schema: Optional[str] = None,
) -> int:
    """Portable bulk insert using an executemany-style parameterized INSERT.

    Args:
        db: SQLAlchemy session.
        table_name: Name of the target table.
        columns: Sequence of column names to insert.
        rows: Iterable of row tuples matching column order.
        schema: Optional schema name.

    Returns:
        Number of rows inserted.
    """
    # Pair each row's values with its column names for named-parameter binding
    data = [dict(zip(columns, row)) for row in rows]
    if not data:
        return 0

    target = f"{schema}.{table_name}" if schema else table_name
    cols_sql = ", ".join(columns)
    values_sql = ", ".join(f":{col}" for col in columns)

    stmt = text(f"INSERT INTO {target} ({cols_sql}) VALUES ({values_sql})")  # nosec B608 - table/columns from SQLAlchemy models, not user input
    db.execute(stmt, data)

    return len(data)

182 

183 

184T = TypeVar("T") 

185 

186 

def execute_pipelined(
    db: Session,
    queries: Sequence[Tuple[str, dict]],
) -> List[List[Any]]:
    """Execute multiple queries in pipeline mode for reduced round-trips.

    Pipeline mode sends multiple queries without waiting for individual
    responses, cutting latency for independent queries.

    Args:
        db: SQLAlchemy session.
        queries: Sequence of (sql_string, params_dict) tuples.

    Returns:
        List of result lists, one per query.

    Note:
        Falls back to sequential execution for non-PostgreSQL databases.
        NOTE(review): the pipelined path hands sql/params straight to the
        psycopg driver while the fallback wraps them in SQLAlchemy ``text()``;
        the two expect different placeholder styles (%(name)s vs :name) —
        callers should verify their SQL works on the path they hit.
    """
    if not queries:
        return []

    if not is_psycopg3_backend():
        # Sequential fallback for non-PostgreSQL backends
        return [list(db.execute(text(sql), params).fetchall()) for sql, params in queries]

    try:
        raw_conn = db.connection().connection.dbapi_connection

        results: List[List[Any]] = []
        with raw_conn.pipeline():
            # Issue every statement first; responses are collected afterwards
            cursors = [raw_conn.execute(sql, params) for sql, params in queries]
            for cur in cursors:
                try:
                    results.append(list(cur.fetchall()))
                except Exception:
                    # Statement produced no result set (e.g. DML) — record empty
                    results.append([])

        logger.debug("Pipelined %d queries", len(queries))
        return results

    except Exception as e:
        logger.warning("Pipeline mode failed, falling back to sequential: %s", e)
        return [list(db.execute(text(sql), params).fetchall()) for sql, params in queries]

236 

237 

def bulk_insert_metrics(
    db: Session,
    table_name: str,
    metrics: Sequence[dict],
    columns: Optional[Sequence[str]] = None,
) -> int:
    """Optimized bulk insert for metric records.

    Uses the COPY protocol on PostgreSQL for maximum performance when
    writing metrics data.

    Args:
        db: SQLAlchemy session.
        table_name: Name of the metrics table.
        metrics: Sequence of metric dictionaries.
        columns: Optional explicit column list. If not provided,
            uses keys from first metric dict.

    Returns:
        Number of rows inserted.

    Examples:
        >>> # Example usage (would need actual DB connection):
        >>> # bulk_insert_metrics(db, "tool_metrics", [
        >>> #     {"tool_id": "abc", "timestamp": datetime.now(), "response_time": 0.5, "is_success": True}
        >>> # ])
    """
    if not metrics:
        return 0

    # Infer the column order from the first record unless explicitly given
    cols = list(metrics[0].keys()) if columns is None else columns

    # Flatten each dict into a value list matching the column order
    rows = [[record.get(name) for name in cols] for record in metrics]

    return bulk_insert_with_copy(db, table_name, cols, rows)

276 

277 

def get_raw_connection(db: Session) -> Any:
    """Return the raw psycopg3 connection underlying a SQLAlchemy session.

    Args:
        db: SQLAlchemy session.

    Returns:
        The underlying psycopg3 connection, or None if not using psycopg3
        (or if the connection cannot be unwrapped).
    """
    if is_psycopg3_backend():
        try:
            return db.connection().connection.dbapi_connection
        except Exception:
            # Best-effort: callers treat None as "optimization unavailable"
            pass
    return None