openvid/zipstream.py (154 lines of code) (raw):

from dataclasses import dataclass import pathlib import requests import struct import tqdm from typing import Optional import zlib @dataclass class LocalFileHeader: signature: bytes version: int flag: int method: int modification_time: int modification_date: int crc32: int compressed_size: int uncompressed_size: int file_name_length: int extra_field_length: int @dataclass class CentralDirectoryFileHeader: signature: bytes version: int minimum_version: int flag: int method: int modification_time: int modification_date: int crc32: int compressed_size: int uncompressed_size: int file_name_length: int extra_field_length: int file_comment_length: int disk_number: int internal_file_attributes: int external_file_attributes: int relative_offset: int class ZipStreamFile: def __init__( self, url: str, filename: str, file_offset: int, file_size: int, ): self.url = url self.filename = filename self.file_offset = file_offset self.file_size = file_size def download( self, filename: Optional[str] = None, base_path: Optional[str] = None, ): struct_format = "<4sHHHHHIIIHH" struct_size = struct.calcsize(struct_format) headers = {"Range": f"bytes={self.file_offset}-{self.file_offset+struct_size-1}"} local_file_header = requests.get(self.url, headers=headers, stream=True).content local_file_header = LocalFileHeader(*struct.unpack(struct_format, local_file_header)) data_offset = struct_size + local_file_header.file_name_length + local_file_header.extra_field_length headers = {"Range": f"bytes={self.file_offset+data_offset}-{self.file_offset+data_offset+self.file_size-1}"} data = requests.get(self.url, headers=headers, stream=True).content if local_file_header.method == 8: data = zlib.decompress(data, -15) elif local_file_header.method != 0: raise ValueError("Unsupported compression method.") filename = filename or self.filename if base_path is not None and filename is not None: with open(f"{base_path}/{filename}", "wb") as f: f.write(data) return data def __repr__(self): return f"ZipStreamFile(\n\turl={self.url},\n\tfilename={self.filename},\n\tfile_offset={self.file_offset},\n\tfile_size={self.file_size}\n)" class ZipStream: tail_size: int = 65536 @classmethod def size(self, url: str): headers = {"Range": f"bytes=-1"} return int(requests.get(url, headers=headers).headers["Content-Range"].split("/")[-1]) @classmethod def get_central_directory(self, url: str, offset: Optional[int] = None): headers = {"Range": f"bytes=-{self.tail_size}"} tail_data = requests.get(url, headers=headers, stream=True).content zip64_eocd = b"\x50\x4b\x06\x06" eocd_offset = tail_data.rfind(zip64_eocd) eocd = tail_data[eocd_offset:] cd_offset = int.from_bytes(eocd[48 : 48 + 8], byteorder="little") if offset is not None: cd_offset - offset headers = {"Range": f"bytes={cd_offset}-"} central_directory = requests.get(url, headers=headers, stream=True).content return central_directory @classmethod def get_files(self, url: str, central_directory: bytes, file_to_get: str = None): files = [] offset = 0 while offset <= len(central_directory): file, offset = ZipStream.get_file(url=url, central_directory=central_directory, offset=offset) if file is None: continue if file_to_get is None: files.append(file) elif file_to_get is not None and file_to_get in file.filename: return file return files @classmethod def get_file(self, url: str, central_directory: bytes, offset: int): struct_format = "<4sHHHHHHIIIHHHHHII" struct_size = struct.calcsize(struct_format) buffer = central_directory[offset : offset + struct_size] if len(buffer) < struct_size: return None, offset + struct_size central_directory_file_header = CentralDirectoryFileHeader(*struct.unpack(struct_format, buffer)) filename = central_directory[ offset + struct_size : offset + struct_size + central_directory_file_header.file_name_length ].decode("utf-8") next_offset = ( offset + struct_size + central_directory_file_header.file_name_length + central_directory_file_header.extra_field_length + central_directory_file_header.file_comment_length ) if not filename: return None, next_offset is_zip64 = (central_directory_file_header.compressed_size == 2**32 - 1) or ( central_directory_file_header.relative_offset == 2**32 - 1 ) if is_zip64: extra = central_directory[ offset + struct_size + central_directory_file_header.file_name_length : next_offset ] central_directory_file_header.relative_offset = int.from_bytes(extra[-8:], byteorder="little") return ( ZipStreamFile( url=url, filename=filename, file_offset=central_directory_file_header.relative_offset, file_size=central_directory_file_header.compressed_size, ), next_offset, ) def __init__( self, url: str, central_directory: Optional[bytes] = None, offset: Optional[int] = None, ): self.url = url central_directory = central_directory or ZipStream.get_central_directory(url=self.url, offset=offset) self.central_directory = central_directory self.files = ZipStream.get_files(url=self.url, central_directory=self.central_directory)