aboutsummaryrefslogtreecommitdiff
path: root/steuermann/run.py
diff options
context:
space:
mode:
Diffstat (limited to 'steuermann/run.py')
-rw-r--r--steuermann/run.py57
1 files changed, 43 insertions, 14 deletions
diff --git a/steuermann/run.py b/steuermann/run.py
index af52a02..e83fca2 100644
--- a/steuermann/run.py
+++ b/steuermann/run.py
@@ -3,12 +3,32 @@ run processes asynchronously on various machines, with a callback
on process exit.
'''
+# to do someday:
+#
+# This feature should really be broken into 3 parts:
+# - remotely execute on another machine
+# - track concurrent execution
+# - reserve resource usage
+#
+# To start a process, ask for a resource reservation. (Currently, the
+# only resource we track is CPUs.) If we don't get a reservation, we
+# don't run right away.
+#
+# If we do, use the remote exec to run the process on the target machine.
+# This is the part that knows hosts.ini. (We also use hosts.ini to declare
+# resource availability.)
+#
+# When the process finishes, release the resource reservation.
+#
+
import subprocess
import time
import datetime
import os
+import os.path
import traceback
import sys
+import errno
import ConfigParser
@@ -56,7 +76,7 @@ class runner(object):
#####
# start a process
- def run( self, node, run_name, no_run = False ):
+ def run( self, node, run_name, no_run = False, logfile_name = None ):
try :
try :
@@ -72,14 +92,15 @@ class runner(object):
n = int(self.howmany.get(hostname,0))
if n >= int(args['maxproc']) :
- print "decline to run %s - %d other already running"%(node.name,n)
+ # print "decline to run %s - %d other already running"%(node.name,n)
return False
n = n + 1
self.howmany[hostname] = n
- print "running %s %s %d"%(hostname,node.name, n)
+ # print "running %s %s %d"%(hostname,node.name, n)
else :
- print "running %s %s no maxproc"%(hostname, node.name)
+ # print "running %s %s no maxproc"%(hostname, node.name)
+ pass
if debug :
print "run",node.name
@@ -96,6 +117,7 @@ class runner(object):
table=node.table,
cmd=node.cmd,
node=node.name,
+ runname=run_name,
)
if debug :
@@ -122,15 +144,20 @@ class runner(object):
if debug :
print "RUN",run
- # make sure the log directory is there
- logdir= self.logdir + "/%s"%run_name
- try :
- os.makedirs(logdir)
- except OSError:
- pass
+ if logfile_name is None :
+ # make sure the log directory is there
+ logdir= self.logdir + "/%s"%(run_name)
- # create a name for the log file, but do not use / in the name
- logfile_name = "%s/%s.log"%( logdir, node.name.replace('/','.') )
+ # create a name for the log file, but do not use / in the name
+ logfile_name = "%s/%s.log"%( logdir, node.name.replace('/','.') )
+
+ try :
+ os.makedirs( os.path.dirname(logfile_name) )
+ except OSError, e :
+ if e.errno == errno.EEXIST :
+ pass
+ else :
+ raise
# open the log file, write initial notes
logfile=open(logfile_name,"w")
@@ -142,6 +169,8 @@ class runner(object):
run = [ 'echo', 'no_run - node=', node.name ]
# start running the process
+ if debug :
+ print "RUN",run
p = subprocess.Popen(args=run,
stdout=logfile,
stderr=subprocess.STDOUT,
@@ -177,7 +206,8 @@ class runner(object):
n = self.howmany[hostname] - 1
self.howmany[hostname] = n
- print "finish %s %s %d"%(hostname,node_name,n)
+ if debug :
+ print "finish %s %s %d"%(hostname,node_name,n)
# note the termination of the process at the end of the log file
logfile = self.all_procs[node_name].logfile
@@ -270,7 +300,6 @@ class runner(object):
d1 = d1.copy()
d1.update(d)
d = d1
- print d
del d['like']
# default hostname is the name from the section header