aboutsummaryrefslogtreecommitdiff
path: root/steuermann/run_all.py
diff options
context:
space:
mode:
authorsienkiew <sienkiew@d34015c8-bcbb-4646-8ac8-8ba5febf221d>2011-08-30 16:09:41 -0400
committersienkiew <sienkiew@d34015c8-bcbb-4646-8ac8-8ba5febf221d>2011-08-30 16:09:41 -0400
commitea0bbd187c539b30c6b70a7a220ca1249f3cca41 (patch)
treebfc64ff8328e952e4348440fbe77a85aee88f48e /steuermann/run_all.py
parent0206b3bdab5ca17b8e22806c34330dd58d55e429 (diff)
downloadsteuermann-f95cfa5c88a0ba2ce91e6892523b3107d3fd94f1.tar.gz
hacked together prototype of steuermann 21.9
git-svn-id: https://svn.stsci.edu/svn/ssb/etal/steuermann/trunk@381 d34015c8-bcbb-4646-8ac8-8ba5febf221d
Diffstat (limited to 'steuermann/run_all.py')
-rw-r--r--steuermann/run_all.py368
1 files changed, 368 insertions, 0 deletions
diff --git a/steuermann/run_all.py b/steuermann/run_all.py
new file mode 100644
index 0000000..1fe2aa2
--- /dev/null
+++ b/steuermann/run_all.py
@@ -0,0 +1,368 @@
+'''
+run everything in a set of command files
+
+'''
+
+import time
+import sys
+import sqlite3
+
+import run
+
+import datetime
+
+import nodes
+
+
+#####
+
+def main() :
+ global xnodes
+ # read all the input files
+
+ di_nodes = nodes.read_file_list( sys.argv[2:] )
+
+ xnodes = di_nodes.node_index
+ run_name = 'arf' + str(datetime.datetime.now())
+ db = sqlite3.connect('sr.db')
+ register_database(db, run_name, xnodes)
+
+ n = sys.argv[1]
+ if n == '-a' :
+ run_all(xnodes, run_name, db)
+ elif n == '-i' :
+ run_interactive( xnodes, run_name, db )
+ else :
+ print "%s ?"%n
+
+
+def do_flag( xnodes, name, recursive, fn, verbose ) :
+ if verbose :
+ verbose = verbose + 1
+ if not ':' in name :
+ name = '*:' + name
+ if ( '*' in name ) or ( '?' in name ) or ( '[' in name ) :
+ if verbose :
+ print ' '*verbose, "wild",name
+ for x in xnodes :
+ if nodes.wildcard_name( name, x ) :
+ if verbose :
+ print ' '*verbose, "match",x
+ do_flag( xnodes, x, recursive, fn, verbose )
+ elif name in xnodes :
+ if verbose :
+ print ' '*verbose, "found",name
+ fn(xnodes[name])
+ if recursive :
+ for y in xnodes[name].predecessors :
+ do_flag( xnodes, y.name, recursive, fn, verbose )
+ else :
+ if verbose :
+ print ' '*verbose, "not in list", name
+ raise Exception()
+
+def set_want( node ) :
+ node.wanted = 1
+ node.skip = 0
+
+def set_skip( node ) :
+ node.wanted = 0
+ node.skip = 1
+
+
+def cmd_flagging( l, xnodes, func ) :
+ if l[1] == '-r' :
+ recursive = 1
+ l = l[2:]
+ else :
+ recursive = 0
+ l = l[1:]
+
+ for x in l :
+ do_flag( xnodes, x, recursive, func, 1 )
+
+helpstr = """
+report show report
+want [-r] node declare that we want that node
+skip [-r] node skip this node
+list -a
+list node
+start
+wait
+wr report want/skip values
+wd report depth
+"""
+
+def run_interactive( xnodes, run_name, db) :
+
+ runner = run.runner( xnodes )
+
+ for x in xnodes :
+ xnodes[x].finished = 0
+ xnodes[x].running = 0
+ xnodes[x].wanted = 0
+ xnodes[x].skip = 0
+
+ keep_running = 0
+
+ while 1 :
+ print "action?"
+ l = sys.stdin.readline()
+ if l == '' :
+ break
+ l = l.strip()
+ l = l.split()
+ if len(l) > 0 :
+ n = l[0]
+ else :
+ n = ''
+
+ if n == '?' :
+ print helpstr
+
+ elif n == 'report' :
+ report( db, run_name )
+
+ elif n == 'wr' :
+ report( db, run_name, info_callback_want )
+
+ elif n == 'wd' :
+ report( db, run_name, info_callback_depth )
+
+ elif n == 'want' :
+ cmd_flagging( l, xnodes, set_want )
+
+ elif n == 'skip' :
+ cmd_flagging( l, xnodes, set_skip )
+
+ elif n == 'list' :
+ print_all = '-a' in l
+ l = sorted ( [ x for x in xnodes ] )
+ print "w f s name"
+ for x in l :
+ print xnodes[x].wanted, xnodes[x].finished, xnodes[x].skip, x
+ if print_all :
+ print " AFTER", ' '.join([ a.name for a in xnodes[x].predecessors ])
+
+ elif n == 'start' :
+ keep_running = 1
+
+ elif n == 'wait' :
+ while 1 :
+ ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
+ if not keep_running :
+ break
+ if not no_sleep :
+ time.sleep(1)
+ if keypress() :
+ print "wait interrupted (processes continue)"
+ break
+ print "wait finished"
+
+ if keep_running :
+ print "run step"
+ ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
+
+ if len(runner.all_procs) == 0 :
+ # give it a chance to start another
+ ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
+
+ if not keep_running :
+ print 'all done'
+
+ else :
+ if len(runner.all_procs) == 0 :
+ print "no processes running - some prereq not satisfiable"
+
+
+
+def register_database(db, run, xnodes ) :
+ c = db.cursor()
+ c.execute('INSERT INTO runs ( run ) VALUES ( ? )', ( run, ) )
+
+ c = db.cursor()
+ for x in xnodes :
+ host, tablename, cmd = nodes.crack_name(x)
+ depth = xnodes[x].depth
+ c.execute("INSERT INTO status ( run, host, tablename, cmd, depth, status ) VALUES "
+ "( ?, ?, ?, ?, ?, 'N' )", ( run, host, tablename, cmd, depth ) )
+
+ db.commit()
+
+def run_all(xnodes, run_name, db) :
+
+ runner = run.runner( xnodes )
+
+ for x in xnodes :
+ xnodes[x].finished = 0
+ xnodes[x].running = 0
+ xnodes[x].wanted = 1
+
+ while 1 :
+ ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
+ if not keep_running :
+ break
+ if not no_sleep :
+ time.sleep(1)
+
+def run_step( runner, xnodes, run_name, db ) :
+
+ # flag to keep running
+ keep_running = 0
+
+ # flag to suppress brief sleep at end of loop
+ no_sleep = 0
+
+ # Loop, polling for work to do, or for finishing processes
+ print "loop"
+ for x_name in xnodes :
+ x=xnodes[x_name]
+
+ # skip nodes that we do not need to consider running because
+
+ # - we explicitly ask to skip it; also mark it finished
+ # so that things that come after can run
+ if x.skip :
+ x.finished = 1
+ continue
+
+ # - it is not wanted
+ if not x.wanted :
+ continue
+
+ # - it is already finished
+ if x.finished :
+ continue
+
+ # - we are already running it
+ if x.running :
+ keep_running=1
+ continue
+
+ # ok, if we are here, we found a node that we want to run
+
+ # if there is a node we need to run, we need to come back through the loop
+ # (bug: are we sure there is not a deadlock caused by mutual dependencies? if that happens, it can never run.)
+ keep_running = 1
+
+ # count how many of the predecessors are finished
+ released = sum( [ xnodes[r].finished for r in x.released ])
+
+ # if the number of predecessors finished is the number
+ # of predecessors, we can run this one
+ if released == len(x.released) :
+ host, table, cmd = nodes.crack_name(x_name)
+ print "RUN", x_name
+
+ db.execute("UPDATE status SET start_time = ?, status = 'S' WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )",
+ ( str(datetime.datetime.now()), run_name, host, table, cmd ) )
+ db.commit()
+ runner.run(x, run_name)
+
+ # if anything has exited, we process it and update the status in the database
+ while 1 :
+ who_exited = runner.poll()
+ if not who_exited :
+ break
+
+ # yes, something exited - no sleep, and keep running
+ no_sleep = 1
+ keep_running = 1
+
+ # note who and log it
+ x_host, x_table, x_cmd = nodes.crack_name(who_exited[0])
+
+ xnodes[who_exited[0]].wanted = 0
+
+ db.execute("UPDATE status SET end_time = ?, status = ? WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )",
+ ( str(datetime.datetime.now()), who_exited[1], run_name, x_host, x_table, x_cmd ) )
+
+
+ runner.display_procs()
+
+ return ( keep_running, no_sleep )
+
+#####
+
+ms_windows = 0
+
+if ms_windows :
+ import msvcrt
+else :
+ import select
+
+def keypress() :
+ if ms_windows :
+ return msvcrt.kbhit()
+ else :
+ return select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], [])
+
+#####
+
+def info_callback_status( tablename, cmd, host, status ) :
+ return status
+
+def info_callback_want( tablename, cmd, host, status ) :
+ n = xnodes['%s:%s/%s'%(host,tablename,cmd)]
+ s = ''
+ if n.skip :
+ s = s + 'S'
+ if n.wanted :
+ s = s + 'W'
+ if s == '' :
+ s = '-'
+ return s
+
+def info_callback_depth( tablename, cmd, host, status ) :
+ n = xnodes['%s:%s/%s'%(host,tablename,cmd)]
+ return n.depth
+
+def report( db, run_name, info_callback = info_callback_status ) :
+ import pandokia.text_table as tt
+
+ c = db.cursor()
+ c.execute("select max(depth) as d, tablename from status where run = ? group by tablename order by d asc",(run_name,))
+ table_list = [ x for x in c ]
+ # table_list contains ( depth, tablename )
+
+ print """
+ -- N = not started
+ -- S = started, not finished
+ -- P = prereq not satisfied, so not attempted
+ -- 0-255 = exit code
+"""
+
+ for depth, tablename in table_list :
+ print "------"
+ print tablename
+ t = tt.text_table()
+
+ row = 0
+ t.define_column('-') # the command name in column 0
+
+ c.execute("select distinct host from status where tablename = ? and run = ? order by host asc",(tablename, run_name))
+ for host, in c :
+ t.define_column(host)
+ t.set_value(row, host, host)
+
+ c.execute("""select cmd, host, depth, status, start_time, end_time, notes from status
+ where tablename = ? and run = ? order by depth, cmd asc
+ """, ( tablename, run_name ) )
+
+ prev_cmd = None
+ for x in c :
+ cmd, host, depth, status, start_time, end_time, notes = x
+ if cmd != prev_cmd :
+ row = row + 1
+ t.set_value(row, 0, cmd)
+ prev_cmd = cmd
+ t.set_value( row, host, info_callback( tablename, cmd, host, status ) )
+
+ t.pad()
+
+ print t.get_trac_wiki()
+
+#####
+
+if __name__ == '__main__' :
+ main()