extras-buildsys/server BuildMaster.py, 1.39, 1.40 Builder.py, 1.33, 1.34 BuilderManager.py, 1.21, 1.22 Config.py, 1.16, 1.17 main.py, 1.20, 1.21
Daniel Williams (dcbw)
fedora-extras-commits at redhat.com
Fri Apr 28 03:17:43 UTC 2006
- Previous message: extras-buildsys/common Commands.py, NONE, 1.1 HTTPServer.py, 1.12, 1.13 Makefile, 1.11, 1.12 URLopener.py, 1.1, 1.2
- Next message: owners owners.list,1.910,1.911
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Author: dcbw
Update of /cvs/fedora/extras-buildsys/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv5292/server
Modified Files:
BuildMaster.py Builder.py BuilderManager.py Config.py main.py
Log Message:
2006-04-27 Dan Williams <dcbw at redhat.com>
Commit partial rework of builder<->server communication.
Index: BuildMaster.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuildMaster.py,v
retrieving revision 1.39
retrieving revision 1.40
diff -u -r1.39 -r1.40
--- BuildMaster.py 24 Mar 2006 19:13:41 -0000 1.39
+++ BuildMaster.py 28 Apr 2006 03:17:41 -0000 1.40
@@ -375,9 +375,6 @@
have_work = True
self._archjob_status_updates_lock.release()
- if not have_work and self.builder_manager.have_work(self._paused):
- have_work = True
-
return have_work
def get_job(self, uid):
@@ -401,9 +398,6 @@
# Write update status for jobs to the database
self._save_job_status()
- if self.builder_manager.have_work(self._paused):
- self.builder_manager.process(self._paused)
-
# Clean up jobs that have finished
self._process_finished_jobs()
Index: Builder.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/Builder.py,v
retrieving revision 1.33
retrieving revision 1.34
diff -u -r1.33 -r1.34
--- Builder.py 20 Mar 2006 12:43:21 -0000 1.33
+++ Builder.py 28 Apr 2006 03:17:41 -0000 1.34
@@ -22,6 +22,7 @@
import os
import urllib
import threading
+from plague import Commands
from plague import XMLRPCServerProxy
from plague import CommonErrors
import OpenSSL
@@ -34,6 +35,152 @@
SUSPEND_TIMEOUT = 'timeout'
SUSPEND_HARD_ERROR = 'hard-error'
+TYPE_PASSIVE = 1
+TYPE_ACTIVE = 2
+
+
+class Builder(threading.Thread):
+ """ Tracks all jobs on a builder instance """
+
+ def __init__(self, manager, cfg, address, weight, btype):
+ self._manager = manager
+ self._jobs = {}
+ self._free_slots = 0
+ self._num_slots = 0
+ self._address = address
+ self._available = False
+ self._suspend_reason = SUSPEND_NONE
+ self._stop = False
+ self._prepping_jobs = False
+ self._unavail_count = 0
+ self._target_list = []
+ self._when_died = 0
+ self._server_cfg = cfg
+ self._weight = weight
+ self._type = btype
+ self._seq_gen = Commands.SequenceGenerator()
+
+ try:
+ type, rest = urllib.splittype(address)
+ host, ignore = urllib.splithost(rest)
+ host, port = urllib.splitport(host)
+ self._ip = socket.gethostbyname(host)
+ except Exception, e:
+ print "Builder Error(%s): couldn't lookup builder's IP address." % address
+ raise Exception(e)
+
+ threading.Thread.__init__(self)
+ self.setName("Builder: %s" % address)
+
+ def _match_target_dict(self, td1, td2):
+ if td1['distro'] == td2['distro']:
+ if td1['target'] == td2['target']:
+ if td1['repo'] == td2['repo']:
+ return True
+ return False
+
+ def arches(self, target_dict):
+ for td in self._target_list:
+ if self._match_target_dict(td, target_dict):
+ arches = []
+ for arch in td['supported_arches']:
+ if not arch in arches:
+ arches.append(arch)
+ return arches
+ return None
+
+ def can_build_for_target(self, target_dict):
+ for td in self._target_list:
+ if self._match_target_dict(td, target_dict):
+ if target_dict['arch'] in td['supported_arches']:
+ return True
+ return False
+
+ def address(self):
+ return (self._ip, self._address)
+
+ def available(self):
+ """ Is the builder responding to requests? """
+ return self._available
+
+ def free_slots(self):
+ return self._free_slots
+
+ def weight(self):
+ return self._weight
+
+ def type(self):
+ return self._type
+
+ def stop(self):
+ self._stop = True
+
+ def _handle_builder_suspend(self, reason, msg):
+ for jobid in self._jobs.keys():
+ job = self._jobs[jobid]
+ job.builder_gone()
+ del self._jobs[jobid]
+ self._jobs = {}
+ self._available = False
+ self._suspend_reason = reason
+ self._unavail_count = 0
+ self._prepping_jobs = False
+ self._when_died = time.time()
+
+ # Notify admins
+ print "Suspending builder '%s'. Reason: %s - %s." % (self._address, reason, msg)
+ subject = "Builder Suspended: %s" % self._address
+ msg = "The builder '%s' was suspended. Reason: %s - %s." % (self._address, reason, msg)
+ sender = self._server_cfg.get_str("Email", "email_from")
+ for addr in self._server_cfg.get_list("Email", "admin_emails"):
+ EmailUtils.email_result(sender, addr, msg, subject)
+
+ def _handle_builder_reactivate(self, mail=False):
+ self._available = True
+ self._suspend_reason = SUSPEND_NONE
+
+ print "Re-activating builder '%s'." % self._address
+
+ if mail:
+ subject = "Builder Re-activated: %s" % self._address
+ msg = """The builder '%s' was re-activated.
+
+ Suspended at: %s
+ Re-Enabled at: %s
+""" % (self._address, time.ctime(self._when_died), time.ctime(time.time()))
+ sender = self._server_cfg.get_str("Email", "email_from")
+ for addr in self._server_cfg.get_list("Email", "admin_emails"):
+ EmailUtils.email_result(sender, addr, msg, subject)
+ self._when_died = 0
+
+ def any_prepping_jobs(self):
+ return self._prepping_jobs
+
+ def to_dict(self):
+ builder_dict = {}
+
+ addr = self._address
+ # for some reason, splithost doesn't like the protocol
+ # method, you have to give it a string starting with "//"
+ if addr.startswith("http"):
+ idx = addr.find('//')
+ addr = addr[idx:]
+ host_port, path = urllib.splithost(addr)
+ host, port = urllib.splitport(host_port)
+ builder_dict['address'] = host
+
+ arches = []
+ for td in self._target_list:
+ for arch in td['supported_arches']:
+ if not arch in arches:
+ arches.append(arch)
+ builder_dict['arches'] = arches
+
+ builder_dict['available'] = self._available
+ builder_dict['num_slots'] = self._num_slots
+ builder_dict['free_slots'] = self._free_slots
+ return builder_dict
+
# HACK: This class is a hack to work around SSL hanging issues,
# which cause the whole server to grind to a halt
class BuildingJobsCheck(threading.Thread):
@@ -61,31 +208,21 @@
self.free_slots = free_slots
self.done = True
-
-class Builder(threading.Thread):
- """ Tracks all jobs on a builder instance """
+class PassiveBuilder(Builder):
+ """
+ Passive builders are ones that do not initiate connections. They
+ wait for the server to contact them, and therefore cannot be behind
+ a firewall without having holes punched through it.
+ """
_BUILDER_PING_INTERVAL = 60 * 5 # In seconds
- def __init__(self, manager, cfg, address, weight):
- self._cur_jobid = None
- self._manager = manager
- self._jobs = {}
- self._free_slots = 0
- self._num_slots = 0
- self._address = address
- self._alive = True
- self._suspend_reason = SUSPEND_NONE
- self._stop = False
- self._prepping_jobs = False
- self._unavail_count = 0
- self._target_list = []
+ def __init__(self, manager, cfg, address, weight, btype):
+ Builder.__init__(self, manager, cfg, address, weight, btype)
+
self._ping_timeout = 0
self._cur_ping_interval = self._BUILDER_PING_INTERVAL
self._ping_now = False
- self._when_died = 0
- self._server_cfg = cfg
- self._weight = weight
certs = None
if self._server_cfg.get_bool("Builders", "use_ssl"):
@@ -97,17 +234,26 @@
self._server = XMLRPCServerProxy.PlgXMLRPCServerProxy(self._address, certs, timeout=20)
self._server_lock = threading.Lock()
- threading.Thread.__init__(self)
- self.setName("Builder: %s" % address)
-
- (self._alive, target_list) = self._ping_builder()
- if self._alive:
+ (self._available, target_list) = self._ping_builder()
+ if self._available:
self._init_builder(target_list)
else:
# Treat the builder as timed out and ping it periodically
self._ping_timeout = time.time()
self._suspend_reason = SUSPEND_TIMEOUT
+ def _ping_builder(self):
+ target_list = []
+ try:
+ target_list = self._server.supported_targets()
+ alive = True
+ except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error, xmlrpclib.ProtocolError):
+ alive = False
+ except xmlrpclib.Fault, e:
+ print "Builder Error (%s) in _ping_builder(): builder replied '%s'" % (self._address, e)
+ alive = False
+ return (alive, target_list)
+
def _init_builder(self, target_list):
self._target_list = target_list
@@ -146,7 +292,7 @@
if bjc.done:
if not bjc.failed:
self._unavail_count = 0
- self._alive = True
+ self._available = True
self._free_slots = bjc.free_slots
return bjc.jobs
else:
@@ -157,55 +303,22 @@
return {}
- def _ping_builder(self):
- target_list = []
- try:
- target_list = self._server.supported_targets()
- alive = True
- except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error, xmlrpclib.ProtocolError):
- alive = False
- except xmlrpclib.Fault, e:
- print "Builder Error (%s) in _ping_builder(): builder replied '%s'" % (self._address, e)
- alive = False
- return (alive, target_list)
-
- def _match_target_dict(self, td1, td2):
- if td1['distro'] == td2['distro']:
- if td1['target'] == td2['target']:
- if td1['repo'] == td2['repo']:
- return True
- return False
-
- def arches(self, target_dict):
- for td in self._target_list:
- if self._match_target_dict(td, target_dict):
- arches = []
- for arch in td['supported_arches']:
- if not arch in arches:
- arches.append(arch)
- return arches
- return None
-
- def can_build_for_target(self, target_dict):
- for td in self._target_list:
- if self._match_target_dict(td, target_dict):
- if target_dict['arch'] in td['supported_arches']:
- return True
- return False
-
- def address(self):
- return self._address
+ def ping_asap(self):
+ # Reduce the ping interval to ping the builder right away
+ self._cur_ping_interval = 0
+ self._ping_now = True
- def alive(self):
- """ Is the builder responding to requests? """
- return self._alive
+ def _handle_builder_suspend(self, reason, msg):
+ Builder._handle_builder_suspend(self, reason, msg)
+ # Reset current ping interval to default
+ self._cur_ping_interval = self._BUILDER_PING_INTERVAL
+ self._ping_timeout = time.time()
- def free_slots(self):
- return self._free_slots
+ def _handle_builder_reactivate(self):
+ Builder._handle_builder_reactivate(self)
+ self._ping_timeout = 0
+ self._init_builder(target_list)
- def weight(self):
- return self._weight
-
def start_job(self, par_job, target_dict, srpm_url):
if not self.available():
raise RuntimeError
@@ -285,63 +398,12 @@
pass
return status
- def stop(self):
- self._stop = True
-
- def ping_asap(self):
- # Reduce the ping interval to ping the builder right away
- self._cur_ping_interval = 0
- self._ping_now = True
-
- def _handle_builder_suspend(self, reason, msg):
- for jobid in self._jobs.keys():
- job = self._jobs[jobid]
- job.builder_gone()
- del self._jobs[jobid]
- self._jobs = {}
- self._alive = False
- self._suspend_reason = reason
- self._unavail_count = 0
- self._prepping_jobs = False
- self._when_died = time.time()
-
- # Reset current ping interval to default
- self._cur_ping_interval = self._BUILDER_PING_INTERVAL
- self._ping_timeout = time.time()
-
- # Notify admins
- print "Suspending builder '%s'. Reason: %s - %s." % (self._address, reason, msg)
- subject = "Builder Suspended: %s" % self._address
- msg = "The builder '%s' was suspended. Reason: %s - %s." % (self._address, reason, msg)
- sender = self._server_cfg.get_str("Email", "email_from")
- for addr in self._server_cfg.get_list("Email", "admin_emails"):
- EmailUtils.email_result(sender, addr, msg, subject)
-
- def _handle_builder_reactivate(self, target_list):
- self._alive = True
- self._suspend_reason = SUSPEND_NONE
- self._ping_timeout = 0
-
- self._init_builder(target_list)
-
- print "Re-activating builder '%s'." % self._address
- subject = "Builder Re-activated: %s" % self._address
- msg = """The builder '%s' was re-activated.
-
- Suspended at: %s
- Re-Enabled at: %s
-""" % (self._address, time.ctime(self._when_died), time.ctime(time.time()))
- sender = self._server_cfg.get_str("Email", "email_from")
- for addr in self._server_cfg.get_list("Email", "admin_emails"):
- EmailUtils.email_result(sender, addr, msg, subject)
- self._when_died = 0
-
def run(self):
DebugUtils.registerThreadName(self)
while not self._stop:
self._server_lock.acquire()
- if self._alive:
+ if self._available:
self._update_building_jobs()
if self._unavail_count > 2:
@@ -351,12 +413,12 @@
# Update status of all archjobs on this builder
for j in self._jobs.values():
j.process()
- elif not self._alive and (self._suspend_reason == SUSPEND_TIMEOUT or self._ping_now):
+ elif not self._available and (self._suspend_reason == SUSPEND_TIMEOUT or self._ping_now):
# Ping the builder every so often to see if it responds again
if time.time() > (self._ping_timeout + self._cur_ping_interval):
(alive, target_list) = self._ping_builder()
if alive:
- self._handle_builder_reactivate(target_list)
+ self._handle_builder_reactivate()
else:
# Wait and ping again
self._ping_timeout = time.time()
@@ -368,40 +430,131 @@
self._server_lock.release()
time.sleep(20)
-
- def available(self):
- """
- Can the builder start a new job right now?
- """
- if self._unavail_count > 2 or not self._alive or self.free_slots() <= 0:
- return False
- return True
- def any_prepping_jobs(self):
- return self._prepping_jobs
- def to_dict(self):
- builder_dict = {}
+class ActiveBuilder(Builder):
+ """
+ Active builders are ones which attempt to contact the build server
+ by themselves. Therefore, they can be behind a firewall without
+ punching holes through it.
+ """
+
+ _REQUIRED_CONTACT_INTERVAL = 20
+
+ def __init__(self, manager, cfg, address, weight, btype):
+ Builder.__init__(self, manager, cfg, address, weight, btype)
+ self._last_contact = 0
+ self._lock = threading.Lock()
+ self._cmd_queue = []
- addr = self._address
- # for some reason, splithost doesn't like the protocol
- # method, you have to give it a string starting with "//"
- if addr.startswith("http"):
- idx = addr.find('//')
- addr = addr[idx:]
- host_port, path = urllib.splithost(addr)
- host, port = urllib.splitport(host_port)
- builder_dict['address'] = host
+ def _init_builder(self, target_list):
+ self._target_list = target_list
- arches = []
- for td in self._target_list:
- for arch in td['supported_arches']:
- if not arch in arches:
- arches.append(arch)
- builder_dict['arches'] = arches
+ def _handle_new_job_ack(self, ack):
+ """Handle a NewJobAck command by finding the original command
+ sent to the builder, removing it from the command queue, and notifying
+ the parent job that this archjob is now in progress."""
+
+ old_cmd = None
+ self._lock.acquire()
+ for old_cmd in self._cmd_queue:
+ if old_cmd.seq() == ack.acked_seq() and isinstance(old_cmd, Commands.PlgCommandNewJobReq):
+ self._cmd_queue.remove(old_cmd)
+ break
+ self._lock.release()
- builder_dict['alive'] = self._alive
- builder_dict['num_slots'] = self._num_slots
- builder_dict['free_slots'] = self._free_slots
- return builder_dict
+ if old_cmd:
+ parent = old_cmd.parent_job()
+ archjob = ArchJob.ArchJob(self, parent, ack.archjob_id(), old_cmd.target_dict())
+ self._jobs[jobid] = archjob
+ parent.add_arch_job(archjob)
+
+ def _dispatch_command(self, cmd):
+ name = cmd.name()
+ if isinstance(cmd, Commands.PlgCommandSlots):
+ self._lock.acquire()
+ self._free_slots = cmd.free_slots()
+ self._num_slots = cmd.max_slots()
+ self._lock.release()
+ elif isinstance(cmd, Commands.PlgCommandTargets):
+ self._lock.acquire()
+ self._target_list = cmd.targets()
+ self._lock.release()
+ elif isinstance(cmd, Commands.PlgCommandNewJobAck):
+ self._handle_new_job_ack(cmd)
+ else:
+ print "Builder Error (%s): unhandled command '%s'" % (self._address, cmd.name())
+
+ def request(self, cmd_list):
+ """Process and respond to an active builder's request. Called
+ from the BuildMaster's XML-RPC server."""
+
+ self._last_contact = time.time()
+ if not self._available:
+ self._handle_builder_reactivate(cmd_list)
+
+ # Process the commands the builder sent us
+ for cmd in cmd_list:
+ self._dispatch_command(cmd)
+
+ # Grab some work for the builder if any is available
+ new_cmds = []
+ if self._free_slots > 0:
+ req = self._manager.claim_arch_job(self)
+ if req:
+ next_seq = self._seq_gen.next()
+ cmd = Commands.PlgCommandNewJobReq(req['parent'], req['target_dict'], req['srpm_url'], next_seq)
+ new_cmds.append(cmd)
+
+ self._lock.acquire()
+ # Copy command queue
+ self._cmd_queue = self._cmd_queue + new_cmds
+ cmd_list = self._cmd_queue[:]
+ self._lock.release()
+ return cmd_list
+
+ _SLEEP_INTERVAL = 10
+ def run(self):
+ """Main builder loop. Since the builder contacts us,
+ we don't have to do much here except handle builders
+ going away."""
+ DebugUtils.registerThreadName(self)
+ while not self._stop:
+ if not self._available:
+ time.sleep(self._SLEEP_INTERVAL)
+ continue
+
+ self._lock.acquire()
+ if self._unavail_count > 2:
+ self._handle_builder_suspend(SUSPEND_TIMEOUT, "the builder timed out")
+ elif self._last_contact + self._REQUIRED_CONTACT_INTERVAL < time.time():
+ self._unavail_count = self._unavail_count + 1
+ self._lock.release()
+
+ time.sleep(self._SLEEP_INTERVAL)
+
+ def _handle_builder_suspend(self, reason, msg):
+ Builder._handle_builder_suspend(self, reason, msg)
+ self._last_contact = 0
+
+ def _handle_builder_reactivate(self, cmd_list):
+ # Grab an updated target list from the command stream when
+ # the builder contacts us
+ target_list = None
+ for cmd in cmd_list:
+ if isinstance(cmd, Commands.PlgCommandTargets):
+ target_list = cmd.targets()
+ if not target_list:
+ target_list = self._target_list
+
+ mail = True
+ if self._suspend_reason == SUSPEND_NONE:
+ # Don't send mail saying the builder has been reactivated if
+ # this is the first time the builder has contacted us
+ mail = False
+
+ self._lock.acquire()
+ Builder._handle_builder_reactivate(self, mail=mail)
+ self._init_builder(target_list)
+ self._lock.release()
Index: BuilderManager.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuilderManager.py,v
retrieving revision 1.21
retrieving revision 1.22
diff -u -r1.21 -r1.22
--- BuilderManager.py 20 Mar 2006 20:05:11 -0000 1.21
+++ BuilderManager.py 28 Apr 2006 03:17:41 -0000 1.22
@@ -24,91 +24,230 @@
import Builder
import EmailUtils
import Config
+import time
+from plague import DebugUtils
+from plague import AuthedXMLRPCServer
+from plague import HTTPServer
+from plague import Commands
+
+
+class AddrCache(object):
+ def __init__(self):
+ self._cache = {}
+
+ def get(self, name):
+ # Expire cache entry if one exists and is old
+ time = ip = None
+ try:
+ (time, ip) = self._cache[name]
+ if time < time.time() - (60 * 60):
+ del self._cache[name]
+ time = ip = None
+ except KeyError:
+ pass
+
+ # Do a lookup and cache it
+ if not ip:
+ try:
+ ip = socket.gethostbyname(name)
+ self._cache[name] = (time.time(), ip)
+ except:
+ pass
+
+ return ip
+
+class AuthedSSLBuilderServer(AuthedXMLRPCServer.AuthedSSLXMLRPCServer):
+ """ SSL XMLRPC server that authenticates builders based on their certificate. """
+ def __init__(self, address, certs, builder_manager):
+ AuthedXMLRPCServer.AuthedSSLXMLRPCServer.__init__(self, address, self.auth_cb, certs)
+ self.authenticator = builder_manager
+ self._addr_cache = AddrCache()
+
+ def auth_cb(self, request, con_addr_pair):
+ peer_cert = request.get_peer_certificate()
+ cert_address = peer_cert.get_subject().commonName
+ try:
+ (con_address, con_port) = con_addr_pair
+ cert_ip = self._addr_cache.get(cert_address)
+ con_ip = self._addr_cache.get(con_address)
+ builder = self.authenticator.get_builder(cert_ip, con_ip)
+ if builder.type() is not Builder.TYPE_ACTIVE:
+ builder = None
+ except Exception:
+ builder = None
+ return builder
+
+class AuthedBuilderServer(AuthedXMLRPCServer.AuthedXMLRPCServer):
+ """ Authenticates builders based on their IP address. """
+ def __init__(self, address, builder_manager):
+ AuthedXMLRPCServer.AuthedXMLRPCServer.__init__(self, address, self.auth_cb)
+ self.authenticator = builder_manager
+ self._addr_cache = AddrCache()
+
+ def auth_cb(self, request, con_addr_pair):
+ try:
+ (con_address, con_port) = con_addr_pair
+ ip = self._addr_cache.get(con_address)
+ builder = self.authenticator.get_builder(ip, ip)
+ if builder.type() is not Builder.TYPE_ACTIVE:
+ builder = None
+ except Exception:
+ builder = None
+ return builder
+
+
+class BuilderDispatcher(object):
+ def request(self, cmd_list):
+ # Authorize the builder, then pass the request
+ # to the correct builder object
+
+ builder = AuthedXMLRPCServer.get_authinfo()
+ if not builder:
+ cmd = Commands.PlgCommandError("Builder is not authorized")
+ return [cmd.serialize()]
+
+ cmds = Commands.deserialize_command_stream(cmd_list)
+ cmds_for_server = builder.request(cmds)
+ cmd_stream = Commands.serialize_to_command_stream(cmds_for_server)
+ return cmd_stream
-class BuilderManager:
+class BuilderServerThread(threading.Thread):
"""
- Tracks individual builder instances.
+ Object to serve active builder requests in a separate thread.
+ Can't block the main BuilderManager object by sitting in
+ serve_forever().
"""
+ def __init__(self, cfg, bm):
+ self._cfg = cfg
+ self._bm = bm
+ self._stopped = False
+ threading.Thread.__init__(self)
+ self.setName("BuilderServerThread")
+
+ hostname = cfg.get_str("General", "hostname")
+ port = cfg.get_int("Active Builders", "xmlrpc_server_port")
+ if cfg.get_bool("Builders", "use_ssl") == True:
+ certs = {}
+ certs['key_and_cert'] = cfg.get_str("SSL", "server_key_and_cert")
+ certs['ca_cert'] = cfg.get_str("SSL", "ca_cert")
+ certs['peer_ca_cert'] = cfg.get_str("SSL", "ca_cert")
+ self._server = AuthedSSLBuilderServer((hostname, port), certs, self._bm)
+ else:
+ self._server = AuthedBuilderServer((hostname, port), self._bm)
+ self._dispatcher = BuilderDispatcher()
+ self._server.register_instance(self._dispatcher)
+
+ def run(self):
+ DebugUtils.registerThreadName(self)
+ self._server.serve_forever()
+ self._stopped = True
+
+ def stop(self):
+ self._server.stop()
+ t = time.time()
+ while not self._stopped:
+ try:
+ if time.time() > t + 2:
+ break
+ except KeyboardInterrupt:
+ pass
+
+
+class BuilderManager:
+ """ Tracks individual builder instances. """
def __init__(self, cfg):
self._cfg = cfg
self._builders_lock = threading.Lock()
self._builders = []
- self.add_new_builders()
+ any_active = self._load_builders()
+ self._print_builders()
+
+ self._queue_lock = threading.Lock()
+ self._queue = []
+
+ self._xmlrpc_server = None
+ if any_active:
+ # Builder XMLRPC server
+ # Only start it when there are active-type builders listed
+ # in the config file
+ self._xmlrpc_server = BuilderServerThread(cfg, self)
+ self._xmlrpc_server.start()
+
+ # Builder HTTP fileserver
+ hostname = cfg.get_str("General", "hostname")
+ port = cfg.get_int("Active Builders", "file_server_port")
+ http_dir = os.path.join(cfg.get_str("Directories", "server_work_dir"), "srpm_http_dir")
+ certs = {}
+ if cfg.get_bool("Builders", "use_ssl"):
+ certs['key_and_cert'] = cfg.get_str("SSL", "server_key_and_cert")
+ certs['ca_cert'] = cfg.get_str("SSL", "ca_cert")
+ certs['peer_ca_cert'] = cfg.get_str("SSL", "ca_cert")
+ self._srpm_server = HTTPServer.PlgHTTPServerManager((hostname, port), http_dir, certs)
+ self._srpm_server.start()
+ def _print_builders(self):
# Print out builder list when starting up
- print "\nBuilders:"
+ print "\nAuthorized Builders:"
print "-" * 90
for builder in self._builders:
- string = " " + builder.address()
- string = string + " " * (40 - len(builder.address()))
+ (ip, addr) = builder.address()
+ string = " " + addr
+ string = string + " " * (40 - len(addr))
builder_dict = builder.to_dict()
for arch in builder_dict['arches']:
string = string + arch + " "
string = string + " " * (80 - len(string))
status = "unavailable"
- if builder_dict['alive']:
- status = "alive"
+ if builder_dict['available']:
+ status = "available"
string = string + status
del builder_dict
print string
print ""
- self._queue_lock = threading.Lock()
- self._queue = []
-
- self._have_work = False
-
- def __del__(self):
+ def stop(self):
for builder in self._builders:
builder.stop()
- time.sleep(2)
+ if self._xmlrpc_server:
+ self._xmlrpc_server.stop()
+ self._srpm_server.stop()
- def set_build_master(self, build_master):
- self._build_master = build_master
-
- def add_new_builders(self):
+ def _load_builders(self):
self._builders_lock.acquire()
-
- tmp_list = self._cfg.builders()
- prefix = "http://"
- if self._cfg.get_bool("Builders", "use_ssl") == True:
- prefix = "https://"
-
- builder_list = {}
- for addr in tmp_list.keys():
- new_addr = addr
- # Rewrite addresses to match current builder connection method
- if addr.startswith("http://"):
- new_addr = addr[7:]
- elif addr.startswith("https://"):
- new_addr = addr[8:]
- if new_addr:
- builder_list[prefix + new_addr] = tmp_list[addr]
-
- for address in builder_list.keys():
+ any_active = False
+ builders = self._cfg.builders()
+ for address in builders:
+ (weight, btype) = builders[address]
+ if btype == Builder.TYPE_ACTIVE:
+ any_active = True
# If the address is already in our _builders list, skip it
skip = False
for builder in self._builders:
- if address == builder.address():
+ (ip, addr) = builder.address()
+ if address == addr:
skip = True
+ break
if skip == True:
continue
# Add the builder to our build list
- weight = builder_list[address]
- builder = Builder.Builder(self, self._cfg, address, weight)
+ if btype == Builder.TYPE_ACTIVE:
+ builder = Builder.ActiveBuilder(self, self._cfg, address, weight, btype)
+ else:
+ builder = Builder.PassiveBuilder(self, self._cfg, address, weight, btype)
builder.start()
self._builders.append(builder)
-
self._builders_lock.release()
+ return any_active
def ping_suspended_builders(self):
self._builders_lock.acquire()
for builder in self._builders:
- if not builder.alive():
+ passive = (builder.type() == Builder.TYPE_PASSIVE)
+ if passive and not builder.alive():
builder.ping_asap()
self._builders_lock.release()
@@ -118,14 +257,22 @@
builder_list.append(builder.to_dict())
return builder_list
- def have_work(self, paused):
- avail = False
- for builder in self._builders:
- if builder.available():
- avail = True
- if not paused and len(self._queue) > 0 and avail:
- return True
- return self._have_work
+ def get_builder(self, cert_ip, con_ip):
+ self._builders_lock.acquire()
+ builder = None
+
+ # Ensure builder's certificate (if SSL) and
+ # the remote address of its connection are the same
+ if cert_ip == con_ip:
+ # Find matching builder in our authorized builders list
+ for b in self._builders:
+ (ip, addr) = b.address()
+ if cert_ip == ip:
+ builder = b
+ break
+
+ self._builders_lock.release()
+ return builder
def _builder_cmp_func(self, builder1, builder2):
# If both builders have at least one free slot, sort on
@@ -150,62 +297,23 @@
return -1
return 1
- def process(self, paused):
- self._have_work = False
-
- # Don't queue any new jobs if we are paused
- if paused:
- return
-
- # Deal with new arch jobs
+ def claim_arch_job(self, builder):
+ archjob = None
self._queue_lock.acquire()
- new_jobs = {}
for req in self._queue:
- parent = req['parent']
- stage = parent.cur_stage()
- if stage != 'building' and stage != 'waiting':
- self._queue.remove(req)
- continue
-
- # Find all free builders that could satisfy the request
- possible_builders = []
- for builder in self._builders:
- if builder.available() and builder.can_build_for_target(req['target_dict']):
- possible_builders.append(builder)
-
- # Sort builder list by free slots and weights
- possible_builders.sort(self._builder_cmp_func)
- possible_builders.reverse()
-
- for builder in possible_builders:
- try:
- job = builder.start_job(parent, req['target_dict'], req['srpm_url'])
- except RuntimeError, e:
- print "Builder (%s) couldn't start job %s because: '%s'" % (builder.address(),
- req['target_dict'], e)
- continue
-
- if not new_jobs.has_key(parent):
- new_jobs[parent] = []
- new_jobs[parent].append(job)
+ if builder.can_build_for_target(req['target_dict']):
self._queue.remove(req)
+ archjob = req
break
self._queue_lock.release()
-
- # Notify the parent jobs of their new archjobs. Have to do this outside _queue_lock
- # for locking reasons
- for parent in new_jobs.keys():
- for job in new_jobs[parent]:
- parent.add_arch_job(job)
-
- if len(self._queue) > 0:
- time.sleep(0.25)
+ return archjob
def request_arch_job(self, par_job, target_dict, srpm_url, orphaned):
req = {}
req['parent'] = par_job
req['target_dict'] = target_dict
req['srpm_url'] = srpm_url
+ req['time_queued'] = time.time()
self._queue_lock.acquire()
if orphaned:
Index: Config.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/Config.py,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -r1.16 -r1.17
--- Config.py 20 Mar 2006 18:28:37 -0000 1.16
+++ Config.py 28 Apr 2006 03:17:41 -0000 1.17
@@ -18,6 +18,7 @@
import fnmatch
from ConfigParser import ConfigParser
from plague import BaseConfig
+import Builder
def make_target_string(distro, target, repo):
@@ -97,11 +98,21 @@
except InvalidTargetException, e:
print "Error: could not add target %s because: %s" % (f, e)
- def _load_builders(self):
- if not self._config.has_section("Builders"):
+ def _get_builders_of_type(self, btype):
+ if btype == Builder.TYPE_PASSIVE:
+ section = "Passive Builders"
+ elif btype == Builder.TYPE_ACTIVE:
+ section = "Active Builders"
+ else:
+ raise Exception("Unknown builder type %d" % btype)
+
+ if not self._config.has_section(section):
return {}
- items = self._config.items("Builders")
- builder_list = {}
+ prefix = "http://"
+ if self.get_bool("Builders", "use_ssl") == True:
+ prefix = "https://"
+ items = self._config.items(section)
+ list = {}
for (tag, builder) in items:
if not tag.startswith("builder"):
continue
@@ -116,8 +127,26 @@
weight = int(weight_str)
except ValueError:
weight = 0
- builder_list[addr] = weight
- return builder_list
+ # Rewrite addresses to match current builder connection method
+ new_addr = addr
+ if addr.startswith("http://"):
+ new_addr = addr[7:]
+ elif addr.startswith("https://"):
+ new_addr = addr[8:]
+ if new_addr:
+ list[prefix + new_addr] = (weight, btype)
+ return list
+
+ def _load_builders(self):
+ passive_builders = self._get_builders_of_type(Builder.TYPE_PASSIVE)
+ active_builders = self._get_builders_of_type(Builder.TYPE_ACTIVE)
+
+ builders = {}
+ for key in active_builders.keys():
+ builders[key] = active_builders[key]
+ for key in passive_builders.keys():
+ builders[key] = passive_builders[key]
+ return builders
def save_default_config(self, filename=None):
self.add_section("General")
@@ -139,8 +168,15 @@
self.add_section("Builders")
self.set_option("Builders", "use_ssl", "yes")
- self.set_option("Builders", "builder1", "20 127.0.0.1:8888")
- self.set_option("Builders", "builder2", "0 127.0.0.2:8888")
+
+ self.add_section("Active Builders")
+ self.set_option("Active Builders", "xmlrpc_server_port", "8889")
+ self.set_option("Active Builders", "file_server_port", "8890")
+ self.set_option("Active Builders", "builder1", "20 127.0.0.1")
+ self.set_option("Active Builders", "builder2", "0 127.0.0.2")
+
+ self.add_section("Passive Builders")
+ self.set_option("Passive Builders", "builder1", "20 127.0.0.1:8888")
self.add_section("SSL")
self.set_option("SSL", "server_key_and_cert", "/etc/plague/server/certs/server_key_and_cert.pem")
@@ -152,7 +188,7 @@
self.add_section("UI")
self.set_option("UI", "use_ssl", "yes")
self.set_option("UI", "client_ca_cert", "/etc/plague/server/certs/ui_ca_cert.pem")
- self.set_option("UI", "port", "8887")
+ self.set_option("UI", "port", "8888")
self.set_option("UI", "guest_allowed", "yes")
self.set_option("UI", "log_url", "http://www.foo.com/logs/")
Index: main.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/main.py,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -r1.20 -r1.21
--- main.py 13 Mar 2006 13:05:38 -0000 1.20
+++ main.py 28 Apr 2006 03:17:41 -0000 1.21
@@ -124,9 +124,8 @@
dbm = DBManager.DBManager(cfg)
- builder_manager = BuilderManager.BuilderManager(cfg)
-
# Create the BuildMaster thread
+ builder_manager = BuilderManager.BuilderManager(cfg)
bm = BuildMaster.BuildMaster(builder_manager, dbm, cfg)
bm.start()
@@ -148,20 +147,8 @@
if e[0] == 98: # Address already in use
print "Error: couldn't bind to address '%s:%s'. Is the server already running?" % (hostname, port)
os._exit(1)
-
bm_server.register_instance(ui)
- # SRPM fileserver
- SRPM_SERVER_PORT = 8886
- http_dir = os.path.join(cfg.get_str("Directories", "server_work_dir"), "srpm_http_dir")
- srpm_server_certs = {}
- if cfg.get_bool("Builders", "use_ssl"):
- srpm_server_certs['key_and_cert'] = cfg.get_str("SSL", "server_key_and_cert")
- srpm_server_certs['ca_cert'] = cfg.get_str("SSL", "ca_cert")
- srpm_server_certs['peer_ca_cert'] = cfg.get_str("SSL", "ca_cert")
- srpm_server = HTTPServer.PlgHTTPServerManager((hostname, SRPM_SERVER_PORT), http_dir, srpm_server_certs)
- srpm_server.start()
-
# Create dummy thread just to register main thread's name
dummy = threading.Thread()
dummy.setName("MainThread")
@@ -182,9 +169,9 @@
# Make sure the BuildMaster thread shuts down
print "Shutting down..."
bm.stop()
- srpm_server.stop()
if use_tbs:
tbs.stop()
+ builder_manager.stop()
if opts.pidfile:
os.unlink(opts.pidfile)
- Previous message: extras-buildsys/common Commands.py, NONE, 1.1 HTTPServer.py, 1.12, 1.13 Makefile, 1.11, 1.12 URLopener.py, 1.1, 1.2
- Next message: owners owners.list,1.910,1.911
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the scm-commits mailing list