Coverage for mcpgateway / services / prompt_service.py: 99%
860 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/prompt_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Prompt Service Implementation.
8This module implements prompt template management according to the MCP specification.
9It handles:
10- Prompt template registration and retrieval
11- Prompt argument validation
12- Template rendering with arguments
13- Resource embedding in prompts
14- Active/inactive prompt management
15"""
17# Standard
18import binascii
19from datetime import datetime, timezone
20from functools import lru_cache
21import os
22from string import Formatter
23import time
24from typing import Any, AsyncGenerator, Dict, List, Optional, Set, Union
25import uuid
27# Third-Party
28from jinja2 import Environment, meta, select_autoescape, Template
29from pydantic import ValidationError
30from sqlalchemy import and_, delete, desc, not_, or_, select
31from sqlalchemy.exc import IntegrityError, OperationalError
32from sqlalchemy.orm import joinedload, Session
34# First-Party
35from mcpgateway.common.models import Message, PromptResult, Role, TextContent
36from mcpgateway.config import settings
37from mcpgateway.db import EmailTeam
38from mcpgateway.db import Gateway as DbGateway
39from mcpgateway.db import get_for_update
40from mcpgateway.db import Prompt as DbPrompt
41from mcpgateway.db import PromptMetric, PromptMetricsHourly, server_prompt_association
42from mcpgateway.observability import create_span
43from mcpgateway.plugins.framework import GlobalContext, PluginContextTable, PluginManager, PromptHookType, PromptPosthookPayload, PromptPrehookPayload
44from mcpgateway.schemas import PromptCreate, PromptRead, PromptUpdate, TopPerformer
45from mcpgateway.services.audit_trail_service import get_audit_trail_service
46from mcpgateway.services.event_service import EventService
47from mcpgateway.services.logging_service import LoggingService
48from mcpgateway.services.metrics_cleanup_service import delete_metrics_in_batches, pause_rollup_during_purge
49from mcpgateway.services.observability_service import current_trace_id, ObservabilityService
50from mcpgateway.services.structured_logger import get_structured_logger
51from mcpgateway.services.team_management_service import TeamManagementService
52from mcpgateway.utils.create_slug import slugify
53from mcpgateway.utils.metrics_common import build_top_performers
54from mcpgateway.utils.pagination import unified_paginate
55from mcpgateway.utils.sqlalchemy_modifier import json_contains_tag_expr
# Registry cache singleton; imported lazily (see _get_registry_cache) to avoid circular dependencies
_REGISTRY_CACHE = None

# Module-level Jinja environment singleton so compiled templates are cached process-wide
_JINJA_ENV: Optional[Environment] = None
def _get_jinja_env() -> Environment:
    """Return the shared Jinja environment, creating it on first use.

    Returns:
        Jinja2 Environment with autoescape and trim settings.
    """
    global _JINJA_ENV  # pylint: disable=global-statement
    if _JINJA_ENV is not None:
        return _JINJA_ENV
    _JINJA_ENV = Environment(
        autoescape=select_autoescape(["html", "xml"]),
        trim_blocks=True,
        lstrip_blocks=True,
    )
    return _JINJA_ENV
@lru_cache(maxsize=256)
def _compile_jinja_template(template: str) -> Template:
    """Compile ``template`` once and memoize the result keyed by the template string.

    Args:
        template: The template string to compile.

    Returns:
        Compiled Jinja Template object.
    """
    env = _get_jinja_env()
    return env.from_string(template)
def _get_registry_cache():
    """Return the registry cache singleton, importing it lazily.

    The import happens inside the function body to avoid a circular
    dependency at module load time.

    Returns:
        RegistryCache instance.
    """
    global _REGISTRY_CACHE  # pylint: disable=global-statement
    if _REGISTRY_CACHE is not None:
        return _REGISTRY_CACHE
    # First-Party
    from mcpgateway.cache.registry_cache import registry_cache  # pylint: disable=import-outside-toplevel

    _REGISTRY_CACHE = registry_cache
    return _REGISTRY_CACHE
# Initialize logging service first so all subsequent module-level code can log
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

# Structured logger and audit trail used by prompt CRUD operations for observability
structured_logger = get_structured_logger("prompt_service")
audit_trail = get_audit_trail_service()
class PromptError(Exception):
    """Base exception for all prompt-related failures raised by this service."""
class PromptNotFoundError(PromptError):
    """Raised when a prompt lookup does not match any known prompt."""
class PromptNameConflictError(PromptError):
    """Raised when a prompt name collides with an existing (active or inactive) prompt."""

    def __init__(self, name: str, enabled: bool = True, prompt_id: Optional[int] = None, visibility: str = "public") -> None:
        """Record the conflicting prompt's details and build the error message.

        Args:
            name: The conflicting prompt name
            enabled: Whether the existing prompt is enabled
            prompt_id: ID of the existing prompt if available
            visibility: Prompt visibility level (private, team, public).

        Examples:
            >>> from mcpgateway.services.prompt_service import PromptNameConflictError
            >>> error = PromptNameConflictError("test_prompt")
            >>> error.name
            'test_prompt'
            >>> error.enabled
            True
            >>> error.prompt_id is None
            True
            >>> error = PromptNameConflictError("inactive_prompt", False, 123)
            >>> error.enabled
            False
            >>> error.prompt_id
            123
        """
        self.name = name
        self.enabled = enabled
        self.prompt_id = prompt_id
        # Inactive conflicts get extra context so the caller can surface the hidden row.
        suffix = "" if enabled else f" (currently inactive, ID: {prompt_id})"
        super().__init__(f"{visibility.capitalize()} Prompt already exists with name: {name}" + suffix)
class PromptValidationError(PromptError):
    """Raised when a prompt template or its arguments fail validation."""
class PromptLockConflictError(PromptError):
    """Raised when a prompt row is locked by another concurrent transaction.

    Signals that the caller attempted to modify a prompt whose database row
    is currently held by another request; the operation should be retried.
    """
174class PromptService:
175 """Service for managing prompt templates.
177 Handles:
178 - Template registration and retrieval
179 - Argument validation
180 - Template rendering
181 - Resource embedding
182 - Active/inactive status management
183 """
185 def __init__(self) -> None:
186 """
187 Initialize the prompt service.
189 Sets up the Jinja2 environment for rendering prompt templates.
190 Although these templates are rendered as JSON for the API, if the output is ever
191 embedded into an HTML page, unescaped content could be exploited for cross-site scripting (XSS) attacks.
192 Enabling autoescaping for 'html' and 'xml' templates via select_autoescape helps mitigate this risk.
194 Examples:
195 >>> from mcpgateway.services.prompt_service import PromptService
196 >>> service = PromptService()
197 >>> isinstance(service._event_service, EventService)
198 True
199 >>> service._jinja_env is not None
200 True
201 """
202 self._event_service = EventService(channel_name="mcpgateway:prompt_events")
203 # Use the module-level singleton for template caching
204 self._jinja_env = _get_jinja_env()
205 # Initialize plugin manager with env overrides for testability
206 env_flag = os.getenv("PLUGINS_ENABLED")
207 if env_flag is not None:
208 env_enabled = env_flag.strip().lower() in {"1", "true", "yes", "on"}
209 plugins_enabled = env_enabled
210 else:
211 plugins_enabled = settings.plugins_enabled
212 config_file = os.getenv("PLUGIN_CONFIG_FILE", getattr(settings, "plugin_config_file", "plugins/config.yaml"))
213 self._plugin_manager: PluginManager | None = PluginManager(config_file) if plugins_enabled else None
215 async def initialize(self) -> None:
216 """Initialize the service."""
217 logger.info("Initializing prompt service")
218 await self._event_service.initialize()
220 async def shutdown(self) -> None:
221 """Shutdown the service.
223 Examples:
224 >>> from mcpgateway.services.prompt_service import PromptService
225 >>> from unittest.mock import AsyncMock
226 >>> import asyncio
227 >>> service = PromptService()
228 >>> service._event_service = AsyncMock()
229 >>> asyncio.run(service.shutdown())
230 >>> # Verify event service shutdown was called
231 >>> service._event_service.shutdown.assert_awaited_once()
232 """
233 await self._event_service.shutdown()
234 logger.info("Prompt service shutdown complete")
236 async def get_top_prompts(self, db: Session, limit: Optional[int] = 5, include_deleted: bool = False) -> List[TopPerformer]:
237 """Retrieve the top-performing prompts based on execution count.
239 Queries the database to get prompts with their metrics, ordered by the number of executions
240 in descending order. Combines recent raw metrics with historical hourly rollups for complete
241 historical coverage. Returns a list of TopPerformer objects containing prompt details and
242 performance metrics. Results are cached for performance.
244 Args:
245 db (Session): Database session for querying prompt metrics.
246 limit (Optional[int]): Maximum number of prompts to return. Defaults to 5.
247 include_deleted (bool): Whether to include deleted prompts from rollups.
249 Returns:
250 List[TopPerformer]: A list of TopPerformer objects, each containing:
251 - id: Prompt ID.
252 - name: Prompt name.
253 - execution_count: Total number of executions.
254 - avg_response_time: Average response time in seconds, or None if no metrics.
255 - success_rate: Success rate percentage, or None if no metrics.
256 - last_execution: Timestamp of the last execution, or None if no metrics.
257 """
258 # Check cache first (if enabled)
259 # First-Party
260 from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache # pylint: disable=import-outside-toplevel
262 effective_limit = limit or 5
263 cache_key = f"top_prompts:{effective_limit}:include_deleted={include_deleted}"
265 if is_cache_enabled():
266 cached = metrics_cache.get(cache_key)
267 if cached is not None:
268 return cached
270 # Use combined query that includes both raw metrics and rollup data
271 # First-Party
272 from mcpgateway.services.metrics_query_service import get_top_performers_combined # pylint: disable=import-outside-toplevel
274 results = get_top_performers_combined(
275 db=db,
276 metric_type="prompt",
277 entity_model=DbPrompt,
278 limit=effective_limit,
279 include_deleted=include_deleted,
280 )
281 top_performers = build_top_performers(results)
283 # Cache the result (if enabled)
284 if is_cache_enabled():
285 metrics_cache.set(cache_key, top_performers)
287 return top_performers
289 def convert_prompt_to_read(self, db_prompt: DbPrompt, include_metrics: bool = False) -> PromptRead:
290 """
291 Convert a DbPrompt instance to a PromptRead Pydantic model,
292 optionally including aggregated metrics computed from the associated PromptMetric records.
294 Args:
295 db_prompt: Db prompt to convert
296 include_metrics: Whether to include metrics in the result. Defaults to False.
297 Set to False for list operations to avoid N+1 query issues.
299 Returns:
300 PromptRead: Pydantic model instance
301 """
302 arg_schema = db_prompt.argument_schema or {}
303 properties = arg_schema.get("properties", {})
304 required_list = arg_schema.get("required", [])
305 arguments_list = []
306 for arg_name, prop in properties.items():
307 arguments_list.append(
308 {
309 "name": arg_name,
310 "description": prop.get("description") or "",
311 "required": arg_name in required_list,
312 }
313 )
315 # Compute aggregated metrics only if requested (avoids N+1 queries in list operations)
316 if include_metrics:
317 total = len(db_prompt.metrics) if hasattr(db_prompt, "metrics") and db_prompt.metrics is not None else 0
318 successful = sum(1 for m in db_prompt.metrics if m.is_success) if total > 0 else 0
319 failed = sum(1 for m in db_prompt.metrics if not m.is_success) if total > 0 else 0
320 failure_rate = failed / total if total > 0 else 0.0
321 min_rt = min((m.response_time for m in db_prompt.metrics), default=None) if total > 0 else None
322 max_rt = max((m.response_time for m in db_prompt.metrics), default=None) if total > 0 else None
323 avg_rt = (sum(m.response_time for m in db_prompt.metrics) / total) if total > 0 else None
324 last_time = max((m.timestamp for m in db_prompt.metrics), default=None) if total > 0 else None
326 metrics_dict = {
327 "totalExecutions": total,
328 "successfulExecutions": successful,
329 "failedExecutions": failed,
330 "failureRate": failure_rate,
331 "minResponseTime": min_rt,
332 "maxResponseTime": max_rt,
333 "avgResponseTime": avg_rt,
334 "lastExecutionTime": last_time,
335 }
336 else:
337 metrics_dict = None
339 original_name = getattr(db_prompt, "original_name", None) or db_prompt.name
340 custom_name = getattr(db_prompt, "custom_name", None) or original_name
341 custom_name_slug = getattr(db_prompt, "custom_name_slug", None) or slugify(custom_name)
342 display_name = getattr(db_prompt, "display_name", None) or custom_name
344 prompt_dict = {
345 "id": db_prompt.id,
346 "name": db_prompt.name,
347 "original_name": original_name,
348 "custom_name": custom_name,
349 "custom_name_slug": custom_name_slug,
350 "display_name": display_name,
351 "gateway_slug": getattr(db_prompt, "gateway_slug", None),
352 "description": db_prompt.description,
353 "template": db_prompt.template,
354 "arguments": arguments_list,
355 "created_at": db_prompt.created_at,
356 "updated_at": db_prompt.updated_at,
357 "enabled": db_prompt.enabled,
358 "metrics": metrics_dict,
359 "tags": db_prompt.tags or [],
360 "visibility": db_prompt.visibility,
361 "team": getattr(db_prompt, "team", None),
362 # Include metadata fields for proper API response
363 "created_by": getattr(db_prompt, "created_by", None),
364 "modified_by": getattr(db_prompt, "modified_by", None),
365 "created_from_ip": getattr(db_prompt, "created_from_ip", None),
366 "created_via": getattr(db_prompt, "created_via", None),
367 "created_user_agent": getattr(db_prompt, "created_user_agent", None),
368 "modified_from_ip": getattr(db_prompt, "modified_from_ip", None),
369 "modified_via": getattr(db_prompt, "modified_via", None),
370 "modified_user_agent": getattr(db_prompt, "modified_user_agent", None),
371 "version": getattr(db_prompt, "version", None),
372 "team_id": getattr(db_prompt, "team_id", None),
373 "owner_email": getattr(db_prompt, "owner_email", None),
374 }
375 return PromptRead.model_validate(prompt_dict)
377 def _get_team_name(self, db: Session, team_id: Optional[str]) -> Optional[str]:
378 """Retrieve the team name given a team ID.
380 Args:
381 db (Session): Database session for querying teams.
382 team_id (Optional[str]): The ID of the team.
384 Returns:
385 Optional[str]: The name of the team if found, otherwise None.
386 """
387 if not team_id:
388 return None
389 team = db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True)).first()
390 db.commit() # Release transaction to avoid idle-in-transaction
391 return team.name if team else None
393 def _compute_prompt_name(self, custom_name: str, gateway: Optional[Any] = None) -> str:
394 """Compute the stored prompt name from custom_name and gateway context.
396 Args:
397 custom_name: Prompt name to slugify and store.
398 gateway: Optional gateway for namespacing.
400 Returns:
401 The stored prompt name with gateway prefix when applicable.
402 """
403 name_slug = slugify(custom_name)
404 if gateway:
405 gateway_slug = slugify(gateway.name)
406 return f"{gateway_slug}{settings.gateway_tool_name_separator}{name_slug}"
407 return name_slug
    async def register_prompt(
        self,
        db: Session,
        prompt: PromptCreate,
        created_by: Optional[str] = None,
        created_from_ip: Optional[str] = None,
        created_via: Optional[str] = None,
        created_user_agent: Optional[str] = None,
        import_batch_id: Optional[str] = None,
        federation_source: Optional[str] = None,
        team_id: Optional[str] = None,
        owner_email: Optional[str] = None,
        visibility: Optional[str] = "public",
    ) -> PromptRead:
        """Register a new prompt template.

        Validates the template, derives an argument schema from the template's
        placeholders plus the declared arguments, checks for name conflicts in
        the requested visibility scope, persists the row, then (in order)
        notifies subscribers, records audit-trail and structured-log entries,
        and invalidates registry/metrics caches.

        Args:
            db: Database session
            prompt: Prompt creation schema
            created_by: Username who created this prompt
            created_from_ip: IP address of creator
            created_via: Creation method (ui, api, import, federation)
            created_user_agent: User agent of creation request
            import_batch_id: UUID for bulk import operations
            federation_source: Source gateway for federated prompts
            team_id (Optional[str]): Team ID to assign the prompt to.
            owner_email (Optional[str]): Email of the user who owns this prompt.
            visibility (str): Prompt visibility level (private, team, public).
                NOTE(review): passing visibility=None explicitly would raise
                AttributeError at the ``visibility.lower()`` conflict check —
                confirm callers never pass None.

        Returns:
            Created prompt information

        Raises:
            IntegrityError: If a database integrity error occurs.
            PromptNameConflictError: If a prompt with the same name already exists.
            PromptError: For other prompt registration errors

        Examples:
            >>> from mcpgateway.services.prompt_service import PromptService
            >>> from unittest.mock import MagicMock
            >>> service = PromptService()
            >>> db = MagicMock()
            >>> prompt = MagicMock()
            >>> db.execute.return_value.scalar_one_or_none.return_value = None
            >>> db.add = MagicMock()
            >>> db.commit = MagicMock()
            >>> db.refresh = MagicMock()
            >>> service._notify_prompt_added = MagicMock()
            >>> service.convert_prompt_to_read = MagicMock(return_value={})
            >>> import asyncio
            >>> try:
            ...     asyncio.run(service.register_prompt(db, prompt))
            ... except Exception:
            ...     pass
        """
        try:
            # Validate template syntax
            self._validate_template(prompt.template)

            # Extract required arguments from template
            required_args = self._get_required_arguments(prompt.template)

            # Create argument schema: placeholders found in the template become
            # required; declared arguments contribute descriptions.
            argument_schema = {
                "type": "object",
                "properties": {},
                "required": list(required_args),
            }
            for arg in prompt.arguments:
                schema = {"type": "string"}
                if arg.description is not None:
                    schema["description"] = arg.description
                argument_schema["properties"][arg.name] = schema

            custom_name = prompt.custom_name or prompt.name
            display_name = prompt.display_name or custom_name

            # Extract gateway_id from prompt if present and look up gateway for namespacing
            gateway_id = getattr(prompt, "gateway_id", None)
            gateway = None
            if gateway_id:
                gateway = db.execute(select(DbGateway).where(DbGateway.id == gateway_id)).scalar_one_or_none()

            # Federated prompts are namespaced "<gateway-slug><separator><name-slug>"
            computed_name = self._compute_prompt_name(custom_name, gateway=gateway)

            # Create DB model
            db_prompt = DbPrompt(
                name=computed_name,
                original_name=prompt.name,
                custom_name=custom_name,
                display_name=display_name,
                description=prompt.description,
                template=prompt.template,
                argument_schema=argument_schema,
                tags=prompt.tags,
                # Metadata fields
                created_by=created_by,
                created_from_ip=created_from_ip,
                created_via=created_via,
                created_user_agent=created_user_agent,
                import_batch_id=import_batch_id,
                federation_source=federation_source,
                version=1,
                # Team scoping fields - use schema values if provided, otherwise fallback to parameters
                team_id=getattr(prompt, "team_id", None) or team_id,
                owner_email=getattr(prompt, "owner_email", None) or owner_email or created_by,
                visibility=getattr(prompt, "visibility", None) or visibility,
                gateway_id=gateway_id,
            )
            # Conflict detection, scoped by the requested visibility.
            # NOTE(review): these checks use the *parameter* visibility while the
            # stored row may take its visibility from the schema above — confirm
            # the two can never disagree.
            if visibility.lower() == "public":
                # Check for existing public prompt with the same name and gateway_id
                existing_prompt = db.execute(select(DbPrompt).where(DbPrompt.name == computed_name, DbPrompt.visibility == "public", DbPrompt.gateway_id == gateway_id)).scalar_one_or_none()
                if existing_prompt:
                    raise PromptNameConflictError(computed_name, enabled=existing_prompt.enabled, prompt_id=existing_prompt.id, visibility=existing_prompt.visibility)
            elif visibility.lower() == "team":
                # Check for existing team prompt with the same name and gateway_id
                existing_prompt = db.execute(
                    select(DbPrompt).where(DbPrompt.name == computed_name, DbPrompt.visibility == "team", DbPrompt.team_id == team_id, DbPrompt.gateway_id == gateway_id)
                ).scalar_one_or_none()
                if existing_prompt:
                    raise PromptNameConflictError(computed_name, enabled=existing_prompt.enabled, prompt_id=existing_prompt.id, visibility=existing_prompt.visibility)

            # Set gateway relationship to help the before_insert event handler compute the name correctly
            if gateway:
                db_prompt.gateway = gateway
                db_prompt.gateway_name_cache = gateway.name  # type: ignore[attr-defined]

            # Add to DB
            db.add(db_prompt)
            db.commit()
            db.refresh(db_prompt)
            # Notify subscribers
            await self._notify_prompt_added(db_prompt)

            logger.info(f"Registered prompt: {prompt.name}")

            # Structured logging: Audit trail for prompt creation
            audit_trail.log_action(
                user_id=created_by or "system",
                action="create_prompt",
                resource_type="prompt",
                resource_id=str(db_prompt.id),
                resource_name=db_prompt.name,
                user_email=owner_email,
                team_id=team_id,
                client_ip=created_from_ip,
                user_agent=created_user_agent,
                new_values={
                    "name": db_prompt.name,
                    "visibility": visibility,
                },
                context={
                    "created_via": created_via,
                    "import_batch_id": import_batch_id,
                    "federation_source": federation_source,
                },
                db=db,
            )

            # Structured logging: Log successful prompt creation
            structured_logger.log(
                level="INFO",
                message="Prompt created successfully",
                event_type="prompt_created",
                component="prompt_service",
                user_id=created_by,
                user_email=owner_email,
                team_id=team_id,
                resource_type="prompt",
                resource_id=str(db_prompt.id),
                custom_fields={
                    "prompt_name": db_prompt.name,
                    "visibility": visibility,
                },
                db=db,
            )

            # Resolve the team display name for the response payload
            db_prompt.team = self._get_team_name(db, db_prompt.team_id)
            # NOTE(review): convert_prompt_to_read already returns a PromptRead, so
            # model_validate below re-validates an existing model — redundant but harmless.
            prompt_dict = self.convert_prompt_to_read(db_prompt)

            # Invalidate cache after successful creation
            cache = _get_registry_cache()
            await cache.invalidate_prompts()
            # Also invalidate tags cache since prompt tags may have changed
            # First-Party
            from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

            await admin_stats_cache.invalidate_tags()
            # First-Party
            from mcpgateway.cache.metrics_cache import metrics_cache  # pylint: disable=import-outside-toplevel

            metrics_cache.invalidate_prefix("top_prompts:")
            metrics_cache.invalidate("prompts")

            return PromptRead.model_validate(prompt_dict)

        except IntegrityError as ie:
            # NOTE(review): unlike the handlers below, this path re-raises without
            # db.rollback() — confirm callers roll back the session on IntegrityError.
            logger.error(f"IntegrityErrors in group: {ie}")

            structured_logger.log(
                level="ERROR",
                message="Prompt creation failed due to database integrity error",
                event_type="prompt_creation_failed",
                component="prompt_service",
                user_id=created_by,
                user_email=owner_email,
                error=ie,
                custom_fields={"prompt_name": prompt.name},
                db=db,
            )
            raise ie
        except PromptNameConflictError as se:
            db.rollback()

            structured_logger.log(
                level="WARNING",
                message="Prompt creation failed due to name conflict",
                event_type="prompt_name_conflict",
                component="prompt_service",
                user_id=created_by,
                user_email=owner_email,
                custom_fields={"prompt_name": prompt.name, "visibility": visibility},
                db=db,
            )
            raise se
        except Exception as e:
            db.rollback()

            structured_logger.log(
                level="ERROR",
                message="Prompt creation failed",
                event_type="prompt_creation_failed",
                component="prompt_service",
                user_id=created_by,
                user_email=owner_email,
                error=e,
                custom_fields={"prompt_name": prompt.name},
                db=db,
            )
            raise PromptError(f"Failed to register prompt: {str(e)}")
651 async def register_prompts_bulk(
652 self,
653 db: Session,
654 prompts: List[PromptCreate],
655 created_by: Optional[str] = None,
656 created_from_ip: Optional[str] = None,
657 created_via: Optional[str] = None,
658 created_user_agent: Optional[str] = None,
659 import_batch_id: Optional[str] = None,
660 federation_source: Optional[str] = None,
661 team_id: Optional[str] = None,
662 owner_email: Optional[str] = None,
663 visibility: Optional[str] = "public",
664 conflict_strategy: str = "skip",
665 ) -> Dict[str, Any]:
666 """Register multiple prompts in bulk with a single commit.
668 This method provides significant performance improvements over individual
669 prompt registration by:
670 - Using db.add_all() instead of individual db.add() calls
671 - Performing a single commit for all prompts
672 - Batch conflict detection
673 - Chunking for very large imports (>500 items)
675 Args:
676 db: Database session
677 prompts: List of prompt creation schemas
678 created_by: Username who created these prompts
679 created_from_ip: IP address of creator
680 created_via: Creation method (ui, api, import, federation)
681 created_user_agent: User agent of creation request
682 import_batch_id: UUID for bulk import operations
683 federation_source: Source gateway for federated prompts
684 team_id: Team ID to assign the prompts to
685 owner_email: Email of the user who owns these prompts
686 visibility: Prompt visibility level (private, team, public)
687 conflict_strategy: How to handle conflicts (skip, update, rename, fail)
689 Returns:
690 Dict with statistics:
691 - created: Number of prompts created
692 - updated: Number of prompts updated
693 - skipped: Number of prompts skipped
694 - failed: Number of prompts that failed
695 - errors: List of error messages
697 Raises:
698 PromptError: If bulk registration fails critically
700 Examples:
701 >>> from mcpgateway.services.prompt_service import PromptService
702 >>> from unittest.mock import MagicMock
703 >>> service = PromptService()
704 >>> db = MagicMock()
705 >>> prompts = [MagicMock(), MagicMock()]
706 >>> import asyncio
707 >>> try:
708 ... result = asyncio.run(service.register_prompts_bulk(db, prompts))
709 ... except Exception:
710 ... pass
711 """
712 if not prompts:
713 return {"created": 0, "updated": 0, "skipped": 0, "failed": 0, "errors": []}
715 stats = {"created": 0, "updated": 0, "skipped": 0, "failed": 0, "errors": []}
717 # Process in chunks to avoid memory issues and SQLite parameter limits
718 chunk_size = 500
720 for chunk_start in range(0, len(prompts), chunk_size):
721 chunk = prompts[chunk_start : chunk_start + chunk_size]
723 try:
724 # Collect unique gateway_ids and look them up
725 gateway_ids = set()
726 for prompt in chunk:
727 gw_id = getattr(prompt, "gateway_id", None)
728 if gw_id:
729 gateway_ids.add(gw_id)
731 gateways_map: Dict[str, Any] = {}
732 if gateway_ids:
733 gateways = db.execute(select(DbGateway).where(DbGateway.id.in_(gateway_ids))).scalars().all()
734 gateways_map = {gw.id: gw for gw in gateways}
736 # Batch check for existing prompts to detect conflicts
737 # Build computed names with gateway context
738 prompt_names = []
739 for prompt in chunk:
740 custom_name = getattr(prompt, "custom_name", None) or prompt.name
741 gw_id = getattr(prompt, "gateway_id", None)
742 gateway = gateways_map.get(gw_id) if gw_id else None
743 computed_name = self._compute_prompt_name(custom_name, gateway=gateway)
744 prompt_names.append(computed_name)
746 # Query for existing prompts - need to consider gateway_id in conflict detection
747 # Build base query conditions
748 if visibility.lower() == "public":
749 base_conditions = [DbPrompt.name.in_(prompt_names), DbPrompt.visibility == "public"]
750 elif visibility.lower() == "team" and team_id:
751 base_conditions = [DbPrompt.name.in_(prompt_names), DbPrompt.visibility == "team", DbPrompt.team_id == team_id]
752 else:
753 # Private prompts - check by owner
754 base_conditions = [DbPrompt.name.in_(prompt_names), DbPrompt.visibility == "private", DbPrompt.owner_email == (owner_email or created_by)]
756 existing_prompts_query = select(DbPrompt).where(*base_conditions)
757 existing_prompts = db.execute(existing_prompts_query).scalars().all()
758 # Use (name, gateway_id) tuple as key for proper conflict detection
759 existing_prompts_map = {(p.name, p.gateway_id): p for p in existing_prompts}
761 prompts_to_add = []
762 prompts_to_update = []
764 for prompt in chunk:
765 try:
766 # Validate template syntax
767 self._validate_template(prompt.template)
769 # Extract required arguments from template
770 required_args = self._get_required_arguments(prompt.template)
772 # Create argument schema
773 argument_schema = {
774 "type": "object",
775 "properties": {},
776 "required": list(required_args),
777 }
778 for arg in prompt.arguments:
779 schema = {"type": "string"}
780 if arg.description is not None:
781 schema["description"] = arg.description
782 argument_schema["properties"][arg.name] = schema
784 # Use provided parameters or schema values
785 prompt_team_id = team_id if team_id is not None else getattr(prompt, "team_id", None)
786 prompt_owner_email = owner_email or getattr(prompt, "owner_email", None) or created_by
787 prompt_visibility = visibility if visibility is not None else getattr(prompt, "visibility", "public")
788 prompt_gateway_id = getattr(prompt, "gateway_id", None)
790 custom_name = getattr(prompt, "custom_name", None) or prompt.name
791 display_name = getattr(prompt, "display_name", None) or custom_name
792 gateway = gateways_map.get(prompt_gateway_id) if prompt_gateway_id else None
793 computed_name = self._compute_prompt_name(custom_name, gateway=gateway)
795 # Look up existing prompt by (name, gateway_id) tuple
796 existing_prompt = existing_prompts_map.get((computed_name, prompt_gateway_id))
798 if existing_prompt:
799 # Handle conflict based on strategy
800 if conflict_strategy == "skip":
801 stats["skipped"] += 1
802 continue
803 if conflict_strategy == "update":
804 # Update existing prompt
805 existing_prompt.description = prompt.description
806 existing_prompt.template = prompt.template
807 # Clear template cache to reduce memory growth
808 _compile_jinja_template.cache_clear()
809 existing_prompt.argument_schema = argument_schema
810 existing_prompt.tags = prompt.tags or []
811 if getattr(prompt, "custom_name", None) is not None:
812 existing_prompt.custom_name = custom_name
813 if getattr(prompt, "display_name", None) is not None:
814 existing_prompt.display_name = display_name
815 existing_prompt.modified_by = created_by
816 existing_prompt.modified_from_ip = created_from_ip
817 existing_prompt.modified_via = created_via
818 existing_prompt.modified_user_agent = created_user_agent
819 existing_prompt.updated_at = datetime.now(timezone.utc)
820 existing_prompt.version = (existing_prompt.version or 1) + 1
822 prompts_to_update.append(existing_prompt)
823 stats["updated"] += 1
824 elif conflict_strategy == "rename":
825 # Create with renamed prompt
826 new_name = f"{prompt.name}_imported_{int(datetime.now().timestamp())}"
827 new_custom_name = new_name
828 new_display_name = new_name
829 computed_name = self._compute_prompt_name(new_custom_name, gateway=gateway)
830 db_prompt = DbPrompt(
831 name=computed_name,
832 original_name=prompt.name,
833 custom_name=new_custom_name,
834 display_name=new_display_name,
835 description=prompt.description,
836 template=prompt.template,
837 argument_schema=argument_schema,
838 tags=prompt.tags or [],
839 created_by=created_by,
840 created_from_ip=created_from_ip,
841 created_via=created_via,
842 created_user_agent=created_user_agent,
843 import_batch_id=import_batch_id,
844 federation_source=federation_source,
845 version=1,
846 team_id=prompt_team_id,
847 owner_email=prompt_owner_email,
848 visibility=prompt_visibility,
849 gateway_id=prompt_gateway_id,
850 )
851 # Set gateway relationship to help the before_insert event handler
852 if gateway: 852 ↛ 855line 852 didn't jump to line 855 because the condition on line 852 was always true
853 db_prompt.gateway = gateway
854 db_prompt.gateway_name_cache = gateway.name # type: ignore[attr-defined]
855 prompts_to_add.append(db_prompt)
856 stats["created"] += 1
857 elif conflict_strategy == "fail": 857 ↛ 764line 857 didn't jump to line 764 because the condition on line 857 was always true
858 stats["failed"] += 1
859 stats["errors"].append(f"Prompt name conflict: {prompt.name}")
860 continue
861 else:
862 # Create new prompt
863 db_prompt = DbPrompt(
864 name=computed_name,
865 original_name=prompt.name,
866 custom_name=custom_name,
867 display_name=display_name,
868 description=prompt.description,
869 template=prompt.template,
870 argument_schema=argument_schema,
871 tags=prompt.tags or [],
872 created_by=created_by,
873 created_from_ip=created_from_ip,
874 created_via=created_via,
875 created_user_agent=created_user_agent,
876 import_batch_id=import_batch_id,
877 federation_source=federation_source,
878 version=1,
879 team_id=prompt_team_id,
880 owner_email=prompt_owner_email,
881 visibility=prompt_visibility,
882 gateway_id=prompt_gateway_id,
883 )
884 # Set gateway relationship to help the before_insert event handler
885 if gateway:
886 db_prompt.gateway = gateway
887 db_prompt.gateway_name_cache = gateway.name # type: ignore[attr-defined]
888 prompts_to_add.append(db_prompt)
889 stats["created"] += 1
891 except Exception as e:
892 stats["failed"] += 1
893 stats["errors"].append(f"Failed to process prompt {prompt.name}: {str(e)}")
894 logger.warning(f"Failed to process prompt {prompt.name} in bulk operation: {str(e)}")
895 continue
897 # Bulk add new prompts
898 if prompts_to_add:
899 db.add_all(prompts_to_add)
901 # Commit the chunk
902 db.commit()
904 # Refresh prompts for notifications and audit trail
905 for db_prompt in prompts_to_add:
906 db.refresh(db_prompt)
907 # Notify subscribers
908 await self._notify_prompt_added(db_prompt)
910 # Log bulk audit trail entry
911 if prompts_to_add or prompts_to_update:
912 audit_trail.log_action(
913 user_id=created_by or "system",
914 action="bulk_create_prompts" if prompts_to_add else "bulk_update_prompts",
915 resource_type="prompt",
916 resource_id=import_batch_id or "bulk_operation",
917 resource_name=f"Bulk operation: {len(prompts_to_add)} created, {len(prompts_to_update)} updated",
918 user_email=owner_email,
919 team_id=team_id,
920 client_ip=created_from_ip,
921 user_agent=created_user_agent,
922 new_values={
923 "prompts_created": len(prompts_to_add),
924 "prompts_updated": len(prompts_to_update),
925 "visibility": visibility,
926 },
927 context={
928 "created_via": created_via,
929 "import_batch_id": import_batch_id,
930 "federation_source": federation_source,
931 "conflict_strategy": conflict_strategy,
932 },
933 db=db,
934 )
936 logger.info(f"Bulk registered {len(prompts_to_add)} prompts, updated {len(prompts_to_update)} prompts in chunk")
938 except Exception as e:
939 db.rollback()
940 logger.error(f"Failed to process chunk in bulk prompt registration: {str(e)}")
941 stats["failed"] += len(chunk)
942 stats["errors"].append(f"Chunk processing failed: {str(e)}")
943 continue
945 # Final structured logging
946 structured_logger.log(
947 level="INFO",
948 message="Bulk prompt registration completed",
949 event_type="prompts_bulk_created",
950 component="prompt_service",
951 user_id=created_by,
952 user_email=owner_email,
953 team_id=team_id,
954 resource_type="prompt",
955 custom_fields={
956 "prompts_created": stats["created"],
957 "prompts_updated": stats["updated"],
958 "prompts_skipped": stats["skipped"],
959 "prompts_failed": stats["failed"],
960 "total_prompts": len(prompts),
961 "visibility": visibility,
962 "conflict_strategy": conflict_strategy,
963 },
964 db=db,
965 )
967 return stats
    async def list_prompts(
        self,
        db: Session,
        include_inactive: bool = False,
        cursor: Optional[str] = None,
        tags: Optional[List[str]] = None,
        limit: Optional[int] = None,
        page: Optional[int] = None,
        per_page: Optional[int] = None,
        user_email: Optional[str] = None,
        team_id: Optional[str] = None,
        visibility: Optional[str] = None,
        token_teams: Optional[List[str]] = None,
    ) -> Union[tuple[List[PromptRead], Optional[str]], Dict[str, Any]]:
        """
        Retrieve a list of prompt templates from the database with pagination support.

        This method retrieves prompt templates from the database and converts them into a list
        of PromptRead objects. It supports filtering out inactive prompts based on the
        include_inactive parameter and cursor-based pagination.

        Args:
            db (Session): The SQLAlchemy database session.
            include_inactive (bool): If True, include inactive prompts in the result.
                Defaults to False.
            cursor (Optional[str], optional): An opaque cursor token for pagination.
                Opaque base64-encoded string containing last item's ID and created_at.
            tags (Optional[List[str]]): Filter prompts by tags. If provided, only prompts with at least one matching tag will be returned.
            limit (Optional[int]): Maximum number of prompts to return. Use 0 for all prompts (no limit).
                If not specified, uses pagination_default_page_size.
            page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
            per_page: Items per page for page-based pagination. Defaults to pagination_default_page_size.
            user_email (Optional[str]): User email for team-based access control. If None, no access control is applied.
            team_id (Optional[str]): Filter by specific team ID. Requires user_email for access validation.
            visibility (Optional[str]): Filter by visibility (private, team, public).
            token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API token access
                where the token scope should be respected instead of the user's full team memberships.

        Returns:
            If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}}
            If cursor is provided or neither: tuple of (list of PromptRead objects, next_cursor).

        Examples:
            >>> from mcpgateway.services.prompt_service import PromptService
            >>> from unittest.mock import MagicMock
            >>> from mcpgateway.schemas import PromptRead
            >>> service = PromptService()
            >>> db = MagicMock()
            >>> prompt_read_obj = MagicMock(spec=PromptRead)
            >>> service.convert_prompt_to_read = MagicMock(return_value=prompt_read_obj)
            >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
            >>> import asyncio
            >>> prompts, next_cursor = asyncio.run(service.list_prompts(db))
            >>> prompts == [prompt_read_obj]
            True
        """
        # Check cache for first page only (cursor=None).
        # Skip caching when:
        # - user_email is provided (team-filtered results are user-specific)
        # - token_teams is set (scoped access, e.g., public-only or team-scoped tokens)
        # - page-based pagination is used
        # This prevents cache poisoning where admin results could leak to public-only requests.
        cache = _get_registry_cache()
        if cursor is None and user_email is None and token_teams is None and page is None:
            # Tags are sorted so the same tag set always produces the same cache key.
            filters_hash = cache.hash_filters(include_inactive=include_inactive, tags=sorted(tags) if tags else None)
            cached = await cache.get("prompts", filters_hash)
            if cached is not None:
                # Reconstruct PromptRead objects from cached dicts
                cached_prompts = [PromptRead.model_validate(p) for p in cached["prompts"]]
                return (cached_prompts, cached.get("next_cursor"))

        # Build base query with ordering and eager load gateway to avoid N+1
        query = select(DbPrompt).options(joinedload(DbPrompt.gateway)).order_by(desc(DbPrompt.created_at), desc(DbPrompt.id))

        if not include_inactive:
            query = query.where(DbPrompt.enabled)

        # Apply team-based access control if user_email is provided OR token_teams is explicitly set.
        # This ensures unauthenticated requests with token_teams=[] only see public prompts.
        if user_email is not None or token_teams is not None:  # empty-string user_email -> public-only filtering (secure default)
            # Use token_teams if provided (for MCP/API token access), otherwise look up from DB
            if token_teams is not None:
                team_ids = token_teams
            elif user_email:
                team_service = TeamManagementService(db)
                user_teams = await team_service.get_user_teams(user_email)
                team_ids = [team.id for team in user_teams]
            else:
                team_ids = []

            # Check if this is a public-only token (empty teams array).
            # Public-only tokens can ONLY see public resources - no owner access.
            is_public_only_token = token_teams is not None and len(token_teams) == 0

            if team_id:
                # User requesting specific team - verify access before filtering
                if team_id not in team_ids:
                    return ([], None)
                access_conditions = [
                    and_(DbPrompt.team_id == team_id, DbPrompt.visibility.in_(["team", "public"])),
                ]
                # Only include owner access for non-public-only tokens with user_email
                if not is_public_only_token and user_email:
                    access_conditions.append(and_(DbPrompt.team_id == team_id, DbPrompt.owner_email == user_email))
                query = query.where(or_(*access_conditions))
            else:
                # General access: public prompts + team prompts (+ owner prompts if not public-only token)
                access_conditions = [
                    DbPrompt.visibility == "public",
                ]
                # Only include owner access for non-public-only tokens with user_email
                if not is_public_only_token and user_email:
                    access_conditions.append(DbPrompt.owner_email == user_email)
                if team_ids:
                    access_conditions.append(and_(DbPrompt.team_id.in_(team_ids), DbPrompt.visibility.in_(["team", "public"])))
                query = query.where(or_(*access_conditions))

        if visibility:
            query = query.where(DbPrompt.visibility == visibility)

        # Add tag filtering if tags are provided (supports both List[str] and List[Dict] formats)
        if tags:
            query = query.where(json_contains_tag_expr(db, DbPrompt.tags, tags, match_any=True))

        # Use unified pagination helper - handles both page and cursor pagination
        pag_result = await unified_paginate(
            db=db,
            query=query,
            page=page,
            per_page=per_page,
            cursor=cursor,
            limit=limit,
            base_url="/admin/prompts",  # Used for page-based links
            query_params={"include_inactive": include_inactive} if include_inactive else {},
        )

        next_cursor = None
        # Extract prompt rows based on pagination type (NOTE: helper returns different shapes)
        if page is not None:
            # Page-based: pag_result is a dict
            prompts_db = pag_result["data"]
        else:
            # Cursor-based: pag_result is a tuple
            prompts_db, next_cursor = pag_result

        # Batch-fetch team names for the prompts (common for both pagination types; avoids per-row lookups)
        team_ids_set = {s.team_id for s in prompts_db if s.team_id}
        team_map = {}
        if team_ids_set:
            teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(team_ids_set), EmailTeam.is_active.is_(True))).all()
            team_map = {team.id: team.name for team in teams}

        db.commit()  # Release transaction to avoid idle-in-transaction

        # Convert to PromptRead (common for both pagination types)
        result = []
        for s in prompts_db:
            try:
                s.team = team_map.get(s.team_id) if s.team_id else None
                result.append(self.convert_prompt_to_read(s, include_metrics=False))
            except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
                logger.exception(f"Failed to convert prompt {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}")
                # Continue with remaining prompts instead of failing completely

        # Return appropriate format based on pagination type
        if page is not None:
            # Page-based format
            return {
                "data": result,
                "pagination": pag_result["pagination"],
                "links": pag_result["links"],
            }

        # Cursor-based format

        # Cache first page results - only for non-user-specific/non-scoped queries.
        # Must match the same conditions as the cache lookup to prevent cache poisoning.
        # (Page-based requests returned above, so filters_hash is always defined here.)
        if cursor is None and user_email is None and token_teams is None:
            try:
                cache_data = {"prompts": [s.model_dump(mode="json") for s in result], "next_cursor": next_cursor}
                await cache.set("prompts", cache_data, filters_hash)
            except AttributeError:
                pass  # Skip caching if result objects don't support model_dump (e.g., in doctests)

        return (result, next_cursor)
1154 async def list_prompts_for_user(
1155 self, db: Session, user_email: str, team_id: Optional[str] = None, visibility: Optional[str] = None, include_inactive: bool = False, skip: int = 0, limit: int = 100
1156 ) -> List[PromptRead]:
1157 """
1158 DEPRECATED: Use list_prompts() with user_email parameter instead.
1160 This method is maintained for backward compatibility but is no longer used.
1161 New code should call list_prompts() with user_email, team_id, and visibility parameters.
1163 List prompts user has access to with team filtering.
1165 Args:
1166 db: Database session
1167 user_email: Email of the user requesting prompts
1168 team_id: Optional team ID to filter by specific team
1169 visibility: Optional visibility filter (private, team, public)
1170 include_inactive: Whether to include inactive prompts
1171 skip: Number of prompts to skip for pagination
1172 limit: Maximum number of prompts to return
1174 Returns:
1175 List[PromptRead]: Prompts the user has access to
1176 """
1177 # Build query following existing patterns from list_prompts()
1178 team_service = TeamManagementService(db)
1179 user_teams = await team_service.get_user_teams(user_email)
1180 team_ids = [team.id for team in user_teams]
1182 # Build query following existing patterns from list_resources()
1183 # Eager load gateway to avoid N+1 when accessing gateway_slug
1184 query = select(DbPrompt).options(joinedload(DbPrompt.gateway))
1186 # Apply active/inactive filter
1187 if not include_inactive:
1188 query = query.where(DbPrompt.enabled)
1190 if team_id:
1191 if team_id not in team_ids:
1192 return [] # No access to team
1194 access_conditions = []
1195 # Filter by specific team
1196 access_conditions.append(and_(DbPrompt.team_id == team_id, DbPrompt.visibility.in_(["team", "public"])))
1198 access_conditions.append(and_(DbPrompt.team_id == team_id, DbPrompt.owner_email == user_email))
1200 query = query.where(or_(*access_conditions))
1201 else:
1202 # Get user's accessible teams
1203 # Build access conditions following existing patterns
1204 access_conditions = []
1205 # 1. User's personal resources (owner_email matches)
1206 access_conditions.append(DbPrompt.owner_email == user_email)
1207 # 2. Team resources where user is member
1208 if team_ids: 1208 ↛ 1209line 1208 didn't jump to line 1209 because the condition on line 1208 was never true
1209 access_conditions.append(and_(DbPrompt.team_id.in_(team_ids), DbPrompt.visibility.in_(["team", "public"])))
1210 # 3. Public resources (if visibility allows)
1211 access_conditions.append(DbPrompt.visibility == "public")
1213 query = query.where(or_(*access_conditions))
1215 # Apply visibility filter if specified
1216 if visibility:
1217 query = query.where(DbPrompt.visibility == visibility)
1219 # Apply pagination following existing patterns
1220 query = query.offset(skip).limit(limit)
1222 prompts = db.execute(query).scalars().all()
1224 # Batch fetch team names to avoid N+1 queries
1225 prompt_team_ids = {p.team_id for p in prompts if p.team_id}
1226 team_map = {}
1227 if prompt_team_ids:
1228 teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(prompt_team_ids), EmailTeam.is_active.is_(True))).all()
1229 team_map = {str(team.id): team.name for team in teams}
1231 db.commit() # Release transaction to avoid idle-in-transaction
1233 result = []
1234 for t in prompts:
1235 try:
1236 t.team = team_map.get(str(t.team_id)) if t.team_id else None
1237 result.append(self.convert_prompt_to_read(t, include_metrics=False))
1238 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
1239 logger.exception(f"Failed to convert prompt {getattr(t, 'id', 'unknown')} ({getattr(t, 'name', 'unknown')}): {e}")
1240 # Continue with remaining prompts instead of failing completely
1241 return result
1243 async def list_server_prompts(
1244 self,
1245 db: Session,
1246 server_id: str,
1247 include_inactive: bool = False,
1248 cursor: Optional[str] = None,
1249 user_email: Optional[str] = None,
1250 token_teams: Optional[List[str]] = None,
1251 ) -> List[PromptRead]:
1252 """
1253 Retrieve a list of prompt templates from the database.
1255 This method retrieves prompt templates from the database and converts them into a list
1256 of PromptRead objects. It supports filtering out inactive prompts based on the
1257 include_inactive parameter. The cursor parameter is reserved for future pagination support
1258 but is currently not implemented.
1260 Args:
1261 db (Session): The SQLAlchemy database session.
1262 server_id (str): Server ID
1263 include_inactive (bool): If True, include inactive prompts in the result.
1264 Defaults to False.
1265 cursor (Optional[str], optional): An opaque cursor token for pagination. Currently,
1266 this parameter is ignored. Defaults to None.
1267 user_email (Optional[str]): User email for visibility filtering. If None, no filtering applied.
1268 token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API
1269 token access where the token scope should be respected.
1271 Returns:
1272 List[PromptRead]: A list of prompt templates represented as PromptRead objects.
1274 Examples:
1275 >>> from mcpgateway.services.prompt_service import PromptService
1276 >>> from unittest.mock import MagicMock
1277 >>> from mcpgateway.schemas import PromptRead
1278 >>> service = PromptService()
1279 >>> db = MagicMock()
1280 >>> prompt_read_obj = MagicMock(spec=PromptRead)
1281 >>> service.convert_prompt_to_read = MagicMock(return_value=prompt_read_obj)
1282 >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
1283 >>> import asyncio
1284 >>> result = asyncio.run(service.list_server_prompts(db, 'server1'))
1285 >>> result == [prompt_read_obj]
1286 True
1287 """
1288 # Eager load gateway to avoid N+1 when accessing gateway_slug
1289 query = (
1290 select(DbPrompt)
1291 .options(joinedload(DbPrompt.gateway))
1292 .join(server_prompt_association, DbPrompt.id == server_prompt_association.c.prompt_id)
1293 .where(server_prompt_association.c.server_id == server_id)
1294 )
1295 if not include_inactive:
1296 query = query.where(DbPrompt.enabled)
1298 # Add visibility filtering if user context OR token_teams provided
1299 # This ensures unauthenticated requests with token_teams=[] only see public prompts
1300 if user_email is not None or token_teams is not None: # empty-string user_email -> public-only filtering (secure default)
1301 # Use token_teams if provided (for MCP/API token access), otherwise look up from DB
1302 if token_teams is not None:
1303 team_ids = token_teams
1304 elif user_email:
1305 team_service = TeamManagementService(db)
1306 user_teams = await team_service.get_user_teams(user_email)
1307 team_ids = [team.id for team in user_teams]
1308 else:
1309 team_ids = []
1311 # Check if this is a public-only token (empty teams array)
1312 # Public-only tokens can ONLY see public resources - no owner access
1313 is_public_only_token = token_teams is not None and len(token_teams) == 0
1315 access_conditions = [
1316 DbPrompt.visibility == "public",
1317 ]
1318 # Only include owner access for non-public-only tokens with user_email
1319 if not is_public_only_token and user_email:
1320 access_conditions.append(DbPrompt.owner_email == user_email)
1321 if team_ids:
1322 access_conditions.append(and_(DbPrompt.team_id.in_(team_ids), DbPrompt.visibility.in_(["team", "public"])))
1323 query = query.where(or_(*access_conditions))
1325 # Cursor-based pagination logic can be implemented here in the future.
1326 logger.debug(cursor)
1327 prompts = db.execute(query).scalars().all()
1329 # Batch fetch team names to avoid N+1 queries
1330 prompt_team_ids = {p.team_id for p in prompts if p.team_id}
1331 team_map = {}
1332 if prompt_team_ids:
1333 teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(prompt_team_ids), EmailTeam.is_active.is_(True))).all()
1334 team_map = {str(team.id): team.name for team in teams}
1336 db.commit() # Release transaction to avoid idle-in-transaction
1338 result = []
1339 for t in prompts:
1340 try:
1341 t.team = team_map.get(str(t.team_id)) if t.team_id else None
1342 result.append(self.convert_prompt_to_read(t, include_metrics=False))
1343 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
1344 logger.exception(f"Failed to convert prompt {getattr(t, 'id', 'unknown')} ({getattr(t, 'name', 'unknown')}): {e}")
1345 # Continue with remaining prompts instead of failing completely
1346 return result
1348 async def _record_prompt_metric(self, db: Session, prompt: DbPrompt, start_time: float, success: bool, error_message: Optional[str]) -> None:
1349 """
1350 Records a metric for a prompt invocation.
1352 Args:
1353 db: Database session
1354 prompt: The prompt that was invoked
1355 start_time: Monotonic start time of the invocation
1356 success: True if successful, False otherwise
1357 error_message: Error message if failed, None otherwise
1358 """
1359 end_time = time.monotonic()
1360 response_time = end_time - start_time
1362 metric = PromptMetric(
1363 prompt_id=prompt.id,
1364 response_time=response_time,
1365 is_success=success,
1366 error_message=error_message,
1367 )
1368 db.add(metric)
1369 db.commit()
1371 async def _check_prompt_access(
1372 self,
1373 db: Session,
1374 prompt: DbPrompt,
1375 user_email: Optional[str],
1376 token_teams: Optional[List[str]],
1377 ) -> bool:
1378 """Check if user has access to a prompt based on visibility rules.
1380 Implements the same access control logic as list_prompts() for consistency.
1382 Args:
1383 db: Database session for team membership lookup if needed.
1384 prompt: Prompt ORM object with visibility, team_id, owner_email.
1385 user_email: Email of the requesting user (None = unauthenticated).
1386 token_teams: List of team IDs from token.
1387 - None = unrestricted admin access
1388 - [] = public-only token
1389 - [...] = team-scoped token
1391 Returns:
1392 True if access is allowed, False otherwise.
1393 """
1394 visibility = getattr(prompt, "visibility", "public")
1395 prompt_team_id = getattr(prompt, "team_id", None)
1396 prompt_owner_email = getattr(prompt, "owner_email", None)
1398 # Public prompts are accessible by everyone
1399 if visibility == "public":
1400 return True
1402 # Admin bypass: token_teams=None AND user_email=None means unrestricted admin
1403 # This happens when is_admin=True and no team scoping in token
1404 if token_teams is None and user_email is None:
1405 return True
1407 # No user context (but not admin) = deny access to non-public prompts
1408 if not user_email:
1409 return False
1411 # Public-only tokens (empty teams array) can ONLY access public prompts
1412 is_public_only_token = token_teams is not None and len(token_teams) == 0
1413 if is_public_only_token:
1414 return False # Already checked public above
1416 # Owner can always access their own prompts
1417 if prompt_owner_email and prompt_owner_email == user_email:
1418 return True
1420 # Team prompts: check team membership (matches list_prompts behavior)
1421 if prompt_team_id:
1422 # Use token_teams if provided, otherwise look up from DB
1423 if token_teams is not None:
1424 team_ids = token_teams
1425 else:
1426 team_service = TeamManagementService(db)
1427 user_teams = await team_service.get_user_teams(user_email)
1428 team_ids = [team.id for team in user_teams]
1430 # Team/public visibility allows access if user is in the team
1431 if visibility in ["team", "public"] and prompt_team_id in team_ids:
1432 return True
1434 return False
1436 async def get_prompt(
1437 self,
1438 db: Session,
1439 prompt_id: Union[int, str],
1440 arguments: Optional[Dict[str, str]] = None,
1441 user: Optional[str] = None,
1442 tenant_id: Optional[str] = None,
1443 server_id: Optional[str] = None,
1444 request_id: Optional[str] = None,
1445 token_teams: Optional[List[str]] = None,
1446 plugin_context_table: Optional[PluginContextTable] = None,
1447 plugin_global_context: Optional[GlobalContext] = None,
1448 _meta_data: Optional[Dict[str, Any]] = None,
1449 ) -> PromptResult:
1450 """Get a prompt template and optionally render it.
1452 Args:
1453 db: Database session
1454 prompt_id: ID of the prompt to retrieve
1455 arguments: Optional arguments for rendering
1456 user: Optional user email for authorization checks
1457 tenant_id: Optional tenant identifier for plugin context
1458 server_id: Optional server ID for server scoping enforcement
1459 request_id: Optional request ID, generated if not provided
1460 token_teams: Optional list of team IDs from token for authorization.
1461 None = unrestricted admin, [] = public-only, [...] = team-scoped.
1462 plugin_context_table: Optional plugin context table from previous hooks for cross-hook state sharing.
1463 plugin_global_context: Optional global context from middleware for consistency across hooks.
1464 _meta_data: Optional metadata for prompt retrieval (not used currently).
1466 Returns:
1467 Prompt result with rendered messages
1469 Raises:
1470 PluginViolationError: If prompt violates a plugin policy
1471 PromptNotFoundError: If prompt not found or access denied
1472 PromptError: For other prompt errors
1473 PluginError: If encounters issue with plugin
1475 Examples:
1476 >>> from mcpgateway.services.prompt_service import PromptService
1477 >>> from unittest.mock import MagicMock
1478 >>> service = PromptService()
1479 >>> db = MagicMock()
1480 >>> db.execute.return_value.scalar_one_or_none.return_value = MagicMock()
1481 >>> import asyncio
1482 >>> try:
1483 ... asyncio.run(service.get_prompt(db, 'prompt_id'))
1484 ... except Exception:
1485 ... pass
1486 """
1488 start_time = time.monotonic()
1489 success = False
1490 error_message = None
1491 prompt = None
1493 # Create database span for observability dashboard
1494 trace_id = current_trace_id.get()
1495 db_span_id = None
1496 db_span_ended = False
1497 observability_service = ObservabilityService() if trace_id else None
1499 if trace_id and observability_service:
1500 try:
1501 db_span_id = observability_service.start_span(
1502 db=db,
1503 trace_id=trace_id,
1504 name="prompt.render",
1505 attributes={
1506 "prompt.id": str(prompt_id),
1507 "arguments_count": len(arguments) if arguments else 0,
1508 "user": user or "anonymous",
1509 "server_id": server_id,
1510 "tenant_id": tenant_id,
1511 "request_id": request_id or "none",
1512 },
1513 )
1514 logger.debug(f"✓ Created prompt.render span: {db_span_id} for prompt: {prompt_id}")
1515 except Exception as e:
1516 logger.warning(f"Failed to start observability span for prompt rendering: {e}")
1517 db_span_id = None
1519 # Create a trace span for OpenTelemetry export (Jaeger, Zipkin, etc.)
1520 with create_span(
1521 "prompt.render",
1522 {
1523 "prompt.id": prompt_id,
1524 "arguments_count": len(arguments) if arguments else 0,
1525 "user": user or "anonymous",
1526 "server_id": server_id,
1527 "tenant_id": tenant_id,
1528 "request_id": request_id or "none",
1529 },
1530 ) as span:
1531 try:
1532 # Check if any prompt hooks are registered to avoid unnecessary context creation
1533 has_pre_fetch = self._plugin_manager and self._plugin_manager.has_hooks_for(PromptHookType.PROMPT_PRE_FETCH)
1534 has_post_fetch = self._plugin_manager and self._plugin_manager.has_hooks_for(PromptHookType.PROMPT_POST_FETCH)
1536 # Initialize plugin context variables only if hooks are registered
1537 context_table = None
1538 global_context = None
1539 if has_pre_fetch or has_post_fetch:
1540 context_table = plugin_context_table
1541 if plugin_global_context:
1542 global_context = plugin_global_context
1543 # Update fields with prompt-specific information
1544 if user: 1544 ↛ 1546line 1544 didn't jump to line 1546 because the condition on line 1544 was always true
1545 global_context.user = user
1546 if server_id:
1547 global_context.server_id = server_id
1548 if tenant_id: 1548 ↛ 1556line 1548 didn't jump to line 1556 because the condition on line 1548 was always true
1549 global_context.tenant_id = tenant_id
1550 else:
1551 # Create new context (fallback when middleware didn't run)
1552 if not request_id: 1552 ↛ 1554line 1552 didn't jump to line 1554 because the condition on line 1552 was always true
1553 request_id = uuid.uuid4().hex
1554 global_context = GlobalContext(request_id=request_id, user=user, server_id=server_id, tenant_id=tenant_id)
1556 if has_pre_fetch:
1557 pre_result, context_table = await self._plugin_manager.invoke_hook(
1558 PromptHookType.PROMPT_PRE_FETCH,
1559 payload=PromptPrehookPayload(prompt_id=prompt_id, args=arguments),
1560 global_context=global_context,
1561 local_contexts=context_table, # Pass context from previous hooks
1562 violations_as_exceptions=True,
1563 )
1565 # Use modified payload if provided
1566 if pre_result.modified_payload: 1566 ↛ 1571line 1566 didn't jump to line 1571 because the condition on line 1566 was always true
1567 payload = pre_result.modified_payload
1568 arguments = payload.args
1570 # Find prompt by ID first, then by name (active prompts only)
1571 search_key = str(prompt_id)
1572 prompt = db.execute(select(DbPrompt).where(DbPrompt.id == prompt_id).where(DbPrompt.enabled)).scalar_one_or_none()
1573 if not prompt:
1574 prompt = db.execute(select(DbPrompt).where(DbPrompt.name == prompt_id).where(DbPrompt.enabled)).scalar_one_or_none()
1576 if not prompt:
1577 # Check if an inactive prompt exists
1578 inactive_prompt = db.execute(select(DbPrompt).where(DbPrompt.id == prompt_id).where(not_(DbPrompt.enabled))).scalar_one_or_none()
1579 if not inactive_prompt:
1580 inactive_prompt = db.execute(select(DbPrompt).where(DbPrompt.name == prompt_id).where(not_(DbPrompt.enabled))).scalar_one_or_none()
1582 if inactive_prompt:
1583 raise PromptNotFoundError(f"Prompt '{search_key}' exists but is inactive")
1585 raise PromptNotFoundError(f"Prompt not found: {search_key}")
1587 # ═══════════════════════════════════════════════════════════════════════════
1588 # SECURITY: Check prompt access based on visibility and team membership
1589 # ═══════════════════════════════════════════════════════════════════════════
1590 if not await self._check_prompt_access(db, prompt, user, token_teams):
1591 # Don't reveal prompt existence - return generic "not found"
1592 raise PromptNotFoundError(f"Prompt not found: {search_key}")
1594 # ═══════════════════════════════════════════════════════════════════════════
1595 # SECURITY: Enforce server scoping if server_id is provided
1596 # Prompt must be attached to the specified virtual server
1597 # ═══════════════════════════════════════════════════════════════════════════
1598 if server_id:
1599 server_match = db.execute(
1600 select(server_prompt_association.c.prompt_id).where(
1601 server_prompt_association.c.server_id == server_id,
1602 server_prompt_association.c.prompt_id == prompt.id,
1603 )
1604 ).first()
1605 if not server_match:
1606 raise PromptNotFoundError(f"Prompt not found: {search_key}")
1608 if not arguments:
1609 result = PromptResult(
1610 messages=[
1611 Message(
1612 role=Role.USER,
1613 content=TextContent(type="text", text=prompt.template),
1614 )
1615 ],
1616 description=prompt.description,
1617 )
1618 else:
1619 try:
1620 prompt.validate_arguments(arguments)
1621 rendered = self._render_template(prompt.template, arguments)
1622 messages = self._parse_messages(rendered)
1623 result = PromptResult(messages=messages, description=prompt.description)
1624 except Exception as e:
1625 if span:
1626 span.set_attribute("error", True)
1627 span.set_attribute("error.message", str(e))
1628 raise PromptError(f"Failed to process prompt: {str(e)}")
1630 if has_post_fetch:
1631 post_result, _ = await self._plugin_manager.invoke_hook(
1632 PromptHookType.PROMPT_POST_FETCH,
1633 payload=PromptPosthookPayload(prompt_id=str(prompt.id), result=result),
1634 global_context=global_context,
1635 local_contexts=context_table,
1636 violations_as_exceptions=True,
1637 )
1638 # Use modified payload if provided
1639 result = post_result.modified_payload.result if post_result.modified_payload else result
1641 arguments_supplied = bool(arguments)
1643 audit_trail.log_action(
1644 user_id=user or "anonymous",
1645 action="view_prompt",
1646 resource_type="prompt",
1647 resource_id=str(prompt.id),
1648 resource_name=prompt.name,
1649 team_id=prompt.team_id,
1650 context={
1651 "tenant_id": tenant_id,
1652 "server_id": server_id,
1653 "arguments_provided": arguments_supplied,
1654 "request_id": request_id,
1655 },
1656 db=db,
1657 )
1659 structured_logger.log(
1660 level="INFO",
1661 message="Prompt retrieved successfully",
1662 event_type="prompt_viewed",
1663 component="prompt_service",
1664 user_id=user,
1665 team_id=prompt.team_id,
1666 resource_type="prompt",
1667 resource_id=str(prompt.id),
1668 request_id=request_id,
1669 custom_fields={
1670 "prompt_name": prompt.name,
1671 "arguments_provided": arguments_supplied,
1672 "tenant_id": tenant_id,
1673 "server_id": server_id,
1674 },
1675 db=db,
1676 )
1678 # Set success attributes on span
1679 if span:
1680 span.set_attribute("success", True)
1681 span.set_attribute("duration.ms", (time.monotonic() - start_time) * 1000)
1682 if result and hasattr(result, "messages"): 1682 ↛ 1685line 1682 didn't jump to line 1685 because the condition on line 1682 was always true
1683 span.set_attribute("messages.count", len(result.messages))
1685 success = True
1686 logger.info(f"Retrieved prompt: {prompt.id} successfully")
1687 return result
1689 except Exception as e:
1690 success = False
1691 error_message = str(e)
1692 raise
1693 finally:
1694 # Record metrics only if we found a prompt
1695 if prompt:
1696 try:
1697 # First-Party
1698 from mcpgateway.services.metrics_buffer_service import get_metrics_buffer_service # pylint: disable=import-outside-toplevel
1700 metrics_buffer = get_metrics_buffer_service()
1701 metrics_buffer.record_prompt_metric(
1702 prompt_id=prompt.id,
1703 start_time=start_time,
1704 success=success,
1705 error_message=error_message,
1706 )
1707 except Exception as metrics_error:
1708 logger.warning(f"Failed to record prompt metric: {metrics_error}")
1710 # End database span for observability dashboard
1711 if db_span_id and observability_service and not db_span_ended:
1712 try:
1713 observability_service.end_span(
1714 db=db,
1715 span_id=db_span_id,
1716 status="ok" if success else "error",
1717 status_message=error_message if error_message else None,
1718 )
1719 db_span_ended = True
1720 logger.debug(f"✓ Ended prompt.render span: {db_span_id}")
1721 except Exception as e:
1722 logger.warning(f"Failed to end observability span for prompt rendering: {e}")
    async def update_prompt(
        self,
        db: Session,
        prompt_id: Union[int, str],
        prompt_update: PromptUpdate,
        modified_by: Optional[str] = None,
        modified_from_ip: Optional[str] = None,
        modified_via: Optional[str] = None,
        modified_user_agent: Optional[str] = None,
        user_email: Optional[str] = None,
    ) -> PromptRead:
        """
        Update a prompt template.

        Acquires a row-level lock on the target prompt, checks for name
        conflicts within the requested visibility scope, verifies ownership,
        applies the partial update, bumps the version, and invalidates caches.

        Args:
            db: Database session
            prompt_id: ID of prompt to update
            prompt_update: Prompt update object
            modified_by: Username of the person modifying the prompt
            modified_from_ip: IP address where the modification originated
            modified_via: Source of modification (ui/api/import)
            modified_user_agent: User agent string from the modification request
            user_email: Email of user performing update (for ownership check)

        Returns:
            The updated PromptRead object

        Raises:
            PromptNotFoundError: If the prompt is not found
            PermissionError: If user doesn't own the prompt
            IntegrityError: If a database integrity error occurs.
            PromptNameConflictError: If a prompt with the same name already exists.
            PromptError: For other update errors

        Examples:
            >>> from mcpgateway.services.prompt_service import PromptService
            >>> from unittest.mock import MagicMock
            >>> service = PromptService()
            >>> db = MagicMock()
            >>> db.execute.return_value.scalar_one_or_none.return_value = MagicMock()
            >>> db.commit = MagicMock()
            >>> db.refresh = MagicMock()
            >>> service._notify_prompt_updated = MagicMock()
            >>> service.convert_prompt_to_read = MagicMock(return_value={})
            >>> import asyncio
            >>> try:
            ...     asyncio.run(service.update_prompt(db, 'prompt_name', MagicMock()))
            ... except Exception:
            ...     pass
        """
        try:
            # Acquire a row-level lock for the prompt being updated to make
            # name-checks and the subsequent update atomic in PostgreSQL.
            # For SQLite `get_for_update` falls back to a regular get.
            prompt = get_for_update(db, DbPrompt, prompt_id)
            if not prompt:
                raise PromptNotFoundError(f"Prompt not found: {prompt_id}")

            # Effective scope for the conflict check: fall back to the current
            # row's values when the update does not change them.
            visibility = prompt_update.visibility or prompt.visibility
            team_id = prompt_update.team_id or prompt.team_id
            owner_email = prompt_update.owner_email or prompt.owner_email or user_email

            # Work out the custom name that would result from this update
            # (an explicit custom_name wins over a plain rename).
            candidate_custom_name = prompt.custom_name

            if prompt_update.name is not None:
                candidate_custom_name = prompt_update.custom_name or prompt_update.name
            elif prompt_update.custom_name is not None:
                candidate_custom_name = prompt_update.custom_name

            # Only run conflict checks when the computed name actually changes.
            computed_name = self._compute_prompt_name(candidate_custom_name, prompt.gateway)
            if computed_name != prompt.name:
                if visibility.lower() == "public":
                    # Lock any conflicting row so concurrent updates cannot race.
                    existing_prompt = get_for_update(db, DbPrompt, where=and_(DbPrompt.name == computed_name, DbPrompt.visibility == "public", DbPrompt.id != prompt.id))
                    if existing_prompt:
                        raise PromptNameConflictError(computed_name, enabled=existing_prompt.enabled, prompt_id=existing_prompt.id, visibility=existing_prompt.visibility)
                elif visibility.lower() == "team" and team_id:
                    # Team-scoped names only need to be unique within the team.
                    existing_prompt = get_for_update(db, DbPrompt, where=and_(DbPrompt.name == computed_name, DbPrompt.visibility == "team", DbPrompt.team_id == team_id, DbPrompt.id != prompt.id))
                    logger.info(f"Existing prompt check result: {existing_prompt}")
                    if existing_prompt:
                        raise PromptNameConflictError(computed_name, enabled=existing_prompt.enabled, prompt_id=existing_prompt.id, visibility=existing_prompt.visibility)
                elif visibility.lower() == "private":
                    # Private names are unique per owner.
                    existing_prompt = get_for_update(
                        db, DbPrompt, where=and_(DbPrompt.name == computed_name, DbPrompt.visibility == "private", DbPrompt.owner_email == owner_email, DbPrompt.id != prompt.id)
                    )
                    if existing_prompt:
                        raise PromptNameConflictError(computed_name, enabled=existing_prompt.enabled, prompt_id=existing_prompt.id, visibility=existing_prompt.visibility)

            # Check ownership if user_email provided
            if user_email:
                # First-Party
                from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

                permission_service = PermissionService(db)
                if not await permission_service.check_resource_ownership(user_email, prompt):
                    raise PermissionError("Only the owner can update this prompt")

            # Apply the rename. Gateway-federated prompts keep their original
            # name and only change custom_name; local prompts update both.
            if prompt_update.name is not None:
                if prompt.gateway_id:
                    prompt.custom_name = prompt_update.custom_name or prompt_update.name
                else:
                    prompt.original_name = prompt_update.name
                    if prompt_update.custom_name is None:
                        prompt.custom_name = prompt_update.name
            if prompt_update.custom_name is not None:
                prompt.custom_name = prompt_update.custom_name
            if prompt_update.display_name is not None:
                prompt.display_name = prompt_update.display_name
            if prompt_update.description is not None:
                prompt.description = prompt_update.description
            if prompt_update.template is not None:
                prompt.template = prompt_update.template
                self._validate_template(prompt.template)
                # Clear template cache to reduce memory growth
                _compile_jinja_template.cache_clear()
            if prompt_update.arguments is not None:
                # Rebuild the JSON argument schema: required args come from the
                # (possibly new) template, properties from the supplied list.
                required_args = self._get_required_arguments(prompt.template)
                argument_schema = {
                    "type": "object",
                    "properties": {},
                    "required": list(required_args),
                }
                for arg in prompt_update.arguments:
                    schema = {"type": "string"}
                    if arg.description is not None:
                        schema["description"] = arg.description
                    argument_schema["properties"][arg.name] = schema
                prompt.argument_schema = argument_schema

            if prompt_update.visibility is not None:
                prompt.visibility = prompt_update.visibility

            # Update tags if provided
            if prompt_update.tags is not None:
                prompt.tags = prompt_update.tags

            # Update metadata fields
            prompt.updated_at = datetime.now(timezone.utc)
            if modified_by:
                prompt.modified_by = modified_by
            if modified_from_ip:
                prompt.modified_from_ip = modified_from_ip
            if modified_via:
                prompt.modified_via = modified_via
            if modified_user_agent:
                prompt.modified_user_agent = modified_user_agent
            # Monotonically bump the version; start at 1 when unset.
            if hasattr(prompt, "version") and prompt.version is not None:
                prompt.version = prompt.version + 1
            else:
                prompt.version = 1

            db.commit()
            db.refresh(prompt)

            await self._notify_prompt_updated(prompt)

            # Structured logging: Audit trail for prompt update
            audit_trail.log_action(
                user_id=user_email or modified_by or "system",
                action="update_prompt",
                resource_type="prompt",
                resource_id=str(prompt.id),
                resource_name=prompt.name,
                user_email=user_email,
                team_id=prompt.team_id,
                client_ip=modified_from_ip,
                user_agent=modified_user_agent,
                new_values={"name": prompt.name, "version": prompt.version},
                context={"modified_via": modified_via},
                db=db,
            )

            structured_logger.log(
                level="INFO",
                message="Prompt updated successfully",
                event_type="prompt_updated",
                component="prompt_service",
                user_id=modified_by,
                user_email=user_email,
                team_id=prompt.team_id,
                resource_type="prompt",
                resource_id=str(prompt.id),
                custom_fields={"prompt_name": prompt.name, "version": prompt.version},
                db=db,
            )

            # Attach the resolved team display name for the read model.
            prompt.team = self._get_team_name(db, prompt.team_id)

            # Invalidate cache after successful update
            cache = _get_registry_cache()
            await cache.invalidate_prompts()
            # Also invalidate tags cache since prompt tags may have changed
            # First-Party
            from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

            await admin_stats_cache.invalidate_tags()

            return self.convert_prompt_to_read(prompt)

        except PermissionError as pe:
            db.rollback()

            structured_logger.log(
                level="WARNING",
                message="Prompt update failed due to permission error",
                event_type="prompt_update_permission_denied",
                component="prompt_service",
                user_email=user_email,
                resource_type="prompt",
                resource_id=str(prompt_id),
                error=pe,
                db=db,
            )
            raise
        except IntegrityError as ie:
            db.rollback()
            logger.error(f"IntegrityErrors in group: {ie}")

            structured_logger.log(
                level="ERROR",
                message="Prompt update failed due to database integrity error",
                event_type="prompt_update_failed",
                component="prompt_service",
                user_email=user_email,
                resource_type="prompt",
                resource_id=str(prompt_id),
                error=ie,
                db=db,
            )
            raise ie
        except PromptNotFoundError as e:
            db.rollback()
            logger.error(f"Prompt not found: {e}")

            structured_logger.log(
                level="ERROR",
                message="Prompt update failed - prompt not found",
                event_type="prompt_not_found",
                component="prompt_service",
                user_email=user_email,
                resource_type="prompt",
                resource_id=str(prompt_id),
                error=e,
                db=db,
            )
            raise e
        except PromptNameConflictError as pnce:
            db.rollback()
            logger.error(f"Prompt name conflict: {pnce}")

            structured_logger.log(
                level="WARNING",
                message="Prompt update failed due to name conflict",
                event_type="prompt_name_conflict",
                component="prompt_service",
                user_email=user_email,
                resource_type="prompt",
                resource_id=str(prompt_id),
                error=pnce,
                db=db,
            )
            raise pnce
        except Exception as e:
            db.rollback()

            structured_logger.log(
                level="ERROR",
                message="Prompt update failed",
                event_type="prompt_update_failed",
                component="prompt_service",
                user_email=user_email,
                resource_type="prompt",
                resource_id=str(prompt_id),
                error=e,
                db=db,
            )
            raise PromptError(f"Failed to update prompt: {str(e)}")
    async def set_prompt_state(self, db: Session, prompt_id: int, activate: bool, user_email: Optional[str] = None, skip_cache_invalidation: bool = False) -> PromptRead:
        """
        Set the activation status of a prompt.

        Uses a non-blocking row lock so concurrent state changes on the same
        prompt fail fast (surfaced as a lock conflict) instead of queueing.
        Notifications, audit, and cache invalidation only run when the state
        actually changes.

        Args:
            db: Database session
            prompt_id: Prompt ID
            activate: True to activate, False to deactivate
            user_email: Optional[str] The email of the user to check if the user has permission to modify.
            skip_cache_invalidation: If True, skip cache invalidation (used for batch operations).

        Returns:
            The updated PromptRead object

        Raises:
            PromptNotFoundError: If the prompt is not found.
            PromptLockConflictError: If the prompt is locked by another transaction.
            PromptError: For other errors.
            PermissionError: If user doesn't own the prompt.

        Examples:
            >>> from mcpgateway.services.prompt_service import PromptService
            >>> from unittest.mock import MagicMock
            >>> service = PromptService()
            >>> db = MagicMock()
            >>> prompt = MagicMock()
            >>> db.get.return_value = prompt
            >>> db.commit = MagicMock()
            >>> db.refresh = MagicMock()
            >>> service._notify_prompt_activated = MagicMock()
            >>> service._notify_prompt_deactivated = MagicMock()
            >>> service.convert_prompt_to_read = MagicMock(return_value={})
            >>> import asyncio
            >>> try:
            ...     asyncio.run(service.set_prompt_state(db, 1, True))
            ... except Exception:
            ...     pass
        """
        try:
            # Use nowait=True to fail fast if row is locked, preventing lock contention under high load
            try:
                prompt = get_for_update(db, DbPrompt, prompt_id, nowait=True)
            except OperationalError as lock_err:
                # Row is locked by another transaction - fail fast with 409
                db.rollback()
                raise PromptLockConflictError(f"Prompt {prompt_id} is currently being modified by another request") from lock_err
            if not prompt:
                raise PromptNotFoundError(f"Prompt not found: {prompt_id}")

            # Ownership check is optional: only enforced when a caller identity is supplied.
            if user_email:
                # First-Party
                from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

                permission_service = PermissionService(db)
                if not await permission_service.check_resource_ownership(user_email, prompt):
                    raise PermissionError("Only the owner can activate the Prompt" if activate else "Only the owner can deactivate the Prompt")

            # No-op when the prompt is already in the requested state.
            if prompt.enabled != activate:
                prompt.enabled = activate
                prompt.updated_at = datetime.now(timezone.utc)
                db.commit()
                db.refresh(prompt)

                # Invalidate cache after status change (skip for batch operations)
                if not skip_cache_invalidation:
                    cache = _get_registry_cache()
                    await cache.invalidate_prompts()

                if activate:
                    await self._notify_prompt_activated(prompt)
                else:
                    await self._notify_prompt_deactivated(prompt)
                logger.info(f"Prompt {prompt.name} {'activated' if activate else 'deactivated'}")

                # Structured logging: Audit trail for prompt state change
                audit_trail.log_action(
                    user_id=user_email or "system",
                    action="set_prompt_state",
                    resource_type="prompt",
                    resource_id=str(prompt.id),
                    resource_name=prompt.name,
                    user_email=user_email,
                    team_id=prompt.team_id,
                    new_values={"enabled": prompt.enabled},
                    context={"action": "activate" if activate else "deactivate"},
                    db=db,
                )

                structured_logger.log(
                    level="INFO",
                    message=f"Prompt {'activated' if activate else 'deactivated'} successfully",
                    event_type="prompt_state_changed",
                    component="prompt_service",
                    user_email=user_email,
                    team_id=prompt.team_id,
                    resource_type="prompt",
                    resource_id=str(prompt.id),
                    custom_fields={"prompt_name": prompt.name, "enabled": prompt.enabled},
                    db=db,
                )

            # Attach the resolved team display name for the read model.
            prompt.team = self._get_team_name(db, prompt.team_id)
            return self.convert_prompt_to_read(prompt)
        except PermissionError as e:
            structured_logger.log(
                level="WARNING",
                message="Prompt state change failed due to permission error",
                event_type="prompt_state_change_permission_denied",
                component="prompt_service",
                user_email=user_email,
                resource_type="prompt",
                resource_id=str(prompt_id),
                error=e,
                db=db,
            )
            raise e
        except PromptLockConflictError:
            # Re-raise lock conflicts without wrapping - allows 409 response
            raise
        except PromptNotFoundError:
            # Re-raise not found without wrapping - allows 404 response
            raise
        except Exception as e:
            db.rollback()

            structured_logger.log(
                level="ERROR",
                message="Prompt state change failed",
                event_type="prompt_state_change_failed",
                component="prompt_service",
                user_email=user_email,
                resource_type="prompt",
                resource_id=str(prompt_id),
                error=e,
                db=db,
            )
            raise PromptError(f"Failed to set prompt state: {str(e)}")
2140 # Get prompt details for admin ui
2142 async def get_prompt_details(self, db: Session, prompt_id: Union[int, str], include_inactive: bool = False) -> Dict[str, Any]: # pylint: disable=unused-argument
2143 """
2144 Get prompt details by ID.
2146 Args:
2147 db: Database session
2148 prompt_id: ID of prompt
2149 include_inactive: Whether to include inactive prompts
2151 Returns:
2152 Dictionary of prompt details
2154 Raises:
2155 PromptNotFoundError: If the prompt is not found
2157 Examples:
2158 >>> from mcpgateway.services.prompt_service import PromptService
2159 >>> from unittest.mock import MagicMock
2160 >>> service = PromptService()
2161 >>> db = MagicMock()
2162 >>> prompt_dict = {'id': '1', 'name': 'test', 'description': 'desc', 'template': 'tpl', 'arguments': [], 'createdAt': '2023-01-01T00:00:00', 'updatedAt': '2023-01-01T00:00:00', 'isActive': True, 'metrics': {}}
2163 >>> service.convert_prompt_to_read = MagicMock(return_value=prompt_dict)
2164 >>> db.execute.return_value.scalar_one_or_none.return_value = MagicMock()
2165 >>> import asyncio
2166 >>> result = asyncio.run(service.get_prompt_details(db, 'prompt_name'))
2167 >>> result == prompt_dict
2168 True
2169 """
2170 prompt = db.get(DbPrompt, prompt_id)
2171 if not prompt:
2172 raise PromptNotFoundError(f"Prompt not found: {prompt_id}")
2173 # Return the fully converted prompt including metrics
2174 prompt.team = self._get_team_name(db, prompt.team_id)
2175 prompt_data = self.convert_prompt_to_read(prompt)
2177 audit_trail.log_action(
2178 user_id="system",
2179 action="view_prompt_details",
2180 resource_type="prompt",
2181 resource_id=str(prompt.id),
2182 resource_name=prompt.name,
2183 team_id=prompt.team_id,
2184 context={"include_inactive": include_inactive},
2185 db=db,
2186 )
2188 structured_logger.log(
2189 level="INFO",
2190 message="Prompt details retrieved",
2191 event_type="prompt_details_viewed",
2192 component="prompt_service",
2193 resource_type="prompt",
2194 resource_id=str(prompt.id),
2195 team_id=prompt.team_id,
2196 custom_fields={
2197 "prompt_name": prompt.name,
2198 "include_inactive": include_inactive,
2199 },
2200 db=db,
2201 )
2203 return prompt_data
    async def delete_prompt(self, db: Session, prompt_id: Union[int, str], user_email: Optional[str] = None, purge_metrics: bool = False) -> None:
        """
        Delete a prompt template by its ID.

        Optionally purges raw and hourly-rollup metrics for the prompt before
        deleting the row itself; the whole operation is rolled back on error.

        Args:
            db (Session): Database session.
            prompt_id (str): ID of the prompt to delete.
            user_email (Optional[str]): Email of user performing delete (for ownership check).
            purge_metrics (bool): If True, delete raw + rollup metrics for this prompt.

        Raises:
            PromptNotFoundError: If the prompt is not found.
            PermissionError: If user doesn't own the prompt.
            PromptError: For other deletion errors.
            Exception: For unexpected errors.

        Examples:
            >>> from mcpgateway.services.prompt_service import PromptService
            >>> from unittest.mock import MagicMock
            >>> service = PromptService()
            >>> db = MagicMock()
            >>> prompt = MagicMock()
            >>> db.get.return_value = prompt
            >>> db.delete = MagicMock()
            >>> db.commit = MagicMock()
            >>> service._notify_prompt_deleted = MagicMock()
            >>> import asyncio
            >>> try:
            ...     asyncio.run(service.delete_prompt(db, '123'))
            ... except Exception:
            ...     pass
        """
        try:
            prompt = db.get(DbPrompt, prompt_id)
            if not prompt:
                raise PromptNotFoundError(f"Prompt not found: {prompt_id}")

            # Check ownership if user_email provided
            if user_email:
                # First-Party
                from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

                permission_service = PermissionService(db)
                if not await permission_service.check_resource_ownership(user_email, prompt):
                    raise PermissionError("Only the owner can delete this prompt")

            # Capture identifying fields before the row is deleted, since the
            # ORM object becomes unusable after db.delete() + commit.
            prompt_info = {"id": prompt.id, "name": prompt.name}
            prompt_name = prompt.name
            prompt_team_id = prompt.team_id

            if purge_metrics:
                # Pause rollup jobs while purging so they do not re-aggregate
                # rows that are in the middle of being deleted.
                with pause_rollup_during_purge(reason=f"purge_prompt:{prompt_id}"):
                    delete_metrics_in_batches(db, PromptMetric, PromptMetric.prompt_id, prompt_id)
                    delete_metrics_in_batches(db, PromptMetricsHourly, PromptMetricsHourly.prompt_id, prompt_id)

            db.delete(prompt)
            db.commit()
            await self._notify_prompt_deleted(prompt_info)
            logger.info(f"Deleted prompt: {prompt_info['name']}")

            # Structured logging: Audit trail for prompt deletion
            audit_trail.log_action(
                user_id=user_email or "system",
                action="delete_prompt",
                resource_type="prompt",
                resource_id=str(prompt_info["id"]),
                resource_name=prompt_name,
                user_email=user_email,
                team_id=prompt_team_id,
                old_values={"name": prompt_name},
                db=db,
            )

            # Structured logging: Log successful prompt deletion
            structured_logger.log(
                level="INFO",
                message="Prompt deleted successfully",
                event_type="prompt_deleted",
                component="prompt_service",
                user_email=user_email,
                team_id=prompt_team_id,
                resource_type="prompt",
                resource_id=str(prompt_info["id"]),
                custom_fields={
                    "prompt_name": prompt_name,
                    "purge_metrics": purge_metrics,
                },
                db=db,
            )

            # Invalidate cache after successful deletion
            cache = _get_registry_cache()
            await cache.invalidate_prompts()
            # Also invalidate tags cache since prompt tags may have changed
            # First-Party
            from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

            await admin_stats_cache.invalidate_tags()
        except PermissionError as pe:
            db.rollback()

            # Structured logging: Log permission error
            structured_logger.log(
                level="WARNING",
                message="Prompt deletion failed due to permission error",
                event_type="prompt_delete_permission_denied",
                component="prompt_service",
                user_email=user_email,
                resource_type="prompt",
                resource_id=str(prompt_id),
                error=pe,
                db=db,
            )
            raise
        except Exception as e:
            db.rollback()
            if isinstance(e, PromptNotFoundError):
                # Structured logging: Log not found error
                structured_logger.log(
                    level="ERROR",
                    message="Prompt deletion failed - prompt not found",
                    event_type="prompt_not_found",
                    component="prompt_service",
                    user_email=user_email,
                    resource_type="prompt",
                    resource_id=str(prompt_id),
                    error=e,
                    db=db,
                )
                raise e

            # Structured logging: Log generic prompt deletion failure
            structured_logger.log(
                level="ERROR",
                message="Prompt deletion failed",
                event_type="prompt_deletion_failed",
                component="prompt_service",
                user_email=user_email,
                resource_type="prompt",
                resource_id=str(prompt_id),
                error=e,
                db=db,
            )
            raise PromptError(f"Failed to delete prompt: {str(e)}")
2350 async def subscribe_events(self) -> AsyncGenerator[Dict[str, Any], None]:
2351 """Subscribe to Prompt events via the EventService.
2353 Yields:
2354 Prompt event messages.
2355 """
2356 async for event in self._event_service.subscribe_events():
2357 yield event
2359 def _validate_template(self, template: str) -> None:
2360 """Validate template syntax.
2362 Args:
2363 template: Template to validate
2365 Raises:
2366 PromptValidationError: If template is invalid
2368 Examples:
2369 >>> from mcpgateway.services.prompt_service import PromptService
2370 >>> service = PromptService()
2371 >>> service._validate_template("Hello {{ name }}") # Valid template
2372 >>> try:
2373 ... service._validate_template("Hello {{ invalid") # Invalid template
2374 ... except Exception as e:
2375 ... "Invalid template syntax" in str(e)
2376 True
2377 """
2378 try:
2379 self._jinja_env.parse(template)
2380 except Exception as e:
2381 raise PromptValidationError(f"Invalid template syntax: {str(e)}")
2383 def _get_required_arguments(self, template: str) -> Set[str]:
2384 """Extract required arguments from template.
2386 Args:
2387 template: Template to analyze
2389 Returns:
2390 Set of required argument names
2392 Examples:
2393 >>> from mcpgateway.services.prompt_service import PromptService
2394 >>> service = PromptService()
2395 >>> args = service._get_required_arguments("Hello {{ name }} from {{ place }}")
2396 >>> sorted(args)
2397 ['name', 'place']
2398 >>> service._get_required_arguments("No variables") == set()
2399 True
2400 """
2401 ast = self._jinja_env.parse(template)
2402 variables = meta.find_undeclared_variables(ast)
2403 formatter = Formatter()
2404 format_vars = {field_name for _, field_name, _, _ in formatter.parse(template) if field_name is not None}
2405 return variables.union(format_vars)
2407 def _render_template(self, template: str, arguments: Dict[str, str]) -> str:
2408 """Render template with arguments using cached compiled templates.
2410 Args:
2411 template: Template to render
2412 arguments: Arguments for rendering
2414 Returns:
2415 Rendered template text
2417 Raises:
2418 PromptError: If rendering fails
2420 Examples:
2421 >>> from mcpgateway.services.prompt_service import PromptService
2422 >>> service = PromptService()
2423 >>> result = service._render_template("Hello {{ name }}", {"name": "World"})
2424 >>> result
2425 'Hello World'
2426 >>> service._render_template("No variables", {})
2427 'No variables'
2428 """
2429 try:
2430 jinja_template = _compile_jinja_template(template)
2431 return jinja_template.render(**arguments)
2432 except Exception:
2433 try:
2434 return template.format(**arguments)
2435 except Exception as e:
2436 raise PromptError(f"Failed to render template: {str(e)}")
2438 def _parse_messages(self, text: str) -> List[Message]:
2439 """Parse rendered text into messages.
2441 Args:
2442 text: Text to parse
2444 Returns:
2445 List of parsed messages
2447 Examples:
2448 >>> from mcpgateway.services.prompt_service import PromptService
2449 >>> service = PromptService()
2450 >>> messages = service._parse_messages("Simple text")
2451 >>> len(messages)
2452 1
2453 >>> messages[0].role.value
2454 'user'
2455 >>> messages = service._parse_messages("# User:\\nHello\\n# Assistant:\\nHi there")
2456 >>> len(messages)
2457 2
2458 """
2459 messages = []
2460 current_role = Role.USER
2461 current_text = []
2462 for line in text.split("\n"):
2463 if line.startswith("# Assistant:"):
2464 if current_text:
2465 messages.append(
2466 Message(
2467 role=current_role,
2468 content=TextContent(type="text", text="\n".join(current_text).strip()),
2469 )
2470 )
2471 current_role = Role.ASSISTANT
2472 current_text = []
2473 elif line.startswith("# User:"):
2474 if current_text:
2475 messages.append(
2476 Message(
2477 role=current_role,
2478 content=TextContent(type="text", text="\n".join(current_text).strip()),
2479 )
2480 )
2481 current_role = Role.USER
2482 current_text = []
2483 else:
2484 current_text.append(line)
2485 if current_text:
2486 messages.append(
2487 Message(
2488 role=current_role,
2489 content=TextContent(type="text", text="\n".join(current_text).strip()),
2490 )
2491 )
2492 return messages
2494 async def _notify_prompt_added(self, prompt: DbPrompt) -> None:
2495 """
2496 Notify subscribers of prompt addition.
2498 Args:
2499 prompt: Prompt to add
2500 """
2501 event = {
2502 "type": "prompt_added",
2503 "data": {
2504 "id": prompt.id,
2505 "name": prompt.name,
2506 "description": prompt.description,
2507 "enabled": prompt.enabled,
2508 },
2509 "timestamp": datetime.now(timezone.utc).isoformat(),
2510 }
2511 await self._publish_event(event)
2513 async def _notify_prompt_updated(self, prompt: DbPrompt) -> None:
2514 """
2515 Notify subscribers of prompt update.
2517 Args:
2518 prompt: Prompt to update
2519 """
2520 event = {
2521 "type": "prompt_updated",
2522 "data": {
2523 "id": prompt.id,
2524 "name": prompt.name,
2525 "description": prompt.description,
2526 "enabled": prompt.enabled,
2527 },
2528 "timestamp": datetime.now(timezone.utc).isoformat(),
2529 }
2530 await self._publish_event(event)
2532 async def _notify_prompt_activated(self, prompt: DbPrompt) -> None:
2533 """
2534 Notify subscribers of prompt activation.
2536 Args:
2537 prompt: Prompt to activate
2538 """
2539 event = {
2540 "type": "prompt_activated",
2541 "data": {"id": prompt.id, "name": prompt.name, "enabled": True},
2542 "timestamp": datetime.now(timezone.utc).isoformat(),
2543 }
2544 await self._publish_event(event)
2546 async def _notify_prompt_deactivated(self, prompt: DbPrompt) -> None:
2547 """
2548 Notify subscribers of prompt deactivation.
2550 Args:
2551 prompt: Prompt to deactivate
2552 """
2553 event = {
2554 "type": "prompt_deactivated",
2555 "data": {"id": prompt.id, "name": prompt.name, "enabled": False},
2556 "timestamp": datetime.now(timezone.utc).isoformat(),
2557 }
2558 await self._publish_event(event)
2560 async def _notify_prompt_deleted(self, prompt_info: Dict[str, Any]) -> None:
2561 """
2562 Notify subscribers of prompt deletion.
2564 Args:
2565 prompt_info: Dict on prompt to notify as deleted
2566 """
2567 event = {
2568 "type": "prompt_deleted",
2569 "data": prompt_info,
2570 "timestamp": datetime.now(timezone.utc).isoformat(),
2571 }
2572 await self._publish_event(event)
2574 async def _notify_prompt_removed(self, prompt: DbPrompt) -> None:
2575 """
2576 Notify subscribers of prompt removal (deactivation).
2578 Args:
2579 prompt: Prompt to remove
2580 """
2581 event = {
2582 "type": "prompt_removed",
2583 "data": {"id": prompt.id, "name": prompt.name, "enabled": False},
2584 "timestamp": datetime.now(timezone.utc).isoformat(),
2585 }
2586 await self._publish_event(event)
2588 async def _publish_event(self, event: Dict[str, Any]) -> None:
2589 """
2590 Publish event to all subscribers via the EventService.
2592 Args:
2593 event: Event to publish
2594 """
2595 await self._event_service.publish_event(event)
2597 # --- Metrics ---
2598 async def aggregate_metrics(self, db: Session) -> Dict[str, Any]:
2599 """
2600 Aggregate metrics for all prompt invocations across all prompts.
2602 Combines recent raw metrics (within retention period) with historical
2603 hourly rollups for complete historical coverage. Uses in-memory caching
2604 (10s TTL) to reduce database load under high request rates.
2606 Args:
2607 db: Database session
2609 Returns:
2610 Dict[str, Any]: Aggregated prompt metrics from raw + hourly rollups.
2612 Examples:
2613 >>> from mcpgateway.services.prompt_service import PromptService
2614 >>> service = PromptService()
2615 >>> # Method exists and is callable
2616 >>> callable(service.aggregate_metrics)
2617 True
2618 """
2619 # Check cache first (if enabled)
2620 # First-Party
2621 from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache # pylint: disable=import-outside-toplevel
2623 if is_cache_enabled():
2624 cached = metrics_cache.get("prompts")
2625 if cached is not None:
2626 return cached
2628 # Use combined raw + rollup query for full historical coverage
2629 # First-Party
2630 from mcpgateway.services.metrics_query_service import aggregate_metrics_combined # pylint: disable=import-outside-toplevel
2632 result = aggregate_metrics_combined(db, "prompt")
2633 metrics = result.to_dict()
2635 # Cache the result (if enabled)
2636 if is_cache_enabled():
2637 metrics_cache.set("prompts", metrics)
2639 return metrics
2641 async def reset_metrics(self, db: Session) -> None:
2642 """
2643 Reset all prompt metrics by deleting raw and hourly rollup records.
2645 Args:
2646 db: Database session
2648 Examples:
2649 >>> from mcpgateway.services.prompt_service import PromptService
2650 >>> from unittest.mock import MagicMock
2651 >>> service = PromptService()
2652 >>> db = MagicMock()
2653 >>> db.execute = MagicMock()
2654 >>> db.commit = MagicMock()
2655 >>> import asyncio
2656 >>> asyncio.run(service.reset_metrics(db))
2657 """
2659 db.execute(delete(PromptMetric))
2660 db.execute(delete(PromptMetricsHourly))
2661 db.commit()
2663 # Invalidate metrics cache
2664 # First-Party
2665 from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel
2667 metrics_cache.invalidate("prompts")
2668 metrics_cache.invalidate_prefix("top_prompts:")
# Lazy singleton - created on first access, not at module import time.
# This avoids instantiation when only exception classes are imported.
_prompt_service_instance = None  # pylint: disable=invalid-name


def __getattr__(name: str):
    """Module-level __getattr__ for lazy singleton creation.

    Args:
        name: The attribute name being accessed.

    Returns:
        The prompt_service singleton instance if name is "prompt_service".

    Raises:
        AttributeError: If the attribute name is not "prompt_service".
    """
    global _prompt_service_instance  # pylint: disable=global-statement
    # Guard clause: anything other than the singleton name is a normal miss.
    if name != "prompt_service":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
    if _prompt_service_instance is None:
        _prompt_service_instance = PromptService()
    return _prompt_service_instance