datasets/cms_medicare/pipelines/outpatient_charges/outpatient_charges_dag.py (408 lines of code) (raw):
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
from airflow.providers.google.cloud.transfers import gcs_to_bigquery
default_args = {
"owner": "Google",
"depends_on_past": False,
"start_date": "2021-03-01",
}
with DAG(
dag_id="cms_medicare.outpatient_charges",
default_args=default_args,
max_active_runs=1,
schedule_interval="@once",
catchup=False,
default_view="graph",
) as dag:
# Run CSV transform within kubernetes pod
outpatient_2011_transform_csv = kubernetes_pod.KubernetesPodOperator(
task_id="outpatient_2011_transform_csv",
startup_timeout_seconds=600,
name="cms_medicare_outpatient_charges_2011",
namespace="composer-user-workloads",
service_account_name="default",
config_file="/home/airflow/composer_kube_config",
image_pull_policy="Always",
image="{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}",
env_vars={
"SOURCE_URL": "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Outpatient_Data_2011_CSV.zip",
"SOURCE_FILE": "files/data.zip",
"TARGET_FILE": "files/data_output.csv",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "data/cms_medicare/outpatient_charges_2011/data_output.csv",
"CSV_HEADERS": '["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","apc","hospital_referral_region","outpatient_services","average_estimated_submitted_charges","average_total_payments"]',
"RENAME_MAPPINGS": '{"Provider Id": "provider_id","Provider Name": "provider_name","Provider Street Address": "provider_street_address","Provider City": "provider_city","Provider State": "provider_state","Provider Zip Code": "provider_zipcode","APC": "apc","Hospital Referral Region (HRR) Description": "hospital_referral_region","Outpatient Services": "outpatient_services","Average Estimated Submitted Charges": "average_estimated_submitted_charges","Average Total Payments": "average_total_payments"}',
"PIPELINE_NAME": "outpatient_charges_2011",
},
)
# Run CSV transform within kubernetes pod
outpatient_2012_transform_csv = kubernetes_pod.KubernetesPodOperator(
task_id="outpatient_2012_transform_csv",
startup_timeout_seconds=600,
name="cms_medicare_outpatient_charges_2012",
namespace="composer-user-workloads",
service_account_name="default",
config_file="/home/airflow/composer_kube_config",
image_pull_policy="Always",
image="{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}",
env_vars={
"SOURCE_URL": "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Outpatient_Data_2012_CSV.zip",
"SOURCE_FILE": "files/data.zip",
"TARGET_FILE": "files/data_output.csv",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "data/cms_medicare/outpatient_charges_2012/data_output.csv",
"CSV_HEADERS": '["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","apc","hospital_referral_region","outpatient_services","average_estimated_submitted_charges","average_total_payments"]',
"RENAME_MAPPINGS": '{"Provider Id": "provider_id","Provider Name": "provider_name","Provider Street Address": "provider_street_address","Provider City": "provider_city","Provider State": "provider_state","Provider Zip Code": "provider_zipcode","APC": "apc","Hospital Referral Region (HRR) Description": "hospital_referral_region","Outpatient Services": "outpatient_services","Average Estimated Submitted Charges": "average_estimated_submitted_charges","Average Total Payments": "average_total_payments"}',
"PIPELINE_NAME": "outpatient_charges_2012",
},
)
# Run CSV transform within kubernetes pod
outpatient_2013_transform_csv = kubernetes_pod.KubernetesPodOperator(
task_id="outpatient_2013_transform_csv",
startup_timeout_seconds=600,
name="cms_medicare_outpatient_charges_2013",
namespace="composer-user-workloads",
service_account_name="default",
config_file="/home/airflow/composer_kube_config",
image_pull_policy="Always",
image="{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}",
env_vars={
"SOURCE_URL": "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Outpatient_Data_2013_CSV_v2.zip",
"SOURCE_FILE": "files/data.zip",
"TARGET_FILE": "files/data_output.csv",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "data/cms_medicare/outpatient_charges_2013/data_output.csv",
"CSV_HEADERS": '["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","apc","hospital_referral_region","outpatient_services","average_estimated_submitted_charges","average_total_payments"]',
"RENAME_MAPPINGS": '{"Provider Id": "provider_id","Provider Name": "provider_name","Provider Street Address": "provider_street_address","Provider City": "provider_city","Provider State": "provider_state","Provider Zip Code": "provider_zipcode","APC": "apc","Hospital Referral Region (HRR) Description": "hospital_referral_region","Outpatient Services": "outpatient_services","Average Estimated Submitted Charges": "average_estimated_submitted_charges","Average Total Payments": "average_total_payments"}',
"PIPELINE_NAME": "outpatient_charges_2013",
},
)
# Run CSV transform within kubernetes pod
outpatient_2014_transform_csv = kubernetes_pod.KubernetesPodOperator(
task_id="outpatient_2014_transform_csv",
startup_timeout_seconds=600,
name="cms_medicare_outpatient_charges_2014",
namespace="composer-user-workloads",
service_account_name="default",
config_file="/home/airflow/composer_kube_config",
image_pull_policy="Always",
image="{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}",
env_vars={
"SOURCE_URL": "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Outpatient_Data_2014_CSV.zip",
"SOURCE_FILE": "files/data.zip",
"TARGET_FILE": "files/data_output.csv",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "data/cms_medicare/outpatient_charges_2014/data_output.csv",
"CSV_HEADERS": '["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","apc","hospital_referral_region","outpatient_services","average_estimated_submitted_charges","average_total_payments"]',
"RENAME_MAPPINGS": '{"provider_id": "provider_id","provider_name": "provider_name","Provider_Street_Address": "provider_street_address","Provider_City": "provider_city","Provider_State": "provider_state","Provider_Zip_Code": "provider_zipcode","apc": "apc","Hospital_Referral_Region": "hospital_referral_region","Outpatient_Services": "outpatient_services","Average_Estimated_Submitted_Charges": "average_estimated_submitted_charges","Average_Total_Payments": "average_total_payments"}',
"PIPELINE_NAME": "outpatient_charges_2014",
},
)
# Task to load CSV data to a BigQuery table
load_outpatient_2011_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
task_id="load_outpatient_2011_to_bq",
bucket="{{ var.value.composer_bucket }}",
source_objects=["data/cms_medicare/outpatient_charges_2011/data_output.csv"],
source_format="CSV",
destination_project_dataset_table="cms_medicare.outpatient_charges_2011",
skip_leading_rows=1,
write_disposition="WRITE_TRUNCATE",
schema_fields=[
{
"description": "The CMS Certification Number (CCN) of the provider billing for outpatient hospital services",
"name": "provider_id",
"type": "STRING",
"mode": "REQUIRED",
},
{
"description": "The name of the provider",
"name": "provider_name",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The street address in which the provider is physically located",
"name": "provider_street_address",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The city in which the provider is physically located",
"name": "provider_city",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The state in which the provider is physically located",
"name": "provider_state",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The zip code in which the provider is physically located",
"name": "provider_zipcode",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The code and description identifying the MS-DRG. MS-DRGs are a classification system that groups similar clinical conditions (diagnoses) and the procedures furnished by the hospital during the stay",
"name": "apc",
"type": "STRING",
"mode": "REQUIRED",
},
{
"description": "Code and description identifying the APC. APCs are a classification system where individual services (Healthcare Common Procedure Coding System [HCPCS] codes) are assigned based on similar clinical characteristics and similar costs",
"name": "hospital_referral_region",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The number of discharges billed by the provider for inpatient hospital services",
"name": "outpatient_services",
"type": "INTEGER",
"mode": "NULLABLE",
},
{
"description": "The number of services billed by the provider for outpatient hospital services",
"name": "average_estimated_submitted_charges",
"type": "FLOAT",
"mode": "NULLABLE",
},
{
"description": "The provider's average estimated submitted charge for services covered by Medicare for the APC. These will vary from hospital to hospital because of differences in hospital charge structures",
"name": "average_total_payments",
"type": "FLOAT",
"mode": "NULLABLE",
},
],
)
# Task to load CSV data to a BigQuery table
load_outpatient_2012_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
task_id="load_outpatient_2012_to_bq",
bucket="{{ var.value.composer_bucket }}",
source_objects=["data/cms_medicare/outpatient_charges_2012/data_output.csv"],
source_format="CSV",
destination_project_dataset_table="cms_medicare.outpatient_charges_2012",
skip_leading_rows=1,
write_disposition="WRITE_TRUNCATE",
schema_fields=[
{
"description": "The CMS Certification Number (CCN) of the provider billing for outpatient hospital services",
"name": "provider_id",
"type": "STRING",
"mode": "REQUIRED",
},
{
"description": "The name of the provider",
"name": "provider_name",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The street address in which the provider is physically located",
"name": "provider_street_address",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The city in which the provider is physically located",
"name": "provider_city",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The state in which the provider is physically located",
"name": "provider_state",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The zip code in which the provider is physically located",
"name": "provider_zipcode",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The code and description identifying the MS-DRG. MS-DRGs are a classification system that groups similar clinical conditions (diagnoses) and the procedures furnished by the hospital during the stay",
"name": "apc",
"type": "STRING",
"mode": "REQUIRED",
},
{
"description": "Code and description identifying the APC. APCs are a classification system where individual services (Healthcare Common Procedure Coding System [HCPCS] codes) are assigned based on similar clinical characteristics and similar costs",
"name": "hospital_referral_region",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The number of discharges billed by the provider for inpatient hospital services",
"name": "outpatient_services",
"type": "INTEGER",
"mode": "NULLABLE",
},
{
"description": "The number of services billed by the provider for outpatient hospital services",
"name": "average_estimated_submitted_charges",
"type": "FLOAT",
"mode": "NULLABLE",
},
{
"description": "The provider's average estimated submitted charge for services covered by Medicare for the APC. These will vary from hospital to hospital because of differences in hospital charge structures",
"name": "average_total_payments",
"type": "FLOAT",
"mode": "NULLABLE",
},
],
)
# Task to load CSV data to a BigQuery table
load_outpatient_2013_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
task_id="load_outpatient_2013_to_bq",
bucket="{{ var.value.composer_bucket }}",
source_objects=["data/cms_medicare/outpatient_charges_2013/data_output.csv"],
source_format="CSV",
destination_project_dataset_table="cms_medicare.outpatient_charges_2013",
skip_leading_rows=1,
write_disposition="WRITE_TRUNCATE",
schema_fields=[
{
"description": "The CMS Certification Number (CCN) of the provider billing for outpatient hospital services",
"name": "provider_id",
"type": "STRING",
"mode": "REQUIRED",
},
{
"description": "The name of the provider",
"name": "provider_name",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The street address in which the provider is physically located",
"name": "provider_street_address",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The city in which the provider is physically located",
"name": "provider_city",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The state in which the provider is physically located",
"name": "provider_state",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The zip code in which the provider is physically located",
"name": "provider_zipcode",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The code and description identifying the MS-DRG. MS-DRGs are a classification system that groups similar clinical conditions (diagnoses) and the procedures furnished by the hospital during the stay",
"name": "apc",
"type": "STRING",
"mode": "REQUIRED",
},
{
"description": "Code and description identifying the APC. APCs are a classification system where individual services (Healthcare Common Procedure Coding System [HCPCS] codes) are assigned based on similar clinical characteristics and similar costs",
"name": "hospital_referral_region",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The number of discharges billed by the provider for inpatient hospital services",
"name": "outpatient_services",
"type": "INTEGER",
"mode": "NULLABLE",
},
{
"description": "The number of services billed by the provider for outpatient hospital services",
"name": "average_estimated_submitted_charges",
"type": "FLOAT",
"mode": "NULLABLE",
},
{
"description": "The provider's average estimated submitted charge for services covered by Medicare for the APC. These will vary from hospital to hospital because of differences in hospital charge structures",
"name": "average_total_payments",
"type": "FLOAT",
"mode": "NULLABLE",
},
],
)
# Task to load CSV data to a BigQuery table
load_outpatient_2014_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
task_id="load_outpatient_2014_to_bq",
bucket="{{ var.value.composer_bucket }}",
source_objects=["data/cms_medicare/outpatient_charges_2014/data_output.csv"],
source_format="CSV",
destination_project_dataset_table="cms_medicare.outpatient_charges_2014",
skip_leading_rows=1,
write_disposition="WRITE_TRUNCATE",
schema_fields=[
{
"description": "The CMS Certification Number (CCN) of the provider billing for outpatient hospital services",
"name": "provider_id",
"type": "STRING",
"mode": "REQUIRED",
},
{
"description": "The name of the provider",
"name": "provider_name",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The street address in which the provider is physically located",
"name": "provider_street_address",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The city in which the provider is physically located",
"name": "provider_city",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The state in which the provider is physically located",
"name": "provider_state",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The zip code in which the provider is physically located",
"name": "provider_zipcode",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The code and description identifying the MS-DRG. MS-DRGs are a classification system that groups similar clinical conditions (diagnoses) and the procedures furnished by the hospital during the stay",
"name": "apc",
"type": "STRING",
"mode": "REQUIRED",
},
{
"description": "Code and description identifying the APC. APCs are a classification system where individual services (Healthcare Common Procedure Coding System [HCPCS] codes) are assigned based on similar clinical characteristics and similar costs",
"name": "hospital_referral_region",
"type": "STRING",
"mode": "NULLABLE",
},
{
"description": "The number of discharges billed by the provider for inpatient hospital services",
"name": "outpatient_services",
"type": "INTEGER",
"mode": "NULLABLE",
},
{
"description": "The number of services billed by the provider for outpatient hospital services",
"name": "average_estimated_submitted_charges",
"type": "FLOAT",
"mode": "NULLABLE",
},
{
"description": "The provider's average estimated submitted charge for services covered by Medicare for the APC. These will vary from hospital to hospital because of differences in hospital charge structures",
"name": "average_total_payments",
"type": "FLOAT",
"mode": "NULLABLE",
},
],
)
outpatient_2011_transform_csv >> load_outpatient_2011_to_bq
outpatient_2012_transform_csv >> load_outpatient_2012_to_bq
outpatient_2013_transform_csv >> load_outpatient_2013_to_bq
outpatient_2014_transform_csv >> load_outpatient_2014_to_bq