Coverage for mcpgateway / bootstrap_db.py: 100%

259 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 07:10 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/bootstrap_db.py 

3Copyright 2025 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Madhav Kandukuri 

6 

7Database bootstrap/upgrade entry-point for MCP Gateway. 

8The script: 

9 

1. Creates a synchronous SQLAlchemy ``Engine`` from ``settings.database_url``.
2. Loads the ``alembic.ini`` bundled inside the ``mcpgateway`` package to drive migrations.
3. While holding a migration lock, creates the baseline schema and stamps ``head`` on an empty database, stamps ``head`` on an unversioned database whose schema already looks current, and otherwise runs ``alembic upgrade head``.
4. Runs post-upgrade normalization tasks and bootstraps the admin user, default roles, and orphaned-resource assignments as configured.
5. Logs a **"Database ready"** message on success.

15 

16It is intended to be invoked via ``python3 -m mcpgateway.bootstrap_db`` or 

17directly with ``python3 mcpgateway/bootstrap_db.py``. 

18 

19Examples: 

20 >>> from mcpgateway.bootstrap_db import logging_service, logger 

21 >>> logging_service is not None 

22 True 

23 >>> logger is not None 

24 True 

25 >>> hasattr(logger, 'info') 

26 True 

27 >>> from mcpgateway.bootstrap_db import Base 

28 >>> hasattr(Base, 'metadata') 

29 True 

30""" 

31 

32# Standard 

33import asyncio 

34from contextlib import contextmanager 

35from importlib.resources import files 

36import json 

37import os 

38from pathlib import Path 

39import tempfile 

40from typing import cast 

41 

42# Third-Party 

43from alembic import command 

44from alembic.config import Config 

45from filelock import FileLock 

46from sqlalchemy import create_engine, inspect, text 

47from sqlalchemy.engine import Connection 

48from sqlalchemy.orm import Session 

49 

50# First-Party 

51from mcpgateway.config import settings 

52from mcpgateway.db import A2AAgent, Base, EmailTeam, EmailUser, Gateway, Prompt, Resource, Server, Tool 

53from mcpgateway.services.logging_service import LoggingService 

54 

# Migration lock settings shared by all workers started on this host.
# advisory_lock() uses this path for its FileLock fallback when the database
# backend has no native advisory lock (e.g. SQLite).
_MIGRATION_LOCK_PATH = os.path.join(tempfile.gettempdir(), "mcpgateway_migration.lock")
# How long (in seconds) to wait for the migration lock: five minutes, because
# migrations run by another worker can be slow.
_MIGRATION_LOCK_TIMEOUT = 5 * 60

# Create the logging service before anything else so the module logger is ready
# for every function below.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)

62 

63 

def _column_exists(inspector, table_name: str, column_name: str) -> bool:
    """Report whether ``table_name`` has a column named ``column_name``.

    Any error raised while reflecting the table (for example because the table
    does not exist) is treated as "column not present".

    Args:
        inspector: SQLAlchemy inspector for the active connection.
        table_name: Table name to inspect.
        column_name: Column name to look for.

    Returns:
        True if a matching column is found, otherwise False.
    """
    try:
        for column in inspector.get_columns(table_name):
            if column["name"] == column_name:
                return True
        return False
    except Exception:
        return False

79 

80 

def _schema_looks_current(inspector) -> bool:
    """Heuristically decide whether an unversioned database already has the current schema.

    The check passes only if every marker column below exists. The markers are
    columns that only a recent schema has. Evaluation stops at the first missing
    marker.

    Args:
        inspector: SQLAlchemy inspector for the active connection.

    Returns:
        True when all marker columns are present, otherwise False.
    """
    marker_columns = (
        ("tools", "display_name"),
        ("gateways", "oauth_config"),
        ("prompts", "custom_name"),
    )
    return all(_column_exists(inspector, table, column) for table, column in marker_columns)

91 

92 

@contextmanager
def advisory_lock(conn: Connection):
    """
    Acquire a lock that serializes migrations across workers and instances.

    Behavior depends on ``conn.dialect.name``:
    - ``postgresql``: session-level ``pg_advisory_lock`` (blocks until acquired; no timeout)
    - ``mysql`` / ``mariadb``: ``GET_LOCK`` waiting up to ``_MIGRATION_LOCK_TIMEOUT`` seconds
    - anything else (e.g. SQLite): a local ``FileLock`` at ``_MIGRATION_LOCK_PATH`` with the same timeout

    Releasing a database lock is best-effort. If the release statement itself fails,
    the failure is logged rather than raised. This stops it replacing an exception
    that is already propagating from the body. For example, a failed migration
    leaves a PostgreSQL transaction aborted, so any further statement, including the
    unlock, errors out. Both database locks are session-level, so they are also
    released when the connection is closed.

    Args:
        conn: Active SQLAlchemy connection

    Yields:
        None

    Raises:
        TimeoutError: If the MySQL/MariaDB lock cannot be acquired within the timeout
            (``GET_LOCK`` returned 0 or NULL). The FileLock fallback raises filelock's
            ``Timeout`` when its timeout expires.
    """
    dialect = conn.dialect.name
    lock_id = "mcpgateway_migration"
    # Postgres advisory locks take a BIGINT key; this is a fixed, arbitrary constant
    # shared by every instance (it is not derived from lock_id).
    pg_lock_id = 42424242424242

    def _release(statement: str, label: str) -> None:
        """Run a lock-release statement without letting its failure mask another error."""
        logger.info(f"Releasing {label} advisory lock...")
        try:
            conn.execute(text(statement))
        except Exception as exc:
            logger.warning(f"Failed to release {label} advisory lock ({exc}); it will be released when the database session closes")

    if dialect == "postgresql":
        logger.info("Acquiring Postgres advisory lock...")
        conn.execute(text(f"SELECT pg_advisory_lock({pg_lock_id})"))
        try:
            yield
        finally:
            _release(f"SELECT pg_advisory_unlock({pg_lock_id})", "Postgres")

    elif dialect in ("mysql", "mariadb"):
        logger.info("Acquiring MySQL advisory lock...")
        # GET_LOCK returns 1 if successful, 0 if timed out, NULL on error
        result = conn.execute(text(f"SELECT GET_LOCK('{lock_id}', {_MIGRATION_LOCK_TIMEOUT})")).scalar()
        if result != 1:
            raise TimeoutError(f"Could not acquire MySQL lock '{lock_id}' within {_MIGRATION_LOCK_TIMEOUT}s")
        try:
            yield
        finally:
            _release(f"SELECT RELEASE_LOCK('{lock_id}')", "MySQL")

    else:
        # Fallback for SQLite (single-host/container) or other DBs
        logger.info(f"Using FileLock fallback for {dialect}...")
        file_lock = FileLock(_MIGRATION_LOCK_PATH, timeout=_MIGRATION_LOCK_TIMEOUT)
        with file_lock:
            yield

144 

145 

async def bootstrap_admin_user(conn: Connection) -> None:
    """
    Bootstrap the platform admin user from settings.

    Does nothing when ``settings.email_auth_enabled`` is false. It also does nothing
    when a user with ``settings.platform_admin_email`` already exists. Otherwise it:

    - creates the user via ``EmailAuthService.create_platform_admin``;
    - marks the email as verified;
    - flags the account for a password change on first login, but only when both
      ``password_change_enforcement_enabled`` and
      ``admin_require_password_change_on_bootstrap`` are enabled (each is treated
      as True if the setting is missing).

    The personal team, when enabled, is expected to be created by
    ``create_platform_admin`` itself. Any failure is logged and swallowed so the
    rest of the bootstrap can continue.

    Args:
        conn: Active SQLAlchemy connection
    """
    if not settings.email_auth_enabled:
        logger.info("Email authentication disabled - skipping admin user bootstrap")
        return

    try:
        # Import services here to avoid circular imports
        # First-Party
        from mcpgateway.db import utc_now  # pylint: disable=import-outside-toplevel
        from mcpgateway.services.email_auth_service import EmailAuthService  # pylint: disable=import-outside-toplevel

        # Use session bound to the locked connection
        with Session(bind=conn) as db:
            auth_service = EmailAuthService(db)

            existing_user = await auth_service.get_user_by_email(settings.platform_admin_email)
            if existing_user:
                logger.info(f"Admin user {settings.platform_admin_email} already exists - skipping creation")
                return

            logger.info(f"Creating platform admin user: {settings.platform_admin_email}")
            admin_user = await auth_service.create_platform_admin(
                email=settings.platform_admin_email,
                password=settings.platform_admin_password.get_secret_value(),
                full_name=settings.platform_admin_full_name,
            )

            admin_user.email_verified_at = utc_now()
            # Respect configuration: only require password change on bootstrap when enabled
            if getattr(settings, "password_change_enforcement_enabled", True) and getattr(settings, "admin_require_password_change_on_bootstrap", True):
                admin_user.password_change_required = True  # Force admin to change default password
                try:
                    admin_user.password_changed_at = utc_now()
                except Exception as exc:
                    logger.debug("Failed to set admin password_changed_at: %s", exc)

            # One commit after every attribute change persists the verification and
            # password-change flags regardless of which branch ran above.
            db.commit()

            # Personal team is automatically created during user creation if enabled
            if settings.auto_create_personal_teams:
                logger.info("Personal team automatically created for admin user")

            logger.info(f"Platform admin user created successfully: {settings.platform_admin_email}")

    except Exception as e:
        # Don't fail the entire bootstrap process if admin user creation fails
        logger.error(f"Failed to bootstrap admin user: {e}")

208 

209 

def _builtin_default_roles() -> list[dict]:
    """Return a fresh list of the built-in system role definitions (safe for the caller to extend)."""
    # Create/update/delete permissions (plus a2a.invoke) shared by team_admin and developer.
    write_permissions = [
        "gateways.create",
        "servers.create",
        "tools.create",
        "resources.create",
        "prompts.create",
        "a2a.create",
        "gateways.update",
        "servers.update",
        "tools.update",
        "resources.update",
        "prompts.update",
        "a2a.update",
        "gateways.delete",
        "servers.delete",
        "tools.delete",
        "resources.delete",
        "prompts.delete",
        "a2a.delete",
        "a2a.invoke",
    ]
    read_only_permissions = ["admin.dashboard", "gateways.read", "servers.read", "teams.join", "tools.read", "resources.read", "prompts.read", "a2a.read"]

    return [
        {"name": "platform_admin", "description": "Platform administrator with all permissions", "scope": "global", "permissions": ["*"], "is_system_role": True},  # All permissions
        {
            "name": "team_admin",
            "description": "Team administrator with team management permissions",
            "scope": "team",
            "permissions": [
                "admin.dashboard",
                "gateways.read",
                "servers.read",
                "teams.read",
                "teams.update",
                "teams.join",
                "teams.delete",
                "teams.manage_members",
                "tools.read",
                "tools.execute",
                "resources.read",
                "prompts.read",
                "a2a.read",
                *write_permissions,
            ],
            "is_system_role": True,
        },
        {
            "name": "developer",
            "description": "Developer with tool and resource access",
            "scope": "team",
            "permissions": [
                "admin.dashboard",
                "gateways.read",
                "servers.read",
                "teams.join",
                "tools.read",
                "tools.execute",
                "resources.read",
                "prompts.read",
                "a2a.read",
                *write_permissions,
            ],
            "is_system_role": True,
        },
        {
            "name": "viewer",
            "description": "Read-only access to resources and admin UI",
            "scope": "team",
            "permissions": list(read_only_permissions),
            "is_system_role": True,
        },
        {
            "name": "platform_viewer",
            "description": "Read-only access to resources and admin UI",
            "scope": "global",
            "permissions": list(read_only_permissions),
            "is_system_role": True,
        },
    ]


def _resolve_additional_roles_path(configured) -> Path:
    """Locate the additional-roles file: the configured path, else relative to the project root."""
    path = Path(configured)
    # A relative path is tried against the current directory first, then against
    # the project root (mcpgateway/bootstrap_db.py -> parent.parent = repo root).
    if not path.is_absolute() and not path.exists():
        path = Path(__file__).resolve().parent.parent / configured
    return path


def _validate_additional_roles(entries: list) -> list[dict]:
    """Return the entries that are dicts with ``name``, ``scope`` and a list-of-strings ``permissions``."""
    required_keys = {"name", "scope", "permissions"}
    valid_roles = []
    for idx, role in enumerate(entries):
        if not isinstance(role, dict):
            logger.warning(f"Skipping invalid role at index {idx}: expected dict, got {type(role).__name__}")
            continue
        missing_keys = required_keys - set(role.keys())
        if missing_keys:
            role_name = role.get("name", f"<index {idx}>")
            logger.warning(f"Skipping role '{role_name}': missing required keys {missing_keys}")
            continue
        permissions = role["permissions"]
        # A bare string (or other non-list) would otherwise be handed to create_role as the permission list.
        if not isinstance(permissions, list) or not all(isinstance(p, str) for p in permissions):
            logger.warning(f"Skipping role '{role['name']}': 'permissions' must be a list of strings")
            continue
        valid_roles.append(role)
    return valid_roles


def _load_additional_roles() -> list[dict]:
    """Load validated extra role definitions from ``settings.mcpgateway_bootstrap_roles_in_db_file``.

    Never raises: a missing file, malformed JSON, or invalid entries are logged and
    yield fewer (possibly zero) roles.
    """
    try:
        roles_path = _resolve_additional_roles_path(settings.mcpgateway_bootstrap_roles_in_db_file)
        if not roles_path.exists():
            logger.warning(f"Additional roles file not found. Searched: CWD/{settings.mcpgateway_bootstrap_roles_in_db_file}, {roles_path}")
            return []

        with open(roles_path, "r", encoding="utf-8") as f:
            roles_data = json.load(f)

        if not isinstance(roles_data, list):
            logger.error(f"Additional roles file must contain a JSON array, got {type(roles_data).__name__}")
            return []

        valid_roles = _validate_additional_roles(roles_data)
        if valid_roles:
            logger.info(f"Added {len(valid_roles)} additional roles to default roles in bootstrap db")
        elif roles_data:
            logger.warning("No valid roles found in additional roles file")
        return valid_roles
    except Exception as e:
        logger.error(f"Failed to load mcpgateway_bootstrap_roles_in_db_file: {e}")
        return []


async def bootstrap_default_roles(conn: Connection) -> None:
    """Bootstrap default system roles and assign them to admin user.

    Does nothing if email authentication is disabled or the admin user does not
    exist. Otherwise it:

    - creates each built-in system role that is not already present (looked up by
      name and scope);
    - also creates any valid roles from the optional JSON file when
      ``settings.mcpgateway_bootstrap_roles_in_db_enabled`` is set;
    - grants the global ``platform_admin`` role to the admin user unless an active
      assignment already exists.

    Individual role failures are logged and skipped. Any other failure is logged
    and swallowed so the rest of the bootstrap can continue.

    Args:
        conn: Active SQLAlchemy connection
    """
    if not settings.email_auth_enabled:
        logger.info("Email authentication disabled - skipping default roles bootstrap")
        return

    try:
        # First-Party
        from mcpgateway.services.email_auth_service import EmailAuthService  # pylint: disable=import-outside-toplevel
        from mcpgateway.services.role_service import RoleService  # pylint: disable=import-outside-toplevel

        # Use session bound to the locked connection
        with Session(bind=conn) as db:
            role_service = RoleService(db)
            auth_service = EmailAuthService(db)

            admin_user = await auth_service.get_user_by_email(settings.platform_admin_email)
            if not admin_user:
                logger.info("Admin user not found - skipping role assignment")
                return

            default_roles = _builtin_default_roles()
            if settings.mcpgateway_bootstrap_roles_in_db_enabled:
                default_roles.extend(_load_additional_roles())

            # Existing roles are collected too, so platform_admin can be found below either way.
            created_roles = []
            for role_def in default_roles:
                try:
                    existing_role = await role_service.get_role_by_name(str(role_def["name"]), str(role_def["scope"]))
                    if existing_role:
                        logger.info(f"System role {role_def['name']} already exists - skipping")
                        created_roles.append(existing_role)
                        continue

                    # description and is_system_role are optional for roles loaded from file
                    role = await role_service.create_role(
                        name=str(role_def["name"]),
                        description=str(role_def.get("description", "")),
                        scope=str(role_def["scope"]),
                        permissions=cast(list[str], role_def["permissions"]),
                        created_by=settings.platform_admin_email,
                        is_system_role=bool(role_def.get("is_system_role", False)),
                    )
                    created_roles.append(role)
                    logger.info(f"Created system role: {role.name}")

                except Exception as e:
                    logger.error(f"Failed to create role {role_def['name']}: {e}")
                    continue

            # Assign platform_admin role to admin user
            platform_admin_role = next((r for r in created_roles if r.name == "platform_admin"), None)
            if platform_admin_role:
                try:
                    existing_assignment = await role_service.get_user_role_assignment(user_email=admin_user.email, role_id=platform_admin_role.id, scope="global", scope_id=None)

                    if not existing_assignment or not existing_assignment.is_active:
                        await role_service.assign_role_to_user(user_email=admin_user.email, role_id=platform_admin_role.id, scope="global", scope_id=None, granted_by=admin_user.email)
                        logger.info(f"Assigned platform_admin role to {admin_user.email}")
                    else:
                        logger.info("Admin user already has platform_admin role")

                except Exception as e:
                    logger.error(f"Failed to assign platform_admin role: {e}")

            logger.info("Default RBAC roles bootstrap completed successfully")

    except Exception as e:
        # Don't fail the entire bootstrap process if role creation fails
        logger.error(f"Failed to bootstrap default roles: {e}")

425 

426 

def normalize_team_visibility(conn: Connection) -> int:
    """Normalize team visibility values to the supported set {private, public}.

    Any team whose visibility is NULL or anything other than ``private``/``public``
    (e.g. ``'team'``) is set to ``private``. NULL needs its own condition because a
    SQL ``NOT IN`` comparison never matches NULL.

    Args:
        conn: Active SQLAlchemy connection

    Returns:
        int: Number of teams updated (0 when nothing changed or an error occurred)
    """
    try:
        # Use session bound to the locked connection
        with Session(bind=conn) as db:
            supported = ["private", "public"]
            invalid_teams = db.query(EmailTeam).filter(EmailTeam.visibility.is_(None) | EmailTeam.visibility.not_in(supported)).all()
            for team in invalid_teams:
                old = team.visibility
                team.visibility = "private"
                logger.info(f"Normalized team visibility: id={team.id} {old} -> private")
            if invalid_teams:
                db.commit()
            return len(invalid_teams)
    except Exception as e:
        logger.error(f"Failed to normalize team visibility: {e}")
        return 0

455 

456 

async def bootstrap_resource_assignments(conn: Connection) -> None:
    """Hand orphaned resources over to the platform admin's personal team.

    A resource counts as orphaned when its ``team_id``, ``owner_email`` or
    ``visibility`` is NULL. Such records come from pre-multitenancy versions. Each
    one is assigned to the admin's personal team, owned by the admin, and made
    ``public`` so it shows up in the team-based UI. An empty ``federation_source``
    is tagged with a migration marker.

    Resource types are processed independently: a failure for one type is logged
    and the next type is still attempted. Any other failure is logged and swallowed.

    Args:
        conn: Active SQLAlchemy connection
    """
    if not settings.email_auth_enabled:
        logger.info("Email authentication disabled - skipping resource assignment")
        return

    try:
        # Work through a session bound to the connection that holds the migration lock.
        with Session(bind=conn) as db:
            admin = db.query(EmailUser).filter(EmailUser.email == settings.platform_admin_email, EmailUser.is_admin.is_(True)).first()
            if not admin:
                logger.warning("Admin user not found - skipping resource assignment")
                return

            team = admin.get_personal_team()
            if not team:
                logger.warning("Admin personal team not found - skipping resource assignment")
                return

            logger.info(f"Assigning orphaned resources to admin team: {team.name}")

            models_to_scan = (
                ("servers", Server),
                ("tools", Tool),
                ("resources", Resource),
                ("prompts", Prompt),
                ("gateways", Gateway),
                ("a2a_agents", A2AAgent),
            )

            assigned_count = 0
            for label, model in models_to_scan:
                try:
                    is_orphan = (model.team_id.is_(None)) | (model.owner_email.is_(None)) | (model.visibility.is_(None))
                    orphans = db.query(model).filter(is_orphan).all()
                    if not orphans:
                        continue

                    logger.info(f"Assigning {len(orphans)} orphaned {label} to admin team")
                    for item in orphans:
                        item.team_id = team.id
                        item.owner_email = admin.email
                        item.visibility = "public"  # Make visible to all users
                        if hasattr(item, "federation_source") and not item.federation_source:
                            item.federation_source = "mcpgateway-0.7.0-migration"

                    db.commit()
                    assigned_count += len(orphans)
                except Exception as e:
                    logger.error(f"Failed to assign {label}: {e}")
                    continue

            if assigned_count:
                logger.info(f"Successfully assigned {assigned_count} orphaned resources to admin team")
            else:
                logger.info("No orphaned resources found - all resources have team assignments")

    except Exception as e:
        logger.error(f"Failed to bootstrap resource assignments: {e}")

522 

523 

def _apply_schema_migrations(conn: Connection, cfg: Config) -> None:
    """Create, stamp, or upgrade the schema on ``conn``; the caller must hold the migration lock.

    Args:
        conn: Connection holding the migration lock; Alembic is pointed at it via ``cfg``.
        cfg: Alembic configuration used for ``stamp`` / ``upgrade``.
    """
    # Pass the LOCKED connection to Alembic config
    cfg.attributes["connection"] = conn

    # Escape '%' characters in URL to avoid configparser interpolation errors
    # (e.g., URL-encoded passwords like %40 for '@')
    escaped_url = settings.database_url.replace("%", "%%")
    cfg.set_main_option("sqlalchemy.url", escaped_url)

    insp = inspect(conn)
    table_names = insp.get_table_names()

    if "gateways" not in table_names:
        logger.info("Empty DB detected - creating baseline schema")
        # Apply MariaDB compatibility fixes if needed
        if settings.database_url.startswith(("mariadb", "mysql")):
            # pylint: disable=import-outside-toplevel
            # First-Party
            from mcpgateway.alembic.env import _modify_metadata_for_mariadb, mariadb_naming_convention

            _modify_metadata_for_mariadb()
            Base.metadata.naming_convention = mariadb_naming_convention
            logger.info("Applied MariaDB compatibility modifications")

        Base.metadata.create_all(bind=conn)
        command.stamp(cfg, "head")
        return

    versions: list[str] = []
    if "alembic_version" in table_names:
        try:
            rows = conn.execute(text("SELECT version_num FROM alembic_version")).fetchall()
            versions = [row[0] for row in rows if row[0]]
        except Exception as exc:
            logger.warning("Failed to read alembic_version table: %s", exc)

    # An unversioned database that already has the recent marker columns is stamped
    # rather than migrated, so old migrations are not re-applied on top of it.
    if not versions and _schema_looks_current(insp):
        logger.warning("Existing database has no Alembic revision rows; stamping head to avoid reapplying migrations")
        command.stamp(cfg, "head")
    else:
        logger.info("Running Alembic migrations to ensure schema is up to date")
        command.upgrade(cfg, "head")


async def main() -> None:
    """
    Bootstrap or upgrade the database schema, then log readiness.

    Everything runs on one connection while holding ``advisory_lock``. The lock is
    a distributed advisory lock on PostgreSQL/MySQL and a file lock otherwise, so
    workers starting at the same time do not race. Under the lock:

    * schema setup:
      - an empty database (no ``gateways`` table) gets ``Base.metadata.create_all()``
        followed by ``alembic stamp head``;
      - an existing database with no Alembic revision rows whose schema already looks
        current is stamped ``head``;
      - any other database runs ``alembic upgrade head``, leaving application data intact;
    * team visibility is normalized, and the admin user, default RBAC roles and
      orphaned-resource assignments are bootstrapped;
    * everything is committed before the lock is released.

    Raises:
        Exception: Re-raised after logging if configuration, migration or bootstrap fails.
    """
    engine = create_engine(settings.database_url)
    try:
        # Config loading is inside the try so its failures are logged and the engine is still disposed.
        ini_path = files("mcpgateway").joinpath("alembic.ini")
        cfg = Config(str(ini_path))
        cfg.attributes["configure_logger"] = True

        with engine.connect() as conn:
            # Commit any open transaction on the connection before locking (though it should be fresh)
            conn.commit()

            with advisory_lock(conn):
                logger.info("Acquired migration lock, checking database schema...")
                _apply_schema_migrations(conn, cfg)

                # Post-upgrade normalization passes (inside lock to be safe)
                updated = normalize_team_visibility(conn)
                if updated:
                    logger.info(f"Normalized {updated} team record(s) to supported visibility values")

                # Bootstrap admin user after database is ready, using the LOCKED connection
                await bootstrap_admin_user(conn)

                # Bootstrap default RBAC roles after admin user is created
                await bootstrap_default_roles(conn)

                # Assign orphaned resources to admin personal team after all setup is complete
                await bootstrap_resource_assignments(conn)

                # Commit while still holding the lock so the next lock holder sees the
                # finished schema and bootstrap data.
                conn.commit()

    except Exception as e:
        logger.error(f"Migration/Bootstrap failed: {e}")
        # Allow retry logic or container restart to handle transient issues
        raise
    finally:
        # Dispose the engine to close all connections in the pool
        engine.dispose()

    logger.info("Database ready")


if __name__ == "__main__":
    asyncio.run(main())