in aliyun/log/etl_core/trans_comp/trans_lookup.py [0:0]
def __init__(self, data, output_fields, sep=',', quote='"', lstrip=True, case_insensitive=True, headers=None, mode=None):
    """Set up the lookup source and the output fields.

    Two kinds of ``data`` are accepted:

    * a ``dict`` - wrapped in ``DefaultDict(data, case_insensitive)``;
      ``self.output_fields`` becomes a list of field names.
    * a string - treated as the path of a CSV file, optionally prefixed
      with ``file://``; it is loaded into ``Table(reader, case_insensitive)``
      and ``self.output_fields`` becomes a dict mapping ``f[0]`` to ``f[1]``
      for 2-item entries, or a name to itself for plain strings.

    :param data: dict or CSV file path (see above).
    :param output_fields: for dict data, a string or an iterable of strings;
        for CSV data, a string or an iterable whose items are strings or
        2-item tuples/lists.
    :param sep: passed to ``csv.DictReader`` as ``delimiter``.
    :param quote: passed to ``csv.DictReader`` as ``quotechar``.
    :param lstrip: passed to ``csv.DictReader`` as ``skipinitialspace``.
    :param case_insensitive: forwarded to ``DefaultDict`` / ``Table``.
    :param headers: optional CSV field names, either a comma separated
        string or an iterable; passed to ``csv.DictReader`` as ``fieldnames``.
    :param mode: forwarded to the base class initializer.
    :raises SettingError: when ``data`` or ``output_fields`` has an
        unsupported type, the path uses a scheme other than ``file``, or the
        file cannot be located.
    :raises AssertionError: when an iterable ``output_fields`` (CSV data)
        names a field missing from ``self.data.field_names``.
    """
    super(trans_comp_lookup, self).__init__(mode=mode)

    # NOTE(review): _u is defined outside this block; presumably it
    # normalizes text to unicode - confirm against the base class.
    data = self._u(data)
    output_fields = self._u(output_fields)
    headers = self._u(headers)

    string_types = (six.binary_type, six.text_type)

    if isinstance(data, dict):
        self.data = DefaultDict(data, case_insensitive)

        # init output fields
        if isinstance(output_fields, string_types):
            self.output_fields = [output_fields]
        elif isinstance(output_fields, Iterable):
            self.output_fields = []
            for x in output_fields:
                # Check each item, not the container: the container is never
                # a string in this branch, so testing it rejected every list.
                if isinstance(x, string_types):
                    self.output_fields.append(x)
                else:
                    raise SettingError(settings=output_fields)
        else:
            raise SettingError(settings=output_fields)

    elif isinstance(data, string_types):
        # parse headers; converted to a tuple so the signature below is hashable
        if headers:
            if isinstance(headers, (six.text_type, six.binary_type)):
                headers = tuple(x.strip() for x in headers.split(","))
            else:
                headers = tuple(headers)

        # Key identifying this file + parsing options in EXTERNAL_CACHE.
        self.sig = (data, sep, quote, lstrip, headers)

        if self.sig in self.EXTERNAL_CACHE:
            self.data = self.EXTERNAL_CACHE[self.sig]
        else:
            # "scheme://path" gives exactly two parts; anything else is
            # treated as a bare file path.
            type_path = data.split("://")
            if len(type_path) == 2:
                file_type, file_path = type_path
            else:
                file_type, file_path = u'file', data

            if file_type != 'file':
                raise SettingError(msg="trans_comp_lookup: unsupported file type", settings=data)

            if not os.path.isfile(file_path):
                # Retry relative to the directory of the module that called
                # this constructor. This must stay directly in __init__:
                # stack()[1] is the immediate caller's frame.
                caller_frame_record = inspect.stack()[1]
                module = inspect.getmodule(caller_frame_record[0])
                file_path = os.path.sep.join([os.path.dirname(module.__file__), file_path])
                if not os.path.isfile(file_path):
                    raise SettingError(msg="trans_comp_lookup: cannot locate the file path", settings=data)

            with open(file_path) as csv_file:
                reader = csv.DictReader(csv_file, fieldnames=headers, skipinitialspace=lstrip,
                                        delimiter=sep, quotechar=quote)
                self.data = Table(reader, case_insensitive)

            # put into cache for re-use for other calling
            self.EXTERNAL_CACHE[self.sig] = self.data

        # init output fields
        self.output_fields = {}
        if isinstance(output_fields, string_types):
            self.output_fields[output_fields] = output_fields
        elif isinstance(output_fields, Iterable):
            for f in output_fields:
                if isinstance(f, string_types):
                    self.output_fields[f] = f
                elif isinstance(f, (tuple, list)) and len(f) == 2:
                    self.output_fields[f[0]] = f[1]
                else:
                    raise SettingError(settings=output_fields)

            # NOTE(review): assert is stripped under "python -O"; kept as-is so
            # the exception type seen by existing callers does not change.
            for f in self.output_fields:
                assert f in self.data.field_names, SettingError(
                    msg="trans_comp_lookup: output field {0} doesn't exist in lookup table".format(f))
        else:
            raise SettingError(settings=output_fields)
    else:
        raise SettingError(settings=data)