Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions be/src/exec/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,11 @@ bool PipelineTask::is_blockable() const {
_sink->is_blockable(_state);
}

void PipelineTask::_stop_accepting_submit() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我们调用这个is block 方法的时候,一定有整个fragment context 的shared ptr,为啥会出现析构的问题?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is blockable里调的是sink operator这些东西,PipelineTask在finalize/close里会释放相关资源

std::unique_lock<std::mutex> lock(_blockable_check_lock);
_accept_submit = false;
}

bool PipelineTask::_is_blocked() {
// `_dry_run = true` means we do not need data from source operator.
if (!_dry_run) {
Expand Down Expand Up @@ -887,6 +892,7 @@ Status PipelineTask::finalize() {
return Status::OK();
}
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(fragment->get_query_ctx()->query_mem_tracker());
_stop_accepting_submit();
// Synchronize with unblock_all_dependencies() before clearing state used by wake_up()->submit().
std::unique_lock<std::mutex> lock(_dependency_lifecycle_lock);
RETURN_IF_ERROR(_state_transition(State::FINALIZED));
Expand All @@ -901,6 +907,9 @@ Status PipelineTask::finalize() {
}

Status PipelineTask::close(Status exec_status, bool close_sink) {
if (close_sink) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

在finalize 里加了之后,这里为什么还要加?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

op->close / sink->close 这些东西本质也是写操作,担心会让is blockable这个读操作不安全。而且理论上进close sink了也不需要再submit了

不过实际执行逻辑是 [close_task] -> [task->close] -> [task->finalize] 。可以只在close sink里加,finalize里不加

_stop_accepting_submit();
}
int64_t close_ns = 0;
Status s;
{
Expand Down
9 changes: 9 additions & 0 deletions be/src/exec/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

Expand Down Expand Up @@ -187,6 +188,10 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
PipelineTask() : _index(0) {}

private:
friend class HybridTaskScheduler;

void _stop_accepting_submit();

// Whether this task is blocked before execution (FE 2-phase commit trigger, runtime filters)
bool _wait_to_start();
// Whether this task is blocked during execution (read dependency, write dependency)
Expand Down Expand Up @@ -276,6 +281,10 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
// call wake_up() and submit this task, so close()/finalize() must not clear operator/shared
// state until forced unblocking finishes. wake_up() must not take this lock.
std::mutex _dependency_lifecycle_lock;
// Guards _accept_submit and keeps HybridTaskScheduler::submit() from reading _sink/_operators
// in is_blockable() while terminal close/finalize is closing the submit gate.
std::mutex _blockable_check_lock;
bool _accept_submit = true;

std::atomic<bool> _running {false};
std::atomic<bool> _eos {false};
Expand Down
10 changes: 9 additions & 1 deletion be/src/exec/pipeline/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,15 @@ void TaskScheduler::stop() {
}

Status HybridTaskScheduler::submit(PipelineTaskSPtr task) {
if (task->is_blockable()) {
bool blockable = false;
{
std::unique_lock<std::mutex> blockable_check_lock(task->_blockable_check_lock);
if (!task->_accept_submit) {
return Status::OK();
}
blockable = task->is_blockable();
}
if (blockable) {
return _blocking_scheduler.submit(task);
} else {
return _simple_scheduler.submit(task);
Expand Down
102 changes: 102 additions & 0 deletions be/test/exec/pipeline/pipeline_task_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <glog/logging.h>
#include <gtest/gtest.h>

#include <atomic>
#include <chrono>
#include <functional>
#include <future>
Expand Down Expand Up @@ -113,6 +114,26 @@ class BlockableSubmitTaskScheduler : public MockTaskScheduler {
std::function<void(PipelineTaskSPtr)> on_submit;
};

class CountingBlockableSinkOperator final : public DataSinkOperatorX<DummySinkLocalState> {
public:
CountingBlockableSinkOperator(int op_id, int node_id, int dest_id,
std::atomic<int>* blockable_checks)
: DataSinkOperatorX<DummySinkLocalState>(op_id, node_id, dest_id),
_blockable_checks(blockable_checks) {}

Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
return Status::OK();
}

bool is_blockable(RuntimeState* state) const override {
_blockable_checks->fetch_add(1, std::memory_order_relaxed);
return DataSinkOperatorX<DummySinkLocalState>::is_blockable(state);
}

private:
std::atomic<int>* _blockable_checks;
};

TEST_F(PipelineTaskTest, TEST_CONSTRUCTOR) {
auto num_instances = 1;
auto pip_id = 0;
Expand Down Expand Up @@ -662,6 +683,87 @@ TEST_F(PipelineTaskTest, TEST_WAKE_UP_SUBMIT_PROTECTED_FROM_FINALIZE) {
EXPECT_TRUE(task->_operators.empty());
}

TEST_F(PipelineTaskTest, TEST_CLOSED_TASK_REJECTS_HYBRID_SUBMIT_BEFORE_FINALIZE) {
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
OperatorPtr source_op;
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());

int op_id = 1;
int node_id = 2;
int dest_id = 3;
std::atomic<int> blockable_checks = 0;
DataSinkOperatorPtr sink_op;
sink_op.reset(new CountingBlockableSinkOperator(op_id, node_id, dest_id, &blockable_checks));
EXPECT_TRUE(pip->set_sink(sink_op).ok());

auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
_runtime_state->resize_op_id_to_local_state(-1);
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;

std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
TDataSink tsink;
EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
EXPECT_TRUE(task->close(Status::OK()).ok());
EXPECT_EQ(task->_exec_state, PipelineTask::State::FINISHED);
EXPECT_NE(task->_sink, nullptr);
EXPECT_FALSE(task->_operators.empty());
EXPECT_FALSE(task->_accept_submit);

HybridTaskScheduler scheduler(1, 1, "test_hybrid_task_scheduler", nullptr);
EXPECT_TRUE(scheduler.submit(task).ok());
EXPECT_EQ(blockable_checks.load(std::memory_order_relaxed), 0);
scheduler.stop();
EXPECT_TRUE(task->finalize().ok());
}

TEST_F(PipelineTaskTest, TEST_FINALIZED_TASK_REJECTS_HYBRID_SUBMIT) {
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
OperatorPtr source_op;
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());

int op_id = 1;
int node_id = 2;
int dest_id = 3;
DataSinkOperatorPtr sink_op;
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
EXPECT_TRUE(pip->set_sink(sink_op).ok());

auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
_runtime_state->resize_op_id_to_local_state(-1);
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;

std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
TDataSink tsink;
EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
EXPECT_TRUE(task->close(Status::OK()).ok());
Comment thread
BiteTheDDDDt marked this conversation as resolved.
EXPECT_TRUE(task->finalize().ok());
EXPECT_EQ(task->_sink, nullptr);

HybridTaskScheduler scheduler(1, 1, "test_hybrid_task_scheduler", nullptr);
EXPECT_TRUE(scheduler.submit(task).ok());
scheduler.stop();
}

TEST_F(PipelineTaskTest, TEST_SINK_FINISHED) {
auto num_instances = 1;
auto pip_id = 0;
Expand Down
Loading