[createrepo] Pull in missing deltarpm patches (upstream fa0520 and bd5577)
Dennis Gilmore
ausil at fedoraproject.org
Tue Jan 13 03:11:47 UTC 2015
commit 0feb00af51e4b7dae39beb86f521b4df8016eb49
Author: Dennis Gilmore <dennis at ausil.us>
Date: Mon Jan 12 21:15:08 2015 -0600
Pull in missing deltarpm patches (upstream fa0520 and bd5577)
createrepo-deltarpm.patch | 374 +++++++++++++++++++++++++++++++++++++++++++++
createrepo.spec | 8 +-
2 files changed, 381 insertions(+), 1 deletions(-)
---
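In short, the pulled-in upstream patches add parallel deltarpm creation. Two tunables are threaded through MetaDataConfig and the genpkgmetadata.py command line: delta_workers (--delta-workers), the number of processes forked to build deltarpms, and max_concurrent_delta_rpm_size (--max-concurrent-delta-rpm-size), a cap on the combined payload size of all deltas being built at once. A minimal usage sketch against the createrepo Python API follows; the repository path and byte values are illustrative, and the directory attribute plus the doPkgMetadata/doRepoMetadata/doFinalMove calls are the usual createrepo flow rather than anything introduced by this patch.

    import createrepo

    conf = createrepo.MetaDataConfig()
    conf.directory = '/srv/repos/f21/x86_64'        # illustrative repo path
    conf.deltas = True                              # existing switch: generate drpms at all
    conf.max_delta_rpm_size = 100000000             # existing per-rpm size cut-off (bytes)
    conf.delta_workers = 4                          # new: worker processes for deltarpm creation
    conf.max_concurrent_delta_rpm_size = 400000000  # new: cap on combined in-flight payload (bytes)

    mdgen = createrepo.MetaDataGenerator(conf)
    mdgen.doPkgMetadata()
    mdgen.doRepoMetadata()
    mdgen.doFinalMove()

    # Roughly the same thing from the command line:
    #   createrepo --deltas --delta-workers 4 \
    #       --max-concurrent-delta-rpm-size 400000000 /srv/repos/f21/x86_64
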
diff --git a/createrepo-deltarpm.patch b/createrepo-deltarpm.patch
new file mode 100644
index 0000000..45dcb6e
--- /dev/null
+++ b/createrepo-deltarpm.patch
@@ -0,0 +1,374 @@
+diff --git a/createrepo/__init__.py b/createrepo/__init__.py
+index 85f2a3d..517ea04 100644
+--- a/createrepo/__init__.py
++++ b/createrepo/__init__.py
+@@ -28,6 +28,10 @@ import fcntl
+ import subprocess
+ from select import select
+
++# To support parallel deltarpms
++import multiprocessing
++import multiprocessing.managers
++
+ from yum import misc, Errors
+ from yum.repoMDObject import RepoMD, RepoData
+ from yum.sqlutils import executeSQL
+@@ -113,7 +117,10 @@ class MetaDataConfig(object):
+ #self.worker_cmd = './worker.py' # helpful when testing
+ self.retain_old_md = 0
+ self.compress_type = 'compat'
+-
++ # Parallel deltas additions
++ self.delta_workers = 1 # number of workers to fork when doing deltarpms
++ # Keep the combined payload size of all in-progress deltarpm creation below this number
++ self.max_concurrent_delta_rpm_size = self.max_delta_rpm_size
+
+ class SimpleMDCallBack(object):
+ def errorlog(self, thing):
+@@ -718,19 +725,216 @@ class MetaDataGenerator:
+ if err:
+ raise MDError, "Failed to process %d package(s)." % err
+
+- for pkgfile in pkgfiles:
+- if self.conf.deltas:
+- try:
+- po = self.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir)
+- self._do_delta_rpm_package(po)
+- except MDError, e:
+- errorprint(e)
+- continue
+- self.read_pkgs.append(pkgfile)
++ if self.conf.delta_workers == 1:
++ for pkgfile in pkgfiles:
++ if self.conf.deltas:
++ try:
++ po = self.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir)
++ self._do_delta_rpm_package(po)
++ except MDError, e:
++ errorprint(e)
++ continue
++ self.read_pkgs.append(pkgfile)
++ else:
++ self._parallel_deltas(pkgfiles, pkgpath, reldir)
+
+ save_keptpkgs(None) # append anything left
+ return self.current_pkg
+
++ def _parallel_deltas(self, pkgfiles, pkgpath, reldir):
++ class WrappedMDCallBack(object):
++ def __init__(self, log_queue):
++ self.log_queue = log_queue
++ def errorlog(self, thing):
++ self.log_queue.put([ "errorlog", os.getpid(), thing ])
++
++ def log(self, thing):
++ self.log_queue.put([ "log", os.getpid(), thing ])
++
++ def progress(self, item, current, total):
++ # progress messages in a multiprocess context are likely to just be a confusing mess
++ pass
++
++ # Init a few things that we'd rather do in the main process and then
++ # inherit in the children
++ if not hasattr(self, 'tempdir'):
++ self.tempdir = tempfile.mkdtemp()
++ self._get_old_package_dict()
++
++ # queue containing packages that are candidates for processing
++ # now within the memory constraints
++ work_queue = multiprocessing.Queue(1)
++
++ # queue containing callback messages from the workers
++ log_queue = multiprocessing.Queue()
++
++ # Event used to allow the manager, when needed, to block for a completed task in a worker
++ completion_event = multiprocessing.Event()
++
++ # wrapped callback to pass in to workers
++ callback_wrap = WrappedMDCallBack(log_queue)
++
++ # list containing the completed packages
++ # accessed in children via a Manager and proxy as each child proc
++ # will be appending as it finishes
++ manager = multiprocessing.Manager()
++ read_pkgs_proxy = manager.list()
++
++ # lists used by the package size reading workers
++ pkgfiles_proxy = manager.list(pkgfiles)
++ pkgfiles_withsize_proxy = manager.list()
++
++ # process-safe value - total size of RPM payloads being deltaed
++ # The lock for entry into this also functions as our critical section
++ # elsewhere, as changes in the "in-flight" size of deltas is the key
++ # decision point in our work queue
++ # 'L' is unsigned long
++ active_work_size = multiprocessing.Value('L',0)
++
++ # Our candidate list is the packages sorted from largest to smallest
++ # Do this with workers as well because, parallel is good
++ # Seriously though, this is also CPU-bound
++ self.callback.log("Reading package sizes in preparation for deltarpm creation")
++
++ def size_reader_entry(pkgfiles_proxy, pkgpath, reldir, pkgfiles_withsize_proxy, repo_obj):
++ while True:
++ try:
++ pkgfile = pkgfiles_proxy.pop()
++ po = repo_obj.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir)
++ pkgfiles_withsize_proxy.append([ pkgfile, po.size ])
++ except IndexError:
++ break
++
++ sort_workers = [ ]
++ for i in range(0,self.conf.delta_workers):
++ sw = multiprocessing.Process(target = size_reader_entry, args = (pkgfiles_proxy, pkgpath, reldir, pkgfiles_withsize_proxy, self))
++ sort_workers.append(sw)
++ sw.start()
++
++ for worker in sort_workers:
++ worker.join()
++
++ self.callback.log("Sorting package files by size")
++ sorted_packages = sorted(pkgfiles_withsize_proxy, key=lambda package: package[1], reverse=True)
++
++ def worker_entry(work_queue, log_queue, read_pkgs_proxy, repo_obj, callback_wrap, active_work_size, completion_event, pkgpath, reldir):
++ # We are now a new process - replace the callback with the wrapper that pushes log messages
++ # to a queue for processing in the main process
++ repo_obj.callback = callback_wrap
++ while True:
++ try:
++ pkg = None
++ pkg = work_queue.get()
++ if not pkg:
++ # The manager feeds each worker a None to indicate we are finished
++ # this allows us to use a blocking get without fear - I think
++ break
++ po = repo_obj.read_in_package(pkg[0], pkgpath=pkgpath, reldir=reldir)
++ repo_obj._do_delta_rpm_package(po)
++ except Exception, e:
++ callback_wrap.errorlog(e)
++ continue
++ finally:
++ if pkg:
++ with active_work_size.get_lock():
++ active_work_size.value -= pkg[1]
++ completion_event.set()
++ read_pkgs_proxy.append(pkg)
++
++ def manager_entry(packages, active_work_size, log_queue, completion_event, work_queue, callback_wrap, read_pkgs_proxy, repo_obj, pkgpath, reldir):
++ max_work_size = repo_obj.conf.max_concurrent_delta_rpm_size
++ num_workers = repo_obj.conf.delta_workers
++ workers = [ ]
++ callback_wrap.log("Starting %d workers to process deltarpms - max total work size (%d) bytes" % (num_workers, max_work_size))
++ for i in range(0,repo_obj.conf.delta_workers):
++ wp = multiprocessing.Process(target = worker_entry, args = (work_queue, log_queue, read_pkgs_proxy, repo_obj, callback_wrap, active_work_size, completion_event, pkgpath, reldir))
++ workers.append(wp)
++ wp.start()
++
++ pending_packages = 0
++ while len(packages) > 0:
++ # Look through the package list and add things that fit under the max size limit
++ # until we reach the end of the list
++
++ # Don't read shared state for every package - it is an expensive operation
++ work_size_snap = active_work_size.value
++ #log_queue.put("Entered main loop with package list of length %d and size snap %d" % (len(packages), work_size_snap))
++ consumed = [ ]
++ for i in range(0,len(packages)):
++ package = packages[i]
++ if package[1] + work_size_snap < max_work_size:
++ with active_work_size.get_lock():
++ # As long as we have the lock we may as well refresh our view of the actual size
++ active_work_size.value += package[1]
++ #Turn on profiling if you want to convince yourself that this really does keep the size sane
++ if self.conf.profile:
++ callback_wrap.log("Adding package (%s) of size %d to deltarpm work queue" % (package[0], package[1]))
++ callback_wrap.log("Current TOTAL in-flight work size: %d" % (active_work_size.value))
++ callback_wrap.log("Packages remaining to process: %d" % (len(packages)-len(consumed)-1))
++ work_size_snap = active_work_size.value
++ # Note that we block here if the queue is full
++ pending_packages = work_queue.qsize() + 1
++ consumed.append(i)
++ # This can block - do it without the lock
++ work_queue.put(package)
++ # Now prune the added items from the list, going backwards to ensure that we don't
++ # shift the index and delete the wrong thing
++ for i in reversed(consumed):
++ del packages[i]
++ if len(packages) == 0:
++ break
++
++ with active_work_size.get_lock():
++ work_queue_size = work_queue.qsize()
++ if pending_packages > work_queue_size:
++ # Some work was started since we last touched the queue - try to add more
++ # Note that this guarantees there is at least one free slot in the work_queue
++ # This should also prevent us from constantly spinning in the package loop when
++ # we have space in the queue but not enough active_work_size to allow us to add any
++ # available package
++ pending_packages = work_queue_size
++ continue
++ else:
++ completion_event.clear()
++
++ # We either have too many items on the work_queue or too much total work size
++ # Wait for a worker to finish and then try again
++ completion_event.wait()
++
++ # We are done - tell the workers to stop
++ for worker in workers:
++ work_queue.put(None)
++
++ for worker in workers:
++ worker.join()
++
++ # Now signal to the main thread that we are done adding work
++ log_queue.put(None)
++
++ manager = multiprocessing.Process(target = manager_entry, args = (sorted_packages, active_work_size, log_queue, completion_event, work_queue, callback_wrap, read_pkgs_proxy, self, pkgpath, reldir))
++ manager.start()
++
++ def log_digest(callback, log_message):
++ if log_message[0] == "errorlog":
++ callback.errorlog("Worker PID(%d) - %s" % (log_message[1], log_message[2]))
++ elif log_message[0] == "log":
++ callback.log("Worker PID(%d) - %s" % (log_message[1], log_message[2]))
++ else:
++ callback.errorlog("Malformed error in queue (%s)" % (str(log_message)))
++
++ # Process log messages until we get the finished signal "None"
++ while True:
++ log_message = log_queue.get()
++ if log_message is None:
++ break
++ log_digest(self.callback, log_message)
++
++ # now empty our proxy list
++ for pkg in read_pkgs_proxy:
++ self.read_pkgs.append(pkg)
++
++ # TODO: we may be able to explicitly stop the Manager at this point
++
+
+ def closeMetadataDocs(self):
+ # save them up to the tmp locations:
+@@ -849,19 +1053,22 @@ class MetaDataGenerator:
+ # appending the output. for each of the keys in the dict, return
+ # the tag for the target + each of the drpm infos + closure for the target
+ # tag
+- targets = {}
+ results = []
+- for drpm_fn in self.getFileList(self.conf.deltadir, '.drpm'):
+- drpm_rel_fn = os.path.normpath(self.conf.delta_relative +
+- '/' + drpm_fn) # this is annoying
+- drpm_po = yumbased.CreateRepoPackage(self.ts,
+- self.conf.deltadir + '/' + drpm_fn, sumtype=self.conf.sumtype)
+-
+- drpm = deltarpms.DeltaRPMPackage(drpm_po, self.conf.outputdir,
+- drpm_rel_fn)
+- if not targets.has_key(drpm_po.pkgtup):
+- targets[drpm_po.pkgtup] = []
+- targets[drpm_po.pkgtup].append(drpm.xml_dump_metadata())
++ if self.conf.delta_workers == 1:
++ targets = {}
++ for drpm_fn in self.getFileList(self.conf.deltadir, '.drpm'):
++ drpm_rel_fn = os.path.normpath(self.conf.delta_relative +
++ '/' + drpm_fn) # this is annoying
++ drpm_po = yumbased.CreateRepoPackage(self.ts,
++ self.conf.deltadir + '/' + drpm_fn, sumtype=self.conf.sumtype)
++
++ drpm = deltarpms.DeltaRPMPackage(drpm_po, self.conf.outputdir,
++ drpm_rel_fn)
++ if not targets.has_key(drpm_po.pkgtup):
++ targets[drpm_po.pkgtup] = []
++ targets[drpm_po.pkgtup].append(drpm.xml_dump_metadata())
++ else:
++ targets = self._parallel_generate_delta_xml()
+
+ for (n, a, e, v, r) in targets.keys():
+ results.append(""" <newpackage name="%s" epoch="%s" version="%s" release="%s" arch="%s">\n""" % (
+@@ -874,6 +1081,52 @@ class MetaDataGenerator:
+
+ return ' '.join(results)
+
++ def _parallel_generate_delta_xml(self):
++ drpm_fns = [ ]
++ for drpm_fn in self.getFileList(self.conf.deltadir, '.drpm'):
++ drpm_fns.append(drpm_fn)
++
++ manager = multiprocessing.Manager()
++ drpm_fns_proxy = manager.list(drpm_fns)
++ targets_proxy = manager.dict()
++ targets_lock = manager.RLock()
++
++ def drpm_xml_entry(drpm_fns_proxy, targets_proxy, targets_lock, repo_obj):
++ while True:
++ try:
++ drpm_fn = drpm_fns_proxy.pop()
++ drpm_rel_fn = os.path.normpath(repo_obj.conf.delta_relative +
++ '/' + drpm_fn) # this is annoying
++ drpm_po = yumbased.CreateRepoPackage(repo_obj.ts,
++ repo_obj.conf.deltadir + '/' + drpm_fn, sumtype=repo_obj.conf.sumtype)
++
++ drpm = deltarpms.DeltaRPMPackage(drpm_po, repo_obj.conf.outputdir,
++ drpm_rel_fn)
++
++ with targets_lock:
++ d_element = targets_proxy.get(drpm_po.pkgtup, [ ])
++ d_element.append(drpm.xml_dump_metadata())
++ # managed dict requires that we re-assign modified list rather than modify in place
++ targets_proxy[drpm_po.pkgtup] = d_element
++ except IndexError:
++ break
++
++ xml_workers = [ ]
++ for i in range(0,self.conf.delta_workers):
++ xw = multiprocessing.Process(target = drpm_xml_entry, args = (drpm_fns_proxy, targets_proxy, targets_lock, self))
++ xml_workers.append(xw)
++ xw.start()
++
++ for worker in xml_workers:
++ worker.join()
++
++ # I'm doing a copy in this way as I believe that prevents references to the manager from lingering
++ # TODO: Verify?
++ targets_copy = { }
++ for key in targets_proxy.keys():
++ targets_copy[key] = targets_proxy[key]
++ return targets_copy
++
+ def _createRepoDataObject(self, mdfile, mdtype, compress=True,
+ compress_type=None, attribs={}):
+ """return random metadata as RepoData object to be added to RepoMD
+diff --git a/genpkgmetadata.py b/genpkgmetadata.py
+index 35e7fc9..a684038 100755
+--- a/genpkgmetadata.py
++++ b/genpkgmetadata.py
+@@ -128,9 +128,15 @@ def parse_args(args, conf):
+ parser.add_option("--max-delta-rpm-size", default=100000000,
+ dest='max_delta_rpm_size', type='int',
+ help="max size of an rpm that to run deltarpm against (in bytes)")
++ parser.add_option("--max-concurrent-delta-rpm-size", default=100000000,
++ dest='max_concurrent_delta_rpm_size', type='int',
++ help="max total payload size of concurrent deltarpm runs (in bytes)")
+ parser.add_option("--workers", default=def_workers,
+ dest='workers', type='int',
+ help="number of workers to spawn to read rpms")
++ parser.add_option("--delta-workers", default=1,
++ dest='delta_workers', type='int',
++ help="number of workers to spawn to create delta rpms")
+ parser.add_option("--xz", default=False,
+ action="store_true",
+ help=SUPPRESS_HELP)
+@@ -155,6 +161,12 @@ def parse_args(args, conf):
+ if opts.workers >= 128:
+ errorprint(_('Warning: More than 128 workers is a lot. Limiting.'))
+ opts.workers = 128
++ if opts.delta_workers > opts.workers:
++ errorprint(_('Warning: Requested more delta workers than workers. This is insane. Limiting.'))
++ opts.delta_workers = opts.workers
++ if opts.max_concurrent_delta_rpm_size < opts.max_delta_rpm_size:
++ errorprint(_('Warning: max_concurrent_delta_rpm_size < max_delta_rpm_size - this will deadlock. Setting them to the same value.'))
++ opts.max_concurrent_delta_rpm_size = opts.max_delta_rpm_size
+ if opts.sumtype == 'sha1':
+ errorprint(_('Warning: It is more compatible to use sha instead of sha1'))
+
diff --git a/createrepo.spec b/createrepo.spec
index e3f7632..00ef2bc 100644
--- a/createrepo.spec
+++ b/createrepo.spec
@@ -15,12 +15,14 @@ BuildRequires: bash-completion
Summary: Creates a common metadata repository
Name: createrepo
Version: 0.10.3
-Release: 3%{?dist}
+Release: 4%{?dist}
License: GPLv2
Group: System Environment/Base
Source: http://createrepo.baseurl.org/download/%{name}-%{version}.tar.gz
Patch1: ten-changelog-limit.patch
Patch2: createrepo-HEAD.patch
+Patch3: createrepo-deltarpm.patch
+
URL: http://createrepo.baseurl.org/
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
BuildArchitectures: noarch
@@ -36,6 +38,7 @@ packages.
%setup -q
%patch1 -p0
%patch2 -p1
+%patch3 -p1
%build
@@ -59,6 +62,9 @@ rm -rf $RPM_BUILD_ROOT
%{python_sitelib}/createrepo
%changelog
+* Mon Jan 12 2015 Ian McLeod <imcleod at fedoraproject.org> - 0.10.3-4
+- Pull in missing deltarpm patches (upstream fa0520 and bd5577)
+
* Sat Jun 07 2014 Fedora Release Engineering <rel-eng at lists.fedoraproject.org> - 0.10.3-3
- Rebuilt for https://fedoraproject.org/wiki/Fedora_21_Mass_Rebuild
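
For readers tracing the _parallel_deltas() manager/worker logic in the patch above, here is a stripped-down, self-contained sketch of the same size-bounded dispatch pattern (names and sizes are illustrative; the real code additionally reads package sizes with worker processes and routes worker logging back through a queue): a bounded Queue feeds the workers, a shared Value tracks the combined payload size in flight, and the manager blocks on an Event until a finished worker frees enough budget.

    import multiprocessing

    def worker(work_queue, active_size, done_event):
        while True:
            item = work_queue.get()
            if item is None:                   # manager sends one None per worker when done
                break
            name, size = item
            # ... run deltarpm against 'name' here ...
            with active_size.get_lock():
                active_size.value -= size      # release our share of the size budget
            done_event.set()                   # wake the manager so it can queue more work

    def run(packages, num_workers, max_total_size):
        # packages: list of (name, payload_size); every size is assumed <= max_total_size,
        # which is what the max_concurrent_delta_rpm_size >= max_delta_rpm_size check enforces
        work_queue = multiprocessing.Queue(1)
        active_size = multiprocessing.Value('L', 0)     # 'L' = unsigned long, bytes in flight
        done_event = multiprocessing.Event()
        workers = [multiprocessing.Process(target=worker,
                                           args=(work_queue, active_size, done_event))
                   for _ in range(num_workers)]
        for w in workers:
            w.start()

        pending = sorted(packages, key=lambda p: p[1], reverse=True)   # largest first
        while pending:
            done_event.clear()                 # clear before scanning so a completion is never missed
            queued = False
            for i, (name, size) in enumerate(pending):
                with active_size.get_lock():
                    fits = active_size.value + size <= max_total_size
                    if fits:
                        active_size.value += size
                if fits:
                    work_queue.put(pending.pop(i))   # may block until a worker takes it
                    queued = True
                    break
            if not queued:
                done_event.wait()              # nothing fits: wait for a worker to free budget

        for w in workers:
            work_queue.put(None)               # one shutdown sentinel per worker
        for w in workers:
            w.join()

    if __name__ == '__main__':
        run([('a.rpm', 150), ('b.rpm', 150), ('c.rpm', 120)],
            num_workers=2, max_total_size=300)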