
src.llm.adapters.openai_compat_adapter.OpenAICompatAdapter

Bases: BaseVendorAdapter

Adapter for interacting with OpenAI-compatible APIs.

Supports both the chat completions endpoint and the plain completions endpoint, handling streaming responses and converting them to standardized SSE chunks.

Attributes:

    model_name (str): The model identifier being served
    base_url (str): URL of the server (default: http://localhost:8000/v1)
    api_key (str): API key for server authentication
    client (AsyncOpenAI): Configured OpenAI client
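
Before reading the source, a minimal usage sketch may help. The import paths, the TextChatMessage constructor fields (role/content), and a local OpenAI-compatible server (e.g. vLLM) at the default base_url are assumptions; only OpenAICompatAdapter and its methods come from this page.

import asyncio

from src.llm.adapters.openai_compat_adapter import OpenAICompatAdapter
from src.llm.models import TextChatMessage  # hypothetical import path

async def main():
    adapter = OpenAICompatAdapter(
        model_name="NousResearch/Llama-2-7b",
        base_url="http://localhost:8000/v1",  # e.g. a local vLLM server
        temperature=0.7,  # kept in default_params and sent with every request
    )
    # role/content fields are assumed from the OpenAI message format.
    messages = [TextChatMessage(role="user", content="Say hello in one sentence.")]
    async for chunk in adapter.gen_chat_sse_stream(messages):
        for choice in chunk.choices:
            if choice.delta.content:
                print(choice.delta.content, end="", flush=True)

asyncio.run(main())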

Source code in src/llm/adapters/openai_compat_adapter.py
class OpenAICompatAdapter(BaseVendorAdapter):
    """Adapter for interacting with Open AI compatible APIs.

    Supports both chat completions and completions endpoints. Handles streaming responses and converts
    them to standardized SSE chunks.

    Attributes:
        model_name (str): The model identifier being served
        base_url (str): URL of the server (default: http://localhost:8000/v1)
        api_key (str): API key for server authentication
        client (AsyncOpenAI): Configured OpenAI client
    """

    def __init__(
            self,
            model_name: str,
            base_url: str = "http://localhost:8000/v1",
            api_key: str = "dummy-key",
            **default_params
    ):
        """Initialize the adapter.

        Args:
            model_name (str): Name of the model being served (e.g. "NousResearch/Llama-2-7b")
            base_url (str): URL of the server
            api_key (str): API key for authentication
            **default_params: Additional parameters for generation (temperature etc.)
        """
        self.model_name = model_name
        self.base_url = base_url
        self.api_key = api_key
        self.default_params = default_params

        # Configure OpenAI client for server
        self.client = AsyncOpenAI(
            base_url=self.base_url,
            api_key=self.api_key
        )

        logger.info(f"Initialized adapter for model: {self.model_name}")
        logger.debug(f"Using server at: {self.base_url}")
        logger.debug(f"Default parameters: {default_params}")

    async def gen_chat_sse_stream(
            self,
            messages: List[TextChatMessage],
            tools: Optional[List[Tool]] = None,
            **kwargs
    ) -> AsyncGenerator[SSEChunk, None]:
        """Generate streaming chat completion using chat endpoint.

        Args:
            messages (List[TextChatMessage]): List of chat messages
            tools (Optional[List[Tool]]): Optional tools/functions
            **kwargs: Additional parameters to override defaults

        Yields:
            SSEChunk: Standardized chunks of the streaming response
        """
        try:
            # Convert messages to OpenAI format
            openai_messages = [msg.model_dump() for msg in messages]

            # Prepare request payload
            request_params = {
                "model": self.model_name,
                "messages": openai_messages,
                "stream": True,
                **self.default_params,
                **kwargs
            }

            # Add tools if provided
            if tools:
                request_params["tools"] = [tool.model_dump() for tool in tools]
                request_params["tool_choice"] = "auto"

            # Stream response
            async for chunk in await self.client.chat.completions.create(**request_params):
                yield self._convert_to_sse_chunk(chunk)

        except Exception as e:
            logger.error(f"Error in chat stream: {str(e)}", exc_info=True)
            raise RuntimeError(f"Chat completion failed: {str(e)}") from e

    async def gen_sse_stream(
            self,
            prompt: str,
            **kwargs
    ) -> AsyncGenerator[SSEChunk, None]:
        """Generate streaming completion using completions endpoint.

        Args:
            prompt (str): Input text prompt
            **kwargs: Additional parameters to override defaults

        Yields:
            SSEChunk: Standardized chunks of the streaming response
        """
        try:
            # Prepare request payload
            request_params = {
                "model": self.model_name,
                "prompt": prompt,
                "stream": True,
                **self.default_params,
                **kwargs
            }

            logger.debug(f"Making completions request with prompt: {prompt[:50]}...")

            # Use completions endpoint directly
            async for chunk in await self.client.completions.create(**request_params):
                yield self._convert_to_sse_chunk(chunk)

        except Exception as e:
            logger.error(f"Error in completion stream: {str(e)}", exc_info=True)
            raise RuntimeError(f"Completion failed: {str(e)}") from e

    def _convert_to_sse_chunk(self, raw_chunk) -> SSEChunk:
        """Convert response chunk to standardized SSE format.

        Args:
            raw_chunk: Raw chunk from API

        Returns:
            SSEChunk: Standardized chunk format
        """
        try:
            choices = []

            # Check if this is a text completion or chat completion by looking at the object type
            if raw_chunk.object == 'text_completion':
                # Handle text completion format
                for choice in raw_chunk.choices:
                    choices.append(SSEChoice(
                        index=choice.index,
                        delta=SSEDelta(
                            content=choice.text,
                            role="assistant"
                        ),
                        finish_reason=choice.finish_reason
                    ))
            else:
                # Handle chat completion format
                for choice in raw_chunk.choices:
                    tool_calls = None
                    if hasattr(choice.delta, 'tool_calls') and choice.delta.tool_calls:
                        tool_calls = []
                        for tc in choice.delta.tool_calls:
                            function = None
                            if tc.function:
                                function = SSEFunction(
                                    name=tc.function.name or "",
                                    arguments=tc.function.arguments or ""
                                )
                            tool_calls.append(SSEToolCall(
                                index=tc.index or 0,
                                id=tc.id,
                                type=tc.type or "function",
                                function=function
                            ))

                    choices.append(SSEChoice(
                        index=choice.index,
                        delta=SSEDelta(
                            role=choice.delta.role if hasattr(choice.delta, 'role') else None,
                            content=choice.delta.content if hasattr(choice.delta, 'content') else None,
                            tool_calls=tool_calls
                        ),
                        finish_reason=choice.finish_reason
                    ))

            return SSEChunk(
                id=raw_chunk.id,
                object=raw_chunk.object,
                created=raw_chunk.created,
                model=raw_chunk.model,
                choices=choices
            )

        except Exception as e:
            logger.error(f"Error converting chunk: {raw_chunk}", exc_info=True)
            raise ValueError(f"Failed to convert response: {str(e)}") from e
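
To make the conversion concrete, here is a sketch that feeds _convert_to_sse_chunk a SimpleNamespace stand-in for a raw text-completion chunk (purely for illustration; real chunks come from the AsyncOpenAI client, and adapter is an OpenAICompatAdapter instance):

from types import SimpleNamespace

raw = SimpleNamespace(
    id="cmpl-123",
    object="text_completion",  # routes to the text-completion branch above
    created=1700000000,
    model="NousResearch/Llama-2-7b",
    choices=[SimpleNamespace(index=0, text="Hello", finish_reason=None)],
)

sse = adapter._convert_to_sse_chunk(raw)  # adapter: an OpenAICompatAdapter
assert sse.choices[0].delta.content == "Hello"
assert sse.choices[0].delta.role == "assistant"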

__init__(model_name, base_url='http://localhost:8000/v1', api_key='dummy-key', **default_params)

Initialize the adapter.

Parameters:

    model_name (str, required): Name of the model being served (e.g. "NousResearch/Llama-2-7b")
    base_url (str, default 'http://localhost:8000/v1'): URL of the server
    api_key (str, default 'dummy-key'): API key for authentication
    **default_params (default {}): Additional parameters for generation (temperature etc.)
Source code in src/llm/adapters/openai_compat_adapter.py
def __init__(
        self,
        model_name: str,
        base_url: str = "http://localhost:8000/v1",
        api_key: str = "dummy-key",
        **default_params
):
    """Initialize the adapter.

    Args:
        model_name (str): Name of the model being served (e.g. "NousResearch/Llama-2-7b")
        base_url (str): URL of the server
        api_key (str): API key for authentication
        **default_params: Additional parameters for generation (temperature etc.)
    """
    self.model_name = model_name
    self.base_url = base_url
    self.api_key = api_key
    self.default_params = default_params

    # Configure OpenAI client for server
    self.client = AsyncOpenAI(
        base_url=self.base_url,
        api_key=self.api_key
    )

    logger.info(f"Initialized adapter for model: {self.model_name}")
    logger.debug(f"Using server at: {self.base_url}")
    logger.debug(f"Default parameters: {default_params}")

gen_chat_sse_stream(messages, tools=None, **kwargs) async

Generate streaming chat completion using chat endpoint.

Parameters:

    messages (List[TextChatMessage], required): List of chat messages
    tools (Optional[List[Tool]], default None): Optional tools/functions
    **kwargs (default {}): Additional parameters to override defaults

Yields:

    SSEChunk (AsyncGenerator[SSEChunk, None]): Standardized chunks of the streaming response

Source code in src/llm/adapters/openai_compat_adapter.py
async def gen_chat_sse_stream(
        self,
        messages: List[TextChatMessage],
        tools: Optional[List[Tool]] = None,
        **kwargs
) -> AsyncGenerator[SSEChunk, None]:
    """Generate streaming chat completion using chat endpoint.

    Args:
        messages (List[TextChatMessage]): List of chat messages
        tools (Optional[List[Tool]]): Optional tools/functions
        **kwargs: Additional parameters to override defaults

    Yields:
        SSEChunk: Standardized chunks of the streaming response
    """
    try:
        # Convert messages to OpenAI format
        openai_messages = [msg.model_dump() for msg in messages]

        # Prepare request payload
        request_params = {
            "model": self.model_name,
            "messages": openai_messages,
            "stream": True,
            **self.default_params,
            **kwargs
        }

        # Add tools if provided
        if tools:
            request_params["tools"] = [tool.model_dump() for tool in tools]
            request_params["tool_choice"] = "auto"

        # Stream response
        async for chunk in await self.client.chat.completions.create(**request_params):
            yield self._convert_to_sse_chunk(chunk)

    except Exception as e:
        logger.error(f"Error in chat stream: {str(e)}", exc_info=True)
        raise RuntimeError(f"Chat completion failed: {str(e)}") from e

gen_sse_stream(prompt, **kwargs) async

Generate streaming completion using completions endpoint.

Parameters:

    prompt (str, required): Input text prompt
    **kwargs (default {}): Additional parameters to override defaults

Yields:

    SSEChunk (AsyncGenerator[SSEChunk, None]): Standardized chunks of the streaming response

Source code in src/llm/adapters/openai_compat_adapter.py
async def gen_sse_stream(
        self,
        prompt: str,
        **kwargs
) -> AsyncGenerator[SSEChunk, None]:
    """Generate streaming completion using completions endpoint.

    Args:
        prompt (str): Input text prompt
        **kwargs: Additional parameters to override defaults

    Yields:
        SSEChunk: Standardized chunks of the streaming response
    """
    try:
        # Prepare request payload
        request_params = {
            "model": self.model_name,
            "prompt": prompt,
            "stream": True,
            **self.default_params,
            **kwargs
        }

        logger.debug(f"Making completions request with prompt: {prompt[:50]}...")

        # Use completions endpoint directly
        async for chunk in await self.client.completions.create(**request_params):
            yield self._convert_to_sse_chunk(chunk)

    except Exception as e:
        logger.error(f"Error in completion stream: {str(e)}", exc_info=True)
        raise RuntimeError(f"Completion failed: {str(e)}") from e