tools/bigdata-generator/lib.py
# Copyright 2023 Google LLC All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module that contains the different classes used by the Dataflow pipeline"""
import logging
import json
import apache_beam as beam
from google.cloud import storage
class PipelineHelper:
"""Class that encapsulates the logic used by the Dataflow pipeline"""
def __init__(self, config_file_path):
self.config = Config(config_file_path=config_file_path)
def get_batches(self):
"""
Get batches to be used by RowGenerator
Example:
config.total_number_of_rows = 1002
config.number_of_rows_per_batch = 1000
result: [1000,2]
"""
        batches = [self.config.number_of_rows_per_batch] * int(self.config.total_number_of_rows
            / self.config.number_of_rows_per_batch)
        remainder = self.config.total_number_of_rows % self.config.number_of_rows_per_batch
        if remainder > 0:  # skip the trailing batch when the division is exact
            batches.append(remainder)
        return batches
def get_config(self):
return self.config
class Config:
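    """Parses the config file and exposes its settings as attributes"""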
    def __init__(self, config_file_path):
self._parse(
config_data=self.get_config_data(config_file_path)
)
def _parse(self, config_data):
"""
Parses the config file data
"""
self.total_number_of_rows = config_data["total_number_of_rows"]
self.number_of_rows_per_batch = config_data["number_of_rows_per_batch"]
self.sinks = config_data["sinks"]
self.lookup_fields = config_data["lookup_fields"]
self.fields = config_data["fields"]
def get_config_data(self, config_file_path):
"""
Reads the config file data from the path provided (local file or GCS URI)
"""
config_file_json_data = None
        if config_file_path.startswith("gs://"):  # GCS
            bucket_name = config_file_path.split("/")[2]  # name of the bucket
            object_name = "/".join(config_file_path.split("/")[3:])  # name of the object
            # download the contents from GCS
            client = storage.Client()
            bucket = client.get_bucket(bucket_name)
            blob = bucket.get_blob(object_name)
            config_file_json_data = blob.download_as_text(encoding="UTF-8")
else: #local file
with open(config_file_path, encoding="UTF-8") as local_json_file:
config_file_json_data = local_json_file.read()
return json.loads(config_file_json_data)
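# Illustrative config shape, inferred from Config._parse and RowGenerator below.
# All names and values here are hypothetical, and the sink entries are left
# empty because their structure is not visible in this module:
#
# {
#     "total_number_of_rows": 1002,
#     "number_of_rows_per_batch": 1000,
#     "sinks": [],
#     "lookup_fields": [
#         {"lookup_name": "country_to_currency",
#          "source_field_name": "country",
#          "mapping": {"ES": "EUR", "US": "USD"}}
#     ],
#     "fields": [
#         {"name": "id", "generation": {"type": "UUID"}},
#         {"name": "country", "generation": {"type": "RANDOM_FROM_LIST",
#                                            "values": ["ES", "US"],
#                                            "weights": [0.6, 0.4]}},
#         {"name": "currency", "generation": {"type": "LOOKUP_VALUE",
#                                             "lookup_name": "country_to_currency"}},
#         {"name": "amount", "type": "FLOAT", "generation": {"type": "RANDOM_BETWEEN",
#                                                            "min": 0, "max": 100,
#                                                            "num_decimals": 2}}
#     ]
# }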
class ConfigFileValidator:
""" This class validates the config file provided to the process"""
def __init__(self,config: Config):
self.config = config
def validate(self):
"""
Validates the config file
"""
errors = []
warnings = []
total_number_of_rows = int(self.config.total_number_of_rows)
number_of_rows_per_batch = int(self.config.number_of_rows_per_batch)
        if number_of_rows_per_batch > total_number_of_rows:
            errors.append(
                f"number_of_rows_per_batch ('{number_of_rows_per_batch}') "
                + f"is greater than total_number_of_rows ('{total_number_of_rows}')"
            )
        if len(self.config.sinks) == 0:
            warnings.append(
                "no sinks have been defined! data will not be persisted after generation"
            )
        return errors, warnings
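# Usage sketch: errors, warnings = ConfigFileValidator(config).validate()
# A non-empty `errors` list means the config should be rejected before running.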
class RowGenerator(beam.DoFn):
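    """DoFn that generates rows of synthetic data following the config file rules"""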
def __init__(self, config):
self.config = config
def process(self, number_of_rows_per_batch):
"""
Function called by ParDo
Generates a given amount of rows by following the rules defined in the config file
"""
from datetime import datetime
logging.debug("Starting batch")
metrics = []
        for _ in range(number_of_rows_per_batch):
result = {}
# Generate the values for all fields EXCEPT the LOOKUP_VALUE ones
for field in self.config.fields:
if field["generation"]["type"] == "LOOKUP_VALUE":
continue
# logging.debug(f"processing field:{field}")
field_name = field["name"]
generation_type = field["generation"]["type"]
start_time = datetime.now()
if generation_type == "RANDOM_FROM_REGEX":
result[field_name] = self._get_random_from_regex(
field=field
)
if generation_type == "RANDOM_BETWEEN":
result[field_name] = self._get_random(
field=field
)
elif generation_type == "RANDOM_FROM_LIST":
result[field_name] = self._get_random_from_list(
field=field
)
elif generation_type == "UUID":
result[field_name] = self._get_uuid()
end_time = datetime.now()
                metrics.append({
                    "field": field_name,
                    # total elapsed microseconds; .microseconds alone drops whole seconds
                    "runtime": int((end_time - start_time).total_seconds() * 1_000_000)
                })
#Now that we have generated the fields, we can generate the LOOKUP_VALUE ones
for field in [
x for x in self.config.fields if x["generation"]["type"] == "LOOKUP_VALUE"
]:
field_name = field["name"]
start_time = datetime.now()
result[field_name] = self._get_lookup_value(
field=field,
row=result
)
end_time = datetime.now()
                metrics.append({
                    "field": field_name,
                    "runtime": int((end_time - start_time).total_seconds() * 1_000_000)
                })
yield result
def _get_random_from_regex(self, field):
"""
Returns a random string taking as an input a regex expression
"""
import exrex
result = str(exrex.getone(field["generation"]["expression"]))
return result
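    # Example (hypothetical expression): "[A-Z]{3}-[0-9]{4}" might yield "QXZ-0831".
    # exrex.getone() returns a single random string matching the given regex.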
def _get_uuid(self):
"""
Returns a UUID
"""
import uuid
uuid_obj = uuid.uuid4()
return uuid_obj.hex
def _get_random(self, field):
"""
Returns different types of random values depending on the data type
"""
if field["type"] == "STRING":
return self._get_random_string(
subtype=field["generation"]["subtype"],
length=field["generation"]["length"]
)
elif field["type"] == "INT":
return self._get_random_int(
min=field["generation"]["min"],
max=field["generation"]["max"]
)
elif field["type"] == "FLOAT":
return self._get_random_float(
min=field["generation"]["min"],
max=field["generation"]["max"],
num_decimals=field["generation"]["num_decimals"]
)
elif field["type"] == "DATETIME":
return self._get_random_datetime(
min=field["generation"]["min"],
max=field["generation"]["max"],
output_format=field["generation"]["output_format"]
)
        raise ValueError(f"Unknown field type: {field['type']}")
def _get_random_string(self, subtype, length):
"""
Returns a random string
"""
import random
import string
return "".join(random.choice(getattr(string, subtype)) for i in range(length))
def _get_random_int(self, min, max):
"""
Returns a random int
"""
import random
return random.randint(min, max)
def _get_random_float(self, min, max, num_decimals):
"""
Returns a random float
"""
import random
return round(random.uniform(min, max), num_decimals)
def _get_random_datetime(self, min, max, output_format):
"""
        Returns a formatted random datetime between two input dates
"""
from datetime import datetime, timedelta
import random
min_date = datetime.strptime(min, "%Y-%m-%dT%H:%M:%SZ")
max_date = datetime.strptime(max, "%Y-%m-%dT%H:%M:%SZ")
# logging.debug(f"generating random date between: '{min_date}' and '{max_date}'")
delta = max_date - min_date
int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
random_second = random.randrange(int_delta)
random_date = min_date + timedelta(seconds=random_second)
return random_date.strftime(output_format)
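    # `min`/`max` must match the "%Y-%m-%dT%H:%M:%SZ" format parsed above,
    # e.g. "2023-01-01T00:00:00Z"; `output_format` is any strftime() pattern.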
def _get_random_from_list(self, field):
"""
Returns a random value taking a list of values as input
The input list can be weighted
"""
import random
        # weights is optional; fall back to uniform sampling when absent
        weights = field["generation"].get("weights") or None
result = random.choices(
field["generation"]["values"],
weights=weights,
k=1
)[0]
return result
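    # Example (hypothetical field config):
    #   {"generation": {"type": "RANDOM_FROM_LIST",
    #                   "values": ["ES", "US", "FR"], "weights": [0.5, 0.3, 0.2]}}
    # random.choices() draws one value, biased by the optional weights.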
def _get_lookup_value(self, field, row):
"""
Returns a value by reading a dictionary
"""
lookup_field_value = next(
(x for x in self.config.lookup_fields
if x["lookup_name"] == field["generation"]["lookup_name"]),
None
)
        if lookup_field_value is None:
            raise ValueError(
                "Can't find a lookup_fields entry named "
                f"'{field['generation']['lookup_name']}' for field '{field['name']}'"
            )
result = lookup_field_value["mapping"][row[lookup_field_value["source_field_name"]]]
return result
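
# Minimal local smoke-test sketch: exercises the helpers outside of a Beam
# pipeline. Assumes a local "config.json" shaped like the illustrative config
# comment above; the file name is hypothetical.
if __name__ == "__main__":
    helper = PipelineHelper(config_file_path="config.json")
    errors, warnings = ConfigFileValidator(helper.get_config()).validate()
    for warning in warnings:
        logging.warning(warning)
    if errors:
        raise ValueError(f"invalid config: {errors}")
    generator = RowGenerator(config=helper.get_config())
    # process() is a generator: pull a single row from the first batch
    first_row = next(generator.process(helper.get_batches()[0]))
    print(first_row)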