Coverage for mcpgateway / bootstrap_db.py: 100%

262 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/bootstrap_db.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Madhav Kandukuri

Database bootstrap/upgrade entry-point for ContextForge.
The script:

1. Creates a synchronous SQLAlchemy ``Engine`` from ``settings.database_url``.
2. Loads the *alembic.ini* shipped inside the ``mcpgateway`` package to drive migrations.
3. On an empty database, creates the baseline schema and stamps it as ``head``;
   otherwise applies Alembic migrations (``alembic upgrade head``) to update the schema.
4. Runs post-upgrade normalization tasks and bootstraps admin/roles as configured.
5. Logs a **"Database ready"** message on success.

It is intended to be invoked via ``python3 -m mcpgateway.bootstrap_db`` or
directly with ``python3 mcpgateway/bootstrap_db.py``.

Examples:
    >>> from mcpgateway.bootstrap_db import logging_service, logger
    >>> logging_service is not None
    True
    >>> logger is not None
    True
    >>> hasattr(logger, 'info')
    True
    >>> from mcpgateway.bootstrap_db import Base
    >>> hasattr(Base, 'metadata')
    True
"""

31 

32# Standard 

33import asyncio 

34from contextlib import contextmanager 

35from importlib.resources import files 

36import json 

37import os 

38from pathlib import Path 

39import tempfile 

40from typing import cast 

41 

42# Third-Party 

43from alembic import command 

44from alembic.config import Config 

45from filelock import FileLock 

46from sqlalchemy import create_engine, inspect, text 

47from sqlalchemy.engine import Connection 

48from sqlalchemy.orm import Session 

49 

50# First-Party 

51from mcpgateway.config import settings 

52from mcpgateway.db import A2AAgent, Base, EmailTeam, EmailUser, Gateway, Prompt, Resource, Server, Tool 

53from mcpgateway.services.logging_service import LoggingService 

54 

# Lock file shared by local worker processes; used by the FileLock fallback in
# ``advisory_lock`` so that only one of them runs migrations at a time.
_MIGRATION_LOCK_PATH = os.path.join(tempfile.gettempdir(), "mcpgateway_migration.lock")
# Seconds to wait for the migration lock (5 minutes, to tolerate slow migrations).
# Applies to both the MySQL ``GET_LOCK`` call and the FileLock fallback.
_MIGRATION_LOCK_TIMEOUT = 5 * 60

# Logging is set up before anything else so every function below can use ``logger``.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

62 

63 

def _column_exists(inspector, table_name: str, column_name: str) -> bool:
    """Report whether ``table_name`` currently has a column called ``column_name``.

    This is a best-effort probe: any error raised while inspecting the table
    (for example because the table does not exist) is treated as "no such column".

    Args:
        inspector: SQLAlchemy inspector for the active connection.
        table_name: Table name to inspect.
        column_name: Column name to check.

    Returns:
        True if the column exists, otherwise False.
    """
    try:
        for column in inspector.get_columns(table_name):
            if column["name"] == column_name:
                return True
    except Exception:
        # Deliberately broad: a failed inspection simply means "not found".
        return False
    return False

79 

80 

def _schema_looks_current(inspector) -> bool:
    """Heuristically decide whether an unversioned database already has the current schema.

    The check passes only if every marker column listed below is present; the
    probe stops at the first one that is missing.

    Args:
        inspector: SQLAlchemy inspector for the active connection.

    Returns:
        True when expected columns exist for a recent schema version.
    """
    marker_columns = (
        ("tools", "display_name"),
        ("gateways", "oauth_config"),
        ("prompts", "custom_name"),
        ("sso_providers", "jwks_uri"),
    )
    return all(_column_exists(inspector, table, column) for table, column in marker_columns)

96 

97 

@contextmanager
def _release_db_lock_on_exit(conn: Connection, backend: str, release_sql: str):
    """Run ``release_sql`` on ``conn`` when the guarded body finishes.

    If the body raised, the release is best-effort: a failure to release is
    logged and the body's original exception keeps propagating, so the root
    cause is not replaced by the release error. If the body succeeded, a
    release failure is raised to the caller as before.

    Args:
        conn: Connection that holds the lock.
        backend: Human-readable backend name used in log messages.
        release_sql: SQL statement that releases the lock.

    Yields:
        None
    """
    try:
        yield
    except BaseException:
        logger.info(f"Releasing {backend} advisory lock...")
        try:
            conn.execute(text(release_sql))
        except Exception as release_exc:
            # e.g. the connection/transaction is already unusable because of the body's failure
            logger.warning("Could not release %s advisory lock after failure: %s", backend, release_exc)
        raise
    logger.info(f"Releasing {backend} advisory lock...")
    conn.execute(text(release_sql))


@contextmanager
def advisory_lock(conn: Connection):
    """
    Acquire a distributed advisory lock to serialize migrations across multiple instances.

    Behavior depends on the database backend:
    - Postgres: Uses `pg_advisory_lock` (blocking)
    - MySQL/MariaDB: Uses `GET_LOCK` (blocking with timeout)
    - Anything else (e.g. SQLite): Falls back to a local `FileLock`

    When the guarded body raises, that exception is always the one that
    propagates, even if releasing the database lock fails as well.

    Args:
        conn: Active SQLAlchemy connection

    Yields:
        None

    Raises:
        TimeoutError: If the MySQL lock cannot be acquired within the timeout period
    """
    dialect = conn.dialect.name
    lock_id = "mcpgateway_migration"
    # Postgres advisory locks are keyed by a BIGINT; this is an arbitrary fixed constant
    pg_lock_id = 42424242424242

    if dialect == "postgresql":
        logger.info("Acquiring Postgres advisory lock...")
        conn.execute(text(f"SELECT pg_advisory_lock({pg_lock_id})"))
        with _release_db_lock_on_exit(conn, "Postgres", f"SELECT pg_advisory_unlock({pg_lock_id})"):
            yield

    elif dialect in ["mysql", "mariadb"]:
        logger.info("Acquiring MySQL advisory lock...")
        # GET_LOCK returns 1 if successful, 0 if timed out, NULL on error
        result = conn.execute(text(f"SELECT GET_LOCK('{lock_id}', {_MIGRATION_LOCK_TIMEOUT})")).scalar()
        if result != 1:
            raise TimeoutError(f"Could not acquire MySQL lock '{lock_id}' within {_MIGRATION_LOCK_TIMEOUT}s")
        with _release_db_lock_on_exit(conn, "MySQL", f"SELECT RELEASE_LOCK('{lock_id}')"):
            yield

    else:
        # Fallback for SQLite (single-host/container) or other DBs
        logger.info(f"Using FileLock fallback for {dialect}...")
        file_lock = FileLock(_MIGRATION_LOCK_PATH, timeout=_MIGRATION_LOCK_TIMEOUT)
        with file_lock:
            yield

149 

150 

async def bootstrap_admin_user(conn: Connection) -> None:
    """
    Make sure the platform admin account named in the settings exists.

    Nothing happens when email authentication is disabled or when a user with
    ``settings.platform_admin_email`` is already present. A freshly created
    admin is marked as email-verified and, if both password-change settings
    are enabled (each is treated as enabled when absent), is flagged to change
    the password. Any failure is logged and swallowed so the rest of the
    bootstrap can continue.

    Args:
        conn: Active SQLAlchemy connection
    """
    if not settings.email_auth_enabled:
        logger.info("Email authentication disabled - skipping admin user bootstrap")
        return

    try:
        # Service import is deferred to call time to avoid circular imports
        # First-Party
        from mcpgateway.services.email_auth_service import EmailAuthService  # pylint: disable=import-outside-toplevel

        # The session rides on the connection that holds the migration lock
        with Session(bind=conn) as db:
            auth_service = EmailAuthService(db)

            already_there = await auth_service.get_user_by_email(settings.platform_admin_email)
            if already_there:
                logger.info(f"Admin user {settings.platform_admin_email} already exists - skipping creation")
                return

            logger.info(f"Creating platform admin user: {settings.platform_admin_email}")
            admin_user = await auth_service.create_platform_admin(
                email=settings.platform_admin_email,
                password=settings.platform_admin_password.get_secret_value(),
                full_name=settings.platform_admin_full_name,
            )

            # First-Party
            from mcpgateway.db import utc_now  # pylint: disable=import-outside-toplevel

            # The bootstrap admin counts as verified from the start
            admin_user.email_verified_at = utc_now()

            # Only force a password change when both switches allow it
            if getattr(settings, "password_change_enforcement_enabled", True):
                if getattr(settings, "admin_require_password_change_on_bootstrap", True):
                    admin_user.password_change_required = True  # default password must be replaced
                    try:
                        admin_user.password_changed_at = utc_now()
                    except Exception as exc:
                        logger.debug("Failed to set admin password_changed_at: %s", exc)
            db.commit()

            # Log-only step: nothing is created here (the personal team is expected
            # to have been created together with the user)
            if settings.auto_create_personal_teams:
                logger.info("Personal team automatically created for admin user")

            db.commit()
            logger.info(f"Platform admin user created successfully: {settings.platform_admin_email}")

    except Exception as e:
        # A failed admin bootstrap must not abort the whole bootstrap run
        logger.error(f"Failed to bootstrap admin user: {e}")

213 

214 

def _builtin_role_definitions() -> list[dict]:
    """Return fresh definitions of the built-in system roles, in creation order.

    Returns:
        A new list of role dicts (``name``, ``description``, ``scope``,
        ``permissions``, ``is_system_role``); each call builds independent lists.
    """
    # Permissions that team_admin and developer share, following their team-level entries
    member_permissions = [
        "tools.read",
        "tools.execute",
        "resources.read",
        "prompts.read",
        "llm.read",
        "llm.invoke",
        "a2a.read",
        "gateways.create",
        "servers.create",
        "tools.create",
        "resources.create",
        "prompts.create",
        "a2a.create",
        "gateways.update",
        "servers.update",
        "tools.update",
        "resources.update",
        "prompts.update",
        "a2a.update",
        "gateways.delete",
        "servers.delete",
        "tools.delete",
        "resources.delete",
        "prompts.delete",
        "a2a.delete",
        "a2a.invoke",
        "tokens.create",
        "tokens.read",
        "tokens.update",
        "tokens.revoke",
    ]
    # Permissions granted to both read-only roles (team-scoped and global)
    read_only_permissions = [
        "admin.dashboard",
        "gateways.read",
        "servers.read",
        "teams.read",
        "teams.join",
        "tools.read",
        "resources.read",
        "prompts.read",
        "llm.read",
        "a2a.read",
        "tokens.create",
        "tokens.read",
        "tokens.update",
        "tokens.revoke",
    ]
    return [
        {"name": "platform_admin", "description": "Platform administrator with all permissions", "scope": "global", "permissions": ["*"], "is_system_role": True},  # All permissions
        {
            "name": "team_admin",
            "description": "Team administrator with team management permissions",
            "scope": "team",
            "permissions": [
                "admin.dashboard",
                "gateways.read",
                "servers.read",
                "servers.use",
                "teams.read",
                "teams.update",
                "teams.join",
                "teams.delete",
                "teams.manage_members",
                *member_permissions,
            ],
            "is_system_role": True,
        },
        {
            "name": "developer",
            "description": "Developer with tool and resource access",
            "scope": "team",
            "permissions": [
                "admin.dashboard",
                "gateways.read",
                "servers.read",
                "servers.use",
                "teams.read",
                "teams.join",
                *member_permissions,
            ],
            "is_system_role": True,
        },
        {
            "name": "viewer",
            "description": "Read-only access to resources and admin UI",
            "scope": "team",
            "permissions": list(read_only_permissions),
            "is_system_role": True,
        },
        {
            "name": "platform_viewer",
            "description": "Read-only access to resources and admin UI",
            "scope": "global",
            "permissions": list(read_only_permissions),
            "is_system_role": True,
        },
    ]


def _load_additional_roles() -> list[dict]:
    """Read extra role definitions from ``settings.mcpgateway_bootstrap_roles_in_db_file``.

    A relative path is tried against the current directory first and then
    against the parent of this package's directory. The file must hold a JSON
    array; entries that are not objects or that lack ``name``, ``scope`` or
    ``permissions`` are skipped with a warning. Problems are logged, never raised.

    Returns:
        The valid role dicts found in the file (possibly empty).
    """
    try:
        configured_file = settings.mcpgateway_bootstrap_roles_in_db_file
        roles_path = Path(configured_file)
        if not roles_path.is_absolute() and not roles_path.exists():
            # Not in the CWD: try project root (mcpgateway/bootstrap_db.py -> parent.parent = repo root)
            roles_path = Path(__file__).resolve().parent.parent / configured_file

        if not roles_path.exists():
            logger.warning(f"Additional roles file not found. Searched: CWD/{configured_file}, {roles_path}")
            return []

        with open(roles_path, "r", encoding="utf-8") as f:
            roles_data = json.load(f)

        if not isinstance(roles_data, list):
            logger.error(f"Additional roles file must contain a JSON array, got {type(roles_data).__name__}")
            return []

        required_keys = {"name", "scope", "permissions"}
        valid_roles = []
        for idx, role in enumerate(roles_data):
            if not isinstance(role, dict):
                logger.warning(f"Skipping invalid role at index {idx}: expected dict, got {type(role).__name__}")
                continue
            missing_keys = required_keys - set(role.keys())
            if missing_keys:
                role_name = role.get("name", f"<index {idx}>")
                logger.warning(f"Skipping role '{role_name}': missing required keys {missing_keys}")
                continue
            valid_roles.append(role)

        if valid_roles:
            logger.info(f"Added {len(valid_roles)} additional roles to default roles in bootstrap db")
        elif roles_data:
            logger.warning("No valid roles found in additional roles file")
        return valid_roles
    except Exception as e:
        logger.error(f"Failed to load mcpgateway_bootstrap_roles_in_db_file: {e}")
        return []


async def _ensure_roles(role_service, role_defs: list[dict]) -> list:
    """Create every role in ``role_defs`` that does not exist yet.

    A failure on one role is logged and does not stop the remaining ones.

    Args:
        role_service: Service used to look up and create roles.
        role_defs: Role definitions to process, in order.

    Returns:
        The role objects that already existed or were created.
    """
    resolved_roles = []
    for role_def in role_defs:
        try:
            existing_role = await role_service.get_role_by_name(str(role_def["name"]), str(role_def["scope"]))
            if existing_role:
                logger.info(f"System role {role_def['name']} already exists - skipping")
                resolved_roles.append(existing_role)
                continue

            # description and is_system_role are optional in a definition
            role = await role_service.create_role(
                name=str(role_def["name"]),
                description=str(role_def.get("description", "")),
                scope=str(role_def["scope"]),
                permissions=cast(list[str], role_def["permissions"]),
                created_by=settings.platform_admin_email,
                is_system_role=bool(role_def.get("is_system_role", False)),
            )
            resolved_roles.append(role)
            logger.info(f"Created system role: {role.name}")
        except Exception as e:
            logger.error(f"Failed to create role {role_def['name']}: {e}")
    return resolved_roles


async def _assign_platform_admin_role(role_service, admin_user, known_roles: list) -> None:
    """Give ``admin_user`` the global ``platform_admin`` role unless an active assignment exists.

    Args:
        role_service: Service used to look up roles and manage assignments.
        admin_user: The platform admin user object (its ``email`` is used).
        known_roles: Roles resolved earlier in this run; searched before the database.
    """
    platform_admin_role = next((r for r in known_roles if r.name == "platform_admin"), None)
    if not platform_admin_role:
        # Role not in known_roles (creation may have failed) — look up from DB as fallback
        platform_admin_role = await role_service.get_role_by_name("platform_admin", "global")

    if not platform_admin_role:
        logger.error(f"platform_admin role not found — could not assign to {admin_user.email}. Admin UI routes using allow_admin_bypass=False will return 403.")
        return

    try:
        existing_assignment = await role_service.get_user_role_assignment(user_email=admin_user.email, role_id=platform_admin_role.id, scope="global", scope_id=None)

        if not existing_assignment or not existing_assignment.is_active:
            await role_service.assign_role_to_user(user_email=admin_user.email, role_id=platform_admin_role.id, scope="global", scope_id=None, granted_by=admin_user.email)
            logger.info(f"Assigned platform_admin role to {admin_user.email}")
        else:
            logger.info("Admin user already has platform_admin role")

    except Exception as e:
        logger.error(f"Failed to assign platform_admin role to {admin_user.email}: {e}. Admin UI routes using allow_admin_bypass=False will return 403.")


async def bootstrap_default_roles(conn: Connection) -> None:
    """Bootstrap default system roles and assign them to admin user.

    Creates the built-in RBAC roles (plus any extra roles from the configured
    JSON file when that feature is enabled) and assigns the ``platform_admin``
    role to the platform admin user. Skipped when email authentication is
    disabled or the admin user does not exist. Failures are logged and
    swallowed so the rest of the bootstrap can continue.

    Args:
        conn: Active SQLAlchemy connection
    """
    if not settings.email_auth_enabled:
        logger.info("Email authentication disabled - skipping default roles bootstrap")
        return

    try:
        # First-Party
        from mcpgateway.services.email_auth_service import EmailAuthService  # pylint: disable=import-outside-toplevel
        from mcpgateway.services.role_service import RoleService  # pylint: disable=import-outside-toplevel

        # Use session bound to the locked connection
        with Session(bind=conn) as db:
            role_service = RoleService(db)
            auth_service = EmailAuthService(db)

            admin_user = await auth_service.get_user_by_email(settings.platform_admin_email)
            if not admin_user:
                logger.info("Admin user not found - skipping role assignment")
                return

            default_roles = _builtin_role_definitions()
            if settings.mcpgateway_bootstrap_roles_in_db_enabled:
                default_roles.extend(_load_additional_roles())

            created_roles = await _ensure_roles(role_service, default_roles)
            await _assign_platform_admin_role(role_service, admin_user, created_roles)

            logger.info("Default RBAC roles bootstrap completed successfully")

    except Exception as e:
        # Don't fail the entire bootstrap process if role creation fails
        logger.error(f"Failed to bootstrap default roles: {e}")

480 

481 

def normalize_team_visibility(conn: Connection) -> int:
    """Force every team's visibility into the supported set {private, public}.

    Teams whose visibility is anything else (e.g., 'team') are switched to
    'private'. Errors are logged and reported as zero updates.

    Args:
        conn: Active SQLAlchemy connection

    Returns:
        int: Number of teams updated
    """
    try:
        # The session rides on the connection that holds the migration lock
        with Session(bind=conn) as db:
            unsupported = db.query(EmailTeam).filter(EmailTeam.visibility.notin_(["private", "public"]))
            updated = 0
            for updated, team in enumerate(unsupported.all(), start=1):
                previous = team.visibility
                team.visibility = "private"
                logger.info(f"Normalized team visibility: id={team.id} {previous} -> private")
            # Commit only when something actually changed
            if updated:
                db.commit()
            return updated
    except Exception as e:
        logger.error(f"Failed to normalize team visibility: {e}")
        return 0

510 

511 

async def bootstrap_resource_assignments(conn: Connection) -> None:
    """Hand orphaned resources over to the platform admin's personal team.

    A record counts as orphaned when its ``team_id``, ``owner_email`` or
    ``visibility`` is NULL. Such records are assigned to the admin's personal
    team, owned by the admin and made public. Skipped when email
    authentication is disabled, or when the admin user or their personal team
    cannot be found. Errors are logged, never raised.

    Args:
        conn: Active SQLAlchemy connection
    """
    if not settings.email_auth_enabled:
        logger.info("Email authentication disabled - skipping resource assignment")
        return

    try:
        # The session rides on the connection that holds the migration lock
        with Session(bind=conn) as db:
            admin_user = db.query(EmailUser).filter(EmailUser.email == settings.platform_admin_email, EmailUser.is_admin.is_(True)).first()
            if not admin_user:
                logger.warning("Admin user not found - skipping resource assignment")
                return

            personal_team = admin_user.get_personal_team()
            if not personal_team:
                logger.warning("Admin personal team not found - skipping resource assignment")
                return

            logger.info(f"Assigning orphaned resources to admin team: {personal_team.name}")

            # (label used in log messages, ORM model) for every resource kind to process
            resource_types = [("servers", Server), ("tools", Tool), ("resources", Resource), ("prompts", Prompt), ("gateways", Gateway), ("a2a_agents", A2AAgent)]

            total_assigned = 0
            for resource_name, resource_model in resource_types:
                try:
                    is_orphaned = (resource_model.team_id.is_(None)) | (resource_model.owner_email.is_(None)) | (resource_model.visibility.is_(None))
                    orphans = db.query(resource_model).filter(is_orphaned).all()
                    if not orphans:
                        continue

                    logger.info(f"Assigning {len(orphans)} orphaned {resource_name} to admin team")
                    for record in orphans:
                        record.team_id = personal_team.id
                        record.owner_email = admin_user.email
                        record.visibility = "public"  # Make visible to all users
                        # Tag the origin only on models that have the field and left it empty
                        if hasattr(record, "federation_source") and not record.federation_source:
                            record.federation_source = "mcpgateway-0.7.0-migration"

                    db.commit()
                    total_assigned += len(orphans)

                except Exception as e:
                    # One failing resource kind must not stop the others
                    logger.error(f"Failed to assign {resource_name}: {e}")
                    continue

            if total_assigned > 0:
                logger.info(f"Successfully assigned {total_assigned} orphaned resources to admin team")
            else:
                logger.info("No orphaned resources found - all resources have team assignments")

    except Exception as e:
        logger.error(f"Failed to bootstrap resource assignments: {e}")

577 

578 

async def main() -> None:
    """
    Bring the database schema up to date, run the bootstrap steps, then log readiness.

    On a database without a ``gateways`` table the schema is created with
    `create_all()` and stamped as `head`. An existing database with no recorded
    Alembic revision that already looks current is only stamped; every other
    database gets `alembic upgrade head`. Afterwards team visibility is
    normalized and the admin user, default roles and resource assignments are
    bootstrapped.

    All of this runs under ``advisory_lock`` so that workers starting at the
    same time do not migrate concurrently.

    Args:
        None

    Raises:
        Exception: If migration or bootstrap fails
    """
    engine = create_engine(settings.database_url)
    alembic_cfg = Config(str(files("mcpgateway").joinpath("alembic.ini")))  # path in container
    alembic_cfg.attributes["configure_logger"] = True

    try:
        with engine.connect() as conn:
            # Close any transaction already open on the connection before taking the lock
            conn.commit()

            with advisory_lock(conn):
                logger.info("Acquired migration lock, checking database schema...")

                # Alembic must reuse the connection that holds the lock
                alembic_cfg.attributes["connection"] = conn

                # '%' is doubled so configparser does not treat it as interpolation
                # (e.g., URL-encoded passwords like %40 for '@')
                alembic_cfg.set_main_option("sqlalchemy.url", settings.database_url.replace("%", "%%"))

                inspector = inspect(conn)
                existing_tables = inspector.get_table_names()

                if "gateways" in existing_tables:
                    recorded_revisions: list[str] = []
                    if "alembic_version" in existing_tables:
                        try:
                            version_rows = conn.execute(text("SELECT version_num FROM alembic_version")).fetchall()
                            recorded_revisions = [row[0] for row in version_rows if row[0]]
                        except Exception as exc:
                            logger.warning("Failed to read alembic_version table: %s", exc)

                    if recorded_revisions or not _schema_looks_current(inspector):
                        logger.info("Running Alembic migrations to ensure schema is up to date")
                        command.upgrade(alembic_cfg, "head")
                    else:
                        logger.warning("Existing database has no Alembic revision rows; stamping head to avoid reapplying migrations")
                        command.stamp(alembic_cfg, "head")
                else:
                    logger.info("Empty DB detected - creating baseline schema")
                    # MariaDB/MySQL need metadata adjustments before the tables are created
                    if settings.database_url.startswith(("mariadb", "mysql")):
                        # pylint: disable=import-outside-toplevel
                        # First-Party
                        from mcpgateway.alembic.env import _modify_metadata_for_mariadb, mariadb_naming_convention

                        _modify_metadata_for_mariadb()
                        Base.metadata.naming_convention = mariadb_naming_convention
                        logger.info("Applied MariaDB compatibility modifications")

                    Base.metadata.create_all(bind=conn)
                    command.stamp(alembic_cfg, "head")

                # Post-upgrade normalization passes (inside lock to be safe)
                normalized = normalize_team_visibility(conn)
                if normalized:
                    logger.info(f"Normalized {normalized} team record(s) to supported visibility values")

                # Each bootstrap step reuses the locked connection, in dependency order:
                # admin user, then roles for that user, then orphaned resources
                await bootstrap_admin_user(conn)
                await bootstrap_default_roles(conn)
                await bootstrap_resource_assignments(conn)

                conn.commit()  # Ensure all migration changes are permanently committed

    except Exception as e:
        logger.error(f"Migration/Bootstrap failed: {e}")
        # Re-raised so retry logic or a container restart can handle transient issues
        raise
    finally:
        # Close every pooled connection, whether or not the bootstrap succeeded
        engine.dispose()

    logger.info("Database ready")


if __name__ == "__main__":
    asyncio.run(main())