bugbounty_gpt/handlers/bugcrowd_api.py (78 lines of code) (raw):
import json
import httpx
import logging
import time
from bugbounty_gpt.env import API_BASE_URL, BUGCROWD_API_KEY
logger = logging.getLogger(__name__)
class BugCrowdAPI:
@staticmethod
def _get_headers(content_type='application/vnd.bugcrowd+json'):
"""
Returns common headers for Bugcrowd API requests.
:param content_type: Content type for the Accept header. Default is 'application/vnd.bugcrowd+json'.
:return: Dictionary containing the required headers.
"""
return {
'Accept': content_type,
'Authorization': f'Token {BUGCROWD_API_KEY}'
}
@staticmethod
async def _fetch_page(url, params, page_limit, page_offset):
"""
Fetches a page of data from the specified URL with pagination.
:param url: URL to fetch data from.
:param params: Parameters to include in the request.
:param page_limit: Limit of items per page.
:param page_offset: Offset for pagination.
:return: List of data fetched from the page or an empty list if there is an error.
"""
pagination_params = {
'page[limit]': page_limit,
'page[offset]': page_offset,
}
complete_params = {**params, **pagination_params}
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=BugCrowdAPI._get_headers(), params=complete_params)
try:
data = response.json()
except json.JSONDecodeError as e:
logger.error(f"Error: Unable to decode JSON. {e}")
return []
return data['data'] if data['data'] else []
@staticmethod
async def fetch_submissions(params):
"""
Fetches all submissions from BugCrowd.
:param params: Parameters to include in the request.
:return: List of all submissions or None if no submissions found.
"""
logger.info("Fetching submissions from BugCrowd.")
url = f'{API_BASE_URL}/submissions'
page_limit = 100
page_offset = 0
all_submissions = []
delay = 2 # Delay in seconds
while True:
submissions = await BugCrowdAPI._fetch_page(url, params, page_limit, page_offset)
if not submissions:
break
all_submissions.extend(submissions)
page_offset += page_limit
time.sleep(delay) # Add a delay between API calls
return all_submissions if all_submissions else None
@staticmethod
async def fetch_submission(submission_id):
"""
Fetches a specific submission from BugCrowd.
:param submission_id: ID of the submission to fetch.
:return: Submission data as a dictionary or None if an error occurred.
"""
logger.info(f"Fetching submission {submission_id} from BugCrowd.")
url = f'{API_BASE_URL}/submissions/{submission_id}'
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=BugCrowdAPI._get_headers())
if response.status_code == 200:
return response.json()
else:
logger.error(f"Failed to fetch submission {submission_id}. Status code: {response.status_code}")
return None
@staticmethod
async def create_comment(comment_data):
"""
Creates a comment using the provided data.
:param comment_data: Data for the comment.
:return: Response object from the comment creation operation.
"""
url = f'{API_BASE_URL}/comments'
headers = BugCrowdAPI._get_headers('application/json')
async with httpx.AsyncClient() as client:
response = await client.post(url, headers=headers, json=comment_data)
if response.status_code == 201:
logger.info("Comment created successfully.")
else:
logger.error(f"Failed to create comment. Status code: {response.status_code}")
return response
@staticmethod
async def patch_submission(submission_id, data):
"""
Patches a specific submission on BugCrowd.
:param submission_id: ID of the submission to patch.
:param data: Data to be patched.
:return: Response object from the patch operation or None if an error occurred.
"""
logger.info(f"Patching submission {submission_id} on BugCrowd.")
url = f'{API_BASE_URL}/submissions/{submission_id}'
headers = BugCrowdAPI._get_headers()
headers['Content-Type'] = 'application/vnd.bugcrowd.v4+json'
async with httpx.AsyncClient() as client:
response = await client.patch(url, headers=headers, data=json.dumps(data))
if response.status_code != 200:
logger.error(f"Failed to patch submission {submission_id}. Status code: {response.status_code}")
return None
return response