Parallel Session Cleanup with asyncio.gather()

Overview

The MCP Gateway implements a high-performance parallel session cleanup mechanism using asyncio.gather() with bounded concurrency to optimize database operations in multi-worker deployments. This document explains the implementation and performance benefits.

Implementation

Two-Phase Strategy

The _cleanup_database_sessions() method uses a two-phase approach:

  1. Connection Check Phase (Sequential)

     • Quickly checks each session's connection status
     • Immediately removes disconnected sessions
     • Reduces workload for the parallel phase

  2. Database Refresh Phase (Parallel with Bounded Concurrency)

     • Uses asyncio.gather() with a semaphore to refresh connected sessions
     • Limits concurrent DB operations to prevent resource exhaustion (default: 20)
     • Each refresh updates the last_accessed timestamp in the database
     • Prevents sessions from being marked as expired

Code Structure

async def _cleanup_database_sessions(self, max_concurrent: int = 20) -> None:
    # Phase 1: Sequential connection checks (fast).
    # local_transports is this worker's in-memory map of session_id -> transport.
    connected: list[str] = []
    for session_id, transport in local_transports.items():
        if not await transport.is_connected():
            await self.remove_session(session_id)
        else:
            connected.append(session_id)

    # Phase 2: Parallel database refreshes with bounded concurrency
    if connected:
        semaphore = asyncio.Semaphore(max_concurrent)

        async def bounded_refresh(session_id: str) -> bool:
            # Offload the synchronous DB refresh to a thread, with at most
            # max_concurrent refreshes in flight at once.
            async with semaphore:
                return await asyncio.to_thread(self._refresh_session_db, session_id)

        refresh_tasks = [bounded_refresh(session_id) for session_id in connected]
        results = await asyncio.gather(*refresh_tasks, return_exceptions=True)
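
Because _refresh_session_db is a synchronous database call, each refresh is offloaded to a worker thread via asyncio.to_thread(); the semaphore therefore bounds both the number of in-flight DB operations and the number of threads occupied by a cleanup pass.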

Performance Benefits

Time Complexity Comparison

  • Sequential Execution: N × (connection_check_time + db_refresh_time)
  • Parallel Execution: N × connection_check_time + ceil(N / max_concurrent) × db_refresh_time

Real-World Example

For 100 sessions with 50ms database latency and max_concurrent=20:

  • Sequential: ~5 seconds total
  • Parallel: ~250ms (5 batches × 50ms)
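
To make the arithmetic explicit, here is a small standalone sketch (illustrative only, not part of the gateway code) that evaluates both formulas for the example above:

import math

def estimate_cleanup_ms(
    n_sessions: int,
    check_ms: float,
    refresh_ms: float,
    max_concurrent: int,
) -> tuple[float, float]:
    """Return (sequential_ms, parallel_ms) estimates for one cleanup pass."""
    sequential = n_sessions * (check_ms + refresh_ms)
    parallel = n_sessions * check_ms + math.ceil(n_sessions / max_concurrent) * refresh_ms
    return sequential, parallel

# 100 sessions, negligible check time, 50ms DB latency, 20 concurrent refreshes
print(estimate_cleanup_ms(100, 0.0, 50.0, 20))  # (5000.0, 250.0)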

Bounded Concurrency

Why Limit Concurrency?

Without limits, parallel cleanup can:

  • Exhaust database connection pools under high session counts
  • Cause DB timeouts when many operations queue simultaneously
  • Create memory pressure from thousands of pending task objects

Default Configuration

  • max_concurrent=20: Balances parallelism with resource usage
  • Works well with typical DB pool sizes (50-200 connections)
  • Can be tuned based on deployment requirements (see the sketch below)
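
As a rough illustration of that tuning point, a deployment could derive the limit from its own DB pool size before invoking the cleanup. The registry parameter, the DB_POOL_SIZE setting, and the half-pool headroom factor below are assumptions for this sketch, not gateway configuration:

import os

async def run_tuned_cleanup(registry) -> None:
    """Hypothetical helper: size the cleanup limit from the deployment's DB pool."""
    db_pool_size = int(os.getenv("DB_POOL_SIZE", "50"))  # assumed deployment setting
    max_concurrent = max(1, min(20, db_pool_size // 2))  # leave headroom for request traffic
    await registry._cleanup_database_sessions(max_concurrent=max_concurrent)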

Error Handling

Robust Exception Management

  • Uses return_exceptions=True to prevent one failed refresh from stopping others
  • Processes results individually to handle mixed success/failure scenarios
  • Maintains session registry consistency even when database operations fail

Graceful Degradation

for session_id, result in zip(connected, results):
    if isinstance(result, Exception):
        # Only log error, don't remove session on transient DB errors
        logger.error(f"Error refreshing session {session_id}: {result}")
    elif not result:
        # Session no longer in database, remove locally
        await self.remove_session(session_id)
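
Because asyncio.gather() returns results in the same order as the awaitables passed to it, zipping results against connected pairs each outcome with the session that produced it.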

Transient errors (network blips, temporary DB issues) are logged but don't remove active sessions. Sessions are only removed when explicitly confirmed to no longer exist in the database.

Benefits

  1. Scalability: Handles thousands of concurrent sessions efficiently
  2. Reliability: Continues processing even when individual operations fail
  3. Performance: Dramatically reduces cleanup time through parallelization
  4. Resource Safety: Bounded concurrency prevents DB/thread pool exhaustion
  5. Consistency: Maintains accurate session state across distributed workers

Usage

This optimization is automatically applied in database-backed session registries and runs every 5 minutes as part of the cleanup task. No configuration changes are required to benefit from the parallel implementation.
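
For readers building something similar, the periodic wiring can look roughly like the following sketch. Only the 5-minute cadence and the cleanup call itself come from the behavior described above; the loop structure and names are illustrative:

import asyncio
import logging

logger = logging.getLogger(__name__)

CLEANUP_INTERVAL_SECONDS = 300  # matches the 5-minute cadence described above

async def session_cleanup_loop(registry) -> None:
    """Illustrative background loop; the gateway wires this up internally."""
    while True:
        try:
            await registry._cleanup_database_sessions()
        except Exception:
            # Keep the loop alive on unexpected errors; the next pass will retry.
            logger.exception("Session cleanup pass failed")
        await asyncio.sleep(CLEANUP_INTERVAL_SECONDS)

# Typically started once per worker at application startup:
#     cleanup_task = asyncio.create_task(session_cleanup_loop(registry))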