Coverage for mcpgateway / services / grpc_service.py: 98%

305 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/services/grpc_service.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: ContextForge Contributors 

6 

7gRPC Service Management 

8 

9This module implements gRPC service management for ContextForge. 

10It handles gRPC service registration, reflection-based discovery, listing, 

11retrieval, updates, activation toggling, and deletion. 

12""" 

13 

14# Standard 

15import asyncio 

16from datetime import datetime, timezone 

17import ipaddress 

18from pathlib import Path 

19from typing import Any, Dict, List, Optional, Union 

20 

21try: 

22 # Third-Party 

23 import grpc 

24 from grpc_reflection.v1alpha import reflection_pb2, reflection_pb2_grpc 

25 

26 GRPC_AVAILABLE = True 

27except ImportError: 

28 GRPC_AVAILABLE = False 

29 # grpc module will not be used if not available 

30 grpc = None # type: ignore 

31 reflection_pb2 = None # type: ignore 

32 reflection_pb2_grpc = None # type: ignore 

33 

34# Third-Party 

35from pydantic import ValidationError 

36from sqlalchemy import and_, desc, select 

37from sqlalchemy.orm import Session 

38 

39# First-Party 

40from mcpgateway.db import EmailTeam 

41from mcpgateway.db import GrpcService as DbGrpcService 

42from mcpgateway.schemas import GrpcServiceCreate, GrpcServiceRead, GrpcServiceUpdate 

43from mcpgateway.services.logging_service import LoggingService 

44from mcpgateway.services.team_management_service import TeamManagementService 

45from mcpgateway.utils.pagination import unified_paginate 

46 

47# Initialize logging 

48logging_service = LoggingService() 

49logger = logging_service.get_logger(__name__) 

50 

51 

def _check_grpc_host(host: str, settings: Any) -> None:
    """Apply the platform SSRF rules to a single host string.

    Args:
        host: Hostname or IP literal, without port or IPv6 brackets.
        settings: Settings object exposing the ``ssrf_*`` attributes.

    Raises:
        GrpcServiceError: If the host is blocked by the SSRF settings.
    """
    # Check blocked hostnames (case-insensitive, ignoring a trailing root dot)
    hostname_normalized = host.lower().rstrip(".")
    for blocked_host in settings.ssrf_blocked_hosts:
        if hostname_normalized == blocked_host.lower().rstrip("."):
            raise GrpcServiceError(f"gRPC target hostname '{host}' is blocked")

    try:
        addr = ipaddress.ip_address(host)
    except ValueError:
        # Hostname, not an IP literal — no DNS resolution is performed here,
        # so only the hostname rules can be applied.
        if hostname_normalized == "localhost":
            if not settings.ssrf_allow_localhost:
                raise GrpcServiceError(f"gRPC target hostname '{host}' is blocked (localhost not allowed)")
        return

    # An IPv4-mapped IPv6 literal (``::ffff:a.b.c.d``) denotes an IPv4 destination.
    # An IPv6 address is never a member of an IPv4 network, so without unwrapping
    # it the IPv4 entries in the blocked/allowed network lists would be skipped.
    mapped = getattr(addr, "ipv4_mapped", None)
    if mapped is not None:
        addr = mapped

    # Always block: cloud metadata, link-local (from ssrf_blocked_networks)
    for network_str in settings.ssrf_blocked_networks:
        try:
            network = ipaddress.ip_network(network_str, strict=False)
            if addr in network:
                raise GrpcServiceError(f"gRPC target address '{host}' is blocked (network: {network_str})")
        except ValueError:
            # Malformed network entry in the configuration — ignore it
            continue

    # Loopback
    if addr.is_loopback:
        if not settings.ssrf_allow_localhost:
            raise GrpcServiceError(f"gRPC target address '{host}' is blocked (loopback not allowed)")
        return

    # Reserved / multicast — always block
    if addr.is_reserved or addr.is_multicast:
        raise GrpcServiceError(f"gRPC target address '{host}' is blocked (reserved/multicast)")

    # Private networks — consult settings
    if addr.is_private and not addr.is_loopback:
        if settings.ssrf_allow_private_networks:
            return  # Explicitly allowed
        # Check per-network allowlist
        for network_str in settings.ssrf_allowed_networks or []:
            try:
                network = ipaddress.ip_network(network_str, strict=False)
                if addr in network:
                    return  # Allowed by specific network allowlist
            except ValueError:
                continue
        raise GrpcServiceError(f"gRPC target address '{host}' is blocked (private network not allowed)")


def _validate_grpc_target(target: str) -> None:
    """Validate a gRPC target address against SSRF-unsafe destinations.

    Consults the platform SSRF settings (``ssrf_allow_localhost``,
    ``ssrf_allow_private_networks``, ``ssrf_allowed_networks``,
    ``ssrf_blocked_networks``, ``ssrf_blocked_hosts``) so that gRPC
    targets follow the same rules as HTTP URLs validated by
    ``SecurityValidator.validate_url``.

    Two readings of the target are checked: the ``host[:port]`` reading
    (everything before the last colon) and, when the whole target parses
    as an IP literal, that literal itself. The second reading covers
    port-less IPv6 targets such as ``::1`` or ``[::1]``, which the
    ``host:port`` split alone would reduce to the meaningless host ``":"``.

    Args:
        target: gRPC target string (host:port or host).

    Raises:
        GrpcServiceError: If the target is empty or points at a blocked address.
    """
    # First-Party
    from mcpgateway.config import settings  # pylint: disable=import-outside-toplevel

    # NOTE(review): URI-style gRPC targets (e.g. "dns:///host:port", "ipv4:1.2.3.4:80",
    # "unix:/path") are treated as opaque hostnames by the checks below — confirm that
    # such targets are rejected by schema validation before they reach this function.

    # Extract host (strip port)
    host = target.rsplit(":", 1)[0].strip("[]")
    if not host:
        raise GrpcServiceError("Empty gRPC target address")
    _check_grpc_host(host, settings)

    # A bare IP literal without a port must be validated as a whole
    literal = target.strip("[]")
    if literal != host:
        try:
            ipaddress.ip_address(literal)
        except ValueError:
            return
        _check_grpc_host(literal, settings)

123 

124 

125def _validate_tls_path(path_str: str, label: str = "TLS path") -> Path: 

126 """Validate that a TLS cert/key path is within allowed directories. 

127 

128 Args: 

129 path_str: The file path to validate. 

130 label: Label for error messages. 

131 

132 Returns: 

133 Resolved Path object. 

134 

135 Raises: 

136 GrpcServiceError: If the path escapes allowed directories. 

137 """ 

138 resolved = Path(path_str).resolve() 

139 # Allow only paths under /certs/, /etc/ssl/, /etc/pki/, or the CWD/certs dir 

140 allowed_prefixes = ( 

141 Path("/certs").resolve(), 

142 Path("/etc/ssl").resolve(), 

143 Path("/etc/pki").resolve(), 

144 Path.cwd().joinpath("certs").resolve(), 

145 ) 

146 if not any(resolved.is_relative_to(prefix) for prefix in allowed_prefixes): 

147 raise GrpcServiceError(f"{label} '{path_str}' is outside allowed certificate directories") 

148 return resolved 

149 

150 

class GrpcServiceError(Exception):
    """Root of the exception hierarchy raised by the gRPC service layer."""

153 

154 

class GrpcServiceNotFoundError(GrpcServiceError):
    """Signals that the requested gRPC service could not be found."""

157 

158 

class GrpcServiceNameConflictError(GrpcServiceError):
    """Signals that a gRPC service name is already taken by another service."""

    def __init__(self, name: str, is_active: bool = True, service_id: Optional[str] = None):
        """Store the conflict details and build the error message.

        Args:
            name: The conflicting gRPC service name
            is_active: Whether the conflicting service is currently active
            service_id: The ID of the conflicting service, if known
        """
        self.name = name
        self.is_active = is_active
        self.service_id = service_id

        # Optional suffixes, appended in this order: inactive marker, then ID
        inactive_note = "" if is_active else " (inactive)"
        id_note = f" (ID: {service_id})" if service_id else ""
        super().__init__(f"gRPC service with name '{name}' already exists{inactive_note}{id_note}")

179 

180 

class GrpcService:
    """Service for managing gRPC services with reflection-based discovery.

    Covers registration, listing, retrieval, update, enable/disable, deletion,
    reflection-based discovery of the remote services/methods, and method
    invocation through ``mcpgateway.translate_grpc.GrpcEndpoint``.
    """

    # Deadline (seconds) applied to each reflection RPC so that an unresponsive
    # target cannot stall discovery indefinitely.
    REFLECTION_TIMEOUT_SECONDS: float = 30.0

    def __init__(self):
        """Initialize the gRPC service manager."""

    @staticmethod
    def _require_service(db: Session, service_id: str) -> DbGrpcService:
        """Load a gRPC service row by ID or fail.

        Args:
            db: Database session
            service_id: Service ID

        Returns:
            The matching database row

        Raises:
            GrpcServiceNotFoundError: If no service has this ID
        """
        service = db.execute(select(DbGrpcService).where(DbGrpcService.id == service_id)).scalar_one_or_none()
        if not service:
            raise GrpcServiceNotFoundError(f"gRPC service with ID '{service_id}' not found")
        return service

    async def register_service(
        self,
        db: Session,
        service_data: GrpcServiceCreate,
        user_email: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> GrpcServiceRead:
        """Register a new gRPC service.

        When reflection is enabled an initial reflection is attempted; its
        failure is logged as a warning and does not fail the registration.

        Args:
            db: Database session
            service_data: gRPC service creation data
            user_email: Email of the user creating the service
            metadata: Additional metadata (IP, user agent, etc.)

        Returns:
            GrpcServiceRead: The created service

        Raises:
            GrpcServiceNameConflictError: If service name already exists
        """
        # Check for name conflicts
        existing = db.execute(select(DbGrpcService).where(DbGrpcService.name == service_data.name)).scalar_one_or_none()

        if existing:
            raise GrpcServiceNameConflictError(name=service_data.name, is_active=existing.enabled, service_id=existing.id)

        # One timestamp for both columns so a new row has created_at == updated_at
        now = datetime.now(timezone.utc)

        # Create service
        db_service = DbGrpcService(
            name=service_data.name,
            target=service_data.target,
            description=service_data.description,
            reflection_enabled=service_data.reflection_enabled,
            tls_enabled=service_data.tls_enabled,
            tls_cert_path=service_data.tls_cert_path,
            tls_key_path=service_data.tls_key_path,
            grpc_metadata=service_data.grpc_metadata or {},
            tags=service_data.tags or [],
            team_id=service_data.team_id,
            owner_email=user_email or service_data.owner_email,
            visibility=service_data.visibility,
            created_at=now,
            updated_at=now,
        )

        # Set audit metadata if provided
        if metadata:
            db_service.created_by = user_email
            db_service.created_from_ip = metadata.get("created_from_ip")
            db_service.created_via = metadata.get("created_via")
            db_service.created_user_agent = metadata.get("created_user_agent")

        db.add(db_service)
        db.commit()
        db.refresh(db_service)

        logger.info(f"Registered gRPC service: {db_service.name} (target: {db_service.target})")

        # Perform initial reflection if enabled (best effort)
        if db_service.reflection_enabled:
            try:
                await self._perform_reflection(db, db_service)
            except Exception as e:
                logger.warning(f"Initial reflection failed for {db_service.name}: {e}")

        return GrpcServiceRead.model_validate(db_service)

    async def list_services(
        self,
        db: Session,
        cursor: Optional[str] = None,
        include_inactive: bool = False,
        limit: Optional[int] = None,
        page: Optional[int] = None,
        per_page: Optional[int] = None,
        user_email: Optional[str] = None,
        team_id: Optional[str] = None,
    ) -> Union[tuple[List[GrpcServiceRead], Optional[str]], Dict[str, Any]]:
        """List gRPC services with pagination and optional filtering.

        Rows that fail conversion to ``GrpcServiceRead`` are logged and skipped
        rather than failing the whole listing.

        Args:
            db: Database session
            cursor: Pagination cursor for keyset pagination
            include_inactive: Include disabled services
            limit: Maximum number of services to return. None for default, 0 for unlimited
            page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor
            per_page: Items per page for page-based pagination
            user_email: Filter by user email for team access control
            team_id: Filter by team ID

        Returns:
            If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}}
            If cursor is provided or neither: tuple of (list of GrpcServiceRead objects, next_cursor)
        """
        # Build base query with ordering
        query = select(DbGrpcService).order_by(desc(DbGrpcService.created_at), desc(DbGrpcService.id))

        # Apply team filtering
        if user_email and team_id:
            team_service = TeamManagementService(db)
            team_filter = await team_service.build_team_filter_clause(DbGrpcService, user_email, team_id)  # pylint: disable=no-member
            if team_filter is not None:
                query = query.where(team_filter)
        elif team_id:
            query = query.where(DbGrpcService.team_id == team_id)

        # Apply active filter
        if not include_inactive:
            query = query.where(DbGrpcService.enabled.is_(True))  # pylint: disable=singleton-comparison

        # Use unified pagination helper - handles both page and cursor pagination
        pag_result = await unified_paginate(
            db=db,
            query=query,
            page=page,
            per_page=per_page,
            cursor=cursor,
            limit=limit,
            base_url="/admin/grpc",
            query_params={"include_inactive": include_inactive} if include_inactive else {},
        )

        next_cursor = None
        # Extract services based on pagination type
        if page is not None:
            # Page-based: pag_result is a dict
            services_db = pag_result["data"]
        else:
            # Cursor-based: pag_result is a tuple
            services_db, next_cursor = pag_result

        # Fetch team names for the services (active teams only)
        team_ids_set = {s.team_id for s in services_db if s.team_id}
        team_map = {}
        if team_ids_set:
            teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(team_ids_set), EmailTeam.is_active.is_(True))).all()
            team_map = {team.id: team.name for team in teams}

        db.commit()  # Release transaction to avoid idle-in-transaction

        # Convert to GrpcServiceRead
        result = []
        for s in services_db:
            try:
                s.team = team_map.get(s.team_id) if s.team_id else None
                result.append(GrpcServiceRead.model_validate(s))
            except (ValidationError, ValueError, KeyError, TypeError) as e:
                logger.exception(f"Failed to convert gRPC service {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}")

        # Return appropriate format based on pagination type
        if page is not None:
            # Page-based format
            return {
                "data": result,
                "pagination": pag_result["pagination"],
                "links": pag_result["links"],
            }

        # Cursor-based format (tuple)
        return (result, next_cursor)

    async def get_service(
        self,
        db: Session,
        service_id: str,
        user_email: Optional[str] = None,
    ) -> GrpcServiceRead:
        """Get a specific gRPC service by ID.

        Args:
            db: Database session
            service_id: Service ID
            user_email: Email for team access control

        Returns:
            The gRPC service

        Raises:
            GrpcServiceNotFoundError: If service not found or access denied
        """
        query = select(DbGrpcService).where(DbGrpcService.id == service_id)

        # Apply team access control
        if user_email:
            team_service = TeamManagementService(db)
            team_filter = await team_service.build_team_filter_clause(DbGrpcService, user_email, None)  # pylint: disable=no-member
            if team_filter is not None:
                query = query.where(team_filter)

        service = db.execute(query).scalar_one_or_none()

        if not service:
            raise GrpcServiceNotFoundError(f"gRPC service with ID '{service_id}' not found")

        return GrpcServiceRead.model_validate(service)

    async def update_service(
        self,
        db: Session,
        service_id: str,
        service_data: GrpcServiceUpdate,
        user_email: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> GrpcServiceRead:
        """Update an existing gRPC service.

        Only fields explicitly set on ``service_data`` are written; the row's
        ``version`` is incremented on every update.

        Args:
            db: Database session
            service_id: Service ID to update
            service_data: Update data
            user_email: Email of user performing update
            metadata: Audit metadata

        Returns:
            Updated service

        Raises:
            GrpcServiceNotFoundError: If service not found
            GrpcServiceNameConflictError: If new name conflicts
        """
        service = self._require_service(db, service_id)

        # Check name conflict if name is being changed
        if service_data.name and service_data.name != service.name:
            existing = db.execute(select(DbGrpcService).where(and_(DbGrpcService.name == service_data.name, DbGrpcService.id != service_id))).scalar_one_or_none()

            if existing:
                raise GrpcServiceNameConflictError(name=service_data.name, is_active=existing.enabled, service_id=existing.id)

        # Update fields
        update_data = service_data.model_dump(exclude_unset=True)
        for field, value in update_data.items():
            setattr(service, field, value)

        service.updated_at = datetime.now(timezone.utc)

        # Set audit metadata
        if metadata and user_email:
            service.modified_by = user_email
            service.modified_from_ip = metadata.get("modified_from_ip")
            service.modified_via = metadata.get("modified_via")
            service.modified_user_agent = metadata.get("modified_user_agent")

        service.version += 1

        db.commit()
        db.refresh(service)

        logger.info(f"Updated gRPC service: {service.name}")

        return GrpcServiceRead.model_validate(service)

    async def set_service_state(
        self,
        db: Session,
        service_id: str,
        activate: bool,
    ) -> GrpcServiceRead:
        """Set a gRPC service's enabled status.

        Args:
            db: Database session
            service_id: Service ID
            activate: True to enable, False to disable

        Returns:
            Updated service

        Raises:
            GrpcServiceNotFoundError: If service not found
        """
        service = self._require_service(db, service_id)

        service.enabled = activate
        service.updated_at = datetime.now(timezone.utc)

        db.commit()
        db.refresh(service)

        action = "activated" if activate else "deactivated"
        logger.info(f"gRPC service {service.name} {action}")

        return GrpcServiceRead.model_validate(service)

    async def delete_service(
        self,
        db: Session,
        service_id: str,
    ) -> None:
        """Delete a gRPC service.

        Args:
            db: Database session
            service_id: Service ID to delete

        Raises:
            GrpcServiceNotFoundError: If service not found
        """
        service = self._require_service(db, service_id)

        # Read the name while the row is still attached; the log line below
        # must not depend on the state of a deleted instance.
        service_name = service.name

        db.delete(service)
        db.commit()

        logger.info(f"Deleted gRPC service: {service_name}")

    async def reflect_service(
        self,
        db: Session,
        service_id: str,
    ) -> GrpcServiceRead:
        """Trigger reflection on a gRPC service to discover services and methods.

        Args:
            db: Database session
            service_id: Service ID

        Returns:
            Updated service with reflection results

        Raises:
            GrpcServiceNotFoundError: If service not found
            GrpcServiceError: If reflection fails
        """
        service = self._require_service(db, service_id)

        try:
            await self._perform_reflection(db, service)
            logger.info(f"Reflection completed for {service.name}: {service.service_count} services, {service.method_count} methods")
        except Exception as e:
            logger.error(f"Reflection failed for {service.name}: {e}")
            # Also covers failures raised before the channel was opened
            # (target / TLS path validation), which _perform_reflection does not record.
            service.reachable = False
            db.commit()
            raise GrpcServiceError(f"Reflection failed: {str(e)}") from e

        return GrpcServiceRead.model_validate(service)

    async def get_service_methods(
        self,
        db: Session,
        service_id: str,
    ) -> List[Dict[str, Any]]:
        """Get the list of methods for a gRPC service.

        The list is built from the stored reflection results
        (``discovered_services``); no network call is made.

        Args:
            db: Database session
            service_id: Service ID

        Returns:
            List of method descriptors

        Raises:
            GrpcServiceNotFoundError: If service not found
        """
        service = self._require_service(db, service_id)

        discovered = service.discovered_services or {}

        return [
            {
                "service": service_name,
                "method": method["name"],
                "full_name": f"{service_name}.{method['name']}",
                "input_type": method.get("input_type"),
                "output_type": method.get("output_type"),
                "client_streaming": method.get("client_streaming", False),
                "server_streaming": method.get("server_streaming", False),
            }
            for service_name, service_desc in discovered.items()
            for method in service_desc.get("methods", [])
        ]

    async def _open_channel(self, service: DbGrpcService) -> Any:
        """Create the gRPC channel for a service according to its TLS settings.

        Args:
            service: GrpcService model instance

        Returns:
            An open gRPC channel; the caller is responsible for closing it

        Raises:
            GrpcServiceError: If a TLS path is outside the allowed directories
                or a TLS file does not exist
        """
        if not service.tls_enabled:
            return grpc.insecure_channel(service.target)

        if service.tls_cert_path and service.tls_key_path:
            # Validate TLS paths against traversal
            cert_path = _validate_tls_path(service.tls_cert_path, "TLS cert path")
            key_path = _validate_tls_path(service.tls_key_path, "TLS key path")
            # Load TLS certificates off the event loop
            try:
                cert = await asyncio.to_thread(cert_path.read_bytes)
                key = await asyncio.to_thread(key_path.read_bytes)
                # NOTE(review): the certificate is passed as root_certificates and the
                # private key has no accompanying certificate_chain — confirm whether
                # tls_cert_path is meant to be a CA bundle or a client certificate.
                credentials = grpc.ssl_channel_credentials(root_certificates=cert, private_key=key)
            except FileNotFoundError as e:
                raise GrpcServiceError(f"TLS certificate or key file not found: {e}") from e
        else:
            # Use default system certificates
            credentials = grpc.ssl_channel_credentials()

        return grpc.secure_channel(service.target, credentials)

    def _discover_services(self, channel: Any) -> Dict[str, Dict[str, Any]]:
        """Query the reflection service over an open channel (blocking).

        Uses the synchronous gRPC API, so it must be run off the event loop.
        Every RPC carries ``REFLECTION_TIMEOUT_SECONDS`` as its deadline.

        Args:
            channel: Open gRPC channel to the target

        Returns:
            Mapping of fully-qualified service name to its descriptor dict
            (``name``, ``methods`` and, when details were obtained, ``package``)
        """
        # Imported lazily, as in the rest of this module's optional gRPC support
        # Third-Party
        from google.protobuf.descriptor_pb2 import FileDescriptorProto  # pylint: disable=import-outside-toplevel,no-name-in-module

        timeout = self.REFLECTION_TIMEOUT_SECONDS

        # Create reflection stub
        stub = reflection_pb2_grpc.ServerReflectionStub(channel)

        # List services
        list_request = reflection_pb2.ServerReflectionRequest(list_services="")  # pylint: disable=no-member
        service_names = []
        for resp in stub.ServerReflectionInfo(iter([list_request]), timeout=timeout):
            if not resp.HasField("list_services_response"):
                continue
            for svc in resp.list_services_response.service:
                # Skip reflection service itself
                if "ServerReflection" in svc.name:
                    continue
                service_names.append(svc.name)

        # Get detailed information for each service
        discovered: Dict[str, Dict[str, Any]] = {}
        for service_name in service_names:
            try:
                # Request file descriptor containing this service
                file_request = reflection_pb2.ServerReflectionRequest(file_containing_symbol=service_name)  # pylint: disable=no-member

                for resp in stub.ServerReflectionInfo(iter([file_request]), timeout=timeout):
                    if not resp.HasField("file_descriptor_response"):
                        continue
                    for file_desc_proto_bytes in resp.file_descriptor_response.file_descriptor_proto:
                        file_desc_proto = FileDescriptorProto()
                        file_desc_proto.ParseFromString(file_desc_proto_bytes)
                        package = file_desc_proto.package

                        for service_desc in file_desc_proto.service:
                            full_service_name = f"{package}.{service_desc.name}" if package else service_desc.name
                            # Exact match on the fully-qualified name: a substring test
                            # would also accept sibling services defined in the same file
                            # (e.g. "Greeter" for "pkg.GreeterAdmin").
                            if full_service_name != service_name:
                                continue

                            discovered[full_service_name] = {
                                "name": full_service_name,
                                "methods": [
                                    {
                                        "name": method_desc.name,
                                        "input_type": method_desc.input_type,
                                        "output_type": method_desc.output_type,
                                        "client_streaming": method_desc.client_streaming,
                                        "server_streaming": method_desc.server_streaming,
                                    }
                                    for method_desc in service_desc.method
                                ],
                                "package": package,
                            }

            except Exception as detail_error:
                logger.warning(f"Failed to get details for {service_name}: {detail_error}")
                # Add basic info even if detailed discovery fails
                discovered[service_name] = {
                    "name": service_name,
                    "methods": [],
                }

        return discovered

    async def _perform_reflection(
        self,
        db: Session,
        service: DbGrpcService,
    ) -> None:
        """Perform gRPC server reflection to discover services.

        On success the discovered services, the service/method counts, the
        reflection timestamp and ``reachable = True`` are stored on ``service``
        and committed. If discovery fails after the channel was opened,
        ``reachable`` is set to False, committed, and the error is re-raised.

        Args:
            db: Database session
            service: GrpcService model instance

        Raises:
            GrpcServiceError: If gRPC support is not installed, the target is
                blocked, or the TLS files are invalid or missing
            Exception: If reflection or connection fails
        """
        if not GRPC_AVAILABLE:
            raise GrpcServiceError("gRPC support is not available: the 'grpcio' and 'grpcio-reflection' packages are not installed")

        # Validate target address against SSRF
        _validate_grpc_target(service.target)

        # Create gRPC channel
        channel = await self._open_channel(service)

        try:
            # The reflection stub is synchronous; run it in a worker thread so
            # the event loop is not blocked while waiting on the network.
            discovered_services = await asyncio.to_thread(self._discover_services, channel)

            service.discovered_services = discovered_services
            # Counts are derived from the final mapping so they always agree with it
            service.service_count = len(discovered_services)
            service.method_count = sum(len(entry["methods"]) for entry in discovered_services.values())
            service.last_reflection = datetime.now(timezone.utc)
            service.reachable = True

            db.commit()

        except Exception as e:
            logger.error(f"Reflection error for {service.target}: {e}")
            service.reachable = False
            db.commit()
            raise

        finally:
            channel.close()

    async def invoke_method(
        self,
        db: Session,
        service_id: str,
        method_name: str,
        request_data: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Invoke a gRPC method on a registered service.

        Args:
            db: Database session
            service_id: Service ID
            method_name: Full method name (service.Method)
            request_data: JSON request data

        Returns:
            JSON response data

        Raises:
            GrpcServiceNotFoundError: If service not found
            GrpcServiceError: If the service is disabled, the method name is
                malformed, the target is blocked, or invocation fails
        """
        service = self._require_service(db, service_id)

        if not service.enabled:
            raise GrpcServiceError(f"Service '{service.name}' is disabled")

        # Import here to avoid circular dependency
        # First-Party
        from mcpgateway.translate_grpc import GrpcEndpoint  # pylint: disable=import-outside-toplevel

        # Parse method name (service.Method format); both halves must be non-empty
        if "." not in method_name:
            raise GrpcServiceError(f"Invalid method name '{method_name}', expected 'service.Method' format")

        service_name, method = method_name.rsplit(".", 1)
        if not service_name or not method:
            raise GrpcServiceError(f"Invalid method name '{method_name}', expected 'service.Method' format")

        # Validate target address and TLS paths before connecting
        _validate_grpc_target(service.target)
        if service.tls_cert_path:
            _validate_tls_path(service.tls_cert_path, "TLS cert path")
        if service.tls_key_path:
            _validate_tls_path(service.tls_key_path, "TLS key path")

        # Create endpoint and invoke
        endpoint = GrpcEndpoint(
            target=service.target,
            reflection_enabled=False,  # Assume already discovered
            tls_enabled=service.tls_enabled,
            tls_cert_path=service.tls_cert_path,
            tls_key_path=service.tls_key_path,
            metadata=service.grpc_metadata or {},
        )

        try:
            # Start connection
            await endpoint.start()

            # If we have stored service info, use it
            if service.discovered_services:
                endpoint._services = service.discovered_services  # pylint: disable=protected-access

            # Invoke method
            return await endpoint.invoke(service_name, method, request_data)

        except Exception as e:
            logger.error(f"Failed to invoke {method_name} on {service.name}: {e}")
            raise GrpcServiceError(f"Method invocation failed: {e}") from e

        finally:
            await endpoint.close()