in components/llm_service/src/services/query/web_datasource_job.py
def download_documents(self, doc_url: str, temp_dir: str) -> \
List[DataSourceFile]:
"""Start webscraper job to download files from doc_url
Args:
doc_url: URL to scrape
temp_dir: Path to temporary directory
Returns:
List of DataSourceFile objects representing scraped pages
"""
Logger.info(f"Starting webscraper job for URL: {doc_url}")
    # create a batch job model to track the scraper's status and results
job_model = BatchJobModel()
job_model.id = str(uuid.uuid4())
job_model.type = JOB_TYPE_WEBSCRAPER
job_model.status = "pending"
job_model.uuid = job_model.id
job_model.name = f"webscraper-{job_model.id[:8]}"
job_model.save()
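    # the scraper container is expected to report back by updating this
    # model (looked up via the JOB_ID env var set below), which is what
    # the polling loop later reads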
# create job input data with URL, job ID and engine name
# depth limit for scraper is GENIE depth+1
job_input = {
"url": doc_url,
"query_engine_name": self.query_engine_name,
"depth_limit": str(self.depth_limit + 1),
}
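    # note: depth_limit is serialized as a string; the scraper presumably
    # parses numeric values out of the JSON payload itself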
# Get container image from Artifact Registry
container_image = get_latest_artifact_registry_image(
repository="default",
package_name="webscraper",
project_id=PROJECT_ID
)
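    # job spec consumed by kube_create_job; input_data travels to the
    # container as a single JSON string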
job_specs = {
"type": JOB_TYPE_WEBSCRAPER,
"input_data": json.dumps(job_input),
"container_image": container_image
}
    # environment variables passed to the scraper container
env_vars = {
"GCP_PROJECT": PROJECT_ID,
"JOB_ID": job_model.id
}
# create and start the job with existing job model
kube_create_job(
job_specs=job_specs,
namespace=self.namespace,
env_vars=env_vars,
existing_job_model=job_model
)
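    # kube_create_job does not block on the job itself; completion is
    # detected by polling the job model below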
Logger.info(f"Started webscraper job {job_model.id}")
# wait for job completion and get results
    # poll the job model once per second; @timeout aborts the wait if the
    # job does not finish in time
    @timeout(6000)
    def wait_for_job(job_id: str):
      job = BatchJobModel.find_by_uuid(job_id)
      while job.status not in ["succeeded", "failed"]:
        time.sleep(1)
        job = BatchJobModel.find_by_uuid(job_id)
      Logger.info(f"Webscraper job {job.id} status {job.status}")
try:
      wait_for_job(job_model.id)
except Exception as e:
raise InternalServerError("Timed out waiting for webscraper") from e
job_model = BatchJobModel.find_by_uuid(job_model.id)
    if job_model.status != "succeeded":
      # wait_for_job only returns on "succeeded" or "failed"; the "active"
      # check is defensive, in case the status changed between reads
      if job_model.status == "active":
        raise InternalServerError("Webscraper job failed to complete")
      raise InternalServerError(f"Webscraper job failed: {job_model.errors}")
Logger.info(f"Webscraper job [{job_model.id}] completed")
# download documents from GCS and build DataSourceFile list
doc_files = []
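    # a job can succeed without scraping any documents, in which case an
    # empty list is returned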
if job_model.result_data and "scraped_documents" in job_model.result_data:
scraped_docs = job_model.result_data["scraped_documents"]
Logger.info(f"downloading [{len(scraped_docs)}] documents")
for doc in scraped_docs:
# Parse GCS path to get bucket and blob path
gcs_path = doc["GCSPath"]
if gcs_path.startswith("gs://"):
bucket_name = gcs_path.split("/")[2]
blob_path = "/".join(gcs_path.split("/")[3:])
else:
raise InternalServerError(f"Invalid GCS path format: {gcs_path}")
# download file from GCS
blob = self.storage_client.get_bucket(bucket_name).blob(blob_path)
local_path = os.path.join(temp_dir, doc["Filename"])
blob.download_to_filename(local_path)
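        # bundle the local copy with its source URL, GCS location and
        # content type for the caller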
doc_file = DataSourceFile(
doc_name=doc["Filename"],
src_url=doc["URL"],
gcs_path=doc["GCSPath"],
mime_type=doc["ContentType"],
local_path=local_path
)
doc_files.append(doc_file)
Logger.info(f"Webscraper job completed with {len(doc_files)} files")
return doc_files
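
  # Hypothetical usage sketch: the enclosing class name, its constructor
  # arguments, and the use of tempfile are assumptions for illustration,
  # not this module's verified API:
  #
  #   source = WebDataSourceJob(...)  # illustrative name only
  #   with tempfile.TemporaryDirectory() as tmp:
  #     files = source.download_documents("https://example.com", tmp)
  #   for f in files:
  #     Logger.info(f"{f.doc_name} ({f.mime_type}) from {f.src_url}")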