in processors/compress.py [0:0]
def process(self, output_var='compress'):
if 'glob' not in self.config:
raise NotConfiguredException('No files selected to be compressed.')
if 'format' not in self.config:
raise NotConfiguredException('Target archive format not specified.')
if 'output' not in self.config:
raise NotConfiguredException('No output file specified..')
glob_spec = self._jinja_expand_string(self.config['glob'], 'glob')
format_spec = self._jinja_expand_string(self.config['format'], 'format')
output = self._jinja_expand_string(self.config['output'], 'output')
strip = None
if 'strip' in self.config:
strip = int(self._jinja_expand_int(self.config['strip'], 'strip'))
self._init_tempdir()
directory = os.path.dirname(output)
if directory and not os.path.exists(directory):
self.logger.debug(
'Creating directory under temporary directory: %s' %
(directory))
os.makedirs(directory, exist_ok=True)
files_to_consider = glob.glob(glob_spec, recursive=True)
if len(files_to_consider) == 0:
self.logger.error('No files found to compress: %s' % (glob_spec),
extra={'glob': glob_spec})
raise ProcessorException('No files found to compress')
if 'exclude' in self.config:
exclude_files = self._jinja_expand_list(self.config['exclude'],
'exclude')
files = []
for fname in files_to_consider:
file_ok = True
for exclude in exclude_files:
if fnmatch.fnmatch(fname, exclude):
file_ok = False
break
if file_ok:
files.append(fname)
if len(files) == 0:
self.logger.error(
'No files found to compress after exclusion: %s' %
(glob_spec),
extra={'glob': glob_spec})
raise ProcessorException(
'No files found to compress after exclusion')
else:
files = files_to_consider
if format_spec == 'zip':
zip_compression = zipfile.ZIP_DEFLATED
if 'compression' in self.config:
if self.config['compression'] == 'stored':
zip_compression = zipfile.ZIP_STORED
elif self.config['compression'] == 'bzip2':
zip_compression = zipfile.ZIP_BZIP2
elif self.config['compression'] == 'lzma':
zip_compression = zipfile.ZIP_LZMA
self.logger.info('Compressing %d files to ZIP: %s' %
(len(files), output))
zip_file = zipfile.ZipFile(output, 'x', zip_compression)
for fname in files:
target_name = fname
if strip:
file_parts = fname.split(os.path.sep)
target_name = os.path.sep.join(file_parts[strip:])
zip_file.write(fname, target_name)
zip_file.close()
elif format_spec == 'tar' or format_spec == 'tar.gz' or format_spec == 'tar.bz2' or format_spec == 'tar.xz':
tar_mode = 'x:'
if format_spec == 'tar.gz':
tar_mode = 'x:gz'
elif format_spec == 'tar.bz2':
tar_mode = 'x:bz2'
elif format_spec == 'tar.xz':
tar_mode = 'x:xz'
self.logger.info('Compressing %d files to %s: %s' %
(len(files), format_spec.upper(), output))
tar_file = tarfile.open(output, tar_mode)
for fname in files:
target_name = fname
if strip:
file_parts = fname.split(os.path.sep)
target_name = os.path.sep.join(file_parts[strip:])
tar_file.add(fname, target_name)
tar_file.close()
file_stats = os.stat(output)
return {
output_var: {
'path': output,
'filename': os.path.basename(output),
'format': format_spec,
'size': file_stats.st_size,
}
}