def trigger_export()

in fast/project-templates/secops-anonymization-pipeline/source/main.py [0:0]


def trigger_export(export_date: str, export_start_datetime: str,
                   export_end_datetime: str, log_types: str) -> list:
  """
    Trigger SecOps export(s) using the Data Export API for a time range.

    The time range comes from export_start_datetime / export_end_datetime
    when BOTH are set; otherwise it is derived from export_date through
    utils.format_date_time_range. A single "all logs" export is created when
    log_types is blank, otherwise one export is created per listed log type.

    :param export_date: date (as string) handed to
      utils.format_date_time_range when no explicit range is given
      (earlier docs mention DD-MM-YYYY format - TODO confirm against utils)
    :param export_start_datetime: start of the range, formatted as
      "%Y-%m-%dT%H:%M:%SZ"; ignored unless export_end_datetime is also set
    :param export_end_datetime: end of the range, formatted as
      "%Y-%m-%dT%H:%M:%SZ"; ignored unless export_start_datetime is also set
    :param log_types: comma-separated list of log types; surrounding
      whitespace and empty entries are ignored, None/blank exports all logs
    :return: list with the ID of every export that was triggered
    :raises SystemExit: if creating an export or reading its response fails
    :raises ValueError: if an explicit start/end datetime cannot be parsed
  """

  client = SecOpsClient()
  chronicle = client.chronicle(customer_id=SECOPS_SOURCE_CUSTOMER_ID,
                               project_id=SECOPS_SOURCE_PROJECT,
                               region=SECOPS_REGION)

  if export_start_datetime and export_end_datetime:
    datetime_format = "%Y-%m-%dT%H:%M:%SZ"
    start_time = datetime.strptime(export_start_datetime, datetime_format)
    end_time = datetime.strptime(export_end_datetime, datetime_format)
  else:
    # A partial range (only start or only end) also ends up here and the
    # export falls back to the range derived from export_date.
    start_time, end_time = utils.format_date_time_range(date_input=export_date)
  gcs_bucket = f"projects/{GCP_PROJECT_ID}/buckets/{SECOPS_EXPORT_BUCKET}"

  # Normalize the comma-separated list so that "A, B" or a trailing comma
  # never results in a request for a padded or empty log type.
  requested_log_types = [
      log_type.strip()
      for log_type in (log_types or "").split(",")
      if log_type.strip()
  ]
  if requested_log_types:
    export_filters = [
        {"log_type": log_type} for log_type in requested_log_types
    ]
  else:
    export_filters = [{"export_all_logs": True}]

  export_ids = []
  try:
    for export_filter in export_filters:
      export_response = chronicle.create_data_export(start_time=start_time,
                                                     end_time=end_time,
                                                     gcs_bucket=gcs_bucket,
                                                     **export_filter)
      LOGGER.info(export_response)
      # The export ID is the last path segment of the resource name.
      export_id = export_response["dataExportStatus"]["name"].split("/")[-1]
      export_ids.append(export_id)
      LOGGER.info(f"Triggered export with ID: {export_id}")
  except Exception as e:
    LOGGER.error(f"Error during export: {e}")
    # Keep SystemExit for callers, but chain the cause for debugging.
    raise SystemExit(f'Error during secops export: {e}') from e

  return export_ids