#!/usr/bin/env python
"""Sequential disk I/O benchmark.

Writes and reads back temporary files in four modes (buffered/unbuffered,
each with or without an fsync after every chunk), records the throughput and
elapsed time of every run, and plots the results with matplotlib when run as
a script.
"""

import os
import string
import tempfile
import time

import numpy


def _resolve_async(asynchronous, legacy):
    """Fold the legacy ``async=`` keyword into ``asynchronous``.

    ``async`` became a reserved word in Python 3.7 and can no longer be used
    as a parameter name, so keyword callers written for the old signature are
    served through ``**legacy`` instead.

    Raises:
        TypeError: if ``legacy`` holds any keyword other than ``async``.
    """
    if 'async' in legacy:
        asynchronous = legacy.pop('async')
    if legacy:
        raise TypeError('unexpected keyword argument(s): %s'
                        % ', '.join(sorted(legacy)))
    return asynchronous


class Statistics:
    """Keyed container of measurement series.

    ``results`` maps a series name to a list of ``(x, y)`` samples.
    """

    def __init__(self):
        print("Initializing %s" % (self.__class__))
        self.results = {}

    def store(self, key, value):
        """Set ``results[key]`` to ``value``.

        Returns True on success, False when ``key`` cannot be a dict key.
        """
        try:
            self.results[key] = value
        except TypeError:  # unhashable key
            return False
        return True

    def append(self, key, value):
        """Append ``value`` to the series stored under ``key``.

        Returns True on success, False when the series does not exist or is
        not something that can be appended to.
        """
        try:
            self.results[key].append(value)
        except (KeyError, TypeError, AttributeError):
            return False
        return True

    def retrieve(self, key=None):
        """Return the series stored under ``key``.

        When ``key`` is omitted the instance's ``name`` attribute is used;
        this class never sets one, so a subclass has to provide it.

        Raises:
            AttributeError: ``key`` is omitted and there is no ``self.name``.
            KeyError: no series is stored under the key.
        """
        if key is None:
            key = self.name
        return self.results[key]

    def get_numpy(self, key):
        """Return the series under ``key`` as two arrays ``(x, y)``.

        Each stored sample must be a 2-item sequence; the first items form
        ``x`` and the second items form ``y``.
        """
        samples = list(self.results[key])
        x = numpy.array([first for first, _ in samples])
        y = numpy.array([second for _, second in samples])
        return x, y

    def plot(self):
        """Placeholder; not implemented."""
        pass


class Stopwatch:
    """Wall-clock timer based on ``time.time()``."""

    def __init__(self):
        self.start_time = 0.0
        self.stop_time = 0.0

    def start(self):
        """Record the start time."""
        self.start_time = time.time()

    def stop(self):
        """Record the stop time."""
        self.stop_time = time.time()

    def now(self):
        """Return the seconds elapsed since ``start()``."""
        return time.time() - self.start_time

    def result(self):
        """Return the seconds between ``start()`` and ``stop()``."""
        return self.stop_time - self.start_time


class Sequencial_test(Statistics):
    """Sequential write/read benchmark over temporary files."""

    RESULT_KEYS = (
        'write_buffered_sync',
        'write_buffered_async',
        'write_unbuffered_sync',
        'write_unbuffered_async',
        'read_buffered_sync',
        'read_buffered_async',
        'read_unbuffered_sync',
        'read_unbuffered_async',
    )

    def __init__(self, size=1):
        """Prepare result series, the random payload and working directories.

        Args:
            size: amount of data per run, in units of ``self.MEGABYTE`` bytes.
        """
        Statistics.__init__(self)
        for key in self.RESULT_KEYS:
            self.store(key, [])
        # NOTE: this is 1000 KiB, not 2**20; the rate computed in
        # write_out()/read_in() divides by 1024 * 1024.  Left unchanged so
        # file sizes stay what they have always been.
        self.MEGABYTE = 1024000
        self.size = size
        self.timer = Stopwatch()
        self.random_source = os.urandom(self.MEGABYTE * 10)
        self.prefix = 'sequencial_test_'
        self.filename = None
        self.path_tests = os.path.join(os.curdir, 'tests')
        self.path_results = os.path.join(os.curdir, 'results')
        for path in (self.path_tests, self.path_results):
            if not os.path.exists(path):
                os.mkdir(path)

    def __del__(self):
        self._cleanup()

    def _cleanup(self):
        """Delete the current test file, if one exists."""
        filename = getattr(self, 'filename', None)
        if filename is None:
            return
        if os.path.exists(filename):
            os.remove(filename)
        self.filename = None

    def _record(self, key, result):
        """Append ``result`` to series ``key`` unless the run failed.

        write_out()/read_in() signal failure with False; storing that value
        would break get_numpy(), which unpacks every sample as a pair.
        """
        if result is False:
            return False
        return self.append(key, result)

    def _stress(self, banner, progress, write, read, iter_max):
        """Run ``write()`` then ``read()`` ``iter_max`` times, then clean up."""
        print(banner)
        for i in range(iter_max):
            print(progress % (i + 1, iter_max))
            write()
            read()
        self._cleanup()

    def stress_test_wrapper(self, iter_max=1):
        """Run all four stress tests, ``iter_max`` iterations each."""
        self.stress_buffered_sync(iter_max)
        self.stress_buffered_async(iter_max)
        self.stress_unbuffered_sync(iter_max)
        self.stress_unbuffered_async(iter_max)

    def stress_buffered_sync(self, iter_max=1):
        """Repeat the buffered write/read pair with fsync after each chunk."""
        self._stress("Synchronous buffered I/O", "\rTest: %d of %d",
                     self.write_buffered_sync, self.read_buffered_sync,
                     iter_max)

    def stress_buffered_async(self, iter_max=1):
        """Repeat the buffered write/read pair without fsync."""
        self._stress("Asynchronous buffered I/O", "\rTest: %d of %d",
                     self.write_buffered_async, self.read_buffered_async,
                     iter_max)

    def stress_unbuffered_sync(self, iter_max=1):
        """Repeat the unbuffered write/read pair with fsync after each chunk."""
        self._stress("Synchronous unbuffered I/O", "\rTest: %d of %d\r",
                     self.write_unbuffered_sync, self.read_unbuffered_sync,
                     iter_max)

    def stress_unbuffered_async(self, iter_max=1):
        """Repeat the unbuffered write/read pair without fsync."""
        self._stress("Asynchronous unbuffered I/O", "\rTest: %d of %d\r",
                     self.write_unbuffered_async, self.read_unbuffered_async,
                     iter_max)

    def write_buffered_sync(self):
        """Run one buffered, fsync'ed write and record it."""
        self._record('write_buffered_sync', self.write_out(size=self.size))

    def write_buffered_async(self):
        """Run one buffered write without fsync and record it."""
        self._record('write_buffered_async',
                     self.write_out(size=self.size, asynchronous=True))

    def write_buffered_stepping(self):
        """Placeholder; not implemented."""
        pass

    def write_unbuffered_sync(self):
        """Run one unbuffered, fsync'ed write and record it."""
        self._record('write_unbuffered_sync',
                     self.write_out(size=self.size, mode='unbuffered'))

    def write_unbuffered_async(self):
        """Run one unbuffered write without fsync and record it."""
        self._record('write_unbuffered_async',
                     self.write_out(size=self.size, mode='unbuffered',
                                    asynchronous=True))

    def write_unbuffered_stepping(self):
        """Placeholder; not implemented."""
        pass

    def read_buffered_sync(self):
        """Run one buffered read with fsync after each chunk and record it."""
        self._record('read_buffered_sync', self.read_in(size=self.size))

    def read_buffered_async(self):
        """Run one buffered read without fsync and record it."""
        self._record('read_buffered_async',
                     self.read_in(size=self.size, asynchronous=True))

    def read_buffered_stepping(self):
        """Placeholder; not implemented."""
        pass

    def read_unbuffered_sync(self):
        """Run one unbuffered read with fsync and record it."""
        self._record('read_unbuffered_sync',
                     self.read_in(size=self.size, mode='unbuffered'))

    def read_unbuffered_async(self):
        """Run one unbuffered read without fsync and record it."""
        self._record('read_unbuffered_async',
                     self.read_in(size=self.size, mode='unbuffered',
                                  asynchronous=True))

    def read_unbuffered_stepping(self):
        """Placeholder; not implemented."""
        pass

    def _measurement(self, total_bytes):
        """Return ``(rate, elapsed)`` for the run the timer just measured."""
        elapsed = self.timer.result()
        if elapsed > 0:
            rate = total_bytes / elapsed / 1024 / 1024
        else:
            # Finished within the clock's resolution; avoid dividing by zero.
            rate = float('inf')
        return rate, elapsed

    def write_out(self, size, bufsiz=1024000, mode='buffered',
                  asynchronous=False, **legacy):
        """Write at least ``size`` megabytes to a fresh temporary file.

        Any file left over from a previous call is deleted first, so only one
        test file exists at a time.  The path of the new file is stored in
        ``self.filename`` for read_in().

        Args:
            size: amount to write, in units of ``self.MEGABYTE`` bytes.
            bufsiz: bytes written per call in 'buffered' mode.
            mode: 'buffered' writes ``bufsiz`` bytes per call; any other
                value writes the whole random payload per call.
            asynchronous: when False, flush and fsync after every chunk.
            **legacy: accepts the former ``async=`` keyword only.

        Returns:
            ``(rate, elapsed)`` where rate is bytes / elapsed / 1024 / 1024
            and elapsed is in seconds, or False if an I/O error occurred.

        Raises:
            TypeError: on unknown keyword arguments.
            ValueError: if the chunk to write would be empty.
        """
        asynchronous = _resolve_async(asynchronous, legacy)
        size = size * self.MEGABYTE
        # The chunk is the same on every iteration, so slice it once here
        # instead of re-wrapping the payload inside the timed loop.
        if mode == 'buffered' and bufsiz >= 0:
            chunk = self.random_source[:bufsiz]
        else:
            chunk = self.random_source
        if size > 0 and not chunk:
            raise ValueError('nothing to write: empty chunk (bufsiz=%r)'
                             % (bufsiz,))

        self._cleanup()
        outfile = tempfile.NamedTemporaryFile(dir=self.path_tests,
                                              prefix=self.prefix,
                                              delete=False)
        self.filename = outfile.name
        total_bytes = 0
        self.timer.start()
        try:
            # Closing happens inside the timed region so that an
            # asynchronous run still pays for its final flush.
            with outfile:
                while total_bytes < size:
                    outfile.write(chunk)
                    if not asynchronous:
                        outfile.flush()
                        os.fsync(outfile.fileno())
                    total_bytes += len(chunk)
        except (IOError, OSError) as err:
            self.timer.stop()
            print('Failed: %s' % (err,))
            return False
        self.timer.stop()
        return self._measurement(total_bytes)

    def read_in(self, size, bufsiz=1024000, mode='buffered',
                asynchronous=False, **legacy):
        """Read up to ``size`` megabytes back from ``self.filename``.

        Reading stops early at end of file, so a file shorter than ``size``
        cannot make the loop spin forever.

        Args:
            size: amount to read, in units of ``self.MEGABYTE`` bytes.
            bufsiz: bytes requested per call in 'buffered' mode.
            mode: 'buffered' reads ``bufsiz`` bytes per call; any other
                value reads the rest of the file in one call.
            asynchronous: when False, flush and fsync after every chunk.
            **legacy: accepts the former ``async=`` keyword only.

        Returns:
            ``(rate, elapsed)`` as for write_out(), or False if there is no
            test file or an I/O error occurred.

        Raises:
            TypeError: on unknown keyword arguments.
        """
        asynchronous = _resolve_async(asynchronous, legacy)
        filename = getattr(self, 'filename', None)
        if filename is None:
            print('Failed: no test file has been written')
            return False
        size = size * self.MEGABYTE
        infile = open(filename, 'rb')
        total_bytes = 0
        self.timer.start()
        try:
            with infile:
                while total_bytes < size:
                    if mode == 'buffered':
                        buf = infile.read(bufsiz)
                    else:
                        buf = infile.read()
                    if not buf:
                        break  # end of file
                    if not asynchronous:
                        infile.flush()
                        os.fsync(infile.fileno())
                    total_bytes += len(buf)
        except (IOError, OSError) as err:
            self.timer.stop()
            print('Failed: %s' % (err,))
            return False
        self.timer.stop()
        return self._measurement(total_bytes)


def _plot_results(sequence_test):
    """Save one rate/elapsed figure per result series of ``sequence_test``."""
    # Imported here so the benchmark classes can be used without matplotlib.
    from matplotlib import pyplot

    for k in sequence_test.results:
        print('Plotting %s' % (k))
        rate, elapsed = sequence_test.get_numpy(k)
        if rate.size == 0:
            print('Skipping %s: no successful runs' % (k))
            continue
        # Constant arrays so the mean is drawn as a horizontal line.
        rate_mean = numpy.repeat(numpy.mean(rate), rate.size, 0)
        elapsed_mean = numpy.repeat(numpy.mean(elapsed), elapsed.size, 0)
        print("Rate mean: %f" % (numpy.mean(rate_mean)))
        print("Elapsed mean: %f" % (numpy.mean(elapsed_mean)))

        pyplot.figure(1)
        pyplot.subplot(211)
        pyplot.title(k)
        pyplot.grid(True)
        pyplot.xlabel('Iteration')
        pyplot.ylabel('Rate (MB/s)')
        pyplot.plot(rate)
        pyplot.plot(rate_mean)

        pyplot.subplot(212)
        pyplot.grid(True)
        pyplot.xlabel('Iteration')
        if k.startswith('read'):
            pyplot.ylabel('Time to Read')
        else:
            pyplot.ylabel('Time to Write')
        pyplot.plot(elapsed)
        pyplot.plot(elapsed_mean)

        pyplot.savefig(os.path.join(sequence_test.path_results,
                                    '_'.join([k, str(sequence_test.size)])))
        pyplot.close()


def main():
    """Benchmark each file size and plot every result series."""
    size_steps = [1, 4, 8]
    iter_max = 25
    for size in size_steps:
        print("### %dMB ####" % (size))
        sequence_test = Sequencial_test(size)
        sequence_test.stress_test_wrapper(iter_max)
        _plot_results(sequence_test)


if __name__ == "__main__":
    main()