Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 119 additions & 12 deletions include/afina/concurrency/Executor.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#ifndef AFINA_CONCURRENCY_EXECUTOR_H
#define AFINA_CONCURRENCY_EXECUTOR_H

#include <cassert>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
Expand All @@ -16,6 +18,7 @@ namespace Concurrency {
* # Thread pool
*/
class Executor {
public:
enum class State {
// Threadpool is fully operational, tasks could be added and get executed
kRun,
Expand All @@ -27,17 +30,47 @@ class Executor {
// Threadpool is stopped
kStopped
};
Executor(uint32_t low_watermark, uint32_t hight_watermark, uint32_t max_queue_size, uint32_t idle_time)
: low_watermark(low_watermark), hight_watermark(hight_watermark), max_queue_size(max_queue_size),
idle_time(idle_time) {
state = State::kRun;
_current_workers = 0;
//_logger->debug("Threadpool watermarks is {} {}", low_watermark, hight_watermark);
for (uint32_t i = 0; i != low_watermark; ++i) {
std::unique_lock<std::mutex> lock(_queue_modify);
std::thread th(&Executor::perform, this);
//_logger->debug("Creating thread {}/{}, id {}", i + 1, low_watermark, th.get_id());
th.detach();
++_current_workers;
}
}

Executor(std::string name, int size);
~Executor();
~Executor() {
assert(low_watermark < hight_watermark && low_watermark > 0);
Stop(true);
}

/**
* Signal thread pool to stop, it will stop accepting new jobs and close threads just after each become
* free. All enqueued jobs will be complete.
*
* In case if await flag is true, call won't return until all background jobs are done and all threads are stopped
*/
void Stop(bool await = false);
void Stop(bool await = false) {
std::unique_lock<std::mutex> lock(_queue_modify);
if (state == State::kStopped) {
return;
}
state = State::kStopping;
if (tasks.empty()) {
empty_condition.notify_all();
}
if (await) {
while (state != State::kStopped) {
had_finished.wait(lock);
}
}
}

/**
* Add function to be executed on the threadpool. Method returns true in case if task has been placed
Expand All @@ -50,39 +83,111 @@ class Executor {
// Prepare "task"
auto exec = std::bind(std::forward<F>(func), std::forward<Types>(args)...);

std::unique_lock<std::mutex> lock(this->mutex);
std::unique_lock<std::mutex> lock(_queue_modify);
if (state != State::kRun) {
return false;
}

// Enqueue new task
if (tasks.size() > max_queue_size) {
return false;
}
tasks.push_back(exec);
empty_condition.notify_one();
return true;
}

private:
// No copy/move/assign allowed
Executor(const Executor &); // = delete;
Executor(Executor &&); // = delete;
Executor &operator=(const Executor &); // = delete;
Executor &operator=(Executor &&); // = delete;
Executor(const Executor &) = delete;
Executor(Executor &&) = delete;
Executor &operator=(const Executor &) = delete;
Executor &operator=(Executor &&) = delete;

/**
* Main function that all pool threads are running. It polls internal task queue and execute tasks
* Method that all pool threads are running. It polls internal task queue and execute tasks
*/
friend void perform(Executor *executor);
    void perform() {
        // Worker main loop: pop a task under the lock, run it outside the lock.
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(_queue_modify);
                if (!tasks.empty()) {
                    // Fast path: work is already queued — grab it.
                    task = tasks.front();
                    tasks.pop_front();
                    //_logger->debug("Thread id {} has got task", std::this_thread::get_id());
                } else if (tasks.empty() && state == State::kStopping) {
                    // Shutdown path: queue is drained and Stop() was called.
                    // The last worker out flips state to kStopped and wakes
                    // Stop(await = true) via had_finished.
                    --_current_workers;
                    empty_condition.notify_all();
                    if (!_current_workers) {
                        state = State::kStopped;
                        had_finished.notify_all();
                    }
                    return;
                } else {
                    // Idle path: wait for work, with an idle timeout that can
                    // shrink the pool back toward low_watermark.
                    while (tasks.empty()) {
                        // NOTE(review): `now` is captured once per outer iteration, so the
                        // wait_until deadline stays fixed across spurious wakeups of the
                        // inner loop — the timeout measures total idle time since entering
                        // the wait, not time since the last wakeup.
                        auto now = std::chrono::system_clock::now();
                        while (tasks.empty()) {
                            if (empty_condition.wait_until(lock, now + std::chrono::milliseconds(idle_time)) ==
                                std::cv_status::timeout) {
                                // Idle too long: retire this worker, but never drop below
                                // low_watermark. (_current_workers > low_watermark also
                                // guarantees this is not the last worker, so no kStopped
                                // bookkeeping is needed on this exit.)
                                if (_current_workers > low_watermark) {
                                    --_current_workers;
                                    //_logger->debug("Thread with id {} has been killed", std::this_thread::get_id());
                                    return;
                                }
                            }
                            // Woken (or timed out at the low-watermark floor) during
                            // shutdown with an empty queue: same last-worker bookkeeping
                            // as the fast shutdown path above.
                            if (tasks.empty() && state == State::kStopping) {
                                --_current_workers;
                                if (!_current_workers) {
                                    state = State::kStopped;
                                    had_finished.notify_all();
                                }
                                //_logger->debug("Thread with id {} has been killed", std::this_thread::get_id());
                                return;
                            }
                        }
                    }
                    task = tasks.front();
                    tasks.pop_front();
                    //_logger->debug("Thread with id {} has got task after wait", std::this_thread::get_id());
                }
            }
            {
                // Grow the pool: if work is still queued, the pool is running and we
                // are below the high watermark, spawn one extra detached worker
                // before executing our own task.
                std::unique_lock<std::mutex> lock(_queue_modify);
                if (!tasks.empty() && state == State::kRun && _current_workers < hight_watermark) {
                    ++_current_workers;
                    std::thread th(&Executor::perform, this);
                    //_logger->debug("Creating thread with id {}", th.get_id());
                    th.detach();
                }
            }
            // std::cout << "task execution" << std::endl;
            //_logger->debug("Thread id {} is processing task", std::this_thread::get_id());
            // Run the task with no lock held so long jobs never block the queue.
            task();
        }
    }

/**
* Mutex to protect state below from concurrent modification
* Mutex to protect state and tasks below from concurrent modification
*/
std::mutex mutex;
std::mutex _queue_modify;

/**
* Conditional variable to await new data in case of empty queue
*/
std::condition_variable empty_condition;

uint32_t _current_workers;
uint32_t low_watermark;
uint32_t hight_watermark;
uint32_t max_queue_size;
uint32_t idle_time;

/**
* Conditional variable to inform thread with Stop function that all work has been done
*/
std::condition_variable had_finished;

/**
 * Vector of actual threads that perform execution
*/
Expand All @@ -97,6 +202,8 @@ class Executor {
* Flag to stop bg threads
*/
State state;

// std::shared_ptr<Logging::Service> _logger;
};

} // namespace Concurrency
Expand Down
4 changes: 2 additions & 2 deletions src/concurrency/Executor.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#include <afina/concurrency/Executor.h>
#include "afina/concurrency/Executor.h"

namespace Afina {
namespace Concurrency {}
namespace Concurrency {} // namespace Concurrency
} // namespace Afina
5 changes: 5 additions & 0 deletions src/execute/Set.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <afina/Storage.h>
#include <afina/execute/Set.h>
#include <thread>

#include <iostream>

Expand All @@ -9,7 +10,11 @@ namespace Execute {
// memcached protocol: "set" means "store this data".
// memcached protocol: "set" means "store this data".
// Stores `args` under `_key` in the given storage and reports "STORED" via `out`.
// Removed dead commented-out debug code (a "Lol1" print and a 10 s sleep) that
// was committed with the PR.
void Set::Execute(Storage &storage, const std::string &args, std::string &out) {
    // NOTE(review): both std::cout prints here look like debug leftovers —
    // consider routing them through the logging service instead of writing to
    // stdout on every request.
    std::cout << "Set(" << _key << "): " << args << std::endl;
    storage.Put(_key, args);
    std::cout << "STORED" << std::endl;
    out = "STORED";
}

Expand Down
2 changes: 1 addition & 1 deletion src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class Application {
console.color = true;

Logging::Logger &logger = logConfig->loggers["root"];
logger.level = Logging::Logger::Level::WARNING;
logger.level = Logging::Logger::Level::DEBUG;
logger.appenders.push_back("console");
logger.format = "[%H:%M:%S %z] [thread %t] [%n] [%l] %v";
logService.reset(new Logging::ServiceImpl(logConfig));
Expand Down
109 changes: 99 additions & 10 deletions src/network/mt_blocking/ServerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,31 @@ void ServerImpl::Start(uint16_t port, uint32_t n_accept, uint32_t n_workers) {
}

running.store(true);
executor.reset();
// TODO make it configurable
auto new_ex = std::unique_ptr<Concurrency::Executor>(
new Concurrency::Executor(std::max(n_workers / 2, uint32_t(1)), n_workers, 100, 10000));
executor = std::move(new_ex);

_thread = std::thread(&ServerImpl::OnRun, this);
}

// See Server.h
// See Server.h
// Signals the accept loop to exit and drains the worker pool. Safe to call
// even if Start() was never invoked.
void ServerImpl::Stop() {
    running.store(false);
    // Unblock the accept() in OnRun() so the loop observes running == false.
    shutdown(_server_socket, SHUT_RDWR);
    // Guard the dereference: executor is only populated by Start(); calling
    // Stop() on a never-started server must not crash on a null unique_ptr.
    if (executor) {
        executor->Stop(true);
    }
}

// See Server.h
// See Server.h
// Blocks until the accept thread finishes, then releases the listening socket
// and makes sure all pool workers are done.
void ServerImpl::Join() {
    assert(_thread.joinable());
    _thread.join();
    close(_server_socket);
    // Guard the dereference: executor is only populated by Start(); Join() on a
    // never-started server must not crash on a null unique_ptr.
    if (executor) {
        executor->Stop(true);
    }
}

// See Server.h
void ServerImpl::OnRun() {
// Here is connection state
void ServerImpl::Worker(int client_socket) {
// - parser: parse state of the stream
// - command_to_execute: last command parsed out of stream
// - arg_remains: how many bytes to read from stream to get command argument
Expand All @@ -100,6 +106,93 @@ void ServerImpl::OnRun() {
Protocol::Parser parser;
std::string argument_for_command;
std::unique_ptr<Execute::Command> command_to_execute;

try {
int readed_bytes = -1;
char client_buffer[4096];
while (running.load() && (readed_bytes = read(client_socket, client_buffer, sizeof(client_buffer))) > 0) {
_logger->debug("Got {} bytes from socket", readed_bytes);

// Single block of data readed from the socket could trigger inside actions a multiple times,
// for example:
// - read#0: [<command1 start>]
// - read#1: [<command1 end> <argument> <command2> <argument for command 2> <command3> ... ]
while (running.load() && readed_bytes > 0) {
_logger->debug("Process {} bytes", readed_bytes);
// There is no command yet
if (!command_to_execute) {
std::size_t parsed = 0;
if (parser.Parse(client_buffer, readed_bytes, parsed)) {
// There is no command to be launched, continue to parse input stream
// Here we are, current chunk finished some command, process it
_logger->debug("Found new command: {} in {} bytes", parser.Name(), parsed);
command_to_execute = parser.Build(arg_remains);
if (arg_remains > 0) {
arg_remains += 2;
}
}

// Parsed might fails to consume any bytes from input stream. In real life that could happens,
// for example, because we are working with UTF-16 chars and only 1 byte left in stream
if (parsed == 0) {
break;
} else {
std::memmove(client_buffer, client_buffer + parsed, readed_bytes - parsed);
readed_bytes -= parsed;
}
}

// There is command, but we still wait for argument to arrive...
if (command_to_execute && arg_remains > 0) {
_logger->debug("Fill argument: {} bytes of {}", readed_bytes, arg_remains);
// There is some parsed command, and now we are reading argument
std::size_t to_read = std::min(arg_remains, std::size_t(readed_bytes));
argument_for_command.append(client_buffer, to_read);

std::memmove(client_buffer, client_buffer + to_read, readed_bytes - to_read);
arg_remains -= to_read;
readed_bytes -= to_read;
}

// Thre is command & argument - RUN!
if (command_to_execute && arg_remains == 0) {
_logger->debug("Start command execution on client_socket {}", client_socket);

std::string result;
command_to_execute->Execute(*pStorage, argument_for_command, result);
_logger->debug("finished execution on client_socket {}", client_socket);

// Send response
result += "\r\n";
if (send(client_socket, result.data(), result.size(), 0) <= 0) {
throw std::runtime_error("Failed to send response");
}

command_to_execute.reset();
argument_for_command.resize(0);
parser.Reset();
}
} // while (readed_bytes)
}

if (readed_bytes == 0) {
_logger->debug("Connection closed");
} else {
_logger->debug("Finish connection with readed_bytes {}", readed_bytes);
throw std::runtime_error(std::string(strerror(errno)));
}
} catch (std::runtime_error &ex) {
_logger->error("Failed to process connection on descriptor {}: {}", client_socket, ex.what());
}

// We are done with this connection
close(client_socket);
}

// See Server.h
void ServerImpl::OnRun() {
// Here is connection state

while (running.load()) {
_logger->debug("waiting for connection...");

Expand Down Expand Up @@ -134,18 +227,14 @@ void ServerImpl::OnRun() {

// TODO: Start new thread and process data from/to connection
{
static const std::string msg = "TODO: start new thread and process memcached protocol instead";
if (send(client_socket, msg.data(), msg.size(), 0) <= 0) {
_logger->error("Failed to write response to client: {}", strerror(errno));
}
close(client_socket);
if (!running.load() || !executor->Execute(&ServerImpl::Worker, this, client_socket))
close(client_socket);
}
}

// Cleanup on exit...
_logger->warn("Network stopped");
}

} // namespace MTblocking

} // namespace Network
} // namespace Afina
Loading