Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ngclient rollback improvements #1524

Merged
merged 4 commits into from
Aug 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 63 additions & 59 deletions tests/test_trusted_metadata_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,6 @@ def hashes_length_modifier(timestamp: Timestamp) -> None:
def setUp(self) -> None:
self.trusted_set = TrustedMetadataSet(self.metadata["root"])

def _root_updated_and_update_timestamp(
self, timestamp_bytes: Optional[bytes] = None
) -> None:
"""Finsh root update and update timestamp with passed timestamp_bytes.

Args:
timestamp_bytes:
Bytes used when calling trusted_set.update_timestamp().
Default self.metadata["timestamp"].

"""
timestamp_bytes = timestamp_bytes or self.metadata["timestamp"]
self.trusted_set.root_update_finished()
self.trusted_set.update_timestamp(timestamp_bytes)


def _update_all_besides_targets(
self,
Expand All @@ -112,13 +97,14 @@ def _update_all_besides_targets(
Default self.metadata["snapshot"].

"""
self._root_updated_and_update_timestamp(timestamp_bytes)

timestamp_bytes = timestamp_bytes or self.metadata["timestamp"]
self.trusted_set.update_timestamp(timestamp_bytes)
snapshot_bytes = snapshot_bytes or self.metadata["snapshot"]
self.trusted_set.update_snapshot(snapshot_bytes)


def test_update(self):
self.trusted_set.root_update_finished()
self.trusted_set.update_timestamp(self.metadata["timestamp"])
self.trusted_set.update_snapshot(self.metadata["snapshot"])
self.trusted_set.update_targets(self.metadata["targets"])
Expand All @@ -139,24 +125,16 @@ def test_update(self):
self.assertTrue(count, 6)

def test_out_of_order_ops(self):
# Update timestamp before root is finished
with self.assertRaises(RuntimeError):
self.trusted_set.update_timestamp(self.metadata["timestamp"])

self.trusted_set.root_update_finished()
with self.assertRaises(RuntimeError):
self.trusted_set.root_update_finished()

# Update root after a previous successful root update
with self.assertRaises(RuntimeError):
self.trusted_set.update_root(self.metadata["root"])

# Update snapshot before timestamp
with self.assertRaises(RuntimeError):
self.trusted_set.update_snapshot(self.metadata["snapshot"])

self.trusted_set.update_timestamp(self.metadata["timestamp"])

# Update root after timestamp
with self.assertRaises(RuntimeError):
self.trusted_set.update_root(self.metadata["root"])

# Update targets before snapshot
with self.assertRaises(RuntimeError):
self.trusted_set.update_targets(self.metadata["targets"])
Expand Down Expand Up @@ -198,8 +176,6 @@ def test_update_with_invalid_json(self):
with self.assertRaises(exceptions.RepositoryError):
self.trusted_set.update_root(self.metadata["snapshot"])

self.trusted_set.root_update_finished()

top_level_md = [
(self.metadata["timestamp"], self.trusted_set.update_timestamp),
(self.metadata["snapshot"], self.trusted_set.update_snapshot),
Expand Down Expand Up @@ -241,15 +217,16 @@ def test_update_root_new_root_ver_same_as_trusted_root_ver(self):
with self.assertRaises(exceptions.ReplayedMetadataError):
self.trusted_set.update_root(self.metadata["root"])

def test_root_update_finished_expired(self):
def test_root_expired_final_root(self):
def root_expired_modifier(root: Root) -> None:
root.expires = datetime(1970, 1, 1)

# intermediate root can be expired
root = self.modify_metadata("root", root_expired_modifier)
tmp_trusted_set = TrustedMetadataSet(root)
# call root_update_finished when trusted root has expired
# update timestamp to trigger final root expiry check
with self.assertRaises(exceptions.ExpiredMetadataError):
tmp_trusted_set.root_update_finished()
tmp_trusted_set.update_timestamp(self.metadata["timestamp"])


def test_update_timestamp_new_timestamp_ver_below_trusted_ver(self):
Expand All @@ -258,7 +235,7 @@ def version_modifier(timestamp: Timestamp) -> None:
timestamp.version = 3

timestamp = self.modify_metadata("timestamp", version_modifier)
self._root_updated_and_update_timestamp(timestamp)
self.trusted_set.update_timestamp(timestamp)
with self.assertRaises(exceptions.ReplayedMetadataError):
self.trusted_set.update_timestamp(self.metadata["timestamp"])

Expand All @@ -268,35 +245,38 @@ def bump_snapshot_version(timestamp: Timestamp) -> None:

# set current known snapshot.json version to 2
timestamp = self.modify_metadata("timestamp", bump_snapshot_version)
self._root_updated_and_update_timestamp(timestamp)
self.trusted_set.update_timestamp(timestamp)

# newtimestamp.meta["snapshot.json"].version < trusted_timestamp.meta["snapshot.json"].version
with self.assertRaises(exceptions.ReplayedMetadataError):
self.trusted_set.update_timestamp(self.metadata["timestamp"])

def test_update_timestamp_expired(self):
self.trusted_set.root_update_finished()
# new_timestamp has expired
def timestamp_expired_modifier(timestamp: Timestamp) -> None:
timestamp.expires = datetime(1970, 1, 1)

# intermediate timestamp is allowed to be expired
timestamp = self.modify_metadata("timestamp", timestamp_expired_modifier)
self.trusted_set.update_timestamp(timestamp)

# update snapshot to trigger final timestamp expiry check
with self.assertRaises(exceptions.ExpiredMetadataError):
self.trusted_set.update_timestamp(timestamp)
self.trusted_set.update_snapshot(self.metadata["snapshot"])

def test_update_snapshot_length_or_hash_mismatch(self):
def modify_snapshot_length(timestamp: Timestamp) -> None:
timestamp.meta["snapshot.json"].length = 1

# set known snapshot.json length to 1
timestamp = self.modify_metadata("timestamp", modify_snapshot_length)
self._root_updated_and_update_timestamp(timestamp)
self.trusted_set.update_timestamp(timestamp)

with self.assertRaises(exceptions.RepositoryError):
self.trusted_set.update_snapshot(self.metadata["snapshot"])

def test_update_snapshot_cannot_verify_snapshot_with_threshold(self):
self._root_updated_and_update_timestamp(self.metadata["timestamp"])
self.trusted_set.update_timestamp(self.metadata["timestamp"])
snapshot = Metadata.from_bytes(self.metadata["snapshot"])
snapshot.signatures.clear()
with self.assertRaises(exceptions.UnsignedMetadataError):
Expand All @@ -307,46 +287,70 @@ def timestamp_version_modifier(timestamp: Timestamp) -> None:
timestamp.meta["snapshot.json"].version = 2

timestamp = self.modify_metadata("timestamp", timestamp_version_modifier)
self._root_updated_and_update_timestamp(timestamp)
# new_snapshot.version != trusted timestamp.meta["snapshot"].version
def snapshot_version_modifier(snapshot: Snapshot) -> None:
snapshot.version = 3
self.trusted_set.update_timestamp(timestamp)

#intermediate snapshot is allowed to not match meta version
self.trusted_set.update_snapshot(self.metadata["snapshot"])

snapshot = self.modify_metadata("snapshot", snapshot_version_modifier)
# final snapshot must match meta version
with self.assertRaises(exceptions.BadVersionNumberError):
self.trusted_set.update_snapshot(snapshot)
self.trusted_set.update_targets(self.metadata["targets"])


def test_update_snapshot_after_successful_update_new_snapshot_no_meta(self):
def test_update_snapshot_file_removed_from_meta(self):
self._update_all_besides_targets(self.metadata["timestamp"])
# Test removing a meta_file in new_snapshot compared to the old snapshot
def no_meta_modifier(snapshot: Snapshot) -> None:
snapshot.meta = {}
def remove_file_from_meta(snapshot: Snapshot) -> None:
del snapshot.meta["targets.json"]

snapshot = self.modify_metadata("snapshot", no_meta_modifier)
# Test removing a meta_file in new_snapshot compared to the old snapshot
snapshot = self.modify_metadata("snapshot", remove_file_from_meta)
with self.assertRaises(exceptions.RepositoryError):
self.trusted_set.update_snapshot(snapshot)

def test_update_snapshot_after_succesfull_update_new_snapshot_meta_version_different(self):
self._root_updated_and_update_timestamp(self.metadata["timestamp"])
# snapshot.meta["project1"].version != new_snapshot.meta["project1"].version
def test_update_snapshot_meta_version_decreases(self):
self.trusted_set.update_timestamp(self.metadata["timestamp"])

def version_meta_modifier(snapshot: Snapshot) -> None:
for metafile_path in snapshot.meta.keys():
snapshot.meta[metafile_path].version += 1
snapshot.meta["targets.json"].version += 1

snapshot = self.modify_metadata("snapshot", version_meta_modifier)
self.trusted_set.update_snapshot(snapshot)

with self.assertRaises(exceptions.BadVersionNumberError):
self.trusted_set.update_snapshot(self.metadata["snapshot"])

def test_update_snapshot_expired_new_snapshot(self):
self._root_updated_and_update_timestamp(self.metadata["timestamp"])
# new_snapshot has expired
self.trusted_set.update_timestamp(self.metadata["timestamp"])
def snapshot_expired_modifier(snapshot: Snapshot) -> None:
snapshot.expires = datetime(1970, 1, 1)

# intermediate snapshot is allowed to be expired
snapshot = self.modify_metadata("snapshot", snapshot_expired_modifier)
self.trusted_set.update_snapshot(snapshot)

# update targets to trigger final snapshot expiry check
with self.assertRaises(exceptions.ExpiredMetadataError):
self.trusted_set.update_snapshot(snapshot)
self.trusted_set.update_targets(self.metadata["targets"])

def test_update_snapshot_successful_rollback_checks(self):
def meta_version_bump(timestamp: Timestamp) -> None:
timestamp.meta["snapshot.json"].version += 1

def version_bump(snapshot: Snapshot) -> None:
snapshot.version += 1

# load a "local" timestamp, then update to newer one:
self.trusted_set.update_timestamp(self.metadata["timestamp"])
new_timestamp = self.modify_metadata("timestamp", meta_version_bump)
self.trusted_set.update_timestamp(new_timestamp)

# load a "local" snapshot, then update to newer one:
self.trusted_set.update_snapshot(self.metadata["snapshot"])
new_snapshot = self.modify_metadata("snapshot", version_bump)
self.trusted_set.update_snapshot(new_snapshot)

# update targets to trigger final snapshot meta version check
self.trusted_set.update_targets(self.metadata["targets"])

def test_update_targets_no_meta_in_snapshot(self):
def no_meta_modifier(snapshot: Snapshot) -> None:
Expand Down
Loading