parquet_flask/v1/replace_json_s3.py (42 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from flask_restx import Resource, Namespace, fields
from flask import request

from parquet_flask.utils.general_utils import GeneralUtils
from parquet_flask.v1.authenticator_decorator import authenticator_decorator
from parquet_flask.v1.ingest_aws_json import IngestAwsJsonProps, IngestAwsJson

api = Namespace('replace_json_s3', description="Ingesting JSON files")
LOGGER = logging.getLogger(__name__)

# Flask-RESTX model describing the request body of the PUT endpoint.
query_model = api.model('replace_json_s3', {
    's3_url': fields.String(required=True, example='s3://<bucket>/<key>'),
    'job_id': fields.String(required=True, example='sample-uuid'),
    'sanitize_record': fields.Boolean(required=False, example='True', default=True),
    'wait_till_finish': fields.Boolean(required=False, example='True', default=True),
})

# JSON schema used at runtime to validate the request body in `put`.
_QUERY_SCHEMA = {
    'type': 'object',
    'properties': {
        's3_url': {'type': 'string'},
        'job_id': {'type': 'string'},
        'sanitize_record': {'type': 'boolean'},
        'wait_till_finish': {'type': 'boolean'},
    },
    'required': ['s3_url', 'job_id'],
}


@api.route('', methods=["put"])
class IngestParquet(Resource):
    """REST resource that starts a JSON ingestion from S3 in "replacing" mode."""

    def __init__(self, api=None, *args, **kwargs):
        # Unpack the extra arguments; forwarding `args` and `kwargs` as two
        # plain positional values would hand the parent a tuple and a dict
        # instead of the caller's actual arguments.
        super().__init__(api, *args, **kwargs)

    # NOTE(review): Flask-RESTX `expect` appears to take models positionally,
    # so the `fields=` keyword may be ignored and the model left out of the
    # generated API docs -- confirm before changing to `api.expect(query_model)`.
    @api.expect(fields=query_model)
    @authenticator_decorator
    def put(self):
        """
        Ingest the JSON file at the given S3 URL with the replace flag set.

        Expected JSON body::

            {
                "s3_url": "s3://<bucket>/<key>",
                "job_id": "sample-uuid",
                "sanitize_record": true,
                "wait_till_finish": true
            }

        ``s3_url`` and ``job_id`` are required; the two boolean flags are
        optional and default to ``True``.

        :return: ``({'message': ..., 'details': ...}, 400)`` when the body
                 fails validation against ``_QUERY_SCHEMA``; otherwise
                 whatever ``IngestAwsJson(props).ingest()`` returns.
        """
        payload = request.get_json()
        is_valid, json_error = GeneralUtils.is_json_valid(payload, _QUERY_SCHEMA)
        if not is_valid:
            return {'message': 'invalid request body', 'details': str(json_error)}, 400

        props = IngestAwsJsonProps()
        props.s3_url = payload['s3_url']
        props.uuid = payload['job_id']
        props.is_replacing = True  # this endpoint always runs in replace mode
        # Optional flags: the schema guarantees `payload` is a JSON object here,
        # so `.get` with a default is equivalent to an explicit key check.
        props.is_sanitizing = payload.get('sanitize_record', True)
        props.wait_till_complete = payload.get('wait_till_finish', True)
        return IngestAwsJson(props).ingest()