def request()

in mysqloperator/controller/backup/meb/meb_controller.py [0:0]


    def request(self, host: str, request_target: str, method: str,
                request_body: dict) -> dict:
        """Send a signed HTTPS request to an OCI endpoint and return the reply.

        The request is signed (RSA, PKCS#1 v1.5, SHA-256) with the PEM private
        key held in ``self.private_key`` over the ``(request-target)``,
        ``host``, ``date`` and ``opc-client-info`` values, and the signature
        is sent in the ``authorization`` header together with a key id built
        from ``self.tenancy_id``, ``self.user_id`` and ``self.fingerprint``.

        Args:
            host: Host name of the endpoint; used for the connection, the
                ``host`` header and the signing string.
            request_target: Path plus optional query string; it is appended
                directly to ``https://{host}`` and used verbatim in the
                signing string.
            method: HTTP method; lower-cased in the signing string and passed
                unchanged to the connection.
            request_body: Payload serialized as JSON. A falsy value (``None``
                or an empty dict) sends the request without a body.

        Returns:
            The decoded JSON response body, or ``True`` when the response
            body is empty.

        Raises:
            Exception: If the response status is neither 200 nor 204.
        """
        # Local import: keeps this method self-contained without requiring a
        # change to the file-level import block.
        from email.utils import formatdate

        private_key = serialization.load_pem_private_key(
            self.private_key.encode('utf-8'),
            password=None
        )

        endpoint = f'https://{host}{request_target}'

        headers = {
            "Content-Type": "application/json",
            # HTTP date in GMT with English day/month names regardless of the
            # process locale (strftime's %a/%b are locale-dependent and
            # datetime.utcnow() is deprecated since Python 3.12).
            "date": formatdate(usegmt=True),
            "host": host,
            "opc-client-info": "public-objectstorage:0.1.0"
        }

        # Lines appear in the same order as the names in the `headers="..."`
        # list of the authorization header built below.
        signing_string = f"(request-target): {method.lower()} {request_target}\n" \
                         f"host: {host}\n" \
                         f"date: {headers['date']}\n" \
                         f"opc-client-info: {headers['opc-client-info']}"

        # Sign the string
        signature = base64.b64encode(
            private_key.sign(
                signing_string.encode('utf-8'),
                padding.PKCS1v15(),
                SHA256()
            )
        ).decode('utf-8')

        headers['authorization'] = (
            f'Signature version="1",'
            f'headers="(request-target) host date opc-client-info",'
            f'keyId="{self.tenancy_id}/{self.user_id}/{self.fingerprint}",'
            f'algorithm="rsa-sha256",'
            f'signature="{signature}"'
        )

        data = json.dumps(request_body) if request_body else None

        parsed_url = urlparse(endpoint)
        path = parsed_url.path
        if parsed_url.query:
            path += "?" + parsed_url.query

        conn = http.client.HTTPSConnection(parsed_url.netloc)
        try:
            conn.request(method, path, body=data, headers=headers)
            response = conn.getresponse()
            status = response.status
            # The body must be read before the connection is closed.
            response_data = response.read()
        finally:
            # Always release the socket, including when the request fails.
            conn.close()

        if status not in (200, 204):
            print(f"Failed to do OCI request: {status}")
            # errors='replace' so a non-UTF-8 error body cannot mask the
            # actual HTTP failure with a UnicodeDecodeError.
            print(response_data.decode(errors='replace'))
            raise Exception("Failed OCI API call")

        if response_data:
            return json.loads(response_data)

        return True