Coverage for mcpgateway / services / base_service.py: 100%
38 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# Copyright (c) 2025 IBM Corp. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
4"""Abstract base class for services with visibility-filtered listing."""
6# Standard
7from abc import ABC
8from typing import Any, List, Optional
10# Third-Party
11from sqlalchemy import and_, or_
12from sqlalchemy.orm import Session
14# First-Party
15from mcpgateway.services.team_management_service import TeamManagementService
18class BaseService(ABC):
19 """Abstract base class for services with visibility-filtered listing."""
21 _visibility_model_cls: type
23 def __init_subclass__(cls, **kwargs: Any) -> None:
24 """Ensure subclasses define _visibility_model_cls.
26 Args:
27 **kwargs: Keyword arguments forwarded to super().__init_subclass__.
29 Raises:
30 TypeError: If the subclass does not set _visibility_model_cls to a type.
31 """
32 super().__init_subclass__(**kwargs)
33 if not isinstance(cls.__dict__.get("_visibility_model_cls"), type):
34 raise TypeError(f"{cls.__name__} must set _visibility_model_cls to a model class")
36 async def _apply_access_control(
37 self,
38 query: Any,
39 db: Session,
40 user_email: Optional[str],
41 token_teams: Optional[List[str]],
42 team_id: Optional[str] = None,
43 ) -> Any:
44 """Resolve team membership and apply visibility filtering to a query.
46 Handles the full access-control flow for list endpoints:
47 1. Returns query unmodified when no auth context is present (admin bypass)
48 2. Resolves effective teams from JWT token_teams or DB lookup
49 3. Suppresses owner matching for public-only tokens (token_teams=[])
50 4. Delegates to _apply_visibility_filter for SQL WHERE construction
52 Args:
53 query: SQLAlchemy query to filter
54 db: Database session (for team membership lookup when token_teams is None)
55 user_email: User's email. None = no user context.
56 token_teams: Teams from JWT via normalize_token_teams().
57 None = admin bypass or no auth context.
58 [] = public-only token.
59 [...] = team-scoped token.
60 team_id: Optional specific team filter
62 Returns:
63 Query with visibility WHERE clauses applied, or unmodified
64 if no auth context is present.
65 """
66 if user_email is None and token_teams is None:
67 return query
69 effective_teams: List[str] = []
70 if token_teams is not None:
71 effective_teams = token_teams
72 elif user_email:
73 team_service = TeamManagementService(db)
74 user_teams = await team_service.get_user_teams(user_email)
75 effective_teams = [team.id for team in user_teams]
77 # Public-only tokens (explicit token_teams=[]) must not get owner access
78 filter_email = None if (token_teams is not None and not token_teams) else user_email
80 return self._apply_visibility_filter(query, filter_email, effective_teams, team_id)
82 def _apply_visibility_filter(
83 self,
84 query: Any,
85 user_email: Optional[str],
86 token_teams: List[str],
87 team_id: Optional[str] = None,
88 ) -> Any:
89 """Apply visibility-based access control to query.
91 Note: Callers are responsible for suppressing user_email for public-only
92 tokens. Use _apply_access_control() which handles this automatically.
94 Access rules:
95 - public: visible to all (global listing only; excluded when team_id is set)
96 - team: visible to team members (token_teams contains team_id)
97 - private: visible only to owner (requires user_email)
99 Args:
100 query: SQLAlchemy query to filter
101 user_email: User's email for owner matching (None suppresses owner access)
102 token_teams: Resolved team list (never None; use [] for no teams)
103 team_id: Optional specific team filter
105 Returns:
106 Filtered query
107 """
108 model_cls = self._visibility_model_cls
110 if team_id:
111 # User requesting specific team - verify access
112 if team_id not in token_teams:
113 return query.where(False)
115 # Scope results strictly to the requested team
116 access_conditions = [and_(model_cls.team_id == team_id, model_cls.visibility.in_(["team", "public"]))]
117 if user_email:
118 access_conditions.append(and_(model_cls.team_id == team_id, model_cls.owner_email == user_email, model_cls.visibility == "private"))
119 return query.where(or_(*access_conditions))
121 # Global listing: public resources visible to everyone
122 access_conditions = [model_cls.visibility == "public"]
124 # Owner can see their own private resources (but NOT team resources
125 # from teams outside token scope — those are covered by the
126 # token_teams condition below)
127 if user_email:
128 access_conditions.append(and_(model_cls.owner_email == user_email, model_cls.visibility == "private"))
130 if token_teams:
131 access_conditions.append(and_(model_cls.team_id.in_(token_teams), model_cls.visibility.in_(["team", "public"])))
133 return query.where(or_(*access_conditions))