Skip to content

NafiTG4/BV-AI

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

12 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

import time import asyncio from collections import deque from typing import Optional, Callable, Any, Coroutine import logging

logger = logging.getLogger(name)

class SlidingWindowRateLimiter: """ Ensures that at most max_calls operations happen within a sliding window of period seconds. Uses asyncio lock and sleeps if necessary before proceeding. """ def init(self, max_calls: int, period: float = 1.0): self.max_calls = max_calls self.period = period self.timestamps = deque() self.lock = asyncio.Lock()

async def acquire(self):
    async with self.lock:
        now = time.monotonic()
        # Remove timestamps outside the window
        while self.timestamps and now - self.timestamps[0] >= self.period:
            self.timestamps.popleft()
        if len(self.timestamps) >= self.max_calls:
            # Wait until the oldest timestamp leaves the window
            wait_time = self.period - (now - self.timestamps[0]) + 0.001
            await asyncio.sleep(wait_time)
            now = time.monotonic()
            while self.timestamps and now - self.timestamps[0] >= self.period:
                self.timestamps.popleft()
        self.timestamps.append(now)

class RateLimitedRetrySender: """ Wraps an async sender function (like bot.send_message or message.copy) with a sliding window rate limiter and retry logic for transient failures. """ def init(self, limiter: SlidingWindowRateLimiter, max_retries: int = 3): self.limiter = limiter self.max_retries = max_retries

async def send(self, sender: Callable[..., Coroutine[Any, Any, Any]], *args, **kwargs) -> Any:
    """
    Call `sender(*args, **kwargs)` with rate limiting and retry on Telegram retry-after errors.
    `sender` is an async function like bot.send_message or update.message.copy.
    """
    last_exception = None
    for attempt in range(self.max_retries):
        try:
            await self.limiter.acquire()
            result = await sender(*args, **kwargs)
            return result
        except Exception as e:
            last_exception = e
            # Detect if it's a flood wait error from Telegram
            retry_after = self._extract_retry_after(e)
            if retry_after is not None:
                wait = retry_after
            else:
                # For other network errors, wait a short while
                wait = 1.0
            logger.warning(
                f"Sender failed (attempt {attempt+1}/{self.max_retries}): {e}. "
                f"Retrying after {wait:.1f}s."
            )
            await asyncio.sleep(wait)
    # All retries exhausted
    raise last_exception if last_exception else RuntimeError("Max retries exceeded")

@staticmethod
def _extract_retry_after(exception: Exception) -> Optional[float]:
    """Extract retry_after from Telegram's RetryAfter exception, if present."""
    # PTB's RetryAfter exception has a `retry_after` attribute
    if hasattr(exception, 'retry_after'):
        return float(exception.retry_after)
    return None

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors