Repository : http://git.fedorahosted.org/cgit/copr.git
On branch : master
commit 1cfded9339138e940b588ab2e346164f31677079 Author: Seth Vidal <skvidal@fedoraproject.org> Date: Tue Jan 15 11:35:24 2013 -0500
first cut and refactoring for event queue and better logging
backend/dispatcher.py | 33 +++++++--- copr-be.py | 161 +++++++++++++++++++++++++++++++++---------------- 2 files changed, 131 insertions(+), 63 deletions(-)
diff --git a/backend/dispatcher.py b/backend/dispatcher.py index 92395ce..1485285 100644 --- a/backend/dispatcher.py +++ b/backend/dispatcher.py @@ -68,18 +68,16 @@ class WorkerCallback(object): self.logfile = logfile
def log(self, msg): - if not self.logfile: - return - - now = time.strftime('%F %T') - try: - open(self.logfile, 'a').write(str(now) + ': ' + msg + '\n') - except (IOError, OSError), e: - print >>sys.stderr, 'Could not write to logfile %s - %s' % (self.logfile, str(e)) + if self.logfile: + now = time.strftime('%F %T') + try: + open(self.logfile, 'a').write(str(now) + ': ' + msg + '\n') + except (IOError, OSError), e: + print >>sys.stderr, 'Could not write to logfile %s - %s' % (self.logfile, str(e))
class Worker(multiprocessing.Process): - def __init__(self, opts, jobs, worker_num, ip=None, create=True, callback=None): + def __init__(self, opts, jobs, events, worker_num, ip=None, create=True, callback=None):
# base class initialization multiprocessing.Process.__init__(self, name="worker-builder") @@ -87,6 +85,7 @@ class Worker(multiprocessing.Process):
# job management stuff self.jobs = jobs + self.events = events # event queue for communicating back to dispatcher self.worker_num = worker_num self.ip = ip self.opts = opts @@ -99,11 +98,23 @@ class Worker(multiprocessing.Process):
if ip: self.callback.log('creating worker: %s' % ip) + self.event('creating worker: %s' % ip) else: self.callback.log('creating worker: dynamic ip') + self.event('creating worker: dynamic ip') + + def event(self, what): + if self.ip: + who = 'worker-%s-%s' % (self.worker_num, self.ip) + else: + who = 'worker-%s' % (self.worker_num) + + self.events.put({'when':time.time(), 'who':who, 'what':what})
def spawn_instance(self): """call the spawn playbook to startup/provision a building instance""" + + self.callback.log('spawning instance begin') start = time.time()
@@ -195,6 +206,7 @@ class Worker(multiprocessing.Process): # maybe we move this to the callback? def mark_started(self, job):
+ self.event('job start: user:%s copr:%s build:%s ip:%s pid:%s' % (job.user_id, job.copr_id, job.build_id, self.ip, self.pid)) build = {'id':job.build_id, 'started_on': job.started_on, 'results': job.results, @@ -206,8 +218,9 @@ class Worker(multiprocessing.Process):
# maybe we move this to the callback? def return_results(self, job): - self.callback.log('%s status %s. Took %s seconds' % (job.build_id, job.status, job.ended_on - job.started_on)) + self.event('job end: user:%s copr:%s build:%s ip:%s pid:%s' % (job.user_id, job.copr_id, job.build_id, self.ip, self.pid))
+ self.callback.log('%s status %s. Took %s seconds' % (job.build_id, job.status, job.ended_on - job.started_on)) build = {'id':job.build_id, 'ended_on': job.ended_on, 'status': job.status, diff --git a/copr-be.py b/copr-be.py index 6631ac4..9391968 100644 --- a/copr-be.py +++ b/copr-be.py @@ -21,7 +21,98 @@ def _get_conf(cp, section, option, default): return default
+ +class CoprJobGrab(multiprocessing.Process): + """Fetch jobs from the Frontend - submit them to the jobs queue for workers""" + + def __init__(self, opts, events, jobs): + self.opts = opts + self.events = events + self.jobs = jobs + self.added_jobs = [] + + def event(self, what): + self.events.put({'when':time.time(), 'who':'job', 'what':what}) + + def fetch_jobs(self): + self.event('fetching jobs') + try: + r = requests.get('%s/waiting_builds/' % self.opts.frontend_url) # auth stuff here? maybe/maybenot + except requests.RequestException, e: + self.event('Error retrieving jobs from %s: %s' % (self.opts.frontend_url, e)) + else: + try: + r_json = json.loads(r.content) # using old requests on el6 :( + except ValueError, e: + self.event('Error getting JSON build list from FE %s' % e) + return + + if 'builds' in r_json and r_json['builds']: + self.event('%s jobs returned' % len(r_json['builds'])) + count = 0 + for b in r_json['builds']: + if 'id' in b: + jobfile = self.opts.jobsdir + '/%s.json' % b['id'] + if not os.path.exists(jobfile) and b['id'] not in self.added_jobs: + count += 1 + open(jobfile, 'w').write(json.dumps(b)) + self.event('Wrote job: %s' % b['id']) + if count: + self.event('New jobs: %s' % count) + + def run(self): + abort = False + while not abort: + self.fetch_jobs() + for f in sorted(glob.glob(self.opts.jobsdir + '/*.json')): + n = os.path.basename(f).replace('.json', '') + if n not in self.added_jobs: + self.jobs.put(f) + self.added_jobs.append(n) + self.event('adding to work queue id %s' % n) + +class CoprLog(multiprocessing.Process): + """log mechanism where items from the events queue get recorded""" + def __init__(self, opts, events): + + # base class initialization + multiprocessing.Process.__init__(self, name="logger") + + self.opts = opts + self.events = events + + logdir = os.path.dirname(self.opts.logfile) + if not os.path.exists(logdir): + os.makedirs(logdir, mode=0750) + + if not os.path.exists(self.opts.destdir): + 
+ os.makedirs(self.opts.destdir, mode=0755) + + # setup a log file to write to + self.logfile = self.opts.logfile + + def log(self, event): + + when = time.strftime('%F %T', time.gmtime(event['when'])) + msg = '%s : %s %s' % (when, event['who'], event['what'].strip()) + + try: + open(self.logfile, 'a').write(msg + '\n') + except (IOError, OSError), e: + print >>sys.stderr, 'Could not write to logfile %s - %s' % (self.logfile, str(e)) + + + # event format is a dict {when:time, who:[worker|logger|job|main], what:str} + def run(self): + abort = False + while not abort: + for e in self.events.get(): + if 'when' in e and 'who' in e and 'what' in e: + self.log(e) + class CoprBackend(object): + """core process - starts/stops/initializes workers""" + def __init__(self, config_file=None, ext_opts=None): # read in config file # put all the config items into a single self.opts bunch @@ -33,25 +124,28 @@ class CoprBackend(object): self.ext_opts = ext_opts # to stow our cli options for read_conf() self.opts = self.read_conf()
- logdir = os.path.dirname(self.opts.logfile) - if not os.path.exists(logdir): - os.makedirs(logdir, mode=0750) + self.jobs = multiprocessing.Queue() # job is a path to a jobfile on the localfs + self.events = multiprocessing.Queue() + # event format is a dict {when:time, who:[worker|logger|job|main], what:str}
- if not os.path.exists(self.opts.destdir): - os.makedirs(self.opts.destdir, mode=0755)
- # setup a log file to write to - self.logfile = self.opts.logfile - self.log("Starting up new copr-be instance") + # create logger + self._logger = CoprLog(self.opts, self.events) + self._logger.start()
+ self.event('Starting up Job Grabber') + # create job grabber + self._jobgrab = CoprJobGrab(self.opts, self.events, self.jobs) + self._jobgrab.start()
if not os.path.exists(self.opts.worker_logdir): os.makedirs(self.opts.worker_logdir, mode=0750)
- self.jobs = multiprocessing.Queue() self.workers = [] self.added_jobs = []
+ def event(self, what): + self.events.put({'when':time.time(), 'who':'main', 'what':what})
def read_conf(self): "read in config file - return Bunch of config data" @@ -90,64 +184,25 @@ class CoprBackend(object): return opts
- def log(self, msg): - now = time.strftime('%F %T') - output = str(now) + ': ' + msg - if not self.opts.daemonize: - print output - - try: - open(self.logfile, 'a').write(output + '\n') - except (IOError, OSError), e: - print >>sys.stderr, 'Could not write to logfile %s - %s' % (self.logfile, str(e)) - - - def fetch_jobs(self): - self.log('fetching jobs') - try: - r = requests.get('%s/waiting_builds/' % self.opts.frontend_url) # auth stuff here? maybe/maybenot - except requests.RequestException, e: - self.log('Error retrieving jobs from %s: %s' % (self.opts.frontend_url, e)) - else: - r_json = json.loads(r.content) # using old requests on el6 :( - if 'builds' in r_json: - self.log('%s jobs returned' % len(r_json['builds'])) - count = 0 - for b in r_json['builds']: - if 'id' in b: - jobfile = self.opts.jobsdir + '/%s.json' % b['id'] - if not os.path.exists(jobfile) and b['id'] not in self.added_jobs: - count += 1 - open(jobfile, 'w').write(json.dumps(b)) - self.log('Wrote job: %s' % b['id']) - self.log('New jobs: %s' % count)
def run(self):
abort = False while not abort: - self.fetch_jobs() - for f in sorted(glob.glob(self.opts.jobsdir + '/*.json')): - n = os.path.basename(f).replace('.json', '') - if n not in self.added_jobs: - self.jobs.put(f) - self.added_jobs.append(n) - self.log('adding to work queue id %s' % n) - # re-read config into opts self.opts = self.read_conf()
if self.jobs.qsize(): - self.log("# jobs in queue: %s" % self.jobs.qsize()) + self.event("# jobs in queue: %s" % self.jobs.qsize()) # this handles starting/growing the number of workers if len(self.workers) < self.opts.num_workers: - self.log("Spinning up more workers for jobs") + self.event("Spinning up more workers for jobs") for i in range(self.opts.num_workers - len(self.workers)): worker_num = len(self.workers) + 1 - w = Worker(self.opts, self.jobs, worker_num) + w = Worker(self.opts, self.jobs, self.events, worker_num) self.workers.append(w) w.start() - self.log("Finished starting worker processes") + self.event("Finished starting worker processes") # FIXME - prune out workers #if len(self.workers) > self.opts.num_workers: # killnum = len(self.workers) - self.opts.num_workers @@ -158,7 +213,7 @@ class CoprBackend(object): # check for dead workers and abort for w in self.workers: if not w.is_alive(): - self.log('Worker %d died unexpectedly' % w.worker_num) + self.event('Worker %d died unexpectedly' % w.worker_num) if self.opts.exit_on_worker: raise errors.CoprBackendError, "Worker died unexpectedly, exiting" else:
copr-devel@lists.fedorahosted.org