in scripts/s3-extract-tar.py [0:0]
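# run() assumes the script's module-level imports: os, sys, math, time,
# tarfile, and boto3 (all of which are used below).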
def run(self):
    # Open the source archive with a read mode matching its compression suffix.
    file_path = self.config['file_path']
    ext = os.path.splitext(file_path)[1]
    mode = "r:gz" if ext == ".gz" else "r:bz2" if ext == ".bz2" else "r:xz" if ext == ".xz" else "r"
    tar_file = tarfile.open(name=file_path, mode=mode)

    # Keep only regular-file members, optionally sliced to [start:end).
    start = self.config['start']
    end = self.config['end']
    info_list = tar_file.getmembers()
    file_info_list = [info for info in info_list if info.isfile()]
    file_info_list = file_info_list[start:end] if end > start else file_info_list
    file_count = len(file_info_list)

    # Shard the remaining members across workers: this worker takes the
    # ceil-sized chunk at index self.__pindex out of self.__pcount chunks.
    p_chunk = math.ceil(file_count / self.__pcount)
    p_start = self.__pindex * p_chunk
    p_end = min(p_start + p_chunk, file_count)
    file_info_list = file_info_list[p_start:p_end]
    file_count = len(file_info_list)
    start_time = time.time()
    files_extracted = 0
    dest_bucket = self.config["dest_bucket"]
    dest_prefix = self.config["dest_prefix"]
    s3_client = boto3.client('s3')
    self.logger.info(f"worker {self.__pindex}, start: {start+p_start}, end: {start+p_end}, count: {file_count}")
    # Stream each member out of the tar and upload it to S3, retrying failed
    # puts with a linear backoff for up to self.PUT_RETRY attempts.
    for file_info in file_info_list:
        nattempt = 0
        while nattempt < self.PUT_RETRY:
            try:
                file_reader = tar_file.extractfile(file_info)
                file_key = self.__dest_key(dest_prefix, file_info.name)
                s3_client.put_object(Body=file_reader,
                                     Bucket=dest_bucket,
                                     Key=file_key)
                file_reader.close()
                files_extracted += 1
                break
            except Exception as err:
                self.logger.info(f"Exception: {err}")
                nattempt += 1
                time.sleep(nattempt)
        if nattempt >= self.PUT_RETRY:
            self.logger.info(f'worker {self.__pindex}, failed: {file_path}')
            sys.exit(1)
        if files_extracted % 100 == 0:
            elapsed = time.time() - start_time
            self.logger.info(f"worker {self.__pindex}: files extracted: {files_extracted}: {elapsed}s")
    elapsed = time.time() - start_time
    self.logger.info(f"worker {self.__pindex}: files extracted: {files_extracted}: {elapsed}s")
    # Release the archive handle opened at the top of run().
    tar_file.close()