forked from Dr15Jones/root_serialization
-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathSerialRNTupleTFileSource.cc
More file actions
125 lines (110 loc) · 4.44 KB
/
SerialRNTupleTFileSource.cc
File metadata and controls
125 lines (110 loc) · 4.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#include "SerialRNTupleTFileSource.h"
#include "SourceFactory.h"
#include "TFile.h"
#include <iostream>
using namespace cce::tf;
SerialRNTupleTFileSource::SerialRNTupleTFileSource(unsigned iNLanes, unsigned long long iNEvents, std::string const& iName, bool iDelayReading):
SharedSourceBase(iNEvents),
file_{TFile::Open(iName.c_str())},
events_{ROOT::RNTupleReader::Open(*file_->Get<ROOT::RNTuple>("Events"))},
accumulatedTime_{std::chrono::microseconds::zero()},
delayReading_{iDelayReading}
{
if(not delayReading_) {
promptReaders_.reserve(iNLanes);
} else {
delayedReaders_.reserve(iNLanes);
}
dataProductsPerLane_.reserve(iNLanes);
ptrToDataProducts_.resize(iNLanes);
entries_.reserve(iNLanes);
identifiers_.resize(iNLanes);
nEvents_ = events_->GetNEntries();
if (iNEvents < nEvents_ ) nEvents_ = iNEvents;
const std::string eventIDBranchName{"EventID"};
bool hasEventID = false;
bool hasEventAux = false;
auto const& model = events_->GetModel();
auto const& subfields = model.GetConstFieldZero().GetConstSubfields();
std::vector<std::string> fieldIDs;
fieldIDs.reserve(subfields.size());
std::vector<std::string> fieldType;
fieldType.reserve(subfields.size());
for(auto* field: subfields) {
if(eventIDBranchName == field->GetFieldName()) {
hasEventID = true;
continue;
}
fieldIDs.emplace_back(field->GetFieldName());
fieldType.emplace_back(field->GetTypeName());
}
for(int laneId=0; laneId < iNLanes; ++laneId) {
entries_.emplace_back(model.CreateEntry());
dataProductsPerLane_.emplace_back();
auto& dataProducts = dataProductsPerLane_.back();
dataProducts.reserve(fieldIDs.size());
DelayedProductRetriever* delayedReader = nullptr;
if(not delayReading_) {
promptReaders_.emplace_back();
delayedReader = &promptReaders_.back();
} else {
delayedReaders_.emplace_back(&queue_, *events_.get(), fieldIDs, &ptrToDataProducts_[laneId]);
delayedReader = &delayedReaders_.back();
}
auto& addressDataProducts = ptrToDataProducts_[laneId];
addressDataProducts.reserve(fieldIDs.size());
for(int i=0; i< fieldIDs.size(); ++i) {
TClass* class_ptr=TClass::GetClass(fieldType[i].c_str());
addressDataProducts.push_back(entries_.back()->GetPtr<void>(fieldIDs[i]).get());
dataProducts.emplace_back(i,
&addressDataProducts[i],
fieldIDs[i],
class_ptr,
delayedReader);
}
}
}
void SerialRNTupleTFileSource::readEventAsync(unsigned int iLane, long iEventIndex, OptionalTaskHolder iTask) {
if(nEvents_ > iEventIndex) {
auto temptask = iTask.releaseToTaskHolder();
auto group = temptask.group();
queue_.push(*group, [task=std::move(temptask), this, iLane, iEventIndex]() mutable {
auto start = std::chrono::high_resolution_clock::now();
if (delayReading_) {
identifiers_[iLane] = events_->GetView<cce::tf::EventIdentifier>("EventID")(iEventIndex);
delayedReaders_[iLane].setEventIndex(iEventIndex);
} else {
events_->LoadEntry(iEventIndex, *entries_[iLane]);
identifiers_[iLane] = *entries_[iLane]->GetPtr<cce::tf::EventIdentifier>("EventID");
}
accumulatedTime_ += std::chrono::duration_cast<decltype(accumulatedTime_)>(std::chrono::high_resolution_clock::now() - start);
task.doneWaiting();
});
}
}
std::chrono::microseconds SerialRNTupleTFileSource::accumulatedTime() const {
auto fullTime = accumulatedTime_;
for(auto& delayedReader: promptReaders_) {
fullTime += delayedReader.accumulatedTime();
}
return fullTime;
}
void SerialRNTupleTFileSource::printSummary() const {
std::chrono::microseconds sourceTime = accumulatedTime();
std::cout <<"\nSource time: "<<sourceTime.count()<<"us\n"<<std::endl;
}
namespace {
class Maker : public SourceMakerBase {
public:
Maker(): SourceMakerBase("SerialRNTupleTFileSource") {}
std::unique_ptr<SharedSourceBase> create(unsigned int iNLanes, unsigned long long iNEvents, ConfigurationParameters const& params) const final {
auto fileName = params.get<std::string>("fileName");
if(not fileName) {
std::cout <<"no file name given\n";
return {};
}
return std::make_unique<SerialRNTupleTFileSource>(iNLanes, iNEvents, *fileName, params.get<bool>("delayReading",false));
}
};
Maker s_maker;
}