|
1 | | -// Copyright 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. |
| 1 | +// Copyright 2021-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. |
2 | 2 | // |
3 | 3 | // Redistribution and use in source and binary forms, with or without |
4 | 4 | // modification, are permitted provided that the following conditions |
@@ -595,6 +595,10 @@ Stub::Initialize(bi::managed_external_buffer::handle_t map_handle) |
595 | 595 | model_instance_.attr("initialize")(model_config_params); |
596 | 596 | } |
597 | 597 |
|
| 598 | + // Cache whether the Python model defines an is_ready() function. |
| 599 | + ipc_control_->stub_has_user_model_readiness_fn = |
| 600 | + py::hasattr(model_instance_, "is_ready"); |
| 601 | + |
598 | 602 | initialized_ = true; |
599 | 603 | } |
600 | 604 |
|
@@ -1350,6 +1354,9 @@ Stub::ParentToStubMQMonitor() |
1350 | 1354 | case PYTHONSTUB_CommandType::PYTHONSTUB_InferStreamExecResponse: { |
1351 | 1355 | ProcessBLSResponseDecoupled(ipc_message); |
1352 | 1356 | } break; |
| 1357 | + case PYTHONSTUB_CommandType::PYTHONSTUB_UserModelReadinessRequest: { |
| 1358 | + ProcessUserModelReadinessRequest(ipc_message); |
| 1359 | + } break; |
1353 | 1360 | default: |
1354 | 1361 | break; |
1355 | 1362 | } |
@@ -1573,6 +1580,101 @@ Stub::ProcessBLSResponseDecoupled(std::unique_ptr<IPCMessage>& ipc_message) |
1573 | 1580 | } |
1574 | 1581 | } |
1575 | 1582 |
|
| 1583 | +void |
| 1584 | +Stub::ProcessUserModelReadinessRequest(std::unique_ptr<IPCMessage>& ipc_message) |
| 1585 | +{ |
| 1586 | + AllocatedSharedMemory<UserModelReadinessMessage> readiness_message; |
| 1587 | + UserModelReadinessMessage* readiness_payload = nullptr; |
| 1588 | + try { |
| 1589 | + readiness_message = |
| 1590 | + shm_pool_->Load<UserModelReadinessMessage>(ipc_message->Args()); |
| 1591 | + readiness_payload = readiness_message.data_.get(); |
| 1592 | + } |
| 1593 | + catch (const PythonBackendException& pb_exception) { |
| 1594 | + LOG_ERROR << "Failed to process model readiness request: " |
| 1595 | + << pb_exception.what(); |
| 1596 | + return; |
| 1597 | + } |
| 1598 | + |
| 1599 | + if (ipc_message->ResponseMutex() == nullptr) { |
| 1600 | + LOG_ERROR << "Failed to process model readiness request"; |
| 1601 | + return; |
| 1602 | + } |
| 1603 | + |
| 1604 | + bool is_ready = true; |
| 1605 | + bool function_exists = false; |
| 1606 | + bool has_exception = false; |
| 1607 | + std::string error_string; |
| 1608 | + |
| 1609 | + try { |
| 1610 | + py::gil_scoped_acquire acquire; |
| 1611 | + |
| 1612 | + function_exists = py::hasattr(model_instance_, "is_ready"); |
| 1613 | + if (!function_exists) { |
| 1614 | + is_ready = true; |
| 1615 | + } else { |
| 1616 | + py::object result = model_instance_.attr("is_ready")(); |
| 1617 | + |
| 1618 | + bool is_coroutine = py::module::import("asyncio") |
| 1619 | + .attr("iscoroutine")(result) |
| 1620 | + .cast<bool>(); |
| 1621 | + if (is_coroutine) { |
| 1622 | + result = RunCoroutine(result, false /* in_background */); |
| 1623 | + } |
| 1624 | + |
| 1625 | + if (!py::isinstance<py::bool_>(result)) { |
| 1626 | + throw PythonBackendException("is_ready() must return a boolean value"); |
| 1627 | + } |
| 1628 | + |
| 1629 | + is_ready = result.cast<bool>(); |
| 1630 | + } |
| 1631 | + } |
| 1632 | + catch (const PythonBackendException& pb_exception) { |
| 1633 | + has_exception = true; |
| 1634 | + error_string = pb_exception.what(); |
| 1635 | + } |
| 1636 | + catch (const py::error_already_set& error) { |
| 1637 | + has_exception = true; |
| 1638 | + error_string = error.what(); |
| 1639 | + } |
| 1640 | + |
| 1641 | + // Populate response payload |
| 1642 | + readiness_payload->function_exists = function_exists; |
| 1643 | + readiness_payload->is_ready = has_exception ? false : is_ready; |
| 1644 | + readiness_payload->has_error = has_exception; |
| 1645 | + readiness_payload->is_error_set = false; |
| 1646 | + readiness_payload->error = 0; |
| 1647 | + |
| 1648 | + if (has_exception) { |
| 1649 | + std::unique_ptr<PbString> error_string_shm; |
| 1650 | + LOG_IF_EXCEPTION( |
| 1651 | + error_string_shm = PbString::Create(shm_pool_, error_string)); |
| 1652 | + if (error_string_shm != nullptr) { |
| 1653 | + readiness_payload->is_error_set = true; |
| 1654 | + readiness_payload->error = error_string_shm->ShmHandle(); |
| 1655 | + } |
| 1656 | + } |
| 1657 | + |
| 1658 | + // Signal parent process that response is ready |
| 1659 | + { |
| 1660 | + bi::scoped_lock<bi::interprocess_mutex> lock{ |
| 1661 | + *(ipc_message->ResponseMutex())}; |
| 1662 | + readiness_payload->waiting_on_stub = true; |
| 1663 | + ipc_message->ResponseCondition()->notify_all(); |
| 1664 | + |
| 1665 | + // Wait for parent ack with timeout to avoid deadlock |
| 1666 | + boost::posix_time::ptime timeout = |
| 1667 | + boost::get_system_time() + |
| 1668 | + boost::posix_time::milliseconds(kUserModelReadinessTimeoutMs); |
| 1669 | + while (readiness_payload->waiting_on_stub) { |
| 1670 | + if (!ipc_message->ResponseCondition()->timed_wait(lock, timeout)) { |
| 1671 | + readiness_payload->waiting_on_stub = false; |
| 1672 | + break; |
| 1673 | + } |
| 1674 | + } |
| 1675 | + } |
| 1676 | +} |
| 1677 | + |
1576 | 1678 | PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module) |
1577 | 1679 | { |
1578 | 1680 | py::class_<PbError, std::shared_ptr<PbError>> triton_error( |
|
0 commit comments