Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions resolver/rsv_dupabug.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# We find cases where the number of "A queries within 0 seconds of the first"
# is larger than the number of "unique A queries". This feels wrong.
# We take a sample of the logs to find out how this could happen.
#

import sys
import os
from pathlib import Path
import ip2as
import rsv_log_parse
import pandas as pd
import traceback
import top_as
import time
import bz2

class dupabug_uid:
    # Per-user-id record: who issued the query, when, and which log files
    # produced follow-up hits for the same uid.
    def __init__(self, uid, query_cc, query_AS, query_time, query_ad_time=0):
        """Create a record for one query user id.

        uid           -- unique query user id
        query_cc      -- country code of the querying resolver
        query_AS      -- AS number of the querying resolver
        query_time    -- time of the first query seen for this uid
        query_ad_time -- AD time of the first query (optional, default 0,
                         so existing 4-argument callers keep working)
        """
        self.uid = uid
        self.query_cc = query_cc
        self.query_AS = query_AS
        self.query_time = query_time
        self.query_ad_time = query_ad_time
        self.hit_count = 0
        # Only the first three hits are kept for inspection; later hits
        # are counted in hit_count but not stored.
        self.hit_file = [ "", "", "" ]
        self.hit_event = [ 0, 0, 0 ]

    def hit(self, file_name, nb_event):
        # Record the file name and event number of up to the first three
        # hits. Bug fix: the original incremented hit_count BEFORE using it
        # as an index, so slot 0 was never filled and only two of the three
        # slots could ever be used.
        if self.hit_count < 3:
            self.hit_file[self.hit_count] = file_name
            self.hit_event[self.hit_count] = nb_event
        self.hit_count += 1

    @staticmethod
    def columns():
        # CSV header matching get_s(); note the deliberate trailing comma,
        # which get_s() also emits.
        return("uid,query_cc,query_AS,hits,f1,nb1,f2,nb2,f3,nb3,")

    def get_s(self):
        """Return this record as one CSV line (no trailing newline)."""
        s = ""
        s += self.uid + ","
        s += self.query_cc + ","
        s += self.query_AS + ","
        s += str(self.hit_count) + ","
        for i in range(0, 3):
            s += self.hit_file[i] + ","
            s += str(self.hit_event[i]) + ","
        return s


class dupabug_log:
    # Accumulates per-uid dupabug_uid records across one or more resolver
    # log files, then writes the uids hit more than once to a CSV file.
    def __init__(self):
        self.uids=dict()

    def add_entry(self, uid, query_cc, query_AS, query_time, query_ad_time, file_name, nb_event):
        """Record one filtered log event.

        Creates a dupabug_uid entry on first sight of `uid`; counts a "hit"
        for every event at or after the recorded first query time (the first
        event therefore counts as hit number one). Returns True when the uid
        was already present, i.e. this event is a duplicate.
        """
        is_dup = uid in self.uids
        if not is_dup:
            self.uids[uid] = dupabug_uid(uid, query_cc, query_AS, query_time)
        if self.uids[uid].query_time <= query_time:
            self.uids[uid].hit(file_name, nb_event)
        return is_dup

    def load_dupabug_log(self, log_file, log_threshold=15625, time_start=0):
        """Parse one log file (plain text or .bz2) and add matching events.

        Progress is printed every `lth` events, with `lth` doubling from
        log_threshold up to 1,000,000. Returns the number of parsed events.
        """
        nb_events = 0
        nb_dups = 0
        lth = log_threshold
        print("Opening: " + log_file)
        # Bug fix: the file handle was never closed; use a context manager.
        if log_file.endswith(".bz2"):
            F = bz2.open(log_file, "rt")
        else:
            F = open(log_file, "r")
        with F:
            for line in F:
                parsed = True
                try:
                    x = rsv_log_parse.rsv_log_line()
                    parsed = x.parse_line(line)
                except Exception as exc:
                    traceback.print_exc()
                    print('\nCode generated an exception: %s' % (exc))
                    print("Cannot parse:\n" + line + "\n")
                    parsed = False
                if parsed:
                    if x.filter(rr_types=['A'], experiment=['0du'], query_delay=30, check_dotnxdomain=True):
                        # Bug fix: nb_dups was printed but never incremented;
                        # count events whose uid was already recorded.
                        if self.add_entry(x.query_user_id, x.query_cc, x.query_AS, x.query_time, x.query_ad_time, log_file, nb_events):
                            nb_dups += 1
                    nb_events += 1
                    if (nb_events%lth) == 0:
                        new_time = time.time() - time_start
                        print(log_file + ": loaded " + str(nb_events) + " events at " + str(new_time) + ", " + str(nb_dups) + " dups.")
                        sys.stdout.flush()
                        if lth < 1000000:
                            lth *= 2
        new_time = time.time() - time_start
        print(log_file + ": loaded " + str(nb_events) + " events at " + str(new_time) + ", " + str(nb_dups) + " dups.")
        # Return the event count so callers can use it (the main script
        # assigned this function's result, which used to be None).
        return nb_events

    def save_and_close(self, file_name):
        """Write a CSV of every uid that was hit more than once."""
        with open(file_name, "w") as f:
            f.write(dupabug_uid.columns() + "\n")
            for uid in self.uids:
                # Bug fix: the attribute is hit_count; the original read
                # `nb_hits`, which does not exist (AttributeError).
                if self.uids[uid].hit_count > 1:
                    f.write(self.uids[uid].get_s() + "\n")

# main
#
# Usage: dupabug.py <output.csv> <input_log> [<input_log> ...]
# Loads every input log, then writes the duplicate-uid report to the
# output CSV. Guarded so importing this module has no side effects.

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("usage: dupabug.py <output.csv> *<input_log>")
        print("Only " + str(len(sys.argv)) + " arguments provided.")
        sys.exit(-1)

    dbl = dupabug_log()
    time_start = time.time()

    for log_file in sys.argv[2:]:
        dbl.load_dupabug_log(log_file, log_threshold=15625, time_start=time_start)

    dbl.save_and_close(sys.argv[1])
43 changes: 29 additions & 14 deletions resolver/rsv_monthly.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Summary of the monthly files.
#
# This is specialized for the organization of data on the compute server.
# It assumes that for a given month, we will have a set of folders
# It assumes that for a given month, we will have a set of folders
# named flux-yyyy-mm-dd and recap-yyyy-mm-dd. In these folders,
# we find summary files flux-summary.csv or recap-summary.csv.
# There is one line per ISP, with different columns for the two files:
Expand Down Expand Up @@ -42,8 +42,16 @@
def usage():
print("Usage: python monthly.py results_dir year month")


delay_name = [['sum_deltas_A_PDNS_ISP', 'uids_A_PDNS_ISP', 'average_A_PDNS_ISP'],
['sum_deltas_A_others_ISP', 'uids_A_others_ISP', 'average_A_others_ISP'],
['sum_deltas_AAAA_PDNS_ISP', 'uids_AAAA_PDNS_ISP', 'average_AAAA_PDNS_ISP'],
['sum_deltas_AAAA_others_ISP', 'uids_AAAA_others_ISP', 'average_AAAA_others_ISP'],
['sum_deltas_HTTPS_PDNS_ISP', 'uids_HTTPS_PDNS_ISP', 'average_HTTPS_PDNS_ISP'],
['sum_deltas_HTTPS_others_ISP', 'uids_HTTPS_others_ISP', 'average_HTTPS_others_ISP']]

class per_key_sum:
def __init__(self, x, summed_columns, has_max_delay, has_average_delay, has_sum_delays):
def __init__(self, x, summed_columns, has_max_delay, has_average_delay, has_sum_delays, has_deltas):
self.x = dict()

self.summed_columns = summed_columns
Expand All @@ -54,23 +62,30 @@ def __init__(self, x, summed_columns, has_max_delay, has_average_delay, has_sum_
self.x[name] = x[name]
if self.has_max_delay:
self.x['max_delay'] = x['max_delay']
self.has_deltas = has_deltas
if self.has_deltas:
for delay in delay_name:
if (delay[1] in self.x) and (self.x[delay[1]] > 0):
self.x[delay[2]] = self.x[delay[0]] / self.x[delay[1]]
if self.has_average_delay:
self.x['average_delay'] = x['average_delay']
if not self.has_sum_delays:
self.x['sum_delays'] = self.x['uids']*self.x['average_delay']
# if (self.x['uids'] > 0):
# print("Sum delays: " + str(self.x['sum_delays']) + " (" + str(self.x['uids']) + ", " + str(self.x['average_delay']))

def add(self,x):
for name in self.summed_columns:
self.x[name] += x[name]
if self.has_max_delay and x['max_delay'] > self.x['max_delay']:
self.x['max_delay'] = x['max_delay']
if self.has_deltas:
for delay in delay_name:
if (delay[1] in self.x) and (self.x[delay[1]] > 0):
self.x[delay[2]] = self.x[delay[0]] / self.x[delay[1]]
if self.has_average_delay and self.x['uids'] > 0:
if not self.has_sum_delays:
self.x['sum_delays'] += x['uids']*x['average_delay']
self.x['average_delay'] = self.x['sum_delays'] / self.x['uids']

def get_row(self):
row = []
for name in self.summed_columns:
Expand All @@ -87,6 +102,7 @@ def __init__(self, summary_columns):
self.has_max_delay = False
self.has_average_delay = False
self.has_sum_delays = False
self.has_deltas = False
self.monthly_per_isp = dict()
self.monthly_per_day = dict()
#print(str(summary_columns))
Expand All @@ -96,18 +112,21 @@ def __init__(self, summary_columns):

def get_columns(self, summary_columns):
header_set = set(["CC", "AS", 'start'])

#print("headers: " + str(summary_columns))
for c in summary_columns:
if not c in header_set:
if c == 'max_delay':
self.has_max_delay=True
elif c == 'average_delay':
self.has_average_delay=True
elif c == delay_name[0][0]:
self.has_deltas = True
self.summed_columns.append(c)
else:
self.summed_columns.append(c)
if c == 'sum_delays':
self.has_sum_delays = True


def add_cc_as(self, row):
cc = row['CC']
Expand All @@ -119,14 +138,14 @@ def add_cc_as(self, row):
key = cc + '-' + asn

if not key in self.monthly_per_isp:
self.monthly_per_isp[key] = per_key_sum(row, self.summed_columns, self.has_max_delay, self.has_average_delay, self.has_sum_delays)
self.monthly_per_isp[key] = per_key_sum(row, self.summed_columns, self.has_max_delay, self.has_average_delay, self.has_sum_delays, self.has_deltas)
else:
self.monthly_per_isp[key].add(row)

def add_daily(self, row, year, month, day):
key = str(year) + '-' + str(month) + '-' + str(day)
if not key in self.monthly_per_day:
self.monthly_per_day[key] = per_key_sum(row, self.summed_columns, self.has_max_delay, self.has_average_delay, self.has_sum_delays)
self.monthly_per_day[key] = per_key_sum(row, self.summed_columns, self.has_max_delay, self.has_average_delay, self.has_sum_delays, self.has_deltas)
else:
self.monthly_per_day[key].add(row)

Expand All @@ -135,9 +154,6 @@ def add_file(self, year, month, day, file_path):
df = pd.read_csv(file_path)
df.apply(lambda row: self.add_cc_as(row),axis=1)
df.apply(lambda row: self.add_daily(row, year, month, day),axis=1)
#print("After loading " + file_path + ":")
#print("ISP: " + str(len(self.monthly_per_isp)))
#print("Days: " + str(len(self.monthly_per_day)))

def load_day(self, month_dir, file_name, year, month, prefix):
print("Try: " + file_name)
Expand Down Expand Up @@ -227,7 +243,6 @@ def save(self, year, month, month_dir, table_name):
recap_dirs = [f for f in m_list if f.startswith(recap_prefix)]

for dir_name in recap_dirs:
print(dir_name)
#print(dir_name)
recap_summaries.load_day(month_dir, dir_name, year, month, "recap2-")
recap_summaries.save(year, month, month_dir, 'recap2')

Loading
Loading