extras-buildsys/server Builder.py, 1.41, 1.42 BuilderManager.py, 1.25, 1.26
Daniel Williams (dcbw)
fedora-extras-commits at redhat.com
Tue May 16 15:49:59 UTC 2006
Author: dcbw
Update of /cvs/fedora/extras-buildsys/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv26625/server
Modified Files:
Builder.py BuilderManager.py
Log Message:
2006-05-16 Dan Williams <dcbw at redhat.com>
* Make passive builders work somewhat more; there are still some bugs
in command processing though.
Index: Builder.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/Builder.py,v
retrieving revision 1.41
retrieving revision 1.42
diff -u -r1.41 -r1.42
--- Builder.py 14 May 2006 05:43:07 -0000 1.41
+++ Builder.py 16 May 2006 15:49:56 -0000 1.42
@@ -62,6 +62,7 @@
self._suspend_listeners = []
self._status_listeners = []
self._ip = None
+ self._last_contact = 0  # time.time() of the last builder contact (attempt); 0 = never, so PassiveBuilder pings immediately
uri, rest = urllib.splittype(address)
host, ignore = urllib.splithost(rest)
@@ -359,31 +360,37 @@
# HACK: This class is a hack to work around SSL hanging issues,
# which cause the whole server to grind to a halt
-class BuildingJobsCheck(threading.Thread):
- def __init__(self, server, address):
- self._server = server
+class PassiveBuilderRequest(threading.Thread):
+ def __init__(self, address, certs, cmds):
self._address = address
-
+ self._certs = certs
+ self._cmds = cmds
self.done = False
- self.failed = False
-
+ self.failed = True  # pessimistic default; cleared only after a successful request()
+ self.response = None
+ self._server = XMLRPCServerProxy.PlgXMLRPCServerProxy(self._address, self._certs, timeout=20)
threading.Thread.__init__(self)
def run(self):
- self.setName("BuildingJobsCheck: %s" % self._address)
- jobs = {}
- free_slots = 0
+ self.setName("PassiveBuilderRequest: %s" % self._address)
try:
- (jobs, free_slots) = self._server.building_jobs()
- except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error, xmlrpclib.ProtocolError):
- self.failed = True
- except xmlrpclib.Fault, exc:
- print "Builder Error (%s) in _building_jobs(): builder replied '%s'" % (self._address, exc)
- self.failed = True
- self.jobs = jobs
- self.free_slots = free_slots
+ cmd_stream = Commands.serialize_to_command_stream(self._cmds)
+ self.response = self._server.request(cmd_stream)
+ self.failed = False
+ except (socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error,
+ xmlrpclib.ProtocolError, xmlrpclib.Fault), exc:
+ print "PassiveBuilder(%s) Error in request(): '%s'" % (self._address, exc)
+ except socket.error, exc:
+ if exc[0] != 111:  # 111 == ECONNREFUSED on Linux; refused connections are silently ignored
+ print "PassiveBuilder(%s) Error in request(): '%s'" % (self._address, exc)
self.done = True
+ def close(self):  # best-effort teardown of the XML-RPC proxy connection
+ try:
+ self._server.close()
+ except Exception:
+ pass
+
class PassiveBuilder(Builder):
"""
Passive builders are ones that do not initiate connections. They
@@ -391,223 +398,165 @@
a firewall without having holes punched through it.
"""
- _BUILDER_PING_INTERVAL = 60 * 5 # In seconds
+ # How often we try to contact unavailable builders
+ _BUILDER_UNAVAIL_PING_INTERVAL = 300 # 5 minutes (in seconds)
- def __init__(self, manager, cfg, address, weight, btype):
- Builder.__init__(self, manager, cfg, address, weight, btype)
+ # How often we try to contact available builders
+ _BUILDER_AVAIL_PING_INTERVAL = 20 # in seconds
- self._ping_timeout = 0
- self._cur_ping_interval = self._BUILDER_PING_INTERVAL
- self._ping_now = False
+ def __init__(self, manager, cfg, address, weight):
+ Builder.__init__(self, manager, cfg, address, weight, TYPE_PASSIVE)
+ # Builder will get pinged immediately since self._last_contact == 0
+ self._ping_interval = self._BUILDER_UNAVAIL_PING_INTERVAL
- certs = None
+ self._certs = None
if self._server_cfg.get_bool("Builders", "use_ssl"):
- certs = {}
- certs['key_and_cert'] = self._server_cfg.get_str("SSL", "server_key_and_cert")
- certs['ca_cert'] = self._server_cfg.get_str("SSL", "ca_cert")
- certs['peer_ca_cert'] = self._server_cfg.get_str("SSL", "ca_cert")
-
- self._server = XMLRPCServerProxy.PlgXMLRPCServerProxy(self._address, certs, timeout=20)
- self._server_lock = threading.Lock()
-
- (self._available, target_list) = self._ping_builder()
- if self._available:
- self._init_builder(target_list)
- else:
- # Treat the builder as timed out and ping it periodically
- self._ping_timeout = time.time()
- self._suspend_reason = SUSPEND_TIMEOUT
-
- def _ping_builder(self):
- target_list = []
- try:
- target_list = self._server.supported_targets()
- alive = True
- except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error, xmlrpclib.ProtocolError):
- alive = False
- except xmlrpclib.Fault, exc:
- print "Builder Error (%s) in _ping_builder(): builder replied '%s'" % (self._address, exc)
- alive = False
- return (alive, target_list)
-
- def _init_builder(self, target_list):
- self._target_list = target_list
- # Kill any jobs currently running on the builder
- jobs = self._building_jobs()
- for jobid in jobs.keys():
- try:
- self._server.die(jobid)
- except:
- pass
-
- self._num_slots = self._get_num_slots()
+ self._certs = {}
+ self._certs['key_and_cert'] = self._server_cfg.get_str("SSL", "server_key_and_cert")
+ self._certs['ca_cert'] = self._server_cfg.get_str("SSL", "ca_cert")
+ self._certs['peer_ca_cert'] = self._server_cfg.get_str("SSL", "ca_cert")
+
+ def _send_commands(self):
+ """Send queued commands to the builder, and then get its list
+ of reply commands. Returns None if the request failed or timed out."""
- def _get_num_slots(self):
- num_slots = self._num_slots
- try:
- num_slots = self._server.num_slots()
- except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error, xmlrpclib.ProtocolError):
- pass
- except xmlrpclib.Fault, exc:
- print "Builder Error (%s) in _get_num_slots(): builder replied '%s'" % (self._address, exc)
- return num_slots
+ # Grab some work for the builder if any is available
+ new_cmds = []
+ if self._free_slots > 0:
+ archjob = self._manager.claim_arch_job(self)
+ if archjob:
+ next_seq = self._seq_gen.next()
+ cmd = Commands.PlgCommandNewJobReq(archjob, seq=next_seq)
+ new_cmds.append(cmd)
- def _building_jobs(self):
- bjc = BuildingJobsCheck(self._server, self._address)
+ # Copy command queue
+ self._lock.acquire()
+ self._cmd_queue = self._cmd_queue + new_cmds
+ cmd_list = self._cmd_queue
+ # FIXME: deal with keeping ack-requiring cmds around
+ self._cmd_queue = []
+ self._lock.release()
+ # The actual XML-RPC request runs in a different thread because SSL
+ # calls sometimes hang
+ req = PassiveBuilderRequest(self._address, self._certs, cmd_list)
curtime = time.time()
- bjc.start()
+ req.start()
- # Give the check 10s, otherwise screw it
- while time.time() - curtime < 10:
- if bjc.done:
+ # Give the request 10s, otherwise forget about it
+ while time.time() < curtime + 10:
+ if req.done:
break
time.sleep(0.5)
- if bjc.done:
- if not bjc.failed:
- self._unavail_count = 0
- self._available = True
- self._free_slots = bjc.free_slots
- return bjc.jobs
- else:
- # Error of some kind
- self._unavail_count = self._unavail_count + 1
+ # If the request isn't done yet, force-close it to
+ # minimize chances of the commands getting through
+ # to the builder
+ if not req.done:
+ req.close()  # PassiveBuilderRequest defines close(), not cancel()
+
+ response = None
+ if req.failed:
+ # Put all the commands back at the front of the queue
+ self._lock.acquire()
+ self._cmd_queue = cmd_list + self._cmd_queue
+ self._lock.release()
else:
- self._unavail_count = self._unavail_count + 1
-
- return {}
-
- def ping_asap(self):
- # Reduce the ping interval to ping the builder right away
- self._cur_ping_interval = 0
- self._ping_now = True
+ response = req.response
+ return response
- def _handle_builder_suspend(self, reason, msg):
- Builder._handle_builder_suspend(self, reason, msg)
- # Reset current ping interval to default
- self._cur_ping_interval = self._BUILDER_PING_INTERVAL
- self._ping_timeout = time.time()
-
- def _handle_builder_reactivate(self):
- Builder._handle_builder_reactivate(self)
- self._ping_timeout = 0
- self._init_builder(target_list)
+ def _dispatch_command(self, cmd, first_contact):
+ """Dispatch one command. We let the superclass dispatch the common
+ commands, and handle only those that need action specific to the
+ Passive builder type."""
- def start_job(self, req):
- if not self.available():
- raise RuntimeError
- if not self.can_build_request(req):
- raise RuntimeError
+ # The first time we contact the builder, we need to tell it to kill
+ # all jobs that are building on it. So don't let the superclass
+ # handle the first BuildingJobs command we get
+ handled = False
+ if first_contact and isinstance(cmd, Commands.PlgCommandBuildingJobs):
+ # Tell the builder to kill all jobs it might be building
+ # right now. Think server restart here.
+ for item in cmd.jobs():
+ (uniqid, status) = cmd.get_job(item)
+ kill_cmd = Commands.PlgCommandKillJob(uniqid, self._seq_gen.next())
+ self._cmd_queue.append(kill_cmd)  # NOTE(review): not under self._lock, unlike _send_commands
+ handled = True
- self._server_lock.acquire()
- try:
- # Builder will return jobid of 0 if it can't start the job for some reason
- srpm_url = req.srpm_url()
- target_dict = req.target_dict()
- jobid = self._server.start_new_job(target_dict, req.srpm_url())
- except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, \
- OpenSSL.SSL.Error, xmlrpclib.ProtocolError, xmlrpclib.Fault), exc:
- error_class = str(exc.__class__)
- error_string = str(exc)
- jobarch = target_dict['arch']
- parent = req.parent()
- print "%s (%s/%s): %s exception '%s' starting job on %s" % (parent.uid, \
- parent.package, jobarch, error_class, error_string, \
- self._address)
- # Check for hard errors, for which we suspend the builder
- if error_string.find("OSError") >= 0 and error_string.find("Errno") >= 0:
- self._handle_builder_suspend(SUSPEND_HARD_ERROR, error_string)
- time.sleep(0.5)
- jobid = 0
+ # Let the superclass handle what's left
+ if not handled:
+ handled = self._dispatch_common_command(cmd)
- if jobid == 0:
- self._server_lock.release()
- raise RuntimeError
-
- job = ArchJob.ArchJob(self, self._server_cfg, self._server, parent, jobid, target_dict)
- self._jobs[jobid] = job
- self._update_building_jobs()
- self._server_lock.release()
- return job
-
- def _update_building_jobs(self):
- jobs = self._building_jobs()
-
- # Update status for all jobs on this builder
- if self._unavail_count == 0:
- builder_jobs = []
- for jobid in jobs.keys():
- try:
- job = self._jobs[jobid]
- status = jobs[jobid]
- job.set_builder_status(self, status)
- builder_jobs.append(jobid)
- except KeyError:
- pass
-
- # We have to check jobs that weren't reported
- # as 'building' by the builder, since the job
- # may have finished on the builder and was
- # removed from the building job list before we
- # were able to know that it was done. HACK
- self._prepping_jobs = False
- for jobid in self._jobs.keys():
- # If the builder didn't report this job as building,
- # and its not done, explicitly grab its status
- job = self._jobs[jobid]
- if jobid not in builder_jobs and job.status() != 'done':
- status = self._get_builder_job_status(jobid)
- if status:
- job.set_builder_status(self, status)
-
- # Check for prepping jobs
- if job.prepping():
- self._prepping_jobs = True
-
- def _get_builder_job_status(self, jobid):
- """ Get the status of one job on the builder """
- status = None
- try:
- status = self._server.job_status(jobid)
- except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error):
- pass
- except (xmlrpclib.Fault, xmlrpclib.ProtocolError):
- pass
- return status
+ # Then we handle what the superclass didn't
+ if not handled:
+ if isinstance(cmd, Commands.PlgCommandJobFilesAck):
+ self._handle_job_files_ack(cmd)
+ handled = True
+ elif isinstance(cmd, Commands.PlgCommandTargets):
+ if not self._target_list:
+ self._target_list = cmd.targets()
+ handled = True
+ if not handled:
+ print "Builder Error (%s): unhandled command '%s'" % (self._address, cmd.name())
+ _SLEEP_INTERVAL = 5
def run(self):
+ """Poll every _SLEEP_INTERVAL seconds; once _ping_interval has elapsed, contact the builder and dispatch its reply."""
DebugUtils.registerThreadName(self)
while not self._stop:
- self._server_lock.acquire()
-
- if self._available:
- self._update_building_jobs()
+ if self._last_contact < time.time() - self._ping_interval:
+ # Ensure the builder's IP is up-to-date
+ self._get_ip()
+
+ # Try to talk to the builder. _send_commands() returns None if the
+ # request failed or timed out, which counts as no response.
+ response = self._send_commands()
+ if response:
+ # Builder is alive
+ first_contact = False
+ if not self._available:
+ self._handle_builder_reactivate()
+ first_contact = True
+ self._unavail_count = 0
- if self._unavail_count > 2:
- # Kill all jobs on the client if it went away
- self._handle_builder_suspend(SUSPEND_TIMEOUT, "the builder timed out")
+ # process builder's response
+ cmds = Commands.deserialize_command_stream(response)
+ for cmd in cmds:
+ self._dispatch_command(cmd, first_contact)
else:
- # Update status of all archjobs on this builder
- for j in self._jobs.values():
- j.process()
- elif not self._available and (self._suspend_reason == SUSPEND_TIMEOUT or self._ping_now):
- # Ping the builder every so often to see if it responds again
- if time.time() > (self._ping_timeout + self._cur_ping_interval):
- (alive, target_list) = self._ping_builder()
- if alive:
- self._handle_builder_reactivate()
- else:
- # Wait and ping again
- self._ping_timeout = time.time()
+ # Builder didn't respond
+ self._lock.acquire()
+ self._unavail_count = self._unavail_count + 1
+ if self._available and self._unavail_count > 2:
+ self._handle_builder_suspend(SUSPEND_TIMEOUT, "the builder timed out")
+ self._lock.release()
+ self._last_contact = time.time()
- # Reset current ping interval to default
- self._cur_ping_interval = self._BUILDER_PING_INTERVAL
- self._ping_now = False
+ time.sleep(self._SLEEP_INTERVAL)
- self._server_lock.release()
+ def _handle_builder_suspend(self, reason, msg):
+ Builder._handle_builder_suspend(self, reason, msg)
+ self._ping_interval = self._BUILDER_UNAVAIL_PING_INTERVAL
+ self._ip = None
+ self._target_list = None
+ # Set free slots to zero so we don't send the
+ # builder a job on first contact
+ self._free_slots = 0
- time.sleep(20)
+ # Clear out the command queue; we start clean
+ self._cmd_queue = []
+
+ def _handle_builder_reactivate(self):
+ mail = True
+ if self._suspend_reason == SUSPEND_NONE:
+ # Don't send mail saying the builder has been reactivated if
+ # this is the first time the builder has contacted us
+ mail = False
+
+ self._ping_interval = self._BUILDER_AVAIL_PING_INTERVAL
+ self._lock.acquire()
+ Builder._handle_builder_reactivate(self, mail=mail)
+ self._lock.release()
class ActiveBuilder(Builder):
@@ -619,9 +568,8 @@
_REQUIRED_CONTACT_INTERVAL = 25
- def __init__(self, manager, cfg, address, weight, btype):
- Builder.__init__(self, manager, cfg, address, weight, btype)
- self._last_contact = 0
+ def __init__(self, manager, cfg, address, weight):
+ Builder.__init__(self, manager, cfg, address, weight, TYPE_ACTIVE)  # type is fixed by the subclass; _last_contact is now initialized in Builder.__init__
def _init_builder(self, target_list):
self._target_list = target_list
Index: BuilderManager.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuilderManager.py,v
retrieving revision 1.25
retrieving revision 1.26
diff -u -r1.25 -r1.26
--- BuilderManager.py 14 May 2006 05:43:07 -0000 1.25
+++ BuilderManager.py 16 May 2006 15:49:56 -0000 1.26
@@ -280,9 +280,9 @@
# Add the builder to our build list
if btype == Builder.TYPE_ACTIVE:
- builder = Builder.ActiveBuilder(self, self._cfg, address, weight, btype)
+ builder = Builder.ActiveBuilder(self, self._cfg, address, weight)  # ActiveBuilder passes TYPE_ACTIVE itself
else:
- builder = Builder.PassiveBuilder(self, self._cfg, address, weight, btype)
+ builder = Builder.PassiveBuilder(self, self._cfg, address, weight)  # PassiveBuilder passes TYPE_PASSIVE itself
builder.start()
self._builders.append(builder)
self._builders_lock.release()
More information about the scm-commits
mailing list