Skip to content

src.llm.pattern_detection.aho_corasick.AhoCorasickAutomaton

An implementation of the Aho-Corasick string matching automaton.

This class implements a finite state machine that can efficiently match multiple patterns simultaneously in a given text. It uses a trie data structure augmented with failure links to achieve linear-time pattern matching.

Attributes:

Name Type Description
patterns

A dictionary mapping pattern names to their string values.

next_states List[Dict[str, int]]

A list of dictionaries representing state transitions.

fail List[int]

A list of failure link states.

output List[List[str]]

A list of pattern names associated with each state.

current_state

The current state of the automaton.

Parameters:

Name Type Description Default
patterns Dict[str, str]

A dictionary where keys are pattern names and values are the pattern strings to match.

required
Source code in src/llm/pattern_detection/aho_corasick.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
class AhoCorasickAutomaton:
    """An implementation of the Aho-Corasick string matching automaton.

    This class implements a finite state machine that can efficiently match multiple
    patterns simultaneously in a given text. It uses a trie data structure augmented
    with failure links, so each input character is consumed exactly once no matter
    how many patterns are registered.

    The matching position is kept in ``current_state`` between calls to
    ``search_chunk``, so a text may be fed in several consecutive chunks. Call
    ``reset_state`` before searching an unrelated text.

    Attributes:
        patterns: A dictionary mapping pattern names to their string values.
        next_states: A list of dictionaries representing state transitions
            (character -> index of the next state). Index 0 is the root state.
        fail: A list of failure link states, indexed by state.
        output: For each state, the names of every pattern that ends when that
            state is reached.
        current_state: The current state of the automaton.

    Args:
        patterns: A dictionary where keys are pattern names and values are the
            pattern strings to match. Empty pattern strings are ignored.
    """

    def __init__(self, patterns: Dict[str, str]):
        """Initializes the Aho-Corasick automaton with the given patterns.

        Args:
            patterns: A dictionary mapping pattern names to their string values.
        """
        self.patterns = patterns
        self.next_states: List[Dict[str, int]] = []
        self.fail: List[int] = []
        self.output: List[List[str]] = []
        self.current_state = 0
        self._build_machine()

    def _build_machine(self) -> None:
        """Builds the Aho-Corasick automaton.

        Constructs the trie structure, sets up failure links, and computes output
        functions for the automaton. This is called automatically during initialization.
        """
        # Initialize root state
        self.next_states.append({})
        self.fail.append(0)
        self.output.append([])

        # Build the trie from patterns
        for pattern_name, pattern_str in self.patterns.items():
            self._insert(pattern_str, pattern_name)

        # Build failure links using BFS. States directly below the root always
        # fall back to the root.
        queue = deque()
        for nxt_state in self.next_states[0].values():
            self.fail[nxt_state] = 0
            queue.append(nxt_state)

        while queue:
            state = queue.popleft()
            for char, nxt_state in self.next_states[state].items():
                queue.append(nxt_state)
                # Follow the parent's failure chain until a state with a
                # transition on ``char`` is found (or the root is reached).
                f = self.fail[state]
                while f > 0 and char not in self.next_states[f]:
                    f = self.fail[f]
                f = self.next_states[f].get(char, 0)
                self.fail[nxt_state] = f
                # The fallback state is shallower, so BFS has already finished
                # its output list; inherit it so suffix patterns are reported.
                self.output[nxt_state].extend(self.output[f])

    def _insert(self, pattern_str: str, pattern_name: str) -> None:
        """Inserts a pattern into the trie structure of the automaton.

        Empty patterns are skipped: they would be attached to the root state
        and reported inconsistently during a search.

        Args:
            pattern_str: The string pattern to insert.
            pattern_name: The name associated with the pattern.
        """
        if not pattern_str:
            return

        current_state = 0
        for char in pattern_str:
            if char not in self.next_states[current_state]:
                # Allocate a new state; its index is its position in the tables.
                self.next_states.append({})
                self.fail.append(0)
                self.output.append([])
                self.next_states[current_state][char] = len(self.next_states) - 1
            current_state = self.next_states[current_state][char]
        self.output[current_state].append(pattern_name)

    def reset_state(self) -> None:
        """Resets the automaton to its initial state.

        This method should be called before starting a new search if the automaton
        has been used previously.
        """
        self.current_state = 0

    def search_chunk(self, chunk: str) -> List[Tuple[int, str]]:
        """Searches for pattern matches in the given text chunk.

        Matching continues from ``current_state``, so a pattern that started in
        a previous chunk can complete in this one. Reported indices are always
        relative to ``chunk``.

        Args:
            chunk: The text string to search for pattern matches.

        Returns:
            A list of tuples, where each tuple contains:
                - The ending index of the match in the chunk (int)
                - The name of the matched pattern (str)

        ``` python title="Example usage"
        automaton = AhoCorasickAutomaton({'pat1': 'abc', 'pat2': 'bc'})
        automaton.search_chunk('abc')  # [(2, 'pat1'), (2, 'pat2')]
        ```
        """
        found_patterns: List[Tuple[int, str]] = []
        for i, char in enumerate(chunk):
            # Fall back along failure links until ``char`` can be consumed.
            while self.current_state > 0 and char not in self.next_states[self.current_state]:
                self.current_state = self.fail[self.current_state]
            self.current_state = self.next_states[self.current_state].get(char, 0)
            found_patterns.extend(
                (i, pattern_name) for pattern_name in self.output[self.current_state]
            )
        return found_patterns

__init__(patterns)

Initializes the Aho-Corasick automaton with the given patterns.

Parameters:

Name Type Description Default
patterns Dict[str, str]

A dictionary mapping pattern names to their string values.

required
Source code in src/llm/pattern_detection/aho_corasick.py
40
41
42
43
44
45
46
47
48
49
50
51
def __init__(self, patterns: Dict[str, str]):
    """Create the automaton tables and build the machine from ``patterns``.

    Args:
        patterns: Mapping of pattern name to the pattern string to match.
    """
    self.patterns = patterns
    # Matching always begins at the root state.
    self.current_state = 0
    # Per-state tables; they start empty and are filled by _build_machine().
    self.output: List[List[str]] = []
    self.fail: List[int] = []
    self.next_states: List[Dict[str, int]] = []
    self._build_machine()

reset_state()

Resets the automaton to its initial state.

This method should be called before starting a new search if the automaton has been used previously.

Source code in src/llm/pattern_detection/aho_corasick.py
102
103
104
105
106
107
108
def reset_state(self):
    """Return the automaton to the root state.

    Call this before searching a new, unrelated text when the automaton has
    already consumed input.
    """
    root_state = 0
    self.current_state = root_state

search_chunk(chunk)

Searches for pattern matches in the given text chunk.

Parameters:

Name Type Description Default
chunk str

The text string to search for pattern matches.

required

Returns:

Type Description
List[Tuple[int, str]]

A list of `(end_index, pattern_name)` tuples, where `end_index` (int) is the ending index of the match in the chunk and `pattern_name` (str) is the name of the matched pattern.

Example usage
automaton = AhoCorasickAutomaton({'pat1': 'abc', 'pat2': 'bc'})
automaton.search_chunk('abc')  # [(2, 'pat1'), (2, 'pat2')]
Source code in src/llm/pattern_detection/aho_corasick.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def search_chunk(self, chunk: str) -> List[Tuple[int, str]]:
    """Searches for pattern matches in the given text chunk.

    Matching continues from ``self.current_state``, so a pattern that started
    in a previous chunk can complete in this one. Reported indices are always
    relative to ``chunk``.

    Args:
        chunk: The text string to search for pattern matches.

    Returns:
        A list of tuples, where each tuple contains:
            - The ending index of the match in the chunk (int)
            - The name of the matched pattern (str)

    ``` python title="Example usage"
    automaton = AhoCorasickAutomaton({'pat1': 'abc', 'pat2': 'bc'})
    automaton.search_chunk('abc')  # [(2, 'pat1'), (2, 'pat2')]
    ```
    """
    matches: List[Tuple[int, str]] = []
    for position, symbol in enumerate(chunk):
        state = self.current_state
        # Fall back along failure links until ``symbol`` can be consumed
        # or the root (state 0) is reached.
        while state > 0 and symbol not in self.next_states[state]:
            state = self.fail[state]
        state = self.next_states[state].get(symbol, 0)
        self.current_state = state
        # Every pattern listed for this state ends at ``position``.
        matches.extend((position, name) for name in self.output[state])
    return matches