# import_logs() — excerpt from
# fast/project-templates/secops-anonymization-pipeline/source/main.py


def import_logs(export_date, batch_size=1000):
  """Ingest exported (and optionally anonymized) logs into the target
  Chronicle SecOps tenant, then clean up the processed GCS folders.

  Logs are read line-by-line from GCS and sent to Chronicle in batches.

  Args:
    export_date: Date key (as understood by
      utils.get_secops_export_folders_for_date) selecting which export
      folders to ingest.
    batch_size: Maximum number of log lines sent per ingest_log call.
      Defaults to 1000, matching the original hard-coded batch size.

  Raises:
    SystemExit: If ingestion of any log file fails.
  """
  client = SecOpsClient()
  chronicle = client.chronicle(customer_id=SECOPS_TARGET_CUSTOMER_ID,
                               project_id=SECOPS_TARGET_PROJECT,
                               region=SECOPS_REGION)

  def _flush(batch, log_type):
    """Send one batch of log lines to Chronicle and log the response."""
    response = chronicle.ingest_log(
        log_message=batch, log_type=log_type,
        forwarder_id=SECOPS_TARGET_FORWARDER_ID)
    LOGGER.debug(response)

  storage_client = storage.Client()
  # Ingest from the anonymized output bucket unless anonymization was
  # skipped, in which case the raw export bucket is read directly.
  bucket_name = (SECOPS_OUTPUT_BUCKET
                 if not SKIP_ANONYMIZATION else SECOPS_EXPORT_BUCKET)
  bucket = storage_client.bucket(bucket_name)
  export_ids = utils.get_secops_export_folders_for_date(bucket_name,
                                                        export_date)

  for export_id in export_ids:
    for folder in utils.list_anonymized_folders(bucket_name, export_id):
      # Folder names are expected to be "<LOGTYPE>-..."; the prefix is the
      # Chronicle log type.
      log_type = folder.split("-")[0]

      for log_file in utils.list_log_files(bucket_name,
                                           f"{export_id}/{folder}"):
        try:
          blob = bucket.blob(log_file)  # Directly get the blob object
          with blob.open("r") as f:
            batch = []
            for line in f:
              batch.append(line.rstrip('\n'))
              if len(batch) == batch_size:
                _flush(batch, log_type)
                batch = []

            # Send any remaining entries.
            if batch:
              _flush(batch, log_type)
        except Exception as e:
          LOGGER.error(f"Error during log ingestion: {e}")
          raise SystemExit(f'Error during log ingestion: {e}') from e

    # Delete the processed folders (not the buckets themselves): always the
    # ingestion source, plus the raw export folder when a separate
    # anonymized copy was ingested.
    utils.delete_folder(bucket_name, export_id)
    if not SKIP_ANONYMIZATION:
      utils.delete_folder(SECOPS_EXPORT_BUCKET, export_id)

  LOGGER.info("Finished importing data.")