Coverage for mcpgateway / bootstrap_db.py: 100%
259 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 07:10 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/bootstrap_db.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Madhav Kandukuri
7Database bootstrap/upgrade entry-point for MCP Gateway.
8The script:
101. Creates a synchronous SQLAlchemy ``Engine`` from ``settings.database_url``.
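2. Loads the *alembic.ini* bundled inside the ``mcpgateway`` package to drive migrations.
3. Creates the schema on an empty database (``create_all()`` + ``alembic stamp head``); otherwise brings it to ``head`` with Alembic (``alembic upgrade head``, or a stamp when an unversioned schema already looks current).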
134. Runs post-upgrade normalization tasks and bootstraps admin/roles as configured.
145. Logs a **"Database ready"** message on success.
16It is intended to be invoked via ``python3 -m mcpgateway.bootstrap_db`` or
17directly with ``python3 mcpgateway/bootstrap_db.py``.
19Examples:
20 >>> from mcpgateway.bootstrap_db import logging_service, logger
21 >>> logging_service is not None
22 True
23 >>> logger is not None
24 True
25 >>> hasattr(logger, 'info')
26 True
27 >>> from mcpgateway.bootstrap_db import Base
28 >>> hasattr(Base, 'metadata')
29 True
30"""
32# Standard
33import asyncio
34from contextlib import contextmanager
35from importlib.resources import files
36import json
37import os
38from pathlib import Path
39import tempfile
40from typing import cast
42# Third-Party
43from alembic import command
44from alembic.config import Config
45from filelock import FileLock
46from sqlalchemy import create_engine, inspect, text
47from sqlalchemy.engine import Connection
48from sqlalchemy.orm import Session
50# First-Party
51from mcpgateway.config import settings
52from mcpgateway.db import A2AAgent, Base, EmailTeam, EmailUser, Gateway, Prompt, Resource, Server, Tool
53from mcpgateway.services.logging_service import LoggingService
# Migration lock to prevent concurrent migrations from multiple workers.
# The lock file lives in the system temp directory and is used by the FileLock
# fallback in ``advisory_lock`` (SQLite and any backend other than Postgres/MySQL/MariaDB).
_MIGRATION_LOCK_PATH = os.path.join(tempfile.gettempdir(), "mcpgateway_migration.lock")
# Seconds to wait for the lock (5 minutes for slow migrations); used both as the
# FileLock timeout and as the MySQL ``GET_LOCK`` timeout.
_MIGRATION_LOCK_TIMEOUT = 300

# Initialize logging service first so the module-level ``logger`` exists for every function below
logging_service = LoggingService()
logger = logging_service.get_logger(__name__)
64def _column_exists(inspector, table_name: str, column_name: str) -> bool:
65 """Check whether a table has a specific column.
67 Args:
68 inspector: SQLAlchemy inspector for the active connection.
69 table_name: Table name to inspect.
70 column_name: Column name to check.
72 Returns:
73 True if the column exists, otherwise False.
74 """
75 try:
76 return any(col["name"] == column_name for col in inspector.get_columns(table_name))
77 except Exception:
78 return False
def _schema_looks_current(inspector) -> bool:
    """Heuristically decide whether an unversioned database already has the current schema.

    The check passes only when every marker column listed below is present.
    Columns are probed in order and probing stops at the first missing one.

    Args:
        inspector: SQLAlchemy inspector for the active connection.

    Returns:
        True when all marker columns exist, otherwise False.
    """
    marker_columns = (
        ("tools", "display_name"),
        ("gateways", "oauth_config"),
        ("prompts", "custom_name"),
    )
    return all(_column_exists(inspector, table, column) for table, column in marker_columns)
def _release_advisory_lock(conn: Connection, release_sql: str, backend: str, *, suppress_errors: bool) -> None:
    """Release a database advisory lock previously taken by ``advisory_lock``.

    Args:
        conn: Connection that holds the lock.
        release_sql: Backend-specific statement that releases the lock.
        backend: Backend label used in log messages (``"Postgres"`` or ``"MySQL"``).
        suppress_errors: When True, a failing release is logged and swallowed so it
            cannot replace an exception that is already propagating.

    Raises:
        Exception: Whatever ``conn.execute`` raises, but only when
            ``suppress_errors`` is False.
    """
    logger.info(f"Releasing {backend} advisory lock...")
    try:
        conn.execute(text(release_sql))
    except Exception as exc:
        if not suppress_errors:
            raise
        # The connection may be unusable (e.g. aborted transaction). Session-level
        # locks are dropped by the server when the session ends, so log and move on.
        logger.warning(f"Failed to release {backend} advisory lock after an error: {exc}")


@contextmanager
def advisory_lock(conn: Connection):
    """
    Acquire a distributed advisory lock to serialize migrations across multiple instances.

    Behavior depends on the database backend:
    - Postgres: Uses `pg_advisory_lock` (blocking, no timeout)
    - MySQL/MariaDB: Uses `GET_LOCK` (blocking with `_MIGRATION_LOCK_TIMEOUT`)
    - Anything else (e.g. SQLite): Falls back to a local `FileLock`

    If the guarded block raises, the lock release is attempted on a best-effort
    basis and the ORIGINAL exception is re-raised; a failure to release is only
    logged. If the guarded block succeeds, a failure to release is raised.

    Args:
        conn: Active SQLAlchemy connection

    Yields:
        None

    Raises:
        TimeoutError: If the MySQL lock cannot be acquired within the timeout period.
            NOTE(review): the FileLock fallback raises filelock's own timeout error,
            presumably a TimeoutError subclass - confirm against the installed version.
    """
    dialect = conn.dialect.name
    lock_id = "mcpgateway_migration"
    # Postgres advisory locks are keyed by a BIGINT; this is a fixed constant that
    # every instance must share (it is not computed from ``lock_id``).
    pg_lock_id = 42424242424242

    if dialect == "postgresql":
        backend = "Postgres"
        release_sql = f"SELECT pg_advisory_unlock({pg_lock_id})"
        logger.info("Acquiring Postgres advisory lock...")
        conn.execute(text(f"SELECT pg_advisory_lock({pg_lock_id})"))
    elif dialect in ["mysql", "mariadb"]:
        backend = "MySQL"
        release_sql = f"SELECT RELEASE_LOCK('{lock_id}')"
        logger.info("Acquiring MySQL advisory lock...")
        # GET_LOCK returns 1 if successful, 0 if timed out, NULL on error
        result = conn.execute(text(f"SELECT GET_LOCK('{lock_id}', {_MIGRATION_LOCK_TIMEOUT})")).scalar()
        if result != 1:
            raise TimeoutError(f"Could not acquire MySQL lock '{lock_id}' within {_MIGRATION_LOCK_TIMEOUT}s")
    else:
        # Fallback for SQLite (single-host/container) or other DBs
        logger.info(f"Using FileLock fallback for {dialect}...")
        with FileLock(_MIGRATION_LOCK_PATH, timeout=_MIGRATION_LOCK_TIMEOUT):
            yield
        return

    try:
        yield
    except BaseException:
        # Do not let a failing unlock hide the real error: after a failed statement
        # the connection can reject further SQL, so the release is best-effort here.
        _release_advisory_lock(conn, release_sql, backend, suppress_errors=True)
        raise
    _release_advisory_lock(conn, release_sql, backend, suppress_errors=False)
async def bootstrap_admin_user(conn: Connection) -> None:
    """
    Create the platform admin account from settings when it does not exist yet.

    Nothing happens when ``settings.email_auth_enabled`` is false or when a user
    with ``settings.platform_admin_email`` is already present. Otherwise the user
    is created through ``EmailAuthService.create_platform_admin``, marked as
    email-verified, optionally flagged for a forced password change, and the
    session is committed. When ``settings.auto_create_personal_teams`` is set this
    function only logs that fact; the team itself is presumably created by the
    auth service during user creation - verify there.

    Every exception is logged and swallowed so that a failure here does not
    abort the rest of the bootstrap.

    Args:
        conn: Active SQLAlchemy connection
    """
    if not settings.email_auth_enabled:
        logger.info("Email authentication disabled - skipping admin user bootstrap")
        return

    try:
        # Imported lazily to avoid circular imports
        # First-Party
        from mcpgateway.services.email_auth_service import EmailAuthService  # pylint: disable=import-outside-toplevel

        # The session shares the caller's (locked) connection
        with Session(bind=conn) as session:
            auth = EmailAuthService(session)

            # Nothing to do when the admin account is already there
            if await auth.get_user_by_email(settings.platform_admin_email):
                logger.info(f"Admin user {settings.platform_admin_email} already exists - skipping creation")
                return

            logger.info(f"Creating platform admin user: {settings.platform_admin_email}")
            admin = await auth.create_platform_admin(
                email=settings.platform_admin_email,
                password=settings.platform_admin_password.get_secret_value(),
                full_name=settings.platform_admin_full_name,
            )

            # First-Party
            from mcpgateway.db import utc_now  # pylint: disable=import-outside-toplevel

            # The bootstrap admin is treated as already verified
            admin.email_verified_at = utc_now()

            # Only force a password change when both switches allow it
            # (each defaults to True when the setting is absent)
            must_change_password = getattr(settings, "password_change_enforcement_enabled", True) and getattr(settings, "admin_require_password_change_on_bootstrap", True)
            if must_change_password:
                admin.password_change_required = True  # Force admin to change default password
                try:
                    admin.password_changed_at = utc_now()
                except Exception as exc:
                    logger.debug("Failed to set admin password_changed_at: %s", exc)
            session.commit()

            # Personal team is automatically created during user creation if enabled
            if settings.auto_create_personal_teams:
                logger.info("Personal team automatically created for admin user")

            session.commit()
            logger.info(f"Platform admin user created successfully: {settings.platform_admin_email}")

    except Exception as e:
        # Don't fail the entire bootstrap process if admin user creation fails
        logger.error(f"Failed to bootstrap admin user: {e}")
def _builtin_role_definitions() -> list:
    """Return fresh definitions of the system roles that are always bootstrapped.

    Returns:
        A new list of role dicts with the keys ``name``, ``description``,
        ``scope``, ``permissions`` and ``is_system_role``.
    """
    return [
        {"name": "platform_admin", "description": "Platform administrator with all permissions", "scope": "global", "permissions": ["*"], "is_system_role": True},  # All permissions
        {
            "name": "team_admin",
            "description": "Team administrator with team management permissions",
            "scope": "team",
            "permissions": [
                "admin.dashboard",
                "gateways.read",
                "servers.read",
                "teams.read",
                "teams.update",
                "teams.join",
                "teams.delete",
                "teams.manage_members",
                "tools.read",
                "tools.execute",
                "resources.read",
                "prompts.read",
                "a2a.read",
                "gateways.create",
                "servers.create",
                "tools.create",
                "resources.create",
                "prompts.create",
                "a2a.create",
                "gateways.update",
                "servers.update",
                "tools.update",
                "resources.update",
                "prompts.update",
                "a2a.update",
                "gateways.delete",
                "servers.delete",
                "tools.delete",
                "resources.delete",
                "prompts.delete",
                "a2a.delete",
                "a2a.invoke",
            ],
            "is_system_role": True,
        },
        {
            "name": "developer",
            "description": "Developer with tool and resource access",
            "scope": "team",
            "permissions": [
                "admin.dashboard",
                "gateways.read",
                "servers.read",
                "teams.join",
                "tools.read",
                "tools.execute",
                "resources.read",
                "prompts.read",
                "a2a.read",
                "gateways.create",
                "servers.create",
                "tools.create",
                "resources.create",
                "prompts.create",
                "a2a.create",
                "gateways.update",
                "servers.update",
                "tools.update",
                "resources.update",
                "prompts.update",
                "a2a.update",
                "gateways.delete",
                "servers.delete",
                "tools.delete",
                "resources.delete",
                "prompts.delete",
                "a2a.delete",
                "a2a.invoke",
            ],
            "is_system_role": True,
        },
        {
            "name": "viewer",
            "description": "Read-only access to resources and admin UI",
            "scope": "team",
            "permissions": ["admin.dashboard", "gateways.read", "servers.read", "teams.join", "tools.read", "resources.read", "prompts.read", "a2a.read"],
            "is_system_role": True,
        },
        {
            "name": "platform_viewer",
            "description": "Read-only access to resources and admin UI",
            "scope": "global",
            "permissions": ["admin.dashboard", "gateways.read", "servers.read", "teams.join", "tools.read", "resources.read", "prompts.read", "a2a.read"],
            "is_system_role": True,
        },
    ]


def _load_additional_role_definitions() -> list:
    """Read extra role definitions from ``settings.mcpgateway_bootstrap_roles_in_db_file``.

    A relative path is tried as given (relative to the current directory) and,
    if that does not exist, relative to the parent of this module's directory.
    The file must contain a JSON array; entries that are not dicts or that lack
    any of ``name``, ``scope`` or ``permissions`` are skipped with a warning.
    Every failure is logged and results in an empty list - it never raises.

    Returns:
        The list of structurally valid role dicts (possibly empty).
    """
    try:
        roles_path = Path(settings.mcpgateway_bootstrap_roles_in_db_file)
        # Relative path: try the current directory first, then the project root
        # (mcpgateway/bootstrap_db.py -> parent.parent = repo root)
        if not roles_path.is_absolute() and not roles_path.exists():
            roles_path = Path(__file__).resolve().parent.parent / settings.mcpgateway_bootstrap_roles_in_db_file

        if not roles_path.exists():
            logger.warning(f"Additional roles file not found. Searched: CWD/{settings.mcpgateway_bootstrap_roles_in_db_file}, {roles_path}")
            return []

        with open(roles_path, "r", encoding="utf-8") as f:
            additional_roles_data = json.load(f)

        # Validate JSON structure: must be a list of dicts with required keys
        required_keys = {"name", "scope", "permissions"}
        if not isinstance(additional_roles_data, list):
            logger.error(f"Additional roles file must contain a JSON array, got {type(additional_roles_data).__name__}")
            return []

        valid_roles = []
        for idx, role in enumerate(additional_roles_data):
            if not isinstance(role, dict):
                logger.warning(f"Skipping invalid role at index {idx}: expected dict, got {type(role).__name__}")
                continue
            missing_keys = required_keys - set(role.keys())
            if missing_keys:
                role_name = role.get("name", f"<index {idx}>")
                logger.warning(f"Skipping role '{role_name}': missing required keys {missing_keys}")
                continue
            valid_roles.append(role)

        if valid_roles:
            logger.info(f"Added {len(valid_roles)} additional roles to default roles in bootstrap db")
        elif additional_roles_data:
            # The file had entries, but none of them passed validation
            logger.warning("No valid roles found in additional roles file")
        return valid_roles
    except Exception as e:
        logger.error(f"Failed to load mcpgateway_bootstrap_roles_in_db_file: {e}")
        return []


async def _ensure_roles(role_service, role_defs: list) -> list:
    """Create each role in ``role_defs`` unless a role with that name and scope already exists.

    A failure for one role is logged and does not stop the remaining roles.

    Args:
        role_service: Service exposing ``get_role_by_name`` and ``create_role``.
        role_defs: Role dicts with at least ``name``, ``scope`` and ``permissions``.

    Returns:
        The roles that exist afterwards (pre-existing and newly created), in input order.
    """
    roles = []
    for role_def in role_defs:
        try:
            # Check if role already exists
            existing_role = await role_service.get_role_by_name(str(role_def["name"]), str(role_def["scope"]))
            if existing_role:
                logger.info(f"System role {role_def['name']} already exists - skipping")
                roles.append(existing_role)
                continue

            # Create the role (description and is_system_role are optional)
            role = await role_service.create_role(
                name=str(role_def["name"]),
                description=str(role_def.get("description", "")),
                scope=str(role_def["scope"]),
                permissions=cast(list[str], role_def["permissions"]),
                created_by=settings.platform_admin_email,
                is_system_role=bool(role_def.get("is_system_role", False)),
            )
            roles.append(role)
            logger.info(f"Created system role: {role.name}")

        except Exception as e:
            logger.error(f"Failed to create role {role_def['name']}: {e}")
            continue
    return roles


async def _ensure_platform_admin_assignment(role_service, admin_user, roles: list) -> None:
    """Give ``admin_user`` the global ``platform_admin`` role if it lacks an active assignment.

    Does nothing when ``roles`` contains no role named ``platform_admin``.
    Errors from the role service are logged and swallowed.

    Args:
        role_service: Service exposing ``get_user_role_assignment`` and ``assign_role_to_user``.
        admin_user: The platform admin user; only its ``email`` is read.
        roles: Roles returned by the role-creation step.
    """
    platform_admin_role = next((r for r in roles if r.name == "platform_admin"), None)
    if not platform_admin_role:
        return

    try:
        # Check if assignment already exists
        existing_assignment = await role_service.get_user_role_assignment(user_email=admin_user.email, role_id=platform_admin_role.id, scope="global", scope_id=None)

        if not existing_assignment or not existing_assignment.is_active:
            await role_service.assign_role_to_user(user_email=admin_user.email, role_id=platform_admin_role.id, scope="global", scope_id=None, granted_by=admin_user.email)
            logger.info(f"Assigned platform_admin role to {admin_user.email}")
        else:
            logger.info("Admin user already has platform_admin role")

    except Exception as e:
        logger.error(f"Failed to assign platform_admin role: {e}")


async def bootstrap_default_roles(conn: Connection) -> None:
    """Bootstrap default system roles and assign them to admin user.

    Creates the built-in RBAC roles (plus any extra roles loaded from the
    configured JSON file when ``settings.mcpgateway_bootstrap_roles_in_db_enabled``
    is set) and makes sure the platform admin holds the global ``platform_admin``
    role. Skipped entirely when email authentication is disabled or the admin
    user does not exist. Any exception is logged and swallowed so the rest of
    the bootstrap can continue.

    Args:
        conn: Active SQLAlchemy connection
    """
    if not settings.email_auth_enabled:
        logger.info("Email authentication disabled - skipping default roles bootstrap")
        return

    try:
        # First-Party
        from mcpgateway.services.email_auth_service import EmailAuthService  # pylint: disable=import-outside-toplevel
        from mcpgateway.services.role_service import RoleService  # pylint: disable=import-outside-toplevel

        # Use session bound to the locked connection
        with Session(bind=conn) as db:
            role_service = RoleService(db)
            auth_service = EmailAuthService(db)

            # Check if admin user exists
            admin_user = await auth_service.get_user_by_email(settings.platform_admin_email)
            if not admin_user:
                logger.info("Admin user not found - skipping role assignment")
                return

            # Built-in roles first, then optional roles from the JSON file
            default_roles = _builtin_role_definitions()
            if settings.mcpgateway_bootstrap_roles_in_db_enabled:
                default_roles.extend(_load_additional_role_definitions())

            created_roles = await _ensure_roles(role_service, default_roles)
            await _ensure_platform_admin_assignment(role_service, admin_user, created_roles)

            logger.info("Default RBAC roles bootstrap completed successfully")

    except Exception as e:
        # Don't fail the entire bootstrap process if role creation fails
        logger.error(f"Failed to bootstrap default roles: {e}")
def normalize_team_visibility(conn: Connection) -> int:
    """Rewrite unsupported team visibility values to ``private``.

    Teams whose visibility is not one of ``private`` / ``public`` (e.g. 'team')
    are selected, set to ``private`` and logged one by one. The session is
    committed only when at least one team was changed. Any exception is logged
    and reported as zero updates.

    Args:
        conn: Active SQLAlchemy connection

    Returns:
        int: Number of teams updated (0 on failure)
    """
    try:
        # The session shares the caller's (locked) connection
        with Session(bind=conn) as session:
            offenders = session.query(EmailTeam).filter(EmailTeam.visibility.notin_(["private", "public"]))
            updated = 0
            for updated, team in enumerate(offenders.all(), start=1):
                previous = team.visibility
                team.visibility = "private"
                logger.info(f"Normalized team visibility: id={team.id} {previous} -> private")
            if updated:
                session.commit()
            return updated
    except Exception as e:
        logger.error(f"Failed to normalize team visibility: {e}")
        return 0
async def bootstrap_resource_assignments(conn: Connection) -> None:
    """Hand resources without complete ownership data to the platform admin's personal team.

    For servers, tools, resources, prompts, gateways and A2A agents, every row
    whose ``team_id``, ``owner_email`` or ``visibility`` is NULL is assigned to
    the admin's personal team, owned by the admin and made ``public``. Rows
    that expose a ``federation_source`` attribute with a falsy value are tagged
    as migrated. Each resource type is committed separately; a failure in one
    type is logged and the loop moves on to the next.

    Skipped when email authentication is disabled, or when the admin user or
    their personal team cannot be found.

    Args:
        conn: Active SQLAlchemy connection
    """
    if not settings.email_auth_enabled:
        logger.info("Email authentication disabled - skipping resource assignment")
        return

    try:
        # The session shares the caller's (locked) connection
        with Session(bind=conn) as db:
            # Locate the admin account and the team that will receive the orphans
            admin_user = db.query(EmailUser).filter(EmailUser.email == settings.platform_admin_email, EmailUser.is_admin.is_(True)).first()
            if not admin_user:
                logger.warning("Admin user not found - skipping resource assignment")
                return

            personal_team = admin_user.get_personal_team()
            if not personal_team:
                logger.warning("Admin personal team not found - skipping resource assignment")
                return

            logger.info(f"Assigning orphaned resources to admin team: {personal_team.name}")

            # (log label, ORM model) pairs, processed in this order
            resource_types = [
                ("servers", Server),
                ("tools", Tool),
                ("resources", Resource),
                ("prompts", Prompt),
                ("gateways", Gateway),
                ("a2a_agents", A2AAgent),
            ]

            total_assigned = 0

            for resource_name, resource_model in resource_types:
                try:
                    # A row counts as orphaned when any of the three ownership fields is NULL
                    orphans = db.query(resource_model).filter((resource_model.team_id.is_(None)) | (resource_model.owner_email.is_(None)) | (resource_model.visibility.is_(None))).all()
                    if not orphans:
                        continue

                    logger.info(f"Assigning {len(orphans)} orphaned {resource_name} to admin team")

                    for item in orphans:
                        item.team_id = personal_team.id
                        item.owner_email = admin_user.email
                        item.visibility = "public"  # Make visible to all users
                        if hasattr(item, "federation_source") and not item.federation_source:
                            item.federation_source = "mcpgateway-0.7.0-migration"

                    db.commit()
                    total_assigned += len(orphans)

                except Exception as e:
                    logger.error(f"Failed to assign {resource_name}: {e}")
                    continue

            if total_assigned > 0:
                logger.info(f"Successfully assigned {total_assigned} orphaned resources to admin team")
            else:
                logger.info("No orphaned resources found - all resources have team assignments")

    except Exception as e:
        logger.error(f"Failed to bootstrap resource assignments: {e}")
async def main() -> None:
    """
    Bootstrap or upgrade the database schema, then log readiness.

    Steps, all executed on one connection while the migration lock is held:

    1. If the ``gateways`` table is missing the database is treated as empty:
       the schema is created with ``create_all()`` and stamped at ``head``.
    2. Otherwise, if no Alembic revision is recorded but the schema already
       looks current, the database is only stamped at ``head``; in every other
       case ``alembic upgrade head`` is run.
    3. Team visibility values are normalized, then the admin user, the default
       RBAC roles and the orphaned-resource assignments are bootstrapped.
    4. The connection is committed.

    Uses distributed advisory locks (PG/MySQL) or file locking (SQLite)
    to prevent race conditions when multiple workers start simultaneously.
    The engine is always disposed, on success and on failure.

    Raises:
        Exception: Any error raised during locking, migration or commit is logged
            and re-raised. The bootstrap helpers log and swallow their own errors,
            so failures inside them do not propagate from here.
    """
    engine = create_engine(settings.database_url)
    # alembic.ini is resolved from inside the installed ``mcpgateway`` package
    ini_path = files("mcpgateway").joinpath("alembic.ini")
    cfg = Config(str(ini_path))  # path in container
    # NOTE(review): presumably read by mcpgateway/alembic/env.py to set up logging - confirm there
    cfg.attributes["configure_logger"] = True

    # Use advisory lock to prevent concurrent migrations
    try:
        with engine.connect() as conn:
            # Commit any open transaction on the connection before locking (though it should be fresh)
            conn.commit()

            with advisory_lock(conn):
                logger.info("Acquired migration lock, checking database schema...")

                # Pass the LOCKED connection to Alembic config
                cfg.attributes["connection"] = conn

                # Escape '%' characters in URL to avoid configparser interpolation errors
                # (e.g., URL-encoded passwords like %40 for '@')
                escaped_url = settings.database_url.replace("%", "%%")
                cfg.set_main_option("sqlalchemy.url", escaped_url)

                insp = inspect(conn)
                table_names = insp.get_table_names()

                # A missing ``gateways`` table is the marker for a brand-new database
                if "gateways" not in table_names:
                    logger.info("Empty DB detected - creating baseline schema")
                    # Apply MariaDB compatibility fixes if needed
                    if settings.database_url.startswith(("mariadb", "mysql")):
                        # pylint: disable=import-outside-toplevel
                        # First-Party
                        from mcpgateway.alembic.env import _modify_metadata_for_mariadb, mariadb_naming_convention

                        _modify_metadata_for_mariadb()
                        Base.metadata.naming_convention = mariadb_naming_convention
                        logger.info("Applied MariaDB compatibility modifications")

                    # Create everything from the ORM metadata, then record it as being at head
                    Base.metadata.create_all(bind=conn)
                    command.stamp(cfg, "head")
                else:
                    # Collect the recorded Alembic revision(s); stays empty if the table is absent or unreadable
                    versions: list[str] = []
                    if "alembic_version" in table_names:
                        try:
                            rows = conn.execute(text("SELECT version_num FROM alembic_version")).fetchall()
                            versions = [row[0] for row in rows if row[0]]
                        except Exception as exc:
                            logger.warning("Failed to read alembic_version table: %s", exc)

                    if not versions and _schema_looks_current(insp):
                        logger.warning("Existing database has no Alembic revision rows; stamping head to avoid reapplying migrations")
                        command.stamp(cfg, "head")
                    else:
                        logger.info("Running Alembic migrations to ensure schema is up to date")
                        command.upgrade(cfg, "head")

                # Post-upgrade normalization passes (inside lock to be safe)
                updated = normalize_team_visibility(conn)
                if updated:
                    logger.info(f"Normalized {updated} team record(s) to supported visibility values")

                # Bootstrap admin user after database is ready, using the LOCKED connection
                await bootstrap_admin_user(conn)

                # Bootstrap default RBAC roles after admin user is created
                await bootstrap_default_roles(conn)

                # Assign orphaned resources to admin personal team after all setup is complete
                await bootstrap_resource_assignments(conn)

                # NOTE(review): committing while the lock is still held looks intentional
                # (other workers then only ever see the finished schema) - confirm.
                conn.commit()  # Ensure all migration changes are permanently committed

    except Exception as e:
        logger.error(f"Migration/Bootstrap failed: {e}")
        # Allow retry logic or container restart to handle transient issues
        raise
    finally:
        # Dispose the engine to close all connections in the pool
        engine.dispose()

    # Only reached when no exception was raised above
    logger.info("Database ready")
# Script entry point: run the async bootstrap to completion when executed directly
# (``python3 -m mcpgateway.bootstrap_db`` or ``python3 mcpgateway/bootstrap_db.py``).
if __name__ == "__main__":
    asyncio.run(main())