in cloud-composer/dags/sample-bigquery-stop-spanner.py [0:0]
def delete_spanner_instance(spanner_uri):
    """Delete the Spanner instance once its run marker is more than 4 hours old.

    Reads the JSON marker file at
    ``/home/airflow/gcs/data/write_spanner_run_datetime.json`` (keys
    ``run_datetime`` in ``%m/%d/%Y %H:%M:%S`` format and
    ``spanner_instance_id``). If the file is missing, or the recorded time is
    within 4 hours of now, nothing is deleted. Otherwise the Spanner REST API
    is called with the default service-account credentials and, on success,
    the marker file is removed.

    Args:
        spanner_uri: Resource path appended to
            ``https://spanner.googleapis.com/v1/`` (presumably of the form
            ``projects/<project>/instances/<instance>`` -- confirm with caller).

    Raises:
        requests.exceptions.RequestException: If the DELETE call fails, times
            out, or returns an HTTP error status.
        KeyError, ValueError: If the marker file is malformed.
    """
    print("delete_spanner_instance")
    filePath = "/home/airflow/gcs/data/write_spanner_run_datetime.json"
    max_age_seconds = 4 * 60 * 60
    # Without a timeout a stalled connection would hang the worker task forever.
    request_timeout_seconds = 60

    if not os.path.exists(filePath):
        print("Json files does not exist (no Spanner Instance deployed)")
        return

    with open(filePath) as f:
        data = json.load(f)

    print("run_datetime: ", data['run_datetime'])
    print("spanner_instance_id: ", data['spanner_instance_id'])
    run_datetime = datetime.strptime(data['run_datetime'], "%m/%d/%Y %H:%M:%S")

    # The absolute difference is used, so a marker timestamp more than
    # 4 hours in the future also triggers deletion (unchanged behavior).
    age_seconds = abs((datetime.now() - run_datetime).total_seconds())
    print("difference.total_seconds(): ", age_seconds)

    if age_seconds <= max_age_seconds:
        return

    print("Deleting Spanner Instance > 4 hours")

    # Get auth (default service account running composer worker node)
    creds, project = google.auth.default()
    auth_req = google.auth.transport.requests.Request()  # required to access the access token
    creds.refresh(auth_req)
    auth_header = {
        'Authorization': "Bearer " + creds.token,
        'Accept': 'application/json',
        'Content-Type': 'application/json',
    }

    # Call the REST API with the bearer token.
    # DELETE https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instances/delete
    # Equivalent curl:
    #   curl --request DELETE \
    #     'https://spanner.googleapis.com/v1/<spanner_uri>' \
    #     --header 'Authorization: Bearer [YOUR_ACCESS_TOKEN]' \
    #     --header 'Accept: application/json' \
    #     --compressed
    uri = "https://spanner.googleapis.com/v1/" + spanner_uri
    print("uri: ", uri)

    try:
        response = requests.delete(
            uri, headers=auth_header, timeout=request_timeout_seconds
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as err:
        print("Error: ", err)
        raise

    print("Deleted Spanner Instance")
    # Remove the marker only after a successful delete so a failed attempt is
    # retried on the next run.
    os.remove(filePath)