-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAQTest.cpp
More file actions
131 lines (109 loc) · 3.31 KB
/
AQTest.cpp
File metadata and controls
131 lines (109 loc) · 3.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#include <iostream>
#include <sstream>
#include <chrono>
#include <mutex>
#include <thread>
#include <algorithm>
#include <string>
#include "AsyncRequest.hpp"
void print(const std::string& s)
{
using namespace std;
static mutex m;
unique_lock<mutex> lk(m);
cout << "Thread " << this_thread::get_id() << "\t" << s << endl;
}
//--------------------------------------------------------------------
class SampleAsyncRequest : public AsyncRequest {
public:
explicit SampleAsyncRequest(int i)
:
index_(i)
{
}
virtual ~SampleAsyncRequest()
{
}
// Must override call() in derived classes
//
virtual void call()
{
using namespace std;
std::ostringstream out;
out << "Function: "<< __func__ << "(), index: " << index_;
print(out.str());
this_thread::sleep_for(chrono::milliseconds(100));
}
private:
int index_;
};
//----------------------------------------------------------
int main(int argc, char* argv[])
{
using namespace std;
int nthreads = (argc > 1) ? atoi(argv[1]) : 256;
int qsize = (argc > 2) ? atoi(argv[2]) : 1024;
int tasks_remaining = 10000;
int treceived = 0;
int index = 0;
auto aq = std::make_unique<AsyncRequestMgr>(nthreads, qsize);
aq->start();
AsyncRequest* request = nullptr;
// we have TOTAL_TASKS to perform,
// we submit tasks in chunks of up to qsize
//
for (;;) {
int chunk = std::min(tasks_remaining, qsize);
while (chunk) {
if (!request)
request = new SampleAsyncRequest(index);
if (aq->try_submit(request) == AResult::Full)
break;
request = nullptr;
++index;
--tasks_remaining;
--chunk;
}
// container to collect all returned tasks
//
std::deque<AsyncRequest*> results;
// poll fills all returned tasks in results if there are any
//
auto rc = aq->poll(results);
// query the results
//
if (rc == AResult::Empty) {
// if both empty AND no tasks remaining, we are done
//
if (tasks_remaining == 0)
break;
}
else if (rc == AResult::Ready) {
// Some results were ready when calling poll. Here we just
// cleanup the tasks object
//
for (auto p : results) {
treceived++;
delete p;
}
}
else if (rc == AResult::Pending) {
// Some tasks were being processed but not finished yet
// we can sleep or block (with or without timeout) until some
// results are ready. Here we block until a result is ready,
// but with timeout of 1000 microsecond
//
auto acked = aq->receive_uptr<SampleAsyncRequest>(1000 /* microseconds */);
// If evaulates to false then we got a timeout
//
if (acked) {
treceived++;
}
}
}
print("Just Before SHUTDOWN");
aq->shutdown();
print("Received " + std::to_string(treceived));
cout << "Last call receive: " << aq->receive(request) << endl;
}
//-----------------------------------------------------------------------