diff --git a/CHANGELOG.md b/CHANGELOG.md index f1d054ae..59d02d74 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 0.6.7 +- Add option to skip MPC calculation in given time intervals, e.g. during summer period +- Add a fallback pid module that listens to the same deactivation signal and switches on when the MPC is deactivated + ## 0.6.6 - self.time available in mpc and ml mpc (not yet available for admm, minlp, etc) diff --git a/agentlib_mpc/__init__.py b/agentlib_mpc/__init__.py index 21018aa2..fec6fe98 100644 --- a/agentlib_mpc/__init__.py +++ b/agentlib_mpc/__init__.py @@ -3,4 +3,5 @@ from .modules import MODULE_TYPES from .models import MODEL_TYPES -__version__ = "0.6.6" + +__version__ = "0.6.7" diff --git a/agentlib_mpc/data_structures/mpc_datamodels.py b/agentlib_mpc/data_structures/mpc_datamodels.py index 9478fcee..53f9b604 100644 --- a/agentlib_mpc/data_structures/mpc_datamodels.py +++ b/agentlib_mpc/data_structures/mpc_datamodels.py @@ -1,6 +1,6 @@ import dataclasses from pathlib import Path -from typing import List, Union, TypeVar, Protocol, Sequence, Iterable +from typing import List, Union, TypeVar, Protocol, Sequence, Iterable, Optional from itertools import chain import attrs @@ -15,6 +15,9 @@ from pydantic import ConfigDict +MPC_FLAG_ACTIVE = "MPC_FLAG_ACTIVE" + + class InitStatus(str, Enum): """Keep track of the readyness status of the MPC.""" @@ -44,7 +47,8 @@ class DiscretizationOptions(pydantic.BaseModel): class Results(Protocol): df: pd.DataFrame - def __getitem__(self, item: str) -> Sequence[float]: ... + def __getitem__(self, item: str) -> Sequence[float]: + ... 
@dataclasses.dataclass @@ -71,6 +75,7 @@ def __contains__(self, item): all_variables = set(chain.from_iterable(self.__dict__.values())) return item in all_variables + VariableReferenceT = TypeVar("VariableReferenceT", bound=BaseVariableReference) diff --git a/agentlib_mpc/modules/__init__.py b/agentlib_mpc/modules/__init__.py index ccd67e49..77a4b6fb 100644 --- a/agentlib_mpc/modules/__init__.py +++ b/agentlib_mpc/modules/__init__.py @@ -23,9 +23,11 @@ def import_class(self): module_path="agentlib_mpc.modules.data_source", class_name="DataSource" ), "mpc_basic": ModuleImport( - module_path="agentlib_mpc.modules.mpc", class_name="BaseMPC" + module_path="agentlib_mpc.modules.mpc.mpc", class_name="BaseMPC" + ), + "mpc": ModuleImport( + module_path="agentlib_mpc.modules.mpc.mpc_full", class_name="MPC" ), - "mpc": ModuleImport(module_path="agentlib_mpc.modules.mpc_full", class_name="MPC"), "minlp_mpc": ModuleImport( module_path="agentlib_mpc.modules.minlp_mpc", class_name="MINLPMPC" ), @@ -66,4 +68,12 @@ def import_class(self): "mhe": ModuleImport( module_path="agentlib_mpc.modules.estimation.mhe", class_name="MHE" ), + "skip_mpc_intervals": ModuleImport( + module_path="agentlib_mpc.modules.deactivate_mpc.deactivate_mpc", + class_name="SkipMPCInIntervals", + ), + "fallback_pid": ModuleImport( + module_path="agentlib_mpc.modules.deactivate_mpc.fallback_pid", + class_name="FallbackPID", + ), } diff --git a/agentlib_mpc/modules/deactivate_mpc/__init__.py b/agentlib_mpc/modules/deactivate_mpc/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/agentlib_mpc/modules/deactivate_mpc/deactivate_mpc.py b/agentlib_mpc/modules/deactivate_mpc/deactivate_mpc.py new file mode 100644 index 00000000..97fa8dbd --- /dev/null +++ b/agentlib_mpc/modules/deactivate_mpc/deactivate_mpc.py @@ -0,0 +1,123 @@ +from agentlib import AgentVariables +from agentlib.core import BaseModule, BaseModuleConfig, AgentVariable +from pydantic import Field +from typing import Optional + 
+from agentlib_mpc import utils +from agentlib_mpc.data_structures import mpc_datamodels + + +class MPCOnOffConfig(BaseModuleConfig): + active: AgentVariable = Field( + default=AgentVariable( + name=mpc_datamodels.MPC_FLAG_ACTIVE, + description="MPC is active", + type="bool", + value=True, + shared=False, + ), + description="Variable used to activate or deactivate the MPC operation", + ) + inputs: AgentVariables = Field( + default=[], description="Inputs based on which switch decisions can be made." + ) + t_sample: float = Field( + default=60, description="Sends the active variable every other t_sample" + ) + public_active_message: Optional[AgentVariable] = Field( + default=None, + description="If needed, specify an AgentVariable that is sent when the MPC is active, for example to suppress a local controller.", + ) + public_inactive_message: Optional[AgentVariable] = Field( + default=None, + description="If needed, specify an AgentVariable that is sent when the MPC is inactive, for example to awaken a local controller.", + ) + controls_when_deactivated: AgentVariables = Field( + default=[], description="List of AgentVariables to send as Fallback Controls." + ) + + shared_variable_fields: list[str] = [ + "public_active_message", + "controls_when_deactivated", + ] + + +class MPCOnOff(BaseModule): + config: MPCOnOffConfig + + def process(self): + while True: + deactivate_mpc = self.check_mpc_deactivation() + if deactivate_mpc: + self.deactivate_mpc() + else: + self.activate_mpc() + yield self.env.timeout(self.config.t_sample) + + def deactivate_mpc(self): + """Performs mpc deactivation. 
Sends the deactivation signal, as well as + default control signals.""" + self.set(self.config.active.name, False) + for agent_variable in self.config.controls_when_deactivated: + # actively resend this variable + self.set(agent_variable.name, agent_variable.value) + if self.config.public_inactive_message is not None: + self.set( + self.config.public_inactive_message.name, + self.config.public_inactive_message.value, + ) + + def activate_mpc(self): + """Performs mpc activation. Sends activation signal, as well as the public + active message.""" + self.set(self.config.active.name, True) + if self.config.public_active_message is not None: + self.set( + self.config.public_active_message.name, + self.config.public_active_message.value, + ) + + def check_mpc_deactivation(self) -> bool: + """This function can be overridden, to define conditions based on which an + MPC module within this agent should be deactivated. Returns True if MPC + should be deactivated, and False if it should be active.""" + + def register_callbacks(self): + """This function can be overridden to check the deactivation in an + event-based manner.""" + + +class SkipMPCInIntervalsConfig(MPCOnOffConfig): + """ + Config for a module which deactivates any MPC by sending the variable + `active` in the specified intervals. + """ + + intervals: list[tuple[float, float]] = Field( + default=[], description="If environment time is within these intervals" + ) + time_unit: utils.TimeConversionTypes = Field( + default="seconds", + description="Specifies the unit of the given " + "`skip_mpc_in_intervals`, e.g. seconds or days.", + ) + + +class SkipMPCInIntervals(MPCOnOff): + """ + Module which deactivates any MPC by sending the variable + `active` in the specified intervals. 
+ """ + + config: SkipMPCInIntervalsConfig + + def check_mpc_deactivation(self) -> bool: + if utils.is_time_in_intervals( + time=self.env.time / utils.TIME_CONVERSION[self.config.time_unit], + intervals=self.config.intervals, + ): + self.logger.debug( + "Current time is in skip_mpc_in_intervals, sending active=False to MPC" + ) + return True + return False diff --git a/agentlib_mpc/modules/deactivate_mpc/fallback_pid.py b/agentlib_mpc/modules/deactivate_mpc/fallback_pid.py new file mode 100644 index 00000000..71bb0bfd --- /dev/null +++ b/agentlib_mpc/modules/deactivate_mpc/fallback_pid.py @@ -0,0 +1,101 @@ +import logging +from math import inf, isclose +from typing import Union, Optional + +# Assuming agentlib components are available +from agentlib.core import Agent, AgentVariable +from agentlib.core.errors import ConfigurationError +from agentlib.modules.controller import SISOController, SISOControllerConfig +from agentlib.modules.controller.pid import PIDConfig, PID +from pydantic import Field, field_validator + +from agentlib_mpc.data_structures import mpc_datamodels + + +class FallbackPIDConfig(PIDConfig): + """Config for FallbackPID: Adds the MPC active flag.""" + + mpc_active_flag: AgentVariable = Field( + default=AgentVariable( + name=mpc_datamodels.MPC_FLAG_ACTIVE, type="bool", value=True + ), + description="Boolean variable indicating if MPC is active (True=MPC active, PID inactive).", + ) + + +class FallbackPID(PID): + """ + PID controller active only when the MPC (indicated by mpc_active_flag) is inactive. + Simplified error handling. Assumes configuration and data are valid. + Resets integral state and timing upon activation/deactivation. 
+ """ + + config: FallbackPIDConfig + _mpc_was_active: Optional[bool] = None # Track previous MPC state + + def __init__(self, *, config: FallbackPIDConfig, agent: Agent): + super().__init__(config=config, agent=agent) + # Initialize tracker, actual state checked in first callback + self._mpc_was_active: Optional[bool] = None + self.logger.info( + f"FallbackPID initialized. Monitoring MPC flag '{self.config.mpc_active_flag.name}'." + ) + + def _siso_callback(self, inp: AgentVariable, name: str): + """Handles input, checks MPC status, runs PID if MPC is inactive.""" + + # 1. Get current MPC status (assume variable exists and is bool) + mpc_flag_var = self.get(self.config.mpc_active_flag.name) + mpc_is_active = bool(mpc_flag_var.value) # Assume value is not None + + # 2. Check for state transitions and reset states + if self._mpc_was_active is None: + # First run: just store the state + self._mpc_was_active = mpc_is_active + self.logger.info( + f"First run detected. Initial MPC state: {mpc_is_active}. Fallback PID active: {not mpc_is_active}" + ) + if not mpc_is_active: # If starting active (MPC inactive) + self.last_time = inp.timestamp # Set time correctly for first step + self.integral = 0.0 + self.e_last = 0.0 + + elif mpc_is_active != self._mpc_was_active: + if mpc_is_active: + # Transition: Fallback PID -> INACTIVE (MPC became Active) + self.logger.info( + f"MPC flag '{mpc_flag_var.name}' became True. Deactivating FallbackPID." + ) + self.integral = 0.0 # Reset integral + self.e_last = 0.0 # Reset last error + else: + # Transition: Fallback PID -> ACTIVE (MPC became Inactive) + self.logger.info( + f"MPC flag '{mpc_flag_var.name}' became False. Activating FallbackPID." 
+ ) + # Reset time to current input to avoid large dt spike on first step + self.last_time = inp.timestamp + # Integral and e_last should be 0 from deactivation, but reset again just in case + self.integral = 0.0 + self.e_last = 0.0 + self._mpc_was_active = mpc_is_active # Update tracked state + + # 3. Execute PID logic only if MPC is inactive + if not mpc_is_active: + self.logger.debug( + f"MPC inactive. Running FallbackPID step for input {name}={inp.value}." + ) + # Call the generator's send method, executing do_step + out_val = self._step.send(inp) + + if out_val is not None: + out_name = self.config.output.name + self.logger.debug("Sending FallbackPID output %s=%s", out_name, out_val) + self.set(name=out_name, value=out_val) + else: + self.logger.warning( + "FallbackPID do_step returned None (likely due to small t_sample). No output sent." + ) + else: + # MPC is active, PID is dormant + pass diff --git a/agentlib_mpc/modules/dmpc/__init__.py b/agentlib_mpc/modules/dmpc/__init__.py index e8fcb3ce..4222b084 100644 --- a/agentlib_mpc/modules/dmpc/__init__.py +++ b/agentlib_mpc/modules/dmpc/__init__.py @@ -1,4 +1,4 @@ -from agentlib_mpc.modules.mpc_full import MPC, MPCConfig +from agentlib_mpc.modules.mpc.mpc_full import MPC, MPCConfig class DistributedMPCConfig(MPCConfig): diff --git a/agentlib_mpc/modules/dmpc/admm/admm_coordinated.py b/agentlib_mpc/modules/dmpc/admm/admm_coordinated.py index 3cb5999d..ccc8befd 100644 --- a/agentlib_mpc/modules/dmpc/admm/admm_coordinated.py +++ b/agentlib_mpc/modules/dmpc/admm/admm_coordinated.py @@ -166,6 +166,7 @@ def optimize(self, variable: AgentVariable): self._result = self.optimization_backend.solve( now=self._start_optimization_at, current_vars=opt_inputs ) + self._result_obtained = True # send optimizationData back to coordinator to signal finished # optimization. 
Select only trajectory where index is at least zero, to not diff --git a/agentlib_mpc/modules/dmpc/admm/admm_coordinator.py b/agentlib_mpc/modules/dmpc/admm/admm_coordinator.py index c3355fb7..cbc29a82 100644 --- a/agentlib_mpc/modules/dmpc/admm/admm_coordinator.py +++ b/agentlib_mpc/modules/dmpc/admm/admm_coordinator.py @@ -260,7 +260,7 @@ def _fast_process(self): """Process function for use in fast-as-possible simulations. Regularly yields control back to the environment, to allow the callbacks to run.""" yield self._wait_non_rt() - + self._status = cdt.CoordinatorStatus.sleeping while True: # ------------------ # start iteration @@ -654,6 +654,13 @@ def _handle_registrations(self): self._initial_registration(variable) def _wrap_up_algorithm(self, iterations): + active_agents_sources = self._agents_with_status( + cdt.AgentStatus.ready + ) # Get list before changing status + for source in active_agents_sources: + # Reset status to standby, ready for the next cycle's init signal + self.agent_dict[source].status = cdt.AgentStatus.standby + self._save_stats(iterations=iterations) self.penalty_parameter = self.config.penalty_factor diff --git a/agentlib_mpc/modules/dmpc/employee.py b/agentlib_mpc/modules/dmpc/employee.py index b053f56d..630be8a0 100644 --- a/agentlib_mpc/modules/dmpc/employee.py +++ b/agentlib_mpc/modules/dmpc/employee.py @@ -14,12 +14,15 @@ from agentlib.core.datamodels import Source from agentlib_mpc.data_structures.coordinator_datatypes import RegistrationMessage import agentlib_mpc.data_structures.coordinator_datatypes as cdt - +from agentlib_mpc.modules.mpc.skippable_mixin import ( + SkippableMixinConfig, + SkippableMixin, +) logger = logging.getLogger(__name__) -class MiniEmployeeConfig(BaseModuleConfig): +class MiniEmployeeConfig(SkippableMixinConfig): request_frequency: float = Field( default=1, description="Wait time between signup_requests" ) @@ -42,13 +45,14 @@ class MiniEmployeeConfig(BaseModuleConfig): shared_variable_fields: list[str] = 
["messages_out"] -class MiniEmployee(BaseModule): +class MiniEmployee(SkippableMixin): config: MiniEmployeeConfig def __init__(self, *, config: dict, agent: Agent): super().__init__(config=config, agent=agent) self._registered_coordinator: Source = None self._start_optimization_at: float = 0 + self._result_obtained = False def process(self): # send registration request to coordinator @@ -96,6 +100,13 @@ def init_iteration_callback(self, variable: AgentVariable): """ # value is True on start + should_step_be_skipped = self.check_if_should_be_skipped() + + if should_step_be_skipped: + self.logger.debug("DMPC participant was externally told to to skip.") + self.set(cdt.START_ITERATION_A2C, False) + return + if variable.value: self._start_optimization_at = self.env.time # new measurement @@ -110,7 +121,9 @@ def init_iteration_callback(self, variable: AgentVariable): # value is False on convergence/iteration limit else: - self._finish_optimization() + if self._result_obtained: + self._finish_optimization() + self._result_obtained = False def get_new_measurement(self): """ @@ -147,6 +160,7 @@ def optimize(self, variable: AgentVariable): value = variables.to_dict() self.logger.debug("Sent optimal solution.") + self._result_obtained = True self.set(name=cdt.OPTIMIZATION_A2C, value=value) def shift_trajectories(self): diff --git a/agentlib_mpc/modules/estimation/mhe.py b/agentlib_mpc/modules/estimation/mhe.py index 8701a3cd..22fe9e13 100644 --- a/agentlib_mpc/modules/estimation/mhe.py +++ b/agentlib_mpc/modules/estimation/mhe.py @@ -15,7 +15,11 @@ from agentlib_mpc.data_structures import mpc_datamodels from agentlib_mpc.data_structures.mpc_datamodels import Results -from agentlib_mpc.modules.mpc import create_optimization_backend +from agentlib_mpc.modules.mpc.mpc import create_optimization_backend +from agentlib_mpc.modules.mpc.skippable_mixin import ( + SkippableMixinConfig, + SkippableMixin, +) from agentlib_mpc.optimization_backends.backend import ( 
OptimizationBackendT, ) @@ -24,7 +28,7 @@ AG_VAR_DICT = dict[str, AgentVariable] -class MHEConfig(BaseModuleConfig): +class MHEConfig(SkippableMixinConfig): """ Pydantic data model for MPC configuration parser """ @@ -92,7 +96,7 @@ def state_weights_are_in_states( return state_weights -class MHE(BaseModule): +class MHE(SkippableMixin): """ A moving horizon estimator. """ @@ -172,14 +176,19 @@ def _init_optimization(self): def process(self): while True: - current_vars = self.collect_variables_for_optimization() - solution = self.optimization_backend.solve( - now=self.env.now, current_vars=current_vars - ) - self._set_estimation(solution) - self._remove_old_values_from_history() + self.do_step() yield self.env.timeout(self.config.time_step) + def do_step(self): + if self.check_if_should_be_skipped(): + return + current_vars = self.collect_variables_for_optimization() + solution = self.optimization_backend.solve( + now=self.env.now, current_vars=current_vars + ) + self._set_estimation(solution) + self._remove_old_values_from_history() + def _remove_old_values_from_history(self): """Clears the history of all entries that are older than current time minus horizon length.""" diff --git a/agentlib_mpc/modules/minlp_mpc.py b/agentlib_mpc/modules/minlp_mpc.py index 9060f5c9..eefbb3f4 100644 --- a/agentlib_mpc/modules/minlp_mpc.py +++ b/agentlib_mpc/modules/minlp_mpc.py @@ -4,7 +4,7 @@ from agentlib_mpc.data_structures import mpc_datamodels from agentlib_mpc.data_structures.mpc_datamodels import MINLPVariableReference -from agentlib_mpc.modules.mpc import BaseMPCConfig, BaseMPC +from agentlib_mpc.modules.mpc.mpc import BaseMPCConfig, BaseMPC logger = logging.getLogger(__name__) diff --git a/agentlib_mpc/modules/ml_model_training/ml_model_trainer.py b/agentlib_mpc/modules/ml_model_training/ml_model_trainer.py index 9f508623..0c815b8a 100644 --- a/agentlib_mpc/modules/ml_model_training/ml_model_trainer.py +++ b/agentlib_mpc/modules/ml_model_training/ml_model_trainer.py @@ 
-420,7 +420,7 @@ def resample(self) -> pd.DataFrame: sampled = {} for name, sg in source_grids.items(): single_sampled = sample_values_to_target_grid( - values=self.time_series_data[name].dropna(), + values=self.time_series_data[name].dropna().values, original_grid=sg, target_grid=target_grid, method=self.config.interpolations[name], diff --git a/agentlib_mpc/modules/mpc/__init__.py b/agentlib_mpc/modules/mpc/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/agentlib_mpc/modules/mpc.py b/agentlib_mpc/modules/mpc/mpc.py similarity index 100% rename from agentlib_mpc/modules/mpc.py rename to agentlib_mpc/modules/mpc/mpc.py diff --git a/agentlib_mpc/modules/mpc_full.py b/agentlib_mpc/modules/mpc/mpc_full.py similarity index 94% rename from agentlib_mpc/modules/mpc_full.py rename to agentlib_mpc/modules/mpc/mpc_full.py index 07cf53f7..dc3c4ed0 100644 --- a/agentlib_mpc/modules/mpc_full.py +++ b/agentlib_mpc/modules/mpc/mpc_full.py @@ -1,5 +1,8 @@ """Holds the class for full featured MPCs.""" +from typing import Dict, Union, Optional + +import agentlib import numpy as np import pandas as pd from agentlib.core import AgentVariable @@ -8,10 +11,14 @@ from pydantic import Field, field_validator, FieldValidationInfo from rapidfuzz import process, fuzz -from agentlib_mpc.modules.mpc import BaseMPCConfig, BaseMPC +from agentlib_mpc.modules.mpc.mpc import BaseMPCConfig, BaseMPC +from agentlib_mpc.modules.mpc.skippable_mixin import ( + SkippableMixinConfig, + SkippableMixin, +) -class MPCConfig(BaseMPCConfig): +class MPCConfig(BaseMPCConfig, SkippableMixinConfig): """ Pydantic data model for MPC configuration parser """ @@ -43,7 +50,7 @@ def check_r_del_u_in_controls( return r_del_u -class MPC(BaseMPC): +class MPC(BaseMPC, SkippableMixin): """ A model predictive controller. More info to follow. 
@@ -81,6 +88,8 @@ def _init_optimization(self): self.register_callbacks_for_lagged_variables() def do_step(self): + if self.check_if_should_be_skipped(): + return super().do_step() self._remove_old_values_from_history() diff --git a/agentlib_mpc/modules/mpc/skippable_mixin.py b/agentlib_mpc/modules/mpc/skippable_mixin.py new file mode 100644 index 00000000..58d28e3d --- /dev/null +++ b/agentlib_mpc/modules/mpc/skippable_mixin.py @@ -0,0 +1,56 @@ +from typing import Optional + +import agentlib +from agentlib import AgentVariable +from pydantic import Field, field_validator +from pydantic_core.core_schema import FieldValidationInfo + +from agentlib_mpc.data_structures import mpc_datamodels + + +class SkippableMixinConfig(agentlib.BaseModuleConfig): + enable_deactivation: bool = Field( + default=False, + description="If true, the MPC module uses an AgentVariable `active` which" + "other modules may change to disable the MPC operation " + "temporarily", + ) + deactivation_source: Optional[agentlib.Source] = Field( + default=None, description="Source for the deactivation signal." 
+ ) + active: AgentVariable = Field( + default=AgentVariable( + name=mpc_datamodels.MPC_FLAG_ACTIVE, + description="MPC is active", + type="bool", + value=True, + shared=False, + ), + validate_default=True, + description="Variable used to activate or deactivate the MPC operation", + ) + + @field_validator("active") + def add_deactivation_source(cls, active: AgentVariable, info: FieldValidationInfo): + source = info.data.get("deactivation_source") + if source is not None: + active.source = source + return active + + +class SkippableMixin(agentlib.BaseModule): + config: SkippableMixinConfig + + def check_if_should_be_skipped(self): + """Checks if mpc steps should be skipped based on external activation flag.""" + if not self.config.enable_deactivation: + return False + active = self.get(mpc_datamodels.MPC_FLAG_ACTIVE) + + if active.value == True: + return False + source = str(active.source) + if source == "None_None": + source = "unknown (not specified in config)" + self.logger.info("MPC was deactivated by source %s", source) + return True diff --git a/agentlib_mpc/optimization_backends/backend.py b/agentlib_mpc/optimization_backends/backend.py index 25857416..4841919a 100644 --- a/agentlib_mpc/optimization_backends/backend.py +++ b/agentlib_mpc/optimization_backends/backend.py @@ -185,13 +185,15 @@ def get_lags_per_variable(self) -> dict[str, float]: variables""" return {} - def results_file_exists(self) -> bool: - """Checks if the results file already exists, and if not, creates it with - headers.""" + def results_folder_exists(self) -> bool: + """ + Checks if the results folder already exists, and if not, creates it with + headers. 
+ """ if self._created_file: return True - if self.config.results_file.is_file(): + if self.results_file_exists(): # todo, this case is weird, as it is the mistake-append self._created_file = True return True @@ -201,6 +203,12 @@ def results_file_exists(self) -> bool: self._created_file = True return False + def results_file_exists(self) -> bool: + """ + Checks if the results file already exists. + """ + return self.config.results_file.is_file() + def update_model_variables(self, current_vars: Dict[str, AgentVariable]): """ Internal method to write current data_broker to model variables. diff --git a/agentlib_mpc/optimization_backends/casadi_/admm.py b/agentlib_mpc/optimization_backends/casadi_/admm.py index d47ec6a3..8a403b3d 100644 --- a/agentlib_mpc/optimization_backends/casadi_/admm.py +++ b/agentlib_mpc/optimization_backends/casadi_/admm.py @@ -137,6 +137,9 @@ def _discretize(self, sys: CasadiADMMSystem): # penalty for control change between time steps self.objective_function += ts * ca.dot(du_weights, (u_prev - uk) ** 2) + # New parameter for inputs + dk = self.add_opt_par(sys.non_controlled_inputs) + # perform inner collocation loop # perform inner collocation loop opt_vars_inside_inner = [ @@ -150,10 +153,10 @@ def _discretize(self, sys: CasadiADMMSystem): sys.multipliers, sys.exchange_multipliers, sys.exchange_diff, - sys.non_controlled_inputs ] constant_over_inner = { sys.controls: uk, + sys.non_controlled_inputs: dk, sys.model_parameters: const_par, sys.penalty_factor: rho, } @@ -271,6 +274,7 @@ def _discretize(self, sys: CasadiADMMSystem): p=ca.vertcat( current_control, local_coupling, + local_exchange, disturbance, model_parameters, algebraic_vars, @@ -308,6 +312,7 @@ def _create_ode( p = ca.vertcat( sys.controls.full_symbolic, sys.local_couplings.full_symbolic, + sys.local_exchange.full_symbolic, sys.non_controlled_inputs.full_symbolic, sys.model_parameters.full_symbolic, sys.algebraics.full_symbolic, @@ -376,7 +381,7 @@ def save_result_df( res_file = 
self.config.results_file - if self.results_file_exists(): + if self.results_folder_exists(): self.it += 1 if now != self.now: # means we advanced to next step self.it = 0 diff --git a/agentlib_mpc/optimization_backends/casadi_/core/casadi_backend.py b/agentlib_mpc/optimization_backends/casadi_/core/casadi_backend.py index b333fc8a..94be5d28 100644 --- a/agentlib_mpc/optimization_backends/casadi_/core/casadi_backend.py +++ b/agentlib_mpc/optimization_backends/casadi_/core/casadi_backend.py @@ -50,6 +50,13 @@ class CasadiBackendConfig(BackendConfig): description="Boolean to turn JIT of the optimization problems on or off.", validate_default=True, ) + save_only_stats: bool = pydantic.Field( + default=False, + description="If results should be saved, setting this to True will only save" + "the optimization statistics. May be useful for longer timespans," + "if the results with all predictions gets too large." + ) + @pydantic.field_validator("do_jit") @classmethod @@ -281,13 +288,24 @@ def save_result_df( return res_file = self.config.results_file - if not self.results_file_exists(): + stats_file = stats_path(res_file) + + first_entry = False + # Handle stats + if not self.results_folder_exists(): + results.write_stats_columns(stats_file) + first_entry = True + + with open(stats_file, "a") as f: + f.writelines(results.stats_line(str(now))) + + if self.config.save_only_stats: + return + + # Handle all results, including predictions + if first_entry: results.write_columns(res_file) - results.write_stats_columns(stats_path(res_file)) df = results.df df.index = list(map(lambda x: str((now, x)), df.index)) df.to_csv(res_file, mode="a", header=False) - - with open(stats_path(res_file), "a") as f: - f.writelines(results.stats_line(str(now))) diff --git a/agentlib_mpc/optimization_backends/casadi_/core/discretization.py b/agentlib_mpc/optimization_backends/casadi_/core/discretization.py index 170dec55..81a149cb 100644 --- 
a/agentlib_mpc/optimization_backends/casadi_/core/discretization.py +++ b/agentlib_mpc/optimization_backends/casadi_/core/discretization.py @@ -184,15 +184,6 @@ def solve(self, mpc_inputs: MPCInputs) -> Results: # format and return solution mpc_output = self._nlp_outputs_to_mpc_outputs(vars_at_optimum=nlp_output["x"]) - # clip binary values within tolerance - if "w" in mpc_output: - tolerance = 1e-5 - bin_array = mpc_output["w"].full() - bin_array = np.where((-tolerance < bin_array) & (bin_array < 0), 0, - np.where((1 < bin_array) & (bin_array < 1 + tolerance), - 1, bin_array)) - mpc_output["w"] = bin_array - self._remember_solution(mpc_output) result = self._process_solution(inputs=mpc_inputs, outputs=mpc_output) return result @@ -227,9 +218,7 @@ def _determine_initial_guess(self, mpc_inputs: MPCInputs) -> MPCInputs: + mpc_inputs[f"ub_{denotation}"] ) ) - guess = np.nan_to_num( - guess, posinf=0, neginf=-0 - ) + guess = np.nan_to_num(guess, posinf=0, neginf=-0) guesses.update({GUESS_PREFIX + denotation: guess}) return guesses @@ -253,7 +242,7 @@ def _process_solution(self, inputs: dict, outputs: dict) -> Results: for key, value in inputs.items(): key: str if key.startswith(GUESS_PREFIX): - out_key = key[len(GUESS_PREFIX):] + out_key = key[len(GUESS_PREFIX) :] inputs[key] = outputs[out_key] result_matrix = self._result_map(**inputs)["result"] diff --git a/agentlib_mpc/optimization_backends/casadi_/full.py b/agentlib_mpc/optimization_backends/casadi_/full.py index f136c71e..ace09d09 100644 --- a/agentlib_mpc/optimization_backends/casadi_/full.py +++ b/agentlib_mpc/optimization_backends/casadi_/full.py @@ -71,12 +71,16 @@ def _discretize(self, sys: FullSystem): # penalty for control change between time steps self.objective_function += ts * ca.dot(du_weights, (u_prev - uk) ** 2) + # New parameter for inputs + dk = self.add_opt_par(sys.non_controlled_inputs) + # perform inner collocation loop opt_vars_inside_inner = [sys.algebraics, sys.outputs] - opt_pars_inside_inner 
= [sys.non_controlled_inputs] + opt_pars_inside_inner = [] constant_over_inner = { sys.controls: uk, + sys.non_controlled_inputs: dk, sys.model_parameters: const_par } xk_end, constraints = self._collocation_inner_loop( diff --git a/agentlib_mpc/optimization_backends/casadi_/minlp.py b/agentlib_mpc/optimization_backends/casadi_/minlp.py index 2134a74d..5c8f5dfb 100644 --- a/agentlib_mpc/optimization_backends/casadi_/minlp.py +++ b/agentlib_mpc/optimization_backends/casadi_/minlp.py @@ -89,12 +89,16 @@ def _discretize(self, sys: CasadiMINLPSystem): uk = self.add_opt_var(sys.controls) wk = self.add_opt_var(sys.binary_controls) + # New parameter for inputs + dk = self.add_opt_par(sys.non_controlled_inputs) + # perform inner collocation loop opt_vars_inside_inner = [sys.algebraics, sys.outputs] - opt_pars_inside_inner = [sys.non_controlled_inputs] + opt_pars_inside_inner = [] constant_over_inner = { sys.controls: uk, + sys.non_controlled_inputs: dk, sys.model_parameters: const_par, sys.binary_controls: wk, } diff --git a/agentlib_mpc/utils/__init__.py b/agentlib_mpc/utils/__init__.py index 63fff1c4..a4a22bf8 100644 --- a/agentlib_mpc/utils/__init__.py +++ b/agentlib_mpc/utils/__init__.py @@ -11,3 +11,17 @@ "hours": 3600, "days": 86400, } + + +def is_time_in_intervals(time: float, intervals: list[tuple[float, float]]) -> bool: + """ + Check if given time is within any of the provided intervals. 
+ + Args: + time: The time value to check + intervals: List of tuples, each containing (start_time, end_time) + + Returns: + True if time falls within any interval, False otherwise + """ + return any(start <= time <= end for start, end in intervals) diff --git a/agentlib_mpc/utils/plotting/mpc_dashboard.py b/agentlib_mpc/utils/plotting/mpc_dashboard.py new file mode 100644 index 00000000..748b51d9 --- /dev/null +++ b/agentlib_mpc/utils/plotting/mpc_dashboard.py @@ -0,0 +1,656 @@ +import re +import webbrowser +from pathlib import Path +from typing import Dict, Union, Optional, Literal, Any, List, Tuple + +import dash +import h5py +import numpy as np +import pandas as pd +import plotly.graph_objects as go +from dash import html, dcc +from dash.dependencies import Input, Output, State + +# Keep existing imports +from agentlib_mpc.utils import TIME_CONVERSION +from agentlib_mpc.utils.analysis import load_mpc, load_mpc_stats +from agentlib_mpc.utils.plotting.basic import EBCColors +from agentlib_mpc.utils.plotting.interactive import get_port, obj_plot, solver_return +from agentlib_mpc.utils.plotting.mpc import interpolate_colors + + +def reduce_triple_index(df: pd.DataFrame) -> pd.DataFrame: + """ + Reduce a triple-indexed DataFrame to a double index by keeping only the rows + with the largest level 1 index for each unique level 0 index. 
+ + Args: + df: DataFrame with either double or triple index + + Returns: + DataFrame with double index + """ + if len(df.index.levels) == 2: + return df + + # Group by level 0 and get the maximum level 1 index for each group + idx = df.index.get_level_values(0) + sub_idx = df.index.get_level_values(1) + max_sub_indices = df.groupby(idx)[[]].max().index + + # Create a mask for rows we want to keep + mask = pd.Series(False, index=df.index) + for time in max_sub_indices: + max_sub_idx = df.loc[time].index.get_level_values(0).max() + mask.loc[(time, max_sub_idx)] = True + + # Apply the mask and drop the middle level + return df[mask].droplevel(1) + + +def is_mhe_data(series: pd.Series) -> bool: + """ + Detect if the data represents MHE (Moving Horizon Estimator) results + rather than MPC predictions. + + Args: + series: Series of predictions with time steps as index + + Returns: + bool: True if the data appears to be MHE data, False otherwise + """ + # Get the unique prediction time points + unique_time_points = series.index.unique(level=0) + + # For each time point, check the distribution of indices + negative_indices_count = 0 + positive_indices_count = 0 + + for time_point in unique_time_points: + prediction = series.xs(time_point, level=0) + # Count negative and non-negative indices + negative_indices_count += sum(prediction.index < 0) + positive_indices_count += sum(prediction.index >= 0) + + # If we have mostly negative indices with just a few non-negative ones, + # it's likely MHE data (which primarily contains past states) + if negative_indices_count > 0 and positive_indices_count <= unique_time_points.size: + return True + + return False + + +def plot_mpc_plotly( + series: pd.Series, + step: bool = False, + convert_to: Literal["seconds", "minutes", "hours", "days"] = "seconds", + y_axis_label: str = "", + use_datetime: bool = False, + max_predictions: int = 1000, +) -> go.Figure: + """ + Create a plotly figure from MPC prediction series. 
+ + Args: + series: Series of MPC predictions with time steps as index + step: Whether to display step plots (True) or continuous lines (False) + convert_to: Unit for time conversion + y_axis_label: Label for y-axis + use_datetime: Whether to interpret timestamps as datetime + max_predictions: Maximum number of predictions to show (for performance) + + Returns: + Plotly figure object + """ + fig = go.Figure() + predictions_grouped = series.groupby(level=0) + number_of_predictions = predictions_grouped.ngroups + + # Detect if this is MHE data + is_mhe = is_mhe_data(series) + + # Sample predictions if there are too many + if number_of_predictions > max_predictions: + # Always include the most recent prediction + most_recent_time = series.index.unique(level=0)[-1] + + # Calculate step size for the remaining predictions + remaining_slots = max_predictions - 1 + step_size = (number_of_predictions - 1) // remaining_slots + + # Select evenly spaced predictions and combine with most recent + selected_times = series.index.unique(level=0)[:-1:step_size][:remaining_slots] + selected_times = pd.Index(list(selected_times) + [most_recent_time]) + predictions_iterator = ((t, series.xs(t, level=0)) for t in selected_times) + number_of_predictions = max_predictions + else: + selected_times = series.index.unique(level=0) + predictions_iterator = ((t, series.xs(t, level=0)) for t in selected_times) + + # stores the first value of each prediction (only for selected times) + actual_values: dict[float, float] = {} + + for i, (time_seconds, prediction) in enumerate(predictions_iterator): + prediction: pd.Series = prediction.dropna() + + # For MPC, only show future values (index >= 0) + # For MHE, show all values including past (don't filter) + if not is_mhe: + prediction = prediction[prediction.index >= 0] + + if use_datetime: + time_converted = pd.Timestamp(time_seconds, unit="s", tz="UTC").tz_convert( + "Europe/Berlin" + ) + relative_times = prediction.index + try: + # For MHE, the 
reference point is typically at index 0 + # For MPC, the reference point is also at index 0 + actual_values[time_converted] = prediction.loc[0] + except KeyError: + pass + timedeltas = pd.to_timedelta(relative_times, unit="s") + base_time = pd.Timestamp(time_seconds, unit="s", tz="UTC") + prediction.index = base_time + timedeltas + prediction.index = prediction.index.tz_convert("Europe/Berlin") + else: + time_converted = time_seconds / TIME_CONVERSION[convert_to] + try: + actual_values[time_converted] = prediction.loc[0] + except KeyError: + pass + prediction.index = (prediction.index + time_seconds) / TIME_CONVERSION[ + convert_to + ] + + progress = i / number_of_predictions + prediction_color = interpolate_colors( + progress=progress, + colors=[EBCColors.red, EBCColors.dark_grey], + ) + + # For MHE data, use a different line style to visually distinguish from MPC + line_style = "dash" if is_mhe else None + line_width = 1.0 if is_mhe else 0.7 + + trace_kwargs = dict( + x=prediction.index, + y=prediction, + mode="lines", + line=dict( + color=f"rgb{prediction_color}", + width=line_width, + shape="hv" if step else None, + dash=line_style, + ), + name=( + f"{time_converted}" + if use_datetime + else f"{time_converted} {convert_to[0]}" + ), + legendgroup="Prediction", + legendgrouptitle_text="Predictions", + visible=True, + legendrank=i + 2, + ) + + fig.add_trace(go.Scattergl(**trace_kwargs)) + + actual_series = pd.Series(actual_values) + fig.add_trace( + go.Scattergl( + x=actual_series.index, + y=actual_series, + mode="lines", + line=dict(color="black", width=1.5, shape="hv" if step else None), + name="Actual Values", + legendrank=1, + ) + ) + + # Add annotation to indicate if this is MHE data + if is_mhe: + fig.add_annotation( + x=0.05, + y=0.95, + xref="paper", + yref="paper", + text="MHE Data (includes past values)", + showarrow=False, + font=dict(color="red", size=12), + bgcolor="rgba(255, 255, 255, 0.8)", + bordercolor="red", + borderwidth=1, + borderpad=4, + ) + 
+ x_axis_label = "Time" if use_datetime else f"Time in {convert_to}" + fig.update_layout( + showlegend=True, + legend=dict( + groupclick="toggleitem", + itemclick="toggle", + itemdoubleclick="toggleothers", + ), + xaxis_title=x_axis_label, + yaxis_title=y_axis_label, + uirevision="same", + ) + + return fig + + +def make_components( + data: pd.DataFrame, + convert_to: str, + stats: Optional[pd.DataFrame] = None, + use_datetime: bool = False, + step: bool = False, +) -> html.Div: + """ + Create dashboard components from MPC data and stats. + + Args: + data: DataFrame with MPC data + convert_to: Time unit for plotting + stats: Optional DataFrame with MPC statistics + use_datetime: Whether to interpret timestamps as datetime + step: Whether to use step plots + + Returns: + Dash HTML Div containing all components + """ + components = [] + + # Add statistics components if available + if stats is not None: + # Add solver iterations plot + solver_plot = solver_return(stats, convert_to) + if solver_plot is not None: + components.insert(0, html.Div([solver_plot])) + + # Add objective plot if available + obj_value_plot = obj_plot(stats, convert_to) + if obj_value_plot is not None: + components.insert(1, html.Div([obj_value_plot])) + + # Create one component for each variable + # Remove try-except to expose errors directly + if isinstance(data.columns, pd.MultiIndex): + for var_type, column in data.columns: + if var_type == "variable": + components.append( + html.Div( + [ + dcc.Graph( + id=f"plot-{column}", + figure=plot_mpc_plotly( + data[var_type][column], + step=step, + convert_to=convert_to, + y_axis_label=column, + use_datetime=use_datetime, + ), + style={ + "min-width": "600px", + "min-height": "400px", + "max-width": "900px", + "max-height": "450px", + }, + ), + ], + className="draggable", + ) + ) + # Handle alternative column structures explicitly without exception handling + elif isinstance(data.columns, pd.Index): + for column in data.columns: + if 
column.startswith("variable_"): + column_name = column.replace("variable_", "") + components.append( + html.Div( + [ + dcc.Graph( + id=f"plot-{column_name}", + figure=plot_mpc_plotly( + data[column], + step=step, + convert_to=convert_to, + y_axis_label=column_name, + use_datetime=use_datetime, + ), + style={ + "min-width": "600px", + "min-height": "400px", + "max-width": "900px", + "max-height": "450px", + }, + ), + ], + className="draggable", + ) + ) + + return html.Div( + components, + style={ + "display": "grid", + "grid-template-columns": "repeat(auto-fit, minmax(600px, 1fr))", + "grid-gap": "20px", + "padding": "20px", + "min-width": "600px", + "min-height": "200px", + }, + id="plot-container", + ) + + +def detect_index_type(data: pd.DataFrame) -> Tuple[bool, bool]: + """ + Detect the type of index in the DataFrame. + + Args: + data: DataFrame to check + + Returns: + Tuple of (is_multi_index, is_datetime) + """ + is_multi_index = isinstance(data.index, pd.MultiIndex) + + # Check if it's a datetime index (or the first level is datetime) + if is_multi_index: + first_level = data.index.levels[0] + is_datetime = pd.api.types.is_datetime64_any_dtype(first_level) + if not is_datetime: + # Check if it might be a Unix timestamp (large integer values) + if pd.api.types.is_numeric_dtype(first_level): + is_datetime = ( + first_level.max() > 1e9 + ) # Simple heuristic for Unix timestamp + else: + is_datetime = pd.api.types.is_datetime64_any_dtype(data.index) + if not is_datetime and pd.api.types.is_numeric_dtype(data.index): + is_datetime = data.index.max() > 1e9 + + return is_multi_index, is_datetime + + +def show_multi_room_dashboard( + results: Dict[str, Dict[str, Any]], scale: str = "hours", step: bool = False +): + """ + Show a dashboard with dropdown selection for different agents/rooms. 
+ + Args: + results: Dictionary with agent results from mas.get_results() + scale: Time scale for plotting ("seconds", "minutes", "hours", "days") + step: Whether to use step plots + """ + app = dash.Dash(__name__, title="Multi-Agent MPC Results") + + # Get all agents + agent_ids = list(results.keys()) + + if not agent_ids: + raise ValueError("No agents found in results dictionary") + + # Find first valid MPC data to determine index type + first_agent_id = None + first_module_id = None + for agent_id in agent_ids: + for module_id, module_data in results[agent_id].items(): + if isinstance(module_data, pd.DataFrame): + first_agent_id = agent_id + first_module_id = module_id + break + if first_agent_id: + break + + if not first_agent_id: + raise ValueError("No valid MPC data found in results") + + first_data = results[first_agent_id][first_module_id] + is_multi_index, use_datetime = detect_index_type(first_data) + + # Create agent and module selector dropdowns + app.layout = html.Div( + [ + html.H1("Multi-Agent MPC Results"), + html.Div( + [ + html.Div( + [ + html.Label("Select Agent:"), + dcc.Dropdown( + id="agent-selector", + options=[ + {"label": agent_id, "value": agent_id} + for agent_id in agent_ids + ], + value=first_agent_id, + ), + ], + style={ + "width": "300px", + "margin": "10px", + "display": "inline-block", + }, + ), + html.Div( + [ + html.Label("Select Module:"), + dcc.Dropdown( + id="module-selector", + # Options will be set by callback + ), + ], + style={ + "width": "300px", + "margin": "10px", + "display": "inline-block", + }, + ), + ], + ), + html.Div( + html.Button( + "Toggle Step Plot", id="toggle-step", style={"margin": "10px"} + ) + ), + html.Div(id="agent-dashboard"), + dcc.Store(id="step-state", data=step), + ] + ) + + @app.callback( + [Output("module-selector", "options"), Output("module-selector", "value")], + [Input("agent-selector", "value")], + ) + def update_module_options(selected_agent): + if not selected_agent: + return [], None + + 
module_options = [] + first_module = None + + for module_id, module_data in results[selected_agent].items(): + if isinstance(module_data, pd.DataFrame): + module_options.append({"label": module_id, "value": module_id}) + if first_module is None: + first_module = module_id + + return module_options, first_module + + @app.callback( + Output("step-state", "data"), + [Input("toggle-step", "n_clicks")], + [State("step-state", "data")], + ) + def toggle_step_plot(n_clicks, current_step): + if n_clicks: + return not current_step + return current_step + + @app.callback( + Output("agent-dashboard", "children"), + [ + Input("agent-selector", "value"), + Input("module-selector", "value"), + Input("step-state", "data"), + ], + ) + def update_dashboard(selected_agent, selected_module, step_state): + if not selected_agent or not selected_module: + return html.Div("Please select both an agent and a module") + + # Remove try-except to expose errors directly + data = results[selected_agent][selected_module] + + if not isinstance(data, pd.DataFrame): + return html.Div(f"Selected module does not contain valid MPC data") + + # Reduce triple index to double index if needed + if isinstance(data.index, pd.MultiIndex) and len(data.index.levels) > 2: + data = reduce_triple_index(data) + + # Check if data needs time normalization + if is_multi_index and not use_datetime: + # Remove try-except to expose errors directly + first_time = data.index.levels[0][0] + data.index = data.index.set_levels( + data.index.levels[0] - first_time, level=0 + ) + + # Get stats data if available + stats = None + if f"{selected_module}_stats" in results[selected_agent]: + stats = results[selected_agent][f"{selected_module}_stats"] + + # Create the dashboard components + return make_components( + data=data, + convert_to=scale, + stats=stats, + use_datetime=use_datetime, + step=step_state, + ) + + # Launch the dashboard + port = get_port() + webbrowser.open_new_tab(f"http://localhost:{port}") + 
app.run(debug=False, port=port) + + +def launch_dashboard_from_results( + results: Dict[str, Dict[str, Any]], scale: str = "hours", step: bool = False +) -> bool: + """ + Launch the multi-agent dashboard from results dictionary returned by mas.get_results(). + + Args: + results: Dictionary with agent results from mas.get_results() + scale: Time scale for plotting ("seconds", "minutes", "hours", "days") + step: Whether to use step plots + + Returns: + bool: True if dashboard was launched, False otherwise + """ + if not results or not isinstance(results, dict): + raise ValueError("Invalid results: Expected non-empty dictionary") + + # Validate results structure + valid_data_found = False + + for agent_id, agent_data in results.items(): + if not isinstance(agent_data, dict): + continue + + for module_id, module_data in agent_data.items(): + if not isinstance(module_data, pd.DataFrame): + continue + + # Check if this DataFrame has the expected structure for MPC data + if isinstance(module_data.index, pd.MultiIndex): + if len(module_data.index.levels) > 1: + # This looks like MPC data with multi-level index + valid_data_found = True + break + else: + # Single level index might still be valid for some data + valid_data_found = module_data.shape[0] > 0 + break + + if valid_data_found: + break + + if not valid_data_found: + raise ValueError("No valid MPC data found in results") + + # Launch the dashboard without catching exceptions + print(f"Launching dashboard with scale={scale}") + show_multi_room_dashboard(results, scale=scale, step=step) + return True + + +def process_mas_results( + results: Dict[str, Dict[str, Any]], +) -> Dict[str, Dict[str, Any]]: + """ + Process results from LocalMASAgency to prepare them for visualization. 
+ + Args: + results: Raw results from mas.get_results() + + Returns: + Processed results ready for dashboard visualization + """ + processed_results = {} + + for agent_id, agent_data in results.items(): + processed_results[agent_id] = {} + + # Find all DataFrame modules that could be MPC data + for module_id, module_data in agent_data.items(): + if not isinstance(module_data, pd.DataFrame): + continue + + # Remove try-except to expose errors directly + # Check if this looks like MPC data + if isinstance(module_data.index, pd.MultiIndex): + if isinstance(module_data.columns, pd.MultiIndex): + # This is likely MPC data with variables, parameters, etc. + processed_results[agent_id][module_id] = module_data + elif any( + col.startswith(("variable_", "parameter_")) + for col in module_data.columns + ): + # This might be MPC data with flattened column names + processed_results[agent_id][module_id] = module_data + + # Check for stats data with matching prefix + stats_module_id = f"{module_id}_stats" + if stats_module_id in agent_data and isinstance( + agent_data[stats_module_id], pd.DataFrame + ): + processed_results[agent_id][stats_module_id] = agent_data[ + stats_module_id + ] + + return processed_results + + +if __name__ == "__main__": + # Example usage + import sys + + if len(sys.argv) > 1: + # If a path is provided as an argument, try to load from files + path = Path(sys.argv[1]) + if path.exists() and path.is_dir(): + print(f"Loading data from directory: {path}") + # Note: This function is referenced but not defined in the provided code + # show_multi_room_dashboard_from_files(path, scale="hours") + else: + raise FileNotFoundError(f"Directory not found: {path}") + else: + print("No directory specified. 
Please provide a directory path.") diff --git a/agentlib_mpc/utils/sampling.py b/agentlib_mpc/utils/sampling.py index 5483cf60..d58370f8 100644 --- a/agentlib_mpc/utils/sampling.py +++ b/agentlib_mpc/utils/sampling.py @@ -3,6 +3,7 @@ from typing import Union, Iterable, Sequence, List from numbers import Real +from io import StringIO import numpy as np import pandas as pd @@ -51,10 +52,13 @@ def sample( Obtain the specified portion of the trajectory. Args: - trajectory: The trajectory to be sampled. Scalars will be - expanded onto the grid. Lists need to exactly match the provided - grid. Otherwise, a pandas Series is accepted with the timestamp as index. A - dict with the keys as time stamps is also accepted. + trajectory: The trajectory to be sampled. Accepted formats: + - Scalars will be expanded onto the grid. + - Lists need to exactly match the provided grid. + - A pandas Series is accepted with the timestamp as index. + - A dict with the keys as time stamps is also accepted. + - A pandas Series serialized as a json-str is accepted. + Any given string will be cast as Series. current: start time of requested trajectory grid: target interpolation grid in seconds in relative terms (i.e. starting from 0 usually) @@ -85,6 +89,9 @@ def sample( f"Passed list with length {len(trajectory)} " f"does not match target ({target_grid_length})." 
) + if isinstance(trajectory, str): + trajectory = pd.read_json(StringIO(trajectory), typ="series", convert_axes=False) + trajectory.index = trajectory.index.astype(float) if isinstance(trajectory, pd.Series): trajectory = trajectory.dropna() source_grid = np.array(trajectory.index) diff --git a/examples/admm/admm_example_coordinator.py b/examples/admm/admm_example_coordinator.py index 0af7101a..dd484156 100644 --- a/examples/admm/admm_example_coordinator.py +++ b/examples/admm/admm_example_coordinator.py @@ -122,6 +122,6 @@ def run_example( until=1800, start_pred=0, show_dashboard=False, - cleanup=True, + cleanup=False, log_level=logging.INFO, ) diff --git a/examples/one_room_mpc/physical/simple_mpc_with_deactivation.py b/examples/one_room_mpc/physical/simple_mpc_with_deactivation.py new file mode 100644 index 00000000..79eab59b --- /dev/null +++ b/examples/one_room_mpc/physical/simple_mpc_with_deactivation.py @@ -0,0 +1,378 @@ +import logging +import os +from pathlib import Path +from typing import List + +import pandas as pd + +from agentlib_mpc.models.casadi_model import ( + CasadiModel, + CasadiInput, + CasadiState, + CasadiParameter, + CasadiOutput, + CasadiModelConfig, +) +from agentlib.utils.multi_agent_system import LocalMASAgency + +from agentlib_mpc.utils.analysis import load_mpc_stats +from agentlib_mpc.utils.plotting.interactive import show_dashboard + +logger = logging.getLogger(__name__) + +# script variables +ub = 295.15 + + +class MyCasadiModelConfig(CasadiModelConfig): + inputs: List[CasadiInput] = [ + # controls + CasadiInput( + name="mDot", + value=0.0225, + unit="m³/s", + description="Air mass flow into zone", + ), + # disturbances + CasadiInput( + name="load", value=150, unit="W", description="Heat load into zone" + ), + CasadiInput( + name="T_in", value=290.15, unit="K", description="Inflow air temperature" + ), + # settings + CasadiInput( + name="T_upper", + value=294.15, + unit="K", + description="Upper boundary (soft) for T.", + ), + ] + + 
states: List[CasadiState] = [ + # differential + CasadiState( + name="T", value=293.15, unit="K", description="Temperature of zone" + ), + # algebraic + # slack variables + CasadiState( + name="T_slack", + value=0, + unit="K", + description="Slack variable of temperature of zone", + ), + ] + + parameters: List[CasadiParameter] = [ + CasadiParameter( + name="cp", + value=1000, + unit="J/kg*K", + description="thermal capacity of the air", + ), + CasadiParameter( + name="C", value=100000, unit="J/K", description="thermal capacity of zone" + ), + CasadiParameter( + name="s_T", + value=1, + unit="-", + description="Weight for T in constraint function", + ), + CasadiParameter( + name="mpc_active", + value=1, + unit="1", + description="Flag, whether mpc or default control is used.", + ), + CasadiParameter( + name="mDot_default", + value=0.025, + unit="kg/s", + description="Default mass flow.", + ), + CasadiParameter( + name="r_mDot", + value=1, + unit="-", + description="Weight for mDot in objective function", + ), + ] + outputs: List[CasadiOutput] = [ + CasadiOutput(name="T_out", unit="K", description="Temperature of zone") + ] + + +class MyCasadiModel(CasadiModel): + config: MyCasadiModelConfig + + def setup_system(self): + mDot = self.mDot * self.mpc_active + self.mDot_default * (1 - self.mpc_active) + + # Define ode + self.T.ode = self.cp * mDot / self.C * (self.T_in - self.T) + self.load / self.C + + # Define ae + self.T_out.alg = self.T + + # Constraints: List[(lower bound, function, upper bound)] + self.constraints = [ + # soft constraints + (0, self.T + self.T_slack, self.T_upper), + ] + + # Objective function + objective = sum( + [ + self.r_mDot * mDot, + self.s_T * self.T_slack**2, + ] + ) + + return objective + + +ENV_CONFIG = {"rt": False, "factor": 0.01, "t_sample": 60} + +AGENT_MPC = { + "id": "myMPCAgent", + "modules": [ + {"module_id": "Ag1Com", "type": "local_broadcast"}, + { + "module_id": "skip_mpc", + "type": "agentlib_mpc.skip_mpc_intervals", + 
"intervals": [[30, 35], [50, 55]], + "time_unit": "minutes", + "log_level": "debug", + "public_active_message": { + "name": "public_active_message", + "alias": "mpc_active", + "value": True, + }, + "public_inactive_message": { + "name": "public_inactive_message", + "alias": "mpc_active", + "value": False, + }, + }, + { + "module_id": "myMPC", + "type": "agentlib_mpc.mpc", + "optimization_backend": { + "type": "casadi", + "model": {"type": {"file": __file__, "class_name": "MyCasadiModel"}}, + "discretization_options": { + "collocation_order": 2, + "collocation_method": "legendre", + }, + "solver": { + "name": "fatrop", # use fatrop with casadi 3.6.6 for speedup + }, + "results_file": "results//mpc.csv", + "save_results": True, + "overwrite_result_file": True, + }, + "time_step": 300, + "prediction_horizon": 15, + "parameters": [ + {"name": "s_T", "value": 3}, + {"name": "r_mDot", "value": 1}, + ], + "inputs": [ + {"name": "T_in", "value": 290.15}, + {"name": "load", "value": 150}, + {"name": "T_upper", "value": ub}, + ], + "controls": [{"name": "mDot", "value": 0.02, "ub": 0.05, "lb": 0}], + "outputs": [{"name": "T_out"}], + "states": [ + { + "name": "T", + "value": 298.16, + "ub": 303.15, + "lb": 288.15, + "alias": "T", + "source": "SimAgent", + } + ], + "enable_deactivation": True, + "deactivation_source": {"module_id": "skip_mpc"}, + }, + ], +} +AGENT_SIM = { + "id": "SimAgent", + "modules": [ + {"module_id": "Ag1Com", "type": "local_broadcast"}, + { + "module_id": "room", + "type": "simulator", + "model": { + "type": {"file": __file__, "class_name": "MyCasadiModel"}, + "states": [{"name": "T", "value": 298.16}], + }, + "t_sample": 10, + "update_inputs_on_callback": False, + "save_results": True, + "result_causalities": ["input", "parameter", "local", "output"], + "outputs": [ + {"name": "T_out", "value": 298, "alias": "T"}, + ], + "inputs": [ + {"name": "mDot", "value": 0.02, "alias": "mDot"}, + ], + "parameters": [{"name": "mpc_active", "value": True}], + }, + 
], +} + + +def run_example( + with_plots=True, log_level=logging.INFO, until=10000, with_dashboard=False +): + # Change the working directly so that relative paths work + os.chdir(Path(__file__).parent) + + # Set the log-level + logging.basicConfig(level=log_level) + mas = LocalMASAgency( + agent_configs=[AGENT_MPC, AGENT_SIM], env=ENV_CONFIG, variable_logging=True + ) + mas.run(until=until) + try: + stats = load_mpc_stats("results/__mpc.csv") + except Exception: + stats = None + results = mas.get_results(cleanup=False) + mpc_results = results["myMPCAgent"]["myMPC"] + sim_res = results["SimAgent"]["room"] + + if with_plots: + # Pass the full results to plot + plot(sim_res, until, results) + + if with_dashboard: + show_dashboard(mpc_results, stats) + + return results + + +def plot(sim_res: pd.DataFrame, until: float, results=None): + import matplotlib.pyplot as plt + import numpy as np + + # Calculate performance metrics + t_sim = sim_res["T_out"] + t_sample = t_sim.index[1] - t_sim.index[0] + aie_kh = (t_sim - ub).abs().sum() * t_sample / 3600 + energy_cost_kWh = ( + (sim_res["mDot"] * (sim_res["T_out"] - sim_res["T_in"])).sum() + * t_sample + * 1 + / 3600 + ) # cp is 1 + print(f"Absolute integral error: {aie_kh} Kh.") + print(f"Cooling energy used: {energy_cost_kWh} kWh.") + + # Create a figure with 3 subplots + fig, ax = plt.subplots(3, 1, sharex=True, figsize=(12, 10)) + + # Plot room temperature (from simulator results) + ax[0].plot( + sim_res.index, + sim_res["T_out"] - 273.15, + "b-", + linewidth=2, + label="Room Temperature", + ) + ax[0].axhline( + ub - 273.15, color="red", linestyle="--", linewidth=1.5, label="Upper Boundary" + ) + ax[0].set_ylabel("Temperature (°C)", fontsize=12) + ax[0].legend(fontsize=10) + ax[0].grid(True, alpha=0.3) + ax[0].set_title("Room Temperature Control System Monitoring", fontsize=14) + + # Plot mass flow (from simulator results) + ax[1].plot(sim_res.index, sim_res["mDot"], "g-", linewidth=2, label="Air Mass Flow") + 
ax[1].set_ylabel("Mass Flow (kg/s)", fontsize=12) + ax[1].set_ylim([0, 0.06]) + ax[1].legend(fontsize=10) + ax[1].grid(True, alpha=0.3) + + # Get the AgentLogger data and plot MPC active flag + mpc_flag_found = False + if results and "myMPCAgent" in results and "AgentLogger" in results["myMPCAgent"]: + agent_logger = results["myMPCAgent"]["AgentLogger"] + # Try "mpc_active" first, then "MPC_FLAG_ACTIVE" + for var_name in ["MPC_FLAG_ACTIVE"]: + if var_name in agent_logger.columns: + mpc_flag = agent_logger[var_name] + ax[2].step( + agent_logger.index, + mpc_flag, + "r-", + linewidth=2, + where="post", + label="MPC Active", + ) + + # Shade regions where MPC is inactive + for i in range(len(mpc_flag) - 1): + if not mpc_flag.iloc[i]: + start_time = mpc_flag.index[i] + # Find when it becomes active again + next_active = np.where(mpc_flag.iloc[i + 1 :].values)[0] + if len(next_active) > 0: + end_time = mpc_flag.index[i + 1 + next_active[0]] + else: + end_time = mpc_flag.index[-1] + + # Shade the inactive regions in all plots + for a in ax: + a.axvspan( + start_time, + end_time, + alpha=0.2, + color="lightgray", + label="_nolegend_", + ) + + mpc_flag_found = True + print(f"Found MPC active flag as '{var_name}' in AgentLogger") + break + + if not mpc_flag_found: + print("Warning: MPC active flag not found in AgentLogger.") + ax[2].text( + 0.5, + 0.5, + "MPC active flag not found", + horizontalalignment="center", + verticalalignment="center", + transform=ax[2].transAxes, + fontsize=12, + ) + + ax[2].set_ylim([-0.1, 1.1]) + ax[2].set_yticks([0, 1]) + ax[2].set_yticklabels(["Inactive", "Active"]) + ax[2].set_ylabel("MPC Status", fontsize=12) + ax[2].set_xlabel("Simulation Time (s)", fontsize=12) + if mpc_flag_found: + ax[2].legend(fontsize=10) + ax[2].grid(True, alpha=0.3) + + # Set x limits for all subplots + for a in ax: + a.set_xlim([0, until]) + + plt.tight_layout() + plt.show() + + +if __name__ == "__main__": + run_example( + with_plots=True, with_dashboard=True, 
until=7200, log_level=logging.INFO + )