extras-repoclosure HistoryTests.py,NONE,1.1 History.py,NONE,1.1

Michael Schwendt (mschwendt) fedora-extras-commits at redhat.com
Tue Jan 9 11:59:16 UTC 2007


Author: mschwendt

Update of /cvs/fedora/extras-repoclosure
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv1972

Added Files:
	HistoryTests.py History.py 
Log Message:
split off history module and add a few tests for it



--- NEW FILE HistoryTests.py ---
#!/usr/bin/python
# -*- mode: Python; indent-tabs-mode: nil; -*-

import os
import unittest
import datetime

from History import History

release = 'TESTS'
hname = 'history-%s.pickle' % release

barpkg = 'bar-1.0-1.src.rpm'
foopkg = 'foo-1.0-1.src.rpm'


class TestInitHistory(unittest.TestCase):
    def testinit(self):
        try:
            os.remove(hname)
        except OSError:
            pass
        global h
        h = History(release)
        
        today = datetime.date.today()
        waitdelta = datetime.timedelta( days = 16 )
        h.Set(barpkg,today-waitdelta,today-waitdelta)


class TestSequenceFunctions(unittest.TestCase):

    def testage(self):
        a = h.GetAge(barpkg)
        self.assertEqual(a,'(16 days)')

    def testfound(self):
        r = h.Get(barpkg)
        self.assertNotEqual(r,None)

    def testnotfound(self):
        r = h.Get(foopkg)
        self.assertEqual(r,None)

    def testsave(self):
        global h
        h.Save()
        self.assertEqual(os.path.isfile(hname),True)
        r = h.Get(foopkg)
        self.assertNotEqual(r,None)

    def testsavedreload(self):
        global h
        h = History(release)  # this expires barpkg
        r = h.Get(foopkg)
        self.assertEqual(r.has_key('discovered'),True)
        self.assertEqual(r.has_key('reported'),True)
        # expired entry, is refreshed now
        r = h.Get(barpkg)
        self.assertEqual(r,None)

    def testxpiredrefresh(self):
        global h
        h.Save()
        h = History(release)
        r = h.Get(barpkg)
        self.assertNotEqual(r,None)
        self.assertEqual(r['discovered']<r['reported'],True)


if __name__ == '__main__':
    suite1 = unittest.makeSuite(TestInitHistory)
    suitemain = unittest.makeSuite(TestSequenceFunctions)
    
    alltests = unittest.TestSuite((suite1,suitemain))
    unittest.TextTestRunner(verbosity=2).run(alltests)

    try:
        os.remove(hname)
    except OSError:
        pass
    


--- NEW FILE History.py ---
#!/usr/bin/python
# -*- mode: Python; indent-tabs-mode: nil; -*-

import datetime
import pickle

# We identify broken packages based on their src.rpm name and don't report
# them more than once until expiration.
class History:

    def _reset(self):
        self.history = {}
        self.incoming = {}
        self.expired = {}

    def __init__(self,release): # Load history map file and expire its contents.
        self._reset()
        self.today = datetime.date.today()
        self.waitdelta = datetime.timedelta( days = 14 )
        self.historyname = 'history-%s.pickle' % release
        try:
            f = file(self.historyname,'r')
        except:
            return
        self.tmpdict = pickle.load(f)
        f.close()
        # Build up history/expired dicts.
        for srcrpm,tmpdict in self.tmpdict.iteritems():
            reporteddate = tmpdict['reported']
            if ( reporteddate+self.waitdelta > self.today ):
                self.history[srcrpm] = self.tmpdict[srcrpm]
            else:
                # Filter out entries which are too old.
                # We collect them in a different dict, so they are not
                # lost when we discover that a src.rpm is still broken.
                self.expired[srcrpm] = self.tmpdict[srcrpm]

    def Save(self):
        for srcrpm,dict in self.incoming.iteritems():
            self.history[srcrpm] = dict
        f = file(self.historyname,'w')
        pickle.dump(self.history,f)
        f.close()

    def Set(self,srcrpmname,discovereddate,reporteddate):
        self.history[srcrpmname] = {
            'discovered' : discovereddate,
            'reported' : reporteddate
            }

    def SetDelayed(self,srcrpmname,discovereddate,reporteddate):
        self.incoming[srcrpmname] = {
            'discovered' : discovereddate,
            'reported' : reporteddate
            }

    def Get(self,srcrpmname):
        if self.history.has_key(srcrpmname):
            return self.history[srcrpmname]
        elif self.expired.has_key(srcrpmname):
            # Reactivate an expired entry next time.
            self.SetDelayed(srcrpmname,self.expired[srcrpmname]['discovered'],self.today)
            return None  # resend mail
        else:
            # Store new entry.
            self.SetDelayed(srcrpmname,self.today,self.today)
            return None

    def GetAge(self,srcrpmname):
        if self.history.has_key(srcrpmname):
            r = self.history[srcrpmname]
        elif self.expired.has_key(srcrpmname):
            r = self.expired[srcrpmname]
        else:
            return ''
        d = self.today-r['discovered']
        if d.days >= 2:
            return '(%s days)' % d.days
        return ''






More information about the scm-commits mailing list