in Solutions/Snowflake/Data Connectors/AzureFunctionSnowflake/main.py [0:0]
def main(mytimer: func.TimerRequest):
logging.info('Script started.')
script_start_time = int(time.time())
ctx = snowflake.connector.connect(
user=SNOWFLAKE_USER,
password=SNOWFLAKE_PASSWORD,
account=SNOWFLAKE_ACCOUNT
)
sentinel = AzureSentinelConnector(
log_analytics_uri=LOG_ANALYTICS_URI,
workspace_id=WORKSPACE_ID,
shared_key=SHARED_KEY,
log_type=LOG_TYPE,
queue_size=1000
)
state_manager_logins = StateManager(FILE_SHARE_CONN_STRING, file_path='snowflake_logins')
state_manager_queries = StateManager(FILE_SHARE_CONN_STRING, file_path='snowflake_queries')
state_manager_rlogins = StateManager(FILE_SHARE_CONN_STRING, file_path='snowflake_rlogins')
state_manager_rqueries = StateManager(FILE_SHARE_CONN_STRING, file_path='snowflake_rqueries')
logins_date_from = state_manager_logins.get()
logins_date_from = parse_date_from(logins_date_from)
logging.info(f'Getting LOGIN events from {logins_date_from}')
last_ts = None
for event in get_login_events(ctx, logins_date_from):
sentinel.send(event)
last_ts = event.get('EVENT_TIMESTAMP')
if sentinel.is_empty() and last_ts:
state_manager_logins.post(last_ts)
if check_if_script_runs_too_long(script_start_time):
logging.info(f'Script is running too long. Stop processing new events. Finish script. Sent events: {sentinel.successfull_sent_events_number}')
return
sentinel.flush()
if last_ts:
state_manager_logins.post(last_ts)
if check_if_script_runs_too_long(script_start_time):
logging.info(f'Script is running too long. Stop processing new events. Finish script. Sent events: {sentinel.successfull_sent_events_number}')
return
queries_date_from = state_manager_queries.get()
queries_date_from = parse_date_from(queries_date_from)
logging.info(f'Getting QUERIES events from {queries_date_from}')
last_ts = None
for event in get_query_events(ctx, queries_date_from):
sentinel.send(event)
last_ts = event.get('START_TIME')
if sentinel.is_empty() and last_ts:
state_manager_queries.post(last_ts)
if check_if_script_runs_too_long(script_start_time):
logging.info(f'Script is running too long. Stop processing new events. Finish script. Sent events: {sentinel.successfull_sent_events_number}')
return
sentinel.flush()
if last_ts:
state_manager_queries.post(last_ts)
if check_if_script_runs_too_long(script_start_time):
logging.info(f'Script is running too long. Stop processing new events. Finish script. Sent events: {sentinel.successfull_sent_events_number}')
return
rlogins_date_from = state_manager_rlogins.get()
rlogins_date_from = parse_date_from(rlogins_date_from)
logging.info(f'Getting READER LOGIN events from {rlogins_date_from}')
last_ts = None
for event in get_reader_login_events(ctx, rlogins_date_from):
sentinel.send(event)
last_ts = event.get('EVENT_TIMESTAMP')
if sentinel.is_empty() and last_ts:
state_manager_rlogins.post(last_ts)
if check_if_script_runs_too_long(script_start_time):
logging.info(f'Script is running too long. Stop processing new events. Finish script. Sent events: {sentinel.successfull_sent_events_number}')
return
sentinel.flush()
if last_ts:
state_manager_rlogins.post(last_ts)
if check_if_script_runs_too_long(script_start_time):
logging.info(f'Script is running too long. Stop processing new events. Finish script. Sent events: {sentinel.successfull_sent_events_number}')
return
rqueries_date_from = state_manager_rqueries.get()
rqueries_date_from = parse_date_from(rqueries_date_from)
logging.info(f'Getting READER QUERIES events from {rqueries_date_from}')
last_ts = None
for event in get_reader_query_events(ctx, rqueries_date_from):
sentinel.send(event)
last_ts = event.get('START_TIME')
if sentinel.is_empty() and last_ts:
state_manager_rqueries.post(last_ts)
if check_if_script_runs_too_long(script_start_time):
logging.info(f'Script is running too long. Stop processing new events. Finish script. Sent events: {sentinel.successfull_sent_events_number}')
return
sentinel.flush()
if last_ts:
state_manager_rqueries.post(last_ts)
if check_if_script_runs_too_long(script_start_time):
logging.info(f'Script is running too long. Stop processing new events. Finish script. Sent events: {sentinel.successfull_sent_events_number}')
return
ctx.close()
logging.info(f'Script finished. Sent events: {sentinel.successfull_sent_events_number}')