Coverage for mcpgateway / services / base_service.py: 100%
43 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 00:56 +0100
1# Copyright (c) 2025 IBM Corp. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
4"""Abstract base class for services with visibility-filtered listing."""
6# Standard
7from abc import ABC
8from typing import Any, List, Optional
10# Third-Party
11from sqlalchemy import and_
12from sqlalchemy import exists as sa_exists
13from sqlalchemy import or_, select
14from sqlalchemy.orm import Session
16# First-Party
17from mcpgateway.services.team_management_service import TeamManagementService
20class BaseService(ABC):
21 """Abstract base class for services with visibility-filtered listing."""
23 _visibility_model_cls: type
25 def __init_subclass__(cls, **kwargs: Any) -> None:
26 """Ensure subclasses define _visibility_model_cls.
28 Args:
29 **kwargs: Keyword arguments forwarded to super().__init_subclass__.
31 Raises:
32 TypeError: If the subclass does not set _visibility_model_cls to a type.
33 """
34 super().__init_subclass__(**kwargs)
35 if not isinstance(cls.__dict__.get("_visibility_model_cls"), type):
36 raise TypeError(f"{cls.__name__} must set _visibility_model_cls to a model class")
38 async def entity_exists(self, db: Session, entity_id: str) -> bool:
39 """Check whether an entity exists in the database by primary key.
41 Uses a lightweight ``EXISTS`` subquery — no row data is loaded.
42 All ``BaseService`` subclasses inherit this via ``_visibility_model_cls``.
44 Args:
45 db: Database session.
46 entity_id: Primary-key value to look up.
48 Returns:
49 True if a row with the given id exists, False otherwise.
50 """
51 model = self._visibility_model_cls
52 return db.execute(select(sa_exists().where(model.id == entity_id))).scalar()
54 async def _apply_access_control(
55 self,
56 query: Any,
57 db: Session,
58 user_email: Optional[str],
59 token_teams: Optional[List[str]],
60 team_id: Optional[str] = None,
61 ) -> Any:
62 """Resolve team membership and apply visibility filtering to a query.
64 Handles the full access-control flow for list endpoints:
65 1. Returns query unmodified when no auth context is present (admin bypass)
66 2. Resolves effective teams from JWT token_teams or DB lookup
67 3. Suppresses owner matching for public-only tokens (token_teams=[])
68 4. Delegates to _apply_visibility_filter for SQL WHERE construction
70 Args:
71 query: SQLAlchemy query to filter
72 db: Database session (for team membership lookup when token_teams is None)
73 user_email: User's email. None = no user context.
74 token_teams: Teams from JWT via normalize_token_teams().
75 None = admin bypass or no auth context.
76 [] = public-only token.
77 [...] = team-scoped token.
78 team_id: Optional specific team filter
80 Returns:
81 Query with visibility WHERE clauses applied, or unmodified
82 if no auth context is present.
83 """
84 if user_email is None and token_teams is None:
85 return query
87 effective_teams: List[str] = []
88 if token_teams is not None:
89 effective_teams = token_teams
90 elif user_email:
91 team_service = TeamManagementService(db)
92 user_teams = await team_service.get_user_teams(user_email)
93 effective_teams = [team.id for team in user_teams]
95 # Public-only tokens (explicit token_teams=[]) must not get owner access
96 filter_email = None if (token_teams is not None and not token_teams) else user_email
98 return self._apply_visibility_filter(query, filter_email, effective_teams, team_id)
100 def _apply_visibility_filter(
101 self,
102 query: Any,
103 user_email: Optional[str],
104 token_teams: List[str],
105 team_id: Optional[str] = None,
106 ) -> Any:
107 """Apply visibility-based access control to query.
109 Note: Callers are responsible for suppressing user_email for public-only
110 tokens. Use _apply_access_control() which handles this automatically.
112 Access rules:
113 - public: visible to all (global listing only; excluded when team_id is set)
114 - team: visible to team members (token_teams contains team_id)
115 - private: visible only to owner (requires user_email)
117 Args:
118 query: SQLAlchemy query to filter
119 user_email: User's email for owner matching (None suppresses owner access)
120 token_teams: Resolved team list (never None; use [] for no teams)
121 team_id: Optional specific team filter
123 Returns:
124 Filtered query
125 """
126 model_cls = self._visibility_model_cls
128 if team_id:
129 # User requesting specific team - verify access
130 if team_id not in token_teams:
131 return query.where(False)
133 # Scope results strictly to the requested team
134 access_conditions = [and_(model_cls.team_id == team_id, model_cls.visibility.in_(["team", "public"]))]
135 if user_email:
136 access_conditions.append(and_(model_cls.team_id == team_id, model_cls.owner_email == user_email, model_cls.visibility == "private"))
137 return query.where(or_(*access_conditions))
139 # Global listing: public resources visible to everyone
140 access_conditions = [model_cls.visibility == "public"]
142 # Owner can see their own private resources (but NOT team resources
143 # from teams outside token scope — those are covered by the
144 # token_teams condition below)
145 if user_email:
146 access_conditions.append(and_(model_cls.owner_email == user_email, model_cls.visibility == "private"))
148 if token_teams:
149 access_conditions.append(and_(model_cls.team_id.in_(token_teams), model_cls.visibility.in_(["team", "public"])))
151 return query.where(or_(*access_conditions))