Coverage for mcpgateway / bootstrap_db.py: 100%

292 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/bootstrap_db.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Madhav Kandukuri 

6 

7Database bootstrap/upgrade entry-point for ContextForge. 

8The script: 

9 

101. Creates a synchronous SQLAlchemy ``Engine`` from ``settings.database_url``. 

112. Loads the *alembic.ini* bundled inside the ``mcpgateway`` package to drive migrations. 

123. Applies Alembic migrations (``alembic upgrade head``) to create or update the schema. 

134. Runs post-upgrade normalization tasks and bootstraps admin/roles as configured. 

145. Logs a **"Database ready"** message on success. 

15 

16It is intended to be invoked via ``python3 -m mcpgateway.bootstrap_db`` or 

17directly with ``python3 mcpgateway/bootstrap_db.py``. 

18 

19Examples: 

20 >>> from mcpgateway.bootstrap_db import logging_service, logger 

21 >>> logging_service is not None 

22 True 

23 >>> logger is not None 

24 True 

25 >>> hasattr(logger, 'info') 

26 True 

27 >>> from mcpgateway.bootstrap_db import Base 

28 >>> hasattr(Base, 'metadata') 

29 True 

30""" 

31 

32# Standard 

33import asyncio 

34from contextlib import contextmanager 

35from importlib.resources import files 

36import json 

37import os 

38from pathlib import Path 

39import random 

40import re 

41import tempfile 

42import time 

43from typing import cast 

44 

45# Third-Party 

46from alembic import command 

47from alembic.config import Config 

48from filelock import FileLock 

49from sqlalchemy import create_engine, inspect, or_, text 

50from sqlalchemy.engine import Connection 

51from sqlalchemy.orm import Session 

52 

53# First-Party 

54from mcpgateway.common.validators import SecurityValidator 

55from mcpgateway.config import settings 

56from mcpgateway.db import A2AAgent, Base, EmailTeam, EmailUser, Gateway, Prompt, Resource, Server, Tool 

57from mcpgateway.services.logging_service import LoggingService 

58 

# Migration lock to prevent concurrent migrations from multiple workers.
# The lock file is placed in the system temp directory and is used by the
# FileLock fallback in advisory_lock() for non-Postgres dialects.
_MIGRATION_LOCK_PATH = os.path.join(tempfile.gettempdir(), "mcpgateway_migration.lock")
_MIGRATION_LOCK_TIMEOUT = 300  # seconds to wait for lock (5 minutes for slow migrations)

# Initialize logging service first so every function below can use the module logger
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

66 

67 

68def _column_exists(inspector, table_name: str, column_name: str) -> bool: 

69 """Check whether a table has a specific column. 

70 

71 Args: 

72 inspector: SQLAlchemy inspector for the active connection. 

73 table_name: Table name to inspect. 

74 column_name: Column name to check. 

75 

76 Returns: 

77 True if the column exists, otherwise False. 

78 """ 

79 try: 

80 return any(col["name"] == column_name for col in inspector.get_columns(table_name)) 

81 except Exception: 

82 return False 

83 

84 

def _schema_looks_current(inspector) -> bool:
    """Heuristically decide whether an unversioned database already has a recent schema.

    Args:
        inspector: SQLAlchemy inspector for the active connection.

    Returns:
        True only when every marker column checked below exists.
    """
    # (table, column) pairs probed in order; evaluation stops at the first missing one.
    marker_columns = (
        ("tools", "display_name"),
        ("gateways", "oauth_config"),
        ("prompts", "custom_name"),
        ("sso_providers", "jwks_uri"),
    )
    return all(_column_exists(inspector, table, column) for table, column in marker_columns)

100 

101 

@contextmanager
def advisory_lock(conn: Connection):
    """
    Acquire a distributed advisory lock to serialize migrations across multiple instances.

    Behavior depends on the database backend:
    - Postgres: Polls `pg_try_advisory_lock` (non-blocking) with capped exponential
      backoff and jitter, and releases the lock with `pg_advisory_unlock` on exit
    - SQLite / any other dialect: Fallback to local `FileLock`

    If the guarded body raises on Postgres, the lock release is still attempted; a
    failure of that release is logged instead of raised so the body's original
    exception is the one that propagates.

    Args:
        conn: Active SQLAlchemy connection

    Yields:
        None

    Raises:
        TimeoutError: If the lock cannot be acquired within the timeout period
    """
    dialect = conn.dialect.name
    # Postgres requires a BIGINT lock ID (arbitrary fixed key shared by all instances)
    pg_lock_id = 42424242424242

    if dialect == "postgresql":
        logger.info("Attempting to acquire Postgres advisory lock...")

        # Retry parameters
        max_retries = 60  # 60 attempts
        base_delay = 1.0  # Start with 1 second
        max_delay = 10.0  # Cap at 10 seconds

        acquired = False
        for attempt in range(max_retries):
            # Try non-blocking lock
            result = conn.execute(text(f"SELECT pg_try_advisory_lock({pg_lock_id})"))
            acquired = result.scalar()

            if acquired:
                logger.info(f"Acquired Postgres advisory lock on attempt {attempt + 1}")
                break

            if attempt == max_retries - 1:
                # No attempts left: fail immediately rather than announcing a retry
                # and sleeping right before raising TimeoutError.
                break

            # Exponential backoff with jitter
            delay = min(base_delay * (1.5**attempt), max_delay)
            jitter = delay * random.uniform(-0.1, 0.1)  # nosec B311 # noqa: DUO102
            sleep_time = delay + jitter

            logger.info(f"Lock held by another instance, retrying in {sleep_time:.1f}s (attempt {attempt + 1}/{max_retries})")
            time.sleep(sleep_time)

        if not acquired:
            raise TimeoutError(f"Failed to acquire advisory lock after {max_retries} attempts")

        def _release() -> None:
            """Release the Postgres advisory lock held on ``conn``."""
            logger.info("Releasing Postgres advisory lock...")
            conn.execute(text(f"SELECT pg_advisory_unlock({pg_lock_id})"))

        try:
            yield
        except BaseException:
            # The body failed. Still try to unlock, but never let an unlock failure
            # (e.g. the connection is in an aborted transaction) replace the real error.
            try:
                _release()
            except Exception as unlock_exc:
                logger.warning(f"Failed to release Postgres advisory lock after an error: {unlock_exc}")
            raise
        else:
            # Success path: an unlock failure here is still raised to the caller.
            _release()

    else:
        # Fallback for SQLite (single-host/container) or other DBs
        logger.info(f"Using FileLock fallback for {dialect}...")
        file_lock = FileLock(_MIGRATION_LOCK_PATH, timeout=_MIGRATION_LOCK_TIMEOUT)
        with file_lock:
            yield

165 

166 

async def bootstrap_admin_user(conn: Connection) -> None:
    """
    Create the platform admin account described by the application settings.

    Nothing happens when email authentication is disabled or when a user with
    the configured admin email already exists. A newly created admin is marked
    as email-verified and, when both password-change settings are enabled, is
    flagged to change the password on first login. Any failure is logged and
    swallowed so the overall bootstrap can continue.

    Args:
        conn: Active SQLAlchemy connection
    """
    if not settings.email_auth_enabled:
        logger.info("Email authentication disabled - skipping admin user bootstrap")
        return

    try:
        # Imported lazily to avoid circular imports
        # First-Party
        from mcpgateway.services.email_auth_service import EmailAuthService  # pylint: disable=import-outside-toplevel

        # The session shares the connection that holds the migration lock
        with Session(bind=conn) as session:
            email_auth = EmailAuthService(session)

            # Bail out early when the admin account is already present
            already_present = await email_auth.get_user_by_email(settings.platform_admin_email)
            if already_present:
                logger.info(f"Admin user {SecurityValidator.sanitize_log_message(settings.platform_admin_email)} already exists - skipping creation")
                return

            logger.info(f"Creating platform admin user: {SecurityValidator.sanitize_log_message(settings.platform_admin_email)}")
            new_admin = await email_auth.create_platform_admin(
                email=settings.platform_admin_email,
                password=settings.platform_admin_password.get_secret_value(),
                full_name=settings.platform_admin_full_name,
            )

            # First-Party
            from mcpgateway.db import utc_now  # pylint: disable=import-outside-toplevel

            # The bootstrap admin is treated as already email-verified
            new_admin.email_verified_at = utc_now()

            # Only force a password change when both switches allow it (each defaults to True)
            enforcement_enabled = getattr(settings, "password_change_enforcement_enabled", True)
            required_on_bootstrap = getattr(settings, "admin_require_password_change_on_bootstrap", True)
            if enforcement_enabled and required_on_bootstrap:
                new_admin.password_change_required = True  # Force admin to change default password

            try:
                new_admin.password_changed_at = utc_now()
            except Exception as exc:
                logger.debug("Failed to set admin password_changed_at: %s", exc)
            session.commit()

            # The personal team, when enabled, is created as part of user creation
            if settings.auto_create_personal_teams:
                logger.info("Personal team automatically created for admin user")

            session.commit()
            logger.info(f"Platform admin user created successfully: {SecurityValidator.sanitize_log_message(settings.platform_admin_email)}")

    except Exception as e:
        logger.error(f"Failed to bootstrap admin user: {e}")
        # Admin creation problems must not abort the rest of the bootstrap
        return

229 

230 

async def bootstrap_default_roles(conn: Connection) -> None:
    """Bootstrap default system roles and assign them to admin user.

    Creates essential RBAC roles and assigns administrative privileges
    to the platform admin user.

    Steps performed, in order:

    1. Return early when email authentication is disabled or the admin user
       does not exist.
    2. Build the built-in role definitions and, when
       ``settings.mcpgateway_bootstrap_roles_in_db_enabled`` is set, append
       validated role definitions loaded from the JSON file named by
       ``settings.mcpgateway_bootstrap_roles_in_db_file``.
    3. Create each role that does not already exist (per-role failures are
       logged and skipped).
    4. Ensure the admin user holds an active global ``platform_admin``
       assignment and that its ``is_admin`` flag is True.

    All errors are logged and swallowed; this function never raises.

    Args:
        conn: Active SQLAlchemy connection
    """
    if not settings.email_auth_enabled:
        logger.info("Email authentication disabled - skipping default roles bootstrap")
        return

    try:
        # First-Party
        from mcpgateway.services.email_auth_service import EmailAuthService  # pylint: disable=import-outside-toplevel
        from mcpgateway.services.role_service import RoleService  # pylint: disable=import-outside-toplevel

        # Use session bound to the locked connection
        with Session(bind=conn) as db:
            role_service = RoleService(db)
            auth_service = EmailAuthService(db)

            # Check if admin user exists
            admin_user = await auth_service.get_user_by_email(settings.platform_admin_email)
            if not admin_user:
                logger.info("Admin user not found - skipping role assignment")
                return

            # Default system roles to create
            default_roles = [
                {"name": "platform_admin", "description": "Platform administrator with all permissions", "scope": "global", "permissions": ["*"], "is_system_role": True},  # All permissions
                {
                    "name": "team_admin",
                    "description": "Team administrator with team management permissions",
                    "scope": "team",
                    "permissions": [
                        "admin.dashboard",
                        "admin.overview",
                        "gateways.read",
                        "servers.read",
                        "servers.use",
                        "teams.read",
                        "teams.update",
                        "teams.join",
                        "teams.delete",
                        "teams.manage_members",
                        "tools.read",
                        "tools.execute",
                        "resources.read",
                        "prompts.read",
                        "llm.read",
                        "llm.invoke",
                        "a2a.read",
                        "gateways.create",
                        "servers.create",
                        "tools.create",
                        "resources.create",
                        "prompts.create",
                        "a2a.create",
                        "gateways.update",
                        "servers.update",
                        "tools.update",
                        "resources.update",
                        "prompts.update",
                        "a2a.update",
                        "gateways.delete",
                        "servers.delete",
                        "tools.delete",
                        "resources.delete",
                        "prompts.delete",
                        "a2a.delete",
                        "a2a.invoke",
                        "tokens.create",
                        "tokens.read",
                        "tokens.update",
                        "tokens.revoke",
                    ],
                    "is_system_role": True,
                },
                {
                    "name": "developer",
                    "description": "Developer with tool and resource access",
                    "scope": "team",
                    "permissions": [
                        "admin.dashboard",
                        "admin.overview",
                        "gateways.read",
                        "servers.read",
                        "servers.use",
                        "teams.read",
                        "teams.join",
                        "tools.read",
                        "tools.execute",
                        "resources.read",
                        "prompts.read",
                        "llm.read",
                        "llm.invoke",
                        "a2a.read",
                        "gateways.create",
                        "servers.create",
                        "tools.create",
                        "resources.create",
                        "prompts.create",
                        "a2a.create",
                        "gateways.update",
                        "servers.update",
                        "tools.update",
                        "resources.update",
                        "prompts.update",
                        "a2a.update",
                        "gateways.delete",
                        "servers.delete",
                        "tools.delete",
                        "resources.delete",
                        "prompts.delete",
                        "a2a.delete",
                        "a2a.invoke",
                        "tokens.create",
                        "tokens.read",
                        "tokens.update",
                        "tokens.revoke",
                    ],
                    "is_system_role": True,
                },
                {
                    "name": "viewer",
                    "description": "Read access and tool execution within team scope",
                    "scope": "team",
                    "permissions": [
                        "admin.dashboard",
                        "admin.overview",
                        "gateways.read",
                        "servers.read",
                        "servers.use",
                        "teams.read",
                        "teams.join",
                        "tools.read",
                        "tools.execute",
                        "resources.read",
                        "prompts.read",
                        "llm.read",
                        "a2a.read",
                        "tokens.create",
                        "tokens.read",
                        "tokens.update",
                        "tokens.revoke",
                    ],
                    "is_system_role": True,
                },
                {
                    "name": "platform_viewer",
                    "description": "Read-only access to resources and admin UI",
                    "scope": "global",
                    "permissions": [
                        "admin.dashboard",
                        "admin.overview",
                        "gateways.read",
                        "servers.read",
                        "servers.use",
                        "teams.read",
                        "teams.join",
                        "tools.read",
                        "resources.read",
                        "prompts.read",
                        "llm.read",
                        "a2a.read",
                        "tokens.create",
                        "tokens.read",
                        "tokens.update",
                        "tokens.revoke",
                    ],
                    "is_system_role": True,
                },
            ]

            # Logic to add additional default roles from a json file
            if settings.mcpgateway_bootstrap_roles_in_db_enabled:
                try:
                    additional_default_roles_path = Path(settings.mcpgateway_bootstrap_roles_in_db_file)
                    # Try multiple locations for the mcpgateway_bootstrap_roles_in_db_file file
                    if not additional_default_roles_path.is_absolute():
                        # Try current directory first
                        if not additional_default_roles_path.exists():
                            # Try project root (mcpgateway/bootstrap_db.py -> parent.parent = repo root)
                            additional_default_roles_path = Path(__file__).resolve().parent.parent / settings.mcpgateway_bootstrap_roles_in_db_file

                    if not additional_default_roles_path.exists():
                        logger.warning(
                            f"Additional roles file not found. Searched: CWD/{SecurityValidator.sanitize_log_message(settings.mcpgateway_bootstrap_roles_in_db_file)}, {SecurityValidator.sanitize_log_message(str(additional_default_roles_path))}"
                        )
                    else:
                        with open(additional_default_roles_path, "r", encoding="utf-8") as f:
                            additional_default_roles_data = json.load(f)

                        # Validate JSON structure: must be a list of dicts with required keys
                        required_keys = {"name", "scope", "permissions"}
                        if not isinstance(additional_default_roles_data, list):
                            logger.error(f"Additional roles file must contain a JSON array, got {type(additional_default_roles_data).__name__}")
                        else:
                            valid_roles = []
                            for idx, role in enumerate(additional_default_roles_data):
                                if not isinstance(role, dict):
                                    logger.warning(f"Skipping invalid role at index {idx}: expected dict, got {type(role).__name__}")
                                    continue
                                missing_keys = required_keys - set(role.keys())
                                if missing_keys:
                                    role_name = role.get("name", f"<index {idx}>")
                                    logger.warning(f"Skipping role '{SecurityValidator.sanitize_log_message(str(role_name))}': missing required keys {missing_keys}")
                                    continue
                                valid_roles.append(role)

                            if valid_roles:
                                default_roles.extend(valid_roles)
                                logger.info(f"Added {len(valid_roles)} additional roles to default roles in bootstrap db")
                            elif additional_default_roles_data:
                                # The file had entries, but none of them passed validation
                                logger.warning("No valid roles found in additional roles file")
                except Exception as e:
                    # A broken roles file must not prevent the built-in roles from being created
                    logger.error(f"Failed to load mcpgateway_bootstrap_roles_in_db_file: {e}")

            # Create default roles
            # created_roles also collects roles that already existed, so the
            # platform_admin lookup below works on repeated runs.
            created_roles = []
            for role_def in default_roles:
                try:
                    # Check if role already exists
                    existing_role = await role_service.get_role_by_name(str(role_def["name"]), str(role_def["scope"]))
                    if existing_role:
                        logger.info(f"System role {SecurityValidator.sanitize_log_message(str(role_def['name']))} already exists - skipping")
                        created_roles.append(existing_role)
                        continue

                    # Create the role (description and is_system_role are optional)
                    role = await role_service.create_role(
                        name=str(role_def["name"]),
                        description=str(role_def.get("description", "")),
                        scope=str(role_def["scope"]),
                        permissions=cast(list[str], role_def["permissions"]),
                        created_by=settings.platform_admin_email,
                        is_system_role=bool(role_def.get("is_system_role", False)),
                    )
                    created_roles.append(role)
                    logger.info(f"Created system role: {role.name}")

                except Exception as e:
                    logger.error(f"Failed to create role {SecurityValidator.sanitize_log_message(str(role_def['name']))}: {SecurityValidator.sanitize_log_message(str(e))}")
                    continue

            # Assign platform_admin role to admin user
            platform_admin_role = next((r for r in created_roles if r.name == "platform_admin"), None)
            if not platform_admin_role:
                # Role not in created_roles (creation may have failed) — look up from DB as fallback
                platform_admin_role = await role_service.get_role_by_name("platform_admin", "global")
            if platform_admin_role:
                try:
                    # Check if assignment already exists
                    existing_assignment = await role_service.get_user_role_assignment(user_email=admin_user.email, role_id=platform_admin_role.id, scope="global", scope_id=None)

                    # (Re)assign when there is no assignment or the existing one is inactive
                    if not existing_assignment or not existing_assignment.is_active:
                        await role_service.assign_role_to_user(user_email=admin_user.email, role_id=platform_admin_role.id, scope="global", scope_id=None, granted_by=admin_user.email)
                        logger.info(f"Assigned platform_admin role to {SecurityValidator.sanitize_log_message(admin_user.email)}")
                    else:
                        logger.info("Admin user already has platform_admin role")

                    # Synchronize is_admin flag with platform_admin role assignment
                    # This ensures consistency when admin is manually demoted in DB but role is re-assigned during bootstrap
                    if not admin_user.is_admin:
                        logger.info(f"Synchronizing is_admin flag for {SecurityValidator.sanitize_log_message(admin_user.email)} (was False, setting to True)")
                        admin_user.is_admin = True
                        db.commit()

                except Exception as e:
                    logger.error(
                        f"Failed to assign platform_admin role to {SecurityValidator.sanitize_log_message(admin_user.email)}: {SecurityValidator.sanitize_log_message(str(e))}. Admin UI routes using allow_admin_bypass=False will return 403."
                    )
            else:
                logger.error(
                    f"platform_admin role not found — could not assign to {SecurityValidator.sanitize_log_message(admin_user.email)}. Admin UI routes using allow_admin_bypass=False will return 403."
                )

            # NOTE: this message is emitted even when individual roles or the assignment failed above
            logger.info("Default RBAC roles bootstrap completed successfully")

    except Exception as e:
        logger.error(f"Failed to bootstrap default roles: {e}")
        # Don't fail the entire bootstrap process if role creation fails
        return

516 

517 

def normalize_team_visibility(conn: Connection) -> int:
    """Coerce every team's visibility into the supported set {private, public}.

    Teams whose visibility is anything else (e.g., 'team') are switched to
    'private'. Errors are logged and reported as zero updates.

    Args:
        conn: Active SQLAlchemy connection

    Returns:
        int: Number of teams updated (0 when nothing changed or on failure)
    """
    supported = ["private", "public"]
    try:
        # The session shares the connection that holds the migration lock
        with Session(bind=conn) as db:
            offenders = db.query(EmailTeam).filter(EmailTeam.visibility.notin_(supported)).all()

            fixed = 0
            for fixed, team in enumerate(offenders, start=1):
                previous = team.visibility
                team.visibility = "private"
                logger.info(f"Normalized team visibility: id={team.id} {previous} -> private")

            # Only touch the transaction when something was actually changed
            if fixed:
                db.commit()
            return fixed
    except Exception as e:
        logger.error(f"Failed to normalize team visibility: {e}")
        return 0

546 

547 

async def bootstrap_resource_assignments(conn: Connection) -> None:
    """Assign orphaned resources to the platform admin's personal team.

    This ensures existing resources (from pre-multitenancy versions) are
    visible in the new team-based UI by assigning them to the admin's
    personal team with public visibility.

    A resource counts as orphaned when its ``team_id``, ``owner_email`` or
    ``visibility`` is NULL. Each resource type is processed and committed as
    its own batch; when a batch fails, the error is logged, the session is
    rolled back so the failed batch's pending changes are discarded, and
    processing continues with the next resource type. Values that would
    collide inside the admin team are renamed with a numeric ``-N`` suffix.

    Args:
        conn: Active SQLAlchemy connection
    """
    if not settings.email_auth_enabled:
        logger.info("Email authentication disabled - skipping resource assignment")
        return

    try:
        # Use session bound to the locked connection
        with Session(bind=conn) as db:
            # Find admin user and their personal team
            admin_user = db.query(EmailUser).filter(EmailUser.email == settings.platform_admin_email, EmailUser.is_admin.is_(True)).first()

            if not admin_user:
                logger.warning("Admin user not found - skipping resource assignment")
                return

            personal_team = admin_user.get_personal_team()
            if not personal_team:
                logger.warning("Admin personal team not found - skipping resource assignment")
                return

            logger.info(f"Assigning orphaned resources to admin team: {SecurityValidator.sanitize_log_message(personal_team.name)}")

            # Resource types to process
            resource_types = [("servers", Server), ("tools", Tool), ("resources", Resource), ("prompts", Prompt), ("gateways", Gateway), ("a2a_agents", A2AAgent)]

            total_assigned = 0

            # Unique field per resource type that participates in the team-scoped unique constraint
            unique_field: dict[str, str] = {
                "servers": "name",
                "tools": "name",
                "resources": "uri",
                "prompts": "name",
                "gateways": "slug",
                "a2a_agents": "slug",
            }

            def _like_safe(v: str) -> str:
                """Escape SQL LIKE wildcard characters for safe use in LIKE patterns.

                Args:
                    v: The string value to escape.

                Returns:
                    The escaped string safe for use in SQL LIKE patterns.
                """
                return v.replace("\\", "\\\\").replace("%", r"\%").replace("_", r"\_")

            for resource_name, resource_model in resource_types:
                try:
                    # Find unassigned resources
                    unassigned = db.query(resource_model).filter((resource_model.team_id.is_(None)) | (resource_model.owner_email.is_(None)) | (resource_model.visibility.is_(None))).all()

                    if not unassigned:
                        continue

                    logger.info(f"Assigning {len(unassigned)} orphaned {resource_name} to admin team")

                    field = unique_field[resource_name]
                    field_col = getattr(resource_model, field)

                    # Collect unique field values from the orphaned batch
                    original_values = {getattr(r, field) for r in unassigned if getattr(r, field) is not None}

                    # One query: fetch all names already taken in the admin team that match any
                    # original value exactly or as a suffixed variant (value-N).
                    # NOTE: This intentionally omits gateway_id from the filter, making it
                    # conservative — for Resource/Prompt models whose uniqueness also depends
                    # on gateway_id, this may produce unnecessary renames but can never miss a
                    # real conflict. That is the correct tradeoff for one-time bootstrap code.
                    existing_taken: set[str] = (
                        {
                            row[0]
                            for row in db.query(field_col).filter(
                                resource_model.team_id == personal_team.id,
                                resource_model.owner_email == admin_user.email,
                                or_(*[cond for v in original_values for cond in (field_col == v, field_col.like(f"{_like_safe(v)}-%", escape="\\"))]),
                            )
                        }
                        if original_values
                        else set()
                    )

                    # Pre-compile suffix regexes keyed by original value
                    suffix_res = {v: re.compile(rf"^{re.escape(v)}-(\d+)$") for v in original_values}

                    # Every value unavailable to this batch: names already in the admin team plus
                    # names claimed earlier in this loop (catches intra-batch duplicates). Kept as
                    # one running set instead of re-merging two sets for every resource.
                    taken: set[str] = set(existing_taken)

                    for resource in unassigned:
                        original_value = getattr(resource, field)

                        if original_value is not None:
                            if original_value in taken:
                                # Parse numeric suffixes from taken values to find next free one
                                suffix_re = suffix_res[original_value]
                                used = {int(m.group(1)) for v in taken if (m := suffix_re.match(v))}
                                new_value = f"{original_value}-{(max(used) if used else 1) + 1}"
                                logger.warning(
                                    f"Name conflict for {SecurityValidator.sanitize_log_message(resource_name)} '{SecurityValidator.sanitize_log_message(original_value)}' — renaming to '{SecurityValidator.sanitize_log_message(new_value)}'"
                                )
                                setattr(resource, field, new_value)
                                taken.add(new_value)
                            else:
                                taken.add(original_value)

                        resource.team_id = personal_team.id
                        resource.owner_email = admin_user.email
                        resource.visibility = "public"  # Make visible to all users
                        if hasattr(resource, "federation_source") and not resource.federation_source:
                            resource.federation_source = "mcpgateway-0.7.0-migration"

                    db.commit()
                    total_assigned += len(unassigned)

                except Exception as e:
                    logger.error(f"Failed to assign {SecurityValidator.sanitize_log_message(resource_name)}: {SecurityValidator.sanitize_log_message(str(e))}")
                    # Discard this batch's pending changes and reset the session; otherwise the
                    # next resource type would run on a failed transaction or commit this
                    # batch's half-applied edits along with its own.
                    try:
                        db.rollback()
                    except Exception as rollback_exc:
                        logger.error(f"Failed to roll back after {SecurityValidator.sanitize_log_message(resource_name)} assignment error: {SecurityValidator.sanitize_log_message(str(rollback_exc))}")
                    continue

            if total_assigned > 0:
                logger.info(f"Successfully assigned {total_assigned} orphaned resources to admin team")
            else:
                logger.info("No orphaned resources found - all resources have team assignments")

    except Exception as e:
        logger.error(f"Failed to bootstrap resource assignments: {e}")

684 

685 

async def main() -> None:
    """
    Bring the database schema up to date and run the bootstrap tasks.

    On a database without a `gateways` table this runs `create_all()` followed by
    `alembic stamp head`. On an existing database it either stamps head (no
    recorded Alembic revision but the schema already looks current) or runs
    `alembic upgrade head`. Afterwards it normalizes team visibility, bootstraps
    the admin user and default roles, and assigns orphaned resources.

    Everything runs on one connection while holding the migration lock
    (advisory lock on Postgres, file lock otherwise), so workers that start
    simultaneously do not race each other.

    Args:
        None

    Raises:
        Exception: If migration or bootstrap fails
    """
    engine = create_engine(settings.database_url)
    alembic_ini = files("mcpgateway").joinpath("alembic.ini")
    alembic_cfg = Config(str(alembic_ini))  # path in container
    alembic_cfg.attributes["configure_logger"] = True

    # Serialize concurrent migrations behind the lock
    try:
        with engine.connect() as conn:
            # End any open transaction on the connection before locking (it should be fresh anyway)
            conn.commit()

            with advisory_lock(conn):
                logger.info("Acquired migration lock, checking database schema...")

                # Alembic must run on the very connection that holds the lock
                alembic_cfg.attributes["connection"] = conn

                # Double every '%' so configparser does not treat URL-encoded
                # characters (e.g. %40 for '@') as interpolation markers
                alembic_cfg.set_main_option("sqlalchemy.url", settings.database_url.replace("%", "%%"))

                inspector = inspect(conn)
                existing_tables = inspector.get_table_names()

                if "gateways" in existing_tables:
                    revisions: list[str] = []
                    if "alembic_version" in existing_tables:
                        try:
                            version_rows = conn.execute(text("SELECT version_num FROM alembic_version")).fetchall()
                            revisions = [record[0] for record in version_rows if record[0]]
                        except Exception as exc:
                            logger.warning("Failed to read alembic_version table: %s", exc)

                    unversioned_but_current = not revisions and _schema_looks_current(inspector)
                    if unversioned_but_current:
                        logger.warning("Existing database has no Alembic revision rows; stamping head to avoid reapplying migrations")
                        command.stamp(alembic_cfg, "head")
                    else:
                        logger.info("Running Alembic migrations to ensure schema is up to date")
                        command.upgrade(alembic_cfg, "head")
                else:
                    logger.info("Empty DB detected - creating baseline schema")
                    Base.metadata.create_all(bind=conn)
                    command.stamp(alembic_cfg, "head")

                # Post-upgrade normalization passes (kept inside the lock to be safe)
                normalized = normalize_team_visibility(conn)
                if normalized:
                    logger.info(f"Normalized {normalized} team record(s) to supported visibility values")

                # Admin user first, then the roles that get assigned to it,
                # then orphaned resources that are moved into its personal team;
                # all on the LOCKED connection
                await bootstrap_admin_user(conn)
                await bootstrap_default_roles(conn)
                await bootstrap_resource_assignments(conn)

                conn.commit()  # Ensure all migration changes are permanently committed

    except Exception as e:
        logger.error(f"Migration/Bootstrap failed: {e}")
        # Re-raise so retry logic or a container restart can handle transient issues
        raise
    finally:
        # Dispose the engine to close all connections in the pool
        engine.dispose()

    logger.info("Database ready")

773 

774 

# Script entry point: run the async bootstrap to completion when executed directly
# (``python3 -m mcpgateway.bootstrap_db`` or ``python3 mcpgateway/bootstrap_db.py``).
if __name__ == "__main__":
    asyncio.run(main())