#!/usr/bin/env python import os import sys import tarfile import time import string import StringIO import tempfile import numpy import argparse import psutil from matplotlib import pyplot class System_Info: def dump(self, path): print('### Cpu ###') cpu_times = psutil.cpu_times(percpu=True) cpu_percent = psutil.cpu_percent(percpu=True) print('## Times ##') for k in cpu_times: print(k) print('## Percent ##') for k in cpu_percent: print(k) print('### Memory ###') mem = psutil.virtual_memory() print(mem) class Statistics: def __init__(self): self.results = {} def store(self, key, value): try: self.results[key] = value except: return False return True def append(self, key, value): try: self.results[key].append(value) except: return False return True def retrieve(self, key): return self.results[key] def get_numpy(self, key): data = self.results[key] xlist = [] ylist = [] for x, y in data: xlist.append(x) ylist.append(y) x = numpy.array(xlist) y = numpy.array(ylist) return x, y def write_1d(self, filename, data): ''' filename: file name data: tuple of values ''' try: output = numpy.column_stack((data)) numpy.savetxt(filename, output, delimiter=' ') except IOError as e: print("%s: %s" % (filename, e.strerror)) return False return True def plot(self): pass class Stopwatch: def __init__(self): self.start_time = 0.0 self.stop_time = 0.0 def start(self): self.start_time = time.time() def stop(self): self.stop_time = time.time() def now(self): return (time.time() - self.start_time) def result(self): return (self.stop_time - self.start_time) def reset(self): self.start_time = 0.0 self.stop_time = 0.0 class Test_Controller: def __init__(self): print("Initializing %s" % (self.__class__)) self.timer = Stopwatch() self.prefix = 'benchy_' self.path_static = os.path.join(os.curdir, 'static') self.path_tests = os.path.join(os.curdir, 'tests') self.path_results = os.path.join(os.curdir, 'results') self.path_results_figures = os.path.join(self.path_results, 'figures') self.path_results_data = os.path.join(self.path_results, 'data') if not os.path.exists(self.path_static): os.mkdir(self.path_static) if not os.path.exists(self.path_tests): os.mkdir(self.path_tests) if not os.path.exists(self.path_results): os.mkdir(self.path_results) if not os.path.exists(self.path_results_figures): os.mkdir(self.path_results_figures) if not os.path.exists(self.path_results_data): os.mkdir(self.path_results_data) class Tar_test(Test_Controller, Statistics): def __init__(self, size=1): Test_Controller.__init__(self) Statistics.__init__(self) Statistics.store(self, 'write_archive', []) Statistics.store(self, 'read_archive', []) self.tarfile = os.path.join(self.path_static, 'test.tar') self.MEGABYTE = 1024000 self.size = size def write_archive(self): #tar = tarfile.open(os.path.join(self.path_results_static, '') pass def read_archive(self): self.timer.start() tar = tarfile.open(self.tarfile, 'r') for _ in tar: continue self.timer.stop() list_time = self.timer.result() self.timer.start() seek_time = self.timer.result() print("Path: %s\nEncoding: %s\nFiles listed in %0.2fs" % (tar.name, tar.encoding, list_time)) def write_out(self): pass def read_in(self): pass class Sequencial_test(Test_Controller, Statistics): def __init__(self, size=1): ''' size in megabytes bufsiz in bytes mode is 'buffered' or 'unbuffered' async is True or False ''' Test_Controller.__init__(self) Statistics.__init__(self) Statistics.store(self, 'write_buffered_sync', []) Statistics.store(self, 'write_buffered_async', []) Statistics.store(self, 'write_unbuffered_sync', []) Statistics.store(self, 'write_unbuffered_async', []) Statistics.store(self, 'read_buffered_sync', []) Statistics.store(self, 'read_buffered_async', []) Statistics.store(self, 'read_unbuffered_sync', []) Statistics.store(self, 'read_unbuffered_async', []) self.MEGABYTE = 1024000 self.size = size self.random_source = os.urandom(self.MEGABYTE * 10) def __del__(self): if os.path.exists(self.filename): os.remove(os.path.join(self.filename)) def _cleanup(self): self.__del__() def stress_test_wrapper(self, iter_max=1): self.stress_buffered_sync(iter_max) self.stress_buffered_async(iter_max) self.stress_unbuffered_sync(iter_max) self.stress_unbuffered_async(iter_max) def stress_buffered_sync(self, iter_max=1): print("Synchronous buffered I/O") for i in range(iter_max): print("Test: %d of %d" % (i + 1, iter_max)) self.write_buffered_sync() self.read_buffered_sync() self._cleanup() def stress_buffered_async(self, iter_max=1): print("Asynchronous buffered I/O") for i in range(iter_max): print("Test: %d of %d" % (i + 1, iter_max)) self.write_buffered_async() self.read_buffered_async() self._cleanup() def stress_unbuffered_sync(self, iter_max=1): print("Synchronous unbuffered I/O") for i in range(iter_max): print("Test: %d of %d" % (i + 1, iter_max)) self.write_unbuffered_sync() self.read_unbuffered_sync() self._cleanup() def stress_unbuffered_async(self, iter_max=1): print("Asynchronous unbuffered I/O") for i in range(iter_max): print("Test: %d of %d" % (i + 1, iter_max)) self.write_unbuffered_async() self.read_unbuffered_async() self._cleanup() def write_buffered_sync(self): Statistics.append(self, 'write_buffered_sync', self.write_out(size=self.size)) def write_buffered_async(self): Statistics.append(self, 'write_buffered_async', self.write_out(size=self.size, async=True)) def write_buffered_stepping(self): pass def write_unbuffered_sync(self): Statistics.append(self, 'write_unbuffered_sync', self.write_out(size=self.size, mode='unbuffered')) def write_unbuffered_async(self): Statistics.append(self, 'write_unbuffered_async', self.write_out(size=self.size, mode='unbuffered', async=True)) def write_unbuffered_stepping(self): pass def read_buffered_sync(self): Statistics.append(self, 'read_buffered_sync', self.read_in(size=self.size)) def read_buffered_async(self): Statistics.append(self, 'read_buffered_async', self.read_in(size=self.size, async=True)) def read_buffered_stepping(self): pass def read_unbuffered_sync(self): Statistics.append(self, 'read_unbuffered_sync', self.read_in(size=self.size, mode='unbuffered')) def read_unbuffered_async(self): Statistics.append(self, 'read_unbuffered_async', self.read_in(size=self.size, mode='unbuffered', async=True)) def read_unbuffered_stepping(self): pass def write_out(self, size, bufsiz=1024000, mode='buffered', async=False): size = size * self.MEGABYTE temp = tempfile.NamedTemporaryFile(dir=self.path_tests, prefix=self.prefix, delete=False) self.filename = temp.name outfile = file(self.filename, 'wb') total_bytes = 0 self.timer.start() while total_bytes <= size: try: source = StringIO.StringIO(self.random_source) if mode == 'buffered': buf = source.read(bufsiz) else: buf = source.read() outfile.write(buf) if not async: outfile.flush() os.fsync(outfile) total_bytes += len(buf) source.close() except: print('Failed') self.timer.stop() return False outfile.close() self.timer.stop() rate = (total_bytes / self.timer.result() / 1024 / 1024) elapsed = self.timer.result() return rate, elapsed def read_in(self, size, bufsiz=1024000, mode='buffered', async=False): size = size * self.MEGABYTE infile = file(self.filename, 'rb') total_bytes = 0 self.timer.start() while total_bytes <= size: try: if mode == 'buffered': buf = infile.read(bufsiz) else: buf = infile.read() if not async: infile.flush() os.fsync(infile) total_bytes += len(buf) except: print('Failed') self.timer.stop() return False infile.close() self.timer.stop() rate = (total_bytes / self.timer.result() / 1024 / 1024) elapsed = self.timer.result() return rate, elapsed if __name__ == "__main__": parser = argparse.ArgumentParser(description='Benchy the Benchmarker') parser.add_argument('-p', '--passes', default=10, action='store', type=int, required=False) parser.add_argument('-s', '--size-step', default=[1], action='store', type=int, required=False) args = vars(parser.parse_args()) sysinfo = System_Info() sysinfo.dump(os.curdir) for size in args['size_step']: print("### %dMB ####" % (size)) sequence_test = Sequencial_test(size) sequence_test.stress_buffered_sync(args['passes']) sequence_test.stress_buffered_async(args['passes']) sequence_test.stress_unbuffered_sync(args['passes']) sequence_test.stress_unbuffered_async(args['passes']) for k in sequence_test.results: print('%s' % (k)) rate, elapsed = sequence_test.get_numpy(k) rate_mean = numpy.repeat(numpy.mean(rate), rate.size, 0) elapsed_mean = numpy.repeat(numpy.mean(elapsed), elapsed.size, 0) print("\tRate mean: %0.4f" % (numpy.mean(rate_mean))) print("\tElapsed mean: %0.4f" % (numpy.mean(elapsed_mean))) pyplot.figure(1) pyplot.subplot(211) pyplot.title(k) pyplot.grid(True) pyplot.xlabel('Iteration') pyplot.ylabel('Rate (MB/s)') pyplot.plot(rate) pyplot.plot(rate_mean) pyplot.subplot(212) pyplot.grid(True) pyplot.xlabel('Iteration') pyplot.ylabel('Time to Write') pyplot.plot(elapsed) pyplot.plot(elapsed_mean) pyplot.savefig(\ os.path.join(sequence_test.path_results_figures, \ string.join([k, str(sequence_test.size)], sep='_'))) sequence_test.write_1d(\ os.path.join(sequence_test.path_results_data, \ string.join([k, str(sequence_test.size)], sep='_')), \ (rate, elapsed)) sequence_test.write_1d(\ os.path.join(sequence_test.path_results_data, \ string.join([k, 'mean', str(sequence_test.size)], sep='_')), \ (rate_mean, elapsed_mean)) pyplot.close() ''' tar_test = Tar_test() tar_test.read_archive() '''