in Python/Encrypt credentials/Encryption sample/app.py [0:0]
def encrypt_credentials():
''' Encrypts the credentials for datasource '''
try:
access_token = AadService.get_access_token()
request_data = request.json['data']
# Validate the credentials data by the user
data_validation_service = DataValidationService()
data_validation_service.validate_encrypt_data(request_data)
gateway_id = request_data['gatewayId']
data_source_service = GetDatasourceService()
gateway_api_response = data_source_service.get_gateway(access_token, gateway_id)
if not gateway_api_response.ok:
return json.dumps({'errorMsg' : str(f'Error {gateway_api_response.status_code} {gateway_api_response.reason}\nRequest Id:\t{gateway_api_response.headers.get("RequestId")}')}), gateway_api_response.status_code
gateway = gateway_api_response.json()
# Serialize credentials for encryption
serialized_credentials = Utils.serialize_credentials(request_data['credentialsArray'], request_data['credType'])
# Cloud gateway does not contain name property
if 'name' not in gateway:
return serialized_credentials
# Encrypt the credentials Asymmetric Key Encryption
asymmetric_encryptor_service = AsymmetricKeyEncryptor(gateway['publicKey'])
encrypted_credentials_string = asymmetric_encryptor_service.encode_credentials(serialized_credentials)
# If on-premise gateway is used, return encrypted data
return encrypted_credentials_string
except KeyError as tx:
return json.dumps({'errorMsg': f'{str(tx)} not found'}), 400
except ValueError as vx:
return json.dumps({'errorMsg': f'Invalid {str(vx)}'}), 400
except Exception as ex:
return json.dumps({'errorMsg': str(ex)}), 500