packages/blueprints/gen-ai-chatbot/static-assets/chatbot-genai-components/backend/python/embedding/loaders/youtube.py (95 lines of code) (raw):

from typing import Any, Sequence, Union from urllib.parse import parse_qs, urlparse from embedding.loaders.base import BaseLoader, Document from youtube_transcript_api import ( NoTranscriptFound, TranscriptsDisabled, YouTubeTranscriptApi, ) ALLOWED_SCHEMAS = {"http", "https"} ALLOWED_NETLOCK = { "youtu.be", "m.youtube.com", "youtube.com", "www.youtube.com", "www.youtube-nocookie.com", "vid.plus", } def _parse_video_id(url: str) -> str | None: """Parse a youtube url and return the video id if valid, otherwise None.""" parsed_url = urlparse(url) if parsed_url.scheme not in ALLOWED_SCHEMAS: return None if parsed_url.netloc not in ALLOWED_NETLOCK: return None path = parsed_url.path if path.endswith("/watch"): query = parsed_url.query parsed_query = parse_qs(query) if "v" in parsed_query: ids = parsed_query["v"] video_id = ids if isinstance(ids, str) else ids[0] else: return None else: path = parsed_url.path.lstrip("/") video_id = path.split("/")[-1] if len(video_id) != 11: # Video IDs are 11 characters long return None return video_id def _detect_lang(video_id: str) -> str: try: available_transcripts = YouTubeTranscriptApi.list_transcripts(video_id) languages = [transcript.language_code for transcript in available_transcripts] except Exception as e: raise Exception(f"Failed to detect language: {e}") # Only the first language is used. return languages[0] class YoutubeLoader(BaseLoader): """Load `YouTube` transcripts.""" def __init__( self, video_id: str, language: Union[str, Sequence[str]] = "en", translation: str | None = None, continue_on_failure: bool = False, ): """Initialize with YouTube video ID.""" self.video_id = video_id self.language = language if isinstance(language, str): self.language = [language] else: self.language = language self.translation = translation self.continue_on_failure = continue_on_failure @staticmethod def extract_video_id(youtube_url: str) -> str: """Extract video id from common YT urls.""" video_id = _parse_video_id(youtube_url) if not video_id: raise ValueError( f"Could not determine the video ID for the URL {youtube_url}" ) return video_id def load(self) -> list[Document]: """Load documents.""" metadata = {"source": self.video_id} try: transcript_list = YouTubeTranscriptApi.list_transcripts(self.video_id) except TranscriptsDisabled: return [] try: transcript = transcript_list.find_transcript(self.language) except NoTranscriptFound: transcript = transcript_list.find_transcript(["en"]) if self.translation is not None: transcript = transcript.translate(self.translation) transcript_pieces = transcript.fetch() transcript = " ".join([t["text"].strip(" ") for t in transcript_pieces]) return [Document(page_content=transcript, metadata=metadata)] class YoutubeLoaderWithLangDetection(YoutubeLoader): """Loads a YouTube video transcript and detects the language automatically.""" def __init__(self, urls: list[str]): self._urls = urls def load(self) -> list[Document]: documents = [] for url in self._urls: video_id = YoutubeLoader.extract_video_id(url) language = _detect_lang(video_id) loader = YoutubeLoader(video_id, language=language) documents.extend(loader.load()) return documents