extras-buildsys/server ArchJob.py, 1.31, 1.32 BuildMaster.py, 1.40, 1.41 Builder.py, 1.40, 1.41 BuilderManager.py, 1.24, 1.25 PackageJob.py, 1.49, 1.50 main.py, 1.21, 1.22
Daniel Williams (dcbw)
fedora-extras-commits at redhat.com
Sun May 14 05:43:09 UTC 2006
Author: dcbw
Update of /cvs/fedora/extras-buildsys/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv2964/server
Modified Files:
ArchJob.py BuildMaster.py Builder.py BuilderManager.py
PackageJob.py main.py
Log Message:
2006-05-14 Dan Williams <dcbw at redhat.com>
* Rework archjob handling. Archjobs are now processed from the owning
PackageJob object, and only one is spawned for the lifetime of the
PackageJob for each architecture (previously one was spawned for each
individual build job on the builder). Archjob UIDs are generated on
the server now rather than the builder.
* Correctly handle builders going away before they've had a chance to
notify the server that they have started an archjob. Previously, these
jobs would just be lost. This is the real reason for the rework of the
archjob handling above.
* On the builder, don't use die() to end jobs that have failed; do it
properly and upload files to the server if we weren't killed
* Deal with active builders whose hostnames can't be resolved when
the server starts
* Kill any existing jobs on builders when the server starts up
* Consolidate and simplify logging functions in PackageJob
* More pylint-induced cleanups
Index: ArchJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/ArchJob.py,v
retrieving revision 1.31
retrieving revision 1.32
diff -u -r1.31 -r1.32
--- ArchJob.py 9 May 2006 19:10:57 -0000 1.31
+++ ArchJob.py 14 May 2006 05:43:07 -0000 1.32
@@ -15,36 +15,80 @@
# Copyright 2005 Dan Williams <dcbw at redhat.com> and Red Hat, Inc.
import time
-import socket
import os
import threading
import urllib
-import OpenSSL
from plague import FileTransfer
+import sha
+def _generate_uniqid(parent_jobid, start_time, target_dict, srpm_url):
+ distro = target_dict['distro']
+ repo = target_dict['repo']
+ target = target_dict['target']
+ arch = target_dict['arch']
+ hash_string = "%d%d%s%s%s%s%s" % (parent_jobid, start_time, distro,
+ target, arch, repo, srpm_url)
+ sha_hash = sha.new()
+ sha_hash.update(hash_string)
+ return sha_hash.hexdigest()
+
+
+AJ_STATUS_QUEUED = 'queued'
+AJ_STATUS_WAITING = 'waiting'
+AJ_STATUS_REPO_WAIT = 'repo_wait'
+AJ_STATUS_REPO_UNLOCK = 'repo_unlock'
+AJ_STATUS_RUNNING = 'running'
+AJ_STATUS_DOWNLOADING = 'downloading'
+AJ_STATUS_DONE = 'done'
+
class ArchJob:
""" Tracks a single build instance for a single arch on a builder """
- def __init__(self, builder, par_job, jobid, target_dict):
- self.par_job = par_job
- self._builder = builder
- self._repo = par_job.repo()
- self.jobid = jobid
- self._status = 'starting'
+ def __init__(self, parent, target_dict, srpm_url):
+ self._parent = parent
+ self._builder = None
+ self._repo = parent.repo()
+ self._starttime = time.time()
+ self._endtime = 0
+ self._id = _generate_uniqid(parent.uid, self._starttime, target_dict,
+ srpm_url)
+ self._status = AJ_STATUS_QUEUED
self._builder_status = ''
self._failure_noticed = False
self._download_failed = False
self._internal_failure = False
self._target_dict = target_dict
- self._builder_gone = False
+ self._srpm_url = srpm_url
self._result_files = {}
- self._starttime = time.time()
- self._endtime = 0
self._die = False
self._die_user_requested = False
self._die_lock = threading.Lock()
self._prepping = False
+ self._orphaned = False
+
+ def builder_suspend_cb(self, builder, reason, msg):
+ self.unclaim(builder)
+ if not self._is_done_status():
+ self._parent.log(self.arch(), "Builder disappeared. Requeuing arch...")
+
+ def set_builder_status(self, builder, builder_status):
+ if builder != self._builder:
+ return
+
+ # The job has just started on the builder
+ if self._builder_status == '':
+ addr = builder.address()
+ self._parent.log(self.arch(), "%s - UID is %s" % (addr, self._id))
+ # Notify our parent PackageJob that we've started
+ self._parent.archjob_started_cb(self)
+
+ oldstatus = self._builder_status
+ self._builder_status = builder_status
+ if oldstatus != self._builder_status:
+ attrdict = self._to_dict()
+ self._parent.bm.queue_archjob_status_update(self._id, attrdict)
+ del attrdict
def failure_noticed(self):
return self._failure_noticed
@@ -74,86 +118,115 @@
def arch(self):
return self._target_dict['arch']
+ def target_dict(self):
+ return self._target_dict
+
+ def srpm_url(self):
+ return self._srpm_url
+
+ def archjob_id(self):
+ return self._id
+
def builder(self):
return self._builder
+ def orphaned(self):
+ return self._orphaned
+
+ def claim(self, builder):
+ """Called by the Builder via the BuilderManager when the builder
+ agrees to build this archjob."""
+ if self._status != AJ_STATUS_QUEUED:
+ return
+ self._set_status(AJ_STATUS_WAITING)
+ self._builder = builder
+ builder.add_suspend_listener(self)
+
+ def unclaim(self, builder):
+ builder.remove_suspend_listener(self)
+ self._builder = None
+ if not self._is_done_status():
+ self._orphaned = True
+ self._builder_status = ''
+ # Mark ourselves as available for building again
+ self._set_status(AJ_STATUS_QUEUED)
+
def _to_dict(self):
attrdict = {}
- attrdict['jobid'] = self.jobid
- attrdict['parent_uid'] = self.par_job.uid
+ attrdict['jobid'] = self._id
+ attrdict['parent_uid'] = self._parent.uid
attrdict['arch'] = self._target_dict['arch']
- (ip, addr) = self._builder.address()
- # for some reason, splithost doesn't like the protocol
- # method, you have to give it a string starting with "//"
- if addr.startswith("http"):
- idx = addr.find('//')
- addr = addr[idx:]
- host_port, path = urllib.splithost(addr)
- host, port = urllib.splitport(host_port)
- attrdict['builder_addr'] = host
+ if self._builder:
+ name = self._builder.address()
+ # for some reason, splithost doesn't like the protocol
+ # method, you have to give it a string starting with "//"
+ if name.startswith("http"):
+ idx = name.find('//')
+ name = name[idx:]
+ host_port, path = urllib.splithost(name)
+ host, port = urllib.splitport(host_port)
+ attrdict['builder_addr'] = host
+ else:
+ attrdict['builder_addr'] = ""
attrdict['status'] = self._status
attrdict['builder_status'] = self._builder_status
attrdict['starttime'] = self._starttime
attrdict['endtime'] = self._endtime
return attrdict
- def set_builder_job_status(self, builder_status):
- oldstatus = self._builder_status
- self._builder_status = builder_status
- if oldstatus != self._builder_status:
- attrdict = self._to_dict()
- self.par_job.bm.queue_archjob_status_update(self.jobid, attrdict)
- del attrdict
-
- if builder_status == 'killed' or builder_status == 'failed':
- self.par_job.wake()
-
def _set_status(self, status):
oldstatus = self._status
self._status = status
if oldstatus != self._status:
attrdict = self._to_dict()
- self.par_job.bm.queue_archjob_status_update(self.jobid, attrdict)
+ self._parent.bm.queue_archjob_status_update(self._id, attrdict)
del attrdict
def _is_done_status(self):
- if self._status == 'done':
+ if self._status == AJ_STATUS_DONE:
return True
return False
- def _status_starting(self):
+ def _set_done(self):
+ self._builder.remove_suspend_listener(self)
+ self._set_status(AJ_STATUS_DONE)
+
+ def _status_queued(self):
+ pass
+
+ def _status_waiting(self):
# Builders pause before they enter the 'prep' state (which accesses
# the repo for this target), and wait for the server to allow them
# to proceed when the repo is unlocked.
if self._builder_status == 'downloaded':
- self._set_status('repo_wait')
+ self._set_status(AJ_STATUS_REPO_WAIT)
self._repo.request_unlock(self)
def _status_repo_wait(self):
pass
def repo_unlocked_callback(self):
- if self._status == 'repo_wait':
- self._set_status('repo_unlock')
+ if self._status == AJ_STATUS_REPO_WAIT:
+ self._set_status(AJ_STATUS_REPO_UNLOCK)
def _status_repo_unlock(self):
# Builder will be in 'downloaded' state until
# it notices that the repo has been unlocked
- self._builder.unlock_repo_for_job(self.jobid)
self._prepping = True
+ self._builder.unlock_repo_for_job(self._id)
if self._builder_status != 'downloaded':
- self._set_status('running')
+ self._set_status(AJ_STATUS_RUNNING)
def _status_running(self):
if self._builder_status != 'prepping':
self._prepping = False
if self._builder_finished():
- self._set_status('downloading')
- self._builder.request_job_files(self.jobid)
+ self._set_status(AJ_STATUS_DOWNLOADING)
+ self._builder.request_job_files(self._id)
def get_result_files_dir(self):
- result_dir = os.path.join(self.par_job.get_stage_dir(), self._target_dict['arch'])
+ result_dir = os.path.join(self._parent.get_stage_dir(), self._target_dict['arch'])
if not os.path.exists(result_dir):
os.makedirs(result_dir)
return result_dir
@@ -169,13 +242,16 @@
if fname != files.keys()[nresults - 1]:
file_string = file_string + ", "
- print "%s (%s/%s): Build result files - [ %s ]" % (self.par_job.uid,
- self.par_job.package, self._target_dict['arch'], file_string)
+ print "%s (%s/%s): Build result files - [ %s ]" % (self._parent.uid,
+ self._parent.package, self._target_dict['arch'], file_string)
def download_cb(self, files):
"""Called by the Builder to notify us that our job's files are available.
The files argument should be a list of _filenames_, not paths. All files
are assumed to be in the directory returned by get_result_files_dir."""
+ if self._is_done_status():
+ return
+
if len(files.keys()) == 0:
self._download_failed = True
else:
@@ -186,13 +262,21 @@
self._print_downloaded_files(self._result_files)
self._endtime = time.time()
- self._set_status('done')
- self.par_job.wake()
+ self._set_done()
def _status_downloading(self):
# Wait to be notified that our files are downloaded
pass
+ def _handle_death(self, user_requested):
+ self._builder.request_kill_for_job(self._id)
+ if self._status == AJ_STATUS_REPO_WAIT:
+ self._repo.cancel_unlock_request(self)
+ self._set_done()
+ if user_requested:
+ print "%s (%s/%s): %s - killed." % (self._parent.uid, self._parent.package,
+ self._target_dict['arch'], self._id)
+
def process(self):
if self._is_done_status():
return
@@ -203,32 +287,23 @@
user_requested = self._die_user_requested
self._die_lock.release()
if should_die:
- try:
- self._server.die(self.jobid)
- except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error):
- pass
- if self._status == 'repo_wait':
- self._repo.cancel_unlock_request(self)
- self._set_status('done')
- if user_requested:
- print "%s (%s/%s): %s - killed." % (self.par_job.uid, self.par_job.package,
- self._target_dict['arch'], self.jobid)
+ self._handle_death(user_requested)
return
status_func = None
try:
status_func = getattr(self, "_status_%s" % self._status)
except AttributeError:
- print "%s (%s/%s): %s - internal archjob inconsistency. Unknown status '%s'." % (self.par_job.uid, self.par_job.package,
- self._target_dict['arch'], self.jobid, self._status)
- self._set_status('done')
+ print "%s (%s/%s): %s - internal archjob inconsistency. Unknown status '%s'." % (self._parent.uid,
+ self._parent.package, self._target_dict['arch'], self._id, self._status)
+ self._set_done()
self._internal_failure = True
return
# Do the actual work for this status
status_func()
- def get_status(self):
+ def status(self):
return self._status
def get_files(self):
@@ -239,21 +314,17 @@
files.append(fname)
return files
- def builder_gone(self):
- if self._status != 'done':
- self._builder_status = 'orphaned'
- self._set_status('done')
- self.par_job.remove_arch_job(self)
-
def die(self, user_requested=False):
# Can be called from other threads
- if self._status == 'running' or self._status == 'repo_wait' or self._status == 'starting' or self._status == 'repo_unlock':
- self._die_lock.acquire()
- self._die = True
- self._die_user_requested = user_requested
- self._die_lock.release()
- if user_requested:
- print "%s (%s/%s): %s - kill requested by parent job" % (self.par_job.uid,
- self.par_job.package, self._target_dict['arch'], self.jobid)
+ if self._is_done_status():
+ return
+
+ self._die_lock.acquire()
+ self._die = True
+ self._die_user_requested = user_requested
+ self._die_lock.release()
+ if user_requested:
+ print "%s (%s/%s): %s - kill requested by parent job" % (self._parent.uid,
+ self._parent.package, self._target_dict['arch'], self._id)
Index: BuildMaster.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuildMaster.py,v
retrieving revision 1.40
retrieving revision 1.41
diff -u -r1.40 -r1.41
--- BuildMaster.py 28 Apr 2006 03:17:41 -0000 1.40
+++ BuildMaster.py 14 May 2006 05:43:07 -0000 1.41
@@ -18,9 +18,7 @@
import time
import PackageJob
-import DBManager
import threading
-import os
import Repo
import copy
import Config
@@ -144,7 +142,7 @@
for repo in self._repos.values():
repo.stop()
- def create_job_request(self, email, package, source, target_dict, buildreq, time):
+ def create_job_request(self, email, package, source, target_dict, buildreq, ctime):
req = {}
req['email'] = email
req['package'] = package
@@ -152,7 +150,7 @@
req['target_target'] = target_dict['target']
req['target_repo'] = target_dict['repo']
req['buildreq'] = buildreq
- req['time'] = time
+ req['time'] = ctime
req['source'] = source
req['uid_avail'] = False
req['uid'] = -1
@@ -226,8 +224,8 @@
# Update job end time
try:
self._cursor.execute('UPDATE jobs SET endtime=%d WHERE uid=%d' % (job.endtime, uid))
- except StandardError, e:
- print "DB Error: could not access jobs database. Reason: '%s'" % e
+ except StandardError, exc:
+ print "DB Error: could not access jobs database. Reason: '%s'" % exc
self._dbcx.commit()
print "%s (%s): Job finished." % (uid, job.package)
@@ -253,8 +251,8 @@
sql = 'UPDATE jobs SET ' + sql + ' WHERE uid=%d' % uid
try:
self._cursor.execute(sql)
- except StandardError, e:
- print "DB Error: could not access jobs database. Reason: '%s'" % e
+ except StandardError, exc:
+ print "DB Error: could not access jobs database. Reason: '%s'" % exc
self._dbcx.commit()
def _write_archjob_status_to_db(self, uid, attrdict):
@@ -266,15 +264,15 @@
"VALUES ('%s', %d, %d, %d, '%s', '%s', '%s', '%s')" % (uid, attrdict['parent_uid'], \
attrdict['starttime'], attrdict['endtime'], attrdict['arch'], \
attrdict['builder_addr'], attrdict['status'], attrdict['builder_status']))
- except StandardError, e:
- print "DB Error: could not access jobs database. Reason: '%s'" % e
+ except StandardError, exc:
+ print "DB Error: could not access jobs database. Reason: '%s'" % exc
else:
try:
self._cursor.execute("UPDATE archjobs SET status='%s', builder_status='%s', endtime=%d " \
"WHERE jobid='%s' AND parent_uid=%d" % (attrdict['status'],
attrdict['builder_status'], attrdict['endtime'], uid, attrdict['parent_uid']))
- except StandardError, e:
- print "DB Error: could not access jobs database. Reason: '%s'" % e
+ except StandardError, exc:
+ print "DB Error: could not access jobs database. Reason: '%s'" % exc
self._dbcx.commit()
def _save_job_status(self):
Index: Builder.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/Builder.py,v
retrieving revision 1.40
retrieving revision 1.41
diff -u -r1.40 -r1.41
--- Builder.py 9 May 2006 19:10:57 -0000 1.40
+++ Builder.py 14 May 2006 05:43:07 -0000 1.41
@@ -15,7 +15,6 @@
# Copyright 2005 Dan Williams <dcbw at redhat.com> and Red Hat, Inc.
import time
-import string
import xmlrpclib
import socket
import os
@@ -60,19 +59,27 @@
self._seq_gen = Commands.SequenceGenerator()
self._lock = threading.Lock()
self._cmd_queue = []
+ self._suspend_listeners = []
+ self._status_listeners = []
+ self._ip = None
+
+ uri, rest = urllib.splittype(address)
+ host, ignore = urllib.splithost(rest)
+ self._host, port = urllib.splitport(host)
- try:
- type, rest = urllib.splittype(address)
- host, ignore = urllib.splithost(rest)
- host, port = urllib.splitport(host)
- self._ip = socket.gethostbyname(host)
- except Exception, e:
- print "Builder Error(%s): couldn't lookup builder's IP address." % address
- raise Exception(e)
+ self._get_ip()
threading.Thread.__init__(self)
self.setName("Builder: %s" % address)
+ def _get_ip(self):
+ try:
+ self._ip = socket.gethostbyname(self._host)
+ return True
+ except Exception:
+ pass
+ return False
+
def _match_target_dict(self, td1, td2):
if td1['distro'] == td2['distro']:
if td1['target'] == td2['target']:
@@ -81,24 +88,28 @@
return False
def arches(self, target_dict):
- for td in self._target_list:
- if self._match_target_dict(td, target_dict):
+ for tdict in self._target_list:
+ if self._match_target_dict(tdict, target_dict):
arches = []
- for arch in td['supported_arches']:
+ for arch in tdict['supported_arches']:
if not arch in arches:
arches.append(arch)
return arches
return None
- def can_build_for_target(self, target_dict):
+ def can_build_arch_job(self, archjob):
+ target_dict = archjob.target_dict()
for td in self._target_list:
if self._match_target_dict(td, target_dict):
- if target_dict['arch'] in td['supported_arches']:
+ if archjob.arch() in td['supported_arches']:
return True
return False
def address(self):
- return (self._ip, self._address)
+ return self._address
+
+ def ip(self):
+ return self._ip
def available(self):
""" Is the builder responding to requests? """
@@ -123,17 +134,32 @@
pass
return None
+ def add_suspend_listener(self, listener):
+ listeners = self._suspend_listeners[:]
+ if listener not in listeners:
+ self._suspend_listeners.append(listener)
+
+ def remove_suspend_listener(self, listener):
+ if listener in self._suspend_listeners:
+ self._suspend_listeners.remove(listener)
+
+ def _notify_suspend_listeners(self, reason, msg):
+ # Must copy the list since it can be modified from
+ # the listener during our iteration of it
+ listeners = self._suspend_listeners[:]
+ for listener in listeners:
+ listener.builder_suspend_cb(self, reason, msg)
+ del listeners
+
def _handle_builder_suspend(self, reason, msg):
- for jobid in self._jobs.keys():
- job = self._jobs[jobid]
- job.builder_gone()
- del self._jobs[jobid]
- self._jobs = {}
self._available = False
self._suspend_reason = reason
self._unavail_count = 0
self._prepping_jobs = False
self._when_died = time.time()
+ self._jobs = {}
+
+ self._notify_suspend_listeners(reason, msg)
# Notify admins
print "Suspending builder '%s'. Reason: %s - %s." % (self._address, reason, msg)
@@ -178,8 +204,8 @@
builder_dict['address'] = host
arches = []
- for td in self._target_list:
- for arch in td['supported_arches']:
+ for tdict in self._target_list:
+ for arch in tdict['supported_arches']:
if not arch in arches:
arches.append(arch)
builder_dict['arches'] = arches
@@ -197,7 +223,7 @@
(uniqid, status) = cmd.get_job(item)
try:
job = self._jobs[uniqid]
- job.set_builder_job_status(status)
+ job.set_builder_status(self, status)
reported_uniqids.append(uniqid)
except KeyError:
pass
@@ -208,11 +234,10 @@
# removed from the building job list before we
# were able to know that it was done. HACK
self._prepping_jobs = False
- for jobid in self._jobs.keys():
+ for jobid, job in self._jobs.items():
# If the builder didn't report this job as building,
# and its not done, explicitly get its status
- job = self._jobs[jobid]
- if jobid not in reported_uniqids and job.get_status() != 'done':
+ if jobid not in reported_uniqids and job.status() != 'done':
new_cmds.append(Commands.PlgCommandJobStatus(jobid, self._seq_gen.next()))
# Check for prepping jobs
@@ -243,11 +268,15 @@
old_cmd = self._find_and_remove_cmd_for_ack(ack, Commands.PlgCommandNewJobReq)
if old_cmd:
- parent = old_cmd.parent_job()
- archjob_id = ack.archjob_id()
- archjob = ArchJob.ArchJob(self, parent, archjob_id, old_cmd.target_dict())
- self._jobs[archjob_id] = archjob
- parent.add_arch_job(archjob)
+ archjob = old_cmd.archjob()
+ archjob_id = archjob.archjob_id()
+ ack_archjob_id = ack.archjob_id()
+ if archjob_id != ack_archjob_id:
+ print "Builder Error (%s): returned archjob_id (%s) " \
+ "doesn't match expected (%s)." % (self._address, ack_archjob_id, archjob_id)
+ archjob.unclaim(self)
+ else:
+ self._jobs[archjob_id] = archjob
def _handle_job_status_ack(self, ack):
"""Handle a job status ack by setting telling the job object
@@ -258,7 +287,7 @@
archjob_id = ack.archjob_id()
status = ack.status()
job = self._jobs[archjob_id]
- job.set_builder_job_status(status)
+ job.set_builder_status(self, status)
def _decompose_job_files_ack(self, ack):
"""Handle a job files ack by finding the archjob it's for, then
@@ -305,6 +334,28 @@
self._cmd_queue.append(cmd)
self._lock.release()
+ def request_kill_for_job(self, uniqid):
+ cmd = Commands.PlgCommandKillJob(uniqid, self._seq_gen.next())
+ self._lock.acquire()
+ self._cmd_queue.append(cmd)
+ self._lock.release()
+
+ def unlock_repo_for_job(self, uniqid):
+ """Called by an archjob to request the sending of a RepoUnlocked
+ command to the builder for a particular archjob."""
+
+ self._lock.acquire()
+ found = False
+ for cmd in self._cmd_queue:
+ if isinstance(cmd, Commands.PlgCommandUnlockRepo):
+ if cmd.archjob_id() == uniqid:
+ found = True
+ break
+ if not found:
+ cmd = Commands.PlgCommandUnlockRepo(uniqid, self._seq_gen.next())
+ self._cmd_queue.append(cmd)
+ self._lock.release()
+
# HACK: This class is a hack to work around SSL hanging issues,
# which cause the whole server to grind to a halt
@@ -326,8 +377,8 @@
(jobs, free_slots) = self._server.building_jobs()
except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error, xmlrpclib.ProtocolError):
self.failed = True
- except xmlrpclib.Fault, e:
- print "Builder Error (%s) in _building_jobs(): builder replied '%s'" % (self._address, e)
+ except xmlrpclib.Fault, exc:
+ print "Builder Error (%s) in _building_jobs(): builder replied '%s'" % (self._address, exc)
self.failed = True
self.jobs = jobs
self.free_slots = free_slots
@@ -374,14 +425,13 @@
alive = True
except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error, xmlrpclib.ProtocolError):
alive = False
- except xmlrpclib.Fault, e:
- print "Builder Error (%s) in _ping_builder(): builder replied '%s'" % (self._address, e)
+ except xmlrpclib.Fault, exc:
+ print "Builder Error (%s) in _ping_builder(): builder replied '%s'" % (self._address, exc)
alive = False
return (alive, target_list)
def _init_builder(self, target_list):
self._target_list = target_list
-
# Kill any jobs currently running on the builder
jobs = self._building_jobs()
for jobid in jobs.keys():
@@ -398,8 +448,8 @@
num_slots = self._server.num_slots()
except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error, xmlrpclib.ProtocolError):
pass
- except xmlrpclib.Fault, e:
- print "Builder Error (%s) in _get_num_slots(): builder replied '%s'" % (self._address, e)
+ except xmlrpclib.Fault, exc:
+ print "Builder Error (%s) in _get_num_slots(): builder replied '%s'" % (self._address, exc)
return num_slots
def _building_jobs(self):
@@ -444,26 +494,29 @@
self._ping_timeout = 0
self._init_builder(target_list)
- def start_job(self, par_job, target_dict, srpm_url):
+ def start_job(self, req):
if not self.available():
raise RuntimeError
- if not self.can_build_for_target(target_dict):
+ if not self.can_build_request(req):
raise RuntimeError
self._server_lock.acquire()
try:
# Builder will return jobid of 0 if it can't start the job for some reason
- jobid = self._server.start_new_job(target_dict, srpm_url)
+ srpm_url = req.srpm_url()
+ target_dict = req.target_dict()
+ jobid = self._server.start_new_job(target_dict, req.srpm_url())
except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, \
- OpenSSL.SSL.Error, xmlrpclib.ProtocolError, xmlrpclib.Fault), e:
- error_class = str(e.__class__)
- error_string = str(e)
+ OpenSSL.SSL.Error, xmlrpclib.ProtocolError, xmlrpclib.Fault), exc:
+ error_class = str(exc.__class__)
+ error_string = str(exc)
jobarch = target_dict['arch']
- print "%s (%s/%s): %s exception '%s' starting job on %s" % (par_job.uid, \
- par_job.package, jobarch, error_class, error_string, \
+ parent = req.parent()
+ print "%s (%s/%s): %s exception '%s' starting job on %s" % (parent.uid, \
+ parent.package, jobarch, error_class, error_string, \
self._address)
# Check for hard errors, for which we suspend the builder
- if string.find(error_string, "OSError") >= 0 and string.find(error_string, "Errno") >= 0:
+ if error_string.find("OSError") >= 0 and error_string.find("Errno") >= 0:
self._handle_builder_suspend(SUSPEND_HARD_ERROR, error_string)
time.sleep(0.5)
jobid = 0
@@ -472,7 +525,7 @@
self._server_lock.release()
raise RuntimeError
- job = ArchJob.ArchJob(self, self._server_cfg, self._server, par_job, jobid, target_dict)
+ job = ArchJob.ArchJob(self, self._server_cfg, self._server, parent, jobid, target_dict)
self._jobs[jobid] = job
self._update_building_jobs()
self._server_lock.release()
@@ -488,7 +541,7 @@
try:
job = self._jobs[jobid]
status = jobs[jobid]
- job.set_builder_job_status(status)
+ job.set_builder_status(self, status)
builder_jobs.append(jobid)
except KeyError:
pass
@@ -503,10 +556,10 @@
# If the builder didn't report this job as building,
# and its not done, explicitly grab its status
job = self._jobs[jobid]
- if jobid not in builder_jobs and job.get_status() != 'done':
+ if jobid not in builder_jobs and job.status() != 'done':
status = self._get_builder_job_status(jobid)
if status:
- job.set_builder_job_status(status)
+ job.set_builder_status(self, status)
# Check for prepping jobs
if job.prepping():
@@ -573,22 +626,6 @@
def _init_builder(self, target_list):
self._target_list = target_list
- def unlock_repo_for_job(self, uniqid):
- """Called by an archjob to request the sending of a RepoUnlocked
- command to the builder for a particular archjob."""
-
- self._lock.acquire()
- found = False
- for cmd in self._cmd_queue:
- if isinstance(cmd, Commands.PlgCommandUnlockRepo):
- if cmd.archjob_id() == uniqid:
- found = True
- break
- if not found:
- cmd = Commands.PlgCommandUnlockRepo(uniqid, self._seq_gen.next())
- self._cmd_queue.append(cmd)
- self._lock.release()
-
def _handle_job_files_ack(self, cmd):
(archjob, urls) = self._decompose_job_files_ack(cmd)
if not archjob:
@@ -644,10 +681,10 @@
# Grab some work for the builder if any is available
new_cmds = []
if self._free_slots > 0:
- req = self._manager.claim_arch_job(self)
- if req:
+ archjob = self._manager.claim_arch_job(self)
+ if archjob:
next_seq = self._seq_gen.next()
- cmd = Commands.PlgCommandNewJobReq(req['parent'], req['target_dict'], req['srpm_url'], next_seq)
+ cmd = Commands.PlgCommandNewJobReq(archjob, seq=next_seq)
new_cmds.append(cmd)
self._lock.acquire()
@@ -667,6 +704,11 @@
self._lock.release()
return cmd_list
+ def ip(self):
+ if not self._ip:
+ self._get_ip()
+ return Builder.ip(self)
+
_SLEEP_INTERVAL = 10
def run(self):
"""Main builder loop. Since the builder contacts us,
@@ -675,28 +717,24 @@
DebugUtils.registerThreadName(self)
while not self._stop:
- if not self._available:
- time.sleep(self._SLEEP_INTERVAL)
- continue
-
- process_jobs = True
- self._lock.acquire()
- if self._unavail_count > 2:
- self._handle_builder_suspend(SUSPEND_TIMEOUT, "the builder timed out")
- process_jobs = False
- elif self._last_contact + self._REQUIRED_CONTACT_INTERVAL < time.time():
- self._unavail_count = self._unavail_count + 1
- self._lock.release()
-
- if process_jobs:
- for j in self._jobs.values():
- j.process()
+ if self._available:
+ self._lock.acquire()
+ if self._unavail_count > 2:
+ self._handle_builder_suspend(SUSPEND_TIMEOUT, "the builder timed out")
+ elif self._last_contact + self._REQUIRED_CONTACT_INTERVAL < time.time():
+ self._unavail_count = self._unavail_count + 1
+ self._lock.release()
time.sleep(self._SLEEP_INTERVAL)
def _handle_builder_suspend(self, reason, msg):
Builder._handle_builder_suspend(self, reason, msg)
self._last_contact = 0
+ self._ip = None
+
+ # Clear out the command queue; we start clean when the
+ # builder contacts us again
+ self._cmd_queue = []
def _handle_builder_reactivate(self, cmd_list):
# Grab an updated target list from the command stream when
@@ -705,6 +743,14 @@
for cmd in cmd_list:
if isinstance(cmd, Commands.PlgCommandTargets):
target_list = cmd.targets()
+ elif isinstance(cmd, Commands.PlgCommandBuildingJobs):
+ # Tell the builder to kill all jobs it might be building
+ # right now. Think server restart here.
+ for item in cmd.jobs():
+ (uniqid, status) = cmd.get_job(item)
+ cmd = Commands.PlgCommandKillJob(uniqid, self._seq_gen.next())
+ self._cmd_queue.append(cmd)
+
if not target_list:
target_list = self._target_list
Index: BuilderManager.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuilderManager.py,v
retrieving revision 1.24
retrieving revision 1.25
diff -u -r1.24 -r1.25
--- BuilderManager.py 9 May 2006 19:10:57 -0000 1.24
+++ BuilderManager.py 14 May 2006 05:43:07 -0000 1.25
@@ -15,34 +15,31 @@
# Copyright 2005 Dan Williams <dcbw at redhat.com> and Red Hat, Inc.
import time
-import string
-import xmlrpclib
-import sys
import socket
import os
import threading
import Builder
-import EmailUtils
-import Config
-import time
from plague import DebugUtils
from plague import AuthedXMLRPCServer
from plague import HTTPServer
from plague import Commands
+import ArchJob
class AddrCache(object):
+ _ENTRY_EXPIRE_TIME = 3600
+
def __init__(self):
self._cache = {}
def get(self, name):
# Expire cache entry if one exists and is old
- time = ip = None
+ ip = None
try:
- (time, ip) = self._cache[name]
- if time < time.time() - (60 * 60):
+ (itime, ip) = self._cache[name]
+ if itime < time.time() - self._ENTRY_EXPIRE_TIME:
del self._cache[name]
- time = ip = None
+ itime = ip = None
except KeyError:
pass
@@ -127,14 +124,18 @@
hostname = cfg.get_str("General", "hostname")
port = cfg.get_int("Active Builders", "xmlrpc_server_port")
- if cfg.get_bool("Builders", "use_ssl") == True:
- certs = {}
- certs['key_and_cert'] = cfg.get_str("SSL", "server_key_and_cert")
- certs['ca_cert'] = cfg.get_str("SSL", "ca_cert")
- certs['peer_ca_cert'] = cfg.get_str("SSL", "ca_cert")
- self._server = AuthedSSLBuilderServer((hostname, port), certs, self._bm)
- else:
- self._server = AuthedBuilderServer((hostname, port), self._bm)
+ try:
+ if cfg.get_bool("Builders", "use_ssl") == True:
+ certs = {}
+ certs['key_and_cert'] = cfg.get_str("SSL", "server_key_and_cert")
+ certs['ca_cert'] = cfg.get_str("SSL", "ca_cert")
+ certs['peer_ca_cert'] = cfg.get_str("SSL", "ca_cert")
+ self._server = AuthedSSLBuilderServer((hostname, port), certs, self._bm)
+ else:
+ self._server = AuthedBuilderServer((hostname, port), self._bm)
+ except socket.gaierror, exc:
+ raise socket.gaierror(exc[0], "Couldn't bind to address %s:%d (%s)" % (hostname,
+ port, exc[1]))
self._dispatcher = BuilderDispatcher()
self._server.register_instance(self._dispatcher)
@@ -145,10 +146,10 @@
def stop(self):
self._server.stop()
- t = time.time()
+ tm = time.time()
while not self._stopped:
try:
- if time.time() > t + 2:
+ if time.time() > tm + 2:
break
except KeyboardInterrupt:
pass
@@ -163,7 +164,6 @@
self._builders = []
any_active = self._load_builders()
- self._print_builders()
self._queue_lock = threading.Lock()
self._queue = []
@@ -190,10 +190,12 @@
self._fileserver.set_POST_handler('/upload', self.upload_callback)
self._fileserver.start()
+ self._print_builders()
+
def upload_callback(self, request_handler, fs):
# Ensure we know this builder
- addr = request_handler.client_address[0]
- builder = self.get_builder(addr, addr)
+ ip = request_handler.client_address[0]
+ builder = self.get_builder(ip, ip)
if not builder:
request_handler.send_error(403, "Unauthorized")
return
@@ -236,7 +238,7 @@
print "\nAuthorized Builders:"
print "-" * 90
for builder in self._builders:
- (ip, addr) = builder.address()
+ addr = builder.address()
string = " " + addr
string = string + " " * (40 - len(addr))
builder_dict = builder.to_dict()
@@ -269,7 +271,7 @@
# If the address is already in our _builders list, skip it
skip = False
for builder in self._builders:
- (ip, addr) = builder.address()
+ addr = builder.address()
if address == addr:
skip = True
break
@@ -301,21 +303,23 @@
return builder_list
def get_builder(self, cert_ip, con_ip):
+ """Return the Builder object, if any, that matches the specified
+ IP address. Performs basic checking on the address too."""
self._builders_lock.acquire()
- builder = None
+ ret_builder = None
# Ensure builder's certificate (if SSL) and
# the remote address of its connection are the same
if cert_ip == con_ip:
# Find matching builder in our authorized builders list
- for b in self._builders:
- (ip, addr) = b.address()
- if cert_ip == ip:
- builder = b
+ for builder in self._builders:
+ ip = builder.ip()
+ if ip and cert_ip == ip:
+ ret_builder = builder
break
self._builders_lock.release()
- return builder
+ return ret_builder
def _builder_cmp_func(self, builder1, builder2):
# If both builders have at least one free slot, sort on
@@ -341,29 +345,36 @@
return 1
def claim_arch_job(self, builder):
+ """Called by a Builder instance to find a job for the builder to build."""
archjob = None
self._queue_lock.acquire()
- for req in self._queue:
- if builder.can_build_for_target(req['target_dict']):
- self._queue.remove(req)
- archjob = req
+
+ # First pass: look for orphaned jobs
+ for job in self._queue:
+ if job.status() != ArchJob.AJ_STATUS_QUEUED:
+ continue
+ if job.orphaned() and builder.can_build_arch_job(job):
+ job.claim(builder)
+ archjob = job
break
+
+ # Second pass: just pick any job
+ if not archjob:
+ for job in self._queue:
+ if job.status() != ArchJob.AJ_STATUS_QUEUED:
+ continue
+ if builder.can_build_arch_job(job):
+ job.claim(builder)
+ archjob = job
+ break
+
self._queue_lock.release()
return archjob
- def request_arch_job(self, par_job, target_dict, srpm_url, orphaned):
- req = {}
- req['parent'] = par_job
- req['target_dict'] = target_dict
- req['srpm_url'] = srpm_url
- req['time_queued'] = time.time()
-
+ def request_arch_job(self, archjob):
+ """Called by the PackageJob instance to queue new arch-specific build jobs."""
self._queue_lock.acquire()
- if orphaned:
- # insert orphaned requests at the front of the queue
- self._queue.insert(0, req)
- else:
- self._queue.append(req)
+ self._queue.append(archjob)
self._queue_lock.release()
def any_prepping_builders(self):
@@ -373,4 +384,3 @@
return True
return False
-
Index: PackageJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/PackageJob.py,v
retrieving revision 1.49
retrieving revision 1.50
diff -u -r1.49 -r1.50
--- PackageJob.py 11 May 2006 14:46:37 -0000 1.49
+++ PackageJob.py 14 May 2006 05:43:07 -0000 1.50
@@ -27,9 +27,6 @@
import copy
import string
import EmailUtils
-import xmlrpclib
-import socket
-import BuilderManager
import ArchJob
from plague import ArchUtils
from plague import DebugUtils
@@ -47,9 +44,6 @@
if DEBUG:
print stuff
-def log(stuff=''):
- print stuff
-
class PrepError(exceptions.Exception): pass
class DepError(exceptions.Exception): pass
@@ -72,6 +66,7 @@
"build" stage. This class provides the actual running thread.
"""
def __init__(self, pkg_job, start_stage, end_stage):
+ self._stop = False
self._pkg_job = pkg_job
if not end_stage:
end_stage = 'aaaaa'
@@ -82,9 +77,11 @@
def run(self):
DebugUtils.registerThreadName(self)
- while not self._pkg_job.is_done() and not self._pkg_job.cur_stage() == self._end_stage:
+ while not self._stop and not self._pkg_job.is_done() and not self._pkg_job.cur_stage() == self._end_stage:
self._pkg_job.process()
+ def stop(self):
+ self._stop = True
def is_package_job_stage_valid(stage):
""" Validate a job stage """
@@ -118,9 +115,9 @@
def make_job_log_url(base_url, target_str, uid, name, ver, release):
if target_str and uid and name and ver and release:
if base_url.endswith('/'):
- slash=''
+ slash = ''
else:
- slash='/'
+ slash = '/'
return "%s%s%s/%s-%s-%s-%s/" % (base_url, slash, target_str, uid, name, ver, release)
return None
@@ -158,9 +155,10 @@
self._depsolve_dir = None
self._last_depsolve_error = None
self._depsolve_first_try = False
+ self._checkout_tmpdir = None
self.repofiles = {}
- self.archjobs = {}
+ self._archjobs = {}
self._archjobs_lock = threading.Lock()
self._event = threading.Event()
self._killer = None
@@ -175,6 +173,12 @@
pjc = PackageJobController(self, first_stage, 'waiting')
pjc.start()
+ def log(self, arch=None, msg=None):
+ archstring = ""
+ if arch:
+ archstring = "/%s" % arch
+ print "%s (%s%s): %s" % (self.uid, self.package, archstring, msg)
+
def cur_stage(self):
return self._curstage
@@ -374,7 +378,7 @@
for line in lines:
if line.find('..........') == -1 and len(line) > 0:
output_lines.append(line)
- o = string.join(output_lines, '\n')
+ o = string.join(output_lines, ('\n'))
msg = "Error: could not make srpm for %s - output was:\n\n%s" % (self._source, o)
raise PrepError(msg)
@@ -413,11 +417,11 @@
self.epoch = '0'
self.ver = hdr['version']
self.release = hdr['release']
- (self.archjobs, pkg_arches, allowed_arches) = self.arch_handling(hdr)
+ (self._archjobs, pkg_arches, allowed_arches) = self.arch_handling(hdr)
del hdr
del ts
- if len(self.archjobs) == 0:
+ if len(self._archjobs) == 0:
msg = """Package %s does not build on any architectures this build system supports.
Package: %s
Build System: %s
@@ -426,7 +430,7 @@
if self._server_cfg.get_bool("General", "depsolve_jobs"):
self._set_cur_stage('depsolve_wait')
- log("%s (%s): Requesting depsolve..." % (self.uid, self.package))
+ self.log(msg="Requesting depsolve...")
self._repo.request_depsolve(self, first_try=True)
return True # sleep until the Repo wakes us up for depsolve
else:
@@ -445,9 +449,9 @@
config_lines = []
job_yum_dir = os.path.join(self._depsolve_dir, arch)
for line in config_opts['yum.conf'].split('\n'):
- if string.find(line, "cachedir=") >= 0:
+ if line.find("cachedir=") >= 0:
line = "cachedir=cache"
- elif string.find(line, "logfile=") >= 0:
+ elif line.find("logfile=") >= 0:
line = "logfile=yum.log"
config_lines.append(line+'\n')
del config_opts
@@ -483,8 +487,8 @@
base.log = Logger(threshold=threshold, file_object=sys.stdout)
try:
base.doRepoSetup()
- except yum.Errors.RepoError, e:
- raise DepError(str(e))
+ except yum.Errors.RepoError, exc:
+ raise DepError(str(exc))
archlist = ['src', 'noarch']
if ArchUtils.supported_arches.has_key(arch):
@@ -495,27 +499,27 @@
ts = rpmUtils.transaction.initReadOnlyTransaction()
try:
srpm = yum.packages.YumLocalPackage(ts, self._srpm_path)
- except yum.Errors.MiscError, e:
+ except yum.Errors.MiscError, exc:
del ts
- raise DepError(str(e))
+ raise DepError(str(exc))
del ts
try:
base.doSackSetup(archlist)
- except yum.Errors.RepoError, e:
- raise DepError(str(e))
+ except yum.Errors.RepoError, exc:
+ raise DepError(str(exc))
for dep in srpm.requiresList():
if dep.startswith("rpmlib("): continue
try:
pkg = base.returnPackageByDep(dep)
- except repomd.mdErrors.PackageSackError, e:
- raise DepError(str(e))
- except yum.Errors.YumBaseError, e:
- raise DepError(str(e))
- except DepError, e:
- self._last_depsolve_error = str(e)
- print "%s (%s/%s): Depsolve Error: %s" % (self.uid, self.package, arch, str(e))
+ except repomd.mdErrors.PackageSackError, exc:
+ raise DepError(str(exc))
+ except yum.Errors.YumBaseError, exc:
+ raise DepError(str(exc))
+ except DepError, exc:
+ self._last_depsolve_error = str(exc)
+ print "%s (%s/%s): Depsolve Error: %s" % (self.uid, self.package, arch, str(exc))
success = False
if base:
@@ -530,8 +534,8 @@
self.wake()
def _stage_depsolve(self):
- """ Depsolve all arches, only if all pass do we proceed """
- """ to queue up the actual archjobs for building """
+ """Depsolve all arches, only if all pass do we proceed
+ to queue up the actual archjobs for building."""
# If the job's waited more than 8 hours for deps to be
# solved, kill the job and make some human figure it out
@@ -551,18 +555,18 @@
self._archjobs_lock.acquire()
unsolved_deps = False
- archlist = self.archjobs.keys()
+ archlist = self._archjobs.keys()
# noarch jobs are a bit special here. Since we don't know
# what arch they will actually build on, we have to make
# sure all arches the target supports will work
- if len(self.archjobs.keys()) == 1 and self.archjobs.keys()[0] == 'noarch':
+ if len(self._archjobs.keys()) == 1 and self._archjobs.keys()[0] == 'noarch':
archlist = self._target_cfg.basearches()
for arch in self._target_cfg.optarches():
if arch not in archlist:
archlist.append(arch)
- log("%s (%s): Starting depsolve for arches: %s." % (self.uid, self.package, archlist))
+ self.log(msg="Starting depsolve for arches: %s." % archlist)
failed_arch = None
for arch in archlist:
@@ -588,10 +592,10 @@
# Go to sleep until the repo changes
self._set_cur_stage('depsolve_wait')
self._repo.request_depsolve(self, first_try=False)
- log("%s (%s): Finished depsolve (unsuccessful), trying again later." % (self.uid, self.package))
+ self.log(msg="Finished depsolve (unsuccessful), trying again later.")
return True
- log("%s (%s): Finished depsolve (successful), requesting archjobs." % (self.uid, self.package))
+ self.log(msg="Finished depsolve (successful), requesting archjobs.")
# Success, all deps are solved. Kill the depsolve dir
shutil.rmtree(self._depsolve_dir, ignore_errors=True)
@@ -603,7 +607,7 @@
# Make some directories we need
work_dir = self._server_cfg.get_str("Directories", "server_work_dir")
self._result_dir = self._make_stage_dir(work_dir)
- for arch in self.archjobs.keys():
+ for arch in self._archjobs.keys():
thisdir = os.path.join(self._result_dir, arch)
if not os.path.exists(thisdir):
os.makedirs(thisdir)
@@ -620,7 +624,7 @@
# Queue up archjobs
self._set_cur_stage('waiting')
- self._request_arch_jobs()
+ self._create_arch_jobs()
return False
def _stage_depsolve_wait(self):
@@ -636,58 +640,34 @@
os.makedirs(stage_dir)
return stage_dir
- def _request_one_arch_job(self, arch, orphaned):
- # Construct SPRM URL
- srpm_http_base = self._srpm_http_path[len(self.http_dir):]
- method = "http"
- if self._server_cfg.get_bool("Builders", "use_ssl") == True:
- method = "https"
- hostname = self._server_cfg.get_str("General", "hostname")
- port = self._server_cfg.get_int("Active Builders", "file_server_port")
- srpm_url = "%s://%s:%d/%s" % (method, hostname, port, srpm_http_base)
- target_dict = self._target_cfg.target_dict(arch)
- self.bm.builder_manager.request_arch_job(self, target_dict, srpm_url, orphaned)
-
- def _request_arch_jobs(self):
- # Queue requests for build jobs
+ def _create_arch_jobs(self):
self._archjobs_lock.acquire()
- for arch in self.archjobs.keys():
- if self.archjobs[arch]:
- continue
- self._request_one_arch_job(arch, False)
+ for arch in self._archjobs.keys():
+ # Construct the SRPM URL
+ srpm_http_base = self._srpm_http_path[len(self.http_dir):]
+ method = "http"
+ if self._server_cfg.get_bool("Builders", "use_ssl") == True:
+ method = "https"
+ hostname = self._server_cfg.get_str("General", "hostname")
+ port = self._server_cfg.get_int("Active Builders", "file_server_port")
+ srpm_url = "%s://%s:%d/%s" % (method, hostname, port, srpm_http_base)
+
+ # Create and queue the archjob
+ target_dict = self._target_cfg.target_dict(arch)
+ archjob = ArchJob.ArchJob(self, target_dict, srpm_url)
+ self._archjobs[arch] = archjob
+ self.bm.builder_manager.request_arch_job(archjob)
self._archjobs_lock.release()
- def add_arch_job(self, job):
- """ Called by the BuilderManager when it's started a new arch job for us """
- self._archjobs_lock.acquire()
- jobarch = job.arch()
- if self.archjobs[jobarch] != None:
- log("%s (%s/%s): Already have archjob for this arch (%s). New job UID is %s." % (self.uid, \
- self.package, jobarch, self.archjobs[jobarch].jobid, job.jobid))
- self.archjobs[jobarch] = job
-
+ def archjob_started_cb(self, archjob):
# If this is the first archjob, that means we are now building.
# So we start up the second PackageJobController thread.
if self._curstage == 'waiting':
t = PackageJobController(self, 'building', None)
t.start()
- self._archjobs_lock.release()
- # Only want hostname, not both IP and hostname
- addr = job.builder().address()[1]
- log("%s (%s/%s): %s - UID is %s" % (self.uid, self.package, jobarch, addr, job.jobid))
-
- def remove_arch_job(self, job):
- """ Removes an arch job when its builder is no longer responding """
- self._archjobs_lock.acquire()
- jobarch = job.arch()
- log("%s (%s/%s): Builder disappeared. Requeuing arch..." % (self.uid, self.package, jobarch))
- self.archjobs[jobarch] = None
- self._request_one_arch_job(jobarch, True)
- self._archjobs_lock.release()
-
def is_done(self):
- if self._curstage == 'needsign' or self._curstage == 'failed' or self._curstage == 'finished':
+ if self._curstage in ['needsign', 'failed', 'finished']:
return True
return False
@@ -695,8 +675,7 @@
self._killer = username
self._die = True
- log("%s (%s): Job kill request from %s" % (self.uid, self.package, username))
- self._archjobs_lock.acquire()
+ self.log(msg="Job kill request from %s" % username)
if self._curstage == 'waiting':
# In 'waiting' stage, we have no controller thread. So to get
# the job killed immediately, we have to start one
@@ -705,14 +684,14 @@
else:
# Otherwise, wake up the existing controller thread
self.wake()
- self._archjobs_lock.release()
def _handle_death(self):
- resultstring = "%s (%s): Build on target %s was killed by %s." % (self.uid, self.name, self._target_str, self._killer)
+ result_start = "%s (%s): " % (self.uid, self.name)
+ resultstring = "Build on target %s was killed by %s." % (self._target_str, self._killer)
self._result = 'killed'
self._set_cur_stage('finished', resultstring)
- self.email_result(self.username, resultstring)
- log(resultstring)
+ self.email_result(self.username, result_start + resultstring)
+ self.log(msg=resultstring)
# Kill any building jobs
self._kill_all_archjobs(True)
@@ -726,19 +705,14 @@
def _kill_all_archjobs(self, user_requested=False):
self._archjobs_lock.acquire()
- for job in self.archjobs.values():
- if job:
- job.die(user_requested)
- self.archjobs = {}
+ for job in self._archjobs.values():
+ job.die(user_requested)
self._archjobs_lock.release()
def wake(self):
self._event.set()
def process(self):
- if self.is_done():
- return
-
if self._die:
self._handle_death()
return
@@ -750,24 +724,24 @@
while not self._event.isSet():
self._event.wait()
self._event.clear()
- except PrepError, e:
+ except PrepError, exc:
if self.use_cvs == True:
shutil.rmtree(self.checkout_tmpdir, ignore_errors=True)
- msg = str(e)
+ msg = str(exc)
subj = 'Prep Error (Job %s): %s on %s' % (self.uid, self._source, self._target_str)
self.email_result(self.username, resultstring=msg, subject=subj)
self._stage_failed(msg)
- except DepError, e:
- msg = str(e)
+ except DepError, exc:
+ msg = str(exc)
subj = 'Dependencies Error (Job %s): %s on %s' % (self.uid, self._source, self._target_str)
self.email_result(self.username, resultstring=msg, subject=subj)
self._stage_failed(msg)
- except BuildError, e:
+ except BuildError, exc:
subj = 'Build Error (Job %s): %s on %s' % (self.uid, self._source, self._target_str)
base_url = self._server_cfg.get_str("UI", "log_url")
log_url = make_job_log_url(base_url, self._target_str, self.uid, self.name, self.ver, self.release)
- msg = "%s\n\n Build logs may be found at %s\n\n" % (e.msg, log_url)
- logtail = self._get_log_tail(e.arch)
+ msg = "%s\n\n Build logs may be found at %s\n\n" % (exc.msg, log_url)
+ logtail = self._get_log_tail(exc.arch)
msg = "%s\n-------------------------------------------------\n\n%s\n" % (msg, logtail)
self.email_result(self.username, resultstring=msg, subject=subj)
@@ -776,44 +750,46 @@
if self._target_cfg.testing() == False:
# Kill remaining jobs on other arches
self._kill_all_archjobs(False)
- self._stage_failed(e.msg)
+ self._stage_failed(exc.msg)
def _stage_building(self):
- # Count failed and completed jobs
completed_jobs = 0
failed_jobs = 0
self._archjobs_lock.acquire()
- for job in self.archjobs.values():
- if not job:
- continue
- if job.get_status() is 'done':
- completed_jobs = completed_jobs + 1
-
- if job.builder_failed() or job.download_failed() or job.internal_failure():
- failed_jobs = failed_jobs + 1
+ for job in self._archjobs.values():
+ # If the archjob is running, give it some time
+ if job.status() is not ArchJob.AJ_STATUS_DONE:
+ job.process()
- # Normal jobs will just stop when a single archjob fails, but
- # testing targets don't kill the build when one fails. However,
- # even for testing targets, we still want to notify the user if
- # a particular arch fails.
- if not job.failure_noticed():
- job.set_failure_noticed()
- jobarch = job.arch()
- msg = "Job failed."
- if job.builder_failed():
- msg = "Job failed on arch %s\n" % jobarch
- elif job.download_failed():
- msg = "Job failed on arch %s: couldn't download result files from builder '%s'.\n " \
- "Please contact the build system administrator." % (jobarch, job.builder().address())
- elif job.internal_failure():
- msg = "Job failed on arch %s: there was an internal build system failure.\n " \
- "Please contact the build system administrator." % jobarch
- self._archjobs_lock.release()
- raise BuildError(msg, jobarch)
+ # If the archjob is still running, go to next archjob
+ if job.status() is not ArchJob.AJ_STATUS_DONE:
+ continue
+ completed_jobs = completed_jobs + 1
+ if job.builder_failed() or job.download_failed() or job.internal_failure():
+ failed_jobs = failed_jobs + 1
+
+ # Normal jobs will just stop when a single archjob fails, but
+ # testing targets don't kill the build when one fails. However,
+ # even for testing targets, we still want to notify the user if
+ # a particular arch fails.
+ if not job.failure_noticed():
+ job.set_failure_noticed()
+ jobarch = job.arch()
+ msg = "Job failed."
+ if job.builder_failed():
+ msg = "Job failed on arch %s\n" % jobarch
+ elif job.download_failed():
+ msg = "Job failed on arch %s: couldn't download result files from builder '%s'.\n " \
+ "Please contact the build system administrator." % (jobarch, job.builder().address())
+ elif job.internal_failure():
+ msg = "Job failed on arch %s: there was an internal build system failure.\n " \
+ "Please contact the build system administrator." % jobarch
+ self._archjobs_lock.release()
+ raise BuildError(msg, jobarch)
self._archjobs_lock.release()
- if completed_jobs == len(self.archjobs):
+ if completed_jobs == len(self._archjobs):
# Testing targets don't contribute packages to the repo
if self._target_cfg.testing() == True:
if failed_jobs > 0:
@@ -824,7 +800,8 @@
self._set_cur_stage('add_to_repo')
return False # Don't want to wait
- return True
+ time.sleep(5)
+ return False
def get_stage_dir(self):
return self._result_dir
@@ -848,9 +825,7 @@
srpm_file = os.path.join(self._result_dir, os.path.basename(self._srpm_http_path))
# Delete any RPMs in the arch dirs
- for job in self.archjobs.values():
- if not job:
- continue
+ for job in self._archjobs.values():
job_result_dir = job.get_result_files_dir()
for f in job.get_files():
if not f.endswith(".rpm"):
@@ -874,9 +849,7 @@
# Create a list of files that the repo should copy to
# the repo dir
repo_dir = self._server_cfg.get_str("Directories", "repo_dir")
- for job in self.archjobs.values():
- if not job:
- continue
+ for job in self._archjobs.values():
job_result_dir = job.get_result_files_dir()
for f in job.get_files():
if not f.endswith(".rpm"):
Index: main.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/main.py,v
retrieving revision 1.21
retrieving revision 1.22
diff -u -r1.21 -r1.22
--- main.py 28 Apr 2006 03:17:41 -0000 1.21
+++ main.py 14 May 2006 05:43:07 -0000 1.22
@@ -91,7 +91,7 @@
ret=daemonize.createDaemon()
if ret:
print "Daemonizing failed!"
- sys.exit(2)
+ os._exit(2)
if opts.pidfile:
open(opts.pidfile, 'w').write('%d\n' % os.getpid())
@@ -107,7 +107,7 @@
cfg.load_target_configs()
if len(cfg.targets()) == 0:
print "You need at least one target to do anything useful."
- sys.exit(3)
+ os._exit(3)
hostname = cfg.get_str("General", "hostname")
@@ -125,7 +125,11 @@
dbm = DBManager.DBManager(cfg)
# Create the BuildMaster thread
- builder_manager = BuilderManager.BuilderManager(cfg)
+ try:
+ builder_manager = BuilderManager.BuilderManager(cfg)
+ except Exception, exc:
+ print "Couldn't create BuilderManager: %s" % exc
+ os._exit(4)
bm = BuildMaster.BuildMaster(builder_manager, dbm, cfg)
bm.start()
@@ -143,10 +147,10 @@
else:
ui = UserInterfaceNoAuth(builder_manager, bm, dbm, cfg)
bm_server = AuthedXMLRPCServer.AuthedXMLRPCServer((hostname, port))
- except socket.error, e:
- if e[0] == 98: # Address already in use
+ except socket.error, exc:
+ if exc[0] == 98: # Address already in use
print "Error: couldn't bind to address '%s:%s'. Is the server already running?" % (hostname, port)
- os._exit(1)
+ os._exit(4)
bm_server.register_instance(ui)
# Create dummy thread just to register main thread's name
More information about the scm-commits
mailing list