An implementation of the Aho-Corasick string matching automaton.
This class implements a finite state machine that can efficiently match multiple
patterns simultaneously in a given text. It uses a trie data structure augmented
with failure links to achieve linear-time pattern matching.
Attributes:
| Name | Type | Description |
|---|---|---|
| patterns | Dict[str, str] | A dictionary mapping pattern names to their string values. |
| next_states | List[Dict[str, int]] | A list of dictionaries representing state transitions. |
| fail | List[int] | A list of failure link states. |
| output | List[List[str]] | A list of pattern names associated with each state. |
| current_state | int | The current state of the automaton. |
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| patterns | Dict[str, str] | A dictionary where keys are pattern names and values are the pattern strings to match. | required |
Source code in src/llm/pattern_detection/aho_corasick.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class AhoCorasickAutomaton:
    """An implementation of the Aho-Corasick string matching automaton.

    This class implements a finite state machine that can efficiently match multiple
    patterns simultaneously in a given text. It uses a trie data structure augmented
    with failure links to achieve linear-time pattern matching.

    The automaton remembers its state between calls to ``search_chunk``, so a
    text can be fed in consecutive chunks; call ``reset_state`` before starting
    on an unrelated text.

    Attributes:
        patterns: A dictionary mapping pattern names to their string values.
        next_states: A list of dictionaries representing state transitions.
        fail: A list of failure link states.
        output: A list of pattern names associated with each state.
        current_state: The current state of the automaton.

    Args:
        patterns: A dictionary where keys are pattern names and values are the
            pattern strings to match. Empty pattern strings are ignored.
    """

    def __init__(self, patterns: Dict[str, str]):
        """Initializes the Aho-Corasick automaton with the given patterns.

        Args:
            patterns: A dictionary mapping pattern names to their string values.
        """
        self.patterns = patterns
        self.next_states: List[Dict[str, int]] = []
        self.fail: List[int] = []
        self.output: List[List[str]] = []
        self.current_state = 0
        self._build_machine()

    def _build_machine(self):
        """Builds the Aho-Corasick automaton.

        Constructs the trie structure, sets up failure links, and computes output
        functions for the automaton. This is called automatically during initialization.
        """
        # Initialize root state (index 0).
        self.next_states.append({})
        self.fail.append(0)
        self.output.append([])

        # Build the trie from patterns.
        for pattern_name, pattern_str in self.patterns.items():
            self._insert(pattern_str, pattern_name)

        # Build failure links using BFS. Direct children of the root always
        # fall back to the root itself.
        queue = deque()
        for nxt_state in self.next_states[0].values():
            self.fail[nxt_state] = 0
            queue.append(nxt_state)

        while queue:
            state = queue.popleft()
            for char, nxt_state in self.next_states[state].items():
                queue.append(nxt_state)
                # Walk the parent's failure chain until a state with a
                # transition on `char` is found, or the root is reached.
                f = self.fail[state]
                while f > 0 and char not in self.next_states[f]:
                    f = self.fail[f]
                f = self.next_states[f].get(char, 0)
                self.fail[nxt_state] = f
                # The failure target is shallower, so BFS has already
                # finalised its output list; inherit those matches.
                self.output[nxt_state].extend(self.output[f])

    def _insert(self, pattern_str: str, pattern_name: str):
        """Inserts a pattern into the trie structure of the automaton.

        Empty patterns are skipped: attaching them to the root state would make
        them show up at an inconsistent subset of positions during a search.

        Args:
            pattern_str: The string pattern to insert.
            pattern_name: The name associated with the pattern.
        """
        if not pattern_str:
            return

        current_state = 0
        for char in pattern_str:
            if char not in self.next_states[current_state]:
                # Allocate a fresh state; its index is the new list length - 1.
                self.next_states.append({})
                self.fail.append(0)
                self.output.append([])
                self.next_states[current_state][char] = len(self.next_states) - 1
            current_state = self.next_states[current_state][char]
        self.output[current_state].append(pattern_name)

    def reset_state(self):
        """Resets the automaton to its initial state.

        This method should be called before starting a new search if the automaton
        has been used previously.
        """
        self.current_state = 0

    def search_chunk(self, chunk: str) -> List[Tuple[int, str]]:
        """Searches for pattern matches in the given text chunk.

        The search continues from ``current_state``, which is left at the state
        reached after the last character of ``chunk``.

        Args:
            chunk: The text string to search for pattern matches.

        Returns:
            A list of tuples, where each tuple contains:
            - The ending index of the match in the chunk (int)
            - The name of the matched pattern (str)

        ``` python title="Example usage"
        automaton = AhoCorasickAutomaton({'pat1': 'abc', 'pat2': 'bc'})
        automaton.search_chunk('abc')  # [(2, 'pat1'), (2, 'pat2')]
        ```
        """
        found_patterns: List[Tuple[int, str]] = []
        for i, char in enumerate(chunk):
            # Follow failure links until a transition on `char` exists
            # or the root is reached.
            while self.current_state > 0 and char not in self.next_states[self.current_state]:
                self.current_state = self.fail[self.current_state]
            self.current_state = self.next_states[self.current_state].get(char, 0)
            found_patterns.extend(
                (i, pattern_name) for pattern_name in self.output[self.current_state]
            )
        return found_patterns
|
__init__(patterns)
Initializes the Aho-Corasick automaton with the given patterns.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| patterns | Dict[str, str] | A dictionary mapping pattern names to their string values. | required |
Source code in src/llm/pattern_detection/aho_corasick.py
40
41
42
43
44
45
46
47
48
49
50
51 | def __init__(self, patterns: Dict[str, str]):
"""Initializes the Aho-Corasick automaton with the given patterns.
Args:
patterns: A dictionary mapping pattern names to their string values.
"""
self.patterns = patterns
self.next_states: List[Dict[str, int]] = []
self.fail: List[int] = []
self.output: List[List[str]] = []
self.current_state = 0
self._build_machine()
|
reset_state()
Resets the automaton to its initial state.
This method should be called before starting a new search if the automaton
has been used previously.
Source code in src/llm/pattern_detection/aho_corasick.py
102
103
104
105
106
107
108 | def reset_state(self):
"""Resets the automaton to its initial state.
This method should be called before starting a new search if the automaton
has been used previously.
"""
self.current_state = 0
|
search_chunk(chunk)
Searches for pattern matches in the given text chunk.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| chunk | str | The text string to search for pattern matches. | required |
Returns:
| Type | Description |
|---|---|
| List[Tuple[int, str]] | A list of tuples, where each tuple contains: the ending index of the match in the chunk (int), and the name of the matched pattern (str). |
Example usage:
automaton = AhoCorasickAutomaton({'pat1': 'abc', 'pat2': 'bc'})
automaton.search_chunk('abc') # [(2, 'pat1'), (2, 'pat2')]
Source code in src/llm/pattern_detection/aho_corasick.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134 | def search_chunk(self, chunk: str) -> List[Tuple[int, str]]:
"""Searches for pattern matches in the given text chunk.
Args:
chunk: The text string to search for pattern matches.
Returns:
A list of tuples, where each tuple contains:
- The ending index of the match in the chunk (int)
- The name of the matched pattern (str)
``` python title="Example usage"
automaton = AhoCorasickAutomaton({'pat1': 'abc', 'pat2': 'bc'})
automaton.search_chunk('abc') # [(1, 'pat2'), (2, 'pat1')]
```
"""
found_patterns = []
for i, char in enumerate(chunk):
while self.current_state > 0 and char not in self.next_states[self.current_state]:
self.current_state = self.fail[self.current_state]
self.current_state = self.next_states[self.current_state].get(char, 0)
if self.output[self.current_state]:
for pattern_name in self.output[self.current_state]:
found_patterns.append((i, pattern_name))
return found_patterns
|