aboutsummaryrefslogtreecommitdiff
path: root/htc_utils
diff options
context:
space:
mode:
Diffstat (limited to 'htc_utils')
-rw-r--r--htc_utils/CLI/__init__.py0
-rw-r--r--htc_utils/CLI/batch.py81
-rwxr-xr-xhtc_utils/CLI/split.py129
-rwxr-xr-xhtc_utils/CLI/wrap.py53
-rw-r--r--htc_utils/__init__.py1
-rw-r--r--htc_utils/bindings/__init__.py59
-rw-r--r--htc_utils/bindings/job.py146
-rw-r--r--htc_utils/bindings/submit.py80
8 files changed, 549 insertions, 0 deletions
diff --git a/htc_utils/CLI/__init__.py b/htc_utils/CLI/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/htc_utils/CLI/__init__.py
diff --git a/htc_utils/CLI/batch.py b/htc_utils/CLI/batch.py
new file mode 100644
index 0000000..efd9099
--- /dev/null
+++ b/htc_utils/CLI/batch.py
@@ -0,0 +1,81 @@
+#!/usr/bin/env python
+#
+# This file is part of htc_utils.
+#
+# htc_utils is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# htc_utils is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with htc_utils. If not, see <http://www.gnu.org/licenses/>.
+
+
+import argparse
+import htc_utils
+import time
+
+
+# We create a reference Job object to make a copy of the initial class state
+nulljob = htc_utils.Job('NULL')
+
+parser = argparse.ArgumentParser()
+parser.add_argument('-n', '--jobname', action='store', default=str(int(time.time())), help='Output filename without extension (Default: {Unix Epoch}.job')
+parser.add_argument('--stdout', action='store_true', help='Output job to stdout instead of a file')
+parser.add_argument('-q', '--queue', action='store', default='', help='Queue main job N times')
+parser.add_argument('-b', '--subqueue', action='store', default='', help='Queue sub-job N tims')
+parser.add_argument('-s', '--subarg', action='append')
+
+# Sort by key, then by value
+sorted_args = sorted(nulljob.config.items(), key=lambda record: record[0], reverse=False)
+sorted_args = sorted(sorted_args, key=lambda record: record[1], reverse=True)
+
+# nulljob is no longer required
+del nulljob
+
+# Proccess argument list and assign default values to the argument parser
+for attr, value in sorted_args:
+ if attr == 'executable':
+ continue
+ elif attr == 'arguments':
+ continue
+
+ default_value = '(Default: {0})'.format(value) if value else ''
+ parser.add_argument('--' + attr, default=value, help=default_value)
+
+parser.add_argument('executable')
+parser.add_argument('args', nargs='*')
+args = parser.parse_args()
+
+# Let's begin writing our HTCondor job
+job = htc_utils.Job(args.jobname)
+
+# Populate job attributes based on argparse input
+for key, value in vars(args).items():
+ # Ignore foreign keys
+ if key not in job.config:
+ continue
+ job.attr(key, value)
+
+# Manually assign important variables
+job.attr('executable', args.executable)
+job.attr('arguments', args.args)
+job.attr('queue', args.queue)
+
+# Process any sub-attributes we may have
+if args.subarg is not None:
+ for subarg in args.subarg:
+ job.subattr('arguments', subarg)
+ job.subattr('queue', args.subqueue)
+
+if args.stdout:
+ print(job.generate())
+else:
+ job.commit()
+ print('Wrote: {0}'.format(job.filename))
+
diff --git a/htc_utils/CLI/split.py b/htc_utils/CLI/split.py
new file mode 100755
index 0000000..6ecbd64
--- /dev/null
+++ b/htc_utils/CLI/split.py
@@ -0,0 +1,129 @@
+#!/usr/bin/env python
+#
+# This file is part of htc_utils.
+#
+# htc_utils is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# htc_utils is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with htc_utils. If not, see <http://www.gnu.org/licenses/>.
+
+
+import argparse
+import os
+import fnmatch
+import shutil
+import tempfile
+
+VERBOSE = False
+
+def path_search(p, ext, strip_components):
+ filenames = []
+ p = os.path.abspath(p)
+ index = 1
+ for root, _, files in os.walk(p):
+ for f in files:
+ path = os.path.join(root, f)
+ if not os.path.isfile(path):
+ continue
+ if fnmatch.fnmatch(path, ext):
+ if strip_components:
+ head, tail = os.path.split(path)
+ strip_max = len(head.split(os.path.sep))
+ if strip_components <= strip_max:
+ components = head.split(os.path.sep)[strip_components:]
+ path = os.path.join(os.path.sep.join(components), tail)
+ else:
+ # strip_components = strip_max
+ print("Warning: Cannot strip {0} components from {1} (max {2})".format(strip_components, path, strip_max))
+
+ if VERBOSE:
+ print("[{0}]:{1}".format(index, path))
+ filenames.append(path)
+ index += 1
+ return filenames
+
+def generate_manifest(manifest):
+ dest = tempfile.NamedTemporaryFile("w+", delete=False)
+ for line in manifest:
+ dest.write(line + os.linesep)
+ return dest.name
+
+def assign_chunks(manifest, chunks, output_dir, as_jobs=False):
+ label = 0
+ count = 0
+ written = 0
+ # max_chunks = 0
+ dest = None
+ output_dir = os.path.abspath(output_dir)
+
+ if VERBOSE:
+ print("Output directory: {0}".format(output_dir))
+
+ if not os.path.exists(output_dir):
+ os.makedirs(output_dir)
+ else:
+ shutil.rmtree(output_dir)
+ os.makedirs(output_dir)
+
+ if as_jobs:
+ manifest_count = len(file(manifest, 'r').readlines())
+ max_chunks = manifest_count / chunks
+ chunks = max_chunks
+
+
+ for line in open(manifest, 'r'):
+ if count % chunks == 0:
+ if dest:
+ dest.close()
+ if VERBOSE:
+ print("[{0}]: {1}, {2} entries".format(label, os.path.basename(dest.name), written))
+ written = 0
+ dest = file(os.path.join(output_dir, "stdin." + str(label)), 'w')
+ label += 1
+ dest.write(line)
+ count += 1
+ written += 1
+ os.unlink(manifest)
+
+
+def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--strip-components", default=0, type=int, help="Top-level directories to strip")
+ parser.add_argument("--output-dir", default="input")
+ parser.add_argument("--extension", default="*.*", type=str, help="Extension of files")
+ parser.add_argument("--chunks", default=1, type=int, help="Number of entries per file")
+ parser.add_argument("--as-jobs", action="store_true", help="Number of expected jobs")
+ parser.add_argument("--verbose", action="store_true")
+ parser.add_argument("search_path", action="store", type=str, help="Directory to search under")
+ args = parser.parse_args()
+
+ global VERBOSE
+ VERBOSE = args.verbose
+ chunks = args.chunks
+ if chunks < 1:
+ chunks = 1
+ index_at = 0
+
+ files = path_search(args.search_path, args.extension, args.strip_components)
+ if files:
+ index_at = 1
+
+ if VERBOSE:
+ print("{0} files".format(len(files)))
+
+ if not files:
+ exit(0)
+
+ manifest = generate_manifest(files)
+ assign_chunks(manifest, chunks, args.output_dir, as_jobs=args.as_jobs)
+
+if __name__ == "__main__":
+ main()
diff --git a/htc_utils/CLI/wrap.py b/htc_utils/CLI/wrap.py
new file mode 100755
index 0000000..ba8cbfc
--- /dev/null
+++ b/htc_utils/CLI/wrap.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+#
+# This file is part of htc_utils.
+#
+# htc_utils is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# htc_utils is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with htc_utils. If not, see <http://www.gnu.org/licenses/>.
+
+import argparse
+import os
+import sys
+import subprocess
+
+def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--verbose", action="store_true", help="Be more verbose")
+ parser.add_argument("job", action="store", help="Path to executable")
+ parser.add_argument("indata", nargs='?', type=argparse.FileType('r'), default=sys.stdin, help="Standard input")
+ args = parser.parse_args()
+
+ verbose = args.verbose
+
+ job = [os.path.abspath(args.job)]
+ indata = args.indata
+ output = []
+ if isinstance(indata, file):
+ for line in indata.readlines():
+ line = line.strip()
+ line = os.path.abspath(line)
+ output.append(line)
+ else:
+ output = indata
+
+ compiled = job + output
+ if verbose:
+ print("Job: {0}".format(compiled))
+
+ process = subprocess.Popen(compiled, stdout=sys.stdout, stderr=sys.stderr)
+ process.communicate()
+ process.wait()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/htc_utils/__init__.py b/htc_utils/__init__.py
new file mode 100644
index 0000000..55734c8
--- /dev/null
+++ b/htc_utils/__init__.py
@@ -0,0 +1 @@
+from .bindings import * \ No newline at end of file
diff --git a/htc_utils/bindings/__init__.py b/htc_utils/bindings/__init__.py
new file mode 100644
index 0000000..9b50384
--- /dev/null
+++ b/htc_utils/bindings/__init__.py
@@ -0,0 +1,59 @@
+import os
+import subprocess
+from collections import OrderedDict
+from distutils.spawn import find_executable
+
+__all__ = ['htcondor_path', 'recast', 'Job', 'Submit']
+
+def htcondor_path(path=None):
+ needles = ['condor_master', 'condor_submit']
+ paths = []
+ if path is not None:
+ os.environ['PATH'] = ':'.join([os.path.join(path, 'bin'), os.environ['PATH']])
+ os.environ['PATH'] = ':'.join([os.path.join(path, 'sbin'), os.environ['PATH']])
+
+ for needle in needles:
+ paths.append(os.path.dirname(find_executable(needle)))
+
+ if not paths:
+ raise OSError('Unable to find a valid HTCondor installation.')
+
+ return ':'.join(paths)
+
+
+def recast(value):
+ ''' Convert value to string
+ '''
+ if type(value) is bool and value == True:
+ value = 'true'
+ return value
+
+ if type(value) is bool and value == False:
+ value = 'false'
+ return value
+
+ if isinstance(value, list) or \
+ isinstance(value, tuple):
+ values = ' '.join([ str(x) for x in value ])
+ return values
+
+ if isinstance(value, dict):
+ temp = []
+ for key, val in value.items():
+ val = str(val)
+ temp.append('='.join([str(key), str(val)]))
+ # Ad-hoc string quoting
+ return ';'.join(str(x) for x in temp)
+
+
+ try:
+ value = str(value)
+ except:
+ raise TypeError('Unable to cast "{}" to str'.format(type(value)))
+
+ return value
+
+
+
+from .job import *
+from .submit import * \ No newline at end of file
diff --git a/htc_utils/bindings/job.py b/htc_utils/bindings/job.py
new file mode 100644
index 0000000..79771fc
--- /dev/null
+++ b/htc_utils/bindings/job.py
@@ -0,0 +1,146 @@
+# This file is part of htc_utils.
+#
+# htc_utils is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# htc_utils is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with htc_utils. If not, see <http://www.gnu.org/licenses/>.
+
+
+import os
+from . import recast
+from collections import OrderedDict
+
+
+class Job(object):
+ def __init__(self, filename, ext='job', * args, **kwargs):
+ ''' Example usage
+ import random
+ from condor_batch import Job, Submit
+
+ # Create job object (filename will be "my_neato_condor.job")
+ job = Job('my_neato_condor')
+
+ # Populate initial job attributes
+ job.attr('getenv', True)
+ job.attr('executable', '/bin/sleep')
+ job.attr('arguments', 1)
+ job.logging('logs', create=True)
+ job.attr('queue')
+
+ # Populate additional sub-attributes (i.e. re-execute job with
+ # a different set of arguments.
+ for fileno in range(5):
+ x = random.Random().randint(2, 5)
+ job.subattr('arguments', x)
+ job.subattr('queue')
+
+ # Write job file to disk
+ job.commit()
+
+ # Pass our job to the Submit class
+ submit = Submit(job)
+
+ # Send our job to the cluster
+ submit.execute()
+ '''
+
+
+ self.filename = os.path.abspath('.'.join([filename, ext]))
+ self.shebang = '#!/usr/bin/env condor_submit'
+
+ self.config_ext = []
+ self.config = OrderedDict()
+
+ self.default_config = [
+ self.attr('universe', 'vanilla'),
+ self.attr('getenv', True),
+ self.attr('environment', ''),
+ self.attr('executable', ''),
+ self.attr('arguments', ''),
+ self.attr('notification', 'Never'),
+ self.attr('notify_user', ''),
+ self.attr('priority', 0),
+ self.attr('rank', ''),
+ self.attr('input', ''),
+ self.attr('request_cpus', ''),
+ self.attr('request_disk', ''),
+ self.attr('request_memory', ''),
+ self.attr('requirements', ''),
+ self.attr('transfer_executable', True),
+ self.attr('transfer_input_files', []),
+ self.attr('transfer_output_files', ''),
+ self.attr('transfer_output_remaps', ''),
+ self.attr('should_transfer_files', 'IF_NEEDED'),
+ self.attr('when_to_transfer_output', 'ON_EXIT'),
+ self.attr('hold', False),
+ self.attr('initialdir', ''),
+ self.attr('remote_initialdir', ''),
+ self.attr('run_as_owner', ''),
+ self.attr('nice_user', ''),
+ self.attr('stack_size', ''),
+ ]
+
+ for key, value in kwargs.items():
+ self.attr(key, value)
+
+
+ def generate(self):
+ '''Return a
+ '''
+ if not self.config:
+ print("Warning: No attributes defined!")
+ output = ''
+
+ # Parse initial job attributes
+ for key, value in self.config.items():
+ if value and key != 'queue':
+ output += ' = '.join([key, value])
+ output += os.linesep
+ if key == 'queue':
+ output += ' '.join([key, value])
+ output += os.linesep
+
+ # Parse job sub-attributes
+ for sub in self.config_ext:
+ output += os.linesep
+ for key, value in sub.items():
+ if value and key != 'queue':
+ output += ' = '.join([key, value])
+ # output += os.linesep
+ if key == 'queue':
+ output += ' '.join([key, value])
+ output += os.linesep
+
+ return output
+
+ def logging(self, path, ext='log', create=False):
+ if create and not os.path.exists(path):
+ os.mkdir(os.path.abspath(path))
+
+ self.attr('log', os.path.normpath(os.path.join(path, '.'.join(['condor_$(Cluster)', ext]))))
+ self.attr('output', os.path.normpath(os.path.join(path, '.'.join(['stdout_$(Cluster)_$(Process)', ext]))))
+ self.attr('error', os.path.normpath(os.path.join(path, '.'.join(['stderr_$(Cluster)_$(Process)', ext]))))
+
+ def attr(self, key, *args):
+ value = ' '.join([recast(x) for x in args])
+ self.config[key] = recast(value)
+
+ def subattr(self, key, *args):
+ value = ' '.join([recast(x) for x in args])
+ self.config_ext.append(OrderedDict([(key, recast(value))]))
+
+ def commit(self, ext='job'):
+ with open(self.filename, 'w+') as submit_file:
+ submit_file.write(self.generate())
+
+ def reset(self):
+ self.config = OrderedDict()
+ self.config_ext = OrderedDict()
diff --git a/htc_utils/bindings/submit.py b/htc_utils/bindings/submit.py
new file mode 100644
index 0000000..fba59a5
--- /dev/null
+++ b/htc_utils/bindings/submit.py
@@ -0,0 +1,80 @@
+# This file is part of htc_utils.
+#
+# htc_utils is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# htc_utils is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with htc_utils. If not, see <http://www.gnu.org/licenses/>.
+
+
+import os
+import subprocess
+from . import htcondor_path
+
+
+class Submit(object):
+ def __init__(self, job, **kwargs):
+ self.job = job
+ self.cluster = None
+ self.environ = os.environ
+ self._prefix = ':'.join([os.environ['PATH'], htcondor_path()])
+ self.environ['PATH'] = self._prefix
+ self.cli_args = []
+
+ print("Received job: {}".format(self.job.filename))
+
+ def sendto_name(self, schedd_name):
+ self.cli_args.append(['-name', schedd_name])
+
+ def sendto_remote(self, schedd_name):
+ self.cli_args.append(['-remote', schedd_name])
+
+ def sendto_addr(self, ip, port):
+ addr = ':'.join([ip, str(port)])
+ addr = '<' + addr + '>'
+ self.cli_args.append(['-addr', addr])
+
+ def sendto_pool(self, pool_name):
+ self.cli_args.append(['-pool', pool_name])
+
+ def toggle_verbose(self):
+ self.cli_args.append(['-verbose'])
+
+ def toggle_unused_variables(self):
+ self.cli_args.append(['-unused'])
+
+ def execute(self):
+ condor_submit_args = ' '.join([str(arg) for record in self.cli_args for arg in record])
+ condor_submit = ' '.join(['condor_submit', condor_submit_args, self.job.filename])
+ proc = subprocess.Popen(condor_submit.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.environ)
+ stdout, stderr = proc.communicate()
+ print(stdout)
+ print(stderr)
+ proc.wait()
+
+ if not stderr:
+ if 'cluster' in stdout:
+ for line in stdout.split(os.linesep):
+ if 'cluster' in line:
+ self.cluster = line.split()[-1].strip('.')
+
+ def monitor(self):
+ if self.cluster is None:
+ print('No cluster data for job.')
+ return False
+ print('Monitoring cluster {}'.format(self.cluster))
+ os.environ['PATH'] = self._prefix
+ '''
+ proc = subprocess.Popen('condor_q -analyze:summary'.split(), env=self.environ)
+ proc.communicate()
+ proc.wait()
+ '''
+ print('NOT IMPLEMENTED')
+ return True