-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathip_filter.py
More file actions
84 lines (71 loc) · 4.13 KB
/
ip_filter.py
File metadata and controls
84 lines (71 loc) · 4.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from pathlib import Path
from tqdm import tqdm
from cmdargs import get_cmdargs
from firewalld import gen_zone_xml, add_firewalld_zone_file
from ip_loads import loads_ip, loads_json
from report import gen_report_into_file, gen_report_on_xpaste
def get_flat_list(sequence: list) -> list:
"""
Получить одномерный список из списка списков
:param sequence: Исходный список, который нужно преобразовать к одномерному
:return: Одномерный список
"""
return [element for subsequence in sequence for element in subsequence]
def get_ips_for_ban(processed_ip: dict, limit_of_requests: int) -> list:
"""
Получить список ip-адресов для блокировки, у которых количество запросов больше значения limit_of_requests
:param processed_ip: Словарь с данными, полученными при обработке входящего файла с ip-адресами
:param limit_of_requests: Лимит запросов
:return: Список ip-алресов, которые необходимо заблокировать
"""
ips_for_ban = []
for ips in processed_ip.values():
for ip, count_req in ips.items():
if count_req >= limit_of_requests:
ips_for_ban.append(ip)
return ips_for_ban
def main():
cmd_args = get_cmdargs()
processed_ip = {}
ip_file_path = cmd_args.get("ip_file_path")
ip_servers = cmd_args.get("ip_servers")
report_type = cmd_args.get("report")
limit_of_requests = 2000 # Лимит запросов для блокировки ip-адреса
# Если параметр -f был передан, обработать файл с ip-адресами.
if ip_file_path:
processed_ip = loads_ip("geoip2.mmdb/GeoLite2-ASN.mmdb", "geoip2.mmdb/GeoLite2-Country.mmdb", ip_file_path)
# Если нет, то в качестве данных использовать json-файл, сформированный при предыдущей обработке.
# Если и его не будет, то скрипт не совершит никакой полезной работы.
else:
if Path("dump.json").exists():
processed_ip = loads_json("dump.json")
# Если параметр -s со списком серверов был передан и есть данные для обработки
if ip_servers and processed_ip:
# Получить список ip-адресов, которые необходимо заблокировать
ips_for_ban = get_ips_for_ban(processed_ip, limit_of_requests)
# Передать список ip-адресов для формирования xml-файла зоны firewalld
gen_zone_xml(ips_for_ban)
# Обойти сервера и скопировать на них xml-файл зоны firewalld
print("Подключаемся к серверам...")
for ip_srv in get_flat_list(ip_servers):
stdout, stderr = add_firewalld_zone_file(local_filename="ip-filter.xml", ssh_host=ip_srv, ssh_port=22)
print(f"result: {stdout}")
print(f"errors: {stderr}")
else:
print("Команды для firewalld не были переданы...")
# Если передан параметр -r и есть данные для отчёта
if report_type and processed_ip:
if report_type == "xpaste":
# Распечатать ссылки на заметки xpaste
print("Генерируем отчёт на xpaste...")
for link in gen_report_on_xpaste(processed_ip):
print(link)
elif report_type == "file":
print("Генерирует отчёт в файл...")
gen_report_into_file(processed_ip, "report.txt")
else:
print("Отчёт не был сформирован...")
else:
print("Отчёт не был сформирован...")
if __name__ == "__main__":
main()