Coverage for mcpgateway / wrapper.py: 100%
307 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/wrapper.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Keval Mahajan
7ContextForge Wrapper.
8MCP Client (stdio) <-> ContextForge Bridge
10This module implements a wrapper stdio bridge that facilitates
11interaction between the MCP client and the MCP gateway.
12It provides several functionalities, including listing tools,
13invoking tools, managing resources, retrieving prompts,
14and handling tool calls via the MCP gateway.
16- All JSON-RPC traffic is written to stdout.
17- All logs/diagnostics are written to stderr, ensuring clean separation.
19Environment Variables
20---------------------
21- **MCP_SERVER_URL** (or `--url`): Gateway MCP endpoint URL.
22- **MCP_AUTH** (or `--auth`): Authorization header value.
23- **MCP_TOOL_CALL_TIMEOUT** (or `--timeout`): Response timeout in seconds (default: 60).
24- **MCP_WRAPPER_LOG_LEVEL** (or `--log-level`): Logging level, or OFF to disable.
25- **CONCURRENCY**: Max concurrent tool calls (default: 10).
27Example usage:
28--------------
30Method 1: Using environment variables
31 $ export MCP_SERVER_URL='http://localhost:4444/servers/UUID/mcp'
32 $ export MCP_AUTH='Bearer <token>'
33 $ export MCP_TOOL_CALL_TIMEOUT=120
34 $ export MCP_WRAPPER_LOG_LEVEL=DEBUG
35 $ python3 -m mcpgateway.wrapper
37Method 2: Using command-line arguments
38 $ python3 -m mcpgateway.wrapper --url 'http://localhost:4444/servers/UUID/mcp' --auth 'Bearer <token>' --timeout 120 --log-level DEBUG
39"""
41# Future
42from __future__ import annotations
44# Standard
45import argparse
46import asyncio
47from contextlib import suppress
48from dataclasses import dataclass
49import logging
50import os
51import signal
52import sys
53from typing import Any, AsyncIterator, Dict, List, Optional, Union
54from urllib.parse import urlencode
56# Third-Party
57import httpx
58import orjson
60# First-Party
61from mcpgateway.utils.retry_manager import ResilientHttpClient
63# -----------------------
64# Configuration Defaults
65# -----------------------
66DEFAULT_CONCURRENCY = int(os.environ.get("CONCURRENCY", "10"))
67DEFAULT_CONNECT_TIMEOUT = 15
68DEFAULT_RESPONSE_TIMEOUT = float(os.environ.get("MCP_TOOL_CALL_TIMEOUT", "60"))
70JSONRPC_PARSE_ERROR = -32700
71JSONRPC_INTERNAL_ERROR = -32603
72JSONRPC_SERVER_ERROR = -32000
74CONTENT_TYPE = os.getenv("FORGE_CONTENT_TYPE", "application/json")
76# Global logger
77logger = logging.getLogger("mcpgateway.wrapper")
78logger.addHandler(logging.StreamHandler(sys.stderr))
79logger.propagate = False
80logger.disabled = True # default: disabled
82# Shutdown flag
83_shutdown = asyncio.Event()
86def _mark_shutdown():
87 """Mark the shutdown flag for graceful termination.
88 This is triggered when stdin closes, stdout fails, or a signal is caught.
90 Args:
91 None
93 Examples:
94 >>> _mark_shutdown() # doctest: +ELLIPSIS
95 >>> shutting_down()
96 True
97 >>> # Reset for following doctests:
98 >>> _ = _shutdown.clear()
99 """
100 if not _shutdown.is_set():
101 _shutdown.set()
104def shutting_down() -> bool:
105 """Check whether the server is shutting down.
107 Args:
108 None
110 Returns:
111 bool: True if shutdown has been triggered, False otherwise.
113 Examples:
114 >>> shutting_down()
115 False
116 """
117 return _shutdown.is_set()
120# -----------------------
121# Utilities
122# -----------------------
123def setup_logging(level: Optional[str]) -> None:
124 """Configure logging for the wrapper.
126 Args:
127 level: Logging level (e.g. "INFO", "DEBUG"), or OFF/None to disable.
129 Examples:
130 >>> setup_logging("DEBUG")
131 >>> logger.disabled
132 False
133 >>> setup_logging("OFF")
134 >>> logger.disabled
135 True
136 """
137 if not level:
138 logger.disabled = True
139 return
141 log_level = level.strip().upper()
142 if log_level in {"OFF", "NONE", "DISABLE", "FALSE", "0"}:
143 logger.disabled = True
144 return
146 logger.setLevel(getattr(logging, log_level, logging.INFO))
147 formatter = logging.Formatter(
148 "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
149 datefmt="%Y-%m-%dT%H:%M:%S",
150 )
151 for handler in logger.handlers:
152 handler.setFormatter(formatter)
153 logger.disabled = False
156def convert_url(url: str) -> str:
157 """Normalize an MCP server URL.
159 - If it ends with `/sse`, replace with `/mcp`.
160 - If it ends with `/mcp` already, leave it.
161 - Otherwise, append `/mcp`.
163 Args:
164 url: The input server URL.
166 Returns:
167 str: Normalized MCP URL.
169 Examples:
170 >>> convert_url("http://localhost:4444/servers/uuid")
171 'http://localhost:4444/servers/uuid/mcp/'
172 >>> convert_url("http://localhost:4444/servers/uuid/sse")
173 'http://localhost:4444/servers/uuid/mcp/'
174 >>> convert_url("http://localhost:4444/servers/uuid/mcp")
175 'http://localhost:4444/servers/uuid/mcp/'
176 """
177 if url.endswith("/mcp") or url.endswith("/mcp/"):
178 if url.endswith("/mcp"):
179 return url + "/"
180 return url
181 if url.endswith("/sse"):
182 return url.replace("/sse", "/mcp/")
183 return url + "/mcp/"
186def send_to_stdout(obj: Union[dict, str, bytes]) -> None:
187 """Write JSON-serializable object to stdout.
189 Args:
190 obj: Object to serialize and write. Falls back to str() if JSON fails.
192 Notes:
193 If writing fails (e.g., broken pipe), triggers shutdown.
194 """
195 try:
196 # orjson.dumps returns bytes
197 line = orjson.dumps(obj)
198 except Exception:
199 if isinstance(obj, bytes):
200 line = obj
201 else:
202 line = str(obj).encode("utf-8")
203 try:
204 # Check if sys.stdout has buffer attribute
205 # If not (eg. some mocks), fall back to write str
206 if hasattr(sys.stdout, "buffer"):
207 sys.stdout.buffer.write(line + b"\n")
208 sys.stdout.buffer.flush()
209 else:
210 # Fallback for testing environments that mock sys.stdout but not buffer
211 sys.stdout.write(line.decode("utf-8") + "\n")
212 sys.stdout.flush()
213 except OSError as e:
214 logger.error("OS error: %s", e)
215 _mark_shutdown()
218def make_error(message: str, code: int = JSONRPC_INTERNAL_ERROR, data: Any = None) -> dict:
219 """Construct a JSON-RPC error response.
221 Args:
222 message: Error message.
223 code: JSON-RPC error code (default -32603).
224 data: Optional extra error data.
226 Returns:
227 dict: JSON-RPC error object.
229 Examples:
230 >>> make_error("Invalid input", code=-32600)
231 {'jsonrpc': '2.0', 'id': 'bridge', 'error': {'code': -32600, 'message': 'Invalid input'}}
232 >>> make_error("Oops", data={"info": 1})["error"]["data"]
233 {'info': 1}
234 """
235 err: dict[str, Any] = {
236 "jsonrpc": "2.0",
237 "id": "bridge",
238 "error": {"code": code, "message": message},
239 }
240 if data is not None:
241 err["error"]["data"] = data
242 return err
245async def stdin_reader(queue: "asyncio.Queue[Union[dict, list, str, None]]") -> None:
246 """Read lines from stdin and push parsed JSON into a queue.
248 Args:
249 queue: Target asyncio.Queue where parsed messages are enqueued.
251 Notes:
252 - On EOF, pushes None and triggers shutdown.
253 - Invalid JSON produces a JSON-RPC error object.
255 Examples:
256 >>> # Example pattern (not executed): asyncio.create_task(stdin_reader(q))
257 >>> True
258 True
259 """
260 while True:
261 # read bytes directly if possible
262 if hasattr(sys.stdin, "buffer"):
263 line = await asyncio.to_thread(sys.stdin.buffer.readline)
264 else:
265 # Fallback
266 line_str = await asyncio.to_thread(sys.stdin.readline)
267 line = line_str.encode("utf-8") if line_str else b""
269 if not line:
270 await queue.put(None)
271 _mark_shutdown()
272 break
274 line = line.strip()
275 if not line:
276 continue
277 try:
278 # orjson.loads accepts bytes
279 obj = orjson.loads(line)
280 except Exception:
281 # Decode for error message if needed
282 try:
283 line_str = line.decode("utf-8", errors="replace")
284 except Exception:
285 line_str = str(line)
286 obj = make_error("Invalid JSON from stdin", JSONRPC_PARSE_ERROR, line_str)
287 await queue.put(obj)
290# -----------------------
291# Stream Parsers
292# -----------------------
293async def ndjson_lines(resp: httpx.Response) -> AsyncIterator[bytes]:
294 """Parse newline-delimited JSON (NDJSON) from an HTTP response.
296 Args:
297 resp: httpx.Response with NDJSON content.
299 Yields:
300 bytes: Individual JSON lines as bytes.
302 Examples:
303 >>> # This function is a parser for network streams; doctest uses patterns only.
304 >>> True
305 True
306 """
307 # read bytes directly if possible
308 partial_line = b""
309 async for chunk in resp.aiter_bytes():
310 if shutting_down():
311 break
312 if not chunk:
313 continue
315 # Split chunk into lines, handling partial line from previous chunk
316 lines = (partial_line + chunk).split(b"\n")
318 # The last element is always the new partial line (might be empty if chunk ended with newline)
319 partial_line = lines.pop()
321 for line in lines:
322 if line.strip():
323 yield line.strip()
325 # Process remaining partial line
326 if partial_line.strip():
327 yield partial_line.strip()
330async def sse_events(resp: httpx.Response) -> AsyncIterator[bytes]:
331 """Parse Server-Sent Events (SSE) from an HTTP response.
333 Args:
334 resp: httpx.Response with SSE content.
336 Yields:
337 bytes: Event payload data lines (joined).
338 """
339 partial_line = b""
340 event_lines: List[bytes] = []
342 async for chunk in resp.aiter_bytes():
343 if shutting_down():
344 break
345 if not chunk:
346 continue
348 # Split chunk into lines
349 lines = (partial_line + chunk).split(b"\n")
350 partial_line = lines.pop()
352 for line in lines:
353 line = line.rstrip(b"\r")
354 if not line:
355 if event_lines:
356 yield b"\n".join(event_lines)
357 event_lines = []
358 continue
359 if line.startswith(b":"):
360 continue
362 if b":" in line:
363 field, value = line.split(b":", 1)
364 value = value.lstrip(b" ")
365 else:
366 field, value = line, b""
368 if field == b"data":
369 event_lines.append(value)
371 # Process remaining partial line if any (though standard SSE ends with \n\n)
372 if partial_line:
373 line = partial_line.rstrip(b"\r")
374 # Process the partial line same as above
375 if line and not line.startswith(b":"):
376 if b":" in line:
377 field, value = line.split(b":", 1)
378 value = value.lstrip(b" ")
379 else:
380 field, value = line, b""
381 if field == b"data":
382 event_lines.append(value)
384 # Always yield any remaining accumulated event data
385 if event_lines:
386 yield b"\n".join(event_lines)
389# -----------------------
390# Core HTTP forwarder
391# -----------------------
392async def forward_once(
393 client: ResilientHttpClient,
394 settings: "Settings",
395 payload: Union[str, Dict[str, Any], List[Any]],
396) -> None:
397 """Forward a single JSON-RPC payload to the MCP gateway and stream responses.
399 The function:
400 - Sets content negotiation headers (JSON, NDJSON, SSE)
401 - Adds Authorization header when configured
402 - Streams the gateway response and forwards every JSON object to stdout
403 (supports application/json, application/x-ndjson, and text/event-stream)
405 Args:
406 client: Resilient HTTP client used to make the request.
407 settings: Bridge configuration (URL, auth, timeouts).
408 payload: JSON-RPC request payload as str/dict/list.
409 """
410 if shutting_down():
411 return
413 headers = {
414 "Content-Type": "application/json; charset=utf-8",
415 "Accept": "application/json, application/x-ndjson, text/event-stream",
416 }
417 if settings.auth_header:
418 headers["Authorization"] = settings.auth_header
420 # Step 1: Decide content type (manual override > auto-detect)
421 content_type = getattr(settings, "content_type", None) or CONTENT_TYPE
423 if content_type == "application/x-www-form-urlencoded":
424 # Always encode as form data
425 if isinstance(payload, dict):
426 body = urlencode(payload)
427 else:
428 body = str(payload)
429 headers["Content-Type"] = "application/x-www-form-urlencoded"
431 elif content_type == "application/json":
432 # Force JSON
433 body = payload if isinstance(payload, str) else orjson.dumps(payload).decode()
434 headers["Content-Type"] = "application/json; charset=utf-8"
436 else:
437 # Auto-detect
438 if isinstance(payload, dict) and all(isinstance(v, (str, int, float, bool, type(None))) for v in payload.values()):
439 body = urlencode(payload)
440 headers["Content-Type"] = "application/x-www-form-urlencoded"
441 else:
442 body = payload if isinstance(payload, str) else orjson.dumps(payload).decode()
443 headers["Content-Type"] = "application/json; charset=utf-8"
445 body_bytes = body.encode("utf-8")
447 # Step 2: Send request and process response
448 async with client.stream("POST", settings.server_url, data=body_bytes, headers=headers) as resp:
449 ctype = (resp.headers.get("Content-Type") or "").lower()
450 status = resp.status_code
451 logger.debug("HTTP %s %s", status, ctype)
453 if shutting_down():
454 return
456 if status < 200 or status >= 300:
457 send_to_stdout(make_error(f"HTTP {status}", code=status))
458 return
460 async def _process_line(line: Union[str, bytes]):
461 """
462 Asynchronously processes a single line of text/bytes, expected to be a valid JSON.
464 If the system is shutting down, the function returns immediately.
465 Otherwise, it attempts to parse the line as JSON and sends the resulting object to stdout.
466 If parsing fails, logs a warning and sends a standardized error response to stdout.
468 Args:
469 line (Union[str, bytes]): Valid JSON object (bytes optimized).
470 """
471 if shutting_down():
472 return
473 try:
474 # orjson.loads accepts bytes or str
475 obj = orjson.loads(line)
476 send_to_stdout(obj)
477 except Exception:
478 logger.warning("Invalid JSON from server: %s", line)
479 # Ensure line is str for error message
480 line_str = line if isinstance(line, str) else str(line)
481 send_to_stdout(make_error("Invalid JSON from server", JSONRPC_PARSE_ERROR, line_str))
483 # Step 3: Handle response content types
484 if "event-stream" in ctype:
485 async for data_payload in sse_events(resp):
486 if shutting_down():
487 break
488 if not data_payload:
489 continue
490 await _process_line(data_payload)
491 return
493 if "x-ndjson" in ctype or "ndjson" in ctype:
494 async for line in ndjson_lines(resp):
495 if shutting_down():
496 break
497 await _process_line(line)
498 return
500 if "application/json" in ctype:
501 raw = await resp.aread()
502 if not shutting_down():
503 # raw is bytes
504 if not raw.strip():
505 return
506 try:
507 send_to_stdout(orjson.loads(raw))
508 except Exception:
509 send_to_stdout(make_error("Invalid JSON response", JSONRPC_PARSE_ERROR, raw.decode("utf-8", "replace")))
510 return
512 # Fallback: try parsing as NDJSON
513 async for line in ndjson_lines(resp):
514 if shutting_down():
515 break
516 await _process_line(line)
519async def make_request(
520 client: ResilientHttpClient,
521 settings: "Settings",
522 payload: Union[str, Dict[str, Any], List[Any]],
523 *,
524 max_retries: int = 5,
525 base_delay: float = 0.25,
526) -> None:
527 """Make a gateway request with retry/backoff around a single forward attempt.
529 Args:
530 client: Resilient HTTP client used to make the request.
531 settings: Bridge configuration (URL, auth, timeouts).
532 payload: JSON-RPC request payload as str/dict/list.
533 max_retries: Maximum retry attempts upon exceptions (default 5).
534 base_delay: Base delay in seconds for exponential backoff (default 0.25).
535 """
536 attempt = 0
537 while not shutting_down():
538 try:
539 await forward_once(client, settings, payload)
540 return
541 except Exception as e:
542 if shutting_down():
543 return
544 logger.warning("Network or unexpected error in forward_once: %s", e)
545 attempt += 1
546 if attempt > max_retries:
547 send_to_stdout(make_error("max retries exceeded", JSONRPC_SERVER_ERROR))
548 return
549 delay = min(base_delay * (2 ** (attempt - 1)), 8.0)
550 await asyncio.sleep(delay)
553# -----------------------
554# Main loop & CLI
555# -----------------------
556@dataclass
557class Settings:
558 """Bridge configuration settings.
560 Args:
561 server_url: MCP server URL
562 auth_header: Authorization header (optional)
563 connect_timeout: HTTP connect timeout in seconds
564 response_timeout: Max response wait in seconds
565 concurrency: Max concurrent tool calls
566 log_level: Logging verbosity
568 Examples:
569 >>> s = Settings("http://x/mcp", "Bearer token", 5, 10, 2, "DEBUG")
570 >>> s.server_url
571 'http://x/mcp'
572 >>> s.concurrency
573 2
574 """
576 server_url: str
577 auth_header: Optional[str]
578 connect_timeout: float = DEFAULT_CONNECT_TIMEOUT
579 response_timeout: float = DEFAULT_RESPONSE_TIMEOUT
580 concurrency: int = DEFAULT_CONCURRENCY
581 log_level: Optional[str] = None
584async def main_async(settings: Settings) -> None:
585 """Main async loop: reads stdin JSON lines and forwards them to the gateway.
587 - Spawns a reader task that pushes parsed lines to a queue.
588 - Uses a semaphore to cap concurrent requests.
589 - Creates tasks to forward each queued payload.
590 - Gracefully shuts down on EOF or signals.
592 Args:
593 settings: Bridge configuration settings.
595 Examples:
596 >>> # Smoke-test structure only; no network or stdin in doctest.
597 >>> settings = Settings("http://x/mcp", None)
598 >>> async def _run_once():
599 ... q = asyncio.Queue()
600 ... # Immediately signal shutdown by marking the queue end:
601 ... await q.put(None)
602 ... _mark_shutdown()
603 ... # Minimal run: create then cancel tasks cleanly.
604 ... await asyncio.sleep(0)
605 >>> # Note: We avoid running main_async here due to stdin/network.
606 >>> True
607 True
608 """
609 queue: "asyncio.Queue[Union[dict, list, str, None]]" = asyncio.Queue()
610 reader_task = asyncio.create_task(stdin_reader(queue))
612 sem = asyncio.Semaphore(settings.concurrency)
614 httpx_timeout = httpx.Timeout(
615 connect=settings.connect_timeout,
616 read=settings.response_timeout,
617 write=settings.response_timeout,
618 pool=settings.response_timeout,
619 )
621 # Get SSL verify setting from global config (with fallback for standalone usage)
622 try:
623 # First-Party
624 from mcpgateway.config import settings as global_settings # pylint: disable=import-outside-toplevel
626 ssl_verify = not global_settings.skip_ssl_verify
627 except ImportError:
628 ssl_verify = True # Default to verifying SSL when config unavailable
630 client_args = {"timeout": httpx_timeout, "http2": True, "verify": ssl_verify}
631 resilient = ResilientHttpClient(
632 max_retries=5,
633 base_backoff=0.25,
634 max_delay=8.0,
635 jitter_max=0.25,
636 client_args=client_args,
637 )
639 tasks: set[asyncio.Task[None]] = set()
640 try:
641 while not shutting_down():
642 item = await queue.get()
643 if item is None:
644 break
646 async def _worker(payload=item):
647 """
648 Executes an asynchronous request with concurrency control.
650 Acquires a semaphore to limit the number of concurrent executions.
651 If the system is not shutting down, sends the given payload using `make_request`.
653 Args:
654 payload (Any): The data to be sent in the request. Defaults to `item`.
655 """
656 async with sem:
657 if not shutting_down():
658 await make_request(resilient, settings, payload)
660 t = asyncio.create_task(_worker())
661 tasks.add(t)
662 t.add_done_callback(lambda fut, s=tasks: s.discard(fut))
664 _mark_shutdown()
665 for t in list(tasks):
666 t.cancel()
667 if tasks:
668 with suppress(asyncio.CancelledError):
669 await asyncio.gather(*tasks)
670 finally:
671 reader_task.cancel()
672 with suppress(Exception):
673 await reader_task
674 with suppress(Exception):
675 await resilient.aclose()
678def parse_args() -> Settings:
679 """Parse CLI arguments and environment variables into Settings.
681 Recognized flags:
682 --url / MCP_SERVER_URL
683 --auth / MCP_AUTH
684 --timeout / MCP_TOOL_CALL_TIMEOUT
685 --log-level / MCP_WRAPPER_LOG_LEVEL
687 Returns:
688 Settings: Parsed and normalized configuration.
690 Examples:
691 >>> import sys, os
692 >>> _argv = sys.argv
693 >>> sys.argv = ["prog", "--url", "http://localhost:4444/servers/u"]
694 >>> try:
695 ... s = parse_args()
696 ... s.server_url.endswith("/mcp/")
697 ... finally:
698 ... sys.argv = _argv
699 True
700 """
701 parser = argparse.ArgumentParser(description="Stdio MCP Client <-> MCP HTTP Bridge")
702 parser.add_argument("--url", default=os.environ.get("MCP_SERVER_URL"), help="MCP server URL (env: MCP_SERVER_URL)")
703 parser.add_argument("--auth", default=os.environ.get("MCP_AUTH"), help="Authorization header value (env: MCP_AUTH)")
704 parser.add_argument("--timeout", default=os.environ.get("MCP_TOOL_CALL_TIMEOUT"), help="Response timeout in seconds")
705 parser.add_argument(
706 "--log-level",
707 default=os.environ.get("MCP_WRAPPER_LOG_LEVEL", "INFO"),
708 help="Enable logging at this level (case-insensitive, default: disabled)",
709 )
710 args = parser.parse_args()
712 if not args.url:
713 print("Error: MCP server URL must be provided via --url or MCP_SERVER_URL", file=sys.stderr)
714 sys.exit(2)
716 server_url = convert_url(args.url)
717 response_timeout = float(args.timeout) if args.timeout else DEFAULT_RESPONSE_TIMEOUT
719 return Settings(
720 server_url=server_url,
721 auth_header=args.auth,
722 connect_timeout=DEFAULT_CONNECT_TIMEOUT,
723 response_timeout=response_timeout,
724 log_level=args.log_level,
725 concurrency=DEFAULT_CONCURRENCY,
726 )
729def _install_signal_handlers(loop: asyncio.AbstractEventLoop) -> None:
730 """Install SIGINT/SIGTERM handlers that trigger graceful shutdown.
732 Args:
733 loop: The asyncio event loop to attach handlers to.
735 Examples:
736 >>> import asyncio
737 >>> loop = asyncio.new_event_loop()
738 >>> _install_signal_handlers(loop) # doctest: +ELLIPSIS
739 >>> loop.close()
740 """
741 for sig in (getattr(signal, "SIGINT", None), getattr(signal, "SIGTERM", None)):
742 if sig is None:
743 continue
744 with suppress(NotImplementedError):
745 loop.add_signal_handler(sig, _mark_shutdown)
748def main() -> None:
749 """Entry point for the MCP stdio wrapper.
751 - Parses args/env vars into Settings
752 - Configures logging
753 - Runs the async main loop with signal handling
755 Args:
756 None
757 """
758 settings = parse_args()
759 setup_logging(settings.log_level)
760 if not logger.disabled:
761 logger.info("Starting MCP stdio wrapper -> %s", settings.server_url)
763 loop = asyncio.new_event_loop()
764 asyncio.set_event_loop(loop)
765 _install_signal_handlers(loop)
767 try:
768 loop.run_until_complete(main_async(settings))
769 finally:
770 loop.run_until_complete(asyncio.sleep(0))
771 with suppress(Exception):
772 loop.close()
773 if not logger.disabled:
774 logger.info("Shutdown complete.")
777if __name__ == "__main__":
778 main()