in 5-app-infra/3-artifact-publish/docker/cdmc/record_manager/Service.py [0:0]
def create_snapshots(self, retention_records):
for record in retention_records:
if record['expiration_action'] != 'Purge':
continue
snapshot_name = record['dataset'] + '_' + record['table']
snapshot_table = self.snapshot_project + '.' + self.snapshot_dataset + '.' + snapshot_name
snapshot_expiration = record['retention_period'] + self.snapshot_retention_period + 1
create_date = datetime.datetime(record['year'], record['month'], record['day'])
snapshot_expiration = create_date + datetime.timedelta(days=snapshot_expiration)
if record['retention_period'] == -1:
retention_period = 0
else:
retention_period = record['retention_period']
table_expiration = create_date + datetime.timedelta(days=retention_period)
if table_expiration.date() <= datetime.date.today() and snapshot_expiration.date() > datetime.date.today():
ddl = ('create snapshot table ' + snapshot_table
+ ' clone ' + record['project'] + '.' + record['dataset'] + '.' + record['table']
+ ' options ('
+ ' expiration_timestamp = timestamp "' + snapshot_expiration.strftime("%Y-%m-%d") + '");')
try:
if self.mode == 'apply':
try:
self.bq_client.get_table(snapshot_table)
self.bq_client.delete_table(snapshot_table)
print('Info: deleted snapshot table ' + snapshot_table)
except NotFound:
print("Snapshot table {} not found, skipping delete.".format(snapshot_table))
print('Info: using ddl to create snapshot table: ' + ddl)
query_job = self.bq_client.query(ddl).result()
print('Info: created snapshot table ' + snapshot_table)
else:
# validate mode
print('Info: create snapshot table ' + snapshot_table)
except Exception as e:
print('Error occurred in create_snapshots. Error message: ' + str(e))
else:
print('skipping snapshot table creation for table ', record['table'])