Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions be/src/scheduling/cluster-membership-mgr-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,13 @@ TEST(ClusterMembershipMgrUnitTest, TestPopulateExpectedExecGroupSets) {
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.msg().GetFullMessageDetails(),
"Executor group set prefix specified multiple times: group-prefix1:10\n");

// Case 10: Star in FLAGS_expected_executor_group_sets
FLAGS_expected_executor_group_sets = "*";
expected_exec_group_sets.clear();
status = ClusterMembershipMgr::PopulateExpectedExecGroupSets(expected_exec_group_sets);
EXPECT_TRUE(status.ok());
EXPECT_EQ(expected_exec_group_sets.size(), 0);
}

/// This ensures that all executor group configuration scenarios possible using available
Expand Down Expand Up @@ -838,6 +845,30 @@ TEST(ClusterMembershipMgrUnitTest, PopulateExecutorMembershipRequest) {
EXPECT_EQ(update_req.exec_group_sets[1].exec_group_name_prefix, "bar");
snapshot_ptr->executor_groups.clear();
}

// Case 3: Using executor groups, expected_exec_group_sets is *
{
FLAGS_expected_executor_group_sets = "*";

ExecutorGroup exec_group("foo-group1", 1);
exec_group.AddExecutor(MakeBackendDescriptor(1, exec_group, 0));
snapshot_ptr->executor_groups.insert({exec_group.name(), exec_group});
ExecutorGroup exec_group2("bar-group1", 1);
exec_group2.AddExecutor(MakeBackendDescriptor(1, exec_group2, 1));
exec_group2.AddExecutor(MakeBackendDescriptor(2, exec_group2, 2));
snapshot_ptr->executor_groups.insert({exec_group2.name(), exec_group2});
ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr;
PopulateExecutorMembershipRequest(ptr, empty_exec_group_sets, update_req);
EXPECT_EQ(update_req.exec_group_sets.size(), 2);
// reverse order is ok
EXPECT_EQ(update_req.exec_group_sets[1].curr_num_executors, 1);
EXPECT_EQ(update_req.exec_group_sets[1].expected_num_executors, -1);
EXPECT_EQ(update_req.exec_group_sets[1].exec_group_name_prefix, "foo-group1");
EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 2);
EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, -1);
EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, "bar-group1");
snapshot_ptr->executor_groups.clear();
}
}

template <class T>
Expand Down
17 changes: 17 additions & 0 deletions be/src/scheduling/cluster-membership-mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,17 @@ void PopulateExecutorMembershipRequest(const ClusterMembershipMgr::SnapshotPtr&
}
}
} else {
if (FLAGS_expected_executor_group_sets == "*") {
LOG(INFO) <<
"Special case handling for FLAGS_expected_executor_group_sets == \"*\"";
for (const auto& it : snapshot->executor_groups) {
// order does not matter
exec_group_sets.emplace_back();
exec_group_sets.back().__set_exec_group_name_prefix(it.first);
// We set expected_num_executors to -1 to identify automation from Frontend
exec_group_sets.back().__set_expected_num_executors(-1);
}
} else
if (expected_exec_group_sets.empty()) {
// Add a default exec group set if no expected group sets were specified.
exec_group_sets.emplace_back();
Expand All @@ -745,6 +756,7 @@ void PopulateExecutorMembershipRequest(const ClusterMembershipMgr::SnapshotPtr&
exec_group_sets.insert(exec_group_sets.begin(), expected_exec_group_sets.begin(),
expected_exec_group_sets.end());
}

int matching_exec_groups_found = 0;
for (auto& set : exec_group_sets) {
int max_num_executors = -1;
Expand Down Expand Up @@ -788,6 +800,11 @@ Status ClusterMembershipMgr::PopulateExpectedExecGroupSets(
expected_exec_group_sets.clear();
std::unordered_set<string> parsed_group_prefixes;
vector<StringPiece> groups;

if (FLAGS_expected_executor_group_sets == "*") {
return Status::OK();
}

groups = strings::Split(FLAGS_expected_executor_group_sets, ",", strings::SkipEmpty());
if (groups.empty()) return Status::OK();

Expand Down
3 changes: 2 additions & 1 deletion be/src/service/impala-server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ DEFINE_string(expected_executor_group_sets, "",
"prefix1-group1, prefix1-group2, etc. The expected group size (number of executors "
"in each group) is used during planning when no healthy executor group is available. "
"If this flag is used then any executor groups that do not map to the specified group"
" sets will never be used to schedule queries.");
" sets will never be used to schedule queries. If this flag set to “*“ will populate "
"all healthy resource groups.");

// TODO: can we automatically choose a startup grace period based on the max admission
// control queue timeout + some margin for error?
Expand Down
21 changes: 15 additions & 6 deletions fe/src/main/java/org/apache/impala/service/Frontend.java
Original file line number Diff line number Diff line change
Expand Up @@ -2113,12 +2113,21 @@ public static List<TExecutorGroupSet> setupThresholdsForExecutorGroupSets(

// Executor groups exist in the cluster. Identify those that can be used.
for (TExecutorGroupSet e : executorGroupSets) {
// If defined, request_pool can be a suffix of the group name prefix. For example
// group_set_prefix = root.queue1
// request_pool = queue1
if (StringUtils.isNotEmpty(request_pool)
&& !e.getExec_group_name_prefix().endsWith(request_pool)) {
continue;
if (StringUtils.isNotEmpty(request_pool)) {
// If defined, request_pool can be a suffix of the group name prefix. For example
// group_set_prefix = root.queue1
// request_pool = queue1
if (!e.getExec_group_name_prefix().endsWith(request_pool)
&& e.getExpected_num_executors() >= 0) {
continue;
}
// in case of automation (we set expected_num_executors == -1 in that case) we will have
// group_set_prefix = queue1-1-2-3
// request_pool = queue1
if (!e.getExec_group_name_prefix().startsWith(request_pool)
&& e.getExpected_num_executors() < 0) {
continue;
}
}
TExecutorGroupSet new_entry = new TExecutorGroupSet(e);
if (poolService != null) {
Expand Down