forked from Dr15Jones/root_serialization
-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathLane.cc
More file actions
80 lines (68 loc) · 3.42 KB
/
Lane.cc
File metadata and controls
80 lines (68 loc) · 3.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#include <utility>
#include <iostream>
#include <string>
#include "Lane.h"
#include "FunctorTask.h"
using namespace cce::tf;
Lane::Lane(unsigned int iIndex, SharedSourceBase* iSource, WaiterBase const* iWaiter): source_(iSource), waiter_(iWaiter), index_{iIndex} {
}
// Start this Lane's event-processing chain. Events are claimed one at a time
// from the shared atomic `index` (see doNextEvent) until the source reports
// it cannot reach the next event; `finalTask` is threaded through the whole
// chain and completes once no further events remain.
void Lane::processEventsAsync(std::atomic<long>& index, tbb::task_group& group, const OutputerBase& outputer,
                              TaskHolder finalTask) {
  // Just enter the recursive pull-one-event loop.
  doNextEvent(index, group, outputer, std::move(finalTask));
}
// Wrap `holder` in a task that first lets the configured waiter intercede for
// the current event before `holder` is released. If no waiter is configured,
// `holder` is returned unchanged.
// NOTE(review): `holder` is captured by value into a non-mutable lambda, so
// `std::move(holder)` below operates on a const object and therefore copies
// rather than moves — confirm this is intended for TaskHolder's semantics.
TaskHolder Lane::makeWaiterTask(tbb::task_group& group, size_t index, TaskHolder holder) {
  if(not waiter_) {
    // No waiter: nothing to insert into the chain.
    return holder;
  } else {
    return TaskHolder(group,
                      make_functor_task([index, holder, this]() {
                        // Hand the waiter everything it needs to identify the event
                        // and the data product slot (`index`) being waited on.
                        waiter_->waitAsync(index_,
                                           source_->eventIdentifier(index_, presentEventIndex_),
                                           presentEventIndex_,
                                           dataProducts(),index, std::move(holder));
                      }) );
  }
}
// Build the task to run once data product `iDP` has been retrieved for slot
// `index`: optionally notify the outputer that the product is ready, then
// optionally let the waiter intercede, and finally release `holder` (which
// drives serialization/output).
TaskHolder Lane::makeTaskForDataProduct(tbb::task_group& group, size_t index, DataProductRetriever& iDP, OutputerBase const& outputer, TaskHolder holder) {
  if(not outputer.usesProductReadyAsync()) {
    // Outputer does not want per-product callbacks; only insert the waiter.
    return makeWaiterTask(group, index, holder);
  }
  auto const lane = this->index_;
  return makeWaiterTask(group, index,
                        TaskHolder(group,
                                   make_functor_task([holder, lane, &iDP, &outputer]() {
                                     outputer.productReadyAsync(lane, iDP, std::move(holder));
                                   })));
}
// Process the event the Lane is currently positioned on.
// Per-event order: retrieve each data product, run the waiter, serialize,
// write output, then fire iCallback.
void Lane::processEventAsync(tbb::task_group& group, TaskHolder iCallback, const OutputerBase& outputer) {
  // The output task fires once every per-product copy of `outputTask` made in
  // the loop below has been destroyed, i.e. once all products are handled.
  TaskHolder outputTask(group,
                        make_functor_task([&outputer, this, callback = std::move(iCallback)]() {
                          outputer.outputAsync(this->index_, source_->eventIdentifier(index_, presentEventIndex_),
                                               std::move(callback));
                        }));
  // NOTE: replacing this loop with tbb::parallel_for was measured to be slower
  // and to scale worse as the number of threads increased.
  std::size_t slot = 0;
  for(auto& product : mutableDataProducts()) {
    product.getAsync(makeTaskForDataProduct(group, slot++, product, outputer, outputTask));
  }
}
// Claim the next global event slot and, if the source may be able to reach
// that event, schedule the asynchronous chain:
//   gotoEventAsync -> processEventAsync -> recurse back here.
// `finalTask` is moved through every step; when no further event is
// available it is simply dropped here, allowing its completion to run.
// NOTE(review): `finalTask` is init-captured into non-mutable lambdas and
// then `std::move`d inside them — on a const capture that move degrades to a
// copy; confirm this matches TaskHolder's intended semantics.
void Lane::doNextEvent(std::atomic<long>& index, tbb::task_group& group, const OutputerBase& outputer, TaskHolder finalTask) {
  using namespace std::string_literals;
  // Atomically claim the next event slot shared across all lanes.
  presentEventIndex_ = index++;
  if(source_->mayBeAbleToGoToEvent(presentEventIndex_)) {
    if(verbose_) {
      // Build one string before streaming so concurrent lanes are less likely
      // to interleave their output.
      std::cout <<"event "+std::to_string(presentEventIndex_)+"\n"<<std::flush;
    }
    // Runs after the source has positioned itself on the event: set up the
    // continuation that recurses into doNextEvent once this event finishes,
    // then start processing the event's data products.
    OptionalTaskHolder processEventTask(group, make_functor_task([this,&index, &group, &outputer, finalTask=std::move(finalTask)]() {
      TaskHolder recursiveTask(group, make_functor_task([this, &index, &group, &outputer, finalTask=std::move(finalTask)]() {
        doNextEvent(index, group, outputer, std::move(finalTask));
      }));
      processEventAsync(group, std::move(recursiveTask), outputer);
    }) );
    source_->gotoEventAsync(this->index_, presentEventIndex_, std::move(processEventTask));
  }
}