components/webscraper/scrape_url.py
#!/usr/bin/env python3
import argparse
import json
import os
import subprocess
import sys
import uuid

try:
    from common.models.batch_job import BatchJobModel, JobStatus
except ImportError:
    print("Error: Unable to import common package.")
    print("Usage: PYTHONPATH=../common/src python scrape_url.py <your url>")
    print("\nMake sure you've set your PYTHONPATH to include the common package location.")
    print("\nAlso make sure to run in a virtual environment with fireo and the other packages from common/requirements.txt installed.")
    sys.exit(1)


def get_gcp_project():
    """Get GCP project from gcloud config"""
    result = subprocess.run(
        ['gcloud', 'config', 'get', 'project'],
        capture_output=True,
        text=True
    )
    return result.stdout.strip()


def create_batch_job(url: str, depth_limit: str, engine_name: str) -> BatchJobModel:
    """Create a new batch job for the webscraper"""
    job_input = {
        "url": url,
        "depth_limit": str(depth_limit),
        "query_engine_name": engine_name
    }
    job = BatchJobModel()
    job.id = str(uuid.uuid4())
    job.uuid = job.id
    job.name = f"webscraper-{job.id[:8]}"
    job.type = "webscraper"
    job.status = JobStatus.JOB_STATUS_PENDING.value
    job.input_data = json.dumps(job_input)
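    # Persist the job document so the webscraper can look it up by ID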
    job.save()
    return job


def run_webscraper(job_id: str, project_id: str):
    """Run the webscraper with the given job ID"""
    env = os.environ.copy()
    env["GCP_PROJECT"] = project_id
    env["JOB_ID"] = job_id
    result = subprocess.run(
        ['go', 'run', 'main.go'],
        env=env,
        capture_output=True,
        text=True
    )
    if result.returncode != 0:
        print(f"Error running webscraper: {result.stderr}")
        return False
    return True


def main():
    parser = argparse.ArgumentParser(
        description='Scrape a URL using the webscraper',
        epilog='Example usage: PYTHONPATH=../common/src python scrape_url.py <your url>'
    )
    parser.add_argument('url', help='URL to scrape')
    parser.add_argument('--depth', type=str, default="1", help='Depth limit for scraping')
    parser.add_argument('--engine', type=str, default="default", help='Query engine name')
    args = parser.parse_args()

    # Get GCP project
    project_id = get_gcp_project()
    if not project_id:
        print("Error: Could not determine GCP project")
        return

    # Create batch job
    job = create_batch_job(args.url, args.depth, args.engine)
    print(f"Created job with ID: {job.id}")

    # Run webscraper
    success = run_webscraper(job.id, project_id)
    if not success:
        print("Webscraper failed")
        return

    # Fetch updated job to get results
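    # (find_by_uuid matches the uuid field, which was set equal to job.id at creation)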
    job = BatchJobModel.find_by_uuid(job.id)
    if job.status != JobStatus.JOB_STATUS_SUCCEEDED.value:
        print(f"Job failed with status: {job.status}")
        if job.errors:
            print(f"Errors: {job.errors}")
        return

    # Extract and display results
    if job.result_data and 'scraped_documents' in job.result_data:
        docs = job.result_data['scraped_documents']
        if not docs:
            print("\nScraped 0 pages")
        else:
            print(f"\nScraped {len(docs)} pages:")
            gcs_path = docs[0]['GCSPath']
            bucket_name = gcs_path.split('/')[2]
            print(f"\nGCS Bucket: gs://{bucket_name}")

            # Print all URLs and their corresponding GCS paths
            print("\nScraped URLs:")
            for doc in docs:
                print(f"- {doc['URL']}")

            print("\nDownloaded files:")
            for doc in docs:
                print(f"- {doc['GCSPath']}")


if __name__ == "__main__":
    main()