services/order-processor/main.py (62 lines of code) (raw):

# Copyright 2020 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Flask service that starts a workflow execution for an order and waits for it.

The POST endpoint takes ``customer_id`` and ``number`` from the JSON request
body, starts an execution of ``WORKFLOW`` through the Workflow Executions REST
API, polls the execution until it reaches a terminal state and returns the
execution result.
"""

import json
import os
import time

import requests
from flask import Flask, request

app = Flask(__name__)

PROJECT_ID = os.getenv('PROJECT_ID')
REGION = os.getenv('REGION') or 'us-central1'
WORKFLOW = os.getenv('WORKFLOW') or 'order_workflow'

# Timeout (seconds) applied to every outbound HTTP call so that a stalled
# connection cannot block a request handler indefinitely.
REQUEST_TIMEOUT = 30

# Delay (seconds) between two polls of the execution state.
POLL_INTERVAL = 5

# Execution states after which the execution will never succeed.
_FAILED_STATES = frozenset(('FAILED', 'CANCELLED'))


def error500():
    """Return the generic error payload together with HTTP status 500."""
    resp = {
        'message': 'Internal error occured.'
    }
    return resp, 500


def get_access_token(scopes='https://www.googleapis.com/auth/cloud-platform'):
    """Fetch an access token for the default service account.

    The token is requested from the ``metadata.google.internal`` endpoint.

    Args:
        scopes: Value of the ``scopes`` query parameter sent with the request.

    Returns:
        The ``access_token`` field of the JSON response.

    Raises:
        requests.RequestException: The request failed, timed out or returned
            an HTTP error status.
        ValueError: The response body is not valid JSON.
        KeyError: The response does not contain ``access_token``.
    """
    url = 'http://metadata.google.internal/computeMetadata' \
          '/v1/instance/service-accounts/default/token?scopes={}'.format(scopes)
    resp = requests.get(url, headers={"Metadata-Flavor": "Google"},
                        timeout=REQUEST_TIMEOUT)
    resp.raise_for_status()
    return json.loads(resp.text)['access_token']


@app.route('/')
def index():
    """Return a short plain-text description of the service."""
    return 'Order process executor service. \n'


def _start_execution(token, customer_id, number):
    """Start a workflow execution and return its resource name."""
    service_url = 'https://workflowexecutions.googleapis.com/v1beta' \
                  '/projects/{}/locations/{}/workflows/{}/executions'.format(
                      PROJECT_ID, REGION, WORKFLOW)
    headers = {"Authorization": "Bearer {}".format(token),
               "Content-Type": "application/json"}
    # Serialize with json.dumps instead of string formatting so that quotes or
    # other special characters in the client-supplied values cannot break, or
    # inject fields into, the argument document. customer_id is still sent as
    # a string; number keeps the JSON type the client supplied.
    argument = json.dumps({"customer_id": str(customer_id), "number": number})
    resp = requests.post(service_url, headers=headers,
                         json={"argument": argument}, timeout=REQUEST_TIMEOUT)
    resp.raise_for_status()
    return json.loads(resp.text)['name']


def _wait_for_result(token, job_name):
    """Poll the execution until it is terminal and return the Flask response."""
    service_url = 'https://workflowexecutions.googleapis.com/v1beta/{}'.format(job_name)
    headers = {"Authorization": "Bearer {}".format(token)}
    while True:
        resp = requests.get(service_url, headers=headers,
                            timeout=REQUEST_TIMEOUT)
        resp.raise_for_status()
        execution = json.loads(resp.text)
        state = execution['state']
        if state == 'SUCCEEDED':
            return json.loads(execution['result']), 200
        # CANCELLED is terminal as well; polling on it would never end.
        if state in _FAILED_STATES:
            return error500()
        time.sleep(POLL_INTERVAL)


@app.route('/api/v1/order/process', methods=['POST'])
@app.route('/order-processor-service/api/v1/order/process', methods=['POST'])
def order_process():
    """Run the order workflow for the posted order and return its result.

    The JSON request body must be an object with non-null ``customer_id`` and
    ``number`` fields; any other fields are ignored.

    Returns:
        ``(result, 200)`` with the decoded execution result when the execution
        succeeds, otherwise the ``error500()`` response (invalid input, failed
        or cancelled execution, or a failing upstream call).
    """
    json_data = request.get_json()
    # get_json() may yield None or a non-object JSON value (list, string, ...).
    if not isinstance(json_data, dict):
        return error500()

    customer_id = json_data.get('customer_id')
    number = json_data.get('number')
    if customer_id is None or number is None:
        return error500()

    try:
        token = get_access_token()
        # Execute workflow
        job_name = _start_execution(token, customer_id, number)
        # Wait for the execution to finish
        return _wait_for_result(token, job_name)
    except (requests.RequestException, KeyError, ValueError):
        # Network failure, HTTP error status, non-JSON body or a response
        # without the expected fields: answer with the service's own error
        # payload instead of an unhandled exception.
        app.logger.exception('Workflow execution request failed')
        return error500()


if __name__ == "__main__":
    # NOTE(review): debug=True enables the interactive debugger while binding
    # to all interfaces; confirm this entry point is only used for local
    # development.
    app.run(debug=True, host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))