extras-buildsys/server ArchJob.py, 1.21, 1.22 BuildMaster.py, 1.35, 1.36 Builder.py, 1.23, 1.24 BuilderManager.py, 1.18, 1.19 Config.py, 1.10, 1.11 PackageJob.py, 1.36, 1.37 Repo.py, 1.18, 1.19
Daniel Williams (dcbw)
fedora-extras-commits at redhat.com
Sat Nov 26 06:10:25 UTC 2005
Author: dcbw
Update of /cvs/fedora/extras-buildsys/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv17265/server
Modified Files:
ArchJob.py BuildMaster.py Builder.py BuilderManager.py
Config.py PackageJob.py Repo.py
Log Message:
2005-11-26 Dan Williams <dcbw at redhat.com>
* First cut of depsolving code. You need to add a
'mock_configs_dir' option to the [Directories] section of
your plague-server.cfg; it should usually point to /etc/mock.
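For reference, a minimal sketch of what the relevant section of
plague-server.cfg might look like after this change (the values shown
are just the defaults that Config.py writes below; adjust the paths to
your installation):

    [Directories]
    repo_dir = /repodir
    tmpdir = /tmp
    target_configs_dir = /etc/plague/targets
    mock_configs_dir = /etc/mock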
Index: ArchJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/ArchJob.py,v
retrieving revision 1.21
retrieving revision 1.22
diff -u -r1.21 -r1.22
--- ArchJob.py 18 Nov 2005 14:33:59 -0000 1.21
+++ ArchJob.py 26 Nov 2005 06:10:22 -0000 1.22
@@ -40,10 +40,11 @@
def __init__(self, builder, cfg, server, par_job, jobid, target_dict):
self.par_job = par_job
self.builder = builder
+ self._repo = par_job.repo()
self._server = server
self._use_ssl = cfg.get_bool("Builders", "use_ssl")
self.jobid = jobid
- self._status = 'running'
+ self._status = 'starting'
self._builder_status = ''
self._failure_noticed = False
self._download_failed = False
@@ -56,6 +57,7 @@
self._die = False
self._die_user_requested = False
self._die_lock = threading.Lock()
+ self._prepping = False
# SSL certificate and key filenames
if self._use_ssl:
@@ -85,10 +87,8 @@
def download_failed(self):
return self._download_failed
- def builder_prepping(self):
- if self._builder_status == 'prepping':
- return True
- return False
+ def prepping(self):
+ return self._prepping
def arch(self):
return self._target_dict['arch']
@@ -159,13 +159,32 @@
return True
return False
- def _status_running(self):
+ def _status_starting(self):
# Builders pause before they enter the 'prep' state (which accesses
# the repo for this target), and wait for the server to allow them
# to proceed when the repo is unlocked.
if self._builder_status == 'downloaded':
- if not self.par_job.repo.locked():
- self._send_repo_unlocked()
+ self._set_status('repo_wait')
+ self._repo.request_unlock(self)
+
+ def _status_repo_wait(self):
+ pass
+
+ def repo_unlocked_callback(self):
+ if self._status == 'repo_wait':
+ self._set_status('repo_unlock')
+
+ def _status_repo_unlock(self):
+ # Builder will be in 'downloaded' state until
+ # it notices that the repo has been unlocked
+ self._send_repo_unlocked()
+ self._prepping = True
+ if self._builder_status != 'downloaded':
+ self._set_status('running')
+
+ def _status_running(self):
+ if self._builder_status != 'prepping':
+ self._prepping = False
# if the builder is done, grab list of files to download
if self._builder_finished():
@@ -276,6 +295,8 @@
self._die_lock.release()
if should_die:
self._server.die(self.jobid)
+ if self._status == 'repo_wait':
+ self._repo.cancel_unlock_request(self)
self._set_status('done')
if user_requested:
print "%s (%s/%s): %s - killed." % (self.par_job.uid, self.par_job.package,
@@ -317,7 +338,7 @@
def die(self, user_requested=False):
# Can be called from other threads
- if self._status == 'running':
+ if self._status in ('running', 'repo_wait', 'starting', 'repo_unlock'):
self._die_lock.acquire()
self._die = True
self._die_user_requested = user_requested
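Taken together, the ArchJob changes above replace the single 'running'
state with a small state machine that makes the builder wait for the
Repo before prepping. A rough summary of the intended progression (my
reading of the patch, not code from it):

    # Hypothetical summary of the new ArchJob status flow:
    #   starting    - wait for the builder to report 'downloaded',
    #                 then ask the Repo for an unlock
    #   repo_wait   - parked until Repo calls repo_unlocked_callback()
    #   repo_unlock - tell the builder the repo is unlocked; once the
    #                 builder leaves 'downloaded', move to 'running'
    #   running     - normal build monitoring, as before
    TRANSITIONS = {
        'starting':    'repo_wait',
        'repo_wait':   'repo_unlock',
        'repo_unlock': 'running',
    }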
Index: BuildMaster.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuildMaster.py,v
retrieving revision 1.35
retrieving revision 1.36
diff -u -r1.35 -r1.36
--- BuildMaster.py 9 Sep 2005 15:10:17 -0000 1.35
+++ BuildMaster.py 26 Nov 2005 06:10:22 -0000 1.36
@@ -214,8 +214,8 @@
self._status_updates_lock.release()
attrdict = {}
- attrdict['status'] = job.get_cur_stage()
- attrdict['result'] = job.get_result()
+ attrdict['status'] = job.cur_stage()
+ attrdict['result'] = job.result()
self._write_job_status_to_db(uid, attrdict)
# Update job end time
Index: Builder.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/Builder.py,v
retrieving revision 1.23
retrieving revision 1.24
diff -u -r1.23 -r1.24
--- Builder.py 25 Nov 2005 04:45:12 -0000 1.23
+++ Builder.py 26 Nov 2005 06:10:22 -0000 1.24
@@ -57,13 +57,12 @@
self._when_died = 0
self._server_cfg = cfg
- certs = {}
+ certs = None
if self._server_cfg.get_bool("Builders", "use_ssl"):
+ certs = {}
certs['key_and_cert'] = self._server_cfg.get_str("SSL", "server_key_and_cert")
certs['ca_cert'] = self._server_cfg.get_str("SSL", "ca_cert")
certs['peer_ca_cert'] = self._server_cfg.get_str("SSL", "ca_cert")
- else:
- certs = None
self._server = XMLRPCServerProxy.PlgXMLRPCServerProxy(self._address, certs, timeout=20)
self._server_lock = threading.Lock()
@@ -191,17 +190,14 @@
def _update_building_jobs(self):
jobs = self._building_jobs()
- # Update the current job's status
+ # Update status for all jobs on this builder
if self._unavail_count == 0:
- self._prepping_jobs = False
builder_jobs = []
for jobid in jobs.keys():
try:
job = self._jobs[jobid]
status = jobs[jobid]
job.set_builder_job_status(status)
- if status == 'prepping':
- self._prepping_jobs = True
builder_jobs.append(jobid)
except KeyError:
pass
@@ -211,6 +207,7 @@
# may have finished on the builder and was
# removed from the building job list before we
# were able to know that it was done. HACK
+ self._prepping_jobs = False
for jobid in self._jobs.keys():
# If the builder didn't report this job as building,
# and its not done, explicitly grab its status
@@ -220,6 +217,10 @@
if status:
job.set_builder_job_status(status)
+ # Check for prepping jobs
+ if job.prepping():
+ self._prepping_jobs = True
+
def _get_builder_job_status(self, jobid):
""" Get the status of one job on the builder """
status = None
Index: BuilderManager.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuilderManager.py,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -r1.18 -r1.19
--- BuilderManager.py 14 Sep 2005 11:53:42 -0000 1.18
+++ BuilderManager.py 26 Nov 2005 06:10:22 -0000 1.19
@@ -148,8 +148,8 @@
new_jobs = {}
for req in self._queue:
parent = req['parent']
- stage = parent.get_cur_stage()
- if (stage != 'prep') and (stage != 'building') and (stage != 'waiting'):
+ stage = parent.cur_stage()
+ if stage != 'building' and stage != 'waiting':
self._queue.remove(req)
continue
@@ -170,13 +170,13 @@
try:
job = builder.start_job(parent, req['target_dict'], req['srpm_url'])
except RuntimeError:
- pass
- else:
- if not new_jobs.has_key(parent):
- new_jobs[parent] = []
- new_jobs[parent].append(job)
- self._queue.remove(req)
- break
+ continue
+
+ if not new_jobs.has_key(parent):
+ new_jobs[parent] = []
+ new_jobs[parent].append(job)
+ self._queue.remove(req)
+ break
self._queue_lock.release()
# Notify the parent jobs of their new archjobs. Have to do this outside _queue_lock
Index: Config.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/Config.py,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -r1.10 -r1.11
--- Config.py 14 Nov 2005 04:56:02 -0000 1.10
+++ Config.py 26 Nov 2005 06:10:22 -0000 1.11
@@ -101,6 +101,7 @@
self.set_option("Directories", "repo_dir", "/repodir")
self.set_option("Directories", "tmpdir", "/tmp")
self.set_option("Directories", "target_configs_dir", "/etc/plague/targets")
+ self.set_option("Directories", "mock_configs_dir", "/etc/mock")
self.add_section("Email")
self.set_option("Email", "email_from", "buildsys at foo.com")
@@ -144,10 +145,8 @@
self.set_option("mysql Engine", "user", "plague")
self.set_option("mysql Engine", "password", "")
-
self.save()
-
class TargetConfig(BaseConfig.BaseConfig):
def __init__(self, cfg, filename):
BaseConfig.BaseConfig.__init__(self, filename)
@@ -159,18 +158,37 @@
self._parent_cfg = cfg
self._distro = self.get_str("General", "distro")
self._target = self.get_str("General", "target")
- self._base_arches = self.get_str("Arches", "base_arches")
+ self._base_arches = self.get_list("Arches", "base_arches")
+ self._opt_arches = self.get_list("Arches", "optional_arches")
self._repo = self.get_str("General", "repo")
self._testing = self.get_bool("General", "testing")
+ self._mock_configs = self._find_mock_configs()
+
+ def _find_mock_configs(self):
+ mock_configs = {}
+ mock_config_dir = self._parent_cfg.get_str("Directories", "mock_configs_dir")
+ for arch in self._base_arches:
+ mock_config_file = "%s-%s-%s-%s.cfg" % (self._distro, self._target, arch, self._repo)
+ f = os.path.join(mock_config_dir, mock_config_file)
+ if not os.path.exists(f) or not os.access(f, os.R_OK):
+ print """%s: Could not find mock config file for %s. Each base_archs arch
+must have a matching mock config in mock_configs_dir for this target.""" % (self._filename, mock_config_file)
+ os._exit(0)
+ mock_configs[arch] = f
+ return mock_configs
+
def parent_cfg(self):
return self._parent_cfg
- def target_dict(self):
+ def mock_config_for_arch(self, arch):
+ return self._mock_configs[arch]
+
+ def target_dict(self, arch=None):
target_dict = {}
target_dict['distro'] = self._distro
target_dict['target'] = self._target
- target_dict['arch'] = None # meaningless for a server-side target
+ target_dict['arch'] = arch
target_dict['repo'] = self._repo
return target_dict
@@ -186,6 +204,9 @@
def basearches(self):
return self._base_arches
+ def optarches(self):
+ return self._opt_arches
+
def repo(self):
return self._repo
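As a concrete example of the lookup in _find_mock_configs() above: for
a target with distro 'fedora', target 'development', arch 'i386' and
repo 'core' (hypothetical values, with the default mock_configs_dir),
the server expects to find:

    import os
    # filename pattern: <distro>-<target>-<arch>-<repo>.cfg
    f = os.path.join("/etc/mock", "%s-%s-%s-%s.cfg"
                     % ("fedora", "development", "i386", "core"))
    # -> '/etc/mock/fedora-development-i386-core.cfg'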
Index: PackageJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/PackageJob.py,v
retrieving revision 1.36
retrieving revision 1.37
diff -u -r1.36 -r1.37
--- PackageJob.py 15 Nov 2005 05:20:54 -0000 1.36
+++ PackageJob.py 26 Nov 2005 06:10:22 -0000 1.37
@@ -27,13 +27,17 @@
import copy
import string
import EmailUtils
-import SimpleXMLRPCServer
import xmlrpclib
import socket
import BuilderManager
import ArchJob
from plague import ArchUtils
+import yum
+import repomd.mdErrors
+from yum.logger import Logger
+
+
CVS_CMD = "/usr/bin/cvs"
MAKE_CMD = "/usr/bin/make"
@@ -45,12 +49,9 @@
def log(stuff=''):
print stuff
-class PrepError(exceptions.Exception):
- def __init__(self, args=None):
- exceptions.Exception.__init__(self)
- self.args = args
- def __str__(self):
- return self.args
+class PrepError(exceptions.Exception): pass
+
+class DepError(exceptions.Exception): pass
class BuildError(exceptions.Exception):
def __init__(self, msg, arch):
@@ -78,13 +79,28 @@
threading.Thread.__init__(self)
def run(self):
- while not self._pkg_job.is_done() and not self._pkg_job.get_cur_stage() == self._end_stage:
+ while not self._pkg_job.is_done() and not self._pkg_job.cur_stage() == self._end_stage:
self._pkg_job.process()
def is_package_job_stage_valid(stage):
""" Validate a job stage """
- stages = ['initialize', 'checkout_wait', 'checkout_wait_done', 'checkout', 'make_srpm', 'prep', 'waiting', 'building', 'build_done', 'add_to_repo', 'repodone', 'needsign', 'failed', 'finished']
+ stages = ['initialize',
+ 'checkout_wait',
+ 'checkout_wait_done',
+ 'checkout',
+ 'make_srpm',
+ 'prep',
+ 'depsolve',
+ 'depsolve_wait',
+ 'waiting',
+ 'building',
+ 'build_done',
+ 'add_to_repo',
+ 'repodone',
+ 'needsign',
+ 'failed',
+ 'finished']
if stage in stages:
return True
return False
@@ -110,8 +126,8 @@
""" Controller object for building 1 SRPM on multiple arches """
def __init__(self, uid, username, package, source, repo, buildmaster):
- self.curstage = ''
- self.result = 'in-progress'
+ self._curstage = ''
+ self._result = 'in-progress'
self.bm = buildmaster
self.uid = uid
self.package = package
@@ -123,7 +139,7 @@
self._target_cfg = repo.target_cfg()
self._server_cfg = self._target_cfg.parent_cfg()
- self.repo = repo
+ self._repo = repo
self._target_str = self._target_cfg.target_string()
self._target_dict = self._target_cfg.target_dict()
@@ -132,9 +148,11 @@
self.endtime = 0
self.use_cvs = self._server_cfg.get_bool("CVS", "use_cvs")
self._source = source
- self.result_dir = None
- self.srpm_path = None
- self.srpm_http_path = None
+ self._result_dir = None
+ self._srpm_path = None
+ self._srpm_http_path = None
+ self._depsolve_dir = None
+ self._last_depsolve_error = None
self.repofiles = {}
self.archjobs = {}
self._archjobs_lock = threading.Lock()
@@ -151,22 +169,25 @@
pjc = PackageJobController(self, first_stage, 'waiting')
pjc.start()
- def get_cur_stage(self):
- return self.curstage
+ def cur_stage(self):
+ return self._curstage
+
+ def result(self):
+ return self._result
- def get_result(self):
- return self.result
+ def repo(self):
+ return self._repo
def _set_cur_stage(self, stage, result_msg=None):
""" Update our internal job stage, and notify the BuildMaster that
we've changed as well.
"""
- oldstage = self.curstage
- self.curstage = stage
+ oldstage = self._curstage
+ self._curstage = stage
if oldstage != stage:
attrdict = {}
attrdict['status'] = copy.copy(stage)
- attrdict['result'] = copy.copy(self.result)
+ attrdict['result'] = copy.copy(self._result)
if self.name and self.epoch and self.ver and self.release:
attrdict['epoch'] = self.epoch
attrdict['version'] = self.ver
@@ -186,8 +207,8 @@
except KeyError:
pass
- base_arches = self._target_cfg.get_list("Arches", "base_arches")
- opt_arches = self._target_cfg.get_list("Arches", "optional_arches")
+ base_arches = self._target_cfg.basearches()
+ opt_arches = self._target_cfg.optarches()
# Remove arches we don't support from addl_arches
for arch in addl_arches:
@@ -314,7 +335,7 @@
if len(cvs_alias) > 0:
cvs_target = cvs_alias
- self.srpm_path = None
+ self._srpm_path = None
srpm_dir = os.path.join(self.checkout_tmpdir, self.package, cvs_target)
if not os.path.exists(srpm_dir):
msg = "Error: could not find path %s for %s." % (srpm_dir, self._source)
@@ -345,35 +366,24 @@
msg = "Error: could not find srpm for %s - output was:\n\n%s" % (self._source, o)
raise PrepError(msg)
- self.srpm_path = srpmpath
+ self._srpm_path = srpmpath
self._set_cur_stage('prep')
return False
- def _make_stage_dir(self, rootdir):
- # The dir will look like this:
- # <rootdir>/devel/95-foo-1.1.0-23
- pkgsubdir = '%d-%s-%s-%s' % (self.uid, self.name, self.ver, self.release)
- stage_dir = os.path.join(rootdir, self._target_str, pkgsubdir)
- if os.path.exists(stage_dir):
- shutil.rmtree(stage_dir, ignore_errors=True)
- os.makedirs(stage_dir)
- return stage_dir
-
def _stage_prep(self):
-
# In SRPM-only mode, cvs_tag is path to the SRPM to build
if self.use_cvs == False:
- self.srpm_path = self._source
+ self._srpm_path = self._source
# fail the job if we can't access the SRPM. Can happen during
# requeue of jobs when restarting the server.
- if not os.path.exists(self.srpm_path) or not os.access(self.srpm_path, os.R_OK):
- msg = "Could not access SRPM located at %s during prep stage." % self.srpm_path
+ if not os.path.exists(self._srpm_path) or not os.access(self._srpm_path, os.R_OK):
+ msg = "Could not access SRPM located at %s during prep stage." % self._srpm_path
raise PrepError(msg)
ts = rpmUtils.transaction.initReadOnlyTransaction()
- hdr = rpmUtils.miscutils.hdrFromPackage(ts, self.srpm_path)
+ hdr = rpmUtils.miscutils.hdrFromPackage(ts, self._srpm_path)
self.name = hdr['name']
self.epoch = hdr['epoch']
if not self.epoch:
@@ -391,42 +401,196 @@
""" % (self._source, pkg_arches, allowed_arches)
raise PrepError(msg)
+ self._set_cur_stage('depsolve_wait')
+ log("%s (%s): Requesting depsolve..." % (self.uid, self.package))
+ self._repo.request_depsolve(self, first_try=True)
+ return True # sleep until the Repo wakes us up for depsolve
+
+ def _write_yum_conf(self, arch):
+ # Figure out which mock config file applies, and write out its yum.conf
+ try:
+ mock_config = self._target_cfg.mock_config_for_arch(arch)
+ except KeyError:
+ return None
+
+ config_opts = {}
+ execfile(mock_config)
+ config_lines = []
+ job_yum_dir = os.path.join(self._depsolve_dir, arch)
+ for line in config_opts['yum.conf'].split('\n'):
+ if string.find(line, "cachedir=") >= 0:
+ line = "cachedir=%s" % os.path.join(job_yum_dir, "cache")
+ elif string.find(line, "logfile=") >= 0:
+ line = "logfile=%s" % os.path.join(job_yum_dir, "yum.log")
+ config_lines.append(line+'\n')
+ del config_opts
+
+ yum_config = os.path.join(job_yum_dir, "yum.conf")
+ if not os.path.exists(job_yum_dir):
+ os.makedirs(job_yum_dir)
+ f = open(yum_config, "w")
+ f.writelines(config_lines)
+ f.close()
+
+ return yum_config
+
+ def _arch_deps_solved(self, arch):
+ # Returns: False if dep errors
+ # True if all deps are solved
+
+ success = True
+ try:
+ base = yum.YumBase()
+ yum_config = self._write_yum_conf(arch)
+ base.doConfigSetup(fn=yum_config, root=os.path.dirname(yum_config))
+ threshold = 0
+ if DEBUG:
+ threshold = 5
+ base.log = Logger(threshold=threshold, file_object=sys.stdout)
+ try:
+ base.doRepoSetup()
+ except yum.Errors.RepoError, e:
+ raise DepError(str(e))
+
+ archlist = ['src', 'noarch']
+ if ArchUtils.supported_arches.has_key(arch):
+ archlist.extend(ArchUtils.supported_arches[arch])
+ else:
+ raise DepError("WARNING: arch %s was not in ArchUtils' supported_arches." % arch)
+
+ ts = rpmUtils.transaction.initReadOnlyTransaction()
+ try:
+ srpm = yum.packages.YumLocalPackage(ts, self._srpm_path)
+ except yum.Errors.MiscError, e:
+ del ts
+ raise DepError(str(e))
+ del ts
+
+ try:
+ base.doSackSetup(archlist)
+ except yum.Errors.RepoError, e:
+ raise DepError(str(e))
+
+ for dep in srpm.requiresList():
+ if dep.startswith("rpmlib("): continue
+ try:
+ pkg = base.returnPackageByDep(dep)
+ except repomd.mdErrors.PackageSackError, e:
+ raise DepError(str(e))
+ except yum.Errors.YumBaseError, e:
+ raise DepError(str(e))
+ except DepError, e:
+ self._last_depsolve_error = str(e)
+ print "%s (%s/%s): Depsolve Error: %s" % (self.uid, self.package, arch, str(e))
+ success = False
+
+ del base, srpm
+ return success
+
+ def start_depsolve(self):
+ self._set_cur_stage('depsolve')
+ self.wake()
+
+ def _stage_depsolve(self):
+ """ Depsolve all arches, only if all pass do we proceed """
+ """ to queue up the actual archjobs for building """
+
+ # If the job's waited more than 8 hours for deps to be
+ # solved, kill the job and make some human figure it out
+ if time.time() > self.starttime + (60 * 60 * 8):
+ self._repo.notify_depsolve_done(self)
+ shutil.rmtree(self._depsolve_dir, ignore_errors=True)
+ raise DepError("The job's build dependencies couldn't be solved within 8 hours. Last error: %s" % self._last_depsolve_error)
+
+ # Create the depsolve metadata cache dir
+ if not self._depsolve_dir:
+ server_work_dir = self._server_cfg.get_str("Directories", "server_work_dir")
+ self._depsolve_dir = os.path.join(server_work_dir, "depsolve", "%s-%s" % (self.uid, self.name))
+ if os.path.exists(self._depsolve_dir):
+ shutil.rmtree(self._depsolve_dir, ignore_errors=True)
+ os.makedirs(self._depsolve_dir)
+
+ self._archjobs_lock.acquire()
+ unsolved_deps = False
+ archlist = self.archjobs.keys()
+
+ # noarch jobs are a bit special here. Since we don't know
+ # what arch they will actually build on, we have to make
+ # sure all arches the target supports will work
+ if len(self.archjobs.keys()) == 1 and self.archjobs.keys()[0] == 'noarch':
+ archlist = self._target_cfg.basearches()[:]  # copy, so appends below don't mutate the config's list
+ for arch in self._target_cfg.optarches():
+ if arch not in archlist:
+ archlist.append(arch)
+
+ log("%s (%s): Starting depsolve for arches: %s." % (self.uid, self.package, archlist))
+
+ for arch in archlist:
+ if self._arch_deps_solved(arch) == False:
+ unsolved_deps = True
+ break
+
+ self._archjobs_lock.release()
+
+ self._repo.notify_depsolve_done(self)
+
+ if unsolved_deps == True:
+ # Go to sleep until the repo changes
+ self._set_cur_stage('depsolve_wait')
+ self._repo.request_depsolve(self, first_try=False)
+ log("%s (%s): Finished depsolve (unsuccessful), trying again later." % (self.uid, self.package))
+ return True
+
+ log("%s (%s): Finished depsolve (successful), requesting archjobs." % (self.uid, self.package))
+
+ # Success, all deps are solved. Kill the depsolve dir
+ shutil.rmtree(self._depsolve_dir, ignore_errors=True)
+ self._depsolve_dir = None
+
+ # Queue up the archjobs
work_dir = self._server_cfg.get_str("Directories", "server_work_dir")
- self.result_dir = self._make_stage_dir(work_dir)
+ self._result_dir = self._make_stage_dir(work_dir)
for arch in self.archjobs.keys():
- thisdir = os.path.join(self.result_dir, arch)
+ thisdir = os.path.join(self._result_dir, arch)
if not os.path.exists(thisdir):
os.makedirs(thisdir)
# Copy SRPM to where the builder can access it
http_pkg_path = self._make_stage_dir(self.http_dir)
- self.srpm_http_path = os.path.join(http_pkg_path, os.path.basename(self.srpm_path))
- shutil.copy(self.srpm_path, self.srpm_http_path)
- self.srpm_path = None
+ self._srpm_http_path = os.path.join(http_pkg_path, os.path.basename(self._srpm_path))
+ shutil.copy(self._srpm_path, self._srpm_http_path)
+ self._srpm_path = None
# Remove CVS checkout and make_srpm dirs
if self.use_cvs == True:
shutil.rmtree(self.checkout_tmpdir, ignore_errors=True)
- self._request_arch_jobs()
self._set_cur_stage('waiting')
+ self._request_arch_jobs()
return False
+ def _stage_depsolve_wait(self):
+ pass
+
+ def _make_stage_dir(self, rootdir):
+ # The dir will look like this:
+ # <rootdir>/devel/95-foo-1.1.0-23
+ pkgsubdir = '%d-%s-%s-%s' % (self.uid, self.name, self.ver, self.release)
+ stage_dir = os.path.join(rootdir, self._target_str, pkgsubdir)
+ if os.path.exists(stage_dir):
+ shutil.rmtree(stage_dir, ignore_errors=True)
+ os.makedirs(stage_dir)
+ return stage_dir
+
def _request_one_arch_job(self, arch, orphaned):
# Construct SPRM URL
- srpm_http_base = self.srpm_http_path[len(self.http_dir):]
- use_ssl = self._server_cfg.get_bool("Builders", "use_ssl")
- if use_ssl == True:
+ srpm_http_base = self._srpm_http_path[len(self.http_dir):]
+ method = "http://"
+ if self._server_cfg.get_bool("Builders", "use_ssl") == True:
method = "https://"
- else:
- method = "http://"
hostname = self._server_cfg.get_str("General", "hostname")
srpm_url = method + hostname + ":8886/" + srpm_http_base
- target_dict = {}
- target_dict['distro'] = self._target_cfg.get_str("General", "distro")
- target_dict['target'] = self._target_cfg.get_str("General", "target")
- target_dict['arch'] = arch
- target_dict['repo'] = self._target_cfg.get_str("General", "repo")
+ target_dict = self._target_cfg.target_dict(arch)
self.bm.builder_manager.request_arch_job(self, target_dict, srpm_url, orphaned)
def _request_arch_jobs(self):
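For illustration, with the stage-dir layout from _make_stage_dir()
above (e.g. devel/95-foo-1.1.0-23) and SSL enabled, the SRPM URL handed
to the builder comes out roughly like this (hostname is hypothetical):

    srpm_http_base = "devel/95-foo-1.1.0-23/foo-1.1.0-23.src.rpm"
    srpm_url = "https://" + "buildsys.example.com" + ":8886/" + srpm_http_base
    # -> 'https://buildsys.example.com:8886/devel/95-foo-1.1.0-23/foo-1.1.0-23.src.rpm'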
@@ -449,7 +613,7 @@
# If this is the first archjob, that means we are now building.
# So we start up the second PackageJobController thread.
- if self.curstage == 'waiting':
+ if self._curstage == 'waiting':
t = PackageJobController(self, 'building', None)
t.start()
@@ -466,7 +630,7 @@
self._archjobs_lock.release()
def is_done(self):
- if self.curstage == 'needsign' or self.curstage == 'failed' or self.curstage == 'finished':
+ if self._curstage == 'needsign' or self._curstage == 'failed' or self._curstage == 'finished':
return True
return False
@@ -476,7 +640,7 @@
log("%s (%s): Job kill request from %s" % (self.uid, self.package, username))
self._archjobs_lock.acquire()
- if self.curstage == 'waiting':
+ if self._curstage == 'waiting':
# In 'waiting' stage, we have no controller thread. So to get
# the job killed immediately, we have to start one
t = PackageJobController(self, 'killed', None)
@@ -488,7 +652,7 @@
def _handle_death(self):
resultstring = "%s (%s): Build on target %s was killed by %s." % (self.uid, self.name, self._target_str, self._killer)
- self.result = 'killed'
+ self._result = 'killed'
self._set_cur_stage('finished', resultstring)
self.email_result(self.username, resultstring)
log(resultstring)
@@ -523,7 +687,7 @@
return
try:
- func = getattr(self, "_stage_%s" % self.curstage)
+ func = getattr(self, "_stage_%s" % self._curstage)
if func():
# Wait to be woken up when long-running operations complete
while not self._event.isSet():
@@ -532,9 +696,15 @@
except PrepError, e:
if self.use_cvs == True:
shutil.rmtree(self.checkout_tmpdir, ignore_errors=True)
+ msg = str(e)
subj = 'Prep Error (Job %s): %s on %s' % (self.uid, self._source, self._target_str)
- self.email_result(self.username, resultstring=e.args, subject=subj)
- self._stage_failed(e.args)
+ self.email_result(self.username, resultstring=msg, subject=subj)
+ self._stage_failed(msg)
+ except DepError, e:
+ msg = str(e)
+ subj = 'Dependencies Error (Job %s): %s on %s' % (self.uid, self._source, self._target_str)
+ self.email_result(self.username, resultstring=msg, subject=subj)
+ self._stage_failed(msg)
except BuildError, e:
subj = 'Build Error (Job %s): %s on %s' % (self.uid, self._source, self._target_str)
base_url = self._server_cfg.get_str("UI", "log_url")
@@ -597,25 +767,25 @@
return True
def get_stage_dir(self):
- return self.result_dir
+ return self._result_dir
def _stage_failed(self, msg=None):
- self.result = 'failed'
+ self._result = 'failed'
self._set_cur_stage('failed', msg)
self.endtime = time.time()
self._cleanup_job_files()
self.bm.notify_job_done(self)
def _cleanup_job_files(self):
- if not self.result_dir or not self.srpm_http_path:
+ if not self._result_dir or not self._srpm_http_path:
return
# If its a testing target, we keep the RPMs around since they don't
# get copied to the repository, they only live in the repodir
- if self.result == 'success' and self._target_cfg.testing() == True:
+ if self._result == 'success' and self._target_cfg.testing() == True:
return
- srpm_file = os.path.join(self.result_dir, os.path.basename(self.srpm_http_path))
+ srpm_file = os.path.join(self._result_dir, os.path.basename(self._srpm_http_path))
# Delete any RPMs in the arch dirs
for job in self.archjobs.values():
@@ -624,7 +794,7 @@
for f in job.get_files():
if not f.endswith(".rpm"):
continue
- src_file = os.path.join(self.result_dir, job.arch(), f)
+ src_file = os.path.join(self._result_dir, job.arch(), f)
if src_file.endswith(".src.rpm"):
# Keep an SRPM. We prefer built SRPMs from builders over
# the original SRPM.
@@ -634,10 +804,10 @@
# If there were no builder-built SRPMs, keep the original around
if not os.path.exists(srpm_file):
- shutil.copy(self.srpm_http_path, srpm_file)
+ shutil.copy(self._srpm_http_path, srpm_file)
# Delete the SRPM in the server's HTTP dir
- shutil.rmtree(os.path.dirname(self.srpm_http_path), ignore_errors=True)
+ shutil.rmtree(os.path.dirname(self._srpm_http_path), ignore_errors=True)
def _stage_add_to_repo(self):
# Create a list of files that the repo should copy to
@@ -650,7 +820,7 @@
if not f.endswith(".rpm"):
continue
jobarch = job.arch()
- src_file = os.path.join(self.result_dir, jobarch, f)
+ src_file = os.path.join(self._result_dir, jobarch, f)
verrel = "%s-%s" % (self.ver, self.release)
if f.endswith(".src.rpm"):
dst_path = os.path.join(repo_dir, self._target_str, self.name, verrel, "SRPM")
@@ -664,7 +834,7 @@
# list from this object directly when the copy operation
# happens
if len(self.repofiles):
- self.repo.request_copy(self)
+ self._repo.request_copy(self)
self.endtime = time.time()
return True
@@ -678,7 +848,7 @@
def _stage_repodone(self):
resultstring = " %s (%s): Build on target %s succeeded." % (self.uid, self.name, self._target_str)
- self.result = 'success'
+ self._result = 'success'
self._set_cur_stage('needsign', resultstring)
self._cleanup_job_files()
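The net effect on the job lifecycle: after 'prep' a job now parks in
'depsolve_wait' until the Repo schedules it, runs 'depsolve', and
either proceeds to 'waiting' (all arches solved) or goes back to
'depsolve_wait' until the repo contents change. A rough sketch (my
summary, not code from the patch):

    # prep -> depsolve_wait -> depsolve --> waiting -> building -> ...
    #              ^               |
    #              +-- unsolved ---+  (retried when the repo changes;
    #                                  the job fails after 8 hours)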
Index: Repo.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/Repo.py,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -r1.18 -r1.19
--- Repo.py 1 Nov 2005 14:46:18 -0000 1.18
+++ Repo.py 26 Nov 2005 06:10:22 -0000 1.19
@@ -28,6 +28,8 @@
class Repo(threading.Thread):
""" Represents an on-disk repository of RPMs and manages updates to the repo. """
+ MAX_DEPSOLVE_JOBS = 4
+
def __init__(self, target_cfg, repodir, builder_manager):
self._builder_manager = builder_manager
self._target_cfg = target_cfg
@@ -36,15 +38,19 @@
print "Error: Repository directory '%s' does not exist." % repodir
os._exit(1)
+ # Ensure this repo's work directories are created
+ ### Base repo dir
target_str = self._target_cfg.target_string()
self._repodir = os.path.join(repodir, target_str)
if not os.path.exists(self._repodir):
os.makedirs(self._repodir)
+ ### createrepo "cache" dir
self._repo_cache_dir = os.path.join(repodir, "cache", target_str)
if not os.path.exists(self._repo_cache_dir):
os.makedirs(self._repo_cache_dir)
+ ### SRPM HTTP upload dir
parent_cfg = self._target_cfg.parent_cfg()
upload_dir = os.path.join(parent_cfg.get_str("Directories", "server_work_dir"), "srpm_upload_dir", target_str)
if not os.path.exists(upload_dir):
@@ -56,6 +62,24 @@
self._lock_count = 0
self._stop = False
+ # We want to execute a job's first depsolve right away, but
+ # if that one fails subsequent depsolves should only happen
+ # when the repo gets updated since that's when the deps might
+ # have changed.
+ #
+ # The queues are dicts mapping buildjob->boolean, where a boolean
+ # value of True means the job's depsolve has started, and False
+ # means it hasn't.
+ self._depsolve_immediate_lock = threading.Lock()
+ self._depsolve_immediate_queue = {}
+ self._depsolve_again_lock = threading.Lock()
+ self._depsolve_again_queue = {}
+
+ # Repo unlock queue
+ self._repo_unlock_lock = threading.Lock()
+ self._repo_unlock_queue = []
+
+ # Repo script stuff
self._pobj = None
self._repo_script_start = 0
self._repo_script = None
@@ -74,19 +98,109 @@
self._lock.acquire()
self._repo_additions.append(buildjob)
# We enter lock level 1 here, preventing builders from
- # starting their 'prep' state
+ # starting their 'prep' state and jobs from depsolving
if self._lock_count == 0:
self._lock_count = 1
self._lock.release()
- def locked(self):
- # We can get away without holding _lock here...
- if self._lock_count > 0:
- return True
- return False
+ def request_depsolve(self, buildjob, first_try=False):
+ """ Registers a buildjob be notified to start depsolving when the repo is ready """
+ if first_try:
+ self._depsolve_immediate_lock.acquire()
+ self._depsolve_immediate_queue[buildjob] = False
+ self._depsolve_immediate_lock.release()
+ else:
+ self._depsolve_again_lock.acquire()
+ self._depsolve_again_queue[buildjob] = False
+ self._depsolve_again_lock.release()
+
+ def request_unlock(self, archjob):
+ self._repo_unlock_lock.acquire()
+ self._repo_unlock_queue.append(archjob)
+ self._repo_unlock_lock.release()
+
+ def cancel_unlock_request(self, archjob):
+ self._repo_unlock_lock.acquire()
+ if archjob in self._repo_unlock_queue:
+ self._repo_unlock_queue.remove(archjob)
+ self._repo_unlock_lock.release()
+
+ def _process_unlock_requests(self):
+ self._repo_unlock_lock.acquire()
+ for archjob in self._repo_unlock_queue:
+ archjob.repo_unlocked_callback()
+ self._repo_unlock_lock.release()
+
+ def _start_depsolves_for_queue(self, queue, max_jobs):
+ num = 0
+ for job in queue:
+ if queue[job]:
+ num = num + 1
+ available = max(max_jobs - num, 0)
+
+ if available > 0:
+ for job in queue.keys():
+ if available <= 0:
+ break
+ if not queue[job]:
+ queue[job] = True
+ job.start_depsolve()
+ available = available - 1
+
+ def _start_waiting_depsolves(self, repo_changed=False):
+ """ Start waiting depsolves, but only a certain number to avoid
+ nailing the build server too hard.
+ """
+ self._depsolve_immediate_lock.acquire()
+ self._depsolve_again_lock.acquire()
+
+ max_immediate_depsolves = self.MAX_DEPSOLVE_JOBS
+ if repo_changed:
+ max_again_depsolves = self.MAX_DEPSOLVE_JOBS / 2
+ max_immediate_depsolves = self.MAX_DEPSOLVE_JOBS / 2
+
+ self._start_depsolves_for_queue(self._depsolve_immediate_queue, max_immediate_depsolves)
+
+ # Only fire off non-first-try depsolves if the repo has changed
+ if repo_changed:
+ self._start_depsolves_for_queue(self._depsolve_again_queue, max_again_depsolves)
+
+ self._depsolve_again_lock.release()
+ self._depsolve_immediate_lock.release()
+
+ def notify_depsolve_done(self, buildjob):
+ """ Notifies the repo that a job is done depsolving """
+ self._depsolve_immediate_lock.acquire()
+ self._depsolve_again_lock.acquire()
+ if buildjob in self._depsolve_immediate_queue:
+ del self._depsolve_immediate_queue[buildjob]
+ elif buildjob in self._depsolve_again_queue:
+ del self._depsolve_again_queue[buildjob]
+ self._depsolve_again_lock.release()
+ self._depsolve_immediate_lock.release()
+
+ def _any_depsolving_jobs(self):
+ """ Determines if any jobs are currently depsolving """
+ any_depsolving = False
+
+ self._depsolve_immediate_lock.acquire()
+ self._depsolve_again_lock.acquire()
+ for job in self._depsolve_immediate_queue.keys():
+ if self._depsolve_immediate_queue[job]:
+ any_depsolving = True
+ break
+ if not any_depsolving:
+ for job in self._depsolve_again_queue.keys():
+ if self._depsolve_again_queue[job]:
+ any_depsolving = True
+ break
+ self._depsolve_again_lock.release()
+ self._depsolve_immediate_lock.release()
+
+ return any_depsolving
def _update_repo(self):
- """ Copy new RPMS to each repo, and update each repo at the end """
+ """ Copy new RPMS to the repo, and updates the repo at the end """
for buildjob in self._repo_additions:
# Ensure all the files are accessible
success = True
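To illustrate the scheduling above: with MAX_DEPSOLVE_JOBS = 4, an
unchanged repo lets up to four first-try depsolves run at once; after
a repo update the slots are split 2/2 between the first-try and retry
queues. A minimal sketch of the slot arithmetic (names mirror the
patch, the helper itself is hypothetical):

    MAX_DEPSOLVE_JOBS = 4

    def depsolve_slots(repo_changed):
        # Returns (immediate, again) slots available this pass; retry
        # depsolves only run when the repo has actually changed.
        if repo_changed:
            return (MAX_DEPSOLVE_JOBS / 2, MAX_DEPSOLVE_JOBS / 2)
        return (MAX_DEPSOLVE_JOBS, 0)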
@@ -118,12 +232,12 @@
(s, o) = commands.getstatusoutput('/usr/bin/createrepo -q -c %s -x "*.src.rpm" -x "*.debuginfo.rpm" %s' % (self._repo_cache_dir, self._repodir))
if s != 0:
- print "Error: createrepo failed with exit status %d! Output: '%s'" % (s, o)
+ print "Repo Error (%s): createrepo failed with exit status %d! Output: '%s'" % (self._target_cfg.target_string(), s, o)
- def _run_repo_script(self):
+ def _start_repo_script(self):
target_str = self._target_cfg.target_string()
cmd = "%s %s" % (self._repo_script, target_str)
- print "Repo '%s': executing repository script %s" % (target_str, self._repo_script)
+ print "Repo Error (%s): executing repository script %s" % (target_str, self._repo_script)
self._pobj = popen2.Popen4(cmd=cmd)
fcntl.fcntl(self._pobj.fromchild.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
self._repo_script_start = time.time()
@@ -178,8 +292,9 @@
self._pobj = None
def run(self):
+ repo_changed = False
while self._stop == False:
- # We have 3 lock levels:
+ # We have 3 locked operations, signified by self._lock_count:
#
# 0 - repo unlocked
# 1 - entered when jobs request packages to be copied to the repo;
@@ -188,37 +303,44 @@
# packages copied to repo and createrepo is run
# 3 - entered when createrepo is done; repo script run
- prepping_builders = self._builder_manager.any_prepping_builders()
-
self._lock.acquire()
- # Level 2: update the repo
- if self._lock_count == 2:
+ if self._lock_count == 0:
+ # Notify archjobs that the repo is unlocked
+ self._process_unlock_requests()
+
+ # Kick off depsolves
+ self._start_waiting_depsolves(repo_changed)
+ if repo_changed:
+ repo_changed = False
+ elif self._lock_count == 1:
+ # Enter lock level 2 if there are no builders in the
+ # 'prep' state, no jobs are depsolving, and we are already at lock level 1
+ prepping_builders = self._builder_manager.any_prepping_builders()
+ depsolving_jobs = self._any_depsolving_jobs()
+ if not prepping_builders and not depsolving_jobs:
+ self._lock_count = 2
+ elif self._lock_count == 2:
+ # Level 2: update the repo
target_str = self._target_cfg.target_string()
print "Repo '%s': updating repository metadata..." % target_str
self._update_repo()
+ repo_changed = True
print "Repo '%s': Done updating." % target_str
- # If there's a repo script for this target, enter level 3
+ # Run the repo script, if any
+ self._pobj = None
if self._repo_script:
- self._run_repo_script()
- self._lock_count = 3
- else:
- self._lock_count = 0
-
- # Level 3: monitor the repo script
- if self._lock_count == 3:
+ self._start_repo_script()
+ self._lock_count = 3
+ elif self._lock_count == 3:
+ # Level 3: monitor the repo script
if self._pobj:
self._monitor_repo_script()
else:
# If for some reason self._pobj is None, unlock the repo
self._lock_count = 0
- # Enter lock level 2 if there are no builders in the
- # 'prep' state and we are already at lock level 1
- if not prepping_builders and self._lock_count == 1:
- self._lock_count = 2
-
self._lock.release()
time.sleep(5)
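For reference, the reworked run() loop above cycles through the lock
levels like this (a summary of the patch's comments, not code from it):

    # 0 (unlocked)  : answer archjob unlock requests, kick off depsolves
    # 1 (requested) : a job asked for a repo copy; wait until no builders
    #                 are prepping and no jobs are depsolving
    # 2 (updating)  : copy RPMs into the repo and run createrepo
    # 3 (script)    : monitor the external repo script, then unlock
    LOCK_LEVELS = {0: 'unlocked', 1: 'requested', 2: 'updating', 3: 'script'}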