diff --git a/CHANGES/6244.feature b/CHANGES/6244.feature new file mode 100644 index 0000000000..dd4c7bb781 --- /dev/null +++ b/CHANGES/6244.feature @@ -0,0 +1 @@ +Added support to create and distribute checkpoint publications in Pulp. \ No newline at end of file diff --git a/CHANGES/plugin_api/6244.feature b/CHANGES/plugin_api/6244.feature new file mode 100644 index 0000000000..1fd7720d04 --- /dev/null +++ b/CHANGES/plugin_api/6244.feature @@ -0,0 +1,3 @@ +Added support to create and distribute checkpoint publications in Pulp. +Plugins can choose to enable this feature by exposing the checkpoint field in their inherited PublicationSerializer and DistributionSerializer. +Checkpoint publications and distributions can be created by passing checkpoint=True when creating them. \ No newline at end of file diff --git a/CHANGES/pulp_file/6244.feature b/CHANGES/pulp_file/6244.feature new file mode 100644 index 0000000000..baabc1eb2e --- /dev/null +++ b/CHANGES/pulp_file/6244.feature @@ -0,0 +1 @@ +Added support to create checkpoint file publications and distribute them through checkpoint file distributions. \ No newline at end of file diff --git a/pulp_file/app/serializers.py b/pulp_file/app/serializers.py index 8865c7b8fc..e29d93cc1b 100644 --- a/pulp_file/app/serializers.py +++ b/pulp_file/app/serializers.py @@ -115,10 +115,11 @@ class FilePublicationSerializer(PublicationSerializer): required=False, allow_null=True, ) + checkpoint = serializers.BooleanField(default=False) class Meta: model = FilePublication - fields = PublicationSerializer.Meta.fields + ("distributions", "manifest") + fields = PublicationSerializer.Meta.fields + ("distributions", "manifest", "checkpoint") class FileDistributionSerializer(DistributionSerializer): @@ -133,9 +134,10 @@ class FileDistributionSerializer(DistributionSerializer): queryset=models.Publication.objects.exclude(complete=False), allow_null=True, ) + checkpoint = serializers.BooleanField(default=False) class Meta: - fields = DistributionSerializer.Meta.fields + ("publication",) + fields = DistributionSerializer.Meta.fields + ("publication", "checkpoint") model = FileDistribution diff --git a/pulp_file/app/tasks/publishing.py b/pulp_file/app/tasks/publishing.py index 36e86b71a4..3893fe580a 100644 --- a/pulp_file/app/tasks/publishing.py +++ b/pulp_file/app/tasks/publishing.py @@ -19,7 +19,7 @@ log = logging.getLogger(__name__) -def publish(manifest, repository_version_pk): +def publish(manifest, repository_version_pk, checkpoint=False): """ Create a Publication based on a RepositoryVersion. @@ -37,7 +37,9 @@ def publish(manifest, repository_version_pk): ) with tempfile.TemporaryDirectory(dir="."): - with FilePublication.create(repo_version, pass_through=True) as publication: + with FilePublication.create( + repo_version, pass_through=True, checkpoint=checkpoint + ) as publication: publication.manifest = manifest if manifest: manifest = Manifest(manifest) diff --git a/pulp_file/app/viewsets.py b/pulp_file/app/viewsets.py index b2b80cac68..4ba5a3d935 100644 --- a/pulp_file/app/viewsets.py +++ b/pulp_file/app/viewsets.py @@ -433,11 +433,16 @@ def create(self, request): serializer.is_valid(raise_exception=True) repository_version = serializer.validated_data.get("repository_version") manifest = serializer.validated_data.get("manifest") + checkpoint = serializer.validated_data.get("checkpoint") result = dispatch( tasks.publish, shared_resources=[repository_version.repository], - kwargs={"repository_version_pk": str(repository_version.pk), "manifest": manifest}, + kwargs={ + "repository_version_pk": str(repository_version.pk), + "manifest": manifest, + "checkpoint": checkpoint, + }, ) return OperationPostponedResponse(result, request) diff --git a/pulpcore/app/migrations/0128_distribution_checkpoint_publication_checkpoint.py b/pulpcore/app/migrations/0128_distribution_checkpoint_publication_checkpoint.py new file mode 100644 index 0000000000..a24518f79f --- /dev/null +++ b/pulpcore/app/migrations/0128_distribution_checkpoint_publication_checkpoint.py @@ -0,0 +1,23 @@ +# Generated by Django 4.2.18 on 2025-01-30 19:14 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("core", "0127_remove_upstreampulp_pulp_label_select"), + ] + + operations = [ + migrations.AddField( + model_name="distribution", + name="checkpoint", + field=models.BooleanField(default=False), + ), + migrations.AddField( + model_name="publication", + name="checkpoint", + field=models.BooleanField(default=False, editable=False), + ), + ] diff --git a/pulpcore/app/models/publication.py b/pulpcore/app/models/publication.py index c19d5cf032..c8efc04534 100644 --- a/pulpcore/app/models/publication.py +++ b/pulpcore/app/models/publication.py @@ -73,6 +73,7 @@ class Publication(MasterModel): pass_through (models.BooleanField): Indicates that the publication is a pass-through to the repository version. Enabling pass-through has the same effect as creating a PublishedArtifact for all of the content (artifacts) in the repository. + checkpoint (models.BooleanField): Indicates a checkpoint publication. Relations: repository_version (models.ForeignKey): The RepositoryVersion used to @@ -98,12 +99,13 @@ class Publication(MasterModel): complete = models.BooleanField(db_index=True, default=False) pass_through = models.BooleanField(default=False) + checkpoint = models.BooleanField(default=False, editable=False) repository_version = models.ForeignKey("RepositoryVersion", on_delete=models.CASCADE) pulp_domain = models.ForeignKey("Domain", default=get_domain_pk, on_delete=models.PROTECT) @classmethod - def create(cls, repository_version, pass_through=False): + def create(cls, repository_version, pass_through=False, checkpoint=False): """ Create a publication. @@ -125,7 +127,11 @@ def create(cls, repository_version, pass_through=False): Adds a Task.created_resource for the publication. """ with transaction.atomic(): - publication = cls(pass_through=pass_through, repository_version=repository_version) + publication = cls( + pass_through=pass_through, + repository_version=repository_version, + checkpoint=checkpoint, + ) publication.save() resource = CreatedResource(content_object=publication) resource.save() @@ -159,6 +165,10 @@ def delete(self, **kwargs): # It's possible for errors to occur before any publication has been completed, # so we need to handle the case when no Publication exists. try: + if self.checkpoint: + base_paths |= Distribution.objects.filter( + checkpoint=self.checkpoint, repository=self.repository_version.repository + ).values_list("base_path", flat=True) versions = self.repository.versions.all() pubs = Publication.objects.filter(repository_version__in=versions, complete=True) publication = pubs.latest("repository_version", "pulp_created") @@ -629,6 +639,7 @@ class Distribution(MasterModel): pulp_labels (HStoreField): Dictionary of string values. base_path (models.TextField): The base (relative) path component of the published url. hidden (models.BooleanField): Whether this distribution should be hidden in the content app. + checkpoint (models.BooleanField): Whether this distribution serves checkpoint publications. Relations: content_guard (models.ForeignKey): An optional content-guard. @@ -649,6 +660,7 @@ class Distribution(MasterModel): base_path = models.TextField() pulp_domain = models.ForeignKey("Domain", default=get_domain_pk, on_delete=models.PROTECT) hidden = models.BooleanField(default=False, null=True) + checkpoint = models.BooleanField(default=False) content_guard = models.ForeignKey(ContentGuard, null=True, on_delete=models.SET_NULL) publication = models.ForeignKey(Publication, null=True, on_delete=models.SET_NULL) @@ -706,6 +718,7 @@ def content_headers_for(self, path): "remote", "repository", "repository_version", + "checkpoint", ], has_changed=True, ) diff --git a/pulpcore/app/models/repository.py b/pulpcore/app/models/repository.py index 35a06809b9..ecb78538a2 100644 --- a/pulpcore/app/models/repository.py +++ b/pulpcore/app/models/repository.py @@ -328,7 +328,15 @@ def protected_versions(self): publication__pk__in=Distribution.objects.values_list("publication_id") ) - if distro := Distribution.objects.filter(repository=self.pk).first(): + # Protect repo versions of distributed checkpoint publications. + if Distribution.objects.filter(repository=self.pk, checkpoint=True).exists(): + qs |= self.versions.filter( + publication__pk__in=Publication.objects.filter(checkpoint=True).values_list( + "pulp_id" + ) + ) + + if distro := Distribution.objects.filter(repository=self.pk, checkpoint=False).first(): if distro.detail_model().SERVE_FROM_PUBLICATION: # if the distro serves publications, protect the latest published repo version version = self.versions.filter( diff --git a/pulpcore/app/serializers/publication.py b/pulpcore/app/serializers/publication.py index 738e969181..0345baef0a 100644 --- a/pulpcore/app/serializers/publication.py +++ b/pulpcore/app/serializers/publication.py @@ -294,6 +294,8 @@ def validate(self, data): "publication", (self.partial and self.instance.publication) or None ) + checkpoint = data.get("checkpoint", (self.partial and self.instance.checkpoint) or None) + if publication_provided and repository_version_provided: raise serializers.ValidationError( _( @@ -316,6 +318,12 @@ def validate(self, data): "may be used simultaneously." ) ) + elif checkpoint and ( + not repository_provided or publication_provided or repository_version_provided + ): + raise serializers.ValidationError( + _("The 'checkpoint' attribute may only be used with the 'repository' attribute.") + ) return data diff --git a/pulpcore/app/viewsets/publication.py b/pulpcore/app/viewsets/publication.py index ee17776d54..2a811ec702 100644 --- a/pulpcore/app/viewsets/publication.py +++ b/pulpcore/app/viewsets/publication.py @@ -70,6 +70,7 @@ class Meta: model = Publication fields = { "pulp_created": DATETIME_FILTER_OPTIONS, + "checkpoint": ["exact"], } @@ -497,6 +498,7 @@ class Meta: "name": NAME_FILTER_OPTIONS, "base_path": ["exact", "contains", "icontains", "in"], "repository": ["exact", "in"], + "checkpoint": ["exact"], } diff --git a/pulpcore/content/handler.py b/pulpcore/content/handler.py index e043b0b1c1..0b28ebdd23 100644 --- a/pulpcore/content/handler.py +++ b/pulpcore/content/handler.py @@ -6,7 +6,7 @@ import socket import struct from gettext import gettext as _ -from datetime import timedelta +from datetime import datetime, timedelta from aiohttp.client_exceptions import ClientResponseError, ClientConnectionError from aiohttp.web import FileResponse, StreamResponse, HTTPOk @@ -118,6 +118,28 @@ def __init__(self, path, distros): super().__init__(body=html, headers={"Content-Type": "text/html"}) +class CheckpointListings(HTTPOk): + """ + Response for browsing through the checkpoints of a specific checkpoint distro. + + This is returned when visiting the base path of a checkpoint distro. + """ + + def __init__(self, path, repo): + """Create the HTML response.""" + + checkpoints = ( + Publication.objects.filter(repository_version__repository=repo, checkpoint=True) + .order_by("pulp_created") + .values_list("pulp_created", flat=True) + .distinct() + ) + dates = {f"{Handler._format_checkpoint_timestamp(s)}/": s for s in checkpoints} + directory_list = dates.keys() + html = Handler.render_html(directory_list, dates=dates, path=path) + super().__init__(body=html, headers={"Content-Type": "text/html"}) + + class ArtifactNotFound(Exception): """ The artifact associated with a published-artifact does not exist. @@ -164,6 +186,7 @@ class Handler: ] distribution_model = None + checkpoint_ts_format = "%Y%m%dT%H%M%SZ" @staticmethod def _reset_db_connection(): @@ -312,7 +335,7 @@ def _match_distribution(cls, path, add_trailing_slash=True): distro_model = cls.distribution_model or Distribution domain = get_domain() try: - return ( + distro_object = ( distro_model.objects.filter(pulp_domain=domain) .select_related( "repository", @@ -326,6 +349,10 @@ def _match_distribution(cls, path, add_trailing_slash=True): .get(base_path__in=base_paths) .cast() ) + + if distro_object.checkpoint: + return cls._handle_checkpoint_distribution(distro_object, original_path) + return distro_object except ObjectDoesNotExist: if path.rstrip("/") in base_paths: distros = distro_model.objects.filter( @@ -336,12 +363,7 @@ def _match_distribution(cls, path, add_trailing_slash=True): raise DistroListings(path=path, distros=distros) else: # The list of a subset of distributions was requested without a trailing / - if settings.DOMAIN_ENABLED: - raise HTTPMovedPermanently( - f"{settings.CONTENT_PATH_PREFIX}{domain.name}/{path}" - ) - else: - raise HTTPMovedPermanently(f"{settings.CONTENT_PATH_PREFIX}{path}") + Handler._redirect_sub_path(path) log.debug( _("Distribution not matched for {path} using: {base_paths}").format( @@ -351,6 +373,125 @@ def _match_distribution(cls, path, add_trailing_slash=True): raise PathNotResolved(original_path) + @classmethod + def _handle_checkpoint_distribution(cls, distro, original_path): + """ + Handle a checkpoint distribution. + + Args: + distro (Distribution): The checkpoint distribution. + original_path (str): The original path component of the URL. + + Returns: + The detail object of the matched distribution. + + Raises: + PathNotResolved: when the path is invalid. + CheckpointListings: when the path is the base path of a checkpoint distribution. + """ + # Determine whether it's a listing or a specific checkpoint + if original_path == f"{distro.base_path}": + Handler._redirect_sub_path(f"{original_path}/") + elif original_path == f"{distro.base_path}/": + raise CheckpointListings(path=original_path, repo=distro.repository) + else: + base_path = distro.base_path + request_timestamp = Handler._extract_checkpoint_timestamp(base_path, original_path) + + # Find the latest checkpoint publication before or at the timestamp + checkpoint_publication = ( + Publication.objects.filter( + pulp_created__lte=request_timestamp, + repository_version__repository=distro.repository, + checkpoint=True, + ) + .order_by("-pulp_created") + .first() + ) + + if not checkpoint_publication: + raise PathNotResolved(original_path) + + pub_timestamp_str = Handler._format_checkpoint_timestamp( + checkpoint_publication.pulp_created + ) + request_timestamp_str = Handler._format_checkpoint_timestamp(request_timestamp) + if pub_timestamp_str != request_timestamp_str: + Handler._redirect_sub_path(f"{base_path}/{pub_timestamp_str}/") + + distro.base_path = f"{base_path}/{request_timestamp_str}" + distro.repository = None + distro.publication = checkpoint_publication + return distro + + @staticmethod + def _extract_checkpoint_timestamp(base_path, original_path): + """ + Validate the path and extract the timestamp from it. + + Args: + base_path (str): The base path of the distribution. + original_path (str): The path component of the URL. + + Returns: + The checkpoint timestamp in the request URL. + + Raises: + PathNotResolved: when the path is invalid. + """ + pattern = rf"^{re.escape(base_path)}/(\d{{8}}T\d{{6}}Z)(/.*)?$" + re.compile(pattern) + match = re.search(pattern, original_path) + if match: + request_timestamp_str = match.group(1) + try: + request_timestamp = datetime.strptime( + request_timestamp_str, Handler.checkpoint_ts_format + ) + except ValueError: + raise PathNotResolved(original_path) + else: + raise PathNotResolved(original_path) + + # The timestamp is truncated to seconds, so we need to cover the whole second + request_timestamp = request_timestamp.replace(microsecond=999999).replace( + tzinfo=timezone.utc + ) + # Future timestamps are not allowed for checkpoints + if request_timestamp > datetime.now(tz=timezone.utc): + raise PathNotResolved(original_path) + + return request_timestamp + + @staticmethod + def _format_checkpoint_timestamp(timestamp): + """ + Format a timestamp to the checkpoint format. + + Args: + timestamp (datetime): The timestamp to format. + + Returns: + The formatted timestamp using the checkpoint_ts_format. + """ + return datetime.strftime(timestamp, Handler.checkpoint_ts_format) + + @staticmethod + def _redirect_sub_path(path): + """ + Redirect to the correct path based on whether domain is enabled. + + Args: + path (str): The path component after the path prefix. + + Raises: + HTTPMovedPermanently: to the correct path. + """ + if settings.DOMAIN_ENABLED: + raise HTTPMovedPermanently(f"{settings.CONTENT_PATH_PREFIX}{get_domain().name}/{path}") + else: + raise HTTPMovedPermanently(f"{settings.CONTENT_PATH_PREFIX}{path}") + @staticmethod def _permit(request, distribution): """ diff --git a/pulpcore/tests/functional/api/using_plugin/test_content_access.py b/pulpcore/tests/functional/api/using_plugin/test_content_access.py index ac068fe1d9..66f611e56c 100644 --- a/pulpcore/tests/functional/api/using_plugin/test_content_access.py +++ b/pulpcore/tests/functional/api/using_plugin/test_content_access.py @@ -1,5 +1,10 @@ """Tests related to content delivery.""" +from datetime import datetime, timedelta +import re +from time import sleep +from urllib.parse import urlparse +from aiohttp import ClientResponseError import pytest import uuid @@ -10,6 +15,7 @@ from pulpcore.tests.functional.utils import ( download_file, ) +from pulpcore.content.handler import Handler @pytest.mark.parallel @@ -69,3 +75,91 @@ def test_upload_file_on_demand_already( content = file_bindings.ContentFilesApi.read(content.pulp_href) assert content.artifact is not None + + +@pytest.mark.parallel +def test_checkpoint( + file_repository_factory, + file_distribution_factory, + file_content_unit_with_name_factory, + file_bindings, + gen_object_with_cleanup, + monitor_task, + http_get, +): + """Test checkpoint.""" + + def create_publication(repo, checkpoint): + content = file_content_unit_with_name_factory(str(uuid.uuid4())) + task = file_bindings.RepositoriesFileApi.modify( + repo.pulp_href, {"add_content_units": [content.pulp_href]} + ).task + monitor_task(task) + repo = file_bindings.RepositoriesFileApi.read(repo.pulp_href) + pub = gen_object_with_cleanup( + file_bindings.PublicationsFileApi, + {"repository_version": repo.latest_version_href, "checkpoint": checkpoint}, + ) + sleep(1) + return pub + + # setup + repo = file_repository_factory() + distribution = file_distribution_factory(repository=repo.pulp_href, checkpoint=True) + + pub_0 = create_publication(repo, False) + pub_1 = create_publication(repo, True) + pub_2 = create_publication(repo, False) + pub_3 = create_publication(repo, True) + pub_4 = create_publication(repo, False) + + # checkpoints listing + response = http_get(distribution.base_url).decode("utf-8") + checkpoints_ts = set(re.findall(r"\d{8}T\d{6}Z", response)) + assert len(checkpoints_ts) == 2 + assert Handler._format_checkpoint_timestamp(pub_1.pulp_created) in checkpoints_ts + assert Handler._format_checkpoint_timestamp(pub_3.pulp_created) in checkpoints_ts + + # exact ts + pub_1_url = ( + f"{distribution.base_url}{Handler._format_checkpoint_timestamp(pub_1.pulp_created)}/" + ) + response = http_get(pub_1_url).decode("utf-8") + assert f"