diff --git a/CMakeLists.txt b/CMakeLists.txt index fb577ab..b423600 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -24,11 +24,13 @@ set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED True) #set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g3") +add_compile_definitions(TBB_USE_THREADING_TOOLS=1) find_package(ROOT REQUIRED COMPONENTS Core RIO Tree) find_package(TBB REQUIRED) find_package(zstd REQUIRED) find_package(lz4 REQUIRED) +find_package(LibLZMA REQUIRED) set(THREADS_PREFER_PTHREAD_FLAG ON) find_package(Threads REQUIRED) @@ -84,6 +86,7 @@ add_executable(threaded_io_test outputerFactoryGenerator.cc WaiterFactory.cc waiterFactoryGenerator.cc + WaitingTaskList.cc ScaleWaiter.cc EventSleepWaiter.cc EventUnevenSleepWaiter.cc @@ -99,6 +102,7 @@ target_compile_definitions(threaded_io_test PUBLIC TBB_PREVIEW_TASK_GROUP_EXTENS target_link_libraries(threaded_io_test PRIVATE LZ4::lz4 + LibLZMA::LibLZMA ROOT::Core ROOT::RIO ROOT::Tree @@ -188,3 +192,26 @@ if(ENABLE_HDF5) add_test(NAME TestProductsHDFEvent COMMAND bash -c "${CMAKE_CURRENT_BINARY_DIR}/threaded_io_test -s TestProductsSource -t 1 -n 10 -o HDFEventOutputer=test_prod_e.h5") #; ${CMAKE_CURRENT_BINARY_DIR}/threaded_io_test -s HDFSource=test_prodi_e.h5 -t 1 -n 10 -o TestProductsOutputer") endif() + +option(ENABLE_S3 "Build S3 Sources and Outputers" OFF) # default OFF +if(ENABLE_S3) + if(NOT DEFINED LIBS3_DIR) + message(FATAL_ERROR "You must provide LIBS3_DIR variable") + endif() + find_package(CURL REQUIRED) + find_package(Protobuf REQUIRED) + include_directories(${Protobuf_INCLUDE_DIRS}) + include_directories(${CMAKE_CURRENT_BINARY_DIR}) + protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS objectstripe.proto) + target_link_libraries(threaded_io_test PRIVATE ${Protobuf_LIBRARIES}) + target_sources(threaded_io_test PRIVATE + S3Outputer.cc + S3Source.cc + S3Common.cc + ${PROTO_SRCS} + ) + target_include_directories(threaded_io_test PRIVATE ${LIBS3_DIR}/include) + target_link_directories(threaded_io_test PRIVATE 
${LIBS3_DIR}/lib) + target_link_libraries(threaded_io_test PRIVATE s3 curl) + # add_test(NAME S3OutputerEmptyTest COMMAND threaded_io_test EmptySource 1 1 0 10 S3Outputer) +endif() diff --git a/ConfigurationParameters.cc b/ConfigurationParameters.cc index 995ecfc..999bc0d 100644 --- a/ConfigurationParameters.cc +++ b/ConfigurationParameters.cc @@ -21,6 +21,11 @@ namespace cce::tf { return std::stof(iValue); } + template<> + unsigned long ConfigurationParameters::convert(std::string const& iValue) { + return std::stoul(iValue); + } + template<> bool ConfigurationParameters::convert(std::string const& iValue) { return (iValue.empty()) or ( diff --git a/README.md b/README.md index 2f78ec9..a52bbfc 100644 --- a/README.md +++ b/README.md @@ -148,6 +148,17 @@ This is similar to SharedRootEventSource except this time each entry in the `Eve > threaded_io_test -s SharedRootBatchEventsSource=test.eroot -t 1 -n 10 ``` +#### S3Source +Reads individual data product _stripes_--compressed concatenated serialized data products--from an S3 server, along with the appropriate +event-level metadata to index them. See s3io.md for further details of the data layout. + +- verbose (int): increase number to get more detail +- prefix (string): the object prefix in the S3 bucket +- conn (string): path to the S3 connection configuration file +``` +> threaded_io_test -s S3Source=prefix=testproducts:conn=s3conn.ini -t 1 -n 10 +``` + ### Outputers #### DummyOutputer @@ -311,6 +322,19 @@ or > threaded_io_test -s ReplicatedRootSource=test.root -t 1 -n 10 -o RootBatchEventsOutputer=test.root:batchSize=4 ``` +#### S3Outputer +Outputs individual data product _stripes_--compressed concatenated serialized data products--to an S3 server, along with the appropriate +event-level metadata to index them. See s3io.md for further details of the data layout. 
+ +- verbose (int): increase number to get more detail +- prefix (string): the object prefix to use when storing data in the S3 bucket +- productFlush (int): the minimum number of (possibly compressed) bytes to accumulate in the product stripe output buffer before flushing it to S3 +- eventFlush (int): the maximum number of events that can be contained in a single product stripe. +- conn (string): path to the S3 connection configuration file +``` +> threaded_io_test -s TestProductsSource -t 1 -n 10 -o S3Outputer=prefix=testproducts:conn=s3conn.ini +``` + ### Waiters #### ScaleWaiter diff --git a/S3Common.cc b/S3Common.cc new file mode 100644 index 0000000..7ae5dc2 --- /dev/null +++ b/S3Common.cc @@ -0,0 +1,401 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "libs3.h" +#include +#include "tbb/task_arena.h" +#include "tbb/concurrent_queue.h" +#include "S3Common.h" + + +namespace { +using namespace cce::tf; + +class S3RequestWrapper { + public: + S3RequestWrapper(std::shared_ptr iReq, const S3BucketContext* iCtx, tbb::task_handle&& iCallback): + req{std::move(iReq)}, bucketCtx{iCtx}, callback{std::move(iCallback)} + { + arena = std::make_unique(tbb::task_arena::attach{}); + backoffTimeout = req->timeout.count(); + submit_after = std::chrono::steady_clock::now(); + }; + + void done() { + arena->enqueue(std::move(callback)); + }; + + std::shared_ptr req; + const S3BucketContext* bucketCtx; + tbb::task_handle callback; + std::unique_ptr arena; + size_t put_offset{0}; + int retries_executed{0}; + long backoffTimeout; + std::chrono::steady_clock::time_point submit_after; + static_assert(std::chrono::steady_clock::duration() <= std::chrono::milliseconds(1)); +}; + +class S3LibWrapper { + public: + static S3LibWrapper& instance() { + static S3LibWrapper instance; + return instance; + } + S3LibWrapper(const S3LibWrapper&) = delete; + void operator=(const S3LibWrapper&) = delete; + + bool running() const { 
return running_; } + + void submit(S3RequestWrapper* req) { + requests_.push(req); + } + + private: + S3LibWrapper() : running_(false) { + initStatus_ = S3_initialize("s3", S3_INIT_ALL, ""); + if ( initStatus_ != S3StatusOK ) { + std::cerr << "Failed to initialize libs3, error: " << S3_get_status_name(initStatus_) << "\n"; + return; + } + running_ = true; + loop_ = std::thread(&S3LibWrapper::loop_body, this); + } + + ~S3LibWrapper() { + running_ = false; + if ( loop_.joinable() ) loop_.join(); + S3_deinitialize(); + } + + void loop_body() { + S3RequestContext * ctx; + fd_set read_fds, write_fds, except_fds; + int max_fd, activeRequests{0}; + int topfds{0}, topreq{0}; + S3_create_request_context(&ctx); + // For now we do not enable peer verification because CURL is loading the CA bundle per connection https://github.com/curl/curl/pull/9620 + // S3_set_request_context_verify_peer(ctx, 1); + // auto status = curl_easy_setopt(curl, CURLOPT_CAPATH, "/etc/grid-security/certificates"); + // if ( status != CURLE_OK ) throw std::runtime_error("curle fail"); + std::vector to_defer; + while(running_) { + FD_ZERO(&read_fds); + FD_ZERO(&write_fds); + FD_ZERO(&except_fds); + + switch (S3_get_request_context_fdsets(ctx, &read_fds, &write_fds, &except_fds, &max_fd)) { + case S3StatusOK: + break; + case S3StatusInternalError: + throw std::runtime_error("internal error in S3_get_request_context_fdsets"); + } + + topfds = std::max(topfds, max_fd); + + if ( max_fd != -1 ) { + int64_t timeout = std::min(100l, S3_get_request_context_timeout(ctx)); // milliseconds + assert(timeout >= 0); + struct timeval tv { timeout / 1000, (timeout % 1000) * 1000 }; + select(max_fd+1, &read_fds, &write_fds, &except_fds, &tv); + } + + switch (S3_runonce_request_context(ctx, &activeRequests)) { + case S3StatusOK: + break; + case S3StatusConnectionFailed: + throw std::runtime_error("failed to connect in S3_runonce_request_context"); + case S3StatusServerFailedVerification: + throw std::runtime_error("SSL 
verification failure in S3_runonce_request_context"); + case S3StatusInternalError: + throw std::runtime_error("internal error in S3_runonce_request_context"); + case S3StatusOutOfMemory: + throw std::runtime_error("out of memory while processing S3_runonce_request_context"); + } + topreq = std::max(topreq, activeRequests); + + S3RequestWrapper* req; + int currentlyActive{activeRequests}; + while ( + (activeRequests < asyncRequestLimit_) + and activeRequests < (currentlyActive+asyncAddRequestLimit_) + and requests_.try_pop(req) // test this last! + ) { + if ( req->submit_after <= std::chrono::steady_clock::now() ) { + _submit(req, ctx); + activeRequests++; + } else { + to_defer.push_back(req); + } + } + for (auto req : to_defer) { + requests_.push(req); + } + to_defer.clear(); + + if ( activeRequests == 0 ) { + // TODO: would be better to use a semaphore (submit() and ~S3LibWrapper need to notify) + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + // TODO: this may abort requests in flight, do we wait or is it synchronous? 
+ S3_destroy_request_context(ctx); + std::cout << "S3LibWrapper: max open file descriptors: " << topfds << ", max concurrent requests: " << topreq << std::endl; + } + + void _submit(S3RequestWrapper* req, S3RequestContext* ctx) const { + assert(ctx != nullptr); + switch ( req->req->type ) { + case S3Request::Type::undef: + assert(false); // logic error + break; + case S3Request::Type::get: + S3_get_object( + req->bucketCtx, + req->req->key.c_str(), + nullptr, // S3GetConditions + 0, // startByte + 0, // byteCount + ctx, + req->backoffTimeout, + &S3LibWrapper::getObjectHandler, + static_cast(req)); + break; + case S3Request::Type::put: + S3_put_object( + req->bucketCtx, + req->req->key.c_str(), + req->req->buffer.size(), + nullptr, // S3PutProperties (TODO probably want .md5) + ctx, + req->backoffTimeout, + &S3LibWrapper::putObjectHandler, + static_cast(req)); + break; + } + } + + static S3Status responsePropertiesCallback(const S3ResponseProperties *properties, void *callbackData) { + auto req = static_cast(callbackData); + if ( req->req->type == S3Request::Type::get ) { + if ( properties->contentLength > 0 ) { + req->req->buffer.reserve(properties->contentLength); + } + // else what? + // TODO: save headers? + } + return S3StatusOK; + // perhaps S3StatusAbortedByCallback + } + + static void responseCompleteCallback(S3Status status, const S3ErrorDetails *error, void *callbackData) { + auto req = static_cast(callbackData); + auto now = std::chrono::steady_clock::now(); + if ( S3_status_is_retryable(status) && req->retries_executed < req->req->retries ) { + // e.g. 
S3StatusErrorRequestTimeout or ErrorSlowDown + // Run backoff algo, https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ + static thread_local std::minstd_rand rng(std::hash{}(std::this_thread::get_id())); + std::uniform_int_distribution dist(0l, std::min(S3Request::max_timeout.count(), req->backoffTimeout)); + const auto dt = std::chrono::milliseconds(dist(rng)); + std::cerr << "Got status " << S3_get_status_name(status) << " while running request " + << *(req->req) << ", will retry in " << dt.count() << "ms\n"; + req->submit_after = now + dt; + req->put_offset = 0; + req->retries_executed++; + req->backoffTimeout *= 2; + instance().requests_.push(req); + return; // no delete! + } + switch ( status ) { + case S3StatusOK: + req->req->status = S3Request::Status::ok; + std::cerr << ((req->req->type == S3Request::Type::get) ? "get: " : "put: ") + + std::to_string(std::chrono::duration_cast(now - req->submit_after).count()) + + " " + std::to_string(req->req->buffer.size()) + + " " + std::to_string(std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1)) + + "\n"; + break; + default: + std::cerr << "Got status " << S3_get_status_name(status) << " at end request " << *(req->req) << "\n"; + req->req->status = S3Request::Status::error; + } + req->done(); + // end of S3RequestWrapper lifecycle + delete req; + } + + static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackData) { + auto req = static_cast(callbackData); + int toWrite = std::min(bufferSize, (int) (req->req->buffer.size() - req->put_offset)); + assert(toWrite >= 0); + if ( toWrite > 0 ) { + std::copy_n(req->req->buffer.begin() + req->put_offset, toWrite, buffer); + req->put_offset += toWrite; + } + // return > 0 = bytes written, 0 = done, -1 = S3StatusAbortedByCallback + return toWrite; + } + + static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *callbackData) { + auto req = static_cast(callbackData); + auto offset = 
req->req->buffer.size(); + req->req->buffer.resize(offset + bufferSize); // out of memory exception? + std::copy_n(buffer, bufferSize, req->req->buffer.begin() + offset); + return S3StatusOK; // can also return S3StatusAbortedByCallback + } + + constexpr static S3ResponseHandler responseHandler{ + &S3LibWrapper::responsePropertiesCallback, + &S3LibWrapper::responseCompleteCallback + }; + + constexpr static S3PutObjectHandler putObjectHandler{ + responseHandler, + &S3LibWrapper::putObjectDataCallback + }; + + constexpr static S3GetObjectHandler getObjectHandler{ + responseHandler, + &S3LibWrapper::getObjectDataCallback + }; + + private: + S3Status initStatus_; + int asyncRequestLimit_{32}; // no more than FD_SETSIZE (1024) + int asyncAddRequestLimit_{64}; + std::thread loop_; + std::atomic running_; + // all callbackData pointers are to S3RequestWrapper objects + tbb::concurrent_queue requests_; +}; + +} // anon namespace + +namespace cce::tf { + +std::ostream& operator<<(std::ostream& os, const S3Request& req) { + os << "S3Request("; + switch (req.type) { + case S3Request::Type::undef: + os << "undef"; break; + case S3Request::Type::get: + os << "get"; break; + case S3Request::Type::put: + os << "put"; break; + } + os << ", key=" << req.key << ", timeout=" << req.timeout.count() << "ms, retries=" << req.retries; + os << ", buffer length=" << req.buffer.size() << ", "; + switch (req.status) { + case S3Request::Status::waiting: + os << "waiting"; break; + case S3Request::Status::ok: + os << "ok"; break; + case S3Request::Status::error: + os << "error"; break; + } + os << ")"; + return os; +} + +S3ConnectionRef S3Connection::from_config(const std::string& filename) { + std::ifstream fin(filename); + if (not fin.is_open()) { + std::cerr << "S3Connection config file " << filename << " could not be opened\n"; + return {}; + } + std::string hostName; + std::string bucketName; + std::string accessKeyId; + std::string secretAccessKey; + std::string securityToken; + for 
(std::string line; std::getline(fin, line); ) { + if ( line.empty() || line[0] == '#' ) continue; + auto delim = line.find("="); + auto key = line.substr(0, delim); + auto val = line.substr(delim+1, line.length() - 1); + if ( key == "hostName" ) hostName = val; + else if ( key == "bucketName" ) bucketName = val; + else if ( key == "accessKeyId" ) accessKeyId = val; + else if ( key == "secretAccessKey" ) secretAccessKey = val; + else if ( key == "securityToken" ) securityToken = val; + else { + std::cerr << "unrecognized config file key " << key << " in S3Connection config " << filename << "\n"; + } + } + + if ( hostName.empty() || bucketName.empty() || accessKeyId.empty() || secretAccessKey.empty() ) { + std::cerr << "S3Connection config file missing required keys\n"; + return {}; + } + + if ( not S3LibWrapper::instance().running() ) { + return {}; + } + + S3Status status = S3_validate_bucket_name(bucketName.c_str(), S3UriStyleVirtualHost); + if ( status != S3StatusOK ) { + std::cerr << "S3 bucket name invalid: " << bucketName << "\n"; + return {}; + } + + return std::make_shared(hostName, bucketName, accessKeyId, secretAccessKey, securityToken); +}; + +S3Connection::S3Connection( + std::string_view iHostName, + std::string_view iBucketName, + std::string_view iAccessKey, + std::string_view iSecretKey, + std::string_view iSecurityToken + ) : + hostName_(iHostName), + bucketName_(iBucketName), + accessKeyId_(iAccessKey), + secretAccessKey_(iSecretKey), + securityToken_(iSecurityToken), + blockingTime_{0} +{ + if ( hostName_ == "devnull") { + // magic do-nothing connection + return; + } + ctx_.reset(new S3BucketContext{ + .hostName = hostName_.c_str(), + .bucketName = bucketName_.c_str(), + .protocol = S3ProtocolHTTPS, + .uriStyle = S3UriStylePath, + .accessKeyId = accessKeyId_.c_str(), + .secretAccessKey = secretAccessKey_.c_str(), + .securityToken = securityToken_.empty() ? 
nullptr : securityToken_.c_str(), + .authRegion = nullptr + }); +}; + +void S3Connection::submit(std::shared_ptr req, TaskHolder&& callback) const { + auto start = std::chrono::high_resolution_clock::now(); + if ( ctx_ ) { + auto task_handle = callback.group()->defer([cb=std::move(callback)](){}); + // start of S3RequestWrapper lifecycle (ends in S3LibWrapper::responseCompleteCallback) + auto wrapper = new S3RequestWrapper(std::move(req), ctx_.get(), std::move(task_handle)); + S3LibWrapper::instance().submit(wrapper); + } else { + if ( req->type == S3Request::Type::put ) { + req->status = S3Request::Status::ok; + } else { + req->status = S3Request::Status::error; + } + callback.doneWaiting(); + } + auto time = std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - start); + blockingTime_ += time.count(); +}; + +} diff --git a/S3Common.h b/S3Common.h new file mode 100644 index 0000000..febf701 --- /dev/null +++ b/S3Common.h @@ -0,0 +1,66 @@ +#if !defined(S3Common_h) +#define S3Common_h +#include +#include +#include +#include + +#include "TaskHolder.h" + +// libs3.h +struct S3BucketContext; + +namespace cce::tf { +class S3Connection; +typedef std::shared_ptr S3ConnectionRef; + +class S3Request { + public: + enum class Type {undef, get, put}; + enum class Status {waiting, ok, error}; + static constexpr std::chrono::milliseconds default_timeout{5000}; + static constexpr std::chrono::milliseconds max_timeout{60000}; + + S3Request() = delete; + S3Request(Type iType, const std::string& iKey, std::chrono::milliseconds iTimeout=default_timeout, int iRetries=5): + type{iType}, key{iKey}, timeout{iTimeout}, retries{iRetries} {}; + + const Type type; + const std::string key; + const std::chrono::milliseconds timeout; + const int retries; + std::string buffer; + Status status{Status::waiting}; + + friend std::ostream& operator<<(std::ostream& os, const S3Request& req); +}; + +class S3Connection { + public: + static S3ConnectionRef from_config(const std::string& 
filename); + + S3Connection( + std::string_view iHostName, + std::string_view iBucketName, + std::string_view iAccessKey, + std::string_view iSecretKey, + std::string_view iSecurityToken + ); + + void submit(std::shared_ptr req, TaskHolder&& callback) const; + std::chrono::microseconds blockingTime() const { return std::chrono::microseconds(blockingTime_.load()); } + + private: + const std::string hostName_; + const std::string bucketName_; + const std::string accessKeyId_; + const std::string secretAccessKey_; + const std::string securityToken_; + // holds pointers to c_str() of the above + std::unique_ptr ctx_; + + mutable std::atomic blockingTime_; +}; + +} +#endif diff --git a/S3Outputer.cc b/S3Outputer.cc new file mode 100644 index 0000000..1a8cce2 --- /dev/null +++ b/S3Outputer.cc @@ -0,0 +1,495 @@ +#include +#include "S3Outputer.h" +#include "OutputerFactory.h" +#include "UnrolledSerializerWrapper.h" +#include "FunctorTask.h" + +#if ZSTD_VERSION_NUMBER < (1*100*100 + 3*100) +#error("zstd is too old") +#endif + +using namespace cce::tf; + +namespace { +size_t zstd_compress(ZSTD_CCtx* ctx, const std::string_view blob, std::string& out, bool flush) { + size_t tail{out.size()}; + if ( out.capacity() < ZSTD_CStreamOutSize() ) out.resize(ZSTD_CStreamOutSize()); + else out.resize(out.capacity()); + ZSTD_outBuffer_s obuf{.dst=out.data(), .size=out.size(), .pos=tail}; + + size_t status; + if ( flush ) { + ZSTD_inBuffer_s ibuf{.src=nullptr, .size=0, .pos=0}; + do { + if ( obuf.pos == obuf.size ) { + size_t new_size = (obuf.size * 3) / 2; + out.resize(new_size); + obuf.dst = out.data(); + obuf.size = new_size; + } + status = ZSTD_compressStream2(ctx, &obuf, &ibuf, ZSTD_e_end); + if ( ZSTD_isError(status) ) { + std::cerr <<"ERROR in compression " << ZSTD_getErrorName(status) << std::endl; + } + } while ( status != 0 ); + } else { + ZSTD_inBuffer_s ibuf{.src=blob.data(), .size=blob.size(), .pos=0}; + while ( ibuf.pos < ibuf.size ) { + status = ZSTD_compressStream2(ctx, 
&obuf, &ibuf, ZSTD_e_continue); + if ( ZSTD_isError(status) ) { + std::cerr <<"ERROR in compression " << ZSTD_getErrorName(status) << std::endl; + } + if ( obuf.pos == obuf.size ) { + size_t new_size = obuf.size * 2; + out.resize(new_size); + obuf.dst = out.data(); + obuf.size = new_size; + } + } + } + out.resize(obuf.pos); + // we are supposed to get a hint from ZSTD of the bytes left in internal buffers of CCtx + // but it doesn't appear to be nonzero + return status; +} + +lzma_ret lzma_init(lzma_stream* strm, uint32_t level) { + lzma_options_lzma opt_lzma2; + lzma_filter filters[] = { + { .id = LZMA_FILTER_LZMA2, .options = &opt_lzma2 }, + { .id = LZMA_VLI_UNKNOWN, .options = NULL }, + }; + lzma_lzma_preset(&opt_lzma2, level); + // TODO: pass through target stripe size for better choice of dict size? + // ROOT choice: input size / 4 + opt_lzma2.dict_size = std::max(LZMA_DICT_SIZE_MIN, 32768u); + return lzma_stream_encoder(strm, filters, LZMA_CHECK_CRC32); +} + +size_t lzma_compress(lzma_stream* strm, const std::string_view blob, std::string& out, bool flush) { + size_t tail{out.size()}; + if ( out.capacity() < BUFSIZ ) out.resize(BUFSIZ); + else out.resize(out.capacity()); + + lzma_action action = LZMA_RUN; + strm->next_out = (uint8_t*) out.data() + tail; + strm->avail_out = out.size() - tail; + if ( flush ) { + action = LZMA_FINISH; + strm->next_in = NULL; + strm->avail_in = 0; + } else { + strm->next_in = (const uint8_t*) blob.data(); + strm->avail_in = blob.size(); + } + + while ( (strm->avail_in > 0) || flush ) { + lzma_ret ret = lzma_code(strm, action); + if ( ret == LZMA_STREAM_END ) break; + else if ( strm->avail_out == 0 ) { + size_t old_size = out.size(); + size_t new_size = (old_size * 3) / 2; + out.resize(new_size); + strm->next_out = (uint8_t*) out.data() + old_size; + strm->avail_out = new_size - old_size; + } + else if (ret != LZMA_OK) { + std::cerr << "ERROR in lzma compression " << ret << std::endl; + break; + } + } + + out.resize(out.size() - 
strm->avail_out); + return 0; +} +} // anonymous namespace + +StreamCompressor::StreamCompressor(const objstripe::Compression& setting): + setting_{setting} +{ + switch ( setting_.type() ) { + case objstripe::CompressionType::kNone: + break; + case objstripe::CompressionType::kZSTD: + zstd_.reset(ZSTD_createCStream()); + ZSTD_CCtx_setParameter(zstd_.get(), ZSTD_c_compressionLevel, setting_.level()); + break; + case objstripe::CompressionType::kLZMA: + lzma_.reset((lzma_stream*) malloc(sizeof(lzma_stream))); + memset(lzma_.get(), 0, sizeof(lzma_stream)); + lzma_ret ret = ::lzma_init(lzma_.get(), setting_.level()); + if (ret != LZMA_OK) { throw std::runtime_error("Could not initialize LZMA encoder: " + std::to_string(ret)); } + break; + } +} + +size_t StreamCompressor::write(const std::string_view blob, std::string& out) { + switch ( setting_.type() ) { + case objstripe::CompressionType::kNone: + out.append(blob); + return 0; + case objstripe::CompressionType::kZSTD: + return ::zstd_compress(zstd_.get(), blob, out, false); + case objstripe::CompressionType::kLZMA: + return ::lzma_compress(lzma_.get(), blob, out, false); + default: + assert(false); + return 0; + } +} + +void StreamCompressor::flush(std::string& out) { + switch ( setting_.type() ) { + case objstripe::CompressionType::kNone: + return; + case objstripe::CompressionType::kZSTD: + ::zstd_compress(zstd_.get(), {}, out, true); + return; + case objstripe::CompressionType::kLZMA: + ::lzma_compress(lzma_.get(), {}, out, true); + // unlike zstd, lzma must be (TODO: true?) 
reset after each finish + if ( + ::lzma_init(lzma_.get(), setting_.level()) != LZMA_OK + ) { throw std::runtime_error("Could not initialize LZMA encoder"); } + return; + default: + assert(false); + return; + } +} + +void S3Outputer::setupForLane(unsigned int iLaneIndex, std::vector const& iDPs) { + auto& s = serializers_[iLaneIndex]; + switch(index_.serializestrategy()) { + case objstripe::SerializeStrategy::kRoot: + s = SerializeStrategy::make>(); + break; + case objstripe::SerializeStrategy::kRootUnrolled: + s = SerializeStrategy::make>(); + break; + default: + throw std::runtime_error("S3Outputer: unrecognized serialization strategy"); + } + s.reserve(iDPs.size()); + for(auto const& dp: iDPs) { + s.emplace_back(dp.name(), dp.classType()); + } + if (buffers_.size() == 0) { + buffers_.reserve(iDPs.size()); + index_.mutable_products()->Reserve(iDPs.size()); + for(auto const& ss: s) { + auto* prod = index_.add_products(); + prod->set_productname(std::string(ss.name())); + prod->set_producttype(ss.className()); + prod->set_flushsize(0); + prod->set_flushminbytes(productBufferFlushMinBytes_); + // TODO: choose compression setting based on properties of ss? + buffers_.emplace_back(objPrefix_ + "/" + prod->productname(), prod, defaultCompression_); + } + } + // all lanes see same products? 
if not we'll need a map + assert(buffers_.size() == iDPs.size()); +} + +void S3Outputer::productReadyAsync(unsigned int iLaneIndex, DataProductRetriever const& iDataProduct, TaskHolder iCallback) const { + assert(iLaneIndex < serializers_.size()); + auto& laneSerializers = serializers_[iLaneIndex]; + auto group = iCallback.group(); + assert(iDataProduct.index() < laneSerializers.size() ); + laneSerializers[iDataProduct.index()].doWorkAsync(*group, iDataProduct.address(), std::move(iCallback)); +} + +void S3Outputer::outputAsync(unsigned int iLaneIndex, EventIdentifier const& iEventID, TaskHolder iCallback) const { + auto start = std::chrono::high_resolution_clock::now(); + auto group = iCallback.group(); + collateQueue_.push(*group, [this, iEventID, iLaneIndex, callback=std::move(iCallback)]() mutable { + collateProducts(iEventID, serializers_[iLaneIndex], std::move(callback)); + }); + auto time = std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - start); + parallelTime_ += time.count(); +} + +void S3Outputer::printSummary() const { + { + // drain all buffers + { + TaskHolder finalTask(*tails_group_, make_functor_task([this, task=tails_group_->defer([](){})]() mutable { tails_group_->run(std::move(task)); })); + SmallBuffers smallbuffers = std::make_shared(); + auto productsDoneCallback = makeProductsDoneCallback(finalTask, smallbuffers, true); + for(auto& buf : buffers_) { + buf.appendQueue_.push(*tails_group_, [this, &buf, cb=productsDoneCallback, smallbuffers]() mutable { + appendProductBuffer(buf, {}, std::move(cb), true, smallbuffers); + }); + } + } + tails_group_->wait(); + } + + if(verbose_ >= 2) { + summarize_serializers(serializers_); + } + std::chrono::microseconds serializerTime = std::chrono::microseconds::zero(); + for(const auto& lane : serializers_) { + for(const auto& s : lane) { + serializerTime += s.accumulatedTime(); + } + } + std::chrono::microseconds appendTime = std::chrono::microseconds::zero(); + for(const auto& buf : 
buffers_) { + appendTime += buf.appendTime_; + } + std::cout <<"S3Outputer\n" + " total serial collate time at end event: "<blockingTime().count()<<"us\n"; +} + +void S3Outputer::collateProducts( + EventIdentifier const& iEventID, + SerializeStrategy const& iSerializers, + TaskHolder iCallback + ) const +{ + auto start = std::chrono::high_resolution_clock::now(); + auto sev = currentEventStripe_.add_events(); + sev->set_offset(eventGlobalOffset_++); + sev->set_run(iEventID.run); + sev->set_lumi(iEventID.lumi); + sev->set_event(iEventID.event); + if (verbose_ >= 2) { std::cout << sev->DebugString(); } + + SmallBuffers smallbuffers = std::make_shared(); + // pass a copy of iCallback + auto productsDoneCallback = makeProductsDoneCallback(iCallback, smallbuffers, false); + + auto buf = std::begin(buffers_); + for (const auto& s : iSerializers) { + std::string blob(s.blob().data(), s.blob().size()); + buf->appendQueue_.push(*productsDoneCallback.group(), [this, buf, blob=std::move(blob), cb=productsDoneCallback, smallbuffers]() mutable { + appendProductBuffer(*buf, std::move(blob), std::move(cb), false, smallbuffers); + }); + buf++; + } + collateTime_ += std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - start); +} + +TaskHolder S3Outputer::makeProductsDoneCallback(TaskHolder iCallback, SmallBuffers smallbuffers, bool last) const { + using namespace std::string_literals; + if ( (last and currentEventStripe_.events_size() > 0) or (currentEventStripe_.events_size() == eventFlushSize_) ) { + objstripe::EventStripe stripeOut; + stripeOut.mutable_events()->Reserve(eventFlushSize_); + std::swap(currentEventStripe_, stripeOut); + std::cerr << "flush " + std::to_string(numFireAndForgetCollates_) + + " " + std::to_string(std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1)) + + "\n"; + assert(last xor (iCallback.group() != tails_group_.get())); + auto nextCallback = ( (numFireAndForgetCollates_ < maxFireAndForgetCollates_) ) ? 
+ numFireAndForgetCollates_++, + TaskHolder( + *iCallback.group(), + make_functor_task([this, task=tails_group_->defer([](){})]() mutable {numFireAndForgetCollates_--; tails_group_->run(std::move(task));}) + ) : std::move(iCallback); + return TaskHolder(*nextCallback.group(), make_functor_task( + [this, stripeOut=std::move(stripeOut), callback=std::move(nextCallback), smallbuffers=std::move(smallbuffers), last]() mutable { + if(verbose_ >= 2) { std::cout << "reached event flush size "s + std::to_string(stripeOut.events_size()) + ", flushing\n"; } + // merge buffers by greedy algorithm + std::sort(smallbuffers->begin(), smallbuffers->end(), [](const auto &a, const auto &b){ return a.second.content().size() > b.second.content().size(); }); + size_t iGroup{0}; + auto it = smallbuffers->begin(); + objstripe::ProductGroupStripe gout; + while ( it != smallbuffers->end() ) { + size_t nbytes{0}; + auto* group = stripeOut.add_groups(); + while ( (nbytes < productBufferFlushMinBytes_) and (it != smallbuffers->end()) ) { + nbytes += it->second.content().size(); + auto* name = group->add_names(); + std::swap(*name, it->first); + auto* prod = gout.add_products(); + std::swap(*prod, it->second); + it++; + } + auto req = std::make_shared(S3Request::Type::put, objPrefix_ + "/group" + std::to_string(iGroup) + "/" + std::to_string(stripeOut.events(0).offset())); + gout.SerializeToString(&req->buffer); + auto putDoneTask = TaskHolder(*callback.group(), make_functor_task([req, callback]() { + if ( req->status != S3Request::Status::ok ) { + std::cerr << "failed to write product buffer " << *req << std::endl; + } + })); + conn_->submit(std::move(req), std::move(putDoneTask)); + iGroup++; + gout.clear_products(); + } + flushQueue_.push(*callback.group(), [this, stripeOut=std::move(stripeOut), callback=std::move(callback), last]() { + flushEventStripe(stripeOut, std::move(callback), last); + }); + } + )); + } + return iCallback; +} + + +void S3Outputer::appendProductBuffer( + 
ProductOutputBuffer& buf, + std::string&& blob, + TaskHolder iCallback, + bool last, + SmallBuffers smallbuffers + ) const +{ + using namespace std::string_literals; + auto start = std::chrono::high_resolution_clock::now(); + + size_t pendingbytes{0}; + if ( not last ) { + buf.stripe_.add_counts(blob.size()); + pendingbytes = buf.compressor_.write(blob, *buf.stripe_.mutable_content()); + } + const size_t bufferNevents = buf.stripe_.counts_size(); + size_t bufferNbytes = buf.stripe_.content().size(); + if ( pendingbytes > 0 ) { + std::cout << "product buffer for "s + std::string(buf.info_->productname()) + + " put " + std::to_string(blob.size()) + " bytes in" + " and has "s + std::to_string(bufferNbytes) + " bytes out" + " and "s + std::to_string(pendingbytes) + " bytes pending\n"; + } + + // first flush when we exceed min size and have an even divisor of eventFlushSize_ + // subsequent flush when we reach productFlushSize + // flush if last call and we have something to write + // for buffers that never get big enough, flush when we reach eventFlushSize_ but to a merge queue + if ( + ( + (buf.info_->flushsize() == 0) + && (bufferNbytes > buf.info_->flushminbytes()) + && (eventFlushSize_ % bufferNevents == 0) + ) + || (bufferNevents == buf.info_->flushsize()) + || (last && bufferNevents > 0) + || (bufferNevents == eventFlushSize_) + ) + { + buf.compressor_.flush(*buf.stripe_.mutable_content()); + bufferNbytes = buf.stripe_.content().size(); + if(verbose_ >= 2) { + std::cout << "product buffer for "s + std::string(buf.info_->productname()) + + " is full ("s + std::to_string(bufferNbytes) + + " bytes, "s + std::to_string(bufferNevents) + " events), flushing\n"; + } + + std::string name = buf.prefix_ + "/" + std::to_string(buf.stripe_.globaloffset()); + if ( (last or (bufferNevents == eventFlushSize_)) && (bufferNbytes <= buf.info_->flushminbytes()) ) { + // too small buffer, put it on a merge queue + objstripe::ProductStripe out; + 
out.mutable_content()->reserve(bufferNbytes); + out.mutable_compression()->CopyFrom(buf.stripe_.compression()); + out.set_globaloffset(buf.stripe_.globaloffset()); + assert(out.content().size() == 0); + assert(buf.stripe_.content().size() == bufferNbytes); + std::swap(out, buf.stripe_); + assert(out.content().size() == bufferNbytes); + assert(buf.stripe_.content().size() == 0); + auto it = smallbuffers->emplace_back(name, std::move(out)); + assert(it->second.content().size() == bufferNbytes); + // leave iCallback alive til end of function + } else { + auto req = std::make_shared(S3Request::Type::put, name); + buf.stripe_.SerializeToString(&req->buffer); + auto putDoneTask = TaskHolder(*iCallback.group(), make_functor_task([req, callback=std::move(iCallback)]() { + if ( req->status != S3Request::Status::ok ) { + std::cerr << "failed to write product buffer " << *req << std::endl; + } + })); + conn_->submit(std::move(req), std::move(putDoneTask)); + } + + buf.stripe_.clear_counts(); + buf.stripe_.clear_content(); + buf.stripe_.set_globaloffset(buf.stripe_.globaloffset() + bufferNevents); + if ( buf.info_->flushsize() == 0 ) { + // only modification to info_, done inside serial appendQueue_ + buf.info_->set_flushsize(bufferNevents); + } + } + buf.appendTime_ += std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - start); +} + +void S3Outputer::flushEventStripe(const objstripe::EventStripe& stripe, TaskHolder iCallback, bool last) const { + if ( last and stripe.events_size() == 0 ) { + return; + } + auto start = std::chrono::high_resolution_clock::now(); + index_.set_totalevents(index_.totalevents() + stripe.events_size()); + { + auto dest = index_.add_packedeventstripes(); + std::string buf; + stripe.SerializeToString(&buf); + eventStripeCompressor_.write(buf, *dest); + eventStripeCompressor_.flush(*dest); + index_.add_eventstripesizes(buf.size()); + if ( verbose_ >= 2 ) { + std::cout << "length of packed EventStripe: " << dest->size() << "\n"; + } + 
} + + // TODO: checkpoint only every few event stripes? + auto req = std::make_shared(S3Request::Type::put, "index/" + objPrefix_); + index_.SerializeToString(&req->buffer); + auto putDoneTask = TaskHolder(*iCallback.group(), make_functor_task([req, callback=std::move(iCallback)]() { + if ( req->status != S3Request::Status::ok ) { + std::cerr << "failed to write product buffer index" << std::endl; + // TODO: if several failures, maybe exit? + } + })); + conn_->submit(std::move(req), std::move(putDoneTask)); + flushTime_ += std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - start); +} + +namespace { +class Maker : public OutputerMakerBase { + public: + Maker(): OutputerMakerBase("S3Outputer") {} + std::unique_ptr create(unsigned int iNLanes, ConfigurationParameters const& params) const final { + auto verbose = params.get("verbose", 0); + auto objPrefix = params.get("prefix"); + if(not objPrefix) { + std::cerr << "no object prefix given for S3Outputer\n"; + return {}; + } + auto productFlush = params.get("productFlush", 1024*128); + auto eventFlush = params.get("eventFlush", 144); + auto connfile = params.get("conn"); + if(not connfile) { + std::cerr <<"no connection configuration file name given for S3Outputer\n"; + return {}; + } + auto conn = S3Connection::from_config(connfile.value()); + if(not conn) { + return {}; + } + auto cType = objstripe::CompressionType::kZSTD; + uint32_t cLevel = 4; + auto cTypeStr = params.get("compression"); + if(cTypeStr) { + if ( cTypeStr.value() == "ZSTD" ) { + cType = objstripe::CompressionType::kZSTD; + } + else if ( cTypeStr.value() == "LZMA" ) { + cType = objstripe::CompressionType::kLZMA; + cLevel = 9; + } + else { + std::cerr << "Unrecognized compression type: " << cTypeStr.value() << "\n"; + return {}; + } + } + + return std::make_unique(iNLanes, objPrefix.value(), verbose, productFlush, eventFlush, conn, cType, cLevel); + } +}; + +Maker s_maker; +} diff --git a/S3Outputer.h b/S3Outputer.h new file mode 
100644 index 0000000..427d4bb --- /dev/null +++ b/S3Outputer.h @@ -0,0 +1,139 @@ +#if !defined(S3Outputer_h) +#define S3Outputer_h + +#include +#include +#include +#include + +#include "zstd.h" +#include "lzma.h" + +#define TBB_PREVIEW_TASK_GROUP_EXTENSIONS 1 // for task_group::defer +#include "tbb/task_group.h" +#include "tbb/concurrent_vector.h" + +#include "OutputerBase.h" +#include "EventIdentifier.h" +#include "SerializeStrategy.h" +#include "DataProductRetriever.h" +#include "summarize_serializers.h" +#include "SerialTaskQueue.h" +#include "S3Common.h" +#include "objectstripe.pb.h" + +namespace cce::tf { + +class StreamCompressor { + public: + StreamCompressor() {}; + StreamCompressor(const objstripe::Compression& setting); + const objstripe::Compression& getCompression() const { return setting_; } + size_t write(const std::string_view blob, std::string& out); + void flush(std::string& out); + + private: + objstripe::Compression setting_; + + struct ZSTDDeleter { void operator()(ZSTD_CStream* s) const {ZSTD_freeCStream(s);} }; + std::unique_ptr zstd_; + struct LZMADeleter { void operator()(lzma_stream* s) const {lzma_end(s); free(s);} }; + std::unique_ptr lzma_; +}; + +class S3Outputer : public OutputerBase { + public: + S3Outputer(unsigned int iNLanes, std::string objPrefix, int iVerbose, size_t iProductBufferFlush, size_t iEventFlushSize, S3ConnectionRef conn, objstripe::CompressionType cType, uint32_t cLevel): + serializers_(iNLanes), + objPrefix_(objPrefix), + verbose_(iVerbose), + productBufferFlushMinBytes_(iProductBufferFlush), + eventFlushSize_(iEventFlushSize), + conn_(std::move(conn)), + collateTime_{std::chrono::microseconds::zero()}, + flushTime_{std::chrono::microseconds::zero()}, + parallelTime_{0} + { + index_.set_eventstripesize(eventFlushSize_); + currentEventStripe_.mutable_events()->Reserve(eventFlushSize_); + + // TODO: make configurable + index_.set_serializestrategy(objstripe::SerializeStrategy::kRoot); + 
defaultCompression_.set_type(cType); + defaultCompression_.set_level(cLevel); + index_.mutable_eventstripecompression()->CopyFrom(defaultCompression_); + eventStripeCompressor_ = StreamCompressor(index_.eventstripecompression()); + + tails_group_ = std::make_unique(); + } + + void setupForLane(unsigned int iLaneIndex, std::vector const& iDPs) final; + bool usesProductReadyAsync() const final {return true;} + void productReadyAsync(unsigned int iLaneIndex, DataProductRetriever const& iDataProduct, TaskHolder iCallback) const final; + void outputAsync(unsigned int iLaneIndex, EventIdentifier const& iEventID, TaskHolder iCallback) const final; + void printSummary() const final; + +private: + struct ProductOutputBuffer { + ProductOutputBuffer(const std::string& prefix, objstripe::ProductInfo* info, const objstripe::Compression& comp) : + prefix_{prefix}, info_{info}, compressor_{comp} { + stripe_.set_content(""); + stripe_.mutable_compression()->CopyFrom(compressor_.getCompression()); + }; + + const std::string prefix_; + objstripe::ProductInfo* info_; // owned by index_ + StreamCompressor compressor_; + objstripe::ProductStripe stripe_{}; + SerialTaskQueue appendQueue_{}; + std::chrono::microseconds appendTime_{0}; + }; + // product buffer name, bytes + typedef std::shared_ptr>> SmallBuffers; + + // Plan: + // productReadyAsync() is threadsafe because serializers_ is one per lane + // outputAsync puts collateProducts() in collateQueue_ + // collateProducts() appends a new objstripe::Event to currentEventStripe_ and if time to flush + // it creates a TaskHolder that appends flushEventStripe() to flushQueue_ + // then collate() calls appendProductBuffer() with the above TaskHolder as callback (or original callback) + // printSummary() takes care of the tails by setting last=true in the calls + void collateProducts(EventIdentifier const& iEventID, SerializeStrategy const& iSerializers, TaskHolder iCallback) const; + void appendProductBuffer(ProductOutputBuffer& buf, 
std::string&& blob, TaskHolder iCallback, bool last, SmallBuffers smallbuffers) const; + TaskHolder makeProductsDoneCallback(TaskHolder iCallback, SmallBuffers smallbuffers, bool last) const; + void flushEventStripe(const objstripe::EventStripe& stripe, TaskHolder iCallback, bool last=false) const; + + // configuration options + const int verbose_; + const std::string objPrefix_; + const size_t productBufferFlushMinBytes_; + const size_t eventFlushSize_; + S3ConnectionRef conn_; + objstripe::Compression defaultCompression_{}; + + // only modified by productReadyAsync() + mutable std::vector serializers_; + + // only modified in collateProducts() + mutable SerialTaskQueue collateQueue_; + mutable size_t eventGlobalOffset_{0}; + mutable objstripe::EventStripe currentEventStripe_{}; + mutable std::chrono::microseconds collateTime_; + constexpr static unsigned int maxFireAndForgetCollates_{4}; + mutable std::atomic numFireAndForgetCollates_{0}; + std::unique_ptr tails_group_; // for fire-and-forget and last flush + + // only modified in appendProductBuffer() + mutable std::vector buffers_; + + // only modified in flushEventStripe() + // (for index_'s ProductInfos, appendProductBuffer() has finished before we access) + mutable SerialTaskQueue flushQueue_; + mutable objstripe::ObjectStripeIndex index_; + mutable StreamCompressor eventStripeCompressor_; + mutable std::chrono::microseconds flushTime_; + + mutable std::atomic parallelTime_; +}; +} +#endif diff --git a/S3Source.cc b/S3Source.cc new file mode 100644 index 0000000..b45f946 --- /dev/null +++ b/S3Source.cc @@ -0,0 +1,448 @@ +#include +#include "zstd.h" +#include "lzma.h" +#include "S3Source.h" +#include "SourceFactory.h" +#include "Deserializer.h" +#include "UnrolledDeserializer.h" +#include "FunctorTask.h" + +using namespace cce::tf; + +namespace { +struct ZSTD_ContextHolder { + ZSTD_ContextHolder() { ctx = ZSTD_createDCtx(); } + ~ZSTD_ContextHolder() { ZSTD_freeDCtx(ctx); } + ZSTD_DCtx* ctx; +}; + +size_t 
zstd_perthread_decompress(void* dst, size_t dstCapacity, const void* src, size_t compressedSize) { + static thread_local ZSTD_ContextHolder holder{}; + return ZSTD_decompressDCtx(holder.ctx, dst, dstCapacity, src, compressedSize); +} + +void zstd_decompress(const std::string& blob, std::string& out, size_t dSize) { + out.resize(dSize); + size_t status = ZSTD_decompress(out.data(), out.size(), blob.data(), blob.size()); + // size_t status = zstd_perthread_decompress(out.data(), out.size(), blob.data(), blob.size()); + if ( ZSTD_isError(status) ) { + std::cerr <<"ERROR in decompression " << ZSTD_getErrorName(status) << std::endl; + } + if (status < dSize) { + std::cerr <<"ERROR in decompression, expected " << dSize << " bytes but only got " << status << std::endl; + } +} + +// /cvmfs/cms.cern.ch/slc7_amd64_gcc10/external/xz/5.2.5-d6fed2038c4e8d6e04531d1adba59f37 +void lzma_decompress(const std::string& blob, std::string& out, size_t dSize) { + lzma_stream strm = LZMA_STREAM_INIT; + lzma_ret ret = lzma_stream_decoder(&strm, UINT64_MAX, 0); + if (ret != LZMA_OK) { throw std::runtime_error("Could not initialize LZMA encoder"); } + + out.resize(dSize); + strm.next_in = (const uint8_t*) blob.data(); + strm.avail_in = blob.size(); + strm.next_out = (uint8_t*) out.data(); + strm.avail_out = out.size(); + while ( strm.avail_in > 0 ) { + ret = lzma_code(&strm, LZMA_RUN); + if ( ret == LZMA_STREAM_END ) break; + else if (ret != LZMA_OK) { + std::cerr << "ERROR in lzma compression " << ret << std::endl; + break; + } + } + if ( strm.avail_out > 0 ) { + std::cerr <<"ERROR in decompression, expected " << dSize << " bytes but only got " << dSize - strm.avail_out << std::endl; + } +} + +void decompress_stripe(const objstripe::Compression& setting, const std::string& blob, std::string& out, size_t dSize) { + switch ( setting.type() ) { + case objstripe::CompressionType::kNone: + out = blob; + break; + case objstripe::CompressionType::kZSTD: + ::zstd_decompress(blob, out, dSize); + 
break; + case objstripe::CompressionType::kLZMA: + ::lzma_decompress(blob, out, dSize); + break; + } +} + +void parse_productstripe(objstripe::ProductStripe& stripe, std::vector& offsets, std::string& content) { + offsets.reserve(stripe.counts_size() + 1); + size_t nbytes{0}; + offsets.push_back(nbytes); + for (const auto& c : stripe.counts()) { + nbytes += c; + offsets.push_back(nbytes); + } + assert(offsets.size() == stripe.counts_size() + 1); + decompress_stripe(stripe.compression(), stripe.content(), content, nbytes); + assert(nbytes == content.size()); + stripe.clear_content(); +} +} + +WaitableFetch::~WaitableFetch() {} + +void WaitableFetch::fetch(TaskHolder&& callback) { + auto this_state{State::unretrieved}; + if ( state_.compare_exchange_strong(this_state, State::retrieving) ) { + auto req = std::make_shared(S3Request::Type::get, name_); + auto group = callback.group(); + waiters_.add(std::move(callback)); + auto getDoneTask = TaskHolder(*group, make_functor_task([this, req]() { + if ( req->status == S3Request::Status::ok ) { + auto start = std::chrono::high_resolution_clock::now(); + parse(req->buffer); + parseTime_ = std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - start); + state_ = State::retrieved; + waiters_.doneWaiting(); + } else { + // TODO: possible that prefetch is for a stripe that is in a group in the next event batch + // throw std::runtime_error("Could not retrieve key " + name_); + std::cerr << "Could not retrieve key " + name_ + "\n"; + } + })); + conn_->submit(std::move(req), std::move(getDoneTask)); + } else if (this_state == State::retrieved ) { + return; + } else { + waiters_.add(std::move(callback)); + } +} + +std::string_view WaitableFetchProductStripe::bufferAt(size_t groupIdx, size_t iOffset) const { + assert(state_ == State::retrieved); + assert(groupIdx == 0); + if ( iOffset >= offsets_.size() - 1 ) { + std::cerr << name_ << " at " << groupIdx << ", " << iOffset << std::endl; + } + assert(iOffset < 
offsets_.size() - 1); + size_t bstart = offsets_[iOffset]; + size_t bstop = offsets_[iOffset+1]; + assert(bstop > bstart); + assert(bstop <= content_.size()); + return {&content_[bstart], bstop - bstart}; +} + +void WaitableFetchProductStripe::parse(const std::string& buffer) { + if ( not data_.ParseFromString(buffer) ) { + throw std::runtime_error("Could not deserialize key " + name_); + } + ::parse_productstripe(data_, offsets_, content_); +} + +std::string_view WaitableFetchProductGroupStripe::bufferAt(size_t groupIdx, size_t iOffset) const { + assert(state_ == State::retrieved); + assert(groupIdx < offsets_.size()); + assert(iOffset < offsets_[groupIdx].size() - 1); + size_t bstart = offsets_[groupIdx][iOffset]; + size_t bstop = offsets_[groupIdx][iOffset+1]; + assert(bstop > bstart); + assert(bstop <= content_[groupIdx].size()); + return {&content_[groupIdx][bstart], bstop - bstart}; +} + +void WaitableFetchProductGroupStripe::parse(const std::string& buffer) { + if ( not data_.ParseFromString(buffer) ) { + throw std::runtime_error("Could not deserialize key " + name_); + } + offsets_.resize(data_.products_size()); + content_.resize(data_.products_size()); + for(size_t i=0; i< data_.products_size(); ++i) { + ::parse_productstripe(*data_.mutable_products(i), offsets_[i], content_[i]); + } +} + +void DelayedProductStripeRetriever::fetch(TaskHolder&& callback) const { + fetcher_->fetch(std::move(callback)); +} + +std::string_view DelayedProductStripeRetriever::bufferAt(size_t globalEventIndex) const { + assert(globalOffset_ <= globalEventIndex); + size_t iOffset = globalEventIndex - globalOffset_; + assert(iOffset < flushSize_); + return fetcher_->bufferAt(groupIdx_, iOffset); +} + +ProductStripeGenerator::ProductStripeGenerator(const S3ConnectionRef& conn, const std::string& prefix, unsigned int flushSize, size_t globalIndexStart, size_t globalIndexEnd) : + conn_(conn), prefix_(prefix), flushSize_(flushSize), globalIndexStart_(globalIndexStart), 
globalIndexEnd_(globalIndexEnd)
+{
+  auto indexThis = globalIndexStart - (globalIndexStart % flushSize_);
+  auto indexNext = indexThis + flushSize_;
+  currentStripe_ = std::make_shared(conn_, prefix_ + "/" + std::to_string(indexThis), indexThis, flushSize_);
+  nextStripe_ = std::make_shared(conn_, prefix_ + "/" + std::to_string(indexNext), indexNext, flushSize_);
+  prefetch_group_ = std::make_unique();
+}
+
+// Return the stripe covering globalEventIndex, rotating nextStripe_ into
+// currentStripe_ when the index crosses a flush boundary.
+std::shared_ptr
+ProductStripeGenerator::stripeFor(size_t globalEventIndex) {
+  assert(globalEventIndex >= globalIndexStart_ and globalEventIndex < globalIndexEnd_);
+  assert(globalEventIndex >= currentStripe_->globalOffset());
+  assert(globalEventIndex <= nextStripe_->globalOffset());
+  if ( globalEventIndex == nextStripe_->globalOffset() ) {
+    auto indexNext = globalEventIndex + flushSize_;
+    auto new_ps = std::make_shared(conn_, prefix_ + "/" + std::to_string(indexNext), indexNext, flushSize_);
+    // record decompress time of old stripe
+    decompressTime_ += currentStripe_->decompressTime();
+    // shuffle new_ps -> nextStripe_ -> currentStripe_
+    currentStripe_ = nextStripe_;
+    nextStripe_ = new_ps;
+  }
+  assert(globalEventIndex >= currentStripe_->globalOffset());
+  assert(globalEventIndex < nextStripe_->globalOffset());
+  if (
+    currentStripe_->wasFetched()
+    and not nextStripe_->wasFetched()  // FIX: was bitwise '~' on a bool, which is always nonzero, making this term vacuously true
+    and (globalEventIndex % flushSize_ >= flushSize_ / 2)
+    and (globalEventIndex + flushSize_ < globalIndexEnd_)
+    and false  // NOTE(review): prefetch appears deliberately disabled — confirm before removing
+  )
+  {
+    // somewhere in the middle of current stripe, prefetch next
+    nextStripe_->fetch(TaskHolder(*prefetch_group_, make_functor_task([](){})));
+  }
+  return currentStripe_;
+}
+
+S3DelayedRetriever::S3DelayedRetriever(objstripe::ObjectStripeIndex const& index, DeserializeStrategy strategy):
+  deserializers_{std::move(strategy)}
+{
+  dataProducts_.reserve(index.products_size());
+  deserializers_.reserve(index.products_size());
+  dataBuffers_.resize(index.products_size(), nullptr);
+  stripes_.resize(index.products_size());
+  size_t i{0};
+  for(auto
const& pi : index.products()) { + TClass* cls = TClass::GetClass(pi.producttype().c_str()); + if ( cls == nullptr ) { + throw std::runtime_error("No TClass reflection available for " + pi.productname()); + } + dataBuffers_[i] = cls->New(); + dataProducts_.emplace_back(i, &dataBuffers_[i], pi.productname(), cls, this); + deserializers_.emplace_back(cls); + ++i; + } +} + +S3DelayedRetriever::~S3DelayedRetriever() { + auto it = dataProducts_.begin(); + for(void * b: dataBuffers_) { + it->classType()->Destructor(b); + ++it; + } +} + +void S3DelayedRetriever::getAsync(DataProductRetriever& product, int index, TaskHolder callback) { + assert(&product == &dataProducts_[index]); + assert(product.address() == &dataBuffers_[index]); + assert(stripes_[index]); + TaskHolder fetchCallback(*callback.group(), make_functor_task( + [this, index, callback=std::move(callback)]() mutable { + auto start = std::chrono::high_resolution_clock::now(); + auto buf = stripes_[index]->bufferAt(globalEventIndex_); + auto readSize = deserializers_[index].deserialize(buf.data(), buf.size(), *dataProducts_[index].address()); + if ( readSize != buf.size() ) { + throw std::runtime_error( + "Read fail for event " + std::to_string(globalEventIndex_) + " product " + std::to_string(index) + " (" + dataProducts_[index].name() + ")" + ); + } + assert(readSize == buf.size()); + dataProducts_[index].setSize(readSize); + deserializeTime_ += std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - start); + } + )); + stripes_[index]->fetch(std::move(fetchCallback)); +} + +S3Source::S3Source(unsigned int iNLanes, std::string iObjPrefix, int iVerbose, unsigned long long iNEvents, const S3ConnectionRef& conn): + SharedSourceBase(iNEvents), + objPrefix_(std::move(iObjPrefix)), + verbose_(iVerbose), + conn_(conn), + readTime_{std::chrono::microseconds::zero()} +{ + auto start = std::chrono::high_resolution_clock::now(); + + { + tbb::task_group group; + auto req = 
std::make_shared(S3Request::Type::get, "index/" + objPrefix_); + auto getDoneTask = TaskHolder(group, make_functor_task([this, req]() { + if ( req->status == S3Request::Status::ok ) { + if ( not index_.ParseFromString(req->buffer) ) { + throw std::runtime_error("Could not deserialize index in S3Source construction"); + } + } + else { throw std::runtime_error("Could not retrieve index in S3Source construction"); } + })); + conn_->submit(std::move(req), std::move(getDoneTask)); + group.wait(); + } + + if ( verbose_ >= 3 ) { + std::cout << index_.DebugString() << "\n"; + } + + if ( index_.totalevents() < iNEvents ) { + std::cout << "WARNING: less events in source than requested: " + << index_.totalevents() << " vs. " << iNEvents << ". Will read all available events instead.\n"; + } + + productRetrievers_.reserve(index_.products_size()); + for(const auto& productInfo : index_.products()) { + productRetrievers_.emplace_back(conn_, objPrefix_ + "/" + productInfo.productname(), productInfo.flushsize(), 0ul, index_.totalevents()); + } + + laneRetrievers_.reserve(iNLanes); + for(unsigned int i = 0; i< iNLanes; ++i) { + DeserializeStrategy strategy; + switch(index_.serializestrategy()) { + case objstripe::SerializeStrategy::kRoot: + strategy = DeserializeStrategy::make>(); + break; + case objstripe::SerializeStrategy::kRootUnrolled: + strategy = DeserializeStrategy::make>(); + break; + } + laneRetrievers_.emplace_back(index_, std::move(strategy)); + } + + readTime_ += std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - start); +} + +size_t S3Source::numberOfDataProducts() const { + return index_.products_size(); +} + +std::vector& S3Source::dataProducts(unsigned int iLane, long iEventIndex) { + return laneRetrievers_[iLane].dataProducts(); +} + +EventIdentifier S3Source::eventIdentifier(unsigned int iLane, long iEventIndex) { + return laneRetrievers_[iLane].event(); +} + +void S3Source::readEventAsync(unsigned int iLane, long iEventIndex, 
OptionalTaskHolder iTask) { + queue_.push(*iTask.group(), [iLane, optTask = std::move(iTask), this]() mutable { + auto start = std::chrono::high_resolution_clock::now(); + if( + (nextEventStripe_ < index_.packedeventstripes_size()) + or (nextEventInStripe_ < currentEventStripe_.events_size()) + ) + { + // default-constructed currentEventStripe_ will have size zero, so 0, 0 will load first stripe + if(nextEventInStripe_ == currentEventStripe_.events_size()) { + // Need to read ahead + const auto& stripeData = index_.packedeventstripes(nextEventStripe_); + if ( index_.has_eventstripecompression() ) { + auto dsize = index_.eventstripesizes(nextEventStripe_); + std::string decompressedStripe; + decompressedStripe.resize(dsize); + ::decompress_stripe(index_.eventstripecompression(), stripeData, decompressedStripe, dsize); + currentEventStripe_.ParseFromString(decompressedStripe); + } else { + currentEventStripe_.ParseFromString(stripeData); + } + nextEventStripe_++; + nextEventInStripe_ = 0; + + productGroupMap_.clear(); + size_t eventStripeStart = currentEventStripe_.events(0).offset(); + assert(eventStripeStart == (nextEventStripe_-1)* index_.eventstripesize()); + for (size_t iGroup=0; iGroup < currentEventStripe_.groups_size(); ++iGroup) { + const auto& group = currentEventStripe_.groups(iGroup); + auto fetcher = std::make_shared( + conn_, objPrefix_ + "/group" + std::to_string(iGroup) + "/" + std::to_string(eventStripeStart) + ); + if ( verbose_ >= 2 ) std::cout << "creating fetcher for group " + objPrefix_ + "/group" + std::to_string(iGroup) + "/" + std::to_string(eventStripeStart) + "\n"; + for (size_t groupIdx=0; groupIdx < group.names_size(); groupIdx++) { + productGroupMap_[group.names(groupIdx)] = std::make_shared(fetcher, groupIdx, eventStripeStart, index_.eventstripesize()); + } + } + } + size_t eventStripeStart = currentEventStripe_.events(0).offset(); + const auto event = currentEventStripe_.events(nextEventInStripe_); + if ( verbose_ >= 1 ) std::cout << 
event.DebugString() << "\n"; + auto& retriever = laneRetrievers_[iLane]; + size_t globalEventIndex = event.offset(); + + for (size_t i=0; i < productRetrievers_.size(); ++i) { + auto itgroup = productGroupMap_.find(objPrefix_ + "/" + index_.products(i).productname() + "/" + std::to_string(eventStripeStart)); + auto pstripe = productRetrievers_[i].stripeFor(globalEventIndex); + if ( itgroup != productGroupMap_.end() ) { + if ( verbose_ >= 2) std::cout << "using group for product " + index_.products(i).productname() + "/" + std::to_string(eventStripeStart) + "\n"; + retriever.setStripe(i, itgroup->second); + } else { + if (verbose_ >= 2) std::cout << "using individual stripe for product " + index_.products(i).productname() + "\n"; + retriever.setStripe(i, std::move(pstripe)); + } + } + + retriever.setEvent(globalEventIndex, {event.run(), event.lumi(), event.event()}); + optTask.releaseToTaskHolder(); + ++nextEventInStripe_; + } + readTime_ +=std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - start); + }); +} + +void S3Source::printSummary() const { + std::cout <<"\nSource:\n" + " serial read time: "<blockingTime().count()<<"us\n"; +}; + +std::chrono::microseconds S3Source::serialReadTime() const { + return readTime_; +} + +std::chrono::microseconds S3Source::decompressTime() const { + auto time = std::chrono::microseconds::zero(); + for(auto const& p : productRetrievers_) { + time += p.decompressTime(); + } + return time; +} + +std::chrono::microseconds S3Source::deserializeTime() const { + auto time = std::chrono::microseconds::zero(); + for(auto const& l : laneRetrievers_) { + time += l.deserializeTime(); + } + return time; +} + + +namespace { +class Maker : public SourceMakerBase { + public: + Maker(): SourceMakerBase("S3Source") {} + std::unique_ptr create(unsigned int iNLanes, unsigned long long iNEvents, ConfigurationParameters const& params) const final { + auto verbose = params.get("verbose", 0); + auto objPrefix = params.get("prefix"); + 
if(not objPrefix) { + std::cerr << "no object prefix given for S3Outputer\n"; + return {}; + } + auto connfile = params.get("conn"); + if(not connfile) { + std::cerr <<"no connection configuration file name given for S3Source\n"; + return {}; + } + auto conn = S3Connection::from_config(connfile.value()); + if(not conn) { + return {}; + } + + return std::make_unique(iNLanes, objPrefix.value(), verbose, iNEvents, conn); + } + }; + +Maker s_maker; +} diff --git a/S3Source.h b/S3Source.h new file mode 100644 index 0000000..84544f7 --- /dev/null +++ b/S3Source.h @@ -0,0 +1,174 @@ +#if !defined(S3Source_h) +#define S3Source_h + +#include +#include +#include +#include +#include + +#include "S3Common.h" +#include "SharedSourceBase.h" +#include "DataProductRetriever.h" +#include "DelayedProductRetriever.h" +#include "SerialTaskQueue.h" +#include "WaitingTaskList.h" +#include "DeserializeStrategy.h" +#include "objectstripe.pb.h" + + +namespace cce::tf { + +class WaitableFetch { + public: + WaitableFetch(const S3ConnectionRef& conn, const std::string& name): + conn_(conn), name_(name), state_{State::unretrieved} {}; + virtual ~WaitableFetch(); + void fetch(TaskHolder&& callback); + bool wasFetched() const { return state_ != State::unretrieved; }; + virtual std::string_view bufferAt(size_t groupIdx, size_t iOffset) const = 0; + std::chrono::microseconds parseTime() const { return parseTime_; }; + + protected: + const std::string name_; + enum class State {unretrieved, retrieving, retrieved}; + std::atomic state_; + + private: + virtual void parse(const std::string& buffer) = 0; + WaitingTaskList waiters_{}; + const S3ConnectionRef conn_; + std::chrono::microseconds parseTime_{0}; +}; + +class WaitableFetchProductStripe : public WaitableFetch { + public: + using WaitableFetch::WaitableFetch; + std::string_view bufferAt(size_t groupIdx, size_t iOffset) const override; + private: + void parse(const std::string& buffer) override; + objstripe::ProductStripe data_; + std::vector 
offsets_{}; + std::string content_{}; +}; + +class WaitableFetchProductGroupStripe : public WaitableFetch { + public: + using WaitableFetch::WaitableFetch; + std::string_view bufferAt(size_t groupIdx, size_t iOffset) const override; + private: + void parse(const std::string& buffer) override; + objstripe::ProductGroupStripe data_; + std::vector> offsets_{}; + std::vector content_{}; +}; + +class DelayedProductStripeRetriever { + public: + // Note: for ProductStripes not in a ProductGroupStripe, groupIdx is ignored + DelayedProductStripeRetriever(const std::shared_ptr& fetcher, size_t groupIdx, size_t globalOffset, size_t flushSize): + fetcher_(fetcher), groupIdx_(groupIdx), globalOffset_(globalOffset), flushSize_(flushSize) {}; + DelayedProductStripeRetriever(const S3ConnectionRef& conn, const std::string& name, size_t globalOffset, size_t flushSize): + fetcher_(std::make_shared(conn, name)), groupIdx_(0), globalOffset_(globalOffset), flushSize_(flushSize) {}; + void fetch(TaskHolder&& callback) const; + std::string_view bufferAt(size_t globalEventIndex) const; + size_t globalOffset() const { return globalOffset_; }; + bool wasFetched() const { return fetcher_->wasFetched(); }; + std::chrono::microseconds decompressTime() const { return fetcher_->parseTime(); } + + private: + const size_t groupIdx_; + const size_t globalOffset_; + const size_t flushSize_; + std::shared_ptr fetcher_; +}; + +class ProductStripeGenerator { + public: + ProductStripeGenerator(const S3ConnectionRef& conn, const std::string& prefix, unsigned int flushSize, size_t globalIndexStart, size_t globalIndexEnd); + std::shared_ptr stripeFor(size_t globalEventIndex); + std::chrono::microseconds decompressTime() const { return decompressTime_; }; + + private: + const S3ConnectionRef conn_; + const std::string prefix_; + const unsigned int flushSize_; + const size_t globalIndexStart_; + const size_t globalIndexEnd_; + std::shared_ptr currentStripe_; + std::shared_ptr nextStripe_; + 
std::chrono::microseconds decompressTime_{0}; + std::unique_ptr prefetch_group_; +}; + +class S3DelayedRetriever : public DelayedProductRetriever { + public: + S3DelayedRetriever(objstripe::ObjectStripeIndex const&, DeserializeStrategy); + ~S3DelayedRetriever(); + + S3DelayedRetriever(S3DelayedRetriever&&) = default; + S3DelayedRetriever(S3DelayedRetriever const&) = delete; + S3DelayedRetriever& operator=(S3DelayedRetriever&&) = default; + S3DelayedRetriever& operator=(S3DelayedRetriever const&) = delete; + + EventIdentifier event() const { return eventID_; } + void setEvent(size_t globalEventIndex, EventIdentifier&& ev) { globalEventIndex_ = globalEventIndex; eventID_ = ev; } + + std::chrono::microseconds deserializeTime() const { return deserializeTime_; } + + void setStripe(size_t index, const std::shared_ptr& ptr) { stripes_[index] = ptr; } + + std::vector& dataProducts() { return dataProducts_; } + + void getAsync(DataProductRetriever& product, int index, TaskHolder callback) final; + + private: + size_t globalEventIndex_; + EventIdentifier eventID_; + std::vector dataProducts_; + std::vector dataBuffers_; + DeserializeStrategy deserializers_; + std::vector> stripes_; + std::chrono::microseconds deserializeTime_{0}; +}; + +class S3Source : public SharedSourceBase { + public: + S3Source(unsigned int iNLanes, std::string iObjPrefix, int iVerbose, unsigned long long iNEvents, const S3ConnectionRef& conn); + S3Source(S3Source&&) = delete; + S3Source(S3Source const&) = delete; + ~S3Source() = default; + + size_t numberOfDataProducts() const final; + std::vector& dataProducts(unsigned int iLane, long iEventIndex) final; + EventIdentifier eventIdentifier(unsigned int iLane, long iEventIndex) final; + + void printSummary() const final; + + private: + void readEventAsync(unsigned int iLane, long iEventIndex, OptionalTaskHolder) final; + + std::chrono::microseconds serialReadTime() const; + std::chrono::microseconds parallelReadTime() const; + std::chrono::microseconds 
decompressTime() const; + std::chrono::microseconds deserializeTime() const; + + const int verbose_; + const std::string objPrefix_; + const S3ConnectionRef conn_; + SerialTaskQueue queue_; + + objstripe::ObjectStripeIndex index_; + + // mutated only by methods called in queue_ + size_t nextEventStripe_ = 0; + size_t nextEventInStripe_ = 0; + objstripe::EventStripe currentEventStripe_; + std::map> productGroupMap_; + std::vector productRetrievers_; + std::vector laneRetrievers_; + std::chrono::microseconds readTime_; +}; +} + +#endif diff --git a/TaskHolder.h b/TaskHolder.h index cb815f4..1b94964 100644 --- a/TaskHolder.h +++ b/TaskHolder.h @@ -2,16 +2,19 @@ #define TaskHolder_h #include +#include #include "tbb/task_group.h" #include "TaskBase.h" namespace cce::tf { class TaskHolder { public: + friend class WaitingTaskList; + TaskHolder(): group_{nullptr}, task_{nullptr} {} - TaskHolder(tbb::task_group& iGroup, std::unique_ptr iTask): - group_{&iGroup}, task_{iTask.release()} { - //std::cout <<"new task "< iTask, bool track=false): + group_{&iGroup}, task_{iTask.release()}, track_{track} { + if ( track_ ) std::cout << "New holder for task " + std::to_string(reinterpret_cast(task_)) << std::endl; task_->increment_ref_count(); } @@ -23,14 +26,16 @@ class TaskHolder { TaskHolder( const TaskHolder& iOther): group_{iOther.group_}, - task_{iOther.task_} { - //std::cout <<"copy holder with task "<(task_)) << std::endl; if(task_) { task_->increment_ref_count(); } } TaskHolder(TaskHolder&& iOther): group_{iOther.group_}, - task_{iOther.task_} { - //std::cout <<"move holder with task "<(task_)) << std::endl; iOther.task_ = nullptr; } @@ -57,17 +62,25 @@ class TaskHolder { auto t = task_; task_ = nullptr; if(t->decrement_ref_count()) { - //std::cout <<"Task "<run([t]() { - t->execute(); - //std::cout <<"delete "<(t)) << std::endl; + group_->run([t, track=track_]() { + if ( track ) std::cout << "Running task " + std::to_string(reinterpret_cast(t)) << std::endl; + t->execute(); 
+ if ( track ) std::cout << "Deleting task " + std::to_string(reinterpret_cast(t)) << std::endl; + delete t; + }); } } private: + TaskBase* release_no_decrement() { + auto t = task_; + task_ = nullptr; + return t; + } + tbb::task_group* group_; TaskBase* task_; + bool track_; }; } #endif diff --git a/WaitingTaskList.cc b/WaitingTaskList.cc new file mode 100644 index 0000000..4f1c5ff --- /dev/null +++ b/WaitingTaskList.cc @@ -0,0 +1,185 @@ +// -*- C++ -*- +// +// Package: Concurrency +// Class : WaitingTaskList +// +// Implementation: +// [Notes on implementation] +// +// Original Author: Chris Jones +// Created: Thu Feb 21 13:46:45 CST 2013 +// $Id$ +// + +// system include files + +// user include files +#include "oneapi/tbb/task.h" +#include +#include + +#include "WaitingTaskList.h" + +using namespace cce::tf; + +#if __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 2) +#define hardware_pause() asm("") +#endif +#if defined(__x86_64__) || defined(__i386__) +#undef hardware_pause +#define hardware_pause() asm("pause") +#endif + +WaitingTaskList::WaitingTaskList(unsigned int iInitialSize) + : m_head{nullptr}, + m_nodeCache{new WaitNode[iInitialSize]}, + m_nodeCacheSize{iInitialSize}, + m_lastAssignedCacheIndex{0}, + m_waiting{true} { + auto nodeCache = m_nodeCache.get(); + for (auto it = nodeCache, itEnd = nodeCache + m_nodeCacheSize; it != itEnd; ++it) { + it->m_fromCache = true; + } +} + +// +// member functions +// +void WaitingTaskList::reset() { + unsigned int nSeenTasks = m_lastAssignedCacheIndex; + m_lastAssignedCacheIndex = 0; + assert(m_head == nullptr); + if (nSeenTasks > m_nodeCacheSize) { + //need to expand so next time we don't have to do any + // memory requests + m_nodeCacheSize = nSeenTasks; + m_nodeCache = std::make_unique(nSeenTasks); + auto nodeCache = m_nodeCache.get(); + for (auto it = nodeCache, itEnd = nodeCache + m_nodeCacheSize; it != itEnd; ++it) { + it->m_fromCache = true; + } + } + //this will make sure all cores see the changes + 
m_waiting = true; +} + +WaitingTaskList::WaitNode* WaitingTaskList::createNode(oneapi::tbb::task_group* iGroup, TaskBase* iTask) { + unsigned int index = m_lastAssignedCacheIndex++; + + WaitNode* returnValue; + if (index < m_nodeCacheSize) { + returnValue = m_nodeCache.get() + index; + } else { + returnValue = new WaitNode; + returnValue->m_fromCache = false; + } + returnValue->m_task = iTask; + returnValue->m_group = iGroup; + //No other thread can see m_next yet. The caller to create node + // will be doing a synchronization operation anyway which will + // make sure m_task and m_next are synched across threads + returnValue->m_next.store(returnValue, std::memory_order_relaxed); + + return returnValue; +} + +void WaitingTaskList::add(TaskHolder iTask) { + if (m_waiting) { + auto task = iTask.release_no_decrement(); + WaitNode* newHead = createNode(iTask.group(), task); + //This exchange is sequentially consistent thereby + // ensuring ordering between it and setNextNode + WaitNode* oldHead = m_head.exchange(newHead); + newHead->setNextNode(oldHead); + + //For the case where oldHead != nullptr, + // even if 'm_waiting' changed, we don't + // have to recheck since we beat 'announce()' in + // the ordering of 'm_head.exchange' call so iTask + // is guaranteed to be in the link list + + if (nullptr == oldHead) { + newHead->setNextNode(nullptr); + if (!m_waiting) { + //if finished waiting right before we did the + // exchange our task will not be run. Also, + // additional threads may be calling add() and swapping + // heads and linking us to the new head. 
+ // It is safe to call announce from multiple threads + announce(); + } + } + } +} + +void WaitingTaskList::add(oneapi::tbb::task_group* iGroup, TaskBase* iTask) { + iTask->increment_ref_count(); + if (!m_waiting) { + if (iTask->decrement_ref_count()) { + iGroup->run([iTask]() { + iTask->execute(); + delete iTask; + }); + } + } else { + WaitNode* newHead = createNode(iGroup, iTask); + //This exchange is sequentially consistent thereby + // ensuring ordering between it and setNextNode + WaitNode* oldHead = m_head.exchange(newHead); + newHead->setNextNode(oldHead); + + //For the case where oldHead != nullptr, + // even if 'm_waiting' changed, we don't + // have to recheck since we beat 'announce()' in + // the ordering of 'm_head.exchange' call so iTask + // is guaranteed to be in the link list + + if (nullptr == oldHead) { + if (!m_waiting) { + //if finished waiting right before we did the + // exchange our task will not be run. Also, + // additional threads may be calling add() and swapping + // heads and linking us to the new head. + // It is safe to call announce from multiple threads + announce(); + } + } + } +} + +void WaitingTaskList::announce() { + //Need a temporary storage since one of these tasks could + // cause the next event to start processing which would refill + // this waiting list after it has been reset + WaitNode* n = m_head.exchange(nullptr); + WaitNode* next; + while (n) { + //it is possible that 'WaitingTaskList::add' is running in a different + // thread and we have a new 'head' but the old head has not yet been + // attached to the new head (we identify this since 'nextNode' will return itself). + // In that case we have to wait until the link has been established before going on. 
+ while (n == (next = n->nextNode())) { + hardware_pause(); + } + auto t = n->m_task; + auto g = n->m_group; + if (!n->m_fromCache) { + delete n; + } + n = next; + + //the task may indirectly call WaitingTaskList::reset + // so we need to call spawn after we are done using the node. + if (t->decrement_ref_count()) { + g->run([t]() { + t->execute(); + delete t; + }); + } + } +} + +void WaitingTaskList::doneWaiting() { + m_waiting = false; + announce(); +} diff --git a/WaitingTaskList.h b/WaitingTaskList.h new file mode 100644 index 0000000..ccadac6 --- /dev/null +++ b/WaitingTaskList.h @@ -0,0 +1,74 @@ +#if !defined(WaitingTaskList_h) +#define WaitingTaskList_h +// +// Original Author: Chris Jones +// Created: Thu Feb 21 13:46:31 CST 2013 +// $Id$ +// + +// system include files +#include + +// user include files +#include "TaskBase.h" +#include "TaskHolder.h" + +// forward declarations + +namespace cce::tf { + class WaitingTaskList { + public: + explicit WaitingTaskList(unsigned int iInitialSize = 2); + WaitingTaskList(const WaitingTaskList&) = delete; // stop default + const WaitingTaskList& operator=(const WaitingTaskList&) = delete; // stop default + ~WaitingTaskList() = default; + + void add(oneapi::tbb::task_group*, TaskBase*); + void add(TaskHolder); + + ///Signals that the resource is now available and tasks should be spawned + /**The owner of the resource calls this function to allow the waiting tasks to + * start accessing it. + * If the task fails, a non 'null' std::exception_ptr should be used. + * To have tasks wait again one must call reset(). + * Calls to add() and doneWaiting() can safely be done concurrently. + */ + void doneWaiting(); + + ///Resets access to the resource so that added tasks will wait. + /**The owner of the resouce calls reset() to make tasks wait. + * Calling reset() is NOT thread safe. 
The system must guarantee that no tasks are + * using the resource when reset() is called and neither add() nor doneWaiting() can + * be called concurrently with reset(). + */ + void reset(); + + private: + /**Handles running the tasks, + * safe to call from multiple threads + */ + void announce(); + + struct WaitNode { + TaskBase* m_task; + oneapi::tbb::task_group* m_group; + std::atomic m_next; + bool m_fromCache; + + void setNextNode(WaitNode* iNext) { m_next = iNext; } + + WaitNode* nextNode() const { return m_next; } + }; + + WaitNode* createNode(oneapi::tbb::task_group* iGroup, TaskBase* iTask); + + // ---------- member data -------------------------------- + std::atomic m_head; + std::unique_ptr m_nodeCache; + unsigned int m_nodeCacheSize; + std::atomic m_lastAssignedCacheIndex; + std::atomic m_waiting; + }; +} // namespace cce::tf + +#endif diff --git a/objectstripe.proto b/objectstripe.proto new file mode 100644 index 0000000..8b3861e --- /dev/null +++ b/objectstripe.proto @@ -0,0 +1,66 @@ +syntax = "proto2"; + +package objstripe; + +enum SerializeStrategy { + kRoot = 0; + kRootUnrolled = 1; +} + +enum CompressionType { + kNone = 0; + kZSTD = 1; + kLZMA = 2; +} + +message Compression { + optional CompressionType type = 1; + optional uint32 level = 2; + optional string dictionaryPath = 3; +} + +message ProductInfo { + optional string productName = 1; + optional string productType = 2; + optional uint32 flushSize = 3; + optional uint32 flushMinBytes = 4; +} + +message ObjectStripeIndex { + optional uint32 eventStripeSize = 1; + optional uint64 totalEvents = 2; + repeated ProductInfo products = 3; + repeated bytes packedEventStripes = 4; + optional SerializeStrategy serializeStrategy = 5; + optional Compression eventStripeCompression = 6; + repeated uint32 eventStripeSizes = 7 [packed = true]; +} + +message EventStripe { + message Event { + optional uint64 offset = 1; + optional uint32 run = 2; + optional uint32 lumi = 3; + optional uint64 event = 4; + } + + 
repeated Event events = 1; + // TODO: store product flushSize here? + + message ProductGroup { + repeated string names = 1; + } + + repeated ProductGroup groups = 2; +} + +message ProductStripe { + repeated uint32 counts = 1 [packed = true]; + optional bytes content = 2; + optional uint64 globalOffset = 3; + optional Compression compression = 4; +} + +message ProductGroupStripe { + repeated ProductStripe products = 2; +} diff --git a/s3conn_local.ini b/s3conn_local.ini new file mode 100644 index 0000000..e03ede0 --- /dev/null +++ b/s3conn_local.ini @@ -0,0 +1,5 @@ +hostName=localhost:9000 +bucketName=test +accessKeyId=minio +secretAccessKey=miniotestpass +#securityToken=blah diff --git a/s3io.md b/s3io.md new file mode 100644 index 0000000..81f7303 --- /dev/null +++ b/s3io.md @@ -0,0 +1,46 @@ +# S3 I/O components + +As part of the "Object Storage for CMS in the HL-LHC era" project, the +`S3Source` and `S3Outputer` provide an IO system that can write to S3 buckets, +with the main purpose to explore the performance and parallelization +capabilities of the Ceph RadosGW S3 service. 
+ +## Building +To build with S3 support, you will need to download and install [libs3](https://github.com/bji/libs3): +```bash +mkdir -p external +git clone git@github.com:bji/libs3.git +cd libs3 +make DESTDIR=../external install +``` + +the rest of the dependencies can be sourced, e.g., from a recent CMSSW release: +```bash +source /cvmfs/cms.cern.ch/slc7_amd64_gcc10/external/cmake/3.18.2/etc/profile.d/init.sh +pushd /cvmfs/cms.cern.ch/slc7_amd64_gcc10/cms/cmssw/CMSSW_12_3_0_pre5/ +cmsenv +popd +git clone git@github.com:hep-cce2/root_serialization.git +cd root_serialization +mkdir build && cd build +cmake ../ \ + -DCMAKE_PREFIX_PATH="/cvmfs/cms.cern.ch/slc7_amd64_gcc10/external/lz4/1.9.2-373b1f6c80ba13e93f436c77aa63c026;/cvmfs/cms.cern.ch/slc7_amd64_gcc10/external/protobuf/3.15.1-b2ca6d3fa59916150b27c3d598c7c7ac;/cvmfs/cms.cern.ch/slc7_amd64_gcc10/external/xz/5.2.5-d6fed2038c4e8d6e04531d1adba59f37" \ + -Dzstd_DIR=/cvmfs/cms.cern.ch/slc7_amd64_gcc10/external/zstd/1.4.5-ec760e16a89e932fdc84f1fd3192f206/lib/cmake/zstd \ + -DTBB_DIR=/cvmfs/cms.cern.ch/slc7_amd64_gcc10/external/tbb/v2021.4.0-75e6d730601d8461f20893321f4f7660/lib/cmake/TBB \ + -DROOT_DIR=$ROOTSYS/cmake \ + -DLIBS3_DIR=$(realpath ../../external) \ + -DENABLE_HDF5=OFF -DENABLE_S3=ON +``` + +## Running with local server +The S3 connection settings are specified in an ini file. A local server can be +brought up using the `./s3localserver.sh` script, assuming singularity is +available at your site. Then use `conn=s3conn_local.ini` in the source/outputer +configuration. + +## Data layout +There are two types of binary data blobs written to the S3 service: +- An event index, unique per processing task, is stored at `index/{prefix}`. It contains an event number index, stored as a list of compressed _event stripes_, and is rewritten after each new event stripe is flushed. The frequency of this is controlled by the `eventFlush` parameter in S3Outputer. 
+- Several product _stripes_, stored at `{prefix}/{product_name}/{offset}` where the offset indexes the event stripe list. The number of product stripes written per event stripe depends on the size of the product and the `productFlush` parameter, as well as the requirement that the number of events worth of data products per stripe is divisible by `eventFlush`. + +The binary data format of the blobs is specified by the `objectstripe.proto` protobuf schema. diff --git a/s3localserver.sh b/s3localserver.sh new file mode 100755 index 0000000..adba9f7 --- /dev/null +++ b/s3localserver.sh @@ -0,0 +1,7 @@ +#!/bin/bash +mkdir -p data/test +singularity run \ + -B ${PWD}/data:/data \ + --env MINIO_ROOT_USER=minio \ + --env MINIO_ROOT_PASSWORD=miniotestpass \ + docker://quay.io/minio/minio server /data --console-address ":9001" diff --git a/threaded_io_test.cc b/threaded_io_test.cc index 0e13d74..e40ab61 100644 --- a/threaded_io_test.cc +++ b/threaded_io_test.cc @@ -10,6 +10,8 @@ #include "CLI11.hpp" +#define TBB_PREVIEW_TASK_GROUP_EXTENSIONS 1 // for task_group::defer + #include "outputerFactoryGenerator.h" #include "sourceFactoryGenerator.h" #include "waiterFactoryGenerator.h" @@ -133,6 +135,8 @@ int main(int argc, char* argv[]) { } std::cout <<"finished warmup"< waiter; @@ -150,8 +154,6 @@ int main(int argc, char* argv[]) { } std::atomic ievt{0}; - - tbb::task_arena arena(parallelism); decltype(std::chrono::high_resolution_clock::now()) start; auto pOut = out.get();