Coverage for mcpgateway / wrapper.py: 100%
305 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/wrapper.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Keval Mahajan
7ContextForge Wrapper.
8MCP Client (stdio) <-> ContextForge Bridge
10This module implements a wrapper stdio bridge that facilitates
11interaction between the MCP client and the MCP gateway.
12It provides several functionalities, including listing tools,
13invoking tools, managing resources, retrieving prompts,
14and handling tool calls via the MCP gateway.
16- All JSON-RPC traffic is written to stdout.
17- All logs/diagnostics are written to stderr, ensuring clean separation.
19Environment Variables
20---------------------
21- **MCP_SERVER_URL** (or `--url`): Gateway MCP endpoint URL.
22- **MCP_AUTH** (or `--auth`): Authorization header value.
23- **MCP_TOOL_CALL_TIMEOUT** (or `--timeout`): Response timeout in seconds (default: 60).
24- **MCP_WRAPPER_LOG_LEVEL** (or `--log-level`): Logging level, or OFF to disable.
25- **CONCURRENCY**: Max concurrent tool calls (default: 10).
27Example usage:
28--------------
30Method 1: Using environment variables
31 $ export MCP_SERVER_URL='http://localhost:4444/servers/UUID/mcp'
32 $ export MCP_AUTH='Bearer <token>'
33 $ export MCP_TOOL_CALL_TIMEOUT=120
34 $ export MCP_WRAPPER_LOG_LEVEL=DEBUG
35 $ python3 -m mcpgateway.wrapper
37Method 2: Using command-line arguments
38 $ python3 -m mcpgateway.wrapper --url 'http://localhost:4444/servers/UUID/mcp' --auth 'Bearer <token>' --timeout 120 --log-level DEBUG
39"""
41# Future
42from __future__ import annotations
44# Standard
45import argparse
46import asyncio
47from contextlib import suppress
48from dataclasses import dataclass
49import logging
50import os
51import signal
52import sys
53from typing import Any, AsyncIterator, Dict, List, Optional, Union
54from urllib.parse import urlencode
56# Third-Party
57import httpx
58import orjson
60# First-Party
61from mcpgateway.utils.retry_manager import ResilientHttpClient
63# -----------------------
64# Configuration Defaults
65# -----------------------
66DEFAULT_CONCURRENCY = int(os.environ.get("CONCURRENCY", "10"))
67DEFAULT_CONNECT_TIMEOUT = 15
68DEFAULT_RESPONSE_TIMEOUT = float(os.environ.get("MCP_TOOL_CALL_TIMEOUT", "60"))
70JSONRPC_PARSE_ERROR = -32700
71JSONRPC_INTERNAL_ERROR = -32603
72JSONRPC_SERVER_ERROR = -32000
74CONTENT_TYPE = os.getenv("FORGE_CONTENT_TYPE", "application/json")
76# Global logger
77logger = logging.getLogger("mcpgateway.wrapper")
78logger.addHandler(logging.StreamHandler(sys.stderr))
79logger.propagate = False
80logger.disabled = True # default: disabled
82# Shutdown flag
83_shutdown = asyncio.Event()
86def _mark_shutdown():
87 """Mark the shutdown flag for graceful termination.
88 This is triggered when stdin closes, stdout fails, or a signal is caught.
90 Args:
91 None
93 Examples:
94 >>> _mark_shutdown() # doctest: +ELLIPSIS
95 >>> shutting_down()
96 True
97 >>> # Reset for following doctests:
98 >>> _ = _shutdown.clear()
99 """
100 if not _shutdown.is_set():
101 _shutdown.set()
104def shutting_down() -> bool:
105 """Check whether the server is shutting down.
107 Args:
108 None
110 Returns:
111 bool: True if shutdown has been triggered, False otherwise.
113 Examples:
114 >>> shutting_down()
115 False
116 """
117 return _shutdown.is_set()
120# -----------------------
121# Utilities
122# -----------------------
123def setup_logging(level: Optional[str]) -> None:
124 """Configure logging for the wrapper.
126 Args:
127 level: Logging level (e.g. "INFO", "DEBUG"), or OFF/None to disable.
129 Examples:
130 >>> setup_logging("DEBUG")
131 >>> logger.disabled
132 False
133 >>> setup_logging("OFF")
134 >>> logger.disabled
135 True
136 """
137 if not level:
138 logger.disabled = True
139 return
141 log_level = level.strip().upper()
142 if log_level in {"OFF", "NONE", "DISABLE", "FALSE", "0"}:
143 logger.disabled = True
144 return
146 logger.setLevel(getattr(logging, log_level, logging.INFO))
147 formatter = logging.Formatter(
148 "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
149 datefmt="%Y-%m-%dT%H:%M:%S",
150 )
151 for handler in logger.handlers:
152 handler.setFormatter(formatter)
153 logger.disabled = False
156def convert_url(url: str) -> str:
157 """Normalize an MCP server URL.
159 - If it ends with `/sse`, replace with `/mcp`.
160 - If it ends with `/mcp` already, leave it.
161 - Otherwise, append `/mcp`.
163 Args:
164 url: The input server URL.
166 Returns:
167 str: Normalized MCP URL.
169 Examples:
170 >>> convert_url("http://localhost:4444/servers/uuid")
171 'http://localhost:4444/servers/uuid/mcp/'
172 >>> convert_url("http://localhost:4444/servers/uuid/sse")
173 'http://localhost:4444/servers/uuid/mcp/'
174 >>> convert_url("http://localhost:4444/servers/uuid/mcp")
175 'http://localhost:4444/servers/uuid/mcp/'
176 """
177 if url.endswith("/mcp") or url.endswith("/mcp/"):
178 if url.endswith("/mcp"):
179 return url + "/"
180 return url
181 if url.endswith("/sse"):
182 return url.replace("/sse", "/mcp/")
183 return url + "/mcp/"
186def send_to_stdout(obj: Union[dict, str, bytes]) -> None:
187 """Write JSON-serializable object to stdout.
189 Args:
190 obj: Object to serialize and write. Falls back to str() if JSON fails.
192 Notes:
193 If writing fails (e.g., broken pipe), triggers shutdown.
194 """
195 try:
196 # orjson.dumps returns bytes
197 line = orjson.dumps(obj)
198 except Exception:
199 if isinstance(obj, bytes):
200 line = obj
201 else:
202 line = str(obj).encode("utf-8")
203 try:
204 # Check if sys.stdout has buffer attribute
205 # If not (eg. some mocks), fall back to write str
206 if hasattr(sys.stdout, "buffer"):
207 sys.stdout.buffer.write(line + b"\n")
208 sys.stdout.buffer.flush()
209 else:
210 # Fallback for testing environments that mock sys.stdout but not buffer
211 sys.stdout.write(line.decode("utf-8") + "\n")
212 sys.stdout.flush()
213 except OSError as e:
214 logger.error("OS error: %s", e)
215 _mark_shutdown()
218def make_error(message: str, code: int = JSONRPC_INTERNAL_ERROR, data: Any = None) -> dict:
219 """Construct a JSON-RPC error response.
221 Args:
222 message: Error message.
223 code: JSON-RPC error code (default -32603).
224 data: Optional extra error data.
226 Returns:
227 dict: JSON-RPC error object.
229 Examples:
230 >>> make_error("Invalid input", code=-32600)
231 {'jsonrpc': '2.0', 'id': 'bridge', 'error': {'code': -32600, 'message': 'Invalid input'}}
232 >>> make_error("Oops", data={"info": 1})["error"]["data"]
233 {'info': 1}
234 """
235 err: dict[str, Any] = {
236 "jsonrpc": "2.0",
237 "id": "bridge",
238 "error": {"code": code, "message": message},
239 }
240 if data is not None:
241 err["error"]["data"] = data
242 return err
245async def stdin_reader(queue: "asyncio.Queue[Union[dict, list, str, None]]") -> None:
246 """Read lines from stdin and push parsed JSON into a queue.
248 Args:
249 queue: Target asyncio.Queue where parsed messages are enqueued.
251 Notes:
252 - On EOF, pushes None and triggers shutdown.
253 - Invalid JSON produces a JSON-RPC error object.
255 Examples:
256 >>> # Example pattern (not executed): asyncio.create_task(stdin_reader(q))
257 >>> True
258 True
259 """
260 while True:
261 # read bytes directly if possible
262 if hasattr(sys.stdin, "buffer"):
263 line = await asyncio.to_thread(sys.stdin.buffer.readline)
264 else:
265 # Fallback
266 line_str = await asyncio.to_thread(sys.stdin.readline)
267 line = line_str.encode("utf-8") if line_str else b""
269 if not line:
270 await queue.put(None)
271 _mark_shutdown()
272 break
274 line = line.strip()
275 if not line:
276 continue
277 try:
278 # orjson.loads accepts bytes
279 obj = orjson.loads(line)
280 except Exception:
281 # Decode for error message if needed
282 try:
283 line_str = line.decode("utf-8", errors="replace")
284 except Exception:
285 line_str = str(line)
286 obj = make_error("Invalid JSON from stdin", JSONRPC_PARSE_ERROR, line_str)
287 await queue.put(obj)
290# -----------------------
291# Stream Parsers
292# -----------------------
293async def ndjson_lines(resp: httpx.Response) -> AsyncIterator[bytes]:
294 """Parse newline-delimited JSON (NDJSON) from an HTTP response.
296 Args:
297 resp: httpx.Response with NDJSON content.
299 Yields:
300 bytes: Individual JSON lines as bytes.
302 Examples:
303 >>> # This function is a parser for network streams; doctest uses patterns only.
304 >>> True
305 True
306 """
307 # read bytes directly if possible
308 partial_line = b""
309 async for chunk in resp.aiter_bytes():
310 if shutting_down():
311 break
312 if not chunk:
313 continue
315 # Split chunk into lines, handling partial line from previous chunk
316 lines = (partial_line + chunk).split(b"\n")
318 # The last element is always the new partial line (might be empty if chunk ended with newline)
319 partial_line = lines.pop()
321 for line in lines:
322 if line.strip():
323 yield line.strip()
325 # Process remaining partial line
326 if partial_line.strip():
327 yield partial_line.strip()
330async def sse_events(resp: httpx.Response) -> AsyncIterator[bytes]:
331 """Parse Server-Sent Events (SSE) from an HTTP response.
333 Args:
334 resp: httpx.Response with SSE content.
336 Yields:
337 bytes: Event payload data lines (joined).
338 """
339 partial_line = b""
340 event_lines: List[bytes] = []
342 async for chunk in resp.aiter_bytes():
343 if shutting_down():
344 break
345 if not chunk:
346 continue
348 # Split chunk into lines
349 lines = (partial_line + chunk).split(b"\n")
350 partial_line = lines.pop()
352 for line in lines:
353 line = line.rstrip(b"\r")
354 if not line:
355 if event_lines:
356 yield b"\n".join(event_lines)
357 event_lines = []
358 continue
359 if line.startswith(b":"):
360 continue
362 if b":" in line:
363 field, value = line.split(b":", 1)
364 value = value.lstrip(b" ")
365 else:
366 field, value = line, b""
368 if field == b"data":
369 event_lines.append(value)
371 # Process remaining partial line if any (though standard SSE ends with \n\n)
372 if partial_line:
373 line = partial_line.rstrip(b"\r")
374 # Process the partial line same as above
375 if line and not line.startswith(b":"):
376 if b":" in line:
377 field, value = line.split(b":", 1)
378 value = value.lstrip(b" ")
379 else:
380 field, value = line, b""
381 if field == b"data":
382 event_lines.append(value)
384 # Always yield any remaining accumulated event data
385 if event_lines:
386 yield b"\n".join(event_lines)
389# -----------------------
390# Core HTTP forwarder
391# -----------------------
392async def forward_once(
393 client: ResilientHttpClient,
394 settings: "Settings",
395 payload: Union[str, Dict[str, Any], List[Any]],
396) -> None:
397 """Forward a single JSON-RPC payload to the MCP gateway and stream responses.
399 The function:
400 - Sets content negotiation headers (JSON, NDJSON, SSE)
401 - Adds Authorization header when configured
402 - Streams the gateway response and forwards every JSON object to stdout
403 (supports application/json, application/x-ndjson, and text/event-stream)
405 Args:
406 client: Resilient HTTP client used to make the request.
407 settings: Bridge configuration (URL, auth, timeouts).
408 payload: JSON-RPC request payload as str/dict/list.
409 """
410 if shutting_down():
411 return
413 headers = {
414 "Content-Type": "application/json; charset=utf-8",
415 "Accept": "application/json, application/x-ndjson, text/event-stream",
416 }
417 if settings.auth_header:
418 headers["Authorization"] = settings.auth_header
420 # Step 1: Decide content type (manual override > auto-detect)
421 content_type = getattr(settings, "content_type", None) or CONTENT_TYPE
423 if content_type == "application/x-www-form-urlencoded":
424 # Always encode as form data
425 if isinstance(payload, dict):
426 body = urlencode(payload)
427 else:
428 body = str(payload)
429 headers["Content-Type"] = "application/x-www-form-urlencoded"
431 elif content_type == "application/json":
432 # Force JSON
433 body = payload if isinstance(payload, str) else orjson.dumps(payload).decode()
434 headers["Content-Type"] = "application/json; charset=utf-8"
436 else:
437 # Auto-detect
438 if isinstance(payload, dict) and all(isinstance(v, (str, int, float, bool, type(None))) for v in payload.values()):
439 body = urlencode(payload)
440 headers["Content-Type"] = "application/x-www-form-urlencoded"
441 else:
442 body = payload if isinstance(payload, str) else orjson.dumps(payload).decode()
443 headers["Content-Type"] = "application/json; charset=utf-8"
445 body_bytes = body.encode("utf-8")
447 # Step 2: Send request and process response
448 async with client.stream("POST", settings.server_url, data=body_bytes, headers=headers) as resp:
449 ctype = (resp.headers.get("Content-Type") or "").lower()
450 status = resp.status_code
451 logger.debug("HTTP %s %s", status, ctype)
453 if shutting_down():
454 return
456 if status < 200 or status >= 300:
457 send_to_stdout(make_error(f"HTTP {status}", code=status))
458 return
460 async def _process_line(line: Union[str, bytes]):
461 """
462 Asynchronously processes a single line of text/bytes, expected to be a valid JSON.
464 If the system is shutting down, the function returns immediately.
465 Otherwise, it attempts to parse the line as JSON and sends the resulting object to stdout.
466 If parsing fails, logs a warning and sends a standardized error response to stdout.
468 Args:
469 line (Union[str, bytes]): Valid JSON object (bytes optimized).
470 """
471 if shutting_down():
472 return
473 try:
474 # orjson.loads accepts bytes or str
475 obj = orjson.loads(line)
476 send_to_stdout(obj)
477 except Exception:
478 logger.warning("Invalid JSON from server: %s", line)
479 # Ensure line is str for error message
480 line_str = line if isinstance(line, str) else str(line)
481 send_to_stdout(make_error("Invalid JSON from server", JSONRPC_PARSE_ERROR, line_str))
483 # Step 3: Handle response content types
484 if "event-stream" in ctype:
485 async for data_payload in sse_events(resp):
486 if shutting_down():
487 break
488 if not data_payload:
489 continue
490 await _process_line(data_payload)
491 return
493 if "x-ndjson" in ctype or "ndjson" in ctype:
494 async for line in ndjson_lines(resp):
495 if shutting_down():
496 break
497 await _process_line(line)
498 return
500 if "application/json" in ctype:
501 raw = await resp.aread()
502 if not shutting_down():
503 # raw is bytes
504 try:
505 send_to_stdout(orjson.loads(raw))
506 except Exception:
507 send_to_stdout(make_error("Invalid JSON response", JSONRPC_PARSE_ERROR, raw.decode("utf-8", "replace")))
508 return
510 # Fallback: try parsing as NDJSON
511 async for line in ndjson_lines(resp):
512 if shutting_down():
513 break
514 await _process_line(line)
517async def make_request(
518 client: ResilientHttpClient,
519 settings: "Settings",
520 payload: Union[str, Dict[str, Any], List[Any]],
521 *,
522 max_retries: int = 5,
523 base_delay: float = 0.25,
524) -> None:
525 """Make a gateway request with retry/backoff around a single forward attempt.
527 Args:
528 client: Resilient HTTP client used to make the request.
529 settings: Bridge configuration (URL, auth, timeouts).
530 payload: JSON-RPC request payload as str/dict/list.
531 max_retries: Maximum retry attempts upon exceptions (default 5).
532 base_delay: Base delay in seconds for exponential backoff (default 0.25).
533 """
534 attempt = 0
535 while not shutting_down():
536 try:
537 await forward_once(client, settings, payload)
538 return
539 except Exception as e:
540 if shutting_down():
541 return
542 logger.warning("Network or unexpected error in forward_once: %s", e)
543 attempt += 1
544 if attempt > max_retries:
545 send_to_stdout(make_error("max retries exceeded", JSONRPC_SERVER_ERROR))
546 return
547 delay = min(base_delay * (2 ** (attempt - 1)), 8.0)
548 await asyncio.sleep(delay)
551# -----------------------
552# Main loop & CLI
553# -----------------------
554@dataclass
555class Settings:
556 """Bridge configuration settings.
558 Args:
559 server_url: MCP server URL
560 auth_header: Authorization header (optional)
561 connect_timeout: HTTP connect timeout in seconds
562 response_timeout: Max response wait in seconds
563 concurrency: Max concurrent tool calls
564 log_level: Logging verbosity
566 Examples:
567 >>> s = Settings("http://x/mcp", "Bearer token", 5, 10, 2, "DEBUG")
568 >>> s.server_url
569 'http://x/mcp'
570 >>> s.concurrency
571 2
572 """
574 server_url: str
575 auth_header: Optional[str]
576 connect_timeout: float = DEFAULT_CONNECT_TIMEOUT
577 response_timeout: float = DEFAULT_RESPONSE_TIMEOUT
578 concurrency: int = DEFAULT_CONCURRENCY
579 log_level: Optional[str] = None
582async def main_async(settings: Settings) -> None:
583 """Main async loop: reads stdin JSON lines and forwards them to the gateway.
585 - Spawns a reader task that pushes parsed lines to a queue.
586 - Uses a semaphore to cap concurrent requests.
587 - Creates tasks to forward each queued payload.
588 - Gracefully shuts down on EOF or signals.
590 Args:
591 settings: Bridge configuration settings.
593 Examples:
594 >>> # Smoke-test structure only; no network or stdin in doctest.
595 >>> settings = Settings("http://x/mcp", None)
596 >>> async def _run_once():
597 ... q = asyncio.Queue()
598 ... # Immediately signal shutdown by marking the queue end:
599 ... await q.put(None)
600 ... _mark_shutdown()
601 ... # Minimal run: create then cancel tasks cleanly.
602 ... await asyncio.sleep(0)
603 >>> # Note: We avoid running main_async here due to stdin/network.
604 >>> True
605 True
606 """
607 queue: "asyncio.Queue[Union[dict, list, str, None]]" = asyncio.Queue()
608 reader_task = asyncio.create_task(stdin_reader(queue))
610 sem = asyncio.Semaphore(settings.concurrency)
612 httpx_timeout = httpx.Timeout(
613 connect=settings.connect_timeout,
614 read=settings.response_timeout,
615 write=settings.response_timeout,
616 pool=settings.response_timeout,
617 )
619 # Get SSL verify setting from global config (with fallback for standalone usage)
620 try:
621 # First-Party
622 from mcpgateway.config import settings as global_settings # pylint: disable=import-outside-toplevel
624 ssl_verify = not global_settings.skip_ssl_verify
625 except ImportError:
626 ssl_verify = True # Default to verifying SSL when config unavailable
628 client_args = {"timeout": httpx_timeout, "http2": True, "verify": ssl_verify}
629 resilient = ResilientHttpClient(
630 max_retries=5,
631 base_backoff=0.25,
632 max_delay=8.0,
633 jitter_max=0.25,
634 client_args=client_args,
635 )
637 tasks: set[asyncio.Task[None]] = set()
638 try:
639 while not shutting_down():
640 item = await queue.get()
641 if item is None:
642 break
644 async def _worker(payload=item):
645 """
646 Executes an asynchronous request with concurrency control.
648 Acquires a semaphore to limit the number of concurrent executions.
649 If the system is not shutting down, sends the given payload using `make_request`.
651 Args:
652 payload (Any): The data to be sent in the request. Defaults to `item`.
653 """
654 async with sem:
655 if not shutting_down():
656 await make_request(resilient, settings, payload)
658 t = asyncio.create_task(_worker())
659 tasks.add(t)
660 t.add_done_callback(lambda fut, s=tasks: s.discard(fut))
662 _mark_shutdown()
663 for t in list(tasks):
664 t.cancel()
665 if tasks:
666 with suppress(asyncio.CancelledError):
667 await asyncio.gather(*tasks)
668 finally:
669 reader_task.cancel()
670 with suppress(Exception):
671 await reader_task
672 with suppress(Exception):
673 await resilient.aclose()
676def parse_args() -> Settings:
677 """Parse CLI arguments and environment variables into Settings.
679 Recognized flags:
680 --url / MCP_SERVER_URL
681 --auth / MCP_AUTH
682 --timeout / MCP_TOOL_CALL_TIMEOUT
683 --log-level / MCP_WRAPPER_LOG_LEVEL
685 Returns:
686 Settings: Parsed and normalized configuration.
688 Examples:
689 >>> import sys, os
690 >>> _argv = sys.argv
691 >>> sys.argv = ["prog", "--url", "http://localhost:4444/servers/u"]
692 >>> try:
693 ... s = parse_args()
694 ... s.server_url.endswith("/mcp/")
695 ... finally:
696 ... sys.argv = _argv
697 True
698 """
699 parser = argparse.ArgumentParser(description="Stdio MCP Client <-> MCP HTTP Bridge")
700 parser.add_argument("--url", default=os.environ.get("MCP_SERVER_URL"), help="MCP server URL (env: MCP_SERVER_URL)")
701 parser.add_argument("--auth", default=os.environ.get("MCP_AUTH"), help="Authorization header value (env: MCP_AUTH)")
702 parser.add_argument("--timeout", default=os.environ.get("MCP_TOOL_CALL_TIMEOUT"), help="Response timeout in seconds")
703 parser.add_argument(
704 "--log-level",
705 default=os.environ.get("MCP_WRAPPER_LOG_LEVEL", "INFO"),
706 help="Enable logging at this level (case-insensitive, default: disabled)",
707 )
708 args = parser.parse_args()
710 if not args.url:
711 print("Error: MCP server URL must be provided via --url or MCP_SERVER_URL", file=sys.stderr)
712 sys.exit(2)
714 server_url = convert_url(args.url)
715 response_timeout = float(args.timeout) if args.timeout else DEFAULT_RESPONSE_TIMEOUT
717 return Settings(
718 server_url=server_url,
719 auth_header=args.auth,
720 connect_timeout=DEFAULT_CONNECT_TIMEOUT,
721 response_timeout=response_timeout,
722 log_level=args.log_level,
723 concurrency=DEFAULT_CONCURRENCY,
724 )
727def _install_signal_handlers(loop: asyncio.AbstractEventLoop) -> None:
728 """Install SIGINT/SIGTERM handlers that trigger graceful shutdown.
730 Args:
731 loop: The asyncio event loop to attach handlers to.
733 Examples:
734 >>> import asyncio
735 >>> loop = asyncio.new_event_loop()
736 >>> _install_signal_handlers(loop) # doctest: +ELLIPSIS
737 >>> loop.close()
738 """
739 for sig in (getattr(signal, "SIGINT", None), getattr(signal, "SIGTERM", None)):
740 if sig is None:
741 continue
742 with suppress(NotImplementedError):
743 loop.add_signal_handler(sig, _mark_shutdown)
746def main() -> None:
747 """Entry point for the MCP stdio wrapper.
749 - Parses args/env vars into Settings
750 - Configures logging
751 - Runs the async main loop with signal handling
753 Args:
754 None
755 """
756 settings = parse_args()
757 setup_logging(settings.log_level)
758 if not logger.disabled:
759 logger.info("Starting MCP stdio wrapper -> %s", settings.server_url)
761 loop = asyncio.new_event_loop()
762 asyncio.set_event_loop(loop)
763 _install_signal_handlers(loop)
765 try:
766 loop.run_until_complete(main_async(settings))
767 finally:
768 loop.run_until_complete(asyncio.sleep(0))
769 with suppress(Exception):
770 loop.close()
771 if not logger.disabled:
772 logger.info("Shutdown complete.")
775if __name__ == "__main__":
776 main()