podcast/main.py (340 lines of code) (raw):

#!/usr/bin/env python # Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import io import logging import math import random import subprocess import tempfile from datetime import date from pathlib import Path from textwrap import dedent import params import requests from absl import app from crewai import Agent, Crew, Task from crewai_tools import tool from google.cloud import texttospeech from googleapiclient.discovery import build from langchain_google_genai import ( ChatGoogleGenerativeAI, HarmBlockThreshold, HarmCategory, ) from moviepy.editor import ImageClip from openai import OpenAI from pydub import AudioSegment from pyparsing import ( ParseException, Suppress, Word, ZeroOrMore, alphas, restOfLine, ) from pytube import YouTube def make_openai() -> OpenAI: # os.environ["OPENAI_API_KEY"] = params.OPEN_AI_KEY return OpenAI( api_key=params.OPEN_AI_KEY ) # openai_api_key=params.OPEN_AI_KEY # temperature=1.0, def make_gemini() -> ChatGoogleGenerativeAI: """Makes a Gemini model. Returns: Gemini model. """ safety_settings = { HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE, HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE, HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: ( HarmBlockThreshold.BLOCK_NONE ), HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: ( HarmBlockThreshold.BLOCK_NONE ), } return ChatGoogleGenerativeAI( model="gemini-pro", google_api_key=params.GOOGLE_API_KEY, temperature=1.0, ).bind(safety_settings=safety_settings) def make_segment(producer: Agent) -> Task: """Makes a podcast segment. Args: podcaster (Agent): podcaster Returns: Podcast task """ return Task( description=dedent( """\ Given the following headlines, produce a short (2 minute) segment as a dialog between the podcasters. The dialog should be spicy and information dense, with the occasional joke. {article}""" ), expected_output="Podcast segment", agent=producer, ) @tool("Get mp3 file") def get_mp3(title: str) -> str: """Searches for music and returns a path to an mp3 file. Args: title (str): Song to search for Returns: Path to the mp3 file""" yt = YouTube(search_youtube(title)[0]) filename = Path( tempfile.NamedTemporaryFile(suffix=".mp3", delete=False).name ) return ( yt.streams.filter(only_audio=True).first().download(filename=filename) ) def make_music(producer: Agent) -> Task: """Makes a podcast segment. Args: podcaster (Agent): podcaster Returns: Podcast task """ return Task( description=dedent( """\ We're going to record a podcast on the following headlines; can you recommend some upbeat intro music? {article}""" ), expected_output="Path to mp3 with intro music", tools=[get_mp3], agent=producer, ) def make_intro(producer: Agent) -> Task: """Makes a podcast task. Args: podcaster (Agent): podcaster Returns: Podcast task """ return Task( description=dedent( """\ Make a quick round of intros between your podcasters; structure it as a light and playful dialog where they mention: 1. Their names. 2. The name of the podcast (something related to {topic}). 3. That the podcast was recorded especially for {recipient}. 4. That the podcast was recorded on {date}. {recipient} should feel special, like this podcast is just for them.""" ), expected_output="Introduction", agent=producer, ) def make_dialog(dialog: list) -> str: return "\n".join([f"{who}: {what}" for who, what in dialog]) def db(volume: float) -> float: return 20 * math.log10(volume) def random_silence(min_ms: int = 500, max_ms: int = 1000) -> AudioSegment: return AudioSegment.silent(duration=random.randint(min_ms, max_ms)) def parse_line(line: str) -> tuple | None: asterisks = Suppress(ZeroOrMore("**")) name = Word(alphas + " ").setResultsName("name") colon = Suppress(":") space = Suppress(ZeroOrMore(" ")) quote = restOfLine.setResultsName("quote") grammar = asterisks + name + asterisks + colon + asterisks + space + quote try: parse = grammar.parseString(line) return parse.name, parse.quote except ParseException: return None def record_line(voice: str, quote: str) -> AudioSegment: client = texttospeech.TextToSpeechClient() synthesis_input = texttospeech.SynthesisInput(text=quote) voice = texttospeech.VoiceSelectionParams( language_code="en-US", name=voice ) audio_config = texttospeech.AudioConfig( audio_encoding=texttospeech.AudioEncoding.MP3 ) response = client.synthesize_speech( input=synthesis_input, voice=voice, audio_config=audio_config ) logging.info(f'Recording "{quote}" in {voice}') return AudioSegment.from_file( io.BytesIO(response.audio_content), format="mp3" ) def sec(seconds: int) -> int: return seconds * 1000 def mix_intro(intro, music): full_intro = music[: sec(5)] fade_to_low = music[sec(5) : sec(7)].fade( # noqa: E203 to_gain=db(0.1), duration=sec(2), start=0 ) low = music[sec(7) : sec(7) + len(intro)] + db(0.1) # noqa: E203 fade_to_full = music[ sec(7) + len(intro) : sec(9) + len(intro) # noqa: E203 ].fade( # noqa: E203 from_gain=db(0.1), duration=sec(2), start=0 ) full_outro = music[ sec(9) + len(intro) : sec(14) + len(intro) # noqa: E203 ] # noqa: E203 outro_fade = music[ sec(14) + len(intro) : sec(16) + len(intro) # noqa: E203 ].fade_out(sec(2)) return ( full_intro + fade_to_low + low.overlay(intro) + fade_to_full + full_outro + outro_fade ) def record_dialog(dialog): voices = { "elena": "en-US-Journey-F", "marcus": "en-US-Journey-D", } recording = AudioSegment.silent(duration=0) for line in dialog.splitlines(): if parse := parse_line(line): name, quote = parse name = name.lower() if name in voices: recording += record_line(voices[name], quote) recording += random_silence() return recording def denewline(line: str) -> str: return line.replace("\n", "") def search_google(engine, query): service = build("customsearch", "v1", developerKey=params.SEARCH_API_KEY) return ( service.cse() .list( q=query, cx=engine, num=10, ) .execute() ) def search_music(query): return [ item["link"] for item in search_google(params.SEARCH_MUSIC, query)["items"] ] def search_youtube(query): # Initialize the YouTube API client youtube = build("youtube", "v3", developerKey=params.SEARCH_API_KEY) # Perform a search query for music videos search_response = ( youtube.search() .list( q=query, part="snippet", type="video", videoCategoryId="10", # Category ID for Music maxResults=5, ) .execute() ) logging.info(search_response) return [ f'https://youtube.com/watch?v={item["id"]["videoId"]}' for item in search_response["items"] ] def search_news(query): from newsapi import NewsApiClient api = NewsApiClient(api_key=params.NEWS_API_KEY) results = api.get_everything(q=query) return "\n".join( [ f'- {denewline(article["title"])}: {denewline(article["description"])}' # noqa: E501 for article in results["articles"] if article["title"] and article["description"] and article["title"] != "[Removed]" ][:10] ) def make_elena(gemini) -> Agent: """Makes a podcaster. Args: gemini (ChatGoogleGenerativeAI): Gemini model Returns: Podcaster """ return Agent( role="Investigative Journalist and Tech Enthusiast", goal=( dedent( """\ To uncover the truth behind tech trends and innovations, presenting clear, well-researched information to the audience while challenging assumptions and pushing for transparency.""" ) ), backstory=( dedent( """\ Elena graduated with a degree in journalism from a top university and spent several years working for a major newspaper where she specialized in technology and innovation. She developed a reputation for her in-depth investigative pieces that not only reported the news but also explored the implications of technological advancements on society. Her passion for technology and commitment to truth led her to co-host the podcast, aiming to bridge the gap between tech experts and the general public.""" ) ), llm=gemini, verbose=True, ) def make_marcus(gemini) -> Agent: """Makes a podcaster. Args: gemini (ChatGoogleGenerativeAI): Gemini model Returns: Podcaster """ return Agent( role="Charismatic Tech Optimist and Startup Advisor", goal=( dedent( """\ To inspire and educate listeners about the potential of new technologies and startups, bringing a positive spin to tech developments and encouraging entrepreneurial thinking.""" ) ), backstory=( dedent( """\ Marcus started as a software developer and quickly moved into the startup world, where he co-founded a successful app that transformed online interactions. After his startup was acquired, he became a sought-after advisor for new tech ventures. His experiences have made him a fervent advocate for tech's potential to solve real-world problems. Co-hosting the podcast allows him to share his optimism and practical insights with a broader audience.""" ) ), llm=gemini, verbose=True, ) def agent_to_string(name: str, agent: Agent) -> str: return dedent( f"""\ Name: {name} Role: {agent.role} Goal: {agent.goal} Backstory: {agent.backstory}""" ) def make_producer(gemini): return Agent( role="Podcaster producer", goal="Produce a podcast by eliciting responses from your podcasters", backstory=( dedent( """\ You are an expect podcast producer; you know how to elicit dialog from your podcasters on a topic. Here are your podcasters: {elena} {marcus}""" ) ), llm=gemini, verbose=True, ) def make_image(topic: str) -> ImageClip: openai = make_openai() image = openai.images.generate( model="dall-e-3", prompt=f"Cover for podcast called '{topic}' starring Elena and Marcus", size="1792x1024", quality="standard", n=1, ) response = requests.get(image.data[0].url) with open( file := tempfile.NamedTemporaryFile(suffix=".png", delete=False).name, "wb", ) as f: f.write(response.content) return file def main(_) -> None: topic = "LLMs" gemini = make_gemini() producer = make_producer(gemini) marcus = agent_to_string("Marcus", make_marcus(gemini)) elena = agent_to_string("Elena", make_elena(gemini)) headlines = search_news(topic) intro = Crew( agents=[producer], tasks=[make_intro(producer)], verbose=2 ).kickoff( inputs={ "elena": elena, "marcus": marcus, "recipient": "Peter Danenberg", "date": date.today().strftime("%Y-%m-%d"), "topic": topic, } ) segment = Crew( agents=[producer], tasks=[make_segment(producer)], verbose=2 ).kickoff( inputs={ "article": headlines, "elena": elena, "marcus": marcus, } ) music = Crew( agents=[producer], tasks=[make_music(producer)], verbose=2 ).kickoff( inputs={ "article": headlines, "elena": elena, "marcus": marcus, } ) ( mix_intro(record_dialog(intro), AudioSegment.from_file(music)) + record_dialog(segment) ).export( podcast := tempfile.NamedTemporaryFile( suffix=".mp3", delete=False ).name ) cover = make_image(topic) ffmpeg_command = [ "ffmpeg", "-loop", "1", # Loop the image "-i", cover, # Input image file "-i", podcast, # Input audio file "-c:v", "libx264", # Video codec to use "-tune", "stillimage", # Tune for still image "-c:a", "aac", # Audio codec to use "-b:a", "192k", # Audio bitrate "-pix_fmt", "yuv420p", # Pixel format "-shortest", # Finish encoding when the shortest input stream ends "-vf", "fps=25", # Set frame rate "-t", "10", # Set the duration of the output file "podcast.mp4", # Output file ] subprocess.run(ffmpeg_command, check=True) if __name__ == "__main__": app.run(main)