-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathurl-category-v2.py
More file actions
executable file
·307 lines (253 loc) · 10.1 KB
/
url-category-v2.py
File metadata and controls
executable file
·307 lines (253 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
#!/usr/bin/env python
# coding: utf-8
# What does this script do:
# It's a bulk URL/domain/IP blocklist uploader for a web filtering category (e.g. "Advanced Safe Browsing") via GraphQL.
#
# Here's the flow:
# 1. Parse input — reads a blocklist from a local file (--file) or URL (--url). Each line is parsed using regex to extract domains (plain or wildcard), IPv4 addresses, or CIDR subnets. Lines starting with # or ! are skipped. It prints a summary of parsed entries.
# 2. Authenticate — gets an auth token via getToken(apiKey, apiSecret).
# 3. Query the web category — uses a GraphQL query (listWebCategorys) to find a category by name (default: "Advanced Safe Browsing") and retrieves its current includeList and excludeList.
# 4. Update the list — replaces the category's includeList or excludeList (chosen by --list-type) with the parsed blocklist (capped at 5000 entries), then writes it back via an updateWebCategory GraphQL mutation.
# 5. Verify — compares the returned list from the mutation result against what was sent to confirm the update succeeded.
# Standard library
import argparse
import ipaddress
import json
import os
import re
import urllib
import urllib.request  # `import urllib` alone does not guarantee the request submodule is loaded
from enum import Enum

# Third-party
from dotenv import load_dotenv
from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport

# Local
from lib.common import API_KEY
from lib.common import API_SECRET
from lib.common import API_HOST
from lib.common import getToken
from lib.common import booleanString
# Regex for standard domain: "example.com" or "sub-domain.co.uk"
# NOTE: intentionally unanchored — it is used both as a substring scanner
# (finditer in get_longest_match_in_line) and as a prefix test
# (match in block_entry_type).
DOMAIN_REGEX = re.compile(
    r'([A-Za-z0-9-]+\.)+([A-Za-z]{2,})+'
)
# Regex for wildcard domain: "*.example.com"
WILDCARD_DOMAIN_REGEX = re.compile(
    r'\*\.([A-Za-z0-9-]+\.)*([A-Za-z]{2,})+'
)
# Regex for IPv4 addresses: "192.168.0.1", "255.255.255.255", etc.
# Each octet alternative constrains the value to 0-255.
IPV4_REGEX = re.compile(
    r'(?:25[0-5]|2[0-4]\d|[01]?\d?\d)'
    r'(?:\.(?:25[0-5]|2[0-4]\d|[01]?\d?\d)){3}'
)
# Regex for IPv4 subnet / CIDR, e.g. "10.0.0.0/8" (prefix length 0-32)
IPV4NET_REGEX = re.compile(
    r'(?:25[0-5]|2[0-4]\d|[01]?\d?\d)'
    r'(?:\.(?:25[0-5]|2[0-4]\d|[01]?\d?\d)){3}/'
    r'(?:[0-9]|[12]\d|3[0-2])'
)
class entrytype(Enum):
    """Classification of a single parsed blocklist entry (see block_entry_type)."""
    Unknown = 0  # no recognized pattern
    IPv4 = 1     # single IPv4 address
    CIDR = 2     # IPv4 network in CIDR notation
    Domain = 3   # plain or wildcard domain
def gqlinit(idToken):
    """Build a GraphQL client that authenticates with the given bearer token."""
    endpoint = f'{API_HOST}/graphql'
    auth_headers = {'Authorization': f'Bearer {idToken}'}
    # Schema fetching is disabled: queries here are hand-written and validated
    # server-side only.
    transport = AIOHTTPTransport(url=endpoint, headers=auth_headers)
    return Client(transport=transport, fetch_schema_from_transport=False)
def gqlexec(client, query, variables):
    """Execute *query* on *client* with the given variable bindings and return the result."""
    return client.execute(query, variable_values=variables)
def block_entry_type(candidate: str) -> entrytype:
    """Classify a parsed blocklist entry.

    Returns entrytype.IPv4 for a single IPv4 address, entrytype.CIDR for an
    IPv4 network (host bits tolerated), entrytype.Domain for a plain or
    wildcard domain, and entrytype.Unknown for anything else.
    """
    # Single IPv4 address?  Use IPv4Address explicitly: the generic
    # ipaddress.ip_address() also accepts IPv6 literals, which would be
    # mislabeled as entrytype.IPv4 here.
    try:
        ipaddress.IPv4Address(candidate)
        return entrytype.IPv4
    except ValueError:
        pass
    # IPv4 subnet?  `strict=False` allows host bits to be set
    # (e.g., "192.168.0.1/24").
    try:
        ipaddress.IPv4Network(candidate, strict=False)
        return entrytype.CIDR
    except ValueError:
        pass
    # Plain or wildcard domain?  .match() anchors only at the start, which is
    # sufficient because candidates are whole regex matches already.
    if DOMAIN_REGEX.match(candidate) or WILDCARD_DOMAIN_REGEX.match(candidate):
        return entrytype.Domain
    return entrytype.Unknown
def get_longest_match_in_line(line: str) -> str:
    """
    Scan *line* for standard domains, wildcard domains, IPv4 addresses, and
    IPv4 subnets, and return the longest match found by character length.
    Returns None when nothing matches.
    """
    patterns = (DOMAIN_REGEX, WILDCARD_DOMAIN_REGEX, IPV4_REGEX, IPV4NET_REGEX)
    hits = [m.group(0) for pattern in patterns for m in pattern.finditer(line)]
    # Longest hit wins, e.g. "*.example.com" beats its bare "example.com" suffix.
    return max(hits, key=len) if hits else None
def read_domains_and_extract_longest(filepath: str, url: str) -> list:
    """
    Main parsing function.

    Reads each line from *filepath* (or, when it is empty, from *url*),
    skipping blank lines and comment lines starting with '#' or '!'.
    For every remaining line the longest valid match (domain, wildcard
    domain, IPv4 address, or CIDR subnet) is extracted; unmatched lines are
    reported and dropped.  Prints a per-type summary and returns the list
    of extracted entries.
    """
    lines_total = 0
    lines_skipped = 0
    lines_parsed = 0
    lines_failed = 0
    count_domain = 0
    count_ipv4 = 0
    count_cidr = 0
    results = []
    print()
    print("Parsing:")
    # Open input stream from file or URL.  File input is decoded as UTF-8
    # explicitly, matching the decode applied to URL (bytes) input below.
    stream = open(filepath, 'r', encoding='utf-8') if filepath else urllib.request.urlopen(url)
    with stream:
        for rawline in stream:
            lines_total += 1
            # URL responses yield bytes; normalize to str.
            if isinstance(rawline, bytes):
                rawline = rawline.decode("utf-8")
            line = rawline.strip()
            # Skip empty lines or lines starting with '#' or '!'
            if not line or line.startswith(('#', '!')):
                lines_skipped += 1
                continue
            longest_match = get_longest_match_in_line(line)
            if not longest_match or longest_match.strip() == '':
                print(f"Failed to parse: {line}")
                lines_failed += 1
                continue
            candidate = longest_match.strip()
            lines_parsed += 1
            # Tally by entry kind (renamed from `type` to avoid shadowing the builtin).
            kind = block_entry_type(candidate)
            if kind == entrytype.CIDR:
                count_cidr += 1
            elif kind == entrytype.IPv4:
                count_ipv4 += 1
            elif kind == entrytype.Domain:
                count_domain += 1
            # now add this new result
            results.append(candidate)
    print(f"Total lines: {lines_total}")
    print(f"Skipped lines: {lines_skipped}")
    print(f"Success lines: {lines_parsed}")
    print(f"Failed lines: {lines_failed}")
    print(f" IPv4 addrs: {count_ipv4}")
    print(f" IPv4subnet: {count_cidr}")
    print(f" Domains: {count_domain}")
    print()
    return results
def update_blocklist(client: Client, blocklist: list, category_name: str, list_type: str):
    """Overwrite one URL list of a web-filtering category.

    Fetches the category named *category_name*, replaces its includeList or
    excludeList (selected by *list_type*) with *blocklist*, writes the change
    back through the updateWebCategory mutation, and prints whether the list
    echoed by the server equals the one that was sent.

    Raises:
        ValueError: if no category with that exact name exists.
    """
    # GraphQL document that looks the category up by exact name.
    lookup = gql(
        """
        fragment WebCategoryFields on WebCategory {
            id
            name
            description
            usage
            dynamicURLCategories
            preDefinedVariant
            excludeList
            includeList
        }
        query ListWebCategorys( $namefilter: String! ) {
            listWebCategorys (filter: { name: { eq: $namefilter } }) {
                items {
                    ...WebCategoryFields
                }
                total
            }
        }
        """
    )
    listing = gqlexec(client, lookup, {"namefilter": category_name})['listWebCategorys']
    if not listing['items']:
        raise ValueError(f"Category '{category_name}' not found")
    category = listing['items'][0]
    # Swap in the new list on whichever side the caller selected.
    if list_type == 'include':
        category['includeList'] = blocklist
        print(f"new include list size: {len(blocklist)}")
    else:  # exclude
        category['excludeList'] = blocklist
        print(f"new exclude list size: {len(blocklist)}")
    # GraphQL document that writes the modified category object back.
    mutation = gql(
        """
        fragment WebCategoryFields on WebCategory {
            id
            name
            description
            usage
            dynamicURLCategories
            preDefinedVariant
            excludeList
            includeList
        }
        mutation UpdateWebCategory($input: UpdateWebCategoryInput!) {
            updateWebCategory(input: $input) {
                ...WebCategoryFields
            }
        }
        """
    )
    updated = gqlexec(client, mutation, {"input": category})['updateWebCategory']
    # The mutation echoes the stored object; confirm it matches what we sent.
    echoed = updated['includeList'] if list_type == 'include' else updated['excludeList']
    print(f"Update result matches: {echoed == blocklist}")
    # If you want to see the updated object, uncomment the lines below
    # result_string = json.dumps(updated, indent=4)
    # print(result_string)
def main(argsdict):
    """Drive the upload: resolve credentials, parse the input, push the list.

    Credential resolution: start from --apiKey/--apiSecret (or their
    lib.common defaults), optionally load the --env dotenv file into the
    environment, then let `apiKey`/`apiSecret` environment variables
    override what was resolved so far.
    """
    api_key = argsdict['apiKey']
    api_secret = argsdict['apiSecret']
    dotenv_path = argsdict['env_path']
    if dotenv_path:
        # Populate os.environ from the credential file, when one was given.
        load_dotenv(dotenv_path)
    # Environment variables (possibly just loaded) take precedence.
    api_key = os.environ.get('apiKey', api_key)
    api_secret = os.environ.get('apiSecret', api_secret)
    # Parse the blocklist from --file or --url.
    blocklist = read_domains_and_extract_longest(argsdict['file'], argsdict['url'])
    # Authenticate, then build the GraphQL client.
    client = gqlinit(getToken(apiKey=api_key, apiSecret=api_secret))
    # The backend accepts at most 5000 entries per list.
    update_blocklist(client, blocklist[:5000], argsdict['category'], argsdict['list_type'])
if __name__ == '__main__':
    cli = argparse.ArgumentParser(description='Change the default block list')
    # --- authentication options ---
    cli.add_argument('--env', dest='env_path', type=str, default='', required=False,
                     help='Path to the credential file in dotenv format')
    cli.add_argument('--apiKey', dest='apiKey', type=str, default=API_KEY, required=False,
                     help='API key if not set in environment')
    cli.add_argument('--apiSecret', dest='apiSecret', type=str, default=API_SECRET, required=False,
                     help='API secret if not set in environment')
    # --- input source (exactly one of --file / --url) ---
    source = cli.add_mutually_exclusive_group(required=True)
    source.add_argument('--file', dest='file', type=str, default='',
                        help='Path to the file containing the block list')
    source.add_argument('--url', dest='url', type=str, default='',
                        help='HTTP/HTTPS URL to the block list')
    # --- target category ---
    cli.add_argument('--category', dest='category', type=str, default='Advanced Safe Browsing',
                     help='Name of the category to update')
    cli.add_argument('--list-type', dest='list_type', type=str, choices=['include', 'exclude'],
                     default='include', help='Which list to update: include (default) or exclude')
    main(vars(cli.parse_args()))