# supporting-blog-content/github-assistant/index.py
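"""Index a GitHub repository into Elasticsearch.

Clones the configured repository, splits its files into nodes with
language-aware parsers, and ingests the nodes into an Elasticsearch
vector store using the OpenAI embedding model configured below.
"""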
from llama_index.core import (
Document,
Settings,
SimpleDirectoryReader,
StorageContext,
VectorStoreIndex,
)
from llama_index.core.node_parser import (
SentenceSplitter,
CodeSplitter,
MarkdownNodeParser,
JSONNodeParser,
)
from llama_index.vector_stores.elasticsearch import ElasticsearchStore
from dotenv import load_dotenv
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.ingestion import IngestionPipeline
import tree_sitter_python as tspython
from tree_sitter_languages import get_parser, get_language
from tree_sitter import Parser, Language
import logging
import nest_asyncio
import elastic_transport
import sys
import subprocess
import shutil
import time
import glob
import os
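# Allow nested event loops: the async Elasticsearch client used by
# ElasticsearchStore needs this when a loop is already running (e.g. notebooks).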
nest_asyncio.apply()
load_dotenv(".env")
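# Expected .env values read by this script: GITHUB_OWNER, GITHUB_REPO, GITHUB_BRANCH,
# BASE_PATH, ELASTIC_CLOUD_ID, ELASTIC_USER, ELASTIC_PASSWORD, ELASTIC_INDEX,
# plus OPENAI_API_KEY, which OpenAIEmbedding picks up by default.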
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-large")
Settings.chunk_lines = 1024
Settings.chunk_size = 1024
Settings.chunk_lines_overlap = 20
Settings.max_chars = 1500


def clone_repository(owner, repo, branch, base_path="/tmp"):
    """Clone the repository into base_path, retrying a few times on failure."""
    branch = branch or os.getenv("GITHUB_BRANCH")
    if not branch:
        raise ValueError(
            "Branch is not provided and GITHUB_BRANCH environment variable is not set."
        )

    local_repo_path = os.path.join(base_path, owner, repo)
    clone_url = f"https://github.com/{owner}/{repo}.git"

    if os.path.exists(local_repo_path):
        print(f"Repository already exists at {local_repo_path}. Skipping clone.")
        return local_repo_path

    attempts = 3
    for attempt in range(attempts):
        try:
            os.makedirs(local_repo_path, exist_ok=True)
            print(f"Attempting to clone repository... Attempt {attempt + 1}")
            subprocess.run(
                ["git", "clone", "-b", branch, clone_url, local_repo_path], check=True
            )
            print(f"Repository cloned into {local_repo_path}.")
            return local_repo_path
        except subprocess.CalledProcessError:
            print(f"Attempt {attempt + 1} failed, retrying...")
            if attempt < attempts - 1:
                time.sleep(10)
                continue
            raise Exception("Failed to clone repository after multiple attempts")


def print_docs_and_nodes(docs, nodes):
    """Print every loaded document and every parsed node for inspection."""
    print("\n=== Documents ===\n")
    for doc in docs:
        print(f"Document ID: {doc.doc_id}")
        print(f"Document Content:\n{doc.text}\n\n---\n")

    print("\n=== Nodes ===\n")
    for node in nodes:
        print(f"Node ID: {node.id_}")
        print(f"Node Content:\n{node.text}\n\n---\n")


def collect_and_print_file_summary(file_summary):
    """Print the per-extension summary collected while parsing the repository."""
    print("\n=== File Summary ===\n")
    for summary in file_summary:
        print(summary)


def parse_documents():
    """Clone the configured repository and split its files into nodes."""
    owner = os.getenv("GITHUB_OWNER")
    repo = os.getenv("GITHUB_REPO")
    branch = os.getenv("GITHUB_BRANCH")
    base_path = os.getenv("BASE_PATH", "/tmp")

    if not owner or not repo:
        raise ValueError(
            "GITHUB_OWNER and GITHUB_REPO environment variables must be set."
        )

    local_repo_path = clone_repository(owner, repo, branch, base_path)

    nodes = []
    file_summary = []

    # Language-aware tree-sitter parsers for the CodeSplitters below.
    ts_parser = get_parser("typescript")
    py_parser = get_parser("python")
    go_parser = get_parser("go")
    js_parser = get_parser("javascript")
    bash_parser = get_parser("bash")
    yaml_parser = get_parser("yaml")

    # Map each node parser to the file extensions it should handle.
    parsers_and_extensions = [
        (SentenceSplitter(), [".md"]),
        (CodeSplitter(language="python", parser=py_parser), [".py", ".ipynb"]),
        (CodeSplitter(language="typescript", parser=ts_parser), [".ts"]),
        (CodeSplitter(language="go", parser=go_parser), [".go"]),
        (CodeSplitter(language="javascript", parser=js_parser), [".js"]),
        (CodeSplitter(language="bash", parser=bash_parser), [".bash", ".sh"]),
        (CodeSplitter(language="yaml", parser=yaml_parser), [".yaml", ".yml"]),
        (JSONNodeParser(), [".json"]),
    ]

    for parser, extensions in parsers_and_extensions:
        matching_files = []
        for ext in extensions:
            matching_files.extend(
                glob.glob(f"{local_repo_path}/**/*{ext}", recursive=True)
            )

        extension_list = ", ".join(extensions)
        if len(matching_files) > 0:
            file_summary.append(
                f"Found {len(matching_files)} {extension_list} files in the repository."
            )

            loader = SimpleDirectoryReader(
                input_dir=local_repo_path, required_exts=extensions, recursive=True
            )
            docs = loader.load_data()
            parsed_nodes = parser.get_nodes_from_documents(docs)

            print_docs_and_nodes(docs, parsed_nodes)
            nodes.extend(parsed_nodes)
        else:
            file_summary.append(f"No {extension_list} files found in the repository.")

    collect_and_print_file_summary(file_summary)
    print("\n")
    return nodes


def get_es_vector_store():
    """Initialize the Elasticsearch vector store, retrying on connection timeouts."""
    print("Initializing Elasticsearch store...")
    es_cloud_id = os.getenv("ELASTIC_CLOUD_ID")
    es_user = os.getenv("ELASTIC_USER")
    es_password = os.getenv("ELASTIC_PASSWORD")
    index_name = os.getenv("ELASTIC_INDEX")

    retries = 20
    for attempt in range(retries):
        try:
            es_vector_store = ElasticsearchStore(
                index_name=index_name,
                es_cloud_id=es_cloud_id,
                es_user=es_user,
                es_password=es_password,
                batch_size=100,
            )
            print("Elasticsearch store initialized.")
            return es_vector_store
        except elastic_transport.ConnectionTimeout:
            print(f"Connection attempt {attempt + 1}/{retries} timed out. Retrying...")
            time.sleep(10)

    raise Exception("Failed to initialize Elasticsearch store after multiple attempts")


def main():
    nodes = parse_documents()
    es_vector_store = get_es_vector_store()
    try:
        # With no explicit transformations, the pipeline applies its defaults
        # (including the configured embedding model) and writes the results
        # into the Elasticsearch index.
        pipeline = IngestionPipeline(
            vector_store=es_vector_store,
        )
        pipeline.run(nodes=nodes, show_progress=True)
    finally:
        if hasattr(es_vector_store, "close"):
            es_vector_store.close()
            print("Elasticsearch connection closed.")


if __name__ == "__main__":
    main()