def encrypt_credentials()

in Python/Encrypt credentials/Encryption sample/app.py [0:0]


def encrypt_credentials():
    ''' Encrypts the credentials for datasource

    Reads the 'data' object from the JSON request body, validates it, fetches
    the gateway identified by its 'gatewayId' and serializes the supplied
    'credentialsArray' according to 'credType'.

    Returns:
        On success, the serialized credentials as-is when the gateway response
        has no 'name' property, otherwise the result of encoding them with the
        gateway's 'publicKey'.
        On failure, a (JSON string, HTTP status) tuple whose JSON carries an
        'errorMsg' key: the gateway API's own status code when that call is
        not ok, 400 for a non-object request body or a KeyError/ValueError
        raised while processing, and 500 for any other exception.
    '''

    try:
        access_token = AadService.get_access_token()

        request_body = request.json
        try:
            request_data = request_body['data']
        except TypeError:
            # The body parsed as JSON but is not an object (null, list,
            # string, number), so it cannot be indexed by key. That is a
            # malformed request, not a server fault, so answer 400 instead of
            # letting it fall through to the generic 500 handler. A missing
            # 'data' key still raises KeyError and is handled below.
            return json.dumps({'errorMsg': 'Request body must be a JSON object'}), 400

        # Validate the credentials data by the user
        data_validation_service = DataValidationService()
        data_validation_service.validate_encrypt_data(request_data)
        gateway_id = request_data['gatewayId']

        data_source_service = GetDatasourceService()
        gateway_api_response = data_source_service.get_gateway(access_token, gateway_id)

        if not gateway_api_response.ok:
            # Relay the upstream status code together with its request id so
            # the failure can be traced on the API side.
            error_msg = (
                f'Error {gateway_api_response.status_code} {gateway_api_response.reason}\n'
                f'Request Id:\t{gateway_api_response.headers.get("RequestId")}'
            )
            return json.dumps({'errorMsg': error_msg}), gateway_api_response.status_code

        gateway = gateway_api_response.json()

        # Serialize credentials for encryption
        serialized_credentials = Utils.serialize_credentials(
            request_data['credentialsArray'],
            request_data['credType'],
        )

        # Cloud gateway does not contain name property
        if 'name' not in gateway:
            return serialized_credentials

        # Encrypt the credentials Asymmetric Key Encryption
        # NOTE(review): a gateway response that has 'name' but no 'publicKey'
        # raises KeyError here and is reported as a 400 by the handler below,
        # even though the request itself was fine - confirm this is acceptable.
        asymmetric_encryptor_service = AsymmetricKeyEncryptor(gateway['publicKey'])
        encrypted_credentials_string = asymmetric_encryptor_service.encode_credentials(serialized_credentials)

        # If on-premise gateway is used, return encrypted data
        return encrypted_credentials_string

    except KeyError as tx:
        # str(KeyError) is the repr of the missing key, e.g. "'gatewayId'"
        return json.dumps({'errorMsg': f'{tx} not found'}), 400
    except ValueError as vx:
        return json.dumps({'errorMsg': f'Invalid {vx}'}), 400
    except Exception as ex:
        # Top-level boundary of the handler: report anything unexpected as 500
        return json.dumps({'errorMsg': str(ex)}), 500