tools/check-tf-plan.py (122 lines of code) (raw):
#!/usr/bin/env python3
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import json
import glob
import shutil
import requests
from python_terraform import Terraform
from urllib.parse import unquote
def main(PR):
TOKEN = os.getenv('GITHUB_TOKEN')
GITHUB_WORKSPACE = os.getenv('GITHUB_WORKSPACE')
GITHUB_REPOSITORY = os.getenv('GITHUB_REPOSITORY')
# Get Added / Modified files in PR
modified_files, modified_files_raw, removed_files = pr_files(GITHUB_REPOSITORY, PR)
# Get Working directories to run TF Plan on
working_directories = get_updated_modules(modified_files, removed_files)
# Loop through all the identified working directories
# Deleting added/modified & removed files
try:
for dir in working_directories:
print("----------> RUN FOR: " + dir)
try:
# IF MODULE EXISTS: Copying main directory in temp folder
shutil.copytree(GITHUB_WORKSPACE+'/'+dir, os.getcwd()+'/temp/'+dir)
# Deleting added/modified & removed files
for mfile in modified_files:
if os.path.exists(os.getcwd()+'/temp/'+mfile):
print("Deleting file: " + mfile)
os.remove(os.getcwd()+'/temp/'+mfile)
for rfile in removed_files:
if os.path.exists(os.getcwd()+'/temp/'+rfile):
print("Deleting file: " + rfile)
os.remove(os.getcwd()+'/temp/'+rfile)
except:
# IF MODULE DONOT EXISTS: Creating temp module folder
os.makedirs(os.getcwd()+'/temp/'+dir)
except requests.exceptions.RequestException as e:
print('No working directory with TF configs in PR.')
raise SystemExit(e)
# Loop through all the identified working directories
# Download added/modified files
try:
for dir in working_directories:
print("Module: " + dir)
# Download added/modified files
for file in modified_files:
if dir in file:
print("File: " + file)
for raw in modified_files_raw:
# print("Raw: " + raw)
# print("Raw Decoded: " + unquote(raw))
if file in unquote(raw):
print("Downloading file: " + unquote(raw))
downloadprfiles(unquote(raw), file, os.getcwd()+'/temp/'+os.path.dirname(file))
break
except requests.exceptions.RequestException as e:
print('No working directory with TF configs in PR.')
raise SystemExit(e)
# Loop through all the identified working directories
# Run Terraform Plan
try:
for dir in working_directories:
# print('****************************')
# print(glob.glob(os.getcwd() + '/temp/' + dir+'/*'))
# print('****************************')
# print(glob.glob(os.getcwd() + '/temp/' + dir+'/*/*'))
# Running Terraform Init & Terraform Plan
comment, status = tf(os.getcwd() + '/temp/' + dir)
comment = comment + ' for: **' + dir + '** !'
# Commenting on the PR
commentpr(GITHUB_REPOSITORY, PR, comment, TOKEN)
if(status == 'fail'):
sys.exit('Terraform Init or Terraform Plan FAILED for: '+ dir)
except requests.exceptions.RequestException as e:
print('No working directory with TF configs in PR.')
raise SystemExit(e)
def pr_files(GITHUB_REPOSITORY,pr):
removed_files = []
modified_files = []
modified_files_raw = []
try:
response = requests.get('https://api.github.com/repos/'+ GITHUB_REPOSITORY +'/pulls/'+ str(pr) +'/files')
for file in response.json():
if(file['status'] == 'removed'):
print("Removed File: " + file['filename'])
removed_files.append(file['filename'])
else:
print("Added/Modified File: " + file['filename'])
modified_files.append(file['filename'])
modified_files_raw.append(file['raw_url'])
return modified_files, modified_files_raw, removed_files
except requests.exceptions.RequestException as e:
raise SystemExit(e)
def downloadprfiles(raw, file, path):
# print(path)
if not os.path.exists(path):
os.makedirs(path)
# print('Beginning file download with requests')
r = requests.get(raw)
with open(path + '/' + os.path.basename(file), 'wb') as f:
f.write(r.content)
# Retrieve HTTP meta-data
# print(r.status_code)
# print(r.headers['content-type'])
# print(r.encoding)
def get_updated_modules(modified_files, removed_files):
modified_files_dir = []
removed_files_dir = []
for file in modified_files:
modified_files_dir.append(os.path.dirname(file))
for file in removed_files:
removed_files_dir.append(os.path.dirname(file))
working_directories = modified_files_dir + removed_files_dir
working_directories = list(set(working_directories))
# print("Working Directories:")
# print(working_directories)
modules = [x for x in working_directories if x.startswith('modules/')]
modules = [x for x in modules if x.count('/') == 1]
print("Modules Updated:")
print(modules)
return modules
def tf(dir):
tr = Terraform(working_dir=dir)
return_code_init, stdout_init, stderr_init = tr.init_cmd(capture_output=False)
if "secure_data_warehouse" in dir:
# Special case in SDW module as it require additional mandatory variables compared to any other modules.
return_code_plan, stdout_plan, stderr_plan = tr.plan_cmd(capture_output=False,var={'billing_account_id':'ABCD-EFGH-IJKL-MNOP', 'organization_id':'1234567890', 'random_id': '1234', 'data_analyst_group': 'data_analyst_group@example.com', 'data_engineer_group': 'data_engineer_group@example.com', 'security_administrator_group': 'security_administrator_group@example.com', 'network_administrator_group': 'network_administrator_group@example.com', 'security_analyst_group': 'security_analyst_group@example.com', 'perimeter_additional_members': ['demouser@example.com','demosa@service.gserviceaccount.com'], 'secure_datawarehouse_service_acccount': 'radlab-module-creator-sa@project-id.iam.gserviceaccount.com'})
else:
return_code_plan, stdout_plan, stderr_plan = tr.plan_cmd(capture_output=False,var={'billing_account_id':'ABCD-EFGH-IJKL-MNOP', 'organization_id':'1234567890', 'random_id': '1234'})
path = os.getcwd()+'/temp/'
if(return_code_init == 1):
comment = 'Terraform Init FAILED'
status = 'fail'
if(return_code_plan == 1):
comment = 'Terraform Plan FAILED'
status = 'fail'
else:
comment = 'Terraform Init & Terraform Plan SUCCESSFUL'
status = 'pass'
return comment, status
def commentpr(GITHUB_REPOSITORY, pr, comment, TOKEN):
headers = {'Authorization': f'token {TOKEN}', 'Accept': 'application/vnd.github.v3+json'}
# print(comment)
data = {"body":comment}
try:
response = requests.post('https://api.github.com/repos/'+ GITHUB_REPOSITORY +'/issues/'+ str(pr) +'/comments', data=json.dumps(data), headers=headers)
# print(response.text)
except requests.exceptions.RequestException as e:
raise SystemExit(e)
if __name__ == '__main__':
if len(sys.argv) != 2:
raise SystemExit('No PR passed.')
main(sys.argv[1])