src.agent.chat_agent_streaming.StreamingChatAgent

An agent that handles streaming chat interactions with support for tool execution.

This agent processes conversations in a streaming fashion, handling message generation, tool detection, and tool execution. It supports both vendor-specific and manual tool detection strategies.

Parameters:

config (Dict, required): Configuration dictionary containing:

  • history_limit (int): Maximum number of historical messages to consider
  • system_prompt (str): System prompt to prepend to conversations
  • detection_mode (str): Mode for tool detection ('vendor' or 'manual')
  • use_vendor_chat_completions (bool): Whether to use vendor chat completions
  • models_config (Dict): Configuration for language models
  • tools_config (Dict): Configuration for available tools
  • logging_level (str): Logging level (default: 'INFO')
  • max_streaming_iterations (int): Maximum number of streaming iterations
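
A minimal construction sketch follows. The top-level keys come from the parameter list above; the nested shapes of models_config and tools_config are assumptions for illustration, not the project's documented schema.

# Hedged example: key names are taken from this page; nested config shapes are assumed.
config = {
    "history_limit": 5,
    "system_prompt": "You are a helpful assistant.",
    "detection_mode": "vendor",            # or "manual" to parse tool calls from raw text
    "use_vendor_chat_completions": True,
    "max_streaming_iterations": 3,
    "models_config": {
        "main_chat_model": {"vendor": "openai"}  # hypothetical entry; the real schema depends on LLMFactory
    },
    "tools_config": {},                    # hypothetical: no tools registered
}
agent = StreamingChatAgent(config)

Because __init__ schedules tool initialization with asyncio.create_task, the agent must be constructed inside a running event loop.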

Attributes:

  • response_model_name (str): Name of the main chat model
  • history_limit (int): Maximum number of historical messages to consider
  • system_prompt (str): System prompt prepended to conversations
  • logger (Logger): Logger instance for the agent
  • detection_mode (str): Current tool detection mode
  • use_vendor_chat_completions (bool): Whether vendor chat completions are enabled

Source code in src/agent/chat_agent_streaming.py
class StreamingChatAgent:
    """An agent that handles streaming chat interactions with support for tool execution.

    This agent processes conversations in a streaming fashion, handling message generation,
    tool detection, and tool execution. It supports both vendor-specific and manual tool
    detection strategies.

    Args:
        config (Dict): Configuration dictionary containing:

            - `history_limit` (int): Maximum number of historical messages to consider
            - `system_prompt` (str): System prompt to prepend to conversations
            - `detection_mode` (str): Mode for tool detection ('vendor' or 'manual')
            - `use_vendor_chat_completions` (bool): Whether to use vendor chat completions
            - `models_config` (Dict): Configuration for language models
            - `tools_config` (Dict): Configuration for available tools
            - `logging_level` (str): Logging level (default: 'INFO')
            - `max_streaming_iterations` (int): Maximum number of streaming iterations

    Attributes:
        response_model_name (str): Name of the main chat model
        history_limit (int): Maximum number of historical messages to consider
        system_prompt (str): System prompt prepended to conversations
        logger (logging.Logger): Logger instance for the agent
        detection_mode (str): Current tool detection mode
        use_vendor_chat_completions (bool): Whether vendor chat completions are enabled
    """
    def __init__(self, config: Dict) -> None:
        self.config = config
        self.response_model_name = "main_chat_model"
        self.history_limit = self.config.get('history_limit', 3)
        self.system_prompt = self.config.get('system_prompt')

        # Initialize logger
        self.logger = logging.getLogger(self.__class__.__name__)
        logging_level = os.getenv("LOG_LEVEL", "INFO")
        logging.basicConfig(level=getattr(logging, logging_level.upper(), logging.INFO))
        self.logger.info(f'Logger set to {logging_level}')

        # Initialize vendor-specific components
        self.main_chat_model_config = self.config.get('models_config').get('main_chat_model')
        self.llm_factory = LLMFactory(config=self.config.get('models_config'))

        # Determine detection strategy first
        self.detection_mode = self.config.get("detection_mode", "vendor")
        self.use_vendor_chat_completions = self.config.get("use_vendor_chat_completions", True)

        # Log a configuration summary before wiring up the detection strategy
        self.logger.info("\n\n" + "=" * 60 + "\n" + "Agent Configuration Summary" + "\n" + "=" * 60 + "\n" +
                         f"Main Chat Model Config:\n{json5.dumps(self.main_chat_model_config, indent=4)}\n" +
                         f"Tool Detection Mode: {self.detection_mode}\n" +
                         f"Vendor Chat API Mode: {self.use_vendor_chat_completions}\n" +
                         "\n" + "=" * 60 + "\n")
        if self.detection_mode == "manual":
            with open("src/configs/parsing.yaml", "r") as f:
                parser_config = yaml.safe_load(f)
            self.tool_call_parser = ToolCallParserFactory.get_parser(
                FormatType.JSON,
                parser_config
            )
            self.detection_strategy = ManualToolCallDetectionStrategy(
                parser=self.tool_call_parser
            )
        else:
            self.detection_strategy = VendorToolCallDetectionStrategy()
            self.tool_call_parser = None

        # Initialize the prompt builder for the configured vendor
        self.prompt_builder: BasePromptBuilder = PromptBuilderFactory.get_prompt_builder(
            vendor=self.main_chat_model_config.get('vendor')
        )

        # Initialize tool registry
        self.tool_registry = ToolRegistry(
            tools_config=self.config.get("tools_config"),
            mcp_config=self.config.get("mcp_config")
        )
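        # Fire-and-forget: tool initialization runs in the background. The task
        # reference is not retained, so the constructor must run inside an event loop.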
        asyncio.create_task(self.tool_registry.initialize_all_tools())

    @handle_streaming_errors
    async def stream_step(
            self,
            conversation_history: List[TextChatMessage],
            api_passed_context: Optional[Dict[str, Any]] = None
    ) -> AsyncGenerator[SSEChunk, None]:
        """Process a conversation step in a streaming fashion.

        Handles the main conversation flow, including message generation, tool detection,
        and tool execution. The process flows through different states until completion.

        Args:
            conversation_history (List[TextChatMessage]): List of previous conversation messages
            api_passed_context (Optional[Dict[str, Any]]): Additional context passed from the API

        Yields:
            SSEChunk: Server-Sent Events chunks containing response content or status updates

        States:
            - STREAMING: Generating response content
            - TOOL_DETECTION: Detecting tool calls in the response
            - EXECUTING_TOOLS: Running detected tools
            - INTERMEDIATE: Handling intermediate steps
            - COMPLETING: Finalizing the response
        """
        self.logger.debug("Starting streaming agent processing")

        context = await self._initialize_context(conversation_history, api_passed_context)

        while context.current_state != StreamState.COMPLETED:
            match context.current_state:

                case StreamState.STREAMING:
                    self.logger.info(f"--- Entering Streaming State ---")
                    async for item in self._handle_streaming(context):
                        yield item

                case StreamState.TOOL_DETECTION:
                    self.logger.info(f"--- Entering Tool Detection State ---")
                    async for item in self._handle_tool_detection(context):
                        yield item

                case StreamState.EXECUTING_TOOLS:
                    self.logger.info(f"--- Entering Executing Tools State ---")
                    async for item in self._handle_tool_execution(context):
                        yield item

                case StreamState.INTERMEDIATE:
                    self.logger.info(f"--- Entering Intermediate State ---")
                    async for item in self._handle_intermediate(context):
                        yield item

                case StreamState.COMPLETING:
                    self.logger.info(f"--- Entering Completing State ---")
                    async for item in self._handle_completing(context):
                        yield item
                    context.current_state = StreamState.COMPLETED

    # ----------------------------------------------------------------
    #  STATE HANDLERS
    # ----------------------------------------------------------------

    @handle_streaming_errors
    async def _handle_streaming(self, context: StreamContext) -> AsyncGenerator[SSEChunk, None]:
        """Handle the streaming state of the conversation.

        Processes the main content generation phase, monitoring for tool calls and
        managing the response stream.

        Args:
            context (StreamContext): Current streaming context

        Yields:
            SSEChunk: Content chunks and status updates

        Raises:
            Exception: If maximum streaming iterations are exceeded
        """
        self.detection_strategy.reset()
        context.streaming_entry_count += 1
        if context.streaming_entry_count > context.max_streaming_iterations:
            self.logger.error("Maximum streaming iterations reached. Aborting further streaming.")
            yield await SSEChunk.make_stop_chunk(
                content="Maximum streaming depth reached. Please try your request again."
            )
            context.current_state = StreamState.COMPLETING
            return

        prompt_payload = PromptPayload(
            conversation_history=context.conversation_history,
            tool_definitions=context.tool_definitions if self.detection_mode == "manual" else None
        )

        prompt_output: PromptBuilderOutput = (
            await self.prompt_builder.build_chat(prompt_payload) if self.use_vendor_chat_completions
            else await self.prompt_builder.build_text(prompt_payload)
        )
        llm_input = prompt_output.get_output()

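        # Exactly one of 'prompt'/'messages' survives the None filter below;
        # 'tools' is passed to the vendor API only when detection is not manual.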
        stream_kwargs = {
            'prompt': llm_input if isinstance(llm_input, str) else None,
            'messages': llm_input if isinstance(llm_input, list) else None,
            'tools': context.tool_definitions if self.detection_mode != "manual" else None
        }
        stream_kwargs = {k: v for k, v in stream_kwargs.items() if v is not None}

        self.logger.debug(f"stream_kwargs: {stream_kwargs}")
        llm_adapter = context.llm_factory.get_adapter(self.response_model_name)
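        # A plain-string prompt routes to text-completion streaming; a message list routes to chat streaming.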
        stream_gen = llm_adapter.gen_sse_stream if isinstance(llm_input, str) else llm_adapter.gen_chat_sse_stream

        accumulated_content = []
        async for sse_chunk in stream_gen(**stream_kwargs):
            detection_result = await self.detection_strategy.detect_chunk(sse_chunk, context)
            self.logger.debug(f"Detection result: {detection_result}")

            if detection_result.state in [DetectionState.NO_MATCH, DetectionState.PARTIAL_MATCH]:
                if detection_result.content:
                    accumulated_content.append(detection_result.content)
                    yield SSEChunk.make_text_chunk(detection_result.content)

            elif detection_result.state == DetectionState.COMPLETE_MATCH:
                async for chunk in self._handle_complete_match(context, detection_result, accumulated_content):
                    yield chunk
                return

        final_result = await self.detection_strategy.finalize_detection(context)
        self.logger.debug(f"Final detection result: {final_result}")

        if final_result.state == DetectionState.COMPLETE_MATCH:
            async for chunk in self._handle_complete_match(context, final_result, accumulated_content):
                yield chunk
        else:
            if final_result.content:
                accumulated_content.append(final_result.content)
                yield SSEChunk.make_text_chunk(final_result.content)

            if accumulated_content:
                context.conversation_history.append(
                    AssistantMessage(content="".join(accumulated_content))
                )

            yield await SSEChunk.make_stop_chunk()
            context.current_state = StreamState.COMPLETING

    @handle_streaming_errors
    async def _handle_tool_detection(self, context: StreamContext) -> AsyncGenerator[SSEChunk, None]:
        """Handle the tool detection state.

        Processes detected tool calls and transitions to tool execution.

        Args:
            context (StreamContext): Current streaming context

        Yields:
            SSEChunk: Tool detection status updates
        """
        self.logger.debug("Tool calls detected, transitioning to EXECUTING_TOOLS")
        context.current_state = StreamState.EXECUTING_TOOLS
        yield await SSEChunk.make_status_chunk(
            AgentStatus.TOOL_DETECTED,
            {"tools": [tc.format_tool_calls() for tc in context.current_tool_call]}
        )

    @handle_streaming_errors
    async def _handle_tool_execution(
            self,
            context: StreamContext
    ) -> AsyncGenerator[SSEChunk, None]:
        """Execute detected tools and process their results.

        Runs the detected tools concurrently and handles their results, including
        error cases.

        Args:
            context (StreamContext): Current streaming context

        Yields:
            SSEChunk: Tool execution status updates
        """
        if context.message_buffer.strip():
            context.conversation_history.append(
                AssistantMessage(content=context.message_buffer)
            )
            context.message_buffer = ""

        results = await self._execute_tools_concurrently(context)

        tool_results = []
        for call, result in zip(context.current_tool_call, results):
            context.conversation_history.append(
                AssistantMessage(tool_calls=[call])
            )
            if isinstance(result, Exception):
                context.conversation_history.append(
                    AssistantMessage(
                        content=f"Error executing tool {call.function.name}: {str(result)}"
                    )
                )
            else:
                tool_results.append(result)
                context.conversation_history.append(
                    ToolMessage(
                        content=result["result"],
                        tool_call_id=call.id
                    )
                )

        self.logger.debug("Tool execution results: %s", tool_results)
        context.current_state = StreamState.INTERMEDIATE
        yield await SSEChunk.make_status_chunk(AgentStatus.TOOLS_EXECUTED)

    @handle_streaming_errors
    async def _handle_intermediate(self, context: StreamContext) -> AsyncGenerator[SSEChunk, None]:
        """Handle the intermediate state between tool executions.

        Resets the message buffer and detection strategy for the next iteration.

        Args:
            context (StreamContext): Current streaming context

        Yields:
            SSEChunk: Status updates for continuation
        """
        context.message_buffer = ""
        self.detection_strategy.reset()
        context.current_state = StreamState.STREAMING
        yield await SSEChunk.make_status_chunk(AgentStatus.CONTINUING)

    @handle_streaming_errors
    async def _handle_completing(self, context: StreamContext) -> AsyncGenerator[SSEChunk, None]:
        """Handle the completion state of the conversation.

        Finalizes the conversation and sends the stop signal.

        Args:
            context (StreamContext): Current streaming context

        Yields:
            SSEChunk: Final stop chunk
        """
        self.logger.info(f"--- Entering COMPLETING State ---")

        yield await SSEChunk.make_stop_chunk()
        self.logger.info(f"Streaming process completed.")

    # ----------------------------------------------------------------
    #  HELPER METHODS
    # ----------------------------------------------------------------

    async def _initialize_context(
            self,
            conversation_history: List[TextChatMessage],
            api_passed_context: Optional[Dict]
    ) -> StreamContext:
        """Initialize the streaming context for a new conversation step.

        Sets up the conversation history with system prompt and creates the
        streaming context with necessary configurations.

        Args:
            conversation_history (List[TextChatMessage]): Previous conversation messages
            api_passed_context (Optional[Dict]): Additional context from the API

        Returns:
            StreamContext: Initialized streaming context
        """
        selected_history = (
            conversation_history[-self.history_limit:]
            if self.history_limit > 0
            else list(conversation_history)  # copy so inserting the system prompt does not mutate the caller's list
        )

        if self.system_prompt:
            system_message = SystemMessage(content=self.system_prompt)
            selected_history.insert(0, system_message)

        return StreamContext(
            conversation_history=selected_history,
            tool_definitions=await self.tool_registry.get_tool_definitions(),
            context=api_passed_context,
            llm_factory=self.llm_factory,
            current_state=StreamState.STREAMING,
            max_streaming_iterations=self.config.get("max_streaming_iterations", 1),
            streaming_entry_count=0
        )

    async def _handle_complete_match(
            self,
            context: StreamContext,
            result: DetectionResult,
            accumulated_content: List[str]
    ) -> AsyncGenerator[SSEChunk, None]:
        """Handle a complete tool call match detection.

        Process detected tool calls and update the conversation history.

        Args:
            context (StreamContext): Current streaming context
            result (DetectionResult): Tool detection result
            accumulated_content (List[str]): Accumulated response content

        Yields:
            SSEChunk: Content chunks and updates
        """
        if result.content:
            accumulated_content.append(result.content)
            yield SSEChunk.make_text_chunk(result.content)

        context.current_tool_call = result.tool_calls or []
        if accumulated_content:
            context.conversation_history.append(
                AssistantMessage(content="".join(accumulated_content))
            )
        context.current_state = StreamState.TOOL_DETECTION

    async def _execute_tools_concurrently(self, context: StreamContext) -> List[Any]:
        """Execute multiple tools concurrently.

        Run detected tools in parallel and handle their results or errors.

        Args:
            context (StreamContext): Current streaming context

        Returns:
            List[Any]: List of tool execution results or exceptions

        Raises:
            RuntimeError: If a requested tool is not found
        """
        async def run_tool(tool_call: ToolCall):
            try:
                tool = await self.tool_registry.get_tool(tool_call.function.name)
                if not tool:
                    raise RuntimeError(f"Tool {tool_call.function.name} not found")
                tool_args = json5.loads(tool_call.function.arguments)
                self.logger.info(f"Running tool {tool_call.function.name} with arguments: {tool_args}")
                result = await tool.execute(
                    context=context,
                    **tool_args
                )
                return {"tool_name": tool_call.function.name, "result": result.result}

            except Exception as e:
                self.logger.error(f"Error executing tool {tool_call.function.name}", exc_info=True)
                return e

        tasks = [run_tool(tc) for tc in context.current_tool_call]
        return await asyncio.gather(*tasks, return_exceptions=True)

stream_step(conversation_history, api_passed_context=None) async

Process a conversation step in a streaming fashion.

Handles the main conversation flow, including message generation, tool detection, and tool execution. The process flows through different states until completion.

Parameters:

  • conversation_history (List[TextChatMessage], required): List of previous conversation messages
  • api_passed_context (Optional[Dict[str, Any]], default: None): Additional context passed from the API
Yields:

  • SSEChunk (AsyncGenerator[SSEChunk, None]): Server-Sent Events chunks containing response content or status updates

States:
  • STREAMING: Generating response content
  • TOOL_DETECTION: Detecting tool calls in the response
  • EXECUTING_TOOLS: Running detected tools
  • INTERMEDIATE: Handling intermediate steps
  • COMPLETING: Finalizing the response
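
A hedged usage sketch for consuming the stream is shown below; the UserMessage constructor is an assumption standing in for whatever TextChatMessage subtype your application uses.

import asyncio

# Hypothetical driver: iterate the SSE stream until the final stop chunk.
async def main():
    agent = StreamingChatAgent(config)  # config as sketched near the top of this page
    history = [UserMessage(content="What is the weather in Paris?")]  # assumed message type
    async for chunk in agent.stream_step(history):
        print(chunk)  # SSEChunk objects: text deltas, tool status updates, and a final stop chunk

asyncio.run(main())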