-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathannotate.py
More file actions
176 lines (144 loc) · 5.79 KB
/
annotate.py
File metadata and controls
176 lines (144 loc) · 5.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import sys
import os.path
import glob
import gzip
import re
import yaml
import duckdb
import utils
import duck_utils
# Script setup: verbosity, DuckDB httpfs initialisation, and the mandatory
# database YAML (first CLI argument) that defines the left-hand relation.
verbose = 1
duck_utils.init_duckdb_httpfs(verbose=verbose)

cli_args = sys.argv[1:]
if not cli_args:
    raise ValueError("At least database_yaml argument is required")
database_yaml = cli_args[0]
if not os.path.exists(database_yaml):
    raise FileNotFoundError(f"Database YAML not found: {database_yaml}")

with open(database_yaml, 'r', encoding='utf8') as db_yaml_fd:
    config = yaml.safe_load(db_yaml_fd)
# The join SQL later in this script refers to this relation by the name
# 'left_db', so the variable name is significant.
left_db = duck_utils.db_config(config, verbose=verbose)
# Classify the arguments that follow database_yaml.
#
# The leading run of arguments that are existing *.yaml / *.yml paths holds
# the join YAMLs followed by the action YAML (the last path in the run).
# The first argument that is not such a path, and everything after it, is
# passed through to the action query.
cli_rest = sys.argv[2:]
yaml_run = 0
for arg in cli_rest:
    if not (os.path.exists(arg) and arg.endswith(('.yaml', '.yml'))):
        break
    yaml_run += 1
join_yamls = cli_rest[:yaml_run]
passthrough_args = cli_rest[yaml_run:]
action_yaml = join_yamls.pop() if join_yamls else None

# Fallback for config files without a .yaml/.yml suffix. This only applies
# when argv[2] was not recognised as a YAML path above (so nothing was
# collected). It accepts the old positional form
#   database_yaml join_yaml action_yaml [args...]
# as long as argv[2] and argv[3] both exist on disk.
if not join_yamls and not action_yaml and len(sys.argv) >= 4:
    if os.path.exists(sys.argv[2]) and os.path.exists(sys.argv[3]):
        join_yamls = [sys.argv[2]]
        action_yaml = sys.argv[3]
        passthrough_args = sys.argv[4:]
# Chain one join per join YAML onto left_db.
#
# Each step becomes a DuckDB view named join_step_<idx>; the final step is
# named `joined`.
# NOTE(review): when join_yamls is empty no `joined` view is created, so an
# action SQL that selects from `joined` will fail -- confirm that is intended.


def q(col):
    """Return *col* as a double-quoted SQL identifier, doubling embedded quotes."""
    return '"{}"'.format(str(col).replace('"', '""'))


def _as_column_list(value):
    """Wrap a single column name in a list; pass lists through unchanged."""
    return value if isinstance(value, list) else [value]


_JOIN_CLAUSES = {'INNER': 'INNER JOIN', 'OUTER': 'LEFT OUTER JOIN'}

current_view = 'left_db'
for idx, join_yaml in enumerate(join_yamls):
    with open(join_yaml, 'r', encoding='utf8') as fd:
        config = yaml.safe_load(fd)
    if verbose:
        print(f'right db config: {config}')

    # The SQL below names this relation only by the string right_view.
    # That relies on DuckDB replacement scans, which resolve a table name to
    # a Python variable of the same name. So the relation is bound to a
    # uniquely named module global: the same effect the earlier exec() call
    # had, without compiling code at runtime.
    right_view = f'right_db_{idx}'
    globals()[right_view] = duck_utils.db_config(config, verbose=verbose)

    # Select list: every left column plus the requested right columns,
    # optionally renamed with the YAML's prefix. An empty right_columns
    # yields just the left columns instead of a dangling comma.
    prefix = config.get('prefix', '')
    select_items = [f'{current_view}.*']
    for col in config['right_columns'] or []:
        expr = f'{right_view}.{q(col)}'
        select_items.append(f'{expr} AS {q(f"{prefix}{col}")}' if prefix else expr)

    # join_columns is either a list of column names shared by both sides
    # (a bare string names a single column), or a dict with 'left' and
    # 'right' entries that may each be a name or a list of names.
    join_columns = config['join_columns']
    if isinstance(join_columns, dict):
        left_cols = _as_column_list(join_columns['left'])
        right_cols = _as_column_list(join_columns['right'])
    else:
        shared_cols = [join_columns] if isinstance(join_columns, str) else list(join_columns)
        left_cols = right_cols = shared_cols
    # Guard against zip() silently dropping unmatched columns and against an
    # empty ON () clause.
    if not left_cols or len(left_cols) != len(right_cols):
        raise ValueError(
            f"{join_yaml}: join_columns must pair the same non-zero number of "
            f"left and right columns (got {len(left_cols)} left, {len(right_cols)} right)"
        )
    jcols = ' AND '.join(
        f'{current_view}.{q(lc)} = {right_view}.{q(rc)}'
        for lc, rc in zip(left_cols, right_cols)
    )

    # join_type defaults to OUTER (a LEFT OUTER JOIN) for backward
    # compatibility.
    join_type = str(config.get('join_type') or 'OUTER').upper()
    if join_type not in _JOIN_CLAUSES:
        raise ValueError(f"Invalid join_type '{join_type}'. Must be 'INNER' or 'OUTER'")
    join_clause = _JOIN_CLAUSES[join_type]

    # An intermediate view lets the next iteration join onto this result.
    next_view = 'joined' if idx == len(join_yamls) - 1 else f'join_step_{idx}'
    select_list = ',\n       '.join(select_items)
    sql = (
        f'CREATE OR REPLACE VIEW {next_view} AS\n'
        f'SELECT {select_list}\n'
        f'FROM {current_view}\n'
        f'{join_clause} {right_view}\n'
        f'ON ({jcols})\n'
    )
    if verbose:
        print('view sql is:\n')
        print(sql)
    duckdb.sql(sql)
    current_view = next_view
# Load the action YAML.
#
# It supplies the query template ('sql', 'columns', 'where') and the
# optional 'argv' and 'limits' settings used by the query loop.
if not action_yaml:
    raise ValueError("No action YAML file provided")
with open(action_yaml, 'r', encoding='utf8') as fd:
    action = yaml.safe_load(fd)
# An empty YAML file loads as None; fail clearly instead of with a TypeError
# on the membership tests that follow.
if not isinstance(action, dict):
    raise ValueError(
        f"Action YAML {action_yaml} must contain a mapping, got {type(action).__name__}"
    )

# The query runs once per passthrough argument, or once with an empty
# argument when there are none.
argvs = passthrough_args if passthrough_args else ['']
# An action that declares 'argv' cannot run without an argument.
if not passthrough_args and 'argv' in action:
    raise ValueError('configuration requires an argument')
# Run the action query once per argument and write each result as CSV:
# <argument>.csv, or output.csv when the argument is empty.
# NOTE(review): with several arguments the query runs once per argument,
# although every run also sees all of them as arg1..argN -- confirm intended.

# Loop-invariant settings, computed once.
# arg1, arg2, ... expose every passthrough argument to the 'where' template.
arg_vars = {f'arg{i + 1}': arg for i, arg in enumerate(argvs)}
# A missing or null 'argv' disables SURT conversion instead of crashing.
surt_mode = 'surt_host_name' in (action.get('argv') or ())
limit_count = (action.get('limits', {}) or {}).get('count')
trailing_limit_re = re.compile(r'(?i)\s+LIMIT\s+\d+\s*;?\s*$')


def _sql_string(text):
    """Return *text* as a single-quoted SQL string literal (quotes doubled)."""
    return "'" + text.replace("'", "''") + "'"


for argv in argvs:
    and_tld = ''
    if surt_mode:
        argv = utils.thing_to_surt_host_name(argv.rstrip())
        # The first comma-separated SURT component is used as the TLD filter.
        tld = argv.split(',', 1)[0]
        and_tld = f' AND url_host_tld = {_sql_string(tld)}'
    sql = action['sql'].format(
        columns=action['columns'],
        where=action['where'].format(argv=argv, and_tld=and_tld, **arg_vars),
    )
    # Normalise the tail of the query. The query is wrapped in COPY (...)
    # below, where a trailing ';' inside the parentheses is a syntax error,
    # so it is always stripped. When limits.count is set, any trailing LIMIT
    # in the template is replaced by it.
    if limit_count:
        sql = trailing_limit_re.sub('', sql)
    sql = sql.rstrip().rstrip(';')
    if limit_count:
        sql += f'\nLIMIT {int(limit_count)}'
    if verbose:
        print('action sql is:\n')
        print(sql)
    result = duckdb.sql(sql)
    if verbose:
        print('result is', result.shape[0], 'rows')
    if isinstance(argv, str):
        filename = argv + '.csv' if argv else 'output.csv'
        print('writing', filename)
        target = _sql_string(filename)
    else:
        target = "'/dev/stdout'"
    duckdb.sql(f'COPY ({sql}) TO {target} (FORMAT CSV, HEADER)')