aboutsummaryrefslogtreecommitdiff
path: root/steuermann/run_all.py
diff options
context:
space:
mode:
authorcslocum <cslocum@d34015c8-bcbb-4646-8ac8-8ba5febf221d>2012-12-20 09:58:07 -0500
committercslocum <cslocum@d34015c8-bcbb-4646-8ac8-8ba5febf221d>2012-12-20 09:58:07 -0500
commit5745d68aa9323cbd2cb90f02a62911692bc63dc2 (patch)
treeb594a43d5cb75890364a73823659b4a789c0ab38 /steuermann/run_all.py
parent52373b421d420fbe066794235ebaf3ea4fca0046 (diff)
downloadsteuermann-5745d68aa9323cbd2cb90f02a62911692bc63dc2.tar.gz
implemented common and host-specific resource limits, tracking
git-svn-id: https://svn.stsci.edu/svn/ssb/etal/steuermann/trunk@907 d34015c8-bcbb-4646-8ac8-8ba5febf221d
Diffstat (limited to 'steuermann/run_all.py')
-rw-r--r--steuermann/run_all.py106
1 files changed, 80 insertions, 26 deletions
diff --git a/steuermann/run_all.py b/steuermann/run_all.py
index 1dcade4..832c884 100644
--- a/steuermann/run_all.py
+++ b/steuermann/run_all.py
@@ -25,6 +25,12 @@ except ImportError :
readline = None
+# global dicts for storing common resource info
+# populated in main() from hosts INI file
+common_resources = {}
+common_resources_avail = {}
+
+
username=getpass.getuser()
#####
@@ -83,6 +89,9 @@ def main() :
else :
hosts_ini = os.path.join(os.path.dirname(__file__), 'hosts.ini')
+ # parse common resources from hosts INI file
+ get_common_resources(hosts_ini)
+
db = steuermann.config.open_db()
if all :
@@ -92,6 +101,38 @@ def main() :
#
+def get_common_resources(hosts_ini):
+
+ if not os.path.exists(hosts_ini):
+ print 'ERROR - %s does not exist'
+ sys.exit(1)
+
+ file = open(hosts_ini)
+ lines = [ln.strip() for ln in file.readlines()]
+ file.close()
+
+ resource_lines = []
+ in_resources = False
+ for ln in lines:
+ if '[common_resources]' in ln:
+ in_resources = True
+ continue
+ if in_resources:
+ if ln.startswith('['):
+ in_resources = False
+ continue
+
+ if not ln.startswith(';') and len(ln) > 0:
+ resource_lines.append(ln)
+
+ for ln in resource_lines:
+ if '=' in ln:
+ key, val = ln.split('=')
+ common_resources[key] = int(val)
+ common_resources_avail[key] = int(val)
+
+#
+
def find_wild_names( xnodes, name ) :
print "find_wild",name
l = [ ]
@@ -293,7 +334,7 @@ def run_interactive( xnodes, run_name, hosts_ini, db) :
db.commit()
while 1 :
- ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
+ ( runner, keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
if not keep_running :
break
if not no_sleep :
@@ -307,11 +348,11 @@ def run_interactive( xnodes, run_name, hosts_ini, db) :
if keep_running :
print "run step"
- ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
+ ( runner, keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
if len(runner.all_procs) == 0 :
# give it a chance to start another
- ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
+ ( runner, keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
if not keep_running :
print 'all done'
@@ -392,7 +433,7 @@ def run_all(xnodes, run_name, hosts_ini, db) :
# will count how many times through there was nothing running
while 1 :
- ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
+ ( runner, keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
if not keep_running :
break
if not no_sleep :
@@ -408,7 +449,7 @@ def run_all(xnodes, run_name, hosts_ini, db) :
#
def run_step( runner, xnodes, run_name, db ) :
-
+
# flag to keep running
keep_running = 0
@@ -421,6 +462,17 @@ def run_step( runner, xnodes, run_name, db ) :
# skip nodes that we do not need to consider running because
+ # - not enough resources available
+ enough = True
+ for res, amount in x.resources.items():
+ if res in common_resources.keys():
+ avail = common_resources_avail[res]
+ if amount > avail:
+ enough = False
+ break
+ if not enough:
+ continue
+
# - it is not wanted
if not x.wanted :
continue
@@ -459,6 +511,11 @@ def run_step( runner, xnodes, run_name, db ) :
else :
try :
+ # allocate common resources
+ for res, amount in x.resources.items():
+ if res in common_resources_avail.keys():
+ common_resources_avail[res] -= amount
+
tmp = runner.run(x, run_name, no_run=no_run, logfile_name = make_log_file_name(run_name, host, table, cmd) )
# print "STARTED",x_name
except run.run_exception, e :
@@ -480,7 +537,7 @@ def run_step( runner, xnodes, run_name, db ) :
db.execute("UPDATE sm_status SET start_time = ?, status = 'S' WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )",
( str(datetime.datetime.now()), run_name, host, table, cmd ) )
elif tmp == 'M' :
- # hit max proc - not run, but try again later
+ # hit resource cap - not run, but try again later
pass
else :
print "WARNING: runner.run() returned unknown code %s"%str(tmp)
@@ -494,32 +551,30 @@ def run_step( runner, xnodes, run_name, db ) :
if not who_exited :
break
- print "SOMETHING EXITED",who_exited
- # yes, something exited - no sleep, and keep running
+ # something exited; no sleep, keep running
+ print "SOMETHING EXITED", who_exited
no_sleep = 1
keep_running = 1
- # note who and log it
+ # figure out which node exited
x_host, x_table, x_cmd = nodes.crack_name(who_exited[0])
+ x_name = '%s:%s/%s' %(x_host, x_table, x_cmd)
+ x = xnodes[x_name]
- logs_exist = 0
-
+
+ # de-allocate common resources
+ for res, amount in x.resources.items():
+ if res in common_resources_avail.keys():
+ common_resources_avail[res] += amount
+
+
+ # get auxiliary logs
args = runner.get_host_info(x_host)
workdir = args['workdir']
hostname = args['hostname']
-
src = os.path.join(workdir, run_name, who_exited[0])
dst = os.path.join(steuermann.config.host_logs, run_name, who_exited[0])
- '''
- print
- print "host " + hostname
- print "workdir " + workdir
- print src
- print dst
- print
- '''
-
try:
os.system('mkdir -p %s' %os.path.dirname(dst))
except:
@@ -529,22 +584,21 @@ def run_step( runner, xnodes, run_name, db ) :
except:
print 'scp failed'
-
+ logs_exist = 0
if not os.path.exists(dst):
print 'WARNING - %s does not exist' %dst
else:
- print os.listdir(dst)
if len(os.listdir(dst)) > 0:
logs_exist = 1
print 'FOUND SOME LOGS'
+
+ # update node record in database
db.execute("UPDATE sm_status SET end_time = ?, status = ?, logs = ? WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )",
( str(datetime.datetime.now()), who_exited[1], logs_exist, run_name, x_host, x_table, x_cmd ) )
db.commit()
- # runner.display_procs()
-
- return ( keep_running, no_sleep )
+ return ( runner, keep_running, no_sleep )
#####