diff options
Diffstat (limited to 'steuermann/run_all.py')
-rw-r--r-- | steuermann/run_all.py | 75 |
1 files changed, 53 insertions, 22 deletions
diff --git a/steuermann/run_all.py b/steuermann/run_all.py index 1c6fe28..e7bff61 100644 --- a/steuermann/run_all.py +++ b/steuermann/run_all.py @@ -14,6 +14,8 @@ import nodes import steuermann.config +import pandokia.helpers.easyargs as easyargs + try : import readline @@ -35,18 +37,20 @@ def main() : import atexit atexit.register(readline.write_history_file, history) + opt, args = easyargs.get( { '-a' : '--all', + '--all' : '-a', + } ) + + # # - all = sys.argv[1] == '-a' - if all : - di_nodes = nodes.read_file_list( sys.argv[2:] ) - else : - di_nodes = nodes.read_file_list( sys.argv[1:] ) + all = '--all' in opt + + di_nodes = nodes.read_file_list( args ) xnodes = di_nodes.node_index run_name = str(datetime.datetime.now()).replace(' ','_') db = steuermann.config.open_db() - register_database(db, run_name, xnodes) if all : run_all(xnodes, run_name, db) @@ -83,11 +87,16 @@ def do_flag( xnodes, name, recursive, fn, verbose ) : raise Exception() def set_want( node ) : + # if we said we want it, mark it as wanted and don't skip node.wanted = 1 node.skip = 0 def set_skip( node ) : - node.wanted = 0 + # If we want to skip it, mark it as IS wanted and skip. + # Wanted makes us try to run it, then skip makes the run a nop. + # This means that stuff that comes after it can run, but only + # at the right point in the sequence. + node.wanted = 1 node.skip = 1 @@ -119,6 +128,8 @@ pre node show what must come before a node def run_interactive( xnodes, run_name, db) : + register_database(db, run_name, xnodes) + runner = run.runner( xnodes, steuermann.config.logdir ) for x in xnodes : @@ -179,6 +190,15 @@ def run_interactive( xnodes, run_name, db) : elif n == 'skip' : cmd_flagging( l, xnodes, set_skip ) + elif n == 'reset' : + print "marking all as not finished" + for x in xnodes : + xnodes[x].finished = 0 + + run_name = str(datetime.datetime.now()).replace(' ','_') + print "new run name",run_name + register_database(db, run_name, xnodes) + elif n == 'list' : print_all = '-a' in l l = sorted ( [ x for x in xnodes ] ) @@ -188,10 +208,17 @@ def run_interactive( xnodes, run_name, db) : if print_all : print " AFTER", ' '.join([ a.name for a in xnodes[x].predecessors ]) - elif n == 'start' : - keep_running = 1 - elif n == 'wait' : + c = db.cursor() + for x in xnodes : + host, tablename, cmd = nodes.crack_name(x) + if xnodes[x].wanted : + status = 'W' + c.execute("UPDATE status SET status = 'W' WHERE run = ? AND host = ? AND tablename = ? AND cmd = ? AND status = 'N'", + (run_name, host, tablename, cmd) ) + + db.commit() + while 1 : ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) if not keep_running : @@ -274,6 +301,8 @@ def register_database(db, run, xnodes ) : def run_all(xnodes, run_name, db) : + register_database(db, run_name, xnodes) + runner = run.runner( xnodes ) for x in xnodes : @@ -304,12 +333,6 @@ def run_step( runner, xnodes, run_name, db ) : # skip nodes that we do not need to consider running because - # - we explicitly ask to skip it; also mark it finished - # so that things that come after can run - if x.skip : - x.finished = 1 - continue - # - it is not wanted if not x.wanted : continue @@ -337,19 +360,29 @@ def run_step( runner, xnodes, run_name, db ) : if released == len(x.released) : host, table, cmd = nodes.crack_name(x_name) - if runner.run(x, run_name) : - # returns true/false whether it actually ran it - it may not because of resource limits + # we are now ready to let it run. If it is marked skipped, just say it ran really fast. + if x.skip : + x.finished = 1 + no_sleep = 1 + keep_running = 1 db.execute("UPDATE status SET start_time = ?, status = 'S' WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )", - ( str(datetime.datetime.now()), run_name, host, table, cmd ) ) + ( str(datetime.datetime.now()), run_name, host, table, cmd ) ) db.commit() + else : + if runner.run(x, run_name) : + # returns true/false whether it actually ran it - it may not because of resource limits + db.execute("UPDATE status SET start_time = ?, status = 'R' WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )", + ( str(datetime.datetime.now()), run_name, host, table, cmd ) ) + db.commit() + # if anything has exited, we process it and update the status in the database while 1 : who_exited = runner.poll() if not who_exited : break - # print "SOMETHING EXITED",who_exited + print "SOMETHING EXITED",who_exited # yes, something exited - no sleep, and keep running no_sleep = 1 keep_running = 1 @@ -357,8 +390,6 @@ def run_step( runner, xnodes, run_name, db ) : # note who and log it x_host, x_table, x_cmd = nodes.crack_name(who_exited[0]) - xnodes[who_exited[0]].wanted = 0 - db.execute("UPDATE status SET end_time = ?, status = ? WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )", ( str(datetime.datetime.now()), who_exited[1], run_name, x_host, x_table, x_cmd ) ) db.commit() |