diff --git a/src/post_processing/dataclass/data_aplose.py b/src/post_processing/dataclass/data_aplose.py
index e1d3cae..ce2d9da 100644
--- a/src/post_processing/dataclass/data_aplose.py
+++ b/src/post_processing/dataclass/data_aplose.py
@@ -431,14 +431,15 @@ def plot(
         season = kwargs.get("season", False)
         effort = kwargs.get("effort")

-        return scatter(df=df_filtered,
-                       ax=ax,
-                       time_range=time,
-                       show_rise_set=show_rise_set,
-                       season=season,
-                       coordinates=self.coordinates,
-                       effort=effort,
-                       )
+        return scatter(
+            df=df_filtered,
+            ax=ax,
+            time_range=time,
+            show_rise_set=show_rise_set,
+            season=season,
+            coordinates=self.coordinates,
+            effort=effort,
+        )

     if mode == "agreement":
         bin_size = kwargs.get("bin_size")
@@ -452,7 +453,11 @@ def plot(
                 label,
             )

-        return timeline(df=df_filtered, ax=ax, color=color)
+        return timeline(
+            df=df_filtered,
+            ax=ax,
+            color=color,
+        )

     msg = f"Unsupported plot mode: {mode}"
     raise ValueError(msg)
diff --git a/src/post_processing/utils/core_utils.py b/src/post_processing/utils/core_utils.py
index c149155..d0125ae 100644
--- a/src/post_processing/utils/core_utils.py
+++ b/src/post_processing/utils/core_utils.py
@@ -8,7 +8,8 @@
 import astral
 import easygui
 import numpy as np
-from astral.sun import sunrise, sunset
+from astral import LocationInfo
+from astral.sun import sun, sunrise, sunset
 from matplotlib import pyplot as plt
 from osekit.config import TIMESTAMP_FORMAT_AUDIO_FILE
 from osekit.utils.timestamp_utils import strftime_osmose_format, strptime_from_text
@@ -579,3 +580,44 @@ def timedelta_to_str(td: Timedelta) -> str:
     if seconds % 60 == 0:
         return f"{seconds // 60}min"
     return f"{seconds}s"
+
+
+def assign_light_regime(
+    ts: Timestamp,
+    lat: float | None = None,
+    lon: float | None = None,
+) -> str:
+    """Assign a daylight regime to a timestamp.
+
+    Parameters
+    ----------
+    ts: Timestamp
+        Timestamp to assign a light regime to.
+    lat: float
+        The latitude of the corresponding point.
+    lon: float
+        The longitude of the corresponding point.
+
+    Returns
+    -------
+    str
+        The light regime at ``ts``: "night", "dawn", "day" or "dusk".
+
+    """
+    if lat is None or lon is None:
+        lat, lon = get_coordinates()
+
+    # Get sun times for the given location
+    location = LocationInfo(latitude=lat, longitude=lon)
+    s = sun(location.observer, date=ts.date())
+
+    if ts < s["dawn"]:
+        return "night"
+    if ts < s["sunrise"]:
+        return "dawn"
+    if ts < s["sunset"]:
+        return "day"
+    if ts < s["dusk"]:
+        return "dusk"
+    return "night"
diff --git a/src/post_processing/utils/filtering_utils.py b/src/post_processing/utils/filtering_utils.py
index faf2fd7..3d4832b 100644
--- a/src/post_processing/utils/filtering_utils.py
+++ b/src/post_processing/utils/filtering_utils.py
@@ -486,6 +486,7 @@ def _process_annotator_label_pair(
     ]
     file_vector = [
         filename_vector[i] for i, detected in enumerate(detect_vec) if detected
+        # filename_vector[i + 1] for i, detected in enumerate(detect_vec) if detected
     ]

     if not start_datetime:
@@ -510,8 +511,8 @@ def reshape_timebin(
     timebin_new: Timedelta
         The size of the new time bin.
     timestamp_audio: list[Timestamp]
-        A list of Timestamp objects corresponding to the start of each wav
-        that corresponds to a detection
+        A list of Timestamp objects describing the shape
+        into which the data should be reshaped.
Returns ------- @@ -572,10 +573,10 @@ def get_filename_timestamps(df: DataFrame, date_parser: str) -> list[Timestamp]: """ tz = get_timezone(df) timestamps = [ - strptime_from_text( - ts, - datetime_template=date_parser, - ) for ts in df["filename"] + strptime_from_text( + ts, + datetime_template=date_parser, + ) for ts in df["filename"] ] if all(t.tz is None for t in timestamps): diff --git a/src/post_processing/utils/fpod_utils.py b/src/post_processing/utils/fpod_utils.py index 20fd904..cd13e58 100644 --- a/src/post_processing/utils/fpod_utils.py +++ b/src/post_processing/utils/fpod_utils.py @@ -1,716 +1,613 @@ +"""FPOD/ CPOD processing functions.""" + from __future__ import annotations -from pathlib import Path +import logging from typing import TYPE_CHECKING +import matplotlib.dates as mdates import pytz import seaborn as sns +from matplotlib import patches from matplotlib import pyplot as plt -from osekit.config import TIMESTAMP_FORMAT_AUDIO_FILE from osekit.utils.timestamp_utils import strftime_osmose_format, strptime_from_text from pandas import ( DataFrame, + DateOffset, Series, Timedelta, Timestamp, concat, - date_range, notna, read_csv, - read_excel, to_datetime, + to_numeric, + to_timedelta, ) -from post_processing import logger -from post_processing.utils.core_utils import get_coordinates, get_sun_times +from post_processing.utils.filtering_utils import find_delimiter +from user_case.config import season_color, site_colors if TYPE_CHECKING: + from pathlib import Path import pytz +logger = logging.getLogger(__name__) + -def fpod2aplose( +def pod2aplose( df: DataFrame, tz: pytz.timezone, dataset_name: str, annotation: str, - bin_size: int = 60, + annotator: str, + bin_size: Timedelta, ) -> DataFrame: - """Format FPOD DataFrame to match APLOSE format. + """Format PODs DataFrame to match an APLOSE format. Parameters ---------- df: DataFrame FPOD result dataframe tz: pytz.timezone - Timezone object to get non-naïve datetimes + Timezone object to get non-naïve datetime. dataset_name: str - dataset name + dataset name. annotation: str - annotation name - bin_size: int - Duration of the detections in seconds + annotation name. + annotator: str + annotator name. + bin_size: Timedelta + Duration of the detections in seconds. Returns ------- DataFrame - An APLOSE formatted DataFrame + An APLOSE formatted DataFrame. 
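+
+    Examples
+    --------
+    A minimal, hypothetical call; the input layout (a naive ``Datetime``
+    column plus a ``Deploy`` tag) is assumed to be what ``load_pod_folder``
+    produces:
+
+    >>> import pytz
+    >>> from pandas import DataFrame, Timedelta, Timestamp
+    >>> from post_processing.utils.fpod_utils import pod2aplose
+    >>> pod_df = DataFrame(
+    ...     {
+    ...         "Datetime": [Timestamp("2024-01-15 10:30")],
+    ...         "Deploy": ["site_phase1"],
+    ...     },
+    ... )
+    >>> aplose_df = pod2aplose(
+    ...     df=pod_df,
+    ...     tz=pytz.UTC,
+    ...     dataset_name="demo_dataset",
+    ...     annotation="click",
+    ...     annotator="POD",
+    ...     bin_size=Timedelta(minutes=1),
+    ... )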
""" - fpod_start_dt = sorted( - [ - tz.localize(strptime_from_text(entry, "%d/%m/%Y %H:%M")) - for entry in df["Date heure"] - ], - ) - - fpod_end_dt = sorted( - [entry + Timedelta(seconds=bin_size) for entry in fpod_start_dt], - ) + fpod_start_dt = [ + tz.localize(entry) + for entry in df["Datetime"] + ] data = { "dataset": [dataset_name] * len(df), - "filename": [""] * len(df), + "filename": list(fpod_start_dt), "start_time": [0] * len(df), - "end_time": [bin_size] * len(df), + "end_time": [bin_size.total_seconds()] * len(df), "start_frequency": [0] * len(df), "end_frequency": [0] * len(df), "annotation": [annotation] * len(df), - "annotator": ["FPOD"] * len(df), - "start_datetime": [strftime_osmose_format(entry) for entry in fpod_start_dt], - "end_datetime": [strftime_osmose_format(entry) for entry in fpod_end_dt], - "is_box": [0] * len(df), + "annotator": [annotator] * len(df), + "start_datetime": [ + strftime_osmose_format(entry.floor(bin_size)) for entry in fpod_start_dt + ], + "end_datetime": [ + strftime_osmose_format(entry.ceil(bin_size)) for entry in fpod_start_dt + ], + "type": ["WEAK"] * len(df), + "deploy": df["Deploy"].tolist(), } return DataFrame(data) -def cpod2aplose( - df: DataFrame, - tz: pytz.BaseTzInfo, - dataset_name: str, - annotation: str, - bin_size: int = 60, - extra_columns: list | None = None, +def load_pod_folder( + folder: Path, + ext: str, ) -> DataFrame: - """Format CPOD DataFrame to match APLOSE format. + """Read POD's result files from a folder. Parameters ---------- - df: DataFrame - CPOD result dataframe - tz: pytz.BaseTzInfo - Timezone object to get non-naïve datetimes - dataset_name: str - dataset name - annotation: str - annotation name - bin_size: int, optional - Duration of the detections in seconds - extra_columns: list, optional - Additional columns added from df to data + folder: Path + Folder's place. + ext: str + File extension of result files. Returns ------- DataFrame - An APLOSE formatted DataFrame + Concatenated data. + + Raises + ------ + ValueError + If no result files are found. """ - df_cpod = df.rename(columns={"ChunkEnd": "Date heure"}) + if ext not in {"csv", "txt"}: + msg = f"Invalid file extension: {ext}" + raise ValueError(msg) - # remove lines where the C-POD stopped working - df_cpod = df_cpod.drop( - df_cpod.loc[df_cpod["Date heure"] == " at minute "].index, - ) - data = fpod2aplose(df_cpod, tz, dataset_name, annotation, bin_size) - data["annotator"] = data.loc[data["annotator"] == "FPOD"] = "CPOD" - if extra_columns: - for col in extra_columns: - if col in df_cpod.columns: - data[col] = df_cpod[col].tolist() - else: - msg = f"Column '{col}' does not exist and will be ignored." - logger.warning(msg) + all_files = sorted(folder.rglob(f"*.{ext}")) - return DataFrame(data) + if not all_files: + msg = f"No .{ext} files found in {folder}" + raise ValueError(msg) + all_data = [] + for file in all_files: + sep = find_delimiter(file) + df = read_csv( + file, + sep=sep, + ) -def usable_data_phase( - d_meta: DataFrame, - df: DataFrame, - dpl: str, -) -> DataFrame: - """Calculate the percentage of usable data. + df["Deploy"] = file.stem.strip().lower().replace(" ", "_") + all_data.append(df) - Considering the deployment dates and the collected data. 
+ data = concat(all_data, ignore_index=True) - Parameters - ---------- - df: DataFrame - CPOD result DataFrame - d_meta: DataFrame - Metadata DataFrame with deployments information (previously exported as json) - dpl: str - Deployment of interest where percentage of usable data will be calculated + if ext == "csv": + return _process_csv_data(data) + if ext == "txt": + return _process_txt_data(data) - Returns - ------- - DataFrame - Returns the percentage of usable datas in the chosen phase + msg = f"Could not load {ext} result folder" + raise ValueError(msg) - """ - d_meta.loc[:, ["deployment_date", "recovery_date"]] = d_meta[ - ["deployment_date", "recovery_date"] - ].apply( - to_datetime, - ) - df["start_datetime"] = to_datetime(df["start_datetime"]) - phase = d_meta.loc[d_meta["name"] == dpl].reset_index() - data = df.loc[df["name"] == dpl].reset_index() - start_date = phase.loc[0, "deployment_date"] - end_date = phase.loc[0, "recovery_date"] +def _process_csv_data(data: DataFrame) -> DataFrame: + """Process CSV data with filtering and datetime conversion.""" + data_filtered = _filter_csv_data(data) + data_filtered["Datetime"] = [ + strptime_from_text(dt, "%d/%m/%Y %H:%M") + for dt in data_filtered["ChunkEnd"] + ] + return data_filtered.sort_values(by=["Datetime"]).reset_index(drop=True) + - # Calculate the percentage of collected data on the phase length of time - if data.empty: - percentage_data = 0 - msg = "No data for this phase" +def _filter_csv_data(data: DataFrame) -> DataFrame: + """Filter CSV data based on available columns.""" + if "%TimeLost" in data.columns: + data_filtered = data[data["File"].notna()].copy() + data_filtered = data_filtered[data_filtered["Nall/m"].notna()] else: - df_end = data.loc[data.index[-1], "start_datetime"] - df_start = data.loc[data.index[0], "start_datetime"] - act_length = df_end - df_start - p_length = end_date - start_date - percentage_data = act_length * 100 / p_length - msg = f"Percentage of usable data : {percentage_data}%" + data_filtered = data[data["DPM"] > 0].copy() + data_filtered = data_filtered[data_filtered["Nall"].notna()] - logger.info(msg) - return percentage_data + return data_filtered -def meta_cut_aplose( - d_meta: DataFrame, - df: DataFrame, -) -> DataFrame: - """From APLOSE DataFrame with all rows to filtered DataFrame. +def _process_txt_data(data: DataFrame) -> DataFrame: + """Process TXT data with datetime conversion.""" + data["Datetime"] = data.apply(get_feeding_buzz_datetime, axis=1) + return data.drop_duplicates().sort_values(by=["Datetime"]).reset_index(drop=True) - Parameters - ---------- - df: DataFrame - CPOD result dataframe - d_meta: DataFrame - Metadata dataframe with deployments information (previously exported as json) - Returns - ------- - DataFrame - An APLOSE DataFrame with data from beginning to end of each deployment. - Returns the percentage of usable datas. +def get_feeding_buzz_datetime(row: Series) -> Timestamp: + """Convert feeding buzz timestamp into a standard Timestamp. + The conversion method differs based on the POD type. 
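+
+    The first branch assumes CPOD-style numeric fields, where ``Minute`` is a
+    count of minutes from 1899-12-30 (presumably the Excel serial-date origin)
+    and ``microsec`` holds microseconds within that minute; the second branch
+    assumes FPOD-style input, where ``Minute`` is a ``d/m/Y H:M`` text
+    timestamp and the seconds are rebuilt from ``microsec``.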
""" - d_meta.loc[:, ["deployment_date", "recovery_date"]] = d_meta[ - ["deployment_date", "recovery_date"] - ].apply(to_datetime) - df["start_datetime"] = to_datetime( - df["start_datetime"], - format=TIMESTAMP_FORMAT_AUDIO_FILE, - ) - - # Add DPM column - df["DPM"] = (df["Nfiltered"] > 0).astype(int) - - # Extract corresponding line - campaign = df.iloc[0]["dataset"] - phase = d_meta.loc[d_meta["name"] == campaign].reset_index() - start_date = phase.loc[0, "deployment_date"] - end_date = phase.loc[0, "recovery_date"] - df = df[ - (df["start_datetime"] >= start_date) & (df["start_datetime"] <= end_date) - ].copy() - - # Calculate the percentage of collected data on the phase length of time - if df.empty: - msg = "No data for this phase" - else: - df_end = df.loc[df.index[-1], "start_datetime"] - df_start = df.loc[df.index[0], "start_datetime"] - act_length = df_end - df_start - p_length = end_date - start_date - percentage_data = act_length * 100 / p_length - on = int(df.loc[df.MinsOn == 1, "MinsOn"].count()) - percentage_on = percentage_data * (on / len(df)) - msg = f"Percentage of usable data : {percentage_on}%" - - logger.info(msg) - return df + try: + return ( + to_datetime("1900-01-01") + + to_timedelta(row["Minute"], unit="min") + + to_timedelta(row["microsec"] / 1e6, unit="sec") + - to_timedelta(2, unit="D") + ) + except (KeyError, TypeError, ValueError): + pass + try: + return strptime_from_text( + f"{row['Minute']}:{int(str(row['microsec'])[0]):02d}.{int(str(row['microsec'])[1:])}", + "%-d/%-m/%Y %H:%M:%S.%f", + ) + except (KeyError, TypeError, ValueError): + pass -def format_calendar(path: Path) -> DataFrame: - """Format calendar. + msg = "Could not convert feeding buzz timestamp." + raise ValueError(msg) - Parameters - ---------- - path: Path - Excel calendar path - """ - df_calendar = read_excel(path) - df_calendar = df_calendar[df_calendar["Site group"] == "Data"].copy() - - return df_calendar.rename( - columns={ - "Start": "start_datetime", - "Stop": "end_datetime", - "Site": "site.name", - }, - ) - - -def dpm_to_dph( +def process_feeding_buzz( df: DataFrame, - tz: pytz.BaseTzInfo, - dataset_name: str, - annotation: str, - bin_size: int = 3600, - extra_columns: list | None = None, + species: str, ) -> DataFrame: - """From CPOD result DataFrame to APLOSE formatted DataFrame. + """Process a POD feeding buzz detection DataFrame. + + Give the feeding buzz duration, depending on the studied species + (`delphinid`, `porpoise` or `commerson`). 
Parameters ---------- df: DataFrame - CPOD result DataFrame - tz: pytz.BaseTzInfo - Timezone object to get timezone-aware datetimes - dataset_name: str - dataset name - annotation: str - annotation name - bin_size: int - Duration of the detections in seconds - extra_columns: list, optional - Additional columns added from df to data + Path to cpod.exe feeding buzz file + species: str + Select the species to use between porpoise and Commerson's dolphin Returns ------- DataFrame - An APLOSE DataFrame + Containing all ICIs for every positive minute to click """ - df["start_datetime"] = to_datetime(df["start_datetime"], utc=True) - df["end_datetime"] = to_datetime(df["end_datetime"], utc=True) - df["Date heure"] = df["start_datetime"].dt.floor("h") - dph = df.groupby(["Date heure"])["DPM"].sum().reset_index() - dph["Date heure"] = dph["Date heure"].apply( - lambda x: Timestamp(x).strftime(format="%d/%m/%Y %H:%M:%S"), - ) - - return cpod2aplose(dph, tz, dataset_name, annotation, bin_size, extra_columns) - + df["ICI"] = df["Datetime"].diff() + df["Datetime"] = df["Datetime"].dt.floor("min") + + if species.lower() == "delphinid": # Herzing et al., 2014 + df["Buzz"] = df["ICI"].between( + Timedelta(0), + Timedelta(seconds=0.02), + ).astype(int) + elif species.lower() == "porpoise": # Nuuttila et al., 2013 + df["Buzz"] = df["ICI"].between( + Timedelta(0), + Timedelta(seconds=0.01), + ).astype(int) + elif species.lower() == "commerson": # Reyes Reyes et al., 2015 + df["Buzz"] = df["ICI"].between( + Timedelta(0), + Timedelta(seconds=0.005), + ).astype(int) + else: + msg = "This species is not supported" + raise ValueError(msg) -def assign_phase( - meta: DataFrame, - data: DataFrame, - site: str, -) -> DataFrame: - """Add a column to an APLOSE DataFrame to specify the name of the phase. + df_buzz = df.groupby(["Datetime"])["Buzz"].sum().reset_index() + df_buzz["Foraging"] = to_numeric( + df_buzz["Buzz"] != 0, downcast="integer", + ).astype(int) - The name of the phase is attributed according to metadata. + return df_buzz - Parameters - ---------- - meta: DataFrame - Metadata dataframe with deployments information (previously exported as json). - data: DataFrame - Contain positive hours to detections. - site: str - Name of the site you wish to assign phases to. - Returns - ------- - DataFrame - The same dataframe with the column Phase. +def process_timelost(df: DataFrame, threshold: int = 0) -> DataFrame: + """Process TimeLost DataFrame. - """ - data["start_datetime"] = to_datetime(data["start_datetime"], utc=True) - meta["deployment_date"] = to_datetime(meta["deployment_date"], utc=True) - meta["recovery_date"] = to_datetime(meta["recovery_date"], utc=True) - - meta = meta[meta["site.name"] == site].copy() - - data["name"] = None - for _, meta_row in meta.iterrows(): - j = 0 - while j < len(data): - if ( - meta_row["deployment_date"] - <= data.loc[j, "start_datetime"] - < meta_row["recovery_date"] - ): - data.loc[j, "name"] = meta_row["name"] - j += 1 - return data - - -def assign_phase_simple( - meta: DataFrame, - data: DataFrame, -) -> DataFrame: - """Add column to an Aplose DataFrame to specify the phase, according to metadata. + Returns relevant columns and reshape into hourly data. Parameters ---------- - meta: DataFrame - Metadata dataframe with deployments information (previously exported as json). - data: DataFrame - Contain positive hours to detections. + df: DataFrame + All your Environmental data files. + threshold: float + TimeLost threshold. 
Returns ------- - DataFrame - The same dataframe with the column Phase. + %TimeLost DataFrame. """ - data["start_datetime"] = to_datetime(data["start_datetime"], utc=True) - data["end_datetime"] = to_datetime(data["end_datetime"], dayfirst=True, utc=True) - meta["deployment_date"] = to_datetime(meta["deployment_date"], utc=True) - meta["recovery_date"] = to_datetime(meta["recovery_date"], utc=True) - meta["deployment_date"] = meta["deployment_date"].dt.floor("d") - meta["recovery_date"] = meta["recovery_date"].dt.floor("d") - - data["name"] = None - for site in data["site.name"].unique(): - site_meta = meta[meta["site.name"] == site] - site_data = data[data["site.name"] == site] - - for _, meta_row in site_meta.iterrows(): - time_filter = ( - meta_row["deployment_date"] <= site_data["start_datetime"] - ) & (site_data["start_datetime"] < meta_row["recovery_date"]) - data.loc[site_data.index[time_filter], "name"] = meta_row["name"] - - return data - + if threshold not in range(101): + msg = "Threshold must integer between 0 and 100." + raise ValueError(msg) + + df["Datetime"] = df["Datetime"].dt.floor("h") + cols_to_drop = [ + col for col in df.columns if col not in { + "File", "Datetime", "Temp", "Angle", "%TimeLost", "Deploy", + } + ] + return df[df["%TimeLost"] <= threshold].drop( + columns=cols_to_drop, + ).sort_values(["Datetime"]).reset_index(drop=True) -def generate_hourly_detections(meta: DataFrame, site: str) -> DataFrame: - """Create a DataFrame with one line per hour between start and end dates. - Keep the number of detections per hour between these dates. +def create_matrix( + df: DataFrame, + group_cols: list, + agg_cols: list, +) -> DataFrame: + """Create a stats matrix (mean & std). Parameters ---------- - meta: DataFrame - Metadata dataframe with deployments information (previously exported as json) - site: str - A way to isolate the site you want to work on. + df : DataFrame + Extended frame with raw data to calculate stats for + group_cols : list + Additional columns to group by + agg_cols : list + Columns to aggregate Returns ------- - DataFrame - A full period of time with positive and negative hours to detections. + Give a matrix of the data in [agg_cols] grouped by [group_cols]. """ - df_meta = meta[meta["site.name"] == site].copy() - df_meta["deployment_date"] = to_datetime(df_meta["deployment_date"]) - df_meta["recovery_date"] = to_datetime(df_meta["recovery_date"]) - df_meta["deployment_date"] = df_meta["deployment_date"].dt.floor("h") - df_meta["recovery_date"] = df_meta["recovery_date"].dt.floor("h") - df_meta = df_meta.sort_values(by=["deployment_date"]) - - records = [ - {"name": row["name"], "start_datetime": date} - for _, row in df_meta.iterrows() - for date in date_range( - start=row["deployment_date"], end=row["recovery_date"], freq="h", - ) - ] + matrix = df.groupby(group_cols).agg({ + col: ["mean", "std"] for col in agg_cols + }) + matrix = matrix.reset_index() - return DataFrame(records) + matrix.columns = group_cols + [f"{col}_{stat}" + for col in agg_cols + for stat in ["mean", "std"]] + return matrix -def merging_tab(meta: DataFrame, data: DataFrame) -> DataFrame: - """Create a DataFrame with one line per hour between start and end dates. +def percent_calc( + data: DataFrame, + time_unit: str | None = None, +) -> DataFrame: + """Calculate the percentage of clicks, feeding buzzes and positive hours to detection. - Keep the number of detections per hour between these dates. + Computed on the entire effort and for every site. 
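+
+    The output holds, per group, the summed ``DPh``, ``DPM`` and ``Foraging``
+    counts plus four derived metrics (``%click``, ``%DPh``, ``FBR`` and
+    ``%buzzes``) computed relative to the grouped effort.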
Parameters ---------- - meta: DataFrame - Metadata with deployments information (previously exported as json) data: DataFrame - Contain positive hours to detections + All values concatenated + + time_unit: str + Time unit you want to group your data in Returns ------- DataFrame - A full period of time with positive and negative hours to detections. """ - data["start_datetime"] = to_datetime(data["start_datetime"], utc=True) - meta["start_datetime"] = to_datetime(meta["start_datetime"], utc=True) - - deploy_detec = data["name"].unique() - df_filtered = meta[meta["name"].isin(deploy_detec)] + group_cols = ["site.name"] + if time_unit is not None: + group_cols.insert(0, time_unit) - output = df_filtered.merge( - data[["name", "start_datetime", "DPM", "Nfiltered"]], - on=["name", "start_datetime"], - how="outer", + # Aggregate and compute metrics + df = ( + data.groupby(group_cols) + .agg( + { + "DPh": "sum", + "DPM": "sum", + "Day": "size", + "Foraging": "sum", + }, + ) + .reset_index() ) - output["DPM"] = output["DPM"].fillna(0) - output["Nfiltered"] = output["Nfiltered"].fillna(0) - - output["Day"] = output["start_datetime"].dt.day - output["Month"] = output["start_datetime"].dt.month - output["Year"] = output["start_datetime"].dt.year - output["hour"] = output["start_datetime"].dt.hour - - return output + df["%click"] = df["DPM"] * 100 / (df["Day"] * 60) + df["%DPh"] = df["DPh"] * 100 / df["Day"] + df["FBR"] = df.apply( + lambda row: (row["Foraging"] * 100 / row["DPM"]) if row["DPM"] > 0 else 0, + axis=1) + df["%buzzes"] = df["Foraging"] * 100 / (df["Day"] * 60) + return df -def feeding_buzz(df: DataFrame, species: str) -> DataFrame: - """Process a CPOD/FPOD feeding buzz detection file. - Gives the feeding buzz duration, depending on the studied species. +def site_percent(df: DataFrame, metric: str) -> None: + """Plot a graph with the percentage of minutes positive to detection for every site. 
Parameters ---------- df: DataFrame - Path to cpod.exe feeding buzz file - species: str - Select the species to use between porpoise and Commerson's dolphin - - Returns - ------- - DataFrame - Containing all ICIs for every positive minutes to clicks + All percentages grouped by site + metric: str + Type of percentage you want to show on the graph """ - df.columns = df.columns.str.upper() - df["MICROSEC"] = df["MICROSEC"] / 1e6 - col = "DATE HEURE MINUTE" - col2 = "HEURE MINUTE" - if col in df.columns: - df[["DATE", "HEURE", "MINUTE"]] = df[col].str.split(" ", expand=True) - df["Time"] = (df["DATE"].astype(str) + " " + - df["HEURE"].astype(str) + ":" + - df["MINUTE"].astype(str) + ":" + - df["MICROSEC"].astype(str)) - df["Time"] = to_datetime(df["Time"], dayfirst=True) - elif col2 in df.columns: - df[["HEURE", "MINUTE"]] = df[col2].str.split(" ", expand=True) - df["Time"] = (df["DATE"].astype(str) + " " + - df["HEURE"].astype(str) + ":" + - df["MINUTE"].astype(str) + ":" + - df["MICROSEC"].astype(str)) - df["Time"] = to_datetime(df["Time"], dayfirst=True) - else: - df["Time"] = (df["MINUTE"].astype(str) + ":" + df["MICROSEC"].astype(str)) - df["Time"] = to_datetime(df["Time"], dayfirst=True) - - df = df.sort_values(by="Time").reset_index(drop=True) - df["ICI"] = df["Time"].diff().dt.total_seconds() - - df["Buzz"] = 0 - if species == "Porpoise": - feeding_idx = df.index[df["ICI"] < 0.01] - else: - feeding_idx = df.index[df["ICI"] >= 0.005] - - df.loc[feeding_idx, "Buzz"] = 1 - df.loc[feeding_idx - 1, "Buzz"] = 1 - df.loc[df.index < 0, "Buzz"] = 0 - - df["start_datetime"] = df["Time"].dt.floor("min") - df["start_datetime"] = to_datetime(df["start_datetime"], dayfirst=False, utc=True) - f = df.groupby(["start_datetime"])["Buzz"].sum().reset_index() - - f["Foraging"] = (f["Buzz"] != 0).astype(int) - - return f + ax = sns.barplot( + data=df, + x="site.name", + y=metric, + hue="site.name", + dodge=False, + palette=site_colors, + ) + ax.set_title(f"{metric} per site") + ax.set_ylabel(f"{metric}") + if metric in {"%buzzes", "FBR"}: + for _, bar in enumerate(ax.patches): + bar.set_hatch("/") + plt.show() -def assign_daytime( - df: DataFrame, -) -> DataFrame: - """Assign datetime categories to events. - - Categorize daytime of the detection (among 4 categories). +def year_percent(df: DataFrame, metric: str) -> None: + """Plot a graph with the percentage of minutes positive to detection per site/year. Parameters ---------- df: DataFrame - Contains positive hours to detections. - - Returns - ------- - DataFrame - The same dataframe with the column daytime. 
+ All percentages grouped by site and year + metric: str + Type of percentage you want to show on the graph """ - start = df.iloc[0]["Time"] - stop = df.iloc[-1]["Time"] - lat, lon = get_coordinates() - _, _, dawn, day, dusk, night = get_sun_times(start, stop, lat, lon) - dawn = Series(dawn, name="dawn") - day = Series(day, name="day") - dusk = Series(dusk, name="dusk") - night = Series(night, name="night") - jour = concat([day, night, dawn, dusk], axis=1) - - for i, row in df.iterrows(): - dpm_i = row["Time"] - if notna(dpm_i): # Check if time is not NaN - jour_i = jour[ - (jour["dusk"].dt.year == dpm_i.year) & - (jour["dusk"].dt.month == dpm_i.month) & - (jour["dusk"].dt.day == dpm_i.day) - ] - if not jour_i.empty: # Ensure there"s a matching row - jour_i = jour_i.iloc[0] # Extract first match - if dpm_i <= jour_i["day"]: - df.loc[i, "REGIME"] = 1 - elif dpm_i < jour_i["dawn"]: - df.loc[i, "REGIME"] = 2 - elif dpm_i < jour_i["dusk"]: - df.loc[i, "REGIME"] = 3 - elif dpm_i > jour_i["night"]: - df.loc[i, "REGIME"] = 1 - elif dpm_i > jour_i["dusk"]: - df.loc[i, "REGIME"] = 4 - else: - df.loc[i, "REGIME"] = 1 - - return df - + sites = df["site.name"].unique() + n_sites = len(sites) + fig, axs = plt.subplots(n_sites, 1, figsize=(14, 2.5 * n_sites), sharex=True) + if n_sites == 1: + axs = [axs] + for i, site in enumerate(sorted(sites)): + site_data = df[df["site.name"] == site] + ax = axs[i] + ax.bar( + site_data["Year"], + site_data[metric], + label=f"Site {site}", + color=site_colors.get(site, "gray"), + ) + ax.set_title(f"{site}") + ax.set_ylim(0, max(df[metric]) + 0.2) + ax.set_ylabel(metric) + if i != 3: + ax.set_xlabel("") + else: + ax.set_xlabel("Year") + if metric in {"%buzzes", "FBR"}: + for _, bar in enumerate(ax.patches): + bar.set_hatch("/") + fig.suptitle(f"{metric} per year", fontsize=16) + plt.show() -def process_files_in_folder(folder_path: Path, species: str) -> DataFrame: - """Process a folder containing all CPOD/FPOD feeding buzz detection files. - Apply the feeding buzz function to these files. +def ym_percent(df: DataFrame, metric: str) -> None: + """Plot a graph with the percentage of DPM per site/month-year. Parameters ---------- - folder_path: Path - Path to the folder. - species: str - Select the species to use between porpoise and Commerson's dolphin - - Returns - ------- - DataFrame - Compiled feeding buzz detection positive minutes. 
+ df: DataFrame + All percentages grouped by site and month per year + metric: str + Type of percentage you want to show on the graph """ - all_files = list(Path(folder_path).rglob("*.txt")) - all_data = [] - - for file in all_files: - file_path = folder_path / file - df = read_csv(file_path, sep="\t") - processed_df = feeding_buzz(df, species) - processed_df["file"] = file - all_data.append(processed_df) - - return concat(all_data, ignore_index=True) - - -colors = { - "DY1": "#118B50", - "DY2": "#5DB996", - "DY3": "#B0DB9C", - "DY4": "#E3F0AF", - "CA4": "#5EABD6", - "Walde": "#FFB4B4", -} + sites = df["site.name"].unique() + n_sites = len(sites) + fig, axs = plt.subplots(n_sites, 1, figsize=(14, 2.5 * n_sites), sharex=True) + if n_sites == 1: + axs = [axs] + for i, site in enumerate(sorted(sites)): + site_data = df[df["site.name"] == site] + ax = axs[i] + bar_colors = site_data["Season"].map(season_color).fillna("gray") + ax.bar( + site_data["YM"], + site_data[metric], + label=f"Site {site}", + color=bar_colors, + width=25, + ) + ax.set_title(f"{site}") + ax.set_ylim(0, max(df[metric]) + 0.2) + ax.set_ylabel(metric) + if i != 3: + ax.set_xlabel("") + else: + ax.set_xlabel("Months") + if metric in {"%buzzes", "FBR"}: + for _, bar in enumerate(ax.patches): + bar.set_hatch("/") + legend_elements = [ + patches.Patch(facecolor=col, edgecolor="black", label=season.capitalize()) + for season, col in season_color.items() + ] + fig.legend( + handles=legend_elements, + loc="upper right", + title="Seasons", + bbox_to_anchor=(0.95, 0.95), + ) + fig.suptitle(f"{metric} per month", fontsize=16) + plt.show() -def extract_site(df: DataFrame) -> DataFrame: - """Create new columns: site.name and campaign.name, in order to match the metadata. +def week_percent(df: DataFrame, metric: str) -> None: + """Plot a graph with the percentage of DPM per site/month-year. Parameters ---------- df: DataFrame - All values concatenated - - Returns - ------- - DataFrame - The same dataframe with two additional columns. + All percentages grouped by site and month per year + metric: str + Type of percentage you want to show on the graph """ - df[["site.name", "campaign.name"]] = df["name"].str.split("_", expand=True) - return df + sites = df["site.name"].unique() + n_sites = len(sites) + fig, axs = plt.subplots(n_sites, 1, figsize=(15, 3 * n_sites), sharex=True) + if n_sites == 1: + axs = [axs] + for i, site in enumerate(sorted(sites)): + site_data = df[df["site.name"] == site].copy() + ax = axs[i] -def percent_calc(data: DataFrame, time_unit: str | None = None) -> DataFrame: - """Calculate percentage of clicks, feeding buzzes and positive hours to detection. + # Masque pour identifier les NAs + na_mask = site_data["DPM"].isna() - Computed on the entire effort and for every site. 
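+        # na_mask flags the weeks with no DPM value; those weeks are shaded as
+        # "no data" rectangles further down instead of being drawn as bars.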
+        # Set the y-axis limit
+        ymax = max(df[metric].dropna()) + 0.2 if not df[metric].dropna().empty else 1
+        ax.set_ylim(0, ymax)

-    Parameters
-    ----------
-    data: DataFrame
-        All values concatenated
+        # Draw rectangles over the periods with no data
+        na_dates = site_data.loc[na_mask, "start_datetime"]
+        if len(na_dates) > 0:
+            na_groups = []
+            current_group = [na_dates.iloc[0]]

-    time_unit: str
-        Time unit you want to group your data in
-
-    Returns
-    -------
-    DataFrame
+            for j in range(1, len(na_dates)):
+                # Check whether the weeks are consecutive (~7 days)
+                if (na_dates.iloc[j] - current_group[-1]).days < 10:
+                    current_group.append(na_dates.iloc[j])
+                else:
+                    na_groups.append(current_group)
+                    current_group = [na_dates.iloc[j]]
+            na_groups.append(current_group)
+
+            # Create the rectangles
+            for group in na_groups:
+                start = group[0] - DateOffset(days=3.5)  # Centre on the week
+                width = len(group) * 7 + 2  # Width in days
+                rect = patches.Rectangle(
+                    (mdates.date2num(start), 0),
+                    width,
+                    ymax,
+                    linewidth=1,
+                    edgecolor="gray",
+                    facecolor="lightgray",
+                    alpha=0.3,
+                    label="No data"
+                    if (i == 0 and group == na_groups[0])
+                    else "",
+                )
+                ax.add_patch(rect)
+
+        # Plot the bars that have data
+        bar_colors = site_data.loc[~na_mask, "Season"].map(season_color).fillna("gray")
+        bars = ax.bar(
+            site_data.loc[~na_mask, "start_datetime"],
+            site_data.loc[~na_mask, metric],
+            label=f"Site {site}",
+            color=bar_colors,
+            width=6,  # Width suited to weekly bars
+        )

-    """
-    group_cols = ["site.name"]
-    if time_unit is not None:
-        group_cols.insert(0, time_unit)
+        # Add hatching if needed
+        if metric in {"%buzzes", "FBR"}:
+            for bar in bars:
+                bar.set_hatch("/")

-    # Aggregate and compute metrics
-    df = data.groupby(group_cols).agg({
-        "DPH": "sum",
-        "DPM": "sum",
-        "Day": "size",
-        "Foraging": "sum",
-    }).reset_index()
+        ax.set_title(f"{site}")
+        ax.set_ylabel(metric)
+        if i != n_sites - 1:
+            ax.set_xlabel("")
+        else:
+            ax.set_xlabel("Week")

-    df["%click"] = df["DPM"] * 100 / (df["Day"] * 60)
-    df["%DPH"] = df["DPH"] * 100 / df["Day"]
-    df["FBR"] = df["Foraging"] * 100 / df["DPM"]
-    df["%buzz"] = df["Foraging"] * 100 / (df["Day"] * 60)
-    return df
+    # Season legend
+    legend_elements = [
+        patches.Patch(facecolor=col, edgecolor="black", label=season.capitalize())
+        for season, col in season_color.items()
+    ]
+    # Add "No data" to the legend if NAs exist
+    if df["DPM"].isna().any():
+        legend_elements.append(
+            patches.Patch(
+                facecolor="lightgray",
+                edgecolor="gray",
+                alpha=0.3,
+                label="No data"))
+
+    fig.legend(
+        handles=legend_elements,
+        loc="upper right",
+        title="Seasons",
+        bbox_to_anchor=(0.95, 0.95),
+    )
+    fig.suptitle(f"{metric} per week", fontsize=16)

-def site_percent(df: DataFrame, metric: str) -> None:
-    """Plot a graph with percentage of minutes positive to detection for every site.
+ # Formatage de l'axe X + axs[-1].xaxis.set_major_locator(mdates.MonthLocator(interval=1)) + axs[-1].xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m")) + fig.autofmt_xdate() - Parameters - ---------- - df: DataFrame - All percentages grouped by site - metric: str - Type of percentage you want to show on the graph - - """ - ax = sns.barplot(data=df, x="site.name", - y=metric, - hue="site.name", - dodge=False, - palette=colors, - ) - ax.set_title(f"{metric} per site") - ax.set_ylabel(f"{metric}") - if metric == "%buzzes": - for _, bar in enumerate(ax.patches): - bar.set_hatch("/") + plt.tight_layout() plt.show() -def year_percent(df: DataFrame, metric: str) -> None: - """Plot a graph with the percentage of minutes positive to detection per site/year. +def month_percent(df: DataFrame, metric: str) -> None: + """Plot a graph with the percentage of minutes positive to detection per site/month. Parameters ---------- df: DataFrame - All percentages grouped by site and year + All percentages grouped by site and month metric: str Type of percentage you want to show on the graph @@ -723,32 +620,50 @@ def year_percent(df: DataFrame, metric: str) -> None: for i, site in enumerate(sorted(sites)): site_data = df[df["site.name"] == site] ax = axs[i] - ax.bar(site_data["Year"], - site_data[metric], - label=f"Site {site}", - color=colors.get(site, "gray"), - ) - ax.set_title(f"Site {site}") + ax.bar( + site_data["Month"], + site_data[metric], + label=f"Site {site}", + color=site_colors.get(site, "gray"), + ) + ax.set_title(f"{site} - Percentage of minutes positive to detection per month") ax.set_ylim(0, max(df[metric]) + 0.2) ax.set_ylabel(metric) + ax.set_xticks( + [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], + [ + "Jan", + "Feb", + "Mar", + "Apr", + "May", + "Jun", + "Jul", + "Agu", + "Sep", + "Oct", + "Nov", + "Dec", + ], + ) if i != 3: ax.set_xlabel("") else: - ax.set_xlabel("Year") - if metric == "%buzzes": + ax.set_xlabel("Months") + if metric in {"%buzzes", "FBR"}: for _, bar in enumerate(ax.patches): bar.set_hatch("/") - fig.suptitle(f"{metric} per year", fontsize=16) + fig.suptitle(f"{metric} per month", fontsize=16) plt.show() -def month_percent(df: DataFrame, metric: str) -> None: - """Plot a graph with the percentage of minutes positive to detection per site/month. +def day_percent(df: DataFrame, metric: str) -> None: + """Plot a graph with the percentage of DPM per site/month-year. 
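+
+    One subplot is drawn per site, with one bar per date, coloured by season.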
Parameters ---------- df: DataFrame - All percentages grouped by site and month + All percentages grouped by site and month per year metric: str Type of percentage you want to show on the graph @@ -761,26 +676,33 @@ def month_percent(df: DataFrame, metric: str) -> None: for i, site in enumerate(sorted(sites)): site_data = df[df["site.name"] == site] ax = axs[i] - ax.bar(site_data["Month"], - site_data[metric], - label=f"Site {site}", - color=colors.get(site, "gray"), - ) - ax.set_title(f"{site} - Percentage of postitive to detection minutes per month") + bar_colors = site_data["Season"].map(season_color).fillna("gray") + ax.bar( + site_data["Date"], + site_data[metric], + label=f"Site {site}", + color=bar_colors, + ) + ax.set_title(f"{site}") ax.set_ylim(0, max(df[metric]) + 0.2) ax.set_ylabel(metric) - ax.set_xticks([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], - ["Jan", "Feb", "Mar", "Apr", "May", "Jun", - "Jul", "Agu", "Sep", "Oct", "Nov", "Dec", - ], - ) if i != 3: ax.set_xlabel("") else: ax.set_xlabel("Months") - if metric == "%buzzes": + if metric in {"%buzzes", "FBR"}: for _, bar in enumerate(ax.patches): bar.set_hatch("/") + legend_elements = [ + patches.Patch(facecolor=col, edgecolor="black", label=season.capitalize()) + for season, col in season_color.items() + ] + fig.legend( + handles=legend_elements, + loc="upper right", + title="Seasons", + bbox_to_anchor=(0.95, 0.95), + ) fig.suptitle(f"{metric} per month", fontsize=16) plt.show() @@ -804,20 +726,388 @@ def hour_percent(df: DataFrame, metric: str) -> None: for i, site in enumerate(sorted(sites)): site_data = df[df["site.name"] == site] ax = axs[i] - ax.bar(site_data["hour"], - site_data[metric], - label=f"Site {site}", - color=colors.get(site, "gray"), - ) - ax.set_title(f"Site {site} - Percentage of positive to detection per hour") + ax.bar( + site_data["Hour"], + site_data[metric], + label=f"Site {site}", + color=site_colors.get(site, "gray"), + ) + ax.set_title( + f"Site {site} - Percentage of minutes positive to detection per hour", + ) ax.set_ylim(0, max(df[metric]) + 0.2) ax.set_ylabel(metric) if i != 3: ax.set_xlabel("") else: ax.set_xlabel("Hour") - if metric == "%buzzes": + if metric in {"%buzzes", "FBR"}: for _, bar in enumerate(ax.patches): bar.set_hatch("/") fig.suptitle(f"{metric} per hour", fontsize=16) plt.show() + + +def calendar( + meta: DataFrame, + data: DataFrame, +) -> None: + """Produce the calendar of the given data. 
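+
+    Deployment periods from the metadata are drawn as light background bars,
+    and the period actually covered by data (``Deb``/``Fin``) as a coloured
+    bar on top, one row per site.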
+ + Parameters + ---------- + meta: DataFrame + metadatax file + data: DataFrame + cpod file from all sites and phases + + """ + # format the dataframe + meta["deployment_date"] = to_datetime(meta["deployment_date"]) + meta["recovery_date"] = to_datetime(meta["recovery_date"]) + meta = meta.sort_values(["deploy.name", "deployment_date"]).reset_index(drop=True) + data = data.sort_values(["deploy.name", "Deb"]).reset_index(drop=True) + df_fusion = data.merge( + meta[["deploy.name", "deployment_date", "recovery_date"]], + on=["deploy.name"], + how="outer", + ) + + df_fusion["Deb"] = df_fusion["Deb"].fillna(df_fusion["deployment_date"]) + df_fusion["Fin"] = df_fusion["Fin"].fillna(df_fusion["deployment_date"]) + + df_fusion[["Site", "Phase"]] = df_fusion["deploy.name"].str.split("_", expand=True) + df_fusion["color"] = df_fusion["Site"].map(site_colors) + + # Create the figure + fig, ax = plt.subplots(figsize=(14, 4)) + + sites = sorted(df_fusion["Site"].unique(), reverse=True) + site_mapping = {site: idx for idx, site in enumerate(sites)} + + for _, row in df_fusion.iterrows(): + y_pos = site_mapping[row["Site"]] + ax.broken_barh( + [(row["deployment_date"], row["recovery_date"] - row["deployment_date"])], + (y_pos - 0.3, 0.6), + facecolors="#F5F5F5", + edgecolors="black", + linewidth=0.8, + ) + + if notna(row["Deb"]) and notna(row["Fin"]) and row["Fin"] > row["Deb"]: + ax.broken_barh( + [(row["Deb"], row["Fin"] - row["Deb"])], + (y_pos - 0.15, 0.3), + facecolors=row["color"], + edgecolors="black", + linewidth=0.8, + ) + + ax.set_yticks(range(len(sites))) + ax.set_yticklabels(sites, fontsize=12) + + legend_elements = [ + patches.Patch(facecolor="#F5F5F5", edgecolor="black", label="Deployment"), + ] + for site, color in site_colors.items(): + if site in sites: + legend_elements.append( + patches.Patch(facecolor=color, edgecolor="black", label=f"{site}"), + ) + + ax.legend(handles=legend_elements, loc="upper left", fontsize=11, frameon=True) + # Layout final + plt.xticks(fontsize=12) + plt.tight_layout() + plt.show() + + +def hist_mean_m( + df: DataFrame, + metric_mean: str, + metric_std: str, + y_lab: str | None = None, + title_suffix: str | None = None, +) -> None: + """Produce a histogram of the given data. + + It shows mean and standard deviation of the metric. + + Parameters + ---------- + df: DataFrame + All data grouped by site and month + metric_mean: str + Column name for the mean values (e.g., "%click_mean") + metric_std: str + Column name for the standard deviation values (e.g., "%click_std") + y_lab: str, optional + Label for y-axis. If None, uses metric_mean + title_suffix: str, optional + Suffix for the main title. 
If None, uses metric_mean + + """ + sites = df["site.name"].unique() + n_sites = len(sites) + fig, axs = plt.subplots(n_sites, 1, figsize=(14, 3 * n_sites), sharex=True) + if n_sites == 1: + axs = [axs] + + # Calculate max for y-axis scaling + max_value = max(df[metric_mean] + df[metric_std]) + + for i, site in enumerate(sorted(sites)): + site_data = df[df["site.name"] == site] + ax = axs[i] + + ax.bar( + x=site_data["Month"], + height=site_data[metric_mean], + yerr=site_data[metric_std], + capsize=4, + color=site_colors.get(site, "gray"), + alpha=0.8, + edgecolor="black", + linewidth=0.5, + label=f"Site {site}") + + ax.set_title(f"{site}", fontsize=12) + ax.set_ylim(0, max_value * 1.1) + ax.set_ylabel(y_lab or metric_mean, fontsize=10) + + # Only set x-label on last subplot + if i == n_sites - 1: + ax.set_xlabel("Mois", fontsize=10) + ax.set_xticks( + [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], + [ + "Jan", + "Fev", + "Mar", + "Avr", + "Mai", + "Jun", + "Jul", + "Aou", + "Sep", + "Oct", + "Nov", + "Dec", + ], + ) + if metric_mean in {"%buzzes_mean", "FBR_mean"}: + for _, bar in enumerate(ax.patches): + bar.set_hatch("/") + + fig.suptitle( + f"{title_suffix or metric_mean} per month", + fontsize=16) + plt.xticks(rotation=45) + plt.tight_layout() + plt.show() + + +def hist_mean_h( + df: DataFrame, + metric_mean: str, + metric_std: str, + y_lab: str | None = None, + title_suffix: str | None = None, +) -> None: + """Produce a histogram of the given data. + + It shows mean and standard deviation of the metric. + + Parameters + ---------- + df: DataFrame + All data grouped by site and month + metric_mean: str + Column name for the mean values (e.g., "%click_mean") + metric_std: str + Column name for the standard deviation values (e.g., "%click_std") + y_lab: str, optional + Label for y-axis. If None, uses metric_mean + title_suffix: str, optional + Suffix for the main title. If None, uses metric_mean + + """ + sites = df["site.name"].unique() + n_sites = len(sites) + fig, axs = plt.subplots(n_sites, 1, figsize=(14, 5 * n_sites), sharex=True) + if n_sites == 1: + axs = [axs] + + # Calculate max for y-axis scaling + max_value = max(df[metric_mean] + df[metric_std]) + + for i, site in enumerate(sorted(sites)): + site_data = df[df["site.name"] == site] + ax = axs[i] + + ax.bar( + x=site_data["Hour"], + height=site_data[metric_mean], + yerr=site_data[metric_std], + capsize=4, + color=site_colors.get(site, "gray"), + alpha=0.8, + edgecolor="black", + linewidth=0.5, + label=f"Site {site}", + ) + + ax.set_title(f"{site}", fontsize=12) + ax.set_ylim(0, max_value * 1.1) + ax.set_ylabel(y_lab or metric_mean, fontsize=10) + ax.set_xticks(range(24)) + + # Only set x-label on last subplot + if i == n_sites - 1: + ax.set_xlabel("Heure", fontsize=10) + if metric_mean in {"%buzzes_mean", "FBR_mean"}: + for _, bar in enumerate(ax.patches): + bar.set_hatch("/") + + fig.suptitle(f"{title_suffix or metric_mean} per hour", fontsize=16) + plt.xticks(rotation=45) + plt.tight_layout() + plt.show() + + +def hist_mean_s( + df: DataFrame, + metric_mean: str, + metric_std: str, + y_lab: str | None = None, + title_suffix: str | None = None, +) -> None: + """Plot bar chart with mean values and error bars (std) per site. + + Parameters + ---------- + df: DataFrame + All data grouped by site + metric_mean: str + Column name for the mean values (e.g., "FBR_mean") + metric_std: str + Column name for the standard deviation values (e.g., "FBR_std") + y_lab: str, optional + Label for y-axis. 
If None, uses metric_mean + title_suffix: str, optional + Suffix for the title. If None, uses metric_mean + + """ + fig, ax = plt.subplots(figsize=(10, 6)) + + # Group by site and calculate means if needed + plot_data = df.groupby("site.name")[[metric_mean, metric_std]].mean().reset_index() + + x_pos = range(len(plot_data)) + + # Create bars + ax.bar( + x=x_pos, + height=plot_data[metric_mean], + color=[site_colors.get(site, "gray") for site in plot_data["site.name"]], + alpha=0.8, + edgecolor="black", + linewidth=0.5) + + # Add hatching if requested + if metric_mean in {"%buzzes_mean", "FBR_mean"}: + for _, bar in enumerate(ax.patches): + bar.set_hatch("/") + + # Add error bars + for i, (_, row) in enumerate(plot_data.iterrows()): + # Ensure error bar doesn't go below zero + yerr_lower = min(row[metric_mean], row[metric_std]) + yerr_upper = row[metric_std] + ax.errorbar( + i, + row[metric_mean], + yerr=[[yerr_lower], [yerr_upper]], + fmt="none", + color="black", + capsize=5, + linewidth=2, + ) + + ax.set_xticks(x_pos) + ax.set_xticklabels(plot_data["site.name"]) + ax.set_title(f"{title_suffix or metric_mean} per site", + fontsize=12) + ax.set_ylabel(y_lab or metric_mean, fontsize=10) + ax.set_xlabel("Site", fontsize=10) + + plt.tight_layout() + plt.show() + + +def hist_mean_season( + df: DataFrame, + metric_mean: str, + metric_std: str, + y_lab: str | None = None, + title_suffix: str | None = None, +) -> None: + """Produce a histogram of the given data. + + It shows mean and standard deviation of the metric. + + Parameters + ---------- + df: DataFrame + All data grouped by site and month + metric_mean: str + Column name for the mean values (e.g., "%click_mean") + metric_std: str + Column name for the standard deviation values (e.g., "%click_std") + y_lab: str, optional + Label for y-axis. If None, uses metric_mean + title_suffix: str, optional + Suffix for the main title. If None, uses metric_mean + + """ + sites = df["site.name"].unique() + n_sites = len(sites) + fig, axs = plt.subplots(n_sites, 1, figsize=(14, 5 * n_sites), sharex=True) + if n_sites == 1: + axs = [axs] + + # Calculate max for y-axis scaling + max_value = max(df[metric_mean] + df[metric_std]) + + for i, site in enumerate(sorted(sites)): + site_data = df[df["site.name"] == site] + ax = axs[i] + + ax.bar( + x=site_data["Season"], + height=site_data[metric_mean], + yerr=site_data[metric_std], + capsize=4, + color=site_colors.get(site, "gray"), + alpha=0.8, + edgecolor="black", + linewidth=0.5, + label=f"Site {site}", + ) + + ax.set_title(f"{site}", fontsize=12) + ax.set_ylim(0, max_value * 1.1) + ax.set_ylabel(y_lab or metric_mean, fontsize=10) + + # Only set x-label on last subplot + if i == n_sites - 1: + ax.set_xlabel("Season", fontsize=10) + if metric_mean in {"%buzzes_mean", "FBR_mean"}: + for _, bar in enumerate(ax.patches): + bar.set_hatch("/") + + fig.suptitle(f"{title_suffix or metric_mean} per season", fontsize=16) + plt.xticks(rotation=45) + plt.tight_layout() + plt.show() \ No newline at end of file diff --git a/src/post_processing/utils/glider_utils.py b/src/post_processing/utils/glider_utils.py index 626371c..aaa0224 100644 --- a/src/post_processing/utils/glider_utils.py +++ b/src/post_processing/utils/glider_utils.py @@ -175,7 +175,7 @@ def load_glider_nav(directory: Path) -> DataFrame: msg = f"Directory '{directory}' does not exist." 
raise FileNotFoundError(msg) - file = [f for f in directory.glob("*.gz") if "gli" in f.name] + file = [f for f in directory.rglob("*.gz") if "gli" in f.name] if not len(file) > 0: msg = f"Directory '{directory}' does not contain '.gz' files." diff --git a/src/post_processing/utils/plot_utils.py b/src/post_processing/utils/plot_utils.py index 8d12fa3..4cffcd6 100644 --- a/src/post_processing/utils/plot_utils.py +++ b/src/post_processing/utils/plot_utils.py @@ -239,6 +239,7 @@ def scatter( season = kwargs.get("season", False) coordinates = kwargs.get("coordinates", False) effort = kwargs.get("effort", False) + legend = kwargs.get("legend", False) _prepare_timeline_plot( df=df, @@ -282,6 +283,7 @@ def scatter( shade_no_effort( ax=ax, observed=effort, + legend=legend, ) @@ -577,7 +579,7 @@ def timeline( ax.grid(color="k", linestyle="-", linewidth=0.2) ax.set_yticks(np.arange(0, len(labels), 1)) - ax.set_yticklabels(labels[::-1]) + ax.set_yticklabels(labels) ax.set_xlabel("Date") ax.set_xlim( df["start_datetime"].min().floor("1d"), diff --git a/tests/test_fpod_utils.py b/tests/test_fpod_utils.py new file mode 100644 index 0000000..d1c4ece --- /dev/null +++ b/tests/test_fpod_utils.py @@ -0,0 +1,599 @@ +"""FPOD/ CPOD processing functions tests.""" +import pytest +import pytz +from pandas import DataFrame + +from post_processing.utils.fpod_utils import ( + load_pod_folder, + pod2aplose, +) + +# SAMPLE_POD = """File,ChunkEnd,DPM,Nall,MinsOn +# sample_dataset,2023/11/29 08:05,0,0,0 +# +# """ +# SAMPLE_AP = """dataset,filename,start_time,end_time,start_frequency,end_frequency, +# annotation,annotator,start_datetime,end_datetime,is_box +# sample_dataset,,0,60,0,0,ann1,POD,2023-11-29T08:30:00.000+00:00,2023-11-29T08:31:00.000+00:00,0 +# sample_dataset,,0,60,0,0,ann1,POD,2023-11-29T08:31:00.000+00:00,2023-11-29T08:32:00.000+00:00,0 +# sample_dataset,,0,60,0,0,ann1,POD,2023-11-29T09:30:00.000+00:00,2023-11-29T09:31:00.000+00:00,0 +# sample_dataset,,0,60,0,0,ann1,POD,2023-11-30T08:30:00.000+00:00,2023-11-30T08:31:00.000+00:00,0 +# sample_dataset,,0,60,0,0,ann1,POD,2023-12-29T08:30:00.000+00:00,2023-12-29T08:31:00.000+00:00,0 +# sample_dataset,,0,60,0,0,ann1,POD,2024-11-29T08:30:00.000+00:00,2024-11-29T08:31:00.000+00:00,0 +# """ +# +# @pytest.fixture +# def pod_dataframe() -> DataFrame: +# data = DataFrame( +# { +# "File": [ +# "sample_dataset", +# "sample_dataset", +# "sample_dataset", +# "sample_dataset", +# "sample_dataset", +# "sample_dataset", +# ], +# "ChunkEnd": [ +# Timestamp("2023/11/29 08:30"), +# Timestamp("2023/11/29 08:31"), +# Timestamp("2023/11/29 08:32"), +# Timestamp("2023/11/29 08:33"), +# Timestamp("2023/11/29 08:34"), +# Timestamp("2023/11/29 08:35"), +# ], +# "deploy.name": [ +# "site_deploy", +# "site_deploy", +# "site_deploy", +# "site_deploy", +# "site_deploy", +# "site_deploy", +# ], +# "DPM": [1, 1, 0, 0, 0, 0], +# "Nall": [44, 66, 0, 22, 0, 0], +# "MinsOn": [1, 1, 1, 1, 1, 0], +# }, +# ) +# +# return data.reset_index(drop=True) +# +# +# @pytest.fixture +# def aplose_dataframe() -> DataFrame: +# data = DataFrame( +# { +# "dataset": ["dataset_test", "dataset_test", "dataset_test", "dataset_test", +# "dataset_test", "dataset_test"], +# "filename": ["", "", "", ""], +# "start_time": [0, 0, 0, 0, 0, 0], +# "end_time": [60, 60, 60, 60, 60, 60], +# "start_frequency": [0, 0, 0, 0, 0, 0], +# "end_frequency": [0, 0, 0, 0, 0, 0], +# "annotation": ["ann1", "ann1", "ann1", "ann1", "ann1", "ann1"], +# "annotator": ["POD", "POD", "POD", "POD", "POD", "POD"], +# "start_datetime": [ +# 
Timestamp("2023-11-29T08:30:00.000+00:00"), +# Timestamp("2023-11-29T08:31:00.000+00:00"), +# Timestamp("2023-11-29T09:31:00.000+00:00"), +# Timestamp("2023-11-30T09:31:00.000+00:00"), +# Timestamp("2023-12-30T09:31:00.000+00:00"), +# Timestamp("2024-12-30T09:31:00.000+00:00"), +# ], +# "end_datetime": [ +# Timestamp("2023-11-29T08:31:00.000+00:00"), +# Timestamp("2023-11-29T08:32:00.000+00:00"), +# Timestamp("2023-11-29T09:32:00.000+00:00"), +# Timestamp("2023-11-30T09:32:00.000+00:00"), +# Timestamp("2023-12-30T09:32:00.000+00:00"), +# Timestamp("2024-12-30T09:32:00.000+00:00"), +# ], +# "is_box": [0, 0, 0, 0, 0, 0], +# "deploy.name": ["site_campaign", "site_campaign", "site_campaign", +# "site_campaign", "site_campaign", "site_campaign"], +# }, +# ) +# +# return data.reset_index(drop=True) + +#@pytest.fixture(scope="module") +# @dt.working_directory(__file__) +# def df_raw() -> DataFrame: +# return read_csv("pod_raw.csv") +# +# @pytest.fixture(scope="module") +# @dt.working_directory(__file__) +# def df_ap() -> DataFrame: +# return read_csv("pod_aplose.csv") + +#@pytest.mark.mandatory +# def test_columns(df_raw: DataFrame) -> None: +# dt.validate( +# df_raw.columns, +# {"File", "ChunkEnd", "DPM", "Nall", "MinsOn"}, +# ) +# +# @pytest.mark.mandatory +# def test_columns(df_ap: DataFrame) -> None: +# dt.validate( +# df_ap.columns, +# {"dataset","filename","start_time","end_time","start_frequency","end_frequency", +# "annotation","annotator","start_datetime","end_datetime","is_box"}, +# ) +# +# def test_chunk_end(df_raw: DataFrame) -> None: +# dt.validate(df_raw["ChunkEnd"], +# strptime_from_text(df_raw["ChunkEnd"], "%Y/%m/%d %H:%M")) +# +# def test_start_datetime(df_ap: DataFrame) -> None: +# dt.validate(df_ap["start_datetime"], strptime_from_text(df_ap["start_datetime"], +# "%Y-%m-%dT%H:%M:%S")) + +# @pytest.fixture +# def sample_pod() -> DataFrame: +# df = read_csv(io.StringIO(SAMPLE_POD), parse_dates=["ChunkEnd"]) +# return df.sort_values(["ChunkEnd"]).reset_index(drop=True) + + +# csv_folder +def test_csv_folder_single_file(tmp_path) -> None: + """Test processing a single CSV file.""" + # Create a CSV file + csv_file = tmp_path / "data.csv" + csv_file.write_text("col1;col2\nval1;val2\nval3;val4", encoding="latin-1") + + result = load_pod_folder(tmp_path) + + assert isinstance(result, DataFrame) + assert len(result) == 2 + assert "deploy.name" in result.columns + assert all(result["deploy.name"] == "data") + assert list(result.columns) == ["col1", "col2", "deploy.name"] + + +# pod2aplose +@pytest.fixture +def sample_df(): + """Create a sample POD DataFrame for testing.""" + return DataFrame({ + "ChunkEnd": ["15/01/2024 10:30", "15/01/2024 11:00", "15/01/2024 09:45"], + "deploy.name": ["deploy1", "deploy2", "deploy1"], + }) + + +@pytest.fixture +def timezone(): + """Return UTC timezone for testing.""" + return pytz.UTC + + +def test_pod2aplose_basic_structure(sample_df, timezone): + """Test that basic structure and required columns are present.""" + result = pod2aplose( + df=sample_df, + tz=timezone, + dataset_name="test_dataset", + annotation="test_annotation", + annotator="test_annotator", + ) + + expected_columns = [ + "dataset", + "filename", + "start_time", + "end_time", + "start_frequency", + "end_frequency", + "annotation", + "annotator", + "start_datetime", + "end_datetime", + "is_box", + "deploy.name", + ] + + assert isinstance(result, DataFrame) + assert list(result.columns) == expected_columns + assert len(result) == len(sample_df) + + +def 
test_pod2aplose_dataset_propagation(sample_df, timezone): + """Test that dataset name is propagated to all rows.""" + result = pod2aplose( + df=sample_df, + tz=timezone, + dataset_name="my_dataset", + annotation="click", + annotator="john", + ) + + assert all(result["dataset"] == "my_dataset") + + +def test_pod2aplose_annotation_propagation(sample_df, timezone): + """Test that annotation is propagated to all rows.""" + result = pod2aplose( + df=sample_df, + tz=timezone, + dataset_name="dataset", + annotation="porpoise_click", + annotator="john", + ) + + assert all(result["annotation"] == "porpoise_click") + + +def test_pod2aplose_annotator_propagation(sample_df, timezone): + """Test that annotator is propagated to all rows.""" + result = pod2aplose( + df=sample_df, + tz=timezone, + dataset_name="dataset", + annotation="click", + annotator="alice", + ) + + assert all(result["annotator"] == "alice") + + +def test_pod2aplose_default_bin_size(sample_df, timezone): + """Test default bin_size of 60 seconds.""" + result = pod2aplose( + df=sample_df, + tz=timezone, + dataset_name="dataset", + annotation="click", + annotator="john", + ) + + assert all(result["start_time"] == 0) + assert all(result["end_time"] == 60) + + +def test_pod2aplose_custom_bin_size(sample_df, timezone): + """Test custom bin_size parameter.""" + result = pod2aplose( + df=sample_df, + tz=timezone, + dataset_name="dataset", + annotation="click", + annotator="john", + bin_size=120, + ) + + assert all(result["start_time"] == 0) + assert all(result["end_time"] == 120) + + +def test_pod2aplose_frequency_values(sample_df, timezone): + """Test that frequency values are set to 0.""" + result = pod2aplose( + df=sample_df, + tz=timezone, + dataset_name="dataset", + annotation="click", + annotator="john", + ) + + assert all(result["start_frequency"] == 0) + assert all(result["end_frequency"] == 0) + + +def test_pod2aplose_is_box_values(sample_df, timezone): + """Test that is_box values are set to 0.""" + result = pod2aplose( + df=sample_df, + tz=timezone, + dataset_name="dataset", + annotation="click", + annotator="john", + ) + + assert all(result["is_box"] == 0) + + +def test_pod2aplose_deploy_name_preserved(sample_df, timezone): + """Test that deploy.name values are preserved from input.""" + result = pod2aplose( + df=sample_df, + tz=timezone, + dataset_name="dataset", + annotation="click", + annotator="john", + ) + + # After sorting, deploy.name should still be present + assert "deploy.name" in result.columns + assert len(result["deploy.name"]) == len(sample_df) + assert set(result["deploy.name"]) == {"deploy1", "deploy2"} + + +def test_pod2aplose_sorting_by_datetime(timezone): + """Test that rows are sorted by datetime.""" + df = DataFrame({ + "ChunkEnd": ["15/01/2024 12:00", "15/01/2024 10:00", "15/01/2024 11:00"], + "deploy.name": ["d1", "d2", "d3"], + }) + + result = pod2aplose( + df=df, tz=timezone, dataset_name="dataset", annotation="click", annotator="john" + ) + + # Check that deploy.name follows the sorted order (by time) + assert result["deploy.name"].tolist() == ["d2", "d3", "d1"] + + +def test_pod2aplose_datetime_formatting(): + """Test that datetime strings are properly formatted.""" + df = DataFrame({"ChunkEnd": ["01/02/2024 14:30"], "deploy.name": ["deploy1"]}) + + result = pod2aplose( + df=df, + tz=pytz.UTC, + dataset_name="dataset", + annotation="click", + annotator="john", + bin_size=60, + ) + + # Check that datetime strings are present and not empty + assert len(result["start_datetime"].iloc[0]) > 0 + assert 
len(result["end_datetime"].iloc[0]) > 0 + assert len(result["filename"].iloc[0]) > 0 + + +def test_pod2aplose_end_datetime_offset(timezone): + """Test that end_datetime is offset by bin_size from start_datetime.""" + df = DataFrame({"ChunkEnd": ["15/01/2024 10:00"], "deploy.name": ["deploy1"]}) + + result = pod2aplose( + df=df, + tz=timezone, + dataset_name="dataset", + annotation="click", + annotator="john", + bin_size=120, + ) + + # Both should be valid datetime strings + assert result["start_datetime"].iloc[0] != result["end_datetime"].iloc[0] + + +def test_pod2aplose_different_timezones(): + """Test with different timezone.""" + df = DataFrame({"ChunkEnd": ["15/01/2024 10:00"], "deploy.name": ["deploy1"]}) + + tz_paris = pytz.timezone("Europe/Paris") + + result = pod2aplose( + df=df, tz=tz_paris, dataset_name="dataset", annotation="click", annotator="john" + ) + + assert len(result) == 1 + assert result["dataset"].iloc[0] == "dataset" + + +def test_pod2aplose_empty_dataframe(timezone): + """Test handling of empty DataFrame.""" + df = DataFrame({"ChunkEnd": [], "deploy.name": []}) + + result = pod2aplose( + df=df, tz=timezone, dataset_name="dataset", annotation="click", annotator="john" + ) + + assert len(result) == 0 + assert list(result.columns) == [ + "dataset", + "filename", + "start_time", + "end_time", + "start_frequency", + "end_frequency", + "annotation", + "annotator", + "start_datetime", + "end_datetime", + "is_box", + "deploy.name", + ] + + +def test_pod2aplose_single_row(timezone): + """Test with single row DataFrame.""" + df = DataFrame({"ChunkEnd": ["20/03/2024 15:45"], "deploy.name": ["single_deploy"]}) + + result = pod2aplose( + df=df, + tz=timezone, + dataset_name="dataset", + annotation="click", + annotator="john", + bin_size=90, + ) + + assert len(result) == 1 + assert result["deploy.name"].iloc[0] == "single_deploy" + assert result["end_time"].iloc[0] == 90 + + +def test_pod2aplose_does_not_modify_original(sample_df, timezone): + """Test that the original DataFrame is not modified.""" + original_columns = sample_df.columns.tolist() + original_len = len(sample_df) + + pod2aplose( + df=sample_df, + tz=timezone, + dataset_name="dataset", + annotation="click", + annotator="john", + ) + + # Original DataFrame should be unchanged + assert sample_df.columns.tolist() == original_columns + assert len(sample_df) == original_len + assert "_temp_dt" not in sample_df.columns + + +def test_pod2aplose_large_bin_size(sample_df, timezone): + """Test with large bin_size value.""" + result = pod2aplose( + df=sample_df, + tz=timezone, + dataset_name="dataset", + annotation="click", + annotator="john", + bin_size=3600, # 1 hour + ) + + assert all(result["end_time"] == 3600) + + +def test_pod2aplose_index_reset(timezone): + """Test that index is properly reset after sorting.""" + df = DataFrame({ + "ChunkEnd": ["15/01/2024 12:00", "15/01/2024 10:00"], + "deploy.name": ["d1", "d2"] + }) + + result = pod2aplose( + df=df, + tz=timezone, + dataset_name="dataset", + annotation="click", + annotator="john" + ) + + # Index should be 0, 1 after reset + assert result.index.tolist() == [0, 1] + +# meta_cut_aplose + + +# build_range + + +# feeding_buzz + + +# assign_daytime + + +# fb_folder +# def test_fb_folder_non_existent() -> None: +# with pytest.raises(FileNotFoundError): +# txt_folder(Path("/non/existent/folder")) +# +# def test_fb_folder_no_files(tmp_path: pytest.fixture) -> None: +# with pytest.raises(ValueError, match="No .txt files found"): +# txt_folder(tmp_path) + +# extract_site +# def 
test_extract_site(self) -> None: +# input_data = [ +# {"deploy.name":"Walde_Phase46"}, +# {"deploy.name":"Site A Ile Haute_Phase8"}, +# {"deploy.name":"Site B Ile Heugh_Phase9"}, +# {"deploy.name":"Point E_Phase 4"}, +# ] +# expected_site = [ +# "Walde", +# "Site A Ile Haute", +# "Site B Ile Heugh", +# "Point E", +# ] +# expected_campaign = [ +# "Phase46", +# "Phase8", +# "Phase9", +# "Phase 4", +# ] +# +# for variant, (input_row, site, campaign) in enumerate( +# zip(input_data, expected_site, expected_campaign, strict=False), start=1): +# with self.subTest( +# f"variation #{variant}", +# deploy_name=input_row["deploy.name"], +# expected_site=site, +# expected_campaign=campaign, +# ): +# df = DataFrame([input_row]) +# result = extract_site(df) +# actual_site = result["site.name"].iloc[0] +# actual_campaign = result["campaign.name"].iloc[0] +# +# error_message_site = ( +# f'Called extract_site() with deploy.name="{input_row["deploy.name"]}". ' +# f'The function returned site.name="{actual_site}", but the test ' +# f'expected "{expected_site}".' +# ) +# +# error_message_campaign = ( +# f'Called extract_site() with deploy.name="{input_row["deploy.name"]}". ' +# f'The function returned campaign.name="{actual_campaign}", but the test' +# f'expected "{expected_campaign}".' +# ) +# +# assert actual_site == expected_site, error_message_site +# assert actual_campaign == expected_campaign, error_message_campaign +# +# assert "deploy.name" in result.columns +# assert "value" in result.columns + +# csv_folder +# def test_csv_folder_non_existent() -> None: +# with pytest.raises(FileNotFoundError): +# csv_folder(Path("/non/existent/folder")) +# +# def test_csv_folder_no_files(tmp_path: pytest.fixture) -> None: +# with pytest.raises(ValueError, match="No .csv files found"): +# csv_folder(tmp_path) + +# is_dpm_col + + +# pf_datetime + + +# build_aggregation_dict + + +# resample_dpm + + +# parse_timestamps +# def test_parse_timestamps() -> None: +# df = DataFrame({"date": ["2024-01-01T10:00:00", "06/01/2025 08:35"]}) +# result = parse_timestamps(df, "date") +# expected = DataFrame({"date": ["2024-01-01 10:00:00", +# "2025-01-06 08:35:00"]}).astype("datetime64[ns]") +# assert_frame_equal(result, expected) + +# deploy_period +# def test_deploy_period() -> None: +# df = DataFrame( +# { +# "deploy.name": ["A", "A", "B"], +# "start_datetime": [ +# datetime(2024, 1, 1, 10, 0, tzinfo=datetime.timezone.utc), +# datetime(2024, 1, 2, 15, 30, tzinfo=datetime.timezone.utc), +# datetime(2024, 1, 3, 8, 0, tzinfo=datetime.timezone.utc), +# ], +# }) +# +# expected = DataFrame( +# { +# "deploy.name": ["A", "B"], +# "Début": [ +# datetime(2024, 1, 1, 10, 0, tzinfo=datetime.timezone.utc), +# datetime(2024, 1, 3, 8, 0, tzinfo=datetime.timezone.utc), +# ], +# "Fin": [ +# datetime(2024, 1, 2, 15, 30, tzinfo=datetime.timezone.utc), +# datetime(2024, 1, 3, 8, 0, tzinfo=datetime.timezone.utc), +# ], +# }) +# result = deploy_period(df) +# assert_frame_equal(result, expected) + +# actual_data \ No newline at end of file diff --git a/user_case/config.py b/user_case/config.py new file mode 100644 index 0000000..bf74b37 --- /dev/null +++ b/user_case/config.py @@ -0,0 +1,11 @@ +from pathlib import Path + +import yaml + +config_file = Path(r"C:\Users\fouinel\PycharmProjects\OSmOSE_post_processing\user_case\config.yaml") + +config = yaml.safe_load(config_file.read_text()) if config_file.exists() else {} + +site_colors = config.get("site_colors", {"Site A Haute": "#118B50", "Site B Heugh": "#5DB996", "Site C Chat": "#B0DB9C", "Site D 
Simone": "#E3F0AF", "CA4": "#80D8C3", "Walde": "#4DA8DA", "Point C": "#932F67", "Point D": "#D92C54", "Point E": "#DDDEAB", "Point F": "#8ABB6C", "Point G": "#456882"}) + +season_color = config.get("season_color", {"spring": "green", "summer": "orange", "autumn": "brown", "winter": "blue"}) \ No newline at end of file diff --git a/user_case/config.yaml b/user_case/config.yaml new file mode 100644 index 0000000..6da6e08 --- /dev/null +++ b/user_case/config.yaml @@ -0,0 +1,19 @@ +site_colors: + CA4: '#80D8C3' + Point C: '#932F67' + Point D: '#D92C54' + Point E: '#DDDEAB' + Point F: '#4E61D3' + Point G: '#456882' + Site A Haute: '#118B50' + Site B Heugh: '#5DB996' + Site C Chat: '#B0DB9C' + Site D Simone: '#E3F0AF' + Walde: '#4DA8DA' + 02Mn Sud Cotentin: '#FB4141' + +season_color : + spring: "green" + summer: "orange" + autumn: "brown" + winter: "blue" \ No newline at end of file