Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port securesystemslib.hash module #2815

Merged
merged 5 commits into from
Mar 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions tests/repository_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@
from __future__ import annotations

import datetime
import hashlib
import logging
import os
import tempfile
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from urllib import parse

import securesystemslib.hash as sslib_hash
from securesystemslib.signer import CryptoSigner, Signer

from tuf.api.exceptions import DownloadHTTPError
Expand Down Expand Up @@ -80,6 +80,8 @@

SPEC_VER = ".".join(SPECIFICATION_VERSION)

_HASH_ALGORITHM = "sha256"


@dataclass
class FetchTracker:
Expand Down Expand Up @@ -292,9 +294,9 @@ def _compute_hashes_and_length(
self, role: str
) -> tuple[dict[str, str], int]:
data = self.fetch_metadata(role)
digest_object = sslib_hash.digest(sslib_hash.DEFAULT_HASH_ALGORITHM)
digest_object = hashlib.new(_HASH_ALGORITHM)
digest_object.update(data)
hashes = {sslib_hash.DEFAULT_HASH_ALGORITHM: digest_object.hexdigest()}
hashes = {_HASH_ALGORITHM: digest_object.hexdigest()}
return hashes, len(data)

def update_timestamp(self) -> None:
Expand Down
21 changes: 17 additions & 4 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from typing import ClassVar

from securesystemslib import exceptions as sslib_exceptions
from securesystemslib import hash as sslib_hash
from securesystemslib.signer import (
CryptoSigner,
Key,
Expand Down Expand Up @@ -896,6 +895,12 @@ def test_length_and_hash_validation(self) -> None:
# test with data as bytes
snapshot_metafile.verify_length_and_hashes(data)

# test with custom blake algorithm
snapshot_metafile.hashes = {
"blake2b-256": "963a3c31aad8e2a91cfc603fdba12555e48dd0312674ac48cce2c19c243236a1"
}
snapshot_metafile.verify_length_and_hashes(data)

# test exceptions
expected_length = snapshot_metafile.length
snapshot_metafile.length = 2345
Expand Down Expand Up @@ -958,9 +963,7 @@ def test_targetfile_from_file(self) -> None:
# Test with a non-existing file
file_path = os.path.join(self.repo_dir, Targets.type, "file123.txt")
with self.assertRaises(FileNotFoundError):
TargetFile.from_file(
file_path, file_path, [sslib_hash.DEFAULT_HASH_ALGORITHM]
)
TargetFile.from_file(file_path, file_path, ["sha256"])

# Test with an unsupported algorithm
file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt")
Expand Down Expand Up @@ -990,6 +993,12 @@ def test_targetfile_from_data(self) -> None:
targetfile_from_data = TargetFile.from_data(target_file_path, data)
targetfile_from_data.verify_length_and_hashes(data)

# Test with custom blake hash algorithm
targetfile_from_data = TargetFile.from_data(
target_file_path, data, ["blake2b-256"]
)
targetfile_from_data.verify_length_and_hashes(data)

def test_metafile_from_data(self) -> None:
data = b"Inline test content"

Expand All @@ -1013,6 +1022,10 @@ def test_metafile_from_data(self) -> None:
),
)

# Test with custom blake hash algorithm
metafile = MetaFile.from_data(1, data, ["blake2b-256"])
metafile.verify_length_and_hashes(data)

def test_targetfile_get_prefixed_paths(self) -> None:
target = TargetFile(100, {"sha256": "abc", "md5": "def"}, "a/b/f.ext")
self.assertEqual(
Expand Down
78 changes: 50 additions & 28 deletions tuf/api/_payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@

import abc
import fnmatch
import hashlib
import io
import logging
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import (
Expand All @@ -21,7 +23,6 @@
)

from securesystemslib import exceptions as sslib_exceptions
from securesystemslib import hash as sslib_hash
from securesystemslib.signer import Key, Signature

from tuf.api.exceptions import LengthOrHashMismatchError, UnsignedMetadataError
Expand All @@ -34,6 +35,9 @@
_TARGETS = "targets"
_TIMESTAMP = "timestamp"

_DEFAULT_HASH_ALGORITHM = "sha256"
_BLAKE_HASH_ALGORITHM = "blake2b-256"

# We aim to support SPECIFICATION_VERSION and require the input metadata
# files to have the same major version (the first number) as ours.
SPECIFICATION_VERSION = ["1", "0", "31"]
Expand All @@ -45,6 +49,38 @@
T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets")


def _get_digest(algo: str) -> Any:  # noqa: ANN401
    """Return a fresh digest object for ``algo``.

    Supports the custom "blake2b-256" algorithm name, which is not a
    registered ``hashlib`` name and must be constructed explicitly.
    """
    if algo != _BLAKE_HASH_ALGORITHM:
        return hashlib.new(algo)

    # blake2b needs an explicit 32-byte digest size to act as "blake2b-256".
    return hashlib.blake2b(digest_size=32)


def _hash_bytes(data: bytes, algo: str) -> str:
    """Return the hex digest of ``data`` computed with ``algo``."""
    hasher = _get_digest(algo)
    hasher.update(data)
    return hasher.hexdigest()


def _hash_file(f: IO[bytes], algo: str) -> str:
    """Return the hex digest of the contents of ``f`` computed with ``algo``.

    The file is always hashed from the beginning, regardless of the
    current position of ``f``.
    """
    f.seek(0)
    if sys.version_info < (3, 11):
        # Pre-3.11 fallback: hash the file manually in small chunks. The
        # chunk size matches the one used by the previously used and now
        # deprecated `securesystemslib.hash.digest_fileobject`.
        hasher = _get_digest(algo)
        while chunk := f.read(4096):
            hasher.update(chunk)
        return hasher.hexdigest()

    return hashlib.file_digest(f, lambda: _get_digest(algo)).hexdigest()  # type: ignore[arg-type]


class Signed(metaclass=abc.ABCMeta):
"""A base class for the signed part of TUF metadata.

Expand Down Expand Up @@ -664,24 +700,18 @@ def _verify_hashes(
data: bytes | IO[bytes], expected_hashes: dict[str, str]
) -> None:
"""Verify that the hash of ``data`` matches ``expected_hashes``."""
is_bytes = isinstance(data, bytes)
for algo, exp_hash in expected_hashes.items():
try:
if is_bytes:
digest_object = sslib_hash.digest(algo)
digest_object.update(data)
if isinstance(data, bytes):
observed_hash = _hash_bytes(data, algo)
else:
# if data is not bytes, assume it is a file object
digest_object = sslib_hash.digest_fileobject(data, algo)
except (
sslib_exceptions.UnsupportedAlgorithmError,
sslib_exceptions.FormatError,
) as e:
observed_hash = _hash_file(data, algo)
except (ValueError, TypeError) as e:
raise LengthOrHashMismatchError(
f"Unsupported algorithm '{algo}'"
) from e

observed_hash = digest_object.hexdigest()
if observed_hash != exp_hash:
raise LengthOrHashMismatchError(
f"Observed hash {observed_hash} does not match "
Expand Down Expand Up @@ -731,25 +761,17 @@ def _get_length_and_hashes(
hashes = {}

if hash_algorithms is None:
hash_algorithms = [sslib_hash.DEFAULT_HASH_ALGORITHM]
hash_algorithms = [_DEFAULT_HASH_ALGORITHM]

for algorithm in hash_algorithms:
try:
if isinstance(data, bytes):
digest_object = sslib_hash.digest(algorithm)
digest_object.update(data)
hashes[algorithm] = _hash_bytes(data, algorithm)
else:
digest_object = sslib_hash.digest_fileobject(
data, algorithm
)
except (
sslib_exceptions.UnsupportedAlgorithmError,
sslib_exceptions.FormatError,
) as e:
hashes[algorithm] = _hash_file(data, algorithm)
except (ValueError, TypeError) as e:
raise ValueError(f"Unsupported algorithm '{algorithm}'") from e

hashes[algorithm] = digest_object.hexdigest()

return (length, hashes)


Expand Down Expand Up @@ -832,7 +854,7 @@ def from_data(
version: Version of the metadata file.
data: Metadata bytes that the metafile represents.
hash_algorithms: Hash algorithms to create the hashes with. If not
specified, the securesystemslib default hash algorithm is used.
specified, "sha256" is used.

Raises:
ValueError: The hash algorithms list contains an unsupported
Expand Down Expand Up @@ -1150,7 +1172,7 @@ def is_delegated_path(self, target_filepath: str) -> bool:
if self.path_hash_prefixes is not None:
# Calculate the hash of the filepath
# to determine in which bin to find the target.
digest_object = sslib_hash.digest(algorithm="sha256")
digest_object = hashlib.new(name="sha256")
digest_object.update(target_filepath.encode("utf-8"))
target_filepath_hash = digest_object.hexdigest()

Expand Down Expand Up @@ -1269,7 +1291,7 @@ def get_role_for_target(self, target_filepath: str) -> str:
target_filepath: URL path to a target file, relative to a base
targets URL.
"""
hasher = sslib_hash.digest(algorithm="sha256")
hasher = hashlib.new(name="sha256")
hasher.update(target_filepath.encode("utf-8"))

# We can't ever need more than 4 bytes (32 bits).
Expand Down Expand Up @@ -1542,7 +1564,7 @@ def from_file(
targets URL.
local_path: Local path to target file content.
hash_algorithms: Hash algorithms to calculate hashes with. If not
specified the securesystemslib default hash algorithm is used.
specified, "sha256" is used.

Raises:
FileNotFoundError: The file doesn't exist.
Expand All @@ -1566,7 +1588,7 @@ def from_data(
targets URL.
data: Target file content.
hash_algorithms: Hash algorithms to create the hashes with. If not
specified the securesystemslib default hash algorithm is used.
specified, "sha256" is used.

Raises:
ValueError: The hash algorithms list contains an unsupported
Expand Down