in Solutions/Oracle Cloud Infrastructure/Data Connectors/AzureFunctionOCILogs/main.py [0:0]
def _shrink_large_event_fields(event):
    """Run ``process_large_field`` over the sections of ``event["data"]`` that may be oversized.

    The sections handled are ``request.headers``, ``request.parameters``,
    ``response.headers``, ``additionalDetails`` and ``stateChange.current``.
    Nothing is touched unless ``data`` contains a ``request`` key and the
    event type is not ``com.oraclecloud.loadbalancer.access``.

    Args:
        event: Decoded JSON object (``dict``) for a single stream message.
            Modified in place by ``process_large_field``.
    """
    data = event.get("data")
    # "data" may be absent, null or a scalar; only a JSON object has
    # sub-sections that can be inspected safely.
    if not isinstance(data, dict):
        return

    # NOTE(review): load balancer access events are excluded, presumably
    # because their payload is shaped differently - confirm.
    # .get() avoids a KeyError for events that carry no "type" attribute.
    if "request" not in data or event.get("type") == "com.oraclecloud.loadbalancer.access":
        return

    request = data["request"]
    # isinstance() also rejects null and string values, for which ``in``
    # would raise or silently become a substring test.
    if isinstance(request, dict):
        for field_name in ("headers", "parameters"):
            if field_name in request:
                process_large_field(request, field_name, FIELD_SIZE_LIMIT_BYTES)

    response = data.get("response")
    if isinstance(response, dict) and "headers" in response:
        process_large_field(response, "headers", FIELD_SIZE_LIMIT_BYTES)

    if "additionalDetails" in data:
        process_large_field(data, "additionalDetails", FIELD_SIZE_LIMIT_BYTES)

    state_change = data.get("stateChange")
    if isinstance(state_change, dict) and "current" in state_change:
        process_large_field(state_change, "current", FIELD_SIZE_LIMIT_BYTES)


def process_events(client: oci.streaming.StreamClient, stream_id, initial_cursor, limit, sentinel: AzureSentinelConnector, start_ts):
    """Read messages from an OCI stream and forward the JSON events to Sentinel.

    Messages are fetched in batches with ``client.get_messages``. Each
    message value is base64-decoded; payloads that are JSON objects are
    parsed, passed through ``_shrink_large_event_fields`` and handed to
    ``sentinel.send``. ``sentinel.flush`` is called once per fetched batch.

    The loop ends when a fetch returns no messages, or when
    ``check_if_script_runs_too_long(start_ts)`` reports that the time budget
    is used up.

    Args:
        client: OCI streaming client used to fetch messages.
        stream_id: Identifier of the stream to read from.
        initial_cursor: Cursor used for the first fetch.
        limit: Passed to ``get_messages`` as ``limit``.
        sentinel: Connector that receives the events (``send`` / ``flush``).
        start_ts: Start marker passed to ``check_if_script_runs_too_long``.
    """
    cursor = initial_cursor
    while True:
        get_response = client.get_messages(
            stream_id,
            cursor,
            limit=limit,
            retry_strategy=oci.retry.DEFAULT_RETRY_STRATEGY,
        )
        if not get_response.data:
            return

        for message in get_response.data:
            if not message:
                continue

            payload = b64decode(message.value.encode()).decode()
            logging.info('event details %s', payload)

            # Only JSON objects are forwarded; any other payload is ignored.
            if not payload.startswith("{"):
                continue

            try:
                event = json.loads(payload)
            except json.JSONDecodeError as err:
                # A malformed payload must not abort the batch: the cursor
                # would not advance and later messages would not be reached.
                logging.error('Skipping message that is not valid JSON: %s', err)
                continue

            _shrink_large_event_fields(event)
            sentinel.send(event)

        sentinel.flush()
        if check_if_script_runs_too_long(start_ts):
            logging.info('Script is running too long. Saving progress and exit.')
            break
        # The response header carries the cursor for the next batch.
        cursor = get_response.headers["opc-next-cursor"]