23 commits
125622e
added seed to random choice
Jul 26, 2021
eb1788d
Merge branch 'aousterh:main' into main
ananyaks Jul 29, 2021
45317b0
Merge remote-tracking branch 'upstream/main' into main
Jul 29, 2021
39339f6
Merge branch 'main' of https://github.com/ananyaks/data_model into main
Jul 29, 2021
34b3253
Merge branch 'main' of https://github.com/ananyaks/data_model into main
ananyaks Jul 29, 2021
141f7d2
Merge remote-tracking branch 'upstream/main' into main
ananyaks Aug 1, 2021
517f8db
changed generate workload.py to sample from a list of IPs instead of …
ananyaks Aug 1, 2021
856ee9d
Removed 'id.orig_h' from being appended to each query name to allow f…
ananyaks Aug 2, 2021
7a39e7f
Merge remote-tracking branch 'upstream/main' into main
ananyaks Aug 3, 2021
5fa841c
Added script to issue elastic queries
ananyaks Aug 3, 2021
751d331
config edits to make script cleaner to run straight from repository
ananyaks Aug 3, 2021
090b346
Merge remote-tracking branch 'upstream/main' into main
ananyaks Aug 5, 2021
d13214f
Changes based on PR comments 1
ananyaks Aug 5, 2021
1220e9c
resolved merge conflict
Aug 12, 2021
20bb8b4
added flag for whether to generate list of unique IPs each time
Aug 12, 2021
947492e
fixed config file to run in repo
ananyaks Aug 13, 2021
84c6628
updated script to use search after
ananyaks Sep 9, 2021
901e52d
resolved merge conflict
ananyaks Sep 9, 2021
7f807d9
Merge branch 'aousterh:main' into main
ananyaks Sep 11, 2021
2338a86
included script with analytics and search queries handled, as well as…
ananyaks Sep 14, 2021
db6f838
Merge remote-tracking branch 'upstream/main' into main
ananyaks Sep 14, 2021
4432123
Merge branch 'main' of https://github.com/ananyaks/data_model into main
ananyaks Sep 14, 2021
ab3a827
updated elastic script to use scroll and fixed validation bug
ananyaks Sep 15, 2021
22 changes: 11 additions & 11 deletions benchmark/elastic/elastic_benchmark_config.py
100644 → 100755
@@ -1,19 +1,19 @@
DATA_DIR="../workload/trace/"

WORKLOAD_FILE_NAME="network_log_search_30.ndjson"
#WORKLOAD_FILE_NAME="network_log_search_needles_30.ndjson"
#WORKLOAD_FILE_NAME="network_log_search_needles_30_2.ndjson"
WORKLOAD_FILE_NAME="network_log_analytics_avg_30.ndjson"
#WORKLOAD_FILE_NAME="debug.ndjson"

OUTPUT_DIR="elastic-output/"

OUTPUT_FILE_NAME="query_execution_times.csv"
#OUTPUT_FILE_NAME="elastic_network_log_search_needles_30_execution_times.csv"
#OUTPUT_FILE_NAME="elastic_network_log_search_needles_30_2_execution_times.csv"
OUTPUT_FILE_NAME="elastic_network_log_analytics_avg_30_execution_times.csv"
#OUTPUT_FILE_NAME="debug.csv"

QUERIES = ["search id.orig_h",
"search id.orig_h + sort ts",
"search id.orig_h + sort ts + slice 5",
"search id.orig_h + count by id.resp_h",
"search id.orig_h + sum orig_bytes",
"search id.orig_h + count by schema"]
QUERIES = ["search id.orig_h","search id.orig_h + sort ts + slice 1000", "analytics avg field"]


AGGREGATION_FIELDS = {"search id.orig_h + count by id.resp_h": "id.resp_h",
                      "search id.orig_h + count by schema": "_path"}
90 changes: 57 additions & 33 deletions benchmark/elastic/issue_elastic_queries.py
@@ -3,6 +3,7 @@
import numpy as np
import os
import pandas as pd
import time

from elastic_benchmark_config import *

@@ -13,67 +14,80 @@
queries = QUERIES
aggregation_fields = AGGREGATION_FIELDS

def runQuery(query):
def runQuery(query, queryname):
    cmd = query + " > " + output_directory + "query-output.json"
    os.system(cmd)

    last_ports = []

    with open(output_directory + "query-output.json", 'r') as j:
        contents = json.loads(j.read())
        executiontime = contents["took"]
        hits = contents["hits"]["total"]["value"]
        if queryname == "analytics avg field":
            avg = contents["aggregations"]["avg_field"]["value"]
        else:
            avg = 0
        if queryname == "search id.orig_h + sort ts + slice 1000":
            for content in contents["hits"]["hits"]:
                last_ports.append(content["_source"]["id.orig_p"])
            last_port = last_ports.pop()
        else:
            last_port = 0
        ret = {"executiontime": executiontime * 0.001, "hits": hits, "avg": avg, "last_port": last_port}

    os.system("rm " + output_directory + "query-output.json")
    return {"executiontime": executiontime * 0.001, "hits": hits}
    return ret
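runQuery above assumes the standard Elasticsearch 7.x search-response layout when it pulls out took, the hit count, the average, and the last port. A trimmed response showing only the fields it reads (all values illustrative; the aggregations key appears only for the avg query):

    {
      "took": 42,
      "hits": {"total": {"value": 1337}, "hits": [{"_source": {"id.orig_p": 51230}}]},
      "aggregations": {"avg_field": {"value": 512.7}}
    }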

def getQuery(queryname, arguments):
# TODO: edit queries to return all results, not just top 10-1000
def getQuery(queryname, arguments, sortval):
    if queryname == "search id.orig_h":
        query = "curl -X GET 'http://localhost:9200/test/_search?q=id.orig_h:"+arguments[0]+"&format=json&pretty'"
    elif queryname == "search id.orig_h + sort ts":
        query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"sort\" : [\"ts\" ],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'"
    elif queryname == "search id.orig_h + sort ts + slice 5":
        query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"size\": 5,\"sort\" : [{\"ts\" : {\"order\": \"asc\" }}],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'"
    elif queryname == "search id.orig_h + count by id.resp_h":
        query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"id.resp_h\": {\"terms\": {\"field\": \"id.resp_h\"}}}}'"
    elif queryname == "search id.orig_h + sum orig_bytes":
        query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"orig_bytes\": {\"sum\": {\"field\": \"orig_bytes\"}}}}'"
    elif queryname == "search id.orig_h + count by schema":
        query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"schema\": {\"terms\": {\"field\": \"_path\"}}}}'"
        query = "curl -X POST \"localhost:9200/test/_search?scroll=1m&pretty\" -H 'Content-Type: application/json' -d'{\"query\": {\"term\" : {\"id.orig_h\" : \""+arguments[0]+"\"}}}'"
    elif queryname == "search id.orig_h + sort ts + slice 1000":
        query = "curl -X GET \"localhost:9200/test/_search?scroll=1m&pretty\" -H 'Content-Type: application/json' -d'{\"size\": 1000,\"sort\" : [{\"ts\" : {\"order\": \"asc\" }}],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'"
    elif queryname == "analytics avg field":
        query = "curl -X GET \"localhost:9200/test/_search?scroll=1m&pretty\" -H 'Content-Type: application/json' -d'{\"aggs\": {\"avg_field\": {\"avg\": { \"field\": \""+arguments[0]+"\"}}}}'"
    else:
        query = ""
    return query
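The scroll=1m parameter in the new queries only opens a scroll context; addressing the TODO above (returning all results, not just the first batch) would also require feeding each response's _scroll_id back into the _search/scroll endpoint until no hits remain. A minimal sketch in the script's own curl-through-os.system style (the helper name and output file are hypothetical):

    def continueScroll(scroll_id):
        # Fetch the next page of an open scroll context; a caller would loop
        # until the returned hits array comes back empty.
        cmd = ("curl -X POST \"localhost:9200/_search/scroll?pretty\" "
               "-H 'Content-Type: application/json' "
               "-d'{\"scroll\": \"1m\", \"scroll_id\": \"" + scroll_id + "\"}'"
               " > " + output_directory + "scroll-output.json")
        os.system(cmd)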

def issueQueries():
    real_list = []
    took_list = []
    query_name_list = []
    argument_list = []
    validation_list = []

    avg_list = []
    hits_list = []
    real_list = []
    port_list = []

    restartElastic()

    with open(input_directory + workload_file) as f:
        data = ndjson.load(f)
        for d in data:
            start_time = time.time()
            if (d['query'] in queries):
                queryname = d['query']
                query = getQuery(queryname, d['arguments'])
                query = getQuery(queryname, d['arguments'], 0)
                if query:
                    if queryname in aggregation_fields:
                        prepForAggregation(aggregation_fields[queryname])

                    queryoutput = runQuery(query)
                    real_list.append(queryoutput["executiontime"])
                    queryoutput = runQuery(query, queryname)

                    took_list.append(queryoutput['executiontime'])
                    query_name_list.append(d['query'])
                    argument_list.append(d['arguments'])
                    validation_list.append(queryoutput["hits"])
                    avg_list.append(queryoutput["avg"])
                    hits_list.append(queryoutput["hits"])
                    port_list.append(queryoutput["last_port"])

                    if queryname in aggregation_fields:
                        cleanFromAggregation(aggregation_fields[queryname])
                else:
                    print('Query Not Supported: ' + str(d['query']))
            else:
                print('Query Not Supported: ' + str(d['query']))

    num_queries = len(real_list)

    end_time = time.time()
    real_list.append(end_time - start_time)

    num_queries = len(took_list)
    output_df = pd.DataFrame({'index': range(num_queries)})
    output_df = output_df.set_index('index')

Expand All @@ -86,22 +100,32 @@ def issueQueries():
    output_df.insert(6, 'user', [np.nan] * num_queries)
    output_df.insert(7, 'sys', [np.nan] * num_queries)
    output_df.insert(8, 'argument_0', argument_list)
    output_df.insert(9, 'validation', validation_list)
    if any(avg_list):
        output_df.insert(9, 'validation', avg_list)
    elif any(port_list):
        output_df.insert(9, 'validation', port_list)
    else:
        output_df.insert(9, 'validation', hits_list)
    output_df.insert(10, 'instance', ["m5d.2xlarge"] * num_queries)
    output_df.insert(11, 'took', took_list)

    output_df.to_csv(output_directory + output_file, na_rep='NaN')
    return output_df


def restartElastic():
    os.system("sudo systemctl stop elasticsearch.service")
    print("Elastic Stopped")
    os.system("sudo systemctl start elasticsearch.service")
    print("Elastic Started")
    os.system("sleep 30")
    print("Sleep Complete")


# Aggregation prep/clean due to:
# "Text fields are not optimised for operations that require per-document field data like aggregations and sorting, so these operations are disabled by default.
# Please use a keyword field instead.
# Alternatively, set fielddata=true on [id.resp_h] in order to load field data by uninverting the inverted index.
# Note that this can use significant memory."

def prepForAggregation(fieldname):
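The body of prepForAggregation is collapsed in this view, but given the error message quoted above, the prep step presumably enables fielddata on the text field (and cleanFromAggregation disables it again). A sketch of that mapping update, assuming the index is named test as in the queries above (the function name marks it as illustrative):

    def prepForAggregationSketch(fieldname):
        # Illustrative only: enable fielddata so terms aggregations and sorting
        # work on a text field; note this can use significant heap memory.
        cmd = ("curl -X PUT \"localhost:9200/test/_mapping?pretty\" "
               "-H 'Content-Type: application/json' "
               "-d'{\"properties\": {\"" + fieldname + "\": "
               "{\"type\": \"text\", \"fielddata\": true}}}'")
        os.system(cmd)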