Skip to content

src.llm.tool_detection.manual_detection_strategy.ManualToolCallDetectionStrategy

Bases: BaseToolCallDetectionStrategy

A strategy for detecting tool calls in streaming LLM output using pattern matching.

This class implements manual detection of tool calls by processing streaming chunks of text using the Aho-Corasick algorithm for pattern matching. It maintains internal state to track tool call boundaries and accumulate content.

Parameters:

Name Type Description Default
parser BaseToolCallParser

Parser instance for processing detected tool calls.

required
pattern_config_path str

Path to YAML config file containing tool call patterns. Defaults to "src/configs/tool_call_patterns.yaml".

'src/configs/tool_call_patterns.yaml'

Attributes:

Name Type Description
tool_call_parser BaseToolCallParser

Parser for processing tool calls.

pattern_detector AhoCorasickBufferedProcessorNormalized

Pattern matching processor.

pre_tool_call_content List[str]

Buffer for content before tool call.

tool_call_buffer str

Buffer for accumulating tool call content.

in_tool_call bool

Flag indicating if currently processing a tool call.

accumulation_mode bool

Flag for content accumulation mode.

Example
parser = JSONToolCallParser()
detector = ManualToolCallDetectionStrategy(parser)

# Process streaming chunks
async for chunk in stream:
    result = await detector.detect_chunk(chunk, context)
    if result.state == DetectionState.COMPLETE_MATCH:
        # Handle detected tool call
        process_tool_calls(result.tool_calls)
Source code in src/llm/tool_detection/manual_detection_strategy.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
class ManualToolCallDetectionStrategy(BaseToolCallDetectionStrategy):
    """A strategy for detecting tool calls in streaming LLM output using pattern matching.

    This class implements manual detection of tool calls by processing streaming chunks
    of text using the Aho-Corasick algorithm for pattern matching. It maintains internal
    state to track tool call boundaries and accumulate content: once the pattern detector
    reports a match, every later chunk is appended to ``tool_call_buffer`` and the buffer
    is only parsed when ``finalize_detection`` is called.

    Args:
        parser (BaseToolCallParser): Parser instance for processing detected tool calls.
        pattern_config_path (str, optional): Path to YAML config file containing tool call patterns.
            Defaults to "src/configs/tool_call_patterns.yaml".

    Attributes:
        logger (logging.Logger): Logger named after the concrete class.
        tool_call_parser (BaseToolCallParser): Parser for processing tool calls.
        pattern_detector (AhoCorasickBufferedProcessorNormalized): Pattern matching processor.
        pre_tool_call_content (List[str]): Buffer for content before tool call.
            Initialised and reset here; not otherwise read by this class.
        tool_call_buffer (str): Buffer for accumulating tool call content.
        in_tool_call (bool): Flag indicating if currently processing a tool call.
        accumulation_mode (bool): Flag for content accumulation mode.
            Initialised and reset here; not otherwise read by this class.

    Example:
        ```python
        parser = JSONToolCallParser()
        detector = ManualToolCallDetectionStrategy(parser)

        # Process streaming chunks
        async for chunk in stream:
            result = await detector.detect_chunk(chunk, context)

        # A tool call is only parsed once the stream has ended
        final = await detector.finalize_detection(context)
        if final.state == DetectionState.COMPLETE_MATCH:
            process_tool_calls(final.tool_calls)
        detector.reset()
        ```
    """

    def __init__(self, parser: BaseToolCallParser, pattern_config_path: str = "src/configs/tool_call_patterns.yaml"):
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.debug("Initializing ManualToolCallDetectionStrategy with config: %s", pattern_config_path)
        self.tool_call_parser = parser
        self.pattern_detector = AhoCorasickBufferedProcessorNormalized(pattern_config_path)

        self.pre_tool_call_content: List[str] = []
        self.tool_call_buffer: str = ""
        self.in_tool_call: bool = False
        self.accumulation_mode: bool = False

    def reset(self) -> None:
        """Reset all internal state of the detection strategy.

        This method clears all buffers and resets flags to their initial state.
        Should be called between processing different streams or after errors.
        """
        self.logger.debug("Resetting detector state")
        # Clear the pattern detector's own state first, then ours.
        self.pattern_detector.reset_states()
        self.in_tool_call = self.accumulation_mode = False
        self.tool_call_buffer = ""
        self.pre_tool_call_content = []

    async def detect_chunk(self, sse_chunk: SSEChunk, context: StreamContext) -> DetectionResult:
        """
        Process a single chunk of streaming content for tool call detection.

        This method never returns ``COMPLETE_MATCH``; a detected tool call is only
        parsed by ``finalize_detection``.

        Args:
            sse_chunk (SSEChunk): The chunk of streaming content to process. Only
                ``choices[0].delta.content`` is inspected.
            context (StreamContext): Context information for the current stream.
                Not used by this implementation.

        Returns:
            DetectionResult: ``NO_MATCH`` when the chunk carries no text, when the
                pattern detector reports an error, or when the detector produced no
                output; ``PARTIAL_MATCH`` otherwise (without ``content`` while a tool
                call is being accumulated, with the detector's output as ``content``
                in all other cases).
        """
        # If the chunk has no valid delta content, return NO_MATCH.
        if not sse_chunk.choices or not sse_chunk.choices[0].delta:
            return DetectionResult(state=DetectionState.NO_MATCH, sse_chunk=sse_chunk)

        chunk_content = sse_chunk.choices[0].delta.content
        if not chunk_content:
            return DetectionResult(state=DetectionState.NO_MATCH, sse_chunk=sse_chunk)

        # If already in tool call detection mode, continue accumulating and report a partial match.
        # The pattern detector is bypassed from here on; everything goes into the buffer.
        if self.in_tool_call:
            self.tool_call_buffer += chunk_content
            return DetectionResult(
                state=DetectionState.PARTIAL_MATCH,
                sse_chunk=sse_chunk
            )

        # Process the chunk through the pattern detector.
        result = await self.pattern_detector.process_chunk(chunk_content)

        if result.error:
            # Surface the detector failure instead of dropping it silently; the
            # returned result is unchanged so callers behave as before.
            self.logger.warning("Pattern detector reported an error: %s", result.error)
            return DetectionResult(state=DetectionState.NO_MATCH, sse_chunk=sse_chunk)

        # If a pattern is matched, switch into tool call mode.
        # Return any content remaining that could have been in the same chunk as the pattern or in a buffer
        if result.matched:
            self.in_tool_call = True
            self.tool_call_buffer = result.text_with_tool_call
            return DetectionResult(
                state=DetectionState.PARTIAL_MATCH,
                content=result.output,
                sse_chunk=sse_chunk
            )

        # For regular content, if there is any output, return it as PARTIAL_MATCH.
        if result.output:
            return DetectionResult(
                state=DetectionState.PARTIAL_MATCH,
                content=result.output,
                sse_chunk=sse_chunk
            )

        # If nothing of interest is found, return NO_MATCH.
        return DetectionResult(state=DetectionState.NO_MATCH, sse_chunk=sse_chunk)

    async def finalize_detection(self, context: StreamContext) -> DetectionResult:
        """Finalize the detection process and handle any accumulated content.

        This method is called at the end of a stream to process any remaining content
        in the buffers and return final detection results. It does not clear the
        strategy's state; call ``reset()`` before reusing the instance.

        Args:
            context (StreamContext): Context information for the current stream.
                Not used by this implementation.

        Returns:
            DetectionResult: ``COMPLETE_MATCH`` with the parsed tool calls when a tool
                call was accumulated and parsed; ``NO_MATCH`` with an apology message
                as ``content`` when the parser's result contains an ``"error"`` key;
                ``PARTIAL_MATCH`` with the flushed text when no tool call was seen but
                the detector still held output; ``NO_MATCH`` otherwise.
        """
        self.logger.debug("Finalizing detection")
        # Flush any remaining content from pattern detector
        final_result = await self.pattern_detector.flush_buffer()

        if self.in_tool_call:
            self.logger.debug("Processing final tool call buffer")
            if final_result.output:
                self.tool_call_buffer += final_result.output

            # Parse accumulated tool call
            parsed_tool_call_data = self.tool_call_parser.parse(self.tool_call_buffer)
            # Lazy %-style arguments: the (possibly large) buffer is only formatted
            # when debug logging is actually enabled.
            self.logger.debug("Tool call buffer: %s", self.tool_call_buffer)
            self.logger.debug("Parsed tool call data: %s", parsed_tool_call_data)

            if "error" in parsed_tool_call_data:
                self.logger.error("Tool call parsing failed: %s", parsed_tool_call_data['error'])
                return DetectionResult(
                    state=DetectionState.NO_MATCH,
                    content="Sorry, but I was unable to complete your request. Please try again.",
                )

            parsed_tool_calls = self._extract_tool_calls(parsed_tool_call_data)
            self.logger.debug("Successfully parsed %d tool calls", len(parsed_tool_calls))

            return DetectionResult(
                state=DetectionState.COMPLETE_MATCH,
                tool_calls=parsed_tool_calls
            )

        # No tool call detected, return any final content
        if final_result.output:
            self.logger.debug("Returning final content: %s", final_result.output[:50])
            return DetectionResult(
                state=DetectionState.PARTIAL_MATCH,
                content=final_result.output
            )

        self.logger.debug("No final content to return")
        return DetectionResult(state=DetectionState.NO_MATCH)

    def _extract_tool_calls(self, parsed_output: dict) -> List[ToolCall]:
        """Extract structured tool calls from parsed JSON output.

        Converts the parsed JSON format into a list of ToolCall objects with
        appropriate structure and typing. Each entry's arguments are read from
        its ``"parameters"`` key, falling back to ``"arguments"``.

        Arguments are emitted as a JSON string rather than a Python ``repr``:
        strings are passed through untouched, a missing value becomes ``"{}"``,
        and anything else is serialised with ``json.dumps`` (falling back to
        ``str()`` for values JSON cannot represent).

        Args:
            parsed_output (dict): The parsed JSON output containing tool call data
                under the ``"tool_calls"`` key.

        Returns:
            List[ToolCall]: List of structured tool call objects ready for processing.
                Empty when ``parsed_output`` has no ``"tool_calls"`` entries.
        """
        import json  # local import keeps this block self-contained

        tool_calls = []
        for tool_call_dict in parsed_output.get("tool_calls", []):
            tool_call_args = tool_call_dict.get("parameters", tool_call_dict.get("arguments"))
            self.logger.debug("Extracting tool call arguments: %s", tool_call_args)

            if tool_call_args is None:
                serialized_args = "{}"
            elif isinstance(tool_call_args, str):
                # Already serialised by the parser; do not double-encode.
                serialized_args = tool_call_args
            else:
                try:
                    serialized_args = json.dumps(tool_call_args)
                except (TypeError, ValueError):
                    # Best effort for values JSON cannot represent.
                    serialized_args = str(tool_call_args)

            tool_calls.append(ToolCall(
                # NOTE(review): every tool call shares this placeholder ID; callers that
                # correlate results by ID will need unique values here.
                id='123456789',  # Placeholder ID; modify as needed
                type=tool_call_dict.get("type", "function"),
                function=FunctionDetail(
                    name=tool_call_dict.get("name"),
                    arguments=serialized_args
                )
            ))
        return tool_calls

detect_chunk(sse_chunk, context) async

Process a single chunk of streaming content for tool call detection.

Parameters:

Name Type Description Default
sse_chunk SSEChunk

The chunk of streaming content to process.

required
context StreamContext

Context information for the current stream.

required

Returns:

Name Type Description
DetectionResult DetectionResult

Result of processing the chunk, including detection state and any content or tool calls found.

Source code in src/llm/tool_detection/manual_detection_strategy.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
async def detect_chunk(self, sse_chunk: SSEChunk, context: StreamContext) -> DetectionResult:
    """
    Process a single chunk of streaming content for tool call detection.

    This method never returns ``COMPLETE_MATCH``; a detected tool call is only
    parsed by ``finalize_detection``.

    Args:
        sse_chunk (SSEChunk): The chunk of streaming content to process. Only
            ``choices[0].delta.content`` is inspected.
        context (StreamContext): Context information for the current stream.
            Not used by this implementation.

    Returns:
        DetectionResult: ``NO_MATCH`` when the chunk carries no text, when the
            pattern detector reports an error, or when the detector produced no
            output; ``PARTIAL_MATCH`` otherwise (without ``content`` while a tool
            call is being accumulated, with the detector's output as ``content``
            in all other cases).
    """
    # If the chunk has no valid delta content, return NO_MATCH.
    if not sse_chunk.choices or not sse_chunk.choices[0].delta:
        return DetectionResult(state=DetectionState.NO_MATCH, sse_chunk=sse_chunk)

    chunk_content = sse_chunk.choices[0].delta.content
    if not chunk_content:
        return DetectionResult(state=DetectionState.NO_MATCH, sse_chunk=sse_chunk)

    # If already in tool call detection mode, continue accumulating and report a partial match.
    # The pattern detector is bypassed from here on; everything goes into the buffer.
    if self.in_tool_call:
        self.tool_call_buffer += chunk_content
        return DetectionResult(
            state=DetectionState.PARTIAL_MATCH,
            sse_chunk=sse_chunk
        )

    # Process the chunk through the pattern detector.
    result = await self.pattern_detector.process_chunk(chunk_content)

    if result.error:
        # Surface the detector failure instead of dropping it silently; the
        # returned result is unchanged so callers behave as before.
        self.logger.warning("Pattern detector reported an error: %s", result.error)
        return DetectionResult(state=DetectionState.NO_MATCH, sse_chunk=sse_chunk)

    # If a pattern is matched, switch into tool call mode.
    # Return any content remaining that could have been in the same chunk as the pattern or in a buffer
    if result.matched:
        self.in_tool_call = True
        self.tool_call_buffer = result.text_with_tool_call
        return DetectionResult(
            state=DetectionState.PARTIAL_MATCH,
            content=result.output,
            sse_chunk=sse_chunk
        )

    # For regular content, if there is any output, return it as PARTIAL_MATCH.
    if result.output:
        return DetectionResult(
            state=DetectionState.PARTIAL_MATCH,
            content=result.output,
            sse_chunk=sse_chunk
        )

    # If nothing of interest is found, return NO_MATCH.
    return DetectionResult(state=DetectionState.NO_MATCH, sse_chunk=sse_chunk)

finalize_detection(context) async

Finalize the detection process and handle any accumulated content.

This method is called at the end of a stream to process any remaining content in the buffers and return final detection results.

Parameters:

Name Type Description Default
context StreamContext

Context information for the current stream.

required

Returns:

Name Type Description
DetectionResult DetectionResult

Final result of the detection process, including any complete tool calls or remaining content.

Source code in src/llm/tool_detection/manual_detection_strategy.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
async def finalize_detection(self, context: StreamContext) -> DetectionResult:
    """Finalize the detection process and handle any accumulated content.

    This method is called at the end of a stream to process any remaining content
    in the buffers and return final detection results. It does not clear the
    strategy's state; call ``reset()`` before reusing the instance.

    Args:
        context (StreamContext): Context information for the current stream.
            Not used by this implementation.

    Returns:
        DetectionResult: ``COMPLETE_MATCH`` with the parsed tool calls when a tool
            call was accumulated and parsed; ``NO_MATCH`` with an apology message
            as ``content`` when the parser's result contains an ``"error"`` key;
            ``PARTIAL_MATCH`` with the flushed text when no tool call was seen but
            the detector still held output; ``NO_MATCH`` otherwise.
    """
    self.logger.debug("Finalizing detection")
    # Flush any remaining content from pattern detector
    final_result = await self.pattern_detector.flush_buffer()

    if self.in_tool_call:
        self.logger.debug("Processing final tool call buffer")
        if final_result.output:
            self.tool_call_buffer += final_result.output

        # Parse accumulated tool call
        parsed_tool_call_data = self.tool_call_parser.parse(self.tool_call_buffer)
        # Lazy %-style arguments: the (possibly large) buffer is only formatted
        # when debug logging is actually enabled.
        self.logger.debug("Tool call buffer: %s", self.tool_call_buffer)
        self.logger.debug("Parsed tool call data: %s", parsed_tool_call_data)

        if "error" in parsed_tool_call_data:
            self.logger.error("Tool call parsing failed: %s", parsed_tool_call_data['error'])
            return DetectionResult(
                state=DetectionState.NO_MATCH,
                content="Sorry, but I was unable to complete your request. Please try again.",
            )

        parsed_tool_calls = self._extract_tool_calls(parsed_tool_call_data)
        self.logger.debug("Successfully parsed %d tool calls", len(parsed_tool_calls))

        return DetectionResult(
            state=DetectionState.COMPLETE_MATCH,
            tool_calls=parsed_tool_calls
        )

    # No tool call detected, return any final content
    if final_result.output:
        self.logger.debug("Returning final content: %s", final_result.output[:50])
        return DetectionResult(
            state=DetectionState.PARTIAL_MATCH,
            content=final_result.output
        )

    self.logger.debug("No final content to return")
    return DetectionResult(state=DetectionState.NO_MATCH)

reset()

Reset all internal state of the detection strategy.

This method clears all buffers and resets flags to their initial state. Should be called between processing different streams or after errors.

Source code in src/llm/tool_detection/manual_detection_strategy.py
60
61
62
63
64
65
66
67
68
69
70
71
def reset(self) -> None:
    """Return the strategy to its freshly-constructed state.

    Resets the pattern detector, empties both content buffers and clears the
    tool-call flags. Intended to be invoked between independent streams, or
    after an error, so no state leaks from one stream into the next.
    """
    self.logger.debug("Resetting detector state")
    # Clear the pattern detector's own state first, then ours.
    self.pattern_detector.reset_states()
    # Flags back to "not inside a tool call".
    self.in_tool_call = self.accumulation_mode = False
    # Fresh, empty buffers.
    self.tool_call_buffer = ""
    self.pre_tool_call_content = []