-rwxr-xr-x  logparse.py  248
1 files changed, 173 insertions, 75 deletions
diff --git a/logparse.py b/logparse.py
index 4b8edb0..baee14d 100755
--- a/logparse.py
+++ b/logparse.py
@@ -6,6 +6,7 @@ import re
from glob import glob
import argparse
from math import ceil
+import hashlib
import gzip
import socket
import pandas as pd
@@ -14,39 +15,58 @@ import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from dateutil import parser as dpar
from collections import OrderedDict
+import yaml
+
+
+def md5(fname):
+ hash_md5 = hashlib.md5()
+ with open(fname, "rb") as f:
+ for chunk in iter(lambda: f.read(4096), b""):
+ hash_md5.update(chunk)
+ return hash_md5.hexdigest()
+
# regex pattern to extract key values from each line of an apache/nginx access log
# Accommodate PUTs as well as second URLs (normally "-")
-patt = '(?P<ipaddress>\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}) .* .* \\[(?P<date>\\d{2}\\/[a-zA-Z]{3}\\/\\d{4}):(?P<time>\\d{2}:\\d{2}:\\d{2}) (\\+|\\-)\\d{4}] ".* (?P<path>.*?) .*" (?P<status>\\d*) \\d* ".*" "(?P<agent>.*)"'
+patt = '(?P<ipaddress>\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}) .* .* \\[(?P<date>\\d{2}\\/[a-zA-Z]{3}\\/\\d{4}):(?P<time>\\d{2}:\\d{2}:\\d{2}) (\\+|\\-)\\d{4}] ".* (?P<path>.*?) .*" (?P<status>\\d*) (?P<size>\\d*)'
p = re.compile(patt)
-
class logData():
+ columns = {
+ 'ipaddress': {},
+ 'hostname': {},
+ 'date': {},
+ 'time': {},
+ 'path': {},
+ 'status': {},
+ 'agent': {},
+ }
+
def __init__(self,
gethostnames=False,
ignore_hosts=[]):
- self.columns = {
- 'ipaddress': {},
- 'hostname': {},
- 'date': {},
- 'time': {},
- 'path': {},
- 'status': {},
- 'agent': {},
- }
- self.dframe = pd.DataFrame(self.columns)
self.digest_path = 'digests'
self.gethostnames = gethostnames
+ self.hostnames = {}
self.ignore_hosts = ignore_hosts
+    def poll_hostnames(self, ipaddress):
+        if ipaddress not in self.hostnames.keys():
+            try:
+                # gethostbyaddr() returns (hostname, aliaslist, ipaddrlist).
+                hostname = socket.gethostbyaddr(ipaddress)[0]
+            except:
+                hostname = 'offline'
+            self.hostnames[ipaddress] = hostname
+        return(self.hostnames[ipaddress])
+
def process_lines(self, f):
print('process lines')
df = pd.DataFrame(self.columns)
unparseable = 0
for line in f.readlines():
- print(line)
try:
line = str(line.decode("utf-8"))
except(AttributeError):
@@ -57,13 +77,13 @@ class logData():
if host in line:
continue
except(TypeError):
- continue
+ pass
try:
match = p.match(line)
except:
line_errors += 1
- pass
- print(match)
+ print(f'Line parse error: {line}')
+ continue
try:
ipaddress = match.group('ipaddress')
date = match.group('date')
@@ -71,18 +91,18 @@ class logData():
time = match.group('time')
path = match.group('path')
status = match.group('status')
- agent = match.group('agent')
+ #agent = match.group('agent')
+ hostname = ''
+ df = df.append({'ipaddress':ipaddress,
+ 'hostname':hostname,
+ 'date':dateobj,
+ 'time':time,
+ 'path':path,
+ 'status':status},
+ ignore_index=True)
+ #'agent':agent}, ignore_index=True)
except(AttributeError):
unparseable += 1
- # Selective polling of hostnames here.
- hostname = '?'
- df = df.append({'ipaddress':ipaddress,
- 'hostname':hostname,
- 'date':dateobj,
- 'time':time,
- 'path':path,
- 'status':status,
- 'agent':agent}, ignore_index=True)
print(f'unparseable lines : {unparseable}')
return(df)
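A quick way to sanity-check the narrowed regex used by process_lines() is to run it against a single line and inspect the named groups. The sketch below is not part of the commit: the pattern is copied verbatim from the added line above, while the sample entry is a made-up Common Log Format line (RFC 5737 documentation address, invented package path) with no trailing referer/agent fields, which is the shape this trimmed pattern is written around.

import re

# Pattern copied from the diff; a normal (non-raw) string, so '\\d' yields the regex token \d.
patt = '(?P<ipaddress>\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}) .* .* \\[(?P<date>\\d{2}\\/[a-zA-Z]{3}\\/\\d{4}):(?P<time>\\d{2}:\\d{2}:\\d{2}) (\\+|\\-)\\d{4}] ".* (?P<path>.*?) .*" (?P<status>\\d*) (?P<size>\\d*)'
p = re.compile(patt)

# Hypothetical access-log line for illustration only.
sample = ('192.0.2.10 - - [01/Feb/2019:12:34:56 -0500] '
          '"GET /astroconda/linux-64/numpy-1.15.1-py37_0.tar.bz2 HTTP/1.1" 200 5242880')

match = p.match(sample)
if match:
    # path   -> '/astroconda/linux-64/numpy-1.15.1-py37_0.tar.bz2'
    # status -> '200', size -> '5242880'
    print(match.groupdict())
else:
    print('unparseable line')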
@@ -90,35 +110,44 @@ class logData():
'''Accepts:
a list of apache/nginx access log files, either raw or .gz,
- and parses each that does not already have a corresponding digested
- data frame in the 'digests' subdir.'''
+ and parses each that has not already been ingested.'''
- # Create data frame for receiving log data
- df = pd.DataFrame(self.columns)
+ # Create data frame for receiving all log data.
locframe = pd.DataFrame(self.columns)
-
- # Sort list of logs before processing so data will be appended in
- # chronological order.
+
+ # Track which files have been parsed by storing the MD5 hash of each
+ # once it's been read.
+ pfile = 'parsed_files.dat'
+ if not os.path.exists(pfile):
+ open(pfile, 'a').close()
+ with open(pfile, 'r') as f:
+ already_parsed = f.read().split()
+ parsed = open(pfile, 'a')
+
for log in sorted(logs):
+ # Compute MD5 hash of file and compare to list of files that
+            # have already been parsed. If new, parse it; if not, skip it.
+ hashval = md5(log)
+ if hashval in already_parsed:
+ print(f'File {log} already parsed.')
+ continue
+ df = pd.DataFrame(self.columns)
setname = re.sub('\.gz$', '', log)
setpath = os.path.join(self.digest_path, setname)
pklpath = os.path.join(self.digest_path, f'{setname}.pkl')
print(f'ingesting dataset = {setname}')
- if os.path.isfile(pklpath):
- df = pd.read_pickle(pklpath)
+ if '.gz' in log:
+ with gzip.open(log, 'r') as f:
+ df = self.process_lines(f)
else:
- print('parsing log file')
- if '.gz' in log:
- with gzip.open(log, 'r') as f:
- df = self.process_lines(f)
- else:
- with open(log, 'r') as f:
- df = self.process_lines(f)
- print(f'df shape = {df.shape}')
- # Dump digested log data to disk for more efficient repeated use.
- df.to_pickle(f'{setpath}.pkl')
+ with open(log, 'r') as f:
+ df = self.process_lines(f)
+ print(f'df shape = {df.shape}')
locframe = locframe.append(df, ignore_index=True)
print(locframe.shape)
+ parsed.write(f'{hashval}\n')
+
+ parsed.close()
return(locframe)
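The new bookkeeping added to read_logs() can be exercised on its own: hash a file in chunks, check the digest against parsed_files.dat, and record it once processed. This is a minimal standalone sketch of that flow, assuming the same md5() helper and the parsed_files.dat name from the diff; the log file name is hypothetical.

import hashlib
import os

def md5(fname):
    # Hash the file in 4 KB chunks so large logs need not fit in memory.
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

pfile = 'parsed_files.dat'   # one hash per line, as in read_logs()
log = 'access.log.1.gz'      # hypothetical log file

if not os.path.exists(pfile):
    open(pfile, 'a').close()
with open(pfile, 'r') as f:
    already_parsed = f.read().split()

hashval = md5(log)
if hashval in already_parsed:
    print(f'File {log} already parsed.')
else:
    # ... parse the file here, then record its hash ...
    with open(pfile, 'a') as parsed:
        parsed.write(f'{hashval}\n')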
@@ -126,23 +155,28 @@ class logData():
def filter_pkgs(df):
    '''Filter dataframe df down to just the rows that represent
successful (HTTP 200) conda package (.bz2 files) downloads.'''
+ print(df)
inlen = len(df)
- out = df.loc[df['agent'].str.contains('conda')]
- print(out)
- out = out.loc[out['path'].str.contains('bz2')]
- out = out.loc[out['status'].str.contains('200')]
+ ##out = df.loc[df['agent'].str.contains('conda')]
+ ##out = out.loc[out['path'].str.contains('bz2')]
+ out = df.loc[df['path'].str.contains('bz2')]
+ out = out.loc[(out['status'] == '200') | (out['status'] == '302')]
outlen = len(out)
print(f'{inlen-outlen} rows removed to leave conda txns only')
return(out)
-
def main():
ap = argparse.ArgumentParser(
prog='logparse.py',
description='Parse and digest apache/nginx access logs in either'
' raw or .gz format.')
+ ap.add_argument('--config',
+ '-c',
+ help='Configuration file used to adjust behavior of the '
+ 'program',
+ required=True)
ap.add_argument('--files',
'-f',
help='List of log files to parse, raw or .gz are accepted.'
@@ -156,6 +190,9 @@ def main():
nargs='+')
args = ap.parse_args()
+ with open(args.config, 'r') as f:
+ config = yaml.safe_load(f)
+
files = []
for filespec in args.files:
expanded = glob(filespec)
@@ -166,38 +203,66 @@ def main():
else:
files.append(expanded)
+ inf_hosts = config['infrastructure_hosts']
+ num_inf_hosts = len(inf_hosts)
+
+ # Read in any pre-existing parsed data.
+ datfile = 'dataframe.dat'
+ if not os.path.exists(datfile):
+ open(datfile, 'a').close()
+
+ with open(datfile, 'r') as f:
+ try:
+ data = pd.read_pickle(datfile)
+ except:
+ data = pd.DataFrame(logData.columns)
+
# TODO: Should host filtering take place here?
- # It leaves a disconnect between the pickled data which _may_ have been
- # culled and the actual data being referenced by the inclusion of a file
- # that has data from an exluded host within it.
+ # It leaves a disconnect between the pickled data which _may_ have
+ # been culled and the actual data being referenced.
logproc = logData(ignore_hosts=args.ignorehosts)
- data = logproc.read_logs(files)
+ newdata = logproc.read_logs(files)
- allpkgs = filter_pkgs(data)
- allpkgs = allpkgs.sort_values(by='date')
+ # If any new log files were read, filter down to only conda package downloads
+ # Then sort by date.
+ if len(newdata.index) != 0:
+ newdata = filter_pkgs(newdata)
+ newdata = newdata.sort_values(by='date')
+
+ # Append newdata to existing data (potentially empty)
+ data = data.append(newdata, ignore_index=True)
+
+ # Remove any duplicate rows in data:
+ data = data.drop_duplicates()
+
+ # Dump data to disk for use during subsequent runs.
+ data.to_pickle(datfile)
- start_date = allpkgs.iloc[0]['date']
- end_date = allpkgs.iloc[-1]['date']
- time_range = end_date - start_date
- days_elapsed = time_range.days
- if days_elapsed == 0:
- days_elapsed = 1
-
- print(f'Over the period {start_date.strftime("%m-%d-%Y")} '
- f'to {end_date.strftime("%m-%d-%Y")}')
- print(f'{days_elapsed} days')
# Normalize all conda-dev channel names to astroconda-dev
- allpkgs = allpkgs.replace('/conda-dev', '/astroconda-dev', regex=True)
+ data = data.replace('/conda-dev', '/astroconda-dev', regex=True)
+
+ all_unique_hosts = list(set(data['ipaddress']))
+ #for host in all_unique_hosts:
+ # try:
+ # print(f'{host} {socket.gethostbyaddr(host)[0]}')
+ # except:
+ # print(f'{host} offline?')
# All packages in a dictionary by channel.
- chans = [path.split('/')[1] for path in allpkgs['path']]
- chans = set(chans)
- chan_pkgs = {}
+ chans = [path.split('/')[1] for path in data['path']]
+ chans = list(set(chans))
+ chans.sort()
+ chan_pkgs = OrderedDict()
for chan in chans:
# Trailing '/' added to ensure only a single channel gets stored for each
# due to matching overlap depending on length of substring.
- chan_pkgs[chan] = allpkgs[allpkgs['path'].str.contains(chan+'/')]
+ chan_pkgs[chan] = data[data['path'].str.contains(chan+'/')]
+
+ total_downloads = 0
+ for chan in chan_pkgs.keys():
+ total_downloads += len(chan_pkgs[chan].index)
+ print(f'TOTAL downloads = {total_downloads}')
# For each channel, generate summary report of the download activity.
for chan in chan_pkgs.keys():
@@ -211,21 +276,32 @@ def main():
dates.sort()
bydate = OrderedDict()
+ start_date = dates[0]
+ end_date = dates[-1]
+ time_range = end_date - start_date
+ days_elapsed = time_range.days
+ if days_elapsed == 0:
+ days_elapsed = 1
+ print(f'\nOver the period {start_date.strftime("%m-%d-%Y")} '
+ f'to {end_date.strftime("%m-%d-%Y")}')
+ print(f'{days_elapsed} days')
+
# Downloads per day over time frame
for date in dates:
bydate[date] = len(pkgs[pkgs['date'] == date])
#for date in bydate:
# print(f'{date} : {bydate[date]}')
- total_downloads = len(pkgs.index)
- print(f'Total downloads: {total_downloads}')
+ chan_downloads = len(pkgs.index)
+ print(f'Downloads: {chan_downloads}')
# Downloads per week over time frame
- print(f'Average downloads per day: {ceil(total_downloads / days_elapsed)}')
+ print(f'Average downloads per day: {ceil(chan_downloads / days_elapsed)}')
# Number of unique hosts and geographic location
unique_hosts = set(pkgs['ipaddress'])
- print(f'Unique hosts {len(unique_hosts)}')
+ num_unique_hosts = len(unique_hosts)
+ print(f'Unique hosts {num_unique_hosts}')
## Unique packages
unique_pkgs = set(pkgs['path'])
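The per-day loop above fills bydate one date at a time; an equivalent and usually faster form is a pandas groupby. The sketch below is illustrative only: the small frame stands in for one channel's pkgs data, with the 'date' column holding parsed dates as in the script.

import pandas as pd

# Hypothetical stand-in for one channel's package downloads.
pkgs = pd.DataFrame({'date': pd.to_datetime(['2019-02-01', '2019-02-01', '2019-02-02']),
                     'path': ['/astroconda/linux-64/a.tar.bz2',
                              '/astroconda/linux-64/b.tar.bz2',
                              '/astroconda/linux-64/a.tar.bz2']})

# Same per-day counts as the explicit `for date in dates:` loop that fills bydate.
bydate = pkgs.groupby('date').size()
print(bydate)

# Same days_elapsed logic, with the zero-day guard folded into max().
days_elapsed = max((pkgs['date'].max() - pkgs['date'].min()).days, 1)
print(f'Average downloads per day: {len(pkgs) / days_elapsed:.1f}')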
@@ -245,6 +321,23 @@ def main():
#for i in range(top):
# print(pkg_totals[i])
+ # What fraction of total downloads come from non-infrastructure on-site hosts?
+ noninf = pkgs[~pkgs['ipaddress'].isin(config['infrastructure_hosts'])]
+ total_noninf = len(noninf.index)
+ print(f'Non-infrastructure downloads: {total_noninf}')
+ print(f'Percentage noninf downloads: {(total_noninf/chan_downloads)*100:.1f}%')
+
+ # What fraction of total downloads come from off-site hosts?
+ int_host_patterns = ['^'+s for s in config['internal_host_specs']]
+ offsite = pkgs[~pkgs['ipaddress'].str.contains(
+ '|'.join(int_host_patterns), regex=True)]
+ num_offsite_hosts = len(set(offsite['ipaddress']))
+ print(f'num off-site hosts: {num_offsite_hosts}')
+ onsite = pkgs[pkgs['ipaddress'].str.contains(
+ '|'.join(int_host_patterns), regex=True)]
+ num_onsite_hosts = len(set(onsite['ipaddress']))
+ print(f'num on-site hosts: {num_onsite_hosts}')
+
# Totals of unique software names
# i.e. name without version, hash, py or build iteration values
# Extract simple package titles from 'path' column of data frame.
@@ -260,8 +353,13 @@ def main():
total = names.count(name)
name_totals.append([name, total])
name_totals.sort(key=lambda x: x[1], reverse=True)
+ y = []
+ x = range(0,len(name_totals))
for total in name_totals:
+ y.append(total[1])
print(f'{total[0]}: {total[1]}')
+ plt.plot(x, y)
+ plt.savefig('ding.png')
if __name__ == "__main__":
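For reference, the new required --config option expects a YAML file providing at least the two keys main() reads: infrastructure_hosts and internal_host_specs. The sketch below shows a hypothetical minimal config and what yaml.safe_load() returns for it; the addresses are RFC 5737 documentation values, not real site hosts, and the invocation at the end is just an example command line.

import yaml

config_text = """\
infrastructure_hosts:
  - 192.0.2.20
  - 192.0.2.21
internal_host_specs:
  - '192.0.2.'
  - '198.51.100.'
"""

with open('config.yaml', 'w') as f:
    f.write(config_text)

config = yaml.safe_load(config_text)
print(config['infrastructure_hosts'])   # full IPs excluded as infrastructure
print(config['internal_host_specs'])    # IP prefixes treated as on-site

# Example invocation (shell):
#   python logparse.py -c config.yaml -f /var/log/nginx/access.log*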