Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cimdea"
version = "0.3.2"
version = "0.4.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
229 changes: 224 additions & 5 deletions src/conventions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::ipums_data_model::*;
use crate::ipums_metadata_model::*;
use crate::layout;
use crate::mderror::{metadata_error, MdError};
use crate::parquet_metadata::ParquetMetadataReader;
use crate::request::InputType;

use std::collections::HashMap;
Expand Down Expand Up @@ -129,8 +130,98 @@ impl MicroDataCollection {
/// The path like `../output_data/current/parquet/us2019a/`
/// Reading the schema will give approximately the same metadata information
/// as reading the fixed-width layout file for the same dataset.
pub fn load_metadata_from_parquet(&mut self, _parquet_dataset_path: &Path) {
todo!("implement");
/// Additionally, if the parquet files contain IPUMS metadata in their key-value
/// metadata, this will load variable labels, categories, and dataset information.
pub fn load_metadata_from_parquet(
&mut self,
parquet_dataset_path: &Path,
) -> Result<(), MdError> {
if !parquet_dataset_path.exists() {
return Err(metadata_error!(
"Parquet dataset path does not exist: {}",
parquet_dataset_path.display()
));
}

// Extract dataset name from path
let dataset_name = parquet_dataset_path
.file_name()
.and_then(|n| n.to_str())
.ok_or_else(|| {
metadata_error!(
"Could not extract dataset name from path: {}",
parquet_dataset_path.display()
)
})?;

// Collect record types and filenames first to avoid borrow issues
let record_types: Vec<(String, String)> = self.record_types
.keys()
.map(|k| {
let base_filename = self.base_filename_for_dataset_and_rectype(dataset_name, k);
(k.clone(), base_filename)
})
.collect();

// Initialize metadata if not already present
if self.metadata.is_none() {
self.metadata = Some(MetadataEntities::new());
}

// Now work with the metadata
let md = self.metadata.as_mut().unwrap();

for (rectype_abbrev, base_filename) in record_types {
let parquet_file = parquet_dataset_path.join(format!("{}.parquet", base_filename));

if parquet_file.exists() {
// Check if the file has IPUMS metadata
if ParquetMetadataReader::has_ipums_metadata(&parquet_file) {
// Load metadata from the parquet file
let (variables, datasets) = ParquetMetadataReader::load_metadata_from_file(
&parquet_file,
&rectype_abbrev,
)?;

// Find the dataset once before the variable loop
let dataset = datasets
.iter()
.find(|d| d.name == dataset_name)
.cloned()
.unwrap_or_else(|| IpumsDataset::from((dataset_name.to_string(), 0)));

// Add variables to metadata
for var in variables {
md.add_dataset_variable(dataset.clone(), var);
}
} else {
// Fall back to just schema information
let schema_info = ParquetMetadataReader::get_schema_info(&parquet_file)?;

// Create a dataset once before the variable loop
let dataset = IpumsDataset::from((dataset_name.to_string(), 0));

// Add each field as a variable with minimal metadata
for (field_name, (data_type_str, _nullable)) in schema_info {
let ipums_var = IpumsVariable {
name: field_name,
data_type: Some(IpumsDataType::from(data_type_str.as_str())),
label: None,
record_type: rectype_abbrev.clone(),
categories: None,
formatting: None,
general_width: None,
description: None,
category_bins: None,
id: 0,
};
md.add_dataset_variable(dataset.clone(), ipums_var);
}
}
}
}

Ok(())
}

/// Using the data_root, scan the layouts and load metadata from them.
Expand Down Expand Up @@ -162,9 +253,53 @@ impl MicroDataCollection {

/// Takes a path like ../output_data/current/parquet/, which could be derived
/// automatically from defaults based on data root or product root. Scans all
/// parquet schema information.
pub fn load_metadata_from_all_parquet(&mut self, _parquet_path: &Path) {
todo!("implement");
/// parquet schema information and embedded metadata.
pub fn load_metadata_from_all_parquet(
&mut self,
parquet_path: &Path,
) -> Result<(), MdError> {
if !parquet_path.exists() {
return Err(metadata_error!(
"Parquet path does not exist: {}",
parquet_path.display()
));
}

// Read all subdirectories in the parquet path
let entries = std::fs::read_dir(parquet_path).map_err(|e| {
metadata_error!(
"Failed to read parquet directory {}: {}",
parquet_path.display(),
e
)
})?;

let mut loaded_count = 0;
let mut errors = Vec::new();

for entry in entries {
let entry = entry.map_err(|e| {
metadata_error!("Failed to read directory entry: {}", e)
})?;

let path = entry.path();
if path.is_dir() {
// Try to load metadata from this dataset directory
match self.load_metadata_from_parquet(&path) {
Ok(()) => loaded_count += 1,
Err(e) => errors.push(format!("{}: {}", path.display(), e)),
}
}
}

if loaded_count == 0 && !errors.is_empty() {
return Err(metadata_error!(
"Failed to load metadata from any parquet datasets. Errors: {}",
errors.join("; ")
));
}

Ok(())
}

/// Load everything available for the selected variables and samples from the available
Expand Down Expand Up @@ -514,6 +649,46 @@ impl Context {
}
}

/// Load metadata for datasets from parquet files
/// This will extract metadata from the parquet files' key-value metadata if available,
/// or fall back to schema information.
pub fn load_metadata_for_datasets_from_parquet(
&mut self,
datasets: &[&str],
) -> Result<(), MdError> {
if let Some(ref data_root) = self.data_root {
let parquet_path = data_root.join("parquet");
if !parquet_path.exists() {
return Err(metadata_error!(
"Parquet directory does not exist at: {}",
parquet_path.display()
));
}

for dataset in datasets {
let dataset_path = parquet_path.join(dataset);
self.settings.load_metadata_from_parquet(&dataset_path)?;
}
Ok(())
} else {
Err(metadata_error!(
"Cannot load parquet metadata without a data_root"
))
}
}

/// Load all available metadata from parquet files in the data root
pub fn load_all_metadata_from_parquet(&mut self) -> Result<(), MdError> {
if let Some(ref data_root) = self.data_root {
let parquet_path = data_root.join("parquet");
self.settings.load_metadata_from_all_parquet(&parquet_path)
} else {
Err(metadata_error!(
"Cannot load parquet metadata without a data_root"
))
}
}

/// The context should be set to read from layouts or full metadata
pub fn load_metadata_for_datasets_and_variables(
&mut self,
Expand Down Expand Up @@ -656,4 +831,48 @@ mod test {
let result = collection.default_table_name("us2021a", "Z");
assert!(result.is_err(), "expected an error but got {result:?}");
}

#[test]
fn test_load_metadata_from_parquet() {
let data_root = Some(String::from("tests/data_root"));
let mut usa_ctx = Context::from_ipums_collection_name("usa", None, data_root)
.expect("should be able to create USA context");

// Try to load metadata from parquet if test data exists
let parquet_path = PathBuf::from("tests/data_root/parquet/us2015b");
if parquet_path.exists() {
let result = usa_ctx.settings.load_metadata_from_parquet(&parquet_path);
// We don't assert success here since test data may not have metadata
// but the function should not panic
if result.is_ok() {
assert!(usa_ctx.settings.metadata.is_some());
}
}
}

#[test]
fn test_context_load_metadata_from_parquet() {
let data_root = Some(String::from("tests/data_root"));
let mut usa_ctx = Context::from_ipums_collection_name("usa", None, data_root)
.expect("should be able to create USA context");

// Try to load metadata for a specific dataset
let parquet_path = PathBuf::from("tests/data_root/parquet");
if parquet_path.exists() {
// This should work even if the parquet files don't have embedded metadata
// It will fall back to schema information
let result = usa_ctx.load_metadata_for_datasets_from_parquet(&["us2015b"]);
match result {
Ok(_) => {
// Success - metadata was loaded
}
Err(e) if e.to_string().contains("does not exist") => {
// Expected - dataset directory doesn't exist in test data
}
Err(e) => {
panic!("Unexpected error loading parquet metadata: {}", e);
}
}
}
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub mod ipums_data_model;
pub mod ipums_metadata_model;
pub mod layout;
pub mod mderror;
pub mod parquet_metadata;
pub mod query_gen;
pub mod request;
pub mod tabulate;
Expand Down
Loading
Loading