diff options
author | sienkiew <sienkiew@d34015c8-bcbb-4646-8ac8-8ba5febf221d> | 2011-08-30 16:09:41 -0400 |
---|---|---|
committer | sienkiew <sienkiew@d34015c8-bcbb-4646-8ac8-8ba5febf221d> | 2011-08-30 16:09:41 -0400 |
commit | ea0bbd187c539b30c6b70a7a220ca1249f3cca41 (patch) | |
tree | bfc64ff8328e952e4348440fbe77a85aee88f48e | |
parent | 0206b3bdab5ca17b8e22806c34330dd58d55e429 (diff) | |
download | steuermann-ea0bbd187c539b30c6b70a7a220ca1249f3cca41.tar.gz |
hacked together prototype of steuermann 21.9
git-svn-id: https://svn.stsci.edu/svn/ssb/etal/steuermann/trunk@381 d34015c8-bcbb-4646-8ac8-8ba5febf221d
-rw-r--r-- | Makefile | 12 | ||||
-rw-r--r-- | daily.sr | 74 | ||||
-rw-r--r-- | note | 4 | ||||
-rw-r--r-- | scripts/smc | 4 | ||||
-rw-r--r-- | setup.py | 27 | ||||
-rw-r--r-- | steuermann/__init__.py | 1 | ||||
-rw-r--r-- | steuermann/db.sql | 48 | ||||
-rw-r--r-- | steuermann/hosts.ini | 43 | ||||
-rw-r--r-- | steuermann/nodes.py | 303 | ||||
-rw-r--r-- | steuermann/report.py | 146 | ||||
-rw-r--r-- | steuermann/run.py | 206 | ||||
-rw-r--r-- | steuermann/run_all.py | 368 | ||||
-rw-r--r-- | steuermann/specfile.exy | 68 | ||||
-rw-r--r-- | tests/misc.py | 43 |
14 files changed, 1347 insertions, 0 deletions
diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..bab9fea --- /dev/null +++ b/Makefile @@ -0,0 +1,12 @@ +all: steuermann/specfile.py sr.db + +steuermann/specfile.py: steuermann/specfile.exy + exyapps steuermann/specfile.exy + +sr.db: steuermann/db.sql + rm -f sr.db + sqlite3 sr.db < steuermann/db.sql + +clean: + rm -f sr.db + diff --git a/daily.sr b/daily.sr new file mode 100644 index 0000000..13e856e --- /dev/null +++ b/daily.sr @@ -0,0 +1,74 @@ +TABLE assemble HOST thor + CMD svnsync RUN "svnsync" + AFTER OPT assemble/irafx_update + CMD stsci_python RUN "assemble_stsci_python" + AFTER svnsync + CMD stsci_iraf RUN "assemble_stsci_iraf" + AFTER svnsync + CMD astrolib RUN "assemble_astrolib" + AFTER svnsync + CMD axe RUN "assemble_axe" + AFTER svnsync + CMD hstcal RUN "assemble_hstcal" + AFTER svnsync + +# all machines +TABLE build HOST rhe4-32 rhe4-64 rhe5-64 leopard snow_leopard + + CMD python2.7 RUN "build_stsci_python dev2.7" + AFTER *:assemble/stsci_python + + CMD hstcal RUN "build_hstcal" + AFTER *:assemble/hstcal + +TABLE build HOST rhe-64 + + CMD python2.6 RUN "build_stsci_python dev2.6" + AFTER *:assemble/stsci_python + + CMD python2.5 RUN "build_stsci_python dev2.5" + AFTER *:assemble/stsci_python + + +# 32 bit +TABLE build32 HOST rhe4-32 leopard + + CMD stsci_iraf RUN "build_stsci_iraf" + AFTER *:assemble/stsci_iraf + + CMD axe RUN "build_axe" + AFTER *:assemble/axe + AFTER build32/stsci_iraf + +# copy 32 bit exe to 64 bit machine +TABLE build64 HOST rhe4-64 + CMD iraf32_hack RUN "iraf32hack rhe4-32" + +TABLE build64 HOST rhe5-64 + CMD iraf32_hack RUN "iraf32hack rhe4-32" + +TABLE build64 HOST snow_leopard + CMD iraf32_hack RUN "iraf32hack leopard" + + +# +TABLE build_finished HOST rhe4-32 leopard + CMD finished2.7 RUN "echo done" + AFTER build/python2.7 + AFTER build/hstcal + AFTER build32/stsci_iraf + AFTER build32/axe + +TABLE build_finished HOST rhe4-64 rhe5-64 snow_leopard + CMD finished2.7 RUN "echo done" + AFTER build/python2.7 + AFTER build/hstcal + AFTER build64/iraf32_hack + + +# +TABLE web_updates HOST thor + CMD pyraf RUN "exit 1" + AFTER *:assemble/svnsync + CMD pyfits RUN "exit 1" + AFTER *:assemble/svnsync @@ -0,0 +1,4 @@ + +http://stackoverflow.com/questions/2408560/python-nonblocking-console-input + + diff --git a/scripts/smc b/scripts/smc new file mode 100644 index 0000000..3937855 --- /dev/null +++ b/scripts/smc @@ -0,0 +1,4 @@ +#! python +import steuermann.run_all as run_all + +run_all.main() diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..42d5f3a --- /dev/null +++ b/setup.py @@ -0,0 +1,27 @@ +import distutils.core + +f=open('steuermann/__init__.py','r') +for x in f : + if x.startswith('__version__') : + version = x.split("'")[1] + break +f.close() + +print version + +args = { + 'version' : "2.0dev", + 'description' : "Steuermann Continuous Integration Control System", + 'author' : "Mark Sienkiewicz", + 'scripts' : ['scripts/smc'], + 'package_dir' : { 'steuermann' : 'steuermann', }, + 'url' : 'https://svn.stsci.edu/trac/ssb/etal/wiki/Steuermann', + 'license': 'BSD', + 'packages': [ 'steuermann' ], + 'package_data': { 'steuermann' : [ '*.sql', '*.ini', ], } +} + +d = distutils.core.setup( + **args +) + diff --git a/steuermann/__init__.py b/steuermann/__init__.py new file mode 100644 index 0000000..0e77cbd --- /dev/null +++ b/steuermann/__init__.py @@ -0,0 +1 @@ +__version__ = '2.0dev' diff --git a/steuermann/db.sql b/steuermann/db.sql new file mode 100644 index 0000000..664f5df --- /dev/null +++ b/steuermann/db.sql @@ -0,0 +1,48 @@ +-- The table 'status' contains a record for each command in a run. +-- Before we start running anything, we insert a record for every +-- command in the test run. The initial status is 'S'. + +CREATE TABLE status ( + run VARCHAR(100), + -- name of this run + + host VARCHAR, + tablename VARCHAR, + cmd VARCHAR, + -- name of the command (node) + + depth INTEGER, + -- depth in the tree of this node (used to create report tables) + + status VARCHAR(5), + -- N = not started + -- S = started, not finished + -- P = prereq not satisfied, so not attempted + -- 0-255 = exit code + + start_time VARCHAR(30), + end_time VARCHAR(30), + -- times initially blank + -- YYYY-MM-DD HH:MM:SS.SSS + -- (space for resolution to nanosecond is a bit extreme) + + -- a log file name is implicit in the run/host/tablename/cmd tuple + + notes VARCHAR(1000), + -- notes reported by the script + + FOREIGN KEY(run) REFERENCES runs(run) + -- run name has to be in the run table + ); + + +create unique index idx_status_1 on status ( run, host, tablename, cmd ); + + +-- table lists all run names in the system +CREATE TABLE runs ( + run VARCHAR(100) + ); + +CREATE UNIQUE INDEX idx_runs_run ON runs(run); + diff --git a/steuermann/hosts.ini b/steuermann/hosts.ini new file mode 100644 index 0000000..61231df --- /dev/null +++ b/steuermann/hosts.ini @@ -0,0 +1,43 @@ +; semicolon starts a comment in INI files +; If your data value contains a ";" make sure there is no space before it + +; There is a section for each [machinename]. In that section, you +; should define +; run=some command to run scripts +; It is a list of argv values to run the command. In each item of +; the list, you can use %(n)s to replace with various values: +; %(script)s - +; %(host)s - +; %(table)s - +; %(cmd)s - +; %(foo)s - if you include a foo= line +; +[arzach] +workdir=/arzach/data1/sienkiew + +[thor] +workdir=/thor/data2/sienkiew + +[herbert] +workdir=/herbert/data1/sienkiew + +[jwcalibdev] +workdir=/data1/sienkiew/ur_work + +; There is a section [ALL] that is used with every machine name +[ALL] +run=[ 'ssh', '-q', '%(host)s', 'cd %(workdir)s; %(script)s' ] + +[host_a] +run=[ 'sleep', '3' ] + +[host_b] +run=[ 'sleep', '3' ] + +[host_c] +run=[ 'sleep', '3' ] + +[host_d] +run=[ 'sleep', '3' ] + + diff --git a/steuermann/nodes.py b/steuermann/nodes.py new file mode 100644 index 0000000..5e25d8f --- /dev/null +++ b/steuermann/nodes.py @@ -0,0 +1,303 @@ +''' +Stuff related to the tree structure of the command set +''' + +import fnmatch +import exyapps.runtime + + +##### +##### + +class command_tree(object): + + # a dict that maps currently known node names to node objects + node_index = None + + # This is a list if (before, after) showing names of node orders; + # the individual nodes do not store this information while the + # parsing is happening. + node_order = None + + # call init() once before parsing, then call parse for each file, then call finish() + def __init__( self ) : + self.node_index = { } + self.node_order = [ ] + + def finish( self ) : + # This links up the nodes according to the (before, after) information + # in node_order. before and after are fully qualified node names. + # We do this at the end so that + # - we can have forward references + # - we can use wild cards in our AFTER clauses + + # before is the predecessor, after comes AFTER the predecessor + for before, after, required, pos in self.node_order : + if ( '*' in before ) or ( '?' in before ) or ( '[' in before ): + host, table, cmd = crack_name(before) + for x in self.node_index : + hl, tl, cl = crack_name(x) + if ( fnmatch.fnmatchcase(hl,host ) and + fnmatch.fnmatchcase(tl,table) and + fnmatch.fnmatchcase(cl,cmd ) ) : + self.connect(x, after, required, pos) + else : + self.connect(before, after, required, pos) + + # work out the depths of each node + compute_depths( self.node_index ) + + + # make the actual connection between nodes + def connect( self, after, before, required, line ) : + if not after in self.node_index : + if required : + print "error: %s happens after non-existant %s - line %s"%(before,after,line) + return + + if not before in self.node_index : + print "error: before node %s does not exist %s"%(before,line) + return + + if not after in self.node_index : + print "error: after node %s does not exist %s"%(after,line) + return + + # tell the after node that the other one comes first + + self.node_index[after].predecessors.append(self.node_index[before]) + + # tell the after node about the before node and note that + # the before node is not done yet. + self.node_index[after].released[before] = False + + + # create a set of nodes for a particular command - called from within the parser + def add_command_list( self, table, hostlist, command_list ) : + for host in hostlist : + this_table = '%s:%s' % ( host, table ) + for command, script, after, pos in command_list : + # this happens once for each CMD clause + # command is the name of this command + # script is the script to run + # after is a list of AFTER clauses + # pos is where in this file this command was defined + + command = normalize_name( host, table, command ) + + if command in self.node_index : + # bug: should be error + print "# warning: %s already used on line %s"%(command,self.node_index[command].input_line) + + # create the node + self.node_index[command]=node(command, script, nice_pos( current_file_name, pos) ) + + for before_name, required, pos in after : + # this happens once for each AFTER clause + # before is the name of a predecessor that this one comes after + # required is a boolean, whether the predecessor must exist + # pos is where in the file the AFTER clause begins + + before_name = normalize_name( host, table, before_name ) + + # list of ( before, after, required, pos ) + self.node_order.append( (before_name, command, required, pos) ) + + + +##### + +# crack open host:table/cmd +def crack_name(name) : + t = name.split(':') + host = t[0] + t = t[1].split('/') + table = t[0] + cmd = t[1] + return (host, table, cmd) + +##### + +def join_name( host, table, cmd ) : + return '%s:%s/%s'%(host,table,cmd) + +##### + +def normalize_name( host, table, name ) : + + if not '/' in name : + name = table + '/' + name + + if name.startswith('.:') : + name = name[2:] + + if not ':' in name : + name = host + ':' + name + # print "## return %s"%name + return name + +##### + + +_wildcard_cache = ( None, None ) + +def wildcard_name( wild, name ) : + global _wildcard_cache + + if wild != _wildcard_cache : + host, table, cmd = crack_name(wild) + _wildcard_cache = ( name, ( host, table, cmd ) ) + + host, table, cmd = _wildcard_cache[1] + + hl, tl, cl = crack_name(name) + + return ( fnmatch.fnmatchcase(hl,host ) and + fnmatch.fnmatchcase(tl,table) and + fnmatch.fnmatchcase(cl,cmd ) ) + + +##### + + +##### + + +# a node object for each command instance that will be run. The name is +# "host:table/command". The shell command to execute on the target host +# is "script". +# +# predecessors[] is a list of everything that this node must definitely come after. +# +# released is a dict indexed by each of the "before" nodes; the value is +# true if the before node is finished running. +# +class node(object) : + def __init__(self, name, script, input_line) : + # the fully qualified name of the node + self.name = name + + # the command script that this node runs + self.script = script + + # what line of the input file specified this node; this is + # a string of the form "foo.bar 123" + self.input_line = input_line + + # crack open host:table/cmd + self.host, self.table, self.cmd = crack_name(name) + + # this command runs after every node in this list + self.predecessors = [ ] + + # this is a dict of everything that comes before us + # The key is the name of the other node. + # The value is true/false for whether the other node + # is finished running. + self.released = { } + + # These flags are 1 or 0 so we can sum() them + self.finished = 0 + self.running = 0 + +##### + +# debug - make a string representation of all the nodes + +def show_nodes( node_index ) : + import cStringIO as StringIO + s = StringIO.StringIO() + for x in sorted( [ x for x in node_index ] ) : + x = node_index[x] + + # show the node name and the command it runs + s.write( "NODE %s %s %s\n"%(x.name, x.script, x.input_line) ) + + # show each node that this one comes After, and show the flag + # whether the previous node has run yet. + for y in x.predecessors : + s.write( " AFTER %s\n"%y.name ) + + return s.getvalue() + +##### + +def nice_pos( filename, yapps_pos ) : + # filename is the file name to use + # yapps_pos is a tuple of (file, line, col) except that file + # is a file-like object with no traceability to an actual file1 + return "%s:%s col %s"%(filename, yapps_pos[1], yapps_pos[2]) + +##### + + +##### +# +# calculating depths of a node tree + +def c_d_fn(x,depth) : + + # if it is already deeper than where we are now, we can (must) + # prune the tree walk here. + if x.depth >= depth : + return + + if depth > 100 : + # bug: proxy for somebody wrote a loop + print "error: depth > 100" + return + + # assign the depth + x.depth = depth + + # make all the successors one deeper + depth = depth + 1 + for y in x.successors : + c_d_fn(y,depth) + +def compute_depths(nodes) : + + # init everything + for x in nodes : + x = nodes[x] + x.depth = 0 + x.successors = [ ] + + # walk the nodes in an arbitrary order; make a list of successors + for x in nodes : + x = nodes[x] + for y in x.predecessors : + if not x in y.successors : + y.successors.append(x) + + # recursively walk down the tree assigning depth values, starting + # with depth=1 for the highest level + for x in nodes : + c_d_fn(nodes[x],1) + + +##### + +import specfile + +current_file_name = None + +def read_file_list( file_list ) : + global current_file_name + di = command_tree( ) + for x in file_list : + current_file_name = x + print x + sc = specfile.specfileScanner( open(x,'r').read() ) + p = specfile.specfile( scanner=sc, data=di ) + result = exyapps.runtime.wrap_error_reporter( p, 'start' ) + di.finish() + return di + +if __name__=='__main__': + import sys + n = read_file_list( sys.argv[1:] ) + print show_nodes(n.node_index) + + + diff --git a/steuermann/report.py b/steuermann/report.py new file mode 100644 index 0000000..1bf4f6a --- /dev/null +++ b/steuermann/report.py @@ -0,0 +1,146 @@ +''' +prototype of report + +''' +import subprocess +import time +import sys + +try : + import CStringIO as StringIO +except ImportError : + import StringIO as StringIO + +##### + +def c_d_fn(x,depth) : + if x.depth >= depth : + return + x.depth = depth + depth = depth + 1 + for y in x.children : + c_d_fn(y,depth) + + +def compute_depths(nodes) : + for x in nodes : + x = nodes[x] + x.depth = 0 + x.in_recurse = 0 + x.parents = [ ] + x.children = [ ] + for x in nodes : + x = nodes[x] + for y in x.precursors : + if not x in y.children : + y.children.append(x) + for x in nodes : + c_d_fn(nodes[x],1) + +##### + +def compute_table(nodes) : + # find the depths + compute_depths(nodes) + + # sort the nodes by depth + l = [ x.split(':') for x in nodes ] + l = [ [ x[0] ] + x[1].split('/') for x in l ] + l = [ [ x[1], x[2], x[0] ] for x in l ] + l = sorted(l) + + # table_content is a list of nodes in each table + table_content = { } + + # table_hosts is a list of hosts in each table + table_hosts = { } + + # table_depth is how deep the deepest row of each table is + table_depth = { } + + for x in nodes : + host, table = x.split(':') + table, cmd = table.split('/') + + table_content[table] = table_content.get(table,[]) + [ x ] + + table_hosts [table] = table_hosts .get(table,[]) + [ host ] + + if table_depth.get(table,0) < nodes[x].depth : + table_depth[table] = nodes[x].depth + + for x in table_hosts : + table_hosts[x] = list(set(table_hosts[x])) + + return table_content, table_hosts, table_depth + +##### + +# html of one table + +def html_table(nodes, table, host_list ) : + s=StringIO.StringIO() + + # this is all the nodes in this table + pat = ':%s/' % table + l = [ x for x in nodes if pat in x ] + + # d[x] is the max depth of command x + d = { } + for x in l : + depth = nodes[x].depth + x = x.split('/')[1] + if d.get(x,0) < depth : + d[x] = depth + + # this is the order of the rows of the table + cmd_order = sorted( [ (d[x], x) for x in d ] ) + + # this is the table + s.write( "<table border=1>" ) + + # heading + s.write( "<tr> <td> </td> " ) + for host in host_list : + s.write( "<th>%s</th>" % host ) + s.write( "</tr>\n" ) + + # loop over the commands in the order they appear + for depth, cmd in cmd_order : + s.write( "<tr>\n\t<td>%s/%s</td>\n"%(table,cmd) ) + for host in host_list : + name = host + ':' + table + '/' + cmd + if name in nodes : + s.write( "\t<td class=%%(class!%s)s> %%(text!%s)s </td>\n"%(name,name) ) + else : + s.write( "\t<td class=nothing> . </td>\n" ) + s.write( "</tr>\n" ) + s.write( "</table>" ) + return s.getvalue() + +class struct(object): + pass + +##### + +def main() : + import sqlite3 + db = sqlite3.connect('sr.db') + + c = db.cursor() + c.execute('select run from runs') + for (run,) in c : + + all = [ ] + c1 = db.cursor() + c1.execute('select host, tablename, cmd, depth, status, start_time, end_time, notes from status where run = ? ',(run,) ) + for x in c1 : + n = struct() + n.host, n.tablename, n.cmd, n.depth, n.status, n.start_time, n.ent_time, n.notes = x + all.append(n) + + print "RUN",run + for x in all : + print x.host, x.tablename, x.cmd, x.status + +main() diff --git a/steuermann/run.py b/steuermann/run.py new file mode 100644 index 0000000..7657248 --- /dev/null +++ b/steuermann/run.py @@ -0,0 +1,206 @@ +''' +run processes asynchronously on various machines, with a callback +on process exit. +''' + +import subprocess +import time +import datetime +import os + +import ConfigParser + +debug=0 + +##### + +class struct : + pass + + +##### + +class runner(object): + + # dict of all current running processes, indexed by node name + all_procs = { } + + # index of nodes + node_index = { } + + # info about hosts we can run on + host_info = { } + + # dir where we write our logs + logdir = 'logs' + + ##### + # + + def __init__( self, nodes ) : + self.node_index = nodes + self.load_host_info() + + ##### + # start a process + + def run( self, node, run_name ): + if debug : + print "run",node.name + node.running = 1 + if debug : + print "....%s:%s/%s\n"%(node.host, node.table, node.cmd) + + try : + args = self.host_info[node.host] + except : + print "ERROR: do not know how to run on %s"%node.host + raise + + run = args['run'] + + args = args.copy() + args.update( + script=node.script, + host=node.host, + table=node.table, + cmd=node.cmd + ) + + if debug : + print "ARGS" + for x in sorted([x for x in args]) : + print '%s=%s'%(x,args[x]) + + run = [ x % args for x in run ] + + if debug : + print "RUN",run + + # make sure the log directory is there + logdir= self.logdir + "/%s"%run_name + try : + os.makedirs(logdir) + except OSError: + pass + + # create a name for the log file, but do not use / in the name + logfile_name = "%s/%s.log"%( logdir, node.name.replace('/','+') ) + + # open the log file, write initial notes + logfile=open(logfile_name,"w") + logfile.write('%s %s\n'%(datetime.datetime.now(),run)) + logfile.flush() + + # start running the process + p = subprocess.Popen(args=run, + stdout=logfile, + stderr=subprocess.STDOUT, + shell=False, close_fds=True) + + # remember the popen object for the process; remember the open log file + n = struct() + n.proc = p + n.logfile = logfile + n.logfile_name = logfile_name + + # remember the process is running + self.all_procs[node.name] = n + + ##### + # callback when a node finishes + + def finish( self, node_name, status): + # note the termination of the process at the end of the log file + logfile = self.all_procs[node_name].logfile + logfile.seek(0,2) # end of file + logfile.write('\n%s exit=%s\n'%(datetime.datetime.now(),status)) + logfile.close() + + # note the completion of the command + node = self.node_index[node_name] + if debug : + print "finish",node.name + node.running = 0 + node.finished = 1 + node.exit_status = status + + ##### + + # poll for exited child processes - this whole thing could could + # be event driven, but I don't care to work out the details right + # now. + + def poll( self ) : + + # look at all active processes + for name in self.all_procs : + + # see if name has finished + p = self.all_procs[name].proc + n = p.poll() + if n is not None : + + # marke the node finished + self.finish(name,n) + + # + status = p.returncode + + # remove it from the list of pending processes + del self.all_procs[name] + + # Return the identity of the exited process. + # There may be more, but we will come back and poll again. + return ( name, status ) + + return None + + ##### + + def display_procs( self ) : + # display currently active child processes + print "procs:", + for x in sorted(self.all_procs) : + print " ",x + print "" + + ##### + + + def _host_get_names( self, cfg, section, d ) : + # pick all the variables out of this section + for name, value in cfg.items(section) : + if name == 'run' : + # run is a list + d[name] = eval(value) + else : + # everything else is plain text + d[name] = value + + def load_host_info( self, filename=None ) : + self.host_info = { } + + # read the config file + if filename is None : + filename = os.path.dirname(__file__) + '/hosts.ini' + cfg = ConfigParser.RawConfigParser() + cfg.read(filename) + + # this dict holds the set of values that are defined as + # applying to all hosts + all_dict = { } + self._host_get_names(cfg, 'ALL', all_dict) + + # for all the sections (except ALL) get the names from that section + for x in cfg.sections() : + if x == 'ALL' : + continue + + d = all_dict.copy() + self._host_get_names(cfg, x, d) + self.host_info[x] = d + + del cfg + + ##### diff --git a/steuermann/run_all.py b/steuermann/run_all.py new file mode 100644 index 0000000..1fe2aa2 --- /dev/null +++ b/steuermann/run_all.py @@ -0,0 +1,368 @@ +''' +run everything in a set of command files + +''' + +import time +import sys +import sqlite3 + +import run + +import datetime + +import nodes + + +##### + +def main() : + global xnodes + # read all the input files + + di_nodes = nodes.read_file_list( sys.argv[2:] ) + + xnodes = di_nodes.node_index + run_name = 'arf' + str(datetime.datetime.now()) + db = sqlite3.connect('sr.db') + register_database(db, run_name, xnodes) + + n = sys.argv[1] + if n == '-a' : + run_all(xnodes, run_name, db) + elif n == '-i' : + run_interactive( xnodes, run_name, db ) + else : + print "%s ?"%n + + +def do_flag( xnodes, name, recursive, fn, verbose ) : + if verbose : + verbose = verbose + 1 + if not ':' in name : + name = '*:' + name + if ( '*' in name ) or ( '?' in name ) or ( '[' in name ) : + if verbose : + print ' '*verbose, "wild",name + for x in xnodes : + if nodes.wildcard_name( name, x ) : + if verbose : + print ' '*verbose, "match",x + do_flag( xnodes, x, recursive, fn, verbose ) + elif name in xnodes : + if verbose : + print ' '*verbose, "found",name + fn(xnodes[name]) + if recursive : + for y in xnodes[name].predecessors : + do_flag( xnodes, y.name, recursive, fn, verbose ) + else : + if verbose : + print ' '*verbose, "not in list", name + raise Exception() + +def set_want( node ) : + node.wanted = 1 + node.skip = 0 + +def set_skip( node ) : + node.wanted = 0 + node.skip = 1 + + +def cmd_flagging( l, xnodes, func ) : + if l[1] == '-r' : + recursive = 1 + l = l[2:] + else : + recursive = 0 + l = l[1:] + + for x in l : + do_flag( xnodes, x, recursive, func, 1 ) + +helpstr = """ +report show report +want [-r] node declare that we want that node +skip [-r] node skip this node +list -a +list node +start +wait +wr report want/skip values +wd report depth +""" + +def run_interactive( xnodes, run_name, db) : + + runner = run.runner( xnodes ) + + for x in xnodes : + xnodes[x].finished = 0 + xnodes[x].running = 0 + xnodes[x].wanted = 0 + xnodes[x].skip = 0 + + keep_running = 0 + + while 1 : + print "action?" + l = sys.stdin.readline() + if l == '' : + break + l = l.strip() + l = l.split() + if len(l) > 0 : + n = l[0] + else : + n = '' + + if n == '?' : + print helpstr + + elif n == 'report' : + report( db, run_name ) + + elif n == 'wr' : + report( db, run_name, info_callback_want ) + + elif n == 'wd' : + report( db, run_name, info_callback_depth ) + + elif n == 'want' : + cmd_flagging( l, xnodes, set_want ) + + elif n == 'skip' : + cmd_flagging( l, xnodes, set_skip ) + + elif n == 'list' : + print_all = '-a' in l + l = sorted ( [ x for x in xnodes ] ) + print "w f s name" + for x in l : + print xnodes[x].wanted, xnodes[x].finished, xnodes[x].skip, x + if print_all : + print " AFTER", ' '.join([ a.name for a in xnodes[x].predecessors ]) + + elif n == 'start' : + keep_running = 1 + + elif n == 'wait' : + while 1 : + ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) + if not keep_running : + break + if not no_sleep : + time.sleep(1) + if keypress() : + print "wait interrupted (processes continue)" + break + print "wait finished" + + if keep_running : + print "run step" + ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) + + if len(runner.all_procs) == 0 : + # give it a chance to start another + ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) + + if not keep_running : + print 'all done' + + else : + if len(runner.all_procs) == 0 : + print "no processes running - some prereq not satisfiable" + + + +def register_database(db, run, xnodes ) : + c = db.cursor() + c.execute('INSERT INTO runs ( run ) VALUES ( ? )', ( run, ) ) + + c = db.cursor() + for x in xnodes : + host, tablename, cmd = nodes.crack_name(x) + depth = xnodes[x].depth + c.execute("INSERT INTO status ( run, host, tablename, cmd, depth, status ) VALUES " + "( ?, ?, ?, ?, ?, 'N' )", ( run, host, tablename, cmd, depth ) ) + + db.commit() + +def run_all(xnodes, run_name, db) : + + runner = run.runner( xnodes ) + + for x in xnodes : + xnodes[x].finished = 0 + xnodes[x].running = 0 + xnodes[x].wanted = 1 + + while 1 : + ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) + if not keep_running : + break + if not no_sleep : + time.sleep(1) + +def run_step( runner, xnodes, run_name, db ) : + + # flag to keep running + keep_running = 0 + + # flag to suppress brief sleep at end of loop + no_sleep = 0 + + # Loop, polling for work to do, or for finishing processes + print "loop" + for x_name in xnodes : + x=xnodes[x_name] + + # skip nodes that we do not need to consider running because + + # - we explicitly ask to skip it; also mark it finished + # so that things that come after can run + if x.skip : + x.finished = 1 + continue + + # - it is not wanted + if not x.wanted : + continue + + # - it is already finished + if x.finished : + continue + + # - we are already running it + if x.running : + keep_running=1 + continue + + # ok, if we are here, we found a node that we want to run + + # if there is a node we need to run, we need to come back through the loop + # (bug: are we sure there is not a deadlock caused by mutual dependencies? if that happens, it can never run.) + keep_running = 1 + + # count how many of the predecessors are finished + released = sum( [ xnodes[r].finished for r in x.released ]) + + # if the number of predecessors finished is the number + # of predecessors, we can run this one + if released == len(x.released) : + host, table, cmd = nodes.crack_name(x_name) + print "RUN", x_name + + db.execute("UPDATE status SET start_time = ?, status = 'S' WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )", + ( str(datetime.datetime.now()), run_name, host, table, cmd ) ) + db.commit() + runner.run(x, run_name) + + # if anything has exited, we process it and update the status in the database + while 1 : + who_exited = runner.poll() + if not who_exited : + break + + # yes, something exited - no sleep, and keep running + no_sleep = 1 + keep_running = 1 + + # note who and log it + x_host, x_table, x_cmd = nodes.crack_name(who_exited[0]) + + xnodes[who_exited[0]].wanted = 0 + + db.execute("UPDATE status SET end_time = ?, status = ? WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )", + ( str(datetime.datetime.now()), who_exited[1], run_name, x_host, x_table, x_cmd ) ) + + + runner.display_procs() + + return ( keep_running, no_sleep ) + +##### + +ms_windows = 0 + +if ms_windows : + import msvcrt +else : + import select + +def keypress() : + if ms_windows : + return msvcrt.kbhit() + else : + return select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []) + +##### + +def info_callback_status( tablename, cmd, host, status ) : + return status + +def info_callback_want( tablename, cmd, host, status ) : + n = xnodes['%s:%s/%s'%(host,tablename,cmd)] + s = '' + if n.skip : + s = s + 'S' + if n.wanted : + s = s + 'W' + if s == '' : + s = '-' + return s + +def info_callback_depth( tablename, cmd, host, status ) : + n = xnodes['%s:%s/%s'%(host,tablename,cmd)] + return n.depth + +def report( db, run_name, info_callback = info_callback_status ) : + import pandokia.text_table as tt + + c = db.cursor() + c.execute("select max(depth) as d, tablename from status where run = ? group by tablename order by d asc",(run_name,)) + table_list = [ x for x in c ] + # table_list contains ( depth, tablename ) + + print """ + -- N = not started + -- S = started, not finished + -- P = prereq not satisfied, so not attempted + -- 0-255 = exit code +""" + + for depth, tablename in table_list : + print "------" + print tablename + t = tt.text_table() + + row = 0 + t.define_column('-') # the command name in column 0 + + c.execute("select distinct host from status where tablename = ? and run = ? order by host asc",(tablename, run_name)) + for host, in c : + t.define_column(host) + t.set_value(row, host, host) + + c.execute("""select cmd, host, depth, status, start_time, end_time, notes from status + where tablename = ? and run = ? order by depth, cmd asc + """, ( tablename, run_name ) ) + + prev_cmd = None + for x in c : + cmd, host, depth, status, start_time, end_time, notes = x + if cmd != prev_cmd : + row = row + 1 + t.set_value(row, 0, cmd) + prev_cmd = cmd + t.set_value( row, host, info_callback( tablename, cmd, host, status ) ) + + t.pad() + + print t.get_trac_wiki() + +##### + +if __name__ == '__main__' : + main() diff --git a/steuermann/specfile.exy b/steuermann/specfile.exy new file mode 100644 index 0000000..8dd42a3 --- /dev/null +++ b/steuermann/specfile.exy @@ -0,0 +1,68 @@ + +# Parse a Steuermann Spec file +# +# This is an exyapps grammer in specfile.exy + +%% +parser specfile: + ignore: "[ \r\t\n]+" + ignore: "#.*\n" + + token END: "$" + token TABLE: "TABLE" + token HOST : "HOST" + token CMD: "CMD" + token OPT: "OPT" + token AFTER: "AFTER" + token name: "[a-zA-Z0-9_.-]+" + token STAR: "\*" + token cmdname: "[a-zA-Z0-9_.-]+" + token tablename: "[a-zA-Z0-9_.-]+" + token wildname: "[*?a-zA-Z0-9_.-]+" + token RUN: "RUN" + token string: '"[^"]*"' + token SLASH: "/" + token COLON: ":" + + rule start: table_list END {{ return table_list }} + + rule table_list: table_section [ table_list ] + + rule table_section: + TABLE tablename {{ table_name = tablename }} + HOST + {{ hostlist = [ ] }} + ( name {{ hostlist.append(name) }} )+ + command_list + # command_list is a list of (command, pos) where command is the text from the file and pos is the location in the file + {{ self.data.add_command_list( table_name, hostlist, command_list ) }} + + rule command_list: + # one or more commands, appended together into a list + {{ cmlist = [ ] }} + command {{ cmlist.append( command ) }} + [ command_list {{ cmlist += command_list }} ] + {{ return cmlist }} + + | {{ return [ ] }} + + rule command: + # a single command, including any number of AFTER clauses + CMD {{ cmd_pos = self._scanner.get_pos() }} cmdname {{ cmd_name=cmdname; script=cmdname; x_after_clause = [ ] }} + [ RUN string {{ script = string[1:-1] }} ] + ( {{ after_pos = self._scanner.get_pos() }} AFTER optword after_spec {{ x_after_clause.append( (after_spec,optword, after_pos) ) }} )* + {{ return ( cmd_name, script, x_after_clause, cmd_pos ) }} + + # in the AFTER clause, you can say OPT to mean the node is optional (not an error if it does not exist) + rule optword: + OPT {{ return 0 }} + | {{ return 1 }} + + rule after_spec: + wildname {{ rval = wildname }} + [ COLON wildname {{ rval = rval + ':' + wildname }} ] + [ SLASH wildname {{ rval = rval + '/' + wildname }} ] + {{ return rval }} + +%% + diff --git a/tests/misc.py b/tests/misc.py new file mode 100644 index 0000000..c1d12aa --- /dev/null +++ b/tests/misc.py @@ -0,0 +1,43 @@ + +import nodes + +yes_list = ( + ( 'a:b/c', 'a:b/c' ), + ( 'a:b/*', 'a:b/c' ), + ( 'a:*/c', 'a:b/c' ), + ( 'a:*/*', 'a:b/c' ), + ( '*:b/c', 'a:b/c' ), + ( '*:b/*', 'a:b/c' ), + ( '*:*/c', 'a:b/c' ), + ( '*:*/*', 'a:b/c' ), + + ( '[a-z]:*/*', 'a:b/c' ), + + ( 'a?:*/*', 'ax:b/c' ), + ( 'a?:*/*', 'ay:b/c' ), + + ( '*:*/*', 'xasdadsf:agasdgg/asdgasdg' ), + ) + +no_list = ( + ( '[A-Z]:*/*', 'a:b/c' ), + ( 'a?:*/*', 'a:b/c' ), + ( 'a:b/*', 'a:x/y' ), + ( '*:c/*', 'a:b/y' ), + ( '*:b/y', 'a:b/c' ), + ( 'a:b/y', 'a:b/c' ), + ) + +def test_wildcard_name() : + def yes( a, b ) : + assert nodes.wildcard_name( a, b ) + + def no( a, b ) : + assert not nodes.wildcard_name( a, b ) + + for x in yes_list : + yield yes, x[0], x[1] + + for x in no_list : + yield no, x[0], x[1] + |