# ilm/batch_pipeline_main.py
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ==============================================================================
"""Image Lifecycle Management (ILM) Dataflow batch pipeline main."""
import argparse
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from ilm.ilm_lib import dicom_store_lib
from ilm.ilm_lib import heuristics
from ilm.ilm_lib import logs_lib
from ilm.ilm_lib import pipeline_util


def parse_args():
  """Parses known flags; returns (known_args, remaining pipeline args)."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--ilm_config_gcs_uri',
      dest='ilm_config_gcs_uri',
      default=None,
      help='GCS URI for ILM config.',
  )
  return parser.parse_known_args()


def run() -> None:
  """Main entry point; defines and runs the pipeline."""
  known_args, pipeline_args = parse_args()
  pipeline_options = PipelineOptions(pipeline_args)
  ilm_cfg = pipeline_util.read_ilm_config(known_args.ilm_config_gcs_uri)
  logging.info(
      'Starting pipeline with options %s and ILM config %s',
      pipeline_options.get_all_options(),
      json.dumps(json.loads(ilm_cfg.to_json()), indent=2),
  )
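  # pipeline_args holds flags parse_args() did not consume (e.g., --runner,
  # --project); PipelineOptions above interprets them as Beam options.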
  with beam.Pipeline(options=pipeline_options) as pipeline:
    # Read audit logs and count instance access.
    instances_to_access_count = (
        pipeline
        | 'ReadDataAccessLogs'
        >> beam.io.ReadFromBigQuery(
            query=logs_lib.get_data_access_logs_query(ilm_cfg),
            use_standard_sql=True,
        )
        | 'ParseAuditLogs'
        >> beam.ParDo(logs_lib.ParseDataAccessLogsDoFn(ilm_cfg))
        | 'GroupByInstance' >> beam.GroupByKey()
        | 'ComputeTotalAccessCount'
        >> beam.Map(logs_lib.compute_log_access_metadata)
    )
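    # The access-count branch above and the metadata branch below must emit
    # matching per-instance keys; CoGroupByKey joins them further down.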
    # Read DICOM instances metadata.
    instances_to_metadata = (
        pipeline
        | 'ReadFromDicomStoreTable'
        >> beam.io.ReadFromBigQuery(
            query=dicom_store_lib.get_dicom_store_query(ilm_cfg),
            use_standard_sql=True,
        )
        | 'ParseDicomStoreMetadata'
        >> beam.Map(dicom_store_lib.parse_dicom_metadata)
        | 'FilterOutDisallowList'
        >> beam.Filter(pipeline_util.should_keep_instance, ilm_cfg)
    )
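    # beam.Filter forwards extra positional args to its predicate, so the
    # filter above evaluates should_keep_instance(element, ilm_cfg). Applied
    # to a list of PCollections, CoGroupByKey (next stage) yields one
    # (key, (access_counts, metadatas)) tuple per instance key.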
    # Merge {DICOM metadata, access count} and compute storage class changes.
    instances_to_merge = [instances_to_access_count, instances_to_metadata]
    instances_to_move = (
        instances_to_merge
        | 'MergeInstances' >> beam.CoGroupByKey()
        | 'IncludeAccessCountInMetadata'
        >> beam.ParDo(pipeline_util.include_access_count_in_metadata)
        | 'ComputeStorageClassChanges'
        >> beam.ParDo(heuristics.ComputeStorageClassChangesDoFn(ilm_cfg))
    )
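    # Elements emitted above carry their target class in new_storage_class,
    # which keys the batching stage below.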
    # Batch instances and write SetBlobStorageSettingsRequest filter files to
    # GCS.
    filter_files = (
        instances_to_move
        | 'KeyByNewStorageClass' >> beam.Map(lambda x: (x.new_storage_class, x))
        | 'BatchInstances'
        >> beam.GroupIntoBatches(
            batch_size=ilm_cfg.dicom_store_config.set_storage_class_max_num_instances,
        )
        | 'GenerateFilterFiles'
        >> beam.ParDo(dicom_store_lib.GenerateFilterFilesDoFn(ilm_cfg))
    )
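    # GroupIntoBatches batches per key, so each batch, and hence each filter
    # file, covers a single target storage class and holds at most
    # set_storage_class_max_num_instances instances.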
    # Perform storage class updates in DICOM Store and generate report(s).
    _ = (
        filter_files
        | 'UpdateStorageClasses'
        >> beam.ParDo(dicom_store_lib.UpdateStorageClassesDoFn(ilm_cfg))
        | 'KeyBySingleKey' >> beam.Map(lambda x: (0, x))
        | 'CombineAllOperations' >> beam.GroupByKey()
        | 'WaitForLongRunningOperationsAndGenerateReport'
        >> beam.ParDo(dicom_store_lib.GenerateReportDoFn(ilm_cfg))
    )
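    # Keying every operation to the constant 0 and grouping funnels all
    # results into one element, so a single worker waits on the long-running
    # operations before emitting the final report.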


if __name__ == '__main__':
  # Raise the root logger to INFO so the startup log in run() is visible when
  # the pipeline is launched directly (the default WARNING level would drop it).
  logging.getLogger().setLevel(logging.INFO)
  run()
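
# Example invocation (paths and project are illustrative; --runner,
# --project, --region, and --temp_location are standard Beam/Dataflow flags):
#
#   python3 -m ilm.batch_pipeline_main \
#     --ilm_config_gcs_uri=gs://my-bucket/ilm_config.json \
#     --runner=DataflowRunner \
#     --project=my-project \
#     --region=us-central1 \
#     --temp_location=gs://my-bucket/temp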