extras-buildsys/utils/pushscript/rpmUtils __init__.py, NONE, 1.1 arch.py, NONE, 1.1 miscutils.py, NONE, 1.1 oldUtils.py, NONE, 1.1 transaction.py, NONE, 1.1 updates.py, NONE, 1.1
Michael Schwendt (mschwendt)
fedora-extras-commits at redhat.com
Sun Oct 15 12:30:46 UTC 2006
- Previous message: extras-buildsys/utils/pushscript README,1.3,1.4
- Next message: extras-buildsys/utils/pushscript/repomd __init__.py, NONE, 1.1 mdErrors.py, NONE, 1.1 mdUtils.py, NONE, 1.1 packageObject.py, NONE, 1.1 packageSack.py, NONE, 1.1 repoMDObject.py, NONE, 1.1 test.py, NONE, 1.1
- Messages sorted by:
[ date ]
[ thread ]
[ subject ]
[ author ]
Author: mschwendt
Update of /cvs/fedora/extras-buildsys/utils/pushscript/rpmUtils
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv30805/rpmUtils
Added Files:
__init__.py arch.py miscutils.py oldUtils.py transaction.py
updates.py
Log Message:
Add the patched yum 2.6.1 here, so we remove the dependency on the
system-installed one.
--- NEW FILE __init__.py ---
#!/usr/bin/python -tt
import rpm
import miscutils
import exceptions
import oldUtils
class RpmUtilsError(exceptions.Exception):
def __init__(self, args=None):
exceptions.Exception.__init__(self)
self.args = args
# useful functions
def getHeadersByKeyword(ts, **kwargs):
"""return list of headers from the rpmdb matching a keyword
ex: getHeadersByKeyword(name='foo', version='1', release='1')
"""
lst = []
# lifted from up2date - way to easy and useful NOT to steal - thanks adrian
mi = ts.dbMatch()
if kwargs.has_key('epoch'):
del(kwargs['epoch']) # epochs don't work here for None/0/'0' reasons
keywords = len(kwargs.keys())
for hdr in mi:
match = 0
for keyword in kwargs.keys():
if hdr[keyword] == kwargs[keyword]:
match += 1
if match == keywords:
lst.append(hdr)
del mi
return lst
def getIndexesByKeyword(ts, **kwargs):
"""return list of headers Indexes from the rpmdb matching a keyword
ex: getHeadersByKeyword(name='foo', version='1', release='1')
"""
# THIS IS EXCRUCIATINGLY SLOW
lst = []
mi = ts.dbMatch()
for keyword in kwargs.keys():
mi.pattern(keyword, rpm.RPMMIRE_GLOB, kwargs[keyword])
# we really shouldnt be getting multiples here, but what the heck
for h in mi:
instance = mi.instance()
lst.append(instance)
del mi
return lst
class RpmDBHolder:
def __init__(self):
self.pkglists = []
def addDB(self, ts):
self.ts = ts
self.match_on_index = 1
try:
# we need the find a known index so we can test if
# rpm/rpm-python allows us to grab packages by db index.
mi = self.ts.dbMatch()
hdr = mi.next()
known_index = mi.instance()
mi = self.ts.dbMatch(0, known_index)
hdr = mi.next()
except (TypeError, StopIteration), e:
self.match_on_index = 0
else:
self.match_on_index = 1
self.indexdict = {}
mi = self.ts.dbMatch()
for hdr in mi:
pkgtuple = self._hdr2pkgTuple(hdr)
if not self.indexdict.has_key(pkgtuple):
self.indexdict[pkgtuple] = []
else:
continue
self.indexdict[pkgtuple].append(mi.instance())
self.pkglists.append(pkgtuple)
del mi
def _hdr2pkgTuple(self, hdr):
name = hdr['name']
arch = hdr['arch']
ver = str(hdr['version']) # convert these to strings to be sure
rel = str(hdr['release'])
epoch = hdr['epoch']
if epoch is None:
epoch = '0'
else:
epoch = str(epoch)
return (name, arch, epoch, ver, rel)
def getPkgList(self):
return self.pkglists
def getHdrList(self):
hdrlist = []
mi = self.ts.dbMatch()
if mi:
for hdr in mi:
hdrlist.append(hdr)
del mi
return hdrlist
def getNameArchPkgList(self):
lst = []
for (name, arch, epoch, ver, rel) in self.pkglists:
lst.append((name, arch))
return miscutils.unique(lst)
def getNamePkgList(self):
lst = []
for (name, arch, epoch, ver, rel) in self.pkglists:
lst.append(name)
return miscutils.unique(lst)
def returnNewestbyNameArch(self):
"""returns the newest set of pkgs based on 'name and arch'"""
highdict = {}
for (n, a, e, v, r) in self.pkglists:
if not highdict.has_key((n, a)):
highdict[(n, a)] = (e, v, r)
else:
(e2, v2, r2) = highdict[(n, a)]
rc = miscutils.compareEVR((e,v,r), (e2, v2, r2))
if rc > 0:
highdict[(n, a)] = (e, v, r)
returns = []
for (n, a) in highdict.keys():
(e, v, r) = highdict[(n, a)]
returns.append((n, a, e, v ,r))
return returns
def returnNewestbyName(self):
"""returns the newest set of pkgs based on name"""
highdict = {}
for (n, a, e, v, r) in self.pkglists:
if not highdict.has_key(n):
highdict[n] = (a, e, v, r)
else:
(a2, e2, v2, r2) = highdict[n]
rc = miscutils.compareEVR((e,v,r), (e2, v2, r2))
if rc > 0:
highdict[n] = (a, e, v, r)
returns = []
for n in highdict.keys():
(a, e, v, r) = highdict[n]
returns.append((n, a, e, v ,r))
return returns
def installed(self, name=None, arch=None, epoch=None, ver=None, rel=None):
if len(self.returnTupleByKeyword(name=name, arch=arch, epoch=epoch, ver=ver, rel=rel)) > 0:
return 1
return 0
def returnTupleByKeyword(self, name=None, arch=None, epoch=None, ver=None, rel=None):
"""return a list of pkgtuples based on name, arch, epoch, ver and/or rel
matches."""
completelist = self.getPkgList()
removedict = {}
returnlist = []
for pkgtup in completelist:
(n, a, e, v, r) = pkgtup
if name is not None:
if name != n:
removedict[pkgtup] = 1
continue
if arch is not None:
if arch != a:
removedict[pkgtup] = 1
continue
if epoch is not None:
if epoch != e:
removedict[pkgtup] = 1
continue
if ver is not None:
if ver != v:
removedict[pkgtup] = 1
continue
if rel is not None:
if rel != r:
removedict[pkgtup] = 1
continue
for pkgtup in completelist:
if not removedict.has_key(pkgtup):
returnlist.append(pkgtup)
return returnlist
def returnHeaderByTuple(self, pkgtuple):
"""returns a list of header(s) based on the pkgtuple provided"""
(n, a, e, v, r) = pkgtuple
if not self.match_on_index:
lst = getHeadersByKeyword(self.ts, name=n, arch=a, epoch=e, version=v,
release=r)
return lst
else:
idxs = self.returnIndexByTuple(pkgtuple)
idx = idxs[0]
mi = self.ts.dbMatch(0, idx)
hdr = mi.next()
return [hdr]
def returnIndexByTuple(self, pkgtuple):
return self.indexdict[pkgtuple]
def whatProvides(self, provname, provflag, provver):
"""uses the ts in this class to return a list of pkgtuples that match
the provide"""
matches = []
checkfileprov = 0
if provname[0] == '/':
checkfileprov = 1
matchingFileHdrs = self.ts.dbMatch('basenames', provname)
matchingHdrs = self.ts.dbMatch('provides', provname)
else:
matchingHdrs = self.ts.dbMatch('provides', provname)
# now we have the list of pkgs installed in the rpmdb that
# have a the provname matching, now we need to find out
# if any/all of them match the flag/ver set
if checkfileprov and matchingFileHdrs.count() > 0:
for matchhdr in matchingFileHdrs:
pkgtuple = self._hdr2pkgTuple(matchhdr)
matches.append(pkgtuple)
del matchingFileHdrs
return miscutils.unique(matches)
if provflag in [0, None] or provver is None: # if we've got no ver or flag
# for comparison then they all match
for matchhdr in matchingHdrs:
pkgtuple = self._hdr2pkgTuple(matchhdr)
matches.append(pkgtuple)
del matchingHdrs
return miscutils.unique(matches)
for matchhdr in matchingHdrs:
(pkg_n, pkg_a, pkg_e, pkg_v, pkg_r) = self._hdr2pkgTuple(matchhdr)
pkgtuple = (pkg_n, pkg_a, pkg_e, pkg_v, pkg_r)
# break the provver up into e-v-r (e:v-r)
(prov_e, prov_v, prov_r) = miscutils.stringToVersion(provver)
provtuple = (provname, provflag, (prov_e, prov_v, prov_r))
# find the provide in the header
providelist = self._providesList(matchhdr)
for (name, flag, ver) in providelist:
if name != provname: # definitely not
continue
match_n = name
match_a = pkg_a # you got a better idea?
if ver is not None:
(match_e, match_v, match_r) = miscutils.stringToVersion(ver)
else:
match_e = pkg_e
match_v = pkg_v
match_r = pkg_r
matchtuple = (match_n, match_a, match_e, match_v, match_r)
# This provides matches if the version is in the requested
# range or the providing package provides the resource
# without a version (meaning that it matches all EVR)
if miscutils.rangeCheck(provtuple, matchtuple) or (match_v == None):
matches.append(pkgtuple)
del matchingHdrs
return miscutils.unique(matches)
def _providesList(self, hdr):
lst = []
names = hdr[rpm.RPMTAG_PROVIDENAME]
flags = hdr[rpm.RPMTAG_PROVIDEFLAGS]
vers = hdr[rpm.RPMTAG_PROVIDEVERSION]
if names is not None:
lst = zip(names, flags, vers)
return miscutils.unique(lst)
--- NEW FILE arch.py ---
#!/usr/bin/python
#
import os
# dict mapping arch -> ( multicompat, best personality, biarch personality )
multilibArches = { "x86_64": ( "athlon", "x86_64", "athlon" ),
"sparc64": ( "sparc", "sparc", "sparc64" ),
"ppc64": ( "ppc", "ppc", "ppc64" ),
"s390x": ( "s390", "s390x", "s390" ),
"ia64": ( "i686", "ia64", "i686" )
}
arches = {
# ia32
"athlon": "i686",
"i686": "i586",
"i586": "i486",
"i486": "i386",
"i386": "noarch",
# amd64
"x86_64": "athlon",
"amd64": "x86_64",
"ia32e": "x86_64",
# itanium
"ia64": "i686",
# ppc
"ppc64pseries": "ppc64",
"ppc64iseries": "ppc64",
"ppc64": "ppc",
"ppc": "noarch",
# s390{,x}
"s390x": "s390",
"s390": "noarch",
# sparc
"sparc64": "sparcv9",
"sparcv9": "sparcv8",
"sparcv8": "sparc",
"sparc": "noarch",
# alpha
"alphaev6": "alphaev56",
"alphaev56": "alphaev5",
"alphaev5": "alpha",
"alpha": "noarch",
}
# this computes the difference between myarch and targetarch
def archDifference(myarch, targetarch):
if myarch == targetarch:
return 1
if arches.has_key(myarch):
ret = archDifference(arches[myarch], targetarch)
if ret != 0:
return ret + 1
return 0
return 0
def score(arch):
return archDifference(canonArch, arch)
def isMultiLibArch(arch=None):
"""returns true if arch is a multilib arch, false if not"""
if arch is None:
arch = getCanonArch()
if not arches.has_key(arch): # or we could check if it is noarch
return 0
if multilibArches.has_key(arch):
return 1
if multilibArches.has_key(arches[arch]):
return 1
return 0
def getBestArchFromList(archlist, myarch=None):
"""
return the best arch from the list for myarch if - myarch is not given,
then return the best arch from the list for the canonArch.
"""
if myarch is None:
myarch = getCanonArch()
if len(archlist) == 0:
return None
thisarch = archlist[0]
for arch in archlist[1:]:
val1 = archDifference(myarch, thisarch)
val2 = archDifference(myarch, arch)
if val1 == 0 and val2 == 0:
continue
if val1 < val2:
if val1 == 0:
thisarch = arch
if val2 < val1:
if val2 != 0:
thisarch = arch
if val1 == val2:
pass
# thisarch should now be our bestarch
# one final check to make sure we're not returning a bad arch
val = archDifference(myarch, thisarch)
if val == 0:
return None
return thisarch
def getArchList(thisarch=None):
# this returns a list of archs that are compatible with arch given
if not thisarch:
thisarch = getCanonArch()
archlist = [thisarch]
while arches.has_key(thisarch):
thisarch = arches[thisarch]
archlist.append(thisarch)
return archlist
def getCanonX86Arch(arch):
# only athlon vs i686 isn't handled with uname currently
if arch != "i686":
return arch
# if we're i686 and AuthenticAMD, then we should be an athlon
f = open("/proc/cpuinfo", "r")
lines = f.readlines()
f.close()
for line in lines:
if line.startswith("vendor") and line.find("AuthenticAMD") != -1:
return "athlon"
# i686 doesn't guarantee cmov, but we depend on it
elif line.startswith("flags") and line.find("cmov") == -1:
return "i586"
return arch
def getCanonPPCArch(arch):
# FIXME: should I do better handling for mac, etc?
if arch != "ppc64":
return arch
machine = None
f = open("/proc/cpuinfo", "r")
lines = f.readlines()
f.close()
for line in lines:
if line.find("machine") != -1:
machine = line.split(':')[1]
break
if machine is None:
return arch
if machine.find("CHRP IBM") != -1:
return "ppc64pseries"
if machine.find("iSeries") != -1:
return "ppc64iseries"
return arch
def getCanonX86_64Arch(arch):
if arch != "x86_64":
return arch
vendor = None
f = open("/proc/cpuinfo", "r")
lines = f.readlines()
f.close()
for line in lines:
if line.startswith("vendor_id"):
vendor = line.split(':')[1]
break
if vendor is None:
return arch
if vendor.find("Authentic AMD") != -1:
return "amd64"
if vendor.find("GenuineIntel") != -1:
return "ia32e"
return arch
def getCanonArch(skipRpmPlatform = 0):
if not skipRpmPlatform and os.access("/etc/rpm/platform", os.R_OK):
try:
f = open("/etc/rpm/platform", "r")
line = f.readline()
f.close()
(arch, vendor, opersys) = line.split("-", 2)
return arch
except:
pass
arch = os.uname()[4]
if (len(arch) == 4 and arch[0] == "i" and arch[2:4] == "86"):
return getCanonX86Arch(arch)
if arch.startswith("ppc"):
return getCanonPPCArch(arch)
if arch == "x86_64":
return getCanonX86_64Arch(arch)
return arch
# this gets you the "compat" arch of a biarch pair
def getMultiArchInfo(arch = getCanonArch()):
if multilibArches.has_key(arch):
return multilibArches[arch]
if arches.has_key(arch) and arches[arch] != "noarch":
return getMultiArchInfo(arch = arches[arch])
return None
# get the best usual userspace arch for the arch we're on. this is
# our arch unless we're on an arch that uses the secondary as its
# userspace (eg ppc64, sparc64)
def getBestArch():
arch = canonArch
if arch == "sparc64":
arch = "sparc"
if arch.startswith("ppc64"):
arch = "ppc"
return arch
def getBaseArch(myarch=None):
"""returns 'base' arch for myarch, if specified, or canonArch if not.
base arch is the arch before noarch in the arches dict if myarch is not
a key in the multilibArches."""
if not myarch:
myarch = getCanonArch()
if not arches.has_key(myarch): # this is dumb, but <shrug>
return myarch
if myarch == "sparc64":
return "sparc"
elif myarch.startswith("ppc64"):
return "ppc"
if isMultiLibArch(arch=myarch):
if multilibArches.has_key(myarch):
return myarch
else:
return arches[myarch]
if arches.has_key(myarch):
basearch = myarch
value = arches[basearch]
while value != 'noarch':
basearch = value
value = arches[basearch]
return basearch
canonArch = getCanonArch()
--- NEW FILE miscutils.py ---
#!/usr/bin/python -tt
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# Copyright 2003 Duke University
import string
import rpm
import types
import gzip
import os
import sys
import locale
import rpmUtils.transaction
def rpmOutToStr(arg):
if type(arg) != types.StringType:
# and arg is not None:
arg = str(arg)
return arg
def compareEVR((e1, v1, r1), (e2, v2, r2)):
# return 1: a is newer than b
# 0: a and b are the same version
# -1: b is newer than a
e1 = rpmOutToStr(e1)
v1 = rpmOutToStr(v1)
r1 = rpmOutToStr(r1)
e2 = rpmOutToStr(e2)
v2 = rpmOutToStr(v2)
r2 = rpmOutToStr(r2)
#print '%s, %s, %s vs %s, %s, %s' % (e1, v1, r1, e2, v2, r2)
rc = rpm.labelCompare((e1, v1, r1), (e2, v2, r2))
#print '%s, %s, %s vs %s, %s, %s = %s' % (e1, v1, r1, e2, v2, r2, rc)
return rc
def checkSig(ts, package):
"""Takes a transaction set and a package, check it's sigs,
return 0 if they are all fine
return 1 if the gpg key can't be found
return 2 if the header is in someway damaged
return 3 if the key is not trusted
return 4 if the pkg is not gpg or pgp signed"""
value = 0
currentflags = ts.setVSFlags(0)
fdno = os.open(package, os.O_RDONLY)
try:
hdr = ts.hdrFromFdno(fdno)
except rpm.error, e:
if str(e) == "public key not availaiable":
value = 1
if str(e) == "public key not available":
value = 1
if str(e) == "public key not trusted":
value = 3
if str(e) == "error reading package header":
value = 2
else:
error, siginfo = getSigInfo(hdr)
if error == 101:
os.close(fdno)
del hdr
value = 4
else:
del hdr
try:
os.close(fdno)
except OSError, e: # if we're not opened, don't scream about it
pass
ts.setVSFlags(currentflags) # put things back like they were before
return value
def getSigInfo(hdr):
"""checks signature from an hdr hand back signature information and/or
an error code"""
locale.setlocale(locale.LC_ALL, 'C')
string = '%|DSAHEADER?{%{DSAHEADER:pgpsig}}:{%|RSAHEADER?{%{RSAHEADER:pgpsig}}:{%|SIGGPG?{%{SIGGPG:pgpsig}}:{%|SIGPGP?{%{SIGPGP:pgpsig}}:{(none)}|}|}|}|'
siginfo = hdr.sprintf(string)
if siginfo != '(none)':
error = 0
sigtype, sigdate, sigid = siginfo.split(',')
else:
error = 101
sigtype = 'MD5'
sigdate = 'None'
sigid = 'None'
infotuple = (sigtype, sigdate, sigid)
return error, infotuple
def pkgTupleFromHeader(hdr):
"""return a pkgtuple (n, a, e, v, r) from a hdr object, converts
None epoch to 0, as well."""
name = hdr['name']
arch = hdr['arch']
ver = hdr['version']
rel = hdr['release']
epoch = hdr['epoch']
if epoch is None:
epoch = '0'
pkgtuple = (name, arch, epoch, ver, rel)
return pkgtuple
def rangeCheck(reqtuple, pkgtuple):
"""returns true if the package epoch-ver-rel satisfy the range
requested in the reqtuple:
ex: foo >= 2.1-1"""
# we only ever get here if we have a versioned prco
# nameonly shouldn't ever raise it
(reqn, reqf, (reqe, reqv, reqr)) = reqtuple
(n, a, e, v, r) = pkgtuple
#simple failures
if reqn != n: return 0
# and you thought we were done having fun
# if the requested release is left out then we have
# to remove release from the package prco to make sure the match
# is a success - ie: if the request is EQ foo 1:3.0.0 and we have
# foo 1:3.0.0-15 then we have to drop the 15 so we can match
if reqr is None:
r = None
if reqe is None:
e = None
if reqv is None: # just for the record if ver is None then we're going to segfault
v = None
rc = compareEVR((e, v, r), (reqe, reqv, reqr))
if rc >= 1:
if reqf in ['GT', 'GE', 4, 12]:
return 1
if rc == 0:
if reqf in ['GE', 'LE', 'EQ', 8, 10, 12]:
return 1
if rc <= -1:
if reqf in ['LT', 'LE', 2, 10]:
return 1
return 0
###########
# Title: Remove duplicates from a sequence
# Submitter: Tim Peters
# From: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/52560
def unique(s):
"""Return a list of the elements in s, but without duplicates.
For example, unique([1,2,3,1,2,3]) is some permutation of [1,2,3],
unique("abcabc") some permutation of ["a", "b", "c"], and
unique(([1, 2], [2, 3], [1, 2])) some permutation of
[[2, 3], [1, 2]].
For best speed, all sequence elements should be hashable. Then
unique() will usually work in linear time.
If not possible, the sequence elements should enjoy a total
ordering, and if list(s).sort() doesn't raise TypeError it's
assumed that they do enjoy a total ordering. Then unique() will
usually work in O(N*log2(N)) time.
If that's not possible either, the sequence elements must support
equality-testing. Then unique() will usually work in quadratic
time.
"""
n = len(s)
if n == 0:
return []
# Try using a dict first, as that's the fastest and will usually
# work. If it doesn't work, it will usually fail quickly, so it
# usually doesn't cost much to *try* it. It requires that all the
# sequence elements be hashable, and support equality comparison.
u = {}
try:
for x in s:
u[x] = 1
except TypeError:
del u # move on to the next method
else:
return u.keys()
# We can't hash all the elements. Second fastest is to sort,
# which brings the equal elements together; then duplicates are
# easy to weed out in a single pass.
# NOTE: Python's list.sort() was designed to be efficient in the
# presence of many duplicate elements. This isn't true of all
# sort functions in all languages or libraries, so this approach
# is more effective in Python than it may be elsewhere.
try:
t = list(s)
t.sort()
except TypeError:
del t # move on to the next method
else:
assert n > 0
last = t[0]
lasti = i = 1
while i < n:
if t[i] != last:
t[lasti] = last = t[i]
lasti += 1
i += 1
return t[:lasti]
# Brute force is all that's left.
u = []
for x in s:
if x not in u:
u.append(x)
return u
def splitFilename(filename):
"""pass in a standard style rpm fullname and it returns
a name, version, release, epoch, arch
aka foo-1.0-1.i386.rpm returns foo, 1.0, 1, i386
1:bar-9-123a.ia64.rpm returns bar, 9, 123a, 1, ia64
"""
if filename[-4:] == '.rpm':
filename = filename[:-4]
archIndex = string.rfind(filename, '.')
arch = filename[archIndex+1:]
relIndex = string.rfind(filename[:archIndex], '-')
rel = filename[relIndex+1:archIndex]
verIndex = string.rfind(filename[:relIndex], '-')
ver = filename[verIndex+1:relIndex]
epochIndex = string.find(filename, ':')
if epochIndex == -1:
epoch = ''
else:
epoch = filename[:epochIndex]
name = filename[epochIndex + 1:verIndex]
return name, ver, rel, epoch, arch
def rpm2cpio(fdno, out=sys.stdout, bufsize=2048):
"""Performs roughly the equivalent of rpm2cpio(8).
Reads the package from fdno, and dumps the cpio payload to out,
using bufsize as the buffer size."""
ts = rpmUtils.transaction.initReadOnlyTransaction()
hdr = ts.hdrFromFdno(fdno)
del ts
compr = hdr[rpm.RPMTAG_PAYLOADCOMPRESSOR] or 'gzip'
#XXX FIXME
#if compr == 'bzip2':
# TODO: someone implement me!
#el
if compr != 'gzip':
raise rpmUtils.RpmUtilsError, \
'Unsupported payload compressor: "%s"' % compr
f = gzip.GzipFile(None, 'rb', None, os.fdopen(fdno, 'rb', bufsize))
while 1:
tmp = f.read(bufsize)
if tmp == "": break
out.write(tmp)
f.close()
def formatRequire (name, version, flags):
s = name
if flags:
if flags & (rpm.RPMSENSE_LESS | rpm.RPMSENSE_GREATER |
rpm.RPMSENSE_EQUAL):
s = s + " "
if flags & rpm.RPMSENSE_LESS:
s = s + "<"
if flags & rpm.RPMSENSE_GREATER:
s = s + ">"
if flags & rpm.RPMSENSE_EQUAL:
s = s + "="
if version:
s = "%s %s" %(s, version)
return s
def stringToVersion(verstring):
if verstring is None:
return (None, None, None)
i = string.find(verstring, ':')
if i != -1:
try:
epoch = string.atol(verstring[:i])
except ValueError:
# look, garbage in the epoch field, how fun, kill it
epoch = '0' # this is our fallback, deal
else:
epoch = '0'
j = string.find(verstring, '-')
if j != -1:
if verstring[i + 1:j] == '':
version = None
else:
version = verstring[i + 1:j]
release = verstring[j + 1:]
else:
if verstring[i + 1:] == '':
version = None
else:
version = verstring[i + 1:]
release = None
return (epoch, version, release)
def hdrFromPackage(ts, package):
"""hand back the rpm header or raise an Error if the pkg is fubar"""
try:
fdno = os.open(package, os.O_RDONLY)
except OSError, e:
raise rpmUtils.RpmUtilsError, 'Unable to open file'
try:
hdr = ts.hdrFromFdno(fdno)
except rpm.error, e:
os.close(fdno)
raise rpmUtils.RpmUtilsError, "RPM Error opening Package"
if type(hdr) != rpm.hdr:
os.close(fdno)
raise rpmUtils.RpmUtilsError, "RPM Error opening Package"
os.close(fdno)
return hdr
--- NEW FILE oldUtils.py ---
#!/usr/bin/python -tt
import rpm
import types
import os
import gzip
import sys
from gzip import write32u, FNAME
def _(msg):
return msg
def checkheader(headerfile, name, arch):
"""check a header by opening it and comparing the results to the name and arch
we believe it to be for. if it fails raise URLGrabError(-1)"""
h = Header_Work(headerfile)
fail = 0
if h.hdr is None:
fail = 1
else:
if name != h.name() or arch != h.arch():
fail = 1
if fail:
raise URLGrabError(-1, _('Header cannot be opened or does not match %s, %s.') % (name, arch))
return
def checkRpmMD5(package, urlgraberror=0):
"""take a package, check it out by trying to open it, return 1 if its good
return 0 if it's not"""
ts.sigChecking('md5')
fdno = os.open(package, os.O_RDONLY)
try:
ts.hdrFromFdno(fdno)
except rpm.error, e:
good = 0
else:
good = 1
os.close(fdno)
ts.sigChecking('default')
if urlgraberror:
if not good:
raise URLGrabError(-1, _('RPM %s fails md5 check') % (package))
else:
return
else:
return good
def checkSig(package):
""" take a package, check it's sigs, return 0 if they are all fine, return
1 if the gpg key can't be found, 2 if the header is in someway damaged,
3 if the key is not trusted, 4 if the pkg is not gpg or pgp signed"""
ts.sigChecking('default')
fdno = os.open(package, os.O_RDONLY)
try:
hdr = ts.hdrFromFdno(fdno)
except rpm.error, e:
if str(e) == "public key not availaiable":
return 1
if str(e) == "public key not available":
return 1
if str(e) == "public key not trusted":
return 3
if str(e) == "error reading package header":
return 2
else:
error, siginfo = getSigInfo(hdr)
if error == 101:
os.close(fdno)
del hdr
return 4
else:
del hdr
os.close(fdno)
return 0
def getSigInfo(hdr):
"""checks if a computerhand back signature information and an error code"""
string = '%|DSAHEADER?{%{DSAHEADER:pgpsig}}:{%|RSAHEADER?{%{RSAHEADER:pgpsig}}:{%|SIGGPG?{%{SIGGPG:pgpsig}}:{%|SIGPGP?{%{SIGPGP:pgpsig}}:{(none)}|}|}|}|'
siginfo = hdr.sprintf(string)
if siginfo != '(none)':
error = 0
sigtype, sigdate, sigid = siginfo.split(',')
else:
error = 101
sigtype = 'MD5'
sigdate = 'None'
sigid = 'None'
infotuple = (sigtype, sigdate, sigid)
return error, infotuple
def getProvides(header):
provnames = []
provides = header[rpm.RPMTAG_PROVIDENAME]
if provides is None:
pass
elif type(provides) is types.ListType:
provnames.extend(provides)
else:
provnames.append(provides)
return provnames
def compareEVR((e1, v1, r1), (e2, v2, r2)):
# return 1: a is newer than b
# 0: a and b are the same version
# -1: b is newer than a
def rpmOutToStr(arg):
if type(arg) != types.StringType and arg != None:
arg = str(arg)
return arg
e1 = rpmOutToStr(e1)
v1 = rpmOutToStr(v1)
r1 = rpmOutToStr(r1)
e2 = rpmOutToStr(e2)
v2 = rpmOutToStr(v2)
r2 = rpmOutToStr(r2)
rc = rpm.labelCompare((e1, v1, r1), (e2, v2, r2))
log(6, '%s, %s, %s vs %s, %s, %s = %s' % (e1, v1, r1, e2, v2, r2, rc))
return rc
def formatRequire (name, version, flags):
if flags:
if flags & (rpm.RPMSENSE_LESS | rpm.RPMSENSE_GREATER | rpm.RPMSENSE_EQUAL):
name = name + ' '
if flags & rpm.RPMSENSE_LESS:
name = name + '<'
if flags & rpm.RPMSENSE_GREATER:
name = name + '>'
if flags & rpm.RPMSENSE_EQUAL:
name = name + '='
name = name + ' %s' % version
return name
def openrpmdb():
try:
db = rpm.TransactionSet(conf.installroot)
except rpm.error, e:
errorlog(0, _("Could not open RPM database for reading. Perhaps it is already in use?"))
return db
# this is done to make the hdr writing _more_ sane for rsync users especially
__all__ = ["GzipFile","open"]
class GzipFile(gzip.GzipFile):
def _write_gzip_header(self):
self.fileobj.write('\037\213') # magic header
self.fileobj.write('\010') # compression method
fname = self.filename[:-3]
flags = 0
if fname:
flags = FNAME
self.fileobj.write(chr(flags))
write32u(self.fileobj, long(0))
self.fileobj.write('\002')
self.fileobj.write('\377')
if fname:
self.fileobj.write(fname + '\000')
def _gzipOpen(filename, mode="rb", compresslevel=9):
return GzipFile(filename, mode, compresslevel)
class RPM_Base_Work:
def _getTag(self, tag):
if self.hdr is None:
errorlog(0, _('Got an empty Header, something has gone wrong'))
#FIXME should raise a yum error here
sys.exit(1)
return self.hdr[tag]
def isSource(self):
if self._getTag('sourcepackage') == 1:
return 1
else:
return 0
def name(self):
return self._getTag('name')
def arch(self):
return self._getTag('arch')
def epoch(self):
return self._getTag('epoch')
def version(self):
return self._getTag('version')
def release(self):
return self._getTag('release')
def evr(self):
e = self._getTag('epoch')
v = self._getTag('version')
r = self._getTag('release')
return (e, v, r)
def nevra(self):
n = self._getTag('name')
e = self._getTag('epoch')
v = self._getTag('version')
r = self._getTag('release')
a = self._getTag('arch')
return (n, e, v, r, a)
def writeHeader(self, headerdir, compress):
# write the header out to a file with the format: name-epoch-ver-rel.arch.hdr
# return the name of the file it just made - no real reason :)
(name, epoch, ver, rel, arch) = self.nevra()
if epoch is None:
epoch = '0'
if self.isSource():
headerfn = "%s/%s-%s-%s-%s.src.hdr" % (headerdir, name, epoch, ver, rel)
else:
headerfn = "%s/%s-%s-%s-%s.%s.hdr" % (headerdir, name, epoch, ver, rel, arch)
if compress:
headerout = _gzipOpen(headerfn, "w")
else:
headerout = open(headerfn, "w")
headerout.write(self.hdr.unload(1))
headerout.close()
return(headerfn)
class Header_Work(RPM_Base_Work):
"""for operating on hdrs in and out of the rpmdb
if the first arg is a string then it's a filename
otherwise it's an rpm hdr"""
def __init__(self, header):
if type(header) is types.StringType:
try:
fd = gzip.open(header, 'r')
try:
h = rpm.headerLoad(fd.read())
except rpm.error, e:
errorlog(0,_('Damaged Header %s') % header)
h = None
except IOError,e:
fd = open(header, 'r')
try:
h = rpm.headerLoad(fd.read())
except rpm.error, e:
errorlog(0,_('Damaged Header %s') % header)
h = None
except ValueError, e:
errorlog(0,_('Damaged Header %s') % header)
h = None
except zlibError, e:
errorlog(0,_('Damaged Header %s') % header)
h = None
fd.close()
else:
h = header
self.hdr = h
class RPM_Work(RPM_Base_Work):
def __init__(self, rpmfn):
ts.setVSFlags(~(rpm._RPMVSF_NOSIGNATURES))
fd = os.open(rpmfn, os.O_RDONLY)
try:
self.hdr = ts.hdrFromFdno(fd)
except rpm.error, e:
errorlog(0, _('Error opening rpm %s - error %s') % (rpmfn, e))
self.hdr = None
os.close(fd)
class Rpm_Ts_Work:
"""This should operate on groups of headers/matches/etc in the rpmdb - ideally it will
operate with a list of the Base objects above, so I can refer to any one object there
not sure the best way to do this yet, more thinking involved"""
def __init__(self, dbPath='/'):
try:
if conf.installroot:
if conf.installroot != '/':
dbPath = conf.installroot
except NameError, e:
pass
self.ts = rpm.TransactionSet(dbPath)
self.methods = ['addInstall', 'addErase', 'run', 'check', 'order', 'hdrFromFdno',
'closeDB', 'dbMatch', 'setFlags', 'setVSFlags', 'setProbFilter']
def __getattr__(self, attribute):
if attribute in self.methods:
return getattr(self.ts, attribute)
else:
raise AttributeError, attribute
def match(self, tag = None, search = None, mire = None):
"""hands back a list of Header_Work objects"""
hwlist = []
# hand back the whole list of hdrs
if mire is None and tag is None and search is None:
hdrlist = self.ts.dbMatch()
else:
#just do a non-mire'd search
if mire == None:
hdrlist = self.ts.dbMatch(tag, search)
else:
# mire search
if mire == 'glob':
hdrlist = self.ts.dbMatch()
hdrlist.pattern(tag, rpm.RPMMIRE_GLOB, search)
elif mire == 'regex':
hdrlist = self.ts.dbMatch()
hdrlist.pattern(tag, rpm.RPMMIRE_REGEX, search)
elif mire == 'strcmp':
hdrlist = self.ts.dbMatch()
hdrlist.pattern(tag, rpm.RPMMIRE_STRCMP, search)
else:
hdrlist = self.ts.dbMatch()
hdrlist.pattern(tag, rpm.RPMMIRE_DEFAULT, search)
for hdr in hdrlist:
hdrobj = Header_Work(hdr)
hwlist.append(hdrobj)
return hwlist
def sigChecking(self, sig):
"""pass type of check you want to occur, default is to have them off"""
if sig == 'md5':
#turn off everything but md5 - and we need to the check the payload
self.ts.setVSFlags(~(rpm.RPMVSF_NOMD5|rpm.RPMVSF_NEEDPAYLOAD))
elif sig == 'none':
# turn off everything - period
self.ts.setVSFlags(~(rpm._RPMVSF_NOSIGNATURES))
elif sig == 'default':
# set it back to the default
self.ts.setVSFlags(rpm.RPMVSF_DEFAULT)
else:
raise AttributeError, sig
--- NEW FILE transaction.py ---
#!/usr/bin/python
#
# Client code for Update Agent
# Copyright (c) 1999-2002 Red Hat, Inc. Distributed under GPL.
#
# Adrian Likins <alikins at redhat.com>
# Some Edits by Seth Vidal <skvidal at phy.duke.edu>
#
# a couple of classes wrapping up transactions so that we
# can share transactions instead of creating new ones all over
#
import rpm
import miscutils
from sets import Set
read_ts = None
ts = None
# wrapper/proxy class for rpm.Transaction so we can
# instrument it, etc easily
class TransactionWrapper:
def __init__(self, root='/'):
self.ts = rpm.TransactionSet(root)
self._methods = ['dbMatch',
'check',
'order',
'addErase',
'addInstall',
'run',
'IDTXload',
'IDTXglob',
'rollback',
'pgpImportPubkey',
'pgpPrtPkts',
'Debug',
'setFlags',
'setVSFlags',
'setProbFilter',
'hdrFromFdno',
'next',
'clean']
self.tsflags = []
def __getattr__(self, attr):
if attr in self._methods:
return self.getMethod(attr)
else:
raise AttributeError, attr
def __iter__(self):
return self.ts
def getMethod(self, method):
# in theory, we can override this with
# profile/etc info
return getattr(self.ts, method)
# push/pop methods so we dont lose the previous
# set value, and we can potentiall debug a bit
# easier
def pushVSFlags(self, flags):
self.tsflags.append(flags)
self.ts.setVSFlags(self.tsflags[-1])
def popVSFlags(self):
del self.tsflags[-1]
self.ts.setVSFlags(self.tsflags[-1])
def addTsFlag(self, flag):
curflags = self.ts.setFlags(0)
self.ts.setFlags(curflags | flag)
def test(self, cb, conf={}):
"""tests the ts we've setup, takes a callback function and a conf dict
for flags and what not"""
#FIXME
# I don't like this function - it should test, sure - but not
# with this conf dict, we should be doing that beforehand and
# we should be packing this information away elsewhere.
self.addTsFlag(rpm.RPMTRANS_FLAG_TEST)
if conf.has_key('diskspacecheck'):
if conf['diskspacecheck'] == 0:
self.ts.setProbFilter(rpm.RPMPROB_FILTER_DISKSPACE)
tserrors = self.ts.run(cb.callback, '')
reserrors = []
if tserrors:
for (descr, (etype, mount, need)) in tserrors:
reserrors.append(descr)
return reserrors
def returnLeafNodes(self):
"""returns a list of package tuples (n,a,e,v,r) that are not required by
any other package on the system"""
req = {}
orphan = []
mi = self.dbMatch()
if mi is None: # this is REALLY unlikely but let's just say it for the moment
return orphan
for h in mi:
tup = miscutils.pkgTupleFromHeader(h)
if not h[rpm.RPMTAG_REQUIRENAME]:
continue
for r in h[rpm.RPMTAG_REQUIRENAME]:
if not req.has_key(r):
req[r] = Set()
req[r].add(tup)
mi = self.dbMatch()
if mi is None:
return orphan
for h in mi:
preq = 0
tup = miscutils.pkgTupleFromHeader(h)
for p in h[rpm.RPMTAG_PROVIDES] + h[rpm.RPMTAG_FILENAMES]:
if req.has_key(p):
# Don't count a package that provides its require
s = req[p]
if len(s) > 1 or tup not in s:
preq = preq + 1
if preq == 0:
orphan.append(tup)
return orphan
def initReadOnlyTransaction(root='/'):
read_ts = TransactionWrapper(root=root)
read_ts.pushVSFlags((rpm._RPMVSF_NOSIGNATURES|rpm._RPMVSF_NODIGESTS))
return read_ts
--- NEW FILE updates.py ---
#!/usr/bin/python -tt
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# Copyright 2004 Duke University
import rpmUtils.miscutils
import rpmUtils.arch
import rpmUtils
class Updates:
"""This class computes and keeps track of updates and obsoletes.
initialize, add installed packages, add available packages (both as
unique lists of name, epoch, ver, rel, arch tuples), add an optional dict
of obsoleting packages with obsoletes and what they obsolete ie:
foo, i386, 0, 1.1, 1: bar >= 1.1."""
def __init__(self, instlist, availlist):
self.changeTup = [] # storage list tuple of updates or obsoletes
# (oldpkg, newpkg, ['update'|'obsolete'])
self.installed = instlist # list of installed pkgs (n, a, e, v, r)
self.available = availlist # list of available pkgs (n, a, e, v, r)
self.rawobsoletes = {} # dict of obsoleting package->[what it obsoletes]
self.exactarch = 1 # don't change archs by default
self.exactarchlist = ['kernel', 'kernel-smp', 'glibc', 'kernel-hugemem',
'kernel-enterprise', 'kernel-bigmem', 'kernel-BOOT']
self.myarch = rpmUtils.arch.getCanonArch() # this is for debugging only
# set this if you want to
# test on some other arch
# otherwise leave it alone
# make some dicts from installed and available
self.installdict = self.makeNADict(self.installed, 1)
self.availdict = self.makeNADict(self.available, 1)
# holder for our updates dict
self.updatesdict = {}
#debug, ignore me
self.debug = 0
def debugprint(self, msg):
if self.debug:
print msg
def makeNADict(self, pkglist, Nonelists):
"""return lists of (e,v,r) tuples as value of a dict keyed on (n, a)
optionally will return a (n, None) entry with all the a for that
n in tuples of (a,e,v,r)"""
returndict = {}
for (n, a, e, v, r) in pkglist:
if not returndict.has_key((n, a)):
returndict[(n, a)] = []
returndict[(n, a)].append((e,v,r))
if Nonelists:
if not returndict.has_key((n, None)):
returndict[(n, None)] = []
returndict[(n, None)].append((a, e, v, r))
return returndict
def returnNewest(self, evrlist):
"""takes a list of (e, v, r) tuples and returns the newest one"""
if len(evrlist)==0:
raise rpmUtils.RpmUtilsError, "Zero Length List in returnNewest call"
if len(evrlist)==1:
return evrlist[0]
(new_e, new_v, new_r) = evrlist[0] # we'll call the first ones 'newest'
for (e, v, r) in evrlist[1:]:
rc = rpmUtils.miscutils.compareEVR((e, v, r), (new_e, new_v, new_r))
if rc > 0:
new_e = e
new_v = v
new_r = r
return (new_e, new_v, new_r)
def returnHighestVerFromAllArchsByName(self, name, archlist, pkglist):
"""returns a list of package tuples in a list (n, a, e, v, r)
takes a package name, a list of archs, and a list of pkgs in
(n, a, e, v, r) form."""
# go through list and throw out all pkgs not in archlist
matchlist = []
for (n, a, e, v, r) in pkglist:
if name == n:
if a in archlist:
matchlist.append((n, a, e, v, r))
if len(matchlist) == 0:
return []
# get all the evr's in a tuple list for returning the highest
verlist = []
for (n, a, e, v, r) in matchlist:
verlist.append((e,v,r))
(high_e, high_v, high_r) = self.returnNewest(verlist)
returnlist = []
for (n, a, e, v, r) in matchlist:
if (high_e, high_v, high_r) == (e, v, r):
returnlist.append((n,a,e,v,r))
return returnlist
def condenseUpdates(self):
"""remove any accidental duplicates in updates"""
for tup in self.updatesdict.keys():
if len(self.updatesdict[tup]) > 1:
mylist = self.updatesdict[tup]
self.updatesdict[tup] = rpmUtils.miscutils.unique(mylist)
def checkForObsolete(self, pkglist, newest=1):
"""accept a list of packages to check to see if anything obsoletes them
return an obsoleted_dict in the format of makeObsoletedDict"""
obsdict = {} # obseleting package > [obsoleted package]
pkgdict = self.makeNADict(pkglist, 1)
# this needs to keep arch in mind
# if foo.i386 obsoletes bar
# it needs to obsoletes bar.i386 preferentially, not bar.x86_64
# if there is only one bar and only one foo then obsolete it, but try to
# match the arch.
# look through all the obsoleting packages look for multiple archs per name
# if you find it look for the packages they obsolete
#
for pkgtup in self.rawobsoletes.keys():
(name, arch, epoch, ver, rel) = pkgtup
for (obs_n, flag, (obs_e, obs_v, obs_r)) in self.rawobsoletes[(pkgtup)]:
if flag in [None, 0]: # unversioned obsolete
if pkgdict.has_key((obs_n, None)):
for (rpm_a, rpm_e, rpm_v, rpm_r) in pkgdict[(obs_n, None)]:
if not obsdict.has_key(pkgtup):
obsdict[pkgtup] = []
obsdict[pkgtup].append((obs_n, rpm_a, rpm_e, rpm_v, rpm_r))
else: # versioned obsolete
if pkgdict.has_key((obs_n, None)):
for (rpm_a, rpm_e, rpm_v, rpm_r) in pkgdict[(obs_n, None)]:
if rpmUtils.miscutils.rangeCheck((obs_n, flag, (obs_e, \
obs_v, obs_r)), (obs_n,\
rpm_a, rpm_e, rpm_v, rpm_r)):
# make sure the obsoleting pkg is not already installed
if not obsdict.has_key(pkgtup):
obsdict[pkgtup] = []
obsdict[pkgtup].append((obs_n, rpm_a, rpm_e, rpm_v, rpm_r))
obslist = obsdict.keys()
if newest:
obslist = self._reduceListNewestByNameArch(obslist)
returndict = {}
for new in obslist:
for old in obsdict[new]:
if not returndict.has_key(old):
returndict[old] = []
returndict[old].append(new)
return returndict
def doObsoletes(self):
"""figures out what things available obsolete things installed, returns
them in a dict attribute of the class."""
obsdict = {} # obseleting package -> [obsoleted package]
# this needs to keep arch in mind
# if foo.i386 obsoletes bar
# it needs to obsoletes bar.i386 preferentially, not bar.x86_64
# if there is only one bar and only one foo then obsolete it, but try to
# match the arch.
# look through all the obsoleting packages look for multiple archs per name
# if you find it look for the packages they obsolete
#
for pkgtup in self.rawobsoletes.keys():
(name, arch, epoch, ver, rel) = pkgtup
for (obs_n, flag, (obs_e, obs_v, obs_r)) in self.rawobsoletes[(pkgtup)]:
if flag in [None, 0]: # unversioned obsolete
if self.installdict.has_key((obs_n, None)):
for (rpm_a, rpm_e, rpm_v, rpm_r) in self.installdict[(obs_n, None)]:
# make sure the obsoleting pkg is not already installed
willInstall = 1
if self.installdict.has_key((name, None)):
for (ins_a, ins_e, ins_v, ins_r) in self.installdict[(name, None)]:
pkgver = (epoch, ver, rel)
installedver = (ins_e, ins_v, ins_r)
if self.returnNewest((pkgver, installedver)) == installedver:
willInstall = 0
break
if willInstall:
if not obsdict.has_key(pkgtup):
obsdict[pkgtup] = []
obsdict[pkgtup].append((obs_n, rpm_a, rpm_e, rpm_v, rpm_r))
else: # versioned obsolete
if self.installdict.has_key((obs_n, None)):
for (rpm_a, rpm_e, rpm_v, rpm_r) in self.installdict[(obs_n, None)]:
if rpmUtils.miscutils.rangeCheck((obs_n, flag, (obs_e, \
obs_v, obs_r)), (obs_n,\
rpm_a, rpm_e, rpm_v, rpm_r)):
# make sure the obsoleting pkg is not already installed
willInstall = 1
if self.installdict.has_key((name, None)):
for (ins_a, ins_e, ins_v, ins_r) in self.installdict[(name, None)]:
pkgver = (epoch, ver, rel)
installedver = (ins_e, ins_v, ins_r)
if self.returnNewest((pkgver, installedver)) == installedver:
willInstall = 0
break
if willInstall:
if not obsdict.has_key(pkgtup):
obsdict[pkgtup] = []
obsdict[pkgtup].append((obs_n, rpm_a, rpm_e, rpm_v, rpm_r))
self.obsoletes = obsdict
self.makeObsoletedDict()
def makeObsoletedDict(self):
"""creates a dict of obsoleted packages -> [obsoleting package], this
is to make it easier to look up what package obsoletes what item in
the rpmdb"""
self.obsoleted_dict = {}
for new in self.obsoletes.keys():
for old in self.obsoletes[new]:
if not self.obsoleted_dict.has_key(old):
self.obsoleted_dict[old] = []
self.obsoleted_dict[old].append(new)
def doUpdates(self):
"""check for key lists as populated then commit acts of evil to
determine what is updated and/or obsoleted, populate self.updatesdict
"""
# best bet is to chew through the pkgs and throw out the new ones early
# then deal with the ones where there are a single pkg installed and a
# single pkg available
# then deal with the multiples
# we should take the whole list as a 'newlist' and remove those entries
# which are clearly:
# 1. updates
# 2. identical to the ones in ourdb
# 3. not in our archdict at all
simpleupdate = []
complexupdate = []
updatedict = {} # (old n, a, e, v, r) : [(new n, a, e, v, r)]
# make the new ones a list b/c while we _shouldn't_
# have multiple updaters, we might and well, it needs
# to be solved one way or the other <sigh>
newpkgs = []
newpkgs = self.availdict
archlist = rpmUtils.arch.getArchList(self.myarch)
for (n, a) in newpkgs.keys():
# remove stuff not in our archdict
# high log here
if a is None:
for (arch, e,v,r) in newpkgs[(n, a)]:
if arch not in archlist:
newpkgs[(n, a)].remove((arch, e,v,r))
continue
if a not in archlist:
# high log here
del newpkgs[(n, a)]
continue
# remove the older stuff - if we're doing an update we only want the
# newest evrs
for (n, a) in newpkgs.keys():
if a is None:
continue
(new_e,new_v,new_r) = self.returnNewest(newpkgs[(n, a)])
for (e, v, r) in newpkgs[(n, a)]:
if (new_e, new_v, new_r) != (e, v, r):
newpkgs[(n, a)].remove((e, v, r))
for (n, a) in newpkgs.keys():
if a is None: # the None archs are only for lookups
continue
# simple ones - look for exact matches or older stuff
if self.installdict.has_key((n, a)):
for (rpm_e, rpm_v, rpm_r) in self.installdict[(n, a)]:
try:
(e, v, r) = self.returnNewest(newpkgs[(n,a)])
except rpmUtils.RpmUtilsError:
continue
else:
rc = rpmUtils.miscutils.compareEVR((e, v, r), (rpm_e, rpm_v, rpm_r))
if rc <= 0:
try:
newpkgs[(n, a)].remove((e, v, r))
except ValueError:
pass
# get rid of all the empty dict entries:
for nakey in newpkgs.keys():
if len(newpkgs[nakey]) == 0:
del newpkgs[nakey]
# ok at this point our newpkgs list should be thinned, we should have only
# the newest e,v,r's and only archs we can actually use
for (n, a) in newpkgs.keys():
if a is None: # the None archs are only for lookups
continue
if self.installdict.has_key((n, None)):
installarchs = []
availarchs = []
for (a, e, v ,r) in newpkgs[(n, None)]:
availarchs.append(a)
for (a, e, v, r) in self.installdict[(n, None)]:
installarchs.append(a)
if len(availarchs) > 1 or len(installarchs) > 1:
self.debugprint('putting %s in complex update' % n)
complexupdate.append(n)
else:
#log(4, 'putting %s in simple update list' % name)
self.debugprint('putting %s in simple update' % n)
simpleupdate.append((n, a))
# we have our lists to work with now
# simple cases
for (n, a) in simpleupdate:
# try to be as precise as possible
if n in self.exactarchlist:
if self.installdict.has_key((n, a)):
(rpm_e, rpm_v, rpm_r) = self.returnNewest(self.installdict[(n, a)])
if newpkgs.has_key((n,a)):
(e, v, r) = self.returnNewest(newpkgs[(n, a)])
rc = rpmUtils.miscutils.compareEVR((e, v, r), (rpm_e, rpm_v, rpm_r))
if rc > 0:
# this is definitely an update - put it in the dict
if not updatedict.has_key((n, a, rpm_e, rpm_v, rpm_r)):
updatedict[(n, a, rpm_e, rpm_v, rpm_r)] = []
updatedict[(n, a, rpm_e, rpm_v, rpm_r)].append((n, a, e, v, r))
else:
# we could only have 1 arch in our rpmdb and 1 arch of pkg
# available - so we shouldn't have to worry about the lists, here
# we just need to find the arch of the installed pkg so we can
# check it's (e, v, r)
(rpm_a, rpm_e, rpm_v, rpm_r) = self.installdict[(n, None)][0]
if newpkgs.has_key((n, None)):
for (a, e, v, r) in newpkgs[(n, None)]:
rc = rpmUtils.miscutils.compareEVR((e, v, r), (rpm_e, rpm_v, rpm_r))
if rc > 0:
# this is definitely an update - put it in the dict
if not updatedict.has_key((n, rpm_a, rpm_e, rpm_v, rpm_r)):
updatedict[(n, rpm_a, rpm_e, rpm_v, rpm_r)] = []
updatedict[(n, rpm_a, rpm_e, rpm_v, rpm_r)].append((n, a, e, v, r))
# complex cases
# we're multilib/biarch
# we need to check the name.arch in two different trees
# one for the multiarch itself and one for the compat arch
# ie: x86_64 and athlon(i686-i386) - we don't want to descend
# x86_64->i686
archlists = []
if rpmUtils.arch.isMultiLibArch(arch=self.myarch):
if rpmUtils.arch.multilibArches.has_key(self.myarch):
biarches = [self.myarch]
else:
biarches = [self.myarch, rpmUtils.arch.arches[self.myarch]]
multicompat = rpmUtils.arch.getMultiArchInfo(self.myarch)[0]
multiarchlist = rpmUtils.arch.getArchList(multicompat)
archlists = [ biarches, multiarchlist ]
else:
archlists = [ archlist ]
for n in complexupdate:
for thisarchlist in archlists:
# we need to get the highest version and the archs that have it
# of the installed pkgs
tmplist = []
for (a, e, v, r) in self.installdict[(n, None)]:
tmplist.append((n, a, e, v, r))
highestinstalledpkgs = self.returnHighestVerFromAllArchsByName(n,
thisarchlist, tmplist)
tmplist = []
for (a, e, v, r) in newpkgs[(n, None)]:
tmplist.append((n, a, e, v, r))
highestavailablepkgs = self.returnHighestVerFromAllArchsByName(n,
thisarchlist, tmplist)
hapdict = self.makeNADict(highestavailablepkgs, 0)
hipdict = self.makeNADict(highestinstalledpkgs, 0)
# now we have the two sets of pkgs
if n in self.exactarchlist:
for (n, a) in hipdict:
if hapdict.has_key((n, a)):
self.debugprint('processing %s.%s' % (n, a))
# we've got a match - get our versions and compare
(rpm_e, rpm_v, rpm_r) = hipdict[(n, a)][0] # only ever going to be first one
(e, v, r) = hapdict[(n, a)][0] # there can be only one
rc = rpmUtils.miscutils.compareEVR((e, v, r), (rpm_e, rpm_v, rpm_r))
if rc > 0:
# this is definitely an update - put it in the dict
if not updatedict.has_key((n, a, rpm_e, rpm_v, rpm_r)):
updatedict[(n, a, rpm_e, rpm_v, rpm_r)] = []
updatedict[(n, a, rpm_e, rpm_v, rpm_r)].append((n, a, e, v, r))
else:
self.debugprint('processing %s' % n)
# this is where we have to have an arch contest if there
# is more than one arch updating with the highest ver
instarchs = []
availarchs = []
for (n,a) in hipdict.keys():
instarchs.append(a)
for (n,a) in hapdict.keys():
availarchs.append(a)
rpm_a = rpmUtils.arch.getBestArchFromList(instarchs, myarch=self.myarch)
a = rpmUtils.arch.getBestArchFromList(availarchs, myarch=self.myarch)
if rpm_a is None or a is None:
continue
(rpm_e, rpm_v, rpm_r) = hipdict[(n, rpm_a)][0] # there can be just one
(e, v, r) = hapdict[(n, a)][0] # just one, I'm sure, I swear!
rc = rpmUtils.miscutils.compareEVR((e, v, r), (rpm_e, rpm_v, rpm_r))
if rc > 0:
# this is definitely an update - put it in the dict
if not updatedict.has_key((n, rpm_a, rpm_e, rpm_v, rpm_r)):
updatedict[(n, rpm_a, rpm_e, rpm_v, rpm_r)] = []
updatedict[(n, rpm_a, rpm_e, rpm_v, rpm_r)].append((n, a, e, v, r))
self.updatesdict = updatedict
self.makeUpdatingDict()
def makeUpdatingDict(self):
"""creates a dict of available packages -> [installed package], this
is to make it easier to look up what package will be updating what
in the rpmdb"""
self.updating_dict = {}
for old in self.updatesdict.keys():
for new in self.updatesdict[old]:
if not self.updating_dict.has_key(new):
self.updating_dict[new] = []
self.updating_dict[new].append(old)
def reduceListByNameArch(self, pkglist, name=None, arch=None):
"""returns a set of pkg naevr tuples reduced based on name or arch"""
returnlist = []
if name or arch:
for (n, a, e, v, r) in pkglist:
if name:
if name == n:
returnlist.append((n, a, e, v, r))
continue
if arch:
if arch == a:
returnlist.append((n, a, e, v, r))
continue
else:
returnlist = pkglist
return returnlist
def getUpdatesTuples(self, name=None, arch=None):
"""returns updates for packages in a list of tuples of:
(updating naevr, installed naevr)"""
returnlist = []
for oldtup in self.updatesdict.keys():
(old_n, old_a, old_e, old_v, old_r) = oldtup
for newtup in self.updatesdict[oldtup]:
returnlist.append((newtup, oldtup))
tmplist = []
if name:
for ((n, a, e, v, r), oldtup) in returnlist:
if name != n:
tmplist.append(((n, a, e, v, r), oldtup))
if arch:
for ((n, a, e, v, r), oldtup) in returnlist:
if arch != a:
tmplist.append(((n, a, e, v, r), oldtup))
for item in tmplist:
try:
returnlist.remove(item)
except ValueError:
pass
return returnlist
def getUpdatesList(self, name=None, arch=None):
"""returns updating packages in a list of (naevr) tuples"""
returnlist = []
for oldtup in self.updatesdict.keys():
for newtup in self.updatesdict[oldtup]:
returnlist.append(newtup)
returnlist = self.reduceListByNameArch(returnlist, name, arch)
return returnlist
def getObsoletesTuples(self, newest=0, name=None, arch=None):
"""returns obsoletes for packages in a list of tuples of:
(obsoleting naevr, installed naevr). You can specify name and/or
arch of the installed package to narrow the results.
You can also specify newest=1 to get the set of newest pkgs (name, arch)
sorted, that obsolete something"""
tmplist = []
obslist = self.obsoletes.keys()
if newest:
obslist = self._reduceListNewestByNameArch(obslist)
for obstup in obslist:
for rpmtup in self.obsoletes[obstup]:
tmplist.append((obstup, rpmtup))
returnlist = []
if name or arch:
for (obstup, (n, a, e, v, r)) in tmplist:
if name:
if name == n:
returnlist.append((obstup, (n, a, e, v, r)))
continue
if arch:
if arch == a:
returnlist.append((obstup, (n, a, e, v, r)))
continue
else:
returnlist = tmplist
return returnlist
def getObsoletesList(self, newest=0, name=None, arch=None):
"""returns obsoleting packages in a list of naevr tuples of just the
packages that obsolete something that is installed. You can specify
name and/or arch of the obsoleting packaging to narrow the results.
You can also specify newest=1 to get the set of newest pkgs (name, arch)
sorted, that obsolete something"""
tmplist = self.obsoletes.keys()
if newest:
tmplist = self._reduceListNewestByNameArch(tmplist)
returnlist = self.reduceListByNameArch(tmplist, name, arch)
return returnlist
def getObsoletedList(self, newest=0, name=None):
"""returns a list of pkgtuples obsoleting the package in name"""
returnlist = []
for new in self.obsoletes.keys():
for obstup in self.obsoletes[new]:
(n, a, e, v, r) = obstup
if n == name:
returnlist.append(new)
continue
return returnlist
def getOthersList(self, name=None, arch=None):
"""returns a naevr tuple of the packages that are neither installed
nor an update - this may include something that obsoletes an installed
package"""
updates = {}
inst = {}
tmplist = []
for pkgtup in self.getUpdatesList():
updates[pkgtup] = 1
for pkgtup in self.installed:
inst[pkgtup] = 1
for pkgtup in self.available:
if not updates.has_key(pkgtup) and not inst.has_key(pkgtup):
tmplist.append(pkgtup)
returnlist = self.reduceListByNameArch(tmplist, name, arch)
return returnlist
def _reduceListNewestByNameArch(self, tuplelist):
"""return list of newest packages based on name, arch matching
this means(in name.arch form): foo.i386 and foo.noarch are not
compared to each other for highest version only foo.i386 and
foo.i386 will be compared"""
highdict = {}
for pkgtup in tuplelist:
(n, a, e, v, r) = pkgtup
if not highdict.has_key((n, a)):
highdict[(n, a)] = pkgtup
else:
pkgtup2 = highdict[(n, a)]
(n2, a2, e2, v2, r2) = pkgtup2
rc = rpmUtils.miscutils.compareEVR((e,v,r), (e2, v2, r2))
if rc > 0:
highdict[(n, a)] = pkgtup
return highdict.values()
# def getProblems(self):
# """return list of problems:
# - Packages that are both obsoleted and updated.
# - Packages that have multiple obsoletes.
# - Packages that _still_ have multiple updates
# """
- Previous message: extras-buildsys/utils/pushscript README,1.3,1.4
- Next message: extras-buildsys/utils/pushscript/repomd __init__.py, NONE, 1.1 mdErrors.py, NONE, 1.1 mdUtils.py, NONE, 1.1 packageObject.py, NONE, 1.1 packageSack.py, NONE, 1.1 repoMDObject.py, NONE, 1.1 test.py, NONE, 1.1
- Messages sorted by:
[ date ]
[ thread ]
[ subject ]
[ author ]
More information about the scm-commits
mailing list