diff --git a/lib/crewai/src/crewai/llm.py b/lib/crewai/src/crewai/llm.py
index 00897688b7..5bc5a5a0f2 100644
--- a/lib/crewai/src/crewai/llm.py
+++ b/lib/crewai/src/crewai/llm.py
@@ -9,6 +9,7 @@
 import os
 import sys
 import threading
+import warnings
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -309,6 +310,7 @@ def writable(self) -> bool:
 CONTEXT_WINDOW_USAGE_RATIO: Final[float] = 0.85
 SUPPORTED_NATIVE_PROVIDERS: Final[list[str]] = [
     "openai",
+    "openai_responses",
     "anthropic",
     "claude",
     "azure",
@@ -346,23 +348,50 @@ class LLM(BaseLLM):
     def __new__(cls, model: str, is_litellm: bool = False, **kwargs: Any) -> LLM:
         """Factory method that routes to native SDK or falls back to LiteLLM.
 
+        Supports both the legacy provider parameter and the new api parameter for clearer semantics.
+
         Routing priority:
-        1. If 'provider' kwarg is present, use that provider with constants
+        1. If 'provider' kwarg is present, resolve the provider + api combination
         2. If only 'model' kwarg, use constants to infer provider
         3. If "/" in model name:
            - Check if prefix is a native provider (openai/anthropic/azure/bedrock/gemini)
            - If yes, validate model against constants
            - If valid, route to native SDK; otherwise route to LiteLLM
+
+        Args:
+            model: Model identifier (e.g., "gpt-4o", "claude-3-sonnet")
+            is_litellm: Force use of the LiteLLM fallback
+            **kwargs: Additional parameters including:
+                - provider: Provider name ("openai", "anthropic", etc.)
+                - api: API type within provider ("chat", "responses", etc.)
+                - Other model parameters (temperature, etc.)
         """
         if not model or not isinstance(model, str):
             raise ValueError("Model must be a non-empty string")
 
         explicit_provider = kwargs.get("provider")
+        explicit_api = kwargs.get("api")
 
+        # Resolve provider and API combination
         if explicit_provider:
-            provider = explicit_provider
+            try:
+                resolved_provider = cls._resolve_provider_and_api(explicit_provider, explicit_api)
+            except ValueError as e:
+                raise ValueError(f"Invalid provider/api combination: {e}") from e
+
+            # Validate model compatibility with resolved provider
+            if not cls._validate_model_for_provider_api(model, resolved_provider, explicit_api):
+                supported_models = cls._get_supported_models_message(explicit_provider, explicit_api)
+                api_desc = f" with {explicit_api} API" if explicit_api else ""
+                raise ValueError(
+                    f"Model '{model}' is not compatible with provider '{explicit_provider}'{api_desc}. "
+                    f"Supported models: {supported_models}"
+                )
+
+            provider = resolved_provider
             use_native = True
             model_string = model
+
         elif "/" in model:
             prefix, _, model_part = model.partition("/")
 
@@ -378,18 +407,38 @@ def __new__(cls, model: str, is_litellm: bool = False, **kwargs: Any) -> LLM:
                 "aws": "bedrock",
             }
 
-            canonical_provider = provider_mapping.get(prefix.lower())
-
-            if canonical_provider and cls._validate_model_in_constants(
-                model_part, canonical_provider
-            ):
-                provider = canonical_provider
+            if prefix.lower() == "openai_responses":
+                # Backwards-compatibility: allow the old prefix for now, but steer
+                # users to the explicit provider+api syntax.
+                if explicit_api not in (None, "responses"):
+                    raise ValueError(
+                        "Model prefix 'openai_responses/' implies api='responses' but "
+                        f"got api='{explicit_api}'. Use provider='openai' with api='responses' instead."
+                    )
+                warnings.warn(
+                    "Model prefix 'openai_responses/' is deprecated and will be removed in a future version. "
+                    "Use provider='openai' with api='responses' instead.",
+                    DeprecationWarning,
+                    stacklevel=2,
+                )
+                provider = "openai_responses"
                 use_native = True
                 model_string = model_part
             else:
-                provider = prefix
-                use_native = False
-                model_string = model_part
+                canonical_provider = provider_mapping.get(prefix.lower())
+
+                if canonical_provider and cls._validate_model_in_constants(
+                    model_part, canonical_provider
+                ):
+                    provider = canonical_provider
+                    use_native = True
+                    model_string = model_part
+                else:
+                    # Unknown or unvalidated prefix: fall through to LiteLLM so
+                    # provider/use_native/model_string are always assigned.
+                    provider = prefix
+                    use_native = False
+                    model_string = model_part
         else:
             provider = cls._infer_provider_from_model(model)
             use_native = True
@@ -398,8 +447,8 @@ def __new__(cls, model: str, is_litellm: bool = False, **kwargs: Any) -> LLM:
         native_class = cls._get_native_provider(provider) if use_native else None
         if native_class and not is_litellm and provider in SUPPORTED_NATIVE_PROVIDERS:
             try:
-                # Remove 'provider' from kwargs if it exists to avoid duplicate keyword argument
-                kwargs_copy = {k: v for k, v in kwargs.items() if k != "provider"}
+                # Remove 'provider' and 'api' from kwargs to avoid duplicate keyword arguments
+                kwargs_copy = {k: v for k, v in kwargs.items() if k not in ("provider", "api")}
                 return cast(
                     Self,
                     native_class(model=model_string, provider=provider, **kwargs_copy),
@@ -419,6 +468,75 @@ def __new__(cls, model: str, is_litellm: bool = False, **kwargs: Any) -> LLM:
         instance.is_litellm = True
         return instance
 
+    @classmethod
+    def _resolve_provider_and_api(cls, provider: str, api: str | None) -> str:
+        """Resolve a logical provider + api combination to the actual provider implementation.
+
+        Currently this only handles OpenAI's multiple APIs. The legacy
+        provider="openai_responses" alias is still accepted for backwards
+        compatibility, but it emits a DeprecationWarning and will be removed
+        in a future version.
+
+        Args:
+            provider: Logical provider name (e.g., "openai")
+            api: API type (e.g., "responses", "chat", None)
+
+        Returns:
+            Actual provider implementation name
+        """
+        if provider == "openai":
+            if api == "responses":
+                return "openai_responses"
+            elif api == "chat" or api is None:
+                return "openai"
+            else:
+                raise ValueError(f"Unsupported API '{api}' for provider 'openai'. Supported: 'chat', 'responses'")
+
+        if provider == "openai_responses":
+            # Backwards-compatibility: allow the legacy alias for now, but steer
+            # users to the explicit provider+api syntax.
+            if api not in (None, "responses"):
+                raise ValueError(
+                    "provider='openai_responses' implies api='responses' but "
+                    f"got api='{api}'. Use provider='openai' with api='responses' instead."
+                )
+            warnings.warn(
+                "provider='openai_responses' is deprecated and will be removed in a future version. "
+                "Use provider='openai' with api='responses' instead.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            return "openai_responses"
+
+        return provider
+
+    @classmethod
+    def _validate_model_for_provider_api(cls, model: str, provider: str, api: str | None) -> bool:
+        """Validate if a model is compatible with the given provider and API combination.
+
+        Currently focused on OpenAI API validation to prevent using incompatible models.
+
+        Args:
+            model: Model name to validate
+            provider: Resolved provider name
+            api: API type (for additional context)
+
+        Returns:
+            True if compatible, False otherwise
+        """
+        return cls._matches_provider_pattern(model, provider)
+
+    @classmethod
+    def _get_supported_models_message(cls, provider: str, api: str | None) -> str:
+        """Get a human-readable message about supported models for a provider/API combination."""
+        if provider in ["openai_responses", "openai"] and api == "responses":
+            return ("gpt-4o, gpt-4o-mini, gpt-4.1, gpt-4.1-mini, gpt-4.1-nano, "
+                    "o1, o1-mini, o1-preview, o3, o3-mini, o4-mini")
+        elif provider == "openai":
+            return ("gpt-3.5-turbo, gpt-4, gpt-4-turbo, gpt-4o, gpt-4o-mini, "
+                    "o1, o1-mini, o1-preview, o3, o3-mini, o4-mini, whisper-1, and other OpenAI models")
+        else:
+            return f"models supported by provider '{provider}'"
+
     @classmethod
     def _matches_provider_pattern(cls, model: str, provider: str) -> bool:
         """Check if a model name matches provider-specific patterns.
@@ -441,6 +559,13 @@ def _matches_provider_pattern(cls, model: str, provider: str) -> bool:
             for prefix in ["gpt-", "o1", "o3", "o4", "whisper-"]
         )
 
+        if provider == "openai_responses":
+            # The Responses API supports GPT-4o, GPT-4.1, and o-series models
+            return any(
+                model_lower.startswith(prefix)
+                for prefix in ["gpt-4o", "gpt-4.1", "o1", "o3", "o4"]
+            )
+
         if provider == "anthropic" or provider == "claude":
             return any(
                 model_lower.startswith(prefix) for prefix in ["claude-", "anthropic."]
@@ -481,6 +606,15 @@ def _validate_model_in_constants(cls, model: str, provider: str) -> bool:
         if provider == "openai" and model in OPENAI_MODELS:
             return True
 
+        if provider == "openai_responses":
+            # The Responses API supports a subset of OpenAI models
+            responses_models = {
+                "gpt-4o", "gpt-4o-mini", "gpt-4.1", "gpt-4.1-mini", "gpt-4.1-nano",
+                "o1", "o1-mini", "o1-preview", "o3", "o3-mini", "o4-mini",
+            }
+            if model in responses_models:
+                return True
+            # Also check pattern matching for newer models with these prefixes
+            return cls._matches_provider_pattern(model, provider)
+
         if (
             provider == "anthropic" or provider == "claude"
         ) and model in ANTHROPIC_MODELS:
@@ -489,11 +623,10 @@ def _validate_model_in_constants(cls, model: str, provider: str) -> bool:
         if (provider == "gemini" or provider == "google") and model in GEMINI_MODELS:
             return True
 
-        if provider == "bedrock" and model in BEDROCK_MODELS:
+        if (provider == "bedrock" or provider == "aws") and model in BEDROCK_MODELS:
             return True
 
-        if provider == "azure":
-            # azure does not provide a list of available models, determine a better way to handle this
+        if provider == "azure" and model in AZURE_MODELS:
             return True
 
         # Fallback to pattern matching for models not in constants
@@ -538,6 +671,13 @@ def _get_native_provider(cls, provider: str) -> type | None:
 
             return OpenAICompletion
 
+        if provider == "openai_responses":
+            from crewai.llms.providers.openai_responses.completion import (
+                OpenAIResponsesCompletion,
+            )
+
+            return OpenAIResponsesCompletion
+
         if provider == "anthropic" or provider == "claude":
             from crewai.llms.providers.anthropic.completion import (
                 AnthropicCompletion,
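The routing above can be exercised as follows. This is a minimal sketch of the intended call patterns based only on the code in this diff; model names are illustrative:

```python
from crewai import LLM

# Preferred: logical provider plus api, resolved to the Responses implementation.
llm = LLM(model="gpt-4o", provider="openai", api="responses")

# Default api (None) or api="chat" keeps the Chat Completions path.
chat_llm = LLM(model="gpt-4o", provider="openai")

# Legacy spellings still work for now, but emit a DeprecationWarning.
legacy = LLM(model="openai_responses/gpt-4o")

# Conflicting combinations raise ValueError, e.g.:
# LLM(model="gpt-4o", provider="openai_responses", api="chat")
```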
diff --git a/lib/crewai/src/crewai/llms/providers/openai_responses/__init__.py b/lib/crewai/src/crewai/llms/providers/openai_responses/__init__.py
new file mode 100644
index 0000000000..d17e27b2e7
--- /dev/null
+++ b/lib/crewai/src/crewai/llms/providers/openai_responses/__init__.py
@@ -0,0 +1,7 @@
+"""OpenAI Responses API provider for CrewAI."""
+
+from crewai.llms.providers.openai_responses.completion import (
+    OpenAIResponsesCompletion,
+)
+
+__all__ = ["OpenAIResponsesCompletion"]
diff --git a/lib/crewai/src/crewai/llms/providers/openai_responses/completion.py b/lib/crewai/src/crewai/llms/providers/openai_responses/completion.py
new file mode 100644
index 0000000000..728e8a7033
--- /dev/null
+++ b/lib/crewai/src/crewai/llms/providers/openai_responses/completion.py
@@ -0,0 +1,1045 @@
+"""OpenAI Responses API completion implementation for CrewAI.
+
+This module provides native integration with OpenAI's Responses API (/v1/responses),
+offering advantages for agent-based workflows including simpler input format,
+built-in conversation management via previous_response_id, and native support
+for o-series reasoning models.
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+from typing import TYPE_CHECKING, Any, Literal
+
+import httpx
+from openai import AsyncOpenAI, OpenAI
+from openai.types.responses import Response
+from pydantic import BaseModel
+
+from crewai.events.types.llm_events import LLMCallType
+from crewai.llms.base_llm import BaseLLM
+from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport
+from crewai.utilities.agent_utils import is_context_length_exceeded
+from crewai.utilities.exceptions.context_window_exceeding_exception import (
+    LLMContextLengthExceededError,
+)
+
+
+if TYPE_CHECKING:
+    from crewai.agent.core import Agent
+    from crewai.llms.hooks.base import BaseInterceptor
+    from crewai.task import Task
+    from crewai.tools.base_tool import BaseTool
+    from crewai.utilities.types import LLMMessage
+
+
+# Context window sizes for models supported by the Responses API
+RESPONSES_API_CONTEXT_WINDOWS: dict[str, int] = {
+    "gpt-4o": 128000,
+    "gpt-4o-mini": 128000,
+    "gpt-4.1": 1047576,
+    "gpt-4.1-mini": 1047576,
+    "gpt-4.1-nano": 1047576,
+    "o1": 200000,
+    "o1-mini": 128000,
+    "o1-preview": 128000,
+    "o3": 200000,
+    "o3-mini": 200000,
+    "o4-mini": 200000,
+}
+
+
+class OpenAIResponsesCompletion(BaseLLM):
+    """OpenAI Responses API completion implementation.
+
+    This class provides native integration with OpenAI's Responses API,
+    offering advantages over the Chat Completions API for agent workflows:
+
+    - Simpler input format: Use plain strings or structured input instead of
+      complex message arrays
+    - Built-in conversation management: Stateful interactions with
+      previous_response_id for multi-turn conversations
+    - Native tool support: Cleaner function calling semantics
+    - Streaming support: Real-time token streaming with simpler event handling
+    - Better support for o-series reasoning models: reasoning_effort parameter
+
+    Usage:
+        ```python
+        from crewai import Agent, LLM
+
+        # Option 1: Using the provider + api parameters (preferred)
+        llm = LLM(model="gpt-4o", provider="openai", api="responses")
+
+        # Option 2: Using the model prefix (deprecated, emits a DeprecationWarning)
+        llm = LLM(model="openai_responses/gpt-4o")
+
+        # With o-series reasoning models
+        llm = LLM(
+            model="o3-mini",
+            provider="openai",
+            api="responses",
+            reasoning_effort="high",
+        )
+
+        agent = Agent(
+            role="Research Analyst",
+            goal="Find and summarize information",
+            backstory="Expert researcher",
+            llm=llm,
+        )
+        ```
+    """
+
+    def __init__(
+        self,
+        model: str = "gpt-4o",
+        api_key: str | None = None,
+        base_url: str | None = None,
+        organization: str | None = None,
+        project: str | None = None,
+        timeout: float | None = None,
+        max_retries: int = 2,
+        default_headers: dict[str, str] | None = None,
+        default_query: dict[str, Any] | None = None,
+        client_params: dict[str, Any] | None = None,
+        temperature: float | None = None,
+        top_p: float | None = None,
+        max_output_tokens: int | None = None,
+        stream: bool = False,
+        reasoning_effort: Literal["low", "medium", "high"] | None = None,
+        previous_response_id: str | None = None,
+        provider: str | None = None,
+        interceptor: BaseInterceptor[httpx.Request, httpx.Response] | None = None,
+        **kwargs: Any,
+    ) -> None:
+        """Initialize OpenAI Responses API client.
+
+        Args:
+            model: Model identifier (e.g., "gpt-4o", "o3-mini")
+            api_key: OpenAI API key (falls back to OPENAI_API_KEY env var)
+            base_url: Custom API base URL
+            organization: OpenAI organization ID
+            project: OpenAI project ID
+            timeout: Request timeout in seconds
+            max_retries: Maximum retry attempts
+            default_headers: Default headers for all requests
+            default_query: Default query parameters for all requests
+            client_params: Additional client configuration parameters
+            temperature: Sampling temperature (0-2)
+            top_p: Nucleus sampling parameter
+            max_output_tokens: Maximum tokens in response
+            stream: Enable streaming responses
+            reasoning_effort: Effort level for o-series reasoning models
+                ("low", "medium", "high")
+            previous_response_id: ID of previous response for multi-turn
+                conversations
+            provider: Provider identifier (typically "openai_responses")
+            interceptor: HTTP interceptor for request/response modification
+            **kwargs: Additional provider-specific parameters
+        """
+        if provider is None:
+            # 'provider' is a named parameter, so it can never appear in kwargs;
+            # default it directly.
+            provider = "openai_responses"
+
+        self.interceptor = interceptor
+
+        # Client configuration
+        self.organization = organization
+        self.project = project
+        self.max_retries = max_retries
+        self.default_headers = default_headers
+        self.default_query = default_query
+        self.client_params = client_params
+        self.timeout = timeout
+        self.base_url = base_url
+        self.api_base = kwargs.pop("api_base", None)
+
+        super().__init__(
+            model=model,
+            temperature=temperature,
+            api_key=api_key or os.getenv("OPENAI_API_KEY"),
+            base_url=base_url,
+            timeout=timeout,
+            provider=provider,
+            **kwargs,
+        )
+
+        # Initialize sync client
+        client_config = self._get_client_params()
+        if self.interceptor:
+            transport = HTTPTransport(interceptor=self.interceptor)
+            http_client = httpx.Client(transport=transport)
+            client_config["http_client"] = http_client
+
+        self.client = OpenAI(**client_config)
+
+        # Initialize async client
+        async_client_config = self._get_client_params()
+        if self.interceptor:
+            async_transport = AsyncHTTPTransport(interceptor=self.interceptor)
+            async_http_client = httpx.AsyncClient(transport=async_transport)
+            async_client_config["http_client"] = async_http_client
+
+        self.async_client = AsyncOpenAI(**async_client_config)
+
+        # Responses API specific parameters
+        self.top_p = top_p
+        self.max_output_tokens = max_output_tokens
+        self.stream = stream
+        self.reasoning_effort = reasoning_effort
+        self.previous_response_id = previous_response_id
+
+        # Model type detection
+        self.is_o_series = any(
+            model.lower().startswith(prefix) for prefix in ["o1", "o3", "o4"]
+        )
+
+    def _get_client_params(self) -> dict[str, Any]:
+        """Get OpenAI client initialization parameters."""
+        if self.api_key is None:
+            self.api_key = os.getenv("OPENAI_API_KEY")
+        if self.api_key is None:
+            raise ValueError("OPENAI_API_KEY is required")
+
+        base_params = {
+            "api_key": self.api_key,
+            "organization": self.organization,
+            "project": self.project,
+            "base_url": self.base_url
+            or self.api_base
+            or os.getenv("OPENAI_BASE_URL")
+            or None,
+            "timeout": self.timeout,
+            "max_retries": self.max_retries,
+            "default_headers": self.default_headers,
+            "default_query": self.default_query,
+        }
+
+        client_params = {k: v for k, v in base_params.items() if v is not None}
+
+        if self.client_params:
+            client_params.update(self.client_params)
+
+        return client_params
+
+    def call(
+        self,
+        messages: str | list[LLMMessage],
+        tools: list[dict[str, BaseTool]] | None = None,
+        callbacks: list[Any] | None = None,
+        available_functions: dict[str, Any] | None = None,
+        from_task: Task | None = None,
+        from_agent: Agent | None = None,
+        response_model: type[BaseModel] | None = None,
+    ) -> str | Any:
+        """Call OpenAI Responses API.
+
+        Args:
+            messages: Input messages (string or list of message dicts).
+                System messages are converted to the `instructions` parameter.
+                Other messages become the `input` parameter.
+            tools: List of tool/function definitions
+            callbacks: Callback functions (not used in native implementation)
+            available_functions: Available functions for tool calling
+            from_task: Task that initiated the call
+            from_agent: Agent that initiated the call
+            response_model: Pydantic model for structured output
+
+        Returns:
+            Response text or tool call result
+        """
+        try:
+            self._emit_call_started_event(
+                messages=messages,
+                tools=tools,
+                callbacks=callbacks,
+                available_functions=available_functions,
+                from_task=from_task,
+                from_agent=from_agent,
+            )
+
+            formatted_messages = self._format_messages(messages)
+
+            if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
+                raise ValueError("LLM call blocked by before_llm_call hook")
+
+            params = self._prepare_responses_params(
+                messages=formatted_messages,
+                tools=tools,
+                response_model=response_model,
+            )
+
+            if self.stream:
+                return self._handle_streaming_response(
+                    params=params,
+                    formatted_messages=formatted_messages,
+                    available_functions=available_functions,
+                    from_task=from_task,
+                    from_agent=from_agent,
+                    response_model=response_model,
+                )
+
+            return self._handle_response(
+                params=params,
+                formatted_messages=formatted_messages,
+                available_functions=available_functions,
+                from_task=from_task,
+                from_agent=from_agent,
+                response_model=response_model,
+            )
+
+        except Exception as e:
+            if is_context_length_exceeded(e):
+                logging.error(f"Context window exceeded: {e}")
+                raise LLMContextLengthExceededError(str(e)) from e
+
+            error_msg = f"OpenAI Responses API call failed: {e!s}"
+            logging.error(error_msg)
+            self._emit_call_failed_event(
+                error=error_msg, from_task=from_task, from_agent=from_agent
+            )
+            raise
+
+    async def acall(
+        self,
+        messages: str | list[LLMMessage],
+        tools: list[dict[str, BaseTool]] | None = None,
+        callbacks: list[Any] | None = None,
+        available_functions: dict[str, Any] | None = None,
+        from_task: Task | None = None,
+        from_agent: Agent | None = None,
+        response_model: type[BaseModel] | None = None,
+    ) -> str | Any:
+        """Async call to OpenAI Responses API.
+
+        Args:
+            messages: Input messages (string or list of message dicts)
+            tools: List of tool/function definitions
+            callbacks: Callback functions
+            available_functions: Available functions for tool calling
+            from_task: Task that initiated the call
+            from_agent: Agent that initiated the call
+            response_model: Pydantic model for structured output
+
+        Returns:
+            Response text or tool call result
+        """
+        try:
+            self._emit_call_started_event(
+                messages=messages,
+                tools=tools,
+                callbacks=callbacks,
+                available_functions=available_functions,
+                from_task=from_task,
+                from_agent=from_agent,
+            )
+
+            formatted_messages = self._format_messages(messages)
+
+            if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
+                raise ValueError("LLM call blocked by before_llm_call hook")
+
+            params = self._prepare_responses_params(
+                messages=formatted_messages,
+                tools=tools,
+                response_model=response_model,
+            )
+
+            if self.stream:
+                return await self._ahandle_streaming_response(
+                    params=params,
+                    formatted_messages=formatted_messages,
+                    available_functions=available_functions,
+                    from_task=from_task,
+                    from_agent=from_agent,
+                    response_model=response_model,
+                )
+
+            return await self._ahandle_response(
+                params=params,
+                formatted_messages=formatted_messages,
+                available_functions=available_functions,
+                from_task=from_task,
+                from_agent=from_agent,
+                response_model=response_model,
+            )
+
+        except Exception as e:
+            if is_context_length_exceeded(e):
+                logging.error(f"Context window exceeded: {e}")
+                raise LLMContextLengthExceededError(str(e)) from e
+
+            error_msg = f"OpenAI Responses API call failed: {e!s}"
+            logging.error(error_msg)
+            self._emit_call_failed_event(
+                error=error_msg, from_task=from_task, from_agent=from_agent
+            )
+            raise
+
+    def _prepare_responses_params(
+        self,
+        messages: list[LLMMessage],
+        tools: list[dict[str, BaseTool]] | None = None,
+        response_model: type[BaseModel] | None = None,
+    ) -> dict[str, Any]:
+        """Prepare parameters for Responses API call.
+
+        Converts CrewAI message format to Responses API format:
+        - System messages become `instructions`
+        - User/assistant messages become `input`
+
+        Args:
+            messages: List of message dictionaries
+            tools: Optional tool definitions
+            response_model: Optional Pydantic model for structured output
+
+        Returns:
+            Parameters dict for responses.create()
+        """
+        # Extract system messages as instructions
+        instructions_parts: list[str] = []
+        input_messages: list[dict[str, str]] = []
+
+        for msg in messages:
+            role = msg.get("role", "user")
+            content = msg.get("content", "")
+
+            if role == "system":
+                instructions_parts.append(content)
+            else:
+                input_messages.append({"role": role, "content": content})
+
+        # Build input - can be a string for simple cases or structured
+        if len(input_messages) == 1 and input_messages[0]["role"] == "user":
+            # Simple case: single user message as string
+            api_input: str | list[dict[str, str]] = input_messages[0]["content"]
+        else:
+            # Complex case: multiple messages as list
+            api_input = input_messages
+
+        params: dict[str, Any] = {
+            "model": self.model,
+            "input": api_input,
+        }
+
+        # Add instructions if we have system messages
+        if instructions_parts:
+            params["instructions"] = "\n\n".join(instructions_parts)
+
+        # Generation parameters
+        if self.temperature is not None:
+            params["temperature"] = self.temperature
+        if self.top_p is not None:
+            params["top_p"] = self.top_p
+        if self.max_output_tokens is not None:
+            params["max_output_tokens"] = self.max_output_tokens
+
+        # Reasoning effort for o-series models
+        if self.is_o_series and self.reasoning_effort:
+            params["reasoning"] = {"effort": self.reasoning_effort}
+
+        # Stateful conversation
+        if self.previous_response_id:
+            params["previous_response_id"] = self.previous_response_id
+
+        # Tools
+        if tools:
+            params["tools"] = self._convert_tools_for_responses(tools)
+            params["tool_choice"] = "auto"
+
+        # Structured output via text format
+        if response_model:
+            params["text"] = {
+                "format": {
+                    "type": "json_schema",
+                    "name": response_model.__name__,
+                    "schema": response_model.model_json_schema(),
+                    "strict": True,
+                }
+            }
+
+        # Streaming
+        if self.stream:
+            params["stream"] = True
+
+        return params
+
+    def _convert_tools_for_responses(
+        self, tools: list[dict[str, BaseTool]]
+    ) -> list[dict[str, Any]]:
+        """Convert CrewAI tools to Responses API format.
+
+        The Responses API uses a similar tool format to Chat Completions,
+        with `strict: true` set by default for better reliability.
+
+        Args:
+            tools: CrewAI tool definitions
+
+        Returns:
+            List of Responses API tool definitions
+        """
+        from crewai.llms.providers.utils.common import safe_tool_conversion
+
+        responses_tools: list[dict[str, Any]] = []
+
+        for tool in tools:
+            name, description, parameters = safe_tool_conversion(tool, "OpenAI")
+
+            responses_tool: dict[str, Any] = {
+                "type": "function",
+                "function": {
+                    "name": name,
+                    "description": description,
+                    "strict": True,  # Enable strict mode by default
+                },
+            }
+
+            if parameters:
+                if isinstance(parameters, dict):
+                    responses_tool["function"]["parameters"] = parameters
+                else:
+                    responses_tool["function"]["parameters"] = dict(parameters)
+
+            responses_tools.append(responses_tool)
+
+        return responses_tools
+
+    def _handle_response(
+        self,
+        params: dict[str, Any],
+        formatted_messages: list[LLMMessage],
+        available_functions: dict[str, Any] | None = None,
+        from_task: Task | None = None,
+        from_agent: Agent | None = None,
+        response_model: type[BaseModel] | None = None,
+    ) -> str | Any:
+        """Handle non-streaming Responses API call.
+
+        Args:
+            params: Responses API parameters
+            available_functions: Available functions for tool execution
+            from_task: Task context
+            from_agent: Agent context
+            response_model: Optional response model for structured output
+
+        Returns:
+            Response text or tool call result
+        """
+        response: Response = self.client.responses.create(**params)
+
+        # Store response ID for potential follow-up calls
+        self.previous_response_id = response.id
+
+        # Track token usage
+        if response.usage:
+            self._track_token_usage_internal({
+                "prompt_tokens": response.usage.input_tokens,
+                "completion_tokens": response.usage.output_tokens,
+                "total_tokens": response.usage.total_tokens,
+            })
+
+        # Handle tool calls
+        if response.output and available_functions:
+            for output_item in response.output:
+                if hasattr(output_item, "type") and output_item.type == "function_call":
+                    function_name = output_item.name
+                    try:
+                        function_args = json.loads(output_item.arguments)
+                    except json.JSONDecodeError as e:
+                        logging.error(f"Failed to parse tool arguments: {e}")
+                        continue
+
+                    result = self._handle_tool_execution(
+                        function_name=function_name,
+                        function_args=function_args,
+                        available_functions=available_functions,
+                        from_task=from_task,
+                        from_agent=from_agent,
+                    )
+
+                    if result is not None:
+                        return result
+
+        # Extract text content
+        content = response.output_text or ""
+        content = self._apply_stop_words(content)
+
+        # Handle structured output
+        if response_model and content:
+            try:
+                parsed = response_model.model_validate_json(content)
+                structured_json = parsed.model_dump_json()
+                self._emit_call_completed_event(
+                    response=structured_json,
+                    call_type=LLMCallType.LLM_CALL,
+                    from_task=from_task,
+                    from_agent=from_agent,
+                )
+                structured_json = self._invoke_after_llm_call_hooks(
+                    formatted_messages, structured_json, from_agent
+                )
+                return structured_json
+            except Exception as e:
+                logging.warning(f"Structured output parsing failed: {e}")
+
+        self._emit_call_completed_event(
+            response=content,
+            call_type=LLMCallType.LLM_CALL,
+            from_task=from_task,
+            from_agent=from_agent,
+        )
+
+        content = self._invoke_after_llm_call_hooks(
+            formatted_messages, content, from_agent
+        )
+
+        return content
+
+    async def _ahandle_response(
+        self,
+        params: dict[str, Any],
+        formatted_messages: list[LLMMessage],
+        available_functions: dict[str, Any] | None = None,
+        from_task: Task | None = None,
+        from_agent: Agent | None = None,
+        response_model: type[BaseModel] | None = None,
+    ) -> str | Any:
+        """Handle async non-streaming Responses API call.
+
+        Args:
+            params: Responses API parameters
+            available_functions: Available functions for tool execution
+            from_task: Task context
+            from_agent: Agent context
+            response_model: Optional response model for structured output
+
+        Returns:
+            Response text or tool call result
+        """
+        response: Response = await self.async_client.responses.create(**params)
+
+        # Store response ID
+        self.previous_response_id = response.id
+
+        # Track token usage
+        if response.usage:
+            self._track_token_usage_internal({
+                "prompt_tokens": response.usage.input_tokens,
+                "completion_tokens": response.usage.output_tokens,
+                "total_tokens": response.usage.total_tokens,
+            })
+
+        # Handle tool calls
+        if response.output and available_functions:
+            for output_item in response.output:
+                if hasattr(output_item, "type") and output_item.type == "function_call":
+                    function_name = output_item.name
+                    try:
+                        function_args = json.loads(output_item.arguments)
+                    except json.JSONDecodeError as e:
+                        logging.error(f"Failed to parse tool arguments: {e}")
+                        continue
+
+                    result = self._handle_tool_execution(
+                        function_name=function_name,
+                        function_args=function_args,
+                        available_functions=available_functions,
+                        from_task=from_task,
+                        from_agent=from_agent,
+                    )
+
+                    if result is not None:
+                        return result
+
+        # Extract text content
+        content = response.output_text or ""
+        content = self._apply_stop_words(content)
+
+        # Handle structured output
+        if response_model and content:
+            try:
+                parsed = response_model.model_validate_json(content)
+                structured_json = parsed.model_dump_json()
+                self._emit_call_completed_event(
+                    response=structured_json,
+                    call_type=LLMCallType.LLM_CALL,
+                    from_task=from_task,
+                    from_agent=from_agent,
+                )
+                structured_json = self._invoke_after_llm_call_hooks(
+                    formatted_messages, structured_json, from_agent
+                )
+                return structured_json
+            except Exception as e:
+                logging.warning(f"Structured output parsing failed: {e}")
+
+        self._emit_call_completed_event(
+            response=content,
+            call_type=LLMCallType.LLM_CALL,
+            from_task=from_task,
+            from_agent=from_agent,
+        )
+
+        return self._invoke_after_llm_call_hooks(
+            formatted_messages, content, from_agent
+        )
+
+    def _handle_streaming_response(
+        self,
+        params: dict[str, Any],
+        formatted_messages: list[LLMMessage],
+        available_functions: dict[str, Any] | None = None,
+        from_task: Task | None = None,
+        from_agent: Agent | None = None,
+        response_model: type[BaseModel] | None = None,
+    ) -> str | Any:
+        """Handle streaming Responses API call.
+
+        Args:
+            params: Responses API parameters
+            available_functions: Available functions for tool execution
+            from_task: Task context
+            from_agent: Agent context
+            response_model: Optional response model for structured output
+
+        Returns:
+            Complete response text or tool call result
+        """
+        full_response = ""
+        tool_calls: dict[str, dict[str, Any]] = {}
+        response_id: str | None = None
+        usage_data: dict[str, int] = {}
+
+        with self.client.responses.stream(**params) as stream:
+            for event in stream:
+                event_type = getattr(event, "type", None)
+
+                # Handle response created event
+                if event_type == "response.created":
+                    if hasattr(event, "response") and event.response:
+                        response_id = event.response.id
+
+                # Handle text delta events
+                elif event_type == "response.output_text.delta":
+                    delta_text = getattr(event, "delta", "")
+                    if delta_text:
+                        full_response += delta_text
+                        self._emit_stream_chunk_event(
+                            chunk=delta_text,
+                            from_task=from_task,
+                            from_agent=from_agent,
+                        )
+
+                # Handle function call argument delta
+                elif event_type == "response.function_call_arguments.delta":
+                    item_id = getattr(event, "item_id", "default")
+                    delta = getattr(event, "delta", "")
+
+                    if item_id not in tool_calls:
+                        tool_calls[item_id] = {
+                            "name": "",
+                            "arguments": "",
+                        }
+
+                    tool_calls[item_id]["arguments"] += delta
+
+                    self._emit_stream_chunk_event(
+                        chunk=delta,
+                        from_task=from_task,
+                        from_agent=from_agent,
+                        tool_call={
+                            "id": item_id,
+                            "function": tool_calls[item_id],
+                            "type": "function",
+                        },
+                        call_type=LLMCallType.TOOL_CALL,
+                    )
+
+                # Handle function call name
+                elif event_type == "response.output_item.added":
+                    item = getattr(event, "item", None)
+                    if item and hasattr(item, "type") and item.type == "function_call":
+                        item_id = getattr(item, "id", "default")
+                        if item_id not in tool_calls:
+                            tool_calls[item_id] = {
+                                "name": getattr(item, "name", ""),
+                                "arguments": "",
+                            }
+                        else:
+                            tool_calls[item_id]["name"] = getattr(item, "name", "")
+
+                # Handle completion event
+                elif event_type == "response.completed":
+                    if hasattr(event, "response") and event.response:
+                        resp = event.response
+                        response_id = resp.id
+                        if resp.usage:
+                            usage_data = {
+                                "prompt_tokens": resp.usage.input_tokens,
+                                "completion_tokens": resp.usage.output_tokens,
+                                "total_tokens": resp.usage.total_tokens,
+                            }
+
+        # Store response ID
+        if response_id:
+            self.previous_response_id = response_id
+
+        # Track token usage
+        if usage_data:
+            self._track_token_usage_internal(usage_data)
+
+        # Handle tool calls
+        if tool_calls and available_functions:
+            for item_id, call_data in tool_calls.items():
+                function_name = call_data.get("name", "")
+                arguments = call_data.get("arguments", "")
+
+                if not function_name or not arguments:
+                    continue
+
+                if function_name not in available_functions:
+                    logging.warning(
+                        f"Function '{function_name}' not found in available functions"
+                    )
+                    continue
+
+                try:
+                    function_args = json.loads(arguments)
+                except json.JSONDecodeError as e:
+                    logging.error(f"Failed to parse streamed tool arguments: {e}")
+                    continue
+
+                result = self._handle_tool_execution(
+                    function_name=function_name,
+                    function_args=function_args,
+                    available_functions=available_functions,
+                    from_task=from_task,
+                    from_agent=from_agent,
+                )
+
+                if result is not None:
+                    return result
+
+        # Apply stop words
+        full_response = self._apply_stop_words(full_response)
+
+        # Handle structured output
+        if response_model and full_response:
+            try:
+                parsed = response_model.model_validate_json(full_response)
+                structured_json = parsed.model_dump_json()
+                self._emit_call_completed_event(
+                    response=structured_json,
+                    call_type=LLMCallType.LLM_CALL,
+                    from_task=from_task,
+                    from_agent=from_agent,
+                )
+                structured_json = self._invoke_after_llm_call_hooks(
+                    formatted_messages, structured_json, from_agent
+                )
+                return structured_json
+            except Exception as e:
+                logging.warning(f"Structured output parsing failed: {e}")
+
+        self._emit_call_completed_event(
+            response=full_response,
+            call_type=LLMCallType.LLM_CALL,
+            from_task=from_task,
+            from_agent=from_agent,
+        )
+
+        return self._invoke_after_llm_call_hooks(
+            formatted_messages, full_response, from_agent
+        )
+
+    async def _ahandle_streaming_response(
+        self,
+        params: dict[str, Any],
+        formatted_messages: list[LLMMessage],
+        available_functions: dict[str, Any] | None = None,
+        from_task: Task | None = None,
+        from_agent: Agent | None = None,
+        response_model: type[BaseModel] | None = None,
+    ) -> str | Any:
+        """Handle async streaming Responses API call.
+
+        Args:
+            params: Responses API parameters
+            available_functions: Available functions for tool execution
+            from_task: Task context
+            from_agent: Agent context
+            response_model: Optional response model for structured output
+
+        Returns:
+            Complete response text or tool call result
+        """
+        full_response = ""
+        tool_calls: dict[str, dict[str, Any]] = {}
+        response_id: str | None = None
+        usage_data: dict[str, int] = {}
+
+        async with self.async_client.responses.stream(**params) as stream:
+            async for event in stream:
+                event_type = getattr(event, "type", None)
+
+                # Handle response created event
+                if event_type == "response.created":
+                    if hasattr(event, "response") and event.response:
+                        response_id = event.response.id
+
+                # Handle text delta events
+                elif event_type == "response.output_text.delta":
+                    delta_text = getattr(event, "delta", "")
+                    if delta_text:
+                        full_response += delta_text
+                        self._emit_stream_chunk_event(
+                            chunk=delta_text,
+                            from_task=from_task,
+                            from_agent=from_agent,
+                        )
+
+                # Handle function call argument delta
+                elif event_type == "response.function_call_arguments.delta":
+                    item_id = getattr(event, "item_id", "default")
+                    delta = getattr(event, "delta", "")
+
+                    if item_id not in tool_calls:
+                        tool_calls[item_id] = {
+                            "name": "",
+                            "arguments": "",
+                        }
+
+                    tool_calls[item_id]["arguments"] += delta
+
+                    self._emit_stream_chunk_event(
+                        chunk=delta,
+                        from_task=from_task,
+                        from_agent=from_agent,
+                        tool_call={
+                            "id": item_id,
+                            "function": tool_calls[item_id],
+                            "type": "function",
+                        },
+                        call_type=LLMCallType.TOOL_CALL,
+                    )
+
+                # Handle function call name
+                elif event_type == "response.output_item.added":
+                    item = getattr(event, "item", None)
+                    if item and hasattr(item, "type") and item.type == "function_call":
+                        item_id = getattr(item, "id", "default")
+                        if item_id not in tool_calls:
+                            tool_calls[item_id] = {
+                                "name": getattr(item, "name", ""),
+                                "arguments": "",
+                            }
+                        else:
+                            tool_calls[item_id]["name"] = getattr(item, "name", "")
+
+                # Handle completion event
+                elif event_type == "response.completed":
+                    if hasattr(event, "response") and event.response:
+                        resp = event.response
+                        response_id = resp.id
+                        if resp.usage:
+                            usage_data = {
+                                "prompt_tokens": resp.usage.input_tokens,
+                                "completion_tokens": resp.usage.output_tokens,
+                                "total_tokens": resp.usage.total_tokens,
+                            }
+
+        # Store response ID
+        if response_id:
+            self.previous_response_id = response_id
+
+        # Track token usage
+        if usage_data:
+            self._track_token_usage_internal(usage_data)
+
+        # Handle tool calls
+        if tool_calls and available_functions:
+            for item_id, call_data in tool_calls.items():
+                function_name = call_data.get("name", "")
+                arguments = call_data.get("arguments", "")
+
+                if not function_name or not arguments:
+                    continue
+
+                if function_name not in available_functions:
+                    logging.warning(
+                        f"Function '{function_name}' not found in available functions"
+                    )
+                    continue
+
+                try:
+                    function_args = json.loads(arguments)
+                except json.JSONDecodeError as e:
+                    logging.error(f"Failed to parse streamed tool arguments: {e}")
+                    continue
+
+                result = self._handle_tool_execution(
+                    function_name=function_name,
+                    function_args=function_args,
+                    available_functions=available_functions,
+                    from_task=from_task,
+                    from_agent=from_agent,
+                )
+
+                if result is not None:
+                    return result
+
+        # Apply stop words
+        full_response = self._apply_stop_words(full_response)
+
+        # Handle structured output
+        if response_model and full_response:
+            try:
+                parsed = response_model.model_validate_json(full_response)
+                structured_json = parsed.model_dump_json()
+                self._emit_call_completed_event(
+                    response=structured_json,
+                    call_type=LLMCallType.LLM_CALL,
+                    from_task=from_task,
+                    from_agent=from_agent,
+                )
+                structured_json = self._invoke_after_llm_call_hooks(
+                    formatted_messages, structured_json, from_agent
+                )
+                return structured_json
+            except Exception as e:
+                logging.warning(f"Structured output parsing failed: {e}")
+
+        self._emit_call_completed_event(
+            response=full_response,
+            call_type=LLMCallType.LLM_CALL,
+            from_task=from_task,
+            from_agent=from_agent,
+        )
+
+        return self._invoke_after_llm_call_hooks(
+            formatted_messages, full_response, from_agent
+        )
+
+    def supports_function_calling(self) -> bool:
+        """Check if the model supports function calling."""
+        # Most models support function calling, but o1-preview doesn't
+        return "o1-preview" not in self.model.lower()
+
+    def supports_stop_words(self) -> bool:
+        """Check if the model supports stop words."""
+        # Responses API doesn't have native stop word support
+        # We apply stop words manually in _apply_stop_words
+        return self._supports_stop_words_implementation()
+
+    def get_context_window_size(self) -> int:
+        """Get the context window size for the model."""
+        from crewai.llm import CONTEXT_WINDOW_USAGE_RATIO
+
+        # Find the matching context window size, checking longer prefixes first
+        # so that e.g. "o1-mini" matches its own entry rather than "o1".
+        for model_prefix, size in sorted(
+            RESPONSES_API_CONTEXT_WINDOWS.items(),
+            key=lambda item: len(item[0]),
+            reverse=True,
+        ):
+            if self.model.lower().startswith(model_prefix):
+                return int(size * CONTEXT_WINDOW_USAGE_RATIO)
+
+        # Default context window
+        return int(128000 * CONTEXT_WINDOW_USAGE_RATIO)
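Because `_handle_response` stores `response.id` back into `previous_response_id`, and `_prepare_responses_params` forwards that ID on the next request, consecutive calls on the same instance continue one server-side conversation. A minimal sketch of that flow, assuming `OPENAI_API_KEY` is set (the prompts are illustrative):

```python
from crewai import LLM

llm = LLM(model="gpt-4o", provider="openai", api="responses")

first = llm.call("Summarize our project plan in one sentence.")
# The second request is sent with previous_response_id set to the first
# response's id, so the model sees the earlier turn as context.
follow_up = llm.call("Now list the main risks.")
```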
diff --git a/lib/crewai/tests/llms/openai_responses/__init__.py b/lib/crewai/tests/llms/openai_responses/__init__.py
new file mode 100644
index 0000000000..c03372ad64
--- /dev/null
+++ b/lib/crewai/tests/llms/openai_responses/__init__.py
@@ -0,0 +1 @@
+"""Tests for OpenAI Responses API provider."""
diff --git a/lib/crewai/tests/llms/openai_responses/test_openai_responses.py b/lib/crewai/tests/llms/openai_responses/test_openai_responses.py
new file mode 100644
index 0000000000..5272f98630
--- /dev/null
+++ b/lib/crewai/tests/llms/openai_responses/test_openai_responses.py
@@ -0,0 +1,336 @@
+"""Tests for OpenAI Responses API via api='responses'."""
+
+import sys
+import types
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from crewai.llm import LLM
+from crewai.llms.providers.openai_responses.completion import (
+    OpenAIResponsesCompletion,
+)
+
+
+def test_responses_completion_is_used_with_api_flag():
+    """
+    OpenAIResponsesCompletion should be used when provider='openai' and api='responses'
+    """
+    llm = LLM(model="gpt-4o", provider="openai", api="responses")
+
+    assert llm.__class__.__name__ == "OpenAIResponsesCompletion"
+    assert llm.provider == "openai_responses"
+    assert llm.model == "gpt-4o"
+
+
+def test_model_prefix_openai_responses_is_deprecated_but_supported():
+    """
+    Model prefix 'openai_responses/' is deprecated but should still work for now.
+    """
+    with pytest.warns(DeprecationWarning):
+        llm = LLM(model="openai_responses/gpt-4o")
+    assert llm.__class__.__name__ == "OpenAIResponsesCompletion"
+    assert llm.provider == "openai_responses"
+    assert llm.model == "gpt-4o"
+
+
+def test_provider_openai_responses_is_deprecated_but_supported():
+    with pytest.warns(DeprecationWarning):
+        llm = LLM(model="gpt-4o", provider="openai_responses")
+    assert llm.__class__.__name__ == "OpenAIResponsesCompletion"
+    assert llm.provider == "openai_responses"
+    assert llm.model == "gpt-4o"
+
+
+def test_provider_openai_responses_conflicting_api_is_rejected():
+    with pytest.raises(ValueError):
+        LLM(model="gpt-4o", provider="openai_responses", api="chat")
+
+
+def test_model_prefix_openai_responses_conflicting_api_is_rejected():
+    with pytest.raises(ValueError):
+        LLM(model="openai_responses/gpt-4o", api="chat")
+
+
+def test_responses_completion_module_is_imported():
+    """
+    The completion module should be imported when using api='responses'.
+    """
+    module_name = "crewai.llms.providers.openai_responses.completion"
+
+    # Remove module from cache if it exists
+    if module_name in sys.modules:
+        del sys.modules[module_name]
+
+    # Create LLM instance - this should trigger the import
+    LLM(model="gpt-4o", provider="openai", api="responses")
+
+    # Verify the module was imported
+    assert module_name in sys.modules
+    completion_mod = sys.modules[module_name]
+    assert isinstance(completion_mod, types.ModuleType)
+
+    # Verify the class exists in the module
+    assert hasattr(completion_mod, "OpenAIResponsesCompletion")
+
+
+def test_responses_completion_initialization_parameters():
+    """
+    Test that OpenAIResponsesCompletion is initialized with correct parameters
+    """
+    llm = LLM(
+        model="gpt-4o",
+        provider="openai",
+        api="responses",
+        temperature=0.7,
+        max_output_tokens=1000,
+        api_key="test-key",
+    )
+
+    assert isinstance(llm, OpenAIResponsesCompletion)
+    assert llm.model == "gpt-4o"
+    assert llm.temperature == 0.7
+    assert llm.max_output_tokens == 1000
+
+
+def test_openai_responses_completion_with_reasoning_effort():
+    """
+    Test that OpenAIResponsesCompletion accepts reasoning_effort for o-series models
+    """
+    llm = LLM(
+        model="o3-mini",
+        provider="openai",
+        api="responses",
+        reasoning_effort="high",
+    )
+
+    assert isinstance(llm, OpenAIResponsesCompletion)
+    assert llm.model == "o3-mini"
+    assert llm.reasoning_effort == "high"
+    assert llm.is_o_series is True
+
+
+def test_openai_responses_completion_call():
+    """
+    Test that OpenAIResponsesCompletion call method works
+    """
+    llm = LLM(model="gpt-4o", provider="openai", api="responses")
+
+    # Mock the call method on the instance
+    with patch.object(llm, "call", return_value="Hello! I'm ready to help.") as mock_call:
+        result = llm.call("Hello, how are you?")
+
+    assert result == "Hello! I'm ready to help."
+    mock_call.assert_called_once_with("Hello, how are you?")
+
+
+def test_openai_responses_prepare_params_simple_message():
+    """
+    Test that _prepare_responses_params handles simple string input correctly
+    """
+    llm = OpenAIResponsesCompletion(model="gpt-4o")
+
+    messages = [{"role": "user", "content": "Hello world"}]
+    params = llm._prepare_responses_params(messages)
+
+    assert params["model"] == "gpt-4o"
+    assert params["input"] == "Hello world"  # Simple case returns string
+    assert "instructions" not in params  # No system message
+
+
+def test_openai_responses_prepare_params_with_system_message():
+    """
+    Test that _prepare_responses_params converts system messages to instructions
+    """
+    llm = OpenAIResponsesCompletion(model="gpt-4o")
+
+    messages = [
+        {"role": "system", "content": "You are a helpful assistant"},
+        {"role": "user", "content": "Hello"},
+    ]
+    params = llm._prepare_responses_params(messages)
+
+    assert params["model"] == "gpt-4o"
+    assert params["instructions"] == "You are a helpful assistant"
+    assert params["input"] == "Hello"
+
+
+def test_openai_responses_prepare_params_with_multiple_messages():
+    """
+    Test that _prepare_responses_params handles multiple messages correctly
+    """
+    llm = OpenAIResponsesCompletion(model="gpt-4o")
+
+    messages = [
+        {"role": "system", "content": "You are a helpful assistant"},
+        {"role": "user", "content": "What is 2+2?"},
+        {"role": "assistant", "content": "4"},
+        {"role": "user", "content": "What about 3+3?"},
+    ]
+    params = llm._prepare_responses_params(messages)
+
+    assert params["instructions"] == "You are a helpful assistant"
+    # Multiple messages should be a list
+    assert isinstance(params["input"], list)
+    assert len(params["input"]) == 3  # 3 non-system messages
+
+
+def test_openai_responses_prepare_params_with_reasoning_effort():
+    """
+    Test that _prepare_responses_params includes reasoning parameter for o-series
+    """
+    llm = OpenAIResponsesCompletion(model="o3-mini", reasoning_effort="high")
+
+    messages = [{"role": "user", "content": "Solve this problem"}]
+    params = llm._prepare_responses_params(messages)
+
+    assert params["reasoning"] == {"effort": "high"}
+
+
+def test_openai_responses_prepare_params_with_tools():
+    """
+    Test that _prepare_responses_params correctly converts tools
+    """
+    llm = OpenAIResponsesCompletion(model="gpt-4o")
+
+    messages = [{"role": "user", "content": "Search for information"}]
+    tools = [
+        {
+            "name": "search",
+            "description": "Search for information",
+            "parameters": {"type": "object", "properties": {"query": {"type": "string"}}},
+        }
+    ]
+
+    params = llm._prepare_responses_params(messages, tools=tools)
+
+    assert "tools" in params
+    assert len(params["tools"]) == 1
+    assert params["tools"][0]["type"] == "function"
+    assert params["tools"][0]["function"]["name"] == "search"
+    assert params["tools"][0]["function"]["strict"] is True
+
+
+def test_openai_responses_supports_function_calling():
+    """
+    Test supports_function_calling returns correct value based on model
+    """
+    # Regular model supports function calling
+    llm_gpt4 = OpenAIResponsesCompletion(model="gpt-4o")
+    assert llm_gpt4.supports_function_calling() is True
+
+    # o1-preview doesn't support function calling
+    llm_o1_preview = OpenAIResponsesCompletion(model="o1-preview")
+    assert llm_o1_preview.supports_function_calling() is False
+
+
+def test_openai_responses_get_context_window_size():
+    """
+    Test that get_context_window_size returns correct values for different models
+    """
+    # GPT-4o has 128000 context window
+    llm_gpt4o = OpenAIResponsesCompletion(model="gpt-4o")
+    context_size = llm_gpt4o.get_context_window_size()
+    # Should be 85% of 128000 = 108800
+    assert context_size == int(128000 * 0.85)
+
+    # o3-mini has 200000 context window
+    llm_o3 = OpenAIResponsesCompletion(model="o3-mini")
+    context_size = llm_o3.get_context_window_size()
+    assert context_size == int(200000 * 0.85)
+
+
+def test_openai_responses_is_o_series_detection():
+    """
+    Test that o-series models are correctly detected
+    """
+    # O-series models
+    assert OpenAIResponsesCompletion(model="o1").is_o_series is True
+    assert OpenAIResponsesCompletion(model="o1-mini").is_o_series is True
+    assert OpenAIResponsesCompletion(model="o1-preview").is_o_series is True
+    assert OpenAIResponsesCompletion(model="o3-mini").is_o_series is True
+
+    # Non-o-series models
+    assert OpenAIResponsesCompletion(model="gpt-4o").is_o_series is False
+    assert OpenAIResponsesCompletion(model="gpt-4o-mini").is_o_series is False
+
+
+def test_openai_responses_raises_error_when_initialization_fails():
+    """
+    Test that LLM raises ImportError when native OpenAI Responses completion fails to initialize.
+    """
+    with patch("crewai.llm.LLM._get_native_provider") as mock_get_provider:
+
+        class FailingCompletion:
+            def __init__(self, *args, **kwargs):
+                raise Exception("Native SDK failed")
+
+        mock_get_provider.return_value = FailingCompletion
+
+        with pytest.raises(ImportError) as excinfo:
+            LLM(model="gpt-4o", provider="openai", api="responses")
+
+        assert "Error importing native provider" in str(excinfo.value)
+        assert "Native SDK failed" in str(excinfo.value)
+
+
+def test_openai_responses_streaming_parameter():
+    """
+    Test that streaming parameter is correctly set
+    """
+    llm = OpenAIResponsesCompletion(model="gpt-4o", stream=True)
+    assert llm.stream is True
+
+    llm_no_stream = OpenAIResponsesCompletion(model="gpt-4o", stream=False)
+    assert llm_no_stream.stream is False
+
+
+def test_openai_responses_previous_response_id():
+    """
+    Test that previous_response_id parameter is handled correctly
+    """
+    llm = OpenAIResponsesCompletion(
+        model="gpt-4o",
+        previous_response_id="resp_123abc",
+    )
+    assert llm.previous_response_id == "resp_123abc"
+
+
+def test_openai_responses_get_client_params_with_api_base():
+    """
+    Test that _get_client_params correctly converts api_base to base_url
+    """
+    llm = OpenAIResponsesCompletion(
+        model="gpt-4o",
+        api_base="https://custom.openai.com/v1",
+    )
+    client_params = llm._get_client_params()
+    assert client_params["base_url"] == "https://custom.openai.com/v1"
+
+
+def test_openai_responses_convert_tools_strict_mode():
+    """
+    Test that tools are converted with strict: true by default
+    """
+    llm = OpenAIResponsesCompletion(model="gpt-4o")
+
+    tools = [
+        {
+            "name": "get_weather",
+            "description": "Get weather information",
+            "parameters": {
+                "type": "object",
+                "properties": {
+                    "location": {"type": "string", "description": "City name"},
+                },
+                "required": ["location"],
+            },
+        }
+    ]
+
+    converted = llm._convert_tools_for_responses(tools)
+
+    assert len(converted) == 1
+    assert converted[0]["type"] == "function"
+    assert converted[0]["function"]["strict"] is True
+    assert converted[0]["function"]["name"] == "get_weather"
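For reference, the structured-output and reasoning paths exercised by these tests can be driven end to end roughly as follows. This is a sketch only, assuming a configured API key; the `Answer` model and prompt are hypothetical:

```python
from pydantic import BaseModel

from crewai import LLM


class Answer(BaseModel):
    value: int


# reasoning_effort is forwarded as {"reasoning": {"effort": "high"}} for o-series models.
llm = LLM(model="o3-mini", provider="openai", api="responses", reasoning_effort="high")

# response_model is translated into a strict json_schema text format, and the
# reply is validated with Answer.model_validate_json(...) before being returned.
result = llm.call("What is 2 + 2? Answer as JSON.", response_model=Answer)
```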