diff --git a/resolver/rsv_dupabug.py b/resolver/rsv_dupabug.py
new file mode 100644
index 0000000..ebc4b1d
--- /dev/null
+++ b/resolver/rsv_dupabug.py
@@ -0,0 +1,114 @@
+# We find cases where the number of "A queries within 0 second of first"
+# is larger than the number of "unique A queries". This feels wrong.
+# We take a sample to find out how this could happen.
+#
+
+import sys
+import os
+from pathlib import Path
+import ip2as
+import rsv_log_parse
+import pandas as pd
+import traceback
+import top_as
+import time
+import bz2
+
+class dupabug_uid:
+    def __init__(self, uid, query_cc, query_AS, query_time):
+        self.uid = uid
+        self.query_cc = query_cc
+        self.query_AS = query_AS
+        self.query_time = query_time
+        self.query_ad_time = 0
+        self.hit_count = 0
+        self.hit_file = [ "", "", "" ]
+        self.hit_event = [ 0, 0, 0 ]
+
+    def hit(self, file_name, nb_event):
+        self.hit_count += 1
+        # Remember where the first three hits came from, in slots 0..2.
+        if self.hit_count <= 3:
+            self.hit_file[self.hit_count - 1] = file_name
+            self.hit_event[self.hit_count - 1] = nb_event
+
+    @staticmethod
+    def columns():
+        return("uid,query_cc,query_AS,hits,f1,nb1,f2,nb2,f3,nb3,")
+
+    def get_s(self):
+        s = ""
+        s += self.uid + ","
+        s += self.query_cc + ","
+        s += self.query_AS + ","
+        s += str(self.hit_count) + ","
+        for i in range(0, 3):
+            s += self.hit_file[i] + ","
+            s += str(self.hit_event[i]) + ","
+        return s
+
+
+class dupabug_log:
+    def __init__(self):
+        self.uids=dict()
+
+    def add_entry(self, uid, query_cc, query_AS, query_time, query_ad_time, file_name, nb_event):
+        # Returns True if this uid was already seen, i.e. a duplicate.
+        is_dup = uid in self.uids
+        if not is_dup:
+            self.uids[uid] = dupabug_uid(uid, query_cc, query_AS, query_time)
+        if self.uids[uid].query_time <= query_time:
+            self.uids[uid].hit(file_name, nb_event)
+        return is_dup
+
+    def load_dupabug_log(self, log_file, log_threshold=15625, time_start=0):
+        nb_events = 0
+        nb_dups = 0
+        lth = log_threshold
+        print("Opening: " + log_file)
+        if log_file.endswith(".bz2"):
+            F = bz2.open(log_file, "rt")
+        else:
+            F = open(log_file, "r")
+        for line in F:
+            parsed = True
+            try:
+                x = rsv_log_parse.rsv_log_line()
+                parsed = x.parse_line(line)
+            except Exception as exc:
+                traceback.print_exc()
+                print('\nCode generated an exception: %s' % (exc))
+                print("Cannot parse:\n" + line + "\n")
+                parsed = False
+            if parsed:
+                if x.filter(rr_types=['A'], experiment=['0du'], query_delay=30, check_dotnxdomain=True):
+                    if self.add_entry(x.query_user_id, x.query_cc, x.query_AS, x.query_time, x.query_ad_time, log_file, nb_events):
+                        nb_dups += 1
+            nb_events += 1
+            if (nb_events%lth) == 0:
+                new_time = time.time() - time_start
+                print(log_file + ": loaded " + str(nb_events) + " events at " + str(new_time) + ", " + str(nb_dups) + " dups.")
+                sys.stdout.flush()
+                if lth < 1000000:
+                    lth *= 2
+        F.close()
+        new_time = time.time() - time_start
+        print(log_file + ": loaded " + str(nb_events) + " events at " + str(new_time) + ", " + str(nb_dups) + " dups.")
+
+    def save_and_close(self, file_name):
+        with open(file_name, "w") as f:
+            f.write(dupabug_uid.columns() + "\n")
+            for uid in self.uids:
+                if self.uids[uid].hit_count > 1:
+                    f.write(self.uids[uid].get_s() + "\n")
+
+# main
+
+if len(sys.argv) < 3:
+    print("usage: rsv_dupabug.py output_csv log_file [log_file ...]")
+    print("Only " + str(len(sys.argv)) + " arguments provided.")
+    exit(-1)
+
+dbl = dupabug_log()
+time_start = time.time()
+
+for log_file in sys.argv[2:]:
+    dbl.load_dupabug_log(log_file, log_threshold=15625, time_start=time_start)
+
+dbl.save_and_close(sys.argv[1])
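The bookkeeping above can be tried on synthetic events before pointing the script at real resolver logs. A minimal sketch, assuming toy event tuples in place of rsv_log_parse output; only the hit-counting idea mirrors dupabug_uid and dupabug_log:

hits = dict()

def record(uid, file_name, nb_event):
    # Every sighting of a uid counts as a hit; the first three hits
    # also remember where they came from, like dupabug_uid.hit().
    entry = hits.setdefault(uid, {"count": 0, "where": []})
    entry["count"] += 1
    if entry["count"] <= 3:
        entry["where"].append((file_name, nb_event))

events = [("uid-1", "log-a", 10), ("uid-2", "log-a", 11), ("uid-1", "log-b", 3)]
for uid, fname, nb in events:
    record(uid, fname, nb)

# uids with more than one hit are the suspected duplicates
print({u: e for u, e in hits.items() if e["count"] > 1})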
diff --git a/resolver/rsv_monthly.py b/resolver/rsv_monthly.py
index 1972d33..07fd604 100644
--- a/resolver/rsv_monthly.py
+++ b/resolver/rsv_monthly.py
@@ -1,7 +1,7 @@
 # Summary of the monthly files.
 #
 # This is specialized for the organization of data on the compute server.
-# It assumes that for a given month, we will have a set of folders 
+# It assumes that for a given month, we will have a set of folders
 # named flux-yyyy-mm-dd and recap-yyyy-mm-dd. In these folders,
 # we find summary files flux-summary.csv or recap-summary.csv.
 # There is one line per ISP, with different columns for the two files:
@@ -42,8 +42,16 @@ def usage():
     print("Usage: python monthly.py results_dir year month")
 
+
+delay_name = [['sum_deltas_A_PDNS_ISP', 'uids_A_PDNS_ISP', 'average_A_PDNS_ISP'],
+              ['sum_deltas_A_others_ISP', 'uids_A_others_ISP', 'average_A_others_ISP'],
+              ['sum_deltas_AAAA_PDNS_ISP', 'uids_AAAA_PDNS_ISP', 'average_AAAA_PDNS_ISP'],
+              ['sum_deltas_AAAA_others_ISP', 'uids_AAAA_others_ISP', 'average_AAAA_others_ISP'],
+              ['sum_deltas_HTTPS_PDNS_ISP', 'uids_HTTPS_PDNS_ISP', 'average_HTTPS_PDNS_ISP'],
+              ['sum_deltas_HTTPS_others_ISP', 'uids_HTTPS_others_ISP', 'average_HTTPS_others_ISP']]
+
 class per_key_sum:
-    def __init__(self, x, summed_columns, has_max_delay, has_average_delay, has_sum_delays):
+    def __init__(self, x, summed_columns, has_max_delay, has_average_delay, has_sum_delays, has_deltas):
         self.x = dict()
         self.summed_columns = summed_columns
@@ -54,23 +62,30 @@ def __init__(self, x, summed_columns, has_max_delay, has_average_delay, has_sum_
             self.x[name] = x[name]
         if self.has_max_delay:
             self.x['max_delay'] = x['max_delay']
+        self.has_deltas = has_deltas
+        if self.has_deltas:
+            for delay in delay_name:
+                if (delay[1] in self.x) and (self.x[delay[1]] > 0):
+                    self.x[delay[2]] = self.x[delay[0]] / self.x[delay[1]]
         if self.has_average_delay:
             self.x['average_delay'] = x['average_delay']
             if not self.has_sum_delays:
                 self.x['sum_delays'] = self.x['uids']*self.x['average_delay']
-        # if (self.x['uids'] > 0):
-        #     print("Sum delays: " + str(self.x['sum_delays']) + " (" + str(self.x['uids']) + ", " + str(self.x['average_delay']))
 
     def add(self,x):
         for name in self.summed_columns:
             self.x[name] += x[name]
         if self.has_max_delay and x['max_delay'] > self.x['max_delay']:
             self.x['max_delay'] = x['max_delay']
+        if self.has_deltas:
+            for delay in delay_name:
+                if (delay[1] in self.x) and (self.x[delay[1]] > 0):
+                    self.x[delay[2]] = self.x[delay[0]] / self.x[delay[1]]
         if self.has_average_delay and self.x['uids'] > 0:
             if not self.has_sum_delays:
                 self.x['sum_delays'] += x['uids']*x['average_delay']
             self.x['average_delay'] = self.x['sum_delays'] / self.x['uids']
-    
+
     def get_row(self):
         row = []
         for name in self.summed_columns:
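The reason per_key_sum carries sums and uid counts instead of finished averages: when daily rows are merged, a correct monthly average has to weight each day by its traffic, which is why every average_* column is recomputed from sum_deltas_*/uids_* on each add(). A small sketch with hypothetical numbers:

days = [
    {"sum_deltas": 120.0, "uids": 100},   # busy day, per-day average 1.2
    {"sum_deltas": 9.0, "uids": 3},       # quiet day, per-day average 3.0
]

total_sum = sum(d["sum_deltas"] for d in days)
total_uids = sum(d["uids"] for d in days)

weighted = total_sum / total_uids   # 129 / 103, about 1.25
# Averaging the averages would weight both days equally: (1.2 + 3.0) / 2 = 2.1
naive = sum(d["sum_deltas"] / d["uids"] for d in days) / len(days)

print(weighted, naive)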
@@ -87,6 +102,7 @@ def __init__(self, summary_columns):
         self.has_max_delay = False
         self.has_average_delay = False
         self.has_sum_delays = False
+        self.has_deltas = False
         self.monthly_per_isp = dict()
         self.monthly_per_day = dict()
         #print(str(summary_columns))
@@ -96,18 +112,21 @@ def __init__(self, summary_columns):
 
     def get_columns(self, summary_columns):
         header_set = set(["CC", "AS", 'start'])
-        
+
         #print("headers: " + str(summary_columns))
         for c in summary_columns:
             if not c in header_set:
                 if c == 'max_delay':
                     self.has_max_delay=True
                 elif c == 'average_delay':
                     self.has_average_delay=True
+                elif c == delay_name[0][0]:
+                    self.has_deltas = True
+                    self.summed_columns.append(c)
                 else:
                     self.summed_columns.append(c)
                     if c == 'sum_delays':
                         self.has_sum_delays = True
-        
+
     def add_cc_as(self, row):
         cc = row['CC']
@@ -119,14 +138,14 @@ def add_cc_as(self, row):
         key = cc + '-' + asn
 
         if not key in self.monthly_per_isp:
-            self.monthly_per_isp[key] = per_key_sum(row, self.summed_columns, self.has_max_delay, self.has_average_delay, self.has_sum_delays)
+            self.monthly_per_isp[key] = per_key_sum(row, self.summed_columns, self.has_max_delay, self.has_average_delay, self.has_sum_delays, self.has_deltas)
         else:
             self.monthly_per_isp[key].add(row)
 
     def add_daily(self, row, year, month, day):
         key = str(year) + '-' + str(month) + '-' + str(day)
         if not key in self.monthly_per_day:
-            self.monthly_per_day[key] = per_key_sum(row, self.summed_columns, self.has_max_delay, self.has_average_delay, self.has_sum_delays)
+            self.monthly_per_day[key] = per_key_sum(row, self.summed_columns, self.has_max_delay, self.has_average_delay, self.has_sum_delays, self.has_deltas)
         else:
             self.monthly_per_day[key].add(row)
 
@@ -135,9 +154,6 @@ def add_file(self, year, month, day, file_path):
         df = pd.read_csv(file_path)
         df.apply(lambda row: self.add_cc_as(row),axis=1)
         df.apply(lambda row: self.add_daily(row, year, month, day),axis=1)
-        #print("After loading " + file_path + ":")
-        #print("ISP: " + str(len(self.monthly_per_isp)))
-        #print("Days: " + str(len(self.monthly_per_day)))
 
     def load_day(self, month_dir, file_name, year, month, prefix):
         print("Try: " + file_name)
@@ -227,7 +243,6 @@ def save(self, year, month, month_dir, table_name):
     recap_dirs = [f for f in m_list if f.startswith(recap_prefix)]
 
     for dir_name in recap_dirs:
-        print(dir_name)
+        #print(dir_name)
         recap_summaries.load_day(month_dir, dir_name, year, month, "recap2-")
     recap_summaries.save(year, month, month_dir, 'recap2')
-
diff --git a/resolver/rsv_recap.py b/resolver/rsv_recap.py
index 2e48989..0bf4a6a 100644
--- a/resolver/rsv_recap.py
+++ b/resolver/rsv_recap.py
@@ -86,7 +86,9 @@ def __init__(self, query_time, rr_type, resolver_tag):
         self.has_A_prov = [False, False, False]
         self.nb_A_prov = [0, 0, 0]
         self.nb_A_under = [[ 0, 0, 0 ],[ 0, 0, 0 ],[ 0, 0, 0 ],[ 0, 0, 0 ],[ 0, 0, 0 ],[ 0, 0, 0 ],[ 0, 0, 0 ],[ 0, 0, 0 ],[ 0, 0, 0 ]]
-        self.first_time_A = 0
+        self.first_time_A = [ 0, 0, 0 ]
+        self.first_time_AAAA = [ 0, 0, 0 ]
+        self.first_time_HTTPS = [ 0, 0, 0 ]
         self.per_RR = [ flux_uid_rr(), flux_uid_rr(), flux_uid_rr() ]
         self.has_prov = [ False, False, False ]
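The first_time_A change above is the pivot for the rest of this file: instead of one timestamp per uid, rsv_recap now keeps one first-seen time per resolver class. A minimal sketch of that bookkeeping, with stand-in tag sets for rsv_log_parse.tag_isp_set and tag_public_set:

tag_isp_set = {"isp"}      # stand-in for rsv_log_parse.tag_isp_set
tag_public_set = {"pdns"}  # stand-in for rsv_log_parse.tag_public_set

first_time_A = [0, 0, 0]   # slot 0 = ISP, 1 = public DNS, 2 = others

def note_query(query_time, resolver_tag):
    # Record only the first query of each resolver class.
    if resolver_tag in tag_isp_set:
        prov_index = 0
    elif resolver_tag in tag_public_set:
        prov_index = 1
    else:
        prov_index = 2
    if first_time_A[prov_index] == 0:
        first_time_A[prov_index] = query_time

for t, tag in [(1.0, "isp"), (1.4, "pdns"), (2.0, "isp"), (3.1, "oob")]:
    note_query(t, tag)

print(first_time_A)   # [1.0, 1.4, 3.1]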
@@ -115,6 +117,89 @@ def __init__(self):
         self.per_prov = [ 0, 0, 0 ]
         #self.total = 0
 
+class first_delay_cc_as:
+    def __init__(self):
+        self.sum_delta_PDNS_ISP = 0
+        self.sum_delta_others_ISP = 0
+        self.uids_PDNS_ISP = 0
+        self.uids_others_ISP = 0
+        self.average_PDNS_ISP = 0
+        self.average_others_ISP = 0
+
+    def update(self, first_delays):
+        if first_delays[0] > 0:
+            if first_delays[1] > 0:
+                self.sum_delta_PDNS_ISP += first_delays[1] - first_delays[0]
+                self.uids_PDNS_ISP += 1
+            if first_delays[2] > 0:
+                self.sum_delta_others_ISP += first_delays[2] - first_delays[0]
+                self.uids_others_ISP += 1
+
+    def average(self):
+        if (self.uids_PDNS_ISP > 0):
+            self.average_PDNS_ISP = self.sum_delta_PDNS_ISP/self.uids_PDNS_ISP
+        else:
+            self.average_PDNS_ISP = 0
+        if self.uids_others_ISP > 0:
+            self.average_others_ISP = self.sum_delta_others_ISP/self.uids_others_ISP
+        else:
+            self.average_others_ISP = 0
+
+    def copy(self,other):
+        self.sum_delta_PDNS_ISP = other.sum_delta_PDNS_ISP
+        self.sum_delta_others_ISP = other.sum_delta_others_ISP
+        self.uids_PDNS_ISP = other.uids_PDNS_ISP
+        self.uids_others_ISP = other.uids_others_ISP
+        self.average_PDNS_ISP = other.average_PDNS_ISP
+        self.average_others_ISP = other.average_others_ISP
+
+    def add_delays(self, other):
+        self.sum_delta_PDNS_ISP += other.sum_delta_PDNS_ISP
+        self.sum_delta_others_ISP += other.sum_delta_others_ISP
+        self.uids_PDNS_ISP += other.uids_PDNS_ISP
+        self.uids_others_ISP += other.uids_others_ISP
+        self.average()
+
+    def to_list(self):
+        return [
+            self.sum_delta_PDNS_ISP, self.uids_PDNS_ISP, self.average_PDNS_ISP,
+            self.sum_delta_others_ISP, self.uids_others_ISP, self.average_others_ISP
+            ]
+
+    def reset(self):
+        self.sum_delta_PDNS_ISP = 0
+        self.sum_delta_others_ISP = 0
+        self.uids_PDNS_ISP = 0
+        self.uids_others_ISP = 0
+        self.average_PDNS_ISP = 0
+        self.average_others_ISP = 0
+
+    def to_str(self):
+        s = str(self.sum_delta_PDNS_ISP) + ','
+        s += str(self.uids_PDNS_ISP) + ','
+        s += str(self.average_PDNS_ISP) + ','
+        s += str(self.sum_delta_others_ISP) + ','
+        s += str(self.uids_others_ISP) + ','
+        s += str(self.average_others_ISP)
+        return(s)
+
+    def add_row(self, row, delay_name):
+        self.sum_delta_PDNS_ISP += row[delay_name[0]]
+        self.uids_PDNS_ISP += row[delay_name[1]]
+        self.sum_delta_others_ISP += row[delay_name[2]]
+        self.uids_others_ISP += row[delay_name[3]]
+        self.average()
+
+    def set_from_row(self, row, delay_name):
+        self.sum_delta_PDNS_ISP = row[delay_name[0]]
+        self.uids_PDNS_ISP = row[delay_name[1]]
+        self.sum_delta_others_ISP = row[delay_name[2]]
+        self.uids_others_ISP = row[delay_name[3]]
+        self.average()
+
+
+
+
 class recap_cc_as:
     def __init__(self, query_cc, query_AS, slice_start, slice_duration, recap_file, is_first=False):
         self.query_cc = query_cc
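first_delay_cc_as reduces each uid's three first-seen times to "how much later than the ISP resolver did a public resolver (or one of the remaining resolvers) ask". A uid only contributes when its ISP slot is set. The arithmetic of update() and average(), on hypothetical first-seen triples:

uids_first_times = [
    [10.0, 12.5, 0.0],   # PDNS asked 2.5s after the ISP; others never asked
    [20.0, 21.0, 24.0],  # PDNS +1.0s, others +4.0s
    [0.0, 5.0, 6.0],     # no ISP query: skipped entirely
]

sum_pdns = uids_pdns = 0
sum_others = uids_others = 0
for isp, pdns, others in uids_first_times:
    if isp > 0:
        if pdns > 0:
            sum_pdns += pdns - isp
            uids_pdns += 1
        if others > 0:
            sum_others += others - isp
            uids_others += 1

print(sum_pdns / uids_pdns)       # 1.75
print(sum_others / uids_others)   # 4.0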
@@ -154,6 +239,12 @@ def __init__(self, query_cc, query_AS, slice_start, slice_duration, recap_file,
         self.per_prov = [ 0, 0, 0 ]
         self.per_rr = [ flux_cc_as_rr(), flux_cc_as_rr(), flux_cc_as_rr() ]
 
+        self.delays = [
+            first_delay_cc_as(),
+            first_delay_cc_as(),
+            first_delay_cc_as()
+            ]
+
     def init_next_slice(self):
         # Copy the values in the previous slice,
@@ -189,6 +280,9 @@ def init_next_slice(self):
                 self.previous_slice.per_rr[i_rr].per_prov[i_prov] = \
                     self.per_rr[i_rr].per_prov[i_prov]
 
+        for i_prov in range(0, 3):
+            self.previous_slice.delays[i_prov].copy( self.delays[i_prov])
+
         self.previous_slice.uids = self.uids
         self.previous_slice.slice_number = self.slice_number
         self.previous_slice.should_save = True
@@ -217,10 +311,12 @@ def init_next_slice(self):
         for i_prov in range(0,3):
             self.per_prov[i_prov] = 0
         for i_rr in range(0,3):
-            #self.per_rr[i_rr].total = 0
             for i_prov in range(0,3):
                 self.per_rr[i_rr].per_prov[i_prov] = 0
 
+        for i_prov in range(0, 3):
+            self.delays[i_prov].reset()
+
         self.uids = dict()
         self.should_save = True
         self.slice_number += 1
@@ -248,28 +344,39 @@ def summarize(self):
             for i in range(0, len(delta_range)):
                 for j in range(0,3):
                     self.nb_A_under[i][j] += r_uid.nb_A_under[i][j]
+            self.delays[0].update(r_uid.first_time_A)
+            self.delays[1].update(r_uid.first_time_AAAA)
+            self.delays[2].update(r_uid.first_time_HTTPS)
+        for i in range(0,3):
+            self.delays[i].average()
 
     def update_uid(self, r_uid, query_time, rr_type, resolver_tag):
         #update_uid is only called if the delay to time stamp is less than 30s.
+        if resolver_tag in rsv_log_parse.tag_isp_set:
+            prov_index = 0
+        elif resolver_tag in rsv_log_parse.tag_public_set:
+            prov_index = 1
+        else:
+            prov_index = 2
         if rr_type == 'HTTPS':
             r_uid.has_https = True
+            if r_uid.first_time_HTTPS[prov_index] == 0:
+                r_uid.first_time_HTTPS[prov_index] = query_time
         elif rr_type == 'AAAA':
             r_uid.has_AAAA = True
+            if r_uid.first_time_AAAA[prov_index] == 0:
+                r_uid.first_time_AAAA[prov_index] = query_time
        elif rr_type == 'A':
-            if not r_uid.has_A:
-                r_uid.first_time_A = query_time
-            delta_t = query_time - r_uid.first_time_A
             r_uid.has_A = True
-            if resolver_tag in rsv_log_parse.tag_isp_set:
-                prov_index = 0
-            elif resolver_tag in rsv_log_parse.tag_public_set:
-                prov_index = 1
-            else:
-                prov_index = 2
+            is_new_A_prov = False
+            if r_uid.first_time_A[prov_index] == 0:
+                r_uid.first_time_A[prov_index] = query_time
+                is_new_A_prov = True
+            delta_t = query_time - r_uid.first_time_A[prov_index]
             if delta_t <= cutoff_delay:
                 r_uid.has_A_prov[prov_index] = True
                 for i in range(0, len(delta_range)):
-                    if delta_t <= delta_range[i]:
+                    if delta_t <= delta_range[i] and (i > 0 or is_new_A_prov):
                         r_uid.nb_A_under[i][prov_index] += 1
                         r_uid.nb_A_prov[prov_index] += 1
                         break
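With per-class first times, a repeated A query measures its delta against its own class's first query, so a duplicate arriving in the same second lands at delta 0. Without the is_new_A_prov guard every such duplicate would fall into the 0ms bucket and push nb_A_0ms past the number of unique uids, which appears to be exactly the inflation rsv_dupabug.py samples for. A sketch of the corrected bucket assignment, with hypothetical delta_range values standing in for the real table:

delta_range = [0.0, 0.010, 0.030, 0.100]   # seconds, hypothetical values

def bucket(delta_t, is_new):
    # Only the first query of a class may claim bucket 0; duplicates
    # with delta 0 slide into the next bucket instead.
    for i, limit in enumerate(delta_range):
        if delta_t <= limit and (i > 0 or is_new):
            return i
    return None

print(bucket(0.0, True))    # 0: first query of this class
print(bucket(0.0, False))   # 1: duplicate moves to the 10ms bucket
print(bucket(0.020, False)) # 2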
@@ -298,7 +405,6 @@ def get_header():
         s += 'nb_A_u10s_ISP,nb_A_u10s_PDNS,nb_A_u10s_others,'
         s += 'nb_A_u30s_ISP,nb_A_u30s_PDNS,nb_A_u30s_others,'
         s += 'zombies,z_ISP,z_PDNS,z_others,first_3s,first_10s,'
-
         for i_prov in range(0,3):
             s += "uids_" + prov_names[i_prov] + ','
         for i_rr in range(0,3):
@@ -307,7 +413,15 @@ def get_header():
                 prefix += '_'
             for i_prov in range(0,3):
                 s += prefix + prov_names[i_prov] + ','
-        s += 'sum_delay,max_delay\n'
+        s_i_x = ["_PDNS_ISP", "_others_ISP"]
+        for rr_name in rr_names:
+            for i_x in range(0, 2):
+                s += "sum_deltas_" + rr_name + s_i_x[i_x] + ','
+                s += "uids_" + rr_name + s_i_x[i_x] + ','
+                s += "average_" + rr_name + s_i_x[i_x] + ','
+        s += 'sum_delay,max_delay'
+        s += '\n'
+
         return s
 
     def save_to_file(self):
@@ -341,9 +455,11 @@ def save_to_file(self):
         for i_rr in range(0,3):
             for i_prov in range(0,3):
                 s += str(self.per_rr[i_rr].per_prov[i_prov]) + ','
 
-        s += str(self.sum_first_delay) + ',' + str(self.max_first_delay) + '\n'
-
+        for i_rr in range(0,3):
+            self.delays[i_rr].average()
+            s += self.delays[i_rr].to_str() + ','
+        s += str(self.sum_first_delay) + ',' + str(self.max_first_delay) + '\n'
         self.recap_file.write(s)
@@ -509,7 +625,13 @@ def save_and_close(self):
         'uids_A_ISP', 'uids_A_PDNS', 'uids_A_others',
         'uids_AAAA_ISP', 'uids_AAAA_PDNS', 'uids_AAAA_others',
         'uids_HTTPS_ISP', 'uids_HTTPS_PDNS', 'uids_HTTPS_others',
-        'sum_delay', 'max_delay'
+        'sum_deltas_A_PDNS_ISP', 'uids_A_PDNS_ISP', 'average_A_PDNS_ISP',
+        'sum_deltas_A_others_ISP', 'uids_A_others_ISP', 'average_A_others_ISP',
+        'sum_deltas_AAAA_PDNS_ISP', 'uids_AAAA_PDNS_ISP', 'average_AAAA_PDNS_ISP',
+        'sum_deltas_AAAA_others_ISP', 'uids_AAAA_others_ISP', 'average_AAAA_others_ISP',
+        'sum_deltas_HTTPS_PDNS_ISP', 'uids_HTTPS_PDNS_ISP', 'average_HTTPS_PDNS_ISP',
+        'sum_deltas_HTTPS_others_ISP', 'uids_HTTPS_others_ISP', 'average_HTTPS_others_ISP',
+        'sum_delay', 'max_delay'
         ]
 
     recap_first_columns = [
@@ -533,7 +655,13 @@ def save_and_close(self):
         'uids_A_ISP', 'uids_A_PDNS', 'uids_A_others',
         'uids_AAAA_ISP', 'uids_AAAA_PDNS', 'uids_AAAA_others',
         'uids_HTTPS_ISP', 'uids_HTTPS_PDNS', 'uids_HTTPS_others',
-        'sum_delay'
+        'sum_deltas_A_PDNS_ISP', 'uids_A_PDNS_ISP', 'average_A_PDNS_ISP',
+        'sum_deltas_A_others_ISP', 'uids_A_others_ISP', 'average_A_others_ISP',
+        'sum_deltas_AAAA_PDNS_ISP', 'uids_AAAA_PDNS_ISP', 'average_AAAA_PDNS_ISP',
+        'sum_deltas_AAAA_others_ISP', 'uids_AAAA_others_ISP', 'average_AAAA_others_ISP',
+        'sum_deltas_HTTPS_PDNS_ISP', 'uids_HTTPS_PDNS_ISP', 'average_HTTPS_PDNS_ISP',
+        'sum_deltas_HTTPS_others_ISP', 'uids_HTTPS_others_ISP', 'average_HTTPS_others_ISP',
+        'sum_delay'
         ]
 
     recap_PDNS = [
@@ -555,6 +683,12 @@ class recap_row:
         'uids_A_ISP', 'uids_A_PDNS', 'uids_A_others',
         'uids_AAAA_ISP', 'uids_AAAA_PDNS', 'uids_AAAA_others',
         'uids_HTTPS_ISP', 'uids_HTTPS_PDNS', 'uids_HTTPS_others' ]
+    delay_name = [['sum_deltas_A_PDNS_ISP', 'uids_A_PDNS_ISP',
+                   'sum_deltas_A_others_ISP', 'uids_A_others_ISP'],
+                  ['sum_deltas_AAAA_PDNS_ISP', 'uids_AAAA_PDNS_ISP',
+                   'sum_deltas_AAAA_others_ISP', 'uids_AAAA_others_ISP'],
+                  ['sum_deltas_HTTPS_PDNS_ISP', 'uids_HTTPS_PDNS_ISP',
+                   'sum_deltas_HTTPS_others_ISP', 'uids_HTTPS_others_ISP']]
     def __init__(self, row):
         self.query_cc = row['CC']
         self.query_AS = row['AS']
@@ -591,10 +725,10 @@ def __init__(self, row):
         self.first_10s = row['first_10s']
         self.sum_delay = row['sum_delay']
         self.max_delay = row['max_delay']
-
         self.uids_ISP = row['uids_ISP']
         self.uids_PDNS = row['uids_PDNS']
         self.uids_others = row['uids_others']
+
         self.uids_A_ISP = row['uids_A_ISP']
         self.uids_A_PDNS = row['uids_A_PDNS']
         self.uids_A_others = row['uids_A_others']
@@ -605,6 +739,14 @@ def __init__(self, row):
         self.uids_HTTPS_PDNS = row['uids_HTTPS_PDNS']
         self.uids_HTTPS_others = row['uids_HTTPS_others']
 
+        self.delays = [
+            first_delay_cc_as(),
+            first_delay_cc_as(),
+            first_delay_cc_as()
+            ]
+        for i_rr in range(0, 3):
+            self.delays[i_rr].set_from_row(row, recap_row.delay_name[i_rr])
+
     def add_row(self, row):
         self.total_uids += row['uids']
         self.first_ISP += row['first_ISP']
@@ -650,6 +792,9 @@ def add_row(self, row):
         self.uids_HTTPS_PDNS += row['uids_HTTPS_PDNS']
         self.uids_HTTPS_others += row['uids_HTTPS_others']
 
+        for i_rr in range(0, 3):
+            self.delays[i_rr].add_row(row, recap_row.delay_name[i_rr])
+
 class recap_cc_as2:
     def __init__(self, query_cc, query_AS):
         self.query_cc = query_cc
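recap_row.delay_name lists, per record type, the sum and uid columns for the PDNS-ISP and others-ISP deltas; the average columns are always recomputed from those, never read back. A toy round trip through pandas rows, with hypothetical values:

import pandas as pd

delay_name_A = ['sum_deltas_A_PDNS_ISP', 'uids_A_PDNS_ISP',
                'sum_deltas_A_others_ISP', 'uids_A_others_ISP']

df = pd.DataFrame([
    {'sum_deltas_A_PDNS_ISP': 3.5, 'uids_A_PDNS_ISP': 2,
     'sum_deltas_A_others_ISP': 4.0, 'uids_A_others_ISP': 1},
    {'sum_deltas_A_PDNS_ISP': 1.0, 'uids_A_PDNS_ISP': 1,
     'sum_deltas_A_others_ISP': 0.0, 'uids_A_others_ISP': 0},
])

# Accumulate the way recap_row.add_row() feeds first_delay_cc_as.add_row():
sum_pdns = df[delay_name_A[0]].sum()
uids_pdns = df[delay_name_A[1]].sum()
print(sum_pdns / uids_pdns if uids_pdns > 0 else 0)   # 1.5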
@@ -662,7 +807,13 @@ def __init__(self, query_cc, query_AS):
         self.nb_A_under = [[ 0, 0, 0 ],[ 0, 0, 0 ],[ 0, 0, 0 ],[ 0, 0, 0 ],[ 0, 0, 0 ],[ 0, 0, 0 ],[ 0, 0, 0 ],[ 0, 0, 0 ],[ 0, 0, 0 ]]
         self.per_prov = [0, 0, 0]
         self.per_rr = [[0, 0, 0],[0, 0, 0],[0, 0, 0]]
+        self.delays = [
+            first_delay_cc_as(),
+            first_delay_cc_as(),
+            first_delay_cc_as()
+            ]
 
+    # add_row is used to add rows of the same slice.
     def add_row(self, row):
         self.total_uids += row['uids']
         start = row['start']
@@ -671,7 +822,6 @@ def add_row(self, row):
         if not start in self.slices:
             self.slices[start] = recap_row(row)
         else:
             self.slices[start].add_row(row)
 
-
     def evaluate(self):
         self.total_PDNS = [ 0, 0, 0, 0, 0, 0, 0 ]
         self.top_PDNS = [ 0, 1, 2 ]
@@ -762,6 +912,10 @@ def save_file(self, file_name):
             s += str(r_row.uids_HTTPS_ISP) + ","
             s += str(r_row.uids_HTTPS_PDNS) + ","
             s += str(r_row.uids_HTTPS_others) + ","
+
+            for i_rr in range(0,3):
+                s += r_row.delays[i_rr].to_str() + ","
+
             s += str(r_row.max_delay) + ","
 
             uids = r_row.total_uids
@@ -794,6 +948,12 @@ def summary_columns():
             'uids_A_ISP', 'uids_A_PDNS', 'uids_A_others',
             'uids_AAAA_ISP', 'uids_AAAA_PDNS', 'uids_AAAA_others',
             'uids_HTTPS_ISP', 'uids_HTTPS_PDNS', 'uids_HTTPS_others',
+            'sum_deltas_A_PDNS_ISP', 'uids_A_PDNS_ISP', 'average_A_PDNS_ISP',
+            'sum_deltas_A_others_ISP', 'uids_A_others_ISP', 'average_A_others_ISP',
+            'sum_deltas_AAAA_PDNS_ISP', 'uids_AAAA_PDNS_ISP', 'average_AAAA_PDNS_ISP',
+            'sum_deltas_AAAA_others_ISP', 'uids_AAAA_others_ISP', 'average_AAAA_others_ISP',
+            'sum_deltas_HTTPS_PDNS_ISP', 'uids_HTTPS_PDNS_ISP', 'average_HTTPS_PDNS_ISP',
+            'sum_deltas_HTTPS_others_ISP', 'uids_HTTPS_others_ISP', 'average_HTTPS_others_ISP',
             'average_delay', 'max_delay'
             ]
         return columns
@@ -825,7 +985,7 @@ def summary_row(self):
         z_others = 0
         first_3s = 0
         first_10s = 0
-        
+
         uids_ISP = 0
         uids_PDNS = 0
         uids_others = 0
@@ -839,6 +999,12 @@ def summary_row(self):
         uids_HTTPS_PDNS = 0
         uids_HTTPS_others = 0
 
+        delays = [
+            first_delay_cc_as(),
+            first_delay_cc_as(),
+            first_delay_cc_as()
+            ]
+
         max_delay = 0
         sum_delay = 0
 
@@ -873,7 +1039,7 @@ def summary_row(self):
             z_others += r_row.z_others
             first_3s += r_row.first_3s
             first_10s += r_row.first_10s
-            
+
             uids_ISP += r_row.uids_ISP
             uids_PDNS += r_row.uids_PDNS
             uids_others += r_row.uids_others
@@ -887,6 +1053,9 @@ def summary_row(self):
             uids_HTTPS_PDNS += r_row.uids_HTTPS_PDNS
             uids_HTTPS_others += r_row.uids_HTTPS_others
 
+            for i_rr in range(0,3):
+                delays[i_rr].add_delays(r_row.delays[i_rr])
+
             if max_delay < r_row.max_delay:
                 max_delay = r_row.max_delay
             sum_delay += r_row.sum_delay
@@ -955,7 +1124,7 @@ def summary_row(self):
             z_others,
             first_3s,
             first_10s,
-            
+
             uids_ISP,
             uids_PDNS,
             uids_others,
@@ -967,10 +1136,12 @@ def summary_row(self):
             uids_AAAA_others,
             uids_HTTPS_ISP,
             uids_HTTPS_PDNS,
-            uids_HTTPS_others,
+            uids_HTTPS_others]
+
+        for i_rr in range(0,3):
+            row += delays[i_rr].to_list()
 
-            average_delay,
-            max_delay ]
+        row += [average_delay, max_delay]
 
         return row
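The row lists built in summary_row() have to stay aligned, element for element, with summary_columns(); every delay triple appended via to_list() must have a matching column triple. Zipping the two is a cheap way to spot drift. Toy columns and values, not the real list:

columns = ['CC', 'AS', 'uids',
           'sum_deltas_A_PDNS_ISP', 'uids_A_PDNS_ISP', 'average_A_PDNS_ISP']
row = ['US', 'AS65536', 42]   # hypothetical CC/AS/uids
row += [3.0, 2, 1.5]          # shape of a first_delay_cc_as().to_list() slice

assert len(row) == len(columns)
print(dict(zip(columns, row)))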
diff --git a/resolver/rsv_recap_daily.py b/resolver/rsv_recap_daily.py
index 19e7bba..fe81b27 100644
--- a/resolver/rsv_recap_daily.py
+++ b/resolver/rsv_recap_daily.py
@@ -1,6 +1,6 @@
 # Recapitulate the tables per CC/AS.
 # Produce one file for all CC/AS that have more than 1000 commands per day
-# 
+#
 
 import sys
 import os
@@ -14,7 +14,7 @@ import time
 import bz2
 from rsv_delay_class import delay_query_as
-import rsv_arguments 
+import rsv_arguments
 from rsv_recap import recap_log, recap_lines
 import concurrent.futures
@@ -27,7 +27,7 @@ def usage():
     print("in the output directory a file for each CC/AS that has > 10,000")
     print("events")
-    
+
 class file_bucket:
     def __init__(self, ip2a4, ip2a6, as_names, output_file, source_file, bucket_id, time_start):
         self.ip2a4 = ip2a4
@@ -74,23 +74,23 @@ def load_bucket(bucket):
         print("Invalid list of input files.")
         usage()
         exit(-1)
-    
+
     # load the IP mapping tables
     source_path = Path(__file__).resolve()
     resolver_dir = source_path.parent
     auto_source_dir = resolver_dir.parent
     print("Auto source path is: " + str(auto_source_dir) + " (source: " + str(source_path) + ")")
-    source_dir = os.path.join(auto_source_dir, "data") 
-    ip2a4_file = os.path.join(source_dir, "ip2as.csv") 
+    source_dir = os.path.join(auto_source_dir, "data")
+    ip2a4_file = os.path.join(source_dir, "ip2as.csv")
     ip2a6_file = os.path.join(source_dir, "ip2asv6.csv")
-    as_names_file = os.path.join(source_dir, "as_names.csv") 
+    as_names_file = os.path.join(source_dir, "as_names.csv")
     ip2a4 = ip2as.ip2as_table()
     ip2a4.load(ip2a4_file)
     ip2a6 = ip2as.ip2as_table()
     ip2a6.load(ip2a6_file)
     as_names = ip2as.asname()
     as_names.load(as_names_file)
-    time_loaded = time.time() 
+    time_loaded = time.time()
     print("Tables loaded at " + str(time_loaded - time_start) + " seconds.")
@@ -124,7 +124,7 @@ def load_bucket(bucket):
     bucket_time = time.time()
 
     # All the buckets have been processed. Now, create the tables.
-    
+
     rcl = recap_lines()
     for bucket in bucket_list:
         rcl.load_recap(bucket.output_file)
@@ -132,14 +132,14 @@ def load_bucket(bucket):
     nb_files = 0
     for key in rcl.cc_as_list:
         if rcl.cc_as_list[key].total_uids > 10000:
-            as_file = os.path.join(output_dir, "recap2-" + 
-                rcl.cc_as_list[key].query_cc + "-" + 
+            as_file = os.path.join(output_dir, "recap2-" +
+                rcl.cc_as_list[key].query_cc + "-" +
                 rcl.cc_as_list[key].query_AS + ".csv")
             rcl.cc_as_list[key].save_file(as_file)
             print("Saved: " + str(len(rcl.cc_as_list[key].slices)) + " slice in " + as_file)
             nb_files += 1
     print("Saved " + str(nb_files) + " CC/AS files.")
-    
+
     summary_file = os.path.join(output_dir, "recap2-summary.csv")
     df = rcl.summary_df()
     df.to_csv(summary_file)
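rsv_recap_daily.py fans its input logs out to a set of buckets, writes a partial recap per bucket, then merges everything through recap_lines. The shape of that pattern, reduced to a runnable sketch; parse_one() and the file names are hypothetical stand-ins for the real bucket work:

import concurrent.futures

def parse_one(path):
    # Stand-in for loading one log file into a partial summary dict.
    return {path: len(path)}

if __name__ == "__main__":
    log_files = ["log-a.bz2", "log-b.bz2", "log-c.bz2"]
    merged = {}
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Each bucket is processed independently, then merged serially,
        # mirroring the file_bucket / recap_lines split above.
        for partial in executor.map(parse_one, log_files):
            merged.update(partial)
    print(merged)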
diff --git a/resolver/rsv_recap_test_slices.py b/resolver/rsv_recap_test_slices.py
index 8e3b3c2..69a4ad0 100644
--- a/resolver/rsv_recap_test_slices.py
+++ b/resolver/rsv_recap_test_slices.py
@@ -10,7 +10,7 @@
 #import rsv_both_graphs
 import pandas as pd
 isp_A_slices = [
-    'nb_A_0ms_ISP', 
+    'nb_A_0ms_ISP',
     'nb_A_u10ms_ISP',
     'nb_A_u30ms_ISP',
     'nb_A_u100ms_ISP',
@@ -21,6 +21,42 @@
 nb_A_0ms = [ 'nb_A_0ms_ISP', 'nb_A_0ms_PDNS', 'nb_A_0ms_others']
 
+
+delay_columns = [
+    ['sum_deltas_A_PDNS_ISP', 'uids_A_PDNS_ISP', 'average_A_PDNS_ISP'],
+    ['sum_deltas_A_others_ISP', 'uids_A_others_ISP', 'average_A_others_ISP'],
+    ['sum_deltas_AAAA_PDNS_ISP', 'uids_AAAA_PDNS_ISP', 'average_AAAA_PDNS_ISP'],
+    ['sum_deltas_AAAA_others_ISP', 'uids_AAAA_others_ISP', 'average_AAAA_others_ISP'],
+    ['sum_deltas_HTTPS_PDNS_ISP', 'uids_HTTPS_PDNS_ISP', 'average_HTTPS_PDNS_ISP'],
+    ['sum_deltas_HTTPS_others_ISP', 'uids_HTTPS_others_ISP', 'average_HTTPS_others_ISP']]
+
+uids_columns = [
+    [ 'A_ISP_PDNS', 'A_all3', 'uids_A_PDNS_ISP' ],
+    [ 'A_ISP_others', 'A_all3', 'uids_A_others_ISP' ]]
+
+def check_average(r, headers):
+    if r[headers[1]] > 0:
+        average = r[headers[0]] / r[headers[1]]
+        if abs(average - r[headers[2]]) > 0.000001:
+            print("average " + headers[2] + " = " + str(r[headers[2]]) +
+                " != sum_deltas/uids = " + str(average))
+            print(r)
+            exit()
+    elif r[headers[2]] != 0:
+        print("average " + headers[2] + " = " + str(r[headers[2]]) +
+            " != 0 when uids is 0.")
+        print(r)
+        exit()
+
+def check_uids(r, headers):
+    uids = r[headers[0]] + r[headers[1]]
+    if uids != r[headers[2]]:
+        print("uids " + headers[2] + " = " + str(r[headers[2]]) +
+            " != sum of columns " + headers[0] + " and " + headers[1] +
+            " = " + str(uids))
+        print(r)
+        exit()
+
 def check_row(r):
     sum_slices = 0
     sum_uids = 0
@@ -43,6 +79,10 @@ def check_row(r):
         print("nb_A = " + str(r['nb_A']) + " != sum of 0ms slices " + str(sum_0ms))
         print(r)
         exit()
+    for headers in uids_columns:
+        check_uids(r, headers)
+    for headers in delay_columns:
+        check_average(r, headers)
 
 #
@@ -51,6 +91,3 @@ def check_row(r):
 #print(df.columns)
 df.apply(lambda row: check_row(row),axis=1)
 print("test pass.")
-
-
-
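The new checks can be exercised without a recap2-summary.csv on disk: a toy row whose average column really is sum_deltas/uids passes, and perturbing any of the three values trips the check. A minimal sketch of the same check_average() contract:

def check_average(r, headers):
    # Same contract as above: the average column must equal sum/uids,
    # and must be 0 when uids is 0.
    if r[headers[1]] > 0:
        assert abs(r[headers[0]] / r[headers[1]] - r[headers[2]]) <= 0.000001
    else:
        assert r[headers[2]] == 0

row = {'sum_deltas_A_PDNS_ISP': 3.0, 'uids_A_PDNS_ISP': 2,
       'average_A_PDNS_ISP': 1.5}
check_average(row, ['sum_deltas_A_PDNS_ISP', 'uids_A_PDNS_ISP',
                    'average_A_PDNS_ISP'])
print("toy row passes")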