-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
129 lines (104 loc) · 3.74 KB
/
main.py
File metadata and controls
129 lines (104 loc) · 3.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/python
# -*- coding: iso-8859-15 -*-
# Aca se definen funciones para trabajar con multiples
# threads y Mongodb
from pymongo import MongoClient
from extrae import descarga_candidato, lock_print
import threading
import time
# Creamos el cliente de Mongodb solo una vez
# Por defecto se conecta a 'localhost' por el puerto 27017
_client = None
# Donde guardaremos la informacion en la base de datos
_database = "candidatos"
_collection = "candFiltrado"
def conectar_db(host="localhost", port=27017,
db=None, collection=None):
"""Crea un cliente de la base de datos Mongodb"""
global _client
_client = MongoClient(host, port)
if db:
global _database
_database = db
if collection:
global _collection
_collection = collection
def db_inserta(dato, contador=[0]):
"""Inserta un dato en la base de datos
La variable 'contador' se comporta como una variable estatica.
Nos permite limitar la cantidad de mensajes que imprimimos en
la terminal"""
db = _client[_database]
contador[0] += 1
collect = db[_collection]
item_id = collect.insert(dato)
with lock_print:
if not contador[0] % 20:
print "Insertado id: %d" % item_id
# Esto es un plagio, pero funciona
# http://stackoverflow.com/questions/13456735/
# how-to-wrap-a-python-iterator-to-make-it-thread-safe
class LockedIterator(object):
"""Iterador que soporta multithreading"""
def __init__(self, it):
self._lock = threading.Lock()
self._it = it.__iter__()
if hasattr(self._it, 'close'):
def close(self):
with self._lock:
self._it.close()
self.__setattr__('close', close)
def __iter__(self):
return self
def next(self):
with self._lock:
return self._it.next()
def genera_target(iter_get_params, foo_do, foo_done):
"""Devuelve una funcion para ser llamada por los threads
Los threads van a conseguir del iterador una lista de parametros
con los cuales llamaran a la funcion 'foo_do'. Despues, con el valor
de retorno de esta funcion llamaran a 'foo_done'"""
safe_iterable = LockedIterator(iter_get_params)
def target():
for item in safe_iterable:
res = foo_do(item)
foo_done(res)
with lock_print:
print "Hilo terminado"
return target
def genera_threads(n_threads, iter_get_params, foo_do, foo_done):
"""Construye y devuelve una lista de threads
'n_threads' es el numero de threads a crear
los otros parametros son con los cuales se construira la
funcion target"""
target = genera_target(iter_get_params, foo_do, foo_done)
return [threading.Thread(target=target)
for _ in range(n_threads)]
def descarga_varios(id_inicio, id_fin, n_threads=1, filtrar=True, tor=False):
"""Descargar un intervalo de candidatos utilizando varios
threads"""
if not _client:
conectar_db()
if id_inicio > id_fin:
iter_params = range(id_inicio, id_fin - 1, -1)
else:
iter_params = range(id_inicio, id_fin + 1)
kargs = {"n_threads": n_threads,
"iter_get_params": iter_params,
"foo_do": lambda x: descarga_candidato(x, filtrar, tor),
"foo_done": db_inserta}
lista_threads = genera_threads(**kargs)
for hilo in lista_threads:
hilo.setDaemon(True)
hilo.start()
while True:
try:
if not max([x.isAlive() for x in lista_threads]):
break
time.sleep(0.1)
except KeyboardInterrupt:
client.close()
return
if __name__ == "__main__":
# Descarga y filtra candidatos a la base de datos Mongodb
descarga_varios(1, 116826, 2)