extras-buildsys/server Builder.py, 1.41, 1.42 BuilderManager.py, 1.25, 1.26

Daniel Williams (dcbw) fedora-extras-commits at redhat.com
Tue May 16 15:49:59 UTC 2006


Author: dcbw

Update of /cvs/fedora/extras-buildsys/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv26625/server

Modified Files:
	Builder.py BuilderManager.py 
Log Message:
2006-05-16  Dan Williams  <dcbw at redhat.com>

    * Get passive builders closer to working; some bugs still remain in
        command processing.




Index: Builder.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/Builder.py,v
retrieving revision 1.41
retrieving revision 1.42
diff -u -r1.41 -r1.42
--- Builder.py	14 May 2006 05:43:07 -0000	1.41
+++ Builder.py	16 May 2006 15:49:56 -0000	1.42
@@ -62,6 +62,7 @@
         self._suspend_listeners = []
         self._status_listeners = []
         self._ip = None
+        self._last_contact = 0
 
         uri, rest = urllib.splittype(address)
         host, ignore = urllib.splithost(rest)
@@ -359,31 +360,37 @@
 
 # HACK: This class is a hack to work around SSL hanging issues,
 # which cause the whole server to grind to a halt
-class BuildingJobsCheck(threading.Thread):
-    def __init__(self, server, address):
-        self._server = server
+class PassiveBuilderRequest(threading.Thread):
+    def __init__(self, address, certs, cmds):
         self._address = address
-
+        self._certs = certs
+        self._cmds = cmds
         self.done = False
-        self.failed = False
-
+        self.failed = True
+        self.response = None
+        self._server = XMLRPCServerProxy.PlgXMLRPCServerProxy(self._address, self._certs, timeout=20)
         threading.Thread.__init__(self)
 
     def run(self):
-        self.setName("BuildingJobsCheck: %s" % self._address)
-        jobs = {}
-        free_slots = 0
+        self.setName("PassiveBuilderRequest: %s" % self._address)
         try:
-            (jobs, free_slots) = self._server.building_jobs()
-        except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error, xmlrpclib.ProtocolError):
-            self.failed = True
-        except xmlrpclib.Fault, exc:
-            print "Builder Error (%s) in _building_jobs(): builder replied '%s'" % (self._address, exc)
-            self.failed = True
-        self.jobs = jobs
-        self.free_slots = free_slots
+            cmd_stream = Commands.serialize_to_command_stream(self._cmds)
+            self.response = self._server.request(cmd_stream)
+            self.failed = False
+        except (socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error,
+                xmlrpclib.ProtocolError, xmlrpclib.Fault), exc:
+            print "PassiveBuilder(%s) Error in request(): '%s'" % (self._address, exc)
+        except socket.error, exc:
+            if exc[0] != 111:
+                print "PassiveBuilder(%s) Error in request(): '%s'" % (self._address, exc)
         self.done = True
 
+    def close(self):
+        try:
+            self._server.close()
+        except:
+            pass
+
 class PassiveBuilder(Builder):
     """
     Passive builders are ones that do not initiate connections.  They
@@ -391,223 +398,165 @@
     a firewall without having holes punched through it.
     """
 
-    _BUILDER_PING_INTERVAL = 60 * 5      # In seconds
+    # How often we try to contact unavailable builders
+    _BUILDER_UNAVAIL_PING_INTERVAL = 300      # 5 minutes (in seconds)
 
-    def __init__(self, manager, cfg, address, weight, btype):
-        Builder.__init__(self, manager, cfg, address, weight, btype)
+    # How often we try to contact available builders
+    _BUILDER_AVAIL_PING_INTERVAL = 20
 
-        self._ping_timeout = 0
-        self._cur_ping_interval = self._BUILDER_PING_INTERVAL
-        self._ping_now = False
+    def __init__(self, manager, cfg, address, weight):
+        Builder.__init__(self, manager, cfg, address, weight, TYPE_PASSIVE)
+        # Builder will get pinged immediately since self._last_contact == 0
+        self._ping_interval = self._BUILDER_UNAVAIL_PING_INTERVAL
 
-        certs = None
+        self._certs = None
         if self._server_cfg.get_bool("Builders", "use_ssl"):
-            certs = {}
-            certs['key_and_cert'] = self._server_cfg.get_str("SSL", "server_key_and_cert")
-            certs['ca_cert'] = self._server_cfg.get_str("SSL", "ca_cert")
-            certs['peer_ca_cert'] = self._server_cfg.get_str("SSL", "ca_cert")
-
-        self._server = XMLRPCServerProxy.PlgXMLRPCServerProxy(self._address, certs, timeout=20)
-        self._server_lock = threading.Lock()
-
-        (self._available, target_list) = self._ping_builder()
-        if self._available:
-            self._init_builder(target_list)
-        else:
-            # Treat the builder as timed out and ping it periodically
-            self._ping_timeout = time.time()
-            self._suspend_reason = SUSPEND_TIMEOUT
-
-    def _ping_builder(self):
-        target_list = []
-        try:
-            target_list = self._server.supported_targets()
-            alive = True
-        except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error, xmlrpclib.ProtocolError):
-            alive = False
-        except xmlrpclib.Fault, exc:
-            print "Builder Error (%s) in _ping_builder(): builder replied '%s'" % (self._address, exc)
-            alive = False
-        return (alive, target_list)
-
-    def _init_builder(self, target_list):
-        self._target_list = target_list
-        # Kill any jobs currently running on the builder
-        jobs = self._building_jobs()
-        for jobid in jobs.keys():
-            try:
-                self._server.die(jobid)
-            except:
-                pass
-
-        self._num_slots = self._get_num_slots()
+            self._certs = {}
+            self._certs['key_and_cert'] = self._server_cfg.get_str("SSL", "server_key_and_cert")
+            self._certs['ca_cert'] = self._server_cfg.get_str("SSL", "ca_cert")
+            self._certs['peer_ca_cert'] = self._server_cfg.get_str("SSL", "ca_cert")
+
+    def _send_commands(self):
+        """Send queued commands to the builder, and then get it's list
+        of reply commands."""
 
-    def _get_num_slots(self):
-        num_slots = self._num_slots
-        try:
-            num_slots = self._server.num_slots()
-        except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error, xmlrpclib.ProtocolError):
-            pass
-        except xmlrpclib.Fault, exc:
-            print "Builder Error (%s) in _get_num_slots(): builder replied '%s'" % (self._address, exc)
-        return num_slots
+        # Grab some work for the builder if any is available
+        new_cmds = []
+        if self._free_slots > 0:
+            archjob = self._manager.claim_arch_job(self)
+            if archjob:
+                next_seq = self._seq_gen.next()
+                cmd = Commands.PlgCommandNewJobReq(archjob, seq=next_seq)
+                new_cmds.append(cmd)
 
-    def _building_jobs(self):
-        bjc = BuildingJobsCheck(self._server, self._address)
+        # Copy command queue
+        self._lock.acquire()
+        self._cmd_queue = self._cmd_queue + new_cmds
+        cmd_list = self._cmd_queue
+        # FIXME: deal with keeping ack-requiring cmds around
+        self._cmd_queue = []
+        self._lock.release()
 
+        # The actual XML-RPC request runs in a different thread because SSL
+        # calls sometimes hang 
+        req = PassiveBuilderRequest(self._address, self._certs, cmd_list)
         curtime = time.time()
-        bjc.start()
+        req.start()
 
-        # Give the check 10s, otherwise screw it
-        while time.time() - curtime < 10:
-            if bjc.done:
+        # Give the request 10s, otherwise forget about it
+        while time.time() < curtime + 10:
+            if req.done:
                 break
             time.sleep(0.5)
 
-        if bjc.done:
-            if not bjc.failed:
-                self._unavail_count = 0
-                self._available = True
-                self._free_slots = bjc.free_slots
-                return bjc.jobs
-            else:
-                # Error of some kind
-                self._unavail_count = self._unavail_count + 1
+        # If the request isn't done yet, force-close it to
+        # minimize chances of the commands getting through
+        # to the builder
+        if not req.done:
+            req.cancel()
+
+        response = None
+        if req.failed:
+            # Put all the commands back at the front of the queue
+            self._lock.acquire()
+            self._cmd_queue = cmd_list + self._cmd_queue
+            self._lock.release()
         else:
-            self._unavail_count = self._unavail_count + 1
-
-        return {}
-
-    def ping_asap(self):
-        # Reduce the ping interval to ping the builder right away
-        self._cur_ping_interval = 0
-        self._ping_now = True
+            response = req.response
+        return response
 
-    def _handle_builder_suspend(self, reason, msg):
-        Builder._handle_builder_suspend(self, reason, msg)
-        # Reset current ping interval to default
-        self._cur_ping_interval = self._BUILDER_PING_INTERVAL
-        self._ping_timeout = time.time()
-
-    def _handle_builder_reactivate(self):
-        Builder._handle_builder_reactivate(self)
-        self._ping_timeout = 0
-        self._init_builder(target_list)
+    def _dispatch_command(self, cmd, first_contact):
+        """Dispatch one command.  We let the superclass dispatch the common
+        commands, and handle only those that need action specific to the
+        Passive builder type."""
 
-    def start_job(self, req):
-        if not self.available():
-            raise RuntimeError
-        if not self.can_build_request(req):
-            raise RuntimeError
+        # The first time we contact the builder, we need to tell it to kill
+        # all jobs that are building on it.  So don't let the superclass
+        # handle the first BuildingJobs command we get
+        handled = False
+        if first_contact and isinstance(cmd, Commands.PlgCommandBuildingJobs):
+            # Tell the builder to kill all jobs it might be building
+            # right now.  Think server restart here.
+            for item in cmd.jobs():
+                (uniqid, status) = cmd.get_job(item)
+                cmd = Commands.PlgCommandKillJob(uniqid, self._seq_gen.next())
+                self._cmd_queue.append(cmd)
+            handled = True
 
-        self._server_lock.acquire()
-        try:
-            # Builder will return jobid of 0 if it can't start the job for some reason
-            srpm_url = req.srpm_url()
-            target_dict = req.target_dict()
-            jobid = self._server.start_new_job(target_dict, req.srpm_url())
-        except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, \
-                OpenSSL.SSL.Error, xmlrpclib.ProtocolError, xmlrpclib.Fault), exc:
-            error_class = str(exc.__class__)
-            error_string = str(exc)
-            jobarch = target_dict['arch']
-            parent = req.parent()
-            print "%s (%s/%s): %s exception '%s' starting job on %s" % (parent.uid, \
-                parent.package, jobarch, error_class, error_string, \
-                self._address)
-            # Check for hard errors, for which we suspend the builder
-            if error_string.find("OSError") >= 0 and error_string.find("Errno") >= 0:
-                self._handle_builder_suspend(SUSPEND_HARD_ERROR, error_string)
-            time.sleep(0.5)
-            jobid = 0
+        # Let the superclass handle what's left
+        if not handled:
+            handled = self._dispatch_common_command(cmd)
 
-        if jobid == 0:
-            self._server_lock.release()
-            raise RuntimeError
-
-        job = ArchJob.ArchJob(self, self._server_cfg, self._server, parent, jobid, target_dict)
-        self._jobs[jobid] = job
-        self._update_building_jobs()
-        self._server_lock.release()
-        return job
-
-    def _update_building_jobs(self):
-        jobs = self._building_jobs()
-
-        # Update status for all jobs on this builder
-        if self._unavail_count == 0:
-            builder_jobs = []
-            for jobid in jobs.keys():
-                try:
-                    job = self._jobs[jobid]
-                    status = jobs[jobid]
-                    job.set_builder_status(self, status)
-                    builder_jobs.append(jobid)
-                except KeyError:
-                    pass
-
-            # We have to check jobs that weren't reported
-            # as 'building' by the builder, since the job
-            # may have finished on the builder and was
-            # removed from the building job list before we
-            # were able to know that it was done.  HACK
-            self._prepping_jobs = False
-            for jobid in self._jobs.keys():
-                # If the builder didn't report this job as building,
-                # and its not done, explicitly grab its status
-                job = self._jobs[jobid]
-                if jobid not in builder_jobs and job.status() != 'done':
-                    status = self._get_builder_job_status(jobid)
-                    if status:
-                        job.set_builder_status(self, status)
-
-                # Check for prepping jobs
-                if job.prepping():
-                    self._prepping_jobs = True
-
-    def _get_builder_job_status(self, jobid):
-        """ Get the status of one job on the builder """
-        status = None
-        try:
-            status = self._server.job_status(jobid)
-        except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error):
-            pass
-        except (xmlrpclib.Fault, xmlrpclib.ProtocolError):
-            pass
-        return status
+        # The we handle what the superclass didn't
+        if not handled:
+            if isinstance(cmd, Commands.PlgCommandJobFilesAck):
+                self._handle_job_files_ack(cmd)
+                handled = True
+            elif isinstance(cmd, Commands.PlgCommandTargets):
+                if not self._target_list:
+                    self._target_list = cmd.targets()
+                    
+        if not handled:
+            print "Builder Error (%s): unhandled command '%s'" % (self._address, cmd.name())
 
+    _SLEEP_INTERVAL = 5
     def run(self):
+        """foobar"""
         DebugUtils.registerThreadName(self)
         while not self._stop:
-            self._server_lock.acquire()
-
-            if self._available:
-                self._update_building_jobs()
+            if self._last_contact < time.time() - self._ping_interval:
+                # Ensure the builder's IP is up-to-date
+                self._get_ip()
+
+                # Try to talk to the builder
+                print "builder contact"
+                cmd_list = self._send_commands()
+                if cmd_list:
+                    # Builder is alive
+                    first_contact = False
+                    if not self._available:
+                        self._handle_builder_reactivate()
+                        first_contact = True
+                    self._unavail_count = 0
 
-                if self._unavail_count > 2:
-                    # Kill all jobs on the client if it went away
-                    self._handle_builder_suspend(SUSPEND_TIMEOUT, "the builder timed out")
+                    # process builder's response
+                    cmds = Commands.deserialize_command_stream(cmd_list)
+                    for cmd in cmds:
+                        self._dispatch_command(cmd, first_contact)
                 else:
-                    # Update status of all archjobs on this builder
-                    for j in self._jobs.values():
-                        j.process()
-            elif not self._available and (self._suspend_reason == SUSPEND_TIMEOUT or self._ping_now):
-                # Ping the builder every so often to see if it responds again
-                if time.time() > (self._ping_timeout + self._cur_ping_interval):
-                    (alive, target_list) = self._ping_builder()
-                    if alive:
-                        self._handle_builder_reactivate()
-                    else:
-                        # Wait and ping again
-                        self._ping_timeout = time.time()
+                    # Builder didn't respond
+                    self._lock.acquire()
+                    self._unavail_count = self._unavail_count + 1
+                    if self._available and self._unavail_count > 2:
+                        self._handle_builder_suspend(SUSPEND_TIMEOUT, "the builder timed out")
+                    self._lock.release()
+                self._last_contact = time.time()
 
-                    # Reset current ping interval to default
-                    self._cur_ping_interval = self._BUILDER_PING_INTERVAL
-                    self._ping_now = False
+            time.sleep(self._SLEEP_INTERVAL)
 
-            self._server_lock.release()
+    def _handle_builder_suspend(self, reason, msg):
+        Builder._handle_builder_suspend(self, reason, msg)
+        self._ping_interval = self._BUILDER_UNAVAIL_PING_INTERVAL
+        self._ip = None
+        self._target_list = None
+        # Set free slots to zero so we don't send the
+        # builder a job on first contact
+        self._free_slots = 0
 
-            time.sleep(20)
+        # Clear out the command queue; we start clean
+        self._cmd_queue = []
+
+    def _handle_builder_reactivate(self):
+        mail = True
+        if self._suspend_reason == SUSPEND_NONE:
+            # Don't send mail saying the builder has been reactivated if
+            # this is the first time the builder has contacted us
+            mail = False
+
+        self._ping_interval = self._BUILDER_AVAIL_PING_INTERVAL
+        self._lock.acquire()
+        Builder._handle_builder_reactivate(self, mail=mail)
+        self._lock.release()
 
 
 class ActiveBuilder(Builder):
@@ -619,9 +568,8 @@
 
     _REQUIRED_CONTACT_INTERVAL = 25
 
-    def __init__(self, manager, cfg, address, weight, btype):
-        Builder.__init__(self, manager, cfg, address, weight, btype)
-        self._last_contact = 0
+    def __init__(self, manager, cfg, address, weight):
+        Builder.__init__(self, manager, cfg, address, weight, TYPE_ACTIVE)
 
     def _init_builder(self, target_list):
         self._target_list = target_list


Index: BuilderManager.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuilderManager.py,v
retrieving revision 1.25
retrieving revision 1.26
diff -u -r1.25 -r1.26
--- BuilderManager.py	14 May 2006 05:43:07 -0000	1.25
+++ BuilderManager.py	16 May 2006 15:49:56 -0000	1.26
@@ -280,9 +280,9 @@
 
             # Add the builder to our build list
             if btype == Builder.TYPE_ACTIVE:
-                builder = Builder.ActiveBuilder(self, self._cfg, address, weight, btype)
+                builder = Builder.ActiveBuilder(self, self._cfg, address, weight)
             else:
-                builder = Builder.PassiveBuilder(self, self._cfg, address, weight, btype)
+                builder = Builder.PassiveBuilder(self, self._cfg, address, weight)
             builder.start()
             self._builders.append(builder)
         self._builders_lock.release()




More information about the scm-commits mailing list