-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrss_bot.py
More file actions
356 lines (292 loc) · 11.7 KB
/
rss_bot.py
File metadata and controls
356 lines (292 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
import configparser
from difflib import SequenceMatcher
import logging
from logging.handlers import RotatingFileHandler
import time
from threading import Thread
from bs4 import BeautifulSoup
from flask import Flask, send_from_directory
import defusedxml.ElementTree as ElemTree # Заменил стандартный парсер на безопасную версию.
import requests
# Load runtime settings from config.ini; the [settings] section (host, port)
# is read when the module is run as a script.
config = configparser.ConfigParser()
config.read('config.ini')
# Configure the root logger so every logging.* call in this module goes to
# both the rotating file and the console.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Create a rotating file handler (two backups are kept).
file_handler = RotatingFileHandler('error.log', maxBytes=100000, backupCount=2,
                                   encoding='utf-8') # 100000 bytes = 100 KB
console_handler = logging.StreamHandler()
# One shared tab-separated format for both handlers.
formatter = logging.Formatter('%(asctime)s \t %(name)s \t %(levelname)s \t %(message)s', datefmt='%d-%m-%Y %H:%M:%S')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the root logger.
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# Suppress noisy werkzeug messages that contain escape sequences.
logging.getLogger("werkzeug").setLevel(logging.ERROR)
# Upstream RSS feed that gets downloaded and rewritten.
OUT_URL = 'https://lenta.ru/rss'
# Base URL prepended to image file names embedded in rewritten descriptions.
LOCAL_URL = "http://192.168.0.101:5000/images/"
# NOTE(review): FALLBACK_URL is defined here but not referenced anywhere else
# in the visible code - presumably meant as a placeholder image; confirm.
FALLBACK_URL = LOCAL_URL + "fallback.jpg"
# Image URLs appended by process_item; never cleared in the visible code.
IMAGE_LIST = []
# Directory (relative to the current working directory) where images are stored.
PATH_FOR_IMAGES = os.path.join(os.getcwd(), 'images')
from collections import deque
# Bounded queue of image URLs awaiting download; once 200 entries are held,
# appending a new one discards the oldest.
dq = deque(maxlen=200)
def sim(a: str, b: str) -> float:
    """
    Measure how alike two sequences are using difflib.

    Parameters:
        a (str): The first sequence to compare.
        b (str): The second sequence to compare.

    Returns:
        float: The ``SequenceMatcher`` ratio for the pair, between 0.0
        (nothing in common) and 1.0 (identical).
    """
    matcher = SequenceMatcher(isjunk=None, a=a, b=b)
    return matcher.ratio()
# def parse_text(url: str) -> str:
# """
# Parses the text content from the given URL and returns it.
#
# Parameters:
# url (str): The URL of the article to parse
#
# Returns:
# str: The parsed text content
#
# """
# if url is None: return ''
# article = Article(url, language='ru') # Create Article object for the given URL
# article.download() # Download the article content
# article.parse() # Parse the article
#
# # If no text is extracted, return an empty string
# if not article.text:
# return ''
#
# # Clean up the text content
# article_text = article.text.replace('\n\n', '\n')
# article_text = article_text.split('\n')[1:] # Remove the title
#
# # Add period at the end of each line if not present
# article_text = [line + '.' if line and not line.endswith('.') else line for line in article_text]
#
# try:
# # Check similarity between the first two lines and remove if similar
# similarity = sim(article_text[0], article_text[1])
# if similarity >= 0.3:
# article_text = article_text[1:]
# except Exception:
# logging.exception('Ошибка')
#
# # Find and remove the last line containing 'Ранее'
# ind = max([i for i, line in enumerate(article_text) if 'Ранее' in line], default=50)
# article_text = '\n'.join(line for line in article_text[:ind] if line) # Join non-empty lines with newline
#
# return article_text
def parse_text(url: str) -> str:
    """
    Download an article page and return its body text.

    The text is collected from every element carrying the CSS class
    ``topic-body__content-text``; elements whose text contains 'Ранее'
    are skipped. The remaining pieces are joined with single spaces.

    Parameters:
        url (str): Address of the article page. A falsy value (``None`` or
            an empty string) short-circuits to an empty result.

    Returns:
        str: The extracted text, or an empty string when the URL is empty
        or when anything goes wrong (the error is logged, never raised).
    """
    if not url:
        return ''
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
    try:
        # The context manager closes the session (and its pooled
        # connections) even when the request or the parsing fails.
        with requests.Session() as session:
            response = session.get(url, headers=headers, timeout=30, verify=True)
            # Treat HTTP error statuses as failures, in line with the other
            # fetch helpers in this module, instead of parsing an error page.
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
        content = soup.find_all(class_='topic-body__content-text')
        return ' '.join(i.text for i in content if "Ранее" not in i.text)
    except Exception:
        # Best-effort helper: callers get an empty description on failure.
        logging.exception('Exception in parse_text')
        return ''
def fetch_rss_feed(url) -> bool:
    """
    Fetch the RSS document at ``url`` and store it as ``lenta.xml``.

    Parameters:
        url: Address of the RSS feed.

    Returns:
        bool: ``True`` when the feed was downloaded and written to disk,
        ``False`` when any step failed (the error is logged).
    """
    logging.info('Downloading RSS feed...')
    try:
        with requests.get(url, timeout=5) as resp:
            resp.raise_for_status()
            payload = resp.content
            with open('lenta.xml', 'wb') as out:
                out.write(payload)
    except Exception:
        logging.exception(f'Exception in fetch_rss_feed({url})')
        return False
    else:
        logging.info('Successfully fetched Lenta RSS.')
        return True
def download_image(url: str) -> None:
    """
    Fetch one image and store it under ``PATH_FOR_IMAGES``.

    The file name is the last path segment of ``url``. Nothing is
    downloaded when a file with that name is already present. Every
    failure is logged and swallowed, so this function never raises.

    Parameters:
        url (str): Address of the image to download.
    """
    try:
        filename = url.rsplit('/', 1)[-1]
        destination = os.path.join(PATH_FOR_IMAGES, filename)

        # Nothing to do when the image is already on disk.
        if os.path.exists(destination):
            logging.info(f'Image already exists: {filename}')
            return

        logging.info(f'Downloading image: {filename}')
        resp = requests.get(
            url,
            headers={"User-Agent": "Mozilla/5.0"},
            timeout=(5, 10),  # (connect, read) seconds
        )
        resp.raise_for_status()

        with open(destination, 'wb') as out:
            out.write(resp.content)
        logging.info(f'Successfully downloaded: {filename}')
    except Exception:
        logging.exception(f"Failed to download image: {url}")
def download_images_from_queue():
    """
    Download every queued image that is not yet present on disk.

    Takes a de-duplicated snapshot of the module-level ``dq`` queue,
    skips URLs whose file name already exists in ``PATH_FOR_IMAGES`` and
    downloads the rest with a small thread pool. The queue itself is not
    modified.
    """
    # Work on a snapshot so the selection below sees a stable list, and drop
    # repeated URLs (order preserved): the same URL can be queued more than
    # once, and two workers writing the same file would race.
    queued_urls = list(dict.fromkeys(dq))
    if not queued_urls:
        logging.info('No images to download')
        return

    # The target directory is not created anywhere else; without this,
    # os.listdir() raises FileNotFoundError on a fresh checkout.
    os.makedirs(PATH_FOR_IMAGES, exist_ok=True)

    # Keep only images that are not already downloaded.
    existing_files = set(os.listdir(PATH_FOR_IMAGES))
    images_to_download = [
        url for url in queued_urls
        if url.split('/')[-1] not in existing_files
    ]
    if not images_to_download:
        logging.info('All images already downloaded')
        return

    logging.info(f'Downloading {len(images_to_download)} images...')
    # Download images in parallel.
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(download_image, url) for url in images_to_download]
        for future in as_completed(futures):
            # Re-raises anything that escaped the worker.
            future.result()
    logging.info('Image download completed')
# def download_image(dq):
# if not dq:
# return
#
# list_of_files = set(os.listdir(PATH_FOR_IMAGES))
# print(f'{list_of_files=}')
# print(len(list_of_files))
# # dif = set([filename.split('/')[-1] for filename in dq]).difference(list_of_files)
# dif = [i for i in dq if i.split('/')[-1] not in list_of_files]
#
# print(f'{dif=}')
# print(len(dif))
# # breakpoint()
#
# for url in dif:
# # namefile = url.split('/')[-1]
# # если уже скачано — используем
# # if os.path.exists(f'images/{namefile}'):
# # continue
# # url = OUT_URL + filename
# try:
# logging.info(f'Downloading {url}')
# r = requests.get(url, timeout=(5, 10), headers={"User-Agent": "Mozilla/5.0"})
# r.raise_for_status()
# path = os.path.join(PATH_FOR_IMAGES, url.split('/')[-1])
# with open(path, 'wb') as f:
# f.write(r.content)
# logging.info(f'Successfully downloaded image.')
# time.sleep(2)
#
# except Exception:
# logging.exception(f"download_image failed: {url}")
#
# # global IMAGE_LIST
# # IMAGE_LIST = []
# logging.info('Downloading images done.')
def process_item(item):
    """
    Rewrite a single RSS ``<item>`` element in place.

    Items whose category is 'Путешествия' or 'Спорт' are left untouched.
    For every other item:

    * a ``.jpg`` enclosure URL is recorded in ``IMAGE_LIST`` and queued in
      ``dq`` for download;
    * the ``author``, ``category``, ``guid`` and ``enclosure`` children are
      removed;
    * a ``description`` shorter than 10 characters (or empty) is replaced
      by an ``<img>`` tag followed by the article text from ``parse_text``.

    Any error is logged and swallowed so that one bad item cannot abort
    the whole batch.

    Parameters:
        item: The XML element of one feed entry.

    Returns:
        None
    """
    try:
        # Extract the fields we need.
        category = item.findtext('category', default='')
        if category in ('Путешествия', 'Спорт'):
            return
        title = item.findtext('title', default='')
        print(title)
        link = item.findtext('link', default='')

        # An item may have no <enclosure>, or one without a url attribute.
        enclosure = item.find('enclosure')
        image_url = enclosure.get('url') if enclosure is not None else None

        # Default to the placeholder so the <img> tag below always has a
        # source, even when there is no usable .jpg enclosure.
        local_image_url = FALLBACK_URL
        if image_url and image_url.endswith('.jpg'):
            IMAGE_LIST.append(image_url)
            dq.append(image_url)
            local_image_url = LOCAL_URL + image_url.split('/')[-1]

        # Iterate over a copy because children are removed while looping.
        for element in list(item):
            if element.tag in ('author', 'category', 'guid', 'enclosure'):
                item.remove(element)
            elif element.tag == 'description' and len(element.text or '') < 10:
                # An empty <description/> has text None, hence the `or ''`.
                img_html = f'<img src="{local_image_url}" style="width:100%; height:auto; display:block; margin-bottom:10px;" />'
                # NOTE(review): '</br>' is not a valid HTML tag ('<br/>' was
                # presumably intended); left as is to keep the output stable.
                cdata_content = f'{img_html}</br>{parse_text(link)}'
                element.text = cdata_content
    except Exception:
        logging.exception(f"Ошибка:")
def process_xml_content():
    """
    Rewrite the downloaded feed ``lenta.xml`` into ``output.xml``.

    Items in the 'Путешествия' / 'Спорт' categories and items whose title
    was already seen are not processed; every remaining item is handed to
    ``process_item`` on a thread pool. The whole tree is then written out.

    NOTE(review): skipped items are not removed from the tree, so they
    still appear (unmodified) in ``output.xml`` - confirm this is intended.

    Returns:
        bool: ``True`` when ``output.xml`` was written, ``False`` when
        parsing, processing or writing failed (the error is logged).
    """
    try:
        # Parsing happens inside the try block so that a missing or
        # malformed lenta.xml is reported as False like any other failure.
        tree = ElemTree.parse('lenta.xml')
        root = tree.getroot()

        # First item wins for each title; excluded categories are skipped.
        clear_items_dict = {}
        for item in root.iter("item"):
            if item.findtext('category') in ('Путешествия', 'Спорт'):
                continue
            clear_items_dict.setdefault(item.findtext('title'), item)

        with ThreadPoolExecutor(max_workers=6) as executor:
            futures = [executor.submit(process_item, item) for item in clear_items_dict.values()]
            for future in as_completed(futures):
                future.result()

        # Write to a temporary file and swap it in, so a reader of
        # output.xml never sees a half-written document.
        tmp_path = 'output.xml.tmp'
        tree.write(tmp_path, encoding='utf-8')
        os.replace(tmp_path, 'output.xml')

        logging.info(f'RSS parsed successfully! Processed {len(clear_items_dict)} items.')
        return True
    except Exception:
        logging.exception("Error processing XML content")
        return False
def parse_lenta_rss() -> None:
    """
    Run the endless fetch -> rewrite -> download cycle for the Lenta.ru feed.

    Each pass downloads the feed and rewrites it. When either step reports
    failure the loop retries after one minute; otherwise it starts the
    image download in a separate thread (if anything is queued) and waits
    one hour before the next pass. Unexpected errors are logged and also
    followed by the one-hour wait.
    """
    retry_delay = 60          # seconds to wait after a failed step
    cycle_delay = 60 * 60     # seconds between regular passes

    while True:
        start = time.time()
        try:
            # Steps 1 and 2: fetch the feed, then parse and process it.
            # The second step only runs when the first one succeeded.
            feed_ready = fetch_rss_feed(OUT_URL) and process_xml_content()
            if not feed_ready:
                time.sleep(retry_delay)
                continue

            logging.info(f'Elapsed time: {time.time() - start}')

            if dq:
                logging.info('Starting background image download...')
                worker = Thread(target=download_images_from_queue)
                worker.start()
                print(worker.is_alive())
        except Exception as e:
            logging.exception(f'Unexpected error in main loop: {e}')
        time.sleep(cycle_delay)
# Start the feed-processing loop as soon as the module is loaded. The thread
# is not a daemon, and parse_lenta_rss loops forever.
rss_thread = Thread(target=parse_lenta_rss)
rss_thread.start()
# Flask application that exposes the status page, the feed and the images.
app = Flask(__name__)
@app.route('/')
def index_route() -> str:
    """Report the state of the feed thread: green circle if running, red if not."""
    if rss_thread.is_alive():
        return '🟢'
    return '🔴'
@app.route('/rss')
def rss_route():
    """
    Serve the rewritten feed stored in ``output.xml``.

    Returns:
        The file content with an RSS Content-Type, or a plain-text 404
        response when ``output.xml`` does not exist yet.
    """
    try:
        with open('output.xml', 'r', encoding='utf-8') as f:
            rss = f.read()
    except FileNotFoundError:
        return 'output.xml not available', 404
    # A bare string would be sent as text/html; declare the payload as an
    # RSS/XML document so feed readers handle it correctly.
    return rss, 200, {'Content-Type': 'application/rss+xml; charset=utf-8'}
@app.route("/images/<path:filename>")
def images_route(filename):
    """
    Serve a downloaded image by file name.

    Parameters:
        filename: Path of the requested file, relative to the images
            directory (taken from the URL).

    Returns:
        The file as an ``image/jpeg`` response, or a plain-text 404.
    """
    try:
        # Serve from the same absolute directory the downloader writes to.
        # A relative "images" would be resolved against the application
        # root, which differs from PATH_FOR_IMAGES whenever the process is
        # started from another working directory.
        return send_from_directory(PATH_FOR_IMAGES, filename, mimetype="image/jpeg")
    except FileNotFoundError:
        # NOTE(review): send_from_directory is documented to raise a 404
        # NotFound error for missing files, so this handler is presumably
        # never reached; kept for safety.
        return 'File not found', 404
if __name__ == '__main__':
    # Host and port come from the [settings] section of config.ini.
    settings = config['settings']
    host = settings['host']
    port = settings.getint('port')
    logging.info(f'Starting Flask server on {host}:{port}')
    app.run(host=host, port=port, debug=False)