Skip to content

Commit

Permalink
Add last_serial sync optimization
Browse files Browse the repository at this point in the history
fixes: pulp#351
  • Loading branch information
gerrod3 committed Nov 10, 2021
1 parent cfcdb32 commit 500a5e6
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGES/351.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Added ``last_serial`` sync optimization to Python repositories.
Subsequent syncs will use ``last_serial`` to get the changed packages since the previous sync.
18 changes: 18 additions & 0 deletions pulp_python/app/migrations/0011_pythonrepository_last_serial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.2.8 on 2021-10-21 20:25

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('python', '0010_update_json_field'),
]

operations = [
migrations.AddField(
model_name='pythonrepository',
name='last_serial',
field=models.IntegerField(default=0),
),
]
6 changes: 6 additions & 0 deletions pulp_python/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from django.conf import settings
from django_lifecycle import hook, BEFORE_UPDATE
from yarl import URL

from pulpcore.plugin.models import (
Expand Down Expand Up @@ -236,6 +237,7 @@ class PythonRepository(Repository):
REMOTE_TYPES = [PythonRemote]

autopublish = models.BooleanField(default=False)
last_serial = models.IntegerField(default=0)

class Meta:
default_related_name = "%(app_label)s_%(model_name)s"
Expand All @@ -261,3 +263,7 @@ def finalize_new_version(self, new_version):
"""
remove_duplicates(new_version)
validate_repo_version(new_version)

@hook(BEFORE_UPDATE, when="remote", has_changed=True)
def clear_last_serial(self):
self.last_serial = 0
10 changes: 9 additions & 1 deletion pulp_python/app/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,17 @@ class PythonRepositorySerializer(core_serializers.RepositorySerializer):
default=False,
required=False,
)
last_serial = serializers.IntegerField(
help_text=_(
"The serial number from the last successful sync. Used in the sync process to "
"optimize the sync based on changes from previous sync. Use mirror=True to bypass"
"this optimization."
),
read_only=True,
)

class Meta:
fields = core_serializers.RepositorySerializer.Meta.fields + ("autopublish",)
fields = core_serializers.RepositorySerializer.Meta.fields + ("autopublish", "last_serial")
model = python_models.PythonRepository


Expand Down
2 changes: 1 addition & 1 deletion pulp_python/app/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
"""

from .publish import publish # noqa:F401
from .sync import sync # noqa:F401
from .sync import sync, update_remote # noqa:F401
from .upload import upload, upload_group # noqa:F401
60 changes: 54 additions & 6 deletions pulp_python/app/tasks/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from gettext import gettext as _
from os import environ

from django.db import transaction
from rest_framework import serializers

from pulpcore.plugin.models import Artifact, ProgressReport, Remote, Repository
from pulpcore.plugin.models import Artifact, ProgressReport, Remote
from pulpcore.plugin.stages import (
DeclarativeArtifact,
DeclarativeContent,
Expand All @@ -16,6 +17,7 @@
from pulp_python.app.models import (
PythonPackageContent,
PythonRemote,
PythonRepository,
)
from pulp_python.app.utils import parse_metadata

Expand All @@ -27,6 +29,42 @@
logger = logging.getLogger(__name__)


def update_remote(remote_pk, *args, **kwargs):
"""
Update `PythonRepository.last_serial` when URL or filters are updated.
Args:
remote_pk (str): the id of the PythonRemote
data (dict): dictionary whose keys represent the fields of the model and their corresponding
values.
partial (bool): When true, only the fields specified in the data dictionary are updated.
When false, any fields missing from the data dictionary are assumed to be None and
their values are updated as such.
Raises:
:class:`rest_framework.exceptions.ValidationError`: When serializer instance can't be saved
due to validation error. This theoretically should never occur since validation is
performed before the task is dispatched.
"""
from pulp_python.app.serializers import PythonRemoteSerializer
data = kwargs.pop("data", {})
partial = kwargs.pop("partial", False)
with transaction.atomic():
instance = PythonRemote.objects.get(pk=remote_pk)
fields = {f.name for f in instance._meta.local_fields if f.name != "includes"}
fields.update({"url", "policy"})
if fields.intersection(data):
repos = list(PythonRepository.objects.filter(remote_id=remote_pk, last_serial__gt=0))
for repo in repos:
repo.last_serial = 0
PythonRepository.objects.bulk_update(repos, ["last_serial"])


serializer = PythonRemoteSerializer(instance, data=data, partial=partial)
serializer.is_valid(raise_exception=True)
serializer.save()


def sync(remote_pk, repository_pk, mirror):
"""
Sync content from the remote repository.
Expand All @@ -43,15 +81,21 @@ def sync(remote_pk, repository_pk, mirror):
"""
remote = PythonRemote.objects.get(pk=remote_pk)
repository = Repository.objects.get(pk=repository_pk)
repository = PythonRepository.objects.get(pk=repository_pk)

if not remote.url:
raise serializers.ValidationError(
detail=_("A remote must have a url attribute to sync.")
)

first_stage = PythonBanderStage(remote)
DeclarativeVersion(first_stage, repository, mirror).create()
same_remote = getattr(repository.remote, "pk", None) == remote_pk
serial = repository.last_serial if same_remote else 0
first_stage = PythonBanderStage(remote, mirror, serial)
version = DeclarativeVersion(first_stage, repository, mirror).create()
if version is not None and same_remote:
if first_stage.next_serial and first_stage.next_serial != repository.last_serial:
repository.last_serial = first_stage.next_serial
repository.save()


def create_bandersnatch_config(remote):
Expand Down Expand Up @@ -97,10 +141,13 @@ class PythonBanderStage(Stage):
Python Package Syncing Stage using Bandersnatch
"""

def __init__(self, remote):
def __init__(self, remote, mirror, last_serial):
"""Initialize the stage and Bandersnatch config"""
super().__init__()
self.remote = remote
# If mirror=True, then sync everything, don't use serial
self.serial = last_serial if not mirror else 0
self.next_serial = None
create_bandersnatch_config(remote)

async def run(self):
Expand All @@ -119,7 +166,7 @@ async def run(self):
message="Fetching Project Metadata", code="sync.fetching.project"
) as p:
pmirror = PulpMirror(
serial=0, # Serial currently isn't supported by Pulp
serial=self.serial,
master=master,
workers=workers,
deferred_download=deferred_download,
Expand All @@ -132,6 +179,7 @@ async def run(self):
Requirement(pkg).name for pkg in self.remote.includes
]
await pmirror.synchronize(packages_to_sync)
self.next_serial = pmirror.target_serial


class PulpMirror(Mirror):
Expand Down
18 changes: 18 additions & 0 deletions pulp_python/app/viewsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,24 @@ class PythonRemoteViewSet(core_viewsets.RemoteViewSet):
queryset = python_models.PythonRemote.objects.all()
serializer_class = python_serializers.PythonRemoteSerializer

@extend_schema(
description="Trigger an asynchronous update task",
responses={202: AsyncOperationResponseSerializer},
)
def update(self, request, pk, **kwargs):
"""Update remote."""
partial = kwargs.pop("partial", False)
lock = [self.get_object()]
repos = python_models.PythonRepository.objects.filter(remote_id=pk, last_serial__gt=0)
lock.extend(repos)
async_result = dispatch(
tasks.update_remote,
exclusive_resources=lock,
args=(pk,),
kwargs={"data": request.data, "partial": partial},
)
return core_viewsets.OperationPostponedResponse(async_result, request)

@extend_schema(
summary="Create from Bandersnatch",
responses={201: python_serializers.PythonRemoteSerializer},
Expand Down
44 changes: 44 additions & 0 deletions pulp_python/tests/unit/test_models.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,53 @@
from django.test import TestCase

from pulp_python.app.models import PythonRemote, PythonRepository
from pulp_python.app.tasks import update_remote


DEFAULT_SERIAL = 10000
MAX_SERIAL = 20000


class TestNothing(TestCase):
"""Test Nothing (placeholder)."""

def test_nothing_at_all(self):
"""Test that the tests are running and that's it."""
self.assertTrue(True)


class TestRepositoryLastSerial(TestCase):
"""Tests `last_serial` gets properly set and reset with syncs and remote changes."""

def setUp(self):
self.remote = PythonRemote.objects.create(name="test", url="https://pypi.org")
self.repo = PythonRepository.objects.create(name="test", remote=self.remote, last_serial=DEFAULT_SERIAL)

def test_remote_change(self):
"""Test that `last_serial` gets reset upon remote change."""
self.assertEqual(self.repo.remote.pk, self.remote.pk)
self.assertEqual(self.repo.last_serial, DEFAULT_SERIAL)
self.repo.remote = None
self.repo.save()
self.repo.refresh_from_db()
self.assertEqual(self.repo.last_serial, 0)

def test_remote_update(self):
"""Test that updating a remote will reset `last_serial`."""
self.assertEqual(self.repo.remote.pk, self.remote.pk)
self.assertEqual(self.repo.last_serial, DEFAULT_SERIAL)
# Remote is only updated through update task
new_body = {"url": "https://test.pypi.org"}
update_remote(self.remote.pk, data=new_body, partial=True)
self.repo.refresh_from_db()
self.assertEqual(self.repo.last_serial, 0)

def test_remote_update_no_change(self):
"""Test that changing 'includes' field doesn't reset `last_serial`."""
self.assertEqual(self.repo.remote.pk, self.remote.pk)
self.assertEqual(self.repo.last_serial, DEFAULT_SERIAL)
new_body = {"includes": ["shelf-reader"]}
update_remote(self.remote.pk, data=new_body, partial=True)
self.repo.refresh_from_db()
self.assertEqual(self.repo.last_serial, DEFAULT_SERIAL)

0 comments on commit 500a5e6

Please sign in to comment.