in Runtime_env/app/utils/decorators.py [0:0]
def invoke(self, *args: Any, **kwargs: Any) -> AIMessage:
"""
Invoke the wrapped function and process its events.
Returns an AIMessage with content and relative tool calls.
"""
events = self.func(*args, **kwargs)
response_content = ""
tool_calls = []
for event in events:
if isinstance(event, OnChatModelStreamEvent):
if not isinstance(event.data.chunk.content, str):
raise ValueError("Chunk content must be a string")
response_content += event.data.chunk.content
elif isinstance(event, OnToolEndEvent):
tool_calls.append(event.data.model_dump())
return AIMessage(
content=response_content, additional_kwargs={"tool_calls_data": tool_calls}
)