content_api.py (439 lines of code) (raw):

# Copyright 2025 DeepMind Technologies Limited. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """Syntax sugar for working with Processor `Content` and `Part` wrappers.""" from collections.abc import Callable, Iterable, Iterator import dataclasses import functools import io import json from typing import Any, TypeVar from absl import logging from genai_processors import mime_types from google.genai import types as genai_types import PIL.Image class ProcessorPart: """A wrapper around `Part` with additional metadata. Represents a single piece of content that can be processed by an agentic system. Includes metadata such as the producer of the content, the substream the part belongs to, the MIME type of the content, and arbitrary metadata. """ def __init__( self, value: 'ProcessorPartTypes', *, role: str = '', substream_name: str = '', mimetype: str | None = None, metadata: dict[str, Any] | None = None, ) -> None: """Constructs a ProcessorPart using a `Part` or `ProcessorPart`. Args: value: The content to use to construct the ProcessorPart. role: Optional. The producer of the content. In Genai models, must be either 'user' or 'model', but the user can set their own semantics. Useful to set for multi-turn conversations, otherwise can be empty. substream_name: (Optional) ProcessorPart stream can be split into multiple independent streams. They may have specific semantics, e.g. a song and its lyrics, or can be just alternative responses. Prefer using a default substream with an empty name. If the `ProcessorPart` is created using another `ProcessorPart`, this ProcessorPart inherits the existing substream_name, unless it is overridden in this argument. mimetype: Mime type of the data. metadata: (Optional) Auxiliary information about the part. If the `ProcessorPart` is created using another `ProcessorPart`, this ProcessorPart inherits the existing metadata, unless it is overridden in this argument. """ super().__init__() match value: case genai_types.Part(): self._part = value case ProcessorPart(): self._part = value.part role = role or value.role substream_name = substream_name or value.substream_name mimetype = mimetype or value.mimetype metadata = metadata or value.metadata case str(): self._part = genai_types.Part(text=value) case bytes(): if not mimetype: raise ValueError( 'MIME type must be specified when constructing a ProcessorPart' ' from bytes.' ) self._part = genai_types.Part.from_bytes(data=value, mime_type=mimetype) case PIL.Image.Image(): if mimetype: # If the mimetype is explicitly specified, ensure it is an image. if not mimetype.startswith('image/'): raise ValueError(f"Can't convert image of mimetype {mimetype}.") suffix = mimetype[len('image/') :] # Ensure it matches the Image format. if value.format: if suffix != value.format.lower(): raise ValueError( f'The image format {value.format} and does not match the' f' mimetype {suffix}.' ) else: # If no mimetype is specified, get it from the Image object. # If no format is provided, default to webp. suffix = value.format.lower() if value.format else 'webp' mimetype = f'image/{suffix}' bytes_io = io.BytesIO() value.save(bytes_io, suffix.upper()) self._part = genai_types.Part.from_bytes( data=bytes_io.getvalue(), mime_type=mimetype ) case _: raise ValueError(f"Can't construct ProcessorPart from {type(value)}.") self._role = role self._substream_name = substream_name self._metadata = metadata or {} # Set the MIME type. if mimetype: self._mimetype = mimetype # Otherwise, if MIME type is specified using inline data, use that. elif self._part.inline_data and self._part.inline_data.mime_type: self._mimetype = self._part.inline_data.mime_type # Otherwise, if text is not empty, assume 'text/plain' MIME type. elif self._part.text: self._mimetype = 'text/plain' else: self._mimetype = '' def __repr__(self) -> str: optional_args = '' if self.substream_name: optional_args += f', substream_name={self.substream_name!r}' if self.metadata: optional_args += f', metadata={self.metadata}'.rstrip('\n') if self.role: optional_args += f', role={self.role!r}' return ( f'ProcessorPart({self.part.to_json_dict()!r},' f' mimetype={self.mimetype!r}{optional_args})' ) def __eq__(self, other: 'ProcessorPart') -> bool: return ( self._part == other._part and self._role.lower() == other._role.lower() and self._substream_name.lower() == other._substream_name.lower() and self._metadata == other._metadata ) @property def part(self) -> genai_types.Part: """Returns the underlying Genai Part.""" return self._part @property def role(self) -> str: """Optional. The producer of the content. Useful to set for multi-turn conversations, otherwise can be left blank or unset. Default value is an empty string. It is up to the user to set their own semantics for the role. """ return self._role @role.setter def role(self, value: str) -> None: self._role = value @property def bytes(self) -> bytes | None: """Returns part contents as bytes. Returns: Text encoded into bytes or bytes from inline data if the underlying part is a Blob. """ if self.part.text: return self.text.encode() if isinstance(self.part.inline_data, genai_types.Blob): return self.part.inline_data.data return None @property def substream_name(self) -> str: """Returns the stream this part belongs to. Empty for the default stream. """ return self._substream_name @substream_name.setter def substream_name(self, value: str) -> None: self._substream_name = value @property def mimetype(self) -> str: """Returns part MIME type. Note: Empty MIME in the underlying `Part` is assumed to be text. """ return self._mimetype or 'text/plain' @property def text(self) -> str: """Returns part text as string. Returns: The text of the part. Raises: ValueError if part has no text. """ if not mime_types.is_text(self.mimetype) and not mime_types.is_json( self.mimetype ): raise ValueError('Part is not text.') return self.part.text or '' @text.setter def text(self, value: str) -> None: """Sets part to a text part.""" self._part = genai_types.Part(text=value) @property def metadata(self) -> dict[str, Any]: """Returns metadata.""" return self._metadata @metadata.setter def metadata(self, value: dict[str, Any]) -> None: """Sets metadata.""" self._metadata = value def get_metadata(self, key: str, default=None) -> Any: """Returns metadata for a given key.""" return self._metadata.get(key, default) @property def function_call(self) -> genai_types.FunctionCall | None: """Returns function call.""" return self.part.function_call @property def tool_cancellation(self) -> str | None: """Returns an id of a function call to be cancelled. If the part is not a tool cancellation request, returns None. Returns: The id of the function call to be cancelled or None if this part is not a tool cancellation from the model. """ if not self.part.function_response: return None if self.part.function_response.name != 'tool_cancellation': return None if not self.part.function_response.response: return None return self.part.function_response.response.get('function_call_id', None) T = TypeVar('T') def get_dataclass(self, json_dataclass: type[T]) -> T: """Returns representation of the Part as a given dataclass. Args: json_dataclass: A dataclass that can be converted to/from JSON. Returns: The dataclass representation of the Part. """ if not mime_types.is_dataclass(self.mimetype): raise ValueError('Part is not a dataclass.') try: # JSON conversions are provided by the dataclass_json decorator. return json_dataclass.from_json(self.text) # pytype: disable=attribute-error except AttributeError as e: raise ValueError( f'{json_dataclass.__name__} is not a valid json dataclass' ) from e @property def pil_image(self) -> PIL.Image.Image: """Returns PIL.Image representation of the Part.""" if not mime_types.is_image(self.mimetype): raise ValueError(f'Part is not an image. Mime type is {self.mimetype}.') bytes_io = io.BytesIO() if self.part.inline_data is not None: bytes_io.write(self.part.inline_data.data) bytes_io.seek(0) return PIL.Image.open(bytes_io) # Class methods that make use of underlying Genai `Part` class methods. @classmethod def from_uri( cls, *, file_uri: str, mimetype: str, **kwargs ) -> 'ProcessorPart': """Constructs a ProcessorPart from URI & mimetype.""" part = genai_types.Part.from_uri(file_uri=file_uri, mime_type=mimetype) return cls(part, **kwargs) @classmethod def from_function_call( cls, *, name: str, args: dict[str, Any], **kwargs ) -> 'ProcessorPart': """Constructs a ProcessorPart from bytes & mimetype.""" part = genai_types.Part.from_function_call(name=name, args=args) return cls(part, **kwargs) @classmethod def from_function_response( cls, *, name: str, response: dict[str, Any], function_call_id: str | None = None, will_continue: bool = False, scheduling: genai_types.FunctionResponseScheduling | None = None, **kwargs, ) -> 'ProcessorPart': """Constructs a ProcessorPart as a function response.""" part = genai_types.Part( function_response=genai_types.FunctionResponse( id=function_call_id, name=name, response=response, will_continue=will_continue, scheduling=scheduling, ) ) return cls(part, **kwargs) @classmethod def from_executable_code( cls, *, code: str, language: genai_types.Language, **kwargs ) -> 'ProcessorPart': """Constructs a ProcessorPart as an executable code part.""" part = genai_types.Part.from_executable_code(code=code, language=language) return cls(part, **kwargs) @classmethod def from_code_execution_result( cls, *, outcome: genai_types.Outcome, output: str, **kwargs ) -> 'ProcessorPart': """Constructs a ProcessorPart as a code execution result part.""" part = genai_types.Part.from_code_execution_result( outcome=outcome, output=output ) return cls(part, **kwargs) @classmethod def from_tool_cancellation( cls, *, function_call_id: str, **kwargs ) -> 'ProcessorPart': """Constructs a ProcessorPart from a tool cancellation id. The role is overridden to MODEL. Args: function_call_id: The id of the function call to be cancelled. **kwargs: Additional arguments for the ProcessorPart constructor. Returns: A ProcessorPart of type tool cancellation. """ part = genai_types.Part.from_function_response( name='tool_cancellation', response={'function_call_id': function_call_id}, ) if 'role' in kwargs and kwargs['role'].upper() != 'MODEL': logging.warning( 'Role {kwargs["role"]} is not supported for tool cancellation.' ' Overriding it with the model role.' ) extra_args = kwargs extra_args['role'] = 'MODEL' return cls(part, **extra_args) @classmethod def from_dataclass(cls, *, dataclass: Any, **kwargs) -> 'ProcessorPart': """Constructs a ProcessorPart from a dataclass.""" part = ProcessorPart( json.dumps(dataclasses.asdict(dataclass)), mimetype=f'application/json; type={type(dataclass).__name__}', ) return cls(part, **kwargs) @classmethod def from_dict(cls, *, data: dict[str, Any]) -> 'ProcessorPart': """Deserializes a ProcessorPart from a JSON-compatible dictionary. This method reconstructs a ProcessorPart instance from a dictionary that was typically generated by the `to_dict()` method of another ProcessorPart instance. Args: data: A JSON-compatible dictionary containing the serialized data for the ProcessorPart. It is expected to have the following keys: * 'part' (dict): A dictionary representing the underlying `google.genai.types.Part` object. * 'role' (str): The role of the part (e.g., 'user', 'model'). * 'substream_name' (str): The substream name. * 'mimetype' (str): The MIME type of the part. * 'metadata' (dict[str, Any]): Auxiliary metadata. Returns: A new ProcessorPart instance. Raises: pydantic.ValidationError: If the `part` field in `data` is not a valid dictionary representation of a GenAI part. KeyError: If 'part' is missing from `data`. Example: ```py text_part = ProcessorPart("Hello", role="user") part_as_dict = text_part.to_dict() reconstructed = ProcessorPart.from_dict(data=part_as_dict) print(reconstructed) ``` """ return cls( genai_types.Part.model_validate(data['part']), role=data.get('role', ''), substream_name=data.get('substream_name', ''), mimetype=data.get('mimetype'), metadata=data.get('metadata'), ) def to_dict(self) -> dict[str, Any]: """Serializes this ProcessorPart to a JSON-compatible dictionary. The resulting dictionary can be used with `ProcessorPart.from_dict()` to reconstruct an equivalent ProcessorPart instance. Returns: A dictionary representing the ProcessorPart. It is expected to have the following keys: * 'part' (dict): A dictionary representing the underlying `google.genai.types.Part` object. * 'role' (str): The role of the part (e.g., 'user', 'model'). * 'substream_name' (str): The substream name. * 'mimetype' (str): The MIME type of the part. * 'metadata' (dict[str, Any]): Auxiliary metadata. Example: ```py text_part = ProcessorPart("Hello", role="user") part_as_dict = text_part.to_dict() print(part_as_dict) ``` """ return { 'part': self.part.model_dump(mode='json', exclude_none=True), 'role': self.role, 'substream_name': self.substream_name, 'mimetype': self.mimetype, 'metadata': self.metadata, } class ProcessorContent: """A wrapper around `Content` with additional metadata. Serves as a convenience adaptor between various native representations and underlying data structures. ProcessorContent can be created from a string, image, ..., or a sequence of these. Users can narrow it down to a more convenient format using content_api.as_text or content_api.as_markdown. Or they can iterate over parts using .items() method. """ _all_parts: list[ProcessorPart] def __init__( self, *parts: 'ProcessorContentTypes', ) -> None: """Constructs a new Content object from the given inputs.""" self.replace_parts(*parts) self.as_text = functools.partial(as_text, self) self.as_text_with_reasoning = functools.partial( as_text_with_reasoning, self ) self.as_images = functools.partial(as_images, self) def __iadd__(self, other: 'ProcessorContentTypes') -> 'ProcessorContent': """Appends other to the content.""" if isinstance(other, ProcessorContent): self += other.all_parts elif isinstance(other, genai_types.Content): if other.parts: if other.role: parts = [ProcessorPart(part, role=other.role) for part in other.parts] else: parts = other.parts self += parts elif isinstance(other, ProcessorPartTypes): part = ProcessorPart(other) self._all_parts.append(part) elif isinstance(other, Iterable): for part in other: self += part else: raise ValueError(f"Can't append {type(other)} to ProcessorContent.") return self def __add__(self, other: 'ProcessorContentTypes') -> 'ProcessorContent': """Returns concatenation of two contents.""" result = ProcessorContent() result += self result += other return result def __eq__(self, other: 'ProcessorContent') -> bool: try: for lhs, rhs in zip(self, other, strict=True): if lhs != rhs: return False return True except AttributeError: return False except ValueError: return False def items(self) -> Iterator[tuple[str, ProcessorPart]]: """Yields tuples of mime_type and part. It is allowed to modify parts inplace except changing their IDs. Though bear in mind that like with most of Python containers that would change the part in all ProcessorContent containers which hold it. Yields: Tuples of mime_type, part. """ for p in self.all_parts: yield p.mimetype, p def __iter__(self) -> Iterator[ProcessorPart]: """Yields each of the parts from this ProcessorContent. It is allowed to modify parts inplace. Bear in mind that like with most of Python containers that would change the part in all ProcessorContent containers which hold it. """ for _, part in self.items(): yield part @property def all_parts(self) -> Iterator[ProcessorPart]: """Yields all ProcessorParts from this ProcessorContent.""" yield from self._all_parts def replace_parts(self, *parts: 'ProcessorContentTypes') -> None: """Replaces this ProcessorContent's parts.""" self._all_parts: list[ProcessorPart] = [] for part in parts: self += part def __repr__(self) -> str: parts = ', '.join(repr(part) for part in self.all_parts) return f'ProcessorContent({parts})' def __len__(self) -> int: """Returns the number of parts in this ProcessorContent.""" return sum(1 for _ in self) END_OF_TURN = ProcessorPart('', role='user', metadata={'end_of_turn': True}) def is_end_of_turn(part: ProcessorPart) -> bool: """Returns the end of turn event if the part is an end of turn event.""" if part.role == 'user' and part.get_metadata('end_of_turn'): return True return False # Types that can be converted to a ProcessorPart. ProcessorPartTypes = ( genai_types.Part | ProcessorPart | str | bytes | PIL.Image.Image ) # Types that can be appended to ProcessorContent. ProcessorContentTypes = ( ProcessorContent | ProcessorPartTypes | Iterable[ProcessorContent] | Iterable[ProcessorPartTypes] | genai_types.Content | Iterable[genai_types.Content] ) # Helper functions for building content. # Helper functions for mime type dispatching. is_text = mime_types.is_text is_json = mime_types.is_json is_image = mime_types.is_image is_video = mime_types.is_video is_audio = mime_types.is_audio is_streaming_audio = mime_types.is_streaming_audio is_wav = mime_types.is_wav is_source_code = mime_types.is_source_code is_pdf = mime_types.is_pdf is_csv = mime_types.is_csv is_python = mime_types.is_python is_dataclass = mime_types.is_dataclass def mime_type(part: ProcessorPart) -> str: """Returns the mimetype of the part.""" return part.mimetype def get_substream_name( part: ProcessorPart, ) -> str: """Returns the substream name of the part.""" return part.substream_name def group_by_mimetype(content: ProcessorContent) -> dict[str, ProcessorContent]: """Groups content by mimetype. The order of parts within each mimetype grouping is preserved, maintaining the same order as they appeared in the original input `content`. Args: content: The content to group. Returns: A dictionary mapping mimetypes to ProcessorContent objects, with the same order as in the original input `content`. """ grouped_content = {} for mimetype, part in content.items(): if mimetype not in grouped_content: grouped_content[mimetype] = ProcessorContent() grouped_content[mimetype] += part return grouped_content # Functions that reduce ProcessorContent to well known formats. def as_text( content: ProcessorContentTypes, *, strict: bool = False, substream_name: str | None = None, ) -> str: """Returns a text representation of the content. The returned text is a concatenation of all text parts in the content. Args: content: The content to process. This can be of various types as defined by `ProcessorContentTypes`. strict: If True, unsupported content types will raise a ValueError. Otherwise, they will be ignored. substream_name: If set, only text parts with the given substream name will be returned. """ text_parts = [] for mime, part in ProcessorContent(content).items(): if substream_name is not None and part.substream_name != substream_name: continue if is_text(mime): text_parts.append(part.text) elif strict: raise ValueError(f'Unsupported content type {mime}.') return ''.join(text_parts) def as_text_with_reasoning( content: ProcessorContentTypes, *, strict: bool = False, ) -> tuple[str, str]: """Returns a tuple of the final and reasoning text representing content. The returned tuple contains two elements: - The first element (index 0) is a string representing the main text extracted from the input `content`. - The second element (index 1) is a string representing the reasoning or thoughts associated with the input `content`. Args: content: The content to process. This can be of various types as defined by `ProcessorContentTypes`. strict: If True, unsupported content types will raise a ValueError. Otherwise, they will be ignored. Returns: A tuple containing two strings: (text, reasoning). """ text_parts = [] thought_parts = [] for mime, p in ProcessorContent(content).items(): if is_text(mime): if p.part.thought: thought_parts.append(p.text) else: text_parts.append(p.text) elif strict: raise ValueError(f'Unsupported content type {mime}.') return ''.join(text_parts), ''.join(thought_parts) def _as_format_helper( content: ProcessorContentTypes, mime_check: Callable[[str], bool], ignore_unsupported_types: bool, ) -> list[ProcessorPart]: """Helper function to extract parts from the content based on MIME type.""" if isinstance(content, ProcessorPart): # Fast path for singular parts. content = [content] elif not isinstance(content, ProcessorContent): content = ProcessorContent(content) parts = [] for p in content: if mime_check(p.mimetype): parts.append(p) elif not ignore_unsupported_types: raise ValueError(f'Unsupported MIME type: {p.mimetype}.') return parts def as_images( content: ProcessorContentTypes, *, ignore_unsupported_types: bool = False ) -> list[ProcessorPart]: """Returns the image parts from the content. Args: content: Input content. ignore_unsupported_types: By default if content contains non-image parts a ValueError would be risen. This argument allows to ignore such parts. Returns: A list of image parts, with the same order as in the input content. """ return _as_format_helper( content, mime_types.is_image, ignore_unsupported_types ) def as_videos( content: ProcessorContentTypes, *, ignore_unsupported_types: bool = False ) -> list[ProcessorPart]: """Returns the video parts from the content. Args: content: Input content. ignore_unsupported_types: By default if content contains non-video parts a ValueError would be raised. This argument allows ingoring such parts. Returns: A list of video parts. """ return _as_format_helper( content, lambda mime: mime.startswith('video/'), ignore_unsupported_types ) def to_genai_part( part_content: ProcessorPartTypes, mimetype: str | None = None, ) -> genai_types.Part: """Converts object of type `ProcessorPartTypes` to a Genai Part. Args: part_content: The content to convert. mimetype: (Optional) The mimetype of the content. Must be specified if part_content is bytes. Returns: The Genai Part representation of the content. """ if isinstance(part_content, str): return genai_types.Part(text=part_content) elif isinstance(part_content, bytes): if mimetype is None: raise ValueError( 'Mimetype must be specified for bytes to_genai_part conversion.' ) p = ProcessorPart(part_content, mimetype=mimetype) return p.part elif isinstance(part_content, PIL.Image.Image): p = ProcessorPart(part_content) return p.part elif isinstance(part_content, ProcessorPart): return part_content.part elif isinstance(part_content, genai_types.Part): return part_content else: raise ValueError( f'Unsupported type for to_genai_part: {type(part_content)}' )