Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions muduo/net/TimerId.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,19 @@ namespace muduo
namespace net
{

class Timer;

///
/// An opaque identifier, for canceling Timer.
///
class TimerId : public muduo::copyable
{
public:
TimerId()
: timer_(NULL),
sequence_(0)
: sequence_(0)
{
}

TimerId(Timer* timer, int64_t seq)
: timer_(timer),
sequence_(seq)
TimerId(int64_t seq)
: sequence_(seq)
{
}

Expand All @@ -43,7 +39,6 @@ class TimerId : public muduo::copyable
friend class TimerQueue;

private:
Timer* timer_;
int64_t sequence_;
};

Expand Down
77 changes: 47 additions & 30 deletions muduo/net/TimerQueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "muduo/net/Timer.h"
#include "muduo/net/TimerId.h"

#include <stdint.h>
#include <sys/timerfd.h>
#include <unistd.h>

Expand Down Expand Up @@ -107,20 +108,18 @@ TimerQueue::~TimerQueue()
timerfdChannel_.remove();
::close(timerfd_);
// do not remove channel, since we're in EventLoop::dtor();
for (const Entry& timer : timers_)
{
delete timer.second;
}
timerMap_.clear();
}

TimerId TimerQueue::addTimer(TimerCallback cb,
Timestamp when,
double interval)
{
Timer* timer = new Timer(std::move(cb), when, interval);
int64_t timerId = nextTimerId_.incrementAndGet();
std::shared_ptr<TimerCallback> cbPtr(new TimerCallback(std::move(cb)));
loop_->runInLoop(
std::bind(&TimerQueue::addTimerInLoop, this, timer));
return TimerId(timer, timer->sequence());
std::bind(&TimerQueue::addTimerInLoop, this, timerId, cbPtr, when, interval));
return TimerId(timerId);
}

void TimerQueue::cancel(TimerId timerId)
Expand All @@ -129,33 +128,48 @@ void TimerQueue::cancel(TimerId timerId)
std::bind(&TimerQueue::cancelInLoop, this, timerId));
}

void TimerQueue::addTimerInLoop(Timer* timer)
void TimerQueue::addTimerInLoop(int64_t timerId,
const std::shared_ptr<TimerCallback>& cb,
Timestamp when,
double interval)
{
loop_->assertInLoopThread();
bool earliestChanged = insert(timer);
std::pair<TimerMap::iterator, bool> result
= timerMap_.insert(std::make_pair(timerId,
std::unique_ptr<Timer>(new Timer(std::move(*cb), when, interval))));
assert(result.second); (void)result;

bool earliestChanged = insert(timerId);

if (earliestChanged)
{
resetTimerfd(timerfd_, timer->expiration());
TimerMap::iterator timer = timerMap_.find(timerId);
assert(timer != timerMap_.end());
resetTimerfd(timerfd_, timer->second->expiration());
}
}

void TimerQueue::cancelInLoop(TimerId timerId)
{
loop_->assertInLoopThread();
assert(timers_.size() == activeTimers_.size());
ActiveTimer timer(timerId.timer_, timerId.sequence_);
ActiveTimerSet::iterator it = activeTimers_.find(timer);
ActiveTimerSet::iterator it = activeTimers_.find(timerId.sequence_);
if (it != activeTimers_.end())
{
size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
TimerMap::iterator timer = timerMap_.find(timerId.sequence_);
assert(timer != timerMap_.end());
size_t n = timers_.erase(Entry(timer->second->expiration(), timerId.sequence_));
assert(n == 1); (void)n;
delete it->first; // FIXME: no delete please
timerMap_.erase(timer);
activeTimers_.erase(it);
}
else if (callingExpiredTimers_)
{
cancelingTimers_.insert(timer);
cancelingTimers_.insert(timerId.sequence_);
}
else
{
timerMap_.erase(timerId.sequence_);
}
assert(timers_.size() == activeTimers_.size());
}
Expand All @@ -173,7 +187,9 @@ void TimerQueue::handleRead()
// safe to callback outside critical section
for (const Entry& it : expired)
{
it.second->run();
TimerMap::iterator timer = timerMap_.find(it.second);
assert(timer != timerMap_.end());
timer->second->run();
}
callingExpiredTimers_ = false;

Expand All @@ -184,16 +200,15 @@ std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
assert(timers_.size() == activeTimers_.size());
std::vector<Entry> expired;
Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
Entry sentry(now, INT64_MAX);
TimerList::iterator end = timers_.lower_bound(sentry);
assert(end == timers_.end() || now < end->first);
std::copy(timers_.begin(), end, back_inserter(expired));
timers_.erase(timers_.begin(), end);

for (const Entry& it : expired)
{
ActiveTimer timer(it.second, it.second->sequence());
size_t n = activeTimers_.erase(timer);
size_t n = activeTimers_.erase(it.second);
assert(n == 1); (void)n;
}

Expand All @@ -207,23 +222,23 @@ void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)

for (const Entry& it : expired)
{
ActiveTimer timer(it.second, it.second->sequence());
if (it.second->repeat()
&& cancelingTimers_.find(timer) == cancelingTimers_.end())
TimerMap::iterator timer = timerMap_.find(it.second);
assert(timer != timerMap_.end());
if (timer->second->repeat()
&& cancelingTimers_.find(it.second) == cancelingTimers_.end())
{
it.second->restart(now);
timer->second->restart(now);
insert(it.second);
}
else
{
// FIXME move to a free list
delete it.second; // FIXME: no delete please
timerMap_.erase(timer);
}
}

if (!timers_.empty())
{
nextExpire = timers_.begin()->second->expiration();
nextExpire = timers_.begin()->first;
}

if (nextExpire.valid())
Expand All @@ -232,25 +247,27 @@ void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
}
}

bool TimerQueue::insert(Timer* timer)
bool TimerQueue::insert(int64_t timerId)
{
loop_->assertInLoopThread();
assert(timers_.size() == activeTimers_.size());
bool earliestChanged = false;
Timestamp when = timer->expiration();
TimerMap::iterator timer = timerMap_.find(timerId);
assert(timer != timerMap_.end());
Timestamp when = timer->second->expiration();
TimerList::iterator it = timers_.begin();
if (it == timers_.end() || when < it->first)
{
earliestChanged = true;
}
{
std::pair<TimerList::iterator, bool> result
= timers_.insert(Entry(when, timer));
= timers_.insert(Entry(when, timerId));
assert(result.second); (void)result;
}
{
std::pair<ActiveTimerSet::iterator, bool> result
= activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
= activeTimers_.insert(timerId);
assert(result.second); (void)result;
}

Expand Down
21 changes: 13 additions & 8 deletions muduo/net/TimerQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
#ifndef MUDUO_NET_TIMERQUEUE_H
#define MUDUO_NET_TIMERQUEUE_H

#include <memory>
#include <set>
#include <unordered_map>
#include <vector>

#include "muduo/base/Atomic.h"
#include "muduo/base/Mutex.h"
#include "muduo/base/Timestamp.h"
#include "muduo/net/Callbacks.h"
Expand Down Expand Up @@ -51,29 +54,31 @@ class TimerQueue : noncopyable

private:

// FIXME: use unique_ptr<Timer> instead of raw pointers.
// This requires heterogeneous comparison lookup (N3465) from C++14
// so that we can find an T* in a set<unique_ptr<T>>.
typedef std::pair<Timestamp, Timer*> Entry;
typedef std::pair<Timestamp, int64_t> Entry;
typedef std::set<Entry> TimerList;
typedef std::pair<Timer*, int64_t> ActiveTimer;
typedef std::set<ActiveTimer> ActiveTimerSet;
typedef std::unordered_map<int64_t, std::unique_ptr<Timer>> TimerMap;
typedef std::set<int64_t> ActiveTimerSet;

void addTimerInLoop(Timer* timer);
void addTimerInLoop(int64_t timerId,
const std::shared_ptr<TimerCallback>& cb,
Timestamp when,
double interval);
void cancelInLoop(TimerId timerId);
// called when timerfd alarms
void handleRead();
// move out all expired timers
std::vector<Entry> getExpired(Timestamp now);
void reset(const std::vector<Entry>& expired, Timestamp now);

bool insert(Timer* timer);
bool insert(int64_t timerId);

EventLoop* loop_;
AtomicInt64 nextTimerId_;
const int timerfd_;
Channel timerfdChannel_;
// Timer list sorted by expiration
TimerList timers_;
TimerMap timerMap_;

// for cancel()
ActiveTimerSet activeTimers_;
Expand Down