7 changes: 5 additions & 2 deletions src/ccontrol/UserQuerySelect.cc
@@ -253,7 +253,7 @@ void UserQuerySelect::submit() {

string dbName("");
bool dbNameSet = false;

bool checkDbs = true;
for (auto i = _qSession->cQueryBegin(), e = _qSession->cQueryEnd(); i != e && !exec->getCancelled();
++i) {
auto& chunkSpec = *i;
@@ -267,7 +267,10 @@ void UserQuerySelect::submit() {
{
lock_guard<mutex> lock(chunksMtx);
bool fillInChunkIdTag = false; // do not fill in the chunkId
cs = _qSession->buildChunkQuerySpec(queryTemplates, chunkSpec, fillInChunkIdTag);
cs = _qSession->buildChunkQuerySpec(queryTemplates, chunkSpec, fillInChunkIdTag, checkDbs);
Contributor @fritzm commented Apr 7, 2026
Is there a reason to pass checkDbs in as an argument here, rather than keeping an internal _dbsChecked member on the session, as one would with a lazy constructor?

The "lazy" way would, I think, require fewer edits and be more robust against future code changes, since it guarantees the check runs exactly once before it is first needed.

Contributor Author
After pulling in a rebase from main, some tests on the cluster got painfully slow (from 4 min to 35 min), as the method for accessing CSS changed on main. This is meant to be a stopgap measure to allow testing and reasonable performance until it gets fixed properly. The existing comment I put in buildChunkQuerySpec follows; validateDominantDbs() is called in numerous places for unknown reasons.
/// TODO: checkDominantDb may not be the best way to go about this, but checking it
/// for every chunk is not useful and wastes a lot of time as the information doesn't
/// change and all chunks use the same databases (while using different tables).
/// There are things that could be done, maybe all are applicable, or maybe not worth the effort:
/// - validateDominantDbs() is called in many places and maybe it just doesn't need to be called
/// that often.
/// - Create a cache of CSS that is immutable so many threads can read without locking.
/// - When a query is started, it asks for a shared pointer to the CSS cache, if the
/// CSS cache is obsolete, a new cache is made and returned, otherwise the existing
/// cache is returned. This way, the CSS cache is only updated when the CSS changes,
/// and all threads can read from it without locking. Changing CSS values mid-analysis
/// may be detrimental, and this would prevent that as well.
if (checkDominantDb && !validateDominantDbs()) {

// Only need to check the dominantDbs for the first chunk, as all chunks
// should have the same databases (same databases, different tables).
checkDbs = false;
chunks.push_back(cs->chunkId);
}

6 changes: 4 additions & 2 deletions src/czar/ActiveWorker.cc
@@ -72,11 +72,10 @@ void ActiveWorker::setWorkerContactInfo(protojson::WorkerContactInfo::Ptr const&
}

void ActiveWorker::_changeStateTo(State newState, double secsSinceUpdate, string const& note) {
auto lLvl = (newState == DEAD) ? LOG_LVL_ERROR : LOG_LVL_INFO;
auto lLvl = (newState == DEAD) ? LOG_LVL_ERROR : LOG_LVL_WARN;
LOGS(_log, lLvl,
note << " oldState=" << getStateStr(_state) << " newState=" << getStateStr(newState)
<< " secsSince=" << secsSinceUpdate);
_state = newState;
}

void ActiveWorker::updateStateAndSendMessages(double timeoutAliveSecs, double timeoutDeadSecs,
@@ -96,6 +95,9 @@ void ActiveWorker::updateStateAndSendMessages(double timeoutAliveSecs, double ti
cName(__func__) << " wInfo=" << wInfo_->dump()
<< " secsSince=" << wInfo_->timeSinceRegUpdateSeconds()
<< " secsSinceUpdate=" << secsSinceUpdate);
LOGS(_log, LOG_LVL_WARN,
"&&& " << cName(__func__) << " wInfo=" << wInfo_->dump() << " secsSince="
<< wInfo_->timeSinceRegUpdateSeconds() << " secsSinceUpdate=" << secsSinceUpdate);

// Update the last time the registry contacted this worker.
// TODO:DM-53240 - This needs to be added to the dashboard.
70 changes: 58 additions & 12 deletions src/czar/Czar.cc
@@ -162,7 +162,7 @@ Czar::Czar(string const& configFilePath, string const& czarName)
_clientToQuery(),
_monitorSleepTime(_czarConfig->getMonitorSleepTimeMilliSec()),
_activeWorkerMap(new ActiveWorkerMap(_czarConfig)),
_fqdn(util::get_current_host_fqdn()) {
_fqdn(util::get_current_host_fqdn_wait()) {
// set id counter to milliseconds since the epoch, mod 1 year.
struct timeval tv;
gettimeofday(&tv, nullptr);
@@ -698,8 +698,8 @@ void Czar::killIncompleteUbjerJobsOn(std::string const& restartedWorkerId) {
}
}

nlohmann::json Czar::handleUberJobReadyMsg(std::shared_ptr<protojson::UberJobReadyMsg> const& jrMsg,
string const& note, bool const retry) {
protojson::ExecutiveRespMsg::Ptr Czar::handleUberJobReadyMsg(
std::shared_ptr<protojson::UberJobReadyMsg> const& jrMsg, string const& note) {
auto queryId = jrMsg->queryId;
auto czarId = jrMsg->czarId;
auto uberJobId = jrMsg->uberJobId;
@@ -708,6 +708,7 @@ nlohmann::json Czar::handleUberJobReadyMsg(std::shared_ptr<protojson::UberJobRea
if (exec == nullptr) {
LOGS(_log, LOG_LVL_WARN,
note << " null exec QID:" << queryId << " ujId=" << uberJobId << " cz=" << czarId);
// This means the user query is done and the results on the worker won't be needed
throw invalid_argument(string("HttpCzarWorkerModule::_handleJobReady No executive for qid=") +
to_string(queryId) + " czar=" + to_string(czarId));
}
@@ -723,30 +724,75 @@ nlohmann::json Czar::handleUberJobReadyMsg(std::shared_ptr<protojson::UberJobRea
uj->setResultFileSize(jrMsg->fileUrlInfo.fileSize);
exec->checkResultFileSize(jrMsg->fileUrlInfo.fileSize);

auto importRes = uj->importResultFile(jrMsg->fileUrlInfo, retry);
auto importRes = uj->importResultFile(jrMsg->fileUrlInfo);
return importRes;
}

nlohmann::json Czar::handleUberJobErrorMsg(std::shared_ptr<protojson::UberJobErrorMsg> const& jrMsg,
string const& note) {
protojson::ExecutiveRespMsg::Ptr Czar::handleUberJobReadyMsgNoThrow(
std::shared_ptr<protojson::UberJobReadyMsg> const& jrMsg, string const& note) {
protojson::ExecutiveRespMsg::Ptr execRespMsg;
try {
execRespMsg = handleUberJobReadyMsg(jrMsg, note);
} catch (invalid_argument const& ex) {
LOGS(_log, LOG_LVL_WARN, note << " exception: " << ex.what());
// The message was parsed, but this UberJob is no longer needed by the czar.
execRespMsg = protojson::ExecutiveRespMsg::create(false, true, jrMsg->queryId, jrMsg->uberJobId,
jrMsg->czarId, "uberJobEnded", ex.what());
}
return execRespMsg;
}

protojson::ExecutiveRespMsg::Ptr Czar::handleUberJobErrorMsg(
std::shared_ptr<protojson::UberJobErrorMsg> const& jrMsg, string const& note) {
auto queryId = jrMsg->queryId;
auto czarId = jrMsg->czarId;
auto uberJobId = jrMsg->uberJobId;
string const idMsg =
"qId=" + to_string(queryId) + " ujId=" + to_string(uberJobId) + " czId=" + to_string(czarId);
auto execRespMsg = protojson::ExecutiveRespMsg::create(false, false, queryId, uberJobId, czarId);

// Find UberJob
qdisp::Executive::Ptr exec = czar::Czar::getCzar()->getExecutiveFromMap(queryId);
if (exec == nullptr) {
throw invalid_argument(note + " No executive for qid=" + to_string(queryId) +
" czar=" + to_string(czarId));
// exec==nullptr just means this czar no longer has any use for any data associated with this QID.
LOGS(_log, LOG_LVL_WARN, note << " No executive for " << idMsg);
execRespMsg->success = true;
execRespMsg->dataObsolete = true;
execRespMsg->errorType = "queryEnded";
execRespMsg->note = "null Executive";
return execRespMsg;
}
qdisp::UberJob::Ptr uj = exec->findUberJob(uberJobId);
if (uj == nullptr) {
throw invalid_argument(note + " No UberJob for qid=" + to_string(queryId) +
" ujId=" + to_string(uberJobId) + " czar=" + to_string(czarId));
LOGS(_log, LOG_LVL_WARN, note << " No UberJob for " << idMsg);
execRespMsg->success = true;
execRespMsg->dataObsolete = true;
execRespMsg->errorType = "uberJobEnded";
execRespMsg->note = "null UberJob";
return execRespMsg;
}

auto importRes = uj->workerError(jrMsg->multiError);
return importRes;
uj->workerError(jrMsg->multiError, *execRespMsg);
return execRespMsg;
}

void Czar::incrCommErrCount(std::string const& type, std::string const& worker, std::string const& note) {
LOGS(_log, LOG_LVL_WARN, "Czar::incrCommErrCount " << type << " worker=" << worker << " " << note);
stringstream os;
lock_guard lg(_commErrCountMtx);
auto key = std::make_pair(type, worker);
auto iter = _commErrCountMap.find(key);
if (iter == _commErrCountMap.end()) {
_commErrCountMap[key] = 1;
} else {
iter->second += 1;
}
os << "Czar::incrCommErrCount {";
for (auto const& [key, val] : _commErrCountMap) {
LOGS(_log, LOG_LVL_WARN, "(" << key.first << " worker=" << key.second << " count=" << val << ")");
}
os << "}";
LOGS(_log, LOG_LVL_WARN, os.str());
}

} // namespace lsst::qserv::czar
28 changes: 20 additions & 8 deletions src/czar/Czar.h
@@ -41,6 +41,7 @@
#include "global/intTypes.h"
#include "global/stringTypes.h"
#include "mysql/MySqlConfig.h"
#include "protojson/ResponseMsg.h"
#include "util/ConfigStore.h"
#include "util/Timer.h"

@@ -170,15 +171,21 @@ class Czar {

/// Starts the process of collecting a result file from the worker.
/// @throws std::invalid_argument
/// @param retry - true indicates this is a retry of a failed communication and
/// should not kill the associated UberJob due to an unexpected state.
nlohmann::json handleUberJobReadyMsg(std::shared_ptr<protojson::UberJobReadyMsg> const& jrMsg,
std::string const& note, bool const retry = false);
protojson::ExecutiveRespMsg::Ptr handleUberJobReadyMsg(
std::shared_ptr<protojson::UberJobReadyMsg> const& jrMsg, std::string const& note);

/// Handle an UberJob processing error from the worker.
/// @throws std::invalid_argument
nlohmann::json handleUberJobErrorMsg(std::shared_ptr<protojson::UberJobErrorMsg> const& jrMsg,
std::string const& note);
/// Same as handleUberJobReadyMsg but returns an altered message instead of throwing.
protojson::ExecutiveRespMsg::Ptr handleUberJobReadyMsgNoThrow(
std::shared_ptr<protojson::UberJobReadyMsg> const& jrMsg, std::string const& note);

/// Handle an UberJob processing error from the worker, does not throw exceptions.
/// It alters the returned response message instead of throwing an exception.
protojson::ExecutiveRespMsg::Ptr handleUberJobErrorMsg(
std::shared_ptr<protojson::UberJobErrorMsg> const& jrMsg, std::string const& note);

/// Increment a communication error count. These are only logged for now, as they are probably not an
/// issue, but maybe they have been happening and it would be useful to know.
void incrCommErrCount(std::string const& type, std::string const& worker, std::string const& note);

/// Startup time of czar, sent to workers so they can detect that the czar was
/// was restarted when this value changes.
@@ -276,6 +283,11 @@ class Czar {

/// FQDN for this czar.
std::string const _fqdn;

/// Map of communication error counts by type and worker, protected by _commErrCountMtx.
/// Key - <error type, worker id> - value - count of errors of that type for that worker.
std::map<std::pair<std::string, std::string>, int> _commErrCountMap;
mutable std::mutex _commErrCountMtx; ///< protects _commErrCountMap
};

} // namespace lsst::qserv::czar
48 changes: 28 additions & 20 deletions src/czar/HttpCzarWorkerModule.cc
@@ -109,12 +109,12 @@ json HttpCzarWorkerModule::_handleJobError(string const& func) {
auto const& jsReq = body().objJson;
auto jrMsg = protojson::UberJobErrorMsg::createFromJson(jsReq);
auto importRes = czar::Czar::getCzar()->handleUberJobErrorMsg(jrMsg, fName);
return importRes;
return importRes->toJson();
} catch (std::invalid_argument const& iaEx) {
LOGS(_log, LOG_LVL_ERROR,
"HttpCzarWorkerModule::_handleJobError received "
<< iaEx.what() << " js=" << protojson::pwHide(body().objJson));
protojson::ResponseMsg respMsg(false, "parse", iaEx.what());
protojson::ExecutiveRespMsg respMsg(false, false, 0, 0, 0, "parse", iaEx.what());
return respMsg.toJson();
}
}
@@ -129,12 +129,12 @@ json HttpCzarWorkerModule::_handleJobReady(string const& func) {
auto const& jsReq = body().objJson;
auto jrMsg = protojson::UberJobReadyMsg::createFromJson(jsReq);
auto importRes = czar::Czar::getCzar()->handleUberJobReadyMsg(jrMsg, fName);
return importRes;
return importRes->toJson();
} catch (std::invalid_argument const& iaEx) {
LOGS(_log, LOG_LVL_ERROR,
"HttpCzarWorkerModule::_handleJobReady received "
<< iaEx.what() << " js=" << protojson::pwHide(body().objJson));
protojson::ResponseMsg respMsg(false, "parse", iaEx.what());
protojson::ExecutiveRespMsg respMsg(false, false, 0, 0, 0, "parse", iaEx.what());
return respMsg.toJson();
}
}
@@ -143,14 +143,15 @@ json HttpCzarWorkerModule::_handleWorkerCzarComIssue(string const& func) {
string const fName("HttpCzarWorkerModule::_handleWorkerCzarComIssue");
LOGS(_log, LOG_LVL_DEBUG, fName << " start");
// Parse and verify the json message and then deal with the problems.
string wId = "unknown";
try {
protojson::AuthContext const authC(cconfig::CzarConfig::instance()->replicationInstanceId(),
cconfig::CzarConfig::instance()->replicationAuthKey());
auto const& jsReq = body().objJson;
auto wccIssue = protojson::WorkerCzarComIssue::createFromJson(jsReq, authC);

auto wId = wccIssue->getWorkerInfo()->wId;
if (wccIssue->getThoughtCzarWasDead()) {
wId = wccIssue->getWorkerInfo()->wId;
if (wccIssue->getThoughtCzarWasDeadTime() > 0) {
LOGS(_log, LOG_LVL_WARN,
"HttpCzarWorkerModule::_handleWorkerCzarComIssue worker="
<< wId << " thought czar was dead and killed related uberjobs.");
@@ -164,35 +165,42 @@
execPtr->killIncompleteUberJobsOnWorker(wId);
}
}
// The response here includes the QueryId and UberJobId of all
// uberjobs in the original message. If the czar cannot handle
// one now, it won't be able to handle it later, so there's no
// point in the worker sending it again.
// Under normal circumstances, the czar should be able to
// find and handle all failed transmits. Anything it can't find should
// show up in completed query IDs, or failed uberJobs, and failing that
// it should be garbage collected.
auto jsRet = wccIssue->responseToJson();

// Responses are sent for all `failedTransmits` in the message. If
// something couldn't be parsed, the response indicates that and
// the UberJob will be abandoned by the worker. If the query
// could finish without the results of that uberjob, it indicates
// that the result file is obsolete. If this was successful,
// the worker just waits for the czar to collect the file as usual.
// In all cases, the worker will remove the item from its
// `failedTransmits` list so it won't be tried again.
vector<protojson::ExecutiveRespMsg::Ptr> execRespMsgs;
auto failedTransmits = wccIssue->takeFailedTransmitsMap();
for (auto& [key, elem] : *failedTransmits) {
protojson::UberJobStatusMsg::Ptr& statusMsg = elem;
auto rdyMsg = dynamic_pointer_cast<protojson::UberJobReadyMsg>(statusMsg);
if (rdyMsg != nullptr) {
bool const retry = true;
// Put the file on a queue to be collected later.
czar::Czar::getCzar()->handleUberJobReadyMsg(rdyMsg, fName, retry);
auto exRespMsg = czar::Czar::getCzar()->handleUberJobReadyMsgNoThrow(rdyMsg, fName);
execRespMsgs.push_back(exRespMsg);
} else {
auto errMsg = dynamic_pointer_cast<protojson::UberJobErrorMsg>(statusMsg);
// Kill the UberJob or user query depending on the error.
czar::Czar::getCzar()->handleUberJobErrorMsg(errMsg, fName);
// Kill the UberJob or user query depending on the error. (Doesn't throw)
auto exRespMsg = czar::Czar::getCzar()->handleUberJobErrorMsg(errMsg, fName);
execRespMsgs.push_back(exRespMsg);
}
}
LOGS(_log, LOG_LVL_TRACE, "HttpCzarWorkerModule::_handleWorkerCzarComIssue jsRet=" << jsRet.dump());
auto jsRet = wccIssue->responseToJson(wccIssue->getThoughtCzarWasDeadTime(), execRespMsgs);
LOGS(_log, LOG_LVL_TRACE,
"HttpCzarWorkerModule::_handleWorkerCzarComIssue jsRet=" << protojson::pwHide(jsRet));
return jsRet;
} catch (std::invalid_argument const& iaEx) {
LOGS(_log, LOG_LVL_ERROR,
"HttpCzarWorkerModule::_handleWorkerCzarComIssue received "
<< iaEx.what() << " js=" << protojson::pwHide(body().objJson));
// This is very bad, as there's no way to know what is going wrong. Just one of these is survivable,
// but if it keeps happening, the system is unstable.
Czar::getCzar()->incrCommErrCount("WorkerCzarComIssue", wId, iaEx.what());
protojson::ResponseMsg respMsg(false, "parse", iaEx.what());
return respMsg.toJson();
}
4 changes: 2 additions & 2 deletions src/mysql/CsvMemDisk.cc
@@ -213,7 +213,7 @@ class CsvMemDiskBuffer : public CsvBuffer {

unsigned fetch(char* buffer, unsigned bufLen) override {
if (bufLen == 0) {
throw LocalInfileError("CsvStreamBuffer::fetch Can't fetch non-positive bytes");
throw LocalInfileError("CsvMemDiskBuffer::fetch Can't fetch non-positive bytes");
}
auto csvMd = _csvMemDisk.lock();
if (csvMd == nullptr) return 0;
@@ -234,7 +234,7 @@ class CsvMemDiskBuffer : public CsvBuffer {
return bytesToCopy;
}

string dump() const override { return "CsvStreamBuffer"; }
string dump() const override { return "CsvMemDiskBuffer"; }

private:
weak_ptr<CsvMemDisk> _csvMemDisk;
1 change: 1 addition & 0 deletions src/protojson/PwHideJson.cc
@@ -34,6 +34,7 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.protojson.PwHideJson");

namespace lsst::qserv::protojson {

// TODO: Really need to make this recursive.
nlohmann::json PwHideJson::hide(nlohmann::json const& in) const {
try {
nlohmann::json js(in);