def _process_message()

in aliyun/log/etl_core/trans_comp/trans_json.py [0:0]


    def _process_message(self, key, value):
        """Build the dict of new event fields derived from one field value.

        Text/bytes values are first parsed with ``json.loads``. When
        ``self.jmes_filter`` is set, it is applied to the parsed value and the
        result replaces the value; if ``self.output`` is also set, the
        normalized result (``self._n``) is stored under that field name. When
        ``self.expand`` is set, the resulting value is passed to
        ``self._expand_json`` together with the dict being built.

        :param key: name of the source field; forwarded to ``_expand_json``.
        :param value: field content; text/bytes are JSON-decoded, any other
            type is used as is.
        :return: dict of fields to add (may be empty), or ``None`` when the
            value cannot be JSON-decoded, the JMES search raises, or the
            search yields ``None`` while ``self.jmes_ignore_none`` is set.
        """
        new_event = {}
        if isinstance(value, (six.binary_type, six.text_type)):
            try:
                value = json.loads(value)
            except Exception as ex:
                # Deliberately best effort: an unparsable value is logged and
                # skipped instead of raising.
                logger.info(u"json_transformer: fail to load event into json object: {0}, error: {1}".format(value, ex))
                return None

        if self.jmes_filter:
            # Keep the try body to the one call that can fail, and keep the
            # input in `value` so the log lines below can still report it.
            try:
                filtered = self.jmes_filter.search(value)
            except Exception as ex:
                logger.info(u"json_transformer: value {0} with invalid jmes settings {1}, error: {2}, skip it".
                            format(value, self.jmes, ex))
                return None

            if filtered is None and self.jmes_ignore_none:
                # Report the input that produced the null result, not the
                # (always None) search result.
                logger.info(u"json_transformer: value {0} get null from jmes settings {1}, skip it".
                            format(value, self.jmes))
                return None

            value = filtered
            if self.output:
                new_event[self.output] = self._n(value)

        # if need to expand
        if self.expand:
            self._expand_json(new_event, key, value, (key,), (key,), self.depth, self.sep, self.prefix, self.suffix)

        return new_event