Coverage for mcpgateway / utils / psycopg3_optimizations.py: 100%
100 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/utils/psycopg3_optimizations.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
6psycopg3-specific optimizations for database operations.
8This module provides optimized database operations leveraging psycopg3's
9advanced features:
10- COPY protocol for bulk inserts (5-10x faster than INSERT)
11- Pipeline mode for batch queries (reduced round-trips)
12- Prepared statement hints
14These optimizations are PostgreSQL-specific and gracefully fall back to
15standard SQLAlchemy operations for other databases.
17Examples:
18 >>> from mcpgateway.utils.psycopg3_optimizations import is_psycopg3_backend
19 >>> isinstance(is_psycopg3_backend(), bool)
20 True
21"""
23# Standard
24from datetime import datetime
25import io
26import logging
27from typing import Any, Iterable, List, Optional, Sequence, Tuple, TypeVar
29# Third-Party
30from sqlalchemy import text
31from sqlalchemy.orm import Session
33logger = logging.getLogger(__name__)
# Lazily-populated cache for backend detection (None = not yet determined).
_is_psycopg3: Optional[bool] = None


def is_psycopg3_backend() -> bool:
    """Check if the current database backend is PostgreSQL with psycopg3.

    The result is computed once and memoized in the module-level
    ``_is_psycopg3`` cache; subsequent calls return the cached value.

    Returns:
        True if using PostgreSQL with psycopg3 driver, False otherwise.

    Examples:
        >>> isinstance(is_psycopg3_backend(), bool)
        True
    """
    global _is_psycopg3
    if _is_psycopg3 is not None:
        return _is_psycopg3
    try:
        # First-Party
        from mcpgateway.db import backend, driver
    except ImportError:
        # Database module unavailable -> definitely not a psycopg3 backend.
        _is_psycopg3 = False
    else:
        # SQLAlchemy reports "default"/"" for the default driver, which is
        # psycopg for the postgresql dialect in this project.
        _is_psycopg3 = backend == "postgresql" and driver in ("psycopg", "default", "")
    return _is_psycopg3
61def _format_value_for_copy(value: Any) -> str:
62 """Format a Python value for PostgreSQL COPY TEXT format.
64 Args:
65 value: The value to format.
67 Returns:
68 String representation suitable for COPY TEXT format.
69 """
70 if value is None:
71 return "\\N" # NULL representation in COPY
72 if isinstance(value, bool):
73 return "t" if value else "f"
74 if isinstance(value, datetime):
75 return value.isoformat()
76 if isinstance(value, str):
77 # Escape special characters for COPY TEXT format
78 return value.replace("\\", "\\\\").replace("\t", "\\t").replace("\n", "\\n").replace("\r", "\\r")
79 return str(value)
def bulk_insert_with_copy(
    db: Session,
    table_name: str,
    columns: Sequence[str],
    rows: Iterable[Sequence[Any]],
    schema: Optional[str] = None,
) -> int:
    """Bulk insert rows using PostgreSQL COPY protocol.

    This is significantly faster than individual INSERT statements or even
    bulk_insert_mappings for large datasets. The COPY protocol streams data
    directly to PostgreSQL with minimal overhead.

    Args:
        db: SQLAlchemy session.
        table_name: Name of the target table.
        columns: Sequence of column names to insert.
        rows: Iterable of row tuples matching column order.
        schema: Optional schema name (defaults to search_path).

    Returns:
        Number of rows inserted.

    Note:
        Falls back to executemany for non-PostgreSQL databases.
    """
    # Materialize the iterable once, up front. If `rows` is a generator, the
    # COPY buffer-building loop below would exhaust it; the exception-path
    # fallback would then re-iterate an empty iterator and silently insert
    # nothing. A list can be safely consumed by either path.
    row_list = list(rows)
    if not row_list:
        return 0

    if not is_psycopg3_backend():
        # Fallback to standard INSERT for non-PostgreSQL
        return _bulk_insert_fallback(db, table_name, columns, row_list, schema)

    try:
        # Get raw psycopg connection from SQLAlchemy
        raw_conn = db.connection().connection.dbapi_connection

        # Build the qualified table name
        qualified_table = f"{schema}.{table_name}" if schema else table_name
        columns_str = ", ".join(columns)

        # Create a file-like object with COPY TEXT data (tab-separated).
        buffer = io.StringIO()
        for row in row_list:
            buffer.write("\t".join(_format_value_for_copy(v) for v in row) + "\n")
        buffer.seek(0)

        # Use psycopg3's COPY FROM, streaming the buffer in 8 KiB chunks.
        with raw_conn.cursor() as cur:
            with cur.copy(f"COPY {qualified_table} ({columns_str}) FROM STDIN") as copy:
                while data := buffer.read(8192):
                    copy.write(data)

        logger.debug("COPY inserted %d rows into %s", len(row_list), qualified_table)
        return len(row_list)

    except Exception as e:
        # Best-effort: any COPY failure degrades to the portable INSERT path.
        logger.warning("COPY failed, falling back to INSERT: %s", e)
        return _bulk_insert_fallback(db, table_name, columns, row_list, schema)
def _bulk_insert_fallback(
    db: Session,
    table_name: str,
    columns: Sequence[str],
    rows: Iterable[Sequence[Any]],
    schema: Optional[str] = None,
) -> int:
    """Fallback bulk insert using executemany.

    Args:
        db: SQLAlchemy session.
        table_name: Name of the target table.
        columns: Sequence of column names to insert.
        rows: Iterable of row tuples matching column order.
        schema: Optional schema name.

    Returns:
        Number of rows inserted.
    """
    row_list = list(rows)
    if not row_list:
        return 0

    target = f"{schema}.{table_name}" if schema else table_name
    column_clause = ", ".join(columns)
    bind_clause = ", ".join(f":{col}" for col in columns)

    stmt = text(f"INSERT INTO {target} ({column_clause}) VALUES ({bind_clause})")  # nosec B608 - table/columns from SQLAlchemy models, not user input

    # executemany-style: one dict of bind parameters per row.
    db.execute(stmt, [dict(zip(columns, row)) for row in row_list])

    return len(row_list)
# NOTE(review): T appears unused within this module — candidate for removal
# once confirmed nothing imports it from here.
T = TypeVar("T")


def execute_pipelined(
    db: Session,
    queries: Sequence[Tuple[str, dict]],
) -> List[List[Any]]:
    """Execute multiple queries in pipeline mode for reduced round-trips.

    Pipeline mode allows sending multiple queries without waiting for
    individual responses, significantly reducing latency for independent
    queries.

    Args:
        db: SQLAlchemy session.
        queries: Sequence of (sql_string, params_dict) tuples.

    Returns:
        List of result lists, one per query.

    Note:
        Falls back to sequential execution for non-PostgreSQL databases,
        and also if pipeline execution raises for any reason.
    """
    if not queries:
        return []

    if not is_psycopg3_backend():
        # Fallback to sequential execution
        return [list(db.execute(text(sql), params).fetchall()) for sql, params in queries]

    try:
        # Bypass SQLAlchemy and talk to the psycopg3 connection directly.
        raw_conn = db.connection().connection.dbapi_connection

        results = []
        with raw_conn.pipeline():
            # Phase 1: queue all queries without waiting for responses.
            cursors = []
            for sql, params in queries:
                cur = raw_conn.execute(sql, params)
                cursors.append(cur)

            # Phase 2: collect results in submission order. NOTE(review):
            # fetching while still inside the pipeline context — psycopg is
            # expected to sync as needed; confirm against psycopg pipeline docs.
            for cur in cursors:
                try:
                    results.append(list(cur.fetchall()))
                except Exception:
                    # Statements with no result set (e.g. DML) yield an
                    # empty list rather than propagating.
                    results.append([])

        logger.debug("Pipelined %d queries", len(queries))
        return results

    except Exception as e:
        # Best-effort degradation: re-run everything sequentially.
        logger.warning("Pipeline mode failed, falling back to sequential: %s", e)
        return [list(db.execute(text(sql), params).fetchall()) for sql, params in queries]
def bulk_insert_metrics(
    db: Session,
    table_name: str,
    metrics: Sequence[dict],
    columns: Optional[Sequence[str]] = None,
) -> int:
    """Optimized bulk insert for metric records.

    Uses COPY protocol on PostgreSQL for maximum performance when
    writing metrics data.

    Args:
        db: SQLAlchemy session.
        table_name: Name of the metrics table.
        metrics: Sequence of metric dictionaries.
        columns: Optional explicit column list. If not provided,
            uses keys from first metric dict.

    Returns:
        Number of rows inserted.

    Examples:
        >>> # Example usage (would need actual DB connection):
        >>> # bulk_insert_metrics(db, "tool_metrics", [
        >>> #     {"tool_id": "abc", "timestamp": datetime.now(), "response_time": 0.5, "is_success": True}
        >>> # ])
    """
    if not metrics:
        return 0

    # Determine column order: explicit list wins, else keys of the first record.
    column_order = list(metrics[0].keys()) if columns is None else columns

    # Project each dict onto the column order; absent keys become None (NULL).
    row_data = [[record.get(name) for name in column_order] for record in metrics]

    return bulk_insert_with_copy(db, table_name, column_order, row_data)
def get_raw_connection(db: Session) -> Any:
    """Get the raw psycopg3 connection from a SQLAlchemy session.

    Args:
        db: SQLAlchemy session.

    Returns:
        The underlying psycopg3 connection, or None if not using psycopg3
        (or if the connection cannot be unwrapped).
    """
    if is_psycopg3_backend():
        try:
            return db.connection().connection.dbapi_connection
        except Exception:
            # Best-effort: any failure to unwrap falls through to None.
            pass
    return None