Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions client/python/anna/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,32 @@ def _prepare_data_request(self, keys):

return (req, tuples)

# Helper function to create a KeyRequest (see
# hydro-project/common/proto/anna.proto). Takes in a key name and returns a
# tuple containing a KeyRequest and a KeyTuple contained in that KeyRequest
# with response_address, request_id, and address_cache_size automatically
# populated.
def _prepare_delta_data_request(self, keys, serialized_previous):
req = KeyRequest()
req.request_id = self._get_request_id()
req.response_address = self.response_address

tuples = []

for key in keys:
# TODO: is tup included in req?
tup = req.tuples.add()
tuples.append(tup)
tup.key = key
tup.previous_payload = serialized_previous
tup.delta = True

if self.address_cache and key in self.address_cache:
tup.address_cache_size = len(self.address_cache[key])

return (req, tuples)


# Returns and increments a request ID. Loops back after 10,000 requests.
def _get_request_id(self):
response = self.ut.get_ip() + ':' + str(self.rid)
Expand Down
74 changes: 74 additions & 0 deletions client/python/anna/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@
SocketCache
)

from anna.lattice import (
LWWPairLattice,
SetLattice
)

from anna.anna_pb2 import (
# Anna's lattice types as an enum
LWW, SET)



class AnnaTcpClient(BaseAnnaClient):
def __init__(self, elb_addr, ip, local=False, offset=0):
Expand Down Expand Up @@ -76,6 +86,8 @@ def __init__(self, elb_addr, ip, local=False, offset=0):

self.rid = 0

self.cache = {}

def get(self, keys):
if type(keys) != list:
keys = [keys]
Expand Down Expand Up @@ -115,6 +127,68 @@ def get(self, keys):

return kv_pairs

def get_delta(self, keys):
if type(keys) != list:
keys = [keys]

worker_addresses = {}
for key in keys:
worker_addresses[key] = (self._get_worker_address(key))

# Initialize all KV pairs to 0. Only change a value if we get a valid
# response from the server.
kv_pairs = {}
for key in keys:
kv_pairs[key] = None

request_ids = []
for key in worker_addresses:
if key in self.cache:
# TODO: if the key is in cache,
# send the previous_payplad along with the request
kv_pairs[key] = self.cache[key]

if worker_addresses[key]:
send_sock = self.pusher_cache.get(worker_addresses[key])

req, _ = self._prepare_delta_data_request([key], self._serialize(self.cache[key])[0])
req.type = GET

send_request(req, send_sock)
request_ids.append(req.request_id)

else:
if worker_addresses[key]:
send_sock = self.pusher_cache.get(worker_addresses[key])

req, _ = self._prepare_data_request([key])
req.type = GET

send_request(req, send_sock)
request_ids.append(req.request_id)

# Wait for all responses to return.
responses = recv_response(request_ids, self.response_puller,
KeyResponse)

for response in responses:
for tup in response.tuples:
if tup.invalidate:
self._invalidate_cache(tup.key)

lattice_value = self._deserialize(tup)

if (lattice_value.identical):
lattice_value = self.cache[key]

if tup.error == NO_ERROR:
kv_pairs[tup.key] = lattice_value
cache[tup.key] = lattice_value

return kv_pairs



def get_all(self, keys):
if type(keys) != list:
keys = [keys]
Expand Down
2 changes: 1 addition & 1 deletion common
2 changes: 1 addition & 1 deletion include/kvs/kvs_handlers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ void send_gossip(AddressKeysetMap &addr_keyset_map, SocketCache &pushers,
map<Key, KeyProperty> &stored_key_map);

std::pair<string, AnnaError> process_get(const Key &key,
Serializer *serializer);
Serializer *serializer, bool delta = false, const string& previous_payload = "");

void process_put(const Key &key, LatticeType lattice_type,
const string &payload, Serializer *serializer,
Expand Down
57 changes: 41 additions & 16 deletions include/kvs/server_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "kvs_common.hpp"
#include "lattices/lww_pair_lattice.hpp"
#include "yaml-cpp/yaml.h"
#include "metadata.hpp"

// Define the garbage collect threshold
#define GARBAGE_COLLECT_THRESHOLD 10000000
Expand All @@ -48,7 +49,7 @@ typedef map<Address, set<Key>> AddressKeysetMap;

class Serializer {
public:
virtual string get(const Key &key, AnnaError &error) = 0;
virtual string get(const Key &key, AnnaError &error, bool delta = false, const string &previous_payload = "") = 0;
virtual unsigned put(const Key &key, const string &serialized) = 0;
virtual void remove(const Key &key) = 0;
virtual ~Serializer(){};
Expand All @@ -60,14 +61,24 @@ class MemoryLWWSerializer : public Serializer {
public:
MemoryLWWSerializer(MemoryLWWKVS *kvs) : kvs_(kvs) {}

string get(const Key &key, AnnaError &error) {
string get(const Key &key, AnnaError &error, bool delta = false, const string &previous_payload = "") {

auto val = kvs_->get(key, error);

if (val.reveal().value == "") {
if (val.size().reveal() == 0) {
error = AnnaError::KEY_DNE;
}

if (!delta) {
return serialize(val);
} else {
if (val.reveal().timestamp != deserialize_lww(previous_payload).reveal().timestamp) {
return serialize(val);
} else {
return kDeltaRequestIdentical;
}
}

return serialize(val);
}

unsigned put(const Key &key, const string &serialized) {
Expand All @@ -85,12 +96,24 @@ class MemorySetSerializer : public Serializer {
public:
MemorySetSerializer(MemorySetKVS *kvs) : kvs_(kvs) {}

string get(const Key &key, AnnaError &error) {
string get(const Key &key, AnnaError &error, bool delta = false, const string &previous_payload = "") {
auto val = kvs_->get(key, error);

if (val.size().reveal() == 0) {
error = AnnaError::KEY_DNE;
}
return serialize(val);

if (!delta) {
return serialize(val);
} else {
SetLattice<string> deserialized_payload = deserialize_set(previous_payload);

if (val.reveal() != deserialized_payload.reveal()) {
return serialize(val);
} else {
return kDeltaRequestIdentical;
}
}
}

unsigned put(const Key &key, const string &serialized) {
Expand All @@ -108,7 +131,7 @@ class MemoryOrderedSetSerializer : public Serializer {
public:
MemoryOrderedSetSerializer(MemoryOrderedSetKVS *kvs) : kvs_(kvs) {}

string get(const Key &key, AnnaError &error) {
string get(const Key &key, AnnaError &error, bool delta = false, const string &previous_payload = "") {
auto val = kvs_->get(key, error);
if (val.size().reveal() == 0) {
error = AnnaError::KEY_DNE;
Expand All @@ -131,7 +154,7 @@ class MemorySingleKeyCausalSerializer : public Serializer {
public:
MemorySingleKeyCausalSerializer(MemorySingleKeyCausalKVS *kvs) : kvs_(kvs) {}

string get(const Key &key, AnnaError &error) {
string get(const Key &key, AnnaError &error, bool delta = false, const string &previous_payload = "") {
auto val = kvs_->get(key, error);
if (val.reveal().value.size().reveal() == 0) {
error = AnnaError::KEY_DNE;
Expand All @@ -156,7 +179,7 @@ class MemoryMultiKeyCausalSerializer : public Serializer {
public:
MemoryMultiKeyCausalSerializer(MemoryMultiKeyCausalKVS *kvs) : kvs_(kvs) {}

string get(const Key &key, AnnaError &error) {
string get(const Key &key, AnnaError &error, bool delta = false, const string &previous_payload = "") {
auto val = kvs_->get(key, error);
if (val.reveal().value.size().reveal() == 0) {
error = AnnaError::KEY_DNE;
Expand All @@ -182,7 +205,7 @@ class MemoryPrioritySerializer : public Serializer {
public:
MemoryPrioritySerializer(MemoryPriorityKVS *kvs) : kvs_(kvs) {}

string get(const Key &key, AnnaError &error) {
string get(const Key &key, AnnaError &error, bool delta = false, const string &previous_payload = "") {
auto val = kvs_->get(key, error);
if (val.reveal().value == "") {
error = AnnaError::KEY_DNE;
Expand Down Expand Up @@ -214,7 +237,7 @@ class DiskLWWSerializer : public Serializer {
}
}

string get(const Key &key, AnnaError &error) {
string get(const Key &key, AnnaError &error, bool delta = false, const string &previous_payload = "") {
string res;
LWWValue value;

Expand Down Expand Up @@ -299,7 +322,7 @@ class DiskSetSerializer : public Serializer {
}
}

string get(const Key &key, AnnaError &error) {
string get(const Key &key, AnnaError &error, bool delta = false, const string &previous_payload = "") {
string res;
SetValue value;

Expand Down Expand Up @@ -395,7 +418,7 @@ class DiskOrderedSetSerializer : public Serializer {
}
}

string get(const Key &key, AnnaError &error) {
string get(const Key &key, AnnaError &error, bool delta = false, const string &previous_payload = "") {
string res;
SetValue value;

Expand Down Expand Up @@ -491,7 +514,7 @@ class DiskSingleKeyCausalSerializer : public Serializer {
}
}

string get(const Key &key, AnnaError &error) {
string get(const Key &key, AnnaError &error, bool delta = false, const string &previous_payload = "") {
string res;
SingleKeyCausalValue value;

Expand Down Expand Up @@ -608,7 +631,7 @@ class DiskMultiKeyCausalSerializer : public Serializer {
}
}

string get(const Key &key, AnnaError &error) {
string get(const Key &key, AnnaError &error, bool delta = false, const string &previous_payload = "") {
string res;
MultiKeyCausalValue value;

Expand Down Expand Up @@ -761,7 +784,7 @@ class DiskPrioritySerializer : public Serializer {
ebs_root_ += "/";
}

string get(const Key &key, AnnaError &error) override {
string get(const Key &key, AnnaError &error, bool delta = false, const string &previous_payload = "") override {
string res;
PriorityValue value;

Expand Down Expand Up @@ -842,3 +865,5 @@ struct PendingGossip {
};

#endif // INCLUDE_KVS_SERVER_UTILS_HPP_


1 change: 1 addition & 0 deletions include/metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "threads.hpp"

const string kMetadataTypeReplication = "replication";
const string kDeltaRequestIdentical = "identical";

struct TierEnumHash {
template <typename T> std::size_t operator()(T t) const {
Expand Down
16 changes: 13 additions & 3 deletions src/kvs/user_request_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include "kvs/kvs_handlers.hpp"
#include "metadata.hpp"

void user_request_handler(
unsigned &access_count, unsigned &seed, string &serialized, logger log,
Expand All @@ -39,6 +40,8 @@ void user_request_handler(
// first check if the thread is responsible for the key
Key key = tuple.key();
string payload = tuple.payload();
bool delta = tuple.delta();
const string previous_payload = tuple.previous_payload();

ServerThreadList threads = kHashRingUtil->get_responsible_threads(
wt.replication_response_connect_address(), key, is_metadata(key),
Expand Down Expand Up @@ -74,12 +77,19 @@ void user_request_handler(
if (stored_key_map.find(key) == stored_key_map.end() ||
stored_key_map[key].type_ == LatticeType::NONE) {

tp->set_error(AnnaError::KEY_DNE);
tp->set_error(AnnaError::KEY_DNE);
} else {
auto res = process_get(key, serializers[stored_key_map[key].type_]);

auto res = process_get(key, serializers[stored_key_map[key].type_], delta, previous_payload);
tp->set_lattice_type(stored_key_map[key].type_);
tp->set_payload(res.first);
if (res.first == kDeltaRequestIdentical) {
tp->set_identical(true);

} else {
tp->set_payload(res.first);
}
tp->set_error(res.second);

}
} else if (request_type == RequestType::PUT) {
if (tuple.lattice_type() == LatticeType::NONE) {
Expand Down
6 changes: 4 additions & 2 deletions src/kvs/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ void send_gossip(AddressKeysetMap &addr_keyset_map, SocketCache &pushers,
type = stored_key_map[key].type_;
}



auto res = process_get(key, serializers[type]);

if (res.second == 0) {
Expand All @@ -51,9 +53,9 @@ void send_gossip(AddressKeysetMap &addr_keyset_map, SocketCache &pushers,
}

std::pair<string, AnnaError> process_get(const Key &key,
Serializer *serializer) {
Serializer *serializer, bool delta, const string &previous_payload) {
AnnaError error = AnnaError::NO_ERROR;
auto res = serializer->get(key, error);
auto res = serializer->get(key, error, delta, previous_payload);
return std::pair<string, AnnaError>(std::move(res), error);
}

Expand Down
Loading