Skip to content

Commit

Permalink
Preserve ZipInfo in writestr_* and from_wheelfile
Browse files Browse the repository at this point in the history
The following `WheelFile` methods will now preserve `ZipInfo` attributes
if a `ZipInfo` object is given to them:

- `writestr_data`
- `writestr_distinfo`

Previously they did not do so.

The `WheelFile.from_wheelfile` constructor will now also preserve
`ZipInfo` attributes for the files inside data and distinfo archive
directories.

Thanks to Maxime Boissonneault (GH @mboisson) for the code he sent in
his pull request for this, which served as a guidepost for this
implementation's tests and some of the details.
  • Loading branch information
MrMino committed Jul 18, 2024
1 parent e808842 commit 7b69c5a
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 27 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Changed
- **Dropped support of Python versions lower than Python 3.9.**
- The `WheelFile.writestr_*` methods will now preserve `ZipInfo` attributes
if a `ZipInfo` object has been passed instead of the filename.
- `WheelFile.from_wheelfile` constructor will now preserve `ZipInfo`
attributes of the files from distinfo and data directories of the original
archive. This includes file permissions.

## [0.0.8] - 2021-08-03
### Changed
Expand Down
67 changes: 67 additions & 0 deletions tests/test_wheelfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -1077,3 +1077,70 @@ def test_writestr_distinfo_default_compresslevel_is_from_init(self, buf):
compresslevel=9)
wf.writestr_distinfo('file', b'data')
assert wf.infolist()[0]._compresslevel == 9


class TestZipinfoAttributePreserval:
    """Fields set on a ``ZipInfo`` must be readable back after ``writestr*``."""

    # Every (field, value) pair is set on a fresh ZipInfo before the write and
    # is expected to be read back unchanged once the archive is reopened.
    preserved_fields = pytest.mark.parametrize("field, value", [
        ("date_time", (2000, 1, 2, 3, 4, 2)),
        ("compress_type", ZIP_BZIP2),
        ("comment", b"Wubba lubba dub dub"),
        ("extra", bytes([0x00, 0x00, 0x04, 0x00] + [0xFF]*4)),
        ("create_system", 4),
        ("create_version", 31),
        ("extract_version", 42),
        ("internal_attr", 0x02),
        ("external_attr", 0x02),

        # Failing / impossible:

        # ZIP stores timestamps with two seconds of granularity
        # ("date_time", (2000, 1, 2, 3, 4, 1)),

        # Not preservable without changing other values
        # ("flag_bits", 0xFFFFFF),

        # Not supported by Python's zipfile
        # ("volume", 0x01),
    ])

    @preserved_fields
    def test_writestr_propagates_zipinfo_fields(self, field, value, wf, buf):
        """``writestr`` keeps the field of the ZipInfo it was given."""
        target = "some/archive/path"
        zinfo = ZipInfo(target)
        setattr(zinfo, field, value)

        wf.writestr(zinfo, "_")
        wf.close()

        with WheelFile(buf, distname="_", version='0') as reopened:
            stored = reopened.zipfile.getinfo(target)
            assert getattr(stored, field) == value

    @preserved_fields
    def test_writestr_data_propagates_zipinfo_fields(self, field, value, wf, buf):
        """``writestr_data`` keeps the field, under the data directory path."""
        section = "section"
        data_path = "some/data"
        zinfo = ZipInfo(data_path)
        setattr(zinfo, field, value)

        wf.writestr_data(section, zinfo, "_")
        wf.close()

        # The entry ends up under "<data dir>/<section>/<given path>".
        target = "/".join((wf.data_dirname, section, data_path))

        with WheelFile(buf, distname="_", version='0') as reopened:
            stored = reopened.zipfile.getinfo(target)
            assert getattr(stored, field) == value

    @preserved_fields
    def test_writestr_distinfo_propagates_zipinfo_fields(self, field, value, wf, buf):
        """``writestr_distinfo`` keeps the field, under the distinfo directory path."""
        data_path = "some/metadata"
        zinfo = ZipInfo(data_path)
        setattr(zinfo, field, value)

        wf.writestr_distinfo(zinfo, "_")
        wf.close()

        # The entry ends up under "<distinfo dir>/<given path>".
        target = "/".join((wf.distinfo_dirname, data_path))

        with WheelFile(buf, distname="_", version='0') as reopened:
            stored = reopened.zipfile.getinfo(target)
            assert getattr(stored, field) == value
43 changes: 33 additions & 10 deletions tests/test_wheelfile_cloning.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import io
import os
from zipfile import ZIP_DEFLATED, ZIP_BZIP2, ZIP_STORED, ZipInfo
from zipfile import ZIP_DEFLATED, ZIP_BZIP2, ZIP_LZMA, ZIP_STORED, ZipInfo

import pytest

Expand Down Expand Up @@ -211,9 +211,37 @@ def test_data_is_copied(self, wf, buf):
for arcname, data in archive.items():
assert cwf.zipfile.read(arcname) == data

PRESERVED_ZIPINFO_ATTRS = ['date_time', 'compress_type', 'comment',
def test_substitutes_compress_type_if_passed(self, wf, buf):
    """An explicit ``compression`` replaces the compress type of the source entry."""
    requested = ZIP_LZMA
    wf.writestr("file1", "", compress_type=ZIP_BZIP2)

    with WheelFile.from_wheelfile(wf, buf, compression=requested) as cloned:
        first_entry = cloned.zipfile.infolist()[0]
        assert first_entry.compress_type == requested

def test_preserves_compress_type_if_not_passed(self, wf, buf):
    """Without ``compression``, the clone keeps the source entry's compress type."""
    original = ZIP_BZIP2
    wf.writestr("file1", "", compress_type=original)

    with WheelFile.from_wheelfile(wf, buf) as cloned:
        first_entry = cloned.zipfile.infolist()[0]
        assert first_entry.compress_type == original

def test_substitutes_compresslevel_if_passed(self, wf, buf):
    """An explicit ``compresslevel`` replaces the level of the source entry."""
    requested_level = 7
    wf.writestr("file1", "", compress_type=ZIP_BZIP2, compresslevel=5)

    cloning = WheelFile.from_wheelfile(
        wf, buf, compression=ZIP_LZMA, compresslevel=requested_level
    )
    with cloning as cloned:
        first_entry = cloned.zipfile.infolist()[0]
        assert first_entry._compresslevel == requested_level

def test_preserves_compresslevel_if_not_passed(self, wf, buf):
    """Without ``compresslevel``, the clone keeps the source entry's level."""
    original_level = 7
    wf.writestr(
        "file1", "", compress_type=ZIP_BZIP2, compresslevel=original_level
    )

    with WheelFile.from_wheelfile(wf, buf) as cloned:
        first_entry = cloned.zipfile.infolist()[0]
        assert first_entry._compresslevel == original_level

PRESERVED_ZIPINFO_ATTRS = ['date_time', 'compress_type', '_compresslevel', 'comment',
'extra', 'create_system', 'create_version',
'extract_version', 'flag_bits', 'volume',
'extract_version', 'volume',
'internal_attr', 'external_attr']

def custom_zipinfo(self):
Expand All @@ -222,9 +250,8 @@ def custom_zipinfo(self):
zf.comment = b"comment"
zf.extra = b"extra"
zf.create_system = 2
zf.create_version = 21
zf.extract_version = 19
zf.flag_bits = 0o123
zf.create_version = 50
zf.extract_version = 60
zf.volume = 7
zf.internal_attr = 123
zf.external_attr = 321
Expand All @@ -240,7 +267,6 @@ def test_zip_attributes_are_preserved_writestr(self, wf, buf, attr):

assert getattr(czf, attr) == getattr(zf, attr)

@pytest.mark.xfail(reason="writestr_data does not propagate zinfo yet")
@pytest.mark.parametrize("attr", PRESERVED_ZIPINFO_ATTRS)
def test_zip_attributes_are_preserved_writestr_data(self, wf, buf, attr):
zf = self.custom_zipinfo()
Expand All @@ -251,9 +277,6 @@ def test_zip_attributes_are_preserved_writestr_data(self, wf, buf, attr):

assert getattr(czf, attr) == getattr(zf, attr)

# writestr_data does not propagate zinfo yet
# skipped because it generates lots of warnings
@pytest.mark.xfail(reason="writestr_distinfo does not propagate zinfo yet")
@pytest.mark.parametrize("attr", PRESERVED_ZIPINFO_ATTRS)
def test_zip_attributes_are_preserved_writestr_distinfo(self, wf, buf,
attr):
Expand Down
93 changes: 76 additions & 17 deletions wheelfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,42 @@ def _slots_from_params(func):
return slots


def _clone_zipinfo(zinfo: zipfile.ZipInfo, **to_replace) -> zipfile.ZipInfo:
    """Clone a ZipInfo object and update its attributes using to_replace.

    Parameters
    ----------
    zinfo
        The object to copy attributes from. It is not modified.

    **to_replace
        Values to use in the clone instead of the ones found in `zinfo`.
        ``filename`` sets the name of the clone; the other recognized keys
        are the attribute names listed in ``PRESERVED_ZIPINFO_ATTRS`` below.
        A value of ``None`` (including for ``filename``) means "keep the
        value from `zinfo`". Keys that are not recognized are ignored.

    Returns
    -------
    zipfile.ZipInfo
        A new object carrying the preserved attributes. Attributes that are
        not listed in ``PRESERVED_ZIPINFO_ATTRS`` are left at whatever the
        ``ZipInfo`` constructor sets them to.
    """

    PRESERVED_ZIPINFO_ATTRS = (
        "date_time",
        "compress_type",
        "_compresslevel",
        "comment",
        "extra",
        "create_system",
        "create_version",
        "extract_version",
        "volume",
        "internal_attr",
        "external_attr",
    )

    # `orig_filename` instead of `filename` is used as the fallback so that
    # the name is taken as it was originally given, before any filename
    # normalization done by ZipInfo.
    # `None` is treated as "not given", the same way as for the attributes
    # below, instead of being passed on to the ZipInfo constructor.
    new_name = to_replace.pop("filename", None)
    if new_name is None:
        new_name = zinfo.orig_filename

    new_zinfo = zipfile.ZipInfo(filename=new_name)
    for attr in PRESERVED_ZIPINFO_ATTRS:
        replaced = to_replace.get(attr)
        # Compare against None explicitly: falsy replacements such as 0 or
        # b"" are valid values and must win over the original.
        value = getattr(zinfo, attr) if replaced is None else replaced
        setattr(new_zinfo, attr, value)

    return new_zinfo


# TODO: accept packaging.requirements.Requirement in requires_dist, fix this in
# example, ensure such objects are converted on __str__
# TODO: reimplement using dataclasses
Expand Down Expand Up @@ -1228,7 +1264,7 @@ def from_wheelfile(
language_tag: Union[str, None, _Sentinel] = _unspecified,
abi_tag: Union[str, None, _Sentinel] = _unspecified,
platform_tag: Union[str, None, _Sentinel] = _unspecified,
compression: int = zipfile.ZIP_DEFLATED,
compression: Optional[int] = None,
allowZip64: bool = True,
compresslevel: Optional[int] = None,
strict_timestamps: bool = True,
Expand Down Expand Up @@ -1299,7 +1335,13 @@ def from_wheelfile(
turn passes them to `zipfile.ZipFile` - see `zipfile` docs for full
description on each.
Value from `wf` is *not* reused for this parameter.
Value used to construct `wf` is *not* reused for these parameters.
For `compression` and `compresslevel`, if the value is not passed,
the values from the original archive are preserved. Internally the
data is copied using `ZipFile.writestr` with `ZipInfo` attributes
of the files preserved. If the value is passed, these normally
preserved attributes are substituted.
Raises
------
Expand Down Expand Up @@ -1380,6 +1422,11 @@ def from_wheelfile(
"both objects' paths point at the same file."
)

if compression is None:
default_compression = zipfile.ZIP_DEFLATED
else:
default_compression = compression

new_wf = WheelFile(
file_or_path, mode,
distname=distname,
Expand All @@ -1388,7 +1435,7 @@ def from_wheelfile(
language_tag=language_tag,
abi_tag=abi_tag,
platform_tag=platform_tag,
compression=compression,
compression=default_compression,
allowZip64=allowZip64,
compresslevel=compresslevel,
strict_timestamps=strict_timestamps,
Expand Down Expand Up @@ -1429,19 +1476,24 @@ def from_wheelfile(
to_copy = wf.infolist()
for zinfo in to_copy:

data = wf.zipfile.read(zinfo)

arcname = zinfo.filename
arcname_head, *arcname_tail_parts = arcname.split('/')
arcname_tail = '/'.join(arcname_tail_parts)
if arcname_head == wf.distinfo_dirname:
new_arcname = new_wf.distinfo_dirname + '/' + arcname_tail
new_wf.writestr(new_arcname, wf.zipfile.read(zinfo))
continue
if arcname_head == wf.data_dirname:
zinfo = _clone_zipinfo(zinfo, filename=new_arcname)
elif arcname_head == wf.data_dirname:
new_arcname = new_wf.data_dirname + '/' + arcname_tail
new_wf.writestr(new_arcname, wf.zipfile.read(zinfo))
continue
zinfo = _clone_zipinfo(zinfo, filename=new_arcname)

new_wf.writestr(zinfo, wf.zipfile.read(zinfo))
new_wf.writestr(
zinfo,
data,
compress_type=compression,
compresslevel=compresslevel,
)

return new_wf

Expand Down Expand Up @@ -1966,7 +2018,6 @@ def _os_walk_path_to_arcpath(prefix: str, directory: str,
path = os.path.join(arcname, directory[len(prefix):], stem)
return path

# TODO: Make sure fields of given ZipInfo objects are propagated
def writestr(self,
zinfo_or_arcname: Union[zipfile.ZipInfo, str],
data: Union[bytes, str],
Expand Down Expand Up @@ -2090,7 +2141,6 @@ def write_data(self, filename: Union[str, Path],

# TODO: drive letter should be stripped from the arcname the same way
# ZipInfo.from_file does it
# TODO: Make sure fields of given ZipInfo objects are propagated
def writestr_data(self, section: str,
zinfo_or_arcname: Union[zipfile.ZipInfo, str],
data: Union[bytes, str],
Expand Down Expand Up @@ -2140,10 +2190,14 @@ def writestr_data(self, section: str,
else zinfo_or_arcname
)

arcname = self._distinfo_path(section + '/' + arcname.lstrip('/'),
kind='data')
data_arcname = self._distinfo_path(section + '/' + arcname.lstrip('/'), kind='data')

self.writestr(arcname, data, compress_type, compresslevel)
if isinstance(zinfo_or_arcname, zipfile.ZipInfo):
zinfo_or_arcname = _clone_zipinfo(zinfo_or_arcname, filename=data_arcname)
else:
zinfo_or_arcname = data_arcname

self.writestr(zinfo_or_arcname, data, compress_type, compresslevel)

# TODO: Lazy mode should permit writing meta here
def write_distinfo(self, filename: Union[str, Path],
Expand Down Expand Up @@ -2231,7 +2285,6 @@ def write_distinfo(self, filename: Union[str, Path],
self.write(filename, arcname, compress_type, compresslevel,
recursive=recursive, skipdir=skipdir)

# TODO: Make sure fields of given ZipInfo objects are propagated
def writestr_distinfo(self, zinfo_or_arcname: Union[zipfile.ZipInfo, str],
data: Union[bytes, str],
compress_type: Optional[int] = None,
Expand Down Expand Up @@ -2288,8 +2341,14 @@ def writestr_distinfo(self, zinfo_or_arcname: Union[zipfile.ZipInfo, str],
f"Write would result in a duplicated metadata file: {arcname}."
)

arcname = self._distinfo_path(arcname.lstrip('/'))
self.writestr(arcname, data, compress_type, compresslevel)
dist_arcname = self._distinfo_path(arcname.lstrip('/'))

if isinstance(zinfo_or_arcname, zipfile.ZipInfo):
zinfo_or_arcname = _clone_zipinfo(zinfo_or_arcname, filename=dist_arcname)
else:
zinfo_or_arcname = dist_arcname

self.writestr(zinfo_or_arcname, data, compress_type, compresslevel)

@staticmethod
def _check_section(section):
Expand Down

0 comments on commit 7b69c5a

Please sign in to comment.