Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/cconfig/CzarConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,6 @@ class CzarConfig {
*/
int getXrootdCBThreadsInit() const { return _xrootdCBThreadsInit->getVal(); }

bool getQueryDistributionTestVer() const { return _queryDistributionTestVer->getVal(); }

/*
* @return A value of the "spread" parameter. This may improve a performance
* of xrootd for catalogs with the large number of chunks. The default value
Expand Down Expand Up @@ -364,8 +362,6 @@ class CzarConfig {
util::ConfigValTInt::create(_configValMap, "tuning", "xrootdCBThreadsMax", notReq, 500);
CVTIntPtr _xrootdCBThreadsInit =
util::ConfigValTInt::create(_configValMap, "tuning", "xrootdCBThreadsInit", notReq, 50);
CVTIntPtr _queryDistributionTestVer =
util::ConfigValTInt::create(_configValMap, "tuning", "queryDistributionTestVer", notReq, 0);
CVTBoolPtr _notifyWorkersOnQueryFinish =
util::ConfigValTBool::create(_configValMap, "tuning", "notifyWorkersOnQueryFinish", notReq, 1);
CVTBoolPtr _notifyWorkersOnCzarRestart =
Expand Down
23 changes: 8 additions & 15 deletions src/ccontrol/UserQuerySelect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -423,33 +423,26 @@ void UserQuerySelect::_expandSelectStarInMergeStatment(std::shared_ptr<query::Se
void UserQuerySelect::saveResultQuery() { _queryMetadata->saveResultQuery(_queryId, getResultQuery()); }

void UserQuerySelect::_setupChunking() {
LOGS(_log, LOG_LVL_TRACE, "Setup chunking");
std::shared_ptr<qproc::IndexMap> im;
std::shared_ptr<IntSet const> eSet = _qSession->getEmptyChunks();
{
eSet = _qSession->getEmptyChunks();
if (!eSet) {
eSet = std::make_shared<IntSet>();
LOGS(_log, LOG_LVL_WARN, "Missing empty chunks info for dominantDbs");
}
}
// FIXME add operator<< for QuerySession
LOGS(_log, LOG_LVL_TRACE, "_qSession: " << _qSession);
LOGS(_log, LOG_LVL_TRACE, "Setup chunking _qSession: " << _qSession);
if (_qSession->hasChunks()) {
auto areaRestrictors = _qSession->getAreaRestrictors();
auto secIdxRestrictors = _qSession->getSecIdxRestrictors();
css::StripingParams partStriping = _qSession->getDbStriping();

im = std::make_shared<qproc::IndexMap>(partStriping, _secondaryIndex);
auto const im = std::make_shared<qproc::IndexMap>(partStriping, _secondaryIndex);
qproc::ChunkSpecVector csv;
if (areaRestrictors != nullptr || secIdxRestrictors != nullptr) {
csv = im->getChunks(areaRestrictors, secIdxRestrictors);
} else { // Unrestricted: full-sky
csv = im->getAllChunks();
}

LOGS(_log, LOG_LVL_TRACE, "Chunk specs: " << util::printable(csv));

// Filter out empty chunks
std::shared_ptr<IntSet const> eSet = _qSession->getEmptyChunks();
if (!eSet) {
eSet = std::make_shared<IntSet const>();
LOGS(_log, LOG_LVL_WARN, "Missing empty chunks info for dominantDbs");
}
for (qproc::ChunkSpecVector::const_iterator i = csv.begin(), e = csv.end(); i != e; ++i) {
if (eSet->count(i->chunkId) == 0) { // chunk not in empty?
_qSession->addChunk(*i);
Expand Down
10 changes: 2 additions & 8 deletions src/czar/Czar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ Czar::Czar(string const& configFilePath, string const& czarName)
int const xrootdSpread = _czarConfig->getXrootdSpread();
LOGS(_log, LOG_LVL_INFO, "config xrootdSpread=" << xrootdSpread);
XrdSsiProviderClient->SetSpread(xrootdSpread);
_queryDistributionTestVer = _czarConfig->getQueryDistributionTestVer();

LOGS(_log, LOG_LVL_INFO, "Creating czar instance with name " << czarName);
LOGS(_log, LOG_LVL_INFO, "Czar config: " << *_czarConfig);
Expand Down Expand Up @@ -223,13 +222,8 @@ SubmitResult Czar::submitQuery(string const& query, map<string, string> const& h
}

// make new UserQuery
// this is atomic
ccontrol::UserQuery::Ptr uq;
{
lock_guard<mutex> lock(_mutex);
uq = _uqFactory->newUserQuery(query, defaultDb, getQdispSharedResources(), userQueryId, msgTableName,
resultDb);
}
ccontrol::UserQuery::Ptr const uq = _uqFactory->newUserQuery(query, defaultDb, getQdispSharedResources(),
userQueryId, msgTableName, resultDb);

// Add logging context with query ID
QSERV_LOGCONTEXT_QUERY(uq->getQueryId());
Expand Down
6 changes: 0 additions & 6 deletions src/czar/Czar.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,6 @@ class Czar {
/// Return a pointer to QdispSharedResources
qdisp::SharedResources::Ptr getQdispSharedResources() { return _qdispSharedResources; }

/// @return true if trivial queries should be treated as
/// interactive queries to stress test the czar.
bool getQueryDistributionTestVer() { return _queryDistributionTestVer; }

/// @param queryId The unique identifier of the previously submitted user query
/// @return The reconstructed info for the query
SubmitResult getQueryInfo(QueryId queryId) const;
Expand Down Expand Up @@ -161,8 +157,6 @@ class Czar {
/// and any other resources for use by query executives.
qdisp::SharedResources::Ptr _qdispSharedResources;

bool _queryDistributionTestVer; ///< True if config says this is distribution test version.

/// Reloads the log configuration file on log config file change.
std::shared_ptr<util::FileMonitor> _logFileMonitor;

Expand Down
1 change: 0 additions & 1 deletion src/qana/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ FUNCTION(qana_tests)
target_link_libraries(${TEST} PUBLIC
cconfig
ccontrol
czar
parser
qana
qdisp
Expand Down
13 changes: 1 addition & 12 deletions src/qana/ScanTablePlugin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
#include "lsst/log/Log.h"

// Qserv headers
#include "czar/Czar.h"
#include "global/stringTypes.h"
#include "proto/ScanTableInfo.h"
#include "query/ColumnRef.h"
Expand Down Expand Up @@ -173,18 +172,8 @@ proto::ScanInfo ScanTablePlugin::_findScanTables(query::SelectStmt& stmt, query:
}
}

auto czar = czar::Czar::getCzar();
bool queryDistributionTestVer = (czar == nullptr) ? 0 : czar->getQueryDistributionTestVer();

StringPairVector scanTables;
// Even trivial queries need to use full table scans to avoid crippling the czar
// with heaps of high priority, but very simple queries. Unless there are
// factors that greatly restrict how many chunks need to be read, it needs to be
// a table scan.
// For system testing, it is useful to see how the system handles large numbers
// trivial queries as the amount of work done by the workers is minimal. This
// highlights the cost of czar-worker communications.
if (!queryDistributionTestVer || hasSelectColumnRef) {
if (hasSelectColumnRef) {
if (hasSecondaryKey) {
LOGS(_log, LOG_LVL_TRACE, "**** Not a scan ****");
// Not a scan? Leave scanTables alone
Expand Down
4 changes: 0 additions & 4 deletions src/qproc/QuerySession.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ std::shared_ptr<query::SelectStmt> QuerySession::parseQuery(std::string const& s
void QuerySession::analyzeQuery(std::string const& sql, std::shared_ptr<query::SelectStmt> const& stmt) {
_original = sql;
_stmt = stmt;
_isFinal = false;
_initContext();
assert(_context.get());

Expand Down Expand Up @@ -288,9 +287,6 @@ std::shared_ptr<query::SelectStmt> QuerySession::getMergeStmt() const {
}

void QuerySession::finalize() {
if (_isFinal) {
return;
}
QueryPluginPtrVector::iterator i;
for (i = _plugins->begin(); i != _plugins->end(); ++i) {
(**i).applyFinal(*_context);
Expand Down
1 change: 0 additions & 1 deletion src/qproc/QuerySession.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,6 @@ class QuerySession {
std::string _tmpTable;
std::string _resultTable;
std::string _error;
int _isFinal = 0; ///< Has query analysis/optimization completed?

ChunkSpecVector _chunks; ///< Chunk coverage
std::shared_ptr<QueryPluginPtrVector> _plugins; ///< Analysis plugin chain
Expand Down
Loading