Streaming

src.data_models.streaming.PatternMatchResult

Bases: BaseModel

Result of a pattern matching operation in stream processing.

This class encapsulates all possible outcomes and data from a pattern matching operation, including matched patterns, processed output, and any errors that occurred.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `output` | `Optional[str]` | The processed text after pattern matching. None if no processing occurred. |
| `pattern_name` | `Optional[str]` | Name of the matched pattern. None if no pattern was matched. |
| `matched` | `bool` | Whether a pattern was successfully matched. Defaults to False. |
| `text_with_tool_call` | `Optional[str]` | Complete text containing the tool call if a match was found. None otherwise. |
| `tool_call_message` | `Optional[str]` | Message associated with the tool call. None if no tool call was detected. |
| `error` | `Optional[str]` | Error message if pattern matching failed. None if processing was successful. |

Example
```python
result = PatternMatchResult(
    output="Processed text",
    pattern_name="MistralPattern0",
    matched=True,
    text_with_tool_call="Complete tool call text",
    tool_call_message="Tool call detected"
)
```
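The attribute descriptions above suggest three outcomes a caller may want to distinguish: a failure (`error` set), a match (`matched` is True), and plain pass-through text. The following is a minimal sketch of such a consumer, not part of the library. The `summarize` helper, the order in which the fields are checked, and the sample strings are assumptions for illustration, and `SimpleNamespace` objects stand in for real `PatternMatchResult` instances so that the snippet runs on its own.

```python
from types import SimpleNamespace


def summarize(result) -> str:
    """Describe a PatternMatchResult-like object in one line (hypothetical helper)."""
    # Assumption: an error takes precedence over any match information.
    if result.error is not None:
        return f"error: {result.error}"
    if result.matched:
        return f"{result.pattern_name}: {result.tool_call_message}"
    # No error and no match: fall back to the processed output.
    return f"no match: {result.output!r}"


# Stand-ins that carry the documented fields; in the project you would
# pass real PatternMatchResult instances instead.
hit = SimpleNamespace(
    output="Processed text",
    pattern_name="MistralPattern0",
    matched=True,
    text_with_tool_call="Complete tool call text",
    tool_call_message="Tool call detected",
    error=None,
)
miss = SimpleNamespace(
    output="plain text",
    pattern_name=None,
    matched=False,
    text_with_tool_call=None,
    tool_call_message=None,
    error=None,
)
failed = SimpleNamespace(
    output=None,
    pattern_name=None,
    matched=False,
    text_with_tool_call=None,
    tool_call_message=None,
    error="unterminated tool call",
)

summaries = [summarize(r) for r in (hit, miss, failed)]
```

Because the helper only reads attributes, it works unchanged with the real model.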
Source code in src/data_models/streaming.py
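# Imports required by this excerpt.
from typing import Optional

from pydantic import BaseModel, Field


class PatternMatchResult(BaseModel):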
    """Result of a pattern matching operation in stream processing.

    This class encapsulates all possible outcomes and data from a pattern
    matching operation, including matched patterns, processed output,
    and any errors that occurred.

    Attributes:
        output (Optional[str]): The processed text after pattern matching.
            None if no processing occurred.
        pattern_name (Optional[str]): Name of the matched pattern.
            None if no pattern was matched.
        matched (bool): Whether a pattern was successfully matched.
            Defaults to False.
        text_with_tool_call (Optional[str]): Complete text containing the tool call
            if a match was found. None otherwise.
        tool_call_message (Optional[str]): Message associated with the tool call.
            None if no tool call was detected.
        error (Optional[str]): Error message if pattern matching failed.
            None if processing was successful.

    Example:
        ```python
        result = PatternMatchResult(
            output="Processed text",
            pattern_name="MistralPattern0",
            matched=True,
            text_with_tool_call="Complete tool call text",
            tool_call_message="Tool call detected"
        )
        ```
    """
    output: Optional[str] = Field(
        default=None,
        description="The processed output text after pattern matching"
    )
    pattern_name: Optional[str] = Field(
        default=None,
        description="The name of the matched pattern"
    )
    matched: bool = Field(
        default=False,
        description="Indicates whether a pattern was successfully matched"
    )
    text_with_tool_call: Optional[str] = Field(
        default=None,
        description="The complete text containing the tool call if matched"
    )
    tool_call_message: Optional[str] = Field(
        default=None,
        description="Any message associated with the tool call"
    )
    error: Optional[str] = Field(
        default=None,
        description="Error message if pattern matching failed"
    )

src.data_models.streaming.StreamConfig

Bases: BaseModel

Configuration settings for stream processing operations.

This class defines the configuration parameters that control how streaming data is processed, buffered, and chunked. It uses Pydantic for validation and provides three factory methods for common configurations: create_default, create_buffered, and create_with_separator.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `buffer_size` | `int` | Size of the buffer in characters. Zero disables buffering. Must be greater than or equal to 0. |
| `chunk_separator` | `str` | String used to separate chunks when combining buffered content. |
| `strip_whitespace` | `bool` | Whether to remove whitespace from chunks before processing. |
| `buffering_enabled` | `bool` | Computed field indicating if buffering is active. |

Example
```python
# Create a default configuration
config = StreamConfig.create_default()

# Create a buffered configuration
buffered_config = StreamConfig.create_buffered(buffer_size=100)

# Create configuration with custom separator
custom_config = StreamConfig.create_with_separator(
    buffer_size=50,
    separator="\n"
)
```
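This page does not show how a stream processor consumes these settings, so the following is only a sketch of one plausible reading of the attribute descriptions. The `buffer_chunks` function and its flushing rule (emit once the buffered text reaches `buffer_size` characters, then flush the remainder at the end of the stream) are assumptions, not the project's implementation. `SimpleNamespace` objects stand in for `StreamConfig` instances so that the snippet runs without the project installed.

```python
from types import SimpleNamespace
from typing import Iterable, Iterator


def buffer_chunks(chunks: Iterable[str], config) -> Iterator[str]:
    """Hypothetical consumer of a StreamConfig-like object."""
    pending = []
    pending_len = 0
    for chunk in chunks:
        if config.strip_whitespace:
            chunk = chunk.strip()
        if config.buffer_size == 0:
            # Buffering disabled: pass each chunk straight through.
            yield chunk
            continue
        pending.append(chunk)
        pending_len += len(chunk)
        if pending_len >= config.buffer_size:
            # Assumed rule: emit once the buffered characters reach buffer_size.
            yield config.chunk_separator.join(pending)
            pending, pending_len = [], 0
    if pending:
        # Flush whatever is left when the stream ends.
        yield config.chunk_separator.join(pending)


sample = ["  Hello ", "world ", " foo", "bar  "]

# Stand-ins for StreamConfig instances (same attribute names as documented).
buffered = SimpleNamespace(buffer_size=10, chunk_separator="\n", strip_whitespace=True)
unbuffered = SimpleNamespace(buffer_size=0, chunk_separator="", strip_whitespace=False)

merged = list(buffer_chunks(sample, buffered))        # ["Hello\nworld", "foo\nbar"]
passthrough = list(buffer_chunks(sample, unbuffered))  # same items as sample
```

Under this reading, `chunk_separator` only matters when `buffer_size` is greater than zero, because unbuffered chunks are never joined.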
Source code in src/data_models/streaming.py
# Imports required by this excerpt.
from pydantic import BaseModel, Field, computed_field


class StreamConfig(BaseModel):
    """Configuration settings for stream processing operations.

    This class defines the configuration parameters that control how streaming
    data is processed, buffered, and chunked. It uses Pydantic for validation
    and provides several factory methods for common configurations.

    Attributes:
        buffer_size (int): Size of the buffer in characters. Zero disables buffering.
            Must be greater than or equal to 0.
        chunk_separator (str): String used to separate chunks when combining buffered content.
        strip_whitespace (bool): Whether to remove whitespace from chunks before processing.
        buffering_enabled (bool): Computed field indicating if buffering is active.

    Example:
        ```python
        # Create a default configuration
        config = StreamConfig.create_default()

        # Create a buffered configuration
        buffered_config = StreamConfig.create_buffered(buffer_size=100)

        # Create configuration with custom separator
        custom_config = StreamConfig.create_with_separator(
            buffer_size=50,
            separator="\\n"
        )
        ```
    """
    buffer_size: int = Field(
        default=0,
        description="Size of the buffer in characters. Set to 0 to disable buffering.",
        ge=0
    )
    chunk_separator: str = Field(
        default="",
        description="Separator to use between chunks when combining buffered content."
    )
    strip_whitespace: bool = Field(
        default=False,
        description="Whether to strip whitespace from chunks before processing."
    )

    @computed_field
    @property
    def buffering_enabled(self) -> bool:
        """Indicates whether buffering is enabled based on buffer size."""
        return self.buffer_size > 0

    model_config = {
        "frozen": True,
        "extra": "forbid",
        "json_schema_extra": {
            "examples": [
                {
                    "buffer_size": 10,
                    "chunk_separator": "\n",
                    "strip_whitespace": True
                }
            ]
        }
    }

    @classmethod
    def create_default(cls) -> "StreamConfig":
        """Create a StreamConfig with default values."""
        return cls()

    @classmethod
    def create_buffered(cls, buffer_size: int) -> "StreamConfig":
        """Create a StreamConfig with specified buffer size."""
        return cls(buffer_size=buffer_size)

    @classmethod
    def create_with_separator(
            cls,
            buffer_size: int = 0,
            separator: str = "\n"
    ) -> "StreamConfig":
        """Create a StreamConfig with specified buffer size and separator."""
        return cls(
            buffer_size=buffer_size,
            chunk_separator=separator
        )

buffering_enabled property

Indicates whether buffering is enabled based on buffer size.
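The sketch below illustrates how the computed field and the model settings from the source listing (`ge=0`, `frozen`, `extra="forbid"`) behave in practice. It assumes Pydantic v2 and redefines a trimmed copy of the class so that it runs standalone. The `rejected` helper and the misspelled `buffer_sise` key are hypothetical, introduced only for the demonstration.

```python
from pydantic import BaseModel, Field, ValidationError, computed_field


class StreamConfig(BaseModel):
    """Trimmed copy of the documented model, so this snippet runs standalone."""

    model_config = {"frozen": True, "extra": "forbid"}

    buffer_size: int = Field(default=0, ge=0)
    chunk_separator: str = ""
    strip_whitespace: bool = False

    @computed_field
    @property
    def buffering_enabled(self) -> bool:
        return self.buffer_size > 0


def rejected(make) -> bool:
    """Return True if calling make() raises a pydantic ValidationError."""
    try:
        make()
    except ValidationError:
        return True
    return False


default = StreamConfig()
buffered = StreamConfig(buffer_size=100)

# The computed field follows buffer_size and is included in model_dump().
flags = (default.buffering_enabled, buffered.buffering_enabled)
dumped = buffered.model_dump()

# ge=0 rejects negative sizes; extra="forbid" rejects unknown keys.
negative_rejected = rejected(lambda: StreamConfig(buffer_size=-1))
unknown_rejected = rejected(lambda: StreamConfig(buffer_sise=10))

# frozen=True blocks attribute assignment; model_copy builds a changed copy.
assignment_rejected = rejected(lambda: setattr(buffered, "buffer_size", 5))
smaller = buffered.model_copy(update={"buffer_size": 5})
```

Note that `model_copy(update=...)` does not re-validate the updated values, so constructing a new `StreamConfig` is the safer route when the new size comes from untrusted input.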

create_buffered(buffer_size) classmethod

Create a StreamConfig with specified buffer size.

Source code in src/data_models/streaming.py
@classmethod
def create_buffered(cls, buffer_size: int) -> "StreamConfig":
    """Create a StreamConfig with specified buffer size."""
    return cls(buffer_size=buffer_size)

create_default() classmethod

Create a StreamConfig with default values.

Source code in src/data_models/streaming.py
@classmethod
def create_default(cls) -> "StreamConfig":
    """Create a StreamConfig with default values."""
    return cls()

create_with_separator(buffer_size=0, separator='\n') classmethod

Create a StreamConfig with specified buffer size and separator.

Source code in src/data_models/streaming.py
@classmethod
def create_with_separator(
        cls,
        buffer_size: int = 0,
        separator: str = "\n"
) -> "StreamConfig":
    """Create a StreamConfig with specified buffer size and separator."""
    return cls(
        buffer_size=buffer_size,
        chunk_separator=separator
    )