Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/execute/Set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ namespace Execute {
// memcached protocol: "set" means "store this data".
void Set::Execute(Storage &storage, const std::string &args, std::string &out) {
    // Store the payload under the key captured when the command was parsed.
    // NOTE(review): leftover debug std::cout prints (including a commented-out
    // "Lol1" line) removed — the server's reply travels through `out`, not
    // stdout, and debug tracing belongs to the logging service.
    storage.Put(_key, args);

    // memcached protocol reply; the transport layer appends "\r\n".
    out = "STORED";
}

Expand Down
2 changes: 1 addition & 1 deletion src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class Application {
console.color = true;

Logging::Logger &logger = logConfig->loggers["root"];
logger.level = Logging::Logger::Level::WARNING;
logger.level = Logging::Logger::Level::DEBUG;
logger.appenders.push_back("console");
logger.format = "[%H:%M:%S %z] [thread %t] [%n] [%l] %v";
logService.reset(new Logging::ServiceImpl(logConfig));
Expand Down
127 changes: 120 additions & 7 deletions src/network/mt_blocking/ServerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,40 @@ void ServerImpl::Start(uint16_t port, uint32_t n_accept, uint32_t n_workers) {
}

running.store(true);
_current_workers = 0;
_max_workers_count = n_workers;

_thread = std::thread(&ServerImpl::OnRun, this);
}

// See Server.h
void ServerImpl::Stop() {
    // Flip the run flag first so the accept loop and workers stop at their
    // next check of `running`.
    running.store(false);
    // shutdown() makes the blocking accept() on the listen socket fail,
    // waking the accept thread.
    shutdown(_server_socket, SHUT_RDWR);
    {
        std::unique_lock<std::mutex> lock(_count_changes);

        // Block until the last worker has decremented _current_workers and
        // signalled all_done; the predicate is re-checked on every wakeup.
        // NOTE(review): a worker blocked in read() on an idle client only
        // notices `running == false` after the client sends data or closes,
        // so Stop() may wait indefinitely — confirm whether client sockets
        // should be shut down here as well.
        while (running.load() || _current_workers) {
            all_done.wait(lock);
        }
    }
}

// See Server.h
void ServerImpl::Join() {
assert(_thread.joinable());
_thread.join();
close(_server_socket);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это должен делать поток accept

{
std::unique_lock<std::mutex> lock(_count_changes);

while (running.load() || _current_workers) {
all_done.wait(lock);
}
}
}

// See Server.h
void ServerImpl::OnRun() {
// Here is connection state
void ServerImpl::Worker(int client_socket) {
// - parser: parse state of the stream
// - command_to_execute: last command parsed out of stream
// - arg_remains: how many bytes to read from stream to get command argument
Expand All @@ -100,6 +115,100 @@ void ServerImpl::OnRun() {
Protocol::Parser parser;
std::string argument_for_command;
std::unique_ptr<Execute::Command> command_to_execute;

try {
int readed_bytes = -1;
char client_buffer[4096];
while (running.load() && (readed_bytes = read(client_socket, client_buffer, sizeof(client_buffer))) > 0) {
_logger->debug("Got {} bytes from socket", readed_bytes);

// Single block of data readed from the socket could trigger inside actions a multiple times,
// for example:
// - read#0: [<command1 start>]
// - read#1: [<command1 end> <argument> <command2> <argument for command 2> <command3> ... ]
while (running.load() && readed_bytes > 0) {
_logger->debug("Process {} bytes", readed_bytes);
// There is no command yet
if (!command_to_execute) {
std::size_t parsed = 0;
if (parser.Parse(client_buffer, readed_bytes, parsed)) {
// There is no command to be launched, continue to parse input stream
// Here we are, current chunk finished some command, process it
_logger->debug("Found new command: {} in {} bytes", parser.Name(), parsed);
command_to_execute = parser.Build(arg_remains);
if (arg_remains > 0) {
arg_remains += 2;
}
}

// Parsed might fails to consume any bytes from input stream. In real life that could happens,
// for example, because we are working with UTF-16 chars and only 1 byte left in stream
if (parsed == 0) {
break;
} else {
std::memmove(client_buffer, client_buffer + parsed, readed_bytes - parsed);
readed_bytes -= parsed;
}
}

// There is command, but we still wait for argument to arrive...
if (command_to_execute && arg_remains > 0) {
_logger->debug("Fill argument: {} bytes of {}", readed_bytes, arg_remains);
// There is some parsed command, and now we are reading argument
std::size_t to_read = std::min(arg_remains, std::size_t(readed_bytes));
argument_for_command.append(client_buffer, to_read);

std::memmove(client_buffer, client_buffer + to_read, readed_bytes - to_read);
arg_remains -= to_read;
readed_bytes -= to_read;
}

// Thre is command & argument - RUN!
if (command_to_execute && arg_remains == 0) {
_logger->debug("Start command execution on client_socket {}", client_socket);

std::string result;
command_to_execute->Execute(*pStorage, argument_for_command, result);
_logger->debug("finished execution on client_socket {}", client_socket);

// Send response
result += "\r\n";
if (send(client_socket, result.data(), result.size(), 0) <= 0) {
throw std::runtime_error("Failed to send response");
}

command_to_execute.reset();
argument_for_command.resize(0);
parser.Reset();
}
} // while (readed_bytes)
}

if (readed_bytes == 0) {
_logger->debug("Connection closed");
} else {
_logger->debug("Finish connection with readed_bytes {}", readed_bytes);
throw std::runtime_error(std::string(strerror(errno)));
}
} catch (std::runtime_error &ex) {
_logger->error("Failed to process connection on descriptor {}: {}", client_socket, ex.what());
}

// We are done with this connection
close(client_socket);
{
std::unique_lock<std::mutex> lock(_count_changes);
--_current_workers;
if (!_current_workers && !running.load()) {
all_done.notify_all();
}
}
}

// See Server.h
void ServerImpl::OnRun() {
// Here is connection state

while (running.load()) {
_logger->debug("waiting for connection...");

Expand Down Expand Up @@ -134,11 +243,15 @@ void ServerImpl::OnRun() {

// TODO: Start new thread and process data from/to connection
{
static const std::string msg = "TODO: start new thread and process memcached protocol instead";
if (send(client_socket, msg.data(), msg.size(), 0) <= 0) {
_logger->error("Failed to write response to client: {}", strerror(errno));
std::unique_lock<std::mutex> lock(_count_changes);

if (_current_workers < _max_workers_count && running.load()) {
++_current_workers;
std::thread worker(&ServerImpl::Worker, this, client_socket);
worker.detach();
} else {
close(client_socket);
}
close(client_socket);
}
}

Expand Down
17 changes: 17 additions & 0 deletions src/network/mt_blocking/ServerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#define AFINA_NETWORK_MT_BLOCKING_SERVER_H

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

#include <afina/network/Server.h>
Expand Down Expand Up @@ -39,6 +41,9 @@ class ServerImpl : public Server {
void OnRun();

private:
// Worker's logic
void Worker(int client_socket);

// Logger instance
std::shared_ptr<spdlog::logger> _logger;

Expand All @@ -47,6 +52,18 @@ class ServerImpl : public Server {
// bounds
std::atomic<bool> running;

// Count of workers
uint32_t _current_workers;

// Max workers count
uint32_t _max_workers_count;

// Mutex to safely change _current_workers and condition_variable
std::mutex _count_changes;

// condition_variable to notify all threads in Join()
std::condition_variable all_done;

// Server socket to accept connections on
int _server_socket;

Expand Down
142 changes: 137 additions & 5 deletions src/storage/SimpleLRU.cpp
Original file line number Diff line number Diff line change
@@ -1,22 +1,154 @@
#include "SimpleLRU.h"
#include <cassert>

namespace Afina {
namespace Backend {

bool SimpleLRU::_FreeSpace(std::size_t size) {
    // A single entry larger than the whole cache can never fit.
    if (size > _max_size) {
        return false;
    }
    // Evict least-recently-used entries (the list head) until `size` extra
    // bytes fit. Note: Delete() receives a reference to the head node's own
    // key; that string stays alive inside Delete() until the node itself is
    // destroyed, after the reference's last use.
    while (_current_size + size > _max_size) {
        Delete(_lru_head->key);
    }
    return true;
}

void SimpleLRU::_PutToTail(std::unique_ptr<lru_node> &&node) {
    // Move `node` to the tail (most-recently-used) position.
    //
    // List invariants: _lru_head->prev points to the tail; tail->next is
    // null; a single-element list has head->prev pointing at the head itself.
    // Precondition: the caller has already moved `node` out of its owning
    // slot (either _lru_head or node->prev->next), so that slot is null now.
    if (!node->next) {
        // `node` is the only element, or it is already the tail.
        if (!_lru_head) {
            // Sole element (head was moved out): put it back as head.
            _lru_head = std::move(node);
        } else {
            // Already the tail: re-attach it to its (now empty) owner slot.
            node->prev->next = std::move(node);
        }
    } else if (!_lru_head) {
        // `node` was the head of a multi-element list (head was moved out).
        // The second element becomes the new head; the old head is appended
        // after the old tail (node->prev, per the head->prev invariant).
        _lru_head = std::move(node->next);
        auto tail = node->prev;
        tail->next = std::move(node);
        // New head's prev already points at the old head, which is now the
        // tail — invariant holds; its moved-from `next` is cleared here.
        _lru_head->prev->next.reset();
    } else {
        // Interior element with the head intact: unlink it from its
        // neighbours, then splice it in after the current tail.
        node->next->prev = node->prev;
        node->prev->next = std::move(node->next);
        auto tail = _lru_head->prev;
        _lru_head->prev = node.get();
        node->prev = tail;
        tail->next = std::move(node);
    }
}

void SimpleLRU::_PutWithoutCheck(const std::string &key, const std::string &value) {
    // Insert a brand-new node at the tail (most-recently-used) position.
    // Precondition: key+value can fit into the cache (callers have checked).
    //
    // FIX: eviction must not run inside assert() — assert() is compiled out
    // under NDEBUG, which would silently skip _FreeSpace() and overflow the
    // cache. Call it unconditionally and only assert on the result.
    const bool fits = _FreeSpace(key.size() + value.size());
    assert(fits);
    (void)fits; // silence unused-variable warning in release builds

    auto new_node = std::unique_ptr<lru_node>(new lru_node(key, value));

    // The index keys by reference into the node's own key string.
    _lru_index.insert(std::make_pair(std::ref(new_node->key), std::ref(*new_node)));
    _current_size += key.size() + value.size();

    if (!_lru_head) {
        // Empty list: a single element points at itself via prev.
        new_node->prev = new_node.get();
        new_node->next.reset();
        _lru_head = std::move(new_node);
    } else {
        // Append after the current tail (head->prev), then update head->prev
        // to point at the freshly linked node.
        new_node->prev = _lru_head->prev;
        new_node->next.reset();
        _lru_head->prev->next = std::move(new_node);
        _lru_head->prev = _lru_head->prev->next.get();
    }
}

// See MapBasedGlobalLockImpl.h
bool SimpleLRU::Put(const std::string &key, const std::string &value) { return false; }
bool SimpleLRU::Put(const std::string &key, const std::string &value) {
    // Store `value` under `key`, overwriting any existing entry.
    // Returns false only when the pair can never fit, even in an empty cache.
    const std::size_t entry_size = key.size() + value.size();
    if (entry_size > _max_size) {
        return false;
    }

    // Overwrite an existing entry if there is one; otherwise insert fresh.
    if (Set(key, value)) {
        return true;
    }
    _PutWithoutCheck(key, value);
    return true;
}

// See MapBasedGlobalLockImpl.h
bool SimpleLRU::PutIfAbsent(const std::string &key, const std::string &value) { return false; }
bool SimpleLRU::PutIfAbsent(const std::string &key, const std::string &value) {
    // Insert only when `key` is not already present; never overwrites.
    if (key.size() + value.size() > _max_size) {
        return false;
    }

    const bool already_present = _lru_index.find(std::ref(key)) != _lru_index.end();
    if (already_present) {
        return false;
    }

    _PutWithoutCheck(key, value);
    return true;
}

// See MapBasedGlobalLockImpl.h
bool SimpleLRU::Set(const std::string &key, const std::string &value) { return false; }
bool SimpleLRU::Set(const std::string &key, const std::string &value) {
    // Update the value of an EXISTING key and mark it most-recently-used.
    // Returns false when the key is absent or the pair can never fit.
    if (key.size() + value.size() > _max_size) {
        return false;
    }

    auto iter = _lru_index.find(std::ref(key));
    if (iter == _lru_index.end()) {
        return false;
    }

    auto &node = iter->second.get();
    // Promote to tail first, so any eviction below starts from the head and
    // cannot touch this node.
    auto ptr = std::move((&node == _lru_head.get()) ? _lru_head : node.prev->next);
    _PutToTail(std::move(ptr));

    if (value.size() >= node.value.size()) {
        // Growing (or same size): evict LRU entries to make room for the
        // extra bytes, then account for them.
        std::size_t grow = value.size() - node.value.size();
        _FreeSpace(grow);
        _current_size += grow;
    } else {
        // FIX: shrinking must release the freed bytes. The original clamped
        // the delta to 0, so _current_size permanently overstated usage and
        // caused needless evictions later.
        _current_size -= node.value.size() - value.size();
    }
    node.value = value;
    return true;
}

// See MapBasedGlobalLockImpl.h
bool SimpleLRU::Delete(const std::string &key) { return false; }
bool SimpleLRU::Delete(const std::string &key) {
    // Remove the entry for `key` from both the index and the intrusive list.
    // Returns false when the key is absent.
    auto iter = _lru_index.find(std::ref(key));
    if (iter != _lru_index.end()) {
        auto &node = iter->second.get();
        // Erase from the index before unlinking: the map's key references
        // node.key, which stays alive until the node object is destroyed.
        _lru_index.erase(iter);
        _current_size -= node.key.size() + node.value.size();

        if (node.prev == &node) {
            // Single-element list (head->prev points at itself): drop it.
            _lru_head.reset();
        } else {
            if (&node == _lru_head.get()) {
                // Deleting the head: the second element becomes the head and
                // inherits prev = tail (node.prev) per the list invariant.
                node.next->prev = node.prev;
                _lru_head = std::move(node.next);
            } else if (!node.next) {
                // Deleting the tail: head->prev must point at the new tail,
                // and the predecessor's owning pointer is released.
                _lru_head->prev = node.prev;
                node.prev->next.reset();
            } else {
                // Interior node: splice the two neighbours together.
                node.next->prev = node.prev;
                node.prev->next = std::move(node.next);
            }
        }

        return true;
    }
    return false;
}

// See MapBasedGlobalLockImpl.h
bool SimpleLRU::Get(const std::string &key, std::string &value) { return false; }
bool SimpleLRU::Get(const std::string &key, std::string &value) {
    // Look up `key`; on a hit, promote the node to most-recently-used and
    // copy its value into `value`.
    const auto pos = _lru_index.find(std::ref(key));
    if (pos == _lru_index.end()) {
        return false;
    }

    auto &node = pos->second.get();
    // Move the node out of its owning slot (head or predecessor's next) and
    // re-link it at the tail.
    if (&node == _lru_head.get()) {
        _PutToTail(std::move(_lru_head));
    } else {
        _PutToTail(std::move(node.prev->next));
    }
    value = node.value;
    return true;
}

} // namespace Backend
} // namespace Afina
Loading