custos-client-sdks/custos-python-sdk/custos/clients/utils/certificate_fetching_rest_client.py (51 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import logging import OpenSSL import requests import os import datetime from urllib3.exceptions import InsecureRequestWarning import warnings requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) from custos.transport.settings import CustosServerClientSettings import custos.clients.utils.utilities as utl logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) import requests from requests.packages.urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) class CertificateFetchingRestClient(object): def __init__(self, custos_server_setting): self.custos_settings = custos_server_setting self.target = self.custos_settings.CUSTOS_SERVER_HOST + ":" + str(self.custos_settings.CUSTOS_SERVER_PORT) self.url = "https://" + self.target + "/resource-secret-management/v1.0.0/secret" self.ownertype = "CUSTOS" self.resource_type = "SERVER_CERTIFICATE" self.params = { 'metadata.owner_type': self.ownertype, 'metadata.resource_type': self.resource_type } self.rootdir = os.path.abspath(os.curdir) encodedStr = utl.get_token(self.custos_settings) self.header = {'Authorization': 'Bearer {}'.format(encodedStr)} def load_certificate(self): if not self.__is_certificate_valid(): self.__download_certificate() def __download_certificate(self): r = requests.get(url=self.url, params=self.params, headers=self.header, stream=True, timeout=60, verify=False) value = r.json()['value'] path = self.custos_settings.CUSTOS_CERT_PATH f = open(path, "w+") f.write(value) def __is_certificate_valid(self): if os.path.isfile(self.custos_settings.CUSTOS_CERT_PATH): file = open(self.custos_settings.CUSTOS_CERT_PATH) x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, file.read()) expires = datetime.datetime.strptime(x509.get_notAfter().decode('ascii'), '%Y%m%d%H%M%SZ') now = datetime.datetime.now() if now > expires: return False else: return True else: return False