Skip to content

src.llm.adapters.anthropic_adapter.AnthropicAdapter

Bases: BaseVendorAdapter

Adapter for interacting with Anthropic's API.

Source code in src/llm/adapters/anthropic_adapter.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
class AnthropicAdapter(BaseVendorAdapter):
    """Adapter for interacting with Anthropic's API."""

    def __init__(self, model_name: str, **default_params):
        """Initialize Anthropic Adapter.

        Args:
            model_name (str): The name of the model to use.
            **default_params: Additional default parameters for the adapter.

        Raises:
            ValueError: If the Anthropic API key is missing.
        """
        self.api_key = os.getenv("ANTHROPIC_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Missing Anthropic API key. Set the ANTHROPIC_API_KEY environment variable."
            )
        self.client = AsyncAnthropic(api_key=self.api_key)
        self.model_name = model_name
        self.default_params = default_params
        logger.info(f"Anthropic Adapter initialized with model: {self.model_name}")

    async def gen_sse_stream(
        self, prompt: str, **kwargs
    ) -> AsyncGenerator[SSEChunk, None]:
        """Generate an SSE stream from a text prompt.

        Args:
            prompt (str): The text prompt to generate an SSE stream for.
            **kwargs: Additional keyword arguments for generation.

        Yields:
            AsyncGenerator[SSEChunk, None]: A generator yielding SSEChunk objects.
        """
        async for chunk in self.gen_chat_sse_stream([{"role": "user", "content": prompt}], **kwargs):
            yield chunk

    async def gen_chat_sse_stream(
        self,
        messages: List[TextChatMessage],
        tools: Optional[List[Tool]] = None,
        **kwargs,
    ) -> AsyncGenerator[SSEChunk, None]:
        """Generate a streaming chat response.

        Args:
            `messages` (List[TextChatMessage]): A list of chat messages.
            `tools` (Optional[List[Tool]], optional): A list of Tool objects. Defaults to None.
            `**kwargs`: Additional keyword arguments for generation.

        Yields:
            AsyncGenerator[SSEChunk, None]: A generator yielding SSEChunk objects.
        """
        request_payload = {
            "model": self.model_name,
            "max_tokens": self.default_params.get("max_tokens", 1024),
            "stream": True,
            **self.default_params,
            **kwargs,
            **convert_messages_to_anthropic(messages),
        }

        if tools:
            anthropic_tools = [convert_tool_to_anthropic_format(tool) for tool in tools]
            request_payload["tools"] = anthropic_tools
            request_payload["tool_choice"] = {"type": "auto"}

        try:
            stream = await self.client.messages.create(**request_payload)
            async for event in stream:
                yield await self._convert_to_sse_chunk(event)
        except Exception as e:
            logger.error(f"Error in Anthropic streaming: {str(e)}", exc_info=True)
            raise RuntimeError(f"Anthropic API streaming failed: {str(e)}") from e

    async def _convert_to_sse_chunk(self, raw_event: Any) -> SSEChunk:
        """Convert an Anthropic event to an SSEChunk.

        Args:
            `raw_event` (Any): The raw event from the Anthropic API.

        Returns:
            SSEChunk: The converted SSEChunk object.

        Raises:
            ValueError: If conversion of the event fails.
        """
        try:
            event_type = raw_event.type
            current_time = int(time.time())

            match event_type:
                case "content_block_start":
                    content_block = raw_event.content_block
                    if content_block.type == "text":
                        delta = SSEDelta(
                            role="assistant",
                            content=getattr(content_block, "text", ""),
                        )
                    elif content_block.type == "tool_use":
                        delta = SSEDelta(
                            role="assistant",
                            content="",
                            tool_calls=[SSEToolCall(
                                id=content_block.id,
                                type="function",
                                function=SSEFunction(name=content_block.name, arguments=""),
                            )],
                        )
                    else:
                        delta = SSEDelta(role="assistant", content="")
                    choice = SSEChoice(index=raw_event.index, delta=delta)
                    return SSEChunk(
                        id=f"content_block_start_{raw_event.index}",
                        object="chat.completion.chunk",
                        created=current_time,
                        model=self.model_name,
                        choices=[choice],
                    )

                case "content_block_delta":
                    delta_info = raw_event.delta
                    if delta_info.type == "text_delta":
                        delta = SSEDelta(
                            role="assistant",
                            content=delta_info.text,
                        )
                    elif delta_info.type == "input_json_delta":
                        delta = SSEDelta(
                            role="assistant",
                            content="",
                            tool_calls=[SSEToolCall(
                                type="function",
                                function=SSEFunction(
                                    name="",
                                    arguments=delta_info.partial_json,
                                ),
                            )],
                        )
                    else:
                        delta = SSEDelta(role="assistant", content="")
                    choice = SSEChoice(index=raw_event.index, delta=delta)
                    return SSEChunk(
                        id=f"delta_{raw_event.index}",
                        object="chat.completion.chunk",
                        created=current_time,
                        model=self.model_name,
                        choices=[choice],
                    )

                case "content_block_stop":
                    delta = SSEDelta(role="assistant", content="")
                    choice = SSEChoice(index=raw_event.index, delta=delta)
                    return SSEChunk(
                        id=f"block_stop_{raw_event.index}",
                        object="chat.completion.chunk",
                        created=current_time,
                        model=self.model_name,
                        choices=[choice],
                    )

                case "message_delta":
                    delta = SSEDelta(role="assistant", content="")
                    choice = SSEChoice(
                        index=0,
                        delta=delta,
                        finish_reason=getattr(raw_event.delta, "stop_reason", None),
                    )
                    return SSEChunk(
                        id="message_delta",
                        object="chat.completion.chunk",
                        created=current_time,
                        model=self.model_name,
                        choices=[choice],
                    )

                case "message_stop":
                    delta = SSEDelta(role="assistant", content="")
                    choice = SSEChoice(index=0, delta=delta)
                    return SSEChunk(
                        id="message_stop",
                        object="chat.completion.chunk",
                        created=current_time,
                        model=self.model_name,
                        choices=[choice],
                    )

                case _:
                    delta = SSEDelta(role="assistant", content="")
                    choice = SSEChoice(index=0, delta=delta)
                    return SSEChunk(
                        id=f"unknown_{event_type}",
                        object="chat.completion.chunk",
                        created=current_time,
                        model=self.model_name,
                        choices=[choice],
                    )

        except Exception as e:
            logger.error(f"Error converting Anthropic event: {raw_event}", exc_info=True)
            raise ValueError(f"Failed to convert Anthropic response to SSEChunk: {str(e)}") from e

__init__(model_name, **default_params)

Initialize Anthropic Adapter.

Parameters:

Name Type Description Default
model_name str

The name of the model to use.

required
**default_params

Additional default parameters for the adapter.

{}

Raises:

Type Description
ValueError

If the Anthropic API key is missing.

Source code in src/llm/adapters/anthropic_adapter.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def __init__(self, model_name: str, **default_params):
    """Set up the adapter for a given Anthropic model.

    Args:
        model_name (str): The name of the model to use.
        **default_params: Additional default parameters for the adapter.

    Raises:
        ValueError: If the Anthropic API key is missing.
    """
    api_key = os.getenv("ANTHROPIC_API_KEY")
    if not api_key:
        raise ValueError(
            "Missing Anthropic API key. Set the ANTHROPIC_API_KEY environment variable."
        )
    # Keep the key and an async client around for streaming calls.
    self.api_key = api_key
    self.client = AsyncAnthropic(api_key=api_key)
    self.model_name = model_name
    self.default_params = default_params
    logger.info(f"Anthropic Adapter initialized with model: {self.model_name}")

gen_chat_sse_stream(messages, tools=None, **kwargs) async

Generate a streaming chat response.

Parameters:

Name Type Description Default
`messages` List[TextChatMessage]

A list of chat messages.

required
`tools` Optional[List[Tool]]

A list of Tool objects. Defaults to None.

None
`**kwargs`

Additional keyword arguments for generation.

{}

Yields:

Type Description
AsyncGenerator[SSEChunk, None]

AsyncGenerator[SSEChunk, None]: A generator yielding SSEChunk objects.

Source code in src/llm/adapters/anthropic_adapter.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
async def gen_chat_sse_stream(
    self,
    messages: List[TextChatMessage],
    tools: Optional[List[Tool]] = None,
    **kwargs,
) -> AsyncGenerator[SSEChunk, None]:
    """Generate a streaming chat response.

    Args:
        messages (List[TextChatMessage]): A list of chat messages.
        tools (Optional[List[Tool]], optional): A list of Tool objects. Defaults to None.
        **kwargs: Additional keyword arguments for generation.

    Yields:
        AsyncGenerator[SSEChunk, None]: A generator yielding SSEChunk objects.

    Raises:
        RuntimeError: If the Anthropic API call fails.
    """
    request_payload = {
        "model": self.model_name,
        # Anthropic requires max_tokens; overridable via default_params/kwargs.
        "max_tokens": 1024,
        **self.default_params,
        **kwargs,
        **convert_messages_to_anthropic(messages),
        # Pinned last: the `async for` below requires a streaming response,
        # so neither default_params nor kwargs may disable it.
        "stream": True,
    }

    if tools:
        request_payload["tools"] = [convert_tool_to_anthropic_format(tool) for tool in tools]
        # Let the model decide whether to call a tool.
        request_payload["tool_choice"] = {"type": "auto"}

    try:
        stream = await self.client.messages.create(**request_payload)
        async for event in stream:
            yield await self._convert_to_sse_chunk(event)
    except Exception as e:
        logger.error(f"Error in Anthropic streaming: {str(e)}", exc_info=True)
        raise RuntimeError(f"Anthropic API streaming failed: {str(e)}") from e

gen_sse_stream(prompt, **kwargs) async

Generate an SSE stream from a text prompt.

Parameters:

Name Type Description Default
prompt str

The text prompt to generate an SSE stream for.

required
**kwargs

Additional keyword arguments for generation.

{}

Yields:

Type Description
AsyncGenerator[SSEChunk, None]

AsyncGenerator[SSEChunk, None]: A generator yielding SSEChunk objects.

Source code in src/llm/adapters/anthropic_adapter.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
async def gen_sse_stream(
    self, prompt: str, **kwargs
) -> AsyncGenerator[SSEChunk, None]:
    """Generate an SSE stream from a text prompt.

    Wraps the prompt as a single user message and delegates the actual
    streaming to ``gen_chat_sse_stream``.

    Args:
        prompt (str): The text prompt to generate an SSE stream for.
        **kwargs: Additional keyword arguments for generation.

    Yields:
        AsyncGenerator[SSEChunk, None]: A generator yielding SSEChunk objects.
    """
    messages = [{"role": "user", "content": prompt}]
    async for chunk in self.gen_chat_sse_stream(messages, **kwargs):
        yield chunk