Coverage for mcpgateway / services / a2a_service.py: 98%
699 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2# pylint: disable=invalid-name, import-outside-toplevel, unused-import, no-name-in-module
3"""Location: ./mcpgateway/services/a2a_service.py
4Copyright 2025
5SPDX-License-Identifier: Apache-2.0
6Authors: Mihai Criveti
8A2A Agent Service
10This module implements A2A (Agent-to-Agent) agent management for ContextForge.
11It handles agent registration, listing, retrieval, updates, activation toggling, deletion,
12and interactions with A2A-compatible agents.
13"""
15# Standard
16import binascii
17from datetime import datetime, timezone
18from typing import Any, AsyncGenerator, Dict, List, Optional, Union
20# Third-Party
21from pydantic import ValidationError
22from sqlalchemy import and_, delete, desc, or_, select, update
23from sqlalchemy.exc import IntegrityError
24from sqlalchemy.orm import Session
26# First-Party
27from mcpgateway.cache.a2a_stats_cache import a2a_stats_cache
28from mcpgateway.db import A2AAgent as DbA2AAgent
29from mcpgateway.db import A2AAgentMetric, A2AAgentMetricsHourly, EmailTeam
30from mcpgateway.db import EmailTeamMember as DbEmailTeamMember
31from mcpgateway.db import fresh_db_session, get_for_update
32from mcpgateway.db import Tool as DbTool
33from mcpgateway.observability import create_span, set_span_attribute, set_span_error
34from mcpgateway.schemas import A2AAgentAggregateMetrics, A2AAgentCreate, A2AAgentMetrics, A2AAgentRead, A2AAgentUpdate
35from mcpgateway.services.base_service import BaseService
36from mcpgateway.services.encryption_service import protect_oauth_config_for_storage
37from mcpgateway.services.logging_service import LoggingService
38from mcpgateway.services.metrics_cleanup_service import delete_metrics_in_batches, pause_rollup_during_purge
39from mcpgateway.services.structured_logger import get_structured_logger
40from mcpgateway.services.team_management_service import TeamManagementService
41from mcpgateway.utils.correlation_id import get_correlation_id
42from mcpgateway.utils.create_slug import slugify
43from mcpgateway.utils.pagination import unified_paginate
44from mcpgateway.utils.services_auth import decode_auth, encode_auth
45from mcpgateway.utils.sqlalchemy_modifier import json_contains_tag_expr
46from mcpgateway.utils.trace_redaction import is_input_capture_enabled, is_output_capture_enabled, serialize_trace_payload
48# Cache import (lazy to avoid circular dependencies)
49_REGISTRY_CACHE = None
50_TOOL_LOOKUP_CACHE = None
53def _get_registry_cache():
54 """Get registry cache singleton lazily.
56 Returns:
57 RegistryCache instance.
58 """
59 global _REGISTRY_CACHE # pylint: disable=global-statement
60 if _REGISTRY_CACHE is None:
61 # First-Party
62 from mcpgateway.cache.registry_cache import registry_cache # pylint: disable=import-outside-toplevel
64 _REGISTRY_CACHE = registry_cache
65 return _REGISTRY_CACHE
68def _get_tool_lookup_cache():
69 """Get tool lookup cache singleton lazily.
71 Returns:
72 ToolLookupCache instance.
73 """
74 global _TOOL_LOOKUP_CACHE # pylint: disable=global-statement
75 if _TOOL_LOOKUP_CACHE is None:
76 # First-Party
77 from mcpgateway.cache.tool_lookup_cache import tool_lookup_cache # pylint: disable=import-outside-toplevel
79 _TOOL_LOOKUP_CACHE = tool_lookup_cache
80 return _TOOL_LOOKUP_CACHE
83# Initialize logging service first
84logging_service = LoggingService()
85logger = logging_service.get_logger(__name__)
87# Initialize structured logger for A2A lifecycle tracking
88structured_logger = get_structured_logger("a2a_service")
91class A2AAgentError(Exception):
92 """Base class for A2A agent-related errors.
94 Examples:
95 >>> try:
96 ... raise A2AAgentError("Agent operation failed")
97 ... except A2AAgentError as e:
98 ... str(e)
99 'Agent operation failed'
100 >>> try:
101 ... raise A2AAgentError("Connection error")
102 ... except Exception as e:
103 ... isinstance(e, A2AAgentError)
104 True
105 """
108class A2AAgentNotFoundError(A2AAgentError):
109 """Raised when a requested A2A agent is not found.
111 Examples:
112 >>> try:
113 ... raise A2AAgentNotFoundError("Agent 'test-agent' not found")
114 ... except A2AAgentNotFoundError as e:
115 ... str(e)
116 "Agent 'test-agent' not found"
117 >>> try:
118 ... raise A2AAgentNotFoundError("No such agent")
119 ... except A2AAgentError as e:
120 ... isinstance(e, A2AAgentError) # Should inherit from A2AAgentError
121 True
122 """
125class A2AAgentNameConflictError(A2AAgentError):
126 """Raised when an A2A agent name conflicts with an existing one."""
128 def __init__(self, name: str, is_active: bool = True, agent_id: Optional[str] = None, visibility: Optional[str] = "public"):
129 """Initialize an A2AAgentNameConflictError exception.
131 Creates an exception that indicates an agent name conflict, with additional
132 context about whether the conflicting agent is active and its ID if known.
134 Args:
135 name: The agent name that caused the conflict.
136 is_active: Whether the conflicting agent is currently active.
137 agent_id: The ID of the conflicting agent, if known.
138 visibility: The visibility level of the conflicting agent (private, team, public).
140 Examples:
141 >>> error = A2AAgentNameConflictError("test-agent")
142 >>> error.name
143 'test-agent'
144 >>> error.is_active
145 True
146 >>> error.agent_id is None
147 True
148 >>> "test-agent" in str(error)
149 True
150 >>>
151 >>> # Test inactive agent conflict
152 >>> error = A2AAgentNameConflictError("inactive-agent", is_active=False, agent_id="agent-123")
153 >>> error.is_active
154 False
155 >>> error.agent_id
156 'agent-123'
157 >>> "inactive" in str(error)
158 True
159 >>> "agent-123" in str(error)
160 True
161 """
162 self.name = name
163 self.is_active = is_active
164 self.agent_id = agent_id
165 message = f"{visibility.capitalize()} A2A Agent already exists with name: {name}"
166 if not is_active:
167 message += f" (currently inactive, ID: {agent_id})"
168 super().__init__(message)
171def _validate_a2a_team_assignment(db: Session, user_email: Optional[str], target_team_id: Optional[str]) -> None:
172 """Validate team assignment for A2A agent updates.
174 Args:
175 db: Database session used for membership checks.
176 user_email: Requesting user email. When omitted, ownership checks are skipped.
177 target_team_id: Team identifier to validate.
179 Raises:
180 ValueError: If team does not exist or caller lacks ownership.
181 """
182 if not target_team_id:
183 raise ValueError("Cannot set visibility to 'team' without a team_id")
185 team = db.query(EmailTeam).filter(EmailTeam.id == target_team_id).first()
186 if not team:
187 raise ValueError(f"Team {target_team_id} not found")
189 if not user_email:
190 return
192 membership = (
193 db.query(DbEmailTeamMember)
194 .filter(DbEmailTeamMember.team_id == target_team_id, DbEmailTeamMember.user_email == user_email, DbEmailTeamMember.is_active, DbEmailTeamMember.role == "owner")
195 .first()
196 )
197 if not membership:
198 raise ValueError("User membership in team not sufficient for this update.")
201class A2AAgentService(BaseService):
202 """Service for managing A2A agents in the gateway.
204 Provides methods to create, list, retrieve, update, set state, and delete agent records.
205 Also supports interactions with A2A-compatible agents.
206 """
208 _visibility_model_cls = DbA2AAgent
210 def __init__(self) -> None:
211 """Initialize a new A2AAgentService instance."""
212 self._initialized = False
213 self._event_streams: List[AsyncGenerator[str, None]] = []
215 async def initialize(self) -> None:
216 """Initialize the A2A agent service."""
217 if not self._initialized:
218 logger.info("Initializing A2A Agent Service")
219 self._initialized = True
221 async def shutdown(self) -> None:
222 """Shutdown the A2A agent service and cleanup resources."""
223 if self._initialized:
224 logger.info("Shutting down A2A Agent Service")
225 self._initialized = False
227 def _get_team_name(self, db: Session, team_id: Optional[str]) -> Optional[str]:
228 """Retrieve the team name given a team ID.
230 Args:
231 db (Session): Database session for querying teams.
232 team_id (Optional[str]): The ID of the team.
234 Returns:
235 Optional[str]: The name of the team if found, otherwise None.
236 """
237 if not team_id:
238 return None
240 team = db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True)).first()
241 db.commit() # Release transaction to avoid idle-in-transaction
242 return team.name if team else None
244 def _batch_get_team_names(self, db: Session, team_ids: List[str]) -> Dict[str, str]:
245 """Batch retrieve team names for multiple team IDs.
247 This method fetches team names in a single query to avoid N+1 issues
248 when converting multiple agents to schemas in list operations.
250 Args:
251 db (Session): Database session for querying teams.
252 team_ids (List[str]): List of team IDs to look up.
254 Returns:
255 Dict[str, str]: Mapping of team_id -> team_name for active teams.
256 """
257 if not team_ids:
258 return {}
260 # Single query for all teams
261 teams = db.query(EmailTeam.id, EmailTeam.name).filter(EmailTeam.id.in_(team_ids), EmailTeam.is_active.is_(True)).all()
263 return {team.id: team.name for team in teams}
265 def _check_agent_access(
266 self,
267 agent: DbA2AAgent,
268 user_email: Optional[str],
269 token_teams: Optional[List[str]],
270 ) -> bool:
271 """Check if user has access to agent based on visibility rules.
273 Access rules (matching tools/resources/prompts):
274 - public visibility: Always allowed
275 - token_teams is None AND user_email is None: Admin bypass (unrestricted access)
276 - No user context (but not admin): Deny access to non-public agents
277 - team visibility: Allowed if agent.team_id in token_teams
278 - private visibility: Allowed if owner (requires user_email and non-empty token_teams)
280 Args:
281 agent: The agent to check access for
282 user_email: User's email for owner matching
283 token_teams: Teams from JWT. None = admin bypass, [] = public-only (no owner access)
285 Returns:
286 True if access allowed, False otherwise.
287 """
288 # Public agents are accessible by everyone
289 if agent.visibility == "public":
290 return True
292 # Admin bypass: token_teams=None AND user_email=None means unrestricted admin
293 # This happens when is_admin=True and no team scoping in token
294 if token_teams is None and user_email is None:
295 return True
297 # No user context (but not admin) = deny access to non-public agents
298 if not user_email:
299 return False
301 # Public-only tokens (empty teams array) can ONLY access public agents
302 is_public_only_token = token_teams is not None and len(token_teams) == 0
303 if is_public_only_token:
304 return False # Already checked public above
306 # Owner can access their own private agents
307 if agent.visibility == "private" and agent.owner_email and agent.owner_email == user_email:
308 return True
310 # Team agents: check team membership
311 # token_teams=None means admin bypass — allow all team agents
312 # ([] already handled by public-only check above)
313 if agent.visibility == "team":
314 if token_teams is None:
315 return True
316 return agent.team_id in token_teams
318 return False
320 async def register_agent(
321 self,
322 db: Session,
323 agent_data: A2AAgentCreate,
324 created_by: Optional[str] = None,
325 created_from_ip: Optional[str] = None,
326 created_via: Optional[str] = None,
327 created_user_agent: Optional[str] = None,
328 import_batch_id: Optional[str] = None,
329 federation_source: Optional[str] = None,
330 team_id: Optional[str] = None,
331 owner_email: Optional[str] = None,
332 visibility: Optional[str] = "public",
333 ) -> A2AAgentRead:
334 """Register a new A2A agent.
336 Args:
337 db (Session): Database session.
338 agent_data (A2AAgentCreate): Data required to create an agent.
339 created_by (Optional[str]): User who created the agent.
340 created_from_ip (Optional[str]): IP address of the creator.
341 created_via (Optional[str]): Method used for creation (e.g., API, import).
342 created_user_agent (Optional[str]): User agent of the creation request.
343 import_batch_id (Optional[str]): UUID of a bulk import batch.
344 federation_source (Optional[str]): Source gateway for federated agents.
345 team_id (Optional[str]): ID of the team to assign the agent to.
346 owner_email (Optional[str]): Email of the agent owner.
347 visibility (Optional[str]): Visibility level ('public', 'team', 'private').
349 Returns:
350 A2AAgentRead: The created agent object.
352 Raises:
353 A2AAgentNameConflictError: If another agent with the same name already exists.
354 IntegrityError: If a database constraint or integrity violation occurs.
355 ValueError: If invalid configuration or data is provided.
356 A2AAgentError: For any other unexpected errors during registration.
358 Examples:
359 # TODO
360 """
361 with create_span(
362 "a2a.register",
363 {
364 "a2a.agent.name": agent_data.name,
365 "a2a.agent.type": agent_data.agent_type,
366 "created_by": created_by,
367 "team_id": team_id,
368 "visibility": visibility,
369 },
370 ) as span:
371 try:
372 agent_data.slug = slugify(agent_data.name)
373 # Check for existing server with the same slug within the same team or public scope
374 if visibility.lower() == "public":
375 logger.info(f"visibility.lower(): {visibility.lower()}")
376 logger.info(f"agent_data.name: {agent_data.name}")
377 logger.info(f"agent_data.slug: {agent_data.slug}")
378 # Check for existing public a2a agent with the same slug
379 existing_agent = get_for_update(db, DbA2AAgent, where=and_(DbA2AAgent.slug == agent_data.slug, DbA2AAgent.visibility == "public"))
380 if existing_agent:
381 raise A2AAgentNameConflictError(name=agent_data.slug, is_active=existing_agent.enabled, agent_id=existing_agent.id, visibility=existing_agent.visibility)
382 elif visibility.lower() == "team" and team_id:
383 # Check for existing team a2a agent with the same slug
384 existing_agent = get_for_update(db, DbA2AAgent, where=and_(DbA2AAgent.slug == agent_data.slug, DbA2AAgent.visibility == "team", DbA2AAgent.team_id == team_id))
385 if existing_agent:
386 raise A2AAgentNameConflictError(name=agent_data.slug, is_active=existing_agent.enabled, agent_id=existing_agent.id, visibility=existing_agent.visibility)
388 auth_type = getattr(agent_data, "auth_type", None)
389 # Support multiple custom headers
390 auth_value = getattr(agent_data, "auth_value", {})
392 # authentication_headers: Optional[Dict[str, str]] = None
394 if hasattr(agent_data, "auth_headers") and agent_data.auth_headers:
395 # Convert list of {key, value} to dict
396 header_dict = {h["key"]: h["value"] for h in agent_data.auth_headers if h.get("key")}
397 # Keep encoded form for persistence, but pass raw headers for initialization
398 auth_value = encode_auth(header_dict) # Encode the dict for consistency
399 # authentication_headers = {str(k): str(v) for k, v in header_dict.items()}
400 # elif isinstance(auth_value, str) and auth_value:
401 # # Decode persisted auth for initialization
402 # decoded = decode_auth(auth_value)
403 # authentication_headers = {str(k): str(v) for k, v in decoded.items()}
404 else:
405 # authentication_headers = None
406 pass
407 # auth_value = {}
409 oauth_config = await protect_oauth_config_for_storage(getattr(agent_data, "oauth_config", None))
411 # Handle query_param auth - encrypt and prepare for storage
412 auth_query_params_encrypted: Optional[Dict[str, str]] = None
413 if auth_type == "query_param":
414 # Standard
415 from urllib.parse import urlparse # pylint: disable=import-outside-toplevel
417 # First-Party
418 from mcpgateway.config import settings # pylint: disable=import-outside-toplevel
420 # Service-layer enforcement: Check feature flag
421 if not settings.insecure_allow_queryparam_auth:
422 raise ValueError("Query parameter authentication is disabled. Set INSECURE_ALLOW_QUERYPARAM_AUTH=true to enable.")
424 # Service-layer enforcement: Check host allowlist
425 if settings.insecure_queryparam_auth_allowed_hosts:
426 parsed = urlparse(str(agent_data.endpoint_url))
427 hostname = (parsed.hostname or "").lower()
428 allowed_hosts = [h.lower() for h in settings.insecure_queryparam_auth_allowed_hosts]
429 if hostname not in allowed_hosts:
430 allowed = ", ".join(settings.insecure_queryparam_auth_allowed_hosts)
431 raise ValueError(f"Host '{hostname}' is not in the allowed hosts for query param auth. " f"Allowed: {allowed}")
433 # Extract and encrypt query param auth
434 param_key = getattr(agent_data, "auth_query_param_key", None)
435 param_value = getattr(agent_data, "auth_query_param_value", None)
436 if param_key and param_value:
437 # Handle SecretStr
438 if hasattr(param_value, "get_secret_value"):
439 raw_value = param_value.get_secret_value()
440 else:
441 raw_value = str(param_value)
442 # Encrypt for storage
443 encrypted_value = encode_auth({param_key: raw_value})
444 auth_query_params_encrypted = {param_key: encrypted_value}
445 # Query param auth doesn't use auth_value
446 auth_value = None
448 # Create new agent
449 new_agent = DbA2AAgent(
450 name=agent_data.name,
451 description=agent_data.description,
452 endpoint_url=agent_data.endpoint_url,
453 agent_type=agent_data.agent_type,
454 protocol_version=agent_data.protocol_version,
455 capabilities=agent_data.capabilities,
456 config=agent_data.config,
457 auth_type=auth_type,
458 auth_value=auth_value, # This should be encrypted in practice
459 auth_query_params=auth_query_params_encrypted, # Encrypted query param auth
460 oauth_config=oauth_config,
461 tags=agent_data.tags,
462 passthrough_headers=getattr(agent_data, "passthrough_headers", None),
463 # Team scoping fields - use schema values if provided, otherwise fallback to parameters
464 team_id=getattr(agent_data, "team_id", None) or team_id,
465 owner_email=getattr(agent_data, "owner_email", None) or owner_email or created_by,
466 # Endpoint visibility parameter takes precedence over schema default
467 visibility=visibility if visibility is not None else getattr(agent_data, "visibility", "public"),
468 created_by=created_by,
469 created_from_ip=created_from_ip,
470 created_via=created_via,
471 created_user_agent=created_user_agent,
472 import_batch_id=import_batch_id,
473 federation_source=federation_source,
474 )
476 db.add(new_agent)
477 # Commit agent FIRST to ensure it persists even if tool creation fails
478 # This is critical because ToolService.register_tool calls db.rollback()
479 # on error, which would undo a pending (flushed but uncommitted) agent
480 db.commit()
481 db.refresh(new_agent)
483 # Invalidate caches since agent count changed
484 # Wrapped in try/except to ensure cache failures don't fail the request
485 # when the agent is already successfully committed
486 try:
487 a2a_stats_cache.invalidate()
488 cache = _get_registry_cache()
489 await cache.invalidate_agents()
490 # Also invalidate tags cache since agent tags may have changed
491 # First-Party
492 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel
494 await admin_stats_cache.invalidate_tags()
495 # First-Party
496 from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel
498 metrics_cache.invalidate("a2a")
499 except Exception as cache_error:
500 logger.warning(f"Cache invalidation failed after agent commit: {cache_error}")
502 # Automatically create a tool for the A2A agent if not already present
503 # Tool creation is wrapped in try/except to ensure agent registration succeeds
504 # even if tool creation fails (e.g., due to visibility or permission issues)
505 tool_db = None
506 try:
507 # First-Party
508 from mcpgateway.services.tool_service import tool_service
510 tool_db = await tool_service.create_tool_from_a2a_agent(
511 db=db,
512 agent=new_agent,
513 created_by=created_by,
514 created_from_ip=created_from_ip,
515 created_via=created_via,
516 created_user_agent=created_user_agent,
517 )
519 # Associate the tool with the agent using the relationship
520 # This sets both the tool_id foreign key and the tool relationship
521 new_agent.tool = tool_db
522 db.commit()
523 db.refresh(new_agent)
524 logger.info(f"Registered new A2A agent: {new_agent.name} (ID: {new_agent.id}) with tool ID: {tool_db.id}")
525 except Exception as tool_error:
526 # Log the error but don't fail agent registration
527 # Agent was already committed above, so it persists even if tool creation fails
528 logger.warning(f"Failed to create tool for A2A agent {new_agent.name}: {tool_error}")
529 structured_logger.warning(
530 f"A2A agent '{new_agent.name}' created without tool association",
531 user_id=created_by,
532 resource_type="a2a_agent",
533 resource_id=str(new_agent.id),
534 custom_fields={"error": str(tool_error), "agent_name": new_agent.name},
535 )
536 # Refresh the agent to ensure it's in a clean state after any rollback
537 db.refresh(new_agent)
538 logger.info(f"Registered new A2A agent: {new_agent.name} (ID: {new_agent.id}) without tool")
540 # Log A2A agent registration for lifecycle tracking
541 structured_logger.info(
542 f"A2A agent '{new_agent.name}' registered successfully",
543 user_id=created_by,
544 user_email=owner_email,
545 team_id=team_id,
546 resource_type="a2a_agent",
547 resource_id=str(new_agent.id),
548 resource_action="create",
549 custom_fields={
550 "agent_name": new_agent.name,
551 "agent_type": new_agent.agent_type,
552 "protocol_version": new_agent.protocol_version,
553 "visibility": visibility,
554 "endpoint_url": new_agent.endpoint_url,
555 },
556 )
558 if span:
559 set_span_attribute(span, "success", True)
560 set_span_attribute(span, "a2a.agent.id", str(new_agent.id))
561 return self.convert_agent_to_read(new_agent, db=db)
563 except A2AAgentNameConflictError as ie:
564 set_span_error(span, ie)
565 db.rollback()
566 raise ie
567 except IntegrityError as ie:
568 set_span_error(span, ie)
569 db.rollback()
570 logger.error(f"IntegrityErrors in group: {ie}")
571 raise ie
572 except ValueError as ve:
573 set_span_error(span, ve)
574 raise ve
575 except Exception as e:
576 set_span_error(span, e)
577 db.rollback()
578 raise A2AAgentError(f"Failed to register A2A agent: {str(e)}")
580 async def list_agents(
581 self,
582 db: Session,
583 cursor: Optional[str] = None,
584 include_inactive: bool = False,
585 tags: Optional[List[str]] = None,
586 limit: Optional[int] = None,
587 page: Optional[int] = None,
588 per_page: Optional[int] = None,
589 user_email: Optional[str] = None,
590 token_teams: Optional[List[str]] = None,
591 team_id: Optional[str] = None,
592 visibility: Optional[str] = None,
593 ) -> Union[tuple[List[A2AAgentRead], Optional[str]], Dict[str, Any]]:
594 """List A2A agents with cursor pagination and optional team filtering.
596 Args:
597 db: Database session.
598 cursor: Pagination cursor for keyset pagination.
599 include_inactive: Whether to include inactive agents.
600 tags: List of tags to filter by.
601 limit: Maximum number of agents to return. None for default, 0 for unlimited.
602 page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor.
603 per_page: Items per page for page-based pagination. Defaults to pagination_default_page_size.
604 user_email: Email of user for owner matching in visibility checks.
605 token_teams: Teams from JWT token. None = admin (no filtering),
606 [] = public-only, [...] = team-scoped access.
607 team_id: Optional team ID to filter by specific team.
608 visibility: Optional visibility filter (private, team, public).
610 Returns:
611 If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}}
612 If cursor is provided or neither: tuple of (list of A2AAgentRead objects, next_cursor).
614 Examples:
615 >>> from mcpgateway.services.a2a_service import A2AAgentService
616 >>> from unittest.mock import MagicMock
617 >>> from mcpgateway.schemas import A2AAgentRead
618 >>> import asyncio
620 >>> service = A2AAgentService()
621 >>> db = MagicMock()
623 >>> # Mock a single agent object returned by the DB
624 >>> agent_obj = MagicMock()
625 >>> db.execute.return_value.scalars.return_value.all.return_value = [agent_obj]
627 >>> # Mock the A2AAgentRead schema to return a masked string
628 >>> mocked_agent_read = MagicMock()
629 >>> mocked_agent_read.masked.return_value = 'agent_read'
630 >>> A2AAgentRead.model_validate = MagicMock(return_value=mocked_agent_read)
632 >>> # Run the service method
633 >>> agents, cursor = asyncio.run(service.list_agents(db))
634 >>> agents == ['agent_read'] and cursor is None
635 True
637 >>> # Test include_inactive parameter (same mock works)
638 >>> agents_with_inactive, cursor = asyncio.run(service.list_agents(db, include_inactive=True))
639 >>> agents_with_inactive == ['agent_read'] and cursor is None
640 True
642 >>> # Test empty result
643 >>> db.execute.return_value.scalars.return_value.all.return_value = []
644 >>> empty_agents, cursor = asyncio.run(service.list_agents(db))
645 >>> empty_agents == [] and cursor is None
646 True
648 """
649 # ══════════════════════════════════════════════════════════════════════
650 # CACHE READ: Skip cache when ANY access filtering is applied
651 # This prevents leaking admin-level results to filtered requests
652 # Cache only when: user_email is None AND token_teams is None AND page is None
653 # ══════════════════════════════════════════════════════════════════════
654 cache = _get_registry_cache()
655 if cursor is None and user_email is None and token_teams is None and page is None:
656 filters_hash = cache.hash_filters(include_inactive=include_inactive, tags=sorted(tags) if tags else None, visibility=visibility)
657 cached = await cache.get("agents", filters_hash)
658 if cached is not None:
659 # Reconstruct A2AAgentRead objects from cached dicts
660 cached_agents = [A2AAgentRead.model_validate(a).masked() for a in cached["agents"]]
661 return (cached_agents, cached.get("next_cursor"))
663 # Build base query with ordering
664 query = select(DbA2AAgent).order_by(desc(DbA2AAgent.created_at), desc(DbA2AAgent.id))
666 # Apply active/inactive filter
667 if not include_inactive:
668 query = query.where(DbA2AAgent.enabled)
670 query = await self._apply_access_control(query, db, user_email, token_teams, team_id)
672 if visibility:
673 query = query.where(DbA2AAgent.visibility == visibility)
675 # Add tag filtering if tags are provided (supports both List[str] and List[Dict] formats)
676 if tags:
677 query = query.where(json_contains_tag_expr(db, DbA2AAgent.tags, tags, match_any=True))
679 # Use unified pagination helper - handles both page and cursor pagination
680 pag_result = await unified_paginate(
681 db=db,
682 query=query,
683 page=page,
684 per_page=per_page,
685 cursor=cursor,
686 limit=limit,
687 base_url="/admin/a2a", # Used for page-based links
688 query_params={"include_inactive": include_inactive} if include_inactive else {},
689 )
691 next_cursor = None
692 # Extract servers based on pagination type
693 if page is not None:
694 # Page-based: pag_result is a dict
695 a2a_agents_db = pag_result["data"]
696 else:
697 # Cursor-based: pag_result is a tuple
698 a2a_agents_db, next_cursor = pag_result
700 # Fetch team names for the agents (common for both pagination types)
701 team_ids_set = {s.team_id for s in a2a_agents_db if s.team_id}
702 team_map = {}
703 if team_ids_set:
704 teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(team_ids_set), EmailTeam.is_active.is_(True))).all()
705 team_map = {team.id: team.name for team in teams}
707 db.commit() # Release transaction to avoid idle-in-transaction
709 # Convert to A2AAgentRead (common for both pagination types)
710 result = []
711 for s in a2a_agents_db:
712 try:
713 s.team = team_map.get(s.team_id) if s.team_id else None
714 result.append(self.convert_agent_to_read(s, include_metrics=False, db=db, team_map=team_map))
715 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
716 logger.exception(f"Failed to convert A2A agent {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}")
717 # Continue with remaining agents instead of failing completely
719 # Return appropriate format based on pagination type
720 if page is not None:
721 # Page-based format
722 return {
723 "data": result,
724 "pagination": pag_result["pagination"],
725 "links": pag_result["links"],
726 }
728 # Cursor-based format
730 # ══════════════════════════════════════════════════════════════════════
731 # CACHE WRITE: Only cache admin-level results (matches read guard)
732 # MUST check token_teams is None to prevent caching scoped responses
733 # ══════════════════════════════════════════════════════════════════════
734 if cursor is None and user_email is None and token_teams is None:
735 try:
736 cache_data = {"agents": [s.model_dump(mode="json") for s in result], "next_cursor": next_cursor}
737 await cache.set("agents", cache_data, filters_hash)
738 except AttributeError:
739 pass # Skip caching if result objects don't support model_dump (e.g., in doctests)
741 return (result, next_cursor)
743 async def list_agents_for_user(
744 self, db: Session, user_info: Dict[str, Any], team_id: Optional[str] = None, visibility: Optional[str] = None, include_inactive: bool = False, skip: int = 0, limit: int = 100
745 ) -> List[A2AAgentRead]:
746 """
747 DEPRECATED: Use list_agents() with user_email parameter instead.
749 This method is maintained for backward compatibility but is no longer used.
750 New code should call list_agents() with user_email, team_id, and visibility parameters.
752 List A2A agents user has access to with team filtering.
754 Args:
755 db: Database session
756 user_info: Object representing identity of the user who is requesting agents
757 team_id: Optional team ID to filter by specific team
758 visibility: Optional visibility filter (private, team, public)
759 include_inactive: Whether to include inactive agents
760 skip: Number of agents to skip for pagination
761 limit: Maximum number of agents to return
763 Returns:
764 List[A2AAgentRead]: A2A agents the user has access to
765 """
767 # Handle case where user_info is a string (email) instead of dict (<0.7.0)
768 if isinstance(user_info, str):
769 user_email = str(user_info)
770 else:
771 user_email = user_info.get("email", "")
773 # Build query following existing patterns from list_prompts()
774 team_service = TeamManagementService(db)
775 user_teams = await team_service.get_user_teams(user_email)
776 team_ids = [team.id for team in user_teams]
778 # Build query following existing patterns from list_agents()
779 query = select(DbA2AAgent)
781 # Apply active/inactive filter
782 if not include_inactive:
783 query = query.where(DbA2AAgent.enabled.is_(True))
785 if team_id:
786 if team_id not in team_ids:
787 return [] # No access to team
789 access_conditions = []
790 # Filter by specific team
791 access_conditions.append(and_(DbA2AAgent.team_id == team_id, DbA2AAgent.visibility.in_(["team", "public"])))
793 access_conditions.append(and_(DbA2AAgent.team_id == team_id, DbA2AAgent.owner_email == user_email))
795 query = query.where(or_(*access_conditions))
796 else:
797 # Get user's accessible teams
798 # Build access conditions following existing patterns
799 access_conditions = []
800 # 1. User's personal resources (owner_email matches)
801 access_conditions.append(DbA2AAgent.owner_email == user_email)
802 # 2. Team A2A Agents where user is member
803 if team_ids:
804 access_conditions.append(and_(DbA2AAgent.team_id.in_(team_ids), DbA2AAgent.visibility.in_(["team", "public"])))
805 # 3. Public resources (if visibility allows)
806 access_conditions.append(DbA2AAgent.visibility == "public")
808 query = query.where(or_(*access_conditions))
810 # Apply visibility filter if specified
811 if visibility:
812 query = query.where(DbA2AAgent.visibility == visibility)
814 # Apply pagination following existing patterns
815 query = query.order_by(desc(DbA2AAgent.created_at))
816 query = query.offset(skip).limit(limit)
818 agents = db.execute(query).scalars().all()
820 # Batch fetch team names to avoid N+1 queries
821 team_ids = list({a.team_id for a in agents if a.team_id})
822 team_map = self._batch_get_team_names(db, team_ids)
824 db.commit() # Release transaction to avoid idle-in-transaction
826 # Skip metrics to avoid N+1 queries in list operations
827 result = []
828 for agent in agents:
829 try:
830 result.append(self.convert_agent_to_read(agent, include_metrics=False, db=db, team_map=team_map))
831 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e:
832 logger.exception(f"Failed to convert A2A agent {getattr(agent, 'id', 'unknown')} ({getattr(agent, 'name', 'unknown')}): {e}")
833 # Continue with remaining agents instead of failing completely
835 return result
837 async def get_agent(
838 self,
839 db: Session,
840 agent_id: str,
841 include_inactive: bool = True,
842 user_email: Optional[str] = None,
843 token_teams: Optional[List[str]] = None,
844 ) -> A2AAgentRead:
845 """Retrieve an A2A agent by ID.
847 Args:
848 db: Database session.
849 agent_id: Agent ID.
850 include_inactive: Whether to include inactive a2a agents.
851 user_email: User's email for owner matching in visibility checks.
852 token_teams: Teams from JWT token. None = admin (no filtering),
853 [] = public-only, [...] = team-scoped access.
855 Returns:
856 Agent data.
858 Raises:
859 A2AAgentNotFoundError: If the agent is not found or user lacks access.
861 Examples:
862 >>> from unittest.mock import MagicMock
863 >>> from datetime import datetime
864 >>> import asyncio
865 >>> from mcpgateway.schemas import A2AAgentRead
866 >>> from mcpgateway.services.a2a_service import A2AAgentService, A2AAgentNotFoundError
868 >>> service = A2AAgentService()
869 >>> db = MagicMock()
871 >>> # Create a mock agent
872 >>> agent_mock = MagicMock()
873 >>> agent_mock.enabled = True
874 >>> agent_mock.id = "agent_id"
875 >>> agent_mock.name = "Test Agent"
876 >>> agent_mock.slug = "test-agent"
877 >>> agent_mock.description = "A2A test agent"
878 >>> agent_mock.endpoint_url = "https://example.com"
879 >>> agent_mock.agent_type = "rest"
880 >>> agent_mock.protocol_version = "v1"
881 >>> agent_mock.capabilities = {}
882 >>> agent_mock.config = {}
883 >>> agent_mock.reachable = True
884 >>> agent_mock.created_at = datetime.now()
885 >>> agent_mock.updated_at = datetime.now()
886 >>> agent_mock.last_interaction = None
887 >>> agent_mock.tags = []
888 >>> agent_mock.metrics = MagicMock()
889 >>> agent_mock.metrics.success_rate = 1.0
890 >>> agent_mock.metrics.failure_rate = 0.0
891 >>> agent_mock.metrics.last_error = None
892 >>> agent_mock.auth_type = None
893 >>> agent_mock.auth_value = None
894 >>> agent_mock.oauth_config = None
895 >>> agent_mock.created_by = "user"
896 >>> agent_mock.created_from_ip = "127.0.0.1"
897 >>> agent_mock.created_via = "ui"
898 >>> agent_mock.created_user_agent = "test-agent"
899 >>> agent_mock.modified_by = "user"
900 >>> agent_mock.modified_from_ip = "127.0.0.1"
901 >>> agent_mock.modified_via = "ui"
902 >>> agent_mock.modified_user_agent = "test-agent"
903 >>> agent_mock.import_batch_id = None
904 >>> agent_mock.federation_source = None
905 >>> agent_mock.team_id = "team-1"
906 >>> agent_mock.team = "Team 1"
907 >>> agent_mock.owner_email = "owner@example.com"
908 >>> agent_mock.visibility = "public"
910 >>> db.get.return_value = agent_mock
912 >>> # Mock convert_agent_to_read to simplify test
913 >>> service.convert_agent_to_read = lambda db_agent, **kwargs: 'agent_read'
915 >>> # Test with active agent
916 >>> result = asyncio.run(service.get_agent(db, 'agent_id'))
917 >>> result
918 'agent_read'
920 >>> # Test with inactive agent but include_inactive=True
921 >>> agent_mock.enabled = False
922 >>> result_inactive = asyncio.run(service.get_agent(db, 'agent_id', include_inactive=True))
923 >>> result_inactive
924 'agent_read'
926 """
927 query = select(DbA2AAgent).where(DbA2AAgent.id == agent_id)
928 agent = db.execute(query).scalar_one_or_none()
930 if not agent:
931 raise A2AAgentNotFoundError(f"A2A Agent not found with ID: {agent_id}")
933 if not agent.enabled and not include_inactive:
934 raise A2AAgentNotFoundError(f"A2A Agent not found with ID: {agent_id}")
936 # SECURITY: Check visibility/team access
937 # Return 404 (not 403) to avoid leaking existence of private agents
938 if not self._check_agent_access(agent, user_email, token_teams):
939 raise A2AAgentNotFoundError(f"A2A Agent not found with ID: {agent_id}")
941 # Delegate conversion and masking to convert_agent_to_read()
942 return self.convert_agent_to_read(agent, db=db)
944 async def get_agent_by_name(self, db: Session, agent_name: str) -> A2AAgentRead:
945 """Retrieve an A2A agent by name.
947 Args:
948 db: Database session.
949 agent_name: Agent name.
951 Returns:
952 Agent data.
954 Raises:
955 A2AAgentNotFoundError: If the agent is not found.
956 """
957 query = select(DbA2AAgent).where(DbA2AAgent.name == agent_name)
958 agent = db.execute(query).scalar_one_or_none()
960 if not agent:
961 raise A2AAgentNotFoundError(f"A2A Agent not found with name: {agent_name}")
963 return self.convert_agent_to_read(agent, db=db)
965 async def update_agent(
966 self,
967 db: Session,
968 agent_id: str,
969 agent_data: A2AAgentUpdate,
970 modified_by: Optional[str] = None,
971 modified_from_ip: Optional[str] = None,
972 modified_via: Optional[str] = None,
973 modified_user_agent: Optional[str] = None,
974 user_email: Optional[str] = None,
975 ) -> A2AAgentRead:
976 """Update an existing A2A agent.
978 Args:
979 db: Database session.
980 agent_id: Agent ID.
981 agent_data: Agent update data.
982 modified_by: Username who modified this agent.
983 modified_from_ip: IP address of modifier.
984 modified_via: Modification method.
985 modified_user_agent: User agent of modification request.
986 user_email: Email of user performing update (for ownership check).
988 Returns:
989 Updated agent data.
991 Raises:
992 A2AAgentNotFoundError: If the agent is not found.
993 PermissionError: If user doesn't own the agent.
994 A2AAgentNameConflictError: If name conflicts with another agent.
995 A2AAgentError: For other errors during update.
996 IntegrityError: If a database integrity error occurs.
997 ValueError: If query_param auth is disabled or host not in allowlist.
998 """
999 try:
1000 # Acquire row lock for update to avoid lost-update on `version` and other fields
1001 agent = get_for_update(db, DbA2AAgent, agent_id)
1003 if not agent:
1004 raise A2AAgentNotFoundError(f"A2A Agent not found with ID: {agent_id}")
1006 # Check ownership if user_email provided
1007 if user_email:
1008 # First-Party
1009 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel
1011 permission_service = PermissionService(db)
1012 if not await permission_service.check_resource_ownership(user_email, agent):
1013 raise PermissionError("Only the owner can update this agent")
1014 # Check for name conflict if name is being updated
1015 if agent_data.name and agent_data.name != agent.name:
1016 new_slug = slugify(agent_data.name)
1017 visibility = agent_data.visibility or agent.visibility
1018 team_id = agent_data.team_id or agent.team_id
1019 # Check for existing server with the same slug within the same team or public scope
1020 if visibility.lower() == "public":
1021 # Check for existing public a2a agent with the same slug
1022 existing_agent = get_for_update(db, DbA2AAgent, where=and_(DbA2AAgent.slug == new_slug, DbA2AAgent.visibility == "public"))
1023 if existing_agent:
1024 raise A2AAgentNameConflictError(name=new_slug, is_active=existing_agent.enabled, agent_id=existing_agent.id, visibility=existing_agent.visibility)
1025 elif visibility.lower() == "team" and team_id:
1026 # Check for existing team a2a agent with the same slug
1027 existing_agent = get_for_update(db, DbA2AAgent, where=and_(DbA2AAgent.slug == new_slug, DbA2AAgent.visibility == "team", DbA2AAgent.team_id == team_id))
1028 if existing_agent:
1029 raise A2AAgentNameConflictError(name=new_slug, is_active=existing_agent.enabled, agent_id=existing_agent.id, visibility=existing_agent.visibility)
1030 # Update the slug when name changes
1031 agent.slug = new_slug
1032 # Update fields
1033 # Avoid `model_dump()` here: tests use `model_construct()` to create intentionally invalid
1034 # payloads, and `model_dump()` emits serializer warnings when encountering unexpected types.
1035 update_data = {field: getattr(agent_data, field) for field in agent_data.model_fields_set}
1037 # Track original auth_type and endpoint_url before updates
1038 original_auth_type = agent.auth_type
1039 original_endpoint_url = agent.endpoint_url
1041 for field, value in update_data.items():
1042 if field == "passthrough_headers":
1043 if value is not None:
1044 if isinstance(value, list):
1045 # Clean list: remove empty or whitespace-only entries
1046 cleaned = [h.strip() for h in value if isinstance(h, str) and h.strip()]
1047 agent.passthrough_headers = cleaned or None
1048 elif isinstance(value, str):
1049 # Parse comma-separated string and clean
1050 parsed: List[str] = [h.strip() for h in value.split(",") if h.strip()]
1051 agent.passthrough_headers = parsed or None
1052 else:
1053 raise A2AAgentError("Invalid passthrough_headers format: must be list[str] or comma-separated string")
1054 else:
1055 # Explicitly set to None if value is None
1056 agent.passthrough_headers = None
1057 continue
1059 # Skip query_param fields - handled separately below
1060 if field in ("auth_query_param_key", "auth_query_param_value"):
1061 continue
1063 # auth_headers is on the schema but not the DB model; translate
1064 # it into auth_value, preserving masked placeholders from the
1065 # existing encrypted value so an unchanged edit does not
1066 # overwrite real credentials with the mask string.
1067 if field == "auth_headers" and value and isinstance(value, list):
1068 # First-Party
1069 from mcpgateway.config import settings as _settings # pylint: disable=import-outside-toplevel
1071 existing_auth_raw = getattr(agent, "auth_value", None)
1072 existing_auth: Dict[str, str] = {}
1073 if isinstance(existing_auth_raw, str):
1074 try:
1075 existing_auth = decode_auth(existing_auth_raw)
1076 except Exception:
1077 existing_auth = {}
1078 elif isinstance(existing_auth_raw, dict):
1079 existing_auth = existing_auth_raw
1081 header_dict: Dict[str, str] = {}
1082 for header in value:
1083 key = header.get("key")
1084 if not key:
1085 continue
1086 hval = header.get("value", "")
1087 if hval == _settings.masked_auth_value and key in existing_auth:
1088 header_dict[key] = existing_auth[key]
1089 else:
1090 header_dict[key] = hval
1092 if header_dict:
1093 agent.auth_value = encode_auth(header_dict)
1094 continue
1096 if field == "oauth_config":
1097 value = await protect_oauth_config_for_storage(value, existing_oauth_config=agent.oauth_config)
1099 # Validate team reassignment before persisting
1100 if field == "team_id" and value is not None and value != agent.team_id:
1101 _validate_a2a_team_assignment(db, user_email, value)
1103 # Validate visibility transition to "team"
1104 if field == "visibility" and value == "team":
1105 target_team_id = update_data.get("team_id", agent.team_id) if "team_id" in update_data else agent.team_id
1106 _validate_a2a_team_assignment(db, user_email, target_team_id)
1108 if hasattr(agent, field):
1109 setattr(agent, field, value)
1111 # Handle query_param auth updates
1112 # Clear auth_query_params when switching away from query_param auth
1113 if original_auth_type == "query_param" and agent_data.auth_type is not None and agent_data.auth_type != "query_param":
1114 agent.auth_query_params = None
1115 logger.debug(f"Cleared auth_query_params for agent {agent.id} (switched from query_param to {agent_data.auth_type})")
1117 # Handle switching to query_param auth or updating existing query_param credentials
1118 is_switching_to_queryparam = agent_data.auth_type == "query_param" and original_auth_type != "query_param"
1119 is_updating_queryparam_creds = original_auth_type == "query_param" and (agent_data.auth_query_param_key is not None or agent_data.auth_query_param_value is not None)
1120 is_url_changing = agent_data.endpoint_url is not None and str(agent_data.endpoint_url) != original_endpoint_url
1122 if is_switching_to_queryparam or is_updating_queryparam_creds or (is_url_changing and original_auth_type == "query_param"):
1123 # Standard
1124 from urllib.parse import urlparse # pylint: disable=import-outside-toplevel
1126 # First-Party
1127 from mcpgateway.config import settings # pylint: disable=import-outside-toplevel
1129 # Service-layer enforcement: Check feature flag
1130 if not settings.insecure_allow_queryparam_auth:
1131 # Grandfather clause: Allow updates to existing query_param agents
1132 # unless they're trying to change credentials
1133 if is_switching_to_queryparam or is_updating_queryparam_creds:
1134 raise ValueError("Query parameter authentication is disabled. Set INSECURE_ALLOW_QUERYPARAM_AUTH=true to enable.")
1136 # Service-layer enforcement: Check host allowlist
1137 if settings.insecure_queryparam_auth_allowed_hosts:
1138 check_url = str(agent_data.endpoint_url) if agent_data.endpoint_url else agent.endpoint_url
1139 parsed = urlparse(check_url)
1140 hostname = (parsed.hostname or "").lower()
1141 allowed_hosts = [h.lower() for h in settings.insecure_queryparam_auth_allowed_hosts]
1142 if hostname not in allowed_hosts:
1143 allowed = ", ".join(settings.insecure_queryparam_auth_allowed_hosts)
1144 raise ValueError(f"Host '{hostname}' is not in the allowed hosts for query param auth. " f"Allowed: {allowed}")
1146 if is_switching_to_queryparam or is_updating_queryparam_creds:
1147 # Get query param key and value
1148 param_key = getattr(agent_data, "auth_query_param_key", None)
1149 param_value = getattr(agent_data, "auth_query_param_value", None)
1151 # If no key provided but value is, reuse existing key (value-only rotation)
1152 existing_key = next(iter(agent.auth_query_params.keys()), None) if agent.auth_query_params else None
1153 if not param_key and param_value and existing_key:
1154 param_key = existing_key
1156 if param_key:
1157 # Check if value is masked (user didn't change it) or new value provided
1158 is_masked_placeholder = False
1159 if param_value and hasattr(param_value, "get_secret_value"):
1160 raw_value = param_value.get_secret_value()
1161 # First-Party
1162 from mcpgateway.config import settings # pylint: disable=import-outside-toplevel
1164 is_masked_placeholder = raw_value == settings.masked_auth_value
1165 elif param_value:
1166 raw_value = str(param_value)
1167 else:
1168 raw_value = None
1170 if raw_value and not is_masked_placeholder:
1171 # New value provided - encrypt for storage
1172 encrypted_value = encode_auth({param_key: raw_value})
1173 agent.auth_query_params = {param_key: encrypted_value}
1174 elif agent.auth_query_params and is_masked_placeholder:
1175 # Use existing encrypted value (user didn't change the password)
1176 # But key may have changed, so preserve with new key if different
1177 if existing_key and existing_key != param_key:
1178 # Key changed but value is masked - decrypt and re-encrypt with new key
1179 existing_encrypted = agent.auth_query_params.get(existing_key, "")
1180 if existing_encrypted:
1181 decrypted = decode_auth(existing_encrypted)
1182 existing_value = decrypted.get(existing_key, "")
1183 if existing_value:
1184 encrypted_value = encode_auth({param_key: existing_value})
1185 agent.auth_query_params = {param_key: encrypted_value}
1187 # Update auth_type if switching
1188 if is_switching_to_queryparam:
1189 agent.auth_type = "query_param"
1190 agent.auth_value = None # Query param auth doesn't use auth_value
1192 # Update metadata
1193 if modified_by:
1194 agent.modified_by = modified_by
1195 if modified_from_ip:
1196 agent.modified_from_ip = modified_from_ip
1197 if modified_via:
1198 agent.modified_via = modified_via
1199 if modified_user_agent:
1200 agent.modified_user_agent = modified_user_agent
1202 agent.version += 1
1204 db.commit()
1205 db.refresh(agent)
1207 # Invalidate cache after successful update
1208 cache = _get_registry_cache()
1209 await cache.invalidate_agents()
1210 # Also invalidate tags cache since agent tags may have changed
1211 # First-Party
1212 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel
1214 await admin_stats_cache.invalidate_tags()
1216 # Update the associated tool if it exists
1217 # Wrap in try/except to handle tool sync failures gracefully - the agent
1218 # update is the primary operation and should succeed even if tool sync fails
1219 try:
1220 # First-Party
1221 from mcpgateway.services.tool_service import tool_service
1223 await tool_service.update_tool_from_a2a_agent(
1224 db=db,
1225 agent=agent,
1226 modified_by=modified_by,
1227 modified_from_ip=modified_from_ip,
1228 modified_via=modified_via,
1229 modified_user_agent=modified_user_agent,
1230 )
1231 except Exception as tool_err:
1232 logger.warning(f"Failed to sync tool for A2A agent {agent.id}: {tool_err}. Agent update succeeded but tool may be out of sync.")
1234 logger.info(f"Updated A2A agent: {agent.name} (ID: {agent.id})")
1235 return self.convert_agent_to_read(agent, db=db)
1236 except PermissionError:
1237 db.rollback()
1238 raise
1239 except A2AAgentNameConflictError as ie:
1240 db.rollback()
1241 raise ie
1242 except A2AAgentNotFoundError as nf:
1243 db.rollback()
1244 raise nf
1245 except IntegrityError as ie:
1246 db.rollback()
1247 logger.error(f"IntegrityErrors in group: {ie}")
1248 raise ie
1249 except Exception as e:
1250 db.rollback()
1251 raise A2AAgentError(f"Failed to update A2A agent: {str(e)}")
1253 async def set_agent_state(self, db: Session, agent_id: str, activate: bool, reachable: Optional[bool] = None, user_email: Optional[str] = None) -> A2AAgentRead:
1254 """Set the activation status of an A2A agent.
1256 Args:
1257 db: Database session.
1258 agent_id: Agent ID.
1259 activate: True to activate, False to deactivate.
1260 reachable: Optional reachability status.
1261 user_email: Optional[str] The email of the user to check if the user has permission to modify.
1263 Returns:
1264 Updated agent data.
1266 Raises:
1267 A2AAgentNotFoundError: If the agent is not found.
1268 PermissionError: If user doesn't own the agent.
1269 """
1270 with create_span(
1271 "a2a.state_change",
1272 {
1273 "a2a.agent.id": agent_id,
1274 "a2a.agent.activate": activate,
1275 "user.email": user_email,
1276 },
1277 ) as span:
1278 try:
1279 query = select(DbA2AAgent).where(DbA2AAgent.id == agent_id)
1280 agent = db.execute(query).scalar_one_or_none()
1282 if not agent:
1283 raise A2AAgentNotFoundError(f"A2A Agent not found with ID: {agent_id}")
1285 if user_email:
1286 # First-Party
1287 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel
1289 permission_service = PermissionService(db)
1290 if not await permission_service.check_resource_ownership(user_email, agent):
1291 raise PermissionError("Only the owner can activate the Agent" if activate else "Only the owner can deactivate the Agent")
1293 agent.enabled = activate
1294 if reachable is not None:
1295 agent.reachable = reachable
1297 db.commit()
1298 db.refresh(agent)
1300 # Invalidate caches since agent status changed
1301 a2a_stats_cache.invalidate()
1302 cache = _get_registry_cache()
1303 await cache.invalidate_agents()
1305 # Cascade: update associated tool's enabled status to match agent.
1306 # This mirrors gateway_service.set_gateway_state() which lets cascade
1307 # failures propagate so the caller knows the operation was incomplete.
1308 if agent.tool_id:
1309 now = datetime.now(timezone.utc)
1310 tool_result = db.execute(update(DbTool).where(DbTool.id == agent.tool_id).where(DbTool.enabled != activate).values(enabled=activate, updated_at=now))
1311 if tool_result.rowcount > 0:
1312 db.commit()
1313 await cache.invalidate_tools()
1314 tool_lookup_cache = _get_tool_lookup_cache()
1315 if agent.tool and agent.tool.name:
1316 await tool_lookup_cache.invalidate(agent.tool.name, gateway_id=str(agent.tool.gateway_id) if agent.tool.gateway_id else None)
1318 status = "activated" if activate else "deactivated"
1319 logger.info(f"A2A agent {status}: {agent.name} (ID: {agent.id})")
1321 structured_logger.log(
1322 level="INFO",
1323 message=f"A2A agent {status}",
1324 event_type="a2a_agent_status_changed",
1325 component="a2a_service",
1326 user_email=user_email,
1327 resource_type="a2a_agent",
1328 resource_id=str(agent.id),
1329 custom_fields={
1330 "agent_name": agent.name,
1331 "enabled": agent.enabled,
1332 "reachable": agent.reachable,
1333 },
1334 )
1336 result = self.convert_agent_to_read(agent, db=db)
1337 if span:
1338 set_span_attribute(span, "success", True)
1339 return result
1340 except Exception as exc:
1341 set_span_error(span, exc)
1342 raise
1344 async def delete_agent(self, db: Session, agent_id: str, user_email: Optional[str] = None, purge_metrics: bool = False) -> None:
1345 """Delete an A2A agent.
1347 Args:
1348 db: Database session.
1349 agent_id: Agent ID.
1350 user_email: Email of user performing delete (for ownership check).
1351 purge_metrics: If True, delete raw + rollup metrics for this agent.
1353 Raises:
1354 A2AAgentNotFoundError: If the agent is not found.
1355 PermissionError: If user doesn't own the agent.
1356 """
1357 with create_span(
1358 "a2a.delete",
1359 {
1360 "a2a.agent.id": agent_id,
1361 "user.email": user_email,
1362 "purge_metrics": purge_metrics,
1363 },
1364 ) as span:
1365 try:
1366 query = select(DbA2AAgent).where(DbA2AAgent.id == agent_id)
1367 agent = db.execute(query).scalar_one_or_none()
1369 if not agent:
1370 raise A2AAgentNotFoundError(f"A2A Agent not found with ID: {agent_id}")
1372 # Check ownership if user_email provided
1373 if user_email:
1374 # First-Party
1375 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel
1377 permission_service = PermissionService(db)
1378 if not await permission_service.check_resource_ownership(user_email, agent):
1379 raise PermissionError("Only the owner can delete this agent")
1381 agent_name = agent.name
1383 # Delete the associated tool before deleting the agent
1384 # First-Party
1385 from mcpgateway.services.tool_service import tool_service
1387 await tool_service.delete_tool_from_a2a_agent(db=db, agent=agent, user_email=user_email, purge_metrics=purge_metrics)
1389 if purge_metrics:
1390 with pause_rollup_during_purge(reason=f"purge_a2a_agent:{agent_id}"):
1391 delete_metrics_in_batches(db, A2AAgentMetric, A2AAgentMetric.a2a_agent_id, agent_id)
1392 delete_metrics_in_batches(db, A2AAgentMetricsHourly, A2AAgentMetricsHourly.a2a_agent_id, agent_id)
1393 db.delete(agent)
1394 db.commit()
1396 # Invalidate caches since agent count changed
1397 a2a_stats_cache.invalidate()
1398 cache = _get_registry_cache()
1399 await cache.invalidate_agents()
1400 # Also invalidate tags cache since agent tags may have changed
1401 # First-Party
1402 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel
1404 await admin_stats_cache.invalidate_tags()
1406 logger.info(f"Deleted A2A agent: {agent_name} (ID: {agent_id})")
1408 structured_logger.log(
1409 level="INFO",
1410 message="A2A agent deleted",
1411 event_type="a2a_agent_deleted",
1412 component="a2a_service",
1413 user_email=user_email,
1414 resource_type="a2a_agent",
1415 resource_id=str(agent_id),
1416 custom_fields={
1417 "agent_name": agent_name,
1418 "purge_metrics": purge_metrics,
1419 },
1420 )
1421 if span:
1422 set_span_attribute(span, "success", True)
1423 except PermissionError:
1424 if span:
1425 set_span_attribute(span, "error", True)
1426 db.rollback()
1427 raise
1429 async def invoke_agent(
1430 self,
1431 db: Session,
1432 agent_name: str,
1433 parameters: Dict[str, Any],
1434 interaction_type: str = "query",
1435 *,
1436 user_id: Optional[str] = None,
1437 user_email: Optional[str] = None,
1438 token_teams: Optional[List[str]] = None,
1439 ) -> Dict[str, Any]:
1440 """Invoke an A2A agent.
1442 Args:
1443 db: Database session.
1444 agent_name: Name of the agent to invoke.
1445 parameters: Parameters for the interaction.
1446 interaction_type: Type of interaction.
1447 user_id: Identifier of the user initiating the call.
1448 user_email: Email of the user initiating the call.
1449 token_teams: Teams from JWT token. None = admin (no filtering),
1450 [] = public-only, [...] = team-scoped access.
1452 Returns:
1453 Agent response.
1455 Raises:
1456 A2AAgentNotFoundError: If the agent is not found or user lacks access.
1457 A2AAgentError: If the agent is disabled or invocation fails.
1458 """
1459 # ═══════════════════════════════════════════════════════════════════════════
1460 # PHASE 1: Acquire a short row lock to read `enabled` + `auth_value`,
1461 # then release the lock before performing the external HTTP call.
1462 # This avoids TOCTOU for the critical checks while not holding DB
1463 # connections during the potentially slow HTTP request.
1464 # ═══════════════════════════════════════════════════════════════════════════
1466 # Lookup the agent id, then lock the row by id using get_for_update
1467 agent_row = db.execute(select(DbA2AAgent.id).where(DbA2AAgent.name == agent_name)).scalar_one_or_none()
1468 if not agent_row:
1469 raise A2AAgentNotFoundError(f"A2A Agent not found with name: {agent_name}")
1471 agent = get_for_update(db, DbA2AAgent, agent_row)
1472 if not agent:
1473 raise A2AAgentNotFoundError(f"A2A Agent not found with name: {agent_name}")
1475 # ═══════════════════════════════════════════════════════════════════════════
1476 # SECURITY: Check visibility/team access WHILE ROW IS LOCKED
1477 # Return 404 (not 403) to avoid leaking existence of private agents
1478 # ═══════════════════════════════════════════════════════════════════════════
1479 if not self._check_agent_access(agent, user_email, token_teams):
1480 raise A2AAgentNotFoundError(f"A2A Agent not found with name: {agent_name}")
1482 if not agent.enabled:
1483 raise A2AAgentError(f"A2A Agent '{agent_name}' is disabled")
1485 # Extract all needed data to local variables before releasing DB connection
1486 agent_id = agent.id
1487 agent_endpoint_url = agent.endpoint_url
1488 agent_type = agent.agent_type
1489 agent_protocol_version = agent.protocol_version
1490 agent_auth_type = agent.auth_type
1491 agent_auth_value = agent.auth_value
1492 agent_auth_query_params = agent.auth_query_params
1494 # Handle query_param auth - decrypt and apply to URL
1495 auth_query_params_decrypted: Optional[Dict[str, str]] = None
1496 if agent_auth_type == "query_param" and agent_auth_query_params:
1497 # First-Party
1498 from mcpgateway.utils.url_auth import apply_query_param_auth # pylint: disable=import-outside-toplevel
1500 auth_query_params_decrypted = {}
1501 for param_key, encrypted_value in agent_auth_query_params.items():
1502 if encrypted_value:
1503 try:
1504 decrypted = decode_auth(encrypted_value)
1505 auth_query_params_decrypted[param_key] = decrypted.get(param_key, "")
1506 except Exception:
1507 logger.debug(f"Failed to decrypt query param '{param_key}' for A2A agent invocation")
1508 if auth_query_params_decrypted:
1509 agent_endpoint_url = apply_query_param_auth(agent_endpoint_url, auth_query_params_decrypted)
1511 # Decode auth_value for supported auth types (before closing session)
1512 auth_headers = {}
1513 if agent_auth_type in ("basic", "bearer", "authheaders") and agent_auth_value:
1514 # Decrypt auth_value and extract headers (follows gateway_service pattern)
1515 if isinstance(agent_auth_value, str):
1516 try:
1517 auth_headers = decode_auth(agent_auth_value)
1518 except Exception as e:
1519 raise A2AAgentError(f"Failed to decrypt authentication for agent '{agent_name}': {e}")
1520 elif isinstance(agent_auth_value, dict):
1521 auth_headers = {str(k): str(v) for k, v in agent_auth_value.items()}
1523 # ═══════════════════════════════════════════════════════════════════════════
1524 # CRITICAL: Release DB connection back to pool BEFORE making HTTP calls
1525 # This prevents connection pool exhaustion during slow upstream requests.
1526 # ═══════════════════════════════════════════════════════════════════════════
1527 db.commit() # End read-only transaction cleanly (commit not rollback to avoid inflating rollback stats)
1528 db.close()
1530 start_time = datetime.now(timezone.utc)
1531 success = False
1532 error_message = None
1533 response = None
1535 # ═══════════════════════════════════════════════════════════════════════════
1536 # PHASE 2: Make HTTP call (no DB connection held)
1537 # ═══════════════════════════════════════════════════════════════════════════
1539 # Create sanitized URL for logging (redacts auth query params)
1540 # First-Party
1541 from mcpgateway.utils.url_auth import sanitize_exception_message, sanitize_url_for_logging # pylint: disable=import-outside-toplevel
1543 sanitized_endpoint_url = sanitize_url_for_logging(agent_endpoint_url, auth_query_params_decrypted)
1544 span_attributes = {
1545 "a2a.agent.name": agent_name,
1546 "a2a.agent.id": str(agent_id),
1547 "a2a.agent.url": sanitized_endpoint_url,
1548 "a2a.agent.type": agent_type,
1549 "a2a.interaction_type": interaction_type,
1550 }
1551 if is_input_capture_enabled("a2a.invoke"):
1552 span_attributes["langfuse.observation.input"] = serialize_trace_payload(parameters or {})
1554 with create_span("a2a.invoke", span_attributes) as span:
1555 try:
1556 # Prepare the request to the A2A agent
1557 # Format request based on agent type and endpoint
1558 if agent_type in ["generic", "jsonrpc"] or agent_endpoint_url.endswith("/"):
1559 # Use JSONRPC format for agents that expect it
1560 request_data = {"jsonrpc": "2.0", "method": parameters.get("method", "message/send"), "params": parameters.get("params", parameters), "id": 1}
1561 else:
1562 # Use custom A2A format
1563 request_data = {"interaction_type": interaction_type, "parameters": parameters, "protocol_version": agent_protocol_version}
1565 # Make HTTP request to the agent endpoint using shared HTTP client
1566 # First-Party
1567 from mcpgateway.services.http_client_service import get_http_client # pylint: disable=import-outside-toplevel
1569 client = await get_http_client()
1570 headers = {"Content-Type": "application/json"}
1572 # Add authentication if configured (using decoded auth headers)
1573 headers.update(auth_headers)
1575 # Add correlation ID to outbound headers for distributed tracing
1576 correlation_id = get_correlation_id()
1577 if correlation_id:
1578 headers["X-Correlation-ID"] = correlation_id
1580 # Log A2A external call start (with sanitized URL to prevent credential leakage)
1581 call_start_time = datetime.now(timezone.utc)
1582 structured_logger.log(
1583 level="INFO",
1584 message=f"A2A external call started: {agent_name}",
1585 component="a2a_service",
1586 user_id=user_id,
1587 user_email=user_email,
1588 correlation_id=correlation_id,
1589 metadata={
1590 "event": "a2a_call_started",
1591 "agent_name": agent_name,
1592 "agent_id": agent_id,
1593 "endpoint_url": sanitized_endpoint_url,
1594 "interaction_type": interaction_type,
1595 "protocol_version": agent_protocol_version,
1596 },
1597 )
1599 http_response = await client.post(agent_endpoint_url, json=request_data, headers=headers)
1600 call_duration_ms = (datetime.now(timezone.utc) - call_start_time).total_seconds() * 1000
1602 if http_response.status_code == 200:
1603 response = http_response.json()
1604 success = True
1605 if span and is_output_capture_enabled("a2a.invoke"):
1606 set_span_attribute(span, "langfuse.observation.output", serialize_trace_payload(response))
1608 # Log successful A2A call
1609 structured_logger.log(
1610 level="INFO",
1611 message=f"A2A external call completed: {agent_name}",
1612 component="a2a_service",
1613 user_id=user_id,
1614 user_email=user_email,
1615 correlation_id=correlation_id,
1616 duration_ms=call_duration_ms,
1617 metadata={"event": "a2a_call_completed", "agent_name": agent_name, "agent_id": agent_id, "status_code": http_response.status_code, "success": True},
1618 )
1619 else:
1620 # Sanitize error message to prevent URL secrets from leaking in logs
1621 raw_error = f"HTTP {http_response.status_code}: {http_response.text}"
1622 error_message = sanitize_exception_message(raw_error, auth_query_params_decrypted)
1624 # Log failed A2A call
1625 structured_logger.log(
1626 level="ERROR",
1627 message=f"A2A external call failed: {agent_name}",
1628 component="a2a_service",
1629 user_id=user_id,
1630 user_email=user_email,
1631 correlation_id=correlation_id,
1632 duration_ms=call_duration_ms,
1633 error_details={"error_type": "A2AHTTPError", "error_message": error_message},
1634 metadata={"event": "a2a_call_failed", "agent_name": agent_name, "agent_id": agent_id, "status_code": http_response.status_code},
1635 )
1637 raise A2AAgentError(error_message)
1639 except A2AAgentError:
1640 # Re-raise A2AAgentError without wrapping
1641 if span and error_message:
1642 set_span_error(span, error_message)
1643 raise
1644 except Exception as e:
1645 # Sanitize error message to prevent URL secrets from leaking in logs
1646 error_message = sanitize_exception_message(str(e), auth_query_params_decrypted)
1647 logger.error(f"Failed to invoke A2A agent '{agent_name}': {error_message}")
1648 if span:
1649 set_span_error(span, error_message)
1650 raise A2AAgentError(f"Failed to invoke A2A agent: {error_message}")
1652 finally:
1653 # ═══════════════════════════════════════════════════════════════════════════
1654 # PHASE 3: Record metrics via buffered service (batches writes for performance)
1655 # ═══════════════════════════════════════════════════════════════════════════
1656 end_time = datetime.now(timezone.utc)
1657 response_time = (end_time - start_time).total_seconds()
1659 try:
1660 # First-Party
1661 from mcpgateway.services.metrics_buffer_service import get_metrics_buffer_service # pylint: disable=import-outside-toplevel
1663 metrics_buffer = get_metrics_buffer_service()
1664 metrics_buffer.record_a2a_agent_metric_with_duration(
1665 a2a_agent_id=agent_id,
1666 response_time=response_time,
1667 success=success,
1668 interaction_type=interaction_type,
1669 error_message=error_message,
1670 )
1671 except Exception as metrics_error:
1672 logger.warning(f"Failed to record A2A metrics for '{agent_name}': {metrics_error}")
1674 # Update last interaction timestamp (quick separate write)
1675 try:
1676 with fresh_db_session() as ts_db:
1677 # Reacquire short lock and re-check enabled before writing
1678 db_agent = get_for_update(ts_db, DbA2AAgent, agent_id)
1679 if db_agent and getattr(db_agent, "enabled", False):
1680 db_agent.last_interaction = end_time
1681 ts_db.commit()
1682 except Exception as ts_error:
1683 logger.warning(f"Failed to update last_interaction for '{agent_name}': {ts_error}")
1684 if span:
1685 set_span_attribute(span, "success", success)
1686 set_span_attribute(span, "duration.ms", response_time * 1000)
1688 return response or {"error": error_message}
1690 async def aggregate_metrics(self, db: Session) -> A2AAgentAggregateMetrics:
1691 """Aggregate metrics for all A2A agents.
1693 Combines recent raw metrics (within retention period) with historical
1694 hourly rollups for complete historical coverage. Uses in-memory caching
1695 (10s TTL) to reduce database load under high request rates.
1697 Args:
1698 db: Database session.
1700 Returns:
1701 A2AAgentAggregateMetrics: Aggregated metrics from raw + hourly rollup tables.
1702 """
1703 # Check cache first (if enabled)
1704 # First-Party
1705 from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache # pylint: disable=import-outside-toplevel
1707 if is_cache_enabled():
1708 cached = metrics_cache.get("a2a")
1709 if cached is not None and isinstance(cached, dict):
1710 return A2AAgentAggregateMetrics(**cached)
1712 # Get total/active agent counts from cache (avoids 2 COUNT queries per call)
1713 counts = a2a_stats_cache.get_counts(db)
1714 total_agents = counts["total"]
1715 active_agents = counts["active"]
1717 # Use combined raw + rollup query for full historical coverage
1718 # First-Party
1719 from mcpgateway.services.metrics_query_service import aggregate_metrics_combined # pylint: disable=import-outside-toplevel
1721 result = aggregate_metrics_combined(db, "a2a_agent")
1723 total_interactions = result.total_executions
1724 successful_interactions = result.successful_executions
1725 failed_interactions = result.failed_executions
1727 metrics = A2AAgentAggregateMetrics(
1728 total_agents=total_agents,
1729 active_agents=active_agents,
1730 total_interactions=total_interactions,
1731 successful_interactions=successful_interactions,
1732 failed_interactions=failed_interactions,
1733 success_rate=(successful_interactions / total_interactions * 100) if total_interactions > 0 else 0.0,
1734 avg_response_time=float(result.avg_response_time or 0.0),
1735 min_response_time=float(result.min_response_time or 0.0),
1736 max_response_time=float(result.max_response_time or 0.0),
1737 )
1739 # Cache the result as dict for serialization compatibility (if enabled)
1740 if is_cache_enabled():
1741 metrics_cache.set("a2a", metrics.model_dump())
1743 return metrics
1745 async def reset_metrics(self, db: Session, agent_id: Optional[str] = None) -> None:
1746 """Reset metrics for agents (raw + hourly rollups).
1748 Args:
1749 db: Database session.
1750 agent_id: Optional agent ID to reset metrics for specific agent.
1751 """
1752 if agent_id:
1753 db.execute(delete(A2AAgentMetric).where(A2AAgentMetric.a2a_agent_id == agent_id))
1754 db.execute(delete(A2AAgentMetricsHourly).where(A2AAgentMetricsHourly.a2a_agent_id == agent_id))
1755 else:
1756 db.execute(delete(A2AAgentMetric))
1757 db.execute(delete(A2AAgentMetricsHourly))
1758 db.commit()
1760 # Invalidate metrics cache
1761 # First-Party
1762 from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel
1764 metrics_cache.invalidate("a2a")
1766 logger.info("Reset A2A agent metrics" + (f" for agent {agent_id}" if agent_id else ""))
1768 def _prepare_a2a_agent_for_read(self, agent: DbA2AAgent) -> DbA2AAgent:
1769 """Prepare a a2a agent object for A2AAgentRead validation.
1771 Ensures auth_value is in the correct format (encoded string) for the schema.
1773 Args:
1774 agent: A2A Agent database object
1776 Returns:
1777 A2A Agent object with properly formatted auth_value
1778 """
1779 # If auth_value is a dict, encode it to string for GatewayRead schema
1780 if isinstance(agent.auth_value, dict):
1781 agent.auth_value = encode_auth(agent.auth_value)
1782 return agent
1784 def convert_agent_to_read(self, db_agent: DbA2AAgent, include_metrics: bool = False, db: Optional[Session] = None, team_map: Optional[Dict[str, str]] = None) -> A2AAgentRead:
1785 """Convert database model to schema.
1787 Args:
1788 db_agent (DbA2AAgent): Database agent model.
1789 include_metrics (bool): Whether to include metrics in the result. Defaults to False.
1790 Set to False for list operations to avoid N+1 query issues.
1791 db (Optional[Session]): Database session. Only required if team name is not pre-populated
1792 on the db_agent object and team_map is not provided.
1793 team_map (Optional[Dict[str, str]]): Pre-fetched team_id -> team_name mapping.
1794 If provided, avoids N+1 queries for team name lookups in list operations.
1796 Returns:
1797 A2AAgentRead: Agent read schema.
1799 Raises:
1800 A2AAgentNotFoundError: If the provided agent is not found or invalid.
1802 """
1804 if not db_agent:
1805 raise A2AAgentNotFoundError("Agent not found")
1807 # Check if team attribute already exists (pre-populated in batch operations)
1808 # Otherwise use pre-fetched team map if available, otherwise query individually
1809 if not hasattr(db_agent, "team") or db_agent.team is None:
1810 team_id = getattr(db_agent, "team_id", None)
1811 if team_map is not None and team_id:
1812 team_name = team_map.get(team_id)
1813 elif db is not None:
1814 team_name = self._get_team_name(db, team_id)
1815 else:
1816 team_name = None
1817 setattr(db_agent, "team", team_name)
1819 # Compute metrics only if requested (avoids N+1 queries in list operations)
1820 if include_metrics:
1821 total_executions = len(db_agent.metrics)
1822 successful_executions = sum(1 for m in db_agent.metrics if m.is_success)
1823 failed_executions = total_executions - successful_executions
1824 failure_rate = (failed_executions / total_executions * 100) if total_executions > 0 else 0.0
1826 min_response_time = max_response_time = avg_response_time = last_execution_time = None
1827 if db_agent.metrics:
1828 response_times = [m.response_time for m in db_agent.metrics if m.response_time is not None]
1829 if response_times:
1830 min_response_time = min(response_times)
1831 max_response_time = max(response_times)
1832 avg_response_time = sum(response_times) / len(response_times)
1833 last_execution_time = max((m.timestamp for m in db_agent.metrics), default=None)
1835 metrics = A2AAgentMetrics(
1836 total_executions=total_executions,
1837 successful_executions=successful_executions,
1838 failed_executions=failed_executions,
1839 failure_rate=failure_rate,
1840 min_response_time=min_response_time,
1841 max_response_time=max_response_time,
1842 avg_response_time=avg_response_time,
1843 last_execution_time=last_execution_time,
1844 )
1845 else:
1846 metrics = None
1848 # Build dict from ORM model
1849 agent_data = {k: getattr(db_agent, k, None) for k in A2AAgentRead.model_fields.keys()}
1850 agent_data["metrics"] = metrics
1851 agent_data["team"] = getattr(db_agent, "team", None)
1852 # Include auth_query_params for the _mask_query_param_auth validator
1853 agent_data["auth_query_params"] = getattr(db_agent, "auth_query_params", None)
1855 # Validate using Pydantic model
1856 validated_agent = A2AAgentRead.model_validate(agent_data)
1858 # Return masked version (like GatewayRead)
1859 return validated_agent.masked()