in databao/executors/frontend/text_frontend.py [0:0]
def write_message_chunk(self, chunk: BaseMessageChunk) -> None:
    """Write one streamed message chunk, including any tool-call fragments.

    Chunks that are not ``AIMessageChunk`` instances are ignored by this
    method. For AI chunks, the reasoning content and the chunk text are
    concatenated, markdown-escaped when ``self._escape_markdown`` is set,
    and passed to ``self.write``.

    Tool-call output is tracked with ``self._is_tool_calling``:

    * On the first chunk carrying tool-call chunks, a header line per tool
      call and an opening code fence are written, and the flag is set.
    * Every non-``None`` ``"args"`` fragment is written as it arrives.
    * The first chunk without tool-call chunks while the flag is set writes
      the closing code fence and clears the flag.

    Args:
        chunk: The streamed message chunk to render.
    """
    if not isinstance(chunk, AIMessageChunk):
        # Handle ToolMessage results in add_state_chunk
        return

    rendered = get_reasoning_content(chunk) + chunk.text
    if self._escape_markdown:
        rendered = escape_markdown_text(rendered)
    self.write(rendered)

    pending_calls = chunk.tool_call_chunks
    if not pending_calls:
        # No tool-call fragments in this chunk: finish an open tool call, if any.
        if self._is_tool_calling:
            self.write("\n```\n\n")  # Close code block
            self._is_tool_calling = False
        return

    # N.B. LangChain sometimes waits for the whole string to complete before
    # yielding chunks. That's why long "sql" tool calls take some time to show
    # up and then the whole sql is shown in a batch.
    if not self._is_tool_calling:
        # First fragment of a tool call: announce the call(s) and open the fence.
        self.write("\n\n")
        for tool_call_chunk in pending_calls:
            self.write(f"[tool_call: '{tool_call_chunk['name']}']\n")
        self.write("```\n")  # Open code block
        self._is_tool_calling = True

    for fragment in pending_calls:
        args_piece = fragment["args"]
        if args_piece is not None:
            self.write(args_piece)