bundle_adb_360/src/c_Incremental/12_applyIncrementalToSilver.py
# Databricks notebook source
# MAGIC %md
# MAGIC ## Load Incremental to Silver
# MAGIC ---
# MAGIC This notebook merges the incremental data for customers and restaurants into the silver tables
# MAGIC
# MAGIC Parameters:
# MAGIC * catalog (default catadb360dev)
# MAGIC * schema (default schemaadb360dev)
# MAGIC * volume (default bronze)
# MAGIC * destdb (default silverdb)
# MAGIC * incfilename (default restaurants_yyyymmdd.parquet)
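# MAGIC
# MAGIC Example invocation from a caller notebook (a sketch: the relative notebook path, the timeout, and the concrete date in the file name are illustrative assumptions, not part of this pipeline):
# MAGIC ```python
# MAGIC dbutils.notebook.run('12_applyIncrementalToSilver', 600, {
# MAGIC     'catalog': 'catadb360dev',
# MAGIC     'schema': 'schemaadb360dev',
# MAGIC     'volume': 'bronze',
# MAGIC     'destdb': 'silverdb',
# MAGIC     'incfilename': 'customers_20240101.parquet',
# MAGIC })
# MAGIC ```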
# COMMAND ----------
dbutils.widgets.text('catalog', 'catadb360dev')
dbutils.widgets.text('schema', 'schemaadb360dev')
dbutils.widgets.text('volume', 'bronze')
dbutils.widgets.text('destdb', 'silverdb')
dbutils.widgets.text('incfilename', 'restaurants_yyyymmdd.parquet')
# COMMAND ----------
catalog = dbutils.widgets.get('catalog')
schema = dbutils.widgets.get('schema')
volume = dbutils.widgets.get('volume')
destdb = dbutils.widgets.get('destdb')
incfilename = dbutils.widgets.get('incfilename')
# COMMAND ----------
# imports
from delta.tables import DeltaTable
# COMMAND ----------
# variables
# the target table name is the prefix of the incremental file name,
# e.g. 'customers' from 'customers_yyyymmdd.parquet'
tablename = incfilename.split('_')[0]
sourcePath = f"/Volumes/{catalog}/{schema}/{volume}/incremental/"
# COMMAND ----------
# set the current catalog so f'{destdb}.{tablename}' resolves inside it
spark.sql(f"use catalog {catalog}")
# COMMAND ----------
# load the destination table as a DeltaTable to access the merge API
deltaF = DeltaTable.forName(spark, f'{destdb}.{tablename}')
# COMMAND ----------
# load the incremental update file
updateDf = spark.read.format('parquet').load(sourcePath + incfilename)
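# COMMAND ----------
# optional sanity check (a sketch, not part of the original flow): confirm the
# update file was read and inspect its columns before merging
print(f'loaded {updateDf.count()} update rows with columns {updateDf.columns}')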
# COMMAND ----------
# do the merge: only the join key differs between the two tables
keyColumn = 'restaurantId' if tablename == 'restaurants' else 'customerId'

deltaF.alias('t') \
    .merge(
        updateDf.alias('u'),
        f't.{keyColumn} = u.{keyColumn}'
    ) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
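# COMMAND ----------
# optional verification (a sketch): the latest entry in the table history shows
# the MERGE operation and its row-level metrics
deltaF.history(1).select('operation', 'operationMetrics').show(truncate=False)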
# COMMAND ----------
print('finished')