forked from Lonami/TelethonianBotExt
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlayer.py
More file actions
73 lines (58 loc) · 2.23 KB
/
layer.py
File metadata and controls
73 lines (58 loc) · 2.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import asyncio
import io
import sys
from telethon import functions
URI = b'telegramdesktop/tdesktop/dev/Telegram/Resources/tl/api.tl'
SEND_TO = 'TelethonOffTopic'
UPDATE_INTERVAL = 24 * 60 * 60
MESSAGE = '@lonami, [a new layer is available](https://github.com' \
'/telegramdesktop/tdesktop/blob/dev/Telegram/Resources/tl/api.tl)'
async def fetch():
# We could use `aiohttp` but this is more fun
rd, wr = await asyncio.open_connection('raw.githubusercontent.com', 443, ssl=True)
# Send HTTP request
wr.write(
b'GET /' + URI + b' HTTP/1.1\r\n'
b'Host: raw.githubusercontent.com\r\n'
b'\r\n'
)
await wr.drain()
# Get response headers
headers = await rd.readuntil(b'\r\n\r\n')
if headers[-4:] != b'\r\n\r\n':
raise ConnectionError('Connection closed')
# Ensure it's OK
if not headers.startswith(b'HTTP/1.1 200 OK'):
raise ValueError('Bad status code: {}'.format(headers[:headers.index(b'\r\n')]))
# Figure out Content-Length to read
index = headers.index(b'Content-Length:') + 16
length = int(headers[index:headers.index(b'\r', index)])
result = await rd.readexactly(length)
# Properly close the writer
wr.close()
if sys.version_info >= (3, 7):
await wr.wait_closed()
return result
async def init(bot):
last_hash = hash(await fetch())
async def check_feed():
nonlocal last_hash
while bot.is_connected():
contents = await fetch()
if hash(contents) != last_hash:
file = io.BytesIO(contents)
file.name = 'scheme.txt'
message = await bot.send_file(SEND_TO, file, caption=MESSAGE)
await bot(functions.messages.UpdatePinnedMessageRequest(SEND_TO, message.id, silent=True))
last_hash = hash(contents)
# Wait until we disconnect or a timeout occurs
try:
await asyncio.wait_for(
bot.disconnected,
timeout=UPDATE_INTERVAL,
loop=bot.loop
)
except asyncio.TimeoutError:
pass
# TODO This task is not properly terminated on disconnect
bot.loop.create_task(check_feed())