diff --git a/.github/workflows/core_tests.yml b/.github/workflows/core_tests.yml
index 8e659b25c..a458a56fe 100644
--- a/.github/workflows/core_tests.yml
+++ b/.github/workflows/core_tests.yml
@@ -130,6 +130,7 @@ jobs:
       - run: uv run pytest test/test_skim_name_conflicts.py
       - run: uv run pytest test/random_seed/test_random_seed.py
+      - run: uv run pytest test/skip_failed_choices/test_skip_failed_choices.py
 
   builtin_regional_models:
     needs: foundation
diff --git a/activitysim/abm/models/location_choice.py b/activitysim/abm/models/location_choice.py
index 7f032a8ae..2d629a404 100644
--- a/activitysim/abm/models/location_choice.py
+++ b/activitysim/abm/models/location_choice.py
@@ -234,6 +234,11 @@ def location_sample(
 ):
     # FIXME - MEMORY HACK - only include columns actually used in spec
     chooser_columns = model_settings.SIMULATE_CHOOSER_COLUMNS
+    # Drop this when PR #1017 is merged
+    if ("household_id" not in chooser_columns) and (
+        "household_id" in persons_merged.columns
+    ):
+        chooser_columns = chooser_columns + ["household_id"]
     choosers = persons_merged[chooser_columns]
 
     # create wrapper with keys for this lookup - in this case there is a home_zone_id in the choosers
@@ -390,6 +395,11 @@ def location_presample(
     # FIXME maybe we should add it for multi-zone (from maz_taz) if missing?
     chooser_columns = model_settings.SIMULATE_CHOOSER_COLUMNS
     chooser_columns = [HOME_TAZ if c == HOME_MAZ else c for c in chooser_columns]
+    # Drop this when PR #1017 is merged
+    if ("household_id" not in chooser_columns) and (
+        "household_id" in persons_merged.columns
+    ):
+        chooser_columns = chooser_columns + ["household_id"]
     choosers = persons_merged[chooser_columns]
 
     # create wrapper with keys for this lookup - in this case there is a HOME_TAZ in the choosers
@@ -620,6 +630,11 @@ def run_location_simulate(
     # FIXME - MEMORY HACK - only include columns actually used in spec
     chooser_columns = model_settings.SIMULATE_CHOOSER_COLUMNS
+    # Drop this when PR #1017 is merged
+    if ("household_id" not in chooser_columns) and (
+        "household_id" in persons_merged.columns
+    ):
+        chooser_columns = chooser_columns + ["household_id"]
     choosers = persons_merged[chooser_columns]
 
     alt_dest_col_name = model_settings.ALT_DEST_COL_NAME
@@ -1072,6 +1087,33 @@ def iterate_location_choice(
         else:
             choices_df = choices_df_
 
+        if (
+            state.settings.skip_failed_choices
+            and state.get("num_skipped_households", 0) > 0
+        ):
+            # drop choices that belong to the failed households: state.skipped_household_ids
+            # so that their choices are not considered in shadow price calculations
+            # first append household_id to choices_df
+            choices_df = choices_df.merge(
+                persons_merged_df[["household_id"]],
+                left_index=True,
+                right_index=True,
+                how="left",
+            )
+            if len(choices_df) > 0:
+                # Get all household IDs from all trace_labels in the dictionary
+                import itertools
+
+                skipped_household_ids_dict = state.get("skipped_household_ids", dict())
+                all_skipped_hh_ids = set(
+                    itertools.chain.from_iterable(skipped_household_ids_dict.values())
+                )
+
+                choices_df = choices_df[
+                    ~choices_df["household_id"].isin(all_skipped_hh_ids)
+                ]
+                choices_df = choices_df.drop(columns=["household_id"])
+
         spc.set_choices(
             choices=choices_df["choice"],
             segment_ids=persons_merged_df[chooser_segment_column].reindex(
diff --git a/activitysim/abm/models/school_escorting.py b/activitysim/abm/models/school_escorting.py
index 32e5058e7..183b45ee2 100644
--- a/activitysim/abm/models/school_escorting.py
+++ b/activitysim/abm/models/school_escorting.py
@@ -472,6 +472,11 @@ def school_escorting(
     # reduce memory by limiting columns if selected columns are supplied
     chooser_columns = model_settings.SIMULATE_CHOOSER_COLUMNS
     if chooser_columns is not None:
+        # Drop this when PR #1017 is merged
+        if ("household_id" not in chooser_columns) and (
+            "household_id" in choosers.columns
+        ):
+            chooser_columns = chooser_columns + ["household_id"]
         chooser_columns = chooser_columns + participant_columns
         choosers = choosers[chooser_columns]
diff --git a/activitysim/abm/models/trip_matrices.py b/activitysim/abm/models/trip_matrices.py
index 5552e3b00..5b5a8351a 100644
--- a/activitysim/abm/models/trip_matrices.py
+++ b/activitysim/abm/models/trip_matrices.py
@@ -90,6 +90,34 @@ def write_trip_matrices(
 
     trips_df = annotate_trips(state, trips, network_los, model_settings)
 
+    # This block adjusts the household sample rate column to account for skipped households.
+    # Note: the `HH_EXPANSION_WEIGHT_COL` is pointing to the `sample_rate` column in the households table.
+    # Based on the calculation in the write_matrices() function, the sample_rate is used to calculate the expansion weight as 1 / sample_rate.
+    # A sample_rate of 0.01 means the sample household should be expanded 1/0.01 = 100 times in the actual population households.
+    # In simulation, the `sample_rate` is calculated and added to the synthetic households
+    # based on household_sample_size / total_household_count, and therefore is the same for all households.
+    # In estimation, the `sample_rate` may vary by household, but weights are not used in estimation, and write_trip_matrices is not called during estimation.
+    # But we still try to cover both cases (when rates are the same vs when they vary) here for consistency.
+    hh_weight_col = model_settings.HH_EXPANSION_WEIGHT_COL
+    if state.get("num_skipped_households", 0) > 0:
+        logger.info(
+            f"Adjusting household sample rate in {hh_weight_col} to account for {state.get('num_skipped_households', 0)} skipped households."
+        )
+        # adjust the hh sample rates to account for skipped households
+        # first get the total expansion weight of the skipped households, which will be the sum of the inverse of their sample rates
+        skipped_household_weights = (
+            1 / state.get_dataframe("households_skipped")[hh_weight_col]
+        ).sum()
+        # next get the total expansion weight of the remaining households
+        remaining_household_weights = (
+            1 / state.get_dataframe("households")[hh_weight_col]
+        ).sum()
+        # the adjustment factor is the remaining household weight / (remaining household weight + skipped household weight)
+        adjustment_factor = remaining_household_weights / (
+            remaining_household_weights + skipped_household_weights
+        )
+        trips_df[hh_weight_col] = trips_df[hh_weight_col] * adjustment_factor
+
     if model_settings.SAVE_TRIPS_TABLE:
         state.add_table("trips", trips_df)
diff --git a/activitysim/abm/models/trip_mode_choice.py b/activitysim/abm/models/trip_mode_choice.py
index a942b7af8..3378b5421 100644
--- a/activitysim/abm/models/trip_mode_choice.py
+++ b/activitysim/abm/models/trip_mode_choice.py
@@ -366,7 +366,20 @@ def trip_mode_choice(
         "trip_mode_choice choices", trips_df[mode_column_name], value_counts=True
     )
 
-    assert not trips_df[mode_column_name].isnull().any()
+    # if we're skipping failed choices, the trip modes for failed simulations will be null
+    if state.settings.skip_failed_choices:
+        # Get all household IDs from all trace_labels in the dictionary - more efficient flattening
+        import itertools
+
+        skipped_household_ids_dict = state.get("skipped_household_ids", dict())
+        all_skipped_hh_ids = set(
+            itertools.chain.from_iterable(skipped_household_ids_dict.values())
+        )
+
+        mask_skipped = trips_df["household_id"].isin(all_skipped_hh_ids)
+        assert not trips_df.loc[~mask_skipped, mode_column_name].isnull().any()
+    else:
+        assert not trips_df[mode_column_name].isnull().any()
 
     state.add_table("trips", trips_df)
@@ -382,6 +395,8 @@
     # need to update locals_dict to access skims that are the same .shape as trips table
     locals_dict = {}
     locals_dict.update(constants)
+    if state.settings.skip_failed_choices:
+        trips_merged = trips_merged.loc[~mask_skipped]
     simulate.set_skim_wrapper_targets(trips_merged, skims)
     locals_dict.update(skims)
     locals_dict["timeframe"] = "trip"
diff --git a/activitysim/abm/models/util/school_escort_tours_trips.py b/activitysim/abm/models/util/school_escort_tours_trips.py
index 665844023..1bfdd22aa 100644
--- a/activitysim/abm/models/util/school_escort_tours_trips.py
+++ b/activitysim/abm/models/util/school_escort_tours_trips.py
@@ -1043,6 +1043,41 @@ def force_escortee_trip_modes_to_match_chauffeur(state: workflow.State, trips):
         f"Changed {diff.sum()} trip modes of school escortees to match their chauffeur"
     )
 
+    # trip_mode can be na if the run allows skipping failed choices and the trip mode choice has failed
+    # in that case we can't assert that all trip modes are filled
+    # instead, we throw a warning about how many are missing, and return early
+    if state.settings.skip_failed_choices:
+        missing_count = trips.trip_mode.isna().sum()
+        if missing_count > 0:
+            # check if the missing trip modes are all because of simulation failures
+            # i.e., they are from households that are in the skipped_household_ids set
+            import itertools
+
+            skipped_household_ids_dict = state.get("skipped_household_ids", dict())
+            skipped_household_ids = set(
+                itertools.chain.from_iterable(skipped_household_ids_dict.values())
+            )
+            missing_household_ids = set(
+                trips[trips.trip_mode.isna()]["household_id"].unique()
+            )
+            # log a warning about the missing trip modes for skipped households
+            missing_count_due_to_sim_fail = len(
+                trips[
+                    trips.trip_mode.isna()
+                    & trips.household_id.isin(skipped_household_ids)
+                ]
+            )
+            logger.warning(
+                f"Missing trip mode for {missing_count_due_to_sim_fail} trips due to simulation failures in trip mode choice, "
+                f"these records and their corresponding households are being skipped: {missing_household_ids}"
+            )
+            # throw assertion error if there are missing trip modes for households that were not skipped
+            assert missing_household_ids.issubset(skipped_household_ids), (
+                f"Missing trip modes for households that were not skipped: {missing_household_ids - skipped_household_ids}. "
+                f"Missing trip modes for: {trips[trips.trip_mode.isna() & ~trips.household_id.isin(skipped_household_ids)]}"
+            )
+        return trips
+
     assert (
         ~trips.trip_mode.isna()
     ).all(), f"Missing trip mode for {trips[trips.trip_mode.isna()]}"
diff --git a/activitysim/abm/models/util/tour_destination.py b/activitysim/abm/models/util/tour_destination.py
index d99803bd7..2cac47ee1 100644
--- a/activitysim/abm/models/util/tour_destination.py
+++ b/activitysim/abm/models/util/tour_destination.py
@@ -625,6 +625,11 @@ def run_destination_sample(
     # if special person id is passed
     chooser_id_column = model_settings.CHOOSER_ID_COLUMN
 
+    # Drop this when PR #1017 is merged
+    if ("household_id" not in chooser_columns) and (
+        "household_id" in persons_merged.columns
+    ):
+        chooser_columns = chooser_columns + ["household_id"]
     persons_merged = persons_merged[
         [c for c in persons_merged.columns if c in chooser_columns]
     ]
@@ -799,6 +804,11 @@ def run_destination_simulate(
     # if special person id is passed
     chooser_id_column = model_settings.CHOOSER_ID_COLUMN
 
+    # Drop this when PR #1017 is merged
+    if ("household_id" not in chooser_columns) and (
+        "household_id" in persons_merged.columns
+    ):
+        chooser_columns = chooser_columns + ["household_id"]
     persons_merged = persons_merged[
         [c for c in persons_merged.columns if c in chooser_columns]
     ]
diff --git a/activitysim/abm/models/util/tour_od.py b/activitysim/abm/models/util/tour_od.py
index 5ec9dd493..c2548cbd6 100644
--- a/activitysim/abm/models/util/tour_od.py
+++ b/activitysim/abm/models/util/tour_od.py
@@ -692,6 +692,9 @@ def run_od_sample(
     choosers = tours
     # FIXME - MEMORY HACK - only include columns actually used in spec
     chooser_columns = model_settings.SIMULATE_CHOOSER_COLUMNS
+    # Drop this when PR #1017 is merged
+    if ("household_id" not in chooser_columns) and ("household_id" in choosers.columns):
+        chooser_columns = chooser_columns + ["household_id"]
     choosers = choosers[chooser_columns]
 
     # interaction_sample requires that choosers.index.is_monotonic_increasing
@@ -951,6 +954,9 @@ def run_od_simulate(
 
     # FIXME - MEMORY HACK - only include columns actually used in spec
     chooser_columns = model_settings.SIMULATE_CHOOSER_COLUMNS
+    # Drop this when PR #1017 is merged
+    if ("household_id" not in chooser_columns) and ("household_id" in choosers.columns):
+        chooser_columns = chooser_columns + ["household_id"]
     choosers = choosers[chooser_columns]
 
     # interaction_sample requires that choosers.index.is_monotonic_increasing
diff --git a/activitysim/abm/models/util/tour_scheduling.py b/activitysim/abm/models/util/tour_scheduling.py
index 80474db59..e591c29db 100644
--- a/activitysim/abm/models/util/tour_scheduling.py
+++ b/activitysim/abm/models/util/tour_scheduling.py
@@ -40,6 +40,12 @@ def run_tour_scheduling(
             c for c in model_columns if c not in logsum_columns
         ]
 
+        # Drop this when PR #1017 is merged
+        if ("household_id" not in chooser_columns) and (
+            "household_id" in persons_merged.columns
+        ):
+            chooser_columns = chooser_columns + ["household_id"]
+
     persons_merged = expressions.filter_chooser_columns(persons_merged, chooser_columns)
 
     timetable = state.get_injectable("timetable")
diff --git a/activitysim/abm/tables/households.py b/activitysim/abm/tables/households.py
index c0c33dcbc..680cdb6c7 100644
--- a/activitysim/abm/tables/households.py
+++ b/activitysim/abm/tables/households.py
@@ -110,6 +110,12 @@ def households(state: workflow.State) -> pd.DataFrame:
     # replace table function with dataframe
     state.add_table("households", df)
 
+    if state.settings.skip_failed_choices:
+        logger.info(
+            "Note: 'skip_failed_choices' is enabled; households may be skipped when simulation fails."
+        )
+        # initialize skipped households table as empty and same columns as households
+        state.add_table("households_skipped", df.iloc[0:0])
 
     state.get_rn_generator().add_channel("households", df)
diff --git a/activitysim/cli/run.py b/activitysim/cli/run.py
index af9a76daa..e7cb0c162 100644
--- a/activitysim/cli/run.py
+++ b/activitysim/cli/run.py
@@ -469,6 +469,24 @@ def run(args):
     if memory_sidecar_process:
         memory_sidecar_process.stop()
 
+    # print out a summary of households skipped due to failed choices
+    # we want to see the number of unique households skipped by trace_label
+    if state.settings.skip_failed_choices:
+        skipped_household_ids_dict = state.get("skipped_household_ids", dict())
+        for trace_label, hh_id_set in skipped_household_ids_dict.items():
+            logger.warning(
+                f"Number of unique households skipped for trace_label '{trace_label}': {len(hh_id_set)}. They are: {sorted(hh_id_set)}"
+            )
+        # also log the total number of unique households skipped across all trace_labels
+        import itertools
+
+        all_skipped_hh_ids = set(
+            itertools.chain.from_iterable(skipped_household_ids_dict.values())
+        )
+        logger.warning(
+            f"Total number of unique households skipped across all trace_labels: {len(all_skipped_hh_ids)}."
+        )
+
     if state.settings.expression_profile:
         # generate a summary of slower expression evaluation times
         # across all models and write to a file
diff --git a/activitysim/core/configuration/top.py b/activitysim/core/configuration/top.py
index 405560810..34175c2f0 100644
--- a/activitysim/core/configuration/top.py
+++ b/activitysim/core/configuration/top.py
@@ -776,11 +776,27 @@ def _check_store_skims_in_shm(self):
     check_model_settings: bool = True
     """
-    run checks to validate that YAML settings files are loadable and spec and coefficent csv can be resolved.
+    run checks to validate that YAML settings files are loadable and spec and coefficient csv can be resolved.
 
     should catch many common errors early, including missing required configurations or
     specified coefficient labels without defined values.
     """
 
+    skip_failed_choices: bool = True
+    """
+    Skip households that cause errors during processing instead of failing the model run.
+
+    .. versionadded:: 1.6
+    """
+
+    fraction_of_failed_choices_allowed: float = 0.1
+    """
+    Threshold for the fraction of households that can be skipped before failing the model run,
+    used in conjunction with `skip_failed_choices`.
+    We want to skip problems when they are rare, but fail the run if they are common.
+
+    .. versionadded:: 1.6
+    """
+
     other_settings: dict[str, Any] = None
 
     def _get_attr(self, attr):
diff --git a/activitysim/core/interaction_sample_simulate.py b/activitysim/core/interaction_sample_simulate.py
index df1c53fa0..e34974840 100644
--- a/activitysim/core/interaction_sample_simulate.py
+++ b/activitysim/core/interaction_sample_simulate.py
@@ -351,6 +351,11 @@ def _interaction_sample_simulate(
     # that is, we want the index value of the row that is offset by rows into the
     # tranche of this choosers alternatives created by cross join of alternatives and choosers
 
+    # when skip failed choices is enabled, the position may be -99 for failed choices, which gets dropped eventually
+    # here we just need to clip to zero to avoid getting the wrong index in the take() below
+    if state.settings.skip_failed_choices:
+        positions = positions.clip(lower=0)
+
     # resulting pandas Int64Index has one element per chooser row and is in same order as choosers
     choices = alternatives[choice_column].take(positions + first_row_offsets)
diff --git a/activitysim/core/logit.py b/activitysim/core/logit.py
index 105e18fec..1dca1f945 100644
--- a/activitysim/core/logit.py
+++ b/activitysim/core/logit.py
@@ -30,6 +30,7 @@ def report_bad_choices(
     state: workflow.State,
     bad_row_map,
     df,
+    skip_failed_choices,
     trace_label,
     msg,
     trace_choosers=None,
@@ -87,6 +88,45 @@
             logger.warning(row_msg)
 
+    if skip_failed_choices:
+        # update counter in state
+        num_skipped_households = state.get("num_skipped_households", 0)
+        skipped_household_ids = state.get("skipped_household_ids", dict())
+
+        # Get current household IDs and filter out None values
+        current_hh_ids = set(df[trace_col].dropna().unique())
+
+        # Get all previously skipped household IDs across all trace_labels
+        import itertools
+
+        already_skipped = set(
+            itertools.chain.from_iterable(skipped_household_ids.values())
+        )
+
+        # Find truly new household IDs that haven't been skipped before
+        new_skipped_hh_ids = current_hh_ids - already_skipped
+
+        # Only process if there are new households to skip
+        if new_skipped_hh_ids:
+            # Initialize list for this trace_label if it doesn't exist
+            if trace_label not in skipped_household_ids:
+                skipped_household_ids[trace_label] = []
+            skipped_household_ids[trace_label].extend(new_skipped_hh_ids)
+            num_skipped_households += len(new_skipped_hh_ids)
+
+        # make sure the number of skipped households is consistent with the ids recorded
+        assert num_skipped_households == sum(
+            len(hh_list) for hh_list in skipped_household_ids.values()
+        ), "Inconsistent number of skipped households and recorded ids"
+        state.set("num_skipped_households", num_skipped_households)
+        state.set("skipped_household_ids", skipped_household_ids)
+
+        logger.warning(
+            f"Skipping {bad_row_map.sum()} bad choices. Total skipped households so far: {num_skipped_households}. Skipped household IDs: {skipped_household_ids}"
+        )
+
+        return
+
     if raise_error:
         raise InvalidTravelError(msg_with_count)
@@ -136,6 +176,7 @@ def utils_to_probs(
     allow_zero_probs=False,
     trace_choosers=None,
     overflow_protection: bool = True,
+    skip_failed_choices: bool = True,
     return_logsums: bool = False,
 ):
     """
@@ -176,6 +217,16 @@
         overflow_protection will have no benefit but impose a modest computational overhead cost.
+    skip_failed_choices : bool, default True
+        If True, when bad choices are detected (all zero probabilities or infinite
+        probabilities), the entire household that's causing bad choices will be skipped instead of
+        being masked by overflow protection or causing an error.
+        A counter will be incremented for each skipped household. This is useful when running large
+        simulations where occasional bad choices are encountered and should not halt the process.
+        The counter can be accessed via `state.get("num_skipped_households", 0)`.
+        The number of skipped households and their IDs will be logged at the end of the simulation.
+        When `skip_failed_choices` is True, `overflow_protection` will be forced to False to avoid conflicts.
+
     Returns
     -------
     probs : pandas.DataFrame
@@ -203,6 +254,13 @@
             utils_arr.dtype == np.float32 and utils_arr.max() > 85
         )
 
+    # get skip_failed_choices from state
+    skip_failed_choices = state.settings.skip_failed_choices
+    # when skipping failed choices, we cannot use overflow protection
+    # because it would mask the underlying issue causing bad choices
+    if skip_failed_choices:
+        overflow_protection = False
+
     if overflow_protection:
         # exponentiated utils will overflow, downshift them
         shifts = utils_arr.max(1, keepdims=True)
@@ -240,6 +298,7 @@
             state,
             zero_probs,
             utils,
+            skip_failed_choices,
             trace_label=tracing.extend_trace_label(trace_label, "zero_prob_utils"),
             msg="all probabilities are zero",
             trace_choosers=trace_choosers,
@@ -251,6 +310,7 @@
             state,
             inf_utils,
             utils,
+            skip_failed_choices,
             trace_label=tracing.extend_trace_label(trace_label, "inf_exp_utils"),
             msg="infinite exponentiated utilities",
             trace_choosers=trace_choosers,
@@ -316,11 +376,14 @@ def make_choices(
         np.ones(len(probs.index))
     ).abs() > BAD_PROB_THRESHOLD * np.ones(len(probs.index))
 
+    skip_failed_choices = state.settings.skip_failed_choices
+
     if bad_probs.any() and not allow_bad_probs:
         report_bad_choices(
             state,
             bad_probs,
             probs,
+            skip_failed_choices,
             trace_label=tracing.extend_trace_label(trace_label, "bad_probs"),
             msg="probabilities do not add up to 1",
             trace_choosers=trace_choosers,
@@ -329,6 +392,8 @@
     rands = state.get_rn_generator().random_for_df(probs)
 
     choices = pd.Series(choice_maker(probs.values, rands), index=probs.index)
+    # mark bad choices with -99
+    choices[bad_probs] = -99
 
     rands = pd.Series(np.asanyarray(rands).flatten(), index=probs.index)
diff --git a/activitysim/core/simulate.py b/activitysim/core/simulate.py
index 55caf050a..c6b210775 100644
--- a/activitysim/core/simulate.py
+++ b/activitysim/core/simulate.py
@@ -1325,7 +1325,9 @@ def eval_mnl(
     if custom_chooser:
         choices, rands = custom_chooser(state, probs, choosers, spec, trace_label)
     else:
-        choices, rands = logit.make_choices(state, probs, trace_label=trace_label)
+        choices, rands = logit.make_choices(
+            state, probs, trace_label=trace_label, trace_choosers=choosers
+        )
 
     del probs
     chunk_sizer.log_df(trace_label, "probs", None)
@@ -1490,6 +1492,7 @@ def eval_nl(
             state,
             no_choices,
             base_probabilities,
+            state.settings.skip_failed_choices,
             trace_label=tracing.extend_trace_label(trace_label, "bad_probs"),
             trace_choosers=choosers,
             msg="base_probabilities do not sum to one",
diff --git a/activitysim/core/test/test_logit.py b/activitysim/core/test/test_logit.py
index e249475de..d0ee07cb2 100644
--- a/activitysim/core/test/test_logit.py
+++ b/activitysim/core/test/test_logit.py
@@ -78,6 +78,7 @@ def test_utils_to_probs(utilities, test_data):
 
 def test_utils_to_probs_raises():
     state = workflow.State().default_settings()
+    state.settings.skip_failed_choices = False
     idx = pd.Index(name="household_id", data=[1])
     with pytest.raises(RuntimeError) as excinfo:
         logit.utils_to_probs(
diff --git a/activitysim/core/util.py b/activitysim/core/util.py
index 20f79c760..58bc50be4 100644
--- a/activitysim/core/util.py
+++ b/activitysim/core/util.py
@@ -683,6 +683,9 @@ def drop_unused_columns(
 
     unique_variables_in_spec |= set(additional_columns or [])
 
+    # always keep household_id
+    unique_variables_in_spec.add("household_id")
+
     if locals_d:
         unique_variables_in_spec.add(locals_d.get("orig_col_name", None))
         unique_variables_in_spec.add(locals_d.get("dest_col_name", None))
diff --git a/activitysim/core/workflow/state.py b/activitysim/core/workflow/state.py
index 9f7dcd4d6..de12731ac 100644
--- a/activitysim/core/workflow/state.py
+++ b/activitysim/core/workflow/state.py
@@ -938,8 +938,179 @@ def add_table(
         # mark this salient table as edited, so it can be checkpointed
         # at some later time if desired.
         self.existing_table_status[name] = True
+
+        # Set the new table content
         self.set(name, content)
 
+        # Check if we need to update tables for skipped households
+        skipped_household_ids_dict = self.get("skipped_household_ids", dict())
+        if (
+            skipped_household_ids_dict
+        ):  # only proceed if there are any skipped households
+            current_skipped_count = sum(
+                len(hh_list) for hh_list in skipped_household_ids_dict.values()
+            )
+            # make sure this is consistent with number of skipped households tracked
+            assert current_skipped_count == self.get("num_skipped_households", 0)
+
+            # Only update this specific table if it actually contains skipped household data
+            import itertools
+
+            skipped_household_ids = set(
+                itertools.chain.from_iterable(skipped_household_ids_dict.values())
+            )
+
+            # Check if the new content contains any skipped households
+            content_needs_cleaning = False
+            if hasattr(content, "index") and "household_id" in content.index.names:
+                content_needs_cleaning = (
+                    content.index.get_level_values("household_id")
+                    .isin(skipped_household_ids)
+                    .any()
+                )
+            elif hasattr(content, "columns") and "household_id" in content.columns:
+                content_needs_cleaning = (
+                    content["household_id"].isin(skipped_household_ids).any()
+                )
+
+            if content_needs_cleaning:
+                self.update_table(name)
+
+            # Check if there are newly skipped households and update all tables if needed
+            last_updated_count = self.get("_last_updated_skipped_count", 0)
+            if current_skipped_count > last_updated_count:
+                # update all tables to remove newly skipped households
+                self.update_table(name=None)
+                # Track the number of skipped households at the time of last update
+                self.set("_last_updated_skipped_count", current_skipped_count)
+
+    def update_table(self, name: str = None):
+        """
+        Go through existing tables in the state and
+        get rid of any rows that correspond to skipped households.
+        Save skipped household records in state under households_skipped
+
+        Parameters
+        ----------
+        name : str, optional
+            Name of table to update. If None, update all tables.
+
+        Returns
+        -------
+        None
+        """
+        import itertools
+
+        skipped_household_ids_dict = self.get("skipped_household_ids", dict())
+        skipped_household_ids = set(
+            itertools.chain.from_iterable(skipped_household_ids_dict.values())
+        )
+
+        if not skipped_household_ids:
+            return
+
+        # get existing tables in the current state context
+        existing_tables = self.registered_tables() if name is None else [name]
+
+        for table_name in existing_tables:
+            if not self.is_table(table_name):
+                continue
+            df = self.get_dataframe(table_name, as_copy=False)
+            # get the initial length of the dataframe
+            initial_len = len(df)
+            # we do not drop rows from households_skipped table
+            if table_name == "households_skipped":
+                continue
+
+            # determine which column/index contains household_id and create mask
+            mask = None
+            if "household_id" in df.index.names:
+                mask = df.index.get_level_values("household_id").isin(
+                    skipped_household_ids
+                )
+            elif "household_id" in df.columns:
+                mask = df["household_id"].isin(skipped_household_ids)
+            else:
+                continue  # skip tables without household_id
+
+            # early exit if no matches found
+            if not mask.any():
+                continue
+
+            # save skipped household records for households table only
+            if table_name == "households":
+                newly_skipped_hh_df = df.loc[mask].copy()
+                skipped_hh_df = self.get("households_skipped", pd.DataFrame())
+                skipped_hh_df = pd.concat(
+                    [skipped_hh_df, newly_skipped_hh_df], join="inner"
+                )
+                # make sure household_id is unique in skipped households
+                if "household_id" in skipped_hh_df.index.names:
+                    assert skipped_hh_df.index.get_level_values(
+                        "household_id"
+                    ).is_unique, "household_id is not unique in households_skipped"
+                self.set("households_skipped", skipped_hh_df)
+
+                # Check if we've exceeded the allowed fraction of skipped households
+                # Use weighted households if expansion weight column exists, otherwise use counts
+                if "sample_rate" in df.columns:
+                    # Use weighted calculation
+                    skipped_household_weights = (1 / skipped_hh_df["sample_rate"]).sum()
+                    remaining_household_weights = (
+                        1 / df.loc[~mask, "sample_rate"]
+                    ).sum()
+                    total_household_weights = (
+                        skipped_household_weights + remaining_household_weights
+                    )
+
+                    if total_household_weights > 0:
+                        skipped_fraction = (
+                            skipped_household_weights / total_household_weights
+                        )
+                        metric_name = "weighted households"
+                    else:
+                        # Fallback to count-based if weights are zero
+                        skipped_fraction = len(skipped_hh_df) / (
+                            len(skipped_hh_df) + len(df.loc[~mask])
+                        )
+                        metric_name = "households"
+                else:
+                    # Use count-based calculation
+                    skipped_fraction = len(skipped_hh_df) / (
+                        len(skipped_hh_df) + len(df.loc[~mask])
+                    )
+                    metric_name = "households"
+
+                max_allowed_fraction = self.settings.fraction_of_failed_choices_allowed
+
+                if skipped_fraction > max_allowed_fraction:
+                    raise RuntimeError(
+                        f"Too many {metric_name} skipped: {skipped_fraction:.2%} exceeds the allowed threshold of "
+                        f"{max_allowed_fraction:.2%}. This indicates a systematic problem with the model "
+                        f"simulation. Adjust 'fraction_of_failed_choices_allowed' setting "
+                        f"if this is expected."
+                    )
+
+            # drop the matching rows using the same mask
+            df.drop(index=df.index[mask], inplace=True)
+
+            # get the length of the dataframe after dropping rows
+            final_len = len(df)
+            logger.debug(
+                f"update_table: dropped {initial_len - final_len} rows from {table_name} "
+                f"corresponding to skipped households"
+            )
+            # mark this table as edited if we dropped any rows
+            if final_len < initial_len:
+                self.existing_table_status[table_name] = True
+            # terminate the run if we dropped all rows
+            # and raise an error
+            if final_len == 0:
+                raise RuntimeError(
+                    f"update_table: all rows dropped from {table_name} "
+                    f"corresponding to skipped households, terminating run"
+                )
+            # set the updated dataframe back to the state
+            self.set(table_name, df)
+
     def is_table(self, name: str):
         """
         Check if a name corresponds to a table in this state's context.
diff --git a/test/skip_failed_choices/.gitignore b/test/skip_failed_choices/.gitignore
new file mode 100644
index 000000000..67176c62d
--- /dev/null
+++ b/test/skip_failed_choices/.gitignore
@@ -0,0 +1,2 @@
+configs*/
+output/
\ No newline at end of file
diff --git a/test/skip_failed_choices/test_skip_failed_choices.py b/test/skip_failed_choices/test_skip_failed_choices.py
new file mode 100644
index 000000000..50f0581b1
--- /dev/null
+++ b/test/skip_failed_choices/test_skip_failed_choices.py
@@ -0,0 +1,121 @@
+from __future__ import annotations
+
+# ActivitySim
+# See full license in LICENSE.txt.
+import importlib.resources
+import os
+from shutil import copytree
+
+import pandas as pd
+import pytest
+import yaml
+
+from activitysim.core import workflow
+
+
+def example_path(dirname):
+    resource = os.path.join("examples", "prototype_mtc", dirname)
+    return str(importlib.resources.files("activitysim").joinpath(resource))
+
+
+def dir_test_path(dirname):
+    return os.path.join(os.path.dirname(__file__), dirname)
+
+
+data_dir = example_path("data")
+new_configs_dir = dir_test_path("configs")
+new_settings_file = os.path.join(new_configs_dir, "settings.yaml")
+# copy example configs to test/skip_failed_choices/configs if not already there
+if not os.path.exists(new_configs_dir):
+    copytree(example_path("configs"), new_configs_dir)
+
+
+def update_settings(settings_file, key, value):
+    with open(settings_file, "r") as f:
+        settings = yaml.safe_load(f)
+        f.close()
+
+    settings[key] = value
+
+    with open(settings_file, "w") as f:
+        yaml.safe_dump(settings, f)
+        f.close()
+
+
+def update_uec_csv(uec_file, expression, coef_value):
+    # read in the uec file
+    df = pd.read_csv(uec_file)
+    # append a new row, set expression and coef_value
+    df.loc[len(df), "Expression"] = expression
+    # from the 4th column onward are coefficients
+    for col in df.columns[3:]:
+        df.loc[len(df) - 1, col] = coef_value
+    df.to_csv(uec_file, index=False)
+
+
+@pytest.fixture
+def state():
+    configs_dir = new_configs_dir
+    output_dir = dir_test_path("output")
+    data_dir = example_path("data")
+
+    # turn the global setting on to skip failed choices
+    update_settings(new_settings_file, "skip_failed_choices", True)
+
+    # make some choices fail by setting extreme coefficients in the uec
+    # auto ownership
+    auto_ownership_uec_file = os.path.join(new_configs_dir, "auto_ownership.csv")
+    # forcing households in home zone 8 (recoded 7) to fail auto ownership choice
+    update_uec_csv(auto_ownership_uec_file, "@df.home_zone_id==7", -999.0)
+
+    # work location choice
+    work_location_choice_uec_file = os.path.join(
+        new_configs_dir, "workplace_location.csv"
+    )
+    # forcing workers from home zone 18 to fail work location choice
+    # as if there is a network connection problem for zone 18
+    update_uec_csv(work_location_choice_uec_file, "@df.home_zone_id==18", -999.0)
+
+    # trip mode choice
+    trip_mode_choice_uec_file = os.path.join(new_configs_dir, "trip_mode_choice.csv")
+    # forcing trips on drivealone tours to fail trip mode choice
+    update_uec_csv(trip_mode_choice_uec_file, "@df.tour_mode=='DRIVEALONEFREE'", -999.0)
+
+    state = workflow.State.make_default(
+        configs_dir=configs_dir,
+        output_dir=output_dir,
+        data_dir=data_dir,
+    )
+
+    from activitysim.abm.tables.skims import network_los_preload
+
+    state.get(network_los_preload)
+
+    state.logging.config_logger()
+    return state
+
+
+def test_skip_failed_choices(state):
+
+    # check that the setting is indeed set to True
+    assert state.settings.skip_failed_choices is True
+
+    state.run(models=state.settings.models, resume_after=None)
+
+    # check that the number of skipped households is recorded in state
+    assert state.get("num_skipped_households", 0) == 943
+
+    # check that there are no DRIVEALONEFREE tours in the final tours
+    final_tours_df = state.get_dataframe("tours")
+    assert "DRIVEALONEFREE" not in final_tours_df["tour_mode"].values
+
+    # check that there are no households in home zone 8 (recoded 7) in the final households
+    final_households_df = state.get_dataframe("households")
+    assert not any(final_households_df["home_zone_id"] == 7)
+
+    # check that there are no workers from households in home zone 18 in the final persons
+    final_persons_df = state.get_dataframe("persons")
+    assert not any(
+        (final_persons_df["home_zone_id"] == 18)
+        & (final_persons_df["is_worker"] == True)
+    )
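For reference, a minimal settings.yaml sketch of the two run-level options introduced in activitysim/core/configuration/top.py above; the key names and default values come straight from the diff, while the inline comments and the choice to spell the defaults out explicitly are illustrative only:

skip_failed_choices: True                    # skip households whose choices fail instead of aborting the run
fraction_of_failed_choices_allowed: 0.1      # still abort the run if more than 10% of (weighted) households get skipped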