From 3a77cbb46e963e243175b47668916f98bb91671d Mon Sep 17 00:00:00 2001 From: John Gates Date: Sun, 3 Mar 2024 11:37:07 -0800 Subject: [PATCH 1/4] Test approach to reduce mutex contention in FileChannelShared. --- src/wbase/FileChannelShared.cc | 62 ++++++++++++++++++++++------------ src/wbase/FileChannelShared.h | 20 ++++++----- 2 files changed, 51 insertions(+), 31 deletions(-) diff --git a/src/wbase/FileChannelShared.cc b/src/wbase/FileChannelShared.cc index 42a8814822..14eaa6faca 100644 --- a/src/wbase/FileChannelShared.cc +++ b/src/wbase/FileChannelShared.cc @@ -267,11 +267,7 @@ shared_ptr FileChannelShared::create(shared_ptr const& sendChannel, qmeta::CzarId czarId, string const& workerId) - : _sendChannel(sendChannel), - _czarId(czarId), - _workerId(workerId), - _protobufArena(make_unique()), - _scsId(scsSeqId++) { + : _sendChannel(sendChannel), _czarId(czarId), _workerId(workerId), _scsId(scsSeqId++) { LOGS(_log, LOG_LVL_DEBUG, "FileChannelShared created"); if (_sendChannel == nullptr) { throw util::Bug(ERR_LOC, "FileChannelShared constructor given nullptr"); @@ -321,8 +317,10 @@ string FileChannelShared::makeIdStr(int qId, int jId) { bool FileChannelShared::buildAndTransmitError(util::MultiError& multiErr, shared_ptr const& task, bool cancelled) { + // &&& Arena may not really be needed. + std::unique_ptr protobufArena = make_unique(); lock_guard const tMtxLock(_tMtx); - if (!_sendResponse(tMtxLock, task, cancelled, multiErr)) { + if (!_sendResponse(tMtxLock, protobufArena, task, cancelled, multiErr)) { LOGS(_log, LOG_LVL_ERROR, "Could not transmit the error message to Czar."); return false; } @@ -348,6 +346,10 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr protobufArena = make_unique(); + proto::ResponseData* responseData = 0; + while (hasMoreRows && !cancelled) { // This lock is to protect the stream from having other Tasks mess with it // while data is loading. @@ -360,7 +362,9 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr const tMtxLock(_tMtx); + // Make sure the file is sync to disk before notifying Czar. _file.flush(); _file.close(); // Only the last ("summary") message, w/o any rows, is sent to the Czar to notify // it about the completion of the request. - if (!_sendResponse(tMtxLockA, task, cancelled, multiErr)) { + //&&&if (!_sendResponse(tMtxLockA, task, cancelled, multiErr)) { + if (!_sendResponse(tMtxLock, protobufArena, task, cancelled, multiErr)) { LOGS(_log, LOG_LVL_ERROR, "Could not transmit the request completion message to Czar."); erred = true; break; @@ -421,6 +428,7 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr const tMtxLockA(_tMtx); _removeFile(tMtxLockA); } @@ -432,28 +440,36 @@ bool FileChannelShared::_kill(lock_guard const& streamMutexLock, string c return _sendChannel->kill(note); } -bool FileChannelShared::_writeToFile(lock_guard const& tMtxLock, shared_ptr const& task, - MYSQL_RES* mResult, int& bytes, int& rows, util::MultiError& multiErr) { +bool FileChannelShared::_writeToFile(proto::ResponseData* responseData, + unique_ptr const& protobufArena, + shared_ptr const& task, MYSQL_RES* mResult, int& bytes, int& rows, + util::MultiError& multiErr) { // Transfer rows from a result set into the response data object. - if (nullptr == _responseData) { - _responseData = google::protobuf::Arena::CreateMessage(_protobufArena.get()); + if (nullptr == responseData) { + responseData = google::protobuf::Arena::CreateMessage(protobufArena.get()); } else { - _responseData->clear_row(); + responseData->clear_row(); } size_t tSize = 0; + /* &&& LOGS(_log, LOG_LVL_TRACE, __func__ << " _fillRows " << task->getIdStr() << " start"); bool const hasMoreRows = _fillRows(tMtxLock, mResult, rows, tSize); LOGS(_log, LOG_LVL_TRACE, __func__ << " _fillRows " << task->getIdStr() << " end"); _responseData->set_rowcount(rows); _responseData->set_transmitsize(tSize); + */ + bool const hasMoreRows = _fillRows(responseData, mResult, rows, tSize); + responseData->set_rowcount(rows); + responseData->set_transmitsize(tSize); // Serialize the content of the data buffer into the Protobuf data message // that will be written into the output file. std::string msg; - _responseData->SerializeToString(&msg); + responseData->SerializeToString(&msg); bytes = msg.size(); - LOGS(_log, LOG_LVL_TRACE, __func__ << " file write " << task->getIdStr() << " start"); + //&&&LOGS(_log, LOG_LVL_TRACE, __func__ << " file write " << task->getIdStr() << " start"); + lock_guard const tMtxLock(_tMtx); // Create the file if not open. if (!_file.is_open()) { _fileName = task->resultFilePath(); @@ -478,7 +494,7 @@ bool FileChannelShared::_writeToFile(lock_guard const& tMtxLock, shared_p return hasMoreRows; } -bool FileChannelShared::_fillRows(lock_guard const& tMtxLock, MYSQL_RES* mResult, int& rows, +bool FileChannelShared::_fillRows(proto::ResponseData* responseData, MYSQL_RES* mResult, int& rows, size_t& tSize) { int const numFields = mysql_num_fields(mResult); unsigned int szLimit = min(proto::ProtoHeaderWrap::PROTOBUFFER_DESIRED_LIMIT, @@ -488,7 +504,7 @@ bool FileChannelShared::_fillRows(lock_guard const& tMtxLock, MYSQL_RES* MYSQL_ROW row; while ((row = mysql_fetch_row(mResult))) { auto lengths = mysql_fetch_lengths(mResult); - proto::RowBundle* rawRow = _responseData->add_row(); + proto::RowBundle* rawRow = responseData->add_row(); for (int i = 0; i < numFields; ++i) { if (row[i]) { rawRow->add_column(row[i], lengths[i]); @@ -521,8 +537,10 @@ void FileChannelShared::_removeFile(lock_guard const& tMtxLock) { } } -bool FileChannelShared::_sendResponse(lock_guard const& tMtxLock, shared_ptr const& task, - bool cancelled, util::MultiError const& multiErr) { +bool FileChannelShared::_sendResponse(lock_guard const& tMtxLock, + std::unique_ptr const& protobufArena, + shared_ptr const& task, bool cancelled, + util::MultiError const& multiErr) { auto const queryId = task->getQueryId(); auto const jobId = task->getJobId(); auto const idStr(makeIdStr(queryId, jobId)); @@ -534,10 +552,10 @@ bool FileChannelShared::_sendResponse(lock_guard const& tMtxLock, shared_ // This will deallocate any memory managed by the Google Protobuf Arena // to avoid unnecessary memory utilization by the application. LOGS(_log, LOG_LVL_DEBUG, - __func__ << ": Google Protobuf Arena, 1:SpaceUsed=" << _protobufArena->SpaceUsed()); - _protobufArena->Reset(); + __func__ << ": Google Protobuf Arena, 1:SpaceUsed=" << protobufArena->SpaceUsed()); + protobufArena->Reset(); LOGS(_log, LOG_LVL_DEBUG, - __func__ << ": Google Protobuf Arena, 2:SpaceUsed=" << _protobufArena->SpaceUsed()); + __func__ << ": Google Protobuf Arena, 2:SpaceUsed=" << protobufArena->SpaceUsed()); QSERV_LOGCONTEXT_QUERY_JOB(queryId, jobId); LOGS(_log, LOG_LVL_DEBUG, __func__); diff --git a/src/wbase/FileChannelShared.h b/src/wbase/FileChannelShared.h index 0febe6f460..13254e4f18 100644 --- a/src/wbase/FileChannelShared.h +++ b/src/wbase/FileChannelShared.h @@ -192,8 +192,11 @@ class FileChannelShared { * @throws std::runtime_error for problems encountered when attemting to create the file * or write into the file. */ - bool _writeToFile(std::lock_guard const& tMtxLock, std::shared_ptr const& task, - MYSQL_RES* mResult, int& bytes, int& rows, util::MultiError& multiErr); + // &&& fix doc tMtxLock responseData protobufArena + bool _writeToFile(proto::ResponseData* responseData, + std::unique_ptr const& protobufArena, + std::shared_ptr const& task, MYSQL_RES* mResult, int& bytes, int& rows, + util::MultiError& multiErr); /** * Extract as many rows as allowed by the Google Protobuf implementation from @@ -204,7 +207,9 @@ class FileChannelShared { * @param tSize - the approximate amount of data extracted from the result set * @return 'true' if there are more rows left in the result set. */ - bool _fillRows(std::lock_guard const& tMtxLock, MYSQL_RES* mResult, int& rows, size_t& tSize); + //&&& fix doc + static bool _fillRows(proto::ResponseData* responseData, MYSQL_RES* mResult, int& rows, size_t& tSize); + /** * Unconditionaly close and remove (potentially - the partially written) file. * This method gets called in case of any failure detected while processing @@ -225,8 +230,9 @@ class FileChannelShared { * @param multiErr - a collector of any errors that were captured during result set processing * @return 'true' if the operation was successfull */ - bool _sendResponse(std::lock_guard const& tMtxLock, std::shared_ptr const& task, - bool cancelled, util::MultiError const& multiErr); + bool _sendResponse(std::lock_guard const& tMtxLock, + std::unique_ptr const& protobufArena, + std::shared_ptr const& task, bool cancelled, util::MultiError const& multiErr); mutable std::mutex _tMtx; ///< Protects data recording and Czar notification @@ -234,10 +240,6 @@ class FileChannelShared { qmeta::CzarId const _czarId; ///< id of the czar that requested this task(s). std::string const _workerId; ///< The unique identifier of the worker. - // Allocatons/deletion of the data messages are managed by Google Protobuf Arena. - std::unique_ptr _protobufArena; - proto::ResponseData* _responseData = 0; - uint64_t const _scsId; ///< id number for this FileChannelShared /// streamMutex is used to protect _lastCount and messages that are sent From 16323fbdab1ee9275213b9655a23e29680fe6ace Mon Sep 17 00:00:00 2001 From: John Gates Date: Thu, 4 Apr 2024 14:57:33 -0700 Subject: [PATCH 2/4] Rebased on main and added file removal flag. --- src/wbase/FileChannelShared.cc | 32 +++++++++++++++----------------- src/wbase/FileChannelShared.h | 4 ++++ src/wdb/QueryRunner.cc | 6 ++++++ 3 files changed, 25 insertions(+), 17 deletions(-) diff --git a/src/wbase/FileChannelShared.cc b/src/wbase/FileChannelShared.cc index 14eaa6faca..954301cf81 100644 --- a/src/wbase/FileChannelShared.cc +++ b/src/wbase/FileChannelShared.cc @@ -279,7 +279,7 @@ FileChannelShared::~FileChannelShared() { // dead it means there was a problem to process a query or send back a response // to Czar. In either case, the file would be useless and it has to be deleted // in order to avoid leaving unclaimed result files within the results folder. - if (isDead()) { + if (_issueRequiresFileRemoval || isDead()) { _removeFile(lock_guard(_tMtx)); } if (_sendChannel != nullptr) { @@ -329,6 +329,7 @@ bool FileChannelShared::buildAndTransmitError(util::MultiError& multiErr, shared bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr const& task, util::MultiError& multiErr, atomic& cancelled) { + LOGS(_log, LOG_LVL_WARN, "&&& FileChannelShared::buildAndTransmitResult start"); // Operation stats. Note that "buffer fill time" included the amount // of time needed to write the result set to disk. util::Timer transmitT; @@ -348,13 +349,9 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr protobufArena = make_unique(); - proto::ResponseData* responseData = 0; + proto::ResponseData* responseData = nullptr; while (hasMoreRows && !cancelled) { - // This lock is to protect the stream from having other Tasks mess with it - // while data is loading. - lock_guard const tMtxLockA(_tMtx); - util::Timer bufferFillT; bufferFillT.start(); @@ -362,8 +359,6 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr const tMtxLock(_tMtx); + LOGS(_log, LOG_LVL_WARN, "&&& FileChannelShared::buildAndTransmitResult e1"); // Make sure the file is sync to disk before notifying Czar. _file.flush(); @@ -402,7 +399,6 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr const tMtxLockA(_tMtx); _removeFile(tMtxLockA); + */ + // Set a flag to delete the file in the destructor. That should prevent any + // possible race conditions with other threads expecting the file to exist. + _issueRequiresFileRemoval = true; } return erred; } @@ -445,19 +446,13 @@ bool FileChannelShared::_writeToFile(proto::ResponseData* responseData, shared_ptr const& task, MYSQL_RES* mResult, int& bytes, int& rows, util::MultiError& multiErr) { // Transfer rows from a result set into the response data object. + LOGS(_log, LOG_LVL_WARN, "&&& _writeToFile start"); if (nullptr == responseData) { responseData = google::protobuf::Arena::CreateMessage(protobufArena.get()); } else { responseData->clear_row(); } size_t tSize = 0; - /* &&& - LOGS(_log, LOG_LVL_TRACE, __func__ << " _fillRows " << task->getIdStr() << " start"); - bool const hasMoreRows = _fillRows(tMtxLock, mResult, rows, tSize); - LOGS(_log, LOG_LVL_TRACE, __func__ << " _fillRows " << task->getIdStr() << " end"); - _responseData->set_rowcount(rows); - _responseData->set_transmitsize(tSize); - */ bool const hasMoreRows = _fillRows(responseData, mResult, rows, tSize); responseData->set_rowcount(rows); responseData->set_transmitsize(tSize); @@ -468,8 +463,10 @@ bool FileChannelShared::_writeToFile(proto::ResponseData* responseData, responseData->SerializeToString(&msg); bytes = msg.size(); - //&&&LOGS(_log, LOG_LVL_TRACE, __func__ << " file write " << task->getIdStr() << " start"); + LOGS(_log, LOG_LVL_TRACE, __func__ << " file write " << task->getIdStr() << " start"); + LOGS(_log, LOG_LVL_WARN, "&&& _writeToFile d"); lock_guard const tMtxLock(_tMtx); + LOGS(_log, LOG_LVL_WARN, "&&& _writeToFile d1"); // Create the file if not open. if (!_file.is_open()) { _fileName = task->resultFilePath(); @@ -491,6 +488,7 @@ bool FileChannelShared::_writeToFile(proto::ResponseData* responseData, throw runtime_error("FileChannelShared::" + string(__func__) + " failed to write " + to_string(msg.size()) + " bytes into the file '" + _fileName + "'."); } + LOGS(_log, LOG_LVL_WARN, "&&& _writeToFile end"); return hasMoreRows; } diff --git a/src/wbase/FileChannelShared.h b/src/wbase/FileChannelShared.h index 13254e4f18..cd21686186 100644 --- a/src/wbase/FileChannelShared.h +++ b/src/wbase/FileChannelShared.h @@ -274,6 +274,10 @@ class FileChannelShared { uint32_t _rowcount = 0; ///< The total numnber of rows in all result sets of a query. uint64_t _transmitsize = 0; ///< The total amount of data (bytes) in all result sets of a query. + + /// This should be set to true if there were any issues that invalidate the file, such as errors + /// or cancellation. + std::atomic _issueRequiresFileRemoval{false}; }; } // namespace lsst::qserv::wbase diff --git a/src/wdb/QueryRunner.cc b/src/wdb/QueryRunner.cc index a4a7557ab5..6e14855648 100644 --- a/src/wdb/QueryRunner.cc +++ b/src/wdb/QueryRunner.cc @@ -147,6 +147,7 @@ size_t QueryRunner::_getDesiredLimit() { util::TimerHistogram memWaitHisto("memWait Hist", {1, 5, 10, 20, 40}); bool QueryRunner::runQuery() { + LOGS(_log, LOG_LVL_WARN, "&&& runQuery start"); util::InstanceCount ic(to_string(_task->getQueryId()) + "_rq_LDB"); // LockupDB util::HoldTrack::Mark runQueryMarkA(ERR_LOC, "runQuery " + to_string(_task->getQueryId())); QSERV_LOGCONTEXT_QUERY_JOB(_task->getQueryId(), _task->getJobId()); @@ -254,6 +255,7 @@ class ChunkResourceRequest { }; bool QueryRunner::_dispatchChannel() { + LOGS(_log, LOG_LVL_WARN, "&&& dispatch start"); bool erred = false; bool needToFreeRes = false; // set to true once there are results to be freed. // Collect the result in _transmitData. When a reasonable amount of data has been collected, @@ -298,7 +300,9 @@ bool QueryRunner::_dispatchChannel() { if (sendChan == nullptr) { throw util::Bug(ERR_LOC, "QueryRunner::_dispatchChannel() sendChan==null"); } + LOGS(_log, LOG_LVL_WARN, "&&& dispatch calling buildAndTransmitResult a"); erred = sendChan->buildAndTransmitResult(res, _task, _multiError, _cancelled); + LOGS(_log, LOG_LVL_WARN, "&&& dispatch calling buildAndTransmitResult a1"); } } } catch (sql::SqlErrorObject const& e) { @@ -321,10 +325,12 @@ bool QueryRunner::_dispatchChannel() { erred = true; // Send results. This needs to happen after the error check. // If any errors were found, send an error back. + LOGS(_log, LOG_LVL_WARN, "&&& dispatch calling buildAndTransmiError b"); if (!_task->getSendChannel()->buildAndTransmitError(_multiError, _task, _cancelled)) { LOGS(_log, LOG_LVL_WARN, " Could not report error to czar as sendChannel not accepting msgs." << _task->getIdStr()); } + LOGS(_log, LOG_LVL_WARN, "&&& dispatch calling buildAndTransmiError b1"); } return !erred; } From a45a91af495b1248a95b4d363bb48ffe6d449587 Mon Sep 17 00:00:00 2001 From: John Gates Date: Fri, 5 Apr 2024 09:46:58 -0700 Subject: [PATCH 3/4] Improved comments. --- src/wbase/FileChannelShared.cc | 14 -------------- src/wbase/FileChannelShared.h | 8 ++++---- src/wdb/QueryRunner.cc | 6 ------ 3 files changed, 4 insertions(+), 24 deletions(-) diff --git a/src/wbase/FileChannelShared.cc b/src/wbase/FileChannelShared.cc index 954301cf81..5e943e7d25 100644 --- a/src/wbase/FileChannelShared.cc +++ b/src/wbase/FileChannelShared.cc @@ -317,7 +317,6 @@ string FileChannelShared::makeIdStr(int qId, int jId) { bool FileChannelShared::buildAndTransmitError(util::MultiError& multiErr, shared_ptr const& task, bool cancelled) { - // &&& Arena may not really be needed. std::unique_ptr protobufArena = make_unique(); lock_guard const tMtxLock(_tMtx); if (!_sendResponse(tMtxLock, protobufArena, task, cancelled, multiErr)) { @@ -329,7 +328,6 @@ bool FileChannelShared::buildAndTransmitError(util::MultiError& multiErr, shared bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr const& task, util::MultiError& multiErr, atomic& cancelled) { - LOGS(_log, LOG_LVL_WARN, "&&& FileChannelShared::buildAndTransmitResult start"); // Operation stats. Note that "buffer fill time" included the amount // of time needed to write the result set to disk. util::Timer transmitT; @@ -347,7 +345,6 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr protobufArena = make_unique(); proto::ResponseData* responseData = nullptr; @@ -389,9 +386,7 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr const tMtxLock(_tMtx); - LOGS(_log, LOG_LVL_WARN, "&&& FileChannelShared::buildAndTransmitResult e1"); // Make sure the file is sync to disk before notifying Czar. _file.flush(); @@ -424,11 +419,6 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr const tMtxLockA(_tMtx); - _removeFile(tMtxLockA); - */ // Set a flag to delete the file in the destructor. That should prevent any // possible race conditions with other threads expecting the file to exist. _issueRequiresFileRemoval = true; @@ -446,7 +436,6 @@ bool FileChannelShared::_writeToFile(proto::ResponseData* responseData, shared_ptr const& task, MYSQL_RES* mResult, int& bytes, int& rows, util::MultiError& multiErr) { // Transfer rows from a result set into the response data object. - LOGS(_log, LOG_LVL_WARN, "&&& _writeToFile start"); if (nullptr == responseData) { responseData = google::protobuf::Arena::CreateMessage(protobufArena.get()); } else { @@ -464,9 +453,7 @@ bool FileChannelShared::_writeToFile(proto::ResponseData* responseData, bytes = msg.size(); LOGS(_log, LOG_LVL_TRACE, __func__ << " file write " << task->getIdStr() << " start"); - LOGS(_log, LOG_LVL_WARN, "&&& _writeToFile d"); lock_guard const tMtxLock(_tMtx); - LOGS(_log, LOG_LVL_WARN, "&&& _writeToFile d1"); // Create the file if not open. if (!_file.is_open()) { _fileName = task->resultFilePath(); @@ -488,7 +475,6 @@ bool FileChannelShared::_writeToFile(proto::ResponseData* responseData, throw runtime_error("FileChannelShared::" + string(__func__) + " failed to write " + to_string(msg.size()) + " bytes into the file '" + _fileName + "'."); } - LOGS(_log, LOG_LVL_WARN, "&&& _writeToFile end"); return hasMoreRows; } diff --git a/src/wbase/FileChannelShared.h b/src/wbase/FileChannelShared.h index cd21686186..9708fc0f00 100644 --- a/src/wbase/FileChannelShared.h +++ b/src/wbase/FileChannelShared.h @@ -182,7 +182,8 @@ class FileChannelShared { * implementation. Also, the iterative approach to the data extraction allows * the driving code to be interrupted should the correponding query be cancelled * during the lengthy data processing phase. - * @param tMtxLock - a lock on the mutex tMtx + * @param responseData - proto buffer to hold the response being constructed. + * @param protobufArena - proto buffer memory management control. * @param task - a task that produced the result set * @param mResult - MySQL result to be used as a source * @param bytes - the number of bytes in the result message recorded into the file @@ -192,7 +193,6 @@ class FileChannelShared { * @throws std::runtime_error for problems encountered when attemting to create the file * or write into the file. */ - // &&& fix doc tMtxLock responseData protobufArena bool _writeToFile(proto::ResponseData* responseData, std::unique_ptr const& protobufArena, std::shared_ptr const& task, MYSQL_RES* mResult, int& bytes, int& rows, @@ -201,13 +201,13 @@ class FileChannelShared { /** * Extract as many rows as allowed by the Google Protobuf implementation from * from the input result set into the output result object. - * @param tMtxLock - a lock on the mutex tMtx + * @param responseData - proto buffer to hold the response being constructed. + * @param protobufArena - proto buffer memory management control. * @param mResult - MySQL result to be used as a source * @param rows - the number of rows extracted from the result set * @param tSize - the approximate amount of data extracted from the result set * @return 'true' if there are more rows left in the result set. */ - //&&& fix doc static bool _fillRows(proto::ResponseData* responseData, MYSQL_RES* mResult, int& rows, size_t& tSize); /** diff --git a/src/wdb/QueryRunner.cc b/src/wdb/QueryRunner.cc index 6e14855648..a4a7557ab5 100644 --- a/src/wdb/QueryRunner.cc +++ b/src/wdb/QueryRunner.cc @@ -147,7 +147,6 @@ size_t QueryRunner::_getDesiredLimit() { util::TimerHistogram memWaitHisto("memWait Hist", {1, 5, 10, 20, 40}); bool QueryRunner::runQuery() { - LOGS(_log, LOG_LVL_WARN, "&&& runQuery start"); util::InstanceCount ic(to_string(_task->getQueryId()) + "_rq_LDB"); // LockupDB util::HoldTrack::Mark runQueryMarkA(ERR_LOC, "runQuery " + to_string(_task->getQueryId())); QSERV_LOGCONTEXT_QUERY_JOB(_task->getQueryId(), _task->getJobId()); @@ -255,7 +254,6 @@ class ChunkResourceRequest { }; bool QueryRunner::_dispatchChannel() { - LOGS(_log, LOG_LVL_WARN, "&&& dispatch start"); bool erred = false; bool needToFreeRes = false; // set to true once there are results to be freed. // Collect the result in _transmitData. When a reasonable amount of data has been collected, @@ -300,9 +298,7 @@ bool QueryRunner::_dispatchChannel() { if (sendChan == nullptr) { throw util::Bug(ERR_LOC, "QueryRunner::_dispatchChannel() sendChan==null"); } - LOGS(_log, LOG_LVL_WARN, "&&& dispatch calling buildAndTransmitResult a"); erred = sendChan->buildAndTransmitResult(res, _task, _multiError, _cancelled); - LOGS(_log, LOG_LVL_WARN, "&&& dispatch calling buildAndTransmitResult a1"); } } } catch (sql::SqlErrorObject const& e) { @@ -325,12 +321,10 @@ bool QueryRunner::_dispatchChannel() { erred = true; // Send results. This needs to happen after the error check. // If any errors were found, send an error back. - LOGS(_log, LOG_LVL_WARN, "&&& dispatch calling buildAndTransmiError b"); if (!_task->getSendChannel()->buildAndTransmitError(_multiError, _task, _cancelled)) { LOGS(_log, LOG_LVL_WARN, " Could not report error to czar as sendChannel not accepting msgs." << _task->getIdStr()); } - LOGS(_log, LOG_LVL_WARN, "&&& dispatch calling buildAndTransmiError b1"); } return !erred; } From 192ba5eee756e94d44fb68f2cb8644cb56eebfe9 Mon Sep 17 00:00:00 2001 From: John Gates Date: Fri, 5 Apr 2024 12:59:19 -0700 Subject: [PATCH 4/4] Improved comments. --- src/wbase/FileChannelShared.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/wbase/FileChannelShared.h b/src/wbase/FileChannelShared.h index 9708fc0f00..a26314e82f 100644 --- a/src/wbase/FileChannelShared.h +++ b/src/wbase/FileChannelShared.h @@ -180,7 +180,7 @@ class FileChannelShared { * @note The method may not extract all rows if the amount of data found * in the result set exceeded the maximum size allowed by the Google Protobuf * implementation. Also, the iterative approach to the data extraction allows - * the driving code to be interrupted should the correponding query be cancelled + * the driving code to be interrupted should the corresponding query be cancelled * during the lengthy data processing phase. * @param responseData - proto buffer to hold the response being constructed. * @param protobufArena - proto buffer memory management control.