projects/dynasor/naive_client.py (105 lines of code) (raw):

""" Dynasor naive client implementation for testing and development purposes. This module provides a basic implementation of the Dynasor client that can be used for testing and development. It demonstrates the core functionality of the Dynasor system using async/await patterns. """ import asyncio import logging from typing import Optional, Dict, Any from openai import AsyncOpenAI # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Constants DEFAULT_SYSTEM_PROMPT = "You are a helpful assistant." DEFAULT_PROBE_PROMPT = ( "... Oh, I suddenly got the answer to the whole problem, **Final Answer**\n\n\\[ \\boxed{" ) DEFAULT_PROBE_INTERVAL = 32 DEFAULT_MAX_TOKENS = 1024 def format_prompt(prompt: str, generated: str) -> str: """ Format the prompt for probing the model. Args: prompt: The original user prompt generated: The text generated so far Returns: Formatted prompt string """ text = f"<|begin▁of▁sentence|>{DEFAULT_SYSTEM_PROMPT}<|User|>{prompt}<|Assistant|><think>\n{generated} {DEFAULT_PROBE_PROMPT}" return text async def execute_single_probe( client: AsyncOpenAI, model_id: str, prompt: str, generated: str, probe_in_progress_event: asyncio.Event, max_tokens: int = 32, ) -> str: """ Execute a single probe to check model's progress. Args: client: AsyncOpenAI client instance model_id: Model identifier prompt: Original user prompt generated: Text generated so far probe_in_progress_event: Event to track probe status max_tokens: Maximum tokens for probe response Returns: Probe response text """ try: text = format_prompt(prompt, generated) probe_response = await client.completions.create( model=model_id, prompt=text, max_tokens=max_tokens, temperature=0.6, top_p=0.95, ) return probe_response.choices[0].text if probe_response.choices and probe_response.choices[0].text else "" except Exception as e: logger.error("Error during probe execution: %s", e) return "" finally: probe_in_progress_event.clear() async def main() -> None: """Main execution function.""" try: client = AsyncOpenAI( api_key="EMPTY", base_url="http://localhost:8080/v1", max_retries=1 ) # Get available models models = await client.models.list() model_id = models.data[0].id logger.info("Using model: %s", model_id) prompt = "Solve the equation: x^2 + 1 = 0" user_messages = [ {"role": "system", "content": DEFAULT_SYSTEM_PROMPT}, {"role": "user", "content": prompt} ] # Initialize streaming response response_stream = await client.chat.completions.create( messages=user_messages, model=model_id, max_tokens=DEFAULT_MAX_TOKENS, stream=True, extra_body=dict( dynasor=dict( probe_interval=DEFAULT_PROBE_INTERVAL, ) ) ) # Initialize probe tracking probe_task: Optional[asyncio.Task] = None probe_in_progress_event = asyncio.Event() probe_in_progress_event.clear() states = [] should_launch_next_probe = False generated_text = "" chunks_processed = 0 # Process response stream async for chunk in response_stream: if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content is not None: text = chunk.choices[0].delta.content generated_text += text print(text, end="", flush=True) chunks_processed += 1 if chunks_processed > 0 and chunks_processed % DEFAULT_PROBE_INTERVAL == 0: should_launch_next_probe = True if probe_task is not None and probe_task.done(): states.append(probe_task.result()) probe_task = None if should_launch_next_probe and not probe_in_progress_event.is_set(): should_launch_next_probe = False probe_in_progress_event.set() probe_task = asyncio.create_task( execute_single_probe( client, model_id, prompt, generated_text, probe_in_progress_event, max_tokens=32, ) ) await response_stream.close() logger.info("Response stream completed successfully") except Exception as e: logger.error("Error in main execution: %s", e) raise if __name__ == "__main__": asyncio.run(main())