From 7428bd83a98e54df08987217c79428b3fb6f0f72 Mon Sep 17 00:00:00 2001 From: Amit Singh Date: Mon, 18 May 2026 16:59:18 +0530 Subject: [PATCH 1/3] refactor: simplifies source ingestion logic Signed-off-by: Amit Singh --- oapi.yaml | 24 +++ scripts/helper.py | 42 +++++ scripts/ingest_sources.py | 170 ++++++++++++++++++++ scripts/openrouter.py | 328 +++++++------------------------------- 4 files changed, 290 insertions(+), 274 deletions(-) create mode 100644 scripts/helper.py create mode 100644 scripts/ingest_sources.py diff --git a/oapi.yaml b/oapi.yaml index f616c2d..dbe1aed 100644 --- a/oapi.yaml +++ b/oapi.yaml @@ -486,6 +486,17 @@ components: x-oapi-codegen-extra-tags: validate: httpsurl binding: required + domainUrlNewsData: + type: string + description: Domain url corresponding to newsdata.io `domainurl` parameter + example: "timesofindia.indiatimes.com,theguardian.com" + x-oapi-codegen-extra-tags: + validate: omitnil,nonempty,nospace + example: + name: "New York Times" + summary: "American daily newspaper based in New York City" + tags: "news,media,journalism" + uri: "https://www.nytimes.com" SourcePatchInput: type: object @@ -512,6 +523,12 @@ components: example: "news,journalism,trusted" x-oapi-codegen-extra-tags: validate: omitnil,nospace,nonempty + domainUrlNewsData: + type: string + description: Domain url corresponding to newsdata.io `domainurl` parameter + example: "timesofindia.indiatimes.com,theguardian.com" + x-oapi-codegen-extra-tags: + validate: omitnil,nonempty,nospace Source: type: object @@ -523,6 +540,7 @@ components: - tags - uri - uriDigest + - domainUrlNewsData properties: name: type: string @@ -562,6 +580,12 @@ components: example: "https://www.nytimes.com" x-oapi-codegen-extra-tags: binding: required + domainUrlNewsData: + type: string + description: Domain url corresponding to newsdata.io `domainurl` parameter + example: "timesofindia.indiatimes.com,theguardian.com" + x-oapi-codegen-extra-tags: + binding: required CreateSourceResponse: type: object diff --git a/scripts/helper.py b/scripts/helper.py new file mode 100644 index 0000000..7a5b73b --- /dev/null +++ b/scripts/helper.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +"""Helper functions for scripts""" + +import json +import os +import sys +import yaml +from typing import Tuple +from urllib.request import Request, urlopen + +def get_text_from_url(url: str) -> str: + with urlopen(url, timeout=60) as r: + return r.read().decode("utf-8") + +def get_text_from_file(path: str) -> str: + with open(path, 'r', encoding='utf-8') as f: + return f.read() + +def get_oapi_spec(): + base_dir = os.path.dirname(os.path.dirname(__file__)) + oapi_path = os.path.join(base_dir, "oapi.yaml") + + with open(oapi_path, 'r', encoding='utf-8') as f: + return yaml.safe_load(f) + +def post_request(endpoint: str, headers: dict, payload: dict, timeout: int) -> Tuple[int, str]: + data = json.dumps(payload).encode("utf-8") + req = Request( + url=endpoint, + data=data, + headers=headers + ) + + try: + with urlopen(req, timeout=timeout) as r: + status = r.getcode() + body = r.read().decode("utf-8") + except Exception as e: + print(f"Error making request to {endpoint}: {e}", file=sys.stderr) + return -1, "" + + return status, body diff --git a/scripts/ingest_sources.py b/scripts/ingest_sources.py new file mode 100644 index 0000000..3f59876 --- /dev/null +++ b/scripts/ingest_sources.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 +"""Source doc specific logic for ingest_sources.py""" + +import os +import re +import sys +import yaml + +import helper +import openrouter + +WEB_SEARCH_TOOL = {"type": "openrouter:web_search"} + +MD_PROCESSING_SKILL_URL = os.getenv( + "MD_PROCESSING_SKILL_URL", + "https://raw.githubusercontent.com/semmet95/agent-skills/refs/heads/main/md-processing/SKILL.md" +) +SOURCE_FILTER_PROMPT = os.getenv( + "SOURCE_FILTER_PROMPT", + "What are the top 10 latest most popular news outlets in the world listed in this document? Only output URLs of these news outlets separated by new lines. Do not output anything else." +) +SOURCE_CLEANUP_PROMPT = os.getenv( + "SOURCE_CLEANUP_PROMPT", + ( + "Use web search to access these URLs and discard those that are invalid. Do not scrape the web page, only check if the URL is valid" + "Based on successful web searches, return a list of corresponding properly formatted URLs witout any extra test. Keep only one URL per line." + ) +) +SOURCE_DOC_GEN_PROMPT = os.getenv( + "SOURCE_DOC_GEN_PROMPT", + "Use web search to fetch information about these media outlets and create yaml docs for each of them following the source_schema schema. Do not output anything except for the yaml documents for these medial outlets separated by ---" +) + +def remove_ingested_sources(source_docs: list[str], sources_dir: str) -> list[str]: + # Load existing source files + existing = [] + try: + for fn in os.listdir(sources_dir): + if not (fn.endswith('.yaml') or fn.endswith('.yml')): + continue + path = os.path.join(sources_dir, fn) + try: + with open(path, 'r', encoding='utf-8') as f: + doc = yaml.safe_load(f) + if isinstance(doc, dict): + existing.append(doc) + except Exception as e: + print(f"Warning: failed to read/parse {path}: {e}", file=sys.stderr) + except Exception as e: + print(f"Error listing sources directory {sources_dir}: {e}", file=sys.stderr) + return source_docs + + cleaned: list[str] = [] + for doc_str in source_docs: + try: + doc = yaml.safe_load(doc_str) + except Exception as e: + print(f"Warning: failed to parse source_doc; not adding it to the source list it. Error: {e}", file=sys.stderr) + continue + if doc == None: + print(f"Warning: skipping the following yaml string as it failed to load: {doc_str}", file=sys.stderr) + continue + duplicate = False + for src in existing: + if doc.get('name') == src.get('name') or doc.get('uri') == src.get('uri'): + duplicate = True + break + if not duplicate: + cleaned.append(doc_str) + + return cleaned + +def write_source_docs(source_docs: list[str], sources_dir: str): + for doc_str in source_docs: + try: + parsed = yaml.safe_load(doc_str) + except Exception as e: + print(f"Warning: failed to parse src_doc : {doc_str}: {e}", file=sys.stderr) + continue + + filename = parsed.get('name') + if filename == None or filename.strip() == '': + print(f"Warning: invalid source name or failed to extract name field from : {doc_str}", file=sys.stderr) + continue + + # sanitize name to use as filename + filename = re.sub(r"[^A-Za-z0-9._-]+", "-", filename.strip()) + + filename = filename + ".yaml" + path = os.path.join(sources_dir, filename) + + # avoid overwriting existing files + if os.path.exists(path): + print(f"Warning: source file with name : {filename} already exists", file=sys.stderr) + continue + + try: + with open(path, 'w', encoding='utf-8') as f: + f.write(doc_str.strip() + "\n") + except Exception as e: + print(f"Error writing {path}: {e}", file=sys.stderr) + +def main(): + if len(sys.argv) < 2: + print("Usage: openrouter.py TMP_MD", file=sys.stderr) + sys.exit(1) + + tmp_md = sys.argv[1] + + # Fetch md processing skill + try: + md_processing_skill = helper.get_text_from_url(MD_PROCESSING_SKILL_URL) + except Exception as e: + print(f"Error: failed to fetch skill from {MD_PROCESSING_SKILL_URL}: {e}", file=sys.stderr) + sys.exit(1) + + # Fetch md file content + try: + md_doc = helper.get_text_from_file(tmp_md) + except Exception as e: + print(f"Error: failed to fetch content from file {tmp_md}: {e}", file=sys.stderr) + sys.exit(1) + + req_content = "Here is a web page in Markdown:\n\n" + md_doc + "\n\nAnswer this question:\n" + SOURCE_FILTER_PROMPT + raw_source_list = openrouter.req_w_addons(req_content, skill=md_processing_skill) + if raw_source_list == "": + print("Error: failed to get raw source list from openrouter", file=sys.stderr) + sys.exit(1) + + # openrouter call to get a clean formatted list of source urls + req_content = ( + "Here is a raw list of URLs of news outlets with each line containing one or more unformatted URLs:" + f"\n\n{raw_source_list}\n\n" + f"{SOURCE_CLEANUP_PROMPT}" + ) + source_list = openrouter.req_w_addons(req_content, tools=[WEB_SEARCH_TOOL]) + if source_list == "": + print("Error: failed to get filtered list of source urls", file=sys.stderr) + sys.exit(1) + + # load the source input example + src_input_schema = helper.get_oapi_spec()['components']['schemas']['SourceInput'] + src_example = src_input_schema.get('example') + + # generate source docs from the source url list + req_content = ( + "Extract schema from the following yaml document and store it as source_schema:" + f"\n\n{src_example}\n\n" + "Following is a list of urls of media outlets separated by new lines:" + f"\n\n{source_list}\n\n" + f"{SOURCE_DOC_GEN_PROMPT}" + ) + src_docs_str = openrouter.req_w_addons(req_content, tools=[WEB_SEARCH_TOOL]) + if src_docs_str == "": + print("Error: failed to generate source docs from source urls", file=sys.stderr) + sys.exit(1) + + source_docs = src_docs_str.split("---") + source_docs = list(filter(str.strip, source_docs)) + + sources_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "sources") + + # remove sources that are already in the `sources` directory + unique_src_docs = remove_ingested_sources(source_docs, sources_dir) + + # Write each unique source YAML doc into the `sources/` folder + write_source_docs(unique_src_docs, sources_dir) + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/openrouter.py b/scripts/openrouter.py index 4d0d464..1941f33 100644 --- a/scripts/openrouter.py +++ b/scripts/openrouter.py @@ -8,213 +8,64 @@ OPENROUTER_API_KEY must be set. """ -from __future__ import annotations - import json import os import sys -import re -import yaml -from typing import Tuple +import time -try: - from urllib.request import Request, urlopen - from urllib.error import HTTPError -except Exception: # pragma: no cover - very unlikely - raise +import helper -MD_PROCESSING_SKILL_URL = "https://raw.githubusercontent.com/semmet95/agent-skills/refs/heads/main/md-processing/SKILL.md" -SOURCE_QUESTION = "What are the top 10 latest most popular news outlets in the world listed in this document? Only output URLs of these news outlets separated by new lines. Do not output anything else." FREE_MODELS_DOC = [ - "openai/gpt-oss-120b:free", "google/gemma-4-31b-it:free", "nvidia/nemotron-3-nano-omni-30b-a3b-reasoning:free", - "google/gemma-4-26b-a4b-it:free", - "nvidia/nemotron-nano-12b-v2-vl:free", - "z-ai/glm-4.5-air:free" + "openrouter/free" ] +OPENROUTER_API_KEY = os.environ["OPENROUTER_API_KEY"] +MD_PROCESSING_SKILL_URL = os.getenv("MD_PROCESSING_SKILL_URL", "https://raw.githubusercontent.com/semmet95/agent-skills/refs/heads/main/md-processing/SKILL.md") +OPENROUTER_CHAT_ENDPOINT = os.getenv("OPENROUTER_CHAT_ENDPOINT", "https://openrouter.ai/api/v1/chat/completions") +OPENROUTER_MAX_RETRIES = int(os.getenv("OPENROUTER_MAX_RETRIES", "3")) + +def req_chat(payload: dict) -> str: + headers = { + "Authorization": f"Bearer {OPENROUTER_API_KEY}", + "Content-Type": "application/json", + } + + for i in range(1, OPENROUTER_MAX_RETRIES + 1): + status, body = helper.post_request( + OPENROUTER_CHAT_ENDPOINT, + headers, + payload, + 60, + ) + + if status in [500, 429]: + print(f"OpenRouter API returned status {status}, retrying...", file=sys.stderr) + time.sleep(10 * i) + continue -def fetch_text(url: str) -> str: - with urlopen(url, timeout=60) as r: - return r.read().decode("utf-8") - -def fetch_content(path: str) -> str: - with open(path, 'r') as f: - return f.read() - -def post_openrouter(api_key: str, payload: dict) -> Tuple[int, str]: - data = json.dumps(payload).encode("utf-8") - req = Request( - "https://openrouter.ai/api/v1/chat/completions", - data=data, - headers={ - "Content-Type": "application/json", - "Authorization": f"Bearer {api_key}", - }, - ) - - status = -1 - try: - with urlopen(req, timeout=60) as r: - status = r.getcode() - body = r.read().decode("utf-8") - except Exception as e: - print(f"Error making request to OpenRouter API: {e}", file=sys.stderr) - body = "" - - return status, body - -# TODO: add retry mechanism when hitting openrouter endpoints -def req_openrouter(payload: dict, api_key: str) -> str: - status, body = post_openrouter(api_key, payload) - - if status == 200: - try: - data = json.loads(body) - except Exception as e: - print(f"Failed to parse JSON response: {e}", file=sys.stderr) - print(body) - sys.exit(1) - - # Extract assistant reply - reply = None - try: - reply = data["choices"][0]["message"]["content"] - return reply - except Exception as e: - print(f"Failed to access key [choices][0][message][content]: {e}", file=sys.stderr) - sys.exit(1) - else: - print(f"Openrouter response status: {status}", file=sys.stderr) - return "" - -def process_raw_doc(md_processing_skill: str, md_doc: str, api_key: str) -> str: - for model in FREE_MODELS_DOC: - payload = { - "model": model, - "messages": [ - {"role": "system", "content": md_processing_skill}, - { - "role": "user", - "content": "Here is a web page in Markdown:\n\n" + md_doc + "\n\nAnswer this question:\n" + SOURCE_QUESTION - }, - ], - } - - raw_source_list = req_openrouter(payload, api_key) - if raw_source_list != "": - break - if raw_source_list == "": - print("Error: All models failed.", file=sys.stderr) - sys.exit(1) - return raw_source_list - -def process_raw_srcs(raw_source_list: str, api_key: str) -> str: - content = (f"Here is a raw list of URLs of news outlets with each line containing one or more unformatted URLs:\n\n" - f"{raw_source_list}\n\n" - f"Use web search to access these URLs and discard those that are invalid. Do not scrape the web page, only check if the URL is valid\n" - "Based on successful web searches, return a list of corresponding properly formatted URLs witout any extra test. Keep only one URL per line." - ) - for model in FREE_MODELS_DOC: - payload = { - "model": model, - "messages": [ - { - "role": "user", - "content": content - }, - ], - "tools": [ - {"type": "openrouter:web_search"} - ] - } - - source_list = req_openrouter(payload, api_key) - if source_list != "": - break - if source_list == "": - print("Error: All models failed.", file=sys.stderr) - sys.exit(1) - return source_list - -def get_latest_source() -> str: - base_dir = os.path.dirname(os.path.dirname(__file__)) - sources_dir = os.path.join(base_dir, "sources") - - try: - entries = [os.path.join(sources_dir, fn) for fn in os.listdir(sources_dir)] - files = [p for p in entries if os.path.isfile(p)] - except Exception as e: - print(f"Error listing sources directory {sources_dir}: {e}", file=sys.stderr) - sys.exit(1) - - if not files: - print(f"No files found in {sources_dir}", file=sys.stderr) - sys.exit(1) - - latest = max(files, key=lambda p: os.path.getmtime(p)) - - try: - with open(latest, 'r', encoding='utf-8') as f: - latest_content = f.read() - except Exception as e: - print(f"Error reading latest file {latest}: {e}", file=sys.stderr) - sys.exit(1) - - return latest_content - -def remove_ingested_sources(source_docs: list[str]) -> list[str]: - base_dir = os.path.dirname(os.path.dirname(__file__)) - sources_dir = os.path.join(base_dir, "sources") - - # Load existing source files - existing = [] - try: - for fn in os.listdir(sources_dir): - if not (fn.endswith('.yaml') or fn.endswith('.yml')): - continue - path = os.path.join(sources_dir, fn) + if status == 200: try: - with open(path, 'r', encoding='utf-8') as f: - doc = yaml.safe_load(f) - if isinstance(doc, dict): - existing.append(doc) + data = json.loads(body) except Exception as e: - print(f"Warning: failed to read/parse {path}: {e}", file=sys.stderr) - except Exception as e: - print(f"Error listing sources directory {sources_dir}: {e}", file=sys.stderr) - return source_docs - - cleaned: list[str] = [] - for doc_str in source_docs: - try: - doc = yaml.safe_load(doc_str) - except Exception as e: - print(f"Warning: failed to parse source_doc; not adding it to the source list it. Error: {e}", file=sys.stderr) - continue - if doc == None: - print(f"Warning: skipping the following yaml string as it failed to load: {doc_str}", file=sys.stderr) - continue - duplicate = False - for src in existing: - if doc.get('name') == src.get('name') or doc.get('uri') == src.get('uri'): - duplicate = True - break - if not duplicate: - cleaned.append(doc_str) + print(f"Failed to parse openrouter json response {body}: {e}", file=sys.stderr) + return "" - return cleaned + # Extract assistant reply + try: + return data["choices"][0]["message"]["content"] + except Exception as e: + print(f"Failed to access key [choices][0][message][content] in json {data}: {e}", file=sys.stderr) + return "" + else: + print(f"Openrouter response status: {status} with bode: {body}", file=sys.stderr) + break -def get_source_docs(sample: str, sources: str, api_key: str) -> str: - # TODO: generate yaml doc for once source at a time, however this consumes more requests and increases odds of failed requests - content = (f"Extract schema from the following yaml document and store it as source_schema:\n\n" - f"{sample}\n\n" - f"Following is a list of urls of media outlets separated by new lines\n\n" - f"{sources}\n\n" - "Use web search to fetch information about these media outlets and create yaml docs for each of them following the source_schema schema. Do not output anything except for the yaml documents for these medial outlets separated by ---." - ) + return "" +def req_w_addons(content: str, skill="", tools=[]) -> str: + resp = "" for model in FREE_MODELS_DOC: payload = { "model": model, @@ -224,91 +75,20 @@ def get_source_docs(sample: str, sources: str, api_key: str) -> str: "content": content }, ], - "tools": [ - {"type": "openrouter:web_search"} - ] } - source_docs = req_openrouter(payload, api_key) - if source_docs != "": - break - if source_docs == "": - print("Error: All models failed.", file=sys.stderr) - sys.exit(1) - return source_docs - -def main(): - if len(sys.argv) < 2: - print("Usage: openrouter.py TMP_MD", file=sys.stderr) - sys.exit(1) - - tmp_md = sys.argv[1] + if skill != "": + payload["messages"].append({"role": "system", "content": skill}) - api_key = os.getenv("OPENROUTER_API_KEY") - if not api_key: - print("Error: OPENROUTER_API_KEY environment variable is not set.", file=sys.stderr) - sys.exit(1) - - # Fetch md processing skill - try: - md_processing_skill = fetch_text(MD_PROCESSING_SKILL_URL) - except Exception as e: - print(f"Error: failed to fetch skill from {MD_PROCESSING_SKILL_URL}: {e}", file=sys.stderr) - sys.exit(1) - - # Fetch md file content - try: - md_doc = fetch_content(tmp_md) - except Exception as e: - print(f"Error: failed to fetch content from file {tmp_md}: {e}", file=sys.stderr) - sys.exit(1) - - raw_source_list = process_raw_doc(md_processing_skill, md_doc, api_key) - - # openrouter call to get a clean formatted list of source urls - source_list = process_raw_srcs(raw_source_list, api_key) - - # load the most recently added source - latest_source = get_latest_source() - - source_docs = get_source_docs(latest_source, source_list, api_key).split("---") - source_docs = list(filter(str.strip, source_docs)) - - # remove sources that are already in the `sources` directory - unique_src_docs = remove_ingested_sources(source_docs) - - # Write each unique source YAML doc into the `sources/` folder. - base_dir = os.path.dirname(os.path.dirname(__file__)) - sources_dir = os.path.join(base_dir, "sources") - - for doc_str in unique_src_docs: - try: - parsed = yaml.safe_load(doc_str) - except Exception as e: - print(f"Warning: failed to parse src_doc : {doc_str}: {e}", file=sys.stderr) - continue - - filename = parsed.get('name') - if filename == None or filename.strip() == '': - print(f"Warning: invalid source name or failed to extract name field from : {doc_str}", file=sys.stderr) - continue - - # sanitize name to use as filename - filename = re.sub(r"[^A-Za-z0-9._-]+", "-", filename.strip()) - - filename = filename + ".yaml" - path = os.path.join(sources_dir, filename) - - # avoid overwriting existing files - if os.path.exists(path): - print(f"Warning: source file with name : {filename} already exists", file=sys.stderr) - continue + if len(tools) > 0: + payload["tools"] = tools + + resp = req_chat(payload) + if resp != "": + break + print(f"{model} failed, retrying with another model...") - try: - with open(path, 'w', encoding='utf-8') as f: - f.write(doc_str.strip() + "\n") - except Exception as e: - print(f"Error writing {path}: {e}", file=sys.stderr) + if resp == "": + print("Error: All models failed.", file=sys.stderr) -if __name__ == "__main__": - raise SystemExit(main()) + return resp From 0e13a1ff16a6c90d1c9c80ebf1ce3fed1b54ec7c Mon Sep 17 00:00:00 2001 From: Amit Singh Date: Mon, 18 May 2026 19:21:23 +0530 Subject: [PATCH 2/3] refactor: simplifies claim ingestion logic Signed-off-by: Amit Singh --- scripts/helper.py | 35 +++++ scripts/ingest_claims.py | 159 ++++++++++++++++++++++ scripts/ingest_sources.py | 6 +- scripts/newsdata_io.py | 273 +------------------------------------- scripts/openrouter.py | 1 + 5 files changed, 202 insertions(+), 272 deletions(-) create mode 100644 scripts/ingest_claims.py diff --git a/scripts/helper.py b/scripts/helper.py index 7a5b73b..e81573a 100644 --- a/scripts/helper.py +++ b/scripts/helper.py @@ -2,8 +2,11 @@ """Helper functions for scripts""" import json +import demjson3 import os +import re import sys +import requests import yaml from typing import Tuple from urllib.request import Request, urlopen @@ -23,6 +26,38 @@ def get_oapi_spec(): with open(oapi_path, 'r', encoding='utf-8') as f: return yaml.safe_load(f) +def get_sources(base_url: str, api_key: str): + endpoint = f"{base_url}/api/v1/sources" + headers = {"X-API-Key": api_key} + response = requests.get(endpoint, headers=headers, timeout=90) + if response.status_code != 200: + print(f"Error: failed to get all sources: {response.status_code}") + return None + return response.json() + +def cleanup_json_str(json_str: str) -> str: + # models often return the json string wrapped in a code block or with incompatible values + match = re.search(r'```(?:json)?\s*(.*?)\s*```', json_str, re.DOTALL) + if match: + json_str = match.group(1) + + # Replace all Python boolean and None values + json_str = re.sub(r':\s*None\b', ': null', json_str) + json_str = re.sub(r':\s*False\b', ': false', json_str) + json_str = re.sub(r':\s*True\b', ': true', json_str) + + json_str = json_str.replace("`", "'") + json_str = json_str.strip().replace("'", "\'") + json_str = json.dumps(demjson3.decode(json_str)) + + return json_str + +def clean_filepath(path: str, replace: str = "_") -> str: + cleaned = re.sub(r'[^a-zA-Z0-9_-]', replace, path) + cleaned = re.sub(f'{re.escape(replace)}+', replace, cleaned) + cleaned = cleaned.strip(replace) + return cleaned + def post_request(endpoint: str, headers: dict, payload: dict, timeout: int) -> Tuple[int, str]: data = json.dumps(payload).encode("utf-8") req = Request( diff --git a/scripts/ingest_claims.py b/scripts/ingest_claims.py new file mode 100644 index 0000000..741ae6f --- /dev/null +++ b/scripts/ingest_claims.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python3 + +import glob +import json +import os +import sys +import yaml + +import helper +import newsdata_io +import openrouter + +API_BASE_URL = os.environ["API_BASE_URL"] +API_KEY = os.environ["API_KEY"] + +FALSIFIABLE_CLAIM_SKILL_URL = os.getenv( + "FALSIFIABLE_CLAIM_SKILL_URL", + "https://raw.githubusercontent.com/semmet95/agent-skills/refs/heads/main/determine-falsifialbe-claim/SKILL.md" +) +CLAIM_FILTER_PROMPT = os.getenv( + "CLAIM_FILTER_PROMPT", + ( + "Use web search tool to visit the link for each article, access the content and then assess if it is a falsifiable claim." + "Out of these 10 articles, only return 1 article that best fits the falsifiable claim criterion." + "Prefer claims that have been made by the news source directly" + "Keep the json structure of the claims the same as the original schema in the input. Do not add remove, or modify any key or value in the json string." + "Only output the plain json array string that I can safely unmarshal." + "Do not format the string. Do not output anything else." + ) +) + +def update_claim_fields(srcDigest: str, claim): + claim["sourceUriDigest"] = srcDigest + claim["summary"] = claim["description"] + claim["uri"] = claim["link"] + +def get_claim_docs(claims_dir: str): + # Find all YAML files in claims directory and subdirectories + yaml_files = glob.glob(os.path.join(claims_dir, "**", "*.yaml"), recursive=True) + yaml_files.extend(glob.glob(os.path.join(claims_dir, "**", "*.yml"), recursive=True)) + + claims_array = [] + for yaml_file in yaml_files: + try: + with open(yaml_file, 'r') as f: + claim_data = yaml.safe_load(f) + if claim_data: # Only add if file is not empty + claims_array.append(claim_data) + except Exception as e: + print(f"Error loading YAML file {yaml_file}: {e}", file=sys.stderr) + + return claims_array + +def get_new_claims(all_claim_docs, new_claims, srcUriDigest): + unique_claims = [] + + for claim in new_claims: + update_claim_fields(srcUriDigest, claim) + new_claim = True + for claim_doc in all_claim_docs: + if claim["uri"] == claim_doc["uri"] or claim["title"] == claim_doc["title"]: + new_claim = False + break + if new_claim: + unique_claims.append(claim) + + return unique_claims + +def create_claim_docs(claims: list, srcName: str): + claim_input_schema = helper.get_oapi_spec()['components']['schemas']['ClaimInput'] + claim_example = claim_input_schema.get('example') + + # Custom representer to force double quotes around strings + def quoted_str_representer(dumper, data): + return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='"') + + yaml.add_representer(str, quoted_str_representer) + + for claim in claims: + claim_doc = {} + # keep only relevant fields in the claims + for key in claim_example.keys(): + claim_doc[key] = str(claim[key]) + + filename = claim_doc["title"].lower() + if len(filename) > 30: + filename = filename[:30] + filename = helper.clean_filepath(filename) + filename = f"{filename}.yaml" + + dirname = srcName.lower() + if len(dirname) > 30: + dirname = dirname[:30] + dirname = helper.clean_filepath(dirname) + + # Create file path + file_path = os.path.join("claims", dirname, filename) + + # Write claim_doc to YAML file + os.makedirs(os.path.dirname(file_path), exist_ok=True) + with open(file_path, 'w', encoding='utf-8') as f: + yaml.dump(claim_doc, f, default_flow_style=False, allow_unicode=True, width=float('inf')) + + print(f"Created claim document: {file_path}") + +# TODO: maintain a list of sources for which domainUrlNewsData fields needs to be updated +def main(): + claims_dir = os.path.join(os.path.dirname(__file__), "..", "claims") + claims_dir = os.path.abspath(claims_dir) + all_claim_docs = get_claim_docs(claims_dir) + + sources = helper.get_sources(API_BASE_URL, API_KEY) + if sources == None: + print(f"Error: failed to fetch all sources", file=sys.stderr) + sys.exit(1) + + # Fetch falsifiable claim skill + try: + falsifiable_claim_skill = helper.get_text_from_url(FALSIFIABLE_CLAIM_SKILL_URL) + except Exception as e: + print(f"Error: failed to fetch skill from {FALSIFIABLE_CLAIM_SKILL_URL}: {e}", file=sys.stderr) + sys.exit(1) + + for source in sources: + domain_url = source["uri"] + if source["domainUrlNewsData"] != "": + domain_url = source["domainUrlNewsData"] + + claims = newsdata_io.get_claims(domain_url) + if claims == None or len(claims) == 0: + continue + + # keep only those articles that can be classified as falsifiable claims + req_content = ( + "Following is a list of 10 articles published by the same news outlet. Each article is represented by a json string type element in the array" + f"\n\n{claims}\n\n" + f"{CLAIM_FILTER_PROMPT}" + ) + filtered_claims = openrouter.req_w_addons(req_content, skill=falsifiable_claim_skill, tools=[openrouter.WEB_SEARCH_TOOL]) + if filtered_claims == "": + print(f"Error: failed to filter claims for source {source['name']}", file=sys.stderr) + continue + + try: + filtered_claims_list = json.loads(helper.cleanup_json_str(filtered_claims)) + except Exception as e: + print(f"Error: failed to cleanup and unmarshal claims json string {filtered_claims}: {e}", file=sys.stderr) + continue + + if len(filtered_claims_list) == 0: + print(f"Error: no claims found for {source['name']} in: {filtered_claims}", file=sys.stderr) + continue + + # list of new claims to be ingested + new_unique_claims = get_new_claims(all_claim_docs, filtered_claims_list, source["uriDigest"]) + create_claim_docs(new_unique_claims, source["name"]) + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/ingest_sources.py b/scripts/ingest_sources.py index 3f59876..baaf0a1 100644 --- a/scripts/ingest_sources.py +++ b/scripts/ingest_sources.py @@ -9,8 +9,6 @@ import helper import openrouter -WEB_SEARCH_TOOL = {"type": "openrouter:web_search"} - MD_PROCESSING_SKILL_URL = os.getenv( "MD_PROCESSING_SKILL_URL", "https://raw.githubusercontent.com/semmet95/agent-skills/refs/heads/main/md-processing/SKILL.md" @@ -133,7 +131,7 @@ def main(): f"\n\n{raw_source_list}\n\n" f"{SOURCE_CLEANUP_PROMPT}" ) - source_list = openrouter.req_w_addons(req_content, tools=[WEB_SEARCH_TOOL]) + source_list = openrouter.req_w_addons(req_content, tools=[openrouter.WEB_SEARCH_TOOL]) if source_list == "": print("Error: failed to get filtered list of source urls", file=sys.stderr) sys.exit(1) @@ -150,7 +148,7 @@ def main(): f"\n\n{source_list}\n\n" f"{SOURCE_DOC_GEN_PROMPT}" ) - src_docs_str = openrouter.req_w_addons(req_content, tools=[WEB_SEARCH_TOOL]) + src_docs_str = openrouter.req_w_addons(req_content, tools=[openrouter.WEB_SEARCH_TOOL]) if src_docs_str == "": print("Error: failed to generate source docs from source urls", file=sys.stderr) sys.exit(1) diff --git a/scripts/newsdata_io.py b/scripts/newsdata_io.py index 0070f47..c270596 100644 --- a/scripts/newsdata_io.py +++ b/scripts/newsdata_io.py @@ -1,16 +1,7 @@ #!/usr/bin/env python3 -import glob -import demjson3 -import json import os -import re -import sys -import time -from typing import Tuple -from urllib.request import Request, urlopen import requests -import yaml # Query parameters constants CATEGORY = "environment,technology,world" @@ -19,146 +10,18 @@ SIZE = "10" DATATYPE = "news,research,analysis,pressRelease" -FALSIFIABLE_CLAIM_SKILL_URL = "https://raw.githubusercontent.com/semmet95/agent-skills/refs/heads/main/determine-falsifialbe-claim/SKILL.md" -CLAIM_PER_SOURCE = 2 -FREE_MODELS_DOC = [ - "google/gemma-4-31b-it:free", - "nvidia/nemotron-3-nano-omni-30b-a3b-reasoning:free", - "openrouter/free" -] +NEWSDATA_API_BASE_URL = os.getenv("NEWSDATA_API_BASE_URL", "https://newsdata.io/api/1") +NEWSDATA_API_KEY = os.environ["NEWSDATA_API_KEY"] - -def fetch_web_text(url: str) -> str: - with urlopen(url, timeout=10) as r: - return r.read().decode("utf-8") - -def get_sources(base_url: str, api_key: str): - endpoint = f"{base_url}/api/v1/sources" - headers = {"X-API-Key": api_key} - response = requests.get(endpoint, headers=headers, timeout=90) - if response.status_code != 200: - print(f"Error: Received status code {response.status_code}") - exit(1) - return response.json() - -def post_openrouter(base_url: str, api_key: str, payload: dict) -> Tuple[int, str]: - data = json.dumps(payload).encode("utf-8") - req = Request( - f"{base_url}/chat/completions", - data=data, - headers={ - "Content-Type": "application/json", - "Authorization": f"Bearer {api_key}", - }, - ) - - status = -1 - try: - with urlopen(req, timeout=180) as r: - status = r.getcode() - body = r.read().decode("utf-8") - except Exception as e: - print(f"Error making request to OpenRouter API: {e}", file=sys.stderr) - body = "" - - return status, body - -def req_openrouter(base_url: str, api_key: str, payload: dict) -> str: - status, body = post_openrouter(base_url, api_key, payload) - - if status == 200: - try: - data = json.loads(body) - except Exception as e: - print(f"Failed to parse JSON response: {e}", file=sys.stderr) - print(body) - return "" - - # Extract assistant reply - reply = None - try: - reply = data["choices"][0]["message"]["content"] - return reply - except Exception as e: - print(f"Failed to access key [choices][0][message][content]: {e}", file=sys.stderr) - return "" - else: - print(f"Openrouter response status: {status}", file=sys.stderr) - return "" - -def filter_claims(base_url: str, api_key: str, falsifiable_claim_skill: str, claims): - filter_prompt = ( - "Following is a list of 10 articles published by the same news outlet. Each article is represented by a json string type element in the array\n\n" - f"{claims}" - "\n\nUse web search tool to visit the link for each article, access the content and then assess if it is a falsifiable claim." - "\nOut of these 10 articles, only return 1 article that best fits the falsifiable claim criterion." - "Prefer claims that have been made by the news source directly" - "Keep the json structure of the claims the same as the original schema in the input. Do not add remove, or modify any key or value in the json string." - "Only output the plain json array string that I can safely unmarshal." - "Do not format the string. Do not output anything else." - ) - - filtered_claims = "" - filtered_claims_list = [] - ctr = 1 - for model in FREE_MODELS_DOC: - payload = { - "model": model, - "messages": [ - {"role": "system", "content": falsifiable_claim_skill}, - { - "role": "user", - "content": filter_prompt - }, - ], - "tools": [ - {"type": "openrouter:web_search"} - ] - } - - filtered_claims = req_openrouter(base_url, api_key, payload) - if filtered_claims != None and filtered_claims != "" and filtered_claims != "[]": - # Replace all Python boolean and None values - filtered_claims = re.sub(r':\s*None\b', ': null', filtered_claims) - filtered_claims = re.sub(r':\s*False\b', ': false', filtered_claims) - filtered_claims = re.sub(r':\s*True\b', ': true', filtered_claims) - - # models often return the json string wrapped in a code block or with incompatible values - match = re.search(r'```(?:json)?\s*(.*?)\s*```', filtered_claims, re.DOTALL) - if match: - filtered_claims = match.group(1) - try: - filtered_claims = filtered_claims.replace("`", "'") - filtered_claims = filtered_claims.strip().replace("'", "\'") - filtered_claims = json.dumps(demjson3.decode(filtered_claims)) - filtered_claims_list = json.loads(filtered_claims) - except Exception as e: - print(f"Error: failed to unmarshal claims json string {filtered_claims}: {e}", file=sys.stderr) - continue - - if filtered_claims_list != None and len(filtered_claims_list) != 0: - break - - # delay before sending the request again - time.sleep(30*ctr) - ctr += 1 - - if filtered_claims_list == None or len(filtered_claims_list) == 0: - print("Error: All models failed.", file=sys.stderr) - sys.exit(1) - - - return filtered_claims_list - -def get_claims(base_url: str, api_key: str, src_domain_url: str): - endpoint = f"{base_url}/latest" +def get_claims(src_domain_url: str): + endpoint = f"{NEWSDATA_API_BASE_URL}/latest" params = { "category": CATEGORY, "language": LANGUAGE, "removeduplicate": REMOVE_DUPLICATE, "size": SIZE, "datatype": DATATYPE, - "apikey": api_key, + "apikey": NEWSDATA_API_KEY, "domainurl": src_domain_url } response = requests.get(endpoint, params=params, timeout=10) @@ -166,129 +29,3 @@ def get_claims(base_url: str, api_key: str, src_domain_url: str): print(f"Error: couldn't fetch claims for {src_domain_url}: {response.status_code}") return None return response.json()["results"] - -def update_claim_fields(srcDigest: str, claim): - claim["sourceUriDigest"] = srcDigest - claim["summary"] = claim["description"] - claim["uri"] = claim["link"] - -def get_claim_docs(): - claims_dir = os.path.join(os.path.dirname(__file__), "..", "claims") - claims_dir = os.path.abspath(claims_dir) - - # Find all YAML files in claims directory and subdirectories - yaml_files = glob.glob(os.path.join(claims_dir, "**", "*.yaml"), recursive=True) - yaml_files.extend(glob.glob(os.path.join(claims_dir, "**", "*.yml"), recursive=True)) - - claims_array = [] - for yaml_file in yaml_files: - try: - with open(yaml_file, 'r') as f: - claim_data = yaml.safe_load(f) - if claim_data: # Only add if file is not empty - claims_array.append(claim_data) - except Exception as e: - print(f"Error loading YAML file {yaml_file}: {e}", file=sys.stderr) - - return claims_array - -def is_claim_new(claim) -> bool: - claim_docs = get_claim_docs() - - for claim_doc in claim_docs: - if claim["uri"] == claim_doc["uri"] or claim["title"] == claim_doc["title"]: - return False - - return True - -def clean_filepath(path: str, replace: str = "_") -> str: - cleaned = re.sub(r'[^a-zA-Z0-9_-]', replace, path) - cleaned = re.sub(f'{re.escape(replace)}+', replace, cleaned) - cleaned = cleaned.strip(replace) - return cleaned - -def create_claim_docs(claims: list, srcName: str): - with open('oapi.yaml', 'r') as f: - oapi_spec = yaml.safe_load(f) - - claim_input_schema = oapi_spec['components']['schemas']['ClaimInput'] - claim_example = claim_input_schema.get('example') - - # Custom representer to force double quotes around strings - def quoted_str_representer(dumper, data): - return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='"') - - yaml.add_representer(str, quoted_str_representer) - - for claim in claims: - claim_doc = {} - for key in claim_example.keys(): - claim_doc[key] = str(claim[key]) - - filename = claim_doc["title"].lower() - if len(filename) > 30: - filename = filename[:30] - filename = clean_filepath(filename) - filename = f"{filename}.yaml" - - dirname = srcName.lower() - if len(dirname) > 30: - dirname = dirname[:30] - dirname = clean_filepath(dirname) - - # Create file path - file_path = os.path.join("claims", dirname, filename) - - # Write claim_doc to YAML file - os.makedirs(os.path.dirname(file_path), exist_ok=True) - with open(file_path, 'w', encoding='utf-8') as f: - yaml.dump(claim_doc, f, default_flow_style=False, allow_unicode=True, width=float('inf')) - - print(f"Created claim document: {file_path}") - -def main(): - base_url = os.environ["API_BASE_URL"] - api_key = os.environ["API_KEY"] - news_data_base_url = os.environ["NEWSDATA_API_BASE_URL"] - news_data_api_key = os.environ["NEWSDATA_API_KEY"] - openrouter_api_key = os.environ["OPENROUTER_API_KEY"] - openrouter_base_url = os.environ["OPENROUTER_API_BASE_URL"] - - sources = get_sources(base_url, api_key) - - # Fetch falsifiable claim skill - try: - falsifiable_claim_skill = fetch_web_text(FALSIFIABLE_CLAIM_SKILL_URL) - except Exception as e: - print(f"Error: failed to fetch skill from {FALSIFIABLE_CLAIM_SKILL_URL}: {e}", file=sys.stderr) - sys.exit(1) - - src_to_patch = set() - for source in sources: - domain_url = source["uri"] - if source["domainUrlNewsData"] != "": - domain_url = source["domainUrlNewsData"] - else: - src_to_patch.add(source["uriDigest"]) - - claims = get_claims(news_data_base_url, news_data_api_key, domain_url) - if claims == None or len(claims) == 0: - continue - - # add gaps between openrouter api requests - time.sleep(30) - # keep only those articles that can be classified as falsifiable claims - filtered_claims = filter_claims(openrouter_base_url, openrouter_api_key, falsifiable_claim_skill, claims) - - # list of new claims to be ingested - new_claims = [] - # keep only relevant fields in the claims - for claim in filtered_claims: - update_claim_fields(source["uriDigest"], claim) - if is_claim_new(claim): - new_claims.append(claim) - - create_claim_docs(new_claims, source["name"]) - -if __name__ == "__main__": - sys.exit(main()) \ No newline at end of file diff --git a/scripts/openrouter.py b/scripts/openrouter.py index 1941f33..11fa78e 100644 --- a/scripts/openrouter.py +++ b/scripts/openrouter.py @@ -20,6 +20,7 @@ "nvidia/nemotron-3-nano-omni-30b-a3b-reasoning:free", "openrouter/free" ] +WEB_SEARCH_TOOL = {"type": "openrouter:web_search"} OPENROUTER_API_KEY = os.environ["OPENROUTER_API_KEY"] MD_PROCESSING_SKILL_URL = os.getenv("MD_PROCESSING_SKILL_URL", "https://raw.githubusercontent.com/semmet95/agent-skills/refs/heads/main/md-processing/SKILL.md") From a69be54ba170cab13eb03b1df9ce7ab1ae2d5bd1 Mon Sep 17 00:00:00 2001 From: Amit Singh Date: Mon, 18 May 2026 21:38:01 +0530 Subject: [PATCH 3/3] refactor: minor improvements Signed-off-by: Amit Singh --- scripts/helper.py | 2 +- scripts/ingest_claims.py | 11 ++++++++--- scripts/ingest_sources.py | 6 +++--- scripts/openrouter.py | 2 +- scripts/source_scraper.sh | 2 +- 5 files changed, 14 insertions(+), 9 deletions(-) diff --git a/scripts/helper.py b/scripts/helper.py index e81573a..bab6234 100644 --- a/scripts/helper.py +++ b/scripts/helper.py @@ -72,6 +72,6 @@ def post_request(endpoint: str, headers: dict, payload: dict, timeout: int) -> T body = r.read().decode("utf-8") except Exception as e: print(f"Error making request to {endpoint}: {e}", file=sys.stderr) - return -1, "" + return 0, "" return status, body diff --git a/scripts/ingest_claims.py b/scripts/ingest_claims.py index 741ae6f..d885a00 100644 --- a/scripts/ingest_claims.py +++ b/scripts/ingest_claims.py @@ -42,7 +42,7 @@ def get_claim_docs(claims_dir: str): claims_array = [] for yaml_file in yaml_files: try: - with open(yaml_file, 'r') as f: + with open(yaml_file, 'r', encoding='utf-8') as f: claim_data = yaml.safe_load(f) if claim_data: # Only add if file is not empty claims_array.append(claim_data) @@ -95,6 +95,11 @@ def quoted_str_representer(dumper, data): # Create file path file_path = os.path.join("claims", dirname, filename) + + # avoid overwriting existing files + if os.path.exists(file_path): + print(f"Warning: claim file with name : {file_path} already exists", file=sys.stderr) + continue # Write claim_doc to YAML file os.makedirs(os.path.dirname(file_path), exist_ok=True) @@ -110,7 +115,7 @@ def main(): all_claim_docs = get_claim_docs(claims_dir) sources = helper.get_sources(API_BASE_URL, API_KEY) - if sources == None: + if sources is None: print(f"Error: failed to fetch all sources", file=sys.stderr) sys.exit(1) @@ -127,7 +132,7 @@ def main(): domain_url = source["domainUrlNewsData"] claims = newsdata_io.get_claims(domain_url) - if claims == None or len(claims) == 0: + if claims is None or len(claims) == 0: continue # keep only those articles that can be classified as falsifiable claims diff --git a/scripts/ingest_sources.py b/scripts/ingest_sources.py index baaf0a1..13a0b9c 100644 --- a/scripts/ingest_sources.py +++ b/scripts/ingest_sources.py @@ -55,7 +55,7 @@ def remove_ingested_sources(source_docs: list[str], sources_dir: str) -> list[st except Exception as e: print(f"Warning: failed to parse source_doc; not adding it to the source list it. Error: {e}", file=sys.stderr) continue - if doc == None: + if doc is None: print(f"Warning: skipping the following yaml string as it failed to load: {doc_str}", file=sys.stderr) continue duplicate = False @@ -77,7 +77,7 @@ def write_source_docs(source_docs: list[str], sources_dir: str): continue filename = parsed.get('name') - if filename == None or filename.strip() == '': + if filename is None or filename.strip() == '': print(f"Warning: invalid source name or failed to extract name field from : {doc_str}", file=sys.stderr) continue @@ -100,7 +100,7 @@ def write_source_docs(source_docs: list[str], sources_dir: str): def main(): if len(sys.argv) < 2: - print("Usage: openrouter.py TMP_MD", file=sys.stderr) + print("Usage: ingest_sources.py TMP_MD", file=sys.stderr) sys.exit(1) tmp_md = sys.argv[1] diff --git a/scripts/openrouter.py b/scripts/openrouter.py index 11fa78e..20d0b90 100644 --- a/scripts/openrouter.py +++ b/scripts/openrouter.py @@ -41,7 +41,7 @@ def req_chat(payload: dict) -> str: 60, ) - if status in [500, 429]: + if status == 0 or status == 429 or 500 <= status < 600: print(f"OpenRouter API returned status {status}, retrying...", file=sys.stderr) time.sleep(10 * i) continue diff --git a/scripts/source_scraper.sh b/scripts/source_scraper.sh index bc07cb2..18cfc11 100755 --- a/scripts/source_scraper.sh +++ b/scripts/source_scraper.sh @@ -22,4 +22,4 @@ trap 'rm -f "$TMP_MD"' EXIT python3 scripts/scrape_firecrawl.py "$URL" >"$TMP_MD" # Process the scraped document using LLMs -python3 scripts/openrouter.py "$TMP_MD" +python3 scripts/ingest_sources.py "$TMP_MD"