-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathpolling_manager.py
More file actions
120 lines (110 loc) · 3.61 KB
/
polling_manager.py
File metadata and controls
120 lines (110 loc) · 3.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import asyncio
import functools
import logging
from asyncio import AbstractEventLoop, CancelledError, Task, get_running_loop
from contextvars import Context
from typing import Any, Awaitable, Dict, List, Optional
from aiogram import Bot
from aiogram.dispatcher.dispatcher import DEFAULT_BACKOFF_CONFIG, Dispatcher
from aiogram.types import User
from aiogram.utils.backoff import BackoffConfig
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class PollingManager:
    """Start and stop long-polling for individual bots on the running event loop.

    Attributes:
        polling_tasks: Maps a bot id to the task that is currently running
            ``dp._polling`` for that bot. The entry is removed when that
            task finishes or when ``stop_bot_polling`` is called.
    """

    def __init__(self):
        self.polling_tasks: Dict[int, Task] = {}
        # Strong references to the wrapper tasks spawned by
        # ``_create_pooling_task``. The event loop itself only keeps weak
        # references to tasks, so an otherwise unreferenced task could be
        # garbage-collected before it finishes.
        self._background_tasks: set = set()

    def _create_pooling_task(
        self,
        dp: Dispatcher,
        bot: Bot,
        polling_timeout: int,
        handle_as_tasks: bool,
        backoff_config: BackoffConfig,
        allowed_updates: Optional[List[str]],
        **kwargs: Any,
    ) -> None:
        """Spawn the task that runs ``_start_bot_polling`` for ``bot``.

        Must be called while an event loop is running (it is scheduled via
        ``loop.call_soon`` by ``start_bot_polling``). All arguments are
        forwarded unchanged to ``_start_bot_polling``.
        """
        task = asyncio.create_task(
            self._start_bot_polling(
                dp=dp,
                bot=bot,
                polling_timeout=polling_timeout,
                handle_as_tasks=handle_as_tasks,
                backoff_config=backoff_config,
                allowed_updates=allowed_updates,
                **kwargs,
            )
        )
        # Keep the task alive until it completes, then drop the reference.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    def start_bot_polling(
        self,
        dp: Dispatcher,
        bot: Bot,
        polling_timeout: int = 10,
        handle_as_tasks: bool = True,
        backoff_config: BackoffConfig = DEFAULT_BACKOFF_CONFIG,
        allowed_updates: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """Schedule polling for ``bot`` and return immediately.

        The polling task is not created here; its creation is queued on the
        running loop with ``call_soon``, so ``polling_tasks`` does not contain
        the bot yet when this method returns.

        Args:
            dp: Dispatcher whose ``_polling`` coroutine will process updates.
            bot: Bot to poll for.
            polling_timeout: Forwarded to ``dp._polling``.
            handle_as_tasks: Forwarded to ``dp._polling``.
            backoff_config: Forwarded to ``dp._polling``.
            allowed_updates: Forwarded to ``dp._polling``.
            **kwargs: Forwarded to ``_start_bot_polling``; may include
                ``on_bot_startup`` / ``on_bot_shutdown`` awaitables, the rest
                goes on to ``dp._polling``.

        Raises:
            RuntimeError: If there is no running event loop.
        """
        loop: AbstractEventLoop = get_running_loop()
        # A new, empty Context is passed so the callback (and the task it
        # creates) does not run inside the caller's contextvars context.
        # noinspection PyArgumentList
        loop.call_soon(
            functools.partial(
                self._create_pooling_task,
                dp=dp,
                bot=bot,
                polling_timeout=polling_timeout,
                handle_as_tasks=handle_as_tasks,
                backoff_config=backoff_config,
                allowed_updates=allowed_updates,
                **kwargs,
            ),
            context=Context(),
        )

    async def _start_bot_polling(
        self,
        dp: Dispatcher,
        bot: Bot,
        polling_timeout: int = 10,
        handle_as_tasks: bool = True,
        backoff_config: BackoffConfig = DEFAULT_BACKOFF_CONFIG,
        allowed_updates: Optional[List[str]] = None,
        on_bot_startup: Optional[Awaitable] = None,
        on_bot_shutdown: Optional[Awaitable] = None,
        **kwargs: Any,
    ) -> None:
        """Run polling for ``bot`` until it ends or is cancelled, then clean up.

        Steps: fetch the bot's user, await ``on_bot_startup`` (if given), run
        ``dp._polling`` in its own task registered under ``bot.id`` in
        ``polling_tasks``, and finally await ``on_bot_shutdown`` (if given)
        and close the bot session.

        Args:
            dp: Dispatcher that provides the ``_polling`` coroutine.
            bot: Bot to poll for; its session is closed when polling stops.
            polling_timeout: Forwarded to ``dp._polling``.
            handle_as_tasks: Forwarded to ``dp._polling``.
            backoff_config: Forwarded to ``dp._polling``.
            allowed_updates: Forwarded to ``dp._polling``.
            on_bot_startup: Awaitable awaited once before polling starts.
            on_bot_shutdown: Awaitable awaited once after polling stops.
            **kwargs: Extra keyword arguments forwarded to ``dp._polling``.
        """
        logger.info("Start poling")

        user: User = await bot.me()
        if on_bot_startup:
            await on_bot_startup

        polling_task: Optional[Task] = None
        try:
            logger.info(
                "Run polling for bot @%s id=%d - %r",
                user.username,
                bot.id,
                user.full_name,
            )
            # NOTE(review): ``_polling`` is a private Dispatcher method;
            # confirm it is still available when upgrading aiogram.
            polling_task = asyncio.create_task(
                dp._polling(
                    bot=bot,
                    handle_as_tasks=handle_as_tasks,
                    polling_timeout=polling_timeout,
                    backoff_config=backoff_config,
                    allowed_updates=allowed_updates,
                    **kwargs,
                )
            )
            self.polling_tasks[bot.id] = polling_task
            await polling_task
        except CancelledError:
            # Raised here when the polling task was cancelled (for example by
            # ``stop_bot_polling``) or when this wrapper itself was cancelled.
            logger.info("Polling task Canceled")
        finally:
            # Drop the registry entry when polling ended without going through
            # ``stop_bot_polling``. The identity check avoids removing a newer
            # task that may have been registered for the same bot id.
            if (
                polling_task is not None
                and self.polling_tasks.get(bot.id) is polling_task
            ):
                del self.polling_tasks[bot.id]

            logger.info(
                "Polling stopped for bot @%s id=%d - %r",
                user.username,
                bot.id,
                user.full_name,
            )
            try:
                if on_bot_shutdown:
                    await on_bot_shutdown
            finally:
                # Close the session even if the shutdown hook raised.
                await bot.session.close()

    def stop_bot_polling(self, bot_id: int) -> None:
        """Request cancellation of the polling task registered for ``bot_id``.

        Only the cancellation request is issued here; the shutdown hook and
        session close in ``_start_bot_polling`` run afterwards on the loop.

        Raises:
            KeyError: If no polling task is registered for ``bot_id``.
        """
        polling_task = self.polling_tasks.pop(bot_id)
        polling_task.cancel()