datasets/deepmind/pipelines/alphafold/pipeline.yaml (255 lines of code) (raw):
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
---
# This pipeline sets `resources` to null.
resources: null

dag:
  airflow_version: 2
  initialize:
    dag_id: alphafold
    default_args:
      owner: 'Google'
      depends_on_past: false
      # Quoted so the value stays a string instead of being parsed as a date.
      start_date: '2022-03-01'
    max_active_runs: 1
    # Quoted because a plain scalar may not begin with "@".
    schedule_interval: '@once'
    catchup: false
    default_view: graph
tasks:
# Transfer job from the source bucket to the destination bucket, limited to
# objects whose names start with one of the listed prefixes.
- operator: 'CloudDataTransferServiceGCSToGCSOperator'
  description: 'Copy JSON metadata, accession IDs, and FASTA to public bucket'
  args:
    task_id: copy_json_metadata_accession_and_fasta_to_public_bucket
    timeout: 43200  # 12 hours
    retries: 0
    wait: true
    project_id: bigquery-public-data
    source_bucket: '{{ var.json.deepmind.alphafold.source_bucket }}'
    destination_bucket: '{{ var.json.deepmind.alphafold.destination_bucket }}'
    object_conditions:
      includePrefixes:
        - 'metadata'
        - 'accession_ids.csv'
        - 'sequences.fasta'
    transfer_options:
      overwriteWhen: DIFFERENT
      deleteObjectsUniqueInSink: true
# Transfer job from the source bucket to the destination bucket, limited to
# objects under the "proteomes" prefix.
- operator: 'CloudDataTransferServiceGCSToGCSOperator'
  description: 'Copy proteomes to public bucket'
  args:
    task_id: copy_proteomes_to_public_bucket
    timeout: 43200  # 12 hours
    retries: 0
    wait: true
    project_id: bigquery-public-data
    source_bucket: '{{ var.json.deepmind.alphafold.source_bucket }}'
    destination_bucket: '{{ var.json.deepmind.alphafold.destination_bucket }}'
    object_conditions:
      includePrefixes:
        - 'proteomes'
    transfer_options:
      overwriteWhen: DIFFERENT
      deleteObjectsUniqueInSink: true
# Transfer job that places accession_ids.csv from the source bucket under
# data/deepmind/alphafold in the Composer bucket.
- operator: 'CloudDataTransferServiceGCSToGCSOperator'
  description: 'Download accession_ids.csv to Composer bucket'
  args:
    task_id: download_accession_ids_to_composer_bucket
    timeout: 43200  # 12 hours
    retries: 0
    wait: true
    project_id: bigquery-public-data
    source_bucket: '{{ var.json.deepmind.alphafold.source_bucket }}'
    destination_bucket: '{{ var.value.composer_bucket }}'
    destination_path: data/deepmind/alphafold
    object_conditions:
      includePrefixes:
        - 'accession_ids.csv'
    transfer_options:
      overwriteWhen: DIFFERENT
# Keeps only the fourth comma-separated column of accession_ids.csv, then
# splits that list into numbered part-NNN.csv files (three-digit suffix,
# starting at 1) of at most OBJECTS_PER_MANIFEST lines each.
# NOTE(review): presumably WORKING_DIR is the Composer bucket mount where
# accession_ids.csv was placed beforehand - confirm.
- operator: BashOperator
  description: "Trim accession_ids.csv to its fourth column, then split it into multiple part files"
  args:
    task_id: generate_manifests
    bash_command: |
      set -e
      mkdir -p "$WORKING_DIR"
      cut -d , -f 4 "$WORKING_DIR/accession_ids.csv" > "$WORKING_DIR/accession_ids_trimmed.csv"
      split --numeric-suffixes=1 -a 3 -l "$OBJECTS_PER_MANIFEST" --additional-suffix=.csv "$WORKING_DIR/accession_ids_trimmed.csv" "$WORKING_DIR/part-"
    env:
      WORKING_DIR: /home/airflow/gcs/data/deepmind/alphafold
      # Quoted so the value is passed as a string.
      OBJECTS_PER_MANIFEST: "10000000"
      # NOTE(review): the script above never references the two variables
      # below - confirm whether they can be dropped.
      SERVICE_ACCOUNT: "{{ var.json.deepmind.alphafold.service_account }}"
      SOURCE_BUCKET: "{{ var.json.deepmind.alphafold.source_bucket }}"
# For every part*.csv in WORKING_DIR, writes a manifest under
# WORKING_DIR/manifests in which each line has "-confidence_v3.json" appended.
- operator: BashOperator
  description: "Append the -confidence_v3.json suffix to every line of each part file"
  args:
    task_id: suffix_confidence_v3_json
    bash_command: |
      set -e
      mkdir -p "$WORKING_DIR/manifests"
      for part in "$WORKING_DIR"/part*.csv; do
        # An unmatched glob stays literal; skip it so "no part files" remains a no-op.
        [ -e "$part" ] || continue
        name=$(basename "$part")
        sed 's/$/-confidence_v3.json/' "$part" > "$WORKING_DIR/manifests/manifest-confidence_v3_json-$name"
      done
    env:
      # No trailing slash, so the joined paths above contain no "//".
      WORKING_DIR: /home/airflow/gcs/data/deepmind/alphafold
# For every part*.csv in WORKING_DIR, writes a manifest under
# WORKING_DIR/manifests in which each line has "-model_v3.cif" appended.
- operator: BashOperator
  description: "Append the -model_v3.cif suffix to every line of each part file"
  args:
    task_id: suffix_model_v3_cif
    bash_command: |
      set -e
      mkdir -p "$WORKING_DIR/manifests"
      for part in "$WORKING_DIR"/part*.csv; do
        # An unmatched glob stays literal; skip it so "no part files" remains a no-op.
        [ -e "$part" ] || continue
        name=$(basename "$part")
        sed 's/$/-model_v3.cif/' "$part" > "$WORKING_DIR/manifests/manifest-model_v3_cif-$name"
      done
    env:
      # No trailing slash, so the joined paths above contain no "//".
      WORKING_DIR: /home/airflow/gcs/data/deepmind/alphafold
# For every part*.csv in WORKING_DIR, writes a manifest under
# WORKING_DIR/manifests in which each line has
# "-predicted_aligned_error_v3.json" appended.
- operator: BashOperator
  description: "Append the -predicted_aligned_error_v3.json suffix to every line of each part file"
  args:
    task_id: suffix_predicted_aligned_error_v3_json
    bash_command: |
      set -e
      mkdir -p "$WORKING_DIR/manifests"
      for part in "$WORKING_DIR"/part*.csv; do
        # An unmatched glob stays literal; skip it so "no part files" remains a no-op.
        [ -e "$part" ] || continue
        name=$(basename "$part")
        sed 's/$/-predicted_aligned_error_v3.json/' "$part" > "$WORKING_DIR/manifests/manifest-predicted_aligned_error_v3_json-$name"
      done
    env:
      # No trailing slash, so the joined paths above contain no "//".
      WORKING_DIR: /home/airflow/gcs/data/deepmind/alphafold
# Runs the image named by the sts_jobs_generator variable as a pod, passing it
# the manifest location and the source/destination buckets as environment
# variables.
- operator: 'KubernetesPodOperator'
  description: 'Create and run transfer jobs using manifest files'
  args:
    task_id: 'create_and_run_sts_jobs_using_manifests'
    name: 'create_and_run_sts_jobs_using_manifests'
    namespace: 'composer-user-workloads'
    service_account_name: 'default'
    config_file: '/home/airflow/composer_kube_config'
    image_pull_policy: 'Always'
    image: '{{ var.json.deepmind.alphafold.sts_jobs_generator }}'
    env_vars:
      MANIFEST_BUCKET: '{{ var.value.composer_bucket }}'
      MANIFEST_PREFIX: data/deepmind/alphafold/manifests
      SOURCE_BUCKET: '{{ var.json.deepmind.alphafold.source_bucket }}'
      DESTINATION_BUCKET: '{{ var.json.deepmind.alphafold.destination_bucket }}'
      GCP_PROJECT: '{{ var.value.gcp_project }}'
# Transfer job that publishes data/deepmind/alphafold/manifests from the
# Composer bucket to the "manifests" path of the destination bucket.
- operator: 'CloudDataTransferServiceGCSToGCSOperator'
  description: 'Copy manifests to public bucket'
  args:
    task_id: copy_manifests_to_public_bucket
    timeout: 43200  # 12 hours
    retries: 0
    wait: true
    project_id: bigquery-public-data
    source_bucket: '{{ var.value.composer_bucket }}'
    source_path: data/deepmind/alphafold/manifests
    destination_bucket: '{{ var.json.deepmind.alphafold.destination_bucket }}'
    destination_path: manifests
    transfer_options:
      overwriteWhen: DIFFERENT
# Loads the newline-delimited JSON objects matching metadata/*.json in the
# destination bucket into deepmind_alphafold.metadata with WRITE_TRUNCATE,
# using the explicit schema below.
- operator: 'GoogleCloudStorageToBigQueryOperator'
  description: 'Load JSON metadata files to BQ'
  args:
    task_id: 'load_json_metadata_to_bq'
    # Use what's been copied over
    bucket: '{{ var.json.deepmind.alphafold.destination_bucket }}'
    source_objects:
      - 'metadata/*.json'
    source_format: 'NEWLINE_DELIMITED_JSON'
    destination_project_dataset_table: 'deepmind_alphafold.metadata'
    write_disposition: 'WRITE_TRUNCATE'
    schema_fields:
      - name: allVersions
        type: INTEGER
        mode: REPEATED
        description: An array of AFDB versions this prediction has had
      - name: latestVersion
        type: INTEGER
        mode: NULLABLE
        description: The latest AFDB version for this prediction
      - name: organismCommonNames
        type: STRING
        mode: REPEATED
        description: List of common organism names
      - name: uniprotEnd
        type: INTEGER
        mode: NULLABLE
        description: Number of the last residue in the entry relative to the UniProt entry. This is equal to the length of the protein unless we are dealing with protein fragments
      - name: proteinShortNames
        type: STRING
        mode: REPEATED
        description: Short names of the protein
      - name: uniprotStart
        type: INTEGER
        mode: NULLABLE
        description: Number of the first residue in the entry relative to the UniProt entry. This is 1 unless we are dealing with protein fragments
      - name: fractionPlddtConfident
        type: FLOAT
        mode: NULLABLE
        description: Fraction of the residues in the prediction with pLDDT between 70 and 90
      - name: organismSynonyms
        type: STRING
        mode: REPEATED
        description: List of synonyms for the organism
      - name: fractionPlddtVeryHigh
        type: FLOAT
        mode: NULLABLE
        description: Fraction of the residues in the prediction with pLDDT greater than 90
      - name: proteinFullNames
        type: STRING
        mode: REPEATED
        description: Full names of the protein
      - name: globalMetricValue
        type: FLOAT
        mode: NULLABLE
        description: The mean pLDDT of this prediction
      - name: organismScientificName
        type: STRING
        mode: NULLABLE
        description: The scientific name of the organism
      - name: uniprotDescription
        type: STRING
        mode: NULLABLE
        description: The name recommended by the UniProt consortium
      - name: fractionPlddtLow
        type: FLOAT
        mode: NULLABLE
        description: Fraction of the residues in the prediction with pLDDT between 50 and 70
      - name: uniprotAccession
        type: STRING
        mode: NULLABLE
        description: Uniprot accession ID
      - name: sequenceChecksum
        type: STRING
        mode: NULLABLE
        description: CRC64 hash of the sequence. Can be used for cheaper lookups.
      - name: taxId
        type: INTEGER
        mode: NULLABLE
        description: NCBI taxonomic id of the originating species
      - name: uniprotId
        type: STRING
        mode: NULLABLE
        description: The Uniprot EntryName field
      - name: modelCreatedDate
        type: DATE
        mode: NULLABLE
        description: The date of creation for this entry, e.g. "2022-06-01"
      - name: fractionPlddtVeryLow
        type: FLOAT
        mode: NULLABLE
        description: Fraction of the residues in the prediction with pLDDT less than 50
      - name: sequenceVersionDate
        type: DATE
        mode: NULLABLE
        description: Date when the sequence data was last modified in UniProt
      - name: entryId
        type: STRING
        mode: NULLABLE
        description: The AFDB entry ID, e.g. "AF-Q1HGU3-F1"
      - name: geneSynonyms
        type: STRING
        mode: REPEATED
        description: Additional synonyms for the gene
      - name: uniprotSequence
        type: STRING
        mode: NULLABLE
        description: Amino acid sequence for this prediction
      - name: gene
        type: STRING
        mode: NULLABLE
        description: The name of the gene if known, e.g. “COII”
# Task ordering, written as dependency expressions ("a >> b": b runs after a).
graph_paths:
  # Both public-bucket copies precede the BigQuery load.
  - '[copy_json_metadata_accession_and_fasta_to_public_bucket, copy_proteomes_to_public_bucket] >> load_json_metadata_to_bq'
  # Download, then split into parts, then fan out to the three suffix tasks.
  - 'download_accession_ids_to_composer_bucket >> generate_manifests >> [suffix_confidence_v3_json, suffix_model_v3_cif, suffix_predicted_aligned_error_v3_json]'
  # All three suffix tasks precede the transfer-job pod.
  - '[suffix_confidence_v3_json, suffix_model_v3_cif, suffix_predicted_aligned_error_v3_json] >> create_and_run_sts_jobs_using_manifests'
  # Manifest publication and the BigQuery load both follow the transfer-job pod.
  - 'create_and_run_sts_jobs_using_manifests >> [copy_manifests_to_public_bucket, load_json_metadata_to_bq]'