in Solutions/Infoblox/Data Connectors/InfobloxCloudDataConnector/InfobloxCurrentToAzureStorage/infoblox_to_azure_storage.py [0:0]
def initiate_and_iterate_through_response_obj(self, date_state_manager_obj):
    """Fetch one time window of data and post it to Azure storage.

    The window is derived from the ``"to_date"`` value stored in the date
    checkpoint:

    * No stored ``"to_date"`` (first run): the window ends at the current UTC
      time and starts ``consts.CURRENT_TIME_INTERVAL`` earlier; the window
      start is written to the date checkpoint.
    * Otherwise: the window starts at the stored ``"to_date"`` and ends
      ``consts.CURRENT_TIME_INTERVAL`` later.

    A per-window checkpoint holds a retry counter. If a counter already
    exists, files in the file share whose names start with the window's
    checkpoint name are deleted. When the stored counter is greater than 2
    the window is passed to ``store_failed_range``, the date checkpoint is
    moved to the end of that window, and processing continues with the next
    window (counter reset to 1). Otherwise the counter is incremented.

    After the response object has been iterated, the date checkpoint is
    advanced to the window end and the per-window checkpoint is deleted.

    Args:
        date_state_manager_obj: State management object holding the date
            checkpoint.

    Raises:
        InfobloxException: Re-raised unchanged when raised by a callee, or
            raised (chained to the original error) for any other exception.
    """
    __method_name = inspect.currentframe().f_code.co_name

    def _fmt(message):
        """Render ``message`` using this connector's log layout."""
        return self.log_format.format(
            consts.LOGS_STARTS_WITH,
            __method_name,
            self.azure_function_name,
            message,
        )

    def _bind_window_checkpoint(window_start, window_end):
        """Point ``self.checkpoint_for_from_and_to_dates`` at the window; return its file name."""
        file_name = self.create_checkpoint_file_name_using_dates(window_start, window_end, self.ioc_type)
        self.checkpoint_for_from_and_to_dates = StateManager(
            consts.CONN_STRING,
            file_name,
            consts.FILE_SHARE_NAME_DATA,
        )
        return file_name

    first_attempt_msg = "This to_date occur for the first time. Storing retry count = 1"

    try:
        applogger.info(_fmt("Fetching checkpoint data"))
        checkpoint_data = self.get_checkpoint_data(date_state_manager_obj, load_flag=True)
        from_date = checkpoint_data.get("to_date", None) if checkpoint_data else None

        if not from_date:
            # No usable checkpoint: the function app is running for the first time.
            # [:-3] trims microseconds down to milliseconds.
            to_date = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
            from_date = self.add_xh_to_iso_time_string(to_date, -abs(consts.CURRENT_TIME_INTERVAL))
            # Persist the window start so a failed first run retries the same window.
            self.post_checkpoint_data(date_state_manager_obj, {"to_date": from_date}, dump_flag=True)
        else:
            to_date = self.add_xh_to_iso_time_string(from_date, consts.CURRENT_TIME_INTERVAL)

        window_checkpoint_name = _bind_window_checkpoint(from_date, to_date)
        stored_retry_count = self.get_checkpoint_data(self.checkpoint_for_from_and_to_dates)

        if stored_retry_count:
            retry_count = int(stored_retry_count)
            applogger.info(_fmt("Retry count from last iteration = {}".format(retry_count)))

            leftover_files = self.list_file_names_in_file_share(
                self.parent_file,
                window_checkpoint_name,
            )
            applogger.info(
                _fmt(
                    "No. of file = {} with prefix = {}".format(
                        len(leftover_files),
                        window_checkpoint_name,
                    )
                )
            )
            # Delete every file whose name starts with this window's checkpoint name.
            if leftover_files:
                self.delete_files_from_azure_storage(leftover_files, self.parent_file)

            if retry_count > 2:
                # Give up on this window: record it and move on to the next one.
                self.store_failed_range(from_date, to_date)
                self.post_checkpoint_data(date_state_manager_obj, {"to_date": to_date}, dump_flag=True)
                from_date = to_date
                to_date = self.add_xh_to_iso_time_string(from_date, consts.CURRENT_TIME_INTERVAL)
                window_checkpoint_name = _bind_window_checkpoint(from_date, to_date)
                retry_count = 1
                applogger.info(_fmt(first_attempt_msg))
            else:
                retry_count += 1
                applogger.info(_fmt("Storing retry count = {}".format(retry_count)))
        else:
            retry_count = 1
            applogger.info(_fmt(first_attempt_msg))

        # Single write of the retry counter for whichever window is now active.
        self.post_checkpoint_data(self.checkpoint_for_from_and_to_dates, str(retry_count))

        query_params = {"from_date": from_date, "to_date": to_date}
        applogger.info(_fmt("Query params = {}".format(query_params)))

        response_obj = self.initiate_response_obj(query_params, self.ioc_type)
        window_checkpoint_name += "_" + self.start_time
        self.iterate_through_response_obj(response_obj, window_checkpoint_name)
        applogger.info(
            _fmt("IOCs posted to azure storage from_date = {}, to_date = {}".format(from_date, to_date))
        )

        # Window succeeded: advance the date checkpoint and drop the retry counter.
        self.post_checkpoint_data(date_state_manager_obj, {"to_date": to_date}, dump_flag=True)
        self.checkpoint_for_from_and_to_dates.delete()
    except InfobloxException:
        # Re-raise the original instance so its message and traceback are kept.
        raise
    except Exception as err:
        applogger.error(_fmt(consts.UNEXPECTED_ERROR_MSG.format(err)))
        # Same exception type as before, now explicitly chained to the root cause.
        raise InfobloxException() from err