[createrepo] Pull in missing deltarpm patches (upstream fa0520 and bd5577)

Dennis Gilmore ausil at fedoraproject.org
Tue Jan 13 03:11:47 UTC 2015


commit 0feb00af51e4b7dae39beb86f521b4df8016eb49
Author: Dennis Gilmore <dennis at ausil.us>
Date:   Mon Jan 12 21:15:08 2015 -0600

    Pull in missing deltarpm patches (upstream fa0520 and bd5577)

 createrepo-deltarpm.patch |  374 +++++++++++++++++++++++++++++++++++++++++++++
 createrepo.spec           |    8 +-
 2 files changed, 381 insertions(+), 1 deletions(-)
---
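The heart of the change is the new MetaDataGenerator._parallel_deltas: a
manager process feeds packages, largest first, to --delta-workers worker
processes, but only admits a package while the combined payload size of
queued and in-flight jobs stays under --max-concurrent-delta-rpm-size;
each worker signals an Event as it finishes so the manager can top the
queue back up. What follows is a minimal standalone sketch of that
scheduling pattern, not the patch code itself; do_delta, MAX_WORK_SIZE,
NUM_WORKERS and the toy package list are illustrative names, and the cap
must stay at or above the largest single item (the patch enforces the
analogous constraint between --max-concurrent-delta-rpm-size and
--max-delta-rpm-size) or the scheduler would spin.

import multiprocessing

MAX_WORK_SIZE = 50  # cap on combined size of queued + in-flight jobs
NUM_WORKERS = 2

def do_delta(name):
    pass  # stand-in for the expensive deltarpm invocation

def worker(work_queue, active_size, done_event):
    while True:
        item = work_queue.get()
        if item is None:           # sentinel from the manager: all done
            break
        name, size = item
        try:
            do_delta(name)
        finally:
            with active_size.get_lock():
                active_size.value -= size
                done_event.set()   # wake the manager; budget was freed

def main():
    # Largest first, as in the patch, so big jobs are not left for last.
    packages = sorted([('a', 30), ('b', 25), ('c', 10), ('d', 5)],
                      key=lambda p: p[1], reverse=True)
    work_queue = multiprocessing.Queue(1)
    active_size = multiprocessing.Value('L', 0)  # unsigned long, as in the patch
    done_event = multiprocessing.Event()
    workers = []
    for _ in range(NUM_WORKERS):
        w = multiprocessing.Process(target=worker,
                                    args=(work_queue, active_size, done_event))
        workers.append(w)
        w.start()
    while packages:
        queued = []
        for i, (name, size) in enumerate(packages):
            # The manager is the only incrementer, so an unlocked read can
            # only under-estimate the budget in use - the cap still holds.
            if active_size.value + size < MAX_WORK_SIZE:
                with active_size.get_lock():
                    active_size.value += size
                work_queue.put((name, size))  # may block; capacity is 1
                queued.append(i)
        for i in reversed(queued):  # delete back-to-front to keep indexes valid
            del packages[i]
        if not packages:
            break
        with active_size.get_lock():
            if active_size.value == 0:
                continue         # all admitted work already finished; re-scan
            done_event.clear()   # value > 0 guarantees a future set()
        done_event.wait()
    for _ in workers:
        work_queue.put(None)     # one sentinel per worker
    for w in workers:
        w.join()

if __name__ == '__main__':
    main()
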
diff --git a/createrepo-deltarpm.patch b/createrepo-deltarpm.patch
new file mode 100644
index 0000000..45dcb6e
--- /dev/null
+++ b/createrepo-deltarpm.patch
@@ -0,0 +1,374 @@
+diff --git a/createrepo/__init__.py b/createrepo/__init__.py
+index 85f2a3d..517ea04 100644
+--- a/createrepo/__init__.py
++++ b/createrepo/__init__.py
+@@ -28,6 +28,10 @@ import fcntl
+ import subprocess
+ from select import select
+ 
++# To support parallel deltarpms
++import multiprocessing
++import multiprocessing.managers
++
+ from yum import misc, Errors
+ from yum.repoMDObject import RepoMD, RepoData
+ from yum.sqlutils import executeSQL
+@@ -113,7 +117,10 @@ class MetaDataConfig(object):
+         #self.worker_cmd = './worker.py' # helpful when testing
+         self.retain_old_md = 0
+         self.compress_type = 'compat'
+-
++        # Parallel deltas additions
++        self.delta_workers = 1 # number of workers to fork when doing deltarpms
++        # Keep the combined payload size of all in-progress deltarpm creation below this number
++        self.max_concurrent_delta_rpm_size = self.max_delta_rpm_size
+         
+ class SimpleMDCallBack(object):
+     def errorlog(self, thing):
+@@ -718,19 +725,216 @@ class MetaDataGenerator:
+             if err:
+                 raise MDError, "Failed to process %d package(s)." % err
+ 
+-            for pkgfile in pkgfiles:
+-                if self.conf.deltas:
+-                    try:
+-                        po = self.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir)
+-                        self._do_delta_rpm_package(po)
+-                    except MDError, e:
+-                        errorprint(e)
+-                        continue
+-                self.read_pkgs.append(pkgfile)
++            if self.conf.delta_workers == 1:
++                for pkgfile in pkgfiles:
++                    if self.conf.deltas:
++                        try:
++                            po = self.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir)
++                            self._do_delta_rpm_package(po)
++                        except MDError, e:
++                            errorprint(e)
++                            continue
++                    self.read_pkgs.append(pkgfile)
++            else:
++                self._parallel_deltas(pkgfiles, pkgpath, reldir)
+ 
+         save_keptpkgs(None) # append anything left
+         return self.current_pkg
+ 
++    def _parallel_deltas(self, pkgfiles, pkgpath, reldir):
++        class WrappedMDCallBack(object):
++            def __init__(self, log_queue):
++                self.log_queue = log_queue
++            def errorlog(self, thing):
++                self.log_queue.put([ "errorlog", os.getpid(), thing ])
++
++            def log(self, thing):
++                self.log_queue.put([ "log", os.getpid(), thing ])
++
++            def progress(self, item, current, total):
++                # progress messages in a multiprocess context are likely to just be a confusing mess
++                pass
++
++        # Init a few things that we'd rather do in the main process and then
++        # inherit in the children
++        if not hasattr(self, 'tempdir'):
++            self.tempdir = tempfile.mkdtemp()
++        self._get_old_package_dict()
++
++        # queue containing packages that are candidates for processing
++        # now within the memory constraints
++        work_queue = multiprocessing.Queue(1)
++
++        # queue containing callback messages from the workers
++        log_queue = multiprocessing.Queue()
++
++        # Event used to allow the manager, when needed, to block for a completed task in a worker
++        completion_event = multiprocessing.Event()
++
++        # wrapped callback to pass in to workers
++        callback_wrap = WrappedMDCallBack(log_queue)
++
++        # list containing the completed packages
++        # accessed in children via a Manager and proxy as each child proc
++        # will be appending as it finishes
++        manager = multiprocessing.Manager()
++        read_pkgs_proxy = manager.list()
++
++        # lists used by the package size reading workers
++        pkgfiles_proxy = manager.list(pkgfiles)
++        pkgfiles_withsize_proxy = manager.list()
++
++        # process-safe value - total size of RPM payloads being deltaed
++        # The lock for entry into this also functions as our critical section
++        # elsewhere, as changes in the "in-flight" size of deltas is the key
++        # decision point in our work queue
++        # 'L' is unsigned long
++        active_work_size = multiprocessing.Value('L',0)
++
++        # Our candidate list is the packages sorted from largest to smallest
++        # Do this with workers as well because, parallel is good
++        # Seriously though, this is also CPU-bound
++        self.callback.log("Reading package sizes in preparation for deltarpm creation")
++
++        def size_reader_entry(pkgfiles_proxy, pkgpath, reldir, pkgfiles_withsize_proxy, repo_obj):
++            while True:
++                try:
++                    pkgfile = pkgfiles_proxy.pop()
++                    po = repo_obj.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir)
++                    pkgfiles_withsize_proxy.append([ pkgfile, po.size ])
++                except IndexError:
++                    break
++
++        sort_workers = [ ]
++        for i in range(0,self.conf.delta_workers):
++            sw = multiprocessing.Process(target = size_reader_entry, args = (pkgfiles_proxy, pkgpath, reldir, pkgfiles_withsize_proxy, self))
++            sort_workers.append(sw)
++            sw.start()
++
++        for worker in sort_workers:
++            worker.join()
++
++        self.callback.log("Sorting package files by size")
++        sorted_packages = sorted(pkgfiles_withsize_proxy, key=lambda package: package[1], reverse=True)
++
++        def worker_entry(work_queue, log_queue, read_pkgs_proxy, repo_obj, callback_wrap, active_work_size, completion_event, pkgpath, reldir):
++            # We are now a new process - replace the callback with the wrapper that pushes log messages
++            # to a queue for processing in the main process
++            repo_obj.callback = callback_wrap
++            while True:
++                try:
++                    pkg = None
++                    pkg = work_queue.get()
++                    if not pkg:
++                        # The manager feeds each worker a None to indicate we are finished
++                        # this allows us to use a blocking get without fear - I think
++                        break
++                    po = repo_obj.read_in_package(pkg[0], pkgpath=pkgpath, reldir=reldir)
++                    repo_obj._do_delta_rpm_package(po)
++                except Exception, e:
++                    callback_wrap.errorlog(e)
++                    continue
++                finally:
++                    if pkg:
++                        with active_work_size.get_lock():
++                            active_work_size.value -= pkg[1]
++                            completion_event.set()
++                        read_pkgs_proxy.append(pkg)
++
++        def manager_entry(packages, active_work_size, log_queue, completion_event, work_queue, callback_wrap, read_pkgs_proxy, repo_obj, pkgpath, reldir):
++            max_work_size = repo_obj.conf.max_concurrent_delta_rpm_size
++            num_workers = repo_obj.conf.delta_workers
++            workers = [ ]
++            callback_wrap.log("Starting %d workers to process deltarpms - max total work size (%d) bytes" % (num_workers, max_work_size))
++            for i in range(0,repo_obj.conf.delta_workers):
++                wp = multiprocessing.Process(target = worker_entry, args = (work_queue, log_queue, read_pkgs_proxy, repo_obj, callback_wrap, active_work_size, completion_event, pkgpath, reldir))
++                workers.append(wp)
++                wp.start()
++
++            pending_packages = 0
++            while len(packages) > 0:
++                # Look through the package list and add things that fit under the max size limit
++                # until we reach the end of the list
++
++                # Don't read shared state for every package - it is an expensive operation
++                work_size_snap = active_work_size.value
++                #log_queue.put("Entered main loop with package list of length %d and size snap %d" % (len(packages), work_size_snap))
++                consumed = [ ]
++                for i in range(0,len(packages)):
++                    package = packages[i]
++                    if package[1] + work_size_snap < max_work_size:
++                        with active_work_size.get_lock():
++                            # As long as we have the lock we may as well refresh our view of the actual size
++                            active_work_size.value += package[1]
++                            #Turn on profiling if you want to convince yourself that this really does keep the size sane
++                            if self.conf.profile:
++                                callback_wrap.log("Adding package (%s) of size %d to deltarpm work queue" % (package[0], package[1]))
++                                callback_wrap.log("Current TOTAL in-flight work size: %d" % (active_work_size.value))
++                                callback_wrap.log("Packages remaining to process: %d" % (len(packages)-len(consumed)-1))
++                            work_size_snap = active_work_size.value
++                            # Note that we block here if the queue is full
++                            pending_packages = work_queue.qsize() + 1
++                            consumed.append(i)
++                        # This can block - do it without the lock
++                        work_queue.put(package)
++                # Now prune the added items from the list, going backwards to ensure that we don't
++                # shift the index and delete the wrong thing
++                for i in reversed(consumed):
++                    del packages[i]
++                if len(packages) == 0:
++                    break
++
++                with active_work_size.get_lock():
++                    work_queue_size = work_queue.qsize()
++                    if pending_packages > work_queue_size:
++                        # Some work was started since we last touched the queue - try to add more
++                        # Note that this guarantees there is at least one free slot in the work_queue
++                        # This should also prevent us from constantly spinning in the package loop when
++                        # we have space in the queue but not enough active_work_size to allow us to add any
++                        # available package
++                        pending_packages = work_queue_size
++                        continue
++                    else:
++                        completion_event.clear()
++
++                # We either have too many items on the work_queue or too much total work size
++                # Wait for a worker to finish and then try again
++                completion_event.wait()
++
++            # We are done - tell the workers to stop
++            for worker in workers:
++                work_queue.put(None)
++
++            for worker in workers:
++                worker.join()
++
++            # Now signal to the main thread that we are done adding work
++            log_queue.put(None)
++
++        manager = multiprocessing.Process(target = manager_entry, args = (sorted_packages, active_work_size, log_queue, completion_event, work_queue, callback_wrap, read_pkgs_proxy, self, pkgpath, reldir))
++        manager.start()
++
++        def log_digest(callback, log_message):
++            if log_message[0] == "errorlog":
++                callback.errorlog("Worker PID(%d) - %s" % (log_message[1], log_message[2]))
++            elif log_message[0] == "log":
++                callback.log("Worker PID(%d) - %s" % (log_message[1], log_message[2]))
++            else:
++                callback.errorlog("Malformed error in queue (%s)" % (str(log_message)))
++
++        # Process log messages until we get the finished signal "None"
++        while True:
++            log_message = log_queue.get()
++            if log_message is None:
++                break
++            log_digest(self.callback, log_message)
++
++        # now empty our proxy list
++        for pkg in read_pkgs_proxy:
++            self.read_pkgs.append(pkg)
++
++        # TODO: we may be able to explicitly stop the Manager at this point
++
+ 
+     def closeMetadataDocs(self):
+         # save them up to the tmp locations:
+@@ -849,19 +1053,22 @@ class MetaDataGenerator:
+         # appending the output. for each of the keys in the dict, return
+         # the tag for the target + each of the drpm infos + closure for the target
+         # tag
+-        targets = {}
+         results = []
+-        for drpm_fn in self.getFileList(self.conf.deltadir, '.drpm'):
+-            drpm_rel_fn = os.path.normpath(self.conf.delta_relative +
+-                                           '/' + drpm_fn) # this is annoying
+-            drpm_po = yumbased.CreateRepoPackage(self.ts,
+-                 self.conf.deltadir + '/' + drpm_fn, sumtype=self.conf.sumtype)
+-
+-            drpm = deltarpms.DeltaRPMPackage(drpm_po, self.conf.outputdir,
+-                                             drpm_rel_fn)
+-            if not targets.has_key(drpm_po.pkgtup):
+-                targets[drpm_po.pkgtup] = []
+-            targets[drpm_po.pkgtup].append(drpm.xml_dump_metadata())
++        if self.conf.delta_workers == 1:
++            targets = {}
++            for drpm_fn in self.getFileList(self.conf.deltadir, '.drpm'):
++                drpm_rel_fn = os.path.normpath(self.conf.delta_relative +
++                                               '/' + drpm_fn) # this is annoying
++                drpm_po = yumbased.CreateRepoPackage(self.ts,
++                     self.conf.deltadir + '/' + drpm_fn, sumtype=self.conf.sumtype)
++
++                drpm = deltarpms.DeltaRPMPackage(drpm_po, self.conf.outputdir,
++                                                 drpm_rel_fn)
++                if not targets.has_key(drpm_po.pkgtup):
++                    targets[drpm_po.pkgtup] = []
++                targets[drpm_po.pkgtup].append(drpm.xml_dump_metadata())
++        else:
++            targets = self._parallel_generate_delta_xml()
+ 
+         for (n, a, e, v, r) in targets.keys():
+             results.append("""  <newpackage name="%s" epoch="%s" version="%s" release="%s" arch="%s">\n""" % (
+@@ -874,6 +1081,52 @@ class MetaDataGenerator:
+ 
+         return ' '.join(results)
+ 
++    def _parallel_generate_delta_xml(self):
++        drpm_fns = [ ]
++        for drpm_fn in self.getFileList(self.conf.deltadir, '.drpm'):
++            drpm_fns.append(drpm_fn)
++
++        manager = multiprocessing.Manager()
++        drpm_fns_proxy = manager.list(drpm_fns)
++        targets_proxy = manager.dict()
++        targets_lock = manager.RLock()
++
++        def drpm_xml_entry(drpm_fns_proxy, targets_proxy, targets_lock, repo_obj):
++            while True:
++                try:
++                    drpm_fn = drpm_fns_proxy.pop()
++                    drpm_rel_fn = os.path.normpath(repo_obj.conf.delta_relative +
++                                                   '/' + drpm_fn) # this is annoying
++                    drpm_po = yumbased.CreateRepoPackage(repo_obj.ts,
++                         repo_obj.conf.deltadir + '/' + drpm_fn, sumtype=repo_obj.conf.sumtype)
++
++                    drpm = deltarpms.DeltaRPMPackage(drpm_po, repo_obj.conf.outputdir,
++                                                     drpm_rel_fn)
++
++                    with targets_lock:
++                        d_element = targets_proxy.get(drpm_po.pkgtup, [ ])
++                        d_element.append(drpm.xml_dump_metadata())
++                        # managed dict requires that we re-assign modified list rather than modify in place
++                        targets_proxy[drpm_po.pkgtup] = d_element
++                except IndexError:
++                    break
++
++        xml_workers = [ ]
++        for i in range(0,self.conf.delta_workers):
++            xw = multiprocessing.Process(target = drpm_xml_entry, args = (drpm_fns_proxy, targets_proxy, targets_lock, self))
++            xml_workers.append(xw)
++            xw.start()
++
++        for worker in xml_workers:
++            worker.join()
++
++        # I'm doing a copy in this way as I believe that prevents references to the manager from lingering
++        # TODO: Verify?
++        targets_copy = { }
++        for key in targets_proxy.keys():
++            targets_copy[key] = targets_proxy[key]
++        return targets_copy
++
+     def _createRepoDataObject(self, mdfile, mdtype, compress=True, 
+                               compress_type=None, attribs={}):
+         """return random metadata as RepoData object to be  added to RepoMD
+diff --git a/genpkgmetadata.py b/genpkgmetadata.py
+index 35e7fc9..a684038 100755
+--- a/genpkgmetadata.py
++++ b/genpkgmetadata.py
+@@ -128,9 +128,15 @@ def parse_args(args, conf):
+     parser.add_option("--max-delta-rpm-size", default=100000000,
+         dest='max_delta_rpm_size', type='int',
+         help="max size of an rpm that to run deltarpm against (in bytes)")
++    parser.add_option("--max-concurrent-delta-rpm-size", default=100000000,
++        dest='max_concurrent_delta_rpm_size', type='int',
++        help="max total payload size of concurrent deltarpm runs (in bytes)")
+     parser.add_option("--workers", default=def_workers,
+         dest='workers', type='int',
+         help="number of workers to spawn to read rpms")
++    parser.add_option("--delta-workers", default=1,
++        dest='delta_workers', type='int',
++        help="number of workers to spawn to create delta rpms")
+     parser.add_option("--xz", default=False,
+         action="store_true",
+         help=SUPPRESS_HELP)
+@@ -155,6 +161,12 @@ def parse_args(args, conf):
+     if opts.workers >= 128:
+         errorprint(_('Warning: More than 128 workers is a lot. Limiting.'))
+         opts.workers = 128
++    if opts.delta_workers > opts.workers:
++        errorprint(_('Warning: Requested more delta workers than workers. This is insane. Limiting.'))
++        opts.delta_workers = opts.workers
++    if opts.max_concurrent_delta_rpm_size < opts.max_delta_rpm_size:
++        errorprint(_('Warning: max_concurrent_delta_rpm_size < max_delta_rpm_size - this will deadlock. Setting them to the same value.'))
++        opts.max_concurrent_delta_rpm_size = opts.max_delta_rpm_size
+     if opts.sumtype == 'sha1':
+         errorprint(_('Warning: It is more compatible to use sha instead of sha1'))
+ 
diff --git a/createrepo.spec b/createrepo.spec
index e3f7632..00ef2bc 100644
--- a/createrepo.spec
+++ b/createrepo.spec
@@ -15,12 +15,14 @@ BuildRequires: bash-completion
 Summary: Creates a common metadata repository
 Name: createrepo
 Version: 0.10.3
-Release: 3%{?dist}
+Release: 4%{?dist}
 License: GPLv2
 Group: System Environment/Base
 Source: http://createrepo.baseurl.org/download/%{name}-%{version}.tar.gz
 Patch1: ten-changelog-limit.patch
 Patch2: createrepo-HEAD.patch
+Patch3: createrepo-deltarpm.patch
+
 URL: http://createrepo.baseurl.org/
 BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
 BuildArchitectures: noarch
@@ -36,6 +38,7 @@ packages.
 %setup -q
 %patch1 -p0
 %patch2 -p1
+%patch3 -p1
 
 %build
 
@@ -59,6 +62,9 @@ rm -rf $RPM_BUILD_ROOT
 %{python_sitelib}/createrepo
 
 %changelog
+* Mon Jan 12 2015 Ian McLeod <imcleod at fedoraproject.org> - 0.10.3-4
+- Pull in missing deltarpm patches (upstream fa0520 and bd5577)
+
 * Sat Jun 07 2014 Fedora Release Engineering <rel-eng at lists.fedoraproject.org> - 0.10.3-3
 - Rebuilt for https://fedoraproject.org/wiki/Fedora_21_Mass_Rebuild
 
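A note on the logging plumbing above: the worker processes cannot call
back into the parent's callback object, so the patch swaps in
WrappedMDCallBack, which tags each message with the worker's pid and
pushes it onto a shared queue; the parent drains that queue until it
sees the None sentinel that the manager posts once all workers have
joined. A standalone sketch of the same pattern, with illustrative
names:

import os
import multiprocessing

def worker(log_queue):
    # Children only ever write tagged records onto the queue.
    log_queue.put(["log", os.getpid(), "making a deltarpm"])

def main():
    log_queue = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(log_queue,))
             for _ in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    log_queue.put(None)  # sentinel: no more messages will arrive
    while True:
        msg = log_queue.get()
        if msg is None:
            break
        print("Worker PID(%d) - %s" % (msg[1], msg[2]))

if __name__ == '__main__':
    main()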
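One subtlety worth highlighting from _parallel_generate_delta_xml: a
list stored in a multiprocessing.Manager dict is handed back by value,
so mutating it in place is silently lost and the modified list has to
be re-assigned through the proxy - exactly what the in-patch comment
warns about. A quick standalone demonstration:

import multiprocessing

def main():
    manager = multiprocessing.Manager()
    d = manager.dict()
    d['k'] = []

    d['k'].append(1)  # appends to a local copy; the managed value is untouched
    print(d['k'])     # [] - the change was lost

    v = d['k']
    v.append(1)
    d['k'] = v        # re-assignment pushes the change back to the manager
    print(d['k'])     # [1]

if __name__ == '__main__':
    main()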
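With the patch applied, genpkgmetadata.py grows two options:
--delta-workers (default 1, capped at --workers) and
--max-concurrent-delta-rpm-size (default 100000000, raised to at least
--max-delta-rpm-size so the size-capped queue cannot deadlock). A
plausible invocation, with an illustrative repo path, might be:

createrepo --deltas --delta-workers 4 \
    --max-delta-rpm-size 100000000 \
    --max-concurrent-delta-rpm-size 400000000 \
    /path/to/repo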

