in databao/executors/lighthouse/history_cleaning.py [0:0]
def clean_tool_history(messages: list[BaseMessage], token_limit: int) -> list[BaseMessage]:
    """Truncate the tool-call history when it exceeds the token limit.

    If the approximate token count of ``messages`` is below ``token_limit``,
    a shallow copy of the list is returned unchanged. Otherwise the history is
    walked once and pending messages are grouped into blocks:

    * a block that ends with the tool response to a ``submit_result`` call is
      collapsed by ``_truncate_block`` (per the original contract: intermediate
      messages are removed and the final AI message is rewritten to contain
      SQL, dataframe and text);
    * a block of more than three messages that ends with an AI message
      without tool calls is collapsed by ``_truncate_no_df_block``;
    * any other message (system, human) flushes the pending block verbatim.

    Specific for AgentState and ExecuteSubmit graph.

    Args:
        messages: Conversation history. When truncation is needed, the last
            message must be a ``HumanMessage``.
        token_limit: Threshold compared against
            ``count_tokens_approximately(messages)``.

    Returns:
        Messages ready to be sent to LLM.

    Raises:
        AssertionError: If truncation is needed and the last message is not a
            ``HumanMessage``.
    """
    if count_tokens_approximately(messages) < token_limit:
        return messages.copy()
    # The trailing human message is what flushes the last pending block below.
    assert isinstance(messages[-1], HumanMessage)

    # SQL text (and later the result data) of every `run_sql_query` call, keyed by call id.
    dfs: dict[str, dict[str, str]] = {}
    # Tool name of every call seen so far, keyed by call id. A tool response is
    # matched to its own call through this map rather than by peeking at the
    # previous message, which is not an AIMessage when several tools were
    # called from a single AI message.
    tool_names: dict[str, str] = {}
    buffer: list[BaseMessage] = []
    result: list[BaseMessage] = []

    for curr_message in messages:
        buffer.append(curr_message)
        if isinstance(curr_message, AIMessage):
            if curr_message.tool_calls:
                for tool_call in curr_message.tool_calls:
                    call_id = str(tool_call["id"])
                    tool_names[call_id] = tool_call["name"]
                    if tool_call["name"] == "run_sql_query":
                        dfs[call_id] = {"sql": tool_call["args"]["sql"]}
            elif len(buffer) > 3:
                # Long thread with no submission at the end.
                result.append(_truncate_no_df_block(buffer))
                buffer = []
        elif isinstance(curr_message, ToolMessage):
            call_id = curr_message.tool_call_id
            artifact = curr_message.artifact
            if call_id in dfs and artifact is not None and "csv" in artifact:
                # Enrich the recorded query with its calculation results.
                dfs[call_id]["df"] = artifact.get("csv")
                dfs[call_id]["query_id"] = artifact.get("query_id")
            elif tool_names.get(call_id) == "submit_result":
                # The response to `submit_result` closes the current block.
                result.append(_truncate_block(dfs, buffer))
                buffer = []
        else:
            # System and human messages: keep everything pending as is.
            result.extend(buffer)
            buffer = []
    return result