src.llm.adapters.xai_adapter.XAIAdapter

Bases: BaseVendorAdapter

Adapter for interacting with xAI's API.

Utilizes the OpenAI client for compatibility with xAI API endpoints. Supports streaming responses and converts them to standardized SSE chunks.

Attributes:

Name        Type         Description
----------  -----------  -------------------------------------------
model_name  str          The model identifier being served
api_key     str          xAI API key for authentication
base_url    str          URL of the xAI API server
client      AsyncOpenAI  Configured OpenAI-compatible client for xAI
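
A minimal usage sketch. The import path follows this page's module name; the model identifier "grok-beta" and the prompt are placeholders, and the adapter reads XAI_API_KEY from the environment when no api_key is passed:

import asyncio

from src.llm.adapters.xai_adapter import XAIAdapter

async def main():
    adapter = XAIAdapter(model_name="grok-beta")  # hypothetical model name

    # Print text deltas as they stream in
    async for chunk in adapter.gen_sse_stream("Say hello in one sentence."):
        for choice in chunk.choices:
            if choice.delta.content:
                print(choice.delta.content, end="", flush=True)

asyncio.run(main())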

Source code in src/llm/adapters/xai_adapter.py
class XAIAdapter(BaseVendorAdapter):
    """Adapter for interacting with xAI's API.

    Utilizes the OpenAI client for compatibility with xAI API endpoints.
    Supports streaming responses and converts them to standardized SSE chunks.

    Attributes:
        model_name (str): The model identifier being served
        api_key (str): xAI API key for authentication
        base_url (str): URL of the xAI API server
        client (AsyncOpenAI): Configured OpenAI-compatible client for xAI
    """

    def __init__(
            self,
            model_name: str,
            base_url: str = "https://api.x.ai/v1",
            api_key: Optional[str] = None,
            **default_params
    ):
        """Initialize the xAI adapter.

        Args:
            model_name (str): Name of the xAI model to use
            base_url (str): URL of the xAI API server
            api_key (str): xAI API key for authentication
            **default_params: Additional parameters for generation (temperature etc.)
        """
        self.model_name = model_name
        self.base_url = base_url

        # Get API key from environment or parameter
        self.api_key = api_key or os.getenv("XAI_API_KEY")
        if not self.api_key:
            raise ValueError("xAI API key is required. Provide as parameter or set `XAI_API_KEY` environment variable.")

        self.default_params = default_params

        # Configure OpenAI-compatible client for xAI
        self.client = AsyncOpenAI(
            base_url=self.base_url,
            api_key=self.api_key
        )

        logger.info(f"Initialized xAI adapter for model: {self.model_name}")
        logger.debug(f"Using X.AI server at: {self.base_url}")
        logger.debug(f"Default parameters: {default_params}")

    async def gen_chat_sse_stream(
            self,
            messages: List[TextChatMessage],
            tools: Optional[List[Tool]] = None,
            **kwargs
    ) -> AsyncGenerator[SSEChunk, None]:
        """Generate streaming chat completion.

        Uses xAI's chat completions endpoint with streaming enabled.

        Args:
            messages (List[TextChatMessage]): List of chat messages
            tools (Optional[List[Tool]]): Optional tools/functions definitions
            **kwargs: Additional parameters to override defaults

        Yields:
            SSEChunk: Standardized chunks of the streaming response
        """
        try:
            # Convert messages to OpenAI format; accept pydantic models or plain
            # dicts (gen_sse_stream passes raw role/content dicts)
            openai_messages = [
                msg.model_dump() if hasattr(msg, "model_dump") else msg
                for msg in messages
            ]

            # Prepare request payload
            request_params = {
                "model": self.model_name,
                "messages": openai_messages,
                "stream": True,
                **self.default_params,
                **kwargs
            }

            # Add tools if provided
            if tools:
                request_params["tools"] = [tool.model_dump() for tool in tools]
                request_params["tool_choice"] = "auto"

            # Stream response
            async for chunk in await self.client.chat.completions.create(**request_params):
                yield self._convert_to_sse_chunk(chunk)

        except Exception as e:
            logger.error(f"Error in xAI chat stream: {str(e)}", exc_info=True)
            raise RuntimeError(f"xAI chat completion failed: {str(e)}") from e

    async def gen_sse_stream(
            self,
            prompt: str,
            **kwargs
    ) -> AsyncGenerator[SSEChunk, None]:
        """Generate streaming completion from a text prompt.

        For xAI, this simply converts the text prompt to a chat message and calls gen_chat_sse_stream.

        Args:
            prompt (str): Input text prompt
            **kwargs: Additional parameters to override defaults

        Yields:
            SSEChunk: Standardized chunks of the streaming response
        """
        # Wrap the prompt in a single user message (a plain dict is fine:
        # gen_chat_sse_stream accepts both pydantic models and dicts)
        messages = [{"role": "user", "content": prompt}]

        # Use the chat completions endpoint
        async for chunk in self.gen_chat_sse_stream(messages, **kwargs):
            yield chunk

    def _convert_to_sse_chunk(self, raw_chunk) -> SSEChunk:
        """Convert xAI API response chunk to standardized SSE format.

        Args:
            raw_chunk: Raw chunk from xAI API

        Returns:
            SSEChunk: Standardized chunk format
        """
        try:
            choices = []

            # Handle chat completion format
            for choice in raw_chunk.choices:
                tool_calls = None

                # Handle tool calls if present
                if hasattr(choice.delta, 'tool_calls') and choice.delta.tool_calls:
                    tool_calls = []
                    for tc in choice.delta.tool_calls:
                        function = None
                        if tc.function:
                            function = SSEFunction(
                                name=tc.function.name or "",
                                arguments=tc.function.arguments or ""
                            )
                        tool_calls.append(SSEToolCall(
                            index=tc.index or 0,
                            id=tc.id,
                            type=tc.type or "function",
                            function=function
                        ))

                choices.append(SSEChoice(
                    index=choice.index,
                    delta=SSEDelta(
                        role=getattr(choice.delta, 'role', None),
                        content=getattr(choice.delta, 'content', None),
                        tool_calls=tool_calls
                    ),
                    finish_reason=choice.finish_reason
                ))

            return SSEChunk(
                id=raw_chunk.id,
                object=raw_chunk.object,
                created=raw_chunk.created,
                model=raw_chunk.model,
                choices=choices
            )

        except Exception as e:
            logger.error(f"Error converting xAI chunk: {raw_chunk}", exc_info=True)
            raise ValueError(f"Failed to convert xAI response: {str(e)}") from e

__init__(model_name, base_url='https://api.x.ai/v1', api_key=None, **default_params)

Initialize the xAI adapter.

Parameters:

Name              Type  Description                                               Default
----------------  ----  --------------------------------------------------------  ---------------------
model_name        str   Name of the xAI model to use                              required
base_url          str   URL of the xAI API server                                 'https://api.x.ai/v1'
api_key           str   xAI API key for authentication                            None
**default_params        Additional parameters for generation (temperature etc.)   {}
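
A construction sketch showing both credential paths; the model name and generation parameters are illustrative, and default_params are merged into every request the adapter makes:

import os

from src.llm.adapters.xai_adapter import XAIAdapter

# Explicit key plus default generation parameters
adapter = XAIAdapter(
    model_name="grok-beta",            # hypothetical model name
    api_key=os.environ["XAI_API_KEY"],
    temperature=0.2,
)

# Omitting api_key falls back to the XAI_API_KEY environment variable;
# if neither is set, __init__ raises ValueError.
adapter = XAIAdapter(model_name="grok-beta")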

gen_chat_sse_stream(messages, tools=None, **kwargs) async

Generate streaming chat completion.

Uses xAI's chat completions endpoint with streaming enabled.

Parameters:

Name      Type                   Description                                  Default
--------  ---------------------  -------------------------------------------  --------
messages  List[TextChatMessage]  List of chat messages                        required
tools     Optional[List[Tool]]   Optional tools/functions definitions         None
**kwargs                         Additional parameters to override defaults   {}

Yields:

Name      Type                            Description
--------  ------------------------------  ---------------------------------------------
SSEChunk  AsyncGenerator[SSEChunk, None]  Standardized chunks of the streaming response

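A consumption sketch, reusing an adapter constructed as above. It assumes TextChatMessage is a pydantic model with role and content fields (the method serializes each message via model_dump()); per-call kwargs such as temperature override the adapter's default_params for this request only:

messages = [
    TextChatMessage(role="system", content="You are a terse assistant."),  # assumed constructor
    TextChatMessage(role="user", content="Summarize SSE in one line."),
]

async for chunk in adapter.gen_chat_sse_stream(messages, temperature=0.0):
    for choice in chunk.choices:
        if choice.delta.content:
            print(choice.delta.content, end="", flush=True)
        if choice.delta.tool_calls:  # incremental tool-call fragments, if tools were passed
            for tc in choice.delta.tool_calls:
                if tc.function:
                    print(tc.function.arguments, end="")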

gen_sse_stream(prompt, **kwargs) async

Generate streaming completion from a text prompt.

For xAI, this simply converts the text prompt to a chat message and calls gen_chat_sse_stream.

Parameters:

Name      Type  Description                                  Default
--------  ----  -------------------------------------------  --------
prompt    str   Input text prompt                            required
**kwargs        Additional parameters to override defaults   {}

Yields:

Name      Type                            Description
--------  ------------------------------  ---------------------------------------------
SSEChunk  AsyncGenerator[SSEChunk, None]  Standardized chunks of the streaming response

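A sketch that collects the streamed deltas into one string, reusing an adapter constructed as above; field access follows the SSEChunk structure built by _convert_to_sse_chunk:

parts = []
async for chunk in adapter.gen_sse_stream("Explain streaming in one sentence."):
    for choice in chunk.choices:
        if choice.delta.content:
            parts.append(choice.delta.content)
print("".join(parts))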