"""Pedestrian visualization tab."""
# Copyright 2025 Institute of Light and Matter, CNRS UMR 5306, University Claude Bernard Lyon 1
# Contributors: Oscar DUFOUR, Maxime STAPELLE, Alexandre NICOLAS
# This software is a computer program designed to generate a realistic crowd from anthropometric data and
# simulate the mechanical interactions that occur within it and with obstacles.
# This software is governed by the CeCILL-B license under French law and abiding by the rules of distribution
# of free software. You can use, modify and/ or redistribute the software under the terms of the CeCILL-B
# license as circulated by CEA, CNRS and INRIA at the following URL "http://www.cecill.info".
# As a counterpart to the access to the source code and rights to copy, modify and redistribute granted by
# the license, users are provided only with a limited warranty and the software's author, the holder of the
# economic rights, and the successive licensors have only limited liability.
# In this respect, the user's attention is drawn to the risks associated with loading, using, modifying
# and/or developing or reproducing the software by the user in light of its specific status of free software,
# that may mean that it is complicated to manipulate, and that also therefore means that it is reserved
# for developers and experienced professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their requirements in conditions enabling
# the security of their systems and/or data to be ensured and, more generally, to use and operate it in the
# same conditions as regards security.
# The fact that you are presently reading this means that you have had knowledge of the CeCILL-B license and that
# you accept its terms.
import pickle
from datetime import datetime
from io import BytesIO
import matplotlib.pyplot as plt
import numpy as np
import plotly.graph_objects as go
import streamlit as st
from shapely.geometry import Polygon
import configuration.backup.crowd_to_dict as fun_dict
import configuration.backup.crowd_to_zip_and_reverse as fun_zip
import configuration.backup.dict_to_xml_and_reverse as fun_xml
import configuration.utils.constants as cst
import configuration.utils.functions as fun
import streamlit_app.utils.constants as cst_app
from configuration.models.crowd import Crowd, create_agents_from_dynamic_static_geometry_parameters
from configuration.models.measures import CrowdMeasures
from configuration.utils.typing_custom import DynamicCrowdDataType, GeometryDataType, StaticCrowdDataType
from streamlit_app.plot import plot
[docs]
def initialize_session_state() -> None:
"""Initialize Streamlit session state variables."""
default_values = {
"pedestrian_proportion": cst.CrowdStat["pedestrian_proportion"],
"bike_proportion": cst.CrowdStat["bike_proportion"],
"male_proportion": cst.CrowdStat["male_proportion"],
"male_chest_depth_mean": cst.CrowdStat["male_chest_depth_mean"],
"male_bideltoid_breadth_mean": cst.CrowdStat["male_bideltoid_breadth_mean"],
"male_height_mean": cst.CrowdStat["male_height_mean"],
"male_weight_mean": cst.CrowdStat["male_weight_mean"],
"female_chest_depth_mean": cst.CrowdStat["female_chest_depth_mean"],
"female_bideltoid_breadth_mean": cst.CrowdStat["female_bideltoid_breadth_mean"],
"female_height_mean": cst.CrowdStat["female_height_mean"],
"female_weight_mean": cst.CrowdStat["female_weight_mean"],
"wheel_width_mean": cst.CrowdStat["wheel_width_mean"],
"total_length_mean": cst.CrowdStat["total_length_mean"],
"handlebar_length_mean": cst.CrowdStat["handlebar_length_mean"],
"top_tube_length_mean": cst.CrowdStat["top_tube_length_mean"],
"boundary_x": cst_app.DEFAULT_BOUNDARY_X,
"boundary_y": cst_app.DEFAULT_BOUNDARY_Y,
"bike_weight": cst.CrowdStat["bike_weight_mean"],
"repulsion_length": cst_app.DEFAULT_REPULSION_LENGTH_MIN,
"wall_interaction": cst_app.DEFAULT_WALL_INTERACTION,
"simulation_run": True,
"desired_direction": cst.DEFAULT_DESIRED_DIRECTION,
"variable_orientation": False,
"plot_twoD_run": True,
"plot_threeD_run": True,
"plot_threeD_layers_run": True,
"twoD_scene": plt.figure(),
"threeD_scene": go.Figure(),
"threeD_layers": go.Figure(),
"selected_packing_option": "grid",
"pack_options": {"grid": "Grid", "pack": "Custom packing \n (time consuming)"},
}
for key, value in default_values.items():
if key not in st.session_state:
st.session_state[key] = value
if "current_crowd" not in st.session_state:
initial_boundaries = create_boundaries(cst_app.DEFAULT_BOUNDARY_X, cst_app.DEFAULT_BOUNDARY_Y)
current_crowd = Crowd(boundaries=initial_boundaries)
current_crowd.create_agents(cst_app.DEFAULT_AGENT_NUMBER)
st.session_state.current_crowd = current_crowd
st.session_state.crowd_measures = current_crowd.measures
if "num_agents" not in st.session_state:
st.session_state.num_agents = st.session_state.current_crowd.get_number_agents()
[docs]
def parameter_changed() -> None:
"""Update the Streamlit session state to indicate that a simulation should be run."""
st.session_state.simulation_run = True
st.session_state.plot_twoD_run = True
st.session_state.plot_threeD_run = True
st.session_state.plot_threeD_layers_run = True
[docs]
def create_boundaries(boundary_x: float, boundary_y: float) -> Polygon:
"""
Create a polygon representing the room boundaries.
Parameters
----------
boundary_x : float
Half-width of the room.
boundary_y : float
Half-height of the room.
Returns
-------
Polygon
A polygon object representing the rectangular room boundaries.
"""
return Polygon(
[
(-boundary_x / 2.0, -boundary_y / 2.0),
(boundary_x / 2.0, -boundary_y / 2.0),
(boundary_x / 2.0, boundary_y / 2.0),
(-boundary_x / 2.0, boundary_y / 2.0),
]
)
[docs]
def update_crowd(boundaries: Polygon, num_agents: int) -> Crowd:
"""
Create and return a new Crowd object.
Parameters
----------
boundaries : Polygon
The boundaries for the simulation area.
num_agents : int
The number of agents to create in the crowd.
Returns
-------
Crowd
A new Crowd object with the specified boundaries and agents.
"""
crowd = Crowd(boundaries=boundaries)
crowd.create_agents(num_agents)
return crowd
[docs]
def display_interpenetration_warning() -> None:
"""Display a warning if interpenetration is too high."""
interpenetration_between_agents, interpenetration_with_boundaries = st.session_state.current_crowd.calculate_interpenetration()
string_packing = " or pack closely." if st.session_state.selected_packing_option == st.session_state.pack_options["grid"] else "."
if interpenetration_between_agents > 1e-4:
st.warning(
f"The interpenetration area **between agents** is {interpenetration_between_agents:.2f} cm².\n"
+ "Please rerun or increase the boundaries"
+ string_packing,
icon="⚠️",
)
if st.session_state.wall_interaction:
if interpenetration_with_boundaries > 1e-4:
st.warning(
f"The interpenetration area **with boundaries** is {interpenetration_with_boundaries:.2f} cm².\n"
+ "Please rerun or increase the boundaries or pack closely"
+ string_packing,
icon="⚠️",
)
[docs]
def display_table(data: dict[str, float | int]) -> None:
"""
Display a markdown table in a Streamlit column.
Parameters
----------
data : dict[str, float | int]
The data to display in the table.
"""
if data:
table_md = "| Measure | Value |\n|---|---|\n"
for key, value in data.items():
table_md += f"| {key.capitalize()} | {np.round(value, 2)} |\n"
st.markdown(table_md)
else:
st.info("No data.")
[docs]
def display_crowd_statistics(crowd_statistics_measures: dict[str, float | int | None]) -> None:
"""
Display crowd statistics in a Streamlit app, organized into four side-by-side tables.
Parameters
----------
crowd_statistics_measures : dict[str, float | int | None]
A dictionary containing crowd statistics measures.
"""
filtered_measures = {k: v for k, v in crowd_statistics_measures.items() if v is not None}
st.write("### Measured crowd statistics")
# Group keys
group1 = {k: v for k, v in filtered_measures.items() if "proportion" in k}
group2 = {k: v for k, v in filtered_measures.items() if "male" in k and "female" not in k and k not in group1}
group3 = {k: v for k, v in filtered_measures.items() if "female" in k and k not in set(group1) | set(group2)}
# Exclude keys already in other groups for group4
used_keys = set(group1) | set(group2) | set(group3)
group4 = {k: v for k, v in filtered_measures.items() if k not in used_keys}
# Prepare non-empty groups
groups: list[tuple[str, dict[str, float | int]]] = []
if group1:
groups.append(("Proportion", group1))
if group2:
groups.append(("Male", group2))
if group3:
groups.append(("Female", group3))
if group4:
groups.append(("Bike", group4))
if not groups:
st.info("No statistics available to display.")
return
tab_titles = [title for title, _ in groups]
tabs = st.tabs(tab_titles)
for tab, (_, data) in zip(tabs, groups, strict=False):
with tab:
display_table(data)
[docs]
def plot_and_download_crowd2D(current_crowd: Crowd) -> None:
"""
Plot the crowd and provide download options.
Parameters
----------
current_crowd : Crowd
The Crowd object to be plotted and downloaded.
"""
crowd_statistics = current_crowd.get_crowd_statistics()
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Display section
col1, col2 = st.columns([1.5, 1])
with col1:
st.subheader("Visualisation")
if st.session_state.plot_twoD_run:
st.session_state.twoD_scene = plot.display_crowd2D(current_crowd)[0]
st.session_state.plot_twoD_run = False
st.pyplot(st.session_state.twoD_scene)
crowd_plot = BytesIO()
st.session_state.twoD_scene.savefig(crowd_plot, format="pdf")
crowd_plot.seek(0)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
st.download_button(
label="Download plot as PDF",
data=crowd_plot,
file_name=f"crowd_{timestamp}.pdf",
mime="application/pdf",
)
with col2:
display_crowd_statistics(crowd_statistics["measures"])
# Download section
st.sidebar.header("Download")
# check if all agents in the Crowd are pedestrian
if all(agent.agent_type == cst.AgentTypes.pedestrian for agent in current_crowd.agents):
filename = f"crowd2D_{timestamp}.zip"
zip_buffer = fun_zip.write_crowd_data_to_zip(current_crowd)
# Add download button for the ZIP file
st.sidebar.download_button(
label="Export crowd as XML config files",
data=zip_buffer,
file_name=filename,
mime="application/zip",
width="stretch",
)
if cst_app.SHOW_DEV:
filename = f"crowd2D_{timestamp}.xml"
data_dict = fun_dict.get_light_agents_params(current_crowd)
data = fun_xml.save_light_agents_params_dict_to_xml(data_dict)
st.sidebar.download_button(
label="Export crowd as XML config file",
data=data,
file_name=filename,
mime="application/xml",
help="Export basic information about the crowd to a single XML file",
width="stretch",
)
# Download the crowd statistics as a CSV file
filename = f"crowd_statistics_{timestamp}.csv"
data = fun.get_csv_buffer(crowd_statistics["stats_lists"])
st.sidebar.download_button(
label="Export distributions as CSV file",
data=data,
file_name=filename,
mime="text/csv",
help="Export all the measured data used to compute the statistics given in the table as CSV file",
width="stretch",
)
else:
filename = f"crowd2D_{timestamp}.xml"
data_dict = fun_dict.get_light_agents_params(current_crowd)
data = fun_xml.save_light_agents_params_dict_to_xml(data_dict)
st.sidebar.download_button(
label="Export crowd as XML config file",
data=data,
file_name=filename,
mime="application/xml",
help="Export basic information about the crowd to a single XML file",
width="stretch",
)
# Download the crowd statistics as a CSV file
filename = f"crowd_statistics_{timestamp}.csv"
data = fun.get_csv_buffer(crowd_statistics["stats_lists"])
st.sidebar.download_button(
label="Export distributions as CSV file",
data=data,
file_name=filename,
mime="text/csv",
help="Export all the measured data used to compute the statistics given in the table as CSV file",
width="stretch",
)
# Display information about the covered area
st.info(f"Total area covered by the agents: {current_crowd.calculate_covered_area():.2f} cm²", icon="ℹ️")
[docs]
def agent_statistics_state(new_boundaries: Polygon, num_agents: int) -> None:
"""
Create custom statistics and update the session state.
Parameters
----------
new_boundaries : Polygon
The new boundaries for the simulation area.
num_agents : int
The number of agents in the simulation.
"""
pedestrian_proportion = st.sidebar.slider(
"Proportion of pedestrians",
min_value=0.0,
max_value=1.0,
value=st.session_state.pedestrian_proportion,
step=0.1,
on_change=parameter_changed,
)
st.session_state.pedestrian_proportion = pedestrian_proportion
bike_proportion = 1 - pedestrian_proportion
st.session_state.bike_proportion = bike_proportion
if st.session_state.pedestrian_proportion > 0:
male_proportion = st.sidebar.slider(
"Proportion of male",
min_value=0.0,
max_value=1.0,
value=st.session_state.male_proportion,
step=0.1,
on_change=parameter_changed,
)
st.session_state.male_proportion = male_proportion
if st.session_state.male_proportion != 0.0:
male_chest_depth_mean = st.sidebar.slider(
"Male mean chest depth",
min_value=cst.CrowdStat["male_chest_depth_min"],
max_value=cst.CrowdStat["male_chest_depth_max"],
value=st.session_state.male_chest_depth_mean,
step=1.0,
on_change=parameter_changed,
)
st.session_state.male_chest_depth_mean = male_chest_depth_mean
male_bideltoid_breadth_mean = st.sidebar.slider(
"Male mean bideltoid breadth",
min_value=cst.CrowdStat["male_bideltoid_breadth_min"],
max_value=cst.CrowdStat["male_bideltoid_breadth_max"],
value=st.session_state.male_bideltoid_breadth_mean,
step=1.0,
on_change=parameter_changed,
)
st.session_state.male_bideltoid_breadth_mean = male_bideltoid_breadth_mean
male_height_mean = st.sidebar.slider(
"Male mean height",
min_value=cst.CrowdStat["male_height_min"],
max_value=cst.CrowdStat["male_height_max"],
value=st.session_state.male_height_mean,
step=1.0,
on_change=parameter_changed,
)
st.session_state.male_height_mean = male_height_mean
if st.session_state.male_proportion != 1.0:
female_chest_depth_mean = st.sidebar.slider(
"Female mean chest depth",
min_value=cst.CrowdStat["male_chest_depth_min"],
max_value=cst.CrowdStat["male_chest_depth_max"],
value=st.session_state.male_chest_depth_mean,
step=1.0,
on_change=parameter_changed,
)
st.session_state.female_chest_depth_mean = female_chest_depth_mean
female_bideltoid_breadth_mean = st.sidebar.slider(
"Female mean bideltoid breadth",
min_value=cst.CrowdStat["male_bideltoid_breadth_min"],
max_value=cst.CrowdStat["male_bideltoid_breadth_max"],
value=st.session_state.female_bideltoid_breadth_mean,
step=1.0,
on_change=parameter_changed,
)
st.session_state.female_bideltoid_breadth_mean = female_bideltoid_breadth_mean
female_height_mean = st.sidebar.slider(
"Female mean height",
min_value=cst.CrowdStat["female_height_min"],
max_value=cst.CrowdStat["female_height_max"],
value=st.session_state.female_height_mean,
step=1.0,
on_change=parameter_changed,
)
st.session_state.female_height_mean = female_height_mean
if st.session_state.bike_proportion > 0.0:
wheel_width_mean = st.sidebar.slider(
"Wheel width mean",
min_value=cst.CrowdStat["wheel_width_min"],
max_value=cst.CrowdStat["wheel_width_max"],
value=st.session_state.wheel_width_mean,
step=1.0,
on_change=parameter_changed,
)
st.session_state.wheel_width_mean = wheel_width_mean
total_length_mean = st.sidebar.slider(
"Total length mean",
min_value=cst.CrowdStat["total_length_min"],
max_value=cst.CrowdStat["total_length_max"],
value=st.session_state.total_length_mean,
step=1.0,
on_change=parameter_changed,
)
st.session_state.total_length_mean = total_length_mean
handlebar_length_mean = st.sidebar.slider(
"Handlebar length mean",
min_value=cst.CrowdStat["handlebar_length_min"],
max_value=cst.CrowdStat["handlebar_length_max"],
value=st.session_state.handlebar_length_mean,
step=1.0,
on_change=parameter_changed,
)
st.session_state.handlebar_length_mean = handlebar_length_mean
top_tube_length_mean = st.sidebar.slider(
"Top tube length mean",
min_value=cst.CrowdStat["top_tube_length_min"],
max_value=cst.CrowdStat["top_tube_length_max"],
value=st.session_state.top_tube_length_mean,
step=1.0,
on_change=parameter_changed,
)
st.session_state.top_tube_length_mean = top_tube_length_mean
# Initialize agent_statistics with default values from cst.CrowdStat
agent_statistics = cst.CrowdStat.copy()
# Override specific values with st.session_state where applicable
agent_statistics.update(
{
"male_proportion": st.session_state.male_proportion,
"pedestrian_proportion": st.session_state.pedestrian_proportion,
"bike_proportion": st.session_state.bike_proportion,
"male_bideltoid_breadth_mean": st.session_state.male_bideltoid_breadth_mean,
"male_chest_depth_mean": st.session_state.male_chest_depth_mean,
"male_height_mean": st.session_state.male_height_mean,
"female_bideltoid_breadth_mean": st.session_state.female_bideltoid_breadth_mean,
"female_chest_depth_mean": st.session_state.female_chest_depth_mean,
"female_height_mean": st.session_state.female_height_mean,
"wheel_width_mean": st.session_state.wheel_width_mean,
"total_length_mean": st.session_state.total_length_mean,
"handlebar_length_mean": st.session_state.handlebar_length_mean,
}
)
crowd_measures = CrowdMeasures(agent_statistics=agent_statistics)
# Check if the measures have changed
if st.session_state.simulation_run:
# Update session state if measures have changed
st.session_state.crowd_measures = crowd_measures
# Create a new crowd with updated boundaries and measures
current_crowd = Crowd(boundaries=new_boundaries, measures=crowd_measures)
current_crowd.create_agents(num_agents)
# Update the session state with the new crowd
st.session_state.current_crowd = current_crowd
[docs]
def boundaries_state() -> Polygon:
"""
Create room boundaries and update the session state.
Returns
-------
Polygon
The new boundaries of the room.
"""
if st.session_state.wall_interaction:
boundary_x = st.sidebar.number_input(
"Length X (cm)",
min_value=cst_app.DEFAULT_BOUNDARY_X_MIN,
max_value=cst_app.DEFAULT_BOUNDARY_X_MAX,
value=st.session_state.get("boundary_x", cst_app.DEFAULT_BOUNDARY_X_MIN),
step=1.0,
on_change=parameter_changed,
)
boundary_y = st.sidebar.number_input(
"Length Y (cm)",
min_value=cst_app.DEFAULT_BOUNDARY_Y_MIN,
max_value=cst_app.DEFAULT_BOUNDARY_Y_MAX,
value=st.session_state.get("boundary_y", cst_app.DEFAULT_BOUNDARY_Y_MIN),
step=1.0,
on_change=parameter_changed,
)
# Update session state if changed
if boundary_x != st.session_state.get("boundary_x", cst_app.DEFAULT_BOUNDARY_X_MIN) or boundary_y != st.session_state.get(
"boundary_y", cst_app.DEFAULT_BOUNDARY_Y_MIN
):
st.session_state.boundary_x = boundary_x
st.session_state.boundary_y = boundary_y
st.session_state.simulation_run = True
new_boundaries = create_boundaries(st.session_state.boundary_x, st.session_state.boundary_y)
else:
new_boundaries = Polygon()
return new_boundaries
[docs]
def general_settings() -> Polygon:
"""
Configure and return general settings for the simulation.
Returns
-------
Polygon
The updated boundaries of the simulation area.
"""
selected_packing_option = st.sidebar.pills(
" ", list(st.session_state.pack_options.values()), label_visibility="collapsed", default=st.session_state.pack_options["grid"]
)
if selected_packing_option != st.session_state.selected_packing_option:
st.session_state.selected_packing_option = selected_packing_option
parameter_changed()
# Compare using the display value, not the key
close_packing_enabled = st.session_state.selected_packing_option == st.session_state.pack_options["pack"]
num_agents = st.sidebar.number_input(
"Number of agents",
min_value=cst_app.DEFAULT_AGENT_NUMBER_MIN,
max_value=cst_app.DEFAULT_AGENT_NUMBER_MAX,
value=st.session_state.num_agents,
step=1,
on_change=parameter_changed,
)
if num_agents != st.session_state.num_agents:
st.session_state.num_agents = num_agents
if close_packing_enabled:
desired_direction = st.sidebar.number_input(
"Desired direction (degrees)",
min_value=-180.0,
max_value=180.0,
value=st.session_state.desired_direction,
step=1.0,
on_change=parameter_changed,
)
if desired_direction != st.session_state.desired_direction:
st.session_state.desired_direction = desired_direction
variable_orientation: bool = st.sidebar.checkbox("Variable orientation", value=False, on_change=parameter_changed)
if variable_orientation != st.session_state.variable_orientation:
st.session_state.variable_orientation = variable_orientation
repulsion_length: float = st.sidebar.slider(
"Initial spacing (cm)",
min_value=cst_app.DEFAULT_REPULSION_LENGTH_MIN,
max_value=cst_app.DEFAULT_REPULSION_LENGTH_MAX,
value=cst.DEFAULT_REPULSION_LENGTH,
step=0.01,
on_change=parameter_changed,
)
if repulsion_length != st.session_state.repulsion_length:
st.session_state.repulsion_length = repulsion_length
wall_interaction = st.sidebar.checkbox(
"Enable wall interaction",
value=cst_app.DEFAULT_WALL_INTERACTION,
key="wall_interaction",
on_change=parameter_changed,
)
if wall_interaction != st.session_state.get("wall_interaction", False):
st.session_state.wall_interaction = wall_interaction
new_boundaries = boundaries_state()
else:
st.session_state.wall_interaction = False
new_boundaries = Polygon()
return new_boundaries
[docs]
def run_crowd_init() -> None:
"""
Provide an interactive interface for simulating and visualizing a crowd of agents.
Users can configure general settings, select databases, and control agent packing behavior.
The tab includes options for crowd visualization and downloading results.
Attributes
----------
Main Page:
- Crowd visualization using Plotly charts.
- Interpenetration warnings if applicable.
Notes
-----
- Sidebar:
- General settings:
- Toggle for packing agents.
- Input fields for boundaries, number of agents, wall interaction strength, and repulsion length.
- Database selection:
- Options: ANSURII database, Custom statistics.
- Additional settings for custom statistics.
- Download options:
- Export results as files.
- If agent packing is enabled, agents are packed using force-based interactions.
Otherwise, the crowd is unpacked.
- Interpenetration between agents is calculated and displayed as a warning if necessary.
"""
# Initialize session state variables
initialize_session_state()
st.sidebar.header("General settings")
new_boundaries = general_settings()
# Rolling menu to select between ANSURII database / Custom Statistics
database_option = st.sidebar.selectbox(
"Database origin",
options=["ANSURII database", "Custom statistics"],
)
if "database_option" not in st.session_state:
st.session_state.database_option = database_option
if database_option == "ANSURII database":
if st.session_state.simulation_run:
info_placeholder = st.empty()
info_placeholder.info(
"The agents creation is ongoing and may take some time. Please be patient.",
icon="⏳",
)
current_crowd = Crowd(boundaries=new_boundaries)
current_crowd.create_agents(st.session_state.num_agents)
st.session_state.current_crowd = current_crowd
info_placeholder.empty()
else: # Custom Statistics
st.sidebar.header(f"{database_option} settings")
agent_statistics_state(new_boundaries, st.session_state.num_agents)
if st.session_state.simulation_run:
info_placeholder = st.empty()
if st.session_state.selected_packing_option == st.session_state.pack_options["pack"]:
info_placeholder.info(
"The packing of the crowd is ongoing and may take some time. Please be patient.",
icon="⏳",
)
st.session_state.current_crowd.pack_agents_with_forces(
st.session_state.repulsion_length, st.session_state.desired_direction, st.session_state.variable_orientation
)
st.session_state.simulation_run = False
info_placeholder.empty()
else:
# if some agents are bike, then specify the grid parameters
if any(agent.agent_type == cst.AgentTypes.bike for agent in st.session_state.current_crowd.agents):
st.session_state.current_crowd.pack_agents_on_grid(grid_size_x=cst.GRID_SIZE_X_BIKE, grid_size_y=cst.GRID_SIZE_Y_BIKE)
else:
st.session_state.current_crowd.pack_agents_on_grid()
st.session_state.simulation_run = False
display_interpenetration_warning()
# Choose between 2D representation of the crowd or 3D representation
st.subheader("Choose dimension")
plot_2D_3D_and_download_section(st.session_state.current_crowd)
[docs]
def plot_2D_3D_and_download_section(current_crowd: Crowd) -> None:
"""
Display options to plot the current crowd in 2D or 3D and provide download functionality.
Depending on the agent types in the crowd, this function presents the user with options
to visualize the crowd either in 2D or 3D. If all agents are of type `pedestrian`, both
2D and 3D visualization options are available. Otherwise, only 2D visualization is offered.
The function also enables downloading the plotted results.
Parameters
----------
current_crowd : Crowd
The crowd object containing agent data to be visualized.
"""
if all(agent.agent_type == cst.AgentTypes.pedestrian for agent in current_crowd.agents):
dimension_options = {
"2D crowd": "2D",
"3D crowd": "3D",
}
selected_dimension_options = st.pills(" ", list(dimension_options.values()), label_visibility="collapsed", default="2D")
# Plotting and downloading
if selected_dimension_options == dimension_options["2D crowd"]:
plot_and_download_crowd2D(current_crowd)
elif selected_dimension_options == dimension_options["3D crowd"]:
plot_and_download_crowd3D(current_crowd)
else:
selected_dimension_option = st.pills(" ", ["2D"], label_visibility="collapsed", default="2D")
# Plotting and downloading
if selected_dimension_option == "2D":
plot_and_download_crowd2D(current_crowd)
[docs]
def run_crowd_from_config() -> None:
"""
Run the crowd simulation from uploaded XML configuration files.
This function provides a Streamlit sidebar interface for uploading three required XML files:
Agents.xml, Geometry.xml, and AgentDynamics.xml. It validates the uploads, parses the XML files
into dictionaries, creates a crowd object using the configuration, displays a 2D plot of the crowd,
and allows the user to download the plot as a PDF.
Notes
-----
- All three configuration files must be uploaded to proceed.
- Displays errors or info messages in the Streamlit sidebar if files are missing or invalid.
"""
if "plot_twoD_run" not in st.session_state:
st.session_state["plot_twoD_run"] = True
if "plot_threeD_run" not in st.session_state:
st.session_state["plot_threeD_run"] = True
if "plot_threeD_layers_run" not in st.session_state:
st.session_state["plot_threeD_layers_run"] = True
if "twoD_scene" not in st.session_state:
st.session_state["twoD_scene"] = plt.figure()
if "threeD_scene" not in st.session_state:
st.session_state["threeD_scene"] = go.Figure()
if "threeD_layers" not in st.session_state:
st.session_state["threeD_layers"] = go.Figure()
# --- File upload section ---
st.sidebar.header("Upload configuration files")
uploaded_dynamics = st.sidebar.file_uploader(
"Upload AgentDynamics.xml", type="xml", key="AgentDynamics", on_change=parameter_changed
)
uploaded_agents = st.sidebar.file_uploader("Upload Agents.xml", type="xml", key="Agents", on_change=parameter_changed)
uploaded_geometry = st.sidebar.file_uploader("Upload Geometry.xml", type="xml", key="Geometry", on_change=parameter_changed)
# --- File validation ---
files = {
"Agents.xml": uploaded_agents,
"Geometry.xml": uploaded_geometry,
"AgentDynamics.xml": uploaded_dynamics,
}
missing_files = [name for name, file in files.items() if file is None or (hasattr(file, "size") and file.size == 0)]
if missing_files:
for name in missing_files:
st.error(f"{name} is missing or empty. Please upload a valid file.")
st.info("Please upload all three configuration files to continue.")
# --- XML Parsing ---
if all(file is not None and (not hasattr(file, "size") or file.size > 0) for file in files.values()): #
crowd_xml: str = uploaded_agents.read().decode("utf-8")
static_dict: StaticCrowdDataType = fun_xml.static_xml_to_dict(crowd_xml)
geometry_xml: str = uploaded_geometry.read().decode("utf-8")
geometry_dict: GeometryDataType = fun_xml.geometry_xml_to_dict(geometry_xml)
dynamic_xml: str = uploaded_dynamics.read().decode("utf-8")
dynamic_dict: DynamicCrowdDataType = fun_xml.dynamic_xml_to_dict(dynamic_xml)
# --- Crowd creation ---
try:
current_crowd = create_agents_from_dynamic_static_geometry_parameters(
static_dict=static_dict,
dynamic_dict=dynamic_dict,
geometry_dict=geometry_dict,
)
# --- Plotting and downloading ---
st.subheader("Choose dimension")
plot_2D_3D_and_download_section(current_crowd)
except ValueError as e:
st.error(f"Value error while creating crowd: {e}")
except KeyError as e:
st.error(f"Key error while creating crowd: {e}")
except TypeError as e:
st.error(f"Type error while creating crowd: {e}")
[docs]
def plot_and_download_crowd_from_config(current_crowd: Crowd) -> None:
"""
Plot and download the plot of the crowd from configuration files.
Parameters
----------
current_crowd : Crowd
The Crowd object to be plotted and downloaded.
"""
# --- Plotting ---
col1, _ = st.columns([1.5, 1])
with col1:
fig = plot.display_crowd2D(current_crowd)[0]
st.pyplot(fig)
# --- Download section ---
crowd_plot = BytesIO()
fig.savefig(crowd_plot, format="pdf")
crowd_plot.seek(0)
st.sidebar.header("Download")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
st.download_button(
label="Download plot as PDF",
data=crowd_plot,
file_name=f"crowd_{timestamp}.pdf",
mime="application/pdf",
)
[docs]
def plot_and_download_crowd3D(current_crowd: Crowd) -> None:
"""
Plot the crowd in 3D and provide download options.
Parameters
----------
current_crowd : Crowd
The Crowd object to be plotted and downloaded.
"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
st.sidebar.header("Download")
filename = f"crowd3D_{timestamp}.pkl"
data_to_download = pickle.dumps([current_pedestrian.shapes3D.shapes for current_pedestrian in current_crowd.agents])
st.sidebar.download_button(
label="Export 3D crowd data as PKL",
data=data_to_download,
file_name=filename,
mime="application/octet-stream",
help="This will download the 3D crowd data as "
"a list of dict[float, MultiPolygon], "
"i.e. one dictionary for each agent, "
"as a pickle file.",
)
st.subheader("Visualisation")
col1, col2 = st.columns([1, 1])
with col1:
if st.session_state.plot_threeD_run:
st.session_state.threeD_scene = plot.display_crowd3D_whole_3Dscene(current_crowd)
st.session_state.plot_threeD_run = False
st.plotly_chart(st.session_state.threeD_scene)
with col2:
st.text(" ")
st.text(" ")
st.text(" ")
st.text(" ")
st.text(" ")
st.text(" ")
st.text(" ")
st.text(" ")
if st.session_state.plot_threeD_layers_run:
st.session_state.threeD_layers = plot.display_crowd3D_slices_by_slices(current_crowd)
st.session_state.plot_threeD_layers_run = False
st.plotly_chart(st.session_state.threeD_layers)
[docs]
def run_tab_crowd() -> None:
"""
Display and manage the crowd setup tab in the Streamlit app.
This function allows the user to either initialize a new crowd and save configuration files,
or to create a crowd using existing configuration files by uploading them. The function
handles file validation, parsing, crowd creation, visualization, and plot download.
"""
st.subheader("Select the crowd setup method")
crowd_origin_options = {
"init crowd": "Initialize your own crowd",
"crowd from config": "Generate from configuration files",
}
selected_crowd_origin = st.pills(" ", list(crowd_origin_options.values()), label_visibility="collapsed")
if selected_crowd_origin == crowd_origin_options["init crowd"]:
run_crowd_init()
if selected_crowd_origin == crowd_origin_options["crowd from config"]:
run_crowd_from_config()