datahub/rest.py (248 lines of code) (raw):

#!/usr/bin/env python # -*- coding: utf-8 -*- # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from __future__ import absolute_import import json import logging import platform import socket from enum import Enum from string import Template import requests import six from requests.adapters import HTTPAdapter from .exceptions import exception_handler, DatahubException from .models.compress import CompressFormat, get_compressor from .utils import gen_rfc822_date, to_text, to_binary from .version import __version__, __datahub_client_version__ logger = logging.getLogger('datahub.rest') logger.setLevel(logging.INFO) if not logger.handlers: logger.addHandler(logging.NullHandler()) class HTTPMethod(Enum): GET = 'GET' POST = 'POST' PUT = 'PUT' HEAD = 'HEAD' DELETE = 'DELETE' class ContentType(Enum): HTTP_JSON = 'application/json' HTTP_PROTOBUF = 'application/x-protobuf' HTTP_BATCH = 'application/x-binary' class Headers(object): ACCEPT_ENCODING = "Accept-Encoding" AUTHORIZATION = "Authorization" CACHE_CONTROL = "Cache-Control" CHUNKED = "chunked" CLIENT_VERSION = 'x-datahub-client-version' CONTENT_DISPOSITION = "Content-Disposition" CONTENT_ENCODING = "Content-Encoding" CONTENT_LENGTH = "Content-Length" CONTENT_MD5 = "Content-MD5" CONTENT_TYPE = "Content-Type" CONTENT_SUB_ID = "x-datahub-sub-id" DATE = "Date" ETAG = "ETag" EXPIRES = "Expires" HOST = "Host" LAST_MODIFIED = "Last-Modified" LOCATION = "Location" RANGE = "Range" RAW_SIZE = "x-datahub-content-raw-size" REQUEST_ACTION = "x-datahub-request-action" REQUEST_ID = "x-datahub-request-id" SECURITY_TOKEN = "x-datahub-security-token" TRANSFER_ENCODING = "Transfer-Encoding" USER_AGENT = "User-Agent" class Path(object): PROJECTS = '/projects' PROJECT = '/projects/%s' TOPICS = '/projects/%s/topics' TOPIC = '/projects/%s/topics/%s' SHARDS = '/projects/%s/topics/%s/shards' SHARD = '/projects/%s/topics/%s/shards/%s' CONNECTORS = '/projects/%s/topics/%s/connectors' CONNECTOR = '/projects/%s/topics/%s/connectors/%s' DONE_TIME = '/projects/%s/topics/%s/connectors/%s?donetime' SUBSCRIPTIONS = '/projects/%s/topics/%s/subscriptions' SUBSCRIPTION = '/projects/%s/topics/%s/subscriptions/%s' OFFSETS = '/projects/%s/topics/%s/subscriptions/%s/offsets' class CommonResponseResult(object): __slots__ = ('_status_code', '_request_id', '_error_code', '_error_msg') def __init__(self, resp, content): if resp is not None: try: self._status_code = resp.status_code self._request_id = resp.headers[Headers.REQUEST_ID] content = json.loads(to_text(content)) self._error_code = content['ErrorCode'] self._error_msg = content['ErrorMessage'] except Exception: logger.error('Decode json message error, content: %s' % to_text(content)) raise DatahubException(self._status_code, self._request_id, '', 'Decode json message error, content: %s' % to_text(content)) @property def status_code(self): return self._status_code @property def request_id(self): return self._request_id @property def error_code(self): return self._error_code @property def error_msg(self): return self._error_msg def get_host_ip(): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) ip = '0.0.0.0' try: sock.connect(('8.8.8.8', 80)) ip = sock.getsockname()[0] except socket.error as e: logger.error('can not get host ip, msg: %s' % e) finally: sock.close() return ip def default_user_agent(use_client): os_version = platform.platform() py_version = platform.python_version() ip_addr = get_host_ip() ua_template = Template('$pydatahub_version $python_version $os_version $ip_addr') return ua_template.safe_substitute(pydatahub_version=('pyclient/%s' if use_client else 'pydatahub/%s') % __version__, python_version='python/%s' % py_version, os_version='%s' % os_version, ip_addr='ip/%s' % ip_addr) class RestClient(object): """Restful client enhanced by URL building and request signing facilities. """ def __init__(self, account, endpoint, user_agent=None, proxies=None, stream=False, retry_times=3, conn_timeout=5, read_timeout=120, pool_connections=10, pool_maxsize=10, exception_handler_=exception_handler, use_client=False): if endpoint.endswith('/'): endpoint = endpoint[:-1] self._account = account self._endpoint = endpoint self._user_agent = user_agent or default_user_agent(use_client) self._proxies = proxies self._stream = stream self._retry_times = retry_times self._conn_timeout = conn_timeout self._read_timeout = read_timeout self._session = requests.Session() self._session.headers.update({Headers.ACCEPT_ENCODING: ''}) # mount adapters with retry times adapter = HTTPAdapter(pool_connections=pool_connections, pool_maxsize=pool_maxsize, max_retries=self._retry_times) self._session.mount('http://', adapter) self._session.mount('https://', adapter) # exception handler self._exception_handler = exception_handler_ def __del__(self): self._session.close() @property def endpoint(self): return self._endpoint @endpoint.setter def endpoint(self, value): if value.endswith('/'): value = value[:-1] self._endpoint = value @property def account(self): return self._account @account.setter def account(self, value): self._account = value @property def user_agent(self): return self._user_agent @user_agent.setter def user_agent(self, value): self._user_agent = value @property def proxies(self): return self._proxies @proxies.setter def proxies(self, value): self._proxies = value @staticmethod def is_ok(resp): """ return True if status code < 400 """ return resp.ok def __common_headers(self): headers = { Headers.CLIENT_VERSION: __datahub_client_version__, Headers.USER_AGENT: self._user_agent, Headers.CONTENT_TYPE: ContentType.HTTP_JSON.value, Headers.DATE: gen_rfc822_date() } if self._account.security_token: headers[Headers.SECURITY_TOKEN] = self._account.security_token return headers @staticmethod def __compress_content(content, compress_format): compressor = get_compressor(compress_format) if compressor: compressed = compressor.compress(to_binary(content)) compress_headers = { Headers.ACCEPT_ENCODING: compress_format.value } if len(compressed) < len(content): compress_headers[Headers.RAW_SIZE] = to_text(len(content)) compress_headers[Headers.CONTENT_ENCODING] = compress_format.value return compressed, compress_headers return content, compress_headers return content, {} @staticmethod def __decompress_response(response): content_encoding = response.headers.get(Headers.CONTENT_ENCODING, '') raw_size = int(response.headers.get(Headers.RAW_SIZE, '0')) compressor = get_compressor(content_encoding) if compressor: return compressor.decompress(to_binary(response.content), raw_size) return response.content def request(self, method, url, compress_format=CompressFormat.NONE, **kwargs): url = "%s%s" % (self._endpoint, url) # Construct user agent without handling the letter case. headers = self.__common_headers() extra_headers = kwargs.get('headers', {}) extra_headers = dict((k, str(v)) for k, v in six.iteritems(extra_headers)) headers.update(extra_headers) # Compress content and set headers if 'data' in kwargs: data, compress_headers = RestClient.__compress_content(kwargs['data'], compress_format) headers.update(compress_headers) kwargs['data'] = data headers[Headers.CONTENT_LENGTH] = to_text(len(data)) kwargs['headers'] = headers req = requests.Request(method.value, url, **kwargs) prepared_req = self._session.prepare_request(req) self._account.sign_request(prepared_req) logger.debug('full request url: %s\nrequest headers:\n%s\nrequest body:\n%s' % (prepared_req.url, prepared_req.headers, prepared_req.body)) resp = self._session.send(prepared_req, stream=self._stream, timeout=(self._conn_timeout, self._read_timeout), proxies=self._proxies, verify=False) logger.debug('response.status_code: %d' % resp.status_code) logger.debug('response.headers: \n%s' % resp.headers) if not self._stream: logger.debug('response.content: %s\n' % resp.content) content = RestClient.__decompress_response(resp) # Automatically detect error if not RestClient.is_ok(resp) and self._exception_handler is not None: status_code = resp.status_code request_id = resp.headers.get(Headers.REQUEST_ID, '') try: content_data = json.loads(to_text(content)) error_code = content_data['ErrorCode'] error_msg = content_data['ErrorMessage'] except Exception: logger.error('Decode json message error, content: %s' % to_text(content)) raise DatahubException('Decode json message error, content: %s' % to_text(content), status_code, request_id, '') logger.error("status_code: %d, request_id: %s, error_code: %s, error_msg: %s" % (status_code, request_id, error_code, error_msg)) self._exception_handler.raise_exception(error_msg, status_code, request_id, error_code) return content def get(self, url, **kwargs): return self.request(HTTPMethod.GET, url, **kwargs) def post(self, url, **kwargs): return self.request(HTTPMethod.POST, url, **kwargs) def put(self, url, **kwargs): return self.request(HTTPMethod.PUT, url, **kwargs) def head(self, url, **kwargs): return self.request(HTTPMethod.HEAD, url, **kwargs) def delete(self, url, **kwargs): return self.request(HTTPMethod.DELETE, url, **kwargs)