diff options
author | cslocum <cslocum@d34015c8-bcbb-4646-8ac8-8ba5febf221d> | 2012-12-20 09:58:07 -0500 |
---|---|---|
committer | cslocum <cslocum@d34015c8-bcbb-4646-8ac8-8ba5febf221d> | 2012-12-20 09:58:07 -0500 |
commit | 5745d68aa9323cbd2cb90f02a62911692bc63dc2 (patch) | |
tree | b594a43d5cb75890364a73823659b4a789c0ab38 /steuermann/run_all.py | |
parent | 52373b421d420fbe066794235ebaf3ea4fca0046 (diff) | |
download | steuermann-5745d68aa9323cbd2cb90f02a62911692bc63dc2.tar.gz |
implemented common and host-specific resource limits, tracking
git-svn-id: https://svn.stsci.edu/svn/ssb/etal/steuermann/trunk@907 d34015c8-bcbb-4646-8ac8-8ba5febf221d
Diffstat (limited to 'steuermann/run_all.py')
-rw-r--r-- | steuermann/run_all.py | 106 |
1 files changed, 80 insertions, 26 deletions
diff --git a/steuermann/run_all.py b/steuermann/run_all.py index 1dcade4..832c884 100644 --- a/steuermann/run_all.py +++ b/steuermann/run_all.py @@ -25,6 +25,12 @@ except ImportError : readline = None +# global dicts for storing common resource info +# populated in main() from hosts INI file +common_resources = {} +common_resources_avail = {} + + username=getpass.getuser() ##### @@ -83,6 +89,9 @@ def main() : else : hosts_ini = os.path.join(os.path.dirname(__file__), 'hosts.ini') + # parse common resources from hosts INI file + get_common_resources(hosts_ini) + db = steuermann.config.open_db() if all : @@ -92,6 +101,38 @@ def main() : # +def get_common_resources(hosts_ini): + + if not os.path.exists(hosts_ini): + print 'ERROR - %s does not exist' + sys.exit(1) + + file = open(hosts_ini) + lines = [ln.strip() for ln in file.readlines()] + file.close() + + resource_lines = [] + in_resources = False + for ln in lines: + if '[common_resources]' in ln: + in_resources = True + continue + if in_resources: + if ln.startswith('['): + in_resources = False + continue + + if not ln.startswith(';') and len(ln) > 0: + resource_lines.append(ln) + + for ln in resource_lines: + if '=' in ln: + key, val = ln.split('=') + common_resources[key] = int(val) + common_resources_avail[key] = int(val) + +# + def find_wild_names( xnodes, name ) : print "find_wild",name l = [ ] @@ -293,7 +334,7 @@ def run_interactive( xnodes, run_name, hosts_ini, db) : db.commit() while 1 : - ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) + ( runner, keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) if not keep_running : break if not no_sleep : @@ -307,11 +348,11 @@ def run_interactive( xnodes, run_name, hosts_ini, db) : if keep_running : print "run step" - ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) + ( runner, keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) if len(runner.all_procs) == 0 : # give it a chance to start another - ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) + ( runner, keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) if not keep_running : print 'all done' @@ -392,7 +433,7 @@ def run_all(xnodes, run_name, hosts_ini, db) : # will count how many times through there was nothing running while 1 : - ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) + ( runner, keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) if not keep_running : break if not no_sleep : @@ -408,7 +449,7 @@ def run_all(xnodes, run_name, hosts_ini, db) : # def run_step( runner, xnodes, run_name, db ) : - + # flag to keep running keep_running = 0 @@ -421,6 +462,17 @@ def run_step( runner, xnodes, run_name, db ) : # skip nodes that we do not need to consider running because + # - not enough resources available + enough = True + for res, amount in x.resources.items(): + if res in common_resources.keys(): + avail = common_resources_avail[res] + if amount > avail: + enough = False + break + if not enough: + continue + # - it is not wanted if not x.wanted : continue @@ -459,6 +511,11 @@ def run_step( runner, xnodes, run_name, db ) : else : try : + # allocate common resources + for res, amount in x.resources.items(): + if res in common_resources_avail.keys(): + common_resources_avail[res] -= amount + tmp = runner.run(x, run_name, no_run=no_run, logfile_name = make_log_file_name(run_name, host, table, cmd) ) # print "STARTED",x_name except run.run_exception, e : @@ -480,7 +537,7 @@ def run_step( runner, xnodes, run_name, db ) : db.execute("UPDATE sm_status SET start_time = ?, status = 'S' WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )", ( str(datetime.datetime.now()), run_name, host, table, cmd ) ) elif tmp == 'M' : - # hit max proc - not run, but try again later + # hit resource cap - not run, but try again later pass else : print "WARNING: runner.run() returned unknown code %s"%str(tmp) @@ -494,32 +551,30 @@ def run_step( runner, xnodes, run_name, db ) : if not who_exited : break - print "SOMETHING EXITED",who_exited - # yes, something exited - no sleep, and keep running + # something exited; no sleep, keep running + print "SOMETHING EXITED", who_exited no_sleep = 1 keep_running = 1 - # note who and log it + # figure out which node exited x_host, x_table, x_cmd = nodes.crack_name(who_exited[0]) + x_name = '%s:%s/%s' %(x_host, x_table, x_cmd) + x = xnodes[x_name] - logs_exist = 0 - + + # de-allocate common resources + for res, amount in x.resources.items(): + if res in common_resources_avail.keys(): + common_resources_avail[res] += amount + + + # get auxiliary logs args = runner.get_host_info(x_host) workdir = args['workdir'] hostname = args['hostname'] - src = os.path.join(workdir, run_name, who_exited[0]) dst = os.path.join(steuermann.config.host_logs, run_name, who_exited[0]) - ''' - print - print "host " + hostname - print "workdir " + workdir - print src - print dst - print - ''' - try: os.system('mkdir -p %s' %os.path.dirname(dst)) except: @@ -529,22 +584,21 @@ def run_step( runner, xnodes, run_name, db ) : except: print 'scp failed' - + logs_exist = 0 if not os.path.exists(dst): print 'WARNING - %s does not exist' %dst else: - print os.listdir(dst) if len(os.listdir(dst)) > 0: logs_exist = 1 print 'FOUND SOME LOGS' + + # update node record in database db.execute("UPDATE sm_status SET end_time = ?, status = ?, logs = ? WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )", ( str(datetime.datetime.now()), who_exited[1], logs_exist, run_name, x_host, x_table, x_cmd ) ) db.commit() - # runner.display_procs() - - return ( keep_running, no_sleep ) + return ( runner, keep_running, no_sleep ) ##### |