podcast/main.py (340 lines of code) (raw):
#!/usr/bin/env python
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import logging
import math
import random
import subprocess
import tempfile
from datetime import date
from pathlib import Path
from textwrap import dedent
import params
import requests
from absl import app
from crewai import Agent, Crew, Task
from crewai_tools import tool
from google.cloud import texttospeech
from googleapiclient.discovery import build
from langchain_google_genai import (
ChatGoogleGenerativeAI,
HarmBlockThreshold,
HarmCategory,
)
from moviepy.editor import ImageClip
from openai import OpenAI
from pydub import AudioSegment
from pyparsing import (
ParseException,
Suppress,
Word,
ZeroOrMore,
alphas,
restOfLine,
)
from pytube import YouTube
def make_openai() -> OpenAI:
    """Makes an OpenAI client.

    Returns:
        OpenAI client configured with ``params.OPEN_AI_KEY``.
    """
    client = OpenAI(api_key=params.OPEN_AI_KEY)
    return client
def make_gemini() -> ChatGoogleGenerativeAI:
    """Makes a Gemini chat model with permissive safety settings.

    The harassment, hate-speech, sexually-explicit and dangerous-content
    categories are all set to ``BLOCK_NONE``.

    Returns:
        The "gemini-pro" model (temperature 1.0) with the safety settings
        bound to it.
    """
    categories = (
        HarmCategory.HARM_CATEGORY_HARASSMENT,
        HarmCategory.HARM_CATEGORY_HATE_SPEECH,
        HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
        HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
    )
    safety_settings = dict.fromkeys(categories, HarmBlockThreshold.BLOCK_NONE)
    model = ChatGoogleGenerativeAI(
        model="gemini-pro",
        google_api_key=params.GOOGLE_API_KEY,
        temperature=1.0,
    )
    return model.bind(safety_settings=safety_settings)
def make_segment(producer: Agent) -> Task:
    """Makes the task that writes the main podcast segment.

    Args:
        producer (Agent): agent the task is assigned to

    Returns:
        Task whose description contains an ``{article}`` placeholder for
        the headlines.
    """
    prompt = dedent(
        """\
        Given the following headlines, produce a short (2 minute)
        segment as a dialog between the podcasters. The dialog
        should be spicy and information dense, with the occasional
        joke.
        {article}"""
    )
    return Task(
        description=prompt,
        expected_output="Podcast segment",
        agent=producer,
    )
@tool("Get mp3 file")
def get_mp3(title: str) -> str:
    """Searches for music and returns a path to an mp3 file.

    Args:
        title (str): Song to search for

    Returns:
        Path to the mp3 file"""
    # NOTE(review): the @tool decorator presumably exposes the docstring
    # above to the agent as the tool description -- confirm before
    # rewording it.
    urls = search_youtube(title)
    if not urls:
        raise LookupError(f"No YouTube results for {title!r}")

    # Only the first search hit is tried.
    stream = YouTube(urls[0]).streams.filter(only_audio=True).first()
    if stream is None:
        raise LookupError(f"No audio-only stream available for {urls[0]}")

    # Reserve a unique ".mp3" path and close the handle right away, so no
    # file descriptor is leaked; delete=False keeps the file for the
    # download to overwrite.
    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as handle:
        target = handle.name
    return stream.download(filename=target)
def make_music(producer: Agent) -> Task:
    """Makes the task that picks intro music for the podcast.

    Args:
        producer (Agent): agent the task is assigned to

    Returns:
        Task equipped with the ``get_mp3`` tool; its description contains
        an ``{article}`` placeholder for the headlines.
    """
    prompt = dedent(
        """\
        We're going to record a podcast on the following
        headlines; can you recommend some upbeat intro music?
        {article}"""
    )
    return Task(
        description=prompt,
        expected_output="Path to mp3 with intro music",
        tools=[get_mp3],
        agent=producer,
    )
def make_intro(producer: Agent) -> Task:
    """Makes the task that writes the podcasters' introductions.

    Args:
        producer (Agent): agent the task is assigned to

    Returns:
        Task whose description contains ``{topic}``, ``{recipient}`` and
        ``{date}`` placeholders.
    """
    prompt = dedent(
        """\
        Make a quick round of intros between your podcasters;
        structure it as a light and playful dialog where they
        mention:
        1. Their names.
        2. The name of the podcast (something related to {topic}).
        3. That the podcast was recorded especially for {recipient}.
        4. That the podcast was recorded on {date}.
        {recipient} should feel special, like this podcast is just for
        them."""
    )
    return Task(
        description=prompt,
        expected_output="Introduction",
        agent=producer,
    )
def make_dialog(dialog: list) -> str:
    """Renders (speaker, text) pairs as newline-separated "who: what" lines.

    Args:
        dialog (list): pairs of speaker and what they say

    Returns:
        One "speaker: text" line per pair, joined with newlines.
    """
    return "\n".join(
        f"{speaker}: {utterance}" for speaker, utterance in dialog
    )
def db(volume: float) -> float:
    """Converts a linear volume ratio to decibels (20 * log10).

    Args:
        volume (float): linear ratio, e.g. 0.1 for one tenth

    Returns:
        The ratio expressed in dB.

    Raises:
        ValueError: if volume is not positive (from ``math.log10``).
    """
    magnitude = math.log10(volume)
    return 20 * magnitude
def random_silence(min_ms: int = 500, max_ms: int = 1000) -> AudioSegment:
    """Makes a silent pause of random length.

    Args:
        min_ms (int): shortest pause, in milliseconds (inclusive)
        max_ms (int): longest pause, in milliseconds (inclusive)

    Returns:
        Silent audio segment of the chosen duration.
    """
    pause_ms = random.randint(min_ms, max_ms)
    return AudioSegment.silent(duration=pause_ms)
def parse_line(line: str) -> tuple | None:
    """Splits a "Name: quote" dialog line into speaker and text.

    Markdown bold markers ("**") before the name, after the name and right
    after the colon are discarded.

    Args:
        line (str): one line of generated dialog

    Returns:
        ``(name, quote)`` with surrounding whitespace stripped, or None when
        the line does not match the grammar.
    """
    bold = Suppress(ZeroOrMore("**"))
    speaker = Word(alphas + " ").setResultsName("name")
    separator = Suppress(":")
    padding = Suppress(ZeroOrMore(" "))
    text = restOfLine.setResultsName("quote")
    grammar = bold + speaker + bold + separator + bold + padding + text

    # Keep the try body down to the one call that can raise.
    try:
        parsed = grammar.parseString(line)
    except ParseException:
        return None

    # Word(alphas + " ") also consumes spaces that trail the name
    # ("Elena : hi" gives "Elena "), which would defeat a lookup keyed on
    # the bare name; the rest-of-line capture may likewise keep whitespace
    # at its edges. Strip both.
    return parsed.name.strip(), parsed.quote.strip()
def record_line(voice: str, quote: str) -> AudioSegment:
    """Synthesizes one line of dialog with Cloud Text-to-Speech.

    Args:
        voice (str): name of the en-US voice to use
        quote (str): text to speak

    Returns:
        The synthesized speech, decoded from the MP3 bytes in the response.
    """
    client = texttospeech.TextToSpeechClient()
    # Use a separate name for the selection params: rebinding `voice` made
    # the log line below print the params object instead of the voice name.
    voice_params = texttospeech.VoiceSelectionParams(
        language_code="en-US", name=voice
    )
    response = client.synthesize_speech(
        input=texttospeech.SynthesisInput(text=quote),
        voice=voice_params,
        audio_config=texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.MP3
        ),
    )
    logging.info('Recording "%s" in %s', quote, voice)
    return AudioSegment.from_file(
        io.BytesIO(response.audio_content), format="mp3"
    )
def sec(seconds: int) -> int:
    """Converts seconds to milliseconds.

    Args:
        seconds (int): duration in seconds

    Returns:
        The same duration in milliseconds.
    """
    ms_per_second = 1000
    return seconds * ms_per_second
def mix_intro(intro, music):
    """Lays the spoken intro over music that ducks while it plays.

    Timeline, as positions within ``music``:
      * 0-5 s: music at full volume
      * 5-7 s: fade down to db(0.1)
      * 7 s to 7 s + len(intro): quiet music with the intro overlaid
      * next 2 s: fade back up to full volume
      * next 5 s: music at full volume
      * last 2 s: fade out

    Args:
        intro: spoken introduction (AudioSegment)
        music: backing track (AudioSegment)

    Returns:
        The mixed AudioSegment.
    """
    speech_ms = len(intro)
    quiet_gain = db(0.1)
    bed_start = sec(7)
    bed_end = bed_start + speech_ms

    full_intro = music[: sec(5)]
    fade_to_low = music[sec(5):bed_start].fade(
        to_gain=quiet_gain, duration=sec(2), start=0
    )
    # AudioSegment + number applies a gain change in dB.
    low = music[bed_start:bed_end] + quiet_gain
    # overlay() returns a segment exactly as long as its base, so a music
    # track that ends before the intro does would cut the speech short.
    # Pad the quiet bed with silence so the whole intro is always kept.
    shortfall = speech_ms - len(low)
    if shortfall > 0:
        low += AudioSegment.silent(
            duration=shortfall, frame_rate=music.frame_rate
        )
    fade_to_full = music[bed_end:bed_end + sec(2)].fade(
        from_gain=quiet_gain, duration=sec(2), start=0
    )
    full_outro = music[bed_end + sec(2):bed_end + sec(7)]
    outro_fade = music[bed_end + sec(7):bed_end + sec(9)].fade_out(sec(2))

    return (
        full_intro
        + fade_to_low
        + low.overlay(intro)
        + fade_to_full
        + full_outro
        + outro_fade
    )
def record_dialog(dialog):
    """Records a multi-line dialog, one synthesized voice per speaker.

    Each line is parsed with ``parse_line``; lines that do not parse, or
    whose speaker has no configured voice, contribute no audio. Every
    recorded line is followed by a random pause.

    Args:
        dialog: text with one "Name: quote" entry per line

    Returns:
        AudioSegment with all recorded lines concatenated.
    """
    voices = {
        "elena": "en-US-Journey-F",
        "marcus": "en-US-Journey-D",
    }
    recording = AudioSegment.silent(duration=0)
    for line in dialog.splitlines():
        parsed = parse_line(line)
        if not parsed:
            continue
        name, quote = parsed
        # Normalize case and stray whitespace so "Elena " or "ELENA" still
        # finds its voice.
        voice = voices.get(name.strip().lower())
        if voice is None:
            # Make dropped lines visible instead of losing them silently.
            logging.warning("No voice for speaker %r; skipping line", name)
            continue
        recording += record_line(voice, quote)
        recording += random_silence()
    return recording
def denewline(line: str) -> str:
    """Removes every newline character from a string.

    Args:
        line (str): text that may contain "\\n"

    Returns:
        The text with all "\\n" characters deleted (nothing is inserted in
        their place).
    """
    pieces = line.split("\n")
    return "".join(pieces)
def search_google(engine, query):
    """Runs a query against a Google Custom Search engine.

    Args:
        engine: search engine id, passed as ``cx``
        query: search terms

    Returns:
        The executed response of the ``cse().list`` request, which asks for
        10 results.
    """
    service = build("customsearch", "v1", developerKey=params.SEARCH_API_KEY)
    request = service.cse().list(q=query, cx=engine, num=10)
    return request.execute()
def search_music(query):
    """Searches the music search engine and returns the result links.

    Args:
        query: search terms

    Returns:
        List of result URLs; empty when the search finds nothing.
    """
    response = search_google(params.SEARCH_MUSIC, query)
    # The response carries no "items" key when there are no results, so
    # default to an empty list instead of raising KeyError.
    return [item["link"] for item in response.get("items", [])]
def search_youtube(query):
    """Searches YouTube for music videos.

    Args:
        query: search terms

    Returns:
        List of watch URLs, one per item in the search response.
    """
    client = build("youtube", "v3", developerKey=params.SEARCH_API_KEY)
    request = client.search().list(
        q=query,
        part="snippet",
        type="video",
        videoCategoryId="10",  # Music category
        maxResults=5,
    )
    search_response = request.execute()
    logging.info(search_response)

    urls = []
    for item in search_response["items"]:
        urls.append(f'https://youtube.com/watch?v={item["id"]["videoId"]}')
    return urls
def search_news(query):
    """Fetches news about a query and formats it as a bullet list.

    Articles lacking a title or description, and those titled "[Removed]",
    are left out.

    Args:
        query: search terms

    Returns:
        Up to ten "- title: description" lines joined with newlines.
    """
    from newsapi import NewsApiClient

    client = NewsApiClient(api_key=params.NEWS_API_KEY)
    response = client.get_everything(q=query)

    bullets = []
    for article in response["articles"]:
        title = article["title"]
        if not title:
            continue
        description = article["description"]
        if not description or title == "[Removed]":
            continue
        bullets.append(f"- {denewline(title)}: {denewline(description)}")
    return "\n".join(bullets[:10])
def make_elena(gemini) -> Agent:
    """Makes the agent describing Elena, the investigative journalist.

    Args:
        gemini (ChatGoogleGenerativeAI): model the agent runs on

    Returns:
        Verbose agent with Elena's role, goal and backstory.
    """
    goal = dedent(
        """\
        To uncover the truth behind tech trends and
        innovations, presenting clear, well-researched
        information to the audience while challenging
        assumptions and pushing for transparency."""
    )
    backstory = dedent(
        """\
        Elena graduated with a degree in journalism from a top
        university and spent several years working for a major
        newspaper where she specialized in technology and
        innovation. She developed a reputation for her
        in-depth investigative pieces that not only reported
        the news but also explored the implications of
        technological advancements on society. Her passion for
        technology and commitment to truth led her to co-host
        the podcast, aiming to bridge the gap between tech
        experts and the general public."""
    )
    return Agent(
        role="Investigative Journalist and Tech Enthusiast",
        goal=goal,
        backstory=backstory,
        llm=gemini,
        verbose=True,
    )
def make_marcus(gemini) -> Agent:
    """Makes the agent describing Marcus, the tech optimist.

    Args:
        gemini (ChatGoogleGenerativeAI): model the agent runs on

    Returns:
        Verbose agent with Marcus's role, goal and backstory.
    """
    goal = dedent(
        """\
        To inspire and educate listeners about the potential
        of new technologies and startups, bringing a positive
        spin to tech developments and encouraging
        entrepreneurial thinking."""
    )
    backstory = dedent(
        """\
        Marcus started as a software developer and quickly
        moved into the startup world, where he co-founded a
        successful app that transformed online
        interactions. After his startup was acquired, he
        became a sought-after advisor for new tech
        ventures. His experiences have made him a fervent
        advocate for tech's potential to solve real-world
        problems. Co-hosting the podcast allows him to share
        his optimism and practical insights with a broader
        audience."""
    )
    return Agent(
        role="Charismatic Tech Optimist and Startup Advisor",
        goal=goal,
        backstory=backstory,
        llm=gemini,
        verbose=True,
    )
def agent_to_string(name: str, agent: Agent) -> str:
    """Describes an agent as labelled "Name/Role/Goal/Backstory" lines.

    Args:
        name (str): display name of the agent
        agent (Agent): agent whose role, goal and backstory are rendered

    Returns:
        Four "Label: value" entries joined with newlines.
    """
    # Join the entries directly rather than dedenting an interpolated
    # template: dedent would run on the agent's own text as well, so its
    # result depended on how the multi-line goal/backstory were indented.
    fields = (
        ("Name", name),
        ("Role", agent.role),
        ("Goal", agent.goal),
        ("Backstory", agent.backstory),
    )
    return "\n".join(f"{label}: {value}" for label, value in fields)
def make_producer(gemini):
    """Makes the producer agent that drives the podcasters.

    Args:
        gemini (ChatGoogleGenerativeAI): model the agent runs on

    Returns:
        Verbose agent whose backstory contains ``{elena}`` and ``{marcus}``
        placeholders for the podcaster descriptions.
    """
    # "expect" in the original prompt was a typo for "expert".
    backstory = dedent(
        """\
        You are an expert podcast producer; you know how to
        elicit dialog from your podcasters on a topic.
        Here are your podcasters:
        {elena}
        {marcus}"""
    )
    return Agent(
        role="Podcaster producer",
        goal="Produce a podcast by eliciting responses from your podcasters",
        backstory=backstory,
        llm=gemini,
        verbose=True,
    )
def make_image(topic: str) -> str:
    """Generates cover art for the podcast and saves it as a PNG.

    Args:
        topic (str): podcast name used in the image prompt

    Returns:
        Path of the temporary ".png" file holding the image. The file is
        not deleted automatically.

    Raises:
        requests.HTTPError: if downloading the generated image fails.
    """
    openai = make_openai()
    image = openai.images.generate(
        model="dall-e-3",
        prompt=f"Cover for podcast called '{topic}' starring Elena and Marcus",
        size="1792x1024",
        quality="standard",
        n=1,
    )
    # Bound the download and fail loudly, so an error page is never saved
    # as if it were the image.
    response = requests.get(image.data[0].url, timeout=60)
    response.raise_for_status()
    # Write through the temp file's own handle so it is closed on exit
    # rather than leaked and reopened by name.
    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
        f.write(response.content)
    return f.name
def main(_) -> None:
    """Produces the podcast and renders it to "podcast.mp4".

    Steps: fetch headlines on the topic, have the producer agent write an
    intro and a segment and pick intro music, record and mix the audio,
    generate cover art, then combine image and audio with ffmpeg.

    Args:
        _: positional argument supplied by the caller; unused
    """
    topic = "LLMs"
    gemini = make_gemini()
    producer = make_producer(gemini)
    marcus = agent_to_string("Marcus", make_marcus(gemini))
    elena = agent_to_string("Elena", make_elena(gemini))
    headlines = search_news(topic)

    intro_crew = Crew(
        agents=[producer], tasks=[make_intro(producer)], verbose=2
    )
    intro = intro_crew.kickoff(
        inputs={
            "elena": elena,
            "marcus": marcus,
            "recipient": "Peter Danenberg",
            "date": date.today().strftime("%Y-%m-%d"),
            "topic": topic,
        }
    )

    segment_crew = Crew(
        agents=[producer], tasks=[make_segment(producer)], verbose=2
    )
    segment = segment_crew.kickoff(
        inputs={"article": headlines, "elena": elena, "marcus": marcus}
    )

    music_crew = Crew(
        agents=[producer], tasks=[make_music(producer)], verbose=2
    )
    # The music task's output is used below as a path to an audio file.
    music = music_crew.kickoff(
        inputs={"article": headlines, "elena": elena, "marcus": marcus}
    )

    opening = mix_intro(record_dialog(intro), AudioSegment.from_file(music))
    episode = opening + record_dialog(segment)
    podcast = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False).name
    episode.export(podcast)

    cover = make_image(topic)

    # NOTE(review): "-t 10" caps the output at 10 seconds although the
    # segment prompt asks for about two minutes -- confirm this is
    # intentional.
    ffmpeg_command = [
        "ffmpeg",
        "-loop", "1",  # Loop the image
        "-i", cover,  # Input image file
        "-i", podcast,  # Input audio file
        "-c:v", "libx264",  # Video codec to use
        "-tune", "stillimage",  # Tune for still image
        "-c:a", "aac",  # Audio codec to use
        "-b:a", "192k",  # Audio bitrate
        "-pix_fmt", "yuv420p",  # Pixel format
        "-shortest",  # Finish encoding when the shortest input stream ends
        "-vf", "fps=25",  # Set frame rate
        "-t", "10",  # Set the duration of the output file
        "podcast.mp4",  # Output file
    ]
    subprocess.run(ffmpeg_command, check=True)
# Script entry point: when the file is executed directly (not imported),
# hand control to absl's app.run, which calls main.
if __name__ == "__main__":
    app.run(main)