in 5-app-infra/3-artifact-publish/docker/flex-templates/gcs_to_bq_deidentification/decrypt-gcs-to-bq.py [0:0]
def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--gcs_input_file',
        required=True,
        help='Path to the input CSV file on GCS'
    )
    parser.add_argument(
        '--output_table',
        required=True,
        help='BigQuery output table in the format PROJECT:DATASET.TABLE'
    )
    parser.add_argument(
        '--bq_schema',
        required=True,
        help='Comma-separated list of BigQuery schema fields, e.g. name:STRING,age:INTEGER'
    )
    parser.add_argument(
        '--min_batch_size',
        default=10,
        type=int,
        help=(
            'Minimum number of records to send per batch in '
            'the call to the Data Loss Prevention (DLP) API.'
        )
    )
    parser.add_argument(
        '--max_batch_size',
        default=1000,
        type=int,
        help=(
            'Maximum number of records to send per batch in '
            'the call to the Data Loss Prevention (DLP) API.'
        )
    )
    parser.add_argument(
        '--dlp_project',
        required=True,
        help='ID of the project that holds the DLP template.'
    )
    parser.add_argument(
        '--dlp_location',
        required=False,
        help='The location of the DLP template resource.'
    )
    parser.add_argument(
        '--deidentification_template_name',
        required=True,
        help=(
            'Name of the DLP Structured De-identification Template '
            'of the form "projects/<PROJECT>/locations/<LOCATION>'
            '/deidentifyTemplates/<TEMPLATE_ID>"'
        )
    )
    parser.add_argument(
        '--cryptoKeyName',
        required=True,
        help=(
            'GCP KMS key URI of the form '
            'projects/<PROJECT_ID>/locations/<LOCATION>/keyRings/<KEY_RING>/cryptoKeys/<KEY_NAME>'
        )
    )
    parser.add_argument(
        '--wrappedKey',
        required=True,
        help=(
            'Base64-encoded Tink keyset (wrapped key) stored in Secret Manager under '
            'projects/<PROJECT_ID>/secrets/<SECRET_NAME>/versions/<VERSION>'
        )
    )
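    # Example invocation (a sketch; all project, bucket, dataset, template,
    # key, and secret names below are hypothetical placeholders):
    #
    #   python decrypt-gcs-to-bq.py \
    #     --gcs_input_file gs://example-bucket/input/data.csv \
    #     --output_table example-project:example_dataset.example_table \
    #     --bq_schema name:STRING,age:INTEGER \
    #     --dlp_project example-dlp-project \
    #     --deidentification_template_name projects/example-dlp-project/locations/global/deidentifyTemplates/example-template \
    #     --cryptoKeyName projects/example-kms-project/locations/global/keyRings/example-ring/cryptoKeys/example-key \
    #     --wrappedKey projects/example-project/secrets/example-wrapped-key/versions/1 \
    #     --runner DirectRunner
    #
    # Unrecognized flags (e.g. --runner) are passed through to PipelineOptions below.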
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(
        pipeline_args,
        # runner='DirectRunner',  # Use DirectRunner for local testing
        # direct_num_workers=1,   # Reduce parallelism to avoid threading issues
        # Use cloudpickle instead of dill for serialization; dill showed
        # serialization issues when running on Dataflow.
        pickle_library='cloudpickle',
    )
    with beam.Pipeline(options=pipeline_options) as p:
        # Extract the column headers from the schema argument
        headers = [field.split(':')[0] for field in known_args.bq_schema.split(',')]
        # Read CSV lines, skipping the header row
        csv_lines = (
            p
            | 'Read CSV' >> beam.io.ReadFromText(known_args.gcs_input_file, skip_header_lines=1)
        )
        # Decrypt each line and convert it to structured JSON records
        decrypt_records = (
            csv_lines
            | 'Decrypt File Contents' >> beam.ParDo(DecryptFile(known_args.cryptoKeyName, known_args.wrappedKey))
            | 'Convert to JSON' >> beam.ParDo(ConvertToJSON(headers))
            | 'Parse JSON payload' >> beam.Map(json.loads)
            | 'Flatten lists' >> beam.FlatMap(normalize_data)
        )
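        # Sketch of the assumed data flow through the stages above (assumes
        # DecryptFile emits a plaintext CSV row and ConvertToJSON pairs it
        # with the schema headers; the actual DoFns are defined elsewhere
        # in this module):
        #   encrypted line -> 'John,42'                          (DecryptFile)
        #   'John,42'      -> '{"name": "John", "age": "42"}'    (ConvertToJSON)
        #   JSON string    -> {'name': 'John', 'age': '42'}      (json.loads)
        #   nested lists   -> one dict per record                (normalize_data)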
        # De-identify the records in batches via the DLP API, then stream them into BigQuery
        de_identified_messages = (
            decrypt_records
            | 'Batching' >> BatchElements(
                min_batch_size=known_args.min_batch_size,
                max_batch_size=known_args.max_batch_size
            )
            | 'Convert dicts to table' >> beam.Map(from_list_dicts_to_table)
            | 'Call DLP de-identification' >> MaskDetectedDetails(
                project=known_args.dlp_project,
                location=known_args.dlp_location,
                template_name=known_args.deidentification_template_name
            )
            | 'Convert table to dicts' >> beam.FlatMap(from_table_to_list_dict)
            | 'Calculate Total Bytes' >> beam.ParDo(CalculateTotalBytes(known_args.output_table))
            # Stream inserts into the BigQuery table
            | 'Write to BQ' >> beam.io.WriteToBigQuery(
                known_args.output_table,
                schema=known_args.bq_schema,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                method='STREAMING_INSERTS'
            )
        )
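
# A typical Beam entry point for a module like this (a minimal sketch; the
# actual module may configure logging or other setup before calling run()):
#
#   if __name__ == '__main__':
#       run()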