aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcslocum <cslocum@d34015c8-bcbb-4646-8ac8-8ba5febf221d>2012-12-20 09:58:07 -0500
committercslocum <cslocum@d34015c8-bcbb-4646-8ac8-8ba5febf221d>2012-12-20 09:58:07 -0500
commit5745d68aa9323cbd2cb90f02a62911692bc63dc2 (patch)
treeb594a43d5cb75890364a73823659b4a789c0ab38
parent52373b421d420fbe066794235ebaf3ea4fca0046 (diff)
downloadsteuermann-5745d68aa9323cbd2cb90f02a62911692bc63dc2.tar.gz
implemented common and host-specific resource limits, tracking
git-svn-id: https://svn.stsci.edu/svn/ssb/etal/steuermann/trunk@907 d34015c8-bcbb-4646-8ac8-8ba5febf221d
-rw-r--r--steuermann/run.py76
-rw-r--r--steuermann/run_all.py106
2 files changed, 127 insertions, 55 deletions
diff --git a/steuermann/run.py b/steuermann/run.py
index 3b098da..72bba6c 100644
--- a/steuermann/run.py
+++ b/steuermann/run.py
@@ -73,18 +73,22 @@ class runner(object):
self.node_index = nodes
self.load_host_info(filename = hosts_ini)
self.host_info_cache = { }
- self.howmany = { }
+ self.resources = {}
+ self.resources_avail = {}
+
#####
# start a process
def run( self, node, run_name, logfile_name, no_run = False ):
- '''run a process
- return is:
- D - host disabled
- M - not run max proc limit
- R - running
-'''
+ '''
+ run a process
+
+ return is:
+ D - host disabled
+ M - not run; resource limit
+ R - running
+ '''
try :
@@ -99,20 +103,34 @@ class runner(object):
if ( config_yes_no(args,'disable') ) :
return 'D'
- hostname = args['hostname']
- if 'maxproc' in args :
+ # track resources
+ if node.host not in self.resources.keys():
+ self.resources[node.host] = {}
+ self.resources_avail[node.host] = {}
+
+ for key, val in args.items():
+ if key.startswith('res_'):
+ key = key.lstrip('res_')
+ self.resources[node.host][key] = int(val)
+ if key not in self.resources_avail[node.host].keys():
+ self.resources_avail[node.host][key] = int(val)
+
+ # check if enough resources on host and return if not
+ enough = True
+ for res, amount in node.resources.items():
+ if res in self.resources[node.host].keys():
+ avail = self.resources_avail[node.host][res]
+ if amount > avail:
+ enough = False
+ break
+ if not enough:
+ return 'M'
+
+ # allocate host resources
+ for res, amount in node.resources.items():
+ if res in self.resources[node.host].keys():
+ self.resources_avail[node.host][res] -= amount
- n = int(self.howmany.get(hostname,0))
- if n >= int(args['maxproc']) :
- # print "decline to run %s - %d other already running"%(node.name,n)
- return 'M'
-
- n = n + 1
- self.howmany[hostname] = n
- # print "running %s %s %d"%(hostname,node.name, n)
- else :
- # print "running %s %s no maxproc"%(hostname, node.name)
- pass
if debug :
print "run",node.name
@@ -178,10 +196,12 @@ class runner(object):
# start running the process
if debug :
print "RUN",run
- p = subprocess.Popen(args=run,
+ p = subprocess.Popen(
+ args=run,
stdout=logfile,
stderr=subprocess.STDOUT,
- shell=False, close_fds=True)
+ shell=False, close_fds=True
+ )
# remember the popen object for the process; remember the open log file
n = struct()
@@ -208,12 +228,14 @@ class runner(object):
args = self.get_host_info(node.host)
- hostname = args['hostname']
- n = self.howmany[hostname] - 1
- self.howmany[hostname] = n
+ # de-allocate host resources
+ for res, amount in node.resources.items():
+ if res in self.resources[node.host].keys():
+ self.resources_avail[node.host][res] += amount
if debug :
+ hostname = args['hostname']
print "finish %s %s %d"%(hostname,node_name,n)
# note the termination of the process at the end of the log file
@@ -324,10 +346,6 @@ class runner(object):
if not 'hostname' in d :
d['hostname'] = host
- # default maximum processes is 1
- if not 'maxproc' in d :
- d['maxproc'] = 1
-
self.host_info_cache[host] = d
return self.host_info_cache[host]
diff --git a/steuermann/run_all.py b/steuermann/run_all.py
index 1dcade4..832c884 100644
--- a/steuermann/run_all.py
+++ b/steuermann/run_all.py
@@ -25,6 +25,12 @@ except ImportError :
readline = None
+# global dicts for storing common resource info
+# populated in main() from hosts INI file
+common_resources = {}
+common_resources_avail = {}
+
+
username=getpass.getuser()
#####
@@ -83,6 +89,9 @@ def main() :
else :
hosts_ini = os.path.join(os.path.dirname(__file__), 'hosts.ini')
+ # parse common resources from hosts INI file
+ get_common_resources(hosts_ini)
+
db = steuermann.config.open_db()
if all :
@@ -92,6 +101,38 @@ def main() :
#
+def get_common_resources(hosts_ini):
+
+ if not os.path.exists(hosts_ini):
+ print 'ERROR - %s does not exist'
+ sys.exit(1)
+
+ file = open(hosts_ini)
+ lines = [ln.strip() for ln in file.readlines()]
+ file.close()
+
+ resource_lines = []
+ in_resources = False
+ for ln in lines:
+ if '[common_resources]' in ln:
+ in_resources = True
+ continue
+ if in_resources:
+ if ln.startswith('['):
+ in_resources = False
+ continue
+
+ if not ln.startswith(';') and len(ln) > 0:
+ resource_lines.append(ln)
+
+ for ln in resource_lines:
+ if '=' in ln:
+ key, val = ln.split('=')
+ common_resources[key] = int(val)
+ common_resources_avail[key] = int(val)
+
+#
+
def find_wild_names( xnodes, name ) :
print "find_wild",name
l = [ ]
@@ -293,7 +334,7 @@ def run_interactive( xnodes, run_name, hosts_ini, db) :
db.commit()
while 1 :
- ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
+ ( runner, keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
if not keep_running :
break
if not no_sleep :
@@ -307,11 +348,11 @@ def run_interactive( xnodes, run_name, hosts_ini, db) :
if keep_running :
print "run step"
- ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
+ ( runner, keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
if len(runner.all_procs) == 0 :
# give it a chance to start another
- ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
+ ( runner, keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
if not keep_running :
print 'all done'
@@ -392,7 +433,7 @@ def run_all(xnodes, run_name, hosts_ini, db) :
# will count how many times through there was nothing running
while 1 :
- ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
+ ( runner, keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
if not keep_running :
break
if not no_sleep :
@@ -408,7 +449,7 @@ def run_all(xnodes, run_name, hosts_ini, db) :
#
def run_step( runner, xnodes, run_name, db ) :
-
+
# flag to keep running
keep_running = 0
@@ -421,6 +462,17 @@ def run_step( runner, xnodes, run_name, db ) :
# skip nodes that we do not need to consider running because
+ # - not enough resources available
+ enough = True
+ for res, amount in x.resources.items():
+ if res in common_resources.keys():
+ avail = common_resources_avail[res]
+ if amount > avail:
+ enough = False
+ break
+ if not enough:
+ continue
+
# - it is not wanted
if not x.wanted :
continue
@@ -459,6 +511,11 @@ def run_step( runner, xnodes, run_name, db ) :
else :
try :
+ # allocate common resources
+ for res, amount in x.resources.items():
+ if res in common_resources_avail.keys():
+ common_resources_avail[res] -= amount
+
tmp = runner.run(x, run_name, no_run=no_run, logfile_name = make_log_file_name(run_name, host, table, cmd) )
# print "STARTED",x_name
except run.run_exception, e :
@@ -480,7 +537,7 @@ def run_step( runner, xnodes, run_name, db ) :
db.execute("UPDATE sm_status SET start_time = ?, status = 'S' WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )",
( str(datetime.datetime.now()), run_name, host, table, cmd ) )
elif tmp == 'M' :
- # hit max proc - not run, but try again later
+ # hit resource cap - not run, but try again later
pass
else :
print "WARNING: runner.run() returned unknown code %s"%str(tmp)
@@ -494,32 +551,30 @@ def run_step( runner, xnodes, run_name, db ) :
if not who_exited :
break
- print "SOMETHING EXITED",who_exited
- # yes, something exited - no sleep, and keep running
+ # something exited; no sleep, keep running
+ print "SOMETHING EXITED", who_exited
no_sleep = 1
keep_running = 1
- # note who and log it
+ # figure out which node exited
x_host, x_table, x_cmd = nodes.crack_name(who_exited[0])
+ x_name = '%s:%s/%s' %(x_host, x_table, x_cmd)
+ x = xnodes[x_name]
- logs_exist = 0
-
+
+ # de-allocate common resources
+ for res, amount in x.resources.items():
+ if res in common_resources_avail.keys():
+ common_resources_avail[res] += amount
+
+
+ # get auxiliary logs
args = runner.get_host_info(x_host)
workdir = args['workdir']
hostname = args['hostname']
-
src = os.path.join(workdir, run_name, who_exited[0])
dst = os.path.join(steuermann.config.host_logs, run_name, who_exited[0])
- '''
- print
- print "host " + hostname
- print "workdir " + workdir
- print src
- print dst
- print
- '''
-
try:
os.system('mkdir -p %s' %os.path.dirname(dst))
except:
@@ -529,22 +584,21 @@ def run_step( runner, xnodes, run_name, db ) :
except:
print 'scp failed'
-
+ logs_exist = 0
if not os.path.exists(dst):
print 'WARNING - %s does not exist' %dst
else:
- print os.listdir(dst)
if len(os.listdir(dst)) > 0:
logs_exist = 1
print 'FOUND SOME LOGS'
+
+ # update node record in database
db.execute("UPDATE sm_status SET end_time = ?, status = ?, logs = ? WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )",
( str(datetime.datetime.now()), who_exited[1], logs_exist, run_name, x_host, x_table, x_cmd ) )
db.commit()
- # runner.display_procs()
-
- return ( keep_running, no_sleep )
+ return ( runner, keep_running, no_sleep )
#####