parquet_flask/v1/replace_json_s3.py (42 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from flask_restx import Resource, Namespace, fields
from flask import request
from parquet_flask.utils.general_utils import GeneralUtils
from parquet_flask.v1.authenticator_decorator import authenticator_decorator
from parquet_flask.v1.ingest_aws_json import IngestAwsJsonProps, IngestAwsJson
# Namespace under which the PUT endpoint defined in this module is registered.
api = Namespace('replace_json_s3', description="Ingesting JSON files")
LOGGER = logging.getLogger(__name__)

# Documentation model describing the request body of the PUT endpoint.
# The boolean fields use real booleans as examples so that the generated
# sample body matches the 'boolean' type the request schema requires.
query_model = api.model('replace_json_s3', {
    's3_url': fields.String(required=True, example='s3://<bucket>/<key>'),
    'job_id': fields.String(required=True, example='sample-uuid'),
    'sanitize_record': fields.Boolean(required=False, example=True, default=True),
    'wait_till_finish': fields.Boolean(required=False, example=True, default=True),
})
# JSON schema used to validate the request body before it is read:
# `s3_url` and `job_id` are mandatory strings, the two flags are optional
# booleans.
_QUERY_SCHEMA = {
    'type': 'object',
    'required': ['s3_url', 'job_id'],
    'properties': {
        # mandatory string values
        's3_url': {'type': 'string'},
        'job_id': {'type': 'string'},
        # optional boolean flags
        'sanitize_record': {'type': 'boolean'},
        'wait_till_finish': {'type': 'boolean'},
    },
}
@api.route('', methods=["put"])
class IngestParquet(Resource):
    """REST resource that ingests a JSON file from S3 with the replace flag set.

    The PUT handler validates the request body against ``_QUERY_SCHEMA``,
    copies its values onto an ``IngestAwsJsonProps`` instance with
    ``is_replacing`` enabled, and hands that to ``IngestAwsJson``.
    """

    def __init__(self, api=None, *args, **kwargs):
        # Unpack the extra arguments; forwarding ``args`` and ``kwargs`` as
        # two plain positional values would pass a tuple and a dict to the
        # base class instead of the caller's actual arguments.
        super().__init__(api, *args, **kwargs)

    # The model is passed positionally because ``expect`` only collects
    # positional inputs; a ``fields=`` keyword is ignored. ``validate=False``
    # keeps request validation in ``put`` itself, so the 400 response body
    # stays the one built from ``_QUERY_SCHEMA`` below.
    @api.expect(query_model, validate=False)
    @authenticator_decorator
    def put(self):
        """
        Ingest the JSON file at ``s3_url`` with replacement enabled.

        Request body (JSON object):
            - ``s3_url`` (string, required), e.g.
              ``s3://ecsv-h5-data-v1/INDEX/GALILEO/filenames.txt.gz``
            - ``job_id`` (string, required): stored as the job's ``uuid``
            - ``sanitize_record`` (boolean, optional, defaults to True)
            - ``wait_till_finish`` (boolean, optional, defaults to True)

        :return: ``({'message': ..., 'details': ...}, 400)`` when the body
                 fails schema validation; otherwise whatever
                 ``IngestAwsJson.ingest()`` returns.
        """
        payload = request.get_json()
        is_valid, json_error = GeneralUtils.is_json_valid(payload, _QUERY_SCHEMA)
        if not is_valid:
            return {'message': 'invalid request body', 'details': str(json_error)}, 400

        props = IngestAwsJsonProps()
        props.s3_url = payload['s3_url']
        props.uuid = payload['job_id']
        # This endpoint always runs the ingestion with the replace flag on.
        props.is_replacing = True
        # Optional flags default to True when the client omits them.
        props.is_sanitizing = payload.get('sanitize_record', True)
        props.wait_till_complete = payload.get('wait_till_finish', True)
        return IngestAwsJson(props).ingest()