aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsienkiew <sienkiew@d34015c8-bcbb-4646-8ac8-8ba5febf221d>2011-09-19 12:02:23 -0400
committersienkiew <sienkiew@d34015c8-bcbb-4646-8ac8-8ba5febf221d>2011-09-19 12:02:23 -0400
commit245dcb86e2632a315aafa2a936dbc66c3de56be9 (patch)
treefb52049bda75fd03bb95b65b33612b1691bd58de
parentacedd0fe3b909516fa83ae14b4c4f731641eb4b3 (diff)
downloadsteuermann-245dcb86e2632a315aafa2a936dbc66c3de56be9.tar.gz
implement maxproc for each host
git-svn-id: https://svn.stsci.edu/svn/ssb/etal/steuermann/trunk@422 d34015c8-bcbb-4646-8ac8-8ba5febf221d
-rw-r--r--steuermann/hosts.ini19
-rw-r--r--steuermann/run.py60
-rw-r--r--steuermann/run_all.py26
3 files changed, 91 insertions, 14 deletions
diff --git a/steuermann/hosts.ini b/steuermann/hosts.ini
index d51e9df..d0c5729 100644
--- a/steuermann/hosts.ini
+++ b/steuermann/hosts.ini
@@ -17,12 +17,16 @@
[all]
local=[ 'sh', '-c', '%(script)s' ]
+maxproc=2
[linux:csh]
hostname=no_such_machine
run=[ 'ssh', '-q', '%(hostname)s', 'source .steuermann.%(hostname)s; cd %(workdir)s; hostname; %(script)s ' ]
like=all
+[mac:csh]
+like=linux:csh
+
[windows:cmd]
hostname=no_such_machine
run=[ 'python', '-m', 'steuermann.windows_comm', '%(hostname)s', '%(script)s' ]
@@ -53,26 +57,37 @@ like=cadeau
hostname=herbert
like=linux:csh
workdir=/herbert/data1/sienkiew/steuermann
+maxproc=4
[thor]
hostname=thor
like=linux:csh
workdir=/thor/data2/sienkiew/steuermann
+maxproc=4
[arzach]
hostname=arzach
like=linux:csh
workdir=/arzach/data1/sienkiew/steuermann
+maxproc=4
[bond]
hostname=bond
-like=linux:csh
+like=mac:csh
workdir=/Users/sienkiew/work/steuermann
+maxproc=8
[cadeau]
hostname=cadeau
-like=linux:csh
+like=mac:csh
+workdir=/Users/sienkiew/work/steuermann
+maxproc=8
+
+[banana]
+hostname=banana
+like=mac:csh
workdir=/Users/sienkiew/work/steuermann
+maxproc=4
; There is a section [ALL] that is used with every machine name
[ALL]
diff --git a/steuermann/run.py b/steuermann/run.py
index 1379120..cb2af58 100644
--- a/steuermann/run.py
+++ b/steuermann/run.py
@@ -34,6 +34,9 @@ class runner(object):
#
host_info_cache = None
+ # dict of how many commands we have running for that machine
+ howmany = None
+
#####
#
@@ -43,16 +46,12 @@ class runner(object):
self.load_host_info()
self.logdir = logdir
self.host_info_cache = { }
+ self.howmany = { }
#####
# start a process
def run( self, node, run_name ):
- if debug :
- print "run",node.name
- node.running = 1
- if debug :
- print "....%s:%s/%s\n"%(node.host, node.table, node.cmd)
try :
args = self.get_host_info(node.host)
@@ -60,6 +59,26 @@ class runner(object):
print "ERROR: do not know how to run on %s"%node.host
raise
+ if 'maxproc' in args :
+ hostname = args['hostname']
+
+ n = int(self.howmany.get(hostname,0))
+ if n >= int(args['maxproc']) :
+ print "decline to run %s - %d other already running"%(node.name,n)
+ return False
+
+ n = n + 1
+ self.howmany[hostname] = n
+ print "running %s %s %d"%(hostname,node.name, n)
+ else :
+ print "running %s %s no maxproc"%(hostname, node.name)
+
+ if debug :
+ print "run",node.name
+ if debug :
+ print "....%s:%s/%s\n"%(node.host, node.table, node.cmd)
+
+ node.running = 1
args = args.copy()
args.update(
@@ -124,10 +143,28 @@ class runner(object):
# remember the process is running
self.all_procs[node.name] = n
+ return True
+
#####
# callback when a node finishes
def finish( self, node_name, status):
+
+ node = self.node_index[node_name]
+
+ args = self.get_host_info(node.host)
+
+ if 'maxproc' in args :
+ hostname = args['hostname']
+
+ n = int(self.howmany.get(hostname,0))
+ n = n - 1
+
+ self.howmany[hostname] = n
+ print "finish %s %s %d"%(hostname,node_name,n)
+ else :
+ print "finish %s %s no maxproc"%(hostname,node_name)
+
# note the termination of the process at the end of the log file
logfile = self.all_procs[node_name].logfile
logfile.seek(0,2) # end of file
@@ -135,7 +172,6 @@ class runner(object):
logfile.close()
# note the completion of the command
- node = self.node_index[node_name]
if debug :
print "finish",node.name
node.running = 0
@@ -206,16 +242,26 @@ class runner(object):
self.cfg.read(filename)
def get_host_info(self, host) :
+ if debug:
+ print "enter get_host_info",host
if not host in self.host_info_cache :
d = self._host_get_names(self.cfg, host)
+ if debug:
+ print "in get_host_info, got names for ",host, d
if 'like' in d :
- d1 = self.get_host_info(d['like'])
+ if debug:
+ print "has like", d['like']
+ d1 = self.get_host_info(d['like']).copy()
del d['like']
d1.update(d)
self.host_info_cache[host] = d1
else :
+ print "end of chain",host,d
self.host_info_cache[host] = d
+ if debug:
+ print "leave get_host_info",host, self.host_info_cache[host]
+
return self.host_info_cache[host]
#####
diff --git a/steuermann/run_all.py b/steuermann/run_all.py
index 38f8241..1c6fe28 100644
--- a/steuermann/run_all.py
+++ b/steuermann/run_all.py
@@ -145,6 +145,22 @@ def run_interactive( xnodes, run_name, db) :
if n == '?' :
print helpstr
+ elif n == 'd' :
+ run.debug=0
+ if len(l) > 1 :
+ for x in l[1:] :
+ print "XXXXXXXXXX"
+ print "SECTION",x
+ print runner.get_host_info(x)
+ print ""
+ else :
+ for x in runner.cfg.sections() :
+ print "XXXXXXXXXX"
+ print "SECTION",x
+ print runner.get_host_info(x)
+ print ""
+ run.debug=0
+
elif n == 'report' :
print report.report_text( db, run_name )
@@ -320,12 +336,12 @@ def run_step( runner, xnodes, run_name, db ) :
# of predecessors, we can run this one
if released == len(x.released) :
host, table, cmd = nodes.crack_name(x_name)
- # print "RUN NODE", x_name
- db.execute("UPDATE status SET start_time = ?, status = 'S' WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )",
- ( str(datetime.datetime.now()), run_name, host, table, cmd ) )
- db.commit()
- runner.run(x, run_name)
+ if runner.run(x, run_name) :
+ # returns true/false whether it actually ran it - it may not because of resource limits
+ db.execute("UPDATE status SET start_time = ?, status = 'S' WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )",
+ ( str(datetime.datetime.now()), run_name, host, table, cmd ) )
+ db.commit()
# if anything has exited, we process it and update the status in the database
while 1 :