From fc0a286104b4563425bc32b4960af54b115c5272 Mon Sep 17 00:00:00 2001 From: Julian Ubico Date: Wed, 25 Jun 2025 19:12:05 -0400 Subject: [PATCH 1/2] Support Claude extended thinking Enhances support for Claude's extended thinking by collecting, preserving, and injecting thinking blocks into model responses and conversation history. Updates serialization and deserialization logic in items.py and chatcmpl_converter.py to ensure thinking blocks are retained and correctly restored, allowing richer assistant reasoning to be surfaced in outputs. See: https://docs.anthropic.com/en/docs/build-with-claude/extended-thinking#extended-thinking-with-tool-use --- src/agents/extensions/models/litellm_model.py | 66 +++++++++++++++++-- src/agents/items.py | 63 ++++++++++++++++-- src/agents/models/chatcmpl_converter.py | 27 ++++++-- 3 files changed, 142 insertions(+), 14 deletions(-) diff --git a/src/agents/extensions/models/litellm_model.py b/src/agents/extensions/models/litellm_model.py index c58a52dae..63526b722 100644 --- a/src/agents/extensions/models/litellm_model.py +++ b/src/agents/extensions/models/litellm_model.py @@ -178,12 +178,53 @@ async def stream_response( ) final_response: Response | None = None - async for chunk in ChatCmplStreamHandler.handle_stream(response, stream): + collected_chunks = [] + + # Create async generator from collected chunks + async def replay_chunks(): + async for raw_chunk in stream: + collected_chunks.append(raw_chunk) + yield raw_chunk + + # Process chunks through handler + async for chunk in ChatCmplStreamHandler.handle_stream(response, replay_chunks()): yield chunk - if chunk.type == "response.completed": final_response = chunk.response + # Claude extended thinking: reconstruct thinking blocks from stream chunks + if final_response and collected_chunks: + # Extract thinking blocks and reasoning content from chunks + complete_thinking_blocks = [] + aggregated_reasoning = "" + + for chunk in collected_chunks: + if hasattr(chunk, 'choices') and chunk.choices: + choice = chunk.choices[0] + if hasattr(choice, 'delta'): + delta = choice.delta + if hasattr(delta, 'thinking_blocks') and delta.thinking_blocks: + for tb in delta.thinking_blocks: + if isinstance(tb, dict) and 'signature' in tb: + complete_thinking_blocks.append(tb) + if hasattr(delta, 'reasoning_content') and delta.reasoning_content: + aggregated_reasoning += delta.reasoning_content + + # Fill empty thinking fields with reasoning content + if complete_thinking_blocks and aggregated_reasoning: + for thinking_block in complete_thinking_blocks: + if isinstance(thinking_block, dict) and thinking_block.get('thinking') == '': + thinking_block['thinking'] = aggregated_reasoning + + # Inject thinking blocks into final response + if complete_thinking_blocks and final_response.output: + for output_item in final_response.output: + if (hasattr(output_item, 'role') and getattr(output_item, 'role') == 'assistant' and + hasattr(output_item, 'content') and isinstance(output_item.content, list)): + # Insert thinking blocks at the beginning + output_item.content = complete_thinking_blocks + output_item.content + break + if tracing.include_data() and final_response: span_generation.span_data.output = [final_response.model_dump()] @@ -360,14 +401,31 @@ def convert_message_to_openai( provider_specific_fields.get("refusal", None) if provider_specific_fields else None ) - return ChatCompletionMessage( - content=message.content, + # For ChatCompletionMessage, we need to keep content as string only + # We'll store thinking blocks separately and restore them during items_to_messages conversion + content = message.content + + # Store thinking blocks in a custom field that we can retrieve later + thinking_blocks = None + if hasattr(message, 'thinking_blocks') and getattr(message, 'thinking_blocks', None): + thinking_blocks = getattr(message, 'thinking_blocks', None) + + # Note: We don't modify content here because ChatCompletionMessage.content must be a string + + completion_message = ChatCompletionMessage( + content=content, refusal=refusal, role="assistant", annotations=cls.convert_annotations_to_openai(message), audio=message.get("audio", None), # litellm deletes audio if not present tool_calls=tool_calls, ) + + # Store thinking blocks as an extra field so we can retrieve them later + if thinking_blocks: + completion_message.thinking_blocks = thinking_blocks + + return completion_message @classmethod def convert_annotations_to_openai( diff --git a/src/agents/items.py b/src/agents/items.py index 64797ad22..8909389dc 100644 --- a/src/agents/items.py +++ b/src/agents/items.py @@ -76,8 +76,33 @@ def to_input_item(self) -> TResponseInputItem: # We know that input items are dicts, so we can ignore the type error return self.raw_item # type: ignore elif isinstance(self.raw_item, BaseModel): - # All output items are Pydantic models that can be converted to input items. - return self.raw_item.model_dump(exclude_unset=True) # type: ignore + # For Claude extended thinking: preserve thinking blocks by bypassing Pydantic filtering + if hasattr(self.raw_item, 'content') and hasattr(self.raw_item, 'role'): + result_dict = {} + # Copy all fields except content + for field_name, field_value in self.raw_item.__dict__.items(): + if field_name != 'content': + result_dict[field_name] = field_value + + # Preserve thinking blocks in content + if hasattr(self.raw_item, 'content') and isinstance(self.raw_item.content, list): + preserved_content = [] + for content_item in self.raw_item.content: + if isinstance(content_item, dict): + preserved_content.append(content_item) # Thinking blocks + elif hasattr(content_item, 'model_dump'): + preserved_content.append(content_item.model_dump(exclude_unset=False)) + else: + try: + preserved_content.append(dict(content_item)) + except: + pass + result_dict['content'] = preserved_content + + return result_dict # type: ignore + else: + # All output items are Pydantic models that can be converted to input items. + return self.raw_item.model_dump(exclude_unset=True) # type: ignore else: raise AgentsException(f"Unexpected raw item type: {type(self.raw_item)}") @@ -229,10 +254,36 @@ class ModelResponse: def to_input_items(self) -> list[TResponseInputItem]: """Convert the output into a list of input items suitable for passing to the model.""" - # We happen to know that the shape of the Pydantic output items are the same as the - # equivalent TypedDict input items, so we can just convert each one. - # This is also tested via unit tests. - return [it.model_dump(exclude_unset=True) for it in self.output] # type: ignore + result = [] + for item in self.output: + if isinstance(item, ResponseOutputMessage): + # For Claude extended thinking: preserve thinking blocks in conversation history + result_dict = {} + for field_name, field_value in item.__dict__.items(): + if field_name != 'content': + result_dict[field_name] = field_value + + if hasattr(item, 'content') and isinstance(item.content, list): + preserved_content = [] + for content_item in item.content: + if isinstance(content_item, dict): + preserved_content.append(content_item) # Thinking blocks + elif hasattr(content_item, 'model_dump'): + preserved_content.append(content_item.model_dump(exclude_unset=False)) + else: + try: + preserved_content.append(dict(content_item)) + except: + pass + result_dict['content'] = preserved_content + + result.append(result_dict) # type: ignore + else: + # We happen to know that the shape of the Pydantic output items are the same as the + # equivalent TypedDict input items, so we can just convert each one. + # This is also tested via unit tests. + result.append(item.model_dump(exclude_unset=True)) # type: ignore + return result class ItemHelpers: diff --git a/src/agents/models/chatcmpl_converter.py b/src/agents/models/chatcmpl_converter.py index 1d599e8c0..d20eea1b1 100644 --- a/src/agents/models/chatcmpl_converter.py +++ b/src/agents/models/chatcmpl_converter.py @@ -92,6 +92,11 @@ def message_to_output_items(cls, message: ChatCompletionMessage) -> list[TRespon type="message", status="completed", ) + + # Claude extended thinking: store thinking blocks as direct content items + thinking_blocks = getattr(message, 'thinking_blocks', None) + if thinking_blocks: + message_item.content.extend(thinking_blocks) if message.content: message_item.content.append( ResponseOutputText(text=message.content, type="output_text", annotations=[]) @@ -359,9 +364,13 @@ def ensure_assistant_message() -> ChatCompletionAssistantMessageParam: new_asst = ChatCompletionAssistantMessageParam(role="assistant") contents = resp_msg["content"] + # Claude extended thinking: extract thinking blocks and text segments + thinking_blocks = [] text_segments = [] for c in contents: - if c["type"] == "output_text": + if c["type"] == "thinking": + thinking_blocks.append(c) + elif c["type"] == "output_text": text_segments.append(c["text"]) elif c["type"] == "refusal": new_asst["refusal"] = c["refusal"] @@ -373,9 +382,19 @@ def ensure_assistant_message() -> ChatCompletionAssistantMessageParam: else: raise UserError(f"Unknown content type in ResponseOutputMessage: {c}") - if text_segments: - combined = "\n".join(text_segments) - new_asst["content"] = combined + # Create content array with thinking blocks first + if thinking_blocks: + content_array = thinking_blocks[:] + if text_segments: + content_array.append({ + "type": "text", + "text": "\n".join(text_segments) + }) + new_asst["content"] = content_array + elif text_segments: + combined = "\n".join(text_segments) + new_asst["content"] = combined + new_asst["content"] = "\n".join(text_segments) new_asst["tool_calls"] = [] current_assistant_msg = new_asst From d035610d9c9b18316882e6d03b5defe09c907190 Mon Sep 17 00:00:00 2001 From: Julian Ubico Date: Wed, 25 Jun 2025 21:33:55 -0400 Subject: [PATCH 2/2] Stream thinking tokens --- src/agents/extensions/models/litellm_model.py | 2 +- src/agents/models/chatcmpl_stream_handler.py | 27 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/agents/extensions/models/litellm_model.py b/src/agents/extensions/models/litellm_model.py index 63526b722..f3914b787 100644 --- a/src/agents/extensions/models/litellm_model.py +++ b/src/agents/extensions/models/litellm_model.py @@ -180,7 +180,7 @@ async def stream_response( final_response: Response | None = None collected_chunks = [] - # Create async generator from collected chunks + # Create async generator that streams thinking and regular content async def replay_chunks(): async for raw_chunk in stream: collected_chunks.append(raw_chunk) diff --git a/src/agents/models/chatcmpl_stream_handler.py b/src/agents/models/chatcmpl_stream_handler.py index d18f5912a..462ad2ffd 100644 --- a/src/agents/models/chatcmpl_stream_handler.py +++ b/src/agents/models/chatcmpl_stream_handler.py @@ -6,6 +6,7 @@ from openai import AsyncStream from openai.types.chat import ChatCompletionChunk from openai.types.completion_usage import CompletionUsage +from dataclasses import dataclass from openai.types.responses import ( Response, ResponseCompletedEvent, @@ -26,6 +27,12 @@ ) from openai.types.responses.response_usage import InputTokensDetails, OutputTokensDetails +@dataclass +class ThinkingDeltaEvent: + type: str + delta: str + sequence_number: int + from ..items import TResponseStreamEvent from .fake_id import FAKE_RESPONSES_ID @@ -75,6 +82,26 @@ async def handle_stream( delta = chunk.choices[0].delta + # Handle thinking content - emit as custom events + # Prioritize reasoning_content over thinking_blocks to avoid duplicates + thinking_content = None + + if hasattr(delta, 'reasoning_content') and delta.reasoning_content: + thinking_content = delta.reasoning_content + elif hasattr(delta, 'thinking_blocks') and delta.thinking_blocks: + # Only use thinking_blocks if no reasoning_content + for tb in delta.thinking_blocks: + if isinstance(tb, dict) and tb.get('thinking'): + thinking_content = tb['thinking'] + break # Only take the first one + + if thinking_content: + yield ThinkingDeltaEvent( + type="thinking.delta", + delta=thinking_content, + sequence_number=sequence_number.get_and_increment(), + ) + # Handle text if delta.content: if not state.text_content_index_and_output: