diff --git a/cropclassification/calc_cover.py b/cropclassification/calc_cover.py
index be9b6ea..1ce080e 100644
--- a/cropclassification/calc_cover.py
+++ b/cropclassification/calc_cover.py
@@ -50,7 +50,8 @@ def run_cover(
     logger.info(f"Config used: \n{conf.pformat_config()}")
 
-    force = True
+    force = False
     markertype = conf.marker["markertype"]
+
     if not markertype.startswith(("COVER", "ONBEDEKT")):
         raise ValueError(f"Invalid markertype {markertype}, expected COVER_XXX")
 
@@ -60,215 +62,290 @@ def run_cover(
         conf.paths.getpath("marker_dir"), reuse_last_run_dir
     )
 
-    # Read the info about the run
-    input_parcel_filename = conf.calc_marker_params.getpath("input_parcel_filename")
-    input_dir = conf.paths.getpath("input_dir")
-    input_parcel_path = input_dir / input_parcel_filename
-
-    # Check if the necessary input files exist...
-    for path in [input_parcel_path]:
-        if path is not None and not path.exists():
-            message = f"Input file doesn't exist, so STOP: {path}"
-            logger.critical(message)
-            raise ValueError(message)
-
-    # Depending on the specific markertype, export only the relevant parcels
-    input_preprocessed_dir = conf.paths.getpath("input_preprocessed_dir")
-    input_preprocessed_dir.mkdir(parents=True, exist_ok=True)
-    if markertype in ("COVER", "COVER_EEF_VOORJAAR"):
-        pass
-    elif markertype in ("ONBEDEKT_NA_ZOMER", "COVER_BMG_MEG_MEV_EEF_NAJAAR"):
-        input_parcel_filename = f"{input_parcel_path.stem}_{markertype}.gpkg"
-        input_parcel_filtered_path = input_preprocessed_dir / input_parcel_filename
-
-        where = """
-            "ALL_BEST" like '%BMG%'
-            OR "ALL_BEST" like '%MEV%'
-            OR "ALL_BEST" like '%MEG%'
-            OR "ALL_BEST" like '%EEF%'
-            OR "ALL_BEST" like '%TBG%'
-            OR "ALL_BEST" like '%EBG%'
-        """
-        gfo.copy_layer(
-            input_parcel_path, input_parcel_filtered_path, where=where, force=force
-        )
-        input_parcel_path = input_parcel_filtered_path
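+    # The Excel output file doubles as the "already calculated" flag for this run:
+    # the expensive steps below are skipped when it exists and force is False.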
+    parcels_marker_path = (
+        run_dir / f"{markertype}_{datetime.now().strftime('%Y-%m-%d')}.xlsx"
+    )
+    parcel_selected_sqlite_path = parcels_marker_path.with_suffix(".sqlite")
 
-    elif markertype == "COVER_EEB_VOORJAAR":
-        # Fallow parcels with a premium having as requirement that there is no activity
-        # on them from 15/01 till 10/04 + that they have maize as main crop.
-        input_parcel_filename = f"{input_parcel_path.stem}_EEB.gpkg"
-        input_parcel_filtered_path = input_preprocessed_dir / input_parcel_filename
+    start_date = datetime.fromisoformat(conf.period["start_date"])
+    end_date = datetime.fromisoformat(conf.period["end_date"])
 
-        where = (
-            "ALL_BEST like '%EEB%' AND GWSCOD_V = '83' AND GWSCOD_H IN ('201', '202')"
+    # If the config needs to be reused as well, load it, else write it
+    config_used_path = run_dir / "config_used.ini"
+    if reuse_last_run_dir and run_dir.exists() and config_used_path.exists():
+        config_paths.append(config_used_path)
+        logger.info(f"Run dir config needs to be reused, so {config_paths}")
+        conf.read_config(
+            config_paths=config_paths,
+            default_basedir=default_basedir,
+            overrules=config_overrules,
         )
-        gfo.copy_layer(
-            input_parcel_path, input_parcel_filtered_path, where=where, force=force
+        logger.info(
+            "Write new config_used.ini, because some parameters might have been added"
         )
-        input_parcel_path = input_parcel_filtered_path
+        with config_used_path.open("w") as config_used_file:
+            conf.config.write(config_used_file)
+    else:
+        # Copy the config files to a config dir for later reference
+        configfiles_used_dir = run_dir / "configfiles_used"
+        if configfiles_used_dir.exists():
+            configfiles_used = sorted(configfiles_used_dir.glob("*.ini"))
+            conf.read_config(
+                config_paths=configfiles_used,
+                default_basedir=default_basedir,
+                overrules=config_overrules,
+            )
+        else:
+            configfiles_used_dir.mkdir(parents=True)
+            for idx, config_path in enumerate(config_paths):
+                # Prepend with idx so the order of config files is retained...
+                dst = configfiles_used_dir / f"{idx}_{config_path.name}"
+                shutil.copy(config_path, dst)
+
+        # Write the resolved complete config, so it can be reused
+        logger.info("Write config_used.ini, so it can be reused later on")
+        with config_used_path.open("w") as config_used_file:
+            conf.config.write(config_used_file)
+
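+    # Only (re)do the actual calculation when there is no result yet or when
+    # force is on; the reporting step below runs in both cases.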
+    if not parcels_marker_path.exists() or force:
+        # Read the info about the run
+        input_parcel_filename = conf.calc_marker_params.getpath("input_parcel_filename")
+        input_dir = conf.paths.getpath("input_dir")
+        input_parcel_path = input_dir / input_parcel_filename
+
+        # Check if the necessary input files exist...
+        for path in [input_parcel_path]:
+            if path is not None and not path.exists():
+                message = f"Input file doesn't exist, so STOP: {path}"
+                logger.critical(message)
+                raise ValueError(message)
+
+        # Depending on the specific markertype, export only the relevant parcels
+        input_preprocessed_dir = conf.paths.getpath("input_preprocessed_dir")
+        input_preprocessed_dir.mkdir(parents=True, exist_ok=True)
+        if markertype in ("COVER", "COVER_EEF_VOORJAAR", "ONBEDEKT_NA_WINTER"):
+            input_parcel_filename = f"{input_parcel_path.stem}_{markertype}.gpkg"
+            input_parcel_filtered_path = input_preprocessed_dir / input_parcel_filename
+
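+            # These marker types are only calculated for parcels with an EEF premium.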
+            where = """
+                "ALL_BEST" like '%EEF%'
+            """
+            gfo.copy_layer(
+                input_parcel_path, input_parcel_filtered_path, where=where, force=force
+            )
+            input_parcel_path = input_parcel_filtered_path
+        elif markertype in ("ONBEDEKT_NA_ZOMER", "COVER_BMG_MEG_MEV_EEF_NAJAAR"):
+            input_parcel_filename = f"{input_parcel_path.stem}_{markertype}.gpkg"
+            input_parcel_filtered_path = input_preprocessed_dir / input_parcel_filename
+
+            where = """
+                "ALL_BEST" like '%BMG%'
+                OR "ALL_BEST" like '%MEV%'
+                OR "ALL_BEST" like '%MEG%'
+                OR "ALL_BEST" like '%EEF%'
+                OR "ALL_BEST" like '%TBG%'
+                OR "ALL_BEST" like '%EBG%'
+            """
+            gfo.copy_layer(
+                input_parcel_path, input_parcel_filtered_path, where=where, force=force
+            )
+            input_parcel_path = input_parcel_filtered_path
+
+        elif markertype == "COVER_EEB_VOORJAAR":
+            # Fallow parcels with a premium that requires no activity on them from
+            # 15/01 till 10/04, and that have maize as the main crop.
+            input_parcel_filename = f"{input_parcel_path.stem}_EEB.gpkg"
+            input_parcel_filtered_path = input_preprocessed_dir / input_parcel_filename
 
-    elif markertype in ("COVER_TBG_BMG_VOORJAAR", "COVER_TBG_BMG_NAJAAR"):
-        # Grassland parcels with a premium having a requirement that they cannot be
-        # resown -> should never be ploughed.
-        input_parcel_filename = f"{input_parcel_path.stem}_TBG_BMG.gpkg"
-        input_parcel_filtered_path = input_preprocessed_dir / input_parcel_filename
+            where = (
+                "ALL_BEST like '%EEB%' AND GWSCOD_V = '83' "
+                "AND GWSCOD_H IN ('201', '202')"
+            )
+            gfo.copy_layer(
+                input_parcel_path, input_parcel_filtered_path, where=where, force=force
+            )
+            input_parcel_path = input_parcel_filtered_path
+
+        elif markertype in ("COVER_TBG_BMG_VOORJAAR", "COVER_TBG_BMG_NAJAAR"):
+            # Grassland parcels with a premium requiring that they cannot be resown,
+            # so they should never be ploughed.
+            input_parcel_filename = f"{input_parcel_path.stem}_TBG_BMG.gpkg"
+            input_parcel_filtered_path = input_preprocessed_dir / input_parcel_filename
 
-        where = "ALL_BEST like '%TBG%' OR ALL_BEST like '%BMG%'"
-        gfo.copy_layer(
-            input_parcel_path, input_parcel_filtered_path, where=where, force=force
+            where = "ALL_BEST like '%TBG%' OR ALL_BEST like '%BMG%'"
+            gfo.copy_layer(
+                input_parcel_path, input_parcel_filtered_path, where=where, force=force
+            )
+            input_parcel_path = input_parcel_filtered_path
+
+        else:
+            raise ValueError(f"Invalid {markertype=}")
+
+        # Get some general config
+        data_ext = conf.general["data_ext"]
+        geofile_ext = conf.general["geofile_ext"]
+
+        # -------------------------------------------------------------
+        # The real work
+        # -------------------------------------------------------------
+        # STEP 1: prepare parcel data for classification and image data extraction
+        # -------------------------------------------------------------
+
+        # Prepare the input data for optimal image data extraction:
+        #    1) apply a negative buffer on the parcel to evade mixels
+        #    2) remove features that became null because of buffer
+        buffer = conf.timeseries.getfloat("buffer")
+        input_parcel_nogeo_path = (
+            input_preprocessed_dir / f"{input_parcel_path.stem}{data_ext}"
         )
-        input_parcel_path = input_parcel_filtered_path
-    else:
-        raise ValueError(f"Invalid {markertype=}")
-
-    # Get some general config
-    data_ext = conf.general["data_ext"]
-    geofile_ext = conf.general["geofile_ext"]
-
-    # -------------------------------------------------------------
-    # The real work
-    # -------------------------------------------------------------
-    # STEP 1: prepare parcel data for classification and image data extraction
-    # -------------------------------------------------------------
-
-    # Prepare the input data for optimal image data extraction:
-    #    1) apply a negative buffer on the parcel to evade mixels
-    #    2) remove features that became null because of buffer
-    buffer = conf.timeseries.getfloat("buffer")
-    input_parcel_nogeo_path = (
-        input_preprocessed_dir / f"{input_parcel_path.stem}{data_ext}"
-    )
+        imagedata_input_parcel_filename = (
+            f"{input_parcel_path.stem}_bufm{buffer:g}{geofile_ext}"
+        )
+        imagedata_input_parcel_path = (
+            input_preprocessed_dir / imagedata_input_parcel_filename
+        )
+        ts_helper.prepare_input(
+            input_parcel_path=input_parcel_path,
+            output_imagedata_parcel_input_path=imagedata_input_parcel_path,
+            output_parcel_nogeo_path=input_parcel_nogeo_path,
+            force=force,
+        )
 
-    imagedata_input_parcel_filename = (
-        f"{input_parcel_path.stem}_bufm{buffer:g}{geofile_ext}"
-    )
-    imagedata_input_parcel_path = (
-        input_preprocessed_dir / imagedata_input_parcel_filename
-    )
-    ts_helper.prepare_input(
-        input_parcel_path=input_parcel_path,
-        output_imagedata_parcel_input_path=imagedata_input_parcel_path,
-        output_parcel_nogeo_path=input_parcel_nogeo_path,
-        force=force,
-    )
+        # STEP 2: Calculate the timeseries data needed
+        # -------------------------------------------------------------
+        # Get the time series data (eg. S1, S2,...) to be used for the classification
+        # Result: data is put in files in timeseries_periodic_dir, in one file per
+        # date/period
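+        # The time series are written to a subdir per buffered input file, so runs
+        # with other input files or buffer sizes don't overwrite each other.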
+        timeseries_periodic_dir = conf.paths.getpath("timeseries_periodic_dir")
+        timeseries_periodic_dir /= f"{imagedata_input_parcel_path.stem}"
+        images_to_use = conf.parse_image_config(conf.images["images"])
+
+        ts.calc_timeseries_data(
+            input_parcel_path=imagedata_input_parcel_path,
+            roi_bounds=tuple(conf.roi.getlistfloat("roi_bounds")),
+            roi_crs=pyproj.CRS.from_user_input(conf.roi.get("roi_crs")),
+            start_date=start_date,
+            end_date=end_date,
+            images_to_use=images_to_use,
+            timeseries_periodic_dir=timeseries_periodic_dir,
+            force=force,
+        )
 
-    # STEP 2: Calculate the timeseries data needed
-    # -------------------------------------------------------------
-    # Get the time series data (eg. S1, S2,...) to be used for the classification
-    # Result: data is put in files in timeseries_periodic_dir, in one file per
-    # date/period
-    timeseries_periodic_dir = conf.paths.getpath("timeseries_periodic_dir")
-    timeseries_periodic_dir /= f"{imagedata_input_parcel_path.stem}"
-    start_date = datetime.fromisoformat(conf.period["start_date"])
-    end_date = datetime.fromisoformat(conf.period["end_date"])
-    images_to_use = conf.parse_image_config(conf.images["images"])
+        # STEP 3: Determine the cover for the parcels for all periods
+        # -------------------------------------------------------------
+        # Loop over all periods
+        periods = mosaic_util._prepare_periods(
+            start_date, end_date, period_name="weekly", period_days=None
+        )
+        cover_dir = run_dir / input_parcel_nogeo_path.stem
+        cover_dir.mkdir(parents=True, exist_ok=True)
+
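+        # With on_error "warn", a period that fails is logged and skipped instead
+        # of aborting the whole run.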
+        on_error = "warn"
+        parcels_cover_paths = []
+
+        for period in periods:
+            period_start_date = period["start_date"].strftime("%Y-%m-%d")
+            period_end_date = period["end_date"].strftime("%Y-%m-%d")
+            output_name = f"cover_{period_start_date}_{period_end_date}{data_ext}"
+            output_path = cover_dir / output_name
+
+            try:
+                geo_path = output_path.with_suffix(geofile_ext)
+                _calc_cover(
+                    input_parcel_path=input_parcel_path,
+                    timeseries_periodic_dir=timeseries_periodic_dir,
+                    images_to_use=images_to_use,
+                    start_date=period["start_date"],
+                    end_date=period["end_date"],
+                    parcel_columns=None,
+                    output_path=output_path,
+                    output_geo_path=geo_path,
+                    force=force,
+                )
+                parcels_cover_paths.append(geo_path)
+
+            except Exception as ex:
+                message = f"Error calculating {output_path.stem}"
+                if on_error == "warn":
+                    logger.exception(message)
+                    warnings.warn(message, category=RuntimeWarning, stacklevel=1)
+                elif on_error == "raise":
+                    raise RuntimeError(message) from ex
+                else:
+                    logger.error(
+                        f"invalid value for on_error: {on_error}, 'raise' assumed"
+                    )
+                    raise RuntimeError(message) from ex
 
-    ts.calc_timeseries_data(
-        input_parcel_path=imagedata_input_parcel_path,
-        roi_bounds=tuple(conf.roi.getlistfloat("roi_bounds")),
-        roi_crs=pyproj.CRS.from_user_input(conf.roi.get("roi_crs")),
-        start_date=start_date,
-        end_date=end_date,
-        images_to_use=images_to_use,
-        timeseries_periodic_dir=timeseries_periodic_dir,
-        force=force,
-    )
+        # STEP 4: Create the final list of parcels
+        # ----------------------------------------
+        # Consolidate the list of parcels that need to be controlled
+        parcels_selected = None
+        for path in parcels_cover_paths:
+            parcels_selected_path = path
+            parcels = gfo.read_file(parcels_selected_path, ignore_geometry=True)
 
-    # STEP 3: Determine the cover for the parcels for all periods
-    # -------------------------------------------------------------
-    # Loop over all periods
-    periods = mosaic_util._prepare_periods(
-        start_date, end_date, period_name="weekly", period_days=None
-    )
-    cover_dir = run_dir / input_parcel_nogeo_path.stem
-    cover_dir.mkdir(parents=True, exist_ok=True)
-
-    on_error = "warn"
-    parcels_cover_paths = []
-
-    for period in periods:
-        period_start_date = period["start_date"].strftime("%Y-%m-%d")
-        period_end_date = period["end_date"].strftime("%Y-%m-%d")
-        output_name = f"cover_{period_start_date}_{period_end_date}{data_ext}"
-        output_path = cover_dir / output_name
-
-        try:
-            geo_path = output_path.with_suffix(geofile_ext)
-            _calc_cover(
-                input_parcel_path=input_parcel_path,
-                timeseries_periodic_dir=timeseries_periodic_dir,
-                images_to_use=images_to_use,
-                start_date=period["start_date"],
-                end_date=period["end_date"],
-                parcel_columns=None,
-                output_path=output_path,
-                output_geo_path=geo_path,
-                force=force,
-            )
-            parcels_cover_paths.append(geo_path)
-
-        except Exception as ex:
-            message = f"Error calculating {output_path.stem}"
-            if on_error == "warn":
-                logger.exception(f"Error calculating {output_path.stem}")
-                warnings.warn(message, category=RuntimeWarning, stacklevel=1)
-            elif on_error == "raise":
-                raise RuntimeError(message) from ex
+            if parcels_selected is None:
+                parcels_selected = parcels
             else:
-                logger.error(f"invalid value for on_error: {on_error}, 'raise' assumed")
-                raise RuntimeError(message) from ex
+                parcels_selected = pd.concat([parcels_selected, parcels])
+
+        # Determine max probability for every parcel
+        assert parcels_selected is not None
+        input_info = gfo.get_layerinfo(input_parcel_path)
+        cols_to_keep = [*list(input_info.columns), "pred1"]
+        if "provincie" in parcels_selected.columns:
+            cols_to_keep.append("provincie")
+
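+        # Across all weekly periods, keep for each parcel the row with the highest
+        # pred1_prob.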
+        parcels_selected = (
+            parcels_selected[[*cols_to_keep, "pred1_prob"]]
+            .sort_values("pred1_prob", ascending=False)
+            .groupby(conf.columns["id"], dropna=False, as_index=False)
+            .first()  # Take the highest pred1_prob per id
+        )
 
-    # STEP 4: Create the final list of parcels
-    # ----------------------------------------
-    # Consolidate the list of parcels that need to be controlled
-    parcels_marker_path = (
-        run_dir / f"{markertype}_{datetime.now().strftime('%Y-%m-%d')}.xlsx"
-    )
-    parcels_selected = None
-    for path in parcels_cover_paths:
-        if markertype == "COVER_EEF_VOORJAAR":
-            parcels_selected_path = run_dir / f"{geo_path.stem}_EEF{geofile_ext}"
-            _select_parcels_EEF(path, parcels_selected_path)
-        else:
-            parcels_selected_path = path
+        # Add pred_consolidated based on max pred1_proba
+        parcels_selected["pred_consolidated"] = parcels_selected["pred1_prob"].apply(
+            _categorize_pred
+        )
 
-        parcels = gfo.read_file(parcels_selected_path, ignore_geometry=True)
+        # Add pred_cons_status based on pred_consolidated
+        parcels_selected["pred_cons_status"] = parcels_selected[
+            "pred_consolidated"
+        ].apply(lambda x: "NOK" if x in ("NODATA", "DOUBT") else "OK")
 
-        if parcels_selected is None:
-            parcels_selected = parcels
-        else:
-            parcels_selected = pd.concat([parcels_selected, parcels])
-
-    # Determine max probability for every parcel
-    assert parcels_selected is not None
-    input_info = gfo.get_layerinfo(input_parcel_path)
-    cols_to_keep = [*list(input_info.columns), "pred1"]
-    if "provincie" in parcels_selected.columns:
-        cols_to_keep.append("provincie")
-
-    parcels_selected = (
-        parcels_selected[[*cols_to_keep, "pred1_prob"]]
-        .sort_values("pred1_prob", ascending=False)
-        .groupby(conf.columns["id"], dropna=False, as_index=False)
-        .first()  # Take the highest pred1_prob per id
-    )
+        # Export to Excel and sqlite
+        pdh.to_excel(parcels_selected, parcels_marker_path, index=False)
+        pdh.to_file(parcels_selected, parcel_selected_sqlite_path, index=False)
 
-    # Add pred_consolidated based on max pred1_proba
-    parcels_selected["pred_consolidated"] = parcels_selected["pred1_prob"].apply(
-        _categorize_pred
-    )
+    # STEP 5: reporting
+    # ----------------------------------------
 
-    # Add pred_cons_status based on pred_consolidated
-    parcels_selected["pred_cons_status"] = parcels_selected["pred_consolidated"].apply(
-        lambda x: "NOK" if x in ("NODATA", "DOUBT") else "OK"
+    input_dir = conf.paths.getpath("input_dir")
+    input_groundtruth_filename = conf.calc_marker_params.getpath(
+        "input_groundtruth_filename"
     )
 
-    # Export to excel
-    pdh.to_excel(parcels_selected, parcels_marker_path, index=False)
-    pdh.to_file(
-        parcels_selected, parcels_marker_path.with_suffix(".sqlite"), index=False
+    if input_groundtruth_filename is not None:
+        input_groundtruth_path = input_dir / input_groundtruth_filename
+    else:
+        input_groundtruth_path = None
+
+    classes_refe_filename = conf.calc_marker_params.getpath("classes_refe_filename")
+    refe_dir = conf.paths.getpath("refe_dir")
+    classes_refe_path = refe_dir / classes_refe_filename
+
+    original_input_parcel_filename = conf.calc_marker_params.getpath(
+        "input_parcel_filename"
+    )
+    original_input_parcel_path = input_dir / original_input_parcel_filename
+
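+    # Generate the accuracy report; the ground truth is optional and only adds
+    # accuracy metrics when it is available.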
+    report(
+        input_parcel_path=original_input_parcel_path,
+        parcels_selected_path=parcel_selected_sqlite_path,
+        ground_truth_path=input_groundtruth_path,
+        classes_refe_path=classes_refe_path,
+        id_column=conf.columns["id"],
+        start_date=start_date,
+        end_date=end_date,
     )
 
     logging.shutdown()
@@ -541,8 +618,10 @@ def _select_parcels_BMG_MEG_MEV_EEF(
     gfo.copy_layer(input_geo_path, output_geo_path, where=where)
 
 
-def _select_parcels_EEF(input_geo_path: Path, output_geo_path: Path) -> None:
-    """Select parcels based on the cover marker."""
+def _filter_parcels_with_enough_sentinel_data(
+    input_geo_path: Path, output_geo_path: Path
+) -> None:
+    """Select parcels based on the NDVI median and Sentinel-1 cover."""
     # Select the relevant parcels based on the cover marker
     info = gfo.get_layerinfo(input_geo_path)
     columns = {}
@@ -553,203 +632,377 @@ def _select_parcels_EEF(input_geo_path: Path, output_geo_path: Path) -> None:
         if column_lower.endswith("_ndvi_median"):
             columns["ndvi_median"] = column
 
-    # Filter used in the selection of 03/2025
     where = f"""
-        ( ALL_BEST like '%EEF%'
-          AND ( ( "{columns["ndvi_median"]}" <> 0
+        ( ( "{columns["ndvi_median"]}" <> 0
               AND "{columns["ndvi_median"]}" < 0.35
             )
             OR ( cover_s1 = 'bare-soil'
                  AND "{columns["ndvi_median"]}" = 0  -- no NDVI available
-               )
-          )
-        )
+            )
+        )
     """
     gfo.copy_layer(input_geo_path, output_geo_path, where=where)
 
 
-def _select_parcels(input_geo_path: Path, output_geo_path: Path) -> None:
-    """Select parcels based on the cover marker."""
-    # Select the relevant parcels based on the cover marker
-    info = gfo.get_layerinfo(input_geo_path)
-    columns = {}
-    for column in info.columns:
-        column_lower = column.lower()
-        if not column_lower.startswith("s2-ndvi-weekly"):
-            continue
-        if column_lower.endswith("_ndvi_median"):
-            columns["ndvi_median"] = column
+def report(
+    input_parcel_path: Path,
+    parcels_selected_path: Path,
+    ground_truth_path: Path | None = None,
+    classes_refe_path: Path | None = None,
+    id_column: str = "UID",
+    start_date: datetime | None = None,
+    end_date: datetime | None = None,
+    force: bool = False,
+) -> None:
+    """Create a report for the cover marker.
 
-    # Filter used in the selection of 05/2025
-    where = f"""
-        ( 1=1
-          AND ( ( "{columns["ndvi_median"]}" <> 0
-                  AND "{columns["ndvi_median"]}" < 0.35
-                )
-                OR ( cover_s1 = 'bare-soil'
-                     AND "{columns["ndvi_median"]}" = 0  -- no NDVI available
-                   )
-              )
-        )
+    Args:
+        input_parcel_path: Path to the original input parcel file.
+        parcels_selected_path: Path to the selected parcels file with predictions.
+        ground_truth_path: Path to the ground truth file. If None, only general
+            statistics are generated.
+        classes_refe_path: Path to the classes reference file, used to map crop
+            codes to their crop group.
+        id_column: Name of the ID column to use for matching parcels.
+        start_date: Start date of the detection period. Ground truth observations
+            before this date are filtered out.
+        end_date: End date of the detection period. Ground truth observations
+            after end_date + 2 months are filtered out.
+        force: Whether to force re-creation of the report.
     """
-    gfo.copy_layer(input_geo_path, output_geo_path, where=where)
+    report_dir = parcels_selected_path.parent
+    report_path = report_dir / f"{parcels_selected_path.stem}_accuracy_report.html"
+    if report_path.exists() and not force:
+        logger.info(f"Report already exists, skipping: {report_path}")
+        return
 
-
-def report() -> None:
-    """Create a report for the cover marker."""
-    # Read parcels selected to be controlled for the cover marker
-    prc_selectie_path = Path(
-        r"X:\__IT_TEAM_ANG_GIS\Taken\2024\2024-10-16_baresoil_selectie\baresoil_selectie_2024-10-16.xlsx"
-    )
-    force = False
-    force_update = True
-
-    report_dir = prc_selectie_path.parent / "evaluatie"
-
-    # Create output file with the result of the cover marker + the groundtruth
-    result_geo_path = report_dir / f"{prc_selectie_path.stem}_gt.gpkg"
-    if force or not result_geo_path.exists():
-        prc_selectie_df = pd.read_excel(prc_selectie_path)
-        prc_selectie_df["selectie_reden"] = "?"
-        prc_selectie_df.loc[
-            prc_selectie_df["ALL_BEST"].str.contains("MEV", regex=False),
-            "selectie_reden",
-        ] = "MEV"
-        prc_selectie_df.loc[
-            prc_selectie_df["ALL_BEST"].str.contains("MEG", regex=False),
-            "selectie_reden",
-        ] = "MEG"
-        prc_selectie_df.loc[
-            prc_selectie_df["ALL_BEST"].str.contains("EEF", regex=False),
-            "selectie_reden",
-        ] = "EEF"
-        prc_selectie_df.loc[
-            prc_selectie_df["ALL_BEST"].str.contains("BMG", regex=False),
-            "selectie_reden",
-        ] = "BMG"
-
-        logger.info(f"{len(prc_selectie_df)=}")
-
-        # Read groundtruth and join with parcels
-        input_groundtruth_filename = "Prc_BEFL_2024_2025_02_21_groundtruth_baresoil.tsv"
-        # input_dir = conf.paths.getpath("input_dir")
-        input_dir = Path(r"X:\Monitoring\Markers\dev\_inputdata")
-        input_groundtruth_path = input_dir / input_groundtruth_filename
-        groundtruth_df = pdh.read_file(input_groundtruth_path)
-        prc_gt_df = prc_selectie_df.merge(
-            groundtruth_df, on=["ALV_NUMMER", "PRC_NMR"], how="left"
-        )
-        logger.info(f"{len(prc_gt_df)=}")
-
-        # Read geometries of parcels and join with controled parcels
-        prc_path = input_dir / "Prc_BEFL_2024_2024-10-13.gpkg"
-        prc_gdf = gfo.read_file(prc_path)
-        prc_gdf["ALV_NUMMER"] = prc_gdf["ALV_NUMMER"].astype("int64")
-        prc_gt_gdf = prc_gdf[["geometry", "ALV_NUMMER", "PRC_NMR"]].merge(
-            prc_gt_df, on=["ALV_NUMMER", "PRC_NMR"], how="inner"
+    logger.info("Generating report...")
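+    # All report rows are collected in stats_list and rendered as a single table
+    # at the end.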
+    stats_list = []
+
+    parcels_selected_df = pdh.read_file(parcels_selected_path)
+    logger.info(f"Loaded {len(parcels_selected_df)} parcels from results file")
+    total_parcels = len(parcels_selected_df)
+
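+    # Prefer the consolidated prediction column; fall back to the raw pred1 column.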
+    pred_column = None
+    if "pred_consolidated" in parcels_selected_df.columns:
+        pred_column = "pred_consolidated"
+    elif "pred1" in parcels_selected_df.columns:
+        pred_column = "pred1"
+    else:
+        logger.warning("No prediction column found in results file")
+        return
+
+    pred_counts = parcels_selected_df[pred_column].value_counts()
+    logger.info(f"Prediction distribution: {pred_counts.to_dict()}")
+
+    # General statistics by prediction
+    for pred_cat, count in pred_counts.items():
+        percentage = (count / total_parcels) * 100
+        stats_list.append(
+            {
+                "category": f"Prediction: {pred_cat}",
+                "count": count,
+                "percentage": percentage,
+                "total_parcels": total_parcels,
+            }
         )
-        logger.info(f"{len(prc_gt_gdf)=}")
-
-        # Read the result of the cover marker and join with controled parcels
-        prc_cover_path = Path(
-            r"X:\Monitoring\Markers\dev\_cover_periodic\Prc_BEFL_2024_2024-10-13\cover_2024-09-30_2024-10-07.gpkg"
+
+    # Add overall summary
+    stats_list.insert(
+        0,
+        {
+            "category": "Total parcels",
+            "count": total_parcels,
+            "percentage": 100.0,
+            "total_parcels": total_parcels,
+        },
+    )
+
+    if (
+        ground_truth_path is not None
+        and ground_truth_path.exists()
+        and classes_refe_path is not None
+        and classes_refe_path.exists()
+    ):
+        logger.info("Ground truth available, calculating accuracy metrics...")
+        classes_refe_df = pdh.read_file(classes_refe_path)
+        logger.info(f"Loaded {len(classes_refe_df)} crop codes from reference file")
+
+        groundtruth_df = pdh.read_file(ground_truth_path)
+        logger.info(f"Loaded {len(groundtruth_df)} parcels from ground truth file")
+
+        # Filter out records that have neither NATEELT_CTRL_COD nor VASTSTELLINGEN
+        # These records cannot be used for validation
+        if "NATEELT_CTRL_COD" in groundtruth_df.columns:
+            before_filter = len(groundtruth_df)
+
+            has_nateelt = groundtruth_df["NATEELT_CTRL_COD"].notna() & (
+                groundtruth_df["NATEELT_CTRL_COD"].astype(str).str.strip() != ""
+            )
+
+            has_vaststellingen = False
+            if "VASTSTELLINGEN" in groundtruth_df.columns:
+                has_vaststellingen = groundtruth_df["VASTSTELLINGEN"].notna() & (
+                    groundtruth_df["VASTSTELLINGEN"].astype(str).str.strip() != ""
+                )
+
+            groundtruth_df = groundtruth_df[has_nateelt | has_vaststellingen].copy()
+            logger.info(
+                f"Filtered out records without NATEELT_CTRL_COD and VASTSTELLINGEN: "
+                f"{before_filter} -> {len(groundtruth_df)} parcels"
+            )
+
+        if start_date is not None and "CONTROLEDATUM" in groundtruth_df.columns:
+            from dateutil.relativedelta import relativedelta
+
+            groundtruth_df["CONTROLEDATUM"] = pd.to_datetime(
+                groundtruth_df["CONTROLEDATUM"],
+                format="%d/%m/%Y %H:%M:%S",
+                errors="coerce",
+            )
+            before_filter = len(groundtruth_df)
+
+            # Filter CONTROLEDATUM >= start_date
+            date_filter = (groundtruth_df["CONTROLEDATUM"] >= start_date) & (
+                groundtruth_df["CONTROLEDATUM"].notna()
+            )
+
+            # And CONTROLEDATUM <= end_date + 2 months (if end_date provided)
+            if end_date is not None:
+                max_date = end_date + relativedelta(months=2)
+                date_filter = date_filter & (
+                    groundtruth_df["CONTROLEDATUM"] <= max_date
+                )
+                date_range_msg = f">= {start_date.date()} and <= {max_date.date()}"
+            else:
+                date_range_msg = f">= {start_date.date()}"
+
+            groundtruth_df = groundtruth_df[date_filter].copy()
+            logger.info(
+                f"Filtered ground truth by date ({date_range_msg}): "
+                f"{before_filter} -> {len(groundtruth_df)} parcels"
+            )
+
+        # Sanity check: compare ground truth with original input file
+        # input_parcel_df = gfo.read_file(input_parcel_path, ignore_geometry=True)
+        # if id_column in input_parcel_df.columns:
+        #     input_parcel_df[id_column] = (
+        #         input_parcel_df[id_column].astype(str).str.strip()
+        #     )
+        #     input_ids = set(input_parcel_df[id_column])
+        #     logger.info(
+        #         f"SANITY CHECK - Original input file has {len(input_ids)} unique IDs"
+        #     )
+
+        # if id_column in groundtruth_df.columns:
+        #     groundtruth_df[id_column] = (
+        #         groundtruth_df[id_column].astype(str).str.strip()
+        #     )
+        #     gt_ids_temp = set(groundtruth_df[id_column])
+        #     common_with_input = input_ids & gt_ids_temp
+        #     logger.info(
+        #         f"SANITY CHECK - Ground truth overlaps with original input: "
+        #         f"{len(common_with_input)} IDs"
+        #     )
+
+        if id_column in parcels_selected_df.columns:
+            parcels_selected_df[id_column] = (
+                parcels_selected_df[id_column].astype(str).str.strip()
+            )
+        if id_column in groundtruth_df.columns:
+            groundtruth_df[id_column] = (
+                groundtruth_df[id_column].astype(str).str.strip()
+            )
+
+        groundtruth_df = groundtruth_df.merge(
+            classes_refe_df[["CROPCODE", "MON_VRU", "CROP_DESC"]],
+            left_on="NATEELT_CTRL_COD",
+            right_on="CROPCODE",
+            how="left",
         )
-        prc_cover_df = gfo.read_file(
-            prc_cover_path,
-            table_name=prc_cover_path.stem.replace("-", "_"),
-            ignore_geometry=True,
+
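+        # Inner join: only parcels present in both the marker results and the
+        # filtered ground truth are evaluated.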
- """ - gfo.add_column( - result_geo_path, - "cover_ndvi", - "TEXT", - expression=expression, - force_update=force_update, - ) - expression_template = """ - CASE - WHEN ctl_vastst IS NULL AND hoofdteelt_ctl IS NULL AND nateelt_ctl IS NULL THEN 'no_groundtruth' - WHEN {cover_col} = 'NODATA' THEN 'NODATA' - WHEN selectie_reden = 'BMG' AND {cover_col} = 'bare-soil' AND ctl_vastst IS NOT NULL THEN 'yes' - WHEN selectie_reden = 'BMG' AND {cover_col} = 'bare-soil' - AND hoofdteelt_ctl IS NOT NULL AND hoofdteelt_ctl NOT IN ('60', '63', '638', '660', '700', '745', '9827', '9828') THEN 'yes' - WHEN selectie_reden = 'BMG' AND {cover_col} <> 'bare-soil' AND ctl_vastst IS NULL THEN 'yes' - WHEN selectie_reden = 'EEF' AND {cover_col} = 'bare-soil' - AND ((hoofdteelt_ctl IS NOT NULL AND hoofdteelt_ctl <> '98') - OR (nateelt_ctl IS NOT NULL AND nateelt_ctl <> '98') - ) THEN 'yes' - WHEN selectie_reden = 'EEF' AND {cover_col} = 'bare-soil' AND ctl_vastst IS NOT NULL THEN 'yes' - WHEN selectie_reden = 'EEF' AND {cover_col} <> 'bare-soil' AND (hoofdteelt_ctl IS NULL OR hoofdteelt_ctl = '98') THEN 'yes' - WHEN selectie_reden = 'MEV' AND {cover_col} = 'bare-soil' - AND ((hoofdteelt_ctl IS NOT NULL AND hoofdteelt_ctl NOT IN ('660', '700', '723', '732')) - OR ctl_vastst IS NOT NULL - ) THEN 'yes' - WHEN selectie_reden = 'MEV' AND {cover_col} <> 'bare-soil' - AND (hoofdteelt_ctl IS NULL OR hoofdteelt_ctl IN ('660', '700', '723', '732')) THEN 'yes' - WHEN selectie_reden = 'MEG' AND {cover_col} = 'bare-soil' - AND ((hoofdteelt_ctl IS NOT NULL AND hoofdteelt_ctl NOT IN ('63')) - OR ctl_vastst IS NOT NULL - ) THEN 'yes' - WHEN selectie_reden = 'MEG' AND {cover_col} <> 'bare-soil' - AND (hoofdteelt_ctl IS NULL OR hoofdteelt_ctl IN ('63')) THEN 'yes' - ELSE 'no' - END - """ # noqa: E501 - gfo.add_column( - result_geo_path, - "ndvi_correct", - "TEXT", - expression=expression_template.format(cover_col="cover_ndvi"), - force_update=force_update, - ) + if len(merged_df) > 0: + + def determine_gt_cover(row: pd.Series) -> str: + vaststellingen = row.get("VASTSTELLINGEN") + onbedekt_vaststellingen = [ + "PCSCHEUR", + "PCPLOZM", + "PC35ET", + "PC35ME", + "PC35EG", + "PCVEGWEG", + ] + if not pd.isna(vaststellingen): + vaststellingen_str = str(vaststellingen).upper() + if any( + vaststelling in vaststellingen_str + for vaststelling in onbedekt_vaststellingen + ): + return "ONBEDEKT" + + # fallback to the MON_VRU mapping + mon_vru = row.get("MON_VRU") + if pd.isna(mon_vru): + return "NODATA" + mon_vru_str = str(mon_vru) + if mon_vru_str == "MON_VRU_BRAAK": + return "ONBEDEKT" + else: + return "BEDEKT" + + merged_df["gt_cover"] = merged_df.apply(determine_gt_cover, axis=1) + merged_df["correct"] = merged_df["gt_cover"] == merged_df[pred_column] + + valid_df = merged_df[ + (merged_df["gt_cover"] != "NODATA") + & (merged_df[pred_column] != "NODATA") + # & (merged_df[pred_column] != "DOUBT") + ].copy() + + logger.info(f"Valid parcels for accuracy calculation: {len(valid_df)}") + + if len(valid_df) > 0: + total_valid = len(valid_df) + total_correct = valid_df["correct"].sum() + accuracy = total_correct / total_valid if total_valid > 0 else 0 + + stats_list.append( + { + "category": "--- ACCURACY METRICS ---", + "count": "", + "percentage": "", + "total_parcels": "", + } + ) + stats_list.append( + { + "category": "Overall Accuracy", + "count": f"{total_correct}/{total_valid}", + "percentage": accuracy * 100, + "total_parcels": "", + } + ) + + for gt_cat in valid_df["gt_cover"].unique(): + gt_subset = valid_df[valid_df["gt_cover"] == 
+            def determine_gt_cover(row: pd.Series) -> str:
+                vaststellingen = row.get("VASTSTELLINGEN")
+                onbedekt_vaststellingen = [
+                    "PCSCHEUR",
+                    "PCPLOZM",
+                    "PC35ET",
+                    "PC35ME",
+                    "PC35EG",
+                    "PCVEGWEG",
+                ]
+                if not pd.isna(vaststellingen):
+                    vaststellingen_str = str(vaststellingen).upper()
+                    if any(
+                        vaststelling in vaststellingen_str
+                        for vaststelling in onbedekt_vaststellingen
+                    ):
+                        return "ONBEDEKT"
+
+                # Fall back to the MON_VRU mapping
+                mon_vru = row.get("MON_VRU")
+                if pd.isna(mon_vru):
+                    return "NODATA"
+                mon_vru_str = str(mon_vru)
+                if mon_vru_str == "MON_VRU_BRAAK":
+                    return "ONBEDEKT"
+                else:
+                    return "BEDEKT"
+
+            merged_df["gt_cover"] = merged_df.apply(determine_gt_cover, axis=1)
+            merged_df["correct"] = merged_df["gt_cover"] == merged_df[pred_column]
+
+            valid_df = merged_df[
+                (merged_df["gt_cover"] != "NODATA")
+                & (merged_df[pred_column] != "NODATA")
+                # & (merged_df[pred_column] != "DOUBT")
+            ].copy()
+
+            logger.info(f"Valid parcels for accuracy calculation: {len(valid_df)}")
+
+            if len(valid_df) > 0:
+                total_valid = len(valid_df)
+                total_correct = valid_df["correct"].sum()
+                accuracy = total_correct / total_valid if total_valid > 0 else 0
+
+                stats_list.append(
+                    {
+                        "category": "--- ACCURACY METRICS ---",
+                        "count": "",
+                        "percentage": "",
+                        "total_parcels": "",
+                    }
+                )
+                stats_list.append(
+                    {
+                        "category": "Overall Accuracy",
+                        "count": f"{total_correct}/{total_valid}",
+                        "percentage": accuracy * 100,
+                        "total_parcels": "",
+                    }
+                )
+
+                for gt_cat in valid_df["gt_cover"].unique():
+                    gt_subset = valid_df[valid_df["gt_cover"] == gt_cat]
+                    if len(gt_subset) > 0:
+                        correct = gt_subset["correct"].sum()
+                        total = len(gt_subset)
+                        stats_list.append(
+                            {
+                                "category": f"Accuracy for GT: {gt_cat}",
+                                "count": f"{correct}/{total}",
+                                "percentage": (correct / total) * 100,
+                                "total_parcels": "",
+                            }
+                        )
 
-    gfo.add_column(
-        result_geo_path,
-        "s1_correct",
-        "TEXT",
-        expression=expression_template.format(cover_col="cover_s1"),
-        force_update=force_update,
-    )
+                logger.info(
+                    f"Overall accuracy: {accuracy:.2%} ({total_correct}/{total_valid})"
+                )
+        else:
+            logger.warning("No parcels matched between results and ground truth.")
+            merged_df = parcels_selected_df
+    else:
+        logger.info("No ground truth provided, generating general statistics only.")
+        merged_df = parcels_selected_df
+
+    stats_df = pd.DataFrame(stats_list)
+
+    report_name = f"{parcels_selected_path.stem}_accuracy_report.html"
+    report_html_path = report_dir / report_name
+
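+    # Render the collected stats as a minimal, static HTML page with one table.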
") + html_parts.append("| {col} | \n" + html_table += "
|---|
| {row[col]} | \n" + html_table += "