Coverage for mcpgateway / plugins / framework / hooks / policies.py: 100%
32 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 03:05 +0000
1# -*- coding: utf-8 -*-
2"""Location: ./mcpgateway/plugins/framework/hooks/policies.py
3Copyright 2026
4SPDX-License-Identifier: Apache-2.0
5Authors: Fred Araujo
7Hook payload policy types and utilities.
9The framework provides the types and utilities for controlled payload
10modification; the gateway defines the actual concrete policies.
12Examples:
13 >>> from mcpgateway.plugins.framework.hooks.policies import HookPayloadPolicy, apply_policy
14 >>> policy = HookPayloadPolicy(writable_fields=frozenset({"name", "args"}))
15 >>> sorted(policy.writable_fields)
16 ['args', 'name']
17"""
19# Standard
20from dataclasses import dataclass
21from enum import Enum
22import logging
23from typing import Any, Optional
25# Third-Party
26from pydantic import BaseModel
28logger = logging.getLogger(__name__)
class DefaultHookPolicy(str, Enum):
    """Fallback behavior selector for hooks that have no explicit policy.

    Each member is also a ``str``, so it compares equal to its plain
    string value and can be built from that value.

    Attributes:
        ALLOW: Accept all modifications (backwards compatible).
        DENY: Reject all modifications (strict mode).

    Examples:
        >>> DefaultHookPolicy("deny")
        <DefaultHookPolicy.DENY: 'deny'>
        >>> DefaultHookPolicy.ALLOW.value
        'allow'
        >>> DefaultHookPolicy.ALLOW == "allow"
        True
        >>> [member.name for member in DefaultHookPolicy]
        ['ALLOW', 'DENY']
    """

    ALLOW = "allow"
    DENY = "deny"
@dataclass(frozen=True)
class HookPayloadPolicy:
    """Immutable allow-list of the payload fields plugins may modify.

    Instances are frozen dataclasses: the field cannot be reassigned after
    construction.

    Attributes:
        writable_fields: The set of field names that plugins may change.

    Examples:
        >>> allowed = HookPayloadPolicy(writable_fields=frozenset({"args", "name"}))
        >>> "args" in allowed.writable_fields
        True
        >>> "token" in allowed.writable_fields
        False
        >>> sorted(allowed.writable_fields)
        ['args', 'name']
    """

    writable_fields: frozenset[str]
# Unique "attribute is missing" marker, so that ``None`` stays a legitimate
# field value when attributes are looked up with ``getattr``.
_SENTINEL = object()


def apply_policy(
    original: BaseModel,
    modified: BaseModel,
    policy: HookPayloadPolicy,
) -> Optional[BaseModel]:
    """Merge a plugin's payload changes into the original under a policy.

    Every declared field of ``type(modified)`` is compared between the two
    payloads. A differing value is taken from *modified* only when the
    field name is in ``policy.writable_fields``. Any other differing field
    keeps its value from *original*, and all such fields are reported
    together in one warning log record.

    Args:
        original: The original (or current) payload.
        modified: The payload returned by the plugin.
        policy: The policy defining which fields are writable.

    Returns:
        A copy of *original* carrying only the permitted changes, or
        ``None`` when no permitted field differs.

    Examples:
        >>> from pydantic import BaseModel, ConfigDict
        >>> class P(BaseModel):
        ...     model_config = ConfigDict(frozen=True)
        ...     name: str
        ...     secret: str
        >>> orig = P(name="old", secret="s")
        >>> policy = HookPayloadPolicy(writable_fields=frozenset({"name"}))
        >>> merged = apply_policy(orig, P(name="new", secret="hacked"), policy)
        >>> merged.name
        'new'
        >>> merged.secret
        's'
        >>> apply_policy(orig, orig, policy) is None
        True
    """
    accepted: dict[str, Any] = {}
    blocked: list[str] = []

    for name in type(modified).model_fields:
        before = getattr(original, name, _SENTINEL)
        after = getattr(modified, name, _SENTINEL)
        if after is _SENTINEL:
            # Declared on the model class but absent from this instance.
            continue

        if isinstance(before, BaseModel) and isinstance(after, BaseModel):
            # Compare nested models by their dumped content so that
            # StructuredData / extra="allow" instances compare reliably.
            unchanged = before.model_dump() == after.model_dump()
        else:
            unchanged = after == before
        if unchanged:
            continue

        if field_is_blocked := name not in policy.writable_fields:
            blocked.append(name)
        if not field_is_blocked:
            accepted[name] = after

    if blocked:
        logger.warning("Policy rejected modifications to non-writable fields: %s", blocked)

    if not accepted:
        return None
    return original.model_copy(update=accepted)