aboutsummaryrefslogtreecommitdiff
path: root/steuermann/run.py
diff options
context:
space:
mode:
Diffstat (limited to 'steuermann/run.py')
-rw-r--r--steuermann/run.py284
1 files changed, 165 insertions, 119 deletions
diff --git a/steuermann/run.py b/steuermann/run.py
index 36cda80..af52a02 100644
--- a/steuermann/run.py
+++ b/steuermann/run.py
@@ -7,6 +7,8 @@ import subprocess
import time
import datetime
import os
+import traceback
+import sys
import ConfigParser
@@ -20,6 +22,9 @@ class struct :
#####
+class run_exception(Exception) :
+ pass
+
class runner(object):
# dict of all current running processes, indexed by node name
@@ -51,100 +56,112 @@ class runner(object):
#####
# start a process
- def run( self, node, run_name ):
+ def run( self, node, run_name, no_run = False ):
try :
- args = self.get_host_info(node.host)
- except :
- print "ERROR: do not know how to run on %s"%node.host
- raise
+ try :
+ args = self.get_host_info(node.host)
+ except Exception, e :
+ log_traceback()
+ print "ERROR: do not know how to run on %s"%node.host
+ print e
+ raise
- if 'maxproc' in args :
hostname = args['hostname']
+ if 'maxproc' in args :
- n = int(self.howmany.get(hostname,0))
- if n >= int(args['maxproc']) :
- print "decline to run %s - %d other already running"%(node.name,n)
- return False
-
- n = n + 1
- self.howmany[hostname] = n
- print "running %s %s %d"%(hostname,node.name, n)
- else :
- print "running %s %s no maxproc"%(hostname, node.name)
-
- if debug :
- print "run",node.name
- if debug :
- print "....%s:%s/%s\n"%(node.host, node.table, node.cmd)
-
- node.running = 1
-
- args = args.copy()
- args.update(
- script=node.script,
- script_type=node.script_type,
- host=node.host,
- table=node.table,
- cmd=node.cmd,
- node=node.name,
- )
-
- if debug :
- print "ARGS"
- for x in sorted([x for x in args]) :
- print '%s=%s'%(x,args[x])
-
- args['script'] = args['script'] % args
-
- if args['script_type'] == 'r' :
- run = args['run']
- elif args['script_type'] == 'l' :
- run = args['local']
- else :
- raise Exception()
-
- t = [ ]
- for x in run :
- # bug: what to do in case of keyerror
- t.append( x % args )
-
- run = t
+ n = int(self.howmany.get(hostname,0))
+ if n >= int(args['maxproc']) :
+ print "decline to run %s - %d other already running"%(node.name,n)
+ return False
- if debug :
- print "RUN",run
-
- # make sure the log directory is there
- logdir= self.logdir + "/%s"%run_name
- try :
- os.makedirs(logdir)
- except OSError:
- pass
-
- # create a name for the log file, but do not use / in the name
- logfile_name = "%s/%s.log"%( logdir, node.name.replace('/','.') )
-
- # open the log file, write initial notes
- logfile=open(logfile_name,"w")
- logfile.write('%s %s\n'%(datetime.datetime.now(),run))
- logfile.flush()
-
- # start running the process
- p = subprocess.Popen(args=run,
- stdout=logfile,
- stderr=subprocess.STDOUT,
- shell=False, close_fds=True)
-
- # remember the popen object for the process; remember the open log file
- n = struct()
- n.proc = p
- n.logfile = logfile
- n.logfile_name = logfile_name
-
- # remember the process is running
- self.all_procs[node.name] = n
-
- return True
+ n = n + 1
+ self.howmany[hostname] = n
+ print "running %s %s %d"%(hostname,node.name, n)
+ else :
+ print "running %s %s no maxproc"%(hostname, node.name)
+
+ if debug :
+ print "run",node.name
+ if debug :
+ print "....%s:%s/%s\n"%(node.host, node.table, node.cmd)
+
+ node.running = 1
+
+ args = args.copy()
+ args.update(
+ script=node.script,
+ script_type=node.script_type,
+ host=node.host,
+ table=node.table,
+ cmd=node.cmd,
+ node=node.name,
+ )
+
+ if debug :
+ print "ARGS"
+ for x in sorted([x for x in args]) :
+ print '%s=%s'%(x,args[x])
+
+ args['script'] = args['script'] % args
+
+ if args['script_type'] == 'r' :
+ run = args['run']
+ elif args['script_type'] == 'l' :
+ run = args['local']
+ else :
+ raise Exception()
+
+ t = [ ]
+ for x in run :
+ # bug: what to do in case of keyerror
+ t.append( x % args )
+
+ run = t
+
+ if debug :
+ print "RUN",run
+
+ # make sure the log directory is there
+ logdir= self.logdir + "/%s"%run_name
+ try :
+ os.makedirs(logdir)
+ except OSError:
+ pass
+
+ # create a name for the log file, but do not use / in the name
+ logfile_name = "%s/%s.log"%( logdir, node.name.replace('/','.') )
+
+ # open the log file, write initial notes
+ logfile=open(logfile_name,"w")
+ logfile.write('%s %s\n'%(datetime.datetime.now(),run))
+ logfile.flush()
+
+ # debug - just say the name of the node we would run
+ if no_run :
+ run = [ 'echo', 'no_run - node=', node.name ]
+
+ # start running the process
+ p = subprocess.Popen(args=run,
+ stdout=logfile,
+ stderr=subprocess.STDOUT,
+ shell=False, close_fds=True)
+
+ # remember the popen object for the process; remember the open log file
+ n = struct()
+ n.proc = p
+ n.logfile = logfile
+ n.logfile_name = logfile_name
+
+ # remember the process is running
+ self.all_procs[node.name] = n
+
+ return True
+
+ except Exception, e :
+ log_traceback()
+ txt= "ERROR RUNNING %s"%node.name
+ raise run_exception(txt)
#####
# callback when a node finishes
@@ -155,16 +172,12 @@ class runner(object):
args = self.get_host_info(node.host)
- if 'maxproc' in args :
- hostname = args['hostname']
+ hostname = args['hostname']
- n = int(self.howmany.get(hostname,0))
- n = n - 1
+ n = self.howmany[hostname] - 1
+ self.howmany[hostname] = n
- self.howmany[hostname] = n
- print "finish %s %s %d"%(hostname,node_name,n)
- else :
- print "finish %s %s no maxproc"%(hostname,node_name)
+ print "finish %s %s %d"%(hostname,node_name,n)
# note the termination of the process at the end of the log file
logfile = self.all_procs[node_name].logfile
@@ -225,14 +238,18 @@ class runner(object):
def _host_get_names( self, cfg, section ) :
d = { }
# pick all the variables out of this section
- for name, value in cfg.items(section) :
- if value.startswith('[') :
- # it is a list
- d[name] = eval(value)
- else :
- # everything else is plain text
- d[name] = value
- return d
+ try :
+ for name, value in cfg.items(section) :
+ if value.startswith('[') :
+ # it is a list
+ d[name] = eval(value)
+ else :
+ # everything else is plain text
+ d[name] = value
+ return d
+ except ConfigParser.NoSectionError :
+ print "No config section in hosts.ini: %s"%section
+ return { }
def load_host_info( self, filename=None ) :
@@ -243,26 +260,55 @@ class runner(object):
self.cfg.read(filename)
def get_host_info(self, host) :
- if debug:
- print "enter get_host_info",host
if not host in self.host_info_cache :
-
d = self._host_get_names(self.cfg, host)
- if debug:
- print "in get_host_info, got names for ",host, d
+
if 'like' in d :
- if debug:
- print "has like", d['like']
- d1 = self.get_host_info(d['like']).copy()
- del d['like']
+ # get the dict of what this entry is like, copy it,
+ # and update it with the values for this entry
+ d1 = self.get_host_info(d['like'])
+ d1 = d1.copy()
d1.update(d)
- self.host_info_cache[host] = d1
- else :
- print "end of chain",host,d
- self.host_info_cache[host] = d
+ d = d1
+ print d
+ del d['like']
- if debug:
- print "leave get_host_info",host, self.host_info_cache[host]
+ # default hostname is the name from the section header
+ if not 'hostname' in d :
+ d['hostname'] = host
+
+ # default maximum processes is 1
+ if not 'maxproc' in d :
+ d['maxproc'] = 1
+
+ self.host_info_cache[host] = d
return self.host_info_cache[host]
#####
+
+# The traceback interface is awkward in python; here is something I copied from pyetc:
+
+def log_traceback() :
+ # You would think that the python traceback module contains
+ # something useful to do this, but it always returns multi-line
+ # strings. I want each line of output logged separately so the log
+ # file remains easy to process, so I reverse engineered this out of
+ # the logging module.
+ try:
+ etype, value, tb = sys.exc_info()
+ tbex = traceback.extract_tb( tb )
+ for filename, lineno, name, line in tbex :
+ print '%s:%d, in %s'%(filename,lineno,name)
+ if line:
+ print ' %s'%line.strip()
+
+ for x in traceback.format_exception_only( etype, value ) :
+ print ": %s",x
+
+ print "---"
+
+ finally:
+ # If you don't clear these guys, you can make loops that
+ # the garbage collector has to work hard to eliminate.
+ etype = value = tb = None
+