diff options
author | sienkiew <sienkiew@d34015c8-bcbb-4646-8ac8-8ba5febf221d> | 2011-09-19 12:02:23 -0400 |
---|---|---|
committer | sienkiew <sienkiew@d34015c8-bcbb-4646-8ac8-8ba5febf221d> | 2011-09-19 12:02:23 -0400 |
commit | 245dcb86e2632a315aafa2a936dbc66c3de56be9 (patch) | |
tree | fb52049bda75fd03bb95b65b33612b1691bd58de | |
parent | acedd0fe3b909516fa83ae14b4c4f731641eb4b3 (diff) | |
download | steuermann-245dcb86e2632a315aafa2a936dbc66c3de56be9.tar.gz |
implement maxproc for each host
git-svn-id: https://svn.stsci.edu/svn/ssb/etal/steuermann/trunk@422 d34015c8-bcbb-4646-8ac8-8ba5febf221d
-rw-r--r-- | steuermann/hosts.ini | 19 | ||||
-rw-r--r-- | steuermann/run.py | 60 | ||||
-rw-r--r-- | steuermann/run_all.py | 26 |
3 files changed, 91 insertions, 14 deletions
diff --git a/steuermann/hosts.ini b/steuermann/hosts.ini index d51e9df..d0c5729 100644 --- a/steuermann/hosts.ini +++ b/steuermann/hosts.ini @@ -17,12 +17,16 @@ [all] local=[ 'sh', '-c', '%(script)s' ] +maxproc=2 [linux:csh] hostname=no_such_machine run=[ 'ssh', '-q', '%(hostname)s', 'source .steuermann.%(hostname)s; cd %(workdir)s; hostname; %(script)s ' ] like=all +[mac:csh] +like=linux:csh + [windows:cmd] hostname=no_such_machine run=[ 'python', '-m', 'steuermann.windows_comm', '%(hostname)s', '%(script)s' ] @@ -53,26 +57,37 @@ like=cadeau hostname=herbert like=linux:csh workdir=/herbert/data1/sienkiew/steuermann +maxproc=4 [thor] hostname=thor like=linux:csh workdir=/thor/data2/sienkiew/steuermann +maxproc=4 [arzach] hostname=arzach like=linux:csh workdir=/arzach/data1/sienkiew/steuermann +maxproc=4 [bond] hostname=bond -like=linux:csh +like=mac:csh workdir=/Users/sienkiew/work/steuermann +maxproc=8 [cadeau] hostname=cadeau -like=linux:csh +like=mac:csh +workdir=/Users/sienkiew/work/steuermann +maxproc=8 + +[banana] +hostname=banana +like=mac:csh workdir=/Users/sienkiew/work/steuermann +maxproc=4 ; There is a section [ALL] that is used with every machine name [ALL] diff --git a/steuermann/run.py b/steuermann/run.py index 1379120..cb2af58 100644 --- a/steuermann/run.py +++ b/steuermann/run.py @@ -34,6 +34,9 @@ class runner(object): # host_info_cache = None + # dict of how many commands we have running for that machine + howmany = None + ##### # @@ -43,16 +46,12 @@ class runner(object): self.load_host_info() self.logdir = logdir self.host_info_cache = { } + self.howmany = { } ##### # start a process def run( self, node, run_name ): - if debug : - print "run",node.name - node.running = 1 - if debug : - print "....%s:%s/%s\n"%(node.host, node.table, node.cmd) try : args = self.get_host_info(node.host) @@ -60,6 +59,26 @@ class runner(object): print "ERROR: do not know how to run on %s"%node.host raise + if 'maxproc' in args : + hostname = args['hostname'] + + n = int(self.howmany.get(hostname,0)) + if n >= int(args['maxproc']) : + print "decline to run %s - %d other already running"%(node.name,n) + return False + + n = n + 1 + self.howmany[hostname] = n + print "running %s %s %d"%(hostname,node.name, n) + else : + print "running %s %s no maxproc"%(hostname, node.name) + + if debug : + print "run",node.name + if debug : + print "....%s:%s/%s\n"%(node.host, node.table, node.cmd) + + node.running = 1 args = args.copy() args.update( @@ -124,10 +143,28 @@ class runner(object): # remember the process is running self.all_procs[node.name] = n + return True + ##### # callback when a node finishes def finish( self, node_name, status): + + node = self.node_index[node_name] + + args = self.get_host_info(node.host) + + if 'maxproc' in args : + hostname = args['hostname'] + + n = int(self.howmany.get(hostname,0)) + n = n - 1 + + self.howmany[hostname] = n + print "finish %s %s %d"%(hostname,node_name,n) + else : + print "finish %s %s no maxproc"%(hostname,node_name) + # note the termination of the process at the end of the log file logfile = self.all_procs[node_name].logfile logfile.seek(0,2) # end of file @@ -135,7 +172,6 @@ class runner(object): logfile.close() # note the completion of the command - node = self.node_index[node_name] if debug : print "finish",node.name node.running = 0 @@ -206,16 +242,26 @@ class runner(object): self.cfg.read(filename) def get_host_info(self, host) : + if debug: + print "enter get_host_info",host if not host in self.host_info_cache : d = self._host_get_names(self.cfg, host) + if debug: + print "in get_host_info, got names for ",host, d if 'like' in d : - d1 = self.get_host_info(d['like']) + if debug: + print "has like", d['like'] + d1 = self.get_host_info(d['like']).copy() del d['like'] d1.update(d) self.host_info_cache[host] = d1 else : + print "end of chain",host,d self.host_info_cache[host] = d + if debug: + print "leave get_host_info",host, self.host_info_cache[host] + return self.host_info_cache[host] ##### diff --git a/steuermann/run_all.py b/steuermann/run_all.py index 38f8241..1c6fe28 100644 --- a/steuermann/run_all.py +++ b/steuermann/run_all.py @@ -145,6 +145,22 @@ def run_interactive( xnodes, run_name, db) : if n == '?' : print helpstr + elif n == 'd' : + run.debug=0 + if len(l) > 1 : + for x in l[1:] : + print "XXXXXXXXXX" + print "SECTION",x + print runner.get_host_info(x) + print "" + else : + for x in runner.cfg.sections() : + print "XXXXXXXXXX" + print "SECTION",x + print runner.get_host_info(x) + print "" + run.debug=0 + elif n == 'report' : print report.report_text( db, run_name ) @@ -320,12 +336,12 @@ def run_step( runner, xnodes, run_name, db ) : # of predecessors, we can run this one if released == len(x.released) : host, table, cmd = nodes.crack_name(x_name) - # print "RUN NODE", x_name - db.execute("UPDATE status SET start_time = ?, status = 'S' WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )", - ( str(datetime.datetime.now()), run_name, host, table, cmd ) ) - db.commit() - runner.run(x, run_name) + if runner.run(x, run_name) : + # returns true/false whether it actually ran it - it may not because of resource limits + db.execute("UPDATE status SET start_time = ?, status = 'S' WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )", + ( str(datetime.datetime.now()), run_name, host, table, cmd ) ) + db.commit() # if anything has exited, we process it and update the status in the database while 1 : |