Add py7zr library, version 0.7.0

pull/1247/head
josdion 4 years ago
parent 99a98a564a
commit 8a981553bf

@ -0,0 +1,29 @@
#!/usr/bin/env python
#
# Pure python p7zr implementation
# Copyright (C) 2019 Hiroshi Miura
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
from py7zr.exceptions import Bad7zFile, DecompressionError, UnsupportedCompressionMethodError
from py7zr.py7zr import ArchiveInfo, FileInfo, SevenZipFile, is_7zfile, pack_7zarchive, unpack_7zarchive
# Package metadata, re-exported for API consumers.
__copyright__ = 'Copyright (C) 2019 Hiroshi Miura'
__version__ = "0.7.0"
# Names that form the public API of the py7zr package.
__all__ = ['__version__', 'ArchiveInfo', 'FileInfo', 'SevenZipFile', 'is_7zfile',
           'UnsupportedCompressionMethodError', 'Bad7zFile', 'DecompressionError',
           'pack_7zarchive', 'unpack_7zarchive']

File diff suppressed because it is too large Load Diff

@ -0,0 +1,61 @@
#!/usr/bin/python -u
#
# p7zr library
#
# Copyright (c) 2020 Hiroshi Miura <miurahr@linux.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
from abc import ABC, abstractmethod
class Callback(ABC):
    """Abstract base class for progress callbacks."""

    @abstractmethod
    def report_start_preparation(self):
        """Report a start of preparation event such as making list of files and looking into its properties."""
        pass

    @abstractmethod
    def report_start(self, processing_file_path, processing_bytes):
        """Report a start event of specified archive file and its input bytes."""
        pass

    @abstractmethod
    def report_end(self, processing_file_path, wrote_bytes):
        """Report an end event of specified archive file and its output bytes."""
        pass

    @abstractmethod
    def report_warning(self, message):
        """Report a warning event with its message."""
        pass

    @abstractmethod
    def report_postprocess(self):
        """Report a start of post processing event such as set file properties and permissions or creating symlinks."""
        pass
class ExtractCallback(Callback):
    """Abstract base class for extraction progress callbacks."""
    pass
class ArchiveCallback(Callback):
    """Abstract base class for archiving progress callbacks."""
    pass

@ -0,0 +1,384 @@
#!/usr/bin/python -u
#
# p7zr library
#
# Copyright (c) 2019 Hiroshi Miura <miurahr@linux.com>
# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de
# 7-Zip Copyright (C) 1999-2010 Igor Pavlov
# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
import bz2
import io
import lzma
import os
import queue
import sys
import threading
from typing import IO, Any, BinaryIO, Dict, List, Optional, Union
from py7zr import UnsupportedCompressionMethodError
from py7zr.extra import CopyDecompressor, DeflateDecompressor, ISevenZipDecompressor, ZstdDecompressor
from py7zr.helpers import MemIO, NullIO, calculate_crc32, readlink
from py7zr.properties import READ_BLOCKSIZE, ArchivePassword, CompressionMethod
if sys.version_info < (3, 6):
import pathlib2 as pathlib
else:
import pathlib
try:
import zstandard as Zstd # type: ignore
except ImportError:
Zstd = None
class Worker:
    """Extract worker class to invoke handler"""

    def __init__(self, files, src_start: int, header) -> None:
        # Map of file id -> extraction target (in-memory IO, filesystem path, or None).
        self.target_filepath = {}  # type: Dict[int, Union[MemIO, pathlib.Path, None]]
        self.files = files
        # Absolute offset where packed data starts inside the archive file.
        self.src_start = src_start
        self.header = header

    def extract(self, fp: BinaryIO, parallel: bool, q=None) -> None:
        """Extract worker method to handle 7zip folder and decompress each files.

        :param fp: opened archive file object.
        :param parallel: when True, decompress each folder in its own thread.
        :param q: optional progress queue receiving ('s'|'e', filename, size) tuples.
        """
        if hasattr(self.header, 'main_streams') and self.header.main_streams is not None:
            src_end = self.src_start + self.header.main_streams.packinfo.packpositions[-1]
            numfolders = self.header.main_streams.unpackinfo.numfolders
            if numfolders == 1:
                self.extract_single(fp, self.files, self.src_start, src_end, q)
            else:
                folders = self.header.main_streams.unpackinfo.folders
                positions = self.header.main_streams.packinfo.packpositions
                empty_files = [f for f in self.files if f.emptystream]
                if not parallel:
                    # Sequential mode: empty files first, then one pass per folder.
                    self.extract_single(fp, empty_files, 0, 0, q)
                    for i in range(numfolders):
                        self.extract_single(fp, folders[i].files, self.src_start + positions[i],
                                            self.src_start + positions[i + 1], q)
                else:
                    # Parallel mode: each thread re-opens the archive by name so every
                    # folder gets its own independent file position.
                    filename = getattr(fp, 'name', None)
                    # NOTE(review): this file object is never closed — looks like a leak; confirm.
                    self.extract_single(open(filename, 'rb'), empty_files, 0, 0, q)
                    extract_threads = []
                    for i in range(numfolders):
                        p = threading.Thread(target=self.extract_single,
                                             args=(filename, folders[i].files,
                                                   self.src_start + positions[i], self.src_start + positions[i + 1], q))
                        p.start()
                        extract_threads.append((p))
                    for p in extract_threads:
                        p.join()
        else:
            # No packed streams: the archive holds only empty files/directories.
            empty_files = [f for f in self.files if f.emptystream]
            self.extract_single(fp, empty_files, 0, 0, q)

    def extract_single(self, fp: Union[BinaryIO, str], files, src_start: int, src_end: int,
                       q: Optional[queue.Queue]) -> None:
        """Single thread extractor that takes file lists in single 7zip folder.

        :param fp: archive file object, or a path string (worker threads pass a
                   path and a private handle is opened here).
        :param files: file entries that belong to one 7zip folder.
        :param src_start: offset of the folder's packed data in the archive.
        :param src_end: end offset of the folder's packed data.
        :param q: optional progress queue.
        """
        if files is None:
            return
        if isinstance(fp, str):
            fp = open(fp, 'rb')
        fp.seek(src_start)
        for f in files:
            if q is not None:
                # 's' = started processing this file.
                q.put(('s', str(f.filename), str(f.compressed) if f.compressed is not None else '0'))
            fileish = self.target_filepath.get(f.id, None)
            if fileish is not None:
                fileish.parent.mkdir(parents=True, exist_ok=True)
                with fileish.open(mode='wb') as ofp:
                    if not f.emptystream:
                        # extract to file
                        self.decompress(fp, f.folder, ofp, f.uncompressed[-1], f.compressed, src_end)
                        ofp.seek(0)
                    else:
                        pass  # just create empty file
            elif not f.emptystream:
                # read and bin off a data but check crc
                with NullIO() as ofp:
                    self.decompress(fp, f.folder, ofp, f.uncompressed[-1], f.compressed, src_end)
            if q is not None:
                # 'e' = finished processing this file.
                q.put(('e', str(f.filename), str(f.uncompressed[-1])))

    def decompress(self, fp: BinaryIO, folder, fq: IO[Any],
                   size: int, compressed_size: Optional[int], src_end: int) -> None:
        """decompressor wrapper called from extract method.

           :parameter fp: archive source file pointer
           :parameter folder: Folder object that have decompressor object.
           :parameter fq: output file pathlib.Path
           :parameter size: uncompressed size of target file.
           :parameter compressed_size: compressed size of target file.
           :parameter src_end: end position of the folder
           :returns None
        """
        assert folder is not None
        out_remaining = size
        decompressor = folder.get_decompressor(compressed_size)
        while out_remaining > 0:
            max_length = min(out_remaining, io.DEFAULT_BUFFER_SIZE)
            # Never read past the end of this folder's packed data.
            rest_size = src_end - fp.tell()
            read_size = min(READ_BLOCKSIZE, rest_size)
            if read_size == 0:
                # Input exhausted: drain data still buffered inside the decompressor.
                tmp = decompressor.decompress(b'', max_length)
                if len(tmp) == 0:
                    raise Exception("decompression get wrong: no output data.")
            else:
                inp = fp.read(read_size)
                tmp = decompressor.decompress(inp, max_length)
            if len(tmp) > 0 and out_remaining >= len(tmp):
                out_remaining -= len(tmp)
                fq.write(tmp)
            if out_remaining <= 0:
                break
        if fp.tell() >= src_end:
            # Folder fully consumed: verify folder CRC when one is recorded.
            # NOTE(review): a CRC mismatch is only printed, not raised — confirm intent.
            if decompressor.crc is not None and not decompressor.check_crc():
                print('\nCRC error! expected: {}, real: {}'.format(decompressor.crc, decompressor.digest))
        return

    def _find_link_target(self, target):
        """Find the target member of a symlink or hardlink member in the archive.

        :param target: path of the link member being archived.
        :returns: archive-relative target when the destination is itself a member,
                  otherwise the link's (posix-normalized) original destination.
        """
        targetname = target.as_posix()  # type: str
        linkname = readlink(targetname)
        # Check windows full path symlinks
        if linkname.startswith("\\\\?\\"):
            linkname = linkname[4:]
        # normalize as posix style
        linkname = pathlib.Path(linkname).as_posix()  # type: str
        member = None
        for j in range(len(self.files)):
            if linkname == self.files[j].origin.as_posix():
                # FIXME: when API user specify arcname, it will break
                member = os.path.relpath(linkname, os.path.dirname(targetname))
                break
        if member is None:
            member = linkname
        return member

    def archive(self, fp: BinaryIO, folder, deref=False):
        """Run archive task for specified 7zip folder.

        :param fp: output archive file object positioned at the packed-data area.
        :param folder: Folder object providing the compressor.
        :param deref: when True, follow symlinks instead of storing link targets.
        """
        compressor = folder.get_compressor()
        outsize = 0
        self.header.main_streams.packinfo.numstreams = 1
        num_unpack_streams = 0
        self.header.main_streams.substreamsinfo.digests = []
        self.header.main_streams.substreamsinfo.digestsdefined = []
        last_file_index = 0
        foutsize = 0
        for i, f in enumerate(self.files):
            file_info = f.file_properties()
            self.header.files_info.files.append(file_info)
            self.header.files_info.emptyfiles.append(f.emptystream)
            foutsize = 0
            if f.is_symlink and not deref:
                # Store the link target text as this member's data.
                last_file_index = i
                num_unpack_streams += 1
                link_target = self._find_link_target(f.origin)  # type: str
                tgt = link_target.encode('utf-8')  # type: bytes
                insize = len(tgt)
                crc = calculate_crc32(tgt, 0)  # type: int
                out = compressor.compress(tgt)
                outsize += len(out)
                foutsize += len(out)
                fp.write(out)
                self.header.main_streams.substreamsinfo.digests.append(crc)
                self.header.main_streams.substreamsinfo.digestsdefined.append(True)
                self.header.main_streams.substreamsinfo.unpacksizes.append(insize)
                self.header.files_info.files[i]['maxsize'] = foutsize
            elif not f.emptystream:
                # Compress regular file contents block by block, tracking size and CRC.
                last_file_index = i
                num_unpack_streams += 1
                insize = 0
                with f.origin.open(mode='rb') as fd:
                    data = fd.read(READ_BLOCKSIZE)
                    insize += len(data)
                    crc = 0
                    while data:
                        crc = calculate_crc32(data, crc)
                        out = compressor.compress(data)
                        outsize += len(out)
                        foutsize += len(out)
                        fp.write(out)
                        data = fd.read(READ_BLOCKSIZE)
                        insize += len(data)
                self.header.main_streams.substreamsinfo.digests.append(crc)
                self.header.main_streams.substreamsinfo.digestsdefined.append(True)
                self.header.files_info.files[i]['maxsize'] = foutsize
                self.header.main_streams.substreamsinfo.unpacksizes.append(insize)
        else:
            # for/else: runs once after the loop — flush the compressor and credit
            # the tail output to the last file that produced data.
            out = compressor.flush()
            outsize += len(out)
            foutsize += len(out)
            fp.write(out)
            if len(self.files) > 0:
                self.header.files_info.files[last_file_index]['maxsize'] = foutsize
        # Update size data in header
        self.header.main_streams.packinfo.packsizes = [outsize]
        folder.unpacksizes = [sum(self.header.main_streams.substreamsinfo.unpacksizes)]
        self.header.main_streams.substreamsinfo.num_unpackstreams_folders = [num_unpack_streams]

    def register_filelike(self, id: int, fileish: Union[MemIO, pathlib.Path, None]) -> None:
        """register file-ish to worker."""
        self.target_filepath[id] = fileish
class SevenZipDecompressor:
    """Main decompressor object which is properly configured and bind to each 7zip folder.
    because 7zip folder can have a custom compression method"""

    # 7zip method id -> stdlib lzma raw-filter id.
    lzma_methods_map = {
        CompressionMethod.LZMA: lzma.FILTER_LZMA1,
        CompressionMethod.LZMA2: lzma.FILTER_LZMA2,
        CompressionMethod.DELTA: lzma.FILTER_DELTA,
        CompressionMethod.P7Z_BCJ: lzma.FILTER_X86,
        CompressionMethod.BCJ_ARM: lzma.FILTER_ARM,
        CompressionMethod.BCJ_ARMT: lzma.FILTER_ARMTHUMB,
        CompressionMethod.BCJ_IA64: lzma.FILTER_IA64,
        CompressionMethod.BCJ_PPC: lzma.FILTER_POWERPC,
        CompressionMethod.BCJ_SPARC: lzma.FILTER_SPARC,
    }

    # Private filter ids for methods the lzma module cannot handle.
    FILTER_BZIP2 = 0x31
    FILTER_ZIP = 0x32
    FILTER_COPY = 0x33
    FILTER_AES = 0x34
    FILTER_ZSTD = 0x35
    alt_methods_map = {
        CompressionMethod.MISC_BZIP2: FILTER_BZIP2,
        CompressionMethod.MISC_DEFLATE: FILTER_ZIP,
        CompressionMethod.COPY: FILTER_COPY,
        CompressionMethod.CRYPT_AES256_SHA256: FILTER_AES,
        CompressionMethod.MISC_ZSTD: FILTER_ZSTD,
    }

    def __init__(self, coders: List[Dict[str, Any]], size: int, crc: Optional[int]) -> None:
        # Get password which was set when creation of py7zr.SevenZipFile object.
        # size: expected number of compressed input bytes for this folder.
        self.input_size = size
        # Compressed bytes fed into the decompressor so far.
        self.consumed = 0  # type: int
        # Expected CRC32 of the uncompressed data (None when not recorded).
        self.crc = crc
        # Running CRC32 of the data produced so far.
        self.digest = None  # type: Optional[int]
        if self._check_lzma_coders(coders):
            self._set_lzma_decompressor(coders)
        else:
            self._set_alternative_decompressor(coders)

    def _check_lzma_coders(self, coders: List[Dict[str, Any]]) -> bool:
        # True when every coder in the chain maps to a stdlib lzma raw filter.
        res = True
        for coder in coders:
            if self.lzma_methods_map.get(coder['method'], None) is None:
                res = False
                break
        return res

    def _set_lzma_decompressor(self, coders: List[Dict[str, Any]]) -> None:
        # Build one raw LZMADecompressor from the coder chain; coders are
        # prepended so the filter order matches what lzma expects.
        filters = []  # type: List[Dict[str, Any]]
        for coder in coders:
            if coder['numinstreams'] != 1 or coder['numoutstreams'] != 1:
                raise UnsupportedCompressionMethodError('Only a simple compression method is currently supported.')
            filter_id = self.lzma_methods_map.get(coder['method'], None)
            if filter_id is None:
                raise UnsupportedCompressionMethodError
            properties = coder.get('properties', None)
            if properties is not None:
                filters[:0] = [lzma._decode_filter_properties(filter_id, properties)]  # type: ignore
            else:
                filters[:0] = [{'id': filter_id}]
        self.decompressor = lzma.LZMADecompressor(format=lzma.FORMAT_RAW, filters=filters)  # type: Union[bz2.BZ2Decompressor, lzma.LZMADecompressor, ISevenZipDecompressor]  # noqa

    def _set_alternative_decompressor(self, coders: List[Dict[str, Any]]) -> None:
        # Select a non-lzma decompressor based on the first coder's method.
        filter_id = self.alt_methods_map.get(coders[0]['method'], None)
        if filter_id == self.FILTER_BZIP2:
            self.decompressor = bz2.BZ2Decompressor()
        elif filter_id == self.FILTER_ZIP:
            self.decompressor = DeflateDecompressor()
        elif filter_id == self.FILTER_COPY:
            self.decompressor = CopyDecompressor()
        elif filter_id == self.FILTER_ZSTD and Zstd:
            self.decompressor = ZstdDecompressor()
        else:
            # NOTE(review): FILTER_AES falls through to here and is rejected —
            # confirm whether encrypted archives are intentionally unsupported.
            raise UnsupportedCompressionMethodError

    def decompress(self, data: bytes, max_length: Optional[int] = None) -> bytes:
        """Feed compressed bytes in and return up to max_length uncompressed bytes."""
        self.consumed += len(data)
        if max_length is not None:
            folder_data = self.decompressor.decompress(data, max_length=max_length)
        else:
            folder_data = self.decompressor.decompress(data)
        # calculate CRC with uncompressed data
        if self.crc is not None:
            self.digest = calculate_crc32(folder_data, self.digest)
        return folder_data

    def check_crc(self):
        """Return True when the running digest matches the expected CRC."""
        return self.crc == self.digest
class SevenZipCompressor:
    """Main compressor object to configured for each 7zip folder."""

    __slots__ = ['filters', 'compressor', 'coders']

    # stdlib lzma filter id -> 7zip method id.
    lzma_methods_map_r = {
        lzma.FILTER_LZMA2: CompressionMethod.LZMA2,
        lzma.FILTER_DELTA: CompressionMethod.DELTA,
        lzma.FILTER_X86: CompressionMethod.P7Z_BCJ,
    }

    def __init__(self, filters=None):
        """Create a raw LZMA compressor; default filter chain is LZMA2 preset 7 extreme."""
        if filters is None:
            filters = [{"id": lzma.FILTER_LZMA2, "preset": 7 | lzma.PRESET_EXTREME}, ]
        self.filters = filters
        self.compressor = lzma.LZMACompressor(format=lzma.FORMAT_RAW, filters=self.filters)
        # Build the 7zip coder descriptors that mirror the lzma filter chain.
        self.coders = []
        for filter_spec in self.filters:
            if filter_spec is None:
                break
            self.coders.append({
                'method': self.lzma_methods_map_r[filter_spec['id']],
                'properties': lzma._encode_filter_properties(filter_spec),
                'numinstreams': 1,
                'numoutstreams': 1,
            })

    def compress(self, data):
        """Feed *data* to the compressor and return the bytes produced so far."""
        return self.compressor.compress(data)

    def flush(self):
        """Finish the stream and return the remaining compressed bytes."""
        return self.compressor.flush()
def get_methods_names(coders: List[dict]) -> List[str]:
    """Return human readable method names for specified coders"""
    # Display names for the known compression method identifiers.
    display_names = {
        CompressionMethod.LZMA2: "LZMA2",
        CompressionMethod.LZMA: "LZMA",
        CompressionMethod.DELTA: "delta",
        CompressionMethod.P7Z_BCJ: "BCJ",
        CompressionMethod.BCJ_ARM: "BCJ(ARM)",
        CompressionMethod.BCJ_ARMT: "BCJ(ARMT)",
        CompressionMethod.BCJ_IA64: "BCJ(IA64)",
        CompressionMethod.BCJ_PPC: "BCJ(POWERPC)",
        CompressionMethod.BCJ_SPARC: "BCJ(SPARC)",
        CompressionMethod.CRYPT_AES256_SHA256: "7zAES",
    }
    result = []  # type: List[str]
    for coder in coders:
        method = coder['method']
        try:
            result.append(display_names[method])
        except KeyError:
            raise UnsupportedCompressionMethodError("Unknown method {}".format(method))
    return result

@ -0,0 +1,42 @@
#
# p7zr library
#
# Copyright (c) 2019 Hiroshi Miura <miurahr@linux.com>
# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de
# 7-Zip Copyright (C) 1999-2010 Igor Pavlov
# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
class ArchiveError(Exception):
    """Base class for all py7zr archive errors."""
    pass
class Bad7zFile(ArchiveError):
    """Raised when a file is not a valid 7z archive."""
    pass
class UnsupportedCompressionMethodError(ArchiveError):
    """Raised when an archive uses a compression method py7zr cannot handle."""
    pass
class DecompressionError(ArchiveError):
    """Raised when decompression of archive data fails."""
    pass
class InternalError(ArchiveError):
    """Raised on inconsistencies that indicate a bug inside py7zr itself."""
    pass

@ -0,0 +1,122 @@
#!/usr/bin/python -u
#
# p7zr library
#
# Copyright (c) 2019 Hiroshi Miura <miurahr@linux.com>
# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de
# 7-Zip Copyright (C) 1999-2010 Igor Pavlov
# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
import lzma
import zlib
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Union
from py7zr import UnsupportedCompressionMethodError
from py7zr.helpers import Buffer, calculate_key
from py7zr.properties import READ_BLOCKSIZE, CompressionMethod
try:
import zstandard as Zstd # type: ignore
except ImportError:
Zstd = None
class ISevenZipCompressor(ABC):
    """Interface for compressor objects bound to a 7zip folder."""

    @abstractmethod
    def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
        """Compress *data* and return whatever compressed bytes are ready."""
        pass

    @abstractmethod
    def flush(self) -> bytes:
        """Finish the stream and return the remaining compressed bytes."""
        pass
class ISevenZipDecompressor(ABC):
    """Interface for decompressor objects bound to a 7zip folder."""

    @abstractmethod
    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Decompress *data*, returning at most *max_length* bytes when it is non-negative."""
        pass
class DeflateDecompressor(ISevenZipDecompressor):
    """Raw DEFLATE decompressor with a carry-over buffer honoring max_length."""

    def __init__(self):
        self.buf = b''
        # wbits=-15 selects a raw deflate stream (no zlib header/trailer).
        self._decompressor = zlib.decompressobj(-15)

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1):
        """Return decompressed bytes; surplus beyond max_length is buffered."""
        pending = self.buf + self._decompressor.decompress(data)
        if max_length < 0:
            self.buf = b''
            return pending
        self.buf = pending[max_length:]
        return pending[:max_length]
class CopyDecompressor(ISevenZipDecompressor):
    """Pass-through 'decompressor' for stored (uncompressed) streams."""

    def __init__(self):
        self._buf = bytes()

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Return up to max_length bytes, draining the carry-over buffer first."""
        wanted = len(data) if max_length < 0 else min(len(data), max_length)
        carried = len(self._buf)
        if wanted > carried:
            # Drain the whole carry-over buffer; take the remainder from data.
            out = self._buf + data[:wanted - carried]
            self._buf = data[wanted - carried:]
        else:
            # Serve entirely from the carry-over buffer; queue all of data.
            out = self._buf[:wanted]
            self._buf = self._buf[wanted:] + data
        return out
class ZstdDecompressor(ISevenZipDecompressor):
    """Zstandard decompressor backed by the optional `zstandard` package.

    :raises UnsupportedCompressionMethodError: when zstandard is not installed.
    """

    def __init__(self):
        if Zstd is None:
            raise UnsupportedCompressionMethodError
        self.buf = b''  # type: bytes
        self._ctc = Zstd.ZstdDecompressor()  # type: ignore
        # BUG FIX: create the streaming decompress object once. The original code
        # built a fresh decompressobj() inside every decompress() call, which
        # discarded stream state and broke frames spanning more than one chunk.
        self._dobj = self._ctc.decompressobj()  # type: ignore

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Return decompressed bytes; surplus beyond max_length is buffered."""
        tmp = self.buf + self._dobj.decompress(data)
        if max_length < 0:
            self.buf = b''
            return tmp
        self.buf = tmp[max_length:]
        return tmp[:max_length]
class ZstdCompressor(ISevenZipCompressor):
    """Zstandard compressor backed by the optional `zstandard` package.

    :raises UnsupportedCompressionMethodError: when zstandard is not installed.
    """

    def __init__(self):
        if Zstd is None:
            raise UnsupportedCompressionMethodError
        self._ctc = Zstd.ZstdCompressor()  # type: ignore

    def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
        """Compress *data* in one shot and return the compressed bytes."""
        return self._ctc.compress(data)  # type: ignore

    def flush(self) -> bytes:
        # BUG FIX: the ISevenZipCompressor interface declares flush() -> bytes and
        # callers do `outsize += len(compressor.flush())`; the original body was
        # `pass`, which returned None and crashed there.
        return b''

@ -0,0 +1,362 @@
#!/usr/bin/python -u
#
# p7zr library
#
# Copyright (c) 2019 Hiroshi Miura <miurahr@linux.com>
# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
#
import _hashlib # type: ignore # noqa
import ctypes
import os
import pathlib
import platform
import sys
import time as _time
import zlib
from datetime import datetime, timedelta, timezone, tzinfo
from typing import BinaryIO, Optional, Union
import py7zr.win32compat
def calculate_crc32(data: bytes, value: Optional[int] = None, blocksize: int = 1024 * 1024) -> int:
    """Calculate CRC32 of strings with arbitrary lengths.

    Processes *data* in chunks of *blocksize* bytes; an existing checksum may be
    passed via *value* to continue a running CRC.
    """
    end = len(data)
    if value:
        crc = zlib.crc32(data[:blocksize], value)
    else:
        crc = zlib.crc32(data[:blocksize])
    offset = blocksize
    while offset < end:
        crc = zlib.crc32(data[offset:offset + blocksize], crc)
        offset += blocksize
    return crc & 0xffffffff
def _calculate_key1(password: bytes, cycles: int, salt: bytes, digest: str) -> bytes:
    """Calculate 7zip AES encryption key (pure-python implementation).

    :param password: password bytes (7zip uses UTF-16-LE encoding upstream).
    :param cycles: cycle-count exponent from the archive header; 0x3f means "no hashing".
    :param salt: salt bytes from the archive header.
    :param digest: hash algorithm name; only 'sha256' is supported.
    :returns: 32-byte AES key.
    :raises ValueError: if digest is not 'sha256'.
    """
    # BUG FIX: the original test was `digest not in ('sha256')` — a plain string,
    # not a tuple (missing comma) — so it performed substring matching and
    # accepted bogus names like 'sha' or ''.
    if digest != 'sha256':
        raise ValueError('Unknown digest method for password protection.')
    assert cycles <= 0x3f
    if cycles == 0x3f:
        # Special "no hash" mode: first 32 bytes of salt+password, zero padded.
        ba = bytearray(salt + password + bytes(32))
        key = bytes(ba[:32])  # type: bytes
    else:
        rounds = 1 << cycles
        m = _hashlib.new(digest)
        for round in range(rounds):
            m.update(salt + password + round.to_bytes(8, byteorder='little', signed=False))
        key = m.digest()[:32]
    return key
def _calculate_key2(password: bytes, cycles: int, salt: bytes, digest: str):
    """Calculate 7zip AES encryption key.

    It utilize ctypes and memoryview buffer and zero-copy technology on Python.

    :param password: password bytes (7zip uses UTF-16-LE encoding upstream).
    :param cycles: cycle-count exponent from the archive header; 0x3f means "no hashing".
    :param salt: salt bytes from the archive header.
    :param digest: hash algorithm name; only 'sha256' is supported.
    :returns: 32-byte AES key.
    :raises ValueError: if digest is not 'sha256'.
    """
    # BUG FIX: `digest not in ('sha256')` tested substring membership in a plain
    # string (the tuple was missing its comma), accepting names like 'sha'.
    if digest != 'sha256':
        raise ValueError('Unknown digest method for password protection.')
    assert cycles <= 0x3f
    if cycles == 0x3f:
        # Special "no hash" mode: first 32 bytes of salt+password, zero padded.
        key = bytes(bytearray(salt + password + bytes(32))[:32])  # type: bytes
    else:
        rounds = 1 << cycles
        m = _hashlib.new(digest)
        length = len(salt) + len(password)

        # One reusable little-endian struct: salt+password followed by a u64
        # round counter, hashed in place each round without re-allocating.
        class RoundBuf(ctypes.LittleEndianStructure):
            _pack_ = 1
            _fields_ = [
                ('saltpassword', ctypes.c_ubyte * length),
                ('round', ctypes.c_uint64)
            ]

        buf = RoundBuf()
        for i, c in enumerate(salt + password):
            buf.saltpassword[i] = c
        buf.round = 0
        mv = memoryview(buf)  # type: ignore # noqa
        while buf.round < rounds:
            m.update(mv)
            buf.round += 1
        key = m.digest()[:32]
    return key
# Choose the key-derivation implementation once at import time.
if platform.python_implementation() == "PyPy":
    calculate_key = _calculate_key1  # Avoid https://foss.heptapod.net/pypy/pypy/issues/3209
else:
    calculate_key = _calculate_key2  # ver2 is 1.7-2.0 times faster than ver1
def filetime_to_dt(ft):
    """Convert Windows NTFS file time into python datetime object.

    FILETIME counts 100-nanosecond intervals since 1601-01-01 (UTC).
    """
    EPOCH_AS_FILETIME = 116444736000000000  # 1970-01-01 expressed as FILETIME
    microseconds = (ft - EPOCH_AS_FILETIME) // 10
    unix_epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return unix_epoch + timedelta(microseconds=microseconds)
# Common timedelta constants used by the tzinfo implementations below.
ZERO = timedelta(0)
HOUR = timedelta(hours=1)
SECOND = timedelta(seconds=1)
# A class capturing the platform's idea of local time.
# (May result in wrong values on historical times in
# timezones where UTC offset and/or the DST rules had
# changed in the past.)
STDOFFSET = timedelta(seconds=-_time.timezone)  # standard (non-DST) UTC offset
if _time.daylight:
    DSTOFFSET = timedelta(seconds=-_time.altzone)
else:
    DSTOFFSET = STDOFFSET
DSTDIFF = DSTOFFSET - STDOFFSET  # extra offset applied while DST is in effect
class LocalTimezone(tzinfo):
    """tzinfo for the platform's local timezone, derived from the time module."""

    def fromutc(self, dt):
        assert dt.tzinfo is self
        stamp = (dt - datetime(1970, 1, 1, tzinfo=self)) // SECOND
        args = _time.localtime(stamp)[:6]
        dst_diff = DSTDIFF // SECOND
        # Detect fold
        fold = (args == _time.localtime(stamp - dst_diff))
        # NOTE(review): `fold` is computed but never passed to datetime() below;
        # looks like it was meant to be datetime(..., fold=fold) — confirm.
        return datetime(*args, microsecond=dt.microsecond, tzinfo=self)

    def utcoffset(self, dt):
        # DST or standard offset depending on whether dt falls in DST.
        if self._isdst(dt):
            return DSTOFFSET
        else:
            return STDOFFSET

    def dst(self, dt):
        if self._isdst(dt):
            return DSTDIFF
        else:
            return ZERO

    def tzname(self, dt):
        return _time.tzname[self._isdst(dt)]

    def _isdst(self, dt):
        # Round-trip through mktime/localtime to ask the C library whether this
        # wall-clock time falls in DST.
        tt = (dt.year, dt.month, dt.day,
              dt.hour, dt.minute, dt.second,
              dt.weekday(), 0, 0)
        stamp = _time.mktime(tt)
        tt = _time.localtime(stamp)
        return tt.tm_isdst > 0
Local = LocalTimezone()  # shared local-timezone instance
# Seconds between the NT FILETIME epoch (1601-01-01) and the Unix epoch (1970-01-01).
TIMESTAMP_ADJUST = -11644473600
class UTC(tzinfo):
    """UTC tzinfo: zero offset, no DST."""

    def utcoffset(self, dt):
        return ZERO

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        return ZERO

    def __call__(self):
        # BUG FIX: this method was misspelled `_call__`, so calling a UTC
        # instance raised TypeError instead of returning the instance.
        return self
class ArchiveTimestamp(int):
    """Windows FILETIME timestamp."""

    def __repr__(self):
        return '{}({:d})'.format(type(self).__name__, int(self))

    def totimestamp(self) -> float:
        """Convert 7z FILETIME to Python timestamp."""
        # FILETIME counts 100-nanosecond ticks since 1601-01-01 (UTC): rescale to
        # seconds, then shift the epoch to 1970-01-01.
        return self / 10000000.0 + TIMESTAMP_ADJUST

    def as_datetime(self):
        """Convert FILETIME to Python datetime object."""
        return datetime.fromtimestamp(self.totimestamp(), UTC())

    @staticmethod
    def from_datetime(val):
        """Build an ArchiveTimestamp from a POSIX timestamp value."""
        return ArchiveTimestamp((val - TIMESTAMP_ADJUST) * 10000000.0)
def islink(path):
    """
    Cross-platform islink implementation.
    Supports Windows NT symbolic links and reparse points.
    """
    is_symlink = os.path.islink(path)
    # os.path.islink is already correct on py3.8+, on non-Windows platforms,
    # and on pre-Vista Windows (no reparse points there).
    if sys.version_info >= (3, 8) or sys.platform != "win32" or sys.getwindowsversion()[0] < 6:
        return is_symlink
    # special check for directory junctions which py38 does.
    if is_symlink:
        if py7zr.win32compat.is_reparse_point(path):
            # Directory junctions are reparse points but not true symlinks.
            is_symlink = False
    return is_symlink
def readlink(path: Union[str, pathlib.Path], *, dir_fd=None) -> Union[str, pathlib.Path]:
    """
    Cross-platform compat implementation of os.readlink and Path.readlink().
    Supports Windows NT symbolic links and reparse points.
    When called with path argument as pathlike(str), return result as a pathlike(str).
    When called with Path object, return also Path object.
    When called with path argument as bytes, return result as a bytes.
    """
    is_path_pathlib = isinstance(path, pathlib.Path)
    if sys.version_info >= (3, 9):
        # Path.readlink() exists from 3.9; os.readlink handles Windows natively.
        if is_path_pathlib and dir_fd is None:
            return path.readlink()
        else:
            return os.readlink(path, dir_fd=dir_fd)
    elif sys.version_info >= (3, 8) or sys.platform != "win32":
        res = os.readlink(path, dir_fd=dir_fd)
        # Hack to handle a wrong type of results
        if isinstance(res, bytes):
            res = os.fsdecode(res)
        if is_path_pathlib:
            return pathlib.Path(res)
        else:
            return res
    elif not os.path.exists(str(path)):
        raise OSError(22, 'Invalid argument', path)
    # Windows with Python < 3.8: fall back to the win32 compatibility helper.
    return py7zr.win32compat.readlink(path)
class MemIO:
    """pathlib.Path-like IO class to write memory(io.Bytes)"""

    def __init__(self, buf: BinaryIO):
        self._buf = buf

    def write(self, data: bytes) -> int:
        return self._buf.write(data)

    def read(self, length: Optional[int] = None) -> bytes:
        if length is None:
            return self._buf.read()
        return self._buf.read(length)

    def close(self) -> None:
        # Rewind instead of closing so the buffer contents stay readable.
        self._buf.seek(0)

    def flush(self) -> None:
        pass

    def seek(self, position: int) -> None:
        self._buf.seek(position)

    def open(self, mode=None):
        # Mimic Path.open(): the object itself acts as the opened file.
        return self

    @property
    def parent(self):
        # Mimic Path.parent so callers can do `target.parent.mkdir(...)`.
        return self

    def mkdir(self, parents=None, exist_ok=False):
        # Nothing to create for an in-memory target.
        return None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass
class NullIO:
    """pathlib.Path-like IO class of /dev/null"""

    def __init__(self):
        pass

    def write(self, data):
        # Pretend everything was written.
        return len(data)

    def read(self, length=None):
        # Zero bytes when a length is requested, empty bytes otherwise.
        return bytes(length) if length is not None else b''

    def close(self):
        pass

    def flush(self):
        pass

    def open(self, mode=None):
        # Mimic Path.open(): the object itself acts as the opened file.
        return self

    @property
    def parent(self):
        # Mimic Path.parent so callers can do `target.parent.mkdir(...)`.
        return self

    def mkdir(self):
        return None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass
class BufferOverflow(Exception):
    """Raised when data added to a Buffer would exceed its fixed capacity."""
    pass
class Buffer:
    """Fixed-capacity byte buffer exposing a memoryview of its current contents."""

    def __init__(self, size: int = 16):
        self._size = size
        self._buf = bytearray(size)
        self._buflen = 0
        self.view = memoryview(self._buf[0:0])

    def add(self, data: Union[bytes, bytearray, memoryview]):
        """Append *data*; raise BufferOverflow when capacity would be exceeded."""
        nbytes = len(data)
        if self._buflen + nbytes > self._size:
            raise BufferOverflow()
        start = self._buflen
        self._buf[start:start + nbytes] = data
        self._buflen = start + nbytes
        self.view = memoryview(self._buf[0:self._buflen])

    def reset(self) -> None:
        """Empty the buffer without releasing its storage."""
        self._buflen = 0
        self.view = memoryview(self._buf[0:0])

    def set(self, data: Union[bytes, bytearray, memoryview]) -> None:
        """Replace the whole contents with *data*."""
        nbytes = len(data)
        if nbytes > self._size:
            raise BufferOverflow()
        self._buf[0:nbytes] = data
        self._buflen = nbytes
        self.view = memoryview(self._buf[0:nbytes])

    def __len__(self) -> int:
        return self._buflen

@ -0,0 +1,155 @@
#
# p7zr library
#
# Copyright (c) 2019 Hiroshi Miura <miurahr@linux.com>
# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de
# 7-Zip Copyright (C) 1999-2010 Igor Pavlov
# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
import binascii
from enum import Enum
from typing import Optional
# Signature bytes at the start of a .7z archive, and the end-marker variant.
MAGIC_7Z = binascii.unhexlify('377abcaf271c')
FINISH_7Z = binascii.unhexlify('377abcaf271d')
# I/O chunk size used by the compress/decompress loops.
# BUG FIX: READ_BLOCKSIZE was assigned twice with the same value; the duplicate
# assignment has been removed.
READ_BLOCKSIZE = 32248
QUEUELEN = READ_BLOCKSIZE * 2
class ByteEnum(bytes, Enum):
    """Enum base whose member values are raw byte strings."""
    pass
class Property(ByteEnum):
    """Hold 7zip property fixed values.

    One-byte property identifiers from the 7z header format.
    """
    END = binascii.unhexlify('00')
    HEADER = binascii.unhexlify('01')
    ARCHIVE_PROPERTIES = binascii.unhexlify('02')
    ADDITIONAL_STREAMS_INFO = binascii.unhexlify('03')
    MAIN_STREAMS_INFO = binascii.unhexlify('04')
    FILES_INFO = binascii.unhexlify('05')
    PACK_INFO = binascii.unhexlify('06')
    UNPACK_INFO = binascii.unhexlify('07')
    SUBSTREAMS_INFO = binascii.unhexlify('08')
    SIZE = binascii.unhexlify('09')
    CRC = binascii.unhexlify('0a')
    FOLDER = binascii.unhexlify('0b')
    CODERS_UNPACK_SIZE = binascii.unhexlify('0c')
    NUM_UNPACK_STREAM = binascii.unhexlify('0d')
    EMPTY_STREAM = binascii.unhexlify('0e')
    EMPTY_FILE = binascii.unhexlify('0f')
    ANTI = binascii.unhexlify('10')
    NAME = binascii.unhexlify('11')
    CREATION_TIME = binascii.unhexlify('12')
    LAST_ACCESS_TIME = binascii.unhexlify('13')
    LAST_WRITE_TIME = binascii.unhexlify('14')
    ATTRIBUTES = binascii.unhexlify('15')
    COMMENT = binascii.unhexlify('16')
    ENCODED_HEADER = binascii.unhexlify('17')
    START_POS = binascii.unhexlify('18')
    DUMMY = binascii.unhexlify('19')
class CompressionMethod(ByteEnum):
    """Hold fixed values for method parameter."""
    # Multi-byte codec ids from the 7z method-id registry; byte values are
    # part of the on-disk format and must not be changed.
    COPY = binascii.unhexlify('00')
    DELTA = binascii.unhexlify('03')
    BCJ = binascii.unhexlify('04')
    PPC = binascii.unhexlify('05')
    IA64 = binascii.unhexlify('06')
    ARM = binascii.unhexlify('07')
    ARMT = binascii.unhexlify('08')
    SPARC = binascii.unhexlify('09')
    # SWAP = 02..
    SWAP2 = binascii.unhexlify('020302')
    SWAP4 = binascii.unhexlify('020304')
    # 7Z = 03..
    LZMA = binascii.unhexlify('030101')
    PPMD = binascii.unhexlify('030401')
    P7Z_BCJ = binascii.unhexlify('03030103')
    P7Z_BCJ2 = binascii.unhexlify('0303011B')
    BCJ_PPC = binascii.unhexlify('03030205')
    BCJ_IA64 = binascii.unhexlify('03030401')
    BCJ_ARM = binascii.unhexlify('03030501')
    BCJ_ARMT = binascii.unhexlify('03030701')
    BCJ_SPARC = binascii.unhexlify('03030805')
    LZMA2 = binascii.unhexlify('21')
    # MISC : 04..
    MISC_ZIP = binascii.unhexlify('0401')
    MISC_BZIP2 = binascii.unhexlify('040202')
    MISC_DEFLATE = binascii.unhexlify('040108')
    MISC_DEFLATE64 = binascii.unhexlify('040109')
    MISC_Z = binascii.unhexlify('0405')
    MISC_LZH = binascii.unhexlify('0406')
    NSIS_DEFLATE = binascii.unhexlify('040901')
    NSIS_BZIP2 = binascii.unhexlify('040902')
    #
    MISC_ZSTD = binascii.unhexlify('04f71101')
    MISC_BROTLI = binascii.unhexlify('04f71102')
    MISC_LZ4 = binascii.unhexlify('04f71104')
    MISC_LZS = binascii.unhexlify('04f71105')
    MISC_LIZARD = binascii.unhexlify('04f71106')
    # CRYPTO 06..
    CRYPT_ZIPCRYPT = binascii.unhexlify('06f10101')
    CRYPT_RAR29AES = binascii.unhexlify('06f10303')
    CRYPT_AES256_SHA256 = binascii.unhexlify('06f10701')
class SupportedMethods:
    """Registry of archive formats and codecs this implementation can handle."""

    # Recognised container formats, keyed by magic bytes.
    formats = [{'name': "7z", 'magic': MAGIC_7Z}]

    # Codec ids mapped to their display names.
    codecs = [
        {'id': CompressionMethod.LZMA, 'name': "LZMA"},
        {'id': CompressionMethod.LZMA2, 'name': "LZMA2"},
        {'id': CompressionMethod.DELTA, 'name': "DELTA"},
        {'id': CompressionMethod.P7Z_BCJ, 'name': "BCJ"},
        {'id': CompressionMethod.BCJ_PPC, 'name': "PPC"},
        {'id': CompressionMethod.BCJ_IA64, 'name': "IA64"},
        {'id': CompressionMethod.BCJ_ARM, 'name': "ARM"},
        {'id': CompressionMethod.BCJ_ARMT, 'name': "ARMT"},
        {'id': CompressionMethod.BCJ_SPARC, 'name': "SPARC"},
    ]
# this class is Borg/Singleton
class ArchivePassword:
    """Process-wide password holder using the Borg pattern.

    Every instance shares the same ``__dict__``, so a password set through
    any instance is visible to all others.
    """

    _shared_state = {
        '_password': None,
    }

    def __init__(self, password: Optional[str] = None):
        # Borg pattern: point this instance's attribute dict at the shared one.
        self.__dict__ = self._shared_state
        if password is not None:
            self._password = password

    def set(self, password):
        """Store *password* in the shared state."""
        self._password = password

    def get(self):
        """Return the stored password, or the empty string when unset."""
        return self._password if self._password is not None else ''

    def __str__(self):
        # Same contract as get(): never None, always a str.
        return self.get()

@ -0,0 +1,974 @@
#!/usr/bin/python -u
#
# p7zr library
#
# Copyright (c) 2019,2020 Hiroshi Miura <miurahr@linux.com>
# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de
# 7-Zip Copyright (C) 1999-2010 Igor Pavlov
# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
#
"""Read 7zip format archives."""
import collections.abc
import datetime
import errno
import functools
import io
import operator
import os
import queue
import stat
import sys
import threading
from io import BytesIO
from typing import IO, Any, BinaryIO, Dict, List, Optional, Tuple, Union
from py7zr.archiveinfo import Folder, Header, SignatureHeader
from py7zr.callbacks import ExtractCallback
from py7zr.compression import SevenZipCompressor, Worker, get_methods_names
from py7zr.exceptions import Bad7zFile, InternalError
from py7zr.helpers import ArchiveTimestamp, MemIO, calculate_crc32, filetime_to_dt
from py7zr.properties import MAGIC_7Z, READ_BLOCKSIZE, ArchivePassword
if sys.version_info < (3, 6):
import contextlib2 as contextlib
import pathlib2 as pathlib
else:
import contextlib
import pathlib
if sys.platform.startswith('win'):
    import _winapi  # needed for junction creation during extraction
# Flag marking that unix mode bits are stored in the high 16 bits of the
# attribute word — presumably the p7zip convention; verify against spec.
FILE_ATTRIBUTE_UNIX_EXTENSION = 0x8000
# Mask limiting which Windows attribute bits are kept when archiving.
FILE_ATTRIBUTE_WINDOWS_MASK = 0x04fff
class ArchiveFile:
    """Represent each files metadata inside archive file.
    It holds file properties; filename, permissions, and type whether
    it is directory, link or normal file.
    Instances of the :class:`ArchiveFile` class are returned by iterating :attr:`files_list` of
    :class:`SevenZipFile` objects.
    Each object stores information about a single member of the 7z archive. Most of users use :meth:`extractall()`.
    The class also hold an archive parameter where file is exist in
    archive file folder(container)."""
    def __init__(self, id: int, file_info: Dict[str, Any]) -> None:
        self.id = id  # position of the member within the archive
        self._file_info = file_info  # raw property dict parsed from the header
    def file_properties(self) -> Dict[str, Any]:
        """Return file properties as a hash object. Following keys are included: readonly, is_directory,
        posix_mode, archivable, emptystream, filename, creationtime, lastaccesstime,
        lastwritetime, attributes
        """
        properties = self._file_info
        if properties is not None:
            # augment the raw dict with derived flags computed below
            properties['readonly'] = self.readonly
            properties['posix_mode'] = self.posix_mode
            properties['archivable'] = self.archivable
            properties['is_directory'] = self.is_directory
        return properties
    def _get_property(self, key: str) -> Any:
        """Return the raw property for *key*, or None when absent."""
        try:
            return self._file_info[key]
        except KeyError:
            return None
    @property
    def origin(self) -> pathlib.Path:
        return self._get_property('origin')
    @property
    def folder(self) -> Folder:
        return self._get_property('folder')
    @property
    def filename(self) -> str:
        """return filename of archive file."""
        return self._get_property('filename')
    @property
    def emptystream(self) -> bool:
        """True if file is empty(0-byte file), otherwise False"""
        return self._get_property('emptystream')
    @property
    def uncompressed(self) -> List[int]:
        return self._get_property('uncompressed')
    @property
    def uncompressed_size(self) -> int:
        """Uncompressed file size."""
        # sum() handles an empty list (returns 0) where reduce() would raise.
        return sum(self.uncompressed)
    @property
    def compressed(self) -> Optional[int]:
        """Compressed size"""
        return self._get_property('compressed')
    def _test_attribute(self, target_bit: int) -> bool:
        """True when every bit of *target_bit* is set in the attributes word."""
        attributes = self._get_property('attributes')
        if attributes is None:
            return False
        return attributes & target_bit == target_bit
    @property
    def archivable(self) -> bool:
        """File has a Windows `archive` flag."""
        return self._test_attribute(stat.FILE_ATTRIBUTE_ARCHIVE)  # type: ignore  # noqa
    @property
    def is_directory(self) -> bool:
        """True if file is a directory, otherwise False."""
        return self._test_attribute(stat.FILE_ATTRIBUTE_DIRECTORY)  # type: ignore  # noqa
    @property
    def readonly(self) -> bool:
        """True if file is readonly, otherwise False."""
        return self._test_attribute(stat.FILE_ATTRIBUTE_READONLY)  # type: ignore  # noqa
    def _get_unix_extension(self) -> Optional[int]:
        """Return unix mode/type bits from the high 16 attribute bits, or None."""
        attributes = self._get_property('attributes')
        if self._test_attribute(FILE_ATTRIBUTE_UNIX_EXTENSION):
            return attributes >> 16
        return None
    @property
    def is_symlink(self) -> bool:
        """True if file is a symbolic link, otherwise False."""
        e = self._get_unix_extension()
        if e is not None:
            return stat.S_ISLNK(e)
        # no unix extension: fall back to the Windows reparse-point flag
        return self._test_attribute(stat.FILE_ATTRIBUTE_REPARSE_POINT)  # type: ignore  # noqa
    @property
    def is_junction(self) -> bool:
        """True if file is a junction/reparse point on windows, otherwise False."""
        return self._test_attribute(stat.FILE_ATTRIBUTE_REPARSE_POINT |  # type: ignore  # noqa
                                    stat.FILE_ATTRIBUTE_DIRECTORY)  # type: ignore  # noqa
    @property
    def is_socket(self) -> bool:
        """True if file is a socket, otherwise False."""
        e = self._get_unix_extension()
        if e is not None:
            return stat.S_ISSOCK(e)
        return False
    @property
    def lastwritetime(self) -> Optional[ArchiveTimestamp]:
        """Return last written timestamp of a file."""
        return self._get_property('lastwritetime')
    @property
    def posix_mode(self) -> Optional[int]:
        """
        posix mode when a member has a unix extension property, or None
        :return: Return file stat mode can be set by os.chmod()
        """
        e = self._get_unix_extension()
        if e is not None:
            return stat.S_IMODE(e)
        return None
    @property
    def st_fmt(self) -> Optional[int]:
        """
        :return: Return the portion of the file mode that describes the file type
        """
        e = self._get_unix_extension()
        if e is not None:
            return stat.S_IFMT(e)
        return None
class ArchiveFileList(collections.abc.Iterable):
    """Iteratable container of ArchiveFile."""

    def __init__(self, offset: int = 0):
        # raw per-member property dicts, in archive order
        self.files_list = []  # type: List[dict]
        self.index = 0
        # id offset applied when wrapping entries as ArchiveFile objects
        self.offset = offset

    def append(self, file_info: Dict[str, Any]) -> None:
        """Add one member's property dict to the container."""
        self.files_list.append(file_info)

    def __len__(self) -> int:
        return len(self.files_list)

    def __iter__(self) -> 'ArchiveFileListIterator':
        return ArchiveFileListIterator(self)

    def __getitem__(self, index):
        # Explicit range check: the original guard used `index > len(...)`,
        # which let index == len fall through to the list access; negative
        # indices are rejected to avoid Python's wrap-around semantics.
        if not 0 <= index < len(self.files_list):
            raise IndexError
        return ArchiveFile(index + self.offset, self.files_list[index])
class ArchiveFileListIterator(collections.abc.Iterator):
    """Forward-only iterator over an :class:`ArchiveFileList`."""

    def __init__(self, archive_file_list):
        self._archive_file_list = archive_file_list
        self._index = 0

    def __next__(self) -> ArchiveFile:
        position = self._index
        # Guard clause: stop once every entry has been yielded.
        if position >= len(self._archive_file_list):
            raise StopIteration
        self._index = position + 1
        return self._archive_file_list[position]
# ------------------
# Exported Classes
# ------------------
class ArchiveInfo:
    """Hold archive information"""

    def __init__(self, filename, size, header_size, method_names, solid, blocks, uncompressed):
        # Plain value object: store every archive-level statistic as-is.
        self.filename = filename
        self.size = size
        self.header_size = header_size
        self.method_names = method_names
        self.solid = solid  # True when members share a compression stream
        self.blocks = blocks  # number of 7z folders (compression blocks)
        self.uncompressed = uncompressed  # total uncompressed byte count
class FileInfo:
    """Hold archived file information."""

    def __init__(self, filename, compressed, uncompressed, archivable, is_directory, creationtime):
        # Plain value object describing a single archive member.
        self.filename = filename
        self.compressed = compressed  # compressed size, or None inside a solid block
        self.uncompressed = uncompressed
        self.archivable = archivable
        self.is_directory = is_directory
        self.creationtime = creationtime
class SevenZipFile(contextlib.AbstractContextManager):
"""The SevenZipFile Class provides an interface to 7z archives."""
def __init__(self, file: Union[BinaryIO, str, pathlib.Path], mode: str = 'r',
*, filters: Optional[str] = None, dereference=False, password: Optional[str] = None) -> None:
if mode not in ('r', 'w', 'x', 'a'):
raise ValueError("ZipFile requires mode 'r', 'w', 'x', or 'a'")
if password is not None:
if mode not in ('r'):
raise NotImplementedError("It has not been implemented to create archive with password.")
ArchivePassword(password)
self.password_protected = True
else:
self.password_protected = False
# Check if we were passed a file-like object or not
if isinstance(file, str):
self._filePassed = False # type: bool
self.filename = file # type: str
if mode == 'r':
self.fp = open(file, 'rb') # type: BinaryIO
elif mode == 'w':
self.fp = open(file, 'w+b')
elif mode == 'x':
self.fp = open(file, 'x+b')
elif mode == 'a':
self.fp = open(file, 'r+b')
else:
raise ValueError("File open error.")
self.mode = mode
elif isinstance(file, pathlib.Path):
self._filePassed = False
self.filename = str(file)
if mode == 'r':
self.fp = file.open(mode='rb') # type: ignore # noqa # typeshed issue: 2911
elif mode == 'w':
self.fp = file.open(mode='w+b') # type: ignore # noqa
elif mode == 'x':
self.fp = file.open(mode='x+b') # type: ignore # noqa
elif mode == 'a':
self.fp = file.open(mode='r+b') # type: ignore # noqa
else:
raise ValueError("File open error.")
self.mode = mode
elif isinstance(file, io.IOBase):
self._filePassed = True
self.fp = file
self.filename = getattr(file, 'name', None)
self.mode = mode # type: ignore #noqa
else:
raise TypeError("invalid file: {}".format(type(file)))
self._fileRefCnt = 1
try:
if mode == "r":
self._real_get_contents(self.fp)
self._reset_worker()
elif mode in 'w':
# FIXME: check filters here
self.folder = self._create_folder(filters)
self.files = ArchiveFileList()
self._prepare_write()
self._reset_worker()
elif mode in 'x':
raise NotImplementedError
elif mode == 'a':
raise NotImplementedError
else:
raise ValueError("Mode must be 'r', 'w', 'x', or 'a'")
except Exception as e:
self._fpclose()
raise e
self.encoded_header_mode = False
self._dict = {} # type: Dict[str, IO[Any]]
self.dereference = dereference
self.reporterd = None # type: Optional[threading.Thread]
self.q = queue.Queue() # type: queue.Queue[Any]
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def _create_folder(self, filters):
folder = Folder()
folder.compressor = SevenZipCompressor(filters)
folder.coders = folder.compressor.coders
folder.solid = True
folder.digestdefined = False
folder.bindpairs = []
folder.totalin = 1
folder.totalout = 1
return folder
def _fpclose(self) -> None:
assert self._fileRefCnt > 0
self._fileRefCnt -= 1
if not self._fileRefCnt and not self._filePassed:
self.fp.close()
def _real_get_contents(self, fp: BinaryIO) -> None:
if not self._check_7zfile(fp):
raise Bad7zFile('not a 7z file')
self.sig_header = SignatureHeader.retrieve(self.fp)
self.afterheader = self.fp.tell()
buffer = self._read_header_data()
header = Header.retrieve(self.fp, buffer, self.afterheader)
if header is None:
return
self.header = header
buffer.close()
self.files = ArchiveFileList()
if getattr(self.header, 'files_info', None) is not None:
self._filelist_retrieve()
def _read_header_data(self) -> BytesIO:
self.fp.seek(self.sig_header.nextheaderofs, os.SEEK_CUR)
buffer = io.BytesIO(self.fp.read(self.sig_header.nextheadersize))
if self.sig_header.nextheadercrc != calculate_crc32(buffer.getvalue()):
raise Bad7zFile('invalid header data')
return buffer
class ParseStatus:
def __init__(self, src_pos=0):
self.src_pos = src_pos
self.folder = 0 # 7zip folder where target stored
self.outstreams = 0 # output stream count
self.input = 0 # unpack stream count in each folder
self.stream = 0 # target input stream position
def _gen_filename(self) -> str:
# compressed file is stored without a name, generate one
try:
basefilename = self.filename
except AttributeError:
# 7z archive file doesn't have a name
return 'contents'
else:
if basefilename is not None:
fn, ext = os.path.splitext(os.path.basename(basefilename))
return fn
else:
return 'contents'
def _get_fileinfo_sizes(self, pstat, subinfo, packinfo, folder, packsizes, unpacksizes, file_in_solid, numinstreams):
if pstat.input == 0:
folder.solid = subinfo.num_unpackstreams_folders[pstat.folder] > 1
maxsize = (folder.solid and packinfo.packsizes[pstat.stream]) or None
uncompressed = unpacksizes[pstat.outstreams]
if not isinstance(uncompressed, (list, tuple)):
uncompressed = [uncompressed] * len(folder.coders)
if file_in_solid > 0:
compressed = None
elif pstat.stream < len(packsizes): # file is compressed
compressed = packsizes[pstat.stream]
else: # file is not compressed
compressed = uncompressed
packsize = packsizes[pstat.stream:pstat.stream + numinstreams]
return maxsize, compressed, uncompressed, packsize, folder.solid
def _filelist_retrieve(self) -> None:
# Initialize references for convenience
if hasattr(self.header, 'main_streams') and self.header.main_streams is not None:
folders = self.header.main_streams.unpackinfo.folders
packinfo = self.header.main_streams.packinfo
subinfo = self.header.main_streams.substreamsinfo
packsizes = packinfo.packsizes
unpacksizes = subinfo.unpacksizes if subinfo.unpacksizes is not None else [x.unpacksizes for x in folders]
else:
subinfo = None
folders = None
packinfo = None
packsizes = []
unpacksizes = [0]
pstat = self.ParseStatus()
pstat.src_pos = self.afterheader
file_in_solid = 0
for file_id, file_info in enumerate(self.header.files_info.files):
if not file_info['emptystream'] and folders is not None:
folder = folders[pstat.folder]
numinstreams = max([coder.get('numinstreams', 1) for coder in folder.coders])
(maxsize, compressed, uncompressed,
packsize, solid) = self._get_fileinfo_sizes(pstat, subinfo, packinfo, folder, packsizes,
unpacksizes, file_in_solid, numinstreams)
pstat.input += 1
folder.solid = solid
file_info['folder'] = folder
file_info['maxsize'] = maxsize
file_info['compressed'] = compressed
file_info['uncompressed'] = uncompressed
file_info['packsizes'] = packsize
if subinfo.digestsdefined[pstat.outstreams]:
file_info['digest'] = subinfo.digests[pstat.outstreams]
if folder is None:
pstat.src_pos += file_info['compressed']
else:
if folder.solid:
file_in_solid += 1
pstat.outstreams += 1
if folder.files is None:
folder.files = ArchiveFileList(offset=file_id)
folder.files.append(file_info)
if pstat.input >= subinfo.num_unpackstreams_folders[pstat.folder]:
file_in_solid = 0
pstat.src_pos += sum(packinfo.packsizes[pstat.stream:pstat.stream + numinstreams])
pstat.folder += 1
pstat.stream += numinstreams
pstat.input = 0
else:
file_info['folder'] = None
file_info['maxsize'] = 0
file_info['compressed'] = 0
file_info['uncompressed'] = [0]
file_info['packsizes'] = [0]
if 'filename' not in file_info:
file_info['filename'] = self._gen_filename()
self.files.append(file_info)
def _num_files(self) -> int:
if getattr(self.header, 'files_info', None) is not None:
return len(self.header.files_info.files)
return 0
def _set_file_property(self, outfilename: pathlib.Path, properties: Dict[str, Any]) -> None:
# creation time
creationtime = ArchiveTimestamp(properties['lastwritetime']).totimestamp()
if creationtime is not None:
os.utime(str(outfilename), times=(creationtime, creationtime))
if os.name == 'posix':
st_mode = properties['posix_mode']
if st_mode is not None:
outfilename.chmod(st_mode)
return
# fallback: only set readonly if specified
if properties['readonly'] and not properties['is_directory']:
ro_mask = 0o777 ^ (stat.S_IWRITE | stat.S_IWGRP | stat.S_IWOTH)
outfilename.chmod(outfilename.stat().st_mode & ro_mask)
def _reset_decompressor(self) -> None:
if self.header.main_streams is not None and self.header.main_streams.unpackinfo.numfolders > 0:
for i, folder in enumerate(self.header.main_streams.unpackinfo.folders):
folder.decompressor = None
def _reset_worker(self) -> None:
"""Seek to where archive data start in archive and recreate new worker."""
self.fp.seek(self.afterheader)
self.worker = Worker(self.files, self.afterheader, self.header)
def set_encoded_header_mode(self, mode: bool) -> None:
self.encoded_header_mode = mode
@staticmethod
def _check_7zfile(fp: Union[BinaryIO, io.BufferedReader]) -> bool:
result = MAGIC_7Z == fp.read(len(MAGIC_7Z))[:len(MAGIC_7Z)]
fp.seek(-len(MAGIC_7Z), 1)
return result
def _get_method_names(self) -> str:
methods_names = [] # type: List[str]
for folder in self.header.main_streams.unpackinfo.folders:
methods_names += get_methods_names(folder.coders)
return ', '.join(x for x in methods_names)
def _test_digest_raw(self, pos: int, size: int, crc: int) -> bool:
self.fp.seek(pos)
remaining_size = size
digest = None
while remaining_size > 0:
block = min(READ_BLOCKSIZE, remaining_size)
digest = calculate_crc32(self.fp.read(block), digest)
remaining_size -= block
return digest == crc
def _test_pack_digest(self) -> bool:
self._reset_worker()
crcs = self.header.main_streams.packinfo.crcs
if crcs is not None and len(crcs) > 0:
# check packed stream's crc
for i, p in enumerate(self.header.main_streams.packinfo.packpositions):
if not self._test_digest_raw(p, self.header.main_streams.packinfo.packsizes[i], crcs[i]):
return False
return True
def _test_unpack_digest(self) -> bool:
self._reset_worker()
for f in self.files:
self.worker.register_filelike(f.id, None)
try:
self.worker.extract(self.fp, parallel=(not self.password_protected)) # TODO: print progress
except Bad7zFile:
return False
else:
return True
def _test_digests(self) -> bool:
if self._test_pack_digest():
if self._test_unpack_digest():
return True
return False
def _prepare_write(self) -> None:
self.sig_header = SignatureHeader()
self.sig_header._write_skelton(self.fp)
self.afterheader = self.fp.tell()
self.folder.totalin = 1
self.folder.totalout = 1
self.folder.bindpairs = []
self.folder.unpacksizes = []
self.header = Header.build_header([self.folder])
def _write_archive(self):
self.worker.archive(self.fp, self.folder, deref=self.dereference)
# Write header and update signature header
(header_pos, header_len, header_crc) = self.header.write(self.fp, self.afterheader,
encoded=self.encoded_header_mode)
self.sig_header.nextheaderofs = header_pos - self.afterheader
self.sig_header.calccrc(header_len, header_crc)
self.sig_header.write(self.fp)
return
def _is_solid(self):
for f in self.header.main_streams.substreamsinfo.num_unpackstreams_folders:
if f > 1:
return True
return False
def _var_release(self):
self._dict = None
self.files = None
self.folder = None
self.header = None
self.worker = None
self.sig_header = None
@staticmethod
def _make_file_info(target: pathlib.Path, arcname: Optional[str] = None, dereference=False) -> Dict[str, Any]:
f = {} # type: Dict[str, Any]
f['origin'] = target
if arcname is not None:
f['filename'] = pathlib.Path(arcname).as_posix()
else:
f['filename'] = target.as_posix()
if os.name == 'nt':
fstat = target.lstat()
if target.is_symlink():
if dereference:
fstat = target.stat()
if stat.S_ISDIR(fstat.st_mode):
f['emptystream'] = True
f['attributes'] = fstat.st_file_attributes & FILE_ATTRIBUTE_WINDOWS_MASK # type: ignore # noqa
else:
f['emptystream'] = False
f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE # type: ignore # noqa
f['uncompressed'] = fstat.st_size
else:
f['emptystream'] = False
f['attributes'] = fstat.st_file_attributes & FILE_ATTRIBUTE_WINDOWS_MASK # type: ignore # noqa
# f['attributes'] |= stat.FILE_ATTRIBUTE_REPARSE_POINT # type: ignore # noqa
elif target.is_dir():
f['emptystream'] = True
f['attributes'] = fstat.st_file_attributes & FILE_ATTRIBUTE_WINDOWS_MASK # type: ignore # noqa
elif target.is_file():
f['emptystream'] = False
f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE # type: ignore # noqa
f['uncompressed'] = fstat.st_size
else:
fstat = target.lstat()
if target.is_symlink():
if dereference:
fstat = target.stat()
if stat.S_ISDIR(fstat.st_mode):
f['emptystream'] = True
f['attributes'] = stat.FILE_ATTRIBUTE_DIRECTORY # type: ignore # noqa
f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IFDIR << 16)
f['attributes'] |= (stat.S_IMODE(fstat.st_mode) << 16)
else:
f['emptystream'] = False
f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE # type: ignore # noqa
f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IMODE(fstat.st_mode) << 16)
else:
f['emptystream'] = False
f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE | stat.FILE_ATTRIBUTE_REPARSE_POINT # type: ignore # noqa
f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IFLNK << 16)
f['attributes'] |= (stat.S_IMODE(fstat.st_mode) << 16)
elif target.is_dir():
f['emptystream'] = True
f['attributes'] = stat.FILE_ATTRIBUTE_DIRECTORY # type: ignore # noqa
f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IFDIR << 16)
f['attributes'] |= (stat.S_IMODE(fstat.st_mode) << 16)
elif target.is_file():
f['emptystream'] = False
f['uncompressed'] = fstat.st_size
f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE # type: ignore # noqa
f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IMODE(fstat.st_mode) << 16)
f['creationtime'] = fstat.st_ctime
f['lastwritetime'] = fstat.st_mtime
f['lastaccesstime'] = fstat.st_atime
return f
# --------------------------------------------------------------------------
# The public methods which SevenZipFile provides:
def getnames(self) -> List[str]:
"""Return the members of the archive as a list of their names. It has
the same order as the list returned by getmembers().
"""
return list(map(lambda x: x.filename, self.files))
def archiveinfo(self) -> ArchiveInfo:
fstat = os.stat(self.filename)
uncompressed = 0
for f in self.files:
uncompressed += f.uncompressed_size
return ArchiveInfo(self.filename, fstat.st_size, self.header.size, self._get_method_names(),
self._is_solid(), len(self.header.main_streams.unpackinfo.folders),
uncompressed)
def list(self) -> List[FileInfo]:
"""Returns contents information """
alist = [] # type: List[FileInfo]
creationtime = None # type: Optional[datetime.datetime]
for f in self.files:
if f.lastwritetime is not None:
creationtime = filetime_to_dt(f.lastwritetime)
alist.append(FileInfo(f.filename, f.compressed, f.uncompressed_size, f.archivable, f.is_directory,
creationtime))
return alist
def test(self) -> bool:
"""Test archive using CRC digests."""
return self._test_digests()
def readall(self) -> Optional[Dict[str, IO[Any]]]:
return self._extract(path=None, return_dict=True)
def extractall(self, path: Optional[Any] = None, callback: Optional[ExtractCallback] = None) -> None:
"""Extract all members from the archive to the current working
directory and set owner, modification time and permissions on
directories afterwards. `path' specifies a different directory
to extract to.
"""
self._extract(path=path, return_dict=False, callback=callback)
def read(self, targets: Optional[List[str]] = None) -> Optional[Dict[str, IO[Any]]]:
return self._extract(path=None, targets=targets, return_dict=True)
def extract(self, path: Optional[Any] = None, targets: Optional[List[str]] = None) -> None:
self._extract(path, targets, return_dict=False)
def _extract(self, path: Optional[Any] = None, targets: Optional[List[str]] = None,
return_dict: bool = False, callback: Optional[ExtractCallback] = None) -> Optional[Dict[str, IO[Any]]]:
if callback is not None and not isinstance(callback, ExtractCallback):
raise ValueError('Callback specified is not a subclass of py7zr.callbacks.ExtractCallback class')
elif callback is not None:
self.reporterd = threading.Thread(target=self.reporter, args=(callback,), daemon=True)
self.reporterd.start()
target_junction = [] # type: List[pathlib.Path]
target_sym = [] # type: List[pathlib.Path]
target_files = [] # type: List[Tuple[pathlib.Path, Dict[str, Any]]]
target_dirs = [] # type: List[pathlib.Path]
if path is not None:
if isinstance(path, str):
path = pathlib.Path(path)
try:
if not path.exists():
path.mkdir(parents=True)
else:
pass
except OSError as e:
if e.errno == errno.EEXIST and path.is_dir():
pass
else:
raise e
fnames = [] # type: List[str] # check duplicated filename in one archive?
self.q.put(('pre', None, None))
for f in self.files:
# TODO: sanity check
# check whether f.filename with invalid characters: '../'
if f.filename.startswith('../'):
raise Bad7zFile
# When archive has a multiple files which have same name
# To guarantee order of archive, multi-thread decompression becomes off.
# Currently always overwrite by latter archives.
# TODO: provide option to select overwrite or skip.
if f.filename not in fnames:
outname = f.filename
else:
i = 0
while True:
outname = f.filename + '_%d' % i
if outname not in fnames:
break
fnames.append(outname)
if path is not None:
outfilename = path.joinpath(outname)
else:
outfilename = pathlib.Path(outname)
if os.name == 'nt':
if outfilename.is_absolute():
# hack for microsoft windows path length limit < 255
outfilename = pathlib.WindowsPath('\\\\?\\' + str(outfilename))
if targets is not None and f.filename not in targets:
self.worker.register_filelike(f.id, None)
continue
if f.is_directory:
if not outfilename.exists():
target_dirs.append(outfilename)
target_files.append((outfilename, f.file_properties()))
else:
pass
elif f.is_socket:
pass
elif return_dict:
fname = outfilename.as_posix()
_buf = io.BytesIO()
self._dict[fname] = _buf
self.worker.register_filelike(f.id, MemIO(_buf))
elif f.is_symlink:
target_sym.append(outfilename)
try:
if outfilename.exists():
outfilename.unlink()
except OSError as ose:
if ose.errno not in [errno.ENOENT]:
raise
self.worker.register_filelike(f.id, outfilename)
elif f.is_junction:
target_junction.append(outfilename)
self.worker.register_filelike(f.id, outfilename)
else:
self.worker.register_filelike(f.id, outfilename)
target_files.append((outfilename, f.file_properties()))
for target_dir in sorted(target_dirs):
try:
target_dir.mkdir()
except FileExistsError:
if target_dir.is_dir():
# skip rare case
pass
elif target_dir.is_file():
raise Exception("Directory name is existed as a normal file.")
else:
raise Exception("Directory making fails on unknown condition.")
if callback is not None:
self.worker.extract(self.fp, parallel=(not self.password_protected and not self._filePassed), q=self.q)
else:
self.worker.extract(self.fp, parallel=(not self.password_protected and not self._filePassed))
self.q.put(('post', None, None))
if return_dict:
return self._dict
else:
# create symbolic links on target path as a working directory.
# if path is None, work on current working directory.
for t in target_sym:
sym_dst = t.resolve()
with sym_dst.open('rb') as b:
sym_src = b.read().decode(encoding='utf-8') # symlink target name stored in utf-8
sym_dst.unlink() # unlink after close().
sym_dst.symlink_to(pathlib.Path(sym_src))
# create junction point only on windows platform
if sys.platform.startswith('win'):
for t in target_junction:
junction_dst = t.resolve()
with junction_dst.open('rb') as b:
junction_target = pathlib.Path(b.read().decode(encoding='utf-8'))
junction_dst.unlink()
_winapi.CreateJunction(junction_target, str(junction_dst)) # type: ignore # noqa
# set file properties
for o, p in target_files:
self._set_file_property(o, p)
return None
def reporter(self, callback: ExtractCallback):
while True:
try:
item: Optional[Tuple[str, str, str]] = self.q.get(timeout=1)
except queue.Empty:
pass
else:
if item is None:
break
elif item[0] == 's':
callback.report_start(item[1], item[2])
elif item[0] == 'e':
callback.report_end(item[1], item[2])
elif item[0] == 'pre':
callback.report_start_preparation()
elif item[0] == 'post':
callback.report_postprocess()
elif item[0] == 'w':
callback.report_warning(item[1])
else:
pass
self.q.task_done()
def writeall(self, path: Union[pathlib.Path, str], arcname: Optional[str] = None):
"""Write files in target path into archive."""
if isinstance(path, str):
path = pathlib.Path(path)
if not path.exists():
raise ValueError("specified path does not exist.")
if path.is_dir() or path.is_file():
self._writeall(path, arcname)
else:
raise ValueError("specified path is not a directory or a file")
def _writeall(self, path, arcname):
try:
if path.is_symlink() and not self.dereference:
self.write(path, arcname)
elif path.is_file():
self.write(path, arcname)
elif path.is_dir():
if not path.samefile('.'):
self.write(path, arcname)
for nm in sorted(os.listdir(str(path))):
arc = os.path.join(arcname, nm) if arcname is not None else None
self._writeall(path.joinpath(nm), arc)
else:
return # pathlib ignores ELOOP and return False for is_*().
except OSError as ose:
if self.dereference and ose.errno in [errno.ELOOP]:
return # ignore ELOOP here, this resulted to stop looped symlink reference.
elif self.dereference and sys.platform == 'win32' and ose.errno in [errno.ENOENT]:
return # ignore ENOENT which is happened when a case of ELOOP on windows.
else:
raise
def write(self, file: Union[pathlib.Path, str], arcname: Optional[str] = None):
"""Write single target file into archive(Not implemented yet)."""
if isinstance(file, str):
path = pathlib.Path(file)
elif isinstance(file, pathlib.Path):
path = file
else:
raise ValueError("Unsupported file type.")
file_info = self._make_file_info(path, arcname, self.dereference)
self.files.append(file_info)
def close(self):
"""Flush all the data into archive and close it.
When close py7zr start reading target and writing actual archive file.
"""
if 'w' in self.mode:
self._write_archive()
if 'r' in self.mode:
if self.reporterd is not None:
self.q.put_nowait(None)
self.reporterd.join(1)
if self.reporterd.is_alive():
raise InternalError("Progress report thread terminate error.")
self.reporterd = None
self._fpclose()
self._var_release()
def reset(self) -> None:
    """In read mode, rewind the file pointer and rebuild the decompress
    worker and decompressor state; a no-op in any other mode."""
    if self.mode != 'r':
        return
    self._reset_worker()
    self._reset_decompressor()
# --------------------
# exported functions
# --------------------
def is_7zfile(file: Union[BinaryIO, str, pathlib.Path]) -> bool:
    """Quickly see if a file is a 7Z file by checking the magic number.

    :param file: a filename (str), a pathlib.Path, or a binary file-like
        object opened for reading.
    :return: True when the target starts with the 7z signature; False for
        non-7z content and for unreadable/missing targets (OSError is
        swallowed deliberately).
    :raises TypeError: for any other argument type.
    """
    result = False
    try:
        if isinstance(file, io.IOBase) and hasattr(file, "read"):
            result = SevenZipFile._check_7zfile(file)  # type: ignore  # noqa
        elif isinstance(file, str):
            with open(file, 'rb') as fp:
                result = SevenZipFile._check_7zfile(fp)
        elif isinstance(file, pathlib.Path):
            # PosixPath and WindowsPath are subclasses of Path, so a single
            # isinstance check covers all concrete pathlib flavours.
            with file.open(mode='rb') as fp:  # type: ignore  # noqa
                result = SevenZipFile._check_7zfile(fp)
        else:
            raise TypeError('invalid type: file should be str, pathlib.Path or BinaryIO, but {}'.format(type(file)))
    except OSError:
        pass
    return result
def unpack_7zarchive(archive, path, extra=None):
    """Function for registering with shutil.register_unpack_format().

    :param archive: path of the 7z archive to extract.
    :param path: destination directory.
    :param extra: unused; present to satisfy the shutil callback signature.
    """
    arc = SevenZipFile(archive)
    try:
        arc.extractall(path)
    finally:
        # Always release the archive's file handle, even when extraction
        # fails part-way through (the original leaked it on error).
        arc.close()
def pack_7zarchive(base_name, base_dir, owner=None, group=None, dry_run=None, logger=None):
    """Function for registering with shutil.register_archive_format().

    :param base_name: archive name without the ``.7z`` suffix.
    :param base_dir: directory tree to archive.
    :param owner: unused; present to satisfy the shutil callback signature.
    :param group: unused; see *owner*.
    :param dry_run: unused; see *owner*.
    :param logger: unused; see *owner*.
    """
    target_name = '{}.7z'.format(base_name)
    archive = SevenZipFile(target_name, mode='w')
    try:
        archive.writeall(path=base_dir)
    finally:
        # close() performs the actual archive write; ensure the handle is
        # released even when writeall() raises (the original leaked it).
        archive.close()

@ -0,0 +1,174 @@
import pathlib
import stat
import sys
from logging import getLogger
from typing import Union
if sys.platform == "win32":
import ctypes
from ctypes.wintypes import BOOL, DWORD, HANDLE, LPCWSTR, LPDWORD, LPVOID, LPWSTR
_stdcall_libraries = {}
_stdcall_libraries['kernel32'] = ctypes.WinDLL('kernel32')
# kernel32 entry points used below for reparse-point (symlink/junction) access.
CloseHandle = _stdcall_libraries['kernel32'].CloseHandle
CreateFileW = _stdcall_libraries['kernel32'].CreateFileW
DeviceIoControl = _stdcall_libraries['kernel32'].DeviceIoControl
GetFileAttributesW = _stdcall_libraries['kernel32'].GetFileAttributesW
# Win32 constants (values mirror the Windows SDK headers).
OPEN_EXISTING = 3  # CreateFileW disposition: open only if it already exists
GENERIC_READ = 2147483648  # 0x80000000
FILE_FLAG_OPEN_REPARSE_POINT = 0x00200000  # open the reparse point, not its target
FSCTL_GET_REPARSE_POINT = 0x000900A8  # DeviceIoControl code: read reparse data
FILE_FLAG_BACKUP_SEMANTICS = 0x02000000  # needed to open directory handles
IO_REPARSE_TAG_MOUNT_POINT = 0xA0000003  # NTFS junction / mount point
IO_REPARSE_TAG_SYMLINK = 0xA000000C  # NTFS symbolic link
MAXIMUM_REPARSE_DATA_BUFFER_SIZE = 16 * 1024  # max size of a reparse payload
def _check_bit(val: int, flag: int) -> bool:
return bool(val & flag == flag)
class SymbolicLinkReparseBuffer(ctypes.Structure):
    """ Implementing the below in Python:
    typedef struct _REPARSE_DATA_BUFFER {
        ULONG ReparseTag;
        USHORT ReparseDataLength;
        USHORT Reserved;
        union {
            struct {
                USHORT SubstituteNameOffset;
                USHORT SubstituteNameLength;
                USHORT PrintNameOffset;
                USHORT PrintNameLength;
                ULONG Flags;
                WCHAR PathBuffer[1];
            } SymbolicLinkReparseBuffer;
            struct {
                USHORT SubstituteNameOffset;
                USHORT SubstituteNameLength;
                USHORT PrintNameOffset;
                USHORT PrintNameLength;
                WCHAR PathBuffer[1];
            } MountPointReparseBuffer;
            struct {
                UCHAR DataBuffer[1];
            } GenericReparseBuffer;
        } DUMMYUNIONNAME;
    } REPARSE_DATA_BUFFER, *PREPARSE_DATA_BUFFER;
    """
    # See https://docs.microsoft.com/en-us/windows-hardware/drivers/ddi/content/ntifs/ns-ntifs-_reparse_data_buffer
    # Only the symlink-specific tail is declared here; the common header and
    # the four name offset/length USHORTs live in ReparseBuffer below.
    # 20 = 8-byte REPARSE header + 8 bytes of offset/length fields + 4-byte
    # Flags (presumably; derived from the struct layout above — field order
    # and sizes must match the Windows ABI exactly, do not reorder).
    _fields_ = [
        ('flags', ctypes.c_ulong),
        ('path_buffer', ctypes.c_byte * (MAXIMUM_REPARSE_DATA_BUFFER_SIZE - 20))
    ]
class MountReparseBuffer(ctypes.Structure):
    """MountPointReparseBuffer tail: only the raw PathBuffer, since a junction
    has no Flags member and the name offsets live in ReparseBuffer."""
    # 16 = 8-byte REPARSE header + 8 bytes of name offset/length fields.
    _fields_ = [
        ('path_buffer', ctypes.c_byte * (MAXIMUM_REPARSE_DATA_BUFFER_SIZE - 16)),
    ]
class ReparseBufferField(ctypes.Union):
    """Union of the symlink and mount-point payload layouts (the
    DUMMYUNIONNAME union in REPARSE_DATA_BUFFER)."""
    _fields_ = [
        ('symlink', SymbolicLinkReparseBuffer),
        ('mount', MountReparseBuffer)
    ]
class ReparseBuffer(ctypes.Structure):
    """Python view of REPARSE_DATA_BUFFER with the four name offset/length
    fields hoisted out of the union (they are identical in both the symlink
    and mount-point variants)."""
    # _anonymous_ exposes buf.symlink / buf.mount directly, without 'u'.
    _anonymous_ = ("u",)
    _fields_ = [
        ('reparse_tag', ctypes.c_ulong),
        ('reparse_data_length', ctypes.c_ushort),
        ('reserved', ctypes.c_ushort),
        # Offsets/lengths are expressed in BYTES into the variant's
        # path_buffer (see how readlink() slices the bytearray).
        ('substitute_name_offset', ctypes.c_ushort),
        ('substitute_name_length', ctypes.c_ushort),
        ('print_name_offset', ctypes.c_ushort),
        ('print_name_length', ctypes.c_ushort),
        ('u', ReparseBufferField)
    ]
def is_reparse_point(path: Union[str, pathlib.Path]) -> bool:
    """Return True when *path* carries FILE_ATTRIBUTE_REPARSE_POINT, i.e.
    it is a symlink, an NTFS junction, or another reparse point."""
    GetFileAttributesW.argtypes = [LPCWSTR]
    GetFileAttributesW.restype = DWORD
    attributes = GetFileAttributesW(str(path))
    return _check_bit(attributes, stat.FILE_ATTRIBUTE_REPARSE_POINT)
def readlink(path: Union[str, pathlib.Path]) -> Union[str, pathlib.WindowsPath]:
    """Return the substitute-name target of the reparse point at *path*.

    Works for both NTFS symbolic links (IO_REPARSE_TAG_SYMLINK) and
    junctions (IO_REPARSE_TAG_MOUNT_POINT).  The return type mirrors the
    argument type: pathlib.WindowsPath for a pathlib.Path argument, str
    for a str argument.

    :raises ValueError: when *path* is not a symlink/junction, or the
        reparse data cannot be read (e.g. access denied).
    """
    # FILE_FLAG_OPEN_REPARSE_POINT alone is not enough if 'path'
    # is a symbolic link to a directory or a NTFS junction.
    # We need to set FILE_FLAG_BACKUP_SEMANTICS as well.
    # See https://docs.microsoft.com/en-us/windows/desktop/api/fileapi/nf-fileapi-createfilea
    # description from _winapi.c:601
    # /* REPARSE_DATA_BUFFER usage is heavily under-documented, especially for
    # junction points. Here's what I've learned along the way:
    # - A junction point has two components: a print name and a substitute
    # name. They both describe the link target, but the substitute name is
    # the physical target and the print name is shown in directory listings.
    # - The print name must be a native name, prefixed with "\??\".
    # - Both names are stored after each other in the same buffer (the
    # PathBuffer) and both must be NUL-terminated.
    # - There are four members defining their respective offset and length
    # inside PathBuffer: SubstituteNameOffset, SubstituteNameLength,
    # PrintNameOffset and PrintNameLength.
    # - The total size we need to allocate for the REPARSE_DATA_BUFFER, thus,
    # is the sum of:
    # - the fixed header size (REPARSE_DATA_BUFFER_HEADER_SIZE)
    # - the size of the MountPointReparseBuffer member without the PathBuffer
    # - the size of the prefix ("\??\") in bytes
    # - the size of the print name in bytes
    # - the size of the substitute name in bytes
    # - the size of two NUL terminators in bytes */
    # Remember the argument type so the return type can mirror it.
    target_is_path = isinstance(path, pathlib.Path)
    if target_is_path:
        target = str(path)
    else:
        target = path
    CreateFileW.argtypes = [LPWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE]
    CreateFileW.restype = HANDLE
    DeviceIoControl.argtypes = [HANDLE, DWORD, LPVOID, DWORD, LPVOID, DWORD, LPDWORD, LPVOID]
    DeviceIoControl.restype = BOOL
    # Open the reparse point itself (not its target); BACKUP_SEMANTICS is
    # required in case the link points at a directory.
    # NOTE(review): a CreateFileW failure (INVALID_HANDLE_VALUE) is not
    # checked here — DeviceIoControl then fails and the ValueError below is
    # raised instead; confirm that is the intended error path.
    handle = HANDLE(CreateFileW(target, GENERIC_READ, 0, None, OPEN_EXISTING,
                                FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OPEN_REPARSE_POINT, 0))
    buf = ReparseBuffer()
    ret = DWORD(0)
    status = DeviceIoControl(handle, FSCTL_GET_REPARSE_POINT, None, 0, ctypes.byref(buf),
                             MAXIMUM_REPARSE_DATA_BUFFER_SIZE, ctypes.byref(ret), None)
    # The handle is no longer needed once the reparse data has been copied out.
    CloseHandle(handle)
    if not status:
        logger = getLogger(__file__)
        logger.error("Failed IOCTL access to REPARSE_POINT {})".format(target))
        raise ValueError("not a symbolic link or access permission violation")
    # Offsets/lengths are byte counts into path_buffer; names are UTF-16-LE.
    if buf.reparse_tag == IO_REPARSE_TAG_SYMLINK:
        offset = buf.substitute_name_offset
        ending = offset + buf.substitute_name_length
        rpath = bytearray(buf.symlink.path_buffer)[offset:ending].decode('UTF-16-LE')
    elif buf.reparse_tag == IO_REPARSE_TAG_MOUNT_POINT:
        offset = buf.substitute_name_offset
        ending = offset + buf.substitute_name_length
        rpath = bytearray(buf.mount.path_buffer)[offset:ending].decode('UTF-16-LE')
    else:
        raise ValueError("not a symbolic link")
    # on posixmodule.c:7859 in py38, we do that
    # ```
    # else if (rdb->ReparseTag == IO_REPARSE_TAG_MOUNT_POINT)
    # {
    #     name = (wchar_t *)((char*)rdb->MountPointReparseBuffer.PathBuffer +
    #                        rdb->MountPointReparseBuffer.SubstituteNameOffset);
    #     nameLen = rdb->MountPointReparseBuffer.SubstituteNameLength / sizeof(wchar_t);
    # }
    # else
    # {
    #     PyErr_SetString(PyExc_ValueError, "not a symbolic link");
    # }
    # if (nameLen > 4 && wcsncmp(name, L"\\??\\", 4) == 0) {
    #     /* Our buffer is mutable, so this is okay */
    #     name[1] = L'\\';
    # }
    # ```
    # so substitute prefix here.
    # Turn the native "\??\C:\..." prefix into the extended "\\?\C:\..." form,
    # mirroring CPython's own readlink handling quoted above.
    if rpath.startswith('\\??\\'):
        rpath = '\\\\' + rpath[2:]
    if target_is_path:
        return pathlib.WindowsPath(rpath)
    else:
        return rpath
Loading…
Cancel
Save