forked from hep-cce2/root_serialization
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRepeatingRootSource.cc
More file actions
132 lines (122 loc) · 4.38 KB
/
RepeatingRootSource.cc
File metadata and controls
132 lines (122 loc) · 4.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#include "RepeatingRootSource.h"

#include <cassert>
#include <chrono>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_set>
#include <vector>

#include "TBranch.h"
#include "TClass.h"
#include "TFile.h"
#include "TTree.h"
using namespace cce::tf;
// Caches the first iNUniqueEvents entries of the "Events" TTree found in the
// ROOT file iName, so that later reads can be served from memory.
//
//   iName           name handed to TFile::Open.
//   iNUniqueEvents  number of tree entries (0 .. iNUniqueEvents-1) to cache.
//   iNLanes         number of lanes; every lane gets its own list of data products.
//   iNEvents        forwarded unchanged to SharedSourceBase.
//   iBranchToRead   if non-empty, only that branch and "EventAuxiliary" are cached;
//                   if empty, every branch except "EventID" is cached.
//
// Throws std::runtime_error when the file cannot be opened or does not contain
// an "Events" TTree.
RepeatingRootSource::RepeatingRootSource(std::string const& iName, unsigned int iNUniqueEvents, unsigned int iNLanes, unsigned long long iNEvents, std::string const& iBranchToRead) :
  SharedSourceBase(iNEvents),
  nUniqueEvents_(iNUniqueEvents),
  dataProductsPerLane_(iNLanes),
  identifierPerEvent_(iNUniqueEvents),
  dataBuffersPerEvent_(iNUniqueEvents),
  accumulatedTime_(0)
{
  // The file is only needed while the buffers are filled, so it is owned by a
  // local and released when the constructor returns.
  const std::unique_ptr<TFile> file{TFile::Open(iName.c_str())};
  if(not file or file->IsZombie()) {
    throw std::runtime_error("RepeatingRootSource: unable to open file '" + iName + "'");
  }

  auto* events = file->Get<TTree>("Events");
  if(events == nullptr) {
    throw std::runtime_error("RepeatingRootSource: file '" + iName + "' has no TTree named 'Events'");
  }

  auto* branchList = events->GetListOfBranches();
  const int nBranches = branchList->GetEntriesFast();

  const std::string eventAuxiliaryBranchName{"EventAuxiliary"};
  const std::string eventIDBranchName{"EventID"};

  for(auto& dataProducts: dataProductsPerLane_) {
    dataProducts.reserve(nBranches);
  }

  // An empty set means "read everything"; otherwise the requested branch is
  // read together with EventAuxiliary.
  std::unordered_set<std::string> branchesToRead;
  if(not iBranchToRead.empty()) {
    branchesToRead.insert(iBranchToRead);
    branchesToRead.insert(eventAuxiliaryBranchName);
  }

  std::vector<TBranch*> branches;
  branches.reserve(nBranches);

  TBranch* eventIDBranch = nullptr;
  int eventAuxIndex = -1;
  int index = 0;  // position of the next selected branch within each lane's product list
  for(int i = 0; i < nBranches; ++i) {
    auto* b = dynamic_cast<TBranch*>((*branchList)[i]);
    if(b == nullptr) {
      // Nothing usable in this slot; dereferencing it would crash.
      continue;
    }
    // The EventID branch is never exposed as a data product; it is only
    // remembered as a fallback source for the event identifier.
    if(eventIDBranchName == b->GetName()) {
      eventIDBranch = b;
      continue;
    }
    b->SetupAddresses();
    TClass* class_ptr = nullptr;
    EDataType type;
    b->GetExpectedType(class_ptr, type);
    assert(class_ptr != nullptr);
    if((not branchesToRead.empty()) and branchesToRead.end() == branchesToRead.find(b->GetName())) {
      continue;
    }
    std::cout<<b->GetName()<<std::endl;
    for(auto& dataProducts: dataProductsPerLane_) {
      dataProducts.emplace_back(index,
                                nullptr,
                                b->GetName(),
                                class_ptr,
                                &delayedReader_);
    }
    branches.emplace_back(b);
    // Compare against the branch name directly rather than indexing lane 0.
    if(eventAuxiliaryBranchName == b->GetName()) {
      eventAuxIndex = index;
    }
    ++index;
  }

  EventAuxReader auxReader{*file};
  // identifierPerEvent_ and dataBuffersPerEvent_ were both sized with
  // iNUniqueEvents, so that is the bound used here.
  for(unsigned int i = 0; i < iNUniqueEvents; ++i) {
    fillBuffer(static_cast<int>(i), dataBuffersPerEvent_[i], branches);
    if(eventAuxIndex != -1) {
      // Preferred: derive the identifier from the cached EventAuxiliary object.
      auto addr = &dataBuffersPerEvent_[i][eventAuxIndex].address_;
      identifierPerEvent_[i] = auxReader.doWork(addr);
    } else if(eventIDBranch) {
      // Fallback: read the identifier straight from the EventID branch.
      eventIDBranch->SetAddress(&identifierPerEvent_[i]);
      eventIDBranch->GetEntry(i);
    } else {
      // No identifier stored in the file: synthesize one from the entry number.
      identifierPerEvent_[i] = {1,1,static_cast<unsigned long long>(i)};
    }
  }
}
// Detaches every lane's data products from the cached buffers and then destroys
// the cached objects through the TClass recorded for each product.
RepeatingRootSource::~RepeatingRootSource() {
  // Step 1: no product may keep pointing at memory that is about to be destroyed.
  for(auto& lane: dataProductsPerLane_) {
    for(auto& product: lane) {
      product.setAddress(nullptr);
    }
  }

  // Step 2: buffer k of every cached event was created for product k, so the
  // products of lane 0 supply the class used to destroy each object.
  for(auto& eventBuffers: dataBuffersPerEvent_) {
    auto& firstLane = dataProductsPerLane_[0];
    for(std::size_t k = 0; k != firstLane.size(); ++k) {
      firstLane[k].classType()->Destructor(eventBuffers[k].address_);
    }
  }
}
// Points the data products of lane iLane at the cached buffers of event
// (iEventIndex modulo the number of cached events), adds the time spent doing
// so to the accumulated source time, and then runs iTask.
void RepeatingRootSource::readEventAsync(unsigned int iLane, long iEventIndex, OptionalTaskHolder iTask) {
  using Clock = std::chrono::high_resolution_clock;
  const auto begin = Clock::now();

  // Wrap around so the cached events repeat.
  auto& cachedBuffers = dataBuffersPerEvent_[iEventIndex % nUniqueEvents_];
  auto& laneProducts = dataProductsPerLane_[iLane];

  // Products and buffers are walked in lock-step: product k uses buffer k.
  auto buffer = cachedBuffers.begin();
  for(auto product = laneProducts.begin(); product != laneProducts.end(); ++product, ++buffer) {
    product->setAddress(&buffer->address_);
    product->setSize(buffer->size_);
  }

  const auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(Clock::now() - begin);
  accumulatedTime_ += elapsed.count();

  // The task runs outside the timed region.
  iTask.runNow();
}
void RepeatingRootSource::fillBuffer(int iEntry, std::vector<BufferInfo>& bi, std::vector<TBranch*>& branches) {
bi.reserve(branches.size());
for(int i=0; i< branches.size(); ++i) {
void* object = dataProductsPerLane_[0][i].classType()->New();
auto branch = branches[i];
branch->SetAddress(&object);
auto s = branch->GetEntry(iEntry);
branch->SetAddress(nullptr);
bi.emplace_back( object, static_cast<size_t>(s));
}
}
// Writes the accumulated source time, in microseconds, to standard output.
void RepeatingRootSource::printSummary() const {
  const std::chrono::microseconds total = accumulatedTime();
  const auto microsecondCount = total.count();
  std::cout << "\nSource time: ";
  std::cout << microsecondCount;
  std::cout << "us\n" << std::endl;
}