scripts/bulk-update-status.py (225 lines of code) (raw):

import argparse import sys import os import time import requests import json import logging import datetime import jwt from datetime import datetime, timezone # Disable SSL warnings requests.packages.urllib3.disable_warnings() STATUS_STRINGS = ["New", "Held", "Completed", "Killed", "In Production", None] ALLOWED_INPUT = ["1", "2", "3", "4", "5", "6", "7"] MAX_RECORDS_PER_PAGE = 100 def setup_argparser() -> argparse.ArgumentParser: """Set up the argument parser for the script""" argparser = argparse.ArgumentParser(description='Bulk update status of records') argparser.add_argument('-b', '--baseurl', help='Base URL of the environment to run the script against') argparser.add_argument('-t', '--timestamp', help='Date to filter records before (yyyy-mm-dd)') argparser.add_argument('-T', '--title', help='Title to filter records by') return argparser def get_token() -> str: """Set token from environment variable""" token = os.environ.get("PLUTO_TOKEN") if token == None: print("No token found. Exiting script...") sys.exit() decoded_token = jwt.decode(token, algorithms=[], options={"verify_signature": False}) expiration_time = datetime.fromtimestamp(decoded_token["exp"]) if expiration_time < datetime.now(): print("Token has expired. Exiting script...") sys.exit() print(f"Token expires at: {expiration_time}\n") return token def create_urls(base_url): update_url = f"{base_url}/pluto-core/api/project" commission_list_url = f"{base_url}/pluto-core/api/pluto/commission/list" project_list_url = f"{base_url}/pluto-core/api/project/list" return update_url, commission_list_url, project_list_url def get_headers(token: str) -> dict: return { "Content-Type": "application/json", "Authorization": f"Bearer {token}", } def setup_logging() -> None: logging.basicConfig(filename="data.log", level=logging.DEBUG) def api_put_request(url, headers, json_body, max_retries=5): backoff_factor = 2 for retry in range(max_retries): try: with requests.put(url, headers=headers, data=json_body, verify=False) as response: response.raise_for_status() return response.json() except (requests.exceptions.HTTPError, requests.exceptions.RequestException) as e: if retry == max_retries - 1: # If this is the last retry, raise the exception. raise wait_time = backoff_factor ** retry print(f"An error occurred: {e}. Retrying in {wait_time} seconds...") time.sleep(wait_time) def get_filtered_commission_records(timestamp, status, headers, commission_list_url, title=None) -> list: print("TIMESTAMP: ", timestamp) request_body = { "match": "W_CONTAINS", "completionDateBefore": timestamp } if status: request_body["status"] = status if title: request_body["title"] = title json_body = json.dumps(request_body) records = [] try: json_content = api_put_request(commission_list_url, headers, json_body) total_records = json_content["count"] total_pages = (total_records + MAX_RECORDS_PER_PAGE - 1) // MAX_RECORDS_PER_PAGE start_at = 0 for page in range(1, total_pages + 1): print(f"loading page: {page}") response = api_put_request( f"{commission_list_url}?startAt={start_at}&length={MAX_RECORDS_PER_PAGE}", headers, json_body, ) json_content = response logging.debug(f"page: {page}, records: {json_content['result']}") if status is None: records.extend([record for record in json_content["result"] if record["status"] not in ["Completed", "Killed"]]) else: records.extend(json_content["result"]) start_at += MAX_RECORDS_PER_PAGE except requests.exceptions.RequestException as e: print(e) raise Exception("An error occurred. Exiting script...") # write records to file unix_timestamp = str(time.time()).split(".")[0] with open(f"commissions_before{timestamp}-{unix_timestamp}.json", "a") as f: json.dump(records, f) return records def get_projects(records, headers, timestamp, project_list_url, unix_timestamp) -> list: projects = [] number_of_records = len(records) with open(f"projects_{timestamp}-{unix_timestamp}.json", "w") as f: f.write("[") # start the array for i, record in enumerate(records): commission_id = record['id'] print(f"{number_of_records - i} commissions to go...") print(f"Getting projects for commission ID: {commission_id}") try: json_content = api_put_request( project_list_url, headers, json.dumps({"match": "W_EXACT", "commissionId": commission_id}), ) for project in json_content["result"]: if project['status'] == "Completed" or project['status'] == "Killed": print(f"Skipping project {project['id']} with status: {project['status']}") continue try: if project['deletable'] == True and project['deep_archive'] == True: print(f"Skipping project {project['id']} with deletable: {project['deletable']} and deep_archive: {project['deep_archive']}") with open(f"check_{timestamp}-{unix_timestamp}.json", "a") as f: f.write(json.dumps(project)) f.write(",") continue except KeyError: with open(f"check_{timestamp}-{unix_timestamp}.json", "a") as f: f.write(json.dumps(project)) f.write(",") continue created_timestamp = parse_timestamp(project['created']) parsed_timestamp = parse_timestamp(timestamp) if created_timestamp > parsed_timestamp: print(f"Skipping project {project['id']} with created date: {project['created']}") continue print(f"Adding project with id: {project['id']} to list of projects to update") projects += [project] with open(f"projects_{timestamp}-{unix_timestamp}.json", "a") as f: f.write(json.dumps(project)) f.write(",") # add a comma between records except requests.exceptions.RequestException as e: raise Exception(f"An error occurred. {e} Exiting script...") # remove the trailing comma and end the array with open(f"projects_{timestamp}-{unix_timestamp}.json", "rb+") as f: f.seek(-1, os.SEEK_END) f.truncate() f.write(b"]") return projects def parse_timestamp(timestamp_str): try: dt = datetime.strptime(timestamp_str, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc) except ValueError: dt = datetime.strptime(timestamp_str, '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc) return dt def update_project_status(headers, timestamp, update_url) -> None: #open projects file user_input = input(f"Open projects_{timestamp}.json? (y/n): ") if user_input.lower() == "y": with open(f"projects_{timestamp}.json", "r") as f: projects = json.load(f) elif user_input.lower() == "n": path = input("Enter path to projects file: ") with open(path, "r") as f: projects = json.load(f) if projects: display_projects(projects) else: print("No records to update") return print("Change status to: ") status = STATUS_STRINGS[get_input()] confirm = input(f"Do you want to update the status of these projects to \033[32m{status}\033[0m? (y/n): ") print(confirm.lower()) if confirm.lower() != "y": print("Exiting script") return for project in projects: print(project) request_body = { "status": status } json_body = json.dumps(request_body) try: json_content = api_put_request( f"{update_url}/{project['id']}/status", headers, json_body, ) print(f"Updated record: {project['id']} to {status} {json_content['status']}") logging.info(f"record: {project['id']}, status: {json_content['status']}, project status updated to: {status}") except requests.exceptions.RequestException as e: raise Exception(f"An error occurred. {e} Exiting script...") def display_projects(projects) -> None: print("\n") project_count = 0 for project in projects: project_count += 1 print(f"projectId: {project['id']:<5} commissionId: {project['commissionId']:<7} title: {project['title']:<65} user: {project['user']:<20} status: {project['status']:<10}") print(f"\nTotal projects: {project_count}\n") def get_input() -> int: status_int = input("\n1: New\n2: Held\n3: Completed\n4: Killed\n5: In Production\n6: All\n7: Exit script\n") if status_int not in ALLOWED_INPUT: print("Invalid input. Exiting script") sys.exit() elif status_int == "7": print("Exiting script") sys.exit() return int(status_int) - 1 def main() -> None: args = setup_argparser().parse_args() baseurl = args.baseurl or "https://local.prexit" update_url, commission_list_url, project_list_url = create_urls(baseurl) print(f"update_url: {update_url}") print(f"commission_list_url: {commission_list_url}") print(f"project_list_url: {project_list_url}") token = get_token() headers = get_headers(token) setup_logging() # Set the timestamp to filter records by timestamp = args.timestamp or "2022-01-01" timestamp = f"{timestamp}T00:00:00.0Z" unix_timestamp = str(time.time()).split(".")[0] choice = input("(G)et or (U)pdate projects?\n") if choice.lower() == "g": print(f"Get projects with a completion date before {timestamp} that are:") status = get_input() filtered_records = get_filtered_commission_records(timestamp=timestamp, title=args.title, headers=headers, status=STATUS_STRINGS[status], commission_list_url=commission_list_url) projects = get_projects(filtered_records, headers, timestamp, project_list_url, unix_timestamp) display_projects(projects) elif choice.lower() == "u": update_project_status(headers, timestamp, update_url) if __name__ == "__main__": logging.info(f"Starting script at {datetime.now()}") main()