diff options
-rw-r--r-- | steuermann/run.py | 76 | ||||
-rw-r--r-- | steuermann/run_all.py | 106 |
2 files changed, 127 insertions, 55 deletions
diff --git a/steuermann/run.py b/steuermann/run.py index 3b098da..72bba6c 100644 --- a/steuermann/run.py +++ b/steuermann/run.py @@ -73,18 +73,22 @@ class runner(object): self.node_index = nodes self.load_host_info(filename = hosts_ini) self.host_info_cache = { } - self.howmany = { } + self.resources = {} + self.resources_avail = {} + ##### # start a process def run( self, node, run_name, logfile_name, no_run = False ): - '''run a process - return is: - D - host disabled - M - not run max proc limit - R - running -''' + ''' + run a process + + return is: + D - host disabled + M - not run; resource limit + R - running + ''' try : @@ -99,20 +103,34 @@ class runner(object): if ( config_yes_no(args,'disable') ) : return 'D' - hostname = args['hostname'] - if 'maxproc' in args : + # track resources + if node.host not in self.resources.keys(): + self.resources[node.host] = {} + self.resources_avail[node.host] = {} + + for key, val in args.items(): + if key.startswith('res_'): + key = key.lstrip('res_') + self.resources[node.host][key] = int(val) + if key not in self.resources_avail[node.host].keys(): + self.resources_avail[node.host][key] = int(val) + + # check if enough resources on host and return if not + enough = True + for res, amount in node.resources.items(): + if res in self.resources[node.host].keys(): + avail = self.resources_avail[node.host][res] + if amount > avail: + enough = False + break + if not enough: + return 'M' + + # allocate host resources + for res, amount in node.resources.items(): + if res in self.resources[node.host].keys(): + self.resources_avail[node.host][res] -= amount - n = int(self.howmany.get(hostname,0)) - if n >= int(args['maxproc']) : - # print "decline to run %s - %d other already running"%(node.name,n) - return 'M' - - n = n + 1 - self.howmany[hostname] = n - # print "running %s %s %d"%(hostname,node.name, n) - else : - # print "running %s %s no maxproc"%(hostname, node.name) - pass if debug : print "run",node.name @@ -178,10 +196,12 @@ class runner(object): # start running the process if debug : print "RUN",run - p = subprocess.Popen(args=run, + p = subprocess.Popen( + args=run, stdout=logfile, stderr=subprocess.STDOUT, - shell=False, close_fds=True) + shell=False, close_fds=True + ) # remember the popen object for the process; remember the open log file n = struct() @@ -208,12 +228,14 @@ class runner(object): args = self.get_host_info(node.host) - hostname = args['hostname'] - n = self.howmany[hostname] - 1 - self.howmany[hostname] = n + # de-allocate host resources + for res, amount in node.resources.items(): + if res in self.resources[node.host].keys(): + self.resources_avail[node.host][res] += amount if debug : + hostname = args['hostname'] print "finish %s %s %d"%(hostname,node_name,n) # note the termination of the process at the end of the log file @@ -324,10 +346,6 @@ class runner(object): if not 'hostname' in d : d['hostname'] = host - # default maximum processes is 1 - if not 'maxproc' in d : - d['maxproc'] = 1 - self.host_info_cache[host] = d return self.host_info_cache[host] diff --git a/steuermann/run_all.py b/steuermann/run_all.py index 1dcade4..832c884 100644 --- a/steuermann/run_all.py +++ b/steuermann/run_all.py @@ -25,6 +25,12 @@ except ImportError : readline = None +# global dicts for storing common resource info +# populated in main() from hosts INI file +common_resources = {} +common_resources_avail = {} + + username=getpass.getuser() ##### @@ -83,6 +89,9 @@ def main() : else : hosts_ini = os.path.join(os.path.dirname(__file__), 'hosts.ini') + # parse common resources from hosts INI file + get_common_resources(hosts_ini) + db = steuermann.config.open_db() if all : @@ -92,6 +101,38 @@ def main() : # +def get_common_resources(hosts_ini): + + if not os.path.exists(hosts_ini): + print 'ERROR - %s does not exist' + sys.exit(1) + + file = open(hosts_ini) + lines = [ln.strip() for ln in file.readlines()] + file.close() + + resource_lines = [] + in_resources = False + for ln in lines: + if '[common_resources]' in ln: + in_resources = True + continue + if in_resources: + if ln.startswith('['): + in_resources = False + continue + + if not ln.startswith(';') and len(ln) > 0: + resource_lines.append(ln) + + for ln in resource_lines: + if '=' in ln: + key, val = ln.split('=') + common_resources[key] = int(val) + common_resources_avail[key] = int(val) + +# + def find_wild_names( xnodes, name ) : print "find_wild",name l = [ ] @@ -293,7 +334,7 @@ def run_interactive( xnodes, run_name, hosts_ini, db) : db.commit() while 1 : - ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) + ( runner, keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) if not keep_running : break if not no_sleep : @@ -307,11 +348,11 @@ def run_interactive( xnodes, run_name, hosts_ini, db) : if keep_running : print "run step" - ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) + ( runner, keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) if len(runner.all_procs) == 0 : # give it a chance to start another - ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) + ( runner, keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) if not keep_running : print 'all done' @@ -392,7 +433,7 @@ def run_all(xnodes, run_name, hosts_ini, db) : # will count how many times through there was nothing running while 1 : - ( keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) + ( runner, keep_running, no_sleep ) = run_step( runner, xnodes, run_name, db ) if not keep_running : break if not no_sleep : @@ -408,7 +449,7 @@ def run_all(xnodes, run_name, hosts_ini, db) : # def run_step( runner, xnodes, run_name, db ) : - + # flag to keep running keep_running = 0 @@ -421,6 +462,17 @@ def run_step( runner, xnodes, run_name, db ) : # skip nodes that we do not need to consider running because + # - not enough resources available + enough = True + for res, amount in x.resources.items(): + if res in common_resources.keys(): + avail = common_resources_avail[res] + if amount > avail: + enough = False + break + if not enough: + continue + # - it is not wanted if not x.wanted : continue @@ -459,6 +511,11 @@ def run_step( runner, xnodes, run_name, db ) : else : try : + # allocate common resources + for res, amount in x.resources.items(): + if res in common_resources_avail.keys(): + common_resources_avail[res] -= amount + tmp = runner.run(x, run_name, no_run=no_run, logfile_name = make_log_file_name(run_name, host, table, cmd) ) # print "STARTED",x_name except run.run_exception, e : @@ -480,7 +537,7 @@ def run_step( runner, xnodes, run_name, db ) : db.execute("UPDATE sm_status SET start_time = ?, status = 'S' WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )", ( str(datetime.datetime.now()), run_name, host, table, cmd ) ) elif tmp == 'M' : - # hit max proc - not run, but try again later + # hit resource cap - not run, but try again later pass else : print "WARNING: runner.run() returned unknown code %s"%str(tmp) @@ -494,32 +551,30 @@ def run_step( runner, xnodes, run_name, db ) : if not who_exited : break - print "SOMETHING EXITED",who_exited - # yes, something exited - no sleep, and keep running + # something exited; no sleep, keep running + print "SOMETHING EXITED", who_exited no_sleep = 1 keep_running = 1 - # note who and log it + # figure out which node exited x_host, x_table, x_cmd = nodes.crack_name(who_exited[0]) + x_name = '%s:%s/%s' %(x_host, x_table, x_cmd) + x = xnodes[x_name] - logs_exist = 0 - + + # de-allocate common resources + for res, amount in x.resources.items(): + if res in common_resources_avail.keys(): + common_resources_avail[res] += amount + + + # get auxiliary logs args = runner.get_host_info(x_host) workdir = args['workdir'] hostname = args['hostname'] - src = os.path.join(workdir, run_name, who_exited[0]) dst = os.path.join(steuermann.config.host_logs, run_name, who_exited[0]) - ''' - print - print "host " + hostname - print "workdir " + workdir - print src - print dst - print - ''' - try: os.system('mkdir -p %s' %os.path.dirname(dst)) except: @@ -529,22 +584,21 @@ def run_step( runner, xnodes, run_name, db ) : except: print 'scp failed' - + logs_exist = 0 if not os.path.exists(dst): print 'WARNING - %s does not exist' %dst else: - print os.listdir(dst) if len(os.listdir(dst)) > 0: logs_exist = 1 print 'FOUND SOME LOGS' + + # update node record in database db.execute("UPDATE sm_status SET end_time = ?, status = ?, logs = ? WHERE ( run = ? AND host = ? AND tablename = ? AND cmd = ? )", ( str(datetime.datetime.now()), who_exited[1], logs_exist, run_name, x_host, x_table, x_cmd ) ) db.commit() - # runner.display_procs() - - return ( keep_running, no_sleep ) + return ( runner, keep_running, no_sleep ) ##### |