def extract_tar()

in scripts/s3-extract-tar.py [0:0]

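extract_tar drives the end-to-end extraction of a tar archive stored in S3: it downloads the tar from the source bucket into a local temp directory (unless a matching local copy already exists), writes a manifest of the expected destination keys and sizes, extracts the members either by submitting batch jobs (for large archives) or with concurrent S3TarExtractor workers, and finally verifies the manifest against the destination bucket before uploading it.

The snippet does not document the config dict, so the following is a minimal sketch of the keys the body reads; the values are illustrative assumptions:

    config = {
        "key": "archives/data.tar.gz",        # tar object key in the source bucket
        "source_bucket": "my-source-bucket",
        "dest_bucket": "my-dest-bucket",
        "dest_prefix": "extracted",           # prefix for extracted objects in the destination bucket
        "tmp_dir": "/tmp/s3-extract",         # local scratch directory
        "file_path": None,                    # filled in by the download step when "key" is set
        "start": None,                        # chunk range; left as None for the top-level invocation
        "end": None,
    }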

    @classmethod
    def extract_tar(cls, config=None):
        # Relies on module-level imports: os, sys, time, tarfile, boto3
        key = config['key']
        manifest_path = None
        s3_client = boto3.client('s3')

        if key:
            tmp_dir = config["tmp_dir"]
            os.makedirs(tmp_dir, mode=0o777, exist_ok=True)

            mdir = os.path.join(tmp_dir, "manifests")
            os.makedirs(mdir, mode=0o777, exist_ok=True)
    
            source_bucket = config["source_bucket"]
            dest_bucket = config["dest_bucket"]
        
            dest_prefix = config["dest_prefix"]
            manifest_name = key.rsplit('/', 1)[-1]
            manifest_path = os.path.join(mdir, manifest_name)

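            # Bail out early if a previous run already extracted this tar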
            if cls.__is_tar_extracted(s3_client=s3_client, key=key, mdir=mdir, dest_bucket=dest_bucket):
                cls.logger.info(f"Tar is already extracted: {key}")
                sys.exit(0)

            # Reuse a previously downloaded tar if its size matches the S3 object; otherwise (re)download it
            _file_name = key.rsplit('/', 1)[-1]
            _file_path = os.path.join(tmp_dir, _file_name)

            needs_download = True
            if os.path.isfile(_file_path):
                file_size = os.stat(_file_path).st_size
                tar_size = s3_client.head_object(Bucket=source_bucket, Key=key).get('ContentLength')
                if file_size == tar_size:
                    cls.logger.info(f"Skipping download: s3://{source_bucket}/{key}, file exists: {_file_path}, size: {file_size}")
                    config['file_path'] = _file_path
                    needs_download = False

            if needs_download:
                config['file_path'] = cls.__download_file(s3_client=s3_client, bucket_name=source_bucket, key=key,
                                                          dir=tmp_dir, mdir=mdir, dest_bucket=dest_bucket)
        
        file_path = config.get('file_path')
        if not file_path:
            cls.logger.info("No file to be extracted")
            sys.exit(0)

        start_time = time.time()

        start = config['start']
        end = config['end']

        total_file_count = 0
        total_file_count = 0
        if not start and not end:
            # Top-level invocation (no chunk range): index the archive and write the expected
            # destination keys and sizes to the local manifest file
            ext = os.path.splitext(file_path)[1]
            mode = "r:gz" if ext == ".gz" else "r:bz2" if ext == ".bz2" else "r:xz" if ext == ".xz" else "r"
            with tarfile.open(name=file_path, mode=mode) as tar_file:
                file_info_list = [info for info in tar_file.getmembers() if info.isfile()]
            total_file_count = len(file_info_list)

            with open(manifest_path, "w") as manifest:
                for file_info in file_info_list:
                    file_key = cls.__dest_key(dest_prefix, file_info.name)
                    manifest.write(f"{file_key} {file_info.size}\n")

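        # Fan out: archives with more than FILE_CHUNK_COUNT files are split across batch jobs;
        # smaller archives are extracted here by P_CONCURRENT S3TarExtractor workers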
        if total_file_count > cls.FILE_CHUNK_COUNT:
            cls.logger.info(f"submit batch jobs for {file_path} : {total_file_count}")
            cls.__submit_batch_jobs(config, total_file_count)
        else:
            _p_list = []
            for i in range(cls.P_CONCURRENT):
                p = S3TarExtractor(config=config, index=i, count=cls.P_CONCURRENT)
                _p_list.append(p)
                p.start()

            for _p in _p_list:
                _p.join()
    
        elapsed = time.time() - start_time
    
        if not start and not end:
            # Check the manifest against the destination bucket; on success, publish the manifest
            # and clean up the local files
            verified = cls.__verify_manifest(s3_client=s3_client, manifest_path=manifest_path, dest_bucket=dest_bucket)
            if verified:
                manifest_key = f"manifests/{key}"
                s3_client.upload_file(manifest_path, dest_bucket, manifest_key)
                os.remove(manifest_path)
                os.remove(file_path)
                cls.logger.info(f"Extraction completed: {elapsed} secs")
            else:
                cls.logger.error(f"Extraction verification error: {elapsed} secs")
                sys.exit(1)
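
Each manifest line pairs a destination object key with the member's size in bytes, so for a dest_prefix of "extracted" the file would look roughly like this (paths are illustrative, and this assumes __dest_key simply joins the prefix and the member path):

    extracted/images/0001.jpg 524288
    extracted/images/0002.jpg 1048576
    extracted/labels/train.csv 73422

On a verified run the manifest is uploaded to the destination bucket under the manifests/ prefix, which is presumably what the earlier __is_tar_extracted check looks for when deciding whether an archive was already processed. With a config like the one sketched at the top, the whole flow is driven by a single call, assuming the method is a classmethod on S3TarExtractor:

    S3TarExtractor.extract_tar(config=config)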