# Source code for genai._utils.shared_loop
import asyncio
import threading
from asyncio import CancelledError
from contextlib import suppress
from typing import Callable, Optional
from genai._utils.shared_instance import SharedResource
__all__ = ["shared_event_loop"]
class _SharedEventLoop(SharedResource[asyncio.AbstractEventLoop]):
    """
    A class that manages a shared asyncio event loop.

    The loop is created by ``_enter`` and runs on a dedicated background thread;
    ``_exit`` stops the loop and joins that thread. When these two hooks are
    invoked is decided by ``SharedResource`` (defined elsewhere).

    Callbacks registered with ``add_close_handler`` are invoked by
    ``signal_emit_close``.
    """

    def __init__(self):
        super().__init__()
        # Callbacks invoked (in registration order) by signal_emit_close().
        self._close_signal_handlers: list[Callable[[], None]] = []

    def _enter(self) -> asyncio.AbstractEventLoop:
        """
        Create a new event loop, start it on a background thread and return it.

        Returns:
            The newly created loop, which the worker thread runs until it is stopped.
        """
        loop = asyncio.new_event_loop()

        def worker():
            asyncio.set_event_loop(loop)
            try:
                loop.run_forever()
                not_finished_tasks = [t for t in asyncio.all_tasks(loop) if not t.done()]
                # Tasks should be given time to run even if they are cancelled so they can do cleanup
                # https://xinhuang.github.io/posts/2017-07-31-common-mistakes-using-python3-asyncio.html
                for task in not_finished_tasks:
                    with suppress(CancelledError):
                        loop.run_until_complete(task)
            finally:
                # Close the loop even when a drained task raises something other
                # than CancelledError; the exception still propagates afterwards.
                loop.close()

        self._loop = loop
        self._thread = threading.Thread(target=worker)
        self._thread.start()
        return loop

    def add_close_handler(self, handler: Callable[[], None]) -> None:
        """Register ``handler`` to be called by ``signal_emit_close``."""
        self._close_signal_handlers.append(handler)

    def remove_close_handler(self, handler: Callable[[], None]):
        """Unregister ``handler``; does nothing if it is not registered."""
        if handler in self._close_signal_handlers:
            self._close_signal_handlers.remove(handler)

    def signal_emit_close(self):
        """
        Call every close handler that is registered at the moment of the call.

        A snapshot of the handler list is iterated, so a handler may unregister
        itself (or cause the list to be cleared) without later handlers being
        skipped.
        """
        for handler in list(self._close_signal_handlers):
            handler()

    def _exit(self):
        """
        Drop all close handlers, stop the loop and wait for its thread to finish.

        ``stop`` is scheduled with ``call_soon_threadsafe`` because the loop runs
        on the worker thread, not on the caller's thread. ``join`` returns only
        after the worker has finished running the remaining tasks and closed the loop.
        """
        self._close_signal_handlers.clear()
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()

    def get_running_loop(self) -> Optional[asyncio.AbstractEventLoop]:
        """
        Return ``self._resource``.

        NOTE(review): presumably this is the loop returned by ``_enter`` while the
        shared resource is active and ``None`` otherwise - confirm against
        ``SharedResource``.
        """
        return self._resource
# Module-level instance; this is the only name exported through __all__.
shared_event_loop = _SharedEventLoop()
# [docs]
def handle_shutdown_event(*_args) -> None:
    """
    Notify all close handlers registered on ``shared_event_loop``.

    Any positional arguments are accepted and ignored (presumably so the function
    can be registered directly as a signal-style callback - confirm against callers).
    """
    emit_close = shared_event_loop.signal_emit_close
    emit_close()