in src/psearch/gen_ai/services/marketing_service.py [0:0]
def _get_image_bytes_from_url(self, image_url: str) -> Optional[bytes]:
    """Fetch the raw bytes of an image over HTTP(S).

    ``gs://bucket/object`` URIs are rewritten to the equivalent
    ``https://storage.googleapis.com/bucket/object`` URL before the
    request is made; any other URL is requested unchanged.

    This is a best-effort helper: it never raises. Every failure is
    logged as a warning and reported to the caller as ``None``.

    Args:
        image_url: An HTTP(S) URL or a ``gs://`` URI.

    Returns:
        The response body when the server answers with status 200,
        otherwise ``None`` (empty input, non-200 status, or any error
        raised while parsing the URL or performing the request).
    """
    # Local import so this method does not depend on the module-level
    # import list already exposing ``quote``.
    from urllib.parse import quote

    if not image_url:
        logger.warning("Error fetching image: empty image URL")
        return None

    try:
        parsed_url = urlparse(image_url)
        if parsed_url.scheme == "gs":
            bucket = parsed_url.netloc
            # urlparse splits "?query" and "#fragment" off the path for
            # every scheme, but in a gs:// URI those characters are part
            # of the object name. Stitch them back on so the name is not
            # silently truncated.
            object_path = parsed_url.path.lstrip("/")
            if parsed_url.query:
                object_path = f"{object_path}?{parsed_url.query}"
            if parsed_url.fragment:
                object_path = f"{object_path}#{parsed_url.fragment}"
            # Percent-encode the object name so reserved characters
            # ("%", "?", "#", ...) stay inside the URL path; "/" is kept
            # literal as the separator between name segments.
            encoded_path = quote(object_path, safe="/")
            url = f"https://storage.googleapis.com/{bucket}/{encoded_path}"
        else:
            url = image_url

        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            return response.content

        logger.warning(
            "Failed to fetch image from %s: %s", url, response.status_code
        )
        return None
    except Exception as e:
        # Deliberately broad: callers treat a missing image as a
        # recoverable condition, so no error may escape this helper.
        logger.warning("Error fetching image from %s: %s", image_url, e)
        return None