aboutsummaryrefslogtreecommitdiff
path: root/steuermann/run_all.py
diff options
context:
space:
mode:
Diffstat (limited to 'steuermann/run_all.py')
-rw-r--r--steuermann/run_all.py75
1 files changed, 53 insertions, 22 deletions
diff --git a/steuermann/run_all.py b/steuermann/run_all.py
index 1c6fe28..e7bff61 100644
--- a/steuermann/run_all.py
+++ b/steuermann/run_all.py
@@ -14,6 +14,8 @@ import nodes
import steuermann.config
+import pandokia.helpers.easyargs as easyargs
+
try :
import readline
@@ -35,18 +37,20 @@ def main() :
import atexit
atexit.register(readline.write_history_file, history)
+ opt, args = easyargs.get( { '-a' : '--all',
+ '--all' : '-a',
+ } )
+
+ #
#
- all = sys.argv[1] == '-a'
- if all :
- di_nodes = nodes.read_file_list( sys.argv[2:] )
- else :
- di_nodes = nodes.read_file_list( sys.argv[1:] )
+ all = '--all' in opt
+
+ di_nodes = nodes.read_file_list( args )
xnodes = di_nodes.node_index
run_name = str(datetime.datetime.now()).replace(' ','_')
db = steuermann.config.open_db()
- register_database(db, run_name, xnodes)
if all :
run_all(xnodes, run_name, db)
@@ -83,11 +87,16 @@ def do_flag( xnodes, name, recursive, fn, verbose ) :
raise Exception()
def set_want( node ) :
+ # if we said we want it, mark it as wanted and don't skip
node.wanted = 1
node.skip = 0
def set_skip( node ) :
- node.wanted = 0
+ # If we want to skip it, mark it as IS wanted and skip.
+ # Wanted makes us try to run it, then skip makes the run a nop.
+ # This means that stuff that comes after it can run, but only
+ # at the right point in the sequence.
+ node.wanted = 1
node.skip = 1
@@ -119,6 +128,8 @@ pre node show what must come before a node
def run_interactive( xnodes, run_name, db) :
+ register_database(db, run_name, xnodes)
+
runner = run.runner( xnodes, steuermann.config.logdir )
for x in xnodes :
@@ -179,6 +190,15 @@ def run_interactive( xnodes, run_name, db) :
elif n == 'skip' :
cmd_flagging( l, xnodes, set_skip )
+ elif n == 'reset' :
+ print "marking all as not finished"
+ for x in xnodes :
+ xnodes[x].finished = 0
+
+ run_name = str(datetime.datetime.now()).replace(' ','_')
+ print "new run name",run_name
+ register_database(db, run_name, xnodes)
+
elif n == 'list' :
print_all = '-a' in l
l = sorted ( [ x for x in xnodes ] )
@@ -188,10 +208,17 @@ def run_interactive( xnodes, run_name, db) :
if print_all :
print " AFTER", ' '.join([ a.name for a in xnodes[x].predecessors ])
- elif n == 'start' :
- keep_running = 1
-
elif n == 'wait' :
+ c = db.cursor()
+ for x in xnodes :
+ host, tablename, cmd = nodes.crack_name(x)
+ if xnodes[x].wanted :
+ status = 'W'
+ c.execute("UPDATE status SET status = 'W' WHERE run = ? AND host = ? AND tablename = ? AND cmd = ? AND status = 'N'",
+ (run_name, host, tablename, cmd) )
+
+ db.commit()
+
while 1 :
( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
if not keep_running :
@@ -274,6 +301,8 @@ def register_database(db, run, xnodes ) :
def run_all(xnodes, run_name, db) :
+ register_database(db, run_name, xnodes)
+
runner = run.runner( xnodes )
for x in xnodes :
@@ -304,12 +333,6 @@ def run_step( runner, xnodes, run_name, db ) :
# skip nodes that we do not need to consider running because
- # - we explicitly ask to skip it; also mark it finished
- # so that things that come after can run
- if x.skip :
- x.finished = 1
- continue
-
# - it is not wanted
if not x.wanted :
continue
@@ -337,19 +360,29 @@ def run_step( runner, xnodes, run_name, db ) :
if released == len(x.released) :
host, table, cmd = nodes.crack_name(x_name)
- if runner.run(x, run_name) :
- # returns true/false whether it actually ran it - it may not because of resource limits
+ # we are now ready to let it run. If it is marked skipped, just say it ran really fast.
+ if x.skip :
+ x.finished = 1
+ no_sleep = 1
+ keep_running = 1
db.execute("UPDATE status SET start_time = ?, status = 'S' WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )",
- ( str(datetime.datetime.now()), run_name, host, table, cmd ) )
+ ( str(datetime.datetime.now()), run_name, host, table, cmd ) )
db.commit()
+ else :
+ if runner.run(x, run_name) :
+ # returns true/false whether it actually ran it - it may not because of resource limits
+ db.execute("UPDATE status SET start_time = ?, status = 'R' WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )",
+ ( str(datetime.datetime.now()), run_name, host, table, cmd ) )
+ db.commit()
+
# if anything has exited, we process it and update the status in the database
while 1 :
who_exited = runner.poll()
if not who_exited :
break
- # print "SOMETHING EXITED",who_exited
+ print "SOMETHING EXITED",who_exited
# yes, something exited - no sleep, and keep running
no_sleep = 1
keep_running = 1
@@ -357,8 +390,6 @@ def run_step( runner, xnodes, run_name, db ) :
# note who and log it
x_host, x_table, x_cmd = nodes.crack_name(who_exited[0])
- xnodes[who_exited[0]].wanted = 0
-
db.execute("UPDATE status SET end_time = ?, status = ? WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )",
( str(datetime.datetime.now()), who_exited[1], run_name, x_host, x_table, x_cmd ) )
db.commit()