Bases: BaseBufferedProcessor
A buffered processor that performs exact pattern matching.
This class implements exact pattern matching using the Aho-Corasick algorithm
for efficient multiple pattern matching. Unlike the normalized version, this
processor is sensitive to whitespace and performs exact string matching.
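Attributes:
`automaton`: An instance of AhoCorasickAutomaton for pattern matching.
`max_pattern_len`: The length of the longest pattern in the raw patterns.
`tool_call_message`: Message to include when a tool call is detected.
Parameters:
`yaml_path` (str): Path to the YAML file containing pattern definitions.
`tool_call_message` (str): Optional message to use when a tool call is detected. Defaults to "Tool call detected."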
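For readers unfamiliar with the algorithm, the following is a minimal sketch of Aho-Corasick multi-pattern matching. It is not the project's `AhoCorasickAutomaton`, whose internals are not shown on this page; the class name `TinyAhoCorasick` and its `search` method are hypothetical. It assumes patterns are supplied as a name-to-string mapping and reports inclusive end indices, which is the shape the processor's source appears to consume.

```python
from collections import deque


class TinyAhoCorasick:
    """Illustrative multi-pattern matcher (hypothetical, for explanation only)."""

    def __init__(self, patterns: dict[str, str]):
        self.patterns = patterns
        # Trie stored as parallel lists indexed by node id; node 0 is the root.
        self.goto: list[dict[str, int]] = [{}]
        self.fail: list[int] = [0]
        self.out: list[list[str]] = [[]]
        for name, text in patterns.items():
            node = 0
            for ch in text:
                if ch not in self.goto[node]:
                    self.goto.append({})
                    self.fail.append(0)
                    self.out.append([])
                    self.goto[node][ch] = len(self.goto) - 1
                node = self.goto[node][ch]
            self.out[node].append(name)
        # Breadth-first pass: compute failure links and inherit outputs
        # from the node each failure link points to.
        queue = deque(self.goto[0].values())
        while queue:
            node = queue.popleft()
            for ch, child in self.goto[node].items():
                fallback = self.fail[node]
                while fallback and ch not in self.goto[fallback]:
                    fallback = self.fail[fallback]
                self.fail[child] = self.goto[fallback].get(ch, 0)
                self.out[child] = self.out[child] + self.out[self.fail[child]]
                queue.append(child)

    def search(self, text: str) -> list[tuple[int, str]]:
        """Return (end_index, pattern_name) pairs; end_index is inclusive."""
        node, hits = 0, []
        for i, ch in enumerate(text):
            # Follow failure links until some node accepts this character.
            while node and ch not in self.goto[node]:
                node = self.fail[node]
            node = self.goto[node].get(ch, 0)
            hits.extend((i, name) for name in self.out[node])
        return hits


matcher = TinyAhoCorasick({"he": "he", "she": "she", "hers": "hers"})
print(sorted(matcher.search("ushers")))  # [(3, 'he'), (3, 'she'), (5, 'hers')]
print(matcher.search("h e"))             # [] because matching is whitespace-sensitive
```

All patterns are found in a single pass over the text, and a space inside a candidate match prevents it from matching, which mirrors the exact-matching behavior described above.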
Source code in src/llm/pattern_detection/buffered_processor_standard.py
class AhoCorasickBufferedProcessor(BaseBufferedProcessor):
    """A buffered processor that performs exact pattern matching.

    This class implements exact pattern matching using the Aho-Corasick algorithm
    for efficient multiple pattern matching. Unlike the normalized version, this
    processor is sensitive to whitespace and performs exact string matching.

    Attributes:
        `automaton`: An instance of AhoCorasickAutomaton for pattern matching.
        `max_pattern_len`: The length of the longest pattern in the raw patterns.
        `tool_call_message`: Message to include when a tool call is detected.

    Args:
        `yaml_path`: Path to the YAML file containing pattern definitions.
        `tool_call_message`: Optional message to use when a tool call is detected.
            Defaults to "Tool call detected."
    """

    def __init__(self, yaml_path: str, tool_call_message: str = "Tool call detected."):
        super().__init__(tool_call_message)
        raw_patterns = load_patterns(yaml_path)
        self.automaton = AhoCorasickAutomaton(raw_patterns)
        self.max_pattern_len = max(len(p) for p in raw_patterns.values())
        self.automaton.reset_state()

    def process_chunk_impl(self, combined_original: str):
        """Processes a chunk of text to find exact pattern matches.

        This method performs exact pattern matching on the input text and returns
        the earliest match found along with any safe text that can be output.

        Args:
            `combined_original`: The text chunk to process.

        Returns:
            A tuple containing:
                - `PatternMatchResult`: Result object containing match information and
                  processed text.
                - `str`: Any trailing text that needs to be carried over to the next
                  chunk.

        ``` python title="Example usage"
        processor = AhoCorasickBufferedProcessor('patterns.yaml')
        result, trailing = processor.process_chunk_impl('some text')
        print(result.matched, result.pattern_name) # False None
        ```
        """
        result = PatternMatchResult()
        # Search in the original text
        matches = self.automaton.search_chunk(combined_original)
        if not matches:
            # Keep up to max_pattern_len - 1 characters for partial match
            keep_len = min(self.max_pattern_len - 1, len(combined_original))
            if keep_len > 0:
                safe_text = combined_original[:-keep_len]
                new_trailing = combined_original[-keep_len:]
            else:
                safe_text = combined_original
                new_trailing = ""
            result.output = safe_text
            return result, new_trailing

        # Otherwise, use the earliest match
        earliest_end, pattern_name = min(matches, key=lambda x: x[0])
        pattern_str = self.automaton.patterns[pattern_name]
        match_start = earliest_end - len(pattern_str) + 1
        result.matched = True
        result.pattern_name = pattern_name
        result.tool_call_message = self.tool_call_message
        result.output = combined_original[:match_start]
        result.text_with_tool_call = combined_original[match_start:]
        new_trailing = ""
        self.automaton.reset_state()
        return result, new_trailing
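The index arithmetic in the match branch of `process_chunk_impl` can be isolated as a small sketch. The helper name `split_at_earliest_match` is hypothetical, and the sketch assumes, as the source suggests, that matches arrive as `(end_index, pattern_name)` pairs with inclusive end indices and that patterns are a name-to-string mapping.

```python
def split_at_earliest_match(text, matches, patterns):
    """Split text just before the match whose end position comes first.

    matches: iterable of (end_index, pattern_name), end_index inclusive.
    patterns: mapping of pattern_name -> pattern string.
    Returns (pattern_name, text_before_match, text_from_match_onward).
    """
    end, name = min(matches, key=lambda m: m[0])
    # An inclusive end index means the start is end - length + 1.
    start = end - len(patterns[name]) + 1
    return name, text[:start], text[start:]


patterns = {"call": "<tool_call>"}
print(split_at_earliest_match("Hi <tool_call>{}", [(13, "call")], patterns))
# ('call', 'Hi ', '<tool_call>{}')

# Selection is by end position, so a short pattern nested inside a longer
# one is chosen even though the longer one starts earlier.
nested = {"long": "abcd", "short": "bc"}
print(split_at_earliest_match("xabcd", [(3, "short"), (4, "long")], nested))
# ('short', 'xa', 'bcd')
```

The second call shows a consequence of keying on the end index: when patterns overlap, the earliest-ending match is not necessarily the earliest-starting one.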
process_chunk_impl(combined_original)
Processes a chunk of text to find exact pattern matches.
This method performs exact pattern matching on the input text and returns
the earliest match found along with any safe text that can be output.
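Parameters:
`combined_original` (str): The text chunk to process.
Returns:
A tuple containing:
- `PatternMatchResult`: Result object containing match information and processed text.
- `str`: Any trailing text that needs to be carried over to the next chunk.
Example usage:
    processor = AhoCorasickBufferedProcessor('patterns.yaml')
    result, trailing = processor.process_chunk_impl('some text')
    print(result.matched, result.pattern_name) # False None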
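The carry-over of trailing text can be illustrated with a simplified streaming loop. This is a sketch under stated assumptions, not the project's implementation: it handles a single pattern with `str.find` instead of an automaton, the generator name `stream_filter` is hypothetical, and it assumes the caller prepends the previous trailing text to each new chunk and flushes whatever remains when the stream ends.

```python
def stream_filter(chunks, pattern):
    """Yield text that is safe to emit; stop at the first occurrence of pattern."""
    # At most len(pattern) - 1 characters can be an unfinished prefix of a match.
    keep = len(pattern) - 1
    trailing = ""
    for chunk in chunks:
        combined = trailing + chunk
        hit = combined.find(pattern)
        if hit != -1:
            # Emit only the text before the match, then stop.
            yield combined[:hit]
            return
        # Using an explicit cut index avoids the combined[:-0] == "" pitfall.
        cut = len(combined) - min(keep, len(combined))
        yield combined[:cut]
        trailing = combined[cut:]
    # No match anywhere in the stream: release the held-back tail.
    yield trailing


parts = ["Hello <to", "ol_c", "all> rest"]
print("".join(stream_filter(parts, "<tool_call>")))  # Hello
```

Holding back `len(pattern) - 1` characters is what lets a pattern that is split across chunk boundaries still be detected, at the cost of delaying output by that many characters.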