Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 55 additions & 39 deletions src/use_notify/decorator/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""

import asyncio
from contextvars import ContextVar
import functools
import inspect
import logging
Expand All @@ -19,13 +20,16 @@

logger = logging.getLogger(__name__)

# 全局默认通知实例
_default_notify_instance: Optional[Notify] = None
# 默认通知实例,按当前执行上下文隔离,避免线程/任务间相互污染
_default_notify_instance_var: ContextVar[Optional[Notify]] = ContextVar(
"use_notify_default_instance",
default=None,
)
RetriableExceptionsInput = Optional[Sequence[Type[BaseException]]]


def set_default_notify_instance(notify_instance: Notify) -> None:
"""设置全局默认通知实例
"""设置当前执行上下文的默认通知实例

Args:
notify_instance: 要设置为默认的 Notify 实例
Expand All @@ -41,27 +45,25 @@ def set_default_notify_instance(notify_instance: Notify) -> None:
def my_task():
return "任务完成"
"""
global _default_notify_instance
if not isinstance(notify_instance, Notify):
raise NotifyConfigError("notify_instance 必须是 Notify 类的实例")
_default_notify_instance = notify_instance
logger.info("已设置全局默认通知实例")
_default_notify_instance_var.set(notify_instance)
logger.info("已设置默认通知实例")


def get_default_notify_instance() -> Optional[Notify]:
"""获取全局默认通知实例
"""获取当前执行上下文的默认通知实例

Returns:
当前的默认通知实例,如果未设置则返回 None
"""
return _default_notify_instance
return _default_notify_instance_var.get()


def clear_default_notify_instance() -> None:
"""清除全局默认通知实例"""
global _default_notify_instance
_default_notify_instance = None
logger.info("已清除全局默认通知实例")
"""清除当前执行上下文的默认通知实例"""
_default_notify_instance_var.set(None)
logger.info("已清除默认通知实例")


class NotifyDecorator:
Expand Down Expand Up @@ -90,27 +92,16 @@ def __init__(
max_retries, retry_delay, retry_backoff, retriable_exceptions,
)

# 如果没有提供 notify_instance,尝试使用全局默认实例
if notify_instance is None:
notify_instance = get_default_notify_instance()
if notify_instance is None:
notify_instance = Notify()
logger.warning("未提供 notify_instance 且未设置全局默认实例,创建了一个空的 Notify 实例。请确保添加通知渠道或设置默认实例。")
else:
logger.debug("使用全局默认通知实例")

notify_instance = self._apply_retry_overrides(
notify_instance=notify_instance,
max_retries=max_retries,
retry_delay=retry_delay,
retry_backoff=retry_backoff,
retriable_exceptions=retriable_exceptions,
)

self.notify_instance = notify_instance
self.title = title
self.notify_on_success = notify_on_success
self.notify_on_error = notify_on_error
self.timeout = timeout
self.max_retries = max_retries
self.retry_delay = retry_delay
self.retry_backoff = retry_backoff
self.retriable_exceptions = retriable_exceptions
self._warned_missing_default_notify = False

# 创建消息格式化器
self.formatter = MessageFormatter(
Expand All @@ -120,12 +111,6 @@ def __init__(
include_result=include_result
)

# 创建通知发送器
self.sender = NotificationSender(
notify_instance=self.notify_instance,
timeout=timeout
)

def __call__(self, func: Callable) -> Callable:
"""装饰器调用"""
if inspect.iscoroutinefunction(func):
Expand Down Expand Up @@ -222,7 +207,8 @@ def _send_success_notification(self, context: ExecutionContext) -> None:
try:
message = self.formatter.format_success_message(context)
title = self.title or message["title"]
self.sender.send_notification(title, message["content"])
sender = self._build_sender()
sender.send_notification(title, message["content"])
except Exception as e:
logger.warning(f"发送成功通知失败: {e}")

Expand All @@ -231,7 +217,8 @@ async def _send_success_notification_async(self, context: ExecutionContext) -> N
try:
message = self.formatter.format_success_message(context)
title = self.title or message["title"]
await self.sender.send_notification_async(title, message["content"])
sender = self._build_sender()
await sender.send_notification_async(title, message["content"])
except Exception as e:
logger.warning(f"发送成功通知失败: {e}")

Expand All @@ -240,7 +227,8 @@ def _send_error_notification(self, context: ExecutionContext) -> None:
try:
message = self.formatter.format_error_message(context)
title = self.title or message["title"]
self.sender.send_notification(title, message["content"])
sender = self._build_sender()
sender.send_notification(title, message["content"])
except Exception as e:
logger.warning(f"发送错误通知失败: {e}")

Expand All @@ -249,9 +237,37 @@ async def _send_error_notification_async(self, context: ExecutionContext) -> Non
try:
message = self.formatter.format_error_message(context)
title = self.title or message["title"]
await self.sender.send_notification_async(title, message["content"])
sender = self._build_sender()
await sender.send_notification_async(title, message["content"])
except Exception as e:
logger.warning(f"发送错误通知失败: {e}")

def _build_sender(self) -> NotificationSender:
notify_instance = self._resolve_notify_instance()
return NotificationSender(notify_instance=notify_instance, timeout=self.timeout)

def _resolve_notify_instance(self) -> Notify:
notify_instance = self.notify_instance

if notify_instance is None:
notify_instance = get_default_notify_instance()
if notify_instance is None:
notify_instance = Notify()
if not self._warned_missing_default_notify:
logger.warning(
"未提供 notify_instance 且当前执行上下文未设置默认实例,创建了一个空的 Notify 实例。请确保添加通知渠道或设置默认实例。"
)
self._warned_missing_default_notify = True
else:
logger.debug("使用全局默认通知实例")

return self._apply_retry_overrides(
notify_instance=notify_instance,
max_retries=self.max_retries,
retry_delay=self.retry_delay,
retry_backoff=self.retry_backoff,
retriable_exceptions=self.retriable_exceptions,
)

def _validate_config(self, *args) -> None:
"""验证配置参数"""
Expand Down
60 changes: 26 additions & 34 deletions src/use_notify/decorator/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,61 +5,53 @@

import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
from typing import Optional

from ..notification import Notify
from .exceptions import NotifySendError


logger = logging.getLogger(__name__)


class NotificationSender:
"""通知发送器"""

def __init__(
self,
notify_instance: Notify,
timeout: Optional[float] = None
):
self.notify_instance = notify_instance
self.timeout = timeout

def send_notification(self, title: str, content: str) -> None:
"""发送同步通知"""
try:
if self.timeout:
# 对于同步调用,我们使用 asyncio.wait_for 来实现超时
# 但这需要在异步上下文中运行,所以我们创建一个新的事件循环
# 使用线程池执行同步调用,实现超时控制
# 无论是否在异步事件循环中,都能正确应用超时
executor = ThreadPoolExecutor(max_workers=1)
try:
loop = asyncio.get_event_loop()
if loop.is_running():
# 如果已经在事件循环中,直接调用
self.notify_instance.publish(title=title, content=content)
else:
# 如果不在事件循环中,使用 run_until_complete
loop.run_until_complete(
asyncio.wait_for(
self._send_async_internal(title, content),
timeout=self.timeout
)
)
except RuntimeError:
# 如果没有事件循环,创建一个新的
asyncio.run(
asyncio.wait_for(
self._send_async_internal(title, content),
timeout=self.timeout
)
future = executor.submit(
self.notify_instance.publish, title=title, content=content
)
future.result(timeout=self.timeout)
finally:
# 使用 shutdown(wait=False) 立即关闭线程池,不等待线程完成
executor.shutdown(wait=False)
else:
self.notify_instance.publish(title=title, content=content)

logger.info(f"通知发送成功: {title}")


except FutureTimeoutError:
error_msg = f"通知发送超时({self.timeout}秒)"
logger.warning(error_msg)
self._handle_send_error(asyncio.TimeoutError(error_msg))
except Exception as error:
self._handle_send_error(error)

async def send_notification_async(self, title: str, content: str) -> None:
"""发送异步通知"""
try:
Expand All @@ -70,24 +62,24 @@ async def send_notification_async(self, title: str, content: str) -> None:
)
else:
await self._send_async_internal(title, content)

logger.info(f"异步通知发送成功: {title}")

except Exception as error:
self._handle_send_error(error)

async def _send_async_internal(self, title: str, content: str) -> None:
"""内部异步发送方法"""
await self.notify_instance.publish_async(title=title, content=content)

def _handle_send_error(self, error: Exception) -> None:
"""处理发送错误"""
error_msg = f"通知发送失败: {str(error)}"
logger.warning(error_msg)

# 记录详细错误信息但不抛出异常,避免影响原函数执行
logger.debug(f"通知发送错误详情: {error}", exc_info=True)

# 可以选择是否抛出异常,根据需求决定
# 默认情况下不抛出,只记录日志
# raise NotifySendError(error_msg) from error
# raise NotifySendError(error_msg) from error
Loading