author     Matt Rendina <mrendina@stsci.edu>  2019-10-11 11:55:44 -0400
committer  Matt Rendina <mrendina@stsci.edu>  2019-10-11 11:55:44 -0400
commit     eeac8ec2a92c97b23f7e4a3ab917f466bb775d1f (patch)
tree       95d534c2bc899c482c43aa5ed7ee40305c4389fe /conmets/conmets.py
parent     a95ac10538f3232f23dd2f78ae75a119eb9daee7 (diff)
download   conmets-eeac8ec2a92c97b23f7e4a3ab917f466bb775d1f.tar.gz
Relocate module file
Diffstat (limited to 'conmets/conmets.py')
-rwxr-xr-x  conmets/conmets.py  484
1 file changed, 484 insertions, 0 deletions
diff --git a/conmets/conmets.py b/conmets/conmets.py
new file mode 100755
index 0000000..b083ede
--- /dev/null
+++ b/conmets/conmets.py
@@ -0,0 +1,484 @@
+#!/usr/bin/env python3
+import os
+import sys
+import re
+from glob import glob
+import pickle
+from math import ceil
+import hashlib
+import gzip
+import socket
+import pandas as pd
+import datetime as dt
+import matplotlib.pyplot as plt
+import matplotlib.dates as mdates
+from dateutil import parser as dpar
+from collections import OrderedDict
+
+
+def md5(fname):
+ hash_md5 = hashlib.md5()
+ with open(fname, "rb") as f:
+ for chunk in iter(lambda: f.read(4096), b""):
+ hash_md5.update(chunk)
+ return hash_md5.hexdigest()
+
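A quick usage note (the file name below is hypothetical): the hex digest returned here is what read_logs() later records so a log file that has already been ingested is not parsed again.

    digest = md5('access.log.1.gz')   # hypothetical log file
    print(digest)                     # 32-character hex string, later kept in file_hashes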
+
+# regex pattern to extract key values from each line of an apache/nginx access log
+# Accommodate PUTs as well as second URLs (normally "-")
+patt = '(?P<ipaddress>\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}) .* .* \\[(?P<date>\\d{2}\\/[a-zA-Z]{3}\\/\\d{4}):(?P<time>\\d{2}:\\d{2}:\\d{2}) (\\+|\\-)\\d{4}] ".* (?P<path>.*?) .*" (?P<status>\\d*) (?P<size>\\d*)'
+
+logpattern = re.compile(patt)
+
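As a quick illustration (not part of the original commit), the compiled logpattern can be exercised against a made-up access-log line; the named groups map onto the DataFrame columns defined below:

    # Hypothetical log line; shows what each named group captures.
    sample = ('203.0.113.5 - - [11/Oct/2019:11:55:44 -0400] '
              '"GET /astroconda/linux-64/numpy-1.17.2-py37_0.tar.bz2 HTTP/1.1" 200 4194304')
    m = logpattern.match(sample)
    print(m.group('ipaddress'), m.group('date'), m.group('time'))
    # -> 203.0.113.5 11/Oct/2019 11:55:44
    print(m.group('path'), m.group('status'), m.group('size'))
    # -> /astroconda/linux-64/numpy-1.17.2-py37_0.tar.bz2 200 4194304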
+class LogData():
+
+ columns = {
+ 'ipaddress': {},
+ 'hostname': {},
+ 'date': {},
+ 'time': {},
+ 'path': {},
+ 'status': {},
+ 'size': {},
+ 'name': {}, # derived
+ }
+
+ def __init__(self,
+ dataset_name,
+ gethostnames=False,
+ ignore_hosts=[]):
+ '''dataset is a dict
+ dataframe - pandas dataframe containing digested log data
+ file_hashes - MD5 hashes of each file that was read to compose the dataframe'''
+ self.dataset_name = dataset_name
+ self.dataset = None
+ self.data = None
+ self.digest_path = 'digests'
+ self.gethostnames = gethostnames
+ self.hostnames = {}
+ self.ignore_hosts = ignore_hosts
+
+ try:
+ print('reading dataset...')
+ with open(self.dataset_name, 'rb') as f:
+ self.dataset = pickle.load(f)
+        except (FileNotFoundError, EOFError, pickle.UnpicklingError):
+ print(f'{self.dataset_name} not found. Creating empty dataset.')
+ with open(self.dataset_name, 'a') as f:
+ self.dataset = {'dataframe':pd.DataFrame(self.columns),
+ 'file_hashes': []}
+ self.data = self.dataset['dataframe']
+ self.hashes = self.dataset['file_hashes']
+
+    def poll_hostnames(self, ipaddress):
+        if ipaddress not in self.hostnames.keys():
+            try:
+                hostname = socket.gethostbyaddr(ipaddress)[0]
+            except OSError:
+                hostname = 'offline'
+            self.hostnames[ipaddress] = hostname
+        else:
+            hostname = self.hostnames[ipaddress]
+        return hostname
+
+ def process_lines(self, f):
+ df = pd.DataFrame(self.columns)
+ unparseable = 0
+ for line in f.readlines():
+ try:
+ line = str(line.decode("utf-8"))
+ except(AttributeError):
+ pass
+            # Ignore transactions from particular IP addresses as requested.
+            try:
+                if any(host in line for host in self.ignore_hosts):
+                    continue
+            except(TypeError):
+                pass
+
+ try:
+ match = logpattern.match(line)
+ #print(f'logpattern.match(line): {match}')
+ except:
+                unparseable += 1
+ print(f'Line parse error: {line}')
+ continue
+ try:
+ ipaddress = match.group('ipaddress')
+ date = match.group('date')
+ dateobj = dpar.parse(date)
+ time = match.group('time')
+ path = match.group('path')
+
+ # Extract simple package titles from 'path' column of data frame.
+ patt0 = re.compile('/.*/.*/')
+                patt1 = re.compile(r'(?P<simplename>.*)-.*-.*\.tar\.bz2$')
+ tarball = re.sub(patt0, '', path)
+ namematch = patt1.match(tarball)
+ name = namematch.group('simplename')
+ status = match.group('status')
+ size = int(match.group('size'))
+ hostname = ''
+ df = df.append({'ipaddress':ipaddress,
+ 'hostname':hostname,
+ 'date':dateobj,
+ 'time':time,
+ 'path':path,
+ 'status':status,
+ 'size':size,
+ 'name':name},
+ ignore_index=True)
+ except(AttributeError):
+ unparseable += 1
+ print(f'unparseable lines : {unparseable}')
+ return(df)
+
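For reference, a small sketch (not in the commit) of what the patt0/patt1 pair inside process_lines() does to a hypothetical request path:

    import re

    path = '/astroconda/linux-64/numpy-1.17.2-py37_0.tar.bz2'   # hypothetical path
    patt0 = re.compile('/.*/.*/')                               # strips '/<channel>/<platform>/'
    patt1 = re.compile(r'(?P<simplename>.*)-.*-.*\.tar\.bz2$')  # drops '-<version>-<build>.tar.bz2'
    tarball = re.sub(patt0, '', path)                           # 'numpy-1.17.2-py37_0.tar.bz2'
    print(patt1.match(tarball).group('simplename'))             # -> 'numpy'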
+ def read_logs(self, logs):
+ '''Accepts:
+
+ a list of apache/nginx access log files, either raw or .gz,
+ and parses each that has not already been ingested.'''
+
+ # Create data frame for receiving all log data.
+ newdata = pd.DataFrame(self.columns)
+
+ for log in sorted(logs):
+ # Compute MD5 hash of file and compare to list of files that
+ # have already been parsed. If new, parse, if not, skip.
+ hashval = md5(log)
+ if hashval in self.hashes:
+ print(f'File {log} already parsed.')
+ continue
+ df = pd.DataFrame(self.columns)
+            setname = re.sub(r'\.gz$', '', log)
+ setpath = os.path.join(self.digest_path, setname)
+ pklpath = os.path.join(self.digest_path, f'{setname}.pkl')
+ print(f'Reading log file {log}...')
+ if '.gz' in log:
+ with gzip.open(log, 'r') as f:
+ df = self.process_lines(f)
+ else:
+ with open(log, 'r') as f:
+ df = self.process_lines(f)
+            print(f'Added {len(df.index)} transactions to dataset.')
+            newdata = newdata.append(df, ignore_index=True)
+            print(f'{newdata.shape[0]} rows so far this session.')
+ self.hashes.append(hashval)
+
+ # If any new log files were read, filter down to only conda package downloads
+ # Then sort by date.
+ if len(newdata.index) != 0:
+ newdata = self.filter_pkgs(newdata)
+ newdata = newdata.sort_values(by='date')
+ newdata = newdata.drop_duplicates()
+ # Normalize any 'conda-dev' channel names to 'astroconda-dev'
+ newdata = newdata.replace('/conda-dev', '/astroconda-dev', regex=True)
+ # Add newdata to (potentially empty) existing data
+ self.data = self.data.append(newdata, ignore_index=True)
+ self.dataset['dataframe'] = self.data
+
+ def filter_pkgs(self, df):
+        '''Filter dataframe df down to just the rows that represent
+        successful (HTTP 200 or 302) conda package (.bz2 file) downloads.'''
+ inlen = len(df)
+ out = df.loc[df['path'].str.contains('bz2')]
+ out = out.loc[(out['status'] == '200') | (out['status'] == '302')]
+ outlen = len(out)
+ print(f'{inlen-outlen} rows removed to leave conda txns only')
+ return(out)
+
+ def write_dataset(self, dataset_name=None):
+ '''Serialize working dataset and write it to disk using a filename
+ provided, if requested.
+
+ Parameters
+ ----------
+ dataset_name : string
+ Optional name to use for file when writing working dataset to disk.
+ If not provided, the current name of the working dataset file will
+ be used.'''
+ if dataset_name:
+ dsname = dataset_name
+ else:
+ dsname = self.dataset_name
+        with open(dsname, 'wb') as f:
+            pickle.dump(self.dataset, f)
+
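A minimal usage sketch for the class (the dataset name, log file glob, and ignored address are hypothetical):

    from glob import glob

    logproc = LogData('hits.pkl', ignore_hosts=['192.0.2.10'])
    logproc.read_logs(sorted(glob('access.log*')))   # raw and .gz logs are both accepted
    print(f'{len(logproc.data.index)} conda download transactions digested')
    logproc.write_dataset()                          # re-pickle the dataset, including file hashes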
+
+#def main():
+# ap = argparse.ArgumentParser(
+# prog='logparse.py',
+# description='Parse and digest apache/nginx access logs in either'
+# ' raw or .gz format.')
+# ap.add_argument('dataset_name', type=str,
+# help='Name of dataset file. If file does not exist and'
+# ' log data file names are provided for parsing, this '
+# 'file will be created.')
+# ap.add_argument('--config',
+# '-c',
+# help='Configuration file used to adjust behavior of the '
+# 'program',
+# required=True)
+# ap.add_argument('--files',
+# '-f',
+# help='List of log files to parse, raw or .gz are accepted.'
+# ' glob syntax is also honored.',
+# nargs='+')
+# ap.add_argument('--window',
+# '-w',
+# help='Restrict examination of data to the window of dates'
+# ' provided.\n'
+# ' Format: YYYY.MM.DD-YYYY.MM.DD'
+# ' Omitting a date window will operate on all data contained'
+# ' within the given dataset.')
+# ap.add_argument('--ignorehosts',
+# '-i',
+# help='IP addresses of hosts to ignore when parsing logs.'
+# ' Useful for saving time by not reading in transactions '
+# 'from security scans, etc.',
+# nargs='+')
+# args = ap.parse_args()
+#
+# # Dataset filename
+# dataset_name = args.dataset_name
+#
+# with open(args.config, 'r') as f:
+# config = yaml.safe_load(f)
+#
+# files = []
+# try:
+# for filespec in args.files:
+# expanded = glob(filespec)
+# expanded.sort()
+# if isinstance(expanded, list):
+# for name in expanded:
+# files.append(name)
+# else:
+# files.append(expanded)
+# except(TypeError):
+# print('No log files provided.')
+# print(f'Importing existing dataset {dataset_name}.')
+# pass
+#
+# inf_hosts = config['infrastructure_hosts']
+# num_inf_hosts = len(inf_hosts)
+#
+# # TODO: Should host filtering take place here?
+# # It leaves a disconnect between the pickled data which _may_ have
+# # been culled and the actual data being referenced.
+# logproc = LogData(dataset_name, ignore_hosts=args.ignorehosts)
+# logproc.read_logs(files)
+#
+# print('writing (potentially updated) dataset')
+# logproc.write_dataset()
+#
+# # Filtering and analysis begins here
+# data = logproc.data
+# print(f'num full data rows = {len(data.index)}')
+#
+# # Filter out a particular time period for examination
+# # Set limits on a time period to examine
+# if args.window:
+# start = args.window.split('-')[0].replace('.', '-')
+# end = args.window.split('-')[1].replace('.', '-')
+# window_start = pd.to_datetime(start)
+# window_end = pd.to_datetime(end)
+# print(f'Filtering based on window {window_start} - {window_end}.')
+# data = data[pd.to_datetime(data['date']) >= window_start]
+# data = data[pd.to_datetime(data['date']) <= window_end]
+# print(f'num windowed data rows = {len(data.index)}')
+#
+# all_unique_hosts = list(set(data['ipaddress']))
+# #for host in all_unique_hosts:
+# # try:
+# # print(f'{host} {socket.gethostbyaddr(host)[0]}')
+# # except:
+# # print(f'{host} offline?')
+#
+# # All packages in a dictionary by channel.
+# chans = [path.split('/')[1] for path in data['path']]
+# chans = list(set(chans))
+# chans.sort()
+# chan_pkgs = OrderedDict()
+# for chan in chans:
+# # Trailing '/' added to ensure only a single channel gets stored for each
+# # due to matching overlap depending on length of substring.
+# chan_pkgs[chan] = data[data['path'].str.contains(chan+'/')]
+#
+# total_downloads = 0
+# for chan in chan_pkgs.keys():
+# total_downloads += len(chan_pkgs[chan].index)
+# print(f'TOTAL downloads = {total_downloads}')
+#
+# # For each channel, generate summary report of the download activity.
+# for chan in chan_pkgs.keys():
+# print(f'\n\nSummary for channel: {chan}')
+# print('-----------------------------')
+#
+# pkgs = chan_pkgs[chan]
+# # Unique days
+# dates = set(pkgs['date'])
+# dates = list(dates)
+# dates.sort()
+# bydate = OrderedDict()
+#
+# start_date = dates[0]
+# end_date = dates[-1]
+# time_range = end_date - start_date
+# days_elapsed = time_range.days
+# if days_elapsed == 0:
+# days_elapsed = 1
+# print(f'\nOver the period {start_date.strftime("%m-%d-%Y")} '
+# f'to {end_date.strftime("%m-%d-%Y")}')
+# print(f'{days_elapsed} days')
+#
+# # Downloads per day over time frame
+# for date in dates:
+# bydate[date] = len(pkgs[pkgs['date'] == date])
+#
+# chan_downloads = len(pkgs.index)
+# print(f'Downloads: {chan_downloads}')
+#
+# print(f'Average downloads per day: {ceil(chan_downloads / days_elapsed)}')
+#
+# # Total bandwidth consumed by this channel's use over time frame.
+# bytecount = pkgs['size'].sum()
+#        gib = bytecount / 1024**3
+# print(f'Data transferred: {gib:.2f} GiB')
+#
+# # Number of unique hosts and geographic location
+# unique_hosts = set(pkgs['ipaddress'])
+# num_unique_hosts = len(unique_hosts)
+# print(f'Unique hosts {num_unique_hosts}')
+#
+# ## Unique packages
+# unique_pkgs = set(pkgs['path'])
+# print(f'Unique full package names {len(unique_pkgs)}')
+#
+# # What is the fraction of downloads for each OS?
+# num_linux_txns = len(pkgs[pkgs['path'].str.contains('linux-64')].index)
+# num_osx_txns = len(pkgs[pkgs['path'].str.contains('osx-64')].index)
+# pcnt_linux_txns = (num_linux_txns / float(chan_downloads))*100
+# pcnt_osx_txns = (num_osx_txns / float(chan_downloads))*100
+#
+# # What fraction of total downloads come from non-infrastructure on-site hosts?
+# noninf = pkgs[~pkgs['ipaddress'].isin(config['infrastructure_hosts'])]
+# total_noninf = len(noninf.index)
+# print(f'Non-infrastructure downloads: {total_noninf}')
+# print(f'Percentage noninf downloads: {(total_noninf/chan_downloads)*100:.1f}%')
+#
+# # What fraction of total downloads come from off-site hosts?
+# int_host_patterns = ['^'+s for s in config['internal_host_specs']]
+# offsite = pkgs[~pkgs['ipaddress'].str.contains(
+# '|'.join(int_host_patterns), regex=True)]
+# num_offsite_hosts = len(set(offsite['ipaddress']))
+# print(f'num unique off-site hosts: {num_offsite_hosts}')
+# onsite = pkgs[pkgs['ipaddress'].str.contains(
+# '|'.join(int_host_patterns), regex=True)]
+# num_onsite_hosts = len(set(onsite['ipaddress']))
+# print(f'num unique on-site hosts: {num_onsite_hosts}')
+#
+# infra = pkgs[pkgs['ipaddress'].str.contains('|'.join(inf_hosts))]
+#
+# # Totals of unique software titles
+# # i.e. name without version, hash, py or build iteration values
+# # Extract simple package titles from 'path' column of data frame.
+# names = list(pkgs['name'])
+# unique_names = list(set(names))
+# name_statsums = []
+# for name in unique_names:
+# statsum = {}
+# statsum['name'] = name
+# statsum['total'] = names.count(name)
+# # Sum on- and off-site transactions for each package name
+# # 'on-site' means transactions to non-infrastructure hosts.
+# name_txns = pkgs[pkgs['name'] == name]
+#
+# on_txns = name_txns[name_txns['ipaddress'].str.contains(
+# '|'.join(int_host_patterns), regex=True)]
+# # Filter out hosts designated as infrastructure hosts in config file.
+# on_txns = on_txns[~on_txns['ipaddress'].str.contains(
+# '|'.join(inf_hosts))]
+#
+# num_onsite_txns = len(on_txns.index)
+# statsum['onsite'] = num_onsite_txns
+#
+# off_txns = name_txns[~name_txns['ipaddress'].str.contains(
+# '|'.join(int_host_patterns), regex=True)]
+# num_offsite_txns = len(off_txns.index)
+# statsum['offsite'] = num_offsite_txns
+#
+# infra_txns = name_txns[name_txns['ipaddress'].str.contains(
+# '|'.join(inf_hosts))]
+# num_infra_txns = len(infra_txns.index)
+# statsum['infra'] = num_infra_txns
+#
+# ## Determine which packages are also available via PyPI
+# url = f'https://pypi.org/pypi/{name}/json'
+# try:
+# rq = urllib.request.urlopen(url)
+# #pl = f.read().decode('utf-8')
+# #piinfo = json.loads(pl)
+# statsum['pypi'] = True
+# except(HTTPError):
+# statsum['pypi'] = False
+# #statsum['pypi'] = False
+#
+# name_statsums.append(statsum)
+#
+# name_statsums.sort(key=lambda x: x['total'], reverse=True)
+# x_onsite = [i['onsite'] for i in name_statsums]
+# x_infra = [i['infra'] for i in name_statsums]
+# x_offsite = [i['offsite'] for i in name_statsums]
+# y = [i['name'] for i in name_statsums]
+#
+# print(f'Number of unique {chan} titles downloaded: {len(unique_names)}')
+#    # For each unique software name, sum the number of transactions from internal hosts.
+# fig, axes = plt.subplots(figsize=(10,25))
+# plt.grid(which='major', axis='x')
+# plt.title(f'{chan} -- {start_date.strftime("%m-%d-%Y")} - {end_date.strftime("%m-%d-%Y")}')
+# plt.xlabel('Downloads')
+# axes.set_ylim(-1,len(name_statsums))
+# axes.tick_params(labeltop=True)
+#
+# plt.gca().invert_yaxis()
+# width = 1
+# from operator import add
+# barlists = []
+# # Horizontal stacked bar chart with off-site, on-site, and infrastructure transactions.
+# barlists.append(axes.barh(y, x_offsite, width, edgecolor='white', color='tab:blue'))
+# barlists.append(axes.barh(y, x_onsite, width, left=x_offsite, edgecolor='white', color='tab:green'))
+# # Sum bars up to this point to correctly stack the subsequent one(s).
+# offset = list(map(add, x_offsite, x_onsite))
+# barlists.append(axes.barh(y, x_infra, width, left=offset, edgecolor='white', color='tab:olive'))
+#
+# for i,statsum in enumerate(name_statsums):
+# if statsum['pypi'] == True:
+# axes.get_yticklabels()[i].set_color('orange')
+# axes.get_yticklabels()[i].set_weight('bold')
+#
+# # Annotate plot with additional stats
+# props = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
+# plural = ''
+# if days_elapsed > 1:
+# plural = 's'
+# stats_text = (f'{days_elapsed} day{plural}\n'
+# f'Total Downloads: {chan_downloads}\n'
+# f'Average downloads per day: {ceil(chan_downloads / days_elapsed)}\n'
+# f'Unique titles: {len(unique_names)}\n'
+# f'Data transferred: {gib:.2f} GiB\n'
+# f'Linux transactions: {pcnt_linux_txns:.1f}%\n'
+# f'Macos transactions: {pcnt_osx_txns:.1f}%\n'
+# f'Unique on-site hosts: {num_onsite_hosts}\n'
+# f'Unique off-site hosts: {num_offsite_hosts}\n')
+# axes.text(0.45, 0.05, stats_text, transform=axes.transAxes, fontsize=14, bbox=props)
+# axes.legend(['off-site', 'on-site', 'on-site infrastructure'])
+#
+# plt.tight_layout()
+# short_startdate = start_date.strftime('%Y%m%d')
+# short_enddate = end_date.strftime('%Y%m%d')
+# plt.savefig(f'{chan}-{short_startdate}-{short_enddate}.png')
+#
+#
+#if __name__ == "__main__":
+# main()
+
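The --window handling in the commented-out main() above reduces to a pandas filter like this sketch (the toy frame and dates are made up):

    import pandas as pd

    # Stand-in for logproc.data; only the column needed for the example.
    data = pd.DataFrame({'date': pd.to_datetime(['2019-08-30', '2019-09-15', '2019-10-11'])})

    window = '2019.09.01-2019.10.01'   # CLI format: YYYY.MM.DD-YYYY.MM.DD
    start, end = [s.replace('.', '-') for s in window.split('-')]
    window_start, window_end = pd.to_datetime(start), pd.to_datetime(end)

    windowed = data[(data['date'] >= window_start) & (data['date'] <= window_end)]
    print(len(windowed.index))   # -> 1; only the 2019-09-15 row falls inside the window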