diff --git a/be/src/exec/pipeline/pipeline_task.cpp b/be/src/exec/pipeline/pipeline_task.cpp index dfbb09551457b9..064e34fe2ca07c 100644 --- a/be/src/exec/pipeline/pipeline_task.cpp +++ b/be/src/exec/pipeline/pipeline_task.cpp @@ -322,6 +322,11 @@ bool PipelineTask::is_blockable() const { _sink->is_blockable(_state); } +void PipelineTask::_stop_accepting_submit() { + std::unique_lock lock(_blockable_check_lock); + _accept_submit = false; +} + bool PipelineTask::_is_blocked() { // `_dry_run = true` means we do not need data from source operator. if (!_dry_run) { @@ -887,6 +892,7 @@ Status PipelineTask::finalize() { return Status::OK(); } SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(fragment->get_query_ctx()->query_mem_tracker()); + _stop_accepting_submit(); // Synchronize with unblock_all_dependencies() before clearing state used by wake_up()->submit(). std::unique_lock lock(_dependency_lifecycle_lock); RETURN_IF_ERROR(_state_transition(State::FINALIZED)); @@ -901,6 +907,9 @@ Status PipelineTask::finalize() { } Status PipelineTask::close(Status exec_status, bool close_sink) { + if (close_sink) { + _stop_accepting_submit(); + } int64_t close_ns = 0; Status s; { diff --git a/be/src/exec/pipeline/pipeline_task.h b/be/src/exec/pipeline/pipeline_task.h index 2268c00a4c7fc5..dbe5d57f23dafa 100644 --- a/be/src/exec/pipeline/pipeline_task.h +++ b/be/src/exec/pipeline/pipeline_task.h @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -187,6 +188,10 @@ class PipelineTask : public std::enable_shared_from_this { PipelineTask() : _index(0) {} private: + friend class HybridTaskScheduler; + + void _stop_accepting_submit(); + // Whether this task is blocked before execution (FE 2-phase commit trigger, runtime filters) bool _wait_to_start(); // Whether this task is blocked during execution (read dependency, write dependency) @@ -276,6 +281,10 @@ class PipelineTask : public std::enable_shared_from_this { // call wake_up() and submit this task, so close()/finalize() must not clear operator/shared // state until forced unblocking finishes. wake_up() must not take this lock. std::mutex _dependency_lifecycle_lock; + // Guards _accept_submit and keeps HybridTaskScheduler::submit() from reading _sink/_operators + // in is_blockable() while terminal close/finalize is closing the submit gate. + std::mutex _blockable_check_lock; + bool _accept_submit = true; std::atomic _running {false}; std::atomic _eos {false}; diff --git a/be/src/exec/pipeline/task_scheduler.cpp b/be/src/exec/pipeline/task_scheduler.cpp index a565d5d54ebce6..8e5150370ed257 100644 --- a/be/src/exec/pipeline/task_scheduler.cpp +++ b/be/src/exec/pipeline/task_scheduler.cpp @@ -168,7 +168,15 @@ void TaskScheduler::stop() { } Status HybridTaskScheduler::submit(PipelineTaskSPtr task) { - if (task->is_blockable()) { + bool blockable = false; + { + std::unique_lock blockable_check_lock(task->_blockable_check_lock); + if (!task->_accept_submit) { + return Status::OK(); + } + blockable = task->is_blockable(); + } + if (blockable) { return _blocking_scheduler.submit(task); } else { return _simple_scheduler.submit(task); diff --git a/be/test/exec/pipeline/pipeline_task_test.cpp b/be/test/exec/pipeline/pipeline_task_test.cpp index d6294825288949..3c4adb63fc21dc 100644 --- a/be/test/exec/pipeline/pipeline_task_test.cpp +++ b/be/test/exec/pipeline/pipeline_task_test.cpp @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -113,6 +114,26 @@ class BlockableSubmitTaskScheduler : public MockTaskScheduler { std::function on_submit; }; +class CountingBlockableSinkOperator final : public DataSinkOperatorX { +public: + CountingBlockableSinkOperator(int op_id, int node_id, int dest_id, + std::atomic* blockable_checks) + : DataSinkOperatorX(op_id, node_id, dest_id), + _blockable_checks(blockable_checks) {} + + Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override { + return Status::OK(); + } + + bool is_blockable(RuntimeState* state) const override { + _blockable_checks->fetch_add(1, std::memory_order_relaxed); + return DataSinkOperatorX::is_blockable(state); + } + +private: + std::atomic* _blockable_checks; +}; + TEST_F(PipelineTaskTest, TEST_CONSTRUCTOR) { auto num_instances = 1; auto pip_id = 0; @@ -662,6 +683,87 @@ TEST_F(PipelineTaskTest, TEST_WAKE_UP_SUBMIT_PROTECTED_FROM_FINALIZE) { EXPECT_TRUE(task->_operators.empty()); } +TEST_F(PipelineTaskTest, TEST_CLOSED_TASK_REJECTS_HYBRID_SUBMIT_BEFORE_FINALIZE) { + auto num_instances = 1; + auto pip_id = 0; + auto task_id = 0; + auto pip = std::make_shared(pip_id, num_instances, num_instances); + OperatorPtr source_op; + source_op.reset(new DummyOperator()); + EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok()); + + int op_id = 1; + int node_id = 2; + int dest_id = 3; + std::atomic blockable_checks = 0; + DataSinkOperatorPtr sink_op; + sink_op.reset(new CountingBlockableSinkOperator(op_id, node_id, dest_id, &blockable_checks)); + EXPECT_TRUE(pip->set_sink(sink_op).ok()); + + auto profile = std::make_shared("Pipeline : " + std::to_string(pip_id)); + std::map, std::vector>>> + shared_state_map; + _runtime_state->resize_op_id_to_local_state(-1); + auto task = std::make_shared(pip, task_id, _runtime_state.get(), _context, + profile.get(), shared_state_map, task_id); + task->_exec_time_slice = 10'000'000'000ULL; + + std::vector scan_range; + int sender_id = 0; + TDataSink tsink; + EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); + EXPECT_TRUE(task->close(Status::OK()).ok()); + EXPECT_EQ(task->_exec_state, PipelineTask::State::FINISHED); + EXPECT_NE(task->_sink, nullptr); + EXPECT_FALSE(task->_operators.empty()); + EXPECT_FALSE(task->_accept_submit); + + HybridTaskScheduler scheduler(1, 1, "test_hybrid_task_scheduler", nullptr); + EXPECT_TRUE(scheduler.submit(task).ok()); + EXPECT_EQ(blockable_checks.load(std::memory_order_relaxed), 0); + scheduler.stop(); + EXPECT_TRUE(task->finalize().ok()); +} + +TEST_F(PipelineTaskTest, TEST_FINALIZED_TASK_REJECTS_HYBRID_SUBMIT) { + auto num_instances = 1; + auto pip_id = 0; + auto task_id = 0; + auto pip = std::make_shared(pip_id, num_instances, num_instances); + OperatorPtr source_op; + source_op.reset(new DummyOperator()); + EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok()); + + int op_id = 1; + int node_id = 2; + int dest_id = 3; + DataSinkOperatorPtr sink_op; + sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id)); + EXPECT_TRUE(pip->set_sink(sink_op).ok()); + + auto profile = std::make_shared("Pipeline : " + std::to_string(pip_id)); + std::map, std::vector>>> + shared_state_map; + _runtime_state->resize_op_id_to_local_state(-1); + auto task = std::make_shared(pip, task_id, _runtime_state.get(), _context, + profile.get(), shared_state_map, task_id); + task->_exec_time_slice = 10'000'000'000ULL; + + std::vector scan_range; + int sender_id = 0; + TDataSink tsink; + EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); + EXPECT_TRUE(task->close(Status::OK()).ok()); + EXPECT_TRUE(task->finalize().ok()); + EXPECT_EQ(task->_sink, nullptr); + + HybridTaskScheduler scheduler(1, 1, "test_hybrid_task_scheduler", nullptr); + EXPECT_TRUE(scheduler.submit(task).ok()); + scheduler.stop(); +} + TEST_F(PipelineTaskTest, TEST_SINK_FINISHED) { auto num_instances = 1; auto pip_id = 0;