#!/usr/bin/env python3
#
# __init__.py
"""
Some handy archive helpers for Python.
"""
#
# Copyright © 2021 Dominic Davis-Foster <dominic@davis-foster.co.uk>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
# OR OTHER DEALINGS IN THE SOFTWARE.
#
# stdlib
import datetime
import os
import pathlib
import shutil
import sys
import tarfile
import zipfile
from typing import IO, TYPE_CHECKING, Callable, Iterable, Optional, Type, TypeVar, Union, cast, no_type_check
# Package metadata.
__author__: str = "Dominic Davis-Foster"
__copyright__: str = "2021 Dominic Davis-Foster"
__license__: str = "MIT License"
__version__: str = "0.2.0"
__email__: str = "dominic@davis-foster.co.uk"
# The public API: the names exported by a star-import of this module.
__all__ = ["unpack_archive", "TarFile", "ZipFile", "is_tarfile"]
# Type variable used to annotate ``__enter__`` and ``open`` on the archive classes,
# so that subclasses are reported as returning their own type.
_Self = TypeVar("_Self")
# Type alias for the path arguments taken by the functions and methods in this module.
PathLike = Union[str, pathlib.Path, os.PathLike]
# Teach :func:`shutil.unpack_archive` to treat ``.whl`` files as zip archives.
# The membership test keeps the registration from being attempted twice (e.g. on
# module reload), since shutil rejects an extension that is already registered.
if "wheel" not in {fmt[0] for fmt in shutil.get_unpack_formats()}:
    shutil.register_unpack_format(
        name="wheel",
        extensions=[".whl"],
        # shutil exposes no public zip unpacker, so its private one is reused.
        function=shutil._unpack_zipfile,  # type: ignore[attr-defined]
    )
if TYPE_CHECKING or not hasattr(tarfile, "FilterError"):  # pragma: nocover
    # This interpreter's ``tarfile`` has no extraction filters (detected by the
    # missing ``FilterError``), so stand-ins with the same names and call
    # signature are defined here.
    # NOTE: all three stand-ins return the member untouched; they perform no checks.

    def fully_trusted_filter(member, dest_path):  # noqa: MAN001,MAN002
        """
        Fallback filter: return ``member`` unchanged.

        :param member: The archive member being extracted.
        :param dest_path: The extraction destination (unused).
        """

        return member

    def tar_filter(member, dest_path):  # noqa: MAN001,MAN002
        """
        Fallback filter: return ``member`` unchanged.

        :param member: The archive member being extracted.
        :param dest_path: The extraction destination (unused).
        """

        return member

    def data_filter(member, dest_path):  # noqa: MAN001,MAN002
        """
        Fallback filter: return ``member`` unchanged.

        :param member: The archive member being extracted.
        :param dest_path: The extraction destination (unused).
        """

        return member

else:  # pragma: nocover
    # The real filters exist; re-export them under the same names.
    # stdlib
    from tarfile import data_filter as data_filter
    from tarfile import fully_trusted_filter as fully_trusted_filter
    from tarfile import tar_filter as tar_filter
def unpack_archive(
    filename: PathLike,
    extract_dir: Optional[PathLike] = None,
    format: Optional[str] = None,  # noqa: A002  # pylint: disable=redefined-builtin
) -> None:
    """
    Unpack an archive.

    Thin wrapper around :func:`shutil.unpack_archive` which also accepts
    path-like objects on Python versions older than 3.7.

    :param filename: The name of the archive.
    :param extract_dir: The name of the target directory, where the archive is unpacked.
        If not provided, the current working directory is used.
    :param format: The archive format: one of ``'zip'``, ``'tar'``, ``'gztar'``, ``'bztar'``, or ``'xztar'``,
        or any other format registered through :func:`shutil.register_unpack_format`.
        If not provided, ``unpack_archive`` will use the filename extension and see if
        an unpacker was registered for that extension.
        If no unpacker is found, a :exc:`ValueError` is raised.
    """

    if sys.version_info < (3, 7):  # pragma: no cover (py37+)
        # Convert path-like arguments to plain paths for the older shutil.
        if extract_dir is not None:
            extract_dir = os.fspath(extract_dir)
        filename = os.fspath(filename)

    # The actual unpacking happens on every Python version, not just old ones.
    shutil.unpack_archive(filename, extract_dir, format)
class TarFile(tarfile.TarFile):
    """
    Subclass of :class:`tarfile.TarFile` with additional methods.
    """

    closed: bool
    offset: int

    def read_text(
        self,
        member: Union[str, tarfile.TarInfo],
        *,
        normalize_nl: bool = False,
    ) -> str:
        r"""
        Returns the content of the given file as a string.

        The content is decoded as UTF-8.

        :param member: The name of the file in the archive, or its :class:`tarfile.TarInfo`.
        :param normalize_nl: If :py:obj:`True`, line endings are normalized to ``\n`` (LF).

        :raises FileNotFoundError: If the file is not found in the archive.

        :rtype:

        .. versionadded:: 0.2.0  Added the ``normalize_nl`` option.
        """

        return _normalize_nl(self.read_bytes(member).decode("UTF-8"), normalize_nl)

    def read_bytes(self, member: Union[str, tarfile.TarInfo]) -> bytes:
        """
        Returns the content of the given file as bytes.

        :param member: The name of the file in the archive, or its :class:`tarfile.TarInfo`.

        :raises FileNotFoundError: If the file is not found in the archive,
            or the member has no readable content (e.g. it is a directory).
        """

        try:
            fd = self.extractfile(member)
        except KeyError:
            # tarfile reports an unknown member name with KeyError;
            # translate it to the exception this method documents.
            raise FileNotFoundError(member) from None

        if fd is None:
            # extractfile() returns None for members that are neither files nor links.
            raise FileNotFoundError(member)

        with fd:
            return fd.read()

    def write_file(
        self,
        filename: PathLike,
        arcname: Optional[PathLike] = None,
        mtime: Optional[datetime.datetime] = None,
    ) -> None:
        """
        Add the file ``filename`` to the archive under the name ``arcname``.

        :param filename: The path of the file on disk.
        :param arcname: An alternative name for the file in the archive.
        :param mtime: The last modified time of the file.
            Defaults to the value obtained from :func:`os.stat`.

        :raises IsADirectoryError: If ``filename`` is not an existing regular file.

        :no-default mtime:
        """

        if not os.path.isfile(filename):
            raise IsADirectoryError("'TarFile.write_file' only supports files.")

        if mtime is None:
            # Nothing to override; the stock implementation does everything needed.
            self.add(filename, arcname, recursive=False)
            return

        # From here on this mirrors TarFile.add(), with the mtime overridden.
        if arcname is None:  # pragma: no cover
            arcname = filename

        arcname = os.fspath(arcname)

        self._check("awx")  # type: ignore[attr-defined]

        # Skip if somebody tries to archive the archive...
        if self.name is not None and os.path.abspath(filename) == self.name:  # pragma: no cover
            self._dbg(2, "tarfile: Skipped %r" % filename)  # type: ignore[attr-defined]
            return

        self._dbg(1, filename)  # type: ignore[attr-defined]

        # Create a TarInfo object from the file.
        tarinfo = self.gettarinfo(os.fspath(filename), arcname)

        # This check must come before tarinfo is touched, otherwise it can never trigger.
        if tarinfo is None:  # pragma: no cover
            self._dbg(1, f"tarfile: Unsupported type {filename!r}")  # type: ignore[attr-defined]
            return

        tarinfo.mtime = mtime.timestamp()  # type: ignore[assignment]

        # Append the tar header and data to the archive.
        with open(filename, "rb") as f:
            self.addfile(tarinfo, f)

    def __enter__(self: _Self) -> _Self:
        return super().__enter__()  # type: ignore[misc]

    @classmethod  # noqa: A003  # pylint: disable=redefined-builtin
    def open(  # type: ignore[override]  # noqa: D102
        cls: Type[_Self],
        name: Optional[PathLike] = None,
        *args,
        **kwargs,
    ) -> _Self:

        # Accept any path-like object for ``name``; everything else is passed through.
        if name is not None:
            name = os.fspath(name)

        return super().open(  # type: ignore[misc]
            name,
            *args,
            **kwargs,
        )
class ZipFile(zipfile.ZipFile):
    """
    Subclass of :class:`zipfile.ZipFile` with additional methods.
    """

    def extractfile(
        self,
        member: Union[str, zipfile.ZipInfo],
        pwd: Union[str, bytes, None] = None,
    ) -> IO[bytes]:
        """
        Open the given file in the archive for reading, as a binary file object.

        :param member: The name of the file in the archive, or its :class:`zipfile.ZipInfo`.
        :param pwd: The password to decrypt files. A string is encoded as UTF-8.

        :raises FileNotFoundError: If the file is not found in the archive.
        """

        if isinstance(pwd, str):
            # zipfile only accepts the password as bytes.
            pwd = pwd.encode("UTF-8")

        try:
            return self.open(member, mode='r', pwd=pwd)
        except KeyError:
            # zipfile reports an unknown member name with KeyError.
            raise FileNotFoundError(member) from None

    def read_text(
        self,
        member: Union[str, zipfile.ZipInfo],
        pwd: Union[str, bytes, None] = None,
        *,
        normalize_nl: bool = False
    ) -> str:
        r"""
        Returns the content of the given file as a string.

        The content is decoded as UTF-8.

        :param member: The name of the file in the archive, or its :class:`zipfile.ZipInfo`.
        :param pwd: The password to decrypt files.
        :param normalize_nl: If :py:obj:`True`, line endings are normalized to ``\n`` (LF).

        :raises FileNotFoundError: If the file is not found in the archive.

        :rtype:

        .. versionadded:: 0.2.0  Added the ``normalize_nl`` option.
        """

        return _normalize_nl(self.read_bytes(member, pwd=pwd).decode("UTF-8"), normalize_nl)

    def read_bytes(
        self,
        member: Union[str, zipfile.ZipInfo],
        pwd: Union[str, bytes, None] = None,
    ) -> bytes:
        """
        Returns the content of the given file as bytes.

        :param member: The name of the file in the archive, or its :class:`zipfile.ZipInfo`.
        :param pwd: The password to decrypt files.

        :raises FileNotFoundError: If the file is not found in the archive.
        """

        with self.extractfile(member, pwd=pwd) as fd:
            return fd.read()

    def write_file(
        self,
        filename: PathLike,
        arcname: Optional[PathLike] = None,
        mtime: Optional[datetime.datetime] = None,
    ) -> None:
        """
        Put the bytes from ``filename`` into the archive under the name ``arcname``.

        :param filename: The path of the file on disk.
        :param arcname: An alternative name for the file in the archive.
        :param mtime: The last modified time of the file.
            Defaults to the value obtained from :func:`os.stat`.

        :raises ValueError: If another writing handle on the archive is still open.
        :raises IsADirectoryError: If ``filename`` is not an existing regular file.

        :no-default mtime:
        """

        if self._writing:  # type: ignore[attr-defined]
            raise ValueError("Can't write to ZIP archive while an open writing handle exists")

        if not os.path.isfile(filename):
            raise IsADirectoryError("'ZipFile.write_file' only supports files.")

        if mtime is None:
            # Nothing to override; the stock implementation does everything needed.
            self.write(filename, arcname)
            return

        if arcname is None:
            arcname = os.fspath(filename)
        else:
            arcname = os.fspath(arcname)

        # Archive names are stored relative: drop any drive and leading separators.
        arcname = os.path.normpath(os.path.splitdrive(arcname)[1])
        while arcname[0] in (os.sep, os.altsep):
            arcname = arcname[1:]

        # Build the entry by hand so that its timestamp is ``mtime``.
        zinfo = zipfile.ZipInfo(arcname, mtime.timetuple()[:6])
        zinfo.compress_type = self.compression

        if sys.version_info >= (3, 7):  # pragma: no cover (<py37)
            zinfo._compresslevel = self.compresslevel  # type: ignore[attr-defined]

        st = os.stat(filename)
        zinfo.external_attr = (st.st_mode & 0xFFFF) << 16  # Unix attributes
        zinfo.file_size = st.st_size

        with open(filename, "rb") as src, self.open(zinfo, 'w') as dest:
            shutil.copyfileobj(src, dest, 1024 * 8)

    def __enter__(self: _Self) -> _Self:
        return super().__enter__()  # type: ignore[misc]
[docs]def is_tarfile(name: Union[PathLike, IO[bytes]]) -> bool:
"""
Return :py:obj:`True` if ``name`` points to a tar archive that :mod:`tarfile` can handle,
else return :py:obj:`False`.
:param name: A string, file, or file-like object.
""" # noqa: D400
try:
if hasattr(name, "read"):
t = TarFile.open(fileobj=name)
else:
t = TarFile.open(cast(PathLike, name))
t.close()
return True
except tarfile.TarError:
return False
def _normalize_nl(text: str, enable: bool) -> str:
if enable:
return text.replace("\r\n", '\n').replace('\r', '\n')
else:
return text