diff options
Diffstat (limited to 'htc_utils')
-rw-r--r-- | htc_utils/CLI/__init__.py | 0 | ||||
-rw-r--r-- | htc_utils/CLI/batch.py | 81 | ||||
-rwxr-xr-x | htc_utils/CLI/split.py | 129 | ||||
-rwxr-xr-x | htc_utils/CLI/wrap.py | 53 | ||||
-rw-r--r-- | htc_utils/__init__.py | 1 | ||||
-rw-r--r-- | htc_utils/bindings/__init__.py | 59 | ||||
-rw-r--r-- | htc_utils/bindings/job.py | 146 | ||||
-rw-r--r-- | htc_utils/bindings/submit.py | 80 |
8 files changed, 549 insertions, 0 deletions
diff --git a/htc_utils/CLI/__init__.py b/htc_utils/CLI/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/htc_utils/CLI/__init__.py diff --git a/htc_utils/CLI/batch.py b/htc_utils/CLI/batch.py new file mode 100644 index 0000000..efd9099 --- /dev/null +++ b/htc_utils/CLI/batch.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python +# +# This file is part of htc_utils. +# +# htc_utils is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# htc_utils is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with htc_utils. If not, see <http://www.gnu.org/licenses/>. + + +import argparse +import htc_utils +import time + + +# We create a reference Job object to make a copy of the initial class state +nulljob = htc_utils.Job('NULL') + +parser = argparse.ArgumentParser() +parser.add_argument('-n', '--jobname', action='store', default=str(int(time.time())), help='Output filename without extension (Default: {Unix Epoch}.job') +parser.add_argument('--stdout', action='store_true', help='Output job to stdout instead of a file') +parser.add_argument('-q', '--queue', action='store', default='', help='Queue main job N times') +parser.add_argument('-b', '--subqueue', action='store', default='', help='Queue sub-job N tims') +parser.add_argument('-s', '--subarg', action='append') + +# Sort by key, then by value +sorted_args = sorted(nulljob.config.items(), key=lambda record: record[0], reverse=False) +sorted_args = sorted(sorted_args, key=lambda record: record[1], reverse=True) + +# nulljob is no longer required +del nulljob + +# Proccess argument list and assign default values to the argument parser +for attr, value in sorted_args: + if attr == 'executable': + continue + elif attr == 'arguments': + continue + + default_value = '(Default: {0})'.format(value) if value else '' + parser.add_argument('--' + attr, default=value, help=default_value) + +parser.add_argument('executable') +parser.add_argument('args', nargs='*') +args = parser.parse_args() + +# Let's begin writing our HTCondor job +job = htc_utils.Job(args.jobname) + +# Populate job attributes based on argparse input +for key, value in vars(args).items(): + # Ignore foreign keys + if key not in job.config: + continue + job.attr(key, value) + +# Manually assign important variables +job.attr('executable', args.executable) +job.attr('arguments', args.args) +job.attr('queue', args.queue) + +# Process any sub-attributes we may have +if args.subarg is not None: + for subarg in args.subarg: + job.subattr('arguments', subarg) + job.subattr('queue', args.subqueue) + +if args.stdout: + print(job.generate()) +else: + job.commit() + print('Wrote: {0}'.format(job.filename)) + diff --git a/htc_utils/CLI/split.py b/htc_utils/CLI/split.py new file mode 100755 index 0000000..6ecbd64 --- /dev/null +++ b/htc_utils/CLI/split.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python +# +# This file is part of htc_utils. +# +# htc_utils is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# htc_utils is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with htc_utils. If not, see <http://www.gnu.org/licenses/>. + + +import argparse +import os +import fnmatch +import shutil +import tempfile + +VERBOSE = False + +def path_search(p, ext, strip_components): + filenames = [] + p = os.path.abspath(p) + index = 1 + for root, _, files in os.walk(p): + for f in files: + path = os.path.join(root, f) + if not os.path.isfile(path): + continue + if fnmatch.fnmatch(path, ext): + if strip_components: + head, tail = os.path.split(path) + strip_max = len(head.split(os.path.sep)) + if strip_components <= strip_max: + components = head.split(os.path.sep)[strip_components:] + path = os.path.join(os.path.sep.join(components), tail) + else: + # strip_components = strip_max + print("Warning: Cannot strip {0} components from {1} (max {2})".format(strip_components, path, strip_max)) + + if VERBOSE: + print("[{0}]:{1}".format(index, path)) + filenames.append(path) + index += 1 + return filenames + +def generate_manifest(manifest): + dest = tempfile.NamedTemporaryFile("w+", delete=False) + for line in manifest: + dest.write(line + os.linesep) + return dest.name + +def assign_chunks(manifest, chunks, output_dir, as_jobs=False): + label = 0 + count = 0 + written = 0 + # max_chunks = 0 + dest = None + output_dir = os.path.abspath(output_dir) + + if VERBOSE: + print("Output directory: {0}".format(output_dir)) + + if not os.path.exists(output_dir): + os.makedirs(output_dir) + else: + shutil.rmtree(output_dir) + os.makedirs(output_dir) + + if as_jobs: + manifest_count = len(file(manifest, 'r').readlines()) + max_chunks = manifest_count / chunks + chunks = max_chunks + + + for line in open(manifest, 'r'): + if count % chunks == 0: + if dest: + dest.close() + if VERBOSE: + print("[{0}]: {1}, {2} entries".format(label, os.path.basename(dest.name), written)) + written = 0 + dest = file(os.path.join(output_dir, "stdin." + str(label)), 'w') + label += 1 + dest.write(line) + count += 1 + written += 1 + os.unlink(manifest) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--strip-components", default=0, type=int, help="Top-level directories to strip") + parser.add_argument("--output-dir", default="input") + parser.add_argument("--extension", default="*.*", type=str, help="Extension of files") + parser.add_argument("--chunks", default=1, type=int, help="Number of entries per file") + parser.add_argument("--as-jobs", action="store_true", help="Number of expected jobs") + parser.add_argument("--verbose", action="store_true") + parser.add_argument("search_path", action="store", type=str, help="Directory to search under") + args = parser.parse_args() + + global VERBOSE + VERBOSE = args.verbose + chunks = args.chunks + if chunks < 1: + chunks = 1 + index_at = 0 + + files = path_search(args.search_path, args.extension, args.strip_components) + if files: + index_at = 1 + + if VERBOSE: + print("{0} files".format(len(files))) + + if not files: + exit(0) + + manifest = generate_manifest(files) + assign_chunks(manifest, chunks, args.output_dir, as_jobs=args.as_jobs) + +if __name__ == "__main__": + main() diff --git a/htc_utils/CLI/wrap.py b/htc_utils/CLI/wrap.py new file mode 100755 index 0000000..ba8cbfc --- /dev/null +++ b/htc_utils/CLI/wrap.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python +# +# This file is part of htc_utils. +# +# htc_utils is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# htc_utils is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with htc_utils. If not, see <http://www.gnu.org/licenses/>. + +import argparse +import os +import sys +import subprocess + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--verbose", action="store_true", help="Be more verbose") + parser.add_argument("job", action="store", help="Path to executable") + parser.add_argument("indata", nargs='?', type=argparse.FileType('r'), default=sys.stdin, help="Standard input") + args = parser.parse_args() + + verbose = args.verbose + + job = [os.path.abspath(args.job)] + indata = args.indata + output = [] + if isinstance(indata, file): + for line in indata.readlines(): + line = line.strip() + line = os.path.abspath(line) + output.append(line) + else: + output = indata + + compiled = job + output + if verbose: + print("Job: {0}".format(compiled)) + + process = subprocess.Popen(compiled, stdout=sys.stdout, stderr=sys.stderr) + process.communicate() + process.wait() + + +if __name__ == "__main__": + main() diff --git a/htc_utils/__init__.py b/htc_utils/__init__.py new file mode 100644 index 0000000..55734c8 --- /dev/null +++ b/htc_utils/__init__.py @@ -0,0 +1 @@ +from .bindings import *
\ No newline at end of file diff --git a/htc_utils/bindings/__init__.py b/htc_utils/bindings/__init__.py new file mode 100644 index 0000000..9b50384 --- /dev/null +++ b/htc_utils/bindings/__init__.py @@ -0,0 +1,59 @@ +import os +import subprocess +from collections import OrderedDict +from distutils.spawn import find_executable + +__all__ = ['htcondor_path', 'recast', 'Job', 'Submit'] + +def htcondor_path(path=None): + needles = ['condor_master', 'condor_submit'] + paths = [] + if path is not None: + os.environ['PATH'] = ':'.join([os.path.join(path, 'bin'), os.environ['PATH']]) + os.environ['PATH'] = ':'.join([os.path.join(path, 'sbin'), os.environ['PATH']]) + + for needle in needles: + paths.append(os.path.dirname(find_executable(needle))) + + if not paths: + raise OSError('Unable to find a valid HTCondor installation.') + + return ':'.join(paths) + + +def recast(value): + ''' Convert value to string + ''' + if type(value) is bool and value == True: + value = 'true' + return value + + if type(value) is bool and value == False: + value = 'false' + return value + + if isinstance(value, list) or \ + isinstance(value, tuple): + values = ' '.join([ str(x) for x in value ]) + return values + + if isinstance(value, dict): + temp = [] + for key, val in value.items(): + val = str(val) + temp.append('='.join([str(key), str(val)])) + # Ad-hoc string quoting + return ';'.join(str(x) for x in temp) + + + try: + value = str(value) + except: + raise TypeError('Unable to cast "{}" to str'.format(type(value))) + + return value + + + +from .job import * +from .submit import *
\ No newline at end of file diff --git a/htc_utils/bindings/job.py b/htc_utils/bindings/job.py new file mode 100644 index 0000000..79771fc --- /dev/null +++ b/htc_utils/bindings/job.py @@ -0,0 +1,146 @@ +# This file is part of htc_utils. +# +# htc_utils is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# htc_utils is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with htc_utils. If not, see <http://www.gnu.org/licenses/>. + + +import os +from . import recast +from collections import OrderedDict + + +class Job(object): + def __init__(self, filename, ext='job', * args, **kwargs): + ''' Example usage + import random + from condor_batch import Job, Submit + + # Create job object (filename will be "my_neato_condor.job") + job = Job('my_neato_condor') + + # Populate initial job attributes + job.attr('getenv', True) + job.attr('executable', '/bin/sleep') + job.attr('arguments', 1) + job.logging('logs', create=True) + job.attr('queue') + + # Populate additional sub-attributes (i.e. re-execute job with + # a different set of arguments. + for fileno in range(5): + x = random.Random().randint(2, 5) + job.subattr('arguments', x) + job.subattr('queue') + + # Write job file to disk + job.commit() + + # Pass our job to the Submit class + submit = Submit(job) + + # Send our job to the cluster + submit.execute() + ''' + + + self.filename = os.path.abspath('.'.join([filename, ext])) + self.shebang = '#!/usr/bin/env condor_submit' + + self.config_ext = [] + self.config = OrderedDict() + + self.default_config = [ + self.attr('universe', 'vanilla'), + self.attr('getenv', True), + self.attr('environment', ''), + self.attr('executable', ''), + self.attr('arguments', ''), + self.attr('notification', 'Never'), + self.attr('notify_user', ''), + self.attr('priority', 0), + self.attr('rank', ''), + self.attr('input', ''), + self.attr('request_cpus', ''), + self.attr('request_disk', ''), + self.attr('request_memory', ''), + self.attr('requirements', ''), + self.attr('transfer_executable', True), + self.attr('transfer_input_files', []), + self.attr('transfer_output_files', ''), + self.attr('transfer_output_remaps', ''), + self.attr('should_transfer_files', 'IF_NEEDED'), + self.attr('when_to_transfer_output', 'ON_EXIT'), + self.attr('hold', False), + self.attr('initialdir', ''), + self.attr('remote_initialdir', ''), + self.attr('run_as_owner', ''), + self.attr('nice_user', ''), + self.attr('stack_size', ''), + ] + + for key, value in kwargs.items(): + self.attr(key, value) + + + def generate(self): + '''Return a + ''' + if not self.config: + print("Warning: No attributes defined!") + output = '' + + # Parse initial job attributes + for key, value in self.config.items(): + if value and key != 'queue': + output += ' = '.join([key, value]) + output += os.linesep + if key == 'queue': + output += ' '.join([key, value]) + output += os.linesep + + # Parse job sub-attributes + for sub in self.config_ext: + output += os.linesep + for key, value in sub.items(): + if value and key != 'queue': + output += ' = '.join([key, value]) + # output += os.linesep + if key == 'queue': + output += ' '.join([key, value]) + output += os.linesep + + return output + + def logging(self, path, ext='log', create=False): + if create and not os.path.exists(path): + os.mkdir(os.path.abspath(path)) + + self.attr('log', os.path.normpath(os.path.join(path, '.'.join(['condor_$(Cluster)', ext])))) + self.attr('output', os.path.normpath(os.path.join(path, '.'.join(['stdout_$(Cluster)_$(Process)', ext])))) + self.attr('error', os.path.normpath(os.path.join(path, '.'.join(['stderr_$(Cluster)_$(Process)', ext])))) + + def attr(self, key, *args): + value = ' '.join([recast(x) for x in args]) + self.config[key] = recast(value) + + def subattr(self, key, *args): + value = ' '.join([recast(x) for x in args]) + self.config_ext.append(OrderedDict([(key, recast(value))])) + + def commit(self, ext='job'): + with open(self.filename, 'w+') as submit_file: + submit_file.write(self.generate()) + + def reset(self): + self.config = OrderedDict() + self.config_ext = OrderedDict() diff --git a/htc_utils/bindings/submit.py b/htc_utils/bindings/submit.py new file mode 100644 index 0000000..fba59a5 --- /dev/null +++ b/htc_utils/bindings/submit.py @@ -0,0 +1,80 @@ +# This file is part of htc_utils. +# +# htc_utils is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# htc_utils is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with htc_utils. If not, see <http://www.gnu.org/licenses/>. + + +import os +import subprocess +from . import htcondor_path + + +class Submit(object): + def __init__(self, job, **kwargs): + self.job = job + self.cluster = None + self.environ = os.environ + self._prefix = ':'.join([os.environ['PATH'], htcondor_path()]) + self.environ['PATH'] = self._prefix + self.cli_args = [] + + print("Received job: {}".format(self.job.filename)) + + def sendto_name(self, schedd_name): + self.cli_args.append(['-name', schedd_name]) + + def sendto_remote(self, schedd_name): + self.cli_args.append(['-remote', schedd_name]) + + def sendto_addr(self, ip, port): + addr = ':'.join([ip, str(port)]) + addr = '<' + addr + '>' + self.cli_args.append(['-addr', addr]) + + def sendto_pool(self, pool_name): + self.cli_args.append(['-pool', pool_name]) + + def toggle_verbose(self): + self.cli_args.append(['-verbose']) + + def toggle_unused_variables(self): + self.cli_args.append(['-unused']) + + def execute(self): + condor_submit_args = ' '.join([str(arg) for record in self.cli_args for arg in record]) + condor_submit = ' '.join(['condor_submit', condor_submit_args, self.job.filename]) + proc = subprocess.Popen(condor_submit.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.environ) + stdout, stderr = proc.communicate() + print(stdout) + print(stderr) + proc.wait() + + if not stderr: + if 'cluster' in stdout: + for line in stdout.split(os.linesep): + if 'cluster' in line: + self.cluster = line.split()[-1].strip('.') + + def monitor(self): + if self.cluster is None: + print('No cluster data for job.') + return False + print('Monitoring cluster {}'.format(self.cluster)) + os.environ['PATH'] = self._prefix + ''' + proc = subprocess.Popen('condor_q -analyze:summary'.split(), env=self.environ) + proc.communicate() + proc.wait() + ''' + print('NOT IMPLEMENTED') + return True |