datahub/client/common/config.py (148 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from .constant import Constant
class Utils:
DATE_FMT = "[%Y-%m-%d %H:%M:%S]"
FORMAT = "%(asctime)s|%(levelname)7s|%(threadName)10s|%(message)s"
class DatahubConfig:
__slots__ = '_access_id', '_access_key', '_endpoint', '_protocol_type', '_compress_format'
def __init__(self, access_id, access_key, endpoint, protocol_type, compress_format):
self._access_id = access_id
self._access_key = access_key
self._endpoint = endpoint
self._protocol_type = protocol_type
self._compress_format = compress_format
@property
def access_id(self):
return self._access_id
@access_id.setter
def access_id(self, value):
self._access_id = value
@property
def access_key(self):
return self._access_key
@access_key.setter
def access_key(self, value):
self._access_key = value
@property
def endpoint(self):
return self._endpoint
@endpoint.setter
def endpoint(self, value):
self._endpoint = value
@property
def protocol_type(self):
return self._protocol_type
@protocol_type.setter
def protocol_type(self, value):
self._protocol_type = value
@property
def compress_format(self):
return self._compress_format
@compress_format.setter
def compress_format(self, value):
self._compress_format = value
class CommonConfig(DatahubConfig):
__slots__ = '_retry_times', '_async_thread_limit', '_thread_queue_limit', '_logging_level', '_logging_filename'
def __init__(self, access_id, access_key, endpoint, protocol_type, compress_format):
super().__init__(access_id, access_key, endpoint, protocol_type, compress_format)
self._retry_times = Constant.DEFAULT_RETRY_TIMES
self._async_thread_limit = Constant.DEFAULT_ASYNC_THREAD_LIMIT
self._thread_queue_limit = Constant.DEFAULT_THREAD_QUEUE_LIMIT
self._logging_level = Constant.DEFAULT_LOGING_LEVEL
self._logging_filename = Constant.DEFAULT_LOGING_FILENAME
@property
def retry_times(self):
return self._retry_times
@retry_times.setter
def retry_times(self, value):
self._retry_times = value
@property
def async_thread_limit(self):
return self._async_thread_limit
@async_thread_limit.setter
def async_thread_limit(self, value):
self._async_thread_limit = value
@property
def thread_queue_limit(self):
return self._thread_queue_limit
@thread_queue_limit.setter
def thread_queue_limit(self, value):
self._thread_queue_limit = value
@property
def logging_level(self):
return self._logging_level
@logging_level.setter
def logging_level(self, value):
self._logging_level = value
@property
def logging_filename(self):
return self._logging_filename
@logging_filename.setter
def logging_filename(self, value):
self._logging_filename = value
class ConsumerConfig(CommonConfig):
"""
Config for datahub producer
Members:
access_id (:class:`string`): Aliyun access id
access_key (:class:`string`): Aliyun access key
endpoint (:class:`string`): Datahub endpoint
protocol_type (:class:`datahub.core.DatahubProtocolType`): Protocol type for datahub client
compress_format (:class:`datahub.models.compress.CompressFormat`): Compress format for records data
retry_times (:class:`int`): Retry times when request error
async_thread_limit (:class:`int`): Thread num limit for thread pool in message reader
thread_queue_limit (:class:`int`): Task num limit for queue in thread pool in message reader
logging_level (:class:`int`): Logging level
logging_filename (:class:`string`): Logging file name
auto_ack_offset (:class:`bool`): Auto ack offset for fetched records or not
session_timeout (:class:`int`): Session timeout
max_record_buffer_size (:class:`int`): Max record buffer size in consumer
fetch_limit (:class:`int`): Fetch num limit need to consume
"""
__slots__ = '_auto_ack_offset', '_session_timeout', '_max_record_buffer_size', '_fetch_limit'
def __init__(self, access_id, access_key, endpoint, protocol_type=Constant.DEFAULT_PROTOCOL_TYPE,
compress_format=Constant.DEFAULT_COMPRESS_FORMAT):
super().__init__(access_id, access_key, endpoint, protocol_type, compress_format)
self._auto_ack_offset = Constant.DEFAULT_AUTO_ACK_OFFSET
self._session_timeout = Constant.DEFAULT_SESSION_TIMEOUT
self._max_record_buffer_size = Constant.DEFAULT_MAX_RECORD_BUFFER_SIZE
self._fetch_limit = Constant.DEFAULT_FETCH_LIMIT
@property
def auto_ack_offset(self):
return self._auto_ack_offset
@auto_ack_offset.setter
def auto_ack_offset(self, value):
self._auto_ack_offset = value
@property
def session_timeout(self):
return self._session_timeout
@session_timeout.setter
def session_timeout(self, value):
self._session_timeout = value
@property
def max_record_buffer_size(self):
return self._max_record_buffer_size
@max_record_buffer_size.setter
def max_record_buffer_size(self, value):
self._max_record_buffer_size = value
@property
def fetch_limit(self):
return self._fetch_limit
@fetch_limit.setter
def fetch_limit(self, value):
self._fetch_limit = value
class ProducerConfig(CommonConfig):
"""
Config for datahub producer
Members:
access_id (:class:`string`): Aliyun access id
access_key (:class:`string`): Aliyun access key
endpoint (:class:`string`): Datahub endpoint
protocol_type (:class:`datahub.core.DatahubProtocolType`): Protocol type for datahub client
compress_format (:class:`datahub.models.compress.CompressFormat`): Compress format for records data
retry_times (:class:`int`): Retry times when request error
async_thread_limit (:class:`int`): Thread num limit for thread pool in message writer
thread_queue_limit (:class:`int`): Task num limit for queue in thread pool in message writer
logging_level (:class:`int`): Logging level
logging_filename (:class:`string`): Logging file name
max_async_buffer_records (:class:`int`): Max buffer records number to PutRecords once. Only valid when write async.
max_async_buffer_size (:class:`int`): Max buffer size to PutRecords once. Only valid when write async.
max_async_buffer_time (:class:`int`): Max buffer time to PutRecords once. Only valid when write async.
max_record_pack_queue_limit (:class:`int`): Max ready record pack limit for queue. Only valid when write async.
"""
__slots__ = '_max_async_buffer_records', '_max_async_buffer_size',\
'_max_async_buffer_time', '_max_record_pack_queue_limit'
def __init__(self, access_id, access_key, endpoint, protocol_type=Constant.DEFAULT_PROTOCOL_TYPE,
compress_format=Constant.DEFAULT_COMPRESS_FORMAT):
super().__init__(access_id, access_key, endpoint, protocol_type, compress_format)
self._max_async_buffer_records = Constant.MAX_ASYNC_BUFFER_RECORD_COUNT
self._max_async_buffer_size = Constant.MAX_ASYNC_BUFFER_SIZE
self._max_async_buffer_time = Constant.MAX_ASYNC_BUFFER_TIMEOUT_S
self._max_record_pack_queue_limit = Constant.MAX_RECORD_PACK_QUEUE_LIMIT
@property
def max_async_buffer_records(self):
return self._max_async_buffer_records
@max_async_buffer_records.setter
def max_async_buffer_records(self, value):
self._max_async_buffer_records = min(value, Constant.MAX_ASYNC_BUFFER_RECORD_COUNT)
@property
def max_async_buffer_size(self):
return self._max_async_buffer_size
@max_async_buffer_size.setter
def max_async_buffer_size(self, value):
self._max_async_buffer_size = min(value, Constant.MAX_ASYNC_BUFFER_SIZE)
@property
def max_async_buffer_time(self):
return self._max_async_buffer_time
@max_async_buffer_time.setter
def max_async_buffer_time(self, value):
self._max_async_buffer_time = min(value, Constant.MAX_ASYNC_BUFFER_TIMEOUT_S)
@property
def max_record_pack_queue_limit(self):
return self._max_record_pack_queue_limit
@max_record_pack_queue_limit.setter
def max_record_pack_queue_limit(self, value):
self._max_record_pack_queue_limit = value