bugbounty_gpt/handlers/submission_handler.py (91 lines of code) (raw):

from bugbounty_gpt.handlers.bugcrowd_api import BugCrowdAPI from bugbounty_gpt.env import RESPONSES import logging import json logger = logging.getLogger(__name__) class BugCrowdSubmission: def __init__(self, submission_id, classification, reasoning): """ Initializes a BugCrowdSubmission object. :param submission_id: ID of the submission. :param classification: Classification information for the submission. :param reasoning: Reasoning information for the submission. """ self.submission_id = submission_id self.classification = classification self.reasoning = reasoning def _prepare_assign_data(self, user_id): """ Prepares data to assign a user to the submission. :param user_id: ID of the user to be assigned. :return: Dictionary containing the required data. """ return { 'data': { 'type': 'submission', 'relationships': { 'assignee': { 'data': { 'id': user_id, 'type': 'identity' } } } } } def _handle_assign_response(self, response, user_id): """ Handles the response after assigning a user to the submission. :param response: Response object from the assignment operation. :param user_id: ID of the user assigned. """ if response.status_code == 200: logger.info(f"Submission {self.submission_id} assigned to user {user_id}.") else: logger.error(f"Unable to assign submission {self.submission_id} to user {user_id}. Status code: {response.status_code}") async def assign_to_user(self, user_id): """ Assigns a user to the submission. :param user_id: ID of the user to be assigned. """ data = self._prepare_assign_data(user_id) response = await BugCrowdAPI.patch_submission(self.submission_id, data) self._handle_assign_response(response, user_id) async def is_submission_new(self): """ Checks if the submission is new. :return: True if the submission is new, False otherwise. """ submission_data = await BugCrowdAPI.fetch_submission(self.submission_id) submission_state = submission_data['data']['attributes']['state'] return submission_state.lower() == 'new' async def close_submission(self): """ Closes the submission on BugCrowd. """ logger.info(f"Closing submission {self.submission_id} on BugCrowd.") data = { 'data': { 'type': 'submission', 'attributes': { 'state': 'not_applicable' } } } response = await BugCrowdAPI.patch_submission(self.submission_id, data) if response.status_code != 200: raise Exception(f"Failed to close submission {self.submission_id}. Status code: {response.status_code}, Content: {response.content}") def _prepare_comment_data(self, comment_body, visibility_scope='everyone'): """ Prepares data to create a comment. :param comment_body: Text of the comment. :param visibility_scope: Visibility scope of the comment. Default is 'everyone'. :return: Dictionary containing the required data. """ return { "data": { "type": "comment", "attributes": { "body": comment_body, "visibility_scope": visibility_scope }, "relationships": { "submission": { "data": { "id": self.submission_id, "type": "submission" } } } } } def _handle_comment_response_error(self, response): """ Handles the error response for a comment creation request. :param response: Response object from the comment creation operation. """ try: error_message = response.json()["errors"][0]["detail"] except (json.JSONDecodeError, KeyError, IndexError): error_message = "An error occurred, but the response is not a valid JSON object." logger.error("Error: " + error_message) async def create_comment(self, comment_body, visibility_scope='everyone'): """ Creates a comment for the submission on BugCrowd. :param comment_body: Text of the comment. :param visibility_scope: Visibility scope of the comment. Default is 'everyone'. """ logger.info(f"Creating comment for submission {self.submission_id} on BugCrowd.") comment_data = self._prepare_comment_data(comment_body, visibility_scope) response = await BugCrowdAPI.create_comment(comment_data) if response.status_code in [400, 404, 409]: self._handle_comment_response_error(response) elif response.status_code != 201: logger.error("An unexpected error occurred.") def generate_comment_text(self): """ Generates the text for a comment based on the classification. :return: Generated comment text or None if the classification is not found. """ try: specific_classification_name = self.classification.name specific_classification_text = RESPONSES[specific_classification_name] comment_text = f"Hello!\n\n{specific_classification_text}" return comment_text except KeyError: logger.error(f"Response for classification {self.classification.name} not found.") return None