aboutsummaryrefslogtreecommitdiff
path: root/steuermann/run.py
diff options
context:
space:
mode:
authorsienkiew <sienkiew@d34015c8-bcbb-4646-8ac8-8ba5febf221d>2011-08-30 16:09:41 -0400
committersienkiew <sienkiew@d34015c8-bcbb-4646-8ac8-8ba5febf221d>2011-08-30 16:09:41 -0400
commitea0bbd187c539b30c6b70a7a220ca1249f3cca41 (patch)
treebfc64ff8328e952e4348440fbe77a85aee88f48e /steuermann/run.py
parent0206b3bdab5ca17b8e22806c34330dd58d55e429 (diff)
downloadsteuermann-ea0bbd187c539b30c6b70a7a220ca1249f3cca41.tar.gz
hacked together prototype of steuermann 21.9
git-svn-id: https://svn.stsci.edu/svn/ssb/etal/steuermann/trunk@381 d34015c8-bcbb-4646-8ac8-8ba5febf221d
Diffstat (limited to 'steuermann/run.py')
-rw-r--r--steuermann/run.py206
1 files changed, 206 insertions, 0 deletions
diff --git a/steuermann/run.py b/steuermann/run.py
new file mode 100644
index 0000000..7657248
--- /dev/null
+++ b/steuermann/run.py
@@ -0,0 +1,206 @@
+'''
+run processes asynchronously on various machines, with a callback
+on process exit.
+'''
+
+import subprocess
+import time
+import datetime
+import os
+
+import ConfigParser
+
+debug=0
+
+#####
+
+class struct :
+ pass
+
+
+#####
+
+class runner(object):
+
+ # dict of all current running processes, indexed by node name
+ all_procs = { }
+
+ # index of nodes
+ node_index = { }
+
+ # info about hosts we can run on
+ host_info = { }
+
+ # dir where we write our logs
+ logdir = 'logs'
+
+ #####
+ #
+
+ def __init__( self, nodes ) :
+ self.node_index = nodes
+ self.load_host_info()
+
+ #####
+ # start a process
+
+ def run( self, node, run_name ):
+ if debug :
+ print "run",node.name
+ node.running = 1
+ if debug :
+ print "....%s:%s/%s\n"%(node.host, node.table, node.cmd)
+
+ try :
+ args = self.host_info[node.host]
+ except :
+ print "ERROR: do not know how to run on %s"%node.host
+ raise
+
+ run = args['run']
+
+ args = args.copy()
+ args.update(
+ script=node.script,
+ host=node.host,
+ table=node.table,
+ cmd=node.cmd
+ )
+
+ if debug :
+ print "ARGS"
+ for x in sorted([x for x in args]) :
+ print '%s=%s'%(x,args[x])
+
+ run = [ x % args for x in run ]
+
+ if debug :
+ print "RUN",run
+
+ # make sure the log directory is there
+ logdir= self.logdir + "/%s"%run_name
+ try :
+ os.makedirs(logdir)
+ except OSError:
+ pass
+
+ # create a name for the log file, but do not use / in the name
+ logfile_name = "%s/%s.log"%( logdir, node.name.replace('/','+') )
+
+ # open the log file, write initial notes
+ logfile=open(logfile_name,"w")
+ logfile.write('%s %s\n'%(datetime.datetime.now(),run))
+ logfile.flush()
+
+ # start running the process
+ p = subprocess.Popen(args=run,
+ stdout=logfile,
+ stderr=subprocess.STDOUT,
+ shell=False, close_fds=True)
+
+ # remember the popen object for the process; remember the open log file
+ n = struct()
+ n.proc = p
+ n.logfile = logfile
+ n.logfile_name = logfile_name
+
+ # remember the process is running
+ self.all_procs[node.name] = n
+
+ #####
+ # callback when a node finishes
+
+ def finish( self, node_name, status):
+ # note the termination of the process at the end of the log file
+ logfile = self.all_procs[node_name].logfile
+ logfile.seek(0,2) # end of file
+ logfile.write('\n%s exit=%s\n'%(datetime.datetime.now(),status))
+ logfile.close()
+
+ # note the completion of the command
+ node = self.node_index[node_name]
+ if debug :
+ print "finish",node.name
+ node.running = 0
+ node.finished = 1
+ node.exit_status = status
+
+ #####
+
+ # poll for exited child processes - this whole thing could could
+ # be event driven, but I don't care to work out the details right
+ # now.
+
+ def poll( self ) :
+
+ # look at all active processes
+ for name in self.all_procs :
+
+ # see if name has finished
+ p = self.all_procs[name].proc
+ n = p.poll()
+ if n is not None :
+
+ # marke the node finished
+ self.finish(name,n)
+
+ #
+ status = p.returncode
+
+ # remove it from the list of pending processes
+ del self.all_procs[name]
+
+ # Return the identity of the exited process.
+ # There may be more, but we will come back and poll again.
+ return ( name, status )
+
+ return None
+
+ #####
+
+ def display_procs( self ) :
+ # display currently active child processes
+ print "procs:",
+ for x in sorted(self.all_procs) :
+ print " ",x
+ print ""
+
+ #####
+
+
+ def _host_get_names( self, cfg, section, d ) :
+ # pick all the variables out of this section
+ for name, value in cfg.items(section) :
+ if name == 'run' :
+ # run is a list
+ d[name] = eval(value)
+ else :
+ # everything else is plain text
+ d[name] = value
+
+ def load_host_info( self, filename=None ) :
+ self.host_info = { }
+
+ # read the config file
+ if filename is None :
+ filename = os.path.dirname(__file__) + '/hosts.ini'
+ cfg = ConfigParser.RawConfigParser()
+ cfg.read(filename)
+
+ # this dict holds the set of values that are defined as
+ # applying to all hosts
+ all_dict = { }
+ self._host_get_names(cfg, 'ALL', all_dict)
+
+ # for all the sections (except ALL) get the names from that section
+ for x in cfg.sections() :
+ if x == 'ALL' :
+ continue
+
+ d = all_dict.copy()
+ self._host_get_names(cfg, x, d)
+ self.host_info[x] = d
+
+ del cfg
+
+ #####