importasyncioimportthreadingfromasyncioimportCancelledErrorfromcontextlibimportsuppressfromtypingimportCallable,Optionalfromgenai._utils.shared_instanceimportSharedResource__all__=["shared_event_loop"]class_SharedEventLoop(SharedResource[asyncio.AbstractEventLoop]):""" A class that manages a shared asyncio event loop. """def__init__(self):super().__init__()self._close_signal_handlers:list[Callable[[],None]]=[]def_enter(self)->asyncio.AbstractEventLoop:loop=asyncio.new_event_loop()defworker():asyncio.set_event_loop(loop)loop.run_forever()not_finished_tasks=[tfortinasyncio.all_tasks(loop)ifnott.done()]# Tasks should be given time to run even if they are cancelled so they can do cleanup# https://xinhuang.github.io/posts/2017-07-31-common-mistakes-using-python3-asyncio.htmlfortaskinnot_finished_tasks:withsuppress(CancelledError):loop.run_until_complete(task)loop.close()self._loop=loopself._thread=threading.Thread(target=worker)self._thread.start()returnloopdefadd_close_handler(self,handler:Callable[[],None])->None:self._close_signal_handlers.append(handler)defremove_close_handler(self,handler:Callable[[],None]):ifhandlerinself._close_signal_handlers:self._close_signal_handlers.remove(handler)defsignal_emit_close(self):forhandlerinself._close_signal_handlers:handler()def_exit(self):self._close_signal_handlers.clear()self._loop.call_soon_threadsafe(lambda:self._loop.stop())self._thread.join()defget_running_loop(self)->Optional[asyncio.AbstractEventLoop]:returnself._resourceshared_event_loop=_SharedEventLoop()
[docs]defhandle_shutdown_event(*_args)->None:""" Handles the shutdown event. """shared_event_loop.signal_emit_close()