Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/update_html.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ jobs:
ref: gh-pages

- name: Setup Pages
uses: actions/configure-pages@v3
uses: actions/configure-pages@v4

- name: Upload artifact
uses: actions/upload-pages-artifact@v2
uses: actions/upload-pages-artifact@v3
with:
# Upload entire repository
path: '.'

- name: Deploy to GitHub Pages
id: deployment
uses: actions/deploy-pages@v2
uses: actions/deploy-pages@v4
192 changes: 192 additions & 0 deletions jinja-tests/test_jinja.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import json
import datetime
import re
from bs4 import BeautifulSoup
from lxml import etree
from jinja2 import Environment, FileSystemLoader, pass_context
import os
import sys
import inspect
from html import escape as html_escape, unescape as html_unescape

# Supported export templates; the first CLI argument selects one.
templates = ['dublincore', 'dcat-us', 'marc-21', 'datacite', 'ddi', 'schema.org', 'icpsr-schema']

if len(sys.argv) < 2:
    print("\nUsage: python test_jinja.py <'dublincore', 'dcat-us', 'marc-21', 'datacite', 'ddi', 'schema.org', 'icpsr-schema'>")
    sys.exit(1)

if sys.argv[1] not in templates:
    print("\nInvalid template. Usage: python test_jinja.py <'dublincore', 'dcat-us', 'marc-21', 'datacite', 'ddi', 'schema.org', 'icpsr-schema'>")
    sys.exit(1)

# BUG FIX: was a hard-coded Windows path ("C:/icpsr_github/metadata/jinja-tests"),
# which breaks on any other machine or checkout location. Resolve the template
# directory relative to this script instead.
template_dir = os.path.dirname(os.path.abspath(__file__))

# XML-based standards render to .xml templates; the rest render to JSON.
fmt = "xml" if sys.argv[1] in ['dublincore', 'marc-21', 'datacite', 'ddi'] else "json"
template_file = f"{sys.argv[1].strip()}_template.{fmt}.jinja"

json_file_path = os.path.join(template_dir, "export_request-1256-20250213T203045.json")
crosswalk_file_path = os.path.join(template_dir, "dcat-us_crosswalk.json")  # Adjust filename as needed
# Output sits next to the template, e.g. dcat-us_test.json.
output_file = os.path.join(template_dir, os.path.splitext(template_file)[0].replace('template', 'test'))

# Load the exported study metadata.
with open(json_file_path, "r", encoding="utf-8") as json_file:
    data = json.load(json_file)

# Crosswalk mappings are optional; warn and continue without them.
try:
    with open(crosswalk_file_path, "r", encoding="utf-8") as crosswalk_file:
        crosswalk_dict = json.load(crosswalk_file)
except FileNotFoundError:
    print(f"Warning: Crosswalk file not found at {crosswalk_file_path}. Proceeding without it.")
    crosswalk_dict = {}

# Extract study data (assuming a single study per export).
study_key = next(iter(data))  # Get first key (e.g., "pcms_study_5512")
tree = data[study_key]

# Initialize the Jinja environment rooted at the template directory.
env = Environment(loader=FileSystemLoader(template_dir))

# === Register Custom Filters ===
def strip_tags(string, strip=False):
    """Remove HTML markup from *string*, returning only its text content.

    The input is HTML-unescaped first, so entity-encoded markup
    (e.g. "&lt;p&gt;") is stripped as well. Whitespace is preserved
    unless *strip* is True.
    """
    unescaped = html_unescape(string)
    soup = BeautifulSoup(unescaped, "html.parser")
    return soup.get_text(strip=strip)

def from_iso_date(string):
    """Convert an ISO-8601 date/datetime string to a datetime object."""
    parsed = datetime.datetime.fromisoformat(string)
    return parsed

def format_date(dttm, format='%Y-%m-%dT%H:%M:%S', length=None):
    """Render *dttm* as a string, optionally truncated to *length* characters.

    A falsy *length* (None or 0) means "no truncation", matching the
    original `length or len(result)` semantics.
    """
    formatted = dttm.strftime(format)
    truncate_to = length or len(formatted)
    return formatted[:truncate_to]

def jsonify(obj):
    """Serialize *obj* to a JSON string; generators are materialized first."""
    payload = list(obj) if inspect.isgenerator(obj) else obj
    return json.dumps(payload)

def split(string, delim='~~'):
    """Split *string* on *delim* (default '~~') and return the parts."""
    return str.split(string, delim)

def flatten_date_ranges(ranges, delimiter='/'):
    """Turn date-range dicts into 'startDate/endDate' strings.

    Missing or empty endpoints are dropped; ranges with neither endpoint
    produce no entry at all.
    """
    result = []
    for rng in ranges:
        endpoints = [d for d in (rng.get('startDate'), rng.get('endDate')) if d]
        joined = delimiter.join(endpoints)
        if joined:
            result.append(joined)
    return result

def collapse_date_ranges(ranges, delimiter='/'):
    """Collapse date-range dicts into a single 'minDate/maxDate' string.

    Both start and end dates of every range are pooled; falsy values are
    ignored. Returns None when no usable dates remain.
    """
    pooled = set()
    for rng in ranges:
        for key in ('startDate', 'endDate'):
            value = rng.get(key)
            if value:
                pooled.add(value)
    if not pooled:
        return None
    return min(pooled) + delimiter + max(pooled)

def defaultattr(lst, attr, value):
    """Return copies of the dicts in *lst*, filling *attr* with *value* when absent.

    Existing values (including None) are kept; the input dicts are not mutated.
    """
    defaulted = []
    for original in lst:
        updated = dict(original)
        updated.setdefault(attr, value)
        defaulted.append(updated)
    return defaulted

# @pass_context
# def crosswalk(context, string, field):
# """Applies a crosswalk to a field using regex or simple mappings."""
# field = field.lower()
# if field not in crosswalk_dict:
# return string # No mapping available, return original value

# mapping_info = crosswalk_dict[field]
# use_regex = mapping_info.get("regex", False)
# mapping = mapping_info.get("mapping", {})

# if use_regex:
# for pattern, replacement in mapping.items():
# if re.match(pattern, string, re.IGNORECASE):
# return replacement
# return string # No match found, return original value
# else:
# return mapping.get(string.lower(), string) # Simple lookup

# @pass_context
# def crosswalk(context, string, field):
# """Applies a crosswalk mapping to a field using regex or simple mappings."""
# field = field.lower()
# if field not in crosswalk_dict:
# return string # No mapping found, return original value

# mapping_info = crosswalk_dict[field]
# use_regex = mapping_info.get("regex", False)
# mapping = mapping_info.get("mapping", {})

# if not string: # Handle empty values
# return string

# if use_regex:
# for pattern, replacement in mapping.items():
# if re.search(pattern, string, re.IGNORECASE): # Use `search()` instead of `match()`
# return replacement
# return string # No match found, return original value
# else:
# return mapping.get(string.lower(), string) # Simple lookup
@pass_context
def crosswalk(context, string, field):
    """Apply a crosswalk to a field value.

    Assumes the crosswalk is a dict available from ``context.parent`` whose keys
    are field names and whose values are dicts of the form
    ``{'regex': True/False, 'mapping': {'oldvalue1': 'newvalue1', ...}}``.

    When a mapping uses regular expressions, this method iterates over all
    patterns in the mapping looking for a match anchored at the start of the
    value (``re.match``). Otherwise it performs a simple case-insensitive
    look-up.

    Returns None if the field, the value, or the value itself (None) is not
    in the crosswalk.
    """
    # BUG FIX: a None value previously crashed (`None.lower()` in the lookup
    # branch, `re.match(p, None)` in the regex branch). Treat it as unmapped.
    if string is None:
        return None

    field = field.lower()
    entry = context.parent.get('crosswalk', {}).get(field, {})
    mapping = entry.get('mapping', {})

    if entry.get('regex', False):
        for pattern, new_value in mapping.items():
            if re.match(pattern, string, re.IGNORECASE):
                return new_value
        return None  # no pattern matched
    return mapping.get(string.lower())

# === Register custom filters with the environment ===
env.filters.update({
    "strip_tags": strip_tags,
    "from_iso_date": from_iso_date,
    "format_date": format_date,
    "crosswalk": crosswalk,
    "jsonify": jsonify,
    "split": split,
    "flatten_date_ranges": flatten_date_ranges,
    "collapse_date_ranges": collapse_date_ranges,
    "defaultattr": defaultattr,
})

# Load the selected template and render it with the study tree and
# the crosswalk mappings.
template = env.get_template(template_file)
rendered = template.render(tree=tree, crosswalk=crosswalk_dict)

# === Format XML or JSON ===
def clean_xml(xml_string):
    """Re-serialize an XML string as pretty-printed UTF-8 text.

    Whitespace-only text nodes are dropped during parsing so the
    pretty-printer produces uniform indentation.
    """
    whitespace_stripper = etree.XMLParser(remove_blank_text=True)
    document = etree.fromstring(xml_string.encode(), whitespace_stripper)
    pretty_bytes = etree.tostring(document, pretty_print=True, encoding="utf-8")
    return pretty_bytes.decode()

# Normalize the rendered text: XML is pretty-printed via lxml, JSON is
# round-tripped through loads/dumps for consistent indentation.
formatted_output = (
    clean_xml(rendered)
    if fmt == 'xml'
    else json.dumps(json.loads(rendered), indent=4)
)

# Persist the formatted document next to the template.
with open(output_file, "w", encoding="utf-8") as out_handle:
    out_handle.write(formatted_output)

print(f"Rendered {fmt.upper()} saved to {output_file}")
14 changes: 8 additions & 6 deletions markdown/icpsr_metadata_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@
## Under Development: A New API to Export Metadata!

ICPSR is developing a new application programming interface (API) so that community members can perform bulk exports of metadata records. This new API will:
- Simplify and standardize the process of accessing metadata records.
- Allow ICPSR to provide metadata in a broader range of standards and formats.
- Support more complex queries so that users can find the metadata records that best meet their needs.

- Simplify and standardize the process of accessing metadata records.
- Allow ICPSR to provide metadata in a broader range of standards and formats.
- Support more complex queries so that users can find the metadata records that best meet their needs.

## ICPSR Metadata API Mappings

Upon its release, the ICPSR Metadata API will produce records that conform to the following standards (with more to come in the future):
- [DCAT-US](https://resources.data.gov/resources/dcat-us/)
- [MARCXML](https://www.loc.gov/standards/marcxml/)
- [Dublin Core](https://www.dublincore.org/specifications/dublin-core/dcmi-terms/).

- [DCAT-US](https://resources.data.gov/resources/dcat-us/): a U.S. government extension of the Data Catalog Vocabulary (DCAT), designed to improve the discoverability and interoperability of federal open data. It provides a standardized way to describe datasets, data services, and distributions using RDF-based metadata, ensuring consistency across data catalogs like data.gov. This standard aligns with international best practices while incorporating specific requirements for U.S. government data publishing.
- [MARCXML](https://www.loc.gov/standards/marcxml/): an XML-based representation of the MARC (Machine-Readable Cataloging) standard, developed by the Library of Congress for bibliographic and authority data. It preserves the structure and semantics of MARC records while enabling interoperability with modern XML-based systems. This format allows libraries and archives to exchange, transform, and integrate catalog data more easily with digital repositories and web technologies.
- [Dublin Core](https://www.dublincore.org/specifications/dublin-core/dcmi-terms/): a simple yet flexible schema for describing digital and physical resources, widely used for interoperability across different information systems. Designed to enhance resource discovery and metadata sharing, Dublin Core is commonly used in libraries, data repositories, and web-based metadata applications.

This [Metadata API Mappings](https://docs.google.com/spreadsheets/d/1Avw212FfzxRjsUFvlJOLtsJclKeL8VJc0pbhLQevXg8/edit?usp=sharing) spreadsheet provides more information about how ICPSR metadata elements align with the above-mentioned standards.

Expand Down
85 changes: 85 additions & 0 deletions ror/ror_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import requests
import json
import sys
import os
import csv
import unicodedata

def clean_strings(txt):
    """Normalize an organization name into a URL-safe query term.

    Lowercases, collapses ' - ' to a single space, strips accents via NFKD
    decomposition, then percent-encodes the result (spaces become '+').

    BUG FIX: the original hand-rolled the URL encoding, handling only spaces
    and '&'; any other reserved character ('#', '?', '=', ...) would corrupt
    the query string. quote_plus encodes all reserved characters and produces
    identical output for the inputs the original handled.
    """
    from urllib.parse import quote_plus  # local import keeps this fix self-contained
    lowered = txt.strip().lower().replace(' - ', ' ')
    # Drop combining marks left over from NFKD decomposition (é -> e).
    ascii_txt = ''.join(c for c in unicodedata.normalize('NFKD', lowered)
                        if not unicodedata.combining(c))
    return quote_plus(ascii_txt)

# Work relative to this script's directory so the CSVs travel with it.
work_dir = os.path.dirname(os.path.abspath(__file__))

org_csv = os.path.join(work_dir, 'ACTIVE_MEMBER_ORGS.csv')
org_ror_csv = os.path.join(work_dir, 'ACTIVE_MEMBER_ROR_IDs.csv')

# Read the member organizations to match against the ROR registry.
with open(org_csv, mode="r", encoding="utf-8") as file:
    reader = csv.DictReader(file)
    rows = list(reader)

counter = 0
no_matches = []  # NOTE(review): currently unused; kept for future reporting

for row in rows:
    counter += 1
    found = False
    # BUG FIX: initialize per iteration so a failed request can never leak
    # the previous row's candidate into this row's match.
    likely_match = None

    search_string = clean_strings(row["NAME"])
    # BUG FIX: the original nested single quotes inside a single-quoted
    # f-string, a syntax error on Python < 3.12.
    print(f"\n\nWorking on #{counter}: {row['NAME']}")

    # Query the ROR affiliation-matching endpoint.
    url = f"https://api.ror.org/v1/organizations?affiliation={search_string}"

    response = requests.get(url)
    response.encoding = 'utf-8'

    if response.status_code == 200:
        data = response.json()  # Parse JSON response

        if data['number_of_results'] > 0:
            print('\tFound results...')

            likely_match = data['items'][0]
            # 'chosen' flags the candidate ROR itself considers a confident match.
            if likely_match['chosen']:

                # Try to corroborate the match with city/state/country data.
                if row["COUNTRY"].strip() == 'USA':
                    country = "United States"

                    # BUG FIX: the original had a stray 's' after .strip(),
                    # which was a syntax error.
                    admin1_code = likely_match['organization']['addresses'][0]['geonames_city']['geonames_admin1']['code']
                    if (row["STATE"].strip() in admin1_code) and (country in likely_match['organization']['country']['country_name']):
                        found = True

                else:
                    country = row["COUNTRY"].strip()

                    if country.lower() in likely_match['organization']['country']['country_name'].lower():
                        found = True
        else:
            print('Found nothing!')

    else:
        print(f"Error: {response.status_code} - {response.text}")

    if found:
        print('\tMatch made!')
        row["ROR_NAME"] = likely_match['organization']['name']
        row["ROR_ID"] = likely_match['organization']['id']
    else:
        print('\tNo match...')
        row["ROR_NAME"] = ''
        row["ROR_ID"] = ''

# Get all field names (preserve existing + the added ROR_NAME/ROR_ID,
# which rows[0] now carries after the loop above).
fieldnames = list(rows[0].keys())

# Write updated data back to a new CSV file.
with open(org_ror_csv, mode="w", encoding="utf-8", newline="") as file:
    writer = csv.DictWriter(file, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(rows)  # Write modified rows

print(f"Updated CSV saved as {org_ror_csv}")