parquet_flask/utils/parallel_json_validator.py (49 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import fastjsonschema
import logging
from datetime import datetime
from multiprocessing import Pool
from parquet_flask.utils.singleton import Singleton
LOGGER = logging.getLogger(__name__)


def __validate_small_data(small_data, schema):
    """
    Validate a single element against a JSON schema.

    Runs inside a worker process of ``parallel_validate``.

    :param small_data: one element of the data being validated
    :param schema: uncompiled JSON schema definition, passed to ``fastjsonschema.compile``
    :return: None when ``small_data`` is valid, otherwise the error message as a string
    """
    try:
        # NOTE(review): the schema is recompiled for every element. If this shows up in
        # profiles, compile it once per worker instead (e.g. via a ``Pool`` initializer).
        fastjsonschema.compile(schema)(small_data)
    except Exception as e:
        # Any failure (a validation error or an invalid schema) is returned as text, so it
        # is collected by the caller instead of propagating out of ``Pool.starmap``.
        return str(e)
    return None


def parallel_validate(chunked_data, schema, processes=16):
    """
    Validate every element of ``chunked_data`` against ``schema`` in a process pool.

    :param chunked_data: iterable of elements; each element is validated independently
    :param schema: uncompiled JSON schema definition, sent to the workers with every element
    :param processes: number of worker processes (default 16, the previously hard-coded value)
    :return: tuple ``(is_valid, errors)``. ``is_valid`` is True when no element failed.
        ``errors`` lists the error messages of the failing elements, in input order.
    """
    started = datetime.now()
    tasks = [(item, schema) for item in chunked_data]
    if tasks:
        with Pool(processes) as pool:
            results = pool.starmap(__validate_small_data, tasks)
    else:
        # Nothing to validate: avoid spawning worker processes just to map over nothing.
        results = []
    errors = [message for message in results if message is not None]
    LOGGER.debug('validation took: %s', datetime.now() - started)
    return not errors, errors
class ParallelJsonValidator(object):
    """
    Validate a list of JSON elements against a JSON schema using a process pool.

    Load an uncompiled schema with ``load_schema`` (or the ``schema`` setter), then call
    ``validate_json``.
    """

    def __init__(self):
        # Uncompiled JSON schema definition; None until a schema is loaded.
        self.__schema = None

    @property
    def schema(self):
        """The currently loaded JSON schema definition, or None if none is loaded."""
        return self.__schema

    @schema.setter
    def schema(self, val):
        """
        Store the schema used by subsequent validations.

        :param val: uncompiled JSON schema definition; it is compiled with
            ``fastjsonschema.compile`` at validation time
        :return: None
        """
        self.__schema = val

    def load_schema(self, input_schema):
        """
        Set the schema and return this validator so calls can be chained.

        :param input_schema: uncompiled JSON schema definition
        :return: self
        """
        self.schema = input_schema
        return self

    def is_schema_loaded(self):
        """Return True when a schema has been set (i.e. it is not None)."""
        return self.__schema is not None

    def __validate_this(self, small_data):
        """
        Validate a single element in the current process.

        NOTE(review): not called by ``validate_json``, which validates in worker processes.

        :param small_data: one element to validate
        :return: None when valid, otherwise the error message as a string
        """
        try:
            # The stored schema is an uncompiled definition (the same value validate_json
            # passes to parallel_validate), so it must be compiled before it can be called.
            fastjsonschema.compile(self.__schema)(small_data)
        except Exception as e:
            return str(e)
        return None

    def validate_json(self, chunked_data: list):
        """
        Validate every element of ``chunked_data`` against the loaded schema.

        :param chunked_data: list of elements to validate
        :return: tuple ``(is_valid, errors)``; ``(True, [])`` when ``chunked_data`` is empty
        :raises ValueError: if no schema has been loaded
        """
        if not self.is_schema_loaded():
            raise ValueError('schema is not loaded. cannot validate')
        if len(chunked_data) == 0:
            LOGGER.debug('no need to validate empty json')
            # Return the same (is_valid, errors) shape as the non-empty path so callers
            # can always unpack the result.
            return True, []
        LOGGER.debug('chunked_data size: %s', len(chunked_data))
        return parallel_validate(chunked_data, self.schema)