infra/scripts/create_gdrive_folder.py
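"""Create (or reuse) a Google Drive folder for the marketing assets, share it
with the backend service account, upload the template files from templates/,
and write the resulting Drive IDs to output_config/gdrive_folder_results.json."""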

import argparse
import json
import google.auth
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaFileUpload
import os
import random
import string
import time

parser = argparse.ArgumentParser()
parser.add_argument("--folder-name", help="Name of the drive folder", type=str)
parser.add_argument(
    "--service-account-email", help="email of your service account", type=str
)
parser.add_argument(
    "--force",
    help="Force script execution even if output file exists",
    action="store_true",
)
dict_args = parser.parse_args()

folder_name = dict_args.folder_name
service_account_email = dict_args.service_account_email

output_dir = os.path.join(os.path.dirname(__file__), "..", "output_config")
output_file = "gdrive_folder_results.json"
output_path = os.path.join(output_dir, output_file)
templates_path = os.path.join(os.path.dirname(__file__), "..", "..", "templates")
terraform_state_path = os.path.join(
    os.path.dirname(__file__), "..", "terraform.tfstate"
)

if not service_account_email:
    try:
        # Fall back to the backend service account recorded in the Terraform
        # state in the parent directory.
        if not os.path.exists(terraform_state_path):
            raise FileNotFoundError(f"File not found: {terraform_state_path}")
        with open(terraform_state_path, "r") as f:
            tfstate = json.load(f)
        service_account_email = tfstate["outputs"]["genai_marketing_infra"][
            "value"
        ]["cloud_run_backend_sa"]
    except (FileNotFoundError, KeyError) as e:
        print(f"Error: Could not get service account email from terraform state: {e}")
        exit(1)

print(
    f"Using arguments\n folder_name: {folder_name}\n service_account_email: {service_account_email}"
)


def create_folder(folder_name):
    """Create a folder and print the folder ID.

    Returns: the folder ID, or None on error.
    """
    creds, _ = google.auth.default()

    try:
        # create drive api client
        service = build("drive", "v3", credentials=creds)
        file_metadata = {
            "name": folder_name,
            "mimeType": "application/vnd.google-apps.folder",
        }

        # pylint: disable=maybe-no-member
        file = service.files().create(body=file_metadata, fields="id").execute()
        print(f'Folder ID: "{file.get("id")}".')
        return file.get("id")
    except HttpError as error:
        print(f"An error occurred: {error}")
        return None


# Sharing Folder with DOMAIN
def share_file(real_folder_id, real_user):
    creds, _ = google.auth.default()

    try:
        # create drive api client
        service = build("drive", "v3", credentials=creds)
        ids = []
        folder_id = real_folder_id

        def callback(request_id, response, exception):
            if exception:
                # Handle error
                print(exception)
            else:
                print(f"Request_Id: {request_id}")
                print(f'Permission Id: {response.get("id")}')
                ids.append(response.get("id"))

        # pylint: disable=maybe-no-member
        batch = service.new_batch_http_request(callback=callback)
        user_permission = {
            "type": "user",
            "role": "writer",
            "emailAddress": real_user,
        }
        batch.add(
            service.permissions().create(
                fileId=folder_id,
                body=user_permission,
                fields="id",
            )
        )
        batch.execute()
    except HttpError as error:
        print(f"An error occurred: {error}")
        ids = None

    return ids


# ----- Files upload ----
def upload_with_conversion(folder_id, source_file, destination_file, file_type):
    """Upload file with conversion.

    Returns: ID of the file uploaded, or None on error.
    """
    creds, _ = google.auth.default()

    try:
        # create drive api client
        service = build("drive", "v3", credentials=creds)
        results = (
            service.files()
            .list(
                q=f"name='{destination_file}'",
                pageSize=10,
                fields="nextPageToken, files(id, name)",
                supportsAllDrives=True,
                includeItemsFromAllDrives=True,
            )
            .execute()
        )
        query = "'{}' in parents".format(folder_id)
        children = (
            service.files()
            .list(q=query, fields="nextPageToken, files(id, name)")
            .execute()
        )
        # print(children['files'])
        fileCheck = next(
            (item for item in children["files"] if item["name"] == destination_file),
            None,
        )
        if len(children["files"]) == 0 or fileCheck is None:
            file_metadata = {
                "name": destination_file,
                "mimeType": "application/vnd.google-apps.spreadsheet",
                "parents": [folder_id],
            }
            media = MediaFileUpload(source_file, mimetype=file_type, resumable=True)
            # pylint: disable=maybe-no-member
            file = (
                service.files()
                .create(body=file_metadata, media_body=media, fields="id")
                .execute()
            )
            print(f'File with ID: "{file.get("id")}" has been uploaded.')
            return file.get("id")
        else:
            fileData = next(
                (
                    item
                    for item in children["files"]
                    if item["name"] == destination_file
                ),
                None,
            )
            print("File already Exists:", fileData["id"])
            return fileData["id"]
    except HttpError as error:
        print(f"An error occurred: {error}")
        return None


if __name__ == "__main__":
    start_time = time.time()
    required_keys = [
        "folder_gdrive_id",
        "sheet_gdrive_id",
        "doc_gdrive_id",
        "slide_gdrive_id",
    ]
    if os.path.exists(output_path):
        with open(output_path, "r") as jsonFile:
            data = json.load(jsonFile)
        if all(key in data for key in required_keys) and not dict_args.force:
            print("Script already run. Use --force to override.")
            exit(0)

    creds, _ = google.auth.default()
    service = build("drive", "v3", credentials=creds)
    print(folder_name)
    results = (
        service.files()
        .list(
            q=f"name='{folder_name}'",
            pageSize=10,
            fields="nextPageToken, files(id, name)",
            supportsAllDrives=True,
            includeItemsFromAllDrives=True,
        )
        .execute()
    )
    if len(results["files"]) == 0:
        print(" folder not found")
        GDRIVE_FOLDER_ID = create_folder(folder_name)
    else:
        print(" folder Already Exists, so using the same")
        GDRIVE_FOLDER_ID = results["files"][0]["id"]

    share_file(GDRIVE_FOLDER_ID, service_account_email)

    MarketingExcelID = upload_with_conversion(
        GDRIVE_FOLDER_ID,
        f"{templates_path}/[data source] GenAI for Marketing.xlsx",
        "GenAI for Marketing.xlsx",
        "text/xls",
    )
    MarketingDocID = upload_with_conversion(
        GDRIVE_FOLDER_ID,
        f"{templates_path}/[template] Gen AI for Marketing Google Doc Template.docx",
        "Gen AI for Marketing Google Doc Template.docx",
        "text/doc",
    )
    MarketingPptID = upload_with_conversion(
        GDRIVE_FOLDER_ID,
        f"{templates_path}/[template] Marketing Assets.pptx",
        "Marketing Assets.pptx",
        "text/ppt",
    )

    if not os.path.exists(output_path):
        data = {}
    else:
        with open(output_path, "r") as jsonFile:
            data = json.load(jsonFile)

    data["folder_gdrive_id"] = GDRIVE_FOLDER_ID
    data["sheet_gdrive_id"] = MarketingExcelID
    data["doc_gdrive_id"] = MarketingDocID
    data["slide_gdrive_id"] = MarketingPptID
    data["execution_time"] = f"{time.time() - start_time:.2f} seconds"

    with open(output_path, "w") as jsonFile:
        json.dump(data, jsonFile)