def run()

in 5-app-infra/3-artifact-publish/docker/flex-templates/gcs_to_bq_deidentification/decrypt-gcs-to-bq.py


import argparse
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.util import BatchElements

# Helper transforms and functions referenced below (DecryptFile, ConvertToJSON,
# MaskDetectedDetails, CalculateTotalBytes, normalize_data,
# from_list_dicts_to_table, from_table_to_list_dict) are defined elsewhere in
# this file.


def run(argv=None):
    parser = argparse.ArgumentParser()
    
    parser.add_argument(
        '--gcs_input_file',
        required=True,
        help='Path to the input CSV file on GCS'
    )
    
    parser.add_argument(
        '--output_table',
        required=True,
        help='BigQuery output table in the format PROJECT:DATASET.TABLE'
    )
    
    parser.add_argument(
        '--bq_schema',
        required=True,
        help='Comma-separated list of BigQuery schema fields, e.g. name:STRING,age:INTEGER'
    )

    parser.add_argument(
        '--min_batch_size',
        default=10,
        type=int,
        help=(
            'Minimum number of records to send per batch in '
            'calls to the Data Loss Prevention (DLP) API.'
        )
    )

    parser.add_argument(
        '--max_batch_size',
        default=1000,
        type=int,
        help=(
            'Maximum number of records to send per batch in '
            'calls to the Data Loss Prevention (DLP) API.'
        )
    )

    parser.add_argument(
        '--dlp_project',
        required=True,
        help=(
            'ID of the project that holds the DLP template.'
        )
    )

    parser.add_argument(
        '--dlp_location',
        required=False,
        help=(
            'Location of the DLP template resource.'
        )
    )

    parser.add_argument(
        '--deidentification_template_name',
        required=True,
        help=(
            'Name of the DLP Structured De-identification Template '
            'of the form "projects/<PROJECT>/locations/<LOCATION>'
            '/deidentifyTemplates/<TEMPLATE_ID>"'
        )
    )

    parser.add_argument(
        '--cryptoKeyName',
        required=True,
        help=(
            'GCP KMS key URI, in the form '
            'projects/<PROJECT_ID>/locations/<LOCATION>/keyRings/<KEY_RING>/cryptoKeys/<KEY_NAME>'
        )
    )

    parser.add_argument(
        '--wrappedKey',
        required=True,
        help=(
            'Base64-encoded Tink keyset (wrapped key) stored in Secret Manager, in the form '
            'projects/<PROJECT_ID>/secrets/<SECRET_NAME>/versions/<VERSION>'
        )
    )
    
    known_args, pipeline_args = parser.parse_known_args(argv)
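    # Any remaining arguments (e.g. --runner, --project, --temp_location) are
    # forwarded to the Beam runner through PipelineOptions.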
    pipeline_options = PipelineOptions(
        pipeline_args,
        # runner='DirectRunner',  # Use DirectRunner for local testing
        # direct_num_workers=1,  # Reduce parallelism to avoid threading issues
        # Use cloudpickle instead of dill for serialization; serialization
        # issues were observed with dill on Dataflow.
        pickle_library="cloudpickle",
    )

    with beam.Pipeline(options=pipeline_options) as p:
        # Extract schema headers from the schema argument
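        # e.g. '--bq_schema name:STRING,age:INTEGER' yields ['name', 'age']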
        headers = [field.split(':')[0] for field in known_args.bq_schema.split(',')]

        # Read CSV lines
        csv_lines = (
            p
            # Skip the first line, which is the header
            | 'Read CSV' >> beam.io.ReadFromText(known_args.gcs_input_file, skip_header_lines=1)
        )

        # Decrypt the CSV lines and convert them to structured records
        decrypt_records = (
            csv_lines
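            # DecryptFile (defined elsewhere in this file) decrypts each
            # element using the KMS-wrapped Tink keyset passed via
            # --cryptoKeyName and --wrappedKey.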
            | 'Decrypt File Contents' >> beam.ParDo(DecryptFile(known_args.cryptoKeyName, known_args.wrappedKey))
            | 'Convert to JSON' >> beam.ParDo(ConvertToJSON(headers))
            | 'Parse JSON payload' >> beam.Map(json.loads)
            | 'Flatten lists' >> beam.FlatMap(normalize_data)
        )

        # De-identify the records in batches via the DLP API
        de_identified_messages = (
            decrypt_records
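            # Batch elements to cut down the number of DLP API calls; batch
            # sizes are bounded by --min_batch_size and --max_batch_size.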
            | "Batching" >> BatchElements(
                min_batch_size=known_args.min_batch_size,
                max_batch_size=known_args.max_batch_size
                )
            | 'Convert dicts to table' >>
                beam.Map(from_list_dicts_to_table)
            | 'Call DLP de-identification' >> MaskDetectedDetails(
                project=known_args.dlp_project,
                location=known_args.dlp_location,
                template_name=known_args.deidentification_template_name
            )
            | 'Convert table to dicts' >> beam.FlatMap(from_table_to_list_dict)
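            # CalculateTotalBytes (defined elsewhere in this file) appears,
            # from its name and arguments, to track the byte volume written
            # to the output table.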
            | 'Calculate Total Bytes' >> beam.ParDo(CalculateTotalBytes(known_args.output_table))
            # Stream-insert rows into the BigQuery table
            | 'Write to BQ' >> beam.io.WriteToBigQuery(
                known_args.output_table,
                schema=known_args.bq_schema,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                method="STREAMING_INSERTS"
            )
        )
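
# A minimal entry-point sketch, assuming the full script calls run() under a
# main guard (the actual file may differ):
if __name__ == '__main__':
    run()

# Example invocation (illustrative; all resource names below are placeholders):
# python decrypt-gcs-to-bq.py \
#     --gcs_input_file=gs://my-bucket/input.csv \
#     --output_table=my-project:my_dataset.my_table \
#     --bq_schema=name:STRING,age:INTEGER \
#     --dlp_project=my-dlp-project \
#     --dlp_location=global \
#     --deidentification_template_name=projects/my-dlp-project/locations/global/deidentifyTemplates/my-template \
#     --cryptoKeyName=projects/my-kms-project/locations/global/keyRings/my-ring/cryptoKeys/my-key \
#     --wrappedKey=projects/my-project/secrets/my-wrapped-key/versions/1 \
#     --runner=DataflowRunner --project=my-project --region=us-central1 \
#     --temp_location=gs://my-bucket/tmp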