Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions project_4_client_server_analytics/analytics_lib/src/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use csv::StringRecord;
use csv_sniffer::{Sniffer, Type};
use crate::dataset::{ColumnType, Dataset, Row, Value};

//function just builds the csv file into a dataset
pub fn read_input_csv_file(filename: &str) -> Dataset {
// Sniff CSV column types.
let mut sniffer = Sniffer::new();
Expand Down
33 changes: 19 additions & 14 deletions project_4_client_server_analytics/analytics_lib/src/dataset.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
use serde::{Deserialize, Serialize};
use std::fmt::{Debug, Display};

/// The kind of data a dataset column is declared to hold.
///
/// Derives `Serialize`/`Deserialize` so column layouts can be serialized together with
/// the dataset that owns them.
#[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub enum ColumnType {
    /// Column declared as text.
    String,
    /// Column declared as integer.
    Integer,
}

/// A single cell of a row: either text or a 32-bit signed integer.
///
/// `Hash` + `Eq` let a `Value` be used as a `HashMap` key; the derived ordering compares
/// variants in declaration order first, so every `String` sorts before every `Integer`.
#[derive(Clone, PartialEq, Hash, Eq, Debug, PartialOrd, Ord, Serialize, Deserialize)]
pub enum Value {
    /// Text payload.
    String(String),
    /// Integer payload.
    Integer(i32),
}
//convert value into string
impl Value {
pub fn to_string(&self) -> String {
match self {
Expand All @@ -20,48 +24,49 @@ impl Value {
}
}

#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Row {
values: Vec<Value>,
}
impl Row {
pub fn new(values: Vec<Value>) -> Row {
return Row { values };
}
pub fn get_values(&self) -> &Vec<Value> {
pub fn get_values(&self) -> &Vec<Value> { //returns one row
return &self.values;
}
pub fn get_value(&self, index: usize) -> &Value {
pub fn get_value(&self, index: usize) -> &Value { //returns one value (either string or integer)
return &self.values[index];
}
pub fn move_values(self) -> Vec<Value> {
pub fn move_values(self) -> Vec<Value> { //moves values into new vector
return self.values;
}
}

// A table of data: a column layout plus the rows stored so far.
// Serialize/Deserialize are derived because the RPC interface returns Dataset values.
#[derive(Serialize, Deserialize, Clone)]
pub struct Dataset {
// One (column name, column type) pair per column.
columns: Vec<(String, ColumnType)>,
// Rows in the order they were pushed by `add_row`.
rows: Vec<Row>,
}
impl Dataset {
pub fn new(columns: Vec<(String, ColumnType)>) -> Dataset {
pub fn new(columns: Vec<(String, ColumnType)>) -> Dataset { //creates a new dataset
return Dataset {
columns,
rows: Vec::new(),
};
}
pub fn add_row(&mut self, row: Row) {
pub fn add_row(&mut self, row: Row) { //adds a new row
self.rows.push(row);
}

pub fn columns(&self) -> &Vec<(String, ColumnType)> {
pub fn columns(&self) -> &Vec<(String, ColumnType)> { //returns names of columns
return &self.columns;
}
pub fn column_type(&self, column_name: &String) -> &ColumnType {
pub fn column_type(&self, column_name: &String) -> &ColumnType { //returns if its string or integer
let i = self.column_index(column_name);
return &self.columns[i].1;
}
pub fn column_index(&self, column_name: &String) -> usize {
pub fn column_index(&self, column_name: &String) -> usize { //finds column needed and returns index
for i in 0..self.columns.len() {
let (cname, _ctype) = &self.columns[i];
if cname == column_name {
Expand All @@ -71,15 +76,15 @@ impl Dataset {
panic!("Column {} not found", column_name);
}

pub fn iter(&self) -> std::slice::Iter<'_, Row> {
pub fn iter(&self) -> std::slice::Iter<'_, Row> { // gives references to each row of dataset
return self.rows.iter();
}

pub fn into_iter(self) -> std::vec::IntoIter<Row> {
pub fn into_iter(self) -> std::vec::IntoIter<Row> { //gives you the actual rows of each dataset (moving it out)
return self.rows.into_iter();
}

pub fn len(&self) -> usize {
pub fn len(&self) -> usize { //finds how many rows are in dataset
return self.rows.len();
}
}
Expand Down
7 changes: 5 additions & 2 deletions project_4_client_server_analytics/analytics_lib/src/query.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use serde::{Deserialize, Serialize};
use crate::dataset::Value;

#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum Condition {
Equal(String, Value),
Not(Box<Condition>),
Not(Box<Condition>), //takes in conditions written on heap
And(Box<Condition>, Box<Condition>),
Or(Box<Condition>, Box<Condition>),
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum Aggregation {
Count(String),
Sum(String),
Expand All @@ -21,7 +24,7 @@ impl Aggregation {
}
}
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Query {
filter: Condition,
group_by: String,
Expand Down
92 changes: 88 additions & 4 deletions project_4_client_server_analytics/analytics_lib/src/solution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,102 @@ use std::collections::HashMap;
use crate::dataset::{ColumnType, Dataset, Value, Row};
use crate::query::{Aggregation, Condition, Query};

pub fn filter_dataset(dataset: &Dataset, filter: &Condition) -> Dataset {
todo!("Implement this!");

/// Recursively evaluates `condition` against a single `row`.
///
/// `dataset` is only consulted to translate column names into positions.
///
/// # Panics
/// Panics if an `Equal` condition names a column that `dataset` does not have.
pub fn row_matches(row: &Row, condition: &Condition, dataset: &Dataset) -> bool {
    match condition {
        // Leaf case: compare the cell in the named column with the wanted value.
        Condition::Equal(column, wanted) => {
            let position = dataset.column_index(column);
            let actual = row.get_value(position);
            actual == wanted
        }
        // Negation simply flips the inner result.
        Condition::Not(inner) => !row_matches(row, inner, dataset),
        // Conjunction: the right side is only evaluated when the left side holds.
        Condition::And(lhs, rhs) => {
            if !row_matches(row, lhs, dataset) {
                return false;
            }
            row_matches(row, rhs, dataset)
        }
        // Disjunction: the right side is only evaluated when the left side fails.
        Condition::Or(lhs, rhs) => {
            if row_matches(row, lhs, dataset) {
                return true;
            }
            row_matches(row, rhs, dataset)
        }
    }
}

/// Returns a new dataset with the same columns as `dataset`, containing clones of exactly
/// those rows for which `filter` holds, in their original order.
pub fn filter_dataset(dataset: &Dataset, filter: &Condition) -> Dataset {
    let mut kept = Dataset::new(dataset.columns().clone());
    dataset
        .iter()
        .filter(|candidate| row_matches(candidate, filter, dataset))
        // The input is only borrowed, so surviving rows have to be copied out.
        .cloned()
        .for_each(|matching| kept.add_row(matching));
    kept
}
//split dataset into many datasets inside a HashMap that are mapped using the values of whatever column name
pub fn group_by_dataset(dataset: Dataset, group_by_column: &String) -> HashMap<Value, Dataset> {
todo!("Implement this!");
//return copy of column name and type
let columns = dataset.columns().clone();
//gets column index with desired column name
let column_index = dataset.column_index(group_by_column);
let mut groups: HashMap<Value, Dataset> = HashMap::new();

//moves the row out of old dataset into new Hashmap
for row in dataset.into_iter() {
//take value at column index in each row
let key = row.get_value(column_index).clone();
//does it all in one: if already has a key then go to it, if not create a new empty dataset and add rows
groups.entry(key).or_insert_with(||Dataset::new(columns.clone())).add_row(row);
}
groups
}

/// Adds up every integer stored in column `col_name` of `dataset`.
///
/// Values that are not `Value::Integer` are skipped; a dataset without rows sums to 0.
fn sum_column(dataset: &Dataset, col_name: &String) -> i32 {
    let position = dataset.column_index(col_name);
    dataset
        .iter()
        .filter_map(|row| match row.get_value(position) {
            Value::Integer(number) => Some(*number),
            Value::String(_) => None,
        })
        .sum()
}

//performs math on the data, returns hashmap that maps whatever column into count, sum or avg
pub fn aggregate_dataset(dataset: HashMap<Value, Dataset>, aggregation: &Aggregation) -> HashMap<Value, Value> {
todo!("Implement this!");
//moves the values out of old hashmap into a new one
dataset.into_iter().map(|(key, group)| {
let agg_value = match aggregation {
Aggregation::Count(_) => {
Value::Integer(group.len() as i32) //just get len for count
}
Aggregation::Sum(col_name) => {
Value::Integer(sum_column(&group, col_name)) //use helper function
}
Aggregation::Average(col_name)=> {
let total = sum_column(&group, col_name);
Value::Integer(total / group.len() as i32)
}
};
(key, agg_value)
})
.collect()
}

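// Runs a whole query on a dataset: first filters the rows, then groups the survivors by the query's group-by column.
// Called by the client for slow_rpc (query computed client-side) and by the server for fast_rpc.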
pub fn compute_query_on_dataset(dataset: &Dataset, query: &Query) -> Dataset {
let filtered = filter_dataset(dataset, query.get_filter());
let grouped = group_by_dataset(filtered, query.get_group_by());
Expand Down
13 changes: 6 additions & 7 deletions project_4_client_server_analytics/client/src/solution.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use analytics_lib::{dataset::Dataset, query::Query, solution::compute_query_on_dataset};
use analytics_lib::{dataset::Dataset, query::Query, solution::compute_query_on_dataset};
use interface::RPCInterfaceClient;
use tarpc::context::Context;

Expand All @@ -9,17 +9,16 @@ pub async fn run_hello(rpc_client: &RPCInterfaceClient) {

pub async fn run_slow_rpc(rpc_client: &RPCInterfaceClient, query: Query) -> Dataset {
println!("using slow_rpc");

// let dataset = rpc_client.slow_rpc().await.unwrap(); // Hint: this line is incomplete, you may need to fix it!
todo!("Implement this");
let dataset = rpc_client.slow_rpc(Context::current()).await.unwrap(); // Hint: this line is incomplete, you may need to fix it!
//for slow rpc the query is computed on the client side!! = slow
return compute_query_on_dataset(&dataset, &query);

// What should you do to the dataset?
// Hint: you have not used `query`, maybe you need to use it somehow?
}

pub async fn run_fast_rpc(rpc_client: &RPCInterfaceClient, query: Query) -> Dataset {
println!("using fast_rpc");

// You should call fast_rpc here and not slow_rpc.
todo!("Implement this");
let dataset = rpc_client.fast_rpc(Context::current(), query).await.unwrap();
return dataset;
}
4 changes: 2 additions & 2 deletions project_4_client_server_analytics/interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ use analytics_lib::{dataset::Dataset, query::Query};
// RPC contract shared by client and server; the tarpc service attribute generates the
// client type (used as RPCInterfaceClient on the client side) from this trait.
#[tarpc::service]
pub trait RPCInterface {
// Returns a String produced by the server.
async fn hello() -> String;
// Returns a Dataset without taking a query; the client evaluates its query locally.
async fn slow_rpc() -> Dataset;
// Sends the query along so the server can evaluate it and return the resulting Dataset.
async fn fast_rpc(query: Query) -> Dataset;
}
6 changes: 0 additions & 6 deletions project_4_client_server_analytics/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,12 @@ impl RPCInterface for AnalyticsServer {
/// Handles the `hello` RPC by delegating to `solution::hello`; the context is unused.
async fn hello(self, _context: tarpc::context::Context) -> String {
    solution::hello()
}

/*
async fn slow_rpc(self, _context: tarpc::context::Context) -> analytics_lib::dataset::Dataset {
return solution::slow_rpc(self.dataset);
}
*/

/*
async fn fast_rpc(self, _context: tarpc::context::Context, query: analytics_lib::query::Query) -> analytics_lib::dataset::Dataset {
return solution::fast_rpc(self.dataset, query);
}
*/
}

// Do not modify this code.
Expand Down
8 changes: 5 additions & 3 deletions project_4_client_server_analytics/server/src/solution.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use analytics_lib::{dataset::Dataset, query::Query};
use analytics_lib::{dataset::Dataset, query::Query, solution::compute_query_on_dataset};

pub fn hello() -> String {
println!("hello called");
Expand All @@ -7,10 +7,12 @@ pub fn hello() -> String {

pub fn slow_rpc(input_dataset: &Dataset) -> Dataset {
println!("slow_rpc called");
todo!("Implement this");
return input_dataset.clone();
}

pub fn fast_rpc(input_dataset: &Dataset, query: Query) -> Dataset {
println!("fast_rpc called");
todo!("Implement this");
//diff is that it computes query and generates new dataset ON THE SERVER SIDE!!
return compute_query_on_dataset(input_dataset, &query);

}