def createUpsertTable()

in bundle_adb_360/src/c_Incremental/11_IncDataGeneration.py [0:0]


def createUpsertTable(bpath, tablename) -> DataFrame:
    """Build an upsert (update + insert) DataFrame for an incremental load.

    Reads ``bpath + 'historical/' + tablename + '.parquet/'`` and returns the
    union of two sets:

    * an *update set*: historical rows whose id was randomly sampled (a quarter
      of the row count is drawn, then de-duplicated). In these rows ``Email`` is
      overwritten with ``updated_<YYYYmmddHHMMSS>@update.com`` for this run.
    * an *insert set*: a quarter as many freshly generated rows, whose ids are
      shifted by the historical row count so they land after the existing ids.

    Args:
        bpath: Base path prefix. It is concatenated directly with
            ``'historical/...'``, so it should end with a path separator.
        tablename: ``'customers'``. Any other value is treated as restaurants.

    Returns:
        DataFrame containing the updated rows followed by the new rows.
        ``union`` matches columns by position, so the generated rows must share
        the historical table's column order.
    """
    is_customers = tablename == 'customers'
    id_column = 'customerId' if is_customers else 'restaurantId'

    # Row counts of the historical table and of the address table.
    df = spark.read.format('parquet').load(bpath + 'historical/' + tablename + '.parquet/')
    NoOfRows = df.count()
    addressesNoOfRows = spark.read.format('parquet').load(bpath + 'historical/addresses.parquet').count()
    # The insert set (and the number of sampled update ids) is a quarter of the historical rows.
    noofinsertRows = NoOfRows // 4

    # Insert set: shift the generated ids by the historical row count so the rows are real inserts.
    # NOTE(review): assumes existing ids are 1..NoOfRows - confirm for the historical data.
    if is_customers:
        insertDf = createCustomerTable(noofinsertRows, minaddress=1, maxaddress=addressesNoOfRows)
    else:  # it's restaurants
        insertDf = createRestaurants(noofinsertRows, minaddress=1, maxaddress=addressesNoOfRows)
    newInsertDf = insertDf.withColumn(id_column, col(id_column) + NoOfRows)

    # Update set: a random, de-duplicated sample of existing ids.
    # Sampling is skipped when there is nothing to pick; mimesis may reject n <= 0 (verify for
    # the installed version). NOTE(review): in some mimesis versions `end` is exclusive, so id
    # NoOfRows itself may never be sampled.
    listIds = []
    if noofinsertRows > 0:
        generic = Generic(locale=Locale.EN)
        listIds = list(set(generic.numeric.integers(1, NoOfRows, noofinsertRows)))

    # Mark the sampled records as having an updated email address stamped with this run's time.
    # isin() accepts an empty list (matching nothing), unlike a string-built "in ()" SQL predicate.
    dateString = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
    updateDf = (
        df.filter(col(id_column).isin(listIds))
        .withColumn('Email', lit(f'updated_{dateString}@update.com'))
    )
    return updateDf.union(newInsertDf)