-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathupload.py
More file actions
315 lines (271 loc) · 12.5 KB
/
upload.py
File metadata and controls
315 lines (271 loc) · 12.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
import argparse
import importlib.util
import os
import re
import sys
from pathlib import Path
from typing import Any, Dict
from eval_protocol.auth import get_fireworks_api_key
from eval_protocol.platform_api import create_or_update_fireworks_secret
from eval_protocol.evaluation import create_evaluation
from .utils import (
_build_entry_point,
_build_evaluator_dashboard_url,
_discover_and_select_tests,
_discover_tests,
_ensure_account_id,
_normalize_evaluator_id,
_prompt_select,
)
def _to_pyargs_nodeid(file_path: str, func_name: str) -> str | None:
"""Attempt to build a pytest nodeid suitable for `pytest <nodeid>`.
Preference order:
1) Dotted package module path with double-colon: pkg.subpkg.module::func
2) Filesystem path with double-colon: path/to/module.py::func
Returns dotted form when package root can be inferred (directory chain with __init__.py
leading up to a directory contained in sys.path). Returns None if no reasonable
nodeid can be created (should be rare).
"""
try:
abs_path = os.path.abspath(file_path)
dir_path, filename = os.path.split(abs_path)
module_base, ext = os.path.splitext(filename)
if ext != ".py":
# Not a python file
return None
# Walk up while packages have __init__.py
segments: list[str] = [module_base]
current = dir_path
package_root = None
while True:
if os.path.isfile(os.path.join(current, "__init__.py")):
segments.insert(0, os.path.basename(current))
parent = os.path.dirname(current)
# Stop if parent is not within current sys.path import roots
if parent == current:
break
current = parent
else:
package_root = current
break
# If we found a package chain, check that the package_root is importable (in sys.path)
if package_root and any(
os.path.abspath(sp).rstrip(os.sep) == os.path.abspath(package_root).rstrip(os.sep) for sp in sys.path
):
dotted = ".".join(segments)
return f"{dotted}::{func_name}"
# Do not emit a dotted top-level module for non-packages; prefer path-based nodeid
# Fallback to relative path (if under cwd) or absolute path
cwd = os.getcwd()
try:
rel = os.path.relpath(abs_path, cwd)
except Exception:
rel = abs_path
return f"{rel}::{func_name}"
except Exception:
return None
def _parse_entry(entry: str, cwd: str) -> tuple[str, str]:
# Accept module::function, path::function, or legacy module:function
entry = entry.strip()
if "::" in entry:
target, func = entry.split("::", 1)
# Determine if target looks like a filesystem path; otherwise treat as module path
looks_like_path = (
"/" in target or "\\" in target or target.endswith(".py") or os.path.exists(os.path.join(cwd, target))
)
if looks_like_path:
abs_path = os.path.abspath(os.path.join(cwd, target))
return abs_path, func
else:
# Treat as module path for --pyargs style
return target, func
elif ":" in entry:
# Legacy support: module:function → convert to module path + function
module, func = entry.split(":", 1)
return module, func
else:
raise ValueError("--entry must be in 'module::function', 'path::function', or 'module:function' format")
def _resolve_entry_to_qual_and_source(entry: str, cwd: str) -> tuple[str, str]:
target, func = _parse_entry(entry, cwd)
# Determine the file path to load
if "/" in target or "\\" in target or os.path.exists(target):
# It's a file path - convert to absolute
if not os.path.isabs(target):
target = os.path.abspath(os.path.join(cwd, target))
if not target.endswith(".py"):
target = target + ".py"
if not os.path.isfile(target):
raise ValueError(f"File not found: {target}")
source_file_path = target
else:
# Treat dotted name as a file path
dotted_as_path = target.replace(".", "/") + ".py"
source_file_path = os.path.join(cwd, dotted_as_path)
# Load the module from the file path
spec = importlib.util.spec_from_file_location(Path(source_file_path).stem, source_file_path)
if not spec or not spec.loader:
raise ValueError(f"Unable to load module from path: {source_file_path}")
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module) # type: ignore[attr-defined]
module_name = spec.name
if not hasattr(module, func):
raise ValueError(f"Function '{func}' not found in module '{module_name}'")
qualname = f"{module_name}.{func}"
return qualname, os.path.abspath(source_file_path) if source_file_path else ""
def _load_secrets_from_env_file(env_file_path: str) -> Dict[str, str]:
"""
Load secrets from a .env file that should be uploaded to Fireworks.
Returns a dictionary of secret key-value pairs that contain 'API_KEY' in the name.
"""
if not os.path.exists(env_file_path):
return {}
# Load the .env file into a temporary environment
env_vars = {}
with open(env_file_path, "r") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#") and "=" in line:
key, value = line.split("=", 1)
key = key.strip()
value = value.strip().strip('"').strip("'") # Remove quotes
env_vars[key] = value
# Filter for secrets that look like API keys
secrets = {}
for key, value in env_vars.items():
if "API_KEY" in key.upper() and value:
secrets[key] = value
return secrets
def _mask_secret_value(value: str) -> str:
"""
Return a masked representation of a secret showing only a small prefix/suffix.
Example: fw_3Z*******Xgnk
"""
try:
if not isinstance(value, str) or not value:
return "<empty>"
prefix_len = 6
suffix_len = 4
if len(value) <= prefix_len + suffix_len:
return value[0] + "***" + value[-1]
return f"{value[:prefix_len]}***{value[-suffix_len:]}"
except Exception:
return "<masked>"
def upload_command(args: argparse.Namespace) -> int:
root = os.path.abspath(getattr(args, "path", "."))
entries_arg = getattr(args, "entry", None)
non_interactive: bool = bool(getattr(args, "yes", False))
if entries_arg:
# Only support single entry, not comma-separated values
entry = entries_arg.strip()
if "," in entry:
print("Error: --entry only supports uploading one evaluator at a time.")
print("Please specify a single entry in the format: module::function or path::function")
return 1
qualname, resolved_path = _resolve_entry_to_qual_and_source(entry, root)
selected_specs: list[tuple[str, str]] = [(qualname, resolved_path)]
else:
selected_tests = _discover_and_select_tests(root, non_interactive=non_interactive)
if not selected_tests:
return 1
# Enforce single selection
if len(selected_tests) > 1:
print(f"Error: Multiple tests selected ({len(selected_tests)}), but only one can be uploaded at a time.")
print("Please select exactly one test to upload.")
return 1
# Warn about parameterized tests
if selected_tests[0].has_parametrize:
print("\nNote: This parameterized test will be uploaded as a single evaluator that")
print(" handles all parameter combinations. The evaluator will work with")
print(" the same logic regardless of which model/parameters are used.")
selected_specs = [(selected_tests[0].qualname, selected_tests[0].file_path)]
base_id = getattr(args, "id", None)
display_name = getattr(args, "display_name", None)
description = getattr(args, "description", None)
force = bool(getattr(args, "force", False))
env_file = getattr(args, "env_file", None)
# Load secrets from .env file and ensure they're available on Fireworks
try:
fw_account_id = _ensure_account_id()
# Determine .env file path
if env_file:
env_file_path = env_file
else:
env_file_path = os.path.join(root, ".env")
# Load secrets from .env file
secrets_from_file = _load_secrets_from_env_file(env_file_path)
secrets_from_env_file = secrets_from_file.copy() # Track what came from .env file
# Also consider FIREWORKS_API_KEY from environment, but prefer .env value
fw_api_key_value = get_fireworks_api_key()
if fw_api_key_value and "FIREWORKS_API_KEY" not in secrets_from_file:
secrets_from_file["FIREWORKS_API_KEY"] = fw_api_key_value
if fw_account_id and secrets_from_file:
print(f"Found {len(secrets_from_file)} API keys to upload as Fireworks secrets...")
if secrets_from_env_file and os.path.exists(env_file_path):
print(f"Loading secrets from: {env_file_path}")
for secret_name, secret_value in secrets_from_file.items():
source = ".env" if secret_name in secrets_from_env_file else "environment"
print(
f"Ensuring {secret_name} is registered as a secret on Fireworks for rollout... "
f"({source}: {_mask_secret_value(secret_value)})"
)
if create_or_update_fireworks_secret(
account_id=fw_account_id,
key_name=secret_name,
secret_value=secret_value,
):
print(f"✓ {secret_name} secret created/updated on Fireworks.")
else:
print(f"Warning: Failed to create/update {secret_name} secret on Fireworks.")
else:
if not fw_account_id:
print(
"Warning: Could not resolve Fireworks account id from FIREWORKS_API_KEY; cannot register secrets."
)
if not secrets_from_file:
print("Warning: No API keys found in environment or .env file; no secrets to register.")
except Exception as e:
print(f"Warning: Skipped Fireworks secret registration due to error: {e}")
# selected_specs is guaranteed to have exactly 1 item at this point
qualname, source_file_path = selected_specs[0]
# Generate evaluator ID
if base_id:
evaluator_id = base_id
else:
# Extract just the test function name from qualname
test_func_name = qualname.split(".")[-1]
# Extract source file name (e.g., "test_gpqa.py" -> "test_gpqa")
if source_file_path:
source_file_name = Path(source_file_path).stem
else:
source_file_name = "eval"
# Create a shorter ID: filename-testname
evaluator_id = f"{source_file_name}-{test_func_name}"
# Normalize the evaluator ID to meet Fireworks requirements
evaluator_id = _normalize_evaluator_id(evaluator_id)
# Compute entry point metadata for backend as a pytest nodeid usable with `pytest <entrypoint>`
# Always prefer a path-based nodeid to work in plain pytest environments (server may not use --pyargs)
func_name = qualname.split(".")[-1]
entry_point = _build_entry_point(root, source_file_path, func_name)
print(f"\nUploading evaluator '{evaluator_id}' for {qualname.split('.')[-1]}...")
try:
test_dir = root
metric_name = os.path.basename(test_dir) or "metric"
result = create_evaluation(
evaluator_id=evaluator_id,
metric_folders=[f"{metric_name}={test_dir}"],
display_name=display_name or evaluator_id,
description=description or f"Evaluator for {qualname}",
force=force,
entry_point=entry_point,
)
name = result.get("name", evaluator_id) if isinstance(result, dict) else evaluator_id
# Print success message with Fireworks dashboard link
print(f"\n✅ Successfully uploaded evaluator: {evaluator_id}")
print("📊 View in Fireworks Dashboard:")
dashboard_url = _build_evaluator_dashboard_url(evaluator_id)
print(f" {dashboard_url}\n")
return 0
except Exception as e:
print(f"Failed to upload {qualname}: {e}")
return 2