4-mmrag_tooluse/ingest.py (155 lines of code) (raw):

import os import json import sqlite3 def create_tables(cursor): """ Create tables for Free Cash Flow data. """ cursor.execute(''' CREATE TABLE IF NOT EXISTS Free_Cash_Flow_Less_Equipment_Finance_Leases ( id INTEGER PRIMARY KEY AUTOINCREMENT, report_date TEXT, quarter TEXT, operating_cash_flow REAL, purchases_of_property_and_equipment REAL, equipment_acquired_under_finance_leases REAL, principal_repayments_of_other_finance_leases REAL, principal_repayments_of_financing_obligations REAL, free_cash_flow_less_equipment_finance_leases REAL ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS Free_Cash_Flow_Less_Principal_Repayments ( id INTEGER PRIMARY KEY AUTOINCREMENT, report_date TEXT, quarter TEXT, operating_cash_flow REAL, purchases_of_property_and_equipment REAL, principal_repayments_of_finance_leases REAL, principal_repayments_of_financing_obligations REAL, free_cash_flow_less_principal_repayments REAL ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS Free_Cash_Flow_Reconciliation ( id INTEGER PRIMARY KEY AUTOINCREMENT, report_date TEXT, quarter TEXT, operating_cash_flow REAL, purchases_of_property_and_equipment REAL, free_cash_flow REAL ) ''') def ingest_json_files(json_folder_path, db_path): """ Ingest JSON files into the SQLite database. """ # Connect to the SQLite3 database conn = sqlite3.connect(db_path) cursor = conn.cursor() # Create tables create_tables(cursor) # Loop over all JSON files in the specified folder for filename in os.listdir(json_folder_path): if filename.endswith(".json"): file_path = os.path.join(json_folder_path, filename) # Load the JSON data with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) title = data.get("title") report_date = data.get("report_date") records = data.get("data", []) if title == "Free_Cash_Flow_Less_Equipment_Finance_Leases": for record in records: cursor.execute(''' INSERT INTO Free_Cash_Flow_Less_Equipment_Finance_Leases ( report_date, quarter, operating_cash_flow, purchases_of_property_and_equipment, equipment_acquired_under_finance_leases, principal_repayments_of_other_finance_leases, principal_repayments_of_financing_obligations, free_cash_flow_less_equipment_finance_leases ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ''', ( report_date, record.get("quarter"), record.get("operating_cash_flow"), record.get("purchases_of_property_and_equipment"), record.get("equipment_acquired_under_finance_leases"), record.get( "principal_repayments_of_other_finance_leases"), record.get( "principal_repayments_of_financing_obligations"), record.get( "free_cash_flow_less_equipment_finance_leases") )) elif title == "Free_Cash_Flow_Less_Principal_Repayments": for record in records: cursor.execute(''' INSERT INTO Free_Cash_Flow_Less_Principal_Repayments ( report_date, quarter, operating_cash_flow, purchases_of_property_and_equipment, principal_repayments_of_finance_leases, principal_repayments_of_financing_obligations, free_cash_flow_less_principal_repayments ) VALUES (?, ?, ?, ?, ?, ?, ?) ''', ( report_date, record.get("quarter"), record.get("operating_cash_flow"), record.get("purchases_of_property_and_equipment"), record.get("principal_repayments_of_finance_leases"), record.get( "principal_repayments_of_financing_obligations"), record.get("free_cash_flow_less_principal_repayments") )) elif title == "Free_Cash_Flow_Reconciliation": for record in records: cursor.execute(''' INSERT INTO Free_Cash_Flow_Reconciliation ( report_date, quarter, operating_cash_flow, purchases_of_property_and_equipment, free_cash_flow ) VALUES (?, ?, ?, ?, ?) ''', ( report_date, record.get("quarter"), record.get("operating_cash_flow"), record.get("purchases_of_property_and_equipment"), record.get("free_cash_flow") )) else: print( f"Unknown title '{title}' in file '{filename}'. Skipping.") # Commit changes and close connection conn.commit() conn.close() def execute_query(db_path, query, params=()): """ Execute a SQL query and return the results. Parameters: db_path (str): Path to the SQLite database file. query (str): SQL query to be executed. params (tuple): Parameters to be passed to the query (default is an empty tuple). Returns: list: List of rows returned by the query. """ try: # Connect to the SQLite database conn = sqlite3.connect(db_path) cursor = conn.cursor() # Execute the query with parameters cursor.execute(query, params) results = cursor.fetchall() # Commit if it's an INSERT/UPDATE/DELETE query if query.strip().upper().startswith(('INSERT', 'UPDATE', 'DELETE')): conn.commit() return results except sqlite3.Error as e: print(f"An error occurred: {e}") return [] finally: # Close the connection if conn: conn.close() # Example usage if __name__ == "__main__": json_folder_path = "./table_json" db_path = "earnings.db" # Ingest JSON files into the database ingest_json_files(json_folder_path, db_path) # Example query: Retrieve all records from Free_Cash_Flow_Reconciliation where free_cash_flow > 25000 query = ''' SELECT DISTINCT quarter, operating_cash_flow, purchases_of_property_and_equipment, free_cash_flow FROM Free_Cash_Flow_Reconciliation WHERE free_cash_flow > ? ''' results = execute_query(db_path, query, (25700,)) for row in results: print(row)