healthcare/api-client/v1beta1/fhir/fhir_resources.py (349 lines of code) (raw):
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import json
import os
from google.auth.transport import requests
from google.oauth2 import service_account
_BASE_URL = "https://healthcare.googleapis.com/v1beta1"
def get_session(service_account_json):
"""
Returns an authorized Requests Session class using the service account
credentials JSON. This class is used to perform requests to the
Healthcare API endpoint.
"""
# Pass in the credentials and project ID. If none supplied, get them
# from the environment.
credentials = service_account.Credentials.from_service_account_file(
service_account_json
)
scoped_credentials = credentials.with_scopes(
["https://www.googleapis.com/auth/cloud-platform"]
)
# Create a requests Session object with the credentials.
session = requests.AuthorizedSession(scoped_credentials)
return session
def create_patient(
service_account_json, base_url, project_id, cloud_region, dataset_id, fhir_store_id
):
"""Creates a new Patient resource in a FHIR store."""
url = f"{base_url}/projects/{project_id}/locations/{cloud_region}"
fhir_store_path = "{}/datasets/{}/fhirStores/{}/fhir/Patient".format(
url, dataset_id, fhir_store_id
)
# Make an authenticated API request
session = get_session(service_account_json)
headers = {"Content-Type": "application/fhir+json;charset=utf-8"}
body = {
"name": [{"use": "official", "family": "Smith", "given": ["Darcy"]}],
"gender": "female",
"birthDate": "1970-01-01",
"resourceType": "Patient",
}
response = session.post(fhir_store_path, headers=headers, json=body)
response.raise_for_status()
resource = response.json()
print("Created Patient resource with ID {}".format(resource["id"]))
return response
def create_encounter(
service_account_json,
base_url,
project_id,
cloud_region,
dataset_id,
fhir_store_id,
patient_id,
):
"""Creates a new Encounter resource in a FHIR store based on a Patient."""
url = f"{base_url}/projects/{project_id}/locations/{cloud_region}"
fhir_store_path = "{}/datasets/{}/fhirStores/{}/fhir/Encounter".format(
url, dataset_id, fhir_store_id
)
# Make an authenticated API request
session = get_session(service_account_json)
headers = {"Content-Type": "application/fhir+json;charset=utf-8"}
body = {
"status": "finished",
"class": {
"system": "http://hl7.org/fhir/v3/ActCode",
"code": "IMP",
"display": "inpatient encounter",
},
"reasonCode": [
{
"text": "The patient had an abnormal heart rate. She was concerned about this."
}
],
"subject": {"reference": f"Patient/{patient_id}"},
"resourceType": "Encounter",
}
response = session.post(fhir_store_path, headers=headers, json=body)
response.raise_for_status()
resource = response.json()
print("Created Encounter resource with ID {}".format(resource["id"]))
return response
def create_observation(
service_account_json,
base_url,
project_id,
cloud_region,
dataset_id,
fhir_store_id,
patient_id,
encounter_id,
):
"""
Creates a new Observation resource in a FHIR store based on
an Encounter.
"""
url = f"{base_url}/projects/{project_id}/locations/{cloud_region}"
fhir_store_path = "{}/datasets/{}/fhirStores/{}/fhir/Observation".format(
url, dataset_id, fhir_store_id
)
# Make an authenticated API request
session = get_session(service_account_json)
headers = {"Content-Type": "application/fhir+json;charset=utf-8"}
body = {
"resourceType": "Observation",
"status": "final",
"subject": {"reference": f"Patient/{patient_id}"},
"effectiveDateTime": "2020-01-01T00:00:00+00:00",
"code": {
"coding": [
{
"system": "http://loinc.org",
"code": "8867-4",
"display": "Heart rate",
}
]
},
"valueQuantity": {"value": 55, "unit": "bpm"},
"encounter": {"reference": f"Encounter/{encounter_id}"},
}
response = session.post(fhir_store_path, headers=headers, json=body)
response.raise_for_status()
resource = response.json()
print("Created Observation resource with ID {}".format(resource["id"]))
return response
def delete_resource(
service_account_json,
base_url,
project_id,
cloud_region,
dataset_id,
fhir_store_id,
resource_type,
resource_id,
):
"""
Deletes a FHIR resource. Regardless of whether the operation succeeds or
fails, the server returns a 200 OK HTTP status code. To check that the
resource was successfully deleted, search for or get the resource and
see if it exists.
"""
url = f"{base_url}/projects/{project_id}/locations/{cloud_region}"
resource_path = "{}/datasets/{}/fhirStores/{}/fhir/{}/{}".format(
url, dataset_id, fhir_store_id, resource_type, resource_id
)
# Make an authenticated API request
session = get_session(service_account_json)
response = session.delete(resource_path)
print(f"Deleted {resource_type} resource with ID {resource_id}.")
return response
# [START healthcare_conditional_update_resource]
def conditional_update_resource(
service_account_json,
base_url,
project_id,
cloud_region,
dataset_id,
fhir_store_id,
patient_id,
encounter_id,
):
"""
If a resource is found based on the search criteria specified in
the query parameters, updates the entire contents of that resource.
"""
url = f"{base_url}/projects/{project_id}/locations/{cloud_region}"
# The search query in this request updates all Observations
# using the Observation's identifier (ABC-12345 in my-code-system)
# so that their 'status' is 'cancelled'.
resource_path = "{}/datasets/{}/fhirStores/{}/fhir/Observation".format(
url, dataset_id, fhir_store_id
)
# Make an authenticated API request
session = get_session(service_account_json)
body = {
"resourceType": "Observation",
"status": "cancelled",
"subject": {"reference": f"Patient/{patient_id}"},
"effectiveDateTime": "2020-01-01T00:00:00+00:00",
"code": {
"coding": [
{
"system": "http://loinc.org",
"code": "8867-4",
"display": "Heart rate",
}
]
},
"valueQuantity": {"value": 55, "unit": "bpm"},
"encounter": {"reference": f"Encounter/{encounter_id}"},
}
headers = {"Content-Type": "application/fhir+json;charset=utf-8"}
params = {"identifier": "my-code-system|ABC-12345"}
response = session.put(resource_path, headers=headers, params=params, json=body)
response.raise_for_status()
resource = response.json()
print(
"Conditionally updated Observations with the identifier "
"'my-code-system|ABC-12345' to have a 'status' of "
"'cancelled'."
)
print(json.dumps(resource, indent=2))
return resource
# [END healthcare_conditional_update_resource]
# [START healthcare_conditional_delete_resource]
def conditional_delete_resource(
service_account_json, base_url, project_id, cloud_region, dataset_id, fhir_store_id
):
"""Deletes FHIR resources that match a search query."""
url = f"{base_url}/projects/{project_id}/locations/{cloud_region}"
# The search query in this request deletes all Observations
# with a status of 'cancelled'.
resource_path = "{}/datasets/{}/fhirStores/{}/fhir/Observation".format(
url, dataset_id, fhir_store_id
)
# The search query is passed in as a query string parameter.
params = {"status": "cancelled"}
# Make an authenticated API request
session = get_session(service_account_json)
response = session.delete(resource_path, params=params)
print(response.url)
if response.status_code != 404: # Don't consider missing to be error
response.raise_for_status()
print("Conditionally deleted all Observations with status='cancelled'.")
return response
# [END healthcare_conditional_delete_resource]
# [START healthcare_conditional_patch_resource]
def conditional_patch_resource(
service_account_json, base_url, project_id, cloud_region, dataset_id, fhir_store_id
):
"""
If a resource is found based on the search criteria specified in
the query parameters, updates part of that resource by
applying the operations specified in a JSON Patch document.
"""
url = f"{base_url}/projects/{project_id}/locations/{cloud_region}"
# The search query in this request updates all Observations
# if the subject of the Observation is a particular patient.
resource_path = "{}/datasets/{}/fhirStores/{}/fhir/Observation".format(
url, dataset_id, fhir_store_id
)
# Make an authenticated API request
session = get_session(service_account_json)
headers = {"Content-Type": "application/json-patch+json"}
body = json.dumps(
[
{
"op": "replace",
"path": "/valueQuantity/value",
# Sets the BPM for all matching Observations to 80. This
# is the portion of the request being patched.
"value": 80,
}
]
)
# The search query is passed in as a query string parameter.
params = {"identifier": "my-code-system|ABC-12345"}
response = session.patch(resource_path, headers=headers, params=params, data=body)
response.raise_for_status()
print(response.url)
resource = response.json()
print(
"Conditionally patched all Observations with the "
"identifier 'my-code-system|ABC-12345' to use a BPM of 80."
)
print(json.dumps(resource, indent=2))
return resource
# [END healthcare_conditional_patch_resource]
def parse_command_line_args():
"""Parses command line arguments."""
parser = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
"--service_account_json",
default=os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"),
help="Path to service account JSON file.",
)
parser.add_argument("--base_url", default=_BASE_URL, help="Healthcare API URL.")
parser.add_argument(
"--project_id",
default=os.environ.get("GOOGLE_CLOUD_PROJECT"),
help="GCP project name",
)
parser.add_argument("--cloud_region", default="us-central1", help="GCP region")
parser.add_argument("--dataset_id", default=None, help="Name of dataset")
parser.add_argument("--fhir_store_id", default=None, help="Name of FHIR store")
parser.add_argument(
"--resource_type",
default=None,
help="The type of resource. First letter must be capitalized",
)
parser.add_argument(
"--resource_id", default=None, help="Identifier for a FHIR resource"
)
parser.add_argument(
"--patient_id",
default=None,
help="Identifier for a Patient resource. Can be used as a reference "
"for an Encounter/Observation",
)
parser.add_argument(
"--encounter_id",
default=None,
help="Identifier for an Encounter resource. Can be used as a "
"reference for an Observation",
)
command = parser.add_subparsers(dest="command")
command.add_parser("create-patient", help=create_patient.__doc__)
command.add_parser("create-encounter", help=create_encounter.__doc__)
command.add_parser("create-observation", help=create_observation.__doc__)
command.add_parser("delete-resource", help=delete_resource.__doc__)
command.add_parser(
"conditional-delete-resource", help=conditional_delete_resource.__doc__
)
command.add_parser(
"conditional-update-resource", help=conditional_update_resource.__doc__
)
command.add_parser(
"conditional-patch-resource", help=conditional_patch_resource.__doc__
)
return parser.parse_args()
def run_command(args):
"""Calls the program using the specified command."""
if args.project_id is None:
print(
"You must specify a project ID or set the "
'"GOOGLE_CLOUD_PROJECT" environment variable.'
)
return
elif args.command == "create-patient":
create_patient(
args.service_account_json,
args.base_url,
args.project_id,
args.cloud_region,
args.dataset_id,
args.fhir_store_id,
)
elif args.command == "create-encounter":
create_encounter(
args.service_account_json,
args.base_url,
args.project_id,
args.cloud_region,
args.dataset_id,
args.fhir_store_id,
args.patient_id,
)
elif args.command == "create-observation":
create_observation(
args.service_account_json,
args.base_url,
args.project_id,
args.cloud_region,
args.dataset_id,
args.fhir_store_id,
args.patient_id,
args.encounter_id,
)
elif args.command == "delete-resource":
delete_resource(
args.service_account_json,
args.base_url,
args.project_id,
args.cloud_region,
args.dataset_id,
args.fhir_store_id,
args.resource_type,
args.resource_id,
)
elif args.command == "conditional-delete-resource":
conditional_delete_resource(
args.service_account_json,
args.base_url,
args.project_id,
args.cloud_region,
args.dataset_id,
args.fhir_store_id,
)
elif args.command == "conditional-update-resource":
conditional_update_resource(
args.service_account_json,
args.base_url,
args.project_id,
args.cloud_region,
args.dataset_id,
args.fhir_store_id,
args.patient_id,
args.encounter_id,
)
elif args.command == "conditional-patch-resource":
conditional_patch_resource(
args.service_account_json,
args.base_url,
args.project_id,
args.cloud_region,
args.dataset_id,
args.fhir_store_id,
)
def main():
args = parse_command_line_args()
run_command(args)
if __name__ == "__main__":
main()