in Solutions/Tenable App/Data Connectors/TenableVM/TenableExportsOrchestrator/__init__.py [0:0]
def _load_checkpoint(checkpoint_store, export_name):
    """Return the stored ``<export_name>_timestamp`` checkpoint, or 0 if absent."""
    # NOTE(review): ExportsTableStore.get is defined elsewhere; the ``or {}``
    # guards against it returning None for a missing row -- confirm. When a
    # mapping is returned the behaviour is unchanged.
    entity = checkpoint_store.get(export_name, "timestamp") or {}
    return entity.get(f"{export_name}_timestamp", 0)


def _register_job(stats_store, job_id, export_type, job_timestamp, started_at):
    """Write the initial "processing" stats row for a newly started export job."""
    stats_store.merge(job_id, "prime", {
        "status": TenableStatus.processing.value,
        "exportType": export_type,
        "failedChunks": "",
        "chunks": "",
        "totalChunksCount": 0,
        "jobTimestamp": job_timestamp,
        "startedAt": started_at,
    })


def _record_finished_job(stats_store, results, index, job_label):
    """Mark the job whose result sits at ``results[index]`` as finished."""
    try:
        job_result = results[index]
    except IndexError as e:
        logging.warning("%s job returned no results", job_label)
        logging.warning(e)
        return

    # A sub-orchestrator that returned nothing usable must not fail the main
    # orchestrator, otherwise the reschedule at the end would never happen.
    if not isinstance(job_result, dict):
        logging.warning("%s job returned no results", job_label)
        return

    job_id = job_result.get("id", "")
    chunks = job_result.get("chunks", [])
    if job_id != "":
        stats_store.merge(job_id, "prime", {
            "status": TenableStatus.finished.value,
            "chunks": ",".join(str(c) for c in chunks),
            "totalChunksCount": len(chunks),
        })


def orchestrator_function(context: df.DurableOrchestrationContext):
    """Run one export cycle and reschedule the orchestrator.

    Starts the asset and vuln export jobs (and the compliance job when
    ``ingest_compliance_data`` is truthy), records each job in the stats
    table, hands the jobs to their sub-orchestrators, waits for all of them,
    marks the asset/vuln jobs as finished, then sleeps for
    ``export_schedule_minutes`` and restarts itself via ``continue_as_new``.

    Args:
        context: Durable orchestration context. If its input contains a
            truthy ``"isFirstRun"`` entry, all exports start from timestamp 0
            instead of the stored checkpoints.

    Yields:
        Durable tasks: the job-start activities, the fan-in over the
        sub-orchestrators, and the reschedule timer.
    """
    logging.info("started main orchestrator")
    logging.info(
        "instance id: %s at %s", context.instance_id, context.current_utc_datetime)

    first_run = context.get_input()
    if first_run is not None and "isFirstRun" in first_run and first_run["isFirstRun"]:
        assets_timestamp = 0
        vulns_timestamp = 0
        compliance_timestamp = 0
    else:
        checkpoint_store = ExportsTableStore(connection_string, checkpoint_table_name)
        assets_timestamp = _load_checkpoint(checkpoint_store, "assets")
        vulns_timestamp = _load_checkpoint(checkpoint_store, "vulns")
        compliance_timestamp = _load_checkpoint(checkpoint_store, "compliance")

    logging.info("checkpoint timestamp value for assets: %d", assets_timestamp)
    logging.info("checkpoint timestamp value for vulns: %d", vulns_timestamp)

    stats_store = ExportsTableStore(connection_string, stats_table_name)

    # Start times come from the orchestration clock rather than time.time():
    # orchestrator code is replayed and has to produce the same values each time.
    asset_start_time = int(context.current_utc_datetime.timestamp())
    asset_export_job_id = yield context.call_activity(start_asset_job_name, assets_timestamp)
    logging.info("retrieved a new asset job ID")
    logging.warning(
        "instance id: %s working with asset export job %s, sending to sub orchestrator",
        context.instance_id, asset_export_job_id)
    _register_job(
        stats_store, asset_export_job_id, TenableExportType.asset.value,
        assets_timestamp, context.current_utc_datetime.timestamp())
    logging.info(
        "saved %s to stats table. moving to start vuln job.", asset_export_job_id)

    vulns_start_time = int(context.current_utc_datetime.timestamp())
    vuln_export_job_id = yield context.call_activity(start_vuln_job_name, vulns_timestamp)
    logging.info("retrieved a new vuln job ID")
    logging.warning(
        "instance id: %s working with vuln export job %s, sending to sub orchestrator",
        context.instance_id, vuln_export_job_id)
    _register_job(
        stats_store, vuln_export_job_id, TenableExportType.vuln.value,
        vulns_timestamp, context.current_utc_datetime.timestamp())

    if ingest_compliance_data:
        compliance_start_time = int(context.current_utc_datetime.timestamp())
        compliance_export_job_id = yield context.call_activity(
            start_compliance_job_name, compliance_timestamp)
        logging.info("retrieved a new compliance job ID")
        logging.warning(
            "instance id: %s working with compliance export job %s, sending to sub orchestrator",
            context.instance_id, compliance_export_job_id)
        logging.info("filter by time for compliance: %d", compliance_timestamp)
        _register_job(
            stats_store, compliance_export_job_id, TenableExportType.compliance.value,
            compliance_timestamp, context.current_utc_datetime.timestamp())
        logging.info("saved %s to stats table.", compliance_export_job_id)
    else:
        logging.info("User opted not to ingest compliance data. Skipping compliance export job")

    # Fan out: schedule every sub-orchestrator first, then await them together.
    asset_export = context.call_sub_orchestrator(asset_orchestrator_name, {
        "timestamp": assets_timestamp,
        "assetJobId": asset_export_job_id,
        "mainOrchestratorInstanceId": context.instance_id,
        "start_time": asset_start_time,
    })
    stats_store.merge(asset_export_job_id, "prime", {
        "status": TenableStatus.sent_to_sub_orchestrator.value
    })

    vuln_export = context.call_sub_orchestrator(vuln_orchestrator_name, {
        "timestamp": vulns_timestamp,
        "vulnJobId": vuln_export_job_id,
        "mainOrchestratorInstanceId": context.instance_id,
        "start_time": vulns_start_time,
    })
    stats_store.merge(vuln_export_job_id, "prime", {
        "status": TenableStatus.sent_to_sub_orchestrator.value
    })

    sub_tasks = [asset_export, vuln_export]
    if ingest_compliance_data:
        compliance_export = context.call_sub_orchestrator(compliance_orchestrator_name, {
            "timestamp": compliance_timestamp,
            "complianceJobId": compliance_export_job_id,
            "mainOrchestratorInstanceId": context.instance_id,
            "start_time": compliance_start_time,
        })
        stats_store.merge(compliance_export_job_id, "prime", {
            "status": TenableStatus.sent_to_sub_orchestrator.value
        })
        sub_tasks.append(compliance_export)
    else:
        logging.info("User opted not to ingest compliance data. Skipping compliance export sub orchestrator call.")

    results = yield context.task_all(sub_tasks)
    logging.info("Finished all jobs!")
    logging.info(results)

    # Results come back in the order the tasks were passed: asset, vuln, compliance.
    _record_finished_job(stats_store, results, 0, "asset")
    _record_finished_job(stats_store, results, 1, "vuln")

    # condition to process compliance job data only if user opted for it
    if ingest_compliance_data:
        process_compliance_data(results, stats_store)

    # Sleep until the next scheduled export, then restart with no input so the
    # next cycle reads its timestamps from the checkpoint table.
    next_check = context.current_utc_datetime + \
        timedelta(minutes=export_schedule_minutes)
    yield context.create_timer(next_check)
    context.continue_as_new(None)