Author: tmckay Date: 2010-12-21 20:15:51 +0000 (Tue, 21 Dec 2010) New Revision: 4443
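Modified: branches/scale_testing/cumin/bin/cumin-data branches/scale_testing/cumin/python/cumin/config.py branches/scale_testing/mint/python/mint/main.py branches/scale_testing/mint/python/mint/session.py branches/scale_testing/mint/python/mint/update.py Log: Collect update statistics in CSV files for scale testing: add --scale-stats (overall and per-class CSV output) and --object-stats (per-class console output) options to cumin-data, and make the update queue size configurable via the new queue-size setting.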
Modified: branches/scale_testing/cumin/bin/cumin-data =================================================================== --- branches/scale_testing/cumin/bin/cumin-data 2010-12-20 14:12:38 UTC (rev 4442) +++ branches/scale_testing/cumin/bin/cumin-data 2010-12-21 20:15:51 UTC (rev 4443) @@ -16,6 +16,8 @@
parser = CuminOptionParser(values) parser.add_option("--print-stats", action="store_true") + parser.add_option("--scale-stats", action="store_true") + parser.add_option("--object-stats", action="store_true") parser.add_option("--print-events", type="int", default=0, metavar="LEVEL")
opts, args = parser.parse_args() @@ -26,7 +28,7 @@
broker_uris = [x.strip() for x in opts.brokers.split(",")]
- mint = Mint(model_dir, broker_uris, opts.database) + mint = Mint(model_dir, broker_uris, opts.database, values.queue_size)
mint.print_event_level = opts.print_events
@@ -65,19 +67,40 @@ if opts.print_stats: print "[Starred columns are the number of events per second]"
+ # set up for scale stats here + if opts.scale_stats: + stats.init_stats_path(mint, config.home) + + if opts.print_stats or \ + opts.scale_stats or \ + opts.object_stats: while True: - if count % 20 == 0: - stats.print_headings() + stats.capture() + if opts.print_stats: + if count % 20 == 0: + stats.print_headings() + count += 1 + stats.print_values() + + if opts.object_stats: + stats.print_values_by_class(mint)
- count += 1 + if opts.scale_stats: + stats.write_values_by_class(mint, time.localtime()) + stats.write_values(time.localtime())
- stats.print_values() - sleep(5) else: while True: sleep(86400) + except Exception as inst: + print inst + finally: + # close the scale stats CSV file here: + if opts.scale_stats: + pass + mint.stop()
if __name__ == "__main__":
Modified: branches/scale_testing/cumin/python/cumin/config.py =================================================================== --- branches/scale_testing/cumin/python/cumin/config.py 2010-12-20 14:12:38 UTC (rev 4442) +++ branches/scale_testing/cumin/python/cumin/config.py 2010-12-21 20:15:51 UTC (rev 4443) @@ -56,6 +56,9 @@ param = ConfigParameter(data, "vacuum-interval", int) param.default = 60 * 60 # 1 hour
+ param = ConfigParameter(data, "queue-size", int) + param.default = 1000 + def parse(self): paths = list()
Modified: branches/scale_testing/mint/python/mint/main.py =================================================================== --- branches/scale_testing/mint/python/mint/main.py 2010-12-20 14:12:38 UTC (rev 4442) +++ branches/scale_testing/mint/python/mint/main.py 2010-12-21 20:15:51 UTC (rev 4443) @@ -10,14 +10,14 @@ log = logging.getLogger("mint.main")
class Mint(object): - def __init__(self, model_dir, broker_uris, database_dsn): + def __init__(self, model_dir, broker_uris, database_dsn, queue_max=1000): self.model = MintModel(self, model_dir) self.model.sql_logging_enabled = False
self.session = MintSession(self, broker_uris) self.database = MintDatabase(self, database_dsn)
- self.update_thread = UpdateThread(self) + self.update_thread = UpdateThread(self,queue_max)
self.expire_enabled = True self.expire_thread = ExpireThread(self)
Modified: branches/scale_testing/mint/python/mint/session.py =================================================================== --- branches/scale_testing/mint/python/mint/session.py 2010-12-20 14:12:38 UTC (rev 4442) +++ branches/scale_testing/mint/python/mint/session.py 2010-12-21 20:15:51 UTC (rev 4443) @@ -64,14 +64,17 @@ def brokerConnected(self, qmf_broker): message = "Broker %s:%i is connected" self.model.print_event(1, message, qmf_broker.host, qmf_broker.port) + self.model.app.update_thread.ignore()
def brokerInfo(self, qmf_broker): message = "Broker info from %s:%i" self.model.print_event(1, message, qmf_broker.host, qmf_broker.port) + self.model.app.update_thread.ignore()
def brokerDisconnected(self, qmf_broker): message = "Broker %s:%i is disconnected" self.model.print_event(1, message, qmf_broker.host, qmf_broker.port) + self.model.app.update_thread.ignore()
def newAgent(self, qmf_agent): self.model.print_event(3, "Creating %s", qmf_agent) @@ -94,9 +97,11 @@
def newPackage(self, name): self.model.print_event(2, "New package %s", name) + self.model.app.update_thread.ignore()
def newClass(self, kind, classKey): self.model.print_event(2, "New class %s", classKey) + self.model.app.update_thread.ignore()
def objectProps(self, broker, qmf_object): up = ObjectUpdate(self.model, qmf_object) @@ -108,6 +113,7 @@
def event(self, broker, event): self.model.print_event(4, "New event %s from %s", broker, event) + self.model.app.update_thread.ignore()
def methodResponse(self, broker, seq, response): message = "Method response for request %i received from %s" @@ -121,3 +127,4 @@ callback(response.text, response.outArgs) finally: self.model.lock.release() + self.model.app.update_thread.method_response()
Modified: branches/scale_testing/mint/python/mint/update.py =================================================================== --- branches/scale_testing/mint/python/mint/update.py 2010-12-20 14:12:38 UTC (rev 4442) +++ branches/scale_testing/mint/python/mint/update.py 2010-12-21 20:15:51 UTC (rev 4443) @@ -1,6 +1,8 @@ import copy import resource import pickle +import os.path +import csv
from psycopg2 import IntegrityError, TimestampFromTicks from psycopg2.extensions import cursor as Cursor @@ -15,10 +17,10 @@ sample_window_max = 60 * 5
class UpdateThread(MintDaemonThread): - def __init__(self, app): + def __init__(self, app, queue_max=1000): super(UpdateThread, self).__init__(app)
- self.updates = ConcurrentQueue(maxsize=1000) + self.updates = ConcurrentQueue(maxsize=queue_max) self.stats = UpdateStats(self.app)
self.conn = None @@ -33,24 +35,37 @@ self.cursor.stats = self.stats
def enqueue(self, update): + update.enqueue_time = time.time() + self.updates.put(update) - + self.stats.enqueued += 1
+ def ignore(self): + self.stats.ignore += 1 + + def method_response(self): + self.stats.method_response +=1 + def run(self): - while True: - if self.stop_requested: - break + try: + while True: + if self.stop_requested: + break
- try: - update = self.updates.get(True, 1) - except Empty: - continue + try: + update = self.updates.get(True) #, 1) + except Empty: + continue + update.dequeue_time = time.time() + self.stats.dequeued += 1
- self.stats.dequeued += 1 + update.process(self) + self.conn.commit() + + except: + print "uh oh"
- update.process(self) - class UpdateStats(object): group_names = ("Updates", "Agents", "Objects") groups = "%43s | %32s | %32s |" % group_names @@ -60,6 +75,7 @@ "*Created", "*Updated", "*Deleted", "*Created", "*Updated", "*Deleted", "*Sql Ops", "Errors", "Cpu (%)", "Mem (M)") + headings_fmt = \ "%10s %10s %10s %10s | " + \ "%10s %10s %10s | " + \ @@ -67,7 +83,6 @@ "%10s %10s %10s %10s" headings = headings_fmt % heading_names
- values_fmt = \ "%10i %10.1f %10.1f %10.1f | " + \ "%10.1f %10.1f %10.1f | " + \ @@ -81,6 +96,8 @@ self.enqueued = 0 self.dequeued = 0 self.dropped = 0 + self.ignore = 0 + self.method_response = 0
self.agents_created = 0 self.agents_updated = 0 @@ -93,6 +110,7 @@ self.objects_created_by_class = defaultdict(int) self.objects_updated_by_class = defaultdict(int) self.objects_deleted_by_class = defaultdict(int) + self.objects_dropped_by_class = defaultdict(int)
self.sql_ops = 0 self.errors = 0 @@ -101,11 +119,41 @@ self.cpu = 0 self.memory = 0
+ self.init_update_times() + + def init_update_times(self): + # Track average time of update from arrival to + # processing complete. Refreshed per interval. + self.update_queue_duration = 0 + self.update_process_duration = 0 + self.queue_time = 0 + self.initial_process_time = 0 + self.commit_time = 0 + self.record_samples = 0 + self.init_update_flag = False + + def record_update_times(self,update): + try: + if self.init_update_flag: + self.init_update_times() + self.record_samples += 1 + self.update_queue_duration += \ + update.start_process - update.create_time + self.update_process_duration += \ + update.finish_process - update.start_process + self.queue_time += \ + update.dequeue_time - update.enqueue_time + self.initial_process_time += \ + update.after_process - update.start_process + self.commit_time += \ + update.after_commit - update.after_process + except: + print "record update exception" + def capture(self): now = copy.copy(self)
now.time = time.time() - rusage = resource.getrusage(resource.RUSAGE_SELF)
now.cpu = rusage[0] + rusage[1] @@ -113,6 +161,7 @@
UpdateStats.then = UpdateStats.now UpdateStats.now = now + self.init_update_flag = True
def get_resident_pages(self): try: @@ -126,12 +175,7 @@ print self.groups print self.headings
- def print_values(self): - self.capture() - - if not self.then: - return - + def _gather_values(self): values = [self.now.enqueued - self.then.enqueued, self.now.dequeued - self.then.dequeued, self.now.dropped - self.then.dropped, @@ -153,23 +197,118 @@ values.append(self.errors) values.append(int((self.now.cpu - self.then.cpu) / secs * 100)) values.append(self.now.memory / 1000000.0) + return values
- print self.values_fmt % tuple(values) + def print_values(self): +# moved capture call to cumin-data so that write_values can +# be run as well +# self.capture() + if not self.then: + return
- def print_values_by_class(self): - names = ("Class", "Created", "Updated", "Deleted") - print "%20s %10s %10s %10s" % names + print self.values_fmt % tuple(self._gather_values())
+ def print_values_by_class(self,mint): + names = ("Class", "Created", "Updated", "Deleted", "Dropped") + print "%20s %10s %10s %10s %10s" % names + for pkg in mint.model._packages: for cls in pkg._classes: - created = stats.created_by_class[cls] - updated = stats.updated_by_class[cls] - deleted = stats.deleted_by_class[cls] + created = self.objects_created_by_class[cls] + updated = self.objects_updated_by_class[cls] + deleted = self.objects_deleted_by_class[cls] + dropped = self.objects_dropped_by_class[cls]
+ if created or updated or deleted or dropped: + args = (cls._name, created, updated, deleted, dropped) + print "%-20s %10i %10i %10i %10i" % args + + def init_stats_path(self, mint, path): + p = os.path.join(path, \ + "stats_"+ time.strftime("%m%d%y_%H%M%S",time.localtime())) + + if not os.path.exists(p): + os.makedirs(p) + + # init the path for general stats + self.stats_path = os.path.join(p, "stats.csv") + + # init the paths for stats by class + self.class_stats_paths = {} + for pkg in mint.model._packages: + for cls in pkg._classes: + self.class_stats_paths[cls._name] = \ + os.path.join(p, cls._name+".csv") + + def write_values_by_class(self, mint, localtime_): + names = ("Time", "Created", "Updated", "Deleted", "Dropped") + + for pkg in mint.model._packages: + for cls in pkg._classes: + created = self.objects_created_by_class[cls] + updated = self.objects_updated_by_class[cls] + deleted = self.objects_deleted_by_class[cls] + dropped = self.objects_dropped_by_class[cls] + if created or updated or deleted: - args = (cls._name, created, updated, deleted) - print "%-20s %10i %10i %10i" % args - + fname = self.class_stats_paths[cls._name] + write_header = not os.path.isfile(fname) + db = csv.writer(open(fname, "a+")) + if write_header: + db.writerow(names) + db.writerow((time.strftime("%H:%M:%S",localtime_), + created, updated, deleted, dropped)) + + def get_update_durations(self): + total = self.now.record_samples + if total == 0: + create_dur = proc_dur = que_dur = init_proc_time = commit_time = 0 + else: + create_dur = self.now.update_queue_duration / total + proc_dur = self.now.update_process_duration / total + que_dur = self.now.queue_time / total + init_proc_time = self.now.initial_process_time / total + commit_time = self.now.commit_time / total + return (create_dur, proc_dur, que_dur, init_proc_time, commit_time) + + + def write_values(self, localtime_): + stats_headings = \ + ("Time", "Total Msgs", + "Depth", "*Avg Que", "*Avg Create", "*Avg Proc", + 
"*Init Proc", "*Commit", + "*Enqueued", "*Dequeued", "*Dropped", + "*Create Agent", "*Update Agent", "*Delete Agent", + "*Create", "*Update", "*Delete", + "*Sql Ops", "Errors", "Cpu (%)", "Mem (M)") + + if not self.then: + return + try: + values = self._gather_values() + values.insert(0, time.strftime("%H:%M:%S",localtime_)) + values.insert(1, self.now.enqueued) + + # include object update times + avg_create_dur,avg_process_dur,avg_que_dur, \ + avg_init_proc,avg_commit = self.get_update_durations() + + values.insert(3, avg_que_dur) + values.insert(4, avg_create_dur) + values.insert(5, avg_process_dur) + values.insert(6, avg_init_proc) + values.insert(7, avg_commit) + + write_header = not os.path.isfile(self.stats_path) + db = csv.writer(open(self.stats_path, "a+")) + if write_header: + db.writerow(stats_headings) + db.writerow(values) + + except Exception as inst: + print "exception in write values" + print inst + class UpdateCursor(Cursor): def execute(self, sql, args=None): super(UpdateCursor, self).execute(sql, args) @@ -179,35 +318,50 @@ class Update(object): def __init__(self, model): self.model = model + self.create_time = time.time()
def process(self, thread): + self.start_process = time.time() log.debug("Processing %s", self)
try: self.do_process(thread.cursor, thread.stats) - + self.after_process = time.time() thread.conn.commit() + self.after_commit = time.time() except UpdateDropped: log.debug("Update dropped")
thread.conn.rollback()
thread.stats.dropped += 1 + try: + cls = self.get_class() + except: + cls = None + if cls != None: + thread.stats.objects_dropped_by_class[cls] += 1 + self.after_commit = time.time() + except: log.exception("Update failed")
thread.conn.rollback() - thread.stats.errors += 1
- #print_exc() + print_exc()
if thread.halt_on_error: raise + self.finish_process = time.time() + thread.stats.record_update_times(self)
def do_process(self, cursor, stats): raise Exception("Not implemented")
+ def get_class(self): + return None + def __repr__(self): return self.__class__.__name__
@@ -346,7 +500,7 @@ self.model.print_event(3, "Created %s", obj)
stats.objects_created += 1 - #stats.objects_created_by_class[cls] += 1 + stats.objects_created_by_class[cls] += 1
return obj
@@ -380,7 +534,9 @@
# force a write if it's been too long, even if the values match if object_columns \ - or obj._save_time < obj._qmf_update_time - timedelta(hours=1): + or (obj._save_time != None and \ + obj._qmf_update_time != None and \ + obj._save_time < obj._qmf_update_time - timedelta(hours=1)): object_columns.append(cls.sql_table._qmf_update_time)
sql = cls.sql_update_object.emit(object_columns) @@ -405,7 +561,7 @@ self.model.print_event(4, "Updated %s", obj)
stats.objects_updated += 1 - #stats.objects_updated_by_class[cls] += 1 + stats.objects_updated_by_class[cls] += 1
def delete_object(self, cursor, stats, obj): obj.delete(cursor) @@ -413,7 +569,7 @@ self.model.print_event(3, "Deleted %s", obj)
stats.objects_deleted += 1 - #stats.objects_deleted_by_class[obj._class] += 1 + stats.objects_deleted_by_class[obj._class] += 1
def process_properties(self, obj, columns, cursor): cls = obj._class @@ -605,7 +761,7 @@ count = cls.delete_selection(cursor, _qmf_agent_id=agent.id)
stats.objects_deleted += count - #stats.objects_deleted_by_class[cls] += count + stats.objects_deleted_by_class[cls] += count
cursor.connection.commit()