Coverage for mcpgateway / bootstrap_db.py: 100%
262 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/bootstrap_db.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Madhav Kandukuri
7Database bootstrap/upgrade entry-point for ContextForge.
8The script:
101. Creates a synchronous SQLAlchemy ``Engine`` from ``settings.database_url``.
112. Loads *alembic.ini* from the installed ``mcpgateway`` package (via ``importlib.resources``) to drive migrations.
123. Applies Alembic migrations (``alembic upgrade head``) to create or update the schema.
134. Runs post-upgrade normalization tasks and bootstraps admin/roles as configured.
145. Logs a **"Database ready"** message on success.
16It is intended to be invoked via ``python3 -m mcpgateway.bootstrap_db`` or
17directly with ``python3 mcpgateway/bootstrap_db.py``.
19Examples:
20 >>> from mcpgateway.bootstrap_db import logging_service, logger
21 >>> logging_service is not None
22 True
23 >>> logger is not None
24 True
25 >>> hasattr(logger, 'info')
26 True
27 >>> from mcpgateway.bootstrap_db import Base
28 >>> hasattr(Base, 'metadata')
29 True
30"""
32# Standard
33import asyncio
34from contextlib import contextmanager
35from importlib.resources import files
36import json
37import os
38from pathlib import Path
39import tempfile
40from typing import cast
42# Third-Party
43from alembic import command
44from alembic.config import Config
45from filelock import FileLock
46from sqlalchemy import create_engine, inspect, text
47from sqlalchemy.engine import Connection
48from sqlalchemy.orm import Session
50# First-Party
51from mcpgateway.config import settings
52from mcpgateway.db import A2AAgent, Base, EmailTeam, EmailUser, Gateway, Prompt, Resource, Server, Tool
53from mcpgateway.services.logging_service import LoggingService
# Cross-worker migration serialization: this file lock is only used for
# dialects without server-side advisory locks (e.g. SQLite).
_MIGRATION_LOCK_PATH = os.path.join(tempfile.gettempdir(), "mcpgateway_migration.lock")
# Maximum seconds to wait for the migration lock (generous for slow migrations).
_MIGRATION_LOCK_TIMEOUT = 5 * 60

# Logging must be configured before any other module-level work logs anything.
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)
64def _column_exists(inspector, table_name: str, column_name: str) -> bool:
65 """Check whether a table has a specific column.
67 Args:
68 inspector: SQLAlchemy inspector for the active connection.
69 table_name: Table name to inspect.
70 column_name: Column name to check.
72 Returns:
73 True if the column exists, otherwise False.
74 """
75 try:
76 return any(col["name"] == column_name for col in inspector.get_columns(table_name))
77 except Exception:
78 return False
def _schema_looks_current(inspector) -> bool:
    """Heuristically decide whether an unversioned database already has a recent schema.

    Every (table, column) pair below must be present. They are checked in order and
    the check stops at the first missing one.

    Args:
        inspector: SQLAlchemy inspector for the active connection.

    Returns:
        True when all expected columns exist.
    """
    expected_columns = (
        ("tools", "display_name"),
        ("gateways", "oauth_config"),
        ("prompts", "custom_name"),
        ("sso_providers", "jwks_uri"),
    )
    return all(_column_exists(inspector, table, column) for table, column in expected_columns)
def _release_advisory_lock(conn: Connection, release_sql: str, label: str) -> None:
    """Release a server-side advisory lock without masking an in-flight error.

    If the locked section failed, the connection may refuse further statements
    (PostgreSQL rejects every statement inside an aborted transaction). Raising
    from the caller's ``finally`` clause would then replace the real migration
    error with an unrelated one. PostgreSQL session-level advisory locks and MySQL
    ``GET_LOCK`` locks are both released by the server when the database session
    ends, so logging the failure is sufficient.

    Args:
        conn: Connection that acquired the lock.
        release_sql: SQL statement that releases the lock.
        label: Backend name used in log messages ("Postgres" / "MySQL").
    """
    logger.info(f"Releasing {label} advisory lock...")
    try:
        conn.execute(text(release_sql))
    except Exception as exc:
        logger.warning(f"Failed to release {label} advisory lock; it will be freed when the database session closes: {exc}")


@contextmanager
def advisory_lock(conn: Connection):
    """
    Acquire a distributed advisory lock to serialize migrations across multiple instances.

    Behavior depends on the database backend:
    - Postgres: Uses `pg_advisory_lock` (blocks with no timeout)
    - MySQL/MariaDB: Uses `GET_LOCK` (blocks up to `_MIGRATION_LOCK_TIMEOUT` seconds)
    - SQLite and any other dialect: Fallback to a local `FileLock` at `_MIGRATION_LOCK_PATH`

    Lock release failures are logged rather than raised so they cannot hide an
    exception raised inside the locked section.

    Args:
        conn: Active SQLAlchemy connection

    Yields:
        None

    Raises:
        TimeoutError: If the MySQL lock cannot be acquired within the timeout period
            (the file-lock fallback raises filelock's ``Timeout`` instead)
    """
    dialect = conn.dialect.name
    lock_id = "mcpgateway_migration"
    # Postgres advisory locks are keyed by a BIGINT; this is a fixed, arbitrary key.
    pg_lock_id = 42424242424242

    if dialect == "postgresql":
        logger.info("Acquiring Postgres advisory lock...")
        conn.execute(text(f"SELECT pg_advisory_lock({pg_lock_id})"))
        try:
            yield
        finally:
            _release_advisory_lock(conn, f"SELECT pg_advisory_unlock({pg_lock_id})", "Postgres")

    elif dialect in ["mysql", "mariadb"]:
        logger.info("Acquiring MySQL advisory lock...")
        # GET_LOCK returns 1 if successful, 0 if timed out, NULL on error
        result = conn.execute(text(f"SELECT GET_LOCK('{lock_id}', {_MIGRATION_LOCK_TIMEOUT})")).scalar()
        if result != 1:
            raise TimeoutError(f"Could not acquire MySQL lock '{lock_id}' within {_MIGRATION_LOCK_TIMEOUT}s")
        try:
            yield
        finally:
            _release_advisory_lock(conn, f"SELECT RELEASE_LOCK('{lock_id}')", "MySQL")

    else:
        # Fallback for SQLite (single-host/container) or other DBs
        logger.info(f"Using FileLock fallback for {dialect}...")
        file_lock = FileLock(_MIGRATION_LOCK_PATH, timeout=_MIGRATION_LOCK_TIMEOUT)
        with file_lock:
            yield
async def bootstrap_admin_user(conn: Connection) -> None:
    """
    Create the platform admin account configured in settings, if it is missing.

    This is a no-op when email authentication is disabled or when the admin user
    already exists. A newly created admin is marked email-verified. When both
    password-change settings allow it, the admin is also flagged to change the
    password on first login. Errors are logged and swallowed so that the rest of
    the database bootstrap still runs.

    Args:
        conn: Active SQLAlchemy connection
    """
    if not settings.email_auth_enabled:
        logger.info("Email authentication disabled - skipping admin user bootstrap")
        return

    try:
        # Deferred import avoids a circular import at module load time
        # First-Party
        from mcpgateway.services.email_auth_service import EmailAuthService  # pylint: disable=import-outside-toplevel

        # The session shares the connection that holds the migration lock
        with Session(bind=conn) as db:
            service = EmailAuthService(db)

            if await service.get_user_by_email(settings.platform_admin_email):
                logger.info(f"Admin user {settings.platform_admin_email} already exists - skipping creation")
                return

            logger.info(f"Creating platform admin user: {settings.platform_admin_email}")
            new_admin = await service.create_platform_admin(
                email=settings.platform_admin_email,
                password=settings.platform_admin_password.get_secret_value(),
                full_name=settings.platform_admin_full_name,
            )

            # First-Party
            from mcpgateway.db import utc_now  # pylint: disable=import-outside-toplevel

            new_admin.email_verified_at = utc_now()

            force_password_change = getattr(settings, "password_change_enforcement_enabled", True) and getattr(settings, "admin_require_password_change_on_bootstrap", True)
            if force_password_change:
                new_admin.password_change_required = True
                try:
                    new_admin.password_changed_at = utc_now()
                except Exception as exc:
                    logger.debug("Failed to set admin password_changed_at: %s", exc)
            db.commit()

            # The personal team itself is created by create_platform_admin when enabled
            if settings.auto_create_personal_teams:
                logger.info("Personal team automatically created for admin user")

            db.commit()
            logger.info(f"Platform admin user created successfully: {settings.platform_admin_email}")

    except Exception as e:
        # Best-effort: an admin bootstrap failure must not abort database startup
        logger.error(f"Failed to bootstrap admin user: {e}")
def _builtin_default_roles() -> list[dict[str, object]]:
    """Return a fresh list of the built-in system role definitions.

    A new list is built on every call because the caller may extend it with roles
    loaded from the optional JSON file.

    Returns:
        list[dict[str, object]]: Role definitions with ``name``, ``description``,
        ``scope``, ``permissions`` and ``is_system_role`` keys.
    """
    return [
        {"name": "platform_admin", "description": "Platform administrator with all permissions", "scope": "global", "permissions": ["*"], "is_system_role": True},  # All permissions
        {
            "name": "team_admin",
            "description": "Team administrator with team management permissions",
            "scope": "team",
            "permissions": [
                "admin.dashboard",
                "gateways.read",
                "servers.read",
                "servers.use",
                "teams.read",
                "teams.update",
                "teams.join",
                "teams.delete",
                "teams.manage_members",
                "tools.read",
                "tools.execute",
                "resources.read",
                "prompts.read",
                "llm.read",
                "llm.invoke",
                "a2a.read",
                "gateways.create",
                "servers.create",
                "tools.create",
                "resources.create",
                "prompts.create",
                "a2a.create",
                "gateways.update",
                "servers.update",
                "tools.update",
                "resources.update",
                "prompts.update",
                "a2a.update",
                "gateways.delete",
                "servers.delete",
                "tools.delete",
                "resources.delete",
                "prompts.delete",
                "a2a.delete",
                "a2a.invoke",
                "tokens.create",
                "tokens.read",
                "tokens.update",
                "tokens.revoke",
            ],
            "is_system_role": True,
        },
        {
            "name": "developer",
            "description": "Developer with tool and resource access",
            "scope": "team",
            "permissions": [
                "admin.dashboard",
                "gateways.read",
                "servers.read",
                "servers.use",
                "teams.read",
                "teams.join",
                "tools.read",
                "tools.execute",
                "resources.read",
                "prompts.read",
                "llm.read",
                "llm.invoke",
                "a2a.read",
                "gateways.create",
                "servers.create",
                "tools.create",
                "resources.create",
                "prompts.create",
                "a2a.create",
                "gateways.update",
                "servers.update",
                "tools.update",
                "resources.update",
                "prompts.update",
                "a2a.update",
                "gateways.delete",
                "servers.delete",
                "tools.delete",
                "resources.delete",
                "prompts.delete",
                "a2a.delete",
                "a2a.invoke",
                "tokens.create",
                "tokens.read",
                "tokens.update",
                "tokens.revoke",
            ],
            "is_system_role": True,
        },
        {
            "name": "viewer",
            "description": "Read-only access to resources and admin UI",
            "scope": "team",
            "permissions": [
                "admin.dashboard",
                "gateways.read",
                "servers.read",
                "teams.read",
                "teams.join",
                "tools.read",
                "resources.read",
                "prompts.read",
                "llm.read",
                "a2a.read",
                "tokens.create",
                "tokens.read",
                "tokens.update",
                "tokens.revoke",
            ],
            "is_system_role": True,
        },
        {
            "name": "platform_viewer",
            "description": "Read-only access to resources and admin UI",
            "scope": "global",
            "permissions": [
                "admin.dashboard",
                "gateways.read",
                "servers.read",
                "teams.read",
                "teams.join",
                "tools.read",
                "resources.read",
                "prompts.read",
                "llm.read",
                "a2a.read",
                "tokens.create",
                "tokens.read",
                "tokens.update",
                "tokens.revoke",
            ],
            "is_system_role": True,
        },
    ]


def _load_additional_roles() -> list[dict]:
    """Load and validate extra role definitions from ``settings.mcpgateway_bootstrap_roles_in_db_file``.

    A relative path is tried against the current working directory first, then
    against the project root (``Path(__file__).resolve().parent.parent``). Entries
    that are not objects, lack ``name``/``scope``/``permissions``, or whose
    ``permissions`` is not a list of strings are skipped with a warning. Any
    unexpected error is logged and an empty list is returned, so role bootstrap
    continues with the built-in roles only.

    Returns:
        list[dict]: The valid role definitions (possibly empty).
    """
    try:
        roles_path = Path(settings.mcpgateway_bootstrap_roles_in_db_file)
        # Relative path missing from CWD: fall back to the project root
        if not roles_path.is_absolute() and not roles_path.exists():
            roles_path = Path(__file__).resolve().parent.parent / settings.mcpgateway_bootstrap_roles_in_db_file

        if not roles_path.exists():
            logger.warning(f"Additional roles file not found. Searched: CWD/{settings.mcpgateway_bootstrap_roles_in_db_file}, {roles_path}")
            return []

        with open(roles_path, "r", encoding="utf-8") as f:
            roles_data = json.load(f)

        # Validate JSON structure: must be a list of dicts with required keys
        if not isinstance(roles_data, list):
            logger.error(f"Additional roles file must contain a JSON array, got {type(roles_data).__name__}")
            return []

        required_keys = {"name", "scope", "permissions"}
        valid_roles = []
        for idx, role in enumerate(roles_data):
            if not isinstance(role, dict):
                logger.warning(f"Skipping invalid role at index {idx}: expected dict, got {type(role).__name__}")
                continue
            missing_keys = required_keys - set(role.keys())
            if missing_keys:
                role_name = role.get("name", f"<index {idx}>")
                logger.warning(f"Skipping role '{role_name}': missing required keys {missing_keys}")
                continue
            permissions = role["permissions"]
            # The caller only cast()s this value, so enforce the list[str] contract here
            if not isinstance(permissions, list) or not all(isinstance(p, str) for p in permissions):
                logger.warning(f"Skipping role '{role['name']}': 'permissions' must be a list of strings")
                continue
            valid_roles.append(role)

        if valid_roles:
            logger.info(f"Added {len(valid_roles)} additional roles to default roles in bootstrap db")
        elif roles_data:
            logger.warning("No valid roles found in additional roles file")
        return valid_roles
    except Exception as e:
        logger.error(f"Failed to load mcpgateway_bootstrap_roles_in_db_file: {e}")
        return []


async def bootstrap_default_roles(conn: Connection) -> None:
    """Bootstrap default system roles and assign them to admin user.

    Creates the built-in RBAC roles, plus any roles from the optional JSON file
    when ``settings.mcpgateway_bootstrap_roles_in_db_enabled`` is set. Roles that
    already exist are reused. The ``platform_admin`` role is then granted
    globally to the platform admin user. Does nothing if email auth is disabled or
    the admin user does not exist. Errors are logged rather than raised.

    Args:
        conn: Active SQLAlchemy connection
    """
    if not settings.email_auth_enabled:
        logger.info("Email authentication disabled - skipping default roles bootstrap")
        return

    try:
        # First-Party
        from mcpgateway.services.email_auth_service import EmailAuthService  # pylint: disable=import-outside-toplevel
        from mcpgateway.services.role_service import RoleService  # pylint: disable=import-outside-toplevel

        # Use session bound to the locked connection
        with Session(bind=conn) as db:
            role_service = RoleService(db)
            auth_service = EmailAuthService(db)

            # Check if admin user exists
            admin_user = await auth_service.get_user_by_email(settings.platform_admin_email)
            if not admin_user:
                logger.info("Admin user not found - skipping role assignment")
                return

            default_roles = _builtin_default_roles()
            if settings.mcpgateway_bootstrap_roles_in_db_enabled:
                default_roles.extend(_load_additional_roles())

            # Create default roles (reusing any that already exist)
            created_roles = []
            for role_def in default_roles:
                try:
                    existing_role = await role_service.get_role_by_name(str(role_def["name"]), str(role_def["scope"]))
                    if existing_role:
                        logger.info(f"System role {role_def['name']} already exists - skipping")
                        created_roles.append(existing_role)
                        continue

                    # Create the role (description and is_system_role are optional)
                    role = await role_service.create_role(
                        name=str(role_def["name"]),
                        description=str(role_def.get("description", "")),
                        scope=str(role_def["scope"]),
                        permissions=cast(list[str], role_def["permissions"]),
                        created_by=settings.platform_admin_email,
                        is_system_role=bool(role_def.get("is_system_role", False)),
                    )
                    created_roles.append(role)
                    logger.info(f"Created system role: {role.name}")

                except Exception as e:
                    logger.error(f"Failed to create role {role_def['name']}: {e}")
                    continue

            # Assign platform_admin role to admin user
            platform_admin_role = next((r for r in created_roles if r.name == "platform_admin"), None)
            if not platform_admin_role:
                # Role not in created_roles (creation may have failed) — look up from DB as fallback
                platform_admin_role = await role_service.get_role_by_name("platform_admin", "global")
            if platform_admin_role:
                try:
                    # Only (re)assign when there is no active assignment yet
                    existing_assignment = await role_service.get_user_role_assignment(user_email=admin_user.email, role_id=platform_admin_role.id, scope="global", scope_id=None)

                    if not existing_assignment or not existing_assignment.is_active:
                        await role_service.assign_role_to_user(user_email=admin_user.email, role_id=platform_admin_role.id, scope="global", scope_id=None, granted_by=admin_user.email)
                        logger.info(f"Assigned platform_admin role to {admin_user.email}")
                    else:
                        logger.info("Admin user already has platform_admin role")

                except Exception as e:
                    logger.error(f"Failed to assign platform_admin role to {admin_user.email}: {e}. Admin UI routes using allow_admin_bypass=False will return 403.")
            else:
                logger.error(f"platform_admin role not found — could not assign to {admin_user.email}. Admin UI routes using allow_admin_bypass=False will return 403.")

            logger.info("Default RBAC roles bootstrap completed successfully")

    except Exception as e:
        logger.error(f"Failed to bootstrap default roles: {e}")
        # Don't fail the entire bootstrap process if role creation fails
        return
def normalize_team_visibility(conn: Connection) -> int:
    """Rewrite unsupported team visibility values to ``"private"``.

    Only ``"private"`` and ``"public"`` are supported; anything else (e.g. a
    legacy ``"team"`` value) is coerced to ``"private"`` and logged. Changes are
    committed only when at least one team was touched. Errors are logged and
    reported as zero updates.

    Args:
        conn: Active SQLAlchemy connection

    Returns:
        int: Number of teams updated
    """
    supported = ["private", "public"]
    try:
        # The session shares the connection that holds the migration lock
        with Session(bind=conn) as db:
            offending_teams = db.query(EmailTeam).filter(EmailTeam.visibility.notin_(supported)).all()
            for team in offending_teams:
                previous = team.visibility
                team.visibility = "private"
                logger.info(f"Normalized team visibility: id={team.id} {previous} -> private")
            if offending_teams:
                db.commit()
            return len(offending_teams)
    except Exception as e:
        logger.error(f"Failed to normalize team visibility: {e}")
        return 0
async def bootstrap_resource_assignments(conn: Connection) -> None:
    """Hand orphaned resources to the platform admin's personal team.

    A resource counts as orphaned when its ``team_id``, ``owner_email`` or
    ``visibility`` is NULL. Each orphan is moved to the admin's personal team,
    owned by the admin, and made ``"public"``. An empty ``federation_source`` is
    filled with a migration marker. Each resource type is committed separately,
    and a failure for one type is logged before moving on to the next.

    Args:
        conn: Active SQLAlchemy connection
    """
    if not settings.email_auth_enabled:
        logger.info("Email authentication disabled - skipping resource assignment")
        return

    try:
        # The session shares the connection that holds the migration lock
        with Session(bind=conn) as db:
            owner = db.query(EmailUser).filter(EmailUser.email == settings.platform_admin_email, EmailUser.is_admin.is_(True)).first()
            if not owner:
                logger.warning("Admin user not found - skipping resource assignment")
                return

            team = owner.get_personal_team()
            if not team:
                logger.warning("Admin personal team not found - skipping resource assignment")
                return

            logger.info(f"Assigning orphaned resources to admin team: {team.name}")

            models_by_label = (
                ("servers", Server),
                ("tools", Tool),
                ("resources", Resource),
                ("prompts", Prompt),
                ("gateways", Gateway),
                ("a2a_agents", A2AAgent),
            )

            assigned_count = 0
            for label, model in models_by_label:
                try:
                    orphans = db.query(model).filter(model.team_id.is_(None) | model.owner_email.is_(None) | model.visibility.is_(None)).all()
                    if not orphans:
                        continue

                    logger.info(f"Assigning {len(orphans)} orphaned {label} to admin team")
                    for item in orphans:
                        item.team_id = team.id
                        item.owner_email = owner.email
                        item.visibility = "public"  # Make visible to all users
                        if hasattr(item, "federation_source") and not item.federation_source:
                            item.federation_source = "mcpgateway-0.7.0-migration"

                    db.commit()
                    assigned_count += len(orphans)
                except Exception as e:
                    logger.error(f"Failed to assign {label}: {e}")

            if assigned_count > 0:
                logger.info(f"Successfully assigned {assigned_count} orphaned resources to admin team")
            else:
                logger.info("No orphaned resources found - all resources have team assignments")

    except Exception as e:
        logger.error(f"Failed to bootstrap resource assignments: {e}")
def _migrate_schema(conn: Connection, cfg: Config) -> None:
    """Create, stamp, or upgrade the schema reachable through ``conn``.

    Must be called while the migration lock is held.

    * No ``gateways`` table: treat the database as empty. Apply the MariaDB/MySQL
      metadata tweaks when the URL targets those backends, build the schema with
      ``Base.metadata.create_all`` and stamp Alembic ``head``.
    * ``alembic_version`` has no revision rows but ``_schema_looks_current`` passes:
      stamp ``head`` instead of re-running migrations.
    * Otherwise: run ``alembic upgrade head``.

    Args:
        conn: Connection holding the migration lock.
        cfg: Alembic configuration used for ``stamp``/``upgrade``.
    """
    insp = inspect(conn)
    table_names = insp.get_table_names()

    if "gateways" not in table_names:
        logger.info("Empty DB detected - creating baseline schema")
        # Apply MariaDB compatibility fixes if needed
        if settings.database_url.startswith(("mariadb", "mysql")):
            # pylint: disable=import-outside-toplevel
            # First-Party
            from mcpgateway.alembic.env import _modify_metadata_for_mariadb, mariadb_naming_convention

            _modify_metadata_for_mariadb()
            Base.metadata.naming_convention = mariadb_naming_convention
            logger.info("Applied MariaDB compatibility modifications")

        Base.metadata.create_all(bind=conn)
        command.stamp(cfg, "head")
        return

    versions: list[str] = []
    if "alembic_version" in table_names:
        try:
            rows = conn.execute(text("SELECT version_num FROM alembic_version")).fetchall()
            versions = [row[0] for row in rows if row[0]]
        except Exception as exc:
            logger.warning("Failed to read alembic_version table: %s", exc)

    if not versions and _schema_looks_current(insp):
        logger.warning("Existing database has no Alembic revision rows; stamping head to avoid reapplying migrations")
        command.stamp(cfg, "head")
    else:
        logger.info("Running Alembic migrations to ensure schema is up to date")
        command.upgrade(cfg, "head")


async def main() -> None:
    """
    Bootstrap or upgrade the database schema, then log readiness.

    Under the migration lock this:
    1. builds the schema with ``create_all()`` + ``alembic stamp head`` on an empty DB,
       stamps ``head`` on an unversioned DB whose schema already looks current, and
       otherwise runs ``alembic upgrade head``, leaving application data intact;
    2. normalizes team visibility values;
    3. bootstraps the platform admin user, default RBAC roles and orphaned-resource
       assignments (when email authentication is enabled);
    4. commits, before the lock is released, so a waiting worker never inspects
       uncommitted schema.

    Uses distributed advisory locks (PG/MySQL) or file locking (SQLite)
    to prevent race conditions when multiple workers start simultaneously.

    Raises:
        Exception: If migration or bootstrap fails
    """
    engine = create_engine(settings.database_url)
    ini_path = files("mcpgateway").joinpath("alembic.ini")
    cfg = Config(str(ini_path))  # path in container
    cfg.attributes["configure_logger"] = True

    # Use advisory lock to prevent concurrent migrations
    try:
        with engine.connect() as conn:
            # Commit any open transaction on the connection before locking (though it should be fresh)
            conn.commit()

            with advisory_lock(conn):
                logger.info("Acquired migration lock, checking database schema...")

                # Pass the LOCKED connection to Alembic config
                cfg.attributes["connection"] = conn

                # Escape '%' characters in URL to avoid configparser interpolation errors
                # (e.g., URL-encoded passwords like %40 for '@')
                escaped_url = settings.database_url.replace("%", "%%")
                cfg.set_main_option("sqlalchemy.url", escaped_url)

                _migrate_schema(conn, cfg)

                # Post-upgrade normalization passes (inside lock to be safe)
                updated = normalize_team_visibility(conn)
                if updated:
                    logger.info(f"Normalized {updated} team record(s) to supported visibility values")

                # Bootstrap admin user after database is ready, using the LOCKED connection
                await bootstrap_admin_user(conn)

                # Bootstrap default RBAC roles after admin user is created
                await bootstrap_default_roles(conn)

                # Assign orphaned resources to admin personal team after all setup is complete
                await bootstrap_resource_assignments(conn)

                # Commit while the lock is still held: releasing first would let another
                # worker acquire it and inspect a schema that is not yet committed.
                conn.commit()

    except Exception as e:
        logger.error(f"Migration/Bootstrap failed: {e}")
        # Allow retry logic or container restart to handle transient issues
        raise
    finally:
        # Dispose the engine to close all connections in the pool
        engine.dispose()

    logger.info("Database ready")
if __name__ == "__main__":
    # Script entry point: run the async bootstrap on a fresh event loop.
    asyncio.run(main())