Coverage for mcpgateway / services / base_service.py: 100%

38 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# Copyright (c) 2025 IBM Corp. All rights reserved. 

2# SPDX-License-Identifier: Apache-2.0 

3 

4"""Abstract base class for services with visibility-filtered listing.""" 

5 

6# Standard 

7from abc import ABC 

8from typing import Any, List, Optional 

9 

10# Third-Party 

11from sqlalchemy import and_, or_ 

12from sqlalchemy.orm import Session 

13 

14# First-Party 

15from mcpgateway.services.team_management_service import TeamManagementService 

16 

17 

18class BaseService(ABC): 

19 """Abstract base class for services with visibility-filtered listing.""" 

20 

21 _visibility_model_cls: type 

22 

23 def __init_subclass__(cls, **kwargs: Any) -> None: 

24 """Ensure subclasses define _visibility_model_cls. 

25 

26 Args: 

27 **kwargs: Keyword arguments forwarded to super().__init_subclass__. 

28 

29 Raises: 

30 TypeError: If the subclass does not set _visibility_model_cls to a type. 

31 """ 

32 super().__init_subclass__(**kwargs) 

33 if not isinstance(cls.__dict__.get("_visibility_model_cls"), type): 

34 raise TypeError(f"{cls.__name__} must set _visibility_model_cls to a model class") 

35 

36 async def _apply_access_control( 

37 self, 

38 query: Any, 

39 db: Session, 

40 user_email: Optional[str], 

41 token_teams: Optional[List[str]], 

42 team_id: Optional[str] = None, 

43 ) -> Any: 

44 """Resolve team membership and apply visibility filtering to a query. 

45 

46 Handles the full access-control flow for list endpoints: 

47 1. Returns query unmodified when no auth context is present (admin bypass) 

48 2. Resolves effective teams from JWT token_teams or DB lookup 

49 3. Suppresses owner matching for public-only tokens (token_teams=[]) 

50 4. Delegates to _apply_visibility_filter for SQL WHERE construction 

51 

52 Args: 

53 query: SQLAlchemy query to filter 

54 db: Database session (for team membership lookup when token_teams is None) 

55 user_email: User's email. None = no user context. 

56 token_teams: Teams from JWT via normalize_token_teams(). 

57 None = admin bypass or no auth context. 

58 [] = public-only token. 

59 [...] = team-scoped token. 

60 team_id: Optional specific team filter 

61 

62 Returns: 

63 Query with visibility WHERE clauses applied, or unmodified 

64 if no auth context is present. 

65 """ 

66 if user_email is None and token_teams is None: 

67 return query 

68 

69 effective_teams: List[str] = [] 

70 if token_teams is not None: 

71 effective_teams = token_teams 

72 elif user_email: 

73 team_service = TeamManagementService(db) 

74 user_teams = await team_service.get_user_teams(user_email) 

75 effective_teams = [team.id for team in user_teams] 

76 

77 # Public-only tokens (explicit token_teams=[]) must not get owner access 

78 filter_email = None if (token_teams is not None and not token_teams) else user_email 

79 

80 return self._apply_visibility_filter(query, filter_email, effective_teams, team_id) 

81 

82 def _apply_visibility_filter( 

83 self, 

84 query: Any, 

85 user_email: Optional[str], 

86 token_teams: List[str], 

87 team_id: Optional[str] = None, 

88 ) -> Any: 

89 """Apply visibility-based access control to query. 

90 

91 Note: Callers are responsible for suppressing user_email for public-only 

92 tokens. Use _apply_access_control() which handles this automatically. 

93 

94 Access rules: 

95 - public: visible to all (global listing only; excluded when team_id is set) 

96 - team: visible to team members (token_teams contains team_id) 

97 - private: visible only to owner (requires user_email) 

98 

99 Args: 

100 query: SQLAlchemy query to filter 

101 user_email: User's email for owner matching (None suppresses owner access) 

102 token_teams: Resolved team list (never None; use [] for no teams) 

103 team_id: Optional specific team filter 

104 

105 Returns: 

106 Filtered query 

107 """ 

108 model_cls = self._visibility_model_cls 

109 

110 if team_id: 

111 # User requesting specific team - verify access 

112 if team_id not in token_teams: 

113 return query.where(False) 

114 

115 # Scope results strictly to the requested team 

116 access_conditions = [and_(model_cls.team_id == team_id, model_cls.visibility.in_(["team", "public"]))] 

117 if user_email: 

118 access_conditions.append(and_(model_cls.team_id == team_id, model_cls.owner_email == user_email, model_cls.visibility == "private")) 

119 return query.where(or_(*access_conditions)) 

120 

121 # Global listing: public resources visible to everyone 

122 access_conditions = [model_cls.visibility == "public"] 

123 

124 # Owner can see their own private resources (but NOT team resources 

125 # from teams outside token scope — those are covered by the 

126 # token_teams condition below) 

127 if user_email: 

128 access_conditions.append(and_(model_cls.owner_email == user_email, model_cls.visibility == "private")) 

129 

130 if token_teams: 

131 access_conditions.append(and_(model_cls.team_id.in_(token_teams), model_cls.visibility.in_(["team", "public"]))) 

132 

133 return query.where(or_(*access_conditions))