diff --git a/libs/py7zr/__init__.py b/libs/py7zr/__init__.py new file mode 100644 index 000000000..b01a37e57 --- /dev/null +++ b/libs/py7zr/__init__.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python +# +# Pure python p7zr implementation +# Copyright (C) 2019 Hiroshi Miura +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +from py7zr.exceptions import Bad7zFile, DecompressionError, UnsupportedCompressionMethodError +from py7zr.py7zr import ArchiveInfo, FileInfo, SevenZipFile, is_7zfile, pack_7zarchive, unpack_7zarchive + +__copyright__ = 'Copyright (C) 2019 Hiroshi Miura' +__version__ = "0.7.0" + +__all__ = ['__version__', 'ArchiveInfo', 'FileInfo', 'SevenZipFile', 'is_7zfile', + 'UnsupportedCompressionMethodError', 'Bad7zFile', 'DecompressionError', + 'pack_7zarchive', 'unpack_7zarchive'] + diff --git a/libs/py7zr/archiveinfo.py b/libs/py7zr/archiveinfo.py new file mode 100644 index 000000000..cbd42381d --- /dev/null +++ b/libs/py7zr/archiveinfo.py @@ -0,0 +1,1103 @@ +#!/usr/bin/python -u +# +# p7zr library +# +# Copyright (c) 2019,2020 Hiroshi Miura +# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de +# 7-Zip Copyright (C) 1999-2010 Igor Pavlov +# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +import functools +import io +import os +import struct +from binascii import unhexlify +from functools import reduce +from io import BytesIO +from operator import and_, or_ +from struct import pack, unpack +from typing import Any, BinaryIO, Dict, List, Optional, Tuple + +from py7zr.compression import SevenZipCompressor, SevenZipDecompressor +from py7zr.exceptions import Bad7zFile, UnsupportedCompressionMethodError +from py7zr.helpers import ArchiveTimestamp, calculate_crc32 +from py7zr.properties import MAGIC_7Z, CompressionMethod, Property + +MAX_LENGTH = 65536 +P7ZIP_MAJOR_VERSION = b'\x00' +P7ZIP_MINOR_VERSION = b'\x04' + + +def read_crcs(file: BinaryIO, count: int) -> List[int]: + data = file.read(4 * count) + return [unpack(' Tuple[bytes, ...]: + return unpack(b'B' * length, file.read(length)) + + +def read_byte(file: BinaryIO) -> int: + return ord(file.read(1)) + + +def write_bytes(file: BinaryIO, data: bytes): + return file.write(data) + + +def write_byte(file: BinaryIO, data): + assert len(data) == 1 + return write_bytes(file, data) + + +def read_real_uint64(file: BinaryIO) -> Tuple[int, bytes]: + """read 8 bytes, return unpacked value as a little endian unsigned long long, and raw data.""" + res = file.read(8) + a = unpack(' Tuple[int, bytes]: + """read 4 bytes, return unpacked value as a little endian unsigned long, and raw data.""" + res = file.read(4) + a = unpack(' int: + """read UINT64, definition show in write_uint64()""" + b = ord(file.read(1)) + if b == 255: + return read_real_uint64(file)[0] + blen = [(0b01111111, 0), (0b10111111, 1), (0b11011111, 2), (0b11101111, 3), + (0b11110111, 4), (0b11111011, 5), (0b11111101, 6), (0b11111110, 7)] + mask = 0x80 + vlen = 8 + for v, l in blen: + if b <= v: + vlen = l + break + mask >>= 1 + if vlen == 0: + return b & (mask - 1) + val = file.read(vlen) + value = int.from_bytes(val, byteorder='little') + highpart = b & (mask - 1) + return value + (highpart << (vlen * 8)) + + +def write_real_uint64(file: BinaryIO, value: int): + """write 8 bytes, as an unsigned long long.""" + file.write(pack(' 0x01ffffffffffffff: + file.write(b'\xff') + file.write(value.to_bytes(8, 'little')) + return + byte_length = (value.bit_length() + 7) // 8 + ba = bytearray(value.to_bytes(byte_length, 'little')) + high_byte = int(ba[-1]) + if high_byte < 2 << (8 - byte_length - 1): + for x in range(byte_length - 1): + high_byte |= 0x80 >> x + file.write(pack('B', high_byte)) + file.write(ba[:byte_length - 1]) + else: + mask = 0x80 + for x in range(byte_length): + mask |= 0x80 >> x + file.write(pack('B', mask)) + file.write(ba) + + +def read_boolean(file: BinaryIO, count: int, checkall: bool = False) -> List[bool]: + if checkall: + all_defined = file.read(1) + if all_defined != unhexlify('00'): + return [True] * count + result = [] + b = 0 + mask = 0 + for i in range(count): + if mask == 0: + b = ord(file.read(1)) + mask = 0x80 + result.append(b & mask != 0) + mask >>= 1 + return result + + +def write_boolean(file: BinaryIO, booleans: List[bool], all_defined: bool = False): + if all_defined and reduce(and_, booleans, True): + file.write(b'\x01') + return + elif all_defined: + file.write(b'\x00') + o = bytearray(-(-len(booleans) // 8)) + for i, b in enumerate(booleans): + if b: + o[i // 8] |= 1 << (7 - i % 8) + file.write(o) + + +def read_utf16(file: BinaryIO) -> str: + """read a utf-16 string from file""" + val = '' + for _ in range(MAX_LENGTH): + ch = file.read(2) + if ch == unhexlify('0000'): + break + val += ch.decode('utf-16LE') + return val + + +def write_utf16(file: BinaryIO, val: str): + """write a utf-16 string to file""" + for c in val: + file.write(c.encode('utf-16LE')) + file.write(b'\x00\x00') + + +def bits_to_bytes(bit_length: int) -> int: + return - (-bit_length // 8) + + +class ArchiveProperties: + + __slots__ = ['property_data'] + + def __init__(self): + self.property_data = [] + + @classmethod + def retrieve(cls, file): + return cls()._read(file) + + def _read(self, file): + pid = file.read(1) + if pid == Property.ARCHIVE_PROPERTIES: + while True: + ptype = file.read(1) + if ptype == Property.END: + break + size = read_uint64(file) + props = read_bytes(file, size) + self.property_data.append(props) + return self + + def write(self, file): + if len(self.property_data) > 0: + write_byte(file, Property.ARCHIVE_PROPERTIES) + for data in self.property_data: + write_uint64(file, len(data)) + write_bytes(file, data) + write_byte(file, Property.END) + + +class PackInfo: + """ information about packed streams """ + + __slots__ = ['packpos', 'numstreams', 'packsizes', 'packpositions', 'crcs'] + + def __init__(self) -> None: + self.packpos = 0 # type: int + self.numstreams = 0 # type: int + self.packsizes = [] # type: List[int] + self.crcs = None # type: Optional[List[int]] + + @classmethod + def retrieve(cls, file: BinaryIO): + return cls()._read(file) + + def _read(self, file: BinaryIO): + self.packpos = read_uint64(file) + self.numstreams = read_uint64(file) + pid = file.read(1) + if pid == Property.SIZE: + self.packsizes = [read_uint64(file) for _ in range(self.numstreams)] + pid = file.read(1) + if pid == Property.CRC: + self.crcs = [read_uint64(file) for _ in range(self.numstreams)] + pid = file.read(1) + if pid != Property.END: + raise Bad7zFile('end id expected but %s found' % repr(pid)) + self.packpositions = [sum(self.packsizes[:i]) for i in range(self.numstreams + 1)] # type: List[int] + return self + + def write(self, file: BinaryIO): + assert self.packpos is not None + numstreams = len(self.packsizes) + assert self.crcs is None or len(self.crcs) == numstreams + write_byte(file, Property.PACK_INFO) + write_uint64(file, self.packpos) + write_uint64(file, numstreams) + write_byte(file, Property.SIZE) + for size in self.packsizes: + write_uint64(file, size) + if self.crcs is not None: + write_bytes(file, Property.CRC) + for crc in self.crcs: + write_uint64(file, crc) + write_byte(file, Property.END) + + +class Folder: + """ a "Folder" represents a stream of compressed data. + coders: list of coder + num_coders: length of coders + coder: hash list + keys of coders: method, numinstreams, numoutstreams, properties + unpacksizes: uncompressed sizes of outstreams + """ + + __slots__ = ['unpacksizes', 'solid', 'coders', 'digestdefined', 'totalin', 'totalout', + 'bindpairs', 'packed_indices', 'crc', 'decompressor', 'compressor', 'files'] + + def __init__(self) -> None: + self.unpacksizes = None # type: Optional[List[int]] + self.coders = [] # type: List[Dict[str, Any]] + self.bindpairs = [] # type: List[Any] + self.packed_indices = [] # type: List[int] + # calculated values + self.totalin = 0 # type: int + self.totalout = 0 # type: int + # internal values + self.solid = False # type: bool + self.digestdefined = False # type: bool + self.crc = None # type: Optional[int] + # compress/decompress objects + self.decompressor = None # type: Optional[SevenZipDecompressor] + self.compressor = None # type: Optional[SevenZipCompressor] + self.files = None + + @classmethod + def retrieve(cls, file: BinaryIO): + obj = cls() + obj._read(file) + return obj + + def _read(self, file: BinaryIO) -> None: + num_coders = read_uint64(file) + for _ in range(num_coders): + b = read_byte(file) + methodsize = b & 0xf + iscomplex = b & 0x10 == 0x10 + hasattributes = b & 0x20 == 0x20 + c = {'method': file.read(methodsize)} # type: Dict[str, Any] + if iscomplex: + c['numinstreams'] = read_uint64(file) + c['numoutstreams'] = read_uint64(file) + else: + c['numinstreams'] = 1 + c['numoutstreams'] = 1 + self.totalin += c['numinstreams'] + self.totalout += c['numoutstreams'] + if hasattributes: + proplen = read_uint64(file) + c['properties'] = file.read(proplen) + self.coders.append(c) + num_bindpairs = self.totalout - 1 + for i in range(num_bindpairs): + self.bindpairs.append((read_uint64(file), read_uint64(file),)) + num_packedstreams = self.totalin - num_bindpairs + if num_packedstreams == 1: + for i in range(self.totalin): + if self._find_in_bin_pair(i) < 0: # there is no in_bin_pair + self.packed_indices.append(i) + elif num_packedstreams > 1: + for i in range(num_packedstreams): + self.packed_indices.append(read_uint64(file)) + + def write(self, file: BinaryIO): + num_coders = len(self.coders) + assert num_coders > 0 + write_uint64(file, num_coders) + for i, c in enumerate(self.coders): + id = c['method'] # type: bytes + id_size = len(id) & 0x0f + iscomplex = 0x10 if not self.is_simple(c) else 0x00 + hasattributes = 0x20 if c['properties'] is not None else 0x00 + flag = struct.pack('B', id_size | iscomplex | hasattributes) + write_byte(file, flag) + write_bytes(file, id[:id_size]) + if not self.is_simple(c): + write_uint64(file, c['numinstreams']) + assert c['numoutstreams'] == 1 + write_uint64(file, c['numoutstreams']) + if c['properties'] is not None: + write_uint64(file, len(c['properties'])) + write_bytes(file, c['properties']) + num_bindpairs = self.totalout - 1 + assert len(self.bindpairs) == num_bindpairs + num_packedstreams = self.totalin - num_bindpairs + for bp in self.bindpairs: + write_uint64(file, bp[0]) + write_uint64(file, bp[1]) + if num_packedstreams > 1: + for pi in self.packed_indices: + write_uint64(file, pi) + + def is_simple(self, coder): + return coder['numinstreams'] == 1 and coder['numoutstreams'] == 1 + + def get_decompressor(self, size: int, reset: bool = False) -> SevenZipDecompressor: + if self.decompressor is not None and not reset: + return self.decompressor + else: + try: + self.decompressor = SevenZipDecompressor(self.coders, size, self.crc) + except Exception as e: + raise e + if self.decompressor is not None: + return self.decompressor + else: + raise + + def get_compressor(self) -> SevenZipCompressor: + if self.compressor is not None: + return self.compressor + else: + try: + # FIXME: set filters + self.compressor = SevenZipCompressor() + self.coders = self.compressor.coders + return self.compressor + except Exception as e: + raise e + + def get_unpack_size(self) -> int: + if self.unpacksizes is None: + return 0 + for i in range(len(self.unpacksizes) - 1, -1, -1): + if self._find_out_bin_pair(i): + return self.unpacksizes[i] + raise TypeError('not found') + + def _find_in_bin_pair(self, index: int) -> int: + for idx, (a, b) in enumerate(self.bindpairs): + if a == index: + return idx + return -1 + + def _find_out_bin_pair(self, index: int) -> int: + for idx, (a, b) in enumerate(self.bindpairs): + if b == index: + return idx + return -1 + + def is_encrypted(self) -> bool: + return CompressionMethod.CRYPT_AES256_SHA256 in [x['method'] for x in self.coders] + + +class UnpackInfo: + """ combines multiple folders """ + + __slots__ = ['numfolders', 'folders', 'datastreamidx'] + + @classmethod + def retrieve(cls, file: BinaryIO): + obj = cls() + obj._read(file) + return obj + + def __init__(self): + self.numfolders = None + self.folders = [] + self.datastreamidx = None + + def _read(self, file: BinaryIO): + pid = file.read(1) + if pid != Property.FOLDER: + raise Bad7zFile('folder id expected but %s found' % repr(pid)) + self.numfolders = read_uint64(file) + self.folders = [] + external = read_byte(file) + if external == 0x00: + self.folders = [Folder.retrieve(file) for _ in range(self.numfolders)] + else: + datastreamidx = read_uint64(file) + current_pos = file.tell() + file.seek(datastreamidx, 0) + self.folders = [Folder.retrieve(file) for _ in range(self.numfolders)] + file.seek(current_pos, 0) + self._retrieve_coders_info(file) + + def _retrieve_coders_info(self, file: BinaryIO): + pid = file.read(1) + if pid != Property.CODERS_UNPACK_SIZE: + raise Bad7zFile('coders unpack size id expected but %s found' % repr(pid)) + for folder in self.folders: + folder.unpacksizes = [read_uint64(file) for _ in range(folder.totalout)] + pid = file.read(1) + if pid == Property.CRC: + defined = read_boolean(file, self.numfolders, checkall=True) + crcs = read_crcs(file, self.numfolders) + for idx, folder in enumerate(self.folders): + folder.digestdefined = defined[idx] + folder.crc = crcs[idx] + pid = file.read(1) + if pid != Property.END: + raise Bad7zFile('end id expected but %s found at %d' % (repr(pid), file.tell())) + + def write(self, file: BinaryIO): + assert self.numfolders is not None + assert self.folders is not None + assert self.numfolders == len(self.folders) + file.write(Property.UNPACK_INFO) + file.write(Property.FOLDER) + write_uint64(file, self.numfolders) + write_byte(file, b'\x00') + for folder in self.folders: + folder.write(file) + # If support external entity, we may write + # self.datastreamidx here. + # folder data will be written in another place. + # write_byte(file, b'\x01') + # assert self.datastreamidx is not None + # write_uint64(file, self.datastreamidx) + write_byte(file, Property.CODERS_UNPACK_SIZE) + for folder in self.folders: + for i in range(folder.totalout): + write_uint64(file, folder.unpacksizes[i]) + write_byte(file, Property.END) + + +class SubstreamsInfo: + """ defines the substreams of a folder """ + + __slots__ = ['digests', 'digestsdefined', 'unpacksizes', 'num_unpackstreams_folders'] + + def __init__(self): + self.digests = [] # type: List[int] + self.digestsdefined = [] # type: List[bool] + self.unpacksizes = None # type: Optional[List[int]] + self.num_unpackstreams_folders = [] # type: List[int] + + @classmethod + def retrieve(cls, file: BinaryIO, numfolders: int, folders: List[Folder]): + obj = cls() + obj._read(file, numfolders, folders) + return obj + + def _read(self, file: BinaryIO, numfolders: int, folders: List[Folder]): + pid = file.read(1) + if pid == Property.NUM_UNPACK_STREAM: + self.num_unpackstreams_folders = [read_uint64(file) for _ in range(numfolders)] + pid = file.read(1) + else: + self.num_unpackstreams_folders = [1] * numfolders + if pid == Property.SIZE: + self.unpacksizes = [] + for i in range(len(self.num_unpackstreams_folders)): + totalsize = 0 # type: int + for j in range(1, self.num_unpackstreams_folders[i]): + size = read_uint64(file) + self.unpacksizes.append(size) + totalsize += size + self.unpacksizes.append(folders[i].get_unpack_size() - totalsize) + pid = file.read(1) + num_digests = 0 + num_digests_total = 0 + for i in range(numfolders): + numsubstreams = self.num_unpackstreams_folders[i] + if numsubstreams != 1 or not folders[i].digestdefined: + num_digests += numsubstreams + num_digests_total += numsubstreams + if pid == Property.CRC: + defined = read_boolean(file, num_digests, checkall=True) + crcs = read_crcs(file, num_digests) + didx = 0 + for i in range(numfolders): + folder = folders[i] + numsubstreams = self.num_unpackstreams_folders[i] + if numsubstreams == 1 and folder.digestdefined and folder.crc is not None: + self.digestsdefined.append(True) + self.digests.append(folder.crc) + else: + for j in range(numsubstreams): + self.digestsdefined.append(defined[didx]) + self.digests.append(crcs[didx]) + didx += 1 + pid = file.read(1) + if pid != Property.END: + raise Bad7zFile('end id expected but %r found' % pid) + if not self.digestsdefined: + self.digestsdefined = [False] * num_digests_total + self.digests = [0] * num_digests_total + + def write(self, file: BinaryIO, numfolders: int): + assert self.num_unpackstreams_folders is not None + if len(self.num_unpackstreams_folders) == 0: + # nothing to write + return + if self.unpacksizes is None: + raise ValueError + write_byte(file, Property.SUBSTREAMS_INFO) + if not functools.reduce(lambda x, y: x and (y == 1), self.num_unpackstreams_folders, True): + write_byte(file, Property.NUM_UNPACK_STREAM) + for n in self.num_unpackstreams_folders: + write_uint64(file, n) + write_byte(file, Property.SIZE) + idx = 0 + for i in range(numfolders): + for j in range(1, self.num_unpackstreams_folders[i]): + size = self.unpacksizes[idx] + write_uint64(file, size) + idx += 1 + idx += 1 + if functools.reduce(lambda x, y: x or y, self.digestsdefined, False): + write_byte(file, Property.CRC) + write_boolean(file, self.digestsdefined, all_defined=True) + write_crcs(file, self.digests) + write_byte(file, Property.END) + + +class StreamsInfo: + """ information about compressed streams """ + + __slots__ = ['packinfo', 'unpackinfo', 'substreamsinfo'] + + def __init__(self): + self.packinfo = None # type: PackInfo + self.unpackinfo = None # type: UnpackInfo + self.substreamsinfo = None # type: Optional[SubstreamsInfo] + + @classmethod + def retrieve(cls, file: BinaryIO): + obj = cls() + obj.read(file) + return obj + + def read(self, file: BinaryIO) -> None: + pid = file.read(1) + if pid == Property.PACK_INFO: + self.packinfo = PackInfo.retrieve(file) + pid = file.read(1) + if pid == Property.UNPACK_INFO: + self.unpackinfo = UnpackInfo.retrieve(file) + pid = file.read(1) + if pid == Property.SUBSTREAMS_INFO: + self.substreamsinfo = SubstreamsInfo.retrieve(file, self.unpackinfo.numfolders, self.unpackinfo.folders) + pid = file.read(1) + if pid != Property.END: + raise Bad7zFile('end id expected but %s found' % repr(pid)) + + def write(self, file: BinaryIO): + write_byte(file, Property.MAIN_STREAMS_INFO) + self._write(file) + + def _write(self, file: BinaryIO): + if self.packinfo is not None: + self.packinfo.write(file) + if self.unpackinfo is not None: + self.unpackinfo.write(file) + if self.substreamsinfo is not None: + self.substreamsinfo.write(file, self.unpackinfo.numfolders) + write_byte(file, Property.END) + + +class HeaderStreamsInfo(StreamsInfo): + + def __init__(self): + super().__init__() + self.packinfo = PackInfo() + self.unpackinfo = UnpackInfo() + folder = Folder() + folder.compressor = SevenZipCompressor() + folder.coders = folder.compressor.coders + folder.solid = False + folder.digestdefined = False + folder.bindpairs = [] + folder.totalin = 1 + folder.totalout = 1 + folder.digestdefined = [True] + self.unpackinfo.numfolders = 1 + self.unpackinfo.folders = [folder] + + def write(self, file: BinaryIO): + self._write(file) + + +class FilesInfo: + """ holds file properties """ + + __slots__ = ['files', 'emptyfiles', 'antifiles'] + + def __init__(self): + self.files = [] # type: List[Dict[str, Any]] + self.emptyfiles = [] # type: List[bool] + self.antifiles = None + + @classmethod + def retrieve(cls, file: BinaryIO): + obj = cls() + obj._read(file) + return obj + + def _read(self, fp: BinaryIO): + numfiles = read_uint64(fp) + self.files = [{'emptystream': False} for _ in range(numfiles)] + numemptystreams = 0 + while True: + prop = fp.read(1) + if prop == Property.END: + break + size = read_uint64(fp) + if prop == Property.DUMMY: + # Added by newer versions of 7z to adjust padding. + fp.seek(size, os.SEEK_CUR) + continue + buffer = io.BytesIO(fp.read(size)) + if prop == Property.EMPTY_STREAM: + isempty = read_boolean(buffer, numfiles, checkall=False) + list(map(lambda x, y: x.update({'emptystream': y}), self.files, isempty)) # type: ignore + numemptystreams += isempty.count(True) + elif prop == Property.EMPTY_FILE: + self.emptyfiles = read_boolean(buffer, numemptystreams, checkall=False) + elif prop == Property.ANTI: + self.antifiles = read_boolean(buffer, numemptystreams, checkall=False) + elif prop == Property.NAME: + external = buffer.read(1) + if external == b'\x00': + self._read_name(buffer) + else: + dataindex = read_uint64(buffer) + current_pos = fp.tell() + fp.seek(dataindex, 0) + self._read_name(fp) + fp.seek(current_pos, 0) + elif prop == Property.CREATION_TIME: + self._read_times(buffer, 'creationtime') + elif prop == Property.LAST_ACCESS_TIME: + self._read_times(buffer, 'lastaccesstime') + elif prop == Property.LAST_WRITE_TIME: + self._read_times(buffer, 'lastwritetime') + elif prop == Property.ATTRIBUTES: + defined = read_boolean(buffer, numfiles, checkall=True) + external = buffer.read(1) + if external == b'\x00': + self._read_attributes(buffer, defined) + else: + dataindex = read_uint64(buffer) + # try to read external data + current_pos = fp.tell() + fp.seek(dataindex, 0) + self._read_attributes(fp, defined) + fp.seek(current_pos, 0) + elif prop == Property.START_POS: + self._read_start_pos(buffer) + else: + raise Bad7zFile('invalid type %r' % prop) + + def _read_name(self, buffer: BinaryIO) -> None: + for f in self.files: + f['filename'] = read_utf16(buffer).replace('\\', '/') + + def _read_attributes(self, buffer: BinaryIO, defined: List[bool]) -> None: + for idx, f in enumerate(self.files): + f['attributes'] = read_uint32(buffer)[0] if defined[idx] else None + + def _read_times(self, fp: BinaryIO, name: str) -> None: + defined = read_boolean(fp, len(self.files), checkall=True) + # NOTE: the "external" flag is currently ignored, should be 0x00 + external = fp.read(1) + assert external == b'\x00' + for i, f in enumerate(self.files): + f[name] = ArchiveTimestamp(read_real_uint64(fp)[0]) if defined[i] else None + + def _read_start_pos(self, fp: BinaryIO) -> None: + defined = read_boolean(fp, len(self.files), checkall=True) + # NOTE: the "external" flag is currently ignored, should be 0x00 + external = fp.read(1) + assert external == 0x00 + for i, f in enumerate(self.files): + f['startpos'] = read_real_uint64(fp)[0] if defined[i] else None + + def _write_times(self, fp: BinaryIO, propid, name: str) -> None: + write_byte(fp, propid) + defined = [] # type: List[bool] + num_defined = 0 # type: int + for f in self.files: + if name in f.keys(): + if f[name] is not None: + defined.append(True) + num_defined += 1 + size = num_defined * 8 + 2 + if not reduce(and_, defined, True): + size += bits_to_bytes(num_defined) + write_uint64(fp, size) + write_boolean(fp, defined, all_defined=True) + write_byte(fp, b'\x00') + for i, file in enumerate(self.files): + if defined[i]: + write_real_uint64(fp, ArchiveTimestamp.from_datetime(file[name])) + else: + pass + + def _write_prop_bool_vector(self, fp: BinaryIO, propid, vector) -> None: + write_byte(fp, propid) + write_boolean(fp, vector, all_defined=True) + + @staticmethod + def _are_there(vector) -> bool: + if vector is not None: + if functools.reduce(or_, vector, False): + return True + return False + + def _write_names(self, file: BinaryIO): + name_defined = 0 + names = [] + name_size = 0 + for f in self.files: + if f.get('filename', None) is not None: + name_defined += 1 + names.append(f['filename']) + name_size += len(f['filename'].encode('utf-16LE')) + 2 # len(str + NULL_WORD) + if name_defined > 0: + write_byte(file, Property.NAME) + write_uint64(file, name_size + 1) + write_byte(file, b'\x00') + for n in names: + write_utf16(file, n) + + def _write_attributes(self, file): + defined = [] # type: List[bool] + num_defined = 0 + for f in self.files: + if 'attributes' in f.keys() and f['attributes'] is not None: + defined.append(True) + num_defined += 1 + else: + defined.append(False) + size = num_defined * 4 + 2 + if num_defined != len(defined): + size += bits_to_bytes(num_defined) + write_byte(file, Property.ATTRIBUTES) + write_uint64(file, size) + write_boolean(file, defined, all_defined=True) + write_byte(file, b'\x00') + for i, f in enumerate(self.files): + if defined[i]: + write_uint32(file, f['attributes']) + + def write(self, file: BinaryIO): + assert self.files is not None + write_byte(file, Property.FILES_INFO) + numfiles = len(self.files) + write_uint64(file, numfiles) + emptystreams = [] # List[bool] + for f in self.files: + emptystreams.append(f['emptystream']) + if self._are_there(emptystreams): + write_byte(file, Property.EMPTY_STREAM) + write_uint64(file, bits_to_bytes(numfiles)) + write_boolean(file, emptystreams, all_defined=False) + else: + if self._are_there(self.emptyfiles): + self._write_prop_bool_vector(file, Property.EMPTY_FILE, self.emptyfiles) + if self._are_there(self.antifiles): + self._write_prop_bool_vector(file, Property.ANTI, self.antifiles) + # Name + self._write_names(file) + # timestamps + self._write_times(file, Property.CREATION_TIME, 'creationtime') + self._write_times(file, Property.LAST_ACCESS_TIME, 'lastaccesstime') + self._write_times(file, Property.LAST_WRITE_TIME, 'lastwritetime') + # start_pos + # FIXME: TBD + # attribute + self._write_attributes(file) + write_byte(file, Property.END) + + +class Header: + """ the archive header """ + + __slot__ = ['solid', 'properties', 'additional_streams', 'main_streams', 'files_info', + 'size', '_start_pos'] + + def __init__(self) -> None: + self.solid = False + self.properties = None + self.additional_streams = None + self.main_streams = None + self.files_info = None + self.size = 0 # fixme. Not implemented yet + self._start_pos = 0 + + @classmethod + def retrieve(cls, fp: BinaryIO, buffer: BytesIO, start_pos: int): + obj = cls() + obj._read(fp, buffer, start_pos) + return obj + + def _read(self, fp: BinaryIO, buffer: BytesIO, start_pos: int) -> None: + self._start_pos = start_pos + fp.seek(self._start_pos) + self._decode_header(fp, buffer) + + def _decode_header(self, fp: BinaryIO, buffer: BytesIO) -> None: + """ + Decode header data or encoded header data from buffer. + When buffer consist of encoded buffer, it get stream data + from it and call itself recursively + """ + pid = buffer.read(1) + if not pid: + # empty archive + return + elif pid == Property.HEADER: + self._extract_header_info(buffer) + return + elif pid != Property.ENCODED_HEADER: + raise TypeError('Unknown field: %r' % id) + # get from encoded header + streams = HeaderStreamsInfo.retrieve(buffer) + self._decode_header(fp, self._get_headerdata_from_streams(fp, streams)) + + def _get_headerdata_from_streams(self, fp: BinaryIO, streams: StreamsInfo) -> BytesIO: + """get header data from given streams.unpackinfo and packinfo. + folder data are stored in raw data positioned in afterheader.""" + buffer = io.BytesIO() + src_start = self._start_pos + for folder in streams.unpackinfo.folders: + if folder.is_encrypted(): + raise UnsupportedCompressionMethodError() + + uncompressed = folder.unpacksizes + if not isinstance(uncompressed, (list, tuple)): + uncompressed = [uncompressed] * len(folder.coders) + compressed_size = streams.packinfo.packsizes[0] + uncompressed_size = uncompressed[-1] + + src_start += streams.packinfo.packpos + fp.seek(src_start, 0) + decompressor = folder.get_decompressor(compressed_size) + folder_data = decompressor.decompress(fp.read(compressed_size))[:uncompressed_size] + src_start += uncompressed_size + if folder.digestdefined: + if folder.crc != calculate_crc32(folder_data): + raise Bad7zFile('invalid block data') + buffer.write(folder_data) + buffer.seek(0, 0) + return buffer + + def _encode_header(self, file: BinaryIO, afterheader: int): + startpos = file.tell() + packpos = startpos - afterheader + buf = io.BytesIO() + _, raw_header_len, raw_crc = self.write(buf, 0, False) + streams = HeaderStreamsInfo() + streams.packinfo.packpos = packpos + folder = streams.unpackinfo.folders[0] + folder.crc = [raw_crc] + folder.unpacksizes = [raw_header_len] + compressed_len = 0 + buf.seek(0, 0) + data = buf.read(io.DEFAULT_BUFFER_SIZE) + while data: + out = folder.compressor.compress(data) + compressed_len += len(out) + file.write(out) + data = buf.read(io.DEFAULT_BUFFER_SIZE) + out = folder.compressor.flush() + compressed_len += len(out) + file.write(out) + # + streams.packinfo.packsizes = [compressed_len] + # actual header start position + startpos = file.tell() + write_byte(file, Property.ENCODED_HEADER) + streams.write(file) + write_byte(file, Property.END) + return startpos + + def write(self, file: BinaryIO, afterheader: int, encoded: bool = True): + startpos = file.tell() + if encoded: + startpos = self._encode_header(file, afterheader) + else: + write_byte(file, Property.HEADER) + # Archive properties + if self.main_streams is not None: + self.main_streams.write(file) + # Files Info + if self.files_info is not None: + self.files_info.write(file) + if self.properties is not None: + self.properties.write(file) + # AdditionalStreams + if self.additional_streams is not None: + self.additional_streams.write(file) + write_byte(file, Property.END) + endpos = file.tell() + header_len = endpos - startpos + file.seek(startpos, io.SEEK_SET) + crc = calculate_crc32(file.read(header_len)) + file.seek(endpos, io.SEEK_SET) + return startpos, header_len, crc + + def _extract_header_info(self, fp: BinaryIO) -> None: + pid = fp.read(1) + if pid == Property.ARCHIVE_PROPERTIES: + self.properties = ArchiveProperties.retrieve(fp) + pid = fp.read(1) + if pid == Property.ADDITIONAL_STREAMS_INFO: + self.additional_streams = StreamsInfo.retrieve(fp) + pid = fp.read(1) + if pid == Property.MAIN_STREAMS_INFO: + self.main_streams = StreamsInfo.retrieve(fp) + pid = fp.read(1) + if pid == Property.FILES_INFO: + self.files_info = FilesInfo.retrieve(fp) + pid = fp.read(1) + if pid != Property.END: + raise Bad7zFile('end id expected but %s found' % (repr(pid))) + + @staticmethod + def build_header(folders): + header = Header() + header.files_info = FilesInfo() + header.main_streams = StreamsInfo() + header.main_streams.packinfo = PackInfo() + header.main_streams.packinfo.numstreams = 0 + header.main_streams.packinfo.packpos = 0 + header.main_streams.unpackinfo = UnpackInfo() + header.main_streams.unpackinfo.numfolders = len(folders) + header.main_streams.unpackinfo.folders = folders + header.main_streams.substreamsinfo = SubstreamsInfo() + header.main_streams.substreamsinfo.num_unpackstreams_folders = [len(folders)] + header.main_streams.substreamsinfo.unpacksizes = [] + return header + + +class SignatureHeader: + """The SignatureHeader class hold information of a signature header of archive.""" + + __slots__ = ['version', 'startheadercrc', 'nextheaderofs', 'nextheadersize', 'nextheadercrc'] + + def __init__(self) -> None: + self.version = (P7ZIP_MAJOR_VERSION, P7ZIP_MINOR_VERSION) # type: Tuple[bytes, ...] + self.startheadercrc = None # type: Optional[int] + self.nextheaderofs = None # type: Optional[int] + self.nextheadersize = None # type: Optional[int] + self.nextheadercrc = None # type: Optional[int] + + @classmethod + def retrieve(cls, file: BinaryIO): + obj = cls() + obj._read(file) + return obj + + def _read(self, file: BinaryIO) -> None: + file.seek(len(MAGIC_7Z), 0) + self.version = read_bytes(file, 2) + self.startheadercrc, _ = read_uint32(file) + self.nextheaderofs, data = read_real_uint64(file) + crc = calculate_crc32(data) + self.nextheadersize, data = read_real_uint64(file) + crc = calculate_crc32(data, crc) + self.nextheadercrc, data = read_uint32(file) + crc = calculate_crc32(data, crc) + if crc != self.startheadercrc: + raise Bad7zFile('invalid header data') + + def calccrc(self, length: int, header_crc: int): + self.nextheadersize = length + self.nextheadercrc = header_crc + assert self.nextheaderofs is not None + buf = io.BytesIO() + write_real_uint64(buf, self.nextheaderofs) + write_real_uint64(buf, self.nextheadersize) + write_uint32(buf, self.nextheadercrc) + startdata = buf.getvalue() + self.startheadercrc = calculate_crc32(startdata) + + def write(self, file: BinaryIO): + assert self.startheadercrc is not None + assert self.nextheadercrc is not None + assert self.nextheaderofs is not None + assert self.nextheadersize is not None + file.seek(0, 0) + write_bytes(file, MAGIC_7Z) + write_byte(file, self.version[0]) + write_byte(file, self.version[1]) + write_uint32(file, self.startheadercrc) + write_real_uint64(file, self.nextheaderofs) + write_real_uint64(file, self.nextheadersize) + write_uint32(file, self.nextheadercrc) + + def _write_skelton(self, file: BinaryIO): + file.seek(0, 0) + write_bytes(file, MAGIC_7Z) + write_byte(file, self.version[0]) + write_byte(file, self.version[1]) + write_uint32(file, 1) + write_real_uint64(file, 2) + write_real_uint64(file, 3) + write_uint32(file, 4) + + +class FinishHeader(): + """Finish header for multi-volume 7z file.""" + + def __init__(self): + self.archive_start_offset = None # data offset from end of the finish header + self.additional_start_block_size = None # start signature & start header size + self.finish_header_size = 20 + 16 + + @classmethod + def retrieve(cls, file): + obj = cls() + obj._read(file) + return obj + + def _read(self, file): + self.archive_start_offset = read_uint64(file) + self.additional_start_block_size = read_uint64(file) diff --git a/libs/py7zr/callbacks.py b/libs/py7zr/callbacks.py new file mode 100644 index 000000000..6b2c08383 --- /dev/null +++ b/libs/py7zr/callbacks.py @@ -0,0 +1,61 @@ +#!/usr/bin/python -u +# +# p7zr library +# +# Copyright (c) 2020 Hiroshi Miura +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +from abc import ABC, abstractmethod + + +class Callback(ABC): + """Abstrat base class for progress callbacks.""" + + @abstractmethod + def report_start_preparation(self): + """report a start of preparation event such as making list of files and looking into its properties.""" + pass + + @abstractmethod + def report_start(self, processing_file_path, processing_bytes): + """report a start event of specified archive file and its input bytes.""" + pass + + @abstractmethod + def report_end(self, processing_file_path, wrote_bytes): + """report an end event of specified archive file and its output bytes.""" + pass + + @abstractmethod + def report_warning(self, message): + """report an warning event with its message""" + pass + + @abstractmethod + def report_postprocess(self): + """report a start of post processing event such as set file properties and permissions or creating symlinks.""" + pass + + +class ExtractCallback(Callback): + """Abstrat base class for extraction progress callbacks.""" + pass + + +class ArchiveCallback(Callback): + """Abstrat base class for progress callbacks.""" + pass diff --git a/libs/py7zr/compression.py b/libs/py7zr/compression.py new file mode 100644 index 000000000..25d5726ac --- /dev/null +++ b/libs/py7zr/compression.py @@ -0,0 +1,384 @@ +#!/usr/bin/python -u +# +# p7zr library +# +# Copyright (c) 2019 Hiroshi Miura +# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de +# 7-Zip Copyright (C) 1999-2010 Igor Pavlov +# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +import bz2 +import io +import lzma +import os +import queue +import sys +import threading +from typing import IO, Any, BinaryIO, Dict, List, Optional, Union + +from py7zr import UnsupportedCompressionMethodError +from py7zr.extra import CopyDecompressor, DeflateDecompressor, ISevenZipDecompressor, ZstdDecompressor +from py7zr.helpers import MemIO, NullIO, calculate_crc32, readlink +from py7zr.properties import READ_BLOCKSIZE, ArchivePassword, CompressionMethod + +if sys.version_info < (3, 6): + import pathlib2 as pathlib +else: + import pathlib +try: + import zstandard as Zstd # type: ignore +except ImportError: + Zstd = None + + +class Worker: + """Extract worker class to invoke handler""" + + def __init__(self, files, src_start: int, header) -> None: + self.target_filepath = {} # type: Dict[int, Union[MemIO, pathlib.Path, None]] + self.files = files + self.src_start = src_start + self.header = header + + def extract(self, fp: BinaryIO, parallel: bool, q=None) -> None: + """Extract worker method to handle 7zip folder and decompress each files.""" + if hasattr(self.header, 'main_streams') and self.header.main_streams is not None: + src_end = self.src_start + self.header.main_streams.packinfo.packpositions[-1] + numfolders = self.header.main_streams.unpackinfo.numfolders + if numfolders == 1: + self.extract_single(fp, self.files, self.src_start, src_end, q) + else: + folders = self.header.main_streams.unpackinfo.folders + positions = self.header.main_streams.packinfo.packpositions + empty_files = [f for f in self.files if f.emptystream] + if not parallel: + self.extract_single(fp, empty_files, 0, 0, q) + for i in range(numfolders): + self.extract_single(fp, folders[i].files, self.src_start + positions[i], + self.src_start + positions[i + 1], q) + else: + filename = getattr(fp, 'name', None) + self.extract_single(open(filename, 'rb'), empty_files, 0, 0, q) + extract_threads = [] + for i in range(numfolders): + p = threading.Thread(target=self.extract_single, + args=(filename, folders[i].files, + self.src_start + positions[i], self.src_start + positions[i + 1], q)) + p.start() + extract_threads.append((p)) + for p in extract_threads: + p.join() + else: + empty_files = [f for f in self.files if f.emptystream] + self.extract_single(fp, empty_files, 0, 0, q) + + def extract_single(self, fp: Union[BinaryIO, str], files, src_start: int, src_end: int, + q: Optional[queue.Queue]) -> None: + """Single thread extractor that takes file lists in single 7zip folder.""" + if files is None: + return + if isinstance(fp, str): + fp = open(fp, 'rb') + fp.seek(src_start) + for f in files: + if q is not None: + q.put(('s', str(f.filename), str(f.compressed) if f.compressed is not None else '0')) + fileish = self.target_filepath.get(f.id, None) + if fileish is not None: + fileish.parent.mkdir(parents=True, exist_ok=True) + with fileish.open(mode='wb') as ofp: + if not f.emptystream: + # extract to file + self.decompress(fp, f.folder, ofp, f.uncompressed[-1], f.compressed, src_end) + ofp.seek(0) + else: + pass # just create empty file + elif not f.emptystream: + # read and bin off a data but check crc + with NullIO() as ofp: + self.decompress(fp, f.folder, ofp, f.uncompressed[-1], f.compressed, src_end) + if q is not None: + q.put(('e', str(f.filename), str(f.uncompressed[-1]))) + + def decompress(self, fp: BinaryIO, folder, fq: IO[Any], + size: int, compressed_size: Optional[int], src_end: int) -> None: + """decompressor wrapper called from extract method. + + :parameter fp: archive source file pointer + :parameter folder: Folder object that have decompressor object. + :parameter fq: output file pathlib.Path + :parameter size: uncompressed size of target file. + :parameter compressed_size: compressed size of target file. + :parameter src_end: end position of the folder + :returns None + """ + assert folder is not None + out_remaining = size + decompressor = folder.get_decompressor(compressed_size) + while out_remaining > 0: + max_length = min(out_remaining, io.DEFAULT_BUFFER_SIZE) + rest_size = src_end - fp.tell() + read_size = min(READ_BLOCKSIZE, rest_size) + if read_size == 0: + tmp = decompressor.decompress(b'', max_length) + if len(tmp) == 0: + raise Exception("decompression get wrong: no output data.") + else: + inp = fp.read(read_size) + tmp = decompressor.decompress(inp, max_length) + if len(tmp) > 0 and out_remaining >= len(tmp): + out_remaining -= len(tmp) + fq.write(tmp) + if out_remaining <= 0: + break + if fp.tell() >= src_end: + if decompressor.crc is not None and not decompressor.check_crc(): + print('\nCRC error! expected: {}, real: {}'.format(decompressor.crc, decompressor.digest)) + return + + def _find_link_target(self, target): + """Find the target member of a symlink or hardlink member in the archive. + """ + targetname = target.as_posix() # type: str + linkname = readlink(targetname) + # Check windows full path symlinks + if linkname.startswith("\\\\?\\"): + linkname = linkname[4:] + # normalize as posix style + linkname = pathlib.Path(linkname).as_posix() # type: str + member = None + for j in range(len(self.files)): + if linkname == self.files[j].origin.as_posix(): + # FIXME: when API user specify arcname, it will break + member = os.path.relpath(linkname, os.path.dirname(targetname)) + break + if member is None: + member = linkname + return member + + def archive(self, fp: BinaryIO, folder, deref=False): + """Run archive task for specified 7zip folder.""" + compressor = folder.get_compressor() + outsize = 0 + self.header.main_streams.packinfo.numstreams = 1 + num_unpack_streams = 0 + self.header.main_streams.substreamsinfo.digests = [] + self.header.main_streams.substreamsinfo.digestsdefined = [] + last_file_index = 0 + foutsize = 0 + for i, f in enumerate(self.files): + file_info = f.file_properties() + self.header.files_info.files.append(file_info) + self.header.files_info.emptyfiles.append(f.emptystream) + foutsize = 0 + if f.is_symlink and not deref: + last_file_index = i + num_unpack_streams += 1 + link_target = self._find_link_target(f.origin) # type: str + tgt = link_target.encode('utf-8') # type: bytes + insize = len(tgt) + crc = calculate_crc32(tgt, 0) # type: int + out = compressor.compress(tgt) + outsize += len(out) + foutsize += len(out) + fp.write(out) + self.header.main_streams.substreamsinfo.digests.append(crc) + self.header.main_streams.substreamsinfo.digestsdefined.append(True) + self.header.main_streams.substreamsinfo.unpacksizes.append(insize) + self.header.files_info.files[i]['maxsize'] = foutsize + elif not f.emptystream: + last_file_index = i + num_unpack_streams += 1 + insize = 0 + with f.origin.open(mode='rb') as fd: + data = fd.read(READ_BLOCKSIZE) + insize += len(data) + crc = 0 + while data: + crc = calculate_crc32(data, crc) + out = compressor.compress(data) + outsize += len(out) + foutsize += len(out) + fp.write(out) + data = fd.read(READ_BLOCKSIZE) + insize += len(data) + self.header.main_streams.substreamsinfo.digests.append(crc) + self.header.main_streams.substreamsinfo.digestsdefined.append(True) + self.header.files_info.files[i]['maxsize'] = foutsize + self.header.main_streams.substreamsinfo.unpacksizes.append(insize) + else: + out = compressor.flush() + outsize += len(out) + foutsize += len(out) + fp.write(out) + if len(self.files) > 0: + self.header.files_info.files[last_file_index]['maxsize'] = foutsize + # Update size data in header + self.header.main_streams.packinfo.packsizes = [outsize] + folder.unpacksizes = [sum(self.header.main_streams.substreamsinfo.unpacksizes)] + self.header.main_streams.substreamsinfo.num_unpackstreams_folders = [num_unpack_streams] + + def register_filelike(self, id: int, fileish: Union[MemIO, pathlib.Path, None]) -> None: + """register file-ish to worker.""" + self.target_filepath[id] = fileish + + +class SevenZipDecompressor: + """Main decompressor object which is properly configured and bind to each 7zip folder. + because 7zip folder can have a custom compression method""" + + lzma_methods_map = { + CompressionMethod.LZMA: lzma.FILTER_LZMA1, + CompressionMethod.LZMA2: lzma.FILTER_LZMA2, + CompressionMethod.DELTA: lzma.FILTER_DELTA, + CompressionMethod.P7Z_BCJ: lzma.FILTER_X86, + CompressionMethod.BCJ_ARM: lzma.FILTER_ARM, + CompressionMethod.BCJ_ARMT: lzma.FILTER_ARMTHUMB, + CompressionMethod.BCJ_IA64: lzma.FILTER_IA64, + CompressionMethod.BCJ_PPC: lzma.FILTER_POWERPC, + CompressionMethod.BCJ_SPARC: lzma.FILTER_SPARC, + } + + FILTER_BZIP2 = 0x31 + FILTER_ZIP = 0x32 + FILTER_COPY = 0x33 + FILTER_AES = 0x34 + FILTER_ZSTD = 0x35 + alt_methods_map = { + CompressionMethod.MISC_BZIP2: FILTER_BZIP2, + CompressionMethod.MISC_DEFLATE: FILTER_ZIP, + CompressionMethod.COPY: FILTER_COPY, + CompressionMethod.CRYPT_AES256_SHA256: FILTER_AES, + CompressionMethod.MISC_ZSTD: FILTER_ZSTD, + } + + def __init__(self, coders: List[Dict[str, Any]], size: int, crc: Optional[int]) -> None: + # Get password which was set when creation of py7zr.SevenZipFile object. + self.input_size = size + self.consumed = 0 # type: int + self.crc = crc + self.digest = None # type: Optional[int] + if self._check_lzma_coders(coders): + self._set_lzma_decompressor(coders) + else: + self._set_alternative_decompressor(coders) + + def _check_lzma_coders(self, coders: List[Dict[str, Any]]) -> bool: + res = True + for coder in coders: + if self.lzma_methods_map.get(coder['method'], None) is None: + res = False + break + return res + + def _set_lzma_decompressor(self, coders: List[Dict[str, Any]]) -> None: + filters = [] # type: List[Dict[str, Any]] + for coder in coders: + if coder['numinstreams'] != 1 or coder['numoutstreams'] != 1: + raise UnsupportedCompressionMethodError('Only a simple compression method is currently supported.') + filter_id = self.lzma_methods_map.get(coder['method'], None) + if filter_id is None: + raise UnsupportedCompressionMethodError + properties = coder.get('properties', None) + if properties is not None: + filters[:0] = [lzma._decode_filter_properties(filter_id, properties)] # type: ignore + else: + filters[:0] = [{'id': filter_id}] + self.decompressor = lzma.LZMADecompressor(format=lzma.FORMAT_RAW, filters=filters) # type: Union[bz2.BZ2Decompressor, lzma.LZMADecompressor, ISevenZipDecompressor] # noqa + + def _set_alternative_decompressor(self, coders: List[Dict[str, Any]]) -> None: + filter_id = self.alt_methods_map.get(coders[0]['method'], None) + if filter_id == self.FILTER_BZIP2: + self.decompressor = bz2.BZ2Decompressor() + elif filter_id == self.FILTER_ZIP: + self.decompressor = DeflateDecompressor() + elif filter_id == self.FILTER_COPY: + self.decompressor = CopyDecompressor() + elif filter_id == self.FILTER_ZSTD and Zstd: + self.decompressor = ZstdDecompressor() + else: + raise UnsupportedCompressionMethodError + + def decompress(self, data: bytes, max_length: Optional[int] = None) -> bytes: + self.consumed += len(data) + if max_length is not None: + folder_data = self.decompressor.decompress(data, max_length=max_length) + else: + folder_data = self.decompressor.decompress(data) + # calculate CRC with uncompressed data + if self.crc is not None: + self.digest = calculate_crc32(folder_data, self.digest) + return folder_data + + def check_crc(self): + return self.crc == self.digest + + +class SevenZipCompressor: + + """Main compressor object to configured for each 7zip folder.""" + + __slots__ = ['filters', 'compressor', 'coders'] + + lzma_methods_map_r = { + lzma.FILTER_LZMA2: CompressionMethod.LZMA2, + lzma.FILTER_DELTA: CompressionMethod.DELTA, + lzma.FILTER_X86: CompressionMethod.P7Z_BCJ, + } + + def __init__(self, filters=None): + if filters is None: + self.filters = [{"id": lzma.FILTER_LZMA2, "preset": 7 | lzma.PRESET_EXTREME}, ] + else: + self.filters = filters + self.compressor = lzma.LZMACompressor(format=lzma.FORMAT_RAW, filters=self.filters) + self.coders = [] + for filter in self.filters: + if filter is None: + break + method = self.lzma_methods_map_r[filter['id']] + properties = lzma._encode_filter_properties(filter) + self.coders.append({'method': method, 'properties': properties, 'numinstreams': 1, 'numoutstreams': 1}) + + def compress(self, data): + return self.compressor.compress(data) + + def flush(self): + return self.compressor.flush() + + +def get_methods_names(coders: List[dict]) -> List[str]: + """Return human readable method names for specified coders""" + methods_name_map = { + CompressionMethod.LZMA2: "LZMA2", + CompressionMethod.LZMA: "LZMA", + CompressionMethod.DELTA: "delta", + CompressionMethod.P7Z_BCJ: "BCJ", + CompressionMethod.BCJ_ARM: "BCJ(ARM)", + CompressionMethod.BCJ_ARMT: "BCJ(ARMT)", + CompressionMethod.BCJ_IA64: "BCJ(IA64)", + CompressionMethod.BCJ_PPC: "BCJ(POWERPC)", + CompressionMethod.BCJ_SPARC: "BCJ(SPARC)", + CompressionMethod.CRYPT_AES256_SHA256: "7zAES", + } + methods_names = [] # type: List[str] + for coder in coders: + try: + methods_names.append(methods_name_map[coder['method']]) + except KeyError: + raise UnsupportedCompressionMethodError("Unknown method {}".format(coder['method'])) + return methods_names diff --git a/libs/py7zr/exceptions.py b/libs/py7zr/exceptions.py new file mode 100644 index 000000000..1a25e2089 --- /dev/null +++ b/libs/py7zr/exceptions.py @@ -0,0 +1,42 @@ +# +# p7zr library +# +# Copyright (c) 2019 Hiroshi Miura +# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de +# 7-Zip Copyright (C) 1999-2010 Igor Pavlov +# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + + +class ArchiveError(Exception): + pass + + +class Bad7zFile(ArchiveError): + pass + + +class UnsupportedCompressionMethodError(ArchiveError): + pass + + +class DecompressionError(ArchiveError): + pass + + +class InternalError(ArchiveError): + pass diff --git a/libs/py7zr/extra.py b/libs/py7zr/extra.py new file mode 100644 index 000000000..48cc840a5 --- /dev/null +++ b/libs/py7zr/extra.py @@ -0,0 +1,122 @@ +#!/usr/bin/python -u +# +# p7zr library +# +# Copyright (c) 2019 Hiroshi Miura +# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de +# 7-Zip Copyright (C) 1999-2010 Igor Pavlov +# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +import lzma +import zlib +from abc import ABC, abstractmethod +from typing import Any, Dict, List, Union + +from py7zr import UnsupportedCompressionMethodError +from py7zr.helpers import Buffer, calculate_key +from py7zr.properties import READ_BLOCKSIZE, CompressionMethod + +try: + import zstandard as Zstd # type: ignore +except ImportError: + Zstd = None + + +class ISevenZipCompressor(ABC): + @abstractmethod + def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes: + pass + + @abstractmethod + def flush(self) -> bytes: + pass + + +class ISevenZipDecompressor(ABC): + @abstractmethod + def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes: + pass + + +class DeflateDecompressor(ISevenZipDecompressor): + def __init__(self): + self.buf = b'' + self._decompressor = zlib.decompressobj(-15) + + def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1): + if max_length < 0: + res = self.buf + self._decompressor.decompress(data) + self.buf = b'' + else: + tmp = self.buf + self._decompressor.decompress(data) + res = tmp[:max_length] + self.buf = tmp[max_length:] + return res + + +class CopyDecompressor(ISevenZipDecompressor): + + def __init__(self): + self._buf = bytes() + + def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes: + if max_length < 0: + length = len(data) + else: + length = min(len(data), max_length) + buflen = len(self._buf) + if length > buflen: + res = self._buf + data[:length - buflen] + self._buf = data[length - buflen:] + else: + res = self._buf[:length] + self._buf = self._buf[length:] + data + return res + + +class ZstdDecompressor(ISevenZipDecompressor): + + def __init__(self): + if Zstd is None: + raise UnsupportedCompressionMethodError + self.buf = b'' # type: bytes + self._ctc = Zstd.ZstdDecompressor() # type: ignore + + def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes: + dobj = self._ctc.decompressobj() # type: ignore + if max_length < 0: + res = self.buf + dobj.decompress(data) + self.buf = b'' + else: + tmp = self.buf + dobj.decompress(data) + res = tmp[:max_length] + self.buf = tmp[max_length:] + return res + + +class ZstdCompressor(ISevenZipCompressor): + + def __init__(self): + if Zstd is None: + raise UnsupportedCompressionMethodError + self._ctc = Zstd.ZstdCompressor() # type: ignore + + def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes: + return self._ctc.compress(data) # type: ignore + + def flush(self): + pass diff --git a/libs/py7zr/helpers.py b/libs/py7zr/helpers.py new file mode 100644 index 000000000..1f84417b8 --- /dev/null +++ b/libs/py7zr/helpers.py @@ -0,0 +1,362 @@ +#!/usr/bin/python -u +# +# p7zr library +# +# Copyright (c) 2019 Hiroshi Miura +# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# + +import _hashlib # type: ignore # noqa +import ctypes +import os +import pathlib +import platform +import sys +import time as _time +import zlib +from datetime import datetime, timedelta, timezone, tzinfo +from typing import BinaryIO, Optional, Union + +import py7zr.win32compat + + +def calculate_crc32(data: bytes, value: Optional[int] = None, blocksize: int = 1024 * 1024) -> int: + """Calculate CRC32 of strings with arbitrary lengths.""" + length = len(data) + pos = blocksize + if value: + value = zlib.crc32(data[:pos], value) + else: + value = zlib.crc32(data[:pos]) + while pos < length: + value = zlib.crc32(data[pos:pos + blocksize], value) + pos += blocksize + + return value & 0xffffffff + + +def _calculate_key1(password: bytes, cycles: int, salt: bytes, digest: str) -> bytes: + """Calculate 7zip AES encryption key.""" + if digest not in ('sha256'): + raise ValueError('Unknown digest method for password protection.') + assert cycles <= 0x3f + if cycles == 0x3f: + ba = bytearray(salt + password + bytes(32)) + key = bytes(ba[:32]) # type: bytes + else: + rounds = 1 << cycles + m = _hashlib.new(digest) + for round in range(rounds): + m.update(salt + password + round.to_bytes(8, byteorder='little', signed=False)) + key = m.digest()[:32] + return key + + +def _calculate_key2(password: bytes, cycles: int, salt: bytes, digest: str): + """Calculate 7zip AES encryption key. + It utilize ctypes and memoryview buffer and zero-copy technology on Python.""" + if digest not in ('sha256'): + raise ValueError('Unknown digest method for password protection.') + assert cycles <= 0x3f + if cycles == 0x3f: + key = bytes(bytearray(salt + password + bytes(32))[:32]) # type: bytes + else: + rounds = 1 << cycles + m = _hashlib.new(digest) + length = len(salt) + len(password) + + class RoundBuf(ctypes.LittleEndianStructure): + _pack_ = 1 + _fields_ = [ + ('saltpassword', ctypes.c_ubyte * length), + ('round', ctypes.c_uint64) + ] + + buf = RoundBuf() + for i, c in enumerate(salt + password): + buf.saltpassword[i] = c + buf.round = 0 + mv = memoryview(buf) # type: ignore # noqa + while buf.round < rounds: + m.update(mv) + buf.round += 1 + key = m.digest()[:32] + return key + + +if platform.python_implementation() == "PyPy": + calculate_key = _calculate_key1 # Avoid https://foss.heptapod.net/pypy/pypy/issues/3209 +else: + calculate_key = _calculate_key2 # ver2 is 1.7-2.0 times faster than ver1 + + +def filetime_to_dt(ft): + """Convert Windows NTFS file time into python datetime object.""" + EPOCH_AS_FILETIME = 116444736000000000 + us = (ft - EPOCH_AS_FILETIME) // 10 + return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(microseconds=us) + + +ZERO = timedelta(0) +HOUR = timedelta(hours=1) +SECOND = timedelta(seconds=1) + +# A class capturing the platform's idea of local time. +# (May result in wrong values on historical times in +# timezones where UTC offset and/or the DST rules had +# changed in the past.) + +STDOFFSET = timedelta(seconds=-_time.timezone) +if _time.daylight: + DSTOFFSET = timedelta(seconds=-_time.altzone) +else: + DSTOFFSET = STDOFFSET + +DSTDIFF = DSTOFFSET - STDOFFSET + + +class LocalTimezone(tzinfo): + + def fromutc(self, dt): + assert dt.tzinfo is self + stamp = (dt - datetime(1970, 1, 1, tzinfo=self)) // SECOND + args = _time.localtime(stamp)[:6] + dst_diff = DSTDIFF // SECOND + # Detect fold + fold = (args == _time.localtime(stamp - dst_diff)) + return datetime(*args, microsecond=dt.microsecond, tzinfo=self) + + def utcoffset(self, dt): + if self._isdst(dt): + return DSTOFFSET + else: + return STDOFFSET + + def dst(self, dt): + if self._isdst(dt): + return DSTDIFF + else: + return ZERO + + def tzname(self, dt): + return _time.tzname[self._isdst(dt)] + + def _isdst(self, dt): + tt = (dt.year, dt.month, dt.day, + dt.hour, dt.minute, dt.second, + dt.weekday(), 0, 0) + stamp = _time.mktime(tt) + tt = _time.localtime(stamp) + return tt.tm_isdst > 0 + + +Local = LocalTimezone() +TIMESTAMP_ADJUST = -11644473600 + + +class UTC(tzinfo): + """UTC""" + + def utcoffset(self, dt): + return ZERO + + def tzname(self, dt): + return "UTC" + + def dst(self, dt): + return ZERO + + def _call__(self): + return self + + +class ArchiveTimestamp(int): + """Windows FILETIME timestamp.""" + + def __repr__(self): + return '%s(%d)' % (type(self).__name__, self) + + def totimestamp(self) -> float: + """Convert 7z FILETIME to Python timestamp.""" + # FILETIME is 100-nanosecond intervals since 1601/01/01 (UTC) + return (self / 10000000.0) + TIMESTAMP_ADJUST + + def as_datetime(self): + """Convert FILETIME to Python datetime object.""" + return datetime.fromtimestamp(self.totimestamp(), UTC()) + + @staticmethod + def from_datetime(val): + return ArchiveTimestamp((val - TIMESTAMP_ADJUST) * 10000000.0) + + +def islink(path): + """ + Cross-platform islink implementation. + Supports Windows NT symbolic links and reparse points. + """ + is_symlink = os.path.islink(path) + if sys.version_info >= (3, 8) or sys.platform != "win32" or sys.getwindowsversion()[0] < 6: + return is_symlink + # special check for directory junctions which py38 does. + if is_symlink: + if py7zr.win32compat.is_reparse_point(path): + is_symlink = False + return is_symlink + + +def readlink(path: Union[str, pathlib.Path], *, dir_fd=None) -> Union[str, pathlib.Path]: + """ + Cross-platform compat implementation of os.readlink and Path.readlink(). + Supports Windows NT symbolic links and reparse points. + When called with path argument as pathlike(str), return result as a pathlike(str). + When called with Path object, return also Path object. + When called with path argument as bytes, return result as a bytes. + """ + is_path_pathlib = isinstance(path, pathlib.Path) + if sys.version_info >= (3, 9): + if is_path_pathlib and dir_fd is None: + return path.readlink() + else: + return os.readlink(path, dir_fd=dir_fd) + elif sys.version_info >= (3, 8) or sys.platform != "win32": + res = os.readlink(path, dir_fd=dir_fd) + # Hack to handle a wrong type of results + if isinstance(res, bytes): + res = os.fsdecode(res) + if is_path_pathlib: + return pathlib.Path(res) + else: + return res + elif not os.path.exists(str(path)): + raise OSError(22, 'Invalid argument', path) + return py7zr.win32compat.readlink(path) + + +class MemIO: + """pathlib.Path-like IO class to write memory(io.Bytes)""" + def __init__(self, buf: BinaryIO): + self._buf = buf + + def write(self, data: bytes) -> int: + return self._buf.write(data) + + def read(self, length: Optional[int] = None) -> bytes: + if length is not None: + return self._buf.read(length) + else: + return self._buf.read() + + def close(self) -> None: + self._buf.seek(0) + + def flush(self) -> None: + pass + + def seek(self, position: int) -> None: + self._buf.seek(position) + + def open(self, mode=None): + return self + + @property + def parent(self): + return self + + def mkdir(self, parents=None, exist_ok=False): + return None + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + +class NullIO: + """pathlib.Path-like IO class of /dev/null""" + + def __init__(self): + pass + + def write(self, data): + return len(data) + + def read(self, length=None): + if length is not None: + return bytes(length) + else: + return b'' + + def close(self): + pass + + def flush(self): + pass + + def open(self, mode=None): + return self + + @property + def parent(self): + return self + + def mkdir(self): + return None + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + +class BufferOverflow(Exception): + pass + + +class Buffer: + + def __init__(self, size: int = 16): + self._size = size + self._buf = bytearray(size) + self._buflen = 0 + self.view = memoryview(self._buf[0:0]) + + def add(self, data: Union[bytes, bytearray, memoryview]): + length = len(data) + if length + self._buflen > self._size: + raise BufferOverflow() + self._buf[self._buflen:self._buflen + length] = data + self._buflen += length + self.view = memoryview(self._buf[0:self._buflen]) + + def reset(self) -> None: + self._buflen = 0 + self.view = memoryview(self._buf[0:0]) + + def set(self, data: Union[bytes, bytearray, memoryview]) -> None: + length = len(data) + if length > self._size: + raise BufferOverflow() + self._buf[0:length] = data + self._buflen = length + self.view = memoryview(self._buf[0:length]) + + def __len__(self) -> int: + return self._buflen diff --git a/libs/py7zr/properties.py b/libs/py7zr/properties.py new file mode 100644 index 000000000..38cfbe8f5 --- /dev/null +++ b/libs/py7zr/properties.py @@ -0,0 +1,155 @@ +# +# p7zr library +# +# Copyright (c) 2019 Hiroshi Miura +# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de +# 7-Zip Copyright (C) 1999-2010 Igor Pavlov +# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +import binascii +from enum import Enum +from typing import Optional + +MAGIC_7Z = binascii.unhexlify('377abcaf271c') +FINISH_7Z = binascii.unhexlify('377abcaf271d') +READ_BLOCKSIZE = 32248 +QUEUELEN = READ_BLOCKSIZE * 2 + +READ_BLOCKSIZE = 32248 + + +class ByteEnum(bytes, Enum): + pass + + +class Property(ByteEnum): + """Hold 7zip property fixed values.""" + END = binascii.unhexlify('00') + HEADER = binascii.unhexlify('01') + ARCHIVE_PROPERTIES = binascii.unhexlify('02') + ADDITIONAL_STREAMS_INFO = binascii.unhexlify('03') + MAIN_STREAMS_INFO = binascii.unhexlify('04') + FILES_INFO = binascii.unhexlify('05') + PACK_INFO = binascii.unhexlify('06') + UNPACK_INFO = binascii.unhexlify('07') + SUBSTREAMS_INFO = binascii.unhexlify('08') + SIZE = binascii.unhexlify('09') + CRC = binascii.unhexlify('0a') + FOLDER = binascii.unhexlify('0b') + CODERS_UNPACK_SIZE = binascii.unhexlify('0c') + NUM_UNPACK_STREAM = binascii.unhexlify('0d') + EMPTY_STREAM = binascii.unhexlify('0e') + EMPTY_FILE = binascii.unhexlify('0f') + ANTI = binascii.unhexlify('10') + NAME = binascii.unhexlify('11') + CREATION_TIME = binascii.unhexlify('12') + LAST_ACCESS_TIME = binascii.unhexlify('13') + LAST_WRITE_TIME = binascii.unhexlify('14') + ATTRIBUTES = binascii.unhexlify('15') + COMMENT = binascii.unhexlify('16') + ENCODED_HEADER = binascii.unhexlify('17') + START_POS = binascii.unhexlify('18') + DUMMY = binascii.unhexlify('19') + + +class CompressionMethod(ByteEnum): + """Hold fixed values for method parameter.""" + COPY = binascii.unhexlify('00') + DELTA = binascii.unhexlify('03') + BCJ = binascii.unhexlify('04') + PPC = binascii.unhexlify('05') + IA64 = binascii.unhexlify('06') + ARM = binascii.unhexlify('07') + ARMT = binascii.unhexlify('08') + SPARC = binascii.unhexlify('09') + # SWAP = 02.. + SWAP2 = binascii.unhexlify('020302') + SWAP4 = binascii.unhexlify('020304') + # 7Z = 03.. + LZMA = binascii.unhexlify('030101') + PPMD = binascii.unhexlify('030401') + P7Z_BCJ = binascii.unhexlify('03030103') + P7Z_BCJ2 = binascii.unhexlify('0303011B') + BCJ_PPC = binascii.unhexlify('03030205') + BCJ_IA64 = binascii.unhexlify('03030401') + BCJ_ARM = binascii.unhexlify('03030501') + BCJ_ARMT = binascii.unhexlify('03030701') + BCJ_SPARC = binascii.unhexlify('03030805') + LZMA2 = binascii.unhexlify('21') + # MISC : 04.. + MISC_ZIP = binascii.unhexlify('0401') + MISC_BZIP2 = binascii.unhexlify('040202') + MISC_DEFLATE = binascii.unhexlify('040108') + MISC_DEFLATE64 = binascii.unhexlify('040109') + MISC_Z = binascii.unhexlify('0405') + MISC_LZH = binascii.unhexlify('0406') + NSIS_DEFLATE = binascii.unhexlify('040901') + NSIS_BZIP2 = binascii.unhexlify('040902') + # + MISC_ZSTD = binascii.unhexlify('04f71101') + MISC_BROTLI = binascii.unhexlify('04f71102') + MISC_LZ4 = binascii.unhexlify('04f71104') + MISC_LZS = binascii.unhexlify('04f71105') + MISC_LIZARD = binascii.unhexlify('04f71106') + # CRYPTO 06.. + CRYPT_ZIPCRYPT = binascii.unhexlify('06f10101') + CRYPT_RAR29AES = binascii.unhexlify('06f10303') + CRYPT_AES256_SHA256 = binascii.unhexlify('06f10701') + + +class SupportedMethods: + """Hold list of methods which python3 can support.""" + formats = [{'name': "7z", 'magic': MAGIC_7Z}] + codecs = [{'id': CompressionMethod.LZMA, 'name': "LZMA"}, + {'id': CompressionMethod.LZMA2, 'name': "LZMA2"}, + {'id': CompressionMethod.DELTA, 'name': "DELTA"}, + {'id': CompressionMethod.P7Z_BCJ, 'name': "BCJ"}, + {'id': CompressionMethod.BCJ_PPC, 'name': 'PPC'}, + {'id': CompressionMethod.BCJ_IA64, 'name': 'IA64'}, + {'id': CompressionMethod.BCJ_ARM, 'name': "ARM"}, + {'id': CompressionMethod.BCJ_ARMT, 'name': "ARMT"}, + {'id': CompressionMethod.BCJ_SPARC, 'name': 'SPARC'} + ] + + +# this class is Borg/Singleton +class ArchivePassword: + + _shared_state = { + '_password': None, + } + + def __init__(self, password: Optional[str] = None): + self.__dict__ = self._shared_state + if password is not None: + self._password = password + + def set(self, password): + self._password = password + + def get(self): + if self._password is not None: + return self._password + else: + return '' + + def __str__(self): + if self._password is not None: + return self._password + else: + return '' diff --git a/libs/py7zr/py7zr.py b/libs/py7zr/py7zr.py new file mode 100644 index 000000000..466ae6274 --- /dev/null +++ b/libs/py7zr/py7zr.py @@ -0,0 +1,974 @@ +#!/usr/bin/python -u +# +# p7zr library +# +# Copyright (c) 2019,2020 Hiroshi Miura +# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de +# 7-Zip Copyright (C) 1999-2010 Igor Pavlov +# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# +"""Read 7zip format archives.""" +import collections.abc +import datetime +import errno +import functools +import io +import operator +import os +import queue +import stat +import sys +import threading +from io import BytesIO +from typing import IO, Any, BinaryIO, Dict, List, Optional, Tuple, Union + +from py7zr.archiveinfo import Folder, Header, SignatureHeader +from py7zr.callbacks import ExtractCallback +from py7zr.compression import SevenZipCompressor, Worker, get_methods_names +from py7zr.exceptions import Bad7zFile, InternalError +from py7zr.helpers import ArchiveTimestamp, MemIO, calculate_crc32, filetime_to_dt +from py7zr.properties import MAGIC_7Z, READ_BLOCKSIZE, ArchivePassword + +if sys.version_info < (3, 6): + import contextlib2 as contextlib + import pathlib2 as pathlib +else: + import contextlib + import pathlib + +if sys.platform.startswith('win'): + import _winapi + +FILE_ATTRIBUTE_UNIX_EXTENSION = 0x8000 +FILE_ATTRIBUTE_WINDOWS_MASK = 0x04fff + + +class ArchiveFile: + """Represent each files metadata inside archive file. + It holds file properties; filename, permissions, and type whether + it is directory, link or normal file. + + Instances of the :class:`ArchiveFile` class are returned by iterating :attr:`files_list` of + :class:`SevenZipFile` objects. + Each object stores information about a single member of the 7z archive. Most of users use :meth:`extractall()`. + + The class also hold an archive parameter where file is exist in + archive file folder(container).""" + def __init__(self, id: int, file_info: Dict[str, Any]) -> None: + self.id = id + self._file_info = file_info + + def file_properties(self) -> Dict[str, Any]: + """Return file properties as a hash object. Following keys are included: ‘readonly’, ‘is_directory’, + ‘posix_mode’, ‘archivable’, ‘emptystream’, ‘filename’, ‘creationtime’, ‘lastaccesstime’, + ‘lastwritetime’, ‘attributes’ + """ + properties = self._file_info + if properties is not None: + properties['readonly'] = self.readonly + properties['posix_mode'] = self.posix_mode + properties['archivable'] = self.archivable + properties['is_directory'] = self.is_directory + return properties + + def _get_property(self, key: str) -> Any: + try: + return self._file_info[key] + except KeyError: + return None + + @property + def origin(self) -> pathlib.Path: + return self._get_property('origin') + + @property + def folder(self) -> Folder: + return self._get_property('folder') + + @property + def filename(self) -> str: + """return filename of archive file.""" + return self._get_property('filename') + + @property + def emptystream(self) -> bool: + """True if file is empty(0-byte file), otherwise False""" + return self._get_property('emptystream') + + @property + def uncompressed(self) -> List[int]: + return self._get_property('uncompressed') + + @property + def uncompressed_size(self) -> int: + """Uncompressed file size.""" + return functools.reduce(operator.add, self.uncompressed) + + @property + def compressed(self) -> Optional[int]: + """Compressed size""" + return self._get_property('compressed') + + def _test_attribute(self, target_bit: int) -> bool: + attributes = self._get_property('attributes') + if attributes is None: + return False + return attributes & target_bit == target_bit + + @property + def archivable(self) -> bool: + """File has a Windows `archive` flag.""" + return self._test_attribute(stat.FILE_ATTRIBUTE_ARCHIVE) # type: ignore # noqa + + @property + def is_directory(self) -> bool: + """True if file is a directory, otherwise False.""" + return self._test_attribute(stat.FILE_ATTRIBUTE_DIRECTORY) # type: ignore # noqa + + @property + def readonly(self) -> bool: + """True if file is readonly, otherwise False.""" + return self._test_attribute(stat.FILE_ATTRIBUTE_READONLY) # type: ignore # noqa + + def _get_unix_extension(self) -> Optional[int]: + attributes = self._get_property('attributes') + if self._test_attribute(FILE_ATTRIBUTE_UNIX_EXTENSION): + return attributes >> 16 + return None + + @property + def is_symlink(self) -> bool: + """True if file is a symbolic link, otherwise False.""" + e = self._get_unix_extension() + if e is not None: + return stat.S_ISLNK(e) + return self._test_attribute(stat.FILE_ATTRIBUTE_REPARSE_POINT) # type: ignore # noqa + + @property + def is_junction(self) -> bool: + """True if file is a junction/reparse point on windows, otherwise False.""" + return self._test_attribute(stat.FILE_ATTRIBUTE_REPARSE_POINT | # type: ignore # noqa + stat.FILE_ATTRIBUTE_DIRECTORY) # type: ignore # noqa + + @property + def is_socket(self) -> bool: + """True if file is a socket, otherwise False.""" + e = self._get_unix_extension() + if e is not None: + return stat.S_ISSOCK(e) + return False + + @property + def lastwritetime(self) -> Optional[ArchiveTimestamp]: + """Return last written timestamp of a file.""" + return self._get_property('lastwritetime') + + @property + def posix_mode(self) -> Optional[int]: + """ + posix mode when a member has a unix extension property, or None + :return: Return file stat mode can be set by os.chmod() + """ + e = self._get_unix_extension() + if e is not None: + return stat.S_IMODE(e) + return None + + @property + def st_fmt(self) -> Optional[int]: + """ + :return: Return the portion of the file mode that describes the file type + """ + e = self._get_unix_extension() + if e is not None: + return stat.S_IFMT(e) + return None + + +class ArchiveFileList(collections.abc.Iterable): + """Iteratable container of ArchiveFile.""" + + def __init__(self, offset: int = 0): + self.files_list = [] # type: List[dict] + self.index = 0 + self.offset = offset + + def append(self, file_info: Dict[str, Any]) -> None: + self.files_list.append(file_info) + + def __len__(self) -> int: + return len(self.files_list) + + def __iter__(self) -> 'ArchiveFileListIterator': + return ArchiveFileListIterator(self) + + def __getitem__(self, index): + if index > len(self.files_list): + raise IndexError + if index < 0: + raise IndexError + res = ArchiveFile(index + self.offset, self.files_list[index]) + return res + + +class ArchiveFileListIterator(collections.abc.Iterator): + + def __init__(self, archive_file_list): + self._archive_file_list = archive_file_list + self._index = 0 + + def __next__(self) -> ArchiveFile: + if self._index == len(self._archive_file_list): + raise StopIteration + res = self._archive_file_list[self._index] + self._index += 1 + return res + + +# ------------------ +# Exported Classes +# ------------------ +class ArchiveInfo: + """Hold archive information""" + + def __init__(self, filename, size, header_size, method_names, solid, blocks, uncompressed): + self.filename = filename + self.size = size + self.header_size = header_size + self.method_names = method_names + self.solid = solid + self.blocks = blocks + self.uncompressed = uncompressed + + +class FileInfo: + """Hold archived file information.""" + + def __init__(self, filename, compressed, uncompressed, archivable, is_directory, creationtime): + self.filename = filename + self.compressed = compressed + self.uncompressed = uncompressed + self.archivable = archivable + self.is_directory = is_directory + self.creationtime = creationtime + + +class SevenZipFile(contextlib.AbstractContextManager): + """The SevenZipFile Class provides an interface to 7z archives.""" + + def __init__(self, file: Union[BinaryIO, str, pathlib.Path], mode: str = 'r', + *, filters: Optional[str] = None, dereference=False, password: Optional[str] = None) -> None: + if mode not in ('r', 'w', 'x', 'a'): + raise ValueError("ZipFile requires mode 'r', 'w', 'x', or 'a'") + if password is not None: + if mode not in ('r'): + raise NotImplementedError("It has not been implemented to create archive with password.") + ArchivePassword(password) + self.password_protected = True + else: + self.password_protected = False + # Check if we were passed a file-like object or not + if isinstance(file, str): + self._filePassed = False # type: bool + self.filename = file # type: str + if mode == 'r': + self.fp = open(file, 'rb') # type: BinaryIO + elif mode == 'w': + self.fp = open(file, 'w+b') + elif mode == 'x': + self.fp = open(file, 'x+b') + elif mode == 'a': + self.fp = open(file, 'r+b') + else: + raise ValueError("File open error.") + self.mode = mode + elif isinstance(file, pathlib.Path): + self._filePassed = False + self.filename = str(file) + if mode == 'r': + self.fp = file.open(mode='rb') # type: ignore # noqa # typeshed issue: 2911 + elif mode == 'w': + self.fp = file.open(mode='w+b') # type: ignore # noqa + elif mode == 'x': + self.fp = file.open(mode='x+b') # type: ignore # noqa + elif mode == 'a': + self.fp = file.open(mode='r+b') # type: ignore # noqa + else: + raise ValueError("File open error.") + self.mode = mode + elif isinstance(file, io.IOBase): + self._filePassed = True + self.fp = file + self.filename = getattr(file, 'name', None) + self.mode = mode # type: ignore #noqa + else: + raise TypeError("invalid file: {}".format(type(file))) + self._fileRefCnt = 1 + try: + if mode == "r": + self._real_get_contents(self.fp) + self._reset_worker() + elif mode in 'w': + # FIXME: check filters here + self.folder = self._create_folder(filters) + self.files = ArchiveFileList() + self._prepare_write() + self._reset_worker() + elif mode in 'x': + raise NotImplementedError + elif mode == 'a': + raise NotImplementedError + else: + raise ValueError("Mode must be 'r', 'w', 'x', or 'a'") + except Exception as e: + self._fpclose() + raise e + self.encoded_header_mode = False + self._dict = {} # type: Dict[str, IO[Any]] + self.dereference = dereference + self.reporterd = None # type: Optional[threading.Thread] + self.q = queue.Queue() # type: queue.Queue[Any] + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def _create_folder(self, filters): + folder = Folder() + folder.compressor = SevenZipCompressor(filters) + folder.coders = folder.compressor.coders + folder.solid = True + folder.digestdefined = False + folder.bindpairs = [] + folder.totalin = 1 + folder.totalout = 1 + return folder + + def _fpclose(self) -> None: + assert self._fileRefCnt > 0 + self._fileRefCnt -= 1 + if not self._fileRefCnt and not self._filePassed: + self.fp.close() + + def _real_get_contents(self, fp: BinaryIO) -> None: + if not self._check_7zfile(fp): + raise Bad7zFile('not a 7z file') + self.sig_header = SignatureHeader.retrieve(self.fp) + self.afterheader = self.fp.tell() + buffer = self._read_header_data() + header = Header.retrieve(self.fp, buffer, self.afterheader) + if header is None: + return + self.header = header + buffer.close() + self.files = ArchiveFileList() + if getattr(self.header, 'files_info', None) is not None: + self._filelist_retrieve() + + def _read_header_data(self) -> BytesIO: + self.fp.seek(self.sig_header.nextheaderofs, os.SEEK_CUR) + buffer = io.BytesIO(self.fp.read(self.sig_header.nextheadersize)) + if self.sig_header.nextheadercrc != calculate_crc32(buffer.getvalue()): + raise Bad7zFile('invalid header data') + return buffer + + class ParseStatus: + def __init__(self, src_pos=0): + self.src_pos = src_pos + self.folder = 0 # 7zip folder where target stored + self.outstreams = 0 # output stream count + self.input = 0 # unpack stream count in each folder + self.stream = 0 # target input stream position + + def _gen_filename(self) -> str: + # compressed file is stored without a name, generate one + try: + basefilename = self.filename + except AttributeError: + # 7z archive file doesn't have a name + return 'contents' + else: + if basefilename is not None: + fn, ext = os.path.splitext(os.path.basename(basefilename)) + return fn + else: + return 'contents' + + def _get_fileinfo_sizes(self, pstat, subinfo, packinfo, folder, packsizes, unpacksizes, file_in_solid, numinstreams): + if pstat.input == 0: + folder.solid = subinfo.num_unpackstreams_folders[pstat.folder] > 1 + maxsize = (folder.solid and packinfo.packsizes[pstat.stream]) or None + uncompressed = unpacksizes[pstat.outstreams] + if not isinstance(uncompressed, (list, tuple)): + uncompressed = [uncompressed] * len(folder.coders) + if file_in_solid > 0: + compressed = None + elif pstat.stream < len(packsizes): # file is compressed + compressed = packsizes[pstat.stream] + else: # file is not compressed + compressed = uncompressed + packsize = packsizes[pstat.stream:pstat.stream + numinstreams] + return maxsize, compressed, uncompressed, packsize, folder.solid + + def _filelist_retrieve(self) -> None: + # Initialize references for convenience + if hasattr(self.header, 'main_streams') and self.header.main_streams is not None: + folders = self.header.main_streams.unpackinfo.folders + packinfo = self.header.main_streams.packinfo + subinfo = self.header.main_streams.substreamsinfo + packsizes = packinfo.packsizes + unpacksizes = subinfo.unpacksizes if subinfo.unpacksizes is not None else [x.unpacksizes for x in folders] + else: + subinfo = None + folders = None + packinfo = None + packsizes = [] + unpacksizes = [0] + + pstat = self.ParseStatus() + pstat.src_pos = self.afterheader + file_in_solid = 0 + + for file_id, file_info in enumerate(self.header.files_info.files): + if not file_info['emptystream'] and folders is not None: + folder = folders[pstat.folder] + numinstreams = max([coder.get('numinstreams', 1) for coder in folder.coders]) + (maxsize, compressed, uncompressed, + packsize, solid) = self._get_fileinfo_sizes(pstat, subinfo, packinfo, folder, packsizes, + unpacksizes, file_in_solid, numinstreams) + pstat.input += 1 + folder.solid = solid + file_info['folder'] = folder + file_info['maxsize'] = maxsize + file_info['compressed'] = compressed + file_info['uncompressed'] = uncompressed + file_info['packsizes'] = packsize + if subinfo.digestsdefined[pstat.outstreams]: + file_info['digest'] = subinfo.digests[pstat.outstreams] + if folder is None: + pstat.src_pos += file_info['compressed'] + else: + if folder.solid: + file_in_solid += 1 + pstat.outstreams += 1 + if folder.files is None: + folder.files = ArchiveFileList(offset=file_id) + folder.files.append(file_info) + if pstat.input >= subinfo.num_unpackstreams_folders[pstat.folder]: + file_in_solid = 0 + pstat.src_pos += sum(packinfo.packsizes[pstat.stream:pstat.stream + numinstreams]) + pstat.folder += 1 + pstat.stream += numinstreams + pstat.input = 0 + else: + file_info['folder'] = None + file_info['maxsize'] = 0 + file_info['compressed'] = 0 + file_info['uncompressed'] = [0] + file_info['packsizes'] = [0] + + if 'filename' not in file_info: + file_info['filename'] = self._gen_filename() + self.files.append(file_info) + + def _num_files(self) -> int: + if getattr(self.header, 'files_info', None) is not None: + return len(self.header.files_info.files) + return 0 + + def _set_file_property(self, outfilename: pathlib.Path, properties: Dict[str, Any]) -> None: + # creation time + creationtime = ArchiveTimestamp(properties['lastwritetime']).totimestamp() + if creationtime is not None: + os.utime(str(outfilename), times=(creationtime, creationtime)) + if os.name == 'posix': + st_mode = properties['posix_mode'] + if st_mode is not None: + outfilename.chmod(st_mode) + return + # fallback: only set readonly if specified + if properties['readonly'] and not properties['is_directory']: + ro_mask = 0o777 ^ (stat.S_IWRITE | stat.S_IWGRP | stat.S_IWOTH) + outfilename.chmod(outfilename.stat().st_mode & ro_mask) + + def _reset_decompressor(self) -> None: + if self.header.main_streams is not None and self.header.main_streams.unpackinfo.numfolders > 0: + for i, folder in enumerate(self.header.main_streams.unpackinfo.folders): + folder.decompressor = None + + def _reset_worker(self) -> None: + """Seek to where archive data start in archive and recreate new worker.""" + self.fp.seek(self.afterheader) + self.worker = Worker(self.files, self.afterheader, self.header) + + def set_encoded_header_mode(self, mode: bool) -> None: + self.encoded_header_mode = mode + + @staticmethod + def _check_7zfile(fp: Union[BinaryIO, io.BufferedReader]) -> bool: + result = MAGIC_7Z == fp.read(len(MAGIC_7Z))[:len(MAGIC_7Z)] + fp.seek(-len(MAGIC_7Z), 1) + return result + + def _get_method_names(self) -> str: + methods_names = [] # type: List[str] + for folder in self.header.main_streams.unpackinfo.folders: + methods_names += get_methods_names(folder.coders) + return ', '.join(x for x in methods_names) + + def _test_digest_raw(self, pos: int, size: int, crc: int) -> bool: + self.fp.seek(pos) + remaining_size = size + digest = None + while remaining_size > 0: + block = min(READ_BLOCKSIZE, remaining_size) + digest = calculate_crc32(self.fp.read(block), digest) + remaining_size -= block + return digest == crc + + def _test_pack_digest(self) -> bool: + self._reset_worker() + crcs = self.header.main_streams.packinfo.crcs + if crcs is not None and len(crcs) > 0: + # check packed stream's crc + for i, p in enumerate(self.header.main_streams.packinfo.packpositions): + if not self._test_digest_raw(p, self.header.main_streams.packinfo.packsizes[i], crcs[i]): + return False + return True + + def _test_unpack_digest(self) -> bool: + self._reset_worker() + for f in self.files: + self.worker.register_filelike(f.id, None) + try: + self.worker.extract(self.fp, parallel=(not self.password_protected)) # TODO: print progress + except Bad7zFile: + return False + else: + return True + + def _test_digests(self) -> bool: + if self._test_pack_digest(): + if self._test_unpack_digest(): + return True + return False + + def _prepare_write(self) -> None: + self.sig_header = SignatureHeader() + self.sig_header._write_skelton(self.fp) + self.afterheader = self.fp.tell() + self.folder.totalin = 1 + self.folder.totalout = 1 + self.folder.bindpairs = [] + self.folder.unpacksizes = [] + self.header = Header.build_header([self.folder]) + + def _write_archive(self): + self.worker.archive(self.fp, self.folder, deref=self.dereference) + # Write header and update signature header + (header_pos, header_len, header_crc) = self.header.write(self.fp, self.afterheader, + encoded=self.encoded_header_mode) + self.sig_header.nextheaderofs = header_pos - self.afterheader + self.sig_header.calccrc(header_len, header_crc) + self.sig_header.write(self.fp) + return + + def _is_solid(self): + for f in self.header.main_streams.substreamsinfo.num_unpackstreams_folders: + if f > 1: + return True + return False + + def _var_release(self): + self._dict = None + self.files = None + self.folder = None + self.header = None + self.worker = None + self.sig_header = None + + @staticmethod + def _make_file_info(target: pathlib.Path, arcname: Optional[str] = None, dereference=False) -> Dict[str, Any]: + f = {} # type: Dict[str, Any] + f['origin'] = target + if arcname is not None: + f['filename'] = pathlib.Path(arcname).as_posix() + else: + f['filename'] = target.as_posix() + if os.name == 'nt': + fstat = target.lstat() + if target.is_symlink(): + if dereference: + fstat = target.stat() + if stat.S_ISDIR(fstat.st_mode): + f['emptystream'] = True + f['attributes'] = fstat.st_file_attributes & FILE_ATTRIBUTE_WINDOWS_MASK # type: ignore # noqa + else: + f['emptystream'] = False + f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE # type: ignore # noqa + f['uncompressed'] = fstat.st_size + else: + f['emptystream'] = False + f['attributes'] = fstat.st_file_attributes & FILE_ATTRIBUTE_WINDOWS_MASK # type: ignore # noqa + # f['attributes'] |= stat.FILE_ATTRIBUTE_REPARSE_POINT # type: ignore # noqa + elif target.is_dir(): + f['emptystream'] = True + f['attributes'] = fstat.st_file_attributes & FILE_ATTRIBUTE_WINDOWS_MASK # type: ignore # noqa + elif target.is_file(): + f['emptystream'] = False + f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE # type: ignore # noqa + f['uncompressed'] = fstat.st_size + else: + fstat = target.lstat() + if target.is_symlink(): + if dereference: + fstat = target.stat() + if stat.S_ISDIR(fstat.st_mode): + f['emptystream'] = True + f['attributes'] = stat.FILE_ATTRIBUTE_DIRECTORY # type: ignore # noqa + f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IFDIR << 16) + f['attributes'] |= (stat.S_IMODE(fstat.st_mode) << 16) + else: + f['emptystream'] = False + f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE # type: ignore # noqa + f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IMODE(fstat.st_mode) << 16) + else: + f['emptystream'] = False + f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE | stat.FILE_ATTRIBUTE_REPARSE_POINT # type: ignore # noqa + f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IFLNK << 16) + f['attributes'] |= (stat.S_IMODE(fstat.st_mode) << 16) + elif target.is_dir(): + f['emptystream'] = True + f['attributes'] = stat.FILE_ATTRIBUTE_DIRECTORY # type: ignore # noqa + f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IFDIR << 16) + f['attributes'] |= (stat.S_IMODE(fstat.st_mode) << 16) + elif target.is_file(): + f['emptystream'] = False + f['uncompressed'] = fstat.st_size + f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE # type: ignore # noqa + f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IMODE(fstat.st_mode) << 16) + + f['creationtime'] = fstat.st_ctime + f['lastwritetime'] = fstat.st_mtime + f['lastaccesstime'] = fstat.st_atime + return f + + # -------------------------------------------------------------------------- + # The public methods which SevenZipFile provides: + def getnames(self) -> List[str]: + """Return the members of the archive as a list of their names. It has + the same order as the list returned by getmembers(). + """ + return list(map(lambda x: x.filename, self.files)) + + def archiveinfo(self) -> ArchiveInfo: + fstat = os.stat(self.filename) + uncompressed = 0 + for f in self.files: + uncompressed += f.uncompressed_size + return ArchiveInfo(self.filename, fstat.st_size, self.header.size, self._get_method_names(), + self._is_solid(), len(self.header.main_streams.unpackinfo.folders), + uncompressed) + + def list(self) -> List[FileInfo]: + """Returns contents information """ + alist = [] # type: List[FileInfo] + creationtime = None # type: Optional[datetime.datetime] + for f in self.files: + if f.lastwritetime is not None: + creationtime = filetime_to_dt(f.lastwritetime) + alist.append(FileInfo(f.filename, f.compressed, f.uncompressed_size, f.archivable, f.is_directory, + creationtime)) + return alist + + def test(self) -> bool: + """Test archive using CRC digests.""" + return self._test_digests() + + def readall(self) -> Optional[Dict[str, IO[Any]]]: + return self._extract(path=None, return_dict=True) + + def extractall(self, path: Optional[Any] = None, callback: Optional[ExtractCallback] = None) -> None: + """Extract all members from the archive to the current working + directory and set owner, modification time and permissions on + directories afterwards. `path' specifies a different directory + to extract to. + """ + self._extract(path=path, return_dict=False, callback=callback) + + def read(self, targets: Optional[List[str]] = None) -> Optional[Dict[str, IO[Any]]]: + return self._extract(path=None, targets=targets, return_dict=True) + + def extract(self, path: Optional[Any] = None, targets: Optional[List[str]] = None) -> None: + self._extract(path, targets, return_dict=False) + + def _extract(self, path: Optional[Any] = None, targets: Optional[List[str]] = None, + return_dict: bool = False, callback: Optional[ExtractCallback] = None) -> Optional[Dict[str, IO[Any]]]: + if callback is not None and not isinstance(callback, ExtractCallback): + raise ValueError('Callback specified is not a subclass of py7zr.callbacks.ExtractCallback class') + elif callback is not None: + self.reporterd = threading.Thread(target=self.reporter, args=(callback,), daemon=True) + self.reporterd.start() + target_junction = [] # type: List[pathlib.Path] + target_sym = [] # type: List[pathlib.Path] + target_files = [] # type: List[Tuple[pathlib.Path, Dict[str, Any]]] + target_dirs = [] # type: List[pathlib.Path] + if path is not None: + if isinstance(path, str): + path = pathlib.Path(path) + try: + if not path.exists(): + path.mkdir(parents=True) + else: + pass + except OSError as e: + if e.errno == errno.EEXIST and path.is_dir(): + pass + else: + raise e + fnames = [] # type: List[str] # check duplicated filename in one archive? + self.q.put(('pre', None, None)) + for f in self.files: + # TODO: sanity check + # check whether f.filename with invalid characters: '../' + if f.filename.startswith('../'): + raise Bad7zFile + # When archive has a multiple files which have same name + # To guarantee order of archive, multi-thread decompression becomes off. + # Currently always overwrite by latter archives. + # TODO: provide option to select overwrite or skip. + if f.filename not in fnames: + outname = f.filename + else: + i = 0 + while True: + outname = f.filename + '_%d' % i + if outname not in fnames: + break + fnames.append(outname) + if path is not None: + outfilename = path.joinpath(outname) + else: + outfilename = pathlib.Path(outname) + if os.name == 'nt': + if outfilename.is_absolute(): + # hack for microsoft windows path length limit < 255 + outfilename = pathlib.WindowsPath('\\\\?\\' + str(outfilename)) + if targets is not None and f.filename not in targets: + self.worker.register_filelike(f.id, None) + continue + if f.is_directory: + if not outfilename.exists(): + target_dirs.append(outfilename) + target_files.append((outfilename, f.file_properties())) + else: + pass + elif f.is_socket: + pass + elif return_dict: + fname = outfilename.as_posix() + _buf = io.BytesIO() + self._dict[fname] = _buf + self.worker.register_filelike(f.id, MemIO(_buf)) + elif f.is_symlink: + target_sym.append(outfilename) + try: + if outfilename.exists(): + outfilename.unlink() + except OSError as ose: + if ose.errno not in [errno.ENOENT]: + raise + self.worker.register_filelike(f.id, outfilename) + elif f.is_junction: + target_junction.append(outfilename) + self.worker.register_filelike(f.id, outfilename) + else: + self.worker.register_filelike(f.id, outfilename) + target_files.append((outfilename, f.file_properties())) + for target_dir in sorted(target_dirs): + try: + target_dir.mkdir() + except FileExistsError: + if target_dir.is_dir(): + # skip rare case + pass + elif target_dir.is_file(): + raise Exception("Directory name is existed as a normal file.") + else: + raise Exception("Directory making fails on unknown condition.") + + if callback is not None: + self.worker.extract(self.fp, parallel=(not self.password_protected and not self._filePassed), q=self.q) + else: + self.worker.extract(self.fp, parallel=(not self.password_protected and not self._filePassed)) + + self.q.put(('post', None, None)) + if return_dict: + return self._dict + else: + # create symbolic links on target path as a working directory. + # if path is None, work on current working directory. + for t in target_sym: + sym_dst = t.resolve() + with sym_dst.open('rb') as b: + sym_src = b.read().decode(encoding='utf-8') # symlink target name stored in utf-8 + sym_dst.unlink() # unlink after close(). + sym_dst.symlink_to(pathlib.Path(sym_src)) + # create junction point only on windows platform + if sys.platform.startswith('win'): + for t in target_junction: + junction_dst = t.resolve() + with junction_dst.open('rb') as b: + junction_target = pathlib.Path(b.read().decode(encoding='utf-8')) + junction_dst.unlink() + _winapi.CreateJunction(junction_target, str(junction_dst)) # type: ignore # noqa + # set file properties + for o, p in target_files: + self._set_file_property(o, p) + return None + + def reporter(self, callback: ExtractCallback): + while True: + try: + item: Optional[Tuple[str, str, str]] = self.q.get(timeout=1) + except queue.Empty: + pass + else: + if item is None: + break + elif item[0] == 's': + callback.report_start(item[1], item[2]) + elif item[0] == 'e': + callback.report_end(item[1], item[2]) + elif item[0] == 'pre': + callback.report_start_preparation() + elif item[0] == 'post': + callback.report_postprocess() + elif item[0] == 'w': + callback.report_warning(item[1]) + else: + pass + self.q.task_done() + + def writeall(self, path: Union[pathlib.Path, str], arcname: Optional[str] = None): + """Write files in target path into archive.""" + if isinstance(path, str): + path = pathlib.Path(path) + if not path.exists(): + raise ValueError("specified path does not exist.") + if path.is_dir() or path.is_file(): + self._writeall(path, arcname) + else: + raise ValueError("specified path is not a directory or a file") + + def _writeall(self, path, arcname): + try: + if path.is_symlink() and not self.dereference: + self.write(path, arcname) + elif path.is_file(): + self.write(path, arcname) + elif path.is_dir(): + if not path.samefile('.'): + self.write(path, arcname) + for nm in sorted(os.listdir(str(path))): + arc = os.path.join(arcname, nm) if arcname is not None else None + self._writeall(path.joinpath(nm), arc) + else: + return # pathlib ignores ELOOP and return False for is_*(). + except OSError as ose: + if self.dereference and ose.errno in [errno.ELOOP]: + return # ignore ELOOP here, this resulted to stop looped symlink reference. + elif self.dereference and sys.platform == 'win32' and ose.errno in [errno.ENOENT]: + return # ignore ENOENT which is happened when a case of ELOOP on windows. + else: + raise + + def write(self, file: Union[pathlib.Path, str], arcname: Optional[str] = None): + """Write single target file into archive(Not implemented yet).""" + if isinstance(file, str): + path = pathlib.Path(file) + elif isinstance(file, pathlib.Path): + path = file + else: + raise ValueError("Unsupported file type.") + file_info = self._make_file_info(path, arcname, self.dereference) + self.files.append(file_info) + + def close(self): + """Flush all the data into archive and close it. + When close py7zr start reading target and writing actual archive file. + """ + if 'w' in self.mode: + self._write_archive() + if 'r' in self.mode: + if self.reporterd is not None: + self.q.put_nowait(None) + self.reporterd.join(1) + if self.reporterd.is_alive(): + raise InternalError("Progress report thread terminate error.") + self.reporterd = None + self._fpclose() + self._var_release() + + def reset(self) -> None: + """When read mode, it reset file pointer, decompress worker and decompressor""" + if self.mode == 'r': + self._reset_worker() + self._reset_decompressor() + + +# -------------------- +# exported functions +# -------------------- +def is_7zfile(file: Union[BinaryIO, str, pathlib.Path]) -> bool: + """Quickly see if a file is a 7Z file by checking the magic number. + The file argument may be a filename or file-like object too. + """ + result = False + try: + if isinstance(file, io.IOBase) and hasattr(file, "read"): + result = SevenZipFile._check_7zfile(file) # type: ignore # noqa + elif isinstance(file, str): + with open(file, 'rb') as fp: + result = SevenZipFile._check_7zfile(fp) + elif isinstance(file, pathlib.Path) or isinstance(file, pathlib.PosixPath) or \ + isinstance(file, pathlib.WindowsPath): + with file.open(mode='rb') as fp: # type: ignore # noqa + result = SevenZipFile._check_7zfile(fp) + else: + raise TypeError('invalid type: file should be str, pathlib.Path or BinaryIO, but {}'.format(type(file))) + except OSError: + pass + return result + + +def unpack_7zarchive(archive, path, extra=None): + """Function for registering with shutil.register_unpack_format()""" + arc = SevenZipFile(archive) + arc.extractall(path) + arc.close() + + +def pack_7zarchive(base_name, base_dir, owner=None, group=None, dry_run=None, logger=None): + """Function for registering with shutil.register_archive_format()""" + target_name = '{}.7z'.format(base_name) + archive = SevenZipFile(target_name, mode='w') + archive.writeall(path=base_dir) + archive.close() diff --git a/libs/py7zr/win32compat.py b/libs/py7zr/win32compat.py new file mode 100644 index 000000000..dc72bfdf3 --- /dev/null +++ b/libs/py7zr/win32compat.py @@ -0,0 +1,174 @@ +import pathlib +import stat +import sys +from logging import getLogger +from typing import Union + +if sys.platform == "win32": + import ctypes + from ctypes.wintypes import BOOL, DWORD, HANDLE, LPCWSTR, LPDWORD, LPVOID, LPWSTR + + _stdcall_libraries = {} + _stdcall_libraries['kernel32'] = ctypes.WinDLL('kernel32') + CloseHandle = _stdcall_libraries['kernel32'].CloseHandle + CreateFileW = _stdcall_libraries['kernel32'].CreateFileW + DeviceIoControl = _stdcall_libraries['kernel32'].DeviceIoControl + GetFileAttributesW = _stdcall_libraries['kernel32'].GetFileAttributesW + OPEN_EXISTING = 3 + GENERIC_READ = 2147483648 + FILE_FLAG_OPEN_REPARSE_POINT = 0x00200000 + FSCTL_GET_REPARSE_POINT = 0x000900A8 + FILE_FLAG_BACKUP_SEMANTICS = 0x02000000 + IO_REPARSE_TAG_MOUNT_POINT = 0xA0000003 + IO_REPARSE_TAG_SYMLINK = 0xA000000C + MAXIMUM_REPARSE_DATA_BUFFER_SIZE = 16 * 1024 + + def _check_bit(val: int, flag: int) -> bool: + return bool(val & flag == flag) + + class SymbolicLinkReparseBuffer(ctypes.Structure): + """ Implementing the below in Python: + + typedef struct _REPARSE_DATA_BUFFER { + ULONG ReparseTag; + USHORT ReparseDataLength; + USHORT Reserved; + union { + struct { + USHORT SubstituteNameOffset; + USHORT SubstituteNameLength; + USHORT PrintNameOffset; + USHORT PrintNameLength; + ULONG Flags; + WCHAR PathBuffer[1]; + } SymbolicLinkReparseBuffer; + struct { + USHORT SubstituteNameOffset; + USHORT SubstituteNameLength; + USHORT PrintNameOffset; + USHORT PrintNameLength; + WCHAR PathBuffer[1]; + } MountPointReparseBuffer; + struct { + UCHAR DataBuffer[1]; + } GenericReparseBuffer; + } DUMMYUNIONNAME; + } REPARSE_DATA_BUFFER, *PREPARSE_DATA_BUFFER; + """ + # See https://docs.microsoft.com/en-us/windows-hardware/drivers/ddi/content/ntifs/ns-ntifs-_reparse_data_buffer + _fields_ = [ + ('flags', ctypes.c_ulong), + ('path_buffer', ctypes.c_byte * (MAXIMUM_REPARSE_DATA_BUFFER_SIZE - 20)) + ] + + class MountReparseBuffer(ctypes.Structure): + _fields_ = [ + ('path_buffer', ctypes.c_byte * (MAXIMUM_REPARSE_DATA_BUFFER_SIZE - 16)), + ] + + class ReparseBufferField(ctypes.Union): + _fields_ = [ + ('symlink', SymbolicLinkReparseBuffer), + ('mount', MountReparseBuffer) + ] + + class ReparseBuffer(ctypes.Structure): + _anonymous_ = ("u",) + _fields_ = [ + ('reparse_tag', ctypes.c_ulong), + ('reparse_data_length', ctypes.c_ushort), + ('reserved', ctypes.c_ushort), + ('substitute_name_offset', ctypes.c_ushort), + ('substitute_name_length', ctypes.c_ushort), + ('print_name_offset', ctypes.c_ushort), + ('print_name_length', ctypes.c_ushort), + ('u', ReparseBufferField) + ] + + def is_reparse_point(path: Union[str, pathlib.Path]) -> bool: + GetFileAttributesW.argtypes = [LPCWSTR] + GetFileAttributesW.restype = DWORD + return _check_bit(GetFileAttributesW(str(path)), stat.FILE_ATTRIBUTE_REPARSE_POINT) + + def readlink(path: Union[str, pathlib.Path]) -> Union[str, pathlib.WindowsPath]: + # FILE_FLAG_OPEN_REPARSE_POINT alone is not enough if 'path' + # is a symbolic link to a directory or a NTFS junction. + # We need to set FILE_FLAG_BACKUP_SEMANTICS as well. + # See https://docs.microsoft.com/en-us/windows/desktop/api/fileapi/nf-fileapi-createfilea + + # description from _winapi.c:601 + # /* REPARSE_DATA_BUFFER usage is heavily under-documented, especially for + # junction points. Here's what I've learned along the way: + # - A junction point has two components: a print name and a substitute + # name. They both describe the link target, but the substitute name is + # the physical target and the print name is shown in directory listings. + # - The print name must be a native name, prefixed with "\??\". + # - Both names are stored after each other in the same buffer (the + # PathBuffer) and both must be NUL-terminated. + # - There are four members defining their respective offset and length + # inside PathBuffer: SubstituteNameOffset, SubstituteNameLength, + # PrintNameOffset and PrintNameLength. + # - The total size we need to allocate for the REPARSE_DATA_BUFFER, thus, + # is the sum of: + # - the fixed header size (REPARSE_DATA_BUFFER_HEADER_SIZE) + # - the size of the MountPointReparseBuffer member without the PathBuffer + # - the size of the prefix ("\??\") in bytes + # - the size of the print name in bytes + # - the size of the substitute name in bytes + # - the size of two NUL terminators in bytes */ + + target_is_path = isinstance(path, pathlib.Path) + if target_is_path: + target = str(path) + else: + target = path + CreateFileW.argtypes = [LPWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE] + CreateFileW.restype = HANDLE + DeviceIoControl.argtypes = [HANDLE, DWORD, LPVOID, DWORD, LPVOID, DWORD, LPDWORD, LPVOID] + DeviceIoControl.restype = BOOL + handle = HANDLE(CreateFileW(target, GENERIC_READ, 0, None, OPEN_EXISTING, + FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OPEN_REPARSE_POINT, 0)) + buf = ReparseBuffer() + ret = DWORD(0) + status = DeviceIoControl(handle, FSCTL_GET_REPARSE_POINT, None, 0, ctypes.byref(buf), + MAXIMUM_REPARSE_DATA_BUFFER_SIZE, ctypes.byref(ret), None) + CloseHandle(handle) + if not status: + logger = getLogger(__file__) + logger.error("Failed IOCTL access to REPARSE_POINT {})".format(target)) + raise ValueError("not a symbolic link or access permission violation") + + if buf.reparse_tag == IO_REPARSE_TAG_SYMLINK: + offset = buf.substitute_name_offset + ending = offset + buf.substitute_name_length + rpath = bytearray(buf.symlink.path_buffer)[offset:ending].decode('UTF-16-LE') + elif buf.reparse_tag == IO_REPARSE_TAG_MOUNT_POINT: + offset = buf.substitute_name_offset + ending = offset + buf.substitute_name_length + rpath = bytearray(buf.mount.path_buffer)[offset:ending].decode('UTF-16-LE') + else: + raise ValueError("not a symbolic link") + # on posixmodule.c:7859 in py38, we do that + # ``` + # else if (rdb->ReparseTag == IO_REPARSE_TAG_MOUNT_POINT) + # { + # name = (wchar_t *)((char*)rdb->MountPointReparseBuffer.PathBuffer + + # rdb->MountPointReparseBuffer.SubstituteNameOffset); + # nameLen = rdb->MountPointReparseBuffer.SubstituteNameLength / sizeof(wchar_t); + # } + # else + # { + # PyErr_SetString(PyExc_ValueError, "not a symbolic link"); + # } + # if (nameLen > 4 && wcsncmp(name, L"\\??\\", 4) == 0) { + # /* Our buffer is mutable, so this is okay */ + # name[1] = L'\\'; + # } + # ``` + # so substitute prefix here. + if rpath.startswith('\\??\\'): + rpath = '\\\\' + rpath[2:] + if target_is_path: + return pathlib.WindowsPath(rpath) + else: + return rpath