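# queueHandler.py
#
# Background queue handler: queuehandlerStarted() polls queue.json and hands
# each entry to a scrap_handler thread, which scrapes the configured URL with
# requests/BeautifulSoup and stores new items in the scrapeddata table of the
# scrapiusdb MySQL database. restart_scraping_services() rebuilds queue.json
# from every user's schema.json under userbase/.
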
# # import time
# #
# #
# import random
# import asyncio
# import time
#
#
# async def my_function():
#     print("Function called")
#     with open('hello.txt', 'w') as file:
#         # Write "Hello" to the file
#         num = random.random()
#         file.write("Hello" + str(num))
#
#
# async def print_active():
#     while True:
#         await my_function()  # Call the function
#         await asyncio.sleep(2)  # Wait for 2 seconds
#
# #
# # print_active()

import json
import os
import threading
import time
from urllib.parse import urlparse

import mysql.connector
import requests
from bs4 import BeautifulSoup
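
# The SQL in doScraping below assumes a scrapeddata table roughly like this
# (a sketch: the column names come from the queries, the types are assumptions):
#
#   CREATE TABLE scrapeddata (
#       Site  VARCHAR(512),
#       user  VARCHAR(128),
#       data  TEXT,
#       title VARCHAR(512)
#   );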


# Handles one queue entry: scrapes its URL in a background daemon thread
# and keeps re-scraping it until the process exits.
class scrap_handler:
    def __init__(self, data):
        # print("Scrap Handler Data Item => " + str(data))
        mThread = threading.Thread(target=self.doScraping, args=(data,))
        mThread.daemon = True  # Daemon thread, so it won't block the program from exiting
        mThread.start()

    # Runs forever: re-scrapes the configured URL every 30 seconds and stores
    # any new items in the scrapeddata table.
    def doScraping(self, data):
        # print(data)
        while True:
            db_connection = mysql.connector.connect(
                host="localhost",
                user="root",
                passwd="",
                database="scrapiusdb"
            )
            db_cursor = db_connection.cursor(buffered=True)
            # Browser-like headers so the request is less likely to be blocked
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9'
                # Add more headers if needed
            }
            r = requests.get(data['url'], headers=headers)
            soup = BeautifulSoup(r.content, 'html.parser')
            m_attrs = {**data}  # shallow copy, so the original queue entry stays untouched
            url = data['url']
            username = data['username']
            # Everything left in m_attrs describes elements to extract (parent, heading, link, img, ...)
            m_attrs.pop("username")
            m_attrs.pop("url")
            # How many candidate items matched the parent selector on this pass
            print(url + " => " + str(len(soup.find_all(m_attrs["parent"]["type"], m_attrs["parent"]["atr"]))))
            # Iterate oldest-first so newer items end up last in the database
            for eachItem in reversed(soup.find_all(m_attrs["parent"]["type"], m_attrs["parent"]["atr"])):
                m_values = {}
                for keys in m_attrs:
                    if keys == 'link':
                        link_tag = eachItem.find(m_attrs[keys]["type"], m_attrs[keys]["atr"])
                        mUrl = (link_tag.get('href') or "") if link_tag else ""
                        if mUrl != "" and not mUrl.startswith("http"):
                            # Relative link: prepend the scheme and host of the scraped page
                            parsed_url = urlparse(url)
                            mUrl = parsed_url.scheme + "://" + parsed_url.netloc + mUrl
                        m_values[keys] = mUrl
                    elif keys == 'img':
                        # if url.startswith('https://www.livemint.com/technology'):
                        #     print(str(eachItem.find(m_attrs[keys]["type"], m_attrs[keys]["atr"])))
                        img_tag = eachItem.find(m_attrs[keys]["type"], m_attrs[keys]["atr"])
                        if img_tag is None:
                            m_values[keys] = ""
                            continue
                        chkUrl = str(img_tag.get('src'))
                        if not chkUrl.startswith('http') or chkUrl.endswith('.webp'):
                            # Lazy-loaded images often keep the real URL in data-src / data-lazy-src
                            chkUrl = img_tag.get('data-src')
                            if chkUrl is None:
                                chkUrl = str(img_tag.get('src'))
                            if not str(chkUrl).startswith('http'):
                                chkUrl = img_tag.get('data-lazy-src')
                        print(chkUrl)
                        m_values[keys] = chkUrl
                    elif keys != 'parent':
                        # print(str(eachItem.find(m_attrs[keys]["type"])))
                        text_tag = eachItem.find(m_attrs[keys]["type"])
                        m_values[keys] = text_tag.text.strip() if text_tag else ""
# print("My Value Heavding => " + m_values['heading'])
if m_values['heading'] != "" :
query = "Select * from scrapeddata where Site=(%s) AND user=(%s) AND title=(%s) Limit 1"
db_cursor.execute(query, (url, data['username'], m_values['heading']))
myresult = db_cursor.fetchall()
if len(myresult) == 0:
try:
query = "Insert into scrapeddata (Site, user, data, title) values (%s, %s, %s, %s)"
db_cursor.execute(query,(url, data['username'], json.dumps(m_values, indent=4), m_values['heading']))
db_connection.commit()
except Exception as e:
print(f"An error occurred: {e}")
print("username => " +data['username'])
print("heading => " + m_values['heading'])
time.sleep(30)
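

# A sketch of one queue.json entry in the shape doScraping expects (the field
# names 'url', 'username', 'parent', 'heading', 'link', 'img' and the
# 'type'/'atr' sub-keys come from the code above; the concrete tag names and
# attribute filters below are made-up examples, not taken from this project):
#
#   {
#       "url": "https://example.com/news",
#       "username": "someuser",
#       "parent":  {"type": "div", "atr": {"class": "story-card"}},
#       "heading": {"type": "h2",  "atr": {}},
#       "link":    {"type": "a",   "atr": {}},
#       "img":     {"type": "img", "atr": {}}
#   }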


# Rebuilds queue.json from every user's schema.json, then clears the
# isRestarting flag so queue_json_handler resumes processing.
def restart_scraping_services():
    userDir = os.listdir('userbase')
    with open('queue.json', 'r') as file:
        dataqueue = json.load(file)
    for user in userDir:
        with open('userbase/' + user + '/schema.json', 'r') as file:
            userSchema = json.load(file)
        # schema.json is keyed by URL; attach url/username so each entry is self-contained
        for key in userSchema.keys():
            userSchema[key]['url'] = key
            userSchema[key]['username'] = user
            dataqueue.append(userSchema[key])
    with open('queue.json', 'w') as file:
        json.dump(dataqueue, file, indent=4)
    with open('configdata.json', 'r') as file:
        configdata = json.load(file)
    configdata["isRestarting"] = "false"
    with open('configdata.json', 'w') as file:
        json.dump(configdata, file, indent=4)
    print("Restarted All Services")


# Entry point: starts the queue-polling thread and the restart routine.
def queuehandlerStarted():
    print("Queue Handler Started")

    # Poll queue.json every 2 seconds and hand each entry to a scrap_handler
    def queue_json_handler():
        while True:
            with open('configdata.json', 'r') as file:
                configdata = json.load(file)
            if configdata["isRestarting"] != "true":
                with open('queue.json', 'r') as file:
                    dataqueue = json.load(file)
                for dataItem in dataqueue:
                    scrap_handler(dataItem)
                # handleQueueRequest()
                # Empty the queue file once every entry has its own scraper thread
                dataqueue.clear()
                with open('queue.json', 'w') as file:
                    json.dump(dataqueue, file, indent=4)
            time.sleep(2)

    # Create and start the polling thread
    thread = threading.Thread(target=queue_json_handler)
    thread.daemon = True  # Daemon thread, so it won't block the program from exiting
    thread.start()

    # Repopulate the queue from each user's schema on startup
    thread1 = threading.Thread(target=restart_scraping_services)
    thread1.daemon = True
    thread1.start()
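

# Minimal usage sketch, assuming this module is normally imported by a larger
# application (not shown here): running the file directly starts the handler
# and keeps the main thread alive, since every worker thread is a daemon.
if __name__ == "__main__":
    queuehandlerStarted()
    while True:
        time.sleep(60)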