-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
70 lines (58 loc) · 2.46 KB
/
main.py
File metadata and controls
70 lines (58 loc) · 2.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List
from hashlib import sha256
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import json, time, os
# FastAPI application object; the route decorators in this file register on it.
app = FastAPI(title="C-FROG Cloud API")

# In-memory stores keyed by tenant id (nothing here is persisted).
#   TENANTS[t]  -> {"docs": [...], "tfidf": vectorizer or None, "mat": matrix or None}
#   POLICIES[t] -> policy dict for that tenant
TENANTS = {}
POLICIES = {}

# Secret appended to the digest when signing proofs; falls back to
# "dev-secret" when the C_FROG_SECRET environment variable is not set.
SECRETS = os.getenv("C_FROG_SECRET", "dev-secret")
def default_policy():
    """Return a newly built policy dict with the default rules.

    Every call constructs fresh containers, so callers may mutate the
    result without affecting policies handed out by other calls.
    """
    policy = {}
    policy["nodes"] = ["pii", "medical", "financial", "general"]
    policy["edges"] = [("general", "financial"), ("general", "medical")]
    # Keywords whose presence in an answer is reported as a policy issue.
    policy["disallow"] = {"pii": ["ssn", "aadhaar", "phone number", "email"]}
    policy["require"] = {"medical": ["DISCLAIMER: Not medical advice."]}
    policy["allow_sources"] = ["ingested"]
    return policy
def ensure_tenant(t):
    """Make sure tenant ``t`` has an entry in both TENANTS and POLICIES.

    Existing entries are left untouched; missing ones are created with an
    empty document list / no index, and a default policy respectively.
    """
    TENANTS.setdefault(t, {"docs": [], "tfidf": None, "mat": None})
    if t in POLICIES:
        return
    POLICIES[t] = default_policy()
def build_index(t):
    """Rebuild the TF-IDF index for tenant ``t`` from its stored documents.

    Fits a ``TfidfVectorizer`` (at most 5000 features, unigrams and bigrams)
    over ``TENANTS[t]["docs"]`` and stores the fitted vectorizer and the
    resulting matrix under the tenant's ``"tfidf"`` and ``"mat"`` keys.

    Does nothing when the tenant has no documents. When the documents contain
    no indexable tokens, the tenant is left without an index instead of
    raising.

    Raises:
        KeyError: if ``t`` has no entry in TENANTS.
    """
    tenant = TENANTS[t]
    docs = tenant["docs"]
    if not docs:
        return
    vectorizer = TfidfVectorizer(max_features=5000, ngram_range=(1, 2))
    try:
        matrix = vectorizer.fit_transform(docs)
    except ValueError:
        # scikit-learn raises ValueError ("empty vocabulary") when no document
        # yields a token, e.g. only empty strings or stop words were ingested.
        # Leave the tenant un-indexed so retrieval returns no hits rather than
        # letting the ingest request fail after the documents were stored.
        tenant["tfidf"], tenant["mat"] = None, None
        return
    tenant["tfidf"], tenant["mat"] = vectorizer, matrix
def retrieve(t, q, k=5):
    """Return up to ``k`` stored documents for tenant ``t`` ranked against ``q``.

    The query is transformed with the tenant's fitted vectorizer and scored
    by cosine similarity against the tenant's document matrix.

    Returns:
        A list of ``(document, score)`` tuples ordered from highest to lowest
        score, or an empty list when the tenant has no fitted vectorizer.
    """
    store = TENANTS[t]
    vectorizer = store["tfidf"]
    matrix = store["mat"]
    if vectorizer is None:
        return []
    query_vec = vectorizer.transform([q])
    scores = cosine_similarity(query_vec, matrix)[0]
    # argsort is ascending, so reverse it before taking the first k indices.
    best = scores.argsort()[::-1][:k]
    ranked = []
    for i in best:
        ranked.append((TENANTS[t]["docs"][i], float(scores[i])))
    return ranked
def guard_answer(t, ans):
    """Check ``ans`` against the disallowed PII keywords of tenant ``t``.

    Matching is a case-insensitive substring test of each keyword listed
    under ``POLICIES[t]["disallow"]["pii"]``.

    Returns:
        A ``(ok, issues)`` tuple: ``issues`` holds one ``"pii:<keyword>"``
        entry per keyword found, and ``ok`` is True when that list is empty.

    Raises:
        KeyError: if ``t`` has no policy or the policy has no "disallow" key.
    """
    banned = POLICIES[t]["disallow"].get("pii", [])
    # Lower-case the answer once instead of once per keyword.
    haystack = ans.lower()
    issues = [f"pii:{kw}" for kw in banned if kw.lower() in haystack]
    return not issues, issues
def make_proof(p):
    """Produce a digest, signature and timestamp for the payload ``p``.

    ``p`` is serialised as JSON with sorted keys and hashed with SHA-256 to
    give ``digest``. ``sig`` is the SHA-256 of that hex digest concatenated
    with the module-level secret. ``ts`` is the current Unix time in whole
    seconds.
    """
    canonical = json.dumps(p, sort_keys=True)
    digest = sha256(canonical.encode()).hexdigest()
    # NOTE(review): hash(digest + secret) is a hand-rolled keyed hash; the
    # stdlib hmac module is the standard construction, but switching would
    # change every emitted signature, so it is only flagged here.
    keyed = digest + SECRETS
    signature = sha256(keyed.encode()).hexdigest()
    return {"digest": digest, "sig": signature, "ts": int(time.time())}
# Request body accepted by the /ingest endpoint.
class IngestReq(BaseModel):
    # Tenant whose document store receives the documents.
    tenant_id: str
    # Raw document texts to add to that tenant's store.
    documents: List[str]
# Request body accepted by the /ask endpoint.
class AskReq(BaseModel):
    # Tenant whose documents and policy are used to answer.
    tenant_id: str
    # Free-text query matched against the tenant's documents.
    query: str
@app.get("/")
def home():
    # Root endpoint: returns a fixed welcome message.
    greeting = {"message": "Welcome to C-FROG Cloud API"}
    return greeting
@app.post("/ingest")
def ingest(r: IngestReq):
    # Append the submitted documents to the tenant's store (creating the
    # tenant on first use), rebuild its index, and report the total number
    # of documents now stored for that tenant.
    tenant = r.tenant_id
    ensure_tenant(tenant)
    corpus = TENANTS[tenant]["docs"]
    corpus.extend(r.documents)
    build_index(tenant)
    total = len(TENANTS[tenant]["docs"])
    return {"ok": True, "count": total}
@app.post("/ask")
def ask(r: AskReq):
    # Answer a query for a tenant: the answer is the retrieved documents
    # joined by single spaces (or "No data." when nothing is retrieved),
    # checked against the tenant's policy and accompanied by a proof.
    tenant, query = r.tenant_id, r.query
    ensure_tenant(tenant)
    hits = retrieve(tenant, query)
    if hits:
        ans = " ".join(doc for doc, _score in hits)
    else:
        ans = "No data."
    ok, issues = guard_answer(tenant, ans)
    payload = {"tenant": tenant, "query": query, "ans": ans, "issues": issues}
    proof = make_proof(payload)
    return {"answer": ans, "policy_ok": ok, "issues": issues, "proof": proof}