Coverage for mcpgateway / services / prompt_service.py: 99%

841 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/prompt_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Prompt Service Implementation. 

8This module implements prompt template management according to the MCP specification. 

9It handles: 

10- Prompt template registration and retrieval 

11- Prompt argument validation 

12- Template rendering with arguments 

13- Resource embedding in prompts 

14- Active/inactive prompt management 

15""" 

16 

17# Standard 

18import binascii 

19from datetime import datetime, timezone 

20from functools import lru_cache 

21from string import Formatter 

22import time 

23from typing import Any, AsyncGenerator, Dict, List, Optional, Set, Union 

24import uuid 

25 

26# Third-Party 

27from jinja2 import Environment, meta, select_autoescape, Template 

28from pydantic import ValidationError 

29from sqlalchemy import and_, delete, desc, not_, or_, select 

30from sqlalchemy.exc import IntegrityError, OperationalError 

31from sqlalchemy.orm import joinedload, Session 

32 

33# First-Party 

34from mcpgateway.common.models import Message, PromptResult, Role, TextContent 

35from mcpgateway.config import settings 

36from mcpgateway.db import EmailTeam 

37from mcpgateway.db import Gateway as DbGateway 

38from mcpgateway.db import get_for_update 

39from mcpgateway.db import Prompt as DbPrompt 

40from mcpgateway.db import PromptMetric, PromptMetricsHourly, server_prompt_association 

41from mcpgateway.observability import create_span 

42from mcpgateway.plugins.framework import get_plugin_manager, GlobalContext, PluginContextTable, PluginManager, PromptHookType, PromptPosthookPayload, PromptPrehookPayload 

43from mcpgateway.schemas import PromptCreate, PromptMetrics, PromptRead, PromptUpdate, TopPerformer 

44from mcpgateway.services.audit_trail_service import get_audit_trail_service 

45from mcpgateway.services.base_service import BaseService 

46from mcpgateway.services.event_service import EventService 

47from mcpgateway.services.logging_service import LoggingService 

48from mcpgateway.services.metrics_buffer_service import get_metrics_buffer_service 

49from mcpgateway.services.metrics_cleanup_service import delete_metrics_in_batches, pause_rollup_during_purge 

50from mcpgateway.services.observability_service import current_trace_id, ObservabilityService 

51from mcpgateway.services.structured_logger import get_structured_logger 

52from mcpgateway.services.team_management_service import TeamManagementService 

53from mcpgateway.utils.create_slug import slugify 

54from mcpgateway.utils.metrics_common import build_top_performers 

55from mcpgateway.utils.pagination import unified_paginate 

56from mcpgateway.utils.sqlalchemy_modifier import json_contains_tag_expr 

57 

# Registry cache handle. It stays None until _get_registry_cache() imports it,
# which keeps the import lazy and avoids circular dependencies.
_REGISTRY_CACHE: Optional[Any] = None

# Single Jinja environment shared by the whole module so templates can be cached.
_JINJA_ENV: Optional[Environment] = None

63 

64 

def _get_jinja_env() -> Environment:
    """Return the module-level Jinja environment, creating it on first call.

    Returns:
        Jinja2 Environment configured with HTML/XML autoescaping and with
        ``trim_blocks`` and ``lstrip_blocks`` enabled.
    """
    global _JINJA_ENV  # pylint: disable=global-statement
    if _JINJA_ENV is not None:
        return _JINJA_ENV

    env_options = {
        "autoescape": select_autoescape(["html", "xml"]),
        "trim_blocks": True,
        "lstrip_blocks": True,
    }
    _JINJA_ENV = Environment(**env_options)
    return _JINJA_ENV

79 

80 

@lru_cache(maxsize=256)
def _compile_jinja_template(template: str) -> Template:
    """Compile a template string with the shared Jinja environment.

    Results are memoized by ``lru_cache`` (up to 256 distinct template
    strings), so repeated calls with the same string reuse the compiled object.

    Args:
        template: The template source text to compile.

    Returns:
        Compiled Jinja Template object.
    """
    environment = _get_jinja_env()
    compiled = environment.from_string(template)
    return compiled

92 

93 

def _get_registry_cache():
    """Return the registry cache singleton, importing it on first use.

    The import happens inside the function rather than at module load time.

    Returns:
        RegistryCache instance.
    """
    global _REGISTRY_CACHE  # pylint: disable=global-statement
    if _REGISTRY_CACHE is not None:
        return _REGISTRY_CACHE

    # First-Party
    from mcpgateway.cache.registry_cache import registry_cache as loaded_cache  # pylint: disable=import-outside-toplevel

    _REGISTRY_CACHE = loaded_cache
    return _REGISTRY_CACHE

107 

108 

# Module logger, obtained through the gateway LoggingService (created first).
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

# Shared helpers for prompt operations: structured logger, audit trail, metrics buffer.
structured_logger = get_structured_logger("prompt_service")
audit_trail = get_audit_trail_service()
metrics_buffer = get_metrics_buffer_service()

117 

118 

class PromptError(Exception):
    """Root of the prompt exception hierarchy; all prompt-specific errors derive from it."""

121 

122 

class PromptNotFoundError(PromptError):
    """Signals that the requested prompt could not be found."""

125 

126 

class PromptNameConflictError(PromptError):
    """Signals that a prompt name is already taken by an existing (active or inactive) prompt."""

    def __init__(self, name: str, enabled: bool = True, prompt_id: Optional[int] = None, visibility: str = "public") -> None:
        """Store the conflict details and build the error message.

        Args:
            name: The conflicting prompt name
            enabled: Whether the existing prompt is enabled
            prompt_id: ID of the existing prompt if available
            visibility: Prompt visibility level (private, team, public);
                capitalized and used as the message prefix.

        Examples:
            >>> from mcpgateway.services.prompt_service import PromptNameConflictError
            >>> error = PromptNameConflictError("test_prompt")
            >>> error.name
            'test_prompt'
            >>> error.enabled
            True
            >>> error.prompt_id is None
            True
            >>> error = PromptNameConflictError("inactive_prompt", False, 123)
            >>> error.enabled
            False
            >>> error.prompt_id
            123
        """
        self.name = name
        self.enabled = enabled
        self.prompt_id = prompt_id

        message_parts = [f"{visibility.capitalize()} Prompt already exists with name: {name}"]
        if not enabled:
            # Inactive prompts get an extra hint carrying the existing row's ID.
            message_parts.append(f" (currently inactive, ID: {prompt_id})")
        super().__init__("".join(message_parts))

161 

162 

class PromptValidationError(PromptError):
    """Signals that a prompt failed validation."""

165 

166 

class PromptLockConflictError(PromptError):
    """Signals that a prompt row is locked by another transaction.

    Raises:
        PromptLockConflictError: When a modification targets a prompt that a
            concurrent request currently holds locked.
    """

174 

175 

176class PromptService(BaseService): 

177 """Service for managing prompt templates. 

178 

179 Handles: 

180 - Template registration and retrieval 

181 - Argument validation 

182 - Template rendering 

183 - Resource embedding 

184 - Active/inactive status management 

185 """ 

186 

187 _visibility_model_cls = DbPrompt 

188 

189 def __init__(self) -> None: 

190 """ 

191 Initialize the prompt service. 

192 

193 Sets up the Jinja2 environment for rendering prompt templates. 

194 Although these templates are rendered as JSON for the API, if the output is ever 

195 embedded into an HTML page, unescaped content could be exploited for cross-site scripting (XSS) attacks. 

196 Enabling autoescaping for 'html' and 'xml' templates via select_autoescape helps mitigate this risk. 

197 

198 Examples: 

199 >>> from mcpgateway.services.prompt_service import PromptService 

200 >>> service = PromptService() 

201 >>> isinstance(service._event_service, EventService) 

202 True 

203 >>> service._jinja_env is not None 

204 True 

205 """ 

206 self._event_service = EventService(channel_name="mcpgateway:prompt_events") 

207 # Use the module-level singleton for template caching 

208 self._jinja_env = _get_jinja_env() 

209 self._plugin_manager: PluginManager | None = get_plugin_manager() 

210 

211 async def initialize(self) -> None: 

212 """Initialize the service.""" 

213 logger.info("Initializing prompt service") 

214 await self._event_service.initialize() 

215 

216 async def shutdown(self) -> None: 

217 """Shutdown the service. 

218 

219 Examples: 

220 >>> from mcpgateway.services.prompt_service import PromptService 

221 >>> from unittest.mock import AsyncMock 

222 >>> import asyncio 

223 >>> service = PromptService() 

224 >>> service._event_service = AsyncMock() 

225 >>> asyncio.run(service.shutdown()) 

226 >>> # Verify event service shutdown was called 

227 >>> service._event_service.shutdown.assert_awaited_once() 

228 """ 

229 await self._event_service.shutdown() 

230 logger.info("Prompt service shutdown complete") 

231 

232 async def get_top_prompts(self, db: Session, limit: Optional[int] = 5, include_deleted: bool = False) -> List[TopPerformer]: 

233 """Retrieve the top-performing prompts based on execution count. 

234 

235 Queries the database to get prompts with their metrics, ordered by the number of executions 

236 in descending order. Combines recent raw metrics with historical hourly rollups for complete 

237 historical coverage. Returns a list of TopPerformer objects containing prompt details and 

238 performance metrics. Results are cached for performance. 

239 

240 Args: 

241 db (Session): Database session for querying prompt metrics. 

242 limit (Optional[int]): Maximum number of prompts to return. Defaults to 5. 

243 include_deleted (bool): Whether to include deleted prompts from rollups. 

244 

245 Returns: 

246 List[TopPerformer]: A list of TopPerformer objects, each containing: 

247 - id: Prompt ID. 

248 - name: Prompt name. 

249 - execution_count: Total number of executions. 

250 - avg_response_time: Average response time in seconds, or None if no metrics. 

251 - success_rate: Success rate percentage, or None if no metrics. 

252 - last_execution: Timestamp of the last execution, or None if no metrics. 

253 """ 

254 # Check cache first (if enabled) 

255 # First-Party 

256 from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache # pylint: disable=import-outside-toplevel 

257 

258 effective_limit = limit or 5 

259 cache_key = f"top_prompts:{effective_limit}:include_deleted={include_deleted}" 

260 

261 if is_cache_enabled(): 

262 cached = metrics_cache.get(cache_key) 

263 if cached is not None: 

264 return cached 

265 

266 # Use combined query that includes both raw metrics and rollup data 

267 # First-Party 

268 from mcpgateway.services.metrics_query_service import get_top_performers_combined # pylint: disable=import-outside-toplevel 

269 

270 results = get_top_performers_combined( 

271 db=db, 

272 metric_type="prompt", 

273 entity_model=DbPrompt, 

274 limit=effective_limit, 

275 include_deleted=include_deleted, 

276 ) 

277 top_performers = build_top_performers(results) 

278 

279 # Cache the result (if enabled) 

280 if is_cache_enabled(): 

281 metrics_cache.set(cache_key, top_performers) 

282 

283 return top_performers 

284 

285 def convert_prompt_to_read(self, db_prompt: DbPrompt, include_metrics: bool = False) -> PromptRead: 

286 """ 

287 Convert a DbPrompt instance to a PromptRead Pydantic model, 

288 optionally including aggregated metrics computed from the associated PromptMetric records. 

289 

290 Args: 

291 db_prompt: Db prompt to convert 

292 include_metrics: Whether to include metrics in the result. Defaults to False. 

293 Set to False for list operations to avoid N+1 query issues. 

294 

295 Returns: 

296 PromptRead: Pydantic model instance 

297 """ 

298 arg_schema = db_prompt.argument_schema or {} 

299 properties = arg_schema.get("properties", {}) 

300 required_list = arg_schema.get("required", []) 

301 arguments_list = [] 

302 for arg_name, prop in properties.items(): 

303 arguments_list.append( 

304 { 

305 "name": arg_name, 

306 "description": prop.get("description") or "", 

307 "required": arg_name in required_list, 

308 } 

309 ) 

310 

311 # Compute aggregated metrics only if requested (avoids N+1 queries in list operations) 

312 if include_metrics: 

313 total = len(db_prompt.metrics) if hasattr(db_prompt, "metrics") and db_prompt.metrics is not None else 0 

314 successful = sum(1 for m in db_prompt.metrics if m.is_success) if total > 0 else 0 

315 failed = sum(1 for m in db_prompt.metrics if not m.is_success) if total > 0 else 0 

316 failure_rate = failed / total if total > 0 else 0.0 

317 min_rt = min((m.response_time for m in db_prompt.metrics), default=None) if total > 0 else None 

318 max_rt = max((m.response_time for m in db_prompt.metrics), default=None) if total > 0 else None 

319 avg_rt = (sum(m.response_time for m in db_prompt.metrics) / total) if total > 0 else None 

320 last_time = max((m.timestamp for m in db_prompt.metrics), default=None) if total > 0 else None 

321 

322 metrics_dict = { 

323 "totalExecutions": total, 

324 "successfulExecutions": successful, 

325 "failedExecutions": failed, 

326 "failureRate": failure_rate, 

327 "minResponseTime": min_rt, 

328 "maxResponseTime": max_rt, 

329 "avgResponseTime": avg_rt, 

330 "lastExecutionTime": last_time, 

331 } 

332 else: 

333 metrics_dict = None 

334 

335 original_name = getattr(db_prompt, "original_name", None) or db_prompt.name 

336 custom_name = getattr(db_prompt, "custom_name", None) or original_name 

337 custom_name_slug = getattr(db_prompt, "custom_name_slug", None) or slugify(custom_name) 

338 display_name = getattr(db_prompt, "display_name", None) or custom_name 

339 

340 prompt_dict = { 

341 "id": db_prompt.id, 

342 "name": db_prompt.name, 

343 "original_name": original_name, 

344 "custom_name": custom_name, 

345 "custom_name_slug": custom_name_slug, 

346 "display_name": display_name, 

347 "gateway_slug": getattr(db_prompt, "gateway_slug", None), 

348 "description": db_prompt.description, 

349 "template": db_prompt.template, 

350 "arguments": arguments_list, 

351 "created_at": db_prompt.created_at, 

352 "updated_at": db_prompt.updated_at, 

353 "enabled": db_prompt.enabled, 

354 "metrics": metrics_dict, 

355 "tags": db_prompt.tags or [], 

356 "visibility": db_prompt.visibility, 

357 "team": getattr(db_prompt, "team", None), 

358 # Include metadata fields for proper API response 

359 "created_by": getattr(db_prompt, "created_by", None), 

360 "modified_by": getattr(db_prompt, "modified_by", None), 

361 "created_from_ip": getattr(db_prompt, "created_from_ip", None), 

362 "created_via": getattr(db_prompt, "created_via", None), 

363 "created_user_agent": getattr(db_prompt, "created_user_agent", None), 

364 "modified_from_ip": getattr(db_prompt, "modified_from_ip", None), 

365 "modified_via": getattr(db_prompt, "modified_via", None), 

366 "modified_user_agent": getattr(db_prompt, "modified_user_agent", None), 

367 "version": getattr(db_prompt, "version", None), 

368 "team_id": getattr(db_prompt, "team_id", None), 

369 "owner_email": getattr(db_prompt, "owner_email", None), 

370 } 

371 return PromptRead.model_validate(prompt_dict) 

372 

373 def _get_team_name(self, db: Session, team_id: Optional[str]) -> Optional[str]: 

374 """Retrieve the team name given a team ID. 

375 

376 Args: 

377 db (Session): Database session for querying teams. 

378 team_id (Optional[str]): The ID of the team. 

379 

380 Returns: 

381 Optional[str]: The name of the team if found, otherwise None. 

382 """ 

383 if not team_id: 

384 return None 

385 team = db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True)).first() 

386 db.commit() # Release transaction to avoid idle-in-transaction 

387 return team.name if team else None 

388 

389 def _compute_prompt_name(self, custom_name: str, gateway: Optional[Any] = None) -> str: 

390 """Compute the stored prompt name from custom_name and gateway context. 

391 

392 Args: 

393 custom_name: Prompt name to slugify and store. 

394 gateway: Optional gateway for namespacing. 

395 

396 Returns: 

397 The stored prompt name with gateway prefix when applicable. 

398 """ 

399 name_slug = slugify(custom_name) 

400 if gateway: 

401 gateway_slug = slugify(gateway.name) 

402 return f"{gateway_slug}{settings.gateway_tool_name_separator}{name_slug}" 

403 return name_slug 

404 

405 async def register_prompt( 

406 self, 

407 db: Session, 

408 prompt: PromptCreate, 

409 created_by: Optional[str] = None, 

410 created_from_ip: Optional[str] = None, 

411 created_via: Optional[str] = None, 

412 created_user_agent: Optional[str] = None, 

413 import_batch_id: Optional[str] = None, 

414 federation_source: Optional[str] = None, 

415 team_id: Optional[str] = None, 

416 owner_email: Optional[str] = None, 

417 visibility: Optional[str] = "public", 

418 ) -> PromptRead: 

419 """Register a new prompt template. 

420 

421 Args: 

422 db: Database session 

423 prompt: Prompt creation schema 

424 created_by: Username who created this prompt 

425 created_from_ip: IP address of creator 

426 created_via: Creation method (ui, api, import, federation) 

427 created_user_agent: User agent of creation request 

428 import_batch_id: UUID for bulk import operations 

429 federation_source: Source gateway for federated prompts 

430 team_id (Optional[str]): Team ID to assign the prompt to. 

431 owner_email (Optional[str]): Email of the user who owns this prompt. 

432 visibility (str): Prompt visibility level (private, team, public). 

433 

434 Returns: 

435 Created prompt information 

436 

437 Raises: 

438 IntegrityError: If a database integrity error occurs. 

439 PromptNameConflictError: If a prompt with the same name already exists. 

440 PromptError: For other prompt registration errors 

441 

442 Examples: 

443 >>> from mcpgateway.services.prompt_service import PromptService 

444 >>> from unittest.mock import MagicMock 

445 >>> service = PromptService() 

446 >>> db = MagicMock() 

447 >>> prompt = MagicMock() 

448 >>> db.execute.return_value.scalar_one_or_none.return_value = None 

449 >>> db.add = MagicMock() 

450 >>> db.commit = MagicMock() 

451 >>> db.refresh = MagicMock() 

452 >>> service._notify_prompt_added = MagicMock() 

453 >>> service.convert_prompt_to_read = MagicMock(return_value={}) 

454 >>> import asyncio 

455 >>> try: 

456 ... asyncio.run(service.register_prompt(db, prompt)) 

457 ... except Exception: 

458 ... pass 

459 """ 

460 try: 

461 # Validate template syntax 

462 self._validate_template(prompt.template) 

463 

464 # Extract required arguments from template 

465 required_args = self._get_required_arguments(prompt.template) 

466 

467 # Create argument schema 

468 argument_schema = { 

469 "type": "object", 

470 "properties": {}, 

471 "required": list(required_args), 

472 } 

473 for arg in prompt.arguments: 

474 schema = {"type": "string"} 

475 if arg.description is not None: 

476 schema["description"] = arg.description 

477 argument_schema["properties"][arg.name] = schema 

478 

479 custom_name = prompt.custom_name or prompt.name 

480 display_name = prompt.display_name or custom_name 

481 

482 # Extract gateway_id from prompt if present and look up gateway for namespacing 

483 gateway_id = getattr(prompt, "gateway_id", None) 

484 gateway = None 

485 if gateway_id: 

486 gateway = db.execute(select(DbGateway).where(DbGateway.id == gateway_id)).scalar_one_or_none() 

487 

488 computed_name = self._compute_prompt_name(custom_name, gateway=gateway) 

489 

490 # Create DB model 

491 db_prompt = DbPrompt( 

492 name=computed_name, 

493 original_name=prompt.name, 

494 custom_name=custom_name, 

495 display_name=display_name, 

496 description=prompt.description, 

497 template=prompt.template, 

498 argument_schema=argument_schema, 

499 tags=prompt.tags, 

500 # Metadata fields 

501 created_by=created_by, 

502 created_from_ip=created_from_ip, 

503 created_via=created_via, 

504 created_user_agent=created_user_agent, 

505 import_batch_id=import_batch_id, 

506 federation_source=federation_source, 

507 version=1, 

508 # Team scoping fields - use schema values if provided, otherwise fallback to parameters 

509 team_id=getattr(prompt, "team_id", None) or team_id, 

510 owner_email=getattr(prompt, "owner_email", None) or owner_email or created_by, 

511 visibility=getattr(prompt, "visibility", None) or visibility, 

512 gateway_id=gateway_id, 

513 ) 

514 # Check for existing server with the same name 

515 if visibility.lower() == "public": 

516 # Check for existing public prompt with the same name and gateway_id 

517 existing_prompt = db.execute(select(DbPrompt).where(DbPrompt.name == computed_name, DbPrompt.visibility == "public", DbPrompt.gateway_id == gateway_id)).scalar_one_or_none() 

518 if existing_prompt: 

519 raise PromptNameConflictError(computed_name, enabled=existing_prompt.enabled, prompt_id=existing_prompt.id, visibility=existing_prompt.visibility) 

520 elif visibility.lower() == "team": 

521 # Check for existing team prompt with the same name and gateway_id 

522 existing_prompt = db.execute( 

523 select(DbPrompt).where(DbPrompt.name == computed_name, DbPrompt.visibility == "team", DbPrompt.team_id == team_id, DbPrompt.gateway_id == gateway_id) 

524 ).scalar_one_or_none() 

525 if existing_prompt: 

526 raise PromptNameConflictError(computed_name, enabled=existing_prompt.enabled, prompt_id=existing_prompt.id, visibility=existing_prompt.visibility) 

527 

528 # Set gateway relationship to help the before_insert event handler compute the name correctly 

529 if gateway: 

530 db_prompt.gateway = gateway 

531 db_prompt.gateway_name_cache = gateway.name # type: ignore[attr-defined] 

532 

533 # Add to DB 

534 db.add(db_prompt) 

535 db.commit() 

536 db.refresh(db_prompt) 

537 # Notify subscribers 

538 await self._notify_prompt_added(db_prompt) 

539 

540 logger.info(f"Registered prompt: {prompt.name}") 

541 

542 # Structured logging: Audit trail for prompt creation 

543 audit_trail.log_action( 

544 user_id=created_by or "system", 

545 action="create_prompt", 

546 resource_type="prompt", 

547 resource_id=str(db_prompt.id), 

548 resource_name=db_prompt.name, 

549 user_email=owner_email, 

550 team_id=team_id, 

551 client_ip=created_from_ip, 

552 user_agent=created_user_agent, 

553 new_values={ 

554 "name": db_prompt.name, 

555 "visibility": visibility, 

556 }, 

557 context={ 

558 "created_via": created_via, 

559 "import_batch_id": import_batch_id, 

560 "federation_source": federation_source, 

561 }, 

562 db=db, 

563 ) 

564 

565 # Structured logging: Log successful prompt creation 

566 structured_logger.log( 

567 level="INFO", 

568 message="Prompt created successfully", 

569 event_type="prompt_created", 

570 component="prompt_service", 

571 user_id=created_by, 

572 user_email=owner_email, 

573 team_id=team_id, 

574 resource_type="prompt", 

575 resource_id=str(db_prompt.id), 

576 custom_fields={ 

577 "prompt_name": db_prompt.name, 

578 "visibility": visibility, 

579 }, 

580 ) 

581 

582 db_prompt.team = self._get_team_name(db, db_prompt.team_id) 

583 prompt_dict = self.convert_prompt_to_read(db_prompt) 

584 

585 # Invalidate cache after successful creation 

586 cache = _get_registry_cache() 

587 await cache.invalidate_prompts() 

588 # Also invalidate tags cache since prompt tags may have changed 

589 # First-Party 

590 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel 

591 

592 await admin_stats_cache.invalidate_tags() 

593 # First-Party 

594 from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel 

595 

596 metrics_cache.invalidate_prefix("top_prompts:") 

597 metrics_cache.invalidate("prompts") 

598 

599 return PromptRead.model_validate(prompt_dict) 

600 

601 except IntegrityError as ie: 

602 logger.error(f"IntegrityErrors in group: {ie}") 

603 

604 structured_logger.log( 

605 level="ERROR", 

606 message="Prompt creation failed due to database integrity error", 

607 event_type="prompt_creation_failed", 

608 component="prompt_service", 

609 user_id=created_by, 

610 user_email=owner_email, 

611 error=ie, 

612 custom_fields={"prompt_name": prompt.name}, 

613 ) 

614 raise ie 

615 except PromptNameConflictError as se: 

616 db.rollback() 

617 

618 structured_logger.log( 

619 level="WARNING", 

620 message="Prompt creation failed due to name conflict", 

621 event_type="prompt_name_conflict", 

622 component="prompt_service", 

623 user_id=created_by, 

624 user_email=owner_email, 

625 custom_fields={"prompt_name": prompt.name, "visibility": visibility}, 

626 ) 

627 raise se 

628 except Exception as e: 

629 db.rollback() 

630 

631 structured_logger.log( 

632 level="ERROR", 

633 message="Prompt creation failed", 

634 event_type="prompt_creation_failed", 

635 component="prompt_service", 

636 user_id=created_by, 

637 user_email=owner_email, 

638 error=e, 

639 custom_fields={"prompt_name": prompt.name}, 

640 ) 

641 raise PromptError(f"Failed to register prompt: {str(e)}") 

642 

643 async def register_prompts_bulk( 

644 self, 

645 db: Session, 

646 prompts: List[PromptCreate], 

647 created_by: Optional[str] = None, 

648 created_from_ip: Optional[str] = None, 

649 created_via: Optional[str] = None, 

650 created_user_agent: Optional[str] = None, 

651 import_batch_id: Optional[str] = None, 

652 federation_source: Optional[str] = None, 

653 team_id: Optional[str] = None, 

654 owner_email: Optional[str] = None, 

655 visibility: Optional[str] = "public", 

656 conflict_strategy: str = "skip", 

657 ) -> Dict[str, Any]: 

658 """Register multiple prompts in bulk with a single commit. 

659 

660 This method provides significant performance improvements over individual 

661 prompt registration by: 

662 - Using db.add_all() instead of individual db.add() calls 

663 - Performing a single commit for all prompts 

664 - Batch conflict detection 

665 - Chunking for very large imports (>500 items) 

666 

667 Args: 

668 db: Database session 

669 prompts: List of prompt creation schemas 

670 created_by: Username who created these prompts 

671 created_from_ip: IP address of creator 

672 created_via: Creation method (ui, api, import, federation) 

673 created_user_agent: User agent of creation request 

674 import_batch_id: UUID for bulk import operations 

675 federation_source: Source gateway for federated prompts 

676 team_id: Team ID to assign the prompts to 

677 owner_email: Email of the user who owns these prompts 

678 visibility: Prompt visibility level (private, team, public) 

679 conflict_strategy: How to handle conflicts (skip, update, rename, fail) 

680 

681 Returns: 

682 Dict with statistics: 

683 - created: Number of prompts created 

684 - updated: Number of prompts updated 

685 - skipped: Number of prompts skipped 

686 - failed: Number of prompts that failed 

687 - errors: List of error messages 

688 

689 Raises: 

690 PromptError: If bulk registration fails critically 

691 

692 Examples: 

693 >>> from mcpgateway.services.prompt_service import PromptService 

694 >>> from unittest.mock import MagicMock 

695 >>> service = PromptService() 

696 >>> db = MagicMock() 

697 >>> prompts = [MagicMock(), MagicMock()] 

698 >>> import asyncio 

699 >>> try: 

700 ... result = asyncio.run(service.register_prompts_bulk(db, prompts)) 

701 ... except Exception: 

702 ... pass 

703 """ 

704 if not prompts: 

705 return {"created": 0, "updated": 0, "skipped": 0, "failed": 0, "errors": []} 

706 

707 stats = {"created": 0, "updated": 0, "skipped": 0, "failed": 0, "errors": []} 

708 

709 # Process in chunks to avoid memory issues and SQLite parameter limits 

710 chunk_size = 500 

711 

712 for chunk_start in range(0, len(prompts), chunk_size): 

713 chunk = prompts[chunk_start : chunk_start + chunk_size] 

714 

715 try: 

716 # Collect unique gateway_ids and look them up 

717 gateway_ids = set() 

718 for prompt in chunk: 

719 gw_id = getattr(prompt, "gateway_id", None) 

720 if gw_id: 

721 gateway_ids.add(gw_id) 

722 

723 gateways_map: Dict[str, Any] = {} 

724 if gateway_ids: 

725 gateways = db.execute(select(DbGateway).where(DbGateway.id.in_(gateway_ids))).scalars().all() 

726 gateways_map = {gw.id: gw for gw in gateways} 

727 

728 # Batch check for existing prompts to detect conflicts 

729 # Build computed names with gateway context 

730 prompt_names = [] 

731 for prompt in chunk: 

732 custom_name = getattr(prompt, "custom_name", None) or prompt.name 

733 gw_id = getattr(prompt, "gateway_id", None) 

734 gateway = gateways_map.get(gw_id) if gw_id else None 

735 computed_name = self._compute_prompt_name(custom_name, gateway=gateway) 

736 prompt_names.append(computed_name) 

737 

738 # Query for existing prompts - need to consider gateway_id in conflict detection 

739 # Build base query conditions 

740 if visibility.lower() == "public": 

741 base_conditions = [DbPrompt.name.in_(prompt_names), DbPrompt.visibility == "public"] 

742 elif visibility.lower() == "team" and team_id: 

743 base_conditions = [DbPrompt.name.in_(prompt_names), DbPrompt.visibility == "team", DbPrompt.team_id == team_id] 

744 else: 

745 # Private prompts - check by owner 

746 base_conditions = [DbPrompt.name.in_(prompt_names), DbPrompt.visibility == "private", DbPrompt.owner_email == (owner_email or created_by)] 

747 

748 existing_prompts_query = select(DbPrompt).where(*base_conditions) 

749 existing_prompts = db.execute(existing_prompts_query).scalars().all() 

750 # Use (name, gateway_id) tuple as key for proper conflict detection 

751 existing_prompts_map = {(p.name, p.gateway_id): p for p in existing_prompts} 

752 

753 prompts_to_add = [] 

754 prompts_to_update = [] 

755 

756 for prompt in chunk: 

757 try: 

758 # Validate template syntax 

759 self._validate_template(prompt.template) 

760 

761 # Extract required arguments from template 

762 required_args = self._get_required_arguments(prompt.template) 

763 

764 # Create argument schema 

765 argument_schema = { 

766 "type": "object", 

767 "properties": {}, 

768 "required": list(required_args), 

769 } 

770 for arg in prompt.arguments: 

771 schema = {"type": "string"} 

772 if arg.description is not None: 

773 schema["description"] = arg.description 

774 argument_schema["properties"][arg.name] = schema 

775 

776 # Use provided parameters or schema values 

777 prompt_team_id = team_id if team_id is not None else getattr(prompt, "team_id", None) 

778 prompt_owner_email = owner_email or getattr(prompt, "owner_email", None) or created_by 

779 prompt_visibility = visibility if visibility is not None else getattr(prompt, "visibility", "public") 

780 prompt_gateway_id = getattr(prompt, "gateway_id", None) 

781 

782 custom_name = getattr(prompt, "custom_name", None) or prompt.name 

783 display_name = getattr(prompt, "display_name", None) or custom_name 

784 gateway = gateways_map.get(prompt_gateway_id) if prompt_gateway_id else None 

785 computed_name = self._compute_prompt_name(custom_name, gateway=gateway) 

786 

787 # Look up existing prompt by (name, gateway_id) tuple 

788 existing_prompt = existing_prompts_map.get((computed_name, prompt_gateway_id)) 

789 

790 if existing_prompt: 

791 # Handle conflict based on strategy 

792 if conflict_strategy == "skip": 

793 stats["skipped"] += 1 

794 continue 

795 if conflict_strategy == "update": 

796 # Update existing prompt 

797 existing_prompt.description = prompt.description 

798 existing_prompt.template = prompt.template 

799 # Clear template cache to reduce memory growth 

800 _compile_jinja_template.cache_clear() 

801 existing_prompt.argument_schema = argument_schema 

802 existing_prompt.tags = prompt.tags or [] 

803 if getattr(prompt, "custom_name", None) is not None: 

804 existing_prompt.custom_name = custom_name 

805 if getattr(prompt, "display_name", None) is not None: 

806 existing_prompt.display_name = display_name 

807 existing_prompt.modified_by = created_by 

808 existing_prompt.modified_from_ip = created_from_ip 

809 existing_prompt.modified_via = created_via 

810 existing_prompt.modified_user_agent = created_user_agent 

811 existing_prompt.updated_at = datetime.now(timezone.utc) 

812 existing_prompt.version = (existing_prompt.version or 1) + 1 

813 

814 prompts_to_update.append(existing_prompt) 

815 stats["updated"] += 1 

816 elif conflict_strategy == "rename": 

817 # Create with renamed prompt 

818 new_name = f"{prompt.name}_imported_{int(datetime.now().timestamp())}" 

819 new_custom_name = new_name 

820 new_display_name = new_name 

821 computed_name = self._compute_prompt_name(new_custom_name, gateway=gateway) 

822 db_prompt = DbPrompt( 

823 name=computed_name, 

824 original_name=prompt.name, 

825 custom_name=new_custom_name, 

826 display_name=new_display_name, 

827 description=prompt.description, 

828 template=prompt.template, 

829 argument_schema=argument_schema, 

830 tags=prompt.tags or [], 

831 created_by=created_by, 

832 created_from_ip=created_from_ip, 

833 created_via=created_via, 

834 created_user_agent=created_user_agent, 

835 import_batch_id=import_batch_id, 

836 federation_source=federation_source, 

837 version=1, 

838 team_id=prompt_team_id, 

839 owner_email=prompt_owner_email, 

840 visibility=prompt_visibility, 

841 gateway_id=prompt_gateway_id, 

842 ) 

843 # Set gateway relationship to help the before_insert event handler 

844 if gateway: 

845 db_prompt.gateway = gateway 

846 db_prompt.gateway_name_cache = gateway.name # type: ignore[attr-defined] 

847 prompts_to_add.append(db_prompt) 

848 stats["created"] += 1 

849 elif conflict_strategy == "fail": 

850 stats["failed"] += 1 

851 stats["errors"].append(f"Prompt name conflict: {prompt.name}") 

852 continue 

853 else: 

854 # Create new prompt 

855 db_prompt = DbPrompt( 

856 name=computed_name, 

857 original_name=prompt.name, 

858 custom_name=custom_name, 

859 display_name=display_name, 

860 description=prompt.description, 

861 template=prompt.template, 

862 argument_schema=argument_schema, 

863 tags=prompt.tags or [], 

864 created_by=created_by, 

865 created_from_ip=created_from_ip, 

866 created_via=created_via, 

867 created_user_agent=created_user_agent, 

868 import_batch_id=import_batch_id, 

869 federation_source=federation_source, 

870 version=1, 

871 team_id=prompt_team_id, 

872 owner_email=prompt_owner_email, 

873 visibility=prompt_visibility, 

874 gateway_id=prompt_gateway_id, 

875 ) 

876 # Set gateway relationship to help the before_insert event handler 

877 if gateway: 

878 db_prompt.gateway = gateway 

879 db_prompt.gateway_name_cache = gateway.name # type: ignore[attr-defined] 

880 prompts_to_add.append(db_prompt) 

881 stats["created"] += 1 

882 

883 except Exception as e: 

884 stats["failed"] += 1 

885 stats["errors"].append(f"Failed to process prompt {prompt.name}: {str(e)}") 

886 logger.warning(f"Failed to process prompt {prompt.name} in bulk operation: {str(e)}") 

887 continue 

888 

889 # Bulk add new prompts 

890 if prompts_to_add: 

891 db.add_all(prompts_to_add) 

892 

893 # Commit the chunk 

894 db.commit() 

895 

896 # Refresh prompts for notifications and audit trail 

897 for db_prompt in prompts_to_add: 

898 db.refresh(db_prompt) 

899 # Notify subscribers 

900 await self._notify_prompt_added(db_prompt) 

901 

902 # Log bulk audit trail entry 

903 if prompts_to_add or prompts_to_update: 

904 audit_trail.log_action( 

905 user_id=created_by or "system", 

906 action="bulk_create_prompts" if prompts_to_add else "bulk_update_prompts", 

907 resource_type="prompt", 

908 resource_id=import_batch_id or "bulk_operation", 

909 resource_name=f"Bulk operation: {len(prompts_to_add)} created, {len(prompts_to_update)} updated", 

910 user_email=owner_email, 

911 team_id=team_id, 

912 client_ip=created_from_ip, 

913 user_agent=created_user_agent, 

914 new_values={ 

915 "prompts_created": len(prompts_to_add), 

916 "prompts_updated": len(prompts_to_update), 

917 "visibility": visibility, 

918 }, 

919 context={ 

920 "created_via": created_via, 

921 "import_batch_id": import_batch_id, 

922 "federation_source": federation_source, 

923 "conflict_strategy": conflict_strategy, 

924 }, 

925 db=db, 

926 ) 

927 

928 logger.info(f"Bulk registered {len(prompts_to_add)} prompts, updated {len(prompts_to_update)} prompts in chunk") 

929 

930 except Exception as e: 

931 db.rollback() 

932 logger.error(f"Failed to process chunk in bulk prompt registration: {str(e)}") 

933 stats["failed"] += len(chunk) 

934 stats["errors"].append(f"Chunk processing failed: {str(e)}") 

935 continue 

936 

937 # Final structured logging 

938 structured_logger.log( 

939 level="INFO", 

940 message="Bulk prompt registration completed", 

941 event_type="prompts_bulk_created", 

942 component="prompt_service", 

943 user_id=created_by, 

944 user_email=owner_email, 

945 team_id=team_id, 

946 resource_type="prompt", 

947 custom_fields={ 

948 "prompts_created": stats["created"], 

949 "prompts_updated": stats["updated"], 

950 "prompts_skipped": stats["skipped"], 

951 "prompts_failed": stats["failed"], 

952 "total_prompts": len(prompts), 

953 "visibility": visibility, 

954 "conflict_strategy": conflict_strategy, 

955 }, 

956 ) 

957 

958 return stats 

959 

async def list_prompts(
    self,
    db: Session,
    include_inactive: bool = False,
    cursor: Optional[str] = None,
    tags: Optional[List[str]] = None,
    limit: Optional[int] = None,
    page: Optional[int] = None,
    per_page: Optional[int] = None,
    user_email: Optional[str] = None,
    team_id: Optional[str] = None,
    visibility: Optional[str] = None,
    token_teams: Optional[List[str]] = None,
) -> Union[tuple[List[PromptRead], Optional[str]], Dict[str, Any]]:
    """
    Retrieve a list of prompt templates from the database with pagination support.

    This method retrieves prompt templates from the database and converts them into a list
    of PromptRead objects. It supports filtering out inactive prompts based on the
    include_inactive parameter, and both cursor-based and page-based pagination.

    The first page of an unscoped, unfiltered listing is read from and written to the
    registry cache. The cache key only encodes ``include_inactive`` and ``tags``, so a
    request whose result also depends on any other input (``cursor``, ``page``,
    ``user_email``, ``token_teams``, ``team_id``, ``visibility`` or ``limit``) bypasses the
    cache entirely instead of sharing an entry with differently-filtered requests.

    Args:
        db (Session): The SQLAlchemy database session.
        include_inactive (bool): If True, include inactive prompts in the result.
            Defaults to False.
        cursor (Optional[str], optional): An opaque cursor token for pagination.
            Opaque base64-encoded string containing last item's ID and created_at.
        tags (Optional[List[str]]): Filter prompts by tags. If provided, only prompts with at least one matching tag will be returned.
        limit (Optional[int]): Maximum number of prompts to return. Use 0 for all prompts (no limit).
            If not specified, uses pagination_default_page_size.
        page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
        per_page: Items per page for page-based pagination. Defaults to pagination_default_page_size.
        user_email (Optional[str]): User email for team-based access control. If None, no access control is applied.
        team_id (Optional[str]): Filter by specific team ID. Requires user_email for access validation.
        visibility (Optional[str]): Filter by visibility (private, team, public).
        token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API token access
            where the token scope should be respected instead of the user's full team memberships.

    Returns:
        If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}}
        If cursor is provided or neither: tuple of (list of PromptRead objects, next_cursor).

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> from unittest.mock import MagicMock
        >>> from mcpgateway.schemas import PromptRead
        >>> service = PromptService()
        >>> db = MagicMock()
        >>> prompt_read_obj = MagicMock(spec=PromptRead)
        >>> service.convert_prompt_to_read = MagicMock(return_value=prompt_read_obj)
        >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
        >>> import asyncio
        >>> prompts, next_cursor = asyncio.run(service.list_prompts(db))
        >>> prompts == [prompt_read_obj]
        True
    """
    # Only the first page of a completely unscoped, unfiltered listing is cacheable.
    # Skip caching when:
    #   - user_email is provided (team-filtered results are user-specific)
    #   - token_teams is set (scoped access, e.g., public-only or team-scoped tokens)
    #   - page-based pagination or a cursor is used
    #   - team_id, visibility or limit is given: they change the result set but are
    #     not part of the cache key, so caching them would poison the shared entry
    # The same predicate guards both the lookup and the store below, which prevents
    # cache poisoning where e.g. admin results could leak to public-only requests.
    cache = _get_registry_cache()
    cacheable = cursor is None and page is None and user_email is None and token_teams is None and team_id is None and visibility is None and limit is None
    filters_hash = None
    if cacheable:
        filters_hash = cache.hash_filters(include_inactive=include_inactive, tags=sorted(tags) if tags else None)
        cached = await cache.get("prompts", filters_hash)
        if cached is not None:
            # Reconstruct PromptRead objects from cached dicts
            cached_prompts = [PromptRead.model_validate(p) for p in cached["prompts"]]
            return (cached_prompts, cached.get("next_cursor"))

    # Build base query with ordering and eager load gateway to avoid N+1
    query = select(DbPrompt).options(joinedload(DbPrompt.gateway)).order_by(desc(DbPrompt.created_at), desc(DbPrompt.id))

    if not include_inactive:
        query = query.where(DbPrompt.enabled)

    query = await self._apply_access_control(query, db, user_email, token_teams, team_id)

    if visibility:
        query = query.where(DbPrompt.visibility == visibility)

    # Add tag filtering if tags are provided (supports both List[str] and List[Dict] formats)
    if tags:
        query = query.where(json_contains_tag_expr(db, DbPrompt.tags, tags, match_any=True))

    # Use unified pagination helper - handles both page and cursor pagination
    pag_result = await unified_paginate(
        db=db,
        query=query,
        page=page,
        per_page=per_page,
        cursor=cursor,
        limit=limit,
        base_url="/admin/prompts",  # Used for page-based links
        query_params={"include_inactive": include_inactive} if include_inactive else {},
    )

    next_cursor = None
    # Extract prompts based on pagination type
    if page is not None:
        # Page-based: pag_result is a dict
        prompts_db = pag_result["data"]
    else:
        # Cursor-based: pag_result is a tuple
        prompts_db, next_cursor = pag_result

    # Fetch team names for the prompts (common for both pagination types)
    team_ids_set = {s.team_id for s in prompts_db if s.team_id}
    team_map = {}
    if team_ids_set:
        teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(team_ids_set), EmailTeam.is_active.is_(True))).all()
        team_map = {team.id: team.name for team in teams}

    db.commit()  # Release transaction to avoid idle-in-transaction

    # Convert to PromptRead (common for both pagination types)
    result = []
    for s in prompts_db:
        try:
            s.team = team_map.get(s.team_id) if s.team_id else None
            result.append(self.convert_prompt_to_read(s, include_metrics=False))
        except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
            logger.exception(f"Failed to convert prompt {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}")
            # Continue with remaining prompts instead of failing completely

    # Return appropriate format based on pagination type
    if page is not None:
        # Page-based format
        return {
            "data": result,
            "pagination": pag_result["pagination"],
            "links": pag_result["links"],
        }

    # Cursor-based format.
    # Store the first page under exactly the same conditions used for the lookup.
    if cacheable:
        try:
            cache_data = {"prompts": [s.model_dump(mode="json") for s in result], "next_cursor": next_cursor}
            await cache.set("prompts", cache_data, filters_hash)
        except AttributeError:
            pass  # Skip caching if result objects don't support model_dump (e.g., in doctests)

    return (result, next_cursor)

1106 

async def list_prompts_for_user(
    self, db: Session, user_email: str, team_id: Optional[str] = None, visibility: Optional[str] = None, include_inactive: bool = False, skip: int = 0, limit: int = 100
) -> List[PromptRead]:
    """
    DEPRECATED: Use list_prompts() with user_email parameter instead.

    This method is maintained for backward compatibility but is no longer used.
    New code should call list_prompts() with user_email, team_id, and visibility parameters.

    List prompts user has access to with team filtering. Results are ordered
    newest first (created_at, then id, both descending) so that skip/limit
    windows are stable between calls.

    Args:
        db: Database session
        user_email: Email of the user requesting prompts
        team_id: Optional team ID to filter by specific team
        visibility: Optional visibility filter (private, team, public)
        include_inactive: Whether to include inactive prompts
        skip: Number of prompts to skip for pagination
        limit: Maximum number of prompts to return

    Returns:
        List[PromptRead]: Prompts the user has access to. Empty when team_id is
        given but the user is not a member of that team. Prompts that fail
        conversion are logged and omitted.
    """
    # Resolve the teams the user belongs to; they drive every access rule below.
    team_service = TeamManagementService(db)
    user_teams = await team_service.get_user_teams(user_email)
    team_ids = [team.id for team in user_teams]

    # An explicitly requested team the user is not a member of yields nothing.
    if team_id and team_id not in team_ids:
        return []  # No access to team

    # Eager load gateway to avoid N+1 when accessing gateway_slug
    query = select(DbPrompt).options(joinedload(DbPrompt.gateway))

    # Apply active/inactive filter
    if not include_inactive:
        query = query.where(DbPrompt.enabled)

    if team_id:
        # Restrict to the requested team: team/public prompts of that team,
        # plus the user's own prompts inside it (covers private ones).
        access_conditions = [
            and_(DbPrompt.team_id == team_id, DbPrompt.visibility.in_(["team", "public"])),
            and_(DbPrompt.team_id == team_id, DbPrompt.owner_email == user_email),
        ]
    else:
        # 1. User's personal resources (owner_email matches)
        access_conditions = [DbPrompt.owner_email == user_email]
        # 2. Team resources where user is member
        if team_ids:
            access_conditions.append(and_(DbPrompt.team_id.in_(team_ids), DbPrompt.visibility.in_(["team", "public"])))
        # 3. Public resources
        access_conditions.append(DbPrompt.visibility == "public")

    query = query.where(or_(*access_conditions))

    # Apply visibility filter if specified
    if visibility:
        query = query.where(DbPrompt.visibility == visibility)

    # OFFSET/LIMIT without ORDER BY returns rows in an unspecified order, so
    # consecutive pages could overlap or skip rows. Use the same ordering as
    # list_prompts() to make the pagination deterministic.
    query = query.order_by(desc(DbPrompt.created_at), desc(DbPrompt.id)).offset(skip).limit(limit)

    prompts = db.execute(query).scalars().all()

    # Batch fetch team names to avoid N+1 queries
    prompt_team_ids = {p.team_id for p in prompts if p.team_id}
    team_map = {}
    if prompt_team_ids:
        teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(prompt_team_ids), EmailTeam.is_active.is_(True))).all()
        team_map = {str(team.id): team.name for team in teams}

    db.commit()  # Release transaction to avoid idle-in-transaction

    result = []
    for t in prompts:
        try:
            t.team = team_map.get(str(t.team_id)) if t.team_id else None
            result.append(self.convert_prompt_to_read(t, include_metrics=False))
        except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
            logger.exception(f"Failed to convert prompt {getattr(t, 'id', 'unknown')} ({getattr(t, 'name', 'unknown')}): {e}")
            # Continue with remaining prompts instead of failing completely
    return result

1195 

async def list_server_prompts(
    self,
    db: Session,
    server_id: str,
    include_inactive: bool = False,
    cursor: Optional[str] = None,
    user_email: Optional[str] = None,
    token_teams: Optional[List[str]] = None,
) -> List[PromptRead]:
    """
    List the prompt templates attached to one server.

    Selects prompts joined through the server/prompt association table for the
    given server, optionally hides disabled prompts, optionally applies
    visibility filtering, and converts each row to a PromptRead. Pagination is
    not implemented: the cursor is only written to the debug log.

    Args:
        db (Session): The SQLAlchemy database session.
        server_id (str): Server ID
        include_inactive (bool): If True, include inactive prompts in the result.
            Defaults to False.
        cursor (Optional[str], optional): An opaque cursor token for pagination. Currently,
            this parameter is ignored. Defaults to None.
        user_email (Optional[str]): User email for visibility filtering. If both this and
            token_teams are None, no filtering is applied.
        token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API
            token access where the token scope should be respected. An empty list restricts
            the result to public prompts (plus nothing owner-specific).

    Returns:
        List[PromptRead]: A list of prompt templates represented as PromptRead objects.
        Prompts that fail conversion are logged and left out.

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> from unittest.mock import MagicMock
        >>> from mcpgateway.schemas import PromptRead
        >>> service = PromptService()
        >>> db = MagicMock()
        >>> prompt_read_obj = MagicMock(spec=PromptRead)
        >>> service.convert_prompt_to_read = MagicMock(return_value=prompt_read_obj)
        >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
        >>> import asyncio
        >>> result = asyncio.run(service.list_server_prompts(db, 'server1'))
        >>> result == [prompt_read_obj]
        True
    """
    # Base statement: prompts linked to the server, gateway eagerly loaded so
    # that reading gateway data later does not trigger one query per prompt.
    stmt = select(DbPrompt).options(joinedload(DbPrompt.gateway))
    stmt = stmt.join(server_prompt_association, DbPrompt.id == server_prompt_association.c.prompt_id)
    stmt = stmt.where(server_prompt_association.c.server_id == server_id)
    if not include_inactive:
        stmt = stmt.where(DbPrompt.enabled)

    # Visibility filtering kicks in as soon as either a user or a token scope
    # is supplied; an empty-string user_email therefore ends up public-only.
    filtering_requested = not (user_email is None and token_teams is None)
    if filtering_requested:
        # Token scope wins over the user's memberships stored in the database.
        if token_teams is not None:
            allowed_team_ids = token_teams
        elif user_email:
            memberships = await TeamManagementService(db).get_user_teams(user_email)
            allowed_team_ids = [team.id for team in memberships]
        else:
            allowed_team_ids = []

        # A token with an empty team list may only ever see public prompts,
        # so it must not gain owner-based access.
        public_only = token_teams is not None and len(token_teams) == 0

        visibility_rules = [DbPrompt.visibility == "public"]
        if user_email and not public_only:
            visibility_rules.append(DbPrompt.owner_email == user_email)
        if allowed_team_ids:
            visibility_rules.append(and_(DbPrompt.team_id.in_(allowed_team_ids), DbPrompt.visibility.in_(["team", "public"])))
        stmt = stmt.where(or_(*visibility_rules))

    # Placeholder for future cursor pagination; for now the cursor is only logged.
    logger.debug(cursor)
    rows = db.execute(stmt).scalars().all()

    # Resolve team names in one query instead of one per prompt.
    referenced_team_ids = {row.team_id for row in rows if row.team_id}
    team_names = {}
    if referenced_team_ids:
        active_teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(referenced_team_ids), EmailTeam.is_active.is_(True))).all()
        team_names = {str(team.id): team.name for team in active_teams}

    db.commit()  # Release transaction to avoid idle-in-transaction

    converted = []
    for t in rows:
        try:
            t.team = team_names.get(str(t.team_id)) if t.team_id else None
            converted.append(self.convert_prompt_to_read(t, include_metrics=False))
        except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
            # One bad row must not fail the whole listing.
            logger.exception(f"Failed to convert prompt {getattr(t, 'id', 'unknown')} ({getattr(t, 'name', 'unknown')}): {e}")
    return converted

1300 

async def _record_prompt_metric(self, db: Session, prompt: DbPrompt, start_time: float, success: bool, error_message: Optional[str]) -> None:
    """
    Store one PromptMetric row describing a prompt invocation and commit it.

    Args:
        db: Database session
        prompt: The prompt that was invoked
        start_time: Start of the invocation as a time.monotonic() reading
        success: True if successful, False otherwise
        error_message: Error message if failed, None otherwise
    """
    # Elapsed time is measured on the monotonic clock, matching start_time.
    elapsed = time.monotonic() - start_time

    db.add(
        PromptMetric(
            prompt_id=prompt.id,
            response_time=elapsed,
            is_success=success,
            error_message=error_message,
        )
    )
    db.commit()

1323 

async def _check_prompt_access(
    self,
    db: Session,
    prompt: DbPrompt,
    user_email: Optional[str],
    token_teams: Optional[List[str]],
) -> bool:
    """Check if user has access to a prompt based on visibility rules.

    Decision order (first match wins):

    1. Public prompts are always accessible.
    2. ``token_teams is None`` together with ``user_email is None`` is treated
       as unrestricted admin access.
    3. Without a user email, nothing non-public is accessible.
    4. A public-only token (``token_teams == []``) gets nothing non-public.
    5. Private prompts are accessible to their owner only.
    6. Team prompts are accessible when the prompt's team is among the
       caller's teams (taken from the token if present, otherwise looked up).
    7. Anything else is denied.

    The team membership lookup is only performed for team-visible prompts that
    actually carry a team id, so denying a private prompt never costs a query.

    Args:
        db: Database session for team membership lookup if needed.
        prompt: Prompt ORM object with visibility, team_id, owner_email.
            A missing ``visibility`` attribute is treated as "public".
        user_email: Email of the requesting user (None = unauthenticated).
        token_teams: List of team IDs from token.
            - None = unrestricted admin access
            - [] = public-only token
            - [...] = team-scoped token

    Returns:
        True if access is allowed, False otherwise.
    """
    visibility = getattr(prompt, "visibility", "public")
    prompt_team_id = getattr(prompt, "team_id", None)
    prompt_owner_email = getattr(prompt, "owner_email", None)

    # Public prompts are accessible by everyone
    if visibility == "public":
        return True

    # Admin bypass: token_teams=None AND user_email=None means unrestricted admin
    # This happens when is_admin=True and no team scoping in token
    if token_teams is None and user_email is None:
        return True

    # No user context (but not admin) = deny access to non-public prompts
    if not user_email:
        return False

    # Public-only tokens (empty teams array) can ONLY access public prompts
    if token_teams is not None and len(token_teams) == 0:
        return False  # Already checked public above

    # Private prompts: ownership is the only way in, team membership is irrelevant
    if visibility == "private":
        return bool(prompt_owner_email) and prompt_owner_email == user_email

    # Only team-visible prompts bound to a team can be unlocked by membership
    if visibility != "team" or not prompt_team_id:
        return False

    # Use token_teams if provided, otherwise look up from DB
    if token_teams is not None:
        team_ids = token_teams
    else:
        team_service = TeamManagementService(db)
        user_teams = await team_service.get_user_teams(user_email)
        team_ids = [team.id for team in user_teams]

    return prompt_team_id in team_ids

1388 

1389 async def get_prompt( 

1390 self, 

1391 db: Session, 

1392 prompt_id: Union[int, str], 

1393 arguments: Optional[Dict[str, str]] = None, 

1394 user: Optional[str] = None, 

1395 tenant_id: Optional[str] = None, 

1396 server_id: Optional[str] = None, 

1397 request_id: Optional[str] = None, 

1398 token_teams: Optional[List[str]] = None, 

1399 plugin_context_table: Optional[PluginContextTable] = None, 

1400 plugin_global_context: Optional[GlobalContext] = None, 

1401 _meta_data: Optional[Dict[str, Any]] = None, 

1402 ) -> PromptResult: 

1403 """Get a prompt template and optionally render it. 

1404 

1405 Args: 

1406 db: Database session 

1407 prompt_id: ID of the prompt to retrieve 

1408 arguments: Optional arguments for rendering 

1409 user: Optional user email for authorization checks 

1410 tenant_id: Optional tenant identifier for plugin context 

1411 server_id: Optional server ID for server scoping enforcement 

1412 request_id: Optional request ID, generated if not provided 

1413 token_teams: Optional list of team IDs from token for authorization. 

1414 None = unrestricted admin, [] = public-only, [...] = team-scoped. 

1415 plugin_context_table: Optional plugin context table from previous hooks for cross-hook state sharing. 

1416 plugin_global_context: Optional global context from middleware for consistency across hooks. 

1417 _meta_data: Optional metadata for prompt retrieval (not used currently). 

1418 

1419 Returns: 

1420 Prompt result with rendered messages 

1421 

1422 Raises: 

1423 PluginViolationError: If prompt violates a plugin policy 

1424 PromptNotFoundError: If prompt not found or access denied 

1425 PromptError: For other prompt errors 

1426 PluginError: If encounters issue with plugin 

1427 

1428 Examples: 

1429 >>> from mcpgateway.services.prompt_service import PromptService 

1430 >>> from unittest.mock import MagicMock 

1431 >>> service = PromptService() 

1432 >>> db = MagicMock() 

1433 >>> db.execute.return_value.scalar_one_or_none.return_value = MagicMock() 

1434 >>> import asyncio 

1435 >>> try: 

1436 ... asyncio.run(service.get_prompt(db, 'prompt_id')) 

1437 ... except Exception: 

1438 ... pass 

1439 """ 

1440 

1441 start_time = time.monotonic() 

1442 success = False 

1443 error_message = None 

1444 prompt = None 

1445 server_scoped = False 

1446 

1447 # Create database span for observability dashboard 

1448 trace_id = current_trace_id.get() 

1449 db_span_id = None 

1450 db_span_ended = False 

1451 observability_service = ObservabilityService() if trace_id else None 

1452 

1453 if trace_id and observability_service: 

1454 try: 

1455 db_span_id = observability_service.start_span( 

1456 db=db, 

1457 trace_id=trace_id, 

1458 name="prompt.render", 

1459 attributes={ 

1460 "prompt.id": str(prompt_id), 

1461 "arguments_count": len(arguments) if arguments else 0, 

1462 "user": user or "anonymous", 

1463 "server_id": server_id, 

1464 "tenant_id": tenant_id, 

1465 "request_id": request_id or "none", 

1466 }, 

1467 ) 

1468 logger.debug(f"✓ Created prompt.render span: {db_span_id} for prompt: {prompt_id}") 

1469 except Exception as e: 

1470 logger.warning(f"Failed to start observability span for prompt rendering: {e}") 

1471 db_span_id = None 

1472 

1473 # Create a trace span for OpenTelemetry export (Jaeger, Zipkin, etc.) 

1474 with create_span( 

1475 "prompt.render", 

1476 { 

1477 "prompt.id": prompt_id, 

1478 "arguments_count": len(arguments) if arguments else 0, 

1479 "user": user or "anonymous", 

1480 "server_id": server_id, 

1481 "tenant_id": tenant_id, 

1482 "request_id": request_id or "none", 

1483 }, 

1484 ) as span: 

1485 try: 

1486 # Check if any prompt hooks are registered to avoid unnecessary context creation 

1487 has_pre_fetch = self._plugin_manager and self._plugin_manager.has_hooks_for(PromptHookType.PROMPT_PRE_FETCH) 

1488 has_post_fetch = self._plugin_manager and self._plugin_manager.has_hooks_for(PromptHookType.PROMPT_POST_FETCH) 

1489 

1490 # Initialize plugin context variables only if hooks are registered 

1491 context_table = None 

1492 global_context = None 

1493 if has_pre_fetch or has_post_fetch: 

1494 context_table = plugin_context_table 

1495 if plugin_global_context: 

1496 global_context = plugin_global_context 

1497 # Update fields with prompt-specific information 

1498 if user: 

1499 global_context.user = user 

1500 if server_id: 

1501 global_context.server_id = server_id 

1502 if tenant_id: 

1503 global_context.tenant_id = tenant_id 

1504 else: 

1505 # Create new context (fallback when middleware didn't run) 

1506 if not request_id: 

1507 request_id = uuid.uuid4().hex 

1508 global_context = GlobalContext(request_id=request_id, user=user, server_id=server_id, tenant_id=tenant_id) 

1509 

1510 if has_pre_fetch: 

1511 pre_result, context_table = await self._plugin_manager.invoke_hook( 

1512 PromptHookType.PROMPT_PRE_FETCH, 

1513 payload=PromptPrehookPayload(prompt_id=prompt_id, args=arguments), 

1514 global_context=global_context, 

1515 local_contexts=context_table, # Pass context from previous hooks 

1516 violations_as_exceptions=True, 

1517 ) 

1518 

1519 # Use modified payload if provided 

1520 if pre_result.modified_payload: 

1521 payload = pre_result.modified_payload 

1522 arguments = payload.args 

1523 

1524 # Find prompt by ID first, then by name (active prompts only) 

1525 search_key = str(prompt_id) 

1526 prompt = db.execute(select(DbPrompt).where(DbPrompt.id == prompt_id).where(DbPrompt.enabled)).scalar_one_or_none() 

1527 if not prompt: 

1528 prompt = db.execute(select(DbPrompt).where(DbPrompt.name == prompt_id).where(DbPrompt.enabled)).scalar_one_or_none() 

1529 

1530 if not prompt: 

1531 # Check if an inactive prompt exists 

1532 inactive_prompt = db.execute(select(DbPrompt).where(DbPrompt.id == prompt_id).where(not_(DbPrompt.enabled))).scalar_one_or_none() 

1533 if not inactive_prompt: 

1534 inactive_prompt = db.execute(select(DbPrompt).where(DbPrompt.name == prompt_id).where(not_(DbPrompt.enabled))).scalar_one_or_none() 

1535 

1536 if inactive_prompt: 

1537 raise PromptNotFoundError(f"Prompt '{search_key}' exists but is inactive") 

1538 

1539 raise PromptNotFoundError(f"Prompt not found: {search_key}") 

1540 

1541 # ═══════════════════════════════════════════════════════════════════════════ 

1542 # SECURITY: Check prompt access based on visibility and team membership 

1543 # ═══════════════════════════════════════════════════════════════════════════ 

1544 if not await self._check_prompt_access(db, prompt, user, token_teams): 

1545 # Don't reveal prompt existence - return generic "not found" 

1546 raise PromptNotFoundError(f"Prompt not found: {search_key}") 

1547 

1548 # ═══════════════════════════════════════════════════════════════════════════ 

1549 # SECURITY: Enforce server scoping if server_id is provided 

1550 # Prompt must be attached to the specified virtual server 

1551 # ═══════════════════════════════════════════════════════════════════════════ 

1552 if server_id: 

1553 server_match = db.execute( 

1554 select(server_prompt_association.c.prompt_id).where( 

1555 server_prompt_association.c.server_id == server_id, 

1556 server_prompt_association.c.prompt_id == prompt.id, 

1557 ) 

1558 ).first() 

1559 if not server_match: 

1560 raise PromptNotFoundError(f"Prompt not found: {search_key}") 

1561 server_scoped = True 

1562 

1563 if not arguments: 

1564 result = PromptResult( 

1565 messages=[ 

1566 Message( 

1567 role=Role.USER, 

1568 content=TextContent(type="text", text=prompt.template), 

1569 ) 

1570 ], 

1571 description=prompt.description, 

1572 ) 

1573 else: 

1574 try: 

1575 prompt.validate_arguments(arguments) 

1576 rendered = self._render_template(prompt.template, arguments) 

1577 messages = self._parse_messages(rendered) 

1578 result = PromptResult(messages=messages, description=prompt.description) 

1579 except Exception as e: 

1580 if span: 

1581 span.set_attribute("error", True) 

1582 span.set_attribute("error.message", str(e)) 

1583 raise PromptError(f"Failed to process prompt: {str(e)}") 

1584 

1585 if has_post_fetch: 

1586 post_result, _ = await self._plugin_manager.invoke_hook( 

1587 PromptHookType.PROMPT_POST_FETCH, 

1588 payload=PromptPosthookPayload(prompt_id=str(prompt.id), result=result), 

1589 global_context=global_context, 

1590 local_contexts=context_table, 

1591 violations_as_exceptions=True, 

1592 ) 

1593 # Use modified payload if provided 

1594 result = post_result.modified_payload.result if post_result.modified_payload else result 

1595 

1596 arguments_supplied = bool(arguments) 

1597 

1598 audit_trail.log_action( 

1599 user_id=user or "anonymous", 

1600 action="view_prompt", 

1601 resource_type="prompt", 

1602 resource_id=str(prompt.id), 

1603 resource_name=prompt.name, 

1604 team_id=prompt.team_id, 

1605 context={ 

1606 "tenant_id": tenant_id, 

1607 "server_id": server_id, 

1608 "arguments_provided": arguments_supplied, 

1609 "request_id": request_id, 

1610 }, 

1611 db=db, 

1612 ) 

1613 

1614 structured_logger.log( 

1615 level="INFO", 

1616 message="Prompt retrieved successfully", 

1617 event_type="prompt_viewed", 

1618 component="prompt_service", 

1619 user_id=user, 

1620 team_id=prompt.team_id, 

1621 resource_type="prompt", 

1622 resource_id=str(prompt.id), 

1623 request_id=request_id, 

1624 custom_fields={ 

1625 "prompt_name": prompt.name, 

1626 "arguments_provided": arguments_supplied, 

1627 "tenant_id": tenant_id, 

1628 "server_id": server_id, 

1629 }, 

1630 ) 

1631 

1632 # Set success attributes on span 

1633 if span: 

1634 span.set_attribute("success", True) 

1635 span.set_attribute("duration.ms", (time.monotonic() - start_time) * 1000) 

1636 if result and hasattr(result, "messages"): 

1637 span.set_attribute("messages.count", len(result.messages)) 

1638 

1639 success = True 

1640 logger.info(f"Retrieved prompt: {prompt.id} successfully") 

1641 return result 

1642 

1643 except Exception as e: 

1644 success = False 

1645 error_message = str(e) 

1646 raise 

1647 finally: 

1648 # Record metrics only if we found a prompt 

1649 if prompt: 

1650 try: 

1651 metrics_buffer.record_prompt_metric( 

1652 prompt_id=prompt.id, 

1653 start_time=start_time, 

1654 success=success, 

1655 error_message=error_message, 

1656 ) 

1657 except Exception as metrics_error: 

1658 logger.warning(f"Failed to record prompt metric: {metrics_error}") 

1659 

1660 # Record server metrics ONLY when the server scoping check passed. 

1661 # This prevents recording metrics with unvalidated server_id values 

1662 # from admin API headers (X-Server-ID) or RPC params. 

1663 if server_scoped: 

1664 try: 

1665 # Record server metric only for the specific virtual server being accessed 

1666 metrics_buffer.record_server_metric( 

1667 server_id=server_id, 

1668 start_time=start_time, 

1669 success=success, 

1670 error_message=error_message, 

1671 ) 

1672 except Exception as metrics_error: 

1673 logger.warning(f"Failed to record server metric: {metrics_error}") 

1674 

1675 # End database span for observability dashboard 

1676 if db_span_id and observability_service and not db_span_ended: 

1677 try: 

1678 observability_service.end_span( 

1679 db=db, 

1680 span_id=db_span_id, 

1681 status="ok" if success else "error", 

1682 status_message=error_message if error_message else None, 

1683 ) 

1684 db_span_ended = True 

1685 logger.debug(f"✓ Ended prompt.render span: {db_span_id}") 

1686 except Exception as e: 

1687 logger.warning(f"Failed to end observability span for prompt rendering: {e}") 

1688 

async def update_prompt(
    self,
    db: Session,
    prompt_id: Union[int, str],
    prompt_update: PromptUpdate,
    modified_by: Optional[str] = None,
    modified_from_ip: Optional[str] = None,
    modified_via: Optional[str] = None,
    modified_user_agent: Optional[str] = None,
    user_email: Optional[str] = None,
) -> PromptRead:
    """
    Update a prompt template.

    The prompt row is fetched through ``get_for_update`` so that the name
    uniqueness check and the write happen under the same lock. If the update
    changes the computed prompt name, a conflicting prompt in the same
    visibility scope (public / team / private-per-owner) aborts the update.
    Only fields that are not ``None`` on ``prompt_update`` are applied.

    Args:
        db: Database session
        prompt_id: ID of prompt to update
        prompt_update: Prompt update object
        modified_by: Username of the person modifying the prompt
        modified_from_ip: IP address where the modification originated
        modified_via: Source of modification (ui/api/import)
        modified_user_agent: User agent string from the modification request
        user_email: Email of user performing update (for ownership check)

    Returns:
        The updated PromptRead object

    Raises:
        PromptNotFoundError: If the prompt is not found
        PermissionError: If user doesn't own the prompt
        IntegrityError: If a database integrity error occurs.
        PromptNameConflictError: If a prompt with the same name already exists.
        PromptError: For other update errors

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> from unittest.mock import MagicMock
        >>> service = PromptService()
        >>> db = MagicMock()
        >>> db.execute.return_value.scalar_one_or_none.return_value = MagicMock()
        >>> db.commit = MagicMock()
        >>> db.refresh = MagicMock()
        >>> service._notify_prompt_updated = MagicMock()
        >>> service.convert_prompt_to_read = MagicMock(return_value={})
        >>> import asyncio
        >>> try:
        ...     asyncio.run(service.update_prompt(db, 'prompt_name', MagicMock()))
        ... except Exception:
        ...     pass
    """
    try:
        # Acquire a row-level lock for the prompt being updated to make
        # name-checks and the subsequent update atomic in PostgreSQL.
        # For SQLite `get_for_update` falls back to a regular get.
        prompt = get_for_update(db, DbPrompt, prompt_id)
        if not prompt:
            raise PromptNotFoundError(f"Prompt not found: {prompt_id}")

        # Effective scope values: the update wins, otherwise keep what is stored.
        visibility = prompt_update.visibility or prompt.visibility
        team_id = prompt_update.team_id or prompt.team_id
        owner_email = prompt.owner_email or user_email

        # Work out which custom name the prompt would carry after this update.
        candidate_custom_name = prompt.custom_name
        if prompt_update.name is not None:
            candidate_custom_name = prompt_update.custom_name or prompt_update.name
        elif prompt_update.custom_name is not None:
            candidate_custom_name = prompt_update.custom_name

        computed_name = self._compute_prompt_name(candidate_custom_name, prompt.gateway)
        if computed_name != prompt.name:
            # Tolerate a missing visibility instead of failing on `.lower()`.
            scope = (visibility or "").lower()

            # Uniqueness is enforced per visibility scope; build the scope filter once.
            scope_filter = None
            if scope == "public":
                scope_filter = DbPrompt.visibility == "public"
            elif scope == "team" and team_id:
                scope_filter = and_(DbPrompt.visibility == "team", DbPrompt.team_id == team_id)
            elif scope == "private":
                scope_filter = and_(DbPrompt.visibility == "private", DbPrompt.owner_email == owner_email)

            # `is not None`: SQL expressions must not be evaluated for truthiness.
            if scope_filter is not None:
                # Lock any conflicting row so concurrent updates cannot race.
                existing_prompt = get_for_update(
                    db,
                    DbPrompt,
                    where=and_(DbPrompt.name == computed_name, scope_filter, DbPrompt.id != prompt.id),
                )
                if existing_prompt:
                    raise PromptNameConflictError(computed_name, enabled=existing_prompt.enabled, prompt_id=existing_prompt.id, visibility=existing_prompt.visibility)

        # Check ownership if user_email provided
        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, prompt):
                raise PermissionError("Only the owner can update this prompt")

        if prompt_update.name is not None:
            if prompt.gateway_id:
                # Gateway-backed prompts keep their original name; only the custom name changes.
                prompt.custom_name = prompt_update.custom_name or prompt_update.name
            else:
                prompt.original_name = prompt_update.name
                if prompt_update.custom_name is None:
                    prompt.custom_name = prompt_update.name
        if prompt_update.custom_name is not None:
            prompt.custom_name = prompt_update.custom_name
        if prompt_update.display_name is not None:
            prompt.display_name = prompt_update.display_name
        if prompt_update.description is not None:
            prompt.description = prompt_update.description
        if prompt_update.template is not None:
            # Validate first so an invalid template is never written to the ORM object.
            self._validate_template(prompt_update.template)
            prompt.template = prompt_update.template
            # Clear template cache to reduce memory growth
            _compile_jinja_template.cache_clear()
        if prompt_update.arguments is not None:
            required_args = self._get_required_arguments(prompt.template)
            argument_schema = {
                "type": "object",
                "properties": {},
                "required": list(required_args),
            }
            for arg in prompt_update.arguments:
                schema = {"type": "string"}
                if arg.description is not None:
                    schema["description"] = arg.description
                argument_schema["properties"][arg.name] = schema
            prompt.argument_schema = argument_schema

        if prompt_update.visibility is not None:
            prompt.visibility = prompt_update.visibility

        # Update tags if provided
        if prompt_update.tags is not None:
            prompt.tags = prompt_update.tags

        # Update metadata fields
        prompt.updated_at = datetime.now(timezone.utc)
        if modified_by:
            prompt.modified_by = modified_by
        if modified_from_ip:
            prompt.modified_from_ip = modified_from_ip
        if modified_via:
            prompt.modified_via = modified_via
        if modified_user_agent:
            prompt.modified_user_agent = modified_user_agent
        if hasattr(prompt, "version") and prompt.version is not None:
            prompt.version = prompt.version + 1
        else:
            prompt.version = 1

        db.commit()
        db.refresh(prompt)

        await self._notify_prompt_updated(prompt)

        # Structured logging: Audit trail for prompt update
        audit_trail.log_action(
            user_id=user_email or modified_by or "system",
            action="update_prompt",
            resource_type="prompt",
            resource_id=str(prompt.id),
            resource_name=prompt.name,
            user_email=user_email,
            team_id=prompt.team_id,
            client_ip=modified_from_ip,
            user_agent=modified_user_agent,
            new_values={"name": prompt.name, "version": prompt.version},
            context={"modified_via": modified_via},
            db=db,
        )

        structured_logger.log(
            level="INFO",
            message="Prompt updated successfully",
            event_type="prompt_updated",
            component="prompt_service",
            user_id=modified_by,
            user_email=user_email,
            team_id=prompt.team_id,
            resource_type="prompt",
            resource_id=str(prompt.id),
            custom_fields={"prompt_name": prompt.name, "version": prompt.version},
        )

        prompt.team = self._get_team_name(db, prompt.team_id)

        # Invalidate cache after successful update
        cache = _get_registry_cache()
        await cache.invalidate_prompts()
        # Also invalidate tags cache since prompt tags may have changed
        # First-Party
        from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

        await admin_stats_cache.invalidate_tags()

        return self.convert_prompt_to_read(prompt)

    except PermissionError as pe:
        db.rollback()

        structured_logger.log(
            level="WARNING",
            message="Prompt update failed due to permission error",
            event_type="prompt_update_permission_denied",
            component="prompt_service",
            user_email=user_email,
            resource_type="prompt",
            resource_id=str(prompt_id),
            error=pe,
        )
        raise
    except IntegrityError as ie:
        db.rollback()
        logger.error(f"IntegrityErrors in group: {ie}")

        structured_logger.log(
            level="ERROR",
            message="Prompt update failed due to database integrity error",
            event_type="prompt_update_failed",
            component="prompt_service",
            user_email=user_email,
            resource_type="prompt",
            resource_id=str(prompt_id),
            error=ie,
        )
        # Bare raise keeps the original traceback intact.
        raise
    except PromptNotFoundError as e:
        db.rollback()
        logger.error(f"Prompt not found: {e}")

        structured_logger.log(
            level="ERROR",
            message="Prompt update failed - prompt not found",
            event_type="prompt_not_found",
            component="prompt_service",
            user_email=user_email,
            resource_type="prompt",
            resource_id=str(prompt_id),
            error=e,
        )
        raise
    except PromptNameConflictError as pnce:
        db.rollback()
        logger.error(f"Prompt name conflict: {pnce}")

        structured_logger.log(
            level="WARNING",
            message="Prompt update failed due to name conflict",
            event_type="prompt_name_conflict",
            component="prompt_service",
            user_email=user_email,
            resource_type="prompt",
            resource_id=str(prompt_id),
            error=pnce,
        )
        raise
    except Exception as e:
        db.rollback()

        structured_logger.log(
            level="ERROR",
            message="Prompt update failed",
            event_type="prompt_update_failed",
            component="prompt_service",
            user_email=user_email,
            resource_type="prompt",
            resource_id=str(prompt_id),
            error=e,
        )
        # Chain the cause explicitly so the underlying failure is visible to callers.
        raise PromptError(f"Failed to update prompt: {str(e)}") from e

1960 

async def set_prompt_state(self, db: Session, prompt_id: int, activate: bool, user_email: Optional[str] = None, skip_cache_invalidation: bool = False) -> PromptRead:
    """
    Set the activation status of a prompt.

    The row is fetched with ``get_for_update(..., nowait=True)`` so a
    concurrent modification surfaces immediately as a lock conflict instead
    of blocking. When the prompt is already in the requested state nothing
    is written and no notification is sent; the current prompt is returned.

    Args:
        db: Database session
        prompt_id: Prompt ID
        activate: True to activate, False to deactivate
        user_email: Optional[str] The email of the user to check if the user has permission to modify.
        skip_cache_invalidation: If True, skip cache invalidation (used for batch operations).

    Returns:
        The updated PromptRead object

    Raises:
        PromptNotFoundError: If the prompt is not found.
        PromptLockConflictError: If the prompt is locked by another transaction.
        PromptError: For other errors.
        PermissionError: If user doesn't own the prompt.

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> from unittest.mock import MagicMock
        >>> service = PromptService()
        >>> db = MagicMock()
        >>> prompt = MagicMock()
        >>> db.get.return_value = prompt
        >>> db.commit = MagicMock()
        >>> db.refresh = MagicMock()
        >>> service._notify_prompt_activated = MagicMock()
        >>> service._notify_prompt_deactivated = MagicMock()
        >>> service.convert_prompt_to_read = MagicMock(return_value={})
        >>> import asyncio
        >>> try:
        ...     asyncio.run(service.set_prompt_state(db, 1, True))
        ... except Exception:
        ...     pass
    """
    try:
        # Use nowait=True to fail fast if row is locked, preventing lock contention under high load
        try:
            prompt = get_for_update(db, DbPrompt, prompt_id, nowait=True)
        except OperationalError as lock_err:
            # Row is locked by another transaction - fail fast with 409
            db.rollback()
            raise PromptLockConflictError(f"Prompt {prompt_id} is currently being modified by another request") from lock_err
        if not prompt:
            raise PromptNotFoundError(f"Prompt not found: {prompt_id}")

        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, prompt):
                raise PermissionError("Only the owner can activate the Prompt" if activate else "Only the owner can deactivate the Prompt")

        if prompt.enabled != activate:
            prompt.enabled = activate
            prompt.updated_at = datetime.now(timezone.utc)
            db.commit()
            db.refresh(prompt)

            # Invalidate cache after status change (skip for batch operations)
            if not skip_cache_invalidation:
                cache = _get_registry_cache()
                await cache.invalidate_prompts()

            if activate:
                await self._notify_prompt_activated(prompt)
            else:
                await self._notify_prompt_deactivated(prompt)
            logger.info(f"Prompt {prompt.name} {'activated' if activate else 'deactivated'}")

            # Structured logging: Audit trail for prompt state change
            audit_trail.log_action(
                user_id=user_email or "system",
                action="set_prompt_state",
                resource_type="prompt",
                resource_id=str(prompt.id),
                resource_name=prompt.name,
                user_email=user_email,
                team_id=prompt.team_id,
                new_values={"enabled": prompt.enabled},
                context={"action": "activate" if activate else "deactivate"},
                db=db,
            )

            structured_logger.log(
                level="INFO",
                message=f"Prompt {'activated' if activate else 'deactivated'} successfully",
                event_type="prompt_state_changed",
                component="prompt_service",
                user_email=user_email,
                team_id=prompt.team_id,
                resource_type="prompt",
                resource_id=str(prompt.id),
                custom_fields={"prompt_name": prompt.name, "enabled": prompt.enabled},
            )

        prompt.team = self._get_team_name(db, prompt.team_id)
        return self.convert_prompt_to_read(prompt)
    except PermissionError as e:
        # The row was fetched FOR UPDATE; roll back so the transaction (and any
        # row lock it holds) is released, matching update_prompt/delete_prompt.
        db.rollback()

        structured_logger.log(
            level="WARNING",
            message="Prompt state change failed due to permission error",
            event_type="prompt_state_change_permission_denied",
            component="prompt_service",
            user_email=user_email,
            resource_type="prompt",
            resource_id=str(prompt_id),
            error=e,
        )
        raise
    except PromptLockConflictError:
        # Re-raise lock conflicts without wrapping - allows 409 response
        raise
    except PromptNotFoundError:
        # Re-raise not found without wrapping - allows 404 response
        raise
    except Exception as e:
        db.rollback()

        structured_logger.log(
            level="ERROR",
            message="Prompt state change failed",
            event_type="prompt_state_change_failed",
            component="prompt_service",
            user_email=user_email,
            resource_type="prompt",
            resource_id=str(prompt_id),
            error=e,
        )
        # Chain the cause explicitly so the underlying failure is visible to callers.
        raise PromptError(f"Failed to set prompt state: {str(e)}") from e

2095 

2096 # Get prompt details for admin ui 

2097 

async def get_prompt_details(self, db: Session, prompt_id: Union[int, str], include_inactive: bool = False) -> Dict[str, Any]:  # pylint: disable=unused-argument
    """
    Look up a single prompt by primary key and return its converted form.

    The lookup is recorded in the audit trail (as user ``system``) and in the
    structured log. ``include_inactive`` does not filter the lookup; it is
    only echoed into those log records.

    Args:
        db: Database session
        prompt_id: ID of prompt
        include_inactive: Whether to include inactive prompts

    Returns:
        Dictionary of prompt details

    Raises:
        PromptNotFoundError: If the prompt is not found

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> from unittest.mock import MagicMock
        >>> service = PromptService()
        >>> db = MagicMock()
        >>> prompt_dict = {'id': '1', 'name': 'test', 'description': 'desc', 'template': 'tpl', 'arguments': [], 'createdAt': '2023-01-01T00:00:00', 'updatedAt': '2023-01-01T00:00:00', 'isActive': True, 'metrics': {}}
        >>> service.convert_prompt_to_read = MagicMock(return_value=prompt_dict)
        >>> db.execute.return_value.scalar_one_or_none.return_value = MagicMock()
        >>> import asyncio
        >>> result = asyncio.run(service.get_prompt_details(db, 'prompt_name'))
        >>> result == prompt_dict
        True
    """
    record = db.get(DbPrompt, prompt_id)
    if not record:
        raise PromptNotFoundError(f"Prompt not found: {prompt_id}")

    # Attach the team name, then build the full read model (metrics included).
    record.team = self._get_team_name(db, record.team_id)
    details = self.convert_prompt_to_read(record)

    audit_fields = dict(
        user_id="system",
        action="view_prompt_details",
        resource_type="prompt",
        resource_id=str(record.id),
        resource_name=record.name,
        team_id=record.team_id,
        context={"include_inactive": include_inactive},
        db=db,
    )
    audit_trail.log_action(**audit_fields)

    log_fields = dict(
        level="INFO",
        message="Prompt details retrieved",
        event_type="prompt_details_viewed",
        component="prompt_service",
        resource_type="prompt",
        resource_id=str(record.id),
        team_id=record.team_id,
        custom_fields={
            "prompt_name": record.name,
            "include_inactive": include_inactive,
        },
    )
    structured_logger.log(**log_fields)

    return details

2159 

async def delete_prompt(self, db: Session, prompt_id: Union[int, str], user_email: Optional[str] = None, purge_metrics: bool = False) -> None:
    """
    Delete a prompt template by its ID.

    After the row is deleted and committed, subscribers are notified, the
    deletion is written to the audit trail and structured log, and the
    prompt and tag caches are invalidated.

    Args:
        db (Session): Database session.
        prompt_id (str): ID of the prompt to delete.
        user_email (Optional[str]): Email of user performing delete (for ownership check).
        purge_metrics (bool): If True, delete raw + rollup metrics for this prompt.

    Raises:
        PromptNotFoundError: If the prompt is not found.
        PermissionError: If user doesn't own the prompt.
        PromptError: For other deletion errors (the original exception is chained as the cause).

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> from unittest.mock import MagicMock
        >>> service = PromptService()
        >>> db = MagicMock()
        >>> prompt = MagicMock()
        >>> db.get.return_value = prompt
        >>> db.delete = MagicMock()
        >>> db.commit = MagicMock()
        >>> service._notify_prompt_deleted = MagicMock()
        >>> import asyncio
        >>> try:
        ...     asyncio.run(service.delete_prompt(db, '123'))
        ... except Exception:
        ...     pass
    """
    try:
        prompt = db.get(DbPrompt, prompt_id)
        if not prompt:
            raise PromptNotFoundError(f"Prompt not found: {prompt_id}")

        # Check ownership if user_email provided
        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, prompt):
                raise PermissionError("Only the owner can delete this prompt")

        # Capture what the notifications/logs need before the row is deleted.
        prompt_info = {"id": prompt.id, "name": prompt.name}
        prompt_name = prompt_info["name"]
        prompt_team_id = prompt.team_id

        if purge_metrics:
            with pause_rollup_during_purge(reason=f"purge_prompt:{prompt_id}"):
                delete_metrics_in_batches(db, PromptMetric, PromptMetric.prompt_id, prompt_id)
                delete_metrics_in_batches(db, PromptMetricsHourly, PromptMetricsHourly.prompt_id, prompt_id)

        db.delete(prompt)
        db.commit()
        await self._notify_prompt_deleted(prompt_info)
        logger.info(f"Deleted prompt: {prompt_info['name']}")

        # Structured logging: Audit trail for prompt deletion
        audit_trail.log_action(
            user_id=user_email or "system",
            action="delete_prompt",
            resource_type="prompt",
            resource_id=str(prompt_info["id"]),
            resource_name=prompt_name,
            user_email=user_email,
            team_id=prompt_team_id,
            old_values={"name": prompt_name},
            db=db,
        )

        # Structured logging: Log successful prompt deletion
        structured_logger.log(
            level="INFO",
            message="Prompt deleted successfully",
            event_type="prompt_deleted",
            component="prompt_service",
            user_email=user_email,
            team_id=prompt_team_id,
            resource_type="prompt",
            resource_id=str(prompt_info["id"]),
            custom_fields={
                "prompt_name": prompt_name,
                "purge_metrics": purge_metrics,
            },
        )

        # Invalidate cache after successful deletion
        cache = _get_registry_cache()
        await cache.invalidate_prompts()
        # Also invalidate tags cache since prompt tags may have changed
        # First-Party
        from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

        await admin_stats_cache.invalidate_tags()
    except PermissionError as pe:
        db.rollback()

        # Structured logging: Log permission error
        structured_logger.log(
            level="WARNING",
            message="Prompt deletion failed due to permission error",
            event_type="prompt_delete_permission_denied",
            component="prompt_service",
            user_email=user_email,
            resource_type="prompt",
            resource_id=str(prompt_id),
            error=pe,
        )
        raise
    except PromptNotFoundError as e:
        db.rollback()

        # Structured logging: Log not found error
        structured_logger.log(
            level="ERROR",
            message="Prompt deletion failed - prompt not found",
            event_type="prompt_not_found",
            component="prompt_service",
            user_email=user_email,
            resource_type="prompt",
            resource_id=str(prompt_id),
            error=e,
        )
        # Propagate unwrapped so callers can distinguish "not found".
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic prompt deletion failure
        structured_logger.log(
            level="ERROR",
            message="Prompt deletion failed",
            event_type="prompt_deletion_failed",
            component="prompt_service",
            user_email=user_email,
            resource_type="prompt",
            resource_id=str(prompt_id),
            error=e,
        )
        # Chain the cause explicitly so the underlying failure is visible to callers.
        raise PromptError(f"Failed to delete prompt: {str(e)}") from e

2300 

async def subscribe_events(self) -> AsyncGenerator[Dict[str, Any], None]:
    """Relay prompt events from the EventService to the caller.

    Yields:
        Prompt event messages, in the order the event service produces them.
    """
    event_stream = self._event_service.subscribe_events()
    async for message in event_stream:
        yield message

2309 

def _validate_template(self, template: str) -> None:
    """Validate template syntax.

    The template is parsed with the service's Jinja environment; nothing is
    rendered. Any parser failure is re-raised as ``PromptValidationError``
    with the original exception chained as its cause.

    Args:
        template: Template to validate

    Raises:
        PromptValidationError: If template is invalid

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> service = PromptService()
        >>> service._validate_template("Hello {{ name }}")  # Valid template
        >>> try:
        ...     service._validate_template("Hello {{ invalid")  # Invalid template
        ... except Exception as e:
        ...     "Invalid template syntax" in str(e)
        True
    """
    try:
        self._jinja_env.parse(template)
    except Exception as e:
        # `from e` keeps the parser error (line/position details) attached as the cause.
        raise PromptValidationError(f"Invalid template syntax: {str(e)}") from e

2333 

def _get_required_arguments(self, template: str) -> Set[str]:
    """Extract required arguments from template.

    Two sources are combined: undeclared variables found by Jinja in the
    parsed template, and replacement fields found by ``string.Formatter``
    (for ``str.format``-style templates). A template that is not a valid
    format string (for example one containing a lone ``{`` or ``}``)
    contributes no format-style fields instead of raising.

    Args:
        template: Template to analyze

    Returns:
        Set of required argument names

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> service = PromptService()
        >>> args = service._get_required_arguments("Hello {{ name }} from {{ place }}")
        >>> sorted(args)
        ['name', 'place']
        >>> service._get_required_arguments("No variables") == set()
        True
    """
    ast = self._jinja_env.parse(template)
    variables = meta.find_undeclared_variables(ast)
    try:
        # Materialize inside the try: Formatter.parse is lazy and raises while iterating.
        format_vars = {field_name for _, field_name, _, _ in Formatter().parse(template) if field_name is not None}
    except ValueError:
        # Unbalanced braces: the text already parsed as Jinja above, so it is
        # simply not a str.format template and has no format-style fields.
        format_vars = set()
    return variables.union(format_vars)

2357 

def _render_template(self, template: str, arguments: Dict[str, str]) -> str:
    """Render template with arguments using cached compiled templates.

    Rendering is attempted with the compiled Jinja template first. If that
    fails for any reason, ``str.format`` is tried as a fallback; only when
    both fail is an error raised.

    Args:
        template: Template to render
        arguments: Arguments for rendering

    Returns:
        Rendered template text

    Raises:
        PromptError: If rendering fails

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> service = PromptService()
        >>> result = service._render_template("Hello {{ name }}", {"name": "World"})
        >>> result
        'Hello World'
        >>> service._render_template("No variables", {})
        'No variables'
    """
    try:
        jinja_template = _compile_jinja_template(template)
        return jinja_template.render(**arguments)
    except Exception:
        # Deliberate best-effort: fall back to str.format for non-Jinja templates.
        try:
            return template.format(**arguments)
        except Exception as e:
            # Chain the format error; the earlier Jinja error stays reachable via its context.
            raise PromptError(f"Failed to render template: {str(e)}") from e

2388 

def _parse_messages(self, text: str) -> List[Message]:
    """Split rendered text into role-tagged messages.

    A line starting with ``# Assistant:`` or ``# User:`` switches the current
    role; the marker line itself is dropped. All other lines accumulate under
    the current role (initially the user role) and are emitted as one message,
    joined by newlines and stripped, whenever the role switches or the text ends.

    Args:
        text: Text to parse

    Returns:
        List of parsed messages

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> service = PromptService()
        >>> messages = service._parse_messages("Simple text")
        >>> len(messages)
        1
        >>> messages[0].role.value
        'user'
        >>> messages = service._parse_messages("# User:\\nHello\\n# Assistant:\\nHi there")
        >>> len(messages)
        2
    """
    marker_roles = (
        ("# Assistant:", Role.ASSISTANT),
        ("# User:", Role.USER),
    )
    parsed = []

    def emit(role, lines):
        # Nothing buffered since the last marker -> no message for that section.
        if lines:
            body = "\n".join(lines).strip()
            parsed.append(Message(role=role, content=TextContent(type="text", text=body)))

    active_role = Role.USER
    pending = []
    for raw_line in text.split("\n"):
        switched_to = next((role for marker, role in marker_roles if raw_line.startswith(marker)), None)
        if switched_to is None:
            pending.append(raw_line)
            continue
        # Marker line: close out the section collected so far and start a new one.
        emit(active_role, pending)
        active_role, pending = switched_to, []

    emit(active_role, pending)
    return parsed

2444 

async def _notify_prompt_added(self, prompt: DbPrompt) -> None:
    """
    Publish a ``prompt_added`` event for the given prompt.

    The event data carries the prompt's id, name, description and enabled
    flag; the event is stamped with the current UTC time in ISO format.

    Args:
        prompt: Prompt to add
    """
    payload = {
        "id": prompt.id,
        "name": prompt.name,
        "description": prompt.description,
        "enabled": prompt.enabled,
    }
    await self._publish_event(
        {
            "type": "prompt_added",
            "data": payload,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )

2463 

async def _notify_prompt_updated(self, prompt: DbPrompt) -> None:
    """
    Publish a ``prompt_updated`` event for the given prompt.

    The event data carries the prompt's id, name, description and enabled
    flag; the event is stamped with the current UTC time in ISO format.

    Args:
        prompt: Prompt to update
    """
    payload = {
        "id": prompt.id,
        "name": prompt.name,
        "description": prompt.description,
        "enabled": prompt.enabled,
    }
    await self._publish_event(
        {
            "type": "prompt_updated",
            "data": payload,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )

2482 

async def _notify_prompt_activated(self, prompt: DbPrompt) -> None:
    """
    Publish a ``prompt_activated`` event for the given prompt.

    The event data carries the prompt's id and name, with ``enabled`` fixed
    to ``True``; the event is stamped with the current UTC time in ISO format.

    Args:
        prompt: Prompt to activate
    """
    payload = {"id": prompt.id, "name": prompt.name, "enabled": True}
    stamp = datetime.now(timezone.utc).isoformat()
    await self._publish_event({"type": "prompt_activated", "data": payload, "timestamp": stamp})

2496 

async def _notify_prompt_deactivated(self, prompt: DbPrompt) -> None:
    """
    Publish a ``prompt_deactivated`` event for the given prompt.

    The event data carries the prompt's id and name, with ``enabled`` fixed
    to ``False``; the event is stamped with the current UTC time in ISO format.

    Args:
        prompt: Prompt to deactivate
    """
    payload = {"id": prompt.id, "name": prompt.name, "enabled": False}
    stamp = datetime.now(timezone.utc).isoformat()
    await self._publish_event({"type": "prompt_deactivated", "data": payload, "timestamp": stamp})

2510 

async def _notify_prompt_deleted(self, prompt_info: Dict[str, Any]) -> None:
    """
    Publish a ``prompt_deleted`` event.

    The supplied dict is used unchanged as the event data; the event is
    stamped with the current UTC time in ISO format.

    Args:
        prompt_info: Dict on prompt to notify as deleted
    """
    stamp = datetime.now(timezone.utc).isoformat()
    await self._publish_event({"type": "prompt_deleted", "data": prompt_info, "timestamp": stamp})

2524 

async def _notify_prompt_removed(self, prompt: DbPrompt) -> None:
    """
    Publish a ``prompt_removed`` event for the given prompt (deactivation).

    The event data carries the prompt's id and name, with ``enabled`` fixed
    to ``False``; the event is stamped with the current UTC time in ISO format.

    Args:
        prompt: Prompt to remove
    """
    payload = {"id": prompt.id, "name": prompt.name, "enabled": False}
    stamp = datetime.now(timezone.utc).isoformat()
    await self._publish_event({"type": "prompt_removed", "data": payload, "timestamp": stamp})

2538 

async def _publish_event(self, event: Dict[str, Any]) -> None:
    """
    Hand an event to the service's EventService for delivery to subscribers.

    Args:
        event: Event to publish
    """
    event_service = self._event_service
    await event_service.publish_event(event)

2547 

2548 # --- Metrics --- 

async def aggregate_metrics(self, db: Session) -> PromptMetrics:
    """
    Aggregate metrics for all prompt invocations across all prompts.

    When the metrics cache is enabled and holds an entry under the
    ``"prompts"`` key, that entry is returned directly. Otherwise the
    aggregation is delegated to ``aggregate_metrics_combined`` (the combined
    raw + hourly rollup query), the result is copied into a
    ``PromptMetrics`` object and, if caching is enabled, stored in the cache
    as a plain dict.

    Args:
        db: Database session

    Returns:
        PromptMetrics: Aggregated prompt metrics from raw + hourly rollups.

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> service = PromptService()
        >>> # Method exists and is callable
        >>> callable(service.aggregate_metrics)
        True
    """
    # First-Party
    from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache  # pylint: disable=import-outside-toplevel

    # Serve from cache when possible; the cache is only consulted if enabled.
    cache_hit = metrics_cache.get("prompts") if is_cache_enabled() else None
    if cache_hit is not None:
        return PromptMetrics(**cache_hit)

    # Cache miss: run the combined raw + rollup query for full historical coverage.
    # First-Party
    from mcpgateway.services.metrics_query_service import aggregate_metrics_combined  # pylint: disable=import-outside-toplevel

    combined = aggregate_metrics_combined(db, "prompt")

    field_names = (
        "total_executions",
        "successful_executions",
        "failed_executions",
        "failure_rate",
        "min_response_time",
        "max_response_time",
        "avg_response_time",
        "last_execution_time",
    )
    metrics = PromptMetrics(**{field: getattr(combined, field) for field in field_names})

    # Store as a dict for serialization compatibility (if caching is enabled).
    if is_cache_enabled():
        metrics_cache.set("prompts", metrics.model_dump())

    return metrics

2601 

async def reset_metrics(self, db: Session) -> None:
    """
    Reset all prompt metrics by deleting raw and hourly rollup records.

    Both deletes are issued on the given session and committed together.
    After the commit, the ``"prompts"`` cache entry and every cache entry
    whose key starts with ``"top_prompts:"`` are invalidated.

    Args:
        db: Database session

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> from unittest.mock import MagicMock
        >>> service = PromptService()
        >>> db = MagicMock()
        >>> db.execute = MagicMock()
        >>> db.commit = MagicMock()
        >>> import asyncio
        >>> asyncio.run(service.reset_metrics(db))
    """
    # Raw records first, then hourly rollups - same order as a manual reset.
    for metrics_model in (PromptMetric, PromptMetricsHourly):
        db.execute(delete(metrics_model))
    db.commit()

    # Drop cached aggregates so later reads do not return stale numbers.
    # First-Party
    from mcpgateway.cache.metrics_cache import metrics_cache  # pylint: disable=import-outside-toplevel

    metrics_cache.invalidate("prompts")
    metrics_cache.invalidate_prefix("top_prompts:")

2630 

2631 

# Lazy singleton - created on first access, not at module import time.
# This avoids instantiation when only exception classes are imported.
_prompt_service_instance = None  # pylint: disable=invalid-name


def __getattr__(name: str):
    """Resolve the lazily created ``prompt_service`` module attribute.

    Python calls this hook for module attribute lookups that are not found
    through the normal mechanism. The only name handled is
    ``prompt_service``: the first lookup constructs a ``PromptService`` and
    stores it in ``_prompt_service_instance``; later lookups return that
    same object.

    Args:
        name: The attribute name being accessed.

    Returns:
        The prompt_service singleton instance if name is "prompt_service".

    Raises:
        AttributeError: If the attribute name is not "prompt_service".
    """
    global _prompt_service_instance  # pylint: disable=global-statement
    # Guard clause: anything other than the singleton name is a plain miss.
    if name != "prompt_service":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
    if _prompt_service_instance is None:
        _prompt_service_instance = PromptService()
    return _prompt_service_instance