def run()

in scripts/s3-extract-tar.py [0:0]


    def run(self):
        """Upload this worker's share of a tar archive's files to S3.

        Reads ``file_path``, ``start``, ``end``, ``dest_bucket`` and
        ``dest_prefix`` from ``self.config``. The archive's regular-file
        members are narrowed to ``[start:end]`` when ``end > start``
        (otherwise every file member is kept), then divided into
        ``self.__pcount`` contiguous chunks; this worker handles the chunk
        selected by ``self.__pindex``. Each member is streamed to
        ``dest_bucket`` under the key returned by ``self.__dest_key``.

        Each upload is attempted up to ``self.PUT_RETRY`` times, sleeping one
        second longer after every failed attempt. If all attempts for a
        member fail, the failure is logged and the process exits with
        status 1.

        The archive and every per-member reader are closed on all paths,
        including failed attempts and the ``sys.exit`` path.
        """
        file_path = self.config['file_path']
        start = self.config['start']
        end = self.config['end']
        dest_bucket = self.config["dest_bucket"]
        dest_prefix = self.config["dest_prefix"]

        # Pick the decompression mode from the extension; anything
        # unrecognized falls back to tarfile's transparent "r" mode.
        ext = os.path.splitext(file_path)[1]
        mode = {".gz": "r:gz", ".bz2": "r:bz2", ".xz": "r:xz"}.get(ext, "r")

        files_extracted = 0

        with tarfile.open(name=file_path, mode=mode) as tar_file:
            file_info_list = [
                info for info in tar_file.getmembers() if info.isfile()
            ]
            # A non-increasing (start, end) pair means "no sub-range".
            if end > start:
                file_info_list = file_info_list[start:end]
            file_count = len(file_info_list)

            # Contiguous partition of the work across the workers.
            p_chunk = math.ceil(file_count / self.__pcount)
            p_start = self.__pindex * p_chunk
            p_end = min(p_start + p_chunk, file_count)

            file_info_list = file_info_list[p_start:p_end]
            file_count = len(file_info_list)

            start_time = time.time()

            s3_client = boto3.client('s3')
            self.logger.info(f"worker {self.__pindex}, start: {start+p_start}, end: {start+p_end}, count: {file_count}")
            for file_info in file_info_list:
                nattempt = 0
                while nattempt < self.PUT_RETRY:
                    try:
                        file_key = self.__dest_key(dest_prefix, file_info.name)
                        # A fresh reader per attempt; the context manager
                        # closes it even when put_object raises.
                        with tar_file.extractfile(file_info) as file_reader:
                            s3_client.put_object(
                                Body=file_reader,
                                Bucket=dest_bucket,
                                Key=file_key,
                            )
                        files_extracted += 1
                        break
                    except Exception as err:
                        # Deliberately broad: any failure (tar read or S3
                        # upload) is retried with a linearly growing delay.
                        self.logger.info(f"Exception: {err}")
                        nattempt += 1
                        time.sleep(nattempt)
                if nattempt >= self.PUT_RETRY:
                    self.logger.info(f'worker {self.__pindex}, failed: {file_path}')
                    sys.exit(1)

                if files_extracted % 100 == 0:
                    elapsed = time.time() - start_time
                    self.logger.info(f"worker {self.__pindex}: files extracted: {files_extracted}: {elapsed}s")

        elapsed = time.time() - start_time
        self.logger.info(f"worker {self.__pindex}: files extracted: {files_extracted}: {elapsed}s")