diff --git a/sparta/CMakeLists.txt b/sparta/CMakeLists.txt index 85a47cb3b9..fd96fb5afc 100644 --- a/sparta/CMakeLists.txt +++ b/sparta/CMakeLists.txt @@ -46,6 +46,8 @@ list (APPEND SourceCppFiles src/CounterBase.cpp src/CsvFormatter.cpp src/DAG.cpp + src/DatabaseCheckpoint.cpp + src/DatabaseCheckpointer.cpp src/Destination.cpp src/EdgeFactory.cpp src/EventNode.cpp diff --git a/sparta/simdb b/sparta/simdb index 394f8bea37..d35c99035b 160000 --- a/sparta/simdb +++ b/sparta/simdb @@ -1 +1 @@ -Subproject commit 394f8bea37cf6684f938fc4ffe7cd4471b4a9f83 +Subproject commit d35c99035b8d870ed99000f44a957cad597f79c5 diff --git a/sparta/sparta/resources/Buffer.hpp b/sparta/sparta/resources/Buffer.hpp index 9f23785ea5..0a385c3e96 100644 --- a/sparta/sparta/resources/Buffer.hpp +++ b/sparta/sparta/resources/Buffer.hpp @@ -99,8 +99,9 @@ namespace sparta public: DataPointer() { } + ~DataPointer() { } - DataPointer(DataPointer &&orig) { + DataPointer(DataPointer &&orig) noexcept { ::memcpy(&object_memory_, &orig.object_memory_, sizeof(object_memory_)); data = reinterpret_cast(&object_memory_); } diff --git a/sparta/sparta/serialization/checkpoint/Checkpoint.hpp b/sparta/sparta/serialization/checkpoint/Checkpoint.hpp index a166a893a4..d4a771d39f 100644 --- a/sparta/sparta/serialization/checkpoint/Checkpoint.hpp +++ b/sparta/sparta/serialization/checkpoint/Checkpoint.hpp @@ -2,21 +2,10 @@ #pragma once -#include -#include - -#include "sparta/functional/ArchData.hpp" -#include "sparta/utils/SpartaException.hpp" -#include "sparta/utils/SpartaAssert.hpp" -#include "sparta/kernel/Scheduler.hpp" - -#include "sparta/serialization/checkpoint/CheckpointExceptions.hpp" - +#include "sparta/serialization/checkpoint/CheckpointBase.hpp" namespace sparta::serialization::checkpoint { - class FastCheckpointer; - /*! * \brief Single checkpoint object interface with a tick number and an ID * unique to the owning Checkpointer instance @@ -25,35 +14,10 @@ namespace sparta::serialization::checkpoint * checkpoint data in memory or on disk at construction which can be * restored with load() */ - class Checkpoint + class Checkpoint : public CheckpointBase { public: - //! \name Local Types - //! @{ - //////////////////////////////////////////////////////////////////////// - - //! \brief tick_t Tick type to which checkpoints will refer - typedef sparta::Scheduler::Tick tick_t; - - //! \brief tick_t Tick type to which checkpoints will refer - typedef uint64_t chkpt_id_t; - - //////////////////////////////////////////////////////////////////////// - //! @} - - /*! - * \brief Indicates the smallest valid checkpoint id - */ - static const chkpt_id_t MIN_CHECKPOINT = 0; - - /*! - * \brief Indicates unidentified checkpoint (could mean 'invalid' or - * 'any') depending on context - */ - static const chkpt_id_t UNIDENTIFIED_CHECKPOINT = ~(chkpt_id_t)0; - - //! \name Construction & Initialization //! @{ //////////////////////////////////////////////////////////////////////// @@ -65,7 +29,13 @@ namespace sparta::serialization::checkpoint Checkpoint(const Checkpoint&) = delete; //! \brief Non-assignable - const Checkpoint& operator=(const Checkpoint&) = delete; + Checkpoint& operator=(const Checkpoint&) = delete; + + //! \brief Not move constructable + Checkpoint(Checkpoint&&) = delete; + + //! \brief Not move assignable + Checkpoint& operator=(Checkpoint&&) = delete; protected: @@ -75,18 +45,14 @@ namespace sparta::serialization::checkpoint Checkpoint(chkpt_id_t id, tick_t tick, Checkpoint* prev) : - tick_(tick), - chkpt_id_(id), + CheckpointBase(id, tick), prev_(prev) { } public: - /*! - * \brief Destructor - * - * Removes this checkpoint from the chain and patches chain between prev + * \brief Removes this checkpoint from the chain and patches chain between prev * and each item in the nexts list */ virtual ~Checkpoint() { @@ -101,72 +67,11 @@ namespace sparta::serialization::checkpoint getPrev()->addNext(d); } } - } //////////////////////////////////////////////////////////////////////// //! @} - /*! - * \brief Returns a string describing this object - */ - virtual std::string stringize() const { - std::stringstream ss; - ss << "'; - return ss.str(); - } - - /*! - * \brief Writes all checkpoint raw data to an ostream - * \param o ostream to which raw data will be written - * \note No newlines or other extra characters will be appended - */ - virtual void dumpData(std::ostream& o) const = 0; - - /*! - * \brief Returns memory usage by this checkpoint including any - * framework data structures - */ - virtual uint64_t getTotalMemoryUse() const noexcept = 0; - - /*! - * \brief Returns memory usage by this checkpoint solely for the - * checkpointed content. - */ - virtual uint64_t getContentMemoryUse() const noexcept = 0; - - //! \name Checkpoint Actions - //! @{ - //////////////////////////////////////////////////////////////////////// - - /*! - * \brief Attempts to restore this checkpoint state to the simulation - * state (ArchData) objects given to this Checkpoint at construction - */ - virtual void load(const std::vector& dats) = 0; - - /*! - * \brief Returns the tick number at which this checkpoint was taken. - */ - tick_t getTick() const noexcept { return tick_; } - - /*! - * \brief Returns the ID of this checkpoint - * \note Number has no sequential meaning - it is effectively a random - * ID. - */ - chkpt_id_t getID() const noexcept { return chkpt_id_; } - - /*! - * \brief Gets the representation of this deleted checkpoint as part of - * a checkpoint chain (if that checkpointer supports deletion) - */ - virtual std::string getDeletedRepr() const { - return "*"; - } - /*! * \brief Returns the previous checkpoint. If this checkpoint is a * snapshot, it has no previous checkpoint. @@ -177,7 +82,7 @@ namespace sparta::serialization::checkpoint /*! * \brief Sets the previous checkpoint of this checkpoint to \a prev - * \param prev New previou checkpoint. Overwrites previous + * \param prev New previous checkpoint. Overwrites previous * This will often be accompanied by a call to addNext on the * \a prev argument */ @@ -185,6 +90,14 @@ namespace sparta::serialization::checkpoint prev_ = prev; } + /*! + * \brief Get the ID of our previous checkpoint. Returns UNIDENTIFIED_CHECKPOINT + * only for the head checkpoint. + */ + chkpt_id_t getPrevID() const override { + return prev_ ? prev_->getID() : UNIDENTIFIED_CHECKPOINT; + } + /*! * \brief Adds another next checkpoint following *this. * \param next Next checkpoint (later in simulator ticks) than @@ -242,22 +155,22 @@ namespace sparta::serialization::checkpoint */ const std::vector& getNexts() const noexcept { return nexts_; } - //////////////////////////////////////////////////////////////////////// - //! @} - - protected: - /*! - * \brief Sets the checkpoint ID. + * \brief Returns next checkpoint following *this. May be an empty + * vector if there are no later checkpoints. */ - void setID_(chkpt_id_t id) { - chkpt_id_ = id; + std::vector getNextIDs() const override { + std::vector next_ids; + for (const auto chkpt : getNexts()) { + next_ids.push_back(chkpt->getID()); + } + return next_ids; } - private: + //////////////////////////////////////////////////////////////////////// + //! @} - const tick_t tick_; //!< Tick number for this checkpoint. - chkpt_id_t chkpt_id_; //!< This checkpoint's ID. Guaranteed to be unique from other checkpoints' + private: /*! * \brief Next checkpoint (later tick numbers in same forward stream of @@ -270,27 +183,3 @@ namespace sparta::serialization::checkpoint }; } // namespace sparta::serialization::checkpoint - - -//! ostream insertion operator for Checkpoint -inline std::ostream& operator<<(std::ostream& o, const sparta::serialization::checkpoint::Checkpoint& dcp){ - o << dcp.stringize(); - return o; -} - -//! ostream insertion operator for Checkpoint -inline std::ostream& operator<<(std::ostream& o, const sparta::serialization::checkpoint::Checkpoint* dcp){ - if(dcp == 0){ - o << "null"; - }else{ - o << dcp->stringize(); - } - return o; -} - -//! \brief Required in simulator source to define some globals. -#define SPARTA_CHECKPOINT_BODY \ - namespace sparta{ namespace serialization { namespace checkpoint { \ - const Checkpoint::chkpt_id_t Checkpoint::MIN_CHECKPOINT; \ - const Checkpoint::chkpt_id_t Checkpoint::UNIDENTIFIED_CHECKPOINT; \ - }}} diff --git a/sparta/sparta/serialization/checkpoint/CheckpointBase.hpp b/sparta/sparta/serialization/checkpoint/CheckpointBase.hpp new file mode 100644 index 0000000000..4c67bcdca0 --- /dev/null +++ b/sparta/sparta/serialization/checkpoint/CheckpointBase.hpp @@ -0,0 +1,209 @@ +// -*- C++ -*- + +#pragma once + +#include +#include + +#include "sparta/functional/ArchData.hpp" +#include "sparta/utils/SpartaException.hpp" +#include "sparta/utils/SpartaAssert.hpp" +#include "sparta/kernel/Scheduler.hpp" + +#include "sparta/serialization/checkpoint/CheckpointExceptions.hpp" + +namespace sparta::serialization::checkpoint +{ + /*! + * \brief Single checkpoint object interface with a tick number and an ID + * unique to the owning Checkpointer instance + * + * A subclass of Checkpointer is expected to hold or refer to some + * checkpoint data in memory or on disk at construction which can be + * restored with load() + */ + class CheckpointBase + { + public: + + //! \name Local Types + //! @{ + //////////////////////////////////////////////////////////////////////// + + //! \brief tick_t Tick type to which checkpoints will refer + typedef sparta::Scheduler::Tick tick_t; + + //! \brief tick_t Checkpoint ID type to which checkpoints will refer + typedef uint64_t chkpt_id_t; + + //////////////////////////////////////////////////////////////////////// + //! @} + + /*! + * \brief Indicates the smallest valid checkpoint id + */ + static const chkpt_id_t MIN_CHECKPOINT = 0; + + /*! + * \brief Indicates unidentified checkpoint (could mean 'invalid' or + * 'any') depending on context + */ + static const chkpt_id_t UNIDENTIFIED_CHECKPOINT = ~(chkpt_id_t)0; + + + //! \name Construction & Initialization + //! @{ + //////////////////////////////////////////////////////////////////////// + + //! \brief Not copy constructable + CheckpointBase(const CheckpointBase&) = delete; + + //! \brief Non-assignable + CheckpointBase& operator=(const CheckpointBase&) = delete; + + //! \brief Not move constructable + CheckpointBase(CheckpointBase&&) = delete; + + //! \brief Not move assignable + CheckpointBase& operator=(CheckpointBase&&) = delete; + + protected: + + /*! + * \note Should only be constructed by subclasses + */ + CheckpointBase(chkpt_id_t id, tick_t tick) : + tick_(tick), + chkpt_id_(id) + { } + + CheckpointBase() = default; + + public: + + /*! + * \brief Destructor + */ + virtual ~CheckpointBase() = default; + + /*! + * \brief boost::serialization support + */ + template + void serialize(Archive& ar, const unsigned int /*version*/) { + ar & tick_; + ar & chkpt_id_; + } + + /*! + * \brief Returns a string describing this object + */ + virtual std::string stringize() const { + std::stringstream ss; + ss << "'; + return ss.str(); + } + + /*! + * \brief Writes all checkpoint raw data to an ostream + * \param o ostream to which raw data will be written + * \note No newlines or other extra characters will be appended + */ + virtual void dumpData(std::ostream& o) const = 0; + + /*! + * \brief Returns memory usage by this checkpoint including any + * framework data structures + */ + virtual uint64_t getTotalMemoryUse() const noexcept = 0; + + /*! + * \brief Returns memory usage by this checkpoint solely for the + * checkpointed content. + */ + virtual uint64_t getContentMemoryUse() const noexcept = 0; + + //! \name Checkpoint Actions + //! @{ + //////////////////////////////////////////////////////////////////////// + + /*! + * \brief Attempts to restore this checkpoint state to the simulation + * state (ArchData) objects given to this Checkpoint at construction + */ + virtual void load(const std::vector& dats) = 0; + + /*! + * \brief Returns the tick number at which this checkpoint was taken. + */ + tick_t getTick() const noexcept { return tick_; } + + /*! + * \brief Returns the ID of this checkpoint + * \note Number has no sequential meaning - it is effectively a random + * ID. + */ + chkpt_id_t getID() const noexcept { return chkpt_id_; } + + /*! + * \brief Get the ID of our previous checkpoint. Returns UNIDENTIFIED_CHECKPOINT + * only for the head checkpoint. + */ + virtual chkpt_id_t getPrevID() const = 0; + + /*! + * \brief Returns next checkpoint following *this. May be an empty + * vector if there are no later checkpoints. + */ + virtual std::vector getNextIDs() const = 0; + + /*! + * \brief Gets the representation of this deleted checkpoint as part of + * a checkpoint chain (if that checkpointer supports deletion) + */ + virtual std::string getDeletedRepr() const { + return "*"; + } + + //////////////////////////////////////////////////////////////////////// + //! @} + + protected: + /*! + * \brief Sets the checkpoint ID. + */ + void setID_(chkpt_id_t id) { + chkpt_id_ = id; + } + + private: + tick_t tick_; //!< Tick number for this checkpoint. + chkpt_id_t chkpt_id_; //!< This checkpoint's ID. Guaranteed to be unique from other checkpoints' + }; + +} // namespace sparta::serialization::checkpoint + +//! ostream insertion operator for Checkpoint +inline std::ostream& operator<<(std::ostream& o, const sparta::serialization::checkpoint::CheckpointBase& dcp){ + o << dcp.stringize(); + return o; +} + +//! ostream insertion operator for Checkpoint +inline std::ostream& operator<<(std::ostream& o, const sparta::serialization::checkpoint::CheckpointBase* dcp){ + if(dcp == 0){ + o << "null"; + }else{ + o << dcp->stringize(); + } + return o; +} + +//! \brief Required in simulator source to define some globals. +#define SPARTA_CHECKPOINT_BODY \ + namespace sparta{ namespace serialization { namespace checkpoint { \ + const CheckpointBase::chkpt_id_t CheckpointBase::MIN_CHECKPOINT; \ + const CheckpointBase::chkpt_id_t CheckpointBase::UNIDENTIFIED_CHECKPOINT; \ + }}} diff --git a/sparta/sparta/serialization/checkpoint/Checkpointer.hpp b/sparta/sparta/serialization/checkpoint/Checkpointer.hpp index 60643de77c..1730e756a9 100644 --- a/sparta/sparta/serialization/checkpoint/Checkpointer.hpp +++ b/sparta/sparta/serialization/checkpoint/Checkpointer.hpp @@ -87,10 +87,7 @@ namespace sparta::serialization::checkpoint */ Checkpointer(TreeNode& root, sparta::Scheduler* sched=nullptr) : sched_(sched), - root_(root), - head_(nullptr), - current_(nullptr), - total_chkpts_created_(0) + root_(root) { } /*! @@ -127,25 +124,13 @@ namespace sparta::serialization::checkpoint * \note This is an approxiation and does not include some of * minimal dynamic overhead from stl containers. */ - uint64_t getTotalMemoryUse() const noexcept { - uint64_t mem = 0; - for(auto& cp : chkpts_){ - mem += cp.second->getTotalMemoryUse(); - } - return mem; - } + virtual uint64_t getTotalMemoryUse() const noexcept = 0; /*! * \brief Computes and returns the memory usage by this checkpointer at * this moment purely for the checkpoint state being held */ - uint64_t getContentMemoryUse() const noexcept { - uint64_t mem = 0; - for(auto& cp : chkpts_){ - mem += cp.second->getContentMemoryUse(); - } - return mem; - } + virtual uint64_t getContentMemoryUse() const noexcept = 0; /*! * \brief Returns the total number of checkpoints which have been @@ -323,7 +308,7 @@ namespace sparta::serialization::checkpoint * \note Makes a new vector of results. This should not be called in a * performance-critical path. */ - virtual std::vector getCheckpointsAt(tick_t t) const = 0; + virtual std::vector getCheckpointsAt(tick_t t) = 0; /*! * \brief Gets all known checkpoint IDs available on any timeline sorted @@ -333,7 +318,7 @@ namespace sparta::serialization::checkpoint * \note Makes a new vector of results. This should not be called in a * performance-critical path. */ - virtual std::vector getCheckpoints() const = 0; + virtual std::vector getCheckpoints() = 0; /*! * \brief Gets the current number of checkpoints having valid IDs @@ -359,37 +344,7 @@ namespace sparta::serialization::checkpoint * \note Makes a new vector of results. This should not be called in the * critical path. */ - virtual std::deque getCheckpointChain(chkpt_id_t id) const = 0; - - /*! - * \brief Finds the latest checkpoint at or before the given tick - * starting at the \a from checkpoint and working backward. - * If no checkpoints before or at tick are found, returns nullptr. - * \param tick Tick to search for - * \param from Checkpoint at which to begin searching for a tick. - * Must be a valid checkpoint known by this checkpointer. - * See hasCheckpoint. - * \return The latest checkpoint with a tick number less than or equal - * to the \a tick argument. Returns nullptr if no checkpoints before \a - * tick were found. It is possible for the checkpoint identified by \a - * from could be returned. - * \warning This is not a high-performance method. Generally, - * a client of this interface knows a paticular ID. - * \throw CheckpointError if \a from does not refer to a valid - * checkpoint. - */ - virtual Checkpoint* findLatestCheckpointAtOrBefore(tick_t tick, - chkpt_id_t from) = 0; - - /*! - * \brief Finds a checkpoint by its ID - * \param id ID of checkpoint to find. Guaranteed not to be flagged as - * deleted - * \return Checkpoint with ID of \a id if found or nullptr if not found - */ - Checkpoint* findCheckpoint(chkpt_id_t id) noexcept { - return findCheckpoint_(id); - } + virtual std::deque getCheckpointChain(chkpt_id_t id) = 0; /*! * \brief Tests whether this checkpoint manager has a checkpoint with @@ -398,9 +353,7 @@ namespace sparta::serialization::checkpoint * and false if not. If id == Checkpoint::UNIDENTIFIED_CHECKPOINT, * always returns false */ - virtual bool hasCheckpoint(chkpt_id_t id) const noexcept { - return findCheckpoint_(id) != nullptr; - } + virtual bool hasCheckpoint(chkpt_id_t id) noexcept = 0; /*! * \brief Returns the head checkpoint which is equivalent to the @@ -412,7 +365,7 @@ namespace sparta::serialization::checkpoint * The head checkpoint has an ID of * Checkpoint::UNIDENTIFIED_CHECKPOINT and can never be deleted. */ - const Checkpoint* getHead() const noexcept { + const CheckpointBase* getHead() const noexcept { return head_; } @@ -484,6 +437,11 @@ namespace sparta::serialization::checkpoint return 0; } + /*! + * \brief Returns IDs of the checkpoints immediately following the given checkpoint. + */ + virtual std::vector getNextIDs(chkpt_id_t id) = 0; + //////////////////////////////////////////////////////////////////////// //! @} @@ -505,23 +463,14 @@ namespace sparta::serialization::checkpoint * ostream with a newline following each checkpoint * \param o ostream to dump to */ - void dumpList(std::ostream& o) const { - for(auto& cp : chkpts_){ - o << cp.second->stringize() << std::endl; - } - } + virtual void dumpList(std::ostream& o) = 0; /*! * \brief Dumps this checkpointer's data to an ostream with a newline * following each checkpoint * \param o ostream to dump to */ - void dumpData(std::ostream& o) const { - for(auto& cp : chkpts_){ - cp.second->dumpData(o); - o << std::endl; - } - } + virtual void dumpData(std::ostream& o) = 0; /*! * \brief Dumps this checkpointer's data to an @@ -529,13 +478,7 @@ namespace sparta::serialization::checkpoint * following each checkpoint description and each checkpoint data dump * \param o ostream to dump to */ - void dumpAnnotatedData(std::ostream& o) const { - for(auto& cp : chkpts_){ - o << cp.second->stringize() << std::endl; - cp.second->dumpData(o); - o << std::endl; - } - } + virtual void dumpAnnotatedData(std::ostream& o) = 0; /*! * \brief Debugging utility which dumps values in some bytes across a @@ -559,9 +502,9 @@ namespace sparta::serialization::checkpoint * for deep branches will be difficult to read * \param o ostream to dump to */ - void dumpTree(std::ostream& o) const { + void dumpTree(std::ostream& o) { std::deque c; - dumpBranch(o, head_, 0, 0, c); + dumpBranch(o, getHeadID(), 0, 0, c); o << '\n'; } @@ -578,10 +521,10 @@ namespace sparta::serialization::checkpoint * expected in directory-like tree-view displays */ void dumpBranch(std::ostream& o, - const Checkpoint* chkpt, + const chkpt_id_t chkpt, uint32_t indent, uint32_t pos, - std::deque& continues) const { + std::deque& continues) { //! \todo Move the constants somewhere static outside this function (especially the assert) static const std::string SEP_STR = "-> "; // Normal checkpoint chain static const std::string CONT_SEP_STR = "`> "; // Checkpoint branch from higher line @@ -601,7 +544,7 @@ namespace sparta::serialization::checkpoint } } - auto nexts = chkpt->getNexts(); + auto nexts = getNextIDs(chkpt); std::stringstream ss; // Draw separator between prev checkpoint and this @@ -612,14 +555,14 @@ namespace sparta::serialization::checkpoint } // Draw box around object if it is current - if(current_ == chkpt){ + if(current_ && current_->getID() == chkpt){ ss << "[ "; } dumpCheckpointNode_(chkpt, ss); ss << ' '; - if(current_ == chkpt){ + if(current_ && current_->getID() == chkpt){ ss << ']'; } @@ -653,20 +596,6 @@ namespace sparta::serialization::checkpoint protected: - /*! - * \brief Attempts to find a checkpoint within this checkpointer by ID. - * \param id Checkpoint ID to search for - * \return Pointer to found checkpoint with matchind ID. If not found, - * returns nullptr. - * \todo Faster lookup? - */ - virtual Checkpoint* findCheckpoint_(chkpt_id_t id) noexcept = 0; - - /*! - * \brief const variant of findCheckpoint_ - */ - virtual const Checkpoint* findCheckpoint_(chkpt_id_t id) const noexcept = 0; - /*! * \brief Create a head node. * \pre ArchDatas for tree root are already enumerated @@ -689,8 +618,8 @@ namespace sparta::serialization::checkpoint */ virtual chkpt_id_t createCheckpoint_(bool force_snapshot=false) = 0; - virtual void dumpCheckpointNode_(const Checkpoint* chkpt, std::ostream& o) const { - o << chkpt->getID(); + virtual void dumpCheckpointNode_(const chkpt_id_t id, std::ostream& o) { + o << id; } /*! @@ -704,14 +633,14 @@ namespace sparta::serialization::checkpoint /*! * \brief Non-const variant of getHead_ */ - Checkpoint* getHead_() noexcept { + CheckpointBase* getHead_() noexcept { return head_; } /*! * \brief Gets the head checkpoint. Returns nullptr if none created yet */ - const Checkpoint* getHead_() const noexcept { + const CheckpointBase* getHead_() const noexcept { return head_; } @@ -722,7 +651,7 @@ namespace sparta::serialization::checkpoint * \pre Internal head pointer must be nullptr. * \note This can only be done once */ - void setHead_(Checkpoint* head) { + void setHead_(CheckpointBase* head) { sparta_assert(head != nullptr, "head argument in setHead_ cannot be nullptr"); sparta_assert(head_ == nullptr, "Cannot setHead_ again on a Checkpointer once heas is already set"); head_ = head; @@ -732,7 +661,7 @@ namespace sparta::serialization::checkpoint * \brief Gets the current checkpointer pointer. Returns nullptr if * there is no current checkpoint object */ - Checkpoint* getCurrent_() const noexcept { + CheckpointBase* getCurrent_() const noexcept { return current_; } @@ -742,22 +671,12 @@ namespace sparta::serialization::checkpoint * checkpoint created will follow the current checkpoint set here. * Cannot be nullptr */ - void setCurrent_(Checkpoint* current) { + void setCurrent_(CheckpointBase* current) { sparta_assert(current != nullptr, "Can never setCurrent_ to nullptr except. A null current is a valid state at initialization only") current_ = current; } - /*! - * \brief All checkpoints sorted by ascending tick number (or - * equivalently ascending checkpoint ID since both are monotonically - * increasing) - * - * This map must still be explicitly torn down in reverse order by a - * subclass of Checkpointer - */ - std::map> chkpts_; - /*! * \brief Scheduler whose tick count will be set and read. Cannnot be * updated after first checkpoint without bad side effects. Keeping this @@ -818,9 +737,9 @@ namespace sparta::serialization::checkpoint /*! * \brief Head checkpoint. This is the first checkpoint taken but cannot - * be deleted. Head checkpoint memory is owned by chkpts_. + * be deleted. Head checkpoint memory is owned by checkpointer subclass. */ - Checkpoint* head_; + CheckpointBase* head_ = nullptr; /*! * \brief ArchDatas required to checkpoint for this checkpointiner based @@ -831,13 +750,13 @@ namespace sparta::serialization::checkpoint /*! * \brief Most recent checkpoint created or loaded */ - Checkpoint* current_; + CheckpointBase* current_ = nullptr; /*! * \brief Total checkpoint ever created by this instance. Monotonically * increasing. Includes the head checkpoint */ - uint64_t total_chkpts_created_; + uint64_t total_chkpts_created_ = 0; }; } // namespace sparta::serialization::checkpoint diff --git a/sparta/sparta/serialization/checkpoint/DatabaseCheckpoint.hpp b/sparta/sparta/serialization/checkpoint/DatabaseCheckpoint.hpp new file mode 100644 index 0000000000..8509fa4cf7 --- /dev/null +++ b/sparta/sparta/serialization/checkpoint/DatabaseCheckpoint.hpp @@ -0,0 +1,257 @@ +// -*- C++ -*- + +#pragma once + +#include "sparta/serialization/checkpoint/CheckpointBase.hpp" +#include "sparta/serialization/checkpoint/CheckpointExceptions.hpp" +#include "sparta/serialization/checkpoint/VectorStorage.hpp" + +namespace sparta::serialization::checkpoint +{ + class DatabaseCheckpointer; + class DatabaseCheckpoint; + + /*! + * \brief A window of checkpoints to be sent to/from the database as a unit. + * \note A "window" is defined as a group of (snap_thresh_ + 1) checkpoints, + * where the first checkpoint in the window is a snapshot and the remaining + * checkpoints in the window are deltas. Checkpoints are processed this way + * to enable various performance optimizations. + */ + struct ChkptWindow { + using chkpt_id_t = CheckpointBase::chkpt_id_t; + std::vector> chkpts; + chkpt_id_t start_chkpt_id = CheckpointBase::UNIDENTIFIED_CHECKPOINT; + chkpt_id_t end_chkpt_id = CheckpointBase::UNIDENTIFIED_CHECKPOINT; + uint64_t start_tick = 0; + uint64_t end_tick = 0; + + //! \brief Support boost::serialization + template + void serialize(Archive& ar, const unsigned int /*version*/); + }; + + /*! + * \brief Compressed version of ChkptWindow to be stored in the database. + */ + struct ChkptWindowBytes { + using chkpt_id_t = CheckpointBase::chkpt_id_t; + std::vector chkpt_bytes; + chkpt_id_t start_chkpt_id = CheckpointBase::UNIDENTIFIED_CHECKPOINT; + chkpt_id_t end_chkpt_id = CheckpointBase::UNIDENTIFIED_CHECKPOINT; + uint64_t start_tick = 0; + uint64_t end_tick = 0; + }; + + /*! + * \brief Checkpoint class optimized for use with database-backed + * checkpointers. + */ + class DatabaseCheckpoint final : public CheckpointBase + { + public: + + //! \name Construction & Initialization + //! @{ + //////////////////////////////////////////////////////////////////////// + + //! \brief Not copy constructable + DatabaseCheckpoint(const DatabaseCheckpoint&) = delete; + + //! \brief Non-assignable + DatabaseCheckpoint& operator=(const DatabaseCheckpoint&) = delete; + + //! \brief Not move constructable + DatabaseCheckpoint(DatabaseCheckpoint&&) = delete; + + //! \brief Not move assignable + DatabaseCheckpoint& operator=(DatabaseCheckpoint&&) = delete; + + private: + + //! \brief Construction to be performed by friend class DatabaseCheckpointer + DatabaseCheckpoint(TreeNode& root, + const std::vector& dats, + chkpt_id_t id, + tick_t tick, + DatabaseCheckpoint* prev, + bool is_snapshot, + DatabaseCheckpointer* checkpointer); + + //! \brief Default constructable required for boost::serialization of ChkptWindow + DatabaseCheckpoint() = default; + + //////////////////////////////////////////////////////////////////////// + //! @} + + friend struct ChkptWindow; + friend class DatabaseCheckpointer; + + public: + + /* + * \brief Support boost::serialization + */ + template + void serialize(Archive& ar, const unsigned int version) { + CheckpointBase::serialize(ar, version); + ar & prev_id_; + ar & next_ids_; + ar & is_snapshot_; + ar & data_; + } + + /*! + * \brief Returns a string describing this object. + */ + std::string stringize() const override; + + /*! + * \brief Writes all checkpoint raw data to an ostream. + * \param o ostream to which raw data will be written. + * \note No newlines or other extra characters will be appended. + */ + void dumpData(std::ostream& o) const override; + + /*! + * \brief Returns memory usage by this checkpoint. + */ + uint64_t getTotalMemoryUse() const noexcept override; + + /*! + * \brief Returns memory usage by the content of this checkpoint. + */ + uint64_t getContentMemoryUse() const noexcept override; + + /*! + * \brief Returns a stack of checkpoints from this checkpoint as far + * back as possible until no previous link is found. + * + * \note Since this checkpointer enforces a linear chain of checkpoints + * with no gaps, this always reaches the head checkpoint. + */ + std::stack getHistoryChain() const; + + /*! + * \brief Returns a stack of checkpoints that must be restored from + * top-to-bottom to fully restore the state associated with this + * checkpoint. + */ + std::stack getRestoreChain() const; + + /*! + * \brief Get the ID of our previous checkpoint. Returns UNIDENTIFIED_CHECKPOINT + * only for the head checkpoint. + */ + chkpt_id_t getPrevID() const override; + + /*! + * \brief Returns next checkpoints following this one. May be an empty + * vector if there are no later checkpoints. + * + * \note Since this checkpointer enforces a linear chain of checkpoints + * with no gaps, this vector will always have 0 or 1 elements. + */ + std::vector getNextIDs() const override; + + /*! + * \brief Attempts to restore this checkpoint including any previous + * deltas (dependencies). + * + * \note Uses loadState to restore state from each checkpoint in the + * restore chain. + */ + void load(const std::vector& dats) override; + + /*! + * \brief Is this checkpoint a snapshot? If true, this checkpoint has + * no dependencies and contains all simulator state. + */ + bool isSnapshot() const noexcept; + + /*! + * \brief Determines how many checkpoints away the closest, earlier + * snapshot is. + * + * \return distance to closest snapshot. If this node is a snapshot, + * returns 0; if immediate getPrev() is a snapshot, returns 1; and + * so on. + */ + uint32_t getDistanceToPrevSnapshot() const noexcept; + + /*! + * \brief Loads delta state of this checkpoint to root. + * \note Does not look at any other checkpoints. + * \see DatabaseCheckpointer::load + */ + void loadState(const std::vector& dats); + + private: + + /*! + * \brief Writes checkpoint data starting from current root to + * checkpoint storage. + * + * \pre Must not have already stored data for this checkpoint. + * + * \note This should only be called at construction + */ + void storeSnapshot_(const std::vector& dats); + + /*! + * \brief Writes checkpoint data starting from current root to + * checkpoint storage. + * + * \pre Must not have already stored data for this checkpoint. + * + * \note This should only be called at construction + */ + void storeDelta_(const std::vector& dats); + + //! \brief ID of the previous checkpoint. + chkpt_id_t prev_id_; + + /*! + * \brief IDs of the next checkpoints. Since this checkpointer + * enforces a linear chain of checkpoints with no gaps, this vector + * will always have 0 or 1 elements. + */ + std::vector next_ids_; + + //! \brief Is this node a snapshot? + bool is_snapshot_; + + //! \brief Storage implementation. + storage::VectorStorage data_; + + //! \brief Checkpointer who created us. + DatabaseCheckpointer* checkpointer_ = nullptr; + }; + + /*! + * \brief Support boost::serialization for ChkptWindow. + * \note Defined down here for "new DatabaseCheckpoint". + */ + template + inline void ChkptWindow::serialize(Archive& ar, const unsigned int /*version*/) { + ar & start_chkpt_id; + ar & end_chkpt_id; + ar & start_tick; + ar & end_tick; + + if (chkpts.empty()) { + // We are loading a checkpoint window from disk + const auto num_chkpts = end_chkpt_id - start_chkpt_id + 1; + for (size_t i = 0; i < num_chkpts; ++i) { + chkpts.emplace_back(new DatabaseCheckpoint); + ar & *chkpts.back(); + } + } else { + // We are saving a checkpoint window to disk + for (auto& chkpt : chkpts) { + ar & *chkpt; + } + } + } + +} // namespace sparta::serialization::checkpoint diff --git a/sparta/sparta/serialization/checkpoint/DatabaseCheckpointer.hpp b/sparta/sparta/serialization/checkpoint/DatabaseCheckpointer.hpp new file mode 100644 index 0000000000..b871d8c384 --- /dev/null +++ b/sparta/sparta/serialization/checkpoint/DatabaseCheckpointer.hpp @@ -0,0 +1,509 @@ +// -*- C++ -*- + +#pragma once + +#include "sparta/serialization/checkpoint/Checkpointer.hpp" +#include "sparta/serialization/checkpoint/DatabaseCheckpoint.hpp" +#include "simdb/apps/App.hpp" +#include "simdb/pipeline/Pipeline.hpp" +#include + +namespace sparta::serialization::checkpoint +{ + +class DatabaseCheckpointer; + +/*! + * \brief Implementation of the FastCheckpointer which only holds + * a subset of checkpoints in memory at any given time, and sends + * checkpoints outside this window to/from SimDB as needed using + * an LRU cache. + */ +class DatabaseCheckpointer final : public simdb::App, public Checkpointer +{ +public: + static constexpr auto NAME = "db-checkpointer"; + + using checkpoint_type = DatabaseCheckpoint; + using checkpoint_ptr = std::shared_ptr; + using checkpoint_ptrs = std::vector; + using window_id_t = uint64_t; + + /*! + * \brief DatabaseCheckpointer constructor + * + * \param db_mgr SimDB instance to use as a backing store for all checkpoints. + * + * \param root TreeNode at which checkpoints will be taken. + * This cannot be changed later. This does not + * necessarily need to be a RootTreeNode. Before + * the first checkpoint is taken, this node must + * be finalized (see sparta::TreeNode::isFinalized). + * At the point of construction, the node does not + * need to be finalized. + * + * \param sched Scheduler to read and restart on checkpoint restore (if + * not nullptr) + */ + DatabaseCheckpointer(simdb::DatabaseManager* db_mgr, TreeNode& root, Scheduler* sched=nullptr); + + /*! + * \brief Define the SimDB schema for this checkpointer. + */ + static void defineSchema(simdb::Schema& schema); + + /*! + * \brief Instantiate the async processing pipeline to save/load checkpoints. + */ + std::unique_ptr createPipeline( + simdb::pipeline::AsyncDatabaseAccessor* db_accessor) override; + + /*! + * \brief Flush all cached windows down the pipeline before threads are shut down. + */ + void preTeardown() override; + + /*! + * \brief Returns the next-shapshot threshold. + * + * This represents the distance between two checkpoints required for the + * checkpointer to automatically place a snapshot checkpoint instead of + * a delta. A threshold of 0 or 1 results in all checkpoints being + * snapshots. A value of 10 results in every 10th checkpoint being a + * snapshot. + * + * \note Unlike FastCheckpointer, this threshold is always enforced and + * snapshots cannot be created using createCheckpoint(force_snapshot=true). + * + * \note This value is a performance/space tradeoff knob. + */ + uint32_t getSnapshotThreshold() const; + + /*! + * \brief Sets the snapshot threshold. + * + * \note This must be called before any checkpoints are taken, and + * cannot be changed later. + */ + void setSnapshotThreshold(uint32_t thresh); + + /*! + * \brief Sets the max number of cached windows (LRU). + * + * \note This must be called before any checkpoints are taken, and + * cannot be changed later. + */ + void setMaxCachedWindows(uint32_t max_windows); + + /*! + * \brief Computes and returns the memory usage by this checkpointer at + * this moment including any framework overhead. + * + * \note This is an approxiation and does not include some of + * minimal dynamic overhead from stl containers. + * + * \note This only includes memory used by checkpoints currently + * in the cache, not checkpoints stored in the database. + */ + uint64_t getTotalMemoryUse() const noexcept override; + + /*! + * \brief Computes and returns the memory usage by this checkpointer at + * this moment purely for the checkpoint state being held. + * + * \note This only includes memory used by checkpoints currently + * in the cache, not checkpoints stored in the database. + */ + uint64_t getContentMemoryUse() const noexcept override; + + /*! + * \brief Explicit checkpoint deletion is NOT supported by this checkpointer. + * + * \throw CheckpointError if called + */ + void deleteCheckpoint(chkpt_id_t) override final; + + /*! + * \brief Loads state from a specific checkpoint by ID + * \note This implicitly deletes all future checkpoints since this checkpointer + * does not allow more than one branch. + * \throw CheckpointError if id does not refer to a checkpoint that exists + * or if checkpoint could not be loaded. + * \warning If checkpoint fails during loading for reasons other than an + * invalid ID, the simulation state could be corrupt + * \post Current checkpoint is now the checkpoint specified by id + * \post Next checkpoint created will have ID = +1 since gaps are not allowed + * \post Sets scheduler current tick to the checkpoint's tick using Scheduler::restartAt + */ + void loadCheckpoint(chkpt_id_t id) override; + + /*! + * \brief Gets all checkpoints taken at tick t. + * \param t Tick number at which checkpoints should found. + * \return vector of valid checkpoint IDs (never + * checkpoint_type::UNIDENTIFIED_CHECKPOINT) + * \note Walks all checkpoints in cache and on disk. This should + * not be called in the critical path. + */ + std::vector getCheckpointsAt(tick_t t) override; + + /*! + * \brief Gets all checkpoint IDs sorted by tick (or equivalently checkpoint ID). + * \return vector of valid checkpoint IDs (never checkpoint_type::UNIDENTIFIED_CHECKPOINT) + * \note Walks all checkpoints in cache and on disk. This should + * not be called in the critical path. + */ + std::vector getCheckpoints() override; + + /*! + * \brief Gets the current number of checkpoints with valid IDs. + */ + uint32_t getNumCheckpoints() const noexcept override; + + /*! + * \brief Gets the current number of snapshots with valid IDs. + */ + uint32_t getNumSnapshots() const noexcept; + + /*! + * \brief Gets the current number of delta checkpoints with valid IDs. + */ + uint32_t getNumDeltas() const noexcept; + + /*! + * \brief Debugging utility which gets a deque of checkpoints + * representing a chain starting at the checkpoint head and ending at + * the checkpoint specified by \a id. + * \param id ID of checkpoint that terminates the chain + * \return dequeue of checkpoint IDs where the front is always the head + * and the back is always the checkpoint described by \a id. If there is + * no checkpoint head, returns an empty result + * \throw CheckpointError if \a id does not refer to a valid checkpoint. + * \note The results never contain Checkpoint::UNIDENTIFIED_CHECKPOINT + */ + std::deque getCheckpointChain(chkpt_id_t id) override; + + /*! + * \brief Finds a checkpoint by its ID. + * \param id ID of checkpoint to find. + * \param must_exist Whether to enforce that the checkpoint be found. + * \return Checkpoint with ID of \a id if found or nullptr if not found. + * \throw CheckpointError if \a must_exist is true and \a id does not + * refer to a valid checkpoint. + */ + std::shared_ptr findCheckpoint(chkpt_id_t id, bool must_exist=false); + + /*! + * \brief Finds the latest checkpoint at or before the given tick + * starting at the \a from checkpoint and working backward. + * If no checkpoints before or at tick are found, returns nullptr. + * \param tick Tick to search for + * \param from Checkpoint at which to begin searching for a tick. + * Must be a valid checkpoint known by this checkpointer. + * See hasCheckpoint. + * \return The latest checkpoint with a tick number less than or equal + * to the \a tick argument. Returns nullptr if no checkpoints before \a + * tick were found. It is possible for the checkpoint identified by \a + * from could be returned. + * \warning This is not a high-performance method. Generally, + * a client of this interface knows a paticular ID. + * \throw CheckpointError if \a from does not refer to a valid + * checkpoint. + */ + std::shared_ptr findLatestCheckpointAtOrBefore(tick_t tick, chkpt_id_t from); + + /*! + * \brief Tests whether this checkpoint manager has a checkpoint with + * the given id in the cache or in the database. + * \return True if id refers to a checkpoint held by this checkpointer + * and false if not. + */ + bool hasCheckpoint(chkpt_id_t id) noexcept override; + + /*! + * \brief Dumps the restore chain for this checkpoint. + * \see getRestoreChain() + * \param o ostream to which chain data will be dumped + * \param id ID of starting checkpoint + */ + void dumpRestoreChain(std::ostream& o, chkpt_id_t id); + + /*! + * \brief Returns a stack of checkpoints from this checkpoint as far + * back as possible until no previous link is found. + * \note Since this checkpointer enforces a linear chain of checkpoints + * with no gaps, this always reaches the head checkpoint. + */ + std::stack getHistoryChain(chkpt_id_t id); + + /*! + * \brief Returns a stack of checkpoints that must be restored from + * top-to-bottom to fully restore the state associated with this + * checkpoint. + */ + std::stack getRestoreChain(chkpt_id_t id); + + /*! + * \brief Returns IDs of the checkpoints immediately following the given checkpoint. + * \note Since this checkpointer does not support checkpoint gaps, + * this will always be a vector of size 0 or 1. + */ + std::vector getNextIDs(chkpt_id_t id) override; + + /*! + * \brief Determines how many checkpoints away the closest, earlier + * snapshot is. + * \return distance to closest snapshot. If this node is a snapshot, + * returns 0; if immediate getPrev() is a snapshot, returns 1; and + * so on. + */ + uint32_t getDistanceToPrevSnapshot(chkpt_id_t id) noexcept; + + /*! + * \brief Check if the given checkpoint is a snapshot (not a delta). + * \return Returns false if not a snapshot or the id is not a checkpoint. + */ + bool isSnapshot(chkpt_id_t id) noexcept; + + /*! + * \brief Returns a string describing this object + */ + std::string stringize() const override; + + /*! + * \brief Dumps this checkpointer's flat list of checkpoints to an + * ostream with a newline following each checkpoint + * \param o ostream to dump to + */ + void dumpList(std::ostream& o) override; + + /*! + * \brief Dumps this checkpointer's data to an ostream with a newline + * following each checkpoint + * \param o ostream to dump to + */ + void dumpData(std::ostream& o) override; + + /*! + * \brief Dumps this checkpointer's data to an + * ostream with annotations between each ArchData and a newline + * following each checkpoint description and each checkpoint data dump + * \param o ostream to dump to + */ + void dumpAnnotatedData(std::ostream& o) override; + + /*! + * \brief Debugging utility which dumps values in some bytes across a + * chain of checkpoints. The intent is to show the values loaded when + * attempting to restore the given value in the selected checkpoint + * \param o ostream where each value and checkpoint ID will be printed + * \param id ID of checkpoint to restore value from + * \param container ArchData in which the data being traced lives + * \param offset Offset into \a container + * \param size Bytes to read at \a offset + * \warning This may change checkpoint data read/write state and should + * only be done between completed checkpoints saves/restores in order to + * not interfere. + * \note NOT CURRENTLY IMPLEMENTED + * \throw CheckpointError + */ + void traceValue(std::ostream& o, chkpt_id_t id, const ArchData* container, uint32_t offset, uint32_t size) override; + + /*! + * \brief Check if the given checkpoint is currently cached in memory. + * \note Used for testing and debugging only. + * \note Even though this might return true, if you wait a bit and call + * again with the same ID, it might return false since the checkpoint + * might have been evicted from the LRU cache. + */ + bool isCheckpointCached(chkpt_id_t id) const noexcept; + +private: + + /*! + * \brief Create a head node. + * \pre ArchDatas for tree root are already enumerated + * \pre Tree of getRoot() is already finalized + * \pre Guaranteed to have a null head at this time + * (getHead() == nullptr) + * \post Must create a head checkpoint + * \post Must invoke setHead_ + * \note invoked by createHead + */ + void createHead_() override; + + /*! + * \brief Create a checkpoint + * \pre Guaranteed to have a valid head at this time + * (getHead() != nullptr) + * \post Must create a checkpoint + * \return Must return a checkpoint ID not currently in use + * \note invoked by createCheckpoint + * \note This checkpointer does not support force_snapshot=true + * since it always enforces the snapshot threshold. + */ + chkpt_id_t createCheckpoint_(bool force_snapshot=false) override; + + /*! + * \brief Deletes a checkpoint by ID. + * \param id ID of checkpoint to delete. Must not be + * Checkpoint::UNIDENTIFIED_CHECKPOINT and must not be equal to the + * ID of the head checkpoint. + * \throw CheckpointError if id == getHeadID(). Head cannot be deleted. + * \note Since this checkpointer does not support checkpoint gaps, + * this will delete all checkpoints from the cache and database + * starting at the given ID and going to the most recent checkpoint. + */ + void deleteCheckpoint_(chkpt_id_t id); + + /*! + * \brief Implements Checkpointer::dumpCheckpointNode_ + */ + void dumpCheckpointNode_(const chkpt_id_t id, std::ostream& o) override; + + /*! + * \brief Sets the head checkpointer pointer to \a head for the first + * time + * \param head New head checkpoint pointer. Must not be nullptr + * \pre Internal head pointer must be nullptr. + * \note This can only be done once + */ + void setHead_(DatabaseCheckpoint* head); + + /*! + * \brief Sets the current checkpoint pointer. + * \param current Pointer to set as current checkpoint. The next + * checkpoint created will follow the current checkpoint set here. + * Cannot be nullptr + */ + void setCurrent_(DatabaseCheckpoint* current); + + /*! + * \brief Add the given checkpoint to the cache and bump its window to the + * front of the LRU list. + */ + void addToCache_(std::shared_ptr chkpt); + + /*! + * \brief Get the window ID for the given checkpoint ID. + */ + window_id_t getWindowID_(chkpt_id_t id) const; + + /*! + * \brief Bump the given window ID to the front of the LRU cache. + */ + void touchWindow_(window_id_t id); + + /*! + * \brief Evict the least recently used window(s) from the cache if needed + * and send them down the pipeline. + */ + void evictWindowsIfNeeded_(bool force_flush=false); + + /*! + * \brief Ensure this checkpoint's window is loaded in the LRU cache. + * \throw CheckpointError if must_succeed is true and the window + * could not be loaded. + */ + bool ensureWindowLoaded_(chkpt_id_t id, bool must_succeed=true); + + /*! + * \brief Retrieve a checkpoint window from the database. + */ + checkpoint_ptrs getWindowFromDatabase_(window_id_t win_id); + + /*! + * \brief "Undo" the pipeline for a ChkptWindows.WindowBytes blob + * into the original ChkptWindow structure. + */ + std::unique_ptr deserializeWindow_(const std::vector& window_bytes) const; + + /*! + * \brief Apply the given callback to every checkpoint (cached and database). + * \note Do not call in the critical path. Used for debugging and for the + * various dump* apis. + */ + void forEachCheckpoint_(const std::function& cb); + + //! \brief Checkpointer head ID. Used to prevent the head from being deleted from the cache. + chkpt_id_t head_id_ = checkpoint_type::UNIDENTIFIED_CHECKPOINT; + + //! \brief Checkpointer current ID. Used to prevent the current node from being deleted from the cache. + chkpt_id_t current_id_ = checkpoint_type::UNIDENTIFIED_CHECKPOINT; + + //! \brief Pipeline input queue which accepts the oldest checkpoint window from the cache. + simdb::ConcurrentQueue* pipeline_head_ = nullptr; + + //! \brief Subset (or all of) our checkpoints that we currently are holding in memory. + std::unordered_map chkpts_cache_; + + //! \brief LRU list of window IDs in our cache. Most recently used at front. + std::list lru_list_; + + //! \brief Map of window ID to its position in the LRU list for O(1) access. + std::unordered_map::iterator> lru_map_; + + //! \brief Maximum number of windows to hold in memory at any given time. + utils::ValidValue max_cached_windows_; + + //! \brief Mutex to protect our checkpoints cache. + mutable std::recursive_mutex cache_mutex_; + + //! \brief SimDB instance. + simdb::DatabaseManager* db_mgr_ = nullptr; + + //! \brief Checkpoint pipeline flusher. + std::unique_ptr pipeline_flusher_; + + //! \brief Snapshot generation threshold. Every n checkpoints in a chain + //! are taken as snapshots instead of deltas. + utils::ValidValue snap_thresh_; + + //! \brief Next checkpoint ID value. + chkpt_id_t next_chkpt_id_; +}; + +} // namespace sparta::serialization::checkpoint + +namespace simdb +{ + +/*! + * \brief This AppFactory specialization is provided since we have an app that inherits + * from Checkpointer, and thus cannot have the default app subclass ctor signature + * that only takes the DatabaseManager like most other apps. + */ +template <> +class AppFactory : public AppFactoryBase +{ +public: + using AppT = sparta::serialization::checkpoint::DatabaseCheckpointer; + + void setSpartaElems(sparta::TreeNode& root, sparta::Scheduler* sched = nullptr) + { + root_ = &root; + sched_ = sched; + } + + AppT* createApp(DatabaseManager* db_mgr) override + { + if (!root_) { + throw sparta::SpartaException( + "Must set root (and maybe scheduler) before instantiating DatabaseCheckpointer app"); + } + + // Make the ctor call that the default AppFactory cannot make. + return new AppT(db_mgr, *root_, sched_); + } + + void defineSchema(Schema& schema) const override + { + AppT::defineSchema(schema); + } + +private: + sparta::TreeNode* root_ = nullptr; + sparta::Scheduler* sched_ = nullptr; +}; + +} // namespace simdb diff --git a/sparta/sparta/serialization/checkpoint/DeltaCheckpoint.hpp b/sparta/sparta/serialization/checkpoint/DeltaCheckpoint.hpp index 4700dacfb0..ec30d64c7c 100644 --- a/sparta/sparta/serialization/checkpoint/DeltaCheckpoint.hpp +++ b/sparta/sparta/serialization/checkpoint/DeltaCheckpoint.hpp @@ -14,339 +14,11 @@ #include "sparta/serialization/checkpoint/Checkpointer.hpp" #include "sparta/serialization/checkpoint/CheckpointExceptions.hpp" - +#include "sparta/serialization/checkpoint/VectorStorage.hpp" +#include "sparta/serialization/checkpoint/StringStreamStorage.hpp" namespace sparta::serialization::checkpoint { - namespace storage - { - /*! - * \brief Vector of buffers storage implementation - */ - class VectorStorage - { - class Segment{ - ArchData::line_idx_type idx_; - std::unique_ptr data_; - uint32_t bytes_; - public: - - /*! - * \brief Copying disabled (avoid memcpy) - */ - Segment(const Segment&) = delete; - - /*! - * \brief Move constructor - */ - Segment(Segment&& rhp) : - idx_(rhp.idx_), - data_(std::move(rhp.data_)), - bytes_(rhp.bytes_) - { - rhp.idx_ = ArchData::INVALID_LINE_IDX; - rhp.bytes_ = 0; - } - - /*! - * \brief Dummy constructor. Represents null entry (end of ArchData) - */ - Segment() : - idx_(ArchData::INVALID_LINE_IDX), - bytes_(0) - {;} - - /*! - * \brief Deleted assignment operator - */ - Segment& operator=(const Segment& rhp) = delete; - - /*! - * \brief Data constructor. Allocates data and copies results over - */ - Segment(ArchData::line_idx_type idx, const char* data, size_t bytes) : - idx_(idx), bytes_(bytes) - { - sparta_assert(idx != ArchData::INVALID_LINE_IDX, - "Attempted to create segment of " << bytes << " bytes with invalid line index"); - data_.reset(new char[bytes]); - ::memcpy(data_.get(), data, bytes); - } - - ArchData::line_idx_type getLineIdx() const { - return idx_; - } - - uint32_t getSize() const { - return sizeof(decltype(*this)) + bytes_; - } - - void copyTo(char* buf, uint32_t size) const { - sparta_assert(size == bytes_, \ - "Attempted to restore checkpoint data for a line where the " - "data was " << bytes_ << " bytes but the loader requested " - << size << " bytes. The sizes must match up or something is " - "wrong"); - memcpy(buf, data_.get(), bytes_); - } - - void dump(std::ostream& o) const { - if(idx_ == ArchData::INVALID_LINE_IDX){ - std::cout << "\nEnd of ArchData"; - return; - } - - std::cout << "\nLine: " << std::dec << idx_ << " (" << bytes_ << ") bytes"; - for(uint32_t off = 0; off < bytes_;){ - char chr = data_[off]; - if(off % 32 == 0){ - o << std::endl << std::setw(7) << std::hex << off; - } - if(chr == 0){ - o << ' ' << ".."; - }else{ - o << ' ' << std::setfill('0') << std::setw(2) << std::hex << (0xff & (uint16_t)chr); - } - off++; - } - } - }; - - /*! - * \brief Data segments to restore - */ - std::vector data_; - - /*! - * \brief Next line index to store when writing lines - */ - ArchData::line_idx_type next_idx_ = ArchData::INVALID_LINE_IDX; - - /*! - * \brief Index in data_ of next line to restore in nextRestoreLine - */ - uint32_t next_restore_idx_ = 0; - - /*! - * \brief iterator in data_ of line being read by call to readLineData. - * Is always next_restore_idx_ or one less. - */ - decltype(data_)::const_iterator cur_restore_itr_; - - public: - VectorStorage() { - } - - ~VectorStorage() { - } - - void dump(std::ostream& o) const { - for(auto const &seg : data_){ - seg.dump(o); - } - } - - uint32_t getSize() const { - uint32_t bytes = sizeof(decltype(*this)); - for(Segment const & seg : data_){ - bytes += seg.getSize(); - } - return bytes; - } - - void prepareForLoad() { - next_restore_idx_ = 0; - cur_restore_itr_ = data_.begin(); - } - - void beginLine(ArchData::line_idx_type idx) { - sparta_assert(idx != ArchData::INVALID_LINE_IDX, - "Cannot begin line with INVALID_LINE_IDX index"); - next_idx_ = idx; - } - - void writeLineBytes(const char* data, size_t size) { - sparta_assert(data_.size() == 0 || data_.back().getLineIdx() != next_idx_, - "Cannot store the same line idx twice in a checkpoint. Line " - << next_idx_ << " detected twice in a row"); - sparta_assert(next_idx_ != ArchData::INVALID_LINE_IDX, - "Cannot write line bytes with INVALID_LINE_IDX index"); - data_.emplace_back(next_idx_, data, size); - } - - /*! - * \brief Signals end of this checkpoint's data for one ArchData - */ - void endArchData() { - data_.emplace_back(); - } - - /*! - * \brief Is the reading state of this storage good? (i.e. haven't tried - * to read past the end of the data) - */ - bool good() const { - return next_restore_idx_ <= data_.size(); // Not past end of stream - } - - /*! - * \brief Restore next line. Return ArchData::INVALID_LINE_IDX on - * end of data. - */ - ArchData::line_idx_type getNextRestoreLine() { - if(next_restore_idx_ == data_.size()){ - next_restore_idx_++; // Increment to detect errors - return ArchData::INVALID_LINE_IDX; // Done with restore - }else if(next_restore_idx_ > data_.size()){ // Past the end - throw SpartaException("Failed to restore a checkpoint because ") - << "caller tried to keep getting next line even after " - "reaching the end of the restore data"; - } - if(next_restore_idx_ != 0){ - cur_restore_itr_++; - } - next_restore_idx_++; - - const auto next_line_idx = cur_restore_itr_->getLineIdx(); // May be invalid to indicate end of ArchData - return next_line_idx; - }; - - /*! - * \brief Read bytes for the current line - */ - void copyLineBytes(char* buf, uint32_t size) { - sparta_assert(cur_restore_itr_ != data_.end(), - "Attempted to copy line bytes from an invalid line iterator"); - sparta_assert(cur_restore_itr_->getLineIdx() != ArchData::INVALID_LINE_IDX, - "About to return line from checkpoint data segment with INVALID_LINE_IDX index"); - cur_restore_itr_->copyTo(buf, size); - } - - /*! - * \brief Steal line buffer. Useful if the checkpoint is being reloaded - * AND simultaneouslty destroyed - * \todo implement this - */ - //void stealLineBytes(char*& buf_ptr, uint32_t size) { - // cur_restore_itr_->stealBuffer(buf_ptr, size); - //} - }; - - /*! - * \brief Stringstream storage implementation - * \warning This is deprecated in favor of VectorStorage for in-memory uses. - * However, this is a starting point for disk-based storage schemes - */ - class StringStreamStorage - { - std::stringstream ss_; - - public: - StringStreamStorage() { - ss_.exceptions(std::ostream::eofbit | std::ostream::badbit | - std::ostream::failbit | std::ostream::goodbit); - } - - void dump(std::ostream& o) const { - auto s = ss_.str(); - auto itr = s.begin(); - for(; itr != s.end(); itr++){ - char chr = *itr; - if(chr == 'L'){ - uint32_t off = 0; - ArchData::line_idx_type ln_idx; - strncpy((char*)&ln_idx, s.substr(itr-s.begin(), sizeof(ln_idx)).c_str(), sizeof(ln_idx)); - std::cout << "\nLine: " << ln_idx << std::endl; - itr += sizeof(ArchData::line_idx_type); - - for(uint16_t i=0; i<64; ++i){ - chr = *itr; - if(off % 32 == 0){ - o << std::setw(7) << std::hex << off; - } - if(chr == 0){ - o << ' ' << ".."; - }else{ - o << ' ' << std::setfill('0') << std::setw(2) << std::hex << (0xff & (uint16_t)chr); - } - off++; - if(off % 32 == 0){ - o << std::endl; - } - ++itr; - } - } - } - } - - uint32_t getSize() const { - return ss_.str().size() + sizeof(decltype(*this)); - } - - void prepareForLoad() { - ss_.seekg(0); // Seek to start with get pointer before consuming - } - - void beginLine(ArchData::line_idx_type idx) { - ss_ << 'L'; // Line start char - - ArchData::line_idx_type idx_repr = reorder(idx); - ss_.write((char*)&idx_repr, sizeof(ArchData::line_idx_type)); - } - - void writeLineBytes(const char* data, size_t size) { - ss_.write(data, size); - } - - /*! - * \brief Signals end of this checkpoint's data - */ - void endArchData() { - ss_ << "E"; // Indicates end of this checkpoint data - - sparta_assert(ss_.good(), - "Ostream error while writing checkpoint data"); - } - - /*! - * \brief Is the reading state of this storage good? (i.e. haven't tried - * to read past the end of the data) - */ - bool good() const { - return ss_.good(); - } - - /*! - * \brief Restore next line. Return ArchData::INVALID_LINE_IDX on - * end of data. - */ - ArchData::line_idx_type getNextRestoreLine() { - char ctrl; - ss_ >> ctrl; - sparta_assert(ss_.good(), - "Encountered checkpoint data stream error or eof"); - if(ctrl == 'L'){ - ArchData::line_idx_type ln_idx = 0; - ss_.read((char*)&ln_idx, sizeof(ln_idx)); // Presumed LE encoding - return ln_idx; - }else if(ctrl == 'E'){ - return ArchData::INVALID_LINE_IDX; // Done with restore - }else{ - throw SpartaException("Failed to restore a checkpoint because a '") - << ctrl << "' control character was found where an 'L' or 'E' was found"; - } - }; - - /*! - * \brief Read bytes for the current line - */ - void copyLineBytes(char* buf, uint32_t size) { - ss_.read(buf, size); - } - }; - - } // namespace storage - class FastCheckpointer; /*! @@ -509,7 +181,7 @@ namespace sparta::serialization::checkpoint o << '('; } if(cp->getID() == UNIDENTIFIED_CHECKPOINT){ - o << "*" << getDeletedID() << ""; + o << "*" << getDeletedID(); }else{ o << cp->getID(); } @@ -677,6 +349,35 @@ namespace sparta::serialization::checkpoint return dcps; } + /*! + * \brief Get the ID of our previous checkpoint. Returns UNIDENTIFIED_CHECKPOINT + * if we have no previous checkpoint, as is the case with the head checkpoint + * and those flagged for deletion. + */ + chkpt_id_t getPrevID() const override { + if (auto prev = static_cast(getPrev())) { + if (!prev->isFlaggedDeleted()) { + return prev->getID(); + } + } + return UNIDENTIFIED_CHECKPOINT; + } + + /*! + * \brief Returns next checkpoint following *this. May be an empty + * vector if there are no later checkpoints. + */ + std::vector getNextIDs() const override { + std::vector next_ids; + for (const auto chkpt : getNexts()) { + const auto dcp = static_cast(chkpt); + if (!dcp->isFlaggedDeleted()) { + next_ids.push_back(chkpt->getID()); + } + } + return next_ids; + } + /*! * \brief Attempts to restore this checkpoint including any previous * deltas (dependencies). diff --git a/sparta/sparta/serialization/checkpoint/FastCheckpointer.hpp b/sparta/sparta/serialization/checkpoint/FastCheckpointer.hpp index aeac93f2fb..ce92e3a33b 100644 --- a/sparta/sparta/serialization/checkpoint/FastCheckpointer.hpp +++ b/sparta/sparta/serialization/checkpoint/FastCheckpointer.hpp @@ -140,6 +140,32 @@ namespace sparta::serialization::checkpoint snap_thresh_ = thresh; } + /*! + * \brief Computes and returns the memory usage by this checkpointer at + * this moment including any framework overhead + * \note This is an approxiation and does not include some of + * minimal dynamic overhead from stl containers. + */ + uint64_t getTotalMemoryUse() const noexcept override { + uint64_t mem = 0; + for(auto& cp : chkpts_){ + mem += cp.second->getTotalMemoryUse(); + } + return mem; + } + + /*! + * \brief Computes and returns the memory usage by this checkpointer at + * this moment purely for the checkpoint state being held + */ + uint64_t getContentMemoryUse() const noexcept override { + uint64_t mem = 0; + for(auto& cp : chkpts_){ + mem += cp.second->getContentMemoryUse(); + } + return mem; + } + //////////////////////////////////////////////////////////////////////// //! @} @@ -232,18 +258,6 @@ namespace sparta::serialization::checkpoint cleanupChain_(rmv); } - /*! - * \brief Queries a specific checkpoint by ID - */ - bool checkpointExists(chkpt_id_t id) { - bool exists = false; - checkpoint_type* d = findCheckpoint_(id); - if(d){ - exists = true; - } - return exists; - } - /*! * \brief Gets all checkpoints taken at tick t on any timeline. * \param t Tick number at which checkpoints should found. @@ -252,7 +266,7 @@ namespace sparta::serialization::checkpoint * \note Makes a new vector of results. This should not be called in the * critical path. */ - std::vector getCheckpointsAt(tick_t t) const override { + std::vector getCheckpointsAt(tick_t t) override { std::vector results; for(auto& p : chkpts_){ const Checkpoint* cp = p.second.get(); @@ -272,7 +286,7 @@ namespace sparta::serialization::checkpoint * \note Makes a new vector of results. This should not be called in the * critical path. */ - std::vector getCheckpoints() const override { + std::vector getCheckpoints() override { std::vector results; for(auto& p : chkpts_){ const Checkpoint* cp = p.second.get(); @@ -328,7 +342,7 @@ namespace sparta::serialization::checkpoint * \note Makes a new vector of results. This should not be called in the * critical path. */ - std::deque getCheckpointChain(chkpt_id_t id) const override { + std::deque getCheckpointChain(chkpt_id_t id) override { std::deque results; if(!getHead()){ return results; @@ -362,7 +376,7 @@ namespace sparta::serialization::checkpoint * checkpoint. */ checkpoint_type* findLatestCheckpointAtOrBefore(tick_t tick, - chkpt_id_t from) override { + chkpt_id_t from) { checkpoint_type* d = findCheckpoint_(from); if(!d){ throw CheckpointError("There is no checkpoint with ID ") << from; @@ -379,13 +393,45 @@ namespace sparta::serialization::checkpoint return d; } + /*! + * \brief Finds a checkpoint by its ID + * \param id ID of checkpoint to find. Guaranteed not to be flagged as + * deleted + * \return Checkpoint with ID of \a id if found or nullptr if not found + */ + const checkpoint_type* findCheckpoint(chkpt_id_t id) noexcept { + auto it = chkpts_.find(id); + if (it != chkpts_.end()) { + return static_cast(it->second.get()); + } + return nullptr; + } /*! - * \brief Gets a checkpoint through findCheckpoint interface casted to - * the type of Checkpoint subclass used by this class. + * \brief Tests whether this checkpoint manager has a checkpoint with + * the given id. + * \return True if id refers to a checkpoint held by this checkpointer + * and false if not. If id == Checkpoint::UNIDENTIFIED_CHECKPOINT, + * always returns false */ - checkpoint_type* findInternalCheckpoint(chkpt_id_t id) { - return static_cast(findCheckpoint_(id)); + bool hasCheckpoint(chkpt_id_t id) noexcept override { + return chkpts_.find(id) != chkpts_.end(); + } + + /*! + * \brief Returns IDs of the checkpoints immediately following the given checkpoint. + */ + std::vector getNextIDs(chkpt_id_t id) override final { + std::vector next_ids; + if (const auto chkpt = findCheckpoint_(id)) { + for (const auto next : chkpt->getNexts()) { + const auto dcp = static_cast(next); + if (!dcp->isFlaggedDeleted()) { + next_ids.push_back(next->getID()); + } + } + } + return next_ids; } //////////////////////////////////////////////////////////////////////// @@ -404,6 +450,43 @@ namespace sparta::serialization::checkpoint return ss.str(); } + /*! + * \brief Dumps this checkpointer's flat list of checkpoints to an + * ostream with a newline following each checkpoint + * \param o ostream to dump to + */ + void dumpList(std::ostream& o) override { + for(auto& cp : chkpts_){ + o << cp.second->stringize() << std::endl; + } + } + + /*! + * \brief Dumps this checkpointer's data to an ostream with a newline + * following each checkpoint + * \param o ostream to dump to + */ + void dumpData(std::ostream& o) override { + for(auto& cp : chkpts_){ + cp.second->dumpData(o); + o << std::endl; + } + } + + /*! + * \brief Dumps this checkpointer's data to an + * ostream with annotations between each ArchData and a newline + * following each checkpoint description and each checkpoint data dump + * \param o ostream to dump to + */ + void dumpAnnotatedData(std::ostream& o) override { + for(auto& cp : chkpts_){ + o << cp.second->stringize() << std::endl; + cp.second->dumpData(o); + o << std::endl; + } + } + /*! * \brief Forwards debug/trace info onto checkpoint by ID */ @@ -461,7 +544,7 @@ namespace sparta::serialization::checkpoint // This snapshot is needed later. Move to previous delta and work from there d = static_cast(d->getPrev()); }else{ - return; // This delta is needed. Therefore all preceeding deltas are needed + return; // This delta is needed. Therefore all preceeding deltas are needed } } @@ -553,7 +636,7 @@ namespace sparta::serialization::checkpoint * returns nullptr. * \todo Faster lookup? */ - checkpoint_type* findCheckpoint_(chkpt_id_t id) noexcept override { + checkpoint_type* findCheckpoint_(chkpt_id_t id) noexcept { auto itr = chkpts_.find(id); if (itr != chkpts_.end()) { return static_cast(itr->second.get()); @@ -564,7 +647,7 @@ namespace sparta::serialization::checkpoint /*! * \brief const variant of findCheckpoint_ */ - const checkpoint_type* findCheckpoint_(chkpt_id_t id) const noexcept override { + const checkpoint_type* findCheckpoint_(chkpt_id_t id) const noexcept { auto itr = chkpts_.find(id); if (itr != chkpts_.end()) { return static_cast(itr->second.get()); @@ -575,17 +658,15 @@ namespace sparta::serialization::checkpoint /*! * \brief Implements Checkpointer::dumpCheckpointNode_ */ - void dumpCheckpointNode_(const Checkpoint* chkpt, std::ostream& o) const override { + void dumpCheckpointNode_(const chkpt_id_t id, std::ostream& o) override { static std::string SNAPSHOT_NOTICE = "(s)"; - - // checkpoint_type is a known direct base class of Checkpoint - const checkpoint_type* cp = static_cast(chkpt); + auto cp = findCheckpoint_(id); // Draw data for this checkpoint if(cp->isFlaggedDeleted()){ - o << chkpt->getDeletedRepr(); + o << cp->getDeletedRepr(); }else{ - o << chkpt->getID(); + o << cp->getID(); } // Show that this is a snapshot if(cp->isSnapshot()){ @@ -695,6 +776,15 @@ namespace sparta::serialization::checkpoint return dcp->getID(); } + /*! + * \brief All checkpoints sorted by ascending tick number (or + * equivalently ascending checkpoint ID since both are monotonically + * increasing) + * + * This map must still be explicitly torn down in reverse order by a + * subclass of Checkpointer + */ + std::map> chkpts_; /*! * \brief Snapshot generation threshold. Every n checkpoints in a chain diff --git a/sparta/sparta/serialization/checkpoint/StringStreamStorage.hpp b/sparta/sparta/serialization/checkpoint/StringStreamStorage.hpp new file mode 100644 index 0000000000..8b76f9992c --- /dev/null +++ b/sparta/sparta/serialization/checkpoint/StringStreamStorage.hpp @@ -0,0 +1,124 @@ +// -*- C++ -*- + +#pragma once + +#include "sparta/functional/ArchData.hpp" +#include "sparta/utils/SpartaException.hpp" + +namespace sparta::serialization::checkpoint::storage +{ + +/*! + * \brief Stringstream storage implementation + * \warning This is deprecated in favor of VectorStorage for in-memory uses. + * However, this is a starting point for disk-based storage schemes + */ +class StringStreamStorage +{ + std::stringstream ss_; + +public: + StringStreamStorage() { + ss_.exceptions(std::ostream::eofbit | std::ostream::badbit | + std::ostream::failbit | std::ostream::goodbit); + } + + void dump(std::ostream& o) const { + auto s = ss_.str(); + auto itr = s.begin(); + for(; itr != s.end(); itr++){ + char chr = *itr; + if(chr == 'L'){ + uint32_t off = 0; + ArchData::line_idx_type ln_idx; + strncpy((char*)&ln_idx, s.substr(itr-s.begin(), sizeof(ln_idx)).c_str(), sizeof(ln_idx)); + std::cout << "\nLine: " << ln_idx << std::endl; + itr += sizeof(ArchData::line_idx_type); + + for(uint16_t i=0; i<64; ++i){ + chr = *itr; + if(off % 32 == 0){ + o << std::setw(7) << std::hex << off; + } + if(chr == 0){ + o << ' ' << ".."; + }else{ + o << ' ' << std::setfill('0') << std::setw(2) << std::hex << (0xff & (uint16_t)chr); + } + off++; + if(off % 32 == 0){ + o << std::endl; + } + ++itr; + } + } + } + } + + uint32_t getSize() const { + return ss_.str().size() + sizeof(decltype(*this)); + } + + void prepareForLoad() { + ss_.seekg(0); // Seek to start with get pointer before consuming + } + + void beginLine(ArchData::line_idx_type idx) { + ss_ << 'L'; // Line start char + + ArchData::line_idx_type idx_repr = reorder(idx); + ss_.write((char*)&idx_repr, sizeof(ArchData::line_idx_type)); + } + + void writeLineBytes(const char* data, size_t size) { + ss_.write(data, size); + } + + /*! + * \brief Signals end of this checkpoint's data + */ + void endArchData() { + ss_ << "E"; // Indicates end of this checkpoint data + + sparta_assert(ss_.good(), + "Ostream error while writing checkpoint data"); + } + + /*! + * \brief Is the reading state of this storage good? (i.e. haven't tried + * to read past the end of the data) + */ + bool good() const { + return ss_.good(); + } + + /*! + * \brief Restore next line. Return ArchData::INVALID_LINE_IDX on + * end of data. + */ + ArchData::line_idx_type getNextRestoreLine() { + char ctrl; + ss_ >> ctrl; + sparta_assert(ss_.good(), + "Encountered checkpoint data stream error or eof"); + if(ctrl == 'L'){ + ArchData::line_idx_type ln_idx = 0; + ss_.read((char*)&ln_idx, sizeof(ln_idx)); // Presumed LE encoding + return ln_idx; + }else if(ctrl == 'E'){ + return ArchData::INVALID_LINE_IDX; // Done with restore + }else{ + throw SpartaException("Failed to restore a checkpoint because a '") + << ctrl << "' control character was found where an 'L' or 'E' was found"; + } + }; + + /*! + * \brief Read bytes for the current line + */ + void copyLineBytes(char* buf, uint32_t size) { + ss_.read(buf, size); + } +}; + +} // namespace sparta::serialization::checkpoint::storage diff --git a/sparta/sparta/serialization/checkpoint/VectorStorage.hpp b/sparta/sparta/serialization/checkpoint/VectorStorage.hpp new file mode 100644 index 0000000000..94ebab97f2 --- /dev/null +++ b/sparta/sparta/serialization/checkpoint/VectorStorage.hpp @@ -0,0 +1,229 @@ +// -*- C++ -*- + +#pragma once + +#include "sparta/functional/ArchData.hpp" +#include "sparta/utils/SpartaException.hpp" + +namespace sparta::serialization::checkpoint::storage +{ + +/*! + * \brief Vector of buffers storage implementation + */ +class VectorStorage +{ + class Segment{ + ArchData::line_idx_type idx_; + std::vector data_; + uint32_t bytes_; + public: + + /*! + * \brief Copy constructor + */ + Segment(const Segment&) = default; + + /*! + * \brief Move constructor + */ + Segment(Segment&& rhp) : + idx_(rhp.idx_), + data_(std::move(rhp.data_)), + bytes_(rhp.bytes_) + { + rhp.idx_ = ArchData::INVALID_LINE_IDX; + rhp.bytes_ = 0; + } + + /*! + * \brief Dummy constructor. Represents null entry (end of ArchData) + */ + Segment() : + idx_(ArchData::INVALID_LINE_IDX), + bytes_(0) + {;} + + /*! + * \brief Deleted assignment operator + */ + Segment& operator=(const Segment& rhp) = delete; + + /*! + * \brief Data constructor. Allocates data and copies results over + */ + Segment(ArchData::line_idx_type idx, const char* data, size_t bytes) : + idx_(idx), bytes_(bytes) + { + sparta_assert(idx != ArchData::INVALID_LINE_IDX, + "Attempted to create segment of " << bytes << " bytes with invalid line index"); + data_.resize(bytes); + ::memcpy(data_.data(), data, bytes); + } + + template + void serialize(Archive& ar, const unsigned int /*version*/) { + ar & idx_; + ar & data_; + ar & bytes_; + } + + ArchData::line_idx_type getLineIdx() const { + return idx_; + } + + uint32_t getSize() const { + return sizeof(decltype(*this)) + bytes_; + } + + void copyTo(char* buf, uint32_t size) const { + sparta_assert(size == bytes_, \ + "Attempted to restore checkpoint data for a line where the " + "data was " << bytes_ << " bytes but the loader requested " + << size << " bytes. The sizes must match up or something is " + "wrong"); + memcpy(buf, data_.data(), bytes_); + } + + void dump(std::ostream& o) const { + if(idx_ == ArchData::INVALID_LINE_IDX){ + std::cout << "\nEnd of ArchData"; + return; + } + + std::cout << "\nLine: " << std::dec << idx_ << " (" << bytes_ << ") bytes"; + for(uint32_t off = 0; off < bytes_;){ + char chr = data_[off]; + if(off % 32 == 0){ + o << std::endl << std::setw(7) << std::hex << off; + } + if(chr == 0){ + o << ' ' << ".."; + }else{ + o << ' ' << std::setfill('0') << std::setw(2) << std::hex << (0xff & (uint16_t)chr); + } + off++; + } + } + }; + + /*! + * \brief Data segments to restore + */ + std::vector data_; + + /*! + * \brief Next line index to store when writing lines + */ + ArchData::line_idx_type next_idx_ = ArchData::INVALID_LINE_IDX; + + /*! + * \brief Index in data_ of next line to restore in nextRestoreLine + */ + uint32_t next_restore_idx_ = 0; + + /*! + * \brief iterator in data_ of line being read by call to readLineData. + * Is always next_restore_idx_ or one less. + */ + decltype(data_)::const_iterator cur_restore_itr_; + +public: + VectorStorage() { + } + + ~VectorStorage() { + } + + VectorStorage(const VectorStorage&) = default; + + template + void serialize(Archive& ar, const unsigned int /*version*/) { + ar & data_; + } + + void dump(std::ostream& o) const { + for(auto const &seg : data_){ + seg.dump(o); + } + } + + uint32_t getSize() const { + uint32_t bytes = sizeof(decltype(*this)); + for(Segment const & seg : data_){ + bytes += seg.getSize(); + } + return bytes; + } + + void prepareForLoad() { + next_restore_idx_ = 0; + cur_restore_itr_ = data_.begin(); + } + + void beginLine(ArchData::line_idx_type idx) { + sparta_assert(idx != ArchData::INVALID_LINE_IDX, + "Cannot begin line with INVALID_LINE_IDX index"); + next_idx_ = idx; + } + + void writeLineBytes(const char* data, size_t size) { + sparta_assert(data_.size() == 0 || data_.back().getLineIdx() != next_idx_, + "Cannot store the same line idx twice in a checkpoint. Line " + << next_idx_ << " detected twice in a row"); + sparta_assert(next_idx_ != ArchData::INVALID_LINE_IDX, + "Cannot write line bytes with INVALID_LINE_IDX index"); + data_.emplace_back(next_idx_, data, size); + } + + /*! + * \brief Signals end of this checkpoint's data for one ArchData + */ + void endArchData() { + data_.emplace_back(); + } + + /*! + * \brief Is the reading state of this storage good? (i.e. haven't tried + * to read past the end of the data) + */ + bool good() const { + return next_restore_idx_ <= data_.size(); // Not past end of stream + } + + /*! + * \brief Restore next line. Return ArchData::INVALID_LINE_IDX on + * end of data. + */ + ArchData::line_idx_type getNextRestoreLine() { + if(next_restore_idx_ == data_.size()){ + next_restore_idx_++; // Increment to detect errors + return ArchData::INVALID_LINE_IDX; // Done with restore + }else if(next_restore_idx_ > data_.size()){ // Past the end + throw SpartaException("Failed to restore a checkpoint because ") + << "caller tried to keep getting next line even after " + "reaching the end of the restore data"; + } + if(next_restore_idx_ != 0){ + cur_restore_itr_++; + } + next_restore_idx_++; + + const auto next_line_idx = cur_restore_itr_->getLineIdx(); // May be invalid to indicate end of ArchData + return next_line_idx; + }; + + /*! + * \brief Read bytes for the current line + */ + void copyLineBytes(char* buf, uint32_t size) { + sparta_assert(cur_restore_itr_ != data_.end(), + "Attempted to copy line bytes from an invalid line iterator"); + sparta_assert(cur_restore_itr_->getLineIdx() != ArchData::INVALID_LINE_IDX, + "About to return line from checkpoint data segment with INVALID_LINE_IDX index"); + cur_restore_itr_->copyTo(buf, size); + } + +}; + +} // namespace sparta::serialization::checkpoint::storage diff --git a/sparta/src/DatabaseCheckpoint.cpp b/sparta/src/DatabaseCheckpoint.cpp new file mode 100644 index 0000000000..931381bf47 --- /dev/null +++ b/sparta/src/DatabaseCheckpoint.cpp @@ -0,0 +1,158 @@ +// -*- C++ -*- + +#include "sparta/serialization/checkpoint/DatabaseCheckpoint.hpp" +#include "sparta/serialization/checkpoint/DatabaseCheckpointer.hpp" + +namespace sparta::serialization::checkpoint +{ + +using tick_t = typename CheckpointBase::tick_t; +using chkpt_id_t = typename CheckpointBase::chkpt_id_t; +using checkpoint_type = DatabaseCheckpoint; +using checkpoint_ptr = std::shared_ptr; + +DatabaseCheckpoint::DatabaseCheckpoint(TreeNode& root, + const std::vector& dats, + chkpt_id_t id, + tick_t tick, + DatabaseCheckpoint* prev, + bool is_snapshot, + DatabaseCheckpointer* checkpointer) + : CheckpointBase(id, tick) + , prev_id_(prev ? prev->getID() : UNIDENTIFIED_CHECKPOINT) + , is_snapshot_(is_snapshot) + , checkpointer_(checkpointer) +{ + (void)root; + if (prev_id_ == UNIDENTIFIED_CHECKPOINT) { + if (is_snapshot == false) { + throw CheckpointError("Cannot create a DatabaseCheckpoint id=") + << id << " at tick=" << tick << " which has no prev_delta and is not a snapshot"; + } + } + + if (prev) { + if (!prev->next_ids_.empty()) { + throw CheckpointError("DatabaseCheckpointer does not support multiple checkpoint branches"); + } + prev->next_ids_.push_back(getID()); + } + + // Store the checkpoint from root + if (is_snapshot) { + storeSnapshot_(dats); + } else { + storeDelta_(dats); + } +} + +std::string DatabaseCheckpoint::stringize() const +{ + std::stringstream ss; + ss << "'; + return ss.str(); +} + +void DatabaseCheckpoint::dumpData(std::ostream& o) const +{ + data_.dump(o); +} + +uint64_t DatabaseCheckpoint::getTotalMemoryUse() const noexcept +{ + return getContentMemoryUse() \ + + sizeof(decltype(*this)) \ + + (getNextIDs().size() * sizeof(typename std::remove_reference::type*)); +} + +uint64_t DatabaseCheckpoint::getContentMemoryUse() const noexcept +{ + return data_.getSize(); +} + +std::stack DatabaseCheckpoint::getHistoryChain() const +{ + return checkpointer_->getHistoryChain(getID()); +} + +std::stack DatabaseCheckpoint::getRestoreChain() const +{ + return checkpointer_->getRestoreChain(getID()); +} + +chkpt_id_t DatabaseCheckpoint::getPrevID() const +{ + return prev_id_; +} + +std::vector DatabaseCheckpoint::getNextIDs() const +{ + return next_ids_; +} + +void DatabaseCheckpoint::load(const std::vector& dats) +{ + // Build stack up to last snapshot + std::stack chkpt_ids = getRestoreChain(); + + // Load in proper order + while (!chkpt_ids.empty()) { + auto id = chkpt_ids.top(); + chkpt_ids.pop(); + checkpointer_->findCheckpoint(id)->loadState(dats); + } +} + +bool DatabaseCheckpoint::isSnapshot() const noexcept +{ + return is_snapshot_; +} + +uint32_t DatabaseCheckpoint::getDistanceToPrevSnapshot() const noexcept +{ + return checkpointer_->getDistanceToPrevSnapshot(getID()); +} + +void DatabaseCheckpoint::loadState(const std::vector& dats) +{ + data_.prepareForLoad(); + sparta_assert(data_.good(), + "Attempted to loadState from a DeltaCheckpoint with a bad data buffer"); + if(isSnapshot()){ + for(ArchData* ad : dats){ + ad->restoreAll(data_); + } + }else{ + for(ArchData* ad : dats){ + ad->restore(data_); + } + } +} + +void DatabaseCheckpoint::storeSnapshot_(const std::vector& dats) +{ + sparta_assert(data_.good(), + "Attempted to storeSnapshot_ from a DatabaseCheckpoint with a bad data buffer"); + + for (ArchData* ad : dats) { + ad->saveAll(data_); + } +} + +void DatabaseCheckpoint::storeDelta_(const std::vector& dats) +{ + sparta_assert(data_.good(), + "Attempted to storeDelta_ from a DatabaseCheckpoint with a bad data buffer"); + + for (ArchData* ad : dats) { + ad->save(data_); + } +} + +} // namespace sparta::serialization::checkpoint diff --git a/sparta/src/DatabaseCheckpointer.cpp b/sparta/src/DatabaseCheckpointer.cpp new file mode 100644 index 0000000000..61b6a46964 --- /dev/null +++ b/sparta/src/DatabaseCheckpointer.cpp @@ -0,0 +1,904 @@ +// -*- C++ -*- + +#include "sparta/serialization/checkpoint/DatabaseCheckpointer.hpp" +#include "simdb/apps/AppRegistration.hpp" +#include "simdb/schema/SchemaDef.hpp" +#include "simdb/pipeline/AsyncDatabaseAccessor.hpp" +#include "simdb/pipeline/Pipeline.hpp" +#include "simdb/pipeline/elements/Function.hpp" +#include "simdb/pipeline/elements/Buffer.hpp" +#include "simdb/utils/Compress.hpp" + +#include +#include +#include +#include +#include +#include + +namespace sparta::serialization::checkpoint +{ + +using tick_t = typename CheckpointBase::tick_t; +using chkpt_id_t = typename CheckpointBase::chkpt_id_t; +using window_id_t = typename DatabaseCheckpointer::window_id_t; + +DatabaseCheckpointer::DatabaseCheckpointer(simdb::DatabaseManager* db_mgr, TreeNode& root, Scheduler* sched) : + Checkpointer(root, sched), + db_mgr_(db_mgr), + next_chkpt_id_(checkpoint_type::MIN_CHECKPOINT) +{ +} + +void DatabaseCheckpointer::defineSchema(simdb::Schema& schema) +{ + using dt = simdb::SqlDataType; + + auto& windows = schema.addTable("ChkptWindows"); + windows.addColumn("WindowID", dt::uint64_t); + windows.addColumn("WindowBytes", dt::blob_t); + windows.addColumn("StartChkpID", dt::uint64_t); + windows.addColumn("EndChkpID", dt::uint64_t); + windows.addColumn("StartTick", dt::uint64_t); + windows.addColumn("EndTick", dt::uint64_t); + windows.createIndexOn("WindowID"); + windows.createCompoundIndexOn({"StartChkpID", "EndChkpID"}); + windows.createCompoundIndexOn({"StartTick", "EndTick"}); + windows.disableAutoIncPrimaryKey(); +} + +std::unique_ptr DatabaseCheckpointer::createPipeline( + simdb::pipeline::AsyncDatabaseAccessor* db_accessor) +{ + auto pipeline = std::make_unique(db_mgr_, NAME); + + // Task 1: Package up checkpoints into a checkpoint window + auto create_window = simdb::pipeline::createTask>( + [](checkpoint_ptrs&& chkpts, + simdb::ConcurrentQueue& windows, + bool /*force_flush*/) + { + ChkptWindow window; + window.start_chkpt_id = chkpts.front()->getID(); + window.end_chkpt_id = chkpts.back()->getID(); + window.start_tick = chkpts.front()->getTick(); + window.end_tick = chkpts.back()->getTick(); + window.chkpts = std::move(chkpts); + windows.emplace(std::move(window)); + } + ); + + // Task 2: Serialize a checkpoint window into a char buffer + auto window_to_bytes = simdb::pipeline::createTask>( + [](ChkptWindow&& window, + simdb::ConcurrentQueue& window_bytes, + bool /*force_flush*/) + { + ChkptWindowBytes bytes; + boost::iostreams::back_insert_device> inserter(bytes.chkpt_bytes); + boost::iostreams::stream>> os(inserter); + boost::archive::binary_oarchive oa(os); + oa << window; + os.flush(); + + bytes.start_chkpt_id = window.start_chkpt_id; + bytes.end_chkpt_id = window.end_chkpt_id; + bytes.start_tick = window.start_tick; + bytes.end_tick = window.end_tick; + window_bytes.emplace(std::move(bytes)); + } + ); + + // Task 3: Perform zlib compression on the checkpoint window bytes + auto zlib_bytes = simdb::pipeline::createTask>( + [](ChkptWindowBytes&& bytes_in, + simdb::ConcurrentQueue& bytes_out, + bool /*force_flush*/) + { + std::vector compressed_bytes; + simdb::compressData(bytes_in.chkpt_bytes, compressed_bytes); + std::swap(bytes_in.chkpt_bytes, compressed_bytes); + bytes_out.emplace(std::move(bytes_in)); + } + ); + + // Task 4: Write to the database + auto write_to_db = db_accessor->createAsyncWriter( + [this](ChkptWindowBytes&& bytes_in, + simdb::pipeline::AppPreparedINSERTs* tables, + bool /*force_flush*/) + { + auto window_inserter = tables->getPreparedINSERT("ChkptWindows"); + + utils::ValidValue win_id; + for (chkpt_id_t cid = bytes_in.start_chkpt_id; cid <= bytes_in.end_chkpt_id; ++cid) { + auto window_id = getWindowID_(cid); + if (!win_id.isValid()) { + win_id = window_id; + } else if (win_id != window_id) { + throw CheckpointError("Checkpoint window has inconsistent window IDs"); + } + } + + window_inserter->setColumnValue(0, win_id.getValue()); + window_inserter->setColumnValue(1, bytes_in.chkpt_bytes); + window_inserter->setColumnValue(2, bytes_in.start_chkpt_id); + window_inserter->setColumnValue(3, bytes_in.end_chkpt_id); + window_inserter->setColumnValue(4, bytes_in.start_tick); + window_inserter->setColumnValue(5, bytes_in.end_tick); + window_inserter->createRecord(); + } + ); + + *create_window >> *window_to_bytes >> *zlib_bytes >> *write_to_db; + + pipeline_head_ = create_window->getTypedInputQueue(); + + pipeline_flusher_ = std::make_unique( + *db_mgr_, create_window, window_to_bytes, zlib_bytes, write_to_db); + + pipeline->createTaskGroup("CheckpointPipeline") + ->addTask(std::move(create_window)) + ->addTask(std::move(window_to_bytes)) + ->addTask(std::move(zlib_bytes)); + + return pipeline; +} + +uint32_t DatabaseCheckpointer::getSnapshotThreshold() const +{ + return snap_thresh_; +} + +void DatabaseCheckpointer::setSnapshotThreshold(uint32_t thresh) +{ + sparta_assert(!snap_thresh_.isValid(), "Snapshot threshold cannot be changed once set."); + sparta_assert(thresh > 1, "Snapshot threshold must be greater than 1"); + snap_thresh_ = thresh; +} + +void DatabaseCheckpointer::setMaxCachedWindows(uint32_t max_windows) +{ + sparta_assert(!max_cached_windows_.isValid(), "Max cached windows cannot be changed once set."); + sparta_assert(max_windows > 0, "Max cached windows must be greater than 0"); + max_cached_windows_ = max_windows; +} + +uint64_t DatabaseCheckpointer::getTotalMemoryUse() const noexcept +{ + std::lock_guard lock(cache_mutex_); + + // Only add up the memory use from the cache. + uint64_t mem = 0; + for (const auto& [win_id, window] : chkpts_cache_) { + for (const auto& chkpt : window) { + mem += chkpt->getTotalMemoryUse(); + } + } + return mem; +} + +uint64_t DatabaseCheckpointer::getContentMemoryUse() const noexcept +{ + std::lock_guard lock(cache_mutex_); + + // Only add up the memory use from the cache. + uint64_t mem = 0; + for (const auto& [win_id, window] : chkpts_cache_) { + for (const auto& chkpt : window) { + mem += chkpt->getContentMemoryUse(); + } + } + return mem; +} + +void DatabaseCheckpointer::deleteCheckpoint(chkpt_id_t) +{ + throw CheckpointError("Explicit checkpoint deletion is not supported by DatabaseCheckpointer"); +} + +void DatabaseCheckpointer::loadCheckpoint(chkpt_id_t id) +{ + std::lock_guard lock(cache_mutex_); + + if (auto c = getCurrent_(); !c || c->getID() == id) { + return; + } + + auto chkpt = findCheckpoint(id, true); + chkpt->load(getArchDatas()); + + // Delete all future checkpoints past this one. Do this from the cache + // as well as from the database. + auto next_ids = chkpt->getNextIDs(); + if (!next_ids.empty()) { + if (next_ids.size() != 1) { + throw CheckpointError("DatabaseCheckpointer does not support multiple checkpoint branches"); + } + deleteCheckpoint_(next_ids[0]); + } + + // Detach future checkpoints from this one as they are deleted. + chkpt->next_ids_.clear(); + + // Move current to this checkpoint. + setCurrent_(chkpt.get()); + + // Increasing-by-one, starting-at-zero checkpoint IDs guarantee we can do this: + next_chkpt_id_ = id + 1; + + // Restore scheduler tick number + if (sched_) { + sched_->restartAt(getCurrentTick()); + } +} + +void DatabaseCheckpointer::preTeardown() +{ + // Send every window down the pipeline and flush it. + evictWindowsIfNeeded_(true); + pipeline_flusher_->flush(); +} + +std::vector DatabaseCheckpointer::getCheckpointsAt(tick_t t) +{ + std::unordered_set ids; + + forEachCheckpoint_([t, &ids](const DatabaseCheckpoint* chkpt) { + if (chkpt->getTick() == t) { + ids.insert(chkpt->getID()); + } + }); + + std::vector ret(ids.begin(), ids.end()); + std::sort(ret.begin(), ret.end()); + return ret; +} + +std::vector DatabaseCheckpointer::getCheckpoints() +{ + std::unordered_set ids; + + forEachCheckpoint_([&ids](const DatabaseCheckpoint* chkpt) { + ids.insert(chkpt->getID()); + }); + + std::vector ret(ids.begin(), ids.end()); + std::sort(ret.begin(), ret.end()); + return ret; +} + +uint32_t DatabaseCheckpointer::getNumCheckpoints() const noexcept +{ + return next_chkpt_id_; +} + +uint32_t DatabaseCheckpointer::getNumSnapshots() const noexcept +{ + return next_chkpt_id_ ? getWindowID_(next_chkpt_id_) + 1 : 0; +} + +uint32_t DatabaseCheckpointer::getNumDeltas() const noexcept +{ + return getNumCheckpoints() - getNumSnapshots(); +} + +std::deque DatabaseCheckpointer::getCheckpointChain(chkpt_id_t id) +{ + std::deque chain; + if (!getHead()) { + return chain; + } + + if (hasCheckpoint(id)) { + // This checkpointer guarantees a linear chain of checkpoints with no gaps. + // While we could also walk backwards using getPrevID(), load checkpoints + // into memory, and call getID() on each of them, the result of doing that + // would effectively load every window into our cache only to dump most of + // them (LRU). The cache could very well end up being 100% full of very old + // checkpoints, thus slowing down further API calls to reload newer windows + // into the cache. + do { + chain.push_back(id); + } while (id-- > 0); + } else { + throw CheckpointError("There is no checkpoint with ID ") << id; + } + + return chain; +} + +std::shared_ptr DatabaseCheckpointer::findCheckpoint(chkpt_id_t id, bool must_exist) +{ + std::lock_guard lock(cache_mutex_); + + if (!ensureWindowLoaded_(id, must_exist)) { + return nullptr; + } + + auto win_id = getWindowID_(id); + auto& window = chkpts_cache_[win_id]; + sparta_assert(!window.empty()); + + // Find the checkpoint in the window in constant time, noting that + // the window will have checkpoints in ascending order by ID with + // no gaps. + auto snapshot_id = window.front()->getID(); + auto& chkpt = window.at(id - snapshot_id); + sparta_assert(chkpt->getID() == id); + return chkpt; +} + +std::shared_ptr DatabaseCheckpointer::findLatestCheckpointAtOrBefore(tick_t tick, chkpt_id_t from) +{ + std::lock_guard lock(cache_mutex_); + + auto chkpt = findCheckpoint(from, true); + do { + if (chkpt->getTick() <= tick) { + break; + } + chkpt = findCheckpoint(chkpt->getPrevID()); + } while (chkpt); + + return (chkpt && chkpt->getTick() <= tick) ? chkpt : nullptr; +} + +bool DatabaseCheckpointer::hasCheckpoint(chkpt_id_t id) noexcept +{ + return findCheckpoint(id) != nullptr; +} + +void DatabaseCheckpointer::dumpRestoreChain(std::ostream& o, chkpt_id_t id) +{ + auto rc = getRestoreChain(id); + while (true) { + auto chkpt = findCheckpoint(rc.top()); + rc.pop(); + + if (chkpt->isSnapshot()) { + o << "("; + } + o << chkpt->getID(); + if (chkpt->isSnapshot()) { + o << ")"; + } + if (rc.empty()) { + break; + } else { + o << " --> "; + } + } +} + +std::stack DatabaseCheckpointer::getHistoryChain(chkpt_id_t id) +{ + ensureWindowLoaded_(id, true); + + std::stack chain; + do { + chain.push(id); + } while (id-- > 0); + + return chain; +} + +std::stack DatabaseCheckpointer::getRestoreChain(chkpt_id_t id) +{ + std::lock_guard lock(cache_mutex_); + + ensureWindowLoaded_(id, true); + auto win_id = getWindowID_(id); + auto& window = chkpts_cache_[win_id]; + sparta_assert(!window.empty()); + + std::stack chain; + auto snapshot_id = window.front()->getID(); + do { + chain.push(id); + } while (id-- > snapshot_id); + + return chain; +} + +std::vector DatabaseCheckpointer::getNextIDs(chkpt_id_t id) +{ + auto chkpt = findCheckpoint(id, true); + return chkpt->getNextIDs(); +} + +uint32_t DatabaseCheckpointer::getDistanceToPrevSnapshot(chkpt_id_t id) noexcept +{ + return getRestoreChain(id).size() - 1; +} + +bool DatabaseCheckpointer::isSnapshot(chkpt_id_t id) noexcept +{ + auto chkpt = findCheckpoint(id, true); + return chkpt->isSnapshot(); +} + +std::string DatabaseCheckpointer::stringize() const +{ + std::stringstream ss; + ss << "'; + return ss.str(); +} + +void DatabaseCheckpointer::dumpList(std::ostream& o) +{ + std::map chkpt_strings; + + forEachCheckpoint_([&chkpt_strings](const DatabaseCheckpoint* chkpt) { + chkpt_strings[chkpt->getID()] = chkpt->stringize(); + }); + + for (const auto& [id, str] : chkpt_strings) { + o << str << "\n"; + } + o << std::flush; +} + +void DatabaseCheckpointer::dumpData(std::ostream& o) +{ + std::map chkpt_strings; + + forEachCheckpoint_([&chkpt_strings](const DatabaseCheckpoint* chkpt) { + std::ostringstream oss; + chkpt->dumpData(oss); + chkpt_strings[chkpt->getID()] = oss.str(); + }); + + for (const auto& [id, str] : chkpt_strings) { + o << str << "\n"; + } + o << std::flush; +} + +void DatabaseCheckpointer::dumpAnnotatedData(std::ostream& o) +{ + std::map chkpt_strings; + + forEachCheckpoint_([&chkpt_strings](const DatabaseCheckpoint* chkpt) { + std::ostringstream oss; + oss << chkpt->stringize() << "\n"; + chkpt->dumpData(oss); + chkpt_strings[chkpt->getID()] = oss.str(); + }); + + for (const auto& [id, str] : chkpt_strings) { + o << str << "\n"; + } + o << std::flush; +} + +void DatabaseCheckpointer::traceValue( + std::ostream& o, + chkpt_id_t id, + const ArchData* container, + uint32_t offset, + uint32_t size) +{ + (void)o; + (void)id; + (void)container; + (void)offset; + (void)size; + + throw CheckpointError("DatabaseCheckpointer::traceValue() not implemented"); +} + +bool DatabaseCheckpointer::isCheckpointCached(chkpt_id_t id) const noexcept +{ + std::lock_guard lock(cache_mutex_); + const auto win_id = getWindowID_(id); + const auto it = chkpts_cache_.find(win_id); + return it != chkpts_cache_.end(); +} + +void DatabaseCheckpointer::createHead_() +{ + std::lock_guard lock(cache_mutex_); + + tick_t tick = 0; + if (sched_) { + tick = sched_->getCurrentTick(); + } + + if (getHead()) { + throw CheckpointError("Cannot create head at ") + << tick << " because a head already exists in this checkpointer"; + } + if (getRoot().isFinalized() == false) { + CheckpointError exc("Cannot create a checkpoint until the tree is finalized. Attempting to checkpoint from node "); + exc << getRoot().getLocation() << " at tick "; + if (sched_) { + exc << tick; + }else{ + exc << ""; + } + throw exc; + } + + std::shared_ptr chkpt(new checkpoint_type( + getRoot(), getArchDatas(), next_chkpt_id_++, tick, + nullptr, true, this)); + + setHead_(chkpt.get()); + setCurrent_(chkpt.get()); + addToCache_(std::move(chkpt)); +} + +chkpt_id_t DatabaseCheckpointer::createCheckpoint_(bool force_snapshot) +{ + if (force_snapshot) { + throw CheckpointError("DatabaseCheckpointer does not support forced snapshots"); + } + + std::lock_guard lock(cache_mutex_); + + bool is_snapshot; + checkpoint_type* prev; + + if (next_chkpt_id_ == checkpoint_type::UNIDENTIFIED_CHECKPOINT) { + throw CheckpointError("Exhausted all ") + << checkpoint_type::UNIDENTIFIED_CHECKPOINT << " possible checkpoint IDs. " + << "This is likely a gross misuse of checkpointing"; + } + + // Caller guarantees a head + sparta_assert(getHead() != nullptr); + + tick_t tick; + if (sched_) { + tick = sched_->getCurrentTick(); + } else { + tick = 0; + } + + if (sched_ && (tick < getHead()->getTick())) { + throw CheckpointError("Cannot create a new checkpoint at tick ") + << tick << " because this tick number is smaller than the tick number of the head checkpoint at: " + << getHead()->getTick() << ". The head checkpoint cannot be reset once created, so it should be done " + << "at the start of simulation before running. The simulator front-end should do this so this must " + << "likely be fixed in the simulator."; + } + + if (nullptr == getCurrent_()) { + // Creating a delta from the head + prev = static_cast(getHead_()); + is_snapshot = false; + } else { + if (sched_ && (tick < getCurrent_()->getTick())) { + throw CheckpointError("Current tick number from sparta scheduler (") + << tick << " ) is less than the current checkpoint's tick number (" + << getCurrent_()->getTick() << " To create a checkpoint with an earlier tick number, an " + << "older checkpoint having a tick number <= the tick number specified here must first be " + << "loaded"; + } + + // Find latest checkpoint <= tick + prev = static_cast(getCurrent_()); + is_snapshot = prev->getDistanceToPrevSnapshot() >= getSnapshotThreshold(); + } + + std::shared_ptr chkpt(new checkpoint_type( + getRoot(), getArchDatas(), next_chkpt_id_++, tick, + prev, force_snapshot || is_snapshot, this)); + + auto current = chkpt.get(); + setCurrent_(current); + addToCache_(std::move(chkpt)); + return current->getID(); +} + +void DatabaseCheckpointer::deleteCheckpoint_(chkpt_id_t id) +{ + // Throw if trying to delete head checkpoint + if (id == getHeadID()) { + throw CheckpointError("Cannot delete head checkpoint with ID ") << id; + } + + window_id_t start_win_id = getWindowID_(id); + window_id_t end_win_id = getWindowID_(next_chkpt_id_ - 1); + + for (window_id_t win_id = start_win_id; win_id <= end_win_id; ++win_id) { + auto it = chkpts_cache_.find(win_id); + if (it != chkpts_cache_.end()) { + if (win_id == start_win_id) { + // Only delete checkpoints in this window >= id + auto& window = it->second; + auto new_end = std::remove_if(window.begin(), window.end(), + [id](const std::shared_ptr& chkpt) { + return chkpt->getID() >= id; + }); + + window.erase(new_end, window.end()); + if (window.empty()) { + chkpts_cache_.erase(it); + } + } else { + // Delete the entire window + chkpts_cache_.erase(it); + } + } + } + + // Now delete from the database + pipeline_flusher_->flush(); + + db_mgr_->safeTransaction([&]() { + // DELETE FROM ChkptWindows WHERE WindowID > start_win_id + auto query = db_mgr_->createQuery("ChkptWindows"); + query->addConstraintForUInt64("WindowID", simdb::Constraints::GREATER, start_win_id); + query->deleteResultSet(); + + // Now update the window containing start_win_id to remove checkpoints >= id + query->resetConstraints(); + query->addConstraintForUInt64("WindowID", simdb::Constraints::EQUAL, start_win_id); + + std::vector compressed_window_bytes; + query->select("WindowBytes", compressed_window_bytes); + + auto results = query->getResultSet(); + if (results.getNextRecord()) { + // DELETE FROM ChkptWindows WHERE WindowID = start_win_id + query->deleteResultSet(); + + // Deserialize the window + auto window = deserializeWindow_(compressed_window_bytes); + + // Remove checkpoints >= id + auto new_end = std::remove_if(window->chkpts.begin(), window->chkpts.end(), + [id](const std::shared_ptr& chkpt) { + return chkpt->getID() >= id; + }); + window->chkpts.erase(new_end, window->chkpts.end()); + + // Send down the pipeline + if (!window->chkpts.empty()) { + pipeline_head_->emplace(std::move(window->chkpts)); + } + } + }); +} + +void DatabaseCheckpointer::dumpCheckpointNode_(const chkpt_id_t id, std::ostream& o) +{ + static std::string SNAPSHOT_NOTICE = "(s)"; + + auto chkpt = findCheckpoint(id, true); + o << chkpt->getID(); + if (chkpt->isSnapshot()) { + o << ' ' << SNAPSHOT_NOTICE; + } +} + +void DatabaseCheckpointer::setHead_(DatabaseCheckpoint* head) +{ + const auto id = head->getID(); + sparta_assert(id != checkpoint_type::UNIDENTIFIED_CHECKPOINT); + sparta_assert(head_id_ == checkpoint_type::UNIDENTIFIED_CHECKPOINT); + + std::lock_guard lock(cache_mutex_); + Checkpointer::setHead_(head); + head_id_ = id; +} + +void DatabaseCheckpointer::setCurrent_(DatabaseCheckpoint* current) +{ + const auto id = current->getID(); + sparta_assert(id != checkpoint_type::UNIDENTIFIED_CHECKPOINT); + + std::lock_guard lock(cache_mutex_); + Checkpointer::setCurrent_(current); + current_id_ = id; +} + +void DatabaseCheckpointer::addToCache_(std::shared_ptr chkpt) +{ + std::lock_guard lock(cache_mutex_); + + const auto win_id = chkpt->getID() / (snap_thresh_ + 1); + auto& window = chkpts_cache_[win_id]; + sparta_assert(window.empty() || window.back()->getID() == chkpt->getID() - 1, + "Checkpoints must be added in ID order with no gaps"); + window.emplace_back(std::move(chkpt)); + touchWindow_(win_id); + evictWindowsIfNeeded_(); +} + +window_id_t DatabaseCheckpointer::getWindowID_(chkpt_id_t id) const +{ + return id / (snap_thresh_ + 1); +} + +void DatabaseCheckpointer::touchWindow_(window_id_t id) +{ + std::lock_guard lock(cache_mutex_); + + auto it = lru_map_.find(id); + if (it != lru_map_.end()) { + lru_list_.erase(it->second); + } + lru_list_.push_front(id); + lru_map_[id] = lru_list_.begin(); +} + +void DatabaseCheckpointer::evictWindowsIfNeeded_(bool force_flush) +{ + std::lock_guard lock(cache_mutex_); + + uint32_t num_windows_to_evict = 0; + + if (force_flush) { + num_windows_to_evict = lru_list_.size(); + } else if (lru_list_.size() > max_cached_windows_) { + num_windows_to_evict = lru_list_.size() - max_cached_windows_; + } + + if (num_windows_to_evict == 0) { + return; + } + + while (num_windows_to_evict > 0) { + // Evict the least recently used window + const auto win_id = lru_list_.back(); + + // Unless we are flushing, do not evict the window containing + // the current checkpoint or the head checkpoint + if (!force_flush) { + auto current = getCurrent_(); + auto current_win_id = getWindowID_(current->getID()); + + auto head = getHead_(); + auto head_win_id = getWindowID_(head->getID()); + + // If the current or head checkpoint is in this window, skip eviction. + // Decrement the number of windows to evict since we are skipping this one. + if (current_win_id == win_id || head_win_id == win_id) { + sparta_assert(num_windows_to_evict-- > 0); + touchWindow_(win_id); + continue; + } + } + + lru_list_.pop_back(); + lru_map_.erase(win_id); + + // Send the window down the pipeline for writing to the database + auto& window = chkpts_cache_[win_id]; + if (!window.empty()) { + pipeline_head_->emplace(std::move(window)); + } + + // Cleanup + chkpts_cache_.erase(win_id); + sparta_assert(num_windows_to_evict-- > 0); + } +} + +bool DatabaseCheckpointer::ensureWindowLoaded_(chkpt_id_t chkpt_id, bool must_succeed) +{ + std::lock_guard lock(cache_mutex_); + + window_id_t win_id = getWindowID_(chkpt_id); + if (chkpts_cache_.find(win_id) == chkpts_cache_.end()) { + checkpoint_ptrs window_chkpts = getWindowFromDatabase_(win_id); + if (window_chkpts.empty() && must_succeed) { + throw CheckpointError("Could not find checkpoint window with ID ") << win_id; + } else if (window_chkpts.empty()) { + return false; + } + chkpts_cache_[win_id] = std::move(window_chkpts); + } + + auto& window = chkpts_cache_[win_id]; + if (window.empty()) { + if (must_succeed) { + throw CheckpointError("Checkpoint window with ID ") << win_id << " is empty"; + } + chkpts_cache_.erase(win_id); + return false; + } + + auto snapshot_id = window.front()->getID(); + if (chkpt_id < snapshot_id || chkpt_id > window.back()->getID()) { + if (must_succeed) { + throw CheckpointError("Checkpoint ID ") << chkpt_id + << " is not in the loaded checkpoint window with ID " << win_id + << " which contains checkpoints from " << snapshot_id + << " to " << window.back()->getID(); + } + return false; + } + + auto& chkpt = window.at(chkpt_id - snapshot_id); + sparta_assert(chkpt->getID() == chkpt_id); + + touchWindow_(win_id); + evictWindowsIfNeeded_(); + return true; +} + +std::vector> DatabaseCheckpointer::getWindowFromDatabase_(window_id_t win_id) +{ + std::vector> window_chkpts; + pipeline_flusher_->flush(); + + db_mgr_->safeTransaction([&]() { + auto query = db_mgr_->createQuery("ChkptWindows"); + query->addConstraintForUInt64("WindowID", simdb::Constraints::EQUAL, win_id); + + std::vector compressed_window_bytes; + query->select("WindowBytes", compressed_window_bytes); + + auto results = query->getResultSet(); + if (results.getNextRecord()) { + std::unique_ptr window_restored = deserializeWindow_(compressed_window_bytes); + sparta_assert(window_restored && !window_restored->chkpts.empty()); + window_chkpts = std::move(window_restored->chkpts); + } + }); + + return window_chkpts; +} + +std::unique_ptr DatabaseCheckpointer::deserializeWindow_(const std::vector& compressed_window_bytes) const +{ + std::vector window_bytes; + simdb::decompressData(compressed_window_bytes, window_bytes); + + auto window_restored = std::make_unique(); + boost::iostreams::basic_array_source device(window_bytes.data(), window_bytes.size()); + boost::iostreams::stream> is(device); + boost::archive::binary_iarchive ia(is); + ia >> *window_restored; + + for (auto& chkpt : window_restored->chkpts) { + chkpt->checkpointer_ = const_cast(this); + } + + return window_restored; +} + +void DatabaseCheckpointer::forEachCheckpoint_(const std::function& cb) +{ + // Flush the pipeline so that every checkpoint is either in our cache or on disk. + // There is no guarantee that the cache has newer checkpoints than the database, + // since many APIs load old windows into the cache and "mix them together" with + // whatever is already in the cache (new and old). + pipeline_flusher_->flush(); + + { + std::lock_guard lock(cache_mutex_); + + // Gather up all checkpoint IDs from our cache + for (const auto& [win_id, window] : chkpts_cache_) { + for (const auto& chkpt : window) { + cb(chkpt.get()); + } + } + } + + // Query the database for any other checkpoints + db_mgr_->safeTransaction([&]() { + auto query = db_mgr_->createQuery("ChkptWindows"); + + std::vector compressed_window_bytes; + query->select("WindowBytes", compressed_window_bytes); + + auto results = query->getResultSet(); + while (results.getNextRecord()) + { + auto window = deserializeWindow_(compressed_window_bytes); + for (const auto& chkpt : window->chkpts) { + cb(chkpt.get()); + } + } + }); +} + +REGISTER_SIMDB_APPLICATION(DatabaseCheckpointer); + +} // namespace sparta::serialization::checkpoint diff --git a/sparta/test/CMakeLists.txt b/sparta/test/CMakeLists.txt index 1f854afc28..1a80c840b2 100644 --- a/sparta/test/CMakeLists.txt +++ b/sparta/test/CMakeLists.txt @@ -30,9 +30,7 @@ add_custom_command (TARGET regress_valgrind POST_BUILD COMMAND ctest -L ${VALGR add_subdirectory (Array) add_subdirectory (Audience) add_subdirectory (BasicHistogram) -if (NOT APPLE) - add_subdirectory (Buffer) -endif () +add_subdirectory (Buffer) add_subdirectory (Bus) add_subdirectory (cache) add_subdirectory (CachedMemory) @@ -46,6 +44,7 @@ add_subdirectory (ContextCounter) add_subdirectory (CycleHistogram) add_subdirectory (DAG) add_subdirectory (DAG_Ordering) +add_subdirectory (DatabaseCheckpoint) add_subdirectory (DataView) add_subdirectory (Enum) add_subdirectory (Events) diff --git a/sparta/test/DatabaseCheckpoint/CMakeLists.txt b/sparta/test/DatabaseCheckpoint/CMakeLists.txt new file mode 100644 index 0000000000..2d219a250f --- /dev/null +++ b/sparta/test/DatabaseCheckpoint/CMakeLists.txt @@ -0,0 +1,5 @@ +project(DatabaseCheckpoint_test) + +sparta_add_test_executable(DatabaseCheckpoint_test DatabaseCheckpoint_test.cpp) + +sparta_test(DatabaseCheckpoint_test DatabaseCheckpoint_test_RUN) diff --git a/sparta/test/DatabaseCheckpoint/DatabaseCheckpoint_test.cpp b/sparta/test/DatabaseCheckpoint/DatabaseCheckpoint_test.cpp new file mode 100644 index 0000000000..47b93d9f35 --- /dev/null +++ b/sparta/test/DatabaseCheckpoint/DatabaseCheckpoint_test.cpp @@ -0,0 +1,553 @@ +#include +#include +#include +#include +#include +#include + +#include "sparta/sparta.hpp" +#include "sparta/simulation/TreeNode.hpp" +#include "sparta/log/Tap.hpp" +#include "sparta/log/Destination.hpp" +#include "sparta/functional/Register.hpp" +#include "sparta/functional/RegisterSet.hpp" +#include "sparta/memory/MemoryObject.hpp" +#include "sparta/serialization/checkpoint/DatabaseCheckpointer.hpp" +#include "sparta/utils/SpartaTester.hpp" + +#include "simdb/apps/AppManager.hpp" +#include "simdb/sqlite/DatabaseManager.hpp" +#include "simdb/pipeline/Pipeline.hpp" + +/*! + * \file DatabaseCheckpoint_test.cpp + * \brief Test for SimDB-backed Checkpoints + * + * This is modified from FastCheckpoint_test.cpp. + * + * Register is built on DataView and RegisterSet is built on ArchData. + * The DataView test performs extensive testing so some test-cases related + * to register sizes and layouts may be omitted from this test. + */ + +TEST_INIT + +using sparta::Register; +using sparta::RegisterSet; +using sparta::RootTreeNode; +using sparta::memory::MemoryObject; +using sparta::memory::BlockingMemoryObjectIFNode; +using sparta::serialization::checkpoint::DatabaseCheckpointer; +using sparta::serialization::checkpoint::DatabaseCheckpoint; + +static const uint16_t HINT_NONE=0; + +//! Some register and field definition tables +Register::Definition reg_defs[] = { + { 0, "reg0", Register::GROUP_NUM_NONE, "", Register::GROUP_IDX_NONE, "reg desc", 1, + {}, {}, nullptr, Register::INVALID_ID, 0, nullptr, HINT_NONE, 0 }, + { 1, "reg1", Register::GROUP_NUM_NONE, "", Register::GROUP_IDX_NONE, "reg desc", 2, + {}, {}, nullptr, Register::INVALID_ID, 0, nullptr, HINT_NONE, 0 }, + { 2, "reg2", Register::GROUP_NUM_NONE, "", Register::GROUP_IDX_NONE, "reg desc", 4, + {}, {}, nullptr, Register::INVALID_ID, 0, nullptr, HINT_NONE, 0 }, + { 3, "reg3", Register::GROUP_NUM_NONE, "", Register::GROUP_IDX_NONE, "reg desc", 8, + {}, {}, nullptr, Register::INVALID_ID, 0, nullptr, HINT_NONE, 0 }, + { 4, "reg4", Register::GROUP_NUM_NONE, "", Register::GROUP_IDX_NONE, "reg desc", 16, + {}, {}, nullptr, Register::INVALID_ID, 0, nullptr, HINT_NONE, 0 }, + Register::DEFINITION_END +}; + +//! Dummy device +class DummyDevice : public sparta::TreeNode +{ +public: + DummyDevice(sparta::TreeNode* parent) : + sparta::TreeNode(parent, "dummy", "", sparta::TreeNode::GROUP_IDX_NONE, "dummy node for checkpoint test") + {} +}; + +void RunCheckpointerTest(uint64_t initial_tick = 0) +{ + sparta::Scheduler sched; + RootTreeNode clocks("clocks"); + sparta::Clock clk(&clocks, "clock", &sched); + + // Create a tree with some register sets and memory + RootTreeNode root; + + DummyDevice dummy(&root); + std::unique_ptr rset(RegisterSet::create(&dummy, reg_defs)); + + DummyDevice dummy2(&dummy); + std::unique_ptr rset2(RegisterSet::create(&dummy2, reg_defs)); + + auto r1 = rset->getRegister("reg2"); + auto r2 = rset2->getRegister("reg2"); + assert(r1 != r2); + r1->write(0 * 5ul); + r2->write(0 % 5ul); + + simdb::DatabaseManager db_mgr("test.db", true); + simdb::AppManager app_mgr(&db_mgr); + + // Setup... + app_mgr.getAppFactory()->setSpartaElems(root, &sched); + app_mgr.enableApp(DatabaseCheckpointer::NAME); + app_mgr.createEnabledApps(); + app_mgr.createSchemas(); + app_mgr.postInit(0, nullptr); + app_mgr.openPipelines(); + + auto& dbcp = *app_mgr.getApp(); + dbcp.setSnapshotThreshold(10); + dbcp.setMaxCachedWindows(10); + + root.enterConfiguring(); + root.enterFinalized(); + sched.finalize(); + EXPECT_EQUAL(sched.getCurrentTick(), 0); + EXPECT_TRUE(dbcp.getCheckpointsAt(0).empty()); + EXPECT_EQUAL(dbcp.getNumCheckpoints(), 0); + EXPECT_EQUAL(dbcp.getNumSnapshots(), 0); + EXPECT_EQUAL(dbcp.getNumDeltas(), 0); + EXPECT_TRUE(dbcp.getCheckpointChain(0).empty()); + + // Advance the scheduler before taking the head checkpoint + if (initial_tick > 0) { + sched.run(initial_tick, true, false); + } + EXPECT_EQUAL(sched.getCurrentTick(), initial_tick); + + // CHECKPOINT: Head + DatabaseCheckpointer::chkpt_id_t head_id; + EXPECT_NOTHROW(dbcp.createHead()); + head_id = dbcp.getHeadID(); + EXPECT_NOTEQUAL(dbcp.getHead(), nullptr); + EXPECT_EQUAL(head_id, dbcp.getHead()->getID()); + EXPECT_EQUAL(dbcp.getCurrentID(), head_id); + EXPECT_EQUAL(dbcp.getCurrentTick(), initial_tick); + EXPECT_TRUE(dbcp.isSnapshot(head_id)); + + std::cout << dbcp.stringize() << std::endl; + + auto step_checkpointer = [&](DatabaseCheckpointer::chkpt_id_t expected_id, bool step_sched = true) { + r1->write(expected_id * 5ul); + r2->write(expected_id % 5ul); + if (step_sched) { + sched.run(1, true, false); + } + + DatabaseCheckpointer::chkpt_id_t actual_id = DatabaseCheckpoint::UNIDENTIFIED_CHECKPOINT; + EXPECT_NOTHROW(actual_id = dbcp.createCheckpoint()); + EXPECT_EQUAL(actual_id, expected_id); + EXPECT_EQUAL(actual_id, dbcp.getCurrentID()); + EXPECT_EQUAL(dbcp.getNumCheckpoints(), expected_id + 1); + + // Should always have the head and current checkpoints in the cache + EXPECT_TRUE(dbcp.isCheckpointCached(dbcp.getHeadID())); + EXPECT_TRUE(dbcp.isCheckpointCached(dbcp.getCurrentID())); + + return actual_id; + }; + + auto verif_find_checkpoint = [&](DatabaseCheckpointer::chkpt_id_t id, bool must_exist = true) { + std::shared_ptr cp; + EXPECT_NOTHROW(cp = dbcp.findCheckpoint(id, must_exist)); + if (cp) { + EXPECT_EQUAL(cp->getID(), id); + EXPECT_EQUAL(cp->getPrevID(), (id > 0) ? (id - 1) : DatabaseCheckpoint::UNIDENTIFIED_CHECKPOINT); + EXPECT_EQUAL(cp->isSnapshot(), (id % (dbcp.getSnapshotThreshold() + 1)) == 0); + + if (cp->isSnapshot()) { + EXPECT_EQUAL(cp->getDistanceToPrevSnapshot(), 0); + } else { + EXPECT_EQUAL(cp->getDistanceToPrevSnapshot(), id % (dbcp.getSnapshotThreshold() + 1)); + } + } + return cp; + }; + + auto verif_load_chkpt = [&](DatabaseCheckpointer::chkpt_id_t id) { + EXPECT_NOTHROW(dbcp.loadCheckpoint(id)); + EXPECT_EQUAL(dbcp.getCurrentID(), id); + EXPECT_EQUAL(dbcp.getNumCheckpoints(), id + 1); + EXPECT_FALSE(dbcp.hasCheckpoint(id + 1)); + EXPECT_EQUAL(sched.getCurrentTick(), id + initial_tick); + + auto r1_val = r1->read(); + auto r2_val = r2->read(); + EXPECT_EQUAL(r1_val, id * 5ul); + EXPECT_EQUAL(r2_val, id % 5ul); + }; + + auto wait_until_evicted = [&](DatabaseCheckpointer::chkpt_id_t id) { + size_t num_tries = 0; + while (dbcp.isCheckpointCached(id) && num_tries < 3) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + ++num_tries; + } + EXPECT_FALSE(num_tries == 3); + EXPECT_FALSE(dbcp.isCheckpointCached(id)); + }; + + // Ensure force_snapshot=true always throws. Not supported. + EXPECT_THROW(dbcp.createCheckpoint(true)); + + // Ensure traceValue() throws. Not supported. + EXPECT_THROW(dbcp.traceValue(std::cout, dbcp.getCurrentID(), nullptr, 0, 4)); + + // Create 1000 checkpoints, and periodically access an old one. Also + // go to sleep sometimes to increase the chances we have to go to the + // database to retrieve a checkpoint. + for (uint32_t i = 1; i <= 1000; ++i) { + // Step the scheduler and take a checkpoint + step_checkpointer(i); + + // Access most recent from the cache directly + verif_find_checkpoint(i); + + // Force some of the checkpoints to be retrieved from the database + if (i % 100 == 0 && i > 250) { + auto old_id = i - 100; + wait_until_evicted(old_id); + verif_find_checkpoint(old_id); + } + } + + // Nothing to test, just call dumpList/dumpData/dumpAnnotatedData. + // Do this while we have a lot of checkpoints in the cache and + // the database for max code coverage. + dbcp.dumpList(std::cout); + std::cout << std::endl; + dbcp.dumpData(std::cout); + std::cout << std::endl; + dbcp.dumpAnnotatedData(std::cout); + std::cout << std::endl; + + // Verify that cached / DB-recreated checkpoints are identical: + // 1. Get the current checkpoint from the cache + auto cached_cp1000 = dbcp.findCheckpoint(dbcp.getCurrentID()); + EXPECT_TRUE(dbcp.isCheckpointCached(cached_cp1000->getID())); + + // 2. Write a lot more checkpoints to force the oldest ones out of the cache + for (uint32_t i = 1001; i <= 1500; ++i) { + step_checkpointer(i); + } + wait_until_evicted(cached_cp1000->getID()); + + // 3. Recreate the same checkpoint from the database + EXPECT_FALSE(dbcp.isCheckpointCached(cached_cp1000->getID())); + auto recreated_cp1000 = dbcp.findCheckpoint(cached_cp1000->getID()); + + EXPECT_NOTEQUAL(cached_cp1000, nullptr); + EXPECT_NOTEQUAL(recreated_cp1000, nullptr); + + if (cached_cp1000 && recreated_cp1000) { + std::ostringstream oss1; + std::ostringstream oss2; + + cached_cp1000->dumpData(oss1); + recreated_cp1000->dumpData(oss2); + + EXPECT_EQUAL(oss1.str(), oss2.str()); + EXPECT_EQUAL(cached_cp1000->getTotalMemoryUse(), recreated_cp1000->getTotalMemoryUse()); + EXPECT_EQUAL(cached_cp1000->getContentMemoryUse(), recreated_cp1000->getContentMemoryUse()); + EXPECT_TRUE(cached_cp1000->getHistoryChain() == recreated_cp1000->getHistoryChain()); + EXPECT_TRUE(cached_cp1000->getRestoreChain() == recreated_cp1000->getRestoreChain()); + EXPECT_EQUAL(cached_cp1000->getPrevID(), recreated_cp1000->getPrevID()); + EXPECT_EQUAL(cached_cp1000->getNextIDs(), recreated_cp1000->getNextIDs()); + EXPECT_EQUAL(cached_cp1000->getTick(), recreated_cp1000->getTick()); + EXPECT_EQUAL(cached_cp1000->isSnapshot(), recreated_cp1000->isSnapshot()); + EXPECT_EQUAL(cached_cp1000->getDistanceToPrevSnapshot(), recreated_cp1000->getDistanceToPrevSnapshot()); + } + + // Load very recent checkpoints that are definitely in the cache + for (size_t i = 1500; i > 1475; --i) { + EXPECT_TRUE(dbcp.isCheckpointCached(i)); + verif_load_chkpt(i); + } + + // Load checkpoints that have already been evicted from the cache + for (size_t i = 250; i > 225; --i) { + wait_until_evicted(i); + } + for (size_t i = 250; i > 225; --i) { + verif_load_chkpt(i); + } + + // Verify history chain + auto hist_chain13 = dbcp.getHistoryChain(13); + for (auto hist_id : {0,1,2,3,4,5,6,7,8,9,10,11,12,13}) { + EXPECT_FALSE(hist_chain13.empty()); + EXPECT_EQUAL(hist_chain13.top(), hist_id); + hist_chain13.pop(); + } + EXPECT_TRUE(hist_chain13.empty()); + + // Verify restore chain + auto rest_chain13 = dbcp.getRestoreChain(13); + for (auto rest_id : {11,12,13}) { + EXPECT_FALSE(rest_chain13.empty()); + EXPECT_EQUAL(rest_chain13.top(), rest_id); + rest_chain13.pop(); + } + EXPECT_TRUE(rest_chain13.empty()); + + // Verify distance to previous snapshot: + // + // Checkpoint ID Snapshot? + // 0 Yes (head) + // 1-10 No + // 11 Yes + // 12-21 No + // 22 Yes + // 23-32 No + // 33 Yes + std::shared_ptr cp; + EXPECT_NOTHROW(cp = dbcp.findCheckpoint(33, true)); + EXPECT_EQUAL(cp->getDistanceToPrevSnapshot(), 0); + EXPECT_NOTHROW(cp = dbcp.findCheckpoint(32, true)); + EXPECT_EQUAL(cp->getDistanceToPrevSnapshot(), 10); + EXPECT_NOTHROW(cp = dbcp.findCheckpoint(22, true)); + EXPECT_EQUAL(cp->getDistanceToPrevSnapshot(), 0); + EXPECT_NOTHROW(cp = dbcp.findCheckpoint(5, true)); + EXPECT_EQUAL(cp->getDistanceToPrevSnapshot(), 5); + + // Nothing to test, just call dumpRestoreChain() + dbcp.dumpRestoreChain(std::cout, 32); + + // Go back to checkpoint 1 + verif_load_chkpt(1); + + // Take 3 more checkpoints with IDs 2, 3, and 4 + step_checkpointer(2); + step_checkpointer(3); + step_checkpointer(4); + + // Go back to head + verif_load_chkpt(head_id); + + // Take some checkpoints and ensure that the current ID is always increasing by 1 with no gaps + step_checkpointer(1); + step_checkpointer(2); + step_checkpointer(3); + verif_load_chkpt(2); + verif_load_chkpt(1); + verif_load_chkpt(head_id); + + // Ensure exception is thrown when loading a non-existent checkpoint + EXPECT_THROW(dbcp.loadCheckpoint(9999)); + + // Ensure findCheckpoint() throws when must_exist=true and checkpoint does not exist + EXPECT_THROW(dbcp.findCheckpoint(9999, true)); + EXPECT_NOTHROW(dbcp.findCheckpoint(9999, false)); + + // Create checkpoints 1-50. + for (uint32_t i = 1; i <= 50; ++i) { + step_checkpointer(i); + } + + // Verify checkpoint chain: 0-50 + auto chain = dbcp.getCheckpointChain(dbcp.getCurrentID()); + EXPECT_EQUAL(chain.size(), 51); + uint32_t chain_idx = 0; + for (uint32_t i = 0; i <= 50; ++i) { + EXPECT_EQUAL(chain[chain_idx++], 50-i); + } + + // Sleep for a bit to flush the pipeline to ensure the checkpoint chain + // can be retrieved from the database. + std::this_thread::sleep_for(std::chrono::seconds(1)); + chain = dbcp.getCheckpointChain(dbcp.getCurrentID()); + EXPECT_EQUAL(chain.size(), 51); + chain_idx = 0; + for (uint32_t i = 0; i <= 50; ++i) { + EXPECT_EQUAL(chain[chain_idx++], 50-i); + } + + // Load checkpoint 45 + verif_load_chkpt(45); + + // Verify that checkpoints 46+ have been implicitly deleted + EXPECT_FALSE(dbcp.hasCheckpoint(46)); + + // Create checkpoints 46-55 + for (uint32_t i = 46; i <= 55; ++i) { + step_checkpointer(i); + } + + // Verify checkpoint chain: 0-55 + chain = dbcp.getCheckpointChain(dbcp.getCurrentID()); + EXPECT_EQUAL(chain.size(), 56); + chain_idx = 0; + for (uint32_t i = 0; i <= 55; ++i) { + EXPECT_EQUAL(chain[chain_idx++], 55-i); + } + + // Create checkpoints 56-58 + for (uint32_t i = 56; i <= 58; ++i) { + step_checkpointer(i); + } + + // Delete checkpoint always throws + EXPECT_THROW(dbcp.deleteCheckpoint(57)); + + // Create checkpoints 59-70 + for (uint32_t i = 59; i <= 70; ++i) { + step_checkpointer(i); + } + + // Load checkpoint 58 + verif_load_chkpt(58); + + // Verify all checkpoints: 0-58 + auto all_chkpts = dbcp.getCheckpoints(); + EXPECT_EQUAL(all_chkpts.size(), 59); + EXPECT_EQUAL(dbcp.getNumCheckpoints(), 59); + uint32_t idx = 0; + for (uint32_t i = 0; i <= 58; ++i) { + EXPECT_EQUAL(all_chkpts[idx++], i); + } + EXPECT_EQUAL(idx, all_chkpts.size()); + + // Create checkpoints 59-75 + for (uint32_t i = 59; i <= 75; ++i) { + step_checkpointer(i); + } + + // Verify all checkpoints: 0-75 + all_chkpts = dbcp.getCheckpoints(); + EXPECT_EQUAL(all_chkpts.size(), 76); + EXPECT_EQUAL(dbcp.getNumCheckpoints(), 76); + idx = 0; + for (uint32_t i = 0; i <= 75; ++i) { + EXPECT_EQUAL(all_chkpts[idx++], i); + } + EXPECT_EQUAL(idx, all_chkpts.size()); + + // Nothing to test, just call dumpRestoreChain() + EXPECT_NOTHROW(dbcp.dumpRestoreChain(std::cout, 73)); + + // Verify history chain up to current checkpoint + size_t all_idx = 0; + auto history_chain = dbcp.getHistoryChain(dbcp.getCurrentID()); + while (!history_chain.empty()) { + EXPECT_EQUAL(history_chain.top(), all_chkpts[all_idx++]); + history_chain.pop(); + } + + // Verify restore chain up to current checkpoint + auto restore_chain = dbcp.getRestoreChain(dbcp.getCurrentID()); + auto id = restore_chain.top(); + restore_chain.pop(); + std::shared_ptr chkpt; + EXPECT_NOTHROW(chkpt = dbcp.findCheckpoint(id, true)); + auto c = chkpt; + EXPECT_NOTEQUAL(c, nullptr); + EXPECT_TRUE(c->isSnapshot()); + + while (!restore_chain.empty()) { + id = restore_chain.top(); + restore_chain.pop(); + EXPECT_NOTHROW(chkpt = dbcp.findCheckpoint(id, true)); + c = chkpt; + EXPECT_NOTEQUAL(c, nullptr); + EXPECT_FALSE(c->isSnapshot()); + } + + // To check the getCheckpointsAt() method, go back to the head + // checkpoint. Then take a bunch of checkpoints at tick 1, 2, and 3. + verif_load_chkpt(head_id); + EXPECT_EQUAL(sched.getCurrentTick(), initial_tick); + + std::vector chkpts_at_1; + for (uint32_t i = 1; i <= 300; ++i) { + const bool step_sched = (i == 1); + auto id = step_checkpointer(i, step_sched); + EXPECT_EQUAL(sched.getCurrentTick(), 1 + initial_tick); + chkpts_at_1.push_back(id); + } + + std::vector chkpts_at_2; + for (uint32_t i = 301; i <= 500; ++i) { + const bool step_sched = (i == 301); + auto id = step_checkpointer(i, step_sched); + EXPECT_EQUAL(sched.getCurrentTick(), 2 + initial_tick); + chkpts_at_2.push_back(id); + } + + std::vector chkpts_at_3; + for (uint32_t i = 501; i <= 700; ++i) { + const bool step_sched = (i == 501); + auto id = step_checkpointer(i, step_sched); + EXPECT_EQUAL(sched.getCurrentTick(), 3 + initial_tick); + chkpts_at_3.push_back(id); + } + + EXPECT_EQUAL(dbcp.getCheckpointsAt(1 + initial_tick), chkpts_at_1); + EXPECT_EQUAL(dbcp.getCheckpointsAt(2 + initial_tick), chkpts_at_2); + EXPECT_EQUAL(dbcp.getCheckpointsAt(3 + initial_tick), chkpts_at_3); + + // Wait for the older checkpoints to be evicted and + // verify getCheckpointsAt() again. + wait_until_evicted(chkpts_at_1.back()); + wait_until_evicted(chkpts_at_2.back()); + + EXPECT_EQUAL(dbcp.getCheckpointsAt(1 + initial_tick), chkpts_at_1); + EXPECT_EQUAL(dbcp.getCheckpointsAt(2 + initial_tick), chkpts_at_2); + EXPECT_EQUAL(dbcp.getCheckpointsAt(3 + initial_tick), chkpts_at_3); + + // Verify the findLatestCheckpointAtOrBefore() method. + // Valid tick (2), invalid ID (9999) + EXPECT_THROW(dbcp.findLatestCheckpointAtOrBefore(2, 9999)); + + // Valid ID (1), but tick is before the head checkpoint + if (initial_tick > 0) { + EXPECT_EQUAL(dbcp.findLatestCheckpointAtOrBefore(initial_tick - 1, 1), nullptr); + } + + // Valid tick (2), valid ID + EXPECT_NOTHROW(chkpt = dbcp.findLatestCheckpointAtOrBefore(2 + initial_tick, chkpts_at_2.back())); + EXPECT_EQUAL(chkpt->getID(), chkpts_at_2.back()); + EXPECT_EQUAL(chkpt->getTick(), 2 + initial_tick); + + // Valid tick (2), valid ID + EXPECT_NOTHROW(chkpt = dbcp.findLatestCheckpointAtOrBefore(2 + initial_tick, chkpts_at_3.back())); + EXPECT_EQUAL(chkpt->getID(), chkpts_at_2.back()); + EXPECT_EQUAL(chkpt->getTick(), 2 + initial_tick); + + // Verify that the head checkpoint is in the cache until simulation teardown. + EXPECT_TRUE(dbcp.isCheckpointCached(head_id)); + + // Finish + app_mgr.postSimLoopTeardown(); + root.enterTeardown(); + clocks.enterTeardown(); + + // Ensure that the head checkpoint is no longer in the cache + EXPECT_FALSE(dbcp.isCheckpointCached(head_id)); +} + +int main() +{ + auto warn_cerr = std::make_unique( + sparta::TreeNode::getVirtualGlobalNode(), + sparta::log::categories::WARN, + std::cerr); + + auto warn_file = std::make_unique( + sparta::TreeNode::getVirtualGlobalNode(), + sparta::log::categories::WARN, + "warnings.log"); + + // Run the test with initial scheduler tick = 0, + // i.e. head checkpoint at tick 0 + RunCheckpointerTest(0); + + // Run the test with initial scheduler tick = 10, + // i.e. head checkpoint at tick 10 + RunCheckpointerTest(10); + + REPORT_ERROR; + return ERROR_CODE; +} diff --git a/sparta/test/FastCheckpoint/CMakeLists.txt b/sparta/test/FastCheckpoint/CMakeLists.txt index 19c186e058..2bc9fbd9ac 100644 --- a/sparta/test/FastCheckpoint/CMakeLists.txt +++ b/sparta/test/FastCheckpoint/CMakeLists.txt @@ -6,4 +6,3 @@ sparta_test(FastCheckpoint_test FastCheckpoint_test_RUN) add_subdirectory(FILEStream) add_subdirectory(PersistentFastCheckpoint) - diff --git a/sparta/test/FastCheckpoint/FastCheckpoint_test.cpp b/sparta/test/FastCheckpoint/FastCheckpoint_test.cpp index 485803764b..833ac322c2 100644 --- a/sparta/test/FastCheckpoint/FastCheckpoint_test.cpp +++ b/sparta/test/FastCheckpoint/FastCheckpoint_test.cpp @@ -364,7 +364,7 @@ void generalTest() // Look at a restore chain - auto* cp20 = (fcp.findInternalCheckpoint(20)); + auto* cp20 = (fcp.findCheckpoint(20)); auto rc20 = cp20->getRestoreChain(); EXPECT_EQUAL(rc20.size(), 6); // 0 -> 16 -> 17 -> * -> 19 -> 20 std::cout << "\nRestore chain for cp 20:" << std::endl; @@ -394,11 +394,11 @@ void generalTest() auto cpA = fcp.createCheckpoint(); ////r1->write(0xbbbb); std::cout << "Dumping restore chain for cpA (" << cpA << ")" << std::endl; - fcp.findInternalCheckpoint(cpA)->dumpRestoreChain(std::cout); + fcp.findCheckpoint(cpA)->dumpRestoreChain(std::cout); std::cout << std::endl; continues.clear(); fcp.dumpBranch(std::cout, - fcp.findCheckpoint(cpP), + cpP, 0, 0, continues); @@ -407,11 +407,11 @@ void generalTest() auto cpC = fcp.createCheckpoint(); //////fcp.deleteCheckpoint(cpA); std::cout << "Dumping restore chain for cpC (" << cpC << ")" << std::endl; - fcp.findInternalCheckpoint(cpC)->dumpRestoreChain(std::cout); + fcp.findCheckpoint(cpC)->dumpRestoreChain(std::cout); std::cout << std::endl; continues.clear(); fcp.dumpBranch(std::cout, - fcp.findCheckpoint(cpP), + cpP, 0, 0, continues); @@ -420,7 +420,7 @@ void generalTest() fcp.deleteCheckpoint(cpC); continues.clear(); fcp.dumpBranch(std::cout, - fcp.findCheckpoint(cpP), + cpP, 0, 0, continues); @@ -429,7 +429,7 @@ void generalTest() fcp.deleteCheckpoint(cpA); continues.clear(); fcp.dumpBranch(std::cout, - fcp.findCheckpoint(cpP), + cpP, 0, 0, continues); @@ -438,11 +438,11 @@ void generalTest() auto cpB = fcp.createCheckpoint(); fcp.loadCheckpoint(cpB); std::cout << "Dumping restore chain for cpB (" << cpB << ")" << std::endl; - fcp.findInternalCheckpoint(cpB)->dumpRestoreChain(std::cout); + fcp.findCheckpoint(cpB)->dumpRestoreChain(std::cout); std::cout << std::endl; continues.clear(); fcp.dumpBranch(std::cout, - fcp.findCheckpoint(cpP), + cpP, 0, 0, continues);