bundle_adb_360/src/c_Incremental/13_Scd1ToGold.py [25:56]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Notebook parameters, read from the Databricks widgets of the same name.
catalog, schema, volume, destdb, tablename = (
    dbutils.widgets.get(widgetName)
    for widgetName in ('catalog', 'schema', 'volume', 'destdb', 'tablename')
)

# COMMAND ----------

# imports
from delta.tables import DeltaTable
from pyspark.sql.functions import lit, monotonically_increasing_id, row_number, col
from pyspark.sql.types import LongType
from pyspark.sql.window import Window

# COMMAND ----------

# Switch the session to the catalog given by the `catalog` widget so the
# two-part table names used below resolve inside it.
useCatalogStatement = f"use catalog {catalog}"
spark.sql(useCatalogStatement)

# COMMAND ----------

# variables
# Placeholder value; replaced in the next cell by the stored watermark.
maxVersion = 0

# COMMAND ----------

# Load the stored lastCommitKey for this table from the watermark table.
# The lookup goes through the DataFrame API rather than interpolating
# `tablename` into SQL text, so a name containing a quote character can
# neither break nor alter the query.
watermarkRow = (
    spark.table('golddb.watermarktable')
    .where(col('tablename') == tablename)
    .select('lastCommitKey')
    .first()
)
if watermarkRow is None:
    # Same exception type the former `.collect()[0][0]` raised on an empty
    # result, but with a message that says what is actually missing.
    raise IndexError(
        f"no row found in golddb.watermarktable for tablename '{tablename}'"
    )
maxVersion = watermarkRow[0]

# COMMAND ----------

# Read the change data feed of the silver table, starting at the version
# right after the watermark loaded above.
# NOTE(review): if no commit landed after the watermark, this starting version
# may be beyond the table's latest version - confirm how that case is handled.
startingVersion = maxVersion + 1
changeFeedReader = (
    spark.read.format('delta')
    .option('readChangeFeed', 'true')
    .option('startingVersion', startingVersion)
)
tcdf = changeFeedReader.table(f'silverdb.{tablename}')
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



bundle_adb_360/src/c_Incremental/14_Scd2ToGold.py [25:56]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Notebook parameters, read from the Databricks widgets of the same name.
catalog, schema, volume, destdb, tablename = (
    dbutils.widgets.get(widgetName)
    for widgetName in ('catalog', 'schema', 'volume', 'destdb', 'tablename')
)

# COMMAND ----------

# imports
from delta.tables import DeltaTable
from pyspark.sql.functions import lit, monotonically_increasing_id, row_number, col
from pyspark.sql.types import LongType
from pyspark.sql.window import Window

# COMMAND ----------

# Switch the session to the catalog given by the `catalog` widget so the
# two-part table names used below resolve inside it.
useCatalogStatement = f"use catalog {catalog}"
spark.sql(useCatalogStatement)

# COMMAND ----------

# variables
# Placeholder value; replaced in the next cell by the stored watermark.
maxVersion = 0

# COMMAND ----------

# Load the stored lastCommitKey for this table from the watermark table.
# The lookup goes through the DataFrame API rather than interpolating
# `tablename` into SQL text, so a name containing a quote character can
# neither break nor alter the query.
watermarkRow = (
    spark.table('golddb.watermarktable')
    .where(col('tablename') == tablename)
    .select('lastCommitKey')
    .first()
)
if watermarkRow is None:
    # Same exception type the former `.collect()[0][0]` raised on an empty
    # result, but with a message that says what is actually missing.
    raise IndexError(
        f"no row found in golddb.watermarktable for tablename '{tablename}'"
    )
maxVersion = watermarkRow[0]

# COMMAND ----------

# Read the change data feed of the silver table, starting at the version
# right after the watermark loaded above.
# NOTE(review): if no commit landed after the watermark, this starting version
# may be beyond the table's latest version - confirm how that case is handled.
startingVersion = maxVersion + 1
changeFeedReader = (
    spark.read.format('delta')
    .option('readChangeFeed', 'true')
    .option('startingVersion', startingVersion)
)
tcdf = changeFeedReader.table(f'silverdb.{tablename}')
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



