google/generativeai/generative_models.py (574 lines of code) (raw):
"""Classes for working with the Gemini models."""
from __future__ import annotations
from collections.abc import Iterable
import textwrap
from typing import Any, Union, overload
import reprlib
# pylint: disable=bad-continuation, line-too-long
import google.api_core.exceptions
from google.generativeai import protos
from google.generativeai import client
from google.generativeai import caching
from google.generativeai.types import content_types
from google.generativeai.types import generation_types
from google.generativeai.types import helper_types
from google.generativeai.types import safety_types
_USER_ROLE = "user"
_MODEL_ROLE = "model"
class GenerativeModel:
"""
The `genai.GenerativeModel` class wraps default parameters for calls to
`GenerativeModel.generate_content`, `GenerativeModel.count_tokens`, and
`GenerativeModel.start_chat`.
This family of functionality is designed to support multi-turn conversations, and multimodal
requests. What media-types are supported for input and output is model-dependant.
>>> import google.generativeai as genai
>>> import PIL.Image
>>> genai.configure(api_key='YOUR_API_KEY')
>>> model = genai.GenerativeModel('models/gemini-1.5-flash')
>>> result = model.generate_content('Tell me a story about a magic backpack')
>>> result.text
"In the quaint little town of Lakeside, there lived a young girl named Lily..."
Multimodal input:
>>> model = genai.GenerativeModel('models/gemini-1.5-flash')
>>> result = model.generate_content([
... "Give me a recipe for these:", PIL.Image.open('scones.jpeg')])
>>> result.text
"**Blueberry Scones** ..."
Multi-turn conversation:
>>> chat = model.start_chat()
>>> response = chat.send_message("Hi, I have some questions for you.")
>>> response.text
"Sure, I'll do my best to answer your questions..."
To list the compatible model names use:
>>> for m in genai.list_models():
... if 'generateContent' in m.supported_generation_methods:
... print(m.name)
Arguments:
model_name: The name of the model to query. To list compatible models use
safety_settings: Sets the default safety filters. This controls which content is blocked
by the api before being returned.
generation_config: A `genai.GenerationConfig` setting the default generation parameters to
use.
"""
def __init__(
self,
model_name: str = "gemini-1.5-flash-002",
safety_settings: safety_types.SafetySettingOptions | None = None,
generation_config: generation_types.GenerationConfigType | None = None,
tools: content_types.FunctionLibraryType | None = None,
tool_config: content_types.ToolConfigType | None = None,
system_instruction: content_types.ContentType | None = None,
):
if "/" not in model_name:
model_name = "models/" + model_name
self._model_name = model_name
self._safety_settings = safety_types.to_easy_safety_dict(safety_settings)
self._generation_config = generation_types.to_generation_config_dict(generation_config)
self._tools = content_types.to_function_library(tools)
if tool_config is None:
self._tool_config = None
else:
self._tool_config = content_types.to_tool_config(tool_config)
if system_instruction is None:
self._system_instruction = None
else:
self._system_instruction = content_types.to_content(system_instruction)
self._client = None
self._async_client = None
@property
def cached_content(self) -> str:
return getattr(self, "_cached_content", None)
@property
def model_name(self):
return self._model_name
def __str__(self):
def maybe_text(content):
if content and len(content.parts) and (t := content.parts[0].text):
return repr(t)
return content
return textwrap.dedent(
f"""\
genai.GenerativeModel(
model_name='{self.model_name}',
generation_config={self._generation_config},
safety_settings={self._safety_settings},
tools={self._tools},
system_instruction={maybe_text(self._system_instruction)},
cached_content={self.cached_content}
)"""
)
__repr__ = __str__
def _prepare_request(
self,
*,
contents: content_types.ContentsType,
generation_config: generation_types.GenerationConfigType | None = None,
safety_settings: safety_types.SafetySettingOptions | None = None,
tools: content_types.FunctionLibraryType | None,
tool_config: content_types.ToolConfigType | None,
) -> protos.GenerateContentRequest:
"""Creates a `protos.GenerateContentRequest` from raw inputs."""
if hasattr(self, "_cached_content") and any([self._system_instruction, tools, tool_config]):
raise ValueError(
"`tools`, `tool_config`, `system_instruction` cannot be set on a model instantiated with `cached_content` as its context."
)
tools_lib = self._get_tools_lib(tools)
if tools_lib is not None:
tools_lib = tools_lib.to_proto()
if tool_config is None:
tool_config = self._tool_config
else:
tool_config = content_types.to_tool_config(tool_config)
contents = content_types.to_contents(contents)
generation_config = generation_types.to_generation_config_dict(generation_config)
merged_gc = self._generation_config.copy()
merged_gc.update(generation_config)
safety_settings = safety_types.to_easy_safety_dict(safety_settings)
merged_ss = self._safety_settings.copy()
merged_ss.update(safety_settings)
merged_ss = safety_types.normalize_safety_settings(merged_ss)
return protos.GenerateContentRequest(
model=self._model_name,
contents=contents,
generation_config=merged_gc,
safety_settings=merged_ss,
tools=tools_lib,
tool_config=tool_config,
system_instruction=self._system_instruction,
cached_content=self.cached_content,
)
def _get_tools_lib(
self, tools: content_types.FunctionLibraryType
) -> content_types.FunctionLibrary | None:
if tools is None:
return self._tools
else:
return content_types.to_function_library(tools)
@overload
@classmethod
def from_cached_content(
cls,
cached_content: str,
*,
generation_config: generation_types.GenerationConfigType | None = None,
safety_settings: safety_types.SafetySettingOptions | None = None,
) -> GenerativeModel: ...
@overload
@classmethod
def from_cached_content(
cls,
cached_content: caching.CachedContent,
*,
generation_config: generation_types.GenerationConfigType | None = None,
safety_settings: safety_types.SafetySettingOptions | None = None,
) -> GenerativeModel: ...
@classmethod
def from_cached_content(
cls,
cached_content: str | caching.CachedContent,
*,
generation_config: generation_types.GenerationConfigType | None = None,
safety_settings: safety_types.SafetySettingOptions | None = None,
) -> GenerativeModel:
"""Creates a model with `cached_content` as model's context.
Args:
cached_content: context for the model.
generation_config: Overrides for the model's generation config.
safety_settings: Overrides for the model's safety settings.
Returns:
`GenerativeModel` object with `cached_content` as its context.
"""
if isinstance(cached_content, str):
cached_content = caching.CachedContent.get(name=cached_content)
# call __init__ to set the model's `generation_config`, `safety_settings`.
# `model_name` will be the name of the model for which the `cached_content` was created.
self = cls(
model_name=cached_content.model,
generation_config=generation_config,
safety_settings=safety_settings,
)
# set the model's context.
setattr(self, "_cached_content", cached_content.name)
return self
def generate_content(
self,
contents: content_types.ContentsType,
*,
generation_config: generation_types.GenerationConfigType | None = None,
safety_settings: safety_types.SafetySettingOptions | None = None,
stream: bool = False,
tools: content_types.FunctionLibraryType | None = None,
tool_config: content_types.ToolConfigType | None = None,
request_options: helper_types.RequestOptionsType | None = None,
) -> generation_types.GenerateContentResponse:
"""A multipurpose function to generate responses from the model.
This `GenerativeModel.generate_content` method can handle multimodal input, and multi-turn
conversations.
>>> model = genai.GenerativeModel('models/gemini-1.5-flash')
>>> response = model.generate_content('Tell me a story about a magic backpack')
>>> response.text
### Streaming
This method supports streaming with the `stream=True`. The result has the same type as the non streaming case,
but you can iterate over the response chunks as they become available:
>>> response = model.generate_content('Tell me a story about a magic backpack', stream=True)
>>> for chunk in response:
... print(chunk.text)
### Multi-turn
This method supports multi-turn chats but is **stateless**: the entire conversation history needs to be sent with each
request. This takes some manual management but gives you complete control:
>>> messages = [{'role':'user', 'parts': ['hello']}]
>>> response = model.generate_content(messages) # "Hello, how can I help"
>>> messages.append(response.candidates[0].content)
>>> messages.append({'role':'user', 'parts': ['How does quantum physics work?']})
>>> response = model.generate_content(messages)
For a simpler multi-turn interface see `GenerativeModel.start_chat`.
### Input type flexibility
While the underlying API strictly expects a `list[protos.Content]` objects, this method
will convert the user input into the correct type. The hierarchy of types that can be
converted is below. Any of these objects can be passed as an equivalent `dict`.
* `Iterable[protos.Content]`
* `protos.Content`
* `Iterable[protos.Part]`
* `protos.Part`
* `str`, `Image`, or `protos.Blob`
In an `Iterable[protos.Content]` each `content` is a separate message.
But note that an `Iterable[protos.Part]` is taken as the parts of a single message.
Arguments:
contents: The contents serving as the model's prompt.
generation_config: Overrides for the model's generation config.
safety_settings: Overrides for the model's safety settings.
stream: If True, yield response chunks as they are generated.
tools: `protos.Tools` more info coming soon.
request_options: Options for the request.
"""
if not contents:
raise TypeError("contents must not be empty")
request = self._prepare_request(
contents=contents,
generation_config=generation_config,
safety_settings=safety_settings,
tools=tools,
tool_config=tool_config,
)
if request.contents and not request.contents[-1].role:
request.contents[-1].role = _USER_ROLE
if self._client is None:
self._client = client.get_default_generative_client()
if request_options is None:
request_options = {}
try:
if stream:
with generation_types.rewrite_stream_error():
iterator = self._client.stream_generate_content(
request,
**request_options,
)
return generation_types.GenerateContentResponse.from_iterator(iterator)
else:
response = self._client.generate_content(
request,
**request_options,
)
return generation_types.GenerateContentResponse.from_response(response)
except google.api_core.exceptions.InvalidArgument as e:
if e.message.startswith("Request payload size exceeds the limit:"):
e.message += (
" The file size is too large. Please use the File API to upload your files instead. "
"Example: `f = genai.upload_file(path); m.generate_content(['tell me about this file:', f])`"
)
raise
async def generate_content_async(
self,
contents: content_types.ContentsType,
*,
generation_config: generation_types.GenerationConfigType | None = None,
safety_settings: safety_types.SafetySettingOptions | None = None,
stream: bool = False,
tools: content_types.FunctionLibraryType | None = None,
tool_config: content_types.ToolConfigType | None = None,
request_options: helper_types.RequestOptionsType | None = None,
) -> generation_types.AsyncGenerateContentResponse:
"""The async version of `GenerativeModel.generate_content`."""
if not contents:
raise TypeError("contents must not be empty")
request = self._prepare_request(
contents=contents,
generation_config=generation_config,
safety_settings=safety_settings,
tools=tools,
tool_config=tool_config,
)
if request.contents and not request.contents[-1].role:
request.contents[-1].role = _USER_ROLE
if self._async_client is None:
self._async_client = client.get_default_generative_async_client()
if request_options is None:
request_options = {}
try:
if stream:
with generation_types.rewrite_stream_error():
iterator = await self._async_client.stream_generate_content(
request,
**request_options,
)
return await generation_types.AsyncGenerateContentResponse.from_aiterator(iterator)
else:
response = await self._async_client.generate_content(
request,
**request_options,
)
return generation_types.AsyncGenerateContentResponse.from_response(response)
except google.api_core.exceptions.InvalidArgument as e:
if e.message.startswith("Request payload size exceeds the limit:"):
e.message += (
" The file size is too large. Please use the File API to upload your files instead. "
"Example: `f = genai.upload_file(path); m.generate_content(['tell me about this file:', f])`"
)
raise
# fmt: off
def count_tokens(
self,
contents: content_types.ContentsType = None,
*,
generation_config: generation_types.GenerationConfigType | None = None,
safety_settings: safety_types.SafetySettingOptions | None = None,
tools: content_types.FunctionLibraryType | None = None,
tool_config: content_types.ToolConfigType | None = None,
request_options: helper_types.RequestOptionsType | None = None,
) -> protos.CountTokensResponse:
if request_options is None:
request_options = {}
if self._client is None:
self._client = client.get_default_generative_client()
request = protos.CountTokensRequest(
model=self.model_name,
generate_content_request=self._prepare_request(
contents=contents,
generation_config=generation_config,
safety_settings=safety_settings,
tools=tools,
tool_config=tool_config,
))
return self._client.count_tokens(request, **request_options)
async def count_tokens_async(
self,
contents: content_types.ContentsType = None,
*,
generation_config: generation_types.GenerationConfigType | None = None,
safety_settings: safety_types.SafetySettingOptions | None = None,
tools: content_types.FunctionLibraryType | None = None,
tool_config: content_types.ToolConfigType | None = None,
request_options: helper_types.RequestOptionsType | None = None,
) -> protos.CountTokensResponse:
if request_options is None:
request_options = {}
if self._async_client is None:
self._async_client = client.get_default_generative_async_client()
request = protos.CountTokensRequest(
model=self.model_name,
generate_content_request=self._prepare_request(
contents=contents,
generation_config=generation_config,
safety_settings=safety_settings,
tools=tools,
tool_config=tool_config,
))
return await self._async_client.count_tokens(request, **request_options)
# fmt: on
def start_chat(
self,
*,
history: Iterable[content_types.StrictContentType] | None = None,
enable_automatic_function_calling: bool = False,
) -> ChatSession:
"""Returns a `genai.ChatSession` attached to this model.
>>> model = genai.GenerativeModel()
>>> chat = model.start_chat(history=[...])
>>> response = chat.send_message("Hello?")
Arguments:
history: An iterable of `protos.Content` objects, or equivalents to initialize the session.
"""
if self._generation_config.get("candidate_count", 1) > 1:
raise ValueError(
"Invalid configuration: The chat functionality does not support `candidate_count` greater than 1."
)
return ChatSession(
model=self,
history=history,
enable_automatic_function_calling=enable_automatic_function_calling,
)
class ChatSession:
"""Contains an ongoing conversation with the model.
>>> model = genai.GenerativeModel('models/gemini-1.5-flash')
>>> chat = model.start_chat()
>>> response = chat.send_message("Hello")
>>> print(response.text)
>>> response = chat.send_message("Hello again")
>>> print(response.text)
>>> response = chat.send_message(...
This `ChatSession` object collects the messages sent and received, in its
`ChatSession.history` attribute.
Arguments:
model: The model to use in the chat.
history: A chat history to initialize the object with.
"""
def __init__(
self,
model: GenerativeModel,
history: Iterable[content_types.StrictContentType] | None = None,
enable_automatic_function_calling: bool = False,
):
self.model: GenerativeModel = model
self._history: list[protos.Content] = content_types.to_contents(history)
self._last_sent: protos.Content | None = None
self._last_received: generation_types.BaseGenerateContentResponse | None = None
self.enable_automatic_function_calling = enable_automatic_function_calling
def send_message(
self,
content: content_types.ContentType,
*,
generation_config: generation_types.GenerationConfigType = None,
safety_settings: safety_types.SafetySettingOptions = None,
stream: bool = False,
tools: content_types.FunctionLibraryType | None = None,
tool_config: content_types.ToolConfigType | None = None,
request_options: helper_types.RequestOptionsType | None = None,
) -> generation_types.GenerateContentResponse:
"""Sends the conversation history with the added message and returns the model's response.
Appends the request and response to the conversation history.
>>> model = genai.GenerativeModel('models/gemini-1.5-flash')
>>> chat = model.start_chat()
>>> response = chat.send_message("Hello")
>>> print(response.text)
"Hello! How can I assist you today?"
>>> len(chat.history)
2
Call it with `stream=True` to receive response chunks as they are generated:
>>> chat = model.start_chat()
>>> response = chat.send_message("Explain quantum physics", stream=True)
>>> for chunk in response:
... print(chunk.text, end='')
Once iteration over chunks is complete, the `response` and `ChatSession` are in states identical to the
`stream=False` case. Some properties are not available until iteration is complete.
Like `GenerativeModel.generate_content` this method lets you override the model's `generation_config` and
`safety_settings`.
Arguments:
content: The message contents.
generation_config: Overrides for the model's generation config.
safety_settings: Overrides for the model's safety settings.
stream: If True, yield response chunks as they are generated.
"""
if request_options is None:
request_options = {}
if self.enable_automatic_function_calling and stream:
raise NotImplementedError(
"Unsupported configuration: The `google.generativeai` SDK currently does not support the combination of `stream=True` and `enable_automatic_function_calling=True`."
)
tools_lib = self.model._get_tools_lib(tools)
content = content_types.to_content(content)
if not content.role:
content.role = _USER_ROLE
history = self.history[:]
history.append(content)
generation_config = generation_types.to_generation_config_dict(generation_config)
if generation_config.get("candidate_count", 1) > 1:
raise ValueError(
"Invalid configuration: The chat functionality does not support `candidate_count` greater than 1."
)
response = self.model.generate_content(
contents=history,
generation_config=generation_config,
safety_settings=safety_settings,
stream=stream,
tools=tools_lib,
tool_config=tool_config,
request_options=request_options,
)
self._check_response(response=response, stream=stream)
if self.enable_automatic_function_calling and tools_lib is not None:
self.history, content, response = self._handle_afc(
response=response,
history=history,
generation_config=generation_config,
safety_settings=safety_settings,
stream=stream,
tools_lib=tools_lib,
request_options=request_options,
)
self._last_sent = content
self._last_received = response
return response
def _check_response(self, *, response, stream):
if response.prompt_feedback.block_reason:
raise generation_types.BlockedPromptException(response.prompt_feedback)
if not stream:
if response.candidates[0].finish_reason not in (
protos.Candidate.FinishReason.FINISH_REASON_UNSPECIFIED,
protos.Candidate.FinishReason.STOP,
protos.Candidate.FinishReason.MAX_TOKENS,
):
raise generation_types.StopCandidateException(response.candidates[0])
def _get_function_calls(self, response) -> list[protos.FunctionCall]:
candidates = response.candidates
if len(candidates) != 1:
raise ValueError(
f"Invalid number of candidates: Automatic function calling only works with 1 candidate, but {len(candidates)} were provided."
)
parts = candidates[0].content.parts
function_calls = [part.function_call for part in parts if part and "function_call" in part]
return function_calls
def _handle_afc(
self,
*,
response,
history,
generation_config,
safety_settings,
stream,
tools_lib,
request_options,
) -> tuple[list[protos.Content], protos.Content, generation_types.BaseGenerateContentResponse]:
while function_calls := self._get_function_calls(response):
if not all(callable(tools_lib[fc]) for fc in function_calls):
break
history.append(response.candidates[0].content)
function_response_parts: list[protos.Part] = []
for fc in function_calls:
fr = tools_lib(fc)
assert fr is not None, (
"Unexpected state: The function reference (fr) should never be None. It should only return None if the declaration "
"is not callable, which is checked earlier in the code."
)
function_response_parts.append(fr)
send = protos.Content(role=_USER_ROLE, parts=function_response_parts)
history.append(send)
response = self.model.generate_content(
contents=history,
generation_config=generation_config,
safety_settings=safety_settings,
stream=stream,
tools=tools_lib,
request_options=request_options,
)
self._check_response(response=response, stream=stream)
*history, content = history
return history, content, response
async def send_message_async(
self,
content: content_types.ContentType,
*,
generation_config: generation_types.GenerationConfigType = None,
safety_settings: safety_types.SafetySettingOptions = None,
stream: bool = False,
tools: content_types.FunctionLibraryType | None = None,
tool_config: content_types.ToolConfigType | None = None,
request_options: helper_types.RequestOptionsType | None = None,
) -> generation_types.AsyncGenerateContentResponse:
"""The async version of `ChatSession.send_message`."""
if request_options is None:
request_options = {}
if self.enable_automatic_function_calling and stream:
raise NotImplementedError(
"Unsupported configuration: The `google.generativeai` SDK currently does not support the combination of `stream=True` and `enable_automatic_function_calling=True`."
)
tools_lib = self.model._get_tools_lib(tools)
content = content_types.to_content(content)
if not content.role:
content.role = _USER_ROLE
history = self.history[:]
history.append(content)
generation_config = generation_types.to_generation_config_dict(generation_config)
if generation_config.get("candidate_count", 1) > 1:
raise ValueError(
"Invalid configuration: The chat functionality does not support `candidate_count` greater than 1."
)
response = await self.model.generate_content_async(
contents=history,
generation_config=generation_config,
safety_settings=safety_settings,
stream=stream,
tools=tools_lib,
tool_config=tool_config,
request_options=request_options,
)
self._check_response(response=response, stream=stream)
if self.enable_automatic_function_calling and tools_lib is not None:
self.history, content, response = await self._handle_afc_async(
response=response,
history=history,
generation_config=generation_config,
safety_settings=safety_settings,
stream=stream,
tools_lib=tools_lib,
request_options=request_options,
)
self._last_sent = content
self._last_received = response
return response
async def _handle_afc_async(
self,
*,
response,
history,
generation_config,
safety_settings,
stream,
tools_lib,
request_options,
) -> tuple[list[protos.Content], protos.Content, generation_types.BaseGenerateContentResponse]:
while function_calls := self._get_function_calls(response):
if not all(callable(tools_lib[fc]) for fc in function_calls):
break
history.append(response.candidates[0].content)
function_response_parts: list[protos.Part] = []
for fc in function_calls:
fr = tools_lib(fc)
assert fr is not None, (
"Unexpected state: The function reference (fr) should never be None. It should only return None if the declaration "
"is not callable, which is checked earlier in the code."
)
function_response_parts.append(fr)
send = protos.Content(role=_USER_ROLE, parts=function_response_parts)
history.append(send)
response = await self.model.generate_content_async(
contents=history,
generation_config=generation_config,
safety_settings=safety_settings,
stream=stream,
tools=tools_lib,
request_options=request_options,
)
self._check_response(response=response, stream=stream)
*history, content = history
return history, content, response
def __copy__(self):
return ChatSession(
model=self.model,
# Be sure the copy doesn't share the history.
history=list(self.history),
)
def rewind(self) -> tuple[protos.Content, protos.Content]:
"""Removes the last request/response pair from the chat history."""
if self._last_received is None:
result = self._history.pop(-2), self._history.pop()
return result
else:
result = self._last_sent, self._last_received.candidates[0].content
self._last_sent = None
self._last_received = None
return result
@property
def last(self) -> generation_types.BaseGenerateContentResponse | None:
"""returns the last received `genai.GenerateContentResponse`"""
return self._last_received
@property
def history(self) -> list[protos.Content]:
"""The chat history."""
last = self._last_received
if last is None:
return self._history
if last.candidates[0].finish_reason not in (
protos.Candidate.FinishReason.FINISH_REASON_UNSPECIFIED,
protos.Candidate.FinishReason.STOP,
protos.Candidate.FinishReason.MAX_TOKENS,
):
error = generation_types.StopCandidateException(last.candidates[0])
last._error = error
if last._error is not None:
raise generation_types.BrokenResponseError(
"Unable to build a coherent chat history due to a broken streaming response. "
"Refer to the previous exception for details. "
"To inspect the last response object, use `chat.last`. "
"To remove the last request/response `Content` objects from the chat, "
"call `last_send, last_received = chat.rewind()` and continue without it."
) from last._error
sent = self._last_sent
received = last.candidates[0].content
if not received.role:
received.role = _MODEL_ROLE
self._history.extend([sent, received])
self._last_sent = None
self._last_received = None
return self._history
@history.setter
def history(self, history):
self._history = content_types.to_contents(history)
self._last_sent = None
self._last_received = None
def __repr__(self) -> str:
_dict_repr = reprlib.Repr()
_model = str(self.model).replace("\n", "\n" + " " * 4)
def content_repr(x):
return f"protos.Content({_dict_repr.repr(type(x).to_dict(x))})"
try:
history = list(self.history)
except (generation_types.BrokenResponseError, generation_types.IncompleteIterationError):
history = list(self._history)
if self._last_sent is not None:
history.append(self._last_sent)
history = [content_repr(x) for x in history]
last_received = self._last_received
if last_received is not None:
if last_received._error is not None:
history.append("<STREAMING ERROR>")
else:
history.append("<STREAMING IN PROGRESS>")
_history = ",\n " + f"history=[{', '.join(history)}]\n)"
return (
textwrap.dedent(
f"""\
ChatSession(
model="""
)
+ _model
+ _history
)