Coverage for mcpgateway / bootstrap_db.py: 100%
292 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/bootstrap_db.py
3Copyright 2025
4SPDX-License-Identifier: Apache-2.0
5Authors: Madhav Kandukuri
7Database bootstrap/upgrade entry-point for ContextForge.
8The script:
101. Creates a synchronous SQLAlchemy ``Engine`` from ``settings.database_url``.
112. Looks for an *alembic.ini* two levels up from this file to drive migrations.
123. Applies Alembic migrations (``alembic upgrade head``) to create or update the schema.
134. Runs post-upgrade normalization tasks and bootstraps admin/roles as configured.
145. Logs a **"Database ready"** message on success.
16It is intended to be invoked via ``python3 -m mcpgateway.bootstrap_db`` or
17directly with ``python3 mcpgateway/bootstrap_db.py``.
19Examples:
20 >>> from mcpgateway.bootstrap_db import logging_service, logger
21 >>> logging_service is not None
22 True
23 >>> logger is not None
24 True
25 >>> hasattr(logger, 'info')
26 True
27 >>> from mcpgateway.bootstrap_db import Base
28 >>> hasattr(Base, 'metadata')
29 True
30"""
32# Standard
33import asyncio
34from contextlib import contextmanager
35from importlib.resources import files
36import json
37import os
38from pathlib import Path
39import random
40import re
41import tempfile
42import time
43from typing import cast
45# Third-Party
46from alembic import command
47from alembic.config import Config
48from filelock import FileLock
49from sqlalchemy import create_engine, inspect, or_, text
50from sqlalchemy.engine import Connection
51from sqlalchemy.orm import Session
53# First-Party
54from mcpgateway.common.validators import SecurityValidator
55from mcpgateway.config import settings
56from mcpgateway.db import A2AAgent, Base, EmailTeam, EmailUser, Gateway, Prompt, Resource, Server, Tool
57from mcpgateway.services.logging_service import LoggingService
59# Migration lock to prevent concurrent migrations from multiple workers
60_MIGRATION_LOCK_PATH = os.path.join(tempfile.gettempdir(), "mcpgateway_migration.lock")
61_MIGRATION_LOCK_TIMEOUT = 300 # seconds to wait for lock (5 minutes for slow migrations)
63# Initialize logging service first
64logging_service = LoggingService()
65logger = logging_service.get_logger(__name__)
68def _column_exists(inspector, table_name: str, column_name: str) -> bool:
69 """Check whether a table has a specific column.
71 Args:
72 inspector: SQLAlchemy inspector for the active connection.
73 table_name: Table name to inspect.
74 column_name: Column name to check.
76 Returns:
77 True if the column exists, otherwise False.
78 """
79 try:
80 return any(col["name"] == column_name for col in inspector.get_columns(table_name))
81 except Exception:
82 return False
85def _schema_looks_current(inspector) -> bool:
86 """Best-effort check for unversioned databases that already match current schema.
88 Args:
89 inspector: SQLAlchemy inspector for the active connection.
91 Returns:
92 True when expected columns exist for a recent schema version.
93 """
94 return (
95 _column_exists(inspector, "tools", "display_name")
96 and _column_exists(inspector, "gateways", "oauth_config")
97 and _column_exists(inspector, "prompts", "custom_name")
98 and _column_exists(inspector, "sso_providers", "jwks_uri")
99 )
102@contextmanager
103def advisory_lock(conn: Connection):
104 """
105 Acquire a distributed advisory lock to serialize migrations across multiple instances.
107 Behavior depends on the database backend:
108 - Postgres: Uses `pg_try_advisory_lock` (non-blocking)
109 - SQLite: Fallback to local `FileLock`
111 Args:
112 conn: Active SQLAlchemy connection
114 Yields:
115 None
117 Raises:
118 TimeoutError: If the lock cannot be acquired within the timeout period
119 """
120 dialect = conn.dialect.name
121 # Postgres requires a BIGINT lock ID (arbitrary hash of the string)
122 pg_lock_id = 42424242424242
124 if dialect == "postgresql":
125 logger.info("Attempting to acquire Postgres advisory lock...")
127 # Retry parameters
128 max_retries = 60 # 60 attempts
129 base_delay = 1.0 # Start with 1 second
130 max_delay = 10.0 # Cap at 10 seconds
132 acquired = False
133 for attempt in range(max_retries):
134 # Try non-blocking lock
135 result = conn.execute(text(f"SELECT pg_try_advisory_lock({pg_lock_id})"))
136 acquired = result.scalar()
138 if acquired:
139 logger.info(f"Acquired Postgres advisory lock on attempt {attempt + 1}")
140 break
142 # Exponential backoff with jitter
143 delay = min(base_delay * (1.5**attempt), max_delay)
144 jitter = delay * random.uniform(-0.1, 0.1) # nosec B311 # noqa: DUO102
145 sleep_time = delay + jitter
147 logger.info(f"Lock held by another instance, retrying in {sleep_time:.1f}s (attempt {attempt + 1}/{max_retries})")
148 time.sleep(sleep_time)
150 if not acquired:
151 raise TimeoutError(f"Failed to acquire advisory lock after {max_retries} attempts")
153 try:
154 yield
155 finally:
156 logger.info("Releasing Postgres advisory lock...")
157 conn.execute(text(f"SELECT pg_advisory_unlock({pg_lock_id})"))
159 else:
160 # Fallback for SQLite (single-host/container) or other DBs
161 logger.info(f"Using FileLock fallback for {dialect}...")
162 file_lock = FileLock(_MIGRATION_LOCK_PATH, timeout=_MIGRATION_LOCK_TIMEOUT)
163 with file_lock:
164 yield
167async def bootstrap_admin_user(conn: Connection) -> None:
168 """
169 Bootstrap the platform admin user from environment variables.
171 Creates the admin user if email authentication is enabled and the user doesn't exist.
172 Also creates a personal team for the admin user if auto-creation is enabled.
174 Args:
175 conn: Active SQLAlchemy connection
176 """
177 if not settings.email_auth_enabled:
178 logger.info("Email authentication disabled - skipping admin user bootstrap")
179 return
181 try:
182 # Import services here to avoid circular imports
183 # First-Party
184 from mcpgateway.services.email_auth_service import EmailAuthService # pylint: disable=import-outside-toplevel
186 # Use session bound to the locked connection
187 with Session(bind=conn) as db:
188 auth_service = EmailAuthService(db)
190 # Check if admin user already exists
191 existing_user = await auth_service.get_user_by_email(settings.platform_admin_email)
192 if existing_user:
193 logger.info(f"Admin user {SecurityValidator.sanitize_log_message(settings.platform_admin_email)} already exists - skipping creation")
194 return
196 # Create admin user
197 logger.info(f"Creating platform admin user: {SecurityValidator.sanitize_log_message(settings.platform_admin_email)}")
198 admin_user = await auth_service.create_platform_admin(
199 email=settings.platform_admin_email,
200 password=settings.platform_admin_password.get_secret_value(),
201 full_name=settings.platform_admin_full_name,
202 )
204 # Mark admin user as email verified and require password change on first login
205 # First-Party
206 from mcpgateway.db import utc_now # pylint: disable=import-outside-toplevel
208 admin_user.email_verified_at = utc_now()
209 # Respect configuration: only require password change on bootstrap when enabled
210 if getattr(settings, "password_change_enforcement_enabled", True) and getattr(settings, "admin_require_password_change_on_bootstrap", True):
211 admin_user.password_change_required = True # Force admin to change default password
212 try:
213 admin_user.password_changed_at = utc_now()
214 except Exception as exc:
215 logger.debug("Failed to set admin password_changed_at: %s", exc)
216 db.commit()
218 # Personal team is automatically created during user creation if enabled
219 if settings.auto_create_personal_teams:
220 logger.info("Personal team automatically created for admin user")
222 db.commit()
223 logger.info(f"Platform admin user created successfully: {SecurityValidator.sanitize_log_message(settings.platform_admin_email)}")
225 except Exception as e:
226 logger.error(f"Failed to bootstrap admin user: {e}")
227 # Don't fail the entire bootstrap process if admin user creation fails
228 return
231async def bootstrap_default_roles(conn: Connection) -> None:
232 """Bootstrap default system roles and assign them to admin user.
234 Creates essential RBAC roles and assigns administrative privileges
235 to the platform admin user.
237 Args:
238 conn: Active SQLAlchemy connection
239 """
240 if not settings.email_auth_enabled:
241 logger.info("Email authentication disabled - skipping default roles bootstrap")
242 return
244 try:
245 # First-Party
246 from mcpgateway.services.email_auth_service import EmailAuthService # pylint: disable=import-outside-toplevel
247 from mcpgateway.services.role_service import RoleService # pylint: disable=import-outside-toplevel
249 # Use session bound to the locked connection
250 with Session(bind=conn) as db:
251 role_service = RoleService(db)
252 auth_service = EmailAuthService(db)
254 # Check if admin user exists
255 admin_user = await auth_service.get_user_by_email(settings.platform_admin_email)
256 if not admin_user:
257 logger.info("Admin user not found - skipping role assignment")
258 return
260 # Default system roles to create
261 default_roles = [
262 {"name": "platform_admin", "description": "Platform administrator with all permissions", "scope": "global", "permissions": ["*"], "is_system_role": True}, # All permissions
263 {
264 "name": "team_admin",
265 "description": "Team administrator with team management permissions",
266 "scope": "team",
267 "permissions": [
268 "admin.dashboard",
269 "admin.overview",
270 "gateways.read",
271 "servers.read",
272 "servers.use",
273 "teams.read",
274 "teams.update",
275 "teams.join",
276 "teams.delete",
277 "teams.manage_members",
278 "tools.read",
279 "tools.execute",
280 "resources.read",
281 "prompts.read",
282 "llm.read",
283 "llm.invoke",
284 "a2a.read",
285 "gateways.create",
286 "servers.create",
287 "tools.create",
288 "resources.create",
289 "prompts.create",
290 "a2a.create",
291 "gateways.update",
292 "servers.update",
293 "tools.update",
294 "resources.update",
295 "prompts.update",
296 "a2a.update",
297 "gateways.delete",
298 "servers.delete",
299 "tools.delete",
300 "resources.delete",
301 "prompts.delete",
302 "a2a.delete",
303 "a2a.invoke",
304 "tokens.create",
305 "tokens.read",
306 "tokens.update",
307 "tokens.revoke",
308 ],
309 "is_system_role": True,
310 },
311 {
312 "name": "developer",
313 "description": "Developer with tool and resource access",
314 "scope": "team",
315 "permissions": [
316 "admin.dashboard",
317 "admin.overview",
318 "gateways.read",
319 "servers.read",
320 "servers.use",
321 "teams.read",
322 "teams.join",
323 "tools.read",
324 "tools.execute",
325 "resources.read",
326 "prompts.read",
327 "llm.read",
328 "llm.invoke",
329 "a2a.read",
330 "gateways.create",
331 "servers.create",
332 "tools.create",
333 "resources.create",
334 "prompts.create",
335 "a2a.create",
336 "gateways.update",
337 "servers.update",
338 "tools.update",
339 "resources.update",
340 "prompts.update",
341 "a2a.update",
342 "gateways.delete",
343 "servers.delete",
344 "tools.delete",
345 "resources.delete",
346 "prompts.delete",
347 "a2a.delete",
348 "a2a.invoke",
349 "tokens.create",
350 "tokens.read",
351 "tokens.update",
352 "tokens.revoke",
353 ],
354 "is_system_role": True,
355 },
356 {
357 "name": "viewer",
358 "description": "Read access and tool execution within team scope",
359 "scope": "team",
360 "permissions": [
361 "admin.dashboard",
362 "admin.overview",
363 "gateways.read",
364 "servers.read",
365 "servers.use",
366 "teams.read",
367 "teams.join",
368 "tools.read",
369 "tools.execute",
370 "resources.read",
371 "prompts.read",
372 "llm.read",
373 "a2a.read",
374 "tokens.create",
375 "tokens.read",
376 "tokens.update",
377 "tokens.revoke",
378 ],
379 "is_system_role": True,
380 },
381 {
382 "name": "platform_viewer",
383 "description": "Read-only access to resources and admin UI",
384 "scope": "global",
385 "permissions": [
386 "admin.dashboard",
387 "admin.overview",
388 "gateways.read",
389 "servers.read",
390 "servers.use",
391 "teams.read",
392 "teams.join",
393 "tools.read",
394 "resources.read",
395 "prompts.read",
396 "llm.read",
397 "a2a.read",
398 "tokens.create",
399 "tokens.read",
400 "tokens.update",
401 "tokens.revoke",
402 ],
403 "is_system_role": True,
404 },
405 ]
407 # Logic to add additional default roles from a json file
408 if settings.mcpgateway_bootstrap_roles_in_db_enabled:
409 try:
410 additional_default_roles_path = Path(settings.mcpgateway_bootstrap_roles_in_db_file)
411 # Try multiple locations for the mcpgateway_bootstrap_roles_in_db_file file
412 if not additional_default_roles_path.is_absolute():
413 # Try current directory first
414 if not additional_default_roles_path.exists():
415 # Try project root (mcpgateway/bootstrap_db.py -> parent.parent = repo root)
416 additional_default_roles_path = Path(__file__).resolve().parent.parent / settings.mcpgateway_bootstrap_roles_in_db_file
418 if not additional_default_roles_path.exists():
419 logger.warning(
420 f"Additional roles file not found. Searched: CWD/{SecurityValidator.sanitize_log_message(settings.mcpgateway_bootstrap_roles_in_db_file)}, {SecurityValidator.sanitize_log_message(str(additional_default_roles_path))}"
421 )
422 else:
423 with open(additional_default_roles_path, "r", encoding="utf-8") as f:
424 additional_default_roles_data = json.load(f)
426 # Validate JSON structure: must be a list of dicts with required keys
427 required_keys = {"name", "scope", "permissions"}
428 if not isinstance(additional_default_roles_data, list):
429 logger.error(f"Additional roles file must contain a JSON array, got {type(additional_default_roles_data).__name__}")
430 else:
431 valid_roles = []
432 for idx, role in enumerate(additional_default_roles_data):
433 if not isinstance(role, dict):
434 logger.warning(f"Skipping invalid role at index {idx}: expected dict, got {type(role).__name__}")
435 continue
436 missing_keys = required_keys - set(role.keys())
437 if missing_keys:
438 role_name = role.get("name", f"<index {idx}>")
439 logger.warning(f"Skipping role '{SecurityValidator.sanitize_log_message(str(role_name))}': missing required keys {missing_keys}")
440 continue
441 valid_roles.append(role)
443 if valid_roles:
444 default_roles.extend(valid_roles)
445 logger.info(f"Added {len(valid_roles)} additional roles to default roles in bootstrap db")
446 elif additional_default_roles_data:
447 logger.warning("No valid roles found in additional roles file")
448 except Exception as e:
449 logger.error(f"Failed to load mcpgateway_bootstrap_roles_in_db_file: {e}")
451 # Create default roles
452 created_roles = []
453 for role_def in default_roles:
454 try:
455 # Check if role already exists
456 existing_role = await role_service.get_role_by_name(str(role_def["name"]), str(role_def["scope"]))
457 if existing_role:
458 logger.info(f"System role {SecurityValidator.sanitize_log_message(str(role_def['name']))} already exists - skipping")
459 created_roles.append(existing_role)
460 continue
462 # Create the role (description and is_system_role are optional)
463 role = await role_service.create_role(
464 name=str(role_def["name"]),
465 description=str(role_def.get("description", "")),
466 scope=str(role_def["scope"]),
467 permissions=cast(list[str], role_def["permissions"]),
468 created_by=settings.platform_admin_email,
469 is_system_role=bool(role_def.get("is_system_role", False)),
470 )
471 created_roles.append(role)
472 logger.info(f"Created system role: {role.name}")
474 except Exception as e:
475 logger.error(f"Failed to create role {SecurityValidator.sanitize_log_message(str(role_def['name']))}: {SecurityValidator.sanitize_log_message(str(e))}")
476 continue
478 # Assign platform_admin role to admin user
479 platform_admin_role = next((r for r in created_roles if r.name == "platform_admin"), None)
480 if not platform_admin_role:
481 # Role not in created_roles (creation may have failed) — look up from DB as fallback
482 platform_admin_role = await role_service.get_role_by_name("platform_admin", "global")
483 if platform_admin_role:
484 try:
485 # Check if assignment already exists
486 existing_assignment = await role_service.get_user_role_assignment(user_email=admin_user.email, role_id=platform_admin_role.id, scope="global", scope_id=None)
488 if not existing_assignment or not existing_assignment.is_active:
489 await role_service.assign_role_to_user(user_email=admin_user.email, role_id=platform_admin_role.id, scope="global", scope_id=None, granted_by=admin_user.email)
490 logger.info(f"Assigned platform_admin role to {SecurityValidator.sanitize_log_message(admin_user.email)}")
491 else:
492 logger.info("Admin user already has platform_admin role")
494 # Synchronize is_admin flag with platform_admin role assignment
495 # This ensures consistency when admin is manually demoted in DB but role is re-assigned during bootstrap
496 if not admin_user.is_admin:
497 logger.info(f"Synchronizing is_admin flag for {SecurityValidator.sanitize_log_message(admin_user.email)} (was False, setting to True)")
498 admin_user.is_admin = True
499 db.commit()
501 except Exception as e:
502 logger.error(
503 f"Failed to assign platform_admin role to {SecurityValidator.sanitize_log_message(admin_user.email)}: {SecurityValidator.sanitize_log_message(str(e))}. Admin UI routes using allow_admin_bypass=False will return 403."
504 )
505 else:
506 logger.error(
507 f"platform_admin role not found — could not assign to {SecurityValidator.sanitize_log_message(admin_user.email)}. Admin UI routes using allow_admin_bypass=False will return 403."
508 )
510 logger.info("Default RBAC roles bootstrap completed successfully")
512 except Exception as e:
513 logger.error(f"Failed to bootstrap default roles: {e}")
514 # Don't fail the entire bootstrap process if role creation fails
515 return
518def normalize_team_visibility(conn: Connection) -> int:
519 """Normalize team visibility values to the supported set {private, public}.
521 Any team with an unsupported visibility (e.g., 'team') is set to 'private'.
523 Args:
524 conn: Active SQLAlchemy connection
526 Returns:
527 int: Number of teams updated
528 """
529 try:
530 # Use session bound to the locked connection
531 with Session(bind=conn) as db:
532 # Find teams with invalid visibility
533 invalid = db.query(EmailTeam).filter(EmailTeam.visibility.notin_(["private", "public"]))
534 count = 0
535 for team in invalid.all():
536 old = team.visibility
537 team.visibility = "private"
538 count += 1
539 logger.info(f"Normalized team visibility: id={team.id} {old} -> private")
540 if count:
541 db.commit()
542 return count
543 except Exception as e:
544 logger.error(f"Failed to normalize team visibility: {e}")
545 return 0
548async def bootstrap_resource_assignments(conn: Connection) -> None:
549 """Assign orphaned resources to the platform admin's personal team.
551 This ensures existing resources (from pre-multitenancy versions) are
552 visible in the new team-based UI by assigning them to the admin's
553 personal team with public visibility.
555 Args:
556 conn: Active SQLAlchemy connection
557 """
558 if not settings.email_auth_enabled:
559 logger.info("Email authentication disabled - skipping resource assignment")
560 return
562 try:
563 # Use session bound to the locked connection
564 with Session(bind=conn) as db:
565 # Find admin user and their personal team
566 admin_user = db.query(EmailUser).filter(EmailUser.email == settings.platform_admin_email, EmailUser.is_admin.is_(True)).first()
568 if not admin_user:
569 logger.warning("Admin user not found - skipping resource assignment")
570 return
572 personal_team = admin_user.get_personal_team()
573 if not personal_team:
574 logger.warning("Admin personal team not found - skipping resource assignment")
575 return
577 logger.info(f"Assigning orphaned resources to admin team: {SecurityValidator.sanitize_log_message(personal_team.name)}")
579 # Resource types to process
580 resource_types = [("servers", Server), ("tools", Tool), ("resources", Resource), ("prompts", Prompt), ("gateways", Gateway), ("a2a_agents", A2AAgent)]
582 total_assigned = 0
584 # Unique field per resource type that participates in the team-scoped unique constraint
585 unique_field: dict[str, str] = {
586 "servers": "name",
587 "tools": "name",
588 "resources": "uri",
589 "prompts": "name",
590 "gateways": "slug",
591 "a2a_agents": "slug",
592 }
594 def _like_safe(v: str) -> str:
595 """Escape SQL LIKE wildcard characters for safe use in LIKE patterns.
597 Args:
598 v: The string value to escape.
600 Returns:
601 The escaped string safe for use in SQL LIKE patterns.
602 """
603 return v.replace("\\", "\\\\").replace("%", r"\%").replace("_", r"\_")
605 for resource_name, resource_model in resource_types:
606 try:
607 # Find unassigned resources
608 unassigned = db.query(resource_model).filter((resource_model.team_id.is_(None)) | (resource_model.owner_email.is_(None)) | (resource_model.visibility.is_(None))).all()
610 if not unassigned:
611 continue
613 logger.info(f"Assigning {len(unassigned)} orphaned {resource_name} to admin team")
615 field = unique_field[resource_name]
616 field_col = getattr(resource_model, field)
618 # Collect unique field values from the orphaned batch
619 original_values = {getattr(r, field) for r in unassigned if getattr(r, field) is not None}
621 # One query: fetch all names already taken in the admin team that match any
622 # original value exactly or as a suffixed variant (value-N).
623 # NOTE: This intentionally omits gateway_id from the filter, making it
624 # conservative — for Resource/Prompt models whose uniqueness also depends
625 # on gateway_id, this may produce unnecessary renames but can never miss a
626 # real conflict. That is the correct tradeoff for one-time bootstrap code.
627 existing_taken: set[str] = (
628 {
629 row[0]
630 for row in db.query(field_col).filter(
631 resource_model.team_id == personal_team.id,
632 resource_model.owner_email == admin_user.email,
633 or_(*[cond for v in original_values for cond in (field_col == v, field_col.like(f"{_like_safe(v)}-%", escape="\\"))]),
634 )
635 }
636 if original_values
637 else set()
638 )
640 # Pre-compile suffix regexes keyed by original value
641 suffix_res = {v: re.compile(rf"^{re.escape(v)}-(\d+)$") for v in original_values}
643 # Track names claimed within this batch to catch intra-batch duplicates
644 batch_assigned: set[str] = set()
646 for resource in unassigned:
647 original_value = getattr(resource, field)
649 if original_value is not None:
650 taken = existing_taken | batch_assigned
651 if original_value in taken:
652 # Parse numeric suffixes from taken values to find next free one
653 suffix_re = suffix_res[original_value]
654 used = {int(m.group(1)) for v in taken if (m := suffix_re.match(v))}
655 new_value = f"{original_value}-{(max(used) if used else 1) + 1}"
656 logger.warning(
657 f"Name conflict for {SecurityValidator.sanitize_log_message(resource_name)} '{SecurityValidator.sanitize_log_message(original_value)}' — renaming to '{SecurityValidator.sanitize_log_message(new_value)}'"
658 )
659 setattr(resource, field, new_value)
660 batch_assigned.add(new_value)
661 else:
662 batch_assigned.add(original_value)
664 resource.team_id = personal_team.id
665 resource.owner_email = admin_user.email
666 resource.visibility = "public" # Make visible to all users
667 if hasattr(resource, "federation_source") and not resource.federation_source:
668 resource.federation_source = "mcpgateway-0.7.0-migration"
670 db.commit()
671 total_assigned += len(unassigned)
673 except Exception as e:
674 logger.error(f"Failed to assign {SecurityValidator.sanitize_log_message(resource_name)}: {SecurityValidator.sanitize_log_message(str(e))}")
675 continue
677 if total_assigned > 0:
678 logger.info(f"Successfully assigned {total_assigned} orphaned resources to admin team")
679 else:
680 logger.info("No orphaned resources found - all resources have team assignments")
682 except Exception as e:
683 logger.error(f"Failed to bootstrap resource assignments: {e}")
686async def main() -> None:
687 """
688 Bootstrap or upgrade the database schema, then log readiness.
690 Runs `create_all()` + `alembic stamp head` on an empty DB, otherwise just
691 executes `alembic upgrade head`, leaving application data intact.
692 Also creates the platform admin user if email authentication is enabled.
694 Uses distributed advisory locks (PG) or file locking (SQLite)
695 to prevent race conditions when multiple workers start simultaneously.
697 Args:
698 None
700 Raises:
701 Exception: If migration or bootstrap fails
702 """
703 engine = create_engine(settings.database_url)
704 ini_path = files("mcpgateway").joinpath("alembic.ini")
705 cfg = Config(str(ini_path)) # path in container
706 cfg.attributes["configure_logger"] = True
708 # Use advisory lock to prevent concurrent migrations
709 try:
710 with engine.connect() as conn:
711 # Commit any open transaction on the connection before locking (though it should be fresh)
712 conn.commit()
714 with advisory_lock(conn):
715 logger.info("Acquired migration lock, checking database schema...")
717 # Pass the LOCKED connection to Alembic config
718 cfg.attributes["connection"] = conn
720 # Escape '%' characters in URL to avoid configparser interpolation errors
721 # (e.g., URL-encoded passwords like %40 for '@')
722 escaped_url = settings.database_url.replace("%", "%%")
723 cfg.set_main_option("sqlalchemy.url", escaped_url)
725 insp = inspect(conn)
726 table_names = insp.get_table_names()
728 if "gateways" not in table_names:
729 logger.info("Empty DB detected - creating baseline schema")
730 Base.metadata.create_all(bind=conn)
731 command.stamp(cfg, "head")
732 else:
733 versions: list[str] = []
734 if "alembic_version" in table_names:
735 try:
736 rows = conn.execute(text("SELECT version_num FROM alembic_version")).fetchall()
737 versions = [row[0] for row in rows if row[0]]
738 except Exception as exc:
739 logger.warning("Failed to read alembic_version table: %s", exc)
741 if not versions and _schema_looks_current(insp):
742 logger.warning("Existing database has no Alembic revision rows; stamping head to avoid reapplying migrations")
743 command.stamp(cfg, "head")
744 else:
745 logger.info("Running Alembic migrations to ensure schema is up to date")
746 command.upgrade(cfg, "head")
748 # Post-upgrade normalization passes (inside lock to be safe)
749 updated = normalize_team_visibility(conn)
750 if updated:
751 logger.info(f"Normalized {updated} team record(s) to supported visibility values")
753 # Bootstrap admin user after database is ready, using the LOCKED connection
754 await bootstrap_admin_user(conn)
756 # Bootstrap default RBAC roles after admin user is created
757 await bootstrap_default_roles(conn)
759 # Assign orphaned resources to admin personal team after all setup is complete
760 await bootstrap_resource_assignments(conn)
762 conn.commit() # Ensure all migration changes are permanently committed
764 except Exception as e:
765 logger.error(f"Migration/Bootstrap failed: {e}")
766 # Allow retry logic or container restart to handle transient issues
767 raise
768 finally:
769 # Dispose the engine to close all connections in the pool
770 engine.dispose()
772 logger.info("Database ready")
775if __name__ == "__main__":
776 asyncio.run(main())