diff options
Diffstat (limited to 'steuermann')
-rw-r--r-- | steuermann/db.sql | 25 | ||||
-rw-r--r-- | steuermann/hosts.ini | 27 | ||||
-rw-r--r-- | steuermann/nodes.py | 4 | ||||
-rw-r--r-- | steuermann/report.py | 15 | ||||
-rw-r--r-- | steuermann/run.py | 57 | ||||
-rw-r--r-- | steuermann/run_all.py | 21 | ||||
-rw-r--r-- | steuermann/run_cron.py | 82 |
7 files changed, 193 insertions, 38 deletions
diff --git a/steuermann/db.sql b/steuermann/db.sql index b6d2ae6..99bc752 100644 --- a/steuermann/db.sql +++ b/steuermann/db.sql @@ -43,8 +43,31 @@ create unique index sm_status_idx1 on sm_status ( run, host, tablename, cmd ); -- table lists all run names in the system CREATE TABLE sm_runs ( - run VARCHAR(100) + run VARCHAR(100), + create_time VARCHAR(26) ); CREATE UNIQUE INDEX sm_runs_idx1 ON sm_runs(run); + +-- table lists scheduled cron events - sort of like the old sr +-- smcron name script +CREATE TABLE sm_crons ( + host VARCHAR, + -- what host we ran this on + name VARCHAR(100), + -- a descriptive name, not necessarily unique + decollision VARCHAR(10), + -- a cookie to make (host,name,decollision) unique + start_time VARCHAR(30), + end_time VARCHAR(30), + duration REAL, + -- + status VARCHAR(5), + -- exit status + logfile VARCHAR(1000) + -- log file name, relative to config logdir+'/cron/' + ); + +CREATE INDEX sm_crons_idx1 ON sm_crons( host, name, decollision ); + diff --git a/steuermann/hosts.ini b/steuermann/hosts.ini index f8c2e3b..abc114c 100644 --- a/steuermann/hosts.ini +++ b/steuermann/hosts.ini @@ -28,12 +28,15 @@ local=[ 'sh', '-c', '%(script)s' ] ; ; -q = quiet ; -x = do not forward X windows; prevents errors locking .Xauthority when lots of ssh come in at the same time -run=[ 'ssh', '-q', '-x', '%(hostname)s', ' cd %(workdir)s; set node=%(node)s; source bin/.steuermann.%(hostname)s; %(script)s; show_status $status ' ] +run=[ 'ssh', '-q', '-x', '%(hostname)s', ' cd %(workdir)s; setenv sm_node %(node)s; setenv sm_run %(runname)s; source bin/.steuermann.%(hostname)s; %(script)s; show_status $status ' ] like=all [mac:csh] like=linux:csh +[solaris:csh] +like=linux:csh + [windows:cmd] run=[ 'python', '-m', 'steuermann.windows_comm', '%(hostname)s', '%(script)s' ] like=all @@ -59,6 +62,12 @@ like=cadeau ; actual machines +[jwcalibdev] +hostname=jwcalibdev +like=linux:csh +workdir=/data1/iraf/steuermann +maxproc=32 + [herbert] hostname=herbert like=linux:csh @@ -77,6 +86,9 @@ like=linux:csh workdir=/arzach/data1/iraf/steuermann maxproc=4 +[localhost] +like=ssb + [ssb] hostname=ssbwebv1 like=linux:csh @@ -101,5 +113,18 @@ like=mac:csh workdir=/Users/iraf/work/steuermann maxproc=4 +[aten] +hostname=aten +like=solaris:csh +maxproc=1 +workdir=/aten/data2/iraf/steuermann + +[grail] +hostname=grail +like=solaris:csh +maxproc=1 +workdir=/data/grail1/iraf/steuermann + + ; There is a section [ALL] that is used with every machine name [ALL] diff --git a/steuermann/nodes.py b/steuermann/nodes.py index 0f24e36..476734a 100644 --- a/steuermann/nodes.py +++ b/steuermann/nodes.py @@ -212,6 +212,10 @@ class node(object) : self.finished = 0 self.running = 0 + # + self.wanted = 1 + self.skip = 1 + ##### # debug - make a string representation of all the nodes diff --git a/steuermann/report.py b/steuermann/report.py index cc90604..8d95d1f 100644 --- a/steuermann/report.py +++ b/steuermann/report.py @@ -11,18 +11,6 @@ import StringIO # maybe the output is html 3.2 - in any case, it is way simpler than # more recent standards. -html_header='''<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN"> -<HTML> -<HEAD> -<TITLE>%(title)s</TITLE> -</HEAD> -<BODY> -''' - -html_trailer=''' -</BODY> -</HTML> -''' # this will be reset by the cgi main program if we are in a real cgi cginame = 'arf.cgi' @@ -167,7 +155,6 @@ def report_text( db, run_name, info_callback = info_callback_status ) : def report_html( db, run_name, info_callback = info_callback_status, hlevel=1 ) : s = StringIO.StringIO() - s.write(html_header % { 'title' : run_name } ) s.write('<h%d>%s</h%d>\n'%(hlevel,run_name,hlevel)) hlevel = hlevel + 1 @@ -179,8 +166,6 @@ def report_html( db, run_name, info_callback = info_callback_status, hlevel=1 ) t = get_table( db, run_name, tablename, info_callback, showdepth=1 ) s.write(t.get_html()) - s.write(html_trailer) - return s.getvalue() # diff --git a/steuermann/run.py b/steuermann/run.py index af52a02..e83fca2 100644 --- a/steuermann/run.py +++ b/steuermann/run.py @@ -3,12 +3,32 @@ run processes asynchronously on various machines, with a callback on process exit. ''' +# to do someday: +# +# This feature should really be broken into 3 parts: +# - remotely execute on another machine +# - track concurrent execution +# - reserve resource usage +# +# To start a process, ask for a resource reservation. (Currently, the +# only resource we track is CPUs.) If we don't get a reservation, we +# don't run right away. +# +# If we do, use the remote exec to run the process on the target machine. +# This is the part that knows hosts.ini. (We also use hosts.ini to declare +# resource availability.) +# +# When the process finishes, release the resource reservation. +# + import subprocess import time import datetime import os +import os.path import traceback import sys +import errno import ConfigParser @@ -56,7 +76,7 @@ class runner(object): ##### # start a process - def run( self, node, run_name, no_run = False ): + def run( self, node, run_name, no_run = False, logfile_name = None ): try : try : @@ -72,14 +92,15 @@ class runner(object): n = int(self.howmany.get(hostname,0)) if n >= int(args['maxproc']) : - print "decline to run %s - %d other already running"%(node.name,n) + # print "decline to run %s - %d other already running"%(node.name,n) return False n = n + 1 self.howmany[hostname] = n - print "running %s %s %d"%(hostname,node.name, n) + # print "running %s %s %d"%(hostname,node.name, n) else : - print "running %s %s no maxproc"%(hostname, node.name) + # print "running %s %s no maxproc"%(hostname, node.name) + pass if debug : print "run",node.name @@ -96,6 +117,7 @@ class runner(object): table=node.table, cmd=node.cmd, node=node.name, + runname=run_name, ) if debug : @@ -122,15 +144,20 @@ class runner(object): if debug : print "RUN",run - # make sure the log directory is there - logdir= self.logdir + "/%s"%run_name - try : - os.makedirs(logdir) - except OSError: - pass + if logfile_name is None : + # make sure the log directory is there + logdir= self.logdir + "/%s"%(run_name) - # create a name for the log file, but do not use / in the name - logfile_name = "%s/%s.log"%( logdir, node.name.replace('/','.') ) + # create a name for the log file, but do not use / in the name + logfile_name = "%s/%s.log"%( logdir, node.name.replace('/','.') ) + + try : + os.makedirs( os.path.dirname(logfile_name) ) + except OSError, e : + if e.errno == errno.EEXIST : + pass + else : + raise # open the log file, write initial notes logfile=open(logfile_name,"w") @@ -142,6 +169,8 @@ class runner(object): run = [ 'echo', 'no_run - node=', node.name ] # start running the process + if debug : + print "RUN",run p = subprocess.Popen(args=run, stdout=logfile, stderr=subprocess.STDOUT, @@ -177,7 +206,8 @@ class runner(object): n = self.howmany[hostname] - 1 self.howmany[hostname] = n - print "finish %s %s %d"%(hostname,node_name,n) + if debug : + print "finish %s %s %d"%(hostname,node_name,n) # note the termination of the process at the end of the log file logfile = self.all_procs[node_name].logfile @@ -270,7 +300,6 @@ class runner(object): d1 = d1.copy() d1.update(d) d = d1 - print d del d['like'] # default hostname is the name from the section header diff --git a/steuermann/run_all.py b/steuermann/run_all.py index f670abe..1e179a2 100644 --- a/steuermann/run_all.py +++ b/steuermann/run_all.py @@ -11,6 +11,7 @@ import datetime import run import report import nodes +import getpass import steuermann.config @@ -22,6 +23,9 @@ try : except ImportError : readline = None + +username=getpass.getuser() + ##### def main() : @@ -68,7 +72,7 @@ def main() : if '-r' in opt : run_name = opt['-r'] else : - run_name = str(datetime.datetime.now()).replace(' ','_') + run_name = "user_%s_%s"%(username,str(datetime.datetime.now()).replace(' ','_')) db = steuermann.config.open_db() @@ -348,7 +352,7 @@ def print_pre(who, xnodes, depth) : def register_database(db, run, xnodes ) : c = db.cursor() - c.execute('INSERT INTO sm_runs ( run ) VALUES ( ? )', ( run, ) ) + c.execute('INSERT INTO sm_runs ( run, create_time ) VALUES ( ?, ? )', ( run, str(datetime.datetime.now()).replace(' ','_')) ) c = db.cursor() for x in xnodes : @@ -363,14 +367,17 @@ def register_database(db, run, xnodes ) : def run_all(xnodes, run_name, db) : + for x in xnodes : + x = xnodes[x] + x.finished = 0 + x.running = 0 + x.wanted = 1 + x.skip = 0 + register_database(db, run_name, xnodes) - runner = run.runner( xnodes ) + runner = run.runner( xnodes, steuermann.config.logdir ) - for x in xnodes : - xnodes[x].finished = 0 - xnodes[x].running = 0 - xnodes[x].wanted = 1 while 1 : ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) diff --git a/steuermann/run_cron.py b/steuermann/run_cron.py new file mode 100644 index 0000000..39d5be7 --- /dev/null +++ b/steuermann/run_cron.py @@ -0,0 +1,82 @@ +import os +import sys +import time +import steuermann.config +import steuermann.run +import datetime + +# bugs: +# - this is a hideous hack that works around run.py not being designed for this +# - the work directory on the target machine is poorly named +# + +class fakenode(object): + pass + +def main() : + + if len(sys.argv) < 4 : + print "smcron host name command" + print "(from ",sys.argv,")" + sys.exit(1) + + host = sys.argv[1] + name = sys.argv[2] + script = ' '.join(sys.argv[3:]) + + db = steuermann.config.open_db() + + c = db.cursor() + + start_time = datetime.datetime.now() + day, tod = str(start_time).split(' ') + + name = name + '--' + tod + + logfile = '%s/%s.%s' % (day, host, name) + + # something to reduce the possibility of a collision to very small - in this case, we assume that + # we can't start multiple processes at the same time from the same process number. i.e. the + # pid can not wrap in less than 1 clock tick + decollision = os.getpid() + + c.execute("INSERT INTO sm_crons ( host, name, start_time, logfile, decollision ) VALUES ( ?, ?, ?, ?, ? )", + ( host, name, str(start_time), logfile, decollision ) ) + db.commit() + + # this is a hideous hack to plug on to an interface that was not designed for this + + node = fakenode() + node.host = host + node.name = 'cron.' + name + node.table = None + node.cmd = None + node.script = script + if host == 'localhost' : + node.script_type = 'l' # local + else : + node.script_type = 'r' # remote + + runner = steuermann.run.runner( nodes = { node.name : node }, logdir=None ) + runner.run( node=node, run_name='', logfile_name = steuermann.config.logdir + '/cron/' + logfile ) + + n = 0.1 + while 1 : + exited = runner.poll() + if exited : + break + if n < 2.0 : + n = n * 2.0 + time.sleep(n) + + status = exited[1] + + end_time = datetime.datetime.now() + td = end_time - start_time + + # only 1 decimal place because we don't poll often enough to make more reasonable. + td = "%.1f" % ( td.microseconds/1e6 + td.seconds + td.days * 24 * 3600 ) + c.execute("UPDATE sm_crons SET end_time = ?, status = ?, duration = ? WHERE host = ? AND name = ? ", + (str(end_time), status, td, host, name ) ) + db.commit() + |