Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion include/Plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ class PlanAction
return _id;
}

/**
 *  Sets the identifier of this action.
 *    @param i value assigned to the action's _id member
 */
void id(int i)
{
_id = i;
}

PlanState state() const
{
return _state;
Expand Down Expand Up @@ -178,7 +183,7 @@ class Plan : public ObjectSQL, public ObjectXML
return _actions;
}

PlanAction* get_next_action();
void get_ready_actions(std::vector<PlanAction *>& ready_actions);

/**
* Mark action as finished, return false if the action is not in the plan
Expand All @@ -200,6 +205,15 @@ class Plan : public ObjectSQL, public ObjectXML
*/
void count_actions(int &cluster_actions, std::map<int, int>& host_actions);

/**
* Merge new actions from an XML plan into the current plan, skipping VMs
* that already have a READY or APPLYING action. Used to append newly
* scheduled VMs to a plan that is already being applied.
*
* @param xml XML string of the incoming plan
*/
void merge_actions(const std::string& xml);

private:
friend class PlanPool;

Expand Down
73 changes: 69 additions & 4 deletions src/schedm/Plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,15 @@ std::string PlanAction::to_xml() const
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */

PlanAction* Plan::get_next_action()
void Plan::get_ready_actions(std::vector<PlanAction *>& ready_actions)
{
for (auto& action : _actions)
{
if (action.state() == PlanState::READY)
{
return &action;
ready_actions.push_back(&action);
}
}

return nullptr;
}

/* -------------------------------------------------------------------------- */
Expand Down Expand Up @@ -220,6 +218,73 @@ void Plan::timeout_actions(int timeout)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */

void Plan::merge_actions(const std::string& xml)
{
Plan incoming;

if (incoming.from_xml(xml) != 0)
{
NebulaLog::error("PLM", "merge_actions: failed to parse incoming plan XML");
return;
}

// Remove terminal actions to keep the plan lean. This prevents unbounded
// growth when many scheduling cycles merge into a long-running plan and
// ensures check_completed() is not blocked by already-finished actions.
// NOTE: do NOT touch APPLYING actions — their IDs are stored in VM history
// records and must remain stable so action_finished() callbacks match.
std::vector<PlanAction> active_actions;

for (const auto& a : _actions)
{
if (a.state() != PlanState::DONE &&
a.state() != PlanState::ERROR &&
a.state() != PlanState::TIMEOUT)
{
active_actions.push_back(a);
}
}

_actions = std::move(active_actions);

// Build set of VM IDs that already have an active (READY or APPLYING) action
std::set<int> active_vms;

// Find the highest ID in use so new actions don't collide with any
// in-flight APPLYING actions whose IDs are stored in VM history records.
int next_id = 0;

for (const auto& a : _actions)
{
active_vms.insert(a.vm_id());
next_id = std::max(next_id, a.id() + 1);
}

// Append only new VMs not already present in the plan
int appended = 0;

for (const auto& a : incoming.actions())
{
if (active_vms.count(a.vm_id()) != 0)
{
continue;
}

_actions.push_back(a);
_actions.back().id(next_id++);
appended++;
}

if (appended > 0)
{
NebulaLog::info("PLM", "merge_actions: appended " + std::to_string(appended)
+ " new actions to the running placement plan");
}
}

/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */

void Plan::count_actions(int &cluster_actions, std::map<int, int>& host_actions)
{
for (const auto& a : _actions)
Expand Down
27 changes: 18 additions & 9 deletions src/schedm/PlanManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,20 @@ void PlanManager::add_plan(const string& xml)
{
if (plan.cid() == -1)
{
NebulaLog::info("PLM", "Adding new placement plan");

if (cplan->state() == PlanState::APPLYING)
{
NebulaLog::info("PLM", "Cannot add plan. A placement plan is already in progress.");
return;
NebulaLog::info("PLM", "Merging new actions into running placement plan.");

cplan->merge_actions(xml);
}
else
{
NebulaLog::info("PLM", "Adding new placement plan");

cplan->from_xml(xml);
cplan->from_xml(xml);

cplan->state(PlanState::APPLYING);
cplan->state(PlanState::APPLYING);
}
}
else if (auto cluster = cluster_pool->get_ro(plan.cid()))
{
Expand Down Expand Up @@ -404,18 +407,24 @@ void PlanManager::execute_plan(Plan& plan)
// Update counter, num of running actions per host, per cluster
map<int, int> host_actions;
int cluster_actions = 0;
std::vector<PlanAction *> ready_actions;

plan.count_actions(cluster_actions, host_actions);
plan.get_ready_actions(ready_actions);

// Execute plan actions
while (auto action = plan.get_next_action())
for (auto& action : ready_actions)
{
if (host_actions[action->host_id()] >= max_actions_per_host
|| cluster_actions >= max_actions_per_cluster)
if (cluster_actions >= max_actions_per_cluster)
{
break;
}

if (host_actions[action->host_id()] >= max_actions_per_host)
{
continue;
}

if (start_action(plan.cid(), *action))
{
host_actions[action->host_id()]++;
Expand Down
4 changes: 2 additions & 2 deletions src/schedm/SchedulerManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ void SchedulerManager::trigger_place()
return;
}

//TODO Check a PLACE planning is not being applied
// New actions will be merged into the running plan if one exists
//send place request to driver, reset window
last_place = the_time;
wnd_start = 0;
Expand Down Expand Up @@ -288,7 +288,7 @@ void SchedulerManager::timer_action()

NebulaLog::ddebug("SCMT", oss.str());

//TODO Check there is no placement plan active
// New actions will be merged into the running plan if one exists

if (!expired && !pending)
{
Expand Down