Commit 9d26e15

Add route and handlers for closing PR mirrors
When a PR is closed by a merge:

* Copy the PR mirror to the graduated binaries mirror
* Prune duplicates that have already been updated in the "develop" mirror
* Reindex the graduated binaries mirror
* Delete the PR mirror

When a PR is closed, but not merged:

* Delete the PR mirror

This change removes the need for sync-script-based cleaning of PR binary mirrors.
1 parent c4956d5 commit 9d26e15
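
The merge path is built as a chain of RQ jobs: the copy runs first, each stack's prune depends on the copy, each reindex depends on its prune, and the final mirror deletion waits only on the copy. A minimal sketch of that dependency pattern, with hypothetical placeholder task functions, queue name, and URLs standing in for the real workers added below:

```python
# Sketch of the RQ dependency chain described in the commit message.
# Task functions, the queue name, and the URLs are hypothetical placeholders.
from redis import Redis
from rq import Queue


def copy_binaries(src, dest):              # stands in for copy_pr_mirror
    print(f"copy {src} -> {dest}")


def prune_duplicates(mirror, published):   # stands in for prune_mirror_duplicates
    print(f"prune {mirror} against {published}")


def reindex(mirror):                       # stands in for update_mirror_index
    print(f"reindex {mirror}")


def delete_mirror(mirror):                 # stands in for delete_pr_mirror
    print(f"delete {mirror}")


queue = Queue("tasks_long", connection=Redis())
timeout = 6 * 60 * 60  # same six-hour ceiling the handler uses

copy_job = queue.enqueue(
    copy_binaries, "s3://pr-mirrors/pr123_feature", "s3://pr-mirrors/shared",
    job_timeout=timeout,
)
prune_job = queue.enqueue(
    prune_duplicates, "s3://pr-mirrors/shared/e4s", "s3://published/e4s/develop",
    depends_on=copy_job, job_timeout=timeout,
)
reindex_job = queue.enqueue(
    reindex, "s3://pr-mirrors/shared/e4s",
    depends_on=prune_job, job_timeout=timeout,
)
# Deleting the per-PR mirror only needs the copy to have finished.
delete_job = queue.enqueue(
    delete_mirror, "s3://pr-mirrors/pr123_feature",
    depends_on=copy_job, job_timeout=timeout,
)
```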

File tree (4 files changed, +311 −0 lines changed)

  spackbot/handlers/__init__.py
  spackbot/handlers/mirrors.py
  spackbot/routes.py
  spackbot/workers.py

spackbot/handlers/__init__.py

+1
@@ -3,3 +3,4 @@
 from .reviewers import add_reviewers, add_issue_maintainers  # noqa
 from .reviewers import add_reviewers  # noqa
 from .style import style_comment, fix_style  # noqa
+from .mirrors import close_pr_mirror  # noqa

spackbot/handlers/mirrors.py

+124
@@ -0,0 +1,124 @@
+# Copyright 2013-2021 Lawrence Livermore National Security, LLC and other
+# Spack Project Developers. See the top-level COPYRIGHT file for details.
+#
+# SPDX-License-Identifier: (Apache-2.0 OR MIT)
+
+import os
+
+from sh.contrib import git
+
+import spackbot.helpers as helpers
+from spackbot.workers import (
+    copy_pr_mirror,
+    prune_mirror_duplicates,
+    update_mirror_index,
+    delete_pr_mirror,
+    get_queue,
+    TASK_QUEUE_LONG,
+    spack_upstream,
+    pr_expected_base,
+    pr_mirror_base_url,
+    pr_shared_mirror,
+    publish_mirror_base_url,
+)
+
+# If we don't provide a timeout, the default in RQ is 180 seconds
+WORKER_JOB_TIMEOUT = 6 * 60 * 60
+
+logger = helpers.getLogger(__name__)
+
+
+def list_ci_stacks(branch):
+    with helpers.temp_dir() as cwd:
+        # Shallow clone of spack used to find the CI stacks available on the
+        # base branch.
+        git.clone("--branch", branch, "--depth", 1, spack_upstream, "spack-develop")
+
+        stacks = []
+        pipeline_root = (
+            f"{cwd}/spack-develop/share/spack/gitlab/cloud_pipelines/stacks/"
+        )
+        for stack in os.listdir(pipeline_root):
+            if os.path.isfile(f"{pipeline_root}/{stack}/spack.yaml"):
+                stacks.append(stack)
+
+        return stacks
+
+
+async def close_pr_mirror(event, gh):
+    payload = event.data
+
+    base_branch = payload["pull_request"]["base"]["ref"]
+    is_merged = payload["pull_request"]["merged"]
+    pr_number = payload["number"]
+    pr_branch = payload["pull_request"]["head"]["ref"]
+    pr_mirror_url = f"{pr_mirror_base_url}/pr{pr_number}_{pr_branch}"
+
+    # Use the "long" running task queue
+    ltask_q = get_queue(TASK_QUEUE_LONG)
+
+    job_metadata = {
+        "type": None,
+        "stack": None,
+    }
+
+    copy_job = None
+
+    if is_merged and base_branch == pr_expected_base:
+        logger.info(
+            f"PR {pr_number}/{pr_branch} merged to develop, graduating binaries"
+        )
+
+        # Copy all of the stack binaries from the PR mirror to the shared PR
+        # mirror.
+        job_metadata.update({"type": "copy"})
+        shared_pr_mirror_url = f"{pr_mirror_base_url}/{pr_shared_mirror}"
+        copy_job = ltask_q.enqueue(
+            copy_pr_mirror,
+            pr_mirror_url,
+            shared_pr_mirror_url,
+            meta=job_metadata,
+            job_timeout=WORKER_JOB_TIMEOUT,
+        )
+        logger.info(f"Copy job queued: {copy_job.id}")
+
+        # Loop over all of the stacks present on the expected base branch
+        for stack in list_ci_stacks(pr_expected_base):
+            job_metadata.update({"stack": stack})
+            # Prune duplicates that have already been published, since the
+            # copy above may have introduced some.
+            job_metadata.update({"type": "prune"})
+            shared_pr_mirror_url = f"{pr_mirror_base_url}/{pr_shared_mirror}/{stack}"
+            publish_mirror_url = f"{publish_mirror_base_url}/{stack}/{pr_expected_base}"
+            prune_job = ltask_q.enqueue(
+                prune_mirror_duplicates,
+                shared_pr_mirror_url,
+                publish_mirror_url,
+                job_timeout=WORKER_JOB_TIMEOUT,
+                depends_on=copy_job,
+                meta=job_metadata,
+            )
+            logger.info(f"Pruning job queued: {prune_job.id}")
+
+            stack_mirror_url = f"{pr_mirror_base_url}/{pr_shared_mirror}/{stack}"
+            # Queue a reindex of the stack mirror to run after the prune.
+            job_metadata.update({"type": "reindex"})
+            update_job = ltask_q.enqueue(
+                update_mirror_index,
+                stack_mirror_url,
+                job_timeout=WORKER_JOB_TIMEOUT,
+                depends_on=prune_job,
+                meta=job_metadata,
+            )
+            logger.info(f"Reindex job queued: {update_job.id}")
+
+    # Delete the per-PR mirror whether or not the PR was merged.
+    job_metadata.update({"type": "delete"})
+    del_job = ltask_q.enqueue(
+        delete_pr_mirror,
+        pr_mirror_url,
+        meta=job_metadata,
+        job_timeout=WORKER_JOB_TIMEOUT,
+        depends_on=copy_job,
+    )
+    logger.info(f"Delete job queued: {del_job.id}")

spackbot/routes.py

+8
@@ -130,3 +130,11 @@ async def label_pull_requests(event, gh, *args, session, **kwargs):
     Add labels to PRs based on which files were modified.
     """
     await handlers.add_labels(event, gh)
+
+
+@router.register("pull_request", action="closed")
+async def on_closed_pull_request(event, gh, *args, session, **kwargs):
+    """
+    Respond to a closed pull request.
+    """
+    await handlers.close_pr_mirror(event, gh)
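
The new route fires on `pull_request` deliveries with action `closed`. The handler reads only a few payload fields; a minimal sketch of those fields follows, with invented values for illustration:

```python
# Hypothetical minimal payload for a "pull_request closed" delivery; only the
# fields read by close_pr_mirror are shown, and the values are invented.
example_payload = {
    "action": "closed",
    "number": 123,                       # names the PR mirror: pr123_<branch>
    "pull_request": {
        "merged": True,                  # merged vs. merely closed
        "base": {"ref": "develop"},      # must equal pr_expected_base to graduate
        "head": {"ref": "my-feature"},
    },
}

# Dispatching such a payload through the gidgethub router would look roughly
# like the following (requires a configured GitHubAPI instance and the keyword
# arguments the spackbot handlers expect):
#
#     event = gidgethub.sansio.Event(example_payload, event="pull_request", delivery_id="1")
#     await router.dispatch(event, gh, session=session)
```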

spackbot/workers.py

+178
@@ -326,3 +326,181 @@ async def fix_style_task(event):
         await gh.post(
             event.data["issue"]["comments_url"], {}, data={"body": message}
         )
+
+
+async def copy_pr_mirror(pr_mirror_url, shared_pr_mirror_url):
+    """Copy binaries from the per-PR mirror to the shared PR mirror.
+
+    Uses an s3 resource to copy objects between the two mirrors.
+    """
+    pr_url = helpers.s3_parse_url(pr_mirror_url)
+    shared_pr_url = helpers.s3_parse_url(shared_pr_mirror_url)
+
+    s3 = boto3.resource("s3")
+    pr_bucket_name = pr_url.get("bucket")
+    pr_bucket = s3.Bucket(pr_bucket_name)
+    pr_mirror_prefix = pr_url.get("prefix")
+
+    shared_pr_bucket = s3.Bucket(shared_pr_url.get("bucket"))
+    shared_pr_mirror_prefix = shared_pr_url.get("prefix")
+
+    # File extensions to copy
+    extensions = (".spack", ".spec.json", ".spec.yaml", ".spec.json.sig")
+
+    for obj in pr_bucket.objects.filter(Prefix=pr_mirror_prefix):
+        if obj.key.endswith(extensions):
+            # Create a new object, replacing the first instance of the
+            # pr_mirror_prefix with the shared_pr_mirror_prefix.
+            new_obj = shared_pr_bucket.Object(
+                obj.key.replace(pr_mirror_prefix, shared_pr_mirror_prefix, 1)
+            )
+            # Copy the PR mirror object to the new object in the shared PR mirror
+            new_obj.copy(
+                {
+                    "Bucket": pr_bucket_name,
+                    "Key": obj.key,
+                }
+            )
+
+
+async def delete_pr_mirror(pr_mirror_url):
+    """Delete every object under the per-PR mirror prefix."""
+    pr_url = helpers.s3_parse_url(pr_mirror_url)
+
+    s3 = boto3.resource("s3")
+    pr_bucket = s3.Bucket(pr_url.get("bucket"))
+    pr_mirror_prefix = pr_url.get("prefix")
+    pr_bucket.objects.filter(Prefix=pr_mirror_prefix).delete()
+
+
+# Update the index of a single stack mirror
+async def update_mirror_index(mirror_url):
+    """Use the spack buildcache command to update the index on a remote mirror"""
+
+    # Stack associated with the current job
+    job = get_current_job()
+    stack = job.meta["stack"]
+
+    # Check whether another reindex for this stack is already queued
+    do_reindex = True
+    ltask_q = get_queue(job.origin)
+
+    for queued_job in ltask_q.jobs:
+        meta = queued_job.meta
+        if meta["type"] == "reindex" and meta["stack"] == stack:
+            do_reindex = False
+            break
+
+    # If no further reindex jobs are queued for this stack, run the reindex
+    # on the graduated PR mirror.
+    if do_reindex:
+        print(f"Updating binary index at {mirror_url}")
+        await helpers.run_in_subprocess(
+            [
+                "spack",
+                "-d",
+                "buildcache",
+                "update-index",
+                "--mirror-url",
+                f"'{mirror_url}'",
+            ]
+        )
+
+
+# This works because we guarantee the hash is in the filename.
+# If this assumption is ever broken, this code will break.
+def hash_from_key(key):
+    h = None
+    # The hash is 32 chars long, between a "-" and a "."
+    # Examples include:
+    # linux-ubuntu18.04-x86_64-gcc-8.4.0-armadillo-10.5.0-gq3ijjrtnzgpm4bvuamjr6wa7hzxkypz.spack
+    # linux-ubuntu18.04-x86_64-gcc-8.4.0-armadillo-10.5.0-gq3ijjrtnzgpm4bvuamjr6wa7hzxkypz.spec.json
+    h = re.findall(r"-([a-zA-Z0-9]{32})\.", key.lower())
+    if len(h) > 1:
+        # Error, multiple matches are ambiguous
+        h = None
+    elif h:
+        h = h[0]
+    return h
+
+
+# Prune published duplicates from a stack's shared PR mirror
+async def prune_mirror_duplicates(pr_mirror_url, publish_mirror_url):
+    s3 = boto3.resource("s3")
+
+    pr_url = helpers.s3_parse_url(pr_mirror_url)
+    pr_bucket_name = pr_url.get("bucket")
+    pr_bucket = s3.Bucket(pr_bucket_name)
+    pr_mirror_prefix = pr_url.get("prefix")
+
+    publish_url = helpers.s3_parse_url(publish_mirror_url)
+    publish_bucket = s3.Bucket(publish_url.get("bucket"))
+    publish_mirror_prefix = publish_url.get("prefix")
+
+    # All of the expected spec file extensions
+    extensions = (".spec.json", ".spec.yaml", ".spec.json.sig")
+
+    # Get the current time for age-based pruning
+    now = datetime.now()
+    pr_specs = set()
+    for obj in pr_bucket.objects.filter(
+        Prefix=pr_mirror_prefix,
+    ):
+        # Need to convert from aware to naive time to get a delta
+        last_modified = obj.last_modified.replace(tzinfo=None)
+        # Prune objects older than the retirement age to avoid storing cached
+        # objects that only existed during development.
+        if (now - last_modified).days >= helpers.pr_mirror_retire_after_days:
+            logger.debug(
+                f"pr mirror pruning {obj.key} from s3://{pr_bucket_name}: "
+                "reason(age)"
+            )
+            # Anything older than the retirement age is pruned indiscriminately
+            obj.delete()
+
+            # Record the hash from the object so all of the files associated
+            # with it are also removed.
+            spec_hash = hash_from_key(obj.key)
+            if spec_hash:
+                pr_specs.add(spec_hash)
+            continue
+
+        if not obj.key.endswith(extensions):
+            continue
+
+        # Collect the hashes present in the shared PR bucket.
+        spec_hash = hash_from_key(obj.key)
+        if spec_hash:
+            pr_specs.add(spec_hash)
+        else:
+            logger.error(f"Encountered spec file without hash in name: {obj.key}")
+
+    # Check the published base branch bucket for duplicates to delete
+    delete_specs = set()
+    for obj in publish_bucket.objects.filter(
+        Prefix=publish_mirror_prefix,
+    ):
+        if not obj.key.endswith(extensions):
+            continue
+
+        spec_hash = hash_from_key(obj.key.lower())
+        if spec_hash in pr_specs:
+            delete_specs.add(spec_hash)
+
+    # Also consider the .spack files for deletion
+    extensions = (".spack", *extensions)
+
+    # Delete all of the objects whose hashes were marked
+    for obj in pr_bucket.objects.filter(
+        Prefix=pr_mirror_prefix,
+    ):
+        if not obj.key.endswith(extensions):
+            continue
+
+        if hash_from_key(obj.key) in delete_specs:
+            logger.debug(
+                f"pr mirror pruning {obj.key} from s3://{pr_bucket_name}: "
+                "reason(published)"
+            )
+            obj.delete()
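
The workers above assume that helpers.s3_parse_url returns a mapping with "bucket" and "prefix" keys; that helper is not part of this diff. A rough sketch of the assumed behavior, not the actual implementation:

```python
# Assumed behavior of helpers.s3_parse_url, for illustration only; the real
# helper lives elsewhere in spackbot and may differ.
from urllib.parse import urlparse


def s3_parse_url(url):
    """Split an s3:// URL into a bucket name and a key prefix."""
    parsed = urlparse(url)
    return {
        "bucket": parsed.netloc,            # e.g. "example-pr-binaries"
        "prefix": parsed.path.lstrip("/"),  # e.g. "shared_pr_mirror/e4s"
    }


# s3_parse_url("s3://example-pr-binaries/shared_pr_mirror/e4s")
# -> {"bucket": "example-pr-binaries", "prefix": "shared_pr_mirror/e4s"}
```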
