# utils/fake_data_generation/generate_data_california.py
"""
Copyright 2022 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import argparse
import csv
import datetime
import json
import random
import re
import string
from datetime import date, timedelta
from random import randint

from dateutil.relativedelta import relativedelta
from faker import Faker
from faker.providers.phone_number import Provider

# Module-level Faker instance, initialised in __main__ with the configured locales.
faker = None
class CustomPhoneNumberProvider(Provider):
def custom_phone_number(self, country_code="+1", num_digits=9):
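        """Return a fake phone number: the country code followed by the first num_digits of a random MSISDN."""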
return f'{country_code}{self.msisdn()[:num_digits]}'
def generate_data_row(fields, **options):
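    """Build one row of fake data from the list of field configs.

    Each field dict may name a Faker method ("faker_function", with optional
    "faker_params"), derive its value from a previously generated field via a
    regex ("derive_from" / "derive_regex" / "derive_match_group"), or fall back
    to a literal "value". Illustrative field entry:
        {"name": "first_name", "faker_function": "first_name"}
    Returns a dict mapping field names to whitespace-normalised string values.
    """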
row_data = {}
for field in fields:
field_name = field["name"]
faker_params = field.get("faker_params", {})
faker_function = field.get("faker_function", None)
derive_from = field.get("derive_from", None)
debug = options.get("debug", False)
param_list = []
for key, value in faker_params.items():
param_list.append(f"{key}={value}")
params_str = ",".join(param_list)
# Generate fake data if faker_function is defined. Otherwise, use the
# default value.
        if faker_function == "phone_number":
            field_value = faker.custom_phone_number(
                country_code=str(field.get("country_code", "+1")),
                num_digits=field.get("num_digits", 9))
        elif faker_function == "claimant_id":
            field_value = randint(100, 1000)
elif faker_function == "address":
field_value = faker.address()
if field.get("exclude_po_box", True):
                while re.match(r".*(\s|\n)+(FPO|DPO|APO) (AA|AP|AE).*", field_value):
field_value = faker.address()
elif faker_function:
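            # Dynamically call the named Faker method, e.g. faker.name() or faker.company().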
field_value = eval(f"faker.{faker_function}({params_str})")
elif derive_from:
derive_regex = field.get("derive_regex", "")
matches = re.match(derive_regex, row_data[derive_from], re.IGNORECASE)
field_value = matches[field.get("derive_match_group", 1)]
else:
field_value = field["value"]
# Convert string format.
if isinstance(field_value, datetime.date) and "date_format" in field:
field_value = field_value.strftime(field["date_format"])
if field.get("single_quotes", False):
field_value = f"'{field_value}'"
if field.get("double_quotes", False):
field_value = f'"{field_value}"'
row_data[field_name] = field_value
if debug:
print("Adding row:")
print(row_data)
# Convert all values to string and remove line breaks.
    for field_key in row_data.keys():
        field_value = str(row_data[field_key])
        field_value = " ".join(field_value.split())
        row_data[field_key] = field_value
return row_data
def generate_data(config, num_rows, **options):
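    """Generate num_rows rows from the config's "fields" list, then append California-specific columns.

    The extra columns cover location, demographics, payroll, driver license,
    and date fields for the fake 'Unemployment Form' records.
    """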
fields = config.get("fields", [])
data_list = []
for i in range(num_rows):
data_list.append(generate_data_row(fields, **options))
    # Lookup tables for California-specific values: city/ZIP/county and ethnicity/race/language combinations.
    cities = {
        1: ['Acampo', 95220, 'San Joaquin'],
        2: ['Bard', 92222, 'Riverside'],
        3: ['Calexico', 92231, 'Imperial'],
        4: ['California City', 93505, 'Kern'],
        5: ['Dana Point', 92629, 'Orange'],
        6: ['Esparto', 95627, 'Yolo'],
        7: ['Finley', 95435, 'Lake'],
        8: ['Galt', 95632, 'Sacramento'],
        9: ['Heber', 92249, 'Imperial'],
        10: ['Inverness', 94937, 'Marin'],
    }
    races = {
        1: ['hispanic or latino', 'white', 'spanish'],
        2: ['hispanic or latino', 'white', 'mexican'],
        3: ['none', 'white', 'english'],
        4: ['none', 'african-american', 'english'],
        5: ['none', 'asian', 'english'],
        6: ['none', 'asian', 'hindi'],
        7: ['none', 'asian', 'mandarin'],
    }
for i in data_list:
i['claimant_id']= randint(1, 100)
vl=randint(1,10)
vl1=randint(1,7)
i['city']= cities[vl][0]
i['zip_code']=cities[vl][1]
i['county']=cities[vl][2]
i['ethnicity']=races[vl1][0]
i['race']=races[vl1][1]
i['language']=races[vl1][2]
i['state']= "California"
i['document_type']='Unemployment Form'
i['alien_registration_number']=randint(1000000,10000000)
        i['pan_number'] = (''.join(random.choice(string.ascii_letters) for _ in range(3))
                           + str(randint(1000000000, 10000000000)))
i['MI']=i['middle_name'][0]
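        # Payroll figures: monthly rate, hours worked, weekly breakdown, current and year-to-date totals.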
# rate per month
rate=[30,32,34,35]
# hours worked in 4 weeks
hour= [160,180]
i['rate']=random.choice(rate)
i['hour']=random.choice(hour)
i['hour_week']=i['hour']/4
i['rate_week']=i['rate']/4
i['cost_week']=i['rate_week']*i['hour_week']
i['current_total']= i['rate'] * i['hour']
i['current_deduct']=round(random.uniform(200,300),2)
# cost post deduction
i['net_total']=i['current_total']- i['current_deduct']
i['ytd_gross']=round(random.uniform(19200,25200),2)
i['ytd_deduct']=round(random.uniform(1000,1200),2)
i['ytd_net']=i['ytd_gross']-i['ytd_deduct']
i['paid']='monthly'
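        # Day/month offsets used to derive the effective, employment, statement, and payment dates below.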
        day = [5, 7, 10]
        month = [1, 2, 3]
        month1 = [15, 12, 10]
        i['today_date'] = date.today()
        i['eff_date'] = i['today_date'] + relativedelta(days=random.choice(day))
        # employment end date
        i['work_end_date'] = i['today_date'] - relativedelta(months=random.choice(month))
        # employment start date
        i['work_start_date'] = i['work_end_date'] - relativedelta(months=random.choice(month1))
        difference = i['work_end_date'] - i['work_start_date']
        i['difference_in_years'] = round((difference.days + difference.seconds / 86400) / 365.2425, 2)
        i['num_months'] = ((i['work_end_date'].year - i['work_start_date'].year) * 12
                           + (i['work_end_date'].month - i['work_start_date'].month))
        i['last_day_of_prev_month'] = i['work_end_date'].replace(day=1) - relativedelta(days=1)
        i['start_day_of_prev_month'] = i['work_end_date'].replace(day=1) - timedelta(days=i['last_day_of_prev_month'].day)
        i['payment_date'] = i['last_day_of_prev_month'] + relativedelta(days=1)
        last_day_of_prev_month = date.today().replace(day=1) - timedelta(days=1)
        i['statement_date'] = date.today().replace(day=1) - timedelta(days=last_day_of_prev_month.day)
        i['due_date'] = i['statement_date'] + relativedelta(days=20)
ch=random.choice(string.ascii_letters)
dl=str(randint(1000000,100000000))
i['driver_license']=ch.upper() + '-' + dl
gender=['F','M']
i['gender']=random.choice(gender)
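        # Driver license details: issue date randomised between 2015-01-01 and 2021-01-01, expiry five years later.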
start_date = datetime.date(2015, 1, 1)
end_date = datetime.date(2021, 1, 1)
time_between_dates = end_date - start_date
days_between_dates = time_between_dates.days
random_number_of_days = random.randrange(days_between_dates)
i['license_iss_date']= start_date + datetime.timedelta(days=random_number_of_days)
i['license_end_date']=i['license_iss_date'].replace(i['license_iss_date'].year + 5)
dcl=['A','B','M','C','D']
i['class']=random.choice(dcl)
i['rest']='NONE'
i['wt']=str(randint(99,220))+'lb'
ht=round(random.uniform(4.5,6.5),2)
i['ht']=str(ht)+'ft'
        i['dd'] = ''.join(random.choice('0123456789ABCDEF') for _ in range(16))
i['end']='NONE'
colour=['BRW','BLK','BLUE','GRN']
c1=['BRW','BLK']
i['eyes']=random.choice(colour)
i['hair']=random.choice(c1)
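        # Account, employee, and check identifiers.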
number= randint(1000000000,10000000000)
i['account_no']=str(number)+ '-' + str(randint(1,10))
i['employee_id']=randint(10000,100000)
i['check_no']=randint(10000,100000)
    if options.get("debug", False):
        print(data_list)
return data_list
def write_to_csv(file_name, data_list):
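    """Write a list of row dicts to a CSV file, using the first row's keys as the header row."""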
    with open(file_name, "w", encoding="utf-8", newline="") as csv_file:
csv_writer = csv.writer(csv_file)
count = 0
for row in data_list:
if count == 0:
header = row.keys()
csv_writer.writerow(header)
count += 1
csv_writer.writerow(row.values())
return file_name
def load_json_file(file_path):
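    """Load a JSON config file and return it as a dict."""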
with open(file_path, "r", encoding="utf-8") as f:
json_dict = json.load(f)
return json_dict
def get_parser():
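    """Build the argument parser: config path, output CSV path, number of rows, and debug flag."""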
# Read command line arguments
parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter,
description="""
Script used to generate fake data and save to a CSV file.
""",
epilog="""
Examples:
    # Create a CSV file of 100 rows.
    python generate_data_california.py --config=path-to-config.json --output=sample.csv --num-rows=100
""")
parser.add_argument(
"--config",
dest="config",
help="Path to config JSON file")
parser.add_argument(
"--output",
dest="output",
help="Path to the destination output CSV file")
parser.add_argument(
"--num-rows",
dest="num_rows",
type=int,
default=100,
help="Number of rows to be generated")
    parser.add_argument(
        "--debug",
        dest="debug",
        action="store_true",
        help="Print generated rows for debugging")
return parser
if __name__ == "__main__":
parser = get_parser()
args = parser.parse_args()
if not args.config or not args.output:
parser.print_help()
exit()
options = {
"debug": args.debug,
}
print(f"Generating test data for {args.num_rows} rows...")
config = load_json_file(args.config)
faker = Faker(config.get("languages", ["en_US"]))
faker.add_provider(CustomPhoneNumberProvider)
data = generate_data(config, args.num_rows, **options)
write_to_csv(args.output, data)
print(f"Wrote to file {args.output}")