utils/rules_update.py (269 lines of code) (raw):
"""
Copyright 2022 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
'''
This script is used for Updating the Validation Rules
'''
# Standard-library, third-party and project imports
import argparse
from sys import argv
import os
import json
from copy import deepcopy
from six import string_types
from jinjasql import JinjaSql
from google.cloud import storage
from commmon.utils.logging_handler import Logger
from common.config import PATH,BUCKET_NAME_VALIDATION,PATH_TEMPLATE,PROJECT_ID
# Local working-file name for the rules JSON: whatever follows the last '/' in PATH.
file_name = PATH.rpartition('/')[2]
def parsers():
    '''
    Define the command line arguments and flags available to the user.

    Which options are mandatory depends on the selected template and on
    whether the run only views (-v/--view) or deletes (-d/--delete) rules.
    argparse fixes `required` at the moment an argument is declared, so the
    raw command line (sys.argv) is inspected before parsing.

    Returns:
        args: argparse.Namespace holding the parsed command line arguments
    '''
    parser = argparse.ArgumentParser()
    template_choices = ['Template1', 'Template2', 'Template3', 'Template4',
                        'Template5', 'Template6']

    # Accept both spellings of each flag. Checking only '-v'/'-d' made
    # '--view'/'--delete' still demand rule-building options and let
    # '--delete' through without '--ruleid'.
    viewing = '-v' in argv or '--view' in argv
    deleting = '-d' in argv or '--delete' in argv
    # Template-specific options only matter when a new rule is being built.
    building_rule = not (viewing or deleting)
    is_template1 = template_choices[0] in argv
    is_template2 = template_choices[1] in argv
    needs_operator = (template_choices[2] in argv
                      or template_choices[3] in argv)

    parser.add_argument('--template', type=str,
                        choices=template_choices,
                        help="Select One of the available Templates",
                        required=True)
    parser.add_argument('--key', type=str,
                        required=building_rule and not is_template2,
                        help="Enter the Key Name")
    parser.add_argument('--doc_type', type=str,
                        required=True,
                        help="Enter the Document Name")
    parser.add_argument('--operator', type=str,
                        required=building_rule and needs_operator,
                        help="Enter one of the Operators")
    parser.add_argument('-d', '--delete', action='store_true',
                        help="Please Enter the doc_type and the Rule_id you wish to remove")
    parser.add_argument('-v', '--view', action='store_true',
                        help="Enter the doctype for which you wish to view the existing rules")
    parser.add_argument('--ruleid', type=str,
                        required=deleting)
    parser.add_argument('--value', type=str,
                        required=(building_rule and not is_template1
                                  and not is_template2),
                        help="Enter the Value to compare with the select operators")
    parser.add_argument('--value_before', type=int,
                        required=building_rule and is_template1,
                        help="Months to subtract from the current date(Only for Template1)")
    parser.add_argument('--value_after', type=int,
                        required=building_rule and is_template1,
                        help="Value in months to add to the current date(Only for Template1)")
    parser.add_argument("--key_list", nargs="+",
                        required=building_rule and is_template2,
                        help="Enter the keys.(Template 2 only)")
    parser.add_argument("--operator_list", nargs="+",
                        required=building_rule and is_template2,
                        help="Enter the operators(Template 2 only)")
    parser.add_argument("--value_list", nargs="+",
                        required=building_rule and is_template2,
                        help=" Enter the values(Template 2 only)")
    return parser.parse_args()
def read_json(path):
    """Download a JSON object from Google Cloud Storage and parse it.

    The path is split on "/" at most three times: the third segment is used
    as the bucket name and everything after it as the object path.

    Input:
        path: gcs path of the json to be loaded
    Output:
        dict (or other JSON value) decoded from the downloaded object
    """
    segments = path.split("/", 3)
    bucket_name = segments[2]
    file_path = segments[3]
    gcs_client = storage.Client()
    source_blob = gcs_client.get_bucket(bucket_name).blob(file_path)
    raw_payload = source_blob.download_as_string(client=None)
    return json.loads(raw_payload)
def get_params(args):
    '''
    Build the parameter dictionary that is plugged into the SQL template.

    Input:
        args: parsed command line arguments; reads delete and template, plus
              the template-specific fields (key, doc_type, operator, value,
              value_before, value_after, key_list, value_list, operator_list)
    Output:
        params: dict of template parameters. Empty when args.delete is set,
                because deleting a rule renders no query.
    '''
    if args.delete:
        # The previous code popped from an undefined name here (NameError)
        # and then fell through to `return params` with params unbound.
        return {}

    if args.template == 'Template1':
        return {
            'key': args.key,
            'BQ_Table': "`project_table`",
            'doc_type': args.doc_type,
            'value_before': args.value_before,
            'value_after': args.value_after,
        }

    if args.template == 'Template2':
        # NOTE(review): no 'doc_type' is passed for Template2 - confirm the
        # template does not reference it.
        return {
            'key': args.key_list,
            'BQ_Table': '`project_table`',
            'value': args.value_list,
            'operator': args.operator_list,
        }

    # Every remaining template takes a single key/operator/value triple.
    return {
        'key': args.key,
        'BQ_Table': "`project_table`",
        'doc_type': args.doc_type,
        'operator': args.operator,
        'value': args.value,
    }
def check_float(potential_float):
    '''
    Report whether the given value can be converted with float().

    Returns True when float() accepts the value and False when it raises
    ValueError; any other exception raised by float() propagates.
    '''
    try:
        float(potential_float)
    except ValueError:
        return False
    else:
        return True
def get_sql_from_template(query, bind_params):
    '''
    Substitute the bound parameters into the prepared query, quoting the
    values that need single quotes first.

    A value is passed to quote_sql_string when its parameter name contains
    "doc_type", or when the name does not contain "key" and the value is not
    float-convertible. Names containing "BQ_Table" or "dim" are never quoted.

    Input:
        query : SQL query prepared from the template (pyformat placeholders)
        bind_params : mapping of placeholder name to value
    Output:
        query % params : complete sql query with quotes added where required;
                         the query is returned unchanged when bind_params is empty
    '''
    if not bind_params:
        return query

    # Work on a copy so the caller's mapping is left untouched.
    params = deepcopy(bind_params)
    for name, raw_value in params.items():
        wants_quoting = "doc_type" in name or (
            "key" not in name and not check_float(raw_value))
        if not wants_quoting:
            continue
        if "BQ_Table" in name or "dim" in name:
            continue
        params[name] = quote_sql_string(raw_value)
    return query % params
def quote_sql_string(value):
    '''
    Quote a string value for inclusion in a SQL statement.

    Strings are returned enclosed in single quotes with embedded single
    quotes doubled. Comparison operators are returned as-is so they can be
    spliced into the query as operators. Non-string values are returned
    unchanged.
    '''
    # '>=', '<=' and '<>' were missing from this set, so they were emitted
    # as quoted string literals instead of operators.
    comparison_operators = ('>', '<', '>=', '<=', '==', '!=', '<>', '=')

    # The file is Python 3 only (it uses f-strings), so plain `str` covers
    # the same cases as six.string_types.
    if not isinstance(value, str):
        return value
    if value in comparison_operators:
        return value
    return "'{}'".format(value.replace("'", "''"))
def prep_sql(template, params):
    '''
    Render a JinjaSql template (string) with the given parameters (dict) and
    return the final SQL text.

    Input:
        template : template used for query generation
        params: parameter dict to plug into the jinja template
    Output:
        the generated SQL query, with bound values substituted by
        get_sql_from_template
    '''
    renderer = JinjaSql(param_style='pyformat')
    prepared = renderer.prepare_query(template, params)
    query_text, bound_values = prepared
    return get_sql_from_template(query_text, bound_values)
def get_var(var):
    '''
    Produce the rule id that follows an existing one.

    Input:
        var : existing rule id; the text between its first and second "_"
              must be an integer (e.g. "Rule_3")
    Output:
        "Rule_<n+1>" where n is that integer
    '''
    current_number = int(var.split('_')[1])
    return f"Rule_{current_number + 1}"
def _rule_number(rule_id):
    '''Return the integer after the first "_" in rule_id, or None if there is none.'''
    try:
        return int(rule_id.split('_')[1])
    except (IndexError, ValueError):
        return None


def update_json(jinn, args):
    '''
    Add the newly created SQL query (rule) to the existing rules json.

    Input:
        jinn: newly generated SQL query (rule)
        args: command line arguments received from the user (reads doc_type
              and ruleid)
    Output:
        data : updated rules dict, keyed by doc type and then by rule id
    '''
    try:
        data = read_json(PATH)
    except Exception:
        # Best-effort: any failure to load the rules file starts from an
        # empty rule set.
        # NOTE(review): a transient read error also lands here, and the
        # caller then uploads a file holding only the new rule - confirm
        # this is intended.
        data = {}

    existing_rules = data.get(args.doc_type)
    if not existing_rules:
        # Unknown doc type, or one whose rules were all deleted. The empty
        # case used to raise IndexError when taking the last key.
        data[args.doc_type] = {"Rule_1": jinn}
        return data

    if args.ruleid:
        rule_no = args.ruleid
    else:
        # Number from the highest existing suffix rather than the last key
        # inserted; insertion order can put a lower id last, and the new
        # rule would then overwrite an existing one.
        numbered = [rule_id for rule_id in existing_rules
                    if _rule_number(rule_id) is not None]
        if numbered:
            rule_no = get_var(max(numbered, key=_rule_number))
        else:
            rule_no = "Rule_1"
    existing_rules[rule_no] = jinn
    return data
def load_template(data, args):
    '''
    Pick the jinja template named by the user out of the loaded templates.

    Input:
        data: dict of jinja templates keyed by template name
        args : command line input taken from the user (reads args.template)
    Output:
        the template stored under args.template; a missing name raises KeyError
    '''
    selected_name = args.template
    return data[selected_name]
def dump_updated_json(data):
    '''
    Write the updated rules dict to the local working file (file_name).

    Input:
        data: dict with the updated rules included
    '''
    # Close the handle deterministically so the content is flushed to disk
    # before the file is uploaded; the bare open() left it to the garbage
    # collector.
    with open(file_name, "w") as rules_file:
        json.dump(data, rules_file)
def upload_bucket():
    '''
    Upload the local rules file (file_name) to the validation bucket.

    The destination object name is PATH with its first three "/"-separated
    segments dropped.
    '''
    object_name = '/'.join(PATH.split('/')[3:])
    gcs_client = storage.Client(project=PROJECT_ID)
    destination = gcs_client.get_bucket(BUCKET_NAME_VALIDATION).blob(object_name)
    destination.upload_from_filename(file_name)
def _view_rules(args):
    '''Log every rule stored for args.doc_type, one "<rule id> : <rule>" line each.'''
    try:
        data = read_json(PATH)
        for rule_id, rule in data[args.doc_type].items():
            Logger.info(f"{rule_id} : {rule}")
    except FileNotFoundError:
        Logger.info("File Does not exist or the file path is incorrect")
    except KeyError:
        Logger.info("Document Name is incorrect")


def _delete_rule(args):
    '''Remove args.ruleid from args.doc_type and upload the updated rules file.'''
    try:
        data = read_json(PATH)
        try:
            data[args.doc_type].pop(args.ruleid)
        except KeyError:
            # Unknown doc type or rule id: report it the way the view path
            # does instead of ending in a traceback.
            Logger.info("Document Name or Rule id is incorrect")
            return
        # Close the file before uploading so its content is fully written.
        with open(file_name, "w") as rules_file:
            json.dump(data, rules_file)
        try:
            upload_bucket()
        finally:
            # Do not leave the working file behind when the upload fails.
            os.remove(file_name)
    except FileNotFoundError:
        Logger.info("File Does not exist or the file path is incorrect")


def main():
    '''
    Main function used to control the code flow.

    Parses the command line, loads the templates, then either logs the rules
    of a doc type (--view), deletes one rule (--delete), or renders a new
    rule from the chosen template and uploads the updated rules file.
    '''
    args = parsers()
    try:
        data = read_json(PATH_TEMPLATE)
    except FileNotFoundError:
        # NOTE(review): read_json goes through google.cloud.storage; confirm
        # a missing object really surfaces as FileNotFoundError.
        Logger.info("Template File Does not exist or the file path is incorrect")
        return

    if args.view:
        _view_rules(args)
        return
    if args.delete:
        _delete_rule(args)
        return

    params = get_params(args)
    template = load_template(data, args)
    try:
        jinn = prep_sql(template, params)
    except TypeError:
        Logger.info("Incorrect Input Format")
        return

    data = update_json(jinn, args)
    dump_updated_json(data)
    try:
        upload_bucket()
    finally:
        # Remove the local working file whether or not the upload succeeded.
        os.remove(file_name)


if __name__ == "__main__":
    main()