def write_message_chunk()

in databao/executors/frontend/text_frontend.py [0:0]


    def write_message_chunk(self, chunk: BaseMessageChunk) -> None:
        """Render one streamed message chunk through ``self.write``.

        Only ``AIMessageChunk`` instances are rendered here; every other chunk
        type is ignored. The chunk's reasoning content followed by its text is
        written first, passed through ``escape_markdown_text`` when
        ``self._escape_markdown`` is set.

        Tool-call argument fragments are emitted inside a fenced code block.
        ``self._is_tool_calling`` tracks whether that block is currently open:
        it is opened (after a header line per tool call in the chunk) by the
        first chunk that carries tool-call chunks, and closed by the first
        subsequent AI chunk that carries none.
        """
        if not isinstance(chunk, AIMessageChunk):
            return  # ToolMessage results are handled in add_state_chunk

        # Reasoning content comes first, then the regular text of the chunk.
        content = get_reasoning_content(chunk) + chunk.text
        self.write(escape_markdown_text(content) if self._escape_markdown else content)

        pending_calls = chunk.tool_call_chunks
        if not pending_calls:
            # No tool-call data in this chunk: close the code block if one is open.
            if self._is_tool_calling:
                self.write("\n```\n\n")  # Close code block
                self._is_tool_calling = False
            return

        # N.B. LangChain sometimes waits for the whole string to complete before
        # yielding chunks. That is why long "sql" tool calls take a while to show
        # up and then the whole sql appears in one batch.
        if not self._is_tool_calling:
            # First chunk of a tool-calling phase: header line(s), then open the fence.
            self.write("\n\n")
            for call in pending_calls:
                self.write(f"[tool_call: '{call['name']}']\n")
            self.write("```\n")  # Open code block
            self._is_tool_calling = True

        # Stream whatever argument fragments this chunk carries.
        for call in pending_calls:
            fragment = call["args"]
            if fragment is not None:
                self.write(fragment)