HowTo/gRPC/Linux/OpenAI/LangChain/PyServer/venv/langchain/callbacks/manager.py (1,219 lines of code) (raw):
from __future__ import annotations
import asyncio
import functools
import logging
import os
import uuid
from contextlib import asynccontextmanager, contextmanager
from contextvars import ContextVar
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
Dict,
Generator,
List,
Optional,
Sequence,
Type,
TypeVar,
Union,
cast,
)
from uuid import UUID
from tenacity import RetryCallState
import langchain
from langchain.callbacks.base import (
BaseCallbackHandler,
BaseCallbackManager,
Callbacks,
ChainManagerMixin,
LLMManagerMixin,
RetrieverManagerMixin,
RunManagerMixin,
ToolManagerMixin,
)
from langchain.callbacks.openai_info import OpenAICallbackHandler
from langchain.callbacks.stdout import StdOutCallbackHandler
from langchain.callbacks.tracers import run_collector
from langchain.callbacks.tracers.langchain import LangChainTracer
from langchain.callbacks.tracers.langchain_v1 import LangChainTracerV1, TracerSessionV1
from langchain.callbacks.tracers.stdout import ConsoleCallbackHandler
from langchain.callbacks.tracers.wandb import WandbTracer
from langchain.schema import (
AgentAction,
AgentFinish,
Document,
LLMResult,
)
from langchain.schema.messages import BaseMessage, get_buffer_string
from langchain.schema.output import ChatGenerationChunk, GenerationChunk
if TYPE_CHECKING:
from langsmith import Client as LangSmithClient
logger = logging.getLogger(__name__)
openai_callback_var: ContextVar[Optional[OpenAICallbackHandler]] = ContextVar(
"openai_callback", default=None
)
tracing_callback_var: ContextVar[
Optional[LangChainTracerV1]
] = ContextVar( # noqa: E501
"tracing_callback", default=None
)
wandb_tracing_callback_var: ContextVar[
Optional[WandbTracer]
] = ContextVar( # noqa: E501
"tracing_wandb_callback", default=None
)
tracing_v2_callback_var: ContextVar[
Optional[LangChainTracer]
] = ContextVar( # noqa: E501
"tracing_callback_v2", default=None
)
run_collector_var: ContextVar[
Optional[run_collector.RunCollectorCallbackHandler]
] = ContextVar( # noqa: E501
"run_collector", default=None
)
def _get_debug() -> bool:
return langchain.debug
@contextmanager
def get_openai_callback() -> Generator[OpenAICallbackHandler, None, None]:
"""Get the OpenAI callback handler in a context manager.
which conveniently exposes token and cost information.
Returns:
OpenAICallbackHandler: The OpenAI callback handler.
Example:
>>> with get_openai_callback() as cb:
... # Use the OpenAI callback handler
"""
cb = OpenAICallbackHandler()
openai_callback_var.set(cb)
yield cb
openai_callback_var.set(None)
@contextmanager
def tracing_enabled(
session_name: str = "default",
) -> Generator[TracerSessionV1, None, None]:
"""Get the Deprecated LangChainTracer in a context manager.
Args:
session_name (str, optional): The name of the session.
Defaults to "default".
Returns:
TracerSessionV1: The LangChainTracer session.
Example:
>>> with tracing_enabled() as session:
... # Use the LangChainTracer session
"""
cb = LangChainTracerV1()
session = cast(TracerSessionV1, cb.load_session(session_name))
tracing_callback_var.set(cb)
yield session
tracing_callback_var.set(None)
@contextmanager
def wandb_tracing_enabled(
session_name: str = "default",
) -> Generator[None, None, None]:
"""Get the WandbTracer in a context manager.
Args:
session_name (str, optional): The name of the session.
Defaults to "default".
Returns:
None
Example:
>>> with wandb_tracing_enabled() as session:
... # Use the WandbTracer session
"""
cb = WandbTracer()
wandb_tracing_callback_var.set(cb)
yield None
wandb_tracing_callback_var.set(None)
@contextmanager
def tracing_v2_enabled(
project_name: Optional[str] = None,
*,
example_id: Optional[Union[str, UUID]] = None,
tags: Optional[List[str]] = None,
client: Optional[LangSmithClient] = None,
) -> Generator[None, None, None]:
"""Instruct LangChain to log all runs in context to LangSmith.
Args:
project_name (str, optional): The name of the project.
Defaults to "default".
example_id (str or UUID, optional): The ID of the example.
Defaults to None.
tags (List[str], optional): The tags to add to the run.
Defaults to None.
Returns:
None
Example:
>>> with tracing_v2_enabled():
... # LangChain code will automatically be traced
"""
if isinstance(example_id, str):
example_id = UUID(example_id)
cb = LangChainTracer(
example_id=example_id,
project_name=project_name,
tags=tags,
client=client,
)
tracing_v2_callback_var.set(cb)
yield
tracing_v2_callback_var.set(None)
@contextmanager
def collect_runs() -> Generator[run_collector.RunCollectorCallbackHandler, None, None]:
"""Collect all run traces in context.
Returns:
run_collector.RunCollectorCallbackHandler: The run collector callback handler.
Example:
>>> with collect_runs() as runs_cb:
chain.invoke("foo")
run_id = runs_cb.traced_runs[0].id
"""
cb = run_collector.RunCollectorCallbackHandler()
run_collector_var.set(cb)
yield cb
run_collector_var.set(None)
@contextmanager
def trace_as_chain_group(
group_name: str,
callback_manager: Optional[CallbackManager] = None,
*,
project_name: Optional[str] = None,
example_id: Optional[Union[str, UUID]] = None,
run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
) -> Generator[CallbackManager, None, None]:
"""Get a callback manager for a chain group in a context manager.
Useful for grouping different calls together as a single run even if
they aren't composed in a single chain.
Args:
group_name (str): The name of the chain group.
project_name (str, optional): The name of the project.
Defaults to None.
example_id (str or UUID, optional): The ID of the example.
Defaults to None.
run_id (UUID, optional): The ID of the run.
tags (List[str], optional): The inheritable tags to apply to all runs.
Defaults to None.
Returns:
CallbackManager: The callback manager for the chain group.
Example:
>>> with trace_as_chain_group("group_name") as manager:
... # Use the callback manager for the chain group
... llm.predict("Foo", callbacks=manager)
"""
cb = cast(
Callbacks,
[
LangChainTracer(
project_name=project_name,
example_id=example_id,
)
]
if callback_manager is None
else callback_manager,
)
cm = CallbackManager.configure(
inheritable_callbacks=cb,
inheritable_tags=tags,
)
run_manager = cm.on_chain_start({"name": group_name}, {}, run_id=run_id)
yield run_manager.get_child()
run_manager.on_chain_end({})
@asynccontextmanager
async def atrace_as_chain_group(
group_name: str,
callback_manager: Optional[AsyncCallbackManager] = None,
*,
project_name: Optional[str] = None,
example_id: Optional[Union[str, UUID]] = None,
run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
) -> AsyncGenerator[AsyncCallbackManager, None]:
"""Get an async callback manager for a chain group in a context manager.
Useful for grouping different async calls together as a single run even if
they aren't composed in a single chain.
Args:
group_name (str): The name of the chain group.
project_name (str, optional): The name of the project.
Defaults to None.
example_id (str or UUID, optional): The ID of the example.
Defaults to None.
run_id (UUID, optional): The ID of the run.
tags (List[str], optional): The inheritable tags to apply to all runs.
Defaults to None.
Returns:
AsyncCallbackManager: The async callback manager for the chain group.
Example:
>>> async with atrace_as_chain_group("group_name") as manager:
... # Use the async callback manager for the chain group
... await llm.apredict("Foo", callbacks=manager)
"""
cb = cast(
Callbacks,
[
LangChainTracer(
project_name=project_name,
example_id=example_id,
)
]
if callback_manager is None
else callback_manager,
)
cm = AsyncCallbackManager.configure(inheritable_callbacks=cb, inheritable_tags=tags)
run_manager = await cm.on_chain_start({"name": group_name}, {}, run_id=run_id)
try:
yield run_manager.get_child()
finally:
await run_manager.on_chain_end({})
def _handle_event(
handlers: List[BaseCallbackHandler],
event_name: str,
ignore_condition_name: Optional[str],
*args: Any,
**kwargs: Any,
) -> None:
"""Generic event handler for CallbackManager."""
message_strings: Optional[List[str]] = None
for handler in handlers:
try:
if ignore_condition_name is None or not getattr(
handler, ignore_condition_name
):
getattr(handler, event_name)(*args, **kwargs)
except NotImplementedError as e:
if event_name == "on_chat_model_start":
if message_strings is None:
message_strings = [get_buffer_string(m) for m in args[1]]
_handle_event(
[handler],
"on_llm_start",
"ignore_llm",
args[0],
message_strings,
*args[2:],
**kwargs,
)
else:
logger.warning(
f"NotImplementedError in {handler.__class__.__name__}.{event_name}"
f" callback: {e}"
)
except Exception as e:
logger.warning(
f"Error in {handler.__class__.__name__}.{event_name} callback: {e}"
)
if handler.raise_error:
raise e
async def _ahandle_event_for_handler(
handler: BaseCallbackHandler,
event_name: str,
ignore_condition_name: Optional[str],
*args: Any,
**kwargs: Any,
) -> None:
try:
if ignore_condition_name is None or not getattr(handler, ignore_condition_name):
event = getattr(handler, event_name)
if asyncio.iscoroutinefunction(event):
await event(*args, **kwargs)
else:
if handler.run_inline:
event(*args, **kwargs)
else:
await asyncio.get_event_loop().run_in_executor(
None, functools.partial(event, *args, **kwargs)
)
except NotImplementedError as e:
if event_name == "on_chat_model_start":
message_strings = [get_buffer_string(m) for m in args[1]]
await _ahandle_event_for_handler(
handler,
"on_llm_start",
"ignore_llm",
args[0],
message_strings,
*args[2:],
**kwargs,
)
else:
logger.warning(
f"NotImplementedError in {handler.__class__.__name__}.{event_name}"
f" callback: {e}"
)
except Exception as e:
logger.warning(
f"Error in {handler.__class__.__name__}.{event_name} callback: {e}"
)
if handler.raise_error:
raise e
async def _ahandle_event(
handlers: List[BaseCallbackHandler],
event_name: str,
ignore_condition_name: Optional[str],
*args: Any,
**kwargs: Any,
) -> None:
"""Generic event handler for AsyncCallbackManager."""
for handler in [h for h in handlers if h.run_inline]:
await _ahandle_event_for_handler(
handler, event_name, ignore_condition_name, *args, **kwargs
)
await asyncio.gather(
*(
_ahandle_event_for_handler(
handler, event_name, ignore_condition_name, *args, **kwargs
)
for handler in handlers
if not handler.run_inline
)
)
BRM = TypeVar("BRM", bound="BaseRunManager")
class BaseRunManager(RunManagerMixin):
"""Base class for run manager (a bound callback manager)."""
def __init__(
self,
*,
run_id: UUID,
handlers: List[BaseCallbackHandler],
inheritable_handlers: List[BaseCallbackHandler],
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
inheritable_tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
inheritable_metadata: Optional[Dict[str, Any]] = None,
) -> None:
"""Initialize the run manager.
Args:
run_id (UUID): The ID of the run.
handlers (List[BaseCallbackHandler]): The list of handlers.
inheritable_handlers (List[BaseCallbackHandler]):
The list of inheritable handlers.
parent_run_id (UUID, optional): The ID of the parent run.
Defaults to None.
tags (Optional[List[str]]): The list of tags.
inheritable_tags (Optional[List[str]]): The list of inheritable tags.
metadata (Optional[Dict[str, Any]]): The metadata.
inheritable_metadata (Optional[Dict[str, Any]]): The inheritable metadata.
"""
self.run_id = run_id
self.handlers = handlers
self.inheritable_handlers = inheritable_handlers
self.parent_run_id = parent_run_id
self.tags = tags or []
self.inheritable_tags = inheritable_tags or []
self.metadata = metadata or {}
self.inheritable_metadata = inheritable_metadata or {}
@classmethod
def get_noop_manager(cls: Type[BRM]) -> BRM:
"""Return a manager that doesn't perform any operations.
Returns:
BaseRunManager: The noop manager.
"""
return cls(
run_id=uuid.uuid4(),
handlers=[],
inheritable_handlers=[],
tags=[],
inheritable_tags=[],
metadata={},
inheritable_metadata={},
)
class RunManager(BaseRunManager):
"""Sync Run Manager."""
def on_text(
self,
text: str,
**kwargs: Any,
) -> Any:
"""Run when text is received.
Args:
text (str): The received text.
Returns:
Any: The result of the callback.
"""
_handle_event(
self.handlers,
"on_text",
None,
text,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
def on_retry(
self,
retry_state: RetryCallState,
**kwargs: Any,
) -> None:
_handle_event(
self.handlers,
"on_retry",
"ignore_retry",
retry_state,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
class ParentRunManager(RunManager):
"""Sync Parent Run Manager."""
def get_child(self, tag: Optional[str] = None) -> CallbackManager:
"""Get a child callback manager.
Args:
tag (str, optional): The tag for the child callback manager.
Defaults to None.
Returns:
CallbackManager: The child callback manager.
"""
manager = CallbackManager(handlers=[], parent_run_id=self.run_id)
manager.set_handlers(self.inheritable_handlers)
manager.add_tags(self.inheritable_tags)
manager.add_metadata(self.inheritable_metadata)
if tag is not None:
manager.add_tags([tag], False)
return manager
class AsyncRunManager(BaseRunManager):
"""Async Run Manager."""
async def on_text(
self,
text: str,
**kwargs: Any,
) -> Any:
"""Run when text is received.
Args:
text (str): The received text.
Returns:
Any: The result of the callback.
"""
await _ahandle_event(
self.handlers,
"on_text",
None,
text,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
async def on_retry(
self,
retry_state: RetryCallState,
**kwargs: Any,
) -> None:
await _ahandle_event(
self.handlers,
"on_retry",
"ignore_retry",
retry_state,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
class AsyncParentRunManager(AsyncRunManager):
"""Async Parent Run Manager."""
def get_child(self, tag: Optional[str] = None) -> AsyncCallbackManager:
"""Get a child callback manager.
Args:
tag (str, optional): The tag for the child callback manager.
Defaults to None.
Returns:
AsyncCallbackManager: The child callback manager.
"""
manager = AsyncCallbackManager(handlers=[], parent_run_id=self.run_id)
manager.set_handlers(self.inheritable_handlers)
manager.add_tags(self.inheritable_tags)
manager.add_metadata(self.inheritable_metadata)
if tag is not None:
manager.add_tags([tag], False)
return manager
class CallbackManagerForLLMRun(RunManager, LLMManagerMixin):
"""Callback manager for LLM run."""
def on_llm_new_token(
self,
token: str,
*,
chunk: Optional[Union[GenerationChunk, ChatGenerationChunk]] = None,
**kwargs: Any,
) -> None:
"""Run when LLM generates a new token.
Args:
token (str): The new token.
"""
_handle_event(
self.handlers,
"on_llm_new_token",
"ignore_llm",
token=token,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
chunk=chunk,
**kwargs,
)
def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
"""Run when LLM ends running.
Args:
response (LLMResult): The LLM result.
"""
_handle_event(
self.handlers,
"on_llm_end",
"ignore_llm",
response,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
def on_llm_error(
self,
error: Union[Exception, KeyboardInterrupt],
**kwargs: Any,
) -> None:
"""Run when LLM errors.
Args:
error (Exception or KeyboardInterrupt): The error.
"""
_handle_event(
self.handlers,
"on_llm_error",
"ignore_llm",
error,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
class AsyncCallbackManagerForLLMRun(AsyncRunManager, LLMManagerMixin):
"""Async callback manager for LLM run."""
async def on_llm_new_token(
self,
token: str,
*,
chunk: Optional[Union[GenerationChunk, ChatGenerationChunk]] = None,
**kwargs: Any,
) -> None:
"""Run when LLM generates a new token.
Args:
token (str): The new token.
"""
await _ahandle_event(
self.handlers,
"on_llm_new_token",
"ignore_llm",
token,
chunk=chunk,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
"""Run when LLM ends running.
Args:
response (LLMResult): The LLM result.
"""
await _ahandle_event(
self.handlers,
"on_llm_end",
"ignore_llm",
response,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
async def on_llm_error(
self,
error: Union[Exception, KeyboardInterrupt],
**kwargs: Any,
) -> None:
"""Run when LLM errors.
Args:
error (Exception or KeyboardInterrupt): The error.
"""
await _ahandle_event(
self.handlers,
"on_llm_error",
"ignore_llm",
error,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
class CallbackManagerForChainRun(ParentRunManager, ChainManagerMixin):
"""Callback manager for chain run."""
def on_chain_end(self, outputs: Union[Dict[str, Any], Any], **kwargs: Any) -> None:
"""Run when chain ends running.
Args:
outputs (Union[Dict[str, Any], Any]): The outputs of the chain.
"""
_handle_event(
self.handlers,
"on_chain_end",
"ignore_chain",
outputs,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
def on_chain_error(
self,
error: BaseException,
**kwargs: Any,
) -> None:
"""Run when chain errors.
Args:
error (Exception or KeyboardInterrupt): The error.
"""
_handle_event(
self.handlers,
"on_chain_error",
"ignore_chain",
error,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
"""Run when agent action is received.
Args:
action (AgentAction): The agent action.
Returns:
Any: The result of the callback.
"""
_handle_event(
self.handlers,
"on_agent_action",
"ignore_agent",
action,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> Any:
"""Run when agent finish is received.
Args:
finish (AgentFinish): The agent finish.
Returns:
Any: The result of the callback.
"""
_handle_event(
self.handlers,
"on_agent_finish",
"ignore_agent",
finish,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
class AsyncCallbackManagerForChainRun(AsyncParentRunManager, ChainManagerMixin):
"""Async callback manager for chain run."""
async def on_chain_end(
self, outputs: Union[Dict[str, Any], Any], **kwargs: Any
) -> None:
"""Run when chain ends running.
Args:
outputs (Union[Dict[str, Any], Any]): The outputs of the chain.
"""
await _ahandle_event(
self.handlers,
"on_chain_end",
"ignore_chain",
outputs,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
async def on_chain_error(
self,
error: BaseException,
**kwargs: Any,
) -> None:
"""Run when chain errors.
Args:
error (Exception or KeyboardInterrupt): The error.
"""
await _ahandle_event(
self.handlers,
"on_chain_error",
"ignore_chain",
error,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
async def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
"""Run when agent action is received.
Args:
action (AgentAction): The agent action.
Returns:
Any: The result of the callback.
"""
await _ahandle_event(
self.handlers,
"on_agent_action",
"ignore_agent",
action,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
async def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> Any:
"""Run when agent finish is received.
Args:
finish (AgentFinish): The agent finish.
Returns:
Any: The result of the callback.
"""
await _ahandle_event(
self.handlers,
"on_agent_finish",
"ignore_agent",
finish,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
class CallbackManagerForToolRun(ParentRunManager, ToolManagerMixin):
"""Callback manager for tool run."""
def on_tool_end(
self,
output: str,
**kwargs: Any,
) -> None:
"""Run when tool ends running.
Args:
output (str): The output of the tool.
"""
_handle_event(
self.handlers,
"on_tool_end",
"ignore_agent",
output,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
def on_tool_error(
self,
error: Union[Exception, KeyboardInterrupt],
**kwargs: Any,
) -> None:
"""Run when tool errors.
Args:
error (Exception or KeyboardInterrupt): The error.
"""
_handle_event(
self.handlers,
"on_tool_error",
"ignore_agent",
error,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
class AsyncCallbackManagerForToolRun(AsyncParentRunManager, ToolManagerMixin):
"""Async callback manager for tool run."""
async def on_tool_end(self, output: str, **kwargs: Any) -> None:
"""Run when tool ends running.
Args:
output (str): The output of the tool.
"""
await _ahandle_event(
self.handlers,
"on_tool_end",
"ignore_agent",
output,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
async def on_tool_error(
self,
error: Union[Exception, KeyboardInterrupt],
**kwargs: Any,
) -> None:
"""Run when tool errors.
Args:
error (Exception or KeyboardInterrupt): The error.
"""
await _ahandle_event(
self.handlers,
"on_tool_error",
"ignore_agent",
error,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
class CallbackManagerForRetrieverRun(ParentRunManager, RetrieverManagerMixin):
"""Callback manager for retriever run."""
def on_retriever_end(
self,
documents: Sequence[Document],
**kwargs: Any,
) -> None:
"""Run when retriever ends running."""
_handle_event(
self.handlers,
"on_retriever_end",
"ignore_retriever",
documents,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
def on_retriever_error(
self,
error: Union[Exception, KeyboardInterrupt],
**kwargs: Any,
) -> None:
"""Run when retriever errors."""
_handle_event(
self.handlers,
"on_retriever_error",
"ignore_retriever",
error,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
class AsyncCallbackManagerForRetrieverRun(
AsyncParentRunManager,
RetrieverManagerMixin,
):
"""Async callback manager for retriever run."""
async def on_retriever_end(
self, documents: Sequence[Document], **kwargs: Any
) -> None:
"""Run when retriever ends running."""
await _ahandle_event(
self.handlers,
"on_retriever_end",
"ignore_retriever",
documents,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
async def on_retriever_error(
self,
error: Union[Exception, KeyboardInterrupt],
**kwargs: Any,
) -> None:
"""Run when retriever errors."""
await _ahandle_event(
self.handlers,
"on_retriever_error",
"ignore_retriever",
error,
run_id=self.run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
**kwargs,
)
class CallbackManager(BaseCallbackManager):
"""Callback manager that handles callbacks from langchain."""
def on_llm_start(
self,
serialized: Dict[str, Any],
prompts: List[str],
**kwargs: Any,
) -> List[CallbackManagerForLLMRun]:
"""Run when LLM starts running.
Args:
serialized (Dict[str, Any]): The serialized LLM.
prompts (List[str]): The list of prompts.
run_id (UUID, optional): The ID of the run. Defaults to None.
Returns:
List[CallbackManagerForLLMRun]: A callback manager for each
prompt as an LLM run.
"""
managers = []
for prompt in prompts:
run_id_ = uuid.uuid4()
_handle_event(
self.handlers,
"on_llm_start",
"ignore_llm",
serialized,
[prompt],
run_id=run_id_,
parent_run_id=self.parent_run_id,
tags=self.tags,
metadata=self.metadata,
**kwargs,
)
managers.append(
CallbackManagerForLLMRun(
run_id=run_id_,
handlers=self.handlers,
inheritable_handlers=self.inheritable_handlers,
parent_run_id=self.parent_run_id,
tags=self.tags,
inheritable_tags=self.inheritable_tags,
metadata=self.metadata,
inheritable_metadata=self.inheritable_metadata,
)
)
return managers
def on_chat_model_start(
self,
serialized: Dict[str, Any],
messages: List[List[BaseMessage]],
**kwargs: Any,
) -> List[CallbackManagerForLLMRun]:
"""Run when LLM starts running.
Args:
serialized (Dict[str, Any]): The serialized LLM.
messages (List[List[BaseMessage]]): The list of messages.
run_id (UUID, optional): The ID of the run. Defaults to None.
Returns:
List[CallbackManagerForLLMRun]: A callback manager for each
list of messages as an LLM run.
"""
managers = []
for message_list in messages:
run_id_ = uuid.uuid4()
_handle_event(
self.handlers,
"on_chat_model_start",
"ignore_chat_model",
serialized,
[message_list],
run_id=run_id_,
parent_run_id=self.parent_run_id,
tags=self.tags,
metadata=self.metadata,
**kwargs,
)
managers.append(
CallbackManagerForLLMRun(
run_id=run_id_,
handlers=self.handlers,
inheritable_handlers=self.inheritable_handlers,
parent_run_id=self.parent_run_id,
tags=self.tags,
inheritable_tags=self.inheritable_tags,
metadata=self.metadata,
inheritable_metadata=self.inheritable_metadata,
)
)
return managers
def on_chain_start(
self,
serialized: Dict[str, Any],
inputs: Union[Dict[str, Any], Any],
run_id: Optional[UUID] = None,
**kwargs: Any,
) -> CallbackManagerForChainRun:
"""Run when chain starts running.
Args:
serialized (Dict[str, Any]): The serialized chain.
inputs (Union[Dict[str, Any], Any]): The inputs to the chain.
run_id (UUID, optional): The ID of the run. Defaults to None.
Returns:
CallbackManagerForChainRun: The callback manager for the chain run.
"""
if run_id is None:
run_id = uuid.uuid4()
_handle_event(
self.handlers,
"on_chain_start",
"ignore_chain",
serialized,
inputs,
run_id=run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
metadata=self.metadata,
**kwargs,
)
return CallbackManagerForChainRun(
run_id=run_id,
handlers=self.handlers,
inheritable_handlers=self.inheritable_handlers,
parent_run_id=self.parent_run_id,
tags=self.tags,
inheritable_tags=self.inheritable_tags,
metadata=self.metadata,
inheritable_metadata=self.inheritable_metadata,
)
def on_tool_start(
self,
serialized: Dict[str, Any],
input_str: str,
run_id: Optional[UUID] = None,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> CallbackManagerForToolRun:
"""Run when tool starts running.
Args:
serialized (Dict[str, Any]): The serialized tool.
input_str (str): The input to the tool.
run_id (UUID, optional): The ID of the run. Defaults to None.
parent_run_id (UUID, optional): The ID of the parent run. Defaults to None.
Returns:
CallbackManagerForToolRun: The callback manager for the tool run.
"""
if run_id is None:
run_id = uuid.uuid4()
_handle_event(
self.handlers,
"on_tool_start",
"ignore_agent",
serialized,
input_str,
run_id=run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
metadata=self.metadata,
**kwargs,
)
return CallbackManagerForToolRun(
run_id=run_id,
handlers=self.handlers,
inheritable_handlers=self.inheritable_handlers,
parent_run_id=self.parent_run_id,
tags=self.tags,
inheritable_tags=self.inheritable_tags,
metadata=self.metadata,
inheritable_metadata=self.inheritable_metadata,
)
def on_retriever_start(
self,
serialized: Dict[str, Any],
query: str,
run_id: Optional[UUID] = None,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> CallbackManagerForRetrieverRun:
"""Run when retriever starts running."""
if run_id is None:
run_id = uuid.uuid4()
_handle_event(
self.handlers,
"on_retriever_start",
"ignore_retriever",
serialized,
query,
run_id=run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
metadata=self.metadata,
**kwargs,
)
return CallbackManagerForRetrieverRun(
run_id=run_id,
handlers=self.handlers,
inheritable_handlers=self.inheritable_handlers,
parent_run_id=self.parent_run_id,
tags=self.tags,
inheritable_tags=self.inheritable_tags,
metadata=self.metadata,
inheritable_metadata=self.inheritable_metadata,
)
@classmethod
def configure(
cls,
inheritable_callbacks: Callbacks = None,
local_callbacks: Callbacks = None,
verbose: bool = False,
inheritable_tags: Optional[List[str]] = None,
local_tags: Optional[List[str]] = None,
inheritable_metadata: Optional[Dict[str, Any]] = None,
local_metadata: Optional[Dict[str, Any]] = None,
) -> CallbackManager:
"""Configure the callback manager.
Args:
inheritable_callbacks (Optional[Callbacks], optional): The inheritable
callbacks. Defaults to None.
local_callbacks (Optional[Callbacks], optional): The local callbacks.
Defaults to None.
verbose (bool, optional): Whether to enable verbose mode. Defaults to False.
inheritable_tags (Optional[List[str]], optional): The inheritable tags.
Defaults to None.
local_tags (Optional[List[str]], optional): The local tags.
Defaults to None.
inheritable_metadata (Optional[Dict[str, Any]], optional): The inheritable
metadata. Defaults to None.
local_metadata (Optional[Dict[str, Any]], optional): The local metadata.
Defaults to None.
Returns:
CallbackManager: The configured callback manager.
"""
return _configure(
cls,
inheritable_callbacks,
local_callbacks,
verbose,
inheritable_tags,
local_tags,
inheritable_metadata,
local_metadata,
)
class AsyncCallbackManager(BaseCallbackManager):
"""Async callback manager that handles callbacks from LangChain."""
@property
def is_async(self) -> bool:
"""Return whether the handler is async."""
return True
async def on_llm_start(
self,
serialized: Dict[str, Any],
prompts: List[str],
**kwargs: Any,
) -> List[AsyncCallbackManagerForLLMRun]:
"""Run when LLM starts running.
Args:
serialized (Dict[str, Any]): The serialized LLM.
prompts (List[str]): The list of prompts.
run_id (UUID, optional): The ID of the run. Defaults to None.
Returns:
List[AsyncCallbackManagerForLLMRun]: The list of async
callback managers, one for each LLM Run corresponding
to each prompt.
"""
tasks = []
managers = []
for prompt in prompts:
run_id_ = uuid.uuid4()
tasks.append(
_ahandle_event(
self.handlers,
"on_llm_start",
"ignore_llm",
serialized,
[prompt],
run_id=run_id_,
parent_run_id=self.parent_run_id,
tags=self.tags,
metadata=self.metadata,
**kwargs,
)
)
managers.append(
AsyncCallbackManagerForLLMRun(
run_id=run_id_,
handlers=self.handlers,
inheritable_handlers=self.inheritable_handlers,
parent_run_id=self.parent_run_id,
tags=self.tags,
inheritable_tags=self.inheritable_tags,
metadata=self.metadata,
inheritable_metadata=self.inheritable_metadata,
)
)
await asyncio.gather(*tasks)
return managers
async def on_chat_model_start(
self,
serialized: Dict[str, Any],
messages: List[List[BaseMessage]],
**kwargs: Any,
) -> List[AsyncCallbackManagerForLLMRun]:
"""Run when LLM starts running.
Args:
serialized (Dict[str, Any]): The serialized LLM.
messages (List[List[BaseMessage]]): The list of messages.
run_id (UUID, optional): The ID of the run. Defaults to None.
Returns:
List[AsyncCallbackManagerForLLMRun]: The list of
async callback managers, one for each LLM Run
corresponding to each inner message list.
"""
tasks = []
managers = []
for message_list in messages:
run_id_ = uuid.uuid4()
tasks.append(
_ahandle_event(
self.handlers,
"on_chat_model_start",
"ignore_chat_model",
serialized,
[message_list],
run_id=run_id_,
parent_run_id=self.parent_run_id,
tags=self.tags,
metadata=self.metadata,
**kwargs,
)
)
managers.append(
AsyncCallbackManagerForLLMRun(
run_id=run_id_,
handlers=self.handlers,
inheritable_handlers=self.inheritable_handlers,
parent_run_id=self.parent_run_id,
tags=self.tags,
inheritable_tags=self.inheritable_tags,
metadata=self.metadata,
inheritable_metadata=self.inheritable_metadata,
)
)
await asyncio.gather(*tasks)
return managers
async def on_chain_start(
self,
serialized: Dict[str, Any],
inputs: Union[Dict[str, Any], Any],
run_id: Optional[UUID] = None,
**kwargs: Any,
) -> AsyncCallbackManagerForChainRun:
"""Run when chain starts running.
Args:
serialized (Dict[str, Any]): The serialized chain.
inputs (Union[Dict[str, Any], Any]): The inputs to the chain.
run_id (UUID, optional): The ID of the run. Defaults to None.
Returns:
AsyncCallbackManagerForChainRun: The async callback manager
for the chain run.
"""
if run_id is None:
run_id = uuid.uuid4()
await _ahandle_event(
self.handlers,
"on_chain_start",
"ignore_chain",
serialized,
inputs,
run_id=run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
metadata=self.metadata,
**kwargs,
)
return AsyncCallbackManagerForChainRun(
run_id=run_id,
handlers=self.handlers,
inheritable_handlers=self.inheritable_handlers,
parent_run_id=self.parent_run_id,
tags=self.tags,
inheritable_tags=self.inheritable_tags,
metadata=self.metadata,
inheritable_metadata=self.inheritable_metadata,
)
async def on_tool_start(
self,
serialized: Dict[str, Any],
input_str: str,
run_id: Optional[UUID] = None,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> AsyncCallbackManagerForToolRun:
"""Run when tool starts running.
Args:
serialized (Dict[str, Any]): The serialized tool.
input_str (str): The input to the tool.
run_id (UUID, optional): The ID of the run. Defaults to None.
parent_run_id (UUID, optional): The ID of the parent run.
Defaults to None.
Returns:
AsyncCallbackManagerForToolRun: The async callback manager
for the tool run.
"""
if run_id is None:
run_id = uuid.uuid4()
await _ahandle_event(
self.handlers,
"on_tool_start",
"ignore_agent",
serialized,
input_str,
run_id=run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
metadata=self.metadata,
**kwargs,
)
return AsyncCallbackManagerForToolRun(
run_id=run_id,
handlers=self.handlers,
inheritable_handlers=self.inheritable_handlers,
parent_run_id=self.parent_run_id,
tags=self.tags,
inheritable_tags=self.inheritable_tags,
metadata=self.metadata,
inheritable_metadata=self.inheritable_metadata,
)
async def on_retriever_start(
self,
serialized: Dict[str, Any],
query: str,
run_id: Optional[UUID] = None,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> AsyncCallbackManagerForRetrieverRun:
"""Run when retriever starts running."""
if run_id is None:
run_id = uuid.uuid4()
await _ahandle_event(
self.handlers,
"on_retriever_start",
"ignore_retriever",
serialized,
query,
run_id=run_id,
parent_run_id=self.parent_run_id,
tags=self.tags,
metadata=self.metadata,
**kwargs,
)
return AsyncCallbackManagerForRetrieverRun(
run_id=run_id,
handlers=self.handlers,
inheritable_handlers=self.inheritable_handlers,
parent_run_id=self.parent_run_id,
tags=self.tags,
inheritable_tags=self.inheritable_tags,
metadata=self.metadata,
inheritable_metadata=self.inheritable_metadata,
)
@classmethod
def configure(
cls,
inheritable_callbacks: Callbacks = None,
local_callbacks: Callbacks = None,
verbose: bool = False,
inheritable_tags: Optional[List[str]] = None,
local_tags: Optional[List[str]] = None,
inheritable_metadata: Optional[Dict[str, Any]] = None,
local_metadata: Optional[Dict[str, Any]] = None,
) -> AsyncCallbackManager:
"""Configure the async callback manager.
Args:
inheritable_callbacks (Optional[Callbacks], optional): The inheritable
callbacks. Defaults to None.
local_callbacks (Optional[Callbacks], optional): The local callbacks.
Defaults to None.
verbose (bool, optional): Whether to enable verbose mode. Defaults to False.
inheritable_tags (Optional[List[str]], optional): The inheritable tags.
Defaults to None.
local_tags (Optional[List[str]], optional): The local tags.
Defaults to None.
inheritable_metadata (Optional[Dict[str, Any]], optional): The inheritable
metadata. Defaults to None.
local_metadata (Optional[Dict[str, Any]], optional): The local metadata.
Defaults to None.
Returns:
AsyncCallbackManager: The configured async callback manager.
"""
return _configure(
cls,
inheritable_callbacks,
local_callbacks,
verbose,
inheritable_tags,
local_tags,
inheritable_metadata,
local_metadata,
)
T = TypeVar("T", CallbackManager, AsyncCallbackManager)
def env_var_is_set(env_var: str) -> bool:
"""Check if an environment variable is set.
Args:
env_var (str): The name of the environment variable.
Returns:
bool: True if the environment variable is set, False otherwise.
"""
return env_var in os.environ and os.environ[env_var] not in (
"",
"0",
"false",
"False",
)
def _configure(
callback_manager_cls: Type[T],
inheritable_callbacks: Callbacks = None,
local_callbacks: Callbacks = None,
verbose: bool = False,
inheritable_tags: Optional[List[str]] = None,
local_tags: Optional[List[str]] = None,
inheritable_metadata: Optional[Dict[str, Any]] = None,
local_metadata: Optional[Dict[str, Any]] = None,
) -> T:
"""Configure the callback manager.
Args:
callback_manager_cls (Type[T]): The callback manager class.
inheritable_callbacks (Optional[Callbacks], optional): The inheritable
callbacks. Defaults to None.
local_callbacks (Optional[Callbacks], optional): The local callbacks.
Defaults to None.
verbose (bool, optional): Whether to enable verbose mode. Defaults to False.
inheritable_tags (Optional[List[str]], optional): The inheritable tags.
Defaults to None.
local_tags (Optional[List[str]], optional): The local tags. Defaults to None.
inheritable_metadata (Optional[Dict[str, Any]], optional): The inheritable
metadata. Defaults to None.
local_metadata (Optional[Dict[str, Any]], optional): The local metadata.
Defaults to None.
Returns:
T: The configured callback manager.
"""
callback_manager = callback_manager_cls(handlers=[])
if inheritable_callbacks or local_callbacks:
if isinstance(inheritable_callbacks, list) or inheritable_callbacks is None:
inheritable_callbacks_ = inheritable_callbacks or []
callback_manager = callback_manager_cls(
handlers=inheritable_callbacks_.copy(),
inheritable_handlers=inheritable_callbacks_.copy(),
)
else:
callback_manager = callback_manager_cls(
handlers=inheritable_callbacks.handlers,
inheritable_handlers=inheritable_callbacks.inheritable_handlers,
parent_run_id=inheritable_callbacks.parent_run_id,
tags=inheritable_callbacks.tags,
inheritable_tags=inheritable_callbacks.inheritable_tags,
metadata=inheritable_callbacks.metadata,
inheritable_metadata=inheritable_callbacks.inheritable_metadata,
)
local_handlers_ = (
local_callbacks
if isinstance(local_callbacks, list)
else (local_callbacks.handlers if local_callbacks else [])
)
for handler in local_handlers_:
callback_manager.add_handler(handler, False)
if inheritable_tags or local_tags:
callback_manager.add_tags(inheritable_tags or [])
callback_manager.add_tags(local_tags or [], False)
if inheritable_metadata or local_metadata:
callback_manager.add_metadata(inheritable_metadata or {})
callback_manager.add_metadata(local_metadata or {}, False)
tracer = tracing_callback_var.get()
wandb_tracer = wandb_tracing_callback_var.get()
open_ai = openai_callback_var.get()
tracing_enabled_ = (
env_var_is_set("LANGCHAIN_TRACING")
or tracer is not None
or env_var_is_set("LANGCHAIN_HANDLER")
)
wandb_tracing_enabled_ = (
env_var_is_set("LANGCHAIN_WANDB_TRACING") or wandb_tracer is not None
)
tracer_v2 = tracing_v2_callback_var.get()
tracing_v2_enabled_ = (
env_var_is_set("LANGCHAIN_TRACING_V2") or tracer_v2 is not None
)
tracer_project = os.environ.get(
"LANGCHAIN_PROJECT", os.environ.get("LANGCHAIN_SESSION", "default")
)
run_collector_ = run_collector_var.get()
debug = _get_debug()
if (
verbose
or debug
or tracing_enabled_
or tracing_v2_enabled_
or wandb_tracing_enabled_
or open_ai is not None
):
if verbose and not any(
isinstance(handler, StdOutCallbackHandler)
for handler in callback_manager.handlers
):
if debug:
pass
else:
callback_manager.add_handler(StdOutCallbackHandler(), False)
if debug and not any(
isinstance(handler, ConsoleCallbackHandler)
for handler in callback_manager.handlers
):
callback_manager.add_handler(ConsoleCallbackHandler(), True)
if tracing_enabled_ and not any(
isinstance(handler, LangChainTracerV1)
for handler in callback_manager.handlers
):
if tracer:
callback_manager.add_handler(tracer, True)
else:
handler = LangChainTracerV1()
handler.load_session(tracer_project)
callback_manager.add_handler(handler, True)
if wandb_tracing_enabled_ and not any(
isinstance(handler, WandbTracer) for handler in callback_manager.handlers
):
if wandb_tracer:
callback_manager.add_handler(wandb_tracer, True)
else:
handler = WandbTracer()
callback_manager.add_handler(handler, True)
if tracing_v2_enabled_ and not any(
isinstance(handler, LangChainTracer)
for handler in callback_manager.handlers
):
if tracer_v2:
callback_manager.add_handler(tracer_v2, True)
else:
try:
handler = LangChainTracer(project_name=tracer_project)
callback_manager.add_handler(handler, True)
except Exception as e:
logger.warning(
"Unable to load requested LangChainTracer."
" To disable this warning,"
" unset the LANGCHAIN_TRACING_V2 environment variables.",
e,
)
if open_ai is not None and not any(
isinstance(handler, OpenAICallbackHandler)
for handler in callback_manager.handlers
):
callback_manager.add_handler(open_ai, True)
if run_collector_ is not None:
callback_manager.add_handler(run_collector_, False)
return callback_manager