Skip to content

为支持高效快照功能的状态机设计更高效的checkpoint机制 #222

@dyx2025

Description

@dyx2025

为支持高效快照功能的状态机设计更高效的checkpoint机制。

目前的checkpoint机制,主要由Replayer,StateMachine,CheckpointSender,CheckpointReceiver和Learner协同完成。

  1. Replayer确定需要应用到checkpoint的paxos log日志,然后通过Replayer::PlayOne调用SMFac::ExecuteForCheckpoint,把paxos log一条条地应用到checkpoint。
  2. CheckpointSender在需要发送checkpoint给远端节点的时候,先暂停Replayer应用paxoslog,避免发送过程中checkpoint被修改。然后,CheckpointSender通过SendCheckpointFofaSM::GetCheckpointState获取状态机对应的checkpoint文件。之后通过CheckpointSender ::SendFile把checkpoint文件发送到远端节点的CheckpointReceiver。
  3. CheckpointReceiver通过CheckpointReceiver::ReceiveCheckpoint接收checkpoint文件,并存储在本地。
  4. Learner在接收完checkpoint文件后调用Learner::OnSendCheckpoint_End,再调用StateMachine::LoadCheckpointState把存储在本地的checkpoint文件应用到对应的状态机。

目前checkpoint机制的固定开销:

  1. Replayer读取磁盘的paxos log。
  2. Replayer调用SMFac::ExecuteForCheckpoint的开销。

目前checkpoint机制取决于状态机实现的开销:

  1. 状态机数据和checkpoint数据分离时,checkpoint需要格外的磁盘空间。写量越多,SSD盘寿命越短。
  2. 状态机数据和checkpoint数据不分离时,分两种情况。
    2.1. 状态机支持高效快照功能,生成快照句柄堵塞状态机读写的时间极短,读取快照内容不会阻塞状态机的读写操作。
    2.2. 状态机不支持高效快照功能,生成快照句柄或读取快照内容需要长时间阻塞态机的写操作。

如果单个paxos group的所有支持checkpoint的状态机都支持高效快照功能,目前checkpoint机制的固定开销对这个paxos group完全没有必要。
为什么有如此限定条件呢?原因如下:

  1. 某些状态机不支持checkpoint功能(如SystemVSM和MasterStateMachine),没必要承担目前checkpoint机制的固定开销。
  2. 如果某些状态机不支持高效快照,正常情况下应该选择状态机数据和checkpoint数据分离,而不是生成快照句柄或读取快照内容时长时间阻塞态机的写操作 。而前者需要目前checkpoint机制的固定开销。

目前很多流行的单机存储引擎都支持高效的快照机制,如leveldb,rocksdb等。
以leveldb为例,生成快照句柄的时间极短,只需要加一次互斥锁,然后对memtable,immemtable和current_version增加引用计数器,然后返回memtable,mmemtable,current_version的对象指针和当前已应用的最大sequence即可。
由于有引用计数器的存在,memtable,immemtable和current_version在读取快照内容时不会被释放。
由于immemtable和current_version在创建和释放期间是不变体,读取快照内容期间它们不会被修改,而且读取它们不会堵塞其他的读写操作。
由于memtable的无锁实现和只插入不删除的特性(删除数据操作通过插入带有删除标志位的数据),可以安全地读取其内容,也不会堵塞其他的读写操作。
由于有当前已应用的最大sequence作为屏障,新写入的数据在快照中不可见。
leveldb获取快照句柄函数在 https://github.com/google/leveldb/blob/main/db/db_impl.cc 的 DBImpl::NewIterator 。

新checkpoint机制如下(以下讨论范围为单个paxos group):

  1. 如果所有支持checkpoint的状态机都支持高效快照功能,则使用高效快照机制,禁用Replayer原有的应用paxos log功能。
  2. 使用高效快照机制时,每次状态机列表成功执行完一个paxos log的时候,检查是否需要获取所有支持checkpoint的状态机的快照句柄(检查条件可以设置为每N个paxos log获取一次快照)和本次执行的instanceid。
  3. 使用高效快照机制时,CheckpointSender获取所有支持checkpoint的状态机的快照句柄及其对应的instanceid,然后读取快照内容构造checkpoint,最后把checkpoint发送到远端的节点。

使用高效快照机制时,获取所有支持checkpoint的状态机的快照句柄和本次执行的instanceid的伪代码如下。
src/algorithm/instance.cpp

int Instance :: Init()
{
    // ...

    uint64_t llCPInstanceID = m_oCheckpointMgr.GetCheckpointInstanceID() + 1;

    PLGImp("Acceptor.OK, Log.InstanceID %lu Checkpoint.InstanceID %lu", 
            m_oAcceptor.GetInstanceID(), llCPInstanceID);

    bool bPlayed = false;
    uint64_t llNowInstanceID = llCPInstanceID;
    if (llNowInstanceID < m_oAcceptor.GetInstanceID())
    {
        ret = PlayLog(llNowInstanceID, m_oAcceptor.GetInstanceID());
        if (ret != 0)
        {
            return ret;
        }

        PLGImp("PlayLog OK, begin instanceid %lu end instanceid %lu", llNowInstanceID, m_oAcceptor.GetInstanceID());

        llNowInstanceID = m_oAcceptor.GetInstanceID();
    }
    else
    {
        if (llNowInstanceID > m_oAcceptor.GetInstanceID())
        {
            ret = ProtectionLogic_IsCheckpointInstanceIDCorrect(llNowInstanceID, m_oAcceptor.GetInstanceID());
            if (ret != 0)
            {
                return ret;
            }
            m_oAcceptor.InitForNewPaxosInstance();
        }
        
        m_oAcceptor.SetInstanceID(llNowInstanceID);
    }

     // 使用高效快照机制时,获取所有支持checkpoint的状态机的快照句柄和本次执行的instanceid
     if (gUseEffectiveSnapshot && !gHasEffectiveSnapshot ) {
         // 获取所有支持checkpoint的状态机的快照句柄和本次执行的instanceid   
     }

    PLGImp("NowInstanceID %lu", llNowInstanceID);

    m_oLearner.SetInstanceID(llNowInstanceID);
    m_oProposer.SetInstanceID(llNowInstanceID);
    m_oProposer.SetStartProposalID(m_oAcceptor.GetAcceptorState()->GetPromiseBallot().m_llProposalID + 1);

    m_oCheckpointMgr.SetMaxChosenInstanceID(llNowInstanceID);

     // ...

src/sm-base/sm_base.cpp

bool SMFac :: Execute(const int iGroupIdx, const uint64_t llInstanceID, const std::string & sPaxosValue, SMCtx * poSMCtx)
{
    if (sPaxosValue.size() < sizeof(int))
    {
        PLG1Err("Value wrong, instanceid %lu size %zu", llInstanceID, sPaxosValue.size());
        //need do nothing, just skip
        return true;
    }

    int iSMID = 0;
    memcpy(&iSMID, sPaxosValue.data(), sizeof(int));

    if (iSMID == 0)
    {
        PLG1Imp("Value no need to do sm, just skip, instanceid %lu", llInstanceID);
        return true;
    }

    bool ret = false;
    std::string sBodyValue = string(sPaxosValue.data() + sizeof(int), sPaxosValue.size() - sizeof(int));
    if (iSMID == BATCH_PROPOSE_SMID)
    {
        BatchSMCtx * poBatchSMCtx = nullptr;
        if (poSMCtx != nullptr && poSMCtx->m_pCtx != nullptr)
        {
            poBatchSMCtx = (BatchSMCtx *)poSMCtx->m_pCtx;
        }
        // return BatchExecute(iGroupIdx, llInstanceID, sBodyValue, poBatchSMCtx);
        ret = BatchExecute(iGroupIdx, llInstanceID, sBodyValue, poBatchSMCtx);
    }
    else
    {
        // return DoExecute(iGroupIdx, llInstanceID, sBodyValue, iSMID, poSMCtx);
        ret = DoExecute(iGroupIdx, llInstanceID, sBodyValue, iSMID, poSMCtx);
    }

     if (ret && gUseEffectiveSnapshot && bNeedEffectiveSnapshot) {
         // 获取所有支持checkpoint的状态机的快照句柄和本次执行的instanceid 
         gHasEffectiveSnapshot = true;  
     }

     return ret;
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions