Coverage for mcpgateway / services / prompt_service.py: 99%

860 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/prompt_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Mihai Criveti 

6 

7Prompt Service Implementation. 

8This module implements prompt template management according to the MCP specification. 

9It handles: 

10- Prompt template registration and retrieval 

11- Prompt argument validation 

12- Template rendering with arguments 

13- Resource embedding in prompts 

14- Active/inactive prompt management 

15""" 

16 

17# Standard 

18import binascii 

19from datetime import datetime, timezone 

20from functools import lru_cache 

21import os 

22from string import Formatter 

23import time 

24from typing import Any, AsyncGenerator, Dict, List, Optional, Set, Union 

25import uuid 

26 

27# Third-Party 

28from jinja2 import Environment, meta, select_autoescape, Template 

29from pydantic import ValidationError 

30from sqlalchemy import and_, delete, desc, not_, or_, select 

31from sqlalchemy.exc import IntegrityError, OperationalError 

32from sqlalchemy.orm import joinedload, Session 

33 

34# First-Party 

35from mcpgateway.common.models import Message, PromptResult, Role, TextContent 

36from mcpgateway.config import settings 

37from mcpgateway.db import EmailTeam 

38from mcpgateway.db import Gateway as DbGateway 

39from mcpgateway.db import get_for_update 

40from mcpgateway.db import Prompt as DbPrompt 

41from mcpgateway.db import PromptMetric, PromptMetricsHourly, server_prompt_association 

42from mcpgateway.observability import create_span 

43from mcpgateway.plugins.framework import GlobalContext, PluginContextTable, PluginManager, PromptHookType, PromptPosthookPayload, PromptPrehookPayload 

44from mcpgateway.schemas import PromptCreate, PromptRead, PromptUpdate, TopPerformer 

45from mcpgateway.services.audit_trail_service import get_audit_trail_service 

46from mcpgateway.services.event_service import EventService 

47from mcpgateway.services.logging_service import LoggingService 

48from mcpgateway.services.metrics_cleanup_service import delete_metrics_in_batches, pause_rollup_during_purge 

49from mcpgateway.services.observability_service import current_trace_id, ObservabilityService 

50from mcpgateway.services.structured_logger import get_structured_logger 

51from mcpgateway.services.team_management_service import TeamManagementService 

52from mcpgateway.utils.create_slug import slugify 

53from mcpgateway.utils.metrics_common import build_top_performers 

54from mcpgateway.utils.pagination import unified_paginate 

55from mcpgateway.utils.sqlalchemy_modifier import json_contains_tag_expr 

56 

# Registry cache singleton. Starts as None and is filled in by
# _get_registry_cache(), which imports it lazily to avoid circular dependencies.
_REGISTRY_CACHE = None

# Module-level Jinja environment singleton for template caching. Starts as None
# and is created on the first call to _get_jinja_env().
_JINJA_ENV: Optional[Environment] = None

62 

63 

def _get_jinja_env() -> Environment:
    """Return the shared module-level Jinja environment, building it on first use.

    Returns:
        Jinja2 Environment with autoescape and trim settings.
    """
    global _JINJA_ENV  # pylint: disable=global-statement
    if _JINJA_ENV is not None:
        # Already built by an earlier call; reuse the same instance.
        return _JINJA_ENV

    _JINJA_ENV = Environment(
        autoescape=select_autoescape(["html", "xml"]),
        trim_blocks=True,
        lstrip_blocks=True,
    )
    return _JINJA_ENV

78 

79 

@lru_cache(maxsize=256)
def _compile_jinja_template(template: str) -> Template:
    """Compile a template string, memoising the result per distinct string.

    The ``lru_cache`` decorator keeps up to 256 compiled templates keyed by the
    template source text.

    Args:
        template: The template string to compile.

    Returns:
        Compiled Jinja Template object.
    """
    jinja_env = _get_jinja_env()
    return jinja_env.from_string(template)

91 

92 

def _get_registry_cache():
    """Return the registry cache singleton, importing it on first use.

    Returns:
        RegistryCache instance.
    """
    global _REGISTRY_CACHE  # pylint: disable=global-statement
    if _REGISTRY_CACHE is not None:
        return _REGISTRY_CACHE

    # Import deferred to call time (kept out of module import).
    # First-Party
    from mcpgateway.cache.registry_cache import registry_cache  # pylint: disable=import-outside-toplevel

    _REGISTRY_CACHE = registry_cache
    return _REGISTRY_CACHE

106 

107 

# Initialize logging service first; `logger` is the module logger used for
# plain log messages throughout this file.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

# Initialize structured logger and audit trail for prompt operations.
# Both are module-level objects shared by every PromptService method.
structured_logger = get_structured_logger("prompt_service")
audit_trail = get_audit_trail_service()

115 

116 

class PromptError(Exception):
    """Base class for prompt-related errors.

    The more specific prompt exceptions in this module (not found, name
    conflict, validation, lock conflict) derive from this class, so catching
    ``PromptError`` also catches them.
    """

119 

120 

class PromptNotFoundError(PromptError):
    """Raised when a requested prompt is not found.

    Subclass of ``PromptError``.
    """

123 

124 

class PromptNameConflictError(PromptError):
    """Raised when a prompt name conflicts with existing (active or inactive) prompt."""

    def __init__(self, name: str, enabled: bool = True, prompt_id: Optional[int] = None, visibility: str = "public") -> None:
        """Record the conflicting prompt's details and build the error message.

        The message starts with the capitalized visibility and, when the
        existing prompt is not enabled, ends with a note carrying its ID.

        Args:
            name: The conflicting prompt name
            enabled: Whether the existing prompt is enabled
            prompt_id: ID of the existing prompt if available
            visibility: Prompt visibility level (private, team, public).

        Examples:
            >>> from mcpgateway.services.prompt_service import PromptNameConflictError
            >>> error = PromptNameConflictError("test_prompt")
            >>> error.name
            'test_prompt'
            >>> error.enabled
            True
            >>> error.prompt_id is None
            True
            >>> error = PromptNameConflictError("inactive_prompt", False, 123)
            >>> error.enabled
            False
            >>> error.prompt_id
            123
        """
        self.name = name
        self.enabled = enabled
        self.prompt_id = prompt_id
        # Only inactive prompts get the trailing note with their ID.
        inactive_note = "" if enabled else f" (currently inactive, ID: {prompt_id})"
        super().__init__(f"{visibility.capitalize()} Prompt already exists with name: {name}{inactive_note}")

159 

160 

class PromptValidationError(PromptError):
    """Raised when prompt validation fails.

    Subclass of ``PromptError``.
    """

163 

164 

class PromptLockConflictError(PromptError):
    """Raised when a prompt row is locked by another transaction.

    Signals an attempt to modify a prompt that is currently locked by another
    concurrent request. Subclass of ``PromptError``.
    """

172 

173 

174class PromptService: 

175 """Service for managing prompt templates. 

176 

177 Handles: 

178 - Template registration and retrieval 

179 - Argument validation 

180 - Template rendering 

181 - Resource embedding 

182 - Active/inactive status management 

183 """ 

184 

def __init__(self) -> None:
    """
    Initialize the prompt service.

    Creates the event service for the prompt channel, attaches the shared
    module-level Jinja2 environment, and optionally creates a plugin manager.

    Although rendered templates are returned as JSON by the API, output that is
    later embedded into an HTML page could be abused for cross-site scripting (XSS).
    The shared environment enables autoescaping for 'html' and 'xml' templates via
    select_autoescape to help mitigate that risk.

    Plugins are enabled from the PLUGINS_ENABLED environment variable when it is
    set (accepted truthy values: "1", "true", "yes", "on"), otherwise from
    settings.plugins_enabled. The plugin config path comes from PLUGIN_CONFIG_FILE,
    then settings.plugin_config_file, then "plugins/config.yaml".

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> service = PromptService()
        >>> isinstance(service._event_service, EventService)
        True
        >>> service._jinja_env is not None
        True
    """
    self._event_service = EventService(channel_name="mcpgateway:prompt_events")
    # Reuse the module-level singleton so compiled templates share one environment
    self._jinja_env = _get_jinja_env()

    # Environment variable, when present, overrides the settings value (testability)
    raw_flag = os.getenv("PLUGINS_ENABLED")
    if raw_flag is None:
        plugins_enabled = settings.plugins_enabled
    else:
        plugins_enabled = raw_flag.strip().lower() in {"1", "true", "yes", "on"}

    fallback_config = getattr(settings, "plugin_config_file", "plugins/config.yaml")
    config_file = os.getenv("PLUGIN_CONFIG_FILE", fallback_config)

    manager = None
    if plugins_enabled:
        manager = PluginManager(config_file)
    self._plugin_manager: PluginManager | None = manager

214 

async def initialize(self) -> None:
    """Initialize the service.

    Logs a start-up message and then awaits initialization of the
    underlying event service.
    """
    logger.info("Initializing prompt service")
    await self._event_service.initialize()

219 

async def shutdown(self) -> None:
    """Shutdown the service.

    Awaits shutdown of the underlying event service and then logs that the
    prompt service has finished shutting down.

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> from unittest.mock import AsyncMock
        >>> import asyncio
        >>> service = PromptService()
        >>> service._event_service = AsyncMock()
        >>> asyncio.run(service.shutdown())
        >>> # Verify event service shutdown was called
        >>> service._event_service.shutdown.assert_awaited_once()
    """
    await self._event_service.shutdown()
    logger.info("Prompt service shutdown complete")

235 

async def get_top_prompts(self, db: Session, limit: Optional[int] = 5, include_deleted: bool = False) -> List[TopPerformer]:
    """Retrieve the top-performing prompts based on execution count.

    Delegates to the combined metrics query, which covers both recent raw metrics
    and historical hourly rollups, and converts the rows into TopPerformer objects.
    When the metrics cache is enabled, the result is read from and written to it
    under a key built from the effective limit and the include_deleted flag.

    Args:
        db (Session): Database session for querying prompt metrics.
        limit (Optional[int]): Maximum number of prompts to return. Defaults to 5;
            a falsy value (None or 0) also falls back to 5.
        include_deleted (bool): Whether to include deleted prompts from rollups.

    Returns:
        List[TopPerformer]: A list of TopPerformer objects, each containing:
            - id: Prompt ID.
            - name: Prompt name.
            - execution_count: Total number of executions.
            - avg_response_time: Average response time in seconds, or None if no metrics.
            - success_rate: Success rate percentage, or None if no metrics.
            - last_execution: Timestamp of the last execution, or None if no metrics.
    """
    # First-Party
    from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache  # pylint: disable=import-outside-toplevel

    row_limit = limit or 5
    cache_key = f"top_prompts:{row_limit}:include_deleted={include_deleted}"

    # Serve from cache when possible
    if is_cache_enabled() and (hit := metrics_cache.get(cache_key)) is not None:
        return hit

    # Cache miss: run the combined raw + rollup query
    # First-Party
    from mcpgateway.services.metrics_query_service import get_top_performers_combined  # pylint: disable=import-outside-toplevel

    combined_rows = get_top_performers_combined(
        db=db,
        metric_type="prompt",
        entity_model=DbPrompt,
        limit=row_limit,
        include_deleted=include_deleted,
    )
    performers = build_top_performers(combined_rows)

    # Store for subsequent calls
    if is_cache_enabled():
        metrics_cache.set(cache_key, performers)

    return performers

288 

def convert_prompt_to_read(self, db_prompt: DbPrompt, include_metrics: bool = False) -> PromptRead:
    """
    Build a PromptRead Pydantic model from a DbPrompt row.

    The argument list is derived from the prompt's stored argument schema
    ("properties" and "required"). Aggregated metrics are computed from the
    prompt's metric records only when requested.

    Args:
        db_prompt: Db prompt to convert
        include_metrics: Whether to include metrics in the result. Defaults to False.
            Set to False for list operations to avoid N+1 query issues.

    Returns:
        PromptRead: Pydantic model instance
    """
    schema = db_prompt.argument_schema or {}
    required_names = schema.get("required", [])
    arguments_list = [
        {
            "name": arg_name,
            "description": prop.get("description") or "",
            "required": arg_name in required_names,
        }
        for arg_name, prop in schema.get("properties", {}).items()
    ]

    # Aggregate metrics only on request (avoids N+1 queries in list operations)
    metrics_dict = None
    if include_metrics:
        records = getattr(db_prompt, "metrics", None)
        total = len(records) if records is not None else 0
        if total > 0:
            successful = sum(1 for m in records if m.is_success)
            failed = sum(1 for m in records if not m.is_success)
            failure_rate = failed / total
            response_times = [m.response_time for m in records]
            min_rt = min(response_times, default=None)
            max_rt = max(response_times, default=None)
            avg_rt = sum(response_times) / total
            last_time = max((m.timestamp for m in records), default=None)
        else:
            # No metric records: counts are zero and timings are unknown
            successful = failed = 0
            failure_rate = 0.0
            min_rt = max_rt = avg_rt = last_time = None

        metrics_dict = {
            "totalExecutions": total,
            "successfulExecutions": successful,
            "failedExecutions": failed,
            "failureRate": failure_rate,
            "minResponseTime": min_rt,
            "maxResponseTime": max_rt,
            "avgResponseTime": avg_rt,
            "lastExecutionTime": last_time,
        }

    # Each naming field falls back to the previous one when missing or empty
    original_name = getattr(db_prompt, "original_name", None) or db_prompt.name
    custom_name = getattr(db_prompt, "custom_name", None) or original_name
    custom_name_slug = getattr(db_prompt, "custom_name_slug", None) or slugify(custom_name)
    display_name = getattr(db_prompt, "display_name", None) or custom_name

    prompt_dict = {
        "id": db_prompt.id,
        "name": db_prompt.name,
        "original_name": original_name,
        "custom_name": custom_name,
        "custom_name_slug": custom_name_slug,
        "display_name": display_name,
        "gateway_slug": getattr(db_prompt, "gateway_slug", None),
        "description": db_prompt.description,
        "template": db_prompt.template,
        "arguments": arguments_list,
        "created_at": db_prompt.created_at,
        "updated_at": db_prompt.updated_at,
        "enabled": db_prompt.enabled,
        "metrics": metrics_dict,
        "tags": db_prompt.tags or [],
        "visibility": db_prompt.visibility,
    }

    # Team and metadata fields are optional on the row; absent ones become None
    optional_fields = (
        "team",
        "created_by",
        "modified_by",
        "created_from_ip",
        "created_via",
        "created_user_agent",
        "modified_from_ip",
        "modified_via",
        "modified_user_agent",
        "version",
        "team_id",
        "owner_email",
    )
    for field_name in optional_fields:
        prompt_dict[field_name] = getattr(db_prompt, field_name, None)

    return PromptRead.model_validate(prompt_dict)

376 

def _get_team_name(self, db: Session, team_id: Optional[str]) -> Optional[str]:
    """Look up the name of an active team by its ID.

    Args:
        db (Session): Database session for querying teams.
        team_id (Optional[str]): The ID of the team.

    Returns:
        Optional[str]: The name of the team if found, otherwise None.
            A falsy team_id returns None without querying.
    """
    if not team_id:
        return None

    active_team_query = db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True))
    team = active_team_query.first()
    db.commit()  # Release transaction to avoid idle-in-transaction

    if not team:
        return None
    return team.name

392 

def _compute_prompt_name(self, custom_name: str, gateway: Optional[Any] = None) -> str:
    """Derive the stored prompt name from custom_name and an optional gateway.

    Args:
        custom_name: Prompt name to slugify and store.
        gateway: Optional gateway for namespacing.

    Returns:
        The slugified name, prefixed with the slugified gateway name and the
        configured separator when a gateway is given.
    """
    name_slug = slugify(custom_name)
    if not gateway:
        return name_slug
    return f"{slugify(gateway.name)}{settings.gateway_tool_name_separator}{name_slug}"

408 

async def register_prompt(
    self,
    db: Session,
    prompt: PromptCreate,
    created_by: Optional[str] = None,
    created_from_ip: Optional[str] = None,
    created_via: Optional[str] = None,
    created_user_agent: Optional[str] = None,
    import_batch_id: Optional[str] = None,
    federation_source: Optional[str] = None,
    team_id: Optional[str] = None,
    owner_email: Optional[str] = None,
    visibility: Optional[str] = "public",
) -> PromptRead:
    """Register a new prompt template.

    Validates the template, builds the argument schema, checks for a name
    conflict within the requested visibility scope (public or team), stores
    the prompt, notifies subscribers, writes audit/structured log entries and
    invalidates the prompt, tag and metrics caches.

    On any failure the session is rolled back before the failure is logged,
    so the session passed in remains usable by the caller.

    Args:
        db: Database session
        prompt: Prompt creation schema
        created_by: Username who created this prompt
        created_from_ip: IP address of creator
        created_via: Creation method (ui, api, import, federation)
        created_user_agent: User agent of creation request
        import_batch_id: UUID for bulk import operations
        federation_source: Source gateway for federated prompts
        team_id (Optional[str]): Team ID to assign the prompt to.
        owner_email (Optional[str]): Email of the user who owns this prompt.
        visibility (str): Prompt visibility level (private, team, public).

    Returns:
        Created prompt information

    Raises:
        IntegrityError: If a database integrity error occurs (re-raised after rollback).
        PromptNameConflictError: If a prompt with the same name already exists.
        PromptError: For other prompt registration errors; the original
            exception is attached as ``__cause__``.

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> from unittest.mock import MagicMock
        >>> service = PromptService()
        >>> db = MagicMock()
        >>> prompt = MagicMock()
        >>> db.execute.return_value.scalar_one_or_none.return_value = None
        >>> db.add = MagicMock()
        >>> db.commit = MagicMock()
        >>> db.refresh = MagicMock()
        >>> service._notify_prompt_added = MagicMock()
        >>> service.convert_prompt_to_read = MagicMock(return_value={})
        >>> import asyncio
        >>> try:
        ...     asyncio.run(service.register_prompt(db, prompt))
        ... except Exception:
        ...     pass
    """
    try:
        # Validate template syntax
        self._validate_template(prompt.template)

        # Extract required arguments from template
        required_args = self._get_required_arguments(prompt.template)

        # Create argument schema
        argument_schema = {
            "type": "object",
            "properties": {},
            "required": list(required_args),
        }
        for arg in prompt.arguments:
            schema = {"type": "string"}
            if arg.description is not None:
                schema["description"] = arg.description
            argument_schema["properties"][arg.name] = schema

        custom_name = prompt.custom_name or prompt.name
        display_name = prompt.display_name or custom_name

        # Extract gateway_id from prompt if present and look up gateway for namespacing
        gateway_id = getattr(prompt, "gateway_id", None)
        gateway = None
        if gateway_id:
            gateway = db.execute(select(DbGateway).where(DbGateway.id == gateway_id)).scalar_one_or_none()

        computed_name = self._compute_prompt_name(custom_name, gateway=gateway)

        # Create DB model
        db_prompt = DbPrompt(
            name=computed_name,
            original_name=prompt.name,
            custom_name=custom_name,
            display_name=display_name,
            description=prompt.description,
            template=prompt.template,
            argument_schema=argument_schema,
            tags=prompt.tags,
            # Metadata fields
            created_by=created_by,
            created_from_ip=created_from_ip,
            created_via=created_via,
            created_user_agent=created_user_agent,
            import_batch_id=import_batch_id,
            federation_source=federation_source,
            version=1,
            # Team scoping fields - use schema values if provided, otherwise fallback to parameters
            team_id=getattr(prompt, "team_id", None) or team_id,
            owner_email=getattr(prompt, "owner_email", None) or owner_email or created_by,
            visibility=getattr(prompt, "visibility", None) or visibility,
            gateway_id=gateway_id,
        )

        # Check for an existing prompt with the same name in the same scope.
        # NOTE(review): the check below uses the `visibility` / `team_id` parameters, while the
        # stored row prefers the values on `prompt` (see above) - confirm callers keep them in sync.
        # NOTE(review): `visibility` is typed Optional; None fails on .lower() and surfaces as PromptError.
        if visibility.lower() == "public":
            # Check for existing public prompt with the same name and gateway_id
            existing_prompt = db.execute(select(DbPrompt).where(DbPrompt.name == computed_name, DbPrompt.visibility == "public", DbPrompt.gateway_id == gateway_id)).scalar_one_or_none()
            if existing_prompt:
                raise PromptNameConflictError(computed_name, enabled=existing_prompt.enabled, prompt_id=existing_prompt.id, visibility=existing_prompt.visibility)
        elif visibility.lower() == "team":
            # Check for existing team prompt with the same name and gateway_id
            existing_prompt = db.execute(
                select(DbPrompt).where(DbPrompt.name == computed_name, DbPrompt.visibility == "team", DbPrompt.team_id == team_id, DbPrompt.gateway_id == gateway_id)
            ).scalar_one_or_none()
            if existing_prompt:
                raise PromptNameConflictError(computed_name, enabled=existing_prompt.enabled, prompt_id=existing_prompt.id, visibility=existing_prompt.visibility)

        # Set gateway relationship to help the before_insert event handler compute the name correctly
        if gateway:
            db_prompt.gateway = gateway
            db_prompt.gateway_name_cache = gateway.name  # type: ignore[attr-defined]

        # Add to DB
        db.add(db_prompt)
        db.commit()
        db.refresh(db_prompt)
        # Notify subscribers
        await self._notify_prompt_added(db_prompt)

        logger.info(f"Registered prompt: {prompt.name}")

        # Structured logging: Audit trail for prompt creation
        audit_trail.log_action(
            user_id=created_by or "system",
            action="create_prompt",
            resource_type="prompt",
            resource_id=str(db_prompt.id),
            resource_name=db_prompt.name,
            user_email=owner_email,
            team_id=team_id,
            client_ip=created_from_ip,
            user_agent=created_user_agent,
            new_values={
                "name": db_prompt.name,
                "visibility": visibility,
            },
            context={
                "created_via": created_via,
                "import_batch_id": import_batch_id,
                "federation_source": federation_source,
            },
            db=db,
        )

        # Structured logging: Log successful prompt creation
        structured_logger.log(
            level="INFO",
            message="Prompt created successfully",
            event_type="prompt_created",
            component="prompt_service",
            user_id=created_by,
            user_email=owner_email,
            team_id=team_id,
            resource_type="prompt",
            resource_id=str(db_prompt.id),
            custom_fields={
                "prompt_name": db_prompt.name,
                "visibility": visibility,
            },
            db=db,
        )

        db_prompt.team = self._get_team_name(db, db_prompt.team_id)
        prompt_dict = self.convert_prompt_to_read(db_prompt)

        # Invalidate cache after successful creation
        cache = _get_registry_cache()
        await cache.invalidate_prompts()
        # Also invalidate tags cache since prompt tags may have changed
        # First-Party
        from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

        await admin_stats_cache.invalidate_tags()
        # First-Party
        from mcpgateway.cache.metrics_cache import metrics_cache  # pylint: disable=import-outside-toplevel

        metrics_cache.invalidate_prefix("top_prompts:")
        metrics_cache.invalidate("prompts")

        return PromptRead.model_validate(prompt_dict)

    except IntegrityError as ie:
        logger.error(f"IntegrityErrors in group: {ie}")
        # A failed flush/commit leaves the session in a failed transaction; roll back first so
        # the structured log write below (which receives db) and the caller get a usable session.
        db.rollback()

        structured_logger.log(
            level="ERROR",
            message="Prompt creation failed due to database integrity error",
            event_type="prompt_creation_failed",
            component="prompt_service",
            user_id=created_by,
            user_email=owner_email,
            error=ie,
            custom_fields={"prompt_name": prompt.name},
            db=db,
        )
        raise ie
    except PromptNameConflictError as se:
        db.rollback()

        structured_logger.log(
            level="WARNING",
            message="Prompt creation failed due to name conflict",
            event_type="prompt_name_conflict",
            component="prompt_service",
            user_id=created_by,
            user_email=owner_email,
            custom_fields={"prompt_name": prompt.name, "visibility": visibility},
            db=db,
        )
        raise se
    except Exception as e:
        db.rollback()

        structured_logger.log(
            level="ERROR",
            message="Prompt creation failed",
            event_type="prompt_creation_failed",
            component="prompt_service",
            user_id=created_by,
            user_email=owner_email,
            error=e,
            custom_fields={"prompt_name": prompt.name},
            db=db,
        )
        # Chain the original exception so its traceback is preserved for debugging
        raise PromptError(f"Failed to register prompt: {str(e)}") from e

650 

651 async def register_prompts_bulk( 

652 self, 

653 db: Session, 

654 prompts: List[PromptCreate], 

655 created_by: Optional[str] = None, 

656 created_from_ip: Optional[str] = None, 

657 created_via: Optional[str] = None, 

658 created_user_agent: Optional[str] = None, 

659 import_batch_id: Optional[str] = None, 

660 federation_source: Optional[str] = None, 

661 team_id: Optional[str] = None, 

662 owner_email: Optional[str] = None, 

663 visibility: Optional[str] = "public", 

664 conflict_strategy: str = "skip", 

665 ) -> Dict[str, Any]: 

666 """Register multiple prompts in bulk with a single commit. 

667 

668 This method provides significant performance improvements over individual 

669 prompt registration by: 

670 - Using db.add_all() instead of individual db.add() calls 

671 - Performing a single commit for all prompts 

672 - Batch conflict detection 

673 - Chunking for very large imports (>500 items) 

674 

675 Args: 

676 db: Database session 

677 prompts: List of prompt creation schemas 

678 created_by: Username who created these prompts 

679 created_from_ip: IP address of creator 

680 created_via: Creation method (ui, api, import, federation) 

681 created_user_agent: User agent of creation request 

682 import_batch_id: UUID for bulk import operations 

683 federation_source: Source gateway for federated prompts 

684 team_id: Team ID to assign the prompts to 

685 owner_email: Email of the user who owns these prompts 

686 visibility: Prompt visibility level (private, team, public) 

687 conflict_strategy: How to handle conflicts (skip, update, rename, fail) 

688 

689 Returns: 

690 Dict with statistics: 

691 - created: Number of prompts created 

692 - updated: Number of prompts updated 

693 - skipped: Number of prompts skipped 

694 - failed: Number of prompts that failed 

695 - errors: List of error messages 

696 

697 Raises: 

698 PromptError: If bulk registration fails critically 

699 

700 Examples: 

701 >>> from mcpgateway.services.prompt_service import PromptService 

702 >>> from unittest.mock import MagicMock 

703 >>> service = PromptService() 

704 >>> db = MagicMock() 

705 >>> prompts = [MagicMock(), MagicMock()] 

706 >>> import asyncio 

707 >>> try: 

708 ... result = asyncio.run(service.register_prompts_bulk(db, prompts)) 

709 ... except Exception: 

710 ... pass 

711 """ 

712 if not prompts: 

713 return {"created": 0, "updated": 0, "skipped": 0, "failed": 0, "errors": []} 

714 

715 stats = {"created": 0, "updated": 0, "skipped": 0, "failed": 0, "errors": []} 

716 

717 # Process in chunks to avoid memory issues and SQLite parameter limits 

718 chunk_size = 500 

719 

720 for chunk_start in range(0, len(prompts), chunk_size): 

721 chunk = prompts[chunk_start : chunk_start + chunk_size] 

722 

723 try: 

724 # Collect unique gateway_ids and look them up 

725 gateway_ids = set() 

726 for prompt in chunk: 

727 gw_id = getattr(prompt, "gateway_id", None) 

728 if gw_id: 

729 gateway_ids.add(gw_id) 

730 

731 gateways_map: Dict[str, Any] = {} 

732 if gateway_ids: 

733 gateways = db.execute(select(DbGateway).where(DbGateway.id.in_(gateway_ids))).scalars().all() 

734 gateways_map = {gw.id: gw for gw in gateways} 

735 

736 # Batch check for existing prompts to detect conflicts 

737 # Build computed names with gateway context 

738 prompt_names = [] 

739 for prompt in chunk: 

740 custom_name = getattr(prompt, "custom_name", None) or prompt.name 

741 gw_id = getattr(prompt, "gateway_id", None) 

742 gateway = gateways_map.get(gw_id) if gw_id else None 

743 computed_name = self._compute_prompt_name(custom_name, gateway=gateway) 

744 prompt_names.append(computed_name) 

745 

746 # Query for existing prompts - need to consider gateway_id in conflict detection 

747 # Build base query conditions 

748 if visibility.lower() == "public": 

749 base_conditions = [DbPrompt.name.in_(prompt_names), DbPrompt.visibility == "public"] 

750 elif visibility.lower() == "team" and team_id: 

751 base_conditions = [DbPrompt.name.in_(prompt_names), DbPrompt.visibility == "team", DbPrompt.team_id == team_id] 

752 else: 

753 # Private prompts - check by owner 

754 base_conditions = [DbPrompt.name.in_(prompt_names), DbPrompt.visibility == "private", DbPrompt.owner_email == (owner_email or created_by)] 

755 

756 existing_prompts_query = select(DbPrompt).where(*base_conditions) 

757 existing_prompts = db.execute(existing_prompts_query).scalars().all() 

758 # Use (name, gateway_id) tuple as key for proper conflict detection 

759 existing_prompts_map = {(p.name, p.gateway_id): p for p in existing_prompts} 

760 

761 prompts_to_add = [] 

762 prompts_to_update = [] 

763 

764 for prompt in chunk: 

765 try: 

766 # Validate template syntax 

767 self._validate_template(prompt.template) 

768 

769 # Extract required arguments from template 

770 required_args = self._get_required_arguments(prompt.template) 

771 

772 # Create argument schema 

773 argument_schema = { 

774 "type": "object", 

775 "properties": {}, 

776 "required": list(required_args), 

777 } 

778 for arg in prompt.arguments: 

779 schema = {"type": "string"} 

780 if arg.description is not None: 

781 schema["description"] = arg.description 

782 argument_schema["properties"][arg.name] = schema 

783 

784 # Use provided parameters or schema values 

785 prompt_team_id = team_id if team_id is not None else getattr(prompt, "team_id", None) 

786 prompt_owner_email = owner_email or getattr(prompt, "owner_email", None) or created_by 

787 prompt_visibility = visibility if visibility is not None else getattr(prompt, "visibility", "public") 

788 prompt_gateway_id = getattr(prompt, "gateway_id", None) 

789 

790 custom_name = getattr(prompt, "custom_name", None) or prompt.name 

791 display_name = getattr(prompt, "display_name", None) or custom_name 

792 gateway = gateways_map.get(prompt_gateway_id) if prompt_gateway_id else None 

793 computed_name = self._compute_prompt_name(custom_name, gateway=gateway) 

794 

795 # Look up existing prompt by (name, gateway_id) tuple 

796 existing_prompt = existing_prompts_map.get((computed_name, prompt_gateway_id)) 

797 

798 if existing_prompt: 

799 # Handle conflict based on strategy 

800 if conflict_strategy == "skip": 

801 stats["skipped"] += 1 

802 continue 

803 if conflict_strategy == "update": 

804 # Update existing prompt 

805 existing_prompt.description = prompt.description 

806 existing_prompt.template = prompt.template 

807 # Clear template cache to reduce memory growth 

808 _compile_jinja_template.cache_clear() 

809 existing_prompt.argument_schema = argument_schema 

810 existing_prompt.tags = prompt.tags or [] 

811 if getattr(prompt, "custom_name", None) is not None: 

812 existing_prompt.custom_name = custom_name 

813 if getattr(prompt, "display_name", None) is not None: 

814 existing_prompt.display_name = display_name 

815 existing_prompt.modified_by = created_by 

816 existing_prompt.modified_from_ip = created_from_ip 

817 existing_prompt.modified_via = created_via 

818 existing_prompt.modified_user_agent = created_user_agent 

819 existing_prompt.updated_at = datetime.now(timezone.utc) 

820 existing_prompt.version = (existing_prompt.version or 1) + 1 

821 

822 prompts_to_update.append(existing_prompt) 

823 stats["updated"] += 1 

824 elif conflict_strategy == "rename": 

825 # Create with renamed prompt 

826 new_name = f"{prompt.name}_imported_{int(datetime.now().timestamp())}" 

827 new_custom_name = new_name 

828 new_display_name = new_name 

829 computed_name = self._compute_prompt_name(new_custom_name, gateway=gateway) 

830 db_prompt = DbPrompt( 

831 name=computed_name, 

832 original_name=prompt.name, 

833 custom_name=new_custom_name, 

834 display_name=new_display_name, 

835 description=prompt.description, 

836 template=prompt.template, 

837 argument_schema=argument_schema, 

838 tags=prompt.tags or [], 

839 created_by=created_by, 

840 created_from_ip=created_from_ip, 

841 created_via=created_via, 

842 created_user_agent=created_user_agent, 

843 import_batch_id=import_batch_id, 

844 federation_source=federation_source, 

845 version=1, 

846 team_id=prompt_team_id, 

847 owner_email=prompt_owner_email, 

848 visibility=prompt_visibility, 

849 gateway_id=prompt_gateway_id, 

850 ) 

851 # Set gateway relationship to help the before_insert event handler 

852 if gateway: 852 ↛ 855line 852 didn't jump to line 855 because the condition on line 852 was always true

853 db_prompt.gateway = gateway 

854 db_prompt.gateway_name_cache = gateway.name # type: ignore[attr-defined] 

855 prompts_to_add.append(db_prompt) 

856 stats["created"] += 1 

857 elif conflict_strategy == "fail": 857 ↛ 764line 857 didn't jump to line 764 because the condition on line 857 was always true

858 stats["failed"] += 1 

859 stats["errors"].append(f"Prompt name conflict: {prompt.name}") 

860 continue 

861 else: 

862 # Create new prompt 

863 db_prompt = DbPrompt( 

864 name=computed_name, 

865 original_name=prompt.name, 

866 custom_name=custom_name, 

867 display_name=display_name, 

868 description=prompt.description, 

869 template=prompt.template, 

870 argument_schema=argument_schema, 

871 tags=prompt.tags or [], 

872 created_by=created_by, 

873 created_from_ip=created_from_ip, 

874 created_via=created_via, 

875 created_user_agent=created_user_agent, 

876 import_batch_id=import_batch_id, 

877 federation_source=federation_source, 

878 version=1, 

879 team_id=prompt_team_id, 

880 owner_email=prompt_owner_email, 

881 visibility=prompt_visibility, 

882 gateway_id=prompt_gateway_id, 

883 ) 

884 # Set gateway relationship to help the before_insert event handler 

885 if gateway: 

886 db_prompt.gateway = gateway 

887 db_prompt.gateway_name_cache = gateway.name # type: ignore[attr-defined] 

888 prompts_to_add.append(db_prompt) 

889 stats["created"] += 1 

890 

891 except Exception as e: 

892 stats["failed"] += 1 

893 stats["errors"].append(f"Failed to process prompt {prompt.name}: {str(e)}") 

894 logger.warning(f"Failed to process prompt {prompt.name} in bulk operation: {str(e)}") 

895 continue 

896 

897 # Bulk add new prompts 

898 if prompts_to_add: 

899 db.add_all(prompts_to_add) 

900 

901 # Commit the chunk 

902 db.commit() 

903 

904 # Refresh prompts for notifications and audit trail 

905 for db_prompt in prompts_to_add: 

906 db.refresh(db_prompt) 

907 # Notify subscribers 

908 await self._notify_prompt_added(db_prompt) 

909 

910 # Log bulk audit trail entry 

911 if prompts_to_add or prompts_to_update: 

912 audit_trail.log_action( 

913 user_id=created_by or "system", 

914 action="bulk_create_prompts" if prompts_to_add else "bulk_update_prompts", 

915 resource_type="prompt", 

916 resource_id=import_batch_id or "bulk_operation", 

917 resource_name=f"Bulk operation: {len(prompts_to_add)} created, {len(prompts_to_update)} updated", 

918 user_email=owner_email, 

919 team_id=team_id, 

920 client_ip=created_from_ip, 

921 user_agent=created_user_agent, 

922 new_values={ 

923 "prompts_created": len(prompts_to_add), 

924 "prompts_updated": len(prompts_to_update), 

925 "visibility": visibility, 

926 }, 

927 context={ 

928 "created_via": created_via, 

929 "import_batch_id": import_batch_id, 

930 "federation_source": federation_source, 

931 "conflict_strategy": conflict_strategy, 

932 }, 

933 db=db, 

934 ) 

935 

936 logger.info(f"Bulk registered {len(prompts_to_add)} prompts, updated {len(prompts_to_update)} prompts in chunk") 

937 

938 except Exception as e: 

939 db.rollback() 

940 logger.error(f"Failed to process chunk in bulk prompt registration: {str(e)}") 

941 stats["failed"] += len(chunk) 

942 stats["errors"].append(f"Chunk processing failed: {str(e)}") 

943 continue 

944 

945 # Final structured logging 

946 structured_logger.log( 

947 level="INFO", 

948 message="Bulk prompt registration completed", 

949 event_type="prompts_bulk_created", 

950 component="prompt_service", 

951 user_id=created_by, 

952 user_email=owner_email, 

953 team_id=team_id, 

954 resource_type="prompt", 

955 custom_fields={ 

956 "prompts_created": stats["created"], 

957 "prompts_updated": stats["updated"], 

958 "prompts_skipped": stats["skipped"], 

959 "prompts_failed": stats["failed"], 

960 "total_prompts": len(prompts), 

961 "visibility": visibility, 

962 "conflict_strategy": conflict_strategy, 

963 }, 

964 db=db, 

965 ) 

966 

967 return stats 

968 

async def list_prompts(
    self,
    db: Session,
    include_inactive: bool = False,
    cursor: Optional[str] = None,
    tags: Optional[List[str]] = None,
    limit: Optional[int] = None,
    page: Optional[int] = None,
    per_page: Optional[int] = None,
    user_email: Optional[str] = None,
    team_id: Optional[str] = None,
    visibility: Optional[str] = None,
    token_teams: Optional[List[str]] = None,
) -> Union[tuple[List[PromptRead], Optional[str]], Dict[str, Any]]:
    """
    Retrieve a list of prompt templates from the database with pagination support.

    Prompts are ordered by ``created_at`` then ``id`` (both descending), filtered
    by the supplied criteria, paginated through ``unified_paginate`` and converted
    to PromptRead objects. Prompts that fail conversion are logged and skipped so
    one bad row does not fail the whole listing.

    The first page of an unscoped, cursor-style request (no cursor, no user_email,
    no token_teams, no page) is served from and stored in the registry cache. The
    cache key covers every argument that influences that result:
    include_inactive, tags and limit.

    Args:
        db (Session): The SQLAlchemy database session.
        include_inactive (bool): If True, include inactive prompts in the result.
            Defaults to False.
        cursor (Optional[str], optional): An opaque cursor token for pagination.
            Opaque base64-encoded string containing last item's ID and created_at.
        tags (Optional[List[str]]): Filter prompts by tags. If provided, only prompts with at least one matching tag will be returned.
        limit (Optional[int]): Maximum number of prompts to return. Use 0 for all prompts (no limit).
            If not specified, uses pagination_default_page_size.
        page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
        per_page: Items per page for page-based pagination. Defaults to pagination_default_page_size.
        user_email (Optional[str]): User email for team-based access control. If None (and token_teams is None), no access control is applied.
        team_id (Optional[str]): Filter by specific team ID. Only evaluated when access control is applied.
        visibility (Optional[str]): Filter by visibility (private, team, public).
        token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API token access
            where the token scope should be respected instead of the user's full team memberships.

    Returns:
        If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}}
        If cursor is provided or neither: tuple of (list of PromptRead objects, next_cursor).

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> from unittest.mock import MagicMock
        >>> from mcpgateway.schemas import PromptRead
        >>> service = PromptService()
        >>> db = MagicMock()
        >>> prompt_read_obj = MagicMock(spec=PromptRead)
        >>> service.convert_prompt_to_read = MagicMock(return_value=prompt_read_obj)
        >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
        >>> import asyncio
        >>> prompts, next_cursor = asyncio.run(service.list_prompts(db))
        >>> prompts == [prompt_read_obj]
        True
    """
    # Cache only the first page of unscoped, cursor-style requests:
    # - user_email set   -> results are user-specific
    # - token_teams set  -> scoped access (public-only or team-scoped tokens)
    # - page set         -> page-based pagination returns a different shape
    # Using one flag for both lookup and store keeps the two conditions from
    # drifting apart, which would risk leaking admin results to scoped callers.
    cache = _get_registry_cache()
    use_cache = cursor is None and user_email is None and token_teams is None and page is None
    filters_hash = None
    if use_cache:
        # limit changes how many rows the first page holds, so it must be part of
        # the key; otherwise a limit=0 ("all") result could be served to a caller
        # expecting the default page size, and vice versa.
        # NOTE(review): assumes cache.hash_filters accepts arbitrary keyword filters - confirm.
        filters_hash = cache.hash_filters(include_inactive=include_inactive, tags=sorted(tags) if tags else None, limit=limit)
        cached = await cache.get("prompts", filters_hash)
        if cached is not None:
            # Reconstruct PromptRead objects from cached dicts
            cached_prompts = [PromptRead.model_validate(p) for p in cached["prompts"]]
            return (cached_prompts, cached.get("next_cursor"))

    # Build base query with ordering and eager load gateway to avoid N+1
    query = select(DbPrompt).options(joinedload(DbPrompt.gateway)).order_by(desc(DbPrompt.created_at), desc(DbPrompt.id))

    if not include_inactive:
        query = query.where(DbPrompt.enabled)

    # Apply team-based access control if user_email is provided OR token_teams is explicitly set.
    # This ensures unauthenticated requests with token_teams=[] only see public prompts;
    # an empty-string user_email also lands here and gets public-only filtering (secure default).
    if user_email is not None or token_teams is not None:
        # Use token_teams if provided (for MCP/API token access), otherwise look up from DB
        if token_teams is not None:
            team_ids = token_teams
        elif user_email:
            team_service = TeamManagementService(db)
            user_teams = await team_service.get_user_teams(user_email)
            team_ids = [team.id for team in user_teams]
        else:
            team_ids = []

        # Public-only tokens (empty teams array) can ONLY see public resources - no owner access
        is_public_only_token = token_teams is not None and len(token_teams) == 0

        if team_id:
            # Caller requested a specific team - verify access first
            if team_id not in team_ids:
                # NOTE(review): this returns the cursor-style tuple even when page is set - confirm callers handle it.
                return ([], None)
            access_conditions = [
                and_(DbPrompt.team_id == team_id, DbPrompt.visibility.in_(["team", "public"])),
            ]
            # Only include owner access for non-public-only tokens with user_email
            if not is_public_only_token and user_email:
                access_conditions.append(and_(DbPrompt.team_id == team_id, DbPrompt.owner_email == user_email))
            query = query.where(or_(*access_conditions))
        else:
            # General access: public prompts + team prompts (+ owner prompts if not public-only token)
            access_conditions = [
                DbPrompt.visibility == "public",
            ]
            # Only include owner access for non-public-only tokens with user_email
            if not is_public_only_token and user_email:
                access_conditions.append(DbPrompt.owner_email == user_email)
            if team_ids:
                access_conditions.append(and_(DbPrompt.team_id.in_(team_ids), DbPrompt.visibility.in_(["team", "public"])))
            query = query.where(or_(*access_conditions))

        if visibility:
            query = query.where(DbPrompt.visibility == visibility)

    # Add tag filtering if tags are provided (supports both List[str] and List[Dict] formats)
    if tags:
        query = query.where(json_contains_tag_expr(db, DbPrompt.tags, tags, match_any=True))

    # Use unified pagination helper - handles both page and cursor pagination
    pag_result = await unified_paginate(
        db=db,
        query=query,
        page=page,
        per_page=per_page,
        cursor=cursor,
        limit=limit,
        base_url="/admin/prompts",  # Used for page-based links
        query_params={"include_inactive": include_inactive} if include_inactive else {},
    )

    next_cursor = None
    # Extract prompts based on pagination type
    if page is not None:
        # Page-based: pag_result is a dict
        prompts_db = pag_result["data"]
    else:
        # Cursor-based: pag_result is a tuple
        prompts_db, next_cursor = pag_result

    # Fetch team names for the prompts in one query (common for both pagination types)
    team_ids_set = {s.team_id for s in prompts_db if s.team_id}
    team_map = {}
    if team_ids_set:
        teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(team_ids_set), EmailTeam.is_active.is_(True))).all()
        team_map = {team.id: team.name for team in teams}

    db.commit()  # Release transaction to avoid idle-in-transaction

    # Convert to PromptRead (common for both pagination types)
    result = []
    for s in prompts_db:
        try:
            s.team = team_map.get(s.team_id) if s.team_id else None
            result.append(self.convert_prompt_to_read(s, include_metrics=False))
        except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
            # Continue with remaining prompts instead of failing completely
            logger.exception(f"Failed to convert prompt {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}")

    # Return appropriate format based on pagination type
    if page is not None:
        # Page-based format
        return {
            "data": result,
            "pagination": pag_result["pagination"],
            "links": pag_result["links"],
        }

    # Cursor-based format: store the first page under the same conditions used for lookup
    if use_cache:
        try:
            cache_data = {"prompts": [s.model_dump(mode="json") for s in result], "next_cursor": next_cursor}
            await cache.set("prompts", cache_data, filters_hash)
        except AttributeError:
            pass  # Skip caching if result objects don't support model_dump (e.g., in doctests)

    return (result, next_cursor)

1153 

async def list_prompts_for_user(
    self, db: Session, user_email: str, team_id: Optional[str] = None, visibility: Optional[str] = None, include_inactive: bool = False, skip: int = 0, limit: int = 100
) -> List[PromptRead]:
    """
    DEPRECATED: Use list_prompts() with user_email parameter instead.

    This method is maintained for backward compatibility but is no longer used.
    New code should call list_prompts() with user_email, team_id, and visibility parameters.

    List prompts user has access to with team filtering. Results are ordered by
    ``created_at`` then ``id`` (both descending), matching list_prompts(), so that
    ``skip``/``limit`` select stable, non-overlapping pages.

    Args:
        db: Database session
        user_email: Email of the user requesting prompts
        team_id: Optional team ID to filter by specific team
        visibility: Optional visibility filter (private, team, public)
        include_inactive: Whether to include inactive prompts
        skip: Number of prompts to skip for pagination
        limit: Maximum number of prompts to return

    Returns:
        List[PromptRead]: Prompts the user has access to. Empty when ``team_id``
        is given and the user is not a member of that team.
    """
    # Resolve the teams the user belongs to
    team_service = TeamManagementService(db)
    user_teams = await team_service.get_user_teams(user_email)
    team_ids = [team.id for team in user_teams]

    # Eager load gateway to avoid N+1 when accessing gateway_slug.
    # OFFSET/LIMIT without ORDER BY returns rows in an unspecified order, so the
    # query is ordered explicitly (same ordering as list_prompts()).
    query = select(DbPrompt).options(joinedload(DbPrompt.gateway)).order_by(desc(DbPrompt.created_at), desc(DbPrompt.id))

    # Apply active/inactive filter
    if not include_inactive:
        query = query.where(DbPrompt.enabled)

    if team_id:
        if team_id not in team_ids:
            return []  # No access to team

        # Within the requested team: team/public prompts, plus the user's own prompts
        access_conditions = [
            and_(DbPrompt.team_id == team_id, DbPrompt.visibility.in_(["team", "public"])),
            and_(DbPrompt.team_id == team_id, DbPrompt.owner_email == user_email),
        ]
        query = query.where(or_(*access_conditions))
    else:
        # 1. User's personal resources (owner_email matches)
        access_conditions = [DbPrompt.owner_email == user_email]
        # 2. Team resources where user is member
        if team_ids:
            access_conditions.append(and_(DbPrompt.team_id.in_(team_ids), DbPrompt.visibility.in_(["team", "public"])))
        # 3. Public resources
        access_conditions.append(DbPrompt.visibility == "public")

        query = query.where(or_(*access_conditions))

    # Apply visibility filter if specified
    if visibility:
        query = query.where(DbPrompt.visibility == visibility)

    # Apply offset/limit pagination on the ordered query
    query = query.offset(skip).limit(limit)

    prompts = db.execute(query).scalars().all()

    # Batch fetch team names to avoid N+1 queries
    prompt_team_ids = {p.team_id for p in prompts if p.team_id}
    team_map = {}
    if prompt_team_ids:
        teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(prompt_team_ids), EmailTeam.is_active.is_(True))).all()
        team_map = {str(team.id): team.name for team in teams}

    db.commit()  # Release transaction to avoid idle-in-transaction

    result = []
    for t in prompts:
        try:
            t.team = team_map.get(str(t.team_id)) if t.team_id else None
            result.append(self.convert_prompt_to_read(t, include_metrics=False))
        except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
            # Continue with remaining prompts instead of failing completely
            logger.exception(f"Failed to convert prompt {getattr(t, 'id', 'unknown')} ({getattr(t, 'name', 'unknown')}): {e}")
    return result

1242 

async def list_server_prompts(
    self,
    db: Session,
    server_id: str,
    include_inactive: bool = False,
    cursor: Optional[str] = None,
    user_email: Optional[str] = None,
    token_teams: Optional[List[str]] = None,
) -> List[PromptRead]:
    """
    Retrieve the prompt templates associated with a server.

    Prompts linked to ``server_id`` through the server/prompt association table
    are loaded, optionally restricted to enabled prompts and to what the caller
    may see, and converted to PromptRead objects. Prompts that fail conversion
    are logged and left out of the result. The cursor parameter is reserved for
    future pagination support and is currently only logged.

    Args:
        db (Session): The SQLAlchemy database session.
        server_id (str): Server ID
        include_inactive (bool): If True, include inactive prompts in the result.
            Defaults to False.
        cursor (Optional[str], optional): An opaque cursor token for pagination. Currently,
            this parameter is ignored. Defaults to None.
        user_email (Optional[str]): User email for visibility filtering. If None
            (and token_teams is None), no filtering applied.
        token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API
            token access where the token scope should be respected.

    Returns:
        List[PromptRead]: A list of prompt templates represented as PromptRead objects.

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> from unittest.mock import MagicMock
        >>> from mcpgateway.schemas import PromptRead
        >>> service = PromptService()
        >>> db = MagicMock()
        >>> prompt_read_obj = MagicMock(spec=PromptRead)
        >>> service.convert_prompt_to_read = MagicMock(return_value=prompt_read_obj)
        >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
        >>> import asyncio
        >>> result = asyncio.run(service.list_server_prompts(db, 'server1'))
        >>> result == [prompt_read_obj]
        True
    """
    # Join through the association table; gateway is eager-loaded to avoid N+1
    # when gateway_slug is accessed during conversion.
    link = server_prompt_association.c
    stmt = select(DbPrompt).options(joinedload(DbPrompt.gateway))
    stmt = stmt.join(server_prompt_association, DbPrompt.id == link.prompt_id)
    stmt = stmt.where(link.server_id == server_id)
    if not include_inactive:
        stmt = stmt.where(DbPrompt.enabled)

    # Visibility filtering applies when user context OR token_teams is provided, so that
    # unauthenticated requests with token_teams=[] (or an empty-string user_email) only
    # see public prompts.
    if user_email is not None or token_teams is not None:
        # Token teams win over a DB lookup so the token scope is respected.
        if token_teams is not None:
            allowed_team_ids = token_teams
        elif user_email:
            memberships = await TeamManagementService(db).get_user_teams(user_email)
            allowed_team_ids = [team.id for team in memberships]
        else:
            allowed_team_ids = []

        # An empty teams array marks a public-only token: public resources only, no owner access.
        public_only = token_teams is not None and len(token_teams) == 0

        visible_to_caller = [DbPrompt.visibility == "public"]
        if user_email and not public_only:
            visible_to_caller.append(DbPrompt.owner_email == user_email)
        if allowed_team_ids:
            visible_to_caller.append(and_(DbPrompt.team_id.in_(allowed_team_ids), DbPrompt.visibility.in_(["team", "public"])))
        stmt = stmt.where(or_(*visible_to_caller))

    # Cursor-based pagination logic can be implemented here in the future.
    logger.debug(cursor)
    rows = db.execute(stmt).scalars().all()

    # Resolve team names for all returned prompts in a single query.
    referenced_team_ids = {row.team_id for row in rows if row.team_id}
    team_names = {}
    if referenced_team_ids:
        name_rows = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(referenced_team_ids), EmailTeam.is_active.is_(True))).all()
        team_names = {str(entry.id): entry.name for entry in name_rows}

    db.commit()  # Release transaction to avoid idle-in-transaction

    converted = []
    for row in rows:
        try:
            if row.team_id:
                row.team = team_names.get(str(row.team_id))
            else:
                row.team = None
            converted.append(self.convert_prompt_to_read(row, include_metrics=False))
        except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
            # Skip the broken prompt and keep going rather than failing the whole listing.
            logger.exception(f"Failed to convert prompt {getattr(row, 'id', 'unknown')} ({getattr(row, 'name', 'unknown')}): {e}")
    return converted

1347 

async def _record_prompt_metric(self, db: Session, prompt: DbPrompt, start_time: float, success: bool, error_message: Optional[str]) -> None:
    """
    Persist a metric row for one prompt invocation.

    The response time is the difference between the current monotonic clock
    reading and ``start_time``. The metric is added to the session and committed
    immediately.

    Args:
        db: Database session
        prompt: The prompt that was invoked
        start_time: Monotonic start time of the invocation
        success: True if successful, False otherwise
        error_message: Error message if failed, None otherwise
    """
    elapsed = time.monotonic() - start_time
    db.add(
        PromptMetric(
            prompt_id=prompt.id,
            response_time=elapsed,
            is_success=success,
            error_message=error_message,
        )
    )
    db.commit()

1370 

async def _check_prompt_access(
    self,
    db: Session,
    prompt: DbPrompt,
    user_email: Optional[str],
    token_teams: Optional[List[str]],
) -> bool:
    """Decide whether the caller may access a prompt, based on visibility rules.

    The checks run in this order and the first match wins:

    1. Public prompts are open to everyone.
    2. No token teams and no user email means unrestricted admin: allowed.
    3. No user email otherwise: denied.
    4. Public-only token (empty teams list): denied for anything non-public.
    5. The prompt's owner: allowed.
    6. Prompt belongs to a team: allowed when visibility is team/public and the
       caller's teams (from the token, else looked up in the DB) include it.
    7. Everything else: denied.

    Args:
        db: Database session for team membership lookup if needed.
        prompt: Prompt object; ``visibility``, ``team_id`` and ``owner_email`` are
            read with getattr, defaulting to "public", None and None.
        user_email: Email of the requesting user (None = unauthenticated).
        token_teams: List of team IDs from token.
            - None = unrestricted admin access
            - [] = public-only token
            - [...] = team-scoped token

    Returns:
        True if access is allowed, False otherwise.
    """
    prompt_visibility = getattr(prompt, "visibility", "public")
    owning_team = getattr(prompt, "team_id", None)
    owner = getattr(prompt, "owner_email", None)

    # Public prompts are accessible by everyone
    if prompt_visibility == "public":
        return True

    # Admin bypass: is_admin=True with no team scoping arrives as (None, None)
    if token_teams is None and user_email is None:
        return True

    # No user context (but not admin) = deny access to non-public prompts
    if not user_email:
        return False

    # Public-only tokens can ONLY access public prompts, which were handled above
    if token_teams is not None and len(token_teams) == 0:
        return False

    # Owner can always access their own prompts
    if owner and owner == user_email:
        return True

    # Without an owning team there is no remaining rule that could grant access
    if not owning_team:
        return False

    # Token scope takes precedence; fall back to the user's memberships in the DB
    if token_teams is not None:
        caller_teams = token_teams
    else:
        memberships = await TeamManagementService(db).get_user_teams(user_email)
        caller_teams = [team.id for team in memberships]

    # Team/public visibility allows access if the caller is in the owning team
    return prompt_visibility in ["team", "public"] and owning_team in caller_teams

1435 

1436 async def get_prompt( 

1437 self, 

1438 db: Session, 

1439 prompt_id: Union[int, str], 

1440 arguments: Optional[Dict[str, str]] = None, 

1441 user: Optional[str] = None, 

1442 tenant_id: Optional[str] = None, 

1443 server_id: Optional[str] = None, 

1444 request_id: Optional[str] = None, 

1445 token_teams: Optional[List[str]] = None, 

1446 plugin_context_table: Optional[PluginContextTable] = None, 

1447 plugin_global_context: Optional[GlobalContext] = None, 

1448 _meta_data: Optional[Dict[str, Any]] = None, 

1449 ) -> PromptResult: 

1450 """Get a prompt template and optionally render it. 

1451 

1452 Args: 

1453 db: Database session 

1454 prompt_id: ID of the prompt to retrieve 

1455 arguments: Optional arguments for rendering 

1456 user: Optional user email for authorization checks 

1457 tenant_id: Optional tenant identifier for plugin context 

1458 server_id: Optional server ID for server scoping enforcement 

1459 request_id: Optional request ID, generated if not provided 

1460 token_teams: Optional list of team IDs from token for authorization. 

1461 None = unrestricted admin, [] = public-only, [...] = team-scoped. 

1462 plugin_context_table: Optional plugin context table from previous hooks for cross-hook state sharing. 

1463 plugin_global_context: Optional global context from middleware for consistency across hooks. 

1464 _meta_data: Optional metadata for prompt retrieval (not used currently). 

1465 

1466 Returns: 

1467 Prompt result with rendered messages 

1468 

1469 Raises: 

1470 PluginViolationError: If prompt violates a plugin policy 

1471 PromptNotFoundError: If prompt not found or access denied 

1472 PromptError: For other prompt errors 

1473 PluginError: If encounters issue with plugin 

1474 

1475 Examples: 

1476 >>> from mcpgateway.services.prompt_service import PromptService 

1477 >>> from unittest.mock import MagicMock 

1478 >>> service = PromptService() 

1479 >>> db = MagicMock() 

1480 >>> db.execute.return_value.scalar_one_or_none.return_value = MagicMock() 

1481 >>> import asyncio 

1482 >>> try: 

1483 ... asyncio.run(service.get_prompt(db, 'prompt_id')) 

1484 ... except Exception: 

1485 ... pass 

1486 """ 

1487 

1488 start_time = time.monotonic() 

1489 success = False 

1490 error_message = None 

1491 prompt = None 

1492 

1493 # Create database span for observability dashboard 

1494 trace_id = current_trace_id.get() 

1495 db_span_id = None 

1496 db_span_ended = False 

1497 observability_service = ObservabilityService() if trace_id else None 

1498 

1499 if trace_id and observability_service: 

1500 try: 

1501 db_span_id = observability_service.start_span( 

1502 db=db, 

1503 trace_id=trace_id, 

1504 name="prompt.render", 

1505 attributes={ 

1506 "prompt.id": str(prompt_id), 

1507 "arguments_count": len(arguments) if arguments else 0, 

1508 "user": user or "anonymous", 

1509 "server_id": server_id, 

1510 "tenant_id": tenant_id, 

1511 "request_id": request_id or "none", 

1512 }, 

1513 ) 

1514 logger.debug(f"✓ Created prompt.render span: {db_span_id} for prompt: {prompt_id}") 

1515 except Exception as e: 

1516 logger.warning(f"Failed to start observability span for prompt rendering: {e}") 

1517 db_span_id = None 

1518 

1519 # Create a trace span for OpenTelemetry export (Jaeger, Zipkin, etc.) 

1520 with create_span( 

1521 "prompt.render", 

1522 { 

1523 "prompt.id": prompt_id, 

1524 "arguments_count": len(arguments) if arguments else 0, 

1525 "user": user or "anonymous", 

1526 "server_id": server_id, 

1527 "tenant_id": tenant_id, 

1528 "request_id": request_id or "none", 

1529 }, 

1530 ) as span: 

1531 try: 

1532 # Check if any prompt hooks are registered to avoid unnecessary context creation 

1533 has_pre_fetch = self._plugin_manager and self._plugin_manager.has_hooks_for(PromptHookType.PROMPT_PRE_FETCH) 

1534 has_post_fetch = self._plugin_manager and self._plugin_manager.has_hooks_for(PromptHookType.PROMPT_POST_FETCH) 

1535 

1536 # Initialize plugin context variables only if hooks are registered 

1537 context_table = None 

1538 global_context = None 

1539 if has_pre_fetch or has_post_fetch: 

1540 context_table = plugin_context_table 

1541 if plugin_global_context: 

1542 global_context = plugin_global_context 

1543 # Update fields with prompt-specific information 

1544 if user: 1544 ↛ 1546line 1544 didn't jump to line 1546 because the condition on line 1544 was always true

1545 global_context.user = user 

1546 if server_id: 

1547 global_context.server_id = server_id 

1548 if tenant_id: 1548 ↛ 1556line 1548 didn't jump to line 1556 because the condition on line 1548 was always true

1549 global_context.tenant_id = tenant_id 

1550 else: 

1551 # Create new context (fallback when middleware didn't run) 

1552 if not request_id: 1552 ↛ 1554line 1552 didn't jump to line 1554 because the condition on line 1552 was always true

1553 request_id = uuid.uuid4().hex 

1554 global_context = GlobalContext(request_id=request_id, user=user, server_id=server_id, tenant_id=tenant_id) 

1555 

1556 if has_pre_fetch: 

1557 pre_result, context_table = await self._plugin_manager.invoke_hook( 

1558 PromptHookType.PROMPT_PRE_FETCH, 

1559 payload=PromptPrehookPayload(prompt_id=prompt_id, args=arguments), 

1560 global_context=global_context, 

1561 local_contexts=context_table, # Pass context from previous hooks 

1562 violations_as_exceptions=True, 

1563 ) 

1564 

1565 # Use modified payload if provided 

1566 if pre_result.modified_payload: 1566 ↛ 1571line 1566 didn't jump to line 1571 because the condition on line 1566 was always true

1567 payload = pre_result.modified_payload 

1568 arguments = payload.args 

1569 

1570 # Find prompt by ID first, then by name (active prompts only) 

1571 search_key = str(prompt_id) 

1572 prompt = db.execute(select(DbPrompt).where(DbPrompt.id == prompt_id).where(DbPrompt.enabled)).scalar_one_or_none() 

1573 if not prompt: 

1574 prompt = db.execute(select(DbPrompt).where(DbPrompt.name == prompt_id).where(DbPrompt.enabled)).scalar_one_or_none() 

1575 

1576 if not prompt: 

1577 # Check if an inactive prompt exists 

1578 inactive_prompt = db.execute(select(DbPrompt).where(DbPrompt.id == prompt_id).where(not_(DbPrompt.enabled))).scalar_one_or_none() 

1579 if not inactive_prompt: 

1580 inactive_prompt = db.execute(select(DbPrompt).where(DbPrompt.name == prompt_id).where(not_(DbPrompt.enabled))).scalar_one_or_none() 

1581 

1582 if inactive_prompt: 

1583 raise PromptNotFoundError(f"Prompt '{search_key}' exists but is inactive") 

1584 

1585 raise PromptNotFoundError(f"Prompt not found: {search_key}") 

1586 

1587 # ═══════════════════════════════════════════════════════════════════════════ 

1588 # SECURITY: Check prompt access based on visibility and team membership 

1589 # ═══════════════════════════════════════════════════════════════════════════ 

1590 if not await self._check_prompt_access(db, prompt, user, token_teams): 

1591 # Don't reveal prompt existence - return generic "not found" 

1592 raise PromptNotFoundError(f"Prompt not found: {search_key}") 

1593 

1594 # ═══════════════════════════════════════════════════════════════════════════ 

1595 # SECURITY: Enforce server scoping if server_id is provided 

1596 # Prompt must be attached to the specified virtual server 

1597 # ═══════════════════════════════════════════════════════════════════════════ 

1598 if server_id: 

1599 server_match = db.execute( 

1600 select(server_prompt_association.c.prompt_id).where( 

1601 server_prompt_association.c.server_id == server_id, 

1602 server_prompt_association.c.prompt_id == prompt.id, 

1603 ) 

1604 ).first() 

1605 if not server_match: 

1606 raise PromptNotFoundError(f"Prompt not found: {search_key}") 

1607 

1608 if not arguments: 

1609 result = PromptResult( 

1610 messages=[ 

1611 Message( 

1612 role=Role.USER, 

1613 content=TextContent(type="text", text=prompt.template), 

1614 ) 

1615 ], 

1616 description=prompt.description, 

1617 ) 

1618 else: 

1619 try: 

1620 prompt.validate_arguments(arguments) 

1621 rendered = self._render_template(prompt.template, arguments) 

1622 messages = self._parse_messages(rendered) 

1623 result = PromptResult(messages=messages, description=prompt.description) 

1624 except Exception as e: 

1625 if span: 

1626 span.set_attribute("error", True) 

1627 span.set_attribute("error.message", str(e)) 

1628 raise PromptError(f"Failed to process prompt: {str(e)}") 

1629 

1630 if has_post_fetch: 

1631 post_result, _ = await self._plugin_manager.invoke_hook( 

1632 PromptHookType.PROMPT_POST_FETCH, 

1633 payload=PromptPosthookPayload(prompt_id=str(prompt.id), result=result), 

1634 global_context=global_context, 

1635 local_contexts=context_table, 

1636 violations_as_exceptions=True, 

1637 ) 

1638 # Use modified payload if provided 

1639 result = post_result.modified_payload.result if post_result.modified_payload else result 

1640 

1641 arguments_supplied = bool(arguments) 

1642 

1643 audit_trail.log_action( 

1644 user_id=user or "anonymous", 

1645 action="view_prompt", 

1646 resource_type="prompt", 

1647 resource_id=str(prompt.id), 

1648 resource_name=prompt.name, 

1649 team_id=prompt.team_id, 

1650 context={ 

1651 "tenant_id": tenant_id, 

1652 "server_id": server_id, 

1653 "arguments_provided": arguments_supplied, 

1654 "request_id": request_id, 

1655 }, 

1656 db=db, 

1657 ) 

1658 

1659 structured_logger.log( 

1660 level="INFO", 

1661 message="Prompt retrieved successfully", 

1662 event_type="prompt_viewed", 

1663 component="prompt_service", 

1664 user_id=user, 

1665 team_id=prompt.team_id, 

1666 resource_type="prompt", 

1667 resource_id=str(prompt.id), 

1668 request_id=request_id, 

1669 custom_fields={ 

1670 "prompt_name": prompt.name, 

1671 "arguments_provided": arguments_supplied, 

1672 "tenant_id": tenant_id, 

1673 "server_id": server_id, 

1674 }, 

1675 db=db, 

1676 ) 

1677 

1678 # Set success attributes on span 

1679 if span: 

1680 span.set_attribute("success", True) 

1681 span.set_attribute("duration.ms", (time.monotonic() - start_time) * 1000) 

1682 if result and hasattr(result, "messages"): 1682 ↛ 1685line 1682 didn't jump to line 1685 because the condition on line 1682 was always true

1683 span.set_attribute("messages.count", len(result.messages)) 

1684 

1685 success = True 

1686 logger.info(f"Retrieved prompt: {prompt.id} successfully") 

1687 return result 

1688 

1689 except Exception as e: 

1690 success = False 

1691 error_message = str(e) 

1692 raise 

1693 finally: 

1694 # Record metrics only if we found a prompt 

1695 if prompt: 

1696 try: 

1697 # First-Party 

1698 from mcpgateway.services.metrics_buffer_service import get_metrics_buffer_service # pylint: disable=import-outside-toplevel 

1699 

1700 metrics_buffer = get_metrics_buffer_service() 

1701 metrics_buffer.record_prompt_metric( 

1702 prompt_id=prompt.id, 

1703 start_time=start_time, 

1704 success=success, 

1705 error_message=error_message, 

1706 ) 

1707 except Exception as metrics_error: 

1708 logger.warning(f"Failed to record prompt metric: {metrics_error}") 

1709 

1710 # End database span for observability dashboard 

1711 if db_span_id and observability_service and not db_span_ended: 

1712 try: 

1713 observability_service.end_span( 

1714 db=db, 

1715 span_id=db_span_id, 

1716 status="ok" if success else "error", 

1717 status_message=error_message if error_message else None, 

1718 ) 

1719 db_span_ended = True 

1720 logger.debug(f"✓ Ended prompt.render span: {db_span_id}") 

1721 except Exception as e: 

1722 logger.warning(f"Failed to end observability span for prompt rendering: {e}") 

1723 

async def update_prompt(
    self,
    db: Session,
    prompt_id: Union[int, str],
    prompt_update: PromptUpdate,
    modified_by: Optional[str] = None,
    modified_from_ip: Optional[str] = None,
    modified_via: Optional[str] = None,
    modified_user_agent: Optional[str] = None,
    user_email: Optional[str] = None,
) -> PromptRead:
    """
    Update a prompt template.

    The prompt row is locked, the caller is authorized (when ``user_email`` is
    given), the resulting name is checked for conflicts inside the target
    visibility scope, and only then are the supplied fields applied and
    committed. Caches are invalidated immediately after the commit so that a
    later failure in notification or audit logging cannot leave stale entries.

    Args:
        db: Database session
        prompt_id: ID of prompt to update
        prompt_update: Prompt update object; fields left as ``None`` are not modified
        modified_by: Username of the person modifying the prompt
        modified_from_ip: IP address where the modification originated
        modified_via: Source of modification (ui/api/import)
        modified_user_agent: User agent string from the modification request
        user_email: Email of user performing update (for ownership check)

    Returns:
        The updated PromptRead object

    Raises:
        PromptNotFoundError: If the prompt is not found
        PermissionError: If user doesn't own the prompt
        IntegrityError: If a database integrity error occurs.
        PromptNameConflictError: If a prompt with the same name already exists.
        PromptError: For other update errors

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> from unittest.mock import MagicMock
        >>> service = PromptService()
        >>> db = MagicMock()
        >>> db.execute.return_value.scalar_one_or_none.return_value = MagicMock()
        >>> db.commit = MagicMock()
        >>> db.refresh = MagicMock()
        >>> service._notify_prompt_updated = MagicMock()
        >>> service.convert_prompt_to_read = MagicMock(return_value={})
        >>> import asyncio
        >>> try:
        ...     asyncio.run(service.update_prompt(db, 'prompt_name', MagicMock()))
        ... except Exception:
        ...     pass
    """
    try:
        # Acquire a row-level lock for the prompt being updated to make
        # name-checks and the subsequent update atomic in PostgreSQL.
        # For SQLite `get_for_update` falls back to a regular get.
        prompt = get_for_update(db, DbPrompt, prompt_id)
        if not prompt:
            raise PromptNotFoundError(f"Prompt not found: {prompt_id}")

        # Authorize BEFORE looking at other prompts: the conflict error below
        # carries the id/enabled/visibility of the conflicting row, which must
        # not be disclosed to (or locked on behalf of) a caller who is not
        # allowed to update this prompt in the first place.
        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, prompt):
                raise PermissionError("Only the owner can update this prompt")

        # Effective scope after the update: requested value wins, stored value is the fallback.
        visibility = prompt_update.visibility or prompt.visibility
        team_id = prompt_update.team_id or prompt.team_id
        owner_email = prompt_update.owner_email or prompt.owner_email or user_email

        # Work out which custom name the prompt will end up with.
        candidate_custom_name = prompt.custom_name
        if prompt_update.name is not None:
            candidate_custom_name = prompt_update.custom_name or prompt_update.name
        elif prompt_update.custom_name is not None:
            candidate_custom_name = prompt_update.custom_name

        computed_name = self._compute_prompt_name(candidate_custom_name, prompt.gateway)
        if computed_name != prompt.name:
            # Names only have to be unique inside one visibility scope.
            scope = visibility.lower()
            scope_filters = None
            if scope == "public":
                scope_filters = [DbPrompt.visibility == "public"]
            elif scope == "team" and team_id:
                scope_filters = [DbPrompt.visibility == "team", DbPrompt.team_id == team_id]
            elif scope == "private":
                scope_filters = [DbPrompt.visibility == "private", DbPrompt.owner_email == owner_email]

            if scope_filters is not None:
                # Lock any conflicting row so concurrent updates cannot race.
                existing_prompt = get_for_update(db, DbPrompt, where=and_(DbPrompt.name == computed_name, *scope_filters, DbPrompt.id != prompt.id))
                logger.debug(f"Existing prompt check result: {existing_prompt}")
                if existing_prompt:
                    raise PromptNameConflictError(computed_name, enabled=existing_prompt.enabled, prompt_id=existing_prompt.id, visibility=existing_prompt.visibility)

        # Apply the requested field changes.
        if prompt_update.name is not None:
            if prompt.gateway_id:
                # Gateway-backed prompts keep their original name; only the custom name changes.
                prompt.custom_name = prompt_update.custom_name or prompt_update.name
            else:
                prompt.original_name = prompt_update.name
                if prompt_update.custom_name is None:
                    prompt.custom_name = prompt_update.name
        if prompt_update.custom_name is not None:
            prompt.custom_name = prompt_update.custom_name
        if prompt_update.display_name is not None:
            prompt.display_name = prompt_update.display_name
        if prompt_update.description is not None:
            prompt.description = prompt_update.description
        if prompt_update.template is not None:
            # Validate before touching the row so an invalid template never dirties it.
            self._validate_template(prompt_update.template)
            prompt.template = prompt_update.template
            # Clear template cache to reduce memory growth
            _compile_jinja_template.cache_clear()
        if prompt_update.arguments is not None:
            # Required arguments are derived from the (possibly just updated) template.
            required_args = self._get_required_arguments(prompt.template)
            argument_schema = {
                "type": "object",
                "properties": {},
                "required": list(required_args),
            }
            for arg in prompt_update.arguments:
                schema = {"type": "string"}
                if arg.description is not None:
                    schema["description"] = arg.description
                argument_schema["properties"][arg.name] = schema
            prompt.argument_schema = argument_schema

        if prompt_update.visibility is not None:
            prompt.visibility = prompt_update.visibility

        # Update tags if provided
        if prompt_update.tags is not None:
            prompt.tags = prompt_update.tags

        # Update metadata fields
        prompt.updated_at = datetime.now(timezone.utc)
        if modified_by:
            prompt.modified_by = modified_by
        if modified_from_ip:
            prompt.modified_from_ip = modified_from_ip
        if modified_via:
            prompt.modified_via = modified_via
        if modified_user_agent:
            prompt.modified_user_agent = modified_user_agent
        if hasattr(prompt, "version") and prompt.version is not None:
            prompt.version = prompt.version + 1
        else:
            prompt.version = 1

        db.commit()
        db.refresh(prompt)

        # Invalidate caches right after the commit (before notifying or logging)
        # so a failure further down cannot leave stale cached prompt data.
        cache = _get_registry_cache()
        await cache.invalidate_prompts()
        # Also invalidate tags cache since prompt tags may have changed
        # First-Party
        from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

        await admin_stats_cache.invalidate_tags()

        await self._notify_prompt_updated(prompt)

        # Structured logging: Audit trail for prompt update
        audit_trail.log_action(
            user_id=user_email or modified_by or "system",
            action="update_prompt",
            resource_type="prompt",
            resource_id=str(prompt.id),
            resource_name=prompt.name,
            user_email=user_email,
            team_id=prompt.team_id,
            client_ip=modified_from_ip,
            user_agent=modified_user_agent,
            new_values={"name": prompt.name, "version": prompt.version},
            context={"modified_via": modified_via},
            db=db,
        )

        structured_logger.log(
            level="INFO",
            message="Prompt updated successfully",
            event_type="prompt_updated",
            component="prompt_service",
            user_id=modified_by,
            user_email=user_email,
            team_id=prompt.team_id,
            resource_type="prompt",
            resource_id=str(prompt.id),
            custom_fields={"prompt_name": prompt.name, "version": prompt.version},
            db=db,
        )

        prompt.team = self._get_team_name(db, prompt.team_id)

        return self.convert_prompt_to_read(prompt)

    except PermissionError as pe:
        db.rollback()

        structured_logger.log(
            level="WARNING",
            message="Prompt update failed due to permission error",
            event_type="prompt_update_permission_denied",
            component="prompt_service",
            user_email=user_email,
            resource_type="prompt",
            resource_id=str(prompt_id),
            error=pe,
            db=db,
        )
        raise
    except IntegrityError as ie:
        db.rollback()
        logger.error(f"IntegrityErrors in group: {ie}")

        structured_logger.log(
            level="ERROR",
            message="Prompt update failed due to database integrity error",
            event_type="prompt_update_failed",
            component="prompt_service",
            user_email=user_email,
            resource_type="prompt",
            resource_id=str(prompt_id),
            error=ie,
            db=db,
        )
        raise
    except PromptNotFoundError as e:
        db.rollback()
        logger.error(f"Prompt not found: {e}")

        structured_logger.log(
            level="ERROR",
            message="Prompt update failed - prompt not found",
            event_type="prompt_not_found",
            component="prompt_service",
            user_email=user_email,
            resource_type="prompt",
            resource_id=str(prompt_id),
            error=e,
            db=db,
        )
        raise
    except PromptNameConflictError as pnce:
        db.rollback()
        logger.error(f"Prompt name conflict: {pnce}")

        structured_logger.log(
            level="WARNING",
            message="Prompt update failed due to name conflict",
            event_type="prompt_name_conflict",
            component="prompt_service",
            user_email=user_email,
            resource_type="prompt",
            resource_id=str(prompt_id),
            error=pnce,
            db=db,
        )
        raise
    except Exception as e:
        db.rollback()

        structured_logger.log(
            level="ERROR",
            message="Prompt update failed",
            event_type="prompt_update_failed",
            component="prompt_service",
            user_email=user_email,
            resource_type="prompt",
            resource_id=str(prompt_id),
            error=e,
            db=db,
        )
        # Chain the original exception so the root cause stays in the traceback.
        raise PromptError(f"Failed to update prompt: {str(e)}") from e

2001 

async def set_prompt_state(self, db: Session, prompt_id: int, activate: bool, user_email: Optional[str] = None, skip_cache_invalidation: bool = False) -> PromptRead:
    """
    Set the activation status of a prompt.

    The prompt row is locked with ``nowait=True`` so a concurrent modification
    fails fast instead of queueing. The ``enabled`` flag is only written (and
    notifications / cache invalidation only happen) when the requested state
    differs from the stored one; the audit and structured log entries are
    written in either case.

    Args:
        db: Database session
        prompt_id: Prompt ID
        activate: True to activate, False to deactivate
        user_email: Optional[str] The email of the user to check if the user has permission to modify.
        skip_cache_invalidation: If True, skip cache invalidation (used for batch operations).

    Returns:
        The updated PromptRead object

    Raises:
        PromptNotFoundError: If the prompt is not found.
        PromptLockConflictError: If the prompt is locked by another transaction.
        PromptError: For other errors.
        PermissionError: If user doesn't own the prompt.

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> from unittest.mock import MagicMock
        >>> service = PromptService()
        >>> db = MagicMock()
        >>> prompt = MagicMock()
        >>> db.get.return_value = prompt
        >>> db.commit = MagicMock()
        >>> db.refresh = MagicMock()
        >>> service._notify_prompt_activated = MagicMock()
        >>> service._notify_prompt_deactivated = MagicMock()
        >>> service.convert_prompt_to_read = MagicMock(return_value={})
        >>> import asyncio
        >>> try:
        ...     asyncio.run(service.set_prompt_state(db, 1, True))
        ... except Exception:
        ...     pass
    """
    try:
        # Use nowait=True to fail fast if row is locked, preventing lock contention under high load
        try:
            prompt = get_for_update(db, DbPrompt, prompt_id, nowait=True)
        except OperationalError as lock_err:
            # Row is locked by another transaction - fail fast with 409
            db.rollback()
            raise PromptLockConflictError(f"Prompt {prompt_id} is currently being modified by another request") from lock_err
        if not prompt:
            raise PromptNotFoundError(f"Prompt not found: {prompt_id}")

        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, prompt):
                raise PermissionError("Only the owner can activate the Prompt" if activate else "Only the owner can deactivate the Prompt")

        # Only touch the row when the state actually changes.
        if prompt.enabled != activate:
            prompt.enabled = activate
            prompt.updated_at = datetime.now(timezone.utc)
            db.commit()
            db.refresh(prompt)

            # Invalidate cache after status change (skip for batch operations)
            if not skip_cache_invalidation:
                cache = _get_registry_cache()
                await cache.invalidate_prompts()

            if activate:
                await self._notify_prompt_activated(prompt)
            else:
                await self._notify_prompt_deactivated(prompt)
            logger.info(f"Prompt {prompt.name} {'activated' if activate else 'deactivated'}")

        # Structured logging: Audit trail for prompt state change
        audit_trail.log_action(
            user_id=user_email or "system",
            action="set_prompt_state",
            resource_type="prompt",
            resource_id=str(prompt.id),
            resource_name=prompt.name,
            user_email=user_email,
            team_id=prompt.team_id,
            new_values={"enabled": prompt.enabled},
            context={"action": "activate" if activate else "deactivate"},
            db=db,
        )

        structured_logger.log(
            level="INFO",
            message=f"Prompt {'activated' if activate else 'deactivated'} successfully",
            event_type="prompt_state_changed",
            component="prompt_service",
            user_email=user_email,
            team_id=prompt.team_id,
            resource_type="prompt",
            resource_id=str(prompt.id),
            custom_fields={"prompt_name": prompt.name, "enabled": prompt.enabled},
            db=db,
        )

        prompt.team = self._get_team_name(db, prompt.team_id)
        return self.convert_prompt_to_read(prompt)
    except PermissionError as e:
        # Release the row lock taken by get_for_update() right away instead of
        # holding it until the session is closed; the other mutating methods of
        # this service roll back on permission errors as well.
        db.rollback()

        structured_logger.log(
            level="WARNING",
            message="Prompt state change failed due to permission error",
            event_type="prompt_state_change_permission_denied",
            component="prompt_service",
            user_email=user_email,
            resource_type="prompt",
            resource_id=str(prompt_id),
            error=e,
            db=db,
        )
        raise
    except PromptLockConflictError:
        # Re-raise lock conflicts without wrapping - allows 409 response
        raise
    except PromptNotFoundError:
        # Re-raise not found without wrapping - allows 404 response
        raise
    except Exception as e:
        db.rollback()

        structured_logger.log(
            level="ERROR",
            message="Prompt state change failed",
            event_type="prompt_state_change_failed",
            component="prompt_service",
            user_email=user_email,
            resource_type="prompt",
            resource_id=str(prompt_id),
            error=e,
            db=db,
        )
        # Chain the original exception so the root cause stays in the traceback.
        raise PromptError(f"Failed to set prompt state: {str(e)}") from e

2139 

2140 # Get prompt details for admin ui 

2141 

async def get_prompt_details(self, db: Session, prompt_id: Union[int, str], include_inactive: bool = False) -> Dict[str, Any]:  # pylint: disable=unused-argument
    """
    Fetch a single prompt by primary key and return its full read representation.

    The lookup is also recorded in the audit trail and the structured log.

    Args:
        db: Database session
        prompt_id: ID of prompt
        include_inactive: Accepted for interface compatibility. This method does not
            filter on it; the value is only recorded in the audit/log context.

    Returns:
        Dictionary of prompt details

    Raises:
        PromptNotFoundError: If the prompt is not found

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> from unittest.mock import MagicMock
        >>> service = PromptService()
        >>> db = MagicMock()
        >>> prompt_dict = {'id': '1', 'name': 'test', 'description': 'desc', 'template': 'tpl', 'arguments': [], 'createdAt': '2023-01-01T00:00:00', 'updatedAt': '2023-01-01T00:00:00', 'isActive': True, 'metrics': {}}
        >>> service.convert_prompt_to_read = MagicMock(return_value=prompt_dict)
        >>> db.get.return_value = MagicMock()
        >>> import asyncio
        >>> result = asyncio.run(service.get_prompt_details(db, 'prompt_name'))
        >>> result == prompt_dict
        True
    """
    record = db.get(DbPrompt, prompt_id)
    if not record:
        raise PromptNotFoundError(f"Prompt not found: {prompt_id}")

    # Resolve the team name first so it is part of the converted payload.
    record.team = self._get_team_name(db, record.team_id)
    details = self.convert_prompt_to_read(record)

    resource_id = str(record.id)
    owning_team = record.team_id
    name = record.name

    audit_trail.log_action(
        user_id="system",
        action="view_prompt_details",
        resource_type="prompt",
        resource_id=resource_id,
        resource_name=name,
        team_id=owning_team,
        context={"include_inactive": include_inactive},
        db=db,
    )

    log_fields = {
        "prompt_name": name,
        "include_inactive": include_inactive,
    }
    structured_logger.log(
        level="INFO",
        message="Prompt details retrieved",
        event_type="prompt_details_viewed",
        component="prompt_service",
        resource_type="prompt",
        resource_id=resource_id,
        team_id=owning_team,
        custom_fields=log_fields,
        db=db,
    )

    return details

2204 

async def delete_prompt(self, db: Session, prompt_id: Union[int, str], user_email: Optional[str] = None, purge_metrics: bool = False) -> None:
    """
    Delete a prompt template by its ID.

    After the delete is committed the prompt and tag caches are invalidated
    first, then subscribers are notified and the audit/structured log entries
    are written. Invalidating first guarantees that a failure in notification
    or logging cannot leave a deleted prompt in the caches.

    Args:
        db (Session): Database session.
        prompt_id (str): ID of the prompt to delete.
        user_email (Optional[str]): Email of user performing delete (for ownership check).
        purge_metrics (bool): If True, delete raw + rollup metrics for this prompt.

    Raises:
        PromptNotFoundError: If the prompt is not found.
        PermissionError: If user doesn't own the prompt.
        PromptError: For other deletion errors.
        Exception: For unexpected errors.

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> from unittest.mock import MagicMock
        >>> service = PromptService()
        >>> db = MagicMock()
        >>> prompt = MagicMock()
        >>> db.get.return_value = prompt
        >>> db.delete = MagicMock()
        >>> db.commit = MagicMock()
        >>> service._notify_prompt_deleted = MagicMock()
        >>> import asyncio
        >>> try:
        ...     asyncio.run(service.delete_prompt(db, '123'))
        ... except Exception:
        ...     pass
    """
    try:
        prompt = db.get(DbPrompt, prompt_id)
        if not prompt:
            raise PromptNotFoundError(f"Prompt not found: {prompt_id}")

        # Check ownership if user_email provided
        if user_email:
            # First-Party
            from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

            permission_service = PermissionService(db)
            if not await permission_service.check_resource_ownership(user_email, prompt):
                raise PermissionError("Only the owner can delete this prompt")

        # Snapshot what is needed for notification/logging before the row is gone.
        prompt_info = {"id": prompt.id, "name": prompt.name}
        prompt_name = prompt_info["name"]
        prompt_team_id = prompt.team_id

        if purge_metrics:
            # Rollups are paused while raw and hourly metrics are removed.
            with pause_rollup_during_purge(reason=f"purge_prompt:{prompt_id}"):
                delete_metrics_in_batches(db, PromptMetric, PromptMetric.prompt_id, prompt_id)
                delete_metrics_in_batches(db, PromptMetricsHourly, PromptMetricsHourly.prompt_id, prompt_id)

        db.delete(prompt)
        db.commit()

        # Invalidate caches right after the commit (before notifying or logging)
        # so a failure further down cannot leave the deleted prompt cached.
        cache = _get_registry_cache()
        await cache.invalidate_prompts()
        # Also invalidate tags cache since prompt tags may have changed
        # First-Party
        from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

        await admin_stats_cache.invalidate_tags()

        await self._notify_prompt_deleted(prompt_info)
        logger.info(f"Deleted prompt: {prompt_info['name']}")

        # Structured logging: Audit trail for prompt deletion
        audit_trail.log_action(
            user_id=user_email or "system",
            action="delete_prompt",
            resource_type="prompt",
            resource_id=str(prompt_info["id"]),
            resource_name=prompt_name,
            user_email=user_email,
            team_id=prompt_team_id,
            old_values={"name": prompt_name},
            db=db,
        )

        # Structured logging: Log successful prompt deletion
        structured_logger.log(
            level="INFO",
            message="Prompt deleted successfully",
            event_type="prompt_deleted",
            component="prompt_service",
            user_email=user_email,
            team_id=prompt_team_id,
            resource_type="prompt",
            resource_id=str(prompt_info["id"]),
            custom_fields={
                "prompt_name": prompt_name,
                "purge_metrics": purge_metrics,
            },
            db=db,
        )
    except PermissionError as pe:
        db.rollback()

        # Structured logging: Log permission error
        structured_logger.log(
            level="WARNING",
            message="Prompt deletion failed due to permission error",
            event_type="prompt_delete_permission_denied",
            component="prompt_service",
            user_email=user_email,
            resource_type="prompt",
            resource_id=str(prompt_id),
            error=pe,
            db=db,
        )
        raise
    except PromptNotFoundError as e:
        db.rollback()

        # Structured logging: Log not found error
        structured_logger.log(
            level="ERROR",
            message="Prompt deletion failed - prompt not found",
            event_type="prompt_not_found",
            component="prompt_service",
            user_email=user_email,
            resource_type="prompt",
            resource_id=str(prompt_id),
            error=e,
            db=db,
        )
        raise
    except Exception as e:
        db.rollback()

        # Structured logging: Log generic prompt deletion failure
        structured_logger.log(
            level="ERROR",
            message="Prompt deletion failed",
            event_type="prompt_deletion_failed",
            component="prompt_service",
            user_email=user_email,
            resource_type="prompt",
            resource_id=str(prompt_id),
            error=e,
            db=db,
        )
        # Chain the original exception so the root cause stays in the traceback.
        raise PromptError(f"Failed to delete prompt: {str(e)}") from e

2349 

async def subscribe_events(self) -> AsyncGenerator[Dict[str, Any], None]:
    """Relay Prompt events from the EventService to the caller.

    Yields:
        Prompt event messages, in the order the EventService produces them.
    """
    event_stream = self._event_service.subscribe_events()
    async for message in event_stream:
        yield message

2358 

def _validate_template(self, template: str) -> None:
    """Check that a template can be parsed by the service's Jinja environment.

    Args:
        template: Template source to check

    Raises:
        PromptValidationError: If parsing the template raises any exception

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> service = PromptService()
        >>> service._validate_template("Hello {{ name }}")  # Valid template
        >>> try:
        ...     service._validate_template("Hello {{ invalid")  # Invalid template
        ... except Exception as e:
        ...     "Invalid template syntax" in str(e)
        True
    """
    try:
        # Only parsing is attempted; the resulting AST is discarded.
        self._jinja_env.parse(template)
    except Exception as exc:
        raise PromptValidationError(f"Invalid template syntax: {exc!s}")
    return None

2382 

def _get_required_arguments(self, template: str) -> Set[str]:
    """Extract required arguments from template.

    The result is the union of two scans over the same text:

    * undeclared variables reported by Jinja for the parsed template, and
    * replacement-field names found by ``string.Formatter.parse`` (the
      ``str.format`` syntax used as the rendering fallback).

    A template can be valid Jinja yet not valid ``str.format`` syntax, for
    example when it contains a lone ``{`` or ``}``. In that case the format
    scan raises ``ValueError``; it is then skipped and only the Jinja
    variables are returned, instead of failing the whole extraction.

    Args:
        template: Template to analyze

    Returns:
        Set of required argument names

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> service = PromptService()
        >>> args = service._get_required_arguments("Hello {{ name }} from {{ place }}")
        >>> sorted(args)
        ['name', 'place']
        >>> service._get_required_arguments("No variables") == set()
        True
    """
    ast = self._jinja_env.parse(template)
    variables = meta.find_undeclared_variables(ast)
    try:
        format_vars = {field_name for _, field_name, _, _ in Formatter().parse(template) if field_name is not None}
    except ValueError:
        # Not parseable as a str.format string (e.g. a stray brace), so the
        # format fallback could never render it; no format fields to report.
        format_vars = set()
    return variables.union(format_vars)

2406 

def _render_template(self, template: str, arguments: Dict[str, str]) -> str:
    """Render template with arguments using cached compiled templates.

    Rendering is attempted with the compiled Jinja template first. If that
    raises for any reason, the template is rendered with ``str.format`` as a
    fallback. Only when the fallback also fails is a ``PromptError`` raised,
    explicitly chained to the fallback's exception.

    Args:
        template: Template to render
        arguments: Arguments for rendering

    Returns:
        Rendered template text

    Raises:
        PromptError: If rendering fails

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> service = PromptService()
        >>> result = service._render_template("Hello {{ name }}", {"name": "World"})
        >>> result
        'Hello World'
        >>> service._render_template("No variables", {})
        'No variables'
    """
    try:
        jinja_template = _compile_jinja_template(template)
        return jinja_template.render(**arguments)
    except Exception:
        # Jinja path failed; try plain str.format before giving up.
        try:
            return template.format(**arguments)
        except Exception as e:
            # Keep the fallback failure as __cause__; the Jinja failure stays
            # reachable through the exception context.
            raise PromptError(f"Failed to render template: {str(e)}") from e

2437 

def _parse_messages(self, text: str) -> List[Message]:
    """Split rendered text into role-tagged messages.

    Lines starting with ``# Assistant:`` or ``# User:`` act as role markers:
    the text collected so far (if any) is emitted as a message for the
    current role, and the marker line itself is discarded. Text before the
    first marker belongs to the user role.

    Args:
        text: Text to parse

    Returns:
        List of parsed messages

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> service = PromptService()
        >>> messages = service._parse_messages("Simple text")
        >>> len(messages)
        1
        >>> messages[0].role.value
        'user'
        >>> messages = service._parse_messages("# User:\\nHello\\n# Assistant:\\nHi there")
        >>> len(messages)
        2
    """
    role_markers = (("# Assistant:", Role.ASSISTANT), ("# User:", Role.USER))
    parsed = []
    active_role = Role.USER
    pending_lines = []

    def _flush() -> None:
        """Emit the pending lines as one message for the active role, if any."""
        if pending_lines:
            body = "\n".join(pending_lines).strip()
            parsed.append(Message(role=active_role, content=TextContent(type="text", text=body)))

    for raw_line in text.split("\n"):
        switched_role = next((role for marker, role in role_markers if raw_line.startswith(marker)), None)
        if switched_role is None:
            pending_lines.append(raw_line)
            continue
        # Role marker: close the message in progress and start a new one.
        _flush()
        active_role = switched_role
        pending_lines = []

    _flush()
    return parsed

2493 

async def _notify_prompt_added(self, prompt: DbPrompt) -> None:
    """
    Publish a ``prompt_added`` event for the given prompt.

    Args:
        prompt: Prompt whose id, name, description and enabled flag are reported
    """
    details = {
        "id": prompt.id,
        "name": prompt.name,
        "description": prompt.description,
        "enabled": prompt.enabled,
    }
    emitted_at = datetime.now(timezone.utc).isoformat()
    await self._publish_event({"type": "prompt_added", "data": details, "timestamp": emitted_at})

2512 

async def _notify_prompt_updated(self, prompt: DbPrompt) -> None:
    """
    Publish a ``prompt_updated`` event for the given prompt.

    Args:
        prompt: Prompt whose id, name, description and enabled flag are reported
    """
    details = {
        "id": prompt.id,
        "name": prompt.name,
        "description": prompt.description,
        "enabled": prompt.enabled,
    }
    emitted_at = datetime.now(timezone.utc).isoformat()
    await self._publish_event({"type": "prompt_updated", "data": details, "timestamp": emitted_at})

2531 

async def _notify_prompt_activated(self, prompt: DbPrompt) -> None:
    """
    Publish a ``prompt_activated`` event for the given prompt.

    Args:
        prompt: Prompt whose id and name are reported; ``enabled`` is sent as True
    """
    details = {"id": prompt.id, "name": prompt.name, "enabled": True}
    emitted_at = datetime.now(timezone.utc).isoformat()
    await self._publish_event({"type": "prompt_activated", "data": details, "timestamp": emitted_at})

2545 

async def _notify_prompt_deactivated(self, prompt: DbPrompt) -> None:
    """
    Publish a ``prompt_deactivated`` event for the given prompt.

    Args:
        prompt: Prompt whose id and name are reported; ``enabled`` is sent as False
    """
    details = {"id": prompt.id, "name": prompt.name, "enabled": False}
    emitted_at = datetime.now(timezone.utc).isoformat()
    await self._publish_event({"type": "prompt_deactivated", "data": details, "timestamp": emitted_at})

2559 

async def _notify_prompt_deleted(self, prompt_info: Dict[str, Any]) -> None:
    """
    Publish a ``prompt_deleted`` event carrying the supplied prompt details.

    Args:
        prompt_info: Dict describing the deleted prompt; sent as the event data as-is
    """
    emitted_at = datetime.now(timezone.utc).isoformat()
    await self._publish_event({"type": "prompt_deleted", "data": prompt_info, "timestamp": emitted_at})

2573 

async def _notify_prompt_removed(self, prompt: DbPrompt) -> None:
    """
    Publish a ``prompt_removed`` event (deactivation) for the given prompt.

    Args:
        prompt: Prompt whose id and name are reported; ``enabled`` is sent as False
    """
    details = {"id": prompt.id, "name": prompt.name, "enabled": False}
    emitted_at = datetime.now(timezone.utc).isoformat()
    await self._publish_event({"type": "prompt_removed", "data": details, "timestamp": emitted_at})

2587 

async def _publish_event(self, event: Dict[str, Any]) -> None:
    """
    Hand an event to the EventService for delivery to subscribers.

    Args:
        event: Event to publish
    """
    publish = self._event_service.publish_event
    await publish(event)

2596 

2597 # --- Metrics --- 

async def aggregate_metrics(self, db: Session) -> Dict[str, Any]:
    """
    Return aggregated invocation metrics across all prompts.

    When the metrics cache is enabled and holds an entry under ``"prompts"``,
    that entry is returned directly. Otherwise the metrics are computed via
    ``aggregate_metrics_combined`` (raw metrics plus hourly rollups), turned
    into a dict, stored in the cache when caching is enabled, and returned.

    Args:
        db: Database session

    Returns:
        Dict[str, Any]: Aggregated prompt metrics from raw + hourly rollups.

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> service = PromptService()
        >>> # Method exists and is callable
        >>> callable(service.aggregate_metrics)
        True
    """
    # First-Party
    from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache  # pylint: disable=import-outside-toplevel

    cache_key = "prompts"

    # Serve from cache when possible.
    if is_cache_enabled():
        cached_metrics = metrics_cache.get(cache_key)
        if cached_metrics is not None:
            return cached_metrics

    # Cache miss: the query helper is imported only now, on demand.
    # First-Party
    from mcpgateway.services.metrics_query_service import aggregate_metrics_combined  # pylint: disable=import-outside-toplevel

    fresh_metrics = aggregate_metrics_combined(db, "prompt").to_dict()

    # Remember the result for subsequent calls.
    if is_cache_enabled():
        metrics_cache.set(cache_key, fresh_metrics)

    return fresh_metrics

2640 

async def reset_metrics(self, db: Session) -> None:
    """
    Delete every raw and hourly-rollup prompt metric record, then drop cached metrics.

    Args:
        db: Database session

    Examples:
        >>> from mcpgateway.services.prompt_service import PromptService
        >>> from unittest.mock import MagicMock
        >>> service = PromptService()
        >>> db = MagicMock()
        >>> db.execute = MagicMock()
        >>> db.commit = MagicMock()
        >>> import asyncio
        >>> asyncio.run(service.reset_metrics(db))
    """
    # Raw metrics first, then hourly rollups; one commit covers both deletes.
    for metric_model in (PromptMetric, PromptMetricsHourly):
        db.execute(delete(metric_model))
    db.commit()

    # Invalidate metrics cache
    # First-Party
    from mcpgateway.cache.metrics_cache import metrics_cache  # pylint: disable=import-outside-toplevel

    metrics_cache.invalidate("prompts")
    metrics_cache.invalidate_prefix("top_prompts:")

2669 

2670 

# Lazy singleton - built on first access rather than at module import time,
# so importing only the exception classes never instantiates the service.
_prompt_service_instance = None  # pylint: disable=invalid-name


def __getattr__(name: str):
    """Module-level __getattr__ that creates the prompt service on demand.

    Args:
        name: The attribute name being accessed.

    Returns:
        The prompt_service singleton instance if name is "prompt_service".

    Raises:
        AttributeError: If the attribute name is not "prompt_service".
    """
    global _prompt_service_instance  # pylint: disable=global-statement
    if name != "prompt_service":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
    if _prompt_service_instance is None:
        _prompt_service_instance = PromptService()
    return _prompt_service_instance

2693 raise AttributeError(f"module {__name__!r} has no attribute {name!r}")