in Solutions/Infoblox/Data Connectors/InfobloxCloudDataConnector/InfobloxHistoricalToAzureStorage/infoblox_to_azure_storage.py [0:0]
def initiate_and_iterate_through_response_obj(self, date_state_manager_obj):
    """Fetch one historical time window and post its data to Azure storage.

    Steps performed by this method:

    1. Load the date checkpoint. If it holds no ``"from_date"``, the current
       UTC time (millisecond precision) is used and persisted.
    2. Return early when that date is earlier than midnight of
       ``consts.HISTORICAL_START_DATE``.
    3. Derive the other edge of the window with
       ``add_xh_to_iso_time_string`` and ``consts.HISTORICAL_TIME_INTERVAL``.
    4. Maintain a per-window retry counter in its own ``StateManager``
       checkpoint. The counter is stored as 1 on the first attempt and
       incremented on each later attempt; once the stored value exceeds 2
       the window is handed to ``store_failed_range`` and the next window
       is processed instead.
    5. Query the window, iterate through the response, then store the new
       ``"from_date"`` in the date checkpoint and delete the retry counter.

    Args:
        date_state_manager_obj: State management object that holds the
            ``"from_date"`` checkpoint.

    Raises:
        InfobloxException: Re-raised unchanged when raised by a helper, or
            raised (chained to the original error) for any other exception.
    """
    __method_name = inspect.currentframe().f_code.co_name

    def _log_info(message):
        # Single place for the info-level log prefix used throughout this method.
        applogger.info(
            self.log_format.format(
                consts.LOGS_STARTS_WITH,
                __method_name,
                self.azure_function_name,
                message,
            )
        )

    first_attempt_msg = "This from_date occur for the first time. Storing retry count = 1"
    try:
        _log_info("Fetching checkpoint data")
        checkpoint_data = self.get_checkpoint_data(date_state_manager_obj, load_flag=True)
        to_date = checkpoint_data.get("from_date") if checkpoint_data else None
        if not to_date:
            # No usable checkpoint yet: start from "now" in UTC, trimmed from
            # microseconds to milliseconds, and persist it immediately.
            to_date = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
            self.post_checkpoint_data(date_state_manager_obj, {"from_date": to_date}, dump_flag=True)

        # Stop once the checkpoint has moved before the configured start date.
        end_date = consts.HISTORICAL_START_DATE + " 00:00:00.000"
        end_date_epoch = self.iso_to_epoch_str(end_date)
        to_date_epoch = self.iso_to_epoch_str(to_date)
        if int(to_date_epoch) < int(end_date_epoch):
            _log_info("Complete Fetching Historical Data till = {}".format(end_date))
            return

        # NOTE(review): the direction in which add_xh_to_iso_time_string shifts
        # the timestamp is not visible here - confirm against the helper.
        from_date = self.add_xh_to_iso_time_string(to_date, consts.HISTORICAL_TIME_INTERVAL)
        base_checkpoint_file_name_for_from_and_to_dates = self.create_checkpoint_file_name_using_dates(
            from_date, to_date, self.ioc_type
        )
        self.checkpoint_for_from_and_to_dates = StateManager(
            consts.CONN_STRING,
            base_checkpoint_file_name_for_from_and_to_dates,
            consts.FILE_SHARE_NAME_DATA,
        )

        retry_count = self.get_checkpoint_data(self.checkpoint_for_from_and_to_dates)
        if retry_count:
            # A counter already exists, so this window was attempted before.
            retry_count = int(retry_count)
            _log_info("Retry count from last iteration = {}".format(retry_count))
            list_of_file_with_prefix = self.list_file_names_in_file_share(
                self.parent_file,
                base_checkpoint_file_name_for_from_and_to_dates,
            )
            _log_info(
                "No. of file = {} with prefix = {}".format(
                    len(list_of_file_with_prefix),
                    base_checkpoint_file_name_for_from_and_to_dates,
                )
            )
            # Delete every file whose name starts with this window's prefix
            # before the window is attempted again.
            if list_of_file_with_prefix:
                self.delete_files_from_azure_storage(list_of_file_with_prefix, self.parent_file)

            if retry_count > 2:
                # Give up on this window: record it as failed and move on to
                # the next one, which starts with a fresh retry counter.
                self.store_failed_range(from_date, to_date)
                to_date = from_date
                from_date = self.add_xh_to_iso_time_string(to_date, consts.HISTORICAL_TIME_INTERVAL)
                base_checkpoint_file_name_for_from_and_to_dates = self.create_checkpoint_file_name_using_dates(
                    from_date, to_date, self.ioc_type
                )
                self.post_checkpoint_data(date_state_manager_obj, {"from_date": to_date}, dump_flag=True)
                self.checkpoint_for_from_and_to_dates = StateManager(
                    consts.CONN_STRING,
                    base_checkpoint_file_name_for_from_and_to_dates,
                    consts.FILE_SHARE_NAME_DATA,
                )
                retry_count = 1
                retry_msg = first_attempt_msg
            else:
                retry_count += 1
                retry_msg = "Storing retry count = {}".format(retry_count)
        else:
            retry_count = 1
            retry_msg = first_attempt_msg

        # All three paths above end the same way: log, then persist the counter.
        _log_info(retry_msg)
        self.post_checkpoint_data(self.checkpoint_for_from_and_to_dates, str(retry_count))

        query_params = {"from_date": from_date, "to_date": to_date}
        _log_info("Query params = {}".format(query_params))
        response_obj = self.initiate_response_obj(query_params, self.ioc_type)
        # The name passed to the iteration step is the window prefix plus this
        # run's start_time.
        base_checkpoint_file_name_for_from_and_to_dates += "_" + self.start_time
        self.iterate_through_response_obj(response_obj, base_checkpoint_file_name_for_from_and_to_dates)
        _log_info("IOCs posted to azure storage from_date = {}, to_date = {}".format(from_date, to_date))

        # Window finished: store its from_date in the date checkpoint (read
        # back as the next run's to_date) and drop the retry counter.
        self.post_checkpoint_data(date_state_manager_obj, {"from_date": from_date}, dump_flag=True)
        self.checkpoint_for_from_and_to_dates.delete()
    except InfobloxException:
        # Re-raise the same exception so its message and traceback survive.
        raise
    except Exception as err:
        applogger.error(
            self.log_format.format(
                consts.LOGS_STARTS_WITH,
                __method_name,
                self.azure_function_name,
                consts.UNEXPECTED_ERROR_MSG.format(err),
            )
        )
        raise InfobloxException() from err