Coverage for mcpgateway / services / prompt_service.py: 99%
841 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/prompt_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Mihai Criveti
7Prompt Service Implementation.
8This module implements prompt template management according to the MCP specification.
9It handles:
10- Prompt template registration and retrieval
11- Prompt argument validation
12- Template rendering with arguments
13- Resource embedding in prompts
14- Active/inactive prompt management
15"""
17# Standard
18import binascii
19from datetime import datetime, timezone
20from functools import lru_cache
21from string import Formatter
22import time
23from typing import Any, AsyncGenerator, Dict, List, Optional, Set, Union
24import uuid
26# Third-Party
27from jinja2 import Environment, meta, select_autoescape, Template
28from pydantic import ValidationError
29from sqlalchemy import and_, delete, desc, not_, or_, select
30from sqlalchemy.exc import IntegrityError, OperationalError
31from sqlalchemy.orm import joinedload, Session
33# First-Party
34from mcpgateway.common.models import Message, PromptResult, Role, TextContent
35from mcpgateway.config import settings
36from mcpgateway.db import EmailTeam
37from mcpgateway.db import Gateway as DbGateway
38from mcpgateway.db import get_for_update
39from mcpgateway.db import Prompt as DbPrompt
40from mcpgateway.db import PromptMetric, PromptMetricsHourly, server_prompt_association
41from mcpgateway.observability import create_span
42from mcpgateway.plugins.framework import get_plugin_manager, GlobalContext, PluginContextTable, PluginManager, PromptHookType, PromptPosthookPayload, PromptPrehookPayload
43from mcpgateway.schemas import PromptCreate, PromptMetrics, PromptRead, PromptUpdate, TopPerformer
44from mcpgateway.services.audit_trail_service import get_audit_trail_service
45from mcpgateway.services.base_service import BaseService
46from mcpgateway.services.event_service import EventService
47from mcpgateway.services.logging_service import LoggingService
48from mcpgateway.services.metrics_buffer_service import get_metrics_buffer_service
49from mcpgateway.services.metrics_cleanup_service import delete_metrics_in_batches, pause_rollup_during_purge
50from mcpgateway.services.observability_service import current_trace_id, ObservabilityService
51from mcpgateway.services.structured_logger import get_structured_logger
52from mcpgateway.services.team_management_service import TeamManagementService
53from mcpgateway.utils.create_slug import slugify
54from mcpgateway.utils.metrics_common import build_top_performers
55from mcpgateway.utils.pagination import unified_paginate
56from mcpgateway.utils.sqlalchemy_modifier import json_contains_tag_expr
# Registry cache singleton; imported lazily in _get_registry_cache() to avoid
# a circular dependency with mcpgateway.cache.registry_cache.
_REGISTRY_CACHE = None

# Module-level Jinja environment singleton so compiled templates are cached
# process-wide (see _get_jinja_env and _compile_jinja_template below).
_JINJA_ENV: Optional[Environment] = None
def _get_jinja_env() -> Environment:
    """Return the shared Jinja2 environment, creating it on first use.

    The environment enables autoescaping for HTML/XML plus block trimming and
    lstripping, and is held at module level so compiled templates are reused
    across all PromptService instances.

    Returns:
        Jinja2 Environment with autoescape and trim settings.
    """
    global _JINJA_ENV  # pylint: disable=global-statement
    if _JINJA_ENV is not None:
        return _JINJA_ENV
    _JINJA_ENV = Environment(
        autoescape=select_autoescape(["html", "xml"]),
        trim_blocks=True,
        lstrip_blocks=True,
    )
    return _JINJA_ENV
@lru_cache(maxsize=256)
def _compile_jinja_template(template: str) -> Template:
    """Compile *template* once and memoize the result by template string.

    Args:
        template: Raw Jinja2 template source.

    Returns:
        The compiled Template, served from the LRU cache on repeat calls
        with the same source string.
    """
    env = _get_jinja_env()
    return env.from_string(template)
def _get_registry_cache():
    """Return the registry cache singleton, importing it on first access.

    The import lives inside the function body to break a circular dependency
    between this module and mcpgateway.cache.registry_cache.

    Returns:
        RegistryCache instance.
    """
    global _REGISTRY_CACHE  # pylint: disable=global-statement
    if _REGISTRY_CACHE is not None:
        return _REGISTRY_CACHE
    # First-Party
    from mcpgateway.cache.registry_cache import registry_cache  # pylint: disable=import-outside-toplevel

    _REGISTRY_CACHE = registry_cache
    return _REGISTRY_CACHE
# Initialize the logging service first so everything below can obtain loggers
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

# Module-level singletons for structured logging, audit trails, and buffered
# metrics, shared by the prompt CRUD/render operations in PromptService.
structured_logger = get_structured_logger("prompt_service")
audit_trail = get_audit_trail_service()
metrics_buffer = get_metrics_buffer_service()
class PromptError(Exception):
    """Base class for all prompt-related errors raised by this service."""
class PromptNotFoundError(PromptError):
    """Raised when a requested prompt does not exist or is not accessible."""
class PromptNameConflictError(PromptError):
    """Raised when a prompt name conflicts with existing (active or inactive) prompt."""

    def __init__(self, name: str, enabled: bool = True, prompt_id: Optional[int] = None, visibility: str = "public") -> None:
        """Record the conflicting prompt's details and build the error message.

        Args:
            name: The conflicting prompt name
            enabled: Whether the existing prompt is enabled
            prompt_id: ID of the existing prompt if available
            visibility: Prompt visibility level (private, team, public).

        Examples:
            >>> from mcpgateway.services.prompt_service import PromptNameConflictError
            >>> error = PromptNameConflictError("test_prompt")
            >>> error.name
            'test_prompt'
            >>> error.enabled
            True
            >>> error.prompt_id is None
            True
            >>> error = PromptNameConflictError("inactive_prompt", False, 123)
            >>> error.enabled
            False
            >>> error.prompt_id
            123
        """
        self.name = name
        self.enabled = enabled
        self.prompt_id = prompt_id
        # Inactive conflicts get an extra hint so operators can re-enable
        # rather than create a duplicate.
        suffix = "" if enabled else f" (currently inactive, ID: {prompt_id})"
        super().__init__(f"{visibility.capitalize()} Prompt already exists with name: {name}" + suffix)
class PromptValidationError(PromptError):
    """Raised when prompt validation fails (e.g. bad template or arguments)."""
class PromptLockConflictError(PromptError):
    """Raised when a prompt row is locked by another transaction.

    Signals that a modification was attempted on a prompt whose database row
    is currently locked by another concurrent request; callers should retry
    or surface the conflict to the client.
    """
176class PromptService(BaseService):
177 """Service for managing prompt templates.
179 Handles:
180 - Template registration and retrieval
181 - Argument validation
182 - Template rendering
183 - Resource embedding
184 - Active/inactive status management
185 """
187 _visibility_model_cls = DbPrompt
189 def __init__(self) -> None:
190 """
191 Initialize the prompt service.
193 Sets up the Jinja2 environment for rendering prompt templates.
194 Although these templates are rendered as JSON for the API, if the output is ever
195 embedded into an HTML page, unescaped content could be exploited for cross-site scripting (XSS) attacks.
196 Enabling autoescaping for 'html' and 'xml' templates via select_autoescape helps mitigate this risk.
198 Examples:
199 >>> from mcpgateway.services.prompt_service import PromptService
200 >>> service = PromptService()
201 >>> isinstance(service._event_service, EventService)
202 True
203 >>> service._jinja_env is not None
204 True
205 """
206 self._event_service = EventService(channel_name="mcpgateway:prompt_events")
207 # Use the module-level singleton for template caching
208 self._jinja_env = _get_jinja_env()
209 self._plugin_manager: PluginManager | None = get_plugin_manager()
    async def initialize(self) -> None:
        """Initialize the service by bringing up the prompt event stream."""
        logger.info("Initializing prompt service")
        await self._event_service.initialize()
    async def shutdown(self) -> None:
        """Shutdown the service and close the prompt event stream.

        Examples:
            >>> from mcpgateway.services.prompt_service import PromptService
            >>> from unittest.mock import AsyncMock
            >>> import asyncio
            >>> service = PromptService()
            >>> service._event_service = AsyncMock()
            >>> asyncio.run(service.shutdown())
            >>> # Verify event service shutdown was called
            >>> service._event_service.shutdown.assert_awaited_once()
        """
        await self._event_service.shutdown()
        logger.info("Prompt service shutdown complete")
232 async def get_top_prompts(self, db: Session, limit: Optional[int] = 5, include_deleted: bool = False) -> List[TopPerformer]:
233 """Retrieve the top-performing prompts based on execution count.
235 Queries the database to get prompts with their metrics, ordered by the number of executions
236 in descending order. Combines recent raw metrics with historical hourly rollups for complete
237 historical coverage. Returns a list of TopPerformer objects containing prompt details and
238 performance metrics. Results are cached for performance.
240 Args:
241 db (Session): Database session for querying prompt metrics.
242 limit (Optional[int]): Maximum number of prompts to return. Defaults to 5.
243 include_deleted (bool): Whether to include deleted prompts from rollups.
245 Returns:
246 List[TopPerformer]: A list of TopPerformer objects, each containing:
247 - id: Prompt ID.
248 - name: Prompt name.
249 - execution_count: Total number of executions.
250 - avg_response_time: Average response time in seconds, or None if no metrics.
251 - success_rate: Success rate percentage, or None if no metrics.
252 - last_execution: Timestamp of the last execution, or None if no metrics.
253 """
254 # Check cache first (if enabled)
255 # First-Party
256 from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache # pylint: disable=import-outside-toplevel
258 effective_limit = limit or 5
259 cache_key = f"top_prompts:{effective_limit}:include_deleted={include_deleted}"
261 if is_cache_enabled():
262 cached = metrics_cache.get(cache_key)
263 if cached is not None:
264 return cached
266 # Use combined query that includes both raw metrics and rollup data
267 # First-Party
268 from mcpgateway.services.metrics_query_service import get_top_performers_combined # pylint: disable=import-outside-toplevel
270 results = get_top_performers_combined(
271 db=db,
272 metric_type="prompt",
273 entity_model=DbPrompt,
274 limit=effective_limit,
275 include_deleted=include_deleted,
276 )
277 top_performers = build_top_performers(results)
279 # Cache the result (if enabled)
280 if is_cache_enabled():
281 metrics_cache.set(cache_key, top_performers)
283 return top_performers
285 def convert_prompt_to_read(self, db_prompt: DbPrompt, include_metrics: bool = False) -> PromptRead:
286 """
287 Convert a DbPrompt instance to a PromptRead Pydantic model,
288 optionally including aggregated metrics computed from the associated PromptMetric records.
290 Args:
291 db_prompt: Db prompt to convert
292 include_metrics: Whether to include metrics in the result. Defaults to False.
293 Set to False for list operations to avoid N+1 query issues.
295 Returns:
296 PromptRead: Pydantic model instance
297 """
298 arg_schema = db_prompt.argument_schema or {}
299 properties = arg_schema.get("properties", {})
300 required_list = arg_schema.get("required", [])
301 arguments_list = []
302 for arg_name, prop in properties.items():
303 arguments_list.append(
304 {
305 "name": arg_name,
306 "description": prop.get("description") or "",
307 "required": arg_name in required_list,
308 }
309 )
311 # Compute aggregated metrics only if requested (avoids N+1 queries in list operations)
312 if include_metrics:
313 total = len(db_prompt.metrics) if hasattr(db_prompt, "metrics") and db_prompt.metrics is not None else 0
314 successful = sum(1 for m in db_prompt.metrics if m.is_success) if total > 0 else 0
315 failed = sum(1 for m in db_prompt.metrics if not m.is_success) if total > 0 else 0
316 failure_rate = failed / total if total > 0 else 0.0
317 min_rt = min((m.response_time for m in db_prompt.metrics), default=None) if total > 0 else None
318 max_rt = max((m.response_time for m in db_prompt.metrics), default=None) if total > 0 else None
319 avg_rt = (sum(m.response_time for m in db_prompt.metrics) / total) if total > 0 else None
320 last_time = max((m.timestamp for m in db_prompt.metrics), default=None) if total > 0 else None
322 metrics_dict = {
323 "totalExecutions": total,
324 "successfulExecutions": successful,
325 "failedExecutions": failed,
326 "failureRate": failure_rate,
327 "minResponseTime": min_rt,
328 "maxResponseTime": max_rt,
329 "avgResponseTime": avg_rt,
330 "lastExecutionTime": last_time,
331 }
332 else:
333 metrics_dict = None
335 original_name = getattr(db_prompt, "original_name", None) or db_prompt.name
336 custom_name = getattr(db_prompt, "custom_name", None) or original_name
337 custom_name_slug = getattr(db_prompt, "custom_name_slug", None) or slugify(custom_name)
338 display_name = getattr(db_prompt, "display_name", None) or custom_name
340 prompt_dict = {
341 "id": db_prompt.id,
342 "name": db_prompt.name,
343 "original_name": original_name,
344 "custom_name": custom_name,
345 "custom_name_slug": custom_name_slug,
346 "display_name": display_name,
347 "gateway_slug": getattr(db_prompt, "gateway_slug", None),
348 "description": db_prompt.description,
349 "template": db_prompt.template,
350 "arguments": arguments_list,
351 "created_at": db_prompt.created_at,
352 "updated_at": db_prompt.updated_at,
353 "enabled": db_prompt.enabled,
354 "metrics": metrics_dict,
355 "tags": db_prompt.tags or [],
356 "visibility": db_prompt.visibility,
357 "team": getattr(db_prompt, "team", None),
358 # Include metadata fields for proper API response
359 "created_by": getattr(db_prompt, "created_by", None),
360 "modified_by": getattr(db_prompt, "modified_by", None),
361 "created_from_ip": getattr(db_prompt, "created_from_ip", None),
362 "created_via": getattr(db_prompt, "created_via", None),
363 "created_user_agent": getattr(db_prompt, "created_user_agent", None),
364 "modified_from_ip": getattr(db_prompt, "modified_from_ip", None),
365 "modified_via": getattr(db_prompt, "modified_via", None),
366 "modified_user_agent": getattr(db_prompt, "modified_user_agent", None),
367 "version": getattr(db_prompt, "version", None),
368 "team_id": getattr(db_prompt, "team_id", None),
369 "owner_email": getattr(db_prompt, "owner_email", None),
370 }
371 return PromptRead.model_validate(prompt_dict)
373 def _get_team_name(self, db: Session, team_id: Optional[str]) -> Optional[str]:
374 """Retrieve the team name given a team ID.
376 Args:
377 db (Session): Database session for querying teams.
378 team_id (Optional[str]): The ID of the team.
380 Returns:
381 Optional[str]: The name of the team if found, otherwise None.
382 """
383 if not team_id:
384 return None
385 team = db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True)).first()
386 db.commit() # Release transaction to avoid idle-in-transaction
387 return team.name if team else None
389 def _compute_prompt_name(self, custom_name: str, gateway: Optional[Any] = None) -> str:
390 """Compute the stored prompt name from custom_name and gateway context.
392 Args:
393 custom_name: Prompt name to slugify and store.
394 gateway: Optional gateway for namespacing.
396 Returns:
397 The stored prompt name with gateway prefix when applicable.
398 """
399 name_slug = slugify(custom_name)
400 if gateway:
401 gateway_slug = slugify(gateway.name)
402 return f"{gateway_slug}{settings.gateway_tool_name_separator}{name_slug}"
403 return name_slug
405 async def register_prompt(
406 self,
407 db: Session,
408 prompt: PromptCreate,
409 created_by: Optional[str] = None,
410 created_from_ip: Optional[str] = None,
411 created_via: Optional[str] = None,
412 created_user_agent: Optional[str] = None,
413 import_batch_id: Optional[str] = None,
414 federation_source: Optional[str] = None,
415 team_id: Optional[str] = None,
416 owner_email: Optional[str] = None,
417 visibility: Optional[str] = "public",
418 ) -> PromptRead:
419 """Register a new prompt template.
421 Args:
422 db: Database session
423 prompt: Prompt creation schema
424 created_by: Username who created this prompt
425 created_from_ip: IP address of creator
426 created_via: Creation method (ui, api, import, federation)
427 created_user_agent: User agent of creation request
428 import_batch_id: UUID for bulk import operations
429 federation_source: Source gateway for federated prompts
430 team_id (Optional[str]): Team ID to assign the prompt to.
431 owner_email (Optional[str]): Email of the user who owns this prompt.
432 visibility (str): Prompt visibility level (private, team, public).
434 Returns:
435 Created prompt information
437 Raises:
438 IntegrityError: If a database integrity error occurs.
439 PromptNameConflictError: If a prompt with the same name already exists.
440 PromptError: For other prompt registration errors
442 Examples:
443 >>> from mcpgateway.services.prompt_service import PromptService
444 >>> from unittest.mock import MagicMock
445 >>> service = PromptService()
446 >>> db = MagicMock()
447 >>> prompt = MagicMock()
448 >>> db.execute.return_value.scalar_one_or_none.return_value = None
449 >>> db.add = MagicMock()
450 >>> db.commit = MagicMock()
451 >>> db.refresh = MagicMock()
452 >>> service._notify_prompt_added = MagicMock()
453 >>> service.convert_prompt_to_read = MagicMock(return_value={})
454 >>> import asyncio
455 >>> try:
456 ... asyncio.run(service.register_prompt(db, prompt))
457 ... except Exception:
458 ... pass
459 """
460 try:
461 # Validate template syntax
462 self._validate_template(prompt.template)
464 # Extract required arguments from template
465 required_args = self._get_required_arguments(prompt.template)
467 # Create argument schema
468 argument_schema = {
469 "type": "object",
470 "properties": {},
471 "required": list(required_args),
472 }
473 for arg in prompt.arguments:
474 schema = {"type": "string"}
475 if arg.description is not None:
476 schema["description"] = arg.description
477 argument_schema["properties"][arg.name] = schema
479 custom_name = prompt.custom_name or prompt.name
480 display_name = prompt.display_name or custom_name
482 # Extract gateway_id from prompt if present and look up gateway for namespacing
483 gateway_id = getattr(prompt, "gateway_id", None)
484 gateway = None
485 if gateway_id:
486 gateway = db.execute(select(DbGateway).where(DbGateway.id == gateway_id)).scalar_one_or_none()
488 computed_name = self._compute_prompt_name(custom_name, gateway=gateway)
490 # Create DB model
491 db_prompt = DbPrompt(
492 name=computed_name,
493 original_name=prompt.name,
494 custom_name=custom_name,
495 display_name=display_name,
496 description=prompt.description,
497 template=prompt.template,
498 argument_schema=argument_schema,
499 tags=prompt.tags,
500 # Metadata fields
501 created_by=created_by,
502 created_from_ip=created_from_ip,
503 created_via=created_via,
504 created_user_agent=created_user_agent,
505 import_batch_id=import_batch_id,
506 federation_source=federation_source,
507 version=1,
508 # Team scoping fields - use schema values if provided, otherwise fallback to parameters
509 team_id=getattr(prompt, "team_id", None) or team_id,
510 owner_email=getattr(prompt, "owner_email", None) or owner_email or created_by,
511 visibility=getattr(prompt, "visibility", None) or visibility,
512 gateway_id=gateway_id,
513 )
514 # Check for existing server with the same name
515 if visibility.lower() == "public":
516 # Check for existing public prompt with the same name and gateway_id
517 existing_prompt = db.execute(select(DbPrompt).where(DbPrompt.name == computed_name, DbPrompt.visibility == "public", DbPrompt.gateway_id == gateway_id)).scalar_one_or_none()
518 if existing_prompt:
519 raise PromptNameConflictError(computed_name, enabled=existing_prompt.enabled, prompt_id=existing_prompt.id, visibility=existing_prompt.visibility)
520 elif visibility.lower() == "team":
521 # Check for existing team prompt with the same name and gateway_id
522 existing_prompt = db.execute(
523 select(DbPrompt).where(DbPrompt.name == computed_name, DbPrompt.visibility == "team", DbPrompt.team_id == team_id, DbPrompt.gateway_id == gateway_id)
524 ).scalar_one_or_none()
525 if existing_prompt:
526 raise PromptNameConflictError(computed_name, enabled=existing_prompt.enabled, prompt_id=existing_prompt.id, visibility=existing_prompt.visibility)
528 # Set gateway relationship to help the before_insert event handler compute the name correctly
529 if gateway:
530 db_prompt.gateway = gateway
531 db_prompt.gateway_name_cache = gateway.name # type: ignore[attr-defined]
533 # Add to DB
534 db.add(db_prompt)
535 db.commit()
536 db.refresh(db_prompt)
537 # Notify subscribers
538 await self._notify_prompt_added(db_prompt)
540 logger.info(f"Registered prompt: {prompt.name}")
542 # Structured logging: Audit trail for prompt creation
543 audit_trail.log_action(
544 user_id=created_by or "system",
545 action="create_prompt",
546 resource_type="prompt",
547 resource_id=str(db_prompt.id),
548 resource_name=db_prompt.name,
549 user_email=owner_email,
550 team_id=team_id,
551 client_ip=created_from_ip,
552 user_agent=created_user_agent,
553 new_values={
554 "name": db_prompt.name,
555 "visibility": visibility,
556 },
557 context={
558 "created_via": created_via,
559 "import_batch_id": import_batch_id,
560 "federation_source": federation_source,
561 },
562 db=db,
563 )
565 # Structured logging: Log successful prompt creation
566 structured_logger.log(
567 level="INFO",
568 message="Prompt created successfully",
569 event_type="prompt_created",
570 component="prompt_service",
571 user_id=created_by,
572 user_email=owner_email,
573 team_id=team_id,
574 resource_type="prompt",
575 resource_id=str(db_prompt.id),
576 custom_fields={
577 "prompt_name": db_prompt.name,
578 "visibility": visibility,
579 },
580 )
582 db_prompt.team = self._get_team_name(db, db_prompt.team_id)
583 prompt_dict = self.convert_prompt_to_read(db_prompt)
585 # Invalidate cache after successful creation
586 cache = _get_registry_cache()
587 await cache.invalidate_prompts()
588 # Also invalidate tags cache since prompt tags may have changed
589 # First-Party
590 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel
592 await admin_stats_cache.invalidate_tags()
593 # First-Party
594 from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel
596 metrics_cache.invalidate_prefix("top_prompts:")
597 metrics_cache.invalidate("prompts")
599 return PromptRead.model_validate(prompt_dict)
601 except IntegrityError as ie:
602 logger.error(f"IntegrityErrors in group: {ie}")
604 structured_logger.log(
605 level="ERROR",
606 message="Prompt creation failed due to database integrity error",
607 event_type="prompt_creation_failed",
608 component="prompt_service",
609 user_id=created_by,
610 user_email=owner_email,
611 error=ie,
612 custom_fields={"prompt_name": prompt.name},
613 )
614 raise ie
615 except PromptNameConflictError as se:
616 db.rollback()
618 structured_logger.log(
619 level="WARNING",
620 message="Prompt creation failed due to name conflict",
621 event_type="prompt_name_conflict",
622 component="prompt_service",
623 user_id=created_by,
624 user_email=owner_email,
625 custom_fields={"prompt_name": prompt.name, "visibility": visibility},
626 )
627 raise se
628 except Exception as e:
629 db.rollback()
631 structured_logger.log(
632 level="ERROR",
633 message="Prompt creation failed",
634 event_type="prompt_creation_failed",
635 component="prompt_service",
636 user_id=created_by,
637 user_email=owner_email,
638 error=e,
639 custom_fields={"prompt_name": prompt.name},
640 )
641 raise PromptError(f"Failed to register prompt: {str(e)}")
643 async def register_prompts_bulk(
644 self,
645 db: Session,
646 prompts: List[PromptCreate],
647 created_by: Optional[str] = None,
648 created_from_ip: Optional[str] = None,
649 created_via: Optional[str] = None,
650 created_user_agent: Optional[str] = None,
651 import_batch_id: Optional[str] = None,
652 federation_source: Optional[str] = None,
653 team_id: Optional[str] = None,
654 owner_email: Optional[str] = None,
655 visibility: Optional[str] = "public",
656 conflict_strategy: str = "skip",
657 ) -> Dict[str, Any]:
658 """Register multiple prompts in bulk with a single commit.
660 This method provides significant performance improvements over individual
661 prompt registration by:
662 - Using db.add_all() instead of individual db.add() calls
663 - Performing a single commit for all prompts
664 - Batch conflict detection
665 - Chunking for very large imports (>500 items)
667 Args:
668 db: Database session
669 prompts: List of prompt creation schemas
670 created_by: Username who created these prompts
671 created_from_ip: IP address of creator
672 created_via: Creation method (ui, api, import, federation)
673 created_user_agent: User agent of creation request
674 import_batch_id: UUID for bulk import operations
675 federation_source: Source gateway for federated prompts
676 team_id: Team ID to assign the prompts to
677 owner_email: Email of the user who owns these prompts
678 visibility: Prompt visibility level (private, team, public)
679 conflict_strategy: How to handle conflicts (skip, update, rename, fail)
681 Returns:
682 Dict with statistics:
683 - created: Number of prompts created
684 - updated: Number of prompts updated
685 - skipped: Number of prompts skipped
686 - failed: Number of prompts that failed
687 - errors: List of error messages
689 Raises:
690 PromptError: If bulk registration fails critically
692 Examples:
693 >>> from mcpgateway.services.prompt_service import PromptService
694 >>> from unittest.mock import MagicMock
695 >>> service = PromptService()
696 >>> db = MagicMock()
697 >>> prompts = [MagicMock(), MagicMock()]
698 >>> import asyncio
699 >>> try:
700 ... result = asyncio.run(service.register_prompts_bulk(db, prompts))
701 ... except Exception:
702 ... pass
703 """
704 if not prompts:
705 return {"created": 0, "updated": 0, "skipped": 0, "failed": 0, "errors": []}
707 stats = {"created": 0, "updated": 0, "skipped": 0, "failed": 0, "errors": []}
709 # Process in chunks to avoid memory issues and SQLite parameter limits
710 chunk_size = 500
712 for chunk_start in range(0, len(prompts), chunk_size):
713 chunk = prompts[chunk_start : chunk_start + chunk_size]
715 try:
716 # Collect unique gateway_ids and look them up
717 gateway_ids = set()
718 for prompt in chunk:
719 gw_id = getattr(prompt, "gateway_id", None)
720 if gw_id:
721 gateway_ids.add(gw_id)
723 gateways_map: Dict[str, Any] = {}
724 if gateway_ids:
725 gateways = db.execute(select(DbGateway).where(DbGateway.id.in_(gateway_ids))).scalars().all()
726 gateways_map = {gw.id: gw for gw in gateways}
728 # Batch check for existing prompts to detect conflicts
729 # Build computed names with gateway context
730 prompt_names = []
731 for prompt in chunk:
732 custom_name = getattr(prompt, "custom_name", None) or prompt.name
733 gw_id = getattr(prompt, "gateway_id", None)
734 gateway = gateways_map.get(gw_id) if gw_id else None
735 computed_name = self._compute_prompt_name(custom_name, gateway=gateway)
736 prompt_names.append(computed_name)
738 # Query for existing prompts - need to consider gateway_id in conflict detection
739 # Build base query conditions
740 if visibility.lower() == "public":
741 base_conditions = [DbPrompt.name.in_(prompt_names), DbPrompt.visibility == "public"]
742 elif visibility.lower() == "team" and team_id:
743 base_conditions = [DbPrompt.name.in_(prompt_names), DbPrompt.visibility == "team", DbPrompt.team_id == team_id]
744 else:
745 # Private prompts - check by owner
746 base_conditions = [DbPrompt.name.in_(prompt_names), DbPrompt.visibility == "private", DbPrompt.owner_email == (owner_email or created_by)]
748 existing_prompts_query = select(DbPrompt).where(*base_conditions)
749 existing_prompts = db.execute(existing_prompts_query).scalars().all()
750 # Use (name, gateway_id) tuple as key for proper conflict detection
751 existing_prompts_map = {(p.name, p.gateway_id): p for p in existing_prompts}
753 prompts_to_add = []
754 prompts_to_update = []
756 for prompt in chunk:
757 try:
758 # Validate template syntax
759 self._validate_template(prompt.template)
761 # Extract required arguments from template
762 required_args = self._get_required_arguments(prompt.template)
764 # Create argument schema
765 argument_schema = {
766 "type": "object",
767 "properties": {},
768 "required": list(required_args),
769 }
770 for arg in prompt.arguments:
771 schema = {"type": "string"}
772 if arg.description is not None:
773 schema["description"] = arg.description
774 argument_schema["properties"][arg.name] = schema
776 # Use provided parameters or schema values
777 prompt_team_id = team_id if team_id is not None else getattr(prompt, "team_id", None)
778 prompt_owner_email = owner_email or getattr(prompt, "owner_email", None) or created_by
779 prompt_visibility = visibility if visibility is not None else getattr(prompt, "visibility", "public")
780 prompt_gateway_id = getattr(prompt, "gateway_id", None)
782 custom_name = getattr(prompt, "custom_name", None) or prompt.name
783 display_name = getattr(prompt, "display_name", None) or custom_name
784 gateway = gateways_map.get(prompt_gateway_id) if prompt_gateway_id else None
785 computed_name = self._compute_prompt_name(custom_name, gateway=gateway)
787 # Look up existing prompt by (name, gateway_id) tuple
788 existing_prompt = existing_prompts_map.get((computed_name, prompt_gateway_id))
790 if existing_prompt:
791 # Handle conflict based on strategy
792 if conflict_strategy == "skip":
793 stats["skipped"] += 1
794 continue
795 if conflict_strategy == "update":
796 # Update existing prompt
797 existing_prompt.description = prompt.description
798 existing_prompt.template = prompt.template
799 # Clear template cache to reduce memory growth
800 _compile_jinja_template.cache_clear()
801 existing_prompt.argument_schema = argument_schema
802 existing_prompt.tags = prompt.tags or []
803 if getattr(prompt, "custom_name", None) is not None:
804 existing_prompt.custom_name = custom_name
805 if getattr(prompt, "display_name", None) is not None:
806 existing_prompt.display_name = display_name
807 existing_prompt.modified_by = created_by
808 existing_prompt.modified_from_ip = created_from_ip
809 existing_prompt.modified_via = created_via
810 existing_prompt.modified_user_agent = created_user_agent
811 existing_prompt.updated_at = datetime.now(timezone.utc)
812 existing_prompt.version = (existing_prompt.version or 1) + 1
814 prompts_to_update.append(existing_prompt)
815 stats["updated"] += 1
816 elif conflict_strategy == "rename":
817 # Create with renamed prompt
818 new_name = f"{prompt.name}_imported_{int(datetime.now().timestamp())}"
819 new_custom_name = new_name
820 new_display_name = new_name
821 computed_name = self._compute_prompt_name(new_custom_name, gateway=gateway)
822 db_prompt = DbPrompt(
823 name=computed_name,
824 original_name=prompt.name,
825 custom_name=new_custom_name,
826 display_name=new_display_name,
827 description=prompt.description,
828 template=prompt.template,
829 argument_schema=argument_schema,
830 tags=prompt.tags or [],
831 created_by=created_by,
832 created_from_ip=created_from_ip,
833 created_via=created_via,
834 created_user_agent=created_user_agent,
835 import_batch_id=import_batch_id,
836 federation_source=federation_source,
837 version=1,
838 team_id=prompt_team_id,
839 owner_email=prompt_owner_email,
840 visibility=prompt_visibility,
841 gateway_id=prompt_gateway_id,
842 )
843 # Set gateway relationship to help the before_insert event handler
844 if gateway:
845 db_prompt.gateway = gateway
846 db_prompt.gateway_name_cache = gateway.name # type: ignore[attr-defined]
847 prompts_to_add.append(db_prompt)
848 stats["created"] += 1
849 elif conflict_strategy == "fail":
850 stats["failed"] += 1
851 stats["errors"].append(f"Prompt name conflict: {prompt.name}")
852 continue
853 else:
854 # Create new prompt
855 db_prompt = DbPrompt(
856 name=computed_name,
857 original_name=prompt.name,
858 custom_name=custom_name,
859 display_name=display_name,
860 description=prompt.description,
861 template=prompt.template,
862 argument_schema=argument_schema,
863 tags=prompt.tags or [],
864 created_by=created_by,
865 created_from_ip=created_from_ip,
866 created_via=created_via,
867 created_user_agent=created_user_agent,
868 import_batch_id=import_batch_id,
869 federation_source=federation_source,
870 version=1,
871 team_id=prompt_team_id,
872 owner_email=prompt_owner_email,
873 visibility=prompt_visibility,
874 gateway_id=prompt_gateway_id,
875 )
876 # Set gateway relationship to help the before_insert event handler
877 if gateway:
878 db_prompt.gateway = gateway
879 db_prompt.gateway_name_cache = gateway.name # type: ignore[attr-defined]
880 prompts_to_add.append(db_prompt)
881 stats["created"] += 1
883 except Exception as e:
884 stats["failed"] += 1
885 stats["errors"].append(f"Failed to process prompt {prompt.name}: {str(e)}")
886 logger.warning(f"Failed to process prompt {prompt.name} in bulk operation: {str(e)}")
887 continue
889 # Bulk add new prompts
890 if prompts_to_add:
891 db.add_all(prompts_to_add)
893 # Commit the chunk
894 db.commit()
896 # Refresh prompts for notifications and audit trail
897 for db_prompt in prompts_to_add:
898 db.refresh(db_prompt)
899 # Notify subscribers
900 await self._notify_prompt_added(db_prompt)
902 # Log bulk audit trail entry
903 if prompts_to_add or prompts_to_update:
904 audit_trail.log_action(
905 user_id=created_by or "system",
906 action="bulk_create_prompts" if prompts_to_add else "bulk_update_prompts",
907 resource_type="prompt",
908 resource_id=import_batch_id or "bulk_operation",
909 resource_name=f"Bulk operation: {len(prompts_to_add)} created, {len(prompts_to_update)} updated",
910 user_email=owner_email,
911 team_id=team_id,
912 client_ip=created_from_ip,
913 user_agent=created_user_agent,
914 new_values={
915 "prompts_created": len(prompts_to_add),
916 "prompts_updated": len(prompts_to_update),
917 "visibility": visibility,
918 },
919 context={
920 "created_via": created_via,
921 "import_batch_id": import_batch_id,
922 "federation_source": federation_source,
923 "conflict_strategy": conflict_strategy,
924 },
925 db=db,
926 )
928 logger.info(f"Bulk registered {len(prompts_to_add)} prompts, updated {len(prompts_to_update)} prompts in chunk")
930 except Exception as e:
931 db.rollback()
932 logger.error(f"Failed to process chunk in bulk prompt registration: {str(e)}")
933 stats["failed"] += len(chunk)
934 stats["errors"].append(f"Chunk processing failed: {str(e)}")
935 continue
937 # Final structured logging
938 structured_logger.log(
939 level="INFO",
940 message="Bulk prompt registration completed",
941 event_type="prompts_bulk_created",
942 component="prompt_service",
943 user_id=created_by,
944 user_email=owner_email,
945 team_id=team_id,
946 resource_type="prompt",
947 custom_fields={
948 "prompts_created": stats["created"],
949 "prompts_updated": stats["updated"],
950 "prompts_skipped": stats["skipped"],
951 "prompts_failed": stats["failed"],
952 "total_prompts": len(prompts),
953 "visibility": visibility,
954 "conflict_strategy": conflict_strategy,
955 },
956 )
958 return stats
    async def list_prompts(
        self,
        db: Session,
        include_inactive: bool = False,
        cursor: Optional[str] = None,
        tags: Optional[List[str]] = None,
        limit: Optional[int] = None,
        page: Optional[int] = None,
        per_page: Optional[int] = None,
        user_email: Optional[str] = None,
        team_id: Optional[str] = None,
        visibility: Optional[str] = None,
        token_teams: Optional[List[str]] = None,
    ) -> Union[tuple[List[PromptRead], Optional[str]], Dict[str, Any]]:
        """
        Retrieve a list of prompt templates from the database with pagination support.

        This method retrieves prompt templates from the database and converts them into a list
        of PromptRead objects. It supports filtering out inactive prompts based on the
        include_inactive parameter and cursor-based pagination. First-page results for
        unscoped queries (no user_email/token_teams) are served from the registry cache.

        Args:
            db (Session): The SQLAlchemy database session.
            include_inactive (bool): If True, include inactive prompts in the result.
                Defaults to False.
            cursor (Optional[str], optional): An opaque cursor token for pagination.
                Opaque base64-encoded string containing last item's ID and created_at.
            tags (Optional[List[str]]): Filter prompts by tags. If provided, only prompts with at least one matching tag will be returned.
            limit (Optional[int]): Maximum number of prompts to return. Use 0 for all prompts (no limit).
                If not specified, uses pagination_default_page_size.
            page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
            per_page: Items per page for page-based pagination. Defaults to pagination_default_page_size.
            user_email (Optional[str]): User email for team-based access control. If None, no access control is applied.
            team_id (Optional[str]): Filter by specific team ID. Requires user_email for access validation.
            visibility (Optional[str]): Filter by visibility (private, team, public).
            token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API token access
                where the token scope should be respected instead of the user's full team memberships.

        Returns:
            If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}}
            If cursor is provided or neither: tuple of (list of PromptRead objects, next_cursor).

        Examples:
            >>> from mcpgateway.services.prompt_service import PromptService
            >>> from unittest.mock import MagicMock
            >>> from mcpgateway.schemas import PromptRead
            >>> service = PromptService()
            >>> db = MagicMock()
            >>> prompt_read_obj = MagicMock(spec=PromptRead)
            >>> service.convert_prompt_to_read = MagicMock(return_value=prompt_read_obj)
            >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
            >>> import asyncio
            >>> prompts, next_cursor = asyncio.run(service.list_prompts(db))
            >>> prompts == [prompt_read_obj]
            True
        """
        # Check cache for first page only (cursor=None)
        # Skip caching when:
        # - user_email is provided (team-filtered results are user-specific)
        # - token_teams is set (scoped access, e.g., public-only or team-scoped tokens)
        # - page-based pagination is used
        # This prevents cache poisoning where admin results could leak to public-only requests
        cache = _get_registry_cache()
        if cursor is None and user_email is None and token_teams is None and page is None:
            # Tags are sorted so logically-equal filters hash to the same cache key
            filters_hash = cache.hash_filters(include_inactive=include_inactive, tags=sorted(tags) if tags else None)
            cached = await cache.get("prompts", filters_hash)
            if cached is not None:
                # Reconstruct PromptRead objects from cached dicts
                cached_prompts = [PromptRead.model_validate(p) for p in cached["prompts"]]
                return (cached_prompts, cached.get("next_cursor"))

        # Build base query with ordering and eager load gateway to avoid N+1
        # Ordering by (created_at DESC, id DESC) gives a stable sort for cursor pagination
        query = select(DbPrompt).options(joinedload(DbPrompt.gateway)).order_by(desc(DbPrompt.created_at), desc(DbPrompt.id))

        if not include_inactive:
            query = query.where(DbPrompt.enabled)

        # Apply user/team/token visibility restrictions (no-op when user_email is None)
        query = await self._apply_access_control(query, db, user_email, token_teams, team_id)

        if visibility:
            query = query.where(DbPrompt.visibility == visibility)

        # Add tag filtering if tags are provided (supports both List[str] and List[Dict] formats)
        if tags:
            query = query.where(json_contains_tag_expr(db, DbPrompt.tags, tags, match_any=True))

        # Use unified pagination helper - handles both page and cursor pagination
        pag_result = await unified_paginate(
            db=db,
            query=query,
            page=page,
            per_page=per_page,
            cursor=cursor,
            limit=limit,
            base_url="/admin/prompts",  # Used for page-based links
            query_params={"include_inactive": include_inactive} if include_inactive else {},
        )

        next_cursor = None
        # Extract prompts based on pagination type (unified_paginate returns
        # a dict for page-based requests and a (rows, next_cursor) tuple otherwise)
        if page is not None:
            # Page-based: pag_result is a dict
            prompts_db = pag_result["data"]
        else:
            # Cursor-based: pag_result is a tuple
            prompts_db, next_cursor = pag_result

        # Fetch team names for the prompts (common for both pagination types)
        # Single batched lookup instead of one query per prompt
        team_ids_set = {s.team_id for s in prompts_db if s.team_id}
        team_map = {}
        if team_ids_set:
            teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(team_ids_set), EmailTeam.is_active.is_(True))).all()
            team_map = {team.id: team.name for team in teams}

        db.commit()  # Release transaction to avoid idle-in-transaction

        # Convert to PromptRead (common for both pagination types)
        result = []
        for s in prompts_db:
            try:
                s.team = team_map.get(s.team_id) if s.team_id else None
                result.append(self.convert_prompt_to_read(s, include_metrics=False))
            except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
                logger.exception(f"Failed to convert prompt {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}")
                # Continue with remaining prompts instead of failing completely
        # Return appropriate format based on pagination type
        if page is not None:
            # Page-based format
            return {
                "data": result,
                "pagination": pag_result["pagination"],
                "links": pag_result["links"],
            }

        # Cursor-based format

        # Cache first page results - only for non-user-specific/non-scoped queries
        # Must match the same conditions as cache lookup to prevent cache poisoning
        # (page is necessarily None here: the page-based branch returned above,
        # so filters_hash is guaranteed to be bound when this condition is true)
        if cursor is None and user_email is None and token_teams is None:
            try:
                cache_data = {"prompts": [s.model_dump(mode="json") for s in result], "next_cursor": next_cursor}
                await cache.set("prompts", cache_data, filters_hash)
            except AttributeError:
                pass  # Skip caching if result objects don't support model_dump (e.g., in doctests)

        return (result, next_cursor)
1107 async def list_prompts_for_user(
1108 self, db: Session, user_email: str, team_id: Optional[str] = None, visibility: Optional[str] = None, include_inactive: bool = False, skip: int = 0, limit: int = 100
1109 ) -> List[PromptRead]:
1110 """
1111 DEPRECATED: Use list_prompts() with user_email parameter instead.
1113 This method is maintained for backward compatibility but is no longer used.
1114 New code should call list_prompts() with user_email, team_id, and visibility parameters.
1116 List prompts user has access to with team filtering.
1118 Args:
1119 db: Database session
1120 user_email: Email of the user requesting prompts
1121 team_id: Optional team ID to filter by specific team
1122 visibility: Optional visibility filter (private, team, public)
1123 include_inactive: Whether to include inactive prompts
1124 skip: Number of prompts to skip for pagination
1125 limit: Maximum number of prompts to return
1127 Returns:
1128 List[PromptRead]: Prompts the user has access to
1129 """
1130 # Build query following existing patterns from list_prompts()
1131 team_service = TeamManagementService(db)
1132 user_teams = await team_service.get_user_teams(user_email)
1133 team_ids = [team.id for team in user_teams]
1135 # Build query following existing patterns from list_resources()
1136 # Eager load gateway to avoid N+1 when accessing gateway_slug
1137 query = select(DbPrompt).options(joinedload(DbPrompt.gateway))
1139 # Apply active/inactive filter
1140 if not include_inactive:
1141 query = query.where(DbPrompt.enabled)
1143 if team_id:
1144 if team_id not in team_ids:
1145 return [] # No access to team
1147 access_conditions = []
1148 # Filter by specific team
1149 access_conditions.append(and_(DbPrompt.team_id == team_id, DbPrompt.visibility.in_(["team", "public"])))
1151 access_conditions.append(and_(DbPrompt.team_id == team_id, DbPrompt.owner_email == user_email))
1153 query = query.where(or_(*access_conditions))
1154 else:
1155 # Get user's accessible teams
1156 # Build access conditions following existing patterns
1157 access_conditions = []
1158 # 1. User's personal resources (owner_email matches)
1159 access_conditions.append(DbPrompt.owner_email == user_email)
1160 # 2. Team resources where user is member
1161 if team_ids:
1162 access_conditions.append(and_(DbPrompt.team_id.in_(team_ids), DbPrompt.visibility.in_(["team", "public"])))
1163 # 3. Public resources (if visibility allows)
1164 access_conditions.append(DbPrompt.visibility == "public")
1166 query = query.where(or_(*access_conditions))
1168 # Apply visibility filter if specified
1169 if visibility:
1170 query = query.where(DbPrompt.visibility == visibility)
1172 # Apply pagination following existing patterns
1173 query = query.offset(skip).limit(limit)
1175 prompts = db.execute(query).scalars().all()
1177 # Batch fetch team names to avoid N+1 queries
1178 prompt_team_ids = {p.team_id for p in prompts if p.team_id}
1179 team_map = {}
1180 if prompt_team_ids:
1181 teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(prompt_team_ids), EmailTeam.is_active.is_(True))).all()
1182 team_map = {str(team.id): team.name for team in teams}
1184 db.commit() # Release transaction to avoid idle-in-transaction
1186 result = []
1187 for t in prompts:
1188 try:
1189 t.team = team_map.get(str(t.team_id)) if t.team_id else None
1190 result.append(self.convert_prompt_to_read(t, include_metrics=False))
1191 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
1192 logger.exception(f"Failed to convert prompt {getattr(t, 'id', 'unknown')} ({getattr(t, 'name', 'unknown')}): {e}")
1193 # Continue with remaining prompts instead of failing completely
1194 return result
1196 async def list_server_prompts(
1197 self,
1198 db: Session,
1199 server_id: str,
1200 include_inactive: bool = False,
1201 cursor: Optional[str] = None,
1202 user_email: Optional[str] = None,
1203 token_teams: Optional[List[str]] = None,
1204 ) -> List[PromptRead]:
1205 """
1206 Retrieve a list of prompt templates from the database.
1208 This method retrieves prompt templates from the database and converts them into a list
1209 of PromptRead objects. It supports filtering out inactive prompts based on the
1210 include_inactive parameter. The cursor parameter is reserved for future pagination support
1211 but is currently not implemented.
1213 Args:
1214 db (Session): The SQLAlchemy database session.
1215 server_id (str): Server ID
1216 include_inactive (bool): If True, include inactive prompts in the result.
1217 Defaults to False.
1218 cursor (Optional[str], optional): An opaque cursor token for pagination. Currently,
1219 this parameter is ignored. Defaults to None.
1220 user_email (Optional[str]): User email for visibility filtering. If None, no filtering applied.
1221 token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API
1222 token access where the token scope should be respected.
1224 Returns:
1225 List[PromptRead]: A list of prompt templates represented as PromptRead objects.
1227 Examples:
1228 >>> from mcpgateway.services.prompt_service import PromptService
1229 >>> from unittest.mock import MagicMock
1230 >>> from mcpgateway.schemas import PromptRead
1231 >>> service = PromptService()
1232 >>> db = MagicMock()
1233 >>> prompt_read_obj = MagicMock(spec=PromptRead)
1234 >>> service.convert_prompt_to_read = MagicMock(return_value=prompt_read_obj)
1235 >>> db.execute.return_value.scalars.return_value.all.return_value = [MagicMock()]
1236 >>> import asyncio
1237 >>> result = asyncio.run(service.list_server_prompts(db, 'server1'))
1238 >>> result == [prompt_read_obj]
1239 True
1240 """
1241 # Eager load gateway to avoid N+1 when accessing gateway_slug
1242 query = (
1243 select(DbPrompt)
1244 .options(joinedload(DbPrompt.gateway))
1245 .join(server_prompt_association, DbPrompt.id == server_prompt_association.c.prompt_id)
1246 .where(server_prompt_association.c.server_id == server_id)
1247 )
1248 if not include_inactive:
1249 query = query.where(DbPrompt.enabled)
1251 # Add visibility filtering if user context OR token_teams provided
1252 # This ensures unauthenticated requests with token_teams=[] only see public prompts
1253 if user_email is not None or token_teams is not None: # empty-string user_email -> public-only filtering (secure default)
1254 # Use token_teams if provided (for MCP/API token access), otherwise look up from DB
1255 if token_teams is not None:
1256 team_ids = token_teams
1257 elif user_email:
1258 team_service = TeamManagementService(db)
1259 user_teams = await team_service.get_user_teams(user_email)
1260 team_ids = [team.id for team in user_teams]
1261 else:
1262 team_ids = []
1264 # Check if this is a public-only token (empty teams array)
1265 # Public-only tokens can ONLY see public resources - no owner access
1266 is_public_only_token = token_teams is not None and len(token_teams) == 0
1268 access_conditions = [
1269 DbPrompt.visibility == "public",
1270 ]
1271 # Only include owner access for non-public-only tokens with user_email
1272 if not is_public_only_token and user_email:
1273 access_conditions.append(DbPrompt.owner_email == user_email)
1274 if team_ids:
1275 access_conditions.append(and_(DbPrompt.team_id.in_(team_ids), DbPrompt.visibility.in_(["team", "public"])))
1276 query = query.where(or_(*access_conditions))
1278 # Cursor-based pagination logic can be implemented here in the future.
1279 logger.debug(cursor)
1280 prompts = db.execute(query).scalars().all()
1282 # Batch fetch team names to avoid N+1 queries
1283 prompt_team_ids = {p.team_id for p in prompts if p.team_id}
1284 team_map = {}
1285 if prompt_team_ids:
1286 teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(prompt_team_ids), EmailTeam.is_active.is_(True))).all()
1287 team_map = {str(team.id): team.name for team in teams}
1289 db.commit() # Release transaction to avoid idle-in-transaction
1291 result = []
1292 for t in prompts:
1293 try:
1294 t.team = team_map.get(str(t.team_id)) if t.team_id else None
1295 result.append(self.convert_prompt_to_read(t, include_metrics=False))
1296 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
1297 logger.exception(f"Failed to convert prompt {getattr(t, 'id', 'unknown')} ({getattr(t, 'name', 'unknown')}): {e}")
1298 # Continue with remaining prompts instead of failing completely
1299 return result
1301 async def _record_prompt_metric(self, db: Session, prompt: DbPrompt, start_time: float, success: bool, error_message: Optional[str]) -> None:
1302 """
1303 Records a metric for a prompt invocation.
1305 Args:
1306 db: Database session
1307 prompt: The prompt that was invoked
1308 start_time: Monotonic start time of the invocation
1309 success: True if successful, False otherwise
1310 error_message: Error message if failed, None otherwise
1311 """
1312 end_time = time.monotonic()
1313 response_time = end_time - start_time
1315 metric = PromptMetric(
1316 prompt_id=prompt.id,
1317 response_time=response_time,
1318 is_success=success,
1319 error_message=error_message,
1320 )
1321 db.add(metric)
1322 db.commit()
1324 async def _check_prompt_access(
1325 self,
1326 db: Session,
1327 prompt: DbPrompt,
1328 user_email: Optional[str],
1329 token_teams: Optional[List[str]],
1330 ) -> bool:
1331 """Check if user has access to a prompt based on visibility rules.
1333 Implements the same access control logic as list_prompts() for consistency.
1335 Args:
1336 db: Database session for team membership lookup if needed.
1337 prompt: Prompt ORM object with visibility, team_id, owner_email.
1338 user_email: Email of the requesting user (None = unauthenticated).
1339 token_teams: List of team IDs from token.
1340 - None = unrestricted admin access
1341 - [] = public-only token
1342 - [...] = team-scoped token
1344 Returns:
1345 True if access is allowed, False otherwise.
1346 """
1347 visibility = getattr(prompt, "visibility", "public")
1348 prompt_team_id = getattr(prompt, "team_id", None)
1349 prompt_owner_email = getattr(prompt, "owner_email", None)
1351 # Public prompts are accessible by everyone
1352 if visibility == "public":
1353 return True
1355 # Admin bypass: token_teams=None AND user_email=None means unrestricted admin
1356 # This happens when is_admin=True and no team scoping in token
1357 if token_teams is None and user_email is None:
1358 return True
1360 # No user context (but not admin) = deny access to non-public prompts
1361 if not user_email:
1362 return False
1364 # Public-only tokens (empty teams array) can ONLY access public prompts
1365 is_public_only_token = token_teams is not None and len(token_teams) == 0
1366 if is_public_only_token:
1367 return False # Already checked public above
1369 # Owner can access their own private prompts
1370 if visibility == "private" and prompt_owner_email and prompt_owner_email == user_email:
1371 return True
1373 # Team prompts: check team membership (matches list_prompts behavior)
1374 if prompt_team_id:
1375 # Use token_teams if provided, otherwise look up from DB
1376 if token_teams is not None:
1377 team_ids = token_teams
1378 else:
1379 team_service = TeamManagementService(db)
1380 user_teams = await team_service.get_user_teams(user_email)
1381 team_ids = [team.id for team in user_teams]
1383 # Team/public visibility allows access if user is in the team
1384 if visibility in ["team", "public"] and prompt_team_id in team_ids:
1385 return True
1387 return False
    async def get_prompt(
        self,
        db: Session,
        prompt_id: Union[int, str],
        arguments: Optional[Dict[str, str]] = None,
        user: Optional[str] = None,
        tenant_id: Optional[str] = None,
        server_id: Optional[str] = None,
        request_id: Optional[str] = None,
        token_teams: Optional[List[str]] = None,
        plugin_context_table: Optional[PluginContextTable] = None,
        plugin_global_context: Optional[GlobalContext] = None,
        _meta_data: Optional[Dict[str, Any]] = None,
    ) -> PromptResult:
        """Get a prompt template and optionally render it.

        Looks the prompt up by ID first, then by name; enforces visibility and
        (when server_id is given) virtual-server scoping; runs pre/post-fetch
        plugin hooks when registered; and records metrics/spans in the finally
        block regardless of outcome.

        Args:
            db: Database session
            prompt_id: ID of the prompt to retrieve
            arguments: Optional arguments for rendering
            user: Optional user email for authorization checks
            tenant_id: Optional tenant identifier for plugin context
            server_id: Optional server ID for server scoping enforcement
            request_id: Optional request ID, generated if not provided
            token_teams: Optional list of team IDs from token for authorization.
                None = unrestricted admin, [] = public-only, [...] = team-scoped.
            plugin_context_table: Optional plugin context table from previous hooks for cross-hook state sharing.
            plugin_global_context: Optional global context from middleware for consistency across hooks.
            _meta_data: Optional metadata for prompt retrieval (not used currently).

        Returns:
            Prompt result with rendered messages

        Raises:
            PluginViolationError: If prompt violates a plugin policy
            PromptNotFoundError: If prompt not found or access denied
            PromptError: For other prompt errors
            PluginError: If encounters issue with plugin

        Examples:
            >>> from mcpgateway.services.prompt_service import PromptService
            >>> from unittest.mock import MagicMock
            >>> service = PromptService()
            >>> db = MagicMock()
            >>> db.execute.return_value.scalar_one_or_none.return_value = MagicMock()
            >>> import asyncio
            >>> try:
            ...     asyncio.run(service.get_prompt(db, 'prompt_id'))
            ... except Exception:
            ...     pass
        """

        start_time = time.monotonic()
        success = False
        error_message = None
        prompt = None  # stays None until lookup succeeds; gates metric recording in finally
        server_scoped = False  # set True only after the server association check passes

        # Create database span for observability dashboard
        trace_id = current_trace_id.get()
        db_span_id = None
        db_span_ended = False
        observability_service = ObservabilityService() if trace_id else None

        if trace_id and observability_service:
            try:
                db_span_id = observability_service.start_span(
                    db=db,
                    trace_id=trace_id,
                    name="prompt.render",
                    attributes={
                        "prompt.id": str(prompt_id),
                        "arguments_count": len(arguments) if arguments else 0,
                        "user": user or "anonymous",
                        "server_id": server_id,
                        "tenant_id": tenant_id,
                        "request_id": request_id or "none",
                    },
                )
                logger.debug(f"✓ Created prompt.render span: {db_span_id} for prompt: {prompt_id}")
            except Exception as e:
                # Observability is best-effort; never fail the request over a span
                logger.warning(f"Failed to start observability span for prompt rendering: {e}")
                db_span_id = None

        # Create a trace span for OpenTelemetry export (Jaeger, Zipkin, etc.)
        with create_span(
            "prompt.render",
            {
                "prompt.id": prompt_id,
                "arguments_count": len(arguments) if arguments else 0,
                "user": user or "anonymous",
                "server_id": server_id,
                "tenant_id": tenant_id,
                "request_id": request_id or "none",
            },
        ) as span:
            try:
                # Check if any prompt hooks are registered to avoid unnecessary context creation
                has_pre_fetch = self._plugin_manager and self._plugin_manager.has_hooks_for(PromptHookType.PROMPT_PRE_FETCH)
                has_post_fetch = self._plugin_manager and self._plugin_manager.has_hooks_for(PromptHookType.PROMPT_POST_FETCH)

                # Initialize plugin context variables only if hooks are registered
                context_table = None
                global_context = None
                if has_pre_fetch or has_post_fetch:
                    context_table = plugin_context_table
                    if plugin_global_context:
                        global_context = plugin_global_context
                        # Update fields with prompt-specific information
                        if user:
                            global_context.user = user
                        if server_id:
                            global_context.server_id = server_id
                        if tenant_id:
                            global_context.tenant_id = tenant_id
                    else:
                        # Create new context (fallback when middleware didn't run)
                        if not request_id:
                            request_id = uuid.uuid4().hex
                        global_context = GlobalContext(request_id=request_id, user=user, server_id=server_id, tenant_id=tenant_id)

                if has_pre_fetch:
                    # Pre-fetch hook may rewrite the rendering arguments or veto the request
                    pre_result, context_table = await self._plugin_manager.invoke_hook(
                        PromptHookType.PROMPT_PRE_FETCH,
                        payload=PromptPrehookPayload(prompt_id=prompt_id, args=arguments),
                        global_context=global_context,
                        local_contexts=context_table,  # Pass context from previous hooks
                        violations_as_exceptions=True,
                    )

                    # Use modified payload if provided
                    if pre_result.modified_payload:
                        payload = pre_result.modified_payload
                        arguments = payload.args

                # Find prompt by ID first, then by name (active prompts only)
                search_key = str(prompt_id)
                prompt = db.execute(select(DbPrompt).where(DbPrompt.id == prompt_id).where(DbPrompt.enabled)).scalar_one_or_none()
                if not prompt:
                    prompt = db.execute(select(DbPrompt).where(DbPrompt.name == prompt_id).where(DbPrompt.enabled)).scalar_one_or_none()

                if not prompt:
                    # Check if an inactive prompt exists (to give a more specific error)
                    inactive_prompt = db.execute(select(DbPrompt).where(DbPrompt.id == prompt_id).where(not_(DbPrompt.enabled))).scalar_one_or_none()
                    if not inactive_prompt:
                        inactive_prompt = db.execute(select(DbPrompt).where(DbPrompt.name == prompt_id).where(not_(DbPrompt.enabled))).scalar_one_or_none()

                    if inactive_prompt:
                        raise PromptNotFoundError(f"Prompt '{search_key}' exists but is inactive")

                    raise PromptNotFoundError(f"Prompt not found: {search_key}")

                # ═══════════════════════════════════════════════════════════════════════════
                # SECURITY: Check prompt access based on visibility and team membership
                # ═══════════════════════════════════════════════════════════════════════════
                if not await self._check_prompt_access(db, prompt, user, token_teams):
                    # Don't reveal prompt existence - return generic "not found"
                    raise PromptNotFoundError(f"Prompt not found: {search_key}")

                # ═══════════════════════════════════════════════════════════════════════════
                # SECURITY: Enforce server scoping if server_id is provided
                # Prompt must be attached to the specified virtual server
                # ═══════════════════════════════════════════════════════════════════════════
                if server_id:
                    server_match = db.execute(
                        select(server_prompt_association.c.prompt_id).where(
                            server_prompt_association.c.server_id == server_id,
                            server_prompt_association.c.prompt_id == prompt.id,
                        )
                    ).first()
                    if not server_match:
                        # Same generic message as above to avoid leaking existence
                        raise PromptNotFoundError(f"Prompt not found: {search_key}")
                    server_scoped = True

                if not arguments:
                    # No arguments: return the raw template as a single user message
                    result = PromptResult(
                        messages=[
                            Message(
                                role=Role.USER,
                                content=TextContent(type="text", text=prompt.template),
                            )
                        ],
                        description=prompt.description,
                    )
                else:
                    try:
                        # Validate against the prompt's argument schema, then render
                        prompt.validate_arguments(arguments)
                        rendered = self._render_template(prompt.template, arguments)
                        messages = self._parse_messages(rendered)
                        result = PromptResult(messages=messages, description=prompt.description)
                    except Exception as e:
                        if span:
                            span.set_attribute("error", True)
                            span.set_attribute("error.message", str(e))
                        raise PromptError(f"Failed to process prompt: {str(e)}")

                if has_post_fetch:
                    # Post-fetch hook may rewrite the rendered result or veto it
                    post_result, _ = await self._plugin_manager.invoke_hook(
                        PromptHookType.PROMPT_POST_FETCH,
                        payload=PromptPosthookPayload(prompt_id=str(prompt.id), result=result),
                        global_context=global_context,
                        local_contexts=context_table,
                        violations_as_exceptions=True,
                    )
                    # Use modified payload if provided
                    result = post_result.modified_payload.result if post_result.modified_payload else result

                arguments_supplied = bool(arguments)

                # Audit trail records the successful view (flag only - never the argument values)
                audit_trail.log_action(
                    user_id=user or "anonymous",
                    action="view_prompt",
                    resource_type="prompt",
                    resource_id=str(prompt.id),
                    resource_name=prompt.name,
                    team_id=prompt.team_id,
                    context={
                        "tenant_id": tenant_id,
                        "server_id": server_id,
                        "arguments_provided": arguments_supplied,
                        "request_id": request_id,
                    },
                    db=db,
                )

                structured_logger.log(
                    level="INFO",
                    message="Prompt retrieved successfully",
                    event_type="prompt_viewed",
                    component="prompt_service",
                    user_id=user,
                    team_id=prompt.team_id,
                    resource_type="prompt",
                    resource_id=str(prompt.id),
                    request_id=request_id,
                    custom_fields={
                        "prompt_name": prompt.name,
                        "arguments_provided": arguments_supplied,
                        "tenant_id": tenant_id,
                        "server_id": server_id,
                    },
                )

                # Set success attributes on span
                if span:
                    span.set_attribute("success", True)
                    span.set_attribute("duration.ms", (time.monotonic() - start_time) * 1000)
                    if result and hasattr(result, "messages"):
                        span.set_attribute("messages.count", len(result.messages))

                success = True
                logger.info(f"Retrieved prompt: {prompt.id} successfully")
                return result

            except Exception as e:
                # Capture outcome for the finally block, then re-raise unchanged
                success = False
                error_message = str(e)
                raise
            finally:
                # Record metrics only if we found a prompt
                if prompt:
                    try:
                        metrics_buffer.record_prompt_metric(
                            prompt_id=prompt.id,
                            start_time=start_time,
                            success=success,
                            error_message=error_message,
                        )
                    except Exception as metrics_error:
                        logger.warning(f"Failed to record prompt metric: {metrics_error}")

                # Record server metrics ONLY when the server scoping check passed.
                # This prevents recording metrics with unvalidated server_id values
                # from admin API headers (X-Server-ID) or RPC params.
                if server_scoped:
                    try:
                        # Record server metric only for the specific virtual server being accessed
                        metrics_buffer.record_server_metric(
                            server_id=server_id,
                            start_time=start_time,
                            success=success,
                            error_message=error_message,
                        )
                    except Exception as metrics_error:
                        logger.warning(f"Failed to record server metric: {metrics_error}")

                # End database span for observability dashboard
                if db_span_id and observability_service and not db_span_ended:
                    try:
                        observability_service.end_span(
                            db=db,
                            span_id=db_span_id,
                            status="ok" if success else "error",
                            status_message=error_message if error_message else None,
                        )
                        db_span_ended = True
                        logger.debug(f"✓ Ended prompt.render span: {db_span_id}")
                    except Exception as e:
                        logger.warning(f"Failed to end observability span for prompt rendering: {e}")
    async def update_prompt(
        self,
        db: Session,
        prompt_id: Union[int, str],
        prompt_update: PromptUpdate,
        modified_by: Optional[str] = None,
        modified_from_ip: Optional[str] = None,
        modified_via: Optional[str] = None,
        modified_user_agent: Optional[str] = None,
        user_email: Optional[str] = None,
    ) -> PromptRead:
        """
        Update a prompt template.

        Acquires a row-level lock on the target prompt, checks the new name for
        conflicts within the prompt's visibility scope, verifies ownership,
        applies field changes, bumps the version, then emits audit/event
        notifications and invalidates the registry and tag caches.

        Args:
            db: Database session
            prompt_id: ID of prompt to update
            prompt_update: Prompt update object
            modified_by: Username of the person modifying the prompt
            modified_from_ip: IP address where the modification originated
            modified_via: Source of modification (ui/api/import)
            modified_user_agent: User agent string from the modification request
            user_email: Email of user performing update (for ownership check)

        Returns:
            The updated PromptRead object

        Raises:
            PromptNotFoundError: If the prompt is not found
            PermissionError: If user doesn't own the prompt
            IntegrityError: If a database integrity error occurs.
            PromptNameConflictError: If a prompt with the same name already exists.
            PromptError: For other update errors

        Examples:
            >>> from mcpgateway.services.prompt_service import PromptService
            >>> from unittest.mock import MagicMock
            >>> service = PromptService()
            >>> db = MagicMock()
            >>> db.execute.return_value.scalar_one_or_none.return_value = MagicMock()
            >>> db.commit = MagicMock()
            >>> db.refresh = MagicMock()
            >>> service._notify_prompt_updated = MagicMock()
            >>> service.convert_prompt_to_read = MagicMock(return_value={})
            >>> import asyncio
            >>> try:
            ...     asyncio.run(service.update_prompt(db, 'prompt_name', MagicMock()))
            ... except Exception:
            ...     pass
        """
        try:
            # Acquire a row-level lock for the prompt being updated to make
            # name-checks and the subsequent update atomic in PostgreSQL.
            # For SQLite `get_for_update` falls back to a regular get.
            prompt = get_for_update(db, DbPrompt, prompt_id)
            if not prompt:
                raise PromptNotFoundError(f"Prompt not found: {prompt_id}")

            # Effective scope values: fall back to the stored row when the
            # update payload leaves them unset.
            visibility = prompt_update.visibility or prompt.visibility
            team_id = prompt_update.team_id or prompt.team_id
            owner_email = prompt.owner_email or user_email

            # Determine what the prompt's effective custom name would become
            # after this update, so conflicts can be detected up front.
            candidate_custom_name = prompt.custom_name

            if prompt_update.name is not None:
                candidate_custom_name = prompt_update.custom_name or prompt_update.name
            elif prompt_update.custom_name is not None:
                candidate_custom_name = prompt_update.custom_name

            computed_name = self._compute_prompt_name(candidate_custom_name, prompt.gateway)
            if computed_name != prompt.name:
                # Conflict detection is scoped by visibility: public names are
                # globally unique, team names unique within the team, and
                # private names unique per owner.
                if visibility.lower() == "public":
                    # Lock any conflicting row so concurrent updates cannot race.
                    existing_prompt = get_for_update(db, DbPrompt, where=and_(DbPrompt.name == computed_name, DbPrompt.visibility == "public", DbPrompt.id != prompt.id))
                    if existing_prompt:
                        raise PromptNameConflictError(computed_name, enabled=existing_prompt.enabled, prompt_id=existing_prompt.id, visibility=existing_prompt.visibility)
                elif visibility.lower() == "team" and team_id:
                    existing_prompt = get_for_update(db, DbPrompt, where=and_(DbPrompt.name == computed_name, DbPrompt.visibility == "team", DbPrompt.team_id == team_id, DbPrompt.id != prompt.id))
                    logger.info(f"Existing prompt check result: {existing_prompt}")
                    if existing_prompt:
                        raise PromptNameConflictError(computed_name, enabled=existing_prompt.enabled, prompt_id=existing_prompt.id, visibility=existing_prompt.visibility)
                elif visibility.lower() == "private":
                    existing_prompt = get_for_update(
                        db, DbPrompt, where=and_(DbPrompt.name == computed_name, DbPrompt.visibility == "private", DbPrompt.owner_email == owner_email, DbPrompt.id != prompt.id)
                    )
                    if existing_prompt:
                        raise PromptNameConflictError(computed_name, enabled=existing_prompt.enabled, prompt_id=existing_prompt.id, visibility=existing_prompt.visibility)

            # Check ownership if user_email provided
            if user_email:
                # First-Party
                from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

                permission_service = PermissionService(db)
                if not await permission_service.check_resource_ownership(user_email, prompt):
                    raise PermissionError("Only the owner can update this prompt")

            if prompt_update.name is not None:
                if prompt.gateway_id:
                    # Federated (gateway-owned) prompt: only the local custom
                    # name may be changed, never the upstream original name.
                    prompt.custom_name = prompt_update.custom_name or prompt_update.name
                else:
                    prompt.original_name = prompt_update.name
                    if prompt_update.custom_name is None:
                        # No explicit override supplied - keep custom_name in
                        # sync with the new name.
                        prompt.custom_name = prompt_update.name
            if prompt_update.custom_name is not None:
                prompt.custom_name = prompt_update.custom_name
            if prompt_update.display_name is not None:
                prompt.display_name = prompt_update.display_name
            if prompt_update.description is not None:
                prompt.description = prompt_update.description
            if prompt_update.template is not None:
                prompt.template = prompt_update.template
                self._validate_template(prompt.template)
                # Clear template cache to reduce memory growth
                _compile_jinja_template.cache_clear()
            if prompt_update.arguments is not None:
                # Rebuild the JSON-schema describing the template's arguments;
                # every variable referenced by the template is marked required.
                required_args = self._get_required_arguments(prompt.template)
                argument_schema = {
                    "type": "object",
                    "properties": {},
                    "required": list(required_args),
                }
                for arg in prompt_update.arguments:
                    schema = {"type": "string"}
                    if arg.description is not None:
                        schema["description"] = arg.description
                    argument_schema["properties"][arg.name] = schema
                prompt.argument_schema = argument_schema

            if prompt_update.visibility is not None:
                prompt.visibility = prompt_update.visibility

            # Update tags if provided
            if prompt_update.tags is not None:
                prompt.tags = prompt_update.tags

            # Update metadata fields
            prompt.updated_at = datetime.now(timezone.utc)
            if modified_by:
                prompt.modified_by = modified_by
            if modified_from_ip:
                prompt.modified_from_ip = modified_from_ip
            if modified_via:
                prompt.modified_via = modified_via
            if modified_user_agent:
                prompt.modified_user_agent = modified_user_agent
            # Monotonically bump the version; start at 1 when unset.
            if hasattr(prompt, "version") and prompt.version is not None:
                prompt.version = prompt.version + 1
            else:
                prompt.version = 1

            db.commit()
            db.refresh(prompt)

            await self._notify_prompt_updated(prompt)

            # Structured logging: Audit trail for prompt update
            audit_trail.log_action(
                user_id=user_email or modified_by or "system",
                action="update_prompt",
                resource_type="prompt",
                resource_id=str(prompt.id),
                resource_name=prompt.name,
                user_email=user_email,
                team_id=prompt.team_id,
                client_ip=modified_from_ip,
                user_agent=modified_user_agent,
                new_values={"name": prompt.name, "version": prompt.version},
                context={"modified_via": modified_via},
                db=db,
            )

            structured_logger.log(
                level="INFO",
                message="Prompt updated successfully",
                event_type="prompt_updated",
                component="prompt_service",
                user_id=modified_by,
                user_email=user_email,
                team_id=prompt.team_id,
                resource_type="prompt",
                resource_id=str(prompt.id),
                custom_fields={"prompt_name": prompt.name, "version": prompt.version},
            )

            prompt.team = self._get_team_name(db, prompt.team_id)

            # Invalidate cache after successful update
            cache = _get_registry_cache()
            await cache.invalidate_prompts()
            # Also invalidate tags cache since prompt tags may have changed
            # First-Party
            from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

            await admin_stats_cache.invalidate_tags()

            return self.convert_prompt_to_read(prompt)

        except PermissionError as pe:
            db.rollback()

            structured_logger.log(
                level="WARNING",
                message="Prompt update failed due to permission error",
                event_type="prompt_update_permission_denied",
                component="prompt_service",
                user_email=user_email,
                resource_type="prompt",
                resource_id=str(prompt_id),
                error=pe,
            )
            raise
        except IntegrityError as ie:
            db.rollback()
            logger.error(f"IntegrityErrors in group: {ie}")

            structured_logger.log(
                level="ERROR",
                message="Prompt update failed due to database integrity error",
                event_type="prompt_update_failed",
                component="prompt_service",
                user_email=user_email,
                resource_type="prompt",
                resource_id=str(prompt_id),
                error=ie,
            )
            raise ie
        except PromptNotFoundError as e:
            db.rollback()
            logger.error(f"Prompt not found: {e}")

            structured_logger.log(
                level="ERROR",
                message="Prompt update failed - prompt not found",
                event_type="prompt_not_found",
                component="prompt_service",
                user_email=user_email,
                resource_type="prompt",
                resource_id=str(prompt_id),
                error=e,
            )
            raise e
        except PromptNameConflictError as pnce:
            db.rollback()
            logger.error(f"Prompt name conflict: {pnce}")

            structured_logger.log(
                level="WARNING",
                message="Prompt update failed due to name conflict",
                event_type="prompt_name_conflict",
                component="prompt_service",
                user_email=user_email,
                resource_type="prompt",
                resource_id=str(prompt_id),
                error=pnce,
            )
            raise pnce
        except Exception as e:
            db.rollback()

            structured_logger.log(
                level="ERROR",
                message="Prompt update failed",
                event_type="prompt_update_failed",
                component="prompt_service",
                user_email=user_email,
                resource_type="prompt",
                resource_id=str(prompt_id),
                error=e,
            )
            raise PromptError(f"Failed to update prompt: {str(e)}")
    async def set_prompt_state(self, db: Session, prompt_id: int, activate: bool, user_email: Optional[str] = None, skip_cache_invalidation: bool = False) -> PromptRead:
        """
        Set the activation status of a prompt.

        Uses a non-blocking row lock so a concurrent modification surfaces
        immediately as a conflict instead of queuing. Commit, notifications,
        audit and cache invalidation only occur when the state actually flips.

        Args:
            db: Database session
            prompt_id: Prompt ID
            activate: True to activate, False to deactivate
            user_email: Optional[str] The email of the user to check if the user has permission to modify.
            skip_cache_invalidation: If True, skip cache invalidation (used for batch operations).

        Returns:
            The updated PromptRead object

        Raises:
            PromptNotFoundError: If the prompt is not found.
            PromptLockConflictError: If the prompt is locked by another transaction.
            PromptError: For other errors.
            PermissionError: If user doesn't own the prompt.

        Examples:
            >>> from mcpgateway.services.prompt_service import PromptService
            >>> from unittest.mock import MagicMock
            >>> service = PromptService()
            >>> db = MagicMock()
            >>> prompt = MagicMock()
            >>> db.get.return_value = prompt
            >>> db.commit = MagicMock()
            >>> db.refresh = MagicMock()
            >>> service._notify_prompt_activated = MagicMock()
            >>> service._notify_prompt_deactivated = MagicMock()
            >>> service.convert_prompt_to_read = MagicMock(return_value={})
            >>> import asyncio
            >>> try:
            ...     asyncio.run(service.set_prompt_state(db, 1, True))
            ... except Exception:
            ...     pass
        """
        try:
            # Use nowait=True to fail fast if row is locked, preventing lock contention under high load
            try:
                prompt = get_for_update(db, DbPrompt, prompt_id, nowait=True)
            except OperationalError as lock_err:
                # Row is locked by another transaction - fail fast with 409
                db.rollback()
                raise PromptLockConflictError(f"Prompt {prompt_id} is currently being modified by another request") from lock_err
            if not prompt:
                raise PromptNotFoundError(f"Prompt not found: {prompt_id}")

            if user_email:
                # First-Party
                from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

                permission_service = PermissionService(db)
                if not await permission_service.check_resource_ownership(user_email, prompt):
                    raise PermissionError("Only the owner can activate the Prompt" if activate else "Only the owner can deactivate the Prompt")

            # Only write, notify and audit when the state actually changes;
            # a no-op request falls straight through to the conversion below.
            if prompt.enabled != activate:
                prompt.enabled = activate
                prompt.updated_at = datetime.now(timezone.utc)
                db.commit()
                db.refresh(prompt)

                # Invalidate cache after status change (skip for batch operations)
                if not skip_cache_invalidation:
                    cache = _get_registry_cache()
                    await cache.invalidate_prompts()

                if activate:
                    await self._notify_prompt_activated(prompt)
                else:
                    await self._notify_prompt_deactivated(prompt)
                logger.info(f"Prompt {prompt.name} {'activated' if activate else 'deactivated'}")

                # Structured logging: Audit trail for prompt state change
                audit_trail.log_action(
                    user_id=user_email or "system",
                    action="set_prompt_state",
                    resource_type="prompt",
                    resource_id=str(prompt.id),
                    resource_name=prompt.name,
                    user_email=user_email,
                    team_id=prompt.team_id,
                    new_values={"enabled": prompt.enabled},
                    context={"action": "activate" if activate else "deactivate"},
                    db=db,
                )

                structured_logger.log(
                    level="INFO",
                    message=f"Prompt {'activated' if activate else 'deactivated'} successfully",
                    event_type="prompt_state_changed",
                    component="prompt_service",
                    user_email=user_email,
                    team_id=prompt.team_id,
                    resource_type="prompt",
                    resource_id=str(prompt.id),
                    custom_fields={"prompt_name": prompt.name, "enabled": prompt.enabled},
                )

            prompt.team = self._get_team_name(db, prompt.team_id)
            return self.convert_prompt_to_read(prompt)
        except PermissionError as e:
            structured_logger.log(
                level="WARNING",
                message="Prompt state change failed due to permission error",
                event_type="prompt_state_change_permission_denied",
                component="prompt_service",
                user_email=user_email,
                resource_type="prompt",
                resource_id=str(prompt_id),
                error=e,
            )
            raise e
        except PromptLockConflictError:
            # Re-raise lock conflicts without wrapping - allows 409 response
            raise
        except PromptNotFoundError:
            # Re-raise not found without wrapping - allows 404 response
            raise
        except Exception as e:
            db.rollback()

            structured_logger.log(
                level="ERROR",
                message="Prompt state change failed",
                event_type="prompt_state_change_failed",
                component="prompt_service",
                user_email=user_email,
                resource_type="prompt",
                resource_id=str(prompt_id),
                error=e,
            )
            raise PromptError(f"Failed to set prompt state: {str(e)}")
2096 # Get prompt details for admin ui
2098 async def get_prompt_details(self, db: Session, prompt_id: Union[int, str], include_inactive: bool = False) -> Dict[str, Any]: # pylint: disable=unused-argument
2099 """
2100 Get prompt details by ID.
2102 Args:
2103 db: Database session
2104 prompt_id: ID of prompt
2105 include_inactive: Whether to include inactive prompts
2107 Returns:
2108 Dictionary of prompt details
2110 Raises:
2111 PromptNotFoundError: If the prompt is not found
2113 Examples:
2114 >>> from mcpgateway.services.prompt_service import PromptService
2115 >>> from unittest.mock import MagicMock
2116 >>> service = PromptService()
2117 >>> db = MagicMock()
2118 >>> prompt_dict = {'id': '1', 'name': 'test', 'description': 'desc', 'template': 'tpl', 'arguments': [], 'createdAt': '2023-01-01T00:00:00', 'updatedAt': '2023-01-01T00:00:00', 'isActive': True, 'metrics': {}}
2119 >>> service.convert_prompt_to_read = MagicMock(return_value=prompt_dict)
2120 >>> db.execute.return_value.scalar_one_or_none.return_value = MagicMock()
2121 >>> import asyncio
2122 >>> result = asyncio.run(service.get_prompt_details(db, 'prompt_name'))
2123 >>> result == prompt_dict
2124 True
2125 """
2126 prompt = db.get(DbPrompt, prompt_id)
2127 if not prompt:
2128 raise PromptNotFoundError(f"Prompt not found: {prompt_id}")
2129 # Return the fully converted prompt including metrics
2130 prompt.team = self._get_team_name(db, prompt.team_id)
2131 prompt_data = self.convert_prompt_to_read(prompt)
2133 audit_trail.log_action(
2134 user_id="system",
2135 action="view_prompt_details",
2136 resource_type="prompt",
2137 resource_id=str(prompt.id),
2138 resource_name=prompt.name,
2139 team_id=prompt.team_id,
2140 context={"include_inactive": include_inactive},
2141 db=db,
2142 )
2144 structured_logger.log(
2145 level="INFO",
2146 message="Prompt details retrieved",
2147 event_type="prompt_details_viewed",
2148 component="prompt_service",
2149 resource_type="prompt",
2150 resource_id=str(prompt.id),
2151 team_id=prompt.team_id,
2152 custom_fields={
2153 "prompt_name": prompt.name,
2154 "include_inactive": include_inactive,
2155 },
2156 )
2158 return prompt_data
    async def delete_prompt(self, db: Session, prompt_id: Union[int, str], user_email: Optional[str] = None, purge_metrics: bool = False) -> None:
        """
        Delete a prompt template by its ID.

        Args:
            db (Session): Database session.
            prompt_id (str): ID of the prompt to delete.
            user_email (Optional[str]): Email of user performing delete (for ownership check).
            purge_metrics (bool): If True, delete raw + rollup metrics for this prompt.

        Raises:
            PromptNotFoundError: If the prompt is not found.
            PermissionError: If user doesn't own the prompt.
            PromptError: For other deletion errors.
            Exception: For unexpected errors.

        Examples:
            >>> from mcpgateway.services.prompt_service import PromptService
            >>> from unittest.mock import MagicMock
            >>> service = PromptService()
            >>> db = MagicMock()
            >>> prompt = MagicMock()
            >>> db.get.return_value = prompt
            >>> db.delete = MagicMock()
            >>> db.commit = MagicMock()
            >>> service._notify_prompt_deleted = MagicMock()
            >>> import asyncio
            >>> try:
            ...     asyncio.run(service.delete_prompt(db, '123'))
            ... except Exception:
            ...     pass
        """
        try:
            prompt = db.get(DbPrompt, prompt_id)
            if not prompt:
                raise PromptNotFoundError(f"Prompt not found: {prompt_id}")

            # Check ownership if user_email provided
            if user_email:
                # First-Party
                from mcpgateway.services.permission_service import PermissionService  # pylint: disable=import-outside-toplevel

                permission_service = PermissionService(db)
                if not await permission_service.check_resource_ownership(user_email, prompt):
                    raise PermissionError("Only the owner can delete this prompt")

            # Capture identifying fields before the row is deleted; the ORM
            # object is no longer reliable after db.delete() + commit.
            prompt_info = {"id": prompt.id, "name": prompt.name}
            prompt_name = prompt.name
            prompt_team_id = prompt.team_id

            if purge_metrics:
                # Pause the rollup job while purging so it does not aggregate
                # rows that are being deleted underneath it.
                with pause_rollup_during_purge(reason=f"purge_prompt:{prompt_id}"):
                    delete_metrics_in_batches(db, PromptMetric, PromptMetric.prompt_id, prompt_id)
                    delete_metrics_in_batches(db, PromptMetricsHourly, PromptMetricsHourly.prompt_id, prompt_id)

            db.delete(prompt)
            db.commit()
            await self._notify_prompt_deleted(prompt_info)
            logger.info(f"Deleted prompt: {prompt_info['name']}")

            # Structured logging: Audit trail for prompt deletion
            audit_trail.log_action(
                user_id=user_email or "system",
                action="delete_prompt",
                resource_type="prompt",
                resource_id=str(prompt_info["id"]),
                resource_name=prompt_name,
                user_email=user_email,
                team_id=prompt_team_id,
                old_values={"name": prompt_name},
                db=db,
            )

            # Structured logging: Log successful prompt deletion
            structured_logger.log(
                level="INFO",
                message="Prompt deleted successfully",
                event_type="prompt_deleted",
                component="prompt_service",
                user_email=user_email,
                team_id=prompt_team_id,
                resource_type="prompt",
                resource_id=str(prompt_info["id"]),
                custom_fields={
                    "prompt_name": prompt_name,
                    "purge_metrics": purge_metrics,
                },
            )

            # Invalidate cache after successful deletion
            cache = _get_registry_cache()
            await cache.invalidate_prompts()
            # Also invalidate tags cache since prompt tags may have changed
            # First-Party
            from mcpgateway.cache.admin_stats_cache import admin_stats_cache  # pylint: disable=import-outside-toplevel

            await admin_stats_cache.invalidate_tags()
        except PermissionError as pe:
            db.rollback()

            # Structured logging: Log permission error
            structured_logger.log(
                level="WARNING",
                message="Prompt deletion failed due to permission error",
                event_type="prompt_delete_permission_denied",
                component="prompt_service",
                user_email=user_email,
                resource_type="prompt",
                resource_id=str(prompt_id),
                error=pe,
            )
            raise
        except Exception as e:
            db.rollback()
            if isinstance(e, PromptNotFoundError):
                # Structured logging: Log not found error
                structured_logger.log(
                    level="ERROR",
                    message="Prompt deletion failed - prompt not found",
                    event_type="prompt_not_found",
                    component="prompt_service",
                    user_email=user_email,
                    resource_type="prompt",
                    resource_id=str(prompt_id),
                    error=e,
                )
                raise e

            # Structured logging: Log generic prompt deletion failure
            structured_logger.log(
                level="ERROR",
                message="Prompt deletion failed",
                event_type="prompt_deletion_failed",
                component="prompt_service",
                user_email=user_email,
                resource_type="prompt",
                resource_id=str(prompt_id),
                error=e,
            )
            raise PromptError(f"Failed to delete prompt: {str(e)}")
2301 async def subscribe_events(self) -> AsyncGenerator[Dict[str, Any], None]:
2302 """Subscribe to Prompt events via the EventService.
2304 Yields:
2305 Prompt event messages.
2306 """
2307 async for event in self._event_service.subscribe_events():
2308 yield event
2310 def _validate_template(self, template: str) -> None:
2311 """Validate template syntax.
2313 Args:
2314 template: Template to validate
2316 Raises:
2317 PromptValidationError: If template is invalid
2319 Examples:
2320 >>> from mcpgateway.services.prompt_service import PromptService
2321 >>> service = PromptService()
2322 >>> service._validate_template("Hello {{ name }}") # Valid template
2323 >>> try:
2324 ... service._validate_template("Hello {{ invalid") # Invalid template
2325 ... except Exception as e:
2326 ... "Invalid template syntax" in str(e)
2327 True
2328 """
2329 try:
2330 self._jinja_env.parse(template)
2331 except Exception as e:
2332 raise PromptValidationError(f"Invalid template syntax: {str(e)}")
2334 def _get_required_arguments(self, template: str) -> Set[str]:
2335 """Extract required arguments from template.
2337 Args:
2338 template: Template to analyze
2340 Returns:
2341 Set of required argument names
2343 Examples:
2344 >>> from mcpgateway.services.prompt_service import PromptService
2345 >>> service = PromptService()
2346 >>> args = service._get_required_arguments("Hello {{ name }} from {{ place }}")
2347 >>> sorted(args)
2348 ['name', 'place']
2349 >>> service._get_required_arguments("No variables") == set()
2350 True
2351 """
2352 ast = self._jinja_env.parse(template)
2353 variables = meta.find_undeclared_variables(ast)
2354 formatter = Formatter()
2355 format_vars = {field_name for _, field_name, _, _ in formatter.parse(template) if field_name is not None}
2356 return variables.union(format_vars)
2358 def _render_template(self, template: str, arguments: Dict[str, str]) -> str:
2359 """Render template with arguments using cached compiled templates.
2361 Args:
2362 template: Template to render
2363 arguments: Arguments for rendering
2365 Returns:
2366 Rendered template text
2368 Raises:
2369 PromptError: If rendering fails
2371 Examples:
2372 >>> from mcpgateway.services.prompt_service import PromptService
2373 >>> service = PromptService()
2374 >>> result = service._render_template("Hello {{ name }}", {"name": "World"})
2375 >>> result
2376 'Hello World'
2377 >>> service._render_template("No variables", {})
2378 'No variables'
2379 """
2380 try:
2381 jinja_template = _compile_jinja_template(template)
2382 return jinja_template.render(**arguments)
2383 except Exception:
2384 try:
2385 return template.format(**arguments)
2386 except Exception as e:
2387 raise PromptError(f"Failed to render template: {str(e)}")
2389 def _parse_messages(self, text: str) -> List[Message]:
2390 """Parse rendered text into messages.
2392 Args:
2393 text: Text to parse
2395 Returns:
2396 List of parsed messages
2398 Examples:
2399 >>> from mcpgateway.services.prompt_service import PromptService
2400 >>> service = PromptService()
2401 >>> messages = service._parse_messages("Simple text")
2402 >>> len(messages)
2403 1
2404 >>> messages[0].role.value
2405 'user'
2406 >>> messages = service._parse_messages("# User:\\nHello\\n# Assistant:\\nHi there")
2407 >>> len(messages)
2408 2
2409 """
2410 messages = []
2411 current_role = Role.USER
2412 current_text = []
2413 for line in text.split("\n"):
2414 if line.startswith("# Assistant:"):
2415 if current_text:
2416 messages.append(
2417 Message(
2418 role=current_role,
2419 content=TextContent(type="text", text="\n".join(current_text).strip()),
2420 )
2421 )
2422 current_role = Role.ASSISTANT
2423 current_text = []
2424 elif line.startswith("# User:"):
2425 if current_text:
2426 messages.append(
2427 Message(
2428 role=current_role,
2429 content=TextContent(type="text", text="\n".join(current_text).strip()),
2430 )
2431 )
2432 current_role = Role.USER
2433 current_text = []
2434 else:
2435 current_text.append(line)
2436 if current_text:
2437 messages.append(
2438 Message(
2439 role=current_role,
2440 content=TextContent(type="text", text="\n".join(current_text).strip()),
2441 )
2442 )
2443 return messages
2445 async def _notify_prompt_added(self, prompt: DbPrompt) -> None:
2446 """
2447 Notify subscribers of prompt addition.
2449 Args:
2450 prompt: Prompt to add
2451 """
2452 event = {
2453 "type": "prompt_added",
2454 "data": {
2455 "id": prompt.id,
2456 "name": prompt.name,
2457 "description": prompt.description,
2458 "enabled": prompt.enabled,
2459 },
2460 "timestamp": datetime.now(timezone.utc).isoformat(),
2461 }
2462 await self._publish_event(event)
2464 async def _notify_prompt_updated(self, prompt: DbPrompt) -> None:
2465 """
2466 Notify subscribers of prompt update.
2468 Args:
2469 prompt: Prompt to update
2470 """
2471 event = {
2472 "type": "prompt_updated",
2473 "data": {
2474 "id": prompt.id,
2475 "name": prompt.name,
2476 "description": prompt.description,
2477 "enabled": prompt.enabled,
2478 },
2479 "timestamp": datetime.now(timezone.utc).isoformat(),
2480 }
2481 await self._publish_event(event)
2483 async def _notify_prompt_activated(self, prompt: DbPrompt) -> None:
2484 """
2485 Notify subscribers of prompt activation.
2487 Args:
2488 prompt: Prompt to activate
2489 """
2490 event = {
2491 "type": "prompt_activated",
2492 "data": {"id": prompt.id, "name": prompt.name, "enabled": True},
2493 "timestamp": datetime.now(timezone.utc).isoformat(),
2494 }
2495 await self._publish_event(event)
2497 async def _notify_prompt_deactivated(self, prompt: DbPrompt) -> None:
2498 """
2499 Notify subscribers of prompt deactivation.
2501 Args:
2502 prompt: Prompt to deactivate
2503 """
2504 event = {
2505 "type": "prompt_deactivated",
2506 "data": {"id": prompt.id, "name": prompt.name, "enabled": False},
2507 "timestamp": datetime.now(timezone.utc).isoformat(),
2508 }
2509 await self._publish_event(event)
2511 async def _notify_prompt_deleted(self, prompt_info: Dict[str, Any]) -> None:
2512 """
2513 Notify subscribers of prompt deletion.
2515 Args:
2516 prompt_info: Dict on prompt to notify as deleted
2517 """
2518 event = {
2519 "type": "prompt_deleted",
2520 "data": prompt_info,
2521 "timestamp": datetime.now(timezone.utc).isoformat(),
2522 }
2523 await self._publish_event(event)
2525 async def _notify_prompt_removed(self, prompt: DbPrompt) -> None:
2526 """
2527 Notify subscribers of prompt removal (deactivation).
2529 Args:
2530 prompt: Prompt to remove
2531 """
2532 event = {
2533 "type": "prompt_removed",
2534 "data": {"id": prompt.id, "name": prompt.name, "enabled": False},
2535 "timestamp": datetime.now(timezone.utc).isoformat(),
2536 }
2537 await self._publish_event(event)
2539 async def _publish_event(self, event: Dict[str, Any]) -> None:
2540 """
2541 Publish event to all subscribers via the EventService.
2543 Args:
2544 event: Event to publish
2545 """
2546 await self._event_service.publish_event(event)
2548 # --- Metrics ---
2549 async def aggregate_metrics(self, db: Session) -> PromptMetrics:
2550 """
2551 Aggregate metrics for all prompt invocations across all prompts.
2553 Combines recent raw metrics (within retention period) with historical
2554 hourly rollups for complete historical coverage. Uses in-memory caching
2555 (10s TTL) to reduce database load under high request rates.
2557 Args:
2558 db: Database session
2560 Returns:
2561 PromptMetrics: Aggregated prompt metrics from raw + hourly rollups.
2563 Examples:
2564 >>> from mcpgateway.services.prompt_service import PromptService
2565 >>> service = PromptService()
2566 >>> # Method exists and is callable
2567 >>> callable(service.aggregate_metrics)
2568 True
2569 """
2570 # Check cache first (if enabled)
2571 # First-Party
2572 from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache # pylint: disable=import-outside-toplevel
2574 if is_cache_enabled():
2575 cached = metrics_cache.get("prompts")
2576 if cached is not None:
2577 return PromptMetrics(**cached)
2579 # Use combined raw + rollup query for full historical coverage
2580 # First-Party
2581 from mcpgateway.services.metrics_query_service import aggregate_metrics_combined # pylint: disable=import-outside-toplevel
2583 result = aggregate_metrics_combined(db, "prompt")
2585 metrics = PromptMetrics(
2586 total_executions=result.total_executions,
2587 successful_executions=result.successful_executions,
2588 failed_executions=result.failed_executions,
2589 failure_rate=result.failure_rate,
2590 min_response_time=result.min_response_time,
2591 max_response_time=result.max_response_time,
2592 avg_response_time=result.avg_response_time,
2593 last_execution_time=result.last_execution_time,
2594 )
2596 # Cache the result as dict for serialization compatibility (if enabled)
2597 if is_cache_enabled():
2598 metrics_cache.set("prompts", metrics.model_dump())
2600 return metrics
2602 async def reset_metrics(self, db: Session) -> None:
2603 """
2604 Reset all prompt metrics by deleting raw and hourly rollup records.
2606 Args:
2607 db: Database session
2609 Examples:
2610 >>> from mcpgateway.services.prompt_service import PromptService
2611 >>> from unittest.mock import MagicMock
2612 >>> service = PromptService()
2613 >>> db = MagicMock()
2614 >>> db.execute = MagicMock()
2615 >>> db.commit = MagicMock()
2616 >>> import asyncio
2617 >>> asyncio.run(service.reset_metrics(db))
2618 """
2620 db.execute(delete(PromptMetric))
2621 db.execute(delete(PromptMetricsHourly))
2622 db.commit()
2624 # Invalidate metrics cache
2625 # First-Party
2626 from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel
2628 metrics_cache.invalidate("prompts")
2629 metrics_cache.invalidate_prefix("top_prompts:")
# Lazy singleton: the shared PromptService is created on first attribute
# access rather than at import time, so importing only exception classes
# from this module stays cheap.
_prompt_service_instance = None  # pylint: disable=invalid-name


def __getattr__(name: str):
    """Module-level __getattr__ for lazy singleton creation.

    Args:
        name: The attribute name being accessed.

    Returns:
        The prompt_service singleton instance if name is "prompt_service".

    Raises:
        AttributeError: If the attribute name is not "prompt_service".
    """
    global _prompt_service_instance  # pylint: disable=global-statement
    # Guard clause: anything other than the singleton is a normal miss.
    if name != "prompt_service":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
    if _prompt_service_instance is None:
        _prompt_service_instance = PromptService()
    return _prompt_service_instance