# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ==============================================================================
"""DICOM store utilities for ILM."""

import collections
import dataclasses
import datetime
import json
import logging
import os
import time
from typing import Any, Iterable, List, Mapping, Optional, Tuple
import urllib.parse
import uuid

import apache_beam as beam
import google.auth
import google.auth.transport.requests
from google.cloud import bigquery
from google.cloud import storage
import requests

from ilm import ilm_config
from ilm import ilm_types
from ilm.ilm_lib import pipeline_util


# Required DICOM Store BigQuery table columns.
_BLOB_STORAGE_SIZE = 'BlobStorageSize'
_LAST_UPDATED = 'LastUpdated'
_LAST_UPDATED_STORAGE_CLASS = 'LastUpdatedStorageClass'
_SERIES_INSTANCE_UID = 'SeriesInstanceUID'
_SOP_INSTANCE_UID = 'SOPInstanceUID'
_STORAGE_CLASS = 'StorageClass'
_STUDY_INSTANCE_UID = 'StudyInstanceUID'
_TYPE = 'Type'

# Optional DICOM Store BigQuery table columns.
_ACQUISITION_DATE = 'AcquisitionDate'
_CONTENT_DATE = 'ContentDate'
_IMAGE_TYPE = 'ImageType'
_MODALITY = 'Modality'
_NUMBER_OF_FRAMES = 'NumberOfFrames'
_PIXEL_SPACING = 'PixelSpacing'
_SERIES_DATE = 'SeriesDate'
_SOP_CLASS_UID = 'SOPClassUID'
_STUDY_DATE = 'StudyDate'

_OPTIONAL_DICOM_STORE_METADATA_COLUMNS = frozenset([
    _ACQUISITION_DATE,
    _CONTENT_DATE,
    _IMAGE_TYPE,
    _MODALITY,
    _NUMBER_OF_FRAMES,
    _PIXEL_SPACING,
    _SERIES_DATE,
    _SOP_CLASS_UID,
    _STUDY_DATE,
])

# Computed column alias used in the BigQuery query.
_INSTANCE_DICOM_WEB_PATH = 'InstanceDicomWebPath'


class _DicomTags:
  SERIES_INSTANCE_UID = '0020000E'
  SOP_INSTANCE_UID = '00080018'


@dataclasses.dataclass(frozen=True)
class _StorageClassResult:
  succeeded_count: int
  failed_count: int
  unfinished_count: int

  succeeded_instances: List[str]
  failed_instances: List[str]
  unfinished_instances: List[str]


class DicomStoreError(Exception):
  pass


def _get_optional_dicom_store_metadata_table_columns(
    ilm_cfg: ilm_config.ImageLifecycleManagementConfig,
) -> List[str]:
  """Fetches metadata columns present in DICOM Store BigQuery table."""
  project_id, dataset_id, table_id = (
      ilm_cfg.dicom_store_config.dicom_store_bigquery_table.split('.')
  )
  client = bigquery.Client(project=project_id)
  # get_table() accepts a fully-qualified "project.dataset.table" ID string.
  table = client.get_table(f'{project_id}.{dataset_id}.{table_id}')
  metadata_columns = []
  for field in table.schema:
    if field.name in _OPTIONAL_DICOM_STORE_METADATA_COLUMNS:
      metadata_columns.append(field.name)
  return metadata_columns


def _metadata_columns_select(metadata_columns: List[str]) -> str:
  return (
      ', '.join(metadata_columns)
      .replace(
          _PIXEL_SPACING,
          f'{_PIXEL_SPACING}[SAFE_OFFSET(0)] AS {_PIXEL_SPACING}',
      )
      .replace(
          _IMAGE_TYPE, f'ARRAY_TO_STRING({_IMAGE_TYPE}, "/") AS {_IMAGE_TYPE}'
      )
  )


def _metadata_columns_group_by(metadata_columns: List[str]) -> str:
  return (
      ', '.join(metadata_columns)
      .replace(_PIXEL_SPACING, f'{_PIXEL_SPACING}[SAFE_OFFSET(0)]')
      .replace(_IMAGE_TYPE, f'ARRAY_TO_STRING({_IMAGE_TYPE}, "/")')
  )
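# Illustration (hedged, hypothetical column list): given
#   columns = ['Modality', 'PixelSpacing', 'ImageType']
# _metadata_columns_select(columns) yields
#   'Modality, PixelSpacing[SAFE_OFFSET(0)] AS PixelSpacing, '
#   'ARRAY_TO_STRING(ImageType, "/") AS ImageType'
# while _metadata_columns_group_by(columns) yields the same expressions without
# the AS aliases, so the GROUP BY clause matches the SELECT expressions.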


def get_dicom_store_query(
    ilm_cfg: ilm_config.ImageLifecycleManagementConfig,
) -> str:
  """Generates query to fetch DICOM instance metadata from BigQuery."""
  metadata_columns = _get_optional_dicom_store_metadata_table_columns(ilm_cfg)
  metadata_columns.extend([_BLOB_STORAGE_SIZE, _STORAGE_CLASS, _TYPE])
  metadata_columns_select = _metadata_columns_select(metadata_columns)
  metadata_columns_group_by = _metadata_columns_group_by(metadata_columns)
  partition_by_uid_triple = (
      f'PARTITION BY {_STUDY_INSTANCE_UID}, {_SERIES_INSTANCE_UID}, '
      f'{_SOP_INSTANCE_UID}'
  )
  return (
      'SELECT '
      '  CONCAT( '
      f'   "studies/", {_STUDY_INSTANCE_UID}, '
      f'   "/series/", {_SERIES_INSTANCE_UID}, '
      f'   "/instances/", {_SOP_INSTANCE_UID}) AS {_INSTANCE_DICOM_WEB_PATH}, '
      f'  {metadata_columns_select}, '
      f'  MAX({_LAST_UPDATED}) AS {_LAST_UPDATED}, '
      f'  MAX({_LAST_UPDATED_STORAGE_CLASS}) AS {_LAST_UPDATED_STORAGE_CLASS} '
      'FROM ( '
      '  SELECT '
      '    *, '
      f'   MAX({_LAST_UPDATED}) '
      f'     OVER ({partition_by_uid_triple}) AS MaxLastUpdated, '
      '    CASE '
      f'     WHEN {_TYPE}="CREATE" OR {_STORAGE_CLASS}!=LAG({_STORAGE_CLASS}) '
      f'       OVER ({partition_by_uid_triple} ORDER BY {_LAST_UPDATED} ASC) '
      f'     THEN {_LAST_UPDATED} '
      '      ELSE NULL '
      f'   END AS {_LAST_UPDATED_STORAGE_CLASS} '
      f'  FROM `{ilm_cfg.dicom_store_config.dicom_store_bigquery_table}` '
      ') '
      'WHERE '
      f'  {_LAST_UPDATED} = MaxLastUpdated '
      f'  AND {_STORAGE_CLASS} IS NOT NULL '
      f'  AND {_TYPE}!="DELETE" '
      f'GROUP BY {_STUDY_INSTANCE_UID}, {_SERIES_INSTANCE_UID}, '
      f'  {_SOP_INSTANCE_UID}, {metadata_columns_group_by}'
  )
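# Usage sketch (hedged): the generated SQL is intended to be run with a
# BigQuery client, with each resulting row fed to parse_dicom_metadata below.
# `cfg` is a hypothetical ImageLifecycleManagementConfig instance.
#
#   query = get_dicom_store_query(cfg)
#   for row in bigquery.Client().query(query).result():
#     instance, metadata = parse_dicom_metadata(dict(row))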


def parse_dicom_metadata(
    raw_metadata: Mapping[str, Any],
    today: Optional[datetime.datetime] = None,
) -> Tuple[str, ilm_types.InstanceMetadata]:
  """Parses metadata from DICOM store table query result."""
  if today is None:
    # Default computed at call time; a datetime.datetime.now() default in the
    # signature would be evaluated only once, at module import.
    today = datetime.datetime.now(tz=datetime.timezone.utc)
  try:
    instance = raw_metadata[_INSTANCE_DICOM_WEB_PATH]
    pixel_spacing = (
        float(raw_metadata[_PIXEL_SPACING])
        if raw_metadata.get(_PIXEL_SPACING)
        else None
    )
    last_updated_storage_class = raw_metadata[_LAST_UPDATED_STORAGE_CLASS]
    if last_updated_storage_class is None:
      logging.info(
          'Missing last updated for storage class in DICOM Store BigQuery '
          'table for instance %s. Using last updated timestamp instead.',
          instance,
      )
      last_updated_storage_class = raw_metadata[_LAST_UPDATED]
    days_in_current_storage_class = (today - last_updated_storage_class).days

    metadata = ilm_types.InstanceMetadata(
        instance=instance,
        modality=raw_metadata.get(_MODALITY, None),
        num_frames=int(raw_metadata.get(_NUMBER_OF_FRAMES) or 0),
        pixel_spacing=pixel_spacing,
        sop_class_uid=raw_metadata.get(_SOP_CLASS_UID, ''),
        acquisition_date=raw_metadata.get(_ACQUISITION_DATE, None),
        content_date=raw_metadata.get(_CONTENT_DATE, None),
        series_date=raw_metadata.get(_SERIES_DATE, None),
        study_date=raw_metadata.get(_STUDY_DATE, None),
        image_type=raw_metadata.get(_IMAGE_TYPE, None),
        size_bytes=raw_metadata[_BLOB_STORAGE_SIZE],
        storage_class=ilm_config.StorageClass(raw_metadata[_STORAGE_CLASS]),
        num_days_in_current_storage_class=days_in_current_storage_class,
    )
  except (KeyError, ValueError, TypeError) as e:
    raise RuntimeError(
        f'Invalid metadata in DICOM Store BigQuery table: {raw_metadata}'
    ) from e
  return (instance, metadata)
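# Example (hedged, illustrative values): a query row such as
#   {'InstanceDicomWebPath': 'studies/1.2/series/3.4/instances/5.6',
#    'BlobStorageSize': 1024, 'StorageClass': '<a StorageClass enum value>',
#    'Type': 'CREATE', 'LastUpdated': <datetime>,
#    'LastUpdatedStorageClass': <datetime>, ...optional columns...}
# parses to ('studies/1.2/series/3.4/instances/5.6', InstanceMetadata(...)),
# keyed by the instance's DICOMweb path.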


class GenerateFilterFilesDoFn(beam.DoFn):
  """DoFn to generate filter files for DICOM instances storage class changes."""

  def __init__(self, ilm_cfg: ilm_config.ImageLifecycleManagementConfig):
    self._ilm_cfg = ilm_cfg
    self._dry_run = ilm_cfg.dry_run
    self._storage_client = None

  def setup(self):
    self._storage_client = storage.Client()

  def process(
      self,
      storage_class_to_changes: Tuple[
          ilm_config.StorageClass, Iterable[ilm_types.StorageClassChange]
      ],
  ) -> Iterable[ilm_types.SetStorageClassRequestMetadata]:
    new_storage_class, changes = storage_class_to_changes
    filter_file_gcs_uri = os.path.join(
        self._ilm_cfg.tmp_gcs_uri, f'instances-filter-{uuid.uuid4()}.txt'
    )
    changes = list(changes)
    if not self._dry_run:
      instances_str = '\n'.join([change.instance for change in changes])
      pipeline_util.write_gcs_file(
          file_content=instances_str,
          gcs_uri=filter_file_gcs_uri,
          storage_client=self._storage_client,
      )
    if self._ilm_cfg.report_config.detailed_results_report_gcs_uri:
      instances = [change.instance for change in changes]
    else:
      instances = []
    yield ilm_types.SetStorageClassRequestMetadata(
        [change.move_rule_id for change in changes],
        instances,
        filter_file_gcs_uri,
        new_storage_class,
    )
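# The filter file written above is a plain-text GCS object with one DICOMweb
# resource path per line (e.g. "studies/<STUDY>/series/<SERIES>/instances/<SOP>");
# DicomStoreClient.set_blob_storage_settings below references it via
# filter_config.resourcePathsGcsUri.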


class DicomStoreClient:
  """DICOM store client for storage class operations."""

  _HEALTHCARE_API = 'https://healthcare.googleapis.com/v1beta1/'
  _OPERATION_NAME_KEY = 'name'
  _OPERATION_STATUS_KEY = 'done'

  def __init__(self, ilm_cfg: ilm_config.ImageLifecycleManagementConfig):
    self._dicom_store_resource = ilm_cfg.dicom_store_config.dicom_store_path
    self._dicom_store_path = urllib.parse.urljoin(
        DicomStoreClient._HEALTHCARE_API, self._dicom_store_resource.lstrip('/')
    ).rstrip('/')
    self._auth_credentials = None

  def _add_auth_to_header(
      self, msg_headers: Mapping[str, str]
  ) -> Mapping[str, str]:
    """Updates credentials returns header with authentication added."""
    if self._auth_credentials is None:
      self._auth_credentials = google.auth.default(
          scopes=['https://www.googleapis.com/auth/cloud-platform']
      )[0]
    if not self._auth_credentials.valid:
      auth_req = google.auth.transport.requests.Request()
      self._auth_credentials.refresh(auth_req)
    self._auth_credentials.apply(msg_headers)
    return msg_headers

  def _get_tag_value(
      self, instance_metadata: Mapping[str, Any], tag: str
  ) -> Optional[Any]:
    try:
      return instance_metadata[tag]['Value'][0]
    except (KeyError, IndexError):
      return None

  def fetch_instances(self, dicomweb_path: str) -> List[str]:
    """Fetches instances for given DICOMweb path.

    Args:
      dicomweb_path: path to either a study (i.e. studies/<STUDY>) or a series
        (i.e. studies/<STUDY>/series/<SERIES>) in DICOM store.

    Returns:
      List of full DICOMweb paths of instances.

    Raises:
      DicomStoreError: in case of unexpected or error response from DICOM Store.
    """
    uri = os.path.join(
        self._dicom_store_path, 'dicomWeb', dicomweb_path, 'instances'
    )
    headers = self._add_auth_to_header({})
    try:
      response = requests.get(uri, headers=headers)
      response.raise_for_status()
      response_json = response.json()
    except (requests.HTTPError, requests.exceptions.JSONDecodeError) as e:
      raise DicomStoreError(
          f'Failed to get instances metadata for {dicomweb_path}.'
      ) from e

    has_series = 'series/' in dicomweb_path
    instances = []
    for instance_metadata in response_json:
      instance_dicomweb_path = [dicomweb_path]
      if not has_series:
        series_instance_uid = self._get_tag_value(
            instance_metadata, _DicomTags.SERIES_INSTANCE_UID
        )
        instance_dicomweb_path.append(f'series/{series_instance_uid}')
      sop_instance_uid = self._get_tag_value(
          instance_metadata, _DicomTags.SOP_INSTANCE_UID
      )
      instance_dicomweb_path.append(f'instances/{sop_instance_uid}')
      instance_dicomweb_path = '/'.join(instance_dicomweb_path)
      instances.append(instance_dicomweb_path)
    return instances
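  # Example (hedged): for dicomweb_path='studies/1.2', a QIDO-RS response item
  #   {'0020000E': {'Value': ['3.4']}, '00080018': {'Value': ['5.6']}}
  # (SeriesInstanceUID and SOPInstanceUID tags) produces the instance path
  #   'studies/1.2/series/3.4/instances/5.6'.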

  def set_blob_storage_settings(
      self,
      request: ilm_types.SetStorageClassRequestMetadata,
  ) -> str:
    """Sends setBlobStorageSettings request with filter file.

    Args:
      request: metadata for sending setBlobStorageSettings request.

    Returns:
      setBlobStorageSettings response operation name.

    Raises:
      DicomStoreError: if the request fails.
    """
    uri = f'{self._dicom_store_path}:setBlobStorageSettings'
    headers = self._add_auth_to_header(
        {'Content-Type': 'application/json; charset=utf-8'}
    )
    data = (
        '{{ '
        '  resource: "{dicom_store_resource}",'
        '  filter_config: {{ resourcePathsGcsUri: "{filter_file_gcs_uri}" }},'
        '  blobStorageSettings: {{ blob_storage_class: "{new_storage_class}" }}'
        '}}'.format(
            dicom_store_resource=self._dicom_store_resource,
            filter_file_gcs_uri=request.filter_file_gcs_uri,
            new_storage_class=request.new_storage_class.value,
        )
    )
    try:
      response = requests.post(uri, data=data, headers=headers)
      response.raise_for_status()
      operation = json.loads(response.content)
      return operation[DicomStoreClient._OPERATION_NAME_KEY]
    except (requests.HTTPError, json.JSONDecodeError) as exp:
      raise DicomStoreError(
          f'Failed to change storage class for {len(request.move_rule_ids)} '
          f'instance(s) with error: {exp}'
      ) from exp

  def is_operation_done(self, operation_name: str) -> bool:
    """Get operation status.

    Args:
      operation_name: Operation resource name.

    Returns:
      True if operation is done.

    Raises:
      DicomStoreError: if unable to get operation status.
    """
    uri = os.path.join(DicomStoreClient._HEALTHCARE_API, operation_name)
    headers = self._add_auth_to_header({})
    try:
      response = requests.get(uri, headers=headers)
      response.raise_for_status()
      operation = json.loads(response.content)
      # The long-running operation reports completion via the 'done' field,
      # which is absent while the operation is still running.
      return operation[DicomStoreClient._OPERATION_STATUS_KEY]
    except (requests.HTTPError, json.JSONDecodeError) as exp:
      raise DicomStoreError(
          f'Failed to get operation status for {operation_name}'
      ) from exp
    except KeyError:
      return False
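  # Polling sketch (hedged): `client` is a DicomStoreClient and `op_name` the
  # value returned by set_blob_storage_settings.
  #
  #   while not client.is_operation_done(op_name):
  #     time.sleep(60)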


class UpdateStorageClassesDoFn(beam.DoFn):
  """DoFn to update storage classes of instances in DICOM store."""

  def __init__(self, ilm_cfg: ilm_config.ImageLifecycleManagementConfig):
    self._dry_run = ilm_cfg.dry_run
    self._cfg = ilm_cfg
    self._dicom_store_client = DicomStoreClient(self._cfg)
    self._throttler = pipeline_util.Throttler(
        self._cfg.dicom_store_config.max_dicom_store_qps
    )

  def setup(self):
    self._dicom_store_client = DicomStoreClient(self._cfg)
    self._throttler = pipeline_util.Throttler(
        self._cfg.dicom_store_config.max_dicom_store_qps
    )

  def process(
      self, request: ilm_types.SetStorageClassRequestMetadata
  ) -> Iterable[ilm_types.SetStorageClassOperationMetadata]:
    if self._dry_run:
      logging.info('Dry-run mode. Skipping DICOM store changes.')
      return [
          ilm_types.SetStorageClassOperationMetadata(
              operation_name='',
              move_rule_ids=request.move_rule_ids,
              instances=request.instances,
              succeeded=True,
          )
      ]
    try:
      self._throttler.wait()
      operation_name = self._dicom_store_client.set_blob_storage_settings(
          request
      )
      return [
          ilm_types.SetStorageClassOperationMetadata(
              operation_name=operation_name,
              move_rule_ids=request.move_rule_ids,
              instances=request.instances,
              start_time=time.time(),
              filter_file_gcs_uri=request.filter_file_gcs_uri,
          )
      ]
    except DicomStoreError as e:
      logging.error(e)
      return [
          ilm_types.SetStorageClassOperationMetadata(
              operation_name='',
              move_rule_ids=request.move_rule_ids,
              instances=request.instances,
              succeeded=False,
              filter_file_gcs_uri=request.filter_file_gcs_uri,
          )
      ]


class GenerateReportDoFn(beam.DoFn):
  """DoFn to generate report of storage class updates after LROs finish."""

  def __init__(self, ilm_cfg: ilm_config.ImageLifecycleManagementConfig):
    self._dry_run = ilm_cfg.dry_run
    self._move_rules = ilm_cfg.storage_class_config.move_rules
    self._operation_timeout_seconds = (
        ilm_cfg.dicom_store_config.set_storage_class_timeout_min * 60
    )
    self._cfg = ilm_cfg

  def setup(self):
    self._operations_in_progress = collections.deque()
    self._operations_done = []
    self._storage_class_results = {}
    timestamp = str(datetime.datetime.now())
    self._summarized_report = (
        self._cfg.report_config.summarized_results_report_gcs_uri.format(
            timestamp
        )
    )
    if self._cfg.report_config.detailed_results_report_gcs_uri:
      self._detailed_report = (
          self._cfg.report_config.detailed_results_report_gcs_uri.format(
              timestamp
          )
      )
    else:
      self._detailed_report = ''
    self._dicom_store_client = DicomStoreClient(self._cfg)
    self._throttler = pipeline_util.Throttler(
        target_qps=self._cfg.dicom_store_config.max_dicom_store_qps
    )

  def _wait_for_operations_to_finish(self):
    """Waits for DICOM store operations to finish.

    Operations that time out keep succeeded=None and are reported as unfinished.
    """
    logging.info('Waiting for SetBlobStorageSettings operations to finish.')
    if not self._operations_in_progress:
      return
    num_operations_to_verify = len(self._operations_in_progress)
    success = 0
    failure = len(self._operations_done)  # Operations that had already failed.
    timeout = 0
    while self._operations_in_progress:
      if num_operations_to_verify <= 0:
        # Wait to verify operations again after a full round.
        time.sleep(60)
        num_operations_to_verify = len(self._operations_in_progress)
      operation = self._operations_in_progress.popleft()
      try:
        self._throttler.wait()
        if self._dicom_store_client.is_operation_done(operation.operation_name):
          operation = dataclasses.replace(operation, succeeded=True)
          self._operations_done.append(operation)
          success += 1
          num_operations_to_verify -= 1
          continue
      except DicomStoreError as e:
        logging.error(e)
        operation = dataclasses.replace(operation, succeeded=False)
        self._operations_done.append(operation)
        failure += 1
        num_operations_to_verify -= 1
        continue
      if time.time() > operation.start_time + self._operation_timeout_seconds:
        self._operations_done.append(operation)
        timeout += 1
        num_operations_to_verify -= 1
        continue
      self._operations_in_progress.append(operation)
      num_operations_to_verify -= 1

    logging.info(
        'Finished waiting for SetBlobStorageSettings operations: '
        '%s succeeded, %s failed, %s timed out.',
        success,
        failure,
        timeout,
    )

  def _cleanup_filter_files(self):
    if not self._cfg.dicom_store_config.set_storage_class_delete_filter_files:
      return
    for operation in self._operations_in_progress:
      if operation.filter_file_gcs_uri:
        pipeline_util.delete_gcs_file(operation.filter_file_gcs_uri)
    for operation in self._operations_done:
      if operation.filter_file_gcs_uri:
        pipeline_util.delete_gcs_file(operation.filter_file_gcs_uri)

  def _compute_results(self):
    """Computes instances results based on status of DICOM store operations."""
    # Maps move rule id to instance counts per outcome.
    succeeded = collections.defaultdict(int)
    failed = collections.defaultdict(int)
    unfinished = collections.defaultdict(int)

    succeeded_instances = collections.defaultdict(list)
    failed_instances = collections.defaultdict(list)
    unfinished_instances = collections.defaultdict(list)
    rule_ids = set()
    for operation in self._operations_done:
      if operation.succeeded is None:
        counter = unfinished
        instances = unfinished_instances
      elif operation.succeeded:
        counter = succeeded
        instances = succeeded_instances
      else:
        counter = failed
        instances = failed_instances
      for rule_id in operation.move_rule_ids:
        counter[rule_id] += 1
        rule_ids.add(rule_id)
      if operation.instances and len(operation.instances) == len(
          operation.move_rule_ids
      ):
        for rule_id, instance in zip(
            operation.move_rule_ids, operation.instances
        ):
          instances[rule_id].append(instance)

    for rule_id in rule_ids:
      self._storage_class_results[rule_id] = _StorageClassResult(
          succeeded_count=succeeded[rule_id],
          failed_count=failed[rule_id],
          unfinished_count=unfinished[rule_id],
          succeeded_instances=succeeded_instances[rule_id],
          failed_instances=failed_instances[rule_id],
          unfinished_instances=unfinished_instances[rule_id],
      )

  def _generate_report(self):
    """Generates report of storage class updates.

    Reports instances' updates and status (success, failure, still running) for
    each move rule and condition in ILM config.
    """
    self._compute_results()
    csv_separator = ','
    header = csv_separator.join([
        'from_storage_class',
        'to_storage_class',
        'condition',
        'rule_index',
        'condition_index',
        'succeeded',
        'failed',
        'unfinished',
    ])
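    # Example summarized row (hedged, illustrative values):
    #   STANDARD,ARCHIVE,<condition>,0,1,120,3,0
    # i.e. for rule 0 / condition 1, 120 instances moved, 3 failed,
    # 0 unfinished.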
    summarized_report_results = [header]
    detailed_report_results = [header]
    logging.info('Generating storage class updates report(s).')
    # TODO: Also add operation name.
    for rule_index, rule in enumerate(self._move_rules):
      from_storage_class = rule.from_storage_class
      to_storage_class = rule.to_storage_class
      for condition_index, condition in enumerate(rule.conditions):
        rule_id = ilm_types.MoveRuleId(
            rule_index=rule_index, condition_index=condition_index
        )
        if rule_id not in self._storage_class_results:
          continue
        condition_results = self._storage_class_results[rule_id]
        logging.info(
            'Instances moved from %s to %s due to %s (move rule %s, condition '
            '%s): %s succeeded, %s failed, %s unfinished',
            from_storage_class.value,
            to_storage_class.value,
            condition,
            rule_index,
            condition_index,
            condition_results.succeeded_count,
            condition_results.failed_count,
            condition_results.unfinished_count,
        )
        summarized_report_results.append(
            csv_separator.join([
                str(i)
                for i in [
                    from_storage_class.value,
                    to_storage_class.value,
                    condition,
                    rule_index,
                    condition_index,
                    condition_results.succeeded_count,
                    condition_results.failed_count,
                    condition_results.unfinished_count,
                ]
            ]),
        )
        if self._detailed_report:
          detailed_report_results.append(
              csv_separator.join([
                  str(i)
                  for i in [
                      from_storage_class.value,
                      to_storage_class.value,
                      condition,
                      rule_index,
                      condition_index,
                      f'"{condition_results.succeeded_instances}"',
                      f'"{condition_results.failed_instances}"',
                      f'"{condition_results.unfinished_instances}"',
                  ]
              ])
          )
    summarized_report_results_str = '\n'.join(summarized_report_results)
    pipeline_util.write_gcs_file(
        file_content=summarized_report_results_str,
        gcs_uri=self._summarized_report,
    )
    logging.info(
        'Wrote summarized report of updates to %s.', self._summarized_report
    )
    if self._detailed_report:
      detailed_report_results_str = '\n'.join(detailed_report_results)
      pipeline_util.write_gcs_file(
          file_content=detailed_report_results_str,
          gcs_uri=self._detailed_report,
      )
      logging.info(
          'Wrote detailed report of updates to %s.', self._detailed_report
      )

  def process(
      self,
      key_to_operations: Tuple[
          int, Iterable[ilm_types.SetStorageClassOperationMetadata]
      ],
  ) -> None:
    _, operations = key_to_operations
    if self._dry_run:
      self._operations_done = collections.deque(operations)
    else:
      for op in operations:
        if op.succeeded is None:
          self._operations_in_progress.append(op)
        else:
          self._operations_done.append(op)
      self._wait_for_operations_to_finish()
      self._cleanup_filter_files()
    self._generate_report()
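
# Pipeline wiring sketch (hedged; the real pipeline is assembled elsewhere in
# the ilm package, this only illustrates how the DoFn input/output types line
# up). `cfg` is a hypothetical ImageLifecycleManagementConfig and `changes` a
# PCollection of (StorageClass, StorageClassChange) pairs:
#
#   requests = (
#       changes
#       | beam.GroupByKey()
#       | beam.ParDo(GenerateFilterFilesDoFn(cfg))
#   )
#   operations = requests | beam.ParDo(UpdateStorageClassesDoFn(cfg))
#   _ = (
#       operations
#       | beam.Map(lambda op: (0, op))  # single key: report sees all operations
#       | beam.GroupByKey()
#       | beam.ParDo(GenerateReportDoFn(cfg))
#   )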
