Repository :
http://git.fedorahosted.org/cgit/copr.git
On branch : skvidal-backend
---------------------------------------------------------------
commit 75b9e88a80b3c3d379c2a4ad3866e405e4e91de4
Author: Seth Vidal <skvidal(a)fedoraproject.org>
Date: Thu Nov 15 02:24:56 2012 -0500
add errors file for exceptions separately
continue to flesh out functionality
---------------------------------------------------------------
backend/dispatcher.py | 148 ++++++++++++++++++++++++++++++++-----------------
backend/errors.py | 12 ++++
copr-be.py | 99 +++++++++++++++-----------------
3 files changed, 154 insertions(+), 105 deletions(-)
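
For context before the patch itself: read_config() below consumes an INI-style file via ConfigParser. A minimal sketch of such a config (paths and values are illustrative; the jobsdir/destdir option names are inferred from the validation check in read_config, and the playbook option is assumed from Worker.spawn_instance's use of self.opts.playbook):

    [backend]
    jobsdir = /var/lib/copr/jobs
    destdir = /var/lib/copr/results
    sleeptime = 10
    num_workers = 8
    logfile = /var/log/copr/backend.log
    worker_logdir = /var/log/copr/worker/
    playbook = /srv/copr-work/provision/builderpb.yml

    [builder]
    timeout = 1800
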
diff --git a/backend/dispatcher.py b/backend/dispatcher.py
index 748e519..4948512 100644
--- a/backend/dispatcher.py
+++ b/backend/dispatcher.py
@@ -1,18 +1,18 @@
#!/usr/bin/python -tt
-import sys
import os
-import glob
-import subprocess
+import sys
import multiprocessing
import time
import Queue
import json
import mockremote
from bunch import Bunch
+import errors
import ansible
import ansible.playbook
+import ansible.errors
from ansible import callbacks
@@ -62,25 +62,21 @@ class SilentPlaybookCallbacks(callbacks.object):
def on_stats(self, stats):
callbacks.call_callback_module('playbook_on_stats', stats)
-def spawn_instance(opts, ip):
- # FIXME - setup silent callbacks
- # - check errors in setup
- # - playbook variablized
- stats = callbacks.AggregateStats()
- playbook_cb = SilentPlaybookCallbacks(verbose=False)
- runner_cb = callbacks.DefaultRunnerCallbacks()
- play = ansible.playbook.PlayBook(stats=stats, playbook='/srv/copr-work/provision/builderpb.yml',
- callbacks=playbook_cb, runner_callbacks=runner_cb, remote_user='root')
+class WorkerCallback(object):
+ def __init__(self, logfile=None):
+ self.logfile = logfile
+
+ def log(self, msg):
+ if not self.logfile:
+ return
- play.run()
- if ip:
- return ip
+ now = time.time()
+ try:
+ open(self.logfile, 'a').write(str(now) + ':' + msg + '\n')
+ except (IOError, OSError), e:
+ print >>sys.stderr, 'Could not write to logfile %s - %s' % (self.logfile, str(e))
- for i in play.SETUP_CACHE:
- if i =='localhost':
- continue
- return i
class Worker(multiprocessing.Process):
def __init__(self, opts, jobs, ip=None, create=True, callback=None):
@@ -88,17 +84,63 @@ class Worker(multiprocessing.Process):
# base class initialization
multiprocessing.Process.__init__(self, name="worker-builder")
+
# job management stuff
self.jobs = jobs
self.ip = ip
self.opts = opts
self.kill_received = False
- print 'creating worker: %s' % ip
-
-
- def parse_job(job):
+ self.callback = callback
+ if not self.callback:
+ lf = self.opts.worker_logdir + '/worker-%s.log' % self.pid
+ self.callback = WorkerCallback(logfile=lf)
+
+ self.callback.log('creating worker: %s' % ip)
+
+ def spawn_instance(self):
+
+ stats = callbacks.AggregateStats()
+ playbook_cb = SilentPlaybookCallbacks(verbose=False)
+ runner_cb = callbacks.DefaultRunnerCallbacks()
+ # fixme - extra_vars to include ip as a var if we need to specify ips
+ # also to include info for instance type to handle the memory requirements of builds
+ play = ansible.playbook.PlayBook(stats=stats, playbook=self.opts.playbook,
+ callbacks=playbook_cb, runner_callbacks=runner_cb,
+ remote_user='root')
+
+ play.run()
+ if self.ip:
+ return self.ip
+
+ for i in play.SETUP_CACHE:
+ if i == 'localhost':
+ continue
+ return i
+ return None
+
+ def parse_job(self, jobfile):
# read the json of the job in
- # break out what we need return a structured
+ # break out what we need return a bunch of the info we need
+ d = json.load(open(jobfile))
+ build = d['builds'][0]
+ jobdata = Bunch()
+ jobdata.pkgs = build['pkgs'].split(' ')
+ jobdata.repos = build['repos'].split(' ')
+ jobdata.chroots = build['chroots'].split(' ')
+ jobdata.memory_reqs = build['memory_reqs']
+ jobdata.timeout = build['timeout']
+ jobdata.destdir = self.opts.destdir + '/' + build['copr']['owner']['name'] + '/' + build['copr']['name'] + '/'
+ jobdata.build_id = build['id']
+ jobdata.copr_id = build['copr']['id']
+ jobdata.user_id = build['user_id']
+ return jobdata
+
+ def return_results(self, job):
+ self.callback.log('%s status %s. Took %s seconds' % (job.build_id, job.status, job.ended_on - job.started_on))
+ os.unlink(job.jobfile)
+ #FIXME - this should either return job status/results
+ # into a queue or it should submit results directly to the frontend
+
def run(self):
# worker should startup and check if it can function
# for each job it takes from the jobs queue
@@ -108,41 +150,43 @@ class Worker(multiprocessing.Process):
while not self.kill_received:
try:
- job = self.jobs.get()
+ jobfile = self.jobs.get()
except Queue.Empty:
break
- self.cur_job = job
- f = open(self.opts.get('destdir', '/') + '/' + job, 'w')
- f.write('')
- f.close()
-
# parse the job json into our info
- # pkgs
- # repos
- # chroot(s)
- # memory needed
- # timeout
- # make up a destdir
-
- #print 'start up instance %s using %s' % (self.ip, self.opts.get('playbook', None))
- ip = spawn_instance(self.opts, ip=ip)
+ job = self.parse_job(jobfile)
- destdir = construct_something_here
+ job.jobfile = jobfile
+ # spin up our build instance
try:
- mr = mockremote.MockRemote(builder=ip, timeout=timeout,
- destdir=destdir, chroot=chroot, cont=True, recurse=True,
- repos=repos, callback=None)
- mr.build_pkgs(pkgs)
- except mockremote.MockRemoteError, e:
- # record and break
- print '%s - %s' % (ip, e)
- break
+ ip = self.spawn_instance()
+ if not ip:
+ raise errors.CoprWorkerError, "No IP found from creating
instance"
+
+ except ansible.errors.AnsibleError, e:
+ self.callback.log('failure to setup instance: %s' % e)
+ raise
+
+ status = 1
+ job.started_on = time.time()
+ for chroot in job.chroots:
+ self.callback.log('mockremote on %s - %s' % (ip, jobfile))
+ try:
+ mr = mockremote.MockRemote(builder=ip, timeout=job.timeout,
+ destdir=job.destdir, chroot=chroot, cont=True, recurse=True,
+ repos=job.repos, callback=None)
+ mr.build_pkgs(job.pkgs)
+ except mockremote.MockRemoteError, e:
+ # record and break
+ self.callback.log('%s - %s' % (ip, e))
+ status = 0 # failure
+
+ job.ended_on = time.time()
+ job.status = status
+ self.return_results(job)
+
- # run mockremote to that ip with the args from above
- print 'mockremote on %s - %s' % (ip, job)
- time.sleep(30)
- #print 'terminate-instance %s' % (self. ip)
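
To see how the reworked Worker is meant to be driven, here is a minimal, hypothetical driver (the Bunch of options is hand-rolled rather than read from a real config, and all paths are invented; actually building anything also requires a reachable builder instance and the provisioning playbook):

    import multiprocessing
    from bunch import Bunch
    from backend.dispatcher import Worker

    opts = Bunch(destdir='/tmp/copr-results',
                 worker_logdir='/tmp/copr-logs',
                 playbook='/srv/copr-work/provision/builderpb.yml')
    jobs = multiprocessing.Queue()   # job-file paths go on this queue
    jobs.put('/var/lib/copr/jobs/1.json')
    w = Worker(opts, jobs)           # builds its own WorkerCallback logger
    w.start()
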
diff --git a/backend/errors.py b/backend/errors.py
new file mode 100644
index 0000000..ae8ac34
--- /dev/null
+++ b/backend/errors.py
@@ -0,0 +1,12 @@
+# copr error/exceptions
+class CoprBackendError(Exception):
+
+ def __init__(self, msg):
+ self.msg = msg
+
+ def __str__(self):
+ return self.msg
+
+class CoprWorkerError(CoprBackendError):
+ pass
+
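
Because CoprWorkerError subclasses CoprBackendError, callers can catch the base class and handle any backend failure in one place; for example:

    from backend import errors

    try:
        raise errors.CoprWorkerError('no IP found for builder')
    except errors.CoprBackendError, e:
        print 'backend failure: %s' % e
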
diff --git a/copr-be.py b/copr-be.py
index 4e8945b..39ebb48 100644
--- a/copr-be.py
+++ b/copr-be.py
@@ -5,9 +5,9 @@ import sys
import os
import glob
import time
-import json
import multiprocessing
from backend.dispatcher import Worker
+from backend import errors
from bunch import Bunch
from ConfigParser import ConfigParser
@@ -17,13 +17,6 @@ def _get_conf(cp, section, option, default):
return cp.get(section, option)
return default
-class CoprBackendError(Exception):
-
- def __init__(self, msg):
- self.msg = msg
-
- def __str__(self):
- return self.msg
class CoprBackend(object):
def __init__(self, config_file=None):
@@ -31,7 +24,7 @@ class CoprBackend(object):
# put all the config items into a single self.opts bunch
if not config_file:
- raise CoprBackendError, "Must specify config_file"
+ raise errors.CoprBackendError, "Must specify config_file"
self.config_file = config_file
self.opts = self.read_config()
@@ -58,73 +51,73 @@ class CoprBackend(object):
opts.sleeptime = int(_get_conf(cp, 'backend', 'sleeptime', 10))
opts.num_workers = int(_get_conf(cp, 'backend', 'num_workers', 8))
opts.timeout = int(_get_conf(cp, 'builder', 'timeout', 1800))
- opts.logfile = _get_conf(cp, 'backend', 'logfile', '/var/log/copr-be.log')
+ opts.logfile = _get_conf(cp, 'backend', 'logfile', '/var/log/copr/backend.log')
+ opts.worker_logdir = _get_conf(cp, 'backend', 'worker_logdir', '/var/log/copr/worker/')
# thoughts for later
# ssh key for connecting to builders?
# cloud key stuff?
#
except ConfigParser.Error, e:
- raise CoprBackendError, 'Error parsing config file: %s: %s' % (self.config_file, e)
+ raise errors.CoprBackendError, 'Error parsing config file: %s: %s' % (self.config_file, e)
if not opts.jobsdir or not opts.destdir:
- raise CoprBackendError, "Incomplete Config - must specify jobsdir and
destdir in configuration"
+ raise errors.CoprBackendError, "Incomplete Config - must specify jobsdir
and destdir in configuration"
return opts
def log(self, msg):
- if self.logfile:
- now = time.time()
- try:
- open(self.logfn, 'a').write(str(now) + ':' + msg + '\n')
- except (IOError, OSError), e:
- print >>sys.stderr, 'Could not write to logfile %s - %s' % (self.lf, str(e))
- if not self.quiet:
- print msg
+ now = time.time()
+ try:
+ open(self.logfile, 'a').write(str(now) + ':' + msg + '\n')
+ except (IOError, OSError), e:
+ print >>sys.stderr, 'Could not write to logfile %s - %s' % (self.logfile, str(e))
def run(self):
- # start processing builds, etc
- # setup and run our workers
- for i in range(opts.num_workers):
- w = backend.dispatcher.Worker(opts, jobs)
- workers.append(w)
- w.start()
abort = False
while not abort:
print 'adding jobs'
- for f in sorted(glob.glob(jobsdir + '/*.json')):
+ for f in sorted(glob.glob(self.opts.jobsdir + '/*.json')):
n = os.path.basename(f).replace('.json', '')
- if not is_completed(n) and n not in added:
- #jobdata = open(f).read()
- jobs.put(n)
- added.append(n)
- print 'adding %s' % n
-
-
-
- print "# jobs in queue: %s" % jobs.qsize()
-
-
- # FIXME:
- # look up number of workers in config
- # see if it changed and update accordingly?
- # poison pill? if opts.num_workers < len(workers)?
- time.sleep(opts.sleeptime)
+ if n not in self.added_jobs:
+ self.jobs.put(f)
+ self.added_jobs.append(n)
+ self.log('adding %s' % n)
+
+ # this handles starting/growing the number of workers
+ if len(self.workers) < self.opts.num_workers:
+ for i in range(self.opts.num_workers - len(self.workers)):
+ w = Worker(self.opts, self.jobs)
+ self.workers.append(w)
+ w.start()
+ # FIXME - prune out workers
+ #if len(self.workers) > self.opts.num_workers:
+ # killnum = len(self.workers) - self.opts.num_workers
+ # for w in self.workers[:killnum]:
+ # #insert a poison pill? Kill after something? I dunno.
+ # FIXME - if a worker bombs out - we need to check them
+ # and startup a new one if it happens
+
+ self.log("# jobs in queue: %s" % jobs.qsize())
+
+ time.sleep(self.opts.sleeptime)
-def is_completed(jobid):
-
- if glob.glob(destdir + '/' + jobid + '*'):
- return True
- return False
-
def main(args):
-
-
-
+ if len(args) < 1 or not os.path.exists(args[0]):
+ print 'Must pass in config file'
+ sys.exit(1)
+ cbe = CoprBackend(args[0])
+ try:
+ cbe.run()
+ except Exception, e:
+ print 'Killing/Dying'
+ for w in cbe.workers:
+ w.terminate()
+ raise
if __name__ == '__main__':
@@ -134,6 +127,6 @@ if __name__ == '__main__':
print "ERROR: %s - %s" % (str(type(e)), str(e))
sys.exit(1)
except KeyboardInterrupt, e:
- print "\nUser cancelled, need cleanup\n"
+ print "\nUser cancelled, may need cleanup\n"
sys.exit(0)
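
Finally, for reference when exercising Worker.parse_job(): a job file carrying the fields it reads might look like the following (all values invented):

    {"builds": [{
        "id": 1,
        "user_id": 2,
        "pkgs": "http://example.com/foo-1.0-1.src.rpm",
        "repos": "http://example.com/extra-repo/",
        "chroots": "fedora-18-x86_64",
        "memory_reqs": 2048,
        "timeout": 1800,
        "copr": {"id": 3,
                 "name": "example-copr",
                 "owner": {"name": "someuser"}}
    }]}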