maga_transformer/utils/complete_response_async_generator.py (32 lines of code) (raw):
from typing import AsyncGenerator, List, Callable, Any
class CompleteResponseAsyncGenerator:
    """Async-iterator wrapper that records every response it yields.

    Iterating the wrapper forwards items from the wrapped async generator
    while appending each one to an internal list.  The recorded items can
    later be replayed into a user-supplied coroutine function to build a
    single "complete" response.
    """

    def __init__(self, generator: AsyncGenerator, collect_complete_response_func: Callable):
        """Store the wrapped generator and the aggregation callback.

        Args:
            generator: Async generator whose items are forwarded and recorded.
            collect_complete_response_func: Callable that is awaited with an
                async generator replaying the recorded responses; its result
                is what ``gen_complete_response_once`` returns.
        """
        self._generator = generator
        self._collect_complete_response_func = collect_complete_response_func
        # Every response handed out by __anext__, in arrival order.
        self._all_responses: List[Any] = []

    def __aiter__(self):
        """Return ``self``; the wrapper is its own async iterator."""
        return self

    async def __anext__(self) -> Any:
        """Fetch the next response from the wrapped generator and record it.

        Raises:
            StopAsyncIteration: Propagated unchanged from the wrapped
                generator when it is exhausted (nothing is recorded then).
        """
        response = await self._generator.__anext__()
        self._all_responses.append(response)
        return response

    async def aclose(self):
        """Close the wrapped generator and return whatever its ``aclose`` returns."""
        return await self._generator.aclose()

    async def gen_complete_response_once(self) -> Any:
        """Build the complete response from the responses recorded so far.

        The callback receives a fresh async generator over the recorded
        responses, so the wrapped generator itself is not consumed here.
        The result is not cached: each call re-runs the callback.
        """
        replay = CompleteResponseAsyncGenerator.generate_from_list(self._all_responses)
        return await self._collect_complete_response_func(replay)

    @staticmethod
    async def generate_from_list(response_list) -> AsyncGenerator:
        """Yield the items of ``response_list`` in order as an async generator."""
        for response in response_list:
            yield response

    @staticmethod
    async def get_last_value(all_responses: AsyncGenerator) -> Any:
        """Drain ``all_responses`` and return its final item.

        Returns:
            The last item produced, or ``None`` when nothing was produced.
        """
        last = None
        # `async for` already handles the StopAsyncIteration that ends the
        # iteration, so no explicit handler is needed around the loop.
        async for last in all_responses:
            pass
        return last