in bundle_adb_360/src/c_Incremental/11_IncDataGeneration.py [0:0]
def createUpsertTable(bpath, tablename) -> DataFrame:
    """Build an upsert (updates + inserts) DataFrame for an incremental load.

    Reads the historical parquet snapshot of *tablename*, generates a
    quarter-sized set of brand-new rows whose ids are shifted past the
    existing id range (so they are genuine inserts), and marks a random
    quarter-sized sample of existing rows as updated by overwriting their
    Email with a run timestamp.

    Parameters
    ----------
    bpath : str
        Base path of the data lake; historical data is read from
        ``bpath + 'historical/'``.
    tablename : str
        'customers' or anything else (treated as 'restaurants'), selecting
        the row generator and id column to use.

    Returns
    -------
    DataFrame
        Union of the timestamp-marked update rows and the newly generated
        insert rows.
    """
    # Historical row counts drive both the insert-set size and the id offset.
    df = spark.read.format('parquet').load(bpath + 'historical/' + tablename + '.parquet/')
    NoOfRows = df.count()
    addressesNoOfRows = spark.read.format('parquet').load(bpath + 'historical/addresses.parquet').count()

    # The insert set is a quarter of the historical table.
    noofinsertRows = int(NoOfRows / 4)

    # Resolve the per-table generator and id column ONCE. The original code
    # branched on tablename twice and defined the insert frame in the first
    # branch while consuming it in the second — fragile to future edits.
    if tablename == 'customers':
        idColumn = 'customerId'
        insertDf = createCustomerTable(noofinsertRows, minaddress=1, maxaddress=addressesNoOfRows)
    else:  # it's restaurants
        idColumn = 'restaurantId'
        insertDf = createRestaurants(noofinsertRows, minaddress=1, maxaddress=addressesNoOfRows)

    # Shift the generated ids past the end of the existing id range so the
    # records are really new (inserts), not collisions with historical rows.
    newInsertDf = insertDf.withColumn(idColumn, col(idColumn) + NoOfRows)

    # Pick a random sample of existing ids to update. Duplicates produced by
    # the random generator are dropped, so the update set can be slightly
    # smaller than a quarter of the table.
    generic = Generic(locale=Locale.EN)
    listIds = generic.numeric.integers(1, NoOfRows, noofinsertRows)
    listIds = list(set(listIds))
    # Render the ids as a SQL IN-clause for the DataFrame filter string.
    sListids = '(' + ','.join(str(x) for x in listIds) + ')'

    # Mark the sampled rows as updated by stamping the Email with this run's
    # timestamp, then union with the new inserts to form the upsert set.
    # idColumn.lower() yields 'customerid' / 'restaurantid', matching the
    # lowercase column names used in the original filter expressions.
    dateString = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
    upsertDf = (
        df.filter(f'{idColumn.lower()} in {sListids}')
          .withColumn('Email', lit(f'updated_{dateString}@update.com'))
          .union(newInsertDf)
    )
    return upsertDf