processors/compress.py (101 lines of code) (raw):

# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from .base import Processor, NotConfiguredException, ProcessorException import os import glob import fnmatch import zipfile import tarfile class CompressProcessor(Processor): """ Compress files to zip/tar/tgz formats. Args: glob (str): Files to include, recursive. See Python glob(). exclude (list, optional): List of files to exclude. See python fnmatch(). output (str): Target file. format (str): One of: zip, tar, tar.gz, tar.bz2. compression (str, optional): Compression for ZIP: stored, bzip2, lzma. (default deflate) strip (int, optional): Remove N path parts in the archive. """ def get_default_config_key(): return 'compress' def process(self, output_var='compress'): if 'glob' not in self.config: raise NotConfiguredException('No files selected to be compressed.') if 'format' not in self.config: raise NotConfiguredException('Target archive format not specified.') if 'output' not in self.config: raise NotConfiguredException('No output file specified..') glob_spec = self._jinja_expand_string(self.config['glob'], 'glob') format_spec = self._jinja_expand_string(self.config['format'], 'format') output = self._jinja_expand_string(self.config['output'], 'output') strip = None if 'strip' in self.config: strip = int(self._jinja_expand_int(self.config['strip'], 'strip')) self._init_tempdir() directory = os.path.dirname(output) if directory and not os.path.exists(directory): self.logger.debug( 'Creating directory under temporary directory: %s' % (directory)) os.makedirs(directory, exist_ok=True) files_to_consider = glob.glob(glob_spec, recursive=True) if len(files_to_consider) == 0: self.logger.error('No files found to compress: %s' % (glob_spec), extra={'glob': glob_spec}) raise ProcessorException('No files found to compress') if 'exclude' in self.config: exclude_files = self._jinja_expand_list(self.config['exclude'], 'exclude') files = [] for fname in files_to_consider: file_ok = True for exclude in exclude_files: if fnmatch.fnmatch(fname, exclude): file_ok = False break if file_ok: files.append(fname) if len(files) == 0: self.logger.error( 'No files found to compress after exclusion: %s' % (glob_spec), extra={'glob': glob_spec}) raise ProcessorException( 'No files found to compress after exclusion') else: files = files_to_consider if format_spec == 'zip': zip_compression = zipfile.ZIP_DEFLATED if 'compression' in self.config: if self.config['compression'] == 'stored': zip_compression = zipfile.ZIP_STORED elif self.config['compression'] == 'bzip2': zip_compression = zipfile.ZIP_BZIP2 elif self.config['compression'] == 'lzma': zip_compression = zipfile.ZIP_LZMA self.logger.info('Compressing %d files to ZIP: %s' % (len(files), output)) zip_file = zipfile.ZipFile(output, 'x', zip_compression) for fname in files: target_name = fname if strip: file_parts = fname.split(os.path.sep) target_name = os.path.sep.join(file_parts[strip:]) zip_file.write(fname, target_name) zip_file.close() elif format_spec == 'tar' or format_spec == 'tar.gz' or format_spec == 'tar.bz2' or format_spec == 'tar.xz': tar_mode = 'x:' if format_spec == 'tar.gz': tar_mode = 'x:gz' elif format_spec == 'tar.bz2': tar_mode = 'x:bz2' elif format_spec == 'tar.xz': tar_mode = 'x:xz' self.logger.info('Compressing %d files to %s: %s' % (len(files), format_spec.upper(), output)) tar_file = tarfile.open(output, tar_mode) for fname in files: target_name = fname if strip: file_parts = fname.split(os.path.sep) target_name = os.path.sep.join(file_parts[strip:]) tar_file.add(fname, target_name) tar_file.close() file_stats = os.stat(output) return { output_var: { 'path': output, 'filename': os.path.basename(output), 'format': format_spec, 'size': file_stats.st_size, } }