Coverage for mcpgateway / services / a2a_service.py: 98%

699 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2# pylint: disable=invalid-name, import-outside-toplevel, unused-import, no-name-in-module 

3"""Location: ./mcpgateway/services/a2a_service.py 

4Copyright 2025 

5SPDX-License-Identifier: Apache-2.0 

6Authors: Mihai Criveti 

7 

8A2A Agent Service 

9 

10This module implements A2A (Agent-to-Agent) agent management for ContextForge. 

11It handles agent registration, listing, retrieval, updates, activation toggling, deletion, 

12and interactions with A2A-compatible agents. 

13""" 

14 

15# Standard 

16import binascii 

17from datetime import datetime, timezone 

18from typing import Any, AsyncGenerator, Dict, List, Optional, Union 

19 

20# Third-Party 

21from pydantic import ValidationError 

22from sqlalchemy import and_, delete, desc, or_, select, update 

23from sqlalchemy.exc import IntegrityError 

24from sqlalchemy.orm import Session 

25 

26# First-Party 

27from mcpgateway.cache.a2a_stats_cache import a2a_stats_cache 

28from mcpgateway.db import A2AAgent as DbA2AAgent 

29from mcpgateway.db import A2AAgentMetric, A2AAgentMetricsHourly, EmailTeam 

30from mcpgateway.db import EmailTeamMember as DbEmailTeamMember 

31from mcpgateway.db import fresh_db_session, get_for_update 

32from mcpgateway.db import Tool as DbTool 

33from mcpgateway.observability import create_span, set_span_attribute, set_span_error 

34from mcpgateway.schemas import A2AAgentAggregateMetrics, A2AAgentCreate, A2AAgentMetrics, A2AAgentRead, A2AAgentUpdate 

35from mcpgateway.services.base_service import BaseService 

36from mcpgateway.services.encryption_service import protect_oauth_config_for_storage 

37from mcpgateway.services.logging_service import LoggingService 

38from mcpgateway.services.metrics_cleanup_service import delete_metrics_in_batches, pause_rollup_during_purge 

39from mcpgateway.services.structured_logger import get_structured_logger 

40from mcpgateway.services.team_management_service import TeamManagementService 

41from mcpgateway.utils.correlation_id import get_correlation_id 

42from mcpgateway.utils.create_slug import slugify 

43from mcpgateway.utils.pagination import unified_paginate 

44from mcpgateway.utils.services_auth import decode_auth, encode_auth 

45from mcpgateway.utils.sqlalchemy_modifier import json_contains_tag_expr 

46from mcpgateway.utils.trace_redaction import is_input_capture_enabled, is_output_capture_enabled, serialize_trace_payload 

47 

# Lazily-populated cache singletons. Both start unset and are filled in on first
# use by the ``_get_*_cache`` accessors, which defer the import of the cache
# modules to avoid circular dependencies at module load time.
_REGISTRY_CACHE = _TOOL_LOOKUP_CACHE = None

51 

52 

def _get_registry_cache():
    """Return the shared registry cache, importing it on first access.

    The import is deferred to call time to avoid a circular import between this
    service and the cache package; the resolved object is memoized in the
    module-level ``_REGISTRY_CACHE`` global.

    Returns:
        RegistryCache instance.
    """
    global _REGISTRY_CACHE  # pylint: disable=global-statement
    if _REGISTRY_CACHE is not None:
        return _REGISTRY_CACHE

    # First-Party
    from mcpgateway.cache.registry_cache import registry_cache  # pylint: disable=import-outside-toplevel

    _REGISTRY_CACHE = registry_cache
    return _REGISTRY_CACHE

66 

67 

def _get_tool_lookup_cache():
    """Return the shared tool lookup cache, importing it on first access.

    The import is deferred to call time to avoid a circular import; the resolved
    object is memoized in the module-level ``_TOOL_LOOKUP_CACHE`` global.

    Returns:
        ToolLookupCache instance.
    """
    global _TOOL_LOOKUP_CACHE  # pylint: disable=global-statement
    if _TOOL_LOOKUP_CACHE is not None:
        return _TOOL_LOOKUP_CACHE

    # First-Party
    from mcpgateway.cache.tool_lookup_cache import tool_lookup_cache  # pylint: disable=import-outside-toplevel

    _TOOL_LOOKUP_CACHE = tool_lookup_cache
    return _TOOL_LOOKUP_CACHE

81 

82 

# Initialize logging service first: the module-level ``logger`` below is obtained
# from this instance, so it must exist before ``get_logger`` is called.
logging_service = LoggingService()
# Module logger named after this module, used for plain operational messages.
logger = logging_service.get_logger(__name__)

# Initialize structured logger for A2A lifecycle tracking (create/update events
# carrying user, team and resource fields).
structured_logger = get_structured_logger("a2a_service")

89 

90 

class A2AAgentError(Exception):
    """Base class for A2A agent-related errors.

    Examples:
        >>> try:
        ...     raise A2AAgentError("Agent operation failed")
        ... except A2AAgentError as e:
        ...     str(e)
        'Agent operation failed'
        >>> try:
        ...     raise A2AAgentError("Connection error")
        ... except Exception as e:
        ...     isinstance(e, A2AAgentError)
        True
    """


class A2AAgentNotFoundError(A2AAgentError):
    """Raised when a requested A2A agent is not found.

    Examples:
        >>> try:
        ...     raise A2AAgentNotFoundError("Agent 'test-agent' not found")
        ... except A2AAgentNotFoundError as e:
        ...     str(e)
        "Agent 'test-agent' not found"
        >>> try:
        ...     raise A2AAgentNotFoundError("No such agent")
        ... except A2AAgentError as e:
        ...     isinstance(e, A2AAgentError)  # Should inherit from A2AAgentError
        True
    """


class A2AAgentNameConflictError(A2AAgentError):
    """Raised when an A2A agent name conflicts with an existing one."""

    def __init__(self, name: str, is_active: bool = True, agent_id: Optional[str] = None, visibility: Optional[str] = "public"):
        """Initialize an A2AAgentNameConflictError exception.

        Creates an exception that indicates an agent name conflict, with additional
        context about whether the conflicting agent is active and its ID if known.

        Args:
            name: The agent name that caused the conflict.
            is_active: Whether the conflicting agent is currently active.
            agent_id: The ID of the conflicting agent, if known.
            visibility: The visibility level of the conflicting agent (private, team, public).
                When None or empty, the visibility prefix is omitted from the message.

        Examples:
            >>> error = A2AAgentNameConflictError("test-agent")
            >>> error.name
            'test-agent'
            >>> error.is_active
            True
            >>> error.agent_id is None
            True
            >>> "test-agent" in str(error)
            True
            >>>
            >>> # Test inactive agent conflict
            >>> error = A2AAgentNameConflictError("inactive-agent", is_active=False, agent_id="agent-123")
            >>> error.is_active
            False
            >>> error.agent_id
            'agent-123'
            >>> "inactive" in str(error)
            True
            >>> "agent-123" in str(error)
            True
            >>>
            >>> # A missing visibility must not break message construction
            >>> str(A2AAgentNameConflictError("x", visibility=None))
            'A2A Agent already exists with name: x'
        """
        self.name = name
        self.is_active = is_active
        self.agent_id = agent_id
        self.visibility = visibility
        # ``visibility`` is Optional (callers may pass a row's stored value), so
        # guard before calling ``.capitalize()`` instead of raising AttributeError.
        prefix = f"{visibility.capitalize()} " if visibility else ""
        message = f"{prefix}A2A Agent already exists with name: {name}"
        if not is_active:
            message += f" (currently inactive, ID: {agent_id})"
        super().__init__(message)

169 

170 

def _validate_a2a_team_assignment(db: Session, user_email: Optional[str], target_team_id: Optional[str]) -> None:
    """Ensure an A2A agent may be assigned to ``target_team_id``.

    A team id must be supplied and the team must exist. When ``user_email`` is
    given, that user must also hold an active ``owner`` membership in the team;
    without a user email the ownership check is skipped.

    Args:
        db: Database session used for the team and membership lookups.
        user_email: Requesting user email, or None to skip the ownership check.
        target_team_id: Team identifier to validate.

    Raises:
        ValueError: If no team id is given, the team does not exist, or the
            requesting user is not an active owner of the team.
    """
    if not target_team_id:
        raise ValueError("Cannot set visibility to 'team' without a team_id")

    team_row = db.query(EmailTeam).filter(EmailTeam.id == target_team_id).first()
    if not team_row:
        raise ValueError(f"Team {target_team_id} not found")

    # No caller identity: existence of the team is all that can be checked.
    if user_email:
        owner_filters = (
            DbEmailTeamMember.team_id == target_team_id,
            DbEmailTeamMember.user_email == user_email,
            DbEmailTeamMember.is_active,
            DbEmailTeamMember.role == "owner",
        )
        owner_membership = db.query(DbEmailTeamMember).filter(*owner_filters).first()
        if not owner_membership:
            raise ValueError("User membership in team not sufficient for this update.")

199 

200 

201class A2AAgentService(BaseService): 

202 """Service for managing A2A agents in the gateway. 

203 

204 Provides methods to create, list, retrieve, update, set state, and delete agent records. 

205 Also supports interactions with A2A-compatible agents. 

206 """ 

207 

208 _visibility_model_cls = DbA2AAgent 

209 

210 def __init__(self) -> None: 

211 """Initialize a new A2AAgentService instance.""" 

212 self._initialized = False 

213 self._event_streams: List[AsyncGenerator[str, None]] = [] 

214 

215 async def initialize(self) -> None: 

216 """Initialize the A2A agent service.""" 

217 if not self._initialized: 

218 logger.info("Initializing A2A Agent Service") 

219 self._initialized = True 

220 

221 async def shutdown(self) -> None: 

222 """Shutdown the A2A agent service and cleanup resources.""" 

223 if self._initialized: 

224 logger.info("Shutting down A2A Agent Service") 

225 self._initialized = False 

226 

227 def _get_team_name(self, db: Session, team_id: Optional[str]) -> Optional[str]: 

228 """Retrieve the team name given a team ID. 

229 

230 Args: 

231 db (Session): Database session for querying teams. 

232 team_id (Optional[str]): The ID of the team. 

233 

234 Returns: 

235 Optional[str]: The name of the team if found, otherwise None. 

236 """ 

237 if not team_id: 

238 return None 

239 

240 team = db.query(EmailTeam).filter(EmailTeam.id == team_id, EmailTeam.is_active.is_(True)).first() 

241 db.commit() # Release transaction to avoid idle-in-transaction 

242 return team.name if team else None 

243 

244 def _batch_get_team_names(self, db: Session, team_ids: List[str]) -> Dict[str, str]: 

245 """Batch retrieve team names for multiple team IDs. 

246 

247 This method fetches team names in a single query to avoid N+1 issues 

248 when converting multiple agents to schemas in list operations. 

249 

250 Args: 

251 db (Session): Database session for querying teams. 

252 team_ids (List[str]): List of team IDs to look up. 

253 

254 Returns: 

255 Dict[str, str]: Mapping of team_id -> team_name for active teams. 

256 """ 

257 if not team_ids: 

258 return {} 

259 

260 # Single query for all teams 

261 teams = db.query(EmailTeam.id, EmailTeam.name).filter(EmailTeam.id.in_(team_ids), EmailTeam.is_active.is_(True)).all() 

262 

263 return {team.id: team.name for team in teams} 

264 

265 def _check_agent_access( 

266 self, 

267 agent: DbA2AAgent, 

268 user_email: Optional[str], 

269 token_teams: Optional[List[str]], 

270 ) -> bool: 

271 """Check if user has access to agent based on visibility rules. 

272 

273 Access rules (matching tools/resources/prompts): 

274 - public visibility: Always allowed 

275 - token_teams is None AND user_email is None: Admin bypass (unrestricted access) 

276 - No user context (but not admin): Deny access to non-public agents 

277 - team visibility: Allowed if agent.team_id in token_teams 

278 - private visibility: Allowed if owner (requires user_email and non-empty token_teams) 

279 

280 Args: 

281 agent: The agent to check access for 

282 user_email: User's email for owner matching 

283 token_teams: Teams from JWT. None = admin bypass, [] = public-only (no owner access) 

284 

285 Returns: 

286 True if access allowed, False otherwise. 

287 """ 

288 # Public agents are accessible by everyone 

289 if agent.visibility == "public": 

290 return True 

291 

292 # Admin bypass: token_teams=None AND user_email=None means unrestricted admin 

293 # This happens when is_admin=True and no team scoping in token 

294 if token_teams is None and user_email is None: 

295 return True 

296 

297 # No user context (but not admin) = deny access to non-public agents 

298 if not user_email: 

299 return False 

300 

301 # Public-only tokens (empty teams array) can ONLY access public agents 

302 is_public_only_token = token_teams is not None and len(token_teams) == 0 

303 if is_public_only_token: 

304 return False # Already checked public above 

305 

306 # Owner can access their own private agents 

307 if agent.visibility == "private" and agent.owner_email and agent.owner_email == user_email: 

308 return True 

309 

310 # Team agents: check team membership 

311 # token_teams=None means admin bypass — allow all team agents 

312 # ([] already handled by public-only check above) 

313 if agent.visibility == "team": 

314 if token_teams is None: 

315 return True 

316 return agent.team_id in token_teams 

317 

318 return False 

319 

320 async def register_agent( 

321 self, 

322 db: Session, 

323 agent_data: A2AAgentCreate, 

324 created_by: Optional[str] = None, 

325 created_from_ip: Optional[str] = None, 

326 created_via: Optional[str] = None, 

327 created_user_agent: Optional[str] = None, 

328 import_batch_id: Optional[str] = None, 

329 federation_source: Optional[str] = None, 

330 team_id: Optional[str] = None, 

331 owner_email: Optional[str] = None, 

332 visibility: Optional[str] = "public", 

333 ) -> A2AAgentRead: 

334 """Register a new A2A agent. 

335 

336 Args: 

337 db (Session): Database session. 

338 agent_data (A2AAgentCreate): Data required to create an agent. 

339 created_by (Optional[str]): User who created the agent. 

340 created_from_ip (Optional[str]): IP address of the creator. 

341 created_via (Optional[str]): Method used for creation (e.g., API, import). 

342 created_user_agent (Optional[str]): User agent of the creation request. 

343 import_batch_id (Optional[str]): UUID of a bulk import batch. 

344 federation_source (Optional[str]): Source gateway for federated agents. 

345 team_id (Optional[str]): ID of the team to assign the agent to. 

346 owner_email (Optional[str]): Email of the agent owner. 

347 visibility (Optional[str]): Visibility level ('public', 'team', 'private'). 

348 

349 Returns: 

350 A2AAgentRead: The created agent object. 

351 

352 Raises: 

353 A2AAgentNameConflictError: If another agent with the same name already exists. 

354 IntegrityError: If a database constraint or integrity violation occurs. 

355 ValueError: If invalid configuration or data is provided. 

356 A2AAgentError: For any other unexpected errors during registration. 

357 

358 Examples: 

359 # TODO 

360 """ 

361 with create_span( 

362 "a2a.register", 

363 { 

364 "a2a.agent.name": agent_data.name, 

365 "a2a.agent.type": agent_data.agent_type, 

366 "created_by": created_by, 

367 "team_id": team_id, 

368 "visibility": visibility, 

369 }, 

370 ) as span: 

371 try: 

372 agent_data.slug = slugify(agent_data.name) 

373 # Check for existing server with the same slug within the same team or public scope 

374 if visibility.lower() == "public": 

375 logger.info(f"visibility.lower(): {visibility.lower()}") 

376 logger.info(f"agent_data.name: {agent_data.name}") 

377 logger.info(f"agent_data.slug: {agent_data.slug}") 

378 # Check for existing public a2a agent with the same slug 

379 existing_agent = get_for_update(db, DbA2AAgent, where=and_(DbA2AAgent.slug == agent_data.slug, DbA2AAgent.visibility == "public")) 

380 if existing_agent: 

381 raise A2AAgentNameConflictError(name=agent_data.slug, is_active=existing_agent.enabled, agent_id=existing_agent.id, visibility=existing_agent.visibility) 

382 elif visibility.lower() == "team" and team_id: 

383 # Check for existing team a2a agent with the same slug 

384 existing_agent = get_for_update(db, DbA2AAgent, where=and_(DbA2AAgent.slug == agent_data.slug, DbA2AAgent.visibility == "team", DbA2AAgent.team_id == team_id)) 

385 if existing_agent: 

386 raise A2AAgentNameConflictError(name=agent_data.slug, is_active=existing_agent.enabled, agent_id=existing_agent.id, visibility=existing_agent.visibility) 

387 

388 auth_type = getattr(agent_data, "auth_type", None) 

389 # Support multiple custom headers 

390 auth_value = getattr(agent_data, "auth_value", {}) 

391 

392 # authentication_headers: Optional[Dict[str, str]] = None 

393 

394 if hasattr(agent_data, "auth_headers") and agent_data.auth_headers: 

395 # Convert list of {key, value} to dict 

396 header_dict = {h["key"]: h["value"] for h in agent_data.auth_headers if h.get("key")} 

397 # Keep encoded form for persistence, but pass raw headers for initialization 

398 auth_value = encode_auth(header_dict) # Encode the dict for consistency 

399 # authentication_headers = {str(k): str(v) for k, v in header_dict.items()} 

400 # elif isinstance(auth_value, str) and auth_value: 

401 # # Decode persisted auth for initialization 

402 # decoded = decode_auth(auth_value) 

403 # authentication_headers = {str(k): str(v) for k, v in decoded.items()} 

404 else: 

405 # authentication_headers = None 

406 pass 

407 # auth_value = {} 

408 

409 oauth_config = await protect_oauth_config_for_storage(getattr(agent_data, "oauth_config", None)) 

410 

411 # Handle query_param auth - encrypt and prepare for storage 

412 auth_query_params_encrypted: Optional[Dict[str, str]] = None 

413 if auth_type == "query_param": 

414 # Standard 

415 from urllib.parse import urlparse # pylint: disable=import-outside-toplevel 

416 

417 # First-Party 

418 from mcpgateway.config import settings # pylint: disable=import-outside-toplevel 

419 

420 # Service-layer enforcement: Check feature flag 

421 if not settings.insecure_allow_queryparam_auth: 

422 raise ValueError("Query parameter authentication is disabled. Set INSECURE_ALLOW_QUERYPARAM_AUTH=true to enable.") 

423 

424 # Service-layer enforcement: Check host allowlist 

425 if settings.insecure_queryparam_auth_allowed_hosts: 

426 parsed = urlparse(str(agent_data.endpoint_url)) 

427 hostname = (parsed.hostname or "").lower() 

428 allowed_hosts = [h.lower() for h in settings.insecure_queryparam_auth_allowed_hosts] 

429 if hostname not in allowed_hosts: 

430 allowed = ", ".join(settings.insecure_queryparam_auth_allowed_hosts) 

431 raise ValueError(f"Host '{hostname}' is not in the allowed hosts for query param auth. " f"Allowed: {allowed}") 

432 

433 # Extract and encrypt query param auth 

434 param_key = getattr(agent_data, "auth_query_param_key", None) 

435 param_value = getattr(agent_data, "auth_query_param_value", None) 

436 if param_key and param_value: 

437 # Handle SecretStr 

438 if hasattr(param_value, "get_secret_value"): 

439 raw_value = param_value.get_secret_value() 

440 else: 

441 raw_value = str(param_value) 

442 # Encrypt for storage 

443 encrypted_value = encode_auth({param_key: raw_value}) 

444 auth_query_params_encrypted = {param_key: encrypted_value} 

445 # Query param auth doesn't use auth_value 

446 auth_value = None 

447 

448 # Create new agent 

449 new_agent = DbA2AAgent( 

450 name=agent_data.name, 

451 description=agent_data.description, 

452 endpoint_url=agent_data.endpoint_url, 

453 agent_type=agent_data.agent_type, 

454 protocol_version=agent_data.protocol_version, 

455 capabilities=agent_data.capabilities, 

456 config=agent_data.config, 

457 auth_type=auth_type, 

458 auth_value=auth_value, # This should be encrypted in practice 

459 auth_query_params=auth_query_params_encrypted, # Encrypted query param auth 

460 oauth_config=oauth_config, 

461 tags=agent_data.tags, 

462 passthrough_headers=getattr(agent_data, "passthrough_headers", None), 

463 # Team scoping fields - use schema values if provided, otherwise fallback to parameters 

464 team_id=getattr(agent_data, "team_id", None) or team_id, 

465 owner_email=getattr(agent_data, "owner_email", None) or owner_email or created_by, 

466 # Endpoint visibility parameter takes precedence over schema default 

467 visibility=visibility if visibility is not None else getattr(agent_data, "visibility", "public"), 

468 created_by=created_by, 

469 created_from_ip=created_from_ip, 

470 created_via=created_via, 

471 created_user_agent=created_user_agent, 

472 import_batch_id=import_batch_id, 

473 federation_source=federation_source, 

474 ) 

475 

476 db.add(new_agent) 

477 # Commit agent FIRST to ensure it persists even if tool creation fails 

478 # This is critical because ToolService.register_tool calls db.rollback() 

479 # on error, which would undo a pending (flushed but uncommitted) agent 

480 db.commit() 

481 db.refresh(new_agent) 

482 

483 # Invalidate caches since agent count changed 

484 # Wrapped in try/except to ensure cache failures don't fail the request 

485 # when the agent is already successfully committed 

486 try: 

487 a2a_stats_cache.invalidate() 

488 cache = _get_registry_cache() 

489 await cache.invalidate_agents() 

490 # Also invalidate tags cache since agent tags may have changed 

491 # First-Party 

492 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel 

493 

494 await admin_stats_cache.invalidate_tags() 

495 # First-Party 

496 from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel 

497 

498 metrics_cache.invalidate("a2a") 

499 except Exception as cache_error: 

500 logger.warning(f"Cache invalidation failed after agent commit: {cache_error}") 

501 

502 # Automatically create a tool for the A2A agent if not already present 

503 # Tool creation is wrapped in try/except to ensure agent registration succeeds 

504 # even if tool creation fails (e.g., due to visibility or permission issues) 

505 tool_db = None 

506 try: 

507 # First-Party 

508 from mcpgateway.services.tool_service import tool_service 

509 

510 tool_db = await tool_service.create_tool_from_a2a_agent( 

511 db=db, 

512 agent=new_agent, 

513 created_by=created_by, 

514 created_from_ip=created_from_ip, 

515 created_via=created_via, 

516 created_user_agent=created_user_agent, 

517 ) 

518 

519 # Associate the tool with the agent using the relationship 

520 # This sets both the tool_id foreign key and the tool relationship 

521 new_agent.tool = tool_db 

522 db.commit() 

523 db.refresh(new_agent) 

524 logger.info(f"Registered new A2A agent: {new_agent.name} (ID: {new_agent.id}) with tool ID: {tool_db.id}") 

525 except Exception as tool_error: 

526 # Log the error but don't fail agent registration 

527 # Agent was already committed above, so it persists even if tool creation fails 

528 logger.warning(f"Failed to create tool for A2A agent {new_agent.name}: {tool_error}") 

529 structured_logger.warning( 

530 f"A2A agent '{new_agent.name}' created without tool association", 

531 user_id=created_by, 

532 resource_type="a2a_agent", 

533 resource_id=str(new_agent.id), 

534 custom_fields={"error": str(tool_error), "agent_name": new_agent.name}, 

535 ) 

536 # Refresh the agent to ensure it's in a clean state after any rollback 

537 db.refresh(new_agent) 

538 logger.info(f"Registered new A2A agent: {new_agent.name} (ID: {new_agent.id}) without tool") 

539 

540 # Log A2A agent registration for lifecycle tracking 

541 structured_logger.info( 

542 f"A2A agent '{new_agent.name}' registered successfully", 

543 user_id=created_by, 

544 user_email=owner_email, 

545 team_id=team_id, 

546 resource_type="a2a_agent", 

547 resource_id=str(new_agent.id), 

548 resource_action="create", 

549 custom_fields={ 

550 "agent_name": new_agent.name, 

551 "agent_type": new_agent.agent_type, 

552 "protocol_version": new_agent.protocol_version, 

553 "visibility": visibility, 

554 "endpoint_url": new_agent.endpoint_url, 

555 }, 

556 ) 

557 

558 if span: 

559 set_span_attribute(span, "success", True) 

560 set_span_attribute(span, "a2a.agent.id", str(new_agent.id)) 

561 return self.convert_agent_to_read(new_agent, db=db) 

562 

563 except A2AAgentNameConflictError as ie: 

564 set_span_error(span, ie) 

565 db.rollback() 

566 raise ie 

567 except IntegrityError as ie: 

568 set_span_error(span, ie) 

569 db.rollback() 

570 logger.error(f"IntegrityErrors in group: {ie}") 

571 raise ie 

572 except ValueError as ve: 

573 set_span_error(span, ve) 

574 raise ve 

575 except Exception as e: 

576 set_span_error(span, e) 

577 db.rollback() 

578 raise A2AAgentError(f"Failed to register A2A agent: {str(e)}") 

579 

580 async def list_agents( 

581 self, 

582 db: Session, 

583 cursor: Optional[str] = None, 

584 include_inactive: bool = False, 

585 tags: Optional[List[str]] = None, 

586 limit: Optional[int] = None, 

587 page: Optional[int] = None, 

588 per_page: Optional[int] = None, 

589 user_email: Optional[str] = None, 

590 token_teams: Optional[List[str]] = None, 

591 team_id: Optional[str] = None, 

592 visibility: Optional[str] = None, 

593 ) -> Union[tuple[List[A2AAgentRead], Optional[str]], Dict[str, Any]]: 

594 """List A2A agents with cursor pagination and optional team filtering. 

595 

596 Args: 

597 db: Database session. 

598 cursor: Pagination cursor for keyset pagination. 

599 include_inactive: Whether to include inactive agents. 

600 tags: List of tags to filter by. 

601 limit: Maximum number of agents to return. None for default, 0 for unlimited. 

602 page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor. 

603 per_page: Items per page for page-based pagination. Defaults to pagination_default_page_size. 

604 user_email: Email of user for owner matching in visibility checks. 

605 token_teams: Teams from JWT token. None = admin (no filtering), 

606 [] = public-only, [...] = team-scoped access. 

607 team_id: Optional team ID to filter by specific team. 

608 visibility: Optional visibility filter (private, team, public). 

609 

610 Returns: 

611 If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}} 

612 If cursor is provided or neither: tuple of (list of A2AAgentRead objects, next_cursor). 

613 

614 Examples: 

615 >>> from mcpgateway.services.a2a_service import A2AAgentService 

616 >>> from unittest.mock import MagicMock 

617 >>> from mcpgateway.schemas import A2AAgentRead 

618 >>> import asyncio 

619 

620 >>> service = A2AAgentService() 

621 >>> db = MagicMock() 

622 

623 >>> # Mock a single agent object returned by the DB 

624 >>> agent_obj = MagicMock() 

625 >>> db.execute.return_value.scalars.return_value.all.return_value = [agent_obj] 

626 

627 >>> # Mock the A2AAgentRead schema to return a masked string 

628 >>> mocked_agent_read = MagicMock() 

629 >>> mocked_agent_read.masked.return_value = 'agent_read' 

630 >>> A2AAgentRead.model_validate = MagicMock(return_value=mocked_agent_read) 

631 

632 >>> # Run the service method 

633 >>> agents, cursor = asyncio.run(service.list_agents(db)) 

634 >>> agents == ['agent_read'] and cursor is None 

635 True 

636 

637 >>> # Test include_inactive parameter (same mock works) 

638 >>> agents_with_inactive, cursor = asyncio.run(service.list_agents(db, include_inactive=True)) 

639 >>> agents_with_inactive == ['agent_read'] and cursor is None 

640 True 

641 

642 >>> # Test empty result 

643 >>> db.execute.return_value.scalars.return_value.all.return_value = [] 

644 >>> empty_agents, cursor = asyncio.run(service.list_agents(db)) 

645 >>> empty_agents == [] and cursor is None 

646 True 

647 

648 """ 

649 # ══════════════════════════════════════════════════════════════════════ 

650 # CACHE READ: Skip cache when ANY access filtering is applied 

651 # This prevents leaking admin-level results to filtered requests 

652 # Cache only when: user_email is None AND token_teams is None AND page is None 

653 # ══════════════════════════════════════════════════════════════════════ 

654 cache = _get_registry_cache() 

655 if cursor is None and user_email is None and token_teams is None and page is None: 

656 filters_hash = cache.hash_filters(include_inactive=include_inactive, tags=sorted(tags) if tags else None, visibility=visibility) 

657 cached = await cache.get("agents", filters_hash) 

658 if cached is not None: 

659 # Reconstruct A2AAgentRead objects from cached dicts 

660 cached_agents = [A2AAgentRead.model_validate(a).masked() for a in cached["agents"]] 

661 return (cached_agents, cached.get("next_cursor")) 

662 

663 # Build base query with ordering 

664 query = select(DbA2AAgent).order_by(desc(DbA2AAgent.created_at), desc(DbA2AAgent.id)) 

665 

666 # Apply active/inactive filter 

667 if not include_inactive: 

668 query = query.where(DbA2AAgent.enabled) 

669 

670 query = await self._apply_access_control(query, db, user_email, token_teams, team_id) 

671 

672 if visibility: 

673 query = query.where(DbA2AAgent.visibility == visibility) 

674 

675 # Add tag filtering if tags are provided (supports both List[str] and List[Dict] formats) 

676 if tags: 

677 query = query.where(json_contains_tag_expr(db, DbA2AAgent.tags, tags, match_any=True)) 

678 

679 # Use unified pagination helper - handles both page and cursor pagination 

680 pag_result = await unified_paginate( 

681 db=db, 

682 query=query, 

683 page=page, 

684 per_page=per_page, 

685 cursor=cursor, 

686 limit=limit, 

687 base_url="/admin/a2a", # Used for page-based links 

688 query_params={"include_inactive": include_inactive} if include_inactive else {}, 

689 ) 

690 

691 next_cursor = None 

692 # Extract servers based on pagination type 

693 if page is not None: 

694 # Page-based: pag_result is a dict 

695 a2a_agents_db = pag_result["data"] 

696 else: 

697 # Cursor-based: pag_result is a tuple 

698 a2a_agents_db, next_cursor = pag_result 

699 

700 # Fetch team names for the agents (common for both pagination types) 

701 team_ids_set = {s.team_id for s in a2a_agents_db if s.team_id} 

702 team_map = {} 

703 if team_ids_set: 

704 teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(team_ids_set), EmailTeam.is_active.is_(True))).all() 

705 team_map = {team.id: team.name for team in teams} 

706 

707 db.commit() # Release transaction to avoid idle-in-transaction 

708 

709 # Convert to A2AAgentRead (common for both pagination types) 

710 result = [] 

711 for s in a2a_agents_db: 

712 try: 

713 s.team = team_map.get(s.team_id) if s.team_id else None 

714 result.append(self.convert_agent_to_read(s, include_metrics=False, db=db, team_map=team_map)) 

715 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e: 

716 logger.exception(f"Failed to convert A2A agent {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}") 

717 # Continue with remaining agents instead of failing completely 

718 

719 # Return appropriate format based on pagination type 

720 if page is not None: 

721 # Page-based format 

722 return { 

723 "data": result, 

724 "pagination": pag_result["pagination"], 

725 "links": pag_result["links"], 

726 } 

727 

728 # Cursor-based format 

729 

730 # ══════════════════════════════════════════════════════════════════════ 

731 # CACHE WRITE: Only cache admin-level results (matches read guard) 

732 # MUST check token_teams is None to prevent caching scoped responses 

733 # ══════════════════════════════════════════════════════════════════════ 

734 if cursor is None and user_email is None and token_teams is None: 

735 try: 

736 cache_data = {"agents": [s.model_dump(mode="json") for s in result], "next_cursor": next_cursor} 

737 await cache.set("agents", cache_data, filters_hash) 

738 except AttributeError: 

739 pass # Skip caching if result objects don't support model_dump (e.g., in doctests) 

740 

741 return (result, next_cursor) 

742 

743 async def list_agents_for_user( 

744 self, db: Session, user_info: Dict[str, Any], team_id: Optional[str] = None, visibility: Optional[str] = None, include_inactive: bool = False, skip: int = 0, limit: int = 100 

745 ) -> List[A2AAgentRead]: 

746 """ 

747 DEPRECATED: Use list_agents() with user_email parameter instead. 

748 

749 This method is maintained for backward compatibility but is no longer used. 

750 New code should call list_agents() with user_email, team_id, and visibility parameters. 

751 

752 List A2A agents user has access to with team filtering. 

753 

754 Args: 

755 db: Database session 

756 user_info: Object representing identity of the user who is requesting agents 

757 team_id: Optional team ID to filter by specific team 

758 visibility: Optional visibility filter (private, team, public) 

759 include_inactive: Whether to include inactive agents 

760 skip: Number of agents to skip for pagination 

761 limit: Maximum number of agents to return 

762 

763 Returns: 

764 List[A2AAgentRead]: A2A agents the user has access to 

765 """ 

766 

767 # Handle case where user_info is a string (email) instead of dict (<0.7.0) 

768 if isinstance(user_info, str): 

769 user_email = str(user_info) 

770 else: 

771 user_email = user_info.get("email", "") 

772 

773 # Build query following existing patterns from list_prompts() 

774 team_service = TeamManagementService(db) 

775 user_teams = await team_service.get_user_teams(user_email) 

776 team_ids = [team.id for team in user_teams] 

777 

778 # Build query following existing patterns from list_agents() 

779 query = select(DbA2AAgent) 

780 

781 # Apply active/inactive filter 

782 if not include_inactive: 

783 query = query.where(DbA2AAgent.enabled.is_(True)) 

784 

785 if team_id: 

786 if team_id not in team_ids: 

787 return [] # No access to team 

788 

789 access_conditions = [] 

790 # Filter by specific team 

791 access_conditions.append(and_(DbA2AAgent.team_id == team_id, DbA2AAgent.visibility.in_(["team", "public"]))) 

792 

793 access_conditions.append(and_(DbA2AAgent.team_id == team_id, DbA2AAgent.owner_email == user_email)) 

794 

795 query = query.where(or_(*access_conditions)) 

796 else: 

797 # Get user's accessible teams 

798 # Build access conditions following existing patterns 

799 access_conditions = [] 

800 # 1. User's personal resources (owner_email matches) 

801 access_conditions.append(DbA2AAgent.owner_email == user_email) 

802 # 2. Team A2A Agents where user is member 

803 if team_ids: 

804 access_conditions.append(and_(DbA2AAgent.team_id.in_(team_ids), DbA2AAgent.visibility.in_(["team", "public"]))) 

805 # 3. Public resources (if visibility allows) 

806 access_conditions.append(DbA2AAgent.visibility == "public") 

807 

808 query = query.where(or_(*access_conditions)) 

809 

810 # Apply visibility filter if specified 

811 if visibility: 

812 query = query.where(DbA2AAgent.visibility == visibility) 

813 

814 # Apply pagination following existing patterns 

815 query = query.order_by(desc(DbA2AAgent.created_at)) 

816 query = query.offset(skip).limit(limit) 

817 

818 agents = db.execute(query).scalars().all() 

819 

820 # Batch fetch team names to avoid N+1 queries 

821 team_ids = list({a.team_id for a in agents if a.team_id}) 

822 team_map = self._batch_get_team_names(db, team_ids) 

823 

824 db.commit() # Release transaction to avoid idle-in-transaction 

825 

826 # Skip metrics to avoid N+1 queries in list operations 

827 result = [] 

828 for agent in agents: 

829 try: 

830 result.append(self.convert_agent_to_read(agent, include_metrics=False, db=db, team_map=team_map)) 

831 except (ValidationError, ValueError, KeyError, TypeError, binascii.Error) as e: 

832 logger.exception(f"Failed to convert A2A agent {getattr(agent, 'id', 'unknown')} ({getattr(agent, 'name', 'unknown')}): {e}") 

833 # Continue with remaining agents instead of failing completely 

834 

835 return result 

836 

837 async def get_agent( 

838 self, 

839 db: Session, 

840 agent_id: str, 

841 include_inactive: bool = True, 

842 user_email: Optional[str] = None, 

843 token_teams: Optional[List[str]] = None, 

844 ) -> A2AAgentRead: 

845 """Retrieve an A2A agent by ID. 

846 

847 Args: 

848 db: Database session. 

849 agent_id: Agent ID. 

850 include_inactive: Whether to include inactive a2a agents. 

851 user_email: User's email for owner matching in visibility checks. 

852 token_teams: Teams from JWT token. None = admin (no filtering), 

853 [] = public-only, [...] = team-scoped access. 

854 

855 Returns: 

856 Agent data. 

857 

858 Raises: 

859 A2AAgentNotFoundError: If the agent is not found or user lacks access. 

860 

861 Examples: 

862 >>> from unittest.mock import MagicMock 

863 >>> from datetime import datetime 

864 >>> import asyncio 

865 >>> from mcpgateway.schemas import A2AAgentRead 

866 >>> from mcpgateway.services.a2a_service import A2AAgentService, A2AAgentNotFoundError 

867 

868 >>> service = A2AAgentService() 

869 >>> db = MagicMock() 

870 

871 >>> # Create a mock agent 

872 >>> agent_mock = MagicMock() 

873 >>> agent_mock.enabled = True 

874 >>> agent_mock.id = "agent_id" 

875 >>> agent_mock.name = "Test Agent" 

876 >>> agent_mock.slug = "test-agent" 

877 >>> agent_mock.description = "A2A test agent" 

878 >>> agent_mock.endpoint_url = "https://example.com" 

879 >>> agent_mock.agent_type = "rest" 

880 >>> agent_mock.protocol_version = "v1" 

881 >>> agent_mock.capabilities = {} 

882 >>> agent_mock.config = {} 

883 >>> agent_mock.reachable = True 

884 >>> agent_mock.created_at = datetime.now() 

885 >>> agent_mock.updated_at = datetime.now() 

886 >>> agent_mock.last_interaction = None 

887 >>> agent_mock.tags = [] 

888 >>> agent_mock.metrics = MagicMock() 

889 >>> agent_mock.metrics.success_rate = 1.0 

890 >>> agent_mock.metrics.failure_rate = 0.0 

891 >>> agent_mock.metrics.last_error = None 

892 >>> agent_mock.auth_type = None 

893 >>> agent_mock.auth_value = None 

894 >>> agent_mock.oauth_config = None 

895 >>> agent_mock.created_by = "user" 

896 >>> agent_mock.created_from_ip = "127.0.0.1" 

897 >>> agent_mock.created_via = "ui" 

898 >>> agent_mock.created_user_agent = "test-agent" 

899 >>> agent_mock.modified_by = "user" 

900 >>> agent_mock.modified_from_ip = "127.0.0.1" 

901 >>> agent_mock.modified_via = "ui" 

902 >>> agent_mock.modified_user_agent = "test-agent" 

903 >>> agent_mock.import_batch_id = None 

904 >>> agent_mock.federation_source = None 

905 >>> agent_mock.team_id = "team-1" 

906 >>> agent_mock.team = "Team 1" 

907 >>> agent_mock.owner_email = "owner@example.com" 

908 >>> agent_mock.visibility = "public" 

909 

910 >>> db.get.return_value = agent_mock 

911 

912 >>> # Mock convert_agent_to_read to simplify test 

913 >>> service.convert_agent_to_read = lambda db_agent, **kwargs: 'agent_read' 

914 

915 >>> # Test with active agent 

916 >>> result = asyncio.run(service.get_agent(db, 'agent_id')) 

917 >>> result 

918 'agent_read' 

919 

920 >>> # Test with inactive agent but include_inactive=True 

921 >>> agent_mock.enabled = False 

922 >>> result_inactive = asyncio.run(service.get_agent(db, 'agent_id', include_inactive=True)) 

923 >>> result_inactive 

924 'agent_read' 

925 

926 """ 

927 query = select(DbA2AAgent).where(DbA2AAgent.id == agent_id) 

928 agent = db.execute(query).scalar_one_or_none() 

929 

930 if not agent: 

931 raise A2AAgentNotFoundError(f"A2A Agent not found with ID: {agent_id}") 

932 

933 if not agent.enabled and not include_inactive: 

934 raise A2AAgentNotFoundError(f"A2A Agent not found with ID: {agent_id}") 

935 

936 # SECURITY: Check visibility/team access 

937 # Return 404 (not 403) to avoid leaking existence of private agents 

938 if not self._check_agent_access(agent, user_email, token_teams): 

939 raise A2AAgentNotFoundError(f"A2A Agent not found with ID: {agent_id}") 

940 

941 # Delegate conversion and masking to convert_agent_to_read() 

942 return self.convert_agent_to_read(agent, db=db) 

943 

944 async def get_agent_by_name(self, db: Session, agent_name: str) -> A2AAgentRead: 

945 """Retrieve an A2A agent by name. 

946 

947 Args: 

948 db: Database session. 

949 agent_name: Agent name. 

950 

951 Returns: 

952 Agent data. 

953 

954 Raises: 

955 A2AAgentNotFoundError: If the agent is not found. 

956 """ 

957 query = select(DbA2AAgent).where(DbA2AAgent.name == agent_name) 

958 agent = db.execute(query).scalar_one_or_none() 

959 

960 if not agent: 

961 raise A2AAgentNotFoundError(f"A2A Agent not found with name: {agent_name}") 

962 

963 return self.convert_agent_to_read(agent, db=db) 

964 

965 async def update_agent( 

966 self, 

967 db: Session, 

968 agent_id: str, 

969 agent_data: A2AAgentUpdate, 

970 modified_by: Optional[str] = None, 

971 modified_from_ip: Optional[str] = None, 

972 modified_via: Optional[str] = None, 

973 modified_user_agent: Optional[str] = None, 

974 user_email: Optional[str] = None, 

975 ) -> A2AAgentRead: 

976 """Update an existing A2A agent. 

977 

978 Args: 

979 db: Database session. 

980 agent_id: Agent ID. 

981 agent_data: Agent update data. 

982 modified_by: Username who modified this agent. 

983 modified_from_ip: IP address of modifier. 

984 modified_via: Modification method. 

985 modified_user_agent: User agent of modification request. 

986 user_email: Email of user performing update (for ownership check). 

987 

988 Returns: 

989 Updated agent data. 

990 

991 Raises: 

992 A2AAgentNotFoundError: If the agent is not found. 

993 PermissionError: If user doesn't own the agent. 

994 A2AAgentNameConflictError: If name conflicts with another agent. 

995 A2AAgentError: For other errors during update. 

996 IntegrityError: If a database integrity error occurs. 

997 ValueError: If query_param auth is disabled or host not in allowlist. 

998 """ 

999 try: 

1000 # Acquire row lock for update to avoid lost-update on `version` and other fields 

1001 agent = get_for_update(db, DbA2AAgent, agent_id) 

1002 

1003 if not agent: 

1004 raise A2AAgentNotFoundError(f"A2A Agent not found with ID: {agent_id}") 

1005 

1006 # Check ownership if user_email provided 

1007 if user_email: 

1008 # First-Party 

1009 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel 

1010 

1011 permission_service = PermissionService(db) 

1012 if not await permission_service.check_resource_ownership(user_email, agent): 

1013 raise PermissionError("Only the owner can update this agent") 

1014 # Check for name conflict if name is being updated 

1015 if agent_data.name and agent_data.name != agent.name: 

1016 new_slug = slugify(agent_data.name) 

1017 visibility = agent_data.visibility or agent.visibility 

1018 team_id = agent_data.team_id or agent.team_id 

1019 # Check for existing server with the same slug within the same team or public scope 

1020 if visibility.lower() == "public": 

1021 # Check for existing public a2a agent with the same slug 

1022 existing_agent = get_for_update(db, DbA2AAgent, where=and_(DbA2AAgent.slug == new_slug, DbA2AAgent.visibility == "public")) 

1023 if existing_agent: 

1024 raise A2AAgentNameConflictError(name=new_slug, is_active=existing_agent.enabled, agent_id=existing_agent.id, visibility=existing_agent.visibility) 

1025 elif visibility.lower() == "team" and team_id: 

1026 # Check for existing team a2a agent with the same slug 

1027 existing_agent = get_for_update(db, DbA2AAgent, where=and_(DbA2AAgent.slug == new_slug, DbA2AAgent.visibility == "team", DbA2AAgent.team_id == team_id)) 

1028 if existing_agent: 

1029 raise A2AAgentNameConflictError(name=new_slug, is_active=existing_agent.enabled, agent_id=existing_agent.id, visibility=existing_agent.visibility) 

1030 # Update the slug when name changes 

1031 agent.slug = new_slug 

1032 # Update fields 

1033 # Avoid `model_dump()` here: tests use `model_construct()` to create intentionally invalid 

1034 # payloads, and `model_dump()` emits serializer warnings when encountering unexpected types. 

1035 update_data = {field: getattr(agent_data, field) for field in agent_data.model_fields_set} 

1036 

1037 # Track original auth_type and endpoint_url before updates 

1038 original_auth_type = agent.auth_type 

1039 original_endpoint_url = agent.endpoint_url 

1040 

1041 for field, value in update_data.items(): 

1042 if field == "passthrough_headers": 

1043 if value is not None: 

1044 if isinstance(value, list): 

1045 # Clean list: remove empty or whitespace-only entries 

1046 cleaned = [h.strip() for h in value if isinstance(h, str) and h.strip()] 

1047 agent.passthrough_headers = cleaned or None 

1048 elif isinstance(value, str): 

1049 # Parse comma-separated string and clean 

1050 parsed: List[str] = [h.strip() for h in value.split(",") if h.strip()] 

1051 agent.passthrough_headers = parsed or None 

1052 else: 

1053 raise A2AAgentError("Invalid passthrough_headers format: must be list[str] or comma-separated string") 

1054 else: 

1055 # Explicitly set to None if value is None 

1056 agent.passthrough_headers = None 

1057 continue 

1058 

1059 # Skip query_param fields - handled separately below 

1060 if field in ("auth_query_param_key", "auth_query_param_value"): 

1061 continue 

1062 

1063 # auth_headers is on the schema but not the DB model; translate 

1064 # it into auth_value, preserving masked placeholders from the 

1065 # existing encrypted value so an unchanged edit does not 

1066 # overwrite real credentials with the mask string. 

1067 if field == "auth_headers" and value and isinstance(value, list): 

1068 # First-Party 

1069 from mcpgateway.config import settings as _settings # pylint: disable=import-outside-toplevel 

1070 

1071 existing_auth_raw = getattr(agent, "auth_value", None) 

1072 existing_auth: Dict[str, str] = {} 

1073 if isinstance(existing_auth_raw, str): 

1074 try: 

1075 existing_auth = decode_auth(existing_auth_raw) 

1076 except Exception: 

1077 existing_auth = {} 

1078 elif isinstance(existing_auth_raw, dict): 

1079 existing_auth = existing_auth_raw 

1080 

1081 header_dict: Dict[str, str] = {} 

1082 for header in value: 

1083 key = header.get("key") 

1084 if not key: 

1085 continue 

1086 hval = header.get("value", "") 

1087 if hval == _settings.masked_auth_value and key in existing_auth: 

1088 header_dict[key] = existing_auth[key] 

1089 else: 

1090 header_dict[key] = hval 

1091 

1092 if header_dict: 

1093 agent.auth_value = encode_auth(header_dict) 

1094 continue 

1095 

1096 if field == "oauth_config": 

1097 value = await protect_oauth_config_for_storage(value, existing_oauth_config=agent.oauth_config) 

1098 

1099 # Validate team reassignment before persisting 

1100 if field == "team_id" and value is not None and value != agent.team_id: 

1101 _validate_a2a_team_assignment(db, user_email, value) 

1102 

1103 # Validate visibility transition to "team" 

1104 if field == "visibility" and value == "team": 

1105 target_team_id = update_data.get("team_id", agent.team_id) if "team_id" in update_data else agent.team_id 

1106 _validate_a2a_team_assignment(db, user_email, target_team_id) 

1107 

1108 if hasattr(agent, field): 

1109 setattr(agent, field, value) 

1110 

1111 # Handle query_param auth updates 

1112 # Clear auth_query_params when switching away from query_param auth 

1113 if original_auth_type == "query_param" and agent_data.auth_type is not None and agent_data.auth_type != "query_param": 

1114 agent.auth_query_params = None 

1115 logger.debug(f"Cleared auth_query_params for agent {agent.id} (switched from query_param to {agent_data.auth_type})") 

1116 

1117 # Handle switching to query_param auth or updating existing query_param credentials 

1118 is_switching_to_queryparam = agent_data.auth_type == "query_param" and original_auth_type != "query_param" 

1119 is_updating_queryparam_creds = original_auth_type == "query_param" and (agent_data.auth_query_param_key is not None or agent_data.auth_query_param_value is not None) 

1120 is_url_changing = agent_data.endpoint_url is not None and str(agent_data.endpoint_url) != original_endpoint_url 

1121 

1122 if is_switching_to_queryparam or is_updating_queryparam_creds or (is_url_changing and original_auth_type == "query_param"): 

1123 # Standard 

1124 from urllib.parse import urlparse # pylint: disable=import-outside-toplevel 

1125 

1126 # First-Party 

1127 from mcpgateway.config import settings # pylint: disable=import-outside-toplevel 

1128 

1129 # Service-layer enforcement: Check feature flag 

1130 if not settings.insecure_allow_queryparam_auth: 

1131 # Grandfather clause: Allow updates to existing query_param agents 

1132 # unless they're trying to change credentials 

1133 if is_switching_to_queryparam or is_updating_queryparam_creds: 

1134 raise ValueError("Query parameter authentication is disabled. Set INSECURE_ALLOW_QUERYPARAM_AUTH=true to enable.") 

1135 

1136 # Service-layer enforcement: Check host allowlist 

1137 if settings.insecure_queryparam_auth_allowed_hosts: 

1138 check_url = str(agent_data.endpoint_url) if agent_data.endpoint_url else agent.endpoint_url 

1139 parsed = urlparse(check_url) 

1140 hostname = (parsed.hostname or "").lower() 

1141 allowed_hosts = [h.lower() for h in settings.insecure_queryparam_auth_allowed_hosts] 

1142 if hostname not in allowed_hosts: 

1143 allowed = ", ".join(settings.insecure_queryparam_auth_allowed_hosts) 

1144 raise ValueError(f"Host '{hostname}' is not in the allowed hosts for query param auth. " f"Allowed: {allowed}") 

1145 

1146 if is_switching_to_queryparam or is_updating_queryparam_creds: 

1147 # Get query param key and value 

1148 param_key = getattr(agent_data, "auth_query_param_key", None) 

1149 param_value = getattr(agent_data, "auth_query_param_value", None) 

1150 

1151 # If no key provided but value is, reuse existing key (value-only rotation) 

1152 existing_key = next(iter(agent.auth_query_params.keys()), None) if agent.auth_query_params else None 

1153 if not param_key and param_value and existing_key: 

1154 param_key = existing_key 

1155 

1156 if param_key: 

1157 # Check if value is masked (user didn't change it) or new value provided 

1158 is_masked_placeholder = False 

1159 if param_value and hasattr(param_value, "get_secret_value"): 

1160 raw_value = param_value.get_secret_value() 

1161 # First-Party 

1162 from mcpgateway.config import settings # pylint: disable=import-outside-toplevel 

1163 

1164 is_masked_placeholder = raw_value == settings.masked_auth_value 

1165 elif param_value: 

1166 raw_value = str(param_value) 

1167 else: 

1168 raw_value = None 

1169 

1170 if raw_value and not is_masked_placeholder: 

1171 # New value provided - encrypt for storage 

1172 encrypted_value = encode_auth({param_key: raw_value}) 

1173 agent.auth_query_params = {param_key: encrypted_value} 

1174 elif agent.auth_query_params and is_masked_placeholder: 

1175 # Use existing encrypted value (user didn't change the password) 

1176 # But key may have changed, so preserve with new key if different 

1177 if existing_key and existing_key != param_key: 

1178 # Key changed but value is masked - decrypt and re-encrypt with new key 

1179 existing_encrypted = agent.auth_query_params.get(existing_key, "") 

1180 if existing_encrypted: 

1181 decrypted = decode_auth(existing_encrypted) 

1182 existing_value = decrypted.get(existing_key, "") 

1183 if existing_value: 

1184 encrypted_value = encode_auth({param_key: existing_value}) 

1185 agent.auth_query_params = {param_key: encrypted_value} 

1186 

1187 # Update auth_type if switching 

1188 if is_switching_to_queryparam: 

1189 agent.auth_type = "query_param" 

1190 agent.auth_value = None # Query param auth doesn't use auth_value 

1191 

1192 # Update metadata 

1193 if modified_by: 

1194 agent.modified_by = modified_by 

1195 if modified_from_ip: 

1196 agent.modified_from_ip = modified_from_ip 

1197 if modified_via: 

1198 agent.modified_via = modified_via 

1199 if modified_user_agent: 

1200 agent.modified_user_agent = modified_user_agent 

1201 

1202 agent.version += 1 

1203 

1204 db.commit() 

1205 db.refresh(agent) 

1206 

1207 # Invalidate cache after successful update 

1208 cache = _get_registry_cache() 

1209 await cache.invalidate_agents() 

1210 # Also invalidate tags cache since agent tags may have changed 

1211 # First-Party 

1212 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel 

1213 

1214 await admin_stats_cache.invalidate_tags() 

1215 

1216 # Update the associated tool if it exists 

1217 # Wrap in try/except to handle tool sync failures gracefully - the agent 

1218 # update is the primary operation and should succeed even if tool sync fails 

1219 try: 

1220 # First-Party 

1221 from mcpgateway.services.tool_service import tool_service 

1222 

1223 await tool_service.update_tool_from_a2a_agent( 

1224 db=db, 

1225 agent=agent, 

1226 modified_by=modified_by, 

1227 modified_from_ip=modified_from_ip, 

1228 modified_via=modified_via, 

1229 modified_user_agent=modified_user_agent, 

1230 ) 

1231 except Exception as tool_err: 

1232 logger.warning(f"Failed to sync tool for A2A agent {agent.id}: {tool_err}. Agent update succeeded but tool may be out of sync.") 

1233 

1234 logger.info(f"Updated A2A agent: {agent.name} (ID: {agent.id})") 

1235 return self.convert_agent_to_read(agent, db=db) 

1236 except PermissionError: 

1237 db.rollback() 

1238 raise 

1239 except A2AAgentNameConflictError as ie: 

1240 db.rollback() 

1241 raise ie 

1242 except A2AAgentNotFoundError as nf: 

1243 db.rollback() 

1244 raise nf 

1245 except IntegrityError as ie: 

1246 db.rollback() 

1247 logger.error(f"IntegrityErrors in group: {ie}") 

1248 raise ie 

1249 except Exception as e: 

1250 db.rollback() 

1251 raise A2AAgentError(f"Failed to update A2A agent: {str(e)}") 

1252 

1253 async def set_agent_state(self, db: Session, agent_id: str, activate: bool, reachable: Optional[bool] = None, user_email: Optional[str] = None) -> A2AAgentRead: 

1254 """Set the activation status of an A2A agent. 

1255 

1256 Args: 

1257 db: Database session. 

1258 agent_id: Agent ID. 

1259 activate: True to activate, False to deactivate. 

1260 reachable: Optional reachability status. 

1261 user_email: Optional[str] The email of the user to check if the user has permission to modify. 

1262 

1263 Returns: 

1264 Updated agent data. 

1265 

1266 Raises: 

1267 A2AAgentNotFoundError: If the agent is not found. 

1268 PermissionError: If user doesn't own the agent. 

1269 """ 

1270 with create_span( 

1271 "a2a.state_change", 

1272 { 

1273 "a2a.agent.id": agent_id, 

1274 "a2a.agent.activate": activate, 

1275 "user.email": user_email, 

1276 }, 

1277 ) as span: 

1278 try: 

1279 query = select(DbA2AAgent).where(DbA2AAgent.id == agent_id) 

1280 agent = db.execute(query).scalar_one_or_none() 

1281 

1282 if not agent: 

1283 raise A2AAgentNotFoundError(f"A2A Agent not found with ID: {agent_id}") 

1284 

1285 if user_email: 

1286 # First-Party 

1287 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel 

1288 

1289 permission_service = PermissionService(db) 

1290 if not await permission_service.check_resource_ownership(user_email, agent): 

1291 raise PermissionError("Only the owner can activate the Agent" if activate else "Only the owner can deactivate the Agent") 

1292 

1293 agent.enabled = activate 

1294 if reachable is not None: 

1295 agent.reachable = reachable 

1296 

1297 db.commit() 

1298 db.refresh(agent) 

1299 

1300 # Invalidate caches since agent status changed 

1301 a2a_stats_cache.invalidate() 

1302 cache = _get_registry_cache() 

1303 await cache.invalidate_agents() 

1304 

1305 # Cascade: update associated tool's enabled status to match agent. 

1306 # This mirrors gateway_service.set_gateway_state() which lets cascade 

1307 # failures propagate so the caller knows the operation was incomplete. 

1308 if agent.tool_id: 

1309 now = datetime.now(timezone.utc) 

1310 tool_result = db.execute(update(DbTool).where(DbTool.id == agent.tool_id).where(DbTool.enabled != activate).values(enabled=activate, updated_at=now)) 

1311 if tool_result.rowcount > 0: 

1312 db.commit() 

1313 await cache.invalidate_tools() 

1314 tool_lookup_cache = _get_tool_lookup_cache() 

1315 if agent.tool and agent.tool.name: 

1316 await tool_lookup_cache.invalidate(agent.tool.name, gateway_id=str(agent.tool.gateway_id) if agent.tool.gateway_id else None) 

1317 

1318 status = "activated" if activate else "deactivated" 

1319 logger.info(f"A2A agent {status}: {agent.name} (ID: {agent.id})") 

1320 

1321 structured_logger.log( 

1322 level="INFO", 

1323 message=f"A2A agent {status}", 

1324 event_type="a2a_agent_status_changed", 

1325 component="a2a_service", 

1326 user_email=user_email, 

1327 resource_type="a2a_agent", 

1328 resource_id=str(agent.id), 

1329 custom_fields={ 

1330 "agent_name": agent.name, 

1331 "enabled": agent.enabled, 

1332 "reachable": agent.reachable, 

1333 }, 

1334 ) 

1335 

1336 result = self.convert_agent_to_read(agent, db=db) 

1337 if span: 

1338 set_span_attribute(span, "success", True) 

1339 return result 

1340 except Exception as exc: 

1341 set_span_error(span, exc) 

1342 raise 

1343 

1344 async def delete_agent(self, db: Session, agent_id: str, user_email: Optional[str] = None, purge_metrics: bool = False) -> None: 

1345 """Delete an A2A agent. 

1346 

1347 Args: 

1348 db: Database session. 

1349 agent_id: Agent ID. 

1350 user_email: Email of user performing delete (for ownership check). 

1351 purge_metrics: If True, delete raw + rollup metrics for this agent. 

1352 

1353 Raises: 

1354 A2AAgentNotFoundError: If the agent is not found. 

1355 PermissionError: If user doesn't own the agent. 

1356 """ 

1357 with create_span( 

1358 "a2a.delete", 

1359 { 

1360 "a2a.agent.id": agent_id, 

1361 "user.email": user_email, 

1362 "purge_metrics": purge_metrics, 

1363 }, 

1364 ) as span: 

1365 try: 

1366 query = select(DbA2AAgent).where(DbA2AAgent.id == agent_id) 

1367 agent = db.execute(query).scalar_one_or_none() 

1368 

1369 if not agent: 

1370 raise A2AAgentNotFoundError(f"A2A Agent not found with ID: {agent_id}") 

1371 

1372 # Check ownership if user_email provided 

1373 if user_email: 

1374 # First-Party 

1375 from mcpgateway.services.permission_service import PermissionService # pylint: disable=import-outside-toplevel 

1376 

1377 permission_service = PermissionService(db) 

1378 if not await permission_service.check_resource_ownership(user_email, agent): 

1379 raise PermissionError("Only the owner can delete this agent") 

1380 

1381 agent_name = agent.name 

1382 

1383 # Delete the associated tool before deleting the agent 

1384 # First-Party 

1385 from mcpgateway.services.tool_service import tool_service 

1386 

1387 await tool_service.delete_tool_from_a2a_agent(db=db, agent=agent, user_email=user_email, purge_metrics=purge_metrics) 

1388 

1389 if purge_metrics: 

1390 with pause_rollup_during_purge(reason=f"purge_a2a_agent:{agent_id}"): 

1391 delete_metrics_in_batches(db, A2AAgentMetric, A2AAgentMetric.a2a_agent_id, agent_id) 

1392 delete_metrics_in_batches(db, A2AAgentMetricsHourly, A2AAgentMetricsHourly.a2a_agent_id, agent_id) 

1393 db.delete(agent) 

1394 db.commit() 

1395 

1396 # Invalidate caches since agent count changed 

1397 a2a_stats_cache.invalidate() 

1398 cache = _get_registry_cache() 

1399 await cache.invalidate_agents() 

1400 # Also invalidate tags cache since agent tags may have changed 

1401 # First-Party 

1402 from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel 

1403 

1404 await admin_stats_cache.invalidate_tags() 

1405 

1406 logger.info(f"Deleted A2A agent: {agent_name} (ID: {agent_id})") 

1407 

1408 structured_logger.log( 

1409 level="INFO", 

1410 message="A2A agent deleted", 

1411 event_type="a2a_agent_deleted", 

1412 component="a2a_service", 

1413 user_email=user_email, 

1414 resource_type="a2a_agent", 

1415 resource_id=str(agent_id), 

1416 custom_fields={ 

1417 "agent_name": agent_name, 

1418 "purge_metrics": purge_metrics, 

1419 }, 

1420 ) 

1421 if span: 

1422 set_span_attribute(span, "success", True) 

1423 except PermissionError: 

1424 if span: 

1425 set_span_attribute(span, "error", True) 

1426 db.rollback() 

1427 raise 

1428 

1429 async def invoke_agent( 

1430 self, 

1431 db: Session, 

1432 agent_name: str, 

1433 parameters: Dict[str, Any], 

1434 interaction_type: str = "query", 

1435 *, 

1436 user_id: Optional[str] = None, 

1437 user_email: Optional[str] = None, 

1438 token_teams: Optional[List[str]] = None, 

1439 ) -> Dict[str, Any]: 

1440 """Invoke an A2A agent. 

1441 

1442 Args: 

1443 db: Database session. 

1444 agent_name: Name of the agent to invoke. 

1445 parameters: Parameters for the interaction. 

1446 interaction_type: Type of interaction. 

1447 user_id: Identifier of the user initiating the call. 

1448 user_email: Email of the user initiating the call. 

1449 token_teams: Teams from JWT token. None = admin (no filtering), 

1450 [] = public-only, [...] = team-scoped access. 

1451 

1452 Returns: 

1453 Agent response. 

1454 

1455 Raises: 

1456 A2AAgentNotFoundError: If the agent is not found or user lacks access. 

1457 A2AAgentError: If the agent is disabled or invocation fails. 

1458 """ 

1459 # ═══════════════════════════════════════════════════════════════════════════ 

1460 # PHASE 1: Acquire a short row lock to read `enabled` + `auth_value`, 

1461 # then release the lock before performing the external HTTP call. 

1462 # This avoids TOCTOU for the critical checks while not holding DB 

1463 # connections during the potentially slow HTTP request. 

1464 # ═══════════════════════════════════════════════════════════════════════════ 

1465 

1466 # Lookup the agent id, then lock the row by id using get_for_update 

1467 agent_row = db.execute(select(DbA2AAgent.id).where(DbA2AAgent.name == agent_name)).scalar_one_or_none() 

1468 if not agent_row: 

1469 raise A2AAgentNotFoundError(f"A2A Agent not found with name: {agent_name}") 

1470 

1471 agent = get_for_update(db, DbA2AAgent, agent_row) 

1472 if not agent: 

1473 raise A2AAgentNotFoundError(f"A2A Agent not found with name: {agent_name}") 

1474 

1475 # ═══════════════════════════════════════════════════════════════════════════ 

1476 # SECURITY: Check visibility/team access WHILE ROW IS LOCKED 

1477 # Return 404 (not 403) to avoid leaking existence of private agents 

1478 # ═══════════════════════════════════════════════════════════════════════════ 

1479 if not self._check_agent_access(agent, user_email, token_teams): 

1480 raise A2AAgentNotFoundError(f"A2A Agent not found with name: {agent_name}") 

1481 

1482 if not agent.enabled: 

1483 raise A2AAgentError(f"A2A Agent '{agent_name}' is disabled") 

1484 

1485 # Extract all needed data to local variables before releasing DB connection 

1486 agent_id = agent.id 

1487 agent_endpoint_url = agent.endpoint_url 

1488 agent_type = agent.agent_type 

1489 agent_protocol_version = agent.protocol_version 

1490 agent_auth_type = agent.auth_type 

1491 agent_auth_value = agent.auth_value 

1492 agent_auth_query_params = agent.auth_query_params 

1493 

1494 # Handle query_param auth - decrypt and apply to URL 

1495 auth_query_params_decrypted: Optional[Dict[str, str]] = None 

1496 if agent_auth_type == "query_param" and agent_auth_query_params: 

1497 # First-Party 

1498 from mcpgateway.utils.url_auth import apply_query_param_auth # pylint: disable=import-outside-toplevel 

1499 

1500 auth_query_params_decrypted = {} 

1501 for param_key, encrypted_value in agent_auth_query_params.items(): 

1502 if encrypted_value: 

1503 try: 

1504 decrypted = decode_auth(encrypted_value) 

1505 auth_query_params_decrypted[param_key] = decrypted.get(param_key, "") 

1506 except Exception: 

1507 logger.debug(f"Failed to decrypt query param '{param_key}' for A2A agent invocation") 

1508 if auth_query_params_decrypted: 

1509 agent_endpoint_url = apply_query_param_auth(agent_endpoint_url, auth_query_params_decrypted) 

1510 

1511 # Decode auth_value for supported auth types (before closing session) 

1512 auth_headers = {} 

1513 if agent_auth_type in ("basic", "bearer", "authheaders") and agent_auth_value: 

1514 # Decrypt auth_value and extract headers (follows gateway_service pattern) 

1515 if isinstance(agent_auth_value, str): 

1516 try: 

1517 auth_headers = decode_auth(agent_auth_value) 

1518 except Exception as e: 

1519 raise A2AAgentError(f"Failed to decrypt authentication for agent '{agent_name}': {e}") 

1520 elif isinstance(agent_auth_value, dict): 

1521 auth_headers = {str(k): str(v) for k, v in agent_auth_value.items()} 

1522 

1523 # ═══════════════════════════════════════════════════════════════════════════ 

1524 # CRITICAL: Release DB connection back to pool BEFORE making HTTP calls 

1525 # This prevents connection pool exhaustion during slow upstream requests. 

1526 # ═══════════════════════════════════════════════════════════════════════════ 

1527 db.commit() # End read-only transaction cleanly (commit not rollback to avoid inflating rollback stats) 

1528 db.close() 

1529 

1530 start_time = datetime.now(timezone.utc) 

1531 success = False 

1532 error_message = None 

1533 response = None 

1534 

1535 # ═══════════════════════════════════════════════════════════════════════════ 

1536 # PHASE 2: Make HTTP call (no DB connection held) 

1537 # ═══════════════════════════════════════════════════════════════════════════ 

1538 

1539 # Create sanitized URL for logging (redacts auth query params) 

1540 # First-Party 

1541 from mcpgateway.utils.url_auth import sanitize_exception_message, sanitize_url_for_logging # pylint: disable=import-outside-toplevel 

1542 

1543 sanitized_endpoint_url = sanitize_url_for_logging(agent_endpoint_url, auth_query_params_decrypted) 

1544 span_attributes = { 

1545 "a2a.agent.name": agent_name, 

1546 "a2a.agent.id": str(agent_id), 

1547 "a2a.agent.url": sanitized_endpoint_url, 

1548 "a2a.agent.type": agent_type, 

1549 "a2a.interaction_type": interaction_type, 

1550 } 

1551 if is_input_capture_enabled("a2a.invoke"): 

1552 span_attributes["langfuse.observation.input"] = serialize_trace_payload(parameters or {}) 

1553 

1554 with create_span("a2a.invoke", span_attributes) as span: 

1555 try: 

1556 # Prepare the request to the A2A agent 

1557 # Format request based on agent type and endpoint 

1558 if agent_type in ["generic", "jsonrpc"] or agent_endpoint_url.endswith("/"): 

1559 # Use JSONRPC format for agents that expect it 

1560 request_data = {"jsonrpc": "2.0", "method": parameters.get("method", "message/send"), "params": parameters.get("params", parameters), "id": 1} 

1561 else: 

1562 # Use custom A2A format 

1563 request_data = {"interaction_type": interaction_type, "parameters": parameters, "protocol_version": agent_protocol_version} 

1564 

1565 # Make HTTP request to the agent endpoint using shared HTTP client 

1566 # First-Party 

1567 from mcpgateway.services.http_client_service import get_http_client # pylint: disable=import-outside-toplevel 

1568 

1569 client = await get_http_client() 

1570 headers = {"Content-Type": "application/json"} 

1571 

1572 # Add authentication if configured (using decoded auth headers) 

1573 headers.update(auth_headers) 

1574 

1575 # Add correlation ID to outbound headers for distributed tracing 

1576 correlation_id = get_correlation_id() 

1577 if correlation_id: 

1578 headers["X-Correlation-ID"] = correlation_id 

1579 

1580 # Log A2A external call start (with sanitized URL to prevent credential leakage) 

1581 call_start_time = datetime.now(timezone.utc) 

1582 structured_logger.log( 

1583 level="INFO", 

1584 message=f"A2A external call started: {agent_name}", 

1585 component="a2a_service", 

1586 user_id=user_id, 

1587 user_email=user_email, 

1588 correlation_id=correlation_id, 

1589 metadata={ 

1590 "event": "a2a_call_started", 

1591 "agent_name": agent_name, 

1592 "agent_id": agent_id, 

1593 "endpoint_url": sanitized_endpoint_url, 

1594 "interaction_type": interaction_type, 

1595 "protocol_version": agent_protocol_version, 

1596 }, 

1597 ) 

1598 

1599 http_response = await client.post(agent_endpoint_url, json=request_data, headers=headers) 

1600 call_duration_ms = (datetime.now(timezone.utc) - call_start_time).total_seconds() * 1000 

1601 

1602 if http_response.status_code == 200: 

1603 response = http_response.json() 

1604 success = True 

1605 if span and is_output_capture_enabled("a2a.invoke"): 

1606 set_span_attribute(span, "langfuse.observation.output", serialize_trace_payload(response)) 

1607 

1608 # Log successful A2A call 

1609 structured_logger.log( 

1610 level="INFO", 

1611 message=f"A2A external call completed: {agent_name}", 

1612 component="a2a_service", 

1613 user_id=user_id, 

1614 user_email=user_email, 

1615 correlation_id=correlation_id, 

1616 duration_ms=call_duration_ms, 

1617 metadata={"event": "a2a_call_completed", "agent_name": agent_name, "agent_id": agent_id, "status_code": http_response.status_code, "success": True}, 

1618 ) 

1619 else: 

1620 # Sanitize error message to prevent URL secrets from leaking in logs 

1621 raw_error = f"HTTP {http_response.status_code}: {http_response.text}" 

1622 error_message = sanitize_exception_message(raw_error, auth_query_params_decrypted) 

1623 

1624 # Log failed A2A call 

1625 structured_logger.log( 

1626 level="ERROR", 

1627 message=f"A2A external call failed: {agent_name}", 

1628 component="a2a_service", 

1629 user_id=user_id, 

1630 user_email=user_email, 

1631 correlation_id=correlation_id, 

1632 duration_ms=call_duration_ms, 

1633 error_details={"error_type": "A2AHTTPError", "error_message": error_message}, 

1634 metadata={"event": "a2a_call_failed", "agent_name": agent_name, "agent_id": agent_id, "status_code": http_response.status_code}, 

1635 ) 

1636 

1637 raise A2AAgentError(error_message) 

1638 

1639 except A2AAgentError: 

1640 # Re-raise A2AAgentError without wrapping 

1641 if span and error_message: 

1642 set_span_error(span, error_message) 

1643 raise 

1644 except Exception as e: 

1645 # Sanitize error message to prevent URL secrets from leaking in logs 

1646 error_message = sanitize_exception_message(str(e), auth_query_params_decrypted) 

1647 logger.error(f"Failed to invoke A2A agent '{agent_name}': {error_message}") 

1648 if span: 

1649 set_span_error(span, error_message) 

1650 raise A2AAgentError(f"Failed to invoke A2A agent: {error_message}") 

1651 

1652 finally: 

1653 # ═══════════════════════════════════════════════════════════════════════════ 

1654 # PHASE 3: Record metrics via buffered service (batches writes for performance) 

1655 # ═══════════════════════════════════════════════════════════════════════════ 

1656 end_time = datetime.now(timezone.utc) 

1657 response_time = (end_time - start_time).total_seconds() 

1658 

1659 try: 

1660 # First-Party 

1661 from mcpgateway.services.metrics_buffer_service import get_metrics_buffer_service # pylint: disable=import-outside-toplevel 

1662 

1663 metrics_buffer = get_metrics_buffer_service() 

1664 metrics_buffer.record_a2a_agent_metric_with_duration( 

1665 a2a_agent_id=agent_id, 

1666 response_time=response_time, 

1667 success=success, 

1668 interaction_type=interaction_type, 

1669 error_message=error_message, 

1670 ) 

1671 except Exception as metrics_error: 

1672 logger.warning(f"Failed to record A2A metrics for '{agent_name}': {metrics_error}") 

1673 

1674 # Update last interaction timestamp (quick separate write) 

1675 try: 

1676 with fresh_db_session() as ts_db: 

1677 # Reacquire short lock and re-check enabled before writing 

1678 db_agent = get_for_update(ts_db, DbA2AAgent, agent_id) 

1679 if db_agent and getattr(db_agent, "enabled", False): 

1680 db_agent.last_interaction = end_time 

1681 ts_db.commit() 

1682 except Exception as ts_error: 

1683 logger.warning(f"Failed to update last_interaction for '{agent_name}': {ts_error}") 

1684 if span: 

1685 set_span_attribute(span, "success", success) 

1686 set_span_attribute(span, "duration.ms", response_time * 1000) 

1687 

1688 return response or {"error": error_message} 

1689 

async def aggregate_metrics(self, db: Session) -> A2AAgentAggregateMetrics:
    """Aggregate interaction metrics across every A2A agent.

    Totals combine the recent raw metric rows with the hourly rollup table, so
    they cover the full recorded history. When the metrics cache is enabled, a
    previously cached result (stored as a plain dict) is returned directly, and
    a freshly computed result is written back to it.

    Args:
        db: Database session.

    Returns:
        A2AAgentAggregateMetrics: Aggregated metrics from raw + hourly rollup tables.
    """
    # First-Party
    from mcpgateway.cache.metrics_cache import is_cache_enabled, metrics_cache  # pylint: disable=import-outside-toplevel

    if is_cache_enabled():
        cached_payload = metrics_cache.get("a2a")
        # isinstance() already rules out None, so one check is enough.
        if isinstance(cached_payload, dict):
            return A2AAgentAggregateMetrics(**cached_payload)

    # Agent counts come from a dedicated stats cache instead of two COUNT queries.
    agent_stats = a2a_stats_cache.get_counts(db)
    agent_total, agent_active = agent_stats["total"], agent_stats["active"]

    # First-Party
    from mcpgateway.services.metrics_query_service import aggregate_metrics_combined  # pylint: disable=import-outside-toplevel

    combined = aggregate_metrics_combined(db, "a2a_agent")
    interactions = combined.total_executions
    succeeded = combined.successful_executions
    failed = combined.failed_executions

    # Guard against division by zero when nothing has been recorded yet.
    if interactions > 0:
        rate = succeeded / interactions * 100
    else:
        rate = 0.0

    aggregate = A2AAgentAggregateMetrics(
        total_agents=agent_total,
        active_agents=agent_active,
        total_interactions=interactions,
        successful_interactions=succeeded,
        failed_interactions=failed,
        success_rate=rate,
        avg_response_time=float(combined.avg_response_time or 0.0),
        min_response_time=float(combined.min_response_time or 0.0),
        max_response_time=float(combined.max_response_time or 0.0),
    )

    if is_cache_enabled():
        # Cache the dict form so the entry stays serialization-friendly.
        metrics_cache.set("a2a", aggregate.model_dump())

    return aggregate

1744 

async def reset_metrics(self, db: Session, agent_id: Optional[str] = None) -> None:
    """Delete recorded A2A metrics, covering both raw rows and hourly rollups.

    The deletes are committed, and then the "a2a" entry of the aggregate
    metrics cache is invalidated so the next aggregation recomputes it.

    Args:
        db: Database session.
        agent_id: When truthy, only rows belonging to this agent are removed;
            otherwise the metrics of every agent are cleared.
    """
    raw_stmt = delete(A2AAgentMetric)
    hourly_stmt = delete(A2AAgentMetricsHourly)
    if agent_id:
        raw_stmt = raw_stmt.where(A2AAgentMetric.a2a_agent_id == agent_id)
        hourly_stmt = hourly_stmt.where(A2AAgentMetricsHourly.a2a_agent_id == agent_id)

    # Raw rows first, then rollups, both inside one transaction.
    for stmt in (raw_stmt, hourly_stmt):
        db.execute(stmt)
    db.commit()

    # First-Party
    from mcpgateway.cache.metrics_cache import metrics_cache  # pylint: disable=import-outside-toplevel

    metrics_cache.invalidate("a2a")

    scope_suffix = f" for agent {agent_id}" if agent_id else ""
    logger.info("Reset A2A agent metrics" + scope_suffix)

1767 

def _prepare_a2a_agent_for_read(self, agent: DbA2AAgent) -> DbA2AAgent:
    """Prepare an A2A agent object for A2AAgentRead validation.

    If ``auth_value`` is held as a dict, it is replaced in place with its
    ``encode_auth`` string form, which is the format the read schema expects.
    Any other value is left untouched.

    Args:
        agent: A2A Agent database object (mutated in place).

    Returns:
        The same agent object, with ``auth_value`` encoded when it was a dict.
    """
    raw_auth = agent.auth_value
    if not isinstance(raw_auth, dict):
        return agent
    agent.auth_value = encode_auth(raw_auth)
    return agent

1783 

def convert_agent_to_read(self, db_agent: DbA2AAgent, include_metrics: bool = False, db: Optional[Session] = None, team_map: Optional[Dict[str, str]] = None) -> A2AAgentRead:
    """Convert database model to schema.

    Args:
        db_agent (DbA2AAgent): Database agent model.
        include_metrics (bool): Whether to include metrics in the result. Defaults to False.
            Set to False for list operations to avoid N+1 query issues.
        db (Optional[Session]): Database session. Only required if team name is not pre-populated
            on the db_agent object and team_map is not provided.
        team_map (Optional[Dict[str, str]]): Pre-fetched team_id -> team_name mapping.
            If provided, avoids N+1 queries for team name lookups in list operations.

    Returns:
        A2AAgentRead: Agent read schema, as returned by ``A2AAgentRead.masked()``.

    Raises:
        A2AAgentNotFoundError: If the provided agent is not found or invalid.

    Note:
        When the team name has to be resolved, it is written back onto
        ``db_agent.team``. Agents without a ``team_id`` resolve to ``None``
        without consulting ``team_map`` or issuing a database query.
    """

    if not db_agent:
        raise A2AAgentNotFoundError("Agent not found")

    # Resolve the team name unless it was already attached (batch operations pre-populate it).
    if not hasattr(db_agent, "team") or db_agent.team is None:
        team_id = getattr(db_agent, "team_id", None)
        if not team_id:
            # Nothing to look up. Short-circuiting here keeps list calls that pass a
            # team_map from falling through to a per-agent DB query (N+1) for
            # agents that have no team.
            team_name = None
        elif team_map is not None:
            team_name = team_map.get(team_id)
        elif db is not None:
            team_name = self._get_team_name(db, team_id)
        else:
            team_name = None
        setattr(db_agent, "team", team_name)

    # Compute metrics only if requested (avoids N+1 queries in list operations).
    metrics = None
    if include_metrics:
        agent_metrics = db_agent.metrics
        total_executions = len(agent_metrics)
        successful_executions = sum(1 for m in agent_metrics if m.is_success)
        failed_executions = total_executions - successful_executions
        failure_rate = (failed_executions / total_executions * 100) if total_executions > 0 else 0.0

        min_response_time = max_response_time = avg_response_time = last_execution_time = None
        if agent_metrics:
            # Rows without a recorded response_time are excluded from timing stats.
            response_times = [m.response_time for m in agent_metrics if m.response_time is not None]
            if response_times:
                min_response_time = min(response_times)
                max_response_time = max(response_times)
                avg_response_time = sum(response_times) / len(response_times)
            last_execution_time = max((m.timestamp for m in agent_metrics), default=None)

        metrics = A2AAgentMetrics(
            total_executions=total_executions,
            successful_executions=successful_executions,
            failed_executions=failed_executions,
            failure_rate=failure_rate,
            min_response_time=min_response_time,
            max_response_time=max_response_time,
            avg_response_time=avg_response_time,
            last_execution_time=last_execution_time,
        )

    # Build the input dict from the ORM model using the schema's declared fields.
    agent_data = {field: getattr(db_agent, field, None) for field in A2AAgentRead.model_fields}
    agent_data["metrics"] = metrics
    agent_data["team"] = getattr(db_agent, "team", None)
    # Include auth_query_params for the _mask_query_param_auth validator.
    agent_data["auth_query_params"] = getattr(db_agent, "auth_query_params", None)

    validated_agent = A2AAgentRead.model_validate(agent_data)

    # Return the masked version (like GatewayRead).
    return validated_agent.masked()