aboutsummaryrefslogtreecommitdiff
path: root/steuermann
diff options
context:
space:
mode:
Diffstat (limited to 'steuermann')
-rw-r--r--steuermann/db.sql25
-rw-r--r--steuermann/hosts.ini27
-rw-r--r--steuermann/nodes.py4
-rw-r--r--steuermann/report.py15
-rw-r--r--steuermann/run.py57
-rw-r--r--steuermann/run_all.py21
-rw-r--r--steuermann/run_cron.py82
7 files changed, 193 insertions, 38 deletions
diff --git a/steuermann/db.sql b/steuermann/db.sql
index b6d2ae6..99bc752 100644
--- a/steuermann/db.sql
+++ b/steuermann/db.sql
@@ -43,8 +43,31 @@ create unique index sm_status_idx1 on sm_status ( run, host, tablename, cmd );
-- table lists all run names in the system
CREATE TABLE sm_runs (
- run VARCHAR(100)
+ run VARCHAR(100),
+ create_time VARCHAR(26)
);
CREATE UNIQUE INDEX sm_runs_idx1 ON sm_runs(run);
+
+-- table lists scheduled cron events - sort of like the old sr
+-- smcron name script
+CREATE TABLE sm_crons (
+ host VARCHAR,
+ -- what host we ran this on
+ name VARCHAR(100),
+ -- a descriptive name, not necessarily unique
+ decollision VARCHAR(10),
+ -- a cookie to make (host,name,decollision) unique
+ start_time VARCHAR(30),
+ end_time VARCHAR(30),
+ duration REAL,
+ --
+ status VARCHAR(5),
+ -- exit status
+ logfile VARCHAR(1000)
+ -- log file name, relative to config logdir+'/cron/'
+ );
+
+CREATE INDEX sm_crons_idx1 ON sm_crons( host, name, decollision );
+
diff --git a/steuermann/hosts.ini b/steuermann/hosts.ini
index f8c2e3b..abc114c 100644
--- a/steuermann/hosts.ini
+++ b/steuermann/hosts.ini
@@ -28,12 +28,15 @@ local=[ 'sh', '-c', '%(script)s' ]
;
; -q = quiet
; -x = do not forward X windows; prevents errors locking .Xauthority when lots of ssh come in at the same time
-run=[ 'ssh', '-q', '-x', '%(hostname)s', ' cd %(workdir)s; set node=%(node)s; source bin/.steuermann.%(hostname)s; %(script)s; show_status $status ' ]
+run=[ 'ssh', '-q', '-x', '%(hostname)s', ' cd %(workdir)s; setenv sm_node %(node)s; setenv sm_run %(runname)s; source bin/.steuermann.%(hostname)s; %(script)s; show_status $status ' ]
like=all
[mac:csh]
like=linux:csh
+[solaris:csh]
+like=linux:csh
+
[windows:cmd]
run=[ 'python', '-m', 'steuermann.windows_comm', '%(hostname)s', '%(script)s' ]
like=all
@@ -59,6 +62,12 @@ like=cadeau
; actual machines
+[jwcalibdev]
+hostname=jwcalibdev
+like=linux:csh
+workdir=/data1/iraf/steuermann
+maxproc=32
+
[herbert]
hostname=herbert
like=linux:csh
@@ -77,6 +86,9 @@ like=linux:csh
workdir=/arzach/data1/iraf/steuermann
maxproc=4
+[localhost]
+like=ssb
+
[ssb]
hostname=ssbwebv1
like=linux:csh
@@ -101,5 +113,18 @@ like=mac:csh
workdir=/Users/iraf/work/steuermann
maxproc=4
+[aten]
+hostname=aten
+like=solaris:csh
+maxproc=1
+workdir=/aten/data2/iraf/steuermann
+
+[grail]
+hostname=grail
+like=solaris:csh
+maxproc=1
+workdir=/data/grail1/iraf/steuermann
+
+
; There is a section [ALL] that is used with every machine name
[ALL]
diff --git a/steuermann/nodes.py b/steuermann/nodes.py
index 0f24e36..476734a 100644
--- a/steuermann/nodes.py
+++ b/steuermann/nodes.py
@@ -212,6 +212,10 @@ class node(object) :
self.finished = 0
self.running = 0
+ #
+ self.wanted = 1
+ self.skip = 1
+
#####
# debug - make a string representation of all the nodes
diff --git a/steuermann/report.py b/steuermann/report.py
index cc90604..8d95d1f 100644
--- a/steuermann/report.py
+++ b/steuermann/report.py
@@ -11,18 +11,6 @@ import StringIO
# maybe the output is html 3.2 - in any case, it is way simpler than
# more recent standards.
-html_header='''<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
-<HTML>
-<HEAD>
-<TITLE>%(title)s</TITLE>
-</HEAD>
-<BODY>
-'''
-
-html_trailer='''
-</BODY>
-</HTML>
-'''
# this will be reset by the cgi main program if we are in a real cgi
cginame = 'arf.cgi'
@@ -167,7 +155,6 @@ def report_text( db, run_name, info_callback = info_callback_status ) :
def report_html( db, run_name, info_callback = info_callback_status, hlevel=1 ) :
s = StringIO.StringIO()
- s.write(html_header % { 'title' : run_name } )
s.write('<h%d>%s</h%d>\n'%(hlevel,run_name,hlevel))
hlevel = hlevel + 1
@@ -179,8 +166,6 @@ def report_html( db, run_name, info_callback = info_callback_status, hlevel=1 )
t = get_table( db, run_name, tablename, info_callback, showdepth=1 )
s.write(t.get_html())
- s.write(html_trailer)
-
return s.getvalue()
#
diff --git a/steuermann/run.py b/steuermann/run.py
index af52a02..e83fca2 100644
--- a/steuermann/run.py
+++ b/steuermann/run.py
@@ -3,12 +3,32 @@ run processes asynchronously on various machines, with a callback
on process exit.
'''
+# to do someday:
+#
+# This feature should really be broken into 3 parts:
+# - remotely execute on another machine
+# - track concurrent execution
+# - reserve resource usage
+#
+# To start a process, ask for a resource reservation. (Currently, the
+# only resource we track is CPUs.) If we don't get a reservation, we
+# don't run right away.
+#
+# If we do, use the remote exec to run the process on the target machine.
+# This is the part that knows hosts.ini. (We also use hosts.ini to declare
+# resource availability.)
+#
+# When the process finishes, release the resource reservation.
+#
+
import subprocess
import time
import datetime
import os
+import os.path
import traceback
import sys
+import errno
import ConfigParser
@@ -56,7 +76,7 @@ class runner(object):
#####
# start a process
- def run( self, node, run_name, no_run = False ):
+ def run( self, node, run_name, no_run = False, logfile_name = None ):
try :
try :
@@ -72,14 +92,15 @@ class runner(object):
n = int(self.howmany.get(hostname,0))
if n >= int(args['maxproc']) :
- print "decline to run %s - %d other already running"%(node.name,n)
+ # print "decline to run %s - %d other already running"%(node.name,n)
return False
n = n + 1
self.howmany[hostname] = n
- print "running %s %s %d"%(hostname,node.name, n)
+ # print "running %s %s %d"%(hostname,node.name, n)
else :
- print "running %s %s no maxproc"%(hostname, node.name)
+ # print "running %s %s no maxproc"%(hostname, node.name)
+ pass
if debug :
print "run",node.name
@@ -96,6 +117,7 @@ class runner(object):
table=node.table,
cmd=node.cmd,
node=node.name,
+ runname=run_name,
)
if debug :
@@ -122,15 +144,20 @@ class runner(object):
if debug :
print "RUN",run
- # make sure the log directory is there
- logdir= self.logdir + "/%s"%run_name
- try :
- os.makedirs(logdir)
- except OSError:
- pass
+ if logfile_name is None :
+ # make sure the log directory is there
+ logdir= self.logdir + "/%s"%(run_name)
- # create a name for the log file, but do not use / in the name
- logfile_name = "%s/%s.log"%( logdir, node.name.replace('/','.') )
+ # create a name for the log file, but do not use / in the name
+ logfile_name = "%s/%s.log"%( logdir, node.name.replace('/','.') )
+
+ try :
+ os.makedirs( os.path.dirname(logfile_name) )
+ except OSError, e :
+ if e.errno == errno.EEXIST :
+ pass
+ else :
+ raise
# open the log file, write initial notes
logfile=open(logfile_name,"w")
@@ -142,6 +169,8 @@ class runner(object):
run = [ 'echo', 'no_run - node=', node.name ]
# start running the process
+ if debug :
+ print "RUN",run
p = subprocess.Popen(args=run,
stdout=logfile,
stderr=subprocess.STDOUT,
@@ -177,7 +206,8 @@ class runner(object):
n = self.howmany[hostname] - 1
self.howmany[hostname] = n
- print "finish %s %s %d"%(hostname,node_name,n)
+ if debug :
+ print "finish %s %s %d"%(hostname,node_name,n)
# note the termination of the process at the end of the log file
logfile = self.all_procs[node_name].logfile
@@ -270,7 +300,6 @@ class runner(object):
d1 = d1.copy()
d1.update(d)
d = d1
- print d
del d['like']
# default hostname is the name from the section header
diff --git a/steuermann/run_all.py b/steuermann/run_all.py
index f670abe..1e179a2 100644
--- a/steuermann/run_all.py
+++ b/steuermann/run_all.py
@@ -11,6 +11,7 @@ import datetime
import run
import report
import nodes
+import getpass
import steuermann.config
@@ -22,6 +23,9 @@ try :
except ImportError :
readline = None
+
+username=getpass.getuser()
+
#####
def main() :
@@ -68,7 +72,7 @@ def main() :
if '-r' in opt :
run_name = opt['-r']
else :
- run_name = str(datetime.datetime.now()).replace(' ','_')
+ run_name = "user_%s_%s"%(username,str(datetime.datetime.now()).replace(' ','_'))
db = steuermann.config.open_db()
@@ -348,7 +352,7 @@ def print_pre(who, xnodes, depth) :
def register_database(db, run, xnodes ) :
c = db.cursor()
- c.execute('INSERT INTO sm_runs ( run ) VALUES ( ? )', ( run, ) )
+ c.execute('INSERT INTO sm_runs ( run, create_time ) VALUES ( ?, ? )', ( run, str(datetime.datetime.now()).replace(' ','_')) )
c = db.cursor()
for x in xnodes :
@@ -363,14 +367,17 @@ def register_database(db, run, xnodes ) :
def run_all(xnodes, run_name, db) :
+ for x in xnodes :
+ x = xnodes[x]
+ x.finished = 0
+ x.running = 0
+ x.wanted = 1
+ x.skip = 0
+
register_database(db, run_name, xnodes)
- runner = run.runner( xnodes )
+ runner = run.runner( xnodes, steuermann.config.logdir )
- for x in xnodes :
- xnodes[x].finished = 0
- xnodes[x].running = 0
- xnodes[x].wanted = 1
while 1 :
( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db )
diff --git a/steuermann/run_cron.py b/steuermann/run_cron.py
new file mode 100644
index 0000000..39d5be7
--- /dev/null
+++ b/steuermann/run_cron.py
@@ -0,0 +1,82 @@
+import os
+import sys
+import time
+import steuermann.config
+import steuermann.run
+import datetime
+
+# bugs:
+# - this is a hideous hack that works around run.py not being designed for this
+# - the work directory on the target machine is poorly named
+#
+
+class fakenode(object):
+ pass
+
+def main() :
+
+ if len(sys.argv) < 4 :
+ print "smcron host name command"
+ print "(from ",sys.argv,")"
+ sys.exit(1)
+
+ host = sys.argv[1]
+ name = sys.argv[2]
+ script = ' '.join(sys.argv[3:])
+
+ db = steuermann.config.open_db()
+
+ c = db.cursor()
+
+ start_time = datetime.datetime.now()
+ day, tod = str(start_time).split(' ')
+
+ name = name + '--' + tod
+
+ logfile = '%s/%s.%s' % (day, host, name)
+
+ # something to reduce the possibility of a collision to very small - in this case, we assume that
+ # we can't start multiple processes at the same time from the same process number. i.e. the
+ # pid can not wrap in less than 1 clock tick
+ decollision = os.getpid()
+
+ c.execute("INSERT INTO sm_crons ( host, name, start_time, logfile, decollision ) VALUES ( ?, ?, ?, ?, ? )",
+ ( host, name, str(start_time), logfile, decollision ) )
+ db.commit()
+
+ # this is a hideous hack to plug on to an interface that was not designed for this
+
+ node = fakenode()
+ node.host = host
+ node.name = 'cron.' + name
+ node.table = None
+ node.cmd = None
+ node.script = script
+ if host == 'localhost' :
+ node.script_type = 'l' # local
+ else :
+ node.script_type = 'r' # remote
+
+ runner = steuermann.run.runner( nodes = { node.name : node }, logdir=None )
+ runner.run( node=node, run_name='', logfile_name = steuermann.config.logdir + '/cron/' + logfile )
+
+ n = 0.1
+ while 1 :
+ exited = runner.poll()
+ if exited :
+ break
+ if n < 2.0 :
+ n = n * 2.0
+ time.sleep(n)
+
+ status = exited[1]
+
+ end_time = datetime.datetime.now()
+ td = end_time - start_time
+
+ # only 1 decimal place because we don't poll often enough to make more reasonable.
+ td = "%.1f" % ( td.microseconds/1e6 + td.seconds + td.days * 24 * 3600 )
+ c.execute("UPDATE sm_crons SET end_time = ?, status = ?, duration = ? WHERE host = ? AND name = ? ",
+ (str(end_time), status, td, host, name ) )
+ db.commit()
+