Skip to content

Commit 5c9ec93

Browse files
jgates108fritzm
authored andcommitted
Code changes to test FIFO vs Shared scan.
1 parent a5c58b4 commit 5c9ec93

5 files changed

Lines changed: 359 additions & 2 deletions

File tree

src/wmain/WorkerMain.cc

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@
6565
#include "wsched/BlendScheduler.h"
6666
#include "wsched/FifoScheduler.h"
6767
#include "wsched/GroupScheduler.h"
68-
#include "wsched/ScanScheduler.h"
6968
#include "wcomms/HttpSvc.h"
69+
#include "wsched/ScanSchedFifo.h"
7070

7171
using namespace lsst::qserv;
7272
using namespace nlohmann;
@@ -146,6 +146,7 @@ WorkerMain::WorkerMain() {
146146
int const medium = lsst::qserv::protojson::ScanInfo::Rating::MEDIUM;
147147
int const slow = lsst::qserv::protojson::ScanInfo::Rating::SLOW;
148148
int const slowest = lsst::qserv::protojson::ScanInfo::Rating::SLOWEST;
149+
#if 0 // &&& use ScanSchedulers
149150
double fastScanMaxMinutes = (double)workerConfig->getScanMaxMinutesFast();
150151
double medScanMaxMinutes = (double)workerConfig->getScanMaxMinutesMed();
151152
double slowScanMaxMinutes = (double)workerConfig->getScanMaxMinutesSlow();
@@ -169,6 +170,31 @@ WorkerMain::WorkerMain() {
169170
auto snail = make_shared<wsched::ScanScheduler>(
170171
"SchedSnail", maxThread, workerConfig->getMaxReserveSnail(), workerConfig->getPrioritySnail(),
171172
workerConfig->getMaxActiveChunksSnail(), slow + 1, slowest, snailScanMaxMinutes);
173+
#else //&&& use ScanSchedFifo which are just FIFO
174+
double fastScanMaxMinutes = (double)workerConfig->getScanMaxMinutesFast();
175+
double medScanMaxMinutes = (double)workerConfig->getScanMaxMinutesMed();
176+
double slowScanMaxMinutes = (double)workerConfig->getScanMaxMinutesSlow();
177+
double snailScanMaxMinutes = (double)workerConfig->getScanMaxMinutesSnail();
178+
int maxTasksBootedPerUserQuery = workerConfig->getMaxTasksBootedPerUserQuery();
179+
int maxConcurrentBootedTasks = workerConfig->getMaxConcurrentBootedTasks();
180+
vector<wsched::ScanSchedFifo::Ptr> scanSchedulers{
181+
make_shared<wsched::ScanSchedFifo>("SchedSlow", maxThread, workerConfig->getMaxReserveSlow(),
182+
workerConfig->getPrioritySlow(),
183+
workerConfig->getMaxActiveChunksSlow(), medium + 1, slow,
184+
slowScanMaxMinutes),
185+
make_shared<wsched::ScanSchedFifo>("SchedFast", maxThread, workerConfig->getMaxReserveFast(),
186+
workerConfig->getPriorityFast(),
187+
workerConfig->getMaxActiveChunksFast(), fastest, fast,
188+
fastScanMaxMinutes),
189+
make_shared<wsched::ScanSchedFifo>(
190+
"SchedMed", maxThread, workerConfig->getMaxReserveMed(), workerConfig->getPriorityMed(),
191+
workerConfig->getMaxActiveChunksMed(), fast + 1, medium, medScanMaxMinutes),
192+
};
193+
194+
auto snail = make_shared<wsched::ScanSchedFifo>(
195+
"SchedSnail", maxThread, workerConfig->getMaxReserveSnail(), workerConfig->getPrioritySnail(),
196+
workerConfig->getMaxActiveChunksSnail(), slow + 1, slowest, snailScanMaxMinutes);
197+
#endif // &&&
172198

173199
wpublish::QueriesAndChunks::Ptr queries = wpublish::QueriesAndChunks::setupGlobal(
174200
chrono::minutes(5), chrono::minutes(2), maxTasksBootedPerUserQuery, maxConcurrentBootedTasks,

src/wsched/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ target_sources(wsched PRIVATE
44
BlendScheduler.cc
55
ChunkTasksQueue.cc
66
GroupScheduler.cc
7-
ScanScheduler.cc
87
SchedulerBase.cc
8+
ScanScheduler.cc
9+
ScanSchedFifo.cc
910
)
1011

1112
install(

src/wsched/ScanSchedFifo.cc

Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
// -*- LSST-C++ -*-
2+
/*
3+
* LSST Data Management System
4+
* Copyright 2013-2019 LSST.
5+
*
6+
* This product includes software developed by the
7+
* LSST Project (http://www.lsst.org/).
8+
*
9+
* This program is free software: you can redistribute it and/or modify
10+
* it under the terms of the GNU General Public License as published by
11+
* the Free Software Foundation, either version 3 of the License, or
12+
* (at your option) any later version.
13+
*
14+
* This program is distributed in the hope that it will be useful,
15+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
16+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17+
* GNU General Public License for more details.
18+
*
19+
* You should have received a copy of the LSST License Statement and
20+
* the GNU General Public License along with this program. If not,
21+
* see <http://www.lsstcorp.org/LegalNotices/>.
22+
*/
23+
/**
24+
* @file
25+
*
26+
* @brief A scheduler implementation that limits disk scans to one at
27+
* a time, but allows multiple queries to share I/O.
28+
*
29+
* @author Daniel L. Wang, SLAC
30+
*/
31+
32+
// Class header
33+
34+
#include "wsched/ScanSchedFifo.h"
35+
36+
// System headers
37+
#include <cstddef>
38+
#include <iostream>
39+
#include <mutex>
40+
#include <sstream>
41+
42+
// LSST headers
43+
#include "lsst/log/Log.h"
44+
45+
// Qserv headers
46+
#include "global/LogContext.h"
47+
#include "util/Bug.h"
48+
#include "util/Timer.h"
49+
#include "wcontrol/Foreman.h"
50+
#include "wsched/BlendScheduler.h"
51+
#include "wsched/ChunkTasksQueue.h"
52+
53+
namespace {
54+
LOG_LOGGER _log = LOG_GET("lsst.qserv.wsched.ScanScheduler");
55+
}
56+
57+
using namespace std;
58+
59+
namespace lsst::qserv::wsched {
60+
61+
ScanSchedFifo::ScanSchedFifo(string const& name, int maxThreads, int maxReserve, int priority,
62+
int maxActiveChunks, int minRating, int maxRating, double maxTimeMinutes)
63+
/* &&&
64+
: SchedulerBase{name, maxThreads, maxReserve, maxActiveChunks, priority},
65+
_minRating{minRating},
66+
_maxRating{maxRating},
67+
_maxTimeMinutes{maxTimeMinutes} {
68+
*/
69+
: ScanScheduler(name, maxThreads, maxReserve, priority, maxActiveChunks, minRating, maxRating,
70+
maxTimeMinutes) {}
71+
72+
/// Returns true if there is a Task ready to go and we aren't up against any limits.
73+
bool ScanSchedFifo::ready() {
74+
lock_guard<mutex> lock(util::CommandQueue::_mx);
75+
bool logStuff = false;
76+
if (_infoChanged) {
77+
_infoChanged = false;
78+
logStuff = true;
79+
LOGS(_log, LOG_LVL_TRACE,
80+
getName() << " ScanSchedFifo::_ready "
81+
<< " inFlight=" << _inFlight << " maxThreads=" << _maxThreads
82+
<< " adj=" << _maxThreadsAdj << " activeChunks=" << getActiveChunkCount()
83+
<< " fifo.sz=" << _taskFifo->size());
84+
}
85+
if (_inFlight >= maxInFlight()) {
86+
if (logStuff) {
87+
LOGS(_log, LOG_LVL_TRACE, getName() << " ScanSchedFifo::_ready too many in flight " << _inFlight);
88+
}
89+
return false;
90+
}
91+
92+
return _ready();
93+
}
94+
95+
bool ScanSchedFifo::_ready() const { return !_taskFifo->empty(); }
96+
97+
size_t ScanSchedFifo::getSize() const {
98+
lock_guard<mutex> lock(util::CommandQueue::_mx);
99+
return _taskFifo->size();
100+
}
101+
102+
util::Command::Ptr ScanSchedFifo::getCmd(bool wait) {
103+
unique_lock<mutex> lock(util::CommandQueue::_mx);
104+
LOGS(_log, LOG_LVL_TRACE, "start getCmd " << getName() << " " << _taskFifo->size());
105+
if (wait) {
106+
util::CommandQueue::_cv.wait(lock, [this]() { return _ready(); });
107+
} else if (!_ready()) {
108+
return nullptr;
109+
}
110+
//&&&bool useFlexibleLock = (_inFlight < 1);
111+
//&&&auto task = _taskQueue->getTask(useFlexibleLock);
112+
auto task = _taskFifo->front();
113+
_taskFifo->pop_front();
114+
if (task != nullptr) {
115+
++_inFlight; // in flight as soon as it is off the queue.
116+
QSERV_LOGCONTEXT_QUERY_JOB(task->getQueryId(), task->getJobId());
117+
LOGS(_log, LOG_LVL_TRACE,
118+
"getCmd " << getName() << " tskStart chunk=" << task->getChunkId() << " tid=" << task->getIdStr()
119+
<< " inflight=" << _inFlight << " " << _taskFifo->size());
120+
_infoChanged = true;
121+
_decrCountForUserQuery(task->getQueryId());
122+
_incrChunkTaskCount(task->getChunkId());
123+
// Since a command was retrieved, there's a possibility another is ready.
124+
notify(false); // notify all=false
125+
}
126+
return task;
127+
}
128+
129+
void ScanSchedFifo::queCmd(util::Command::Ptr const& cmd) {
130+
vector<util::Command::Ptr> vect;
131+
vect.push_back(cmd);
132+
queCmd(vect);
133+
}
134+
135+
void ScanSchedFifo::queCmd(vector<util::Command::Ptr> const& cmds) {
136+
LOGS(_log, LOG_LVL_TRACE, "ScanSchedFifo::queCmd cmds.sz=" << cmds.size());
137+
std::vector<wbase::Task::Ptr> tasks;
138+
bool first = true;
139+
QueryId qid;
140+
int jid = 0;
141+
// Convert to a vector of tasks
142+
for (auto const& cmd : cmds) {
143+
wbase::Task::Ptr tsk = dynamic_pointer_cast<wbase::Task>(cmd);
144+
if (tsk == nullptr) {
145+
throw util::Bug(ERR_LOC, getName() + " queCmd could not be converted to Task or was nullptr");
146+
}
147+
if (first) {
148+
first = false;
149+
qid = tsk->getQueryId();
150+
jid = tsk->getJobId();
151+
QSERV_LOGCONTEXT_QUERY_JOB(qid, jid);
152+
} else {
153+
if (qid != tsk->getQueryId()) {
154+
string eMsg("Mismatch multiple query/job ids in single queCmd ");
155+
eMsg += " expected QID=" + to_string(qid) + " got=" + to_string(tsk->getQueryId());
156+
eMsg += " expected JID=" + to_string(qid) + " got=" + to_string(tsk->getJobId());
157+
LOGS(_log, LOG_LVL_ERROR, eMsg);
158+
// This could cause difficult to detect problems later on.
159+
throw util::Bug(ERR_LOC, eMsg);
160+
return;
161+
}
162+
}
163+
tasks.push_back(tsk);
164+
LOGS(_log, LOG_LVL_TRACE, getName() << " queCmd " << tsk->getIdStr());
165+
}
166+
// Queue the tasks
167+
{
168+
lock_guard<mutex> lock(util::CommandQueue::_mx);
169+
auto uqCount = _incrCountForUserQuery(qid, tasks.size());
170+
LOGS(_log, LOG_LVL_TRACE, getName() << " queCmd " << " uqCount=" << uqCount);
171+
for (auto const& tsk : tasks) {
172+
_taskFifo->push_back(tsk);
173+
}
174+
_infoChanged = true;
175+
}
176+
177+
if (cmds.size() > 1) {
178+
util::CommandQueue::_cv.notify_all();
179+
} else {
180+
util::CommandQueue::_cv.notify_one();
181+
}
182+
}
183+
184+
/// @returns - true if a task was removed from the queue. If the task was running
185+
/// or not found, false is returned. A return value of true indicates
186+
/// that the task still needs to be started.
187+
/// If the task is running: the task continues to run, but the scheduler is told
188+
/// it is finished (this allows the scheduler to move on), and its thread is removed
189+
/// from the thread pool (the thread pool creates a new thread to replace it).
190+
bool ScanSchedFifo::removeTask(wbase::Task::Ptr const& task, bool removeRunning) {
191+
QSERV_LOGCONTEXT_QUERY_JOB(task->getQueryId(), task->getJobId());
192+
193+
LOGS(_log, LOG_LVL_INFO, __func__ << " " << getName());
194+
// Check if task is in the queue.
195+
// _taskQueue has its own mutex to protect this.
196+
//&&&auto rmTask = _taskQueue->removeTask(task);
197+
unique_lock<mutex> lock(util::CommandQueue::_mx);
198+
auto rmTask = _removeTask(task);
199+
bool inQueue = rmTask != nullptr;
200+
LOGS(_log, LOG_LVL_DEBUG, "removeTask inQueue=" << inQueue);
201+
if (inQueue) {
202+
LOGS(_log, LOG_LVL_INFO, "removeTask moving task on queue");
203+
_decrCountForUserQuery(task->getQueryId());
204+
return true;
205+
}
206+
207+
LOGS(_log, LOG_LVL_DEBUG, "removeTask not in queue");
208+
// Wasn't in the queue, could be in flight.
209+
if (!removeRunning) {
210+
LOGS(_log, LOG_LVL_DEBUG, "removeTask not removing running tasks");
211+
return false;
212+
}
213+
214+
/// Don't remove the task if there are already too many threads in existence.
215+
if (task->atMaxThreadCount()) {
216+
LOGS(_log, LOG_LVL_WARN, "removeTask couldn't move as too many threads existing");
217+
return false;
218+
}
219+
220+
/// The task can only leave the pool if it has been started, poolThread should be set as
221+
/// it is safe to move the running task according to the test above.
222+
auto poolThread = task->getAndNullPoolEventThread();
223+
if (poolThread != nullptr) {
224+
LOGS(_log, LOG_LVL_INFO, "removeTask moving running task");
225+
return poolThread->leavePool(task);
226+
} else {
227+
LOGS(_log, LOG_LVL_DEBUG,
228+
"removeTask PoolEventThread was null, "
229+
"presumably already moved for large result.");
230+
}
231+
return false;
232+
}
233+
234+
wbase::Task::Ptr ScanSchedFifo::_removeTask(wbase::Task::Ptr const& task) {
235+
LOGS(_log, LOG_LVL_DEBUG, __func__ << task->getIdStr());
236+
auto iter = _taskFifo->begin();
237+
while (iter != _taskFifo->end()) {
238+
if (*iter == task) {
239+
_taskFifo->erase(iter);
240+
return task;
241+
} else {
242+
++iter;
243+
}
244+
}
245+
return nullptr;
246+
}
247+
248+
} // namespace lsst::qserv::wsched

src/wsched/ScanSchedFifo.h

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// -*- LSST-C++ -*-
2+
/*
3+
* LSST Data Management System
4+
* Copyright 2013-2016 LSST Corporation.
5+
*
6+
* This product includes software developed by the
7+
* LSST Project (http://www.lsst.org/).
8+
*
9+
* This program is free software: you can redistribute it and/or modify
10+
* it under the terms of the GNU General Public License as published by
11+
* the Free Software Foundation, either version 3 of the License, or
12+
* (at your option) any later version.
13+
*
14+
* This program is distributed in the hope that it will be useful,
15+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
16+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17+
* GNU General Public License for more details.
18+
*
19+
* You should have received a copy of the LSST License Statement and
20+
* the GNU General Public License along with this program. If not,
21+
* see <http://www.lsstcorp.org/LegalNotices/>.
22+
*/
23+
#ifndef LSST_QSERV_WSCHED_SCANSCHEDFIFO_H
24+
#define LSST_QSERV_WSCHED_SCANSCHEDFIFO_H
25+
26+
// System headers
27+
#include <atomic>
28+
#include <list>
29+
#include <memory>
30+
#include <mutex>
31+
32+
// Qserv headers
33+
#include "wsched/ScanScheduler.h"
34+
35+
// Forward declarations
36+
namespace lsst::qserv::wbase {
37+
class Task;
38+
} // namespace lsst::qserv::wbase
39+
40+
namespace lsst::qserv::wsched {
41+
class BlendScheduler;
42+
} // namespace lsst::qserv::wsched
43+
44+
namespace lsst::qserv::wsched {
45+
46+
class ScanSchedFifo : public ScanScheduler { //&&&
47+
public:
48+
typedef std::shared_ptr<ScanScheduler> Ptr;
49+
50+
ScanSchedFifo(std::string const& name, int maxThreads, int maxReserve, int priority, int maxActiveChunks,
51+
int minRating, int maxRating, double maxTimeMinutes);
52+
virtual ~ScanSchedFifo() {}
53+
54+
void queCmd(std::vector<util::Command::Ptr> const& cmds) override; //&&& check
55+
void queCmd(util::Command::Ptr const& cmd) override; //&&& check
56+
57+
util::Command::Ptr getCmd(bool wait) override; //&&& check
58+
// void commandStart(util::Command::Ptr const& cmd) override; //&&& ok
59+
// void commandFinish(util::Command::Ptr const& cmd) override; //&&& ok
60+
61+
// SchedulerBase overrides
62+
bool ready() override; //&&& check
63+
std::size_t getSize() const override; //&&& check
64+
65+
bool removeTask(wbase::Task::Ptr const& task, bool removeRunning) override; //&&& check
66+
67+
private:
68+
wbase::Task::Ptr _removeTask(wbase::Task::Ptr const& task);
69+
70+
/// Return true if a Task is ready to be run.
71+
/// util::CommandQueue::_mx must be locked before running.
72+
bool _ready() const;
73+
74+
/// Protects _taskFifo as it has no internal mutex protection.
75+
mutable std::mutex _taskFifoMtx;
76+
std::shared_ptr<std::list<wbase::Task::Ptr>> _taskFifo{new std::list<wbase::Task::Ptr>()};
77+
};
78+
79+
} // namespace lsst::qserv::wsched
80+
81+
#endif // LSST_QSERV_WSCHED_SCANSCHEDFIFO_H

src/wsched/ScanScheduler.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ class ScanScheduler : public SchedulerBase {
8181
bool _ready();
8282
std::shared_ptr<ChunkTaskCollection> _taskQueue; ///< Constrains access to files.
8383

84+
protected:
8485
/// Scans placed on this scheduler should have a rating between(inclusive) _minRating and _maxRating.
8586
const int _minRating;
8687
const int _maxRating;

0 commit comments

Comments
 (0)