in fast/project-templates/secops-anonymization-pipeline/source/main.py [0:0]
# Maximum number of log lines sent to SecOps in a single ingest_log call.
_INGEST_BATCH_SIZE = 1000


def _ingest_batch(chronicle, logs, log_type):
    """Send one batch of raw log lines to SecOps and debug-log the response."""
    response = chronicle.ingest_log(log_message=logs, log_type=log_type,
                                    forwarder_id=SECOPS_TARGET_FORWARDER_ID)
    LOGGER.debug(response)


def import_logs(export_date):
    """Ingest the log files exported for ``export_date`` into the target tenant.

    Log files are read from ``SECOPS_OUTPUT_BUCKET``, or from
    ``SECOPS_EXPORT_BUCKET`` when ``SKIP_ANONYMIZATION`` is set. Each file is
    streamed line by line and sent through ``chronicle.ingest_log`` in batches
    of at most ``_INGEST_BATCH_SIZE`` lines. Once every folder of an export id
    has been ingested, that export's folder is deleted from the bucket(s).

    Args:
        export_date: Date passed to
            ``utils.get_secops_export_folders_for_date`` to select the
            export folders to import.

    Raises:
        SystemExit: If reading or ingesting any log file fails. The folders of
            the export being processed are not deleted in that case.
    """
    client = SecOpsClient()
    chronicle = client.chronicle(customer_id=SECOPS_TARGET_CUSTOMER_ID,
                                 project_id=SECOPS_TARGET_PROJECT,
                                 region=SECOPS_REGION)
    storage_client = storage.Client()
    # Without anonymization the raw export bucket is the ingestion source.
    BUCKET = SECOPS_EXPORT_BUCKET if SKIP_ANONYMIZATION else SECOPS_OUTPUT_BUCKET
    bucket = storage_client.bucket(BUCKET)
    export_ids = utils.get_secops_export_folders_for_date(BUCKET, export_date)

    for export_id in export_ids:
        for folder in utils.list_anonymized_folders(BUCKET, export_id):
            # The log type is the folder name up to the first "-".
            log_type = folder.split("-")[0]
            for log_file in utils.list_log_files(BUCKET,
                                                 f"{export_id}/{folder}"):
                try:
                    blob = bucket.blob(log_file)
                    with blob.open("r") as f:
                        logs = []
                        for line in f:
                            logs.append(line.rstrip('\n'))
                            if len(logs) == _INGEST_BATCH_SIZE:
                                _ingest_batch(chronicle, logs, log_type)
                                logs = []
                        # Send any remaining entries of a partial last batch.
                        if logs:
                            _ingest_batch(chronicle, logs, log_type)
                except Exception as e:
                    LOGGER.error(f"Error during log ingestion: {e}")
                    # Chain the cause so the original traceback is preserved.
                    raise SystemExit(f'Error during log ingestion: {e}') from e
        # Delete the export folder from the ingestion bucket and, when
        # anonymization ran, from the raw export bucket as well.
        utils.delete_folder(BUCKET, export_id)
        if not SKIP_ANONYMIZATION:
            utils.delete_folder(SECOPS_EXPORT_BUCKET, export_id)
    LOGGER.info("Finished importing data.")