in Python/Encrypt credentials/Encryption sample/app.py [0:0]
def update_datasource():
    """Update a gateway datasource with encrypted credentials.

    Workflow:
      1. Acquire an access token via ``AadService.get_access_token()``.
      2. Read the ``data`` object from the JSON request body.
      3. Validate the user-supplied credentials with
         ``DataValidationService.validate_creds``.
      4. Fetch the gateway identified by ``gatewayId``.
      5. Pass the credentials, the fetched gateway and ``datasourceId`` to
         ``UpdateCredentialsService.update_datasource``.

    Returns:
        On success, a ``Response`` built from the update API response and
        its status code. On failure, a ``(json_body, status_code)`` tuple
        whose body is ``{"errorMsg": ...}``:

        * the upstream status code when the gateway lookup or the update
          call reports a non-OK response;
        * 400 when the body is missing / not a JSON object, a required key
          is absent (``KeyError``) or a ``ValueError`` is raised;
        * 500 for any other exception.
    """
    try:
        access_token = AadService.get_access_token()

        # silent=True makes a missing or malformed JSON body come back as
        # None instead of raising, so it can be reported as a client error
        # (400) rather than falling into the generic 500 handler below.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            return json.dumps({'errorMsg': 'Request body must be a JSON object'}), 400

        request_data = payload['data']
        if not isinstance(request_data, dict):
            return json.dumps({'errorMsg': "'data' must be a JSON object"}), 400

        gateway_id = request_data['gatewayId']

        # Validate the credentials data supplied by the user.
        # Presumably signals invalid input by raising (see the KeyError /
        # ValueError handlers below) - confirm against DataValidationService.
        data_validation_service = DataValidationService()
        data_validation_service.validate_creds(request_data)

        data_source_service = GetDatasourceService()
        gateway_api_response = data_source_service.get_gateway(access_token, gateway_id)
        if not gateway_api_response.ok:
            return _upstream_error_response(gateway_api_response)

        gateway = gateway_api_response.json()

        # Send the fetched gateway data along with the credentials to update them.
        update_creds_service = UpdateCredentialsService()
        api_response = update_creds_service.update_datasource(
            access_token,
            request_data['credType'],
            request_data['privacyLevel'],
            request_data['credentialsArray'],
            gateway,
            request_data['datasourceId'],
        )

        if not api_response.ok:
            return _upstream_error_response(api_response)

        # NOTE(review): the upstream response object itself is handed over as
        # the body; presumably it is iterable over its content - confirm.
        return Response(api_response, api_response.status_code)

    except KeyError as tx:
        # A required field was absent from the request data.
        return json.dumps({'errorMsg': f'{str(tx)} not found'}), 400
    except ValueError as vx:
        return json.dumps({'errorMsg': f'Invalid {str(vx)}'}), 400
    except Exception as ex:
        # Last-resort boundary: report any unexpected failure as a 500.
        return json.dumps({'errorMsg': str(ex)}), 500


def _upstream_error_response(api_response):
    """Build the ``(json_body, status_code)`` tuple for a non-OK API response.

    The message carries the status code, the reason phrase and the value of
    the ``RequestId`` response header.
    """
    message = (
        f'Error {api_response.status_code} {api_response.reason}\n'
        f'Request Id:\t{api_response.headers.get("RequestId")}'
    )
    return json.dumps({'errorMsg': message}), api_response.status_code