-
Notifications
You must be signed in to change notification settings - Fork 578
feat(integrations): openai-agents streaming support #5291
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
e5ad308
8884375
c446c35
ca2b9ba
e5d0ffa
5e0d558
5f73e4f
b09e718
635606f
fcd77a7
049cb16
599cdd0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,5 @@ | ||
| from .models import _create_get_model_wrapper # noqa: F401 | ||
| from .tools import _create_get_all_tools_wrapper # noqa: F401 | ||
| from .runner import _create_run_wrapper # noqa: F401 | ||
| from .runner import _create_run_wrapper, _create_run_streamed_wrapper # noqa: F401 | ||
| from .agent_run import _patch_agent_run # noqa: F401 | ||
| from .error_tracing import _patch_error_tracing # noqa: F401 |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,6 +1,7 @@ | ||||||||||||||||||||||||||||
| import sys | ||||||||||||||||||||||||||||
| from functools import wraps | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| from sentry_sdk.consts import SPANDATA | ||||||||||||||||||||||||||||
| from sentry_sdk.integrations import DidNotEnable | ||||||||||||||||||||||||||||
| from sentry_sdk.utils import reraise | ||||||||||||||||||||||||||||
| from ..spans import ( | ||||||||||||||||||||||||||||
|
|
@@ -31,22 +32,10 @@ def _patch_agent_run() -> None: | |||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| # Store original methods | ||||||||||||||||||||||||||||
| original_run_single_turn = agents.run.AgentRunner._run_single_turn | ||||||||||||||||||||||||||||
| original_run_single_turn_streamed = agents.run.AgentRunner._run_single_turn_streamed | ||||||||||||||||||||||||||||
| original_execute_handoffs = agents._run_impl.RunImpl.execute_handoffs | ||||||||||||||||||||||||||||
| original_execute_final_output = agents._run_impl.RunImpl.execute_final_output | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def _start_invoke_agent_span( | ||||||||||||||||||||||||||||
| context_wrapper: "agents.RunContextWrapper", | ||||||||||||||||||||||||||||
| agent: "agents.Agent", | ||||||||||||||||||||||||||||
| kwargs: "dict[str, Any]", | ||||||||||||||||||||||||||||
| ) -> "Span": | ||||||||||||||||||||||||||||
| """Start an agent invocation span""" | ||||||||||||||||||||||||||||
| # Store the agent on the context wrapper so we can access it later | ||||||||||||||||||||||||||||
| context_wrapper._sentry_current_agent = agent | ||||||||||||||||||||||||||||
| span = invoke_agent_span(context_wrapper, agent, kwargs) | ||||||||||||||||||||||||||||
| context_wrapper._sentry_agent_span = span | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| return span | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def _has_active_agent_span(context_wrapper: "agents.RunContextWrapper") -> bool: | ||||||||||||||||||||||||||||
| """Check if there's an active agent span for this context""" | ||||||||||||||||||||||||||||
| return getattr(context_wrapper, "_sentry_current_agent", None) is not None | ||||||||||||||||||||||||||||
|
|
@@ -57,6 +46,46 @@ def _get_current_agent( | |||||||||||||||||||||||||||
| """Get the current agent from context wrapper""" | ||||||||||||||||||||||||||||
| return getattr(context_wrapper, "_sentry_current_agent", None) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def _close_streaming_workflow_span(agent: "Optional[agents.Agent]") -> None: | ||||||||||||||||||||||||||||
| """Close the workflow span for streaming executions if it exists.""" | ||||||||||||||||||||||||||||
| if agent and hasattr(agent, "_sentry_workflow_span"): | ||||||||||||||||||||||||||||
| workflow_span = agent._sentry_workflow_span | ||||||||||||||||||||||||||||
| workflow_span.__exit__(*sys.exc_info()) | ||||||||||||||||||||||||||||
| delattr(agent, "_sentry_workflow_span") | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def _maybe_start_agent_span( | ||||||||||||||||||||||||||||
| context_wrapper: "agents.RunContextWrapper", | ||||||||||||||||||||||||||||
| agent: "agents.Agent", | ||||||||||||||||||||||||||||
| should_run_agent_start_hooks: bool, | ||||||||||||||||||||||||||||
| span_kwargs: "dict[str, Any]", | ||||||||||||||||||||||||||||
| is_streaming: bool = False, | ||||||||||||||||||||||||||||
| ) -> "Optional[Span]": | ||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
| Start an agent invocation span if conditions are met. | ||||||||||||||||||||||||||||
| Handles ending any existing span for a different agent. | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| Returns the new span if started, or the existing span if conditions aren't met. | ||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
| if not (should_run_agent_start_hooks and agent and context_wrapper): | ||||||||||||||||||||||||||||
| return getattr(context_wrapper, "_sentry_agent_span", None) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| # End any existing span for a different agent | ||||||||||||||||||||||||||||
| if _has_active_agent_span(context_wrapper): | ||||||||||||||||||||||||||||
| current_agent = _get_current_agent(context_wrapper) | ||||||||||||||||||||||||||||
| if current_agent and current_agent != agent: | ||||||||||||||||||||||||||||
| end_invoke_agent_span(context_wrapper, current_agent) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| # Store the agent on the context wrapper so we can access it later | ||||||||||||||||||||||||||||
| context_wrapper._sentry_current_agent = agent | ||||||||||||||||||||||||||||
| span = invoke_agent_span(context_wrapper, agent, span_kwargs) | ||||||||||||||||||||||||||||
| context_wrapper._sentry_agent_span = span | ||||||||||||||||||||||||||||
| agent._sentry_agent_span = span | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| if is_streaming: | ||||||||||||||||||||||||||||
| span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| return span | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| @wraps( | ||||||||||||||||||||||||||||
| original_run_single_turn.__func__ | ||||||||||||||||||||||||||||
| if hasattr(original_run_single_turn, "__func__") | ||||||||||||||||||||||||||||
|
|
@@ -68,28 +97,18 @@ async def patched_run_single_turn( | |||||||||||||||||||||||||||
| """Patched _run_single_turn that creates agent invocation spans""" | ||||||||||||||||||||||||||||
| agent = kwargs.get("agent") | ||||||||||||||||||||||||||||
| context_wrapper = kwargs.get("context_wrapper") | ||||||||||||||||||||||||||||
| should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks") | ||||||||||||||||||||||||||||
| should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks", False) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| span = getattr(context_wrapper, "_sentry_agent_span", None) | ||||||||||||||||||||||||||||
| # Start agent span when agent starts (but only once per agent) | ||||||||||||||||||||||||||||
| if should_run_agent_start_hooks and agent and context_wrapper: | ||||||||||||||||||||||||||||
| # End any existing span for a different agent | ||||||||||||||||||||||||||||
| if _has_active_agent_span(context_wrapper): | ||||||||||||||||||||||||||||
| current_agent = _get_current_agent(context_wrapper) | ||||||||||||||||||||||||||||
| if current_agent and current_agent != agent: | ||||||||||||||||||||||||||||
| end_invoke_agent_span(context_wrapper, current_agent) | ||||||||||||||||||||||||||||
| span = _maybe_start_agent_span( | ||||||||||||||||||||||||||||
| context_wrapper, agent, should_run_agent_start_hooks, kwargs | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| span = _start_invoke_agent_span(context_wrapper, agent, kwargs) | ||||||||||||||||||||||||||||
| agent._sentry_agent_span = span | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| # Call original method with all the correct parameters | ||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||
| result = await original_run_single_turn(*args, **kwargs) | ||||||||||||||||||||||||||||
| except Exception as exc: | ||||||||||||||||||||||||||||
| if span is not None and span.timestamp is None: | ||||||||||||||||||||||||||||
| _record_exception_on_span(span, exc) | ||||||||||||||||||||||||||||
| end_invoke_agent_span(context_wrapper, agent) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| reraise(*sys.exc_info()) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| return result | ||||||||||||||||||||||||||||
|
|
@@ -117,7 +136,9 @@ async def patched_execute_handoffs( | |||||||||||||||||||||||||||
| # Call original method with all parameters | ||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||
| result = await original_execute_handoffs(*args, **kwargs) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| except Exception: | ||||||||||||||||||||||||||||
| _close_streaming_workflow_span(agent) | ||||||||||||||||||||||||||||
| raise | ||||||||||||||||||||||||||||
| finally: | ||||||||||||||||||||||||||||
| # End span for current agent after handoff processing is complete | ||||||||||||||||||||||||||||
| if agent and context_wrapper and _has_active_agent_span(context_wrapper): | ||||||||||||||||||||||||||||
|
|
@@ -139,18 +160,77 @@ async def patched_execute_final_output( | |||||||||||||||||||||||||||
| context_wrapper = kwargs.get("context_wrapper") | ||||||||||||||||||||||||||||
| final_output = kwargs.get("final_output") | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| # Call original method with all parameters | ||||||||||||||||||||||||||||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||
| result = await original_execute_final_output(*args, **kwargs) | ||||||||||||||||||||||||||||
| finally: | ||||||||||||||||||||||||||||
| # End span for current agent after final output processing is complete | ||||||||||||||||||||||||||||
| if agent and context_wrapper and _has_active_agent_span(context_wrapper): | ||||||||||||||||||||||||||||
| end_invoke_agent_span(context_wrapper, agent, final_output) | ||||||||||||||||||||||||||||
| # For streaming, close the workflow span (non-streaming uses context manager in _create_run_wrapper) | ||||||||||||||||||||||||||||
| _close_streaming_workflow_span(agent) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| return result | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| @wraps( | ||||||||||||||||||||||||||||
| original_run_single_turn_streamed.__func__ | ||||||||||||||||||||||||||||
| if hasattr(original_run_single_turn_streamed, "__func__") | ||||||||||||||||||||||||||||
| else original_run_single_turn_streamed | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
| async def patched_run_single_turn_streamed( | ||||||||||||||||||||||||||||
| cls: "agents.Runner", *args: "Any", **kwargs: "Any" | ||||||||||||||||||||||||||||
| ) -> "Any": | ||||||||||||||||||||||||||||
| """Patched _run_single_turn_streamed that creates agent invocation spans for streaming. | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| Note: Unlike _run_single_turn which uses keyword-only arguments (*,), | ||||||||||||||||||||||||||||
| _run_single_turn_streamed uses positional arguments. The call signature is: | ||||||||||||||||||||||||||||
| _run_single_turn_streamed( | ||||||||||||||||||||||||||||
| streamed_result, # args[0] | ||||||||||||||||||||||||||||
| agent, # args[1] | ||||||||||||||||||||||||||||
| hooks, # args[2] | ||||||||||||||||||||||||||||
| context_wrapper, # args[3] | ||||||||||||||||||||||||||||
| run_config, # args[4] | ||||||||||||||||||||||||||||
| should_run_agent_start_hooks, # args[5] | ||||||||||||||||||||||||||||
| tool_use_tracker, # args[6] | ||||||||||||||||||||||||||||
| all_tools, # args[7] | ||||||||||||||||||||||||||||
| server_conversation_tracker, # args[8] (optional) | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
| streamed_result = args[0] if len(args) > 0 else kwargs.get("streamed_result") | ||||||||||||||||||||||||||||
| agent = args[1] if len(args) > 1 else kwargs.get("agent") | ||||||||||||||||||||||||||||
| context_wrapper = args[3] if len(args) > 3 else kwargs.get("context_wrapper") | ||||||||||||||||||||||||||||
| should_run_agent_start_hooks = bool( | ||||||||||||||||||||||||||||
| args[5] | ||||||||||||||||||||||||||||
| if len(args) > 5 | ||||||||||||||||||||||||||||
| else kwargs.get("should_run_agent_start_hooks", False) | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| span_kwargs: "dict[str, Any]" = {} | ||||||||||||||||||||||||||||
| if streamed_result and hasattr(streamed_result, "input"): | ||||||||||||||||||||||||||||
| span_kwargs["original_input"] = streamed_result.input | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| span = _maybe_start_agent_span( | ||||||||||||||||||||||||||||
| context_wrapper, | ||||||||||||||||||||||||||||
| agent, | ||||||||||||||||||||||||||||
| should_run_agent_start_hooks, | ||||||||||||||||||||||||||||
| span_kwargs, | ||||||||||||||||||||||||||||
| is_streaming=True, | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||
| result = await original_run_single_turn_streamed(*args, **kwargs) | ||||||||||||||||||||||||||||
| except Exception as exc: | ||||||||||||||||||||||||||||
| if span is not None and span.timestamp is None: | ||||||||||||||||||||||||||||
| _record_exception_on_span(span, exc) | ||||||||||||||||||||||||||||
| end_invoke_agent_span(context_wrapper, agent) | ||||||||||||||||||||||||||||
| _close_streaming_workflow_span(agent) | ||||||||||||||||||||||||||||
| reraise(*sys.exc_info()) | ||||||||||||||||||||||||||||
|
Comment on lines
+220
to
+225
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We only re-raise the user's exception, and not anything else, in case our stuff in between fails.
Suggested change
|
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| return result | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| # Apply patches | ||||||||||||||||||||||||||||
| agents.run.AgentRunner._run_single_turn = classmethod(patched_run_single_turn) | ||||||||||||||||||||||||||||
| agents.run.AgentRunner._run_single_turn_streamed = classmethod( | ||||||||||||||||||||||||||||
| patched_run_single_turn_streamed | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
| agents._run_impl.RunImpl.execute_handoffs = classmethod(patched_execute_handoffs) | ||||||||||||||||||||||||||||
| agents._run_impl.RunImpl.execute_final_output = classmethod( | ||||||||||||||||||||||||||||
constantinius marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||
| patched_execute_final_output | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,5 @@ | ||
| import copy | ||
| import sys | ||
| from functools import wraps | ||
|
|
||
| from sentry_sdk.integrations import DidNotEnable | ||
|
|
@@ -17,7 +18,7 @@ | |
| from typing import TYPE_CHECKING | ||
|
|
||
| if TYPE_CHECKING: | ||
| from typing import Any, Callable | ||
| from typing import Any, Callable, Optional | ||
| from sentry_sdk.tracing import Span | ||
|
|
||
| try: | ||
|
|
@@ -27,6 +28,16 @@ | |
| raise DidNotEnable("OpenAI Agents not installed") | ||
|
|
||
|
|
||
| def _set_response_model_on_agent_span( | ||
| agent: "agents.Agent", response_model: "Optional[str]" | ||
| ) -> None: | ||
| """Set the response model on the agent's invoke_agent span if available.""" | ||
| if response_model: | ||
| agent_span = getattr(agent, "_sentry_agent_span", None) | ||
| if agent_span: | ||
| agent_span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model) | ||
|
|
||
|
|
||
| def _inject_trace_propagation_headers( | ||
| hosted_tool: "HostedMCPTool", span: "Span" | ||
| ) -> None: | ||
|
|
@@ -74,15 +85,19 @@ def wrapped_get_model( | |
| # because we only patch its direct methods, all underlying data can remain unchanged. | ||
| model = copy.copy(original_get_model(agent, run_config)) | ||
|
|
||
| # Wrap _fetch_response if it exists (for OpenAI models) to capture raw response model | ||
| # Capture the request model name for spans (agent.model can be None when using defaults) | ||
| request_model_name = model.model if hasattr(model, "model") else str(model) | ||
| agent._sentry_request_model = request_model_name | ||
|
Comment on lines
+88
to
+90
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you split the model default change into it's own PR? |
||
|
|
||
| # Wrap _fetch_response if it exists (for OpenAI models) to capture response model | ||
| if hasattr(model, "_fetch_response"): | ||
| original_fetch_response = model._fetch_response | ||
|
|
||
| @wraps(original_fetch_response) | ||
| async def wrapped_fetch_response(*args: "Any", **kwargs: "Any") -> "Any": | ||
| response = await original_fetch_response(*args, **kwargs) | ||
| if hasattr(response, "model"): | ||
| agent._sentry_raw_response_model = str(response.model) | ||
| if hasattr(response, "model") and response.model: | ||
| agent._sentry_response_model = str(response.model) | ||
| return response | ||
|
|
||
| model._fetch_response = wrapped_fetch_response | ||
|
|
@@ -104,22 +119,59 @@ async def wrapped_get_response(*args: "Any", **kwargs: "Any") -> "Any": | |
|
|
||
| result = await original_get_response(*args, **kwargs) | ||
|
|
||
| response_model = getattr(agent, "_sentry_raw_response_model", None) | ||
| # Get response model captured from _fetch_response and clean up | ||
| response_model = getattr(agent, "_sentry_response_model", None) | ||
| if response_model: | ||
| agent_span = getattr(agent, "_sentry_agent_span", None) | ||
| if agent_span: | ||
| agent_span.set_data( | ||
| SPANDATA.GEN_AI_RESPONSE_MODEL, response_model | ||
| ) | ||
| delattr(agent, "_sentry_response_model") | ||
|
|
||
| delattr(agent, "_sentry_raw_response_model") | ||
|
|
||
| update_ai_client_span(span, agent, kwargs, result, response_model) | ||
| _set_response_model_on_agent_span(agent, response_model) | ||
| update_ai_client_span(span, result, response_model) | ||
|
|
||
| return result | ||
|
|
||
| model.get_response = wrapped_get_response | ||
|
|
||
| # Also wrap stream_response for streaming support | ||
| if hasattr(model, "stream_response"): | ||
| original_stream_response = model.stream_response | ||
|
|
||
| @wraps(original_stream_response) | ||
| async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any": | ||
| # Uses explicit try/finally instead of context manager to ensure cleanup | ||
| # even if the consumer abandons the stream (GeneratorExit). | ||
| span_kwargs = dict(kwargs) | ||
| if len(args) > 0: | ||
| span_kwargs["system_instructions"] = args[0] | ||
| if len(args) > 1: | ||
| span_kwargs["input"] = args[1] | ||
|
|
||
| span = ai_client_span(agent, span_kwargs) | ||
| span.__enter__() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you're calling both |
||
| span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) | ||
|
|
||
| streaming_response = None | ||
| try: | ||
| async for event in original_stream_response(*args, **kwargs): | ||
| # Capture the full response from ResponseCompletedEvent | ||
| if hasattr(event, "response"): | ||
| streaming_response = event.response | ||
| yield event | ||
|
|
||
| # Update span with response data (usage, output, model) | ||
| if streaming_response: | ||
| response_model = ( | ||
| str(streaming_response.model) | ||
| if hasattr(streaming_response, "model") | ||
| and streaming_response.model | ||
| else None | ||
| ) | ||
| _set_response_model_on_agent_span(agent, response_model) | ||
| update_ai_client_span(span, streaming_response) | ||
| finally: | ||
| span.__exit__(*sys.exc_info()) | ||
|
|
||
| model.stream_response = wrapped_stream_response | ||
|
|
||
| return model | ||
|
|
||
| return wrapped_get_model | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to close the workflow span here?
my understanding is that a new agent takes over with a handoff, so terminating the agent in
finallywould make sense?let me know if I am missing anything.