Source code for configuration.utils.functions

"""Contains utility functions for data processing and manipulation."""

# Copyright  2025  Institute of Light and Matter, CNRS UMR 5306, University Claude Bernard Lyon 1
# Contributors: Oscar DUFOUR, Maxime STAPELLE, Alexandre NICOLAS

# This software is a computer program designed to generate a realistic crowd from anthropometric data and
# simulate the mechanical interactions that occur within it and with obstacles.

# This software is governed by the CeCILL-B license under French law and abiding by the rules of distribution
# of free software.  You can  use, modify and/ or redistribute the software under the terms of the CeCILL-B
# license as circulated by CEA, CNRS and INRIA at the following URL "http://www.cecill.info".

# As a counterpart to the access to the source code and  rights to copy, modify and redistribute granted by
# the license, users are provided only with a limited warranty  and the software's author,  the holder of the
# economic rights,  and the successive licensors  have only  limited liability.

# In this respect, the user's attention is drawn to the risks associated with loading,  using,  modifying
# and/or developing or reproducing the software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also therefore means  that it is reserved
# for developers  and  experienced professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their requirements in conditions enabling
# the security of their systems and/or data to be ensured and,  more generally, to use and operate it in the
# same conditions as regards security.

# The fact that you are presently reading this means that you have had knowledge of the CeCILL-B license and that
# you accept its terms.

import csv
import io
import pickle
from functools import lru_cache
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd
from numpy.typing import NDArray
from scipy.stats import truncnorm
from shapely.geometry import MultiPolygon, Polygon

import configuration.utils.constants as cst
from configuration.utils.typing_custom import Sex


[docs] @lru_cache(maxsize=4) def load_pickle(file_path: str) -> Any: """ Load data from a pickle file. Parameters ---------- file_path : str A string object representing the path to the pickle file to be loaded. Returns ------- Any The deserialized data loaded from the pickle file. The type of the returned object depends on what was serialized into the pickle file (e.g. list[float], NDArray[np.float64] etc.). Raises ------ TypeError If `file_path` is not a `Path` object. FileNotFoundError If the specified file does not exist. """ if not isinstance(file_path, str): raise TypeError("file_path must be a string.") if not Path(file_path).exists(): raise FileNotFoundError(f"The file {file_path} does not exist.") with open(file_path, "rb") as f: data = pickle.load(f) return data
[docs] def save_pickle(data: Any, file_path: Path) -> None: """ Save data to a pickle file. Parameters ---------- data : Any The data to be serialized and saved. This can be any Python object that is supported by the pickle module. file_path : Path A Path object representing the path where the pickle file will be saved. Raises ------ TypeError If file_path is not a Path object. FileNotFoundError If the directory for file_path does not exist. """ if not isinstance(file_path, Path): raise TypeError("file_path must be a Path object.") if not file_path.parent.exists(): raise FileNotFoundError(f"The directory {file_path.parent} does not exist.") with open(file_path, "wb") as f: pickle.dump(data, f)
[docs] def load_csv(filename: Path) -> pd.DataFrame: """ Load data from a CSV file into a pandas DataFrame. Parameters ---------- filename : Path A Path object representing the path to the CSV file to be loaded. Returns ------- pd.DataFrame A pandas DataFrame containing the data from the CSV file. Raises ------ TypeError If filename is not a Path object. FileNotFoundError If the specified file does not exist. ValueError If the file does not have a .csv extension. """ if not isinstance(filename, Path): raise TypeError("filename must be a Path object.") if not filename.exists(): raise FileNotFoundError(f"The file {filename} does not exist.") if not filename.suffix == ".csv": raise ValueError(f"The file {filename} is not a CSV file.") return pd.read_csv(filename)
[docs] def get_csv_buffer(data_dict: dict[str, list[float | None]]) -> str: """ Generate CSV content from a dictionary for download (in memory). Parameters ---------- data_dict : dict[str, list[float | None]] The dictionary containing the data to be saved. If a value is None, it will be written as an empty cell in the CSV. Returns ------- str The CSV content as a string, ready to be used with Streamlit's download button. """ output = io.StringIO() writer = csv.writer(output) # Write the keys as the header row writer.writerow(data_dict.keys()) # Find the maximum length of the value lists max_len = max(len(v) for v in data_dict.values()) # Write each row of values for i in range(max_len): row = [] for v in data_dict.values(): if i < len(v): row.append("" if v[i] is None else v[i]) else: row.append("") writer.writerow(row) return output.getvalue()
[docs] def wrap_angle(angle: float) -> float: """ Wrap an angle to the range [-180, 180). Parameters ---------- angle : float The angle in degrees to be wrapped. This can be any real number. Returns ------- float The wrapped angle in the range [-180, 180). """ return (angle + 180.0) % 360.0 - 180.0
[docs] def draw_from_trunc_normal(mean: float, std_dev: float, min_val: float, max_val: float) -> float: """ Draw a sample from a truncated normal distribution. This function generates a random sample from a normal distribution that is truncated within the range [min_val, max_val]. The truncation ensures that the sample lies within the specified bounds. Parameters ---------- mean : float The mean of the normal distribution. std_dev : float The standard deviation of the normal distribution. min_val : float The lower bound of the truncated normal distribution. max_val : float The upper bound of the truncated normal distribution. Returns ------- float A sample drawn from the truncated normal distribution. Raises ------ ValueError If std_dev is less than or equal to zero, or if min_val is greater than or equal to max_val. """ if std_dev <= 0: raise ValueError("Standard deviation must be greater than zero.") if min_val >= max_val: raise ValueError("min_val must be less than max_val.") # Calculate standardized bounds for truncation a = (min_val - mean) / std_dev b = (max_val - mean) / std_dev # Draw a sample from the truncated normal distribution return float(truncnorm.rvs(a, b, loc=mean, scale=std_dev))
[docs] def draw_sex(p: float) -> Sex: """ Randomly draw a sex (`male` or `female`) based on the input proportion of `male`. Parameters ---------- p : float A proportion value in [0,1], representing the probability of selecting `male`. Returns ------- Sex `male` if a randomly generated number is less than `p`; otherwise, `female`. Raises ------ ValueError If the probability `p` is not in [0,1]. """ # Check if the probability is between 0 and 1 if not 0 <= p <= 1: raise ValueError("Probability p must be between 0 and 1.") # Draw a random number and return the sex return np.random.choice(["male", "female"], p=[p, 1 - p])
[docs] def cross2d(Pn: NDArray[np.float64], Pn1: NDArray[np.float64]) -> float: """ Compute the 2D cross product of two vectors defined as `Pn[0] * Pn1[1] - Pn[1] * Pn1[0]`. Parameters ---------- Pn : NDArray[np.float64] A 1D NumPy array of shape (2,) representing the first 2D vector in the form [x, y]. Pn1 : NDArray[np.float64] A 1D NumPy array of shape (2,) representing the second 2D vector in the form [x, y]. Returns ------- float The magnitude of the resulting perpendicular vector to the plane formed by the input vectors. """ return float(Pn[0] * Pn1[1] - Pn[1] * Pn1[0])
[docs] def compute_moment_of_inertia(geometric_shape: Polygon | MultiPolygon, weight: float) -> float: """ Compute the moment of inertia for a 2D Polygon or MultiPolygon. This function calculates the moment of inertia (I_z) for a 2D shape represented as a polygon based on its vertices and weight. The calculation is performed using the second moment of area formula, assuming the polygon is in the XY-plane. For more details on the second moment of area, refer to: https://en.wikipedia.org/wiki/Second_moment_of_area. Parameters ---------- geometric_shape : Polygon | MultiPolygon The geometrical representation as a shapely Polygon or MultiPolygon object (cm). weight : float The agent weight (kg). Returns ------- float The computed moment of inertia for the shape (kg·m²). Notes ----- For the MultiPolygon case, it computes the moment of inertia for each polygon and sums them up, weighted by their respective areas. """ def polygon_inertia(polygon: Polygon, poly_weight: float) -> float: """ Compute the moment of inertia for a 2D Polygon. Parameters ---------- polygon : Polygon The geometrical representation as a Polygon object (cm). poly_weight : float The agent weight (kg). Returns ------- float The moment of inertia of the polygon (kg·m²). """ vertices = np.array(polygon.exterior.coords) centroid = np.array(polygon.centroid.coords[0]) vertices = vertices - centroid # Shift to centroid N = len(vertices) - 1 # Last point repeats the first rho = poly_weight / polygon.area # Density (mass per unit area) I_z = 0.0 for n in range(N): Pn = np.array(vertices[n]) Pn1 = np.array(vertices[(n + 1) % N]) cross_product_magnitude = abs(cross2d(Pn, Pn1)) dot_product_terms = np.dot(Pn, Pn) + np.dot(Pn, Pn1) + np.dot(Pn1, Pn1) I_z += cross_product_magnitude * dot_product_terms moment_of_inertia: float = rho * I_z / 12.0 moment_of_inertia *= 1e-4 # convert to kg·m^2 return moment_of_inertia if isinstance(geometric_shape, Polygon): return polygon_inertia(geometric_shape, weight) if isinstance(geometric_shape, MultiPolygon): total_area = geometric_shape.area moment = 0.0 for poly in geometric_shape.geoms: poly_area = poly.area poly_weight = weight * (poly_area / total_area) moment += polygon_inertia(poly, poly_weight) return moment raise TypeError("Input must be a Shapely Polygon or MultiPolygon.")
[docs] def validate_material(material: str) -> None: """ Validate if the given material is in MaterialNames. Parameters ---------- material : str The material name to validate. """ if material not in cst.MaterialNames.__members__: raise ValueError(f"Material '{material}' is not supported. Expected one of: {list(cst.MaterialNames.__members__.keys())}.")
[docs] def rotate_vectors(vector_dict: dict[str, tuple[float, float]], theta: float) -> dict[str, tuple[float, float]]: """ Rotate 2D vectors in a dictionary by a given angle. Parameters ---------- vector_dict : dict[str, tuple[float, float]] A dictionary where each key maps to a 2D vector represented as a tuple (x, y). theta : float The angle in degrees by which to rotate the vectors. Returns ------- dict[str, tuple[float, float]] A dictionary with the same keys, where each vector has been rotated by the given angle. """ theta_rad = np.radians(theta) # Convert angle to radians rotated_dict = {} for key, (x, y) in vector_dict.items(): # Apply rotation matrix x_rot = x * np.cos(theta_rad) - y * np.sin(theta_rad) y_rot = x * np.sin(theta_rad) + y * np.cos(theta_rad) rotated_dict[key] = (x_rot, y_rot) return rotated_dict
[docs] def compute_bideltoid_breadth_from_multipolygon(multi_polygon: MultiPolygon) -> float: """ Compute the largest horizontal distance (bideltoid breadth) between points in a MultiPolygon object. Parameters ---------- multi_polygon : MultiPolygon A MultiPolygon object. Returns ------- float The largest horizontal distance (bideltoid breadth). Notes ----- To accelerate that function, only pairs of points with almost the same y-coordinate are considered. Therefore it is assumed that the input is a MultiPolygon object coming from the body3D of a pedestrian that has not been rotated. """ if not isinstance(multi_polygon, MultiPolygon): raise ValueError("Input must be a Shapely MultiPolygon object.") # Assuming multi_polygon is a Shapely MultiPolygon object center_of_mass = multi_polygon.centroid # Combine boundary coordinates from all polygons, subtracting centroid all_coords = np.array( [(coord[0] - center_of_mass.x, coord[1] - center_of_mass.y) for poly in multi_polygon.geoms for coord in poly.boundary.coords] ) # Sort points by their y-coordinate sorted_coords = all_coords[np.argsort(all_coords[:, 1])] # Use a sliding window to find pairs of points with similar y-coordinates tolerance = 1e-1 # Adjust this value based on precision needs max_distance = 0.0 i = 0 while i < len(sorted_coords): j = i + 1 while j < len(sorted_coords) and abs(sorted_coords[j, 1] - sorted_coords[i, 1]) <= tolerance: # Compute horizontal distance (x-difference) distance = abs(sorted_coords[j, 0] - sorted_coords[i, 0]) max_distance = max(max_distance, distance) j += 1 i += 1 return max_distance
[docs] def compute_chest_depth_from_multipolygon(multi_polygon: MultiPolygon) -> float: """ Compute the largest vertical distance (chest depth) in a MultiPolygon object. Parameters ---------- multi_polygon : MultiPolygon A MultiPolygon object. Returns ------- float The largest vertical distance (chest depth). Notes ----- To accelerate that function, only pairs of points with almost the same x-coordinate are considered. Therefore it is assumed that the input is a MultiPolygon object coming from the body3D of a pedestrian that has not been rotated. """ if not isinstance(multi_polygon, MultiPolygon): raise ValueError("Input must be a Shapely MultiPolygon object.") # Assuming multi_polygon is a Shapely MultiPolygon object center_of_mass = multi_polygon.centroid # Combine boundary coordinates from all polygons, subtracting centroid all_coords = np.array( [(coord[0] - center_of_mass.x, coord[1] - center_of_mass.y) for poly in multi_polygon.geoms for coord in poly.boundary.coords] ) # Sort points by their x-coordinate sorted_coords = all_coords[np.argsort(all_coords[:, 0])] # Use a sliding window to find pairs of points with similar x-coordinates tolerance = 1e-1 # Adjust this value based on precision needs max_distance = 0.0 i = 0 while i < len(sorted_coords): j = i + 1 while j < len(sorted_coords) and abs(sorted_coords[j, 0] - sorted_coords[i, 0]) <= tolerance: # Compute vertical distance (y-difference) distance = abs(sorted_coords[j, 1] - sorted_coords[i, 1]) max_distance = max(max_distance, distance) j += 1 i += 1 return max_distance
[docs] def from_string_to_tuple(string: str) -> tuple[float, float]: """ Convert a string representation of a tuple to an actual tuple of two floats. Parameters ---------- string : str The string representation of the tuple, e.g., "1.0, 2.0" or "(1.0, 2.0)". Returns ------- tuple[float, float] The converted tuple of floats, e.g., (1.0, 2.0). Raises ------ ValueError If `string` is not in the expected format. """ if not isinstance(string, str): raise ValueError("Input must be a string.") # Remove optional parentheses and strip whitespace s = string.strip() if s.startswith("(") and s.endswith(")"): s = s[1:-1].strip() parts = [x.strip() for x in s.split(",")] if len(parts) != 2: raise ValueError("`string` must contain exactly two numbers separated by a comma.") try: return float(parts[0]), float(parts[1]) except ValueError as exc: raise ValueError("Both elements must be convertible to float.") from exc
[docs] def sigmoid(x: float, smoothing: float) -> float: """ Compute the numerically stable sigmoid function. Parameters ---------- x : float Input value. smoothing : float Smoothing parameter. Returns ------- float The output of the sigmoid function. """ if smoothing <= 0: raise ValueError("Smoothing parameter must be positive.") X = x / smoothing # Numerically stable sigmoid if X >= 0: z = np.exp(-X) return float(1.0 / (1.0 + z)) z = np.exp(X) return float(z / (1.0 + z))
[docs] def rectangular_function(height: float, scale_xy: float, sex: Sex | str) -> float: """ Compute the value of a door function evaluated in height, based on scale, and sex parameters. Parameters ---------- height : float The variable of the function. scale_xy : float The scale parameter for the rectangular function. sex : Sex or str The sex of a pedestrian, either as a Sex enum or string ("male" or "female"). Returns ------- float The value of the rectangular function. Raises ------ ValueError If an invalid sex is provided. """ # Normalize sex input if isinstance(sex, str): sex = sex.lower() if sex == "male": sex_enum = cst.Sex.male.name elif sex == "female": sex_enum = cst.Sex.female.name else: raise ValueError(f"Invalid sex: {sex}") elif isinstance(sex, Sex): sex_enum = sex else: raise ValueError(f"Invalid sex type: {type(sex)}") if sex_enum == cst.Sex.female.name: neck_height = cst.NECK_HEIGHT_FEMALE knees_height = cst.KNEES_HEIGHT_FEMALE else: neck_height = cst.NECK_HEIGHT_MALE knees_height = cst.KNEES_HEIGHT_MALE return 1.0 + (scale_xy - 1.0) * sigmoid(neck_height - height, cst.EPSILON_SMOOTHING_NECK) * sigmoid( height - knees_height, cst.EPSILON_SMOOTHING_KNEES )
[docs] def direction_of_longest_side(polygon: Polygon) -> float: """ Compute the direction (in degrees) of the longest side of a 4-vertex polygon. The direction is measured from the first vertex of the side to the second, relative to the positive x-axis. Parameters ---------- polygon : Polygon A Polygon object. Returns ------- float The direction of the longest side, in degrees [0-360). """ coords = np.array(polygon.exterior.coords[:-1]) # Exclude the repeated last point assert coords.shape[0] == 4, "Polygon must have exactly 4 vertices." # Compute vectors for each side vectors = np.roll(coords, -1, axis=0) - coords # shape (4, 2) lengths = np.linalg.norm(vectors, axis=1) angles_rad = np.arctan2(vectors[:, 1], vectors[:, 0]) angles_deg = np.degrees(angles_rad) # Find the index of the longest side idx = np.argmax(lengths) return wrap_angle(angles_deg[idx])
[docs] def filter_dict_by_not_None_values(input_dict: dict[str, Any]) -> dict[str, Any]: """ Filter a dictionary to remove keys with None values. Parameters ---------- input_dict : dict[str, Any] The input dictionary to be filtered. Returns ------- dict[str, Any] A new dictionary containing only the key-value pairs where the value is not None. """ return {k: v for k, v in input_dict.items() if v is not None and v != []}
[docs] def k_from_EG(E1: float, G1: float, E2: float, G2: float) -> tuple[float, float]: r""" Compute ``k_perp`` and ``k_par`` from ``E1``, ``G1``, ``E2``, ``G2``. The spring constants are given by: .. math:: k^{\perp} = \left(\frac{4G_1 - E_1}{4G_1^2} + \frac{4G_2 - E_2}{4G_2^2}\right)^{-1}, \\ k^{\parallel} = \left(\frac{6G_1 - E_1}{8G_1^2} + \frac{6G_2 - E_2}{8G_2^2}\right)^{-1}. Parameters ---------- E1 : float Young's modulus of material 1 (N/m). G1 : float Shear modulus of material 1 (N/m). E2 : float Young's modulus of material 2 (N/m). G2 : float Shear modulus of material 2 (N/m). Returns ------- k_perp : float Spring constant for the direction orthogonal to the surface contact (N/m). k_par : float Spring constant for the direction parallel to the surface contact (N/m). Notes ----- The elastic moduli are assumed to be for 2D systems and thus have units of N/m. """ inv_k_perp = (4 * G1 - E1) / (4 * G1**2) + (4 * G2 - E2) / (4 * G2**2) inv_k_par = (6 * G1 - E1) / (8 * G1**2) + (6 * G2 - E2) / (8 * G2**2) k_perp = 1.0 / inv_k_perp k_par = 1.0 / inv_k_par return k_perp, k_par
[docs] def EG_from_k(k_perp: float, k_par: float) -> tuple[float, float]: r""" Compute ``E`` and ``G`` from ``k_perp`` and ``k_par``, assuming both materials in contact are identical. The moduli are given by: .. math:: G = \frac{k^{\parallel} k^{\perp}}{2 k^{\perp} - k^{\parallel}}, \\ E = \frac{2 k^{\parallel} k^{\perp} (4 k^{\perp} - 3 k^{\parallel})}{(k^{\parallel} - 2 k^{\perp})^2}. Parameters ---------- k_perp : float Spring constant for the direction orthogonal to the surface contact (N/m). k_par : float Spring constant for the direction parallel to the surface contact (N/m). Returns ------- E : float Young's modulus (N/m). G : float Shear modulus (N/m). Notes ----- The elastic moduli are assumed to be for 2D systems and thus have units of N/m. """ G = k_par * k_perp / (2 * k_perp - k_par) E = 2 * k_par * k_perp * (4 * k_perp - 3 * k_par) / (k_par - 2 * k_perp) ** 2 return E, G
[docs] def G_from_E_nu(E: float, nu: float) -> float: """ Compute shear modulus G from Young's modulus E and Poisson's ratio nu. Parameters ---------- E : float Young's modulus (N/m). nu : float Poisson's ratio (dimensionless). Returns ------- float Shear modulus G (N/m). Notes ----- The elastic moduli are assumed to be for 2D systems and thus have units of N/m. """ return E / (2 * (1 + nu))