Coverage for mcpgateway / wrapper.py: 100%
307 statements
coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/wrapper.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Keval Mahajan

MCP Gateway Wrapper.
MCP Client (stdio) <-> MCP Gateway Bridge

This module implements a wrapper stdio bridge that facilitates
interaction between the MCP client and the MCP gateway.
It provides several functionalities, including listing tools,
invoking tools, managing resources, retrieving prompts,
and handling tool calls via the MCP gateway.

- All JSON-RPC traffic is written to stdout.
- All logs/diagnostics are written to stderr, ensuring clean separation.

Environment Variables
---------------------
- **MCP_SERVER_URL** (or `--url`): Gateway MCP endpoint URL.
- **MCP_AUTH** (or `--auth`): Authorization header value.
- **MCP_TOOL_CALL_TIMEOUT** (or `--timeout`): Response timeout in seconds (default: 60).
- **MCP_WRAPPER_LOG_LEVEL** (or `--log-level`): Logging level, or OFF to disable.
- **CONCURRENCY**: Max concurrent tool calls (default: 10).

Example usage:
--------------

Method 1: Using environment variables
    $ export MCP_SERVER_URL='http://localhost:4444/servers/UUID/mcp'
    $ export MCP_AUTH='Bearer <token>'
    $ export MCP_TOOL_CALL_TIMEOUT=120
    $ export MCP_WRAPPER_LOG_LEVEL=DEBUG
    $ python3 -m mcpgateway.wrapper

Method 2: Using command-line arguments
    $ python3 -m mcpgateway.wrapper --url 'http://localhost:4444/servers/UUID/mcp' --auth 'Bearer <token>' --timeout 120 --log-level DEBUG
"""
# Future
from __future__ import annotations

# Standard
import argparse
import asyncio
from contextlib import suppress
from dataclasses import dataclass
import errno
import logging
import os
import signal
import sys
from typing import Any, AsyncIterator, Dict, List, Optional, Union
from urllib.parse import urlencode

# Third-Party
import httpx
import orjson

# First-Party
from mcpgateway.utils.retry_manager import ResilientHttpClient

# -----------------------
# Configuration Defaults
# -----------------------
DEFAULT_CONCURRENCY = int(os.environ.get("CONCURRENCY", "10"))
DEFAULT_CONNECT_TIMEOUT = 15
DEFAULT_RESPONSE_TIMEOUT = float(os.environ.get("MCP_TOOL_CALL_TIMEOUT", "60"))

JSONRPC_PARSE_ERROR = -32700
JSONRPC_INTERNAL_ERROR = -32603
JSONRPC_SERVER_ERROR = -32000

CONTENT_TYPE = os.getenv("FORGE_CONTENT_TYPE", "application/json")

# Global logger
logger = logging.getLogger("mcpgateway.wrapper")
logger.addHandler(logging.StreamHandler(sys.stderr))
logger.propagate = False
logger.disabled = True  # default: disabled

# Shutdown flag
_shutdown = asyncio.Event()


def _mark_shutdown():
    """Mark the shutdown flag for graceful termination.
    This is triggered when stdin closes, stdout fails, or a signal is caught.

    Args:
        None

    Examples:
        >>> _mark_shutdown() # doctest: +ELLIPSIS
        >>> shutting_down()
        True
        >>> # Reset for following doctests:
        >>> _ = _shutdown.clear()
    """
    if not _shutdown.is_set():
        _shutdown.set()


def shutting_down() -> bool:
    """Check whether the server is shutting down.

    Args:
        None

    Returns:
        bool: True if shutdown has been triggered, False otherwise.

    Examples:
        >>> shutting_down()
        False
    """
    return _shutdown.is_set()


# -----------------------
# Utilities
# -----------------------
def setup_logging(level: Optional[str]) -> None:
    """Configure logging for the wrapper.

    Args:
        level: Logging level (e.g. "INFO", "DEBUG"), or OFF/None to disable.

    Examples:
        >>> setup_logging("DEBUG")
        >>> logger.disabled
        False
        >>> setup_logging("OFF")
        >>> logger.disabled
        True
    """
    if not level:
        logger.disabled = True
        return

    log_level = level.strip().upper()
    if log_level in {"OFF", "NONE", "DISABLE", "FALSE", "0"}:
        logger.disabled = True
        return

    logger.setLevel(getattr(logging, log_level, logging.INFO))
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )
    for handler in logger.handlers:
        handler.setFormatter(formatter)
    logger.disabled = False
def convert_url(url: str) -> str:
    """Normalize an MCP server URL.

    - If it ends with `/sse`, replace that suffix with `/mcp/`.
    - If it already ends with `/mcp` or `/mcp/`, keep it (ensuring a trailing slash).
    - Otherwise, append `/mcp/`.

    Args:
        url: The input server URL.

    Returns:
        str: Normalized MCP URL.

    Examples:
        >>> convert_url("http://localhost:4444/servers/uuid")
        'http://localhost:4444/servers/uuid/mcp/'
        >>> convert_url("http://localhost:4444/servers/uuid/sse")
        'http://localhost:4444/servers/uuid/mcp/'
        >>> convert_url("http://localhost:4444/servers/uuid/mcp")
        'http://localhost:4444/servers/uuid/mcp/'
    """
    if url.endswith("/mcp/"):
        return url
    if url.endswith("/mcp"):
        return url + "/"
    if url.endswith("/sse"):
        # Strip only the trailing "/sse"; str.replace would also rewrite any
        # earlier "/sse" segment elsewhere in the URL.
        return url[: -len("/sse")] + "/mcp/"
    return url + "/mcp/"
def send_to_stdout(obj: Union[dict, str, bytes]) -> None:
    """Write a JSON-serializable object to stdout.

    Args:
        obj: Object to serialize and write. Falls back to str() if JSON serialization fails.

    Notes:
        If writing fails (e.g., broken pipe), triggers shutdown.
    """
    try:
        # orjson.dumps returns bytes
        line = orjson.dumps(obj)
    except Exception:
        if isinstance(obj, bytes):
            line = obj
        else:
            line = str(obj).encode("utf-8")
    try:
        # Check if sys.stdout has a buffer attribute.
        # If not (e.g., some mocks), fall back to writing a str.
        if hasattr(sys.stdout, "buffer"):
            sys.stdout.buffer.write(line + b"\n")
            sys.stdout.buffer.flush()
        else:
            # Fallback for testing environments that mock sys.stdout but not buffer
            sys.stdout.write(line.decode("utf-8") + "\n")
            sys.stdout.flush()
    except OSError as e:
        # A failed write to stdout (broken pipe, closed stream, ...) means the
        # client has gone away; shut down in either case.
        if e.errno in (errno.EPIPE, errno.EINVAL):
            _mark_shutdown()
        else:
            _mark_shutdown()
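# Example (illustrative) of what send_to_stdout emits for a small response object:
#
#   send_to_stdout({"jsonrpc": "2.0", "id": 1, "result": {}})
#   # -> writes the line b'{"jsonrpc":"2.0","id":1,"result":{}}\n' to stdout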
def make_error(message: str, code: int = JSONRPC_INTERNAL_ERROR, data: Any = None) -> dict:
    """Construct a JSON-RPC error response.

    Args:
        message: Error message.
        code: JSON-RPC error code (default -32603).
        data: Optional extra error data.

    Returns:
        dict: JSON-RPC error object.

    Examples:
        >>> make_error("Invalid input", code=-32600)
        {'jsonrpc': '2.0', 'id': 'bridge', 'error': {'code': -32600, 'message': 'Invalid input'}}
        >>> make_error("Oops", data={"info": 1})["error"]["data"]
        {'info': 1}
    """
    err: dict[str, Any] = {
        "jsonrpc": "2.0",
        "id": "bridge",
        "error": {"code": code, "message": message},
    }
    if data is not None:
        err["error"]["data"] = data
    return err
async def stdin_reader(queue: "asyncio.Queue[Union[dict, list, str, None]]") -> None:
    """Read lines from stdin and push parsed JSON into a queue.

    Args:
        queue: Target asyncio.Queue where parsed messages are enqueued.

    Notes:
        - On EOF, pushes None and triggers shutdown.
        - Invalid JSON produces a JSON-RPC error object.

    Examples:
        >>> # Example pattern (not executed): asyncio.create_task(stdin_reader(q))
        >>> True
        True
    """
    while True:
        # read bytes directly if possible
        if hasattr(sys.stdin, "buffer"):
            line = await asyncio.to_thread(sys.stdin.buffer.readline)
        else:
            # Fallback
            line_str = await asyncio.to_thread(sys.stdin.readline)
            line = line_str.encode("utf-8") if line_str else b""

        if not line:
            await queue.put(None)
            _mark_shutdown()
            break

        line = line.strip()
        if not line:
            continue
        try:
            # orjson.loads accepts bytes
            obj = orjson.loads(line)
        except Exception:
            # Decode for error message if needed
            try:
                line_str = line.decode("utf-8", errors="replace")
            except Exception:
                line_str = str(line)
            obj = make_error("Invalid JSON from stdin", JSONRPC_PARSE_ERROR, line_str)
        await queue.put(obj)
# -----------------------
# Stream Parsers
# -----------------------
async def ndjson_lines(resp: httpx.Response) -> AsyncIterator[bytes]:
    """Parse newline-delimited JSON (NDJSON) from an HTTP response.

    Args:
        resp: httpx.Response with NDJSON content.

    Yields:
        bytes: Individual JSON lines as bytes.

    Examples:
        >>> # This function is a parser for network streams; doctest uses patterns only.
        >>> True
        True
    """
    # Accumulate partial lines across chunks (a JSON line may span chunk boundaries)
    partial_line = b""
    async for chunk in resp.aiter_bytes():
        if shutting_down():
            break
        if not chunk:
            continue

        # Split chunk into lines, handling partial line from previous chunk
        lines = (partial_line + chunk).split(b"\n")

        # The last element is always the new partial line (might be empty if chunk ended with newline)
        partial_line = lines.pop()

        for line in lines:
            if line.strip():
                yield line.strip()

    # Process remaining partial line
    if partial_line.strip():
        yield partial_line.strip()
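# Worked example (illustrative) of the chunk handling above: if the stream
# arrives as chunks b'{"a": 1}\n{"b"' and b': 2}\n', the first chunk yields
# b'{"a": 1}' and leaves b'{"b"' as the partial line; the second chunk
# completes it and yields b'{"b": 2}'.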
async def sse_events(resp: httpx.Response) -> AsyncIterator[bytes]:
    """Parse Server-Sent Events (SSE) from an HTTP response.

    Args:
        resp: httpx.Response with SSE content.

    Yields:
        bytes: Event payload data lines (joined).
    """
    partial_line = b""
    event_lines: List[bytes] = []

    async for chunk in resp.aiter_bytes():
        if shutting_down():
            break
        if not chunk:
            continue

        # Split chunk into lines
        lines = (partial_line + chunk).split(b"\n")
        partial_line = lines.pop()

        for line in lines:
            line = line.rstrip(b"\r")
            if not line:
                if event_lines:
                    yield b"\n".join(event_lines)
                    event_lines = []
                continue
            if line.startswith(b":"):
                continue

            if b":" in line:
                field, value = line.split(b":", 1)
                value = value.lstrip(b" ")
            else:
                field, value = line, b""

            if field == b"data":
                event_lines.append(value)

    # Process remaining partial line if any (though standard SSE ends with \n\n)
    if partial_line:
        line = partial_line.rstrip(b"\r")
        # Process the partial line same as above
        if line and not line.startswith(b":"):
            if b":" in line:
                field, value = line.split(b":", 1)
                value = value.lstrip(b" ")
            else:
                field, value = line, b""
            if field == b"data":
                event_lines.append(value)

    # Always yield any remaining accumulated event data
    if event_lines:
        yield b"\n".join(event_lines)
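# Worked example (illustrative) of the SSE parsing above: a response body of
#
#   event: message
#   data: {"jsonrpc": "2.0", "id": 1, "result": {}}
#   <blank line>
#
# yields the single payload b'{"jsonrpc": "2.0", "id": 1, "result": {}}';
# multiple consecutive "data:" lines within one event are joined with "\n",
# and non-"data" fields (such as "event:") are ignored.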
# -----------------------
# Core HTTP forwarder
# -----------------------
async def forward_once(
    client: ResilientHttpClient,
    settings: "Settings",
    payload: Union[str, Dict[str, Any], List[Any]],
) -> None:
    """Forward a single JSON-RPC payload to the MCP gateway and stream responses.

    The function:
    - Sets content negotiation headers (JSON, NDJSON, SSE)
    - Adds the Authorization header when configured
    - Streams the gateway response and forwards every JSON object to stdout
      (supports application/json, application/x-ndjson, and text/event-stream)

    Args:
        client: Resilient HTTP client used to make the request.
        settings: Bridge configuration (URL, auth, timeouts).
        payload: JSON-RPC request payload as str/dict/list.
    """
    if shutting_down():
        return

    headers = {
        "Content-Type": "application/json; charset=utf-8",
        "Accept": "application/json, application/x-ndjson, text/event-stream",
    }
    if settings.auth_header:
        headers["Authorization"] = settings.auth_header

    # Step 1: Decide content type (manual override > auto-detect)
    content_type = getattr(settings, "content_type", None) or CONTENT_TYPE

    if content_type == "application/x-www-form-urlencoded":
        # Always encode as form data
        if isinstance(payload, dict):
            body = urlencode(payload)
        else:
            body = str(payload)
        headers["Content-Type"] = "application/x-www-form-urlencoded"

    elif content_type == "application/json":
        # Force JSON
        body = payload if isinstance(payload, str) else orjson.dumps(payload).decode()
        headers["Content-Type"] = "application/json; charset=utf-8"

    else:
        # Auto-detect
        if isinstance(payload, dict) and all(isinstance(v, (str, int, float, bool, type(None))) for v in payload.values()):
            body = urlencode(payload)
            headers["Content-Type"] = "application/x-www-form-urlencoded"
        else:
            body = payload if isinstance(payload, str) else orjson.dumps(payload).decode()
            headers["Content-Type"] = "application/json; charset=utf-8"

    body_bytes = body.encode("utf-8")

    # Step 2: Send request and process response
    async with client.stream("POST", settings.server_url, data=body_bytes, headers=headers) as resp:
        ctype = (resp.headers.get("Content-Type") or "").lower()
        status = resp.status_code
        logger.debug("HTTP %s %s", status, ctype)

        if shutting_down():
            return

        if status < 200 or status >= 300:
            send_to_stdout(make_error(f"HTTP {status}", code=status))
            return

        async def _process_line(line: Union[str, bytes]):
            """Process a single line of text/bytes that is expected to be valid JSON.

            If the system is shutting down, the function returns immediately.
            Otherwise, it attempts to parse the line as JSON and sends the resulting object to stdout.
            If parsing fails, logs a warning and sends a standardized error response to stdout.

            Args:
                line (Union[str, bytes]): A line expected to hold a JSON object (bytes is the fast path).
            """
            if shutting_down():
                return
            try:
                # orjson.loads accepts bytes or str
                obj = orjson.loads(line)
                send_to_stdout(obj)
            except Exception:
                logger.warning("Invalid JSON from server: %s", line)
                # Ensure line is str for the error payload
                line_str = line if isinstance(line, str) else line.decode("utf-8", "replace")
                send_to_stdout(make_error("Invalid JSON from server", JSONRPC_PARSE_ERROR, line_str))

        # Step 3: Handle response content types
        if "event-stream" in ctype:
            async for data_payload in sse_events(resp):
                if shutting_down():
                    break
                if not data_payload:
                    continue
                await _process_line(data_payload)
            return

        if "x-ndjson" in ctype or "ndjson" in ctype:
            async for line in ndjson_lines(resp):
                if shutting_down():
                    break
                await _process_line(line)
            return

        if "application/json" in ctype:
            raw = await resp.aread()
            if not shutting_down():
                # raw is bytes
                try:
                    send_to_stdout(orjson.loads(raw))
                except Exception:
                    send_to_stdout(make_error("Invalid JSON response", JSONRPC_PARSE_ERROR, raw.decode("utf-8", "replace")))
            return

        # Fallback: try parsing as NDJSON
        async for line in ndjson_lines(resp):
            if shutting_down():
                break
            await _process_line(line)
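# Illustrative effect of the content-type selection in forward_once: with the
# default FORGE_CONTENT_TYPE ("application/json") every payload is serialized
# as JSON; in the auto-detect branch (any other value), a flat dict of scalars
# such as {"name": "x", "count": 3} would be form-encoded as "name=x&count=3",
# while payloads containing nested values are serialized as JSON.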
async def make_request(
    client: ResilientHttpClient,
    settings: "Settings",
    payload: Union[str, Dict[str, Any], List[Any]],
    *,
    max_retries: int = 5,
    base_delay: float = 0.25,
) -> None:
    """Make a gateway request with retry/backoff around a single forward attempt.

    Args:
        client: Resilient HTTP client used to make the request.
        settings: Bridge configuration (URL, auth, timeouts).
        payload: JSON-RPC request payload as str/dict/list.
        max_retries: Maximum retry attempts upon exceptions (default 5).
        base_delay: Base delay in seconds for exponential backoff (default 0.25).
    """
    attempt = 0
    while not shutting_down():
        try:
            await forward_once(client, settings, payload)
            return
        except Exception as e:
            if shutting_down():
                return
            logger.warning("Network or unexpected error in forward_once: %s", e)
            attempt += 1
            if attempt > max_retries:
                send_to_stdout(make_error("max retries exceeded", JSONRPC_SERVER_ERROR))
                return
            delay = min(base_delay * (2 ** (attempt - 1)), 8.0)
            await asyncio.sleep(delay)
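# With the defaults above (base_delay=0.25, max_retries=5) the retry delays are
# min(0.25 * 2**(attempt - 1), 8.0), i.e. 0.25, 0.5, 1.0, 2.0 and 4.0 seconds,
# after which a "max retries exceeded" JSON-RPC error is written to stdout.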
# -----------------------
# Main loop & CLI
# -----------------------
@dataclass
class Settings:
    """Bridge configuration settings.

    Args:
        server_url: MCP server URL
        auth_header: Authorization header (optional)
        connect_timeout: HTTP connect timeout in seconds
        response_timeout: Max response wait in seconds
        concurrency: Max concurrent tool calls
        log_level: Logging verbosity

    Examples:
        >>> s = Settings("http://x/mcp", "Bearer token", 5, 10, 2, "DEBUG")
        >>> s.server_url
        'http://x/mcp'
        >>> s.concurrency
        2
    """

    server_url: str
    auth_header: Optional[str]
    connect_timeout: float = DEFAULT_CONNECT_TIMEOUT
    response_timeout: float = DEFAULT_RESPONSE_TIMEOUT
    concurrency: int = DEFAULT_CONCURRENCY
    log_level: Optional[str] = None
async def main_async(settings: Settings) -> None:
    """Main async loop: reads stdin JSON lines and forwards them to the gateway.

    - Spawns a reader task that pushes parsed lines to a queue.
    - Uses a semaphore to cap concurrent requests.
    - Creates tasks to forward each queued payload.
    - Gracefully shuts down on EOF or signals.

    Args:
        settings: Bridge configuration settings.

    Examples:
        >>> # Smoke-test structure only; no network or stdin in doctest.
        >>> settings = Settings("http://x/mcp", None)
        >>> async def _run_once():
        ...     q = asyncio.Queue()
        ...     # Immediately signal shutdown by marking the queue end:
        ...     await q.put(None)
        ...     _mark_shutdown()
        ...     # Minimal run: create then cancel tasks cleanly.
        ...     await asyncio.sleep(0)
        >>> # Note: We avoid running main_async here due to stdin/network.
        >>> True
        True
    """
    queue: "asyncio.Queue[Union[dict, list, str, None]]" = asyncio.Queue()
    reader_task = asyncio.create_task(stdin_reader(queue))

    sem = asyncio.Semaphore(settings.concurrency)

    httpx_timeout = httpx.Timeout(
        connect=settings.connect_timeout,
        read=settings.response_timeout,
        write=settings.response_timeout,
        pool=settings.response_timeout,
    )

    # Get SSL verify setting from global config (with fallback for standalone usage)
    try:
        # First-Party
        from mcpgateway.config import settings as global_settings  # pylint: disable=import-outside-toplevel

        ssl_verify = not global_settings.skip_ssl_verify
    except ImportError:
        ssl_verify = True  # Default to verifying SSL when config unavailable

    client_args = {"timeout": httpx_timeout, "http2": True, "verify": ssl_verify}
    resilient = ResilientHttpClient(
        max_retries=5,
        base_backoff=0.25,
        max_delay=8.0,
        jitter_max=0.25,
        client_args=client_args,
    )

    tasks: set[asyncio.Task[None]] = set()
    try:
        while not shutting_down():
            item = await queue.get()
            if item is None:
                break

            async def _worker(payload=item):
                """
                Executes an asynchronous request with concurrency control.

                Acquires a semaphore to limit the number of concurrent executions.
                If the system is not shutting down, sends the given payload using `make_request`.

                Args:
                    payload (Any): The data to be sent in the request. Defaults to `item`.
                """
                async with sem:
                    if not shutting_down():
                        await make_request(resilient, settings, payload)

            t = asyncio.create_task(_worker())
            tasks.add(t)
            t.add_done_callback(lambda fut, s=tasks: s.discard(fut))

        _mark_shutdown()
        for t in list(tasks):
            t.cancel()
        if tasks:
            with suppress(asyncio.CancelledError):
                await asyncio.gather(*tasks)
    finally:
        reader_task.cancel()
        with suppress(Exception):
            await reader_task
        with suppress(Exception):
            await resilient.aclose()
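# The task fan-out above is the usual semaphore-bounded concurrency pattern; a
# minimal standalone sketch (illustrative only, not used by the wrapper):
#
#   async def fan_out(payloads, limit=DEFAULT_CONCURRENCY):
#       sem = asyncio.Semaphore(limit)
#
#       async def one(payload):
#           async with sem:  # at most `limit` requests in flight
#               await make_request(resilient, settings, payload)
#
#       await asyncio.gather(*(one(p) for p in payloads))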
def parse_args() -> Settings:
    """Parse CLI arguments and environment variables into Settings.

    Recognized flags:
        --url / MCP_SERVER_URL
        --auth / MCP_AUTH
        --timeout / MCP_TOOL_CALL_TIMEOUT
        --log-level / MCP_WRAPPER_LOG_LEVEL

    Returns:
        Settings: Parsed and normalized configuration.

    Examples:
        >>> import sys, os
        >>> _argv = sys.argv
        >>> sys.argv = ["prog", "--url", "http://localhost:4444/servers/u"]
        >>> try:
        ...     s = parse_args()
        ...     s.server_url.endswith("/mcp/")
        ... finally:
        ...     sys.argv = _argv
        True
    """
    parser = argparse.ArgumentParser(description="Stdio MCP Client <-> MCP HTTP Bridge")
    parser.add_argument("--url", default=os.environ.get("MCP_SERVER_URL"), help="MCP server URL (env: MCP_SERVER_URL)")
    parser.add_argument("--auth", default=os.environ.get("MCP_AUTH"), help="Authorization header value (env: MCP_AUTH)")
    parser.add_argument("--timeout", default=os.environ.get("MCP_TOOL_CALL_TIMEOUT"), help="Response timeout in seconds")
    parser.add_argument(
        "--log-level",
        default=os.environ.get("MCP_WRAPPER_LOG_LEVEL", "INFO"),
        help="Logging level (case-insensitive; default: INFO, use OFF to disable)",
    )
    args = parser.parse_args()

    if not args.url:
        print("Error: MCP server URL must be provided via --url or MCP_SERVER_URL", file=sys.stderr)
        sys.exit(2)

    server_url = convert_url(args.url)
    response_timeout = float(args.timeout) if args.timeout else DEFAULT_RESPONSE_TIMEOUT

    return Settings(
        server_url=server_url,
        auth_header=args.auth,
        connect_timeout=DEFAULT_CONNECT_TIMEOUT,
        response_timeout=response_timeout,
        log_level=args.log_level,
        concurrency=DEFAULT_CONCURRENCY,
    )
def _install_signal_handlers(loop: asyncio.AbstractEventLoop) -> None:
    """Install SIGINT/SIGTERM handlers that trigger graceful shutdown.

    Args:
        loop: The asyncio event loop to attach handlers to.

    Examples:
        >>> import asyncio
        >>> loop = asyncio.new_event_loop()
        >>> _install_signal_handlers(loop) # doctest: +ELLIPSIS
        >>> loop.close()
    """
    for sig in (getattr(signal, "SIGINT", None), getattr(signal, "SIGTERM", None)):
        if sig is None:
            continue
        with suppress(NotImplementedError):
            loop.add_signal_handler(sig, _mark_shutdown)
def main() -> None:
    """Entry point for the MCP stdio wrapper.

    - Parses args/env vars into Settings
    - Configures logging
    - Runs the async main loop with signal handling

    Args:
        None
    """
    settings = parse_args()
    setup_logging(settings.log_level)
    if not logger.disabled:
        logger.info("Starting MCP stdio wrapper -> %s", settings.server_url)

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    _install_signal_handlers(loop)

    try:
        loop.run_until_complete(main_async(settings))
    finally:
        loop.run_until_complete(asyncio.sleep(0))
        with suppress(Exception):
            loop.close()
        if not logger.disabled:
            logger.info("Shutdown complete.")


if __name__ == "__main__":
    main()
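# Example (illustrative) of driving this wrapper from another process over stdio.
# The URL, token and method below are placeholders:
#
#   import json
#   import subprocess
#
#   proc = subprocess.Popen(
#       ["python3", "-m", "mcpgateway.wrapper", "--url", "http://localhost:4444/servers/UUID/mcp"],
#       stdin=subprocess.PIPE,
#       stdout=subprocess.PIPE,
#       text=True,
#   )
#   proc.stdin.write(json.dumps({"jsonrpc": "2.0", "id": 1, "method": "tools/list"}) + "\n")
#   proc.stdin.flush()
#   print(proc.stdout.readline())  # first JSON-RPC response forwarded from the gateway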