devai-api/app/file_processor.py (83 lines of code) (raw):
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import subprocess
def is_ascii_text(file_path):
"""
Check if the file contains ASCII text.
:param file_path: Path to the file
:return: Boolean indicating whether the file contains ASCII text
"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
f.read()
return True
except UnicodeDecodeError:
return False
def get_text_files_contents(path, ignore=None):
"""
Returns a dictionary with file paths (including file name) as keys
and the respective file contents as values.
:param path: Directory path
:param ignore: List of file or folder names to be ignored
:return: Dictionary with file paths as keys and file contents as values
"""
if ignore is None:
ignore = set(['venv', '__pycache__', '.gitignore'])
result = {}
for dirpath, dirnames, filenames in os.walk(path):
# Remove ignored directories from dirnames so os.walk will skip them
dirnames[:] = [dirname for dirname in dirnames if dirname not in ignore]
for filename in filenames:
if filename not in ignore:
full_path = os.path.join(dirpath, filename)
if is_ascii_text(full_path):
with open(full_path, 'r', encoding='ascii') as f:
result[full_path] = f.read()
return result
def format_files_as_string(input):
def process_file(file_path):
if not is_ascii_text(file_path):
return f"file: {file_path}\nsource: [Binary File - Not ASCII Text]\n"
# pass
with open(file_path, 'r') as file:
content = file.read()
return f"\nfile: {file_path}\ncontent:\n{content}\n"
# return f"{content}\n\n"
formatted_string = ""
exclude_directories = set(['venv', '__pycache__', '.gitignore'])
if isinstance(input, str):
if os.path.isdir(input):
for root, dirs, files in os.walk(input):
dirs[:] = [d for d in dirs if d not in exclude_directories]
files[:] = [f for f in files if f not in exclude_directories]
for file in files:
file_path = os.path.join(root, file)
if os.path.exists(file_path):
formatted_string += process_file(file_path)
else:
if os.path.exists(input):
formatted_string += process_file(input)
elif isinstance(input, list):
for file_path in input:
if os.path.exists(file_path):
formatted_string += process_file(file_path)
else:
raise ValueError("Input must be a directory path, a single file path, or a list of file paths")
return formatted_string
def list_files(start_sha, end_sha, refer_commit_parent=False):
if refer_commit_parent:
start_sha = f"{start_sha}^"
command = ["git", "diff", "--name-only", start_sha, end_sha]
return run_git_command(command)
def list_changes(start_sha, end_sha, refer_commit_parent=False):
if refer_commit_parent:
start_sha = f"{start_sha}^"
command = ["git", "diff", start_sha, end_sha]
output = subprocess.check_output(command, text=True)
return output
def list_commit_messages(start_sha, end_sha, refer_commit_parent=False):
command = ["git", "log", "--pretty=format:%s", "--name-only", start_sha, end_sha]
if refer_commit_parent:
command = ["git", "log", "--pretty=format:%s", "--name-only", f"{start_sha}^..{end_sha}"]
output = subprocess.check_output(command, text=True)
return output
def list_commits_for_branches(branch_a, branch_b):
command = ["git", "log", "--pretty=format:%h", f"{branch_a}..{branch_b}"]
return run_git_command(command)
def list_commits_for_tags(tag_a, tag_b):
command = ["git", "log", "--pretty=format:%h", tag_a, tag_b]
return run_git_command(command)
def list_tags():
command = ["git", "tag"]
return run_git_command(command)
def run_git_command(command):
output = subprocess.check_output(command).decode("utf-8").strip()
records = output.splitlines()
list = []
for record in records:
list.append(record)
return list