Coverage for mcpgateway / plugins / framework / hooks / policies.py: 100%

32 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 03:05 +0000

1# -*- coding: utf-8 -*- 

2"""Location: ./mcpgateway/plugins/framework/hooks/policies.py 

3Copyright 2026 

4SPDX-License-Identifier: Apache-2.0 

5Authors: Fred Araujo 

6 

7Hook payload policy types and utilities. 

8 

9The framework provides the types and utilities for controlled payload 

10modification; the gateway defines the actual concrete policies. 

11 

12Examples: 

13 >>> from mcpgateway.plugins.framework.hooks.policies import HookPayloadPolicy, apply_policy 

14 >>> policy = HookPayloadPolicy(writable_fields=frozenset({"name", "args"})) 

15 >>> sorted(policy.writable_fields) 

16 ['args', 'name'] 

17""" 

18 

19# Standard 

20from dataclasses import dataclass 

21from enum import Enum 

22import logging 

23from typing import Any, Optional 

24 

25# Third-Party 

26from pydantic import BaseModel 

27 

28logger = logging.getLogger(__name__) 

29 

30 

31class DefaultHookPolicy(str, Enum): 

32 """Controls behavior for hooks without an explicit policy. 

33 

34 Attributes: 

35 ALLOW: Accept all modifications (backwards compatible). 

36 DENY: Reject all modifications (strict mode). 

37 

38 Examples: 

39 >>> DefaultHookPolicy.ALLOW 

40 <DefaultHookPolicy.ALLOW: 'allow'> 

41 >>> DefaultHookPolicy.DENY.value 

42 'deny' 

43 >>> DefaultHookPolicy('allow') 

44 <DefaultHookPolicy.ALLOW: 'allow'> 

45 """ 

46 

47 ALLOW = "allow" 

48 DENY = "deny" 

49 

50 

51@dataclass(frozen=True) 

52class HookPayloadPolicy: 

53 """Defines which payload fields plugins are allowed to modify. 

54 

55 Attributes: 

56 writable_fields: The set of field names that plugins may change. 

57 

58 Examples: 

59 >>> policy = HookPayloadPolicy(writable_fields=frozenset({"name", "args"})) 

60 >>> "name" in policy.writable_fields 

61 True 

62 >>> "secret" in policy.writable_fields 

63 False 

64 """ 

65 

66 writable_fields: frozenset[str] 

67 

68 

69_SENTINEL = object() 

70 

71 

72def apply_policy( 

73 original: BaseModel, 

74 modified: BaseModel, 

75 policy: HookPayloadPolicy, 

76) -> Optional[BaseModel]: 

77 """Apply policy-based controlled merge. 

78 

79 Only fields listed in ``policy.writable_fields`` are accepted from 

80 *modified*; all other changes are silently discarded. 

81 

82 Args: 

83 original: The original (or current) payload. 

84 modified: The payload returned by the plugin. 

85 policy: The policy defining which fields are writable. 

86 

87 Returns: 

88 An updated payload with only the allowed changes applied, or 

89 ``None`` if the plugin made no effective (allowed) changes. 

90 

91 Examples: 

92 >>> from pydantic import BaseModel, ConfigDict 

93 >>> class P(BaseModel): 

94 ... model_config = ConfigDict(frozen=True) 

95 ... name: str 

96 ... secret: str 

97 >>> orig = P(name="old", secret="s") 

98 >>> mod = P(name="new", secret="hacked") 

99 >>> policy = HookPayloadPolicy(writable_fields=frozenset({"name"})) 

100 >>> result = apply_policy(orig, mod, policy) 

101 >>> result.name 

102 'new' 

103 >>> result.secret 

104 's' 

105 """ 

106 updates: dict[str, Any] = {} 

107 rejected: list[str] = [] 

108 for field in type(modified).model_fields: 

109 old_val = getattr(original, field, _SENTINEL) 

110 new_val = getattr(modified, field, _SENTINEL) 

111 if new_val is _SENTINEL: 

112 continue 

113 # Use model_dump() for BaseModel comparisons to ensure reliable 

114 # equality across StructuredData / extra="allow" instances. 

115 if isinstance(old_val, BaseModel) and isinstance(new_val, BaseModel): 

116 if old_val.model_dump() == new_val.model_dump(): 

117 continue 

118 elif new_val == old_val: 

119 continue 

120 if field in policy.writable_fields: 

121 updates[field] = new_val 

122 else: 

123 rejected.append(field) 

124 if rejected: 

125 logger.warning("Policy rejected modifications to non-writable fields: %s", rejected) 

126 return original.model_copy(update=updates) if updates else None