packages/python-packages/apiview-copilot/src/_apiview_reviewer.py
def _generate_comments(self):
"""
Generate comments for the API view by submitting jobs in parallel.
"""
summary_tag = "summary"
guideline_tag = "guideline"
generic_tag = "generic"
outline_tag = "outline"
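# Split the API view into sections so each one can be reviewed independently.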
sectioned_doc = self._create_sectioned_document()
sections_to_process = list(enumerate(sectioned_doc))
# Select appropriate prompts based on mode
if self.mode == ApiViewReviewMode.FULL:
guideline_prompt_file = "guidelines_review.prompty"
generic_prompt_file = "generic_review.prompty"
summary_prompt_file = "summarize_api.prompty"
summary_content = self.target
elif self.mode == ApiViewReviewMode.DIFF:
guideline_prompt_file = "guidelines_diff_review.prompty"
generic_prompt_file = "generic_diff_review.prompty"
summary_prompt_file = "summarize_diff.prompty"
summary_content = create_diff_with_line_numbers(old=self.base, new=self.target)
else:
raise NotImplementedError(f"Review mode {self.mode} is not implemented.")
# Outline prompt is always based on self.target
outline_prompt_file = "generate_outline.prompty"
outline_content = self.target
# Set up progress tracking
print("Processing sections: ", end="", flush=True)
total_prompts = 1 + 1 + (len(sections_to_process) * 2)  # summary + outline + 2 prompts per section
prompt_status = [self.PENDING] * total_prompts
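# prompt_status layout: index 0 = summary, index 1 = outline, then two slots per section (guideline, generic).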
# Set up keyboard interrupt handler for more responsive cancellation
cancel_event = threading.Event()
original_handler = signal.getsignal(signal.SIGINT)
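# The handler exits the process immediately; the original SIGINT handler is restored in the finally block below.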
def keyboard_interrupt_handler(signum, frame):
print("\n\nCancellation requested! Terminating process...")
cancel_event.set()
os._exit(1)
signal.signal(signal.SIGINT, keyboard_interrupt_handler)
# Submit all jobs to the executor
all_futures = {}
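# Futures are keyed by task name (e.g. "guideline_<section_idx>") so results can be mapped back to their sections.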
# 1. Summary task
all_futures[summary_tag] = self.executor.submit(
self._execute_prompt_task,
prompt_path=os.path.join(_PROMPTS_FOLDER, summary_prompt_file),
inputs={
"language": self._get_language_pretty_name(),
"content": summary_content,
},
task_name=summary_tag,
status_idx=0,
status_array=prompt_status,
)
# 2. Outline task (always based on self.target)
all_futures[outline_tag] = self.executor.submit(
self._execute_prompt_task,
prompt_path=os.path.join(_PROMPTS_FOLDER, outline_prompt_file),
inputs={
"content": outline_content,
},
task_name=outline_tag,
status_idx=1,
status_array=prompt_status,
)
# 3. Guideline and generic tasks for each section
# Generic metadata is the same for every section, so load it once.
generic_metadata = self._load_generic_metadata()
for idx, (section_idx, section) in enumerate(sections_to_process):
# First check if cancellation is requested
if cancel_event.is_set():
break
# Prepare context for guideline tasks
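# RAG retrieval narrows the guidelines to those relevant to this section; the static set is the fallback.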
if self.use_rag:
context = self._retrieve_and_resolve_guidelines(str(section))
if context:
context_string = context.to_markdown()
else:
logger.warning(
f"Failed to retrieve guidelines for section {section_idx}, using static guidelines instead."
)
self.semantic_search_failed = True
context_string = json.dumps(self.search.static_guidelines)
else:
context_string = json.dumps(self.search.static_guidelines)
# Guideline prompt
guideline_key = f"{guideline_tag}_{section_idx}"
all_futures[guideline_key] = self.executor.submit(
self._execute_prompt_task,
prompt_path=os.path.join(_PROMPTS_FOLDER, guideline_prompt_file),
inputs={
"language": self._get_language_pretty_name(),
"context": context_string,
"content": section.numbered(),
},
task_name=guideline_key,
status_idx=(idx * 2) + 2,
status_array=prompt_status,
)
# Generic prompt
generic_key = f"{generic_tag}_{section_idx}"
all_futures[generic_key] = self.executor.submit(
self._execute_prompt_task,
prompt_path=os.path.join(_PROMPTS_FOLDER, generic_prompt_file),
inputs={
"language": self._get_language_pretty_name(),
"custom_rules": generic_metadata["custom_rules"],
"content": section.numbered(),
},
task_name=generic_key,
status_idx=(idx * 2) + 3,
status_array=prompt_status,
)
# Collect results, blocking on each future in submission order
try:
# Process summary result
summary_response = all_futures[summary_tag].result()
if summary_response:
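# Pin the overall summary as a general comment on line 1 of the review.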
self.summary = Comment(
rule_ids=[],
line_no=1,
bad_code="",
suggestion=None,
comment=summary_response,
source="summary",
)
# Process outline result
outline_response = all_futures[outline_tag].result()
if outline_response:
self.outline = outline_response
# Process each section's results
section_results = {}
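# Aggregate comments per section index before merging them into the overall results.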
for key, future in all_futures.items():
if key in {summary_tag, outline_tag}:
continue # Already processed
try:
result = future.result()
if result:
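# Keys have the form "<prompt type>_<section index>", e.g. "guideline_0".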
section_type, section_idx = key.rsplit("_", 1)
section_idx = int(section_idx)
# Initialize section result if needed
if section_idx not in section_results:
section_results[section_idx] = {"comments": []}
# Add comments from this prompt
if "comments" in result:
# Tag comments with their source
for comment in result["comments"]:
comment["source"] = section_type
section_results[section_idx]["comments"].extend(result["comments"])
except Exception as e:
logger.error(f"Error processing {key}: {str(e)}")
print() # Add newline after progress indicator
# Merge results from all sections
for section_idx, section_result in section_results.items():
if section_result and section_result["comments"]:
section = sections_to_process[section_idx][1]
review_result = ReviewResult(**section_result)
self.results.merge(review_result, section=section)
except KeyboardInterrupt:
print("\n\nCancellation requested! Terminating process...")
cancel_event.set()
os._exit(1)
finally:
# Restore original signal handler
signal.signal(signal.SIGINT, original_handler)