|
5 | 5 | from agentscore_commerce.identity import ( |
6 | 6 | AGENTSCORE_UCP_CAPABILITY, |
7 | 7 | AssessResult, |
| 8 | + OperatorVerification, |
8 | 9 | UCPCapability, |
9 | 10 | UCPPaymentHandler, |
10 | 11 | UCPService, |
@@ -179,3 +180,133 @@ def test_coerces_null_verified_at_to_none() -> None: |
179 | 180 |
|
180 | 181 | def test_coerces_empty_string_verified_at_to_none() -> None: |
181 | 182 | assert _claims_of({"verified_at": ""})["verified_at"] is None |
| 183 | + |
| 184 | + |
| 185 | +# Typed-field fallback: production callers populate `data.raw`, but a |
| 186 | +# hand-constructed AssessResult (no raw) should still surface the operator |
| 187 | +# verification block via the typed `AssessResult.operator_verification` field |
| 188 | +# and the (optional) `account_verification` attribute. Mirrors the node sibling's |
| 189 | +# typed-field read path. |
| 190 | + |
| 191 | + |
| 192 | +def test_typed_operator_verification_fallback_when_raw_is_none() -> None: |
| 193 | + result = AssessResult( |
| 194 | + allow=True, |
| 195 | + resolved_operator="op_typed", |
| 196 | + operator_verification=OperatorVerification( |
| 197 | + level="enhanced", |
| 198 | + operator_type="api", |
| 199 | + verified_at="2026-04-01T00:00:00Z", |
| 200 | + ), |
| 201 | + raw=None, |
| 202 | + ) |
| 203 | + profile = build_ucp_profile(**_base_kwargs(), data=result) |
| 204 | + d = profile.to_dict() |
| 205 | + cap = next(c for c in d["capabilities"] if c["name"] == AGENTSCORE_UCP_CAPABILITY) |
| 206 | + claims = cap["claims"] |
| 207 | + assert claims["operator_id"] == "op_typed" |
| 208 | + assert claims["kyc_level"] == "enhanced" |
| 209 | + assert claims["verified_at"] == "2026-04-01T00:00:00Z" |
| 210 | + |
| 211 | + |
| 212 | +def test_typed_account_verification_fallback_via_setattr() -> None: |
| 213 | + # `AssessResult` doesn't declare `account_verification` as a typed field, but |
| 214 | + # a caller can still attach one ad-hoc. The fallback reads it via getattr so |
| 215 | + # parity with the node sibling holds whichever way the caller populates it. |
| 216 | + result = AssessResult( |
| 217 | + allow=True, |
| 218 | + resolved_operator="op_typed", |
| 219 | + operator_verification=OperatorVerification(level="verified"), |
| 220 | + raw=None, |
| 221 | + ) |
| 222 | + result.account_verification = { # type: ignore[attr-defined] |
| 223 | + "kyc_level": "verified", |
| 224 | + "age_bracket": "21+", |
| 225 | + "jurisdiction": "US", |
| 226 | + "sanctions_clear": True, |
| 227 | + } |
| 228 | + profile = build_ucp_profile(**_base_kwargs(), data=result) |
| 229 | + d = profile.to_dict() |
| 230 | + cap = next(c for c in d["capabilities"] if c["name"] == AGENTSCORE_UCP_CAPABILITY) |
| 231 | + claims = cap["claims"] |
| 232 | + assert claims["kyc_level"] == "verified" |
| 233 | + assert claims["age_bracket"] == "21+" |
| 234 | + assert claims["jurisdiction"] == "US" |
| 235 | + assert claims["sanctions_clear"] is True |
| 236 | + |
| 237 | + |
| 238 | +def test_raw_takes_precedence_over_typed_fallback() -> None: |
| 239 | + # When raw carries `operator_verification`, the typed-field fallback is NOT |
| 240 | + # consulted. Production callers populate raw and the typed fields stay |
| 241 | + # in sync; this test pins the precedence so a typed mismatch can't silently |
| 242 | + # override the raw payload. |
| 243 | + result = AssessResult( |
| 244 | + allow=True, |
| 245 | + resolved_operator="op_xyz", |
| 246 | + operator_verification=OperatorVerification(level="enhanced"), |
| 247 | + raw={ |
| 248 | + "operator_verification": {"level": "verified"}, |
| 249 | + "account_verification": {"kyc_level": "verified"}, |
| 250 | + }, |
| 251 | + ) |
| 252 | + profile = build_ucp_profile(**_base_kwargs(), data=result) |
| 253 | + cap = next(c for c in profile.capabilities if c.name == AGENTSCORE_UCP_CAPABILITY) |
| 254 | + # `kyc_level` reads from raw account_verification.kyc_level first. |
| 255 | + assert cap.extras["claims"]["kyc_level"] == "verified" |
| 256 | + |
| 257 | + |
| 258 | +# Per-element to_dict reserved-key collision guard. Mirrors the parent |
| 259 | +# UCPProfile.to_dict guard so vendor extras can't silently overwrite a canonical |
| 260 | +# field on UCPService / UCPCapability / UCPSigningKey via `out.update(extras)`. |
| 261 | + |
| 262 | + |
| 263 | +def test_ucp_service_extras_collision_with_type_rejected() -> None: |
| 264 | + svc = UCPService(type="rest", extras={"type": "different"}) |
| 265 | + with pytest.raises(ValueError, match=r"UCPService\.extras key 'type' collides"): |
| 266 | + svc.to_dict() |
| 267 | + |
| 268 | + |
| 269 | +def test_ucp_service_extras_collision_with_url_rejected() -> None: |
| 270 | + svc = UCPService(type="rest", url="https://x.example", extras={"url": "https://attacker.example"}) |
| 271 | + with pytest.raises(ValueError, match=r"UCPService\.extras key 'url' collides"): |
| 272 | + svc.to_dict() |
| 273 | + |
| 274 | + |
| 275 | +def test_ucp_service_extras_non_reserved_pass_through() -> None: |
| 276 | + svc = UCPService(type="rest", url="https://x.example", extras={"region": "us-west-1"}) |
| 277 | + assert svc.to_dict() == {"type": "rest", "url": "https://x.example", "region": "us-west-1"} |
| 278 | + |
| 279 | + |
| 280 | +def test_ucp_capability_extras_collision_with_name_rejected() -> None: |
| 281 | + cap = UCPCapability(name="checkout", extras={"name": "different"}) |
| 282 | + with pytest.raises(ValueError, match=r"UCPCapability\.extras key 'name' collides"): |
| 283 | + cap.to_dict() |
| 284 | + |
| 285 | + |
| 286 | +def test_ucp_capability_extras_collision_with_schema_rejected() -> None: |
| 287 | + cap = UCPCapability(name="checkout", schema="https://x/y", extras={"schema": "https://attacker"}) |
| 288 | + with pytest.raises(ValueError, match=r"UCPCapability\.extras key 'schema' collides"): |
| 289 | + cap.to_dict() |
| 290 | + |
| 291 | + |
| 292 | +def test_ucp_capability_extras_non_reserved_pass_through() -> None: |
| 293 | + cap = UCPCapability(name="checkout", extras={"claims": {"k": "v"}}) |
| 294 | + assert cap.to_dict() == {"name": "checkout", "claims": {"k": "v"}} |
| 295 | + |
| 296 | + |
| 297 | +def test_ucp_signing_key_extras_collision_with_kid_rejected() -> None: |
| 298 | + sk = UCPSigningKey(kid="me", kty="EC", extras={"kid": "attacker"}) |
| 299 | + with pytest.raises(ValueError, match=r"UCPSigningKey\.extras key 'kid' collides"): |
| 300 | + sk.to_dict() |
| 301 | + |
| 302 | + |
| 303 | +def test_ucp_signing_key_extras_collision_with_kty_rejected() -> None: |
| 304 | + sk = UCPSigningKey(kid="me", kty="EC", extras={"kty": "RSA"}) |
| 305 | + with pytest.raises(ValueError, match=r"UCPSigningKey\.extras key 'kty' collides"): |
| 306 | + sk.to_dict() |
| 307 | + |
| 308 | + |
| 309 | +def test_ucp_signing_key_extras_non_reserved_pass_through() -> None: |
| 310 | + sk = UCPSigningKey(kid="me", kty="EC", alg="ES256", crv="P-256", extras={"x": "abc", "y": "def"}) |
| 311 | + out = sk.to_dict() |
| 312 | + assert out == {"kid": "me", "kty": "EC", "alg": "ES256", "crv": "P-256", "x": "abc", "y": "def"} |
0 commit comments