From c9122b00f211a12de7e70067134cc8b66596c87e Mon Sep 17 00:00:00 2001 From: Colin Davis Date: Wed, 13 Aug 2025 23:46:36 -0500 Subject: [PATCH 1/4] Support loading Variable and Sample metadata from parquet file metadata if it is present. --- src/conventions.rs | 222 ++++++++++++++- src/lib.rs | 1 + src/parquet_metadata.rs | 613 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 831 insertions(+), 5 deletions(-) create mode 100644 src/parquet_metadata.rs diff --git a/src/conventions.rs b/src/conventions.rs index 000bb625..ef0036ca 100644 --- a/src/conventions.rs +++ b/src/conventions.rs @@ -33,6 +33,7 @@ use crate::ipums_data_model::*; use crate::ipums_metadata_model::*; use crate::layout; use crate::mderror::{metadata_error, MdError}; +use crate::parquet_metadata::ParquetMetadataReader; use crate::request::InputType; use std::collections::HashMap; @@ -129,8 +130,98 @@ impl MicroDataCollection { /// The path like `../output_data/current/parquet/us2019a/` /// Reading the schema will give approximately the same metadata information /// as reading the fixed-width layout file for the same dataset. - pub fn load_metadata_from_parquet(&mut self, _parquet_dataset_path: &Path) { - todo!("implement"); + /// Additionally, if the parquet files contain IPUMS metadata in their key-value + /// metadata, this will load variable labels, categories, and dataset information. 
+ pub fn load_metadata_from_parquet( + &mut self, + parquet_dataset_path: &Path, + ) -> Result<(), MdError> { + if !parquet_dataset_path.exists() { + return Err(metadata_error!( + "Parquet dataset path does not exist: {}", + parquet_dataset_path.display() + )); + } + + // Extract dataset name from path + let dataset_name = parquet_dataset_path + .file_name() + .and_then(|n| n.to_str()) + .ok_or_else(|| { + metadata_error!( + "Could not extract dataset name from path: {}", + parquet_dataset_path.display() + ) + })?; + + // Collect record types and filenames first to avoid borrow issues + let record_types: Vec<(String, String)> = self.record_types + .keys() + .map(|k| { + let base_filename = self.base_filename_for_dataset_and_rectype(dataset_name, k); + (k.clone(), base_filename) + }) + .collect(); + + // Initialize metadata if not already present + if self.metadata.is_none() { + self.metadata = Some(MetadataEntities::new()); + } + + // Now work with the metadata + let md = self.metadata.as_mut().unwrap(); + + for (rectype_abbrev, base_filename) in record_types { + let parquet_file = parquet_dataset_path.join(format!("{}.parquet", base_filename)); + + if parquet_file.exists() { + // Check if the file has IPUMS metadata + if ParquetMetadataReader::has_ipums_metadata(&parquet_file) { + // Load metadata from the parquet file + let (variables, datasets) = ParquetMetadataReader::load_metadata_from_file( + &parquet_file, + &rectype_abbrev, + )?; + + // Add variables to metadata + for var in variables { + // Create or get the dataset + let dataset = datasets + .iter() + .find(|d| d.name == dataset_name) + .cloned() + .unwrap_or_else(|| IpumsDataset::from((dataset_name.to_string(), 0))); + + md.add_dataset_variable(dataset, var); + } + } else { + // Fall back to just schema information + let schema_info = ParquetMetadataReader::get_schema_info(&parquet_file)?; + + // Create a dataset if we don't have one + let dataset = IpumsDataset::from((dataset_name.to_string(), 0)); + + 
// Add each field as a variable with minimal metadata + for (field_name, (data_type_str, _nullable)) in schema_info { + let ipums_var = IpumsVariable { + name: field_name, + data_type: Some(IpumsDataType::from(data_type_str.as_str())), + label: None, + record_type: rectype_abbrev.clone(), + categories: None, + formatting: None, + general_width: None, + description: None, + category_bins: None, + id: 0, + }; + md.add_dataset_variable(dataset.clone(), ipums_var); + } + } + } + } + + Ok(()) } /// Using the data_root, scan the layouts and load metadata from them. @@ -162,9 +253,53 @@ impl MicroDataCollection { /// Takes a path like ../output_data/current/parquet/, which could be derived /// automatically from defaults based on data root or product root. Scans all - /// parquet schema information. - pub fn load_metadata_from_all_parquet(&mut self, _parquet_path: &Path) { - todo!("implement"); + /// parquet schema information and embedded metadata. + pub fn load_metadata_from_all_parquet( + &mut self, + parquet_path: &Path, + ) -> Result<(), MdError> { + if !parquet_path.exists() { + return Err(metadata_error!( + "Parquet path does not exist: {}", + parquet_path.display() + )); + } + + // Read all subdirectories in the parquet path + let entries = std::fs::read_dir(parquet_path).map_err(|e| { + metadata_error!( + "Failed to read parquet directory {}: {}", + parquet_path.display(), + e + ) + })?; + + let mut loaded_count = 0; + let mut errors = Vec::new(); + + for entry in entries { + let entry = entry.map_err(|e| { + metadata_error!("Failed to read directory entry: {}", e) + })?; + + let path = entry.path(); + if path.is_dir() { + // Try to load metadata from this dataset directory + match self.load_metadata_from_parquet(&path) { + Ok(()) => loaded_count += 1, + Err(e) => errors.push(format!("{}: {}", path.display(), e)), + } + } + } + + if loaded_count == 0 && !errors.is_empty() { + return Err(metadata_error!( + "Failed to load metadata from any parquet datasets. 
Errors: {}", + errors.join("; ") + )); + } + + Ok(()) } /// Load everything available for the selected variables and samples from the available @@ -500,6 +635,46 @@ impl Context { } } + /// Load metadata for datasets from parquet files + /// This will extract metadata from the parquet files' key-value metadata if available, + /// or fall back to schema information. + pub fn load_metadata_for_datasets_from_parquet( + &mut self, + datasets: &[&str], + ) -> Result<(), MdError> { + if let Some(ref data_root) = self.data_root { + let parquet_path = data_root.join("parquet"); + if !parquet_path.exists() { + return Err(metadata_error!( + "Parquet directory does not exist at: {}", + parquet_path.display() + )); + } + + for dataset in datasets { + let dataset_path = parquet_path.join(dataset); + self.settings.load_metadata_from_parquet(&dataset_path)?; + } + Ok(()) + } else { + Err(metadata_error!( + "Cannot load parquet metadata without a data_root" + )) + } + } + + /// Load all available metadata from parquet files in the data root + pub fn load_all_metadata_from_parquet(&mut self) -> Result<(), MdError> { + if let Some(ref data_root) = self.data_root { + let parquet_path = data_root.join("parquet"); + self.settings.load_metadata_from_all_parquet(&parquet_path) + } else { + Err(metadata_error!( + "Cannot load parquet metadata without a data_root" + )) + } + } + /// The context should be set to read from layouts or full metadata pub fn load_metadata_for_datasets_and_variables( &mut self, @@ -642,4 +817,41 @@ mod test { let result = collection.default_table_name("us2021a", "Z"); assert!(result.is_err(), "expected an error but got {result:?}"); } + + #[test] + fn test_load_metadata_from_parquet() { + let data_root = Some(String::from("tests/data_root")); + let mut usa_ctx = Context::from_ipums_collection_name("usa", None, data_root) + .expect("should be able to create USA context"); + + // Try to load metadata from parquet if test data exists + let parquet_path = 
PathBuf::from("tests/data_root/parquet/us2015b"); + if parquet_path.exists() { + let result = usa_ctx.settings.load_metadata_from_parquet(&parquet_path); + // We don't assert success here since test data may not have metadata + // but the function should not panic + if result.is_ok() { + assert!(usa_ctx.settings.metadata.is_some()); + } + } + } + + #[test] + fn test_context_load_metadata_from_parquet() { + let data_root = Some(String::from("tests/data_root")); + let mut usa_ctx = Context::from_ipums_collection_name("usa", None, data_root) + .expect("should be able to create USA context"); + + // Try to load metadata for a specific dataset + let parquet_path = PathBuf::from("tests/data_root/parquet"); + if parquet_path.exists() { + // This should work even if the parquet files don't have embedded metadata + // It will fall back to schema information + let result = usa_ctx.load_metadata_for_datasets_from_parquet(&["us2015b"]); + // Don't fail the test if the directory doesn't exist + if !result.is_err() || result.unwrap_err().to_string().contains("does not exist") { + // Expected behavior - either succeeds or fails with expected error + } + } + } } diff --git a/src/lib.rs b/src/lib.rs index 0e81b48b..626f7744 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,6 +48,7 @@ pub mod ipums_data_model; pub mod ipums_metadata_model; pub mod layout; pub mod mderror; +pub mod parquet_metadata; pub mod query_gen; pub mod request; pub mod tabulate; diff --git a/src/parquet_metadata.rs b/src/parquet_metadata.rs new file mode 100644 index 00000000..c28564c1 --- /dev/null +++ b/src/parquet_metadata.rs @@ -0,0 +1,613 @@ +//! Module for loading IPUMS metadata from Parquet files. +//! +//! This module provides functionality to extract metadata embedded in Parquet files' +//! key-value metadata. IPUMS Parquet files can contain JSON-encoded metadata about +//! variables, samples, and data structure. 
+ +use crate::ipums_metadata_model::{ + IpumsCategory, IpumsDataType, IpumsDataset, IpumsValue, IpumsVariable, UniversalCategoryType, +}; +use crate::mderror::{metadata_error, MdError}; +use parquet::file::reader::{FileReader, SerializedFileReader}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::fs::File; +use std::path::Path; + +/// Variable metadata as stored in Parquet key-value metadata +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct ParquetVariableMetadata { + pub label: String, + #[serde(default, deserialize_with = "deserialize_categories")] + pub categories: HashMap, + #[serde(default)] + pub data_type: String, + #[serde(default)] + pub column_start: Option, + #[serde(default)] + pub column_width: Option, + #[serde(default)] + pub general_width: Option, + #[serde(default)] + pub record_type: Option, + #[serde(default)] + pub is_allocated: bool, + #[serde(default)] + pub is_internal: bool, + #[serde(default)] + pub is_restricted: bool, + #[serde(default)] + pub is_source_variable: bool, + #[serde(default)] + pub has_editing_rules: bool, + #[serde(default)] + pub has_no_input: bool, + #[serde(default)] + pub has_source_variables_as_input: bool, + #[serde(default)] + pub hide_status: i32, + #[serde(default)] + pub monetary: String, + #[serde(default)] + pub quality_flag: String, + #[serde(default)] + pub recoding_type: i32, + #[serde(default)] + pub restrictions_apply: bool, + #[serde(default)] + pub sort_order: i32, + #[serde(default)] + pub source_for: String, + #[serde(default)] + pub source_variables: Vec, + #[serde(default)] + pub tabulation_type: i32, +} + +fn deserialize_categories<'de, D>(deserializer: D) -> Result, D::Error> +where + D: serde::Deserializer<'de>, +{ + let opt: Option> = Option::deserialize(deserializer)?; + Ok(opt.unwrap_or_default()) +} + +/// Raw metadata extracted from Parquet files +#[derive(Debug)] +pub struct RawParquetMetadata { + pub variables: String, + pub samples: String, + pub 
version: String, +} + +impl Default for RawParquetMetadata { + fn default() -> Self { + Self { + variables: String::new(), + samples: String::new(), + version: String::new(), + } + } +} + +/// Main struct for extracting metadata from Parquet files +pub struct ParquetMetadataReader; + +impl ParquetMetadataReader { + /// Convert a HashMap of category codes to labels into a Vec of IpumsCategory objects + fn convert_categories( + categories_map: &HashMap, + data_type: &str, + ) -> Vec { + let mut categories = Vec::new(); + + for (code_str, label) in categories_map { + // Parse the code value based on the variable's data type + let value = match data_type.to_lowercase().as_str() { + "integer" | "fixed" => { + // Try to parse as integer + if let Ok(code) = code_str.parse::() { + IpumsValue::Integer(code) + } else { + // If it fails, store as string + IpumsValue::String { + utf8: true, + value: code_str.as_bytes().to_vec(), + } + } + } + "double" | "float" => IpumsValue::Float(code_str.clone()), + _ => IpumsValue::String { + utf8: true, + value: code_str.as_bytes().to_vec(), + }, + }; + + // Determine the category meaning based on common IPUMS conventions + let meaning = Self::determine_category_meaning(code_str, label); + + categories.push(IpumsCategory::new(label, meaning, value)); + } + + // Sort categories by their code for consistent ordering + categories.sort_by(|a, b| { + match (&a.value, &b.value) { + (IpumsValue::Integer(a_val), IpumsValue::Integer(b_val)) => a_val.cmp(b_val), + _ => std::cmp::Ordering::Equal, + } + }); + + categories + } + + /// Determine the UniversalCategoryType based on code and label patterns + fn determine_category_meaning(code: &str, label: &str) -> UniversalCategoryType { + let label_lower = label.to_lowercase(); + + // Check for common patterns in IPUMS data + if label_lower.contains("n/a") || label_lower.contains("not applicable") { + UniversalCategoryType::NotApplicable + } else if label_lower.contains("missing") || 
label_lower.contains("unknown") + || label_lower.contains("illegible") || code == "999" || code == "9999" + || code == "99999" || code == "998" || code == "9998" { + UniversalCategoryType::Missing + } else if label_lower.contains("not in universe") || label_lower.contains("niu") { + UniversalCategoryType::NotInUniverse + } else if label_lower.contains("topcode") || label_lower.contains("top code") { + UniversalCategoryType::TopCode + } else if label_lower.contains("bottomcode") || label_lower.contains("bottom code") { + UniversalCategoryType::BottomCode + } else { + UniversalCategoryType::Value + } + } + + /// Extract raw IPUMS metadata from a parquet file's key-value metadata + pub fn extract_raw_metadata(file_path: &Path) -> Result { + let file = File::open(file_path).map_err(|e| { + metadata_error!( + "Failed to open parquet file at {}: {}", + file_path.display(), + e + ) + })?; + + let reader = SerializedFileReader::new(file).map_err(|e| { + metadata_error!( + "Failed to create parquet reader for {}: {}", + file_path.display(), + e + ) + })?; + + let mut metadata = RawParquetMetadata::default(); + + if let Some(kv_metadata) = reader.metadata().file_metadata().key_value_metadata() { + for kv in kv_metadata { + match kv.key.as_str() { + "variables" => { + if let Some(ref value) = kv.value { + metadata.variables = value.clone(); + } + } + "samples" => { + if let Some(ref value) = kv.value { + metadata.samples = value.clone(); + } + } + "version" => { + if let Some(ref value) = kv.value { + metadata.version = value.clone(); + } + } + _ => {} + } + } + + if metadata.variables.is_empty() && metadata.samples.is_empty() { + Err(metadata_error!( + "No IPUMS metadata found in parquet file at {}", + file_path.display() + )) + } else { + Ok(metadata) + } + } else { + Err(metadata_error!( + "No key-value metadata found in parquet file at {}", + file_path.display() + )) + } + } + + /// Parse variable metadata from JSON string + pub fn parse_variable_metadata( + json_str: 
&str, + record_type: &str, + ) -> Result, MdError> { + let variables_map: HashMap = + serde_json::from_str(json_str).map_err(|e| { + metadata_error!("Failed to parse variables JSON: {}", e) + })?; + + let mut variables = Vec::new(); + let mut failed_vars = Vec::new(); + + for (var_name, var_value) in variables_map { + match serde_json::from_value::(var_value.clone()) { + Ok(metadata) => { + // Convert categories if present and not empty + let categories = if !metadata.categories.is_empty() { + Some(Self::convert_categories(&metadata.categories, &metadata.data_type)) + } else { + None + }; + + let ipums_var = IpumsVariable { + name: var_name.clone(), + data_type: Some(IpumsDataType::from(metadata.data_type.as_str())), + label: Some(metadata.label), + record_type: metadata + .record_type + .unwrap_or_else(|| record_type.to_string()), + categories, + formatting: metadata + .column_start + .and_then(|start| metadata.column_width.map(|width| (start, width))), + general_width: metadata.general_width.or(metadata.column_width), + description: None, // Could be populated from additional metadata if available + category_bins: None, + id: 0, // Will be assigned when added to MetadataEntities + }; + variables.push(ipums_var); + } + Err(e) => { + failed_vars.push(var_name.clone()); + eprintln!( + "Warning: Failed to deserialize metadata for variable '{}': {}", + var_name, e + ); + } + } + } + + if !failed_vars.is_empty() { + eprintln!( + "Warning: Failed to parse {} variables: {:?}", + failed_vars.len(), + failed_vars + ); + } + + if variables.is_empty() { + Err(metadata_error!( + "No valid variables could be parsed from metadata" + )) + } else { + Ok(variables) + } + } + + /// Parse dataset/sample metadata from JSON string + pub fn parse_samples_metadata(json_str: &str) -> Result, MdError> { + let samples_map: HashMap = + serde_json::from_str(json_str).map_err(|e| { + metadata_error!("Failed to parse samples JSON: {}", e) + })?; + + let mut datasets = Vec::new(); + + for 
(sample_name, sample_value) in samples_map { + // Extract what we can from the JSON value + let label = sample_value + .get("label") + .and_then(|v| v.as_str()) + .map(String::from); + + let year = sample_value + .get("year") + .and_then(|v| v.as_u64()) + .map(|v| v as usize); + + let month = sample_value + .get("month") + .and_then(|v| v.as_u64()) + .map(|v| v as usize); + + let sampling_density = sample_value + .get("density") + .and_then(|v| v.as_f64()) + .or_else(|| { + sample_value + .get("sampling_density") + .and_then(|v| v.as_f64()) + }); + + let dataset = IpumsDataset { + name: sample_name, + year, + month, + label, + sampling_density, + id: 0, // Will be assigned when added to MetadataEntities + }; + + datasets.push(dataset); + } + + if datasets.is_empty() { + Err(metadata_error!( + "No valid datasets could be parsed from metadata" + )) + } else { + Ok(datasets) + } + } + + /// Extract schema information from a parquet file + pub fn get_schema_info(file_path: &Path) -> Result, MdError> { + let file = File::open(file_path).map_err(|e| { + metadata_error!( + "Failed to open parquet file at {}: {}", + file_path.display(), + e + ) + })?; + + let reader = SerializedFileReader::new(file).map_err(|e| { + metadata_error!( + "Failed to create parquet reader for {}: {}", + file_path.display(), + e + ) + })?; + + let schema = reader.metadata().file_metadata().schema(); + let mut schema_info = HashMap::new(); + + for field in schema.get_fields() { + let name = field.name().to_string(); + let type_str = format!("{:?}", field.get_physical_type()); + let nullable = field.is_optional(); + schema_info.insert(name, (type_str, nullable)); + } + + Ok(schema_info) + } + + /// Load all metadata from a parquet file (variables and samples) + pub fn load_metadata_from_file( + file_path: &Path, + record_type: &str, + ) -> Result<(Vec, Vec), MdError> { + let raw_metadata = Self::extract_raw_metadata(file_path)?; + + let variables = if !raw_metadata.variables.is_empty() { + 
Self::parse_variable_metadata(&raw_metadata.variables, record_type)? + } else { + Vec::new() + }; + + let datasets = if !raw_metadata.samples.is_empty() { + Self::parse_samples_metadata(&raw_metadata.samples)? + } else { + Vec::new() + }; + + Ok((variables, datasets)) + } + + /// Check if a parquet file contains IPUMS metadata + pub fn has_ipums_metadata(file_path: &Path) -> bool { + if let Ok(file) = File::open(file_path) { + if let Ok(reader) = SerializedFileReader::new(file) { + if let Some(kv_metadata) = reader.metadata().file_metadata().key_value_metadata() { + return kv_metadata + .iter() + .any(|kv| kv.key == "variables" || kv.key == "samples"); + } + } + } + false + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_variable_metadata_simple() { + let json_str = r#"{ + "AGE": { + "label": "Age", + "data_type": "integer", + "categories": { + "0": "Less than 1 year", + "1": "1 year old" + } + }, + "SEX": { + "label": "Sex", + "data_type": "integer", + "categories": { + "1": "Male", + "2": "Female" + } + } + }"#; + + let variables = ParquetMetadataReader::parse_variable_metadata(json_str, "P").unwrap(); + assert_eq!(variables.len(), 2); + + let age_var = variables.iter().find(|v| v.name == "AGE").unwrap(); + assert_eq!(age_var.label.as_ref().unwrap(), "Age"); + assert_eq!(age_var.record_type, "P"); + } + + #[test] + fn test_parse_variable_metadata_with_all_fields() { + let json_str = r#"{ + "AGE": { + "label": "Age", + "data_type": "integer", + "column_start": 58, + "column_width": 3, + "general_width": 3, + "record_type": "P", + "is_allocated": false, + "is_internal": false, + "is_restricted": false, + "is_source_variable": false, + "has_editing_rules": false, + "has_no_input": false, + "has_source_variables_as_input": true, + "hide_status": 0, + "monetary": "", + "quality_flag": "null", + "recoding_type": 1, + "restrictions_apply": false, + "sort_order": 651, + "source_for": "null", + "source_variables": ["US1900J_1000"], + 
"tabulation_type": 1, + "categories": { + "0": "Less than 1 year old", + "1": "1 year old" + } + } + }"#; + + let variables = ParquetMetadataReader::parse_variable_metadata(json_str, "P").unwrap(); + assert_eq!(variables.len(), 1); + + let age_var = &variables[0]; + assert_eq!(age_var.name, "AGE"); + assert_eq!(age_var.label.as_ref().unwrap(), "Age"); + assert_eq!(age_var.record_type, "P"); + assert_eq!(age_var.general_width, Some(3)); + assert_eq!(age_var.formatting, Some((58, 3))); + } + + #[test] + fn test_convert_categories() { + let mut categories_map = HashMap::new(); + categories_map.insert("0".to_string(), "Less than 1 year old".to_string()); + categories_map.insert("1".to_string(), "1 year old".to_string()); + categories_map.insert("999".to_string(), "Missing".to_string()); + + let categories = ParquetMetadataReader::convert_categories(&categories_map, "integer"); + + assert_eq!(categories.len(), 3); + + // Check first category (should be sorted by code) + assert_eq!(categories[0].label(), "Less than 1 year old"); + assert_eq!(categories[0].value, IpumsValue::Integer(0)); + matches!(categories[0].meaning, UniversalCategoryType::Value); + + // Check last category (missing value) + assert_eq!(categories[2].label(), "Missing"); + assert_eq!(categories[2].value, IpumsValue::Integer(999)); + matches!(categories[2].meaning, UniversalCategoryType::Missing); + } + + #[test] + fn test_determine_category_meaning() { + use super::UniversalCategoryType; + + // Test Missing patterns + assert!(matches!( + ParquetMetadataReader::determine_category_meaning("999", "Missing"), + UniversalCategoryType::Missing + )); + assert!(matches!( + ParquetMetadataReader::determine_category_meaning("998", "Unknown/illegible"), + UniversalCategoryType::Missing + )); + + // Test NotApplicable patterns + assert!(matches!( + ParquetMetadataReader::determine_category_meaning("99", "N/A or blank"), + UniversalCategoryType::NotApplicable + )); + + // Test NotInUniverse patterns + 
assert!(matches!( + ParquetMetadataReader::determine_category_meaning("0", "Not in universe"), + UniversalCategoryType::NotInUniverse + )); + assert!(matches!( + ParquetMetadataReader::determine_category_meaning("0", "NIU"), + UniversalCategoryType::NotInUniverse + )); + + // Test regular value + assert!(matches!( + ParquetMetadataReader::determine_category_meaning("1", "Male"), + UniversalCategoryType::Value + )); + } + + #[test] + fn test_parse_variable_metadata_with_categories() { + let json_str = r#"{ + "SEX": { + "label": "Sex", + "data_type": "integer", + "record_type": "P", + "categories": { + "1": "Male", + "2": "Female", + "9": "Missing" + } + } + }"#; + + let variables = ParquetMetadataReader::parse_variable_metadata(json_str, "P").unwrap(); + assert_eq!(variables.len(), 1); + + let sex_var = &variables[0]; + assert_eq!(sex_var.name, "SEX"); + assert!(sex_var.categories.is_some()); + + let categories = sex_var.categories.as_ref().unwrap(); + assert_eq!(categories.len(), 3); + + // Verify categories are properly converted + assert_eq!(categories[0].label(), "Male"); + assert_eq!(categories[0].value, IpumsValue::Integer(1)); + + assert_eq!(categories[1].label(), "Female"); + assert_eq!(categories[1].value, IpumsValue::Integer(2)); + + assert_eq!(categories[2].label(), "Missing"); + assert_eq!(categories[2].value, IpumsValue::Integer(9)); + matches!(categories[2].meaning, UniversalCategoryType::Missing); + } + + #[test] + fn test_parse_samples_metadata() { + let json_str = r#"{ + "us2019a": { + "label": "2019 American Community Survey", + "year": 2019, + "sampling_density": 0.01 + }, + "us2020a": { + "label": "2020 American Community Survey", + "year": 2020, + "sampling_density": 0.01 + } + }"#; + + let datasets = ParquetMetadataReader::parse_samples_metadata(json_str).unwrap(); + assert_eq!(datasets.len(), 2); + + let us2019 = datasets.iter().find(|d| d.name == "us2019a").unwrap(); + assert_eq!( + us2019.label.as_ref().unwrap(), + "2019 American Community 
Survey" + ); + assert_eq!(us2019.year.unwrap(), 2019); + } +} \ No newline at end of file From 6019aeeb266de574f010117dd1e99f6752fdaf46 Mon Sep 17 00:00:00 2001 From: Colin Davis Date: Thu, 14 Aug 2025 12:31:18 -0500 Subject: [PATCH 2/4] Clippy warnings addressed. --- src/conventions.rs | 2 +- src/parquet_metadata.rs | 12 +----------- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/src/conventions.rs b/src/conventions.rs index 75fc887b..255d37e1 100644 --- a/src/conventions.rs +++ b/src/conventions.rs @@ -863,7 +863,7 @@ mod test { // It will fall back to schema information let result = usa_ctx.load_metadata_for_datasets_from_parquet(&["us2015b"]); // Don't fail the test if the directory doesn't exist - if !result.is_err() || result.unwrap_err().to_string().contains("does not exist") { + if result.is_ok() || result.unwrap_err().to_string().contains("does not exist") { // Expected behavior - either succeeds or fails with expected error } } diff --git a/src/parquet_metadata.rs b/src/parquet_metadata.rs index c28564c1..c9e86582 100644 --- a/src/parquet_metadata.rs +++ b/src/parquet_metadata.rs @@ -73,23 +73,13 @@ where } /// Raw metadata extracted from Parquet files -#[derive(Debug)] +#[derive(Debug, Default)] pub struct RawParquetMetadata { pub variables: String, pub samples: String, pub version: String, } -impl Default for RawParquetMetadata { - fn default() -> Self { - Self { - variables: String::new(), - samples: String::new(), - version: String::new(), - } - } -} - /// Main struct for extracting metadata from Parquet files pub struct ParquetMetadataReader; From d5d671d968e1041e36ef76fe74e85053b72764de Mon Sep 17 00:00:00 2001 From: Colin Davis Date: Fri, 22 Aug 2025 18:28:57 -0500 Subject: [PATCH 3/4] Improve code quality and return errors when variable category codes don't match their data type. 
--- src/parquet_metadata.rs | 189 ++++++++++++++++++++++++++++------------ 1 file changed, 134 insertions(+), 55 deletions(-) diff --git a/src/parquet_metadata.rs b/src/parquet_metadata.rs index c9e86582..4a01136c 100644 --- a/src/parquet_metadata.rs +++ b/src/parquet_metadata.rs @@ -88,25 +88,35 @@ impl ParquetMetadataReader { fn convert_categories( categories_map: &HashMap, data_type: &str, - ) -> Vec { - let mut categories = Vec::new(); + variable_name: &str, + ) -> Result, MdError> { + let mut categories: Vec = Vec::new(); for (code_str, label) in categories_map { // Parse the code value based on the variable's data type let value = match data_type.to_lowercase().as_str() { "integer" | "fixed" => { - // Try to parse as integer - if let Ok(code) = code_str.parse::() { - IpumsValue::Integer(code) - } else { - // If it fails, store as string - IpumsValue::String { - utf8: true, - value: code_str.as_bytes().to_vec(), - } - } - } - "double" | "float" => IpumsValue::Float(code_str.clone()), + code_str + .parse::() + .map(IpumsValue::Integer) + .map_err(|_| { + metadata_error!( + "Variable '{}' has type '{}' but category code '{}' is not a valid integer", + variable_name, data_type, code_str + ) + })? 
+ }, + "double" | "float" => { + // For float types, validate that the string is a valid number + code_str.parse::() + .map_err(|_| { + metadata_error!( + "Variable '{}' has type '{}' but category code '{}' is not a valid number", + variable_name, data_type, code_str + ) + })?; + IpumsValue::Float(code_str.clone()) + }, _ => IpumsValue::String { utf8: true, value: code_str.as_bytes().to_vec(), @@ -120,14 +130,12 @@ impl ParquetMetadataReader { } // Sort categories by their code for consistent ordering - categories.sort_by(|a, b| { - match (&a.value, &b.value) { - (IpumsValue::Integer(a_val), IpumsValue::Integer(b_val)) => a_val.cmp(b_val), - _ => std::cmp::Ordering::Equal, - } + categories.sort_by(|a, b| match (&a.value, &b.value) { + (IpumsValue::Integer(a_val), IpumsValue::Integer(b_val)) => a_val.cmp(b_val), + _ => std::cmp::Ordering::Equal, }); - categories + Ok(categories) } /// Determine the UniversalCategoryType based on code and label patterns @@ -137,9 +145,11 @@ impl ParquetMetadataReader { // Check for common patterns in IPUMS data if label_lower.contains("n/a") || label_lower.contains("not applicable") { UniversalCategoryType::NotApplicable - } else if label_lower.contains("missing") || label_lower.contains("unknown") - || label_lower.contains("illegible") || code == "999" || code == "9999" - || code == "99999" || code == "998" || code == "9998" { + } else if label_lower.contains("missing") + || label_lower.contains("unknown") + || label_lower.contains("illegible") + || matches!(code, "999" | "9999" | "99999" | "998" | "9998") + { UniversalCategoryType::Missing } else if label_lower.contains("not in universe") || label_lower.contains("niu") { UniversalCategoryType::NotInUniverse @@ -174,23 +184,13 @@ impl ParquetMetadataReader { if let Some(kv_metadata) = reader.metadata().file_metadata().key_value_metadata() { for kv in kv_metadata { - match kv.key.as_str() { - "variables" => { - if let Some(ref value) = kv.value { - metadata.variables = value.clone(); 
- } - } - "samples" => { - if let Some(ref value) = kv.value { - metadata.samples = value.clone(); - } - } - "version" => { - if let Some(ref value) = kv.value { - metadata.version = value.clone(); - } + if let Some(ref value) = kv.value { + match kv.key.as_str() { + "variables" => metadata.variables = value.clone(), + "samples" => metadata.samples = value.clone(), + "version" => metadata.version = value.clone(), + _ => {} } - _ => {} } } @@ -228,7 +228,14 @@ impl ParquetMetadataReader { Ok(metadata) => { // Convert categories if present and not empty let categories = if !metadata.categories.is_empty() { - Some(Self::convert_categories(&metadata.categories, &metadata.data_type)) + match Self::convert_categories(&metadata.categories, &metadata.data_type, &var_name) { + Ok(cats) => Some(cats), + Err(e) => { + failed_vars.push(var_name.clone()); + eprintln!("Warning: Failed to parse categories for variable '{}': {}", var_name, e); + continue; + } + } } else { None }; @@ -253,6 +260,7 @@ impl ParquetMetadataReader { } Err(e) => { failed_vars.push(var_name.clone()); + // Consider using a logger instead of eprintln! in production eprintln!( "Warning: Failed to deserialize metadata for variable '{}': {}", var_name, e @@ -262,6 +270,7 @@ impl ParquetMetadataReader { } if !failed_vars.is_empty() { + // Consider using a logger instead of eprintln! 
in production eprintln!( "Warning: Failed to parse {} variables: {:?}", failed_vars.len(), @@ -307,11 +316,7 @@ impl ParquetMetadataReader { let sampling_density = sample_value .get("density") .and_then(|v| v.as_f64()) - .or_else(|| { - sample_value - .get("sampling_density") - .and_then(|v| v.as_f64()) - }); + .or_else(|| sample_value.get("sampling_density").and_then(|v| v.as_f64())); let dataset = IpumsDataset { name: sample_name, @@ -394,7 +399,7 @@ impl ParquetMetadataReader { if let Some(kv_metadata) = reader.metadata().file_metadata().key_value_metadata() { return kv_metadata .iter() - .any(|kv| kv.key == "variables" || kv.key == "samples"); + .any(|kv| matches!(kv.key.as_str(), "variables" | "samples")); } } } @@ -486,19 +491,20 @@ mod tests { categories_map.insert("1".to_string(), "1 year old".to_string()); categories_map.insert("999".to_string(), "Missing".to_string()); - let categories = ParquetMetadataReader::convert_categories(&categories_map, "integer"); + let categories = ParquetMetadataReader::convert_categories(&categories_map, "integer", "AGE") + .expect("Should convert valid integer categories"); assert_eq!(categories.len(), 3); // Check first category (should be sorted by code) assert_eq!(categories[0].label(), "Less than 1 year old"); assert_eq!(categories[0].value, IpumsValue::Integer(0)); - matches!(categories[0].meaning, UniversalCategoryType::Value); + assert!(matches!(categories[0].meaning, UniversalCategoryType::Value)); // Check last category (missing value) assert_eq!(categories[2].label(), "Missing"); assert_eq!(categories[2].value, IpumsValue::Integer(999)); - matches!(categories[2].meaning, UniversalCategoryType::Missing); + assert!(matches!(categories[2].meaning, UniversalCategoryType::Missing)); } #[test] @@ -553,7 +559,8 @@ mod tests { } }"#; - let variables = ParquetMetadataReader::parse_variable_metadata(json_str, "P").unwrap(); + let variables = ParquetMetadataReader::parse_variable_metadata(json_str, "P") + .expect("Should 
parse valid JSON with categories"); assert_eq!(variables.len(), 1); let sex_var = &variables[0]; @@ -590,14 +597,86 @@ mod tests { } }"#; - let datasets = ParquetMetadataReader::parse_samples_metadata(json_str).unwrap(); + let datasets = ParquetMetadataReader::parse_samples_metadata(json_str) + .expect("Should parse valid samples metadata"); assert_eq!(datasets.len(), 2); - let us2019 = datasets.iter().find(|d| d.name == "us2019a").unwrap(); + let us2019 = datasets + .iter() + .find(|d| d.name == "us2019a") + .expect("us2019a dataset should exist"); assert_eq!( - us2019.label.as_ref().unwrap(), - "2019 American Community Survey" + us2019.label.as_deref(), + Some("2019 American Community Survey") ); - assert_eq!(us2019.year.unwrap(), 2019); + assert_eq!(us2019.year, Some(2019)); + assert_eq!(us2019.sampling_density, Some(0.01)); + } + + #[test] + fn test_parse_invalid_json() { + let invalid_json = "not valid json"; + let result = ParquetMetadataReader::parse_variable_metadata(invalid_json, "P"); + assert!(result.is_err(), "Should fail on invalid JSON"); + } + + #[test] + fn test_parse_empty_variables() { + let empty_json = "{}"; + let result = ParquetMetadataReader::parse_variable_metadata(empty_json, "P"); + assert!(result.is_err(), "Should fail when no variables are present"); + } + + #[test] + fn test_parse_empty_samples() { + let empty_json = "{}"; + let result = ParquetMetadataReader::parse_samples_metadata(empty_json); + assert!(result.is_err(), "Should fail when no samples are present"); + } + + #[test] + fn test_category_with_non_integer_code_fails() { + let mut categories_map = HashMap::new(); + categories_map.insert("A".to_string(), "Category A".to_string()); + categories_map.insert("B".to_string(), "Category B".to_string()); + + let result = ParquetMetadataReader::convert_categories(&categories_map, "integer", "TEST_VAR"); + + // Non-integer codes for integer type should cause an error + assert!(result.is_err(), "Should fail when category codes don't 
match data type"); + let err = result.unwrap_err(); + assert!(err.to_string().contains("not a valid integer")); + } + + #[test] + fn test_category_with_invalid_float_code_fails() { + let mut categories_map = HashMap::new(); + categories_map.insert("1.5".to_string(), "Valid float".to_string()); + categories_map.insert("not_a_number".to_string(), "Invalid float".to_string()); + + let result = ParquetMetadataReader::convert_categories(&categories_map, "float", "TEST_VAR"); + + // Invalid float codes should cause an error + assert!(result.is_err(), "Should fail when float category codes are invalid"); + let err = result.unwrap_err(); + assert!(err.to_string().contains("not a valid number")); + } + + #[test] + fn test_category_with_string_type_accepts_any() { + let mut categories_map = HashMap::new(); + categories_map.insert("A".to_string(), "Category A".to_string()); + categories_map.insert("123".to_string(), "Category 123".to_string()); + categories_map.insert("!@#".to_string(), "Special chars".to_string()); + + let result = ParquetMetadataReader::convert_categories(&categories_map, "string", "TEST_VAR"); + + // String type should accept any category code + assert!(result.is_ok(), "String type should accept any category code"); + let categories = result.unwrap(); + assert_eq!(categories.len(), 3); + for category in &categories { + assert!(matches!(category.value, IpumsValue::String { .. 
})); + } } } \ No newline at end of file From 35bb92c9378469440db51755b9ff032f86538a42 Mon Sep 17 00:00:00 2001 From: Colin Davis Date: Mon, 29 Dec 2025 15:45:08 -0600 Subject: [PATCH 4/4] Fix parquet metadata parsing issues and bump version to 0.4.0 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix Parquet physical type to IPUMS type mapping (INT32/INT64 -> integer, FLOAT/DOUBLE -> double, BYTE_ARRAY -> string) - Return errors on variable parse failures instead of silently skipping - Fix category sorting for Float and String types - Optimize dataset lookup by hoisting it outside variable loop - Add missing assert!() in test and improve test clarity - Add test for parquet_type_to_ipums_type conversion 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/conventions.rs | 35 ++++++---- src/parquet_metadata.rs | 144 ++++++++++++++++++++++++---------------- 4 files changed, 109 insertions(+), 74 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 22d891b2..bfbc42d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -537,7 +537,7 @@ dependencies = [ [[package]] name = "cimdea" -version = "0.3.2" +version = "0.4.0" dependencies = [ "ascii", "assert_cmd", diff --git a/Cargo.toml b/Cargo.toml index 335fce7e..782186a8 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cimdea" -version = "0.3.2" +version = "0.4.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/src/conventions.rs b/src/conventions.rs index 255d37e1..423d6319 100644 --- a/src/conventions.rs +++ b/src/conventions.rs @@ -183,24 +183,24 @@ impl MicroDataCollection { &rectype_abbrev, )?; + // Find the dataset once before the variable loop + let dataset = datasets + .iter() + .find(|d| d.name == dataset_name) + .cloned() + .unwrap_or_else(|| IpumsDataset::from((dataset_name.to_string(), 
0))); + // Add variables to metadata for var in variables { - // Create or get the dataset - let dataset = datasets - .iter() - .find(|d| d.name == dataset_name) - .cloned() - .unwrap_or_else(|| IpumsDataset::from((dataset_name.to_string(), 0))); - - md.add_dataset_variable(dataset, var); + md.add_dataset_variable(dataset.clone(), var); } } else { // Fall back to just schema information let schema_info = ParquetMetadataReader::get_schema_info(&parquet_file)?; - - // Create a dataset if we don't have one + + // Create a dataset once before the variable loop let dataset = IpumsDataset::from((dataset_name.to_string(), 0)); - + // Add each field as a variable with minimal metadata for (field_name, (data_type_str, _nullable)) in schema_info { let ipums_var = IpumsVariable { @@ -862,9 +862,16 @@ mod test { // This should work even if the parquet files don't have embedded metadata // It will fall back to schema information let result = usa_ctx.load_metadata_for_datasets_from_parquet(&["us2015b"]); - // Don't fail the test if the directory doesn't exist - if result.is_ok() || result.unwrap_err().to_string().contains("does not exist") { - // Expected behavior - either succeeds or fails with expected error + match result { + Ok(_) => { + // Success - metadata was loaded + } + Err(e) if e.to_string().contains("does not exist") => { + // Expected - dataset directory doesn't exist in test data + } + Err(e) => { + panic!("Unexpected error loading parquet metadata: {}", e); + } } } } diff --git a/src/parquet_metadata.rs b/src/parquet_metadata.rs index 4a01136c..be29195d 100644 --- a/src/parquet_metadata.rs +++ b/src/parquet_metadata.rs @@ -132,6 +132,16 @@ impl ParquetMetadataReader { // Sort categories by their code for consistent ordering categories.sort_by(|a, b| match (&a.value, &b.value) { (IpumsValue::Integer(a_val), IpumsValue::Integer(b_val)) => a_val.cmp(b_val), + (IpumsValue::Float(a_val), IpumsValue::Float(b_val)) => { + // Parse floats for comparison; fall back to 
string comparison on parse failure
+                match (a_val.parse::<f64>(), b_val.parse::<f64>()) {
+                    (Ok(a_f), Ok(b_f)) => a_f.partial_cmp(&b_f).unwrap_or(std::cmp::Ordering::Equal),
+                    _ => a_val.cmp(b_val),
+                }
+            }
+            (IpumsValue::String { value: a_val, .. }, IpumsValue::String { value: b_val, .. }) => {
+                a_val.cmp(b_val)
+            }
             _ => std::cmp::Ordering::Equal,
         });
@@ -221,61 +231,45 @@ impl ParquetMetadataReader {
         })?;
 
         let mut variables = Vec::new();
-        let mut failed_vars = Vec::new();
 
         for (var_name, var_value) in variables_map {
-            match serde_json::from_value::<ParquetVariableMetadata>(var_value.clone()) {
-                Ok(metadata) => {
-                    // Convert categories if present and not empty
-                    let categories = if !metadata.categories.is_empty() {
-                        match Self::convert_categories(&metadata.categories, &metadata.data_type, &var_name) {
-                            Ok(cats) => Some(cats),
-                            Err(e) => {
-                                failed_vars.push(var_name.clone());
-                                eprintln!("Warning: Failed to parse categories for variable '{}': {}", var_name, e);
-                                continue;
-                            }
-                        }
-                    } else {
-                        None
-                    };
-
-                    let ipums_var = IpumsVariable {
-                        name: var_name.clone(),
-                        data_type: Some(IpumsDataType::from(metadata.data_type.as_str())),
-                        label: Some(metadata.label),
-                        record_type: metadata
-                            .record_type
-                            .unwrap_or_else(|| record_type.to_string()),
-                        categories,
-                        formatting: metadata
-                            .column_start
-                            .and_then(|start| metadata.column_width.map(|width| (start, width))),
-                        general_width: metadata.general_width.or(metadata.column_width),
-                        description: None, // Could be populated from additional metadata if available
-                        category_bins: None,
-                        id: 0, // Will be assigned when added to MetadataEntities
-                    };
-                    variables.push(ipums_var);
-                }
-                Err(e) => {
-                    failed_vars.push(var_name.clone());
-                    // Consider using a logger instead of eprintln! 
in production - eprintln!( - "Warning: Failed to deserialize metadata for variable '{}': {}", - var_name, e - ); - } - } - } + let metadata: ParquetVariableMetadata = + serde_json::from_value(var_value).map_err(|e| { + metadata_error!( + "Failed to deserialize metadata for variable '{}': {}", + var_name, + e + ) + })?; + + // Convert categories if present and not empty + let categories = if !metadata.categories.is_empty() { + Some(Self::convert_categories( + &metadata.categories, + &metadata.data_type, + &var_name, + )?) + } else { + None + }; - if !failed_vars.is_empty() { - // Consider using a logger instead of eprintln! in production - eprintln!( - "Warning: Failed to parse {} variables: {:?}", - failed_vars.len(), - failed_vars - ); + let ipums_var = IpumsVariable { + name: var_name.clone(), + data_type: Some(IpumsDataType::from(metadata.data_type.as_str())), + label: Some(metadata.label), + record_type: metadata + .record_type + .unwrap_or_else(|| record_type.to_string()), + categories, + formatting: metadata + .column_start + .and_then(|start| metadata.column_width.map(|width| (start, width))), + general_width: metadata.general_width.or(metadata.column_width), + description: None, + category_bins: None, + id: 0, // Will be assigned when added to MetadataEntities + }; + variables.push(ipums_var); } if variables.is_empty() { @@ -339,7 +333,20 @@ impl ParquetMetadataReader { } } - /// Extract schema information from a parquet file + /// Convert a Parquet physical type string (from Debug format) to an IPUMS data type string. + /// Parquet physical types include: BOOLEAN, INT32, INT64, INT96, FLOAT, DOUBLE, + /// BYTE_ARRAY, FIXED_LEN_BYTE_ARRAY. 
+    pub fn parquet_type_to_ipums_type(parquet_type: &str) -> &'static str {
+        match parquet_type {
+            "INT32" | "INT64" | "INT96" | "BOOLEAN" => "integer",
+            "FLOAT" | "DOUBLE" => "double",
+            "BYTE_ARRAY" | "FIXED_LEN_BYTE_ARRAY" => "string",
+            _ => "integer", // Default fallback
+        }
+    }
+
+    /// Extract schema information from a parquet file.
+    /// Returns a map of field name to (IPUMS-compatible type string, nullable).
     pub fn get_schema_info(file_path: &Path) -> Result<HashMap<String, (String, bool)>, MdError> {
         let file = File::open(file_path).map_err(|e| {
             metadata_error!(
@@ -362,9 +369,10 @@
         for field in schema.get_fields() {
             let name = field.name().to_string();
-            let type_str = format!("{:?}", field.get_physical_type());
+            let parquet_type = format!("{:?}", field.get_physical_type());
+            let ipums_type = Self::parquet_type_to_ipums_type(&parquet_type).to_string();
             let nullable = field.is_optional();
-            schema_info.insert(name, (type_str, nullable));
+            schema_info.insert(name, (ipums_type, nullable));
         }
 
         Ok(schema_info)
@@ -579,7 +587,7 @@
         assert_eq!(categories[2].label(), "Missing");
         assert_eq!(categories[2].value, IpumsValue::Integer(9));
-        matches!(categories[2].meaning, UniversalCategoryType::Missing);
+        assert!(matches!(categories[2].meaning, UniversalCategoryType::Missing));
     }
 
     #[test]
@@ -668,9 +676,9 @@
         categories_map.insert("A".to_string(), "Category A".to_string());
         categories_map.insert("123".to_string(), "Category 123".to_string());
         categories_map.insert("!@#".to_string(), "Special chars".to_string());
-        
+
         let result = ParquetMetadataReader::convert_categories(&categories_map, "string", "TEST_VAR");
-        
+
         // String type should accept any category code
         assert!(result.is_ok(), "String type should accept any category code");
         let categories = result.unwrap();
@@ -679,4 +687,24 @@
         assert!(matches!(category.value, IpumsValue::String { .. 
})); } } + + #[test] + fn test_parquet_type_to_ipums_type() { + // Integer types + assert_eq!(ParquetMetadataReader::parquet_type_to_ipums_type("INT32"), "integer"); + assert_eq!(ParquetMetadataReader::parquet_type_to_ipums_type("INT64"), "integer"); + assert_eq!(ParquetMetadataReader::parquet_type_to_ipums_type("INT96"), "integer"); + assert_eq!(ParquetMetadataReader::parquet_type_to_ipums_type("BOOLEAN"), "integer"); + + // Float types + assert_eq!(ParquetMetadataReader::parquet_type_to_ipums_type("FLOAT"), "double"); + assert_eq!(ParquetMetadataReader::parquet_type_to_ipums_type("DOUBLE"), "double"); + + // String types + assert_eq!(ParquetMetadataReader::parquet_type_to_ipums_type("BYTE_ARRAY"), "string"); + assert_eq!(ParquetMetadataReader::parquet_type_to_ipums_type("FIXED_LEN_BYTE_ARRAY"), "string"); + + // Unknown defaults to integer + assert_eq!(ParquetMetadataReader::parquet_type_to_ipums_type("UNKNOWN"), "integer"); + } } \ No newline at end of file