in scripts/s3-extract-tar.py [0:0]
def extract_tar(cls, config=None):
    """Fetch a tar archive from S3 if needed, extract it, and verify the result.

    Steps performed, in order:

    1. If ``config['key']`` is set: create the working directories, exit early
       when the tar is already reported as extracted, and download the archive
       unless a local copy with the same byte size is already present.
    2. When neither ``start`` nor ``end`` is set: list the regular files in the
       archive and write a local manifest with one ``"<dest key> <size>"``
       line per file.
    3. Hand off to ``__submit_batch_jobs`` when the file count exceeds
       ``cls.FILE_CHUNK_COUNT``; otherwise run ``cls.P_CONCURRENT`` local
       ``S3TarExtractor`` workers and wait for them.
    4. When neither ``start`` nor ``end`` is set: verify the manifest, upload
       it to ``manifests/<key>`` in the destination bucket, and delete the
       local manifest and archive.

    Args:
        config: Mapping that must be provided despite the ``None`` default
            (it is subscripted immediately). Keys read here: ``key``,
            ``file_path``, ``start``, ``end`` and, when ``key`` is truthy,
            ``tmp_dir``, ``source_bucket``, ``dest_bucket``, ``dest_prefix``.
            ``config['file_path']`` is overwritten when ``key`` is truthy.

    Raises:
        SystemExit: Code 0 when the tar is already extracted or there is no
            file to extract; code 1 when manifest verification fails.
    """
    key = config['key']
    manifest_path = None
    s3_client = boto3.client('s3')

    if key:
        tmp_dir = config["tmp_dir"]
        os.makedirs(tmp_dir, mode=0o777, exist_ok=True)
        mdir = os.path.join(tmp_dir, "manifests")
        os.makedirs(mdir, mode=0o777, exist_ok=True)

        source_bucket = config["source_bucket"]
        dest_bucket = config["dest_bucket"]
        dest_prefix = config["dest_prefix"]

        # Basename of the S3 key; names both the local archive and its manifest.
        file_name = key.rsplit('/', 1)[-1]
        manifest_path = os.path.join(mdir, file_name)

        if cls.__is_tar_extracted(s3_client=s3_client, key=key, mdir=mdir, dest_bucket=dest_bucket):
            cls.logger.info(f"Tar is already extracted: {key}")
            sys.exit(0)

        # Reuse a previously downloaded archive only if its size matches S3.
        local_tar_path = os.path.join(tmp_dir, file_name)
        local_size = None
        reuse_local_copy = False
        if os.path.isfile(local_tar_path):
            local_size = os.stat(local_tar_path).st_size
            remote_size = s3_client.head_object(Bucket=source_bucket, Key=key).get('ContentLength')
            reuse_local_copy = local_size == remote_size

        if reuse_local_copy:
            cls.logger.info(f"Skipping download: s3://{source_bucket}/{key}, file exists: {local_tar_path}, size:{local_size}")
            # Record the existing file so the extraction below can find it;
            # without this the skip path left config['file_path'] unset.
            # NOTE(review): assumes __download_file returns this same path -- confirm.
            config['file_path'] = local_tar_path
        else:
            config['file_path'] = cls.__download_file(s3_client=s3_client, bucket_name=source_bucket, key=key,
                                                      dir=tmp_dir, mdir=mdir, dest_bucket=dest_bucket)

    file_path = config['file_path']
    if not file_path:
        cls.logger.info("No file to be extracted")
        sys.exit(0)

    start_time = time.time()
    start = config['start']
    end = config['end']
    # The manifest is built and verified only when no start/end is given.
    # NOTE(review): start/end presumably select a slice of members for a
    # partial run -- confirm against S3TarExtractor and __submit_batch_jobs.
    whole_archive = not start and not end
    total_file_count = 0

    if whole_archive:
        # NOTE(review): this branch relies on manifest_path, dest_bucket and
        # dest_prefix, which are only assigned when config['key'] is truthy.
        ext = os.path.splitext(file_path)[1]
        mode = {".gz": "r:gz", ".bz2": "r:bz2", ".xz": "r:xz"}.get(ext, "r")
        # Context manager closes the archive handle once members are listed.
        with tarfile.open(name=file_path, mode=mode) as tar_file:
            file_info_list = [info for info in tar_file.getmembers() if info.isfile()]
        total_file_count = len(file_info_list)

        with open(manifest_path, "w") as manifest:
            for file_info in file_info_list:
                file_key = cls.__dest_key(dest_prefix, file_info.name)
                manifest.write(f"{file_key} {file_info.size}\n")

    if total_file_count > cls.FILE_CHUNK_COUNT:
        cls.logger.info(f"submit batch jobs for {file_path} : {total_file_count}")
        cls.__submit_batch_jobs(config, total_file_count)
    else:
        # Small archive, or a start/end run where the count stays 0:
        # extract locally with P_CONCURRENT worker processes.
        workers = []
        for i in range(cls.P_CONCURRENT):
            worker = S3TarExtractor(config=config, index=i, count=cls.P_CONCURRENT)
            workers.append(worker)
            worker.start()
        for worker in workers:
            worker.join()

    elapsed = time.time() - start_time
    if whole_archive:
        verified = cls.__verify_manifest(s3_client=s3_client, manifest_path=manifest_path, dest_bucket=dest_bucket)
        if verified:
            # Upload the manifest, then remove the local manifest and archive.
            manifest_key = f"manifests/{key}"
            s3_client.upload_file(manifest_path, dest_bucket, manifest_key)
            os.remove(manifest_path)
            os.remove(file_path)
            cls.logger.info(f"Extraction completed: {elapsed} secs")
        else:
            cls.logger.info(f"Extraction verification error: {elapsed} secs")
            sys.exit(1)