Coverage for mcpgateway / services / base_service.py: 100%

43 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 00:56 +0100

1# Copyright (c) 2025 IBM Corp. All rights reserved. 

2# SPDX-License-Identifier: Apache-2.0 

3 

4"""Abstract base class for services with visibility-filtered listing.""" 

5 

6# Standard 

7from abc import ABC 

8from typing import Any, List, Optional 

9 

10# Third-Party 

11from sqlalchemy import and_ 

12from sqlalchemy import exists as sa_exists 

13from sqlalchemy import or_, select 

14from sqlalchemy.orm import Session 

15 

16# First-Party 

17from mcpgateway.services.team_management_service import TeamManagementService 

18 

19 

class BaseService(ABC):
    """Shared base for services whose list queries are filtered by visibility.

    Each concrete subclass points ``_visibility_model_cls`` at its model class.
    The helpers below read the ``id``, ``team_id``, ``owner_email`` and
    ``visibility`` attributes of that class when building SQL conditions.
    """

    _visibility_model_cls: type

    def __init_subclass__(cls, **kwargs: Any) -> None:
        """Reject subclasses that do not declare their own model class.

        Only the subclass's own namespace is inspected, so a value inherited
        from a parent class does not satisfy the check.

        Args:
            **kwargs: Keyword arguments forwarded to super().__init_subclass__.

        Raises:
            TypeError: If the subclass does not set _visibility_model_cls to a type.
        """
        super().__init_subclass__(**kwargs)
        declared_model = cls.__dict__.get("_visibility_model_cls")
        if isinstance(declared_model, type):
            return
        raise TypeError(f"{cls.__name__} must set _visibility_model_cls to a model class")

    async def entity_exists(self, db: Session, entity_id: str) -> bool:
        """Report whether a row with the given id is present.

        Issues a ``SELECT EXISTS(...)`` against the model referenced by
        ``_visibility_model_cls``; no entity row is loaded.

        Args:
            db: Database session.
            entity_id: Value compared against the model's ``id`` column.

        Returns:
            True if a row with the given id exists, False otherwise.
        """
        id_matches = self._visibility_model_cls.id == entity_id
        exists_stmt = select(sa_exists().where(id_matches))
        return db.execute(exists_stmt).scalar()

    async def _apply_access_control(
        self,
        query: Any,
        db: Session,
        user_email: Optional[str],
        token_teams: Optional[List[str]],
        team_id: Optional[str] = None,
    ) -> Any:
        """Work out the caller's teams, then apply visibility filtering.

        Steps performed for list endpoints:
            1. With neither a user nor token teams, the query is returned untouched.
            2. The team list comes from token_teams when given, otherwise from a
               TeamManagementService lookup for user_email.
            3. An explicit empty token_teams list (public-only token) disables
               owner matching by dropping the email.
            4. The WHERE clause itself is built by _apply_visibility_filter.

        Args:
            query: SQLAlchemy query to filter
            db: Database session (used for the team lookup when token_teams is None)
            user_email: User's email. None = no user context.
            token_teams: Teams carried by the token.
                None = admin bypass or no auth context.
                [] = public-only token.
                [...] = team-scoped token.
            team_id: Optional specific team filter

        Returns:
            Query with visibility WHERE clauses applied, or the unmodified
            query if no auth context is present.
        """
        # No auth context at all: nothing to restrict.
        if user_email is None and token_teams is None:
            return query

        if token_teams is not None:
            resolved_teams: List[str] = token_teams
        elif user_email:
            memberships = await TeamManagementService(db).get_user_teams(user_email)
            resolved_teams = [membership.id for membership in memberships]
        else:
            resolved_teams = []

        # Public-only tokens (explicit token_teams=[]) must not get owner access
        is_public_only_token = token_teams is not None and not token_teams
        owner_email = None if is_public_only_token else user_email

        return self._apply_visibility_filter(query, owner_email, resolved_teams, team_id)

    def _apply_visibility_filter(
        self,
        query: Any,
        user_email: Optional[str],
        token_teams: List[str],
        team_id: Optional[str] = None,
    ) -> Any:
        """Restrict a query to the rows the caller is allowed to see.

        Note: this method does not itself suppress user_email for public-only
        tokens; _apply_access_control() does that before delegating here.

        Access rules:
            - public: always visible in a global listing; in a team-scoped
              listing only when the row belongs to the requested team
            - team: visible when the row's team is in token_teams
            - private: visible only to the owner (requires user_email)

        Args:
            query: SQLAlchemy query to filter
            user_email: User's email for owner matching (None suppresses owner access)
            token_teams: Resolved team list (never None; use [] for no teams)
            team_id: Optional specific team filter

        Returns:
            Filtered query
        """
        model = self._visibility_model_cls

        if not team_id:
            # Global listing: public resources are visible to everyone.
            visible_when = [model.visibility == "public"]

            # Owners additionally see their own private resources. Team
            # resources are handled by the token_teams clause below.
            if user_email:
                owns_private = and_(model.owner_email == user_email, model.visibility == "private")
                visible_when.append(owns_private)

            if token_teams:
                shared_in_teams = and_(model.team_id.in_(token_teams), model.visibility.in_(["team", "public"]))
                visible_when.append(shared_in_teams)

            return query.where(or_(*visible_when))

        # A specific team was requested: callers outside it get no rows.
        if team_id not in token_teams:
            return query.where(False)

        # Scope results strictly to the requested team.
        shared_in_team = and_(model.team_id == team_id, model.visibility.in_(["team", "public"]))
        visible_when = [shared_in_team]
        if user_email:
            owns_private_in_team = and_(model.team_id == team_id, model.owner_email == user_email, model.visibility == "private")
            visible_when.append(owns_private_in_team)
        return query.where(or_(*visible_when))