Coverage for mcpgateway / services / grpc_service.py: 98%
305 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/services/grpc_service.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: ContextForge Contributors
7gRPC Service Management
9This module implements gRPC service management for ContextForge.
10It handles gRPC service registration, reflection-based discovery, listing,
11retrieval, updates, activation toggling, and deletion.
12"""
14# Standard
15import asyncio
16from datetime import datetime, timezone
17import ipaddress
18from pathlib import Path
19from typing import Any, Dict, List, Optional, Union
21try:
22 # Third-Party
23 import grpc
24 from grpc_reflection.v1alpha import reflection_pb2, reflection_pb2_grpc
26 GRPC_AVAILABLE = True
27except ImportError:
28 GRPC_AVAILABLE = False
29 # grpc module will not be used if not available
30 grpc = None # type: ignore
31 reflection_pb2 = None # type: ignore
32 reflection_pb2_grpc = None # type: ignore
34# Third-Party
35from pydantic import ValidationError
36from sqlalchemy import and_, desc, select
37from sqlalchemy.orm import Session
39# First-Party
40from mcpgateway.db import EmailTeam
41from mcpgateway.db import GrpcService as DbGrpcService
42from mcpgateway.schemas import GrpcServiceCreate, GrpcServiceRead, GrpcServiceUpdate
43from mcpgateway.services.logging_service import LoggingService
44from mcpgateway.services.team_management_service import TeamManagementService
45from mcpgateway.utils.pagination import unified_paginate
47# Initialize logging
48logging_service = LoggingService()
49logger = logging_service.get_logger(__name__)
52def _validate_grpc_target(target: str) -> None:
53 """Validate a gRPC target address against SSRF-unsafe destinations.
55 Consults the platform SSRF settings (``ssrf_allow_localhost``,
56 ``ssrf_allow_private_networks``, ``ssrf_allowed_networks``,
57 ``ssrf_blocked_networks``, ``ssrf_blocked_hosts``) so that gRPC
58 targets follow the same rules as HTTP URLs validated by
59 ``SecurityValidator.validate_url``.
61 Args:
62 target: gRPC target string (host:port or host).
64 Raises:
65 GrpcServiceError: If the target resolves to a blocked address.
66 """
67 # First-Party
68 from mcpgateway.config import settings # pylint: disable=import-outside-toplevel
70 # Extract host (strip port)
71 host = target.rsplit(":", 1)[0].strip("[]")
72 if not host:
73 raise GrpcServiceError("Empty gRPC target address")
75 # Check blocked hostnames
76 hostname_normalized = host.lower().rstrip(".")
77 for blocked_host in settings.ssrf_blocked_hosts:
78 if hostname_normalized == blocked_host.lower().rstrip("."):
79 raise GrpcServiceError(f"gRPC target hostname '{host}' is blocked")
81 # Resolve IP and apply network-level checks
82 try:
83 addr = ipaddress.ip_address(host)
84 except ValueError:
85 # Hostname, not an IP literal — hostname check above is sufficient
86 if hostname_normalized == "localhost":
87 if not settings.ssrf_allow_localhost:
88 raise GrpcServiceError(f"gRPC target hostname '{host}' is blocked (localhost not allowed)")
89 return
91 # Always block: cloud metadata, link-local (from ssrf_blocked_networks)
92 for network_str in settings.ssrf_blocked_networks:
93 try:
94 network = ipaddress.ip_network(network_str, strict=False)
95 if addr in network:
96 raise GrpcServiceError(f"gRPC target address '{host}' is blocked (network: {network_str})")
97 except ValueError:
98 continue
100 # Loopback
101 if addr.is_loopback:
102 if not settings.ssrf_allow_localhost:
103 raise GrpcServiceError(f"gRPC target address '{host}' is blocked (loopback not allowed)")
104 return
106 # Reserved / multicast — always block
107 if addr.is_reserved or addr.is_multicast:
108 raise GrpcServiceError(f"gRPC target address '{host}' is blocked (reserved/multicast)")
110 # Private networks — consult settings
111 if addr.is_private and not addr.is_loopback:
112 if settings.ssrf_allow_private_networks:
113 return # Explicitly allowed
114 # Check per-network allowlist
115 for network_str in settings.ssrf_allowed_networks or []:
116 try:
117 network = ipaddress.ip_network(network_str, strict=False)
118 if addr in network:
119 return # Allowed by specific network allowlist
120 except ValueError:
121 continue
122 raise GrpcServiceError(f"gRPC target address '{host}' is blocked (private network not allowed)")
125def _validate_tls_path(path_str: str, label: str = "TLS path") -> Path:
126 """Validate that a TLS cert/key path is within allowed directories.
128 Args:
129 path_str: The file path to validate.
130 label: Label for error messages.
132 Returns:
133 Resolved Path object.
135 Raises:
136 GrpcServiceError: If the path escapes allowed directories.
137 """
138 resolved = Path(path_str).resolve()
139 # Allow only paths under /certs/, /etc/ssl/, /etc/pki/, or the CWD/certs dir
140 allowed_prefixes = (
141 Path("/certs").resolve(),
142 Path("/etc/ssl").resolve(),
143 Path("/etc/pki").resolve(),
144 Path.cwd().joinpath("certs").resolve(),
145 )
146 if not any(resolved.is_relative_to(prefix) for prefix in allowed_prefixes):
147 raise GrpcServiceError(f"{label} '{path_str}' is outside allowed certificate directories")
148 return resolved
151class GrpcServiceError(Exception):
152 """Base class for gRPC service-related errors."""
155class GrpcServiceNotFoundError(GrpcServiceError):
156 """Raised when a requested gRPC service is not found."""
159class GrpcServiceNameConflictError(GrpcServiceError):
160 """Raised when a gRPC service name conflicts with an existing one."""
162 def __init__(self, name: str, is_active: bool = True, service_id: Optional[str] = None):
163 """Initialize the GrpcServiceNameConflictError.
165 Args:
166 name: The conflicting gRPC service name
167 is_active: Whether the conflicting service is currently active
168 service_id: The ID of the conflicting service, if known
169 """
170 self.name = name
171 self.is_active = is_active
172 self.service_id = service_id
173 msg = f"gRPC service with name '{name}' already exists"
174 if not is_active:
175 msg += " (inactive)"
176 if service_id:
177 msg += f" (ID: {service_id})"
178 super().__init__(msg)
181class GrpcService:
182 """Service for managing gRPC services with reflection-based discovery."""
184 def __init__(self):
185 """Initialize the gRPC service manager."""
187 async def register_service(
188 self,
189 db: Session,
190 service_data: GrpcServiceCreate,
191 user_email: Optional[str] = None,
192 metadata: Optional[Dict[str, Any]] = None,
193 ) -> GrpcServiceRead:
194 """Register a new gRPC service.
196 Args:
197 db: Database session
198 service_data: gRPC service creation data
199 user_email: Email of the user creating the service
200 metadata: Additional metadata (IP, user agent, etc.)
202 Returns:
203 GrpcServiceRead: The created service
205 Raises:
206 GrpcServiceNameConflictError: If service name already exists
207 """
208 # Check for name conflicts
209 existing = db.execute(select(DbGrpcService).where(DbGrpcService.name == service_data.name)).scalar_one_or_none()
211 if existing:
212 raise GrpcServiceNameConflictError(name=service_data.name, is_active=existing.enabled, service_id=existing.id)
214 # Create service
215 db_service = DbGrpcService(
216 name=service_data.name,
217 target=service_data.target,
218 description=service_data.description,
219 reflection_enabled=service_data.reflection_enabled,
220 tls_enabled=service_data.tls_enabled,
221 tls_cert_path=service_data.tls_cert_path,
222 tls_key_path=service_data.tls_key_path,
223 grpc_metadata=service_data.grpc_metadata or {},
224 tags=service_data.tags or [],
225 team_id=service_data.team_id,
226 owner_email=user_email or service_data.owner_email,
227 visibility=service_data.visibility,
228 created_at=datetime.now(timezone.utc),
229 updated_at=datetime.now(timezone.utc),
230 )
232 # Set audit metadata if provided
233 if metadata:
234 db_service.created_by = user_email
235 db_service.created_from_ip = metadata.get("created_from_ip")
236 db_service.created_via = metadata.get("created_via")
237 db_service.created_user_agent = metadata.get("created_user_agent")
239 db.add(db_service)
240 db.commit()
241 db.refresh(db_service)
243 logger.info(f"Registered gRPC service: {db_service.name} (target: {db_service.target})")
245 # Perform initial reflection if enabled
246 if db_service.reflection_enabled:
247 try:
248 await self._perform_reflection(db, db_service)
249 except Exception as e:
250 logger.warning(f"Initial reflection failed for {db_service.name}: {e}")
252 return GrpcServiceRead.model_validate(db_service)
254 async def list_services(
255 self,
256 db: Session,
257 cursor: Optional[str] = None,
258 include_inactive: bool = False,
259 limit: Optional[int] = None,
260 page: Optional[int] = None,
261 per_page: Optional[int] = None,
262 user_email: Optional[str] = None,
263 team_id: Optional[str] = None,
264 ) -> Union[tuple[List[GrpcServiceRead], Optional[str]], Dict[str, Any]]:
265 """List gRPC services with pagination and optional filtering.
267 Args:
268 db: Database session
269 cursor: Pagination cursor for keyset pagination
270 include_inactive: Include disabled services
271 limit: Maximum number of services to return. None for default, 0 for unlimited
272 page: Page number for page-based pagination (1-indexed). Mutually exclusive with cursor
273 per_page: Items per page for page-based pagination
274 user_email: Filter by user email for team access control
275 team_id: Filter by team ID
277 Returns:
278 If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}}
279 If cursor is provided or neither: tuple of (list of GrpcServiceRead objects, next_cursor)
280 """
281 # Build base query with ordering
282 query = select(DbGrpcService).order_by(desc(DbGrpcService.created_at), desc(DbGrpcService.id))
284 # Apply team filtering
285 if user_email and team_id:
286 team_service = TeamManagementService(db)
287 team_filter = await team_service.build_team_filter_clause(DbGrpcService, user_email, team_id) # pylint: disable=no-member
288 if team_filter is not None:
289 query = query.where(team_filter)
290 elif team_id:
291 query = query.where(DbGrpcService.team_id == team_id)
293 # Apply active filter
294 if not include_inactive:
295 query = query.where(DbGrpcService.enabled.is_(True)) # pylint: disable=singleton-comparison
297 # Use unified pagination helper - handles both page and cursor pagination
298 pag_result = await unified_paginate(
299 db=db,
300 query=query,
301 page=page,
302 per_page=per_page,
303 cursor=cursor,
304 limit=limit,
305 base_url="/admin/grpc",
306 query_params={"include_inactive": include_inactive} if include_inactive else {},
307 )
309 next_cursor = None
310 # Extract services based on pagination type
311 if page is not None:
312 # Page-based: pag_result is a dict
313 services_db = pag_result["data"]
314 else:
315 # Cursor-based: pag_result is a tuple
316 services_db, next_cursor = pag_result
318 # Fetch team names for the services
319 team_ids_set = {s.team_id for s in services_db if s.team_id}
320 team_map = {}
321 if team_ids_set:
322 teams = db.execute(select(EmailTeam.id, EmailTeam.name).where(EmailTeam.id.in_(team_ids_set), EmailTeam.is_active.is_(True))).all()
323 team_map = {team.id: team.name for team in teams}
325 db.commit() # Release transaction to avoid idle-in-transaction
327 # Convert to GrpcServiceRead
328 result = []
329 for s in services_db:
330 try:
331 s.team = team_map.get(s.team_id) if s.team_id else None
332 result.append(GrpcServiceRead.model_validate(s))
333 except (ValidationError, ValueError, KeyError, TypeError) as e:
334 logger.exception(f"Failed to convert gRPC service {getattr(s, 'id', 'unknown')} ({getattr(s, 'name', 'unknown')}): {e}")
336 # Return appropriate format based on pagination type
337 if page is not None:
338 # Page-based format
339 return {
340 "data": result,
341 "pagination": pag_result["pagination"],
342 "links": pag_result["links"],
343 }
345 # Cursor-based format (tuple)
346 return (result, next_cursor)
348 async def get_service(
349 self,
350 db: Session,
351 service_id: str,
352 user_email: Optional[str] = None,
353 ) -> GrpcServiceRead:
354 """Get a specific gRPC service by ID.
356 Args:
357 db: Database session
358 service_id: Service ID
359 user_email: Email for team access control
361 Returns:
362 The gRPC service
364 Raises:
365 GrpcServiceNotFoundError: If service not found or access denied
366 """
367 query = select(DbGrpcService).where(DbGrpcService.id == service_id)
369 # Apply team access control
370 if user_email:
371 team_service = TeamManagementService(db)
372 team_filter = await team_service.build_team_filter_clause(DbGrpcService, user_email, None) # pylint: disable=no-member
373 if team_filter is not None:
374 query = query.where(team_filter)
376 service = db.execute(query).scalar_one_or_none()
378 if not service:
379 raise GrpcServiceNotFoundError(f"gRPC service with ID '{service_id}' not found")
381 return GrpcServiceRead.model_validate(service)
383 async def update_service(
384 self,
385 db: Session,
386 service_id: str,
387 service_data: GrpcServiceUpdate,
388 user_email: Optional[str] = None,
389 metadata: Optional[Dict[str, Any]] = None,
390 ) -> GrpcServiceRead:
391 """Update an existing gRPC service.
393 Args:
394 db: Database session
395 service_id: Service ID to update
396 service_data: Update data
397 user_email: Email of user performing update
398 metadata: Audit metadata
400 Returns:
401 Updated service
403 Raises:
404 GrpcServiceNotFoundError: If service not found
405 GrpcServiceNameConflictError: If new name conflicts
406 """
407 service = db.execute(select(DbGrpcService).where(DbGrpcService.id == service_id)).scalar_one_or_none()
409 if not service:
410 raise GrpcServiceNotFoundError(f"gRPC service with ID '{service_id}' not found")
412 # Check name conflict if name is being changed
413 if service_data.name and service_data.name != service.name:
414 existing = db.execute(select(DbGrpcService).where(and_(DbGrpcService.name == service_data.name, DbGrpcService.id != service_id))).scalar_one_or_none()
416 if existing:
417 raise GrpcServiceNameConflictError(name=service_data.name, is_active=existing.enabled, service_id=existing.id)
419 # Update fields
420 update_data = service_data.model_dump(exclude_unset=True)
421 for field, value in update_data.items():
422 setattr(service, field, value)
424 service.updated_at = datetime.now(timezone.utc)
426 # Set audit metadata
427 if metadata and user_email:
428 service.modified_by = user_email
429 service.modified_from_ip = metadata.get("modified_from_ip")
430 service.modified_via = metadata.get("modified_via")
431 service.modified_user_agent = metadata.get("modified_user_agent")
433 service.version += 1
435 db.commit()
436 db.refresh(service)
438 logger.info(f"Updated gRPC service: {service.name}")
440 return GrpcServiceRead.model_validate(service)
442 async def set_service_state(
443 self,
444 db: Session,
445 service_id: str,
446 activate: bool,
447 ) -> GrpcServiceRead:
448 """Set a gRPC service's enabled status.
450 Args:
451 db: Database session
452 service_id: Service ID
453 activate: True to enable, False to disable
455 Returns:
456 Updated service
458 Raises:
459 GrpcServiceNotFoundError: If service not found
460 """
461 service = db.execute(select(DbGrpcService).where(DbGrpcService.id == service_id)).scalar_one_or_none()
463 if not service:
464 raise GrpcServiceNotFoundError(f"gRPC service with ID '{service_id}' not found")
466 service.enabled = activate
467 service.updated_at = datetime.now(timezone.utc)
469 db.commit()
470 db.refresh(service)
472 action = "activated" if activate else "deactivated"
473 logger.info(f"gRPC service {service.name} {action}")
475 return GrpcServiceRead.model_validate(service)
477 async def delete_service(
478 self,
479 db: Session,
480 service_id: str,
481 ) -> None:
482 """Delete a gRPC service.
484 Args:
485 db: Database session
486 service_id: Service ID to delete
488 Raises:
489 GrpcServiceNotFoundError: If service not found
490 """
491 service = db.execute(select(DbGrpcService).where(DbGrpcService.id == service_id)).scalar_one_or_none()
493 if not service:
494 raise GrpcServiceNotFoundError(f"gRPC service with ID '{service_id}' not found")
496 db.delete(service)
497 db.commit()
499 logger.info(f"Deleted gRPC service: {service.name}")
501 async def reflect_service(
502 self,
503 db: Session,
504 service_id: str,
505 ) -> GrpcServiceRead:
506 """Trigger reflection on a gRPC service to discover services and methods.
508 Args:
509 db: Database session
510 service_id: Service ID
512 Returns:
513 Updated service with reflection results
515 Raises:
516 GrpcServiceNotFoundError: If service not found
517 GrpcServiceError: If reflection fails
518 """
519 service = db.execute(select(DbGrpcService).where(DbGrpcService.id == service_id)).scalar_one_or_none()
521 if not service:
522 raise GrpcServiceNotFoundError(f"gRPC service with ID '{service_id}' not found")
524 try:
525 await self._perform_reflection(db, service)
526 logger.info(f"Reflection completed for {service.name}: {service.service_count} services, {service.method_count} methods")
527 except Exception as e:
528 logger.error(f"Reflection failed for {service.name}: {e}")
529 service.reachable = False
530 db.commit()
531 raise GrpcServiceError(f"Reflection failed: {str(e)}")
533 return GrpcServiceRead.model_validate(service)
535 async def get_service_methods(
536 self,
537 db: Session,
538 service_id: str,
539 ) -> List[Dict[str, Any]]:
540 """Get the list of methods for a gRPC service.
542 Args:
543 db: Database session
544 service_id: Service ID
546 Returns:
547 List of method descriptors
549 Raises:
550 GrpcServiceNotFoundError: If service not found
551 """
552 service = db.execute(select(DbGrpcService).where(DbGrpcService.id == service_id)).scalar_one_or_none()
554 if not service:
555 raise GrpcServiceNotFoundError(f"gRPC service with ID '{service_id}' not found")
557 methods = []
558 discovered = service.discovered_services or {}
560 for service_name, service_desc in discovered.items():
561 for method in service_desc.get("methods", []):
562 methods.append(
563 {
564 "service": service_name,
565 "method": method["name"],
566 "full_name": f"{service_name}.{method['name']}",
567 "input_type": method.get("input_type"),
568 "output_type": method.get("output_type"),
569 "client_streaming": method.get("client_streaming", False),
570 "server_streaming": method.get("server_streaming", False),
571 }
572 )
574 return methods
576 async def _perform_reflection(
577 self,
578 db: Session,
579 service: DbGrpcService,
580 ) -> None:
581 """Perform gRPC server reflection to discover services.
583 Args:
584 db: Database session
585 service: GrpcService model instance
587 Raises:
588 GrpcServiceError: If TLS certificate files not found
589 Exception: If reflection or connection fails
590 """
591 # Validate target address against SSRF
592 _validate_grpc_target(service.target)
594 # Create gRPC channel
595 if service.tls_enabled:
596 if service.tls_cert_path and service.tls_key_path:
597 # Validate TLS paths against traversal
598 cert_path = _validate_tls_path(service.tls_cert_path, "TLS cert path")
599 key_path = _validate_tls_path(service.tls_key_path, "TLS key path")
600 # Load TLS certificates
601 try:
602 cert = await asyncio.to_thread(cert_path.read_bytes)
603 key = await asyncio.to_thread(key_path.read_bytes)
604 credentials = grpc.ssl_channel_credentials(root_certificates=cert, private_key=key)
605 except FileNotFoundError as e:
606 raise GrpcServiceError(f"TLS certificate or key file not found: {e}")
607 else:
608 # Use default system certificates
609 credentials = grpc.ssl_channel_credentials()
611 channel = grpc.secure_channel(service.target, credentials)
612 else:
613 channel = grpc.insecure_channel(service.target)
615 try: # pylint: disable=too-many-nested-blocks
616 # Import here to avoid circular dependency
617 # Third-Party
618 from google.protobuf.descriptor_pb2 import FileDescriptorProto # pylint: disable=import-outside-toplevel,no-name-in-module
620 # Create reflection stub
621 stub = reflection_pb2_grpc.ServerReflectionStub(channel)
623 # List services
624 request = reflection_pb2.ServerReflectionRequest(list_services="") # pylint: disable=no-member
626 response = stub.ServerReflectionInfo(iter([request]))
628 service_names = []
629 for resp in response:
630 if resp.HasField("list_services_response"):
631 for svc in resp.list_services_response.service:
632 service_name = svc.name
633 # Skip reflection service itself
634 if "ServerReflection" in service_name:
635 continue
636 service_names.append(service_name)
638 # Get detailed information for each service
639 discovered_services = {}
640 service_count = 0
641 method_count = 0
643 for service_name in service_names:
644 try:
645 # Request file descriptor containing this service
646 file_request = reflection_pb2.ServerReflectionRequest(file_containing_symbol=service_name) # pylint: disable=no-member
648 file_response = stub.ServerReflectionInfo(iter([file_request]))
650 for resp in file_response:
651 if resp.HasField("file_descriptor_response"):
652 # Process file descriptors
653 for file_desc_proto_bytes in resp.file_descriptor_response.file_descriptor_proto:
654 file_desc_proto = FileDescriptorProto()
655 file_desc_proto.ParseFromString(file_desc_proto_bytes)
657 # Extract service and method information
658 for service_desc in file_desc_proto.service:
659 if service_desc.name in service_name or service_name.endswith(service_desc.name):
660 full_service_name = f"{file_desc_proto.package}.{service_desc.name}" if file_desc_proto.package else service_desc.name
662 methods = []
663 for method_desc in service_desc.method:
664 methods.append(
665 {
666 "name": method_desc.name,
667 "input_type": method_desc.input_type,
668 "output_type": method_desc.output_type,
669 "client_streaming": method_desc.client_streaming,
670 "server_streaming": method_desc.server_streaming,
671 }
672 )
673 method_count += 1
675 discovered_services[full_service_name] = {
676 "name": full_service_name,
677 "methods": methods,
678 "package": file_desc_proto.package,
679 }
680 service_count += 1
682 except Exception as detail_error:
683 logger.warning(f"Failed to get details for {service_name}: {detail_error}")
684 # Add basic info even if detailed discovery fails
685 discovered_services[service_name] = {
686 "name": service_name,
687 "methods": [],
688 }
689 service_count += 1
691 service.discovered_services = discovered_services
692 service.service_count = service_count
693 service.method_count = method_count
694 service.last_reflection = datetime.now(timezone.utc)
695 service.reachable = True
697 db.commit()
699 except Exception as e:
700 logger.error(f"Reflection error for {service.target}: {e}")
701 service.reachable = False
702 db.commit()
703 raise
705 finally:
706 channel.close()
708 async def invoke_method(
709 self,
710 db: Session,
711 service_id: str,
712 method_name: str,
713 request_data: Dict[str, Any],
714 ) -> Dict[str, Any]:
715 """Invoke a gRPC method on a registered service.
717 Args:
718 db: Database session
719 service_id: Service ID
720 method_name: Full method name (service.Method)
721 request_data: JSON request data
723 Returns:
724 JSON response data
726 Raises:
727 GrpcServiceNotFoundError: If service not found
728 GrpcServiceError: If invocation fails
729 """
730 service = db.execute(select(DbGrpcService).where(DbGrpcService.id == service_id)).scalar_one_or_none()
732 if not service:
733 raise GrpcServiceNotFoundError(f"gRPC service with ID '{service_id}' not found")
735 if not service.enabled:
736 raise GrpcServiceError(f"Service '{service.name}' is disabled")
738 # Import here to avoid circular dependency
739 # First-Party
740 from mcpgateway.translate_grpc import GrpcEndpoint # pylint: disable=import-outside-toplevel
742 # Parse method name (service.Method format)
743 if "." not in method_name:
744 raise GrpcServiceError(f"Invalid method name '{method_name}', expected 'service.Method' format")
746 parts = method_name.rsplit(".", 1)
747 service_name = ".".join(parts[:-1]) if len(parts) > 1 else parts[0]
748 method = parts[-1]
750 # Validate target address and TLS paths before connecting
751 _validate_grpc_target(service.target)
752 if service.tls_cert_path:
753 _validate_tls_path(service.tls_cert_path, "TLS cert path")
754 if service.tls_key_path:
755 _validate_tls_path(service.tls_key_path, "TLS key path")
757 # Create endpoint and invoke
758 endpoint = GrpcEndpoint(
759 target=service.target,
760 reflection_enabled=False, # Assume already discovered
761 tls_enabled=service.tls_enabled,
762 tls_cert_path=service.tls_cert_path,
763 tls_key_path=service.tls_key_path,
764 metadata=service.grpc_metadata or {},
765 )
767 try:
768 # Start connection
769 await endpoint.start()
771 # If we have stored service info, use it
772 if service.discovered_services:
773 endpoint._services = service.discovered_services # pylint: disable=protected-access
775 # Invoke method
776 response = await endpoint.invoke(service_name, method, request_data)
778 return response
780 except Exception as e:
781 logger.error(f"Failed to invoke {method_name} on {service.name}: {e}")
782 raise GrpcServiceError(f"Method invocation failed: {e}")
784 finally:
785 await endpoint.close()