notifier.py (88 lines of code) (raw):
import datetime
import json
import mimetypes
import os
from typing import List
from google.cloud import storage
from google.oauth2 import service_account
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from blocks import return_slack_blocks
def send_slack_message(report_file: str, *, channel: str = "C07AHPJ525V") -> bool:
    """
    Post the artifact notification to a Slack channel.

    The bot token is read from the ``SLACK_KEY`` environment variable and the
    message layout is produced by ``return_slack_blocks(report_file)``.

    :param report_file: value passed to ``return_slack_blocks`` to build the
        message blocks
    :param channel: ID of the Slack channel to post to; the default is the
        channel this script has always posted to
    :return: bool, True when the post went through, False when Slack answered
        with an API error (the error is printed, not raised)
    """
    print("Trying to send Slack Message...")
    token = os.getenv("SLACK_KEY")
    # Initialize a Web API client
    client = WebClient(token=token)
    try:
        client.chat_postMessage(
            channel=channel,
            # `text` is required by the API but can be anything when blocks
            # are supplied.
            text="Important update from ACTIONS...",
            blocks=return_slack_blocks(report_file),
        )
    except SlackApiError as e:
        # Best effort: report the failure and let the caller decide what to
        # do with the returned status instead of crashing the script.
        print(f"Error sending message: {e.response['error']}")
        return False
    return True
def get_content_type(filename):
    """
    Guess the MIME type of ``filename`` from its extension.

    Falls back to "text/plain" when ``mimetypes`` cannot make a guess.
    """
    guessed_type = mimetypes.guess_type(filename)[0]
    if not guessed_type:
        return "text/plain"
    return guessed_type
def get_current_timestamp():
    """
    Return the current date and time as a string.

    The value follows the strftime pattern "%Y-%m-%d-%H%M_%S", for example
    "2024-05-17-0930_42".
    :return: str, formatted timestamp
    """
    print("Getting the current date...")
    # A datetime format spec is handed to strftime, so this is the same
    # pattern applied through an f-string.
    return f"{datetime.datetime.now():%Y-%m-%d-%H%M_%S}"
def list_and_write(source_directory: str, links: List[str]):
    """
    Upload every file under ``source_directory`` to the artifact bucket.

    Each file is stored under ``<time_now>/<path of the file>`` and, once its
    upload has completed, its public URL is appended to ``links``. The function
    relies on the module-level ``time_now``, ``bucket`` and ``root_url``.

    :param source_directory: directory that is walked recursively
    :param links: list that receives one URL per uploaded file (mutated in place)
    """
    print(f"Trying to traverse {source_directory} and write files...")
    for root, _dirs, files in os.walk(source_directory):
        for file in files:
            fullpath = os.path.join(root, file)
            # Object names and URLs always use "/", whatever the local
            # path separator is.
            target_path = os.path.join(f"{time_now}", fullpath).replace(os.sep, "/")
            content_type = get_content_type(file)
            blob = bucket.blob(target_path)
            blob.content_type = content_type
            # Copy raw bytes for every file type: text mode would fail on (or
            # corrupt) binary artifacts other than PNG, and bytes are also an
            # exact copy for text files.
            with (
                open(fullpath, "rb") as infile,
                blob.open("wb", content_type=content_type) as outfile,
            ):
                outfile.write(infile.read())
            # Record the link only after the upload finished, so a failed
            # upload never leaves a dangling URL in the report.
            links.append(f"{root_url}{target_path}")
def compile_link_file(links_win: List[str], links_mac: List[str]):
    """
    Write the sorted artifact links to ``<time_now>/full_report.txt`` in the bucket.

    Both lists are sorted in place before being written. The function relies
    on the module-level ``time_now``, ``storage_client``, ``bucket_name`` and
    ``root_url``.

    :param links_win: URLs of the uploaded Windows artifacts
    :param links_mac: URLs of the uploaded Mac artifacts
    :return: str, URL of the uploaded report file
    """
    print("Compiling the current artifact link file...")
    links_mac.sort()
    links_win.sort()
    # Build the object name once with "/" so the stored object and the
    # returned URL always agree, regardless of the local path separator.
    output_file_path = f"{time_now}/full_report.txt"
    content_type = get_content_type(output_file_path)
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(output_file_path)
    report_parts = ["Windows Artifacts:\n\n"]
    report_parts.extend(url + "\n" for url in links_win)
    report_parts.append("\nMac Artifacts:\n\n")
    report_parts.extend(url + "\n" for url in links_mac)
    with blob.open("w", content_type=content_type) as f:
        f.write("".join(report_parts))
    return f"{root_url}{output_file_path}"
# Script body: upload the artifacts of both platforms, publish a report file
# listing their links, then notify Slack. Every stage is best effort: a
# failure is printed and the script carries on to the next stage.
time_now = get_current_timestamp()
links_win = []
links_mac = []
root_url = "https://storage.googleapis.com/notifier-artifact-bucket/"
report_file = ""

try:
    credential_string = os.getenv("GCP_CREDENTIAL")
    if not credential_string:
        # Fail with a readable message instead of the TypeError that
        # json.loads(None) would raise.
        raise RuntimeError("the GCP_CREDENTIAL environment variable is not set")
    credentials_dict = json.loads(credential_string)
    credentials = service_account.Credentials.from_service_account_info(
        credentials_dict
    )
    storage_client = storage.Client(credentials=credentials)
    bucket_name = "notifier-artifact-bucket"
    bucket = storage_client.bucket(bucket_name)
except Exception as e:
    print("The artifact upload process ran into some issues: ", e)
else:
    # Upload each platform on its own so that a failure in one directory
    # does not prevent the other one from being uploaded.
    for _source_directory, _links in (
        ("artifacts-mac", links_mac),
        ("artifacts-win", links_win),
    ):
        try:
            list_and_write(_source_directory, _links)
        except Exception as e:
            print("The artifact upload process ran into some issues: ", e)

try:
    report_file = compile_link_file(links_win, links_mac)
except Exception as e:
    print(f"The link compilation had some issues: {e}")

# NOTE: send_slack_message handles Slack API errors itself, so the line below
# is printed even when Slack rejected the message.
send_slack_message(report_file)
print("Message sent! Script finished successfully.")